Skip to content

Commit

Permalink
migrate: ethers to alloy (#194)
Browse files Browse the repository at this point in the history
* migrate: ethers to alloy

* update alloy to 0.6.4

* fix ci

* add RetryBackoffLayer

* fix missing field

* fix calculate

* add missing n_rlp_bytes
  • Loading branch information
YadominJinta authored Jan 6, 2025
1 parent eba6192 commit 1abef98
Show file tree
Hide file tree
Showing 63 changed files with 3,540 additions and 2,969 deletions.
4,152 changes: 2,222 additions & 1,930 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 9 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@ cryo_cli = { version = "0.3.2", path = "./crates/cli" }
cryo_freeze = { version = "0.3.2", path = "./crates/freeze" }
cryo_to_df = { version = "0.3.2", path = "./crates/to_df" }

alloy = { version = "0.6.4", features = [
"full",
"rpc-types-trace",
"provider-ws",
"provider-ipc",
"provider-debug-api",
"provider-trace-api",
"transport-ipc-mock",
] }
anstyle = "1.0.4"
async-trait = "0.1.74"
chrono = { version = "0.4.31", features = ["serde"] }
Expand All @@ -29,8 +38,6 @@ clap_cryo = { version = "4.3.21-cryo", features = [
] }
colored = "2.0.4"
color-print = "0.3.5"
ethers = { version = "2.0.10", features = ["rustls", "ws", "ipc"] }
ethers-core = "2.0.10"
eyre = "0.6.8"
futures = "0.3.29"
governor = "0.6.0"
Expand Down
2 changes: 1 addition & 1 deletion crates/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ path = "src/main.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
alloy = { workspace = true }
anstyle = { workspace = true }
clap_cryo = { workspace = true }
color-print = { workspace = true }
colored = { workspace = true }
cryo_freeze = { workspace = true }
ethers = { workspace = true }
eyre = { workspace = true }
governor = { workspace = true }
hex = { workspace = true }
Expand Down
4 changes: 4 additions & 0 deletions crates/cli/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ pub struct Args {
#[arg(long, default_value_t = 500, value_name = "B", help_heading = "Acquisition Options")]
pub initial_backoff: u64,

/// The number of compute units per second for this provider
#[arg(long, default_value_t = 50, value_name = "U", help_heading = "Acquisition Options")]
pub compute_units_per_second: u64,

/// Global number of concurrent requests
#[arg(long, value_name = "M", help_heading = "Acquisition Options")]
pub max_concurrent_requests: Option<u64>,
Expand Down
100 changes: 73 additions & 27 deletions crates/cli/src/parse/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ async fn parse_block_inputs(
}
}

#[derive(Clone, Debug)]
enum RangePosition {
First,
Last,
Expand Down Expand Up @@ -322,15 +323,14 @@ async fn parse_block_number(
source: Arc<Source>,
) -> Result<u64, ParseError> {
match (block_ref, range_position) {
("latest", _) => source.get_block_number().await.map(|n| n.as_u64()).map_err(|_e| {
("latest", _) => source.get_block_number().await.map_err(|_e| {
ParseError::ParseError("Error retrieving latest block number".to_string())
}),
("", RangePosition::First) => Ok(0),
("", RangePosition::Last) => {
source.get_block_number().await.map(|n| n.as_u64()).map_err(|_e| {
ParseError::ParseError("Error retrieving last block number".to_string())
})
}
("", RangePosition::Last) => source
.get_block_number()
.await
.map_err(|_e| ParseError::ParseError("Error retrieving last block number".to_string())),
("", RangePosition::None) => Err(ParseError::ParseError("invalid input".to_string())),
_ if block_ref.ends_with('B') | block_ref.ends_with('b') => {
let s = &block_ref[..block_ref.len() - 1];
Expand Down Expand Up @@ -366,7 +366,7 @@ async fn apply_reorg_buffer(
0 => Ok(block_chunks),
reorg_filter => {
let latest_block = match source.get_block_number().await {
Ok(result) => result.as_u64(),
Ok(result) => result,
Err(_e) => {
return Err(ParseError::ParseError("reorg buffer parse error".to_string()))
}
Expand All @@ -387,24 +387,34 @@ pub(crate) async fn get_latest_block_number(source: Arc<Source>) -> Result<u64,
source
.get_block_number()
.await
.map(|n| n.as_u64())
.map_err(|_e| ParseError::ParseError("Error retrieving latest block number".to_string()))
}

#[cfg(test)]
mod tests {
use std::path::PathBuf;

use alloy::{
providers::{IpcConnect, ProviderBuilder},
transports::ipc::MockIpcServer,
};

use super::*;
use ethers::prelude::*;

#[derive(Clone, Debug)]
enum BlockTokenTest<'a> {
WithoutMock((&'a str, BlockChunk)), // Token | Expected
WithMock((&'a str, BlockChunk, u64)), // Token | Expected | Mock Block Response
}

async fn block_token_test_helper(tests: Vec<(BlockTokenTest<'_>, bool)>) {
let (provider, mock) = Provider::mocked();
async fn block_token_test_helper(
tests: Vec<(BlockTokenTest<'_>, bool)>,
mock_ipc_path: PathBuf,
) {
let ipc = IpcConnect::new(mock_ipc_path);
let provider = ProviderBuilder::new().on_ipc(ipc).await.unwrap().boxed();
let source = Source {
provider: provider.into(),
provider,
semaphore: Arc::new(None),
rate_limiter: Arc::new(None),
chain_id: 1,
Expand All @@ -416,8 +426,7 @@ mod tests {
let source = Arc::new(source);
for (test, res) in tests {
match test {
BlockTokenTest::WithMock((token, expected, latest)) => {
mock.push(U64::from(latest)).unwrap();
BlockTokenTest::WithMock((token, expected, _latest)) => {
assert_eq!(
block_token_test_executor(token, expected, source.clone()).await,
res
Expand Down Expand Up @@ -458,15 +467,20 @@ mod tests {
}
}

#[derive(Clone, Debug)]
enum BlockInputTest<'a> {
WithoutMock((&'a String, Vec<BlockChunk>)), // Token | Expected
WithMock((&'a String, Vec<BlockChunk>, u64)), // Token | Expected | Mock Block Response
}

async fn block_input_test_helper(tests: Vec<(BlockInputTest<'_>, bool)>) {
let (provider, mock) = Provider::mocked();
async fn block_input_test_helper(
tests: Vec<(BlockInputTest<'_>, bool)>,
mock_ipc_path: PathBuf,
) {
let ipc = IpcConnect::new(mock_ipc_path);
let provider = ProviderBuilder::new().on_ipc(ipc).await.unwrap().boxed();
let source = Arc::new(Source {
provider: provider.into(),
provider,
chain_id: 1,
rpc_url: "".to_string(),
inner_request_size: 1,
Expand All @@ -477,8 +491,7 @@ mod tests {
});
for (test, res) in tests {
match test {
BlockInputTest::WithMock((inputs, expected, latest)) => {
mock.push(U64::from(latest)).unwrap();
BlockInputTest::WithMock((inputs, expected, _latest)) => {
assert_eq!(
block_input_test_executor(inputs, expected, source.clone()).await,
res
Expand Down Expand Up @@ -531,15 +544,20 @@ mod tests {
true
}

#[derive(Clone, Debug)]
enum BlockNumberTest<'a> {
WithoutMock((&'a str, RangePosition, u64)),
WithMock((&'a str, RangePosition, u64, u64)),
}

async fn block_number_test_helper(tests: Vec<(BlockNumberTest<'_>, bool)>) {
let (provider, mock) = Provider::mocked();
async fn block_number_test_helper(
tests: Vec<(BlockNumberTest<'_>, bool)>,
mock_ipc_path: PathBuf,
) {
let provider =
ProviderBuilder::new().on_ipc(IpcConnect::new(mock_ipc_path)).await.unwrap().boxed();
let source = Source {
provider: provider.into(),
provider,
semaphore: Arc::new(None),
rate_limiter: Arc::new(None),
chain_id: 1,
Expand All @@ -551,8 +569,7 @@ mod tests {
let source = Arc::new(source);
for (test, res) in tests {
match test {
BlockNumberTest::WithMock((block_ref, range_position, expected, latest)) => {
mock.push(U64::from(latest)).unwrap();
BlockNumberTest::WithMock((block_ref, range_position, expected, _latest)) => {
assert_eq!(
block_number_test_executor(
block_ref,
Expand Down Expand Up @@ -604,7 +621,18 @@ mod tests {
// Number type
(BlockTokenTest::WithoutMock((r"1", BlockChunk::Numbers(vec![1]))), true), /* Single block */
];
block_token_test_helper(tests).await;
let mut mock_server = MockIpcServer::new();
let mock_ipc_path = mock_server.path().clone();
for (test, _) in tests.clone().into_iter() {
match test {
BlockTokenTest::WithoutMock(_) => {}
BlockTokenTest::WithMock((_, _, mock_response)) => {
mock_server.add_reply(mock_response)
}
}
}
mock_server.spawn().await;
block_token_test_helper(tests, mock_ipc_path).await;
}

#[tokio::test]
Expand Down Expand Up @@ -648,7 +676,16 @@ mod tests {
true,
), // Multi input complex
];
block_input_test_helper(tests).await;
let mut mock_server = MockIpcServer::new();
let mock_ipc_path = mock_server.path().clone();
for (test, _) in tests.clone() {
match test {
BlockInputTest::WithMock((_, _, expected)) => mock_server.add_reply(expected),
BlockInputTest::WithoutMock(_) => {}
}
}
mock_server.spawn().await;
block_input_test_helper(tests, mock_ipc_path).await;
}

#[tokio::test]
Expand All @@ -666,6 +703,15 @@ mod tests {
(BlockNumberTest::WithoutMock((r"1m", RangePosition::None, 1000000)), true), // m
(BlockNumberTest::WithoutMock((r"1k", RangePosition::None, 1000)), true), // k
];
block_number_test_helper(tests).await;
let mut mock_server = MockIpcServer::new();
let mock_ipc_path = mock_server.path().clone();
for (test, _) in tests.clone() {
match test {
BlockNumberTest::WithMock((_, _, _, expected)) => mock_server.add_reply(expected),
BlockNumberTest::WithoutMock(_) => {}
}
}
mock_server.spawn().await;
block_number_test_helper(tests, mock_ipc_path).await;
}
}
2 changes: 1 addition & 1 deletion crates/cli/src/parse/file_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ fn parse_row_group_size(
) -> Option<usize> {
match (row_group_size, n_row_groups, chunk_size) {
(Some(row_group_size), _, _) => Some(row_group_size),
(_, Some(n_row_groups), Some(cs)) => Some((cs + n_row_groups - 1) / n_row_groups),
(_, Some(n_row_groups), Some(cs)) => Some(cs.div_ceil(n_row_groups)),
_ => None,
}
}
2 changes: 1 addition & 1 deletion crates/cli/src/parse/parse_utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use cryo_freeze::ParseError;
use std::collections::HashMap;

pub(crate) fn hex_string_to_binary(hex_string: &String) -> Result<Vec<u8>, ParseError> {
pub(crate) fn hex_string_to_binary(hex_string: &str) -> Result<Vec<u8>, ParseError> {
let hex_string = hex_string.strip_prefix("0x").unwrap_or(hex_string);
hex::decode(hex_string)
.map_err(|_| ParseError::ParseError("could not parse data as hex".to_string()))
Expand Down
1 change: 0 additions & 1 deletion crates/cli/src/parse/partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use cryo_freeze::{
AddressChunk, CallDataChunk, Datatype, Dim, ParseError, Partition, PartitionLabels, SlotChunk,
Source, Table, TimeDimension, TopicChunk, TransactionChunk,
};
use ethers::prelude::*;
use rand::{seq::SliceRandom, thread_rng};
use std::{collections::HashMap, str::FromStr, sync::Arc};

Expand Down
47 changes: 20 additions & 27 deletions crates/cli/src/parse/source.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,33 @@
use std::env;

use crate::args::Args;
use cryo_freeze::{sources::ProviderWrapper, ParseError, Source, SourceLabels};
use ethers::prelude::*;
use alloy::{
providers::{Provider, ProviderBuilder, RootProvider},
rpc::client::{BuiltInConnectionString, ClientBuilder, RpcClient},
transports::{layers::RetryBackoffLayer, BoxTransport},
};
use cryo_freeze::{ParseError, Source, SourceLabels};
use governor::{Quota, RateLimiter};
use polars::prelude::*;
use std::num::NonZeroU32;

pub(crate) async fn parse_source(args: &Args) -> Result<Source, ParseError> {
// parse network info
let rpc_url = parse_rpc_url(args)?;
let (provider, chain_id): (ProviderWrapper, u64) = if rpc_url.starts_with("http") {
let provider = Provider::<RetryClient<Http>>::new_client(
&rpc_url,
args.max_retries,
args.initial_backoff,
)
.map_err(|_e| ParseError::ParseError("could not connect to provider".to_string()))?;
let chain_id = provider.get_chainid().await.map_err(ParseError::ProviderError)?.as_u64();
(provider.into(), chain_id)
} else if rpc_url.starts_with("ws") {
let provider = Provider::<Ws>::connect(&rpc_url).await.map_err(|_| {
ParseError::ParseError("could not instantiate HTTP Provider".to_string())
})?;
let chain_id = provider.get_chainid().await.map_err(ParseError::ProviderError)?.as_u64();
(provider.into(), chain_id)
} else if rpc_url.ends_with(".ipc") {
let provider: Provider<Ipc> = Provider::connect_ipc(&rpc_url).await.map_err(|_| {
ParseError::ParseError("could not instantiate HTTP Provider".to_string())
})?;
let chain_id = provider.get_chainid().await.map_err(ParseError::ProviderError)?.as_u64();
(provider.into(), chain_id)
} else {
return Err(ParseError::ParseError(format!("invalid rpc url: {}", rpc_url)));
};

let retry_layer = RetryBackoffLayer::new(
args.max_retries,
args.initial_backoff,
args.compute_units_per_second,
);
let connect: BuiltInConnectionString = rpc_url.parse().map_err(ParseError::ProviderError)?;
let client: RpcClient<BoxTransport> = ClientBuilder::default()
.layer(retry_layer)
.connect_boxed(connect)
.await
.map_err(ParseError::ProviderError)?
.boxed();
let provider: RootProvider<BoxTransport> = ProviderBuilder::default().on_client(client);
let chain_id = provider.get_chain_id().await.map_err(ParseError::ProviderError)?;
let rate_limiter = match args.requests_per_second {
Some(rate_limit) => match (NonZeroU32::new(1), NonZeroU32::new(rate_limit)) {
(Some(one), Some(value)) => {
Expand Down
Loading

0 comments on commit 1abef98

Please sign in to comment.