Skip to content

Commit

Permalink
Free pages after read completion.
Browse files Browse the repository at this point in the history
  • Loading branch information
Yoganand Rajasekaran committed May 10, 2024
1 parent f0c6a34 commit 71eb38a
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 56 deletions.
98 changes: 60 additions & 38 deletions libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -646,31 +646,12 @@ private void FreePagesBeyondUsableCapacity(long startPage, int capacity, int usa
}
}

private void ReadPagesWithMemoryConstraint(long endAddress, int capacity, RecoveryStatus recoveryStatus, long page, long endPage, int numPagesToRead, ref long lastFreedPage)
private void ReadPagesWithMemoryConstraint(long endAddress, int capacity, RecoveryStatus recoveryStatus, long page, long endPage, int numPagesToRead)
{
// Before reading in additional pages, make sure that any previously allocated pages that would violate the memory size
// constraint are freed.
FreePagesBeyondUsableCapacity(startPage: page, capacity: capacity, usableCapacity: capacity - hlog.MinEmptyPageCount, pagesToRead: numPagesToRead, recoveryStatus);

if (hlog.IsSizeBeyondLimit != null)
{
// free up additional pages, one at a time, to bring memory usage under control
var scanPage = Math.Max(0, page - capacity); // start with the earliest possible page

while (scanPage < page && hlog.IsSizeBeyondLimit())
{
var pageIndex = hlog.GetPageIndexForPage(scanPage);
if (hlog.IsAllocated(pageIndex))
{
recoveryStatus.WaitFlush(pageIndex);
hlog.EvictPage(scanPage);
lastFreedPage = scanPage;
}

scanPage++;
}
}

// Issue request to read pages as much as possible
for (var p = page; p < endPage; p++) recoveryStatus.readStatus[hlog.GetPageIndexForPage(p)] = ReadStatus.Pending;
hlog.AsyncReadPagesFromDevice(page, numPagesToRead, endAddress,
Expand All @@ -679,10 +660,29 @@ private void ReadPagesWithMemoryConstraint(long endAddress, int capacity, Recove
recoveryStatus.recoveryDevice, recoveryStatus.objectLogRecoveryDevice);
}

private (long ReadEndPage, long LastFreedPage) ReadPagesForRecovery(long untilAddress, RecoveryStatus recoveryStatus, long endPage, int capacity, int numPagesToReadPerIteration, long page)
private long FreePagesToLimitHeapMemory(RecoveryStatus recoveryStatus, long page)
{
var readEndPage = Math.Min(page + numPagesToReadPerIteration, endPage);
long lastFreedPage = NoPageFreed;
if (hlog.IsSizeBeyondLimit == null) return lastFreedPage;

// free up additional pages, one at a time, to bring memory usage under control starting with the earliest possible page
for (var p = Math.Max(0, page - recoveryStatus.usableCapacity + 1); p < page && hlog.IsSizeBeyondLimit(); p++)
{
var pageIndex = hlog.GetPageIndexForPage(p);
if (hlog.IsAllocated(pageIndex))
{
recoveryStatus.WaitFlush(pageIndex);
hlog.EvictPage(p);
lastFreedPage = p;
}
}

return lastFreedPage;
}

private long ReadPagesForRecovery(long untilAddress, RecoveryStatus recoveryStatus, long endPage, int capacity, int numPagesToReadPerIteration, long page)
{
var readEndPage = Math.Min(page + numPagesToReadPerIteration, endPage);
if (page < readEndPage)
{
var numPagesToRead = (int)(readEndPage - page);
Expand All @@ -691,16 +691,15 @@ private void ReadPagesWithMemoryConstraint(long endAddress, int capacity, Recove
// this must be done in batches of "all flushes' followed by "all reads" to ensure proper sequencing of reads when
// usableCapacity != capacity (and thus the page-read index is not equal to the page-flush index).
WaitUntilAllPagesHaveBeenFlushed(page, readEndPage, recoveryStatus);
ReadPagesWithMemoryConstraint(untilAddress, capacity, recoveryStatus, page, readEndPage, numPagesToRead, ref lastFreedPage);
ReadPagesWithMemoryConstraint(untilAddress, capacity, recoveryStatus, page, readEndPage, numPagesToRead);
}

return (readEndPage, lastFreedPage);
return readEndPage;
}

private async ValueTask<(long ReadEndPage, long LastFreedPage)> ReadPagesForRecoveryAsync(long untilAddress, RecoveryStatus recoveryStatus, long endPage, int capacity, int numPagesToReadPerIteration, long page, CancellationToken cancellationToken)
private async ValueTask<long> ReadPagesForRecoveryAsync(long untilAddress, RecoveryStatus recoveryStatus, long endPage, int capacity, int numPagesToReadPerIteration, long page, CancellationToken cancellationToken)
{
var readEndPage = Math.Min(page + numPagesToReadPerIteration, endPage);
long lastFreedPage = NoPageFreed;
if (page < readEndPage)
{
var numPagesToRead = (int)(readEndPage - page);
Expand All @@ -709,10 +708,30 @@ private void ReadPagesWithMemoryConstraint(long endAddress, int capacity, Recove
// this must be done in batches of "all flushes' followed by "all reads" to ensure proper sequencing of reads when
// usableCapacity != capacity (and thus the page-read index is not equal to the page-flush index).
await WaitUntilAllPagesHaveBeenFlushedAsync(page, readEndPage, recoveryStatus, cancellationToken).ConfigureAwait(false);
ReadPagesWithMemoryConstraint(untilAddress, capacity, recoveryStatus, page, readEndPage, numPagesToRead, ref lastFreedPage);
ReadPagesWithMemoryConstraint(untilAddress, capacity, recoveryStatus, page, readEndPage, numPagesToRead);
}

return readEndPage;
}

private async Task<long> FreePagesToLimitHeapMemoryAsync(RecoveryStatus recoveryStatus, long page, CancellationToken cancellationToken)
{
long lastFreedPage = NoPageFreed;
if (hlog.IsSizeBeyondLimit == null) return lastFreedPage;

// free up additional pages, one at a time, to bring memory usage under control starting with the earliest possible page
for (var p = Math.Max(0, page - recoveryStatus.usableCapacity + 1); p < page && hlog.IsSizeBeyondLimit(); p++)
{
var pageIndex = hlog.GetPageIndexForPage(p);
if (hlog.IsAllocated(pageIndex))
{
await recoveryStatus.WaitFlushAsync(pageIndex, cancellationToken);
hlog.EvictPage(p);
lastFreedPage = p;
}
}

return (readEndPage, lastFreedPage);
return lastFreedPage;
}

private long RecoverHybridLog(long scanFromAddress, long recoverFromAddress, long untilAddress, long nextVersion, CheckpointType checkpointType, RecoveryOptions options)
Expand All @@ -724,16 +743,17 @@ private long RecoverHybridLog(long scanFromAddress, long recoverFromAddress, lon

for (long page = startPage; page < endPage; page += numPagesToReadPerIteration)
{
var result = ReadPagesForRecovery(untilAddress, recoveryStatus, endPage, capacity, numPagesToReadPerIteration, page);
var end = result.ReadEndPage;
if (result.LastFreedPage != NoPageFreed) lastFreedPage = result.LastFreedPage;
var end = ReadPagesForRecovery(untilAddress, recoveryStatus, endPage, capacity, numPagesToReadPerIteration, page);

for (var p = page; p < end; p++)
{
// Ensure page has been read into memory
int pageIndex = hlog.GetPageIndexForPage(p);
recoveryStatus.WaitRead(pageIndex);

var freedPage = FreePagesToLimitHeapMemory(recoveryStatus, p);
if (freedPage != NoPageFreed) lastFreedPage = freedPage;

// We make an extra pass to clear locks when reading every page back into memory
ClearLocksOnPage(p, options);

Expand All @@ -754,16 +774,17 @@ private async ValueTask<long> RecoverHybridLogAsync(long scanFromAddress, long r

for (long page = startPage; page < endPage; page += numPagesToReadPerIteration)
{
var result = await ReadPagesForRecoveryAsync(untilAddress, recoveryStatus, endPage, capacity, numPagesToReadPerIteration, page, cancellationToken).ConfigureAwait(false);
var end = result.ReadEndPage;
if (result.LastFreedPage != NoPageFreed) lastFreedPage = result.LastFreedPage;
var end = await ReadPagesForRecoveryAsync(untilAddress, recoveryStatus, endPage, capacity, numPagesToReadPerIteration, page, cancellationToken).ConfigureAwait(false);

for (var p = page; p < end; p++)
{
// Ensure page has been read into memory
int pageIndex = hlog.GetPageIndexForPage(p);
await recoveryStatus.WaitReadAsync(pageIndex, cancellationToken).ConfigureAwait(false);

var freedPage = await FreePagesToLimitHeapMemoryAsync(recoveryStatus, p, cancellationToken).ConfigureAwait(false);
if (freedPage != NoPageFreed) lastFreedPage = freedPage;

// We make an extra pass to clear locks when reading every page back into memory
ClearLocksOnPage(p, options);

Expand Down Expand Up @@ -853,16 +874,17 @@ private long RecoverHybridLogFromSnapshotFile(long scanFromAddress, long recover

for (long page = startPage; page < endPage; page += numPagesToReadPerIteration)
{
var result = ReadPagesForRecovery(snapshotEndAddress, recoveryStatus, snapshotEndPage, capacity, numPagesToReadPerIteration, page);
ReadPagesForRecovery(snapshotEndAddress, recoveryStatus, snapshotEndPage, capacity, numPagesToReadPerIteration, page);
var end = Math.Min(page + numPagesToReadPerIteration, endPage);
if (result.LastFreedPage != NoPageFreed) lastFreedPage = result.LastFreedPage;
for (long p = page; p < end; p++)
{
int pageIndex = hlog.GetPageIndexForPage(p);
if (p < snapshotEndPage)
{
// Ensure the page is read from file
recoveryStatus.WaitRead(pageIndex);
var freedPage = FreePagesToLimitHeapMemory(recoveryStatus, p);
if (freedPage != NoPageFreed) lastFreedPage = freedPage;

// We make an extra pass to clear locks when reading pages back into memory
ClearLocksOnPage(p, options);
Expand Down Expand Up @@ -895,15 +917,15 @@ private async ValueTask<long> RecoverHybridLogFromSnapshotFileAsync(long scanFro
var result = await ReadPagesForRecoveryAsync(snapshotEndAddress, recoveryStatus, snapshotEndPage, capacity, numPagesToReadPerIteration, page, cancellationToken).ConfigureAwait(false);

var end = Math.Min(page + numPagesToReadPerIteration, endPage);
if (result.LastFreedPage != NoPageFreed) lastFreedPage = result.LastFreedPage;

for (long p = page; p < end; p++)
{
int pageIndex = hlog.GetPageIndexForPage(p);
if (p < snapshotEndPage)
{
// Ensure the page is read from file
await recoveryStatus.WaitReadAsync(pageIndex, cancellationToken).ConfigureAwait(false);
var freedPage = await FreePagesToLimitHeapMemoryAsync(recoveryStatus, p, cancellationToken).ConfigureAwait(false);
if (freedPage != NoPageFreed) lastFreedPage = freedPage;

// We make an extra pass to clear locks when reading pages back into memory
ClearLocksOnPage(p, options);
Expand Down
22 changes: 4 additions & 18 deletions test/Garnet.test/RespAdminCommandsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -235,18 +235,8 @@ public void SeSaveRecoverMultipleObjectsTest(int memorySize, int recoveryMemoryS
using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig(allowAdmin: true)))
{
var db = redis.GetDatabase(0);
for (int i = 0; i < 3000; i++)
{
var key = $"SeSaveRecoverTestKey{i:0000}";
db.ListLeftPush(key, ldata);
}

for (int i = 0; i < 3000; i++)
{
var key = $"SeSaveRecoverTestKey{i:0000}";
var returnedData = db.ListRange(key);
Assert.AreEqual(ldataArr, returnedData, $"key {key}");
}
for (int i = 0; i < 3000; i++) db.ListLeftPush($"SeSaveRecoverTestKey{i:0000}", ldata);
for (int i = 0; i < 3000; i++) Assert.AreEqual(ldataArr, db.ListRange($"SeSaveRecoverTestKey{i:0000}"), $"key {i:0000}");

// Issue and wait for DB save
var server = redis.GetServer($"{TestUtils.Address}:{TestUtils.Port}");
Expand All @@ -262,12 +252,8 @@ public void SeSaveRecoverMultipleObjectsTest(int memorySize, int recoveryMemoryS
using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig(allowAdmin: true)))
{
var db = redis.GetDatabase(0);
for (int i = 0; i < 3000; i++)
{
var key = $"SeSaveRecoverTestKey{i:0000}";
var returnedData = db.ListRange(key);
Assert.AreEqual(ldataArr, returnedData, $"key {key}");
}
for (var i = 3000; i < 3100; i++) db.ListLeftPush($"SeSaveRecoverTestKey{i:0000}", ldata);
for (var i = 0; i < 3100; i++) Assert.AreEqual(ldataArr, db.ListRange($"SeSaveRecoverTestKey{i:0000}"), $"key {i:0000}");
}
}

Expand Down

0 comments on commit 71eb38a

Please sign in to comment.