From 879b637a50885846f3556f9d2187600be51960c3 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sun, 7 Apr 2024 21:57:02 +0200 Subject: [PATCH] wip --- Cargo.lock | 42 +++++ Cargo.toml | 1 + src/bin/toydb.rs | 2 +- src/bin/toysql.rs | 38 +++-- src/bin/workload.rs | 305 ++++++++++++++++--------------------- src/client.rs | 54 +++---- src/error.rs | 24 +++ src/server.rs | 65 ++++---- tests/client/mod.rs | 120 +++++++-------- tests/cluster/isolation.rs | 75 +++++---- tests/cluster/recovery.rs | 24 +-- tests/setup.rs | 24 +-- 12 files changed, 384 insertions(+), 390 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 41253d489..07267ab76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -392,6 +392,19 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.12" @@ -401,6 +414,34 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-queue" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.19" @@ -1835,6 +1876,7 @@ dependencies = [ "bincode", "clap 4.5.4", "config", + "crossbeam", "derivative", "fs4", "futures", diff --git a/Cargo.toml b/Cargo.toml index c6d5f426f..1f53f7de2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ async-channel = "~2.2.0" bincode = "~1.3.3" clap = { version = "~4.5.4", features = ["cargo", "derive"] } config = "~0.14.0" +crossbeam = { version = "0.8.4", features = ["crossbeam-channel"] } derivative = "~2.2.0" fs4 = "~0.8.1" futures = "~0.3.15" diff --git a/src/bin/toydb.rs b/src/bin/toydb.rs index 94d796843..2e6a9854d 100644 --- a/src/bin/toydb.rs +++ b/src/bin/toydb.rs @@ -60,7 +60,7 @@ async fn main() -> Result<()> { let srv = Server::new(cfg.id, cfg.peers, raft_log, raft_state)?; let raft_listener = TcpListener::bind(&cfg.listen_raft).await?; - let sql_listener = TcpListener::bind(&cfg.listen_sql).await?; + let sql_listener = std::net::TcpListener::bind(&cfg.listen_sql)?; srv.serve(raft_listener, sql_listener).await } diff --git a/src/bin/toysql.rs b/src/bin/toysql.rs index 07d65801e..e4e856a08 100644 --- a/src/bin/toysql.rs +++ b/src/bin/toysql.rs @@ -14,8 +14,7 @@ use toydb::sql::execution::ResultSet; use toydb::sql::parser::{Lexer, Token}; use toydb::Client; -#[tokio::main] -async fn main() -> Result<()> { +fn main() -> Result<()> { let opts = clap::command!() .name("toysql") .about("A ToyDB client.") @@ -36,13 +35,12 @@ async fn main() -> Result<()> { .get_matches(); let mut toysql = - ToySQL::new(opts.get_one::("host").unwrap(), *opts.get_one("port").unwrap()) - .await?; + ToySQL::new(opts.get_one::("host").unwrap(), *opts.get_one("port").unwrap())?; if let Some(command) = opts.get_one::<&str>("command") { - toysql.execute(command).await + toysql.execute(command) } else { - toysql.run().await + toysql.run() } } @@ -56,9 +54,9 @@ struct ToySQL { impl ToySQL { /// Creates a new ToySQL REPL for the given server host and port - async fn new(host: &str, port: u16) -> Result { + fn new(host: &str, port: u16) -> Result { Ok(Self { - client: Client::new((host, port)).await?, + client: Client::new((host, port))?, editor: Editor::new()?, history_path: std::env::var_os("HOME") .map(|home| std::path::Path::new(&home).join(".toysql.history")), @@ -67,18 +65,18 @@ impl ToySQL { } /// Executes a line of input - async fn execute(&mut self, input: &str) -> Result<()> { + fn execute(&mut self, input: &str) -> Result<()> { if input.starts_with('!') { - self.execute_command(input).await + self.execute_command(input) } else if !input.is_empty() { - self.execute_query(input).await + self.execute_query(input) } else { Ok(()) } } /// Handles a REPL command (prefixed by !, e.g. !help) - async fn execute_command(&mut self, input: &str) -> Result<()> { + fn execute_command(&mut self, input: &str) -> Result<()> { let mut input = input.split_ascii_whitespace(); let command = input.next().ok_or_else(|| Error::Parse("Expected command.".to_string()))?; @@ -116,7 +114,7 @@ The following commands are also available: "# ), "!status" => { - let status = self.client.status().await?; + let status = self.client.status()?; let mut node_logs = status .raft .last_index @@ -166,11 +164,11 @@ Storage: {keys} keys, {logical_size} MB logical, {nodes}x {disk_size} MB disk, } "!table" => { let args = getargs(1)?; - println!("{}", self.client.get_table(args[0]).await?); + println!("{}", self.client.get_table(args[0])?); } "!tables" => { getargs(0)?; - for table in self.client.list_tables().await? { + for table in self.client.list_tables()? { println!("{}", table) } } @@ -180,8 +178,8 @@ Storage: {keys} keys, {logical_size} MB logical, {nodes}x {disk_size} MB disk, } /// Runs a query and displays the results - async fn execute_query(&mut self, query: &str) -> Result<()> { - match self.client.execute(query).await? { + fn execute_query(&mut self, query: &str) -> Result<()> { + match self.client.execute(query)? { ResultSet::Begin { version, read_only } => match read_only { false => println!("Began transaction at new version {}", version), true => println!("Began read-only transaction at version {}", version), @@ -237,7 +235,7 @@ Storage: {keys} keys, {logical_size} MB logical, {nodes}x {disk_size} MB disk, } /// Runs the ToySQL REPL - async fn run(&mut self) -> Result<()> { + fn run(&mut self) -> Result<()> { if let Some(path) = &self.history_path { match self.editor.load_history(path) { Ok(_) => {} @@ -252,14 +250,14 @@ Storage: {keys} keys, {logical_size} MB logical, {nodes}x {disk_size} MB disk, rustyline::Cmd::Noop, ); - let status = self.client.status().await?; + let status = self.client.status()?; println!( "Connected to toyDB node \"{}\". Enter !help for instructions.", status.raft.server ); while let Some(input) = self.prompt()? { - match self.execute(&input).await { + match self.execute(&input) { Ok(()) => {} error @ Err(Error::Internal(_)) => return error, Err(error) => println!("Error: {}", error), diff --git a/src/bin/workload.rs b/src/bin/workload.rs index 26391ef98..f84549970 100644 --- a/src/bin/workload.rs +++ b/src/bin/workload.rs @@ -21,13 +21,12 @@ use std::time::Duration; use toydb::error::{Error, Result}; use toydb::{Client, ResultSet}; -#[tokio::main] -async fn main() -> Result<()> { +fn main() -> Result<()> { let Command { runner, subcommand } = Command::parse(); match subcommand { - Subcommand::Read(read) => runner.run(read).await, - Subcommand::Write(write) => runner.run(write).await, - Subcommand::Bank(bank) => runner.run(bank).await, + Subcommand::Read(read) => runner.run(read), + Subcommand::Write(write) => runner.run(write), + Subcommand::Bank(bank) => runner.run(bank), } } @@ -76,9 +75,9 @@ struct Runner { impl Runner { /// Runs the specified workload. - async fn run(self, workload: W) -> Result<()> { + fn run(self, workload: W) -> Result<()> { let mut rng = rand::rngs::StdRng::seed_from_u64(self.seed); - let mut client = Client::new(&self.hosts[0]).await?; + let mut client = Client::new(&self.hosts[0])?; // Set up a histogram recording txn latencies as nanoseconds. The // buckets range from 0.001s to 10s. @@ -89,90 +88,80 @@ impl Runner { print!("Preparing initial dataset... "); std::io::stdout().flush()?; let start = std::time::Instant::now(); - workload.prepare(&mut client, &mut rng).await?; + workload.prepare(&mut client, &mut rng)?; println!("done ({:.3}s)", start.elapsed().as_secs_f64()); // Spawn workers, round robin across hosts. - print!("Spawning {} workers... ", self.concurrency); - std::io::stdout().flush()?; - let start = std::time::Instant::now(); + std::thread::scope(|s| -> Result<()> { + print!("Spawning {} workers... ", self.concurrency); + std::io::stdout().flush()?; + let start = std::time::Instant::now(); + + let (work_tx, work_rx) = crossbeam::channel::bounded(self.concurrency); + + for addr in self.hosts.iter().cycle().take(self.concurrency) { + let mut client = Client::new(addr)?; + let work_rx = work_rx.clone(); + let mut recorder = hist.recorder(); + s.spawn(move || -> Result<()> { + while let Ok(item) = work_rx.recv() { + let start = std::time::Instant::now(); + Self::execute_with_retry::(&mut client, item)?; + recorder.record(start.elapsed().as_nanos() as u64)?; + } + Ok(()) + }); + } - let mut js = tokio::task::JoinSet::>::new(); - let (work_tx, work_rx) = async_channel::bounded(self.concurrency); - - for addr in self.hosts.iter().cycle().take(self.concurrency) { - let mut client = Client::new(addr).await?; - let work_rx = work_rx.clone(); - let mut recorder = hist.recorder(); - js.spawn(async move { - while let Ok(item) = work_rx.recv().await { - let start = std::time::Instant::now(); - Self::execute_with_retry::(&mut client, item).await?; - recorder.record(start.elapsed().as_nanos() as u64)?; - } - Ok(()) - }); - } + println!("done ({:.3}s)", start.elapsed().as_secs_f64()); - println!("done ({:.3}s)", start.elapsed().as_secs_f64()); + // Spawn work generator. + { + println!("Running workload {}...", workload); + let generator = workload.generate(rng).take(self.count); + s.spawn(move || -> Result<()> { + for item in generator { + work_tx.send(item)?; + } + Ok(()) + }); + } - // Spawn work generator. - { - println!("Running workload {}...", workload); - let generator = workload.generate(rng).take(self.count); - js.spawn(async move { - for item in generator { - work_tx.send(item).await?; - } - work_tx.close(); - Ok(()) + // Wait for workers to complete, and periodically print stats. + let start = std::time::Instant::now(); + let ticker = crossbeam::channel::tick(Duration::from_secs(1)); + + println!("Time Progress Txns Rate p50 p90 p99 pMax"); + println!(); + + s.spawn(move || loop { + ticker.recv().unwrap(); + + let duration = start.elapsed().as_secs_f64(); + hist.refresh_timeout(Duration::from_secs(1)); + + println!( + "{:<8} {:>5.1}% {:>7} {:>6.0}/s {:>6.1}ms {:>6.1}ms {:>6.1}ms {:>6.1}ms", + format!("{:.1}s", duration), + hist.len() as f64 / self.count as f64 * 100.0, + hist.len(), + hist.len() as f64 / duration, + Duration::from_nanos(hist.value_at_quantile(0.5)).as_secs_f64() * 1000.0, + Duration::from_nanos(hist.value_at_quantile(0.9)).as_secs_f64() * 1000.0, + Duration::from_nanos(hist.value_at_quantile(0.99)).as_secs_f64() * 1000.0, + Duration::from_nanos(hist.max()).as_secs_f64() * 1000.0, + ); }); - } - // Wait for workers to complete, and periodically print stats. - let start = std::time::Instant::now(); - let mut ticker = tokio::time::interval(Duration::from_secs(1)); - ticker.tick().await; // skip first tick - - println!(); - println!("Time Progress Txns Rate p50 p90 p99 pMax"); - - let mut print_stats = || { - let duration = start.elapsed().as_secs_f64(); - hist.refresh_timeout(Duration::from_secs(1)); - println!( - "{:<8} {:>5.1}% {:>7} {:>6.0}/s {:>6.1}ms {:>6.1}ms {:>6.1}ms {:>6.1}ms", - format!("{:.1}s", duration), - hist.len() as f64 / self.count as f64 * 100.0, - hist.len(), - hist.len() as f64 / duration, - Duration::from_nanos(hist.value_at_quantile(0.5)).as_secs_f64() * 1000.0, - Duration::from_nanos(hist.value_at_quantile(0.9)).as_secs_f64() * 1000.0, - Duration::from_nanos(hist.value_at_quantile(0.99)).as_secs_f64() * 1000.0, - Duration::from_nanos(hist.max()).as_secs_f64() * 1000.0, - ); - }; - - loop { - tokio::select! { - // Print stats every second. - _ = ticker.tick() => print_stats(), - - // Check if tasks are done. - result = js.join_next() => match result { - Some(result) => result??, - None => break, - }, - } - } - print_stats(); - println!(); + Ok(()) + })?; // Verify the final dataset. + println!(); print!("Verifying dataset... "); std::io::stdout().flush()?; let start = std::time::Instant::now(); - workload.verify(&mut client, self.count).await?; + workload.verify(&mut client, self.count)?; println!("done ({:.3}s)", start.elapsed().as_secs_f64()); Ok(()) @@ -183,18 +172,18 @@ impl Runner { /// the client or workload trait. /// /// TODO: move this to a Client.with_txn() helper once async is removed. - async fn execute_with_retry(client: &mut Client, item: W::Item) -> Result<()> { + fn execute_with_retry(client: &mut Client, item: W::Item) -> Result<()> { const MAX_RETRIES: u32 = 10; const MIN_WAIT: u64 = 10; const MAX_WAIT: u64 = 2_000; let mut retries: u32 = 0; loop { - match W::execute(client, &item).await { + match W::execute(client, &item) { Ok(()) => return Ok(()), Err(Error::Serialization | Error::Abort) if retries < MAX_RETRIES => { if client.txn().is_some() { - client.execute("ROLLBACK").await?; + client.execute("ROLLBACK")?; } // Use exponential backoff starting at MIN_WAIT doubling up @@ -202,12 +191,12 @@ impl Runner { // to reduce the chance of collisions. let mut wait = std::cmp::min(MIN_WAIT * 2_u64.pow(retries), MAX_WAIT); wait = rand::thread_rng().gen_range(MIN_WAIT..=wait); - tokio::time::sleep(std::time::Duration::from_millis(wait)).await; + std::thread::sleep(std::time::Duration::from_millis(wait)); retries += 1; } Err(e) => { if client.txn().is_some() { - client.execute("ROLLBACK").await.ok(); // ignore rollback error + client.execute("ROLLBACK").ok(); // ignore rollback error } return Err(e); } @@ -222,20 +211,17 @@ trait Workload: std::fmt::Display + 'static { type Item: Send; /// Prepares the workload by creating initial tables and data. - async fn prepare(&self, client: &mut Client, rng: &mut StdRng) -> Result<()>; + fn prepare(&self, client: &mut Client, rng: &mut StdRng) -> Result<()>; /// Generates work items as an iterator. fn generate(&self, rng: StdRng) -> impl Iterator + Send + 'static; /// Executes a single work item. This will automatically be retried on /// certain errors, and must use a transaction where appropriate. - fn execute( - client: &mut Client, - item: &Self::Item, - ) -> impl std::future::Future> + Send; + fn execute(client: &mut Client, item: &Self::Item) -> Result<()>; /// Verifies the dataset after the workload has completed. - async fn verify(&self, _client: &mut Client, _txns: usize) -> Result<()> { + fn verify(&self, _client: &mut Client, _txns: usize) -> Result<()> { Ok(()) } } @@ -268,12 +254,10 @@ impl std::fmt::Display for Read { impl Workload for Read { type Item = HashSet; - async fn prepare(&self, client: &mut Client, rng: &mut StdRng) -> Result<()> { - client.execute("BEGIN").await?; - client.execute(r#"DROP TABLE IF EXISTS "read""#).await?; - client - .execute(r#"CREATE TABLE "read" (id INT PRIMARY KEY, value STRING NOT NULL)"#) - .await?; + fn prepare(&self, client: &mut Client, rng: &mut StdRng) -> Result<()> { + client.execute("BEGIN")?; + client.execute(r#"DROP TABLE IF EXISTS "read""#)?; + client.execute(r#"CREATE TABLE "read" (id INT PRIMARY KEY, value STRING NOT NULL)"#)?; let chars = &mut rand::distributions::Alphanumeric.sample_iter(rng).map(|b| b as char); let rows = (1..=self.rows).map(|id| (id, chars.take(self.size).collect::())); @@ -285,9 +269,9 @@ impl Workload for Read { ) }); for query in queries { - client.execute(&query).await?; + client.execute(&query)?; } - client.execute("COMMIT").await?; + client.execute("COMMIT")?; Ok(()) } @@ -299,20 +283,19 @@ impl Workload for Read { } } - async fn execute(client: &mut Client, item: &Self::Item) -> Result<()> { + fn execute(client: &mut Client, item: &Self::Item) -> Result<()> { let batch_size = item.len(); let query = format!( r#"SELECT * FROM "read" WHERE {}"#, item.iter().map(|id| format!("id = {}", id)).join(" OR ") ); - let rows = client.execute(&query).await?.into_rows()?; + let rows = client.execute(&query)?.into_rows()?; assert_eq!(rows.count(), batch_size, "Unexpected row count"); Ok(()) } - async fn verify(&self, client: &mut Client, _: usize) -> Result<()> { - let count = - client.execute(r#"SELECT COUNT(*) FROM "read""#).await?.into_value()?.integer()?; + fn verify(&self, client: &mut Client, _: usize) -> Result<()> { + let count = client.execute(r#"SELECT COUNT(*) FROM "read""#)?.into_value()?.integer()?; assert_eq!(count as u64, self.rows, "Unexpected row count"); Ok(()) } @@ -365,13 +348,11 @@ impl std::fmt::Display for Write { impl Workload for Write { type Item = Vec<(u64, String)>; - async fn prepare(&self, client: &mut Client, _: &mut StdRng) -> Result<()> { - client.execute("BEGIN").await?; - client.execute(r#"DROP TABLE IF EXISTS "write""#).await?; - client - .execute(r#"CREATE TABLE "write" (id INT PRIMARY KEY, value STRING NOT NULL)"#) - .await?; - client.execute("COMMIT").await?; + fn prepare(&self, client: &mut Client, _: &mut StdRng) -> Result<()> { + client.execute("BEGIN")?; + client.execute(r#"DROP TABLE IF EXISTS "write""#)?; + client.execute(r#"CREATE TABLE "write" (id INT PRIMARY KEY, value STRING NOT NULL)"#)?; + client.execute("COMMIT")?; Ok(()) } @@ -379,13 +360,13 @@ impl Workload for Write { WriteGenerator { next_id: 1, size: self.size, batch: self.batch, rng } } - async fn execute(client: &mut Client, item: &Self::Item) -> Result<()> { + fn execute(client: &mut Client, item: &Self::Item) -> Result<()> { let batch_size = item.len(); let query = format!( r#"INSERT INTO "write" (id, value) VALUES {}"#, item.iter().map(|(id, value)| format!("({}, '{}')", id, value)).join(", ") ); - if let ResultSet::Create { count } = client.execute(&query).await? { + if let ResultSet::Create { count } = client.execute(&query)? { assert_eq!(count as usize, batch_size, "Unexpected row count"); } else { panic!("Unexpected result") @@ -393,9 +374,8 @@ impl Workload for Write { Ok(()) } - async fn verify(&self, client: &mut Client, txns: usize) -> Result<()> { - let count = - client.execute(r#"SELECT COUNT(*) FROM "write""#).await?.into_value()?.integer()?; + fn verify(&self, client: &mut Client, txns: usize) -> Result<()> { + let count = client.execute(r#"SELECT COUNT(*) FROM "write""#)?.into_value()?.integer()?; assert_eq!(count as usize, txns * self.batch, "Unexpected row count"); Ok(()) } @@ -460,47 +440,39 @@ impl std::fmt::Display for Bank { impl Workload for Bank { type Item = (u64, u64, u64); // from,to,amount - async fn prepare(&self, client: &mut Client, rng: &mut StdRng) -> Result<()> { + fn prepare(&self, client: &mut Client, rng: &mut StdRng) -> Result<()> { let petnames = petname::Petnames::default(); - client.execute("BEGIN").await?; - client.execute("DROP TABLE IF EXISTS account").await?; - client.execute("DROP TABLE IF EXISTS customer").await?; - client - .execute( - "CREATE TABLE customer ( + client.execute("BEGIN")?; + client.execute("DROP TABLE IF EXISTS account")?; + client.execute("DROP TABLE IF EXISTS customer")?; + client.execute( + "CREATE TABLE customer ( id INTEGER PRIMARY KEY, name STRING NOT NULL )", - ) - .await?; - client - .execute( - "CREATE TABLE account ( + )?; + client.execute( + "CREATE TABLE account ( id INTEGER PRIMARY KEY, customer_id INTEGER NOT NULL INDEX REFERENCES customer, balance INTEGER NOT NULL )", - ) - .await?; - client - .execute(&format!( - "INSERT INTO customer VALUES {}", - (1..=self.customers) - .zip(petnames.iter(rng, 3, " ")) - .map(|(id, name)| format!("({}, '{}')", id, name)) - .join(", ") - )) - .await?; - client - .execute(&format!( - "INSERT INTO account VALUES {}", - (1..=self.customers) - .flat_map(|c| (1..=self.accounts).map(move |a| (c, (c-1)*self.accounts + a))) - .map(|(c, a)| (format!("({}, {}, {})", a, c, self.balance))) - .join(", ") - )) - .await?; - client.execute("COMMIT").await?; + )?; + client.execute(&format!( + "INSERT INTO customer VALUES {}", + (1..=self.customers) + .zip(petnames.iter(rng, 3, " ")) + .map(|(id, name)| format!("({}, '{}')", id, name)) + .join(", ") + ))?; + client.execute(&format!( + "INSERT INTO account VALUES {}", + (1..=self.customers) + .flat_map(|c| (1..=self.accounts).map(move |a| (c, (c - 1) * self.accounts + a))) + .map(|(c, a)| (format!("({}, {}, {})", a, c, self.balance))) + .join(", ") + ))?; + client.execute("COMMIT")?; Ok(()) } @@ -516,10 +488,10 @@ impl Workload for Bank { .filter(|(from, to, _)| from != to) } - async fn execute(client: &mut Client, item: &Self::Item) -> Result<()> { + fn execute(client: &mut Client, item: &Self::Item) -> Result<()> { let (from, to, mut amount) = item; - client.execute("BEGIN").await?; + client.execute("BEGIN")?; let mut row = client .execute(&format!( @@ -529,8 +501,7 @@ impl Workload for Bank { ORDER BY a.balance DESC LIMIT 1", from - )) - .await? + ))? .into_row()?; let from_balance = row.pop().unwrap().integer()?; let from_account = row.pop().unwrap().integer()?; @@ -544,36 +515,30 @@ impl Workload for Bank { ORDER BY a.balance ASC LIMIT 1", to - )) - .await? + ))? .into_value()? .integer()?; - client - .execute(&format!( - "UPDATE account SET balance = balance - {} WHERE id = {}", - amount, from_account, - )) - .await?; - client - .execute(&format!( - "UPDATE account SET balance = balance + {} WHERE id = {}", - amount, to_account, - )) - .await?; + client.execute(&format!( + "UPDATE account SET balance = balance - {} WHERE id = {}", + amount, from_account, + ))?; + client.execute(&format!( + "UPDATE account SET balance = balance + {} WHERE id = {}", + amount, to_account, + ))?; - client.execute("COMMIT").await?; + client.execute("COMMIT")?; Ok(()) } - async fn verify(&self, client: &mut Client, _: usize) -> Result<()> { + fn verify(&self, client: &mut Client, _: usize) -> Result<()> { let balance = - client.execute("SELECT SUM(balance) FROM account").await?.into_value()?.integer()?; + client.execute("SELECT SUM(balance) FROM account")?.into_value()?.integer()?; assert_eq!(balance as u64, self.customers * self.accounts * self.balance); let negative = client - .execute("SELECT COUNT(*) FROM account WHERE balance < 0") - .await? + .execute("SELECT COUNT(*) FROM account WHERE balance < 0")? .into_value()? .integer()?; assert_eq!(negative, 0); diff --git a/src/client.rs b/src/client.rs index f786312ea..89fd34e0b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3,57 +3,37 @@ use crate::server::{Request, Response}; use crate::sql::engine::Status; use crate::sql::execution::ResultSet; use crate::sql::schema::Table; - -use futures::sink::SinkExt as _; -use futures::stream::TryStreamExt as _; -use tokio::net::{TcpStream, ToSocketAddrs}; -use tokio_util::codec::{Framed, LengthDelimitedCodec}; - -type Connection = tokio_serde::Framed< - Framed, - Result, - Request, - tokio_serde::formats::Bincode, Request>, ->; +use crate::storage::bincode; /// A toyDB client pub struct Client { - conn: Connection, + conn: std::net::TcpStream, txn: Option<(u64, bool)>, } impl Client { /// Creates a new client - pub async fn new(addr: A) -> Result { - Ok(Self { - conn: tokio_serde::Framed::new( - Framed::new(TcpStream::connect(addr).await?, LengthDelimitedCodec::new()), - tokio_serde::formats::Bincode::default(), - ), - txn: None, - }) + pub fn new(addr: impl std::net::ToSocketAddrs) -> Result { + Ok(Self { conn: std::net::TcpStream::connect(addr)?, txn: None }) } /// Call a server method - async fn call(&mut self, request: Request) -> Result { - self.conn.send(request).await?; - match self.conn.try_next().await? { - Some(result) => result, - None => Err(Error::Internal("Server disconnected".into())), - } + fn call(&mut self, request: Request) -> Result { + bincode::serialize_into(&mut self.conn, &request)?; + bincode::deserialize_from(&mut self.conn)? } /// Executes a query - pub async fn execute(&mut self, query: &str) -> Result { - let mut resultset = match self.call(Request::Execute(query.into())).await? { + pub fn execute(&mut self, query: &str) -> Result { + let mut resultset = match self.call(Request::Execute(query.into()))? { Response::Execute(rs) => rs, resp => return Err(Error::Internal(format!("Unexpected response {:?}", resp))), }; if let ResultSet::Query { columns, .. } = resultset { // FIXME We buffer rows for now to avoid lifetime hassles let mut rows = Vec::new(); - while let Some(result) = self.conn.try_next().await? { - match result? { + loop { + match bincode::deserialize_from::<_, Result>(&mut self.conn)?? { Response::Row(Some(row)) => rows.push(row), Response::Row(None) => break, response => { @@ -73,24 +53,24 @@ impl Client { } /// Fetches the table schema as SQL - pub async fn get_table(&mut self, table: &str) -> Result { - match self.call(Request::GetTable(table.into())).await? { + pub fn get_table(&mut self, table: &str) -> Result
{ + match self.call(Request::GetTable(table.into()))? { Response::GetTable(t) => Ok(t), resp => Err(Error::Value(format!("Unexpected response: {:?}", resp))), } } /// Lists database tables - pub async fn list_tables(&mut self) -> Result> { - match self.call(Request::ListTables).await? { + pub fn list_tables(&mut self) -> Result> { + match self.call(Request::ListTables)? { Response::ListTables(t) => Ok(t), resp => Err(Error::Value(format!("Unexpected response: {:?}", resp))), } } /// Checks server status - pub async fn status(&mut self) -> Result { - match self.call(Request::Status).await? { + pub fn status(&mut self) -> Result { + match self.call(Request::Status)? { Response::Status(s) => Ok(s), resp => Err(Error::Value(format!("Unexpected response: {:?}", resp))), } diff --git a/src/error.rs b/src/error.rs index 39de0a5f6..063c0b2b1 100644 --- a/src/error.rs +++ b/src/error.rs @@ -61,6 +61,30 @@ impl From for Error { } } +impl From for Error { + fn from(err: crossbeam::channel::RecvError) -> Self { + Error::Internal(err.to_string()) + } +} + +impl From> for Error { + fn from(err: crossbeam::channel::SendError) -> Self { + Error::Internal(err.to_string()) + } +} + +impl From for Error { + fn from(err: crossbeam::channel::TryRecvError) -> Self { + Error::Internal(err.to_string()) + } +} + +impl From> for Error { + fn from(err: crossbeam::channel::TrySendError) -> Self { + Error::Internal(err.to_string()) + } +} + impl From for Error { fn from(err: hdrhistogram::CreationError) -> Self { Error::Internal(err.to_string()) diff --git a/src/server.rs b/src/server.rs index fa551cc96..7c7360149 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,16 +5,13 @@ use crate::sql::engine::Engine as _; use crate::sql::execution::ResultSet; use crate::sql::schema::{Catalog as _, Table}; use crate::sql::types::Row; +use crate::storage::bincode; use ::log::{debug, error, info}; -use futures::sink::SinkExt as _; use serde_derive::{Deserialize, Serialize}; use std::collections::HashMap; -use tokio::net::{TcpListener, TcpStream}; +use tokio::net::TcpListener; use tokio::sync::mpsc; -use tokio_stream::wrappers::TcpListenerStream; -use tokio_stream::StreamExt as _; -use tokio_util::codec::{Framed, LengthDelimitedCodec}; /// A toyDB server. pub struct Server { @@ -33,7 +30,11 @@ impl Server { } /// Serves Raft and SQL requests until the returned future is dropped. Consumes the server. - pub async fn serve(self, raft_listener: TcpListener, sql_listener: TcpListener) -> Result<()> { + pub async fn serve( + self, + raft_listener: TcpListener, + sql_listener: std::net::TcpListener, + ) -> Result<()> { info!( "Listening on {} (SQL) and {} (Raft)", sql_listener.local_addr()?, @@ -42,28 +43,32 @@ impl Server { let (raft_tx, raft_rx) = mpsc::unbounded_channel(); - tokio::try_join!( - self.raft.serve(raft_listener, raft_rx), - Self::serve_sql(sql_listener, raft_tx), - )?; + tokio::task::spawn_blocking(move || Self::serve_sql(sql_listener, raft_tx)); + + tokio::try_join!(self.raft.serve(raft_listener, raft_rx))?; Ok(()) } /// Serves SQL clients. - async fn serve_sql(listener: TcpListener, raft_tx: raft::ClientSender) -> Result<()> { - let mut listener = TcpListenerStream::new(listener); - while let Some(socket) = listener.try_next().await? { - let peer = socket.peer_addr()?; - let session = Session::new(sql::engine::Raft::new(raft_tx.clone())); - tokio::spawn(async move { + fn serve_sql(listener: std::net::TcpListener, raft_tx: raft::ClientSender) { + std::thread::scope(|s| loop { + let (socket, peer) = match listener.accept() { + Ok(r) => r, + Err(err) => { + error!("Connection failed: {}", err); + continue; + } + }; + let raft_tx = raft_tx.clone(); + s.spawn(move || { + let session = Session::new(sql::engine::Raft::new(raft_tx)); info!("Client {} connected", peer); - match session.handle(socket).await { + match session.handle(socket) { Ok(()) => info!("Client {} disconnected", peer), Err(err) => error!("Client {} error: {}", peer, err), } }); - } - Ok(()) + }) } } @@ -99,18 +104,16 @@ impl Session { } /// Handles a client connection. - async fn handle(mut self, socket: TcpStream) -> Result<()> { - let mut stream = tokio_serde::Framed::new( - Framed::new(socket, LengthDelimitedCodec::new()), - tokio_serde::formats::Bincode::default(), - ); - while let Some(request) = stream.try_next().await? { - let mut response = tokio::task::block_in_place(|| self.request(request)); + fn handle(mut self, mut socket: std::net::TcpStream) -> Result<()> { + loop { + let request = bincode::deserialize_from(&socket)?; + let mut response = self.request(request); let mut rows: Box> + Send> = Box::new(std::iter::empty()); if let Ok(Response::Execute(ResultSet::Query { rows: ref mut resultrows, .. })) = &mut response { + // TODO: don't stream results, for simplicity. rows = Box::new( std::mem::replace(resultrows, Box::new(std::iter::empty())) .map(|result| result.map(|row| Response::Row(Some(row)))) @@ -126,12 +129,14 @@ impl Session { .fuse(), ); } - stream.send(response).await?; - stream.send_all(&mut tokio_stream::iter(rows.map(Ok))).await?; + + bincode::serialize_into(&mut socket, &response)?; + + for row in rows { + bincode::serialize_into(&mut socket, &row)?; + } } - Ok(()) } - /// Executes a request. pub fn request(&mut self, request: Request) -> Result { debug!("Processing request {:?}", request); diff --git a/tests/client/mod.rs b/tests/client/mod.rs index dc6b6bd16..cc15e7565 100644 --- a/tests/client/mod.rs +++ b/tests/client/mod.rs @@ -13,17 +13,14 @@ use toydb::Client; use pretty_assertions::assert_eq; use serial_test::serial; -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[tokio::test(flavor = "multi_thread")] #[serial] async fn get_table() -> Result<()> { let (mut c, _teardown) = setup::server_with_client(setup::movies()).await?; + assert_eq!(c.get_table("unknown"), Err(Error::Value("Table unknown does not exist".into()))); assert_eq!( - c.get_table("unknown").await, - Err(Error::Value("Table unknown does not exist".into())) - ); - assert_eq!( - c.get_table("movies").await?, + c.get_table("movies")?, schema::Table { name: "movies".into(), columns: vec![ @@ -108,7 +105,7 @@ async fn get_table() -> Result<()> { async fn list_tables() -> Result<()> { let (mut c, _teardown) = setup::server_with_client(setup::movies()).await?; - assert_eq!(c.list_tables().await?, vec!["countries", "genres", "movies", "studios"]); + assert_eq!(c.list_tables()?, vec!["countries", "genres", "movies", "studios"]); Ok(()) } @@ -118,7 +115,7 @@ async fn status() -> Result<()> { let (mut c, _teardown) = setup::server_with_client(setup::movies()).await?; assert_eq!( - c.status().await?, + c.status()?, Status { raft: raft::Status { server: 1, @@ -159,7 +156,7 @@ async fn execute() -> Result<()> { let (mut c, _teardown) = setup::server_with_client(setup::movies()).await?; // SELECT - let result = c.execute("SELECT * FROM genres").await?; + let result = c.execute("SELECT * FROM genres")?; assert_eq!( result, ResultSet::Query { @@ -176,7 +173,7 @@ async fn execute() -> Result<()> { ], ); - let result = c.execute("SELECT * FROM genres WHERE FALSE").await?; + let result = c.execute("SELECT * FROM genres WHERE FALSE")?; assert_eq!( result, ResultSet::Query { @@ -186,50 +183,41 @@ async fn execute() -> Result<()> { ); assert_rows(result, Vec::new()); - assert_eq!( - c.execute("SELECT * FROM x").await, - Err(Error::Value("Table x does not exist".into())) - ); + assert_eq!(c.execute("SELECT * FROM x"), Err(Error::Value("Table x does not exist".into()))); // INSERT assert_eq!( - c.execute("INSERT INTO genres VALUES (1, 'Western')").await, + c.execute("INSERT INTO genres VALUES (1, 'Western')"), Err(Error::Value("Primary key 1 already exists for table genres".into())), ); assert_eq!( - c.execute("INSERT INTO genres VALUES (9, 'Western')").await, + c.execute("INSERT INTO genres VALUES (9, 'Western')"), Ok(ResultSet::Create { count: 1 }), ); assert_eq!( - c.execute("INSERT INTO x VALUES (9, 'Western')").await, + c.execute("INSERT INTO x VALUES (9, 'Western')"), Err(Error::Value("Table x does not exist".into())) ); // UPDATE assert_eq!( - c.execute("UPDATE genres SET name = 'Horror' WHERE FALSE").await, + c.execute("UPDATE genres SET name = 'Horror' WHERE FALSE"), Ok(ResultSet::Update { count: 0 }), ); assert_eq!( - c.execute("UPDATE genres SET name = 'Horror' WHERE id = 9").await, + c.execute("UPDATE genres SET name = 'Horror' WHERE id = 9"), Ok(ResultSet::Update { count: 1 }), ); assert_eq!( - c.execute("UPDATE genres SET id = 1 WHERE id = 9").await, + c.execute("UPDATE genres SET id = 1 WHERE id = 9"), Err(Error::Value("Primary key 1 already exists for table genres".into())) ); // DELETE + assert_eq!(c.execute("DELETE FROM genres WHERE FALSE"), Ok(ResultSet::Delete { count: 0 }),); + assert_eq!(c.execute("DELETE FROM genres WHERE id = 9"), Ok(ResultSet::Delete { count: 1 }),); assert_eq!( - c.execute("DELETE FROM genres WHERE FALSE").await, - Ok(ResultSet::Delete { count: 0 }), - ); - assert_eq!( - c.execute("DELETE FROM genres WHERE id = 9").await, - Ok(ResultSet::Delete { count: 1 }), - ); - assert_eq!( - c.execute("DELETE FROM genres WHERE x = 1").await, + c.execute("DELETE FROM genres WHERE x = 1"), Err(Error::Value("Unknown field x".into())) ); @@ -244,76 +232,73 @@ async fn execute_txn() -> Result<()> { assert_eq!(c.txn(), None); // Committing a change in a txn should work - assert_eq!(c.execute("BEGIN").await?, ResultSet::Begin { version: 2, read_only: false }); + assert_eq!(c.execute("BEGIN")?, ResultSet::Begin { version: 2, read_only: false }); assert_eq!(c.txn(), Some((2, false))); - c.execute("INSERT INTO genres VALUES (4, 'Drama')").await?; - assert_eq!(c.execute("COMMIT").await?, ResultSet::Commit { version: 2 }); + c.execute("INSERT INTO genres VALUES (4, 'Drama')")?; + assert_eq!(c.execute("COMMIT")?, ResultSet::Commit { version: 2 }); assert_eq!(c.txn(), None); assert_row( - c.execute("SELECT * FROM genres WHERE id = 4").await?, + c.execute("SELECT * FROM genres WHERE id = 4")?, vec![Value::Integer(4), Value::String("Drama".into())], ); assert_eq!(c.txn(), None); // Rolling back a change in a txn should also work - assert_eq!(c.execute("BEGIN").await?, ResultSet::Begin { version: 3, read_only: false }); + assert_eq!(c.execute("BEGIN")?, ResultSet::Begin { version: 3, read_only: false }); assert_eq!(c.txn(), Some((3, false))); - c.execute("INSERT INTO genres VALUES (5, 'Musical')").await?; + c.execute("INSERT INTO genres VALUES (5, 'Musical')")?; assert_row( - c.execute("SELECT * FROM genres WHERE id = 5").await?, + c.execute("SELECT * FROM genres WHERE id = 5")?, vec![Value::Integer(5), Value::String("Musical".into())], ); - assert_eq!(c.execute("ROLLBACK").await?, ResultSet::Rollback { version: 3 }); - assert_rows(c.execute("SELECT * FROM genres WHERE id = 5").await?, Vec::new()); + assert_eq!(c.execute("ROLLBACK")?, ResultSet::Rollback { version: 3 }); + assert_rows(c.execute("SELECT * FROM genres WHERE id = 5")?, Vec::new()); assert_eq!(c.txn(), None); // Starting a read-only txn should block writes - assert_eq!( - c.execute("BEGIN READ ONLY").await?, - ResultSet::Begin { version: 4, read_only: true } - ); + assert_eq!(c.execute("BEGIN READ ONLY")?, ResultSet::Begin { version: 4, read_only: true }); assert_eq!(c.txn(), Some((4, true))); assert_row( - c.execute("SELECT * FROM genres WHERE id = 4").await?, + c.execute("SELECT * FROM genres WHERE id = 4")?, vec![Value::Integer(4), Value::String("Drama".into())], ); - assert_eq!(c.execute("INSERT INTO genres VALUES (5, 'Musical')").await, Err(Error::ReadOnly)); + assert_eq!(c.execute("INSERT INTO genres VALUES (5, 'Musical')"), Err(Error::ReadOnly)); assert_row( - c.execute("SELECT * FROM genres WHERE id = 4").await?, + c.execute("SELECT * FROM genres WHERE id = 4")?, vec![Value::Integer(4), Value::String("Drama".into())], ); - assert_eq!(c.execute("COMMIT").await?, ResultSet::Commit { version: 4 }); + assert_eq!(c.execute("COMMIT")?, ResultSet::Commit { version: 4 }); // Starting a time-travel txn should work, it shouldn't see recent changes, and it should // block writes assert_eq!( - c.execute("BEGIN READ ONLY AS OF SYSTEM TIME 2").await?, + c.execute("BEGIN READ ONLY AS OF SYSTEM TIME 2")?, ResultSet::Begin { version: 2, read_only: true }, ); assert_eq!(c.txn(), Some((2, true))); assert_rows( - c.execute("SELECT * FROM genres").await?, + c.execute("SELECT * FROM genres")?, vec![ vec![Value::Integer(1), Value::String("Science Fiction".into())], vec![Value::Integer(2), Value::String("Action".into())], vec![Value::Integer(3), Value::String("Comedy".into())], ], ); - assert_eq!(c.execute("INSERT INTO genres VALUES (5, 'Musical')").await, Err(Error::ReadOnly)); - assert_eq!(c.execute("COMMIT").await?, ResultSet::Commit { version: 2 }); + assert_eq!(c.execute("INSERT INTO genres VALUES (5, 'Musical')"), Err(Error::ReadOnly)); + assert_eq!(c.execute("COMMIT")?, ResultSet::Commit { version: 2 }); // A txn should still be usable after an error occurs - assert_eq!(c.execute("BEGIN").await?, ResultSet::Begin { version: 4, read_only: false }); - c.execute("INSERT INTO genres VALUES (5, 'Horror')").await?; + assert_eq!(c.execute("BEGIN")?, ResultSet::Begin { version: 4, read_only: false }); + c.execute("INSERT INTO genres VALUES (5, 'Horror')")?; assert_eq!( - c.execute("INSERT INTO genres VALUES (5, 'Musical')").await, + c.execute("INSERT INTO genres VALUES (5, 'Musical')"), Err(Error::Value("Primary key 5 already exists for table genres".into())) ); assert_eq!(c.txn(), Some((4, false))); - c.execute("INSERT INTO genres VALUES (6, 'Western')").await?; - assert_eq!(c.execute("COMMIT").await?, ResultSet::Commit { version: 4 }); + c.execute("INSERT INTO genres VALUES (6, 'Western')")?; + assert_eq!(c.execute("COMMIT")?, ResultSet::Commit { version: 4 }); assert_rows( - c.execute("SELECT * FROM genres").await?, + c.execute("SELECT * FROM genres")?, vec![ vec![Value::Integer(1), Value::String("Science Fiction".into())], vec![Value::Integer(2), Value::String("Action".into())], @@ -331,35 +316,32 @@ async fn execute_txn() -> Result<()> { #[serial] async fn execute_txn_concurrent() -> Result<()> { let (mut a, _teardown) = setup::server_with_client(setup::movies()).await?; - let mut b = Client::new("127.0.0.1:9605").await?; + let mut b = Client::new("127.0.0.1:9605")?; // Concurrent updates should throw a serialization failure on conflict. - assert_eq!(a.execute("BEGIN").await?, ResultSet::Begin { version: 2, read_only: false }); - assert_eq!(b.execute("BEGIN").await?, ResultSet::Begin { version: 3, read_only: false }); + assert_eq!(a.execute("BEGIN")?, ResultSet::Begin { version: 2, read_only: false }); + assert_eq!(b.execute("BEGIN")?, ResultSet::Begin { version: 3, read_only: false }); assert_row( - a.execute("SELECT * FROM genres WHERE id = 1").await?, + a.execute("SELECT * FROM genres WHERE id = 1")?, vec![Value::Integer(1), Value::String("Science Fiction".into())], ); assert_row( - b.execute("SELECT * FROM genres WHERE id = 1").await?, + b.execute("SELECT * FROM genres WHERE id = 1")?, vec![Value::Integer(1), Value::String("Science Fiction".into())], ); assert_eq!( - a.execute("UPDATE genres SET name = 'x' WHERE id = 1").await, + a.execute("UPDATE genres SET name = 'x' WHERE id = 1"), Ok(ResultSet::Update { count: 1 }) ); - assert_eq!( - b.execute("UPDATE genres SET name = 'y' WHERE id = 1").await, - Err(Error::Serialization) - ); + assert_eq!(b.execute("UPDATE genres SET name = 'y' WHERE id = 1"), Err(Error::Serialization)); - assert_eq!(a.execute("COMMIT").await, Ok(ResultSet::Commit { version: 2 })); - assert_eq!(b.execute("ROLLBACK").await, Ok(ResultSet::Rollback { version: 3 })); + assert_eq!(a.execute("COMMIT"), Ok(ResultSet::Commit { version: 2 })); + assert_eq!(b.execute("ROLLBACK"), Ok(ResultSet::Rollback { version: 3 })); assert_row( - a.execute("SELECT * FROM genres WHERE id = 1").await?, + a.execute("SELECT * FROM genres WHERE id = 1")?, vec![Value::Integer(1), Value::String("x".into())], ); diff --git a/tests/cluster/isolation.rs b/tests/cluster/isolation.rs index 53be778e7..cc67d2591 100644 --- a/tests/cluster/isolation.rs +++ b/tests/cluster/isolation.rs @@ -11,14 +11,14 @@ use serial_test::serial; async fn anomaly_dirty_write() -> Result<()> { let (mut a, mut b, _, _teardown) = setup::cluster_simple().await?; - a.execute("BEGIN").await?; - a.execute("INSERT INTO test VALUES (1, 'a')").await?; + a.execute("BEGIN")?; + a.execute("INSERT INTO test VALUES (1, 'a')")?; - assert_eq!(b.execute("INSERT INTO test VALUES (1, 'b')").await, Err(Error::Serialization)); + assert_eq!(b.execute("INSERT INTO test VALUES (1, 'b')"), Err(Error::Serialization)); - a.execute("COMMIT").await?; + a.execute("COMMIT")?; assert_row( - a.execute("SELECT * FROM test WHERE id = 1").await?, + a.execute("SELECT * FROM test WHERE id = 1")?, vec![Value::Integer(1), Value::String("a".into())], ); @@ -31,10 +31,10 @@ async fn anomaly_dirty_write() -> Result<()> { async fn anomaly_dirty_read() -> Result<()> { let (mut a, mut b, _, _teardown) = setup::cluster_simple().await?; - a.execute("BEGIN").await?; - a.execute("INSERT INTO test VALUES (1, 'a')").await?; + a.execute("BEGIN")?; + a.execute("INSERT INTO test VALUES (1, 'a')")?; - assert_rows(b.execute("SELECT * FROM test").await?, vec![]); + assert_rows(b.execute("SELECT * FROM test")?, vec![]); Ok(()) } @@ -45,20 +45,17 @@ async fn anomaly_dirty_read() -> Result<()> { async fn anomaly_lost_update() -> Result<()> { let (mut a, mut b, mut c, _teardown) = setup::cluster_simple().await?; - c.execute("INSERT INTO test VALUES (1, 'c')").await?; + c.execute("INSERT INTO test VALUES (1, 'c')")?; - a.execute("BEGIN").await?; - b.execute("BEGIN").await?; + a.execute("BEGIN")?; + b.execute("BEGIN")?; - a.execute("UPDATE test SET value = 'a' WHERE id = 1").await?; - assert_eq!( - b.execute("UPDATE test SET value = 'b' WHERE id = 1").await, - Err(Error::Serialization) - ); - a.execute("COMMIT").await?; + a.execute("UPDATE test SET value = 'a' WHERE id = 1")?; + assert_eq!(b.execute("UPDATE test SET value = 'b' WHERE id = 1"), Err(Error::Serialization)); + a.execute("COMMIT")?; assert_row( - c.execute("SELECT * FROM test WHERE id = 1").await?, + c.execute("SELECT * FROM test WHERE id = 1")?, vec![Value::Integer(1), Value::String("a".into())], ); @@ -71,19 +68,19 @@ async fn anomaly_lost_update() -> Result<()> { async fn anomaly_fuzzy_read() -> Result<()> { let (mut a, mut b, mut c, _teardown) = setup::cluster_simple().await?; - c.execute("INSERT INTO test VALUES (1, 'c')").await?; + c.execute("INSERT INTO test VALUES (1, 'c')")?; - a.execute("BEGIN").await?; - b.execute("BEGIN").await?; + a.execute("BEGIN")?; + b.execute("BEGIN")?; assert_row( - b.execute("SELECT * FROM test WHERE id = 1").await?, + b.execute("SELECT * FROM test WHERE id = 1")?, vec![Value::Integer(1), Value::String("c".into())], ); - a.execute("UPDATE test SET value = 'a' WHERE id = 1").await?; - a.execute("COMMIT").await?; + a.execute("UPDATE test SET value = 'a' WHERE id = 1")?; + a.execute("COMMIT")?; assert_row( - b.execute("SELECT * FROM test WHERE id = 1").await?, + b.execute("SELECT * FROM test WHERE id = 1")?, vec![Value::Integer(1), Value::String("c".into())], ); @@ -96,19 +93,19 @@ async fn anomaly_fuzzy_read() -> Result<()> { async fn anomaly_read_skew() -> Result<()> { let (mut a, mut b, mut c, _teardown) = setup::cluster_simple().await?; - c.execute("INSERT INTO test VALUES (1, 'c'), (2, 'c')").await?; + c.execute("INSERT INTO test VALUES (1, 'c'), (2, 'c')")?; - a.execute("BEGIN").await?; - b.execute("BEGIN").await?; + a.execute("BEGIN")?; + b.execute("BEGIN")?; assert_row( - a.execute("SELECT * FROM test WHERE id = 1").await?, + a.execute("SELECT * FROM test WHERE id = 1")?, vec![Value::Integer(1), Value::String("c".into())], ); - b.execute("UPDATE test SET value = 'b' WHERE id = 2").await?; - b.execute("COMMIT").await?; + b.execute("UPDATE test SET value = 'b' WHERE id = 2")?; + b.execute("COMMIT")?; assert_row( - a.execute("SELECT * FROM test WHERE id = 2").await?, + a.execute("SELECT * FROM test WHERE id = 2")?, vec![Value::Integer(2), Value::String("c".into())], ); @@ -122,19 +119,19 @@ async fn anomaly_read_skew() -> Result<()> { async fn anomaly_phantom_read() -> Result<()> { let (mut a, mut b, mut c, _teardown) = setup::cluster_simple().await?; - c.execute("INSERT INTO test VALUES (1, 'true'), (2, 'false')").await?; + c.execute("INSERT INTO test VALUES (1, 'true'), (2, 'false')")?; - a.execute("BEGIN").await?; - b.execute("BEGIN").await?; + a.execute("BEGIN")?; + b.execute("BEGIN")?; assert_rows( - a.execute("SELECT * FROM test WHERE value = 'true'").await?, + a.execute("SELECT * FROM test WHERE value = 'true'")?, vec![vec![Value::Integer(1), Value::String("true".into())]], ); - b.execute("UPDATE test SET value = 'true' WHERE id = 2").await?; - b.execute("COMMIT").await?; + b.execute("UPDATE test SET value = 'true' WHERE id = 2")?; + b.execute("COMMIT")?; assert_rows( - a.execute("SELECT * FROM test WHERE value = 'true'").await?, + a.execute("SELECT * FROM test WHERE value = 'true'")?, vec![vec![Value::Integer(1), Value::String("true".into())]], ); diff --git a/tests/cluster/recovery.rs b/tests/cluster/recovery.rs index d6b02af27..77bd5f8a2 100644 --- a/tests/cluster/recovery.rs +++ b/tests/cluster/recovery.rs @@ -11,14 +11,14 @@ use serial_test::serial; async fn client_disconnect_rollback() -> Result<()> { let (mut a, mut b, _, _teardown) = setup::cluster_simple().await?; - a.execute("BEGIN").await?; - a.execute("INSERT INTO test VALUES (1, 'a')").await?; + a.execute("BEGIN")?; + a.execute("INSERT INTO test VALUES (1, 'a')")?; std::mem::drop(a); // This would fail with a serialization error if the txn is not rolled back. - b.execute("INSERT INTO test VALUES (1, 'b')").await?; + b.execute("INSERT INTO test VALUES (1, 'b')")?; assert_row( - b.execute("SELECT * FROM test WHERE id = 1").await?, + b.execute("SELECT * FROM test WHERE id = 1")?, vec![Value::Integer(1), Value::String("b".into())], ); @@ -30,18 +30,18 @@ async fn client_disconnect_rollback() -> Result<()> { async fn client_commit_error() -> Result<()> { let (mut a, mut b, _, _teardown) = setup::cluster_simple().await?; - a.execute("BEGIN").await?; - a.execute("INSERT INTO test VALUES (1, 'a')").await?; + a.execute("BEGIN")?; + a.execute("INSERT INTO test VALUES (1, 'a')")?; // When B gets a serialization error, it should still be in the txn and able to roll it back. - b.execute("BEGIN").await?; - b.execute("INSERT INTO test VALUES (2, 'b')").await?; - assert_eq!(b.execute("INSERT INTO test VALUES (1, 'b')").await, Err(Error::Serialization)); - b.execute("ROLLBACK").await?; + b.execute("BEGIN")?; + b.execute("INSERT INTO test VALUES (2, 'b')")?; + assert_eq!(b.execute("INSERT INTO test VALUES (1, 'b')"), Err(Error::Serialization)); + b.execute("ROLLBACK")?; // Once rolled back, A should be able to write ID 2 and commit. - a.execute("INSERT INTO test VALUES (2, 'a')").await?; - a.execute("COMMIT").await?; + a.execute("INSERT INTO test VALUES (2, 'a')")?; + a.execute("COMMIT")?; Ok(()) } diff --git a/tests/setup.rs b/tests/setup.rs index 2f7c9e695..a03c63ef7 100644 --- a/tests/setup.rs +++ b/tests/setup.rs @@ -80,7 +80,7 @@ pub async fn server( let raft_log = raft::Log::new(storage::engine::BitCask::new(dir.path().join("log"))?, false)?; let raft_state = Box::new(sql::engine::Raft::new_state(storage::engine::Memory::new())?); let raft_listener = TcpListener::bind(addr_raft).await?; - let sql_listener = TcpListener::bind(addr_sql).await?; + let sql_listener = std::net::TcpListener::bind(addr_sql)?; let (task, abort) = Server::new(id, peers, raft_log, raft_state)? .serve(raft_listener, sql_listener) @@ -97,13 +97,13 @@ pub async fn server( /// Sets up a server with a client pub async fn server_with_client(queries: Vec<&str>) -> Result<(Client, Teardown)> { let teardown = server(1, "127.0.0.1:9605", "127.0.0.1:9705", HashMap::new()).await?; - let mut client = Client::new("127.0.0.1:9605").await?; + let mut client = Client::new("127.0.0.1:9605")?; if !queries.is_empty() { - client.execute("BEGIN").await?; + client.execute("BEGIN")?; for query in queries { - client.execute(query).await?; + client.execute(query)?; } - client.execute("COMMIT").await?; + client.execute("COMMIT")?; } Ok((client, teardown)) } @@ -123,8 +123,8 @@ pub async fn cluster(nodes: HashMap) -> Result match client.status().await { + match Client::new(addr_sql) { + Ok(mut client) => match client.status() { Ok(status) if status.raft.leader > 0 => break, Ok(_) => log::error!("no leader"), Err(err) => log::error!("Status failed for {}: {}", id, err), @@ -151,18 +151,18 @@ pub async fn cluster_with_clients(size: u8, queries: Vec<&str>) -> Result<(Vec::new(); for (id, (addr_sql, _)) in nodes { - let mut client = Client::new(addr_sql).await?; - assert_eq!(id, client.status().await?.raft.server); + let mut client = Client::new(addr_sql)?; + assert_eq!(id, client.status()?.raft.server); clients.push(client); } if !queries.is_empty() { let c = clients.get_mut(0).unwrap(); - c.execute("BEGIN").await?; + c.execute("BEGIN")?; for query in queries { - c.execute(query).await?; + c.execute(query)?; } - c.execute("COMMIT").await?; + c.execute("COMMIT")?; } Ok((clients, teardown))