Skip to content

Commit

Permalink
- Rename IFunctions to ISessionFunctions (in prep for StoreFunctions)
Browse files Browse the repository at this point in the history
- Clean up ITsavoriteSession to SessionFunctionsWrapper, minimizing the per-Context code to just the locking portion
- remove RUMD (Read/Upsert/rMw/Delete) functionality from ClientSession; users should use one of the 4 Contexts (Basic, Lockable, Unsafe, LockableUnsafe) for all such actions. This changed a lot of tests
  • Loading branch information
TedHartMS committed May 31, 2024
1 parent 2a923ae commit 198d329
Show file tree
Hide file tree
Showing 138 changed files with 2,295 additions and 2,428 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public bool SingleReader(ref SpanByte key, ref SpanByte value, RecordMetadata re
cursorRecordResult = CursorRecordResult.Accept; // default; not used here
var s = HashSlotUtils.HashSlot(key.ToPointer(), key.Length);
if (slots.Contains(s))
session.Delete(key);
session.BasicContext.Delete(key);
return true;
}

Expand Down
45 changes: 25 additions & 20 deletions libs/server/AOF/AofProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ public sealed unsafe partial class AofProcessor
/// Session for main store
/// </summary>
readonly ClientSession<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainStoreFunctions> session = null;
readonly BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainStoreFunctions> sessionBasicContext;

/// <summary>
/// Session for object store
/// </summary>
readonly ClientSession<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions> objectStoreSession = null;
readonly BasicContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions> objectStoreSessionBasicContext;

readonly Dictionary<int, List<byte[]>> inflightTxns;
readonly byte[] buffer;
Expand Down Expand Up @@ -76,7 +78,10 @@ public AofProcessor(
this.respServerSession = new RespServerSession(null, replayAofStoreWrapper, null);

session = respServerSession.storageSession.session;
sessionBasicContext = session.BasicContext;
objectStoreSession = respServerSession.storageSession.objectStoreSession;
if (objectStoreSession is not null)
objectStoreSessionBasicContext = objectStoreSession.BasicContext;

inflightTxns = new Dictionary<int, List<byte[]>>();
buffer = new byte[BufferSizeUtils.ServerBufferSize(new MaxSizeSettings())];
Expand Down Expand Up @@ -232,22 +237,22 @@ private unsafe bool ReplayOp(byte* entryPtr)
switch (header.opType)
{
case AofEntryType.StoreUpsert:
StoreUpsert(session, entryPtr);
StoreUpsert(sessionBasicContext, entryPtr);
break;
case AofEntryType.StoreRMW:
StoreRMW(session, entryPtr);
StoreRMW(sessionBasicContext, entryPtr);
break;
case AofEntryType.StoreDelete:
StoreDelete(session, entryPtr);
StoreDelete(sessionBasicContext, entryPtr);
break;
case AofEntryType.ObjectStoreRMW:
ObjectStoreRMW(objectStoreSession, entryPtr, bufferPtr, buffer.Length);
ObjectStoreRMW(objectStoreSessionBasicContext, entryPtr, bufferPtr, buffer.Length);
break;
case AofEntryType.ObjectStoreUpsert:
ObjectStoreUpsert(objectStoreSession, storeWrapper.GarnetObjectSerializer, entryPtr, bufferPtr, buffer.Length);
ObjectStoreUpsert(objectStoreSessionBasicContext, storeWrapper.GarnetObjectSerializer, entryPtr, bufferPtr, buffer.Length);
break;
case AofEntryType.ObjectStoreDelete:
ObjectStoreDelete(objectStoreSession, entryPtr);
ObjectStoreDelete(objectStoreSessionBasicContext, entryPtr);
break;
case AofEntryType.StoredProcedure:
ref var input = ref Unsafe.AsRef<SpanByte>(entryPtr + sizeof(AofHeader));
Expand All @@ -259,37 +264,37 @@ private unsafe bool ReplayOp(byte* entryPtr)
return true;
}

static unsafe void StoreUpsert(ClientSession<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainStoreFunctions> session, byte* ptr)
static unsafe void StoreUpsert(BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainStoreFunctions> basicContext, byte* ptr)
{
ref var key = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader));
ref var input = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader) + key.TotalSize);
ref var value = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader) + key.TotalSize + input.TotalSize);

