Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support bench vote using quic #4298

Merged
merged 15 commits into from
Jan 8, 2025
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion bench-vote/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ edition = { workspace = true }

[dependencies]
bincode = { workspace = true }
clap = { version = "3.1.5", features = ["cargo"] }
clap = { workspace = true }
crossbeam-channel = { workspace = true }
solana-clap-utils = { workspace = true }
solana-client = { workspace = true }
solana-connection-cache = { workspace = true }
solana-logger = { workspace = true }
solana-net-utils = { workspace = true }
solana-sdk = { workspace = true }
solana-streamer = { workspace = true }
Expand Down
175 changes: 137 additions & 38 deletions bench-vote/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,29 @@
#![allow(clippy::arithmetic_side_effects)]

use {
clap::{crate_description, crate_name, Arg, Command},
clap::{crate_description, crate_name, value_t, value_t_or_exit, App, Arg},
crossbeam_channel::unbounded,
solana_clap_utils::{input_parsers::keypair_of, input_validators::is_keypair_or_ask_keyword},
solana_client::connection_cache::ConnectionCache,
solana_connection_cache::client_connection::ClientConnection,
solana_net_utils::{bind_to_unspecified, SocketConfig},
solana_sdk::{
hash::Hash, message::Message, signature::Keypair, signer::Signer, transaction::Transaction,
hash::Hash, message::Message, pubkey::Pubkey, signature::Keypair, signer::Signer,
transaction::Transaction,
},
solana_streamer::{
packet::PacketBatchRecycler,
streamer::{receiver, PacketBatchReceiver, StreamerReceiveStats},
quic::{spawn_server_multi, QuicServerParams},
streamer::{receiver, PacketBatchReceiver, StakedNodes, StreamerReceiveStats},
},
solana_vote_program::{vote_instruction, vote_state::Vote},
std::{
cmp::max,
collections::HashMap,
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
Arc, RwLock,
},
thread::{self, spawn, JoinHandle, Result},
time::{Duration, Instant, SystemTime},
Expand Down Expand Up @@ -57,82 +61,124 @@ fn sink(
const TRANSACTIONS_PER_THREAD: u64 = 1_000_000; // Number of transactions per thread

fn main() -> Result<()> {
let matches = Command::new(crate_name!())
let matches = App::new(crate_name!())
.about(crate_description!())
.version(solana_version::version!())
.arg(
Arg::new("num-recv-sockets")
Arg::with_name("identity")
.short("i")
.long("identity")
.value_name("KEYPAIR")
.takes_value(true)
.validator(is_keypair_or_ask_keyword)
.help("Identity keypair for the QUIC endpoint when '--use-quic' is set true. If it is not specified a dynamic key is created."),
)
.arg(
Arg::with_name("num-recv-sockets")
.long("num-recv-sockets")
.value_name("NUM")
.takes_value(true)
.help("Use NUM receive sockets"),
)
.arg(
Arg::new("num-producers")
Arg::with_name("num-producers")
.long("num-producers")
.value_name("NUM")
.takes_value(true)
.help("Use this many producer threads."),
)
.arg(
Arg::new("server-only")
Arg::with_name("server-only")
.long("server-only")
.takes_value(false)
.help("Run the bench tool as a server only."),
)
.arg(
Arg::new("client-only")
Arg::with_name("client-only")
.long("client-only")
.takes_value(false)
.requires("server-address")
.help("Run the bench tool as a client only."),
)
.arg(
Arg::with_name("server-address")
.short('n')
.short("n")
.long("server-address")
.value_name("HOST:PORT")
.takes_value(true)
.validator(|arg| solana_net_utils::is_host_port(arg.to_string()))
.help("The destination streamer address to which the client will send transactions to"),
)
.arg(
Arg::new("use-connection-cache")
Arg::with_name("use-connection-cache")
.long("use-connection-cache")
.takes_value(false)
.help("Use this many producer threads."),
)
.arg(
Arg::new("verbose")
Arg::with_name("verbose")
.long("verbose")
.takes_value(false)
.help("Show verbose messages."),
)
.arg(
Arg::with_name("use-quic")
.long("use-quic")
.value_name("Boolean")
.takes_value(true)
.default_value("false")
.help("Controls if to use QUIC for sending/receiving vote transactions."),
)
.get_matches();

solana_logger::setup();

let mut num_sockets = 1usize;
if let Some(n) = matches.value_of("num-recv-sockets") {
num_sockets = max(num_sockets, n.to_string().parse().expect("integer"));
}

let num_producers: u64 = matches.value_of_t("num-producers").unwrap_or(4);

let vote_use_quic = value_t_or_exit!(matches, "use-quic", bool);
let num_producers: u64 = value_t!(matches, "num-producers", u64).unwrap_or(4);
let use_connection_cache = matches.is_present("use-connection-cache");

let server_only = matches.is_present("server-only");
let client_only = matches.is_present("client-only");
let verbose = matches.is_present("verbose");

let destination = matches.is_present("server-address").then(|| {
let addr = matches
.value_of("server-address")
.expect("Destination must be set when --client-only is used");
.expect("Server address must be set when --client-only is used");
solana_net_utils::parse_host_port(addr).expect("Expecting a valid server address")
});

let port = destination.map_or(0, |addr| addr.port());
let ip_addr = destination.map_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED), |addr| addr.ip());

let quic_params = vote_use_quic.then(|| {
let identity_keypair = keypair_of(&matches, "identity").or_else(|| {
println!("--identity is not specified when --use-quic is on. Will generate a key dynamically.");
Some(Keypair::new())
}).unwrap();

let stake: u64 = 1024;
let total_stake: u64 = 1024;

let stakes = HashMap::from([
(identity_keypair.pubkey(), stake),
(Pubkey::new_unique(), total_stake.saturating_sub(stake)),
]);
let staked_nodes: Arc<RwLock<StakedNodes>> = Arc::new(RwLock::new(StakedNodes::new(
Arc::new(stakes),
HashMap::<Pubkey, u64>::default(), // overrides
)));

QuicParams {
identity_keypair,
staked_nodes
}
});

let (exit, read_threads, sink_threads, destination) = if !client_only {
let exit = Arc::new(AtomicBool::new(false));

Expand All @@ -147,24 +193,48 @@ fn main() -> Result<()> {
num_sockets,
)
.unwrap();
let stats = Arc::new(StreamerReceiveStats::new("bench-streamer-test"));
for read in read_sockets {
read.set_read_timeout(Some(SOCKET_RECEIVE_TIMEOUT)).unwrap();

let stats = Arc::new(StreamerReceiveStats::new("bench-vote-test"));

if let Some(quic_params) = &quic_params {
let quic_server_params = QuicServerParams {
max_connections_per_ipaddr_per_min: 1024,
max_connections_per_peer: 1024,
..Default::default()
};
let (s_reader, r_reader) = unbounded();
read_channels.push(r_reader);
read_threads.push(receiver(
"solRcvrBenStrmr".to_string(),
Arc::new(read),
exit.clone(),

let server = spawn_server_multi(
"solRcvrBenVote",
"bench_vote_metrics",
read_sockets,
&quic_params.identity_keypair,
s_reader,
recycler.clone(),
stats.clone(),
COALESCE_TIME, // coalesce
true, // use_pinned_memory
None, // in_vote_only_mode
false, // is_staked_service
));
exit.clone(),
quic_params.staked_nodes.clone(),
quic_server_params,
)
.unwrap();
read_threads.push(server.thread);
} else {
for read in read_sockets {
read.set_read_timeout(Some(SOCKET_RECEIVE_TIMEOUT)).unwrap();

let (s_reader, r_reader) = unbounded();
read_channels.push(r_reader);
read_threads.push(receiver(
"solRcvrBenVote".to_string(),
Arc::new(read),
exit.clone(),
s_reader,
recycler.clone(),
stats.clone(),
COALESCE_TIME, // coalesce
true, // use_pinned_memory
None, // in_vote_only_mode
false, // is_staked_service
));
}
}

let received_size = Arc::new(AtomicUsize::new(0));
Expand All @@ -187,8 +257,15 @@ fn main() -> Result<()> {

let start = SystemTime::now();

let producer_threads =
(!server_only).then(|| producer(destination, num_producers, use_connection_cache, verbose));
let producer_threads = (!server_only).then(|| {
producer(
destination,
num_producers,
use_connection_cache,
verbose,
quic_params,
)
});

producer_threads
.into_iter()
Expand Down Expand Up @@ -231,18 +308,40 @@ enum Transporter {
DirectSocket(Arc<UdpSocket>),
}

struct QuicParams {
identity_keypair: Keypair,
staked_nodes: Arc<RwLock<StakedNodes>>,
}

fn producer(
sock: SocketAddr,
num_producers: u64,
use_connection_cache: bool,
verbose: bool,
quic_params: Option<QuicParams>,
) -> Vec<JoinHandle<()>> {
println!("Running clients against {sock:?}");
let transporter = if use_connection_cache {
Transporter::Cache(Arc::new(ConnectionCache::with_udp(
"connection_cache_vote_udp",
1, // connection_pool_size
)))
let transporter = if use_connection_cache || quic_params.is_some() {
if let Some(quic_params) = &quic_params {
Transporter::Cache(Arc::new(ConnectionCache::new_with_client_options(
"connection_cache_vote_quic",
256, // connection_pool_size
None, // client_endpoint
Some((
&quic_params.identity_keypair,
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
)),
Some((
&quic_params.staked_nodes,
&quic_params.identity_keypair.pubkey(),
)),
)))
} else {
Transporter::Cache(Arc::new(ConnectionCache::with_udp(
"connection_cache_vote_udp",
1, // connection_pool_size
)))
}
} else {
Transporter::DirectSocket(Arc::new(bind_to_unspecified().unwrap()))
};
Expand Down
1 change: 1 addition & 0 deletions quic-client/src/nonblocking/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ impl QuicLazyInitializedEndpoint {
)
.expect("QuicLazyInitializedEndpoint::create_endpoint bind_in_range")
.1;
info!("Local endpoint is : {client_socket:?}");

QuicNewConnection::create_endpoint(EndpointConfig::default(), client_socket)
};
Expand Down
Loading