Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API improvements #111

Merged
merged 5 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import org.gradle.api.tasks.testing.logging.TestLogEvent

plugins {
java
kotlin("jvm") version "1.6.20" apply false
kotlin("jvm") version "1.8.20" apply false

id("net.ltgt.errorprone") version "3.0.1"
id("com.github.jk1.dependency-license-report") version "2.0"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package dev.restate.sdk.examples

import RestateCoroutineService
import com.google.protobuf.Empty
import dev.restate.sdk.core.StateKey
import dev.restate.sdk.examples.generated.*
import dev.restate.sdk.kotlin.RestateCoroutineService
import dev.restate.sdk.vertx.RestateHttpEndpointBuilder
import io.vertx.core.Vertx
import io.vertx.kotlin.coroutines.dispatcher
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package dev.restate.sdk.examples

import RestateCoroutineService
import com.google.protobuf.Empty
import dev.restate.sdk.core.StateKey
import dev.restate.sdk.examples.generated.*
import dev.restate.sdk.kotlin.RestateCoroutineService
import kotlinx.coroutines.Dispatchers
import org.apache.logging.log4j.LogManager

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* <p>It can be used to let a service wait on a specific condition/result, which is fulfilled by
* another service or by an external system at a later point in time.
*
* <p>For example you can send a Kafka record including the {@link Awakeable#id()}, and then let
* <p>For example, you can send a Kafka record including the {@link Awakeable#id()}, and then let
* another service consume from Kafka the responses of given external system interaction by using
* {@link RestateContext#awakeableHandle(String)}.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package dev.restate.sdk.blocking;

import dev.restate.sdk.core.TypeTag;
import dev.restate.sdk.core.serde.Serde;
import javax.annotation.Nonnull;

/** This class represents a handle to an {@link Awakeable} created in another service. */
Expand All @@ -10,17 +9,16 @@ public interface AwakeableHandle {
/**
* Complete with success the {@link Awakeable}.
*
* @param payload the payload of the response. This can be either {@code byte[]}, {@link
* com.google.protobuf.ByteString}, or any object, which will be serialized by using the
* configured {@link Serde}. MUST NOT be null.
* @param typeTag used to serialize the {@link Awakeable} result payload.
* @param payload the result payload. MUST NOT be null.
* @see Awakeable
*/
<T> void resolve(TypeTag<T> typeTag, @Nonnull T payload);

/**
* Complete with failure the {@link Awakeable}.
*
* @param reason the rejection reason.
* @param reason the rejection reason. MUST NOT be null.
* @see Awakeable
*/
void reject(String reason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,33 +29,33 @@ public interface RestateContext {
* Gets the state stored under key, deserializing the raw value using the registered {@link Serde}
* in the interceptor.
*
* @param key identifying the state to get and its type
* @param key identifying the state to get and its type.
* @return an {@link Optional} containing the stored state deserialized or an empty {@link
* Optional} if not set yet
* @throws RuntimeException when the state cannot be deserialized
* Optional} if not set yet.
* @throws RuntimeException when the state cannot be deserialized.
*/
<T> Optional<T> get(StateKey<T> key);

/**
* Clears the state stored under key.
*
* @param key identifying the state to clear
* @param key identifying the state to clear.
*/
void clear(StateKey<?> key);

/**
* Sets the given value under the given key, serializing the value using the registered {@link
* Serde} in the interceptor.
*
* @param key identifying the value to store and its type
* @param key identifying the value to store and its type.
* @param value to store under the given key. MUST NOT be null.
*/
<T> void set(StateKey<T> key, @Nonnull T value);

/**
* Causes the current execution of the function invocation to sleep for the given duration.
*
* @param duration for which to sleep
* @param duration for which to sleep.
*/
default void sleep(Duration duration) {
timer(duration).await();
Expand All @@ -65,25 +65,41 @@ default void sleep(Duration duration) {
* Causes the start of a timer for the given duration. You can await on the timer end by invoking
* {@link Awaitable#await()}.
*
* @param duration for which to sleep
* @param duration for which to sleep.
*/
Awaitable<Void> timer(Duration duration);

/**
* Invoke another Restate service method.
*
* @return an {@link Awaitable} that wraps the Restate service method result
* @param methodDescriptor The method descriptor of the method to invoke. This is found in the
* generated `*Grpc` class.
* @param parameter the invocation request parameter.
* @return an {@link Awaitable} that wraps the Restate service method result.
*/
<T extends MessageLite, R extends MessageLite> Awaitable<R> call(
MethodDescriptor<T, R> methodDescriptor, T parameter);

/** Invoke another Restate service in a fire and forget fashion. */
<T extends MessageLite> void backgroundCall(
/**
* Invoke another Restate service without waiting for the response.
*
* @param methodDescriptor The method descriptor of the method to invoke. This is found in the
* generated `*Grpc` class.
* @param parameter the invocation request parameter.
*/
<T extends MessageLite> void oneWayCall(
MethodDescriptor<T, ? extends MessageLite> methodDescriptor, T parameter);

/**
* Similar to {@link #backgroundCall(MethodDescriptor, MessageLite)}, but executes the invocation
* after the provided delay.
* Invoke another Restate service without waiting for the response after the provided {@code
* delay} has elapsed.
*
* <p>This method returns immediately, as the timer is executed and awaited on Restate.
*
* @param methodDescriptor The method descriptor of the method to invoke. This is found in the
* generated {@code *Grpc} class.
* @param parameter the invocation request parameter.
* @param delay time to wait before executing the call.
*/
<T extends MessageLite> void delayedCall(
MethodDescriptor<T, ? extends MessageLite> methodDescriptor, T parameter, Duration delay);
Expand All @@ -99,10 +115,10 @@ default <T> T sideEffect(Class<T> clazz, Supplier<T> action) {
*
* <p>Use this function if you want to perform non-deterministic operations.
*
* @param typeTag the type tag of the return value
* @param action to execute for its side effects
* @param <T> type of the return value
* @return value of the side effect operation
* @param typeTag the type tag of the return value, used to serialize/deserialize it.
* @param action to execute for its side effects.
* @param <T> type of the return value.
* @return value of the side effect operation.
*/
<T> T sideEffect(TypeTag<T> typeTag, Supplier<T> action);

Expand All @@ -129,15 +145,16 @@ default <T> Awakeable<T> awakeable(Class<T> type) {
* service consume from Kafka the responses of given external system interaction by using {@link
* #awakeableHandle(String)}.
*
* @param typeTag the response type tag to use for deserializing
* @return the result value of the external system interaction
* @param typeTag the response type tag to use for deserializing the {@link Awakeable} result.
* @return the {@link Awakeable} to await on.
* @see Awakeable
*/
<T> Awakeable<T> awakeable(TypeTag<T> typeTag);

/**
* Create a new {@link AwakeableHandle} for the provided identifier. You can use it to {@link
* AwakeableHandle#resolve(TypeTag, Object)} the linked {@link Awakeable}.
* AwakeableHandle#resolve(TypeTag, Object)} or {@link AwakeableHandle#reject(String)} the linked
* {@link Awakeable}.
*
* @see Awakeable
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public <T extends MessageLite, R extends MessageLite> Awaitable<R> call(
}

@Override
public <T extends MessageLite> void backgroundCall(
public <T extends MessageLite> void oneWayCall(
MethodDescriptor<T, ? extends MessageLite> methodDescriptor, T parameter) {
Util.<Void>blockOnSyscall(cb -> syscalls.backgroundCall(methodDescriptor, parameter, null, cb));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@
import dev.restate.sdk.core.syscalls.AnyDeferredResult;
import dev.restate.sdk.core.syscalls.DeferredResult;
import dev.restate.sdk.core.syscalls.Syscalls;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

public interface SyscallsInternal extends Syscalls {

@Override
default AnyDeferredResult createAnyDeferred(Collection<DeferredResult<?>> children) {
default AnyDeferredResult createAnyDeferred(List<DeferredResult<?>> children) {
return DeferredResults.any(
children.stream().map(dr -> (DeferredResultInternal<?>) dr).collect(Collectors.toList()));
}

@Override
default DeferredResult<Void> createAllDeferred(Collection<DeferredResult<?>> children) {
default DeferredResult<Void> createAllDeferred(List<DeferredResult<?>> children) {
return DeferredResults.all(
children.stream().map(dr -> (DeferredResultInternal<?>) dr).collect(Collectors.toList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ private static class SideEffectGuard extends GreeterGrpc.GreeterImplBase
public void greet(GreetingRequest request, StreamObserver<GreetingResponse> responseObserver) {
RestateContext ctx = restateContext();
ctx.sideEffect(
() -> ctx.backgroundCall(GreeterGrpc.getGreetMethod(), greetingRequest("something")));
() -> ctx.oneWayCall(GreeterGrpc.getGreetMethod(), greetingRequest("something")));

throw new IllegalStateException("This point should not be reached");
}
Expand Down
11 changes: 10 additions & 1 deletion sdk-core/src/main/java/dev/restate/sdk/core/TypeTag.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,35 @@
import java.util.function.Function;

/**
* Marker interface to carry non-class type tags and generic at the same time.
* Marker interface defining the concrete type to serialize/deserialize.
*
* <p>It carries non-class type tags and generic at the same time.
*
* <p>This interface can be used by implementations of {@link dev.restate.sdk.core.serde.Serde} to
* support type tags different from {@link Class}, for example to circumvent type erasure (like
* Jackson's TypeReference).
*/
public interface TypeTag<T> {

/** Type tag for void. */
TypeTag<Void> VOID = TypeTag.ofClass(Void.TYPE);

/** Type tag for a byte array. */
TypeTag<byte[]> BYTES = TypeTag.ofClass(byte[].class);

/** Type tag for a UTF_8 encoded string. */
TypeTag<String> STRING_UTF8 =
TypeTag.using(
s -> s.getBytes(StandardCharsets.UTF_8), b -> new String(b, StandardCharsets.UTF_8));

Object get();

/** Create a TypeTag from a class. */
static <T> TypeTag<T> ofClass(Class<T> clazz) {
return () -> clazz;
}

/** Create a type tag providing custom serializer/deserializer functions. */
static <T> TypeTag<T> using(Function<T, byte[]> serializer, Function<byte[], T> deserializer) {
return new CustomSerdeFunctionsTypeTag<>(serializer, deserializer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import io.grpc.Context;
import io.grpc.MethodDescriptor;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
Expand Down Expand Up @@ -88,7 +88,7 @@ <T> void resolveAwakeable(

<T> void resolveDeferred(DeferredResult<T> deferredToResolve, SyscallCallback<Void> callback);

AnyDeferredResult createAnyDeferred(Collection<DeferredResult<?>> children);
AnyDeferredResult createAnyDeferred(List<DeferredResult<?>> children);

DeferredResult<Void> createAllDeferred(Collection<DeferredResult<?>> children);
DeferredResult<Void> createAllDeferred(List<DeferredResult<?>> children);
}
61 changes: 50 additions & 11 deletions sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/Awaitables.kt
Original file line number Diff line number Diff line change
@@ -1,23 +1,17 @@
package dev.restate.sdk.kotlin

import dev.restate.sdk.core.TypeTag
import dev.restate.sdk.core.syscalls.AnyDeferredResult
import dev.restate.sdk.core.syscalls.DeferredResult
import dev.restate.sdk.core.syscalls.Syscalls
import java.util.*
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.suspendCancellableCoroutine

sealed interface Awaitable<T> {
suspend fun await(): T
}

sealed interface Awakeable<T> : Awaitable<T> {
val id: String
}

internal abstract class BaseAwaitableImpl<JAVA_T, KT_T>
internal constructor(
private val syscalls: Syscalls,
protected val deferredResult: DeferredResult<JAVA_T>
internal val syscalls: Syscalls,
internal val deferredResult: DeferredResult<JAVA_T>
) : Awaitable<KT_T> {

abstract fun unpack(): KT_T
Expand Down Expand Up @@ -56,6 +50,45 @@ internal constructor(syscalls: Syscalls, deferredResult: DeferredResult<Void>) :
}
}

internal class AnyAwaitableImpl
internal constructor(syscalls: Syscalls, deferredResult: DeferredResult<Any>) :
BaseAwaitableImpl<Any, Any>(syscalls, deferredResult), AnyAwaitable {
override fun unpack(): Any {
val readyResult = deferredResult.toReadyResult()!!
if (!readyResult.isSuccess) {
throw readyResult.failure!!
}
return readyResult.result!!
}

override suspend fun awaitIndex(): Int {
if (!deferredResult.isCompleted) {
suspendCancellableCoroutine { cont: CancellableContinuation<Unit> ->
syscalls.resolveDeferred(deferredResult, completingUnitContinuation(cont))
}
}

return (deferredResult as AnyDeferredResult).completedIndex().orElseThrow {
IllegalStateException(
"completedIndex is empty when expecting a value. This looks like an SDK bug.")
}
}
}

internal fun wrapAllAwaitable(awaitables: List<Awaitable<*>>): Awaitable<Unit> {
val syscalls = (awaitables.get(0) as BaseAwaitableImpl<*, *>).syscalls
return UnitAwaitableImpl(
syscalls,
syscalls.createAllDeferred(awaitables.map { (it as BaseAwaitableImpl<*, *>).deferredResult }))
}

internal fun wrapAnyAwaitable(awaitables: List<Awaitable<*>>): AnyAwaitable {
val syscalls = (awaitables.get(0) as BaseAwaitableImpl<*, *>).syscalls
return AnyAwaitableImpl(
syscalls,
syscalls.createAnyDeferred(awaitables.map { (it as BaseAwaitableImpl<*, *>).deferredResult }))
}

internal class AwakeableImpl<T>
internal constructor(
syscalls: Syscalls,
Expand All @@ -64,9 +97,15 @@ internal constructor(
) : NonNullAwaitableImpl<T>(syscalls, deferredResult), Awakeable<T>

internal class AwakeableHandleImpl(val syscalls: Syscalls, val id: String) : AwakeableHandle {
override suspend fun <T : Any> complete(typeTag: TypeTag<T>, payload: T) {
override suspend fun <T : Any> resolve(typeTag: TypeTag<T>, payload: T) {
return suspendCancellableCoroutine { cont: CancellableContinuation<Unit> ->
syscalls.resolveAwakeable(id, typeTag, payload, completingUnitContinuation(cont))
}
}

override suspend fun reject(reason: String) {
return suspendCancellableCoroutine { cont: CancellableContinuation<Unit> ->
syscalls.rejectAwakeable(id, reason, completingUnitContinuation(cont))
}
}
}
Loading