From b2c84607fbbfca82060a621aca9932ca859b4cf0 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 15 Aug 2024 13:52:26 +0200 Subject: [PATCH 1/3] Use net_util::run_hyper_server to run admin http server --- Cargo.lock | 1 + crates/admin/Cargo.toml | 1 + crates/admin/src/service.rs | 36 +++++++++++------------------------- 3 files changed, 13 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0098e9050..478b9c8f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5416,6 +5416,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "http-body-util", + "hyper-util", "okapi-operation", "prost 0.13.1", "prost-dto", diff --git a/crates/admin/Cargo.toml b/crates/admin/Cargo.toml index 132bcd93b..b6851af4b 100644 --- a/crates/admin/Cargo.toml +++ b/crates/admin/Cargo.toml @@ -39,6 +39,7 @@ futures = { workspace = true } http = { workspace = true } http-body = { workspace = true } http-body-util = { workspace = true } +hyper-util = { workspace = true } okapi-operation = { version = "0.3.0-rc2", features = ["axum-integration"] } prost = { workspace = true } prost-dto = { workspace = true } diff --git a/crates/admin/src/service.rs b/crates/admin/src/service.rs index c44bf6ca7..49616cb8f 100644 --- a/crates/admin/src/service.rs +++ b/crates/admin/src/service.rs @@ -17,16 +17,16 @@ use restate_types::config::AdminOptions; use restate_types::live::LiveLoad; use tonic::transport::Channel; use tower::ServiceBuilder; -use tracing::info; use restate_core::metadata_store::MetadataStoreClient; +use restate_core::network::net_util; use restate_core::network::protobuf::node_svc::node_svc_client::NodeSvcClient; -use restate_core::{cancellation_token, task_center, MetadataWriter}; +use restate_core::{task_center, MetadataWriter}; use restate_service_protocol::discovery::ServiceDiscovery; +use restate_types::net::BindAddress; use restate_types::schema::subscriptions::SubscriptionValidator; use crate::schema_registry::SchemaRegistry; -use crate::Error; use crate::{rest_api, state, storage_query}; #[derive(Debug, thiserror::Error)] @@ -85,28 +85,14 @@ where )), ); - // run our app with hyper - let listener = tokio::net::TcpListener::bind(&opts.bind_address) - .await - .map_err(|err| Error::Binding { - address: opts.bind_address, - source: Box::new(err), - })?; + let service = hyper_util::service::TowerToHyperService::new(router.into_service()); - let local_addr = listener.local_addr().map_err(|err| Error::Binding { - address: opts.bind_address, - source: Box::new(err), - })?; - info!( - net.host.addr = %local_addr.ip(), - net.host.port = %local_addr.port(), - "Admin API listening" - ); - - let cancellation_token = cancellation_token(); - Ok(axum::serve(listener, router) - .with_graceful_shutdown(async move { cancellation_token.cancelled().await }) - .await - .map_err(|e| Error::Running(Box::new(e)))?) + net_util::run_hyper_server( + &BindAddress::Socket(opts.bind_address), + service, + "admin-api-server", + ) + .await + .map_err(Into::into) } } From 45be39cefa40fbf9938b9f2e9a37b2df149a4bdf Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 15 Aug 2024 13:56:19 +0200 Subject: [PATCH 2/3] Remove TaskCenter from AdminServiceState Since we are spawning the connection handling of the Admin HTTP server in the TaskCenter, we no longer need to spawn TaskCenter tasks from within the REST handlers. --- crates/admin/src/rest_api/deployments.rs | 29 ++++------- crates/admin/src/rest_api/handlers.rs | 15 ++---- crates/admin/src/rest_api/invocations.rs | 16 ++---- crates/admin/src/rest_api/services.rs | 58 +++++++--------------- crates/admin/src/rest_api/subscriptions.rs | 12 ++--- crates/admin/src/service.rs | 5 +- crates/admin/src/state.rs | 9 +--- 7 files changed, 43 insertions(+), 101 deletions(-) diff --git a/crates/admin/src/rest_api/deployments.rs b/crates/admin/src/rest_api/deployments.rs index c18384056..c9f466e5f 100644 --- a/crates/admin/src/rest_api/deployments.rs +++ b/crates/admin/src/rest_api/deployments.rs @@ -95,17 +95,12 @@ pub async fn create_deployment( ApplyMode::Apply }; - let (id, services) = state - .task_center - .run_in_scope("create-deployment", None, async { - log_error( - state - .schema_registry - .register_deployment(discover_endpoint, force, apply_mode) - .await, - ) - }) - .await?; + let (id, services) = log_error( + state + .schema_registry + .register_deployment(discover_endpoint, force, apply_mode) + .await, + )?; let response_body = RegisterDeploymentResponse { id, services }; @@ -136,10 +131,8 @@ pub async fn get_deployment( Path(deployment_id): Path, ) -> Result, MetaApiError> { let (deployment, services) = state - .task_center - .run_in_scope_sync("get-deployment", None, || { - state.schema_registry.get_deployment(deployment_id) - }) + .schema_registry + .get_deployment(deployment_id) .ok_or_else(|| MetaApiError::DeploymentNotFound(deployment_id))?; Ok(DetailedDeploymentResponse { @@ -161,10 +154,8 @@ pub async fn list_deployments( State(state): State>, ) -> Json { let deployments = state - .task_center - .run_in_scope_sync("list-deployments", None, || { - state.schema_registry.list_deployments() - }) + .schema_registry + .list_deployments() .into_iter() .map(|(deployment, services)| DeploymentResponse { id: deployment.id, diff --git a/crates/admin/src/rest_api/handlers.rs b/crates/admin/src/rest_api/handlers.rs index ce91dfec4..e28b3b799 100644 --- a/crates/admin/src/rest_api/handlers.rs +++ b/crates/admin/src/rest_api/handlers.rs @@ -33,11 +33,7 @@ pub async fn list_service_handlers( State(state): State>, Path(service_name): Path, ) -> Result, MetaApiError> { - match state - .task_center - .run_in_scope_sync("list-service-handlers", None, || { - state.schema_registry.list_service_handlers(&service_name) - }) { + match state.schema_registry.list_service_handlers(&service_name) { Some(handlers) => Ok(ListServiceHandlersResponse { handlers }.into()), None => Err(MetaApiError::ServiceNotFound(service_name)), } @@ -67,12 +63,9 @@ pub async fn get_service_handler( Path((service_name, handler_name)): Path<(String, String)>, ) -> Result, MetaApiError> { match state - .task_center - .run_in_scope_sync("get-service-handler", None, || { - state - .schema_registry - .get_service_handler(&service_name, &handler_name) - }) { + .schema_registry + .get_service_handler(&service_name, &handler_name) + { Some(metadata) => Ok(metadata.into()), _ => Err(MetaApiError::HandlerNotFound { service_name, diff --git a/crates/admin/src/rest_api/invocations.rs b/crates/admin/src/rest_api/invocations.rs index 6589b4a32..2e7284217 100644 --- a/crates/admin/src/rest_api/invocations.rs +++ b/crates/admin/src/rest_api/invocations.rs @@ -93,17 +93,11 @@ pub async fn delete_invocation( let partition_key = invocation_id.partition_key(); - let result = state - .task_center - .run_in_scope( - "delete_invocation", - None, - append_envelope_to_bifrost( - &state.bifrost, - Envelope::new(create_envelope_header(partition_key), cmd), - ), - ) - .await; + let result = append_envelope_to_bifrost( + &state.bifrost, + Envelope::new(create_envelope_header(partition_key), cmd), + ) + .await; if let Err(err) = result { warn!("Could not append invocation termination command to Bifrost: {err}"); diff --git a/crates/admin/src/rest_api/services.rs b/crates/admin/src/rest_api/services.rs index ce7172076..72a0bd5f7 100644 --- a/crates/admin/src/rest_api/services.rs +++ b/crates/admin/src/rest_api/services.rs @@ -36,11 +36,7 @@ use tracing::{debug, warn}; pub async fn list_services( State(state): State>, ) -> Result, MetaApiError> { - let services = state - .task_center - .run_in_scope_sync("list-services", None, || { - state.schema_registry.list_services() - }); + let services = state.schema_registry.list_services(); Ok(ListServicesResponse { services }.into()) } @@ -62,10 +58,8 @@ pub async fn get_service( Path(service_name): Path, ) -> Result, MetaApiError> { state - .task_center - .run_in_scope_sync("get-service", None, || { - state.schema_registry.get_service(&service_name) - }) + .schema_registry + .get_service(&service_name) .map(Into::into) .ok_or_else(|| MetaApiError::ServiceNotFound(service_name)) } @@ -111,17 +105,12 @@ pub async fn modify_service( return get_service(State(state), Path(service_name)).await; } - let response = state - .task_center - .run_in_scope("modify-service", None, async { - log_error( - state - .schema_registry - .modify_service(service_name, modify_request) - .await, - ) - }) - .await?; + let response = log_error( + state + .schema_registry + .modify_service(service_name, modify_request) + .await, + )?; Ok(response.into()) } @@ -156,12 +145,7 @@ pub async fn modify_service_state( new_state, }): Json, ) -> Result { - if let Some(svc) = state - .task_center - .run_in_scope_sync("get-service", None, || { - state.schema_registry.get_service(&service_name) - }) - { + if let Some(svc) = state.schema_registry.get_service(&service_name) { if !svc.ty.has_state() { return Err(MetaApiError::UnsupportedOperation("modify state", svc.ty)); } @@ -189,20 +173,14 @@ pub async fn modify_service_state( state: new_state, }; - let result = state - .task_center - .run_in_scope( - "modify_service_state", - None, - append_envelope_to_bifrost( - &state.bifrost, - Envelope::new( - create_envelope_header(partition_key), - Command::PatchState(patch_state), - ), - ), - ) - .await; + let result = append_envelope_to_bifrost( + &state.bifrost, + Envelope::new( + create_envelope_header(partition_key), + Command::PatchState(patch_state), + ), + ) + .await; if let Err(err) = result { warn!("Could not append state patching command to Bifrost: {err}"); diff --git a/crates/admin/src/rest_api/subscriptions.rs b/crates/admin/src/rest_api/subscriptions.rs index 889bd0fc7..373db8809 100644 --- a/crates/admin/src/rest_api/subscriptions.rs +++ b/crates/admin/src/rest_api/subscriptions.rs @@ -76,10 +76,8 @@ pub async fn get_subscription( Path(subscription_id): Path, ) -> Result, MetaApiError> { let subscription = state - .task_center - .run_in_scope_sync("get-subscription", None, || { - state.schema_registry.get_subscription(subscription_id) - }) + .schema_registry + .get_subscription(subscription_id) .ok_or_else(|| MetaApiError::SubscriptionNotFound(subscription_id))?; Ok(SubscriptionResponse::from(subscription).into()) @@ -126,11 +124,7 @@ pub async fn list_subscriptions( _ => vec![], }; - let subscriptions = state - .task_center - .run_in_scope_sync("list-subscriptions", None, || { - state.schema_registry.list_subscriptions(&filters) - }); + let subscriptions = state.schema_registry.list_subscriptions(&filters); ListSubscriptionsResponse { subscriptions: subscriptions diff --git a/crates/admin/src/service.rs b/crates/admin/src/service.rs index 49616cb8f..f10873054 100644 --- a/crates/admin/src/service.rs +++ b/crates/admin/src/service.rs @@ -21,7 +21,7 @@ use tower::ServiceBuilder; use restate_core::metadata_store::MetadataStoreClient; use restate_core::network::net_util; use restate_core::network::protobuf::node_svc::node_svc_client::NodeSvcClient; -use restate_core::{task_center, MetadataWriter}; +use restate_core::MetadataWriter; use restate_service_protocol::discovery::ServiceDiscovery; use restate_types::net::BindAddress; use restate_types::schema::subscriptions::SubscriptionValidator; @@ -65,8 +65,7 @@ where ) -> anyhow::Result<()> { let opts = updateable_config.live_load(); - let rest_state = - state::AdminServiceState::new(self.schema_registry, bifrost, task_center()); + let rest_state = state::AdminServiceState::new(self.schema_registry, bifrost); let query_state = Arc::new(state::QueryServiceState { node_svc_client }); let router = axum::Router::new().merge(storage_query::create_router(query_state)); diff --git a/crates/admin/src/state.rs b/crates/admin/src/state.rs index 46df7ab18..29eba463a 100644 --- a/crates/admin/src/state.rs +++ b/crates/admin/src/state.rs @@ -12,14 +12,12 @@ use crate::schema_registry::SchemaRegistry; use restate_bifrost::Bifrost; use restate_core::network::protobuf::node_svc::node_svc_client::NodeSvcClient; -use restate_core::TaskCenter; use tonic::transport::Channel; #[derive(Clone, derive_builder::Builder)] pub struct AdminServiceState { pub schema_registry: SchemaRegistry, pub bifrost: Bifrost, - pub task_center: TaskCenter, } #[derive(Clone)] @@ -28,15 +26,10 @@ pub struct QueryServiceState { } impl AdminServiceState { - pub fn new( - schema_registry: SchemaRegistry, - bifrost: Bifrost, - task_center: TaskCenter, - ) -> Self { + pub fn new(schema_registry: SchemaRegistry, bifrost: Bifrost) -> Self { Self { schema_registry, bifrost, - task_center, } } } From fbee26b9fee7bb681e0fe90edccc60118c173fd5 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 15 Aug 2024 22:44:10 +0200 Subject: [PATCH 3/3] Encapsulate Result formatting logic in CodedErrorResultExt --- crates/admin/src/rest_api/deployments.rs | 19 ++++++----- crates/admin/src/rest_api/mod.rs | 10 ------ crates/admin/src/rest_api/services.rs | 14 ++++---- crates/admin/src/rest_api/subscriptions.rs | 24 +++++++------- crates/errors/src/fmt.rs | 38 ++++++++++++++++++++++ 5 files changed, 67 insertions(+), 38 deletions(-) diff --git a/crates/admin/src/rest_api/deployments.rs b/crates/admin/src/rest_api/deployments.rs index c9f466e5f..f0b99e32e 100644 --- a/crates/admin/src/rest_api/deployments.rs +++ b/crates/admin/src/rest_api/deployments.rs @@ -11,7 +11,6 @@ use super::error::*; use crate::state::AdminServiceState; -use crate::rest_api::log_error; use crate::schema_registry::{ApplyMode, Force}; use axum::extract::{Path, Query, State}; use axum::http::{header, StatusCode}; @@ -19,6 +18,7 @@ use axum::response::IntoResponse; use axum::Json; use okapi_operation::*; use restate_admin_rest_model::deployments::*; +use restate_errors::fmt::CodedErrorResultExt; use restate_service_client::Endpoint; use restate_service_protocol::discovery::DiscoverEndpoint; use restate_types::identifiers::{DeploymentId, InvalidLambdaARN}; @@ -95,12 +95,11 @@ pub async fn create_deployment( ApplyMode::Apply }; - let (id, services) = log_error( - state - .schema_registry - .register_deployment(discover_endpoint, force, apply_mode) - .await, - )?; + let (id, services) = state + .schema_registry + .register_deployment(discover_endpoint, force, apply_mode) + .await + .warn_it()?; let response_body = RegisterDeploymentResponse { id, services }; @@ -217,7 +216,11 @@ pub async fn delete_deployment( Query(DeleteDeploymentParams { force }): Query, ) -> Result { if let Some(true) = force { - log_error(state.schema_registry.delete_deployment(deployment_id).await)?; + state + .schema_registry + .delete_deployment(deployment_id) + .await + .warn_it()?; Ok(StatusCode::ACCEPTED) } else { Ok(StatusCode::NOT_IMPLEMENTED) diff --git a/crates/admin/src/rest_api/mod.rs b/crates/admin/src/rest_api/mod.rs index 35f120826..7a6441a18 100644 --- a/crates/admin/src/rest_api/mod.rs +++ b/crates/admin/src/rest_api/mod.rs @@ -19,10 +19,8 @@ mod services; mod subscriptions; mod version; -use codederror::CodedError; use okapi_operation::axum_integration::{delete, get, patch, post}; use okapi_operation::*; -use restate_errors::warn_it; use restate_types::identifiers::PartitionKey; use restate_types::schema::subscriptions::SubscriptionValidator; use restate_wal_protocol::{Destination, Header, Source}; @@ -108,11 +106,3 @@ fn create_envelope_header(partition_key: PartitionKey) -> Header { }, } } - -#[inline] -fn log_error(result: Result) -> Result { - result.map_err(|err| { - warn_it!(err); - err - }) -} diff --git a/crates/admin/src/rest_api/services.rs b/crates/admin/src/rest_api/services.rs index 72a0bd5f7..ab575ffb1 100644 --- a/crates/admin/src/rest_api/services.rs +++ b/crates/admin/src/rest_api/services.rs @@ -8,8 +8,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use super::create_envelope_header; use super::error::*; -use super::{create_envelope_header, log_error}; use crate::schema_registry::ModifyServiceChange; use crate::state::AdminServiceState; @@ -20,6 +20,7 @@ use http::StatusCode; use okapi_operation::*; use restate_admin_rest_model::services::ListServicesResponse; use restate_admin_rest_model::services::*; +use restate_errors::fmt::CodedErrorResultExt; use restate_types::identifiers::{ServiceId, WithPartitionKey}; use restate_types::schema::service::ServiceMetadata; use restate_types::state_mut::ExternalStateMutation; @@ -105,12 +106,11 @@ pub async fn modify_service( return get_service(State(state), Path(service_name)).await; } - let response = log_error( - state - .schema_registry - .modify_service(service_name, modify_request) - .await, - )?; + let response = state + .schema_registry + .modify_service(service_name, modify_request) + .await + .warn_it()?; Ok(response.into()) } diff --git a/crates/admin/src/rest_api/subscriptions.rs b/crates/admin/src/rest_api/subscriptions.rs index 373db8809..78f8be4db 100644 --- a/crates/admin/src/rest_api/subscriptions.rs +++ b/crates/admin/src/rest_api/subscriptions.rs @@ -14,12 +14,12 @@ use crate::state::AdminServiceState; use restate_admin_rest_model::subscriptions::*; use restate_types::schema::subscriptions::{ListSubscriptionFilter, SubscriptionValidator}; -use crate::rest_api::log_error; use axum::extract::Query; use axum::extract::{Path, State}; use axum::http::StatusCode; use axum::{http, Json}; use okapi_operation::*; +use restate_errors::fmt::CodedErrorResultExt; use restate_types::identifiers::SubscriptionId; /// Create subscription. @@ -42,12 +42,11 @@ pub async fn create_subscription( State(state): State>, #[request_body(required = true)] Json(payload): Json, ) -> Result { - let subscription = log_error( - state - .schema_registry - .create_subscription(payload.source, payload.sink, payload.options) - .await, - )?; + let subscription = state + .schema_registry + .create_subscription(payload.source, payload.sink, payload.options) + .await + .warn_it()?; Ok(( StatusCode::CREATED, @@ -160,11 +159,10 @@ pub async fn delete_subscription( State(state): State>, Path(subscription_id): Path, ) -> Result { - log_error( - state - .schema_registry - .delete_subscription(subscription_id) - .await, - )?; + state + .schema_registry + .delete_subscription(subscription_id) + .await + .warn_it()?; Ok(StatusCode::ACCEPTED) } diff --git a/crates/errors/src/fmt.rs b/crates/errors/src/fmt.rs index 7038f7a56..14c800347 100644 --- a/crates/errors/src/fmt.rs +++ b/crates/errors/src/fmt.rs @@ -116,6 +116,44 @@ impl fmt::Debug for RestateCode { } } +/// Extension trait which extends [`Result`] with some formatting functions. +pub trait CodedErrorResultExt { + /// Log on info level if error and return self. + fn info_it(self) -> Self; + + /// Log on warn level if error and return self. + fn warn_it(self) -> Self; + + /// Log on error level if error and return self. + fn error_it(self) -> Self; +} + +impl CodedErrorResultExt for Result { + #[inline] + fn info_it(self) -> Self { + if let Err(err) = &self { + info_it!(*err); + } + self + } + + #[inline] + fn warn_it(self) -> Self { + if let Err(err) = &self { + warn_it!(*err); + } + self + } + + #[inline] + fn error_it(self) -> Self { + if let Err(err) = &self { + error_it!(*err); + } + self + } +} + #[cfg(test)] mod tests { use super::*;