Skip to content

Commit

Permalink
add RetryBackoffLayer
Browse files Browse the repository at this point in the history
  • Loading branch information
YadominJinta committed Dec 6, 2024
1 parent e123c41 commit 4582ae5
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 7 deletions.
5 changes: 5 additions & 0 deletions crates/cli/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ pub struct Args {
#[arg(long, default_value_t = 500, value_name = "B", help_heading = "Acquisition Options")]
pub initial_backoff: u64,

/// The number of compute units per second for this provider
#[arg(long, default_value_t = 50, value_name = "U", help_heading="Acquisition Options")]
pub compute_units_per_second: u64,


/// Global number of concurrent requests
#[arg(long, value_name = "M", help_heading = "Acquisition Options")]
pub max_concurrent_requests: Option<u64>,
Expand Down
18 changes: 15 additions & 3 deletions crates/cli/src/parse/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use std::env;
use crate::args::Args;
use alloy::{
providers::{Provider, ProviderBuilder, RootProvider},
transports::BoxTransport,
rpc::client::{BuiltInConnectionString, ClientBuilder, RpcClient},
transports::{layers::RetryBackoffLayer, BoxTransport},
};
use cryo_freeze::{ParseError, Source, SourceLabels};
use governor::{Quota, RateLimiter};
Expand All @@ -13,8 +14,19 @@ use std::num::NonZeroU32;
pub(crate) async fn parse_source(args: &Args) -> Result<Source, ParseError> {
// parse network info
let rpc_url = parse_rpc_url(args)?;
let provider: RootProvider<BoxTransport> =
ProviderBuilder::default().on_builtin(&rpc_url).await.map_err(ParseError::ProviderError)?;
let retry_layer = RetryBackoffLayer::new(
args.max_retries,
args.initial_backoff,
args.compute_units_per_second,
);
let connect: BuiltInConnectionString = rpc_url.parse().map_err(ParseError::ProviderError)?;
let client: RpcClient<BoxTransport> = ClientBuilder::default()
.layer(retry_layer)
.connect_boxed(connect)
.await
.map_err(ParseError::ProviderError)?
.boxed();
let provider: RootProvider<BoxTransport> = ProviderBuilder::default().on_client(client);
let chain_id = provider.get_chain_id().await.map_err(ParseError::ProviderError)?;
let rate_limiter = match args.requests_per_second {
Some(rate_limit) => match (NonZeroU32::new(1), NonZeroU32::new(rate_limit)) {
Expand Down
18 changes: 14 additions & 4 deletions crates/cli/src/parse/timestamps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ async fn get_latest_timestamp(source: Arc<Source>) -> Result<u64, ParseError> {
mod tests {
use std::num::NonZeroU32;

use alloy::providers::ProviderBuilder;
use alloy::{providers::ProviderBuilder, rpc::client::{BuiltInConnectionString, ClientBuilder, RpcClient}, transports::{layers::RetryBackoffLayer, BoxTransport}};
use governor::{Quota, RateLimiter};

use super::*;
Expand All @@ -335,10 +335,20 @@ mod tests {
Ok(url) => url,
Err(_) => std::process::exit(0),
};
// let max_retry = 5;
// let initial_backoff = 500;
let max_retry = 5;
let initial_backoff = 500;
let compute_units_per_second = 50;
let max_concurrent_requests = 100;
let provider = ProviderBuilder::new().on_http(rpc_url.parse().unwrap()).boxed();
let retry_layer = RetryBackoffLayer::new(max_retry, initial_backoff, compute_units_per_second);
let connect: BuiltInConnectionString = rpc_url.parse().map_err(ParseError::ProviderError).unwrap();
let client: RpcClient<BoxTransport> = ClientBuilder::default()
.layer(retry_layer)
.connect_boxed(connect)
.await
.map_err(ParseError::ProviderError)
.unwrap()
.boxed();
let provider = ProviderBuilder::default().on_client(client);
let quota = Quota::per_second(NonZeroU32::new(15).unwrap())
.allow_burst(NonZeroU32::new(1).unwrap());
let rate_limiter = Some(RateLimiter::direct(quota));
Expand Down

0 comments on commit 4582ae5

Please sign in to comment.