Skip to content

Commit

Permalink
Fixes and code cleanup.
Browse files Browse the repository at this point in the history
  • Loading branch information
Yoganand Rajasekaran committed May 10, 2024
1 parent 5feaf66 commit 233984d
Showing 1 changed file with 24 additions and 23 deletions.
47 changes: 24 additions & 23 deletions libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ public struct LogFileInfo

public partial class TsavoriteKV<Key, Value> : TsavoriteBase
{
private const long NoPageFreed = -1;

/// <summary>
/// GetLatestCheckpointTokens
/// </summary>
Expand Down Expand Up @@ -449,11 +451,11 @@ private long InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheck
// Then recover snapshot into mutable region
var snapshotLastFreedPage = RecoverHybridLogFromSnapshotFile(recoveredHLCInfo.info.flushedLogicalAddress, recoverFromAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.snapshotStartFlushedLogicalAddress,
recoveredHLCInfo.info.snapshotFinalLogicalAddress, recoveredHLCInfo.info.nextVersion, recoveredHLCInfo.info.guid, options, recoveredHLCInfo.deltaLog, recoverTo);
if (snapshotLastFreedPage != -1) lastFreedPage = snapshotLastFreedPage;
if (snapshotLastFreedPage != NoPageFreed) lastFreedPage = snapshotLastFreedPage;
readOnlyAddress = recoveredHLCInfo.info.flushedLogicalAddress;
}

DoPostRecovery(recoveredICInfo, recoveredHLCInfo, tailAddress, ref headAddress, ref readOnlyAddress, ref lastFreedPage);
DoPostRecovery(recoveredICInfo, recoveredHLCInfo, tailAddress, ref headAddress, ref readOnlyAddress, lastFreedPage);
return recoveredHLCInfo.info.version;
}

Expand Down Expand Up @@ -494,19 +496,19 @@ private async ValueTask<long> InternalRecoverAsync(IndexCheckpointInfo recovered
// Then recover snapshot into mutable region
var snapshotLastFreedPage = await RecoverHybridLogFromSnapshotFileAsync(recoveredHLCInfo.info.flushedLogicalAddress, recoverFromAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.snapshotStartFlushedLogicalAddress,
recoveredHLCInfo.info.snapshotFinalLogicalAddress, recoveredHLCInfo.info.nextVersion, recoveredHLCInfo.info.guid, options, recoveredHLCInfo.deltaLog, recoverTo, cancellationToken).ConfigureAwait(false);
if (snapshotLastFreedPage != -1) lastFreedPage = snapshotLastFreedPage;
if (snapshotLastFreedPage != NoPageFreed) lastFreedPage = snapshotLastFreedPage;
readOnlyAddress = recoveredHLCInfo.info.flushedLogicalAddress;
}

DoPostRecovery(recoveredICInfo, recoveredHLCInfo, tailAddress, ref headAddress, ref readOnlyAddress, ref lastFreedPage);
DoPostRecovery(recoveredICInfo, recoveredHLCInfo, tailAddress, ref headAddress, ref readOnlyAddress, lastFreedPage);
return recoveredHLCInfo.info.version;
}

