Skip to content

Commit

Permalink
[Compatibility] Added ZRANGESTORE command (#826)
Browse files Browse the repository at this point in the history
* WIP of ZRANGESTORE

* Final changes

* Added ACL test

* Removed temp code

* Removed TODO command

* Slot Test

* Review command fix
  • Loading branch information
Vijay-Nirmal authored Dec 8, 2024
1 parent c07afca commit 4e7cc74
Show file tree
Hide file tree
Showing 17 changed files with 530 additions and 1 deletion.
86 changes: 86 additions & 0 deletions libs/resources/RespCommandsDocs.json
Original file line number Diff line number Diff line change
Expand Up @@ -6040,6 +6040,92 @@
}
]
},
{
"Command": "ZRANGESTORE",
"Name": "ZRANGESTORE",
"Summary": "Stores a range of members from sorted set in a key.",
"Group": "SortedSet",
"Complexity": "O(log(N)\u002BM) with N being the number of elements in the sorted set and M the number of elements stored into the destination key.",
"Arguments": [
{
"TypeDiscriminator": "RespCommandKeyArgument",
"Name": "DST",
"DisplayText": "dst",
"Type": "Key",
"KeySpecIndex": 0
},
{
"TypeDiscriminator": "RespCommandKeyArgument",
"Name": "SRC",
"DisplayText": "src",
"Type": "Key",
"KeySpecIndex": 1
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "MIN",
"DisplayText": "min",
"Type": "String"
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "MAX",
"DisplayText": "max",
"Type": "String"
},
{
"TypeDiscriminator": "RespCommandContainerArgument",
"Name": "SORTBY",
"Type": "OneOf",
"ArgumentFlags": "Optional",
"Arguments": [
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "BYSCORE",
"DisplayText": "byscore",
"Type": "PureToken",
"Token": "BYSCORE"
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "BYLEX",
"DisplayText": "bylex",
"Type": "PureToken",
"Token": "BYLEX"
}
]
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "REV",
"DisplayText": "rev",
"Type": "PureToken",
"Token": "REV",
"ArgumentFlags": "Optional"
},
{
"TypeDiscriminator": "RespCommandContainerArgument",
"Name": "LIMIT",
"Type": "Block",
"Token": "LIMIT",
"ArgumentFlags": "Optional",
"Arguments": [
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "OFFSET",
"DisplayText": "offset",
"Type": "Integer"
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "COUNT",
"DisplayText": "count",
"Type": "Integer"
}
]
}
]
},
{
"Command": "ZRANK",
"Name": "ZRANK",
Expand Down
38 changes: 38 additions & 0 deletions libs/resources/RespCommandsInfo.json
Original file line number Diff line number Diff line change
Expand Up @@ -4453,6 +4453,44 @@
}
]
},
{
"Command": "ZRANGESTORE",
"Name": "ZRANGESTORE",
"Arity": -5,
"Flags": "DenyOom, Write",
"FirstKey": 1,
"LastKey": 2,
"Step": 1,
"AclCategories": "SortedSet, Slow, Write",
"KeySpecifications": [
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 1
},
"FindKeys": {
"TypeDiscriminator": "FindKeysRange",
"LastKey": 0,
"KeyStep": 1,
"Limit": 0
},
"Flags": "OW, Update"
},
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 2
},
"FindKeys": {
"TypeDiscriminator": "FindKeysRange",
"LastKey": 0,
"KeyStep": 1,
"Limit": 0
},
"Flags": "RO, Access"
}
]
},
{
"Command": "ZRANK",
"Name": "ZRANK",
Expand Down
4 changes: 4 additions & 0 deletions libs/server/API/GarnetApiObjectCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ public GarnetStatus SortedSetAdd(ArgSlice key, (ArgSlice score, ArgSlice member)
public GarnetStatus SortedSetAdd(byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput output)
=> storageSession.SortedSetAdd(key, ref input, ref output, ref objectContext);

/// <inheritdoc />
public GarnetStatus SortedSetRangeStore(ArgSlice dstKey, ArgSlice srcKey, ref ObjectInput input, out int result)
=> storageSession.SortedSetRangeStore(dstKey, srcKey, ref input, out result, ref objectContext);

/// <inheritdoc />
public GarnetStatus SortedSetRemove(ArgSlice key, ArgSlice member, out int zremCount)
=> storageSession.SortedSetRemove(key.ToArray(), member, out zremCount, ref objectContext);
Expand Down
10 changes: 10 additions & 0 deletions libs/server/API/IGarnetApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,16 @@ public interface IGarnetApi : IGarnetReadApi, IGarnetAdvancedApi
/// <returns></returns>
GarnetStatus SortedSetAdd(byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput output);

/// <summary>
/// Stores a range of sorted set elements in the specified key space.
/// </summary>
/// <param name="dstKey">The distribution key for the sorted set.</param>
/// <param name="srcKey">The sub-key for the sorted set.</param>
/// <param name="input">The input object containing the elements to store.</param>
/// <param name="result">The result of the store operation.</param>
/// <returns>A <see cref="GarnetStatus"/> indicating the status of the operation.</returns>
GarnetStatus SortedSetRangeStore(ArgSlice dstKey, ArgSlice srcKey, ref ObjectInput input, out int result);

/// <summary>
/// Removes the specified member from the sorted set stored at key.
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions libs/server/Objects/SortedSet/SortedSetObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public enum SortedSetOperation : byte
ZRANK,
ZRANGE,
ZRANGEBYSCORE,
ZRANGESTORE,
GEOADD,
GEOHASH,
GEODIST,
Expand Down Expand Up @@ -257,6 +258,7 @@ public override unsafe bool Operate(ref ObjectInput input, ref SpanByteAndMemory
SortedSetRank(ref input, ref output);
break;
case SortedSetOperation.ZRANGE:
case SortedSetOperation.ZRANGESTORE:
SortedSetRange(ref input, ref output);
break;
case SortedSetOperation.ZRANGEBYSCORE:
Expand Down
3 changes: 3 additions & 0 deletions libs/server/Objects/SortedSet/SortedSetObjectImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,9 @@ private void SortedSetRange(ref ObjectInput input, ref SpanByteAndMemory output)
ZRangeOptions options = new();
switch (input.header.SortedSetOp)
{
case SortedSetOperation.ZRANGESTORE:
options.WithScores = true;
break;
case SortedSetOperation.ZRANGEBYSCORE:
options.ByScore = true;
break;
Expand Down
32 changes: 32 additions & 0 deletions libs/server/Resp/Objects/SortedSetCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,38 @@ private unsafe bool SortedSetRange<TGarnetApi>(RespCommand command, ref TGarnetA
return true;
}

private unsafe bool SortedSetRangeStore<TGarnetApi>(ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
// ZRANGESTORE dst src min max [BYSCORE | BYLEX] [REV] [LIMIT offset count]
if (parseState.Count is < 4 or > 9)
{
return AbortWithWrongNumberOfArguments(nameof(RespCommand.ZRANGESTORE));
}

var dstKey = parseState.GetArgSliceByRef(0);
var srcKey = parseState.GetArgSliceByRef(1);

var header = new RespInputHeader(GarnetObjectType.SortedSet) { SortedSetOp = SortedSetOperation.ZRANGESTORE };
var input = new ObjectInput(header, ref parseState, startIdx: 2, arg1: respProtocolVersion);

var status = storageApi.SortedSetRangeStore(dstKey, srcKey, ref input, out int result);

switch (status)
{
case GarnetStatus.OK:
while (!RespWriteUtils.WriteInteger(result, ref dcurr, dend))
SendAndReset();
break;
case GarnetStatus.WRONGTYPE:
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_WRONG_TYPE, ref dcurr, dend))
SendAndReset();
break;
}

return true;
}

/// <summary>
/// Returns the score of member in the sorted set at key.
/// If member does not exist in the sorted set, or key does not exist, nil is returned.
Expand Down
5 changes: 5 additions & 0 deletions libs/server/Resp/Parser/RespCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ public enum RespCommand : ushort
ZINCRBY,
ZPOPMAX,
ZPOPMIN,
ZRANGESTORE,
ZREM,
ZREMRANGEBYLEX,
ZREMRANGEBYRANK,
Expand Down Expand Up @@ -1418,6 +1419,10 @@ private RespCommand FastParseArrayCommand(ref int count, ref ReadOnlySpan<byte>
{
return RespCommand.INCRBYFLOAT;
}
else if (*(ulong*)(ptr + 2) == MemoryMarshal.Read<ulong>("1\r\nZRANG"u8) && *(ulong*)(ptr + 10) == MemoryMarshal.Read<ulong>("ESTORE\r\n"u8))
{
return RespCommand.ZRANGESTORE;
}
break;

case 12:
Expand Down
1 change: 1 addition & 0 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
RespCommand.ZINCRBY => SortedSetIncrement(ref storageApi),
RespCommand.ZRANK => SortedSetRank(cmd, ref storageApi),
RespCommand.ZRANGE => SortedSetRange(cmd, ref storageApi),
RespCommand.ZRANGESTORE => SortedSetRangeStore(ref storageApi),
RespCommand.ZRANGEBYSCORE => SortedSetRange(cmd, ref storageApi),
RespCommand.ZREVRANK => SortedSetRank(cmd, ref storageApi),
RespCommand.ZREMRANGEBYLEX => SortedSetLengthByValue(cmd, ref storageApi),
Expand Down
105 changes: 105 additions & 0 deletions libs/server/Storage/Session/ObjectStore/SortedSetOps.cs
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,111 @@ public GarnetStatus SortedSetAdd<TObjectContext>(byte[] key, ref ObjectInput inp
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions, ObjectStoreFunctions, ObjectStoreAllocator>
=> RMWObjectStoreOperationWithOutput(key, ref input, ref objectStoreContext, ref output);

/// <summary>
/// ZRANGESTORE - Stores a range of sorted set elements into a destination key.
/// </summary>
/// <typeparam name="TObjectContext">The type of the object context.</typeparam>
/// <param name="dstKey">The destination key where the range will be stored.</param>
/// <param name="srcKey">The source key from which the range will be taken.</param>
/// <param name="input">The input object containing range parameters.</param>
/// <param name="result">The result of the operation, indicating the number of elements stored.</param>
/// <param name="objectStoreContext">The context of the object store.</param>
/// <returns>Returns a GarnetStatus indicating the success or failure of the operation.</returns>
public unsafe GarnetStatus SortedSetRangeStore<TObjectContext>(ArgSlice dstKey, ArgSlice srcKey, ref ObjectInput input, out int result, ref TObjectContext objectStoreContext)
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions, ObjectStoreFunctions, ObjectStoreAllocator>
{
if (txnManager.ObjectStoreLockableContext.Session is null)
ThrowObjectStoreUninitializedException();

result = 0;

if (dstKey.Length == 0 || srcKey.Length == 0)
return GarnetStatus.OK;

var createTransaction = false;

if (txnManager.state != TxnState.Running)
{
Debug.Assert(txnManager.state == TxnState.None);
createTransaction = true;
txnManager.SaveKeyEntryToLock(dstKey, true, LockType.Exclusive);
txnManager.SaveKeyEntryToLock(srcKey, true, LockType.Shared);
_ = txnManager.Run(true);
}

// SetObject
var objectStoreLockableContext = txnManager.ObjectStoreLockableContext;

try
{
SpanByteAndMemory rangeOutputMem = default;
var rangeOutput = new GarnetObjectStoreOutput() { spanByteAndMemory = rangeOutputMem };
var status = SortedSetRange(srcKey.ToArray(), ref input, ref rangeOutput, ref objectStoreLockableContext);
rangeOutputMem = rangeOutput.spanByteAndMemory;

if (status == GarnetStatus.WRONGTYPE)
{
return GarnetStatus.WRONGTYPE;
}

if (status == GarnetStatus.NOTFOUND)
{
// Expire/Delete the destination key if the source key is not found
_ = EXPIRE(dstKey, TimeSpan.Zero, out _, StoreType.Object, ExpireOption.None, ref lockableContext, ref objectStoreLockableContext);
return GarnetStatus.OK;
}

Debug.Assert(!rangeOutputMem.IsSpanByte, "Output should not be in SpanByte format when the status is OK");

var rangeOutputHandler = rangeOutputMem.Memory.Memory.Pin();
try
{
var rangeOutPtr = (byte*)rangeOutputHandler.Pointer;
ref var currOutPtr = ref rangeOutPtr;
var endOutPtr = rangeOutPtr + rangeOutputMem.Length;

var destinationKey = dstKey.ToArray();
objectStoreLockableContext.Delete(ref destinationKey);

RespReadUtils.ReadUnsignedArrayLength(out var arrayLen, ref currOutPtr, endOutPtr);
Debug.Assert(arrayLen % 2 == 0, "Should always contain element and its score");
result = arrayLen / 2;

if (result > 0)
{
parseState.Initialize(arrayLen); // 2 elements per pair (result * 2)

for (int j = 0; j < result; j++)
{
// Read member/element into parse state
parseState.Read((2 * j) + 1, ref currOutPtr, endOutPtr);
// Read score into parse state
parseState.Read(2 * j, ref currOutPtr, endOutPtr);
}

var zAddInput = new ObjectInput(new RespInputHeader
{
type = GarnetObjectType.SortedSet,
SortedSetOp = SortedSetOperation.ZADD,
}, ref parseState);

var zAddOutput = new GarnetObjectStoreOutput { spanByteAndMemory = new SpanByteAndMemory(null) };
RMWObjectStoreOperationWithOutput(destinationKey, ref zAddInput, ref objectStoreLockableContext, ref zAddOutput);
}
}
finally
{
rangeOutputHandler.Dispose();
}
return status;
}
finally
{
if (createTransaction)
txnManager.Commit(true);
}
}

/// <summary>
/// Removes the specified members from the sorted set stored at key.
/// Non existing members are ignored.
Expand Down
1 change: 1 addition & 0 deletions playground/CommandInfoUpdater/SupportedCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ public class SupportedCommand
new("ZRANDMEMBER", RespCommand.ZRANDMEMBER),
new("ZRANGE", RespCommand.ZRANGE),
new("ZRANGEBYSCORE", RespCommand.ZRANGEBYSCORE),
new("ZRANGESTORE", RespCommand.ZRANGESTORE),
new("ZRANK", RespCommand.ZRANK),
new("ZREM", RespCommand.ZREM),
new("ZREMRANGEBYLEX", RespCommand.ZREMRANGEBYLEX),
Expand Down
Loading

0 comments on commit 4e7cc74

Please sign in to comment.