SpanByteAndMemory output = default;
session.Upsert(ref key, ref input, ref value, ref output);
basicContext.Upsert(ref key, ref input, ref value, ref output);
if (!output.IsSpanByte)
output.Memory.Dispose();
}

static unsafe void StoreRMW(ClientSession<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainStoreFunctions> session, byte* ptr)
static unsafe void StoreRMW(BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainStoreFunctions> basicContext, byte* ptr)
{
byte* pbOutput = stackalloc byte[32];
ref var key = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader));
ref var input = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader) + key.TotalSize);
var output = new SpanByteAndMemory(pbOutput, 32);
if (session.RMW(ref key, ref input, ref output).IsPending)
session.CompletePending(true);
if (basicContext.RMW(ref key, ref input, ref output).IsPending)
basicContext.CompletePending(true);
if (!output.IsSpanByte)
output.Memory.Dispose();
}

static unsafe void StoreDelete(ClientSession<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainStoreFunctions> session, byte* ptr)
static unsafe void StoreDelete(BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainStoreFunctions> basicContext, byte* ptr)
{
ref var key = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader));
session.Delete(ref key);
basicContext.Delete(ref key);
}

static unsafe void ObjectStoreUpsert(ClientSession<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions> session, GarnetObjectSerializer garnetObjectSerializer, byte* ptr, byte* outputPtr, int outputLength)
static unsafe void ObjectStoreUpsert(BasicContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions> basicContext, GarnetObjectSerializer garnetObjectSerializer, byte* ptr, byte* outputPtr, int outputLength)
{
ref var key = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader));
var keyB = key.ToByteArray();
Expand All @@ -299,29 +304,29 @@ static unsafe void ObjectStoreUpsert(ClientSession<byte[], IGarnetObject, SpanBy
var valB = garnetObjectSerializer.Deserialize(value.ToByteArray());

var output = new GarnetObjectStoreOutput { spanByteAndMemory = new(outputPtr, outputLength) };
session.Upsert(ref keyB, ref valB);
basicContext.Upsert(ref keyB, ref valB);
if (!output.spanByteAndMemory.IsSpanByte)
output.spanByteAndMemory.Memory.Dispose();
}

static unsafe void ObjectStoreRMW(ClientSession<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions> session, byte* ptr, byte* outputPtr, int outputLength)
static unsafe void ObjectStoreRMW(BasicContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions> basicContext, byte* ptr, byte* outputPtr, int outputLength)
{
ref var key = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader));
var keyB = key.ToByteArray();

ref var input = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader) + key.TotalSize);
var output = new GarnetObjectStoreOutput { spanByteAndMemory = new(outputPtr, outputLength) };
if (session.RMW(ref keyB, ref input, ref output).IsPending)
session.CompletePending(true);
if (basicContext.RMW(ref keyB, ref input, ref output).IsPending)
basicContext.CompletePending(true);
if (!output.spanByteAndMemory.IsSpanByte)
output.spanByteAndMemory.Memory.Dispose();
}

