Skip to content

Commit

Permalink
[Compatibility] Added ZMPOP command (#840)
Browse files Browse the repository at this point in the history
* Added ZMPOP command

* Format fix

* Fixed Test case

* Cluster test fix

* Review command fixes

* Review command fix

* Fixed RESP format
  • Loading branch information
Vijay-Nirmal authored Dec 9, 2024
1 parent 4e7cc74 commit b882e14
Show file tree
Hide file tree
Showing 16 changed files with 487 additions and 1 deletion.
52 changes: 52 additions & 0 deletions libs/resources/RespCommandsDocs.json
Original file line number Diff line number Diff line change
Expand Up @@ -5787,6 +5787,58 @@
}
]
},
{
"Command": "ZMPOP",
"Name": "ZMPOP",
"Summary": "Returns the highest- or lowest-scoring members from one or more sorted sets after removing them. Deletes the sorted set if the last member was popped.",
"Group": "SortedSet",
"Complexity": "O(K) \u002B O(M*log(N)) where K is the number of provided keys, N being the number of elements in the sorted set, and M being the number of elements popped.",
"Arguments": [
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "NUMKEYS",
"DisplayText": "numkeys",
"Type": "Integer"
},
{
"TypeDiscriminator": "RespCommandKeyArgument",
"Name": "KEY",
"DisplayText": "key",
"Type": "Key",
"ArgumentFlags": "Multiple",
"KeySpecIndex": 0
},
{
"TypeDiscriminator": "RespCommandContainerArgument",
"Name": "WHERE",
"Type": "OneOf",
"Arguments": [
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "MIN",
"DisplayText": "min",
"Type": "PureToken",
"Token": "MIN"
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "MAX",
"DisplayText": "max",
"Type": "PureToken",
"Token": "MAX"
}
]
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "COUNT",
"DisplayText": "count",
"Type": "Integer",
"Token": "COUNT",
"ArgumentFlags": "Optional"
}
]
},
{
"Command": "ZMSCORE",
"Name": "ZMSCORE",
Expand Down
22 changes: 22 additions & 0 deletions libs/resources/RespCommandsInfo.json
Original file line number Diff line number Diff line change
Expand Up @@ -4300,6 +4300,28 @@
}
]
},
{
"Command": "ZMPOP",
"Name": "ZMPOP",
"Arity": -4,
"Flags": "MovableKeys, Write",
"AclCategories": "SortedSet, Slow, Write",
"KeySpecifications": [
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 1
},
"FindKeys": {
"TypeDiscriminator": "FindKeysKeyNum",
"KeyNumIdx": 0,
"FirstKey": 1,
"KeyStep": 1
},
"Flags": "RW, Access, Delete"
}
]
},
{
"Command": "ZMSCORE",
"Name": "ZMSCORE",
Expand Down
4 changes: 4 additions & 0 deletions libs/server/API/GarnetApiObjectCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public GarnetStatus SortedSetScores(byte[] key, ref ObjectInput input, ref Garne
public GarnetStatus SortedSetPop(byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput outputFooter)
=> storageSession.SortedSetPop(key, ref input, ref outputFooter, ref objectContext);

/// <inheritdoc />
public GarnetStatus SortedSetMPop(ReadOnlySpan<ArgSlice> keys, int count, bool lowScoresFirst, out ArgSlice poppedKey, out (ArgSlice member, ArgSlice score)[] pairs)
=> storageSession.SortedSetMPop(keys, count, lowScoresFirst, out poppedKey, out pairs);

/// <inheritdoc />
public GarnetStatus SortedSetPop(ArgSlice key, out (ArgSlice member, ArgSlice score)[] pairs, int count = 1, bool lowScoresFirst = true)
=> storageSession.SortedSetPop(key, count, lowScoresFirst, out pairs, ref objectContext);
Expand Down
11 changes: 11 additions & 0 deletions libs/server/API/IGarnetApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,17 @@ public interface IGarnetApi : IGarnetReadApi, IGarnetAdvancedApi
/// <returns></returns>
GarnetStatus SortedSetPop(byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput outputFooter);

/// <summary>
/// Removes and returns multiple elements from a sorted set.
/// </summary>
/// <param name="keys">The keys of the sorted set.</param>
/// <param name="count">The number of elements to pop.</param>
/// <param name="lowScoresFirst">If true, elements with the lowest scores are popped first; otherwise, elements with the highest scores are popped first.</param>
/// <param name="poppedKey">The key of the popped element.</param>
/// <param name="pairs">An array of tuples containing the member and score of each popped element.</param>
/// <returns>A <see cref="GarnetStatus"/> indicating the result of the operation.</returns>
GarnetStatus SortedSetMPop(ReadOnlySpan<ArgSlice> keys, int count, bool lowScoresFirst, out ArgSlice poppedKey, out (ArgSlice member, ArgSlice score)[] pairs);

/// <summary>
/// Removes and returns up to count members with the highest or lowest scores in the sorted set stored at key.
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions libs/server/Resp/CmdStrings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ static partial class CmdStrings
public static ReadOnlySpan<byte> BYLEX => "BYLEX"u8;
public static ReadOnlySpan<byte> REV => "REV"u8;
public static ReadOnlySpan<byte> LIMIT => "LIMIT"u8;
public static ReadOnlySpan<byte> MIN => "MIN"u8;
public static ReadOnlySpan<byte> MAX => "MAX"u8;

/// <summary>
/// Response strings
Expand Down
117 changes: 117 additions & 0 deletions libs/server/Resp/Objects/SortedSetCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license.

using System;
using System.Text;
using Garnet.common;
using Tsavorite.core;

Expand Down Expand Up @@ -389,6 +390,122 @@ private unsafe bool SortedSetPop<TGarnetApi>(RespCommand command, ref TGarnetApi
return true;
}

/// <summary>
/// Removes and returns up to count members from the first non-empty sorted set key from the list of keys.
/// </summary>
private unsafe bool SortedSetMPop<TGarnetApi>(ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
// ZMPOP requires at least 3 args: numkeys, key [key...], <MIN|MAX> [COUNT count]
if (parseState.Count < 3)
{
return AbortWithWrongNumberOfArguments(nameof(RespCommand.ZMPOP));
}

// Get number of keys
if (!parseState.TryGetInt(0, out var numKeys) || numKeys < 1)
{
return AbortWithErrorMessage(CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER);
}

if (numKeys < 0)
{
return AbortWithErrorMessage(Encoding.ASCII.GetBytes(string.Format(CmdStrings.GenericParamShouldBeGreaterThanZero, "numkeys")));
}

// Validate we have enough arguments (no of keys + (MIN or MAX))
if (parseState.Count < numKeys + 2)
{
return AbortWithErrorMessage(CmdStrings.RESP_SYNTAX_ERROR);
}

// Get MIN/MAX argument
var orderArg = parseState.GetArgSliceByRef(numKeys + 1);
var orderSpan = orderArg.ReadOnlySpan;
var lowScoresFirst = true;

if (orderSpan.EqualsUpperCaseSpanIgnoringCase(CmdStrings.MIN))
lowScoresFirst = true;
else if (orderSpan.EqualsUpperCaseSpanIgnoringCase(CmdStrings.MAX))
lowScoresFirst = false;
else
{
return AbortWithErrorMessage(CmdStrings.RESP_SYNTAX_ERROR);
}

// Parse optional COUNT argument
var count = 1;
if (parseState.Count > numKeys + 2)
{
if (parseState.Count != numKeys + 4)
{
return AbortWithErrorMessage(CmdStrings.RESP_SYNTAX_ERROR);
}

var countArg = parseState.GetArgSliceByRef(numKeys + 2);
if (!countArg.ReadOnlySpan.EqualsUpperCaseSpanIgnoringCase(CmdStrings.COUNT))
{
return AbortWithErrorMessage(CmdStrings.RESP_SYNTAX_ERROR);
}

if (!parseState.TryGetInt(numKeys + 3, out count) || count < 1)
{
return AbortWithErrorMessage(CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER);
}

if (count < 0)
{
return AbortWithErrorMessage(Encoding.ASCII.GetBytes(string.Format(CmdStrings.GenericParamShouldBeGreaterThanZero, "count")));
}
}

var keys = parseState.Parameters.Slice(1, numKeys);
var status = storageApi.SortedSetMPop(keys, count, lowScoresFirst, out var poppedKey, out var pairs);

switch (status)
{
case GarnetStatus.OK:
if (pairs == null || pairs.Length == 0)
{
// No elements found
while (!RespWriteUtils.WriteNull(ref dcurr, dend))
SendAndReset();
}
else
{
// Write array with 2 elements: key and array of elements
while (!RespWriteUtils.WriteArrayLength(2, ref dcurr, dend))
SendAndReset();

// Write key
while (!RespWriteUtils.WriteBulkString(poppedKey.ReadOnlySpan, ref dcurr, dend))
SendAndReset();

// Write array of member-score pairs
while (!RespWriteUtils.WriteArrayLength(pairs.Length, ref dcurr, dend))
SendAndReset();

foreach (var (member, score) in pairs)
{
while (!RespWriteUtils.WriteArrayLength(2, ref dcurr, dend))
SendAndReset();
while (!RespWriteUtils.WriteBulkString(member.ReadOnlySpan, ref dcurr, dend))
SendAndReset();
while (!RespWriteUtils.WriteBulkString(score.ReadOnlySpan, ref dcurr, dend))
SendAndReset();
}
}
break;

case GarnetStatus.WRONGTYPE:
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_WRONG_TYPE, ref dcurr, dend))
SendAndReset();
break;
}

return true;
}

