diff --git a/Cargo.lock b/Cargo.lock index c0db0194..5c1b477f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,6 +38,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "base64" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" + [[package]] name = "bigdecimal" version = "0.3.0" @@ -249,6 +255,7 @@ dependencies = [ "prost 0.11.3", "substreams 0.3.2", "substreams-common", + "substreams-entity-change", "substreams-ethereum", "substreams-helper", ] @@ -988,6 +995,27 @@ dependencies = [ "wee_alloc", ] +[[package]] +name = "substreams" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0af4a883f8ce09c8b5c2560c2373d230b6fce37972833f96151c0a0469b2c3b4" +dependencies = [ + "anyhow", + "bigdecimal", + "hex", + "hex-literal", + "num-bigint", + "num-traits", + "pad", + "prost 0.11.3", + "prost-build 0.11.1", + "prost-types 0.11.1", + "substreams-macro 0.5.6", + "thiserror", + "wee_alloc", +] + [[package]] name = "substreams-common" version = "0.1.0" @@ -999,6 +1027,18 @@ dependencies = [ "thiserror", ] +[[package]] +name = "substreams-entity-change" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c81fb0794efb438c5743c821ec76528c38878010c30d4a79d70799eac68837f6" +dependencies = [ + "base64", + "prost 0.11.3", + "prost-types 0.11.1", + "substreams 0.5.6", +] + [[package]] name = "substreams-erc20-holdings" version = "0.1.0" @@ -1143,6 +1183,18 @@ dependencies = [ "thiserror", ] +[[package]] +name = "substreams-macro" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f70ab0d9112fa637daad274d1163277d292600e19ac5edb9219cb063110970a" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "thiserror", +] + [[package]] name = "substreams-solana" version = "0.1.0" diff --git a/README.md b/README.md index 8704de8e..263261ab 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,12 @@ -# Messari Substreams +# Messari Substreams • [![GitHub license](https://img.shields.io/badge/license-MIT-blue)](https://github.com/messari/substreams/blob/master/LICENSE) [![PRs Welcome](https://img.shields.io/badge/PRs-welcome-brightgreen.svg)](https://github.com/messari/substreams/compare) [![Issues Report](https://img.shields.io/badge/issues-report-yellow.svg)](https://github.com/messari/substreams/issues/new) + +

+ Messari Logo +