private void DoPostRecovery(IndexCheckpointInfo recoveredICInfo, HybridLogCheckpointInfo recoveredHLCInfo, long tailAddress, ref long headAddress, ref long readOnlyAddress, ref long lastFreedPage)
private void DoPostRecovery(IndexCheckpointInfo recoveredICInfo, HybridLogCheckpointInfo recoveredHLCInfo, long tailAddress, ref long headAddress, ref long readOnlyAddress, long lastFreedPage)
{
// Adjust head and read-only address post-recovery
var _head = (1 + (tailAddress >> hlog.LogPageSizeBits) - (hlog.GetCapacityNumPages() - hlog.MinEmptyPageCount)) << hlog.LogPageSizeBits;
if (lastFreedPage != -1)
if (lastFreedPage != NoPageFreed)
{
var nextAddress = (lastFreedPage + 1) << hlog.LogPageSizeBits;
if (_head < nextAddress)
Expand Down Expand Up @@ -628,19 +630,18 @@ private bool SetRecoveryPageRanges(HybridLogCheckpointInfo recoveredHLCInfo, int
/// We free these 14 pages, leaving 18 allocated, and then read 2, which fills up usableCapacity.
/// The beg, end can only be zero on the first pass through the buffer, as the page number continuously increases
/// </summary>
private void FreePagesBeyondUsableCapacity(long startPage, int capacity, int usableCapacity, int pagesToRead, RecoveryStatus recoveryStatus, ref long lastFreedPage)
private void FreePagesBeyondUsableCapacity(long startPage, int capacity, int usableCapacity, int pagesToRead, RecoveryStatus recoveryStatus)
{
var beg = Math.Max(0, startPage - capacity);
var end = Math.Max(0, startPage - (usableCapacity - pagesToRead));

for (var page = beg; page < end; page++)
{
var pageIndex = hlog.GetPageIndexForPage(page);
recoveryStatus.WaitFlush(pageIndex);
if (hlog.IsAllocated(pageIndex))
{
recoveryStatus.WaitFlush(pageIndex);
hlog.EvictPage(page);
lastFreedPage = page;
}
}
}
Expand All @@ -649,7 +650,7 @@ private void ReadPagesWithMemoryConstraint(long endAddress, int capacity, Recove
{
// Before reading in additional pages, make sure that any previously allocated pages that would violate the memory size
// constraint are freed.
FreePagesBeyondUsableCapacity(startPage: page, capacity: capacity, usableCapacity: capacity - hlog.MinEmptyPageCount, pagesToRead: numPagesToRead, recoveryStatus, ref lastFreedPage);
FreePagesBeyondUsableCapacity(startPage: page, capacity: capacity, usableCapacity: capacity - hlog.MinEmptyPageCount, pagesToRead: numPagesToRead, recoveryStatus);

if (hlog.IsSizeBeyondLimit != null)
{
Expand Down Expand Up @@ -681,7 +682,7 @@ private void ReadPagesWithMemoryConstraint(long endAddress, int capacity, Recove
private (long ReadEndPage, long LastFreedPage) ReadPagesForRecovery(long untilAddress, RecoveryStatus recoveryStatus, long endPage, int capacity, int numPagesToReadPerIteration, long page)
{
var readEndPage = Math.Min(page + numPagesToReadPerIteration, endPage);
long lastFreedPage = -1;
long lastFreedPage = NoPageFreed;
if (page < readEndPage)
{
var numPagesToRead = (int)(readEndPage - page);
Expand All @@ -699,7 +700,7 @@ private void ReadPagesWithMemoryConstraint(long endAddress, int capacity, Recove
private async ValueTask<(long ReadEndPage, long LastFreedPage)> ReadPagesForRecoveryAsync(long untilAddress, RecoveryStatus recoveryStatus, long endPage, int capacity, int numPagesToReadPerIteration, long page, CancellationToken cancellationToken)
{
var readEndPage = Math.Min(page + numPagesToReadPerIteration, endPage);
long lastFreedPage = -1;
long lastFreedPage = NoPageFreed;
if (page < readEndPage)
{
var numPagesToRead = (int)(readEndPage - page);
Expand All @@ -716,7 +717,7 @@ private void ReadPagesWithMemoryConstraint(long endAddress, int capacity, Recove

private long RecoverHybridLog(long scanFromAddress, long recoverFromAddress, long untilAddress, long nextVersion, CheckpointType checkpointType, RecoveryOptions options)
{
long lastFreedPage = -1;
long lastFreedPage = NoPageFreed;
if (untilAddress <= scanFromAddress)
return lastFreedPage;
var recoveryStatus = GetPageRangesToRead(scanFromAddress, untilAddress, checkpointType, out long startPage, out long endPage, out int capacity, out int numPagesToReadPerIteration);
Expand All @@ -725,7 +726,7 @@ private long RecoverHybridLog(long scanFromAddress, long recoverFromAddress, lon
{
var result = ReadPagesForRecovery(untilAddress, recoveryStatus, endPage, capacity, numPagesToReadPerIteration, page);
var end = result.ReadEndPage;
if (result.LastFreedPage != -1) lastFreedPage = result.LastFreedPage;
if (result.LastFreedPage != NoPageFreed) lastFreedPage = result.LastFreedPage;

for (var p = page; p < end; p++)
{
Expand All @@ -746,7 +747,7 @@ private long RecoverHybridLog(long scanFromAddress, long recoverFromAddress, lon

private async ValueTask<long> RecoverHybridLogAsync(long scanFromAddress, long recoverFromAddress, long untilAddress, long nextVersion, CheckpointType checkpointType, RecoveryOptions options, CancellationToken cancellationToken)
{
long lastFreedPage = -1;
long lastFreedPage = NoPageFreed;
if (untilAddress <= scanFromAddress)
return lastFreedPage;
var recoveryStatus = GetPageRangesToRead(scanFromAddress, untilAddress, checkpointType, out long startPage, out long endPage, out int capacity, out int numPagesToReadPerIteration);
Expand All @@ -755,7 +756,7 @@ private async ValueTask<long> RecoverHybridLogAsync(long scanFromAddress, long r
{
var result = await ReadPagesForRecoveryAsync(untilAddress, recoveryStatus, endPage, capacity, numPagesToReadPerIteration, page, cancellationToken).ConfigureAwait(false);
var end = result.ReadEndPage;
if (result.LastFreedPage != -1) lastFreedPage = result.LastFreedPage;
if (result.LastFreedPage != NoPageFreed) lastFreedPage = result.LastFreedPage;

for (var p = page; p < end; p++)
{
Expand Down Expand Up @@ -847,14 +848,14 @@ private async ValueTask WaitUntilAllPagesHaveBeenFlushedAsync(long startPage, lo

private long RecoverHybridLogFromSnapshotFile(long scanFromAddress, long recoverFromAddress, long untilAddress, long snapshotStartAddress, long snapshotEndAddress, long nextVersion, Guid guid, RecoveryOptions options, DeltaLog deltaLog, long recoverTo)
{
long lastFreedPage = -1;
long lastFreedPage = NoPageFreed;
GetSnapshotPageRangesToRead(scanFromAddress, untilAddress, snapshotStartAddress, snapshotEndAddress, guid, out long startPage, out long endPage, out long snapshotEndPage, out int capacity, out var recoveryStatus, out int numPagesToReadPerIteration);

for (long page = startPage; page < endPage; page += numPagesToReadPerIteration)
{
var result = ReadPagesForRecovery(snapshotEndAddress, recoveryStatus, snapshotEndPage, capacity, numPagesToReadPerIteration, page);
var end = result.ReadEndPage;
if (result.LastFreedPage != -1) lastFreedPage = result.LastFreedPage;
var end = Math.Min(page + numPagesToReadPerIteration, endPage);
if (result.LastFreedPage != NoPageFreed) lastFreedPage = result.LastFreedPage;
for (long p = page; p < end; p++)
{
int pageIndex = hlog.GetPageIndexForPage(p);
Expand Down Expand Up @@ -886,14 +887,15 @@ private long RecoverHybridLogFromSnapshotFile(long scanFromAddress, long recover

private async ValueTask<long> RecoverHybridLogFromSnapshotFileAsync(long scanFromAddress, long recoverFromAddress, long untilAddress, long snapshotStartAddress, long snapshotEndAddress, long nextVersion, Guid guid, RecoveryOptions options, DeltaLog deltaLog, long recoverTo, CancellationToken cancellationToken)
{
long lastFreedPage = -1;
long lastFreedPage = NoPageFreed;
GetSnapshotPageRangesToRead(scanFromAddress, untilAddress, snapshotStartAddress, snapshotEndAddress, guid, out long startPage, out long endPage, out long snapshotEndPage, out int capacity, out var recoveryStatus, out int numPagesToReadPerIteration);

for (long page = startPage; page < endPage; page += numPagesToReadPerIteration)
{
var result = await ReadPagesForRecoveryAsync(snapshotEndAddress, recoveryStatus, snapshotEndPage, capacity, numPagesToReadPerIteration, page, cancellationToken).ConfigureAwait(false);
var end = result.ReadEndPage;
if (result.LastFreedPage != -1) lastFreedPage = result.LastFreedPage;

var end = Math.Min(page + numPagesToReadPerIteration, endPage);
if (result.LastFreedPage != NoPageFreed) lastFreedPage = result.LastFreedPage;

for (long p = page; p < end; p++)
{
Expand All @@ -917,7 +919,6 @@ private async ValueTask<long> RecoverHybridLogFromSnapshotFileAsync(long scanFro
}

ApplyDelta(scanFromAddress, recoverFromAddress, untilAddress, nextVersion, options, deltaLog, recoverTo, endPage, snapshotEndPage, capacity, numPagesToReadPerIteration, recoveryStatus, page, end);
return lastFreedPage;
}

await WaitUntilAllPagesHaveBeenFlushedAsync(startPage, endPage, recoveryStatus, cancellationToken).ConfigureAwait(false);
Expand Down

0 comments on commit 233984d

Please sign in to comment.