/// <summary>
/// Returns the number of elements in the sorted set at key with a score between min and max.
/// </summary>
Expand Down
5 changes: 5 additions & 0 deletions libs/server/Resp/Parser/RespCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ public enum RespCommand : ushort
ZADD,
ZDIFFSTORE,
ZINCRBY,
ZMPOP,
ZPOPMAX,
ZPOPMIN,
ZRANGESTORE,
Expand Down Expand Up @@ -1044,6 +1045,10 @@ private RespCommand FastParseArrayCommand(ref int count, ref ReadOnlySpan<byte>
{
return RespCommand.ZSCAN;
}
else if (*(ulong*)(ptr + 3) == MemoryMarshal.Read<ulong>("\nZMPOP\r\n"u8))
{
return RespCommand.ZMPOP;
}
break;
}
break;
Expand Down
1 change: 1 addition & 0 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
RespCommand.ZREMRANGEBYSCORE => SortedSetRemoveRange(cmd, ref storageApi),
RespCommand.ZLEXCOUNT => SortedSetLengthByValue(cmd, ref storageApi),
RespCommand.ZPOPMIN => SortedSetPop(cmd, ref storageApi),
RespCommand.ZMPOP => SortedSetMPop(ref storageApi),
RespCommand.ZRANDMEMBER => SortedSetRandomMember(ref storageApi),
RespCommand.ZDIFF => SortedSetDifference(ref storageApi),
RespCommand.ZDIFFSTORE => SortedSetDifferenceStore(ref storageApi),
Expand Down
56 changes: 56 additions & 0 deletions libs/server/Storage/Session/ObjectStore/SortedSetOps.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1032,5 +1032,61 @@ private GarnetStatus SortedSetDifference<TObjectContext>(ReadOnlySpan<ArgSlice>

return GarnetStatus.OK;
}

