Handle region snapshot replacement volume deletes #7046

@@ -22,7 +22,6 @@ use crate::db::update_and_check::UpdateAndCheck;
 use crate::db::update_and_check::UpdateStatus;
 use crate::db::TransactionError;
 use crate::transaction_retry::OptionalError;
 use async_bb8_diesel::AsyncConnection;
 use async_bb8_diesel::AsyncRunQueryDsl;
 use diesel::prelude::*;
 use omicron_common::api::external::Error;

@@ -94,44 +93,48 @@ impl DataStore {
        volume_id: Uuid,
    ) -> Result<(), Error> {
        let err = OptionalError::new();
        self.pool_connection_authorized(opctx)
            .await?
            .transaction_async(|conn| {
                let err = err.clone();
                async move {
                    use db::schema::region_snapshot_replacement::dsl;
        let conn = self.pool_connection_authorized(opctx).await?;

                    // An associated volume repair record isn't _strictly_
                    // needed: snapshot volumes should never be directly
                    // constructed, and therefore won't ever have an associated
                    // Upstairs that receives a volume replacement request.
                    // However it's being done in an attempt to be overly
                    // cautious, and it validates that the volume exists:
                    // otherwise it would be possible to create a region
                    // snapshot replacement request for a volume that didn't
                    // exist!

                    Self::volume_repair_insert_in_txn(
                        &conn, err, volume_id, request.id,
                    )
                    .await?;
        self.transaction_retry_wrapper(
            "insert_region_snapshot_replacement_request_with_volume_id",
        )
        .transaction(&conn, |conn| {
            let request = request.clone();
            let err = err.clone();
            async move {
                use db::schema::region_snapshot_replacement::dsl;

                    diesel::insert_into(dsl::region_snapshot_replacement)
                        .values(request)
                        .execute_async(&conn)
                        .await?;
                // An associated volume repair record isn't _strictly_
                // needed: snapshot volumes should never be directly
                // constructed, and therefore won't ever have an associated
                // Upstairs that receives a volume replacement request.
                // However it's being done in an attempt to be overly
                // cautious, and it validates that the volume exists:
                // otherwise it would be possible to create a region
                // snapshot replacement request for a volume that didn't
                // exist!

                    Ok(())
                }
            })
            .await
            .map_err(|e| {
                if let Some(err) = err.take() {
                    err
                } else {
                    public_error_from_diesel(e, ErrorHandler::Server)
                }
            })
                Self::volume_repair_insert_in_txn(
                    &conn, err, volume_id, request.id,
                )
                .await?;

                diesel::insert_into(dsl::region_snapshot_replacement)
                    .values(request)
                    .execute_async(&conn)
                    .await?;

                Ok(())
            }
        })
        .await
        .map_err(|e| {
            if let Some(err) = err.take() {
                err
            } else {
                public_error_from_diesel(e, ErrorHandler::Server)
            }
        })
    }

    pub async fn get_region_snapshot_replacement_request_by_id(

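A note on the pattern adopted in this hunk: in both the old and new versions, the closure reports permanent failures through `OptionalError` - the closure `bail()`s a typed error, and the `map_err` after `.await` returns it if one was set, otherwise wrapping the diesel error. What changes is the driver: `transaction_async` is replaced by `transaction_retry_wrapper`, which can re-run the closure when the database reports a retryable failure. Below is a minimal, self-contained sketch of that retry/bail split; `OptionalError` and `retry_transaction` here are simplified stand-ins for illustration, not the real omicron types.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Side channel for a "permanent" error raised inside a retryable closure.
/// (Simplified stand-in for the OptionalError used in the diff.)
#[derive(Clone, Default)]
struct OptionalError(Rc<RefCell<Option<String>>>);

impl OptionalError {
    fn new() -> Self {
        Self::default()
    }
    /// Record a permanent error and return a sentinel failure for this attempt.
    fn bail(&self, e: String) -> &'static str {
        *self.0.borrow_mut() = Some(e);
        "attempt aborted"
    }
    fn take(&self) -> Option<String> {
        self.0.borrow_mut().take()
    }
}

/// Re-run the closure on retryable failures, but never retry a bail()ed error.
fn retry_transaction<F>(err: &OptionalError, mut attempt: F) -> Result<(), String>
where
    F: FnMut() -> Result<(), &'static str>,
{
    for _ in 0..3 {
        match attempt() {
            Ok(()) => return Ok(()),
            Err(_) => {
                // Mirrors the map_err in the diff: prefer the bail()ed error
                // if one was set, otherwise treat the failure as retryable.
                if let Some(permanent) = err.take() {
                    return Err(permanent);
                }
            }
        }
    }
    Err(String::from("retries exhausted"))
}

fn main() {
    let err = OptionalError::new();
    let outcome = retry_transaction(&err, || {
        // In the real code this is where the inserts/updates run; a state
        // conflict is reported via err.bail(..) so it is not retried.
        Err(err.bail(String::from("record already in an unexpected state")))
    });
    println!("{:?}", outcome); // Err("record already in an unexpected state")
}
```

The property the wrapper relies on is that a `bail()`ed conflict is surfaced as-is and never retried, while other transaction failures remain eligible for retry.
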
@@ -685,16 +688,25 @@ impl DataStore {
        region_snapshot_replacement_id: Uuid,
        operating_saga_id: Uuid,
    ) -> Result<(), Error> {
        type TxnError = TransactionError<Error>;

        let err = OptionalError::new();
        let conn = self.pool_connection_authorized(opctx).await?;

        self.transaction_retry_wrapper("set_region_snapshot_replacement_complete")
            .transaction(&conn, |conn| {
                let err = err.clone();
                async move {
                    use db::schema::volume_repair::dsl as volume_repair_dsl;
        self.transaction_retry_wrapper(
            "set_region_snapshot_replacement_complete",
        )
        .transaction(&conn, |conn| {
            let err = err.clone();
            async move {
                use db::schema::volume_repair::dsl as volume_repair_dsl;

                diesel::delete(
                    volume_repair_dsl::volume_repair.filter(
                        volume_repair_dsl::repair_id
                            .eq(region_snapshot_replacement_id),
                    ),
                )
                .execute_async(&conn)
                .await?;

                use db::schema::region_snapshot_replacement::dsl;

@@ -716,22 +728,81 @@ impl DataStore {
                    .execute_and_check(&conn)
                    .await?;

                match result.status {
                    UpdateStatus::Updated => Ok(()),
                    UpdateStatus::NotUpdatedButExists => {
                        let record = result.found;

                        if record.replacement_state
                            == RegionSnapshotReplacementState::Complete
                        {
                            Ok(())
                        } else {
                            Err(err.bail(Error::conflict(format!(
                                "region snapshot replacement {} set to {:?} \
                                (operating saga id {:?})",
                                region_snapshot_replacement_id,
                                record.replacement_state,
                                record.operating_saga_id,
                            ))))

[Review comment on lines +741 to +747]

Comment: This is unexpected, correct? (It means either that the operating saga ID was wrong, or the caller called this on a replacement that wasn't Completing.)

Reply: It's unexpected, yeah. Even in the case where the saga node is rerun the state should be set to Complete already and the saga shouldn't unwind.

                        }
                    }
                }
            }
        })
        .await
        .map_err(|e| match err.take() {
            Some(error) => error,
            None => public_error_from_diesel(e, ErrorHandler::Server),
        })
    }

    /// Transition a RegionSnapshotReplacement record from Requested to Complete
    /// - this is required when the region snapshot is hard-deleted, which means
    /// that all volume references are gone and no replacement is required. Also
    /// removes the `volume_repair` record that is taking a "lock" on the
    /// Volume.
    pub async fn set_region_snapshot_replacement_complete_from_requested(
        &self,
        opctx: &OpContext,
        region_snapshot_replacement_id: Uuid,
    ) -> Result<(), Error> {
        type TxnError = TransactionError<Error>;

        let err = OptionalError::new();
        let conn = self.pool_connection_authorized(opctx).await?;

        self.transaction_retry_wrapper("set_region_snapshot_replacement_complete")
            .transaction(&conn, |conn| {
                let err = err.clone();
                async move {
                    use db::schema::volume_repair::dsl as volume_repair_dsl;
                    use db::schema::region_snapshot_replacement::dsl;

                    diesel::delete(
                        volume_repair_dsl::volume_repair.filter(
                            volume_repair_dsl::repair_id
                                .eq(region_snapshot_replacement_id),
                        ),
                    )
                    .execute_async(&conn)
                    .await?;

                    let result = diesel::update(dsl::region_snapshot_replacement)
                        .filter(dsl::id.eq(region_snapshot_replacement_id))
                        .filter(
                            dsl::replacement_state
                                .eq(RegionSnapshotReplacementState::Running),
                                .eq(RegionSnapshotReplacementState::Requested),
                        )
                        .filter(dsl::operating_saga_id.is_null())
                        .set((dsl::replacement_state
                            .eq(RegionSnapshotReplacementState::Complete),))
                        .filter(dsl::new_region_volume_id.is_null())
                        .set(dsl::replacement_state
                            .eq(RegionSnapshotReplacementState::Complete))
                        .check_if_exists::<RegionSnapshotReplacement>(
                            region_snapshot_replacement_id,
                        )
                        .execute_and_check(&conn)
                        .await?;
                        .await?;

                    match result.status {
                        UpdateStatus::Updated => Ok(()),

@@ -750,7 +821,7 @@ impl DataStore {
                                region_snapshot_replacement_id,
                                record.replacement_state,
                                record.operating_saga_id,
                            ),
                            )
                        ))))
                    }
                }

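The `check_if_exists::<RegionSnapshotReplacement>(..)` / `execute_and_check(..)` combination shown above is what makes these transitions idempotent: when the conditional UPDATE matches nothing, the found record is inspected, an already-Complete record is treated as success (e.g. a replayed saga node), and any other state becomes a conflict error. A rough sketch of that decision logic follows, using simplified stand-in types rather than the real `UpdateAndCheck` machinery:

```rust
/// Simplified stand-ins for the update-and-check types; not the omicron API.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ReplacementState {
    Requested,
    Running,
    Complete,
}

#[derive(Clone, Debug)]
struct Replacement {
    state: ReplacementState,
}

#[derive(Debug, PartialEq)]
enum UpdateStatus {
    Updated,
    NotUpdatedButExists,
}

/// Conditionally move a record from Requested to Complete, reporting whether
/// the row was actually updated and what was found if it was not.
fn try_complete(record: &mut Replacement) -> (UpdateStatus, Replacement) {
    if record.state == ReplacementState::Requested {
        record.state = ReplacementState::Complete;
        (UpdateStatus::Updated, record.clone())
    } else {
        (UpdateStatus::NotUpdatedButExists, record.clone())
    }
}

fn set_complete(record: &mut Replacement) -> Result<(), String> {
    match try_complete(record) {
        (UpdateStatus::Updated, _) => Ok(()),
        // Re-running against an already-Complete record is fine (saga replay);
        // any other state is a genuine conflict.
        (UpdateStatus::NotUpdatedButExists, found)
            if found.state == ReplacementState::Complete =>
        {
            Ok(())
        }
        (UpdateStatus::NotUpdatedButExists, found) => {
            Err(format!("replacement unexpectedly in state {:?}", found.state))
        }
    }
}

fn main() {
    let mut r = Replacement { state: ReplacementState::Requested };
    assert!(set_complete(&mut r).is_ok()); // first call performs the update
    assert!(set_complete(&mut r).is_ok()); // a replay is a no-op, still Ok
    let mut running = Replacement { state: ReplacementState::Running };
    assert!(set_complete(&mut running).is_err()); // conflict surfaces as error
}
```

This also matches the review exchange above: a rerun saga node finds the record already Complete and returns Ok, so hitting the conflict arm indicates a genuinely wrong state or operating saga ID.
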
@@ -1220,11 +1291,16 @@ impl DataStore {
        opctx: &OpContext,
        region_snapshot_replacement_step: RegionSnapshotReplacementStep,
    ) -> Result<(), Error> {
        type TxnError = TransactionError<Error>;
        let conn = self.pool_connection_authorized(opctx).await?;
        let err = OptionalError::new();

        self.pool_connection_authorized(opctx)
            .await?
            .transaction_async(|conn| async move {
        self.transaction_retry_wrapper(
            "set_region_snapshot_replacement_complete",
        )
        .transaction(&conn, |conn| {
            let err = err.clone();

            async move {
                use db::schema::volume_repair::dsl as volume_repair_dsl;

                diesel::delete(

@@ -1267,27 +1343,23 @@ impl DataStore {
                        {
                            Ok(())
                        } else {
                            Err(TxnError::CustomError(Error::conflict(
                                format!(
                                    "region snapshot replacement step {} set \
                            Err(err.bail(Error::conflict(format!(
                                "region snapshot replacement step {} set \
                                to {:?} (operating saga id {:?})",
                                    region_snapshot_replacement_step.id,
                                    record.replacement_state,
                                    record.operating_saga_id,
                                ),
                            )))
                                region_snapshot_replacement_step.id,
                                record.replacement_state,
                                record.operating_saga_id,
                            ))))
                        }
                    }
                }
            })
            .await
            .map_err(|e| match e {
                TxnError::CustomError(error) => error,

                TxnError::Database(error) => {
                    public_error_from_diesel(error, ErrorHandler::Server)
                }
            })
    }
            }
        })
        .await
        .map_err(|e| match err.take() {
            Some(error) => error,
            None => public_error_from_diesel(e, ErrorHandler::Server),
        })
    }
}

Review discussion on the volume repair record comment:

Comment: I follow the bit about snapshots not being directly accessed by an upstairs, but I thought the repair record was still needed for mutual exclusion?

Reply: It's not strictly speaking necessary - many replacements could occur on the snapshot volume at the same time, and because it's never constructed there wouldn't be any repair operation required.

Comment: They wouldn't contend on the snapshot volume's database record?

Reply: They would only contend around the volume repair record. If there was no lock for the snapshot volume, then the individual replacement transactions could all fire in whatever order they're going to serialize in, and it would probably work.
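
As context for this exchange: the `volume_repair` row functions as an advisory per-volume lock - inserting the row takes the lock, a conflicting insert for the same volume fails, and deleting the row on completion releases it. A toy model of that behavior, with an in-memory map standing in for the table (the real schema and `volume_repair_insert_in_txn` may differ):

```rust
use std::collections::HashMap;

type VolumeId = u64;
type RepairId = u64;

/// Toy model of the volume_repair table: at most one repair row per volume.
#[derive(Default)]
struct VolumeRepairTable {
    rows: HashMap<VolumeId, RepairId>,
}

impl VolumeRepairTable {
    /// "Take the lock" for volume_id by inserting a row; fails if another
    /// repair already holds it, which serializes replacements on one volume.
    fn insert(&mut self, volume_id: VolumeId, repair_id: RepairId) -> Result<(), String> {
        if let Some(existing) = self.rows.get(&volume_id) {
            return Err(format!(
                "volume {} already locked by repair {}",
                volume_id, existing
            ));
        }
        self.rows.insert(volume_id, repair_id);
        Ok(())
    }

    /// Release the lock when the replacement is marked Complete.
    fn delete(&mut self, repair_id: RepairId) {
        self.rows.retain(|_, r| *r != repair_id);
    }
}

fn main() {
    let mut table = VolumeRepairTable::default();
    assert!(table.insert(7, 100).is_ok()); // first replacement takes the lock
    assert!(table.insert(7, 101).is_err()); // a second request must wait
    table.delete(100); // completing the first replacement releases it
    assert!(table.insert(7, 101).is_ok()); // now the second can proceed
}
```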