@ -4,9 +4,8 @@ use crate::monero::{
} ;
use ::monero ::{ Address , Network , PrivateKey , PublicKey } ;
use anyhow ::{ Context , Result } ;
use monero_rpc ::wallet ;
use monero_rpc ::wallet ::{ BlockHeight , CheckTxKey , MoneroWalletRpc as _ , Refreshed } ;
use std ::future ::Future ;
use monero_rpc ::wallet ::{ BlockHeight , MoneroWalletRpc as _ , Refreshed } ;
use monero_rpc ::{ jsonrpc , wallet } ;
use std ::str ::FromStr ;
use std ::time ::Duration ;
use tokio ::sync ::Mutex ;
@ -172,6 +171,13 @@ impl Wallet {
}
pub async fn transfer ( & self , request : TransferRequest ) -> Result < TransferProof > {
let inner = self . inner . lock ( ) . await ;
inner
. open_wallet ( self . name . clone ( ) )
. await
. with_context ( | | format! ( "Failed to open wallet {}" , self . name ) ) ? ;
let TransferRequest {
public_spend_key ,
public_view_key ,
@ -181,10 +187,7 @@ impl Wallet {
let destination_address =
Address ::standard ( self . network , public_spend_key , public_view_key . into ( ) ) ;
let res = self
. inner
. lock ( )
. await
let res = inner
. transfer_single ( 0 , amount . as_piconero ( ) , & destination_address . to_string ( ) )
. await ? ;
@ -222,24 +225,15 @@ impl Wallet {
let address = Address ::standard ( self . network , public_spend_key , public_view_key . into ( ) ) ;
let check_interval = tokio ::time ::interval ( self . sync_interval ) ;
let key = transfer_proof . tx_key ( ) . to_string ( ) ;
wait_for_confirmations (
txid . 0 ,
move | txid | {
let key = key . clone ( ) ;
async move {
Ok ( self
. inner
. lock ( )
. await
. check_tx_key ( txid , key , address . to_string ( ) )
. await ? )
}
} ,
check_interval ,
& self . inner ,
transfer_proof ,
address ,
expected ,
conf_target ,
check_interval ,
self . name . clone ( ) ,
)
. await ? ;
@ -294,27 +288,49 @@ pub struct WatchRequest {
pub expected : Amount ,
}
async fn wait_for_confirmations < Fut > (
txid: String ,
fetch_tx: impl Fn ( String ) -> Fut ,
mut check_interval : Interval ,
async fn wait_for_confirmations < C: monero_rpc ::wallet ::MoneroWalletRpc < reqwest ::Client > + Sync > (
client: & Mutex < C > ,
transfer_proof: TransferProof ,
to_address : Address ,
expected : Amount ,
conf_target : u64 ,
) -> Result < ( ) , InsufficientFunds >
where
Fut : Future < Output = Result < CheckTxKey > > ,
{
mut check_interval : Interval ,
wallet_name : String ,
) -> Result < ( ) , InsufficientFunds > {
let mut seen_confirmations = 0 u64 ;
while seen_confirmations < conf_target {
check_interval . tick ( ) . await ; // tick() at the beginning of the loop so every `continue` tick()s as well
let tx = match fetch_tx ( txid . clone ( ) ) . await {
let txid = transfer_proof . tx_hash ( ) . to_string ( ) ;
let client = client . lock ( ) . await ;
let tx = match client
. check_tx_key (
txid . clone ( ) ,
transfer_proof . tx_key . to_string ( ) ,
to_address . to_string ( ) ,
)
. await
{
Ok ( proof ) = > proof ,
Err ( error ) = > {
Err ( jsonrpc ::Error ::JsonRpc ( jsonrpc ::JsonRpcError { code : - 1 , .. } ) ) = > {
tracing ::warn ! ( % txid , "`monero-wallet-rpc` failed to fetch transaction, may need to be restarted" ) ;
continue ;
}
// TODO: Implement this using a generic proxy for each function call once https://github.com/thomaseizinger/rust-jsonrpc-client/issues/47 is fixed.
Err ( jsonrpc ::Error ::JsonRpc ( jsonrpc ::JsonRpcError { code : - 13 , .. } ) ) = > {
tracing ::debug ! (
"Opening wallet `{}` because no wallet is loaded" ,
wallet_name
) ;
let _ = client . open_wallet ( wallet_name . clone ( ) ) . await ;
continue ;
}
Err ( other ) = > {
tracing ::debug ! (
% txid ,
"Failed to retrieve tx from blockchain: {:#}" , error
"Failed to retrieve tx from blockchain: {:#}" , othe r
) ;
continue ; // treating every error as transient and retrying
// is obviously wrong but the jsonrpc client is
@ -349,76 +365,208 @@ where
#[ cfg(test) ]
mod tests {
use super ::* ;
use crate ::tracing_ext ::capture_logs ;
use monero_rpc ::wallet ::CheckTxKey ;
use std ::sync ::atomic ::{ AtomicU32 , AtomicU64, Ordering} ;
use std::sync ::Arc ;
use std ::sync ::atomic ::{ AtomicU32 , Ordering} ;
use tracing::metadata ::LevelFilter ;
#[ tokio::test ]
async fn given_exact_confirmations_does_not_fetch_tx_again ( ) {
let requests = Arc ::new ( AtomicU32 ::new ( 0 ) ) ;
let result = wait_for_confirmations (
String ::from ( "TXID" ) ,
move | _ | {
let requests = requests . clone ( ) ;
async move {
match requests . fetch_add ( 1 , Ordering ::SeqCst ) {
0 = > Ok ( CheckTxKey {
let client = Mutex ::new ( DummyClient ::new ( vec! [ Ok ( CheckTxKey {
confirmations : 10 ,
received : 100 ,
} ) ] ) ) ;
let result = wait_for_confirmations (
& client ,
TransferProof ::new ( TxHash ( "<FOO>" . to_owned ( ) ) , PrivateKey {
scalar : crate ::monero ::Scalar ::random ( & mut rand ::thread_rng ( ) )
} ) ,
_ = > panic! ( "should not be called more than once" ) ,
}
}
} ,
tokio ::time ::interval ( Duration ::from_millis ( 10 ) ) ,
"53H3QthYLckeCXh9u38vohb2gZ4QgEG3FMWHNxccR6MqV1LdDVYwF1FKsRJPj4tTupWLf9JtGPBcn2MVN6c9oR7p5Uf7JdJ" . parse ( ) . unwrap ( ) ,
Amount ::from_piconero ( 100 ) ,
10 ,
tokio ::time ::interval ( Duration ::from_millis ( 10 ) ) ,
"foo-wallet" . to_owned ( )
)
. await ;
assert! ( result . is_ok ( ) )
assert! ( result . is_ok ( ) ) ;
assert_eq! (
client
. lock ( )
. await
. check_tx_key_invocations
. load ( Ordering ::SeqCst ) ,
1
) ;
}
/// A test that allows us to easily, visually verify if the log output is as
/// we desire.
///
/// We want the following properties:
/// - Only print confirmations if they changed i.e. not every time we
/// request them
/// - Also print the last one, i.e. 10 / 10
#[ tokio::test ]
async fn visual_log_check ( ) {
let _ = tracing_subscriber ::fmt ( ) . with_test_writer ( ) . try_init ( ) ;
const MAX_REQUESTS : u64 = 20 ;
let writer = capture_logs ( LevelFilter ::INFO ) ;
let requests = Arc ::new ( AtomicU64 ::new ( 0 ) ) ;
let client = Mutex ::new ( DummyClient ::new ( vec! [
Ok ( CheckTxKey {
confirmations : 1 ,
received : 100 ,
} ) ,
Ok ( CheckTxKey {
confirmations : 1 ,
received : 100 ,
} ) ,
Ok ( CheckTxKey {
confirmations : 1 ,
received : 100 ,
} ) ,
Ok ( CheckTxKey {
confirmations : 3 ,
received : 100 ,
} ) ,
Ok ( CheckTxKey {
confirmations : 5 ,
received : 100 ,
} ) ,
] ) ) ;
let result = wait_for_confirmations (
String ::from ( "TXID" ) ,
move | _ | {
let requests = requests . clone ( ) ;
wait_for_confirmations (
& client ,
TransferProof ::new ( TxHash ( "<FOO>" . to_owned ( ) ) , PrivateKey {
scalar : crate ::monero ::Scalar ::random ( & mut rand ::thread_rng ( ) )
} ) ,
"53H3QthYLckeCXh9u38vohb2gZ4QgEG3FMWHNxccR6MqV1LdDVYwF1FKsRJPj4tTupWLf9JtGPBcn2MVN6c9oR7p5Uf7JdJ" . parse ( ) . unwrap ( ) ,
Amount ::from_piconero ( 100 ) ,
5 ,
tokio ::time ::interval ( Duration ::from_millis ( 10 ) ) ,
"foo-wallet" . to_owned ( )
)
. await
. unwrap ( ) ;
assert_eq! (
writer . captured ( ) ,
r " INFO swap ::monero ::wallet : Received new confirmation for Monero lock tx txid = < FOO > seen_confirmations = 1 needed_confirmations = 5
INFO swap ::monero ::wallet : Received new confirmation for Monero lock tx txid = < FOO > seen_confirmations = 3 needed_confirmations = 5
INFO swap ::monero ::wallet : Received new confirmation for Monero lock tx txid = < FOO > seen_confirmations = 5 needed_confirmations = 5
"
) ;
}
async move {
match requests . fetch_add ( 1 , Ordering ::SeqCst ) {
requests if requests < = MAX_REQUESTS = > {
#[ tokio::test ]
async fn reopens_wallet_in_case_not_available ( ) {
let writer = capture_logs ( LevelFilter ::DEBUG ) ;
let client = Mutex ::new ( DummyClient ::new ( vec! [
Ok ( CheckTxKey {
confirmations : requests / 2 , /* every 2nd request "yields" a
* confirmation * /
confirmations : 1 ,
received : 100 ,
} )
} ) ,
Ok ( CheckTxKey {
confirmations : 1 ,
received : 100 ,
} ) ,
Err ( ( - 13 , "No wallet file" . to_owned ( ) ) ) ,
Ok ( CheckTxKey {
confirmations : 3 ,
received : 100 ,
} ) ,
Ok ( CheckTxKey {
confirmations : 5 ,
received : 100 ,
} ) ,
] ) ) ;
wait_for_confirmations (
& client ,
TransferProof ::new ( TxHash ( "<FOO>" . to_owned ( ) ) , PrivateKey {
scalar : crate ::monero ::Scalar ::random ( & mut rand ::thread_rng ( ) )
} ) ,
"53H3QthYLckeCXh9u38vohb2gZ4QgEG3FMWHNxccR6MqV1LdDVYwF1FKsRJPj4tTupWLf9JtGPBcn2MVN6c9oR7p5Uf7JdJ" . parse ( ) . unwrap ( ) ,
Amount ::from_piconero ( 100 ) ,
5 ,
tokio ::time ::interval ( Duration ::from_millis ( 10 ) ) ,
"foo-wallet" . to_owned ( )
)
. await
. unwrap ( ) ;
assert_eq! (
writer . captured ( ) ,
r " INFO swap ::monero ::wallet : Received new confirmation for Monero lock tx txid = < FOO > seen_confirmations = 1 needed_confirmations = 5
DEBUG swap ::monero ::wallet : Opening wallet ` foo - wallet ` because no wallet is loaded
INFO swap ::monero ::wallet : Received new confirmation for Monero lock tx txid = < FOO > seen_confirmations = 3 needed_confirmations = 5
INFO swap ::monero ::wallet : Received new confirmation for Monero lock tx txid = < FOO > seen_confirmations = 5 needed_confirmations = 5
"
) ;
assert_eq! (
client
. lock ( )
. await
. open_wallet_invocations
. load ( Ordering ::SeqCst ) ,
1
) ;
}
type ErrorCode = i64 ;
type ErrorMessage = String ;
struct DummyClient {
check_tx_key_responses : Vec < Result < wallet ::CheckTxKey , ( ErrorCode , ErrorMessage ) > > ,
check_tx_key_invocations : AtomicU32 ,
open_wallet_invocations : AtomicU32 ,
}
_ = > panic! ( "should not be called more than {} times" , MAX_REQUESTS ) ,
impl DummyClient {
fn new (
check_tx_key_responses : Vec < Result < wallet ::CheckTxKey , ( ErrorCode , ErrorMessage ) > > ,
) -> Self {
Self {
check_tx_key_responses ,
check_tx_key_invocations : Default ::default ( ) ,
open_wallet_invocations : Default ::default ( ) ,
}
}
} ,
tokio ::time ::interval ( Duration ::from_millis ( 10 ) ) ,
Amount ::from_piconero ( 100 ) ,
10 ,
)
. await ;
}
#[ async_trait::async_trait ]
impl monero_rpc ::wallet ::MoneroWalletRpc < reqwest ::Client > for DummyClient {
async fn open_wallet (
& self ,
_ : String ,
) -> Result < wallet ::WalletOpened , monero_rpc ::jsonrpc ::Error < reqwest ::Error > > {
self . open_wallet_invocations . fetch_add ( 1 , Ordering ::SeqCst ) ;
Ok ( monero_rpc ::wallet ::Empty { } )
}
async fn check_tx_key (
& self ,
_ : String ,
_ : String ,
_ : String ,
) -> Result < wallet ::CheckTxKey , monero_rpc ::jsonrpc ::Error < reqwest ::Error > > {
let index = self . check_tx_key_invocations . fetch_add ( 1 , Ordering ::SeqCst ) ;
self . check_tx_key_responses [ index as usize ]
. clone ( )
. map_err ( | ( code , message ) | {
monero_rpc ::jsonrpc ::Error ::JsonRpc ( monero_rpc ::jsonrpc ::JsonRpcError {
code ,
message ,
data : None ,
} )
} )
}
assert! ( result . is_ok ( ) )
async fn send_request < P > (
& self ,
_ : String ,
) -> Result < monero_rpc ::jsonrpc ::Response < P > , reqwest ::Error >
where
P : serde ::de ::DeserializeOwned ,
{
todo! ( )
}
}
}