diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 040df430..3abe3fea 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -35,12 +35,12 @@ jobs: flag-name: node-${{matrix.node}} parallel: true - # test-cluster: - # runs-on: ubuntu-latest - # steps: - # - uses: actions/checkout@v2 - # - name: Build and test cluster - # run: bash test/cluster/docker/main.sh + test-cluster: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Build and test cluster + run: bash test/cluster/docker/main.sh code-coverage: needs: test diff --git a/lib/Pipeline.ts b/lib/Pipeline.ts index cf128fbf..7be5cb40 100644 --- a/lib/Pipeline.ts +++ b/lib/Pipeline.ts @@ -343,7 +343,7 @@ Pipeline.prototype.exec = function (callback: Callback): Promise> { if (_this.isCluster) { node = { slot: pipelineSlot, - redis: _this.redis.connectionPool.nodes.all[_this.preferKey], + redis: _this.redis.connectionPool.getInstanceByKey(_this.preferKey), }; } diff --git a/lib/cluster/ConnectionPool.ts b/lib/cluster/ConnectionPool.ts index dbab62c9..5a117bad 100644 --- a/lib/cluster/ConnectionPool.ts +++ b/lib/cluster/ConnectionPool.ts @@ -7,9 +7,15 @@ const debug = Debug("cluster:connectionPool"); type NODE_TYPE = "all" | "master" | "slave"; +type NodeRecord = { + redis: Redis; + endListener: () => void; + errorListener: (error: unknown) => void; +}; + export default class ConnectionPool extends EventEmitter { // master + slave = all - private nodes: { [key in NODE_TYPE]: { [key: string]: Redis } } = { + private nodeRecords: { [key in NODE_TYPE]: { [key: string]: NodeRecord } } = { all: {}, master: {}, slave: {}, @@ -22,51 +28,51 @@ export default class ConnectionPool extends EventEmitter { } getNodes(role: NodeRole = "all"): Redis[] { - const nodes = this.nodes[role]; - return Object.keys(nodes).map((key) => nodes[key]); + const nodeRecords = this.nodeRecords[role]; + return Object.keys(nodeRecords).map((key) => nodeRecords[key].redis); } getInstanceByKey(key: NodeKey): Redis { - return this.nodes.all[key]; + return this.nodeRecords.all[key]?.redis; } getSampleInstance(role: NodeRole): Redis { - const keys = Object.keys(this.nodes[role]); + const keys = Object.keys(this.nodeRecords[role]); const sampleKey = sample(keys); - return this.nodes[role][sampleKey]; + return this.nodeRecords[role][sampleKey].redis; } /** * Find or create a connection to the node */ - findOrCreate(node: RedisOptions, readOnly = false): Redis { - const key = getNodeKey(node); + findOrCreate(redisOptions: RedisOptions, readOnly = false): NodeRecord { + const key = getNodeKey(redisOptions); readOnly = Boolean(readOnly); if (this.specifiedOptions[key]) { - Object.assign(node, this.specifiedOptions[key]); + Object.assign(redisOptions, this.specifiedOptions[key]); } else { - this.specifiedOptions[key] = node; + this.specifiedOptions[key] = redisOptions; } - let redis: Redis; - if (this.nodes.all[key]) { - redis = this.nodes.all[key]; - if (redis.options.readOnly !== readOnly) { - redis.options.readOnly = readOnly; + let nodeRecord: NodeRecord; + if (this.nodeRecords.all[key]) { + nodeRecord = this.nodeRecords.all[key]; + if (nodeRecord.redis.options.readOnly !== readOnly) { + nodeRecord.redis.options.readOnly = readOnly; debug("Change role of %s to %s", key, readOnly ? "slave" : "master"); - redis[readOnly ? "readonly" : "readwrite"]().catch(noop); + nodeRecord.redis[readOnly ? "readonly" : "readwrite"]().catch(noop); if (readOnly) { - delete this.nodes.master[key]; - this.nodes.slave[key] = redis; + delete this.nodeRecords.master[key]; + this.nodeRecords.slave[key] = nodeRecord; } else { - delete this.nodes.slave[key]; - this.nodes.master[key] = redis; + delete this.nodeRecords.slave[key]; + this.nodeRecords.master[key] = nodeRecord; } } } else { debug("Connecting to %s as %s", key, readOnly ? "slave" : "master"); - redis = new Redis( + const redis = new Redis( defaults( { // Never try to reconnect when a node is lose, @@ -79,30 +85,30 @@ export default class ConnectionPool extends EventEmitter { enableOfflineQueue: true, readOnly: readOnly, }, - node, + redisOptions, this.redisOptions, { lazyConnect: true } ) ); - this.nodes.all[key] = redis; - this.nodes[readOnly ? "slave" : "master"][key] = redis; - - redis.once("end", () => { + const endListener = () => { this.removeNode(key); - this.emit("-node", redis, key); - if (!Object.keys(this.nodes.all).length) { - this.emit("drain"); - } - }); + }; + const errorListener = (error: unknown) => { + this.emit("nodeError", error, key); + }; + nodeRecord = { redis, endListener, errorListener }; + + this.nodeRecords.all[key] = nodeRecord; + this.nodeRecords[readOnly ? "slave" : "master"][key] = nodeRecord; + + redis.once("end", endListener); this.emit("+node", redis, key); - redis.on("error", function (error) { - this.emit("nodeError", error, key); - }); + redis.on("error", errorListener); } - return redis; + return nodeRecord; } /** @@ -122,29 +128,37 @@ export default class ConnectionPool extends EventEmitter { } }); - Object.keys(this.nodes.all).forEach((key) => { + Object.keys(newNodes).forEach((key) => { + const node = newNodes[key]; + this.findOrCreate(node, node.readOnly); + }); + Object.keys(this.nodeRecords.all).forEach((key) => { if (!newNodes[key]) { debug("Disconnect %s because the node does not hold any slot", key); - this.nodes.all[key].disconnect(); + this.nodeRecords.all[key].redis.disconnect(); this.removeNode(key); } }); - Object.keys(newNodes).forEach((key) => { - const node = newNodes[key]; - this.findOrCreate(node, node.readOnly); - }); } /** * Remove a node from the pool. */ private removeNode(key: string): void { - const { nodes } = this; - if (nodes.all[key]) { + const { nodeRecords } = this; + const nodeRecord = nodeRecords.all[key]; + if (nodeRecord) { debug("Remove %s from the pool", key); - delete nodes.all[key]; + nodeRecord.redis.removeListener("end", nodeRecord.endListener); + nodeRecord.redis.removeListener("error", nodeRecord.errorListener); + delete nodeRecords.all[key]; + delete nodeRecords.master[key]; + delete nodeRecords.slave[key]; + + this.emit("-node", nodeRecord.redis, key); + if (!Object.keys(nodeRecords.all).length) { + this.emit("drain"); + } } - delete nodes.master[key]; - delete nodes.slave[key]; } } diff --git a/test/cluster/basic.ts b/test/cluster/basic.ts index 98d5dafe..deb9882e 100644 --- a/test/cluster/basic.ts +++ b/test/cluster/basic.ts @@ -148,4 +148,25 @@ describe("cluster", () => { expect(await cluster2.get("prefix:foo")).to.eql("bar"); }); }); + + describe("disconnect and connect again", () => { + it("works 20 times in a row", async () => { + const cluster = new Cluster([{ host: "127.0.0.1", port: masters[0] }]); + + for (let i = 1; i <= 20; i++) { + await cluster.set("foo", `bar${i}`); + + const endPromise = new Promise((resolve) => + cluster.once("end", resolve) + ); + await cluster.quit(); + cluster.disconnect(); + await endPromise; + + cluster.connect(); + expect(await cluster.get("foo")).to.equal(`bar${i}`); + await cluster.del("foo"); + } + }); + }); }); diff --git a/test/cluster/docker/main.sh b/test/cluster/docker/main.sh index f5f75df7..cf91dead 100755 --- a/test/cluster/docker/main.sh +++ b/test/cluster/docker/main.sh @@ -1,4 +1,16 @@ -docker run -e "INITIAL_PORT=30000" -e "IP=0.0.0.0" -p 30000-30005:30000-30005 grokzen/redis-cluster:latest & +#!/bin/bash + +set -euo pipefail + +docker run --rm --name redis-cluster-ioredis-test -e "INITIAL_PORT=30000" -e "IP=0.0.0.0" -p 30000-30005:30000-30005 grokzen/redis-cluster:latest & +trap 'docker stop redis-cluster-ioredis-test' EXIT + npm install + sleep 15 + +for port in {30000..30005}; do + docker exec redis-cluster-ioredis-test /bin/bash -c "redis-cli -p $port CONFIG SET protected-mode no" +done + npm run test:js:cluster || npm run test:js:cluster || npm run test:js:cluster diff --git a/test/functional/cluster/connect.ts b/test/functional/cluster/connect.ts index a2346f23..c0caf114 100644 --- a/test/functional/cluster/connect.ts +++ b/test/functional/cluster/connect.ts @@ -391,7 +391,12 @@ describe("cluster:connect", () => { describe("multiple reconnect", () => { it("should reconnect after multiple consecutive disconnect(true) are called", (done) => { - new MockServer(30001); + const slotTable = [[0, 16383, ["127.0.0.1", 30001]]]; + new MockServer(30001, (argv) => { + if (argv[0] === "cluster" && argv[1] === "SLOTS") { + return slotTable; + } + }); const cluster = new Cluster([{ host: "127.0.0.1", port: "30001" }], { enableReadyCheck: false, }); diff --git a/test/functional/cluster/disconnection.ts b/test/functional/cluster/disconnection.ts index 40e33e01..1adafcb3 100644 --- a/test/functional/cluster/disconnection.ts +++ b/test/functional/cluster/disconnection.ts @@ -9,7 +9,12 @@ describe("disconnection", () => { }); it("should clear all timers on disconnect", (done) => { - const server = new MockServer(30000); + const slotTable = [[0, 16383, ["127.0.0.1", 30000]]]; + const server = new MockServer(30000, (argv) => { + if (argv[0] === "cluster" && argv[1] === "SLOTS") { + return slotTable; + } + }); const setIntervalCalls = sinon.spy(global, "setInterval"); const clearIntervalCalls = sinon.spy(global, "clearInterval"); diff --git a/test/functional/cluster/index.ts b/test/functional/cluster/index.ts index e31bdd91..c4b09a7d 100644 --- a/test/functional/cluster/index.ts +++ b/test/functional/cluster/index.ts @@ -370,27 +370,28 @@ describe("cluster", () => { }); cluster.on("ready", () => { expect(cluster.nodes("master")).to.have.lengthOf(2); + expect(cluster.nodes("all")).to.have.lengthOf(3); slotTable = [ [0, 5460, ["127.0.0.1", 30003]], [5461, 10922, ["127.0.0.1", 30002]], ]; - cluster.refreshSlotsCache(() => { - cluster.once("-node", function (removed) { - expect(removed.options.port).to.eql(30001); - expect(cluster.nodes("master")).to.have.lengthOf(2); - expect( - [ - cluster.nodes("master")[0].options.port, - cluster.nodes("master")[1].options.port, - ].sort() - ).to.eql([30002, 30003]); - cluster.nodes("master").forEach(function (node) { - expect(node.options).to.have.property("readOnly", false); - }); - cluster.disconnect(); - done(); + cluster.once("-node", function (removed) { + expect(removed.options.port).to.eql(30001); + expect(cluster.nodes("master")).to.have.lengthOf(2); + expect(cluster.nodes("all")).to.have.lengthOf(2); + expect( + [ + cluster.nodes("master")[0].options.port, + cluster.nodes("master")[1].options.port, + ].sort() + ).to.eql([30002, 30003]); + cluster.nodes("master").forEach(function (node) { + expect(node.options).to.have.property("readOnly", false); }); + cluster.disconnect(); + done(); }); + cluster.refreshSlotsCache(); }); }); });