/// <summary>
/// Removes and returns up to count members and their scores from the first sorted set that contains a member.
/// </summary>
public unsafe GarnetStatus SortedSetMPop(ReadOnlySpan<ArgSlice> keys, int count, bool lowScoresFirst, out ArgSlice poppedKey, out (ArgSlice member, ArgSlice score)[] pairs)
{
if (txnManager.ObjectStoreLockableContext.Session is null)
ThrowObjectStoreUninitializedException();

pairs = default;
poppedKey = default;

if (keys.Length == 0)
return GarnetStatus.OK;

var createTransaction = false;

if (txnManager.state != TxnState.Running)
{
Debug.Assert(txnManager.state == TxnState.None);
createTransaction = true;
foreach (var key in keys)
txnManager.SaveKeyEntryToLock(key, true, LockType.Exclusive);
txnManager.Run(true);
}

var storeLockableContext = txnManager.ObjectStoreLockableContext;

try
{
// Try popping from each key until we find one with members
foreach (var key in keys)
{
if (key.Length == 0) continue;

var status = SortedSetPop(key, count, lowScoresFirst, out pairs, ref storeLockableContext);
if (status == GarnetStatus.OK && pairs != null && pairs.Length > 0)
{
poppedKey = key;
return status;
}

if (status != GarnetStatus.OK && status != GarnetStatus.NOTFOUND)
{
return status;
}
}

return GarnetStatus.OK;
}
finally
{
if (createTransaction)
txnManager.Commit(true);
}
}
}
}
1 change: 1 addition & 0 deletions playground/CommandInfoUpdater/SupportedCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ public class SupportedCommand
new("ZINCRBY", RespCommand.ZINCRBY),
new("ZLEXCOUNT", RespCommand.ZLEXCOUNT),
new("ZMSCORE", RespCommand.ZMSCORE),
new("ZMPOP", RespCommand.ZMPOP),
new("ZPOPMAX", RespCommand.ZPOPMAX),
new("ZPOPMIN", RespCommand.ZPOPMIN),
new("ZRANDMEMBER", RespCommand.ZRANDMEMBER),
Expand Down
29 changes: 29 additions & 0 deletions test/Garnet.test.cluster/RedirectTests/BaseCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2026,6 +2026,35 @@ public override ArraySegment<string>[] SetupSingleSlotRequest()
}
}

internal class ZMPOP : BaseCommand
{
public override bool IsArrayCommand => true;
public override bool ArrayResponse => false;
public override string Command => nameof(ZMPOP);

public override string[] GetSingleSlotRequest()
{
var ssk = GetSingleSlotKeys;
return ["3", ssk[0], ssk[1], ssk[2], "MIN", "COUNT", "1"];
}

public override string[] GetCrossSlotRequest()
{
var csk = GetCrossSlotKeys;
return ["3", csk[0], csk[1], csk[2], "MIN", "COUNT", "1"];
}

public override ArraySegment<string>[] SetupSingleSlotRequest()
{
var ssk = GetSingleSlotKeys;
var setup = new ArraySegment<string>[3];
setup[0] = new ArraySegment<string>(["ZADD", ssk[1], "1", "a"]);
setup[1] = new ArraySegment<string>(["ZADD", ssk[2], "2", "b"]);
setup[2] = new ArraySegment<string>(["ZADD", ssk[3], "3", "c"]);
return setup;
}
}

#endregion

#region HashCommands
Expand Down
Loading

0 comments on commit b882e14

Please sign in to comment.