Skip to content

Commit

Permalink
fix: broken tx retris for cluster clients after #697
Browse files Browse the repository at this point in the history
Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian committed Dec 24, 2024
1 parent e5cfe35 commit 060a937
Show file tree
Hide file tree
Showing 2 changed files with 310 additions and 5 deletions.
15 changes: 12 additions & 3 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,14 +858,23 @@ func (c *clusterClient) DoCache(ctx context.Context, cmd Cacheable, ttl time.Dur
}

func askingMulti(cc conn, ctx context.Context, multi []Completed) *redisresults {
var inTx bool
commands := make([]Completed, 0, len(multi)*2)
for _, cmd := range multi {
commands = append(commands, cmds.AskingCmd, cmd)
if inTx {
commands = append(commands, cmd)
inTx = !isExec(cmd)
} else {
commands = append(commands, cmds.AskingCmd, cmd)
inTx = isMulti(cmd)
}
}
results := resultsp.Get(0, len(multi))
resps := cc.DoMulti(ctx, commands...)
for i := 1; i < len(resps.s); i += 2 {
results.s = append(results.s, resps.s[i])
for i, resp := range resps.s {
if commands[i] != cmds.AskingCmd {
results.s = append(results.s, resp)
}
}
resultsp.Put(resps)
return results
Expand Down
300 changes: 298 additions & 2 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4206,6 +4206,106 @@ func TestClusterClientErr(t *testing.T) {
}
})

