Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds cancellation token support for long running work grains. #53

Merged
merged 5 commits into from
Feb 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ This package introduces a few "requirements" against Orleans:
Create an interface for the grain, which implements `ISyncWorker<TRequest, TResult>`, as well as one of the `IGrainWith...Key` interfaces. Then create a new class that extends the `SyncWorker<TRequest, TResult>` abstract class, and implements the new interface that was introduced:

```cs
public interface IPasswordVerifierGrain : ISyncWorker<PasswordVerifierRequest, PasswordVerifierResult>, IGrainWithGuidKey
public interface IPasswordVerifierGrain
: ISyncWorker<PasswordVerifierRequest, PasswordVerifierResult>, IGrainWithGuidKey;

public class PasswordVerifierGrain : SyncWorker<PasswordVerifierRequest, PasswordVerifierResult>, IPasswordVerifierGrain
{
Expand All @@ -79,7 +80,8 @@ public class PasswordVerifierGrain : SyncWorker<PasswordVerifierRequest, Passwor
_passwordVerifier = passwordVerifier;
}

protected override async Task<PasswordVerifierResult> PerformWork(PasswordVerifierRequest request)
protected override async Task<PasswordVerifierResult> PerformWork(
PasswordVerifierRequest request, GrainCancellationToken grainCancellationToken)
{
var verifyResult = await _passwordVerifier.VerifyPassword(request.PasswordHash, request.Password);

Expand All @@ -89,6 +91,7 @@ public class PasswordVerifierGrain : SyncWorker<PasswordVerifierRequest, Passwor
};
}
}

public class PasswordVerifierRequest
{
public string Password { get; set; }
Expand Down
58 changes: 58 additions & 0 deletions samples/Orleans.SyncWork.Demo.Services/Grains/CancellableGrain.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace Orleans.SyncWork.Demo.Services.Grains;

public class CancellableGrain : SyncWorker<SampleCancellationRequest, SampleCancellationResult>, ICancellableGrain
{
public CancellableGrain(
ILogger<CancellableGrain> logger,
LimitedConcurrencyLevelTaskScheduler limitedConcurrencyScheduler) : base(logger, limitedConcurrencyScheduler)
{
}

protected override async Task<SampleCancellationResult> PerformWork(
SampleCancellationRequest request, GrainCancellationToken grainCancellationToken)
{
var startingValue = request.StartingValue;

for (var i = 0; i < request.EnumerationMax; i++)
{
if (grainCancellationToken.CancellationToken.IsCancellationRequested)
{
Logger.LogInformation("Task cancelled on iteration {Iteration}", i);

if (request.ThrowOnCancel)
throw new OperationCanceledException(grainCancellationToken.CancellationToken);

return new SampleCancellationResult() { EndingValue = startingValue };
}

startingValue += 1;
await Task.Delay(request.EnumerationDelay);
}

return new SampleCancellationResult() { EndingValue = startingValue };
}
}

[GenerateSerializer]
public class SampleCancellationRequest
{
[Id(0)]
public TimeSpan EnumerationDelay { get; init; }
[Id(1)]
public int StartingValue { get; init; }
[Id(2)]
public int EnumerationMax { get; init; } = 1_000;
[Id(3)]
public bool ThrowOnCancel { get; init; }
}

[GenerateSerializer]
public class SampleCancellationResult
{
[Id(0)]
public int EndingValue { get; init; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
namespace Orleans.SyncWork.Demo.Services.Grains;

public interface ICancellableGrain
: ISyncWorker<SampleCancellationRequest, SampleCancellationResult>, IGrainWithGuidKey;
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ public PasswordVerifierGrain(
_passwordVerifier = passwordVerifier;
}

protected override async Task<PasswordVerifierResult> PerformWork(PasswordVerifierRequest request)
protected override async Task<PasswordVerifierResult> PerformWork(
PasswordVerifierRequest request, GrainCancellationToken grainCancellationToken)
{
var verifyResult = await _passwordVerifier.VerifyPassword(request.PasswordHash, request.Password);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@

namespace Orleans.SyncWork.Demo.Services.TestGrains;

public class GrainThatWaitsSetTimePriorToExceptionResultBecomingAvailable : SyncWorker<TestDelayExceptionRequest, TestDelayExceptionResult>, IGrainThatWaitsSetTimePriorToExceptionResultBecomingAvailable
public class GrainThatWaitsSetTimePriorToExceptionResultBecomingAvailable :
SyncWorker<TestDelayExceptionRequest, TestDelayExceptionResult>,
IGrainThatWaitsSetTimePriorToExceptionResultBecomingAvailable
{
public GrainThatWaitsSetTimePriorToExceptionResultBecomingAvailable(
ILogger<GrainThatWaitsSetTimePriorToExceptionResultBecomingAvailable> logger,
LimitedConcurrencyLevelTaskScheduler limitedConcurrencyScheduler
) : base(logger, limitedConcurrencyScheduler) { }
) : base(logger, limitedConcurrencyScheduler)
{
}

protected override async Task<TestDelayExceptionResult> PerformWork(TestDelayExceptionRequest request)
protected override async Task<TestDelayExceptionResult> PerformWork(TestDelayExceptionRequest request,
GrainCancellationToken grainCancellationToken)
{
Logger.LogInformation($"Waiting {request.MsDelayPriorToResult} on {this.IdentityString}");
await Task.Delay(request.MsDelayPriorToResult);
Expand All @@ -35,4 +40,3 @@ public class TestDelayExceptionRequest

[GenerateSerializer]
public class TestDelayExceptionResult;

Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,23 @@

namespace Orleans.SyncWork.Demo.Services.TestGrains;

public class GrainThatWaitsSetTimePriorToSuccessResultBecomingAvailable : SyncWorker<TestDelaySuccessRequest, TestDelaySuccessResult>, IGrainThatWaitsSetTimePriorToSuccessResultBecomingAvailable
public class GrainThatWaitsSetTimePriorToSuccessResultBecomingAvailable :
SyncWorker<TestDelaySuccessRequest, TestDelaySuccessResult>,
IGrainThatWaitsSetTimePriorToSuccessResultBecomingAvailable
{
public GrainThatWaitsSetTimePriorToSuccessResultBecomingAvailable(
ILogger<GrainThatWaitsSetTimePriorToSuccessResultBecomingAvailable> logger,
LimitedConcurrencyLevelTaskScheduler limitedConcurrencyScheduler
) : base(logger, limitedConcurrencyScheduler) { }
) : base(logger, limitedConcurrencyScheduler)
{
}

protected override async Task<TestDelaySuccessResult> PerformWork(TestDelaySuccessRequest request)
protected override async Task<TestDelaySuccessResult> PerformWork(TestDelaySuccessRequest request,
GrainCancellationToken grainCancellationToken)
{
await Task.Delay(request.MsDelayPriorToResult);

return new TestDelaySuccessResult()
{
Started = request.Started,
Ended = DateTime.UtcNow
};
return new TestDelaySuccessResult() { Started = request.Started, Ended = DateTime.UtcNow };
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.SyncWork/Enums/SyncWorkStatus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ public enum SyncWorkStatus
/// <summary>
/// The work has been completed, though an exception was thrown.
/// </summary>
Faulted
Faulted,
}
14 changes: 14 additions & 0 deletions src/Orleans.SyncWork/ISyncWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,20 @@ public interface ISyncWorker<in TRequest, TResult> : IGrain
/// <returns>true if work is started, false if it was already started.</returns>
Task<bool> Start(TRequest request);
/// <summary>
/// <para>
/// Start long running work with the provided parameter.
/// </para>
/// <para>
/// Supports cancellation, but any cancellation logic is up to the grain implementation. It could
/// conceivably return a result at the point of cancellation, or throw, depending on what makes sense for
/// the particular grain.
/// </para>
/// </summary>
/// <param name="request">The parameter containing all necessary information to start the workload.</param>
/// <param name="grainCancellationToken">The token for cancelling tasks.</param>
/// <returns>true if work is started, false if it was already started.</returns>
Task<bool> Start(TRequest request, GrainCancellationToken grainCancellationToken);
/// <summary>
/// Gets the long running work status.
/// </summary>
/// <returns>The status of the long running work.</returns>
Expand Down
25 changes: 17 additions & 8 deletions src/Orleans.SyncWork/SyncWorker.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Orleans.SyncWork.Enums;
Expand Down Expand Up @@ -40,7 +39,15 @@ protected SyncWorker(ILogger logger, LimitedConcurrencyLevelTaskScheduler limite
}

/// <inheritdoc />
public Task<bool> Start(TRequest request)
public async Task<bool> Start(TRequest request)
{
var token = new GrainCancellationTokenSource().Token;

return await Start(request, token);
}

/// <inheritdoc />
public Task<bool> Start(TRequest request, GrainCancellationToken grainCancellationToken)
{
if (_task != null)
{
Expand All @@ -50,7 +57,7 @@ public Task<bool> Start(TRequest request)

Logger.LogDebug("{Method}: Starting task, set status to running.", nameof(Start));
_status = SyncWorkStatus.Running;
_task = CreateTask(request);
_task = CreateTask(request, grainCancellationToken);

return Task.FromResult(true);
}
Expand Down Expand Up @@ -94,22 +101,24 @@ public Task<bool> Start(TRequest request)
/// The method that actually performs the long running work.
/// </summary>
/// <param name="request">The request/parameters used for the execution of the method.</param>
/// <returns></returns>
protected abstract Task<TResult> PerformWork(TRequest request);
/// <param name="grainCancellationToken">The cancellation token.</param>
/// <returns>A result once available.</returns>
protected abstract Task<TResult> PerformWork(TRequest request, GrainCancellationToken grainCancellationToken);

/// <summary>
/// The task creation that fires off the long running work to the <see cref="LimitedConcurrencyLevelTaskScheduler"/>.
/// </summary>
/// <param name="request">The request to use for the invoke of the long running work.</param>
/// <param name="grainCancellationToken">The cancellation token.</param>
/// <returns>a <see cref="Task"/> representing the fact that the work has been dispatched.</returns>
private Task CreateTask(TRequest request)
private Task CreateTask(TRequest request, GrainCancellationToken grainCancellationToken)
{
return Task.Factory.StartNew(async () =>
{
try
{
Logger.LogInformation("{Method}: Beginning work for task.", nameof(CreateTask));
_result = await PerformWork(request);
_result = await PerformWork(request, grainCancellationToken);
_exception = default;
_status = SyncWorkStatus.Completed;
Logger.LogInformation("{Method}: Completed work for task.", nameof(CreateTask));
Expand All @@ -121,6 +130,6 @@ private Task CreateTask(TRequest request)
_exception = e;
_status = SyncWorkStatus.Faulted;
}
}, CancellationToken.None, TaskCreationOptions.LongRunning, _limitedConcurrencyScheduler);
}, grainCancellationToken.CancellationToken, TaskCreationOptions.LongRunning, _limitedConcurrencyScheduler);
}
}
57 changes: 53 additions & 4 deletions src/Orleans.SyncWork/SyncWorkerExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@ namespace Orleans.SyncWork;
public static class SyncWorkerExtensions
{
/// <summary>
/// Starts the work of a <see cref="ISyncWorker{TRequest, TResult}"/>, polls it until a result is available, then returns it.
///
/// <para>
/// Starts the work of a <see cref="ISyncWorker{TRequest, TResult}"/>, polls it until a result is available,
/// then returns it.
/// </para>
/// <para>
/// Polls the <see cref="ISyncWorker{TRequest, TResult}"/> every 1000ms for a result, until one is available.
/// </para>
/// </summary>
/// <typeparam name="TRequest">The type of request being dispatched.</typeparam>
/// <typeparam name="TResult">The type of result expected from the work.</typeparam>
Expand All @@ -23,7 +27,29 @@ public static class SyncWorkerExtensions
/// <exception cref="Exception">Thrown when the <see cref="ISyncWorker{TRequest, TResult}"/> is in a <see cref="SyncWorkStatus.Faulted"/> state.</exception>
public static Task<TResult> StartWorkAndPollUntilResult<TRequest, TResult>(this ISyncWorker<TRequest, TResult> worker, TRequest request)
{
return worker.StartWorkAndPollUntilResult(request, 1000);
var grainCancellationToken = new GrainCancellationTokenSource().Token;
return worker.StartWorkAndPollUntilResult(request, 1000, grainCancellationToken);
}

/// <summary>
/// <para>
/// Starts the work of a <see cref="ISyncWorker{TRequest, TResult}"/>, polls it until a result is available,
/// then returns it.
/// </para>
/// <para>
/// Polls the <see cref="ISyncWorker{TRequest, TResult}"/> every 1000ms for a result, until one is available.
/// </para>
/// </summary>
/// <typeparam name="TRequest">The type of request being dispatched.</typeparam>
/// <typeparam name="TResult">The type of result expected from the work.</typeparam>
/// <param name="worker">The <see cref="ISyncWorker{TRequest, TResult}"/> doing the work.</param>
/// <param name="request">The request to be dispatched.</param>
/// <param name="grainCancellationToken">The token for cancelling tasks.</param>
/// <returns>The result of the <see cref="ISyncWorker{TRequest, TResult}"/>.</returns>
/// <exception cref="Exception">Thrown when the <see cref="ISyncWorker{TRequest, TResult}"/> is in a <see cref="SyncWorkStatus.Faulted"/> state.</exception>
public static Task<TResult> StartWorkAndPollUntilResult<TRequest, TResult>(this ISyncWorker<TRequest, TResult> worker, TRequest request, GrainCancellationToken grainCancellationToken)
{
return worker.StartWorkAndPollUntilResult(request, 1000, grainCancellationToken);
}

/// <summary>
Expand All @@ -43,6 +69,29 @@ public static Task<TResult> StartWorkAndPollUntilResult<TRequest, TResult>(this
/// <returns>The result of the <see cref="ISyncWorker{TRequest, TResult}"/>.</returns>
/// <exception cref="Exception">Thrown when the <see cref="ISyncWorker{TRequest, TResult}"/> is in a <see cref="SyncWorkStatus.Faulted"/> state.</exception>
public static async Task<TResult> StartWorkAndPollUntilResult<TRequest, TResult>(this ISyncWorker<TRequest, TResult> worker, TRequest request, int msDelayPerStatusPoll)
{
var grainCancellationToken = new GrainCancellationTokenSource().Token;
return await StartWorkAndPollUntilResult(worker, request, msDelayPerStatusPoll, grainCancellationToken);
}

/// <summary>
/// Starts the work of a <see cref="ISyncWorker{TRequest, TResult}"/>, polls it until a result is available, then returns it.
/// </summary>
/// <remarks>
/// Caution is advised when setting the msDelayPerStatusPoll "too low" - 1000 ms seems to be pretty safe,
/// but if the cluster is under *enough* load, that much grain polling could overwhelm it.
/// </remarks>
/// <typeparam name="TRequest">The type of request being dispatched.</typeparam>
/// <typeparam name="TResult">The type of result expected from the work.</typeparam>
/// <param name="worker">The <see cref="ISyncWorker{TRequest, TResult}"/> doing the work.</param>
/// <param name="request">The request to be dispatched.</param>
/// <param name="msDelayPerStatusPoll">
/// The ms delay per attempt to poll for a <see cref="SyncWorkStatus.Completed"/> or <see cref="SyncWorkStatus.Faulted"/> status.
/// </param>
/// <param name="grainCancellationToken">The token for cancelling tasks.</param>
/// <returns>The result of the <see cref="ISyncWorker{TRequest, TResult}"/>.</returns>
/// <exception cref="Exception">Thrown when the <see cref="ISyncWorker{TRequest, TResult}"/> is in a <see cref="SyncWorkStatus.Faulted"/> state.</exception>
public static async Task<TResult> StartWorkAndPollUntilResult<TRequest, TResult>(this ISyncWorker<TRequest, TResult> worker, TRequest request, int msDelayPerStatusPoll, GrainCancellationToken grainCancellationToken)
{
await worker.Start(request);
await Task.Delay(100);
Expand All @@ -64,7 +113,7 @@ public static async Task<TResult> StartWorkAndPollUntilResult<TRequest, TResult>
case SyncWorkStatus.NotStarted:
throw new InvalidStateException("This shouldn't happen, but if it does, it probably means the cluster may have died and restarted, and/or a timeout occurred and the grain got reinstantiated without firing off the work.");
default:
throw new Exception("How did we even get here...?");
throw new InvalidStateException("How did we even get here...?");
}
}
}
Expand Down
Loading
Loading