s3: Improve error messages and handling #10125

Merged
86 changes: 46 additions & 40 deletions src/dataflow/src/source/s3.rs
@@ -33,7 +33,6 @@ use std::default::Default;
 use std::fmt::Formatter;
 use std::ops::AddAssign;

-use anyhow::anyhow;
 use async_compression::tokio::bufread::GzipDecoder;
 use aws_sdk_s3::error::{GetObjectError, ListObjectsV2Error};
 use aws_sdk_s3::{Client as S3Client, SdkError};
@@ -222,7 +221,14 @@ async fn download_objects_task(
         bucket_info.keys.insert(msg.key);
     }
     Err(DownloadError::Failed { err }) => {
-        if tx.send(Err(S3Error::IoError(err))).await.is_err() {
+        if tx
+            .send(Err(S3Error::IoError {
+                bucket: msg_ref.bucket.clone(),
+                err,
+            }))
+            .await
+            .is_err()
+        {
             rx.close();
             break;
         };
@@ -354,20 +360,14 @@ async fn scan_bucket_task(
         }
         continuation_token = response.next_continuation_token;
     }
-    Err(e) => {
-        let err_string = format!("{}", e);
-        tx.send(Err(S3Error::ListObjectsFailed(e)))
-            .await
-            .unwrap_or_else(|e| {
-                tracing::debug!("unable to send error on listing objects: {}", e)
-            });
+    Err(err) => {
+        tx.send(Err(S3Error::ListObjectsFailed {
+            bucket: bucket.clone(),
+            err,
+        }))
+        .await
+        .unwrap_or_else(|e| tracing::debug!("Source queue has been shut down: {}", e));
Contributor

does this only happen during shutdown?


-        tracing::error!("failed to list bucket {}: {}", bucket, err_string);
-        tx.send(Err(S3Error::RetryFailed))
-            .await
-            .unwrap_or_else(|e| {
-                tracing::debug!("unable to send error on retries failed: {}", e)
-            });
         break;
     }
 }
@@ -663,9 +663,14 @@ enum S3Error {
         key: String,
         err: SdkError<GetObjectError>,
     },
-    ListObjectsFailed(SdkError<ListObjectsV2Error>),
-    RetryFailed,
-    IoError(std::io::Error),
+    ListObjectsFailed {
+        bucket: String,
+        err: SdkError<ListObjectsV2Error>,
+    },
+    IoError {
+        bucket: String,
+        err: std::io::Error,
+    },
 }

 impl std::error::Error for S3Error {}
Contributor

I think it would be nice to implement source on the error impl for the cases that have inner errors (which right now is all of them). Maybe a good TODO?

Contributor Author

Yeah, currently we print all causes in the display impl instead of implementing the causes method. There is a Rust-general discussion about what the best thing to do is, but this is generally what we do in mz.

For now I think it's more confusing to both impl display of causes and the cause() method, but I do think it would be better in general if we impl'd cause() everywhere and used anyhow's {:#?} format for final display of errors.
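
A minimal sketch of the alternative discussed in this thread, assuming a simplified stand-in for S3Error. The `SketchError` type and the `render_chain` helper are hypothetical and not part of this PR. Display prints only the local context, and the inner error is exposed through `source()` so that the caller decides how to render the chain:

```rust
use std::error::Error;
use std::fmt;
use std::io;

// Hypothetical, simplified stand-in for the PR's S3Error; one variant only.
#[derive(Debug)]
enum SketchError {
    Io { bucket: String, err: io::Error },
}

impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Only this layer's context is printed; the cause is left to source().
            SketchError::Io { bucket, .. } => write!(f, "IO error for S3 bucket {}", bucket),
        }
    }
}

impl Error for SketchError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            SketchError::Io { err, .. } => Some(err),
        }
    }
}

// Hypothetical helper: walks the source() chain and joins the messages with ": ".
fn render_chain(err: &dyn Error) -> String {
    let mut out = err.to_string();
    let mut cur = err.source();
    while let Some(cause) = cur {
        out.push_str(": ");
        out.push_str(&cause.to_string());
        cur = cause.source();
    }
    out
}
```

Calling `render_chain` on a `SketchError::Io` value produces the bucket message followed by the inner error's message, which is roughly what printing all causes in the display impl gives today, without duplicating the cause when both are implemented.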

@@ -679,13 +684,18 @@ impl From<S3Error> for std::io::Error {
 impl std::fmt::Display for S3Error {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         match self {
-            S3Error::ClientConstructionFailed(err) => err.fmt(f),
+            S3Error::ClientConstructionFailed(err) => {
+                write!(f, "Unable to build S3 client: {}", err)
+            }
             S3Error::GetObjectError { bucket, key, err } => {
-                write!(f, "getting object {}/{}: {}", bucket, key, err)
+                write!(f, "Unable to get S3 object {}/{}: {}", bucket, key, err)
             }
+            S3Error::ListObjectsFailed { bucket, err } => {
+                write!(f, "Unable to list S3 bucket {}: {}", bucket, err)
+            }
+            S3Error::IoError { bucket, err } => {
+                write!(f, "IO Error for S3 bucket {}: {}", bucket, err)
+            }
-            S3Error::ListObjectsFailed(err) => err.fmt(f),
-            S3Error::RetryFailed => write!(f, "Retry failed to produce result"),
-            S3Error::IoError(e) => write!(f, "IoError: {}", e),
         }
     }
 }
@@ -934,24 +944,20 @@ impl SourceReader for S3SourceReader {
             value: record,
         }))
     }
-    Some(Some(Err(e))) => {
-        tracing::warn!(
-            "when reading source '{}' ({}): {}",
-            self.source_name,
-            self.id,
-            e
-        );
-        match e {
-            S3Error::ClientConstructionFailed(err) => {
-                Err(anyhow!("Client construction failed: {}", err))
-            }
-            S3Error::RetryFailed => Err(anyhow!("Retry failed")),
-            S3Error::IoError(e) => Err(e.into()),
-            S3Error::GetObjectError { .. } | S3Error::ListObjectsFailed(_) => {
-                Ok(NextMessage::Pending)
-            }
+    Some(Some(Err(e))) => match e {
Contributor

What does the double-Option mean? Changing that to an explicit enum would be another good TODO.

Contributor Author

I believe that the double-option is an artifact of using now_or_never() on an async queue: recv() yields an Option, and now_or_never() yields an Option of the thing it's called on.
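
A small sketch of the explicit-enum TODO suggested above, under the reading given in this reply: the outer Option comes from now_or_never() (None means the future was not ready) and the inner Option comes from recv() (None means the queue is closed). The `Polled` enum and `classify` function are hypothetical names, not part of this PR:

```rust
// Hypothetical explicit replacement for Option<Option<T>>; names are illustrative.
#[derive(Debug, PartialEq)]
enum Polled<T> {
    // Outer None: the future did not complete on the first poll.
    NotReady,
    // Some(None): the queue was closed and will yield nothing more.
    Closed,
    // Some(Some(item)): an item was available immediately.
    Item(T),
}

// Gives each of the three double-Option states a name.
fn classify<T>(polled: Option<Option<T>>) -> Polled<T> {
    match polled {
        None => Polled::NotReady,
        Some(None) => Polled::Closed,
        Some(Some(item)) => Polled::Item(item),
    }
}
```

This lines up with the match in the diff, where `None` maps to `NextMessage::Pending` and `Some(None)` maps to `NextMessage::Finished`.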

+        S3Error::GetObjectError { .. } => {
+            tracing::warn!(
+                "when reading source '{}' ({}): {}",
Contributor

Suggested change
-"when reading source '{}' ({}): {}",
+"error when reading source '{}' ({}): {}",

Contributor Author

I don't think that adding error to warn! logs is helpful; it just makes grep error (and especially case-insensitive search in e.g. less) less useful and more confusing. This will currently show up as WARN <mod> when reading source....

I agree that this is a sentence fragment, though.

+                self.source_name,
+                self.id,
+                e
+            );
+            Ok(NextMessage::Pending)
         }
-    }
+        e @ (S3Error::ListObjectsFailed { .. }
+        | S3Error::ClientConstructionFailed(_)
+        | S3Error::IoError { .. }) => Err(e.into()),
+    },
     None => Ok(NextMessage::Pending),
     Some(None) => Ok(NextMessage::Finished),
 }