t.Run("slot moved DoMulti transactions ASKING", func(t *testing.T) {
var count int64
client, err := newClusterClient(
&ClientOption{InitAddress: []string{":0"}},
func(dst string, opt *ClientOption) conn {
return &mockConn{DoFn: func(cmd Completed) RedisResult {
return slotsMultiResp
}, DoMultiFn: func(multi ...Completed) *redisresults {
switch atomic.AddInt64(&count, 1) {
case 1:
return &redisresults{s: []RedisResult{
newResult(RedisMessage{typ: '+', string: "1"}, nil),
newResult(RedisMessage{typ: '+', string: "OK"}, nil),
newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil),
newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil),
newResult(RedisMessage{typ: '-', string: "EXECABORT"}, nil),
newResult(RedisMessage{typ: '+', string: "4"}, nil),
newResult(RedisMessage{typ: '+', string: "OK"}, nil),
newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil),
newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil),
newResult(RedisMessage{typ: '-', string: "EXECABORT"}, nil),
newResult(RedisMessage{typ: '+', string: "7"}, nil),
}}
case 2:
return &redisresults{s: []RedisResult{
newResult(RedisMessage{typ: '+', string: "OK"}, nil),
newResult(RedisMessage{typ: '+', string: "OK"}, nil),
newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil),
newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil),
newResult(RedisMessage{typ: '*', values: []RedisMessage{
{typ: '+', string: "2"},
{typ: '+', string: "3"},
}}, nil),
newResult(RedisMessage{typ: '+', string: "OK"}, nil),
newResult(RedisMessage{typ: '+', string: "OK"}, nil),
newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil),
newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil),
newResult(RedisMessage{typ: '*', values: []RedisMessage{
{typ: '+', string: "5"},
{typ: '+', string: "6"},
}}, nil),
}}
}
return nil
}}
},
newRetryer(defaultRetryDelayFn),
)
if err != nil {
t.Fatalf("unexpected err %v", err)
}
resps := client.DoMulti(
context.Background(),
client.B().Get().Key("1{t}").Build(),
client.B().Multi().Build(),
client.B().Get().Key("2{t}").Build(),
client.B().Get().Key("3{t}").Build(),
client.B().Exec().Build(),
client.B().Get().Key("4{t}").Build(),
client.B().Multi().Build(),
client.B().Get().Key("5{t}").Build(),
client.B().Get().Key("6{t}").Build(),
client.B().Exec().Build(),
client.B().Get().Key("7{t}").Build(),
)
if v, err := resps[0].ToString(); err != nil || v != "1" {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[1].ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[2].ToString(); err != nil || v != "QUEUED" {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[3].ToString(); err != nil || v != "QUEUED" {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[4].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"2", "3"}) {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[5].ToString(); err != nil || v != "4" {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[6].ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[7].ToString(); err != nil || v != "QUEUED" {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[8].ToString(); err != nil || v != "QUEUED" {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[9].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"5", "6"}) {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[10].ToString(); err != nil || v != "7" {
t.Fatalf("unexpected resp %v %v", v, err)
}
})

t.Run("slot moved DoMulti except transactions", func(t *testing.T) {
var count int64
client, err := newClusterClient(
Expand Down Expand Up @@ -4299,6 +4399,102 @@ func TestClusterClientErr(t *testing.T) {
}
})

t.Run("slot moved DoMulti except transactions ASKING", func(t *testing.T) {
var count int64
client, err := newClusterClient(
&ClientOption{InitAddress: []string{":0"}},
func(dst string, opt *ClientOption) conn {
return &mockConn{DoFn: func(cmd Completed) RedisResult {
return slotsMultiResp
}, DoMultiFn: func(multi ...Completed) *redisresults {
switch atomic.AddInt64(&count, 1) {
case 1:
return &redisresults{s: []RedisResult{
newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil),
newResult(RedisMessage{typ: '+', string: "OK"}, nil),
newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil),
newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil),
newResult(RedisMessage{typ: '*', values: []RedisMessage{
{typ: '+', string: "2"},
{typ: '+', string: "3"},
}}, nil),
newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil),
newResult(RedisMessage{typ: '+', string: "OK"}, nil),
newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil),
newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil),
newResult(RedisMessage{typ: '*', values: []RedisMessage{
{typ: '+', string: "5"},
{typ: '+', string: "6"},
}}, nil),
newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil),
}}
case 2:
return &redisresults{s: []RedisResult{
newResult(RedisMessage{typ: '+', string: "OK"}, nil),
newResult(RedisMessage{typ: '+', string: "1"}, nil),
newResult(RedisMessage{typ: '+', string: "OK"}, nil),
newResult(RedisMessage{typ: '+', string: "4"}, nil),
newResult(RedisMessage{typ: '+', string: "OK"}, nil),
newResult(RedisMessage{typ: '+', string: "7"}, nil),
}}
}
return nil
}}
},
newRetryer(defaultRetryDelayFn),
)
if err != nil {
t.Fatalf("unexpected err %v", err)
}
resps := client.DoMulti(
context.Background(),
client.B().Get().Key("1{t}").Build(),
client.B().Multi().Build(),
client.B().Get().Key("2{t}").Build(),
client.B().Get().Key("3{t}").Build(),
client.B().Exec().Build(),
client.B().Get().Key("4{t}").Build(),
client.B().Multi().Build(),
client.B().Get().Key("5{t}").Build(),
client.B().Get().Key("6{t}").Build(),
client.B().Exec().Build(),
client.B().Get().Key("7{t}").Build(),
)
if v, err := resps[0].ToString(); err != nil || v != "1" {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[1].ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[2].ToString(); err != nil || v != "QUEUED" {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[3].ToString(); err != nil || v != "QUEUED" {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[4].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"2", "3"}) {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[5].ToString(); err != nil || v != "4" {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[6].ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[7].ToString(); err != nil || v != "QUEUED" {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[8].ToString(); err != nil || v != "QUEUED" {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[9].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"5", "6"}) {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[10].ToString(); err != nil || v != "7" {
t.Fatalf("unexpected resp %v %v", v, err)
}
})

t.Run("slot moved DoMulti transactions mixed", func(t *testing.T) {
var count int64
client, err := newClusterClient(
Expand Down Expand Up @@ -4398,6 +4594,108 @@ func TestClusterClientErr(t *testing.T) {
}
})

t.Run("slot moved DoMulti transactions mixed ASKING", func(t *testing.T) {
var count int64
client, err := newClusterClient(
&ClientOption{InitAddress: []string{":0"}},
func(dst string, opt *ClientOption) conn {
return &mockConn{DoFn: func(cmd Completed) RedisResult {
return slotsMultiResp
}, DoMultiFn: func(multi ...Completed) *redisresults {
switch atomic.AddInt64(&count, 1) {
case 1:
return &redisresults{s: []RedisResult{
newResult(RedisMessage{typ: '+', string: "1"}, nil),
newResult(RedisMessage{typ: '+', string: "OK"}, nil),
newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil),
newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil),
newResult(RedisMessage{typ: '-', string: "EXECABORT"}, nil),
newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil),
newResult(RedisMessage{typ: '+', string: "OK"}, nil),
newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil),
newResult(RedisMessage{typ: '-', string: "ASK 0 :1"}, nil),
newResult(RedisMessage{typ: '-', string: "EXECABORT"}, nil),
newResult(RedisMessage{typ: '+', string: "7"}, nil),
}}
case 2:
return &redisresults{s: []RedisResult{
newResult(RedisMessage{typ: '+', string: "OK"}, nil),
newResult(RedisMessage{typ: '+', string: "OK"}, nil),
newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil),
newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil),
newResult(RedisMessage{typ: '*', values: []RedisMessage{
{typ: '+', string: "2"},
{typ: '+', string: "3"},
}}, nil),
newResult(RedisMessage{typ: '+', string: "OK"}, nil),
newResult(RedisMessage{typ: '+', string: "4"}, nil),
newResult(RedisMessage{typ: '+', string: "OK"}, nil),
newResult(RedisMessage{typ: '+', string: "OK"}, nil),
newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil),
newResult(RedisMessage{typ: '+', string: "QUEUED"}, nil),
newResult(RedisMessage{typ: '*', values: []RedisMessage{
{typ: '+', string: "5"},
{typ: '+', string: "6"},
}}, nil),
}}
}
return nil
}}
},
newRetryer(defaultRetryDelayFn),
)
if err != nil {
t.Fatalf("unexpected err %v", err)
}
resps := client.DoMulti(
context.Background(),
client.B().Get().Key("1{t}").Build(),
client.B().Multi().Build(),
client.B().Get().Key("2{t}").Build(),
client.B().Get().Key("3{t}").Build(),
client.B().Exec().Build(),
client.B().Get().Key("4{t}").Build(),
client.B().Multi().Build(),
client.B().Get().Key("5{t}").Build(),
client.B().Get().Key("6{t}").Build(),
client.B().Exec().Build(),
client.B().Get().Key("7{t}").Build(),
)
if v, err := resps[0].ToString(); err != nil || v != "1" {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[1].ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[2].ToString(); err != nil || v != "QUEUED" {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[3].ToString(); err != nil || v != "QUEUED" {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[4].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"2", "3"}) {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[5].ToString(); err != nil || v != "4" {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[6].ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[7].ToString(); err != nil || v != "QUEUED" {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[8].ToString(); err != nil || v != "QUEUED" {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[9].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"5", "6"}) {
t.Fatalf("unexpected resp %v %v", v, err)
}
if v, err := resps[10].ToString(); err != nil || v != "7" {
t.Fatalf("unexpected resp %v %v", v, err)
}
})

t.Run("slot moved DoMulti transactions edge cases 1", func(t *testing.T) {
var count int64
client, err := newClusterClient(
Expand Down Expand Up @@ -4545,8 +4843,6 @@ func TestClusterClientErr(t *testing.T) {
newResult(RedisMessage{typ: '-', string: "ERR Command not allowed inside a transaction"}, nil),
newResult(RedisMessage{typ: '-', string: "MOVED 0 :1"}, nil),
}}
case 2:
fmt.Println(multi)
}
return nil
}}
Expand Down

0 comments on commit 060a937

Please sign in to comment.