Skip to content

Commit

Permalink
Add possibility to run stress with sinusoidally changing rate
Browse files Browse the repository at this point in the history
To enable it just define following parameters:
  --rate=1000
  --rate-sine-amplitude=0.2 // rate +- 20%
  --rate-sine-period=10s

Note that sine wave gets enabled only when the '--rate=%value%' is
defined.
  • Loading branch information
vponomaryov committed Nov 7, 2024
1 parent f09e9b8 commit 8023916
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 24 deletions.
50 changes: 42 additions & 8 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ use serde::{Deserialize, Serialize};
/// Limit of retry errors to be kept and then printed in scope of a sampling interval
pub const PRINT_RETRY_ERROR_LIMIT: u64 = 5;

fn parse_f64(s: &str) -> Result<f64, String> {
let parsed_value: f64 = s.parse().map_err(|_| format!("Invalid float: {}", s))?;
if parsed_value >= 0.0 && parsed_value <= 1.0 {

Check failure on line 21 in src/config.rs

View workflow job for this annotation

GitHub Actions / Clippy

manual `RangeInclusive::contains` implementation
Ok(parsed_value)
} else {
Err("Value must be between 0.0 and 1.0".to_string())
}
}

/// Parse a single key-value pair
fn parse_key_val<T, U>(s: &str) -> Result<(T, U), anyhow::Error>
where
Expand Down Expand Up @@ -326,6 +335,35 @@ impl Display for WeightedFunction {
}
}

#[derive(Parser, Debug, Serialize, Deserialize)]
pub struct RateConf {
/// Number of cycles per second to execute.
/// If not given, the load cycles will be executed as fast as possible.
#[clap(short('r'), long, value_name = "COUNT")]
pub rate: Option<f64>,

/// Used to enable the 'sinusoidal' rate generation and defines
/// the relative value (0.0..1.0) for the variation from the main rate (sine amplitude).
/// Requires the "rate" option to be defined.
#[clap(
long("rate-sine-amplitude"),
aliases = &["rate-amplitude", "rate-sine-variation", "rate-variation"],
value_parser = parse_f64,
value_name = "RATE-MULTIPLIER",
)]
pub rate_sine_amplitude: Option<f64>,

/// Used for the 'sinusoidal' rate generation as a definition of the rate sine wave period.
#[clap(
long("rate-sine-period"),
aliases = &["sine-period"],
default_value = "1m",
value_name = "DURATION",
value_parser = parse_duration::parse,
)]
pub rate_sine_period: Duration,
}

