Skip to content

Commit

Permalink
Remove all the usages of built-in services (#1457)
Browse files Browse the repository at this point in the history
* Add Command::ProxyThrough to implement the same functionality implemented by the proxy built in service
* Now the ingress dispatcher can send Invoke, ProxyThrough and InvocationResponse commands to PPs.
* Forbid shared handlers as kafka event receivers. Also use the new ingress dispatcher API. See #1423
* Use the new ingress dispatcher api in ingress_http. Complete awakeable directly from the invoker. Fix #1376
* Completely prune built-in service infra.
  • Loading branch information
slinkydeveloper authored Apr 25, 2024
1 parent 310f820 commit d715f21
Show file tree
Hide file tree
Showing 43 changed files with 524 additions and 2,243 deletions.
22 changes: 1 addition & 21 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ restate-network = { path = "crates/network" }
restate-node = { path = "crates/node" }
restate-node-protocol = { path = "crates/node-protocol" }
restate-node-services = { path = "crates/node-services" }
restate-pb = { path = "crates/pb" }
restate-queue = { path = "crates/queue" }
restate-rocksdb = { path = "crates/rocksdb" }
restate-schema = { path = "crates/schema" }
Expand Down
1 change: 0 additions & 1 deletion crates/admin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ restate-fs-util = { workspace = true }
restate-futures-util = { workspace = true }
restate-meta-rest-model = { workspace = true, features = ["schema"] }
restate-node-services = { workspace = true, features = ["servers", "clients"] }
restate-pb = { workspace = true }
restate-schema = { workspace = true }
restate-schema-api = { workspace = true, features = ["deployment", "serde", "serde_schema"] }
restate-service-client = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions crates/admin/src/schema_registry/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ pub enum SubscriptionError {
InvalidServiceSinkAuthority(Uri),
#[error("invalid sink URI '{0}': cannot find service/handler specified in the sink URI.")]
SinkServiceNotFound(Uri),
#[error("invalid sink URI '{0}': virtual object shared handlers cannot be used as sinks.")]
InvalidSinkSharedHandler(Uri),

#[error(transparent)]
#[code(unknown)]
Expand Down
27 changes: 16 additions & 11 deletions crates/admin/src/schema_registry/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ impl SchemaUpdater {
Source::Kafka {
cluster: cluster_name.to_string(),
topic: topic_name.to_string(),
ordering_key_format: Default::default(),
}
}
_ => {
Expand Down Expand Up @@ -323,17 +322,23 @@ impl SchemaUpdater {
sink.clone(),
))
})?;
if !service_schemas.handlers.contains_key(handler_name) {
return Err(SchemaError::Subscription(
SubscriptionError::SinkServiceNotFound(sink),
));
}
let handler_schemas =
service_schemas.handlers.get(handler_name).ok_or_else(|| {
SchemaError::Subscription(SubscriptionError::SinkServiceNotFound(
sink.clone(),
))
})?;

