diff -Nru golang-github-go-redis-redis-6.7.4/cluster.go golang-github-go-redis-redis-6.9.2/cluster.go --- golang-github-go-redis-redis-6.7.4/cluster.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/cluster.go 2018-02-27 14:11:25.000000000 +0000 @@ -2,6 +2,7 @@ import ( "fmt" + "math" "math/rand" "net" "sync" @@ -12,10 +13,10 @@ "github.com/go-redis/redis/internal/hashtag" "github.com/go-redis/redis/internal/pool" "github.com/go-redis/redis/internal/proto" + "github.com/go-redis/redis/internal/singleflight" ) var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes") -var errNilClusterState = fmt.Errorf("redis: cannot load cluster slots") // ClusterOptions are used to configure a cluster client and should be // passed to NewClusterClient. @@ -25,7 +26,7 @@ // The maximum number of retries before giving up. Command is retried // on network errors and MOVED/ASK redirects. - // Default is 16. + // Default is 8. MaxRedirects int // Enables read-only commands on slave nodes. @@ -57,7 +58,7 @@ if opt.MaxRedirects == -1 { opt.MaxRedirects = 0 } else if opt.MaxRedirects == 0 { - opt.MaxRedirects = 16 + opt.MaxRedirects = 8 } if opt.RouteByLatency { @@ -118,11 +119,11 @@ //------------------------------------------------------------------------------ type clusterNode struct { - Client *Client - Latency time.Duration + Client *Client - loading time.Time - generation uint32 + latency uint32 // atomic + generation uint32 // atomic + loading int64 // atomic } func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode { @@ -132,36 +133,69 @@ Client: NewClient(opt), } + node.latency = math.MaxUint32 if clOpt.RouteByLatency { - node.updateLatency() + go node.updateLatency() } return &node } +func (n *clusterNode) Close() error { + return n.Client.Close() +} + +func (n *clusterNode) Test() error { + return n.Client.ClusterInfo().Err() +} + func (n *clusterNode) updateLatency() { const probes = 10 + + var latency uint32 for i := 0; i < probes; i++ { start := time.Now() n.Client.Ping() - n.Latency += time.Since(start) + probe := uint32(time.Since(start) / time.Microsecond) + latency = (latency + probe) / 2 } - n.Latency = n.Latency / probes + atomic.StoreUint32(&n.latency, latency) +} + +func (n *clusterNode) Latency() time.Duration { + latency := atomic.LoadUint32(&n.latency) + return time.Duration(latency) * time.Microsecond +} + +func (n *clusterNode) MarkAsLoading() { + atomic.StoreInt64(&n.loading, time.Now().Unix()) } func (n *clusterNode) Loading() bool { - return !n.loading.IsZero() && time.Since(n.loading) < time.Minute + const minute = int64(time.Minute / time.Second) + + loading := atomic.LoadInt64(&n.loading) + if loading == 0 { + return false + } + if time.Now().Unix()-loading < minute { + return true + } + atomic.StoreInt64(&n.loading, 0) + return false } func (n *clusterNode) Generation() uint32 { - return n.generation + return atomic.LoadUint32(&n.generation) } func (n *clusterNode) SetGeneration(gen uint32) { - if gen < n.generation { - panic("gen < n.generation") + for { + v := atomic.LoadUint32(&n.generation) + if gen < v || atomic.CompareAndSwapUint32(&n.generation, v, gen) { + break + } } - n.generation = gen } //------------------------------------------------------------------------------ @@ -169,18 +203,23 @@ type clusterNodes struct { opt *ClusterOptions - mu sync.RWMutex - addrs []string - nodes map[string]*clusterNode - closed bool + mu sync.RWMutex + allAddrs []string + allNodes map[string]*clusterNode + clusterAddrs []string + closed bool + + nodeCreateGroup singleflight.Group generation uint32 } func newClusterNodes(opt *ClusterOptions) *clusterNodes { return &clusterNodes{ - opt: opt, - nodes: make(map[string]*clusterNode), + opt: opt, + + allAddrs: opt.Addrs, + allNodes: make(map[string]*clusterNode), } } @@ -194,21 +233,29 @@ c.closed = true var firstErr error - for _, node := range c.nodes { + for _, node := range c.allNodes { if err := node.Client.Close(); err != nil && firstErr == nil { firstErr = err } } - c.addrs = nil - c.nodes = nil + + c.allNodes = nil + c.clusterAddrs = nil return firstErr } func (c *clusterNodes) Addrs() ([]string, error) { + var addrs []string c.mu.RLock() closed := c.closed - addrs := c.addrs + if !closed { + if len(c.clusterAddrs) > 0 { + addrs = c.clusterAddrs + } else { + addrs = c.allAddrs + } + } c.mu.RUnlock() if closed { @@ -226,31 +273,23 @@ } // GC removes unused nodes. -func (c *clusterNodes) GC(generation uint32) error { +func (c *clusterNodes) GC(generation uint32) { var collected []*clusterNode c.mu.Lock() - for i := 0; i < len(c.addrs); { - addr := c.addrs[i] - node := c.nodes[addr] + for addr, node := range c.allNodes { if node.Generation() >= generation { - i++ continue } - c.addrs = append(c.addrs[:i], c.addrs[i+1:]...) - delete(c.nodes, addr) + c.clusterAddrs = remove(c.clusterAddrs, addr) + delete(c.allNodes, addr) collected = append(collected, node) } c.mu.Unlock() - var firstErr error for _, node := range collected { - if err := node.Client.Close(); err != nil && firstErr == nil { - firstErr = err - } + _ = node.Client.Close() } - - return firstErr } func (c *clusterNodes) All() ([]*clusterNode, error) { @@ -261,26 +300,36 @@ return nil, pool.ErrClosed } - nodes := make([]*clusterNode, 0, len(c.nodes)) - for _, node := range c.nodes { - nodes = append(nodes, node) + cp := make([]*clusterNode, 0, len(c.allNodes)) + for _, node := range c.allNodes { + cp = append(cp, node) } - return nodes, nil + return cp, nil } func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) { var node *clusterNode - var ok bool + var err error c.mu.RLock() - if !c.closed { - node, ok = c.nodes[addr] + if c.closed { + err = pool.ErrClosed + } else { + node = c.allNodes[addr] } c.mu.RUnlock() - if ok { + if err != nil { + return nil, err + } + if node != nil { return node, nil } + v, err := c.nodeCreateGroup.Do(addr, func() (interface{}, error) { + node := newClusterNode(c.opt, addr) + return node, node.Test() + }) + c.mu.Lock() defer c.mu.Unlock() @@ -288,15 +337,20 @@ return nil, pool.ErrClosed } - node, ok = c.nodes[addr] + node, ok := c.allNodes[addr] if ok { - return node, nil + _ = v.(*clusterNode).Close() + return node, err } + node = v.(*clusterNode) - c.addrs = append(c.addrs, addr) - node = newClusterNode(c.opt, addr) - c.nodes[addr] = node - return node, nil + c.allAddrs = appendIfNotExists(c.allAddrs, addr) + if err == nil { + c.clusterAddrs = append(c.clusterAddrs, addr) + } + c.allNodes[addr] = node + + return node, err } func (c *clusterNodes) Random() (*clusterNode, error) { @@ -305,20 +359,8 @@ return nil, err } - var nodeErr error - for i := 0; i <= c.opt.MaxRedirects; i++ { - n := rand.Intn(len(addrs)) - node, err := c.GetOrCreate(addrs[n]) - if err != nil { - return nil, err - } - - nodeErr = node.Client.ClusterInfo().Err() - if nodeErr == nil { - return node, nil - } - } - return nil, nodeErr + n := rand.Intn(len(addrs)) + return c.GetOrCreate(addrs[n]) } //------------------------------------------------------------------------------ @@ -419,7 +461,7 @@ if n.Loading() { continue } - if node == nil || node.Latency-n.Latency > threshold { + if node == nil || node.Latency()-n.Latency() > threshold { node = n } } @@ -441,13 +483,20 @@ type ClusterClient struct { cmdable - opt *ClusterOptions - nodes *clusterNodes - _state atomic.Value + opt *ClusterOptions + nodes *clusterNodes + + _state atomic.Value + stateErrMu sync.RWMutex + stateErr error cmdsInfoOnce internal.Once cmdsInfo map[string]*CommandInfo + process func(Cmder) error + processPipeline func([]Cmder) error + processTxPipeline func([]Cmder) error + // Reports whether slots reloading is in progress. reloading uint32 } @@ -461,22 +510,14 @@ opt: opt, nodes: newClusterNodes(opt), } - c.setProcessor(c.Process) - // Add initial nodes. - for _, addr := range opt.Addrs { - _, _ = c.nodes.GetOrCreate(addr) - } + c.process = c.defaultProcess + c.processPipeline = c.defaultProcessPipeline + c.processTxPipeline = c.defaultProcessTxPipeline - // Preload cluster slots. - for i := 0; i < 10; i++ { - state, err := c.reloadState() - if err == nil { - c._state.Store(state) - break - } - } + c.cmdable.setProcessor(c.Process) + c.reloadState() if opt.IdleCheckFrequency > 0 { go c.reaper(opt.IdleCheckFrequency) } @@ -493,21 +534,6 @@ return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff) } -func (c *ClusterClient) state() (*clusterState, error) { - v := c._state.Load() - if v != nil { - return v.(*clusterState), nil - } - - _, err := c.nodes.Addrs() - if err != nil { - return nil, err - } - - c.lazyReloadState() - return nil, errNilClusterState -} - func (c *ClusterClient) cmdInfo(name string) *CommandInfo { err := c.cmdsInfoOnce.Do(func() error { node, err := c.nodes.Random() @@ -533,16 +559,27 @@ return info } +func cmdSlot(cmd Cmder, pos int) int { + if pos == 0 { + return hashtag.RandomSlot() + } + firstKey := cmd.stringArg(pos) + return hashtag.Slot(firstKey) +} + func (c *ClusterClient) cmdSlot(cmd Cmder) int { cmdInfo := c.cmdInfo(cmd.Name()) - firstKey := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo)) - return hashtag.Slot(firstKey) + return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo)) } -func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) { +func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) { + state, err := c.state() + if err != nil { + return 0, nil, err + } + cmdInfo := c.cmdInfo(cmd.Name()) - firstKey := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo)) - slot := hashtag.Slot(firstKey) + slot := cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo)) if cmdInfo != nil && cmdInfo.ReadOnly && c.opt.ReadOnly { if c.opt.RouteByLatency { @@ -558,16 +595,24 @@ return slot, node, err } +func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) { + state, err := c.state() + if err != nil { + return nil, err + } + + nodes := state.slotNodes(slot) + if len(nodes) > 0 { + return nodes[0], nil + } + return c.nodes.Random() +} + func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { if len(keys) == 0 { return fmt.Errorf("redis: keys don't hash to the same slot") } - state, err := c.state() - if err != nil { - return err - } - slot := hashtag.Slot(keys[0]) for _, key := range keys[1:] { if hashtag.Slot(key) != slot { @@ -575,7 +620,7 @@ } } - node, err := state.slotMasterNode(slot) + node, err := c.slotMasterNode(slot) if err != nil { return err } @@ -590,6 +635,10 @@ break } + if internal.IsRetryableError(err, true) { + continue + } + moved, ask, addr := internal.IsMovedError(err) if moved || ask { c.lazyReloadState() @@ -600,6 +649,14 @@ continue } + if err == pool.ErrClosed { + node, err = c.slotMasterNode(slot) + if err != nil { + return err + } + continue + } + return err } @@ -614,31 +671,40 @@ return c.nodes.Close() } -func (c *ClusterClient) Process(cmd Cmder) error { - state, err := c.state() - if err != nil { - cmd.setErr(err) - return err - } +func (c *ClusterClient) WrapProcess( + fn func(oldProcess func(Cmder) error) func(Cmder) error, +) { + c.process = fn(c.process) +} - _, node, err := c.cmdSlotAndNode(state, cmd) - if err != nil { - cmd.setErr(err) - return err - } +func (c *ClusterClient) Process(cmd Cmder) error { + return c.process(cmd) +} +func (c *ClusterClient) defaultProcess(cmd Cmder) error { + var node *clusterNode var ask bool for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { if attempt > 0 { time.Sleep(c.retryBackoff(attempt)) } + if node == nil { + var err error + _, node, err = c.cmdSlotAndNode(cmd) + if err != nil { + cmd.setErr(err) + break + } + } + + var err error if ask { pipe := node.Client.Pipeline() - pipe.Process(NewCmd("ASKING")) - pipe.Process(cmd) + _ = pipe.Process(NewCmd("ASKING")) + _ = pipe.Process(cmd) _, err = pipe.Exec() - pipe.Close() + _ = pipe.Close() ask = false } else { err = node.Client.Process(cmd) @@ -651,15 +717,13 @@ // If slave is loading - read from master. if c.opt.ReadOnly && internal.IsLoadingError(err) { - // TODO: race - node.loading = time.Now() + node.MarkAsLoading() continue } if internal.IsRetryableError(err, true) { - var nodeErr error - node, nodeErr = c.nodes.Random() - if nodeErr != nil { + node, err = c.nodes.Random() + if err != nil { break } continue @@ -671,14 +735,18 @@ if moved || ask { c.lazyReloadState() - var nodeErr error - node, nodeErr = c.nodes.GetOrCreate(addr) - if nodeErr != nil { + node, err = c.nodes.GetOrCreate(addr) + if err != nil { break } continue } + if err == pool.ErrClosed { + node = nil + continue + } + break } @@ -828,31 +896,39 @@ if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) { return } - go func() { - defer atomic.StoreUint32(&c.reloading, 0) - - for { - state, err := c.reloadState() - if err == pool.ErrClosed { - return - } + if c.reloadState() { + time.Sleep(time.Second) + } + atomic.StoreUint32(&c.reloading, 0) + }() +} - if err != nil { - time.Sleep(time.Millisecond) - continue - } +func (c *ClusterClient) reloadState() bool { + for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + state, err := c.loadState() + if err == nil { c._state.Store(state) - time.Sleep(5 * time.Second) - c.nodes.GC(state.generation) - break + time.AfterFunc(time.Minute, func() { + c.nodes.GC(state.generation) + }) + return true } - }() + + c.setStateErr(err) + switch err { + case pool.ErrClosed, errClusterNoNodes: + return false + } + } + return false } -// Not thread-safe. -func (c *ClusterClient) reloadState() (*clusterState, error) { +func (c *ClusterClient) loadState() (*clusterState, error) { node, err := c.nodes.Random() if err != nil { return nil, err @@ -866,6 +942,27 @@ return newClusterState(c.nodes, slots, node.Client.opt.Addr) } +func (c *ClusterClient) state() (*clusterState, error) { + v := c._state.Load() + if v != nil { + return v.(*clusterState), nil + } + return nil, c.getStateErr() +} + +func (c *ClusterClient) setStateErr(err error) { + c.stateErrMu.Lock() + c.stateErr = err + c.stateErrMu.Unlock() +} + +func (c *ClusterClient) getStateErr() error { + c.stateErrMu.RLock() + err := c.stateErr + c.stateErrMu.RUnlock() + return err +} + // reaper closes idle connections to the cluster. func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) { ticker := time.NewTicker(idleCheckFrequency) @@ -888,9 +985,9 @@ func (c *ClusterClient) Pipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExec, + exec: c.processPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } @@ -898,7 +995,13 @@ return c.Pipeline().Pipelined(fn) } -func (c *ClusterClient) pipelineExec(cmds []Cmder) error { +func (c *ClusterClient) WrapProcessPipeline( + fn func(oldProcess func([]Cmder) error) func([]Cmder) error, +) { + c.processPipeline = fn(c.processPipeline) +} + +func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error { cmdsMap, err := c.mapCmdsByNode(cmds) if err != nil { setCmdsErr(cmds, err) @@ -915,7 +1018,11 @@ for node, cmds := range cmdsMap { cn, _, err := node.Client.getConn() if err != nil { - setCmdsErr(cmds, err) + if err == pool.ErrClosed { + c.remapCmds(cmds, failedCmds) + } else { + setCmdsErr(cmds, err) + } continue } @@ -955,18 +1062,32 @@ return cmdsMap, nil } +func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cmder) { + remappedCmds, err := c.mapCmdsByNode(cmds) + if err != nil { + setCmdsErr(cmds, err) + return + } + + for node, cmds := range remappedCmds { + failedCmds[node] = cmds + } +} + func (c *ClusterClient) pipelineProcessCmds( node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, ) error { - cn.SetWriteTimeout(c.opt.WriteTimeout) - if err := writeCmd(cn, cmds...); err != nil { + _ = cn.SetWriteTimeout(c.opt.WriteTimeout) + + err := writeCmd(cn, cmds...) + if err != nil { setCmdsErr(cmds, err) failedCmds[node] = cmds return err } // Set read timeout for all commands. - cn.SetReadTimeout(c.opt.ReadTimeout) + _ = cn.SetReadTimeout(c.opt.ReadTimeout) return c.pipelineReadCmds(cn, cmds, failedCmds) } @@ -1026,9 +1147,9 @@ // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. func (c *ClusterClient) TxPipeline() Pipeliner { pipe := Pipeline{ - exec: c.txPipelineExec, + exec: c.processTxPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } @@ -1036,7 +1157,7 @@ return c.TxPipeline().Pipelined(fn) } -func (c *ClusterClient) txPipelineExec(cmds []Cmder) error { +func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error { state, err := c.state() if err != nil { return err @@ -1061,7 +1182,11 @@ for node, cmds := range cmdsMap { cn, _, err := node.Client.getConn() if err != nil { - setCmdsErr(cmds, err) + if err == pool.ErrClosed { + c.remapCmds(cmds, failedCmds) + } else { + setCmdsErr(cmds, err) + } continue } @@ -1179,12 +1304,7 @@ slot = -1 } - state, err := c.state() - if err != nil { - return nil, err - } - - masterNode, err := state.slotMasterNode(slot) + masterNode, err := c.slotMasterNode(slot) if err != nil { return nil, err } diff -Nru golang-github-go-redis-redis-6.7.4/cluster_test.go golang-github-go-redis-redis-6.9.2/cluster_test.go --- golang-github-go-redis-redis-6.7.4/cluster_test.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/cluster_test.go 2018-02-27 14:11:25.000000000 +0000 @@ -190,13 +190,11 @@ assertClusterClient := func() { It("should GET/SET/DEL", func() { - val, err := client.Get("A").Result() + err := client.Get("A").Err() Expect(err).To(Equal(redis.Nil)) - Expect(val).To(Equal("")) - val, err = client.Set("A", "VALUE", 0).Result() + err = client.Set("A", "VALUE", 0).Err() Expect(err).NotTo(HaveOccurred()) - Expect(val).To(Equal("OK")) Eventually(func() string { return client.Get("A").Val() @@ -295,9 +293,9 @@ } wg.Wait() - n, err := client.Get("key").Int64() - Expect(err).NotTo(HaveOccurred()) - Expect(n).To(Equal(int64(100))) + Eventually(func() string { + return client.Get("key").Val() + }, 30*time.Second).Should(Equal("100")) }) Describe("pipelining", func() { @@ -320,6 +318,14 @@ Expect(err).NotTo(HaveOccurred()) Expect(cmds).To(HaveLen(14)) + _ = client.ForEachNode(func(node *redis.Client) error { + defer GinkgoRecover() + Eventually(func() int64 { + return node.DBSize().Val() + }, 30*time.Second).ShouldNot(BeZero()) + return nil + }) + for _, key := range keys { slot := hashtag.Slot(key) client.SwapSlotNodes(slot) @@ -339,7 +345,7 @@ ttl := cmds[(i*2)+1].(*redis.DurationCmd) dur := time.Duration(i+1) * time.Hour - Expect(ttl.Val()).To(BeNumerically("~", dur, 5*time.Second)) + Expect(ttl.Val()).To(BeNumerically("~", dur, 10*time.Second)) } }) @@ -536,21 +542,51 @@ Expect(nodesList).Should(HaveLen(1)) }) + It("should RANDOMKEY", func() { + const nkeys = 100 + + for i := 0; i < nkeys; i++ { + err := client.Set(fmt.Sprintf("key%d", i), "value", 0).Err() + Expect(err).NotTo(HaveOccurred()) + } + + var keys []string + addKey := func(key string) { + for _, k := range keys { + if k == key { + return + } + } + keys = append(keys, key) + } + + for i := 0; i < nkeys*10; i++ { + key := client.RandomKey().Val() + addKey(key) + } + + Expect(len(keys)).To(BeNumerically("~", nkeys, nkeys/10)) + }) + assertClusterClient() }) Describe("ClusterClient failover", func() { BeforeEach(func() { opt = redisClusterOptions() + opt.MinRetryBackoff = 250 * time.Millisecond + opt.MaxRetryBackoff = time.Second client = cluster.clusterClient(opt) - _ = client.ForEachMaster(func(master *redis.Client) error { - return master.FlushDB().Err() - }) - _ = client.ForEachSlave(func(slave *redis.Client) error { + defer GinkgoRecover() + + _ = client.ForEachMaster(func(master *redis.Client) error { + return master.FlushDB().Err() + }) + Eventually(func() int64 { - return client.DBSize().Val() + return slave.DBSize().Val() }, 30*time.Second).Should(Equal(int64(0))) return slave.ClusterFailover().Err() }) @@ -635,7 +671,7 @@ It("returns an error", func() { err := client.Ping().Err() - Expect(err).To(MatchError("redis: cannot load cluster slots")) + Expect(err).To(MatchError("ERR This instance has cluster support disabled")) }) It("pipeline returns an error", func() { @@ -643,7 +679,7 @@ pipe.Ping() return nil }) - Expect(err).To(MatchError("redis: cannot load cluster slots")) + Expect(err).To(MatchError("ERR This instance has cluster support disabled")) }) }) @@ -691,13 +727,13 @@ }) } - const pause = time.Second + const pause = 3 * time.Second Context("read/write timeout", func() { BeforeEach(func() { opt := redisClusterOptions() - opt.ReadTimeout = 100 * time.Millisecond - opt.WriteTimeout = 100 * time.Millisecond + opt.ReadTimeout = 200 * time.Millisecond + opt.WriteTimeout = 200 * time.Millisecond opt.MaxRedirects = 1 client = cluster.clusterClient(opt) @@ -708,7 +744,8 @@ }) AfterEach(func() { - client.ForEachNode(func(client *redis.Client) error { + _ = client.ForEachNode(func(client *redis.Client) error { + defer GinkgoRecover() Eventually(func() error { return client.Ping().Err() }, 2*pause).ShouldNot(HaveOccurred()) diff -Nru golang-github-go-redis-redis-6.7.4/command.go golang-github-go-redis-redis-6.9.2/command.go --- golang-github-go-redis-redis-6.7.4/command.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/command.go 2018-02-27 14:11:25.000000000 +0000 @@ -10,6 +10,7 @@ "github.com/go-redis/redis/internal" "github.com/go-redis/redis/internal/pool" "github.com/go-redis/redis/internal/proto" + "github.com/go-redis/redis/internal/util" ) type Cmder interface { @@ -81,14 +82,14 @@ case "eval", "evalsha": if cmd.stringArg(2) != "0" { return 3 - } else { - return -1 } + + return 0 case "publish": return 1 } if info == nil { - return -1 + return 0 } return int(info.FirstKeyPos) } @@ -436,7 +437,7 @@ } func (cmd *StringCmd) Val() string { - return internal.BytesToString(cmd.val) + return util.BytesToString(cmd.val) } func (cmd *StringCmd) Result() (string, error) { diff -Nru golang-github-go-redis-redis-6.7.4/commands.go golang-github-go-redis-redis-6.9.2/commands.go --- golang-github-go-redis-redis-6.7.4/commands.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/commands.go 2018-02-27 14:11:25.000000000 +0000 @@ -1,6 +1,7 @@ package redis import ( + "errors" "io" "time" @@ -70,8 +71,10 @@ RenameNX(key, newkey string) *BoolCmd Restore(key string, ttl time.Duration, value string) *StatusCmd RestoreReplace(key string, ttl time.Duration, value string) *StatusCmd - Sort(key string, sort Sort) *StringSliceCmd - SortInterfaces(key string, sort Sort) *SliceCmd + Sort(key string, sort *Sort) *StringSliceCmd + SortStore(key, store string, sort *Sort) *IntCmd + SortInterfaces(key string, sort *Sort) *SliceCmd + Touch(keys ...string) *IntCmd TTL(key string) *DurationCmd Type(key string) *StatusCmd Scan(cursor uint64, match string, count int64) *ScanCmd @@ -252,6 +255,7 @@ Cmdable Auth(password string) *StatusCmd Select(index int) *StatusCmd + SwapDB(index1, index2 int) *StatusCmd ClientSetName(name string) *BoolCmd ReadOnly() *StatusCmd ReadWrite() *StatusCmd @@ -316,6 +320,12 @@ return cmd } +func (c *statefulCmdable) SwapDB(index1, index2 int) *StatusCmd { + cmd := NewStatusCmd("swapdb", index1, index2) + c.process(cmd) + return cmd +} + //------------------------------------------------------------------------------ func (c *cmdable) Del(keys ...string) *IntCmd { @@ -484,11 +494,10 @@ type Sort struct { By string - Offset, Count float64 + Offset, Count int64 Get []string Order string - IsAlpha bool - Store string + Alpha bool } func (sort *Sort) args(key string) []interface{} { @@ -505,27 +514,45 @@ if sort.Order != "" { args = append(args, sort.Order) } - if sort.IsAlpha { + if sort.Alpha { args = append(args, "alpha") } - if sort.Store != "" { - args = append(args, "store", sort.Store) - } return args } -func (c *cmdable) Sort(key string, sort Sort) *StringSliceCmd { +func (c *cmdable) Sort(key string, sort *Sort) *StringSliceCmd { cmd := NewStringSliceCmd(sort.args(key)...) c.process(cmd) return cmd } -func (c *cmdable) SortInterfaces(key string, sort Sort) *SliceCmd { +func (c *cmdable) SortStore(key, store string, sort *Sort) *IntCmd { + args := sort.args(key) + if store != "" { + args = append(args, "store", store) + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) SortInterfaces(key string, sort *Sort) *SliceCmd { cmd := NewSliceCmd(sort.args(key)...) c.process(cmd) return cmd } +func (c *cmdable) Touch(keys ...string) *IntCmd { + args := make([]interface{}, len(keys)+1) + args[0] = "touch" + for i, key := range keys { + args[i+1] = key + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + func (c *cmdable) TTL(key string) *DurationCmd { cmd := NewDurationCmd(time.Second, "ttl", key) c.process(cmd) @@ -677,6 +704,7 @@ return cmd } +// Redis `GET key` command. It returns redis.Nil error when key does not exist. func (c *cmdable) Get(key string) *StringCmd { cmd := NewStringCmd("get", key) c.process(cmd) @@ -1775,7 +1803,7 @@ } } else { // Server did not quit. String reply contains the reason. - cmd.err = internal.RedisError(cmd.val) + cmd.err = errors.New(cmd.val) cmd.val = "" } return cmd diff -Nru golang-github-go-redis-redis-6.7.4/commands_test.go golang-github-go-redis-redis-6.9.2/commands_test.go --- golang-github-go-redis-redis-6.7.4/commands_test.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/commands_test.go 2018-02-27 14:11:25.000000000 +0000 @@ -10,6 +10,7 @@ . "github.com/onsi/gomega" "github.com/go-redis/redis" + "github.com/go-redis/redis/internal/proto" ) var _ = Describe("Commands", func() { @@ -61,12 +62,14 @@ }) It("should Wait", func() { + const wait = 3 * time.Second + // assume testing on single redis instance start := time.Now() - wait := client.Wait(1, time.Second) - Expect(wait.Err()).NotTo(HaveOccurred()) - Expect(wait.Val()).To(Equal(int64(0))) - Expect(time.Now()).To(BeTemporally("~", start.Add(time.Second), 800*time.Millisecond)) + val, err := client.Wait(1, wait).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal(int64(0))) + Expect(time.Now()).To(BeTemporally("~", start.Add(wait), time.Second)) }) It("should Select", func() { @@ -79,6 +82,16 @@ Expect(sel.Val()).To(Equal("OK")) }) + It("should SwapDB", func() { + pipe := client.Pipeline() + sel := pipe.SwapDB(1, 2) + _, err := pipe.Exec() + Expect(err).NotTo(HaveOccurred()) + + Expect(sel.Err()).NotTo(HaveOccurred()) + Expect(sel.Val()).To(Equal("OK")) + }) + It("should BgRewriteAOF", func() { Skip("flaky test") @@ -447,7 +460,7 @@ pttl := client.PTTL("key") Expect(pttl.Err()).NotTo(HaveOccurred()) - Expect(pttl.Val()).To(BeNumerically("~", expiration, 10*time.Millisecond)) + Expect(pttl.Val()).To(BeNumerically("~", expiration, 100*time.Millisecond)) }) It("should PExpireAt", func() { @@ -466,7 +479,7 @@ pttl := client.PTTL("key") Expect(pttl.Err()).NotTo(HaveOccurred()) - Expect(pttl.Val()).To(BeNumerically("~", expiration, 10*time.Millisecond)) + Expect(pttl.Val()).To(BeNumerically("~", expiration, 100*time.Millisecond)) }) It("should PTTL", func() { @@ -481,7 +494,7 @@ pttl := client.PTTL("key") Expect(pttl.Err()).NotTo(HaveOccurred()) - Expect(pttl.Val()).To(BeNumerically("~", expiration, 10*time.Millisecond)) + Expect(pttl.Val()).To(BeNumerically("~", expiration, 100*time.Millisecond)) }) It("should RandomKey", func() { @@ -582,7 +595,7 @@ Expect(err).NotTo(HaveOccurred()) Expect(size).To(Equal(int64(3))) - els, err := client.Sort("list", redis.Sort{ + els, err := client.Sort("list", &redis.Sort{ Offset: 0, Count: 2, Order: "ASC", @@ -608,7 +621,7 @@ Expect(err).NotTo(HaveOccurred()) { - els, err := client.Sort("list", redis.Sort{ + els, err := client.Sort("list", &redis.Sort{ Get: []string{"object_*"}, }).Result() Expect(err).NotTo(HaveOccurred()) @@ -616,7 +629,7 @@ } { - els, err := client.SortInterfaces("list", redis.Sort{ + els, err := client.SortInterfaces("list", &redis.Sort{ Get: []string{"object_*"}, }).Result() Expect(err).NotTo(HaveOccurred()) @@ -624,6 +637,46 @@ } }) + It("should Sort and Store", func() { + size, err := client.LPush("list", "1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(size).To(Equal(int64(1))) + + size, err = client.LPush("list", "3").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(size).To(Equal(int64(2))) + + size, err = client.LPush("list", "2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(size).To(Equal(int64(3))) + + n, err := client.SortStore("list", "list2", &redis.Sort{ + Offset: 0, + Count: 2, + Order: "ASC", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(2))) + + els, err := client.LRange("list2", 0, -1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(els).To(Equal([]string{"1", "2"})) + }) + + It("should Touch", func() { + set1 := client.Set("touch1", "hello", 0) + Expect(set1.Err()).NotTo(HaveOccurred()) + Expect(set1.Val()).To(Equal("OK")) + + set2 := client.Set("touch2", "hello", 0) + Expect(set2.Err()).NotTo(HaveOccurred()) + Expect(set2.Val()).To(Equal("OK")) + + touch := client.Touch("touch1", "touch2", "touch3") + Expect(touch.Err()).NotTo(HaveOccurred()) + Expect(touch.Val()).To(Equal(int64(2))) + }) + It("should TTL", func() { ttl := client.TTL("key") Expect(ttl.Err()).NotTo(HaveOccurred()) @@ -2943,6 +2996,15 @@ Expect(vals).To(Equal([]interface{}{"key", "hello"})) }) + It("returns all values after an error", func() { + vals, err := client.Eval( + `return {12, {err="error"}, "abc"}`, + nil, + ).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]interface{}{int64(12), proto.RedisError("error"), "abc"})) + }) + }) }) diff -Nru golang-github-go-redis-redis-6.7.4/debian/changelog golang-github-go-redis-redis-6.9.2/debian/changelog --- golang-github-go-redis-redis-6.7.4/debian/changelog 2017-12-15 06:43:08.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/debian/changelog 2018-03-08 03:04:03.000000000 +0000 @@ -1,3 +1,18 @@ +golang-github-go-redis-redis (6.9.2-2) unstable; urgency=medium + + * d/patches/fix-build-on-32-bit-arch.patch: backport fix from upstream git. + * Add myself to uploaders. + + -- Michael Hudson-Doyle Thu, 08 Mar 2018 16:00:20 +1300 + +golang-github-go-redis-redis (6.9.2-1) unstable; urgency=medium + + * New upstream release + * Bump Standards-Version, no changes needed + * Switch to debhelper 11 + + -- Christos Trochalakis Wed, 28 Feb 2018 11:53:53 +0200 + golang-github-go-redis-redis (6.7.4-1) unstable; urgency=medium * New upstream version 6.7.4 diff -Nru golang-github-go-redis-redis-6.7.4/debian/compat golang-github-go-redis-redis-6.9.2/debian/compat --- golang-github-go-redis-redis-6.7.4/debian/compat 2017-12-15 06:43:08.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/debian/compat 2018-03-08 03:04:03.000000000 +0000 @@ -1 +1 @@ -10 +11 diff -Nru golang-github-go-redis-redis-6.7.4/debian/control golang-github-go-redis-redis-6.9.2/debian/control --- golang-github-go-redis-redis-6.7.4/debian/control 2017-12-15 06:43:08.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/debian/control 2018-03-08 03:04:03.000000000 +0000 @@ -2,14 +2,15 @@ Section: devel Priority: optional Maintainer: Debian Go Packaging Team -Uploaders: Christos Trochalakis -Build-Depends: debhelper (>= 10), +Uploaders: Christos Trochalakis , + Michael Hudson-Doyle +Build-Depends: debhelper (>= 11), dh-golang, golang-any, golang-ginkgo-dev, golang-gomega-dev, redis-server -Standards-Version: 4.1.2 +Standards-Version: 4.1.3 Homepage: https://github.com/go-redis/redis Vcs-Browser: https://anonscm.debian.org/cgit/pkg-go/packages/golang-github-go-redis-redis.git Vcs-Git: https://anonscm.debian.org/git/pkg-go/packages/golang-github-go-redis-redis.git diff -Nru golang-github-go-redis-redis-6.7.4/debian/patches/fix-build-on-32-bit-arch.patch golang-github-go-redis-redis-6.9.2/debian/patches/fix-build-on-32-bit-arch.patch --- golang-github-go-redis-redis-6.7.4/debian/patches/fix-build-on-32-bit-arch.patch 1970-01-01 00:00:00.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/debian/patches/fix-build-on-32-bit-arch.patch 2018-03-08 03:04:03.000000000 +0000 @@ -0,0 +1,56 @@ +From 111cbb98e93fc120536848613cfff4d3ba437978 Mon Sep 17 00:00:00 2001 +From: Vladimir Mihailenco +Date: Wed, 7 Mar 2018 12:20:26 +0200 +Subject: [PATCH] Fix build on 32bit arch + +--- + Makefile | 1 + + cluster.go | 10 +++++----- + 2 files changed, 6 insertions(+), 5 deletions(-) + +--- a/Makefile ++++ b/Makefile +@@ -1,6 +1,7 @@ + all: testdeps + go test ./... + go test ./... -short -race ++ env GOOS=linux GOARCH=386 go test ./... + go vet + + testdeps: testdata/redis/src/redis-server +--- a/cluster.go ++++ b/cluster.go +@@ -123,7 +123,7 @@ + + latency uint32 // atomic + generation uint32 // atomic +- loading int64 // atomic ++ loading uint32 // atomic + } + + func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode { +@@ -168,20 +168,20 @@ + } + + func (n *clusterNode) MarkAsLoading() { +- atomic.StoreInt64(&n.loading, time.Now().Unix()) ++ atomic.StoreUint32(&n.loading, uint32(time.Now().Unix())) + } + + func (n *clusterNode) Loading() bool { + const minute = int64(time.Minute / time.Second) + +- loading := atomic.LoadInt64(&n.loading) ++ loading := atomic.LoadUint32(&n.loading) + if loading == 0 { + return false + } +- if time.Now().Unix()-loading < minute { ++ if time.Now().Unix()-int64(loading) < minute { + return true + } +- atomic.StoreInt64(&n.loading, 0) ++ atomic.StoreUint32(&n.loading, 0) + return false + } + diff -Nru golang-github-go-redis-redis-6.7.4/debian/patches/series golang-github-go-redis-redis-6.9.2/debian/patches/series --- golang-github-go-redis-redis-6.7.4/debian/patches/series 2017-12-15 06:43:08.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/debian/patches/series 2018-03-08 03:04:03.000000000 +0000 @@ -1 +1,2 @@ +fix-build-on-32-bit-arch.patch 0001-test-Autopkgtest-support.patch diff -Nru golang-github-go-redis-redis-6.7.4/example_instrumentation_test.go golang-github-go-redis-redis-6.9.2/example_instrumentation_test.go --- golang-github-go-redis-redis-6.7.4/example_instrumentation_test.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/example_instrumentation_test.go 2018-02-27 14:11:25.000000000 +0000 @@ -2,58 +2,47 @@ import ( "fmt" - "sync/atomic" - "time" "github.com/go-redis/redis" ) func Example_instrumentation() { - ring := redis.NewRing(&redis.RingOptions{ - Addrs: map[string]string{ - "shard1": ":6379", - }, + cl := redis.NewClient(&redis.Options{ + Addr: ":6379", }) - ring.ForEachShard(func(client *redis.Client) error { - wrapRedisProcess(client) - return nil + cl.WrapProcess(func(old func(cmd redis.Cmder) error) func(cmd redis.Cmder) error { + return func(cmd redis.Cmder) error { + fmt.Printf("starting processing: <%s>\n", cmd) + err := old(cmd) + fmt.Printf("finished processing: <%s>\n", cmd) + return err + } }) - for { - ring.Ping() - } + cl.Ping() + // Output: starting processing: + // finished processing: } -func wrapRedisProcess(client *redis.Client) { - const precision = time.Microsecond - var count, avgDur uint32 - - go func() { - for range time.Tick(3 * time.Second) { - n := atomic.LoadUint32(&count) - dur := time.Duration(atomic.LoadUint32(&avgDur)) * precision - fmt.Printf("%s: processed=%d avg_dur=%s\n", client, n, dur) - } - }() - - client.WrapProcess(func(oldProcess func(redis.Cmder) error) func(redis.Cmder) error { - return func(cmd redis.Cmder) error { - start := time.Now() - err := oldProcess(cmd) - dur := time.Since(start) - - const decay = float64(1) / 100 - ms := float64(dur / precision) - for { - avg := atomic.LoadUint32(&avgDur) - newAvg := uint32((1-decay)*float64(avg) + decay*ms) - if atomic.CompareAndSwapUint32(&avgDur, avg, newAvg) { - break - } - } - atomic.AddUint32(&count, 1) +func Example_Pipeline_instrumentation() { + client := redis.NewClient(&redis.Options{ + Addr: ":6379", + }) + client.WrapProcessPipeline(func(old func([]redis.Cmder) error) func([]redis.Cmder) error { + return func(cmds []redis.Cmder) error { + fmt.Printf("pipeline starting processing: %v\n", cmds) + err := old(cmds) + fmt.Printf("pipeline finished processing: %v\n", cmds) return err } }) + + client.Pipelined(func(pipe redis.Pipeliner) error { + pipe.Ping() + pipe.Ping() + return nil + }) + // Output: pipeline starting processing: [ping: ping: ] + // pipeline finished processing: [ping: PONG ping: PONG] } diff -Nru golang-github-go-redis-redis-6.7.4/example_test.go golang-github-go-redis-redis-6.9.2/example_test.go --- golang-github-go-redis-redis-6.7.4/example_test.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/example_test.go 2018-02-27 14:11:25.000000000 +0000 @@ -96,14 +96,14 @@ val2, err := client.Get("key2").Result() if err == redis.Nil { - fmt.Println("key2 does not exists") + fmt.Println("key2 does not exist") } else if err != nil { panic(err) } else { fmt.Println("key2", val2) } // Output: key value - // key2 does not exists + // key2 does not exist } func ExampleClient_Set() { diff -Nru golang-github-go-redis-redis-6.7.4/internal/error.go golang-github-go-redis-redis-6.9.2/internal/error.go --- golang-github-go-redis-redis-6.7.4/internal/error.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/internal/error.go 2018-02-27 14:11:25.000000000 +0000 @@ -4,13 +4,9 @@ "io" "net" "strings" -) - -const Nil = RedisError("redis: nil") - -type RedisError string -func (e RedisError) Error() string { return string(e) } + "github.com/go-redis/redis/internal/proto" +) func IsRetryableError(err error, retryNetError bool) bool { if IsNetworkError(err) { @@ -30,7 +26,7 @@ } func IsRedisError(err error) bool { - _, ok := err.(RedisError) + _, ok := err.(proto.RedisError) return ok } @@ -42,6 +38,10 @@ return ok } +func IsReadOnlyError(err error) bool { + return strings.HasPrefix(err.Error(), "READONLY ") +} + func IsBadConn(err error, allowTimeout bool) bool { if err == nil { return false diff -Nru golang-github-go-redis-redis-6.7.4/internal/hashtag/hashtag.go golang-github-go-redis-redis-6.9.2/internal/hashtag/hashtag.go --- golang-github-go-redis-redis-6.7.4/internal/hashtag/hashtag.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/internal/hashtag/hashtag.go 2018-02-27 14:11:25.000000000 +0000 @@ -55,13 +55,17 @@ return key } +func RandomSlot() int { + return rand.Intn(SlotNumber) +} + // hashSlot returns a consistent slot number between 0 and 16383 // for any given string key. func Slot(key string) int { - key = Key(key) if key == "" { - return rand.Intn(SlotNumber) + return RandomSlot() } + key = Key(key) return int(crc16sum(key)) % SlotNumber } diff -Nru golang-github-go-redis-redis-6.7.4/internal/proto/reader.go golang-github-go-redis-redis-6.9.2/internal/proto/reader.go --- golang-github-go-redis-redis-6.7.4/internal/proto/reader.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/internal/proto/reader.go 2018-02-27 14:11:25.000000000 +0000 @@ -6,7 +6,7 @@ "io" "strconv" - "github.com/go-redis/redis/internal" + "github.com/go-redis/redis/internal/util" ) const bytesAllocLimit = 1024 * 1024 // 1mb @@ -19,6 +19,16 @@ ArrayReply = '*' ) +//------------------------------------------------------------------------------ + +const Nil = RedisError("redis: nil") + +type RedisError string + +func (e RedisError) Error() string { return string(e) } + +//------------------------------------------------------------------------------ + type MultiBulkParse func(*Reader, int64) (interface{}, error) type Reader struct { @@ -37,25 +47,25 @@ r.src.Reset(rd) } -func (p *Reader) PeekBuffered() []byte { - if n := p.src.Buffered(); n != 0 { - b, _ := p.src.Peek(n) +func (r *Reader) PeekBuffered() []byte { + if n := r.src.Buffered(); n != 0 { + b, _ := r.src.Peek(n) return b } return nil } -func (p *Reader) ReadN(n int) ([]byte, error) { - b, err := readN(p.src, p.buf, n) +func (r *Reader) ReadN(n int) ([]byte, error) { + b, err := readN(r.src, r.buf, n) if err != nil { return nil, err } - p.buf = b + r.buf = b return b, nil } -func (p *Reader) ReadLine() ([]byte, error) { - line, isPrefix, err := p.src.ReadLine() +func (r *Reader) ReadLine() ([]byte, error) { + line, isPrefix, err := r.src.ReadLine() if err != nil { return nil, err } @@ -66,13 +76,13 @@ return nil, fmt.Errorf("redis: reply is empty") } if isNilReply(line) { - return nil, internal.Nil + return nil, Nil } return line, nil } -func (p *Reader) ReadReply(m MultiBulkParse) (interface{}, error) { - line, err := p.ReadLine() +func (r *Reader) ReadReply(m MultiBulkParse) (interface{}, error) { + line, err := r.ReadLine() if err != nil { return nil, err } @@ -83,21 +93,21 @@ case StatusReply: return parseStatusValue(line), nil case IntReply: - return parseInt(line[1:], 10, 64) + return util.ParseInt(line[1:], 10, 64) case StringReply: - return p.readTmpBytesValue(line) + return r.readTmpBytesValue(line) case ArrayReply: n, err := parseArrayLen(line) if err != nil { return nil, err } - return m(p, n) + return m(r, n) } return nil, fmt.Errorf("redis: can't parse %.100q", line) } -func (p *Reader) ReadIntReply() (int64, error) { - line, err := p.ReadLine() +func (r *Reader) ReadIntReply() (int64, error) { + line, err := r.ReadLine() if err != nil { return 0, err } @@ -105,14 +115,14 @@ case ErrorReply: return 0, ParseErrorReply(line) case IntReply: - return parseInt(line[1:], 10, 64) + return util.ParseInt(line[1:], 10, 64) default: return 0, fmt.Errorf("redis: can't parse int reply: %.100q", line) } } -func (p *Reader) ReadTmpBytesReply() ([]byte, error) { - line, err := p.ReadLine() +func (r *Reader) ReadTmpBytesReply() ([]byte, error) { + line, err := r.ReadLine() if err != nil { return nil, err } @@ -120,7 +130,7 @@ case ErrorReply: return nil, ParseErrorReply(line) case StringReply: - return p.readTmpBytesValue(line) + return r.readTmpBytesValue(line) case StatusReply: return parseStatusValue(line), nil default: @@ -138,24 +148,24 @@ return cp, nil } -func (p *Reader) ReadStringReply() (string, error) { - b, err := p.ReadTmpBytesReply() +func (r *Reader) ReadStringReply() (string, error) { + b, err := r.ReadTmpBytesReply() if err != nil { return "", err } return string(b), nil } -func (p *Reader) ReadFloatReply() (float64, error) { - b, err := p.ReadTmpBytesReply() +func (r *Reader) ReadFloatReply() (float64, error) { + b, err := r.ReadTmpBytesReply() if err != nil { return 0, err } - return parseFloat(b, 64) + return util.ParseFloat(b, 64) } -func (p *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) { - line, err := p.ReadLine() +func (r *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) { + line, err := r.ReadLine() if err != nil { return nil, err } @@ -167,14 +177,14 @@ if err != nil { return nil, err } - return m(p, n) + return m(r, n) default: return nil, fmt.Errorf("redis: can't parse array reply: %.100q", line) } } -func (p *Reader) ReadArrayLen() (int64, error) { - line, err := p.ReadLine() +func (r *Reader) ReadArrayLen() (int64, error) { + line, err := r.ReadLine() if err != nil { return 0, err } @@ -188,8 +198,8 @@ } } -func (p *Reader) ReadScanReply() ([]string, uint64, error) { - n, err := p.ReadArrayLen() +func (r *Reader) ReadScanReply() ([]string, uint64, error) { + n, err := r.ReadArrayLen() if err != nil { return nil, 0, err } @@ -197,19 +207,19 @@ return nil, 0, fmt.Errorf("redis: got %d elements in scan reply, expected 2", n) } - cursor, err := p.ReadUint() + cursor, err := r.ReadUint() if err != nil { return nil, 0, err } - n, err = p.ReadArrayLen() + n, err = r.ReadArrayLen() if err != nil { return nil, 0, err } keys := make([]string, n) for i := int64(0); i < n; i++ { - key, err := p.ReadStringReply() + key, err := r.ReadStringReply() if err != nil { return nil, 0, err } @@ -219,9 +229,9 @@ return keys, cursor, err } -func (p *Reader) readTmpBytesValue(line []byte) ([]byte, error) { +func (r *Reader) readTmpBytesValue(line []byte) ([]byte, error) { if isNilReply(line) { - return nil, internal.Nil + return nil, Nil } replyLen, err := strconv.Atoi(string(line[1:])) @@ -229,7 +239,7 @@ return nil, err } - b, err := p.ReadN(replyLen + 2) + b, err := r.ReadN(replyLen + 2) if err != nil { return nil, err } @@ -241,7 +251,7 @@ if err != nil { return 0, err } - return parseInt(b, 10, 64) + return util.ParseInt(b, 10, 64) } func (r *Reader) ReadUint() (uint64, error) { @@ -249,7 +259,7 @@ if err != nil { return 0, err } - return parseUint(b, 10, 64) + return util.ParseUint(b, 10, 64) } // -------------------------------------------------------------------- @@ -303,7 +313,7 @@ } func ParseErrorReply(line []byte) error { - return internal.RedisError(string(line[1:])) + return RedisError(string(line[1:])) } func parseStatusValue(line []byte) []byte { @@ -312,23 +322,7 @@ func parseArrayLen(line []byte) (int64, error) { if isNilReply(line) { - return 0, internal.Nil + return 0, Nil } - return parseInt(line[1:], 10, 64) -} - -func atoi(b []byte) (int, error) { - return strconv.Atoi(internal.BytesToString(b)) -} - -func parseInt(b []byte, base int, bitSize int) (int64, error) { - return strconv.ParseInt(internal.BytesToString(b), base, bitSize) -} - -func parseUint(b []byte, base int, bitSize int) (uint64, error) { - return strconv.ParseUint(internal.BytesToString(b), base, bitSize) -} - -func parseFloat(b []byte, bitSize int) (float64, error) { - return strconv.ParseFloat(internal.BytesToString(b), bitSize) + return util.ParseInt(line[1:], 10, 64) } diff -Nru golang-github-go-redis-redis-6.7.4/internal/proto/scan.go golang-github-go-redis-redis-6.9.2/internal/proto/scan.go --- golang-github-go-redis-redis-6.7.4/internal/proto/scan.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/internal/proto/scan.go 2018-02-27 14:11:25.000000000 +0000 @@ -5,7 +5,7 @@ "fmt" "reflect" - "github.com/go-redis/redis/internal" + "github.com/go-redis/redis/internal/util" ) func Scan(b []byte, v interface{}) error { @@ -13,80 +13,80 @@ case nil: return fmt.Errorf("redis: Scan(nil)") case *string: - *v = internal.BytesToString(b) + *v = util.BytesToString(b) return nil case *[]byte: *v = b return nil case *int: var err error - *v, err = atoi(b) + *v, err = util.Atoi(b) return err case *int8: - n, err := parseInt(b, 10, 8) + n, err := util.ParseInt(b, 10, 8) if err != nil { return err } *v = int8(n) return nil case *int16: - n, err := parseInt(b, 10, 16) + n, err := util.ParseInt(b, 10, 16) if err != nil { return err } *v = int16(n) return nil case *int32: - n, err := parseInt(b, 10, 32) + n, err := util.ParseInt(b, 10, 32) if err != nil { return err } *v = int32(n) return nil case *int64: - n, err := parseInt(b, 10, 64) + n, err := util.ParseInt(b, 10, 64) if err != nil { return err } *v = n return nil case *uint: - n, err := parseUint(b, 10, 64) + n, err := util.ParseUint(b, 10, 64) if err != nil { return err } *v = uint(n) return nil case *uint8: - n, err := parseUint(b, 10, 8) + n, err := util.ParseUint(b, 10, 8) if err != nil { return err } *v = uint8(n) return nil case *uint16: - n, err := parseUint(b, 10, 16) + n, err := util.ParseUint(b, 10, 16) if err != nil { return err } *v = uint16(n) return nil case *uint32: - n, err := parseUint(b, 10, 32) + n, err := util.ParseUint(b, 10, 32) if err != nil { return err } *v = uint32(n) return nil case *uint64: - n, err := parseUint(b, 10, 64) + n, err := util.ParseUint(b, 10, 64) if err != nil { return err } *v = n return nil case *float32: - n, err := parseFloat(b, 32) + n, err := util.ParseFloat(b, 32) if err != nil { return err } @@ -94,7 +94,7 @@ return err case *float64: var err error - *v, err = parseFloat(b, 64) + *v, err = util.ParseFloat(b, 64) return err case *bool: *v = len(b) == 1 && b[0] == '1' @@ -120,13 +120,47 @@ return fmt.Errorf("redis: ScanSlice(non-slice %T)", slice) } - next := internal.MakeSliceNextElemFunc(v) + next := makeSliceNextElemFunc(v) for i, s := range data { elem := next() - if err := Scan(internal.StringToBytes(s), elem.Addr().Interface()); err != nil { - return fmt.Errorf("redis: ScanSlice(index=%d value=%q) failed: %s", i, s, err) + if err := Scan([]byte(s), elem.Addr().Interface()); err != nil { + err = fmt.Errorf("redis: ScanSlice index=%d value=%q failed: %s", i, s, err) + return err } } return nil } + +func makeSliceNextElemFunc(v reflect.Value) func() reflect.Value { + elemType := v.Type().Elem() + + if elemType.Kind() == reflect.Ptr { + elemType = elemType.Elem() + return func() reflect.Value { + if v.Len() < v.Cap() { + v.Set(v.Slice(0, v.Len()+1)) + elem := v.Index(v.Len() - 1) + if elem.IsNil() { + elem.Set(reflect.New(elemType)) + } + return elem.Elem() + } + + elem := reflect.New(elemType) + v.Set(reflect.Append(v, elem)) + return elem.Elem() + } + } + + zero := reflect.Zero(elemType) + return func() reflect.Value { + if v.Len() < v.Cap() { + v.Set(v.Slice(0, v.Len()+1)) + return v.Index(v.Len() - 1) + } + + v.Set(reflect.Append(v, zero)) + return v.Index(v.Len() - 1) + } +} diff -Nru golang-github-go-redis-redis-6.7.4/internal/safe.go golang-github-go-redis-redis-6.9.2/internal/safe.go --- golang-github-go-redis-redis-6.7.4/internal/safe.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/internal/safe.go 1970-01-01 00:00:00.000000000 +0000 @@ -1,11 +0,0 @@ -// +build appengine - -package internal - -func BytesToString(b []byte) string { - return string(b) -} - -func StringToBytes(s string) []byte { - return []byte(s) -} diff -Nru golang-github-go-redis-redis-6.7.4/internal/singleflight/singleflight.go golang-github-go-redis-redis-6.9.2/internal/singleflight/singleflight.go --- golang-github-go-redis-redis-6.7.4/internal/singleflight/singleflight.go 1970-01-01 00:00:00.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/internal/singleflight/singleflight.go 2018-02-27 14:11:25.000000000 +0000 @@ -0,0 +1,64 @@ +/* +Copyright 2013 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package singleflight provides a duplicate function call suppression +// mechanism. +package singleflight + +import "sync" + +// call is an in-flight or completed Do call +type call struct { + wg sync.WaitGroup + val interface{} + err error +} + +// Group represents a class of work and forms a namespace in which +// units of work can be executed with duplicate suppression. +type Group struct { + mu sync.Mutex // protects m + m map[string]*call // lazily initialized +} + +// Do executes and returns the results of the given function, making +// sure that only one execution is in-flight for a given key at a +// time. If a duplicate comes in, the duplicate caller waits for the +// original to complete and receives the same results. +func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) { + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + g.mu.Unlock() + c.wg.Wait() + return c.val, c.err + } + c := new(call) + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + c.val, c.err = fn() + c.wg.Done() + + g.mu.Lock() + delete(g.m, key) + g.mu.Unlock() + + return c.val, c.err +} diff -Nru golang-github-go-redis-redis-6.7.4/internal/unsafe.go golang-github-go-redis-redis-6.9.2/internal/unsafe.go --- golang-github-go-redis-redis-6.7.4/internal/unsafe.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/internal/unsafe.go 1970-01-01 00:00:00.000000000 +0000 @@ -1,27 +0,0 @@ -// +build !appengine - -package internal - -import ( - "reflect" - "unsafe" -) - -func BytesToString(b []byte) string { - bytesHeader := (*reflect.SliceHeader)(unsafe.Pointer(&b)) - strHeader := reflect.StringHeader{ - Data: bytesHeader.Data, - Len: bytesHeader.Len, - } - return *(*string)(unsafe.Pointer(&strHeader)) -} - -func StringToBytes(s string) []byte { - sh := (*reflect.StringHeader)(unsafe.Pointer(&s)) - bh := reflect.SliceHeader{ - Data: sh.Data, - Len: sh.Len, - Cap: sh.Len, - } - return *(*[]byte)(unsafe.Pointer(&bh)) -} diff -Nru golang-github-go-redis-redis-6.7.4/internal/util/safe.go golang-github-go-redis-redis-6.9.2/internal/util/safe.go --- golang-github-go-redis-redis-6.7.4/internal/util/safe.go 1970-01-01 00:00:00.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/internal/util/safe.go 2018-02-27 14:11:25.000000000 +0000 @@ -0,0 +1,7 @@ +// +build appengine + +package util + +func BytesToString(b []byte) string { + return string(b) +} diff -Nru golang-github-go-redis-redis-6.7.4/internal/util/strconv.go golang-github-go-redis-redis-6.9.2/internal/util/strconv.go --- golang-github-go-redis-redis-6.7.4/internal/util/strconv.go 1970-01-01 00:00:00.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/internal/util/strconv.go 2018-02-27 14:11:25.000000000 +0000 @@ -0,0 +1,19 @@ +package util + +import "strconv" + +func Atoi(b []byte) (int, error) { + return strconv.Atoi(BytesToString(b)) +} + +func ParseInt(b []byte, base int, bitSize int) (int64, error) { + return strconv.ParseInt(BytesToString(b), base, bitSize) +} + +func ParseUint(b []byte, base int, bitSize int) (uint64, error) { + return strconv.ParseUint(BytesToString(b), base, bitSize) +} + +func ParseFloat(b []byte, bitSize int) (float64, error) { + return strconv.ParseFloat(BytesToString(b), bitSize) +} diff -Nru golang-github-go-redis-redis-6.7.4/internal/util/unsafe.go golang-github-go-redis-redis-6.9.2/internal/util/unsafe.go --- golang-github-go-redis-redis-6.7.4/internal/util/unsafe.go 1970-01-01 00:00:00.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/internal/util/unsafe.go 2018-02-27 14:11:25.000000000 +0000 @@ -0,0 +1,12 @@ +// +build !appengine + +package util + +import ( + "unsafe" +) + +// BytesToString converts byte slice to string. +func BytesToString(b []byte) string { + return *(*string)(unsafe.Pointer(&b)) +} diff -Nru golang-github-go-redis-redis-6.7.4/internal/util.go golang-github-go-redis-redis-6.9.2/internal/util.go --- golang-github-go-redis-redis-6.7.4/internal/util.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/internal/util.go 2018-02-27 14:11:25.000000000 +0000 @@ -1,6 +1,6 @@ package internal -import "reflect" +import "github.com/go-redis/redis/internal/util" func ToLower(s string) string { if isLower(s) { @@ -15,7 +15,7 @@ } b[i] = c } - return BytesToString(b) + return util.BytesToString(b) } func isLower(s string) bool { @@ -27,36 +27,3 @@ } return true } - -func MakeSliceNextElemFunc(v reflect.Value) func() reflect.Value { - elemType := v.Type().Elem() - - if elemType.Kind() == reflect.Ptr { - elemType = elemType.Elem() - return func() reflect.Value { - if v.Len() < v.Cap() { - v.Set(v.Slice(0, v.Len()+1)) - elem := v.Index(v.Len() - 1) - if elem.IsNil() { - elem.Set(reflect.New(elemType)) - } - return elem.Elem() - } - - elem := reflect.New(elemType) - v.Set(reflect.Append(v, elem)) - return elem.Elem() - } - } - - zero := reflect.Zero(elemType) - return func() reflect.Value { - if v.Len() < v.Cap() { - v.Set(v.Slice(0, v.Len()+1)) - return v.Index(v.Len() - 1) - } - - v.Set(reflect.Append(v, zero)) - return v.Index(v.Len() - 1) - } -} diff -Nru golang-github-go-redis-redis-6.7.4/options_test.go golang-github-go-redis-redis-6.9.2/options_test.go --- golang-github-go-redis-redis-6.7.4/options_test.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/options_test.go 2018-02-27 14:11:25.000000000 +0000 @@ -71,7 +71,7 @@ t.Run(c.u, func(t *testing.T) { o, err := ParseURL(c.u) if c.err == nil && err != nil { - t.Fatalf("unexpected error: '%q'", err) + t.Fatalf("unexpected error: %q", err) return } if c.err != nil && err != nil { diff -Nru golang-github-go-redis-redis-6.7.4/parser.go golang-github-go-redis-redis-6.9.2/parser.go --- golang-github-go-redis-redis-6.7.4/parser.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/parser.go 2018-02-27 14:11:25.000000000 +0000 @@ -14,17 +14,23 @@ vals := make([]interface{}, 0, n) for i := int64(0); i < n; i++ { v, err := rd.ReadReply(sliceParser) - if err == Nil { - vals = append(vals, nil) - } else if err != nil { - return nil, err - } else { - switch vv := v.(type) { - case []byte: - vals = append(vals, string(vv)) - default: - vals = append(vals, v) + if err != nil { + if err == Nil { + vals = append(vals, nil) + continue + } + if err, ok := err.(proto.RedisError); ok { + vals = append(vals, err) + continue } + return nil, err + } + + switch v := v.(type) { + case []byte: + vals = append(vals, string(v)) + default: + vals = append(vals, v) } } return vals, nil diff -Nru golang-github-go-redis-redis-6.7.4/pubsub.go golang-github-go-redis-redis-6.9.2/pubsub.go --- golang-github-go-redis-redis-6.7.4/pubsub.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/pubsub.go 2018-02-27 14:11:25.000000000 +0000 @@ -127,7 +127,7 @@ return nil } -// Subscribes the client to the specified channels. It returns +// Subscribe the client to the specified channels. It returns // empty subscription if there are no channels. func (c *PubSub) Subscribe(channels ...string) error { c.mu.Lock() @@ -137,7 +137,7 @@ return err } -// Subscribes the client to the given patterns. It returns +// PSubscribe the client to the given patterns. It returns // empty subscription if there are no patterns. func (c *PubSub) PSubscribe(patterns ...string) error { c.mu.Lock() @@ -147,7 +147,7 @@ return err } -// Unsubscribes the client from the given channels, or from all of +// Unsubscribe the client from the given channels, or from all of // them if none is given. func (c *PubSub) Unsubscribe(channels ...string) error { c.mu.Lock() @@ -157,7 +157,7 @@ return err } -// Unsubscribes the client from the given patterns, or from all of +// PUnsubscribe the client from the given patterns, or from all of // them if none is given. func (c *PubSub) PUnsubscribe(patterns ...string) error { c.mu.Lock() @@ -196,7 +196,7 @@ return err } -// Message received after a successful subscription to channel. +// Subscription received after a successful subscription to channel. type Subscription struct { // Can be "subscribe", "unsubscribe", "psubscribe" or "punsubscribe". Kind string diff -Nru golang-github-go-redis-redis-6.7.4/race_test.go golang-github-go-redis-redis-6.9.2/race_test.go --- golang-github-go-redis-redis-6.7.4/race_test.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/race_test.go 2018-02-27 14:11:25.000000000 +0000 @@ -243,6 +243,21 @@ Expect(err).NotTo(HaveOccurred()) Expect(n).To(Equal(int64(N))) }) + + It("should TxPipeline", func() { + pipe := client.TxPipeline() + perform(N, func(id int) { + pipe.Incr("key") + }) + + cmds, err := pipe.Exec() + Expect(err).NotTo(HaveOccurred()) + Expect(cmds).To(HaveLen(N)) + + n, err := client.Get("key").Int64() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(N))) + }) }) func bigVal() []byte { diff -Nru golang-github-go-redis-redis-6.7.4/README.md golang-github-go-redis-redis-6.9.2/README.md --- golang-github-go-redis-redis-6.7.4/README.md 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/README.md 2018-02-27 14:11:25.000000000 +0000 @@ -2,6 +2,7 @@ [![Build Status](https://travis-ci.org/go-redis/redis.png?branch=master)](https://travis-ci.org/go-redis/redis) [![GoDoc](https://godoc.org/github.com/go-redis/redis?status.svg)](https://godoc.org/github.com/go-redis/redis) +[![Airbrake](https://img.shields.io/badge/kudos-airbrake.io-orange.svg)](https://airbrake.io) Supports: @@ -66,14 +67,14 @@ val2, err := client.Get("key2").Result() if err == redis.Nil { - fmt.Println("key2 does not exists") + fmt.Println("key2 does not exist") } else if err != nil { panic(err) } else { fmt.Println("key2", val2) } // Output: key value - // key2 does not exists + // key2 does not exist } ``` diff -Nru golang-github-go-redis-redis-6.7.4/redis_context.go golang-github-go-redis-redis-6.9.2/redis_context.go --- golang-github-go-redis-redis-6.7.4/redis_context.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/redis_context.go 2018-02-27 14:11:25.000000000 +0000 @@ -12,7 +12,10 @@ connPool pool.Pooler opt *Options - process func(Cmder) error + process func(Cmder) error + processPipeline func([]Cmder) error + processTxPipeline func([]Cmder) error + onClose func() error // hook called when client is closed ctx context.Context diff -Nru golang-github-go-redis-redis-6.7.4/redis.go golang-github-go-redis-redis-6.9.2/redis.go --- golang-github-go-redis-redis-6.7.4/redis.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/redis.go 2018-02-27 14:11:25.000000000 +0000 @@ -11,8 +11,8 @@ "github.com/go-redis/redis/internal/proto" ) -// Redis nil reply, .e.g. when key does not exist. -const Nil = internal.Nil +// Nil reply redis returned when key does not exist. +const Nil = proto.Nil func init() { SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile)) @@ -22,6 +22,12 @@ internal.Logger = logger } +func (c *baseClient) init() { + c.process = c.defaultProcess + c.processPipeline = c.defaultProcessPipeline + c.processTxPipeline = c.defaultProcessTxPipeline +} + func (c *baseClient) String() string { return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB) } @@ -85,7 +91,8 @@ connPool: pool.NewSingleConnPool(cn), }, } - conn.setProcessor(conn.Process) + conn.baseClient.init() + conn.statefulCmdable.setProcessor(conn.Process) _, err := conn.Pipelined(func(pipe Pipeliner) error { if c.opt.Password != "" { @@ -117,14 +124,11 @@ // an input and returns the new wrapper process func. createWrapper should // use call the old process func within the new process func. func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) { - c.process = fn(c.defaultProcess) + c.process = fn(c.process) } func (c *baseClient) Process(cmd Cmder) error { - if c.process != nil { - return c.process(cmd) - } - return c.defaultProcess(cmd) + return c.process(cmd) } func (c *baseClient) defaultProcess(cmd Cmder) error { @@ -172,9 +176,9 @@ func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration { if timeout := cmd.readTimeout(); timeout != nil { return *timeout - } else { - return c.opt.ReadTimeout } + + return c.opt.ReadTimeout } // Close closes the client, releasing any open resources. @@ -198,35 +202,48 @@ return c.opt.Addr } +func (c *baseClient) WrapProcessPipeline( + fn func(oldProcess func([]Cmder) error) func([]Cmder) error, +) { + c.processPipeline = fn(c.processPipeline) + c.processTxPipeline = fn(c.processTxPipeline) +} + +func (c *baseClient) defaultProcessPipeline(cmds []Cmder) error { + return c.generalProcessPipeline(cmds, c.pipelineProcessCmds) +} + +func (c *baseClient) defaultProcessTxPipeline(cmds []Cmder) error { + return c.generalProcessPipeline(cmds, c.txPipelineProcessCmds) +} + type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error) -func (c *baseClient) pipelineExecer(p pipelineProcessor) pipelineExecer { - return func(cmds []Cmder) error { - for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { - if attempt > 0 { - time.Sleep(c.retryBackoff(attempt)) - } +func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) error { + for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } - cn, _, err := c.getConn() - if err != nil { - setCmdsErr(cmds, err) - return err - } + cn, _, err := c.getConn() + if err != nil { + setCmdsErr(cmds, err) + return err + } - canRetry, err := p(cn, cmds) + canRetry, err := p(cn, cmds) - if err == nil || internal.IsRedisError(err) { - _ = c.connPool.Put(cn) - break - } - _ = c.connPool.Remove(cn) + if err == nil || internal.IsRedisError(err) { + _ = c.connPool.Put(cn) + break + } + _ = c.connPool.Remove(cn) - if !canRetry || !internal.IsRetryableError(err, true) { - break - } + if !canRetry || !internal.IsRetryableError(err, true) { + break } - return firstCmdsErr(cmds) } + return firstCmdsErr(cmds) } func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) { @@ -324,14 +341,15 @@ } func newClient(opt *Options, pool pool.Pooler) *Client { - client := Client{ + c := Client{ baseClient: baseClient{ opt: opt, connPool: pool, }, } - client.setProcessor(client.Process) - return &client + c.baseClient.init() + c.cmdable.setProcessor(c.Process) + return &c } // NewClient returns a client to the Redis Server specified by Options. @@ -343,7 +361,7 @@ func (c *Client) copy() *Client { c2 := new(Client) *c2 = *c - c2.setProcessor(c2.Process) + c2.cmdable.setProcessor(c2.Process) return c2 } @@ -366,9 +384,9 @@ func (c *Client) Pipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExecer(c.pipelineProcessCmds), + exec: c.processPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } @@ -379,9 +397,9 @@ // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. func (c *Client) TxPipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExecer(c.txPipelineProcessCmds), + exec: c.processTxPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } @@ -430,9 +448,9 @@ func (c *Conn) Pipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExecer(c.pipelineProcessCmds), + exec: c.processPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } @@ -443,8 +461,8 @@ // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. func (c *Conn) TxPipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExecer(c.txPipelineProcessCmds), + exec: c.processTxPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } diff -Nru golang-github-go-redis-redis-6.7.4/redis_no_context.go golang-github-go-redis-redis-6.9.2/redis_no_context.go --- golang-github-go-redis-redis-6.7.4/redis_no_context.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/redis_no_context.go 2018-02-27 14:11:25.000000000 +0000 @@ -10,6 +10,9 @@ connPool pool.Pooler opt *Options - process func(Cmder) error + process func(Cmder) error + processPipeline func([]Cmder) error + processTxPipeline func([]Cmder) error + onClose func() error // hook called when client is closed } diff -Nru golang-github-go-redis-redis-6.7.4/redis_test.go golang-github-go-redis-redis-6.9.2/redis_test.go --- golang-github-go-redis-redis-6.7.4/redis_test.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/redis_test.go 2018-02-27 14:11:25.000000000 +0000 @@ -157,30 +157,15 @@ }) It("should retry with backoff", func() { - Expect(client.Close()).NotTo(HaveOccurred()) - - // use up all the available connections to force a fail - connectionHogClient := redis.NewClient(&redis.Options{ - Addr: redisAddr, - MaxRetries: 1, - }) - defer connectionHogClient.Close() - - for i := 0; i <= 1002; i++ { - connectionHogClient.Pool().NewConn() - } - clientNoRetry := redis.NewClient(&redis.Options{ - Addr: redisAddr, - PoolSize: 1, - MaxRetryBackoff: -1, + Addr: ":1234", + MaxRetries: 0, }) defer clientNoRetry.Close() clientRetry := redis.NewClient(&redis.Options{ - Addr: redisAddr, + Addr: ":1234", MaxRetries: 5, - PoolSize: 1, MaxRetryBackoff: 128 * time.Millisecond, }) defer clientRetry.Close() @@ -195,7 +180,7 @@ Expect(err).To(HaveOccurred()) elapseRetry := time.Since(startRetry) - Expect(elapseRetry > elapseNoRetry).To(BeTrue()) + Expect(elapseRetry).To(BeNumerically(">", elapseNoRetry, 10*time.Millisecond)) }) It("should update conn.UsedAt on read/write", func() { diff -Nru golang-github-go-redis-redis-6.7.4/ring.go golang-github-go-redis-redis-6.9.2/ring.go --- golang-github-go-redis-redis-6.7.4/ring.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/ring.go 2018-02-27 14:11:25.000000000 +0000 @@ -150,6 +150,8 @@ shards map[string]*ringShard shardsList []*ringShard + processPipeline func([]Cmder) error + cmdsInfoOnce internal.Once cmdsInfo map[string]*CommandInfo @@ -158,7 +160,9 @@ func NewRing(opt *RingOptions) *Ring { const nreplicas = 100 + opt.init() + ring := &Ring{ opt: opt, nreplicas: nreplicas, @@ -166,13 +170,17 @@ hash: consistenthash.New(nreplicas, nil), shards: make(map[string]*ringShard), } - ring.setProcessor(ring.Process) + ring.processPipeline = ring.defaultProcessPipeline + ring.cmdable.setProcessor(ring.Process) + for name, addr := range opt.Addrs { clopt := opt.clientOptions() clopt.Addr = addr ring.addShard(name, NewClient(clopt)) } + go ring.heartbeat() + return ring } @@ -298,6 +306,9 @@ if err != nil { return nil } + if c.cmdsInfo == nil { + return nil + } info := c.cmdsInfo[name] if info == nil { internal.Logf("info for cmd=%s not found", name) @@ -343,10 +354,21 @@ func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) { cmdInfo := c.cmdInfo(cmd.Name()) - firstKey := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo)) + pos := cmdFirstKeyPos(cmd, cmdInfo) + if pos == 0 { + return c.randomShard() + } + firstKey := cmd.stringArg(pos) return c.shardByKey(firstKey) } +func (c *Ring) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) { + c.ForEachShard(func(c *Client) error { + c.WrapProcess(fn) + return nil + }) +} + func (c *Ring) Process(cmd Cmder) error { shard, err := c.cmdShard(cmd) if err != nil { @@ -429,9 +451,9 @@ func (c *Ring) Pipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExec, + exec: c.processPipeline, } - pipe.setProcessor(pipe.Process) + pipe.cmdable.setProcessor(pipe.Process) return &pipe } @@ -439,7 +461,13 @@ return c.Pipeline().Pipelined(fn) } -func (c *Ring) pipelineExec(cmds []Cmder) error { +func (c *Ring) WrapProcessPipeline( + fn func(oldProcess func([]Cmder) error) func([]Cmder) error, +) { + c.processPipeline = fn(c.processPipeline) +} + +func (c *Ring) defaultProcessPipeline(cmds []Cmder) error { cmdsMap := make(map[string][]Cmder) for _, cmd := range cmds { cmdInfo := c.cmdInfo(cmd.Name()) diff -Nru golang-github-go-redis-redis-6.7.4/sentinel.go golang-github-go-redis-redis-6.9.2/sentinel.go --- golang-github-go-redis-redis-6.7.4/sentinel.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/sentinel.go 2018-02-27 14:11:25.000000000 +0000 @@ -76,7 +76,7 @@ opt: opt, } - client := Client{ + c := Client{ baseClient: baseClient{ opt: opt, connPool: failover.Pool(), @@ -86,9 +86,10 @@ }, }, } - client.setProcessor(client.Process) + c.baseClient.init() + c.setProcessor(c.Process) - return &client + return &c } //------------------------------------------------------------------------------ @@ -100,14 +101,15 @@ func newSentinel(opt *Options) *sentinelClient { opt.init() - client := sentinelClient{ + c := sentinelClient{ baseClient: baseClient{ opt: opt, connPool: newConnPool(opt), }, } - client.cmdable = cmdable{client.Process} - return &client + c.baseClient.init() + c.cmdable.setProcessor(c.Process) + return &c } func (c *sentinelClient) PubSub() *PubSub { diff -Nru golang-github-go-redis-redis-6.7.4/.travis.yml golang-github-go-redis-redis-6.9.2/.travis.yml --- golang-github-go-redis-redis-6.7.4/.travis.yml 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/.travis.yml 2018-02-27 14:11:25.000000000 +0000 @@ -5,7 +5,6 @@ - redis-server go: - - 1.4.x - 1.7.x - 1.8.x - 1.9.x @@ -13,7 +12,6 @@ matrix: allow_failures: - - go: 1.4.x - go: tip install: diff -Nru golang-github-go-redis-redis-6.7.4/tx.go golang-github-go-redis-redis-6.9.2/tx.go --- golang-github-go-redis-redis-6.7.4/tx.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/tx.go 2018-02-27 14:11:25.000000000 +0000 @@ -1,12 +1,12 @@ package redis import ( - "github.com/go-redis/redis/internal" "github.com/go-redis/redis/internal/pool" + "github.com/go-redis/redis/internal/proto" ) -// Redis transaction failed. -const TxFailedErr = internal.RedisError("redis: transaction failed") +// TxFailedErr transaction redis failed. +const TxFailedErr = proto.RedisError("redis: transaction failed") // Tx implements Redis transactions as described in // http://redis.io/topics/transactions. It's NOT safe for concurrent use @@ -24,7 +24,8 @@ connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), true), }, } - tx.setProcessor(tx.Process) + tx.baseClient.init() + tx.statefulCmdable.setProcessor(tx.Process) return &tx } @@ -42,7 +43,7 @@ return err } -// close closes the transaction, releasing any open resources. +// Close closes the transaction, releasing any open resources. func (c *Tx) Close() error { _ = c.Unwatch().Err() return c.baseClient.Close() @@ -75,9 +76,9 @@ func (c *Tx) Pipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExecer(c.txPipelineProcessCmds), + exec: c.processTxPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } diff -Nru golang-github-go-redis-redis-6.7.4/universal.go golang-github-go-redis-redis-6.9.2/universal.go --- golang-github-go-redis-redis-6.7.4/universal.go 2017-11-20 09:12:32.000000000 +0000 +++ golang-github-go-redis-redis-6.9.2/universal.go 2018-02-27 14:11:25.000000000 +0000 @@ -114,6 +114,8 @@ type UniversalClient interface { Cmdable Process(cmd Cmder) error + Subscribe(channels ...string) *PubSub + PSubscribe(channels ...string) *PubSub Close() error }