-
Notifications
You must be signed in to change notification settings - Fork 466
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
s3: Improve error messages and handling #10125
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -33,7 +33,6 @@ use std::default::Default; | |||||
use std::fmt::Formatter; | ||||||
use std::ops::AddAssign; | ||||||
|
||||||
use anyhow::anyhow; | ||||||
use async_compression::tokio::bufread::GzipDecoder; | ||||||
use aws_sdk_s3::error::{GetObjectError, ListObjectsV2Error}; | ||||||
use aws_sdk_s3::{Client as S3Client, SdkError}; | ||||||
|
@@ -222,7 +221,14 @@ async fn download_objects_task( | |||||
bucket_info.keys.insert(msg.key); | ||||||
} | ||||||
Err(DownloadError::Failed { err }) => { | ||||||
if tx.send(Err(S3Error::IoError(err))).await.is_err() { | ||||||
if tx | ||||||
.send(Err(S3Error::IoError { | ||||||
bucket: msg_ref.bucket.clone(), | ||||||
err, | ||||||
})) | ||||||
.await | ||||||
.is_err() | ||||||
{ | ||||||
rx.close(); | ||||||
break; | ||||||
}; | ||||||
|
@@ -354,20 +360,14 @@ async fn scan_bucket_task( | |||||
} | ||||||
continuation_token = response.next_continuation_token; | ||||||
} | ||||||
Err(e) => { | ||||||
let err_string = format!("{}", e); | ||||||
tx.send(Err(S3Error::ListObjectsFailed(e))) | ||||||
.await | ||||||
.unwrap_or_else(|e| { | ||||||
tracing::debug!("unable to send error on listing objects: {}", e) | ||||||
}); | ||||||
Err(err) => { | ||||||
tx.send(Err(S3Error::ListObjectsFailed { | ||||||
bucket: bucket.clone(), | ||||||
err, | ||||||
})) | ||||||
.await | ||||||
.unwrap_or_else(|e| tracing::debug!("Source queue has been shut down: {}", e)); | ||||||
|
||||||
tracing::error!("failed to list bucket {}: {}", bucket, err_string); | ||||||
tx.send(Err(S3Error::RetryFailed)) | ||||||
.await | ||||||
.unwrap_or_else(|e| { | ||||||
tracing::debug!("unable to send error on retries failed: {}", e) | ||||||
}); | ||||||
break; | ||||||
} | ||||||
} | ||||||
|
@@ -663,9 +663,14 @@ enum S3Error { | |||||
key: String, | ||||||
err: SdkError<GetObjectError>, | ||||||
}, | ||||||
ListObjectsFailed(SdkError<ListObjectsV2Error>), | ||||||
RetryFailed, | ||||||
IoError(std::io::Error), | ||||||
ListObjectsFailed { | ||||||
bucket: String, | ||||||
err: SdkError<ListObjectsV2Error>, | ||||||
}, | ||||||
IoError { | ||||||
bucket: String, | ||||||
err: std::io::Error, | ||||||
}, | ||||||
} | ||||||
|
||||||
impl std::error::Error for S3Error {} | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it would be nice to implement There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, currently we print all causes in the display impl instead of implementing the causes method. There is a rust-general discussion about what the best thing to is, but this is generally what we do in mz. For now I think it's more confusing to both impl display of causes and the |
||||||
|
@@ -679,13 +684,18 @@ impl From<S3Error> for std::io::Error { | |||||
impl std::fmt::Display for S3Error { | ||||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { | ||||||
match self { | ||||||
S3Error::ClientConstructionFailed(err) => err.fmt(f), | ||||||
S3Error::ClientConstructionFailed(err) => { | ||||||
write!(f, "Unable to build S3 client: {}", err) | ||||||
} | ||||||
S3Error::GetObjectError { bucket, key, err } => { | ||||||
write!(f, "getting object {}/{}: {}", bucket, key, err) | ||||||
write!(f, "Unable to get S3 object {}/{}: {}", bucket, key, err) | ||||||
} | ||||||
S3Error::ListObjectsFailed { bucket, err } => { | ||||||
write!(f, "Unable to list S3 bucket {}: {}", bucket, err) | ||||||
} | ||||||
S3Error::IoError { bucket, err } => { | ||||||
write!(f, "IO Error for S3 bucket {}: {}", bucket, err) | ||||||
} | ||||||
S3Error::ListObjectsFailed(err) => err.fmt(f), | ||||||
S3Error::RetryFailed => write!(f, "Retry failed to produce result"), | ||||||
S3Error::IoError(e) => write!(f, "IoError: {}", e), | ||||||
} | ||||||
} | ||||||
} | ||||||
|
@@ -934,24 +944,20 @@ impl SourceReader for S3SourceReader { | |||||
value: record, | ||||||
})) | ||||||
} | ||||||
Some(Some(Err(e))) => { | ||||||
tracing::warn!( | ||||||
"when reading source '{}' ({}): {}", | ||||||
self.source_name, | ||||||
self.id, | ||||||
e | ||||||
); | ||||||
match e { | ||||||
S3Error::ClientConstructionFailed(err) => { | ||||||
Err(anyhow!("Client construction failed: {}", err)) | ||||||
} | ||||||
S3Error::RetryFailed => Err(anyhow!("Retry failed")), | ||||||
S3Error::IoError(e) => Err(e.into()), | ||||||
S3Error::GetObjectError { .. } | S3Error::ListObjectsFailed(_) => { | ||||||
Ok(NextMessage::Pending) | ||||||
} | ||||||
Some(Some(Err(e))) => match e { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what does the double-Option mean? another good TODO to change that to an explicit enum There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe that the double-option is an an artifact of using |
||||||
S3Error::GetObjectError { .. } => { | ||||||
tracing::warn!( | ||||||
"when reading source '{}' ({}): {}", | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think that adding I agree that this is a sentence fragment, though. |
||||||
self.source_name, | ||||||
self.id, | ||||||
e | ||||||
); | ||||||
Ok(NextMessage::Pending) | ||||||
} | ||||||
} | ||||||
e @ (S3Error::ListObjectsFailed { .. } | ||||||
| S3Error::ClientConstructionFailed(_) | ||||||
| S3Error::IoError { .. }) => Err(e.into()), | ||||||
}, | ||||||
None => Ok(NextMessage::Pending), | ||||||
Some(None) => Ok(NextMessage::Finished), | ||||||
} | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this only happen during shutdown?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup.