Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Checksums for Upload #81

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions aws-s3-transfer-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,20 @@ publish = false
[dependencies]
async-channel = "2.3.1"
async-trait = "0.1.82"
aws-config = { version = "1.5.6", features = ["behavior-version-latest"] }
aws-sdk-s3 = { version = "1.51.0", features = ["behavior-version-latest"] }
aws-smithy-async = "1.2.1"
aws-smithy-experimental = { version = "0.1.3", features = ["crypto-aws-lc"] }
aws-smithy-runtime-api = "1.7.1"
aws-smithy-types = "1.2.6"
aws-types = "1.3.3"
#aws-config = { version = "1.5.6", features = ["behavior-version-latest"] }
#aws-sdk-s3 = { version = "1.51.0", features = ["behavior-version-latest"] }
#aws-smithy-async = "1.2.1"
#aws-smithy-experimental = { version = "0.1.3", features = ["crypto-aws-lc"] }
#aws-smithy-runtime-api = "1.7.1"
#aws-smithy-types = "1.2.6"
#aws-types = "1.3.3"
aws-config = { path = "../../motorcade/aws-sdk/sdk/aws-config" }
aws-sdk-s3 = { path = "../../motorcade/aws-sdk/sdk/s3" }
aws-smithy-async = { path = "../../motorcade/aws-sdk/sdk/aws-smithy-async" }
aws-smithy-experimental = { path = "../../motorcade/aws-sdk/sdk/aws-smithy-experimental", features = ["crypto-aws-lc"] }
aws-smithy-runtime-api = { path = "../../motorcade/aws-sdk/sdk/aws-smithy-runtime-api" }
aws-smithy-types = { path = "../../motorcade/aws-sdk/sdk/aws-smithy-types" }
aws-types = { path = "../../motorcade/aws-sdk/sdk/aws-types" }
blocking = "1.6.0"
bytes = "1"
bytes-utils = "0.1.4"
Expand All @@ -30,9 +37,12 @@ tracing = "0.1"
walkdir = "2"

