Skip to content

Commit 9e3c01e

Browse files
committed
Merge upstream/master into kwsantiago/1973-simplify-examples
2 parents dc4e009 + 8d9df97 commit 9e3c01e

File tree

4 files changed

+219
-23
lines changed

4 files changed

+219
-23
lines changed

crates/chain/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ rusqlite = { version = "0.31.0", features = ["bundled"], optional = true }
2727
[dev-dependencies]
2828
rand = "0.8"
2929
proptest = "1.2.0"
30-
bdk_testenv = { path = "../testenv", default-features = false }
30+
bdk_testenv = { path = "../testenv" }
3131
criterion = { version = "0.2" }
3232

3333
[features]

crates/chain/src/indexed_tx_graph.rs

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,16 @@ impl<A: Anchor, I: Indexer> IndexedTxGraph<A, I> {
6767
indexer,
6868
}
6969
}
70+
71+
// If `tx` replaces a relevant tx, it should also be considered relevant.
72+
fn is_tx_or_conflict_relevant(&self, tx: &Transaction) -> bool {
73+
self.index.is_tx_relevant(tx)
74+
|| self
75+
.graph
76+
.direct_conflicts(tx)
77+
.filter_map(|(_, txid)| self.graph.get_tx(txid))
78+
.any(|tx| self.index.is_tx_relevant(&tx))
79+
}
7080
}
7181

7282
impl<A: Anchor, I: Indexer> IndexedTxGraph<A, I>
@@ -248,8 +258,11 @@ where
248258

