Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add empty chunk handling, importing python crate dependencies, tx filtering by address, support for new datatypes in python crate #147

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
4 changes: 4 additions & 0 deletions crates/cli/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ pub struct Args {
/// Avoid saving a summary report
#[arg(long, help_heading = "Output Options")]
pub no_report: bool,

/// Write output files even when the dataframe is empty (pass `false` to skip empty dataframes)
davidthegardens marked this conversation as resolved.
Show resolved Hide resolved
#[arg(long, help_heading = "Acquisition Options")]
pub write_empty: Option<bool>,

/// Address(es)
#[arg(long, help_heading = "Dataset-specific Options", num_args(1..))]
Expand Down
1 change: 1 addition & 0 deletions crates/cli/src/parse/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub(crate) fn parse_execution_env(args: &Args, n_tasks: u64) -> Result<Execution
.verbose(verbose)
.report(!args.no_report)
.report_dir(args.report_dir.clone())
.write_empty(args.write_empty)
.args(args_str);

let builder = if !args.no_verbose {
Expand Down
2 changes: 1 addition & 1 deletion crates/freeze/src/datasets/address_appearances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,4 +331,4 @@ fn process_appearances(
}

Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/freeze/src/datasets/contracts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,4 @@ pub(crate) fn process_contracts(
}
}
Ok(())
}
}
17 changes: 0 additions & 17 deletions crates/freeze/src/datasets/erc20_transfers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use polars::prelude::*;
pub struct Erc20Transfers {
n_rows: u64,
block_number: Vec<u32>,
block_hash: Vec<Option<Vec<u8>>>,
davidthegardens marked this conversation as resolved.
Show resolved Hide resolved
transaction_index: Vec<u32>,
log_index: Vec<u32>,
transaction_hash: Vec<Vec<u8>>,
Expand All @@ -21,21 +20,6 @@ pub struct Erc20Transfers {

#[async_trait::async_trait]
impl Dataset for Erc20Transfers {
/// Column names this dataset reports as its defaults.
///
/// `block_hash` is commented out below, so it is not part of the returned list.
fn default_columns() -> Option<Vec<&'static str>> {
Some(vec![
"block_number",
// "block_hash",
"transaction_index",
"log_index",
"transaction_hash",
"erc20",
"from_address",
"to_address",
"value",
"chain_id",
])
}

/// Dimensions this dataset lists as optional parameters: address, topics 0-2, and
/// from/to address.
fn optional_parameters() -> Vec<Dim> {
vec![Dim::Address, Dim::Topic0, Dim::Topic1, Dim::Topic2, Dim::FromAddress, Dim::ToAddress]
}
Expand Down Expand Up @@ -104,7 +88,6 @@ fn process_erc20_transfers(logs: Vec<Log>, columns: &mut Erc20Transfers, schema:
{
columns.n_rows += 1;
store!(schema, columns, block_number, bn.as_u32());
store!(schema, columns, block_hash, log.block_hash.map(|bh| bh.as_bytes().to_vec()));
store!(schema, columns, transaction_index, ti.as_u32());
store!(schema, columns, log_index, li.as_u32());
store!(schema, columns, transaction_hash, tx.as_bytes().to_vec());
Expand Down
2 changes: 1 addition & 1 deletion crates/freeze/src/datasets/erc721_transfers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,4 @@ fn process_erc721_transfers(
}
}
Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/freeze/src/datasets/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,4 @@ fn process_logs(logs: Vec<Log>, columns: &mut Logs, schema: &Table) -> R<()> {
}

Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/freeze/src/datasets/native_transfers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,4 @@ pub(crate) fn process_native_transfers(
}
}
Ok(())
}
}
49 changes: 42 additions & 7 deletions crates/freeze/src/datasets/transactions.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
use crate::*;
use ethers::prelude::*;
use polars::prelude::*;
use std::collections::HashSet;
use std::sync::Mutex;
use lazy_static::lazy_static;

lazy_static! {
// Process-wide set of transaction hashes (raw bytes) that the address filter has already
// accepted. The filter rejects any transaction whose hash is already present, so a
// transaction matching more than one queried address is only emitted once.
// NOTE(review): nothing visible here ever clears the set, so it appears to grow for the
// lifetime of the process and to be shared across all blocks/chunks — confirm this is intended.
static ref TRANSACTIONS_SET: Mutex<HashSet<Vec<u8>>> = Mutex::new(HashSet::new());
}


/// columns for transactions
#[cryo_to_df::to_df(Datatype::Transactions)]
Expand All @@ -11,6 +19,7 @@ pub struct Transactions {
transaction_index: Vec<Option<u64>>,
transaction_hash: Vec<Vec<u8>>,
nonce: Vec<u64>,
address: Vec<Vec<u8>>,
from_address: Vec<Vec<u8>>,
to_address: Vec<Option<Vec<u8>>>,
value: Vec<U256>,
Expand Down Expand Up @@ -55,7 +64,8 @@ impl Dataset for Transactions {
}

fn optional_parameters() -> Vec<Dim> {
vec![Dim::FromAddress, Dim::ToAddress]
vec![Dim::Address,Dim::FromAddress, Dim::ToAddress]
//vec![Dim::FromAddress, Dim::ToAddress]
}
}

