Skip to content

Commit

Permalink
Streaming snapshot checkpoint in Tsavorite (#824)
Browse files Browse the repository at this point in the history
* Mild refactor in preparation

* updates

* updates

* updates

* fix tests to not cover steaming snapshot as it is not a traditional checkpoint

* add unit test

* fix tests

* fix test

* updates

* Support maxAddress during liveness checks

* improvements to api

* Add SetVersion API

* more checks

* add comment

* add comments
  • Loading branch information
badrishc authored Dec 13, 2024
1 parent aa004d3 commit ca02229
Show file tree
Hide file tree
Showing 32 changed files with 1,169 additions and 517 deletions.
36 changes: 24 additions & 12 deletions libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,25 +186,28 @@ internal unsafe bool GetFromDiskAndPushToReader<TScanFunctions>(ref TKey key, re
/// <remarks>Currently we load an entire page, which while inefficient in performance, allows us to make the cursor safe (by ensuring we align to a valid record) if it is not
/// the last one returned. We could optimize this to load only the subset of a page that is pointed to by the cursor and do GetRequiredRecordSize/RetrievedFullRecord as in
/// AsyncGetFromDiskCallback. However, this would not validate the cursor and would therefore require maintaining a cursor history.</remarks>
internal abstract bool ScanCursor<TScanFunctions>(TsavoriteKV<TKey, TValue, TStoreFunctions, TAllocator> store, ScanCursorState<TKey, TValue> scanCursorState, ref long cursor, long count, TScanFunctions scanFunctions, long endAddress, bool validateCursor)
internal abstract bool ScanCursor<TScanFunctions>(TsavoriteKV<TKey, TValue, TStoreFunctions, TAllocator> store, ScanCursorState<TKey, TValue> scanCursorState, ref long cursor, long count, TScanFunctions scanFunctions, long endAddress, bool validateCursor, long maxAddress)
where TScanFunctions : IScanIteratorFunctions<TKey, TValue>;

private protected bool ScanLookup<TInput, TOutput, TScanFunctions, TScanIterator>(TsavoriteKV<TKey, TValue, TStoreFunctions, TAllocator> store,
ScanCursorState<TKey, TValue> scanCursorState, ref long cursor, long count, TScanFunctions scanFunctions, TScanIterator iter, bool validateCursor)
ScanCursorState<TKey, TValue> scanCursorState, ref long cursor, long count, TScanFunctions scanFunctions, TScanIterator iter, bool validateCursor, long maxAddress)
where TScanFunctions : IScanIteratorFunctions<TKey, TValue>
where TScanIterator : ITsavoriteScanIterator<TKey, TValue>, IPushScanIterator<TKey>
{
using var session = store.NewSession<TInput, TOutput, Empty, LogScanCursorFunctions<TInput, TOutput>>(new LogScanCursorFunctions<TInput, TOutput>());
var bContext = session.BasicContext;

if (cursor >= GetTailAddress())
goto IterationComplete;

if (cursor < BeginAddress) // This includes 0, which means to start the Scan
cursor = BeginAddress;
else if (validateCursor)
iter.SnapCursorToLogicalAddress(ref cursor);

if (!scanFunctions.OnStart(cursor, iter.EndAddress))
return false;

if (cursor >= GetTailAddress())
goto IterationComplete;

scanCursorState.Initialize(scanFunctions);

long numPending = 0;
Expand All @@ -214,7 +217,7 @@ private protected bool ScanLookup<TInput, TOutput, TScanFunctions, TScanIterator
{
ref var key = ref iter.GetKey();
ref var value = ref iter.GetValue();
var status = bContext.ConditionalScanPush(scanCursorState, recordInfo, ref key, ref value, iter.CurrentAddress, iter.NextAddress);
var status = bContext.ConditionalScanPush(scanCursorState, recordInfo, ref key, ref value, iter.CurrentAddress, iter.NextAddress, maxAddress);
if (status.IsPending)
{
++numPending;
Expand All @@ -227,13 +230,19 @@ private protected bool ScanLookup<TInput, TOutput, TScanFunctions, TScanIterator
}

// Update the cursor to point to the next record.
cursor = iter.NextAddress;
if (scanCursorState.retryLastRecord)
cursor = iter.CurrentAddress;
else
cursor = iter.NextAddress;

// Now see if we completed the enumeration.
if (scanCursorState.stop)
goto IterationComplete;
if (scanCursorState.acceptedCount >= count || scanCursorState.endBatch)
{
scanFunctions.OnStop(true, scanCursorState.acceptedCount);
return true;
}
}

// Drain any pending pushes. We have ended the iteration; there are no more records, so drop through to end it.
Expand All @@ -242,12 +251,13 @@ private protected bool ScanLookup<TInput, TOutput, TScanFunctions, TScanIterator

IterationComplete:
cursor = 0;
scanFunctions.OnStop(false, scanCursorState.acceptedCount);
return false;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal Status ConditionalScanPush<TInput, TOutput, TContext, TSessionFunctionsWrapper>(TSessionFunctionsWrapper sessionFunctions, ScanCursorState<TKey, TValue> scanCursorState, RecordInfo recordInfo,
ref TKey key, ref TValue value, long currentAddress, long minAddress)
ref TKey key, ref TValue value, long currentAddress, long minAddress, long maxAddress)
where TSessionFunctionsWrapper : ISessionFunctionsWrapper<TKey, TValue, TInput, TOutput, TContext, TStoreFunctions, TAllocator>
{
Debug.Assert(epoch.ThisInstanceProtected(), "This is called only from ScanLookup so the epoch should be protected");
Expand All @@ -259,7 +269,7 @@ internal Status ConditionalScanPush<TInput, TOutput, TContext, TSessionFunctions
do
{
// If a more recent version of the record exists, do not push this one. Start by searching in-memory.
if (sessionFunctions.Store.TryFindRecordInMainLogForConditionalOperation<TInput, TOutput, TContext, TSessionFunctionsWrapper>(sessionFunctions, ref key, ref stackCtx, currentAddress, minAddress, out internalStatus, out needIO))
if (sessionFunctions.Store.TryFindRecordInMainLogForConditionalOperation<TInput, TOutput, TContext, TSessionFunctionsWrapper>(sessionFunctions, ref key, ref stackCtx, currentAddress, minAddress, maxAddress, out internalStatus, out needIO))
return Status.CreateFound();
}
while (sessionFunctions.Store.HandleImmediateNonPendingRetryStatus<TInput, TOutput, TContext, TSessionFunctionsWrapper>(internalStatus, sessionFunctions));
Expand All @@ -270,7 +280,7 @@ internal Status ConditionalScanPush<TInput, TOutput, TContext, TSessionFunctions
{
// A more recent version of the key was not (yet) found and we need another IO to continue searching.
internalStatus = PrepareIOForConditionalScan(sessionFunctions, ref pendingContext, ref key, ref input, ref value, ref output, default,
ref stackCtx, minAddress, scanCursorState);
ref stackCtx, minAddress, maxAddress, scanCursorState);
}
else
{
Expand All @@ -288,6 +298,8 @@ internal Status ConditionalScanPush<TInput, TOutput, TContext, TSessionFunctions
Interlocked.Increment(ref scanCursorState.acceptedCount);
if ((cursorRecordResult & CursorRecordResult.EndBatch) != 0)
scanCursorState.endBatch = true;
if ((cursorRecordResult & CursorRecordResult.RetryLastRecord) != 0)
scanCursorState.retryLastRecord = true;
}
internalStatus = OperationStatus.SUCCESS;
}
Expand All @@ -298,12 +310,12 @@ internal Status ConditionalScanPush<TInput, TOutput, TContext, TSessionFunctions
internal static OperationStatus PrepareIOForConditionalScan<TInput, TOutput, TContext, TSessionFunctionsWrapper>(TSessionFunctionsWrapper sessionFunctions,
ref TsavoriteKV<TKey, TValue, TStoreFunctions, TAllocator>.PendingContext<TInput, TOutput, TContext> pendingContext,
ref TKey key, ref TInput input, ref TValue value, ref TOutput output, TContext userContext,
ref OperationStackContext<TKey, TValue, TStoreFunctions, TAllocator> stackCtx, long minAddress, ScanCursorState<TKey, TValue> scanCursorState)
ref OperationStackContext<TKey, TValue, TStoreFunctions, TAllocator> stackCtx, long minAddress, long maxAddress, ScanCursorState<TKey, TValue> scanCursorState)
where TSessionFunctionsWrapper : ISessionFunctionsWrapper<TKey, TValue, TInput, TOutput, TContext, TStoreFunctions, TAllocator>
{
// WriteReason is not surfaced for this operation, so pick anything.
var status = sessionFunctions.Store.PrepareIOForConditionalOperation(sessionFunctions, ref pendingContext, ref key, ref input, ref value, ref output,
userContext, ref stackCtx, minAddress, WriteReason.Compaction, OperationType.CONDITIONAL_SCAN_PUSH);
userContext, ref stackCtx, minAddress, maxAddress, WriteReason.Compaction, OperationType.CONDITIONAL_SCAN_PUSH);
pendingContext.scanCursorState = scanCursorState;
return status;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,10 @@ internal override bool Scan<TScanFunctions>(TsavoriteKV<TKey, TValue, TStoreFunc
/// Implementation for push-scanning Tsavorite log with a cursor, called from LogAccessor
/// </summary>
internal override bool ScanCursor<TScanFunctions>(TsavoriteKV<TKey, TValue, TStoreFunctions, BlittableAllocator<TKey, TValue, TStoreFunctions>> store,
ScanCursorState<TKey, TValue> scanCursorState, ref long cursor, long count, TScanFunctions scanFunctions, long endAddress, bool validateCursor)
ScanCursorState<TKey, TValue> scanCursorState, ref long cursor, long count, TScanFunctions scanFunctions, long endAddress, bool validateCursor, long maxAddress)
{
using BlittableScanIterator<TKey, TValue, TStoreFunctions> iter = new(store, this, cursor, endAddress, ScanBufferingMode.SinglePageBuffering, false, epoch, logger: logger);
return ScanLookup<long, long, TScanFunctions, BlittableScanIterator<TKey, TValue, TStoreFunctions>>(store, scanCursorState, ref cursor, count, scanFunctions, iter, validateCursor);
return ScanLookup<long, long, TScanFunctions, BlittableScanIterator<TKey, TValue, TStoreFunctions>>(store, scanCursorState, ref cursor, count, scanFunctions, iter, validateCursor, maxAddress);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1013,10 +1013,10 @@ internal override bool Scan<TScanFunctions>(TsavoriteKV<TKey, TValue, TStoreFunc
/// Implementation for push-scanning Tsavorite log with a cursor, called from LogAccessor
/// </summary>
internal override bool ScanCursor<TScanFunctions>(TsavoriteKV<TKey, TValue, TStoreFunctions, GenericAllocator<TKey, TValue, TStoreFunctions>> store,
ScanCursorState<TKey, TValue> scanCursorState, ref long cursor, long count, TScanFunctions scanFunctions, long endAddress, bool validateCursor)
ScanCursorState<TKey, TValue> scanCursorState, ref long cursor, long count, TScanFunctions scanFunctions, long endAddress, bool validateCursor, long maxAddress)
{
using GenericScanIterator<TKey, TValue, TStoreFunctions> iter = new(store, this, cursor, endAddress, ScanBufferingMode.SinglePageBuffering, false, epoch, logger: logger);
return ScanLookup<long, long, TScanFunctions, GenericScanIterator<TKey, TValue, TStoreFunctions>>(store, scanCursorState, ref cursor, count, scanFunctions, iter, validateCursor);
return ScanLookup<long, long, TScanFunctions, GenericScanIterator<TKey, TValue, TStoreFunctions>>(store, scanCursorState, ref cursor, count, scanFunctions, iter, validateCursor, maxAddress);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ public enum CursorRecordResult
/// <summary>
/// End the current cursor batch (as if "count" had been met); return a valid cursor for the next ScanCursor call
/// </summary>
EndBatch = 4
EndBatch = 4,

/// <summary>
/// Retry the last record when returning a valid cursor
/// </summary>
RetryLastRecord = 8,
}

/// <summary>
Expand All @@ -42,7 +47,7 @@ public interface IScanIteratorFunctions<TKey, TValue>
/// <param name="key">Reference to the current record's key</param>
/// <param name="value">Reference to the current record's Value</param>
/// <param name="recordMetadata">Record metadata, including <see cref="RecordInfo"/> and the current record's logical address</param>
/// <param name="numberOfRecords">The number of records returned so far, including the current one.</param>
/// <param name="numberOfRecords">The number of records accepted so far, not including the current one.</param>
/// <param name="cursorRecordResult">Indicates whether the current record was accepted, or whether to end the current ScanCursor call.
/// Ignored for non-cursor Scans; set to <see cref="CursorRecordResult.Accept"/>.</param>
/// <returns>True to continue iteration, else false</returns>
Expand All @@ -52,7 +57,7 @@ public interface IScanIteratorFunctions<TKey, TValue>
/// <param name="key">Reference to the current record's key</param>
/// <param name="value">Reference to the current record's Value</param>
/// <param name="recordMetadata">Record metadata, including <see cref="RecordInfo"/> and the current record's logical address</param>
/// <param name="numberOfRecords">The number of records returned so far, including the current one.</param>
/// <param name="numberOfRecords">The number of records accepted so far, not including the current one.</param>
/// <param name="cursorRecordResult">Indicates whether the current record was accepted, or whether to end the current ScanCursor call.
/// Ignored for non-cursor Scans; set to <see cref="CursorRecordResult.Accept"/>.</param>
/// <returns>True to continue iteration, else false</returns>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;

namespace Tsavorite.core
{
/// <summary>
/// Callback functions for streaming snapshot iteration
/// </summary>
public interface IStreamingSnapshotIteratorFunctions<TKey, TValue>
{
/// <summary>Iteration is starting.</summary>
/// <param name="checkpointToken">Checkpoint token</param>
/// <param name="currentVersion">Current version of database</param>
/// <param name="targetVersion">Target version of database</param>
/// <returns>True to continue iteration, else false</returns>
bool OnStart(Guid checkpointToken, long currentVersion, long targetVersion);

/// <summary>Next record in the streaming snapshot.</summary>
/// <param name="key">Reference to the current record's key</param>
/// <param name="value">Reference to the current record's Value</param>
/// <param name="recordMetadata">Record metadata, including <see cref="RecordInfo"/> and the current record's logical address</param>
/// <param name="numberOfRecords">The number of records returned so far, not including the current one.</param>
/// <returns>True to continue iteration, else false</returns>
bool Reader(ref TKey key, ref TValue value, RecordMetadata recordMetadata, long numberOfRecords);

/// <summary>Iteration is complete.</summary>
/// <param name="completed">If true, the iteration completed; else OnStart() or Reader() returned false to stop the iteration.</param>
/// <param name="numberOfRecords">The number of records returned before the iteration stopped.</param>
void OnStop(bool completed, long numberOfRecords);

/// <summary>An exception was thrown on iteration (likely during <see name="Reader"/>.</summary>
/// <param name="exception">The exception that was thrown.</param>
/// <param name="numberOfRecords">The number of records returned before the exception.</param>
void OnException(Exception exception, long numberOfRecords);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ namespace Tsavorite.core
internal sealed class ScanCursorState<TKey, TValue>
{
internal IScanIteratorFunctions<TKey, TValue> functions;
internal long acceptedCount; // Number of records pushed to and accepted by the caller
internal bool endBatch; // End the batch (but return a valid cursor for the next batch, as of "count" records had been returned)
internal bool stop; // Stop the operation (as if all records in the db had been returned)
internal long acceptedCount; // Number of records pushed to and accepted by the caller
internal bool endBatch; // End the batch (but return a valid cursor for the next batch, as if "count" records had been returned)
internal bool retryLastRecord; // Retry the last record when returning a valid cursor
internal bool stop; // Stop the operation (as if all records in the db had been returned)

internal void Initialize(IScanIteratorFunctions<TKey, TValue> scanIteratorFunctions)
{
functions = scanIteratorFunctions;
acceptedCount = 0;
endBatch = false;
retryLastRecord = false;
stop = false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,10 @@ internal override bool Scan<TScanFunctions>(TsavoriteKV<SpanByte, SpanByte, TSto
/// Implementation for push-scanning Tsavorite log with a cursor, called from LogAccessor
/// </summary>
internal override bool ScanCursor<TScanFunctions>(TsavoriteKV<SpanByte, SpanByte, TStoreFunctions, SpanByteAllocator<TStoreFunctions>> store,
ScanCursorState<SpanByte, SpanByte> scanCursorState, ref long cursor, long count, TScanFunctions scanFunctions, long endAddress, bool validateCursor)
ScanCursorState<SpanByte, SpanByte> scanCursorState, ref long cursor, long count, TScanFunctions scanFunctions, long endAddress, bool validateCursor, long maxAddress)
{
using SpanByteScanIterator<TStoreFunctions> iter = new(store, this, cursor, endAddress, ScanBufferingMode.SinglePageBuffering, false, epoch, logger: logger);
return ScanLookup<SpanByte, SpanByteAndMemory, TScanFunctions, SpanByteScanIterator<TStoreFunctions>>(store, scanCursorState, ref cursor, count, scanFunctions, iter, validateCursor);
return ScanLookup<SpanByte, SpanByteAndMemory, TScanFunctions, SpanByteScanIterator<TStoreFunctions>>(store, scanCursorState, ref cursor, count, scanFunctions, iter, validateCursor, maxAddress);
}

/// <summary>
Expand Down
Loading

0 comments on commit ca02229

Please sign in to comment.