Skip to content

Commit 908f7f4

Browse files
mothran and nategraf authored
BM-273: Fixes batch deadline detection by regular polling for finalize conditions (github#87)
This PR moves the finalize checks into a function that can run incrementally as we poll for new orders, so that batch deadlines can be detected and the batch finalized without new orders coming into the aggregator to trigger it. It also adds a config option for how often the aggregator should poll for new proofs to aggregate and run the finalize checks. Additionally, it adds a unit test for batch-deadline detection. This PR does increase the number of get_block_number() RPC calls, which means we should probably build a block number service internal to the broker sooner rather than later. --------- Co-authored-by: Victor Graf <[email protected]>
1 parent f727dbc commit 908f7f4

File tree

5 files changed

+222
-68
lines changed

5 files changed

+222
-68
lines changed

broker.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,4 @@ batch_max_time = 1000
2323
batch_size = 1
2424
block_deadline_buffer_secs = 120
2525
txn_timeout = 45
26+
# batch_poll_time_ms = 500

crates/broker/src/aggregator.rs

Lines changed: 189 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -342,58 +342,18 @@ where
342342
Ok(())
343343
}
344344

345-
async fn aggregate_proofs(&mut self) -> Result<()> {
346-
let new_proofs = self
347-
.db
348-
.get_aggregation_proofs()
349-
.await
350-
.context("Failed to get pending agg proofs from DB")?;
351-
352-
if new_proofs.is_empty() {
353-
return Ok(());
354-
}
355-
356-
let batch_id = self.db.get_current_batch().await.context("Failed to get current batch")?;
357-
358-
for agg_proof in new_proofs {
359-
match self.aggregate_proof(batch_id, agg_proof.order_id, agg_proof.proof_id).await {
360-
Ok(_) => {
361-
tracing::info!("Completed aggregation of proof {:x}", agg_proof.order_id);
362-
self.db
363-
.update_batch(
364-
batch_id,
365-
agg_proof.order_id,
366-
agg_proof.expire_block,
367-
agg_proof.fee,
368-
)
369-
.await
370-
.context("Failed to update batch with new order details")?;
371-
}
372-
Err(err) => {
373-
tracing::error!(
374-
"Failed to complete aggregation of proof {:x} {err:?}",
375-
agg_proof.order_id
376-
);
377-
if let Err(db_err) =
378-
self.db.set_order_failure(agg_proof.order_id, format!("{err:?}")).await
379-
{
380-
tracing::error!("Failed to mark order failure in db: {db_err}");
381-
}
382-
}
383-
}
384-
}
385-
386-
// Check if we should finalize the batch
387-
// - check current min-deadline and batch timer
388-
// - need to fetch current block, might be good to make that a long polling service with a
389-
// Atomic everyone reads
390-
// if so:
391-
// - finalize
392-
// - snark proof
393-
// - insert batch data in to DB for finalizer
394-
// - mark all orders in batch as Aggregated
395-
// - reset batch timer
396-
345+
/// Check if we should finalize the batch
346+
///
347+
/// - check current min-deadline and batch timer
348+
/// - need to fetch current block, might be good to make that a long polling service with a
349+
/// Atomic everyone reads
350+
///
351+
/// if so:
352+
/// - finalize
353+
/// - snark proof
354+
/// - insert batch data in to DB for finalizer
355+
/// - mark all orders in batch as Aggregated
356+
async fn check_finalize(&mut self, batch_id: usize) -> Result<()> {
397357
let (conf_batch_size, conf_batch_time, conf_batch_fees) = {
398358
let config = self.config.lock_all().context("Failed to lock config")?;
399359

@@ -457,11 +417,19 @@ where
457417
let block_number =
458418
self.provider.get_block_number().await.context("Failed to get current block")?;
459419

460-
// tracing::info!("{} {} {}", self.current_deadline, block_number, self.block_time);
461420
let remaining_secs = (batch.block_deadline.expect("batch missing block deadline")
462421
- block_number)
463422
* self.block_time;
464423
let buffer_secs = conf_block_deadline_buf;
424+
// tracing::info!(
425+
// "{:?} {} {} {} {}",
426+
// batch.block_deadline,
427+
// block_number,
428+
// self.block_time,
429+
// remaining_secs,
430+
// buffer_secs
431+
// );
432+
465433
if remaining_secs <= buffer_secs {
466434
tracing::info!("Batch getting close to deadline {remaining_secs}, finalizing");
467435
finalize = true;
@@ -474,6 +442,54 @@ where
474442

475443
Ok(())
476444
}
445+
446+
async fn aggregate_proofs(&mut self) -> Result<()> {
447+
let new_proofs = self
448+
.db
449+
.get_aggregation_proofs()
450+
.await
451+
.context("Failed to get pending agg proofs from DB")?;
452+
453+
let batch_id = self.db.get_current_batch().await.context("Failed to get current batch")?;
454+
455+
self.check_finalize(batch_id).await?;
456+
457+
if new_proofs.is_empty() {
458+
return Ok(());
459+
}
460+
461+
for agg_proof in new_proofs {
462+
match self.aggregate_proof(batch_id, agg_proof.order_id, agg_proof.proof_id).await {
463+
Ok(_) => {
464+
tracing::info!("Completed aggregation of proof {:x}", agg_proof.order_id);
465+
self.db
466+
.update_batch(
467+
batch_id,
468+
agg_proof.order_id,
469+
agg_proof.expire_block,
470+
agg_proof.fee,
471+
)
472+
.await
473+
.context("Failed to update batch with new order details")?;
474+
}
475+
Err(err) => {
476+
tracing::error!(
477+
"Failed to complete aggregation of proof {:x} {err:?}",
478+
agg_proof.order_id
479+
);
480+
if let Err(db_err) =
481+
self.db.set_order_failure(agg_proof.order_id, format!("{err:?}")).await
482+
{
483+
tracing::error!("Failed to mark order failure in db: {db_err}");
484+
}
485+
}
486+
}
487+
}
488+
489+
self.check_finalize(batch_id).await?;
490+
491+
Ok(())
492+
}
477493
}
478494

479495
impl<T, P> RetryTask for AggregatorService<T, P>
@@ -483,12 +499,21 @@ where
483499
{
484500
fn spawn(&self) -> RetryRes {
485501
let mut self_clone = self.clone();
502+
486503
Box::pin(async move {
487504
tracing::info!("Starting Aggregator service");
488505
loop {
506+
let conf_poll_time_ms = {
507+
let config = self_clone
508+
.config
509+
.lock_all()
510+
.context("Failed to lock config")
511+
.map_err(SupervisorErr::Fault)?;
512+
config.batcher.batch_poll_time_ms.unwrap_or(1000)
513+
};
514+
489515
self_clone.aggregate_proofs().await.map_err(SupervisorErr::Recover)?;
490-
// TODO: configuration
491-
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
516+
tokio::time::sleep(tokio::time::Duration::from_millis(conf_poll_time_ms)).await;
492517
}
493518
})
494519
}
@@ -507,7 +532,7 @@ mod tests {
507532
network::EthereumWallet,
508533
node_bindings::Anvil,
509534
primitives::{aliases::U96, Keccak256, B256},
510-
providers::ProviderBuilder,
535+
providers::{ext::AnvilApi, ProviderBuilder},
511536
signers::local::PrivateKeySigner,
512537
};
513538
use boundless_market::contracts::{
@@ -838,4 +863,112 @@ mod tests {
838863
assert!(!batch.orders.is_empty());
839864
assert_eq!(batch.status, BatchStatus::PendingSubmission);
840865
}
866+
867+
#[tokio::test]
868+
#[traced_test]
869+
async fn deadline_finalize() {
870+
let anvil = Anvil::new().spawn();
871+
let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
872+
let provider = Arc::new(
873+
ProviderBuilder::new()
874+
.with_recommended_fillers()
875+
.wallet(EthereumWallet::from(signer))
876+
.on_http(anvil.endpoint().parse().unwrap()),
877+
);
878+
let db: DbObj = Arc::new(SqliteDb::new("sqlite::memory:").await.unwrap());
879+
let config = ConfigLock::default();
880+
{
881+
let mut config = config.load_write().unwrap();
882+
config.batcher.batch_size = Some(2);
883+
config.batcher.block_deadline_buffer_secs = 100;
884+
}
885+
886+
let prover: ProverObj = Arc::new(MockProver::default());
887+
888+
// Pre-prove the echo aka app guest:
889+
let image_id = Digest::from(ECHO_ID);
890+
let image_id_str = image_id.to_string();
891+
prover.upload_image(&image_id_str, ECHO_ELF.to_vec()).await.unwrap();
892+
let input_id = prover
893+
.upload_input(encode_input(&vec![0x41, 0x41, 0x41, 0x41]).unwrap())
894+
.await
895+
.unwrap();
896+
let proof_res =
897+
prover.prove_and_monitor_stark(&image_id_str, &input_id, vec![]).await.unwrap();
898+
899+
let mut aggregator = AggregatorService::new(
900+
db.clone(),
901+
provider.clone(),
902+
Digest::from(SET_BUILDER_GUEST_ID),
903+
SET_BUILDER_GUEST_ELF.to_vec(),
904+
Digest::from(ASSESSOR_GUEST_ID),
905+
ASSESSOR_GUEST_ELF.to_vec(),
906+
Address::ZERO,
907+
config,
908+
prover,
909+
2,
910+
)
911+
.await
912+
.unwrap();
913+
914+
let customer_signer: PrivateKeySigner = anvil.keys()[1].clone().into();
915+
let chain_id = provider.get_chain_id().await.unwrap();
916+
917+
let min_price = 200000000000000000u64;
918+
let order_request = ProvingRequest::new(
919+
0,
920+
&customer_signer.address(),
921+
Requirements {
922+
imageId: B256::from_slice(image_id.as_bytes()),
923+
predicate: Predicate {
924+
predicateType: PredicateType::PrefixMatch,
925+
data: Default::default(),
926+
},
927+
},
928+
"http://risczero.com/image".into(),
929+
Input { inputType: InputType::Inline, data: Default::default() },
930+
Offer {
931+
minPrice: U96::from(min_price),
932+
maxPrice: U96::from(250000000000000000u64),
933+
biddingStart: 0,
934+
timeout: 50,
935+
rampUpPeriod: 1,
936+
lockinStake: U96::from(10),
937+
},
938+
);
939+
940+
let client_sig = order_request
941+
.sign_request(&customer_signer, Address::ZERO, chain_id)
942+
.unwrap()
943+
.as_bytes();
944+
945+
let order = Order {
946+
status: OrderStatus::PendingAgg,
947+
updated_at: Utc::now(),
948+
target_block: None,
949+
request: order_request,
950+
image_id: Some(image_id_str.clone()),
951+
input_id: Some(input_id.clone()),
952+
proof_id: Some(proof_res.id),
953+
expire_block: Some(100),
954+
path: None,
955+
client_sig: client_sig.into(),
956+
lock_price: Some(U256::from(min_price)),
957+
error_msg: None,
958+
};
959+
let order_id = U256::from(order.request.id);
960+
db.add_order(order_id, order.clone()).await.unwrap();
961+
962+
provider.anvil_mine(Some(U256::from(51)), Some(U256::from(2))).await.unwrap();
963+
964+
aggregator.aggregate_proofs().await.unwrap();
965+
966+
let db_order = db.get_order(order_id).await.unwrap().unwrap();
967+
assert_eq!(db_order.status, OrderStatus::PendingSubmission);
968+
969+
let (_batch_id, batch) = db.get_complete_batch().await.unwrap().unwrap();
970+
assert!(!batch.orders.is_empty());
971+
assert_eq!(batch.status, BatchStatus::PendingSubmission);
972+
assert!(logs_contain("Batch getting close to deadline"));
973+
}
841974
}

crates/broker/src/config.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,12 @@ pub struct BatcherConfig {
123123
pub block_deadline_buffer_secs: u64,
124124
/// Timeout, in seconds for transaction confirmations
125125
pub txn_timeout: Option<u64>,
126-
/// Use the single TXN submission that batchs submit_merkle / fulfill_batch into
126+
/// Polling time, in milliseconds
127+
///
128+
/// The time between polls for new orders to aggregate and how often to check for
129+
/// batch finalize conditions
130+
pub batch_poll_time_ms: Option<u64>,
131+
/// Use the single TXN submission that batches submit_merkle / fulfill_batch into
127132
/// A single transaction. Requires the `submitRootAndFulfillBatch` method
128133
/// be present on the deployed contract
129134
#[serde(default)]
@@ -138,6 +143,7 @@ impl Default for BatcherConfig {
138143
batch_max_fees: None,
139144
block_deadline_buffer_secs: 120,
140145
txn_timeout: None,
146+
batch_poll_time_ms: Some(1000),
141147
single_txn_fulfill: false,
142148
}
143149
}
@@ -346,6 +352,7 @@ batch_max_time = 300
346352
batch_size = 2
347353
block_deadline_buffer_secs = 120
348354
txn_timeout = 45
355+
batch_poll_time_ms = 1200
349356
single_txn_fulfill = true"#;
350357

351358
const BAD_CONFIG: &str = r#"
@@ -389,6 +396,7 @@ error = ?"#;
389396
assert_eq!(config.batcher.batch_max_fees, Some("0.1".into()));
390397
assert_eq!(config.batcher.block_deadline_buffer_secs, 120);
391398
assert_eq!(config.batcher.txn_timeout, None);
399+
assert_eq!(config.batcher.batch_poll_time_ms, None);
392400
}
393401

394402
#[tokio::test]
@@ -433,6 +441,7 @@ error = ?"#;
433441
assert_eq!(config.prover.status_poll_ms, 1000);
434442
assert!(config.prover.bonsai_r0_zkvm_ver.is_none());
435443
assert_eq!(config.batcher.txn_timeout, Some(45));
444+
assert_eq!(config.batcher.batch_poll_time_ms, Some(1200));
436445
assert_eq!(config.batcher.single_txn_fulfill, true);
437446
}
438447
tracing::debug!("closing...");

0 commit comments

Comments
 (0)