let ty = match service_schemas.ty {
ServiceType::VirtualObject => EventReceiverServiceType::VirtualObject {
ordering_key_is_key: false,
},
ServiceType::Service => EventReceiverServiceType::Service,
let ty = match (service_schemas.ty, handler_schemas.target_meta.handler_ty) {
(ServiceType::VirtualObject, HandlerType::Exclusive) => {
EventReceiverServiceType::VirtualObject
}
(ServiceType::VirtualObject, HandlerType::Shared) => {
return Err(SchemaError::Subscription(
SubscriptionError::InvalidSinkSharedHandler(sink),
))
}
(ServiceType::Service, _) => EventReceiverServiceType::Service,
};

Sink::Service {
Expand Down
2 changes: 0 additions & 2 deletions crates/ingress-dispatcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ mocks = ["dep:restate-test-util"]
restate-bifrost = { workspace = true }
restate-core = { workspace = true }
restate-node-protocol = { workspace = true }
restate-pb = { workspace = true, features = ["restate-types"] }
restate-schema-api = { workspace = true, features = ["subscription"] }
# todo: only needed for DedupInformation :-( Probably fixed by merging restate-storage-api with restate-types
restate-storage-api = { workspace = true }
Expand All @@ -28,7 +27,6 @@ bytes = { workspace = true }
bytestring = { workspace = true }
dashmap = { workspace = true }
drain = { workspace = true }
prost = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
Expand Down
119 changes: 68 additions & 51 deletions crates/ingress-dispatcher/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

use crate::error::IngressDispatchError;
use crate::{
wrap_service_invocation_in_envelope, IngressCorrelationId, IngressDispatcherRequest,
IngressCorrelationId, IngressDispatcherRequest, IngressDispatcherRequestInner,
IngressDispatcherResponse, IngressRequestMode, IngressResponseSender,
};
use dashmap::DashMap;
Expand All @@ -19,12 +19,16 @@ use restate_core::metadata;
use restate_core::network::MessageHandler;
use restate_node_protocol::codec::Targeted;
use restate_node_protocol::ingress::IngressMessage;
use restate_types::invocation::{self, ServiceInvocation, ServiceInvocationResponseSink};
use restate_storage_api::deduplication_table::DedupInformation;
use restate_types::identifiers::{PartitionKey, WithPartitionKey};
use restate_types::message::MessageIndex;
use restate_wal_protocol::append_envelope_to_bifrost;
use restate_types::GenerationalNodeId;
use restate_wal_protocol::{
append_envelope_to_bifrost, Command, Destination, Envelope, Header, Source,
};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use tracing::{debug, info, trace};
use tracing::{debug, trace};

/// Dispatches a request from ingress to bifrost
pub trait DispatchIngressRequest {
Expand Down Expand Up @@ -74,61 +78,44 @@ impl DispatchIngressRequest for IngressDispatcher {
ingress_request: IngressDispatcherRequest,
) -> Result<(), IngressDispatchError> {
let mut bifrost = self.bifrost.clone();
let my_node_id = metadata().my_node_id();
let IngressDispatcherRequest {
correlation_id,
invocation_id,
invocation_target,
argument,
span_context,
inner,
request_mode,
idempotency,
headers,
execution_time,
} = ingress_request;

let response_sink = if matches!(request_mode, IngressRequestMode::RequestResponse(_)) {
Some(ServiceInvocationResponseSink::Ingress(my_node_id))
} else {
None
};

let service_invocation = ServiceInvocation {
invocation_id,
invocation_target,
argument,
source: invocation::Source::Ingress,
response_sink,
span_context,
headers,
execution_time,
idempotency,
};

let (dedup_source, msg_index) = match request_mode {
IngressRequestMode::RequestResponse(response_sender) => {
let (dedup_source, msg_index, proxying_partition_key) = match request_mode {
IngressRequestMode::RequestResponse(correlation_id, response_sender) => {
self.state
.waiting_responses
.insert(correlation_id, response_sender);
(None, self.state.get_and_increment_msg_index())
(None, self.state.get_and_increment_msg_index(), None)
}
IngressRequestMode::FireAndForget => {
let msg_index = self.state.get_and_increment_msg_index();
(None, msg_index)
(None, msg_index, None)
}
IngressRequestMode::DedupFireAndForget(dedup_id) => (Some(dedup_id.0), dedup_id.1),
IngressRequestMode::DedupFireAndForget {
deduplication_id,
proxying_partition_key,
} => (
Some(deduplication_id.0),
deduplication_id.1,
proxying_partition_key,
),
};

let partition_key = proxying_partition_key.unwrap_or_else(|| inner.partition_key());

let envelope = wrap_service_invocation_in_envelope(
service_invocation,
my_node_id,
partition_key,
inner,
metadata().my_node_id(),
dedup_source,
msg_index,
);
let (log_id, lsn) = append_envelope_to_bifrost(&mut bifrost, envelope).await?;

info!(
restate.invocation.id = %invocation_id,
debug!(
log_id = %log_id,
lsn = %lsn,
"Ingress request written to bifrost"
Expand Down Expand Up @@ -181,10 +168,39 @@ impl MessageHandler for IngressDispatcher {
}
}

fn wrap_service_invocation_in_envelope(
partition_key: PartitionKey,
inner: IngressDispatcherRequestInner,
from_node_id: GenerationalNodeId,
deduplication_source: Option<String>,
msg_index: MessageIndex,
) -> Envelope {
let header = Header {
source: Source::Ingress {
node_id: from_node_id,
nodes_config_version: metadata().nodes_config_version(),
},
dest: Destination::Processor {
partition_key,
dedup: deduplication_source.map(|src| DedupInformation::ingress(src, msg_index)),
},
};

Envelope::new(
header,
match inner {
IngressDispatcherRequestInner::Invoke(si) => Command::Invoke(si),
IngressDispatcherRequestInner::ProxyThrough(si) => Command::ProxyThrough(si),
IngressDispatcherRequestInner::InvocationResponse(ir) => {
Command::InvocationResponse(ir)
}
},
)
}

#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;

use bytes::Bytes;
use bytestring::ByteString;
Expand All @@ -195,13 +211,14 @@ mod tests {
use restate_test_util::{let_assert, matchers::*};
use restate_types::identifiers::{IdempotencyId, InvocationId, WithPartitionKey};
use restate_types::invocation::{
HandlerType, Idempotency, InvocationTarget, ResponseResult, SpanRelation,
HandlerType, Idempotency, InvocationTarget, ResponseResult, ServiceInvocation,
};
use restate_types::logs::{LogId, Lsn, SequenceNumber};
use restate_types::partition_table::{FindPartition, FixedPartitionTable};
use restate_types::Version;
use restate_wal_protocol::Command;
use restate_wal_protocol::Envelope;
use std::time::Duration;
use test_log::test;

#[test(tokio::test)]
Expand Down Expand Up @@ -242,18 +259,18 @@ mod tests {
idempotency_key.clone(),
);

let (invocation, res) = IngressDispatcherRequest::invocation(
let mut invocation = ServiceInvocation::initialize(
invocation_id,
invocation_target.clone(),
argument.clone(),
SpanRelation::None,
Some(Idempotency {
key: idempotency_key.clone(),
retention: Duration::from_secs(60),
}),
vec![],
restate_types::invocation::Source::Ingress,
);
dispatcher.dispatch_ingress_request(invocation).await?;
invocation.argument = argument.clone();
invocation.idempotency = Some(Idempotency {
key: idempotency_key.clone(),
retention: Duration::from_secs(60),
});
let (ingress_req, _, res) = IngressDispatcherRequest::invocation(invocation);
dispatcher.dispatch_ingress_request(ingress_req).await?;

// Let's check we correct have generated a bifrost write
let partition_id = node_env
Expand Down
Loading

0 comments on commit d715f21

Please sign in to comment.