@ -4,48 +4,31 @@ use crate::execution_params::ExecutionParams;
use ::bitcoin ::util ::psbt ::PartiallySignedTransaction ;
use ::bitcoin ::Txid ;
use anyhow ::{ anyhow , bail , Context , Result } ;
use backoff ::backoff ::Constant as ConstantBackoff ;
use backoff ::future ::retry ;
use bdk ::blockchain ::{ noop_progress , Blockchain , ElectrumBlockchain } ;
use bdk ::descriptor ::Segwitv0 ;
use bdk ::electrum_client ::{ self , Client, ElectrumApi } ;
use bdk ::electrum_client ::{ self , ElectrumApi, GetHistoryRes } ;
use bdk ::keys ::DerivableKey ;
use bdk ::{ FeeRate , KeychainKind } ;
use bitcoin ::Script ;
use reqwest ::Url ;
use serde ::{ Deserialize , Serialize } ;
use std ::collections ::BTreeMap ;
use std ::convert ::TryFrom ;
use std ::fmt ;
use std ::path ::Path ;
use std ::sync ::Arc ;
use std ::time ::Duration ;
use std ::time ::{ Duration , Instant } ;
use tokio ::sync ::Mutex ;
use tokio ::time ::interval ;
const SLED_TREE_NAME : & str = "default_tree" ;
#[ derive(Debug, thiserror::Error) ]
enum Error {
#[ error( " Sending the request failed " ) ]
Io ( reqwest ::Error ) ,
#[ error( " Conversion to Integer failed " ) ]
Parse ( std ::num ::ParseIntError ) ,
#[ error( " The transaction is not minded yet " ) ]
NotYetMined ,
#[ error( " Deserialization failed " ) ]
JsonDeserialization ( reqwest ::Error ) ,
#[ error( " Electrum client error " ) ]
ElectrumClient ( electrum_client ::Error ) ,
}
pub struct Wallet {
inner : Arc < Mutex < bdk ::Wallet < ElectrumBlockchain , bdk ::sled ::Tree > > > ,
http_url : Url ,
rpc_url : Url ,
client : Arc < Mutex < Client > > ,
wallet : Arc < Mutex < bdk ::Wallet < ElectrumBlockchain , bdk ::sled ::Tree > > > ,
}
impl Wallet {
pub async fn new (
electrum_rpc_url : Url ,
electrum_http_url : Url ,
network : bitcoin ::Network ,
wallet_dir : & Path ,
key : impl DerivableKey < Segwitv0 > + Clone ,
@ -53,8 +36,9 @@ impl Wallet {
// Workaround for https://github.com/bitcoindevkit/rust-electrum-client/issues/47.
let config = electrum_client ::ConfigBuilder ::default ( ) . retry ( 2 ) . build ( ) ;
let client = Client ::from_config ( electrum_rpc_url . as_str ( ) , config )
. map_err ( | e | anyhow ! ( "Failed to init electrum rpc client: {:?}" , e ) ) ? ;
let client =
bdk ::electrum_client ::Client ::from_config ( electrum_rpc_url . as_str ( ) , config . clone ( ) )
. map_err ( | e | anyhow ! ( "Failed to init electrum rpc client: {:?}" , e ) ) ? ;
let db = bdk ::sled ::open ( wallet_dir ) ? . open_tree ( SLED_TREE_NAME ) ? ;
@ -66,16 +50,20 @@ impl Wallet {
ElectrumBlockchain ::from ( client ) ,
) ? ;
let electrum = bdk ::electrum_client ::Client ::from_config ( electrum_rpc_url . as_str ( ) , config )
. map_err ( | e | anyhow ! ( "Failed to init electrum rpc client {:?}" , e ) ) ? ;
let interval = Duration ::from_secs ( 5 ) ;
Ok ( Self {
inner : Arc ::new ( Mutex ::new ( bdk_wallet ) ) ,
http_url : electrum_http_url ,
rpc_url : electrum_rpc_url ,
wallet : Arc ::new ( Mutex ::new ( bdk_wallet ) ) ,
client : Arc ::new ( Mutex ::new ( Client ::new ( electrum , interval ) ? ) ) ,
} )
}
pub async fn balance ( & self ) -> Result < Amount > {
let balance = self
. inner
. wallet
. lock ( )
. await
. get_balance ( )
@ -86,7 +74,7 @@ impl Wallet {
pub async fn new_address ( & self ) -> Result < Address > {
let address = self
. inner
. wallet
. lock ( )
. await
. get_new_address ( )
@ -96,13 +84,14 @@ impl Wallet {
}
pub async fn get_tx ( & self , txid : Txid ) -> Result < Option < Transaction > > {
let tx = self . inner . lock ( ) . await . client ( ) . get_tx ( & txid ) ? ;
let tx = self . wallet . lock ( ) . await . client ( ) . get_tx ( & txid ) ? ;
Ok ( tx )
}
pub async fn transaction_fee ( & self , txid : Txid ) -> Result < Amount > {
let fees = self
. inner
. wallet
. lock ( )
. await
. list_transactions ( true ) ?
@ -117,7 +106,7 @@ impl Wallet {
}
pub async fn sync ( & self ) -> Result < ( ) > {
self . inner
self . wallet
. lock ( )
. await
. sync ( noop_progress ( ) , None )
@ -131,7 +120,7 @@ impl Wallet {
address : Address ,
amount : Amount ,
) -> Result < PartiallySignedTransaction > {
let wallet = self . inner . lock ( ) . await ;
let wallet = self . wallet . lock ( ) . await ;
let mut tx_builder = wallet . build_tx ( ) ;
tx_builder . add_recipient ( address . script_pubkey ( ) , amount . as_sat ( ) ) ;
@ -147,7 +136,7 @@ impl Wallet {
/// already accounting for the fees we need to spend to get the
/// transaction confirmed.
pub async fn max_giveable ( & self , locking_script_size : usize ) -> Result < Amount > {
let wallet = self . inner . lock ( ) . await ;
let wallet = self . wallet . lock ( ) . await ;
let mut tx_builder = wallet . build_tx ( ) ;
@ -163,7 +152,7 @@ impl Wallet {
}
pub async fn get_network ( & self ) -> bitcoin ::Network {
self . inner . lock ( ) . await . network ( )
self . wallet . lock ( ) . await . network ( )
}
/// Broadcast the given transaction to the network and emit a log statement
@ -171,7 +160,7 @@ impl Wallet {
pub async fn broadcast ( & self , transaction : Transaction , kind : & str ) -> Result < Txid > {
let txid = transaction . txid ( ) ;
self . inner
self . wallet
. lock ( )
. await
. broadcast ( transaction )
@ -185,7 +174,7 @@ impl Wallet {
}
pub async fn sign_and_finalize ( & self , psbt : PartiallySignedTransaction ) -> Result < Transaction > {
let ( signed_psbt , finalized ) = self . inner . lock ( ) . await . sign ( psbt , None ) ? ;
let ( signed_psbt , finalized ) = self . wallet . lock ( ) . await . sign ( psbt , None ) ? ;
if ! finalized {
bail ! ( "PSBT is not finalized" )
@ -202,106 +191,62 @@ impl Wallet {
. ok_or_else ( | | anyhow ! ( "Could not get raw tx with id: {}" , txid ) )
}
pub async fn watch_for_raw_transaction ( & self , txid : Txid ) -> Result < Transaction > {
tracing ::debug ! ( "watching for tx: {}" , txid ) ;
let tx = retry ( ConstantBackoff ::new ( Duration ::from_secs ( 1 ) ) , | | async {
let client = Client ::new ( self . rpc_url . as_ref ( ) )
. map_err ( | err | backoff ::Error ::Permanent ( Error ::ElectrumClient ( err ) ) ) ? ;
let tx = client . transaction_get ( & txid ) . map_err ( | err | match err {
electrum_client ::Error ::Protocol ( err ) = > {
tracing ::debug ! ( "Received protocol error {} from Electrum, retrying..." , err ) ;
backoff ::Error ::Transient ( Error ::NotYetMined )
}
err = > backoff ::Error ::Permanent ( Error ::ElectrumClient ( err ) ) ,
} ) ? ;
Result ::< _ , backoff ::Error < Error > > ::Ok ( tx )
} )
. await
. context ( "Transient errors should be retried" ) ? ;
Ok ( tx )
pub async fn status_of_script ( & self , script : & Script , txid : & Txid ) -> Result < ScriptStatus > {
self . client . lock ( ) . await . status_of_script ( script , txid )
}
pub async fn get_block_height ( & self ) -> Result < BlockHeight > {
let url = make_blocks_tip_height_url ( & self . http_url ) ? ;
pub async fn watch_until_status (
& self ,
txid : Txid ,
script : Script ,
mut status_fn : impl FnMut ( ScriptStatus ) -> bool ,
) -> Result < ( ) > {
let mut last_status = None ;
let height = retry ( ConstantBackoff ::new ( Duration ::from_secs ( 1 ) ) , | | async {
let height = reqwest ::get ( url . clone ( ) )
. await
. map_err ( Error ::Io ) ?
. text ( )
. await
. map_err ( Error ::Io ) ?
. parse ::< u32 > ( )
. map_err ( | err | backoff ::Error ::Permanent ( Error ::Parse ( err ) ) ) ? ;
Result ::< _ , backoff ::Error < Error > > ::Ok ( height )
} )
. await
. context ( "Transient errors should be retried" ) ? ;
loop {
let new_status = self . client . lock ( ) . await . status_of_script ( & script , & txid ) ? ;
Ok ( BlockHeight ::new ( height ) )
}
if Some ( new_status ) ! = last_status {
tracing ::debug ! ( % txid , "Transaction is {}" , new_status ) ;
}
last_status = Some ( new_status ) ;
pub async fn transaction_block_height ( & self , txid : Txid ) -> Result < BlockHeight > {
let status_url = make_tx_status_url ( & self . http_url , txid ) ? ;
if status_fn ( new_status ) {
break ;
}
#[ derive(Serialize, Deserialize, Debug, Clone) ]
struct TransactionStatus {
block_height : Option < u32 > ,
confirmed : bool ,
tokio ::time ::sleep ( Duration ::from_secs ( 5 ) ) . await ;
}
let height = retry ( ConstantBackoff ::new ( Duration ::from_secs ( 1 ) ) , | | async {
let block_height = reqwest ::get ( status_url . clone ( ) )
. await
. map_err ( | err | backoff ::Error ::Transient ( Error ::Io ( err ) ) ) ?
. json ::< TransactionStatus > ( )
. await
. map_err ( | err | backoff ::Error ::Permanent ( Error ::JsonDeserialization ( err ) ) ) ?
. block_height
. ok_or ( backoff ::Error ::Transient ( Error ::NotYetMined ) ) ? ;
Result ::< _ , backoff ::Error < Error > > ::Ok ( block_height )
} )
. await
. context ( "Transient errors should be retried" ) ? ;
Ok ( BlockHeight ::new ( height ) )
Ok ( ( ) )
}
pub async fn wait_for_transaction_finality (
& self ,
txid : Txid ,
script_to_watch : Script ,
execution_params : ExecutionParams ,
) -> Result < ( ) > {
let conf_target = execution_params . bitcoin_finality_confirmations ;
tracing ::info ! ( % txid , "Waiting for {} confirmation{} of Bitcoin transaction" , conf_target , if conf_target > 1 { "s" } else { "" } ) ;
// Divide by 4 to not check too often yet still be aware of the new block early
// on.
let mut interval = interval ( execution_params . bitcoin_avg_block_time / 4 ) ;
let mut seen_confirmations = 0 ;
loop {
let tx_block_height = self . transaction_block_height ( txid ) . await ? ;
tracing ::debug ! ( "tx_block_height: {:?}" , tx_block_height ) ;
let block_height = self . get_block_height ( ) . await ? ;
tracing ::debug ! ( "latest_block_height: {:?}" , block_height ) ;
if let Some ( confirmations ) = block_height . checked_sub (
tx_block_height
. checked_sub ( BlockHeight ::new ( 1 ) )
. expect ( "transaction must be included in block with height >= 1" ) ,
) {
tracing ::debug ! ( % txid , "confirmations: {:?}" , confirmations ) ;
if u32 ::from ( confirmations ) > = conf_target {
break ;
self . watch_until_status ( txid , script_to_watch , | status | match status {
ScriptStatus ::Confirmed ( inner ) = > {
let confirmations = inner . confirmations ( ) ;
if confirmations > seen_confirmations {
tracing ::info ! ( % txid , "Bitcoin tx has {} out of {} confirmation{}" , confirmations , conf_target , if conf_target > 1 { "s" } else { "" } ) ;
seen_confirmations = confirmations ;
}
}
interval . tick ( ) . await ;
}
inner . meets_target ( conf_target )
} ,
_ = > false
} )
. await ? ;
Ok ( ( ) )
}
@ -314,42 +259,258 @@ impl Wallet {
}
}
fn make_tx_status_url ( base_url : & Url , txid : Txid ) -> Result < Url > {
let url = base_url . join ( & format! ( "tx/{}/status" , txid ) ) ? ;
struct Client {
electrum : bdk ::electrum_client ::Client ,
latest_block : BlockHeight ,
last_ping : Instant ,
interval : Duration ,
script_history : BTreeMap < Script , Vec < GetHistoryRes > > ,
}
impl Client {
fn new ( electrum : bdk ::electrum_client ::Client , interval : Duration ) -> Result < Self > {
let latest_block = electrum . block_headers_subscribe ( ) . map_err ( | e | {
anyhow ! (
"Electrum client failed to subscribe to header notifications: {:?}" ,
e
)
} ) ? ;
Ok ( Self {
electrum ,
latest_block : BlockHeight ::try_from ( latest_block ) ? ,
last_ping : Instant ::now ( ) ,
interval ,
script_history : Default ::default ( ) ,
} )
}
/// Ping the electrum server unless we already did within the set interval.
///
/// Returns a boolean indicating whether we actually pinged the server.
fn ping ( & mut self ) -> bool {
if self . last_ping . elapsed ( ) < = self . interval {
return false ;
}
match self . electrum . ping ( ) {
Ok ( ( ) ) = > {
self . last_ping = Instant ::now ( ) ;
true
}
Err ( error ) = > {
tracing ::debug ! ( ? error , "Failed to ping electrum server" ) ;
false
}
}
}
fn drain_notifications ( & mut self ) -> Result < ( ) > {
let pinged = self . ping ( ) ;
if ! pinged {
return Ok ( ( ) ) ;
}
self . drain_blockheight_notifications ( ) ? ;
self . update_script_histories ( ) ? ;
Ok ( ( ) )
}
fn status_of_script ( & mut self , script : & Script , txid : & Txid ) -> Result < ScriptStatus > {
if ! self . script_history . contains_key ( script ) {
self . script_history . insert ( script . clone ( ) , vec! [ ] ) ;
}
self . drain_notifications ( ) ? ;
let history = self . script_history . entry ( script . clone ( ) ) . or_default ( ) ;
Ok ( url )
let history_of_tx = history
. iter ( )
. filter ( | entry | & entry . tx_hash = = txid )
. collect ::< Vec < _ > > ( ) ;
match history_of_tx . as_slice ( ) {
[ ] = > Ok ( ScriptStatus ::Unseen ) ,
[ remaining @ .. , last ] = > {
if ! remaining . is_empty ( ) {
tracing ::warn ! ( "Found more than a single history entry for script. This is highly unexpected and those history entries will be ignored." )
}
if last . height < = 0 {
Ok ( ScriptStatus ::InMempool )
} else {
Ok ( ScriptStatus ::Confirmed (
Confirmed ::from_inclusion_and_latest_block (
u32 ::try_from ( last . height ) ? ,
u32 ::from ( self . latest_block ) ,
) ,
) )
}
}
}
}
fn drain_blockheight_notifications ( & mut self ) -> Result < ( ) > {
let latest_block = std ::iter ::from_fn ( | | self . electrum . block_headers_pop ( ) . transpose ( ) )
. last ( )
. transpose ( )
. map_err ( | e | anyhow ! ( "Failed to pop header notification: {:?}" , e ) ) ? ;
if let Some ( new_block ) = latest_block {
tracing ::debug ! (
"Got notification for new block at height {}" ,
new_block . height
) ;
self . latest_block = BlockHeight ::try_from ( new_block ) ? ;
}
Ok ( ( ) )
}
fn update_script_histories ( & mut self ) -> Result < ( ) > {
let histories = self
. electrum
. batch_script_get_history ( self . script_history . keys ( ) )
. map_err ( | e | anyhow ! ( "Failed to get script histories {:?}" , e ) ) ? ;
if histories . len ( ) ! = self . script_history . len ( ) {
bail ! (
"Expected {} history entries, received {}" ,
self . script_history . len ( ) ,
histories . len ( )
) ;
}
let scripts = self . script_history . keys ( ) . cloned ( ) ;
let histories = histories . into_iter ( ) ;
self . script_history = scripts . zip ( histories ) . collect ::< BTreeMap < _ , _ > > ( ) ;
Ok ( ( ) )
}
}
#[ derive(Debug, Copy, Clone, PartialEq) ]
pub enum ScriptStatus {
Unseen ,
InMempool ,
Confirmed ( Confirmed ) ,
}
impl ScriptStatus {
pub fn from_confirmations ( confirmations : u32 ) -> Self {
match confirmations {
0 = > Self ::InMempool ,
confirmations = > Self ::Confirmed ( Confirmed ::new ( confirmations - 1 ) ) ,
}
}
}
#[ derive(Debug, Copy, Clone, PartialEq) ]
pub struct Confirmed {
/// The depth of this transaction within the blockchain.
///
/// Will be zero if the transaction is included in the latest block.
depth : u32 ,
}
fn make_blocks_tip_height_url ( base_url : & Url ) -> Result < Url > {
let url = base_url . join ( "blocks/tip/height" ) ? ;
impl Confirmed {
pub fn new ( depth : u32 ) -> Self {
Self { depth }
}
/// Compute the depth of a transaction based on its inclusion height and the
/// latest known block.
///
/// Our information about the latest block might be outdated. To avoid an
/// overflow, we make sure the depth is 0 in case the inclusion height
/// exceeds our latest known block,
pub fn from_inclusion_and_latest_block ( inclusion_height : u32 , latest_block : u32 ) -> Self {
let depth = latest_block . saturating_sub ( inclusion_height ) ;
Self { depth }
}
pub fn confirmations ( & self ) -> u32 {
self . depth + 1
}
Ok ( url )
pub fn meets_target < T > ( & self , target : T ) -> bool
where
u32 : PartialOrd < T > ,
{
self . confirmations ( ) > = target
}
}
impl ScriptStatus {
/// Check if the script has any confirmations.
pub fn is_confirmed ( & self ) -> bool {
matches! ( self , ScriptStatus ::Confirmed ( _ ) )
}
/// Check if the script has met the given confirmation target.
pub fn is_confirmed_with < T > ( & self , target : T ) -> bool
where
u32 : PartialOrd < T > ,
{
match self {
ScriptStatus ::Confirmed ( inner ) = > inner . meets_target ( target ) ,
_ = > false ,
}
}
pub fn has_been_seen ( & self ) -> bool {
matches! ( self , ScriptStatus ::InMempool | ScriptStatus ::Confirmed ( _ ) )
}
}
impl fmt ::Display for ScriptStatus {
fn fmt ( & self , f : & mut fmt ::Formatter < ' _ > ) -> fmt ::Result {
match self {
ScriptStatus ::Unseen = > write! ( f , "unseen" ) ,
ScriptStatus ::InMempool = > write! ( f , "in mempool" ) ,
ScriptStatus ::Confirmed ( inner ) = > {
write! ( f , "confirmed with {} blocks" , inner . confirmations ( ) )
}
}
}
}
#[ cfg(test) ]
mod tests {
use super ::* ;
use crate ::cli ::command ::DEFAULT_ELECTRUM_HTTP_URL ;
#[ test ]
fn create_tx_status_url_from_default_base_url_success ( ) {
let base_url = DEFAULT_ELECTRUM_HTTP_URL . parse ( ) . unwrap ( ) ;
let txid = Txid ::default ;
fn given_depth_0_should_meet_confirmation_target_one ( ) {
let script = ScriptStatus ::Confirmed ( Confirmed { depth : 0 } ) ;
let confirmed = script . is_confirmed_with ( 1 ) ;
assert! ( confirmed )
}
#[ test ]
fn given_confirmations_1_should_meet_confirmation_target_one ( ) {
let script = ScriptStatus ::from_confirmations ( 1 ) ;
let url = make_tx_status_url ( & base_url , txid ( ) ) . unwrap ( ) ;
let confirmed = script . is_confirmed_with ( 1 ) ;
assert_eq! ( url . as_str ( ) , "https://blockstream.info/testnet/api/tx/0000000000000000000000000000000000000000000000000000000000000000/status" ) ;
assert !( confirmed )
}
#[ test ]
fn create_block_tip_height_url_from_default_base_url_success ( ) {
let base_url = DEFAULT_ELECTRUM_HTTP_URL . parse ( ) . unwrap ( ) ;
fn given_inclusion_after_lastest_known_block_at_least_depth_0 ( ) {
let included_in = 10 ;
let latest_block = 9 ;
let url = make_blocks_tip_height_url ( & base_url ) . unwrap ( ) ;
let confirmed = Confirmed ::from_inclusion_and_latest_block ( included_in , latest_block ) ;
assert_eq! (
url . as_str ( ) ,
"https://blockstream.info/testnet/api/blocks/tip/height"
) ;
assert_eq! ( confirmed . depth , 0 )
}
}