From 9e4d9c4f34b9eff619df078d7ba4ca409479cb8f Mon Sep 17 00:00:00 2001 From: davidthegardens Date: Thu, 7 Dec 2023 14:36:37 -0400 Subject: [PATCH 01/12] various-fixes --- crates/cli/src/args.rs | 4 + crates/cli/src/parse/execution.rs | 1 + crates/cli/src/parse/timestamps.rs | 2 +- .../src/datasets/address_appearances.rs | 127 ++---------------- crates/freeze/src/datasets/contracts.rs | 2 - crates/freeze/src/datasets/erc20_transfers.rs | 17 --- .../freeze/src/datasets/erc721_transfers.rs | 17 --- crates/freeze/src/datasets/logs.rs | 20 --- .../freeze/src/datasets/native_transfers.rs | 2 - crates/freeze/src/datasets/transactions.rs | 51 +++++-- crates/freeze/src/freeze.rs | 5 + crates/freeze/src/types/execution.rs | 10 ++ crates/python/pyproject.toml | 1 + crates/python/python/cryo/_args.py | 5 +- crates/python/python/cryo/_spec.py | 10 ++ crates/python/rust/collect_adapter.rs | 3 + crates/python/rust/freeze_adapter.rs | 3 + 17 files changed, 97 insertions(+), 183 deletions(-) diff --git a/crates/cli/src/args.rs b/crates/cli/src/args.rs index a147845a..125e5489 100644 --- a/crates/cli/src/args.rs +++ b/crates/cli/src/args.rs @@ -195,6 +195,10 @@ pub struct Args { /// Avoid saving a summary report #[arg(long, help_heading = "Output Options")] pub no_report: bool, + + /// Skip if writing if dataframe is empty + #[arg(long, help_heading = "Acquisition Options")] + pub write_empty: Option, /// Address(es) #[arg(long, help_heading = "Dataset-specific Options", num_args(1..))] diff --git a/crates/cli/src/parse/execution.rs b/crates/cli/src/parse/execution.rs index 949a6356..4dce1154 100644 --- a/crates/cli/src/parse/execution.rs +++ b/crates/cli/src/parse/execution.rs @@ -17,6 +17,7 @@ pub(crate) fn parse_execution_env(args: &Args, n_tasks: u64) -> Result( .map_err(|_e| ParseError::ParseError("Error fetching block for timestamp".to_string()))? .unwrap(); - #[allow(clippy::comparison_chain)] + #[warn(clippy::comparison_chain)] if block.timestamp == timestamp.into() { return Ok(mid); } else if block.timestamp < timestamp.into() { diff --git a/crates/freeze/src/datasets/address_appearances.rs b/crates/freeze/src/datasets/address_appearances.rs index 683379ae..897f969d 100644 --- a/crates/freeze/src/datasets/address_appearances.rs +++ b/crates/freeze/src/datasets/address_appearances.rs @@ -9,7 +9,6 @@ use std::collections::HashMap; pub struct AddressAppearances { n_rows: usize, block_number: Vec, - block_hash: Vec>, transaction_hash: Vec>, address: Vec>, relationship: Vec, @@ -18,17 +17,6 @@ pub struct AddressAppearances { #[async_trait::async_trait] impl Dataset for AddressAppearances { - fn default_columns() -> Option> { - Some(vec![ - "block_number", - // "block_hash", - "transaction_hash", - "address", - "relationship", - "chain_id", - ]) - } - fn default_sort() -> Option> { Some(vec!["block_number", "transaction_hash", "address", "relationship"]) } @@ -128,8 +116,7 @@ impl AddressAppearances { logs_by_tx: &HashMap>, ) { let block_number = trace.block_number as u32; - let block_hash = trace.block_hash.as_bytes().to_vec(); - self.process_address(block_author, "miner_fee", block_number, &block_hash, tx_hash, schema); + self.process_address(block_author, "miner_fee", block_number, tx_hash, schema); if let Some(logs) = logs_by_tx.get(&tx_hash) { for log in logs.iter() { @@ -139,26 +126,12 @@ impl AddressAppearances { from.copy_from_slice(&log.topics[1].to_fixed_bytes()[12..32]); let name = &(name.to_string() + "_from"); - self.process_address( - H160(from), - name, - block_number, - &block_hash, - tx_hash, - schema, - ); + self.process_address(H160(from), name, block_number, tx_hash, schema); let mut to: [u8; 20] = [0; 20]; to.copy_from_slice(&log.topics[1].to_fixed_bytes()[12..32]); let name = &(name.to_string() + "_to"); - self.process_address( - H160(to), - name, - block_number, - &block_hash, - tx_hash, - schema, - ); + self.process_address(H160(to), name, block_number, tx_hash, schema); } } } @@ -166,119 +139,47 @@ impl AddressAppearances { match &trace.action { Action::Call(action) => { - self.process_address( - action.from, - "tx_from", - block_number, - &block_hash, - tx_hash, - schema, - ); - self.process_address( - action.to, - "tx_to", - block_number, - &block_hash, - tx_hash, - schema, - ); + self.process_address(action.from, "tx_from", block_number, tx_hash, schema); + self.process_address(action.to, "tx_to", block_number, tx_hash, schema); } Action::Create(action) => { - self.process_address( - action.from, - "tx_from", - block_number, - &block_hash, - tx_hash, - schema, - ); + self.process_address(action.from, "tx_from", block_number, tx_hash, schema); } _ => {} } if let Some(Res::Create(result)) = &trace.result { - self.process_address( - result.address, - "tx_to", - block_number, - &block_hash, - tx_hash, - schema, - ); + self.process_address(result.address, "tx_to", block_number, tx_hash, schema); } } fn process_trace(&mut self, trace: &Trace, schema: &Table, tx_hash: H256) { let block_number = trace.block_number as u32; - let block_hash = trace.block_hash.as_bytes().to_vec(); match &trace.action { Action::Call(action) => { - self.process_address( - action.from, - "call_from", - block_number, - &block_hash, - tx_hash, - schema, - ); - self.process_address( - action.to, - "call_to", - block_number, - &block_hash, - tx_hash, - schema, - ); + self.process_address(action.from, "call_from", block_number, tx_hash, schema); + self.process_address(action.to, "call_to", block_number, tx_hash, schema); } Action::Create(action) => { - self.process_address( - action.from, - "factory", - block_number, - &block_hash, - tx_hash, - schema, - ); + self.process_address(action.from, "factory", block_number, tx_hash, schema); } Action::Suicide(action) => { - self.process_address( - action.address, - "suicide", - block_number, - &block_hash, - tx_hash, - schema, - ); + self.process_address(action.address, "suicide", block_number, tx_hash, schema); self.process_address( action.refund_address, "suicide_refund", block_number, - &block_hash, tx_hash, schema, ); } Action::Reward(action) => { - self.process_address( - action.author, - "author", - block_number, - &block_hash, - tx_hash, - schema, - ); + self.process_address(action.author, "author", block_number, tx_hash, schema); } } if let Some(Res::Create(result)) = &trace.result { - self.process_address( - result.address, - "create", - block_number, - &block_hash, - tx_hash, - schema, - ); + self.process_address(result.address, "create", block_number, tx_hash, schema); }; } @@ -287,7 +188,6 @@ impl AddressAppearances { address: H160, relationship: &str, block_number: u32, - block_hash: &[u8], transaction_hash: H256, schema: &Table, ) { @@ -295,7 +195,6 @@ impl AddressAppearances { store!(schema, self, address, address.as_bytes().to_vec()); store!(schema, self, relationship, relationship.to_string()); store!(schema, self, block_number, block_number); - store!(schema, self, block_hash, block_hash.to_vec()); store!(schema, self, transaction_hash, transaction_hash.as_bytes().to_vec()); } } diff --git a/crates/freeze/src/datasets/contracts.rs b/crates/freeze/src/datasets/contracts.rs index 0dd3a033..e6a3d209 100644 --- a/crates/freeze/src/datasets/contracts.rs +++ b/crates/freeze/src/datasets/contracts.rs @@ -10,7 +10,6 @@ use polars::prelude::*; pub struct Contracts { n_rows: u64, block_number: Vec, - block_hash: Vec>, create_index: Vec, transaction_hash: Vec>>, contract_address: Vec>, @@ -83,7 +82,6 @@ pub(crate) fn process_contracts( { columns.n_rows += 1; store!(schema, columns, block_number, trace.block_number as u32); - store!(schema, columns, block_hash, trace.block_hash.as_bytes().to_vec()); store!(schema, columns, create_index, create_index); create_index += 1; let tx = trace.transaction_hash; diff --git a/crates/freeze/src/datasets/erc20_transfers.rs b/crates/freeze/src/datasets/erc20_transfers.rs index 7870bf41..7d7ba245 100644 --- a/crates/freeze/src/datasets/erc20_transfers.rs +++ b/crates/freeze/src/datasets/erc20_transfers.rs @@ -8,7 +8,6 @@ use polars::prelude::*; pub struct Erc20Transfers { n_rows: u64, block_number: Vec, - block_hash: Vec>>, transaction_index: Vec, log_index: Vec, transaction_hash: Vec>, @@ -21,21 +20,6 @@ pub struct Erc20Transfers { #[async_trait::async_trait] impl Dataset for Erc20Transfers { - fn default_columns() -> Option> { - Some(vec![ - "block_number", - // "block_hash", - "transaction_index", - "log_index", - "transaction_hash", - "erc20", - "from_address", - "to_address", - "value", - "chain_id", - ]) - } - fn optional_parameters() -> Vec { vec![Dim::Address, Dim::Topic0, Dim::Topic1, Dim::Topic2, Dim::FromAddress, Dim::ToAddress] } @@ -104,7 +88,6 @@ fn process_erc20_transfers(logs: Vec, columns: &mut Erc20Transfers, schema: { columns.n_rows += 1; store!(schema, columns, block_number, bn.as_u32()); - store!(schema, columns, block_hash, log.block_hash.map(|bh| bh.as_bytes().to_vec())); store!(schema, columns, transaction_index, ti.as_u32()); store!(schema, columns, log_index, li.as_u32()); store!(schema, columns, transaction_hash, tx.as_bytes().to_vec()); diff --git a/crates/freeze/src/datasets/erc721_transfers.rs b/crates/freeze/src/datasets/erc721_transfers.rs index 7d00f304..b01b3378 100644 --- a/crates/freeze/src/datasets/erc721_transfers.rs +++ b/crates/freeze/src/datasets/erc721_transfers.rs @@ -8,7 +8,6 @@ use polars::prelude::*; pub struct Erc721Transfers { n_rows: u64, block_number: Vec, - block_hash: Vec>>, transaction_index: Vec, log_index: Vec, transaction_hash: Vec>, @@ -21,21 +20,6 @@ pub struct Erc721Transfers { #[async_trait::async_trait] impl Dataset for Erc721Transfers { - fn default_columns() -> Option> { - Some(vec![ - "block_number", - // "block_hash", - "transaction_index", - "log_index", - "transaction_hash", - "erc20", - "from_address", - "to_address", - "token_id", - "chain_id", - ]) - } - fn optional_parameters() -> Vec { vec![Dim::Address, Dim::FromAddress, Dim::ToAddress] } @@ -109,7 +93,6 @@ fn process_erc721_transfers( { columns.n_rows += 1; store!(schema, columns, block_number, bn.as_u32()); - store!(schema, columns, block_hash, log.block_hash.map(|bh| bh.as_bytes().to_vec())); store!(schema, columns, transaction_index, ti.as_u32()); store!(schema, columns, log_index, li.as_u32()); store!(schema, columns, transaction_hash, tx.as_bytes().to_vec()); diff --git a/crates/freeze/src/datasets/logs.rs b/crates/freeze/src/datasets/logs.rs index c4be6327..322f3c7e 100644 --- a/crates/freeze/src/datasets/logs.rs +++ b/crates/freeze/src/datasets/logs.rs @@ -8,7 +8,6 @@ use polars::prelude::*; pub struct Logs { n_rows: u64, block_number: Vec, - block_hash: Vec>>, transaction_index: Vec, log_index: Vec, transaction_hash: Vec>, @@ -28,24 +27,6 @@ impl Dataset for Logs { vec!["events"] } - fn default_columns() -> Option> { - Some(vec![ - "block_number", - // "block_hash", - "transaction_index", - "log_index", - "transaction_hash", - "address", - "topic0", - "topic1", - "topic2", - "topic3", - "data", - // "event_cols", - "chain_id", - ]) - } - fn optional_parameters() -> Vec { vec![Dim::Address, Dim::Topic0, Dim::Topic1, Dim::Topic2, Dim::Topic3] } @@ -123,7 +104,6 @@ fn process_logs(logs: Vec, columns: &mut Logs, schema: &Table) -> R<()> { columns.n_rows += 1; store!(schema, columns, block_number, bn.as_u32()); - store!(schema, columns, block_hash, log.block_hash.map(|bh| bh.as_bytes().to_vec())); store!(schema, columns, transaction_index, ti.as_u32()); store!(schema, columns, log_index, li.as_u32()); store!(schema, columns, transaction_hash, tx.as_bytes().to_vec()); diff --git a/crates/freeze/src/datasets/native_transfers.rs b/crates/freeze/src/datasets/native_transfers.rs index 950bcdec..53bbbd21 100644 --- a/crates/freeze/src/datasets/native_transfers.rs +++ b/crates/freeze/src/datasets/native_transfers.rs @@ -8,7 +8,6 @@ use polars::prelude::*; pub struct NativeTransfers { n_rows: u64, block_number: Vec, - block_hash: Vec>, transaction_index: Vec>, transfer_index: Vec, transaction_hash: Vec>>, @@ -68,7 +67,6 @@ pub(crate) fn process_native_transfers( columns.n_rows += 1; store!(schema, columns, block_number, trace.block_number as u32); store!(schema, columns, transaction_index, trace.transaction_position.map(|x| x as u32)); - store!(schema, columns, block_hash, trace.block_hash.as_bytes().to_vec()); store!(schema, columns, transfer_index, transfer_index as u32); store!( schema, diff --git a/crates/freeze/src/datasets/transactions.rs b/crates/freeze/src/datasets/transactions.rs index 896b12ab..ebf5ad52 100644 --- a/crates/freeze/src/datasets/transactions.rs +++ b/crates/freeze/src/datasets/transactions.rs @@ -1,6 +1,14 @@ use crate::*; use ethers::prelude::*; use polars::prelude::*; +use std::collections::HashSet; +use std::sync::Mutex; +use lazy_static::lazy_static; + +lazy_static! { + static ref TRANSACTIONS_SET: Mutex>> = Mutex::new(HashSet::new()); +} + /// columns for transactions #[cryo_to_df::to_df(Datatype::Transactions)] @@ -11,6 +19,7 @@ pub struct Transactions { transaction_index: Vec>, transaction_hash: Vec>, nonce: Vec, + address: Vec>, from_address: Vec>, to_address: Vec>>, value: Vec, @@ -55,7 +64,8 @@ impl Dataset for Transactions { } fn optional_parameters() -> Vec { - vec![Dim::FromAddress, Dim::ToAddress] + vec![Dim::Address,Dim::FromAddress, Dim::ToAddress] + //vec![Dim::FromAddress, Dim::ToAddress] } } @@ -65,8 +75,10 @@ pub type TransactionAndReceipt = (Transaction, Option); #[async_trait::async_trait] impl CollectByBlock for Transactions { type Response = (Block, Vec, bool); + //println!("impl"); async fn extract(request: Params, source: Arc, query: Arc) -> R { + let block = source .fetcher .get_block_with_txs(request.block_number()?) @@ -77,8 +89,10 @@ impl CollectByBlock for Transactions { // 1. collect transactions and filter them if optional parameters are supplied // filter by from_address let from_filter: Box bool + Send> = - if let Some(from_address) = &request.from_address { - Box::new(move |tx| tx.from.as_bytes() == from_address) + if let Some(from_address) = &request.from_address { + Box::new(move |tx| { + from_address == tx.from.as_bytes() + }) } else { Box::new(|_| true) }; @@ -89,16 +103,37 @@ impl CollectByBlock for Transactions { } else { Box::new(|_| true) }; - let transactions = - block.transactions.clone().into_iter().filter(from_filter).filter(to_filter).collect(); + // filter by address + let addr_filter: Box bool + Send> = + if let Some(address) = &request.address { + Box::new(move |tx| { + let mut transactions_set = TRANSACTIONS_SET.lock().unwrap(); + if transactions_set.contains(&tx.hash.as_bytes().to_vec()) { + false + } else { + let condition = tx.to.as_ref().map_or(false, |x| x.as_bytes() == address) || + tx.from.as_bytes() == address; + if condition { + transactions_set.insert(tx.hash.as_bytes().to_vec()); + } + condition + } + }) + } else { + Box::new(|_| true) + }; + + let transactions = + block.transactions.clone().into_iter().filter(from_filter).filter(to_filter).filter(addr_filter).collect(); + //block.transactions.clone().into_iter().filter(from_filter).filter(to_filter).collect(); // 2. collect receipts if necessary // if transactions are filtered fetch by set of transaction hashes, else fetch all receipts // in block let receipts: Vec> = if schema.has_column("gas_used") | schema.has_column("success") { // receipts required - let receipts = if request.from_address.is_some() || request.to_address.is_some() { + let receipts = if request.from_address.is_some() || request.to_address.is_some() || request.address.is_some() { source.get_tx_receipts(&transactions).await? } else { source.get_tx_receipts_in_block(&block).await? @@ -108,8 +143,8 @@ impl CollectByBlock for Transactions { vec![None; block.transactions.len()] }; - let transactions_with_receips = transactions.into_iter().zip(receipts).collect(); - Ok((block, transactions_with_receips, query.exclude_failed)) + let transactions_with_receipts = transactions.into_iter().zip(receipts).collect(); + Ok((block, transactions_with_receipts, query.exclude_failed)) } fn transform(response: Self::Response, columns: &mut Self, query: &Arc) -> R<()> { diff --git a/crates/freeze/src/freeze.rs b/crates/freeze/src/freeze.rs index a9b3f82e..9862ca3c 100644 --- a/crates/freeze/src/freeze.rs +++ b/crates/freeze/src/freeze.rs @@ -178,6 +178,11 @@ async fn freeze_partition(payload: PartitionPayload) -> Result<(), CollectError> // write dataframes to disk for (datatype, mut df) in dfs { + if Some(env.write_empty)==Some(Some(false)){ + if df.shape().0 == 0 { + continue; + }; + }; let path = paths.get(&datatype).ok_or_else(|| { CollectError::CollectError("could not get path for datatype".to_string()) })?; diff --git a/crates/freeze/src/types/execution.rs b/crates/freeze/src/types/execution.rs index a8285d22..0404909c 100644 --- a/crates/freeze/src/types/execution.rs +++ b/crates/freeze/src/types/execution.rs @@ -25,6 +25,8 @@ pub struct ExecutionEnv { pub t_end: Option, /// report directory pub report_dir: Option, + /// skips writing empty dfs + pub write_empty: Option, } impl ExecutionEnv { @@ -67,6 +69,7 @@ pub struct ExecutionEnvBuilder { t_start: SystemTime, t_end: Option, report_dir: Option, + write_empty: Option, } impl Default for ExecutionEnvBuilder { @@ -82,6 +85,7 @@ impl Default for ExecutionEnvBuilder { t_start: SystemTime::now(), t_end: None, report_dir: None, + write_empty: Some(true) } } } @@ -134,6 +138,11 @@ impl ExecutionEnvBuilder { self } + /// write_empty + pub fn write_empty(mut self, write_empty: Option) -> Self { + self.write_empty = write_empty; + self + } /// build final output pub fn build(self) -> ExecutionEnv { ExecutionEnv { @@ -147,6 +156,7 @@ impl ExecutionEnvBuilder { t_start: self.t_start, t_end: self.t_end, report_dir: self.report_dir, + write_empty: self.write_empty, } } } diff --git a/crates/python/pyproject.toml b/crates/python/pyproject.toml index f7bbb405..3d8a7ffa 100644 --- a/crates/python/pyproject.toml +++ b/crates/python/pyproject.toml @@ -5,6 +5,7 @@ build-backend = "maturin" [project] name = "cryo_python" requires-python = ">=3.7" +dependencies = ["maturin>=1.1,<2.0","pandas==2.1.3","polars==0.19.19","pyarrow==14.0.1"] classifiers = [ "Programming Language :: Rust", "Programming Language :: Python :: Implementation :: CPython", diff --git a/crates/python/python/cryo/_args.py b/crates/python/python/cryo/_args.py index 01ef0e86..bad37b61 100644 --- a/crates/python/python/cryo/_args.py +++ b/crates/python/python/cryo/_args.py @@ -37,7 +37,8 @@ def parse_cli_args( else: raise Exception('unknown file_format') - kwargs['no_verbose'] = not verbose - + if 'no_verbose' not in kwargs.keys(): + kwargs['no_verbose'] = not verbose + return kwargs diff --git a/crates/python/python/cryo/_spec.py b/crates/python/python/cryo/_spec.py index 11af8ae3..d01a3679 100644 --- a/crates/python/python/cryo/_spec.py +++ b/crates/python/python/cryo/_spec.py @@ -54,4 +54,14 @@ class CryoCliArgs(TypedDict, total=False): topic3: str | bytes | None inner_request_size: int | None no_verbose: bool + address: typing.Sequence[str] | None + to_address: typing.Sequence[str] | None + from_address: typing.Sequence[str] | None + call_data: typing.Sequence[str] | None + function: typing.Sequence[str] | None + inputs: typing.Sequence[str] | None + slot: typing.Sequence[str] | None + event_signature: str | bytes | None + inner_request_size: int | None + js_tracer: str | bytes | None diff --git a/crates/python/rust/collect_adapter.rs b/crates/python/rust/collect_adapter.rs index 0f8e4672..dabb8997 100644 --- a/crates/python/rust/collect_adapter.rs +++ b/crates/python/rust/collect_adapter.rs @@ -64,6 +64,7 @@ use cryo_freeze::collect; verbose = false, no_verbose = false, event_signature = None, + write_empty = false ) )] #[allow(clippy::too_many_arguments)] @@ -125,6 +126,7 @@ pub fn _collect( verbose: bool, no_verbose: bool, event_signature: Option, + write_empty: Option ) -> PyResult<&PyAny> { if let Some(command) = command { pyo3_asyncio::tokio::future_into_py(py, async move { @@ -190,6 +192,7 @@ pub fn _collect( verbose, no_verbose, event_signature, + write_empty, }; pyo3_asyncio::tokio::future_into_py(py, async move { match run_collect(args).await { diff --git a/crates/python/rust/freeze_adapter.rs b/crates/python/rust/freeze_adapter.rs index 5b7616ea..368adb02 100644 --- a/crates/python/rust/freeze_adapter.rs +++ b/crates/python/rust/freeze_adapter.rs @@ -61,6 +61,7 @@ use cryo_cli::{run, Args}; verbose = false, no_verbose = false, event_signature = None, + write_empty = None, ) )] #[allow(clippy::too_many_arguments)] @@ -122,6 +123,7 @@ pub fn _freeze( verbose: bool, no_verbose: bool, event_signature: Option, + write_empty: Option ) -> PyResult<&PyAny> { if let Some(command) = command { freeze_command(py, command) @@ -182,6 +184,7 @@ pub fn _freeze( verbose, no_verbose, event_signature, + write_empty, }; pyo3_asyncio::tokio::future_into_py(py, async move { From 3a2a533eb0b566ededef66b76b78f2c1f930fc48 Mon Sep 17 00:00:00 2001 From: davidthegardens Date: Thu, 7 Dec 2023 15:04:47 -0400 Subject: [PATCH 02/12] woops wrong commit --- crates/cli/src/parse/timestamps.rs | 2 +- .../src/datasets/address_appearances.rs | 129 ++++++++++++++++-- crates/freeze/src/datasets/contracts.rs | 4 +- .../freeze/src/datasets/erc721_transfers.rs | 19 ++- crates/freeze/src/datasets/logs.rs | 22 ++- .../freeze/src/datasets/native_transfers.rs | 4 +- crates/freeze/src/datasets/transactions.rs | 2 +- 7 files changed, 162 insertions(+), 20 deletions(-) diff --git a/crates/cli/src/parse/timestamps.rs b/crates/cli/src/parse/timestamps.rs index a9342c6e..8ee4a594 100644 --- a/crates/cli/src/parse/timestamps.rs +++ b/crates/cli/src/parse/timestamps.rs @@ -297,7 +297,7 @@ async fn timestamp_to_block_number( .map_err(|_e| ParseError::ParseError("Error fetching block for timestamp".to_string()))? .unwrap(); - #[warn(clippy::comparison_chain)] + #[allow(clippy::comparison_chain)] if block.timestamp == timestamp.into() { return Ok(mid); } else if block.timestamp < timestamp.into() { diff --git a/crates/freeze/src/datasets/address_appearances.rs b/crates/freeze/src/datasets/address_appearances.rs index 897f969d..71f3c1db 100644 --- a/crates/freeze/src/datasets/address_appearances.rs +++ b/crates/freeze/src/datasets/address_appearances.rs @@ -9,6 +9,7 @@ use std::collections::HashMap; pub struct AddressAppearances { n_rows: usize, block_number: Vec, + block_hash: Vec>, transaction_hash: Vec>, address: Vec>, relationship: Vec, @@ -17,6 +18,17 @@ pub struct AddressAppearances { #[async_trait::async_trait] impl Dataset for AddressAppearances { + fn default_columns() -> Option> { + Some(vec![ + "block_number", + // "block_hash", + "transaction_hash", + "address", + "relationship", + "chain_id", + ]) + } + fn default_sort() -> Option> { Some(vec!["block_number", "transaction_hash", "address", "relationship"]) } @@ -116,7 +128,8 @@ impl AddressAppearances { logs_by_tx: &HashMap>, ) { let block_number = trace.block_number as u32; - self.process_address(block_author, "miner_fee", block_number, tx_hash, schema); + let block_hash = trace.block_hash.as_bytes().to_vec(); + self.process_address(block_author, "miner_fee", block_number, &block_hash, tx_hash, schema); if let Some(logs) = logs_by_tx.get(&tx_hash) { for log in logs.iter() { @@ -126,12 +139,26 @@ impl AddressAppearances { from.copy_from_slice(&log.topics[1].to_fixed_bytes()[12..32]); let name = &(name.to_string() + "_from"); - self.process_address(H160(from), name, block_number, tx_hash, schema); + self.process_address( + H160(from), + name, + block_number, + &block_hash, + tx_hash, + schema, + ); let mut to: [u8; 20] = [0; 20]; to.copy_from_slice(&log.topics[1].to_fixed_bytes()[12..32]); let name = &(name.to_string() + "_to"); - self.process_address(H160(to), name, block_number, tx_hash, schema); + self.process_address( + H160(to), + name, + block_number, + &block_hash, + tx_hash, + schema, + ); } } } @@ -139,47 +166,119 @@ impl AddressAppearances { match &trace.action { Action::Call(action) => { - self.process_address(action.from, "tx_from", block_number, tx_hash, schema); - self.process_address(action.to, "tx_to", block_number, tx_hash, schema); + self.process_address( + action.from, + "tx_from", + block_number, + &block_hash, + tx_hash, + schema, + ); + self.process_address( + action.to, + "tx_to", + block_number, + &block_hash, + tx_hash, + schema, + ); } Action::Create(action) => { - self.process_address(action.from, "tx_from", block_number, tx_hash, schema); + self.process_address( + action.from, + "tx_from", + block_number, + &block_hash, + tx_hash, + schema, + ); } _ => {} } if let Some(Res::Create(result)) = &trace.result { - self.process_address(result.address, "tx_to", block_number, tx_hash, schema); + self.process_address( + result.address, + "tx_to", + block_number, + &block_hash, + tx_hash, + schema, + ); } } fn process_trace(&mut self, trace: &Trace, schema: &Table, tx_hash: H256) { let block_number = trace.block_number as u32; + let block_hash = trace.block_hash.as_bytes().to_vec(); match &trace.action { Action::Call(action) => { - self.process_address(action.from, "call_from", block_number, tx_hash, schema); - self.process_address(action.to, "call_to", block_number, tx_hash, schema); + self.process_address( + action.from, + "call_from", + block_number, + &block_hash, + tx_hash, + schema, + ); + self.process_address( + action.to, + "call_to", + block_number, + &block_hash, + tx_hash, + schema, + ); } Action::Create(action) => { - self.process_address(action.from, "factory", block_number, tx_hash, schema); + self.process_address( + action.from, + "factory", + block_number, + &block_hash, + tx_hash, + schema, + ); } Action::Suicide(action) => { - self.process_address(action.address, "suicide", block_number, tx_hash, schema); + self.process_address( + action.address, + "suicide", + block_number, + &block_hash, + tx_hash, + schema, + ); self.process_address( action.refund_address, "suicide_refund", block_number, + &block_hash, tx_hash, schema, ); } Action::Reward(action) => { - self.process_address(action.author, "author", block_number, tx_hash, schema); + self.process_address( + action.author, + "author", + block_number, + &block_hash, + tx_hash, + schema, + ); } } if let Some(Res::Create(result)) = &trace.result { - self.process_address(result.address, "create", block_number, tx_hash, schema); + self.process_address( + result.address, + "create", + block_number, + &block_hash, + tx_hash, + schema, + ); }; } @@ -188,6 +287,7 @@ impl AddressAppearances { address: H160, relationship: &str, block_number: u32, + block_hash: &[u8], transaction_hash: H256, schema: &Table, ) { @@ -195,6 +295,7 @@ impl AddressAppearances { store!(schema, self, address, address.as_bytes().to_vec()); store!(schema, self, relationship, relationship.to_string()); store!(schema, self, block_number, block_number); + store!(schema, self, block_hash, block_hash.to_vec()); store!(schema, self, transaction_hash, transaction_hash.as_bytes().to_vec()); } } @@ -230,4 +331,4 @@ fn process_appearances( } Ok(()) -} +} \ No newline at end of file diff --git a/crates/freeze/src/datasets/contracts.rs b/crates/freeze/src/datasets/contracts.rs index e6a3d209..a4f7fb5e 100644 --- a/crates/freeze/src/datasets/contracts.rs +++ b/crates/freeze/src/datasets/contracts.rs @@ -10,6 +10,7 @@ use polars::prelude::*; pub struct Contracts { n_rows: u64, block_number: Vec, + block_hash: Vec>, create_index: Vec, transaction_hash: Vec>>, contract_address: Vec>, @@ -82,6 +83,7 @@ pub(crate) fn process_contracts( { columns.n_rows += 1; store!(schema, columns, block_number, trace.block_number as u32); + store!(schema, columns, block_hash, trace.block_hash.as_bytes().to_vec()); store!(schema, columns, create_index, create_index); create_index += 1; let tx = trace.transaction_hash; @@ -96,4 +98,4 @@ pub(crate) fn process_contracts( } } Ok(()) -} +} \ No newline at end of file diff --git a/crates/freeze/src/datasets/erc721_transfers.rs b/crates/freeze/src/datasets/erc721_transfers.rs index b01b3378..ab699cec 100644 --- a/crates/freeze/src/datasets/erc721_transfers.rs +++ b/crates/freeze/src/datasets/erc721_transfers.rs @@ -8,6 +8,7 @@ use polars::prelude::*; pub struct Erc721Transfers { n_rows: u64, block_number: Vec, + block_hash: Vec>>, transaction_index: Vec, log_index: Vec, transaction_hash: Vec>, @@ -20,6 +21,21 @@ pub struct Erc721Transfers { #[async_trait::async_trait] impl Dataset for Erc721Transfers { + fn default_columns() -> Option> { + Some(vec![ + "block_number", + // "block_hash", + "transaction_index", + "log_index", + "transaction_hash", + "erc20", + "from_address", + "to_address", + "token_id", + "chain_id", + ]) + } + fn optional_parameters() -> Vec { vec![Dim::Address, Dim::FromAddress, Dim::ToAddress] } @@ -93,6 +109,7 @@ fn process_erc721_transfers( { columns.n_rows += 1; store!(schema, columns, block_number, bn.as_u32()); + store!(schema, columns, block_hash, log.block_hash.map(|bh| bh.as_bytes().to_vec())); store!(schema, columns, transaction_index, ti.as_u32()); store!(schema, columns, log_index, li.as_u32()); store!(schema, columns, transaction_hash, tx.as_bytes().to_vec()); @@ -103,4 +120,4 @@ fn process_erc721_transfers( } } Ok(()) -} +} \ No newline at end of file diff --git a/crates/freeze/src/datasets/logs.rs b/crates/freeze/src/datasets/logs.rs index 322f3c7e..ab775c79 100644 --- a/crates/freeze/src/datasets/logs.rs +++ b/crates/freeze/src/datasets/logs.rs @@ -8,6 +8,7 @@ use polars::prelude::*; pub struct Logs { n_rows: u64, block_number: Vec, + block_hash: Vec>>, transaction_index: Vec, log_index: Vec, transaction_hash: Vec>, @@ -27,6 +28,24 @@ impl Dataset for Logs { vec!["events"] } + fn default_columns() -> Option> { + Some(vec![ + "block_number", + // "block_hash", + "transaction_index", + "log_index", + "transaction_hash", + "address", + "topic0", + "topic1", + "topic2", + "topic3", + "data", + // "event_cols", + "chain_id", + ]) + } + fn optional_parameters() -> Vec { vec![Dim::Address, Dim::Topic0, Dim::Topic1, Dim::Topic2, Dim::Topic3] } @@ -104,6 +123,7 @@ fn process_logs(logs: Vec, columns: &mut Logs, schema: &Table) -> R<()> { columns.n_rows += 1; store!(schema, columns, block_number, bn.as_u32()); + store!(schema, columns, block_hash, log.block_hash.map(|bh| bh.as_bytes().to_vec())); store!(schema, columns, transaction_index, ti.as_u32()); store!(schema, columns, log_index, li.as_u32()); store!(schema, columns, transaction_hash, tx.as_bytes().to_vec()); @@ -129,4 +149,4 @@ fn process_logs(logs: Vec, columns: &mut Logs, schema: &Table) -> R<()> { } Ok(()) -} +} \ No newline at end of file diff --git a/crates/freeze/src/datasets/native_transfers.rs b/crates/freeze/src/datasets/native_transfers.rs index 53bbbd21..8b693598 100644 --- a/crates/freeze/src/datasets/native_transfers.rs +++ b/crates/freeze/src/datasets/native_transfers.rs @@ -8,6 +8,7 @@ use polars::prelude::*; pub struct NativeTransfers { n_rows: u64, block_number: Vec, + block_hash: Vec>, transaction_index: Vec>, transfer_index: Vec, transaction_hash: Vec>>, @@ -67,6 +68,7 @@ pub(crate) fn process_native_transfers( columns.n_rows += 1; store!(schema, columns, block_number, trace.block_number as u32); store!(schema, columns, transaction_index, trace.transaction_position.map(|x| x as u32)); + store!(schema, columns, block_hash, trace.block_hash.as_bytes().to_vec()); store!(schema, columns, transfer_index, transfer_index as u32); store!( schema, @@ -104,4 +106,4 @@ pub(crate) fn process_native_transfers( } } Ok(()) -} +} \ No newline at end of file diff --git a/crates/freeze/src/datasets/transactions.rs b/crates/freeze/src/datasets/transactions.rs index ebf5ad52..2ff1333f 100644 --- a/crates/freeze/src/datasets/transactions.rs +++ b/crates/freeze/src/datasets/transactions.rs @@ -126,7 +126,7 @@ impl CollectByBlock for Transactions { let transactions = block.transactions.clone().into_iter().filter(from_filter).filter(to_filter).filter(addr_filter).collect(); - //block.transactions.clone().into_iter().filter(from_filter).filter(to_filter).collect(); + // 2. collect receipts if necessary // if transactions are filtered fetch by set of transaction hashes, else fetch all receipts // in block From 7add8dfae01269b135473e232db3c2a7873e9943 Mon Sep 17 00:00:00 2001 From: davidthegardens Date: Fri, 29 Dec 2023 18:09:55 -0500 Subject: [PATCH 03/12] corrections 4 storm --- crates/cli/src/args.rs | 4 +- crates/cli/src/parse/execution.rs | 2 +- crates/freeze/src/datasets/erc20_transfers.rs | 17 +++++++ crates/freeze/src/datasets/transactions.rs | 50 +++++++++---------- crates/freeze/src/freeze.rs | 2 +- crates/freeze/src/types/execution.rs | 14 +++--- crates/python/rust/collect_adapter.rs | 6 +-- crates/python/rust/freeze_adapter.rs | 6 +-- 8 files changed, 59 insertions(+), 42 deletions(-) diff --git a/crates/cli/src/args.rs b/crates/cli/src/args.rs index 125e5489..038bd52a 100644 --- a/crates/cli/src/args.rs +++ b/crates/cli/src/args.rs @@ -196,9 +196,9 @@ pub struct Args { #[arg(long, help_heading = "Output Options")] pub no_report: bool, - /// Skip if writing if dataframe is empty + /// Skip writing to dataframe if the df is empty #[arg(long, help_heading = "Acquisition Options")] - pub write_empty: Option, + pub skip_empty: Option, /// Address(es) #[arg(long, help_heading = "Dataset-specific Options", num_args(1..))] diff --git a/crates/cli/src/parse/execution.rs b/crates/cli/src/parse/execution.rs index 4dce1154..91162b33 100644 --- a/crates/cli/src/parse/execution.rs +++ b/crates/cli/src/parse/execution.rs @@ -17,7 +17,7 @@ pub(crate) fn parse_execution_env(args: &Args, n_tasks: u64) -> Result, + block_hash: Vec>>, transaction_index: Vec, log_index: Vec, transaction_hash: Vec>, @@ -20,6 +21,21 @@ pub struct Erc20Transfers { #[async_trait::async_trait] impl Dataset for Erc20Transfers { + fn default_columns() -> Option> { + Some(vec![ + "block_number", + // "block_hash", + "transaction_index", + "log_index", + "transaction_hash", + "erc20", + "from_address", + "to_address", + "value", + "chain_id", + ]) + } + fn optional_parameters() -> Vec { vec![Dim::Address, Dim::Topic0, Dim::Topic1, Dim::Topic2, Dim::FromAddress, Dim::ToAddress] } @@ -88,6 +104,7 @@ fn process_erc20_transfers(logs: Vec, columns: &mut Erc20Transfers, schema: { columns.n_rows += 1; store!(schema, columns, block_number, bn.as_u32()); + store!(schema, columns, block_hash, log.block_hash.map(|bh| bh.as_bytes().to_vec())); store!(schema, columns, transaction_index, ti.as_u32()); store!(schema, columns, log_index, li.as_u32()); store!(schema, columns, transaction_hash, tx.as_bytes().to_vec()); diff --git a/crates/freeze/src/datasets/transactions.rs b/crates/freeze/src/datasets/transactions.rs index 2ff1333f..cb9c9d1b 100644 --- a/crates/freeze/src/datasets/transactions.rs +++ b/crates/freeze/src/datasets/transactions.rs @@ -1,13 +1,7 @@ use crate::*; use ethers::prelude::*; use polars::prelude::*; -use std::collections::HashSet; -use std::sync::Mutex; -use lazy_static::lazy_static; - -lazy_static! { - static ref TRANSACTIONS_SET: Mutex>> = Mutex::new(HashSet::new()); -} +use crate::ExecutionEnv; /// columns for transactions @@ -19,7 +13,6 @@ pub struct Transactions { transaction_index: Vec>, transaction_hash: Vec>, nonce: Vec, - address: Vec>, from_address: Vec>, to_address: Vec>>, value: Vec, @@ -65,7 +58,6 @@ impl Dataset for Transactions { fn optional_parameters() -> Vec { vec![Dim::Address,Dim::FromAddress, Dim::ToAddress] - //vec![Dim::FromAddress, Dim::ToAddress] } } @@ -75,10 +67,24 @@ pub type TransactionAndReceipt = (Transaction, Option); #[async_trait::async_trait] impl CollectByBlock for Transactions { type Response = (Block, Vec, bool); - //println!("impl"); async fn extract(request: Params, source: Arc, query: Arc) -> R { + fn get_addresses() -> Vec { + let env = ExecutionEnv::default(); + let cli_command = env.cli_command.unwrap(); + if let Some(address_index) = cli_command.iter().position(|arg| arg == "--address") { + cli_command[address_index+1..] + .to_vec() + .iter() + .take_while(|&arg| !arg.starts_with("--")) + .map(|s| s.parse::().expect("Invalid H160")) + .collect::>() + } else { + Vec::new() + } + } + let block = source .fetcher .get_block_with_txs(request.block_number()?) @@ -90,9 +96,7 @@ impl CollectByBlock for Transactions { // filter by from_address let from_filter: Box bool + Send> = if let Some(from_address) = &request.from_address { - Box::new(move |tx| { - from_address == tx.from.as_bytes() - }) + Box::new(move |tx| {from_address == tx.from.as_bytes()}) } else { Box::new(|_| true) }; @@ -103,27 +107,23 @@ impl CollectByBlock for Transactions { } else { Box::new(|_| true) }; - - // filter by address + // filter by addresses (if either the to or from address are in the vector of addresses) + let addresses=get_addresses(); let addr_filter: Box bool + Send> = if let Some(address) = &request.address { Box::new(move |tx| { - let mut transactions_set = TRANSACTIONS_SET.lock().unwrap(); - if transactions_set.contains(&tx.hash.as_bytes().to_vec()) { - false + let to_address = addresses.contains(&tx.to.unwrap()); + let from_address = addresses.contains(&tx.from); + if !(to_address && from_address){ + tx.to.as_ref().map_or(false, |x| x.as_bytes()==address) } else { - let condition = tx.to.as_ref().map_or(false, |x| x.as_bytes() == address) || - tx.from.as_bytes() == address; - if condition { - transactions_set.insert(tx.hash.as_bytes().to_vec()); - } - condition - } + tx.from.as_bytes()==address} }) } else { Box::new(|_| true) }; + let transactions = block.transactions.clone().into_iter().filter(from_filter).filter(to_filter).filter(addr_filter).collect(); diff --git a/crates/freeze/src/freeze.rs b/crates/freeze/src/freeze.rs index 9862ca3c..e2ae831b 100644 --- a/crates/freeze/src/freeze.rs +++ b/crates/freeze/src/freeze.rs @@ -178,7 +178,7 @@ async fn freeze_partition(payload: PartitionPayload) -> Result<(), CollectError> // write dataframes to disk for (datatype, mut df) in dfs { - if Some(env.write_empty)==Some(Some(false)){ + if env.skip_empty==Some(true){ if df.shape().0 == 0 { continue; }; diff --git a/crates/freeze/src/types/execution.rs b/crates/freeze/src/types/execution.rs index 0404909c..7d48d9c6 100644 --- a/crates/freeze/src/types/execution.rs +++ b/crates/freeze/src/types/execution.rs @@ -26,7 +26,7 @@ pub struct ExecutionEnv { /// report directory pub report_dir: Option, /// skips writing empty dfs - pub write_empty: Option, + pub skip_empty: Option, } impl ExecutionEnv { @@ -69,7 +69,7 @@ pub struct ExecutionEnvBuilder { t_start: SystemTime, t_end: Option, report_dir: Option, - write_empty: Option, + skip_empty: Option, } impl Default for ExecutionEnvBuilder { @@ -85,7 +85,7 @@ impl Default for ExecutionEnvBuilder { t_start: SystemTime::now(), t_end: None, report_dir: None, - write_empty: Some(true) + skip_empty: Some(false) } } } @@ -138,9 +138,9 @@ impl ExecutionEnvBuilder { self } - /// write_empty - pub fn write_empty(mut self, write_empty: Option) -> Self { - self.write_empty = write_empty; + /// skip_empty + pub fn skip_empty(mut self, skip_empty: Option) -> Self { + self.skip_empty = skip_empty; self } /// build final output @@ -156,7 +156,7 @@ impl ExecutionEnvBuilder { t_start: self.t_start, t_end: self.t_end, report_dir: self.report_dir, - write_empty: self.write_empty, + skip_empty: self.skip_empty, } } } diff --git a/crates/python/rust/collect_adapter.rs b/crates/python/rust/collect_adapter.rs index dabb8997..d9569379 100644 --- a/crates/python/rust/collect_adapter.rs +++ b/crates/python/rust/collect_adapter.rs @@ -64,7 +64,7 @@ use cryo_freeze::collect; verbose = false, no_verbose = false, event_signature = None, - write_empty = false + skip_empty = Some(false) ) )] #[allow(clippy::too_many_arguments)] @@ -126,7 +126,7 @@ pub fn _collect( verbose: bool, no_verbose: bool, event_signature: Option, - write_empty: Option + skip_empty: Option ) -> PyResult<&PyAny> { if let Some(command) = command { pyo3_asyncio::tokio::future_into_py(py, async move { @@ -192,7 +192,7 @@ pub fn _collect( verbose, no_verbose, event_signature, - write_empty, + skip_empty, }; pyo3_asyncio::tokio::future_into_py(py, async move { match run_collect(args).await { diff --git a/crates/python/rust/freeze_adapter.rs b/crates/python/rust/freeze_adapter.rs index 368adb02..093ecd2e 100644 --- a/crates/python/rust/freeze_adapter.rs +++ b/crates/python/rust/freeze_adapter.rs @@ -61,7 +61,7 @@ use cryo_cli::{run, Args}; verbose = false, no_verbose = false, event_signature = None, - write_empty = None, + skip_empty = Some(False), ) )] #[allow(clippy::too_many_arguments)] @@ -123,7 +123,7 @@ pub fn _freeze( verbose: bool, no_verbose: bool, event_signature: Option, - write_empty: Option + skip_empty: Option ) -> PyResult<&PyAny> { if let Some(command) = command { freeze_command(py, command) @@ -184,7 +184,7 @@ pub fn _freeze( verbose, no_verbose, event_signature, - write_empty, + skip_empty, }; pyo3_asyncio::tokio::future_into_py(py, async move { From cc7b4e39a02a0e4cad94f292afbd563d57d598cf Mon Sep 17 00:00:00 2001 From: davidthegardens Date: Fri, 29 Dec 2023 18:13:35 -0500 Subject: [PATCH 04/12] remove maturin and pandas --- crates/python/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/python/pyproject.toml b/crates/python/pyproject.toml index 3d8a7ffa..5dcfe503 100644 --- a/crates/python/pyproject.toml +++ b/crates/python/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "maturin" [project] name = "cryo_python" requires-python = ">=3.7" -dependencies = ["maturin>=1.1,<2.0","pandas==2.1.3","polars==0.19.19","pyarrow==14.0.1"] +dependencies = ["polars==0.19.19","pyarrow==14.0.1"] classifiers = [ "Programming Language :: Rust", "Programming Language :: Python :: Implementation :: CPython", From 5facb1f8f0f9eaa125fc2f2e05cc51312944573b Mon Sep 17 00:00:00 2001 From: davidthegardens Date: Fri, 29 Dec 2023 18:27:31 -0500 Subject: [PATCH 05/12] add pandas back --- crates/python/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/python/pyproject.toml b/crates/python/pyproject.toml index 5dcfe503..eed58002 100644 --- a/crates/python/pyproject.toml +++ b/crates/python/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "maturin" [project] name = "cryo_python" requires-python = ">=3.7" -dependencies = ["polars==0.19.19","pyarrow==14.0.1"] +dependencies = ["pandas==2.1.3","polars==0.19.19","pyarrow==14.0.1"] classifiers = [ "Programming Language :: Rust", "Programming Language :: Python :: Implementation :: CPython", From e0bdbb468fff6c6e193881c79db4447b67e1d7a8 Mon Sep 17 00:00:00 2001 From: davidthegardens Date: Sat, 30 Dec 2023 13:25:16 -0500 Subject: [PATCH 06/12] fix dependencies --- crates/python/pyproject.toml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/python/pyproject.toml b/crates/python/pyproject.toml index eed58002..a2e54fce 100644 --- a/crates/python/pyproject.toml +++ b/crates/python/pyproject.toml @@ -5,13 +5,15 @@ build-backend = "maturin" [project] name = "cryo_python" requires-python = ">=3.7" -dependencies = ["pandas==2.1.3","polars==0.19.19","pyarrow==14.0.1"] +dependencies = ["polars==*","pyarrow==*"] classifiers = [ "Programming Language :: Rust", "Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: PyPy", ] +[project.optional-dependencies] +pandas = ["pandas>=2.1.3"] [tool.maturin] python-source = "python" From afebb53dd73f0916a30039abf5c537f5a556edc5 Mon Sep 17 00:00:00 2001 From: davidthegardens Date: Sat, 30 Dec 2023 13:35:24 -0500 Subject: [PATCH 07/12] fix unspecified --- crates/python/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/python/pyproject.toml b/crates/python/pyproject.toml index a2e54fce..2babcb6a 100644 --- a/crates/python/pyproject.toml +++ b/crates/python/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "maturin" [project] name = "cryo_python" requires-python = ">=3.7" -dependencies = ["polars==*","pyarrow==*"] +dependencies = ["polars","pyarrow"] classifiers = [ "Programming Language :: Rust", "Programming Language :: Python :: Implementation :: CPython", From 16a73224f875f771ffd54d5d5bbd3b10ff580392 Mon Sep 17 00:00:00 2001 From: davidthegardens Date: Sat, 30 Dec 2023 13:38:13 -0500 Subject: [PATCH 08/12] correct boolean --- crates/python/rust/freeze_adapter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/python/rust/freeze_adapter.rs b/crates/python/rust/freeze_adapter.rs index 093ecd2e..cfe8d568 100644 --- a/crates/python/rust/freeze_adapter.rs +++ b/crates/python/rust/freeze_adapter.rs @@ -61,7 +61,7 @@ use cryo_cli::{run, Args}; verbose = false, no_verbose = false, event_signature = None, - skip_empty = Some(False), + skip_empty = Some(false), ) )] #[allow(clippy::too_many_arguments)] From 79775f9ee99892da4f98b2a705a70000de310df8 Mon Sep 17 00:00:00 2001 From: davidthegardens Date: Tue, 2 Jan 2024 11:00:38 -0500 Subject: [PATCH 09/12] cleanup for fmt --- crates/cli/src/args.rs | 2 +- .../src/datasets/address_appearances.rs | 2 +- crates/freeze/src/datasets/contracts.rs | 2 +- .../freeze/src/datasets/erc721_transfers.rs | 2 +- crates/freeze/src/datasets/logs.rs | 2 +- .../freeze/src/datasets/native_transfers.rs | 2 +- crates/freeze/src/datasets/transactions.rs | 56 ++++++++++--------- crates/freeze/src/freeze.rs | 2 +- crates/freeze/src/types/execution.rs | 2 +- crates/python/rust/collect_adapter.rs | 4 +- crates/python/rust/freeze_adapter.rs | 2 +- 11 files changed, 41 insertions(+), 37 deletions(-) diff --git a/crates/cli/src/args.rs b/crates/cli/src/args.rs index 038bd52a..b4d76817 100644 --- a/crates/cli/src/args.rs +++ b/crates/cli/src/args.rs @@ -195,7 +195,7 @@ pub struct Args { /// Avoid saving a summary report #[arg(long, help_heading = "Output Options")] pub no_report: bool, - + /// Skip writing to dataframe if the df is empty #[arg(long, help_heading = "Acquisition Options")] pub skip_empty: Option, diff --git a/crates/freeze/src/datasets/address_appearances.rs b/crates/freeze/src/datasets/address_appearances.rs index 71f3c1db..683379ae 100644 --- a/crates/freeze/src/datasets/address_appearances.rs +++ b/crates/freeze/src/datasets/address_appearances.rs @@ -331,4 +331,4 @@ fn process_appearances( } Ok(()) -} \ No newline at end of file +} diff --git a/crates/freeze/src/datasets/contracts.rs b/crates/freeze/src/datasets/contracts.rs index a4f7fb5e..0dd3a033 100644 --- a/crates/freeze/src/datasets/contracts.rs +++ b/crates/freeze/src/datasets/contracts.rs @@ -98,4 +98,4 @@ pub(crate) fn process_contracts( } } Ok(()) -} \ No newline at end of file +} diff --git a/crates/freeze/src/datasets/erc721_transfers.rs b/crates/freeze/src/datasets/erc721_transfers.rs index ab699cec..7d00f304 100644 --- a/crates/freeze/src/datasets/erc721_transfers.rs +++ b/crates/freeze/src/datasets/erc721_transfers.rs @@ -120,4 +120,4 @@ fn process_erc721_transfers( } } Ok(()) -} \ No newline at end of file +} diff --git a/crates/freeze/src/datasets/logs.rs b/crates/freeze/src/datasets/logs.rs index ab775c79..c4be6327 100644 --- a/crates/freeze/src/datasets/logs.rs +++ b/crates/freeze/src/datasets/logs.rs @@ -149,4 +149,4 @@ fn process_logs(logs: Vec, columns: &mut Logs, schema: &Table) -> R<()> { } Ok(()) -} \ No newline at end of file +} diff --git a/crates/freeze/src/datasets/native_transfers.rs b/crates/freeze/src/datasets/native_transfers.rs index 8b693598..950bcdec 100644 --- a/crates/freeze/src/datasets/native_transfers.rs +++ b/crates/freeze/src/datasets/native_transfers.rs @@ -106,4 +106,4 @@ pub(crate) fn process_native_transfers( } } Ok(()) -} \ No newline at end of file +} diff --git a/crates/freeze/src/datasets/transactions.rs b/crates/freeze/src/datasets/transactions.rs index cb9c9d1b..e51cfcad 100644 --- a/crates/freeze/src/datasets/transactions.rs +++ b/crates/freeze/src/datasets/transactions.rs @@ -1,8 +1,6 @@ use crate::*; use ethers::prelude::*; use polars::prelude::*; -use crate::ExecutionEnv; - /// columns for transactions #[cryo_to_df::to_df(Datatype::Transactions)] @@ -57,7 +55,7 @@ impl Dataset for Transactions { } fn optional_parameters() -> Vec { - vec![Dim::Address,Dim::FromAddress, Dim::ToAddress] + vec![Dim::Address, Dim::FromAddress, Dim::ToAddress] } } @@ -67,14 +65,12 @@ pub type TransactionAndReceipt = (Transaction, Option); #[async_trait::async_trait] impl CollectByBlock for Transactions { type Response = (Block, Vec, bool); - async fn extract(request: Params, source: Arc, query: Arc) -> R { - fn get_addresses() -> Vec { let env = ExecutionEnv::default(); let cli_command = env.cli_command.unwrap(); if let Some(address_index) = cli_command.iter().position(|arg| arg == "--address") { - cli_command[address_index+1..] + cli_command[address_index + 1..] .to_vec() .iter() .take_while(|&arg| !arg.starts_with("--")) @@ -95,8 +91,8 @@ impl CollectByBlock for Transactions { // 1. collect transactions and filter them if optional parameters are supplied // filter by from_address let from_filter: Box bool + Send> = - if let Some(from_address) = &request.from_address { - Box::new(move |tx| {from_address == tx.from.as_bytes()}) + if let Some(from_address) = &request.from_address { + Box::new(move |tx| from_address == tx.from.as_bytes()) } else { Box::new(|_| true) }; @@ -108,24 +104,29 @@ impl CollectByBlock for Transactions { Box::new(|_| true) }; // filter by addresses (if either the to or from address are in the vector of addresses) - let addresses=get_addresses(); + let addresses = get_addresses(); let addr_filter: Box bool + Send> = - if let Some(address) = &request.address { - Box::new(move |tx| { - let to_address = addresses.contains(&tx.to.unwrap()); - let from_address = addresses.contains(&tx.from); - if !(to_address && from_address){ - tx.to.as_ref().map_or(false, |x| x.as_bytes()==address) - } else { - tx.from.as_bytes()==address} - }) - } else { - Box::new(|_| true) - }; - - - let transactions = - block.transactions.clone().into_iter().filter(from_filter).filter(to_filter).filter(addr_filter).collect(); + if let Some(address) = &request.address { + Box::new(move |tx| { + let to_address = addresses.contains(&tx.to.unwrap()); + let from_address = addresses.contains(&tx.from); + if !(to_address && from_address) { + tx.to.as_ref().map_or(false, |x| x.as_bytes() == address) + } else { + tx.from.as_bytes() == address + } + }) + } else { + Box::new(|_| true) + }; + let transactions = block + .transactions + .clone() + .into_iter() + .filter(from_filter) + .filter(to_filter) + .filter(addr_filter) + .collect(); // 2. collect receipts if necessary // if transactions are filtered fetch by set of transaction hashes, else fetch all receipts @@ -133,7 +134,10 @@ impl CollectByBlock for Transactions { let receipts: Vec> = if schema.has_column("gas_used") | schema.has_column("success") { // receipts required - let receipts = if request.from_address.is_some() || request.to_address.is_some() || request.address.is_some() { + let receipts = if request.from_address.is_some() || + request.to_address.is_some() || + request.address.is_some() + { source.get_tx_receipts(&transactions).await? } else { source.get_tx_receipts_in_block(&block).await? diff --git a/crates/freeze/src/freeze.rs b/crates/freeze/src/freeze.rs index e2ae831b..e1816d00 100644 --- a/crates/freeze/src/freeze.rs +++ b/crates/freeze/src/freeze.rs @@ -178,7 +178,7 @@ async fn freeze_partition(payload: PartitionPayload) -> Result<(), CollectError> // write dataframes to disk for (datatype, mut df) in dfs { - if env.skip_empty==Some(true){ + if env.skip_empty == Some(true) { if df.shape().0 == 0 { continue; }; diff --git a/crates/freeze/src/types/execution.rs b/crates/freeze/src/types/execution.rs index 7d48d9c6..1ecaf0a8 100644 --- a/crates/freeze/src/types/execution.rs +++ b/crates/freeze/src/types/execution.rs @@ -85,7 +85,7 @@ impl Default for ExecutionEnvBuilder { t_start: SystemTime::now(), t_end: None, report_dir: None, - skip_empty: Some(false) + skip_empty: Some(false), } } } diff --git a/crates/python/rust/collect_adapter.rs b/crates/python/rust/collect_adapter.rs index d9569379..1bd53871 100644 --- a/crates/python/rust/collect_adapter.rs +++ b/crates/python/rust/collect_adapter.rs @@ -64,7 +64,7 @@ use cryo_freeze::collect; verbose = false, no_verbose = false, event_signature = None, - skip_empty = Some(false) + skip_empty = Some(false), ) )] #[allow(clippy::too_many_arguments)] @@ -126,7 +126,7 @@ pub fn _collect( verbose: bool, no_verbose: bool, event_signature: Option, - skip_empty: Option + skip_empty: Option, ) -> PyResult<&PyAny> { if let Some(command) = command { pyo3_asyncio::tokio::future_into_py(py, async move { diff --git a/crates/python/rust/freeze_adapter.rs b/crates/python/rust/freeze_adapter.rs index cfe8d568..6712bc08 100644 --- a/crates/python/rust/freeze_adapter.rs +++ b/crates/python/rust/freeze_adapter.rs @@ -123,7 +123,7 @@ pub fn _freeze( verbose: bool, no_verbose: bool, event_signature: Option, - skip_empty: Option + skip_empty: Option, ) -> PyResult<&PyAny> { if let Some(command) = command { freeze_command(py, command) From 22781c3282e156af0de52580eba2466fde5fc53c Mon Sep 17 00:00:00 2001 From: davidthegardens Date: Wed, 3 Jan 2024 10:41:15 -0500 Subject: [PATCH 10/12] fix freeze.rs n_rows conflict --- crates/freeze/src/datasets/transactions.rs | 14 +- crates/freeze/src/freeze.rs | 9 +- python | 166 +++++++++++++++++++++ 3 files changed, 180 insertions(+), 9 deletions(-) create mode 100644 python diff --git a/crates/freeze/src/datasets/transactions.rs b/crates/freeze/src/datasets/transactions.rs index e51cfcad..cdf14cb5 100644 --- a/crates/freeze/src/datasets/transactions.rs +++ b/crates/freeze/src/datasets/transactions.rs @@ -108,12 +108,16 @@ impl CollectByBlock for Transactions { let addr_filter: Box bool + Send> = if let Some(address) = &request.address { Box::new(move |tx| { - let to_address = addresses.contains(&tx.to.unwrap()); - let from_address = addresses.contains(&tx.from); - if !(to_address && from_address) { - tx.to.as_ref().map_or(false, |x| x.as_bytes() == address) + let to_address_is_in = addresses.contains(&tx.to.unwrap()); + let from_address_is_in = addresses.contains(&tx.from); + if addresses.len() == 1 { + to_address_is_in || from_address_is_in } else { - tx.from.as_bytes() == address + if !(to_address_is_in && from_address_is_in) { + tx.to.as_ref().map_or(false, |x| x.as_bytes() == address) + } else { + tx.from.as_bytes() == address + } } }) } else { diff --git a/crates/freeze/src/freeze.rs b/crates/freeze/src/freeze.rs index e1816d00..e7d0c97d 100644 --- a/crates/freeze/src/freeze.rs +++ b/crates/freeze/src/freeze.rs @@ -177,11 +177,12 @@ async fn freeze_partition(payload: PartitionPayload) -> Result<(), CollectError> let dfs = collect_partition(datatype, partition, query, source).await?; // write dataframes to disk + let mut n_rows = 0; for (datatype, mut df) in dfs { - if env.skip_empty == Some(true) { - if df.shape().0 == 0 { - continue; - }; + let df_height = df.height() as u64; + n_rows += df_height; + if env.skip_empty == Some(true) && df_height == 0 { + continue; }; let path = paths.get(&datatype).ok_or_else(|| { CollectError::CollectError("could not get path for datatype".to_string()) diff --git a/python b/python new file mode 100644 index 00000000..3949621f --- /dev/null +++ b/python @@ -0,0 +1,166 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + 404 – Hugging Face + + + + + +
+
Hugging Face's logo + +
+ +
+
+ +
+
+ +
+ + +
+ + +
+

404

+

Users/davidthegardens/Documents/python/cryo-1/env/bin/python does not exist on "main" +

+
+ + + + + + + + + + From 501a1c8bfaa82d991c3a881daa08ee586b1a6be0 Mon Sep 17 00:00:00 2001 From: davidthegardens Date: Wed, 3 Jan 2024 10:48:06 -0500 Subject: [PATCH 11/12] move addition to after skip --- crates/freeze/src/freeze.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/freeze/src/freeze.rs b/crates/freeze/src/freeze.rs index e7d0c97d..992b454f 100644 --- a/crates/freeze/src/freeze.rs +++ b/crates/freeze/src/freeze.rs @@ -180,10 +180,10 @@ async fn freeze_partition(payload: PartitionPayload) -> Result<(), CollectError> let mut n_rows = 0; for (datatype, mut df) in dfs { let df_height = df.height() as u64; - n_rows += df_height; if env.skip_empty == Some(true) && df_height == 0 { continue; }; + n_rows += df_height; let path = paths.get(&datatype).ok_or_else(|| { CollectError::CollectError("could not get path for datatype".to_string()) })?; From 2fccee412397a2d3bd0f232b52b6c30a420b0c8f Mon Sep 17 00:00:00 2001 From: "David J. Desjardins" <100804766+davidthegardens@users.noreply.github.com> Date: Mon, 8 Jan 2024 15:17:03 -0500 Subject: [PATCH 12/12] Delete file --- python | 166 --------------------------------------------------------- 1 file changed, 166 deletions(-) delete mode 100644 python diff --git a/python b/python deleted file mode 100644 index 3949621f..00000000 --- a/python +++ /dev/null @@ -1,166 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - 404 – Hugging Face - - - - - -
-
Hugging Face's logo - -
- -
-
- -
-
- -
- - -
- - -
-

404

-

Users/davidthegardens/Documents/python/cryo-1/env/bin/python does not exist on "main" -

-
- - - - - - - - - -