Skip to content

Commit

Permalink
Merge pull request #291 from redis/perf-reduce-domulti-ch-send
Browse files Browse the repository at this point in the history
perf: improve DoMulti by reducing chansends
  • Loading branch information
rueian authored Jul 2, 2023
2 parents 47c3d7c + 1aba42b commit 9f8a06d
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 51 deletions.
27 changes: 24 additions & 3 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rueidis

import (
"context"
"math/rand"
"net"
"runtime"
"sync"
Expand Down Expand Up @@ -223,7 +224,7 @@ func (m *mux) blockingMulti(ctx context.Context, cmd []Completed) (resp *redisre
}

func (m *mux) pipeline(ctx context.Context, cmd Completed) (resp RedisResult) {
slot := cmd.Slot() & uint16(len(m.wire)-1)
slot := slotfn(cmd.Slot(), len(m.wire))
wire := m.pipe(slot)
if resp = wire.Do(ctx, cmd); isBroken(resp.NonRedisError(), wire) {
m.wire[slot].CompareAndSwap(wire, m.init)
Expand All @@ -232,7 +233,7 @@ func (m *mux) pipeline(ctx context.Context, cmd Completed) (resp RedisResult) {
}

func (m *mux) pipelineMulti(ctx context.Context, cmd []Completed) (resp *redisresults) {
slot := cmd[0].Slot() & uint16(len(m.wire)-1)
slot := slotfn(cmd[0].Slot(), len(m.wire))
wire := m.pipe(slot)
resp = wire.DoMulti(ctx, cmd...)
for _, r := range resp.s {
Expand Down Expand Up @@ -311,7 +312,7 @@ func (m *mux) doMultiCache(ctx context.Context, slot uint16, multi []CacheableTT
}

func (m *mux) Receive(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
slot := subscribe.Slot() & uint16(len(m.wire)-1)
slot := slotfn(subscribe.Slot(), len(m.wire))
wire := m.pipe(slot)
err := wire.Receive(ctx, subscribe, fn)
if isBroken(err, wire) {
Expand Down Expand Up @@ -346,3 +347,23 @@ func (m *mux) Addr() string {
func isBroken(err error, w wire) bool {
return err != nil && err != ErrClosing && w.Error() != nil
}

var rngPool = sync.Pool{
New: func() any {
return rand.New(rand.NewSource(time.Now().UnixNano()))
},
}

func fastrand(n int) (r int) {
s := rngPool.Get().(*rand.Rand)
r = s.Intn(n)
rngPool.Put(s)
return
}

func slotfn(ks uint16, n int) uint16 {
if n == 1 || ks == cmds.NoSlot {
return 0
}
return uint16(fastrand(n))
}
55 changes: 28 additions & 27 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,7 @@ func (p *pipe) _background() {
}

var (
ones = make([]Completed, 1)
multi []Completed
resps []RedisResult
ch chan RedisResult
cond *sync.Cond
)
Expand All @@ -332,13 +331,12 @@ func (p *pipe) _background() {
_, _, _ = p.queue.NextWriteCmd()
default:
}
if ones[0], multi, ch, cond = p.queue.NextResultCh(); ch != nil {
if multi == nil {
multi = ones
}
for range multi {
ch <- newErrResult(p.Error())
if _, _, ch, resps, cond = p.queue.NextResultCh(); ch != nil {
err := newErrResult(p.Error())
for i := range resps {
resps[i] = err
}
ch <- err
cond.L.Unlock()
cond.Signal()
} else {
Expand Down Expand Up @@ -405,6 +403,7 @@ func (p *pipe) _backgroundRead() (err error) {
cond *sync.Cond
ones = make([]Completed, 1)
multi []Completed
resps []RedisResult
ch chan RedisResult
ff int // fulfilled count
skip int // skip rest push messages
Expand All @@ -415,10 +414,12 @@ func (p *pipe) _backgroundRead() (err error) {
)

defer func() {
resp := newErrResult(err)
if err != nil && ff < len(multi) {
for ; ff < len(multi); ff++ {
ch <- newErrResult(err)
for ; ff < len(resps); ff++ {
resps[ff] = resp
}
ch <- resp
cond.L.Unlock()
cond.Signal()
}
Expand Down Expand Up @@ -462,7 +463,7 @@ func (p *pipe) _backgroundRead() (err error) {
}
if ff == len(multi) {
ff = 0
ones[0], multi, ch, cond = p.queue.NextResultCh() // ch should not be nil, otherwise it must be a protocol bug
ones[0], multi, ch, resps, cond = p.queue.NextResultCh() // ch should not be nil, otherwise it must be a protocol bug
if ch == nil {
cond.L.Unlock()
// Redis will send sunsubscribe notification proactively in the event of slot migration.
Expand Down Expand Up @@ -523,8 +524,12 @@ func (p *pipe) _backgroundRead() (err error) {
} else if multi[ff].NoReply() && msg.string == "QUEUED" {
panic(multiexecsub)
}
ch <- newResult(msg, err)
resp := newResult(msg, err)
if resps != nil {
resps[ff] = resp
}
if ff++; ff == len(multi) {
ch <- resp
cond.L.Unlock()
cond.Signal()
}
Expand Down Expand Up @@ -911,32 +916,28 @@ func (p *pipe) DoMulti(ctx context.Context, multi ...Completed) *redisresults {
return resp

queue:
ch := p.queue.PutMulti(multi)
ch := p.queue.PutMulti(multi, resp.s)
var i int
if ctxCh := ctx.Done(); ctxCh == nil {
for ; i < len(resp.s); i++ {
resp.s[i] = <-ch
}
<-ch
} else {
for ; i < len(resp.s); i++ {
select {
case resp.s[i] = <-ch:
case <-ctxCh:
goto abort
}
select {
case <-ch:
case <-ctxCh:
goto abort
}
}
atomic.AddInt32(&p.waits, -1)
atomic.AddInt32(&p.recvs, 1)
return resp
abort:
go func(i int) {
for ; i < len(resp.s); i++ {
<-ch
}
go func(i int, resp *redisresults) {
<-ch
resultsp.Put(resp)
atomic.AddInt32(&p.waits, -1)
atomic.AddInt32(&p.recvs, 1)
}(i)
}(i, resp)
resp = resultsp.Get(len(multi), len(multi))
err := newErrResult(ctx.Err())
for ; i < len(resp.s); i++ {
resp.s[i] = err
Expand Down
23 changes: 16 additions & 7 deletions redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ func parallel(p int) (chan func(), func()) {
wg.Add(p)
for i := 0; i < p; i++ {
go func() {
defer func() {
recover()
wg.Done()
}()
for fn := range ch {
fn()
}
wg.Done()
}()
}
return ch, func() {
Expand Down Expand Up @@ -120,7 +123,7 @@ func testSETGET(t *testing.T, client Client, csc bool) {
jobs <- func() {
val, err := client.Do(ctx, client.B().Set().Key(key).Value(kvs[key]).Build()).ToString()
if err != nil || val != "OK" {
t.Fatalf("unexpected set response %v %v", val, err)
t.Errorf("unexpected set response %v %v", val, err)
}
}
}
Expand All @@ -133,7 +136,7 @@ func testSETGET(t *testing.T, client Client, csc bool) {
jobs <- func() {
val, err := client.Do(ctx, client.B().Get().Key(key).Build()).ToString()
if v, ok := kvs[key]; !((ok && val == v) || (!ok && IsRedisNil(err))) {
t.Fatalf("unexpected get response %v %v %v", val, err, ok)
t.Errorf("unexpected get response %v %v %v", val, err, ok)
}
}
}
Expand All @@ -148,7 +151,7 @@ func testSETGET(t *testing.T, client Client, csc bool) {
resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), time.Minute)
val, err := resp.ToString()
if v, ok := kvs[key]; !((ok && val == v) || (!ok && IsRedisNil(err))) {
t.Fatalf("unexpected csc get response %v %v %v", val, err, ok)
t.Errorf("unexpected csc get response %v %v %v", val, err, ok)
}
if resp.IsCacheHit() {
atomic.AddInt64(&hits, 1)
Expand All @@ -175,23 +178,25 @@ func testSETGET(t *testing.T, client Client, csc bool) {
jobs <- func() {
val, err := client.Do(ctx, client.B().Del().Key(key).Build()).AsInt64()
if _, ok := kvs[key]; !((val == 1 && ok) || (val == 0 && !ok)) {
t.Fatalf("unexpected del response %v %v %v", val, err, ok)
t.Errorf("unexpected del response %v %v %v", val, err, ok)
}
}
}
wait()

time.Sleep(time.Second)

t.Logf("testing client side caching after delete\n")
jobs, wait = parallel(para)
for i := 0; i < keys/100; i++ {
key := strconv.Itoa(i)
jobs <- func() {
resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), time.Minute)
if !IsRedisNil(resp.Error()) {
t.Fatalf("unexpected csc get response after delete %v", resp)
t.Errorf("unexpected csc get response after delete %v", resp)
}
if resp.IsCacheHit() {
t.Fatalf("unexpected csc cache hit after delete")
t.Errorf("unexpected csc cache hit after delete")
}
}
}
Expand Down Expand Up @@ -320,6 +325,8 @@ func testMultiSETGET(t *testing.T, client Client, csc bool) {
}
wait()

time.Sleep(time.Second)

t.Logf("testing client side caching after delete\n")
jobs, wait = parallel(para)
for i := 0; i < keys/100; i += batch {
Expand Down Expand Up @@ -415,7 +422,9 @@ func testMultiSETGETHelpers(t *testing.T, client Client, csc bool) {
t.Fatalf("unexpecetd err %v\n", err)
}
}

time.Sleep(time.Second)

t.Logf("testing client side caching after delete\n")
resp, err = MGetCache(client, ctx, time.Minute, cmdKeys)
if err != nil {
Expand Down
13 changes: 8 additions & 5 deletions ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (

type queue interface {
PutOne(m Completed) chan RedisResult
PutMulti(m []Completed) chan RedisResult
PutMulti(m []Completed, resps []RedisResult) chan RedisResult
NextWriteCmd() (Completed, []Completed, chan RedisResult)
WaitForWrite() (Completed, []Completed, chan RedisResult)
NextResultCh() (Completed, []Completed, chan RedisResult, *sync.Cond)
NextResultCh() (Completed, []Completed, chan RedisResult, []RedisResult, *sync.Cond)
}

var _ queue = (*ring)(nil)
Expand Down Expand Up @@ -46,6 +46,7 @@ type node struct {
ch chan RedisResult
one Completed
multi []Completed
resps []RedisResult
mark uint32
slept bool
}
Expand All @@ -58,6 +59,7 @@ func (r *ring) PutOne(m Completed) chan RedisResult {
}
n.one = m
n.multi = nil
n.resps = nil
n.mark = 1
s := n.slept
n.c1.L.Unlock()
Expand All @@ -67,14 +69,15 @@ func (r *ring) PutOne(m Completed) chan RedisResult {
return n.ch
}

func (r *ring) PutMulti(m []Completed) chan RedisResult {
func (r *ring) PutMulti(m []Completed, resps []RedisResult) chan RedisResult {
n := &r.store[atomic.AddUint64(&r.write, 1)&r.mask]
n.c1.L.Lock()
for n.mark != 0 {
n.c1.Wait()
}
n.one = Completed{}
n.multi = m
n.resps = resps
n.mark = 1
s := n.slept
n.c1.L.Unlock()
Expand Down Expand Up @@ -118,14 +121,14 @@ func (r *ring) WaitForWrite() (one Completed, multi []Completed, ch chan RedisRe
}

// NextResultCh should be only called by one dedicated thread
func (r *ring) NextResultCh() (one Completed, multi []Completed, ch chan RedisResult, cond *sync.Cond) {
func (r *ring) NextResultCh() (one Completed, multi []Completed, ch chan RedisResult, resps []RedisResult, cond *sync.Cond) {
r.read2++
p := r.read2 & r.mask
n := &r.store[p]
cond = n.c1
n.c1.L.Lock()
if n.mark == 2 {
one, multi, ch = n.one, n.multi, n.ch
one, multi, ch, resps = n.one, n.multi, n.ch, n.resps
n.mark = 0
} else {
r.read2--
Expand Down
16 changes: 8 additions & 8 deletions ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestRing(t *testing.T) {
runtime.Gosched()
continue
}
cmd2, _, ch, cond := ring.NextResultCh()
cmd2, _, ch, _, cond := ring.NextResultCh()
cond.L.Unlock()
cond.Signal()
if cmd1.Commands()[0] != cmd2.Commands()[0] {
Expand All @@ -53,7 +53,7 @@ func TestRing(t *testing.T) {

base := [][]string{{"a"}, {"b"}, {"c"}, {"d"}}
for cmd := range fixture {
go ring.PutMulti(cmds.NewMultiCompleted(append([][]string{{cmd}}, base...)))
go ring.PutMulti(cmds.NewMultiCompleted(append([][]string{{cmd}}, base...)), nil)
}

for len(fixture) != 0 {
Expand All @@ -62,7 +62,7 @@ func TestRing(t *testing.T) {
runtime.Gosched()
continue
}
_, cmd2, ch, cond := ring.NextResultCh()
_, cmd2, ch, _, cond := ring.NextResultCh()
cond.L.Unlock()
cond.Signal()
for j := 0; j < len(cmd1); j++ {
Expand All @@ -82,7 +82,7 @@ func TestRing(t *testing.T) {
if one, multi, _ := ring.NextWriteCmd(); !one.IsEmpty() || multi != nil {
t.Fatalf("NextWriteCmd should returns nil if empty")
}
if one, multi, ch, cond := ring.NextResultCh(); !one.IsEmpty() || multi != nil || ch != nil {
if one, multi, ch, _, cond := ring.NextResultCh(); !one.IsEmpty() || multi != nil || ch != nil {
t.Fatalf("NextResultCh should returns nil if not NextWriteCmd yet")
} else {
cond.L.Unlock()
Expand All @@ -93,18 +93,18 @@ func TestRing(t *testing.T) {
if one, _, _ := ring.NextWriteCmd(); len(one.Commands()) == 0 || one.Commands()[0] != "0" {
t.Fatalf("NextWriteCmd should returns next cmd")
}
if one, _, ch, cond := ring.NextResultCh(); len(one.Commands()) == 0 || one.Commands()[0] != "0" || ch == nil {
if one, _, ch, _, cond := ring.NextResultCh(); len(one.Commands()) == 0 || one.Commands()[0] != "0" || ch == nil {
t.Fatalf("NextResultCh should returns next cmd after NextWriteCmd")
} else {
cond.L.Unlock()
cond.Signal()
}

ring.PutMulti(cmds.NewMultiCompleted([][]string{{"0"}}))
ring.PutMulti(cmds.NewMultiCompleted([][]string{{"0"}}), nil)
if _, multi, _ := ring.NextWriteCmd(); len(multi) == 0 || multi[0].Commands()[0] != "0" {
t.Fatalf("NextWriteCmd should returns next cmd")
}
if _, multi, ch, cond := ring.NextResultCh(); len(multi) == 0 || multi[0].Commands()[0] != "0" || ch == nil {
if _, multi, ch, _, cond := ring.NextResultCh(); len(multi) == 0 || multi[0].Commands()[0] != "0" || ch == nil {
t.Fatalf("NextResultCh should returns next cmd after NextWriteCmd")
} else {
cond.L.Unlock()
Expand All @@ -131,7 +131,7 @@ func TestRing(t *testing.T) {
if _, multi, ch := ring.NextWriteCmd(); ch == nil {
go func() {
time.Sleep(time.Millisecond * 100)
ring.PutMulti([]Completed{cmds.QuitCmd})
ring.PutMulti([]Completed{cmds.QuitCmd}, nil)
}()
if _, multi, ch = ring.WaitForWrite(); ch != nil && multi[0].Commands()[0] == cmds.QuitCmd.Commands()[0] {
return
Expand Down
Loading

0 comments on commit 9f8a06d

Please sign in to comment.