diff --git a/Cargo.lock b/Cargo.lock index 0098e9050..478b9c8f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5416,6 +5416,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "http-body-util", + "hyper-util", "okapi-operation", "prost 0.13.1", "prost-dto", diff --git a/crates/admin/Cargo.toml b/crates/admin/Cargo.toml index 132bcd93b..b6851af4b 100644 --- a/crates/admin/Cargo.toml +++ b/crates/admin/Cargo.toml @@ -39,6 +39,7 @@ futures = { workspace = true } http = { workspace = true } http-body = { workspace = true } http-body-util = { workspace = true } +hyper-util = { workspace = true } okapi-operation = { version = "0.3.0-rc2", features = ["axum-integration"] } prost = { workspace = true } prost-dto = { workspace = true } diff --git a/crates/admin/src/rest_api/deployments.rs b/crates/admin/src/rest_api/deployments.rs index c18384056..f0b99e32e 100644 --- a/crates/admin/src/rest_api/deployments.rs +++ b/crates/admin/src/rest_api/deployments.rs @@ -11,7 +11,6 @@ use super::error::*; use crate::state::AdminServiceState; -use crate::rest_api::log_error; use crate::schema_registry::{ApplyMode, Force}; use axum::extract::{Path, Query, State}; use axum::http::{header, StatusCode}; @@ -19,6 +18,7 @@ use axum::response::IntoResponse; use axum::Json; use okapi_operation::*; use restate_admin_rest_model::deployments::*; +use restate_errors::fmt::CodedErrorResultExt; use restate_service_client::Endpoint; use restate_service_protocol::discovery::DiscoverEndpoint; use restate_types::identifiers::{DeploymentId, InvalidLambdaARN}; @@ -96,16 +96,10 @@ pub async fn create_deployment( }; let (id, services) = state - .task_center - .run_in_scope("create-deployment", None, async { - log_error( - state - .schema_registry - .register_deployment(discover_endpoint, force, apply_mode) - .await, - ) - }) - .await?; + .schema_registry + .register_deployment(discover_endpoint, force, apply_mode) + .await + .warn_it()?; let response_body = RegisterDeploymentResponse { id, services }; @@ -136,10 +130,8 @@ pub async fn get_deployment( Path(deployment_id): Path, ) -> Result, MetaApiError> { let (deployment, services) = state - .task_center - .run_in_scope_sync("get-deployment", None, || { - state.schema_registry.get_deployment(deployment_id) - }) + .schema_registry + .get_deployment(deployment_id) .ok_or_else(|| MetaApiError::DeploymentNotFound(deployment_id))?; Ok(DetailedDeploymentResponse { @@ -161,10 +153,8 @@ pub async fn list_deployments( State(state): State>, ) -> Json { let deployments = state - .task_center - .run_in_scope_sync("list-deployments", None, || { - state.schema_registry.list_deployments() - }) + .schema_registry + .list_deployments() .into_iter() .map(|(deployment, services)| DeploymentResponse { id: deployment.id, @@ -226,7 +216,11 @@ pub async fn delete_deployment( Query(DeleteDeploymentParams { force }): Query, ) -> Result { if let Some(true) = force { - log_error(state.schema_registry.delete_deployment(deployment_id).await)?; + state + .schema_registry + .delete_deployment(deployment_id) + .await + .warn_it()?; Ok(StatusCode::ACCEPTED) } else { Ok(StatusCode::NOT_IMPLEMENTED) diff --git a/crates/admin/src/rest_api/handlers.rs b/crates/admin/src/rest_api/handlers.rs index ce91dfec4..e28b3b799 100644 --- a/crates/admin/src/rest_api/handlers.rs +++ b/crates/admin/src/rest_api/handlers.rs @@ -33,11 +33,7 @@ pub async fn list_service_handlers( State(state): State>, Path(service_name): Path, ) -> Result, MetaApiError> { - match state - .task_center - .run_in_scope_sync("list-service-handlers", None, || { - state.schema_registry.list_service_handlers(&service_name) - }) { + match state.schema_registry.list_service_handlers(&service_name) { Some(handlers) => Ok(ListServiceHandlersResponse { handlers }.into()), None => Err(MetaApiError::ServiceNotFound(service_name)), } @@ -67,12 +63,9 @@ pub async fn get_service_handler( Path((service_name, handler_name)): Path<(String, String)>, ) -> Result, MetaApiError> { match state - .task_center - .run_in_scope_sync("get-service-handler", None, || { - state - .schema_registry - .get_service_handler(&service_name, &handler_name) - }) { + .schema_registry + .get_service_handler(&service_name, &handler_name) + { Some(metadata) => Ok(metadata.into()), _ => Err(MetaApiError::HandlerNotFound { service_name, diff --git a/crates/admin/src/rest_api/invocations.rs b/crates/admin/src/rest_api/invocations.rs index 6589b4a32..2e7284217 100644 --- a/crates/admin/src/rest_api/invocations.rs +++ b/crates/admin/src/rest_api/invocations.rs @@ -93,17 +93,11 @@ pub async fn delete_invocation( let partition_key = invocation_id.partition_key(); - let result = state - .task_center - .run_in_scope( - "delete_invocation", - None, - append_envelope_to_bifrost( - &state.bifrost, - Envelope::new(create_envelope_header(partition_key), cmd), - ), - ) - .await; + let result = append_envelope_to_bifrost( + &state.bifrost, + Envelope::new(create_envelope_header(partition_key), cmd), + ) + .await; if let Err(err) = result { warn!("Could not append invocation termination command to Bifrost: {err}"); diff --git a/crates/admin/src/rest_api/mod.rs b/crates/admin/src/rest_api/mod.rs index 35f120826..7a6441a18 100644 --- a/crates/admin/src/rest_api/mod.rs +++ b/crates/admin/src/rest_api/mod.rs @@ -19,10 +19,8 @@ mod services; mod subscriptions; mod version; -use codederror::CodedError; use okapi_operation::axum_integration::{delete, get, patch, post}; use okapi_operation::*; -use restate_errors::warn_it; use restate_types::identifiers::PartitionKey; use restate_types::schema::subscriptions::SubscriptionValidator; use restate_wal_protocol::{Destination, Header, Source}; @@ -108,11 +106,3 @@ fn create_envelope_header(partition_key: PartitionKey) -> Header { }, } } - -#[inline] -fn log_error(result: Result) -> Result { - result.map_err(|err| { - warn_it!(err); - err - }) -} diff --git a/crates/admin/src/rest_api/services.rs b/crates/admin/src/rest_api/services.rs index ce7172076..ab575ffb1 100644 --- a/crates/admin/src/rest_api/services.rs +++ b/crates/admin/src/rest_api/services.rs @@ -8,8 +8,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use super::create_envelope_header; use super::error::*; -use super::{create_envelope_header, log_error}; use crate::schema_registry::ModifyServiceChange; use crate::state::AdminServiceState; @@ -20,6 +20,7 @@ use http::StatusCode; use okapi_operation::*; use restate_admin_rest_model::services::ListServicesResponse; use restate_admin_rest_model::services::*; +use restate_errors::fmt::CodedErrorResultExt; use restate_types::identifiers::{ServiceId, WithPartitionKey}; use restate_types::schema::service::ServiceMetadata; use restate_types::state_mut::ExternalStateMutation; @@ -36,11 +37,7 @@ use tracing::{debug, warn}; pub async fn list_services( State(state): State>, ) -> Result, MetaApiError> { - let services = state - .task_center - .run_in_scope_sync("list-services", None, || { - state.schema_registry.list_services() - }); + let services = state.schema_registry.list_services(); Ok(ListServicesResponse { services }.into()) } @@ -62,10 +59,8 @@ pub async fn get_service( Path(service_name): Path, ) -> Result, MetaApiError> { state - .task_center - .run_in_scope_sync("get-service", None, || { - state.schema_registry.get_service(&service_name) - }) + .schema_registry + .get_service(&service_name) .map(Into::into) .ok_or_else(|| MetaApiError::ServiceNotFound(service_name)) } @@ -112,16 +107,10 @@ pub async fn modify_service( } let response = state - .task_center - .run_in_scope("modify-service", None, async { - log_error( - state - .schema_registry - .modify_service(service_name, modify_request) - .await, - ) - }) - .await?; + .schema_registry + .modify_service(service_name, modify_request) + .await + .warn_it()?; Ok(response.into()) } @@ -156,12 +145,7 @@ pub async fn modify_service_state( new_state, }): Json, ) -> Result { - if let Some(svc) = state - .task_center - .run_in_scope_sync("get-service", None, || { - state.schema_registry.get_service(&service_name) - }) - { + if let Some(svc) = state.schema_registry.get_service(&service_name) { if !svc.ty.has_state() { return Err(MetaApiError::UnsupportedOperation("modify state", svc.ty)); } @@ -189,20 +173,14 @@ pub async fn modify_service_state( state: new_state, }; - let result = state - .task_center - .run_in_scope( - "modify_service_state", - None, - append_envelope_to_bifrost( - &state.bifrost, - Envelope::new( - create_envelope_header(partition_key), - Command::PatchState(patch_state), - ), - ), - ) - .await; + let result = append_envelope_to_bifrost( + &state.bifrost, + Envelope::new( + create_envelope_header(partition_key), + Command::PatchState(patch_state), + ), + ) + .await; if let Err(err) = result { warn!("Could not append state patching command to Bifrost: {err}"); diff --git a/crates/admin/src/rest_api/subscriptions.rs b/crates/admin/src/rest_api/subscriptions.rs index 889bd0fc7..78f8be4db 100644 --- a/crates/admin/src/rest_api/subscriptions.rs +++ b/crates/admin/src/rest_api/subscriptions.rs @@ -14,12 +14,12 @@ use crate::state::AdminServiceState; use restate_admin_rest_model::subscriptions::*; use restate_types::schema::subscriptions::{ListSubscriptionFilter, SubscriptionValidator}; -use crate::rest_api::log_error; use axum::extract::Query; use axum::extract::{Path, State}; use axum::http::StatusCode; use axum::{http, Json}; use okapi_operation::*; +use restate_errors::fmt::CodedErrorResultExt; use restate_types::identifiers::SubscriptionId; /// Create subscription. @@ -42,12 +42,11 @@ pub async fn create_subscription( State(state): State>, #[request_body(required = true)] Json(payload): Json, ) -> Result { - let subscription = log_error( - state - .schema_registry - .create_subscription(payload.source, payload.sink, payload.options) - .await, - )?; + let subscription = state + .schema_registry + .create_subscription(payload.source, payload.sink, payload.options) + .await + .warn_it()?; Ok(( StatusCode::CREATED, @@ -76,10 +75,8 @@ pub async fn get_subscription( Path(subscription_id): Path, ) -> Result, MetaApiError> { let subscription = state - .task_center - .run_in_scope_sync("get-subscription", None, || { - state.schema_registry.get_subscription(subscription_id) - }) + .schema_registry + .get_subscription(subscription_id) .ok_or_else(|| MetaApiError::SubscriptionNotFound(subscription_id))?; Ok(SubscriptionResponse::from(subscription).into()) @@ -126,11 +123,7 @@ pub async fn list_subscriptions( _ => vec![], }; - let subscriptions = state - .task_center - .run_in_scope_sync("list-subscriptions", None, || { - state.schema_registry.list_subscriptions(&filters) - }); + let subscriptions = state.schema_registry.list_subscriptions(&filters); ListSubscriptionsResponse { subscriptions: subscriptions @@ -166,11 +159,10 @@ pub async fn delete_subscription( State(state): State>, Path(subscription_id): Path, ) -> Result { - log_error( - state - .schema_registry - .delete_subscription(subscription_id) - .await, - )?; + state + .schema_registry + .delete_subscription(subscription_id) + .await + .warn_it()?; Ok(StatusCode::ACCEPTED) } diff --git a/crates/admin/src/service.rs b/crates/admin/src/service.rs index c44bf6ca7..f10873054 100644 --- a/crates/admin/src/service.rs +++ b/crates/admin/src/service.rs @@ -17,16 +17,16 @@ use restate_types::config::AdminOptions; use restate_types::live::LiveLoad; use tonic::transport::Channel; use tower::ServiceBuilder; -use tracing::info; use restate_core::metadata_store::MetadataStoreClient; +use restate_core::network::net_util; use restate_core::network::protobuf::node_svc::node_svc_client::NodeSvcClient; -use restate_core::{cancellation_token, task_center, MetadataWriter}; +use restate_core::MetadataWriter; use restate_service_protocol::discovery::ServiceDiscovery; +use restate_types::net::BindAddress; use restate_types::schema::subscriptions::SubscriptionValidator; use crate::schema_registry::SchemaRegistry; -use crate::Error; use crate::{rest_api, state, storage_query}; #[derive(Debug, thiserror::Error)] @@ -65,8 +65,7 @@ where ) -> anyhow::Result<()> { let opts = updateable_config.live_load(); - let rest_state = - state::AdminServiceState::new(self.schema_registry, bifrost, task_center()); + let rest_state = state::AdminServiceState::new(self.schema_registry, bifrost); let query_state = Arc::new(state::QueryServiceState { node_svc_client }); let router = axum::Router::new().merge(storage_query::create_router(query_state)); @@ -85,28 +84,14 @@ where )), ); - // run our app with hyper - let listener = tokio::net::TcpListener::bind(&opts.bind_address) - .await - .map_err(|err| Error::Binding { - address: opts.bind_address, - source: Box::new(err), - })?; + let service = hyper_util::service::TowerToHyperService::new(router.into_service()); - let local_addr = listener.local_addr().map_err(|err| Error::Binding { - address: opts.bind_address, - source: Box::new(err), - })?; - info!( - net.host.addr = %local_addr.ip(), - net.host.port = %local_addr.port(), - "Admin API listening" - ); - - let cancellation_token = cancellation_token(); - Ok(axum::serve(listener, router) - .with_graceful_shutdown(async move { cancellation_token.cancelled().await }) - .await - .map_err(|e| Error::Running(Box::new(e)))?) + net_util::run_hyper_server( + &BindAddress::Socket(opts.bind_address), + service, + "admin-api-server", + ) + .await + .map_err(Into::into) } } diff --git a/crates/admin/src/state.rs b/crates/admin/src/state.rs index 46df7ab18..29eba463a 100644 --- a/crates/admin/src/state.rs +++ b/crates/admin/src/state.rs @@ -12,14 +12,12 @@ use crate::schema_registry::SchemaRegistry; use restate_bifrost::Bifrost; use restate_core::network::protobuf::node_svc::node_svc_client::NodeSvcClient; -use restate_core::TaskCenter; use tonic::transport::Channel; #[derive(Clone, derive_builder::Builder)] pub struct AdminServiceState { pub schema_registry: SchemaRegistry, pub bifrost: Bifrost, - pub task_center: TaskCenter, } #[derive(Clone)] @@ -28,15 +26,10 @@ pub struct QueryServiceState { } impl AdminServiceState { - pub fn new( - schema_registry: SchemaRegistry, - bifrost: Bifrost, - task_center: TaskCenter, - ) -> Self { + pub fn new(schema_registry: SchemaRegistry, bifrost: Bifrost) -> Self { Self { schema_registry, bifrost, - task_center, } } } diff --git a/crates/errors/src/fmt.rs b/crates/errors/src/fmt.rs index 7038f7a56..14c800347 100644 --- a/crates/errors/src/fmt.rs +++ b/crates/errors/src/fmt.rs @@ -116,6 +116,44 @@ impl fmt::Debug for RestateCode { } } +/// Extension trait which extends [`Result`] with some formatting functions. +pub trait CodedErrorResultExt { + /// Log on info level if error and return self. + fn info_it(self) -> Self; + + /// Log on warn level if error and return self. + fn warn_it(self) -> Self; + + /// Log on error level if error and return self. + fn error_it(self) -> Self; +} + +impl CodedErrorResultExt for Result { + #[inline] + fn info_it(self) -> Self { + if let Err(err) = &self { + info_it!(*err); + } + self + } + + #[inline] + fn warn_it(self) -> Self { + if let Err(err) = &self { + warn_it!(*err); + } + self + } + + #[inline] + fn error_it(self) -> Self { + if let Err(err) = &self { + error_it!(*err); + } + self + } +} + #[cfg(test)] mod tests { use super::*;