Skip to content

Commit

Permalink
fix: keep pubsub commands dedicated to the same connection in single …
Browse files Browse the repository at this point in the history
…client
  • Loading branch information
rueian committed Jul 4, 2023
1 parent 9f8a06d commit fa1f7d9
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (m *mux) blockingMulti(ctx context.Context, cmd []Completed) (resp *redisre
}

func (m *mux) pipeline(ctx context.Context, cmd Completed) (resp RedisResult) {
slot := slotfn(cmd.Slot(), len(m.wire))
slot := slotfn(len(m.wire), cmd.Slot(), cmd.NoReply())
wire := m.pipe(slot)
if resp = wire.Do(ctx, cmd); isBroken(resp.NonRedisError(), wire) {
m.wire[slot].CompareAndSwap(wire, m.init)
Expand All @@ -233,7 +233,7 @@ func (m *mux) pipeline(ctx context.Context, cmd Completed) (resp RedisResult) {
}

func (m *mux) pipelineMulti(ctx context.Context, cmd []Completed) (resp *redisresults) {
slot := slotfn(cmd[0].Slot(), len(m.wire))
slot := slotfn(len(m.wire), cmd[0].Slot(), cmd[0].NoReply())
wire := m.pipe(slot)
resp = wire.DoMulti(ctx, cmd...)
for _, r := range resp.s {
Expand Down Expand Up @@ -312,7 +312,7 @@ func (m *mux) doMultiCache(ctx context.Context, slot uint16, multi []CacheableTT
}

func (m *mux) Receive(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
slot := slotfn(subscribe.Slot(), len(m.wire))
slot := slotfn(len(m.wire), subscribe.Slot(), subscribe.NoReply())
wire := m.pipe(slot)
err := wire.Receive(ctx, subscribe, fn)
if isBroken(err, wire) {
Expand Down Expand Up @@ -361,8 +361,8 @@ func fastrand(n int) (r int) {
return
}

func slotfn(ks uint16, n int) uint16 {
if n == 1 || ks == cmds.NoSlot {
func slotfn(n int, ks uint16, noreply bool) uint16 {
if n == 1 || ks == cmds.NoSlot || noreply {
return 0
}
return uint16(fastrand(n))
Expand Down

0 comments on commit fa1f7d9

Please sign in to comment.