249259
/// Batch insert transactions, filtering out those that are irrelevant.
250260
///
251-
/// Relevancy is determined by the [`Indexer::is_tx_relevant`] implementation of `I`. Irrelevant
252-
/// transactions in `txs` will be ignored. `txs` do not need to be in topological order.
261+
/// `txs` do not need to be in topological order.
262+
///
263+
/// Relevancy is determined by the internal [`Indexer::is_tx_relevant`] implementation of `I`.
264+
/// A transaction that conflicts with a relevant transaction is also considered relevant.
265+
/// Irrelevant transactions in `txs` will be ignored.
253266
pub fn batch_insert_relevant<T: Into<Arc<Transaction>>>(
254267
&mut self,
255268
txs: impl IntoIterator<Item = (T, impl IntoIterator<Item = A>)>,
@@ -272,7 +285,7 @@ where
272285

273286
let mut tx_graph = tx_graph::ChangeSet::default();
274287
for (tx, anchors) in txs {
275-
if self.index.is_tx_relevant(&tx) {
288+
if self.is_tx_or_conflict_relevant(&tx) {
276289
let txid = tx.compute_txid();
277290
tx_graph.merge(self.graph.insert_tx(tx.clone()));
278291
for anchor in anchors {
@@ -287,7 +300,8 @@ where
287300
/// Batch insert unconfirmed transactions, filtering out those that are irrelevant.
288301
///
289302
/// Relevancy is determined by the internal [`Indexer::is_tx_relevant`] implementation of `I`.
290-
/// Irrelevant transactions in `txs` will be ignored.
303+
/// A transaction that conflicts with a relevant transaction is also considered relevant.
304+
/// Irrelevant transactions in `unconfirmed_txs` will be ignored.
291305
///
292306
/// Items of `txs` are tuples containing the transaction and a *last seen* timestamp. The
293307
/// *last seen* communicates when the transaction is last seen in the mempool which is used for
@@ -314,8 +328,9 @@ where
314328

315329
let graph = self.graph.batch_insert_unconfirmed(
316330
txs.into_iter()
317-
.filter(|(tx, _)| self.index.is_tx_relevant(tx))
318-
.map(|(tx, seen_at)| (tx.clone(), seen_at)),
331+
.filter(|(tx, _)| self.is_tx_or_conflict_relevant(tx))
332+
.map(|(tx, seen_at)| (tx.clone(), seen_at))
333+
.collect::<Vec<_>>(),
319334
);
320335

321336
ChangeSet {
@@ -359,7 +374,8 @@ where
359374
/// Each inserted transaction's anchor will be constructed using [`TxPosInBlock`].
360375
///
361376
/// Relevancy is determined by the internal [`Indexer::is_tx_relevant`] implementation of `I`.
362-
/// Irrelevant transactions in `txs` will be ignored.
377+
/// A transaction that conflicts with a relevant transaction is also considered relevant.
378+
/// Irrelevant transactions in `block` will be ignored.
363379
///
364380
/// # Example
365381
///
@@ -384,7 +400,7 @@ where
384400
let mut changeset = ChangeSet::<A, I::ChangeSet>::default();
385401
for (tx_pos, tx) in block.txdata.iter().enumerate() {
386402
changeset.indexer.merge(self.index.index_tx(tx));
387-
if self.index.is_tx_relevant(tx) {
403+
if self.is_tx_or_conflict_relevant(tx) {
388404
let txid = tx.compute_txid();
389405
let anchor = TxPosInBlock {
390406
block,

crates/chain/tests/test_indexed_tx_graph.rs

Lines changed: 178 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,198 @@
33
#[macro_use]
44
mod common;
55

6-
use std::{collections::BTreeSet, sync::Arc};
6+
use std::{
7+
collections::{BTreeSet, HashMap},
8+
sync::Arc,
9+
};
710

811
use bdk_chain::{
912
indexed_tx_graph::{self, IndexedTxGraph},
1013
indexer::keychain_txout::KeychainTxOutIndex,
1114
local_chain::LocalChain,
15+
spk_txout::SpkTxOutIndex,
1216
tx_graph, Balance, CanonicalizationParams, ChainPosition, ConfirmationBlockTime, DescriptorExt,
1317
SpkIterator,
1418
};
1519
use bdk_testenv::{
20+
anyhow::{self},
21+
bitcoincore_rpc::{json::CreateRawTransactionInput, RpcApi},
1622
block_id, hash,
1723
utils::{new_tx, DESCRIPTORS},
24+
TestEnv,
25+
};
26+
use bitcoin::{
27+
secp256k1::Secp256k1, Address, Amount, Network, OutPoint, ScriptBuf, Transaction, TxIn, TxOut,
28+
Txid,
1829
};
19-
use bitcoin::{secp256k1::Secp256k1, Amount, OutPoint, ScriptBuf, Transaction, TxIn, TxOut};
2030
use miniscript::Descriptor;
2131

32+
fn gen_spk() -> ScriptBuf {
33+
use bitcoin::secp256k1::{Secp256k1, SecretKey};
34+
35+
let secp = Secp256k1::new();
36+
let (x_only_pk, _) = SecretKey::new(&mut rand::thread_rng())
37+
.public_key(&secp)
38+
.x_only_public_key();
39+
ScriptBuf::new_p2tr(&secp, x_only_pk, None)
40+
}
41+
42+
/// Conflicts of relevant transactions must also be considered relevant.
43+
///
44+
/// This allows the receiving structures to determine the reason why a given transaction is not part
45+
/// of the best history. I.e. Is this transaction evicted from the mempool because of insufficient
46+
/// fee, or because a conflict is confirmed?
47+
///
48+
/// This tests the behavior of the "relevant-conflicts" logic.
49+
#[test]
50+
fn relevant_conflicts() -> anyhow::Result<()> {
51+
type SpkTxGraph = IndexedTxGraph<ConfirmationBlockTime, SpkTxOutIndex<()>>;
52+
53+
/// This environment contains a sender and receiver.
54+
///
55+
/// The sender sends a transaction to the receiver and attempts to cancel it later.
56+
struct ScenarioEnv {
57+
env: TestEnv,
58+
graph: SpkTxGraph,
59+
tx_send: Transaction,
60+
tx_cancel: Transaction,
61+
}
62+
63+
impl ScenarioEnv {
64+
fn new() -> anyhow::Result<Self> {
65+
let env = TestEnv::new()?;
66+
let client = env.rpc_client();
67+
68+
let sender_addr = client
69+
.get_new_address(None, None)?
70+
.require_network(Network::Regtest)?;
71+
72+
let recv_spk = gen_spk();
73+
let recv_addr = Address::from_script(&recv_spk, &bitcoin::params::REGTEST)?;
74+
75+
let mut graph = SpkTxGraph::default();
76+
assert!(graph.index.insert_spk((), recv_spk));
77+
78+
env.mine_blocks(1, Some(sender_addr.clone()))?;
79+
env.mine_blocks(101, None)?;
80+
81+
let tx_input = client
82+
.list_unspent(None, None, None, None, None)?
83+
.into_iter()
84+
.take(1)
85+
.map(|r| CreateRawTransactionInput {
86+
txid: r.txid,
87+
vout: r.vout,
88+
sequence: None,
89+
})
90+
.collect::<Vec<_>>();
91+
let tx_send = {
92+
let outputs =
93+
HashMap::from([(recv_addr.to_string(), Amount::from_btc(49.999_99)?)]);
94+
let tx = client.create_raw_transaction(&tx_input, &outputs, None, Some(true))?;
95+
client
96+
.sign_raw_transaction_with_wallet(&tx, None, None)?
97+
.transaction()?
98+
};
99+
let tx_cancel = {
100+
let outputs =
101+
HashMap::from([(sender_addr.to_string(), Amount::from_btc(49.999_98)?)]);
102+
let tx = client.create_raw_transaction(&tx_input, &outputs, None, Some(true))?;
103+
client
104+
.sign_raw_transaction_with_wallet(&tx, None, None)?
105+
.transaction()?
106+
};
107+
108+
Ok(Self {
109+
env,
110+
graph,
111+
tx_send,
112+
tx_cancel,
113+
})
114+
}
115+
116+
/// Rudimentary sync implementation.
117+
///
118+
/// Scans through all transactions in the blockchain + mempool.
119+
fn sync(&mut self) -> anyhow::Result<()> {
120+
let client = self.env.rpc_client();
121+
for height in 0..=client.get_block_count()? {
122+
let hash = client.get_block_hash(height)?;
123+
let block = client.get_block(&hash)?;
124+
let _ = self.graph.apply_block_relevant(&block, height as _);
125+
}
126+
let _ = self.graph.batch_insert_relevant_unconfirmed(
127+
client
128+
.get_raw_mempool()?
129+
.into_iter()
130+
.map(|txid| client.get_raw_transaction(&txid, None).map(|tx| (tx, 0)))
131+
.collect::<Result<Vec<_>, _>>()?,
132+
);
133+
Ok(())
134+
}
135+
136+
/// Broadcast the original sending transaction.
137+
fn broadcast_send(&self) -> anyhow::Result<Txid> {
138+
let client = self.env.rpc_client();
139+
Ok(client.send_raw_transaction(&self.tx_send)?)
140+
}
141+
142+
/// Broadcast the cancellation transaction.
143+
fn broadcast_cancel(&self) -> anyhow::Result<Txid> {
144+
let client = self.env.rpc_client();
145+
Ok(client.send_raw_transaction(&self.tx_cancel)?)
146+
}
147+
}
148+
149+
// Broadcast `tx_send`.
150+
// Sync.
151+
// Broadcast `tx_cancel`.
152+
// `tx_cancel` gets confirmed.
153+
// Sync.
154+
// Expect: Both `tx_send` and `tx_cancel` appears in `recv_graph`.
155+
{
156+
let mut env = ScenarioEnv::new()?;
157+
let send_txid = env.broadcast_send()?;
158+
env.sync()?;
159+
let cancel_txid = env.broadcast_cancel()?;
160+
env.env.mine_blocks(6, None)?;
161+
env.sync()?;
162+
163+
assert_eq!(env.graph.graph().full_txs().count(), 2);
164+
assert!(env.graph.graph().get_tx(send_txid).is_some());
165+
assert!(env.graph.graph().get_tx(cancel_txid).is_some());
166+
}
167+
168+
// Broadcast `tx_send`.
169+
// Sync.
170+
// Broadcast `tx_cancel`.
171+
// Sync.
172+
// Expect: Both `tx_send` and `tx_cancel` appears in `recv_graph`.
173+
{
174+
let mut env = ScenarioEnv::new()?;
175+
let send_txid = env.broadcast_send()?;
176+
env.sync()?;
177+
let cancel_txid = env.broadcast_cancel()?;
178+
env.sync()?;
179+
180+
assert_eq!(env.graph.graph().full_txs().count(), 2);
181+
assert!(env.graph.graph().get_tx(send_txid).is_some());
182+
assert!(env.graph.graph().get_tx(cancel_txid).is_some());
183+
}
184+
185+
// If we don't see `tx_send` in the first place, `tx_cancel` should not be relevant.
186+
{
187+
let mut env = ScenarioEnv::new()?;
188+
let _ = env.broadcast_send()?;
189+
let _ = env.broadcast_cancel()?;
190+
env.sync()?;
191+
192+
assert_eq!(env.graph.graph().full_txs().count(), 0);
193+
}
194+
195+
Ok(())
196+
}
197+
22198
/// Ensure [`IndexedTxGraph::insert_relevant_txs`] can successfully index transactions NOT presented
23199
/// in topological order.
24200
///

crates/file_store/src/store.rs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ where
118118
/// let mut new_store =
119119
/// Store::create(&MAGIC_BYTES, &new_file_path).expect("must create new file");
120120
/// if let Some(aggregated_changeset) = changeset {
121-
/// new_store.append(&aggregated_changeset)?;
121+
/// new_store.append(aggregated_changeset.as_ref())?;
122122
/// }
123123
/// // The following will overwrite the original file. You will loose the corrupted
124124
/// // portion of the original file forever.
@@ -152,7 +152,7 @@ where
152152
f.read_exact(&mut magic_buf)?;
153153
if magic_buf != magic {
154154
return Err(StoreErrorWithDump {
155-
changeset: Option::<C>::None,
155+
changeset: Option::<Box<C>>::None,
156156
error: StoreError::InvalidMagicBytes {
157157
got: magic_buf,
158158
expected: magic.to_vec(),
@@ -194,7 +194,7 @@ where
194194
Ok(aggregated_changeset)
195195
}
196196
Err(iter_error) => Err(StoreErrorWithDump {
197-
changeset: aggregated_changeset,
197+
changeset: aggregated_changeset.map(Box::new),
198198
error: iter_error,
199199
}),
200200
},
@@ -220,7 +220,7 @@ where
220220
Self::create(magic, file_path)
221221
.map(|store| (store, Option::<C>::None))
222222
.map_err(|err: StoreError| StoreErrorWithDump {
223-
changeset: Option::<C>::None,
223+
changeset: Option::<Box<C>>::None,
224224
error: err,
225225
})
226226
}
@@ -257,7 +257,7 @@ where
257257
#[derive(Debug)]
258258
pub struct StoreErrorWithDump<C> {
259259
/// The partially-aggregated changeset.
260-
pub changeset: Option<C>,
260+
pub changeset: Option<Box<C>>,
261261

262262
/// The [`StoreError`]
263263
pub error: StoreError,
@@ -266,7 +266,7 @@ pub struct StoreErrorWithDump<C> {
266266
impl<C> From<io::Error> for StoreErrorWithDump<C> {
267267
fn from(value: io::Error) -> Self {
268268
Self {
269-
changeset: Option::<C>::None,
269+
changeset: Option::<Box<C>>::None,
270270
error: StoreError::Io(value),
271271
}
272272
}
@@ -371,7 +371,7 @@ mod test {
371371
changeset,
372372
error: StoreError::Bincode(_),
373373
}) => {
374-
assert_eq!(changeset, Some(test_changesets))
374+
assert_eq!(changeset, Some(Box::new(test_changesets)))
375375
}
376376
unexpected_res => panic!("unexpected result: {unexpected_res:?}"),
377377
}
@@ -399,7 +399,7 @@ mod test {
399399
changeset,
400400
error: StoreError::Bincode(_),
401401
}) => {
402-
assert_eq!(changeset, Some(test_changesets))
402+
assert_eq!(changeset, Some(Box::new(test_changesets)))
403403
}
404404
unexpected_res => panic!("unexpected result: {unexpected_res:?}"),
405405
}
@@ -500,10 +500,14 @@ mod test {
500500
.expect_err("should fail to aggregate");
501501
assert_eq!(
502502
err.changeset,
503-
changesets.iter().cloned().reduce(|mut acc, cs| {
504-
Merge::merge(&mut acc, cs);
505-
acc
506-
}),
503+
changesets
504+
.iter()
505+
.cloned()
506+
.reduce(|mut acc, cs| {
507+
Merge::merge(&mut acc, cs);
508+
acc
509+
})
510+
.map(Box::new),
507511
"should recover all changesets that are written in full",
508512
);
509513
// Remove file and start again

0 commit comments

Comments
 (0)