Skip to content

Commit

Permalink
Make ProtoSymbols::add_service and ::remove_service idempotent
Browse files Browse the repository at this point in the history
This commit changes the ProtoSymbols to support idempotent calls instead
of panicking if a service already exists when adding a new service or
when removing a non-existent one. The latter caused problems when re-
discovering a deployment which contained some private services.

This fixes #1205.
  • Loading branch information
tillrohrmann committed Feb 27, 2024
1 parent 8f7d5e2 commit 7c9855e
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 72 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/schema-impl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ restate-pb = { workspace = true, features = ["mocks"] }
restate-schema-api = { workspace = true, features = ["mocks"] }
restate-test-util = { workspace = true }

googletest = { workspace = true }
prost-reflect = { workspace = true }
test-log = { workspace = true }
tracing = { workspace = true }
Expand Down
112 changes: 53 additions & 59 deletions crates/schema-impl/src/proto_symbol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,17 +190,19 @@ impl SymbolsIndex {
&mut self,
service_name: &str,
methods: impl Iterator<Item = String>,
) -> Arc<Vec<String>> {
) -> Option<Arc<Vec<String>>> {
let service_symbol = self.0.remove(service_name);
for method in methods {
self.0.remove(&method);
}
match service_symbol {
Some(Symbol::ServiceOrMethod(arc)) => arc,
_ => {
panic!("The removed symbol should be a ServiceOrMethod!")

if service_symbol.is_some() {
for method in methods {
self.0.remove(&method);
}
}

service_symbol.map(|symbol| match symbol {
Symbol::ServiceOrMethod(arc) => arc,
Symbol::MessageOrEnum(_) => panic!("The removed symbol should be a ServiceOrMethod!"),
})
}

fn remove_message_or_enum(&mut self, symbol_name: &str, file: &str) {
Expand Down Expand Up @@ -238,72 +240,64 @@ impl ProtoSymbols {
deployment_id: &DeploymentId,
service_desc: &ServiceDescriptor,
) {
debug_assert!(
!self.symbols.contains(service_desc.full_name()),
"Cannot add service '{}' because it already exists",
service_desc.full_name()
);
if !self.symbols.contains(service_desc.full_name()) {
// Collect all the files belonging to this service
let files: HashMap<String, FileDescriptor> =
collect_service_related_file_descriptors(service_desc)
.into_iter()
.map(|file_desc| {
(
normalize_file_name(deployment_id, file_desc.name()),
file_desc,
)
})
.collect();

// Add service to symbols
self.symbols.add_service(
service_desc.full_name().to_string(),
service_desc
.methods()
.map(|m| m.full_name().to_string())
.collect(),
files.keys().cloned().collect(),
);

// Collect all the files belonging to this service
let files: HashMap<String, FileDescriptor> =
collect_service_related_file_descriptors(service_desc)
.into_iter()
.map(|file_desc| {
(
normalize_file_name(deployment_id, file_desc.name()),
file_desc,
)
})
.collect();

// Add service to symbols
self.symbols.add_service(
service_desc.full_name().to_string(),
service_desc
.methods()
.map(|m| m.full_name().to_string())
.collect(),
files.keys().cloned().collect(),
);
// Process files to register symbols and files
for (file_name, file_desc) in files {
// Discover all symbols in this file
let mut message_or_enum_symbols = HashSet::new();
collect_message_or_enum_symbols(&mut message_or_enum_symbols, &file_desc);

// Process files to register symbols and files
for (file_name, file_desc) in files {
// Discover all symbols in this file
let mut message_or_enum_symbols = HashSet::new();
collect_message_or_enum_symbols(&mut message_or_enum_symbols, &file_desc);
// Copy the file_symbols in symbols_index
for symbol in message_or_enum_symbols.clone() {
self.symbols.add_message_or_enum(symbol, file_name.clone());
}

// Copy the file_symbols in symbols_index
for symbol in message_or_enum_symbols.clone() {
self.symbols.add_message_or_enum(symbol, file_name.clone());
// Add the file descriptor
self.files
.add(file_name, deployment_id, file_desc, message_or_enum_symbols);
}

// Add the file descriptor
self.files
.add(file_name, deployment_id, file_desc, message_or_enum_symbols);
}
}

pub(super) fn remove_service(&mut self, service_desc: &ServiceDescriptor) {
debug_assert!(
self.symbols.contains(service_desc.full_name()),
"Cannot remove service '{}' because it doesn't exist",
service_desc.full_name()
);

// Remove the service from the symbols index
let methods = service_desc.methods();
let service_related_files = self.symbols.remove_service(
service_desc.full_name(),
methods.map(|m| m.full_name().to_string()),
);

// For each file related to the service, remove it
for file_name in service_related_files.iter() {
// If when removing a file we free it, then we need to remove the related message and symbols as well
if let Some(message_or_enum_symbols_to_remove) = self.files.remove(file_name) {
for message_or_enum_symbol in message_or_enum_symbols_to_remove {
self.symbols
.remove_message_or_enum(&message_or_enum_symbol, file_name);
if let Some(service_related_files) = service_related_files {
// For each file related to the service, remove it
for file_name in service_related_files.iter() {
// If when removing a file we free it, then we need to remove the related message and symbols as well
if let Some(message_or_enum_symbols_to_remove) = self.files.remove(file_name) {
for message_or_enum_symbol in message_or_enum_symbols_to_remove {
self.symbols
.remove_message_or_enum(&message_or_enum_symbol, file_name);
}
}
}
}
Expand Down
54 changes: 53 additions & 1 deletion crates/schema-impl/src/schemas_impl/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,10 +700,13 @@ fn is_map_with(field_descriptor: &FieldDescriptor, key_kind: Kind, value_kind: K
#[cfg(test)]
mod tests {
use super::*;
use googletest::assert_that;
use googletest::matchers::{contains, eq, not};

use test_log::test;

use restate_schema_api::deployment::{Deployment, DeploymentResolver};
use restate_schema_api::proto_symbol::ProtoSymbolResolver;
use restate_schema_api::service::ServiceMetadataResolver;
use restate_test_util::{assert, assert_eq, let_assert};

Expand Down Expand Up @@ -742,7 +745,14 @@ mod tests {

schemas.assert_service_revision(GREETER_SERVICE_NAME, 1);
schemas.assert_resolves_deployment(GREETER_SERVICE_NAME, deployment_id);
assert_eq!(schemas.list_services().first().unwrap().methods.len(), 3);
assert_eq!(
ServiceMetadataResolver::list_services(&schemas)
.first()
.unwrap()
.methods
.len(),
3
);
}

#[test]
Expand Down Expand Up @@ -800,6 +810,48 @@ mod tests {
schemas.assert_service_revision(ANOTHER_GREETER_SERVICE_NAME, 1);
}

/// This test case ensures that https://github.com/restatedev/restate/issues/1205 works
#[test]
fn force_deploy_private_service() -> Result<(), SchemasUpdateError> {
let schemas = Schemas::default();
let deployment = Deployment::mock();

let initial_deployment_commands = schemas.compute_new_deployment(
Some(deployment.id),
deployment.metadata.clone(),
vec![GREETER_SERVICE_NAME.to_owned()],
DESCRIPTOR.clone(),
false,
)?;
schemas.apply_updates(initial_deployment_commands)?;

let mark_service_private = SchemasUpdateCommand::ModifyService {
name: GREETER_SERVICE_NAME.to_owned(),
public: false,
};
schemas.apply_updates(vec![mark_service_private])?;
assert_that!(
ProtoSymbolResolver::list_services(&schemas),
not(contains(eq(GREETER_SERVICE_NAME.to_owned())))
);

let forced_deployment_commands = schemas.compute_new_deployment(
Some(deployment.id),
deployment.metadata.clone(),
vec![GREETER_SERVICE_NAME.to_owned()],
DESCRIPTOR.clone(),
true,
)?;

schemas.apply_updates(forced_deployment_commands)?;
assert_that!(
ProtoSymbolResolver::list_services(&schemas),
contains(eq(GREETER_SERVICE_NAME.to_owned()))
);

Ok(())
}

mod change_instance_type {
use super::*;

Expand Down
19 changes: 7 additions & 12 deletions crates/schema-impl/src/schemas_impl/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,15 @@ impl SchemasInner {

// Update proto_symbols
if let ServiceLocation::Deployment {
public: old_public_value,
latest_deployment,
latest_deployment, ..
} = &schemas.location
{
match (*old_public_value, new_public_value) {
(true, false) => {
self.proto_symbols
.remove_service(schemas.service_descriptor());
}
(false, true) => {
self.proto_symbols
.add_service(latest_deployment, schemas.service_descriptor());
}
_ => {}
if new_public_value {
self.proto_symbols
.add_service(latest_deployment, schemas.service_descriptor());
} else {
self.proto_symbols
.remove_service(schemas.service_descriptor());
}
}

Expand Down

0 comments on commit 7c9855e

Please sign in to comment.