Skip to content

Commit df5aadd

Browse files
committed
feat(sync): dynamically fetch block locators at sync
1 parent 7b1a50c commit df5aadd

File tree

20 files changed

+696
-273
lines changed

20 files changed

+696
-273
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node/bft/events/src/block_locators.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Copyright (c) 2019-2025 Provable Inc.
2+
// This file is part of the snarkOS library.
3+
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at:
7+
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
use super::{BlockLocators, EventTrait, FromBytes, IoResult, Network, ToBytes};
17+
18+
use std::{
19+
borrow::Cow,
20+
io::{Read, Write},
21+
};
22+
23+
#[derive(Clone, Debug, PartialEq, Eq)]
24+
pub struct BlockLocatorsRequest {
25+
pub start_height: u32,
26+
pub end_height: u32,
27+
}
28+
29+
#[derive(Clone, Debug, PartialEq, Eq)]
30+
pub struct BlockLocatorsResponse<N: Network> {
31+
pub locators: BlockLocators<N>,
32+
}
33+
34+
impl EventTrait for BlockLocatorsRequest {
35+
/// Returns the event name.
36+
#[inline]
37+
fn name(&self) -> Cow<'static, str> {
38+
"BlockLocatorsRequest".into()
39+
}
40+
}
41+
42+
impl FromBytes for BlockLocatorsRequest {
43+
fn read_le<R: Read>(mut reader: R) -> IoResult<Self> {
44+
let start_height = u32::read_le(&mut reader)?;
45+
let end_height = u32::read_le(&mut reader)?;
46+
47+
Ok(Self { start_height, end_height })
48+
}
49+
}
50+
51+
impl ToBytes for BlockLocatorsRequest {
52+
fn write_le<W: Write>(&self, mut writer: W) -> IoResult<()> {
53+
self.start_height.write_le(&mut writer)?;
54+
self.end_height.write_le(&mut writer)?;
55+
Ok(())
56+
}
57+
}
58+
59+
impl<N: Network> EventTrait for BlockLocatorsResponse<N> {
60+
/// Returns the event name.
61+
#[inline]
62+
fn name(&self) -> Cow<'static, str> {
63+
"BlockLocatorsResponse".into()
64+
}
65+
}
66+
67+
impl<N: Network> FromBytes for BlockLocatorsResponse<N> {
68+
fn read_le<R: Read>(mut reader: R) -> IoResult<Self> {
69+
let locators = BlockLocators::read_le(&mut reader)?;
70+
Ok(Self { locators })
71+
}
72+
}
73+
74+
impl<N: Network> ToBytes for BlockLocatorsResponse<N> {
75+
fn write_le<W: Write>(&self, mut writer: W) -> IoResult<()> {
76+
self.locators.write_le(&mut writer)
77+
}
78+
}

node/bft/events/src/lib.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ pub use validators_response::ValidatorsResponse;
6666
mod worker_ping;
6767
pub use worker_ping::WorkerPing;
6868

69+
mod block_locators;
70+
pub use block_locators::{BlockLocatorsRequest, BlockLocatorsResponse};
71+
6972
use snarkos_node_sync_locators::BlockLocators;
7073
use snarkvm::{
7174
console::prelude::{FromBytes, Network, Read, ToBytes, Write, error},
@@ -90,7 +93,6 @@ pub trait EventTrait: ToBytes + FromBytes {
9093
#[derive(Clone, Debug, PartialEq, Eq)]
9194
// TODO (howardwu): For mainnet - Remove this clippy lint. The CertificateResponse should not
9295
// be a large enum variant, after removing the versioning.
93-
#[allow(clippy::large_enum_variant)]
9496
pub enum Event<N: Network> {
9597
BatchPropose(BatchPropose<N>),
9698
BatchSignature(BatchSignature<N>),
@@ -108,6 +110,8 @@ pub enum Event<N: Network> {
108110
ValidatorsRequest(ValidatorsRequest),
109111
ValidatorsResponse(ValidatorsResponse<N>),
110112
WorkerPing(WorkerPing<N>),
113+
BlockLocatorsRequest(BlockLocatorsRequest),
114+
BlockLocatorsResponse(BlockLocatorsResponse<N>),
111115
}
112116

113117
impl<N: Network> From<DisconnectReason> for Event<N> {
@@ -140,6 +144,8 @@ impl<N: Network> Event<N> {
140144
Self::ValidatorsRequest(event) => event.name(),
141145
Self::ValidatorsResponse(event) => event.name(),
142146
Self::WorkerPing(event) => event.name(),
147+
Self::BlockLocatorsRequest(event) => event.name(),
148+
Self::BlockLocatorsResponse(event) => event.name(),
143149
}
144150
}
145151

@@ -163,6 +169,8 @@ impl<N: Network> Event<N> {
163169
Self::ValidatorsRequest(..) => 13,
164170
Self::ValidatorsResponse(..) => 14,
165171
Self::WorkerPing(..) => 15,
172+
Self::BlockLocatorsRequest(..) => 16,
173+
Self::BlockLocatorsResponse(..) => 17,
166174
}
167175
}
168176
}
@@ -188,6 +196,8 @@ impl<N: Network> ToBytes for Event<N> {
188196
Self::ValidatorsRequest(event) => event.write_le(writer),
189197
Self::ValidatorsResponse(event) => event.write_le(writer),
190198
Self::WorkerPing(event) => event.write_le(writer),
199+
Self::BlockLocatorsRequest(event) => event.write_le(writer),
200+
Self::BlockLocatorsResponse(event) => event.write_le(writer),
191201
}
192202
}
193203
}
@@ -215,7 +225,9 @@ impl<N: Network> FromBytes for Event<N> {
215225
13 => Self::ValidatorsRequest(ValidatorsRequest::read_le(&mut reader)?),
216226
14 => Self::ValidatorsResponse(ValidatorsResponse::read_le(&mut reader)?),
217227
15 => Self::WorkerPing(WorkerPing::read_le(&mut reader)?),
218-
16.. => return Err(error(format!("Unknown event ID {id}"))),
228+
16 => Self::BlockLocatorsRequest(BlockLocatorsRequest::read_le(&mut reader)?),
229+
17 => Self::BlockLocatorsResponse(BlockLocatorsResponse::read_le(&mut reader)?),
230+
18.. => return Err(error(format!("Unknown event ID {id}"))),
219231
};
220232

