Skip to content

Commit

Permalink
Use all available threads for aggregation verification
Browse files Browse the repository at this point in the history
  • Loading branch information
hrxi committed Nov 25, 2024
1 parent 8336a5b commit bd92653
Showing 1 changed file with 31 additions and 26 deletions.
57 changes: 31 additions & 26 deletions handel/src/aggregation.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::{
num::NonZeroUsize,
pin::Pin,
task::{Context, Poll},
thread,
};

use futures::{
future::{BoxFuture, FutureExt},
stream::{BoxStream, Stream, StreamExt},
stream::{BoxStream, FuturesUnordered, Stream, StreamExt},

Check warning on line 10 in handel/src/aggregation.rs

View workflow job for this annotation

GitHub Actions / Clippy Report

use of a disallowed type `futures_util::stream::FuturesUnordered`

warning: use of a disallowed type `futures_util::stream::FuturesUnordered` --> handel/src/aggregation.rs:10:25 | 10 | stream::{BoxStream, FuturesUnordered, Stream, StreamExt}, | ^^^^^^^^^^^^^^^^ | = note: use `nimiq_utils::stream::FuturesUnordered` instead, it does not need manual `Waker`s = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#disallowed_types = note: `#[warn(clippy::disallowed_types)]` on by default
};
use nimiq_time::{interval, Interval};

Expand Down Expand Up @@ -68,12 +70,15 @@ where

/// Future of the currently verified pending contribution.
/// There is only ever one contribution being verified at a time.
current_verification:
Option<BoxFuture<'static, (VerificationResult, PendingContribution<P::Contribution>)>>,
current_verifications: FuturesUnordered<
BoxFuture<'static, (VerificationResult, PendingContribution<P::Contribution>)>,
>,

Check warning on line 75 in handel/src/aggregation.rs

View workflow job for this annotation

GitHub Actions / Clippy Report

use of a disallowed type `futures_util::stream::FuturesUnordered`

warning: use of a disallowed type `futures_util::stream::FuturesUnordered` --> handel/src/aggregation.rs:73:28 | 73 | current_verifications: FuturesUnordered< | ____________________________^ 74 | | BoxFuture<'static, (VerificationResult, PendingContribution<P::Contribution>)>, 75 | | >, | |_____^ | = note: use `nimiq_utils::stream::FuturesUnordered` instead, it does not need manual `Waker`s = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#disallowed_types

/// The final result of the aggregation once it has been produced.
/// A `Some(_)` value here indicates that the aggregation has finished.
final_result: Option<P::Contribution>,

available_parallelism: usize,
}

impl<TId, P, N> Aggregation<TId, P, N>
Expand Down Expand Up @@ -131,8 +136,9 @@ where
network: sender,
start_level_interval,
periodic_update_interval,
current_verification: None,
current_verifications: FuturesUnordered::new(),

Check warning on line 139 in handel/src/aggregation.rs

View workflow job for this annotation

GitHub Actions / Clippy Report

use of a disallowed type `futures_util::stream::FuturesUnordered`

warning: use of a disallowed type `futures_util::stream::FuturesUnordered` --> handel/src/aggregation.rs:139:36 | 139 | current_verifications: FuturesUnordered::new(), | ^^^^^^^^^^^^^^^^ | = note: use `nimiq_utils::stream::FuturesUnordered` instead, it does not need manual `Waker`s = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#disallowed_types
final_result: None,
available_parallelism: thread::available_parallelism().map_or(1, NonZeroUsize::get),
}
}

Expand Down Expand Up @@ -406,7 +412,7 @@ where
return Some(result);
}

self.current_verification = Some(fut);
self.current_verifications.push(fut);
None
}

Expand Down Expand Up @@ -466,27 +472,25 @@ where
}

// Poll the verification future if there is one.
if let Some(future) = &mut self.current_verification {
if let Poll::Ready((result, contribution)) = future.poll_unpin(cx) {
// If a result is produced, unset the future such that a new one can take its place.
self.current_verification = None;

if result.is_ok() {
// If the contribution was successfully verified, apply it and return the new
// best aggregate.
best_aggregate = Some(self.apply_contribution(contribution))
} else {
// Verification failed, ban sender.
warn!(
id = %self.protocol.identify(),
?result,
?contribution,
"Rejecting invalid contribution"
);
self.network.ban_node(contribution.origin);
}
while let Poll::Ready(Some((result, contribution))) =
self.current_verifications.poll_next_unpin(cx)
{
if result.is_ok() {
// If the contribution was successfully verified, apply it and return the new
// best aggregate.
best_aggregate = Some(self.apply_contribution(contribution));
break;
} else {
// Verification failed, ban sender.
warn!(
id = %self.protocol.identify(),
?result,
?contribution,
"Rejecting invalid contribution"
);
self.network.ban_node(contribution.origin);
}
};
}

// Check if the automatic update interval triggers, if so perform the update.
while let Poll::Ready(_instant) = self.periodic_update_interval.poll_next_unpin(cx) {
Expand All @@ -505,7 +509,8 @@ where
// does not have produced a value yet. This is necessary as the verification future could
// resolve immediately producing a second item for the stream. As the new best aggregate
// will be returned, this stream will be polled again creating the future in the next poll.
if self.current_verification.is_none() && best_aggregate.is_none() {
if self.current_verifications.len() < self.available_parallelism && best_aggregate.is_none()
{
// Get the next best pending contribution.
while let Poll::Ready(Some(contribution)) =
self.pending_contributions.poll_next_unpin(cx)
Expand Down

0 comments on commit bd92653

Please sign in to comment.