Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add empty chunk handling, importing python crate dependencies, tx filtering by address, support for new datatypes in python crate #147

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
4 changes: 4 additions & 0 deletions crates/cli/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ pub struct Args {
/// Avoid saving a summary report
#[arg(long, help_heading = "Output Options")]
pub no_report: bool,

/// Skip writing to dataframe if the df is empty
#[arg(long, help_heading = "Acquisition Options")]
pub skip_empty: Option<bool>,

/// Address(es)
#[arg(long, help_heading = "Dataset-specific Options", num_args(1..))]
Expand Down
1 change: 1 addition & 0 deletions crates/cli/src/parse/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub(crate) fn parse_execution_env(args: &Args, n_tasks: u64) -> Result<Execution
.verbose(verbose)
.report(!args.no_report)
.report_dir(args.report_dir.clone())
.skip_empty(args.skip_empty)
.args(args_str);

let builder = if !args.no_verbose {
Expand Down
2 changes: 1 addition & 1 deletion crates/freeze/src/datasets/address_appearances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,4 +331,4 @@ fn process_appearances(
}

Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/freeze/src/datasets/contracts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,4 @@ pub(crate) fn process_contracts(
}
}
Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/freeze/src/datasets/erc721_transfers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,4 @@ fn process_erc721_transfers(
}
}
Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/freeze/src/datasets/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,4 @@ fn process_logs(logs: Vec<Log>, columns: &mut Logs, schema: &Table) -> R<()> {
}

Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/freeze/src/datasets/native_transfers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,4 @@ pub(crate) fn process_native_transfers(
}
}
Ok(())
}
}
49 changes: 42 additions & 7 deletions crates/freeze/src/datasets/transactions.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::*;
use ethers::prelude::*;
use polars::prelude::*;
use crate::ExecutionEnv;


/// columns for transactions
#[cryo_to_df::to_df(Datatype::Transactions)]
Expand Down Expand Up @@ -55,7 +57,7 @@ impl Dataset for Transactions {
}

fn optional_parameters() -> Vec<Dim> {
vec![Dim::FromAddress, Dim::ToAddress]
vec![Dim::Address,Dim::FromAddress, Dim::ToAddress]
}
}

