diff --git a/protoc-gen-restate-java-blocking/src/main/resources/blockingStub.mustache b/protoc-gen-restate-java-blocking/src/main/resources/blockingStub.mustache index 55a17e6d..f4b26eeb 100644 --- a/protoc-gen-restate-java-blocking/src/main/resources/blockingStub.mustache +++ b/protoc-gen-restate-java-blocking/src/main/resources/blockingStub.mustache @@ -100,8 +100,8 @@ public class {{className}} { @java.lang.Deprecated {{/deprecated}} {{{javadoc}}} - public {{#isOutputEmpty}}void{{/isOutputEmpty}}{{^isOutputEmpty}}{{outputType}}{{/isOutputEmpty}} {{methodName}}(RestateContext context{{^isInputEmpty}}, {{inputType}} request{{/isInputEmpty}}) { - throw new io.grpc.StatusRuntimeException(io.grpc.Status.UNIMPLEMENTED); + public {{#isOutputEmpty}}void{{/isOutputEmpty}}{{^isOutputEmpty}}{{outputType}}{{/isOutputEmpty}} {{methodName}}(RestateContext context{{^isInputEmpty}}, {{inputType}} request{{/isInputEmpty}}) throws dev.restate.sdk.core.TerminalException { + throw new dev.restate.sdk.core.TerminalException(dev.restate.sdk.core.TerminalException.Code.UNIMPLEMENTED); } {{/methods}} diff --git a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/Entries.java b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/Entries.java index 42513b5c..659ddc0c 100644 --- a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/Entries.java +++ b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/Entries.java @@ -292,7 +292,7 @@ public ReadyResultInternal parseEntryResult(InvokeEntryMessage actual) { if (actual.hasValue()) { return valueParser.apply(actual.getValue()); } - return ReadyResults.failure(Util.toGrpcStatus(actual.getFailure()).asRuntimeException()); + return ReadyResults.failure(Util.toRestateException(actual.getFailure())); } @Override @@ -301,7 +301,7 @@ public ReadyResultInternal parseCompletionResult(CompletionMessage actual) { return valueParser.apply(actual.getValue()); } if (actual.hasFailure()) { - return ReadyResults.failure(Util.toGrpcStatus(actual.getFailure()).asRuntimeException()); + return ReadyResults.failure(Util.toRestateException(actual.getFailure())); } return super.parseCompletionResult(actual); } @@ -352,7 +352,7 @@ public ReadyResultInternal parseEntryResult(AwakeableEntryMessage ac if (actual.hasValue()) { return ReadyResults.success(actual.getValue()); } - return ReadyResults.failure(Util.toGrpcStatus(actual.getFailure()).asRuntimeException()); + return ReadyResults.failure(Util.toRestateException(actual.getFailure())); } @Override @@ -361,7 +361,7 @@ public ReadyResultInternal parseCompletionResult(CompletionMessage a return ReadyResults.success(actual.getValue()); } if (actual.hasFailure()) { - return ReadyResults.failure(Util.toGrpcStatus(actual.getFailure()).asRuntimeException()); + return ReadyResults.failure(Util.toRestateException(actual.getFailure())); } return super.parseCompletionResult(actual); } diff --git a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/ExecutorSwitchingWrappers.java b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/ExecutorSwitchingWrappers.java index d195fdc8..158e5568 100644 --- a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/ExecutorSwitchingWrappers.java +++ b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/ExecutorSwitchingWrappers.java @@ -2,6 +2,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.MessageLite; +import dev.restate.sdk.core.TerminalException; import dev.restate.sdk.core.syscalls.*; import io.grpc.MethodDescriptor; import java.time.Duration; @@ -98,7 +99,7 @@ public void writeOutput(T value, SyscallCallback c } @Override - public void writeOutput(Throwable throwable, SyscallCallback callback) { + public void writeOutput(TerminalException throwable, SyscallCallback callback) { syscallsExecutor.execute(() -> syscalls.writeOutput(throwable, callback)); } diff --git a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/GrpcServerCallListenerAdaptor.java b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/GrpcServerCallListenerAdaptor.java index 9a932977..8adff937 100644 --- a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/GrpcServerCallListenerAdaptor.java +++ b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/GrpcServerCallListenerAdaptor.java @@ -2,6 +2,7 @@ import io.grpc.Metadata; import io.grpc.ServerCall; +import io.grpc.Status; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -67,7 +68,7 @@ private void closeWithException(Throwable e) { serverCall.close(Util.SUSPENDED_STATUS, new Metadata()); } else { LOG.warn("Error when processing the invocation", e); - serverCall.close(Util.toGrpcStatusWrappingUncaught(e), new Metadata()); + serverCall.close(Status.UNKNOWN.withCause(e), new Metadata()); } } } diff --git a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/InvocationStateMachine.java b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/InvocationStateMachine.java index 9466132e..653d0112 100644 --- a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/InvocationStateMachine.java +++ b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/InvocationStateMachine.java @@ -432,7 +432,7 @@ public void onError(Throwable e) { void completeSideEffectCallbackWithEntry( Java.SideEffectEntryMessage sideEffectEntry, ExitSideEffectSyscallCallback callback) { if (sideEffectEntry.hasFailure()) { - callback.onFailure(Util.toGrpcStatus(sideEffectEntry.getFailure()).asRuntimeException()); + callback.onFailure(Util.toRestateException(sideEffectEntry.getFailure())); } else { callback.onResult(sideEffectEntry.getValue()); } diff --git a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/ReadyResults.java b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/ReadyResults.java index 46d55c24..3730070d 100644 --- a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/ReadyResults.java +++ b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/ReadyResults.java @@ -1,7 +1,7 @@ package dev.restate.sdk.core.impl; +import dev.restate.sdk.core.TerminalException; import dev.restate.sdk.core.syscalls.ReadyResult; -import io.grpc.StatusRuntimeException; import java.util.function.Function; import javax.annotation.Nullable; @@ -18,7 +18,7 @@ static ReadyResultInternal success(T value) { return new Success<>(value); } - static ReadyResultInternal failure(StatusRuntimeException t) { + static ReadyResultInternal failure(TerminalException t) { return new Failure<>(t); } @@ -54,7 +54,7 @@ public ReadyResult map(Function mapper) { @Nullable @Override - public StatusRuntimeException getFailure() { + public TerminalException getFailure() { return null; } } @@ -89,15 +89,15 @@ public ReadyResult map(Function mapper) { @Nullable @Override - public StatusRuntimeException getFailure() { + public TerminalException getFailure() { return null; } } static class Failure implements ReadyResultInternal { - private final StatusRuntimeException cause; + private final TerminalException cause; - private Failure(StatusRuntimeException cause) { + private Failure(TerminalException cause) { this.cause = cause; } @@ -125,7 +125,7 @@ public ReadyResult map(Function mapper) { @Nullable @Override - public StatusRuntimeException getFailure() { + public TerminalException getFailure() { return cause; } } diff --git a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/RestateServerCall.java b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/RestateServerCall.java index cec0545d..99c4b223 100644 --- a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/RestateServerCall.java +++ b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/RestateServerCall.java @@ -1,6 +1,7 @@ package dev.restate.sdk.core.impl; import com.google.protobuf.MessageLite; +import dev.restate.sdk.core.TerminalException; import dev.restate.sdk.core.syscalls.SyscallCallback; import io.grpc.Metadata; import io.grpc.MethodDescriptor; @@ -87,18 +88,22 @@ public void close(Status status, Metadata trailers) { // Let's cancel the listener first listener.onCancel(); - if (status.getCause() instanceof UncaughtException) { - // This is the case where we have uncaught exceptions from GrpcServerCallListenerAdaptor - syscalls.fail(status.getCause().getCause()); - } else { + if (Util.isTerminalException(status.getCause())) { syscalls.writeOutput( - status.asRuntimeException(), + (TerminalException) status.getCause(), SyscallCallback.ofVoid( () -> { - LOG.trace("Closed correctly with non ok status {}", status); + LOG.trace("Closed correctly with non ok exception", status.getCause()); syscalls.close(); }, this::onError)); + } else { + if (status.getCause() != null) { + syscalls.fail(status.getCause()); + } else { + // Just propagate cause + syscalls.fail(status.asRuntimeException()); + } } } } diff --git a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/SyscallsImpl.java b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/SyscallsImpl.java index 70d27270..6d145233 100644 --- a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/SyscallsImpl.java +++ b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/SyscallsImpl.java @@ -9,6 +9,7 @@ import dev.restate.generated.sdk.java.Java; import dev.restate.generated.service.protocol.Protocol; import dev.restate.generated.service.protocol.Protocol.PollInputStreamEntryMessage; +import dev.restate.sdk.core.TerminalException; import dev.restate.sdk.core.impl.DeferredResults.SingleDeferredResultInternal; import dev.restate.sdk.core.impl.Entries.*; import dev.restate.sdk.core.impl.ReadyResults.ReadyResultInternal; @@ -35,7 +36,7 @@ public final class SyscallsImpl implements SyscallsInternal { private final InvocationStateMachine stateMachine; - public SyscallsImpl(InvocationStateMachine stateMachine) { + SyscallsImpl(InvocationStateMachine stateMachine) { this.stateMachine = stateMachine; } @@ -58,7 +59,7 @@ public void writeOutput(T value, SyscallCallback c } @Override - public void writeOutput(Throwable throwable, SyscallCallback callback) { + public void writeOutput(TerminalException throwable, SyscallCallback callback) { LOG.trace("writeOutput failure"); this.writeOutput( Protocol.OutputStreamEntryMessage.newBuilder() @@ -177,14 +178,7 @@ public void exitSideEffectBlockWithException( // If it's a non-terminal exception (such as a protocol exception), // we don't write it but simply throw it if (!isTerminalException(toWrite)) { - // For safety wrt Syscalls API we do this check and wrapping, - // but with the current APIs the exception should always be RuntimeException - // because that's what can be thrown inside a lambda - if (toWrite instanceof RuntimeException) { - throw (RuntimeException) toWrite; - } else { - throw new RuntimeException(toWrite); - } + Util.sneakyThrow(toWrite); } this.stateMachine.exitSideEffectBlock( diff --git a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/UncaughtException.java b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/UncaughtException.java deleted file mode 100644 index 70cfe6b4..00000000 --- a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/UncaughtException.java +++ /dev/null @@ -1,12 +0,0 @@ -package dev.restate.sdk.core.impl; - -/** - * Just a marker exception used to mark an exception as uncaught in {@link - * GrpcServerCallListenerAdaptor}. - */ -class UncaughtException extends RuntimeException { - - UncaughtException(Throwable t) { - super(t); - } -} diff --git a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/Util.java b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/Util.java index b0feab46..cc134178 100644 --- a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/Util.java +++ b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/Util.java @@ -4,9 +4,8 @@ import dev.restate.generated.sdk.java.Java; import dev.restate.generated.service.protocol.Protocol; import dev.restate.sdk.core.SuspendedException; +import dev.restate.sdk.core.TerminalException; import io.grpc.Status; -import io.grpc.StatusException; -import io.grpc.StatusRuntimeException; import java.util.Objects; import java.util.Optional; import java.util.function.Predicate; @@ -16,6 +15,11 @@ private Util() {} static Status SUSPENDED_STATUS = Status.INTERNAL.withCause(SuspendedException.INSTANCE); + @SuppressWarnings("unchecked") + static void sneakyThrow(Throwable e) throws E { + throw (E) e; + } + /** * Finds a throwable fulfilling the condition in the cause chain of the given throwable. If there * is none, then the method returns an empty optional. @@ -44,20 +48,6 @@ static Optional findCause( return Optional.empty(); } - public static Status toGrpcStatusWrappingUncaught(Throwable t) { - Throwable cause = Objects.requireNonNull(t); - while (cause != null) { - if (cause instanceof StatusException) { - return ((StatusException) cause).getStatus(); - } else if (cause instanceof StatusRuntimeException) { - return ((StatusRuntimeException) cause).getStatus(); - } - cause = cause.getCause(); - } - // Couldn't find a cause with a Status - return Status.UNKNOWN.withCause(new UncaughtException(t)); - } - public static Optional findProtocolException(Throwable throwable) { return findCause(throwable, t -> t instanceof ProtocolException); } @@ -66,45 +56,28 @@ public static boolean containsSuspendedException(Throwable throwable) { return findCause(throwable, t -> t == SuspendedException.INSTANCE).isPresent(); } - static Protocol.Failure toProtocolFailure(Status status) { - Protocol.Failure.Builder builder = - Protocol.Failure.newBuilder().setCode(status.getCode().value()); - if (status.getDescription() != null) { - builder.setMessage(status.getDescription()); + static Protocol.Failure toProtocolFailure(TerminalException.Code code, String message) { + Protocol.Failure.Builder builder = Protocol.Failure.newBuilder().setCode(code.value()); + if (message != null) { + builder.setMessage(message); } return builder.build(); } static Protocol.Failure toProtocolFailure(Throwable throwable) { - return toProtocolFailure(toGrpcStatusErasingCause(throwable)); - } - - static Status toGrpcStatus(Protocol.Failure failure) { - return Status.fromCodeValue(failure.getCode()).withDescription(failure.getMessage()); - } - - static Status toGrpcStatusErasingCause(Throwable throwable) { - Status status; - if (throwable instanceof StatusException) { - status = ((StatusException) throwable).getStatus(); - } else if (throwable instanceof StatusRuntimeException) { - status = ((StatusRuntimeException) throwable).getStatus(); - } else { - return Status.UNKNOWN.withDescription(throwable.getMessage()); + if (throwable instanceof TerminalException) { + return toProtocolFailure(((TerminalException) throwable).getCode(), throwable.getMessage()); } + return toProtocolFailure(TerminalException.Code.UNKNOWN, throwable.toString()); + } - // We erase the cause as it's not stored in the call result structure - // and can cause non-determinism. - // - // We can still set the error message though. - if (status.getDescription() == null && status.getCause() != null) { - status = status.withDescription(status.getCause().toString()); - } - return status.withCause(null); + static TerminalException toRestateException(Protocol.Failure failure) { + return new TerminalException( + TerminalException.Code.fromValue(failure.getCode()), failure.getMessage()); } static boolean isTerminalException(Throwable throwable) { - return throwable instanceof StatusRuntimeException || throwable instanceof StatusException; + return throwable instanceof TerminalException; } static void assertIsEntry(MessageLite msg) { diff --git a/sdk-core-impl/src/test/java/dev/restate/sdk/core/impl/ProtoUtils.java b/sdk-core-impl/src/test/java/dev/restate/sdk/core/impl/ProtoUtils.java index 0ae58fd2..a0458feb 100644 --- a/sdk-core-impl/src/test/java/dev/restate/sdk/core/impl/ProtoUtils.java +++ b/sdk-core-impl/src/test/java/dev/restate/sdk/core/impl/ProtoUtils.java @@ -9,10 +9,10 @@ import dev.restate.generated.sdk.java.Java; import dev.restate.generated.service.protocol.Protocol; import dev.restate.generated.service.protocol.Protocol.StartMessage.StateEntry; +import dev.restate.sdk.core.TerminalException; import dev.restate.sdk.core.impl.testservices.GreetingRequest; import dev.restate.sdk.core.impl.testservices.GreetingResponse; import io.grpc.MethodDescriptor; -import io.grpc.Status; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -74,7 +74,7 @@ public static Protocol.CompletionMessage completionMessage( public static Protocol.CompletionMessage completionMessage(int index, Throwable e) { return Protocol.CompletionMessage.newBuilder() .setEntryIndex(index) - .setFailure(toProtocolFailure(Status.INTERNAL.withDescription(e.getMessage()))) + .setFailure(toProtocolFailure(e)) .build(); } @@ -98,16 +98,15 @@ public static Protocol.OutputStreamEntryMessage outputMessage(MessageLiteOrBuild .build(); } - public static Protocol.OutputStreamEntryMessage outputMessage(Status s) { + public static Protocol.OutputStreamEntryMessage outputMessage( + TerminalException.Code code, String message) { return Protocol.OutputStreamEntryMessage.newBuilder() - .setFailure(Util.toProtocolFailure(s.asRuntimeException())) + .setFailure(Util.toProtocolFailure(code, message)) .build(); } public static Protocol.OutputStreamEntryMessage outputMessage(Throwable e) { - return Protocol.OutputStreamEntryMessage.newBuilder() - .setFailure(toProtocolFailure(Status.INTERNAL.withDescription(e.getMessage()))) - .build(); + return Protocol.OutputStreamEntryMessage.newBuilder().setFailure(toProtocolFailure(e)).build(); } public static Protocol.GetStateEntryMessage.Builder getStateMessage(String key) { @@ -156,9 +155,7 @@ Protocol.InvokeEntryMessage invokeMessage( public static Protocol.InvokeEntryMessage invokeMessage( MethodDescriptor methodDescriptor, T parameter, Throwable e) { - return invokeMessage(methodDescriptor, parameter) - .setFailure(toProtocolFailure(Status.INTERNAL.withDescription(e.getMessage()))) - .build(); + return invokeMessage(methodDescriptor, parameter).setFailure(toProtocolFailure(e)).build(); } public static diff --git a/sdk-core-impl/src/test/java/dev/restate/sdk/core/impl/UserFailuresTestSuite.java b/sdk-core-impl/src/test/java/dev/restate/sdk/core/impl/UserFailuresTestSuite.java index 10d69702..d00500c0 100644 --- a/sdk-core-impl/src/test/java/dev/restate/sdk/core/impl/UserFailuresTestSuite.java +++ b/sdk-core-impl/src/test/java/dev/restate/sdk/core/impl/UserFailuresTestSuite.java @@ -5,25 +5,27 @@ import static dev.restate.sdk.core.impl.TestDefinitions.*; import dev.restate.generated.sdk.java.Java; +import dev.restate.sdk.core.TerminalException; import dev.restate.sdk.core.impl.testservices.GreeterGrpc; import dev.restate.sdk.core.impl.testservices.GreetingRequest; import io.grpc.BindableService; -import io.grpc.Status; import java.util.stream.Stream; public abstract class UserFailuresTestSuite implements TestSuite { - public static final Status INTERNAL_MY_ERROR = Status.INTERNAL.withDescription("my error"); + public static final String MY_ERROR = "my error"; - public static final Status UNKNOWN_MY_ERROR = Status.UNKNOWN.withDescription("Whatever"); + public static final String WHATEVER = "Whatever"; protected abstract BindableService throwIllegalStateException(); protected abstract BindableService sideEffectThrowIllegalStateException(); - protected abstract BindableService throwStatusRuntimeException(Status status); + protected abstract BindableService throwTerminalException( + TerminalException.Code code, String message); - protected abstract BindableService sideEffectThrowStatusRuntimeException(Status status); + protected abstract BindableService sideEffectThrowTerminalException( + TerminalException.Code code, String message); @Override public Stream definitions() { @@ -38,36 +40,39 @@ public Stream definitions() { // Cases completing the invocation with OutputStreamEntry.failure testInvocation( - () -> this.throwStatusRuntimeException(INTERNAL_MY_ERROR), + () -> this.throwTerminalException(TerminalException.Code.INTERNAL, MY_ERROR), GreeterGrpc.getGreetMethod()) .withInput(startMessage(1), inputMessage(GreetingRequest.getDefaultInstance())) - .expectingOutput(outputMessage(INTERNAL_MY_ERROR)) + .expectingOutput(outputMessage(TerminalException.Code.INTERNAL, MY_ERROR)) .named("With internal error"), testInvocation( - () -> this.throwStatusRuntimeException(UNKNOWN_MY_ERROR), + () -> this.throwTerminalException(TerminalException.Code.UNKNOWN, WHATEVER), GreeterGrpc.getGreetMethod()) .withInput(startMessage(1), inputMessage(GreetingRequest.getDefaultInstance())) - .expectingOutput(outputMessage(UNKNOWN_MY_ERROR)) + .expectingOutput(outputMessage(TerminalException.Code.UNKNOWN, WHATEVER)) .named("With unknown error"), testInvocation( - () -> this.sideEffectThrowStatusRuntimeException(INTERNAL_MY_ERROR), + () -> + this.sideEffectThrowTerminalException( + TerminalException.Code.INTERNAL, MY_ERROR), GreeterGrpc.getGreetMethod()) .withInput( startMessage(1), inputMessage(GreetingRequest.getDefaultInstance()), ackMessage(1)) .expectingOutput( Java.SideEffectEntryMessage.newBuilder() - .setFailure(Util.toProtocolFailure(INTERNAL_MY_ERROR)), - outputMessage(INTERNAL_MY_ERROR)) + .setFailure(Util.toProtocolFailure(TerminalException.Code.INTERNAL, MY_ERROR)), + outputMessage(TerminalException.Code.INTERNAL, MY_ERROR)) .named("With internal error"), testInvocation( - () -> this.sideEffectThrowStatusRuntimeException(UNKNOWN_MY_ERROR), + () -> + this.sideEffectThrowTerminalException(TerminalException.Code.UNKNOWN, WHATEVER), GreeterGrpc.getGreetMethod()) .withInput( startMessage(1), inputMessage(GreetingRequest.getDefaultInstance()), ackMessage(1)) .expectingOutput( Java.SideEffectEntryMessage.newBuilder() - .setFailure(Util.toProtocolFailure(UNKNOWN_MY_ERROR)), - outputMessage(UNKNOWN_MY_ERROR)) + .setFailure(Util.toProtocolFailure(TerminalException.Code.UNKNOWN, WHATEVER)), + outputMessage(TerminalException.Code.UNKNOWN, WHATEVER)) .named("With unknown error")); } } diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/TerminalException.java b/sdk-core/src/main/java/dev/restate/sdk/core/TerminalException.java new file mode 100644 index 00000000..3c2af979 --- /dev/null +++ b/sdk-core/src/main/java/dev/restate/sdk/core/TerminalException.java @@ -0,0 +1,104 @@ +package dev.restate.sdk.core; + +/** When thrown in a Restate service method, it will complete the invocation with an error. */ +public class TerminalException extends RuntimeException { + + /** + * @see io.grpc.Status.Code + */ + public enum Code { + OK(0), + CANCELLED(1), + UNKNOWN(2), + INVALID_ARGUMENT(3), + DEADLINE_EXCEEDED(4), + NOT_FOUND(5), + ALREADY_EXISTS(6), + PERMISSION_DENIED(7), + RESOURCE_EXHAUSTED(8), + FAILED_PRECONDITION(9), + ABORTED(10), + OUT_OF_RANGE(11), + UNIMPLEMENTED(12), + INTERNAL(13), + UNAVAILABLE(14), + DATA_LOSS(15), + UNAUTHENTICATED(16); + + private final int value; + + Code(int value) { + this.value = value; + } + + /** The numerical value of the code. */ + public int value() { + return value; + } + + public static Code fromValue(int value) { + switch (value) { + case 0: + return Code.OK; + case 1: + return Code.CANCELLED; + case 2: + return Code.UNKNOWN; + case 3: + return Code.INVALID_ARGUMENT; + case 4: + return Code.DEADLINE_EXCEEDED; + case 5: + return Code.NOT_FOUND; + case 6: + return Code.ALREADY_EXISTS; + case 7: + return Code.PERMISSION_DENIED; + case 8: + return Code.RESOURCE_EXHAUSTED; + case 9: + return Code.FAILED_PRECONDITION; + case 10: + return Code.ABORTED; + case 11: + return Code.OUT_OF_RANGE; + case 12: + return Code.UNIMPLEMENTED; + case 13: + return Code.INTERNAL; + case 14: + return Code.UNAVAILABLE; + case 15: + return Code.DATA_LOSS; + case 16: + return Code.UNAUTHENTICATED; + default: + return Code.UNKNOWN; + } + } + } + + private final Code code; + + public TerminalException() { + this.code = Code.UNKNOWN; + } + + public TerminalException(Code code) { + this.code = code; + } + + public TerminalException(Code code, String message) { + super(message); + this.code = code; + } + + public TerminalException(String message) { + super(message); + this.code = Code.UNKNOWN; + } + + public Code getCode() { + return code; + } +} diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/syscalls/ExitSideEffectSyscallCallback.java b/sdk-core/src/main/java/dev/restate/sdk/core/syscalls/ExitSideEffectSyscallCallback.java index 37bd5dbf..497e4e5b 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/syscalls/ExitSideEffectSyscallCallback.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/syscalls/ExitSideEffectSyscallCallback.java @@ -1,7 +1,7 @@ package dev.restate.sdk.core.syscalls; import com.google.protobuf.ByteString; -import io.grpc.StatusRuntimeException; +import dev.restate.sdk.core.TerminalException; import javax.annotation.Nullable; public interface ExitSideEffectSyscallCallback { @@ -9,7 +9,7 @@ public interface ExitSideEffectSyscallCallback { void onResult(ByteString t); /** This is user failure. */ - void onFailure(StatusRuntimeException t); + void onFailure(TerminalException t); /** * This is internal failure propagation, causing a cancellation of the processing. For example diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/syscalls/ReadyResult.java b/sdk-core/src/main/java/dev/restate/sdk/core/syscalls/ReadyResult.java index 145bc41d..b4b54380 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/syscalls/ReadyResult.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/syscalls/ReadyResult.java @@ -1,6 +1,6 @@ package dev.restate.sdk.core.syscalls; -import io.grpc.StatusRuntimeException; +import dev.restate.sdk.core.TerminalException; import java.util.function.Function; import javax.annotation.Nullable; @@ -35,5 +35,5 @@ public interface ReadyResult { ReadyResult map(Function mapper); @Nullable - StatusRuntimeException getFailure(); + TerminalException getFailure(); } diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/syscalls/Syscalls.java b/sdk-core/src/main/java/dev/restate/sdk/core/syscalls/Syscalls.java index d84841c1..d16f9a6b 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/syscalls/Syscalls.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/syscalls/Syscalls.java @@ -2,6 +2,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.MessageLite; +import dev.restate.sdk.core.TerminalException; import io.grpc.Context; import io.grpc.MethodDescriptor; import java.time.Duration; @@ -41,7 +42,7 @@ void pollInput( void writeOutput(T value, SyscallCallback callback); - void writeOutput(Throwable throwable, SyscallCallback callback); + void writeOutput(TerminalException exception, SyscallCallback callback); // ----- State diff --git a/sdk-java-blocking/src/main/java/dev/restate/sdk/blocking/Awaitable.java b/sdk-java-blocking/src/main/java/dev/restate/sdk/blocking/Awaitable.java index 4a1c1aef..fc9cafed 100644 --- a/sdk-java-blocking/src/main/java/dev/restate/sdk/blocking/Awaitable.java +++ b/sdk-java-blocking/src/main/java/dev/restate/sdk/blocking/Awaitable.java @@ -1,10 +1,10 @@ package dev.restate.sdk.blocking; import dev.restate.sdk.core.SuspendedException; +import dev.restate.sdk.core.TerminalException; import dev.restate.sdk.core.syscalls.DeferredResult; import dev.restate.sdk.core.syscalls.ReadyResultHolder; import dev.restate.sdk.core.syscalls.Syscalls; -import io.grpc.StatusRuntimeException; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -19,7 +19,7 @@ * execution stops until the asynchronous result is available. * *

The result can be either a success or a failure. In case of a failure, {@code await()} will - * throw a {@link StatusRuntimeException}. + * throw a {@link TerminalException}. * * @param type of the awaitable result */ @@ -46,9 +46,9 @@ Awaitable(Syscalls syscalls, DeferredResult deferredResult, FunctionNOTE: You should never wrap this invocation in a try-catch catching {@link * RuntimeException}, as it will catch {@link SuspendedException} as well. * - * @throws StatusRuntimeException if the awaitable is ready and contains a failure + * @throws TerminalException if the awaitable is ready and contains a failure */ - public T await() throws StatusRuntimeException { + public T await() throws TerminalException { if (!this.resultHolder.isCompleted()) { Util.blockOnSyscall( cb -> syscalls.resolveDeferred(this.resultHolder.getDeferredResult(), cb)); @@ -60,7 +60,7 @@ public T await() throws StatusRuntimeException { * Same as {@link #await()}, but throws a {@link TimeoutException} if this {@link Awaitable} * doesn't complete before the provided {@code timeout}. */ - public T await(Duration timeout) throws StatusRuntimeException, TimeoutException { + public T await(Duration timeout) throws TerminalException, TimeoutException { DeferredResult sleep = Util.blockOnSyscall(cb -> this.syscalls.sleep(timeout, cb)); int index = diff --git a/sdk-java-blocking/src/main/java/dev/restate/sdk/blocking/RestateBlockingService.java b/sdk-java-blocking/src/main/java/dev/restate/sdk/blocking/RestateBlockingService.java index c2a7f5f1..62b2b3d5 100644 --- a/sdk-java-blocking/src/main/java/dev/restate/sdk/blocking/RestateBlockingService.java +++ b/sdk-java-blocking/src/main/java/dev/restate/sdk/blocking/RestateBlockingService.java @@ -6,16 +6,17 @@ /** * Marker interface for Restate blocking services. * + *

+ * *

Error handling

* * The error handling of Restate services works as follows: * *
    - *
  • When throwing {@link io.grpc.StatusException} or {@link io.grpc.StatusRuntimeException}, - * the failure is considered "terminal" and will be used as invocation output + *
  • When throwing {@link dev.restate.sdk.core.TerminalException}, the failure will be used as + * invocation response error value *
  • When throwing any other type of exception, the failure is considered "non-terminal" and the * runtime will retry it, according to its configuration - *
  • In case {@code StreamObserver#onError} is invoked, the failure is considered "terminal" *
*/ public interface RestateBlockingService extends BindableBlockingService { diff --git a/sdk-java-blocking/src/main/java/dev/restate/sdk/blocking/RestateContext.java b/sdk-java-blocking/src/main/java/dev/restate/sdk/blocking/RestateContext.java index a0a6c520..67d4f900 100644 --- a/sdk-java-blocking/src/main/java/dev/restate/sdk/blocking/RestateContext.java +++ b/sdk-java-blocking/src/main/java/dev/restate/sdk/blocking/RestateContext.java @@ -87,6 +87,9 @@ default void sleep(Duration duration) { *

The returned {@link Channel} will execute the requests using the {@link * #call(MethodDescriptor, Object)} method. * + *

Please note that errors will be propagated as {@link dev.restate.sdk.core.TerminalException} + * and not as {@link io.grpc.StatusRuntimeException}. + * * @return a {@link Channel} to send requests through Restate. */ default Channel grpcChannel() { diff --git a/sdk-java-blocking/src/main/java/dev/restate/sdk/blocking/RestateContextImpl.java b/sdk-java-blocking/src/main/java/dev/restate/sdk/blocking/RestateContextImpl.java index 236c09e0..6ead6638 100644 --- a/sdk-java-blocking/src/main/java/dev/restate/sdk/blocking/RestateContextImpl.java +++ b/sdk-java-blocking/src/main/java/dev/restate/sdk/blocking/RestateContextImpl.java @@ -3,10 +3,10 @@ import com.google.protobuf.ByteString; import dev.restate.sdk.core.Serde; import dev.restate.sdk.core.StateKey; +import dev.restate.sdk.core.TerminalException; import dev.restate.sdk.core.function.ThrowingSupplier; import dev.restate.sdk.core.syscalls.*; import io.grpc.MethodDescriptor; -import io.grpc.StatusRuntimeException; import java.time.Duration; import java.util.Map; import java.util.Optional; @@ -87,7 +87,7 @@ public void onResult(ByteString result) { } @Override - public void onFailure(StatusRuntimeException t) { + public void onFailure(TerminalException t) { enterFut.complete(CompletableFuture.failedFuture(t)); } @@ -112,7 +112,7 @@ public void onResult(ByteString result) { } @Override - public void onFailure(StatusRuntimeException t) { + public void onFailure(TerminalException t) { exitFut.completeExceptionally(t); } @@ -140,7 +140,7 @@ public void onCancel(@Nullable Throwable t) { } @Override - public Awakeable awakeable(Serde serde) throws StatusRuntimeException { + public Awakeable awakeable(Serde serde) throws TerminalException { // Retrieve the awakeable Map.Entry> awakeable = Util.blockOnSyscall(syscalls::awakeable); diff --git a/sdk-java-blocking/src/test/java/dev/restate/sdk/blocking/UserFailuresTest.java b/sdk-java-blocking/src/test/java/dev/restate/sdk/blocking/UserFailuresTest.java index 8c4c7469..dbb96956 100644 --- a/sdk-java-blocking/src/test/java/dev/restate/sdk/blocking/UserFailuresTest.java +++ b/sdk-java-blocking/src/test/java/dev/restate/sdk/blocking/UserFailuresTest.java @@ -1,16 +1,16 @@ package dev.restate.sdk.blocking; +import static dev.restate.sdk.core.impl.AssertUtils.containsOnlyExactErrorMessage; import static dev.restate.sdk.core.impl.ProtoUtils.*; import static dev.restate.sdk.core.impl.TestDefinitions.testInvocation; +import dev.restate.sdk.core.TerminalException; import dev.restate.sdk.core.impl.TestDefinitions.TestDefinition; import dev.restate.sdk.core.impl.UserFailuresTestSuite; import dev.restate.sdk.core.impl.testservices.GreeterGrpc; import dev.restate.sdk.core.impl.testservices.GreetingRequest; import dev.restate.sdk.core.impl.testservices.GreetingResponse; import io.grpc.BindableService; -import io.grpc.Status; -import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import java.util.stream.Stream; @@ -46,33 +46,37 @@ protected BindableService sideEffectThrowIllegalStateException() { return new SideEffectThrowIllegalStateException(); } - private static class ThrowStatusRuntimeException extends GreeterGrpc.GreeterImplBase + private static class ThrowTerminalException extends GreeterGrpc.GreeterImplBase implements RestateBlockingService { - private final Status status; + private final TerminalException.Code code; + private final String message; - private ThrowStatusRuntimeException(Status status) { - this.status = status; + public ThrowTerminalException(TerminalException.Code code, String message) { + this.code = code; + this.message = message; } @Override public void greet(GreetingRequest request, StreamObserver responseObserver) { - throw new StatusRuntimeException(status); + throw new TerminalException(code, message); } } @Override - protected BindableService throwStatusRuntimeException(Status status) { - return new ThrowStatusRuntimeException(status); + protected BindableService throwTerminalException(TerminalException.Code code, String message) { + return new ThrowTerminalException(code, message); } - private static class SideEffectThrowStatusRuntimeException extends GreeterGrpc.GreeterImplBase + private static class SideEffectThrowTerminalException extends GreeterGrpc.GreeterImplBase implements RestateBlockingService { - private final Status status; + private final TerminalException.Code code; + private final String message; - private SideEffectThrowStatusRuntimeException(Status status) { - this.status = status; + private SideEffectThrowTerminalException(TerminalException.Code code, String message) { + this.code = code; + this.message = message; } @Override @@ -80,23 +84,24 @@ public void greet(GreetingRequest request, StreamObserver resp restateContext() .sideEffect( () -> { - throw new StatusRuntimeException(status); + throw new TerminalException(code, message); }); } } @Override - protected BindableService sideEffectThrowStatusRuntimeException(Status status) { - return new SideEffectThrowStatusRuntimeException(status); + protected BindableService sideEffectThrowTerminalException( + TerminalException.Code code, String message) { + return new SideEffectThrowTerminalException(code, message); } // -- Response observer is something specific to the sdk-java-blocking interface - private static class ResponseObserverOnErrorStatusRuntimeException - extends GreeterGrpc.GreeterImplBase implements RestateBlockingService { + private static class ResponseObserverOnErrorTerminalException extends GreeterGrpc.GreeterImplBase + implements RestateBlockingService { @Override public void greet(GreetingRequest request, StreamObserver responseObserver) { - responseObserver.onError(new StatusRuntimeException(INTERNAL_MY_ERROR)); + responseObserver.onError(new TerminalException(TerminalException.Code.INTERNAL, MY_ERROR)); } } @@ -114,17 +119,14 @@ public Stream definitions() { super.definitions(), Stream.of( testInvocation( - new ResponseObserverOnErrorStatusRuntimeException(), - GreeterGrpc.getGreetMethod()) + new ResponseObserverOnErrorTerminalException(), GreeterGrpc.getGreetMethod()) .withInput(startMessage(1), inputMessage(GreetingRequest.getDefaultInstance())) - .expectingOutput(outputMessage(INTERNAL_MY_ERROR)), + .expectingOutput(outputMessage(TerminalException.Code.INTERNAL, MY_ERROR)), testInvocation( new ResponseObserverOnErrorIllegalStateException(), GreeterGrpc.getGreetMethod()) .withInput(startMessage(1), inputMessage(GreetingRequest.getDefaultInstance())) - .expectingOutput( - outputMessage( - Status.UNKNOWN.withDescription( - new IllegalStateException("Whatever").toString()))))); + .assertingOutput( + containsOnlyExactErrorMessage(new IllegalStateException("Whatever"))))); } } diff --git a/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RestateContextImpl.kt b/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RestateContextImpl.kt index 840ec867..d4630258 100644 --- a/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RestateContextImpl.kt +++ b/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RestateContextImpl.kt @@ -8,7 +8,6 @@ import dev.restate.sdk.core.syscalls.EnterSideEffectSyscallCallback import dev.restate.sdk.core.syscalls.ExitSideEffectSyscallCallback import dev.restate.sdk.core.syscalls.Syscalls import io.grpc.MethodDescriptor -import io.grpc.StatusRuntimeException import kotlin.coroutines.resume import kotlin.time.Duration import kotlin.time.toJavaDuration @@ -107,7 +106,7 @@ internal class RestateContextImpl internal constructor(private val syscalls: Sys cont.resume(deferred) } - override fun onFailure(t: StatusRuntimeException) { + override fun onFailure(t: TerminalException) { val deferred: CompletableDeferred = CompletableDeferred() deferred.completeExceptionally(t) cont.resume(deferred) @@ -141,7 +140,7 @@ internal class RestateContextImpl internal constructor(private val syscalls: Sys exitResult.complete(t) } - override fun onFailure(t: StatusRuntimeException) { + override fun onFailure(t: TerminalException) { exitResult.completeExceptionally(t) } diff --git a/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt b/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt index d77d17ab..90d728c9 100644 --- a/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt +++ b/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt @@ -162,7 +162,7 @@ sealed interface RestateContext { * waits until the asynchronous result is available. * * The result can be either a success or a failure. In case of a failure, [await] will throw a - * [StatusRuntimeException]. + * [dev.restate.sdk.core.TerminalException]. * * @param T type of the awaitable result */ diff --git a/sdk-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/StateMachineFailuresTest.kt b/sdk-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/StateMachineFailuresTest.kt index 6c19addd..5bd16ba2 100644 --- a/sdk-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/StateMachineFailuresTest.kt +++ b/sdk-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/StateMachineFailuresTest.kt @@ -31,7 +31,7 @@ class StateMachineFailuresTest : StateMachineFailuresTestSuite() { } override fun getState(): BindableService { - throw UnsupportedOperationException("https://github.com/restatedev/sdk-java/issues/116") + return GetState() } private class SideEffectFailure(private val serde: Serde) : @@ -43,6 +43,6 @@ class StateMachineFailuresTest : StateMachineFailuresTestSuite() { } override fun sideEffectFailure(serde: Serde): BindableService { - throw UnsupportedOperationException("https://github.com/restatedev/sdk-java/issues/116") + return SideEffectFailure(serde) } } diff --git a/sdk-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/UserFailuresTest.kt b/sdk-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/UserFailuresTest.kt index 3071301e..19704239 100644 --- a/sdk-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/UserFailuresTest.kt +++ b/sdk-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/UserFailuresTest.kt @@ -1,12 +1,11 @@ package dev.restate.sdk.kotlin +import dev.restate.sdk.core.TerminalException import dev.restate.sdk.core.impl.UserFailuresTestSuite import dev.restate.sdk.core.impl.testservices.GreeterGrpcKt import dev.restate.sdk.core.impl.testservices.GreetingRequest import dev.restate.sdk.core.impl.testservices.GreetingResponse import io.grpc.BindableService -import io.grpc.Status -import io.grpc.StatusRuntimeException import kotlinx.coroutines.Dispatchers class UserFailuresTest : UserFailuresTestSuite() { @@ -18,7 +17,7 @@ class UserFailuresTest : UserFailuresTestSuite() { } override fun throwIllegalStateException(): BindableService { - throw UnsupportedOperationException("https://github.com/restatedev/sdk-java/issues/116") + return ThrowIllegalStateException() } private class SideEffectThrowIllegalStateException : @@ -33,26 +32,36 @@ class UserFailuresTest : UserFailuresTestSuite() { return SideEffectThrowIllegalStateException() } - private class ThrowStatusRuntimeException(private val status: Status) : - GreeterGrpcKt.GreeterCoroutineImplBase(Dispatchers.Unconfined), RestateCoroutineService { + private class ThrowTerminalException( + private val code: TerminalException.Code, + private val message: String + ) : GreeterGrpcKt.GreeterCoroutineImplBase(Dispatchers.Unconfined), RestateCoroutineService { override suspend fun greet(request: GreetingRequest): GreetingResponse { - throw StatusRuntimeException(status) + throw TerminalException(code, message) } } - override fun throwStatusRuntimeException(status: Status): BindableService { - return ThrowStatusRuntimeException(status) + override fun throwTerminalException( + code: TerminalException.Code, + message: String + ): BindableService { + return ThrowTerminalException(code, message) } - private class SideEffectThrowStatusRuntimeException(private val status: Status) : - GreeterGrpcKt.GreeterCoroutineImplBase(Dispatchers.Unconfined), RestateCoroutineService { + private class SideEffectThrowTerminalException( + private val code: TerminalException.Code, + private val message: String + ) : GreeterGrpcKt.GreeterCoroutineImplBase(Dispatchers.Unconfined), RestateCoroutineService { override suspend fun greet(request: GreetingRequest): GreetingResponse { - restateContext().sideEffect { throw StatusRuntimeException(status) } + restateContext().sideEffect { throw TerminalException(code, message) } throw IllegalStateException("Not expected to reach this point") } } - override fun sideEffectThrowStatusRuntimeException(status: Status): BindableService { - return SideEffectThrowStatusRuntimeException(status) + override fun sideEffectThrowTerminalException( + code: TerminalException.Code, + message: String + ): BindableService { + return SideEffectThrowTerminalException(code, message) } }