From 3be7104b83106f876fc96aff9e8835addfc61291 Mon Sep 17 00:00:00 2001 From: stormshield-frb <144998884+stormshield-frb@users.noreply.github.com> Date: Tue, 17 Dec 2024 12:43:57 +0100 Subject: [PATCH] fix(kad): improve memory allocation when iterating over kbuckets Proposal to fix https://github.com/libp2p/rust-libp2p/issues/5712. I have changed to `ClosestIter` structure to only allocate when `kbucket_size` is higher than `K_VALUE` and only once along the life of `ClosestIter`. I think I did not break anything but I would really like some experienced people with Kademlia to take a look (@guillaumemichel :wink:). Pull-Request: #5715. --- protocols/kad/CHANGELOG.md | 2 + protocols/kad/src/behaviour.rs | 2 + protocols/kad/src/kbucket.rs | 109 ++++++++++++++++++++++----------- 3 files changed, 76 insertions(+), 37 deletions(-) diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index 22af5fb5074..71ef499a179 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -2,6 +2,8 @@ - Expose Distance private field U256 to public. See [PR 5705](https://github.com/libp2p/rust-libp2p/pull/5705). +- Fix systematic memory allocation when iterating over `KBuckets`. + See [PR 5715](https://github.com/libp2p/rust-libp2p/pull/5715). ## 0.47.0 diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 988a16dc41f..1ba8b1e27af 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -434,6 +434,8 @@ impl Config { /// Sets the configuration for the k-buckets. /// /// * Default to K_VALUE. + /// + /// **WARNING**: setting a `size` higher that `K_VALUE` may imply additional memory allocations. pub fn set_kbucket_size(&mut self, size: NonZeroUsize) -> &mut Self { self.kbucket_config.set_bucket_size(size); self diff --git a/protocols/kad/src/kbucket.rs b/protocols/kad/src/kbucket.rs index 3f4b1281c3a..f32c34e9bb7 100644 --- a/protocols/kad/src/kbucket.rs +++ b/protocols/kad/src/kbucket.rs @@ -77,6 +77,7 @@ use std::{collections::VecDeque, num::NonZeroUsize, time::Duration}; use bucket::KBucket; pub use bucket::NodeStatus; pub use entry::*; +use smallvec::SmallVec; use web_time::Instant; /// Maximum number of k-buckets. @@ -282,11 +283,8 @@ where iter: None, table: self, buckets_iter: ClosestBucketsIter::new(distance), - fmap: move |b: &KBucket| -> Vec<_> { - let mut vec = Vec::with_capacity(bucket_size); - vec.extend(b.iter().map(|(n, _)| n.key.clone())); - vec - }, + fmap: |(n, _status): (&Node, NodeStatus)| n.key.clone(), + bucket_size, } } @@ -307,15 +305,11 @@ where iter: None, table: self, buckets_iter: ClosestBucketsIter::new(distance), - fmap: move |b: &KBucket<_, TVal>| -> Vec<_> { - b.iter() - .take(bucket_size) - .map(|(n, status)| EntryView { - node: n.clone(), - status, - }) - .collect() + fmap: |(n, status): (&Node, NodeStatus)| EntryView { + node: n.clone(), + status, }, + bucket_size, } } @@ -358,10 +352,12 @@ struct ClosestIter<'a, TTarget, TKey, TVal, TMap, TOut> { /// distance of the local key to the target. buckets_iter: ClosestBucketsIter, /// The iterator over the entries in the currently traversed bucket. - iter: Option>, + iter: Option>, /// The projection function / mapping applied on each bucket as /// it is encountered, producing the next `iter`ator. fmap: TMap, + /// The maximal number of nodes that a bucket can contain. + bucket_size: usize, } /// An iterator over the bucket indices, in the order determined by the `Distance` of @@ -463,41 +459,80 @@ where TTarget: AsRef, TKey: Clone + AsRef, TVal: Clone, - TMap: Fn(&KBucket) -> Vec, + TMap: Fn((&Node, NodeStatus)) -> TOut, TOut: AsRef, { type Item = TOut; fn next(&mut self) -> Option { loop { - match &mut self.iter { - Some(iter) => match iter.next() { - Some(k) => return Some(k), - None => self.iter = None, - }, - None => { - if let Some(i) = self.buckets_iter.next() { - let bucket = &mut self.table.buckets[i.get()]; - if let Some(applied) = bucket.apply_pending() { - self.table.applied_pending.push_back(applied) - } - let mut v = (self.fmap)(bucket); - v.sort_by(|a, b| { - self.target - .as_ref() - .distance(a.as_ref()) - .cmp(&self.target.as_ref().distance(b.as_ref())) - }); - self.iter = Some(v.into_iter()); - } else { - return None; - } + let (mut buffer, bucket_index) = if let Some(mut iter) = self.iter.take() { + if let Some(next) = iter.next() { + self.iter = Some(iter); + return Some(next); } + + let bucket_index = self.buckets_iter.next()?; + + // Reusing the same buffer so if there were any allocation, it only happen once over + // a `ClosestIter` life. + iter.buffer.clear(); + + (iter.buffer, bucket_index) + } else { + let bucket_index = self.buckets_iter.next()?; + + // Allocation only occurs if `kbucket_size` is greater than `K_VALUE`. + (SmallVec::with_capacity(self.bucket_size), bucket_index) + }; + + let bucket = &mut self.table.buckets[bucket_index.get()]; + if let Some(applied) = bucket.apply_pending() { + self.table.applied_pending.push_back(applied) } + + buffer.extend( + bucket + .iter() + .take(self.bucket_size) + .map(|e| (self.fmap)(e)) + .map(Some), + ); + buffer.sort_by(|a, b| { + let a = a.as_ref().expect("just initialized"); + let b = b.as_ref().expect("just initialized"); + self.target + .as_ref() + .distance(a.as_ref()) + .cmp(&self.target.as_ref().distance(b.as_ref())) + }); + + self.iter = Some(ClosestIterBuffer::new(buffer)); } } } +struct ClosestIterBuffer { + buffer: SmallVec<[Option; K_VALUE.get()]>, + index: usize, +} + +impl ClosestIterBuffer { + fn new(buffer: SmallVec<[Option; K_VALUE.get()]>) -> Self { + Self { buffer, index: 0 } + } +} + +impl Iterator for ClosestIterBuffer { + type Item = TOut; + + fn next(&mut self) -> Option { + let entry = self.buffer.get_mut(self.index)?; + self.index += 1; + entry.take() + } +} + /// A reference to a bucket. pub struct KBucketRef<'a, TKey, TVal> { index: BucketIndex,