Skip to content

Commit

Permalink
API improvements (#111)
Browse files Browse the repository at this point in the history
* Improvements to the Kotlin API, including KDocs and any/all interface

* backgroundCall -> oneWayCall

* Collection -> List because order of deferred matters.

* Add RestateHttpEndpointBuilder#builder()

* Dependency bumps
  • Loading branch information
slinkydeveloper authored Sep 29, 2023
1 parent f39ef15 commit 2c8ed80
Show file tree
Hide file tree
Showing 21 changed files with 390 additions and 134 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import org.gradle.api.tasks.testing.logging.TestLogEvent

plugins {
java
kotlin("jvm") version "1.6.20" apply false
kotlin("jvm") version "1.8.20" apply false

id("net.ltgt.errorprone") version "3.0.1"
id("com.github.jk1.dependency-license-report") version "2.0"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package dev.restate.sdk.examples

import RestateCoroutineService
import com.google.protobuf.Empty
import dev.restate.sdk.core.StateKey
import dev.restate.sdk.examples.generated.*
import dev.restate.sdk.kotlin.RestateCoroutineService
import dev.restate.sdk.vertx.RestateHttpEndpointBuilder
import io.vertx.core.Vertx
import io.vertx.kotlin.coroutines.dispatcher
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package dev.restate.sdk.examples

import RestateCoroutineService
import com.google.protobuf.Empty
import dev.restate.sdk.core.StateKey
import dev.restate.sdk.examples.generated.*
import dev.restate.sdk.kotlin.RestateCoroutineService
import kotlinx.coroutines.Dispatchers
import org.apache.logging.log4j.LogManager

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* <p>It can be used to let a service wait on a specific condition/result, which is fulfilled by
* another service or by an external system at a later point in time.
*
* <p>For example you can send a Kafka record including the {@link Awakeable#id()}, and then let
* <p>For example, you can send a Kafka record including the {@link Awakeable#id()}, and then let
* another service consume from Kafka the responses of given external system interaction by using
* {@link RestateContext#awakeableHandle(String)}.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package dev.restate.sdk.blocking;

import dev.restate.sdk.core.TypeTag;
import dev.restate.sdk.core.serde.Serde;
import javax.annotation.Nonnull;

/** This class represents a handle to an {@link Awakeable} created in another service. */
Expand All @@ -10,17 +9,16 @@ public interface AwakeableHandle {
/**
* Complete with success the {@link Awakeable}.
*
* @param payload the payload of the response. This can be either {@code byte[]}, {@link
* com.google.protobuf.ByteString}, or any object, which will be serialized by using the
* configured {@link Serde}. MUST NOT be null.
* @param typeTag used to serialize the {@link Awakeable} result payload.
* @param payload the result payload. MUST NOT be null.
* @see Awakeable
*/
<T> void resolve(TypeTag<T> typeTag, @Nonnull T payload);

/**
* Complete with failure the {@link Awakeable}.
*
* @param reason the rejection reason.
* @param reason the rejection reason. MUST NOT be null.
* @see Awakeable
*/
void reject(String reason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,33 +29,33 @@ public interface RestateContext {
* Gets the state stored under key, deserializing the raw value using the registered {@link Serde}
* in the interceptor.
*
* @param key identifying the state to get and its type
* @param key identifying the state to get and its type.
* @return an {@link Optional} containing the stored state deserialized or an empty {@link
* Optional} if not set yet
* @throws RuntimeException when the state cannot be deserialized
* Optional} if not set yet.
* @throws RuntimeException when the state cannot be deserialized.
*/
<T> Optional<T> get(StateKey<T> key);

/**
* Clears the state stored under key.
*
* @param key identifying the state to clear
* @param key identifying the state to clear.
*/
void clear(StateKey<?> key);

/**
* Sets the given value under the given key, serializing the value using the registered {@link
* Serde} in the interceptor.
*
* @param key identifying the value to store and its type
* @param key identifying the value to store and its type.
* @param value to store under the given key. MUST NOT be null.
*/
<T> void set(StateKey<T> key, @Nonnull T value);

/**
* Causes the current execution of the function invocation to sleep for the given duration.
*
* @param duration for which to sleep
* @param duration for which to sleep.
*/
default void sleep(Duration duration) {
timer(duration).await();
Expand All @@ -65,25 +65,41 @@ default void sleep(Duration duration) {
* Causes the start of a timer for the given duration. You can await on the timer end by invoking
* {@link Awaitable#await()}.
*
* @param duration for which to sleep
* @param duration for which to sleep.
*/
Awaitable<Void> timer(Duration duration);

/**
* Invoke another Restate service method.
*
* @return an {@link Awaitable} that wraps the Restate service method result
* @param methodDescriptor The method descriptor of the method to invoke. This is found in the
* generated `*Grpc` class.
* @param parameter the invocation request parameter.
* @return an {@link Awaitable} that wraps the Restate service method result.
*/
<T extends MessageLite, R extends MessageLite> Awaitable<R> call(
MethodDescriptor<T, R> methodDescriptor, T parameter);

/** Invoke another Restate service in a fire and forget fashion. */
<T extends MessageLite> void backgroundCall(
/**
* Invoke another Restate service without waiting for the response.
*
* @param methodDescriptor The method descriptor of the method to invoke. This is found in the
* generated `*Grpc` class.
* @param parameter the invocation request parameter.
*/
<T extends MessageLite> void oneWayCall(
MethodDescriptor<T, ? extends MessageLite> methodDescriptor, T parameter);

/**
* Similar to {@link #backgroundCall(MethodDescriptor, MessageLite)}, but executes the invocation
* after the provided delay.
* Invoke another Restate service without waiting for the response after the provided {@code
* delay} has elapsed.
*
* <p>This method returns immediately, as the timer is executed and awaited on Restate.
*
* @param methodDescriptor The method descriptor of the method to invoke. This is found in the
* generated {@code *Grpc} class.
* @param parameter the invocation request parameter.
* @param delay time to wait before executing the call.
*/
<T extends MessageLite> void delayedCall(
MethodDescriptor<T, ? extends MessageLite> methodDescriptor, T parameter, Duration delay);
Expand All @@ -99,10 +115,10 @@ default <T> T sideEffect(Class<T> clazz, Supplier<T> action) {
*
* <p>Use this function if you want to perform non-deterministic operations.
*
* @param typeTag the type tag of the return value
* @param action to execute for its side effects
* @param <T> type of the return value
* @return value of the side effect operation
* @param typeTag the type tag of the return value, used to serialize/deserialize it.
* @param action to execute for its side effects.
* @param <T> type of the return value.
* @return value of the side effect operation.
*/
<T> T sideEffect(TypeTag<T> typeTag, Supplier<T> action);

Expand All @@ -129,15 +145,16 @@ default <T> Awakeable<T> awakeable(Class<T> type) {
* service consume from Kafka the responses of given external system interaction by using {@link
* #awakeableHandle(String)}.
*
* @param typeTag the response type tag to use for deserializing
* @return the result value of the external system interaction
* @param typeTag the response type tag to use for deserializing the {@link Awakeable} result.
* @return the {@link Awakeable} to await on.
* @see Awakeable
*/
<T> Awakeable<T> awakeable(TypeTag<T> typeTag);

/**
* Create a new {@link AwakeableHandle} for the provided identifier. You can use it to {@link
* AwakeableHandle#resolve(TypeTag, Object)} the linked {@link Awakeable}.
* AwakeableHandle#resolve(TypeTag, Object)} or {@link AwakeableHandle#reject(String)} the linked
* {@link Awakeable}.
*
* @see Awakeable
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public <T extends MessageLite, R extends MessageLite> Awaitable<R> call(
}

@Override
public <T extends MessageLite> void backgroundCall(
public <T extends MessageLite> void oneWayCall(
MethodDescriptor<T, ? extends MessageLite> methodDescriptor, T parameter) {
Util.<Void>blockOnSyscall(cb -> syscalls.backgroundCall(methodDescriptor, parameter, null, cb));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@
import dev.restate.sdk.core.syscalls.AnyDeferredResult;
import dev.restate.sdk.core.syscalls.DeferredResult;
import dev.restate.sdk.core.syscalls.Syscalls;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

public interface SyscallsInternal extends Syscalls {

@Override
default AnyDeferredResult createAnyDeferred(Collection<DeferredResult<?>> children) {
default AnyDeferredResult createAnyDeferred(List<DeferredResult<?>> children) {
return DeferredResults.any(
children.stream().map(dr -> (DeferredResultInternal<?>) dr).collect(Collectors.toList()));
}

@Override
default DeferredResult<Void> createAllDeferred(Collection<DeferredResult<?>> children) {
default DeferredResult<Void> createAllDeferred(List<DeferredResult<?>> children) {
return DeferredResults.all(
children.stream().map(dr -> (DeferredResultInternal<?>) dr).collect(Collectors.toList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ private static class SideEffectGuard extends GreeterGrpc.GreeterImplBase
public void greet(GreetingRequest request, StreamObserver<GreetingResponse> responseObserver) {
RestateContext ctx = restateContext();
ctx.sideEffect(
() -> ctx.backgroundCall(GreeterGrpc.getGreetMethod(), greetingRequest("something")));
() -> ctx.oneWayCall(GreeterGrpc.getGreetMethod(), greetingRequest("something")));

throw new IllegalStateException("This point should not be reached");
}
Expand Down
11 changes: 10 additions & 1 deletion sdk-core/src/main/java/dev/restate/sdk/core/TypeTag.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,35 @@
import java.util.function.Function;

/**
* Marker interface to carry non-class type tags and generic at the same time.
* Marker interface defining the concrete type to serialize/deserialize.
*
* <p>It carries non-class type tags and generic at the same time.
*
* <p>This interface can be used by implementations of {@link dev.restate.sdk.core.serde.Serde} to
* support type tags different from {@link Class}, for example to circumvent type erasure (like
* Jackson's TypeReference).
*/
public interface TypeTag<T> {

/** Type tag for void. */
TypeTag<Void> VOID = TypeTag.ofClass(Void.TYPE);

/** Type tag for a byte array. */
TypeTag<byte[]> BYTES = TypeTag.ofClass(byte[].class);

/** Type tag for a UTF_8 encoded string. */
TypeTag<String> STRING_UTF8 =
TypeTag.using(
s -> s.getBytes(StandardCharsets.UTF_8), b -> new String(b, StandardCharsets.UTF_8));

Object get();

/** Create a TypeTag from a class. */
static <T> TypeTag<T> ofClass(Class<T> clazz) {
return () -> clazz;
}

/** Create a type tag providing custom serializer/deserializer functions. */
static <T> TypeTag<T> using(Function<T, byte[]> serializer, Function<byte[], T> deserializer) {
return new CustomSerdeFunctionsTypeTag<>(serializer, deserializer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import io.grpc.Context;
import io.grpc.MethodDescriptor;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
Expand Down Expand Up @@ -88,7 +88,7 @@ <T> void resolveAwakeable(

<T> void resolveDeferred(DeferredResult<T> deferredToResolve, SyscallCallback<Void> callback);

AnyDeferredResult createAnyDeferred(Collection<DeferredResult<?>> children);
AnyDeferredResult createAnyDeferred(List<DeferredResult<?>> children);

DeferredResult<Void> createAllDeferred(Collection<DeferredResult<?>> children);
DeferredResult<Void> createAllDeferred(List<DeferredResult<?>> children);
}
61 changes: 50 additions & 11 deletions sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/Awaitables.kt
Original file line number Diff line number Diff line change
@@ -1,23 +1,17 @@
package dev.restate.sdk.kotlin

import dev.restate.sdk.core.TypeTag
import dev.restate.sdk.core.syscalls.AnyDeferredResult
import dev.restate.sdk.core.syscalls.DeferredResult
import dev.restate.sdk.core.syscalls.Syscalls
import java.util.*
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.suspendCancellableCoroutine

sealed interface Awaitable<T> {
suspend fun await(): T
}

sealed interface Awakeable<T> : Awaitable<T> {
val id: String
}

internal abstract class BaseAwaitableImpl<JAVA_T, KT_T>
internal constructor(
private val syscalls: Syscalls,
protected val deferredResult: DeferredResult<JAVA_T>
internal val syscalls: Syscalls,
internal val deferredResult: DeferredResult<JAVA_T>
) : Awaitable<KT_T> {

abstract fun unpack(): KT_T
Expand Down Expand Up @@ -56,6 +50,45 @@ internal constructor(syscalls: Syscalls, deferredResult: DeferredResult<Void>) :
}
}

internal class AnyAwaitableImpl
internal constructor(syscalls: Syscalls, deferredResult: DeferredResult<Any>) :
BaseAwaitableImpl<Any, Any>(syscalls, deferredResult), AnyAwaitable {
override fun unpack(): Any {
val readyResult = deferredResult.toReadyResult()!!
if (!readyResult.isSuccess) {
throw readyResult.failure!!
}
return readyResult.result!!
}

override suspend fun awaitIndex(): Int {
if (!deferredResult.isCompleted) {
suspendCancellableCoroutine { cont: CancellableContinuation<Unit> ->
syscalls.resolveDeferred(deferredResult, completingUnitContinuation(cont))
}
}

return (deferredResult as AnyDeferredResult).completedIndex().orElseThrow {
IllegalStateException(
"completedIndex is empty when expecting a value. This looks like an SDK bug.")
}
}
}

internal fun wrapAllAwaitable(awaitables: List<Awaitable<*>>): Awaitable<Unit> {
val syscalls = (awaitables.get(0) as BaseAwaitableImpl<*, *>).syscalls
return UnitAwaitableImpl(
syscalls,
syscalls.createAllDeferred(awaitables.map { (it as BaseAwaitableImpl<*, *>).deferredResult }))
}

internal fun wrapAnyAwaitable(awaitables: List<Awaitable<*>>): AnyAwaitable {
val syscalls = (awaitables.get(0) as BaseAwaitableImpl<*, *>).syscalls
return AnyAwaitableImpl(
syscalls,
syscalls.createAnyDeferred(awaitables.map { (it as BaseAwaitableImpl<*, *>).deferredResult }))
}

internal class AwakeableImpl<T>
internal constructor(
syscalls: Syscalls,
Expand All @@ -64,9 +97,15 @@ internal constructor(
) : NonNullAwaitableImpl<T>(syscalls, deferredResult), Awakeable<T>

internal class AwakeableHandleImpl(val syscalls: Syscalls, val id: String) : AwakeableHandle {
override suspend fun <T : Any> complete(typeTag: TypeTag<T>, payload: T) {
override suspend fun <T : Any> resolve(typeTag: TypeTag<T>, payload: T) {
return suspendCancellableCoroutine { cont: CancellableContinuation<Unit> ->
syscalls.resolveAwakeable(id, typeTag, payload, completingUnitContinuation(cont))
}
}

override suspend fun reject(reason: String) {
return suspendCancellableCoroutine { cont: CancellableContinuation<Unit> ->
syscalls.rejectAwakeable(id, reason, completingUnitContinuation(cont))
}
}
}
Loading

0 comments on commit 2c8ed80

Please sign in to comment.