+ +Messari substreams aim to contextualize on chain data using [streamingfast](https://www.streamingfast.io/)'s [substreams](https://substreams.streamingfast.io/) 🚀 + +> Learn more about messari substream development in our [`./docs`](./docs) ## Pre-requisites diff --git a/common/README.md b/common/README.md new file mode 100644 index 00000000..b8c18fb6 --- /dev/null +++ b/common/README.md @@ -0,0 +1,17 @@ +# Common Definitions + +This folder contains common definitions for protobufs used more than once. We intentionally have this in order to standardize the way we define the data to make it easier for the end user to "plug-n-play". + +We do NOT want to "over-standardize" on a substream level. Substreams works with raw, lower-level data. We do not want to lose any descriptiveness at this layer, therefore it is important to not generalize any data to try and fit a standard. + +## Common Proto + +TODO + +## DEX AMM Proto + +TODO + +## EVM Token Proto + +This proto is designed to support fungible tokens on evm chains. In order to preserve the data from each chain we have separate definitions for tokens across chain implementations. diff --git a/common/proto/erc20.proto b/common/proto/erc20.proto index 2c0b99cf..da0e6d9f 100644 --- a/common/proto/erc20.proto +++ b/common/proto/erc20.proto @@ -11,6 +11,8 @@ message ERC20Token { string name = 2; string symbol = 3; uint64 decimals = 4; + string tx_created = 5; + uint64 block_created = 6; } message TransferEvents { @@ -19,20 +21,26 @@ message TransferEvents { message TransferEvent { string tx_hash = 1; - uint32 log_index = 2; - uint64 log_ordinal = 3; - string token_address = 4; - string from = 5; - string to = 6; - string amount = 7; // BigInt, in token's native amount + uint64 block_number = 2; + uint64 timestamp = 3; + uint32 log_index = 4; + optional uint64 log_ordinal = 5; + string token_address = 6; + string from = 7; + string to = 8; + string amount = 9; // BigInt, in token's native amount + repeated TokenBalance balance_changes = 10; } message TokenBalance { - string token_address = 1; - string balance = 2; // BigInt, in token's native amount + optional uint64 log_ordinal = 1; + ERC20Token token = 2; + string address = 3; // account address of the balance change + optional string old_balance = 4; // BigInt, in token's native amount + string new_balance = 5; // BigInt, in token's native amount + optional int32 reason = 6; } -message Account { - string address = 1; - repeated TokenBalance balances = 2; +message TokenBalances { + TokenBalance items = 1; } diff --git a/common/src/codegen.rs b/common/src/codegen.rs index fdf2549f..249b0865 100644 --- a/common/src/codegen.rs +++ b/common/src/codegen.rs @@ -101,6 +101,9 @@ pub fn generate_pb(out_dir: Option<&str>) -> Result<(), Error> { // parse version from file name let filename = file.split('.').collect::>(); // let package_name = filename[0]; + if filename.len() < 2 { + continue; + } let name = filename[1].to_string(); let version = filename[2]; pb_files_hash @@ -146,15 +149,20 @@ pub fn generate_pb(out_dir: Option<&str>) -> Result<(), Error> { let pb_file_content = pb_files .into_iter() .map(|(filename, versions)| { + let mut name = "messari".to_string(); + if filename == "entity".to_string() { + name = "substreams".to_string(); + } let (mod_content, registration_content): (Vec, Vec) = versions .into_iter() .map(|version| { ( + format!( "#[rustfmt::skip]\n\ - #[path = \"../{}/pb/messari.{}.{}.rs\"]\n\ - pub(in crate::pb) mod {1}_{2};\n", - out_dir, filename, version + #[path = \"../{}/pb/{}.{}.{}.rs\"]\n\ + pub(in crate::pb) mod {2}_{3};\n", + out_dir, name, filename, version ), format!( " pub mod {} {{\n \ diff --git a/compound-v2/src/pb.rs b/compound-v2/src/pb.rs index 963d1415..b6d84e80 100644 --- a/compound-v2/src/pb.rs +++ b/compound-v2/src/pb.rs @@ -1,9 +1,9 @@ #[rustfmt::skip] -#[path = "../target/pb/messari.compound.v1.rs"] -pub(in crate::pb) mod compound_v1; +#[path = "../target/pb/messari.erc721.v1.rs"] +pub(in crate::pb) mod erc721_v1; -pub mod compound { +pub mod erc721 { pub mod v1 { - pub use super::super::compound_v1::*; + pub use super::super::erc721_v1::*; } } diff --git a/docs/DECODING.md b/docs/DECODING.md new file mode 100644 index 00000000..fd36e2f2 --- /dev/null +++ b/docs/DECODING.md @@ -0,0 +1,3 @@ +# Decoding Data + +Firehose delivers data in a very raw format and in order to make use of the data we need to be able to decode it. This file will go over examples of where/how you might need to do this. diff --git a/docs/STANDARDS.md b/docs/STANDARDS.md new file mode 100644 index 00000000..b0d9e2e7 --- /dev/null +++ b/docs/STANDARDS.md @@ -0,0 +1,9 @@ +# Standards + +Standardizing where it makes sense and early pays dividends in terms of developer time in the long run. This file will explore where we standardize and why those decisions were made. + +## Protobuf Definitions + +Where the same data structures are used multiple times, it makes sense to make a [common](../common) protobuf definition. It shows the user this definition can be found in multiple places. It also makes the code base cleaner. You can find these under [`../common/proto`](../common/proto). + +> You can learn more details about this [here](../common/README.md) diff --git a/docs/images/logos/messari-logo.png b/docs/images/logos/messari-logo.png new file mode 100644 index 00000000..0ffc2e3f Binary files /dev/null and b/docs/images/logos/messari-logo.png differ diff --git a/erc20-holdings/Makefile b/erc20-holdings/Makefile index a5bf0781..150a87ed 100644 --- a/erc20-holdings/Makefile +++ b/erc20-holdings/Makefile @@ -6,4 +6,4 @@ build: .PHONY: run run: - substreams run -e mainnet.eth.streamingfast.io:443 substreams.yaml map_block_to_erc20_contracts -s 1 + substreams run -e mainnet.eth.streamingfast.io:443 substreams.yaml map_block_to_erc20_contracts -s 6082465 -t +1 diff --git a/erc20-holdings/README.md b/erc20-holdings/README.md new file mode 100644 index 00000000..7fd342ac --- /dev/null +++ b/erc20-holdings/README.md @@ -0,0 +1,8 @@ +# ERC20 Holdings + +This substream will find all of the ERC20 tokens on ethereum with metadata. In addition it will map transfers. Balance can be derived from this. Then the `store_balance_usd` module combines chainlink data to store the prices as well. + +## Notes + +- The `map_block_to_erc20_contracts` module should be a `store` module since multiple other modules will want to use it as input to get ERC20 token metadata. +- `map_block_to_erc20_contracts` gets ERC20 metadata some of the time. It should grab all ERC20 contracts, however the metadata is not always populated. I am not quite sure why this is. diff --git a/erc20-holdings/src/lib.rs b/erc20-holdings/src/lib.rs index 6419a124..32644c46 100644 --- a/erc20-holdings/src/lib.rs +++ b/erc20-holdings/src/lib.rs @@ -4,8 +4,8 @@ pub mod abi; pub mod pb; mod keyer; +mod rpc; -use pb::common::v1 as common; use pb::erc20::v1 as erc20; use pb::erc20_price::v1::Erc20Price; use std::str::FromStr; @@ -25,7 +25,9 @@ use substreams_ethereum::{pb::eth as pbeth, Event, NULL_ADDRESS}; use substreams_helper::keyer::chainlink_asset_key; use substreams_helper::types::Address; -fn contract_bytecode_len(call: &pbeth::v2::Call) -> usize { +const INITIALIZE_METHOD_HASH: [u8; 4] = hex!("1459457a"); + +fn code_len(call: &pbeth::v2::Call) -> usize { let mut len = 0; for code_change in &call.code_changes { len += code_change.new_code.len() @@ -34,34 +36,87 @@ fn contract_bytecode_len(call: &pbeth::v2::Call) -> usize { len } +// TODO: this should be a store module /// Extracts erc20 contract deployments from the blocks #[substreams::handlers::map] fn map_block_to_erc20_contracts( block: pbeth::v2::Block, -) -> Result { - let mut erc20_contracts = common::Addresses { items: vec![] }; - - for call_view in block.calls() { - let call = call_view.call; - if call.call_type == pbeth::v2::CallType::Create as i32 { - // skipping contracts that are too short to be an erc20 token - if contract_bytecode_len(call) < 150 { - continue; - } +) -> Result { + let mut erc20_tokens = erc20::Erc20Tokens { items: vec![] }; - let address = Hex(call.address.clone()).to_string(); - - // check if contract is an erc20 token - if substreams_helper::erc20::get_erc20_token(address.clone()).is_none() { + for tx in block.transaction_traces { + for call in tx.calls { + if call.state_reverted { continue; } - log::info!("Create {}, len {}", address, contract_bytecode_len(call)); - erc20_contracts.items.push(common::Address { address }); + if call.call_type == pbeth::v2::CallType::Create as i32 + || call.call_type == pbeth::v2::CallType::Call as i32 + // proxy contract creation + { + let call_input_len = call.input.len(); + if call.call_type == pbeth::v2::CallType::Call as i32 + && (call_input_len < 4 || call.input[0..4] != INITIALIZE_METHOD_HASH) + { + // this will check if a proxy contract has been called to create a ERC20 contract. + // if that is the case the Proxy contract will call the initialize function on the ERC20 contract + // this is part of the OpenZeppelin Proxy contract standard + continue; + } + + // Contract creation not from proxy contract + if call.call_type == pbeth::v2::CallType::Create as i32 { + let code_change_len = code_len(&call); + + if code_change_len <= 150 { + // skipping contracts with less than 150 bytes of code + log::info!( + "Skipping contract: {}. Contract code is less than 150 bytes.", + Hex::encode(&call.address) + ); + continue; + } + } + + log::info!( + "Attempting to get metadata for erc20: {}", + Hex::encode(&call.address) + ); + + let decimals: u64; + let decimal_result = rpc::get_erc20_decimals(&call.address); + match decimal_result { + Ok(_decimals) => decimals = _decimals, + Err(_e) => continue, + }; + + let symbol: String; + let symbaol_result = rpc::get_erc20_symbol(&call.address); + match symbaol_result { + Ok(_symbol) => symbol = _symbol, + Err(_e) => continue, + }; + + let name: String; + let name_result = rpc::get_erc20_name(&call.address); + match name_result { + Ok(_name) => name = _name, + Err(_e) => continue, + }; + + erc20_tokens.items.push(erc20::Erc20Token { + address: Hex::encode(call.address.clone()), + name: name, + symbol: symbol, + decimals: decimals, + tx_created: Hex::encode(&tx.hash), + block_created: block.number, + }); + } } } - Ok(erc20_contracts) + Ok(erc20_tokens) } /// Extracts transfer events from the blocks @@ -81,13 +136,23 @@ fn map_block_to_transfers( } transfer_events.items.push(erc20::TransferEvent { - tx_hash: Hex(log.receipt.transaction.clone().hash).to_string(), + tx_hash: Hex::encode(log.receipt.transaction.clone().hash), + block_number: block.number, + timestamp: block + .header + .as_ref() + .unwrap() + .timestamp + .as_ref() + .unwrap() + .seconds as u64, log_index: log.index(), - log_ordinal: log.ordinal(), - token_address: Hex(log.address()).to_string(), - from: Hex(event.from).to_string(), - to: Hex(event.to).to_string(), + log_ordinal: Some(log.ordinal()), + token_address: Hex::encode(log.address()), + from: Hex::encode(event.from), + to: Hex::encode(event.to), amount: event.value.to_string(), + balance_changes: vec![], }) } } @@ -100,7 +165,7 @@ fn store_transfers(transfers: erc20::TransferEvents, output: store::StoreSetRaw) log::info!("Stored events {}", transfers.items.len()); for transfer in transfers.items { output.set( - transfer.log_ordinal, + transfer.log_ordinal.unwrap(), Hex::encode(&transfer.token_address), &proto::encode(&transfer).unwrap(), ); @@ -112,14 +177,14 @@ fn store_balance(transfers: erc20::TransferEvents, output: store::StoreAddBigInt log::info!("Stored events {}", transfers.items.len()); for transfer in transfers.items { output.add( - transfer.log_ordinal, + transfer.log_ordinal.unwrap(), keyer::account_balance_key(&transfer.to), &BigInt::from_str(transfer.amount.as_str()).unwrap(), ); if Hex::decode(transfer.from.clone()).unwrap() != NULL_ADDRESS { output.add( - transfer.log_ordinal, + transfer.log_ordinal.unwrap(), keyer::account_balance_key(&transfer.from), &BigInt::from_str((transfer.amount).as_str()).unwrap().neg(), ); @@ -148,7 +213,7 @@ fn store_balance_usd( match balances.get_last(keyer::account_balance_key(&transfer.to)) { Some(balance) => output.set( - transfer.log_ordinal, + transfer.log_ordinal.unwrap(), keyer::account_balance_usd_key(&transfer.to), &(token_price.clone() * balance.to_decimal(token_decimals.into())), ), @@ -158,7 +223,7 @@ fn store_balance_usd( if Hex::decode(transfer.from.clone()).unwrap() != NULL_ADDRESS { match balances.get_last(keyer::account_balance_key(&transfer.from)) { Some(balance) => output.set( - transfer.log_ordinal, + transfer.log_ordinal.unwrap(), keyer::account_balance_usd_key(&transfer.from), &(token_price.clone() * balance.to_decimal(token_decimals.into())), ), diff --git a/erc20-holdings/src/rpc.rs b/erc20-holdings/src/rpc.rs new file mode 100644 index 00000000..43168d04 --- /dev/null +++ b/erc20-holdings/src/rpc.rs @@ -0,0 +1,78 @@ +use std::fmt::Error; +use substreams::{log, Hex}; +use substreams_ethereum::pb::eth; +use substreams_ethereum::rpc::eth_call; +use substreams_helper::utils::{read_string, read_uint32}; + +// Functions to attempt to get erc20 contract calls + +pub const DECIMALS: &str = "313ce567"; +pub const NAME: &str = "06fdde03"; +pub const SYMBOL: &str = "95d89b41"; + +pub fn get_erc20_decimals(call_addr: &Vec) -> Result { + let rpc_call_decimal = create_rpc_calls(call_addr, vec![DECIMALS]); + let rpc_responses_unmarshalled_decimal = eth_call(&rpc_call_decimal); + let response_decimal = rpc_responses_unmarshalled_decimal.responses; + if response_decimal[0].failed { + log::info!("Failed to get decimals"); + return Err(Error); + } + + let decoded_decimals = read_uint32(response_decimal[0].raw.as_ref()); + if decoded_decimals.is_err() { + log::info!("Failed to decode decimals"); + return Err(Error); + } + + return Ok(decoded_decimals.unwrap() as u64); +} + +pub fn get_erc20_symbol(call_addr: &Vec) -> Result { + let rpc_call_symbol = create_rpc_calls(call_addr, vec![SYMBOL]); + let rpc_responses_unmarshalled = eth_call(&rpc_call_symbol); + let responses = rpc_responses_unmarshalled.responses; + if responses[0].failed { + log::info!("Failed to get symbol"); + return Err(Error); + }; + + let decoded_symbol = read_string(responses[0].raw.as_ref()); + if decoded_symbol.is_err() { + log::info!("Failed to decode symbol"); + return Err(Error); + } + + return Ok(decoded_symbol.unwrap()); +} + +pub fn get_erc20_name(call_addr: &Vec) -> Result { + let rpc_call_name = create_rpc_calls(call_addr, vec![NAME]); + let rpc_responses_unmarshalled = eth_call(&rpc_call_name); + let responses = rpc_responses_unmarshalled.responses; + if responses[0].failed { + log::info!("Failed to get name"); + return Err(Error); + }; + + let decoded_name = read_string(responses[0].raw.as_ref()); + if decoded_name.is_err() { + log::info!("Failed to decode name"); + return Err(Error); + } + + return Ok(decoded_name.unwrap()); +} + +fn create_rpc_calls(addr: &Vec, method_signatures: Vec<&str>) -> eth::rpc::RpcCalls { + let mut rpc_calls = eth::rpc::RpcCalls { calls: vec![] }; + + for method_signature in method_signatures { + rpc_calls.calls.push(eth::rpc::RpcCall { + to_addr: Vec::from(addr.clone()), + data: Hex::decode(method_signature).unwrap(), + }) + } + + return rpc_calls; +} diff --git a/erc20-holdings/substreams.yaml b/erc20-holdings/substreams.yaml index 7353d0de..c72d7109 100644 --- a/erc20-holdings/substreams.yaml +++ b/erc20-holdings/substreams.yaml @@ -20,13 +20,14 @@ binaries: file: ../target/wasm32-unknown-unknown/release/substreams_erc20_holdings.wasm modules: + # This should be a store module since it is needed by multiple modules - name: map_block_to_erc20_contracts kind: map initialBlock: 1 inputs: - source: sf.ethereum.type.v2.Block output: - type: proto:messari.common.v1.Addresses + type: proto:messari.erc20.v1.ERC20Tokens - name: map_block_to_transfers kind: map diff --git a/erc20-market-cap/src/pb.rs b/erc20-market-cap/src/pb.rs index 9a8fb3eb..eb61bfbe 100644 --- a/erc20-market-cap/src/pb.rs +++ b/erc20-market-cap/src/pb.rs @@ -37,3 +37,13 @@ pub mod erc20_price { pub use super::super::erc20_price_v1::*; } } + +#[rustfmt::skip] +#[path = "../target/pb/messari.uniswap.v1.rs"] +pub(in crate::pb) mod uniswap_v1; + +pub mod uniswap { + pub mod v1 { + pub use super::super::uniswap_v1::*; + } +} diff --git a/erc20-price/src/modules/1_store_chainlink_aggregator.rs b/erc20-price/src/modules/1_store_chainlink_aggregator.rs index f8d8f0ab..8515aa3e 100644 --- a/erc20-price/src/modules/1_store_chainlink_aggregator.rs +++ b/erc20-price/src/modules/1_store_chainlink_aggregator.rs @@ -78,12 +78,16 @@ fn store_chainlink_aggregator(block: eth::Block, output: StoreSetProto Result { + let mut transfers = vec![]; -#[substreams::handlers::store] -fn store_balance(block: pbeth::v2::Block, output: store::StoreSetRaw) { for transaction in &block.transaction_traces { + let mut balance_changes = vec![]; for call in &transaction.calls { for balance_change in &call.balance_changes { - // TODO: replace this with substreams::scalar::BigInt once the wrapper is integrated - let new_value = balance_change - .new_value - .as_ref() - .map(|value| { - num_bigint::BigInt::from_bytes_be(num_bigint::Sign::Plus, &value.bytes) - .into() - }) - .unwrap_or(BigInt::zero()); - - output.set( - transaction.end_ordinal, - format!("Address:{}", Hex(&balance_change.address).to_string()), - &new_value.to_string(), - ) + balance_changes.push(proto::TokenBalance { + log_ordinal: Some(balance_change.ordinal), + token: token_helper::get_eth_token(), + address: Hex::encode(&balance_change.address), + old_balance: Some(token_helper::bigint_to_string( + balance_change.old_value.clone(), + )), + new_balance: token_helper::bigint_to_string(balance_change.new_value.clone()), + reason: Some(balance_change.reason), + }); } } + transfers.push(proto::TransferEvent { + tx_hash: Hex::encode(&transaction.hash), + block_number: block.number, + timestamp: block + .header + .as_ref() + .unwrap() + .timestamp + .as_ref() + .unwrap() + .seconds as u64, + log_index: transaction.index, + log_ordinal: None, + token_address: ETH_ADDRESS.to_string(), + from: Hex::encode(&transaction.from), + to: Hex::encode(&transaction.to), + amount: token_helper::bigint_to_string(transaction.value.clone()), + balance_changes: balance_changes, + }); + } + + Ok(proto::TransferEvents { items: transfers }) +} + +#[substreams::handlers::map] +fn map_entity_changes( + transfer_events: proto::TransferEvents, +) -> Result { + let mut transfer_enitites = vec![]; + let mut balance_change_entities = vec![]; + + for transfer in transfer_events.items { + // extract balance changes + for balance_change in transfer.balance_changes { + balance_change_entities.push(EntityChange { + entity: "TokenBalance".to_string(), + id: transfer.tx_hash.clone() + + transfer + .log_ordinal + .unwrap_or_default() + .to_string() + .as_str(), + ordinal: transfer.log_ordinal.unwrap_or_default(), + operation: EntityChangeOperation::Create.into(), + fields: vec![ + transfer.tx_hash.clone().to_field("transfer".to_string()), + transfer + .log_ordinal + .unwrap_or_default() + .to_field("logOrdinal".to_string()), + balance_change.address.to_field("account".to_string()), + balance_change + .old_balance + .unwrap() + .to_field("oldBalance".to_string()), + balance_change + .new_balance + .to_field("newBalance".to_string()), + balance_change + .reason + .unwrap() + .to_field("reason".to_string()), + ], + }); + } + + // map transfer entity changes + transfer_enitites.push(EntityChange { + entity: "Transfer".to_string(), + id: transfer.tx_hash.clone(), + ordinal: transfer.block_number, + operation: Operation::Create.into(), + fields: vec![ + transfer.block_number.to_field("blockNumber".to_string()), + transfer.timestamp.to_field("timestamp".to_string()), + transfer.log_index.to_string().to_field("logIndex".to_string()), + transfer + .log_ordinal + .unwrap_or_default() + .to_field("logOrdinal".to_string()), + transfer.token_address.to_field("tokenAddress".to_string()), + transfer.from.to_field("from".to_string()), + transfer.to.to_field("to".to_string()), + transfer.amount.to_field("amount".to_string()), + ], + }); } + + Ok(EntityChanges { + entity_changes: [transfer_enitites, balance_change_entities].concat(), + }) } diff --git a/eth-balance/src/pb.rs b/eth-balance/src/pb.rs index 21c620eb..bf50187b 100644 --- a/eth-balance/src/pb.rs +++ b/eth-balance/src/pb.rs @@ -1,9 +1,19 @@ #[rustfmt::skip] -#[path = "../target/pb/messari.eth_balance.v1.rs"] -pub(in crate::pb) mod eth_balance_v1; +#[path = "../target/pb/substreams.entity.v1.rs"] +pub(in crate::pb) mod entity_v1; -pub mod eth_balance { +pub mod entity { pub mod v1 { - pub use super::super::eth_balance_v1::*; + pub use super::super::entity_v1::*; + } +} + +#[rustfmt::skip] +#[path = "../target/pb/messari.erc20.v1.rs"] +pub(in crate::pb) mod erc20_v1; + +pub mod erc20 { + pub mod v1 { + pub use super::super::erc20_v1::*; } } diff --git a/eth-balance/subgraph/schema.graphql b/eth-balance/subgraph/schema.graphql new file mode 100644 index 00000000..7e0457ba --- /dev/null +++ b/eth-balance/subgraph/schema.graphql @@ -0,0 +1,40 @@ +type Transfer @entity { + " {transaction hash} " + id: Bytes! + + blockNumber: BigInt! + timestamp: BigInt! + logIndex: BigInt! + logOrdinal: BigInt + + " The address of the token being transferred " + tokenAddress: Bytes! + + " Address sending the tokens " + from: Bytes! + + " Address receiving the tokens " + to: Bytes! + + " Amount of tokens being transferred " + amount: BigInt! + + balanceChanges: [BalanceChange!]! @derivedFrom(field: "transfer") +} + +type BalanceChange @entity { + " {transaction hash}-{log ordinal} " + id: Bytes! + + " Related Transfer event ID (ie, transaction hash) " + transfer: Bytes! + + logOrdinal: BigInt + + account: Bytes! + + oldBalance: BigInt + newBalance: BigInt! + + reason: Int! +} \ No newline at end of file diff --git a/eth-balance/subgraph/subgraph.yaml b/eth-balance/subgraph/subgraph.yaml new file mode 100644 index 00000000..c016abdb --- /dev/null +++ b/eth-balance/subgraph/subgraph.yaml @@ -0,0 +1,15 @@ +specVersion: 0.0.4 +description: ETH Balance Change subgraph powered by substreams +schema: + file: schema.graphql +dataSources: + - kind: substreams + name: graph_network_substreams + network: mainnet + source: + package: + moduleName: map_entity_changes + file: ../eth-balance-v0.1.0.spkg + mapping: + kind: substreams/graph-entities + apiVersion: 0.0.5 diff --git a/eth-balance/substreams.yaml b/eth-balance/substreams.yaml index 2291509f..52778cd9 100644 --- a/eth-balance/substreams.yaml +++ b/eth-balance/substreams.yaml @@ -5,12 +5,14 @@ package: imports: eth: https://github.com/streamingfast/sf-ethereum/releases/download/v0.10.2/ethereum-v0.10.4.spkg + entity: https://github.com/streamingfast/substreams-entity-change/releases/download/v0.2.1/substreams-entity-change-v0.2.1.spkg protobuf: files: - - eth_balance.proto + - erc20.proto + - importPaths: - - proto/v1 + - ../common/proto binaries: default: @@ -18,9 +20,16 @@ binaries: file: "../target/wasm32-unknown-unknown/release/eth_balance.wasm" modules: - - name: store_balance - kind: store - updatePolicy: set - valueType: string + - name: map_balances + kind: map inputs: - source: sf.ethereum.type.v2.Block + output: + type: proto:messari.erc20.v1.TransferEvents + + - name: map_entity_changes + kind: map + inputs: + - map: map_balances + output: + type: proto:substreams.entity.v1.EntityChanges \ No newline at end of file diff --git a/substreams-helper/build.rs b/substreams-helper/build.rs index 0f1a2864..1667f817 100644 --- a/substreams-helper/build.rs +++ b/substreams-helper/build.rs @@ -2,6 +2,7 @@ use anyhow::{Ok, Result}; use substreams_common::codegen; fn main() -> Result<(), anyhow::Error> { - codegen::generate_abi(None)?; + codegen::generate(None)?; + Ok(()) } diff --git a/substreams-helper/src/lib.rs b/substreams-helper/src/lib.rs index b6cf1767..ad0b4954 100644 --- a/substreams-helper/src/lib.rs +++ b/substreams-helper/src/lib.rs @@ -2,6 +2,8 @@ pub mod abi; pub mod erc20; pub mod keyer; pub mod math; +pub mod pb; pub mod price; +pub mod token; pub mod types; pub mod utils; diff --git a/substreams-helper/src/pb.rs b/substreams-helper/src/pb.rs new file mode 100644 index 00000000..cd2c33a1 --- /dev/null +++ b/substreams-helper/src/pb.rs @@ -0,0 +1,9 @@ +#[rustfmt::skip] +#[path = "../target/pb/messari.erc20.v1.rs"] +pub(in crate::pb) mod erc20_v1; + +pub mod erc20 { + pub mod v1 { + pub use super::super::erc20_v1::*; + } +} diff --git a/substreams-helper/src/token.rs b/substreams-helper/src/token.rs new file mode 100644 index 00000000..b0d5e1a4 --- /dev/null +++ b/substreams-helper/src/token.rs @@ -0,0 +1,27 @@ +use crate::pb::erc20::v1::Erc20Token; +use num_bigint; +use substreams::scalar::BigInt; +use substreams_ethereum::pb::eth as pbeth; + +pub const ETH_ADDRESS: &str = "0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee"; + +pub fn get_eth_token() -> Option { + let eth_token = Erc20Token { + address: ETH_ADDRESS.to_string(), + name: "Ethereum".to_string(), + symbol: "ETH".to_string(), + decimals: 18_u64, + tx_created: "".to_string(), + block_created: 0_u64, + }; + + Some(eth_token) +} + +pub fn bigint_to_string(number: Option) -> String { + number + .as_ref() + .map(|value| num_bigint::BigInt::from_bytes_be(num_bigint::Sign::Plus, &value.bytes)) + .unwrap_or(BigInt::zero().into()) + .to_string() +} diff --git a/substreams-helper/substreams.yaml b/substreams-helper/substreams.yaml new file mode 100644 index 00000000..fd59c8ab --- /dev/null +++ b/substreams-helper/substreams.yaml @@ -0,0 +1,18 @@ +specVersion: v0.1.0 +package: + name: substreams_helper + version: v0.1.0 + +imports: + eth: https://github.com/streamingfast/sf-ethereum/releases/download/v0.10.2/ethereum-v0.10.4.spkg + +protobuf: + files: + - erc20.proto + importPaths: + - ../common/proto + +binaries: + default: + type: wasm/rust-v1 + file: "../target/wasm32-unknown-unknown/release/substreams_helper.wasm"