Skip to content

Commit

Permalink
test: add tests for GlideClusterClient scan with allowNonCoveredSlots…
Browse files Browse the repository at this point in the history
… option

Signed-off-by: avifenesh <[email protected]>
  • Loading branch information
avifenesh committed Dec 14, 2024
1 parent 7e78843 commit e394509
Show file tree
Hide file tree
Showing 4 changed files with 231 additions and 59 deletions.
2 changes: 1 addition & 1 deletion node/src/GlideClusterClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ export namespace GlideClusterClientConfiguration {
/**
* Configuration options for creating a {@link GlideClusterClient | GlideClusterClient}.
*
* Extends `BaseClientConfiguration` with properties specific to `GlideClusterClient`, such as periodic topology checks
* Extends {@link BaseClientConfiguration | BaseClientConfiguration} with properties specific to `GlideClusterClient`, such as periodic topology checks
* and Pub/Sub subscription settings.
*
* @remarks
Expand Down
152 changes: 143 additions & 9 deletions node/tests/ScanTest.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import {
GlideString,
ObjectType,
ProtocolVersion,
GlideClusterClientConfiguration,
} from "..";
import { ValkeyCluster } from "../../utils/TestUtils.js";
import {
flushAndCloseClient,
getClientConfigurationOption,
getServerVersion,
parseEndpoints,
waitForClusterReady,
} from "./TestUtilities";

const TIMEOUT = 50000;
Expand All @@ -33,12 +35,12 @@ describe("Scan GlideClusterClient", () => {
// Connect to cluster or create a new one based on the parsed addresses
cluster = clusterAddresses
? await ValkeyCluster.initFromExistingCluster(
true,
parseEndpoints(clusterAddresses),
getServerVersion,
)
true,
parseEndpoints(clusterAddresses),
getServerVersion,
)
: // setting replicaCount to 1 to facilitate tests routed to replicas
await ValkeyCluster.createCluster(true, 3, 1, getServerVersion);
await ValkeyCluster.createCluster(true, 3, 1, getServerVersion);
}, 40000);

afterEach(async () => {
Expand Down Expand Up @@ -376,6 +378,138 @@ describe("Scan GlideClusterClient", () => {
},
TIMEOUT,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`GlideClusterClient scan with allowNonCoveredSlots %p`,
async (protocol) => {
const testCluster = await ValkeyCluster.createCluster(
true,
3,
0,
getServerVersion,
);
const config: GlideClusterClientConfiguration = {
addresses: testCluster
.getAddresses()
.map(([host, port]) => ({ host, port })),
requestTimeout: 50,
protocol,
};
const testClient = await GlideClusterClient.createClient(config);

try {
for (let i = 0; i < 10000; i++) {
const result = await testClient.set(`${uuidv4()}`, "value");
expect(result).toBe("OK");
}

// Perform an initial scan to ensure all works as expected
let cursor = new ClusterScanCursor();
let result = await testClient.scan(cursor);
cursor = result[0];
expect(cursor.isFinished()).toBe(false);

// Set 'cluster-require-full-coverage' to 'no' to allow operations with missing slots
await testClient.configSet({
"cluster-require-full-coverage": "no",
});

// Forget one server to simulate a node failure
const addresses = testCluster.getAddresses();
const addressToForget = addresses[0];
const allOtherAddresses = addresses.slice(1);
const idToForget = await testClient.customCommand(
["CLUSTER", "MYID"],
{
route: {
type: "routeByAddress",
host: addressToForget[0],
port: addressToForget[1],
},
},
);

for (const address of allOtherAddresses) {
await testClient.customCommand(
["CLUSTER", "FORGET", idToForget as string],
{
route: {
type: "routeByAddress",
host: address[0],
port: address[1],
},
},
);
}

// Wait for the cluster to stabilize after forgetting a node
const ready = await waitForClusterReady(
testClient,
allOtherAddresses.length,
);
expect(ready).toBe(true);

// Attempt to scan without 'allowNonCoveredSlots', expecting an error
cursor = new ClusterScanCursor();
await expect(async () => {
while (!cursor.isFinished()) {
result = await testClient.scan(cursor);
cursor = result[0];
}
}).rejects.toThrow(
"Could not find an address covering a slot, SCAN operation cannot continue",
);

// Perform scan with 'allowNonCoveredSlots: true'
cursor = new ClusterScanCursor();

while (!cursor.isFinished()) {
result = await testClient.scan(cursor, {
allowNonCoveredSlots: true,
});
cursor = result[0];
}

expect(cursor.isFinished()).toBe(true);

// Get keys using 'KEYS *' from the remaining nodes
const keys: GlideString[] = [];

for (const address of allOtherAddresses) {
const result = await testClient.customCommand(
["KEYS", "*"],
{
route: {
type: "routeByAddress",
host: address[0],
port: address[1],
},
},
);
keys.push(...(result as GlideString[]));
}

// Scan again with 'allowNonCoveredSlots: true' and collect results
cursor = new ClusterScanCursor();
const results: GlideString[] = [];

while (!cursor.isFinished()) {
result = await testClient.scan(cursor, {
allowNonCoveredSlots: true,
});
results.push(...result[1]);
cursor = result[0];
}

// Compare the sets of keys obtained from 'KEYS *' and 'SCAN'
expect(new Set(results)).toEqual(new Set(keys));
} finally {
testClient.close();
await testCluster.close();
}
},
TIMEOUT,
);
});

//standalone tests
Expand All @@ -387,10 +521,10 @@ describe("Scan GlideClient", () => {
const standaloneAddresses = global.STAND_ALONE_ENDPOINT;
cluster = standaloneAddresses
? await ValkeyCluster.initFromExistingCluster(
false,
parseEndpoints(standaloneAddresses),
getServerVersion,
)
false,
parseEndpoints(standaloneAddresses),
getServerVersion,
)
: await ValkeyCluster.createCluster(false, 1, 1, getServerVersion);
}, 20000);

Expand Down
128 changes: 83 additions & 45 deletions node/tests/TestUtilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,44 @@ function intoArrayInternal(obj: any, builder: string[]) {
}
}

export async function waitForClusterReady(
client: GlideClusterClient,
count: number,
): Promise<boolean> {
const timeout = 20000; // 20 seconds timeout in milliseconds
const startTime = Date.now();

while (true) {

if (Date.now() - startTime > timeout) {
return false;
}

const clusterInfo = await client.customCommand(["CLUSTER", "INFO"]);
// parse the response
const clusterInfoMap = new Map<string, string>();

if (clusterInfo) {
const clusterInfoLines = clusterInfo.toString().split("\n").filter((line) => line.length > 0);

for (const line of clusterInfoLines) {

const [key, value] = line.split(":");

clusterInfoMap.set(key.trim(), value.trim());
}

if (clusterInfoMap.get("cluster_state") == "ok" && Number(clusterInfoMap.get("cluster_known_nodes")) == count) {
break;
}
}

await new Promise((resolve) => setTimeout(resolve, 2000));
}

return true;
}

/**
* accept any variable `v` and convert it into String, recursively
*/
Expand Down Expand Up @@ -430,7 +468,7 @@ export function checkFunctionListResponse(
typeof libName === "string"
? libName === lib["library_name"]
: (libName as Buffer).compare(lib["library_name"] as Buffer) ==
0;
0;

if (hasLib) {
const functions = lib["functions"];
Expand Down Expand Up @@ -483,7 +521,7 @@ export function checkFunctionStatsResponse(
if (response.running_script !== null && runningFunction.length == 0) {
fail(
"Unexpected running function info: " +
(response.running_script.command as string[]).join(" "),
(response.running_script.command as string[]).join(" "),
);
}

Expand Down Expand Up @@ -523,10 +561,10 @@ export function validateTransactionResponse(
const actual =
response?.[i] instanceof Map
? JSON.stringify(
Array.from(
(response?.[i] as ReturnTypeMap)?.entries(),
),
)
Array.from(
(response?.[i] as ReturnTypeMap)?.entries(),
),
)
: JSON.stringify(response?.[i]);
failedChecks.push(
`${testName} failed, expected <${expected}>, actual <${actual}>`,
Expand All @@ -537,7 +575,7 @@ export function validateTransactionResponse(
if (failedChecks.length > 0) {
throw new Error(
"Checks failed in transaction response:\n" +
failedChecks.join("\n"),
failedChecks.join("\n"),
);
}
}
Expand Down Expand Up @@ -701,33 +739,33 @@ export async function transactionTest(
palermo,
catania,
] = [
decodeString(fieldStr, decoder),
decodeString(fieldStr + 1, decoder),
decodeString(fieldStr + 2, decoder),
decodeString(fieldStr + 3, decoder),
decodeString(fieldStr + 4, decoder),
decodeString("value1", decoder),
decodeString("value2", decoder),
decodeString("value3", decoder),
decodeString("foo", decoder),
decodeString("bar", decoder),
decodeString("baz", decoder),
decodeString("test_message", decoder),
decodeString("one", decoder),
decodeString("two", decoder),
decodeString("three", decoder),
decodeString("_", decoder),
decodeString("non_existing_member", decoder),
decodeString("member1", decoder),
decodeString("member2", decoder),
decodeString("member3", decoder),
decodeString("member4", decoder),
decodeString("member5", decoder),
decodeString("member6", decoder),
decodeString("member7", decoder),
decodeString("Palermo", decoder),
decodeString("Catania", decoder),
];
decodeString(fieldStr, decoder),
decodeString(fieldStr + 1, decoder),
decodeString(fieldStr + 2, decoder),
decodeString(fieldStr + 3, decoder),
decodeString(fieldStr + 4, decoder),
decodeString("value1", decoder),
decodeString("value2", decoder),
decodeString("value3", decoder),
decodeString("foo", decoder),
decodeString("bar", decoder),
decodeString("baz", decoder),
decodeString("test_message", decoder),
decodeString("one", decoder),
decodeString("two", decoder),
decodeString("three", decoder),
decodeString("_", decoder),
decodeString("non_existing_member", decoder),
decodeString("member1", decoder),
decodeString("member2", decoder),
decodeString("member3", decoder),
decodeString("member4", decoder),
decodeString("member5", decoder),
decodeString("member6", decoder),
decodeString("member7", decoder),
decodeString("Palermo", decoder),
decodeString("Catania", decoder),
];

// array of tuples - first element is test name/description, second - expected return value
const responseData: [string, GlideReturnType][] = [];
Expand Down Expand Up @@ -1425,18 +1463,18 @@ export async function transactionTest(
'xautoclaim(key9, groupName1, consumer, 0, "0-0", { count: 1 })',
!cluster.checkIfServerVersionLessThan("7.0.0")
? [
"0-0",
convertRecordToGlideRecord({
"0-2": [[field.toString(), value2.toString()]],
}),
[],
]
"0-0",
convertRecordToGlideRecord({
"0-2": [[field.toString(), value2.toString()]],
}),
[],
]
: [
"0-0",
convertRecordToGlideRecord({
"0-2": [[field.toString(), value2.toString()]],
}),
],
"0-0",
convertRecordToGlideRecord({
"0-2": [[field.toString(), value2.toString()]],
}),
],
]);
baseTransaction.xautoclaimJustId(key9, groupName1, consumer, 0, "0-0");
responseData.push([
Expand Down
8 changes: 4 additions & 4 deletions utils/TestUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ function parseOutput(input: string): {
.split(",")
.map((address) => address.split(":"))
.map((address) => [address[0], Number(address[1])]) as [
string,
number,
][];
string,
number,
][];

if (clusterFolder === undefined || ports === undefined) {
throw new Error(`Insufficient data in input: ${input}`);
Expand Down Expand Up @@ -82,7 +82,7 @@ export class ValkeyCluster {
execFile(
"python3",
[PY_SCRIPT_PATH, ...command.split(" ")],
(error, stdout, stderr) => {
(error, stdout) => {
if (error) {
reject(error);
} else {
Expand Down

0 comments on commit e394509

Please sign in to comment.