diff --git a/build.gradle.kts b/build.gradle.kts index 5d3ee36c..7db96009 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -4,7 +4,7 @@ import org.gradle.api.tasks.testing.logging.TestLogEvent plugins { java - kotlin("jvm") version "1.6.20" apply false + kotlin("jvm") version "1.8.20" apply false id("net.ltgt.errorprone") version "3.0.1" id("com.github.jk1.dependency-license-report") version "2.0" diff --git a/examples/http/src/main/kotlin/dev/restate/sdk/examples/Counter.kt b/examples/http/src/main/kotlin/dev/restate/sdk/examples/Counter.kt index aed02c53..3c594136 100644 --- a/examples/http/src/main/kotlin/dev/restate/sdk/examples/Counter.kt +++ b/examples/http/src/main/kotlin/dev/restate/sdk/examples/Counter.kt @@ -1,9 +1,9 @@ package dev.restate.sdk.examples -import RestateCoroutineService import com.google.protobuf.Empty import dev.restate.sdk.core.StateKey import dev.restate.sdk.examples.generated.* +import dev.restate.sdk.kotlin.RestateCoroutineService import dev.restate.sdk.vertx.RestateHttpEndpointBuilder import io.vertx.core.Vertx import io.vertx.kotlin.coroutines.dispatcher diff --git a/examples/lambda/src/main/kotlin/dev/restate/sdk/examples/Counter.kt b/examples/lambda/src/main/kotlin/dev/restate/sdk/examples/Counter.kt index 5468ad62..18b51ea3 100644 --- a/examples/lambda/src/main/kotlin/dev/restate/sdk/examples/Counter.kt +++ b/examples/lambda/src/main/kotlin/dev/restate/sdk/examples/Counter.kt @@ -1,9 +1,9 @@ package dev.restate.sdk.examples -import RestateCoroutineService import com.google.protobuf.Empty import dev.restate.sdk.core.StateKey import dev.restate.sdk.examples.generated.* +import dev.restate.sdk.kotlin.RestateCoroutineService import kotlinx.coroutines.Dispatchers import org.apache.logging.log4j.LogManager diff --git a/sdk-blocking/src/main/java/dev/restate/sdk/blocking/Awakeable.java b/sdk-blocking/src/main/java/dev/restate/sdk/blocking/Awakeable.java index 46ad1a4a..bfc50e0c 100644 --- a/sdk-blocking/src/main/java/dev/restate/sdk/blocking/Awakeable.java +++ b/sdk-blocking/src/main/java/dev/restate/sdk/blocking/Awakeable.java @@ -11,7 +11,7 @@ *

It can be used to let a service wait on a specific condition/result, which is fulfilled by * another service or by an external system at a later point in time. * - *

For example you can send a Kafka record including the {@link Awakeable#id()}, and then let + *

For example, you can send a Kafka record including the {@link Awakeable#id()}, and then let * another service consume from Kafka the responses of given external system interaction by using * {@link RestateContext#awakeableHandle(String)}. */ diff --git a/sdk-blocking/src/main/java/dev/restate/sdk/blocking/AwakeableHandle.java b/sdk-blocking/src/main/java/dev/restate/sdk/blocking/AwakeableHandle.java index a4f11a51..ec4ed784 100644 --- a/sdk-blocking/src/main/java/dev/restate/sdk/blocking/AwakeableHandle.java +++ b/sdk-blocking/src/main/java/dev/restate/sdk/blocking/AwakeableHandle.java @@ -1,7 +1,6 @@ package dev.restate.sdk.blocking; import dev.restate.sdk.core.TypeTag; -import dev.restate.sdk.core.serde.Serde; import javax.annotation.Nonnull; /** This class represents a handle to an {@link Awakeable} created in another service. */ @@ -10,9 +9,8 @@ public interface AwakeableHandle { /** * Complete with success the {@link Awakeable}. * - * @param payload the payload of the response. This can be either {@code byte[]}, {@link - * com.google.protobuf.ByteString}, or any object, which will be serialized by using the - * configured {@link Serde}. MUST NOT be null. + * @param typeTag used to serialize the {@link Awakeable} result payload. + * @param payload the result payload. MUST NOT be null. * @see Awakeable */ void resolve(TypeTag typeTag, @Nonnull T payload); @@ -20,7 +18,7 @@ public interface AwakeableHandle { /** * Complete with failure the {@link Awakeable}. * - * @param reason the rejection reason. + * @param reason the rejection reason. MUST NOT be null. * @see Awakeable */ void reject(String reason); diff --git a/sdk-blocking/src/main/java/dev/restate/sdk/blocking/RestateContext.java b/sdk-blocking/src/main/java/dev/restate/sdk/blocking/RestateContext.java index 0354a1d8..1d555146 100644 --- a/sdk-blocking/src/main/java/dev/restate/sdk/blocking/RestateContext.java +++ b/sdk-blocking/src/main/java/dev/restate/sdk/blocking/RestateContext.java @@ -29,17 +29,17 @@ public interface RestateContext { * Gets the state stored under key, deserializing the raw value using the registered {@link Serde} * in the interceptor. * - * @param key identifying the state to get and its type + * @param key identifying the state to get and its type. * @return an {@link Optional} containing the stored state deserialized or an empty {@link - * Optional} if not set yet - * @throws RuntimeException when the state cannot be deserialized + * Optional} if not set yet. + * @throws RuntimeException when the state cannot be deserialized. */ Optional get(StateKey key); /** * Clears the state stored under key. * - * @param key identifying the state to clear + * @param key identifying the state to clear. */ void clear(StateKey key); @@ -47,7 +47,7 @@ public interface RestateContext { * Sets the given value under the given key, serializing the value using the registered {@link * Serde} in the interceptor. * - * @param key identifying the value to store and its type + * @param key identifying the value to store and its type. * @param value to store under the given key. MUST NOT be null. */ void set(StateKey key, @Nonnull T value); @@ -55,7 +55,7 @@ public interface RestateContext { /** * Causes the current execution of the function invocation to sleep for the given duration. * - * @param duration for which to sleep + * @param duration for which to sleep. */ default void sleep(Duration duration) { timer(duration).await(); @@ -65,25 +65,41 @@ default void sleep(Duration duration) { * Causes the start of a timer for the given duration. You can await on the timer end by invoking * {@link Awaitable#await()}. * - * @param duration for which to sleep + * @param duration for which to sleep. */ Awaitable timer(Duration duration); /** * Invoke another Restate service method. * - * @return an {@link Awaitable} that wraps the Restate service method result + * @param methodDescriptor The method descriptor of the method to invoke. This is found in the + * generated `*Grpc` class. + * @param parameter the invocation request parameter. + * @return an {@link Awaitable} that wraps the Restate service method result. */ Awaitable call( MethodDescriptor methodDescriptor, T parameter); - /** Invoke another Restate service in a fire and forget fashion. */ - void backgroundCall( + /** + * Invoke another Restate service without waiting for the response. + * + * @param methodDescriptor The method descriptor of the method to invoke. This is found in the + * generated `*Grpc` class. + * @param parameter the invocation request parameter. + */ + void oneWayCall( MethodDescriptor methodDescriptor, T parameter); /** - * Similar to {@link #backgroundCall(MethodDescriptor, MessageLite)}, but executes the invocation - * after the provided delay. + * Invoke another Restate service without waiting for the response after the provided {@code + * delay} has elapsed. + * + *

This method returns immediately, as the timer is executed and awaited on Restate. + * + * @param methodDescriptor The method descriptor of the method to invoke. This is found in the + * generated {@code *Grpc} class. + * @param parameter the invocation request parameter. + * @param delay time to wait before executing the call. */ void delayedCall( MethodDescriptor methodDescriptor, T parameter, Duration delay); @@ -99,10 +115,10 @@ default T sideEffect(Class clazz, Supplier action) { * *

Use this function if you want to perform non-deterministic operations. * - * @param typeTag the type tag of the return value - * @param action to execute for its side effects - * @param type of the return value - * @return value of the side effect operation + * @param typeTag the type tag of the return value, used to serialize/deserialize it. + * @param action to execute for its side effects. + * @param type of the return value. + * @return value of the side effect operation. */ T sideEffect(TypeTag typeTag, Supplier action); @@ -129,15 +145,16 @@ default Awakeable awakeable(Class type) { * service consume from Kafka the responses of given external system interaction by using {@link * #awakeableHandle(String)}. * - * @param typeTag the response type tag to use for deserializing - * @return the result value of the external system interaction + * @param typeTag the response type tag to use for deserializing the {@link Awakeable} result. + * @return the {@link Awakeable} to await on. * @see Awakeable */ Awakeable awakeable(TypeTag typeTag); /** * Create a new {@link AwakeableHandle} for the provided identifier. You can use it to {@link - * AwakeableHandle#resolve(TypeTag, Object)} the linked {@link Awakeable}. + * AwakeableHandle#resolve(TypeTag, Object)} or {@link AwakeableHandle#reject(String)} the linked + * {@link Awakeable}. * * @see Awakeable */ diff --git a/sdk-blocking/src/main/java/dev/restate/sdk/blocking/RestateContextImpl.java b/sdk-blocking/src/main/java/dev/restate/sdk/blocking/RestateContextImpl.java index 2e6f6697..3ed98997 100644 --- a/sdk-blocking/src/main/java/dev/restate/sdk/blocking/RestateContextImpl.java +++ b/sdk-blocking/src/main/java/dev/restate/sdk/blocking/RestateContextImpl.java @@ -59,7 +59,7 @@ public Awaitable call( } @Override - public void backgroundCall( + public void oneWayCall( MethodDescriptor methodDescriptor, T parameter) { Util.blockOnSyscall(cb -> syscalls.backgroundCall(methodDescriptor, parameter, null, cb)); } diff --git a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/SyscallsInternal.java b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/SyscallsInternal.java index 4ced86b6..32420385 100644 --- a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/SyscallsInternal.java +++ b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/SyscallsInternal.java @@ -4,19 +4,19 @@ import dev.restate.sdk.core.syscalls.AnyDeferredResult; import dev.restate.sdk.core.syscalls.DeferredResult; import dev.restate.sdk.core.syscalls.Syscalls; -import java.util.Collection; +import java.util.List; import java.util.stream.Collectors; public interface SyscallsInternal extends Syscalls { @Override - default AnyDeferredResult createAnyDeferred(Collection> children) { + default AnyDeferredResult createAnyDeferred(List> children) { return DeferredResults.any( children.stream().map(dr -> (DeferredResultInternal) dr).collect(Collectors.toList())); } @Override - default DeferredResult createAllDeferred(Collection> children) { + default DeferredResult createAllDeferred(List> children) { return DeferredResults.all( children.stream().map(dr -> (DeferredResultInternal) dr).collect(Collectors.toList())); } diff --git a/sdk-core-impl/src/test/java/dev/restate/sdk/core/impl/SideEffectTest.java b/sdk-core-impl/src/test/java/dev/restate/sdk/core/impl/SideEffectTest.java index 463ba14a..e14faa42 100644 --- a/sdk-core-impl/src/test/java/dev/restate/sdk/core/impl/SideEffectTest.java +++ b/sdk-core-impl/src/test/java/dev/restate/sdk/core/impl/SideEffectTest.java @@ -93,7 +93,7 @@ private static class SideEffectGuard extends GreeterGrpc.GreeterImplBase public void greet(GreetingRequest request, StreamObserver responseObserver) { RestateContext ctx = restateContext(); ctx.sideEffect( - () -> ctx.backgroundCall(GreeterGrpc.getGreetMethod(), greetingRequest("something"))); + () -> ctx.oneWayCall(GreeterGrpc.getGreetMethod(), greetingRequest("something"))); throw new IllegalStateException("This point should not be reached"); } diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/TypeTag.java b/sdk-core/src/main/java/dev/restate/sdk/core/TypeTag.java index 1117a680..645a6a07 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/TypeTag.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/TypeTag.java @@ -5,7 +5,9 @@ import java.util.function.Function; /** - * Marker interface to carry non-class type tags and generic at the same time. + * Marker interface defining the concrete type to serialize/deserialize. + * + *

It carries non-class type tags and generic at the same time. * *

This interface can be used by implementations of {@link dev.restate.sdk.core.serde.Serde} to * support type tags different from {@link Class}, for example to circumvent type erasure (like @@ -13,18 +15,25 @@ */ public interface TypeTag { + /** Type tag for void. */ TypeTag VOID = TypeTag.ofClass(Void.TYPE); + + /** Type tag for a byte array. */ TypeTag BYTES = TypeTag.ofClass(byte[].class); + + /** Type tag for a UTF_8 encoded string. */ TypeTag STRING_UTF8 = TypeTag.using( s -> s.getBytes(StandardCharsets.UTF_8), b -> new String(b, StandardCharsets.UTF_8)); Object get(); + /** Create a TypeTag from a class. */ static TypeTag ofClass(Class clazz) { return () -> clazz; } + /** Create a type tag providing custom serializer/deserializer functions. */ static TypeTag using(Function serializer, Function deserializer) { return new CustomSerdeFunctionsTypeTag<>(serializer, deserializer); } diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/syscalls/Syscalls.java b/sdk-core/src/main/java/dev/restate/sdk/core/syscalls/Syscalls.java index e3a23081..3a56f570 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/syscalls/Syscalls.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/syscalls/Syscalls.java @@ -6,7 +6,7 @@ import io.grpc.Context; import io.grpc.MethodDescriptor; import java.time.Duration; -import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.function.Function; @@ -88,7 +88,7 @@ void resolveAwakeable( void resolveDeferred(DeferredResult deferredToResolve, SyscallCallback callback); - AnyDeferredResult createAnyDeferred(Collection> children); + AnyDeferredResult createAnyDeferred(List> children); - DeferredResult createAllDeferred(Collection> children); + DeferredResult createAllDeferred(List> children); } diff --git a/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/Awaitables.kt b/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/Awaitables.kt index 4b2b082e..98de977b 100644 --- a/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/Awaitables.kt +++ b/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/Awaitables.kt @@ -1,23 +1,17 @@ package dev.restate.sdk.kotlin import dev.restate.sdk.core.TypeTag +import dev.restate.sdk.core.syscalls.AnyDeferredResult import dev.restate.sdk.core.syscalls.DeferredResult import dev.restate.sdk.core.syscalls.Syscalls +import java.util.* import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.suspendCancellableCoroutine -sealed interface Awaitable { - suspend fun await(): T -} - -sealed interface Awakeable : Awaitable { - val id: String -} - internal abstract class BaseAwaitableImpl internal constructor( - private val syscalls: Syscalls, - protected val deferredResult: DeferredResult + internal val syscalls: Syscalls, + internal val deferredResult: DeferredResult ) : Awaitable { abstract fun unpack(): KT_T @@ -56,6 +50,45 @@ internal constructor(syscalls: Syscalls, deferredResult: DeferredResult) : } } +internal class AnyAwaitableImpl +internal constructor(syscalls: Syscalls, deferredResult: DeferredResult) : + BaseAwaitableImpl(syscalls, deferredResult), AnyAwaitable { + override fun unpack(): Any { + val readyResult = deferredResult.toReadyResult()!! + if (!readyResult.isSuccess) { + throw readyResult.failure!! + } + return readyResult.result!! + } + + override suspend fun awaitIndex(): Int { + if (!deferredResult.isCompleted) { + suspendCancellableCoroutine { cont: CancellableContinuation -> + syscalls.resolveDeferred(deferredResult, completingUnitContinuation(cont)) + } + } + + return (deferredResult as AnyDeferredResult).completedIndex().orElseThrow { + IllegalStateException( + "completedIndex is empty when expecting a value. This looks like an SDK bug.") + } + } +} + +internal fun wrapAllAwaitable(awaitables: List>): Awaitable { + val syscalls = (awaitables.get(0) as BaseAwaitableImpl<*, *>).syscalls + return UnitAwaitableImpl( + syscalls, + syscalls.createAllDeferred(awaitables.map { (it as BaseAwaitableImpl<*, *>).deferredResult })) +} + +internal fun wrapAnyAwaitable(awaitables: List>): AnyAwaitable { + val syscalls = (awaitables.get(0) as BaseAwaitableImpl<*, *>).syscalls + return AnyAwaitableImpl( + syscalls, + syscalls.createAnyDeferred(awaitables.map { (it as BaseAwaitableImpl<*, *>).deferredResult })) +} + internal class AwakeableImpl internal constructor( syscalls: Syscalls, @@ -64,9 +97,15 @@ internal constructor( ) : NonNullAwaitableImpl(syscalls, deferredResult), Awakeable internal class AwakeableHandleImpl(val syscalls: Syscalls, val id: String) : AwakeableHandle { - override suspend fun complete(typeTag: TypeTag, payload: T) { + override suspend fun resolve(typeTag: TypeTag, payload: T) { return suspendCancellableCoroutine { cont: CancellableContinuation -> syscalls.resolveAwakeable(id, typeTag, payload, completingUnitContinuation(cont)) } } + + override suspend fun reject(reason: String) { + return suspendCancellableCoroutine { cont: CancellableContinuation -> + syscalls.rejectAwakeable(id, reason, completingUnitContinuation(cont)) + } + } } diff --git a/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RestateContext.kt b/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RestateContext.kt deleted file mode 100644 index 6813d03d..00000000 --- a/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RestateContext.kt +++ /dev/null @@ -1,62 +0,0 @@ -package dev.restate.sdk.kotlin - -import com.google.protobuf.MessageLite -import dev.restate.sdk.core.StateKey -import dev.restate.sdk.core.TypeTag -import io.grpc.MethodDescriptor -import kotlin.time.Duration - -sealed interface RestateContext { - - suspend fun get(key: StateKey): T? - - suspend fun set(key: StateKey, value: T) - - suspend fun clear(key: StateKey<*>) - - suspend fun sleep(duration: Duration) { - timer(duration).await() - } - - suspend fun timer(duration: Duration): Awaitable - - suspend fun call( - methodDescriptor: MethodDescriptor, - parameter: T - ): R { - return callAsync(methodDescriptor, parameter).await() - } - - suspend fun callAsync( - methodDescriptor: MethodDescriptor, - parameter: T - ): Awaitable - - suspend fun backgroundCall( - methodDescriptor: MethodDescriptor, - parameter: T - ) - - suspend fun delayedCall( - methodDescriptor: MethodDescriptor, - parameter: T, - delay: Duration - ) - - suspend fun sideEffect(typeTag: TypeTag, sideEffectAction: suspend () -> T?): T? - - suspend fun sideEffect(sideEffectAction: suspend () -> Unit) { - sideEffect(TypeTag.VOID) { - sideEffectAction() - null - } - } - - suspend fun awakeable(typeTag: TypeTag): Awakeable - - fun awakeableHandle(id: String): AwakeableHandle -} - -sealed interface AwakeableHandle { - suspend fun complete(typeTag: TypeTag, payload: T) -} diff --git a/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RestateContextImpl.kt b/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RestateContextImpl.kt index 3cf3e5b1..1e083b2d 100644 --- a/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RestateContextImpl.kt +++ b/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RestateContextImpl.kt @@ -70,7 +70,7 @@ internal class RestateContextImpl internal constructor(private val syscalls: Sys return NonNullAwaitableImpl(syscalls, deferredResult) } - override suspend fun backgroundCall( + override suspend fun oneWayCall( methodDescriptor: MethodDescriptor, parameter: T ) { diff --git a/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RestateCoroutineService.kt b/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RestateCoroutineService.kt deleted file mode 100644 index e61e20e6..00000000 --- a/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RestateCoroutineService.kt +++ /dev/null @@ -1,10 +0,0 @@ -import dev.restate.sdk.core.BindableNonBlockingService -import dev.restate.sdk.core.syscalls.Syscalls -import dev.restate.sdk.kotlin.RestateContext -import dev.restate.sdk.kotlin.RestateContextImpl - -interface RestateCoroutineService : BindableNonBlockingService { - fun restateContext(): RestateContext { - return RestateContextImpl(Syscalls.SYSCALLS_KEY.get()) - } -} diff --git a/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt b/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt new file mode 100644 index 00000000..ff92346e --- /dev/null +++ b/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt @@ -0,0 +1,260 @@ +package dev.restate.sdk.kotlin + +import com.google.protobuf.MessageLite +import dev.restate.sdk.core.BindableNonBlockingService +import dev.restate.sdk.core.StateKey +import dev.restate.sdk.core.TypeTag +import dev.restate.sdk.core.syscalls.Syscalls +import io.grpc.MethodDescriptor +import java.util.* +import kotlin.time.Duration + +/** + * This interface exposes the Restate functionalities to Restate services. It can be used to access + * the service instance key-value state storage, interact with other Restate services, record side + * effects, execute timers and synchronize with external systems. + * + * To use it within your Restate service, implement [RestateCoroutineService] and get an instance + * with [RestateCoroutineService.restateContext]. + * + * NOTE: This interface should never be accessed concurrently since it can lead to different + * orderings of user actions, corrupting the execution of the invocation. + */ +sealed interface RestateContext { + + /** + * Gets the state stored under key, deserializing the raw value using the registered + * [dev.restate.sdk.core.serde.Serde] in the interceptor. + * + * @param key identifying the state to get and its type. + * @return the value containing the stored state deserialized. + * @throws RuntimeException when the state cannot be deserialized. + */ + suspend fun get(key: StateKey): T? + + /** + * Sets the given value under the given key, serializing the value using the registered + * [dev.restate.sdk.core.serde.Serde] in the interceptor. + * + * @param key identifying the value to store and its type. + * @param value to store under the given key. + */ + suspend fun set(key: StateKey, value: T) + + /** + * Clears the state stored under key. + * + * @param key identifying the state to clear. + */ + suspend fun clear(key: StateKey<*>) + + /** + * Causes the current execution of the function invocation to sleep for the given duration. + * + * @param duration for which to sleep. + */ + suspend fun sleep(duration: Duration) { + timer(duration).await() + } + + /** + * Causes the start of a timer for the given duration. You can await on the timer end by invoking + * [Awaitable.await]. + * + * @param duration for which to sleep. + */ + suspend fun timer(duration: Duration): Awaitable + + /** + * Invoke another Restate service method and wait for the response. Same as + * `call(methodDescriptor, parameter).await()`. + * + * @param methodDescriptor The method descriptor of the method to invoke. This is found in the + * generated `*Grpc` class. + * @param parameter the invocation request parameter. + * @return the invocation response. + */ + suspend fun call( + methodDescriptor: MethodDescriptor, + parameter: T + ): R { + return callAsync(methodDescriptor, parameter).await() + } + + /** + * Invoke another Restate service method. + * + * @param methodDescriptor The method descriptor of the method to invoke. This is found in the + * generated `*Grpc` class. + * @param parameter the invocation request parameter. + * @return an [Awaitable] that wraps the Restate service method result. + */ + suspend fun callAsync( + methodDescriptor: MethodDescriptor, + parameter: T + ): Awaitable + + /** + * Invoke another Restate service without waiting for the response. + * + * @param methodDescriptor The method descriptor of the method to invoke. This is found in the + * generated `*Grpc` class. + * @param parameter the invocation request parameter. + */ + suspend fun oneWayCall( + methodDescriptor: MethodDescriptor, + parameter: T + ) + + /** + * Invoke another Restate service without waiting for the response after the provided `delay` has + * elapsed. + * + * This method returns immediately, as the timer is executed and awaited on Restate. + * + * @param methodDescriptor The method descriptor of the method to invoke. This is found in the + * generated `*Grpc` class. + * @param parameter the invocation request parameter. + * @param delay time to wait before executing the call + */ + suspend fun delayedCall( + methodDescriptor: MethodDescriptor, + parameter: T, + delay: Duration + ) + + /** + * Registers side effects that will be re-played in case of re-invocation (e.g. because of failure + * recovery or suspension point). + * + *

Use this function if you want to perform non-deterministic operations. + * + * @param typeTag the type tag of the return value, used to serialize/deserialize it. + * @param sideEffectAction to execute for its side effects. + * @param T type of the return value. + * @return value of the side effect operation. + */ + suspend fun sideEffect(typeTag: TypeTag, sideEffectAction: suspend () -> T?): T? + + /** Like [sideEffect] without a return value. */ + suspend fun sideEffect(sideEffectAction: suspend () -> Unit) { + sideEffect(TypeTag.VOID) { + sideEffectAction() + null + } + } + + /** + * Create an [Awakeable], addressable through [Awakeable.id]. + * + * You can use this feature to implement external asynchronous systems interactions, for example + * you can send a Kafka record including the [Awakeable.id], and then let another service consume + * from Kafka the responses of given external system interaction by using [awakeableHandle]. + * + * @param typeTag the response type tag to use for deserializing the [Awakeable] result. + * @return the [Awakeable] to await on. + * @see Awakeable + */ + suspend fun awakeable(typeTag: TypeTag): Awakeable + + /** + * Create a new [AwakeableHandle] for the provided identifier. You can use it to + * [AwakeableHandle.resolve] or [AwakeableHandle.reject] the linked [Awakeable]. + * + * @see Awakeable + */ + fun awakeableHandle(id: String): AwakeableHandle +} + +/** + * An [Awaitable] allows to await an asynchronous result. Once [await] is called, the execution + * waits until the asynchronous result is available. + * + * The result can be either a success or a failure. In case of a failure, [await] will throw a + * [StatusRuntimeException]. + * + * @param T type of the awaitable result + */ +sealed interface Awaitable { + suspend fun await(): T + + companion object { + fun all( + first: Awaitable<*>, + second: Awaitable<*>, + vararg others: Awaitable<*> + ): Awaitable { + return wrapAllAwaitable(listOf(first) + listOf(second) + others.asList()) + } + + fun any(first: Awaitable<*>, second: Awaitable<*>, vararg others: Awaitable<*>): AnyAwaitable { + return wrapAnyAwaitable(listOf(first) + listOf(second) + others.asList()) + } + } +} + +/** Like [kotlinx.coroutines.awaitAll], but for [Awaitable]. */ +suspend fun Collection>.awaitAll(): List { + return awaitAll(*toTypedArray()) +} + +/** Like [kotlinx.coroutines.awaitAll], but for [Awaitable]. */ +suspend fun awaitAll(vararg awaitables: Awaitable): List { + if (awaitables.isEmpty()) { + return emptyList() + } + if (awaitables.size == 1) { + return listOf(awaitables[0].await()) + } + wrapAllAwaitable(awaitables.asList()).await() + return awaitables.map { it.await() }.toList() +} + +sealed interface AnyAwaitable : Awaitable { + /** Same as [Awaitable.await], but returns the index of the first completed element. */ + suspend fun awaitIndex(): Int +} + +/** + * An [Awakeable] is a special type of [Awaitable] which can be arbitrarily completed by another + * service, by addressing it with its [id]. + * + * It can be used to let a service wait on a specific condition/result, which is fulfilled by + * another service or by an external system at a later point in time. + * + * For example, you can send a Kafka record including the [Awakeable.id], and then let another + * service consume from Kafka the responses of given external system interaction by using + * [RestateContext.awakeableHandle]. + */ +sealed interface Awakeable : Awaitable { + /** The unique identifier of this [Awakeable] instance. */ + val id: String +} + +/** This class represents a handle to an [Awakeable] created in another service. */ +sealed interface AwakeableHandle { + /** + * Complete with success the [Awakeable]. + * + * @param typeTag used to serialize the [Awakeable] result payload. + * @param payload the result payload. + * @see Awakeable + */ + suspend fun resolve(typeTag: TypeTag, payload: T) + + /** + * Complete with failure the [Awakeable]. + * + * @param reason the rejection reason. + * @see Awakeable + */ + suspend fun reject(reason: String) +} + +/** Marker interface for Kotlin Restate coroutine services. */ +interface RestateCoroutineService : BindableNonBlockingService { + /** @return an instance of the [RestateContext]. */ + fun restateContext(): RestateContext { + return RestateContextImpl(Syscalls.SYSCALLS_KEY.get()) + } +} diff --git a/sdk-lambda/src/test/kotlin/dev/restate/sdk/lambda/testservices/KotlinCounterService.kt b/sdk-lambda/src/test/kotlin/dev/restate/sdk/lambda/testservices/KotlinCounterService.kt index a1c227c4..1955bcec 100644 --- a/sdk-lambda/src/test/kotlin/dev/restate/sdk/lambda/testservices/KotlinCounterService.kt +++ b/sdk-lambda/src/test/kotlin/dev/restate/sdk/lambda/testservices/KotlinCounterService.kt @@ -1,6 +1,6 @@ package dev.restate.sdk.lambda.testservices -import RestateCoroutineService +import dev.restate.sdk.kotlin.RestateCoroutineService import kotlinx.coroutines.Dispatchers class KotlinCounterService : diff --git a/sdk-testing/src/test/java/dev/restate/sdk/testing/services/GreeterOne.java b/sdk-testing/src/test/java/dev/restate/sdk/testing/services/GreeterOne.java index 6472b712..7bd0ed75 100644 --- a/sdk-testing/src/test/java/dev/restate/sdk/testing/services/GreeterOne.java +++ b/sdk-testing/src/test/java/dev/restate/sdk/testing/services/GreeterOne.java @@ -116,7 +116,7 @@ public void forwardBackgroundGreeting( LOG.debug("Executing the GreeterOne.forwardBackgroundGreeting method"); RestateContext ctx = restateContext(); - ctx.backgroundCall( + ctx.oneWayCall( GreeterTwoGrpc.getCountForwardedGreetingsMethod(), GreeterTwoRequest.newBuilder().setName(request.getName()).build()); @@ -206,7 +206,7 @@ public void sleepAndGetWokenUp( // Tell GreeterTwo to wake us up with the awakeable identifier. AwakeServiceRequest info = AwakeServiceRequest.newBuilder().setId(a1.id()).build(); - ctx.backgroundCall(AwakeServiceGrpc.getAwakeMethod(), info); + ctx.oneWayCall(AwakeServiceGrpc.getAwakeMethod(), info); // Suspend until GreeterTwo wakes us up. String output = a1.await(); diff --git a/sdk-vertx/src/main/java/dev/restate/sdk/vertx/RestateHttpEndpointBuilder.java b/sdk-vertx/src/main/java/dev/restate/sdk/vertx/RestateHttpEndpointBuilder.java index 96ae2f40..d6b120fd 100644 --- a/sdk-vertx/src/main/java/dev/restate/sdk/vertx/RestateHttpEndpointBuilder.java +++ b/sdk-vertx/src/main/java/dev/restate/sdk/vertx/RestateHttpEndpointBuilder.java @@ -47,6 +47,11 @@ private RestateHttpEndpointBuilder(Vertx vertx) { this.vertx = vertx; } + /** Create a new builder. */ + public static RestateHttpEndpointBuilder builder() { + return new RestateHttpEndpointBuilder(Vertx.vertx()); + } + /** Create a new builder. */ public static RestateHttpEndpointBuilder builder(Vertx vertx) { return new RestateHttpEndpointBuilder(vertx); diff --git a/sdk-vertx/src/test/kotlin/dev/restate/sdk/vertx/testservices/GreeterKtService.kt b/sdk-vertx/src/test/kotlin/dev/restate/sdk/vertx/testservices/GreeterKtService.kt index b8ad1788..6d075ea8 100644 --- a/sdk-vertx/src/test/kotlin/dev/restate/sdk/vertx/testservices/GreeterKtService.kt +++ b/sdk-vertx/src/test/kotlin/dev/restate/sdk/vertx/testservices/GreeterKtService.kt @@ -1,10 +1,10 @@ package dev.restate.sdk.vertx.testservices -import RestateCoroutineService import dev.restate.sdk.core.impl.testservices.GreeterGrpcKt import dev.restate.sdk.core.impl.testservices.GreetingRequest import dev.restate.sdk.core.impl.testservices.GreetingResponse import dev.restate.sdk.core.impl.testservices.greetingResponse +import dev.restate.sdk.kotlin.RestateCoroutineService import kotlin.coroutines.CoroutineContext import kotlin.time.Duration.Companion.seconds diff --git a/settings.gradle.kts b/settings.gradle.kts index e677a698..7d2aac71 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -26,11 +26,11 @@ dependencyResolutionManagement { versionCatalogs { create("coreLibs") { - version("protobuf", "3.21.9") - version("grpc", "1.50.2") - version("grpckt", "1.3.0") - version("log4j", "2.19.0") - version("opentelemetry", "1.19.0") + version("protobuf", "3.24.3") + version("grpc", "1.58.0") + version("grpckt", "1.4.0") + version("log4j", "2.20.0") + version("opentelemetry", "1.30.1") library("protoc", "com.google.protobuf", "protoc").versionRef("protobuf") library("protobuf-java", "com.google.protobuf", "protobuf-java").versionRef("protobuf") @@ -54,10 +54,10 @@ dependencyResolutionManagement { library("javax-annotation-api", "org.apache.tomcat", "annotations-api").version("6.0.53") } create("vertxLibs") { - library("vertx-bom", "io.vertx:vertx-stack-depchain:4.4.4") + library("vertx-bom", "io.vertx:vertx-stack-depchain:4.4.5") library("vertx-core", "io.vertx", "vertx-core").withoutVersion() library("vertx-grpc-context-storage", "io.vertx", "vertx-grpc-context-storage") - .version("4.4.4") + .version("4.4.5") library("vertx-kotlin-coroutines", "io.vertx", "vertx-lang-kotlin-coroutines") .withoutVersion() library("vertx-junit5", "io.vertx", "vertx-junit5").withoutVersion() @@ -67,7 +67,7 @@ dependencyResolutionManagement { library("events", "com.amazonaws:aws-lambda-java-events:3.11.0") } create("jacksonLibs") { - version("jackson", "2.14.0") + version("jackson", "2.15.2") library("jackson-bom", "com.fasterxml.jackson", "jackson-bom").versionRef("jackson") library("jackson-core", "com.fasterxml.jackson.core", "jackson-core").withoutVersion() @@ -78,7 +78,7 @@ dependencyResolutionManagement { } create("kotlinLibs") { library("kotlinx-coroutines", "org.jetbrains.kotlinx", "kotlinx-coroutines-core") - .version("1.6.4") + .version("1.7.3") } create("testingLibs") { version("junit-jupiter", "5.9.1")