From 460f038ccd57fead6a78b01ae9bf8b4fee38d54f Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Thu, 19 Dec 2024 18:25:29 -0500 Subject: [PATCH] Pick a non-expunged clone source (#7283) When performing region snapshot replacement, the associated start saga chose the request's region snapshot as the clone source, but if that region snapshot was backed by an expunged dataset then it may be gone. This commit adds logic to choose another clone source, either another region snapshot from the same snapshot, or one of the read-only regions for that snapshot. Basic sanity tests were added for ensuring that region replacements and region snapshot replacements resulting from expungement can occur. It was an oversight not to originally include these! Rn order to support these new sanity tests, the simulated pantry has to fake activating volumes in the background. This commit also refactors the simulated Pantry to have one Mutex around an "inner" struct instead of many Mutexes. Fixes #7209 --- nexus/db-queries/src/db/datastore/region.rs | 36 ++ .../src/db/datastore/region_snapshot.rs | 36 ++ .../region_snapshot_replacement_start.rs | 568 +++++++++++++++++- .../crucible_replacements.rs | 379 +++++++++++- sled-agent/src/sim/storage.rs | 97 +-- 5 files changed, 1043 insertions(+), 73 deletions(-) diff --git a/nexus/db-queries/src/db/datastore/region.rs b/nexus/db-queries/src/db/datastore/region.rs index 8e59462aa3..67bd37cf69 100644 --- a/nexus/db-queries/src/db/datastore/region.rs +++ b/nexus/db-queries/src/db/datastore/region.rs @@ -548,6 +548,42 @@ impl DataStore { Ok(records) } + + /// Find regions not on expunged disks that match a volume id + pub async fn find_non_expunged_regions( + &self, + opctx: &OpContext, + volume_id: Uuid, + ) -> LookupResult> { + let conn = self.pool_connection_authorized(opctx).await?; + + use db::schema::dataset::dsl as dataset_dsl; + use db::schema::physical_disk::dsl as physical_disk_dsl; + use db::schema::region::dsl as region_dsl; + use db::schema::zpool::dsl as zpool_dsl; + + region_dsl::region + .filter(region_dsl::dataset_id.eq_any( + dataset_dsl::dataset + .filter(dataset_dsl::time_deleted.is_null()) + .filter(dataset_dsl::pool_id.eq_any( + zpool_dsl::zpool + .filter(zpool_dsl::time_deleted.is_null()) + .filter(zpool_dsl::physical_disk_id.eq_any( + physical_disk_dsl::physical_disk + .filter(physical_disk_dsl::disk_policy.eq(PhysicalDiskPolicy::InService)) + .select(physical_disk_dsl::id) + )) + .select(zpool_dsl::id) + )) + .select(dataset_dsl::id) + )) + .filter(region_dsl::volume_id.eq(volume_id)) + .select(Region::as_select()) + .load_async(&*conn) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } } #[cfg(test)] diff --git a/nexus/db-queries/src/db/datastore/region_snapshot.rs b/nexus/db-queries/src/db/datastore/region_snapshot.rs index 0129869f4f..f7a34fdb52 100644 --- a/nexus/db-queries/src/db/datastore/region_snapshot.rs +++ b/nexus/db-queries/src/db/datastore/region_snapshot.rs @@ -120,4 +120,40 @@ impl DataStore { .await .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) } + + /// Find region snapshots not on expunged disks that match a snapshot id + pub async fn find_non_expunged_region_snapshots( + &self, + opctx: &OpContext, + snapshot_id: Uuid, + ) -> LookupResult> { + let conn = self.pool_connection_authorized(opctx).await?; + + use db::schema::dataset::dsl as dataset_dsl; + use db::schema::physical_disk::dsl as physical_disk_dsl; + use db::schema::region_snapshot::dsl as region_snapshot_dsl; + use db::schema::zpool::dsl as zpool_dsl; + + region_snapshot_dsl::region_snapshot + .filter(region_snapshot_dsl::dataset_id.eq_any( + dataset_dsl::dataset + .filter(dataset_dsl::time_deleted.is_null()) + .filter(dataset_dsl::pool_id.eq_any( + zpool_dsl::zpool + .filter(zpool_dsl::time_deleted.is_null()) + .filter(zpool_dsl::physical_disk_id.eq_any( + physical_disk_dsl::physical_disk + .filter(physical_disk_dsl::disk_policy.eq(PhysicalDiskPolicy::InService)) + .select(physical_disk_dsl::id) + )) + .select(zpool_dsl::id) + )) + .select(dataset_dsl::id) + )) + .filter(region_snapshot_dsl::snapshot_id.eq(snapshot_id)) + .select(RegionSnapshot::as_select()) + .load_async(&*conn) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } } diff --git a/nexus/src/app/sagas/region_snapshot_replacement_start.rs b/nexus/src/app/sagas/region_snapshot_replacement_start.rs index bb5fd60209..b9ed75c288 100644 --- a/nexus/src/app/sagas/region_snapshot_replacement_start.rs +++ b/nexus/src/app/sagas/region_snapshot_replacement_start.rs @@ -65,6 +65,7 @@ use crate::app::{authn, db}; use nexus_types::identity::Asset; use nexus_types::identity::Resource; use omicron_common::api::external::Error; +use omicron_uuid_kinds::DatasetUuid; use serde::Deserialize; use serde::Serialize; use sled_agent_client::types::CrucibleOpts; @@ -91,6 +92,9 @@ declare_saga_actions! { + rsrss_set_saga_id - rsrss_set_saga_id_undo } + GET_CLONE_SOURCE -> "clone_source" { + + rsrss_get_clone_source + } GET_ALLOC_REGION_PARAMS -> "alloc_region_params" { + rsrss_get_alloc_region_params } @@ -194,6 +198,7 @@ impl NexusSaga for SagaRegionSnapshotReplacementStart { )); builder.append(set_saga_id_action()); + builder.append(get_clone_source_action()); builder.append(get_alloc_region_params_action()); builder.append(alloc_new_region_action()); builder.append(find_new_region_action()); @@ -265,6 +270,169 @@ async fn rsrss_set_saga_id_undo( Ok(()) } +#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)] +enum CloneSource { + RegionSnapshot { dataset_id: DatasetUuid, region_id: Uuid }, + Region { region_id: Uuid }, +} + +async fn rsrss_get_clone_source( + sagactx: NexusActionContext, +) -> Result { + let params = sagactx.saga_params::()?; + let osagactx = sagactx.user_data(); + let log = osagactx.log(); + + // Find either a region snapshot or a read-only region that is associated + // with the request snapshot that has not been expunged, and return that as + // the source to be used to populate the read-only region that will replace + // the request's region snapshot. + // + // Importantly, determine the clone source before new region alloc step in + // this saga, otherwise the query that searches for read-only region + // candidates will match the newly allocated region (that is not created + // yet!). + // + // Choose a clone source based on the following policy: + // + // - choose a region snapshot associated with the one being replaced + // + // - choose a read-only region from the associated snapshot volume + // + // - choose the region snapshot being replaced (only if it is not expunged! + // if the downstairs being cloned from is on an expunged dataset, we have + // to assume that the clone will never succeed, even if the expunged + // thing is still there) + // + // The policy here prefers to choose a clone source that isn't the region + // snapshot in the request: if it's flaky, it shouldn't be used as a clone + // source! This function does not know _why_ the replacement request was + // created for that region snapshot, and assumes that there may be a problem + // with it and will choose it as a last resort (if no other candidate clone + // source is found and the request's region snapshot is not on an expunged + // dataset, then it has to be chosen as a clone source, as the alternative + // is lost data). The request's region snapshot may also be completely fine, + // for example if a scrub is being requested. + // + // Also, the policy also prefers to choose to clone from a region snapshot + // instead of a read-only region: this is an arbitrary order, there is no + // reason behind this. The region snapshots and read-only regions will have + // identical contents. + + // First, try to select another region snapshot that's part of this + // snapshot. + + let opctx = crate::context::op_context_for_saga_action( + &sagactx, + ¶ms.serialized_authn, + ); + + let mut non_expunged_region_snapshots = osagactx + .datastore() + .find_non_expunged_region_snapshots( + &opctx, + params.request.old_snapshot_id, + ) + .await + .map_err(ActionError::action_failed)?; + + // Filter out the request's region snapshot - if there are no other + // candidates, this could be chosen later in this function. + + non_expunged_region_snapshots.retain(|rs| { + !(rs.dataset_id == params.request.old_dataset_id + && rs.region_id == params.request.old_region_id + && rs.snapshot_id == params.request.old_snapshot_id) + }); + + if let Some(candidate) = non_expunged_region_snapshots.pop() { + info!( + log, + "found another non-expunged region snapshot"; + "snapshot_id" => %params.request.old_snapshot_id, + "dataset_id" => %candidate.dataset_id, + "region_id" => %candidate.region_id, + ); + + return Ok(CloneSource::RegionSnapshot { + dataset_id: candidate.dataset_id.into(), + region_id: candidate.region_id, + }); + } + + // Next, try to select a read-only region that's associated with the + // snapshot volume + + info!( + log, + "no region snapshot clone source candidates"; + "snapshot_id" => %params.request.old_snapshot_id, + ); + + // Look up the existing snapshot + let maybe_db_snapshot = osagactx + .datastore() + .snapshot_get(&opctx, params.request.old_snapshot_id) + .await + .map_err(ActionError::action_failed)?; + + let Some(db_snapshot) = maybe_db_snapshot else { + return Err(ActionError::action_failed(Error::internal_error( + &format!( + "snapshot {} was hard deleted!", + params.request.old_snapshot_id + ), + ))); + }; + + let mut non_expunged_read_only_regions = osagactx + .datastore() + .find_non_expunged_regions(&opctx, db_snapshot.volume_id) + .await + .map_err(ActionError::action_failed)?; + + if let Some(candidate) = non_expunged_read_only_regions.pop() { + info!( + log, + "found region clone source candidate"; + "snapshot_id" => %params.request.old_snapshot_id, + "dataset_id" => %candidate.dataset_id(), + "region_id" => %candidate.id(), + ); + + return Ok(CloneSource::Region { region_id: candidate.id() }); + } + + // If no other non-expunged region snapshot or read-only region exists, then + // check if the request's region snapshot is non-expunged. This will use the + // region snapshot that is being replaced as a clone source, which may not + // work if there's a problem with that region snapshot that this replacement + // request is meant to fix! + + let request_dataset_on_in_service_physical_disk = osagactx + .datastore() + .dataset_physical_disk_in_service(params.request.old_dataset_id.into()) + .await + .map_err(ActionError::action_failed)?; + + if request_dataset_on_in_service_physical_disk { + // If the request region snapshot's dataset has not been expunged, it + // can be used + return Ok(CloneSource::RegionSnapshot { + dataset_id: params.request.old_dataset_id.into(), + region_id: params.request.old_region_id, + }); + } + + // If all targets of a Volume::Region are on expunged datasets, then the + // user's data is gone, and this code will fail to select a clone source. + + return Err(ActionError::action_failed(format!( + "no clone source candidate for {}!", + params.request.old_snapshot_id, + ))); +} + #[derive(Debug, Deserialize, Serialize)] struct AllocRegionParams { block_size: u64, @@ -445,46 +613,67 @@ async fn rsrss_new_region_ensure( "new_dataset_and_region", )?; - let region_snapshot = osagactx - .datastore() - .region_snapshot_get( - params.request.old_dataset_id.into(), - params.request.old_region_id, - params.request.old_snapshot_id, - ) - .await - .map_err(ActionError::action_failed)?; + let clone_source = sagactx.lookup::("clone_source")?; + + let mut source_repair_addr: SocketAddrV6 = match clone_source { + CloneSource::RegionSnapshot { dataset_id, region_id } => { + let region_snapshot = osagactx + .datastore() + .region_snapshot_get( + dataset_id, + region_id, + params.request.old_snapshot_id, + ) + .await + .map_err(ActionError::action_failed)?; - let Some(region_snapshot) = region_snapshot else { - return Err(ActionError::action_failed(format!( - "region snapshot {} {} {} deleted!", - params.request.old_dataset_id, - params.request.old_region_id, - params.request.old_snapshot_id, - ))); - }; + let Some(region_snapshot) = region_snapshot else { + return Err(ActionError::action_failed(format!( + "region snapshot {} {} {} deleted!", + dataset_id, region_id, params.request.old_snapshot_id, + ))); + }; - let (new_dataset, new_region) = new_dataset_and_region; + match region_snapshot.snapshot_addr.parse() { + Ok(addr) => addr, - // Currently, the repair port is set using a fixed offset above the - // downstairs port. Once this goes away, Nexus will require a way to query - // for the repair port! + Err(e) => { + return Err(ActionError::action_failed(format!( + "error parsing region_snapshot.snapshot_addr: {e}" + ))); + } + } + } - let mut source_repair_addr: SocketAddrV6 = - match region_snapshot.snapshot_addr.parse() { - Ok(addr) => addr, + CloneSource::Region { region_id } => { + let maybe_addr = osagactx + .datastore() + .region_addr(region_id) + .await + .map_err(ActionError::action_failed)?; - Err(e) => { - return Err(ActionError::action_failed(format!( - "error parsing region_snapshot.snapshot_addr: {e}" - ))); + match maybe_addr { + Some(addr) => addr, + + None => { + return Err(ActionError::action_failed(format!( + "region clone source {region_id} has no port!" + ))); + } } - }; + } + }; + + // Currently, the repair port is set using a fixed offset above the + // downstairs port. Once this goes away, Nexus will require a way to query + // for the repair port! source_repair_addr.set_port( source_repair_addr.port() + crucible_common::REPAIR_PORT_OFFSET, ); + let (new_dataset, new_region) = new_dataset_and_region; + let ensured_region = osagactx .nexus() .ensure_region_in_dataset( @@ -945,6 +1134,7 @@ pub(crate) mod test { app::sagas::region_snapshot_replacement_start::*, app::sagas::test_helpers::test_opctx, app::RegionAllocationStrategy, }; + use nexus_db_model::PhysicalDiskPolicy; use nexus_db_model::RegionSnapshotReplacement; use nexus_db_model::RegionSnapshotReplacementState; use nexus_db_model::Volume; @@ -954,9 +1144,11 @@ pub(crate) mod test { use nexus_test_utils::resource_helpers::create_project; use nexus_test_utils::resource_helpers::create_snapshot; use nexus_test_utils::resource_helpers::DiskTest; + use nexus_test_utils::resource_helpers::DiskTestBuilder; use nexus_test_utils_macros::nexus_test; use nexus_types::external_api::views; use nexus_types::identity::Asset; + use omicron_uuid_kinds::GenericUuid; use sled_agent_client::types::VolumeConstructionRequest; type ControlPlaneTestContext = @@ -1517,4 +1709,322 @@ pub(crate) mod test { ) .await; } + + /// Tests that the region snapshot replacement start saga will not choose + /// the request's region snapshot, but instead will choose the other + /// non-expunged one. + #[nexus_test(server = crate::Server)] + async fn test_region_snapshot_replacement_start_prefer_not_self( + cptestctx: &ControlPlaneTestContext, + ) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = OpContext::for_tests( + cptestctx.logctx.log.new(o!()), + datastore.clone(), + ); + + // Create four zpools, each with one dataset. This is required for + // region and region snapshot replacement to have somewhere to move the + // data, and for this test we're doing one expungements. + let sled_id = cptestctx.first_sled(); + + let disk_test = DiskTestBuilder::new(&cptestctx) + .on_specific_sled(sled_id) + .with_zpool_count(4) + .build() + .await; + + // Any volumes sent to the Pantry for reconciliation should return + // active for this test + + cptestctx + .sled_agent + .pantry_server + .as_ref() + .unwrap() + .pantry + .set_auto_activate_volumes() + .await; + + // Create a disk and a snapshot + let client = &cptestctx.external_client; + let _project_id = + create_project(&client, PROJECT_NAME).await.identity.id; + + let disk = create_disk(&client, PROJECT_NAME, "disk").await; + let snapshot = + create_snapshot(&client, PROJECT_NAME, "disk", "snap").await; + + // Before expunging any physical disk, save some DB models + let (.., db_disk) = LookupPath::new(&opctx, &datastore) + .disk_id(disk.identity.id) + .fetch() + .await + .unwrap(); + + let (.., db_snapshot) = LookupPath::new(&opctx, &datastore) + .snapshot_id(snapshot.identity.id) + .fetch() + .await + .unwrap(); + + let disk_allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + let snapshot_allocated_regions = datastore + .get_allocated_regions(db_snapshot.volume_id) + .await + .unwrap(); + + assert_eq!(disk_allocated_regions.len(), 3); + assert_eq!(snapshot_allocated_regions.len(), 0); + + // Expunge one physical disk + { + let (dataset, _) = &disk_allocated_regions[0]; + + let zpool = disk_test + .zpools() + .find(|x| *x.id.as_untyped_uuid() == dataset.pool_id) + .expect("Expected at least one zpool"); + + let (_, db_zpool) = LookupPath::new(&opctx, datastore) + .zpool_id(zpool.id.into_untyped_uuid()) + .fetch() + .await + .unwrap(); + + datastore + .physical_disk_update_policy( + &opctx, + db_zpool.physical_disk_id.into(), + PhysicalDiskPolicy::Expunged, + ) + .await + .unwrap(); + } + + // Request that the second region snapshot be replaced + + let region_snapshot = datastore + .region_snapshot_get( + disk_allocated_regions[1].0.id(), // dataset id + disk_allocated_regions[1].1.id(), // region id + snapshot.identity.id, + ) + .await + .unwrap() + .unwrap(); + + let request_id = datastore + .create_region_snapshot_replacement_request( + &opctx, + ®ion_snapshot, + ) + .await + .unwrap(); + + // Manually invoke the region snapshot replacement start saga + + let saga_outputs = nexus + .sagas + .saga_execute::(Params { + serialized_authn: Serialized::for_opctx(&opctx), + + request: datastore + .get_region_snapshot_replacement_request_by_id( + &opctx, request_id, + ) + .await + .unwrap(), + + allocation_strategy: RegionAllocationStrategy::Random { + seed: None, + }, + }) + .await + .unwrap(); + + // The third region snapshot should have been selected as the clone + // source + + let selected_clone_source = saga_outputs + .lookup_node_output::("clone_source") + .unwrap(); + + assert_eq!( + selected_clone_source, + CloneSource::RegionSnapshot { + dataset_id: disk_allocated_regions[2].0.id(), + region_id: disk_allocated_regions[2].1.id(), + }, + ); + + let snapshot_allocated_regions = datastore + .get_allocated_regions(db_snapshot.volume_id) + .await + .unwrap(); + + assert_eq!(snapshot_allocated_regions.len(), 1); + assert!(snapshot_allocated_regions.iter().all(|(_, r)| r.read_only())); + } + + /// Tests that a region snapshot replacement request can select the region + /// snapshot being replaced as a clone source (but only if it is not + /// expunged!) + #[nexus_test(server = crate::Server)] + async fn test_region_snapshot_replacement_start_hail_mary( + cptestctx: &ControlPlaneTestContext, + ) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = OpContext::for_tests( + cptestctx.logctx.log.new(o!()), + datastore.clone(), + ); + + // Create five zpools, each with one dataset. This is required for + // region and region snapshot replacement to have somewhere to move the + // data, and for this test we're doing two expungements. + let sled_id = cptestctx.first_sled(); + + let disk_test = DiskTestBuilder::new(&cptestctx) + .on_specific_sled(sled_id) + .with_zpool_count(5) + .build() + .await; + + // Any volumes sent to the Pantry for reconciliation should return + // active for this test + + cptestctx + .sled_agent + .pantry_server + .as_ref() + .unwrap() + .pantry + .set_auto_activate_volumes() + .await; + + // Create a disk and a snapshot + let client = &cptestctx.external_client; + let _project_id = + create_project(&client, PROJECT_NAME).await.identity.id; + + let disk = create_disk(&client, PROJECT_NAME, "disk").await; + let snapshot = + create_snapshot(&client, PROJECT_NAME, "disk", "snap").await; + + // Before expunging any physical disk, save some DB models + let (.., db_disk) = LookupPath::new(&opctx, &datastore) + .disk_id(disk.identity.id) + .fetch() + .await + .unwrap(); + + let (.., db_snapshot) = LookupPath::new(&opctx, &datastore) + .snapshot_id(snapshot.identity.id) + .fetch() + .await + .unwrap(); + + let disk_allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + let snapshot_allocated_regions = datastore + .get_allocated_regions(db_snapshot.volume_id) + .await + .unwrap(); + + assert_eq!(disk_allocated_regions.len(), 3); + assert_eq!(snapshot_allocated_regions.len(), 0); + + // Expunge two physical disks + for i in [0, 1] { + let (dataset, _) = &disk_allocated_regions[i]; + + let zpool = disk_test + .zpools() + .find(|x| *x.id.as_untyped_uuid() == dataset.pool_id) + .expect("Expected at least one zpool"); + + let (_, db_zpool) = LookupPath::new(&opctx, datastore) + .zpool_id(zpool.id.into_untyped_uuid()) + .fetch() + .await + .unwrap(); + + datastore + .physical_disk_update_policy( + &opctx, + db_zpool.physical_disk_id.into(), + PhysicalDiskPolicy::Expunged, + ) + .await + .unwrap(); + } + + // Request that the third region snapshot be replaced + + let region_snapshot = datastore + .region_snapshot_get( + disk_allocated_regions[2].0.id(), // dataset id + disk_allocated_regions[2].1.id(), // region id + snapshot.identity.id, + ) + .await + .unwrap() + .unwrap(); + + let request_id = datastore + .create_region_snapshot_replacement_request( + &opctx, + ®ion_snapshot, + ) + .await + .unwrap(); + + // Manually invoke the region snapshot replacement start saga + + let saga_outputs = nexus + .sagas + .saga_execute::(Params { + serialized_authn: Serialized::for_opctx(&opctx), + + request: datastore + .get_region_snapshot_replacement_request_by_id( + &opctx, request_id, + ) + .await + .unwrap(), + + allocation_strategy: RegionAllocationStrategy::Random { + seed: None, + }, + }) + .await + .unwrap(); + + // This should have chosen the request's region snapshot as a clone + // source, and replaced it with a read-only region + + let selected_clone_source = saga_outputs + .lookup_node_output::("clone_source") + .unwrap(); + + assert_eq!( + selected_clone_source, + CloneSource::RegionSnapshot { + dataset_id: disk_allocated_regions[2].0.id(), + region_id: disk_allocated_regions[2].1.id(), + }, + ); + + let snapshot_allocated_regions = datastore + .get_allocated_regions(db_snapshot.volume_id) + .await + .unwrap(); + + assert_eq!(snapshot_allocated_regions.len(), 1); + assert!(snapshot_allocated_regions.iter().all(|(_, r)| r.read_only())); + } } diff --git a/nexus/tests/integration_tests/crucible_replacements.rs b/nexus/tests/integration_tests/crucible_replacements.rs index e84c8a0614..57dc624187 100644 --- a/nexus/tests/integration_tests/crucible_replacements.rs +++ b/nexus/tests/integration_tests/crucible_replacements.rs @@ -881,11 +881,36 @@ async fn test_racing_replacements_for_soft_deleted_disk_volume( .await; // Assert the region snapshot was deleted. - assert!(datastore - .region_snapshot_get(dataset.id(), region.id(), snapshot.identity.id) - .await - .unwrap() - .is_none()); + wait_for_condition( + || { + let dataset_id = dataset.id(); + let region_id = region.id(); + let snapshot_id = snapshot.identity.id; + + async move { + let region_snapshot = datastore + .region_snapshot_get(dataset_id, region_id, snapshot_id) + .await + .unwrap(); + + match region_snapshot { + Some(_) => { + // Region snapshot not garbage collected yet + Err(CondCheckError::<()>::NotYet) + } + + None => { + // Region snapshot garbage collected ok + Ok(()) + } + } + } + }, + &std::time::Duration::from_millis(500), + &std::time::Duration::from_secs(60), + ) + .await + .expect("region snapshot garbage collected"); // Assert that the disk's volume is still only soft-deleted, because the two // other associated region snapshots still exist. @@ -959,12 +984,19 @@ async fn test_racing_replacements_for_soft_deleted_disk_volume( // The saga transitioned the request ok Ok(()) } else if state == RegionReplacementState::Driving { - // The saga is still running + // The drive saga is still running + Err(CondCheckError::<()>::NotYet) + } else if state == RegionReplacementState::Running { + // The drive saga hasn't started yet Err(CondCheckError::<()>::NotYet) } else if state == RegionReplacementState::Completing { // The saga transitioned the request ok, and it's now being // finished by the region replacement finish saga Ok(()) + } else if state == RegionReplacementState::Complete { + // The saga transitioned the request ok, and it was finished + // by the region replacement finish saga + Ok(()) } else { // Any other state is not expected panic!("unexpected state {state:?}!"); @@ -1707,3 +1739,338 @@ async fn test_delete_volume_region_snapshot_replacement_step( test_harness.assert_no_crucible_resources_leaked().await; } + +/// Tests that replacement can occur until completion +#[nexus_test] +async fn test_replacement_sanity(cptestctx: &ControlPlaneTestContext) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = + OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone()); + + // Create four zpools, each with one dataset. This is required for region + // and region snapshot replacement to have somewhere to move the data. + let sled_id = cptestctx.first_sled(); + + let disk_test = DiskTestBuilder::new(&cptestctx) + .on_specific_sled(sled_id) + .with_zpool_count(4) + .build() + .await; + + // Create a disk and a snapshot and a disk from that snapshot + let client = &cptestctx.external_client; + let _project_id = create_project_and_pool(client).await; + + let disk = create_disk(&client, PROJECT_NAME, "disk").await; + let snapshot = create_snapshot(&client, PROJECT_NAME, "disk", "snap").await; + let _disk_from_snapshot = create_disk_from_snapshot( + &client, + PROJECT_NAME, + "disk-from-snap", + snapshot.identity.id, + ) + .await; + + // Before expunging the physical disk, save the DB model + let (.., db_disk) = LookupPath::new(&opctx, &datastore) + .disk_id(disk.identity.id) + .fetch() + .await + .unwrap(); + + assert_eq!(db_disk.id(), disk.identity.id); + + // Next, expunge a physical disk that contains a region + + let disk_allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + let (dataset, _) = &disk_allocated_regions[0]; + + let zpool = disk_test + .zpools() + .find(|x| *x.id.as_untyped_uuid() == dataset.pool_id) + .expect("Expected at least one zpool"); + + let (_, db_zpool) = LookupPath::new(&opctx, datastore) + .zpool_id(zpool.id.into_untyped_uuid()) + .fetch() + .await + .unwrap(); + + datastore + .physical_disk_update_policy( + &opctx, + db_zpool.physical_disk_id.into(), + PhysicalDiskPolicy::Expunged, + ) + .await + .unwrap(); + + // Any volumes sent to the Pantry for reconciliation should return active + // for this test + + cptestctx + .sled_agent + .pantry_server + .as_ref() + .unwrap() + .pantry + .set_auto_activate_volumes() + .await; + + // Now, run all replacement tasks to completion + let internal_client = &cptestctx.internal_client; + run_replacement_tasks_to_completion(&internal_client).await; +} + +/// Tests that multiple replacements can occur until completion +#[nexus_test] +async fn test_region_replacement_triple_sanity( + cptestctx: &ControlPlaneTestContext, +) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = + OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone()); + + // Create five zpools, each with one dataset. This is required for region + // and region snapshot replacement to have somewhere to move the data, and + // for this test we're doing two expungements. + let sled_id = cptestctx.first_sled(); + + let disk_test = DiskTestBuilder::new(&cptestctx) + .on_specific_sled(sled_id) + .with_zpool_count(6) + .build() + .await; + + // Any volumes sent to the Pantry for reconciliation should return active + // for this test + + cptestctx + .sled_agent + .pantry_server + .as_ref() + .unwrap() + .pantry + .set_auto_activate_volumes() + .await; + + // Create a disk and a snapshot and a disk from that snapshot + let client = &cptestctx.external_client; + let _project_id = create_project_and_pool(client).await; + + let disk = create_disk(&client, PROJECT_NAME, "disk").await; + let snapshot = create_snapshot(&client, PROJECT_NAME, "disk", "snap").await; + let _disk_from_snapshot = create_disk_from_snapshot( + &client, + PROJECT_NAME, + "disk-from-snap", + snapshot.identity.id, + ) + .await; + + // Before expunging any physical disk, save some DB models + let (.., db_disk) = LookupPath::new(&opctx, &datastore) + .disk_id(disk.identity.id) + .fetch() + .await + .unwrap(); + + let (.., db_snapshot) = LookupPath::new(&opctx, &datastore) + .snapshot_id(snapshot.identity.id) + .fetch() + .await + .unwrap(); + + let internal_client = &cptestctx.internal_client; + + let disk_allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + let snapshot_allocated_regions = + datastore.get_allocated_regions(db_snapshot.volume_id).await.unwrap(); + + assert_eq!(disk_allocated_regions.len(), 3); + assert_eq!(snapshot_allocated_regions.len(), 0); + + for i in disk_allocated_regions { + let (dataset, _) = &i; + + let zpool = disk_test + .zpools() + .find(|x| *x.id.as_untyped_uuid() == dataset.pool_id) + .expect("Expected at least one zpool"); + + let (_, db_zpool) = LookupPath::new(&opctx, datastore) + .zpool_id(zpool.id.into_untyped_uuid()) + .fetch() + .await + .unwrap(); + + datastore + .physical_disk_update_policy( + &opctx, + db_zpool.physical_disk_id.into(), + PhysicalDiskPolicy::Expunged, + ) + .await + .unwrap(); + + // Now, run all replacement tasks to completion + run_replacement_tasks_to_completion(&internal_client).await; + } + + let disk_allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + let snapshot_allocated_regions = + datastore.get_allocated_regions(db_snapshot.volume_id).await.unwrap(); + + assert_eq!(disk_allocated_regions.len(), 3); + assert!(disk_allocated_regions.iter().all(|(_, r)| !r.read_only())); + + // Assert region snapshots replaced with three read-only regions + assert_eq!(snapshot_allocated_regions.len(), 3); + assert!(snapshot_allocated_regions.iter().all(|(_, r)| r.read_only())); +} + +/// Tests that multiple replacements can occur until completion, after expunging +/// two physical disks before any replacements occur (aka we can lose two +/// physical disks and still recover) +#[nexus_test] +async fn test_region_replacement_triple_sanity_2( + cptestctx: &ControlPlaneTestContext, +) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = + OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone()); + + // Create five zpools, each with one dataset. This is required for region + // and region snapshot replacement to have somewhere to move the data, and + // for this test we're doing two expungements. + let sled_id = cptestctx.first_sled(); + + let disk_test = DiskTestBuilder::new(&cptestctx) + .on_specific_sled(sled_id) + .with_zpool_count(6) + .build() + .await; + + // Any volumes sent to the Pantry for reconciliation should return active + // for this test + + cptestctx + .sled_agent + .pantry_server + .as_ref() + .unwrap() + .pantry + .set_auto_activate_volumes() + .await; + + // Create a disk and a snapshot and a disk from that snapshot + let client = &cptestctx.external_client; + let _project_id = create_project_and_pool(client).await; + + let disk = create_disk(&client, PROJECT_NAME, "disk").await; + let snapshot = create_snapshot(&client, PROJECT_NAME, "disk", "snap").await; + let _disk_from_snapshot = create_disk_from_snapshot( + &client, + PROJECT_NAME, + "disk-from-snap", + snapshot.identity.id, + ) + .await; + + // Before expunging any physical disk, save some DB models + let (.., db_disk) = LookupPath::new(&opctx, &datastore) + .disk_id(disk.identity.id) + .fetch() + .await + .unwrap(); + + let (.., db_snapshot) = LookupPath::new(&opctx, &datastore) + .snapshot_id(snapshot.identity.id) + .fetch() + .await + .unwrap(); + + let internal_client = &cptestctx.internal_client; + + let disk_allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + let snapshot_allocated_regions = + datastore.get_allocated_regions(db_snapshot.volume_id).await.unwrap(); + + assert_eq!(disk_allocated_regions.len(), 3); + assert_eq!(snapshot_allocated_regions.len(), 0); + + // Expunge two physical disks before any replacements occur + for i in [0, 1] { + let (dataset, _) = &disk_allocated_regions[i]; + + let zpool = disk_test + .zpools() + .find(|x| *x.id.as_untyped_uuid() == dataset.pool_id) + .expect("Expected at least one zpool"); + + let (_, db_zpool) = LookupPath::new(&opctx, datastore) + .zpool_id(zpool.id.into_untyped_uuid()) + .fetch() + .await + .unwrap(); + + datastore + .physical_disk_update_policy( + &opctx, + db_zpool.physical_disk_id.into(), + PhysicalDiskPolicy::Expunged, + ) + .await + .unwrap(); + } + + // Now, run all replacement tasks to completion + run_replacement_tasks_to_completion(&internal_client).await; + + // Expunge the last physical disk + { + let (dataset, _) = &disk_allocated_regions[2]; + + let zpool = disk_test + .zpools() + .find(|x| *x.id.as_untyped_uuid() == dataset.pool_id) + .expect("Expected at least one zpool"); + + let (_, db_zpool) = LookupPath::new(&opctx, datastore) + .zpool_id(zpool.id.into_untyped_uuid()) + .fetch() + .await + .unwrap(); + + datastore + .physical_disk_update_policy( + &opctx, + db_zpool.physical_disk_id.into(), + PhysicalDiskPolicy::Expunged, + ) + .await + .unwrap(); + } + + // Now, run all replacement tasks to completion + run_replacement_tasks_to_completion(&internal_client).await; + + let disk_allocated_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + let snapshot_allocated_regions = + datastore.get_allocated_regions(db_snapshot.volume_id).await.unwrap(); + + assert_eq!(disk_allocated_regions.len(), 3); + assert!(disk_allocated_regions.iter().all(|(_, r)| !r.read_only())); + + // Assert region snapshots replaced with three read-only regions + assert_eq!(snapshot_allocated_regions.len(), 3); + assert!(snapshot_allocated_regions.iter().all(|(_, r)| r.read_only())); +} diff --git a/sled-agent/src/sim/storage.rs b/sled-agent/src/sim/storage.rs index 8fd648096a..2299ba9db9 100644 --- a/sled-agent/src/sim/storage.rs +++ b/sled-agent/src/sim/storage.rs @@ -117,6 +117,8 @@ impl CrucibleDataInner { bail!("region creation error!"); } + let read_only = params.source.is_some(); + let region = Region { id: params.id, block_size: params.block_size, @@ -129,8 +131,8 @@ impl CrucibleDataInner { cert_pem: None, key_pem: None, root_pem: None, - source: None, - read_only: params.source.is_some(), + source: params.source, + read_only, }; let old = self.regions.insert(id, region.clone()); @@ -1364,29 +1366,41 @@ pub struct PantryVolume { activate_job: Option, } +pub struct PantryInner { + /// Map Volume UUID to PantryVolume struct + volumes: HashMap, + + jobs: HashSet, + + /// Auto activate volumes attached in the background + auto_activate_volumes: bool, +} + /// Simulated crucible pantry pub struct Pantry { pub id: OmicronZoneUuid, - /// Map Volume UUID to PantryVolume struct - volumes: Mutex>, sled_agent: Arc, - jobs: Mutex>, + inner: Mutex, } impl Pantry { pub fn new(sled_agent: Arc) -> Self { Self { id: OmicronZoneUuid::new_v4(), - volumes: Mutex::new(HashMap::default()), sled_agent, - jobs: Mutex::new(HashSet::default()), + inner: Mutex::new(PantryInner { + volumes: HashMap::default(), + jobs: HashSet::default(), + auto_activate_volumes: false, + }), } } pub async fn status(&self) -> Result { + let inner = self.inner.lock().await; Ok(PantryStatus { - volumes: self.volumes.lock().await.keys().cloned().collect(), - num_job_handles: self.jobs.lock().await.len(), + volumes: inner.volumes.keys().cloned().collect(), + num_job_handles: inner.jobs.len(), }) } @@ -1394,8 +1408,9 @@ impl Pantry { &self, volume_id: String, ) -> Result { - let volumes = self.volumes.lock().await; - match volumes.get(&volume_id) { + let inner = self.inner.lock().await; + + match inner.volumes.get(&volume_id) { Some(entry) => Ok(entry.vcr.clone()), None => Err(HttpError::for_not_found(None, volume_id)), @@ -1407,9 +1422,9 @@ impl Pantry { volume_id: String, volume_construction_request: VolumeConstructionRequest, ) -> Result<()> { - let mut volumes = self.volumes.lock().await; + let mut inner = self.inner.lock().await; - volumes.insert( + inner.volumes.insert( volume_id, PantryVolume { vcr: volume_construction_request, @@ -1425,29 +1440,34 @@ impl Pantry { Ok(()) } + pub async fn set_auto_activate_volumes(&self) { + self.inner.lock().await.auto_activate_volumes = true; + } + pub async fn attach_activate_background( &self, volume_id: String, activate_job_id: String, volume_construction_request: VolumeConstructionRequest, ) -> Result<(), HttpError> { - let mut volumes = self.volumes.lock().await; - let mut jobs = self.jobs.lock().await; + let mut inner = self.inner.lock().await; + + let auto_activate_volumes = inner.auto_activate_volumes; - volumes.insert( + inner.volumes.insert( volume_id, PantryVolume { vcr: volume_construction_request, status: VolumeStatus { - active: false, - seen_active: false, + active: auto_activate_volumes, + seen_active: auto_activate_volumes, num_job_handles: 1, }, activate_job: Some(activate_job_id.clone()), }, ); - jobs.insert(activate_job_id); + inner.jobs.insert(activate_job_id); Ok(()) } @@ -1457,8 +1477,8 @@ impl Pantry { volume_id: String, ) -> Result { let activate_job = { - let volumes = self.volumes.lock().await; - volumes.get(&volume_id).unwrap().activate_job.clone().unwrap() + let inner = self.inner.lock().await; + inner.volumes.get(&volume_id).unwrap().activate_job.clone().unwrap() }; let mut status = self.volume_status(volume_id.clone()).await?; @@ -1475,9 +1495,9 @@ impl Pantry { &self, volume_id: String, ) -> Result { - let volumes = self.volumes.lock().await; + let inner = self.inner.lock().await; - match volumes.get(&volume_id) { + match inner.volumes.get(&volume_id) { Some(pantry_volume) => Ok(pantry_volume.status.clone()), None => Err(HttpError::for_not_found(None, volume_id)), @@ -1489,9 +1509,9 @@ impl Pantry { volume_id: String, status: VolumeStatus, ) -> Result<(), HttpError> { - let mut volumes = self.volumes.lock().await; + let mut inner = self.inner.lock().await; - match volumes.get_mut(&volume_id) { + match inner.volumes.get_mut(&volume_id) { Some(pantry_volume) => { pantry_volume.status = status; Ok(()) @@ -1505,8 +1525,8 @@ impl Pantry { &self, job_id: String, ) -> Result { - let jobs = self.jobs.lock().await; - if !jobs.contains(&job_id) { + let inner = self.inner.lock().await; + if !inner.jobs.contains(&job_id) { return Err(HttpError::for_not_found(None, job_id)); } Ok(true) @@ -1516,11 +1536,11 @@ impl Pantry { &self, job_id: String, ) -> Result, HttpError> { - let mut jobs = self.jobs.lock().await; - if !jobs.contains(&job_id) { + let mut inner = self.inner.lock().await; + if !inner.jobs.contains(&job_id) { return Err(HttpError::for_not_found(None, job_id)); } - jobs.remove(&job_id); + inner.jobs.remove(&job_id); Ok(Ok(true)) } @@ -1533,9 +1553,9 @@ impl Pantry { self.entry(volume_id).await?; // Make up job - let mut jobs = self.jobs.lock().await; + let mut inner = self.inner.lock().await; let job_id = Uuid::new_v4().to_string(); - jobs.insert(job_id.clone()); + inner.jobs.insert(job_id.clone()); Ok(job_id) } @@ -1549,8 +1569,9 @@ impl Pantry { // the simulated instance ensure, then call // [`instance_issue_disk_snapshot_request`] as the snapshot logic is the // same. - let volumes = self.volumes.lock().await; - let volume_construction_request = &volumes.get(&volume_id).unwrap().vcr; + let inner = self.inner.lock().await; + let volume_construction_request = + &inner.volumes.get(&volume_id).unwrap().vcr; self.sled_agent .map_disk_ids_to_region_ids(volume_construction_request) @@ -1630,16 +1651,16 @@ impl Pantry { self.entry(volume_id).await?; // Make up job - let mut jobs = self.jobs.lock().await; + let mut inner = self.inner.lock().await; let job_id = Uuid::new_v4().to_string(); - jobs.insert(job_id.clone()); + inner.jobs.insert(job_id.clone()); Ok(job_id) } pub async fn detach(&self, volume_id: String) -> Result<()> { - let mut volumes = self.volumes.lock().await; - volumes.remove(&volume_id); + let mut inner = self.inner.lock().await; + inner.volumes.remove(&volume_id); Ok(()) } }