Reverting the limiting of missing-blocks requests to the current batch.
ii-cruz committed Nov 12, 2024
1 parent 2153ecd commit 09c424d
Showing 3 changed files with 33 additions and 239 deletions.
@@ -318,10 +318,6 @@ impl<N: Network> BlockRequestComponent<N> {
         self.pending_requests.contains(target_block_hash)
     }
 
-    pub fn has_pending_requests(&self) -> bool {
-        !self.pending_requests.is_empty()
-    }
-
     pub fn add_peer(&self, peer_id: N::PeerId) {
         self.peers.write().add_peer(peer_id);
     }
19 changes: 2 additions & 17 deletions consensus/src/sync/live/block_queue/queue.rs
@@ -216,24 +216,9 @@ impl<N: Network> BlockQueue<N> {
                 macro_height
             );
             block_source.ignore_block(&self.network);
-        } else if !self.request_component.has_pending_requests()
-            || macro_height == Policy::last_macro_block(block_number)
-        {
-            // We only allow a new request missing blocks to start if the block is from the
-            // current batch or if there are no ongoing request.
-            self.buffer_and_request_missing_blocks(block, block_source);
         } else {
-            // If we are on not within the same batch or we already are requesting blocks,
-            // we just buffer it without requesting for blocks.
-            // Any potential gaps will be filled after we sync up to the batch.
-            if self.insert_block_into_buffer(block, block_source) {
-                log::trace!(block_number, "Buffering block");
-            } else {
-                log::trace!(
-                    block_number,
-                    "Not buffering block - already known or exceeded the per peer limit",
-                );
-            }
+            // Block is inside the buffer window, put it in the buffer.
+            self.buffer_and_request_missing_blocks(block, block_source);
         }
 
         None
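To make the effect of this hunk concrete, here is a toy model of the two control flows. The Queue struct and its counters are invented for illustration; only the method names mirror the BlockQueue code in the diff above.

// Toy model of the announced-block handling before and after this revert.
// `Queue` and its fields are hypothetical; only the method names mirror
// the real BlockQueue code touched by the hunk above.
struct Queue {
    pending_requests: usize,
    buffered: usize,
}

impl Queue {
    fn has_pending_requests(&self) -> bool {
        self.pending_requests > 0
    }

    fn buffer_and_request_missing_blocks(&mut self) {
        self.buffered += 1;
        self.pending_requests += 1; // also fires a missing-blocks request
    }

    fn insert_block_into_buffer(&mut self) {
        self.buffered += 1; // buffer only, no new request
    }

    // Pre-revert: start a new missing-blocks request only if none is in
    // flight or the block closes the current batch.
    fn announce_before(&mut self, closes_batch: bool) {
        if !self.has_pending_requests() || closes_batch {
            self.buffer_and_request_missing_blocks();
        } else {
            self.insert_block_into_buffer();
        }
    }

    // Post-revert: every block inside the buffer window is buffered and
    // missing blocks are requested unconditionally.
    fn announce_after(&mut self) {
        self.buffer_and_request_missing_blocks();
    }
}

fn main() {
    let mut q = Queue { pending_requests: 1, buffered: 0 };
    q.announce_before(false); // old behavior: buffered, no second request
    assert_eq!((q.buffered, q.pending_requests), (1, 1));
    q.announce_after(); // new behavior: buffered and a request fires
    assert_eq!((q.buffered, q.pending_requests), (2, 2));
}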
249 changes: 31 additions & 218 deletions consensus/tests/history.rs
@@ -28,9 +28,11 @@ use nimiq_test_utils::{
     },
     mock_node::MockNode,
     node::TESTING_BLS_CACHE_MAX_CAPACITY,
+    test_rng::test_rng,
 };
 use nimiq_utils::time::OffsetTime;
 use parking_lot::{Mutex, RwLock};
+use rand::Rng;
 use tokio::{sync::mpsc, task::yield_now};
 use tokio_stream::wrappers::ReceiverStream;
 
Expand Down Expand Up @@ -247,288 +249,99 @@ async fn send_two_micro_blocks_out_of_order() {
}

#[test(tokio::test)]
async fn try_to_induce_request_missing_blocks_on_next_batch() {
async fn send_micro_blocks_out_of_order() {
let blockchain1 = blockchain();
let blockchain_proxy_1 = BlockchainProxy::from(&blockchain1);
let blockchain2 = blockchain();

let mut hub = MockHub::new();
let network = Arc::new(hub.new_network());
let (block_tx, block_rx) = mpsc::channel(32);

let block_queue = BlockQueue::with_gossipsub_block_stream(
blockchain_proxy_1.clone(),
Arc::clone(&network),
ReceiverStream::new(block_rx).boxed(),
QueueConfig::default(),
);

let live_sync = BlockLiveSync::with_queue(
blockchain_proxy_1.clone(),
Arc::clone(&network),
block_queue,
bls_cache(),
);

let mut syncer = Syncer::new(
blockchain_proxy_1,
Arc::clone(&network),
live_sync,
MockHistorySyncStream::new(),
);

let mut mock_node = MockNode::with_network_and_blockchain(
Arc::new(hub.new_network()),
Arc::clone(&blockchain2),
);

let mock_node =
MockNode::with_network_and_blockchain(Arc::new(hub.new_network()), blockchain());
network.dial_mock(&mock_node.network);
syncer
.live_sync
.add_peer(mock_node.network.get_local_peer_id());

let mut blocks = Vec::new();
let mut rng = test_rng(false);
let mut ordered_blocks = Vec::new();

let mock_id = MockId::new(mock_node.network.get_local_peer_id());
let producer = BlockProducer::new(signing_key(), voting_key());
let first_gossiped_block = 6;
let n_blocks = rng.gen_range(2..15);

for _ in 0..first_gossiped_block + 2 {
for _ in 0..n_blocks {
let block = push_micro_block(&producer, &blockchain2);
blocks.push(block);
ordered_blocks.push(block);
}
// Finish producing that batch and add one more block on the next batch.
produce_macro_blocks(&producer, &blockchain2, 1);
let block_next_batch = push_micro_block(&producer, &blockchain2);
let block_next_batch2 = push_micro_block(&producer, &blockchain2);

// Send block 6.
block_tx
.send((blocks[first_gossiped_block - 1].clone(), mock_id.clone()))
.await
.unwrap();
// Run the block_queue one iteration, i.e. until it processed one block.
let _ = poll!(syncer.next());
// Yield to allow the internal BlockQueue task to proceed.
yield_now().await;
let mut blocks = ordered_blocks.clone();

// Only block 6 should be buffered and a request missing blocks should be sent.
assert_eq!(
blockchain1.read().block_number(),
Policy::genesis_block_number()
);
// Obtain the buffered blocks
assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 1);
while blocks.len() > 1 {
let index = rng.gen_range(1..blocks.len());

// Send block on the next batch and assert that we buffer it without sending missing blocks requests.
block_tx
.send((block_next_batch.clone(), mock_id.clone()))
.await
.unwrap();
let _ = poll!(syncer.next());
yield_now().await;

// No new blocks should be added to the chain.
assert_eq!(
blockchain1.read().block_number(),
Policy::genesis_block_number()
);
// We should have 2 buffered block, the first block that was gossiped and the new gossiped one.
assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 2);

// Now send blocks 1-5 to fill the gap.
for i in 0..first_gossiped_block - 1 {
block_tx
.send((blocks[i].clone(), mock_id.clone()))
.send((blocks.remove(index).clone(), mock_id.clone()))
.await
.unwrap();
syncer.next().await;
}
syncer.next().await;

// Verify all blocks except the genesis.
for i in 1..=first_gossiped_block - 1 {
assert_eq!(
blockchain1
.read()
.get_block_at(i as u32 + Policy::genesis_block_number(), true, None)
.unwrap(),
blocks[(i - 1) as usize]
);
// Run the block_queue one iteration, i.e. until it processed one block
let _ = poll!(syncer.next());
// Yield to allow the internal BlockQueue task to proceed.
yield_now().await;
}
assert_eq!(syncer.live_sync.queue().num_buffered_blocks(), 1);

// Send block on next batch again, nothing should be done since the block is already buffered.
block_tx
.send((block_next_batch.clone(), mock_id.clone()))
.await
.unwrap();
let _ = poll!(syncer.next());
yield_now().await;
assert_eq!(
blockchain1.read().block_number(),
first_gossiped_block as u32 + Policy::genesis_block_number()
);
assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 1);
// Let old missing blocks request be processed.
// The reply will result in the blocks being discarded since they have been applied through the block stream.
mock_node.next().await;
syncer.next().await;
assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 1);

// Send block on the next batch. It is already buffered but now a missing blocks request should be sent.
block_tx
.send((block_next_batch2.clone(), mock_id.clone()))
.await
.unwrap();
_ = poll!(syncer.next());
yield_now().await;
assert_eq!(
blockchain1.read().block_number(),
first_gossiped_block as u32 + Policy::genesis_block_number()
);
assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 2);

// Allow the last requests missing blocks to finish.
mock_node.next().await;
syncer.next().await;
syncer.next().await; // To push the buffered block 1
assert_eq!(blockchain1.read().head_hash(), block_next_batch.hash());
syncer.next().await;
_ = poll!(syncer.next()); // To push the buffered block 2
assert_eq!(blockchain1.read().head_hash(), block_next_batch2.hash());
assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 0);
}

#[test(tokio::test)]
async fn try_to_induce_request_missing_blocks_gaps() {
let blockchain1 = blockchain();
let blockchain_proxy_1 = BlockchainProxy::from(&blockchain1);
let blockchain2 = blockchain();

let mut hub = MockHub::new();
let network = Arc::new(hub.new_network());
let (block_tx, block_rx) = mpsc::channel(32);

let block_queue = BlockQueue::with_gossipsub_block_stream(
blockchain_proxy_1.clone(),
Arc::clone(&network),
ReceiverStream::new(block_rx).boxed(),
QueueConfig::default(),
);

let live_sync = BlockLiveSync::with_queue(
blockchain_proxy_1.clone(),
Arc::clone(&network),
block_queue,
bls_cache(),
);

let mut syncer = Syncer::new(
blockchain_proxy_1,
Arc::clone(&network),
live_sync,
MockHistorySyncStream::new(),
);

let mut mock_node = MockNode::with_network_and_blockchain(
Arc::new(hub.new_network()),
Arc::clone(&blockchain2),
);

network.dial_mock(&mock_node.network);
syncer
.live_sync
.add_peer(mock_node.network.get_local_peer_id());

let mut blocks = Vec::new();

let mock_id = MockId::new(mock_node.network.get_local_peer_id());
let producer = BlockProducer::new(signing_key(), voting_key());
let total_blocks: usize = 10;
let first_gossiped_block: usize = 6;

for _ in 0..total_blocks {
let block = push_micro_block(&producer, &blockchain2);
blocks.push(block);
}

// Send block 6.
block_tx
.send((blocks[first_gossiped_block - 1].clone(), mock_id.clone()))
.await
.unwrap();
// Run the block_queue one iteration, i.e. until it processed one block.
let _ = poll!(syncer.next());
// Yield to allow the internal BlockQueue task to proceed.
yield_now().await;

// Only block 6 should be buffered and a request missing blocks should be sent.
// All blocks should be buffered
assert_eq!(
blockchain1.read().block_number(),
Policy::genesis_block_number()
);
// Obtain the buffered blocks
assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 1);

// Send block 8, thus creating a gap and assert that we request missing blocks for it.
let index = 7;
block_tx
.send((blocks[index].clone(), mock_id.clone()))
.await
.unwrap();
let _ = poll!(syncer.next());
yield_now().await;

// Obtain the buffered blocks
assert_eq!(
blockchain1.read().block_number(),
Policy::genesis_block_number()
syncer.live_sync.queue().num_buffered_blocks() as u64,
n_blocks - 1
);
// One more block should be buffered.
assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 2);

// Now send blocks 1-5 to fill the gap.
for i in 0..first_gossiped_block - 1 {
block_tx
.send((blocks[i].clone(), mock_id.clone()))
.await
.unwrap();
// Now send block1 to fill the gap
block_tx.send((blocks[0].clone(), mock_id)).await.unwrap();

for _ in 0..n_blocks {
syncer.next().await;
}
syncer.next().await;

// Verify all blocks except the genesis.
for i in 1..=first_gossiped_block - 1 {
// Verify all blocks except the genesis
for i in 1..=n_blocks {
assert_eq!(
blockchain1
.read()
.get_block_at(i as u32 + Policy::genesis_block_number(), true, None)
.unwrap(),
blocks[(i - 1) as usize]
ordered_blocks[(i - 1) as usize]
);
}
assert_eq!(syncer.live_sync.queue().num_buffered_blocks(), 1);

// Let first missing blocks request be processed.
// The reply will result in the blocks for the first request to be discarded
// since they have been applied through the block stream.
mock_node.next().await;
syncer.next().await;

// Assert that the second missing blocks request is still pending.
assert_eq!(
blockchain1.read().block_number(),
first_gossiped_block as u32 + Policy::genesis_block_number()
);
assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 1);

// Allow the last requests missing blocks to finish.
mock_node.next().await;
syncer.next().await;
syncer.next().await; // To push the buffered block

assert_eq!(blockchain1.read().head_hash(), blocks[index].hash());
assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 0);
// No blocks buffered
assert_eq!(syncer.live_sync.queue().num_buffered_blocks(), 0);
}

#[test(tokio::test)]
Expand Down
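The delivery order driving the restored test is worth spelling out: every block except the first is removed at a random index from positions 1.. and sent, so the chain sees shuffled blocks with a gap at block 1, which is then sent last. A standalone sketch of that pattern (the helper below is hypothetical, not part of the test suite; assumes the rand crate):

// Standalone sketch of the out-of-order delivery pattern used by
// send_micro_blocks_out_of_order; `shuffle_keep_first_last` is a
// hypothetical helper, not part of the test suite.
use rand::Rng;

fn shuffle_keep_first_last<T>(mut items: Vec<T>, rng: &mut impl Rng) -> Vec<T> {
    let mut sent = Vec::with_capacity(items.len());
    while items.len() > 1 {
        // Index 0 is never chosen, so the first item is always sent last.
        let index = rng.gen_range(1..items.len());
        sent.push(items.remove(index));
    }
    sent.push(items.remove(0)); // the gap-filling block goes out last
    sent
}

fn main() {
    let mut rng = rand::thread_rng();
    let order = shuffle_keep_first_last((1u32..=6).collect(), &mut rng);
    assert_eq!(*order.last().unwrap(), 1);
    println!("delivery order: {order:?}");
}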
