From c46790fb23ca531348c6c7b6aaa792823bc2dfae Mon Sep 17 00:00:00 2001 From: Valerii Ponomarov Date: Tue, 27 Feb 2024 18:03:02 +0200 Subject: [PATCH 01/22] Add 'text' latte function for getting random strings in rune scripts Usage is as following: pub async fn load(ctxt, i) { ... let randomstring = latte::text(i, 64); ... --- src/context.rs | 11 +++++++++++ src/workload.rs | 1 + 2 files changed, 12 insertions(+) diff --git a/src/context.rs b/src/context.rs index 72ea14c..5c2cdd1 100644 --- a/src/context.rs +++ b/src/context.rs @@ -613,6 +613,17 @@ pub fn blob(seed: i64, len: usize) -> rune::runtime::Bytes { rune::runtime::Bytes::from_vec(v) } +/// Generates random string of given length. +/// Parameter `seed` is used to seed the RNG. +pub fn text(seed: i64, len: usize) -> rune::runtime::StaticString { + let mut rng = StdRng::seed_from_u64(seed as u64); + let s: String = (0..len).map(|_| { + let code_point = rng.gen_range(0x0061u32..=0x007Au32); // Unicode range for 'a-z' + std::char::from_u32(code_point).unwrap() + }).collect(); + rune::runtime::StaticString::new(s) +} + /// Generates 'now' timestamp pub fn now_timestamp() -> i64 { Utc::now().timestamp() diff --git a/src/workload.rs b/src/workload.rs index 9890581..ebefc0e 100644 --- a/src/workload.rs +++ b/src/workload.rs @@ -128,6 +128,7 @@ impl Program { let mut latte_module = Module::with_crate("latte"); latte_module.function(&["blob"], context::blob).unwrap(); + latte_module.function(&["text"], context::text).unwrap(); latte_module.function(&["now_timestamp"], context::now_timestamp).unwrap(); latte_module.function(&["hash"], context::hash).unwrap(); latte_module.function(&["hash2"], context::hash2).unwrap(); From 089d24e13d8de8d23b780b566120f2147d8767df Mon Sep 17 00:00:00 2001 From: Valerii Ponomarov Date: Thu, 22 Feb 2024 10:20:24 +0200 Subject: [PATCH 02/22] Simplify usage of latte's 'connect' function from the 'context' module Before there were 2 separate fn calls in the 'src/main.rs' module: let session = context::connect(conf).await?; let session = Context::new(session); With this change we simplify it the way that we will do only one call: let session = context::connect(conf).await?; --- src/context.rs | 7 ++++--- src/main.rs | 1 - 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/context.rs b/src/context.rs index 5c2cdd1..2a51c39 100644 --- a/src/context.rs +++ b/src/context.rs @@ -55,13 +55,13 @@ fn ssl_context(conf: &&ConnectionConf) -> Result, CassError> } /// Configures connection to Cassandra. -pub async fn connect(conf: &ConnectionConf) -> Result { +pub async fn connect(conf: &ConnectionConf) -> Result { let profile = ExecutionProfile::builder() .consistency(conf.consistency.scylla_consistency()) .request_timeout(Some(Duration::from_secs(60))) // no request timeout .build(); - SessionBuilder::new() + let scylla_session = SessionBuilder::new() .known_nodes(&conf.addresses) .pool_size(PoolSize::PerShard(conf.count)) .user(&conf.user, &conf.password) @@ -69,7 +69,8 @@ pub async fn connect(conf: &ConnectionConf) -> Result PathBuf { async fn connect(conf: &ConnectionConf) -> Result<(Context, Option)> { eprintln!("info: Connecting to {:?}... 
", conf.addresses); let session = context::connect(conf).await?; - let session = Context::new(session); let cluster_info = session.cluster_info().await?; eprintln!( "info: Connected to {} running Cassandra version {}", From 05593388a3a35ae061b053b108b0cce9037c1551 Mon Sep 17 00:00:00 2001 From: Valerii Ponomarov Date: Mon, 19 Feb 2024 19:01:26 +0200 Subject: [PATCH 03/22] Add retry mechanism With this change it is now possible to configure retry approach for queries. Following new options are available: --retry-number=10 --retry-interval=200ms | --retry-interval=100ms,3s --request-timeout=5 The '--retry-number' option allows to configure number of retries to be applied in case of query failures. Default is '10'. The '--retry-interval' option may store one or two time values separated with comma. Values may have 'ms' (milliseconds), 's' (seconds) units. If 2 are specified then it will be 'minimum' and 'maximum' waiting interval with exponential growth based on the made attempts. Default is '100ms,5s'. The '--request-timeout' allows to configure the time for a query after which it is considered as 'failed'. Measured in seconds. Default is '5'. Print only 5 retry error messages per sample interval. Also, hide long 'string' and 'blob' values from the retry error messages. --- src/config.rs | 70 +++++++++++++++++++ src/context.rs | 181 ++++++++++++++++++++++++++++++++++++++++++------- src/report.rs | 33 ++++++++- src/stats.rs | 14 +++- 4 files changed, 271 insertions(+), 27 deletions(-) diff --git a/src/config.rs b/src/config.rs index 3a0f3e1..7799109 100644 --- a/src/config.rs +++ b/src/config.rs @@ -10,6 +10,9 @@ use clap::builder::PossibleValue; use clap::{Parser, ValueEnum}; use serde::{Deserialize, Serialize}; +/// Limit of retry errors to be kept and then printed in scope of a sampling interval +pub const PRINT_RETRY_ERROR_LIMIT: u64 = 5; + /// Parse a single key-value pair fn parse_key_val(s: &str) -> Result<(T, U), anyhow::Error> where @@ -81,6 +84,63 @@ impl FromStr for Interval { } } +/// Controls the min and max retry interval for retry mechanism +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] +pub struct RetryInterval { + pub min_ms: u64, + pub max_ms: u64, +} + +impl RetryInterval { + pub fn new(time: &str) -> Option { + let values: Vec<&str> = time.split(',').collect(); + if values.len() > 2 { + return None; + } + let min_ms = RetryInterval::parse_time(values.get(0).unwrap_or(&""))?; + let max_ms = RetryInterval::parse_time(values.get(1).unwrap_or(&"")).unwrap_or(min_ms); + if min_ms > max_ms { + None + } else { + Some(RetryInterval { min_ms, max_ms }) + } + } + + fn parse_time(time: &str) -> Option { + let trimmed_time = time.trim(); + if trimmed_time.is_empty() { + return None; + } + + let value_str = match trimmed_time { + s if s.ends_with("ms") => s.trim_end_matches("ms"), + s if s.ends_with('s') => { + let num = s.trim_end_matches('s').parse::().ok()?; + return Some(num * 1000); + } + _ => trimmed_time, + }; + + let value = value_str.trim().parse::().ok()?; + Some(value) + } +} + +impl FromStr for RetryInterval { + type Err = String; + + fn from_str(s: &str) -> Result { + if let Some(interval) = RetryInterval::new(s) { + Ok(interval) + } else { + Err(concat!( + "Expected 1 or 2 parts separated by comma such as '500ms' or '200ms,5s' or '1s'.", + " First value cannot be bigger than second one.", + ).to_string()) + } + } +} + #[derive(Parser, Debug, Serialize, Deserialize)] pub struct ConnectionConf { /// Number of connections per Cassandra node / Scylla shard. 
@@ -123,6 +183,16 @@ pub struct ConnectionConf { /// Default CQL query consistency level #[clap(long("consistency"), required = false, default_value = "LOCAL_QUORUM")] pub consistency: Consistency, + + #[clap(long("request-timeout"), default_value = "5", value_name = "COUNT")] + pub request_timeout: NonZeroUsize, + + #[clap(long("retry-number"), default_value = "10", value_name = "COUNT")] + pub retry_number: u64, + + #[clap(long("retry-interval"), default_value = "100ms,5s", value_name = "TIME[,TIME]")] + pub retry_interval: RetryInterval, + } #[derive(Clone, Copy, Default, Debug, Eq, PartialEq, Serialize, Deserialize)] diff --git a/src/context.rs b/src/context.rs index 2a51c39..86d0cf2 100644 --- a/src/context.rs +++ b/src/context.rs @@ -33,7 +33,7 @@ use tokio::time::{Duration, Instant}; use try_lock::TryLock; use uuid::{Variant, Version}; -use crate::config::ConnectionConf; +use crate::config::{ConnectionConf, PRINT_RETRY_ERROR_LIMIT, RetryInterval}; use crate::LatteError; fn ssl_context(conf: &&ConnectionConf) -> Result, CassError> { @@ -58,7 +58,7 @@ fn ssl_context(conf: &&ConnectionConf) -> Result, CassError> pub async fn connect(conf: &ConnectionConf) -> Result { let profile = ExecutionProfile::builder() .consistency(conf.consistency.scylla_consistency()) - .request_timeout(Some(Duration::from_secs(60))) // no request timeout + .request_timeout(Some(Duration::from_secs(conf.request_timeout.get() as u64))) .build(); let scylla_session = SessionBuilder::new() @@ -70,7 +70,7 @@ pub async fn connect(conf: &ConnectionConf) -> Result { .build() .await .map_err(|e| CassError(CassErrorKind::FailedToConnect(conf.addresses.clone(), e)))?; - Ok(Context::new(scylla_session)) + Ok(Context::new(scylla_session, conf.retry_number, conf.retry_interval)) } pub struct ClusterInfo { @@ -78,6 +78,56 @@ pub struct ClusterInfo { pub cassandra_version: String, } +/// Transforms a CqlValue object to a string dedicated to be part of CassError message +pub fn cql_value_obj_to_string(v: &CqlValue) -> String { + let no_transformation_size_limit = 32; + match v { + // Replace big string- and bytes-alike object values with it's size labels + CqlValue::Text(param) if param.len() > no_transformation_size_limit => { + format!("Text(={})", param.len()) + }, + CqlValue::Ascii(param) if param.len() > no_transformation_size_limit => { + format!("Ascii(={})", param.len()) + }, + CqlValue::Blob(param) if param.len() > no_transformation_size_limit => { + format!("Blob(={})", param.len()) + }, + CqlValue::UserDefinedType { keyspace, type_name, fields } => { + let mut result = format!( + "UDT {{ keyspace: \"{}\", type_name: \"{}\", fields: [", + keyspace, type_name, + ); + for (field_name, field_value) in fields { + let field_string = match field_value { + Some(field) => cql_value_obj_to_string(field), + None => String::from("None"), + }; + result.push_str(&format!("(\"{}\", {}), ", field_name, field_string)); + } + if result.len() >= 2 { + result.truncate(result.len() - 2); + } + result.push_str(&format!("] }}")); + result + }, + CqlValue::List(elements) => { + let mut result = String::from("List(["); + for element in elements { + let element_string = cql_value_obj_to_string(element); + result.push_str(&element_string); + result.push_str(", "); + } + if result.len() >= 2 { + result.truncate(result.len() - 2); + } + result.push_str("])"); + result + }, + // TODO: cover 'CqlValue::Map' and 'CqlValue::Set' + _ => format!("{v:?}"), + } +} + #[derive(Any, Debug)] pub struct CassError(pub CassErrorKind); @@ -89,7 +139,7 @@ impl 
CassError { fn query_execution_error(cql: &str, params: &[CqlValue], err: QueryError) -> CassError { let query = QueryInfo { cql: cql.to_string(), - params: params.iter().map(|v| format!("{v:?}")).collect(), + params: params.iter().map(|v| cql_value_obj_to_string(v)).collect(), }; let kind = match err { QueryError::RequestTimeout(_) @@ -102,6 +152,12 @@ impl CassError { }; CassError(kind) } + + fn query_retries_exceeded(retry_number: u64) -> CassError { + CassError(CassErrorKind::QueryRetriesExceeded( + format!("Max retry attempts ({}) reached", retry_number) + )) + } } #[derive(Debug)] @@ -126,6 +182,7 @@ pub enum CassErrorKind { SslConfiguration(ErrorStack), FailedToConnect(Vec, NewSessionError), PreparedStatementNotFound(String), + QueryRetriesExceeded(String), UnsupportedType(TypeInfo), Prepare(String, QueryError), Overloaded(QueryInfo, QueryError), @@ -145,6 +202,9 @@ impl CassError { CassErrorKind::PreparedStatementNotFound(s) => { write!(buf, "Prepared statement not found: {s}") } + CassErrorKind::QueryRetriesExceeded(s) => { + write!(buf, "QueryRetriesExceeded: {s}") + } CassErrorKind::UnsupportedType(s) => { write!(buf, "Unsupported type: {s}") } @@ -180,6 +240,8 @@ impl std::error::Error for CassError {} #[derive(Clone, Debug)] pub struct SessionStats { pub req_count: u64, + pub retry_errors: HashSet, + pub retry_error_count: u64, pub req_errors: HashSet, pub req_error_count: u64, pub row_count: u64, @@ -216,12 +278,21 @@ impl SessionStats { } } + pub fn store_retry_error(&mut self, error_str: String) { + self.retry_error_count += 1; + if self.retry_error_count <= PRINT_RETRY_ERROR_LIMIT { + self.retry_errors.insert(error_str); + } + } + /// Resets all accumulators pub fn reset(&mut self) { self.req_error_count = 0; self.row_count = 0; self.req_count = 0; self.mean_queue_length = 0.0; + self.retry_error_count = 0; + self.retry_errors.clear(); self.req_errors.clear(); self.resp_times_ns.clear(); @@ -234,6 +305,8 @@ impl Default for SessionStats { fn default() -> Self { SessionStats { req_count: 0, + retry_errors: HashSet::new(), + retry_error_count: 0, req_errors: HashSet::new(), req_error_count: 0, row_count: 0, @@ -244,6 +317,44 @@ impl Default for SessionStats { } } +pub fn get_expoinential_retry_interval(min_interval: u64, + max_interval: u64, + current_attempt_num: u64) -> u64 { + let min_interval_float: f64 = min_interval as f64; + let mut current_interval: f64 = min_interval_float * ( + 2u64.pow((current_attempt_num - 1).try_into().unwrap_or(0)) as f64 + ); + + // Add jitter + current_interval += rand::thread_rng().gen::() * min_interval_float; + current_interval -= min_interval_float / 2.0; + + std::cmp::min(current_interval as u64, max_interval as u64) as u64 +} + +pub async fn handle_retry_error(ctxt: &Context, current_attempt_num: u64, current_error: CassError) { + let current_retry_interval = get_expoinential_retry_interval( + ctxt.retry_interval.min_ms, ctxt.retry_interval.max_ms, current_attempt_num, + ); + + let mut next_attempt_str = String::new(); + let is_last_attempt = current_attempt_num == ctxt.retry_number; + if !is_last_attempt { + next_attempt_str += &format!("[Retry in {}ms]", current_retry_interval); + } + let err_msg = format!( + "{}: [ERROR][Attempt {}/{}]{} {}", + Utc::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string(), + current_attempt_num, ctxt.retry_number, next_attempt_str, current_error, + ); + if !is_last_attempt { + ctxt.stats.try_lock().unwrap().store_retry_error(err_msg); + 
tokio::time::sleep(Duration::from_millis(current_retry_interval)).await; + } else { + eprintln!("{}", err_msg); + } +} + /// This is the main object that a workload script uses to interface with the outside world. /// It also tracks query execution metrics such as number of requests, rows, response times etc. #[derive(Any)] @@ -251,6 +362,8 @@ pub struct Context { session: Arc, statements: HashMap>, stats: TryLock, + retry_number: u64, + retry_interval: RetryInterval, #[rune(get, set, add_assign, copy)] pub load_cycle_count: u64, #[rune(get)] @@ -267,11 +380,13 @@ unsafe impl Send for Context {} unsafe impl Sync for Context {} impl Context { - pub fn new(session: scylla::Session) -> Context { + pub fn new(session: scylla::Session, retry_number: u64, retry_interval: RetryInterval) -> Context { Context { session: Arc::new(session), statements: HashMap::new(), stats: TryLock::new(SessionStats::new()), + retry_number: retry_number, + retry_interval: retry_interval, load_cycle_count: 0, data: Value::Object(Shared::new(Object::new())), } @@ -288,6 +403,8 @@ impl Context { session: self.session.clone(), statements: self.statements.clone(), stats: TryLock::new(SessionStats::default()), + retry_number: self.retry_number, + retry_interval: self.retry_interval, load_cycle_count: self.load_cycle_count, data: deserialized, }) @@ -327,15 +444,23 @@ impl Context { /// Executes an ad-hoc CQL statement with no parameters. Does not prepare. pub async fn execute(&self, cql: &str) -> Result<(), CassError> { - let start_time = self.stats.try_lock().unwrap().start_request(); - let rs = self.session.query(cql, ()).await; - let duration = Instant::now() - start_time; - self.stats - .try_lock() - .unwrap() - .complete_request(duration, &rs); - rs.map_err(|e| CassError::query_execution_error(cql, &[], e))?; - Ok(()) + for current_attempt_num in 0..self.retry_number+1 { + let start_time = self.stats.try_lock().unwrap().start_request(); + let rs = self.session.query(cql, ()).await; + let duration = Instant::now() - start_time; + match rs { + Ok(_) => {} + Err(e) => { + let current_error = CassError::query_execution_error(cql, &[], e.clone()); + handle_retry_error(self, current_attempt_num, current_error).await; + continue + } + } + self.stats.try_lock().unwrap().complete_request(duration, &rs); + rs.map_err(|e| CassError::query_execution_error(cql, &[], e.clone()))?; + return Ok(()) + } + Err(CassError::query_retries_exceeded(self.retry_number)) } /// Executes a statement prepared and registered earlier by a call to `prepare`. 
@@ -345,15 +470,25 @@ impl Context { .get(key) .ok_or_else(|| CassError(CassErrorKind::PreparedStatementNotFound(key.to_string())))?; let params = bind::to_scylla_query_params(¶ms)?; - let start_time = self.stats.try_lock().unwrap().start_request(); - let rs = self.session.execute(statement, params.clone()).await; - let duration = Instant::now() - start_time; - self.stats - .try_lock() - .unwrap() - .complete_request(duration, &rs); - rs.map_err(|e| CassError::query_execution_error(statement.get_statement(), ¶ms, e))?; - Ok(()) + for current_attempt_num in 0..self.retry_number+1 { + let start_time = self.stats.try_lock().unwrap().start_request(); + let rs = self.session.execute(statement, params.clone()).await; + let duration = Instant::now() - start_time; + match rs { + Ok(_) => {} + Err(e) => { + let current_error = CassError::query_execution_error( + statement.get_statement(), ¶ms, e.clone() + ); + handle_retry_error(self, current_attempt_num, current_error).await; + continue + } + } + self.stats.try_lock().unwrap().complete_request(duration, &rs); + rs.map_err(|e| CassError::query_execution_error(statement.get_statement(), ¶ms, e))?; + return Ok(()); + } + Err(CassError::query_retries_exceeded(self.retry_number)) } /// Returns the current accumulated request stats snapshot and resets the stats. diff --git a/src/report.rs b/src/report.rs index fd0f4a5..cb2407a 100644 --- a/src/report.rs +++ b/src/report.rs @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize}; use statrs::statistics::Statistics; use strum::IntoEnumIterator; -use crate::config::RunCommand; +use crate::config::{PRINT_RETRY_ERROR_LIMIT, RunCommand}; use crate::stats::{ BenchmarkCmp, BenchmarkStats, Bucket, Mean, Percentile, Sample, Significance, TimeDistribution, }; @@ -516,6 +516,19 @@ impl<'a> Display for RunConfigCmp<'a> { self.line("└─", "op", |conf| { Quantity::from(conf.sampling_interval.count()) }), + self.line("Request timeout", "", |conf| { + Quantity::from(conf.connection.request_timeout) + }), + self.line("Retries", "", |_| {Quantity::from("")}), + self.line("┌──────┴number", "", |conf| { + Quantity::from(conf.connection.retry_number) + }), + self.line("├─min interval", "ms", |conf| { + Quantity::from(conf.connection.retry_interval.min_ms) + }), + self.line("└─max interval", "ms", |conf| { + Quantity::from(conf.connection.retry_interval.max_ms) + }), ]; for l in lines { @@ -533,6 +546,24 @@ pub fn print_log_header() { impl Display for Sample { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + if self.retry_error_count > 0 { + let mut num_of_printed_errors = 0; + let mut error_msg_bunch = String::new(); + for retry_error in &self.retry_errors { + if num_of_printed_errors < PRINT_RETRY_ERROR_LIMIT { + error_msg_bunch += &format!("{}\n", retry_error); + num_of_printed_errors += 1; + } else { break } + } + let num_of_dropped_errors = self.retry_error_count - num_of_printed_errors; + if num_of_dropped_errors > 0 { + error_msg_bunch += &format!( + "...number of dropped error messages per sampling period: {}", + num_of_dropped_errors, + ); + } + eprintln!("{}", error_msg_bunch); + } write!( f, "{:8.3} {:11.0} {:11.0} {:9.3} {:9.3} {:9.3} {:9.3} {:9.3} {:9.3} {:9.3} {:9.3} {:9.3}", diff --git a/src/stats.rs b/src/stats.rs index bfd8571..0b1ef52 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -265,8 +265,10 @@ pub struct Sample { pub duration_s: f32, pub cycle_count: u64, pub request_count: u64, - pub error_count: u64, + pub retry_errors: HashSet, + pub retry_error_count: u64, pub errors: HashSet, + pub 
error_count: u64, pub row_count: u64, pub mean_queue_len: f32, pub cycle_throughput: f32, @@ -287,6 +289,8 @@ impl Sample { let mut cycle_times_ns = Histogram::new(3).unwrap(); let mut request_count = 0; + let mut retry_errors = HashSet::new(); + let mut retry_error_count = 0; let mut row_count = 0; let mut errors = HashSet::new(); let mut error_count = 0; @@ -306,6 +310,8 @@ impl Sample { errors.extend(ss.req_errors.iter().cloned()); } error_count += ss.req_error_count; + retry_errors.extend(ss.retry_errors.iter().cloned()); + retry_error_count += ss.retry_error_count; mean_queue_len += ss.mean_queue_length / stats.len() as f32; duration_s += (s.end_time - s.start_time).as_secs_f32() / stats.len() as f32; resp_times_ns.add(&ss.resp_times_ns).unwrap(); @@ -323,9 +329,11 @@ impl Sample { duration_s, cycle_count, request_count, - row_count, - error_count, + retry_errors, + retry_error_count, errors, + error_count, + row_count, mean_queue_len: not_nan_f32(mean_queue_len).unwrap_or(0.0), cycle_throughput: cycle_count as f32 / duration_s, req_throughput: request_count as f32 / duration_s, From 4d0106b9e195273ccecdb959b1206004c8bb9511 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ko=C5=82aczkowski?= Date: Thu, 20 Jun 2024 13:33:13 +0200 Subject: [PATCH 04/22] Add latte::uniform for getting random numbers with uniform distribution --- Cargo.lock | 214 +++++++++++++++++------------------------------- Cargo.toml | 10 +-- README.md | 1 + src/config.rs | 14 +++- src/context.rs | 125 ++++++++++++++++++---------- src/report.rs | 8 +- src/stats.rs | 2 - src/workload.rs | 7 +- 8 files changed, 188 insertions(+), 193 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c93b1a7..d00b27a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,6 +28,18 @@ dependencies = [ "version_check", ] +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.2" @@ -37,6 +49,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -182,6 +200,12 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "bitflags" version = "1.3.2" @@ -282,7 +306,7 @@ version = "4.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "307bc0538d5f0f83b8248db3087aa92fe504e4691294d0c96c0eabc33f47ba47" dependencies = [ - "heck 0.4.1", + "heck", "proc-macro2", "quote", "syn 2.0.50", @@ -590,12 +614,6 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" -[[package]] -name = "equivalent" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" - [[package]] name = "err-derive" version = 
"0.3.1" @@ -834,7 +852,7 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" dependencies = [ - "ahash", + "ahash 0.7.8", "serde", ] @@ -843,6 +861,10 @@ name = "hashbrown" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +dependencies = [ + "ahash 0.8.11", + "allocator-api2", +] [[package]] name = "hdrhistogram" @@ -850,7 +872,7 @@ version = "7.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" dependencies = [ - "base64", + "base64 0.21.7", "byteorder", "crossbeam-channel", "flate2", @@ -858,15 +880,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "heck" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c" -dependencies = [ - "unicode-segmentation", -] - [[package]] name = "heck" version = "0.4.1" @@ -950,16 +963,6 @@ dependencies = [ "png", ] -[[package]] -name = "indexmap" -version = "2.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "233cf39063f058ea2caae4091bf4a3ef70a653afbc026f5c4a4135d114e3c177" -dependencies = [ - "equivalent", - "hashbrown 0.14.3", -] - [[package]] name = "itertools" version = "0.11.0" @@ -971,9 +974,9 @@ dependencies = [ [[package]] name = "itertools" -version = "0.12.1" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" dependencies = [ "either", ] @@ -1027,10 +1030,10 @@ dependencies = [ [[package]] name = "latte-cli" -version = "0.25.0" +version = "0.26.0" dependencies = [ "anyhow", - "base64", + "base64 0.22.1", "chrono", "clap", "console", @@ -1040,7 +1043,7 @@ dependencies = [ "futures", "hdrhistogram", "hytra", - "itertools 0.12.1", + "itertools 0.13.0", "jemallocator", "lazy_static", "metrohash", @@ -1061,8 +1064,8 @@ dependencies = [ "serde_json", "statrs", "status-line", - "strum 0.26.1", - "strum_macros 0.26.1", + "strum", + "strum_macros", "thiserror", "time", "tokio", @@ -1194,9 +1197,9 @@ dependencies = [ [[package]] name = "nalgebra" -version = "0.29.0" +version = "0.32.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d506eb7e08d6329505faa8a3a00a5dcc6de9f76e0c77e4b75763ae3c770831ff" +checksum = "7b5c17de023a86f59ed79891b2e5d5a94c705dbe904a5b5c9c952ea6221b03e4" dependencies = [ "approx", "matrixmultiply", @@ -1212,9 +1215,9 @@ dependencies = [ [[package]] name = "nalgebra-macros" -version = "0.1.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01fcc0b8149b4632adc89ac3b7b31a12fb6099a0317a4eb2ebff574ef7de7218" +checksum = "91761aed67d03ad966ef783ae962ef9bbaca728d2dd7ceb7939ec110fffad998" dependencies = [ "proc-macro2", "quote", @@ -1391,27 +1394,6 @@ dependencies = [ "libc", ] -[[package]] -name = "num_enum" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a015b430d3c108a207fd776d2e2196aaf8b1cf8cf93253e3a097ff3085076a1" -dependencies = [ - "num_enum_derive", -] - -[[package]] -name = "num_enum_derive" -version = "0.6.1" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "96667db765a921f7b295ffee8b60472b686a51d4f21c2ee4ffdb94c7013b65a6" -dependencies = [ - "proc-macro-crate", - "proc-macro2", - "quote", - "syn 2.0.50", -] - [[package]] name = "object" version = "0.32.2" @@ -1639,16 +1621,6 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" -[[package]] -name = "proc-macro-crate" -version = "1.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919" -dependencies = [ - "once_cell", - "toml_edit", -] - [[package]] name = "proc-macro-error" version = "1.0.4" @@ -1962,9 +1934,9 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "scylla" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03d2db76aa23f55d2ece5354e1a3778633098a3d1ea76153f494d71e92cd02d8" +checksum = "9439d92eea9f86c07175c819c3a129ca28b02477b47df26db354a1f4ea7ee276" dependencies = [ "arc-swap", "async-trait", @@ -1973,10 +1945,11 @@ dependencies = [ "chrono", "dashmap", "futures", + "hashbrown 0.14.3", "histogram", "itertools 0.11.0", + "lazy_static", "lz4_flex", - "num_enum", "openssl", "rand", "rand_pcg", @@ -1985,8 +1958,6 @@ dependencies = [ "smallvec", "snap", "socket2", - "strum 0.23.0", - "strum_macros 0.23.1", "thiserror", "tokio", "tokio-openssl", @@ -1996,15 +1967,14 @@ dependencies = [ [[package]] name = "scylla-cql" -version = "0.1.0" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "345626c0dd5d9624c413daaba854685bba6a65cff4eb5ea0fb0366df16901f67" +checksum = "64037fb9d9c59ae15137fff9a56c4d528908dfd38d09e75b5f8e56e3894966dd" dependencies = [ "async-trait", "byteorder", "bytes", "lz4_flex", - "num_enum", "scylla-macros", "snap", "thiserror", @@ -2014,9 +1984,9 @@ dependencies = [ [[package]] name = "scylla-macros" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb6085ff9c3fd7e5163826901d39164ab86f11bdca16b2f766a00c528ff9cef9" +checksum = "7e5fe1d389adebe6a1a27bce18b81a65ff18c25d58a795de490e18b0e7a27b9f" dependencies = [ "darling", "proc-macro2", @@ -2098,9 +2068,9 @@ dependencies = [ [[package]] name = "simba" -version = "0.6.0" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0b7840f121a46d63066ee7a99fc81dcabbc6105e437cae43528cea199b5a05f" +checksum = "061507c94fc6ab4ba1c9a0305018408e312e17c041eb63bef8aa726fa33aceae" dependencies = [ "approx", "num-complex 0.4.5", @@ -2157,12 +2127,11 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "statrs" -version = "0.16.0" +version = "0.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d08e5e1748192713cc281da8b16924fb46be7b0c2431854eadc785823e5696e" +checksum = "f697a07e4606a0a25c044de247e583a330dbb1731d11bc7350b81f48ad567255" dependencies = [ "approx", - "lazy_static", "nalgebra", "num-traits", "rand", @@ -2190,32 +2159,13 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ee073c9e4cd00e28217186dbe12796d692868f432bf2e97ee73bed0c56dfa01" -[[package]] -name = "strum" -version = "0.23.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"cae14b91c7d11c9a851d3fbc80a963198998c2a64eec840477fa92d8ce9b70bb" - [[package]] name = "strum" version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "723b93e8addf9aa965ebe2d11da6d7540fa2283fcea14b3371ff055f7ba13f5f" dependencies = [ - "strum_macros 0.26.1", -] - -[[package]] -name = "strum_macros" -version = "0.23.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bb0dc7ee9c15cea6199cde9a127fa16a4c5819af85395457ad72d68edc85a38" -dependencies = [ - "heck 0.3.3", - "proc-macro2", - "quote", - "rustversion", - "syn 1.0.109", + "strum_macros", ] [[package]] @@ -2224,7 +2174,7 @@ version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a3417fc93d76740d974a01654a09777cb500428cc874ca9f45edfe0c4d4cd18" dependencies = [ - "heck 0.4.1", + "heck", "proc-macro2", "quote", "rustversion", @@ -2375,23 +2325,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "toml_datetime" -version = "0.6.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1" - -[[package]] -name = "toml_edit" -version = "0.19.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" -dependencies = [ - "indexmap", - "toml_datetime", - "winnow", -] - [[package]] name = "tracing" version = "0.1.40" @@ -2483,12 +2416,6 @@ version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" -[[package]] -name = "unicode-segmentation" -version = "1.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" - [[package]] name = "unicode-width" version = "0.1.11" @@ -2802,15 +2729,6 @@ version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0770833d60a970638e989b3fa9fd2bb1aaadcf88963d1659fd7d9990196ed2d6" -[[package]] -name = "winnow" -version = "0.5.40" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f593a95398737aeed53e489c785df13f3618e41dbcd6718c6addbf1395aa6876" -dependencies = [ - "memchr", -] - [[package]] name = "wio" version = "0.2.2" @@ -2831,3 +2749,23 @@ dependencies = [ "once_cell", "pkg-config", ] + +[[package]] +name = "zerocopy" +version = "0.7.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae87e3fcd617500e5d106f0380cf7b77f3c6092aae37191433159dda23cfb087" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15e934569e47891f7d9411f1a451d947a60e000ab3bd24fbb970f000387d1b3b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.50", +] diff --git a/Cargo.toml b/Cargo.toml index b5f2f15..f969d62 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "latte-cli" description = "A database benchmarking tool for Apache Cassandra" -version = "0.25.0" +version = "0.26.0" authors = ["Piotr Kołaczkowski "] edition = "2021" readme = "README.md" @@ -14,7 +14,7 @@ path = "src/main.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] anyhow = "1.0" -base64 = "0.21" +base64 = "0.22" rmp = "0.8.10" rmp-serde = "1.0.0-beta.2" chrono = { version = "0.4.18", 
features = ["serde"] } @@ -26,7 +26,7 @@ err-derive = "0.3" futures = "0.3" hdrhistogram = "7.1.0" hytra = "0.1.2" -itertools = "0.12" +itertools = "0.13" jemallocator = "0.5" lazy_static = "1.4.0" metrohash = "1.0" @@ -39,11 +39,11 @@ rand = "0.8" regex = "1.5" rune = "0.12" rust-embed = "8" -scylla = { version = "0.12", features = ["ssl"] } +scylla = { version = "0.13", features = ["ssl"] } search_path = "0.1" serde = { version = "1.0.116", features = ["derive"] } serde_json = "1.0.57" -statrs = "0.16" +statrs = "0.17" status-line = "0.2.0" strum = { version = "0.26", features = ["derive"] } strum_macros = "0.26" diff --git a/README.md b/README.md index 84d89d3..90b1901 100644 --- a/README.md +++ b/README.md @@ -209,6 +209,7 @@ are pure, i.e. invoking them multiple times with the same parameters yields alwa - `latte::hash_select(i, vector)` – selects an item from a vector based on a hash - `latte::blob(i, len)` – generates a random binary blob of length `len` - `latte::normal(i, mean, std_dev)` – generates a floating point number from a normal distribution +- `latte::uniform(i, min, max)` – generates a floating point number from a uniform distribution #### Numeric conversions diff --git a/src/config.rs b/src/config.rs index 7799109..746c4d2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -136,7 +136,8 @@ impl FromStr for RetryInterval { Err(concat!( "Expected 1 or 2 parts separated by comma such as '500ms' or '200ms,5s' or '1s'.", " First value cannot be bigger than second one.", - ).to_string()) + ) + .to_string()) } } } @@ -190,9 +191,12 @@ pub struct ConnectionConf { #[clap(long("retry-number"), default_value = "10", value_name = "COUNT")] pub retry_number: u64, - #[clap(long("retry-interval"), default_value = "100ms,5s", value_name = "TIME[,TIME]")] + #[clap( + long("retry-interval"), + default_value = "100ms,5s", + value_name = "TIME[,TIME]" + )] pub retry_interval: RetryInterval, - } #[derive(Clone, Copy, Default, Debug, Eq, PartialEq, Serialize, Deserialize)] @@ -544,6 +548,7 @@ pub struct AppConfig { } #[derive(Debug, Deserialize, Default)] +#[allow(unused)] pub struct SchemaConfig { #[serde(default)] pub script: Vec, @@ -552,6 +557,7 @@ pub struct SchemaConfig { } #[derive(Debug, Deserialize)] +#[allow(unused)] pub struct LoadConfig { pub count: u64, #[serde(default)] @@ -567,6 +573,7 @@ mod defaults { } #[derive(Debug, Deserialize)] +#[allow(unused)] pub struct RunConfig { #[serde(default = "defaults::ratio")] pub ratio: f64, @@ -577,6 +584,7 @@ pub struct RunConfig { } #[derive(Debug, Deserialize)] +#[allow(unused)] pub struct WorkloadConfig { #[serde(default)] pub schema: SchemaConfig, diff --git a/src/context.rs b/src/context.rs index 86d0cf2..01b7c86 100644 --- a/src/context.rs +++ b/src/context.rs @@ -28,12 +28,12 @@ use scylla::prepared_statement::PreparedStatement; use scylla::transport::errors::{DbError, NewSessionError, QueryError}; use scylla::transport::session::PoolSize; use scylla::{ExecutionProfile, QueryResult, SessionBuilder}; -use statrs::distribution::Normal; +use statrs::distribution::{Normal, Uniform}; use tokio::time::{Duration, Instant}; use try_lock::TryLock; use uuid::{Variant, Version}; -use crate::config::{ConnectionConf, PRINT_RETRY_ERROR_LIMIT, RetryInterval}; +use crate::config::{ConnectionConf, RetryInterval, PRINT_RETRY_ERROR_LIMIT}; use crate::LatteError; fn ssl_context(conf: &&ConnectionConf) -> Result, CassError> { @@ -70,7 +70,11 @@ pub async fn connect(conf: &ConnectionConf) -> Result { .build() .await .map_err(|e| 
CassError(CassErrorKind::FailedToConnect(conf.addresses.clone(), e)))?; - Ok(Context::new(scylla_session, conf.retry_number, conf.retry_interval)) + Ok(Context::new( + scylla_session, + conf.retry_number, + conf.retry_interval, + )) } pub struct ClusterInfo { @@ -85,14 +89,18 @@ pub fn cql_value_obj_to_string(v: &CqlValue) -> String { // Replace big string- and bytes-alike object values with it's size labels CqlValue::Text(param) if param.len() > no_transformation_size_limit => { format!("Text(={})", param.len()) - }, + } CqlValue::Ascii(param) if param.len() > no_transformation_size_limit => { format!("Ascii(={})", param.len()) - }, + } CqlValue::Blob(param) if param.len() > no_transformation_size_limit => { format!("Blob(={})", param.len()) - }, - CqlValue::UserDefinedType { keyspace, type_name, fields } => { + } + CqlValue::UserDefinedType { + keyspace, + type_name, + fields, + } => { let mut result = format!( "UDT {{ keyspace: \"{}\", type_name: \"{}\", fields: [", keyspace, type_name, @@ -109,7 +117,7 @@ pub fn cql_value_obj_to_string(v: &CqlValue) -> String { } result.push_str(&format!("] }}")); result - }, + } CqlValue::List(elements) => { let mut result = String::from("List(["); for element in elements { @@ -122,7 +130,7 @@ pub fn cql_value_obj_to_string(v: &CqlValue) -> String { } result.push_str("])"); result - }, + } // TODO: cover 'CqlValue::Map' and 'CqlValue::Set' _ => format!("{v:?}"), } @@ -154,9 +162,10 @@ impl CassError { } fn query_retries_exceeded(retry_number: u64) -> CassError { - CassError(CassErrorKind::QueryRetriesExceeded( - format!("Max retry attempts ({}) reached", retry_number) - )) + CassError(CassErrorKind::QueryRetriesExceeded(format!( + "Max retry attempts ({}) reached", + retry_number + ))) } } @@ -317,13 +326,14 @@ impl Default for SessionStats { } } -pub fn get_expoinential_retry_interval(min_interval: u64, - max_interval: u64, - current_attempt_num: u64) -> u64 { +pub fn get_expoinential_retry_interval( + min_interval: u64, + max_interval: u64, + current_attempt_num: u64, +) -> u64 { let min_interval_float: f64 = min_interval as f64; - let mut current_interval: f64 = min_interval_float * ( - 2u64.pow((current_attempt_num - 1).try_into().unwrap_or(0)) as f64 - ); + let mut current_interval: f64 = + min_interval_float * (2u64.pow((current_attempt_num - 1).try_into().unwrap_or(0)) as f64); // Add jitter current_interval += rand::thread_rng().gen::() * min_interval_float; @@ -332,9 +342,15 @@ pub fn get_expoinential_retry_interval(min_interval: u64, std::cmp::min(current_interval as u64, max_interval as u64) as u64 } -pub async fn handle_retry_error(ctxt: &Context, current_attempt_num: u64, current_error: CassError) { +pub async fn handle_retry_error( + ctxt: &Context, + current_attempt_num: u64, + current_error: CassError, +) { let current_retry_interval = get_expoinential_retry_interval( - ctxt.retry_interval.min_ms, ctxt.retry_interval.max_ms, current_attempt_num, + ctxt.retry_interval.min_ms, + ctxt.retry_interval.max_ms, + current_attempt_num, ); let mut next_attempt_str = String::new(); @@ -345,7 +361,10 @@ pub async fn handle_retry_error(ctxt: &Context, current_attempt_num: u64, curren let err_msg = format!( "{}: [ERROR][Attempt {}/{}]{} {}", Utc::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string(), - current_attempt_num, ctxt.retry_number, next_attempt_str, current_error, + current_attempt_num, + ctxt.retry_number, + next_attempt_str, + current_error, ); if !is_last_attempt { ctxt.stats.try_lock().unwrap().store_retry_error(err_msg); @@ -380,7 +399,11 
@@ unsafe impl Send for Context {} unsafe impl Sync for Context {} impl Context { - pub fn new(session: scylla::Session, retry_number: u64, retry_interval: RetryInterval) -> Context { + pub fn new( + session: scylla::Session, + retry_number: u64, + retry_interval: RetryInterval, + ) -> Context { Context { session: Arc::new(session), statements: HashMap::new(), @@ -444,7 +467,7 @@ impl Context { /// Executes an ad-hoc CQL statement with no parameters. Does not prepare. pub async fn execute(&self, cql: &str) -> Result<(), CassError> { - for current_attempt_num in 0..self.retry_number+1 { + for current_attempt_num in 0..self.retry_number + 1 { let start_time = self.stats.try_lock().unwrap().start_request(); let rs = self.session.query(cql, ()).await; let duration = Instant::now() - start_time; @@ -453,12 +476,15 @@ impl Context { Err(e) => { let current_error = CassError::query_execution_error(cql, &[], e.clone()); handle_retry_error(self, current_attempt_num, current_error).await; - continue + continue; } } - self.stats.try_lock().unwrap().complete_request(duration, &rs); + self.stats + .try_lock() + .unwrap() + .complete_request(duration, &rs); rs.map_err(|e| CassError::query_execution_error(cql, &[], e.clone()))?; - return Ok(()) + return Ok(()); } Err(CassError::query_retries_exceeded(self.retry_number)) } @@ -470,7 +496,7 @@ impl Context { .get(key) .ok_or_else(|| CassError(CassErrorKind::PreparedStatementNotFound(key.to_string())))?; let params = bind::to_scylla_query_params(¶ms)?; - for current_attempt_num in 0..self.retry_number+1 { + for current_attempt_num in 0..self.retry_number + 1 { let start_time = self.stats.try_lock().unwrap().start_request(); let rs = self.session.execute(statement, params.clone()).await; let duration = Instant::now() - start_time; @@ -478,14 +504,21 @@ impl Context { Ok(_) => {} Err(e) => { let current_error = CassError::query_execution_error( - statement.get_statement(), ¶ms, e.clone() + statement.get_statement(), + ¶ms, + e.clone(), ); handle_retry_error(self, current_attempt_num, current_error).await; - continue + continue; } } - self.stats.try_lock().unwrap().complete_request(duration, &rs); - rs.map_err(|e| CassError::query_execution_error(statement.get_statement(), ¶ms, e))?; + self.stats + .try_lock() + .unwrap() + .complete_request(duration, &rs); + rs.map_err(|e| { + CassError::query_execution_error(statement.get_statement(), ¶ms, e) + })?; return Ok(()); } Err(CassError::query_retries_exceeded(self.retry_number)) @@ -546,21 +579,23 @@ mod bind { }; let keys = borrowed.keys(); - let values: Result>, _> = borrowed.values() - .map(|value| to_scylla_value(&value.clone()) - .map(Some)).collect(); - let fields: Vec<(String, Option)> = keys.into_iter() + let values: Result>, _> = borrowed + .values() + .map(|value| to_scylla_value(&value.clone()).map(Some)) + .collect(); + let fields: Vec<(String, Option)> = keys + .into_iter() .zip(values?.into_iter()) .filter(|&(key, _)| key != "_keyspace" && key != "_type_name") .map(|(key, value)| (key.to_string(), value)) .collect(); - let udt = CqlValue::UserDefinedType{ + let udt = CqlValue::UserDefinedType { keyspace: keyspace, type_name: type_name, fields: fields, }; Ok(udt) - }, + } Value::Any(obj) => { let obj = obj.borrow_ref().unwrap(); let h = obj.type_hash(); @@ -731,6 +766,12 @@ pub fn normal(i: i64, mean: f64, std_dev: f64) -> Result { Ok(distribution.sample(&mut rng)) } +pub fn uniform(i: i64, min: f64, max: f64) -> Result { + let mut rng = StdRng::seed_from_u64(i as u64); + let distribution = 
Uniform::new(min, max).map_err(|e| VmError::panic(format!("{e}")))?; + Ok(distribution.sample(&mut rng)) +} + /// Restricts a value to a certain interval unless it is NaN. pub fn clamp_float(value: f64, min: f64, max: f64) -> f64 { value.clamp(min, max) @@ -753,10 +794,12 @@ pub fn blob(seed: i64, len: usize) -> rune::runtime::Bytes { /// Parameter `seed` is used to seed the RNG. pub fn text(seed: i64, len: usize) -> rune::runtime::StaticString { let mut rng = StdRng::seed_from_u64(seed as u64); - let s: String = (0..len).map(|_| { - let code_point = rng.gen_range(0x0061u32..=0x007Au32); // Unicode range for 'a-z' - std::char::from_u32(code_point).unwrap() - }).collect(); + let s: String = (0..len) + .map(|_| { + let code_point = rng.gen_range(0x0061u32..=0x007Au32); // Unicode range for 'a-z' + std::char::from_u32(code_point).unwrap() + }) + .collect(); rune::runtime::StaticString::new(s) } diff --git a/src/report.rs b/src/report.rs index cb2407a..556ecf4 100644 --- a/src/report.rs +++ b/src/report.rs @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize}; use statrs::statistics::Statistics; use strum::IntoEnumIterator; -use crate::config::{PRINT_RETRY_ERROR_LIMIT, RunCommand}; +use crate::config::{RunCommand, PRINT_RETRY_ERROR_LIMIT}; use crate::stats::{ BenchmarkCmp, BenchmarkStats, Bucket, Mean, Percentile, Sample, Significance, TimeDistribution, }; @@ -519,7 +519,7 @@ impl<'a> Display for RunConfigCmp<'a> { self.line("Request timeout", "", |conf| { Quantity::from(conf.connection.request_timeout) }), - self.line("Retries", "", |_| {Quantity::from("")}), + self.line("Retries", "", |_| Quantity::from("")), self.line("┌──────┴number", "", |conf| { Quantity::from(conf.connection.retry_number) }), @@ -553,7 +553,9 @@ impl Display for Sample { if num_of_printed_errors < PRINT_RETRY_ERROR_LIMIT { error_msg_bunch += &format!("{}\n", retry_error); num_of_printed_errors += 1; - } else { break } + } else { + break; + } } let num_of_dropped_errors = self.retry_error_count - num_of_printed_errors; if num_of_dropped_errors > 0 { diff --git a/src/stats.rs b/src/stats.rs index 0b1ef52..4326b21 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -560,7 +560,6 @@ pub struct Recorder { pub row_count: u64, pub cycle_times_ns: Histogram, pub resp_times_ns: Histogram, - pub queue_len_sum: u64, log: Log, rate_limit: Option, concurrency_limit: NonZeroUsize, @@ -590,7 +589,6 @@ impl Recorder { error_count: 0, cycle_times_ns: Histogram::new(3).unwrap(), resp_times_ns: Histogram::new(3).unwrap(), - queue_len_sum: 0, } } diff --git a/src/workload.rs b/src/workload.rs index ebefc0e..7aa8af2 100644 --- a/src/workload.rs +++ b/src/workload.rs @@ -129,7 +129,9 @@ impl Program { let mut latte_module = Module::with_crate("latte"); latte_module.function(&["blob"], context::blob).unwrap(); latte_module.function(&["text"], context::text).unwrap(); - latte_module.function(&["now_timestamp"], context::now_timestamp).unwrap(); + latte_module + .function(&["now_timestamp"], context::now_timestamp) + .unwrap(); latte_module.function(&["hash"], context::hash).unwrap(); latte_module.function(&["hash2"], context::hash2).unwrap(); latte_module @@ -142,6 +144,9 @@ impl Program { .function(&["uuid"], context::Uuid::new) .unwrap(); latte_module.function(&["normal"], context::normal).unwrap(); + latte_module + .function(&["uniform"], context::uniform) + .unwrap(); latte_module .macro_(&["param"], move |ctx, ts| context::param(ctx, ¶ms, ts)) .unwrap(); From b34863ce1ab2a7c59ab2cdbcabccf4b9b34cba78 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Piotr=20Ko=C5=82aczkowski?= Date: Thu, 20 Jun 2024 21:14:20 +0200 Subject: [PATCH 05/22] Fix clippy warnings --- src/config.rs | 2 +- src/context.rs | 36 ++++++++++++++++++------------------ src/report.rs | 2 +- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/config.rs b/src/config.rs index 746c4d2..92b3d9f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -97,7 +97,7 @@ impl RetryInterval { if values.len() > 2 { return None; } - let min_ms = RetryInterval::parse_time(values.get(0).unwrap_or(&""))?; + let min_ms = RetryInterval::parse_time(values.first().unwrap_or(&""))?; let max_ms = RetryInterval::parse_time(values.get(1).unwrap_or(&"")).unwrap_or(min_ms); if min_ms > max_ms { None diff --git a/src/context.rs b/src/context.rs index 01b7c86..5e9176f 100644 --- a/src/context.rs +++ b/src/context.rs @@ -15,7 +15,7 @@ use openssl::error::ErrorStack; use openssl::ssl::{SslContext, SslContextBuilder, SslFiletype, SslMethod}; use rand::distributions::Distribution; use rand::rngs::StdRng; -use rand::{Rng, SeedableRng}; +use rand::{random, Rng, SeedableRng}; use rune::ast; use rune::ast::Kind; use rune::macros::{quote, MacroContext, TokenStream}; @@ -86,7 +86,7 @@ pub struct ClusterInfo { pub fn cql_value_obj_to_string(v: &CqlValue) -> String { let no_transformation_size_limit = 32; match v { - // Replace big string- and bytes-alike object values with it's size labels + // Replace big string- and bytes-alike object values with its size labels CqlValue::Text(param) if param.len() > no_transformation_size_limit => { format!("Text(={})", param.len()) } @@ -115,7 +115,7 @@ pub fn cql_value_obj_to_string(v: &CqlValue) -> String { if result.len() >= 2 { result.truncate(result.len() - 2); } - result.push_str(&format!("] }}")); + result.push_str("] }"); result } CqlValue::List(elements) => { @@ -147,7 +147,7 @@ impl CassError { fn query_execution_error(cql: &str, params: &[CqlValue], err: QueryError) -> CassError { let query = QueryInfo { cql: cql.to_string(), - params: params.iter().map(|v| cql_value_obj_to_string(v)).collect(), + params: params.iter().map(cql_value_obj_to_string).collect(), }; let kind = match err { QueryError::RequestTimeout(_) @@ -326,7 +326,7 @@ impl Default for SessionStats { } } -pub fn get_expoinential_retry_interval( +pub fn get_exponential_retry_interval( min_interval: u64, max_interval: u64, current_attempt_num: u64, @@ -336,10 +336,10 @@ pub fn get_expoinential_retry_interval( min_interval_float * (2u64.pow((current_attempt_num - 1).try_into().unwrap_or(0)) as f64); // Add jitter - current_interval += rand::thread_rng().gen::() * min_interval_float; + current_interval += random::() * min_interval_float; current_interval -= min_interval_float / 2.0; - std::cmp::min(current_interval as u64, max_interval as u64) as u64 + std::cmp::min(current_interval as u64, max_interval) } pub async fn handle_retry_error( @@ -347,7 +347,7 @@ pub async fn handle_retry_error( current_attempt_num: u64, current_error: CassError, ) { - let current_retry_interval = get_expoinential_retry_interval( + let current_retry_interval = get_exponential_retry_interval( ctxt.retry_interval.min_ms, ctxt.retry_interval.max_ms, current_attempt_num, @@ -356,11 +356,11 @@ pub async fn handle_retry_error( let mut next_attempt_str = String::new(); let is_last_attempt = current_attempt_num == ctxt.retry_number; if !is_last_attempt { - next_attempt_str += &format!("[Retry in {}ms]", current_retry_interval); + next_attempt_str += &format!("[Retry in {} ms]", current_retry_interval); } let 
err_msg = format!( "{}: [ERROR][Attempt {}/{}]{} {}", - Utc::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string(), + Utc::now().format("%Y-%m-%d %H:%M:%S%.3f"), current_attempt_num, ctxt.retry_number, next_attempt_str, @@ -390,9 +390,9 @@ pub struct Context { } // Needed, because Rune `Value` is !Send, as it may contain some internal pointers. -// Therefore it is not safe to pass a `Value` to another thread by cloning it, because +// Therefore, it is not safe to pass a `Value` to another thread by cloning it, because // both objects could accidentally share some unprotected, `!Sync` data. -// To make it safe, the same `Context` is never used by more than one thread at once and +// To make it safe, the same `Context` is never used by more than one thread at once, and // we make sure in `clone` to make a deep copy of the `data` field by serializing // and deserializing it, so no pointers could get through. unsafe impl Send for Context {} @@ -408,8 +408,8 @@ impl Context { session: Arc::new(session), statements: HashMap::new(), stats: TryLock::new(SessionStats::new()), - retry_number: retry_number, - retry_interval: retry_interval, + retry_number, + retry_interval, load_cycle_count: 0, data: Value::Object(Shared::new(Object::new())), } @@ -585,14 +585,14 @@ mod bind { .collect(); let fields: Vec<(String, Option)> = keys .into_iter() - .zip(values?.into_iter()) + .zip(values?) .filter(|&(key, _)| key != "_keyspace" && key != "_type_name") .map(|(key, value)| (key.to_string(), value)) .collect(); let udt = CqlValue::UserDefinedType { - keyspace: keyspace, - type_name: type_name, - fields: fields, + keyspace, + type_name, + fields, }; Ok(udt) } diff --git a/src/report.rs b/src/report.rs index 556ecf4..bba0e85 100644 --- a/src/report.rs +++ b/src/report.rs @@ -564,7 +564,7 @@ impl Display for Sample { num_of_dropped_errors, ); } - eprintln!("{}", error_msg_bunch); + writeln!(f, "{}", error_msg_bunch)?; } write!( f, From 9777d962280f7986cf22be33598e3d6e6f7ff023 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ko=C5=82aczkowski?= Date: Thu, 20 Jun 2024 21:26:46 +0200 Subject: [PATCH 06/22] Change plot extension to svg Fixes #53 --- src/plot.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/plot.rs b/src/plot.rs index 85c2d00..2b4a7b8 100644 --- a/src/plot.rs +++ b/src/plot.rs @@ -144,7 +144,7 @@ pub async fn plot_graph(conf: PlotCommand) -> Result<()> { let output_path = conf .output - .unwrap_or(reports[0].conf.default_output_file_name("png")); + .unwrap_or(reports[0].conf.default_output_file_name("svg")); let root = SVGBackend::new(&output_path, (2000, 1000)).into_drawing_area(); root.fill(&WHITE).unwrap(); From 1ed0f187fa8a156a2dc2e0486d50492c71d03043 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ko=C5=82aczkowski?= Date: Fri, 21 Jun 2024 11:43:26 +0200 Subject: [PATCH 07/22] Fix int underflow in get_exponential_retry_interval --- src/context.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/context.rs b/src/context.rs index 5e9176f..970d0db 100644 --- a/src/context.rs +++ b/src/context.rs @@ -333,7 +333,7 @@ pub fn get_exponential_retry_interval( ) -> u64 { let min_interval_float: f64 = min_interval as f64; let mut current_interval: f64 = - min_interval_float * (2u64.pow((current_attempt_num - 1).try_into().unwrap_or(0)) as f64); + min_interval_float * (2u64.pow(current_attempt_num.try_into().unwrap_or(0)) as f64); // Add jitter current_interval += random::() * min_interval_float; From 48874128295003a62b59d015da75478878290f0e Mon Sep 17 00:00:00 
2001 From: =?UTF-8?q?Piotr=20Ko=C5=82aczkowski?= Date: Fri, 21 Jun 2024 11:48:15 +0200 Subject: [PATCH 08/22] Print floating point script parameters --- src/config.rs | 4 ++-- src/report.rs | 64 +++++++++++++++++++++++++++++++++------------------ 2 files changed, 43 insertions(+), 25 deletions(-) diff --git a/src/config.rs b/src/config.rs index 92b3d9f..e8d42df 100644 --- a/src/config.rs +++ b/src/config.rs @@ -419,8 +419,8 @@ impl RunCommand { } /// Returns the value of parameter under given key. - /// If key doesn't exist, or parameter is not an integer, returns `None`. - pub fn get_param(&self, key: &str) -> Option { + /// If key doesn't exist, or parameter is not a number, returns `None`. + pub fn get_param(&self, key: &str) -> Option { self.params .iter() .find(|(k, _)| k == key) diff --git a/src/report.rs b/src/report.rs index bba0e85..b4c1393 100644 --- a/src/report.rs +++ b/src/report.rs @@ -71,7 +71,7 @@ impl Report { pub struct Quantity { pub value: Option, pub error: Option, - pub precision: usize, + pub precision: Option, } impl Quantity { @@ -79,12 +79,12 @@ impl Quantity { Quantity { value, error: None, - precision: 0, + precision: None, } } pub fn with_precision(mut self, precision: usize) -> Self { - self.precision = precision; + self.precision = Some(precision); self } @@ -96,9 +96,10 @@ impl Quantity { impl Quantity { fn format_error(&self) -> String { + let prec = self.precision.unwrap_or_default(); match &self.error { None => "".to_owned(), - Some(e) => format!("± {:<6.prec$}", e, prec = self.precision), + Some(e) => format!("± {:<6.prec$}", e, prec = prec), } } } @@ -142,13 +143,19 @@ impl From<&Option> for Quantity { impl Display for Quantity { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - match &self.value { - None => write!(f, "{}", " ".repeat(18)), - Some(v) => write!( + match (&self.value, self.precision) { + (None, _) => write!(f, "{}", " ".repeat(18)), + (Some(v), None) => write!( + f, + "{value:9} {error:8}", + value = style(v).bright().for_stdout(), + error = style(self.format_error()).dim().for_stdout(), + ), + (Some(v), Some(prec)) => write!( f, "{value:9.prec$} {error:8}", value = style(v).bright().for_stdout(), - prec = self.precision, + prec = prec, error = style(self.format_error()).dim().for_stdout(), ), } @@ -636,25 +643,36 @@ impl<'a> Display for BenchmarkCmp<'a> { }), self.line("Samples", "", |s| Quantity::from(s.log.len())), self.line("Mean sample size", "op", |s| { - Quantity::from(s.log.iter().map(|s| s.cycle_count as f64).mean()) + Quantity::from(s.log.iter().map(|s| s.cycle_count as f64).mean()).with_precision(0) }), self.line("└─", "req", |s| { Quantity::from(s.log.iter().map(|s| s.request_count as f64).mean()) + .with_precision(0) + }), + self.line("Concurrency", "req", |s| { + Quantity::from(s.concurrency).with_precision(0) + }), + self.line("└─", "%", |s| { + Quantity::from(s.concurrency_ratio).with_precision(0) }), - self.line("Concurrency", "req", |s| Quantity::from(s.concurrency)), - self.line("└─", "%", |s| Quantity::from(s.concurrency_ratio)), - self.line("Throughput", "op/s", |s| Quantity::from(s.cycle_throughput)) - .with_significance(self.cmp_cycle_throughput()) - .with_orientation(1) - .into_box(), - self.line("├─", "req/s", |s| Quantity::from(s.req_throughput)) - .with_significance(self.cmp_req_throughput()) - .with_orientation(1) - .into_box(), - self.line("└─", "row/s", |s| Quantity::from(s.row_throughput)) - .with_significance(self.cmp_row_throughput()) - .with_orientation(1) - .into_box(), + self.line("Throughput", 
"op/s", |s| { + Quantity::from(s.cycle_throughput).with_precision(0) + }) + .with_significance(self.cmp_cycle_throughput()) + .with_orientation(1) + .into_box(), + self.line("├─", "req/s", |s| { + Quantity::from(s.req_throughput).with_precision(0) + }) + .with_significance(self.cmp_req_throughput()) + .with_orientation(1) + .into_box(), + self.line("└─", "row/s", |s| { + Quantity::from(s.row_throughput).with_precision(0) + }) + .with_significance(self.cmp_row_throughput()) + .with_orientation(1) + .into_box(), self.line("Mean cycle time", "ms", |s| { Quantity::from(&s.cycle_time_ms).with_precision(3) }) From 206d6ee5c08eb0576a0d01a545b4638ac33faaae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ko=C5=82aczkowski?= Date: Fri, 21 Jun 2024 13:57:25 +0200 Subject: [PATCH 09/22] Replace deprecated code --- src/report.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/report.rs b/src/report.rs index b4c1393..cf6a7fc 100644 --- a/src/report.rs +++ b/src/report.rs @@ -5,7 +5,7 @@ use std::num::NonZeroUsize; use std::path::Path; use std::{fs, io}; -use chrono::{Local, NaiveDateTime, TimeZone}; +use chrono::{Local, TimeZone}; use console::{pad_str, style, Alignment}; use err_derive::*; use itertools::Itertools; @@ -438,8 +438,10 @@ impl RunConfigCmp<'_> { fn format_time(&self, conf: &RunCommand, format: &str) -> String { conf.timestamp .and_then(|ts| { - NaiveDateTime::from_timestamp_opt(ts, 0) - .map(|utc| Local.from_utc_datetime(&utc).format(format).to_string()) + Local + .timestamp_opt(ts, 0) + .latest() + .map(|l| l.format(format).to_string()) }) .unwrap_or_default() } From a8af4021f6830f5fc4c11b7eee9e7c78965c992a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ko=C5=82aczkowski?= Date: Fri, 21 Jun 2024 14:49:03 +0200 Subject: [PATCH 10/22] Make interrupting by Ctrl-C more reliable The retrying logic made Ctrl-C handling a bit broken, because the streams could be terminated only at the complete cycles. If a query retries for a long time it prevented terminating the program. This commit implements interruption logic in a different way, by using tokio::signal and tokio::select! so all streams are terminated immediately, regardless of their progress. 
--- Cargo.lock | 40 +++++++++++----------------------------- Cargo.toml | 3 +-- src/error.rs | 3 ++- src/exec.rs | 48 +++++++++++++++++++++--------------------------- src/histogram.rs | 1 + src/interrupt.rs | 21 --------------------- src/main.rs | 23 +++++++++-------------- src/stats.rs | 8 ++++---- 8 files changed, 49 insertions(+), 98 deletions(-) delete mode 100644 src/interrupt.rs diff --git a/Cargo.lock b/Cargo.lock index d00b27a..7348958 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -483,16 +483,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "ctrlc" -version = "3.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b467862cc8610ca6fc9a1532d7777cee0804e678ab45410897b9396495994a0b" -dependencies = [ - "nix", - "windows-sys 0.52.0", -] - [[package]] name = "darling" version = "0.20.8" @@ -1030,7 +1020,7 @@ dependencies = [ [[package]] name = "latte-cli" -version = "0.26.0" +version = "0.27.0" dependencies = [ "anyhow", "base64 0.22.1", @@ -1038,7 +1028,6 @@ dependencies = [ "clap", "console", "cpu-time", - "ctrlc", "err-derive", "futures", "hdrhistogram", @@ -1224,17 +1213,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "nix" -version = "0.27.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" -dependencies = [ - "bitflags 2.4.2", - "cfg-if", - "libc", -] - [[package]] name = "nom" version = "7.1.3" @@ -1935,8 +1913,6 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "scylla" version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9439d92eea9f86c07175c819c3a129ca28b02477b47df26db354a1f4ea7ee276" dependencies = [ "arc-swap", "async-trait", @@ -1968,8 +1944,6 @@ dependencies = [ [[package]] name = "scylla-cql" version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64037fb9d9c59ae15137fff9a56c4d528908dfd38d09e75b5f8e56e3894966dd" dependencies = [ "async-trait", "byteorder", @@ -1985,8 +1959,6 @@ dependencies = [ [[package]] name = "scylla-macros" version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e5fe1d389adebe6a1a27bce18b81a65ff18c25d58a795de490e18b0e7a27b9f" dependencies = [ "darling", "proc-macro2", @@ -2066,6 +2038,15 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + [[package]] name = "simba" version = "0.8.1" @@ -2286,6 +2267,7 @@ dependencies = [ "num_cpus", "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.48.0", diff --git a/Cargo.toml b/Cargo.toml index f969d62..b27e7ce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,6 @@ chrono = { version = "0.4.18", features = ["serde"] } clap = { version = "4", features = ["derive", "cargo", "env"] } console = "0.15.0" cpu-time = "1.0.0" -ctrlc = "3.2.1" err-derive = "0.3" futures = "0.3" hdrhistogram = "7.1.0" @@ -49,7 +48,7 @@ strum = { version = "0.26", features = ["derive"] } strum_macros = "0.26" time = "0.3" thiserror = "1.0.26" -tokio = { version = "1", features = ["rt", "rt-multi-thread", "time", "parking_lot"] } +tokio = { version = "1", features = ["rt", "rt-multi-thread", "time", "parking_lot", "signal"] } tokio-stream = "0.1" 
tracing = "0.1" tracing-subscriber = "0.3" diff --git a/src/error.rs b/src/error.rs index 88e8e8e..5e7bbb4 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,4 +1,5 @@ use crate::context::CassError; +use crate::stats::BenchmarkStats; use err_derive::*; use hdrhistogram::serialization::interval_log::IntervalLogWriterError; use hdrhistogram::serialization::V2DeflateSerializeError; @@ -37,7 +38,7 @@ pub enum LatteError { HdrLogWrite(#[source] IntervalLogWriterError), #[error(display = "Interrupted")] - Interrupted, + Interrupted(Box), } pub type Result = std::result::Result; diff --git a/src/exec.rs b/src/exec.rs index 7097b9e..ef2a2ec 100644 --- a/src/exec.rs +++ b/src/exec.rs @@ -11,10 +11,10 @@ use std::sync::Arc; use std::time::Instant; use tokio_stream::wrappers::IntervalStream; -use crate::error::Result; +use crate::error::{LatteError, Result}; use crate::{ - BenchmarkStats, BoundedCycleCounter, InterruptHandler, Interval, Progress, Recorder, Sampler, - Workload, WorkloadStats, + BenchmarkStats, BoundedCycleCounter, Interval, Progress, Recorder, Sampler, Workload, + WorkloadStats, }; /// Returns a stream emitting `rate` events per second. @@ -43,7 +43,6 @@ async fn run_stream( cycle_counter: BoundedCycleCounter, concurrency: NonZeroUsize, sampling: Interval, - interrupt: Arc, progress: Arc>, mut out: Sender>, ) { @@ -68,9 +67,6 @@ async fn run_stream( return; } } - if interrupt.is_interrupted() { - break; - } } // Send the statistics of remaining requests sampler.finish().await; @@ -88,7 +84,6 @@ fn spawn_stream( sampling: Interval, workload: Workload, iter_counter: BoundedCycleCounter, - interrupt: Arc, progress: Arc>, ) -> Receiver> { let (tx, rx) = channel(1); @@ -103,7 +98,6 @@ fn spawn_stream( iter_counter, concurrency, sampling, - interrupt, progress, tx, ) @@ -117,7 +111,6 @@ fn spawn_stream( iter_counter, concurrency, sampling, - interrupt, progress, tx, ) @@ -169,7 +162,6 @@ pub async fn par_execute( exec_options: &ExecutionOptions, sampling: Interval, workload: Workload, - signals: Arc, show_progress: bool, ) -> Result { let thread_count = exec_options.threads.get(); @@ -196,29 +188,31 @@ pub async fn par_execute( sampling, workload.clone()?, deadline.share(), - signals.clone(), progress.clone(), ); streams.push(s); } loop { - let partial_stats: Vec<_> = receive_one_of_each(&mut streams) - .await - .into_iter() - .try_collect()?; - - if partial_stats.is_empty() { - break; - } + tokio::select! { + partial_stats = receive_one_of_each(&mut streams) => { + let partial_stats: Vec<_> = partial_stats.into_iter().try_collect()?; + if partial_stats.is_empty() { + break Ok(stats.finish()); + } + + let aggregate = stats.record(&partial_stats); + if sampling.is_bounded() { + progress.set_visible(false); + println!("{aggregate}"); + progress.set_visible(show_progress); + } + } - let aggregate = stats.record(&partial_stats); - if sampling.is_bounded() { - progress.set_visible(false); - println!("{aggregate}"); - progress.set_visible(show_progress); + _ = tokio::signal::ctrl_c() => { + progress.set_visible(false); + break Err(LatteError::Interrupted(Box::new(stats.finish()))); + } } } - - Ok(stats.finish()) } diff --git a/src/histogram.rs b/src/histogram.rs index ba90630..6a320a9 100644 --- a/src/histogram.rs +++ b/src/histogram.rs @@ -9,6 +9,7 @@ use serde::{Deserialize, Deserializer, Serialize}; /// A wrapper for HDR histogram that allows us to serialize/deserialize it to/from /// a base64 encoded string we can store in JSON report. 
+#[derive(Debug)] pub struct SerializableHistogram(pub Histogram); impl Serialize for SerializableHistogram { diff --git a/src/interrupt.rs b/src/interrupt.rs deleted file mode 100644 index 9ce27dd..0000000 --- a/src/interrupt.rs +++ /dev/null @@ -1,21 +0,0 @@ -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; - -/// Notifies about received Ctrl-C signal -pub struct InterruptHandler { - interrupted: Arc, -} - -impl InterruptHandler { - pub fn install() -> InterruptHandler { - let cell = Arc::new(AtomicBool::new(false)); - let cell_ref = cell.clone(); - let _ = ctrlc::set_handler(move || cell_ref.store(true, Ordering::Relaxed)); - InterruptHandler { interrupted: cell } - } - - /// Returns true if Ctrl-C was pressed - pub fn is_interrupted(&self) -> bool { - self.interrupted.load(Ordering::Relaxed) - } -} diff --git a/src/main.rs b/src/main.rs index dfa2e3f..1f0691f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,6 @@ use std::fs::File; use std::io::{stdout, Write}; use std::path::{Path, PathBuf}; use std::process::exit; -use std::sync::Arc; use std::time::Duration; use clap::Parser; @@ -24,7 +23,6 @@ use crate::context::{CassError, CassErrorKind, Context, SessionStats}; use crate::cycle::BoundedCycleCounter; use crate::error::{LatteError, Result}; use crate::exec::{par_execute, ExecutionOptions}; -use crate::interrupt::InterruptHandler; use crate::plot::plot_graph; use crate::progress::Progress; use crate::report::{Report, RunConfigCmp}; @@ -38,7 +36,6 @@ mod cycle; mod error; mod exec; mod histogram; -mod interrupt; mod plot; mod progress; mod report; @@ -170,7 +167,6 @@ async fn load(conf: LoadCommand) -> Result<()> { } } - let interrupt = Arc::new(InterruptHandler::install()); eprintln!("info: Loading data..."); let loader = Workload::new(session.clone()?, program.clone(), FnRef::new(LOAD_FN)); let load_options = ExecutionOptions { @@ -184,7 +180,6 @@ async fn load(conf: LoadCommand) -> Result<()> { &load_options, config::Interval::Unbounded, loader, - interrupt.clone(), !conf.quiet, ) .await?; @@ -228,7 +223,6 @@ async fn run(conf: RunCommand) -> Result<()> { } let runner = Workload::new(session.clone()?, program.clone(), function); - let interrupt = Arc::new(InterruptHandler::install()); if conf.warmup_duration.is_not_zero() { eprintln!("info: Warming up..."); let warmup_options = ExecutionOptions { @@ -242,16 +236,11 @@ async fn run(conf: RunCommand) -> Result<()> { &warmup_options, Interval::Unbounded, runner.clone()?, - interrupt.clone(), !conf.quiet, ) .await?; } - if interrupt.is_interrupted() { - return Err(LatteError::Interrupted); - } - eprintln!("info: Running benchmark..."); println!( @@ -270,15 +259,21 @@ async fn run(conf: RunCommand) -> Result<()> { }; report::print_log_header(); - let stats = par_execute( + let stats = match par_execute( "Running...", &exec_options, conf.sampling_interval, runner, - interrupt.clone(), !conf.quiet, ) - .await?; + .await + { + Ok(stats) => stats, + Err(LatteError::Interrupted(stats)) => *stats, + Err(e) => { + return Err(e); + } + }; let stats_cmp = BenchmarkCmp { v1: &stats, diff --git a/src/stats.rs b/src/stats.rs index 4326b21..91d7b42 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -259,7 +259,7 @@ impl Percentile { } /// Records basic statistics for a sample (a group) of requests -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] pub struct Sample { pub time_s: f32, pub duration_s: f32, @@ -442,7 +442,7 @@ impl Log { } } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, 
Deserialize, Debug)] pub struct Bucket { pub percentile: f64, pub duration_ms: f64, @@ -450,7 +450,7 @@ pub struct Bucket { pub cumulative_count: u64, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] pub struct TimeDistribution { pub mean: Mean, pub percentiles: Vec, @@ -458,7 +458,7 @@ pub struct TimeDistribution { } /// Stores the final statistics of the test run. -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] pub struct BenchmarkStats { pub start_time: DateTime, pub end_time: DateTime, From 75b98fafcb09b78d26b26f2453b6f8ef19dce17e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ko=C5=82aczkowski?= Date: Mon, 24 Jun 2024 09:41:44 +0200 Subject: [PATCH 11/22] Add support for creating 32-bit float values --- README.md | 7 +++++-- src/context.rs | 15 +++++++++++++++ src/workload.rs | 5 +++++ 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 90b1901..6ba53db 100644 --- a/README.md +++ b/README.md @@ -214,13 +214,16 @@ are pure, i.e. invoking them multiple times with the same parameters yields alwa #### Numeric conversions Rune represents integers as 64-bit signed values. Therefore, it is possible to directly pass a Rune integer to -a Cassandra column of type `bigint`. However, binding a 64-bit value to smaller integer column types, like -`int`, `smallint` or `tinyint` will result in a runtime error. As long as an integer value does not exceed the bounds, +a Cassandra column of type `bigint`, and to pass a Rune float to a Cassandra column of type `double`. +However, binding a 64-bit value to smaller integer or float column types, like +`int`, `smallint`, `tinyint` or `float` will result in a runtime error. +As long as an integer value does not exceed the bounds, you can convert it to smaller signed integer types by using the following instance functions: - `x.to_i32()` – converts a float or integer to a 32-bit signed integer, compatible with Cassandra `int` type - `x.to_i16()` – converts a float or integer to a 16-bit signed integer, compatible with Cassandra `smallint` type - `x.to_i8()` – converts a float or integer to an 8-bit signed integer, compatible with Cassandra `tinyint` type +- `x.to_f32()` - converts a float or integer value to a 32-bit float, compatible with Cassandra `float` type - `x.clamp(min, max)` – restricts the range of an integer or a float value to given range You can also convert between floats and integers by calling `to_integer` or `to_float` instance functions. 
diff --git a/src/context.rs b/src/context.rs index 970d0db..7556524 100644 --- a/src/context.rs +++ b/src/context.rs @@ -495,6 +495,7 @@ impl Context { .statements .get(key) .ok_or_else(|| CassError(CassErrorKind::PreparedStatementNotFound(key.to_string())))?; + let params = bind::to_scylla_query_params(¶ms)?; for current_attempt_num in 0..self.retry_number + 1 { let start_time = self.stats.try_lock().unwrap().start_request(); @@ -611,6 +612,9 @@ mod bind { } else if h == Int8::type_hash() { let int8: &Int8 = obj.downcast_borrow_ref().unwrap(); Ok(CqlValue::TinyInt(int8.0)) + } else if h == Float32::type_hash() { + let float32: &Float32 = obj.downcast_borrow_ref().unwrap(); + Ok(CqlValue::Float(float32.0)) } else { Err(CassError(CassErrorKind::UnsupportedType( v.type_info().unwrap(), @@ -684,6 +688,9 @@ pub struct Int16(pub i16); #[derive(Clone, Debug, Any)] pub struct Int32(pub i32); +#[derive(Clone, Debug, Any)] +pub struct Float32(pub f32); + /// Returns the literal value stored in the `params` map under the key given as the first /// macro arg, and if not found, returns the expression from the second arg. pub fn param( @@ -737,6 +744,14 @@ pub fn float_to_i32(value: f64) -> Option { int_to_i32(value as i64) } +pub fn int_to_f32(value: i64) -> Option { + Some(Float32(value as f32)) +} + +pub fn float_to_f32(value: f64) -> Option { + Some(Float32(value as f32)) +} + /// Computes a hash of an integer value `i`. /// Returns a value in range `0..i64::MAX`. pub fn hash(i: i64) -> i64 { diff --git a/src/workload.rs b/src/workload.rs index 7aa8af2..834ebc7 100644 --- a/src/workload.rs +++ b/src/workload.rs @@ -161,6 +161,11 @@ impl Program { .unwrap(); latte_module.inst_fn("to_i8", context::int_to_i8).unwrap(); latte_module.inst_fn("to_i8", context::float_to_i8).unwrap(); + latte_module.inst_fn("to_f32", context::int_to_f32).unwrap(); + latte_module + .inst_fn("to_f32", context::float_to_f32) + .unwrap(); + latte_module.inst_fn("clamp", context::clamp_float).unwrap(); latte_module.inst_fn("clamp", context::clamp_int).unwrap(); From d28412eef42ec50a92028b3702c58178dc86de3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ko=C5=82aczkowski?= Date: Mon, 24 Jun 2024 17:25:34 +0200 Subject: [PATCH 12/22] Automatic type conversions in prepared statements --- Cargo.lock | 8 +- README.md | 26 +++---- src/context.rs | 199 +++++++++++++++++++++++++++++++----------------- src/workload.rs | 7 ++ 4 files changed, 156 insertions(+), 84 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7348958..3fafe35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1020,7 +1020,7 @@ dependencies = [ [[package]] name = "latte-cli" -version = "0.27.0" +version = "0.26.0" dependencies = [ "anyhow", "base64 0.22.1", @@ -1913,6 +1913,8 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "scylla" version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9439d92eea9f86c07175c819c3a129ca28b02477b47df26db354a1f4ea7ee276" dependencies = [ "arc-swap", "async-trait", @@ -1944,6 +1946,8 @@ dependencies = [ [[package]] name = "scylla-cql" version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64037fb9d9c59ae15137fff9a56c4d528908dfd38d09e75b5f8e56e3894966dd" dependencies = [ "async-trait", "byteorder", @@ -1959,6 +1963,8 @@ dependencies = [ [[package]] name = "scylla-macros" version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"7e5fe1d389adebe6a1a27bce18b81a65ff18c25d58a795de490e18b0e7a27b9f" dependencies = [ "darling", "proc-macro2", diff --git a/README.md b/README.md index 6ba53db..50bd535 100644 --- a/README.md +++ b/README.md @@ -211,20 +211,18 @@ are pure, i.e. invoking them multiple times with the same parameters yields alwa - `latte::normal(i, mean, std_dev)` – generates a floating point number from a normal distribution - `latte::uniform(i, min, max)` – generates a floating point number from a uniform distribution -#### Numeric conversions - -Rune represents integers as 64-bit signed values. Therefore, it is possible to directly pass a Rune integer to -a Cassandra column of type `bigint`, and to pass a Rune float to a Cassandra column of type `double`. -However, binding a 64-bit value to smaller integer or float column types, like -`int`, `smallint`, `tinyint` or `float` will result in a runtime error. -As long as an integer value does not exceed the bounds, -you can convert it to smaller signed integer types by using the following instance functions: - -- `x.to_i32()` – converts a float or integer to a 32-bit signed integer, compatible with Cassandra `int` type -- `x.to_i16()` – converts a float or integer to a 16-bit signed integer, compatible with Cassandra `smallint` type -- `x.to_i8()` – converts a float or integer to an 8-bit signed integer, compatible with Cassandra `tinyint` type -- `x.to_f32()` - converts a float or integer value to a 32-bit float, compatible with Cassandra `float` type -- `x.clamp(min, max)` – restricts the range of an integer or a float value to given range +#### Type conversions +Rune uses 64-bit representation for integers and floats. +Since version 0.28 Rune numbers are automatically converted to proper target query parameter type, +therefore you don't need to do explicit conversions. E.g. you can pass an integer as a parameter +of Cassandra type `smallint`. If the number is too big to fit into the range allowed by the target +type, a runtime error will be signalled. + +The following methods are available: +- `x.to_integer()` – converts a float to an integer +- `x.to_float()` – converts an integer to a float +- `x.to_string()` – converts a float or integer to a string +- `x.clamp(min, max)` – restricts the range of an integer or a float value to given range You can also convert between floats and integers by calling `to_integer` or `to_float` instance functions. 
diff --git a/src/context.rs b/src/context.rs index 7556524..2900681 100644 --- a/src/context.rs +++ b/src/context.rs @@ -23,6 +23,7 @@ use rune::parse::Parser; use rune::runtime::{Object, Shared, TypeInfo, VmError}; use rune::{Any, Value}; use rust_embed::RustEmbed; +use scylla::_macro_internal::ColumnType; use scylla::frame::response::result::CqlValue; use scylla::prepared_statement::PreparedStatement; use scylla::transport::errors::{DbError, NewSessionError, QueryError}; @@ -192,7 +193,10 @@ pub enum CassErrorKind { FailedToConnect(Vec, NewSessionError), PreparedStatementNotFound(String), QueryRetriesExceeded(String), - UnsupportedType(TypeInfo), + QueryParamConversion(TypeInfo, ColumnType), + ValueOutOfRange(String, ColumnType), + InvalidNumberOfQueryParams, + InvalidQueryParamsObject(TypeInfo), Prepare(String, QueryError), Overloaded(QueryInfo, QueryError), QueryExecution(QueryInfo, QueryError), @@ -214,8 +218,20 @@ impl CassError { CassErrorKind::QueryRetriesExceeded(s) => { write!(buf, "QueryRetriesExceeded: {s}") } - CassErrorKind::UnsupportedType(s) => { - write!(buf, "Unsupported type: {s}") + CassErrorKind::ValueOutOfRange(v, t) => { + write!(buf, "Value {v} out of range for Cassandra type {t:?}") + } + CassErrorKind::QueryParamConversion(s, t) => { + write!( + buf, + "Cannot convert value of type {s} to Cassandra type {t:?}" + ) + } + CassErrorKind::InvalidNumberOfQueryParams => { + write!(buf, "Incorrect number of query parameters") + } + CassErrorKind::InvalidQueryParamsObject(t) => { + write!(buf, "Value of type {t} cannot by used as query parameters; expected a list or object") } CassErrorKind::Prepare(q, e) => { write!(buf, "Failed to prepare query \"{q}\": {e}") @@ -496,7 +512,7 @@ impl Context { .get(key) .ok_or_else(|| CassError(CassErrorKind::PreparedStatementNotFound(key.to_string())))?; - let params = bind::to_scylla_query_params(¶ms)?; + let params = bind::to_scylla_query_params(¶ms, statement.get_variable_col_specs())?; for current_attempt_num in 0..self.retry_number + 1 { let start_time = self.stats.try_lock().unwrap().start_request(); let rs = self.session.execute(statement, params.clone()).await; @@ -542,110 +558,147 @@ impl Context { /// Functions for binding rune values to CQL parameters mod bind { use crate::CassErrorKind; - use scylla::frame::response::result::CqlValue; + use scylla::_macro_internal::ColumnType; + use scylla::frame::response::result::{ColumnSpec, CqlValue}; use super::*; - fn to_scylla_value(v: &Value) -> Result { - match v { - Value::Bool(v) => Ok(CqlValue::Boolean(*v)), - Value::Byte(v) => Ok(CqlValue::TinyInt(*v as i8)), - Value::Integer(v) => Ok(CqlValue::BigInt(*v)), - Value::Float(v) => Ok(CqlValue::Double(*v)), - Value::StaticString(v) => Ok(CqlValue::Text(v.as_str().to_string())), - Value::String(v) => Ok(CqlValue::Text(v.borrow_ref().unwrap().as_str().to_string())), - Value::Bytes(v) => Ok(CqlValue::Blob(v.borrow_ref().unwrap().to_vec())), - Value::Option(v) => match v.borrow_ref().unwrap().as_ref() { - Some(v) => to_scylla_value(v), + fn to_scylla_value(v: &Value, typ: &ColumnType) -> Result { + match (v, typ) { + (Value::Bool(v), ColumnType::Boolean) => Ok(CqlValue::Boolean(*v)), + + (Value::Byte(v), ColumnType::TinyInt) => Ok(CqlValue::TinyInt(*v as i8)), + (Value::Byte(v), ColumnType::SmallInt) => Ok(CqlValue::SmallInt(*v as i16)), + (Value::Byte(v), ColumnType::Int) => Ok(CqlValue::Int(*v as i32)), + (Value::Byte(v), ColumnType::BigInt) => Ok(CqlValue::BigInt(*v as i64)), + + (Value::Integer(v), ColumnType::TinyInt) => { + 
convert_int(*v, ColumnType::TinyInt, CqlValue::TinyInt) + } + (Value::Integer(v), ColumnType::SmallInt) => { + convert_int(*v, ColumnType::SmallInt, CqlValue::SmallInt) + } + (Value::Integer(v), ColumnType::Int) => convert_int(*v, ColumnType::Int, CqlValue::Int), + (Value::Integer(v), ColumnType::BigInt) => Ok(CqlValue::BigInt(*v)), + + (Value::Float(v), ColumnType::Float) => Ok(CqlValue::Float(*v as f32)), + (Value::Float(v), ColumnType::Double) => Ok(CqlValue::Double(*v)), + + (Value::StaticString(v), ColumnType::Text | ColumnType::Ascii) => { + Ok(CqlValue::Text(v.as_str().to_string())) + } + (Value::String(v), ColumnType::Text | ColumnType::Ascii) => { + Ok(CqlValue::Text(v.borrow_ref().unwrap().as_str().to_string())) + } + + (Value::Bytes(v), ColumnType::Blob) => { + Ok(CqlValue::Blob(v.borrow_ref().unwrap().to_vec())) + } + (Value::Option(v), typ) => match v.borrow_ref().unwrap().as_ref() { + Some(v) => to_scylla_value(v, typ), None => Ok(CqlValue::Empty), }, - Value::Vec(v) => { + (Value::Vec(v), ColumnType::List(elt)) => { let v = v.borrow_ref().unwrap(); - let elements = v.as_ref().iter().map(to_scylla_value).try_collect()?; + let elements = v + .as_ref() + .iter() + .map(|v| to_scylla_value(v, elt)) + .try_collect()?; Ok(CqlValue::List(elements)) } - Value::Object(v) => { - let borrowed = v.borrow_ref().unwrap(); - - // // Get value of "_keyspace" key or set default value - let keyspace = match borrowed.get_value::("_keyspace") { - Ok(Some(value)) => value, - _ => "unknown".to_string(), - }; - - // // Get value of "_type_name" key or set default value - let type_name = match borrowed.get_value::("_type_name") { - Ok(Some(value)) => value, - _ => "unknown".to_string(), - }; - - let keys = borrowed.keys(); - let values: Result>, _> = borrowed - .values() - .map(|value| to_scylla_value(&value.clone()).map(Some)) - .collect(); - let fields: Vec<(String, Option)> = keys - .into_iter() - .zip(values?) 
- .filter(|&(key, _)| key != "_keyspace" && key != "_type_name") - .map(|(key, value)| (key.to_string(), value)) - .collect(); - let udt = CqlValue::UserDefinedType { + (Value::Vec(v), ColumnType::Set(elt)) => { + let v = v.borrow_ref().unwrap(); + let elements = v + .as_ref() + .iter() + .map(|v| to_scylla_value(v, elt)) + .try_collect()?; + Ok(CqlValue::Set(elements)) + } + ( + Value::Object(v), + ColumnType::UserDefinedType { keyspace, type_name, + field_types, + }, + ) => { + let borrowed = v.borrow_ref().unwrap(); + let mut fields = Vec::new(); + for (field_name, field_type) in field_types { + let value = match borrowed.get_value(field_name) { + Err(_) => None, + Ok(None) => Some(CqlValue::Empty), + Ok(Some(value)) => Some(to_scylla_value(&value, field_type)?), + }; + fields.push((field_name.to_string(), value)) + } + Ok(CqlValue::UserDefinedType { + keyspace: keyspace.to_string(), + type_name: type_name.to_string(), fields, - }; - Ok(udt) + }) } - Value::Any(obj) => { + (Value::Any(obj), ColumnType::Uuid) => { let obj = obj.borrow_ref().unwrap(); let h = obj.type_hash(); if h == Uuid::type_hash() { let uuid: &Uuid = obj.downcast_borrow_ref().unwrap(); Ok(CqlValue::Uuid(uuid.0)) - } else if h == Int32::type_hash() { - let int32: &Int32 = obj.downcast_borrow_ref().unwrap(); - Ok(CqlValue::Int(int32.0)) - } else if h == Int16::type_hash() { - let int16: &Int16 = obj.downcast_borrow_ref().unwrap(); - Ok(CqlValue::SmallInt(int16.0)) - } else if h == Int8::type_hash() { - let int8: &Int8 = obj.downcast_borrow_ref().unwrap(); - Ok(CqlValue::TinyInt(int8.0)) - } else if h == Float32::type_hash() { - let float32: &Float32 = obj.downcast_borrow_ref().unwrap(); - Ok(CqlValue::Float(float32.0)) } else { - Err(CassError(CassErrorKind::UnsupportedType( + Err(CassError(CassErrorKind::QueryParamConversion( v.type_info().unwrap(), + ColumnType::Uuid, ))) } } - other => Err(CassError(CassErrorKind::UnsupportedType( - other.type_info().unwrap(), + (value, typ) => Err(CassError(CassErrorKind::QueryParamConversion( + value.type_info().unwrap(), + typ.clone(), ))), } } + fn convert_int, R>( + value: i64, + typ: ColumnType, + f: impl Fn(T) -> R, + ) -> Result { + let converted = value.try_into().map_err(|_| { + CassError(CassErrorKind::ValueOutOfRange( + value.to_string(), + typ.clone(), + )) + })?; + Ok(f(converted)) + } + /// Binds parameters passed as a single rune value to the arguments of the statement. /// The `params` value can be a tuple, a vector, a struct or an object. 
- pub fn to_scylla_query_params(params: &Value) -> Result, CassError> { + pub fn to_scylla_query_params( + params: &Value, + types: &[ColumnSpec], + ) -> Result, CassError> { let mut values = Vec::new(); match params { Value::Tuple(tuple) => { let tuple = tuple.borrow_ref().unwrap(); - for v in tuple.iter() { - values.push(to_scylla_value(v)?); + if tuple.len() != types.len() { + return Err(CassError(CassErrorKind::InvalidNumberOfQueryParams)); + } + for (v, t) in tuple.iter().zip(types) { + values.push(to_scylla_value(v, &t.typ)?); } } Value::Vec(vec) => { let vec = vec.borrow_ref().unwrap(); - for v in vec.iter() { - values.push(to_scylla_value(v)?); + for (v, t) in vec.iter().zip(types) { + values.push(to_scylla_value(v, &t.typ)?); } } other => { - return Err(CassError(CassErrorKind::UnsupportedType( + return Err(CassError(CassErrorKind::InvalidQueryParamsObject( other.type_info().unwrap(), ))); } @@ -752,6 +805,14 @@ pub fn float_to_f32(value: f64) -> Option { Some(Float32(value as f32)) } +pub fn int_to_string(value: i64) -> Option { + Some(value.to_string()) +} + +pub fn float_to_string(value: f64) -> Option { + Some(value.to_string()) +} + /// Computes a hash of an integer value `i`. /// Returns a value in range `0..i64::MAX`. pub fn hash(i: i64) -> i64 { diff --git a/src/workload.rs b/src/workload.rs index 834ebc7..a9cfc6f 100644 --- a/src/workload.rs +++ b/src/workload.rs @@ -151,6 +151,13 @@ impl Program { .macro_(&["param"], move |ctx, ts| context::param(ctx, ¶ms, ts)) .unwrap(); + latte_module + .inst_fn("to_string", context::int_to_string) + .unwrap(); + latte_module + .inst_fn("to_string", context::float_to_string) + .unwrap(); + latte_module.inst_fn("to_i32", context::int_to_i32).unwrap(); latte_module .inst_fn("to_i32", context::float_to_i32) From d3c685069e7d589726e9a7d80e241f550b247599 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ko=C5=82aczkowski?= Date: Tue, 25 Jun 2024 15:06:12 +0200 Subject: [PATCH 13/22] Allow objects and structs as source of named query params --- README.md | 13 +++++++++ src/context.rs | 73 +++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 74 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 50bd535..cde656b 100644 --- a/README.md +++ b/README.md @@ -170,6 +170,19 @@ pub async fn run(ctx, i) { } ``` +Query parameters can be bound and passed by names as well: +```rust +const INSERT = "my_insert"; + +pub async fn prepare(ctx) { + ctx.prepare(INSERT, "INSERT INTO test.test(id, data) VALUES (:id, :data)").await?; +} + +pub async fn run(ctx, i) { + ctx.execute_prepared(INSERT, #{id: 5, data: "foo"}).await +} +``` + ### Populating the database Read queries are more interesting when they return non-empty result sets. 
diff --git a/src/context.rs b/src/context.rs index 2900681..e1b6776 100644 --- a/src/context.rs +++ b/src/context.rs @@ -624,22 +624,31 @@ mod bind { field_types, }, ) => { - let borrowed = v.borrow_ref().unwrap(); - let mut fields = Vec::new(); - for (field_name, field_type) in field_types { - let value = match borrowed.get_value(field_name) { - Err(_) => None, - Ok(None) => Some(CqlValue::Empty), - Ok(Some(value)) => Some(to_scylla_value(&value, field_type)?), - }; - fields.push((field_name.to_string(), value)) - } + let obj = v.borrow_ref().unwrap(); + let fields = read_fields(|s| obj.get(s), field_types)?; + Ok(CqlValue::UserDefinedType { + keyspace: keyspace.to_string(), + type_name: type_name.to_string(), + fields, + }) + } + ( + Value::Struct(v), + ColumnType::UserDefinedType { + keyspace, + type_name, + field_types, + }, + ) => { + let obj = v.borrow_ref().unwrap(); + let fields = read_fields(|s| obj.get(s), field_types)?; Ok(CqlValue::UserDefinedType { keyspace: keyspace.to_string(), type_name: type_name.to_string(), fields, }) } + (Value::Any(obj), ColumnType::Uuid) => { let obj = obj.borrow_ref().unwrap(); let h = obj.type_hash(); @@ -680,9 +689,9 @@ mod bind { params: &Value, types: &[ColumnSpec], ) -> Result, CassError> { - let mut values = Vec::new(); - match params { + Ok(match params { Value::Tuple(tuple) => { + let mut values = Vec::new(); let tuple = tuple.borrow_ref().unwrap(); if tuple.len() != types.len() { return Err(CassError(CassErrorKind::InvalidNumberOfQueryParams)); @@ -690,18 +699,58 @@ mod bind { for (v, t) in tuple.iter().zip(types) { values.push(to_scylla_value(v, &t.typ)?); } + values } Value::Vec(vec) => { + let mut values = Vec::new(); + let vec = vec.borrow_ref().unwrap(); for (v, t) in vec.iter().zip(types) { values.push(to_scylla_value(v, &t.typ)?); } + values + } + Value::Object(obj) => { + let obj = obj.borrow_ref().unwrap(); + read_params(|f| obj.get(f), types)? + } + Value::Struct(obj) => { + let obj = obj.borrow_ref().unwrap(); + read_params(|f| obj.get(f), types)? 
} other => { return Err(CassError(CassErrorKind::InvalidQueryParamsObject( other.type_info().unwrap(), ))); } + }) + } + + fn read_params<'a, 'b>( + get_value: impl Fn(&String) -> Option<&'a Value>, + params: &[ColumnSpec], + ) -> Result, CassError> { + let mut values = Vec::with_capacity(params.len()); + for column in params { + let value = match get_value(&column.name) { + Some(value) => to_scylla_value(value, &column.typ)?, + None => CqlValue::Empty, + }; + values.push(value) + } + Ok(values) + } + + fn read_fields<'a, 'b>( + get_value: impl Fn(&String) -> Option<&'a Value>, + fields: &[(String, ColumnType)], + ) -> Result)>, CassError> { + let mut values = Vec::with_capacity(fields.len()); + for (field_name, field_type) in fields { + if let Some(value) = get_value(field_name) { + let value = Some(to_scylla_value(value, field_type)?); + values.push((field_name.to_string(), value)) + }; } Ok(values) } From 72b12c6f68eb8ec51805bd933cf87f95036abcb0 Mon Sep 17 00:00:00 2001 From: Valerii Ponomarov Date: Thu, 27 Jun 2024 22:25:14 +0300 Subject: [PATCH 14/22] Add 'ColumnType::Map' support for the prepared statements --- src/context.rs | 70 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 68 insertions(+), 2 deletions(-) diff --git a/src/context.rs b/src/context.rs index e1b6776..87b81bd 100644 --- a/src/context.rs +++ b/src/context.rs @@ -132,7 +132,32 @@ pub fn cql_value_obj_to_string(v: &CqlValue) -> String { result.push_str("])"); result } - // TODO: cover 'CqlValue::Map' and 'CqlValue::Set' + CqlValue::Set(elements) => { + let mut result = String::from("Set(["); + for element in elements { + let element_string = cql_value_obj_to_string(element); + result.push_str(&element_string); + result.push_str(", "); + } + if result.len() >= 2 { + result.truncate(result.len() - 2); + } + result.push_str("])"); + result + } + CqlValue::Map(pairs) => { + let mut result = String::from("Map({"); + for (key, value) in pairs { + let key_string = cql_value_obj_to_string(key); + let value_string = cql_value_obj_to_string(value); + result.push_str(&format!("({}: {}), ", key_string, value_string)); + } + if result.len() >= 2 { + result.truncate(result.len() - 2); + } + result.push_str("})"); + result + } _ => format!("{v:?}"), } } @@ -197,6 +222,7 @@ pub enum CassErrorKind { ValueOutOfRange(String, ColumnType), InvalidNumberOfQueryParams, InvalidQueryParamsObject(TypeInfo), + WrongDataStructure(String), Prepare(String, QueryError), Overloaded(QueryInfo, QueryError), QueryExecution(QueryInfo, QueryError), @@ -233,6 +259,9 @@ impl CassError { CassErrorKind::InvalidQueryParamsObject(t) => { write!(buf, "Value of type {t} cannot by used as query parameters; expected a list or object") } + CassErrorKind::WrongDataStructure(s) => { + write!(buf, "Wrong data structure: {s}") + } CassErrorKind::Prepare(q, e) => { write!(buf, "Failed to prepare query \"{q}\": {e}") } @@ -564,6 +593,10 @@ mod bind { use super::*; fn to_scylla_value(v: &Value, typ: &ColumnType) -> Result { + // TODO: add support for the following native CQL types: + // 'counter', 'date', 'decimal', 'duration', 'inet', 'time', + // 'timestamp', 'timeuuid' and 'variant'. + // Also, for the 'tuple'. 
match (v, typ) { (Value::Bool(v), ColumnType::Boolean) => Ok(CqlValue::Boolean(*v)), @@ -597,7 +630,7 @@ mod bind { (Value::Option(v), typ) => match v.borrow_ref().unwrap().as_ref() { Some(v) => to_scylla_value(v, typ), None => Ok(CqlValue::Empty), - }, + } (Value::Vec(v), ColumnType::List(elt)) => { let v = v.borrow_ref().unwrap(); let elements = v @@ -616,6 +649,39 @@ mod bind { .try_collect()?; Ok(CqlValue::Set(elements)) } + (Value::Vec(v), ColumnType::Map(key_elt, value_elt)) => { + let v = v.borrow_ref().unwrap(); + if v.len() > 0 { + if let Value::Tuple(first_tuple) = &v[0] { + if first_tuple.borrow_ref().unwrap().len() == 2 { + let map_values: Vec<(CqlValue, CqlValue)> = v + .iter() + .filter_map(|tuple_wrapped| { + if let Value::Tuple(tuple_wrapped) = &tuple_wrapped { + let tuple = tuple_wrapped.borrow_ref().unwrap(); + let key = to_scylla_value(tuple.get(0).unwrap(), key_elt).unwrap(); + let value = to_scylla_value(tuple.get(1).unwrap(), value_elt).unwrap(); + Some((key, value)) + } else { + None + } + }) + .collect(); + Ok(CqlValue::Map(map_values)) + } else { + Err(CassError(CassErrorKind::WrongDataStructure( + "Vector's tuple must have exactly 2 elements".to_string(), + ))) + } + } else { + Err(CassError(CassErrorKind::WrongDataStructure( + "ColumnType::Map expects only vector of tuples".to_string(), + ))) + } + } else { + Ok(CqlValue::Map(vec![])) + } + } ( Value::Object(v), ColumnType::UserDefinedType { From 7fedf5db856a3ab1cbc30655d9bc14798fe38f70 Mon Sep 17 00:00:00 2001 From: Valerii Ponomarov Date: Thu, 27 Jun 2024 22:28:47 +0300 Subject: [PATCH 15/22] Add 'ColumnType::Timestamp' support for the prepared statements --- src/context.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/context.rs b/src/context.rs index 87b81bd..e7d6df2 100644 --- a/src/context.rs +++ b/src/context.rs @@ -613,6 +613,9 @@ mod bind { } (Value::Integer(v), ColumnType::Int) => convert_int(*v, ColumnType::Int, CqlValue::Int), (Value::Integer(v), ColumnType::BigInt) => Ok(CqlValue::BigInt(*v)), + (Value::Integer(v), ColumnType::Timestamp) => { + Ok(CqlValue::Timestamp(scylla::frame::value::CqlTimestamp(*v))) + } (Value::Float(v), ColumnType::Float) => Ok(CqlValue::Float(*v as f32)), (Value::Float(v), ColumnType::Double) => Ok(CqlValue::Double(*v)), From b244451be876c206bab14ccac092368e0ac5bc95 Mon Sep 17 00:00:00 2001 From: Valerii Ponomarov Date: Fri, 28 Jun 2024 17:48:24 +0300 Subject: [PATCH 16/22] Add 'ColumnType::Inet' support for the prepared statements --- src/context.rs | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/src/context.rs b/src/context.rs index e7d6df2..3984eb7 100644 --- a/src/context.rs +++ b/src/context.rs @@ -4,6 +4,8 @@ use std::fs::File; use std::hash::{Hash, Hasher}; use std::io; use std::io::{BufRead, BufReader, ErrorKind, Read}; +use std::net::IpAddr; +use std::str::FromStr; use std::sync::Arc; use anyhow::anyhow; @@ -626,6 +628,29 @@ mod bind { (Value::String(v), ColumnType::Text | ColumnType::Ascii) => { Ok(CqlValue::Text(v.borrow_ref().unwrap().as_str().to_string())) } + (Value::StaticString(v), ColumnType::Inet) => { + let ipaddr = IpAddr::from_str(v); + match ipaddr { + Ok(ipaddr) => Ok(CqlValue::Inet(ipaddr)), + Err(e) => { + Err(CassError(CassErrorKind::WrongDataStructure( + format!("Failed to parse '{}' StaticString as IP address: {}", v.as_str(), e), + ))) + } + } + } + (Value::String(v), ColumnType::Inet) => { + let ipaddr_str = v.borrow_ref().unwrap(); + let ipaddr = IpAddr::from_str(ipaddr_str.as_str()); + match ipaddr { + 
Ok(ipaddr) => Ok(CqlValue::Inet(ipaddr)), + Err(e) => { + Err(CassError(CassErrorKind::WrongDataStructure( + format!("Failed to parse '{}' String as IP address: {}", ipaddr_str.as_str(), e), + ))) + } + } + } (Value::Bytes(v), ColumnType::Blob) => { Ok(CqlValue::Blob(v.borrow_ref().unwrap().to_vec())) From ad3041f109bee4371a57c94c42989fc4fa2250e4 Mon Sep 17 00:00:00 2001 From: Valerii Ponomarov Date: Fri, 28 Jun 2024 19:56:24 +0300 Subject: [PATCH 17/22] Add 'ColumnType::Timeuuid' support for the prepared statements --- src/context.rs | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/context.rs b/src/context.rs index 3984eb7..431ebfd 100644 --- a/src/context.rs +++ b/src/context.rs @@ -27,6 +27,7 @@ use rune::{Any, Value}; use rust_embed::RustEmbed; use scylla::_macro_internal::ColumnType; use scylla::frame::response::result::CqlValue; +use scylla::frame::value::CqlTimeuuid; use scylla::prepared_statement::PreparedStatement; use scylla::transport::errors::{DbError, NewSessionError, QueryError}; use scylla::transport::session::PoolSize; @@ -622,6 +623,29 @@ mod bind { (Value::Float(v), ColumnType::Float) => Ok(CqlValue::Float(*v as f32)), (Value::Float(v), ColumnType::Double) => Ok(CqlValue::Double(*v)), + (Value::StaticString(v), ColumnType::Timeuuid) => { + let timeuuid = CqlTimeuuid::from_str(v); + match timeuuid { + Ok(timeuuid) => Ok(CqlValue::Timeuuid(timeuuid)), + Err(e) => { + Err(CassError(CassErrorKind::WrongDataStructure( + format!("Failed to parse '{}' StaticString as Timeuuid: {}", v.as_str(), e), + ))) + } + } + } + (Value::String(v), ColumnType::Timeuuid) => { + let timeuuid_str = v.borrow_ref().unwrap(); + let timeuuid = CqlTimeuuid::from_str(timeuuid_str.as_str()); + match timeuuid { + Ok(timeuuid) => Ok(CqlValue::Timeuuid(timeuuid)), + Err(e) => { + Err(CassError(CassErrorKind::WrongDataStructure( + format!("Failed to parse '{}' String as Timeuuid: {}", timeuuid_str.as_str(), e), + ))) + } + } + } (Value::StaticString(v), ColumnType::Text | ColumnType::Ascii) => { Ok(CqlValue::Text(v.as_str().to_string())) } From 05fa8a6e3eca715037ab8eab663a243ed613d215 Mon Sep 17 00:00:00 2001 From: Valerii Ponomarov Date: Sun, 24 Mar 2024 20:12:44 +0200 Subject: [PATCH 18/22] Support printing string values of optional params set with '-P' flag Before we would have following output on the start of latte: -P str_param -P int_param 5 If we provided following params to the latte command: latte ... 
-P str_param="\"foo_str\"" -P int_param=5 With this change we will have following output: -P str_param "foo_str" -P int_param 5 --- src/report.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/report.rs b/src/report.rs index cf6a7fc..9ba0595 100644 --- a/src/report.rs +++ b/src/report.rs @@ -492,7 +492,18 @@ impl<'a> Display for RunConfigCmp<'a> { if !param_names.is_empty() { for k in param_names { let label = format!("-P {k}"); - let line = self.line(label.as_str(), "", |conf| Quantity::from(conf.get_param(k))); + let line = self.line(label.as_str(), "", |conf| { + match Quantity::from(conf.get_param(k)).value { + Some(quantity) => quantity.to_string(), + None => { + let str_value = conf.params.iter() + .find(|(key, _)| key == k) + .map(|(_, value)| value.clone()) + .unwrap_or_else(|| "".to_string()); + str_value + } + } + }); writeln!(f, "{line}").unwrap(); } writeln!(f, "{}", fmt_horizontal_line()).unwrap(); From 52db0cd41c783fc1e10055ee63f0bde1f9cbab73 Mon Sep 17 00:00:00 2001 From: Valerii Ponomarov Date: Fri, 19 Apr 2024 19:37:56 +0300 Subject: [PATCH 19/22] Allow to prioritize nodes based on their datacenter --- src/config.rs | 4 ++++ src/context.rs | 7 +++++++ src/report.rs | 4 ++++ 3 files changed, 15 insertions(+) diff --git a/src/config.rs b/src/config.rs index e8d42df..7566b97 100644 --- a/src/config.rs +++ b/src/config.rs @@ -181,6 +181,10 @@ pub struct ConnectionConf { #[clap(long("ssl-key"), value_name = "PATH")] pub ssl_key_file: Option, + /// Datacenter name + #[clap(long("datacenter"), required = false, default_value = "")] + pub datacenter: String, + /// Default CQL query consistency level #[clap(long("consistency"), required = false, default_value = "LOCAL_QUORUM")] pub consistency: Consistency, diff --git a/src/context.rs b/src/context.rs index 431ebfd..60be4be 100644 --- a/src/context.rs +++ b/src/context.rs @@ -28,6 +28,7 @@ use rust_embed::RustEmbed; use scylla::_macro_internal::ColumnType; use scylla::frame::response::result::CqlValue; use scylla::frame::value::CqlTimeuuid; +use scylla::load_balancing::DefaultPolicy; use scylla::prepared_statement::PreparedStatement; use scylla::transport::errors::{DbError, NewSessionError, QueryError}; use scylla::transport::session::PoolSize; @@ -60,8 +61,14 @@ fn ssl_context(conf: &&ConnectionConf) -> Result, CassError> /// Configures connection to Cassandra. 
pub async fn connect(conf: &ConnectionConf) -> Result { + let mut policy_builder = DefaultPolicy::builder().token_aware(true); + let dc = &conf.datacenter; + if !dc.is_empty() { + policy_builder = policy_builder.prefer_datacenter(dc.to_owned()).permit_dc_failover(true); + } let profile = ExecutionProfile::builder() .consistency(conf.consistency.scylla_consistency()) + .load_balancing_policy(policy_builder.build()) .request_timeout(Some(Duration::from_secs(conf.request_timeout.get() as u64))) .build(); diff --git a/src/report.rs b/src/report.rs index 9ba0595..015cc3b 100644 --- a/src/report.rs +++ b/src/report.rs @@ -510,6 +510,10 @@ impl<'a> Display for RunConfigCmp<'a> { } let lines: Vec> = vec![ + self.line("Datacenter", "", |conf| {conf.connection.datacenter.clone()}), + self.line("Consistency", "", |conf| { + conf.connection.consistency.scylla_consistency().to_string() + }), self.line("Threads", "", |conf| Quantity::from(conf.threads)), self.line("Connections", "", |conf| { Quantity::from(conf.connection.count) From 21dadc18376341ca9b2d3af893c14a34c502e7bd Mon Sep 17 00:00:00 2001 From: Valerii Ponomarov Date: Tue, 2 Jul 2024 18:25:11 +0300 Subject: [PATCH 20/22] Update list of native types to be supported for prepared statements --- src/context.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/context.rs b/src/context.rs index 60be4be..9fed1e5 100644 --- a/src/context.rs +++ b/src/context.rs @@ -604,8 +604,7 @@ mod bind { fn to_scylla_value(v: &Value, typ: &ColumnType) -> Result { // TODO: add support for the following native CQL types: - // 'counter', 'date', 'decimal', 'duration', 'inet', 'time', - // 'timestamp', 'timeuuid' and 'variant'. + // 'counter', 'date', 'decimal', 'duration', 'time' and 'variant'. // Also, for the 'tuple'. match (v, typ) { (Value::Bool(v), ColumnType::Boolean) => Ok(CqlValue::Boolean(*v)), From bd38d36acd0eb1137482b30605653ab9cda9cdc3 Mon Sep 17 00:00:00 2001 From: Valerii Ponomarov Date: Mon, 29 Apr 2024 18:10:56 +0300 Subject: [PATCH 21/22] Add possibility to not store all the samples that leak memory It affects the possibility to generate report that requires data. So, add new config option called '--generate-report' to cover the situation with memory leaks. If it is set then we store all the samples as before and generate final report based on the stored data. Side-effect - linear memory leaks. If it is not set (default) then the samples data won't be stored and final report won't be generated keeping stable memory consumption. --- src/config.rs | 6 +++ src/exec.rs | 3 +- src/main.rs | 31 +++++++++------- src/report.rs | 100 ++++++++++++++++++++++++++------------------------ src/stats.rs | 19 ++++++++-- 5 files changed, 93 insertions(+), 66 deletions(-) diff --git a/src/config.rs b/src/config.rs index 7566b97..66256f2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -374,6 +374,12 @@ pub struct RunCommand { #[clap(long("tag"), number_of_values = 1)] pub tags: Vec, + /// Wether to generate final report or not. If disabled (default) then memory consumption will + /// be static, otherwise it will leak linearly storing samples info for a final report + /// calculation. + #[clap(long("generate-report"), required = false)] + pub generate_report: bool, + /// Path to an output file or directory where the JSON report should be written to. 
#[clap(short('o'), long)] #[serde(skip)] diff --git a/src/exec.rs b/src/exec.rs index ef2a2ec..41d8b43 100644 --- a/src/exec.rs +++ b/src/exec.rs @@ -161,6 +161,7 @@ pub async fn par_execute( name: &str, exec_options: &ExecutionOptions, sampling: Interval, + store_samples: bool, workload: Workload, show_progress: bool, ) -> Result { @@ -179,7 +180,7 @@ pub async fn par_execute( let progress = Arc::new(StatusLine::with_options(progress, progress_opts)); let deadline = BoundedCycleCounter::new(exec_options.duration); let mut streams = Vec::with_capacity(thread_count); - let mut stats = Recorder::start(rate, concurrency); + let mut stats = Recorder::start(rate, concurrency, store_samples); for _ in 0..thread_count { let s = spawn_stream( diff --git a/src/main.rs b/src/main.rs index 1f0691f..3d7eb00 100644 --- a/src/main.rs +++ b/src/main.rs @@ -179,6 +179,7 @@ async fn load(conf: LoadCommand) -> Result<()> { "Loading...", &load_options, config::Interval::Unbounded, + false, loader, !conf.quiet, ) @@ -235,6 +236,7 @@ async fn run(conf: RunCommand) -> Result<()> { "Warming up...", &warmup_options, Interval::Unbounded, + conf.generate_report, runner.clone()?, !conf.quiet, ) @@ -263,6 +265,7 @@ async fn run(conf: RunCommand) -> Result<()> { "Running...", &exec_options, conf.sampling_interval, + conf.generate_report, runner, !conf.quiet, ) @@ -282,19 +285,21 @@ async fn run(conf: RunCommand) -> Result<()> { println!(); println!("{}", &stats_cmp); - let path = conf - .output - .clone() - .unwrap_or_else(|| conf.default_output_file_name("json")); - - let report = Report::new(conf, stats); - match report.save(&path) { - Ok(()) => { - eprintln!("info: Saved report to {}", path.display()); - } - Err(e) => { - eprintln!("error: Failed to save report to {}: {}", path.display(), e); - exit(1); + if stats_cmp.v1.log.len() > 1 { + let path = conf + .output + .clone() + .unwrap_or_else(|| conf.default_output_file_name("json")); + + let report = Report::new(conf, stats); + match report.save(&path) { + Ok(()) => { + eprintln!("info: Saved report to {}", path.display()); + } + Err(e) => { + eprintln!("error: Failed to save report to {}: {}", path.display(), e); + exit(1); + } } } Ok(()) diff --git a/src/report.rs b/src/report.rs index 015cc3b..e1adadb 100644 --- a/src/report.rs +++ b/src/report.rs @@ -635,7 +635,7 @@ impl<'a> Display for BenchmarkCmp<'a> { writeln!(f, "{}", fmt_cmp_header(true))?; } - let summary: Vec> = vec![ + let mut summary: Vec> = vec![ self.line("Elapsed time", "s", |s| { Quantity::from(s.elapsed_time_s).with_precision(3) }), @@ -658,58 +658,62 @@ impl<'a> Display for BenchmarkCmp<'a> { self.line("└─", "row/req", |s| { Quantity::from(s.row_count_per_req).with_precision(1) }), - self.line("Samples", "", |s| Quantity::from(s.log.len())), - self.line("Mean sample size", "op", |s| { - Quantity::from(s.log.iter().map(|s| s.cycle_count as f64).mean()).with_precision(0) - }), - self.line("└─", "req", |s| { - Quantity::from(s.log.iter().map(|s| s.request_count as f64).mean()) - .with_precision(0) - }), - self.line("Concurrency", "req", |s| { - Quantity::from(s.concurrency).with_precision(0) - }), - self.line("└─", "%", |s| { - Quantity::from(s.concurrency_ratio).with_precision(0) - }), - self.line("Throughput", "op/s", |s| { - Quantity::from(s.cycle_throughput).with_precision(0) - }) - .with_significance(self.cmp_cycle_throughput()) - .with_orientation(1) - .into_box(), - self.line("├─", "req/s", |s| { - Quantity::from(s.req_throughput).with_precision(0) - }) - 
.with_significance(self.cmp_req_throughput()) - .with_orientation(1) - .into_box(), - self.line("└─", "row/s", |s| { - Quantity::from(s.row_throughput).with_precision(0) - }) - .with_significance(self.cmp_row_throughput()) - .with_orientation(1) - .into_box(), - self.line("Mean cycle time", "ms", |s| { - Quantity::from(&s.cycle_time_ms).with_precision(3) - }) - .with_significance(self.cmp_mean_resp_time()) - .with_orientation(-1) - .into_box(), - self.line("Mean resp. time", "ms", |s| { - Quantity::from(s.resp_time_ms.as_ref().map(|rt| rt.mean)).with_precision(3) - }) - .with_significance(self.cmp_mean_resp_time()) - .with_orientation(-1) - .into_box(), + self.line("Samples", "", |s| Quantity::from(s.samples_count)), ]; - + if self.v1.log.len() > 1 { + let summary_part2: Vec> = vec![ + self.line("Mean sample size", "op", |s| { + Quantity::from(s.log.iter().map(|s| s.cycle_count as f64).mean()) + .with_precision(0) + }), + self.line("└─", "req", |s| { + Quantity::from(s.log.iter().map(|s| s.request_count as f64).mean()) + .with_precision(0) + }), + self.line("Concurrency", "req", |s| { + Quantity::from(s.concurrency).with_precision(0) + }), + self.line("└─", "%", |s| { + Quantity::from(s.concurrency_ratio).with_precision(0) + }), + self.line("Throughput", "op/s", |s| { + Quantity::from(s.cycle_throughput).with_precision(0) + }) + .with_significance(self.cmp_cycle_throughput()) + .with_orientation(1) + .into_box(), + self.line("├─", "req/s", |s| { + Quantity::from(s.req_throughput).with_precision(0) + }) + .with_significance(self.cmp_req_throughput()) + .with_orientation(1) + .into_box(), + self.line("└─", "row/s", |s| { + Quantity::from(s.row_throughput).with_precision(0) + }) + .with_significance(self.cmp_row_throughput()) + .with_orientation(1) + .into_box(), + self.line("Mean cycle time", "ms", |s| { + Quantity::from(&s.cycle_time_ms).with_precision(3) + }) + .with_significance(self.cmp_mean_resp_time()) + .with_orientation(-1) + .into_box(), + self.line("Mean resp. time", "ms", |s| { + Quantity::from(s.resp_time_ms.as_ref().map(|rt| rt.mean)).with_precision(3) + }) + .with_significance(self.cmp_mean_resp_time()) + .with_orientation(-1) + .into_box(), + ]; + summary.extend(summary_part2); + } for l in summary { writeln!(f, "{l}")?; } - writeln!(f)?; - if self.v1.request_count > 0 { + if self.v1.request_count > 0 && self.v1.log.len() > 1 { let resp_time_percentiles = [ Percentile::Min, Percentile::P25, diff --git a/src/stats.rs b/src/stats.rs index 91d7b42..b9977c6 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -351,17 +351,26 @@ impl Sample { /// Collects the samples and computes aggregate statistics struct Log { samples: Vec, + samples_counter: u64, + store_samples: bool, } impl Log { - fn new() -> Log { + fn new(store_samples: bool) -> Log { Log { samples: Vec::new(), + samples_counter: 0, + store_samples, } } fn append(&mut self, sample: Sample) -> &Sample { - self.samples.push(sample); + if self.store_samples || self.samples.is_empty() { + self.samples.push(sample); + } else { + self.samples[0] = sample; + } + self.samples_counter += 1; self.samples.last().unwrap() } @@ -473,6 +482,7 @@ pub struct BenchmarkStats { pub errors_ratio: Option, pub row_count: u64, pub row_count_per_req: Option, + pub samples_count: u64, pub cycle_throughput: Mean, pub cycle_throughput_ratio: Option, pub req_throughput: Mean, @@ -569,7 +579,7 @@ impl Recorder { /// Creates a new recorder. 
/// The `rate_limit` and `concurrency_limit` parameters are used only as the /// reference levels for relative throughput and relative parallelism. - pub fn start(rate_limit: Option, concurrency_limit: NonZeroUsize) -> Recorder { + pub fn start(rate_limit: Option, concurrency_limit: NonZeroUsize, store_samples: bool) -> Recorder { let start_time = SystemTime::now(); let start_instant = Instant::now(); Recorder { @@ -579,7 +589,7 @@ impl Recorder { end_instant: start_instant, start_cpu_time: ProcessTime::now(), end_cpu_time: ProcessTime::now(), - log: Log::new(), + log: Log::new(store_samples), rate_limit, concurrency_limit, cycle_count: 0, @@ -661,6 +671,7 @@ impl Recorder { requests_per_cycle: self.request_count as f64 / self.cycle_count as f64, row_count: self.row_count, row_count_per_req: not_nan(self.row_count as f64 / self.request_count as f64), + samples_count: self.log.samples_counter, cycle_throughput, cycle_throughput_ratio, req_throughput, From 60f95738893d3e466ad03674f5eb0b4ee1a64d40 Mon Sep 17 00:00:00 2001 From: Valerii Ponomarov Date: Tue, 2 Jul 2024 18:31:21 +0300 Subject: [PATCH 22/22] Update latte version from '0.26.0' to '0.26.1-scylladb' This allows anyone to distinguish binaries built from our fork from those built from the source project. The source project version used as the base is '0.26.0', and our fork with additional commits on top of it is '0.26.1-scylladb'. --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3fafe35..ab1059a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1020,7 +1020,7 @@ dependencies = [ [[package]] name = "latte-cli" -version = "0.26.0" +version = "0.26.1-scylladb" dependencies = [ "anyhow", "base64 0.22.1", diff --git a/Cargo.toml b/Cargo.toml index b27e7ce..1d40286 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "latte-cli" description = "A database benchmarking tool for Apache Cassandra" -version = "0.26.0" +version = "0.26.1-scylladb" authors = ["Piotr Kołaczkowski "] edition = "2021" readme = "README.md"