[dev-dependencies]
aws-sdk-s3 = { version = "1.51.0", features = ["behavior-version-latest", "test-util"] }
aws-smithy-mocks-experimental = "0.2.1"
aws-smithy-runtime = { version = "1.7.1", features = ["client", "connector-hyper-0-14-x", "test-util", "wire-mock"] }
#aws-sdk-s3 = { version = "1.51.0", features = ["behavior-version-latest", "test-util"] }
#aws-smithy-mocks-experimental = "0.2.1"
#aws-smithy-runtime = { version = "1.7.1", features = ["client", "connector-hyper-0-14-x", "test-util", "wire-mock"] }
aws-sdk-s3 = { path = "../../motorcade/aws-sdk/sdk/s3", features = ["behavior-version-latest", "test-util"] }
aws-smithy-mocks-experimental = { path = "../../motorcade/aws-sdk/sdk/aws-smithy-mocks-experimental" }
aws-smithy-runtime = { path = "../../motorcade/aws-sdk/sdk/aws-smithy-runtime", features = ["client", "connector-hyper-0-14-x", "test-util", "wire-mock"] }
clap = { version = "4.5.7", default-features = false, features = ["derive", "std", "help"] }
console-subscriber = "0.4.0"
http-02x = { package = "http", version = "0.2.9" }
Expand Down
2 changes: 1 addition & 1 deletion aws-s3-transfer-manager/src/io/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ where
let data: Bytes = inner_buf.into();
let part_number = *this.next_part;
*this.next_part += 1;
let part = PartData { part_number, data };
let part = PartData::new(part_number, data);
return Poll::Ready(Some(Ok(part)));
} else if n == 0 {
// EOF
Expand Down
12 changes: 12 additions & 0 deletions aws-s3-transfer-manager/src/io/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,13 @@ pub struct PartData {
// 1-indexed
pub(crate) part_number: u64,
pub(crate) data: Bytes,

// TODO: getters, setters, builders, docs, cats, dogs, etc
pub checksum_crc32: Option<String>,
pub checksum_crc32_c: Option<String>,
pub checksum_crc64_nvme: Option<String>,
pub checksum_sha1: Option<String>,
pub checksum_sha256: Option<String>,
}

impl PartData {
Expand All @@ -183,6 +190,11 @@ impl PartData {
Self {
part_number,
data: data.into(),
checksum_crc32: None,
checksum_crc32_c: None,
checksum_crc64_nvme: None,
checksum_sha1: None,
checksum_sha256: None,
}
}
}
Expand Down
95 changes: 95 additions & 0 deletions aws-s3-transfer-manager/src/operation/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ impl Upload {
handle: Arc<crate::client::Handle>,
mut input: crate::operation::upload::UploadInput,
) -> Result<UploadHandle, error::Error> {
input = validate_and_fix_up_checksum_fields(&handle, input)?;

let min_mpu_threshold = handle.mpu_threshold_bytes();

let stream = input.take_body();
Expand All @@ -66,6 +68,95 @@ impl Upload {
}
}

/// Validate checksum fields.
/// Fix up the input and return it, so that checksum_algorithm and multipart_checksum_type
/// are both set, if we're doing checksums at all.
fn validate_and_fix_up_checksum_fields(
handle: &crate::client::Handle,
mut input: crate::operation::upload::UploadInput,
) -> Result<crate::operation::upload::UploadInput, error::Error> {
use aws_sdk_s3::types::{ChecksumAlgorithm, ChecksumType};

// Ensure user didn't pass multiple full_object_checksum_ values
let full_object_checksum_count = [
&input.full_object_checksum_crc32,
&input.full_object_checksum_crc32_c,
&input.full_object_checksum_crc64_nvme,
]
.iter()
.filter(|x| x.is_some())
.count();
if full_object_checksum_count > 1 {
return Err(error::invalid_input(
"Expecting a single full_object_checksum_ value. Multiple values are not allowed.",
));
}

// Ensure that, if user set multipart_checksum_type, we know the algorithm they want to use
if input.multipart_checksum_type.is_some()
&& input.checksum_algorithm.is_none()
&& full_object_checksum_count == 0
{
return Err(error::invalid_input(
"multipart_checksum_type can only be used when a checksum algorithm is specified.",
));
}

// If user set full_object_checksum_ value, ensure checksum_algorithm is set to match
if full_object_checksum_count == 1 {
let expected_algorithm = if input.full_object_checksum_crc32.is_some() {
ChecksumAlgorithm::Crc32
} else if input.full_object_checksum_crc32_c.is_some() {
ChecksumAlgorithm::Crc32C
} else {
assert!(input.full_object_checksum_crc64_nvme.is_some());
ChecksumAlgorithm::Crc64Nvme
};

match &input.checksum_algorithm {
None => input.checksum_algorithm = Some(expected_algorithm),
Some(checksum_algorithm) => {
if checksum_algorithm != &expected_algorithm {
return Err(error::invalid_input(
"Given checksum_algorithm does not match full_object_checksum_ value",
));
}
}
}
}

// If checksum_algorithm is still unset, but request_checksum_calculation is WhenSupported,
// default to using Crc64Nvme.
if input.checksum_algorithm.is_none()
&& handle
.config
.client()
.config()
.request_checksum_calculation()
.unwrap()
.eq(&aws_sdk_s3::config::RequestChecksumCalculation::WhenSupported)
{
input.checksum_algorithm = Some(ChecksumAlgorithm::Crc64Nvme);
}

// If multipart_checksum_type is still unset, and we know the algorithm,
// default to FullObject (unless it's SHA which must use Composite for multipart)
if input.multipart_checksum_type.is_none() {
input.multipart_checksum_type = match &input.checksum_algorithm {
None => None,
Some(
ChecksumAlgorithm::Crc32 | ChecksumAlgorithm::Crc32C | ChecksumAlgorithm::Crc64Nvme,
) => Some(ChecksumType::FullObject),
Some(ChecksumAlgorithm::Sha1 | ChecksumAlgorithm::Sha256) => {
Some(ChecksumType::Composite)
}
Some(_) => return Err(error::invalid_input("Unknown checksum_algorithm")),
};
}

Ok(input)
}

async fn try_start_put_object(
ctx: UploadContext,
stream: InputStream,
Expand Down Expand Up @@ -126,6 +217,9 @@ async fn put_object(
.set_object_lock_legal_hold_status(ctx.request.object_lock_legal_hold_status.clone())
.set_expected_bucket_owner(ctx.request.expected_bucket_owner.clone())
.set_checksum_algorithm(ctx.request.checksum_algorithm.clone())
.set_checksum_crc32(ctx.request.full_object_checksum_crc32.clone())
.set_checksum_crc32_c(ctx.request.full_object_checksum_crc32_c.clone())
.set_checksum_crc64_nvme(ctx.request.full_object_checksum_crc64_nvme.clone())
.send()
.instrument(tracing::info_span!(
"send-upload-part",
Expand Down Expand Up @@ -211,6 +305,7 @@ async fn start_mpu(ctx: &UploadContext) -> Result<UploadOutputBuilder, crate::er
.set_object_lock_legal_hold_status(req.object_lock_legal_hold_status.clone())
.set_expected_bucket_owner(req.expected_bucket_owner.clone())
.set_checksum_algorithm(req.checksum_algorithm.clone())
.set_checksum_type(req.multipart_checksum_type.clone())
.send()
.instrument(tracing::debug_span!("send-create-multipart-upload"))
.await?;
Expand Down
Loading
Loading