#[derive(Parser, Debug, Serialize, Deserialize)]
#[command(next_line_help = true)]
pub struct EditCommand {
Expand Down Expand Up @@ -353,10 +391,8 @@ pub struct SchemaCommand {
#[derive(Parser, Debug, Serialize, Deserialize)]
#[command(next_line_help = true)]
pub struct LoadCommand {
/// Number of cycles per second to execute.
/// If not given, the load cycles will be executed as fast as possible.
#[clap(short('r'), long, value_name = "COUNT")]
pub rate: Option<f64>,
#[clap(flatten)]
pub rate: RateConf,

/// Number of worker threads used by the driver.
#[clap(short('t'), long, default_value = "1", value_name = "COUNT")]
Expand Down Expand Up @@ -386,10 +422,8 @@ pub struct LoadCommand {
#[derive(Parser, Debug, Serialize, Deserialize)]
#[command(next_line_help = true)]
pub struct RunCommand {
/// Number of cycles per second to execute.
/// If not given, the benchmark cycles will be executed as fast as possible.
#[clap(short('r'), long, value_name = "COUNT")]
pub rate: Option<f64>,
#[clap(flatten)]
pub rate: RateConf,

/// Number of cycles or duration of the warmup phase.
#[clap(
Expand Down
146 changes: 134 additions & 12 deletions src/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ use itertools::Itertools;
use pin_project::pin_project;
use status_line::StatusLine;
use std::cmp::max;
use std::f64::consts;
use std::future::ready;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;
use std::time::{Duration, Instant};
use tokio::pin;
use tokio::signal::ctrl_c;
use tokio::time::MissedTickBehavior;
use tokio_stream::wrappers::IntervalStream;
Expand All @@ -27,6 +29,95 @@ pub mod cycle;
pub mod progress;
pub mod workload;

/// Infinite iterator returning floats that form a sinusoidal wave
pub struct InfiniteSinusoidalIterator {
rate: f64,
amplitude: f64,
step: f64,
start: Instant,
}

impl InfiniteSinusoidalIterator {
pub fn new(rate: f64, amplitude: f64, frequency: f64) -> InfiniteSinusoidalIterator {
let step = consts::PI * 2.0 * frequency;
InfiniteSinusoidalIterator {
rate,
amplitude,
step: step,

Check failure on line 46 in src/exec/mod.rs

View workflow job for this annotation

GitHub Actions / Clippy

redundant field names in struct initialization
start: Instant::now(),
}
}
}

impl Iterator for InfiniteSinusoidalIterator {
type Item = f64;

fn next(&mut self) -> Option<f64> {
let elapsed = self.start.elapsed().as_secs_f64();
let adjusted_rate = self.rate + self.amplitude * (self.step * elapsed).sin();
Some(adjusted_rate)
}
}

/// Custom interval stream for sinusoidal ticking.
struct SinusoidalIntervalStream {
tick_iterator: InfiniteSinusoidalIterator,
next_expected_tick: Instant,
}

impl SinusoidalIntervalStream {
fn new(rate: f64, amplitude: f64, frequency: f64) -> Self {
let tick_iterator = InfiniteSinusoidalIterator::new(rate, amplitude, frequency);
let now = Instant::now();
let initial_duration = tokio::time::Duration::from_secs_f64(1.0 / rate);
Self {
tick_iterator,
next_expected_tick: now + initial_duration,
}
}

fn next_interval_duration(&mut self) -> tokio::time::Duration {
let adjusted_rate = self.tick_iterator.next();
let period_ns = (1_000_000_000.0 / adjusted_rate.unwrap_or(1.0)).max(1.0) as u64;
tokio::time::Duration::from_nanos(period_ns)
}
}

impl Stream for SinusoidalIntervalStream {
type Item = ();

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let now = Instant::now();
if now >= self.next_expected_tick {
let interval_duration = self.next_interval_duration();
self.next_expected_tick += interval_duration;
Poll::Ready(Some(()))
} else {
// NOTE: If we are behind, keep trying to emit ticks until we catch up
let interval_duration = self.next_interval_duration();
let next_tick = self.next_expected_tick + interval_duration;
if next_tick <= now {
self.next_expected_tick = next_tick;
Poll::Ready(Some(()))
} else {
// NOTE: If we are ahead, sleep for the remaining duration
let sleep_duration = self.next_expected_tick - now;
let waker = cx.waker().clone();
tokio::spawn(async move {
tokio::time::sleep(sleep_duration).await;
waker.wake();
});
Poll::Pending
}
}
}
}

/// Returns a stream emitting sinusoidally changing number of `rate` events per second.
fn sinusoidal_interval_stream(rate: f64, amplitude: f64, frequency: f64) -> impl Stream<Item = ()> {
SinusoidalIntervalStream::new(rate, amplitude, frequency)
}

/// Returns a stream emitting `rate` events per second.
fn interval_stream(rate: f64) -> IntervalStream {
let period = tokio::time::Duration::from_nanos(max(1, (1000000000.0 / rate) as u64));
Expand Down Expand Up @@ -100,6 +191,8 @@ async fn run_stream<T>(
fn spawn_stream(

Check failure on line 191 in src/exec/mod.rs

View workflow job for this annotation

GitHub Actions / Clippy

this function has too many arguments (8/7)
concurrency: NonZeroUsize,
rate: Option<f64>,
rate_sine_amplitude: Option<f64>, // Enables the sine wave if set
rate_sine_frequency: f64,
sampling: Interval,
workload: Workload,
iter_counter: BoundedCycleCounter,
Expand All @@ -110,17 +203,38 @@ fn spawn_stream(
tokio::spawn(async move {
match rate {
Some(rate) => {
let stream = interval_stream(rate);
run_stream(
stream,
workload,
iter_counter,
concurrency,
sampling,
progress,
tx,
)
.await
match rate_sine_amplitude {
Some(rate_sine_amplitude) => {
let stream = sinusoidal_interval_stream(
rate,
rate_sine_amplitude * rate, // transform to absolute value
rate_sine_frequency,
);
run_stream(
stream,
workload,
iter_counter,
concurrency,
sampling,
progress,
tx,
)
.await
}
None => {
let stream = interval_stream(rate);
run_stream(
stream,
workload,
iter_counter,
concurrency,
sampling,
progress,
tx,
)
.await
}
}
}
None => {
let stream = futures::stream::repeat_with(|| ());
Expand Down Expand Up @@ -163,6 +277,10 @@ pub struct ExecutionOptions {
pub cycle_range: (i64, i64),
/// Maximum rate of requests in requests per second, `None` means no limit
pub rate: Option<f64>,
/// Rate sine wave amplitude
pub rate_sine_amplitude: Option<f64>,
/// Rate sine wave period
pub rate_sine_period: Duration,
/// Number of parallel threads of execution
pub threads: NonZeroUsize,
/// Number of outstanding async requests per each thread
Expand Down Expand Up @@ -196,6 +314,8 @@ pub async fn par_execute(
let thread_count = exec_options.threads.get();
let concurrency = exec_options.concurrency;
let rate = exec_options.rate;
let rate_sine_amplitude = exec_options.rate_sine_amplitude;
let rate_sine_frequency = 1.0 / exec_options.rate_sine_period.as_secs_f64();
let progress = match exec_options.duration {
Interval::Count(count) => Progress::with_count(name.to_string(), count),
Interval::Time(duration) => Progress::with_duration(name.to_string(), duration),
Expand All @@ -214,6 +334,8 @@ pub async fn par_execute(
let s = spawn_stream(
concurrency,
rate.map(|r| r / (thread_count as f64)),
rate_sine_amplitude,
rate_sine_frequency,
sampling,
workload.clone()?,
deadline.share(),
Expand Down
10 changes: 8 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ async fn load(conf: LoadCommand) -> Result<()> {
let load_options = ExecutionOptions {
duration: config::Interval::Count(load_count),
cycle_range: (0, i64::MAX),
rate: conf.rate,
rate: conf.rate.rate,
rate_sine_amplitude: conf.rate.rate_sine_amplitude,
rate_sine_period: conf.rate.rate_sine_period,
threads: conf.threads,
concurrency: conf.concurrency,
};
Expand Down Expand Up @@ -250,6 +252,8 @@ async fn run(conf: RunCommand) -> Result<()> {
duration: conf.warmup_duration,
cycle_range: (conf.start_cycle, conf.end_cycle),
rate: None,
rate_sine_amplitude: conf.rate.rate_sine_amplitude,
rate_sine_period: conf.rate.rate_sine_period,
threads: conf.threads,
concurrency: conf.concurrency,
};
Expand Down Expand Up @@ -278,7 +282,9 @@ async fn run(conf: RunCommand) -> Result<()> {
duration: conf.run_duration,
cycle_range: (conf.start_cycle, conf.end_cycle),
concurrency: conf.concurrency,
rate: conf.rate,
rate: conf.rate.rate,
rate_sine_amplitude: conf.rate.rate_sine_amplitude,
rate_sine_period: conf.rate.rate_sine_period,
threads: conf.threads,
};

Expand Down
4 changes: 2 additions & 2 deletions src/report/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl Report {
.and_then(|ts| Local.timestamp_opt(ts, 0).latest()),
tags: self.conf.tags.clone(),
params: self.conf.params.clone(),
rate: self.conf.rate,
rate: self.conf.rate.rate,
throughput: self.result.cycle_throughput.value,
latency_p50: self
.result
Expand Down Expand Up @@ -545,7 +545,7 @@ impl<'a> Display for RunConfigCmp<'a> {
self.line("Concurrency", "req", |conf| {
Quantity::from(conf.concurrency)
}),
self.line("Max rate", "op/s", |conf| Quantity::from(conf.rate)),
self.line("Max rate", "op/s", |conf| Quantity::from(conf.rate.rate)),
self.line("Warmup", "s", |conf| {
Quantity::from(conf.warmup_duration.period_secs())
}),
Expand Down

0 comments on commit 8023916

Please sign in to comment.