221233
// Ensure that there are no "dangling" bytes.

node/bft/events/src/primary_ping.rs

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,21 @@ use super::*;
1818
#[derive(Clone, Debug, PartialEq, Eq)]
1919
pub struct PrimaryPing<N: Network> {
2020
pub version: u32,
21-
pub block_locators: BlockLocators<N>,
21+
pub current_block_height: u32,
2222
pub primary_certificate: Data<BatchCertificate<N>>,
2323
}
2424

2525
impl<N: Network> PrimaryPing<N> {
2626
/// Initializes a new ping event.
27-
pub const fn new(
28-
version: u32,
29-
block_locators: BlockLocators<N>,
30-
primary_certificate: Data<BatchCertificate<N>>,
31-
) -> Self {
32-
Self { version, block_locators, primary_certificate }
27+
pub const fn new(version: u32, current_block_height: u32, primary_certificate: Data<BatchCertificate<N>>) -> Self {
28+
Self { version, current_block_height, primary_certificate }
3329
}
3430
}
3531

36-
impl<N: Network> From<(u32, BlockLocators<N>, BatchCertificate<N>)> for PrimaryPing<N> {
32+
impl<N: Network> From<(u32, u32, BatchCertificate<N>)> for PrimaryPing<N> {
3733
/// Initializes a new ping event.
38-
fn from((version, block_locators, primary_certificate): (u32, BlockLocators<N>, BatchCertificate<N>)) -> Self {
39-
Self::new(version, block_locators, Data::Object(primary_certificate))
34+
fn from((version, current_block_height, primary_certificate): (u32, u32, BatchCertificate<N>)) -> Self {
35+
Self::new(version, current_block_height, Data::Object(primary_certificate))
4036
}
4137
}
4238

@@ -50,11 +46,8 @@ impl<N: Network> EventTrait for PrimaryPing<N> {
5046

5147
impl<N: Network> ToBytes for PrimaryPing<N> {
5248
fn write_le<W: Write>(&self, mut writer: W) -> IoResult<()> {
53-
// Write the version.
5449
self.version.write_le(&mut writer)?;
55-
// Write the block locators.
56-
self.block_locators.write_le(&mut writer)?;
57-
// Write the primary certificate.
50+
self.current_block_height.write_le(&mut writer)?;
5851
self.primary_certificate.write_le(&mut writer)?;
5952

6053
Ok(())
@@ -66,19 +59,18 @@ impl<N: Network> FromBytes for PrimaryPing<N> {
6659
// Read the version.
6760
let version = u32::read_le(&mut reader)?;
6861
// Read the block locators.
69-
let block_locators = BlockLocators::read_le(&mut reader)?;
62+
let current_block_height = u32::read_le(&mut reader)?;
7063
// Read the primary certificate.
7164
let primary_certificate = Data::read_le(&mut reader)?;
7265

7366
// Return the ping event.
74-
Ok(Self::new(version, block_locators, primary_certificate))
67+
Ok(Self::new(version, current_block_height, primary_certificate))
7568
}
7669
}
7770

7871
#[cfg(test)]
7972
pub mod prop_tests {
8073
use crate::{PrimaryPing, certificate_response::prop_tests::any_batch_certificate};
81-
use snarkos_node_sync_locators::{BlockLocators, test_helpers::sample_block_locators};
8274
use snarkvm::utilities::{FromBytes, ToBytes};
8375

8476
use bytes::{Buf, BufMut, BytesMut};
@@ -87,14 +79,10 @@ pub mod prop_tests {
8779

8880
type CurrentNetwork = snarkvm::prelude::MainnetV0;
8981

90-
pub fn any_block_locators() -> BoxedStrategy<BlockLocators<CurrentNetwork>> {
91-
any::<u32>().prop_map(sample_block_locators).boxed()
92-
}
93-
9482
pub fn any_primary_ping() -> BoxedStrategy<PrimaryPing<CurrentNetwork>> {
95-
(any::<u32>(), any_block_locators(), any_batch_certificate())
96-
.prop_map(|(version, block_locators, batch_certificate)| {
97-
PrimaryPing::from((version, block_locators, batch_certificate.clone()))
83+
(any::<u32>(), any::<u32>(), any_batch_certificate())
84+
.prop_map(|(version, current_block_height, batch_certificate)| {
85+
PrimaryPing::from((version, current_block_height, batch_certificate.clone()))
9886
})
9987
.boxed()
10088
}
@@ -105,7 +93,7 @@ pub mod prop_tests {
10593
primary_ping.write_le(&mut bytes).unwrap();
10694
let decoded = PrimaryPing::<CurrentNetwork>::read_le(&mut bytes.into_inner().reader()).unwrap();
10795
assert_eq!(primary_ping.version, decoded.version);
108-
assert_eq!(primary_ping.block_locators, decoded.block_locators);
96+
assert_eq!(primary_ping.current_block_height, decoded.current_block_height);
10997
assert_eq!(
11098
primary_ping.primary_certificate.deserialize_blocking().unwrap(),
11199
decoded.primary_certificate.deserialize_blocking().unwrap(),

node/bft/src/gateway.rs

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ use crate::{
2626
};
2727
use snarkos_account::Account;
2828
use snarkos_node_bft_events::{
29+
BlockLocatorsRequest,
30+
BlockLocatorsResponse,
2931
BlockRequest,
3032
BlockResponse,
3133
CertificateRequest,
@@ -62,6 +64,7 @@ use snarkvm::{
6264
prelude::{Address, Field},
6365
};
6466

67+
use anyhow::anyhow;
6568
use colored::Colorize;
6669
use futures::SinkExt;
6770
use indexmap::{IndexMap, IndexSet};
@@ -267,6 +270,11 @@ impl<N: Network> CommunicationService for Gateway<N> {
267270
Event::BlockRequest(BlockRequest { start_height, end_height })
268271
}
269272

273+
/// Prepare a block locators requets to be sent
274+
fn prepare_block_locators_request(start_height: u32, end_height: u32) -> Self::Message {
275+
Event::BlockLocatorsRequest(BlockLocatorsRequest { start_height, end_height })
276+
}
277+
270278
/// Sends the given message to specified peer.
271279
///
272280
/// This function returns as soon as the message is queued to be sent,
@@ -730,7 +738,7 @@ impl<N: Network> Gateway<N> {
730738
bail!("{CONTEXT} {:?}", disconnect.reason)
731739
}
732740
Event::PrimaryPing(ping) => {
733-
let PrimaryPing { version, block_locators, primary_certificate } = ping;
741+
let PrimaryPing { version, current_block_height, primary_certificate } = ping;
734742

735743
// Ensure the event version is not outdated.
736744
if version < Event::<N>::VERSION {
@@ -740,7 +748,7 @@ impl<N: Network> Gateway<N> {
740748
// Update the peer locators. Except for some tests, there is always a sync sender.
741749
if let Some(sync_sender) = self.sync_sender.get() {
742750
// Check the block locators are valid, and update the validators in the sync module.
743-
if let Err(error) = sync_sender.update_peer_locators(peer_ip, block_locators).await {
751+
if let Err(error) = sync_sender.update_peer_block_height(peer_ip, current_block_height).await {
744752
bail!("Validator '{peer_ip}' sent invalid block locators - {error}");
745753
}
746754
}
@@ -882,6 +890,29 @@ impl<N: Network> Gateway<N> {
882890
}
883891
Ok(())
884892
}
893+
Event::BlockLocatorsRequest(BlockLocatorsRequest { start_height, end_height }) => {
894+
ensure!(start_height < end_height, "Invalid block locators range");
895+
896+
let Some(sync_sender) = self.sync_sender.get() else {
897+
bail!("Cannot process block locators request: no sync sender");
898+
};
899+
900+
let locators = sync_sender.get_block_locators(start_height, end_height).await?;
901+
let event = Event::BlockLocatorsResponse(BlockLocatorsResponse { locators });
902+
903+
let Some(result) = Transport::send(self, peer_ip, event).await else {
904+
bail!("Failed to send block locator response to peer {peer_ip}");
905+
};
906+
907+
result.await?.map_err(|err| anyhow!("Send failed: {err}"))
908+
}
909+
Event::BlockLocatorsResponse(BlockLocatorsResponse { locators }) => {
910+
if let Some(sync_sender) = self.sync_sender.get() {
911+
sync_sender.update_peer_block_locators(peer_ip, locators).await
912+
} else {
913+
bail!("Cannot update peer block locators: no sync sender");
914+
}
915+
}
885916
}
886917
}
887918

0 commit comments

Comments
 (0)