Skip to content

Commit

Permalink
integrate Mempool data structure
Browse files Browse the repository at this point in the history
  • Loading branch information
aditiharini committed Jan 7, 2025
1 parent 35e507c commit 3dd13a2
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 172 deletions.
86 changes: 34 additions & 52 deletions src/connectors/fname/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::collections::HashMap;

use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::time::{sleep, Duration};
use tokio::{
sync::mpsc,
time::{sleep, Duration},
};
use tracing::{debug, error, info, warn};

use crate::{
mempool::routing::MessageRouter,
proto::{FnameTransfer, UserNameProof, UserNameType, ValidatorMessage},
storage::store::engine::{MempoolMessage, Senders},
storage::store::engine::MempoolMessage,
};

#[derive(Clone, Debug, Deserialize, Serialize)]
Expand Down Expand Up @@ -77,25 +77,16 @@ pub struct Fetcher {
position: u64,
transfers: Vec<Transfer>,
cfg: Config,
shard_senders: HashMap<u32, Senders>,
message_router: Box<dyn MessageRouter>,
num_shards: u32,
mempool_tx: mpsc::Sender<MempoolMessage>,
}

impl Fetcher {
pub fn new(
cfg: Config,
shard_senders: HashMap<u32, Senders>,
num_shards: u32,
message_router: Box<dyn MessageRouter>,
) -> Self {
pub fn new(cfg: Config, mempool_tx: mpsc::Sender<MempoolMessage>) -> Self {
Fetcher {
position: cfg.start_from,
transfers: vec![],
cfg: cfg,
shard_senders,
num_shards,
message_router,
mempool_tx,
}
}

Expand Down Expand Up @@ -127,41 +118,32 @@ impl Fetcher {
self.position = t.id;
self.transfers.push(t.clone()); // Just store these for now, we'll use them later

let shard = self.message_router.route_message(t.to, self.num_shards);
let senders = self.shard_senders.get(&shard);
match senders {
None => {
error!(id = t.id, "Unable to find shard to send fname transfer to")
}
Some(senders) => {
let username_proof = UserNameProof {
timestamp: t.timestamp,
name: t.username.into_bytes(),
owner: t.owner.into_bytes(),
signature: t.server_signature.into_bytes(),
fid: t.to,
r#type: UserNameType::UsernameTypeFname as i32,
};
if let Err(err) = senders
.messages_tx
.send(MempoolMessage::ValidatorMessage(ValidatorMessage {
on_chain_event: None,
fname_transfer: Some(FnameTransfer {
id: t.to,
from_fid: t.from,
proof: Some(username_proof),
}),
}))
.await
{
error!(
from = t.from,
to = t.to,
err = err.to_string(),
"Unable to send fname transfer to mempool"
)
}
}
let username_proof = UserNameProof {
timestamp: t.timestamp,
name: t.username.into_bytes(),
owner: t.owner.into_bytes(),
signature: t.server_signature.into_bytes(),
fid: t.to,
r#type: UserNameType::UsernameTypeFname as i32,
};
if let Err(err) = self
.mempool_tx
.send(MempoolMessage::ValidatorMessage(ValidatorMessage {
on_chain_event: None,
fname_transfer: Some(FnameTransfer {
id: t.to,
from_fid: t.from,
proof: Some(username_proof),
}),
}))
.await
{
error!(
from = t.from,
to = t.to,
err = err.to_string(),
"Unable to send fname transfer to mempool"
)
}
}
}
Expand Down
60 changes: 20 additions & 40 deletions src/connectors/onchain_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ use foundry_common::ens::EnsError;
use futures_util::stream::StreamExt;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::sync::mpsc;
use tracing::{error, info};

use crate::{
mempool::routing::MessageRouter,
proto::{
on_chain_event, IdRegisterEventBody, IdRegisterEventType, OnChainEvent, OnChainEventType,
SignerEventBody, SignerEventType, SignerMigratedEventBody, StorageRentEventBody,
ValidatorMessage,
},
storage::store::engine::{MempoolMessage, Senders},
storage::store::engine::MempoolMessage,
};

sol!(
Expand Down Expand Up @@ -135,9 +135,7 @@ impl L1Client for RealL1Client {
pub struct Subscriber {
provider: RootProvider<Http<Client>>,
onchain_events_by_block: HashMap<u32, Vec<OnChainEvent>>,
shard_senders: HashMap<u32, Senders>,
message_router: Box<dyn MessageRouter>,
num_shards: u32,
mempool_tx: mpsc::Sender<MempoolMessage>,
start_block_number: u64,
stop_block_number: u64,
}
Expand All @@ -146,9 +144,7 @@ pub struct Subscriber {
impl Subscriber {
pub fn new(
config: Config,
shard_senders: HashMap<u32, Senders>,
num_shards: u32,
message_router: Box<dyn MessageRouter>,
mempool_tx: mpsc::Sender<MempoolMessage>,
) -> Result<Subscriber, SubscribeError> {
if config.rpc_url.is_empty() {
return Err(SubscribeError::EmptyRpcUrl);
Expand All @@ -158,9 +154,7 @@ impl Subscriber {
Ok(Subscriber {
provider,
onchain_events_by_block: HashMap::new(),
shard_senders,
num_shards,
message_router,
mempool_tx,
start_block_number: config.start_block_number,
stop_block_number: config.stop_block_number,
})
Expand Down Expand Up @@ -208,35 +202,21 @@ impl Subscriber {
}
Some(events) => events.push(event.clone()),
}
let shard = self.message_router.route_message(fid, self.num_shards);
let senders = self.shard_senders.get(&shard);
match senders {
None => {
error!(
block_number = event.block_number,
tx_hash = hex::encode(&event.transaction_hash),
log_index = event.log_index,
"Unable to find shard to send onchain event to"
)
}
Some(senders) => {
if let Err(err) = senders
.messages_tx
.send(MempoolMessage::ValidatorMessage(ValidatorMessage {
on_chain_event: Some(event.clone()),
fname_transfer: None,
}))
.await
{
error!(
block_number = event.block_number,
tx_hash = hex::encode(&event.transaction_hash),
log_index = event.log_index,
err = err.to_string(),
"Unable to send onchain event to mempool"
)
}
}
if let Err(err) = self
.mempool_tx
.send(MempoolMessage::ValidatorMessage(ValidatorMessage {
on_chain_event: Some(event.clone()),
fname_transfer: None,
}))
.await
{
error!(
block_number = event.block_number,
tx_hash = hex::encode(&event.transaction_hash),
log_index = event.log_index,
err = err.to_string(),
"Unable to send onchain event to mempool"
)
}
}

Expand Down
22 changes: 11 additions & 11 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use informalsystems_malachitebft_metrics::{Metrics, SharedRegistry};
use snapchain::connectors::onchain_events::{L1Client, RealL1Client};
use snapchain::consensus::consensus::SystemMessage;
use snapchain::core::types::proto;
use snapchain::mempool::mempool::Mempool;
use snapchain::mempool::routing;
use snapchain::network::admin_server::{DbManager, MyAdminService};
use snapchain::network::gossip::GossipEvent;
Expand Down Expand Up @@ -121,7 +122,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
let node = SnapchainNode::create(
keypair.clone(),
app_config.consensus.clone(),
app_config.mempool.clone(),
Some(app_config.rpc_address.clone()),
gossip_tx.clone(),
None,
Expand All @@ -132,19 +132,20 @@ async fn main() -> Result<(), Box<dyn Error>> {
)
.await;

let admin_service = MyAdminService::new(
db_manager,
node.shard_senders.clone(),
let (mempool_tx, mempool_rx) = mpsc::channel(app_config.mempool.queue_size as usize);
let mut mempool = Mempool::new(
mempool_rx,
app_config.consensus.num_shards,
Box::new(routing::ShardRouter {}),
node.shard_senders.clone(),
);
tokio::spawn(async move { mempool.run().await });

let admin_service = MyAdminService::new(db_manager, mempool_tx.clone());

if !app_config.fnames.disable {
let mut fetcher = snapchain::connectors::fname::Fetcher::new(
app_config.fnames.clone(),
node.shard_senders.clone(),
app_config.consensus.num_shards,
Box::new(routing::ShardRouter {}),
mempool_tx.clone(),
);

tokio::spawn(async move {
Expand All @@ -155,9 +156,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
if !app_config.onchain_events.rpc_url.is_empty() {
let mut onchain_events_subscriber = snapchain::connectors::onchain_events::Subscriber::new(
app_config.onchain_events,
node.shard_senders.clone(),
app_config.consensus.num_shards,
Box::new(routing::ShardRouter {}),
mempool_tx.clone(),
)?;
tokio::spawn(async move {
let result = onchain_events_subscriber.run(false).await;
Expand Down Expand Up @@ -186,6 +185,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
statsd_client.clone(),
app_config.consensus.num_shards,
Box::new(routing::ShardRouter {}),
mempool_tx.clone(),
l1_client,
);

Expand Down
48 changes: 48 additions & 0 deletions src/mempool/mempool.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
use std::collections::HashMap;

use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;

use crate::storage::store::engine::{MempoolMessage, Senders};

use super::routing::{MessageRouter, ShardRouter};
use tracing::error;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
Expand All @@ -10,3 +18,43 @@ impl Default for Config {
Self { queue_size: 500 }
}
}

pub struct Mempool {
shard_senders: HashMap<u32, Senders>,
message_router: Box<dyn MessageRouter>,
num_shards: u32,
mempool_rx: mpsc::Receiver<MempoolMessage>,
}

impl Mempool {
pub fn new(
mempool_rx: mpsc::Receiver<MempoolMessage>,
num_shards: u32,
shard_senders: HashMap<u32, Senders>,
) -> Self {
Mempool {
shard_senders,
num_shards,
mempool_rx,
message_router: Box::new(ShardRouter {}),
}
}

pub async fn run(&mut self) {
while let Some(message) = self.mempool_rx.recv().await {
let fid = message.fid();
let shard = self.message_router.route_message(fid, self.num_shards);
let senders = self.shard_senders.get(&shard);
match senders {
None => {
error!("Unable to find shard to send message to")
}
Some(senders) => {
if let Err(err) = senders.messages_tx.send(message).await {
error!("Unable to send message to engine: {}", err.to_string())
}
}
}
}
}
}
Loading

0 comments on commit 3dd13a2

Please sign in to comment.