Skip to content

Commit

Permalink
[nexus] Use retryable transactions more extensively (#7212)
Browse files Browse the repository at this point in the history
This PR finds spots where we use `transaction_async` and makes them use
`transaction_retry_wrapper` instead.

This means that under contention, we'll avoid wasting work, and can make
use of CockroachDB's automated retry mechanisms.

Additionally, this PR adds a clippy lint to steer future code away from the
"non-retryable" transaction variant.
There are some use cases where avoiding retries is still reasonable:

1. Test-only code
2. Transactions which have truly minimal contention, or which can fail
with serialization errors without issue
3. Nested transactions
  • Loading branch information
smklein authored Dec 6, 2024
1 parent e73a30e commit 9b662ea
Show file tree
Hide file tree
Showing 16 changed files with 478 additions and 384 deletions.
6 changes: 6 additions & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,10 @@ disallowed-methods = [
# `IncompleteOnConflictExt::as_partial_index` in `nexus-db-queries`.
# See the documentation of that method for more.
"diesel::upsert::DecoratableTarget::filter_target",

# This form of transaction is susceptible to serialization failures,
# and can fail spuriously.
# Instead, the "transaction_retry_wrapper" should be preferred, as it
# automatically retries transactions experiencing contention.
{ path = "async_bb8_diesel::AsyncConnection::transaction_async", reason = "Prefer to use transaction_retry_wrapper, if possible. Feel free to override this for tests and nested transactions." },
]
2 changes: 2 additions & 0 deletions dev-tools/omdb/src/bin/omdb/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
// NOTE: emanates from Tabled macros
#![allow(clippy::useless_vec)]
// NOTE: allowing "transaction_async" without retry
#![allow(clippy::disallowed_methods)]

use crate::check_allow_destructive::DestructiveOperationToken;
use crate::helpers::const_max_len;
Expand Down
35 changes: 22 additions & 13 deletions nexus/db-queries/src/db/datastore/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,11 @@ impl DataStore {
// batch rather than making a bunch of round-trips to the database.
// We'd do that if we had an interface for doing that with bound
// parameters, etc. See oxidecomputer/omicron#973.

// A serialization error is possible here, but the risk is low,
// as most of the operations should be insertions rather than in-place
// modifications of existing tables.
#[allow(clippy::disallowed_methods)]
conn.transaction_async(|conn| async move {
// Insert the row for the blueprint.
{
Expand Down Expand Up @@ -1087,6 +1092,7 @@ impl DataStore {
// start removing it and we'd also need to make sure we didn't leak a
// collection if we crash while deleting it.
let conn = self.pool_connection_authorized(opctx).await?;
let err = OptionalError::new();

let (
nblueprints,
Expand All @@ -1101,19 +1107,23 @@ impl DataStore {
nclickhouse_cluster_configs,
nclickhouse_keepers,
nclickhouse_servers,
) = conn
.transaction_async(|conn| async move {
) = self.transaction_retry_wrapper("blueprint_delete")
.transaction(&conn, |conn| {
let err = err.clone();
async move {
// Ensure that the blueprint we're about to delete is not the
// current target.
let current_target =
Self::blueprint_current_target_only(&conn).await?;
let current_target = Self::blueprint_current_target_only(&conn)
.await
.map_err(|txn_err| txn_err.into_diesel(&err))?;

if current_target.target_id == blueprint_id {
return Err(TransactionError::CustomError(
return Err(err.bail(TransactionError::CustomError(
Error::conflict(format!(
"blueprint {blueprint_id} is the \
current target and cannot be deleted",
)),
));
)));
}

// Remove the record describing the blueprint itself.
Expand All @@ -1130,9 +1140,9 @@ impl DataStore {
// references to it in any of the remaining tables either, since
// deletion always goes through this transaction.
if nblueprints == 0 {
return Err(TransactionError::CustomError(
return Err(err.bail(TransactionError::CustomError(
authz_blueprint.not_found(),
));
)));
}

// Remove rows associated with sled states.
Expand Down Expand Up @@ -1259,13 +1269,12 @@ impl DataStore {
nclickhouse_keepers,
nclickhouse_servers,
))
}
})
.await
.map_err(|error| match error {
TransactionError::CustomError(e) => e,
TransactionError::Database(e) => {
public_error_from_diesel(e, ErrorHandler::Server)
}
.map_err(|e| match err.take() {
Some(err) => err.into(),
None => public_error_from_diesel(e, ErrorHandler::Server),
})?;

info!(&opctx.log, "removed blueprint";
Expand Down
79 changes: 47 additions & 32 deletions nexus/db-queries/src/db/datastore/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::db::pagination::paginated;
use crate::db::pagination::Paginator;
use crate::db::pool::DbConnection;
use crate::db::TransactionError;
use crate::transaction_retry::OptionalError;
use async_bb8_diesel::AsyncConnection;
use async_bb8_diesel::AsyncRunQueryDsl;
use diesel::prelude::*;
Expand Down Expand Up @@ -363,40 +364,49 @@ impl DataStore {
) -> Result<(), Error> {
opctx.authorize(authz::Action::Modify, &authz::DNS_CONFIG).await?;
let conn = self.pool_connection_authorized(opctx).await?;
conn.transaction_async(|c| async move {
let zones = self
.dns_zones_list_all_on_connection(opctx, &c, update.dns_group)
.await?;
// This looks like a time-of-check-to-time-of-use race, but this
// approach works because we're inside a transaction and the
// isolation level is SERIALIZABLE.
let version = self
.dns_group_latest_version_conn(opctx, &c, update.dns_group)
.await?;
if version.version != old_version {
return Err(TransactionError::CustomError(Error::conflict(
format!(
"expected current DNS version to be {}, found {}",
*old_version, *version.version,
),
)));
}

self.dns_write_version_internal(
&c,
update,
zones,
Generation(old_version.next()),
)
let err = OptionalError::new();

self.transaction_retry_wrapper("dns_update_from_version")
.transaction(&conn, |c| {
let err = err.clone();
let update = update.clone();
async move {
let zones = self
.dns_zones_list_all_on_connection(opctx, &c, update.dns_group)
.await
.map_err(|txn_error| txn_error.into_diesel(&err))?;
// This looks like a time-of-check-to-time-of-use race, but this
// approach works because we're inside a transaction and the
// isolation level is SERIALIZABLE.
let version = self
.dns_group_latest_version_conn(opctx, &c, update.dns_group)
.await
.map_err(|txn_error| txn_error.into_diesel(&err))?;
if version.version != old_version {
return Err(err.bail(TransactionError::CustomError(Error::conflict(
format!(
"expected current DNS version to be {}, found {}",
*old_version, *version.version,
),
))));
}

self.dns_write_version_internal(
&c,
update,
zones,
Generation(old_version.next()),
)
.await
.map_err(|txn_error| txn_error.into_diesel(&err))
}
})
.await
})
.await
.map_err(|e| match e {
TransactionError::CustomError(e) => e,
TransactionError::Database(e) => {
public_error_from_diesel(e, ErrorHandler::Server)
}
})
.map_err(|e| match err.take() {
Some(err) => err.into(),
None => public_error_from_diesel(e, ErrorHandler::Server),
})
}

/// Update the configuration of a DNS zone as specified in `update`
Expand Down Expand Up @@ -441,6 +451,9 @@ impl DataStore {
.dns_zones_list_all_on_connection(opctx, conn, update.dns_group)
.await?;

// This method is used in nested transactions, which are not supported
// with retryable transactions.
#[allow(clippy::disallowed_methods)]
conn.transaction_async(|c| async move {
let version = self
.dns_group_latest_version_conn(opctx, conn, update.dns_group)
Expand Down Expand Up @@ -1724,6 +1737,8 @@ mod test {

let cds = datastore.clone();
let copctx = opctx.child(std::collections::BTreeMap::new());

#[allow(clippy::disallowed_methods)]
let mut fut = conn1
.transaction_async(|c1| async move {
cds.dns_update_incremental(&copctx, &c1, update1)
Expand Down
Loading

0 comments on commit 9b662ea

Please sign in to comment.