static unsafe void ObjectStoreDelete(ClientSession<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions> session, byte* ptr)
static unsafe void ObjectStoreDelete(BasicContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions> basicContext, byte* ptr)
{
ref var key = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader));
var keyB = key.ToByteArray();
session.Delete(ref keyB);
basicContext.Delete(ref keyB);
}

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Providers/TsavoriteKVProviderBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Garnet.server
/// [K, V, I, O, F, P]
/// </summary>
public abstract class TsavoriteKVProviderBase<Key, Value, Input, Output, Functions, ParameterSerializer> : ISessionProvider
where Functions : IFunctions<Key, Value, Input, Output, long>
where Functions : ISessionFunctions<Key, Value, Input, Output, long>
where ParameterSerializer : IServerSerializer<Key, Value, Input, Output>
{
/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Storage/Functions/MainStore/CallbackMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Garnet.server
/// <summary>
/// Callback functions for main store
/// </summary>
public readonly unsafe partial struct MainStoreFunctions : IFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
public readonly unsafe partial struct MainStoreFunctions : ISessionFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
{
/// <inheritdoc />
public void ReadCompletionCallback(ref SpanByte key, ref SpanByte input, ref SpanByteAndMemory output, long ctx, Status status, RecordMetadata recordMetadata)
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Storage/Functions/MainStore/DeleteMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Garnet.server
/// <summary>
/// Callback functions for main store
/// </summary>
public readonly unsafe partial struct MainStoreFunctions : IFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
public readonly unsafe partial struct MainStoreFunctions : ISessionFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
{
/// <inheritdoc />
public bool SingleDeleter(ref SpanByte key, ref SpanByte value, ref DeleteInfo deleteInfo, ref RecordInfo recordInfo)
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Storage/Functions/MainStore/DisposeMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Garnet.server
/// <summary>
/// Callback functions for main store
/// </summary>
public readonly unsafe partial struct MainStoreFunctions : IFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
public readonly unsafe partial struct MainStoreFunctions : ISessionFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
{
/// <inheritdoc />
public void DisposeSingleWriter(ref SpanByte key, ref SpanByte input, ref SpanByte src, ref SpanByte dst, ref SpanByteAndMemory output, ref UpsertInfo upsertInfo, WriteReason reason)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Garnet.server
/// <summary>
/// Callback functions for main store
/// </summary>
public readonly unsafe partial struct MainStoreFunctions : IFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
public readonly unsafe partial struct MainStoreFunctions : ISessionFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
{
readonly FunctionsState functionsState;

Expand Down
2 changes: 1 addition & 1 deletion libs/server/Storage/Functions/MainStore/PrivateMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace Garnet.server
/// <summary>
/// Callback functions for main store
/// </summary>
public readonly unsafe partial struct MainStoreFunctions : IFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
public readonly unsafe partial struct MainStoreFunctions : ISessionFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
{
static void CopyTo(ref SpanByte src, ref SpanByteAndMemory dst, MemoryPool<byte> memoryPool)
{
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Storage/Functions/MainStore/RMWMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Garnet.server
/// <summary>
/// Callback functions for main store
/// </summary>
public readonly unsafe partial struct MainStoreFunctions : IFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
public readonly unsafe partial struct MainStoreFunctions : ISessionFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
{
/// <inheritdoc />
public bool NeedInitialUpdate(ref SpanByte key, ref SpanByte input, ref SpanByteAndMemory output, ref RMWInfo rmwInfo)
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Storage/Functions/MainStore/ReadMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Garnet.server
/// <summary>
/// Callback functions for main store
/// </summary>
public readonly unsafe partial struct MainStoreFunctions : IFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
public readonly unsafe partial struct MainStoreFunctions : ISessionFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
{
/// <inheritdoc />
public bool SingleReader(ref SpanByte key, ref SpanByte input, ref SpanByte value, ref SpanByteAndMemory dst, ref ReadInfo readInfo)
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Storage/Functions/MainStore/UpsertMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Garnet.server
/// <summary>
/// Callback functions for main store
/// </summary>
public readonly unsafe partial struct MainStoreFunctions : IFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
public readonly unsafe partial struct MainStoreFunctions : ISessionFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
{
/// <inheritdoc />
public bool SingleWriter(ref SpanByte key, ref SpanByte input, ref SpanByte src, ref SpanByte dst, ref SpanByteAndMemory output, ref UpsertInfo upsertInfo, WriteReason reason, ref RecordInfo recordInfo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Garnet.server
/// <summary>
/// Callback functions for main store
/// </summary>
public readonly unsafe partial struct MainStoreFunctions : IFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
public readonly unsafe partial struct MainStoreFunctions : ISessionFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
{
/// <summary>
/// Parse ASCII byte array into long and validate that only contains ASCII decimal characters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Garnet.server
/// <summary>
/// Object store functions
/// </summary>
public readonly unsafe partial struct ObjectStoreFunctions : IFunctions<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
public readonly unsafe partial struct ObjectStoreFunctions : ISessionFunctions<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
{
/// <inheritdoc />
public void ReadCompletionCallback(ref byte[] key, ref SpanByte input, ref GarnetObjectStoreOutput output, long ctx, Status status, RecordMetadata recordMetadata)
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Storage/Functions/ObjectStore/DeleteMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Garnet.server
/// <summary>
/// Object store functions
/// </summary>
public readonly unsafe partial struct ObjectStoreFunctions : IFunctions<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
public readonly unsafe partial struct ObjectStoreFunctions : ISessionFunctions<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
{
/// <inheritdoc />
public bool SingleDeleter(ref byte[] key, ref IGarnetObject value, ref DeleteInfo deleteInfo, ref RecordInfo recordInfo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Garnet.server
/// <summary>
/// Object store functions
/// </summary>
public readonly unsafe partial struct ObjectStoreFunctions : IFunctions<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
public readonly unsafe partial struct ObjectStoreFunctions : ISessionFunctions<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
{
/// <inheritdoc />
public void DisposeSingleWriter(ref byte[] key, ref SpanByte input, ref IGarnetObject src, ref IGarnetObject dst, ref GarnetObjectStoreOutput output, ref UpsertInfo upsertInfo, WriteReason reason)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Garnet.server
/// <summary>
/// Object store functions
/// </summary>
public readonly unsafe partial struct ObjectStoreFunctions : IFunctions<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
public readonly unsafe partial struct ObjectStoreFunctions : ISessionFunctions<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
{
readonly FunctionsState functionsState;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace Garnet.server
/// <summary>
/// Object store functions
/// </summary>
public readonly unsafe partial struct ObjectStoreFunctions : IFunctions<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
public readonly unsafe partial struct ObjectStoreFunctions : ISessionFunctions<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
{
/// <summary>
/// Logging upsert from
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Storage/Functions/ObjectStore/RMWMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Garnet.server
/// <summary>
/// Object store functions
/// </summary>
public readonly unsafe partial struct ObjectStoreFunctions : IFunctions<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
public readonly unsafe partial struct ObjectStoreFunctions : ISessionFunctions<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
{
/// <inheritdoc />
public bool NeedInitialUpdate(ref byte[] key, ref SpanByte input, ref GarnetObjectStoreOutput output, ref RMWInfo rmwInfo)
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Storage/Functions/ObjectStore/ReadMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Garnet.server
/// <summary>
/// Object store functions
/// </summary>
public readonly unsafe partial struct ObjectStoreFunctions : IFunctions<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
public readonly unsafe partial struct ObjectStoreFunctions : ISessionFunctions<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
{
/// <inheritdoc />
public bool SingleReader(ref byte[] key, ref SpanByte input, ref IGarnetObject value, ref GarnetObjectStoreOutput dst, ref ReadInfo readInfo)
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Storage/Functions/ObjectStore/UpsertMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Garnet.server
/// <summary>
/// Object store functions
/// </summary>
public readonly unsafe partial struct ObjectStoreFunctions : IFunctions<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
public readonly unsafe partial struct ObjectStoreFunctions : ISessionFunctions<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
{
/// <inheritdoc />
public bool SingleWriter(ref byte[] key, ref SpanByte input, ref IGarnetObject src, ref IGarnetObject dst, ref GarnetObjectStoreOutput output, ref UpsertInfo upsertInfo, WriteReason reason, ref RecordInfo recordInfo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Garnet.server
/// <summary>
/// Object store functions
/// </summary>
public readonly unsafe partial struct ObjectStoreFunctions : IFunctions<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
public readonly unsafe partial struct ObjectStoreFunctions : ISessionFunctions<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
{
/// <inheritdoc/>
public int GetRMWModifiedValueLength(ref IGarnetObject value, ref SpanByte input)
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Storage/Session/MainStore/BitmapOps.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public unsafe GarnetStatus StringBitOperation(ArgSlice[] keys, BitmapOperation b
var localSrcBitmapPtr = (byte*)((IntPtr)(*(long*)outputBitmapPtr));
var len = *(int*)(outputBitmapPtr + 8);

// Keep track of pointers returned from IFunctions
// Keep track of pointers returned from ISessionFunctions
srcBitmapStartPtrs[keysFound] = localSrcBitmapPtr;
srcBitmapEndPtrs[keysFound] = localSrcBitmapPtr + len;
keysFound++;
Expand Down
Loading

0 comments on commit 198d329

Please sign in to comment.