Commit b12cd20

wip

erikgrinaker committed Apr 8, 2024
1 parent f71eda4
Showing 8 changed files with 207 additions and 92 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Generated lockfile; diff not rendered.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -47,6 +47,7 @@ tokio-util = { version = "~0.7.8", features = ["codec"] }
 uuid = { version = "~1.8.0", features = ["v4"] }

 [dev-dependencies]
+escargot = "0.5.10"
 goldenfile = "~1.6.0"
 paste = "~1.0.14"
 pretty_assertions = "~1.4.0"
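escargot is the new dev-dependency that tests/e2e/mod.rs (below) uses to build and launch the toydb binary from inside the test suite. A minimal sketch of the calls involved — spawn_toydb is a hypothetical helper for illustration, not part of this commit:

```rust
// Hypothetical helper showing the escargot calls this commit relies on:
// CargoBuild::run() builds the `toydb` binary (roughly `cargo build --bin
// toydb`) and returns a handle that mints std::process::Command instances.
fn spawn_toydb(config_path: &str) -> std::io::Result<std::process::Child> {
    let build = escargot::CargoBuild::new()
        .bin("toydb")
        .run()
        .expect("failed to build toydb binary");
    build.command().args(["-c", config_path]).spawn()
}
```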
2 changes: 0 additions & 2 deletions tests/cluster/mod.rs

This file was deleted.

13 changes: 10 additions & 3 deletions tests/cluster/isolation.rs → tests/e2e/isolation.rs
@@ -1,4 +1,5 @@
-use super::super::{assert_row, assert_rows, setup};
+use super::super::assert_row;
+use super::TestCluster;

 use toydb::error::{Error, Result};
 use toydb::sql::types::Value;
@@ -8,8 +9,10 @@ use serial_test::serial;
 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
 #[serial]
 // A dirty write is when b overwrites an uncommitted value written by a.
-async fn anomaly_dirty_write() -> Result<()> {
-    let (mut a, mut b, _, _teardown) = setup::cluster_simple().await?;
+async fn dirty_write() -> Result<()> {
+    let tc = TestCluster::run(5).await?;
+    let mut a = tc.connect_any().await?;
+    let mut b = tc.connect_any().await?;

     a.execute("BEGIN").await?;
     a.execute("INSERT INTO test VALUES (1, 'a')").await?;
@@ -25,6 +28,8 @@ async fn anomaly_dirty_write() -> Result<()> {
     Ok(())
 }

+/*
 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
 #[serial]
 // A dirty read is when b can read an uncommitted value set by a.
@@ -142,3 +147,5 @@ async fn anomaly_phantom_read() -> Result<()> {
 }
 // FIXME We should test write skew, but we need to implement serializable snapshot isolation first.
+*/
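The `@@ -25,6 +28,8 @@` hunk folds the middle of dirty_write. Judging from the imports (Error, Value, assert_row), it plausibly asserts that b's conflicting write fails and a's value wins; the following is a hedged sketch of that elided body, not the commit's actual code, and the assert_row signature is assumed:

```rust
    // Sketch (assumed): b's write to the row a has uncommitted should fail
    // with a serialization error, and a's value should be visible once a
    // commits.
    assert_eq!(b.execute("INSERT INTO test VALUES (1, 'b')").await, Err(Error::Serialization));
    a.execute("COMMIT").await?;
    assert_row(
        a.execute("SELECT * FROM test WHERE id = 1").await?,
        vec![Value::Integer(1), Value::String("a".into())],
    );
```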
174 changes: 174 additions & 0 deletions tests/e2e/mod.rs
@@ -0,0 +1,174 @@
//! End-to-end tests for toyDB. These spin up toyDB clusters as separate child
//! processes using a built binary.
//!
//! TODO: this should be rewritten as data-driven golden master tests.
mod isolation;
mod recovery;

use lazy_static::lazy_static;
use rand::Rng;
use toydb::error::Result;
use toydb::raft::NodeID;
use toydb::Client;

lazy_static! {
    static ref DATASET_TEST_TABLE: Vec<String> =
        vec!["CREATE TABLE test (id INTEGER PRIMARY KEY, value STRING)".to_string(),];
}

/// Runs a toyDB cluster using the built binary in a temporary directory. The
/// cluster will be killed and removed when dropped.
///
/// This runs the cluster as child processes using the built binary instead of
/// spawning in-memory threads for a couple of reasons: it avoids having to
/// gracefully shut down the server (which is complicated by e.g.
/// TcpListener::accept() not being interruptible), and it tests the entire
/// server (and optionally the toySQL client) end-to-end.
pub struct TestCluster {
    nodes: u8,
    dir: tempdir::TempDir,
    children: std::collections::HashMap<NodeID, std::process::Child>,
}

impl TestCluster {
    const SQL_BASE_PORT: u16 = 9600;
    const RAFT_BASE_PORT: u16 = 9700;

    /// Creates a new test cluster.
    pub fn new(nodes: u8) -> Result<Self> {
        Ok(Self {
            nodes,
            dir: tempdir::TempDir::new("toydb")?,
            children: std::collections::HashMap::new(),
        })
    }

    /// Creates a new test cluster and starts it.
    pub async fn run(nodes: u8) -> Result<Self> {
        let mut tc = Self::new(nodes)?;
        tc.start().await?;
        Ok(tc)
    }

    /// Creates a new test cluster, starts it, and imports an initial dataset.
    pub async fn run_with(nodes: u8, init: Vec<String>) -> Result<Self> {
        let tc = Self::run(nodes).await?;

        let mut c = tc.connect_any().await?;
        c.execute("BEGIN").await?;
        for stmt in init {
            c.execute(&stmt).await?;
        }
        c.execute("COMMIT").await?;

        Ok(tc)
    }

    /// Returns an iterator over the cluster node IDs.
    fn ids(&self) -> impl Iterator<Item = NodeID> {
        1..=self.nodes
    }

    /// Asserts that the given node ID exists.
    fn assert_id(&self, id: NodeID) {
        assert!(id > 0 && id <= self.nodes, "invalid node ID {}", id)
    }

    /// Returns the path to the given node's directory.
    fn node_path(&self, id: NodeID) -> std::path::PathBuf {
        self.assert_id(id);
        self.dir.path().join(format!("toydb{}", id))
    }

    /// Generates a config file for the given node.
    fn node_config(&self, id: NodeID) -> String {
        self.assert_id(id);
        let mut cfg = String::new();
        cfg.push_str(&format!("id: {}\n", id));
        cfg.push_str(&format!("data_dir: {}\n", self.node_path(id).to_string_lossy()));
        cfg.push_str(&format!("listen_sql: {}\n", self.node_address_sql(id)));
        cfg.push_str(&format!("listen_raft: {}\n", self.node_address_raft(id)));
        cfg.push_str("peers:\n");
        for peer in self.ids().filter(|p| p != &id) {
            cfg.push_str(&format!(" '{}': {}\n", peer, self.node_address_raft(peer)))
        }
        cfg
    }

    /// Returns the given node's Raft TCP address.
    fn node_address_raft(&self, id: NodeID) -> String {
        self.assert_id(id);
        format!("localhost:{}", Self::RAFT_BASE_PORT + id as u16)
    }

    /// Returns the given node's SQL TCP address.
    fn node_address_sql(&self, id: NodeID) -> String {
        self.assert_id(id);
        format!("localhost:{}", Self::SQL_BASE_PORT + id as u16)
    }

    /// Starts the test cluster. It keeps running until the cluster is dropped.
    ///
    /// TODO: this only uses async because Client is still async. Remove it.
    pub async fn start(&mut self) -> Result<()> {
        // Build the binary.
        let build = escargot::CargoBuild::new().bin("toydb").run().expect("Failed to build binary");

        // Spawn nodes.
        for id in self.ids() {
            // Create node directory and config file.
            std::fs::create_dir_all(&self.node_path(id))?;
            std::fs::write(&self.node_path(id).join("toydb.yaml"), self.node_config(id))?;

            // Spawn node.
            let child = build
                .command()
                .args(vec!["-c", &self.node_path(id).join("toydb.yaml").to_string_lossy()])
                .spawn()?;
            self.children.insert(id, child);
        }

        // Wait for all nodes to be ready by connecting to them and fetching
        // the cluster status.
        const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
        const COOLDOWN: std::time::Duration = std::time::Duration::from_millis(100);

        let deadline = std::time::Instant::now().checked_add(TIMEOUT).unwrap();
        for id in self.ids() {
            while let Err(e) = async { self.connect(id).await?.status().await }.await {
                if std::time::Instant::now() >= deadline {
                    return Err(e);
                }
                tokio::time::sleep(COOLDOWN).await
            }
        }

        Ok(())
    }

    /// Connects to the given cluster node.
    pub async fn connect(&self, id: NodeID) -> Result<Client> {
        self.assert_id(id);
        Client::new(self.node_address_sql(id)).await
    }

    /// Connects to a random cluster node.
    pub async fn connect_any(&self) -> Result<Client> {
        self.connect(rand::thread_rng().gen_range(1..=self.nodes)).await
    }
}

impl Drop for TestCluster {
    /// Kills the child processes when the cluster is dropped. The temp dir is
    /// removed by TempDir::drop().
    ///
    /// Note that cargo will itself kill all child processes if the tests are
    /// aborted via e.g. Ctrl-C: https://github.com/rust-lang/cargo/issues/5598
    fn drop(&mut self) {
        for (_, mut child) in self.children.drain() {
            child.kill().expect("Failed to kill node");
            child.wait().expect("Failed to wait for node to terminate");
        }
    }
}
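For reference, node_config() above renders a small YAML file per node. For node 1 of a 3-node cluster it would produce something like the following — the temp dir path is illustrative, and the single-space indent under peers is what the format string actually emits:

```yaml
id: 1
data_dir: /tmp/toydb.abc123/toydb1
listen_sql: localhost:9601
listen_raft: localhost:9701
peers:
 '2': localhost:9702
 '3': localhost:9703
```

Since the SQL and Raft base ports are fixed (9600/9700), two clusters cannot run concurrently — presumably why the tests are marked #[serial].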
11 changes: 8 additions & 3 deletions tests/cluster/recovery.rs → tests/e2e/recovery.rs
@@ -1,4 +1,5 @@
-use super::super::{assert_row, setup};
+use super::super::assert_row;
+use super::{TestCluster, DATASET_TEST_TABLE};

 use toydb::error::{Error, Result};
 use toydb::sql::types::Value;
@@ -9,7 +10,9 @@ use serial_test::serial;
 #[serial]
 // A client disconnect or termination should roll back its transaction.
 async fn client_disconnect_rollback() -> Result<()> {
-    let (mut a, mut b, _, _teardown) = setup::cluster_simple().await?;
+    let tc = TestCluster::run_with(5, DATASET_TEST_TABLE.to_vec()).await?;
+    let mut a = tc.connect_any().await?;
+    let mut b = tc.connect_any().await?;

     a.execute("BEGIN").await?;
     a.execute("INSERT INTO test VALUES (1, 'a')").await?;
@@ -28,7 +31,9 @@ async fn client_disconnect_rollback() -> Result<()> {
 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
 #[serial]
 async fn client_commit_error() -> Result<()> {
-    let (mut a, mut b, _, _teardown) = setup::cluster_simple().await?;
+    let tc = TestCluster::run(5).await?;
+    let mut a = tc.connect_any().await?;
+    let mut b = tc.connect_any().await?;

     a.execute("BEGIN").await?;
     a.execute("INSERT INTO test VALUES (1, 'a')").await?;
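Both tests pick random nodes via connect_any(); when a test needs a deterministic node, connect(id) works too. A hedged sketch of such a test, not part of this commit — the status().raft.server field path is taken from the removed tests/setup.rs code further down this diff:

```rust
// Sketch: connect to node 1 explicitly and verify it reports its own ID.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn connect_specific_node() -> Result<()> {
    let tc = TestCluster::run(3).await?;
    let mut c = tc.connect(1).await?;
    assert_eq!(c.status().await?.raft.server, 1);
    Ok(())
}
```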
83 changes: 0 additions & 83 deletions tests/setup.rs
@@ -6,9 +6,7 @@ use toydb::server::Server;
 use toydb::{raft, sql, storage};

 use futures_util::future::FutureExt as _;
-use pretty_assertions::assert_eq;
 use std::collections::HashMap;
-use std::time::Duration;
 use tempdir::TempDir;
 use tokio::net::TcpListener;

@@ -64,11 +62,6 @@ pub fn movies() -> Vec<&'static str> {
     ]
 }

-/// Simple data
-pub fn simple() -> Vec<&'static str> {
-    vec!["CREATE TABLE test (id INTEGER PRIMARY KEY, value STRING)"]
-}
-
 /// Sets up a test server
 pub async fn server(
     id: raft::NodeID,
@@ -108,76 +101,6 @@ pub async fn server_with_client(queries: Vec<&str>) -> Result<(Client, Teardown)
     Ok((client, teardown))
 }

-/// Sets up a server cluster.
-pub async fn cluster(nodes: HashMap<raft::NodeID, (String, String)>) -> Result<Teardown> {
-    let mut teardown = Teardown::empty();
-    for (id, (addr_sql, addr_raft)) in nodes.iter() {
-        let peers = nodes
-            .iter()
-            .filter(|(i, _)| i != &id)
-            .map(|(id, (_, raft))| (*id, raft.clone()))
-            .collect();
-        teardown.merge(server(*id, addr_sql, addr_raft, peers).await?);
-    }
-
-    // Wait for nodes to have a leader.
-    for (id, (addr_sql, _)) in nodes.iter() {
-        for _ in 0..10 {
-            match Client::new(addr_sql).await {
-                Ok(mut client) => match client.status().await {
-                    Ok(status) if status.raft.leader > 0 => break,
-                    Ok(_) => log::error!("no leader"),
-                    Err(err) => log::error!("Status failed for {}: {}", id, err),
-                },
-                Err(err) => log::error!("Client failed for {}: {}", id, err),
-            }
-            tokio::time::sleep(Duration::from_millis(100)).await
-        }
-    }
-
-    Ok(teardown)
-}
-
-/// Sets up a server cluster with clients
-pub async fn cluster_with_clients(size: u8, queries: Vec<&str>) -> Result<(Vec<Client>, Teardown)> {
-    let mut nodes = HashMap::new();
-    for i in 1..=size {
-        nodes.insert(
-            i,
-            (format!("127.0.0.1:{}", 9605 + i as u64), format!("127.0.0.1:{}", 9705 + i as u64)),
-        );
-    }
-    let teardown = cluster(nodes.clone()).await?;
-
-    let mut clients = Vec::<Client>::new();
-    for (id, (addr_sql, _)) in nodes {
-        let mut client = Client::new(addr_sql).await?;
-        assert_eq!(id, client.status().await?.raft.server);
-        clients.push(client);
-    }
-
-    if !queries.is_empty() {
-        let c = clients.get_mut(0).unwrap();
-        c.execute("BEGIN").await?;
-        for query in queries {
-            c.execute(query).await?;
-        }
-        c.execute("COMMIT").await?;
-    }
-
-    Ok((clients, teardown))
-}
-
-/// Sets up a simple cluster with 3 clients and a test table
-pub async fn cluster_simple() -> Result<(Client, Client, Client, Teardown)> {
-    let (mut clients, teardown) = cluster_with_clients(3, simple()).await?;
-    let a = clients.remove(0);
-    let b = clients.remove(0);
-    let c = clients.remove(0);
-
-    Ok((a, b, c, teardown))
-}
-
 /// Tears down a test fixture when dropped.
 pub struct Teardown {
     fns: Vec<Box<dyn FnOnce()>>,
@@ -197,12 +120,6 @@ impl Teardown {
     fn on_drop<F: FnOnce() + 'static>(&mut self, f: F) {
         self.fns.push(Box::new(f))
     }
-
-    fn merge(&mut self, mut other: Teardown) {
-        while !other.fns.is_empty() {
-            self.fns.push(other.fns.remove(0))
-        }
-    }
 }

 impl Drop for Teardown {
2 changes: 1 addition & 1 deletion tests/tests.rs
@@ -1,7 +1,7 @@
 #![warn(clippy::all)]

 mod client;
-mod cluster;
+mod e2e;
 mod setup;
 mod sql;
