diff --git a/README.md b/README.md index 864f57e..3361a1e 100644 --- a/README.md +++ b/README.md @@ -65,7 +65,8 @@ This package introduces a few "requirements" against Orleans: Create an interface for the grain, which implements `ISyncWorker`, as well as one of the `IGrainWith...Key` interfaces. Then create a new class that extends the `SyncWorker` abstract class, and implements the new interface that was introduced: ```cs -public interface IPasswordVerifierGrain : ISyncWorker, IGrainWithGuidKey +public interface IPasswordVerifierGrain + : ISyncWorker, IGrainWithGuidKey; public class PasswordVerifierGrain : SyncWorker, IPasswordVerifierGrain { @@ -79,7 +80,8 @@ public class PasswordVerifierGrain : SyncWorker PerformWork(PasswordVerifierRequest request) + protected override async Task PerformWork( + PasswordVerifierRequest request, GrainCancellationToken grainCancellationToken) { var verifyResult = await _passwordVerifier.VerifyPassword(request.PasswordHash, request.Password); @@ -89,6 +91,7 @@ public class PasswordVerifierGrain : SyncWorker, ICancellableGrain +{ + public CancellableGrain( + ILogger logger, + LimitedConcurrencyLevelTaskScheduler limitedConcurrencyScheduler) : base(logger, limitedConcurrencyScheduler) + { + } + + protected override async Task PerformWork( + SampleCancellationRequest request, GrainCancellationToken grainCancellationToken) + { + var startingValue = request.StartingValue; + + for (var i = 0; i < request.EnumerationMax; i++) + { + if (grainCancellationToken.CancellationToken.IsCancellationRequested) + { + Logger.LogInformation("Task cancelled on iteration {Iteration}", i); + + if (request.ThrowOnCancel) + throw new OperationCanceledException(grainCancellationToken.CancellationToken); + + return new SampleCancellationResult() { EndingValue = startingValue }; + } + + startingValue += 1; + await Task.Delay(request.EnumerationDelay); + } + + return new SampleCancellationResult() { EndingValue = startingValue }; + } +} + +[GenerateSerializer] +public class SampleCancellationRequest +{ + [Id(0)] + public TimeSpan EnumerationDelay { get; init; } + [Id(1)] + public int StartingValue { get; init; } + [Id(2)] + public int EnumerationMax { get; init; } = 1_000; + [Id(3)] + public bool ThrowOnCancel { get; init; } +} + +[GenerateSerializer] +public class SampleCancellationResult +{ + [Id(0)] + public int EndingValue { get; init; } +} diff --git a/samples/Orleans.SyncWork.Demo.Services/Grains/ICancellableGrain.cs b/samples/Orleans.SyncWork.Demo.Services/Grains/ICancellableGrain.cs new file mode 100644 index 0000000..3d48258 --- /dev/null +++ b/samples/Orleans.SyncWork.Demo.Services/Grains/ICancellableGrain.cs @@ -0,0 +1,4 @@ +namespace Orleans.SyncWork.Demo.Services.Grains; + +public interface ICancellableGrain + : ISyncWorker, IGrainWithGuidKey; diff --git a/samples/Orleans.SyncWork.Demo.Services/Grains/PasswordVerifierGrain.cs b/samples/Orleans.SyncWork.Demo.Services/Grains/PasswordVerifierGrain.cs index b386f27..f304b0c 100644 --- a/samples/Orleans.SyncWork.Demo.Services/Grains/PasswordVerifierGrain.cs +++ b/samples/Orleans.SyncWork.Demo.Services/Grains/PasswordVerifierGrain.cs @@ -15,7 +15,8 @@ public PasswordVerifierGrain( _passwordVerifier = passwordVerifier; } - protected override async Task PerformWork(PasswordVerifierRequest request) + protected override async Task PerformWork( + PasswordVerifierRequest request, GrainCancellationToken grainCancellationToken) { var verifyResult = await _passwordVerifier.VerifyPassword(request.PasswordHash, request.Password); diff --git a/samples/Orleans.SyncWork.Demo.Services/TestGrains/GrainThatWaitsSetTimePriorToExceptionResultBecomingAvailable.cs b/samples/Orleans.SyncWork.Demo.Services/TestGrains/GrainThatWaitsSetTimePriorToExceptionResultBecomingAvailable.cs index 177d1bc..50c4c32 100644 --- a/samples/Orleans.SyncWork.Demo.Services/TestGrains/GrainThatWaitsSetTimePriorToExceptionResultBecomingAvailable.cs +++ b/samples/Orleans.SyncWork.Demo.Services/TestGrains/GrainThatWaitsSetTimePriorToExceptionResultBecomingAvailable.cs @@ -4,14 +4,19 @@ namespace Orleans.SyncWork.Demo.Services.TestGrains; -public class GrainThatWaitsSetTimePriorToExceptionResultBecomingAvailable : SyncWorker, IGrainThatWaitsSetTimePriorToExceptionResultBecomingAvailable +public class GrainThatWaitsSetTimePriorToExceptionResultBecomingAvailable : + SyncWorker, + IGrainThatWaitsSetTimePriorToExceptionResultBecomingAvailable { public GrainThatWaitsSetTimePriorToExceptionResultBecomingAvailable( ILogger logger, LimitedConcurrencyLevelTaskScheduler limitedConcurrencyScheduler - ) : base(logger, limitedConcurrencyScheduler) { } + ) : base(logger, limitedConcurrencyScheduler) + { + } - protected override async Task PerformWork(TestDelayExceptionRequest request) + protected override async Task PerformWork(TestDelayExceptionRequest request, + GrainCancellationToken grainCancellationToken) { Logger.LogInformation($"Waiting {request.MsDelayPriorToResult} on {this.IdentityString}"); await Task.Delay(request.MsDelayPriorToResult); @@ -35,4 +40,3 @@ public class TestDelayExceptionRequest [GenerateSerializer] public class TestDelayExceptionResult; - diff --git a/samples/Orleans.SyncWork.Demo.Services/TestGrains/GrainThatWaitsSetTimePriorToSuccessResultBecomingAvailable.cs b/samples/Orleans.SyncWork.Demo.Services/TestGrains/GrainThatWaitsSetTimePriorToSuccessResultBecomingAvailable.cs index c0e2edc..9244a96 100644 --- a/samples/Orleans.SyncWork.Demo.Services/TestGrains/GrainThatWaitsSetTimePriorToSuccessResultBecomingAvailable.cs +++ b/samples/Orleans.SyncWork.Demo.Services/TestGrains/GrainThatWaitsSetTimePriorToSuccessResultBecomingAvailable.cs @@ -4,22 +4,23 @@ namespace Orleans.SyncWork.Demo.Services.TestGrains; -public class GrainThatWaitsSetTimePriorToSuccessResultBecomingAvailable : SyncWorker, IGrainThatWaitsSetTimePriorToSuccessResultBecomingAvailable +public class GrainThatWaitsSetTimePriorToSuccessResultBecomingAvailable : + SyncWorker, + IGrainThatWaitsSetTimePriorToSuccessResultBecomingAvailable { public GrainThatWaitsSetTimePriorToSuccessResultBecomingAvailable( ILogger logger, LimitedConcurrencyLevelTaskScheduler limitedConcurrencyScheduler - ) : base(logger, limitedConcurrencyScheduler) { } + ) : base(logger, limitedConcurrencyScheduler) + { + } - protected override async Task PerformWork(TestDelaySuccessRequest request) + protected override async Task PerformWork(TestDelaySuccessRequest request, + GrainCancellationToken grainCancellationToken) { await Task.Delay(request.MsDelayPriorToResult); - return new TestDelaySuccessResult() - { - Started = request.Started, - Ended = DateTime.UtcNow - }; + return new TestDelaySuccessResult() { Started = request.Started, Ended = DateTime.UtcNow }; } } diff --git a/src/Orleans.SyncWork/Enums/SyncWorkStatus.cs b/src/Orleans.SyncWork/Enums/SyncWorkStatus.cs index c994063..bb79584 100644 --- a/src/Orleans.SyncWork/Enums/SyncWorkStatus.cs +++ b/src/Orleans.SyncWork/Enums/SyncWorkStatus.cs @@ -20,5 +20,5 @@ public enum SyncWorkStatus /// /// The work has been completed, though an exception was thrown. /// - Faulted + Faulted, } diff --git a/src/Orleans.SyncWork/ISyncWorker.cs b/src/Orleans.SyncWork/ISyncWorker.cs index 6a011f4..25c1149 100644 --- a/src/Orleans.SyncWork/ISyncWorker.cs +++ b/src/Orleans.SyncWork/ISyncWorker.cs @@ -18,6 +18,20 @@ public interface ISyncWorker : IGrain /// true if work is started, false if it was already started. Task Start(TRequest request); /// + /// + /// Start long running work with the provided parameter. + /// + /// + /// Supports cancellation, but any cancellation logic is up to the grain implementation. It could + /// conceivably return a result at the point of cancellation, or throw, depending on what makes sense for + /// the particular grain. + /// + /// + /// The parameter containing all necessary information to start the workload. + /// The token for cancelling tasks. + /// true if work is started, false if it was already started. + Task Start(TRequest request, GrainCancellationToken grainCancellationToken); + /// /// Gets the long running work status. /// /// The status of the long running work. diff --git a/src/Orleans.SyncWork/SyncWorker.cs b/src/Orleans.SyncWork/SyncWorker.cs index d37322e..72ef349 100644 --- a/src/Orleans.SyncWork/SyncWorker.cs +++ b/src/Orleans.SyncWork/SyncWorker.cs @@ -1,5 +1,4 @@ using System; -using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Orleans.SyncWork.Enums; @@ -40,7 +39,15 @@ protected SyncWorker(ILogger logger, LimitedConcurrencyLevelTaskScheduler limite } /// - public Task Start(TRequest request) + public async Task Start(TRequest request) + { + var token = new GrainCancellationTokenSource().Token; + + return await Start(request, token); + } + + /// + public Task Start(TRequest request, GrainCancellationToken grainCancellationToken) { if (_task != null) { @@ -50,7 +57,7 @@ public Task Start(TRequest request) Logger.LogDebug("{Method}: Starting task, set status to running.", nameof(Start)); _status = SyncWorkStatus.Running; - _task = CreateTask(request); + _task = CreateTask(request, grainCancellationToken); return Task.FromResult(true); } @@ -94,22 +101,24 @@ public Task Start(TRequest request) /// The method that actually performs the long running work. /// /// The request/parameters used for the execution of the method. - /// - protected abstract Task PerformWork(TRequest request); + /// The cancellation token. + /// A result once available. + protected abstract Task PerformWork(TRequest request, GrainCancellationToken grainCancellationToken); /// /// The task creation that fires off the long running work to the . /// /// The request to use for the invoke of the long running work. + /// The cancellation token. /// a representing the fact that the work has been dispatched. - private Task CreateTask(TRequest request) + private Task CreateTask(TRequest request, GrainCancellationToken grainCancellationToken) { return Task.Factory.StartNew(async () => { try { Logger.LogInformation("{Method}: Beginning work for task.", nameof(CreateTask)); - _result = await PerformWork(request); + _result = await PerformWork(request, grainCancellationToken); _exception = default; _status = SyncWorkStatus.Completed; Logger.LogInformation("{Method}: Completed work for task.", nameof(CreateTask)); @@ -121,6 +130,6 @@ private Task CreateTask(TRequest request) _exception = e; _status = SyncWorkStatus.Faulted; } - }, CancellationToken.None, TaskCreationOptions.LongRunning, _limitedConcurrencyScheduler); + }, grainCancellationToken.CancellationToken, TaskCreationOptions.LongRunning, _limitedConcurrencyScheduler); } } diff --git a/src/Orleans.SyncWork/SyncWorkerExtensions.cs b/src/Orleans.SyncWork/SyncWorkerExtensions.cs index 7ad754b..14954ad 100644 --- a/src/Orleans.SyncWork/SyncWorkerExtensions.cs +++ b/src/Orleans.SyncWork/SyncWorkerExtensions.cs @@ -11,9 +11,13 @@ namespace Orleans.SyncWork; public static class SyncWorkerExtensions { /// - /// Starts the work of a , polls it until a result is available, then returns it. - /// + /// + /// Starts the work of a , polls it until a result is available, + /// then returns it. + /// + /// /// Polls the every 1000ms for a result, until one is available. + /// /// /// The type of request being dispatched. /// The type of result expected from the work. @@ -23,7 +27,29 @@ public static class SyncWorkerExtensions /// Thrown when the is in a state. public static Task StartWorkAndPollUntilResult(this ISyncWorker worker, TRequest request) { - return worker.StartWorkAndPollUntilResult(request, 1000); + var grainCancellationToken = new GrainCancellationTokenSource().Token; + return worker.StartWorkAndPollUntilResult(request, 1000, grainCancellationToken); + } + + /// + /// + /// Starts the work of a , polls it until a result is available, + /// then returns it. + /// + /// + /// Polls the every 1000ms for a result, until one is available. + /// + /// + /// The type of request being dispatched. + /// The type of result expected from the work. + /// The doing the work. + /// The request to be dispatched. + /// The token for cancelling tasks. + /// The result of the . + /// Thrown when the is in a state. + public static Task StartWorkAndPollUntilResult(this ISyncWorker worker, TRequest request, GrainCancellationToken grainCancellationToken) + { + return worker.StartWorkAndPollUntilResult(request, 1000, grainCancellationToken); } /// @@ -43,6 +69,29 @@ public static Task StartWorkAndPollUntilResult(this /// The result of the . /// Thrown when the is in a state. public static async Task StartWorkAndPollUntilResult(this ISyncWorker worker, TRequest request, int msDelayPerStatusPoll) + { + var grainCancellationToken = new GrainCancellationTokenSource().Token; + return await StartWorkAndPollUntilResult(worker, request, msDelayPerStatusPoll, grainCancellationToken); + } + + /// + /// Starts the work of a , polls it until a result is available, then returns it. + /// + /// + /// Caution is advised when setting the msDelayPerStatusPoll "too low" - 1000 ms seems to be pretty safe, + /// but if the cluster is under *enough* load, that much grain polling could overwhelm it. + /// + /// The type of request being dispatched. + /// The type of result expected from the work. + /// The doing the work. + /// The request to be dispatched. + /// + /// The ms delay per attempt to poll for a or status. + /// + /// The token for cancelling tasks. + /// The result of the . + /// Thrown when the is in a state. + public static async Task StartWorkAndPollUntilResult(this ISyncWorker worker, TRequest request, int msDelayPerStatusPoll, GrainCancellationToken grainCancellationToken) { await worker.Start(request); await Task.Delay(100); @@ -64,7 +113,7 @@ public static async Task StartWorkAndPollUntilResult case SyncWorkStatus.NotStarted: throw new InvalidStateException("This shouldn't happen, but if it does, it probably means the cluster may have died and restarted, and/or a timeout occurred and the grain got reinstantiated without firing off the work."); default: - throw new Exception("How did we even get here...?"); + throw new InvalidStateException("How did we even get here...?"); } } } diff --git a/test/Orleans.SyncWork.Tests/SyncWorkerTests.cs b/test/Orleans.SyncWork.Tests/SyncWorkerTests.cs index 5c8ed3c..e75e577 100644 --- a/test/Orleans.SyncWork.Tests/SyncWorkerTests.cs +++ b/test/Orleans.SyncWork.Tests/SyncWorkerTests.cs @@ -6,6 +6,7 @@ using Orleans.SyncWork.Demo.Services; using Orleans.SyncWork.Demo.Services.Grains; using Orleans.SyncWork.Demo.Services.TestGrains; +using Orleans.SyncWork.Enums; using Orleans.SyncWork.Exceptions; using Orleans.SyncWork.Tests.TestClusters; using Orleans.SyncWork.Tests.XUnitTraits; @@ -18,6 +19,8 @@ namespace Orleans.SyncWork.Tests; /// public class SyncWorkerTests : ClusterTestBase { + private readonly GrainCancellationTokenSource _cancellationTokenSource = new(); + public SyncWorkerTests(ClusterFixture fixture) : base(fixture) { } [Theory, Trait(Traits.Category, Traits.Categories.LongRunning)] @@ -89,7 +92,7 @@ await grain.Start(new TestDelaySuccessRequest() { Started = DateTime.UtcNow, MsDelayPriorToResult = delay - }); + }, _cancellationTokenSource.Token); var status = await grain.GetWorkStatus(); @@ -111,7 +114,7 @@ await grain.Start(new TestDelaySuccessRequest() { Started = DateTime.UtcNow, MsDelayPriorToResult = delay - }); + }, _cancellationTokenSource.Token); var action = new Func(async () => await grain.GetResult()); @@ -126,7 +129,7 @@ public async Task WhenGrainExceptionStartedButWorkNotCompleted_ShouldReturnStatu await grain.Start(new TestDelayExceptionRequest() { MsDelayPriorToResult = delay - }); + }, _cancellationTokenSource.Token); var status = await grain.GetWorkStatus(); @@ -147,7 +150,7 @@ public async Task WhenExceptionGrainStartedButWorkNotCompleted_ShouldThrowWhenAt await grain.Start(new TestDelayExceptionRequest() { MsDelayPriorToResult = delay - }); + }, _cancellationTokenSource.Token); var action = new Func(async () => await grain.GetException()); @@ -233,4 +236,86 @@ public async Task WhenExceptionGrainWorkCompleted_ShouldThrowForGetResult() await action.Should().ThrowAsync(); } + + [Fact] + public async Task WhenGrainHasCancellationSupport_ShouldRunThroughFullGrainLogicIfNoCancellation() + { + var request = new SampleCancellationRequest + { + StartingValue = 1, + EnumerationDelay = TimeSpan.FromMilliseconds(10), + EnumerationMax = 1_000 + }; + + var grain = Cluster.GrainFactory.GetGrain(Guid.NewGuid()); + + var result = await grain.StartWorkAndPollUntilResult(request); + + result.EndingValue.Should().BeGreaterOrEqualTo(1000); + } + + [Fact] + public async Task WhenGrainHasCancellationSupport_CanReturnResultAtCancellation() + { + var request = new SampleCancellationRequest + { + StartingValue = 1, + EnumerationDelay = TimeSpan.FromMilliseconds(10), + EnumerationMax = 1_000, + ThrowOnCancel = false, + }; + + var grain = Cluster.GrainFactory.GetGrain(Guid.NewGuid()); + + var cancellationToken = _cancellationTokenSource.Token; + + _ = await grain.Start(request, cancellationToken); + await Task.Delay(TimeSpan.FromSeconds(1)); + await _cancellationTokenSource.Cancel(); + + // Since sending the cancellation is not instantaneous, wait until the work status has changed from running + var status = await grain.GetWorkStatus(); + while (status == Enums.SyncWorkStatus.Running) + { + await Task.Delay(100); + status = await grain.GetWorkStatus(); + } + + var result = await grain.GetResult(); + + result!.EndingValue.Should().BeGreaterThan(1); + result!.EndingValue.Should().BeLessThan(1_000); + } + + [Fact] + public async Task WhenGrainHasCancellationSupport_CanThrow() + { + var request = new SampleCancellationRequest + { + StartingValue = 1, + EnumerationDelay = TimeSpan.FromMilliseconds(10), + EnumerationMax = 1_000, + ThrowOnCancel = true, + }; + + var grain = Cluster.GrainFactory.GetGrain(Guid.NewGuid()); + + var cancellationToken = _cancellationTokenSource.Token; + + _ = await grain.Start(request, cancellationToken); + await Task.Delay(TimeSpan.FromSeconds(1)); + await _cancellationTokenSource.Cancel(); + + // Since sending the cancellation is not instantaneous, wait until the work status has changed from running + var status = await grain.GetWorkStatus(); + while (status == Enums.SyncWorkStatus.Running) + { + await Task.Delay(100); + status = await grain.GetWorkStatus(); + } + + var result = await grain.GetException(); + + result.Should().BeOfType(); + } }