From 9fa4932598b872ed84090c76c98d44be4224a676 Mon Sep 17 00:00:00 2001 From: praveen-influx Date: Mon, 16 Dec 2024 15:12:19 +0000 Subject: [PATCH] chore: porting the changes from pro (#25660) move ringbuffer to be allocated on heap as the MAX_CAPACITY per event type has gone up to 10k --- .../benches/store_benchmark.rs | 2 +- influxdb3_sys_events/src/lib.rs | 29 ++++++++++++------- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/influxdb3_sys_events/benches/store_benchmark.rs b/influxdb3_sys_events/benches/store_benchmark.rs index 1a4480b9d39..d577b341e57 100644 --- a/influxdb3_sys_events/benches/store_benchmark.rs +++ b/influxdb3_sys_events/benches/store_benchmark.rs @@ -12,7 +12,7 @@ use iox_time::{SystemProvider, TimeProvider}; use observability_deps::tracing::debug; use rand::Rng; -const MAX_WRITE_ITERATIONS: u32 = 5000; +const MAX_WRITE_ITERATIONS: u32 = 100_000; #[allow(dead_code)] #[derive(Debug)] diff --git a/influxdb3_sys_events/src/lib.rs b/influxdb3_sys_events/src/lib.rs index 9ac936f9fdc..a6748a5794a 100644 --- a/influxdb3_sys_events/src/lib.rs +++ b/influxdb3_sys_events/src/lib.rs @@ -10,7 +10,7 @@ use arrow_array::RecordBatch; use dashmap::DashMap; use iox_time::TimeProvider; -const MAX_CAPACITY: usize = 1000; +const MAX_CAPACITY: usize = 10_000; /// This trait is not dyn compatible pub trait ToRecordBatch { @@ -105,34 +105,41 @@ impl SysEventStore { } } -pub type RingBuffer = RingBufferArray; +// we've increased the max capacity to 10k by default, it makes +// sense to use heap. +pub type RingBuffer = RingBufferVec; -pub struct RingBufferArray { - buf: [Option; N], +pub struct RingBufferVec { + buf: Vec, max: usize, write_index: usize, } -impl RingBufferArray { +impl RingBufferVec { fn new(capacity: usize) -> Self { - let buf_array: [Option; N] = [const { None }; N]; Self { - buf: buf_array, + buf: Vec::with_capacity(capacity), max: capacity, write_index: 0, } } fn push(&mut self, val: T) { - let _ = replace(&mut self.buf[self.write_index], Some(val)); + if !self.is_at_max() { + self.buf.push(val); + } else { + let _ = replace(&mut self.buf[self.write_index], val); + } self.write_index = (self.write_index + 1) % self.max; } + fn is_at_max(&mut self) -> bool { + self.buf.len() >= self.max + } + pub fn in_order(&self) -> impl Iterator { let (head, tail) = self.buf.split_at(self.write_index); - tail.iter() - .chain(head.iter()) - .filter_map(|item| item.as_ref()) + tail.iter().chain(head.iter()) } }