From 71eb38af95e669b30c06a873831921216cada22f Mon Sep 17 00:00:00 2001 From: Yoganand Rajasekaran Date: Thu, 9 May 2024 20:56:24 -0700 Subject: [PATCH] Free pages after read completion. --- .../cs/src/core/Index/Recovery/Recovery.cs | 98 ++++++++++++------- test/Garnet.test/RespAdminCommandsTests.cs | 22 +---- 2 files changed, 64 insertions(+), 56 deletions(-) diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs b/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs index ccde9f05d1..cb25b05970 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs @@ -646,31 +646,12 @@ private void FreePagesBeyondUsableCapacity(long startPage, int capacity, int usa } } - private void ReadPagesWithMemoryConstraint(long endAddress, int capacity, RecoveryStatus recoveryStatus, long page, long endPage, int numPagesToRead, ref long lastFreedPage) + private void ReadPagesWithMemoryConstraint(long endAddress, int capacity, RecoveryStatus recoveryStatus, long page, long endPage, int numPagesToRead) { // Before reading in additional pages, make sure that any previously allocated pages that would violate the memory size // constraint are freed. FreePagesBeyondUsableCapacity(startPage: page, capacity: capacity, usableCapacity: capacity - hlog.MinEmptyPageCount, pagesToRead: numPagesToRead, recoveryStatus); - if (hlog.IsSizeBeyondLimit != null) - { - // free up additional pages, one at a time, to bring memory usage under control - var scanPage = Math.Max(0, page - capacity); // start with the earliest possible page - - while (scanPage < page && hlog.IsSizeBeyondLimit()) - { - var pageIndex = hlog.GetPageIndexForPage(scanPage); - if (hlog.IsAllocated(pageIndex)) - { - recoveryStatus.WaitFlush(pageIndex); - hlog.EvictPage(scanPage); - lastFreedPage = scanPage; - } - - scanPage++; - } - } - // Issue request to read pages as much as possible for (var p = page; p < endPage; p++) recoveryStatus.readStatus[hlog.GetPageIndexForPage(p)] = ReadStatus.Pending; hlog.AsyncReadPagesFromDevice(page, numPagesToRead, endAddress, @@ -679,10 +660,29 @@ private void ReadPagesWithMemoryConstraint(long endAddress, int capacity, Recove recoveryStatus.recoveryDevice, recoveryStatus.objectLogRecoveryDevice); } - private (long ReadEndPage, long LastFreedPage) ReadPagesForRecovery(long untilAddress, RecoveryStatus recoveryStatus, long endPage, int capacity, int numPagesToReadPerIteration, long page) + private long FreePagesToLimitHeapMemory(RecoveryStatus recoveryStatus, long page) { - var readEndPage = Math.Min(page + numPagesToReadPerIteration, endPage); long lastFreedPage = NoPageFreed; + if (hlog.IsSizeBeyondLimit == null) return lastFreedPage; + + // free up additional pages, one at a time, to bring memory usage under control starting with the earliest possible page + for (var p = Math.Max(0, page - recoveryStatus.usableCapacity + 1); p < page && hlog.IsSizeBeyondLimit(); p++) + { + var pageIndex = hlog.GetPageIndexForPage(p); + if (hlog.IsAllocated(pageIndex)) + { + recoveryStatus.WaitFlush(pageIndex); + hlog.EvictPage(p); + lastFreedPage = p; + } + } + + return lastFreedPage; + } + + private long ReadPagesForRecovery(long untilAddress, RecoveryStatus recoveryStatus, long endPage, int capacity, int numPagesToReadPerIteration, long page) + { + var readEndPage = Math.Min(page + numPagesToReadPerIteration, endPage); if (page < readEndPage) { var numPagesToRead = (int)(readEndPage - page); @@ -691,16 +691,15 @@ private void ReadPagesWithMemoryConstraint(long endAddress, int capacity, Recove // this must be done in batches of "all flushes' followed by "all reads" to ensure proper sequencing of reads when // usableCapacity != capacity (and thus the page-read index is not equal to the page-flush index). WaitUntilAllPagesHaveBeenFlushed(page, readEndPage, recoveryStatus); - ReadPagesWithMemoryConstraint(untilAddress, capacity, recoveryStatus, page, readEndPage, numPagesToRead, ref lastFreedPage); + ReadPagesWithMemoryConstraint(untilAddress, capacity, recoveryStatus, page, readEndPage, numPagesToRead); } - return (readEndPage, lastFreedPage); + return readEndPage; } - private async ValueTask<(long ReadEndPage, long LastFreedPage)> ReadPagesForRecoveryAsync(long untilAddress, RecoveryStatus recoveryStatus, long endPage, int capacity, int numPagesToReadPerIteration, long page, CancellationToken cancellationToken) + private async ValueTask ReadPagesForRecoveryAsync(long untilAddress, RecoveryStatus recoveryStatus, long endPage, int capacity, int numPagesToReadPerIteration, long page, CancellationToken cancellationToken) { var readEndPage = Math.Min(page + numPagesToReadPerIteration, endPage); - long lastFreedPage = NoPageFreed; if (page < readEndPage) { var numPagesToRead = (int)(readEndPage - page); @@ -709,10 +708,30 @@ private void ReadPagesWithMemoryConstraint(long endAddress, int capacity, Recove // this must be done in batches of "all flushes' followed by "all reads" to ensure proper sequencing of reads when // usableCapacity != capacity (and thus the page-read index is not equal to the page-flush index). await WaitUntilAllPagesHaveBeenFlushedAsync(page, readEndPage, recoveryStatus, cancellationToken).ConfigureAwait(false); - ReadPagesWithMemoryConstraint(untilAddress, capacity, recoveryStatus, page, readEndPage, numPagesToRead, ref lastFreedPage); + ReadPagesWithMemoryConstraint(untilAddress, capacity, recoveryStatus, page, readEndPage, numPagesToRead); + } + + return readEndPage; + } + + private async Task FreePagesToLimitHeapMemoryAsync(RecoveryStatus recoveryStatus, long page, CancellationToken cancellationToken) + { + long lastFreedPage = NoPageFreed; + if (hlog.IsSizeBeyondLimit == null) return lastFreedPage; + + // free up additional pages, one at a time, to bring memory usage under control starting with the earliest possible page + for (var p = Math.Max(0, page - recoveryStatus.usableCapacity + 1); p < page && hlog.IsSizeBeyondLimit(); p++) + { + var pageIndex = hlog.GetPageIndexForPage(p); + if (hlog.IsAllocated(pageIndex)) + { + await recoveryStatus.WaitFlushAsync(pageIndex, cancellationToken); + hlog.EvictPage(p); + lastFreedPage = p; + } } - return (readEndPage, lastFreedPage); + return lastFreedPage; } private long RecoverHybridLog(long scanFromAddress, long recoverFromAddress, long untilAddress, long nextVersion, CheckpointType checkpointType, RecoveryOptions options) @@ -724,9 +743,7 @@ private long RecoverHybridLog(long scanFromAddress, long recoverFromAddress, lon for (long page = startPage; page < endPage; page += numPagesToReadPerIteration) { - var result = ReadPagesForRecovery(untilAddress, recoveryStatus, endPage, capacity, numPagesToReadPerIteration, page); - var end = result.ReadEndPage; - if (result.LastFreedPage != NoPageFreed) lastFreedPage = result.LastFreedPage; + var end = ReadPagesForRecovery(untilAddress, recoveryStatus, endPage, capacity, numPagesToReadPerIteration, page); for (var p = page; p < end; p++) { @@ -734,6 +751,9 @@ private long RecoverHybridLog(long scanFromAddress, long recoverFromAddress, lon int pageIndex = hlog.GetPageIndexForPage(p); recoveryStatus.WaitRead(pageIndex); + var freedPage = FreePagesToLimitHeapMemory(recoveryStatus, p); + if (freedPage != NoPageFreed) lastFreedPage = freedPage; + // We make an extra pass to clear locks when reading every page back into memory ClearLocksOnPage(p, options); @@ -754,9 +774,7 @@ private async ValueTask RecoverHybridLogAsync(long scanFromAddress, long r for (long page = startPage; page < endPage; page += numPagesToReadPerIteration) { - var result = await ReadPagesForRecoveryAsync(untilAddress, recoveryStatus, endPage, capacity, numPagesToReadPerIteration, page, cancellationToken).ConfigureAwait(false); - var end = result.ReadEndPage; - if (result.LastFreedPage != NoPageFreed) lastFreedPage = result.LastFreedPage; + var end = await ReadPagesForRecoveryAsync(untilAddress, recoveryStatus, endPage, capacity, numPagesToReadPerIteration, page, cancellationToken).ConfigureAwait(false); for (var p = page; p < end; p++) { @@ -764,6 +782,9 @@ private async ValueTask RecoverHybridLogAsync(long scanFromAddress, long r int pageIndex = hlog.GetPageIndexForPage(p); await recoveryStatus.WaitReadAsync(pageIndex, cancellationToken).ConfigureAwait(false); + var freedPage = await FreePagesToLimitHeapMemoryAsync(recoveryStatus, p, cancellationToken).ConfigureAwait(false); + if (freedPage != NoPageFreed) lastFreedPage = freedPage; + // We make an extra pass to clear locks when reading every page back into memory ClearLocksOnPage(p, options); @@ -853,9 +874,8 @@ private long RecoverHybridLogFromSnapshotFile(long scanFromAddress, long recover for (long page = startPage; page < endPage; page += numPagesToReadPerIteration) { - var result = ReadPagesForRecovery(snapshotEndAddress, recoveryStatus, snapshotEndPage, capacity, numPagesToReadPerIteration, page); + ReadPagesForRecovery(snapshotEndAddress, recoveryStatus, snapshotEndPage, capacity, numPagesToReadPerIteration, page); var end = Math.Min(page + numPagesToReadPerIteration, endPage); - if (result.LastFreedPage != NoPageFreed) lastFreedPage = result.LastFreedPage; for (long p = page; p < end; p++) { int pageIndex = hlog.GetPageIndexForPage(p); @@ -863,6 +883,8 @@ private long RecoverHybridLogFromSnapshotFile(long scanFromAddress, long recover { // Ensure the page is read from file recoveryStatus.WaitRead(pageIndex); + var freedPage = FreePagesToLimitHeapMemory(recoveryStatus, p); + if (freedPage != NoPageFreed) lastFreedPage = freedPage; // We make an extra pass to clear locks when reading pages back into memory ClearLocksOnPage(p, options); @@ -895,8 +917,6 @@ private async ValueTask RecoverHybridLogFromSnapshotFileAsync(long scanFro var result = await ReadPagesForRecoveryAsync(snapshotEndAddress, recoveryStatus, snapshotEndPage, capacity, numPagesToReadPerIteration, page, cancellationToken).ConfigureAwait(false); var end = Math.Min(page + numPagesToReadPerIteration, endPage); - if (result.LastFreedPage != NoPageFreed) lastFreedPage = result.LastFreedPage; - for (long p = page; p < end; p++) { int pageIndex = hlog.GetPageIndexForPage(p); @@ -904,6 +924,8 @@ private async ValueTask RecoverHybridLogFromSnapshotFileAsync(long scanFro { // Ensure the page is read from file await recoveryStatus.WaitReadAsync(pageIndex, cancellationToken).ConfigureAwait(false); + var freedPage = await FreePagesToLimitHeapMemoryAsync(recoveryStatus, p, cancellationToken).ConfigureAwait(false); + if (freedPage != NoPageFreed) lastFreedPage = freedPage; // We make an extra pass to clear locks when reading pages back into memory ClearLocksOnPage(p, options); diff --git a/test/Garnet.test/RespAdminCommandsTests.cs b/test/Garnet.test/RespAdminCommandsTests.cs index eaae21d74e..e52b7df2db 100644 --- a/test/Garnet.test/RespAdminCommandsTests.cs +++ b/test/Garnet.test/RespAdminCommandsTests.cs @@ -235,18 +235,8 @@ public void SeSaveRecoverMultipleObjectsTest(int memorySize, int recoveryMemoryS using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig(allowAdmin: true))) { var db = redis.GetDatabase(0); - for (int i = 0; i < 3000; i++) - { - var key = $"SeSaveRecoverTestKey{i:0000}"; - db.ListLeftPush(key, ldata); - } - - for (int i = 0; i < 3000; i++) - { - var key = $"SeSaveRecoverTestKey{i:0000}"; - var returnedData = db.ListRange(key); - Assert.AreEqual(ldataArr, returnedData, $"key {key}"); - } + for (int i = 0; i < 3000; i++) db.ListLeftPush($"SeSaveRecoverTestKey{i:0000}", ldata); + for (int i = 0; i < 3000; i++) Assert.AreEqual(ldataArr, db.ListRange($"SeSaveRecoverTestKey{i:0000}"), $"key {i:0000}"); // Issue and wait for DB save var server = redis.GetServer($"{TestUtils.Address}:{TestUtils.Port}"); @@ -262,12 +252,8 @@ public void SeSaveRecoverMultipleObjectsTest(int memorySize, int recoveryMemoryS using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig(allowAdmin: true))) { var db = redis.GetDatabase(0); - for (int i = 0; i < 3000; i++) - { - var key = $"SeSaveRecoverTestKey{i:0000}"; - var returnedData = db.ListRange(key); - Assert.AreEqual(ldataArr, returnedData, $"key {key}"); - } + for (var i = 3000; i < 3100; i++) db.ListLeftPush($"SeSaveRecoverTestKey{i:0000}", ldata); + for (var i = 0; i < 3100; i++) Assert.AreEqual(ldataArr, db.ListRange($"SeSaveRecoverTestKey{i:0000}"), $"key {i:0000}"); } }