
Commit 50fad71
perf: introduce nodes
proost committed Dec 13, 2024
1 parent 91bbcf3 commit 50fad71
Showing 3 changed files with 41 additions and 39 deletions.
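In short: cluster topology is now parsed directly into a slice of ReplicaInfo (via a new `nodes` type) instead of `[]string`, so `_refresh` can hand `g.nodes[1:]` straight to `ReplicaSelector` rather than allocating and filling a fresh `[]ReplicaInfo` per shard on every topology refresh. A minimal, self-contained sketch of the idea, using local stand-in types rather than the package's actual definitions:

```go
package main

import "fmt"

// Local stand-ins mirroring the shapes introduced by this commit.
type ReplicaInfo struct{ Addr string }
type nodes []ReplicaInfo

func main() {
	// Topology parsing now builds the node list as []ReplicaInfo up
	// front, with the master kept at index 0.
	g := nodes{
		{Addr: "10.0.0.1:7000"}, // master
		{Addr: "10.0.0.2:7000"}, // replica
		{Addr: "10.0.0.3:7000"}, // replica
	}

	// _refresh can pass g[1:] to the selector directly; before this
	// commit g was []string, so a fresh []ReplicaInfo had to be
	// allocated and copied here on every refresh.
	selector := func(slot uint16, replicas []ReplicaInfo) int {
		return int(slot) % len(replicas)
	}
	fmt.Println(g[1+selector(42, g[1:])].Addr) // 10.0.0.2:7000
}
```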
43 changes: 20 additions & 23 deletions cluster.go
@@ -210,12 +210,12 @@ func (c *clusterClient) _refresh() (err error) {
 	for master, g := range groups {
 		conns[master] = connrole{conn: c.connFn(master, c.opt)}
 		if c.rOpt != nil {
-			for _, addr := range g.nodes[1:] {
-				conns[addr] = connrole{conn: c.connFn(addr, c.rOpt)}
+			for _, nodeInfo := range g.nodes[1:] {
+				conns[nodeInfo.Addr] = connrole{conn: c.connFn(nodeInfo.Addr, c.rOpt)}
 			}
 		} else {
-			for _, addr := range g.nodes[1:] {
-				conns[addr] = connrole{conn: c.connFn(addr, c.opt)}
+			for _, nodeInfo := range g.nodes[1:] {
+				conns[nodeInfo.Addr] = connrole{conn: c.connFn(nodeInfo.Addr, c.opt)}
 			}
 		}
 	}
@@ -250,7 +250,7 @@ func (c *clusterClient) _refresh() (err error) {
 			nodesCount := len(g.nodes)
 			for _, slot := range g.slots {
 				for i := slot[0]; i <= slot[1]; i++ {
-					pslots[i] = conns[g.nodes[1+util.FastRand(nodesCount-1)]].conn
+					pslots[i] = conns[g.nodes[1+util.FastRand(nodesCount-1)].Addr].conn
 				}
 			}
 		case c.rOpt != nil:
@@ -259,18 +259,13 @@ func (c *clusterClient) _refresh() (err error) {
 			}
 			if len(g.nodes) > 1 {
 				n := len(g.nodes) - 1
-				replicas := make([]ReplicaInfo, 0, n)
-				for _, addr := range g.nodes[1:] {
-					replicas = append(replicas, ReplicaInfo{Addr: addr})
-				}
-
 				for _, slot := range g.slots {
 					for i := slot[0]; i <= slot[1]; i++ {
 						pslots[i] = conns[master].conn
 
-						rIndex := c.opt.ReplicaSelector(uint16(i), replicas)
+						rIndex := c.opt.ReplicaSelector(uint16(i), g.nodes[1:])
 						if rIndex >= 0 && rIndex < n {
-							rslots[i] = conns[g.nodes[1+rIndex]].conn
+							rslots[i] = conns[g.nodes[1+rIndex].Addr].conn
 						} else {
 							rslots[i] = conns[master].conn
 						}
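Note the index arithmetic in this hunk: the master always sits at `g.nodes[0]`, so a selector result `rIndex` maps to `g.nodes[1+rIndex]`, and any out-of-range result falls back to the master connection. A small runnable illustration of that mapping, with stand-in types and made-up addresses:

```go
package main

import "fmt"

type ReplicaInfo struct{ Addr string } // local stand-in

func main() {
	// g.nodes layout after parsing: master first, then replicas.
	gNodes := []ReplicaInfo{
		{Addr: "master:7000"},
		{Addr: "replica-a:7000"},
		{Addr: "replica-b:7000"},
	}
	n := len(gNodes) - 1 // number of replicas

	for _, rIndex := range []int{0, 1, -1, 5} {
		if rIndex >= 0 && rIndex < n {
			fmt.Println("slot served by", gNodes[1+rIndex].Addr)
		} else {
			// Out-of-range selector results fall back to the master,
			// matching the else branch in the hunk above.
			fmt.Println("slot served by", gNodes[0].Addr)
		}
	}
}
```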
@@ -325,8 +320,10 @@ func (c *clusterClient) nodes() []string {
 	return nodes
 }
 
+type nodes []ReplicaInfo
+
 type group struct {
-	nodes []string
+	nodes nodes
 	slots [][2]int64
 }

@@ -352,10 +349,10 @@ func parseSlots(slots RedisMessage, defaultAddr string) map[string]group {
 		g, ok := groups[master]
 		if !ok {
 			g.slots = make([][2]int64, 0)
-			g.nodes = make([]string, 0, len(v.values)-2)
+			g.nodes = make(nodes, 0, len(v.values)-2)
 			for i := 2; i < len(v.values); i++ {
 				if dst := parseEndpoint(defaultAddr, v.values[i].values[0].string, v.values[i].values[1].integer); dst != "" {
-					g.nodes = append(g.nodes, dst)
+					g.nodes = append(g.nodes, ReplicaInfo{Addr: dst})
 				}
 			}
 		}
@@ -373,16 +370,16 @@ func parseShards(shards RedisMessage, defaultAddr string, tls bool) map[string]group {
 		m := -1
 		shard, _ := v.AsMap()
 		slots := shard["slots"].values
-		nodes := shard["nodes"].values
+		_nodes := shard["nodes"].values
 		g := group{
-			nodes: make([]string, 0, len(nodes)),
+			nodes: make(nodes, 0, len(_nodes)),
 			slots: make([][2]int64, len(slots)/2),
 		}
 		for i := range g.slots {
 			g.slots[i][0], _ = slots[i*2].AsInt64()
 			g.slots[i][1], _ = slots[i*2+1].AsInt64()
 		}
-		for _, n := range nodes {
+		for _, n := range _nodes {
 			dict, _ := n.AsMap()
 			if dict["health"].string != "online" {
 				continue
@@ -395,12 +392,12 @@ func parseShards(shards RedisMessage, defaultAddr string, tls bool) map[string]group {
 				if dict["role"].string == "master" {
 					m = len(g.nodes)
 				}
-				g.nodes = append(g.nodes, dst)
+				g.nodes = append(g.nodes, ReplicaInfo{Addr: dst})
 			}
 		}
 		if m >= 0 {
 			g.nodes[0], g.nodes[m] = g.nodes[m], g.nodes[0]
-			groups[g.nodes[0]] = g
+			groups[g.nodes[0].Addr] = g
 		}
 	}
 	return groups
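The swap just above is what establishes the invariant the rest of the file relies on: after parsing, the master sits at index 0 and `g.nodes[1:]` is the replica set. A small sketch of that normalization, again with stand-in types and hypothetical addresses:

```go
package main

import "fmt"

type ReplicaInfo struct{ Addr string } // local stand-in
type nodes []ReplicaInfo

func main() {
	// CLUSTER SHARDS does not guarantee the master comes first;
	// suppose it was parsed second here (m == 1).
	g := nodes{
		{Addr: "replica-a:7000"},
		{Addr: "master:7000"},
	}
	m := 1

	// Same swap as in parseShards: move the master to index 0 so the
	// group can be keyed by its address and g[1:] is the replica set.
	g[0], g[m] = g[m], g[0]
	fmt.Println(g[0].Addr, g[1].Addr) // master:7000 replica-a:7000
}
```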
@@ -1106,15 +1103,15 @@ func (c *clusterClient) Dedicate() (DedicatedClient, func()) {
 
 func (c *clusterClient) Nodes() map[string]Client {
 	c.mu.RLock()
-	nodes := make(map[string]Client, len(c.conns))
+	_nodes := make(map[string]Client, len(c.conns))
 	disableCache := c.opt != nil && c.opt.DisableCache
 	for addr, cc := range c.conns {
 		if !cc.hidden {
-			nodes[addr] = newSingleClientWithConn(cc.conn, c.cmd, c.retry, disableCache, c.retryHandler)
+			_nodes[addr] = newSingleClientWithConn(cc.conn, c.cmd, c.retry, disableCache, c.retryHandler)
 		}
 	}
 	c.mu.RUnlock()
-	return nodes
+	return _nodes
 }
 
 func (c *clusterClient) Close() {
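A note on the `nodes` to `_nodes` renames above: they appear to be forced by the new package-level `type nodes []ReplicaInfo`. In parseShards, a local variable named `nodes` would shadow the type, so `make(nodes, ...)` in the same scope would no longer compile; the rename in `Nodes()` keeps the same convention. A tiny, self-contained illustration of the shadowing issue (stand-in types):

```go
package main

import "fmt"

type ReplicaInfo struct{ Addr string }
type nodes []ReplicaInfo

func main() {
	vals := []string{"a:1", "b:2"}

	// If this variable were named `nodes`, it would shadow the type
	// above, and `make(nodes, ...)` below would fail to compile.
	_nodes := vals

	g := make(nodes, 0, len(_nodes))
	for _, addr := range _nodes {
		g = append(g, ReplicaInfo{Addr: addr})
	}
	fmt.Println(g)
}
```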
34 changes: 19 additions & 15 deletions cluster_test.go
@@ -5048,13 +5048,15 @@ func TestClusterShardsParsing(t *testing.T) {
 			t.Fatalf("unexpected result %v", result)
 		}
 		for _, val := range result {
-			nodes := val.nodes
-			sort.Strings(nodes)
-			if len(nodes) != 3 ||
-				nodes[0] != "127.0.1.1:1" ||
-				nodes[1] != "127.0.2.1:2" ||
-				nodes[2] != "127.0.3.1:3" {
-				t.Fatalf("unexpected nodes %v", nodes)
+			_nodes := val.nodes
+			sort.Slice(_nodes, func(i, j int) bool {
+				return _nodes[i].Addr < _nodes[j].Addr
+			})
+			if len(_nodes) != 3 ||
+				_nodes[0].Addr != "127.0.1.1:1" ||
+				_nodes[1].Addr != "127.0.2.1:2" ||
+				_nodes[2].Addr != "127.0.3.1:3" {
+				t.Fatalf("unexpected nodes %v", _nodes)
 			}
 		}

@@ -5063,13 +5065,15 @@ func TestClusterShardsParsing(t *testing.T) {
 			t.Fatalf("unexpected result %v", result)
 		}
 		for _, val := range result {
-			nodes := val.nodes
-			sort.Strings(nodes)
-			if len(nodes) != 3 ||
-				nodes[0] != "127.0.1.1:0" ||
-				nodes[1] != "127.0.2.1:0" ||
-				nodes[2] != "127.0.3.1:3" {
-				t.Fatalf("unexpected nodes %v", nodes)
+			_nodes := val.nodes
+			sort.Slice(_nodes, func(i, j int) bool {
+				return _nodes[i].Addr < _nodes[j].Addr
+			})
+			if len(_nodes) != 3 ||
+				_nodes[0].Addr != "127.0.1.1:0" ||
+				_nodes[1].Addr != "127.0.2.1:0" ||
+				_nodes[2].Addr != "127.0.3.1:3" {
+				t.Fatalf("unexpected nodes %v", _nodes)
 			}
 		}
 	})
@@ -5080,7 +5084,7 @@ func TestClusterShardsParsing(t *testing.T) {
 			t.Fatalf("unexpected result %v", result)
 		}
 		for master, group := range result {
-			if len(group.nodes) == 0 || group.nodes[0] != master {
+			if len(group.nodes) == 0 || group.nodes[0].Addr != master {
 				t.Fatalf("unexpected first node %v", group)
 			}
 		}
3 changes: 2 additions & 1 deletion rueidis.go
@@ -215,7 +215,8 @@ type ClientOption struct {
 	// If the returned value is out of range, the primary node will be selected.
 	// If primary node does not have any replica, the primary node will be selected
 	// and function will not be called.
-	// currently only used for cluster client.
+	// Currently only used for cluster client.
+	// Each ReplicaInfo must not be modified.
 	// NOTE: This function can't be used with ReplicaOnly option.
 	// NOTE: This function must be used with SendToReplicas function.
 	ReplicaSelector func(slot uint16, replicas []ReplicaInfo) int
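For context, here is a hedged sketch of wiring this option up; the addresses and the modulo policy are invented, and per the notes above the selector is paired with SendToReplicas and never mutates the replicas slice:

```go
package main

import "github.com/redis/rueidis"

func main() {
	client, err := rueidis.NewClient(rueidis.ClientOption{
		InitAddress: []string{"127.0.0.1:7001"},
		// Route read-only commands to replicas (required alongside ReplicaSelector).
		SendToReplicas: func(cmd rueidis.Completed) bool { return cmd.IsReadOnly() },
		// Spread reads across replicas deterministically by slot; the
		// replicas slice is only read, never modified.
		ReplicaSelector: func(slot uint16, replicas []rueidis.ReplicaInfo) int {
			return int(slot) % len(replicas)
		},
	})
	if err != nil {
		panic(err)
	}
	defer client.Close()
}
```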
