diff -Nru golang-github-samuel-go-zookeeper-0.0~git20161029.0.1d7be4e/debian/changelog golang-github-samuel-go-zookeeper-0.0~git20180130.c4fab1a/debian/changelog --- golang-github-samuel-go-zookeeper-0.0~git20161029.0.1d7be4e/debian/changelog 2016-11-04 03:13:44.000000000 +0000 +++ golang-github-samuel-go-zookeeper-0.0~git20180130.c4fab1a/debian/changelog 2018-03-06 08:26:14.000000000 +0000 @@ -1,8 +1,23 @@ +golang-github-samuel-go-zookeeper (0.0~git20180130.c4fab1a-1) unstable; urgency=medium + + * Team upload. + * New upstream release. + * d/control: + + Bump Standards-Version. + + Fix testsuite-autopkgtest-missing. + * d/copyright: + + Switch the copyright-format URL to https. + + Drop copyright section about removed files. + * Add d/gbp.conf. + + -- Mpampis Kostas Tue, 06 Mar 2018 10:26:14 +0200 + golang-github-samuel-go-zookeeper (0.0~git20161029.0.1d7be4e-1) unstable; urgency=medium [ Paul Tagliamonte ] * Team upload. * Use a secure transport for the Vcs-Git and Vcs-Browser URL + * Remove Built-Using from arch:all -dev package [ Martín Ferrari ] * Merge upstream branch, so work is possible. diff -Nru golang-github-samuel-go-zookeeper-0.0~git20161029.0.1d7be4e/debian/control golang-github-samuel-go-zookeeper-0.0~git20180130.c4fab1a/debian/control --- golang-github-samuel-go-zookeeper-0.0~git20161029.0.1d7be4e/debian/control 2016-11-04 03:13:44.000000000 +0000 +++ golang-github-samuel-go-zookeeper-0.0~git20180130.c4fab1a/debian/control 2018-03-06 08:26:14.000000000 +0000 @@ -1,6 +1,6 @@ Source: golang-github-samuel-go-zookeeper Section: devel -Priority: extra +Priority: optional Maintainer: pkg-go Uploaders: Dmitry Smirnov , Martín Ferrari , @@ -8,16 +8,16 @@ Build-Depends: debhelper (>= 9), dh-golang (>= 1.17~), golang-any, -Standards-Version: 3.9.8 +Standards-Version: 4.1.3 Homepage: https://github.com/samuel/go-zookeeper Vcs-Browser: https://anonscm.debian.org/cgit/pkg-go/packages/golang-github-samuel-go-zookeeper.git Vcs-Git: https://anonscm.debian.org/git/pkg-go/packages/golang-github-samuel-go-zookeeper.git XS-Go-Import-Path: github.com/samuel/go-zookeeper +Testsuite: autopkgtest-pkg-go Package: golang-github-samuel-go-zookeeper-dev Architecture: all Depends: ${misc:Depends}, ${shlibs:Depends}, -Built-Using: ${misc:Built-Using} Description: native ZooKeeper client for Go Native Go Zookeeper Client Library diff -Nru golang-github-samuel-go-zookeeper-0.0~git20161029.0.1d7be4e/debian/copyright golang-github-samuel-go-zookeeper-0.0~git20180130.c4fab1a/debian/copyright --- golang-github-samuel-go-zookeeper-0.0~git20161029.0.1d7be4e/debian/copyright 2016-11-04 03:13:44.000000000 +0000 +++ golang-github-samuel-go-zookeeper-0.0~git20180130.c4fab1a/debian/copyright 2018-03-06 08:26:14.000000000 +0000 @@ -1,4 +1,4 @@ -Format: http://www.debian.org/doc/packaging-manuals/copyright-format/1.0/ +Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/ Upstream-Name: go-zookeeper Source: https://github.com/samuel/go-zookeeper @@ -10,11 +10,6 @@ Copyright: 2015 Dmitry Smirnov License: GPL-3+ -Files: debian/patches/* -Copyright: 2015 Dmitry Smirnov -License: GPL-3+ or BSD-3-clause -Comment: patches can be licensed under the same terms as upstream. - License: BSD-3-clause Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: diff -Nru golang-github-samuel-go-zookeeper-0.0~git20161029.0.1d7be4e/debian/gbp.conf golang-github-samuel-go-zookeeper-0.0~git20180130.c4fab1a/debian/gbp.conf --- golang-github-samuel-go-zookeeper-0.0~git20161029.0.1d7be4e/debian/gbp.conf 1970-01-01 00:00:00.000000000 +0000 +++ golang-github-samuel-go-zookeeper-0.0~git20180130.c4fab1a/debian/gbp.conf 2018-03-06 08:26:14.000000000 +0000 @@ -0,0 +1,2 @@ +[DEFAULT] +pristine-tar = True diff -Nru golang-github-samuel-go-zookeeper-0.0~git20161029.0.1d7be4e/.travis.yml golang-github-samuel-go-zookeeper-0.0~git20180130.c4fab1a/.travis.yml --- golang-github-samuel-go-zookeeper-0.0~git20161029.0.1d7be4e/.travis.yml 2016-10-28 23:23:40.000000000 +0000 +++ golang-github-samuel-go-zookeeper-0.0~git20180130.c4fab1a/.travis.yml 2018-03-02 14:37:44.000000000 +0000 @@ -1,6 +1,9 @@ language: go go: - - 1.7 + - 1.9 + +jdk: + - oraclejdk9 sudo: false @@ -9,12 +12,13 @@ - master before_install: - - wget http://apache.claz.org/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz - - tar -zxvf zookeeper*tar.gz + - wget http://apache.cs.utah.edu/zookeeper/zookeeper-${zk_version}/zookeeper-${zk_version}.tar.gz + - tar -zxvf zookeeper*tar.gz && zip -d zookeeper-${zk_version}/contrib/fatjar/zookeeper-${zk_version}-fatjar.jar 'META-INF/*.SF' 'META-INF/*.DSA' - go get github.com/mattn/goveralls - go get golang.org/x/tools/cmd/cover script: + - jdk_switcher use oraclejdk9 - go build ./... - go fmt ./... - go vet ./... @@ -25,3 +29,5 @@ env: global: secure: Coha3DDcXmsekrHCZlKvRAc+pMBaQU1QS/3++3YCCUXVDBWgVsC1ZIc9df4RLdZ/ncGd86eoRq/S+zyn1XbnqK5+ePqwJoUnJ59BE8ZyHLWI9ajVn3fND1MTduu/ksGsS79+IYbdVI5wgjSgjD3Ktp6Y5uPl+BPosjYBGdNcHS4= + matrix: + - zk_version=3.4.10 diff -Nru golang-github-samuel-go-zookeeper-0.0~git20161029.0.1d7be4e/zk/cluster_test.go golang-github-samuel-go-zookeeper-0.0~git20180130.c4fab1a/zk/cluster_test.go --- golang-github-samuel-go-zookeeper-0.0~git20161029.0.1d7be4e/zk/cluster_test.go 2016-10-28 23:23:40.000000000 +0000 +++ golang-github-samuel-go-zookeeper-0.0~git20180130.c4fab1a/zk/cluster_test.go 2018-03-02 14:37:44.000000000 +0000 @@ -75,8 +75,7 @@ tc.StopServer(hasSessionEvent1.Server) // Wait for the session to be reconnected with the new leader. - hasSessionWatcher2.Wait(8 * time.Second) - if hasSessionWatcher2 == nil { + if hasSessionWatcher2.Wait(8*time.Second) == nil { t.Fatalf("Failover failed") } diff -Nru golang-github-samuel-go-zookeeper-0.0~git20161029.0.1d7be4e/zk/conn.go golang-github-samuel-go-zookeeper-0.0~git20180130.c4fab1a/zk/conn.go --- golang-github-samuel-go-zookeeper-0.0~git20161029.0.1d7be4e/zk/conn.go 2016-10-28 23:23:40.000000000 +0000 +++ golang-github-samuel-go-zookeeper-0.0~git20180130.c4fab1a/zk/conn.go 2018-03-02 14:37:44.000000000 +0000 @@ -85,6 +85,7 @@ pingInterval time.Duration recvTimeout time.Duration connectTimeout time.Duration + maxBufferSize int creds []authCreds credsMu sync.Mutex // protects server @@ -97,9 +98,15 @@ closeChan chan struct{} // channel to tell send loop stop // Debug (used by unit tests) - reconnectDelay time.Duration + reconnectLatch chan struct{} + setWatchLimit int + setWatchCallback func([]*setWatchesRequest) + // Debug (for recurring re-auth hang) + debugCloseRecvLoop bool + debugReauthDone chan struct{} - logger Logger + logger Logger + logInfo bool // true if information messages are logged; false if only errors are logged buf []byte } @@ -197,10 +204,8 @@ watchers: make(map[watchPathType][]chan Event), passwd: emptyPassword, logger: DefaultLogger, + logInfo: true, // default is true for backwards compatability buf: make([]byte, bufferSize), - - // Debug - reconnectDelay: 0, } // Set provided options. @@ -237,6 +242,21 @@ } } +// WithLogger returns a connection option specifying a non-default Logger +func WithLogger(logger Logger) connOption { + return func(c *Conn) { + c.logger = logger + } +} + +// WithLogInfo returns a connection option specifying whether or not information messages +// shoud be logged. +func WithLogInfo(logInfo bool) connOption { + return func(c *Conn) { + c.logInfo = logInfo + } +} + // EventCallback is a function that is called when an Event occurs. type EventCallback func(Event) @@ -249,6 +269,46 @@ } } +// WithMaxBufferSize sets the maximum buffer size used to read and decode +// packets received from the Zookeeper server. The standard Zookeeper client for +// Java defaults to a limit of 1mb. For backwards compatibility, this Go client +// defaults to unbounded unless overridden via this option. A value that is zero +// or negative indicates that no limit is enforced. +// +// This is meant to prevent resource exhaustion in the face of potentially +// malicious data in ZK. It should generally match the server setting (which +// also defaults ot 1mb) so that clients and servers agree on the limits for +// things like the size of data in an individual znode and the total size of a +// transaction. +// +// For production systems, this should be set to a reasonable value (ideally +// that matches the server configuration). For ops tooling, it is handy to use a +// much larger limit, in order to do things like clean-up problematic state in +// the ZK tree. For example, if a single znode has a huge number of children, it +// is possible for the response to a "list children" operation to exceed this +// buffer size and cause errors in clients. The only way to subsequently clean +// up the tree (by removing superfluous children) is to use a client configured +// with a larger buffer size that can successfully query for all of the child +// names and then remove them. (Note there are other tools that can list all of +// the child names without an increased buffer size in the client, but they work +// by inspecting the servers' transaction logs to enumerate children instead of +// sending an online request to a server. +func WithMaxBufferSize(maxBufferSize int) connOption { + return func(c *Conn) { + c.maxBufferSize = maxBufferSize + } +} + +// WithMaxConnBufferSize sets maximum buffer size used to send and encode +// packets to Zookeeper server. The standard Zookeepeer client for java defaults +// to a limit of 1mb. This option should be used for non-standard server setup +// where znode is bigger than default 1mb. +func WithMaxConnBufferSize(maxBufferSize int) connOption { + return func(c *Conn) { + c.buf = make([]byte, maxBufferSize) + } +} + func (c *Conn) Close() { close(c.shouldQuit) @@ -321,7 +381,9 @@ if err == nil { c.conn = zkConn c.setState(StateConnected) - c.logger.Printf("Connected to %s", c.Server()) + if c.logInfo { + c.logger.Printf("Connected to %s", c.Server()) + } return nil } @@ -330,15 +392,32 @@ } func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) { + shouldCancel := func() bool { + select { + case <-c.shouldQuit: + return true + case <-c.closeChan: + return true + default: + return false + } + } + c.credsMu.Lock() defer c.credsMu.Unlock() defer close(reauthReadyChan) - c.logger.Printf("Re-submitting `%d` credentials after reconnect", - len(c.creds)) + if c.logInfo { + c.logger.Printf("Re-submitting `%d` credentials after reconnect", + len(c.creds)) + } for _, cred := range c.creds { + if shouldCancel() { + c.logger.Printf("Cancel rer-submitting credentials") + return + } resChan, err := c.sendRequest( opSetAuth, &setAuthRequest{Type: 0, @@ -354,7 +433,16 @@ continue } - res := <-resChan + var res response + select { + case res = <-resChan: + case <-c.closeChan: + c.logger.Printf("Recv closed, cancel re-submitting credentials") + return + case <-c.shouldQuit: + c.logger.Printf("Should quit, cancel re-submitting credentials") + return + } if res.err != nil { c.logger.Printf("Credential re-submit failed: %s", res.err) // FIXME(prozlach): lets ignore errors for now @@ -404,7 +492,9 @@ c.logger.Printf("Authentication failed: %s", err) c.conn.Close() case err == nil: - c.logger.Printf("Authenticated: id=%d, timeout=%d", c.SessionID(), c.sessionTimeoutMs) + if c.logInfo { + c.logger.Printf("Authenticated: id=%d, timeout=%d", c.SessionID(), c.sessionTimeoutMs) + } c.hostProvider.Connected() // mark success c.closeChan = make(chan struct{}) // channel to tell send loop stop reauthChan := make(chan struct{}) // channel to tell send loop that authdata has been resubmitted @@ -413,16 +503,28 @@ wg.Add(1) go func() { <-reauthChan + if c.debugCloseRecvLoop { + close(c.debugReauthDone) + } err := c.sendLoop() - c.logger.Printf("Send loop terminated: err=%v", err) + if err != nil || c.logInfo { + c.logger.Printf("Send loop terminated: err=%v", err) + } c.conn.Close() // causes recv loop to EOF/exit wg.Done() }() wg.Add(1) go func() { - err := c.recvLoop(c.conn) - c.logger.Printf("Recv loop terminated: err=%v", err) + var err error + if c.debugCloseRecvLoop { + err = errors.New("DEBUG: close recv loop") + } else { + err = c.recvLoop(c.conn) + } + if err != io.EOF || c.logInfo { + c.logger.Printf("Recv loop terminated: err=%v", err) + } if err == nil { panic("zk: recvLoop should never return nil error") } @@ -450,11 +552,11 @@ } c.flushRequests(err) - if c.reconnectDelay > 0 { + if c.reconnectLatch != nil { select { case <-c.shouldQuit: return - case <-time.After(c.reconnectDelay): + case <-c.reconnectLatch: } } } @@ -506,17 +608,41 @@ return } - req := &setWatchesRequest{ - RelativeZxid: c.lastZxid, - DataWatches: make([]string, 0), - ExistWatches: make([]string, 0), - ChildWatches: make([]string, 0), + // NB: A ZK server, by default, rejects packets >1mb. So, if we have too + // many watches to reset, we need to break this up into multiple packets + // to avoid hitting that limit. Mirroring the Java client behavior: we are + // conservative in that we limit requests to 128kb (since server limit is + // is actually configurable and could conceivably be configured smaller + // than default of 1mb). + limit := 128 * 1024 + if c.setWatchLimit > 0 { + limit = c.setWatchLimit } + + var reqs []*setWatchesRequest + var req *setWatchesRequest + var sizeSoFar int + n := 0 for pathType, watchers := range c.watchers { if len(watchers) == 0 { continue } + addlLen := 4 + len(pathType.path) + if req == nil || sizeSoFar+addlLen > limit { + if req != nil { + // add to set of requests that we'll send + reqs = append(reqs, req) + } + sizeSoFar = 28 // fixed overhead of a set-watches packet + req = &setWatchesRequest{ + RelativeZxid: c.lastZxid, + DataWatches: make([]string, 0), + ExistWatches: make([]string, 0), + ChildWatches: make([]string, 0), + } + } + sizeSoFar += addlLen switch pathType.wType { case watchTypeData: req.DataWatches = append(req.DataWatches, pathType.path) @@ -530,12 +656,26 @@ if n == 0 { return } + if req != nil { // don't forget any trailing packet we were building + reqs = append(reqs, req) + } + + if c.setWatchCallback != nil { + c.setWatchCallback(reqs) + } go func() { res := &setWatchesResponse{} - _, err := c.request(opSetWatches, req, res, nil) - if err != nil { - c.logger.Printf("Failed to set previous watches: %s", err.Error()) + // TODO: Pipeline these so queue all of them up before waiting on any + // response. That will require some investigation to make sure there + // aren't failure modes where a blocking write to the channel of requests + // could hang indefinitely and cause this goroutine to leak... + for _, req := range reqs { + _, err := c.request(opSetWatches, req, res, nil) + if err != nil { + c.logger.Printf("Failed to set previous watches: %s", err.Error()) + break + } } }() } @@ -676,7 +816,11 @@ } func (c *Conn) recvLoop(conn net.Conn) error { - buf := make([]byte, bufferSize) + sz := bufferSize + if c.maxBufferSize > 0 && sz > c.maxBufferSize { + sz = c.maxBufferSize + } + buf := make([]byte, sz) for { // package length conn.SetReadDeadline(time.Now().Add(c.recvTimeout)) @@ -687,6 +831,9 @@ blen := int(binary.BigEndian.Uint32(buf[:4])) if cap(buf) < blen { + if c.maxBufferSize > 0 && blen > c.maxBufferSize { + return fmt.Errorf("received packet from server with length %d, which exceeds max buffer size %d", blen, c.maxBufferSize) + } buf = make([]byte, blen) } @@ -831,12 +978,20 @@ } func (c *Conn) Children(path string) ([]string, *Stat, error) { + if err := validatePath(path, false); err != nil { + return nil, nil, err + } + res := &getChildren2Response{} _, err := c.request(opGetChildren2, &getChildren2Request{Path: path, Watch: false}, res, nil) return res.Children, &res.Stat, err } func (c *Conn) ChildrenW(path string) ([]string, *Stat, <-chan Event, error) { + if err := validatePath(path, false); err != nil { + return nil, nil, nil, err + } + var ech <-chan Event res := &getChildren2Response{} _, err := c.request(opGetChildren2, &getChildren2Request{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) { @@ -851,6 +1006,10 @@ } func (c *Conn) Get(path string) ([]byte, *Stat, error) { + if err := validatePath(path, false); err != nil { + return nil, nil, err + } + res := &getDataResponse{} _, err := c.request(opGetData, &getDataRequest{Path: path, Watch: false}, res, nil) return res.Data, &res.Stat, err @@ -858,6 +1017,10 @@ // GetW returns the contents of a znode and sets a watch func (c *Conn) GetW(path string) ([]byte, *Stat, <-chan Event, error) { + if err := validatePath(path, false); err != nil { + return nil, nil, nil, err + } + var ech <-chan Event res := &getDataResponse{} _, err := c.request(opGetData, &getDataRequest{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) { @@ -872,15 +1035,20 @@ } func (c *Conn) Set(path string, data []byte, version int32) (*Stat, error) { - if path == "" { - return nil, ErrInvalidPath + if err := validatePath(path, false); err != nil { + return nil, err } + res := &setDataResponse{} _, err := c.request(opSetData, &SetDataRequest{path, data, version}, res, nil) return &res.Stat, err } func (c *Conn) Create(path string, data []byte, flags int32, acl []ACL) (string, error) { + if err := validatePath(path, flags&FlagSequence == FlagSequence); err != nil { + return "", err + } + res := &createResponse{} _, err := c.request(opCreate, &CreateRequest{path, data, acl, flags}, res, nil) return res.Path, err @@ -891,6 +1059,10 @@ // ephemeral node still exists. Therefore, on reconnect we need to check if a node // with a GUID generated on create exists. func (c *Conn) CreateProtectedEphemeralSequential(path string, data []byte, acl []ACL) (string, error) { + if err := validatePath(path, true); err != nil { + return "", err + } + var guid [16]byte _, err := io.ReadFull(rand.Reader, guid[:16]) if err != nil { @@ -932,11 +1104,19 @@ } func (c *Conn) Delete(path string, version int32) error { + if err := validatePath(path, false); err != nil { + return err + } + _, err := c.request(opDelete, &DeleteRequest{path, version}, &deleteResponse{}, nil) return err } func (c *Conn) Exists(path string) (bool, *Stat, error) { + if err := validatePath(path, false); err != nil { + return false, nil, err + } + res := &existsResponse{} _, err := c.request(opExists, &existsRequest{Path: path, Watch: false}, res, nil) exists := true @@ -948,6 +1128,10 @@ } func (c *Conn) ExistsW(path string) (bool, *Stat, <-chan Event, error) { + if err := validatePath(path, false); err != nil { + return false, nil, nil, err + } + var ech <-chan Event res := &existsResponse{} _, err := c.request(opExists, &existsRequest{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) { @@ -969,17 +1153,29 @@ } func (c *Conn) GetACL(path string) ([]ACL, *Stat, error) { + if err := validatePath(path, false); err != nil { + return nil, nil, err + } + res := &getAclResponse{} _, err := c.request(opGetAcl, &getAclRequest{Path: path}, res, nil) return res.Acl, &res.Stat, err } func (c *Conn) SetACL(path string, acl []ACL, version int32) (*Stat, error) { + if err := validatePath(path, false); err != nil { + return nil, err + } + res := &setAclResponse{} _, err := c.request(opSetAcl, &setAclRequest{Path: path, Acl: acl, Version: version}, res, nil) return &res.Stat, err } func (c *Conn) Sync(path string) (string, error) { + if err := validatePath(path, false); err != nil { + return "", err + } + res := &syncResponse{} _, err := c.request(opSync, &syncRequest{Path: path}, res, nil) return res.Path, err diff -Nru golang-github-samuel-go-zookeeper-0.0~git20161029.0.1d7be4e/zk/conn_test.go golang-github-samuel-go-zookeeper-0.0~git20180130.c4fab1a/zk/conn_test.go --- golang-github-samuel-go-zookeeper-0.0~git20161029.0.1d7be4e/zk/conn_test.go 1970-01-01 00:00:00.000000000 +0000 +++ golang-github-samuel-go-zookeeper-0.0~git20180130.c4fab1a/zk/conn_test.go 2018-03-02 14:37:44.000000000 +0000 @@ -0,0 +1,57 @@ +package zk + +import ( + "io/ioutil" + "testing" + "time" +) + +func TestRecurringReAuthHang(t *testing.T) { + t.Skip("Race condition in test") + + sessionTimeout := 2 * time.Second + + finish := make(chan struct{}) + defer close(finish) + go func() { + select { + case <-finish: + return + case <-time.After(5 * sessionTimeout): + panic("expected not hang") + } + }() + + zkC, err := StartTestCluster(2, ioutil.Discard, ioutil.Discard) + if err != nil { + panic(err) + } + defer zkC.Stop() + + conn, evtC, err := zkC.ConnectAll() + if err != nil { + panic(err) + } + for conn.State() != StateHasSession { + time.Sleep(50 * time.Millisecond) + } + + go func() { + for range evtC { + } + }() + + // Add auth. + conn.AddAuth("digest", []byte("test:test")) + + currentServer := conn.Server() + conn.debugCloseRecvLoop = true + conn.debugReauthDone = make(chan struct{}) + zkC.StopServer(currentServer) + // wait connect to new zookeeper. + for conn.Server() == currentServer && conn.State() != StateHasSession { + time.Sleep(100 * time.Millisecond) + } + + <-conn.debugReauthDone +} diff -Nru golang-github-samuel-go-zookeeper-0.0~git20161029.0.1d7be4e/zk/util.go golang-github-samuel-go-zookeeper-0.0~git20180130.c4fab1a/zk/util.go --- golang-github-samuel-go-zookeeper-0.0~git20161029.0.1d7be4e/zk/util.go 2016-10-28 23:23:40.000000000 +0000 +++ golang-github-samuel-go-zookeeper-0.0~git20180130.c4fab1a/zk/util.go 2018-03-02 14:37:44.000000000 +0000 @@ -7,6 +7,7 @@ "math/rand" "strconv" "strings" + "unicode/utf8" ) // AuthACL produces an ACL list containing a single ACL which uses the @@ -52,3 +53,64 @@ s[i], s[j] = s[j], s[i] } } + +// validatePath will make sure a path is valid before sending the request +func validatePath(path string, isSequential bool) error { + if path == "" { + return ErrInvalidPath + } + + if path[0] != '/' { + return ErrInvalidPath + } + + n := len(path) + if n == 1 { + // path is just the root + return nil + } + + if !isSequential && path[n-1] == '/' { + return ErrInvalidPath + } + + // Start at rune 1 since we already know that the first character is + // a '/'. + for i, w := 1, 0; i < n; i += w { + r, width := utf8.DecodeRuneInString(path[i:]) + switch { + case r == '\u0000': + return ErrInvalidPath + case r == '/': + last, _ := utf8.DecodeLastRuneInString(path[:i]) + if last == '/' { + return ErrInvalidPath + } + case r == '.': + last, lastWidth := utf8.DecodeLastRuneInString(path[:i]) + + // Check for double dot + if last == '.' { + last, _ = utf8.DecodeLastRuneInString(path[:i-lastWidth]) + } + + if last == '/' { + if i+1 == n { + return ErrInvalidPath + } + + next, _ := utf8.DecodeRuneInString(path[i+w:]) + if next == '/' { + return ErrInvalidPath + } + } + case r >= '\u0000' && r <= '\u001f', + r >= '\u007f' && r <= '\u009f', + r >= '\uf000' && r <= '\uf8ff', + r >= '\ufff0' && r < '\uffff': + return ErrInvalidPath + } + w = width + } + return nil +} diff -Nru golang-github-samuel-go-zookeeper-0.0~git20161029.0.1d7be4e/zk/util_test.go golang-github-samuel-go-zookeeper-0.0~git20180130.c4fab1a/zk/util_test.go --- golang-github-samuel-go-zookeeper-0.0~git20161029.0.1d7be4e/zk/util_test.go 2016-10-28 23:23:40.000000000 +0000 +++ golang-github-samuel-go-zookeeper-0.0~git20180130.c4fab1a/zk/util_test.go 2018-03-02 14:37:44.000000000 +0000 @@ -12,3 +12,42 @@ } } } + +func TestValidatePath(t *testing.T) { + tt := []struct { + path string + seq bool + valid bool + }{ + {"/this is / a valid/path", false, true}, + {"/", false, true}, + {"", false, false}, + {"not/valid", false, false}, + {"/ends/with/slash/", false, false}, + {"/sequential/", true, true}, + {"/test\u0000", false, false}, + {"/double//slash", false, false}, + {"/single/./period", false, false}, + {"/double/../period", false, false}, + {"/double/..ok/period", false, true}, + {"/double/alsook../period", false, true}, + {"/double/period/at/end/..", false, false}, + {"/name/with.period", false, true}, + {"/test\u0001", false, false}, + {"/test\u001f", false, false}, + {"/test\u0020", false, true}, // first allowable + {"/test\u007e", false, true}, // last valid ascii + {"/test\u007f", false, false}, + {"/test\u009f", false, false}, + {"/test\uf8ff", false, false}, + {"/test\uffef", false, true}, + {"/test\ufff0", false, false}, + } + + for _, tc := range tt { + err := validatePath(tc.path, tc.seq) + if (err != nil) == tc.valid { + t.Errorf("failed to validate path %q", tc.path) + } + } +} diff -Nru golang-github-samuel-go-zookeeper-0.0~git20161029.0.1d7be4e/zk/zk_test.go golang-github-samuel-go-zookeeper-0.0~git20180130.c4fab1a/zk/zk_test.go --- golang-github-samuel-go-zookeeper-0.0~git20161029.0.1d7be4e/zk/zk_test.go 2016-10-28 23:23:40.000000000 +0000 +++ golang-github-samuel-go-zookeeper-0.0~git20180130.c4fab1a/zk/zk_test.go 2018-03-02 14:37:44.000000000 +0000 @@ -6,7 +6,12 @@ "fmt" "io" "net" + "reflect" + "regexp" + "sort" "strings" + "sync" + "sync/atomic" "testing" "time" ) @@ -464,7 +469,12 @@ } defer zk.Close() - zk.reconnectDelay = time.Second + zk.reconnectLatch = make(chan struct{}) + zk.setWatchLimit = 1024 // break up set-watch step into 1k requests + var setWatchReqs atomic.Value + zk.setWatchCallback = func(reqs []*setWatchesRequest) { + setWatchReqs.Store(reqs) + } zk2, _, err := ts.ConnectAll() if err != nil { @@ -476,14 +486,27 @@ t.Fatalf("Delete returned error: %+v", err) } - testPath, err := zk.Create("/gozk-test-2", []byte{}, 0, WorldACL(PermAll)) - if err != nil { - t.Fatalf("Create returned: %+v", err) - } + testPaths := map[string]<-chan Event{} + defer func() { + // clean up all of the test paths we create + for p := range testPaths { + zk2.Delete(p, -1) + } + }() - _, _, testEvCh, err := zk.GetW(testPath) - if err != nil { - t.Fatalf("GetW returned: %+v", err) + // we create lots of paths to watch, to make sure a "set watches" request + // on re-create will be too big and be required to span multiple packets + for i := 0; i < 1000; i++ { + testPath, err := zk.Create(fmt.Sprintf("/gozk-test-%d", i), []byte{}, 0, WorldACL(PermAll)) + if err != nil { + t.Fatalf("Create returned: %+v", err) + } + testPaths[testPath] = nil + _, _, testEvCh, err := zk.GetW(testPath) + if err != nil { + t.Fatalf("GetW returned: %+v", err) + } + testPaths[testPath] = testEvCh } children, stat, childCh, err := zk.ChildrenW("/") @@ -497,28 +520,48 @@ // Simulate network error by brutally closing the network connection. zk.conn.Close() - if err := zk2.Delete(testPath, -1); err != nil && err != ErrNoNode { - t.Fatalf("Delete returned error: %+v", err) + for p := range testPaths { + if err := zk2.Delete(p, -1); err != nil && err != ErrNoNode { + t.Fatalf("Delete returned error: %+v", err) + } } - // Allow some time for the `zk` session to reconnect and set watches. - time.Sleep(time.Millisecond * 100) - if path, err := zk2.Create("/gozk-test", []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil { t.Fatalf("Create returned error: %+v", err) } else if path != "/gozk-test" { t.Fatalf("Create returned different path '%s' != '/gozk-test'", path) } - select { - case ev := <-testEvCh: - if ev.Err != nil { - t.Fatalf("GetW watcher error %+v", ev.Err) + time.Sleep(100 * time.Millisecond) + + // zk should still be waiting to reconnect, so none of the watches should have been triggered + for p, ch := range testPaths { + select { + case <-ch: + t.Fatalf("GetW watcher for %q should not have triggered yet", p) + default: } - if ev.Path != testPath { - t.Fatalf("GetW watcher wrong path %s instead of %s", ev.Path, testPath) + } + select { + case <-childCh: + t.Fatalf("ChildrenW watcher should not have triggered yet") + default: + } + + // now we let the reconnect occur and make sure it resets watches + close(zk.reconnectLatch) + + for p, ch := range testPaths { + select { + case ev := <-ch: + if ev.Err != nil { + t.Fatalf("GetW watcher error %+v", ev.Err) + } + if ev.Path != p { + t.Fatalf("GetW watcher wrong path %s instead of %s", ev.Path, p) + } + case <-time.After(2 * time.Second): + t.Fatal("GetW watcher timed out") } - case <-time.After(2 * time.Second): - t.Fatal("GetW watcher timed out") } select { @@ -532,6 +575,29 @@ case <-time.After(2 * time.Second): t.Fatal("Child watcher timed out") } + + // Yay! All watches fired correctly. Now we also inspect the actual set-watch request objects + // to ensure they didn't exceed the expected packet set. + buf := make([]byte, bufferSize) + totalWatches := 0 + actualReqs := setWatchReqs.Load().([]*setWatchesRequest) + if len(actualReqs) < 12 { + // sanity check: we should have generated *at least* 12 requests to reset watches + t.Fatalf("too few setWatchesRequest messages: %d", len(actualReqs)) + } + for _, r := range actualReqs { + totalWatches += len(r.ChildWatches) + len(r.DataWatches) + len(r.ExistWatches) + n, err := encodePacket(buf, r) + if err != nil { + t.Fatalf("encodePacket failed: %v! request:\n%+v", err, r) + } else if n > 1024 { + t.Fatalf("setWatchesRequest exceeded allowed size (%d > 1024)! request:\n%+v", n, r) + } + } + + if totalWatches != len(testPaths)+1 { + t.Fatalf("setWatchesRequests did not include all expected watches; expecting %d, got %d", len(testPaths)+1, totalWatches) + } } func TestExpiringWatch(t *testing.T) { @@ -716,3 +782,158 @@ }() return ln.Addr().String(), stopCh, nil } + +func TestMaxBufferSize(t *testing.T) { + ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) + if err != nil { + t.Fatal(err) + } + defer ts.Stop() + // no buffer size + zk, _, err := ts.ConnectWithOptions(15 * time.Second) + var l testLogger + if err != nil { + t.Fatalf("Connect returned error: %+v", err) + } + defer zk.Close() + // 1k buffer size, logs to custom test logger + zkLimited, _, err := ts.ConnectWithOptions(15*time.Second, WithMaxBufferSize(1024), func(conn *Conn) { + conn.SetLogger(&l) + }) + if err != nil { + t.Fatalf("Connect returned error: %+v", err) + } + defer zkLimited.Close() + + // With small node with small number of children + data := []byte{101, 102, 103, 103} + _, err = zk.Create("/foo", data, 0, WorldACL(PermAll)) + if err != nil { + t.Fatalf("Create returned error: %+v", err) + } + var children []string + for i := 0; i < 4; i++ { + childName, err := zk.Create("/foo/child", nil, FlagEphemeral|FlagSequence, WorldACL(PermAll)) + if err != nil { + t.Fatalf("Create returned error: %+v", err) + } + children = append(children, childName[len("/foo/"):]) // strip parent prefix from name + } + sort.Strings(children) + + // Limited client works fine + resultData, _, err := zkLimited.Get("/foo") + if err != nil { + t.Fatalf("Get returned error: %+v", err) + } + if !reflect.DeepEqual(resultData, data) { + t.Fatalf("Get returned unexpected data; expecting %+v, got %+v", data, resultData) + } + resultChildren, _, err := zkLimited.Children("/foo") + if err != nil { + t.Fatalf("Children returned error: %+v", err) + } + sort.Strings(resultChildren) + if !reflect.DeepEqual(resultChildren, children) { + t.Fatalf("Children returned unexpected names; expecting %+v, got %+v", children, resultChildren) + } + + // With large node though... + data = make([]byte, 1024) + for i := 0; i < 1024; i++ { + data[i] = byte(i) + } + _, err = zk.Create("/bar", data, 0, WorldACL(PermAll)) + if err != nil { + t.Fatalf("Create returned error: %+v", err) + } + _, _, err = zkLimited.Get("/bar") + // NB: Sadly, without actually de-serializing the too-large response packet, we can't send the + // right error to the corresponding outstanding request. So the request just sees ErrConnectionClosed + // while the log will see the actual reason the connection was closed. + expectErr(t, err, ErrConnectionClosed) + expectLogMessage(t, &l, "received packet from server with length .*, which exceeds max buffer size 1024") + + // Or with large number of children... + totalLen := 0 + children = nil + for totalLen < 1024 { + childName, err := zk.Create("/bar/child", nil, FlagEphemeral|FlagSequence, WorldACL(PermAll)) + if err != nil { + t.Fatalf("Create returned error: %+v", err) + } + n := childName[len("/bar/"):] // strip parent prefix from name + children = append(children, n) + totalLen += len(n) + } + sort.Strings(children) + _, _, err = zkLimited.Children("/bar") + expectErr(t, err, ErrConnectionClosed) + expectLogMessage(t, &l, "received packet from server with length .*, which exceeds max buffer size 1024") + + // Other client (without buffer size limit) can successfully query the node and its children, of course + resultData, _, err = zk.Get("/bar") + if err != nil { + t.Fatalf("Get returned error: %+v", err) + } + if !reflect.DeepEqual(resultData, data) { + t.Fatalf("Get returned unexpected data; expecting %+v, got %+v", data, resultData) + } + resultChildren, _, err = zk.Children("/bar") + if err != nil { + t.Fatalf("Children returned error: %+v", err) + } + sort.Strings(resultChildren) + if !reflect.DeepEqual(resultChildren, children) { + t.Fatalf("Children returned unexpected names; expecting %+v, got %+v", children, resultChildren) + } +} + +func expectErr(t *testing.T, err error, expected error) { + if err == nil { + t.Fatalf("Get for node that is too large should have returned error!") + } + if err != expected { + t.Fatalf("Get returned wrong error; expecting ErrClosing, got %+v", err) + } +} + +func expectLogMessage(t *testing.T, logger *testLogger, pattern string) { + re := regexp.MustCompile(pattern) + events := logger.Reset() + if len(events) == 0 { + t.Fatalf("Failed to log error; expecting message that matches pattern: %s", pattern) + } + var found []string + for _, e := range events { + if re.Match([]byte(e)) { + found = append(found, e) + } + } + if len(found) == 0 { + t.Fatalf("Failed to log error; expecting message that matches pattern: %s", pattern) + } else if len(found) > 1 { + t.Fatalf("Logged error redundantly %d times:\n%+v", len(found), found) + } +} + +type testLogger struct { + mu sync.Mutex + events []string +} + +func (l *testLogger) Printf(msgFormat string, args ...interface{}) { + msg := fmt.Sprintf(msgFormat, args...) + fmt.Println(msg) + l.mu.Lock() + defer l.mu.Unlock() + l.events = append(l.events, msg) +} + +func (l *testLogger) Reset() []string { + l.mu.Lock() + defer l.mu.Unlock() + ret := l.events + l.events = nil + return ret +}