From 8023916c311a07fc7691c45067e7a693329034b9 Mon Sep 17 00:00:00 2001 From: Valerii Ponomarov Date: Tue, 5 Nov 2024 21:41:24 +0200 Subject: [PATCH] Add possibility to run stress with sinusoidally changing rate To enable it just define following parameters: --rate=1000 --rate-sine-amplitude=0.2 // rate +- 20% --rate-sine-period=10s Note that sine wave gets enabled only when the '--rate=%value%' is defined. --- src/config.rs | 50 +++++++++++++--- src/exec/mod.rs | 146 ++++++++++++++++++++++++++++++++++++++++++---- src/main.rs | 10 +++- src/report/mod.rs | 4 +- 4 files changed, 186 insertions(+), 24 deletions(-) diff --git a/src/config.rs b/src/config.rs index 1dfd9b6..2c72dd9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -16,6 +16,15 @@ use serde::{Deserialize, Serialize}; /// Limit of retry errors to be kept and then printed in scope of a sampling interval pub const PRINT_RETRY_ERROR_LIMIT: u64 = 5; +fn parse_f64(s: &str) -> Result { + let parsed_value: f64 = s.parse().map_err(|_| format!("Invalid float: {}", s))?; + if parsed_value >= 0.0 && parsed_value <= 1.0 { + Ok(parsed_value) + } else { + Err("Value must be between 0.0 and 1.0".to_string()) + } +} + /// Parse a single key-value pair fn parse_key_val(s: &str) -> Result<(T, U), anyhow::Error> where @@ -326,6 +335,35 @@ impl Display for WeightedFunction { } } +#[derive(Parser, Debug, Serialize, Deserialize)] +pub struct RateConf { + /// Number of cycles per second to execute. + /// If not given, the load cycles will be executed as fast as possible. + #[clap(short('r'), long, value_name = "COUNT")] + pub rate: Option, + + /// Used to enable the 'sinusoidal' rate generation and defines + /// the relative value (0.0..1.0) for the variation from the main rate (sine amplitude). + /// Requires the "rate" option to be defined. + #[clap( + long("rate-sine-amplitude"), + aliases = &["rate-amplitude", "rate-sine-variation", "rate-variation"], + value_parser = parse_f64, + value_name = "RATE-MULTIPLIER", + )] + pub rate_sine_amplitude: Option, + + /// Used for the 'sinusoidal' rate generation as a definition of the rate sine wave period. + #[clap( + long("rate-sine-period"), + aliases = &["sine-period"], + default_value = "1m", + value_name = "DURATION", + value_parser = parse_duration::parse, + )] + pub rate_sine_period: Duration, +} + #[derive(Parser, Debug, Serialize, Deserialize)] #[command(next_line_help = true)] pub struct EditCommand { @@ -353,10 +391,8 @@ pub struct SchemaCommand { #[derive(Parser, Debug, Serialize, Deserialize)] #[command(next_line_help = true)] pub struct LoadCommand { - /// Number of cycles per second to execute. - /// If not given, the load cycles will be executed as fast as possible. - #[clap(short('r'), long, value_name = "COUNT")] - pub rate: Option, + #[clap(flatten)] + pub rate: RateConf, /// Number of worker threads used by the driver. #[clap(short('t'), long, default_value = "1", value_name = "COUNT")] @@ -386,10 +422,8 @@ pub struct LoadCommand { #[derive(Parser, Debug, Serialize, Deserialize)] #[command(next_line_help = true)] pub struct RunCommand { - /// Number of cycles per second to execute. - /// If not given, the benchmark cycles will be executed as fast as possible. - #[clap(short('r'), long, value_name = "COUNT")] - pub rate: Option, + #[clap(flatten)] + pub rate: RateConf, /// Number of cycles or duration of the warmup phase. #[clap( diff --git a/src/exec/mod.rs b/src/exec/mod.rs index fe3d134..166fd0b 100644 --- a/src/exec/mod.rs +++ b/src/exec/mod.rs @@ -6,12 +6,14 @@ use itertools::Itertools; use pin_project::pin_project; use status_line::StatusLine; use std::cmp::max; +use std::f64::consts; use std::future::ready; use std::num::NonZeroUsize; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use std::time::Instant; +use std::time::{Duration, Instant}; +use tokio::pin; use tokio::signal::ctrl_c; use tokio::time::MissedTickBehavior; use tokio_stream::wrappers::IntervalStream; @@ -27,6 +29,95 @@ pub mod cycle; pub mod progress; pub mod workload; +/// Infinite iterator returning floats that form a sinusoidal wave +pub struct InfiniteSinusoidalIterator { + rate: f64, + amplitude: f64, + step: f64, + start: Instant, +} + +impl InfiniteSinusoidalIterator { + pub fn new(rate: f64, amplitude: f64, frequency: f64) -> InfiniteSinusoidalIterator { + let step = consts::PI * 2.0 * frequency; + InfiniteSinusoidalIterator { + rate, + amplitude, + step: step, + start: Instant::now(), + } + } +} + +impl Iterator for InfiniteSinusoidalIterator { + type Item = f64; + + fn next(&mut self) -> Option { + let elapsed = self.start.elapsed().as_secs_f64(); + let adjusted_rate = self.rate + self.amplitude * (self.step * elapsed).sin(); + Some(adjusted_rate) + } +} + +/// Custom interval stream for sinusoidal ticking. +struct SinusoidalIntervalStream { + tick_iterator: InfiniteSinusoidalIterator, + next_expected_tick: Instant, +} + +impl SinusoidalIntervalStream { + fn new(rate: f64, amplitude: f64, frequency: f64) -> Self { + let tick_iterator = InfiniteSinusoidalIterator::new(rate, amplitude, frequency); + let now = Instant::now(); + let initial_duration = tokio::time::Duration::from_secs_f64(1.0 / rate); + Self { + tick_iterator, + next_expected_tick: now + initial_duration, + } + } + + fn next_interval_duration(&mut self) -> tokio::time::Duration { + let adjusted_rate = self.tick_iterator.next(); + let period_ns = (1_000_000_000.0 / adjusted_rate.unwrap_or(1.0)).max(1.0) as u64; + tokio::time::Duration::from_nanos(period_ns) + } +} + +impl Stream for SinusoidalIntervalStream { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let now = Instant::now(); + if now >= self.next_expected_tick { + let interval_duration = self.next_interval_duration(); + self.next_expected_tick += interval_duration; + Poll::Ready(Some(())) + } else { + // NOTE: If we are behind, keep trying to emit ticks until we catch up + let interval_duration = self.next_interval_duration(); + let next_tick = self.next_expected_tick + interval_duration; + if next_tick <= now { + self.next_expected_tick = next_tick; + Poll::Ready(Some(())) + } else { + // NOTE: If we are ahead, sleep for the remaining duration + let sleep_duration = self.next_expected_tick - now; + let waker = cx.waker().clone(); + tokio::spawn(async move { + tokio::time::sleep(sleep_duration).await; + waker.wake(); + }); + Poll::Pending + } + } + } +} + +/// Returns a stream emitting sinusoidally changing number of `rate` events per second. +fn sinusoidal_interval_stream(rate: f64, amplitude: f64, frequency: f64) -> impl Stream { + SinusoidalIntervalStream::new(rate, amplitude, frequency) +} + /// Returns a stream emitting `rate` events per second. fn interval_stream(rate: f64) -> IntervalStream { let period = tokio::time::Duration::from_nanos(max(1, (1000000000.0 / rate) as u64)); @@ -100,6 +191,8 @@ async fn run_stream( fn spawn_stream( concurrency: NonZeroUsize, rate: Option, + rate_sine_amplitude: Option, // Enables the sine wave if set + rate_sine_frequency: f64, sampling: Interval, workload: Workload, iter_counter: BoundedCycleCounter, @@ -110,17 +203,38 @@ fn spawn_stream( tokio::spawn(async move { match rate { Some(rate) => { - let stream = interval_stream(rate); - run_stream( - stream, - workload, - iter_counter, - concurrency, - sampling, - progress, - tx, - ) - .await + match rate_sine_amplitude { + Some(rate_sine_amplitude) => { + let stream = sinusoidal_interval_stream( + rate, + rate_sine_amplitude * rate, // transform to absolute value + rate_sine_frequency, + ); + run_stream( + stream, + workload, + iter_counter, + concurrency, + sampling, + progress, + tx, + ) + .await + } + None => { + let stream = interval_stream(rate); + run_stream( + stream, + workload, + iter_counter, + concurrency, + sampling, + progress, + tx, + ) + .await + } + } } None => { let stream = futures::stream::repeat_with(|| ()); @@ -163,6 +277,10 @@ pub struct ExecutionOptions { pub cycle_range: (i64, i64), /// Maximum rate of requests in requests per second, `None` means no limit pub rate: Option, + /// Rate sine wave amplitude + pub rate_sine_amplitude: Option, + /// Rate sine wave period + pub rate_sine_period: Duration, /// Number of parallel threads of execution pub threads: NonZeroUsize, /// Number of outstanding async requests per each thread @@ -196,6 +314,8 @@ pub async fn par_execute( let thread_count = exec_options.threads.get(); let concurrency = exec_options.concurrency; let rate = exec_options.rate; + let rate_sine_amplitude = exec_options.rate_sine_amplitude; + let rate_sine_frequency = 1.0 / exec_options.rate_sine_period.as_secs_f64(); let progress = match exec_options.duration { Interval::Count(count) => Progress::with_count(name.to_string(), count), Interval::Time(duration) => Progress::with_duration(name.to_string(), duration), @@ -214,6 +334,8 @@ pub async fn par_execute( let s = spawn_stream( concurrency, rate.map(|r| r / (thread_count as f64)), + rate_sine_amplitude, + rate_sine_frequency, sampling, workload.clone()?, deadline.share(), diff --git a/src/main.rs b/src/main.rs index 9590085..5e8eb25 100644 --- a/src/main.rs +++ b/src/main.rs @@ -186,7 +186,9 @@ async fn load(conf: LoadCommand) -> Result<()> { let load_options = ExecutionOptions { duration: config::Interval::Count(load_count), cycle_range: (0, i64::MAX), - rate: conf.rate, + rate: conf.rate.rate, + rate_sine_amplitude: conf.rate.rate_sine_amplitude, + rate_sine_period: conf.rate.rate_sine_period, threads: conf.threads, concurrency: conf.concurrency, }; @@ -250,6 +252,8 @@ async fn run(conf: RunCommand) -> Result<()> { duration: conf.warmup_duration, cycle_range: (conf.start_cycle, conf.end_cycle), rate: None, + rate_sine_amplitude: conf.rate.rate_sine_amplitude, + rate_sine_period: conf.rate.rate_sine_period, threads: conf.threads, concurrency: conf.concurrency, }; @@ -278,7 +282,9 @@ async fn run(conf: RunCommand) -> Result<()> { duration: conf.run_duration, cycle_range: (conf.start_cycle, conf.end_cycle), concurrency: conf.concurrency, - rate: conf.rate, + rate: conf.rate.rate, + rate_sine_amplitude: conf.rate.rate_sine_amplitude, + rate_sine_period: conf.rate.rate_sine_period, threads: conf.threads, }; diff --git a/src/report/mod.rs b/src/report/mod.rs index 8ddb1f3..1d5b803 100644 --- a/src/report/mod.rs +++ b/src/report/mod.rs @@ -82,7 +82,7 @@ impl Report { .and_then(|ts| Local.timestamp_opt(ts, 0).latest()), tags: self.conf.tags.clone(), params: self.conf.params.clone(), - rate: self.conf.rate, + rate: self.conf.rate.rate, throughput: self.result.cycle_throughput.value, latency_p50: self .result @@ -545,7 +545,7 @@ impl<'a> Display for RunConfigCmp<'a> { self.line("Concurrency", "req", |conf| { Quantity::from(conf.concurrency) }), - self.line("Max rate", "op/s", |conf| Quantity::from(conf.rate)), + self.line("Max rate", "op/s", |conf| Quantity::from(conf.rate.rate)), self.line("Warmup", "s", |conf| { Quantity::from(conf.warmup_duration.period_secs()) }),