Expand All @@ -67,6 +69,22 @@ impl CollectByBlock for Transactions {
type Response = (Block<Transaction>, Vec<TransactionAndReceipt>, bool);

async fn extract(request: Params, source: Arc<Source>, query: Arc<Query>) -> R<Self::Response> {

fn get_addresses() -> Vec<H160> {
let env = ExecutionEnv::default();
let cli_command = env.cli_command.unwrap();
if let Some(address_index) = cli_command.iter().position(|arg| arg == "--address") {
cli_command[address_index+1..]
.to_vec()
.iter()
.take_while(|&arg| !arg.starts_with("--"))
.map(|s| s.parse::<H160>().expect("Invalid H160"))
.collect::<Vec<H160>>()
} else {
Vec::new()
}
}

let block = source
.fetcher
.get_block_with_txs(request.block_number()?)
Expand All @@ -77,8 +95,8 @@ impl CollectByBlock for Transactions {
// 1. collect transactions and filter them if optional parameters are supplied
// filter by from_address
let from_filter: Box<dyn Fn(&Transaction) -> bool + Send> =
if let Some(from_address) = &request.from_address {
Box::new(move |tx| tx.from.as_bytes() == from_address)
if let Some(from_address) = &request.from_address {
Box::new(move |tx| {from_address == tx.from.as_bytes()})
} else {
Box::new(|_| true)
};
Expand All @@ -89,16 +107,33 @@ impl CollectByBlock for Transactions {
} else {
Box::new(|_| true)
};
// filter by addresses (if either the to or from address are in the vector of addresses)
let addresses=get_addresses();
let addr_filter: Box<dyn Fn(&Transaction) -> bool + Send> =
if let Some(address) = &request.address {
Box::new(move |tx| {
let to_address = addresses.contains(&tx.to.unwrap());
let from_address = addresses.contains(&tx.from);
if !(to_address && from_address){
tx.to.as_ref().map_or(false, |x| x.as_bytes()==address)
} else {
tx.from.as_bytes()==address}
})
} else {
Box::new(|_| true)
};


let transactions =
block.transactions.clone().into_iter().filter(from_filter).filter(to_filter).collect();
block.transactions.clone().into_iter().filter(from_filter).filter(to_filter).filter(addr_filter).collect();

// 2. collect receipts if necessary
// if transactions are filtered fetch by set of transaction hashes, else fetch all receipts
// in block
let receipts: Vec<Option<_>> =
if schema.has_column("gas_used") | schema.has_column("success") {
// receipts required
let receipts = if request.from_address.is_some() || request.to_address.is_some() {
let receipts = if request.from_address.is_some() || request.to_address.is_some() || request.address.is_some() {
source.get_tx_receipts(&transactions).await?
} else {
source.get_tx_receipts_in_block(&block).await?
Expand All @@ -108,8 +143,8 @@ impl CollectByBlock for Transactions {
vec![None; block.transactions.len()]
};

let transactions_with_receips = transactions.into_iter().zip(receipts).collect();
Ok((block, transactions_with_receips, query.exclude_failed))
let transactions_with_receipts = transactions.into_iter().zip(receipts).collect();
Ok((block, transactions_with_receipts, query.exclude_failed))
}

fn transform(response: Self::Response, columns: &mut Self, query: &Arc<Query>) -> R<()> {
Expand Down
5 changes: 5 additions & 0 deletions crates/freeze/src/freeze.rs
davidthegardens marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ async fn freeze_partition(payload: PartitionPayload) -> Result<(), CollectError>

// write dataframes to disk
for (datatype, mut df) in dfs {
if env.skip_empty==Some(true){
if df.shape().0 == 0 {
continue;
};
};
let path = paths.get(&datatype).ok_or_else(|| {
CollectError::CollectError("could not get path for datatype".to_string())
})?;
Expand Down
10 changes: 10 additions & 0 deletions crates/freeze/src/types/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub struct ExecutionEnv {
pub t_end: Option<SystemTime>,
/// report directory
pub report_dir: Option<PathBuf>,
/// skips writing empty dfs
pub skip_empty: Option<bool>,
}

impl ExecutionEnv {
Expand Down Expand Up @@ -67,6 +69,7 @@ pub struct ExecutionEnvBuilder {
t_start: SystemTime,
t_end: Option<SystemTime>,
report_dir: Option<PathBuf>,
skip_empty: Option<bool>,
}

impl Default for ExecutionEnvBuilder {
Expand All @@ -82,6 +85,7 @@ impl Default for ExecutionEnvBuilder {
t_start: SystemTime::now(),
t_end: None,
report_dir: None,
skip_empty: Some(false)
}
}
}
Expand Down Expand Up @@ -134,6 +138,11 @@ impl ExecutionEnvBuilder {
self
}

/// skip_empty
pub fn skip_empty(mut self, skip_empty: Option<bool>) -> Self {
self.skip_empty = skip_empty;
self
}
/// build final output
pub fn build(self) -> ExecutionEnv {
ExecutionEnv {
Expand All @@ -147,6 +156,7 @@ impl ExecutionEnvBuilder {
t_start: self.t_start,
t_end: self.t_end,
report_dir: self.report_dir,
skip_empty: self.skip_empty,
}
}
}
1 change: 1 addition & 0 deletions crates/python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ build-backend = "maturin"
[project]
name = "cryo_python"
requires-python = ">=3.7"
dependencies = ["pandas==2.1.3","polars==0.19.19","pyarrow==14.0.1"]
classifiers = [
"Programming Language :: Rust",
"Programming Language :: Python :: Implementation :: CPython",
Expand Down
5 changes: 3 additions & 2 deletions crates/python/python/cryo/_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ def parse_cli_args(
else:
raise Exception('unknown file_format')

kwargs['no_verbose'] = not verbose

if 'no_verbose' not in kwargs.keys():
kwargs['no_verbose'] = not verbose

return kwargs

10 changes: 10 additions & 0 deletions crates/python/python/cryo/_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,14 @@ class CryoCliArgs(TypedDict, total=False):
topic3: str | bytes | None
inner_request_size: int | None
no_verbose: bool
address: typing.Sequence[str] | None
to_address: typing.Sequence[str] | None
from_address: typing.Sequence[str] | None
call_data: typing.Sequence[str] | None
function: typing.Sequence[str] | None
inputs: typing.Sequence[str] | None
slot: typing.Sequence[str] | None
event_signature: str | bytes | None
inner_request_size: int | None
js_tracer: str | bytes | None

3 changes: 3 additions & 0 deletions crates/python/rust/collect_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ use cryo_freeze::collect;
verbose = false,
no_verbose = false,
event_signature = None,
skip_empty = Some(false)
)
)]
#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -125,6 +126,7 @@ pub fn _collect(
verbose: bool,
no_verbose: bool,
event_signature: Option<String>,
skip_empty: Option<bool>
) -> PyResult<&PyAny> {
if let Some(command) = command {
pyo3_asyncio::tokio::future_into_py(py, async move {
Expand Down Expand Up @@ -190,6 +192,7 @@ pub fn _collect(
verbose,
no_verbose,
event_signature,
skip_empty,
};
pyo3_asyncio::tokio::future_into_py(py, async move {
match run_collect(args).await {
Expand Down
3 changes: 3 additions & 0 deletions crates/python/rust/freeze_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ use cryo_cli::{run, Args};
verbose = false,
no_verbose = false,
event_signature = None,
skip_empty = Some(False),
)
)]
#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -122,6 +123,7 @@ pub fn _freeze(
verbose: bool,
no_verbose: bool,
event_signature: Option<String>,
skip_empty: Option<bool>
) -> PyResult<&PyAny> {
if let Some(command) = command {
freeze_command(py, command)
Expand Down Expand Up @@ -182,6 +184,7 @@ pub fn _freeze(
verbose,
no_verbose,
event_signature,
skip_empty,
};

pyo3_asyncio::tokio::future_into_py(py, async move {
Expand Down