Skip to content

Commit

Permalink
Add Workflow service type and handler type (#1506)
Browse files Browse the repository at this point in the history
* Adds the new service type and handler type, plus reorganizes a bit the internal representation of service types
* Implements the run-once semantics of workflow methods
* Allows the user to tweak the completion_retention_time of the workflow from the admin api
  • Loading branch information
slinkydeveloper authored May 15, 2024
1 parent c930d41 commit ab3c57e
Show file tree
Hide file tree
Showing 36 changed files with 1,048 additions and 531 deletions.
3 changes: 2 additions & 1 deletion cli/src/ui/service_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ pub fn create_service_handlers_table_diff(
pub fn icon_for_service_type(svc_type: &ServiceType) -> Icon {
match svc_type {
ServiceType::Service => Icon("", ""),
ServiceType::VirtualObject => Icon("⬅️ 🚶🚶🚶", "keyed"),
ServiceType::VirtualObject => Icon("⬅️ 🚶🚶🚶", "virtual object"),
ServiceType::Workflow => Icon("📝", "workflow"),
}
}

Expand Down
6 changes: 6 additions & 0 deletions crates/admin/src/rest_api/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ pub async fn modify_service<V>(
#[request_body(required = true)] Json(ModifyServiceRequest {
public,
idempotency_retention,
workflow_completion_retention,
}): Json<ModifyServiceRequest>,
) -> Result<Json<ServiceMetadata>, MetaApiError> {
let mut modify_request = vec![];
Expand All @@ -98,6 +99,11 @@ pub async fn modify_service<V>(
new_idempotency_retention.into(),
));
}
if let Some(new_workflow_completion_retention) = workflow_completion_retention {
modify_request.push(ModifyServiceChange::WorkflowCompletionRetention(
new_workflow_completion_retention.into(),
));
}

