From 4582ae5d2c1e89dfd4a2be78c6589b8c423ca4ce Mon Sep 17 00:00:00 2001 From: Yadomin Jinta Date: Fri, 6 Dec 2024 23:50:15 +0800 Subject: [PATCH] add RetryBackoffLayer --- crates/cli/src/args.rs | 5 +++++ crates/cli/src/parse/source.rs | 18 +++++++++++++++--- crates/cli/src/parse/timestamps.rs | 18 ++++++++++++++---- 3 files changed, 34 insertions(+), 7 deletions(-) diff --git a/crates/cli/src/args.rs b/crates/cli/src/args.rs index 49eff61c..7a859647 100644 --- a/crates/cli/src/args.rs +++ b/crates/cli/src/args.rs @@ -106,6 +106,11 @@ pub struct Args { #[arg(long, default_value_t = 500, value_name = "B", help_heading = "Acquisition Options")] pub initial_backoff: u64, + /// The number of compute units per second for this provider + #[arg(long, default_value_t = 50, value_name = "U", help_heading="Acquisition Options")] + pub compute_units_per_second: u64, + + /// Global number of concurrent requests #[arg(long, value_name = "M", help_heading = "Acquisition Options")] pub max_concurrent_requests: Option, diff --git a/crates/cli/src/parse/source.rs b/crates/cli/src/parse/source.rs index 525566f8..6b625157 100644 --- a/crates/cli/src/parse/source.rs +++ b/crates/cli/src/parse/source.rs @@ -3,7 +3,8 @@ use std::env; use crate::args::Args; use alloy::{ providers::{Provider, ProviderBuilder, RootProvider}, - transports::BoxTransport, + rpc::client::{BuiltInConnectionString, ClientBuilder, RpcClient}, + transports::{layers::RetryBackoffLayer, BoxTransport}, }; use cryo_freeze::{ParseError, Source, SourceLabels}; use governor::{Quota, RateLimiter}; @@ -13,8 +14,19 @@ use std::num::NonZeroU32; pub(crate) async fn parse_source(args: &Args) -> Result { // parse network info let rpc_url = parse_rpc_url(args)?; - let provider: RootProvider = - ProviderBuilder::default().on_builtin(&rpc_url).await.map_err(ParseError::ProviderError)?; + let retry_layer = RetryBackoffLayer::new( + args.max_retries, + args.initial_backoff, + args.compute_units_per_second, + ); + let connect: BuiltInConnectionString = rpc_url.parse().map_err(ParseError::ProviderError)?; + let client: RpcClient = ClientBuilder::default() + .layer(retry_layer) + .connect_boxed(connect) + .await + .map_err(ParseError::ProviderError)? + .boxed(); + let provider: RootProvider = ProviderBuilder::default().on_client(client); let chain_id = provider.get_chain_id().await.map_err(ParseError::ProviderError)?; let rate_limiter = match args.requests_per_second { Some(rate_limit) => match (NonZeroU32::new(1), NonZeroU32::new(rate_limit)) { diff --git a/crates/cli/src/parse/timestamps.rs b/crates/cli/src/parse/timestamps.rs index 4badc708..428823aa 100644 --- a/crates/cli/src/parse/timestamps.rs +++ b/crates/cli/src/parse/timestamps.rs @@ -324,7 +324,7 @@ async fn get_latest_timestamp(source: Arc) -> Result { mod tests { use std::num::NonZeroU32; - use alloy::providers::ProviderBuilder; + use alloy::{providers::ProviderBuilder, rpc::client::{BuiltInConnectionString, ClientBuilder, RpcClient}, transports::{layers::RetryBackoffLayer, BoxTransport}}; use governor::{Quota, RateLimiter}; use super::*; @@ -335,10 +335,20 @@ mod tests { Ok(url) => url, Err(_) => std::process::exit(0), }; - // let max_retry = 5; - // let initial_backoff = 500; + let max_retry = 5; + let initial_backoff = 500; + let compute_units_per_second = 50; let max_concurrent_requests = 100; - let provider = ProviderBuilder::new().on_http(rpc_url.parse().unwrap()).boxed(); + let retry_layer = RetryBackoffLayer::new(max_retry, initial_backoff, compute_units_per_second); + let connect: BuiltInConnectionString = rpc_url.parse().map_err(ParseError::ProviderError).unwrap(); + let client: RpcClient = ClientBuilder::default() + .layer(retry_layer) + .connect_boxed(connect) + .await + .map_err(ParseError::ProviderError) + .unwrap() + .boxed(); + let provider = ProviderBuilder::default().on_client(client); let quota = Quota::per_second(NonZeroU32::new(15).unwrap()) .allow_burst(NonZeroU32::new(1).unwrap()); let rate_limiter = Some(RateLimiter::direct(quota));