diff -Nru golang-github-nats-io-go-nats-1.20.0/debian/changelog golang-github-nats-io-go-nats-1.22.1/debian/changelog --- golang-github-nats-io-go-nats-1.20.0/debian/changelog 2022-11-26 01:11:29.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/debian/changelog 2022-12-23 23:53:51.000000000 +0000 @@ -1,3 +1,11 @@ +golang-github-nats-io-go-nats (1.22.1-1) unstable; urgency=medium + + * Team upload + * New upstream release + * Bump Standards-Version in d/control (no changes needed) + + -- Mathias Gibbens Fri, 23 Dec 2022 23:54:32 +0000 + golang-github-nats-io-go-nats (1.20.0-1) unstable; urgency=medium * Team upload. diff -Nru golang-github-nats-io-go-nats-1.20.0/debian/control golang-github-nats-io-go-nats-1.22.1/debian/control --- golang-github-nats-io-go-nats-1.20.0/debian/control 2022-11-26 01:10:58.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/debian/control 2022-12-23 23:52:56.000000000 +0000 @@ -10,7 +10,7 @@ golang-github-nats-io-nuid-dev, golang-google-protobuf-dev, golang-goprotobuf-dev -Standards-Version: 4.6.1 +Standards-Version: 4.6.2 Homepage: https://github.com/nats-io/nats.go Vcs-Browser: https://salsa.debian.org/go-team/packages/golang-github-nats-io-go-nats Vcs-Git: https://salsa.debian.org/go-team/packages/golang-github-nats-io-go-nats.git diff -Nru golang-github-nats-io-go-nats-1.20.0/examples/nats-bench/main.go golang-github-nats-io-go-nats-1.22.1/examples/nats-bench/main.go --- golang-github-nats-io-go-nats-1.20.0/examples/nats-bench/main.go 2022-11-11 15:16:40.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/examples/nats-bench/main.go 2022-12-22 14:09:21.000000000 +0000 @@ -16,7 +16,6 @@ import ( "flag" "fmt" - "io/ioutil" "log" "os" "sync" @@ -145,7 +144,7 @@ if len(*csvFile) > 0 { csv := benchmark.CSV() - ioutil.WriteFile(*csvFile, []byte(csv), 0644) + os.WriteFile(*csvFile, []byte(csv), 0644) fmt.Printf("Saved metric data in csv file %s\n", *csvFile) } } diff -Nru golang-github-nats-io-go-nats-1.20.0/examples/nats-echo/main.go golang-github-nats-io-go-nats-1.22.1/examples/nats-echo/main.go --- golang-github-nats-io-go-nats-1.20.0/examples/nats-echo/main.go 2022-11-11 15:16:40.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/examples/nats-echo/main.go 2022-12-22 14:09:21.000000000 +0000 @@ -17,7 +17,7 @@ "encoding/json" "flag" "fmt" - "io/ioutil" + "io" "log" "net/http" "os" @@ -208,7 +208,7 @@ log.Fatalf("Could not retrieve geo location data: %v", err) } defer resp.Body.Close() - body, _ := ioutil.ReadAll(resp.Body) + body, _ := io.ReadAll(resp.Body) g := geo{} if err := json.Unmarshal(body, &g); err != nil { log.Fatalf("Error unmarshalling geo: %v", err) diff -Nru golang-github-nats-io-go-nats-1.20.0/example_test.go golang-github-nats-io-go-nats-1.22.1/example_test.go --- golang-github-nats-io-go-nats-1.20.0/example_test.go 2022-11-11 15:16:40.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/example_test.go 2022-12-22 14:09:21.000000000 +0000 @@ -17,6 +17,7 @@ "context" "fmt" "log" + "net" "time" "github.com/nats-io/nats.go" @@ -44,6 +45,40 @@ nc.Close() } +type skipTLSDialer struct { + dialer *net.Dialer + skipTLS bool +} + +func (sd *skipTLSDialer) Dial(network, address string) (net.Conn, error) { + return sd.dialer.Dial(network, address) +} + +func (sd *skipTLSDialer) SkipTLSHandshake() bool { + return sd.skipTLS +} + +func ExampleCustomDialer() { + // Given the following CustomDialer implementation: + // + // type skipTLSDialer struct { + // dialer *net.Dialer + // skipTLS bool + // } + // + // func (sd *skipTLSDialer) Dial(network, address string) (net.Conn, error) { + // return sd.dialer.Dial(network, address) + // } + // + // func (sd *skipTLSDialer) SkipTLSHandshake() bool { + // return true + // } + // + sd := &skipTLSDialer{dialer: &net.Dialer{Timeout: 2 * time.Second}, skipTLS: true} + nc, _ := nats.Connect("demo.nats.io", nats.SetCustomDialer(sd)) + defer nc.Close() +} + // This Example shows an asynchronous subscriber. func ExampleConn_Subscribe() { nc, _ := nats.Connect(nats.DefaultURL) diff -Nru golang-github-nats-io-go-nats-1.20.0/jserrors.go golang-github-nats-io-go-nats-1.22.1/jserrors.go --- golang-github-nats-io-go-nats-1.20.0/jserrors.go 2022-11-11 15:16:40.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/jserrors.go 2022-12-22 14:09:21.000000000 +0000 @@ -101,6 +101,9 @@ // ErrConsumerDeleted is returned when attempting to send pull request to a consumer which does not exist ErrConsumerDeleted JetStreamError = &jsError{message: "consumer deleted"} + // ErrConsumerLeadershipChanged is returned when pending requests are no longer valid after leadership has changed + ErrConsumerLeadershipChanged JetStreamError = &jsError{message: "Leadership Changed"} + // DEPRECATED: ErrInvalidDurableName is no longer returned and will be removed in future releases. // Use ErrInvalidConsumerName instead. ErrInvalidDurableName = errors.New("nats: invalid durable name") @@ -123,6 +126,8 @@ JSErrCodeMessageNotFound ErrorCode = 10037 JSErrCodeBadRequest ErrorCode = 10003 + + JSErrCodeStreamWrongLastSequence ErrorCode = 10071 ) // APIError is included in all API responses if there was an error. diff -Nru golang-github-nats-io-go-nats-1.20.0/js.go golang-github-nats-io-go-nats-1.22.1/js.go --- golang-github-nats-io-go-nats-1.20.0/js.go 2022-11-11 15:16:40.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/js.go 2022-12-22 14:09:21.000000000 +0000 @@ -1528,13 +1528,11 @@ } // Find the stream mapped to the subject if not bound to a stream already. - if o.stream == _EMPTY_ { + if stream == _EMPTY_ { stream, err = js.StreamNameBySubject(subj) if err != nil { return nil, err } - } else { - stream = o.stream } // With an explicit durable name, we can lookup the consumer first @@ -2580,6 +2578,11 @@ err = ErrConsumerDeleted break } + + if strings.Contains(strings.ToLower(string(msg.Header.Get(descrHdr))), "leadership change") { + err = ErrConsumerLeadershipChanged + break + } fallthrough default: err = fmt.Errorf("nats: %s", msg.Header.Get(descrHdr)) diff -Nru golang-github-nats-io-go-nats-1.20.0/js_test.go golang-github-nats-io-go-nats-1.22.1/js_test.go --- golang-github-nats-io-go-nats-1.20.0/js_test.go 2022-11-11 15:16:40.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/js_test.go 2022-12-22 14:09:21.000000000 +0000 @@ -23,7 +23,6 @@ "encoding/json" "errors" "fmt" - "io/ioutil" "math/rand" "os" "reflect" @@ -69,13 +68,13 @@ func createConfFile(t *testing.T, content []byte) string { t.Helper() - conf, err := ioutil.TempFile("", "") + conf, err := os.CreateTemp("", "") if err != nil { t.Fatalf("Error creating conf file: %v", err) } fName := conf.Name() conf.Close() - if err := ioutil.WriteFile(fName, content, 0666); err != nil { + if err := os.WriteFile(fName, content, 0666); err != nil { os.Remove(fName) t.Fatalf("Error writing conf file: %v", err) } diff -Nru golang-github-nats-io-go-nats-1.20.0/kv.go golang-github-nats-io-go-nats-1.22.1/kv.go --- golang-github-nats-io-go-nats-1.20.0/kv.go 2022-11-11 15:16:40.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/kv.go 2022-12-22 14:09:21.000000000 +0000 @@ -25,9 +25,7 @@ "time" ) -// Notice: Experimental Preview -// -// This functionality is EXPERIMENTAL and may be changed in later releases. +// KeyValueManager is used to manage KeyValue stores. type KeyValueManager interface { // KeyValue will lookup and bind to an existing KeyValue store. KeyValue(bucket string) (KeyValue, error) @@ -41,9 +39,7 @@ KeyValueStores() <-chan KeyValueStatus } -// Notice: Experimental Preview -// -// This functionality is EXPERIMENTAL and may be changed in later releases. +// KeyValue contains methods to operate on a KeyValue store. type KeyValue interface { // Get returns the latest value for the key. Get(key string) (entry KeyValueEntry, err error) @@ -299,6 +295,10 @@ ErrNoKeysFound = errors.New("nats: no keys found") ) +var ( + ErrKeyExists JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamWrongLastSequence, Code: 400}, message: "key exists"} +) + const ( kvBucketNamePre = "KV_" kvBucketNameTmpl = "KV_%s" @@ -629,6 +629,13 @@ return kv.Update(key, value, e.Revision()) } + // Check if the expected last subject sequence is not zero which implies + // the key already exists. + if errors.Is(err, ErrKeyExists) { + jserr := ErrKeyExists.(*jsError) + return 0, fmt.Errorf("%w: %s", err, jserr.message) + } + return 0, err } diff -Nru golang-github-nats-io-go-nats-1.20.0/kv_test.go golang-github-nats-io-go-nats-1.22.1/kv_test.go --- golang-github-nats-io-go-nats-1.20.0/kv_test.go 2022-11-11 15:16:40.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/kv_test.go 2022-12-22 14:09:21.000000000 +0000 @@ -14,6 +14,7 @@ package nats import ( + "errors" "fmt" "testing" "time" @@ -203,3 +204,50 @@ } } } + +func TestKeyValueCreate(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + kv, err := js.CreateKeyValue(&KeyValueConfig{Bucket: "TEST"}) + if err != nil { + t.Fatalf("Error creating kv: %v", err) + } + + _, err = kv.Create("key", []byte("1")) + if err != nil { + t.Fatalf("Error creating key: %v", err) + } + + _, err = kv.Create("key", []byte("1")) + expected := "nats: wrong last sequence: 1: key exists" + if err.Error() != expected { + t.Fatalf("Expected %q, got: %v", expected, err) + } + if !errors.Is(err, ErrKeyExists) { + t.Fatalf("Expected ErrKeyExists, got: %v", err) + } + aerr := &APIError{} + if !errors.As(err, &aerr) { + t.Fatalf("Expected APIError, got: %v", err) + } + if aerr.Description != "wrong last sequence: 1" { + t.Fatalf("Unexpected APIError message, got: %v", aerr.Description) + } + if aerr.ErrorCode != 10071 { + t.Fatalf("Unexpected error code, got: %v", aerr.ErrorCode) + } + if aerr.Code != ErrKeyExists.APIError().Code { + t.Fatalf("Unexpected error code, got: %v", aerr.Code) + } + var kerr JetStreamError + if !errors.As(err, &kerr) { + t.Fatalf("Expected KeyValueError, got: %v", err) + } + if kerr.APIError().ErrorCode != 10071 { + t.Fatalf("Unexpected error code, got: %v", kerr.APIError().ErrorCode) + } +} diff -Nru golang-github-nats-io-go-nats-1.20.0/micro/example_package_test.go golang-github-nats-io-go-nats-1.22.1/micro/example_package_test.go --- golang-github-nats-io-go-nats-1.20.0/micro/example_package_test.go 1970-01-01 00:00:00.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/micro/example_package_test.go 2022-12-22 14:09:21.000000000 +0000 @@ -0,0 +1,83 @@ +// Copyright 2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package micro + +import ( + "fmt" + "log" + "strconv" + "time" + + "github.com/nats-io/nats.go" +) + +func Example() { + s := RunServerOnPort(-1) + defer s.Shutdown() + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + log.Fatal(err) + } + defer nc.Close() + + // Service handler is a function which takes Service.Request as argument. + // req.Respond or req.Error should be used to respond to the request. + incrementHandler := func(req *Request) { + val, err := strconv.Atoi(string(req.Data())) + if err != nil { + req.Error("400", "request data should be a number", nil) + return + } + + responseData := val + 1 + req.Respond([]byte(strconv.Itoa(responseData))) + } + + config := Config{ + Name: "IncrementService", + Version: "0.1.0", + Description: "Increment numbers", + Endpoint: Endpoint{ + // service handler + Handler: incrementHandler, + // a unique subject serving as a service endpoint + Subject: "numbers.increment", + }, + } + // Multiple instances of the servcice with the same name can be created. + // Requests to a service with the same name will be load-balanced. + for i := 0; i < 5; i++ { + svc, err := AddService(nc, config) + if err != nil { + log.Fatal(err) + } + defer svc.Stop() + } + + // send a request to a service + resp, err := nc.Request("numbers.increment", []byte("3"), 1*time.Second) + if err != nil { + log.Fatal(err) + } + responseVal, err := strconv.Atoi(string(resp.Data)) + if err != nil { + log.Fatal(err) + } + fmt.Println(responseVal) + + // + // Output: 4 + // +} diff -Nru golang-github-nats-io-go-nats-1.20.0/micro/example_test.go golang-github-nats-io-go-nats-1.22.1/micro/example_test.go --- golang-github-nats-io-go-nats-1.20.0/micro/example_test.go 1970-01-01 00:00:00.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/micro/example_test.go 2022-12-22 14:09:21.000000000 +0000 @@ -0,0 +1,265 @@ +// Copyright 2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package micro + +import ( + "fmt" + "log" + "reflect" + + "github.com/nats-io/nats.go" +) + +func ExampleAddService() { + nc, err := nats.Connect("127.0.0.1:4222") + if err != nil { + log.Fatal(err) + } + defer nc.Close() + + echoHandler := func(req *Request) { + req.Respond(req.Data()) + } + + config := Config{ + Name: "EchoService", + Version: "v1.0.0", + Description: "Send back what you receive", + Endpoint: Endpoint{ + Subject: "echo", + Handler: echoHandler, + }, + + // DoneHandler can be set to customize behavior on stopping a service. + DoneHandler: func(srv Service) { + info := srv.Info() + fmt.Printf("stopped service %q with ID %q\n", info.Name, info.ID) + }, + + // ErrorHandler can be used to customize behavior on service execution error. + ErrorHandler: func(srv Service, err *NATSError) { + info := srv.Info() + fmt.Printf("Service %q returned an error on subject %q: %s", info.Name, err.Subject, err.Description) + }, + } + + srv, err := AddService(nc, config) + if err != nil { + log.Fatal(err) + } + defer srv.Stop() +} + +func ExampleService_Info() { + nc, err := nats.Connect("127.0.0.1:4222") + if err != nil { + log.Fatal(err) + } + defer nc.Close() + + config := Config{ + Name: "EchoService", + Endpoint: Endpoint{ + Subject: "echo", + Handler: func(*Request) {}, + }, + } + + srv, _ := AddService(nc, config) + + // service info + info := srv.Info() + + fmt.Println(info.ID) + fmt.Println(info.Name) + fmt.Println(info.Description) + fmt.Println(info.Version) + fmt.Println(info.Subject) +} + +func ExampleService_Stats() { + nc, err := nats.Connect("127.0.0.1:4222") + if err != nil { + log.Fatal(err) + } + defer nc.Close() + + config := Config{ + Name: "EchoService", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "echo", + Handler: func(*Request) {}, + }, + } + + srv, _ := AddService(nc, config) + + // stats of a service instance + stats := srv.Stats() + + fmt.Println(stats.AverageProcessingTime) + fmt.Println(stats.ProcessingTime) + +} + +func ExampleService_Stop() { + nc, err := nats.Connect("127.0.0.1:4222") + if err != nil { + log.Fatal(err) + } + defer nc.Close() + + config := Config{ + Name: "EchoService", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "echo", + Handler: func(*Request) {}, + }, + } + + srv, _ := AddService(nc, config) + + // stop a service + err = srv.Stop() + if err != nil { + log.Fatal(err) + } + + // stop is idempotent so multiple executions will not return an error + err = srv.Stop() + if err != nil { + log.Fatal(err) + } +} + +func ExampleService_Stopped() { + nc, err := nats.Connect("127.0.0.1:4222") + if err != nil { + log.Fatal(err) + } + defer nc.Close() + + config := Config{ + Name: "EchoService", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "echo", + Handler: func(*Request) {}, + }, + } + + srv, _ := AddService(nc, config) + + // stop a service + err = srv.Stop() + if err != nil { + log.Fatal(err) + } + + if srv.Stopped() { + fmt.Println("service stopped") + } +} + +func ExampleService_Reset() { + nc, err := nats.Connect("127.0.0.1:4222") + if err != nil { + log.Fatal(err) + } + defer nc.Close() + + config := Config{ + Name: "EchoService", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "echo", + Handler: func(*Request) {}, + }, + } + + srv, _ := AddService(nc, config) + + // reset endpoint stats on this service + srv.Reset() + + empty := Stats{ + ServiceIdentity: srv.Info().ServiceIdentity, + } + if !reflect.DeepEqual(srv.Stats(), empty) { + log.Fatal("Expected endpoint stats to be empty") + } +} + +func ExampleControlSubject() { + + // subject used to get PING from all services + subjectPINGAll, _ := ControlSubject(PingVerb, "", "") + fmt.Println(subjectPINGAll) + + // subject used to get PING from services with provided name + subjectPINGName, _ := ControlSubject(PingVerb, "CoolService", "") + fmt.Println(subjectPINGName) + + // subject used to get PING from a service with provided name and ID + subjectPINGInstance, _ := ControlSubject(PingVerb, "CoolService", "123") + fmt.Println(subjectPINGInstance) + + // Output: + // $SRV.PING + // $SRV.PING.CoolService + // $SRV.PING.CoolService.123 +} + +func ExampleRequest_Respond() { + handler := func(req *Request) { + // respond to the request + if err := req.Respond(req.Data()); err != nil { + log.Fatal(err) + } + } + + fmt.Printf("%T", handler) +} + +func ExampleRequest_RespondJSON() { + type Point struct { + X int `json:"x"` + Y int `json:"y"` + } + + handler := func(req *Request) { + resp := Point{5, 10} + // respond to the request + // response will be serialized to {"x":5,"y":10} + if err := req.RespondJSON(resp); err != nil { + log.Fatal(err) + } + } + + fmt.Printf("%T", handler) +} + +func ExampleRequest_Error() { + handler := func(req *Request) { + // respond with an error + // Error sets Nats-Service-Error and Nats-Service-Error-Code headers in the response + if err := req.Error("400", "bad request", []byte(`{"error": "value should be a number"}`)); err != nil { + log.Fatal(err) + } + } + + fmt.Printf("%T", handler) +} diff -Nru golang-github-nats-io-go-nats-1.20.0/micro/request.go golang-github-nats-io-go-nats-1.22.1/micro/request.go --- golang-github-nats-io-go-nats-1.20.0/micro/request.go 1970-01-01 00:00:00.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/micro/request.go 2022-12-22 14:09:21.000000000 +0000 @@ -0,0 +1,132 @@ +// Copyright 2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package micro + +import ( + "encoding/json" + "errors" + "fmt" + + "github.com/nats-io/nats.go" +) + +type ( + // Request represents service request available in the service handler. + // It exposes methods to respond to the request, as well as + // getting the request data and headers. + Request struct { + msg *nats.Msg + respondError error + } + + // RequestHandler is a function used as a Handler for a service. + RequestHandler func(*Request) + + // Headers is a wrapper around [*nats.Header] + Headers nats.Header +) + +var ( + ErrRespond = errors.New("NATS error when sending response") + ErrMarshalResponse = errors.New("marshaling response") + ErrArgRequired = errors.New("argument required") +) + +// RespondOpt is a +type RespondOpt func(*nats.Msg) + +func (r *Request) Respond(response []byte, opts ...RespondOpt) error { + respMsg := &nats.Msg{ + Data: response, + } + for _, opt := range opts { + opt(respMsg) + } + + if err := r.msg.RespondMsg(respMsg); err != nil { + r.respondError = fmt.Errorf("%w: %s", ErrRespond, err) + return r.respondError + } + + return nil +} + +func (r *Request) RespondJSON(response interface{}, opts ...RespondOpt) error { + resp, err := json.Marshal(response) + if err != nil { + return ErrMarshalResponse + } + return r.Respond(resp, opts...) +} + +// Error prepares and publishes error response from a handler. +// A response error should be set containing an error code and description. +// Optionally, data can be set as response payload. +func (r *Request) Error(code, description string, data []byte, opts ...RespondOpt) error { + if code == "" { + return fmt.Errorf("%w: error code", ErrArgRequired) + } + if description == "" { + return fmt.Errorf("%w: description", ErrArgRequired) + } + response := &nats.Msg{ + Header: nats.Header{ + ErrorHeader: []string{description}, + ErrorCodeHeader: []string{code}, + }, + } + for _, opt := range opts { + opt(response) + } + + response.Data = data + if err := r.msg.RespondMsg(response); err != nil { + r.respondError = err + return err + } + return nil +} + +func WithHeaders(headers Headers) RespondOpt { + return func(m *nats.Msg) { + if m.Header == nil { + m.Header = nats.Header(headers) + return + } + + for k, v := range headers { + m.Header[k] = v + } + } +} + +func (r *Request) Data() []byte { + return r.msg.Data +} + +func (r *Request) Headers() Headers { + return Headers(r.msg.Header) +} + +// Get gets the first value associated with the given key. +// It is case-sensitive. +func (h Headers) Get(key string) string { + return nats.Header(h).Get(key) +} + +// Values returns all values associated with the given key. +// It is case-sensitive. +func (h Headers) Values(key string) []string { + return nats.Header(h).Values(key) +} diff -Nru golang-github-nats-io-go-nats-1.20.0/micro/service.go golang-github-nats-io-go-nats-1.22.1/micro/service.go --- golang-github-nats-io-go-nats-1.20.0/micro/service.go 1970-01-01 00:00:00.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/micro/service.go 2022-12-22 14:09:21.000000000 +0000 @@ -0,0 +1,596 @@ +// Copyright 2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package micro + +import ( + "encoding/json" + "errors" + "fmt" + "regexp" + "sync" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nuid" +) + +// Notice: Experimental Preview +// +// This functionality is EXPERIMENTAL and may be changed in later releases. + +type ( + Service interface { + // Info returns the service info. + Info() Info + + // Stats returns statisctics for the service endpoint and all monitoring endpoints. + Stats() Stats + + // Reset resets all statistics on a service instance. + Reset() + + // Stop drains the endpoint subscriptions and marks the service as stopped. + Stop() error + + // Stopped informs whether [Stop] was executed on the service. + Stopped() bool + } + + // ErrHandler is a function used to configure a custom error handler for a service, + ErrHandler func(Service, *NATSError) + + // DoneHandler is a function used to configure a custom done handler for a service. + DoneHandler func(Service) + + // StatsHandleris a function used to configure a custom STATS endpoint. + // It should return a value which can be serialized to JSON. + StatsHandler func(Endpoint) interface{} + + // ServiceIdentity contains fields helping to identidy a service instance. + ServiceIdentity struct { + Name string `json:"name"` + ID string `json:"id"` + Version string `json:"version"` + } + + // Stats is the type returned by STATS monitoring endpoint. + // It contains stats for a specific endpoint (either request handler or monitoring enpoints). + Stats struct { + ServiceIdentity + NumRequests int `json:"num_requests"` + NumErrors int `json:"num_errors"` + LastError string `json:"last_error"` + ProcessingTime time.Duration `json:"processing_time"` + AverageProcessingTime time.Duration `json:"average_processing_time"` + Started string `json:"started"` + Data json.RawMessage `json:"data,omitempty"` + } + + // Ping is the response type for PING monitoring endpoint. + Ping ServiceIdentity + + // Info is the basic information about a service type. + Info struct { + ServiceIdentity + Description string `json:"description"` + Subject string `json:"subject"` + } + + // SchemaResp is the response value for SCHEMA requests. + SchemaResp struct { + ServiceIdentity + Schema Schema `json:"schema"` + } + + // Schema can be used to configure a schema for a service. + // It is olso returned by the SCHEMA monitoring service (if set). + Schema struct { + Request string `json:"request"` + Response string `json:"response"` + } + + // Endpoint is used to configure a subject and handler for a service. + Endpoint struct { + Subject string `json:"subject"` + Handler RequestHandler + } + + // Verb represents a name of the monitoring service. + Verb int64 + + // Config is a configuration of a service. + Config struct { + Name string `json:"name"` + Version string `json:"version"` + Description string `json:"description"` + Schema Schema `json:"schema"` + Endpoint Endpoint `json:"endpoint"` + StatsHandler StatsHandler + DoneHandler DoneHandler + ErrorHandler ErrHandler + } + + // NATSError represents an error returned by a NATS Subscription. + // It contains a subject on which the subscription failed, so that + // it can be linked with a specific service endpoint. + NATSError struct { + Subject string + Description string + } + + // service represents a configured NATS service. + // It should be created using [Add] in order to configure the appropriate NATS subscriptions + // for request handler and monitoring. + service struct { + // Config contains a configuration of the service + Config + + m sync.Mutex + id string + reqSub *nats.Subscription + verbSubs map[string]*nats.Subscription + stats *Stats + conn *nats.Conn + natsHandlers handlers + stopped bool + + asyncDispatcher asyncCallbacksHandler + } + + handlers struct { + closed nats.ConnHandler + asyncErr nats.ErrHandler + } + + asyncCallbacksHandler struct { + cbQueue chan func() + } +) + +const ( + // Queue Group name used across all services + QG = "q" + + // APIPrefix is the root of all control subjects + APIPrefix = "$SRV" +) + +// Service Error headers +const ( + ErrorHeader = "Nats-Service-Error" + ErrorCodeHeader = "Nats-Service-Error-Code" +) + +// Verbs being used to set up a specific control subject. +const ( + PingVerb Verb = iota + StatsVerb + InfoVerb + SchemaVerb +) + +var ( + // this regular expression is suggested regexp for semver validation: https://semver.org/ + semVerRegexp = regexp.MustCompile(`^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(?:-((?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$`) + serviceNameRegexp = regexp.MustCompile(`^[A-Za-z0-9\-_]+$`) +) + +// Common errors returned by the Service framework. +var ( + // ErrConfigValidation is returned when service configuration is invalid + ErrConfigValidation = errors.New("validation") + + // ErrVerbNotSupported is returned when invalid [Verb] is used (PING, SCHEMA, INFO, STATS) + ErrVerbNotSupported = errors.New("unsupported verb") + + // ErrServiceNameRequired is returned when attempting to generate control subject with ID but empty name + ErrServiceNameRequired = errors.New("service name is required to generate ID control subject") +) + +func (s Verb) String() string { + switch s { + case PingVerb: + return "PING" + case StatsVerb: + return "STATS" + case InfoVerb: + return "INFO" + case SchemaVerb: + return "SCHEMA" + default: + return "" + } +} + +// AddService adds a microservice. +// It will enable internal common services (PING, STATS, INFO and SCHEMA) as well as +// the actual service handler on the subject provided in config.Endpoint +// A service name, version and Endpoint configuration are required to add a service. +// AddService returns a [Service] interface, allowing service menagement. +// Each service is assigned a unique ID. +func AddService(nc *nats.Conn, config Config) (Service, error) { + if err := config.valid(); err != nil { + return nil, err + } + + id := nuid.Next() + svc := &service{ + Config: config, + conn: nc, + id: id, + asyncDispatcher: asyncCallbacksHandler{ + cbQueue: make(chan func(), 100), + }, + } + svcIdentity := ServiceIdentity{ + Name: config.Name, + ID: id, + Version: config.Version, + } + svc.verbSubs = make(map[string]*nats.Subscription) + svc.stats = &Stats{ + ServiceIdentity: svcIdentity, + } + + svc.setupAsyncCallbacks() + + go svc.asyncDispatcher.asyncCBDispatcher() + + // Setup internal subscriptions. + var err error + + svc.reqSub, err = nc.QueueSubscribe(config.Endpoint.Subject, QG, func(m *nats.Msg) { + svc.reqHandler(&Request{msg: m}) + }) + if err != nil { + svc.asyncDispatcher.close() + return nil, err + } + + ping := Ping(svcIdentity) + + infoHandler := func(req *Request) { + response, _ := json.Marshal(svc.Info()) + if err := req.Respond(response); err != nil { + if err := req.Error("500", fmt.Sprintf("Error handling INFO request: %s", err), nil); err != nil && config.ErrorHandler != nil { + svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.msg.Subject, err.Error()}) }) + } + } + } + + pingHandler := func(req *Request) { + response, _ := json.Marshal(ping) + if err := req.Respond(response); err != nil { + if err := req.Error("500", fmt.Sprintf("Error handling PING request: %s", err), nil); err != nil && config.ErrorHandler != nil { + svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.msg.Subject, err.Error()}) }) + } + } + } + + statsHandler := func(req *Request) { + response, _ := json.Marshal(svc.Stats()) + if err := req.Respond(response); err != nil { + if err := req.Error("500", fmt.Sprintf("Error handling STATS request: %s", err), nil); err != nil && config.ErrorHandler != nil { + svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.msg.Subject, err.Error()}) }) + } + } + } + + schema := SchemaResp{ + ServiceIdentity: svcIdentity, + Schema: config.Schema, + } + schemaHandler := func(req *Request) { + response, _ := json.Marshal(schema) + if err := req.Respond(response); err != nil { + if err := req.Error("500", fmt.Sprintf("Error handling SCHEMA request: %s", err), nil); err != nil && config.ErrorHandler != nil { + svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.msg.Subject, err.Error()}) }) + } + } + } + + if err := svc.verbHandlers(nc, InfoVerb, infoHandler); err != nil { + svc.asyncDispatcher.close() + return nil, err + } + if err := svc.verbHandlers(nc, PingVerb, pingHandler); err != nil { + svc.asyncDispatcher.close() + return nil, err + } + if err := svc.verbHandlers(nc, StatsVerb, statsHandler); err != nil { + svc.asyncDispatcher.close() + return nil, err + } + + if err := svc.verbHandlers(nc, SchemaVerb, schemaHandler); err != nil { + svc.asyncDispatcher.close() + return nil, err + } + svc.stats.Started = time.Now().Format(time.RFC3339) + + return svc, nil +} + +// dispatch is responsible for calling any async callbacks +func (ac *asyncCallbacksHandler) asyncCBDispatcher() { + for { + f := <-ac.cbQueue + if f == nil { + return + } + f() + } +} + +// dispatch is responsible for calling any async callbacks +func (ac *asyncCallbacksHandler) push(f func()) { + ac.cbQueue <- f +} + +func (ac *asyncCallbacksHandler) close() { + close(ac.cbQueue) +} + +func (s *Config) valid() error { + if !serviceNameRegexp.MatchString(s.Name) { + return fmt.Errorf("%w: service name: name should not be empty and should consist of alphanumerical charactest, dashes and underscores", ErrConfigValidation) + } + if !semVerRegexp.MatchString(s.Version) { + return fmt.Errorf("%w: version: version should not be empty should match the SemVer format", ErrConfigValidation) + } + return s.Endpoint.valid() +} + +func (e *Endpoint) valid() error { + if e.Subject == "" { + return fmt.Errorf("%w: endpoint: subject is required", ErrConfigValidation) + } + if e.Handler == nil { + return fmt.Errorf("%w: endpoint: handler is required", ErrConfigValidation) + } + return nil +} + +func (svc *service) setupAsyncCallbacks() { + svc.m.Lock() + defer svc.m.Unlock() + svc.natsHandlers.closed = svc.conn.ClosedHandler() + if svc.natsHandlers.closed != nil { + svc.conn.SetClosedHandler(func(c *nats.Conn) { + svc.Stop() + svc.natsHandlers.closed(c) + }) + } else { + svc.conn.SetClosedHandler(func(c *nats.Conn) { + svc.Stop() + }) + } + + svc.natsHandlers.asyncErr = svc.conn.ErrorHandler() + if svc.natsHandlers.asyncErr != nil { + svc.conn.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) { + if !svc.matchSubscriptionSubject(s.Subject) { + svc.natsHandlers.asyncErr(c, s, err) + } + if svc.Config.ErrorHandler != nil { + svc.Config.ErrorHandler(svc, &NATSError{ + Subject: s.Subject, + Description: err.Error(), + }) + } + svc.m.Lock() + svc.stats.NumErrors++ + svc.stats.LastError = err.Error() + svc.m.Unlock() + svc.Stop() + svc.natsHandlers.asyncErr(c, s, err) + }) + } else { + svc.conn.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) { + if !svc.matchSubscriptionSubject(s.Subject) { + return + } + if svc.Config.ErrorHandler != nil { + svc.Config.ErrorHandler(svc, &NATSError{ + Subject: s.Subject, + Description: err.Error(), + }) + } + svc.m.Lock() + svc.stats.NumErrors++ + svc.stats.LastError = err.Error() + svc.m.Unlock() + svc.Stop() + }) + } +} + +func (svc *service) matchSubscriptionSubject(subj string) bool { + if svc.reqSub.Subject == subj { + return true + } + for _, verbSub := range svc.verbSubs { + if verbSub.Subject == subj { + return true + } + } + return false +} + +// verbHandlers generates control handlers for a specific verb. +// Each request generates 3 subscriptions, one for the general verb +// affecting all services written with the framework, one that handles +// all services of a particular kind, and finally a specific service instance. +func (svc *service) verbHandlers(nc *nats.Conn, verb Verb, handler RequestHandler) error { + name := fmt.Sprintf("%s-all", verb.String()) + if err := svc.addInternalHandler(nc, verb, "", "", name, handler); err != nil { + return err + } + name = fmt.Sprintf("%s-kind", verb.String()) + if err := svc.addInternalHandler(nc, verb, svc.Config.Name, "", name, handler); err != nil { + return err + } + return svc.addInternalHandler(nc, verb, svc.Config.Name, svc.id, verb.String(), handler) +} + +// addInternalHandler registers a control subject handler. +func (s *service) addInternalHandler(nc *nats.Conn, verb Verb, kind, id, name string, handler RequestHandler) error { + subj, err := ControlSubject(verb, kind, id) + if err != nil { + s.Stop() + return err + } + + s.verbSubs[name], err = nc.Subscribe(subj, func(msg *nats.Msg) { + handler(&Request{msg: msg}) + }) + if err != nil { + s.Stop() + return err + } + return nil +} + +// reqHandller invokes the service request handler and modifies service stats +func (s *service) reqHandler(req *Request) { + start := time.Now() + s.Endpoint.Handler(req) + s.m.Lock() + s.stats.NumRequests++ + s.stats.ProcessingTime += time.Since(start) + avgProcessingTime := s.stats.ProcessingTime.Nanoseconds() / int64(s.stats.NumRequests) + s.stats.AverageProcessingTime = time.Duration(avgProcessingTime) + + if req.respondError != nil { + s.stats.NumErrors++ + s.stats.LastError = req.respondError.Error() + } + s.m.Unlock() +} + +// Stop drains the endpoint subscriptions and marks the service as stopped. +func (s *service) Stop() error { + s.m.Lock() + if s.stopped { + return nil + } + defer s.m.Unlock() + if s.reqSub != nil { + if err := s.reqSub.Drain(); err != nil { + return fmt.Errorf("draining subscription for request handler: %w", err) + } + s.reqSub = nil + } + var keys []string + for key, sub := range s.verbSubs { + keys = append(keys, key) + if err := sub.Drain(); err != nil { + return fmt.Errorf("draining subscription for subject %q: %w", sub.Subject, err) + } + } + for _, key := range keys { + delete(s.verbSubs, key) + } + restoreAsyncHandlers(s.conn, s.natsHandlers) + s.stopped = true + if s.DoneHandler != nil { + s.asyncDispatcher.push(func() { s.DoneHandler(s) }) + s.asyncDispatcher.close() + } + return nil +} + +func restoreAsyncHandlers(nc *nats.Conn, handlers handlers) { + nc.SetClosedHandler(handlers.closed) + nc.SetErrorHandler(handlers.asyncErr) +} + +// ID returns the service instance's unique ID. +func (s *service) Info() Info { + return Info{ + ServiceIdentity: ServiceIdentity{ + Name: s.Config.Name, + ID: s.id, + Version: s.Config.Version, + }, + Description: s.Config.Description, + Subject: s.Config.Endpoint.Subject, + } +} + +// Stats returns statisctics for the service endpoint and all monitoring endpoints. +func (s *service) Stats() Stats { + s.m.Lock() + defer s.m.Unlock() + if s.StatsHandler != nil { + s.stats.Data, _ = json.Marshal(s.StatsHandler(s.Endpoint)) + } + info := s.Info() + return Stats{ + ServiceIdentity: ServiceIdentity{ + Name: info.Name, + ID: info.ID, + Version: info.Version, + }, + NumRequests: s.stats.NumRequests, + NumErrors: s.stats.NumErrors, + ProcessingTime: s.stats.ProcessingTime, + AverageProcessingTime: s.stats.AverageProcessingTime, + Started: s.stats.Started, + Data: s.stats.Data, + } +} + +// Reset resets all statistics on a service instance. +func (s *service) Reset() { + s.m.Lock() + s.stats = &Stats{ + ServiceIdentity: s.Info().ServiceIdentity, + } + s.m.Unlock() +} + +// Stopped informs whether [Stop] was executed on the service. +func (s *service) Stopped() bool { + s.m.Lock() + defer s.m.Unlock() + return s.stopped +} + +// ControlSubject returns monitoring subjects used by the Service. +// Providing a verb is mandatory (it should be one of Ping, Schema, Info or Stats). +// Depending on whether kind and id are provided, ControlSubject will return one of the following: +// - verb only: subject used to monitor all available services +// - verb and kind: subject used to monitor services with the provided name +// - verb, name and id: subject used to monitor an instance of a service with the provided ID +func ControlSubject(verb Verb, name, id string) (string, error) { + verbStr := verb.String() + if verbStr == "" { + return "", fmt.Errorf("%w: %q", ErrVerbNotSupported, verbStr) + } + if name == "" && id != "" { + return "", ErrServiceNameRequired + } + if name == "" && id == "" { + return fmt.Sprintf("%s.%s", APIPrefix, verbStr), nil + } + if id == "" { + return fmt.Sprintf("%s.%s.%s", APIPrefix, verbStr, name), nil + } + return fmt.Sprintf("%s.%s.%s.%s", APIPrefix, verbStr, name, id), nil +} + +func (e *NATSError) Error() string { + return fmt.Sprintf("%q: %s", e.Subject, e.Description) +} diff -Nru golang-github-nats-io-go-nats-1.20.0/micro/service_test.go golang-github-nats-io-go-nats-1.22.1/micro/service_test.go --- golang-github-nats-io-go-nats-1.20.0/micro/service_test.go 1970-01-01 00:00:00.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/micro/service_test.go 2022-12-22 14:09:21.000000000 +0000 @@ -0,0 +1,1137 @@ +// Copyright 2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package micro + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "math/rand" + "reflect" + "testing" + "time" + + "github.com/nats-io/nats-server/v2/server" + natsserver "github.com/nats-io/nats-server/v2/test" + "github.com/nats-io/nats.go" +) + +func TestServiceBasics(t *testing.T) { + s := RunServerOnPort(-1) + defer s.Shutdown() + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Expected to connect to server, got %v", err) + } + defer nc.Close() + + // Stub service. + doAdd := func(req *Request) { + if rand.Intn(10) == 0 { + if err := req.Error("500", "Unexpected error!", nil); err != nil { + t.Fatalf("Unexpected error when sending error response: %v", err) + } + return + } + // Happy Path. + // Random delay between 5-10ms + time.Sleep(5*time.Millisecond + time.Duration(rand.Intn(5))*time.Millisecond) + if err := req.Respond([]byte("42")); err != nil { + if err := req.Error("500", "Unexpected error!", nil); err != nil { + t.Fatalf("Unexpected error when sending error response: %v", err) + } + return + } + } + + var svcs []Service + + // Create 5 service responders. + config := Config{ + Name: "CoolAddService", + Version: "0.1.0", + Description: "Add things together", + Endpoint: Endpoint{ + Subject: "svc.add", + Handler: doAdd, + }, + Schema: Schema{Request: "", Response: ""}, + } + + for i := 0; i < 5; i++ { + svc, err := AddService(nc, config) + if err != nil { + t.Fatalf("Expected to create Service, got %v", err) + } + defer svc.Stop() + svcs = append(svcs, svc) + } + + // Now send 50 requests. + for i := 0; i < 50; i++ { + _, err := nc.Request("svc.add", []byte(`{ "x": 22, "y": 11 }`), time.Second) + if err != nil { + t.Fatalf("Expected a response, got %v", err) + } + } + + for _, svc := range svcs { + info := svc.Info() + if info.Name != "CoolAddService" { + t.Fatalf("Expected %q, got %q", "CoolAddService", info.Name) + } + if len(info.Description) == 0 || len(info.Version) == 0 { + t.Fatalf("Expected non empty description and version") + } + } + + // Make sure we can request info, 1 response. + // This could be exported as well as main ServiceImpl. + subj, err := ControlSubject(InfoVerb, "CoolAddService", "") + if err != nil { + t.Fatalf("Failed to building info subject %v", err) + } + info, err := nc.Request(subj, nil, time.Second) + if err != nil { + t.Fatalf("Expected a response, got %v", err) + } + var inf Info + if err := json.Unmarshal(info.Data, &inf); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if inf.Subject != "svc.add" { + t.Fatalf("expected service subject to be srv.add: %s", inf.Subject) + } + + // Ping all services. Multiple responses. + inbox := nats.NewInbox() + sub, err := nc.SubscribeSync(inbox) + if err != nil { + t.Fatalf("subscribe failed: %s", err) + } + pingSubject, err := ControlSubject(PingVerb, "CoolAddService", "") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if err := nc.PublishRequest(pingSubject, inbox, nil); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + var pingCount int + for { + _, err := sub.NextMsg(250 * time.Millisecond) + if err != nil { + break + } + pingCount++ + } + if pingCount != 5 { + t.Fatalf("Expected 5 ping responses, got: %d", pingCount) + } + + // Get stats from all services + statsInbox := nats.NewInbox() + sub, err = nc.SubscribeSync(statsInbox) + if err != nil { + t.Fatalf("subscribe failed: %s", err) + } + statsSubject, err := ControlSubject(StatsVerb, "CoolAddService", "") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if err := nc.PublishRequest(statsSubject, statsInbox, nil); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + stats := make([]Stats, 0) + var requestsNum int + for { + resp, err := sub.NextMsg(250 * time.Millisecond) + if err != nil { + break + } + var srvStats Stats + if err := json.Unmarshal(resp.Data, &srvStats); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + requestsNum += srvStats.NumRequests + stats = append(stats, srvStats) + } + if len(stats) != 5 { + t.Fatalf("Expected stats for 5 services, got: %d", len(stats)) + } + + // Services should process 50 requests total + if requestsNum != 50 { + t.Fatalf("Expected a total fo 50 requests processed, got: %d", requestsNum) + } + // Reset stats for a service + svcs[0].Reset() + emptyStats := Stats{ + ServiceIdentity: svcs[0].Info().ServiceIdentity, + } + + if !reflect.DeepEqual(svcs[0].Stats(), emptyStats) { + t.Fatalf("Expected empty stats after reset; got: %+v", svcs[0].Stats()) + } + +} + +func TestAddService(t *testing.T) { + testHandler := func(*Request) {} + errNats := make(chan struct{}) + errService := make(chan struct{}) + closedNats := make(chan struct{}) + doneService := make(chan struct{}) + + tests := []struct { + name string + givenConfig Config + natsClosedHandler nats.ConnHandler + natsErrorHandler nats.ErrHandler + asyncErrorSubject string + expectedPing Ping + withError error + }{ + { + name: "minimal config", + givenConfig: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: testHandler, + }, + }, + expectedPing: Ping{ + Name: "test_service", + Version: "0.1.0", + }, + }, + { + name: "with done handler, no handlers on nats connection", + givenConfig: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: testHandler, + }, + DoneHandler: func(Service) { + doneService <- struct{}{} + }, + }, + expectedPing: Ping{ + Name: "test_service", + Version: "0.1.0", + }, + }, + { + name: "with error handler, no handlers on nats connection", + givenConfig: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: testHandler, + }, + ErrorHandler: func(Service, *NATSError) { + errService <- struct{}{} + }, + }, + expectedPing: Ping{ + Name: "test_service", + Version: "0.1.0", + }, + asyncErrorSubject: "test.sub", + }, + { + name: "with error handler, no handlers on nats connection, error on monitoring subject", + givenConfig: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: testHandler, + }, + ErrorHandler: func(Service, *NATSError) { + errService <- struct{}{} + }, + }, + expectedPing: Ping{ + Name: "test_service", + Version: "0.1.0", + }, + asyncErrorSubject: "$SVC.PING.TEST_SERVICE", + }, + { + name: "with done handler, append to nats handlers", + givenConfig: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: testHandler, + }, + DoneHandler: func(Service) { + doneService <- struct{}{} + }, + }, + natsClosedHandler: func(c *nats.Conn) { + closedNats <- struct{}{} + }, + natsErrorHandler: func(*nats.Conn, *nats.Subscription, error) { + errNats <- struct{}{} + }, + expectedPing: Ping{ + Name: "test_service", + Version: "0.1.0", + }, + asyncErrorSubject: "test.sub", + }, + { + name: "with error handler, append to nats handlers", + givenConfig: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: testHandler, + }, + DoneHandler: func(Service) { + doneService <- struct{}{} + }, + }, + natsClosedHandler: func(c *nats.Conn) { + closedNats <- struct{}{} + }, + natsErrorHandler: func(*nats.Conn, *nats.Subscription, error) { + errNats <- struct{}{} + }, + expectedPing: Ping{ + Name: "test_service", + Version: "0.1.0", + }, + }, + { + name: "with error handler, append to nats handlers, error on monitoring subject", + givenConfig: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: testHandler, + }, + DoneHandler: func(Service) { + doneService <- struct{}{} + }, + }, + natsClosedHandler: func(c *nats.Conn) { + closedNats <- struct{}{} + }, + natsErrorHandler: func(*nats.Conn, *nats.Subscription, error) { + errNats <- struct{}{} + }, + expectedPing: Ping{ + Name: "test_service", + Version: "0.1.0", + }, + asyncErrorSubject: "$SVC.PING.TEST_SERVICE", + }, + { + name: "validation error, invalid service name", + givenConfig: Config{ + Name: "test_service!", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: testHandler, + }, + }, + withError: ErrConfigValidation, + }, + { + name: "validation error, invalid version", + givenConfig: Config{ + Name: "test_service!", + Version: "abc", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: testHandler, + }, + }, + withError: ErrConfigValidation, + }, + { + name: "validation error, empty subject", + givenConfig: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "", + Handler: testHandler, + }, + }, + withError: ErrConfigValidation, + }, + { + name: "validation error, no handler", + givenConfig: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test_subject", + Handler: nil, + }, + }, + withError: ErrConfigValidation, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := RunServerOnPort(-1) + defer s.Shutdown() + + nc, err := nats.Connect(s.ClientURL(), + nats.ErrorHandler(test.natsErrorHandler), + nats.ClosedHandler(test.natsClosedHandler), + ) + if err != nil { + t.Fatalf("Expected to connect to server, got %v", err) + } + defer nc.Close() + + srv, err := AddService(nc, test.givenConfig) + if test.withError != nil { + if !errors.Is(err, test.withError) { + t.Fatalf("Expected error: %v; got: %v", test.withError, err) + } + return + } + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + info := srv.Info() + pingSubject, err := ControlSubject(PingVerb, info.Name, info.ID) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + pingResp, err := nc.Request(pingSubject, nil, 1*time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + var ping Ping + if err := json.Unmarshal(pingResp.Data, &ping); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + test.expectedPing.ID = info.ID + if test.expectedPing != ping { + t.Fatalf("Invalid ping response; want: %+v; got: %+v", test.expectedPing, ping) + } + + if test.givenConfig.DoneHandler != nil { + go nc.Opts.ClosedCB(nc) + select { + case <-doneService: + case <-time.After(1 * time.Second): + t.Fatalf("Timeout on DoneHandler") + } + if test.natsClosedHandler != nil { + select { + case <-closedNats: + case <-time.After(1 * time.Second): + t.Fatalf("Timeout on ClosedHandler") + } + } + } + + if test.givenConfig.ErrorHandler != nil { + go nc.Opts.AsyncErrorCB(nc, &nats.Subscription{Subject: test.asyncErrorSubject}, fmt.Errorf("oops")) + select { + case <-errService: + case <-time.After(1 * time.Second): + t.Fatalf("Timeout on ErrorHandler") + } + if test.natsErrorHandler != nil { + select { + case <-errNats: + case <-time.After(1 * time.Second): + t.Fatalf("Timeout on AsyncErrHandler") + } + } + } + + if err := srv.Stop(); err != nil { + t.Fatalf("Unexpected error when stopping the service: %v", err) + } + if test.natsClosedHandler != nil { + go nc.Opts.ClosedCB(nc) + select { + case <-doneService: + t.Fatalf("Expected to restore nats closed handler") + case <-time.After(50 * time.Millisecond): + } + select { + case <-closedNats: + case <-time.After(1 * time.Second): + t.Fatalf("Timeout on ClosedHandler") + } + } + if test.natsErrorHandler != nil { + go nc.Opts.AsyncErrorCB(nc, &nats.Subscription{Subject: test.asyncErrorSubject}, fmt.Errorf("oops")) + select { + case <-errService: + t.Fatalf("Expected to restore nats error handler") + case <-time.After(50 * time.Millisecond): + } + select { + case <-errNats: + case <-time.After(1 * time.Second): + t.Fatalf("Timeout on AsyncErrHandler") + } + } + }) + } +} + +func TestMonitoringHandlers(t *testing.T) { + s := RunServerOnPort(-1) + defer s.Shutdown() + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Expected to connect to server, got %v", err) + } + defer nc.Close() + + asyncErr := make(chan struct{}) + errHandler := func(s Service, n *NATSError) { + asyncErr <- struct{}{} + } + + config := Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: func(*Request) {}, + }, + Schema: Schema{ + Request: "some_schema", + }, + ErrorHandler: errHandler, + } + srv, err := AddService(nc, config) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer func() { + srv.Stop() + if !srv.Stopped() { + t.Fatalf("Expected service to be stopped") + } + }() + + info := srv.Info() + + tests := []struct { + name string + subject string + withError bool + expectedResponse interface{} + }{ + { + name: "PING all", + subject: "$SRV.PING", + expectedResponse: Ping{ + Name: "test_service", + Version: "0.1.0", + ID: info.ID, + }, + }, + { + name: "PING name", + subject: "$SRV.PING.test_service", + expectedResponse: Ping{ + Name: "test_service", + Version: "0.1.0", + ID: info.ID, + }, + }, + { + name: "PING ID", + subject: fmt.Sprintf("$SRV.PING.test_service.%s", info.ID), + expectedResponse: Ping{ + Name: "test_service", + Version: "0.1.0", + ID: info.ID, + }, + }, + { + name: "INFO all", + subject: "$SRV.INFO", + expectedResponse: Info{ + ServiceIdentity: ServiceIdentity{ + Name: "test_service", + Version: "0.1.0", + ID: info.ID, + }, + Subject: "test.sub", + }, + }, + { + name: "INFO name", + subject: "$SRV.INFO.test_service", + expectedResponse: Info{ + ServiceIdentity: ServiceIdentity{ + Name: "test_service", + Version: "0.1.0", + ID: info.ID, + }, + Subject: "test.sub", + }, + }, + { + name: "INFO ID", + subject: fmt.Sprintf("$SRV.INFO.test_service.%s", info.ID), + expectedResponse: Info{ + ServiceIdentity: ServiceIdentity{ + Name: "test_service", + Version: "0.1.0", + ID: info.ID, + }, + Subject: "test.sub", + }, + }, + { + name: "SCHEMA all", + subject: "$SRV.SCHEMA", + expectedResponse: SchemaResp{ + ServiceIdentity: ServiceIdentity{ + Name: "test_service", + Version: "0.1.0", + ID: info.ID, + }, + Schema: Schema{ + Request: "some_schema", + }, + }, + }, + { + name: "SCHEMA name", + subject: "$SRV.SCHEMA.test_service", + expectedResponse: SchemaResp{ + ServiceIdentity: ServiceIdentity{ + Name: "test_service", + Version: "0.1.0", + ID: info.ID, + }, + Schema: Schema{ + Request: "some_schema", + }, + }, + }, + { + name: "SCHEMA ID", + subject: fmt.Sprintf("$SRV.SCHEMA.test_service.%s", info.ID), + expectedResponse: SchemaResp{ + ServiceIdentity: ServiceIdentity{ + Name: "test_service", + Version: "0.1.0", + ID: info.ID, + }, + Schema: Schema{ + Request: "some_schema", + }, + }, + }, + { + name: "PING error", + subject: "$SRV.PING", + withError: true, + }, + { + name: "INFO error", + subject: "$SRV.INFO", + withError: true, + }, + { + name: "STATS error", + subject: "$SRV.STATS", + withError: true, + }, + { + name: "SCHEMA error", + subject: "$SRV.SCHEMA", + withError: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if test.withError { + // use publish instead of request, so Respond will fail inside the handler + if err := nc.Publish(test.subject, nil); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + select { + case <-asyncErr: + return + case <-time.After(1 * time.Second): + t.Fatalf("Timeout waiting for async error") + } + return + } + + resp, err := nc.Request(test.subject, nil, 1*time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + respMap := make(map[string]interface{}) + if err := json.Unmarshal(resp.Data, &respMap); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + expectedResponseJSON, err := json.Marshal(test.expectedResponse) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + expectedRespMap := make(map[string]interface{}) + if err := json.Unmarshal(expectedResponseJSON, &expectedRespMap); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(respMap, expectedRespMap) { + t.Fatalf("Invalid response; want: %+v; got: %+v", expectedRespMap, respMap) + } + }) + } +} + +func TestServiceStats(t *testing.T) { + handler := func(r *Request) { + r.Respond([]byte("ok")) + } + tests := []struct { + name string + config Config + expectedStats map[string]interface{} + }{ + { + name: "without schema or stats handler", + config: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: handler, + }, + }, + }, + { + name: "with stats handler", + config: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: handler, + }, + StatsHandler: func(e Endpoint) interface{} { + return map[string]interface{}{ + "key": "val", + } + }, + }, + expectedStats: map[string]interface{}{ + "key": "val", + }, + }, + { + name: "with schema", + config: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: handler, + }, + Schema: Schema{ + Request: "some_schema", + }, + }, + }, + { + name: "with schema and stats handler", + config: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: handler, + }, + Schema: Schema{ + Request: "some_schema", + }, + StatsHandler: func(e Endpoint) interface{} { + return map[string]interface{}{ + "key": "val", + } + }, + }, + expectedStats: map[string]interface{}{ + "key": "val", + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := RunServerOnPort(-1) + defer s.Shutdown() + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Expected to connect to server, got %v", err) + } + defer nc.Close() + + srv, err := AddService(nc, test.config) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer srv.Stop() + for i := 0; i < 10; i++ { + if _, err := nc.Request(srv.Info().Subject, []byte("msg"), time.Second); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + + // Malformed request, missing reply subjtct + // This should be reflected in errors + if err := nc.Publish(srv.Info().Subject, []byte("err")); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + time.Sleep(10 * time.Millisecond) + + info := srv.Info() + resp, err := nc.Request(fmt.Sprintf("$SRV.STATS.test_service.%s", info.ID), nil, 1*time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + var stats Stats + if err := json.Unmarshal(resp.Data, &stats); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if stats.Name != info.Name { + t.Errorf("Unexpected service name; want: %s; got: %s", info.Name, stats.Name) + } + if stats.ID != info.ID { + t.Errorf("Unexpected service name; want: %s; got: %s", info.ID, stats.ID) + } + if stats.NumRequests != 11 { + t.Errorf("Unexpected num_requests; want: 11; got: %d", stats.NumRequests) + } + if stats.NumErrors != 1 { + t.Errorf("Unexpected num_errors; want: 1; got: %d", stats.NumErrors) + } + if test.expectedStats != nil { + var data map[string]interface{} + if err := json.Unmarshal(stats.Data, &data); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(data, test.expectedStats) { + t.Fatalf("Invalid data from stats handler; want: %v; got: %v", test.expectedStats, data) + } + } + }) + } +} + +func TestRequestRespond(t *testing.T) { + type x struct { + A string `json:"a"` + B int `json:"b"` + } + + tests := []struct { + name string + respondData interface{} + respondHeaders Headers + errDescription string + errCode string + errData []byte + expectedMessage string + expectedCode string + expectedResponse []byte + withRespondError error + }{ + { + name: "byte response", + respondData: []byte("OK"), + expectedResponse: []byte("OK"), + }, + { + name: "byte response, with headers", + respondHeaders: Headers{"key": []string{"value"}}, + respondData: []byte("OK"), + expectedResponse: []byte("OK"), + }, + { + name: "byte response, connection closed", + respondData: []byte("OK"), + withRespondError: ErrRespond, + }, + { + name: "struct response", + respondData: x{"abc", 5}, + expectedResponse: []byte(`{"a":"abc","b":5}`), + }, + { + name: "invalid response data", + respondData: func() {}, + withRespondError: ErrMarshalResponse, + }, + { + name: "generic error", + errDescription: "oops", + errCode: "500", + errData: []byte("error!"), + expectedMessage: "oops", + expectedCode: "500", + }, + { + name: "generic error, with headers", + respondHeaders: Headers{"key": []string{"value"}}, + errDescription: "oops", + errCode: "500", + errData: []byte("error!"), + expectedMessage: "oops", + expectedCode: "500", + }, + { + name: "error without response payload", + errDescription: "oops", + errCode: "500", + expectedMessage: "oops", + expectedCode: "500", + }, + { + name: "missing error code", + errDescription: "oops", + withRespondError: ErrArgRequired, + }, + { + name: "missing error description", + errCode: "500", + withRespondError: ErrArgRequired, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := RunServerOnPort(-1) + defer s.Shutdown() + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Expected to connect to server, got %v", err) + } + defer nc.Close() + + respData := test.respondData + respError := test.withRespondError + errCode := test.errCode + errDesc := test.errDescription + errData := test.errData + handler := func(req *Request) { + if errors.Is(test.withRespondError, ErrRespond) { + nc.Close() + } + if val := req.Headers().Get("key"); val != "value" { + t.Fatalf("Expected headers in the request") + } + if errCode == "" && errDesc == "" { + if resp, ok := respData.([]byte); ok { + err := req.Respond(resp, WithHeaders(test.respondHeaders)) + if respError != nil { + if !errors.Is(err, respError) { + t.Fatalf("Expected error: %v; got: %v", respError, err) + } + return + } + if err != nil { + t.Fatalf("Unexpected error when sending response: %v", err) + } + } else { + err := req.RespondJSON(respData, WithHeaders(test.respondHeaders)) + if respError != nil { + if !errors.Is(err, respError) { + t.Fatalf("Expected error: %v; got: %v", respError, err) + } + return + } + if err != nil { + t.Fatalf("Unexpected error when sending response: %v", err) + } + } + return + } + + err := req.Error(errCode, errDesc, errData, WithHeaders(test.respondHeaders)) + if respError != nil { + if !errors.Is(err, respError) { + t.Fatalf("Expected error: %v; got: %v", respError, err) + } + return + } + if err != nil { + t.Fatalf("Unexpected error when sending response: %v", err) + } + } + + svc, err := AddService(nc, Config{ + Name: "CoolService", + Version: "0.1.0", + Description: "test service", + Endpoint: Endpoint{ + Subject: "svc.test", + Handler: handler, + }, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer svc.Stop() + + resp, err := nc.RequestMsg(&nats.Msg{ + Subject: svc.Info().Subject, + Data: nil, + Header: nats.Header{"key": []string{"value"}}, + }, 50*time.Millisecond) + if test.withRespondError != nil { + return + } + if err != nil { + t.Fatalf("request error: %v", err) + } + + if test.errCode != "" { + description := resp.Header.Get("Nats-Service-Error") + if description != test.expectedMessage { + t.Fatalf("Invalid response message; want: %q; got: %q", test.expectedMessage, description) + } + expectedHeaders := Headers{ + "Nats-Service-Error-Code": []string{resp.Header.Get("Nats-Service-Error-Code")}, + "Nats-Service-Error": []string{resp.Header.Get("Nats-Service-Error")}, + } + for k, v := range test.respondHeaders { + expectedHeaders[k] = v + } + if !reflect.DeepEqual(expectedHeaders, Headers(resp.Header)) { + t.Fatalf("Invalid response headers; want: %v; got: %v", test.respondHeaders, resp.Header) + } + return + } + + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if !bytes.Equal(bytes.TrimSpace(resp.Data), bytes.TrimSpace(test.expectedResponse)) { + t.Fatalf("Invalid response; want: %s; got: %s", string(test.expectedResponse), string(resp.Data)) + } + + if !reflect.DeepEqual(test.respondHeaders, Headers(resp.Header)) { + t.Fatalf("Invalid response headers; want: %v; got: %v", test.respondHeaders, resp.Header) + } + }) + } +} + +func RunServerOnPort(port int) *server.Server { + opts := natsserver.DefaultTestOptions + opts.Port = port + return RunServerWithOptions(&opts) +} + +func RunServerWithOptions(opts *server.Options) *server.Server { + return natsserver.RunServer(opts) +} + +func TestControlSubject(t *testing.T) { + tests := []struct { + name string + verb Verb + srvName string + id string + expectedSubject string + withError error + }{ + { + name: "PING ALL", + verb: PingVerb, + expectedSubject: "$SRV.PING", + }, + { + name: "PING name", + verb: PingVerb, + srvName: "test", + expectedSubject: "$SRV.PING.test", + }, + { + name: "PING id", + verb: PingVerb, + srvName: "test", + id: "123", + expectedSubject: "$SRV.PING.test.123", + }, + { + name: "invalid verb", + verb: Verb(100), + withError: ErrVerbNotSupported, + }, + { + name: "name not provided", + verb: PingVerb, + srvName: "", + id: "123", + withError: ErrServiceNameRequired, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + res, err := ControlSubject(test.verb, test.srvName, test.id) + if test.withError != nil { + if !errors.Is(err, test.withError) { + t.Fatalf("Expected error: %v; got: %v", test.withError, err) + } + return + } + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if res != test.expectedSubject { + t.Errorf("Invalid subject; want: %q; got: %q", test.expectedSubject, res) + } + }) + } +} diff -Nru golang-github-nats-io-go-nats-1.20.0/nats.go golang-github-nats-io-go-nats-1.22.1/nats.go --- golang-github-nats-io-go-nats-1.20.0/nats.go 2022-11-11 15:16:40.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/nats.go 2022-12-22 14:09:21.000000000 +0000 @@ -24,7 +24,6 @@ "errors" "fmt" "io" - "io/ioutil" "math/rand" "net" "net/http" @@ -48,7 +47,7 @@ // Default Constants const ( - Version = "1.20.0" + Version = "1.22.1" DefaultURL = "nats://127.0.0.1:4222" DefaultPort = 4222 DefaultMaxReconnect = 60 @@ -247,8 +246,9 @@ // Option is a function on the options for a connection. type Option func(*Options) error -// CustomDialer can be used to specify any dialer, not necessarily -// a *net.Dialer. +// CustomDialer can be used to specify any dialer, not necessarily a +// *net.Dialer. A CustomDialer may also implement `SkipTLSHandshake() bool` +// in order to skip the TLS handshake in case not required. type CustomDialer interface { Dial(network, address string) (net.Conn, error) } @@ -375,6 +375,12 @@ // DisconnectedCB will not be called if DisconnectedErrCB is set DisconnectedErrCB ConnErrHandler + // ConnectedCB sets the connected handler called when the initial connection + // is established. It is not invoked on successful reconnects - for reconnections, + // use ReconnectedCB. ConnectedCB can be used in conjunction with RetryOnFailedConnect + // to detect whether the initial connect was successful. + ConnectedCB ConnHandler + // ReconnectedCB sets the reconnected handler called whenever // the connection is successfully reconnected. ReconnectedCB ConnHandler @@ -465,6 +471,10 @@ // InboxPrefix allows the default _INBOX prefix to be customized InboxPrefix string + + // IgnoreAuthErrorAbort - if set to true, client opts out of the default connect behavior of aborting + // subsequent reconnect attempts if server returns the same auth error twice (regardless of reconnect policy). + IgnoreAuthErrorAbort bool } const ( @@ -827,7 +837,7 @@ return func(o *Options) error { pool := x509.NewCertPool() for _, f := range file { - rootPEM, err := ioutil.ReadFile(f) + rootPEM, err := os.ReadFile(f) if err != nil || rootPEM == nil { return fmt.Errorf("nats: error loading or parsing rootCA file: %v", err) } @@ -999,6 +1009,14 @@ } } +// ConnectHandler is an Option to set the connected handler. +func ConnectHandler(cb ConnHandler) Option { + return func(o *Options) error { + o.ConnectedCB = cb + return nil + } +} + // ReconnectHandler is an Option to set the reconnected handler. func ReconnectHandler(cb ConnHandler) Option { return func(o *Options) error { @@ -1235,6 +1253,15 @@ } } +// IgnoreAuthErrorAbort opts out of the default connect behavior of aborting +// subsequent reconnect attempts if server returns the same auth error twice. +func IgnoreAuthErrorAbort() Option { + return func(o *Options) error { + o.IgnoreAuthErrorAbort = true + return nil + } +} + // Handler processing // SetDisconnectHandler will set the disconnect event handler. @@ -1258,6 +1285,16 @@ nc.Opts.DisconnectedErrCB = dcb } +// DisconnectErrHandler will return the disconnect event handler. +func (nc *Conn) DisconnectErrHandler() ConnErrHandler { + if nc == nil { + return nil + } + nc.mu.Lock() + defer nc.mu.Unlock() + return nc.Opts.DisconnectedErrCB +} + // SetReconnectHandler will set the reconnect event handler. func (nc *Conn) SetReconnectHandler(rcb ConnHandler) { if nc == nil { @@ -1268,6 +1305,16 @@ nc.Opts.ReconnectedCB = rcb } +// ReconnectHandler will return the reconnect event handler. +func (nc *Conn) ReconnectHandler() ConnHandler { + if nc == nil { + return nil + } + nc.mu.Lock() + defer nc.mu.Unlock() + return nc.Opts.ReconnectedCB +} + // SetDiscoveredServersHandler will set the discovered servers handler. func (nc *Conn) SetDiscoveredServersHandler(dscb ConnHandler) { if nc == nil { @@ -1278,6 +1325,16 @@ nc.Opts.DiscoveredServersCB = dscb } +// DiscoveredServersHandler will return the discovered servers handler. +func (nc *Conn) DiscoveredServersHandler() ConnHandler { + if nc == nil { + return nil + } + nc.mu.Lock() + defer nc.mu.Unlock() + return nc.Opts.DiscoveredServersCB +} + // SetClosedHandler will set the closed event handler. func (nc *Conn) SetClosedHandler(cb ConnHandler) { if nc == nil { @@ -1288,6 +1345,16 @@ nc.Opts.ClosedCB = cb } +// ClosedHandler will return the closed event handler. +func (nc *Conn) ClosedHandler() ConnHandler { + if nc == nil { + return nil + } + nc.mu.Lock() + defer nc.mu.Unlock() + return nc.Opts.ClosedCB +} + // SetErrorHandler will set the async error handler. func (nc *Conn) SetErrorHandler(cb ErrHandler) { if nc == nil { @@ -1298,6 +1365,16 @@ nc.Opts.AsyncErrorCB = cb } +// ErrorHandler will return the async error handler. +func (nc *Conn) ErrorHandler() ErrHandler { + if nc == nil { + return nil + } + nc.mu.Lock() + defer nc.mu.Unlock() + return nc.Opts.AsyncErrorCB +} + // Process the url string argument to Connect. // Return an array of urls, even if only one. func processUrlString(url string) []string { @@ -1367,13 +1444,18 @@ // Create reader/writer nc.newReaderWriter() - if err := nc.connect(); err != nil { + connectionEstablished, err := nc.connect() + if err != nil { return nil, err } // Spin up the async cb dispatcher on success go nc.ach.asyncCBDispatcher() + if connectionEstablished && nc.Opts.ConnectedCB != nil { + nc.ach.push(func() { nc.Opts.ConnectedCB(nc) }) + } + return nc, nil } @@ -1860,8 +1942,19 @@ return nil } +type skipTLSDialer interface { + SkipTLSHandshake() bool +} + // makeTLSConn will wrap an existing Conn using TLS func (nc *Conn) makeTLSConn() error { + if nc.Opts.CustomDialer != nil { + // we do nothing when asked to skip the TLS wrapper + sd, ok := nc.Opts.CustomDialer.(skipTLSDialer) + if ok && sd.SkipTLSHandshake() { + return nil + } + } // Allow the user to configure their own tls.Config structure. var tlsCopy *tls.Config if nc.Opts.TLSConfig != nil { @@ -2114,9 +2207,10 @@ return nil } -// Main connect function. Will connect to the nats-server -func (nc *Conn) connect() error { +// Main connect function. Will connect to the nats-server. +func (nc *Conn) connect() (bool, error) { var err error + var connectionEstablished bool // Create actual socket connection // For first connect we walk all servers in the pool and try @@ -2162,6 +2256,7 @@ } if err == nil { + connectionEstablished = true nc.initc = false } else if nc.Opts.RetryOnFailedConnect { nc.setup() @@ -2173,7 +2268,7 @@ nc.current = nil } - return err + return connectionEstablished, err } // This will check to see if the connection should be @@ -3132,8 +3227,8 @@ nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, err) }) } // We should give up if we tried twice on this server and got the - // same error. - if nc.current.lastErr == err { + // same error. This behavior can be modified using IgnoreAuthErrorAbort. + if nc.current.lastErr == err && !nc.Opts.IgnoreAuthErrorAbort { nc.ar = true } else { nc.current.lastErr = err @@ -5340,7 +5435,7 @@ return _EMPTY_, fmt.Errorf("nats: %v", err) } - contents, err := ioutil.ReadFile(path) + contents, err := os.ReadFile(path) if err != nil { return _EMPTY_, fmt.Errorf("nats: %v", err) } @@ -5389,7 +5484,7 @@ } func nkeyPairFromSeedFile(seedFile string) (nkeys.KeyPair, error) { - contents, err := ioutil.ReadFile(seedFile) + contents, err := os.ReadFile(seedFile) if err != nil { return nil, fmt.Errorf("nats: %v", err) } diff -Nru golang-github-nats-io-go-nats-1.20.0/nats_test.go golang-github-nats-io-go-nats-1.22.1/nats_test.go --- golang-github-nats-io-go-nats-1.20.0/nats_test.go 2022-11-11 15:16:40.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/nats_test.go 2022-12-22 14:09:21.000000000 +0000 @@ -23,7 +23,6 @@ "encoding/json" "errors" "fmt" - "io/ioutil" "net" "net/http" "net/url" @@ -1610,10 +1609,14 @@ name string expectedProto string expectedErr error + ignoreAbort bool }{ - {"expired users credentials", AUTHENTICATION_EXPIRED_ERR, ErrAuthExpired}, - {"revoked users credentials", AUTHENTICATION_REVOKED_ERR, ErrAuthRevoked}, - {"expired account", ACCOUNT_AUTHENTICATION_EXPIRED_ERR, ErrAccountAuthExpired}, + {"expired users credentials", AUTHENTICATION_EXPIRED_ERR, ErrAuthExpired, false}, + {"revoked users credentials", AUTHENTICATION_REVOKED_ERR, ErrAuthRevoked, false}, + {"expired account", ACCOUNT_AUTHENTICATION_EXPIRED_ERR, ErrAccountAuthExpired, false}, + {"expired users credentials", AUTHENTICATION_EXPIRED_ERR, ErrAuthExpired, true}, + {"revoked users credentials", AUTHENTICATION_REVOKED_ERR, ErrAuthRevoked, true}, + {"expired account", ACCOUNT_AUTHENTICATION_EXPIRED_ERR, ErrAccountAuthExpired, true}, } { t.Run(test.name, func(t *testing.T) { l, e := net.Listen("tcp", "127.0.0.1:0") @@ -1661,8 +1664,8 @@ errCh := make(chan error, 10) url := fmt.Sprintf("nats://127.0.0.1:%d", addr.Port) - nc, err := Connect(url, - ReconnectWait(25*time.Millisecond), + opts := []Option{ + ReconnectWait(25 * time.Millisecond), ReconnectJitter(0, 0), MaxReconnects(-1), ErrorHandler(func(_ *Conn, _ *Subscription, e error) { @@ -1674,12 +1677,36 @@ ClosedHandler(func(nc *Conn) { ch <- true }), - ) + } + if test.ignoreAbort { + opts = append(opts, IgnoreAuthErrorAbort()) + } + nc, err := Connect(url, opts...) if err != nil { t.Fatalf("Expected to connect, got %v", err) } defer nc.Close() + if test.ignoreAbort { + // We expect more than 3 errors, as the connect attempt should not be aborted after 2 failed attempts. + for i := 0; i < 4; i++ { + select { + case e := <-errCh: + if i == 0 && e != test.expectedErr { + t.Fatalf("Expected error %q, got %q", test.expectedErr, e) + } else if i > 0 && e != ErrAuthorization { + t.Fatalf("Expected error %q, got %q", ErrAuthorization, e) + } + case <-time.After(time.Second): + if i == 0 { + t.Fatalf("Missing %q error", test.expectedErr) + } else { + t.Fatalf("Missing %q error", ErrAuthorization) + } + } + } + return + } // We should give up since we get the same error on both tries. if err := WaitTime(ch, 2*time.Second); err != nil { t.Fatal("Should have closed after multiple failed attempts.") @@ -1795,7 +1822,7 @@ sopts := natsserver.DefaultTestOptions sopts.Port = TEST_PORT - sopts.Nkeys = []*server.NkeyUser{&server.NkeyUser{Nkey: string(pub)}} + sopts.Nkeys = []*server.NkeyUser{{Nkey: string(pub)}} ts := RunServerWithOptions(&sopts) defer ts.Shutdown() @@ -1840,13 +1867,13 @@ func createTmpFile(t *testing.T, content []byte) string { t.Helper() - conf, err := ioutil.TempFile("", "") + conf, err := os.CreateTemp("", "") if err != nil { t.Fatalf("Error creating conf file: %v", err) } fName := conf.Name() conf.Close() - if err := ioutil.WriteFile(fName, content, 0666); err != nil { + if err := os.WriteFile(fName, content, 0666); err != nil { os.Remove(fName) t.Fatalf("Error writing conf file: %v", err) } @@ -1938,7 +1965,7 @@ checkErrChannel(t, errCh) // Now that option is already created, change content of file - ioutil.WriteFile(seedFile, []byte(`xxxxx`), 0666) + os.WriteFile(seedFile, []byte(`xxxxx`), 0666) ch = make(chan bool, 1) go rs(ch) diff -Nru golang-github-nats-io-go-nats-1.20.0/netchan.go golang-github-nats-io-go-nats-1.22.1/netchan.go --- golang-github-nats-io-go-nats-1.20.0/netchan.go 2022-11-11 15:16:40.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/netchan.go 2022-12-22 14:09:21.000000000 +0000 @@ -1,4 +1,4 @@ -// Copyright 2013-2018 The NATS Authors +// Copyright 2013-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff -Nru golang-github-nats-io-go-nats-1.20.0/parser.go golang-github-nats-io-go-nats-1.22.1/parser.go --- golang-github-nats-io-go-nats-1.20.0/parser.go 2022-11-11 15:16:40.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/parser.go 2022-12-22 14:09:21.000000000 +0000 @@ -1,4 +1,4 @@ -// Copyright 2012-2020 The NATS Authors +// Copyright 2012-2122 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff -Nru golang-github-nats-io-go-nats-1.20.0/README.md golang-github-nats-io-go-nats-1.22.1/README.md --- golang-github-nats-io-go-nats-1.20.0/README.md 2022-11-11 15:16:40.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/README.md 2022-12-22 14:09:21.000000000 +0000 @@ -29,7 +29,7 @@ ```bash # Go client latest or explicit version go get github.com/nats-io/nats.go/@latest -go get github.com/nats-io/nats.go/@v1.20.0 +go get github.com/nats-io/nats.go/@v1.22.1 # For latest NATS Server, add /v2 at the end go get github.com/nats-io/nats-server/v2 diff -Nru golang-github-nats-io-go-nats-1.20.0/services/errors.go golang-github-nats-io-go-nats-1.22.1/services/errors.go --- golang-github-nats-io-go-nats-1.20.0/services/errors.go 2022-11-11 15:16:40.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/services/errors.go 1970-01-01 00:00:00.000000000 +0000 @@ -1,25 +0,0 @@ -// Copyright 2022 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package services - -import "fmt" - -type ServiceAPIError struct { - ErrorCode int - Description string -} - -func (e *ServiceAPIError) Error() string { - return fmt.Sprintf("%d %s", e.ErrorCode, e.Description) -} diff -Nru golang-github-nats-io-go-nats-1.20.0/services/service.go golang-github-nats-io-go-nats-1.22.1/services/service.go --- golang-github-nats-io-go-nats-1.20.0/services/service.go 2022-11-11 15:16:40.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/services/service.go 1970-01-01 00:00:00.000000000 +0000 @@ -1,404 +0,0 @@ -// Copyright 2022 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package services - -import ( - "encoding/json" - "errors" - "fmt" - "strings" - "sync" - "time" - - "github.com/nats-io/nats.go" - "github.com/nats-io/nuid" -) - -// Notice: Experimental Preview -// -// This functionality is EXPERIMENTAL and may be changed in later releases. - -type ( - - // Service is an interface for sevice management. - // It exposes methods to stop/reset a service, as well as get information on a service. - Service interface { - ID() string - Name() string - Description() string - Version() string - Stats() ServiceStats - Reset() - Stop() - } - - // A request handler. - // TODO (could make error more and return more info to user automatically?) - ServiceHandler func(svc Service, req *nats.Msg) error - - // Clients can request as well. - ServiceStats struct { - Name string `json:"name"` - ID string `json:"id"` - Version string `json:"version"` - Started time.Time `json:"started"` - Endpoints []Stats `json:"stats"` - } - - Stats struct { - Name string `json:"name"` - NumRequests int `json:"num_requests"` - NumErrors int `json:"num_errors"` - TotalLatency time.Duration `json:"total_latency"` - AverageLatency time.Duration `json:"average_latency"` - Data interface{} `json:"data"` - } - - // ServiceInfo is the basic information about a service type - ServiceInfo struct { - Name string `json:"name"` - ID string `json:"id"` - Description string `json:"description"` - Version string `json:"version"` - Subject string `json:"subject"` - } - - ServiceSchema struct { - Request string `json:"request"` - Response string `json:"response"` - } - - Endpoint struct { - Subject string `json:"subject"` - Handler ServiceHandler - } - - InternalEndpoint struct { - Name string - Handler nats.MsgHandler - } - - ServiceVerb int64 - - ServiceConfig struct { - Name string `json:"name"` - Description string `json:"description"` - Version string `json:"version"` - Schema ServiceSchema `json:"schema"` - Endpoint Endpoint `json:"endpoint"` - StatusHandler func(Endpoint) interface{} - } - - // service is the internal implementation of a Service - service struct { - sync.Mutex - ServiceConfig - id string - // subs - reqSub *nats.Subscription - internal map[string]*nats.Subscription - statuses map[string]*Stats - stats *ServiceStats - conn *nats.Conn - } -) - -const ( - // We can fix this, as versions will be on separate subjects and use account mapping to roll requests to new versions etc. - QG = "svc" - - // ServiceApiPrefix is the root of all control subjects - ServiceApiPrefix = "$SRV" - - ServiceErrorHeader = "Nats-Service-Error" -) - -const ( - SrvPing ServiceVerb = iota - SrvStatus - SrvInfo - SrvSchema -) - -func (s *ServiceConfig) Valid() error { - if s.Name == "" { - return errors.New("name is required") - } - return s.Endpoint.Valid() -} - -func (e *Endpoint) Valid() error { - s := strings.TrimSpace(e.Subject) - if len(s) == 0 { - return errors.New("subject is required") - } - if e.Handler == nil { - return errors.New("handler is required") - } - return nil -} - -func (s ServiceVerb) String() string { - switch s { - case SrvPing: - return "PING" - case SrvStatus: - return "STATUS" - case SrvInfo: - return "INFO" - case SrvSchema: - return "SCHEMA" - default: - return "" - } -} - -// Add adds a microservice. -// NOTE we can do an OpenAPI version as well, but looking at it it was very involved. So I think keep simple version and -// also have a version that talkes full blown OpenAPI spec and we can pull these things out. -func Add(nc *nats.Conn, config ServiceConfig) (Service, error) { - if err := config.Valid(); err != nil { - return nil, err - } - - id := nuid.Next() - svc := &service{ - ServiceConfig: config, - conn: nc, - id: id, - } - svc.internal = make(map[string]*nats.Subscription) - svc.statuses = make(map[string]*Stats) - svc.statuses[""] = &Stats{ - Name: config.Name, - } - - svc.stats = &ServiceStats{ - Name: config.Name, - ID: id, - Version: config.Version, - Started: time.Now(), - } - - // Setup internal subscriptions. - var err error - - svc.reqSub, err = nc.QueueSubscribe(config.Endpoint.Subject, QG, func(m *nats.Msg) { - svc.reqHandler(m) - }) - if err != nil { - return nil, err - } - - info := &ServiceInfo{ - Name: config.Name, - ID: id, - Description: config.Description, - Version: config.Version, - Subject: config.Endpoint.Subject, - } - - infoHandler := func(m *nats.Msg) { - response, _ := json.MarshalIndent(info, "", " ") - m.Respond(response) - } - - pingHandler := func(m *nats.Msg) { - infoHandler(m) - } - - statusHandler := func(m *nats.Msg) { - response, _ := json.MarshalIndent(svc.Stats(), "", " ") - m.Respond(response) - } - - schemaHandler := func(m *nats.Msg) { - response, _ := json.MarshalIndent(svc.ServiceConfig.Schema, "", " ") - m.Respond(response) - } - - if err := svc.addInternalHandlerGroup(nc, SrvInfo, infoHandler); err != nil { - return nil, err - } - if err := svc.addInternalHandlerGroup(nc, SrvPing, pingHandler); err != nil { - return nil, err - } - if err := svc.addInternalHandlerGroup(nc, SrvStatus, statusHandler); err != nil { - return nil, err - } - - if svc.ServiceConfig.Schema.Request != "" || svc.ServiceConfig.Schema.Response != "" { - if err := svc.addInternalHandlerGroup(nc, SrvSchema, schemaHandler); err != nil { - return nil, err - } - } - - svc.stats.ID = id - svc.stats.Started = time.Now() - return svc, nil -} - -// addInternalHandlerGroup generates control handlers for a specific verb -// each request generates 3 subscriptions, one for the general verb -// affecting all services written with the framework, one that handles -// all services of a particular kind, and finally a specific service. -func (svc *service) addInternalHandlerGroup(nc *nats.Conn, verb ServiceVerb, handler nats.MsgHandler) error { - name := fmt.Sprintf("%s-all", verb.String()) - if err := svc.addInternalHandler(nc, verb, "", "", name, handler); err != nil { - return err - } - name = fmt.Sprintf("%s-kind", verb.String()) - if err := svc.addInternalHandler(nc, verb, svc.Name(), "", name, handler); err != nil { - return err - } - return svc.addInternalHandler(nc, verb, svc.Name(), svc.ID(), verb.String(), handler) -} - -// addInternalHandler registers a control subject handler -func (svc *service) addInternalHandler(nc *nats.Conn, verb ServiceVerb, kind, id, name string, handler nats.MsgHandler) error { - subj, err := SvcControlSubject(verb, kind, id) - if err != nil { - svc.Stop() - return err - } - - svc.internal[name], err = nc.Subscribe(subj, func(msg *nats.Msg) { - start := time.Now() - defer func() { - svc.Lock() - stats := svc.statuses[name] - stats.NumRequests++ - stats.TotalLatency += time.Since(start) - stats.AverageLatency = stats.TotalLatency / time.Duration(stats.NumRequests) - svc.Unlock() - }() - handler(msg) - }) - if err != nil { - svc.Stop() - return err - } - - svc.statuses[name] = &Stats{ - Name: name, - } - return nil -} - -// reqHandler itself -func (svc *service) reqHandler(req *nats.Msg) { - start := time.Now() - defer func() { - svc.Lock() - stats := svc.statuses[""] - stats.NumRequests++ - stats.TotalLatency += time.Since(start) - stats.AverageLatency = stats.TotalLatency / time.Duration(stats.NumRequests) - svc.Unlock() - }() - - if err := svc.ServiceConfig.Endpoint.Handler(svc, req); err != nil { - hdr := make(nats.Header) - apiErr := &ServiceAPIError{} - if ok := errors.As(err, &apiErr); !ok { - hdr[ServiceErrorHeader] = []string{fmt.Sprintf("%d %s", 500, err.Error())} - } else { - hdr[ServiceErrorHeader] = []string{apiErr.Error()} - } - svc.Lock() - stats := svc.statuses[""] - stats.NumErrors++ - svc.Unlock() - - svc.conn.PublishMsg(&nats.Msg{ - Subject: req.Reply, - Header: hdr, - }) - } -} - -func (svc *service) Stop() { - if svc.reqSub != nil { - svc.reqSub.Drain() - svc.reqSub = nil - } - var keys []string - for key, sub := range svc.internal { - keys = append(keys, key) - sub.Drain() - } - for _, key := range keys { - delete(svc.internal, key) - } -} - -func (svc *service) ID() string { - return svc.id -} - -func (svc *service) Name() string { - return svc.ServiceConfig.Name -} - -func (svc *service) Description() string { - return svc.ServiceConfig.Description -} - -func (svc *service) Version() string { - return svc.ServiceConfig.Version -} - -func (svc *service) Stats() ServiceStats { - svc.Lock() - defer func() { - svc.Unlock() - }() - if svc.ServiceConfig.StatusHandler != nil { - stats := svc.statuses[""] - stats.Data = svc.ServiceConfig.StatusHandler(svc.Endpoint) - } - idx := 0 - v := make([]Stats, len(svc.statuses)) - for _, se := range svc.statuses { - v[idx] = *se - idx++ - } - svc.stats.Endpoints = v - return *svc.stats -} - -func (svc *service) Reset() { - for _, se := range svc.statuses { - se.NumRequests = 0 - se.TotalLatency = 0 - se.NumErrors = 0 - se.Data = nil - } -} - -// SvcControlSubject returns monitoring subjects used by the ServiceImpl -func SvcControlSubject(verb ServiceVerb, kind, id string) (string, error) { - sverb := verb.String() - if sverb == "" { - return "", fmt.Errorf("unsupported service verb") - } - kind = strings.ToUpper(kind) - if kind == "" && id == "" { - return fmt.Sprintf("%s.%s", ServiceApiPrefix, sverb), nil - } - if id == "" { - return fmt.Sprintf("%s.%s.%s", ServiceApiPrefix, sverb, kind), nil - } - return fmt.Sprintf("%s.%s.%s.%s", ServiceApiPrefix, sverb, kind, id), nil -} diff -Nru golang-github-nats-io-go-nats-1.20.0/services/service_test.go golang-github-nats-io-go-nats-1.22.1/services/service_test.go --- golang-github-nats-io-go-nats-1.20.0/services/service_test.go 2022-11-11 15:16:40.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/services/service_test.go 1970-01-01 00:00:00.000000000 +0000 @@ -1,260 +0,0 @@ -// Copyright 2022 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package services - -import ( - "encoding/json" - "fmt" - "math/rand" - "testing" - "time" - - "github.com/nats-io/nats-server/v2/server" - natsserver "github.com/nats-io/nats-server/v2/test" - "github.com/nats-io/nats.go" -) - -func TestServiceBasics(t *testing.T) { - s := RunServerOnPort(-1) - defer s.Shutdown() - - nc, err := nats.Connect(s.ClientURL()) - if err != nil { - t.Fatalf("Expected to connect to server, got %v", err) - } - defer nc.Close() - - // Stub service. - doAdd := func(svc Service, req *nats.Msg) error { - if rand.Intn(10) == 0 { - return fmt.Errorf("Unexpected Error!") - } - // Happy Path. - // Random delay between 5-10ms - time.Sleep(5*time.Millisecond + time.Duration(rand.Intn(5))*time.Millisecond) - if err := req.Respond([]byte("42")); err != nil { - return err - } - return nil - } - - var svcs []Service - - // Create 5 service responders. - config := ServiceConfig{ - Name: "CoolAddService", - Version: "v0.1", - Description: "Add things together", - Endpoint: Endpoint{ - Subject: "svc.add", - Handler: doAdd, - }, - Schema: ServiceSchema{Request: "", Response: ""}, - } - - for i := 0; i < 5; i++ { - svc, err := Add(nc, config) - if err != nil { - t.Fatalf("Expected to create Service, got %v", err) - } - defer svc.Stop() - svcs = append(svcs, svc) - } - - // Now send 50 requests. - for i := 0; i < 50; i++ { - _, err := nc.Request("svc.add", []byte(`{ "x": 22, "y": 11 }`), time.Second) - if err != nil { - t.Fatalf("Expected a response, got %v", err) - } - } - - for _, svc := range svcs { - if svc.Name() != "CoolAddService" { - t.Fatalf("Expected %q, got %q", "CoolAddService", svc.Name()) - } - if len(svc.Description()) == 0 || len(svc.Version()) == 0 { - t.Fatalf("Expected non empty description and version") - } - } - - // Make sure we can request info, 1 response. - // This could be exported as well as main ServiceImpl. - subj, err := SvcControlSubject(SrvInfo, "CoolAddService", "") - if err != nil { - t.Fatalf("Failed to building info subject %v", err) - } - info, err := nc.Request(subj, nil, time.Second) - if err != nil { - t.Fatalf("Expected a response, got %v", err) - } - var inf ServiceInfo - if err := json.Unmarshal(info.Data, &inf); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if inf.Subject != "svc.add" { - t.Fatalf("expected service subject to be srv.add: %s", inf.Subject) - } - - // Ping all services. Multiple responses. - // could do STATZ too? - inbox := nats.NewInbox() - sub, err := nc.SubscribeSync(inbox) - if err != nil { - t.Fatalf("subscribe failed: %s", err) - } - pingSubject, err := SvcControlSubject(SrvPing, "CoolAddService", "") - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if err := nc.PublishRequest(pingSubject, inbox, nil); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - var pingCount int - for { - _, err := sub.NextMsg(250 * time.Millisecond) - if err != nil { - break - } - pingCount++ - } - if pingCount != 5 { - t.Fatalf("Expected 5 ping responses, got: %d", pingCount) - } - - // Get stats from all services - statsInbox := nats.NewInbox() - sub, err = nc.SubscribeSync(statsInbox) - if err != nil { - t.Fatalf("subscribe failed: %s", err) - } - statsSubject, err := SvcControlSubject(SrvStatus, "CoolAddService", "") - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if err := nc.PublishRequest(statsSubject, statsInbox, nil); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - stats := make([]ServiceStats, 0) - var requestsNum int - for { - resp, err := sub.NextMsg(250 * time.Millisecond) - if err != nil { - break - } - var srvStats ServiceStats - if err := json.Unmarshal(resp.Data, &srvStats); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if len(srvStats.Endpoints) != 10 { - t.Fatalf("Expected 10 endpoints on a serivce, got: %d", len(srvStats.Endpoints)) - } - for _, e := range srvStats.Endpoints { - if e.Name == "CoolAddService" { - requestsNum += e.NumRequests - } - } - stats = append(stats, srvStats) - } - if len(stats) != 5 { - t.Fatalf("Expected stats for 5 services, got: %d", len(stats)) - } - - // Services should process 50 requests total - if requestsNum != 50 { - t.Fatalf("Expected a total fo 50 requests processed, got: %d", requestsNum) - } -} - -func TestServiceErrors(t *testing.T) { - tests := []struct { - name string - handlerResponse error - expectedStatus string - }{ - { - name: "generic error", - handlerResponse: fmt.Errorf("oops"), - expectedStatus: "500 oops", - }, - { - name: "api error", - handlerResponse: &ServiceAPIError{ErrorCode: 400, Description: "oops"}, - expectedStatus: "400 oops", - }, - { - name: "no error", - handlerResponse: nil, - expectedStatus: "", - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - s := RunServerOnPort(-1) - defer s.Shutdown() - - nc, err := nats.Connect(s.ClientURL()) - if err != nil { - t.Fatalf("Expected to connect to server, got %v", err) - } - defer nc.Close() - - // Stub service. - handler := func(svc Service, req *nats.Msg) error { - if test.handlerResponse == nil { - if err := req.Respond([]byte("ok")); err != nil { - return err - } - } - return test.handlerResponse - } - - svc, err := Add(nc, ServiceConfig{ - Name: "CoolService", - Description: "Erroring service", - Endpoint: Endpoint{ - Subject: "svc.fail", - Handler: handler, - }, - }) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer svc.Stop() - - resp, err := nc.Request("svc.fail", nil, 1*time.Second) - if err != nil { - t.Fatalf("request error") - } - - status := resp.Header.Get("Nats-Service-Error") - if status != test.expectedStatus { - t.Fatalf("Invalid response status; want: %q; got: %q", test.expectedStatus, status) - } - }) - } -} - -func RunServerOnPort(port int) *server.Server { - opts := natsserver.DefaultTestOptions - opts.Port = port - return RunServerWithOptions(&opts) -} - -func RunServerWithOptions(opts *server.Options) *server.Server { - return natsserver.RunServer(opts) -} diff -Nru golang-github-nats-io-go-nats-1.20.0/test/conn_test.go golang-github-nats-io-go-nats-1.22.1/test/conn_test.go --- golang-github-nats-io-go-nats-1.20.0/test/conn_test.go 2022-11-11 15:16:40.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/test/conn_test.go 2022-12-22 14:09:21.000000000 +0000 @@ -19,8 +19,8 @@ "crypto/tls" "crypto/x509" "fmt" - "io/ioutil" "net" + "os" "runtime" "strconv" "strings" @@ -198,7 +198,7 @@ // Let's be more TLS correct and verify servername, endpoint etc. // Now do more advanced checking, verifying servername and using rootCA. // Setup our own TLSConfig using RootCA from our self signed cert. - rootPEM, err := ioutil.ReadFile("./configs/certs/ca.pem") + rootPEM, err := os.ReadFile("./configs/certs/ca.pem") if err != nil || rootPEM == nil { t.Fatalf("failed to read root certificate") } @@ -668,15 +668,11 @@ defer s.Shutdown() firstDisconnect := true - dtime1 := time.Time{} - dtime2 := time.Time{} - rtime := time.Time{} - atime1 := time.Time{} - atime2 := time.Time{} - ctime := time.Time{} + var connTime, dtime1, dtime2, rtime, atime1, atime2, ctime time.Time cbErrors := make(chan error, 20) + connected := make(chan bool) reconnected := make(chan bool) closed := make(chan bool) asyncErr := make(chan bool, 2) @@ -684,6 +680,17 @@ recvCh1 := make(chan bool) recvCh2 := make(chan bool) + connCh := func(nc *nats.Conn) { + if err := isRunningInAsyncCBDispatcher(); err != nil { + cbErrors <- err + connected <- true + return + } + time.Sleep(50 * time.Millisecond) + connTime = time.Now() + connected <- true + } + dch := func(nc *nats.Conn) { if err := isRunningInAsyncCBDispatcher(); err != nil { cbErrors <- err @@ -738,6 +745,7 @@ url = "nats://" + url + "," + nats.DefaultURL nc, err := nats.Connect(url, + nats.ConnectHandler(connCh), nats.DisconnectHandler(dch), nats.ReconnectHandler(rch), nats.ClosedHandler(cch), @@ -751,6 +759,12 @@ } defer nc.Close() + // Wait for notification on connection established + err = Wait(connected) + if err != nil { + t.Fatal("Did not get the connected callback") + } + ncp, err := nats.Connect(nats.DefaultURL, nats.ReconnectWait(50*time.Millisecond)) if err != nil { @@ -771,8 +785,7 @@ t.Fatal("Did not get the reconnected callback") } - var sub1 *nats.Subscription - var sub2 *nats.Subscription + var sub1, sub2 *nats.Subscription recv := func(m *nats.Msg) { // Signal that one message is received @@ -840,12 +853,12 @@ t.Fatalf("%v", <-cbErrors) } - if (dtime1 == time.Time{}) || (dtime2 == time.Time{}) || (rtime == time.Time{}) || (atime1 == time.Time{}) || (atime2 == time.Time{}) || (ctime == time.Time{}) { + if (connTime == time.Time{}) || (dtime1 == time.Time{}) || (dtime2 == time.Time{}) || (rtime == time.Time{}) || (atime1 == time.Time{}) || (atime2 == time.Time{}) || (ctime == time.Time{}) { t.Fatalf("Some callbacks did not fire:\n%v\n%v\n%v\n%v\n%v\n%v", dtime1, rtime, atime1, atime2, dtime2, ctime) } - if rtime.Before(dtime1) || dtime2.Before(rtime) || atime2.Before(atime1) || ctime.Before(atime2) { - t.Fatalf("Wrong callback order:\n%v\n%v\n%v\n%v\n%v\n%v", dtime1, rtime, atime1, atime2, dtime2, ctime) + if dtime1.Before(connTime) || rtime.Before(dtime1) || dtime2.Before(rtime) || atime2.Before(atime1) || ctime.Before(atime2) { + t.Fatalf("Wrong callback order:\n%v\n%v\n%v\n%v\n%v\n%v\n%v", connTime, dtime1, rtime, atime1, atime2, dtime2, ctime) } // Close the other connection @@ -864,6 +877,82 @@ t.Fatalf("The async callback dispatcher(s) should have stopped") } +func TestConnectHandler(t *testing.T) { + t.Run("with RetryOnFailedConnect, connection established", func(t *testing.T) { + s := RunDefaultServer() + defer s.Shutdown() + + connected := make(chan bool) + connHandler := func(*nats.Conn) { + connected <- true + } + nc, err := nats.Connect(nats.DefaultURL, + nats.ConnectHandler(connHandler), + nats.RetryOnFailedConnect(true)) + + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + if err = Wait(connected); err != nil { + t.Fatal("Timeout waiting for connect handler") + } + }) + t.Run("with RetryOnFailedConnect, connection failed", func(t *testing.T) { + connected := make(chan bool) + connHandler := func(*nats.Conn) { + connected <- true + } + _, err := nats.Connect(nats.DefaultURL, + nats.ConnectHandler(connHandler), + nats.RetryOnFailedConnect(true)) + + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + select { + case <-connected: + t.Fatalf("ConnectedCB invoked when no connection established") + case <-time.After(100 * time.Millisecond): + } + }) + t.Run("no RetryOnFailedConnect, connection established", func(t *testing.T) { + s := RunDefaultServer() + defer s.Shutdown() + + connected := make(chan bool) + connHandler := func(*nats.Conn) { + connected <- true + } + nc, err := nats.Connect(nats.DefaultURL, + nats.ConnectHandler(connHandler)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + if err = Wait(connected); err != nil { + t.Fatal("Timeout waiting for connect handler") + } + }) + t.Run("no RetryOnFailedConnect, connection failed", func(t *testing.T) { + connected := make(chan bool) + connHandler := func(*nats.Conn) { + connected <- true + } + _, err := nats.Connect(nats.DefaultURL, + nats.ConnectHandler(connHandler)) + + if err == nil { + t.Fatalf("Expected error on connect, got nil") + } + select { + case <-connected: + t.Fatalf("ConnectedCB invoked when no connection established") + case <-time.After(100 * time.Millisecond): + } + }) +} + func TestFlushReleaseOnClose(t *testing.T) { serverInfo := "INFO {\"server_id\":\"foobar\",\"host\":\"%s\",\"port\":%d,\"auth_required\":false,\"tls_required\":false,\"max_payload\":1048576}\r\n" diff -Nru golang-github-nats-io-go-nats-1.20.0/test/helper_test.go golang-github-nats-io-go-nats-1.22.1/test/helper_test.go --- golang-github-nats-io-go-nats-1.20.0/test/helper_test.go 2022-11-11 15:16:40.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/test/helper_test.go 2022-12-22 14:09:21.000000000 +0000 @@ -16,7 +16,6 @@ import ( "errors" "fmt" - "io/ioutil" "os" "runtime" "strings" @@ -135,13 +134,13 @@ func createConfFile(t *testing.T, content []byte) string { t.Helper() - conf, err := ioutil.TempFile("", "") + conf, err := os.CreateTemp("", "") if err != nil { t.Fatalf("Error creating conf file: %v", err) } fName := conf.Name() conf.Close() - if err := ioutil.WriteFile(fName, content, 0666); err != nil { + if err := os.WriteFile(fName, content, 0666); err != nil { os.Remove(fName) t.Fatalf("Error writing conf file: %v", err) } diff -Nru golang-github-nats-io-go-nats-1.20.0/test/js_test.go golang-github-nats-io-go-nats-1.22.1/test/js_test.go --- golang-github-nats-io-go-nats-1.20.0/test/js_test.go 2022-11-11 15:16:40.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/test/js_test.go 2022-12-22 14:09:21.000000000 +0000 @@ -18,7 +18,6 @@ "crypto/rand" "errors" "fmt" - "io/ioutil" mrand "math/rand" "net" "net/url" @@ -5049,7 +5048,7 @@ o := natsserver.DefaultTestOptions o.JetStream = true o.ServerName = fmt.Sprintf("NODE_%d", i) - tdir, err := ioutil.TempDir(os.TempDir(), fmt.Sprintf("%s_%s-", o.ServerName, clusterName)) + tdir, err := os.MkdirTemp(os.TempDir(), fmt.Sprintf("%s_%s-", o.ServerName, clusterName)) if err != nil { t.Fatal(err) } diff -Nru golang-github-nats-io-go-nats-1.20.0/test/object_test.go golang-github-nats-io-go-nats-1.22.1/test/object_test.go --- golang-github-nats-io-go-nats-1.20.0/test/object_test.go 2022-11-11 15:16:40.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/test/object_test.go 2022-12-22 14:09:21.000000000 +0000 @@ -19,7 +19,7 @@ "crypto/rand" "crypto/sha256" "fmt" - "io/ioutil" + "io" "os" "path" "path/filepath" @@ -105,7 +105,7 @@ } // Check result. - copy, err := ioutil.ReadAll(result) + copy, err := io.ReadAll(result) expectOk(t, err) if !bytes.Equal(copy, blob) { t.Fatalf("Result not the same") @@ -143,7 +143,7 @@ res, err := obs.Get("A") expectOk(t, err) // first read should be successful - data, err := ioutil.ReadAll(res) + data, err := io.ReadAll(res) expectOk(t, err) if string(data) != "abc" { t.Fatalf("Expected result: 'abc'; got: %s", string(data)) @@ -158,7 +158,7 @@ res, err = obs.Get("A") expectOk(t, err) - _, err = ioutil.ReadAll(res) + _, err = io.ReadAll(res) expectErr(t, err, nats.ErrDigestMismatch) expectErr(t, res.Error(), nats.ErrDigestMismatch) } @@ -205,16 +205,16 @@ blob := make([]byte, 8*1024*1024+33) rand.Read(blob) - tmpFile, err := ioutil.TempFile("", "objfile") + tmpFile, err := os.CreateTemp("", "objfile") expectOk(t, err) defer os.Remove(tmpFile.Name()) // clean up - err = ioutil.WriteFile(tmpFile.Name(), blob, 0600) + err = os.WriteFile(tmpFile.Name(), blob, 0600) expectOk(t, err) _, err = obs.PutFile(tmpFile.Name()) expectOk(t, err) - tmpResult, err := ioutil.TempFile("", "objfileresult") + tmpResult, err := os.CreateTemp("", "objfileresult") expectOk(t, err) defer os.Remove(tmpResult.Name()) // clean up @@ -222,10 +222,10 @@ expectOk(t, err) // Make sure they are the same. - original, err := ioutil.ReadFile(tmpFile.Name()) + original, err := os.ReadFile(tmpFile.Name()) expectOk(t, err) - restored, err := ioutil.ReadFile(tmpResult.Name()) + restored, err := os.ReadFile(tmpResult.Name()) expectOk(t, err) if !bytes.Equal(original, restored) { @@ -244,7 +244,7 @@ expectOk(t, err) numFiles := 0 - fis, _ := ioutil.ReadDir(".") + fis, _ := os.ReadDir(".") for _, fi := range fis { fn := fi.Name() // Just grab clean test files. @@ -268,10 +268,10 @@ _, err = result.Info() expectOk(t, err) - copy, err := ioutil.ReadAll(result) + copy, err := io.ReadAll(result) expectOk(t, err) - orig, err := ioutil.ReadFile(path.Join(".", "object_test.go")) + orig, err := os.ReadFile(path.Join(".", "object_test.go")) expectOk(t, err) if !bytes.Equal(orig, copy) { @@ -929,7 +929,7 @@ for _, test := range tests { t.Run(test.inputFile, func(t *testing.T) { - data, err := ioutil.ReadFile(fmt.Sprintf("./testdata/%s", test.inputFile)) + data, err := os.ReadFile(fmt.Sprintf("./testdata/%s", test.inputFile)) expectOk(t, err) h := sha256.New() h.Write(data) @@ -966,7 +966,7 @@ for _, test := range tests { t.Run(test.expectedFile, func(t *testing.T) { - expected, err := ioutil.ReadFile(fmt.Sprintf("./testdata/%s", test.expectedFile)) + expected, err := os.ReadFile(fmt.Sprintf("./testdata/%s", test.expectedFile)) h := sha256.New() h.Write(expected) expected = h.Sum(nil) diff -Nru golang-github-nats-io-go-nats-1.20.0/timer.go golang-github-nats-io-go-nats-1.22.1/timer.go --- golang-github-nats-io-go-nats-1.20.0/timer.go 2022-11-11 15:16:40.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/timer.go 2022-12-22 14:09:21.000000000 +0000 @@ -1,4 +1,4 @@ -// Copyright 2017-2018 The NATS Authors +// Copyright 2017-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff -Nru golang-github-nats-io-go-nats-1.20.0/timer_test.go golang-github-nats-io-go-nats-1.22.1/timer_test.go --- golang-github-nats-io-go-nats-1.20.0/timer_test.go 2022-11-11 15:16:40.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/timer_test.go 2022-12-22 14:09:21.000000000 +0000 @@ -1,4 +1,4 @@ -// Copyright 2017-2018 The NATS Authors +// Copyright 2017-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff -Nru golang-github-nats-io-go-nats-1.20.0/ws.go golang-github-nats-io-go-nats-1.22.1/ws.go --- golang-github-nats-io-go-nats-1.20.0/ws.go 2022-11-11 15:16:40.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/ws.go 2022-12-22 14:09:21.000000000 +0000 @@ -1,4 +1,4 @@ -// Copyright 2021 The NATS Authors +// Copyright 2021-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -24,7 +24,6 @@ "errors" "fmt" "io" - "io/ioutil" mrand "math/rand" "net/http" "net/url" @@ -170,8 +169,7 @@ } else { d.flate.(flate.Resetter).Reset(d, nil) } - // TODO: When Go 1.15 support is dropped, replace with io.ReadAll() - b, err := ioutil.ReadAll(d.flate) + b, err := io.ReadAll(d.flate) // Now reset the compressed buffers list d.bufs = nil return b, err diff -Nru golang-github-nats-io-go-nats-1.20.0/ws_test.go golang-github-nats-io-go-nats-1.22.1/ws_test.go --- golang-github-nats-io-go-nats-1.20.0/ws_test.go 2022-11-11 15:16:40.000000000 +0000 +++ golang-github-nats-io-go-nats-1.22.1/ws_test.go 2022-12-22 14:09:21.000000000 +0000 @@ -1,4 +1,4 @@ -// Copyright 2021 The NATS Authors +// Copyright 2021-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -868,6 +868,60 @@ } } +type testSkipTLSDialer struct { + dialer *net.Dialer + skipTLS bool +} + +func (sd *testSkipTLSDialer) Dial(network, address string) (net.Conn, error) { + return sd.dialer.Dial(network, address) +} + +func (sd *testSkipTLSDialer) SkipTLSHandshake() bool { + return sd.skipTLS +} + +func TestWSWithTLSCustomDialer(t *testing.T) { + sopts := testWSGetDefaultOptions(t, true) + s := RunServerWithOptions(sopts) + defer s.Shutdown() + + sd := &testSkipTLSDialer{ + dialer: &net.Dialer{ + Timeout: 2 * time.Second, + }, + skipTLS: true, + } + + // Connect with CustomDialer that fails since TLSHandshake is disabled. + copts := make([]Option, 0) + copts = append(copts, Secure(&tls.Config{InsecureSkipVerify: true})) + copts = append(copts, SetCustomDialer(sd)) + _, err := Connect(fmt.Sprintf("wss://localhost:%d", sopts.Websocket.Port), copts...) + if err == nil { + t.Fatalf("Expected error on connect: %v", err) + } + if err.Error() != `invalid websocket connection` { + t.Logf("Expected invalid websocket connection: %v", err) + } + + // Retry with the dialer. + copts = make([]Option, 0) + sd = &testSkipTLSDialer{ + dialer: &net.Dialer{ + Timeout: 2 * time.Second, + }, + skipTLS: false, + } + copts = append(copts, Secure(&tls.Config{InsecureSkipVerify: true})) + copts = append(copts, SetCustomDialer(sd)) + nc, err := Connect(fmt.Sprintf("wss://localhost:%d", sopts.Websocket.Port), copts...) + if err != nil { + t.Fatalf("Unexpected error on connect: %v", err) + } + defer nc.Close() +} + func TestWSTlsNoConfig(t *testing.T) { opts := GetDefaultOptions() opts.Servers = []string{"wss://localhost:443"}