diff --git a/CHANGELOG.md b/CHANGELOG.md index 68a0a996..8992f81c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 The CLI expects to be connected to the ASB throughout the entire swap and hence reconnects as soon as the connection is closed. This resulted in a loop of connections being established but instantly closed again because the ASB deemed the connection to not be necessary. See issue https://github.com/comit-network/xmr-btc-swap/issues/648. +- An issue where the ASB was unable to use the Monero wallet in case `monero-wallet-rpc` has been restarted. + In case no wallet is loaded when we try to interact with the `monero-wallet-rpc` daemon, we now load the correct wallet on-demand. + See issue https://github.com/comit-network/xmr-btc-swap/issues/652. ## [0.8.1] - 2021-08-16 diff --git a/monero-rpc/src/lib.rs b/monero-rpc/src/lib.rs index a1b2eaaa..dd09c06d 100644 --- a/monero-rpc/src/lib.rs +++ b/monero-rpc/src/lib.rs @@ -14,3 +14,5 @@ pub mod monerod; pub mod wallet; + +pub use jsonrpc_client as jsonrpc; diff --git a/swap/src/monero/wallet.rs b/swap/src/monero/wallet.rs index 955a12b1..859b451b 100644 --- a/swap/src/monero/wallet.rs +++ b/swap/src/monero/wallet.rs @@ -4,9 +4,8 @@ use crate::monero::{ }; use ::monero::{Address, Network, PrivateKey, PublicKey}; use anyhow::{Context, Result}; -use monero_rpc::wallet; -use monero_rpc::wallet::{BlockHeight, CheckTxKey, MoneroWalletRpc as _, Refreshed}; -use std::future::Future; +use monero_rpc::wallet::{BlockHeight, MoneroWalletRpc as _, Refreshed}; +use monero_rpc::{jsonrpc, wallet}; use std::str::FromStr; use std::time::Duration; use tokio::sync::Mutex; @@ -172,6 +171,13 @@ impl Wallet { } pub async fn transfer(&self, request: TransferRequest) -> Result { + let inner = self.inner.lock().await; + + inner + .open_wallet(self.name.clone()) + .await + .with_context(|| format!("Failed to open wallet {}", self.name))?; + let TransferRequest { public_spend_key, public_view_key, @@ -181,10 +187,7 @@ impl Wallet { let destination_address = Address::standard(self.network, public_spend_key, public_view_key.into()); - let res = self - .inner - .lock() - .await + let res = inner .transfer_single(0, amount.as_piconero(), &destination_address.to_string()) .await?; @@ -222,24 +225,15 @@ impl Wallet { let address = Address::standard(self.network, public_spend_key, public_view_key.into()); let check_interval = tokio::time::interval(self.sync_interval); - let key = transfer_proof.tx_key().to_string(); wait_for_confirmations( - txid.0, - move |txid| { - let key = key.clone(); - async move { - Ok(self - .inner - .lock() - .await - .check_tx_key(txid, key, address.to_string()) - .await?) - } - }, - check_interval, + &self.inner, + transfer_proof, + address, expected, conf_target, + check_interval, + self.name.clone(), ) .await?; @@ -294,27 +288,49 @@ pub struct WatchRequest { pub expected: Amount, } -async fn wait_for_confirmations( - txid: String, - fetch_tx: impl Fn(String) -> Fut, - mut check_interval: Interval, +async fn wait_for_confirmations + Sync>( + client: &Mutex, + transfer_proof: TransferProof, + to_address: Address, expected: Amount, conf_target: u64, -) -> Result<(), InsufficientFunds> -where - Fut: Future>, -{ + mut check_interval: Interval, + wallet_name: String, +) -> Result<(), InsufficientFunds> { let mut seen_confirmations = 0u64; while seen_confirmations < conf_target { check_interval.tick().await; // tick() at the beginning of the loop so every `continue` tick()s as well - let tx = match fetch_tx(txid.clone()).await { + let txid = transfer_proof.tx_hash().to_string(); + let client = client.lock().await; + + let tx = match client + .check_tx_key( + txid.clone(), + transfer_proof.tx_key.to_string(), + to_address.to_string(), + ) + .await + { Ok(proof) => proof, - Err(error) => { + Err(jsonrpc::Error::JsonRpc(jsonrpc::JsonRpcError { code: -1, .. })) => { + tracing::warn!(%txid, "`monero-wallet-rpc` failed to fetch transaction, may need to be restarted"); + continue; + } + // TODO: Implement this using a generic proxy for each function call once https://github.com/thomaseizinger/rust-jsonrpc-client/issues/47 is fixed. + Err(jsonrpc::Error::JsonRpc(jsonrpc::JsonRpcError { code: -13, .. })) => { + tracing::debug!( + "Opening wallet `{}` because no wallet is loaded", + wallet_name + ); + let _ = client.open_wallet(wallet_name.clone()).await; + continue; + } + Err(other) => { tracing::debug!( %txid, - "Failed to retrieve tx from blockchain: {:#}", error + "Failed to retrieve tx from blockchain: {:#}", other ); continue; // treating every error as transient and retrying // is obviously wrong but the jsonrpc client is @@ -349,76 +365,208 @@ where #[cfg(test)] mod tests { use super::*; + use crate::tracing_ext::capture_logs; use monero_rpc::wallet::CheckTxKey; - use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; - use std::sync::Arc; + use std::sync::atomic::{AtomicU32, Ordering}; + use tracing::metadata::LevelFilter; #[tokio::test] async fn given_exact_confirmations_does_not_fetch_tx_again() { - let requests = Arc::new(AtomicU32::new(0)); + let client = Mutex::new(DummyClient::new(vec![Ok(CheckTxKey { + confirmations: 10, + received: 100, + })])); let result = wait_for_confirmations( - String::from("TXID"), - move |_| { - let requests = requests.clone(); - - async move { - match requests.fetch_add(1, Ordering::SeqCst) { - 0 => Ok(CheckTxKey { - confirmations: 10, - received: 100, - }), - _ => panic!("should not be called more than once"), - } - } - }, - tokio::time::interval(Duration::from_millis(10)), + &client, + TransferProof::new(TxHash("".to_owned()), PrivateKey { + scalar: crate::monero::Scalar::random(&mut rand::thread_rng()) + }), + "53H3QthYLckeCXh9u38vohb2gZ4QgEG3FMWHNxccR6MqV1LdDVYwF1FKsRJPj4tTupWLf9JtGPBcn2MVN6c9oR7p5Uf7JdJ".parse().unwrap(), Amount::from_piconero(100), 10, + tokio::time::interval(Duration::from_millis(10)), + "foo-wallet".to_owned() ) .await; - assert!(result.is_ok()) + assert!(result.is_ok()); + assert_eq!( + client + .lock() + .await + .check_tx_key_invocations + .load(Ordering::SeqCst), + 1 + ); } - /// A test that allows us to easily, visually verify if the log output is as - /// we desire. - /// - /// We want the following properties: - /// - Only print confirmations if they changed i.e. not every time we - /// request them - /// - Also print the last one, i.e. 10 / 10 #[tokio::test] async fn visual_log_check() { - let _ = tracing_subscriber::fmt().with_test_writer().try_init(); - const MAX_REQUESTS: u64 = 20; + let writer = capture_logs(LevelFilter::INFO); + + let client = Mutex::new(DummyClient::new(vec![ + Ok(CheckTxKey { + confirmations: 1, + received: 100, + }), + Ok(CheckTxKey { + confirmations: 1, + received: 100, + }), + Ok(CheckTxKey { + confirmations: 1, + received: 100, + }), + Ok(CheckTxKey { + confirmations: 3, + received: 100, + }), + Ok(CheckTxKey { + confirmations: 5, + received: 100, + }), + ])); - let requests = Arc::new(AtomicU64::new(0)); - - let result = wait_for_confirmations( - String::from("TXID"), - move |_| { - let requests = requests.clone(); - - async move { - match requests.fetch_add(1, Ordering::SeqCst) { - requests if requests <= MAX_REQUESTS => { - Ok(CheckTxKey { - confirmations: requests / 2, /* every 2nd request "yields" a - * confirmation */ - received: 100, - }) - } - _ => panic!("should not be called more than {} times", MAX_REQUESTS), - } - } - }, + wait_for_confirmations( + &client, + TransferProof::new(TxHash("".to_owned()), PrivateKey { + scalar: crate::monero::Scalar::random(&mut rand::thread_rng()) + }), + "53H3QthYLckeCXh9u38vohb2gZ4QgEG3FMWHNxccR6MqV1LdDVYwF1FKsRJPj4tTupWLf9JtGPBcn2MVN6c9oR7p5Uf7JdJ".parse().unwrap(), + Amount::from_piconero(100), + 5, tokio::time::interval(Duration::from_millis(10)), + "foo-wallet".to_owned() + ) + .await + .unwrap(); + + assert_eq!( + writer.captured(), + r" INFO swap::monero::wallet: Received new confirmation for Monero lock tx txid= seen_confirmations=1 needed_confirmations=5 + INFO swap::monero::wallet: Received new confirmation for Monero lock tx txid= seen_confirmations=3 needed_confirmations=5 + INFO swap::monero::wallet: Received new confirmation for Monero lock tx txid= seen_confirmations=5 needed_confirmations=5 +" + ); + } + + #[tokio::test] + async fn reopens_wallet_in_case_not_available() { + let writer = capture_logs(LevelFilter::DEBUG); + + let client = Mutex::new(DummyClient::new(vec![ + Ok(CheckTxKey { + confirmations: 1, + received: 100, + }), + Ok(CheckTxKey { + confirmations: 1, + received: 100, + }), + Err((-13, "No wallet file".to_owned())), + Ok(CheckTxKey { + confirmations: 3, + received: 100, + }), + Ok(CheckTxKey { + confirmations: 5, + received: 100, + }), + ])); + + wait_for_confirmations( + &client, + TransferProof::new(TxHash("".to_owned()), PrivateKey { + scalar: crate::monero::Scalar::random(&mut rand::thread_rng()) + }), + "53H3QthYLckeCXh9u38vohb2gZ4QgEG3FMWHNxccR6MqV1LdDVYwF1FKsRJPj4tTupWLf9JtGPBcn2MVN6c9oR7p5Uf7JdJ".parse().unwrap(), Amount::from_piconero(100), - 10, + 5, + tokio::time::interval(Duration::from_millis(10)), + "foo-wallet".to_owned() ) - .await; + .await + .unwrap(); + + assert_eq!( + writer.captured(), + r" INFO swap::monero::wallet: Received new confirmation for Monero lock tx txid= seen_confirmations=1 needed_confirmations=5 +DEBUG swap::monero::wallet: Opening wallet `foo-wallet` because no wallet is loaded + INFO swap::monero::wallet: Received new confirmation for Monero lock tx txid= seen_confirmations=3 needed_confirmations=5 + INFO swap::monero::wallet: Received new confirmation for Monero lock tx txid= seen_confirmations=5 needed_confirmations=5 +" + ); + assert_eq!( + client + .lock() + .await + .open_wallet_invocations + .load(Ordering::SeqCst), + 1 + ); + } + + type ErrorCode = i64; + type ErrorMessage = String; + + struct DummyClient { + check_tx_key_responses: Vec>, + + check_tx_key_invocations: AtomicU32, + open_wallet_invocations: AtomicU32, + } - assert!(result.is_ok()) + impl DummyClient { + fn new( + check_tx_key_responses: Vec>, + ) -> Self { + Self { + check_tx_key_responses, + check_tx_key_invocations: Default::default(), + open_wallet_invocations: Default::default(), + } + } + } + + #[async_trait::async_trait] + impl monero_rpc::wallet::MoneroWalletRpc for DummyClient { + async fn open_wallet( + &self, + _: String, + ) -> Result> { + self.open_wallet_invocations.fetch_add(1, Ordering::SeqCst); + + Ok(monero_rpc::wallet::Empty {}) + } + + async fn check_tx_key( + &self, + _: String, + _: String, + _: String, + ) -> Result> { + let index = self.check_tx_key_invocations.fetch_add(1, Ordering::SeqCst); + + self.check_tx_key_responses[index as usize] + .clone() + .map_err(|(code, message)| { + monero_rpc::jsonrpc::Error::JsonRpc(monero_rpc::jsonrpc::JsonRpcError { + code, + message, + data: None, + }) + }) + } + + async fn send_request

( + &self, + _: String, + ) -> Result, reqwest::Error> + where + P: serde::de::DeserializeOwned, + { + todo!() + } } }