Skip to content

Commit

Permalink
Some refactor of internal state machines (#112)
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper authored Oct 2, 2023
1 parent 8b99612 commit b608f9e
Show file tree
Hide file tree
Showing 15 changed files with 356 additions and 313 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package dev.restate.sdk.core.impl;

import java.util.function.Consumer;

// Implements the base logic for state machines containing suspensable callbacks.
abstract class BaseSuspendableCallbackStateMachine<CB extends SuspendableCallback> {

private final CallbackHandle<CB> callbackHandle;
private final InputPublisherState inputPublisherState;

BaseSuspendableCallbackStateMachine() {
this.callbackHandle = new CallbackHandle<>();
this.inputPublisherState = new InputPublisherState();
}

void abort(Throwable cause) {
this.inputPublisherState.notifyClosed(cause);
}

public void tryFailCallback() {
callbackHandle.consume(
cb -> {
if (inputPublisherState.isSuspended()) {
cb.onSuspend();
} else if (inputPublisherState.isClosed()) {
cb.onError(inputPublisherState.getCloseCause());
}
});
}

public void consumeCallback(Consumer<CB> consumer) {
this.callbackHandle.consume(consumer);
}

public void consumeCallbackOrElse(Consumer<CB> consumer, Runnable elseRunnable) {
this.callbackHandle.consumeOrElse(consumer, elseRunnable);
}

public void assertCallbackNotSet(String reason) {
if (!this.callbackHandle.isEmpty()) {
throw new IllegalStateException(reason);
}
}

void setCallback(CB callback) {
if (inputPublisherState.isSuspended()) {
callback.onSuspend();
} else if (inputPublisherState.isClosed()) {
callback.onError(inputPublisherState.getCloseCause());
} else {
callbackHandle.set(callback);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package dev.restate.sdk.core.impl;

import java.util.function.Consumer;
import javax.annotation.Nullable;

/** Handle for callbacks. */
final class CallbackHandle<T> {

private @Nullable T cb = null;

public void set(T t) {
this.cb = t;
}

public boolean isEmpty() {
return this.cb == null;
}

public void consume(Consumer<T> consumer) {
if (this.cb != null) {
consumer.accept(pop());
}
}

public void consumeOrElse(Consumer<T> consumer, Runnable elseRunnable) {
if (this.cb != null) {
consumer.accept(pop());
} else {
elseRunnable.run();
}
}

private @Nullable T pop() {
T temp = this.cb;
this.cb = null;
return temp;
}
}
48 changes: 23 additions & 25 deletions sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/Entries.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ void checkEntryHeader(E expected, MessageLite actual) throws ProtocolException {

abstract void trace(E expected, Span span);

void updateLocalStateStorage(E expected, LocalStateStorage localStateStorage) {}
void updateUserStateStoreWithEntry(E expected, UserStateStore userStateStore) {}
}

abstract static class CompletableJournalEntry<E extends MessageLite, R> extends JournalEntry<E> {
Expand All @@ -32,12 +32,12 @@ ReadyResultInternal<R> parseCompletionResult(CompletionMessage actual) {
this.getClass().getName(), actual.getResultCase());
}

E tryCompleteWithLocalStateStorage(E expected, LocalStateStorage localStateStorage) {
E tryCompleteWithUserStateStorage(E expected, UserStateStore userStateStore) {
return expected;
}

void updateLocalStateStorageWithCompletion(
E expected, CompletionMessage actual, LocalStateStorage localStateStorage) {}
void updateUserStateStorageWithCompletion(
E expected, CompletionMessage actual, UserStateStore userStateStore) {}
}

static final class PollInputEntry<R extends MessageLite>
Expand Down Expand Up @@ -135,32 +135,30 @@ public ReadyResultInternal<R> parseCompletionResult(CompletionMessage actual) {
}

@Override
void updateLocalStateStorage(
GetStateEntryMessage expected, LocalStateStorage localStateStorage) {
localStateStorage.set(expected.getKey(), expected.getValue());
void updateUserStateStoreWithEntry(
GetStateEntryMessage expected, UserStateStore userStateStore) {
userStateStore.set(expected.getKey(), expected.getValue());
}

@Override
GetStateEntryMessage tryCompleteWithLocalStateStorage(
GetStateEntryMessage expected, LocalStateStorage localStateStorage) {
LocalStateStorage.State value = localStateStorage.get(expected.getKey());
if (value instanceof LocalStateStorage.Value) {
return expected.toBuilder().setValue(((LocalStateStorage.Value) value).getValue()).build();
} else if (value instanceof LocalStateStorage.Empty) {
GetStateEntryMessage tryCompleteWithUserStateStorage(
GetStateEntryMessage expected, UserStateStore userStateStore) {
UserStateStore.State value = userStateStore.get(expected.getKey());
if (value instanceof UserStateStore.Value) {
return expected.toBuilder().setValue(((UserStateStore.Value) value).getValue()).build();
} else if (value instanceof UserStateStore.Empty) {
return expected.toBuilder().setEmpty(Empty.getDefaultInstance()).build();
}
return expected;
}

@Override
void updateLocalStateStorageWithCompletion(
GetStateEntryMessage expected,
CompletionMessage actual,
LocalStateStorage localStateStorage) {
void updateUserStateStorageWithCompletion(
GetStateEntryMessage expected, CompletionMessage actual, UserStateStore userStateStore) {
if (actual.hasEmpty()) {
localStateStorage.clear(expected.getKey());
userStateStore.clear(expected.getKey());
} else {
localStateStorage.set(expected.getKey(), actual.getValue());
userStateStore.set(expected.getKey(), actual.getValue());
}
}
}
Expand All @@ -184,9 +182,9 @@ void checkEntryHeader(ClearStateEntryMessage expected, MessageLite actual)
}

@Override
void updateLocalStateStorage(
ClearStateEntryMessage expected, LocalStateStorage localStateStorage) {
localStateStorage.clear(expected.getKey());
void updateUserStateStoreWithEntry(
ClearStateEntryMessage expected, UserStateStore userStateStore) {
userStateStore.clear(expected.getKey());
}
}

Expand All @@ -209,9 +207,9 @@ void checkEntryHeader(SetStateEntryMessage expected, MessageLite actual)
}

@Override
void updateLocalStateStorage(
SetStateEntryMessage expected, LocalStateStorage localStateStorage) {
localStateStorage.set(expected.getKey(), expected.getValue());
void updateUserStateStoreWithEntry(
SetStateEntryMessage expected, UserStateStore userStateStore) {
userStateStore.set(expected.getKey(), expected.getValue());
}
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package dev.restate.sdk.core.impl;

import com.google.protobuf.MessageLite;
import java.util.ArrayDeque;
import java.util.Queue;

class IncomingEntriesStateMachine
extends BaseSuspendableCallbackStateMachine<IncomingEntriesStateMachine.OnEntryCallback> {

interface OnEntryCallback extends SuspendableCallback {
void onEntry(MessageLite msg);
}

private final Queue<MessageLite> unprocessedMessages;

IncomingEntriesStateMachine() {
this.unprocessedMessages = new ArrayDeque<>();
}

void offer(MessageLite msg) {
Util.assertIsEntry(msg);
this.consumeCallbackOrElse(cb -> cb.onEntry(msg), () -> this.unprocessedMessages.offer(msg));
}

void read(OnEntryCallback msgCallback) {
this.assertCallbackNotSet("Two concurrent reads were requested.");

MessageLite popped = this.unprocessedMessages.poll();
if (popped != null) {
msgCallback.onEntry(popped);
} else {
this.setCallback(msgCallback);
}
}

boolean isEmpty() {
return this.unprocessedMessages.isEmpty();
}

@Override
void abort(Throwable cause) {
super.abort(cause);
// We can't do anything else if the input stream is closed, so we just fail the callback, if any
this.tryFailCallback();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package dev.restate.sdk.core.impl;

import dev.restate.sdk.core.SuspendedException;
import javax.annotation.Nullable;

class InputPublisherState {

private @Nullable Throwable closeCause = null;

void notifyClosed(Throwable cause) {
closeCause = cause;
}

boolean isSuspended() {
return this.closeCause == SuspendedException.INSTANCE;
}

boolean isClosed() {
return this.closeCause != null;
}

public Throwable getCloseCause() {
return closeCause;
}
}
Loading

0 comments on commit b608f9e

Please sign in to comment.