Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Apr 4, 2024
1 parent 2f63025 commit d9edb42
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 196 deletions.
62 changes: 33 additions & 29 deletions src/bin/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@

#![warn(clippy::all)]

use futures::stream::TryStreamExt as _;
use rand::distributions::Distribution;
use rand::Rng as _;
use std::cell::Cell;
use std::rc::Rc;
use tokio::net::ToSocketAddrs;
use toydb::client::Pool;
use toydb::error::{Error, Result};
use toydb::Client;

#[tokio::main]
async fn main() -> Result<()> {
Expand Down Expand Up @@ -65,7 +63,7 @@ async fn main() -> Result<()> {
.get_matches();

Bank::new(
args.get_many::<String>("host").unwrap().collect(),
args.get_many::<String>("host").unwrap().cloned().collect(),
*args.get_one::<u64>("concurrency").unwrap(),
*args.get_one("customers").unwrap(),
*args.get_one("accounts").unwrap(),
Expand All @@ -76,7 +74,8 @@ async fn main() -> Result<()> {
}

struct Bank {
clients: Pool,
addrs: Vec<String>,
concurrency: u64,
customers: u64,
customer_accounts: u64,
}
Expand All @@ -85,38 +84,46 @@ impl Bank {
const INITIAL_BALANCE: u64 = 100;

// Creates a new bank simulation.
async fn new<A: ToSocketAddrs + Clone>(
addrs: Vec<A>,
async fn new(
addrs: Vec<String>,
concurrency: u64,
customers: u64,
accounts: u64,
) -> Result<Self> {
Ok(Self {
clients: Pool::new(addrs, concurrency).await?,
customers,
customer_accounts: accounts,
})
Ok(Self { addrs, concurrency, customers, customer_accounts: accounts })
}

// Runs the bank simulation, making transfers between customer accounts.
async fn run(&self, transactions: u64) -> Result<()> {
self.setup().await?;
self.verify().await?;
let mut client = Client::new(&self.addrs[0]).await?;

self.setup(&mut client).await?;
self.verify(&mut client).await?;
println!();

let js = tokio::task::JoinSet::new();
let (transfers_tx, transfers_rx) = tokio::sync::mpsc::unbounded_channel();
for (i, addr) in self.addrs.iter().cycle().enumerate().take(self.concurrency as usize) {
let client = Client::new(addr).await?;
let transfers_rx = transfers_rx.clone();
js.spawn(async move {
while let Some((from, to)) = transfers_rx.recv().await {
Self::transfer(i, &mut client, from, to).await;
}
});
}

let mut rng = rand::thread_rng();
let customers = rand::distributions::Uniform::from(1..=self.customers);
let transfers = futures::stream::iter(
let mut transfers =
std::iter::from_fn(|| Some((customers.sample(&mut rng), customers.sample(&mut rng))))
.filter(|(from, to)| from != to)
.map(Ok)
.take(transactions as usize),
);
.take(transactions as usize);

let start = std::time::Instant::now();
transfers
.try_for_each_concurrent(self.clients.size(), |(from, to)| self.transfer(from, to))
.await?;
for transfer in transfers {
transfers_tx.send(transfer)?;
}
let elapsed = start.elapsed().as_secs_f64();

println!();
Expand All @@ -127,13 +134,12 @@ impl Bank {
transactions as f64 / elapsed
);

self.verify().await?;
self.verify(&mut client).await?;
Ok(())
}

// Sets up the database with customers and accounts.
async fn setup(&self) -> Result<()> {
let client = self.clients.get().await;
async fn setup(&self, client: &mut Client) -> Result<()> {
let start = std::time::Instant::now();
client.execute("BEGIN").await?;
client
Expand Down Expand Up @@ -186,8 +192,7 @@ impl Bank {
}

/// Verifies that all invariants hold (same total balance, no negative balances).
async fn verify(&self) -> Result<()> {
let client = self.clients.get().await;
async fn verify(&self, client: &mut Client) -> Result<()> {
let expect = self.customers * self.customer_accounts * Self::INITIAL_BALANCE;
let balance =
client.execute("SELECT SUM(balance) FROM account").await?.into_value()?.integer()?
Expand All @@ -211,8 +216,7 @@ impl Bank {
}

/// Transfers a random amount between two customers, retrying serialization failures.
async fn transfer(&self, from: u64, to: u64) -> Result<()> {
let client = self.clients.get().await;
async fn transfer(id: usize, client: &mut Client, from: u64, to: u64) -> Result<()> {
let attempts = Rc::new(Cell::new(0_u8));
let start = std::time::Instant::now();

Expand Down Expand Up @@ -266,7 +270,7 @@ impl Bank {

println!(
"Thread {} transferred {: >4} from {: >3} ({:0>4}) to {: >3} ({:0>4}) in {:.3}s ({} attempts)",
client.id(),
id,
amount,
from,
from_account,
Expand Down
70 changes: 0 additions & 70 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ use crate::sql::engine::Status;
use crate::sql::execution::ResultSet;
use crate::sql::schema::Table;

use futures::future::FutureExt as _;
use futures::sink::SinkExt as _;
use futures::stream::TryStreamExt as _;
use rand::Rng as _;
use std::cell::Cell;
use std::future::Future;
use std::ops::{Deref, Drop};
use std::sync::Arc;
use tokio::net::{TcpStream, ToSocketAddrs};
use tokio::sync::{Mutex, MutexGuard};
Expand Down Expand Up @@ -156,71 +154,3 @@ impl Client {
Err(Error::Serialization)
}
}

/// A toyDB client pool
pub struct Pool {
clients: Vec<Mutex<Client>>,
}

impl Pool {
/// Creates a new connection pool for the given servers, eagerly connecting clients.
pub async fn new<A: ToSocketAddrs + Clone>(addrs: Vec<A>, size: u64) -> Result<Self> {
let mut addrs = addrs.into_iter().cycle();
let clients = futures::future::try_join_all(
std::iter::from_fn(|| {
Some(Client::new(addrs.next().unwrap()).map(|r| r.map(Mutex::new)))
})
.take(size as usize),
)
.await?;
Ok(Self { clients })
}

/// Fetches a client from the pool. It is reset (i.e. any open txns are rolled back) and
/// returned when it goes out of scope.
pub async fn get(&self) -> PoolClient<'_> {
let (client, index, _) =
futures::future::select_all(self.clients.iter().map(|m| m.lock().boxed())).await;
PoolClient::new(index, client)
}

/// Returns the size of the pool
pub fn size(&self) -> usize {
self.clients.len()
}
}

/// A client returned from the pool
pub struct PoolClient<'a> {
id: usize,
client: MutexGuard<'a, Client>,
}

impl<'a> PoolClient<'a> {
/// Creates a new PoolClient
fn new(id: usize, client: MutexGuard<'a, Client>) -> Self {
Self { id, client }
}

/// Returns the ID of the client in the pool
pub fn id(&self) -> usize {
self.id
}
}

impl<'a> Deref for PoolClient<'a> {
type Target = MutexGuard<'a, Client>;

fn deref(&self) -> &Self::Target {
&self.client
}
}

impl<'a> Drop for PoolClient<'a> {
fn drop(&mut self) {
if self.txn().is_some() {
// FIXME This should disconnect or destroy the client if it errors.
futures::executor::block_on(self.client.execute("ROLLBACK")).ok();
}
}
}
67 changes: 0 additions & 67 deletions tests/client/pool.rs

This file was deleted.

30 changes: 0 additions & 30 deletions tests/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,36 +168,6 @@ pub async fn cluster_with_clients(size: u8, queries: Vec<&str>) -> Result<(Vec<C
Ok((clients, teardown))
}

/// Sets up a server cluster with a client pool
pub async fn cluster_with_pool(
cluster_size: u8,
pool_size: u64,
queries: Vec<&str>,
) -> Result<(Pool, Teardown)> {
let mut nodes = HashMap::new();
for i in 1..=cluster_size {
nodes.insert(
i,
(format!("127.0.0.1:{}", 9605 + i as u64), format!("127.0.0.1:{}", 9705 + i as u64)),
);
}
let teardown = cluster(nodes.clone()).await?;

let pool = Pool::new(nodes.into_iter().map(|(_, (addr, _))| addr).collect(), pool_size).await?;
pool.get().await.status().await?;

if !queries.is_empty() {
let c = pool.get().await;
c.execute("BEGIN").await?;
for query in queries {
c.execute(query).await?;
}
c.execute("COMMIT").await?;
}

Ok((pool, teardown))
}

/// Sets up a simple cluster with 3 clients and a test table
pub async fn cluster_simple() -> Result<(Client, Client, Client, Teardown)> {
let (mut clients, teardown) = cluster_with_clients(3, simple()).await?;
Expand Down

0 comments on commit d9edb42

Please sign in to comment.