fix: Improve cluster connection pool logic when disconnecting #1864

Open · wants to merge 21 commits into base: main

Commits (21)
0d2a416
Tell Redis cluster to disable protected mode before running tests
martinslota Mar 6, 2024
5385c8e
Try to enable Redis cluster tests on CI
martinslota Mar 6, 2024
f67d73a
Add a failing test around Redis cluster disconnection logic
martinslota Mar 6, 2024
66efb1a
Rename function parameter
martinslota Mar 6, 2024
f0cadc5
Turn node error listener into an arrow function so that `this` points…
martinslota Mar 6, 2024
49e9edd
Extract node listeners into separate constants
martinslota Mar 6, 2024
dee2623
Keep track of listeners along with each Redis client
martinslota Mar 6, 2024
4d519a6
Remove node listeners when the node is being removed
martinslota Mar 6, 2024
ed3e190
Emit node removal events whenever a node is removed
martinslota Mar 6, 2024
cd3b74a
When resetting, add nodes before removing old ones
martinslota Mar 6, 2024
2b47e94
Rename Node type to NodeRecord for clarity
martinslota Mar 6, 2024
e835185
Also rename the field holding node records
martinslota Mar 6, 2024
eb2d07c
Rename variable to nodeRecord
martinslota Mar 6, 2024
62427a8
Rename another variable to nodeRecord
martinslota Mar 6, 2024
2979176
Fix a reference to connection pool nodes
martinslota Mar 8, 2024
4d4ba69
Do not fail when retrieving a node by non-existing key
martinslota Mar 25, 2024
1c3df53
Merge branch 'main' into martinslota/clean-up-node-listeners-upon-dis…
martinslota Jun 10, 2024
5a490c1
Revert "Fix a reference to connection pool nodes"
martinslota Aug 15, 2024
d67b1b6
Fix a reference to connection pool nodes, this time a bit more correctly
martinslota Aug 15, 2024
d779cb4
Add a valid slots table to mock server in tests that expect to be abl…
martinslota Aug 15, 2024
3165ff0
Do not assume that node removal will occur *after* refreshSlotsCache(…
martinslota Aug 15, 2024
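
Taken together, the commits above make the cluster connection pool keep each node's "end" and "error" listeners next to the Redis client (the NodeRecord type in the ConnectionPool.ts diff below), detach them when the node is removed, and emit the "-node"/"drain" events from removeNode() itself. The sketch below is a minimal, illustrative consumer of that behavior, mirroring the new test in test/cluster/basic.ts: it quits, waits for "end", and reconnects the same Cluster instance. The host/port values and the logging are placeholders, and the "+node"/"-node" listener signatures are assumed from the tests in this PR rather than from documentation.

```ts
import { Cluster } from "ioredis";

async function quitAndReconnectOnce(): Promise<void> {
  // Placeholder endpoint: any reachable node of a test cluster works.
  const cluster = new Cluster([{ host: "127.0.0.1", port: 30000 }]);

  // Pool membership events; the first argument is the underlying Redis client.
  cluster.on("+node", (node) => console.log("node added:", node.options.port));
  cluster.on("-node", (node) => console.log("node removed:", node.options.port));

  await cluster.set("foo", "bar");

  // Wait for "end" so the pool has fully drained before connecting again,
  // exactly as the new test does.
  const ended = new Promise((resolve) => cluster.once("end", resolve));
  await cluster.quit();
  cluster.disconnect();
  await ended;

  cluster.connect();
  console.log(await cluster.get("foo")); // "bar"
  await cluster.quit();
}

quitAndReconnectOnce().catch((err) => console.error(err));
```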
12 changes: 6 additions & 6 deletions .github/workflows/test.yml
@@ -35,12 +35,12 @@ jobs:
flag-name: node-${{matrix.node}}
parallel: true

# test-cluster:
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v2
# - name: Build and test cluster
# run: bash test/cluster/docker/main.sh
test-cluster:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Build and test cluster
run: bash test/cluster/docker/main.sh

code-coverage:
needs: test
2 changes: 1 addition & 1 deletion lib/Pipeline.ts
@@ -343,7 +343,7 @@ Pipeline.prototype.exec = function (callback: Callback): Promise<Array<any>> {
if (_this.isCluster) {
node = {
slot: pipelineSlot,
redis: _this.redis.connectionPool.nodes.all[_this.preferKey],
redis: _this.redis.connectionPool.getInstanceByKey(_this.preferKey),
};
}

106 changes: 60 additions & 46 deletions lib/cluster/ConnectionPool.ts
@@ -7,9 +7,15 @@ const debug = Debug("cluster:connectionPool");

type NODE_TYPE = "all" | "master" | "slave";

type NodeRecord = {
redis: Redis;
endListener: () => void;
errorListener: (error: unknown) => void;
};

export default class ConnectionPool extends EventEmitter {
// master + slave = all
private nodes: { [key in NODE_TYPE]: { [key: string]: Redis } } = {
private nodeRecords: { [key in NODE_TYPE]: { [key: string]: NodeRecord } } = {
all: {},
master: {},
slave: {},
@@ -22,51 +28,51 @@ export default class ConnectionPool extends EventEmitter {
}

getNodes(role: NodeRole = "all"): Redis[] {
const nodes = this.nodes[role];
return Object.keys(nodes).map((key) => nodes[key]);
const nodeRecords = this.nodeRecords[role];
return Object.keys(nodeRecords).map((key) => nodeRecords[key].redis);
}

getInstanceByKey(key: NodeKey): Redis {
return this.nodes.all[key];
return this.nodeRecords.all[key]?.redis;
}

getSampleInstance(role: NodeRole): Redis {
const keys = Object.keys(this.nodes[role]);
const keys = Object.keys(this.nodeRecords[role]);
const sampleKey = sample(keys);
return this.nodes[role][sampleKey];
return this.nodeRecords[role][sampleKey].redis;
}

/**
* Find or create a connection to the node
*/
findOrCreate(node: RedisOptions, readOnly = false): Redis {
const key = getNodeKey(node);
findOrCreate(redisOptions: RedisOptions, readOnly = false): NodeRecord {
const key = getNodeKey(redisOptions);
readOnly = Boolean(readOnly);

if (this.specifiedOptions[key]) {
Object.assign(node, this.specifiedOptions[key]);
Object.assign(redisOptions, this.specifiedOptions[key]);
} else {
this.specifiedOptions[key] = node;
this.specifiedOptions[key] = redisOptions;
}

let redis: Redis;
if (this.nodes.all[key]) {
redis = this.nodes.all[key];
if (redis.options.readOnly !== readOnly) {
redis.options.readOnly = readOnly;
let nodeRecord: NodeRecord;
if (this.nodeRecords.all[key]) {
nodeRecord = this.nodeRecords.all[key];
if (nodeRecord.redis.options.readOnly !== readOnly) {
nodeRecord.redis.options.readOnly = readOnly;
debug("Change role of %s to %s", key, readOnly ? "slave" : "master");
redis[readOnly ? "readonly" : "readwrite"]().catch(noop);
nodeRecord.redis[readOnly ? "readonly" : "readwrite"]().catch(noop);
if (readOnly) {
delete this.nodes.master[key];
this.nodes.slave[key] = redis;
delete this.nodeRecords.master[key];
this.nodeRecords.slave[key] = nodeRecord;
} else {
delete this.nodes.slave[key];
this.nodes.master[key] = redis;
delete this.nodeRecords.slave[key];
this.nodeRecords.master[key] = nodeRecord;
}
}
} else {
debug("Connecting to %s as %s", key, readOnly ? "slave" : "master");
redis = new Redis(
const redis = new Redis(
defaults(
{
// Never try to reconnect when a node is lose,
@@ -79,30 +85,30 @@
enableOfflineQueue: true,
readOnly: readOnly,
},
node,
redisOptions,
this.redisOptions,
{ lazyConnect: true }
)
);
this.nodes.all[key] = redis;
this.nodes[readOnly ? "slave" : "master"][key] = redis;

redis.once("end", () => {
const endListener = () => {
this.removeNode(key);
this.emit("-node", redis, key);
if (!Object.keys(this.nodes.all).length) {
this.emit("drain");
}
});
};
const errorListener = (error: unknown) => {
this.emit("nodeError", error, key);
};
nodeRecord = { redis, endListener, errorListener };

this.nodeRecords.all[key] = nodeRecord;
this.nodeRecords[readOnly ? "slave" : "master"][key] = nodeRecord;

redis.once("end", endListener);

this.emit("+node", redis, key);

redis.on("error", function (error) {
this.emit("nodeError", error, key);
});
redis.on("error", errorListener);
}

return redis;
return nodeRecord;
}

/**
@@ -122,29 +128,37 @@
}
});

Object.keys(this.nodes.all).forEach((key) => {
Object.keys(newNodes).forEach((key) => {
const node = newNodes[key];
this.findOrCreate(node, node.readOnly);
});
Object.keys(this.nodeRecords.all).forEach((key) => {
if (!newNodes[key]) {
debug("Disconnect %s because the node does not hold any slot", key);
this.nodes.all[key].disconnect();
this.nodeRecords.all[key].redis.disconnect();
this.removeNode(key);
}
});
Object.keys(newNodes).forEach((key) => {
const node = newNodes[key];
this.findOrCreate(node, node.readOnly);
});
}

/**
* Remove a node from the pool.
*/
private removeNode(key: string): void {
const { nodes } = this;
if (nodes.all[key]) {
const { nodeRecords } = this;
const nodeRecord = nodeRecords.all[key];
if (nodeRecord) {
debug("Remove %s from the pool", key);
delete nodes.all[key];
nodeRecord.redis.removeListener("end", nodeRecord.endListener);
nodeRecord.redis.removeListener("error", nodeRecord.errorListener);
delete nodeRecords.all[key];
delete nodeRecords.master[key];
delete nodeRecords.slave[key];

this.emit("-node", nodeRecord.redis, key);
if (!Object.keys(nodeRecords.all).length) {
this.emit("drain");
}
}
delete nodes.master[key];
delete nodes.slave[key];
}
}
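
The key to the cleanup in removeNode() above is that findOrCreate() now keeps named references to the "end" and "error" handlers in the NodeRecord; a listener registered as an anonymous closure cannot be passed to removeListener() later. The snippet below is an illustrative, self-contained sketch of that bookkeeping pattern using a plain Node.js EventEmitter; the track/untrack helpers and the TrackedResource type are hypothetical names, not part of ioredis.

```ts
import { EventEmitter } from "events";

// Mirrors the NodeRecord idea: the resource travels with its listeners.
type TrackedResource = {
  emitter: EventEmitter;
  endListener: () => void;
  errorListener: (error: unknown) => void;
};

function track(emitter: EventEmitter, key: string): TrackedResource {
  const endListener = () => console.log(`${key} ended`);
  const errorListener = (error: unknown) => console.log(`${key} error:`, error);
  emitter.once("end", endListener);
  emitter.on("error", errorListener);
  return { emitter, endListener, errorListener };
}

function untrack(record: TrackedResource): void {
  // The same two calls removeNode() now makes for every client it drops.
  record.emitter.removeListener("end", record.endListener);
  record.emitter.removeListener("error", record.errorListener);
}

// Usage sketch:
const record = track(new EventEmitter(), "127.0.0.1:30000");
untrack(record);
record.emitter.emit("end"); // nothing is logged: the listener was detached
```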
21 changes: 21 additions & 0 deletions test/cluster/basic.ts
@@ -148,4 +148,25 @@
expect(await cluster2.get("prefix:foo")).to.eql("bar");
});
});

describe("disconnect and connect again", () => {
it("works 20 times in a row", async () => {
const cluster = new Cluster([{ host: "127.0.0.1", port: masters[0] }]);

for (let i = 1; i <= 20; i++) {
await cluster.set("foo", `bar${i}`);

const endPromise = new Promise((resolve) =>
cluster.once("end", resolve)
);
await cluster.quit();
cluster.disconnect();
await endPromise;

cluster.connect();
expect(await cluster.get("foo")).to.equal(`bar${i}`);
await cluster.del("foo");
}
});
});
});
14 changes: 13 additions & 1 deletion test/cluster/docker/main.sh
@@ -1,4 +1,16 @@
docker run -e "INITIAL_PORT=30000" -e "IP=0.0.0.0" -p 30000-30005:30000-30005 grokzen/redis-cluster:latest &
#!/bin/bash

set -euo pipefail

docker run --rm --name redis-cluster-ioredis-test -e "INITIAL_PORT=30000" -e "IP=0.0.0.0" -p 30000-30005:30000-30005 grokzen/redis-cluster:latest &
trap 'docker stop redis-cluster-ioredis-test' EXIT

npm install

sleep 15

for port in {30000..30005}; do
docker exec redis-cluster-ioredis-test /bin/bash -c "redis-cli -p $port CONFIG SET protected-mode no"
done

npm run test:js:cluster || npm run test:js:cluster || npm run test:js:cluster
7 changes: 6 additions & 1 deletion test/functional/cluster/connect.ts
@@ -391,7 +391,12 @@ describe("cluster:connect", () => {

describe("multiple reconnect", () => {
it("should reconnect after multiple consecutive disconnect(true) are called", (done) => {
new MockServer(30001);
const slotTable = [[0, 16383, ["127.0.0.1", 30001]]];
new MockServer(30001, (argv) => {
if (argv[0] === "cluster" && argv[1] === "SLOTS") {
return slotTable;
}
});
const cluster = new Cluster([{ host: "127.0.0.1", port: "30001" }], {
enableReadyCheck: false,
});
7 changes: 6 additions & 1 deletion test/functional/cluster/disconnection.ts
@@ -9,7 +9,12 @@ describe("disconnection", () => {
});

it("should clear all timers on disconnect", (done) => {
const server = new MockServer(30000);
const slotTable = [[0, 16383, ["127.0.0.1", 30000]]];
const server = new MockServer(30000, (argv) => {
if (argv[0] === "cluster" && argv[1] === "SLOTS") {
return slotTable;
}
});

const setIntervalCalls = sinon.spy(global, "setInterval");
const clearIntervalCalls = sinon.spy(global, "clearInterval");
31 changes: 16 additions & 15 deletions test/functional/cluster/index.ts
@@ -370,27 +370,28 @@ describe("cluster", () => {
});
cluster.on("ready", () => {
expect(cluster.nodes("master")).to.have.lengthOf(2);
expect(cluster.nodes("all")).to.have.lengthOf(3);
slotTable = [
[0, 5460, ["127.0.0.1", 30003]],
[5461, 10922, ["127.0.0.1", 30002]],
];
cluster.refreshSlotsCache(() => {
cluster.once("-node", function (removed) {
expect(removed.options.port).to.eql(30001);
expect(cluster.nodes("master")).to.have.lengthOf(2);
expect(
[
cluster.nodes("master")[0].options.port,
cluster.nodes("master")[1].options.port,
].sort()
).to.eql([30002, 30003]);
cluster.nodes("master").forEach(function (node) {
expect(node.options).to.have.property("readOnly", false);
});
cluster.disconnect();
done();
cluster.once("-node", function (removed) {
expect(removed.options.port).to.eql(30001);
expect(cluster.nodes("master")).to.have.lengthOf(2);
expect(cluster.nodes("all")).to.have.lengthOf(2);
expect(
[
cluster.nodes("master")[0].options.port,
cluster.nodes("master")[1].options.port,
].sort()
).to.eql([30002, 30003]);
cluster.nodes("master").forEach(function (node) {
expect(node.options).to.have.property("readOnly", false);
});
cluster.disconnect();
done();
});
cluster.refreshSlotsCache();
});
});
});