diff --git a/mux.go b/mux.go index 42e5a586..5209b938 100644 --- a/mux.go +++ b/mux.go @@ -2,6 +2,7 @@ package rueidis import ( "context" + "math/rand" "net" "runtime" "sync" @@ -223,7 +224,7 @@ func (m *mux) blockingMulti(ctx context.Context, cmd []Completed) (resp *redisre } func (m *mux) pipeline(ctx context.Context, cmd Completed) (resp RedisResult) { - slot := cmd.Slot() & uint16(len(m.wire)-1) + slot := slotfn(cmd.Slot(), len(m.wire)) wire := m.pipe(slot) if resp = wire.Do(ctx, cmd); isBroken(resp.NonRedisError(), wire) { m.wire[slot].CompareAndSwap(wire, m.init) @@ -232,7 +233,7 @@ func (m *mux) pipeline(ctx context.Context, cmd Completed) (resp RedisResult) { } func (m *mux) pipelineMulti(ctx context.Context, cmd []Completed) (resp *redisresults) { - slot := cmd[0].Slot() & uint16(len(m.wire)-1) + slot := slotfn(cmd[0].Slot(), len(m.wire)) wire := m.pipe(slot) resp = wire.DoMulti(ctx, cmd...) for _, r := range resp.s { @@ -311,7 +312,7 @@ func (m *mux) doMultiCache(ctx context.Context, slot uint16, multi []CacheableTT } func (m *mux) Receive(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { - slot := subscribe.Slot() & uint16(len(m.wire)-1) + slot := slotfn(subscribe.Slot(), len(m.wire)) wire := m.pipe(slot) err := wire.Receive(ctx, subscribe, fn) if isBroken(err, wire) { @@ -346,3 +347,23 @@ func (m *mux) Addr() string { func isBroken(err error, w wire) bool { return err != nil && err != ErrClosing && w.Error() != nil } + +var rngPool = sync.Pool{ + New: func() any { + return rand.New(rand.NewSource(time.Now().UnixNano())) + }, +} + +func fastrand(n int) (r int) { + s := rngPool.Get().(*rand.Rand) + r = s.Intn(n) + rngPool.Put(s) + return +} + +func slotfn(ks uint16, n int) uint16 { + if n == 1 || ks == cmds.NoSlot { + return 0 + } + return uint16(fastrand(n)) +} diff --git a/pipe.go b/pipe.go index 77e73bee..386a8e34 100644 --- a/pipe.go +++ b/pipe.go @@ -313,8 +313,7 @@ func (p *pipe) _background() { } var ( - ones = make([]Completed, 1) - multi []Completed + resps []RedisResult ch chan RedisResult cond *sync.Cond ) @@ -332,13 +331,12 @@ func (p *pipe) _background() { _, _, _ = p.queue.NextWriteCmd() default: } - if ones[0], multi, ch, cond = p.queue.NextResultCh(); ch != nil { - if multi == nil { - multi = ones - } - for range multi { - ch <- newErrResult(p.Error()) + if _, _, ch, resps, cond = p.queue.NextResultCh(); ch != nil { + err := newErrResult(p.Error()) + for i := range resps { + resps[i] = err } + ch <- err cond.L.Unlock() cond.Signal() } else { @@ -405,6 +403,7 @@ func (p *pipe) _backgroundRead() (err error) { cond *sync.Cond ones = make([]Completed, 1) multi []Completed + resps []RedisResult ch chan RedisResult ff int // fulfilled count skip int // skip rest push messages @@ -415,10 +414,12 @@ func (p *pipe) _backgroundRead() (err error) { ) defer func() { + resp := newErrResult(err) if err != nil && ff < len(multi) { - for ; ff < len(multi); ff++ { - ch <- newErrResult(err) + for ; ff < len(resps); ff++ { + resps[ff] = resp } + ch <- resp cond.L.Unlock() cond.Signal() } @@ -462,7 +463,7 @@ func (p *pipe) _backgroundRead() (err error) { } if ff == len(multi) { ff = 0 - ones[0], multi, ch, cond = p.queue.NextResultCh() // ch should not be nil, otherwise it must be a protocol bug + ones[0], multi, ch, resps, cond = p.queue.NextResultCh() // ch should not be nil, otherwise it must be a protocol bug if ch == nil { cond.L.Unlock() // Redis will send sunsubscribe notification proactively in the event of slot migration. @@ -523,8 +524,12 @@ func (p *pipe) _backgroundRead() (err error) { } else if multi[ff].NoReply() && msg.string == "QUEUED" { panic(multiexecsub) } - ch <- newResult(msg, err) + resp := newResult(msg, err) + if resps != nil { + resps[ff] = resp + } if ff++; ff == len(multi) { + ch <- resp cond.L.Unlock() cond.Signal() } @@ -911,32 +916,28 @@ func (p *pipe) DoMulti(ctx context.Context, multi ...Completed) *redisresults { return resp queue: - ch := p.queue.PutMulti(multi) + ch := p.queue.PutMulti(multi, resp.s) var i int if ctxCh := ctx.Done(); ctxCh == nil { - for ; i < len(resp.s); i++ { - resp.s[i] = <-ch - } + <-ch } else { - for ; i < len(resp.s); i++ { - select { - case resp.s[i] = <-ch: - case <-ctxCh: - goto abort - } + select { + case <-ch: + case <-ctxCh: + goto abort } } atomic.AddInt32(&p.waits, -1) atomic.AddInt32(&p.recvs, 1) return resp abort: - go func(i int) { - for ; i < len(resp.s); i++ { - <-ch - } + go func(i int, resp *redisresults) { + <-ch + resultsp.Put(resp) atomic.AddInt32(&p.waits, -1) atomic.AddInt32(&p.recvs, 1) - }(i) + }(i, resp) + resp = resultsp.Get(len(multi), len(multi)) err := newErrResult(ctx.Err()) for ; i < len(resp.s); i++ { resp.s[i] = err diff --git a/redis_test.go b/redis_test.go index 576e4a57..0df59187 100644 --- a/redis_test.go +++ b/redis_test.go @@ -18,10 +18,13 @@ func parallel(p int) (chan func(), func()) { wg.Add(p) for i := 0; i < p; i++ { go func() { + defer func() { + recover() + wg.Done() + }() for fn := range ch { fn() } - wg.Done() }() } return ch, func() { @@ -120,7 +123,7 @@ func testSETGET(t *testing.T, client Client, csc bool) { jobs <- func() { val, err := client.Do(ctx, client.B().Set().Key(key).Value(kvs[key]).Build()).ToString() if err != nil || val != "OK" { - t.Fatalf("unexpected set response %v %v", val, err) + t.Errorf("unexpected set response %v %v", val, err) } } } @@ -133,7 +136,7 @@ func testSETGET(t *testing.T, client Client, csc bool) { jobs <- func() { val, err := client.Do(ctx, client.B().Get().Key(key).Build()).ToString() if v, ok := kvs[key]; !((ok && val == v) || (!ok && IsRedisNil(err))) { - t.Fatalf("unexpected get response %v %v %v", val, err, ok) + t.Errorf("unexpected get response %v %v %v", val, err, ok) } } } @@ -148,7 +151,7 @@ func testSETGET(t *testing.T, client Client, csc bool) { resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), time.Minute) val, err := resp.ToString() if v, ok := kvs[key]; !((ok && val == v) || (!ok && IsRedisNil(err))) { - t.Fatalf("unexpected csc get response %v %v %v", val, err, ok) + t.Errorf("unexpected csc get response %v %v %v", val, err, ok) } if resp.IsCacheHit() { atomic.AddInt64(&hits, 1) @@ -175,12 +178,14 @@ func testSETGET(t *testing.T, client Client, csc bool) { jobs <- func() { val, err := client.Do(ctx, client.B().Del().Key(key).Build()).AsInt64() if _, ok := kvs[key]; !((val == 1 && ok) || (val == 0 && !ok)) { - t.Fatalf("unexpected del response %v %v %v", val, err, ok) + t.Errorf("unexpected del response %v %v %v", val, err, ok) } } } wait() + time.Sleep(time.Second) + t.Logf("testing client side caching after delete\n") jobs, wait = parallel(para) for i := 0; i < keys/100; i++ { @@ -188,10 +193,10 @@ func testSETGET(t *testing.T, client Client, csc bool) { jobs <- func() { resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), time.Minute) if !IsRedisNil(resp.Error()) { - t.Fatalf("unexpected csc get response after delete %v", resp) + t.Errorf("unexpected csc get response after delete %v", resp) } if resp.IsCacheHit() { - t.Fatalf("unexpected csc cache hit after delete") + t.Errorf("unexpected csc cache hit after delete") } } } @@ -320,6 +325,8 @@ func testMultiSETGET(t *testing.T, client Client, csc bool) { } wait() + time.Sleep(time.Second) + t.Logf("testing client side caching after delete\n") jobs, wait = parallel(para) for i := 0; i < keys/100; i += batch { @@ -415,7 +422,9 @@ func testMultiSETGETHelpers(t *testing.T, client Client, csc bool) { t.Fatalf("unexpecetd err %v\n", err) } } + time.Sleep(time.Second) + t.Logf("testing client side caching after delete\n") resp, err = MGetCache(client, ctx, time.Minute, cmdKeys) if err != nil { diff --git a/ring.go b/ring.go index a1345fc2..39e7ae17 100644 --- a/ring.go +++ b/ring.go @@ -7,10 +7,10 @@ import ( type queue interface { PutOne(m Completed) chan RedisResult - PutMulti(m []Completed) chan RedisResult + PutMulti(m []Completed, resps []RedisResult) chan RedisResult NextWriteCmd() (Completed, []Completed, chan RedisResult) WaitForWrite() (Completed, []Completed, chan RedisResult) - NextResultCh() (Completed, []Completed, chan RedisResult, *sync.Cond) + NextResultCh() (Completed, []Completed, chan RedisResult, []RedisResult, *sync.Cond) } var _ queue = (*ring)(nil) @@ -46,6 +46,7 @@ type node struct { ch chan RedisResult one Completed multi []Completed + resps []RedisResult mark uint32 slept bool } @@ -58,6 +59,7 @@ func (r *ring) PutOne(m Completed) chan RedisResult { } n.one = m n.multi = nil + n.resps = nil n.mark = 1 s := n.slept n.c1.L.Unlock() @@ -67,7 +69,7 @@ func (r *ring) PutOne(m Completed) chan RedisResult { return n.ch } -func (r *ring) PutMulti(m []Completed) chan RedisResult { +func (r *ring) PutMulti(m []Completed, resps []RedisResult) chan RedisResult { n := &r.store[atomic.AddUint64(&r.write, 1)&r.mask] n.c1.L.Lock() for n.mark != 0 { @@ -75,6 +77,7 @@ func (r *ring) PutMulti(m []Completed) chan RedisResult { } n.one = Completed{} n.multi = m + n.resps = resps n.mark = 1 s := n.slept n.c1.L.Unlock() @@ -118,14 +121,14 @@ func (r *ring) WaitForWrite() (one Completed, multi []Completed, ch chan RedisRe } // NextResultCh should be only called by one dedicated thread -func (r *ring) NextResultCh() (one Completed, multi []Completed, ch chan RedisResult, cond *sync.Cond) { +func (r *ring) NextResultCh() (one Completed, multi []Completed, ch chan RedisResult, resps []RedisResult, cond *sync.Cond) { r.read2++ p := r.read2 & r.mask n := &r.store[p] cond = n.c1 n.c1.L.Lock() if n.mark == 2 { - one, multi, ch = n.one, n.multi, n.ch + one, multi, ch, resps = n.one, n.multi, n.ch, n.resps n.mark = 0 } else { r.read2-- diff --git a/ring_test.go b/ring_test.go index 86fb9855..d17dfc6f 100644 --- a/ring_test.go +++ b/ring_test.go @@ -30,7 +30,7 @@ func TestRing(t *testing.T) { runtime.Gosched() continue } - cmd2, _, ch, cond := ring.NextResultCh() + cmd2, _, ch, _, cond := ring.NextResultCh() cond.L.Unlock() cond.Signal() if cmd1.Commands()[0] != cmd2.Commands()[0] { @@ -53,7 +53,7 @@ func TestRing(t *testing.T) { base := [][]string{{"a"}, {"b"}, {"c"}, {"d"}} for cmd := range fixture { - go ring.PutMulti(cmds.NewMultiCompleted(append([][]string{{cmd}}, base...))) + go ring.PutMulti(cmds.NewMultiCompleted(append([][]string{{cmd}}, base...)), nil) } for len(fixture) != 0 { @@ -62,7 +62,7 @@ func TestRing(t *testing.T) { runtime.Gosched() continue } - _, cmd2, ch, cond := ring.NextResultCh() + _, cmd2, ch, _, cond := ring.NextResultCh() cond.L.Unlock() cond.Signal() for j := 0; j < len(cmd1); j++ { @@ -82,7 +82,7 @@ func TestRing(t *testing.T) { if one, multi, _ := ring.NextWriteCmd(); !one.IsEmpty() || multi != nil { t.Fatalf("NextWriteCmd should returns nil if empty") } - if one, multi, ch, cond := ring.NextResultCh(); !one.IsEmpty() || multi != nil || ch != nil { + if one, multi, ch, _, cond := ring.NextResultCh(); !one.IsEmpty() || multi != nil || ch != nil { t.Fatalf("NextResultCh should returns nil if not NextWriteCmd yet") } else { cond.L.Unlock() @@ -93,18 +93,18 @@ func TestRing(t *testing.T) { if one, _, _ := ring.NextWriteCmd(); len(one.Commands()) == 0 || one.Commands()[0] != "0" { t.Fatalf("NextWriteCmd should returns next cmd") } - if one, _, ch, cond := ring.NextResultCh(); len(one.Commands()) == 0 || one.Commands()[0] != "0" || ch == nil { + if one, _, ch, _, cond := ring.NextResultCh(); len(one.Commands()) == 0 || one.Commands()[0] != "0" || ch == nil { t.Fatalf("NextResultCh should returns next cmd after NextWriteCmd") } else { cond.L.Unlock() cond.Signal() } - ring.PutMulti(cmds.NewMultiCompleted([][]string{{"0"}})) + ring.PutMulti(cmds.NewMultiCompleted([][]string{{"0"}}), nil) if _, multi, _ := ring.NextWriteCmd(); len(multi) == 0 || multi[0].Commands()[0] != "0" { t.Fatalf("NextWriteCmd should returns next cmd") } - if _, multi, ch, cond := ring.NextResultCh(); len(multi) == 0 || multi[0].Commands()[0] != "0" || ch == nil { + if _, multi, ch, _, cond := ring.NextResultCh(); len(multi) == 0 || multi[0].Commands()[0] != "0" || ch == nil { t.Fatalf("NextResultCh should returns next cmd after NextWriteCmd") } else { cond.L.Unlock() @@ -131,7 +131,7 @@ func TestRing(t *testing.T) { if _, multi, ch := ring.NextWriteCmd(); ch == nil { go func() { time.Sleep(time.Millisecond * 100) - ring.PutMulti([]Completed{cmds.QuitCmd}) + ring.PutMulti([]Completed{cmds.QuitCmd}, nil) }() if _, multi, ch = ring.WaitForWrite(); ch != nil && multi[0].Commands()[0] == cmds.QuitCmd.Commands()[0] { return diff --git a/rueidis.go b/rueidis.go index e5c29f05..7cb1d3c6 100644 --- a/rueidis.go +++ b/rueidis.go @@ -5,8 +5,10 @@ import ( "context" "crypto/tls" "errors" + "math" "math/rand" "net" + "runtime" "strings" "time" @@ -319,7 +321,9 @@ func NewClient(option ClientOption) (client Client, err error) { func singleClientMultiplex(multiplex int) int { if multiplex == 0 { - multiplex = 2 + if multiplex = int(math.Log2(float64(runtime.GOMAXPROCS(0)))); multiplex >= 2 { + multiplex = 2 + } } if multiplex < 0 { multiplex = 0