Skip to content

Commit

Permalink
chore: porting the changes from pro (#25660)
Browse files Browse the repository at this point in the history
move ringbuffer to be allocated on heap as the MAX_CAPACITY per event
type has gone up to 10k
  • Loading branch information
praveen-influx authored Dec 16, 2024
1 parent df84f9e commit 9fa4932
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 12 deletions.
2 changes: 1 addition & 1 deletion influxdb3_sys_events/benches/store_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use iox_time::{SystemProvider, TimeProvider};
use observability_deps::tracing::debug;
use rand::Rng;

const MAX_WRITE_ITERATIONS: u32 = 5000;
const MAX_WRITE_ITERATIONS: u32 = 100_000;

#[allow(dead_code)]
#[derive(Debug)]
Expand Down
29 changes: 18 additions & 11 deletions influxdb3_sys_events/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use arrow_array::RecordBatch;
use dashmap::DashMap;
use iox_time::TimeProvider;

const MAX_CAPACITY: usize = 1000;
const MAX_CAPACITY: usize = 10_000;

/// This trait is not dyn compatible
pub trait ToRecordBatch<E> {
Expand Down Expand Up @@ -105,34 +105,41 @@ impl SysEventStore {
}
}

pub type RingBuffer<T> = RingBufferArray<T, MAX_CAPACITY>;
// we've increased the max capacity to 10k by default, it makes
// sense to use heap.
pub type RingBuffer<T> = RingBufferVec<T>;

pub struct RingBufferArray<T, const N: usize> {
buf: [Option<T>; N],
pub struct RingBufferVec<T> {
buf: Vec<T>,
max: usize,
write_index: usize,
}

impl<T, const N: usize> RingBufferArray<T, N> {
impl<T> RingBufferVec<T> {
fn new(capacity: usize) -> Self {
let buf_array: [Option<T>; N] = [const { None }; N];
Self {
buf: buf_array,
buf: Vec::with_capacity(capacity),
max: capacity,
write_index: 0,
}
}

fn push(&mut self, val: T) {
let _ = replace(&mut self.buf[self.write_index], Some(val));
if !self.is_at_max() {
self.buf.push(val);
} else {
let _ = replace(&mut self.buf[self.write_index], val);
}
self.write_index = (self.write_index + 1) % self.max;
}

fn is_at_max(&mut self) -> bool {
self.buf.len() >= self.max
}

pub fn in_order(&self) -> impl Iterator<Item = &T> {
let (head, tail) = self.buf.split_at(self.write_index);
tail.iter()
.chain(head.iter())
.filter_map(|item| item.as_ref())
tail.iter().chain(head.iter())
}
}

Expand Down

0 comments on commit 9fa4932

Please sign in to comment.