if modify_request.is_empty() {
// No need to do anything
Expand Down
10 changes: 9 additions & 1 deletion crates/admin/src/schema_registry/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ use http::Uri;
use restate_core::metadata_store::ReadModifyWriteError;
use restate_core::ShutdownError;
use restate_schema_api::invocation_target::BadInputContentType;
use restate_service_protocol::discovery::schema;
use restate_types::errors::GenericError;
use restate_types::identifiers::DeploymentId;
use restate_types::invocation::ServiceType;

#[derive(Debug, thiserror::Error, codederror::CodedError)]
pub enum SchemaRegistryError {
Expand Down Expand Up @@ -87,6 +89,12 @@ pub enum ServiceError {
#[error("the handler '{0}' output content-type is not valid: {1}")]
#[code(unknown)]
BadOutputContentType(String, InvalidHeaderValue),
#[error("invalid combination of service type and handler type '({0}, {1:?})'")]
#[code(unknown)]
BadServiceAndHandlerType(ServiceType, Option<schema::HandlerType>),
#[error("modifying retention time for service type {0} is unsupported")]
#[code(unknown)]
CannotModifyRetentionTime(ServiceType),
}

#[derive(Debug, thiserror::Error, codederror::CodedError)]
Expand All @@ -107,7 +115,7 @@ pub enum SubscriptionError {
InvalidServiceSinkAuthority(Uri),
#[error("invalid sink URI '{0}': cannot find service/handler specified in the sink URI.")]
SinkServiceNotFound(Uri),
#[error("invalid sink URI '{0}': virtual object shared handlers cannot be used as sinks.")]
#[error("invalid sink URI '{0}': shared handlers cannot be used as sinks.")]
InvalidSinkSharedHandler(Uri),

#[error(transparent)]
Expand Down
3 changes: 2 additions & 1 deletion crates/admin/src/schema_registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ impl ApplyMode {
pub enum ModifyServiceChange {
Public(bool),
IdempotencyRetention(Duration),
WorkflowCompletionRetention(Duration),
}

/// Responsible for updating the registered schema information. This includes the discovery of
Expand Down Expand Up @@ -213,7 +214,7 @@ impl<V> SchemaRegistry<V> {
.is_some()
{
let mut updater = SchemaUpdater::from(schema_information);
updater.modify_service(service_name.clone(), changes.clone());
updater.modify_service(service_name.clone(), changes.clone())?;
Ok(updater.into_inner())
} else {
Err(SchemaError::NotFound(format!(
Expand Down
86 changes: 68 additions & 18 deletions crates/admin/src/schema_registry/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ use restate_schema::Schema;
use restate_schema_api::deployment::DeploymentMetadata;
use restate_schema_api::invocation_target::{
InputRules, InputValidationRule, InvocationTargetMetadata, OutputContentTypeRule, OutputRules,
DEFAULT_IDEMPOTENCY_RETENTION,
DEFAULT_IDEMPOTENCY_RETENTION, DEFAULT_WORKFLOW_COMPLETION_RETENTION,
};
use restate_schema_api::subscription::{
EventReceiverServiceType, Sink, Source, Subscription, SubscriptionValidator,
};
use restate_service_protocol::discovery::schema;
use restate_types::identifiers::{DeploymentId, SubscriptionId};
use restate_types::invocation::{HandlerType, ServiceType};
use restate_types::invocation::{
InvocationTargetType, ServiceType, VirtualObjectHandlerType, WorkflowHandlerType,
};
use serde::{Deserialize, Serialize};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
Expand Down Expand Up @@ -127,7 +129,6 @@ impl SchemaUpdater {
for (service_name, service) in proposed_services {
let service_type = ServiceType::from(service.ty);
let handlers = DiscoveredHandlerMetadata::compute_handlers(
service_type,
service
.handlers
.into_iter()
Expand Down Expand Up @@ -201,6 +202,11 @@ impl SchemaUpdater {
public: true,
},
idempotency_retention: DEFAULT_IDEMPOTENCY_RETENTION,
workflow_completion_retention: if service_type == ServiceType::Workflow {
Some(DEFAULT_WORKFLOW_COMPLETION_RETENTION)
} else {
None
},
}
};

Expand Down Expand Up @@ -329,16 +335,19 @@ impl SchemaUpdater {
))
})?;

let ty = match (service_schemas.ty, handler_schemas.target_meta.handler_ty) {
(ServiceType::VirtualObject, HandlerType::Exclusive) => {
let ty = match handler_schemas.target_meta.target_ty {
InvocationTargetType::Workflow(WorkflowHandlerType::Workflow) => {
EventReceiverServiceType::Workflow
}
InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Exclusive) => {
EventReceiverServiceType::VirtualObject
}
(ServiceType::VirtualObject, HandlerType::Shared) => {
InvocationTargetType::Workflow(_) | InvocationTargetType::VirtualObject(_) => {
return Err(SchemaError::Subscription(
SubscriptionError::InvalidSinkSharedHandler(sink),
))
}
(ServiceType::Service, _) => EventReceiverServiceType::Service,
InvocationTargetType::Service => EventReceiverServiceType::Service,
};

Sink::Service {
Expand Down Expand Up @@ -382,7 +391,11 @@ impl SchemaUpdater {
}
}

pub fn modify_service(&mut self, name: String, changes: Vec<ModifyServiceChange>) {
pub fn modify_service(
&mut self,
name: String,
changes: Vec<ModifyServiceChange>,
) -> Result<(), SchemaError> {
if let Some(schemas) = self.schema_information.services.get_mut(&name) {
for command in changes {
match command {
Expand All @@ -398,18 +411,38 @@ impl SchemaUpdater {
h.target_meta.idempotency_retention = new_idempotency_retention;
}
}
ModifyServiceChange::WorkflowCompletionRetention(
new_workflow_completion_retention,
) => {
if schemas.ty != ServiceType::Workflow {
return Err(SchemaError::Service(
ServiceError::CannotModifyRetentionTime(schemas.ty),
));
}
schemas.workflow_completion_retention =
Some(new_workflow_completion_retention);
for h in schemas.handlers.values_mut().filter(|w| {
w.target_meta.target_ty
== InvocationTargetType::Workflow(WorkflowHandlerType::Workflow)
}) {
h.target_meta.completion_retention =
Some(new_workflow_completion_retention);
}
}
}
}
}

self.modified = true;

Ok(())
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DiscoveredHandlerMetadata {
name: String,
ty: HandlerType,
ty: InvocationTargetType,
input: InputRules,
output: OutputRules,
}
Expand All @@ -419,15 +452,27 @@ impl DiscoveredHandlerMetadata {
service_type: ServiceType,
handler: schema::Handler,
) -> Result<Self, ServiceError> {
let handler_type = match handler.ty {
None => HandlerType::default_for_service_type(service_type),
Some(schema::HandlerType::Exclusive) => HandlerType::Exclusive,
Some(schema::HandlerType::Shared) => HandlerType::Shared,
let ty = match (service_type, handler.ty) {
(ServiceType::Service, None | Some(schema::HandlerType::Shared)) => {
InvocationTargetType::Service
}
(ServiceType::VirtualObject, None | Some(schema::HandlerType::Exclusive)) => {
InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Exclusive)
}
(ServiceType::VirtualObject, Some(schema::HandlerType::Shared)) => {
InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Shared)
}
_ => {
return Err(ServiceError::BadServiceAndHandlerType(
service_type,
handler.ty,
))
}
};

Ok(Self {
name: handler.name.to_string(),
ty: handler_type,
ty,
input: handler
.input
.map(|s| DiscoveredHandlerMetadata::input_rules_from_schema(&handler.name, s))
Expand Down Expand Up @@ -495,7 +540,6 @@ impl DiscoveredHandlerMetadata {
}

fn compute_handlers(
service_ty: ServiceType,
handlers: Vec<DiscoveredHandlerMetadata>,
) -> HashMap<String, HandlerSchemas> {
handlers
Expand All @@ -507,8 +551,14 @@ impl DiscoveredHandlerMetadata {
target_meta: InvocationTargetMetadata {
public: true,
idempotency_retention: DEFAULT_IDEMPOTENCY_RETENTION,
service_ty,
handler_ty: handler.ty,
completion_retention: if handler.ty
== InvocationTargetType::Workflow(WorkflowHandlerType::Workflow)
{
Some(DEFAULT_WORKFLOW_COMPLETION_RETENTION)
} else {
None
},
target_ty: handler.ty,
input_rules: handler.input,
output_rules: handler.output,
},
Expand Down Expand Up @@ -662,7 +712,7 @@ mod tests {
updater.modify_service(
GREETER_SERVICE_NAME.to_owned(),
vec![ModifyServiceChange::Public(false)],
);
)?;
let schemas = updater.into_inner();

assert!(version_before_modification < schemas.version());
Expand Down
16 changes: 6 additions & 10 deletions crates/ingress-dispatcher/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ mod tests {
use restate_test_util::{let_assert, matchers::*};
use restate_types::identifiers::{IdempotencyId, InvocationId, WithPartitionKey};
use restate_types::invocation::{
HandlerType, Idempotency, InvocationTarget, ResponseResult, ServiceInvocation,
InvocationTarget, ResponseResult, ServiceInvocation, VirtualObjectHandlerType,
};
use restate_types::logs::{LogId, Lsn, SequenceNumber};
use restate_types::partition_table::{FindPartition, FixedPartitionTable};
Expand Down Expand Up @@ -245,7 +245,7 @@ mod tests {
"MySvc",
"MyKey",
"pippo",
HandlerType::Exclusive,
VirtualObjectHandlerType::Exclusive,
);
let argument = Bytes::from_static(b"nbfjksdfs");
let idempotency_key = ByteString::from_static("123");
Expand All @@ -265,10 +265,8 @@ mod tests {
restate_types::invocation::Source::Ingress,
);
invocation.argument = argument.clone();
invocation.idempotency = Some(Idempotency {
key: idempotency_key.clone(),
retention: Duration::from_secs(60),
});
invocation.idempotency_key = Some(idempotency_key.clone());
invocation.completion_retention_time = Some(Duration::from_secs(60));
let (ingress_req, _, res) = IngressDispatcherRequest::invocation(invocation);
dispatcher.dispatch_ingress_request(ingress_req).await?;

Expand Down Expand Up @@ -296,10 +294,8 @@ mod tests {
invocation_id: eq(invocation_id),
invocation_target: eq(invocation_target.clone()),
argument: eq(argument.clone()),
idempotency: some(eq(Idempotency {
key: idempotency_key.clone(),
retention: Duration::from_secs(60),
}))
idempotency_key: some(eq(idempotency_key.clone())),
completion_retention_time: some(eq(Duration::from_secs(60)))
})
);

Expand Down
21 changes: 15 additions & 6 deletions crates/ingress-dispatcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@
// by the Apache License, Version 2.0.

use bytes::Bytes;
use bytestring::ByteString;
use restate_core::metadata;
use restate_schema_api::subscription::{EventReceiverServiceType, Sink, Subscription};
use restate_types::identifiers::{
partitioner, IdempotencyId, InvocationId, PartitionKey, WithPartitionKey,
};
use restate_types::invocation::{
HandlerType, Idempotency, InvocationResponse, InvocationTarget, ResponseResult,
ServiceInvocation, ServiceInvocationResponseSink, SpanRelation,
InvocationResponse, InvocationTarget, ResponseResult, ServiceInvocation,
ServiceInvocationResponseSink, SpanRelation, VirtualObjectHandlerType, WorkflowHandlerType,
};
use restate_types::message::MessageIndex;
use std::fmt::Display;
Expand Down Expand Up @@ -102,7 +103,7 @@ impl IngressDispatcherRequest {
let correlation_id = ingress_correlation_id(
&service_invocation.invocation_id,
&service_invocation.invocation_target,
service_invocation.idempotency.as_ref(),
service_invocation.idempotency_key.as_ref(),
);

let my_node_id = metadata().my_node_id();
Expand Down Expand Up @@ -168,7 +169,15 @@ impl IngressDispatcherRequest {
.map_err(|e| anyhow::anyhow!("The key must be valid UTF-8: {e}"))?
.to_owned(),
&**handler,
HandlerType::Exclusive,
VirtualObjectHandlerType::Exclusive,
),
EventReceiverServiceType::Workflow => InvocationTarget::workflow(
&**name,
std::str::from_utf8(&key)
.map_err(|e| anyhow::anyhow!("The key must be valid UTF-8: {e}"))?
.to_owned(),
&**handler,
WorkflowHandlerType::Workflow,
),
EventReceiverServiceType::Service => {
InvocationTarget::service(&**name, &**handler)
Expand Down Expand Up @@ -211,13 +220,13 @@ impl IngressDispatcherRequest {
pub fn ingress_correlation_id(
id: &InvocationId,
invocation_target: &InvocationTarget,
idempotency: Option<&Idempotency>,
idempotency: Option<&ByteString>,
) -> IngressCorrelationId {
if let Some(idempotency) = idempotency {
IngressCorrelationId::IdempotencyId(IdempotencyId::combine(
*id,
invocation_target,
idempotency.key.clone(),
idempotency.clone(),
))
} else {
IngressCorrelationId::InvocationId(*id)
Expand Down
7 changes: 6 additions & 1 deletion crates/ingress-http/src/handler/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ pub(crate) enum HandlerError {
"cannot use the delay query parameter with calls. The delay is supported only with sends"
)]
UnsupportedDelay,
#[error(
"cannot use the idempotency key with workflow handlers. The handler invocation will already be idempotent by the workflow key itself."
)]
UnsupportedIdempotencyKey,
#[error("bad awakeable id '{0}': {1}")]
BadAwakeableId(String, IdDecodeError),
}
Expand Down Expand Up @@ -89,7 +93,8 @@ impl HandlerError {
| HandlerError::UnsupportedDelay
| HandlerError::BadHeader(_, _)
| HandlerError::BadAwakeableId(_, _)
| HandlerError::InputValidation(_) => StatusCode::BAD_REQUEST,
| HandlerError::InputValidation(_)
| HandlerError::UnsupportedIdempotencyKey => StatusCode::BAD_REQUEST,
HandlerError::Body(_) => StatusCode::INTERNAL_SERVER_ERROR,
HandlerError::Unavailable => StatusCode::SERVICE_UNAVAILABLE,
HandlerError::MethodNotAllowed => StatusCode::METHOD_NOT_ALLOWED,
Expand Down
Loading

0 comments on commit ab3c57e

Please sign in to comment.