diff --git a/crates/cli/src/args.rs b/crates/cli/src/args.rs index 49eff61c..1c942898 100644 --- a/crates/cli/src/args.rs +++ b/crates/cli/src/args.rs @@ -196,6 +196,10 @@ pub struct Args { #[arg(long, help_heading = "Output Options")] pub no_report: bool, + /// Skip writing to dataframe if the df is empty + #[arg(long, help_heading = "Acquisition Options")] + pub skip_empty: Option, + /// Address(es) #[arg(long, help_heading = "Dataset-specific Options", num_args(1..))] pub address: Option>, diff --git a/crates/cli/src/parse/execution.rs b/crates/cli/src/parse/execution.rs index 949a6356..91162b33 100644 --- a/crates/cli/src/parse/execution.rs +++ b/crates/cli/src/parse/execution.rs @@ -17,6 +17,7 @@ pub(crate) fn parse_execution_env(args: &Args, n_tasks: u64) -> Result Vec { - vec![Dim::FromAddress, Dim::ToAddress] + vec![Dim::Address, Dim::FromAddress, Dim::ToAddress] } } @@ -65,8 +65,22 @@ pub type TransactionAndReceipt = (Transaction, Option); #[async_trait::async_trait] impl CollectByBlock for Transactions { type Response = (Block, Vec, bool); - async fn extract(request: Params, source: Arc, query: Arc) -> R { + fn get_addresses() -> Vec { + let env = ExecutionEnv::default(); + let cli_command = env.cli_command.unwrap(); + if let Some(address_index) = cli_command.iter().position(|arg| arg == "--address") { + cli_command[address_index + 1..] + .to_vec() + .iter() + .take_while(|&arg| !arg.starts_with("--")) + .map(|s| s.parse::().expect("Invalid H160")) + .collect::>() + } else { + Vec::new() + } + } + let block = source .get_block_with_txs(request.block_number()?) .await? @@ -77,7 +91,7 @@ impl CollectByBlock for Transactions { // filter by from_address let from_filter: Box bool + Send> = if let Some(from_address) = &request.from_address { - Box::new(move |tx| tx.from.as_bytes() == from_address) + Box::new(move |tx| from_address == tx.from.as_bytes()) } else { Box::new(|_| true) }; @@ -88,8 +102,34 @@ impl CollectByBlock for Transactions { } else { Box::new(|_| true) }; - let transactions = - block.transactions.clone().into_iter().filter(from_filter).filter(to_filter).collect(); + // filter by addresses (if either the to or from address are in the vector of addresses) + let addresses = get_addresses(); + let addr_filter: Box bool + Send> = + if let Some(address) = &request.address { + Box::new(move |tx| { + let to_address_is_in = addresses.contains(&tx.to.unwrap()); + let from_address_is_in = addresses.contains(&tx.from); + if addresses.len() == 1 { + to_address_is_in || from_address_is_in + } else { + if !(to_address_is_in && from_address_is_in) { + tx.to.as_ref().map_or(false, |x| x.as_bytes() == address) + } else { + tx.from.as_bytes() == address + } + } + }) + } else { + Box::new(|_| true) + }; + let transactions = block + .transactions + .clone() + .into_iter() + .filter(from_filter) + .filter(to_filter) + .filter(addr_filter) + .collect(); // 2. collect receipts if necessary // if transactions are filtered fetch by set of transaction hashes, else fetch all receipts @@ -97,7 +137,10 @@ impl CollectByBlock for Transactions { let receipts: Vec> = if schema.has_column("gas_used") | schema.has_column("success") { // receipts required - let receipts = if request.from_address.is_some() || request.to_address.is_some() { + let receipts = if request.from_address.is_some() || + request.to_address.is_some() || + request.address.is_some() + { source.get_tx_receipts(&transactions).await? } else { source.get_tx_receipts_in_block(&block).await? @@ -107,8 +150,8 @@ impl CollectByBlock for Transactions { vec![None; block.transactions.len()] }; - let transactions_with_receips = transactions.into_iter().zip(receipts).collect(); - Ok((block, transactions_with_receips, query.exclude_failed)) + let transactions_with_receipts = transactions.into_iter().zip(receipts).collect(); + Ok((block, transactions_with_receipts, query.exclude_failed)) } fn transform(response: Self::Response, columns: &mut Self, query: &Arc) -> R<()> { diff --git a/crates/freeze/src/freeze.rs b/crates/freeze/src/freeze.rs index 38841ff2..3ecde7a4 100644 --- a/crates/freeze/src/freeze.rs +++ b/crates/freeze/src/freeze.rs @@ -183,7 +183,11 @@ async fn freeze_partition(payload: PartitionPayload) -> Result, /// report directory pub report_dir: Option, + /// skips writing empty dfs + pub skip_empty: Option, } impl ExecutionEnv { @@ -67,6 +69,7 @@ pub struct ExecutionEnvBuilder { t_start: SystemTime, t_end: Option, report_dir: Option, + skip_empty: Option, } impl Default for ExecutionEnvBuilder { @@ -82,6 +85,7 @@ impl Default for ExecutionEnvBuilder { t_start: SystemTime::now(), t_end: None, report_dir: None, + skip_empty: Some(false), } } } @@ -134,6 +138,11 @@ impl ExecutionEnvBuilder { self } + /// skip_empty + pub fn skip_empty(mut self, skip_empty: Option) -> Self { + self.skip_empty = skip_empty; + self + } /// build final output pub fn build(self) -> ExecutionEnv { ExecutionEnv { @@ -147,6 +156,7 @@ impl ExecutionEnvBuilder { t_start: self.t_start, t_end: self.t_end, report_dir: self.report_dir, + skip_empty: self.skip_empty, } } } diff --git a/crates/python/pyproject.toml b/crates/python/pyproject.toml index 2d2ce6c3..1a5a32ed 100644 --- a/crates/python/pyproject.toml +++ b/crates/python/pyproject.toml @@ -6,6 +6,7 @@ build-backend = "maturin" name = "cryo" description = "cryo is the easiest way to extract blockchain data to parquet, csv, json, or a python dataframe." requires-python = ">=3.7" +dependencies = ["polars","pyarrow"] classifiers = [ "Programming Language :: Rust", "Programming Language :: Python :: Implementation :: CPython", @@ -13,6 +14,8 @@ classifiers = [ ] readme = "README.md" +[project.optional-dependencies] +pandas = ["pandas>=2.1.3"] [tool.maturin] python-source = "python" diff --git a/crates/python/python/cryo/_args.py b/crates/python/python/cryo/_args.py index 01ef0e86..bad37b61 100644 --- a/crates/python/python/cryo/_args.py +++ b/crates/python/python/cryo/_args.py @@ -37,7 +37,8 @@ def parse_cli_args( else: raise Exception('unknown file_format') - kwargs['no_verbose'] = not verbose - + if 'no_verbose' not in kwargs.keys(): + kwargs['no_verbose'] = not verbose + return kwargs diff --git a/crates/python/python/cryo/_spec.py b/crates/python/python/cryo/_spec.py index 11af8ae3..d01a3679 100644 --- a/crates/python/python/cryo/_spec.py +++ b/crates/python/python/cryo/_spec.py @@ -54,4 +54,14 @@ class CryoCliArgs(TypedDict, total=False): topic3: str | bytes | None inner_request_size: int | None no_verbose: bool + address: typing.Sequence[str] | None + to_address: typing.Sequence[str] | None + from_address: typing.Sequence[str] | None + call_data: typing.Sequence[str] | None + function: typing.Sequence[str] | None + inputs: typing.Sequence[str] | None + slot: typing.Sequence[str] | None + event_signature: str | bytes | None + inner_request_size: int | None + js_tracer: str | bytes | None diff --git a/crates/python/rust/collect_adapter.rs b/crates/python/rust/collect_adapter.rs index 0f8e4672..1bd53871 100644 --- a/crates/python/rust/collect_adapter.rs +++ b/crates/python/rust/collect_adapter.rs @@ -64,6 +64,7 @@ use cryo_freeze::collect; verbose = false, no_verbose = false, event_signature = None, + skip_empty = Some(false), ) )] #[allow(clippy::too_many_arguments)] @@ -125,6 +126,7 @@ pub fn _collect( verbose: bool, no_verbose: bool, event_signature: Option, + skip_empty: Option, ) -> PyResult<&PyAny> { if let Some(command) = command { pyo3_asyncio::tokio::future_into_py(py, async move { @@ -190,6 +192,7 @@ pub fn _collect( verbose, no_verbose, event_signature, + skip_empty, }; pyo3_asyncio::tokio::future_into_py(py, async move { match run_collect(args).await { diff --git a/crates/python/rust/freeze_adapter.rs b/crates/python/rust/freeze_adapter.rs index 5b7616ea..6712bc08 100644 --- a/crates/python/rust/freeze_adapter.rs +++ b/crates/python/rust/freeze_adapter.rs @@ -61,6 +61,7 @@ use cryo_cli::{run, Args}; verbose = false, no_verbose = false, event_signature = None, + skip_empty = Some(false), ) )] #[allow(clippy::too_many_arguments)] @@ -122,6 +123,7 @@ pub fn _freeze( verbose: bool, no_verbose: bool, event_signature: Option, + skip_empty: Option, ) -> PyResult<&PyAny> { if let Some(command) = command { freeze_command(py, command) @@ -182,6 +184,7 @@ pub fn _freeze( verbose, no_verbose, event_signature, + skip_empty, }; pyo3_asyncio::tokio::future_into_py(py, async move {