Skip to content

Commit

Permalink
s3: Improve error messages and handling
Browse files Browse the repository at this point in the history
* Include more information in all error messages so that users can understand
  what caused the problem
* ListBucketsError is a fatal error, but we were just sending two messages
  instead of making it a hard error.
  • Loading branch information
quodlibetor committed Jan 19, 2022
1 parent 6b6be90 commit fa1be10
Showing 1 changed file with 48 additions and 40 deletions.
88 changes: 48 additions & 40 deletions src/dataflow/src/source/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use std::default::Default;
use std::fmt::Formatter;
use std::ops::AddAssign;

use anyhow::anyhow;
use async_compression::tokio::bufread::GzipDecoder;
use aws_sdk_s3::error::{GetObjectError, ListObjectsV2Error};
use aws_sdk_s3::{Client as S3Client, SdkError};
Expand Down Expand Up @@ -222,7 +221,14 @@ async fn download_objects_task(
bucket_info.keys.insert(msg.key);
}
Err(DownloadError::Failed { err }) => {
if tx.send(Err(S3Error::IoError(err))).await.is_err() {
if tx
.send(Err(S3Error::IoError {
bucket: msg_ref.bucket.clone(),
err,
}))
.await
.is_err()
{
rx.close();
break;
};
Expand Down Expand Up @@ -354,20 +360,14 @@ async fn scan_bucket_task(
}
continuation_token = response.next_continuation_token;
}
Err(e) => {
let err_string = format!("{}", e);
tx.send(Err(S3Error::ListObjectsFailed(e)))
.await
.unwrap_or_else(|e| {
tracing::debug!("unable to send error on listing objects: {}", e)
});
Err(err) => {
tx.send(Err(S3Error::ListObjectsFailed {
bucket: bucket.clone(),
err,
}))
.await
.unwrap_or_else(|e| tracing::debug!("Source queue has been shut down: {}", e));

tracing::error!("failed to list bucket {}: {}", bucket, err_string);
tx.send(Err(S3Error::RetryFailed))
.await
.unwrap_or_else(|e| {
tracing::debug!("unable to send error on retries failed: {}", e)
});
break;
}
}
Expand Down Expand Up @@ -663,9 +663,14 @@ enum S3Error {
key: String,
err: SdkError<GetObjectError>,
},
ListObjectsFailed(SdkError<ListObjectsV2Error>),
RetryFailed,
IoError(std::io::Error),
ListObjectsFailed {
bucket: String,
err: SdkError<ListObjectsV2Error>,
},
IoError {
bucket: String,
err: std::io::Error,
},
}

impl std::error::Error for S3Error {}
Expand All @@ -679,13 +684,18 @@ impl From<S3Error> for std::io::Error {
impl std::fmt::Display for S3Error {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
S3Error::ClientConstructionFailed(err) => err.fmt(f),
S3Error::ClientConstructionFailed(err) => {
write!(f, "Unable to build S3 client: {}", err)
}
S3Error::GetObjectError { bucket, key, err } => {
write!(f, "getting object {}/{}: {}", bucket, key, err)
write!(f, "Unable to get S3 object {}/{}: {}", bucket, key, err)
}
S3Error::ListObjectsFailed { bucket, err } => {
write!(f, "Unable to list S3 bucket {}: {}", bucket, err)
}
S3Error::IoError { bucket, err } => {
write!(f, "IO Error for S3 bucket {}: {}", bucket, err)
}
S3Error::ListObjectsFailed(err) => err.fmt(f),
S3Error::RetryFailed => write!(f, "Retry failed to produce result"),
S3Error::IoError(e) => write!(f, "IoError: {}", e),
}
}
}
Expand Down Expand Up @@ -934,24 +944,22 @@ impl SourceReader for S3SourceReader {
value: record,
}))
}
Some(Some(Err(e))) => {
tracing::warn!(
"when reading source '{}' ({}): {}",
self.source_name,
self.id,
e
);
match e {
S3Error::ClientConstructionFailed(err) => {
Err(anyhow!("Client construction failed: {}", err))
}
S3Error::RetryFailed => Err(anyhow!("Retry failed")),
S3Error::IoError(e) => Err(e.into()),
S3Error::GetObjectError { .. } | S3Error::ListObjectsFailed(_) => {
Ok(NextMessage::Pending)
}
Some(Some(Err(e))) => match e {
S3Error::GetObjectError { .. } => {
tracing::warn!(
"when reading source '{}' ({}): {}",
self.source_name,
self.id,
e
);
Ok(NextMessage::Pending)
}
}
e
@
(S3Error::ListObjectsFailed { .. }
| S3Error::ClientConstructionFailed(_)
| S3Error::IoError { .. }) => Err(e.into()),
},
None => Ok(NextMessage::Pending),
Some(None) => Ok(NextMessage::Finished),
}
Expand Down

0 comments on commit fa1be10

Please sign in to comment.