Expand All @@ -65,8 +75,10 @@ pub type TransactionAndReceipt = (Transaction, Option<TransactionReceipt>);
#[async_trait::async_trait]
impl CollectByBlock for Transactions {
type Response = (Block<Transaction>, Vec<TransactionAndReceipt>, bool);
//println!("impl");

async fn extract(request: Params, source: Arc<Source>, query: Arc<Query>) -> R<Self::Response> {

let block = source
.fetcher
.get_block_with_txs(request.block_number()?)
Expand All @@ -77,8 +89,10 @@ impl CollectByBlock for Transactions {
// 1. collect transactions and filter them if optional parameters are supplied
// filter by from_address
let from_filter: Box<dyn Fn(&Transaction) -> bool + Send> =
if let Some(from_address) = &request.from_address {
Box::new(move |tx| tx.from.as_bytes() == from_address)
if let Some(from_address) = &request.from_address {
Box::new(move |tx| {
from_address == tx.from.as_bytes()
})
} else {
Box::new(|_| true)
};
Expand All @@ -89,16 +103,37 @@ impl CollectByBlock for Transactions {
} else {
Box::new(|_| true)
};

// filter by address
let addr_filter: Box<dyn Fn(&Transaction) -> bool + Send> =
if let Some(address) = &request.address {
Box::new(move |tx| {
let mut transactions_set = TRANSACTIONS_SET.lock().unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the reason for using TRANSACTIONS_SET here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In cases where you pass 2 or more addresses, any time both the sender and the receiver of a transaction are in --addresses, you will get duplicated transactions in the df. Originally, I used TRANSACTIONS_SET to prevent duplicates.

I've since made things more efficient and avoided global variables by making the list of addresses accessible.
This is how it works now:

from_bool = addresses.contains(from_address)
to_bool = addresses.contains(to_address)
if from_bool and to_bool:
    return from_address==current_address
else:
    return from_address==current_address or to_address==current_address

I'm unsure if the way I'm getting the list of addresses is the best way, but I couldn't figure out how else to do it. Currently, it gets the addresses by parsing 'cli_command' from ExecutionEnv.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice, looking very clean now

additionally, I think request.addresses() will give the relevant info without the need for get_addresses(). There is no need to parse CLI arg data; request.addresses() will give you the list of addresses relevant to the current chunk (which might be different from all of the addresses, if you're chunking by address)

Copy link
Author

@davidthegardens davidthegardens Dec 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I can't seem to find .addresses(), in request. There is .address though, which only has one address :/
Since cryo filters transactions by each address separately, I'd need the complete list of addresses in addition to the address for the current chunk to prevent duplicates.

if transactions_set.contains(&tx.hash.as_bytes().to_vec()) {
false
} else {
let condition = tx.to.as_ref().map_or(false, |x| x.as_bytes() == address) ||
tx.from.as_bytes() == address;
if condition {
transactions_set.insert(tx.hash.as_bytes().to_vec());
}
condition
}
})
} else {
Box::new(|_| true)
};

let transactions =
block.transactions.clone().into_iter().filter(from_filter).filter(to_filter).collect();
block.transactions.clone().into_iter().filter(from_filter).filter(to_filter).filter(addr_filter).collect();

// 2. collect receipts if necessary
// if transactions are filtered fetch by set of transaction hashes, else fetch all receipts
// in block
let receipts: Vec<Option<_>> =
if schema.has_column("gas_used") | schema.has_column("success") {
// receipts required
let receipts = if request.from_address.is_some() || request.to_address.is_some() {
let receipts = if request.from_address.is_some() || request.to_address.is_some() || request.address.is_some() {
source.get_tx_receipts(&transactions).await?
} else {
source.get_tx_receipts_in_block(&block).await?
Expand All @@ -108,8 +143,8 @@ impl CollectByBlock for Transactions {
vec![None; block.transactions.len()]
};

let transactions_with_receips = transactions.into_iter().zip(receipts).collect();
Ok((block, transactions_with_receips, query.exclude_failed))
let transactions_with_receipts = transactions.into_iter().zip(receipts).collect();
Ok((block, transactions_with_receipts, query.exclude_failed))
}

fn transform(response: Self::Response, columns: &mut Self, query: &Arc<Query>) -> R<()> {
Expand Down
5 changes: 5 additions & 0 deletions crates/freeze/src/freeze.rs
davidthegardens marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ async fn freeze_partition(payload: PartitionPayload) -> Result<(), CollectError>

// write dataframes to disk
for (datatype, mut df) in dfs {
// Skip empty dataframes only when writing them was explicitly disabled
// (`write_empty == Some(false)`); `None` and `Some(true)` still write them.
if env.write_empty == Some(false) && df.shape().0 == 0 {
    continue;
}
let path = paths.get(&datatype).ok_or_else(|| {
CollectError::CollectError("could not get path for datatype".to_string())
})?;
Expand Down
10 changes: 10 additions & 0 deletions crates/freeze/src/types/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub struct ExecutionEnv {
pub t_end: Option<SystemTime>,
/// report directory
pub report_dir: Option<PathBuf>,
/// whether to write empty dataframes to disk (`Some(false)` skips them)
pub write_empty: Option<bool>,
}

impl ExecutionEnv {
Expand Down Expand Up @@ -67,6 +69,7 @@ pub struct ExecutionEnvBuilder {
t_start: SystemTime,
t_end: Option<SystemTime>,
report_dir: Option<PathBuf>,
write_empty: Option<bool>,
}

impl Default for ExecutionEnvBuilder {
Expand All @@ -82,6 +85,7 @@ impl Default for ExecutionEnvBuilder {
t_start: SystemTime::now(),
t_end: None,
report_dir: None,
write_empty: Some(true)
}
}
}
Expand Down Expand Up @@ -134,6 +138,11 @@ impl ExecutionEnvBuilder {
self
}

/// Set whether empty dataframes are written to disk.
///
/// The value is stored unchanged on the builder and copied into the `ExecutionEnv`
/// produced by `build`; the freeze step skips empty dataframes only for `Some(false)`.
pub fn write_empty(mut self, write_empty: Option<bool>) -> Self {
self.write_empty = write_empty;
self
}
/// build final output
pub fn build(self) -> ExecutionEnv {
ExecutionEnv {
Expand All @@ -147,6 +156,7 @@ impl ExecutionEnvBuilder {
t_start: self.t_start,
t_end: self.t_end,
report_dir: self.report_dir,
write_empty: self.write_empty,
}
}
}
1 change: 1 addition & 0 deletions crates/python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ build-backend = "maturin"
[project]
name = "cryo_python"
requires-python = ">=3.7"
dependencies = ["maturin>=1.1,<2.0","pandas==2.1.3","polars==0.19.19","pyarrow==14.0.1"]
davidthegardens marked this conversation as resolved.
Show resolved Hide resolved
classifiers = [
"Programming Language :: Rust",
"Programming Language :: Python :: Implementation :: CPython",
Expand Down
5 changes: 3 additions & 2 deletions crates/python/python/cryo/_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ def parse_cli_args(
else:
raise Exception('unknown file_format')

kwargs['no_verbose'] = not verbose

if 'no_verbose' not in kwargs.keys():
kwargs['no_verbose'] = not verbose

return kwargs

10 changes: 10 additions & 0 deletions crates/python/python/cryo/_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,14 @@ class CryoCliArgs(TypedDict, total=False):
topic3: str | bytes | None
inner_request_size: int | None
no_verbose: bool
address: typing.Sequence[str] | None
to_address: typing.Sequence[str] | None
from_address: typing.Sequence[str] | None
call_data: typing.Sequence[str] | None
function: typing.Sequence[str] | None
inputs: typing.Sequence[str] | None
slot: typing.Sequence[str] | None
event_signature: str | bytes | None
inner_request_size: int | None
js_tracer: str | bytes | None

3 changes: 3 additions & 0 deletions crates/python/rust/collect_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ use cryo_freeze::collect;
verbose = false,
no_verbose = false,
event_signature = None,
write_empty = false
)
)]
#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -125,6 +126,7 @@ pub fn _collect(
verbose: bool,
no_verbose: bool,
event_signature: Option<String>,
write_empty: Option<bool>
) -> PyResult<&PyAny> {
if let Some(command) = command {
pyo3_asyncio::tokio::future_into_py(py, async move {
Expand Down Expand Up @@ -190,6 +192,7 @@ pub fn _collect(
verbose,
no_verbose,
event_signature,
write_empty,
};
pyo3_asyncio::tokio::future_into_py(py, async move {
match run_collect(args).await {
Expand Down
3 changes: 3 additions & 0 deletions crates/python/rust/freeze_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ use cryo_cli::{run, Args};
verbose = false,
no_verbose = false,
event_signature = None,
write_empty = None,
)
)]
#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -122,6 +123,7 @@ pub fn _freeze(
verbose: bool,
no_verbose: bool,
event_signature: Option<String>,
write_empty: Option<bool>
) -> PyResult<&PyAny> {
if let Some(command) = command {
freeze_command(py, command)
Expand Down Expand Up @@ -182,6 +184,7 @@ pub fn _freeze(
verbose,
no_verbose,
event_signature,
write_empty,
};

pyo3_asyncio::tokio::future_into_py(py, async move {
Expand Down