From fa1be104a1ea7fc554f7d6b59856755790314d4b Mon Sep 17 00:00:00 2001 From: Brandon W Maister Date: Wed, 19 Jan 2022 12:24:25 -0500 Subject: [PATCH] s3: Improve error messages and handling * Include more information in all error messages so that users can understand what caused the problem * ListBucketsError is a fatal error, but we were just sending two messages instead of making it a hard error. --- src/dataflow/src/source/s3.rs | 88 +++++++++++++++++++---------------- 1 file changed, 48 insertions(+), 40 deletions(-) diff --git a/src/dataflow/src/source/s3.rs b/src/dataflow/src/source/s3.rs index aa03162c04b35..5c2d9e0f36039 100644 --- a/src/dataflow/src/source/s3.rs +++ b/src/dataflow/src/source/s3.rs @@ -33,7 +33,6 @@ use std::default::Default; use std::fmt::Formatter; use std::ops::AddAssign; -use anyhow::anyhow; use async_compression::tokio::bufread::GzipDecoder; use aws_sdk_s3::error::{GetObjectError, ListObjectsV2Error}; use aws_sdk_s3::{Client as S3Client, SdkError}; @@ -222,7 +221,14 @@ async fn download_objects_task( bucket_info.keys.insert(msg.key); } Err(DownloadError::Failed { err }) => { - if tx.send(Err(S3Error::IoError(err))).await.is_err() { + if tx + .send(Err(S3Error::IoError { + bucket: msg_ref.bucket.clone(), + err, + })) + .await + .is_err() + { rx.close(); break; }; @@ -354,20 +360,14 @@ async fn scan_bucket_task( } continuation_token = response.next_continuation_token; } - Err(e) => { - let err_string = format!("{}", e); - tx.send(Err(S3Error::ListObjectsFailed(e))) - .await - .unwrap_or_else(|e| { - tracing::debug!("unable to send error on listing objects: {}", e) - }); + Err(err) => { + tx.send(Err(S3Error::ListObjectsFailed { + bucket: bucket.clone(), + err, + })) + .await + .unwrap_or_else(|e| tracing::debug!("Source queue has been shut down: {}", e)); - tracing::error!("failed to list bucket {}: {}", bucket, err_string); - tx.send(Err(S3Error::RetryFailed)) - .await - .unwrap_or_else(|e| { - tracing::debug!("unable to send error on retries failed: {}", e) - }); break; } } @@ -663,9 +663,14 @@ enum S3Error { key: String, err: SdkError, }, - ListObjectsFailed(SdkError), - RetryFailed, - IoError(std::io::Error), + ListObjectsFailed { + bucket: String, + err: SdkError, + }, + IoError { + bucket: String, + err: std::io::Error, + }, } impl std::error::Error for S3Error {} @@ -679,13 +684,18 @@ impl From for std::io::Error { impl std::fmt::Display for S3Error { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { - S3Error::ClientConstructionFailed(err) => err.fmt(f), + S3Error::ClientConstructionFailed(err) => { + write!(f, "Unable to build S3 client: {}", err) + } S3Error::GetObjectError { bucket, key, err } => { - write!(f, "getting object {}/{}: {}", bucket, key, err) + write!(f, "Unable to get S3 object {}/{}: {}", bucket, key, err) + } + S3Error::ListObjectsFailed { bucket, err } => { + write!(f, "Unable to list S3 bucket {}: {}", bucket, err) + } + S3Error::IoError { bucket, err } => { + write!(f, "IO Error for S3 bucket {}: {}", bucket, err) } - S3Error::ListObjectsFailed(err) => err.fmt(f), - S3Error::RetryFailed => write!(f, "Retry failed to produce result"), - S3Error::IoError(e) => write!(f, "IoError: {}", e), } } } @@ -934,24 +944,22 @@ impl SourceReader for S3SourceReader { value: record, })) } - Some(Some(Err(e))) => { - tracing::warn!( - "when reading source '{}' ({}): {}", - self.source_name, - self.id, - e - ); - match e { - S3Error::ClientConstructionFailed(err) => { - Err(anyhow!("Client construction failed: {}", err)) - } - S3Error::RetryFailed => Err(anyhow!("Retry failed")), - S3Error::IoError(e) => Err(e.into()), - S3Error::GetObjectError { .. } | S3Error::ListObjectsFailed(_) => { - Ok(NextMessage::Pending) - } + Some(Some(Err(e))) => match e { + S3Error::GetObjectError { .. } => { + tracing::warn!( + "when reading source '{}' ({}): {}", + self.source_name, + self.id, + e + ); + Ok(NextMessage::Pending) } - } + e + @ + (S3Error::ListObjectsFailed { .. } + | S3Error::ClientConstructionFailed(_) + | S3Error::IoError { .. }) => Err(e.into()), + }, None => Ok(NextMessage::Pending), Some(None) => Ok(NextMessage::Finished), }