diff --git a/monero-harness/src/rpc/monerod.rs b/monero-harness/src/rpc/monerod.rs index 9e32c645..aa00a62d 100644 --- a/monero-harness/src/rpc/monerod.rs +++ b/monero-harness/src/rpc/monerod.rs @@ -6,12 +6,7 @@ use crate::{ use anyhow::Result; use reqwest::Url; use serde::{Deserialize, Serialize}; - -// #[cfg(not(test))] -// use tracing::debug; -// -// #[cfg(test)] -use std::eprintln as debug; +use tracing::debug; /// RPC client for monerod and monero-wallet-rpc. #[derive(Debug, Clone)] diff --git a/monero-harness/src/rpc/wallet.rs b/monero-harness/src/rpc/wallet.rs index 7afaa2c8..a81702ac 100644 --- a/monero-harness/src/rpc/wallet.rs +++ b/monero-harness/src/rpc/wallet.rs @@ -3,9 +3,7 @@ use crate::rpc::{Request, Response}; use anyhow::Result; use reqwest::Url; use serde::{Deserialize, Serialize}; - -// TODO: Either use println! directly or import tracing also? -use std::println as debug; +use tracing::debug; /// JSON RPC client for monero-wallet-rpc. #[derive(Debug)] diff --git a/swap/Cargo.toml b/swap/Cargo.toml index de572e12..fa283dd7 100644 --- a/swap/Cargo.toml +++ b/swap/Cargo.toml @@ -12,13 +12,15 @@ atty = "0.2" backoff = { version = "0.2", features = ["tokio"] } base64 = "0.12" bitcoin = { version = "0.23", features = ["rand", "use-serde"] } # TODO: Upgrade other crates in this repo to use this version. -bitcoin-harness = { git = "https://github.com/coblox/bitcoin-harness-rs", rev = "d402b36d3d6406150e3bfb71492ff4a0a7cb290e" } +bitcoin-harness = { git = "https://github.com/coblox/bitcoin-harness-rs", rev = "f1bbe6a4540d0741f1f4f22577cfeeadbfd7aaaf" } derivative = "2" futures = { version = "0.3", default-features = false } +genawaiter = "0.99.1" libp2p = { version = "0.29", default-features = false, features = ["tcp-tokio", "yamux", "mplex", "dns", "noise", "request-response"] } libp2p-tokio-socks5 = "0.4" log = { version = "0.4", features = ["serde"] } monero = "0.9" +monero-harness = { path = "../monero-harness" } rand = "0.7" reqwest = { version = "0.10", default-features = false, features = ["socks"] } serde = { version = "1", features = ["derive"] } @@ -45,6 +47,7 @@ hyper = "0.13" port_check = "0.1" spectral = "0.6" tempfile = "3" +testcontainers = "0.10" [features] default = [] diff --git a/swap/src/alice.rs b/swap/src/alice.rs index ca8a45e3..4e57c135 100644 --- a/swap/src/alice.rs +++ b/swap/src/alice.rs @@ -1,22 +1,30 @@ //! Run an XMR/BTC swap in the role of Alice. //! Alice holds XMR and wishes receive BTC. use anyhow::Result; +use async_trait::async_trait; +use backoff::{backoff::Constant as ConstantBackoff, future::FutureOperation as _}; +use genawaiter::GeneratorState; use libp2p::{ core::{identity::Keypair, Multiaddr}, request_response::ResponseChannel, NetworkBehaviour, PeerId, }; use rand::rngs::OsRng; -use std::thread; -use tracing::{debug, info}; +use std::{sync::Arc, time::Duration}; +use tokio::sync::Mutex; +use tracing::{debug, info, warn}; mod amounts; mod message0; mod message1; mod message2; +mod message3; -use self::{amounts::*, message0::*, message1::*, message2::*}; +use self::{amounts::*, message0::*, message1::*, message2::*, message3::*}; use crate::{ + bitcoin, + bitcoin::TX_LOCK_MINE_TIMEOUT, + monero, network::{ peer_tracker::{self, PeerTracker}, request_response::AliceToBob, @@ -24,32 +32,111 @@ use crate::{ }, SwapAmounts, PUNISH_TIMELOCK, REFUND_TIMELOCK, }; -use xmr_btc::{alice::State0, bob, monero}; - -pub type Swarm = libp2p::Swarm; +use xmr_btc::{ + alice::{self, action_generator, Action, ReceiveBitcoinRedeemEncsig, State0}, + bitcoin::BroadcastSignedTransaction, + bob, + monero::{CreateWalletForOutput, Transfer}, +}; pub async fn swap( + bitcoin_wallet: Arc, + monero_wallet: Arc, listen: Multiaddr, local_port: Option, - redeem_address: ::bitcoin::Address, - punish_address: ::bitcoin::Address, ) -> Result<()> { + struct Network { + swarm: Arc>, + channel: Option>, + } + + impl Network { + pub async fn send_message2(&mut self, proof: monero::TransferProof) { + match self.channel.take() { + None => warn!("Channel not found, did you call this twice?"), + Some(channel) => { + let mut guard = self.swarm.lock().await; + guard.send_message2(channel, alice::Message2 { + tx_lock_proof: proof, + }); + info!("Sent transfer proof"); + } + } + } + } + + // TODO: For retry, use `backoff::ExponentialBackoff` in production as opposed + // to `ConstantBackoff`. + + #[async_trait] + impl ReceiveBitcoinRedeemEncsig for Network { + async fn receive_bitcoin_redeem_encsig(&mut self) -> xmr_btc::bitcoin::EncryptedSignature { + #[derive(Debug)] + struct UnexpectedMessage; + + let encsig = (|| async { + let mut guard = self.swarm.lock().await; + let encsig = match guard.next().await { + OutEvent::Message3(msg) => msg.tx_redeem_encsig, + other => { + warn!("Expected Bob's Bitcoin redeem encsig, got: {:?}", other); + return Err(backoff::Error::Transient(UnexpectedMessage)); + } + }; + + Result::<_, backoff::Error>::Ok(encsig) + }) + .retry(ConstantBackoff::new(Duration::from_secs(1))) + .await + .expect("transient errors to be retried"); + + info!("Received Bitcoin redeem encsig"); + + encsig + } + } + let mut swarm = new_swarm(listen, local_port)?; let message0: bob::Message0; + let mut state0: Option = None; let mut last_amounts: Option = None; + // TODO: This loop is a neat idea for local development, as it allows us to keep + // Alice up and let Bob keep trying to connect, request amounts and/or send the + // first message of the handshake, but it comes at the cost of needing to handle + // mutable state, which has already been the source of a bug at one point. This + // is an obvious candidate for refactoring loop { match swarm.next().await { - OutEvent::ConnectionEstablished(id) => { - info!("Connection established with: {}", id); + OutEvent::ConnectionEstablished(bob) => { + info!("Connection established with: {}", bob); } OutEvent::Request(amounts::OutEvent::Btc { btc, channel }) => { - debug!("Got request from Bob to swap {}", btc); let amounts = calculate_amounts(btc); - // TODO: We cache the last amounts returned, this needs improving along with - // verification of message 0. last_amounts = Some(amounts); swarm.send_amounts(channel, amounts); + + let SwapAmounts { btc, xmr } = amounts; + + let redeem_address = bitcoin_wallet.as_ref().new_address().await?; + let punish_address = redeem_address.clone(); + + // TODO: Pass this in using + let rng = &mut OsRng; + let state = State0::new( + rng, + btc, + xmr, + REFUND_TIMELOCK, + PUNISH_TIMELOCK, + redeem_address, + punish_address, + ); + + info!("Commencing handshake"); + swarm.set_state0(state.clone()); + + state0 = Some(state) } OutEvent::Message0(msg) => { // We don't want Bob to be able to crash us by sending an out of @@ -64,26 +151,7 @@ pub async fn swap( }; } - let (xmr, btc) = match last_amounts { - Some(p) => (p.xmr, p.btc), - None => unreachable!("should have amounts by here"), - }; - - // TODO: Pass this in using - let rng = &mut OsRng; - let state0 = State0::new( - rng, - btc, - xmr, - REFUND_TIMELOCK, - PUNISH_TIMELOCK, - redeem_address, - punish_address, - ); - swarm.set_state0(state0.clone()); - - // TODO: Can we verify message 0 before calling this so we never fail? - let state1 = state0.receive(message0).expect("failed to receive msg 0"); + let state1 = state0.expect("to be set").receive(message0)?; let (state2, channel) = match swarm.next().await { OutEvent::Message1 { msg, channel } => { @@ -96,17 +164,72 @@ pub async fn swap( let msg = state2.next_message(); swarm.send_message1(channel, msg); - let _state3 = match swarm.next().await { - OutEvent::Message2(msg) => state2.receive(msg)?, + let (state3, channel) = match swarm.next().await { + OutEvent::Message2 { msg, channel } => { + let state3 = state2.receive(msg)?; + (state3, channel) + } other => panic!("Unexpected event: {:?}", other), }; info!("Handshake complete, we now have State3 for Alice."); - thread::park(); - Ok(()) + let network = Arc::new(Mutex::new(Network { + swarm: Arc::new(Mutex::new(swarm)), + channel: Some(channel), + })); + + let mut action_generator = action_generator( + network.clone(), + bitcoin_wallet.clone(), + state3, + TX_LOCK_MINE_TIMEOUT, + ); + + loop { + let state = action_generator.async_resume().await; + + tracing::info!("Resumed execution of generator, got: {:?}", state); + + match state { + GeneratorState::Yielded(Action::LockXmr { + amount, + public_spend_key, + public_view_key, + }) => { + let (transfer_proof, _) = monero_wallet + .transfer(public_spend_key, public_view_key, amount) + .await?; + + let mut guard = network.as_ref().lock().await; + guard.send_message2(transfer_proof).await; + info!("Sent transfer proof"); + } + + GeneratorState::Yielded(Action::RedeemBtc(tx)) => { + let _ = bitcoin_wallet.broadcast_signed_transaction(tx).await?; + } + GeneratorState::Yielded(Action::CancelBtc(tx)) => { + let _ = bitcoin_wallet.broadcast_signed_transaction(tx).await?; + } + GeneratorState::Yielded(Action::PunishBtc(tx)) => { + let _ = bitcoin_wallet.broadcast_signed_transaction(tx).await?; + } + GeneratorState::Yielded(Action::CreateMoneroWalletForOutput { + spend_key, + view_key, + }) => { + monero_wallet + .create_and_load_wallet_for_output(spend_key, view_key) + .await?; + } + GeneratorState::Complete(()) => return Ok(()), + } + } } +pub type Swarm = libp2p::Swarm; + fn new_swarm(listen: Multiaddr, port: Option) -> Result { use anyhow::Context as _; @@ -155,7 +278,11 @@ pub enum OutEvent { msg: bob::Message1, channel: ResponseChannel, }, - Message2(bob::Message2), + Message2 { + msg: bob::Message2, + channel: ResponseChannel, + }, + Message3(bob::Message3), } impl From for OutEvent { @@ -193,7 +320,15 @@ impl From for OutEvent { impl From for OutEvent { fn from(event: message2::OutEvent) -> Self { match event { - message2::OutEvent::Msg(msg) => OutEvent::Message2(msg), + message2::OutEvent::Msg { msg, channel } => OutEvent::Message2 { msg, channel }, + } + } +} + +impl From for OutEvent { + fn from(event: message3::OutEvent) -> Self { + match event { + message3::OutEvent::Msg(msg) => OutEvent::Message3(msg), } } } @@ -208,6 +343,7 @@ pub struct Alice { message0: Message0, message1: Message1, message2: Message2, + message3: Message3, #[behaviour(ignore)] identity: Keypair, } @@ -225,10 +361,12 @@ impl Alice { pub fn send_amounts(&mut self, channel: ResponseChannel, amounts: SwapAmounts) { let msg = AliceToBob::Amounts(amounts); self.amounts.send(channel, msg); + info!("Sent amounts response"); } /// Message0 gets sent within the network layer using this state0. pub fn set_state0(&mut self, state: State0) { + debug!("Set state 0"); let _ = self.message0.set_state(state); } @@ -238,7 +376,18 @@ impl Alice { channel: ResponseChannel, msg: xmr_btc::alice::Message1, ) { - self.message1.send(channel, msg) + self.message1.send(channel, msg); + debug!("Sent Message1"); + } + + /// Send Message2 to Bob in response to receiving his Message2. + pub fn send_message2( + &mut self, + channel: ResponseChannel, + msg: xmr_btc::alice::Message2, + ) { + self.message2.send(channel, msg); + debug!("Sent Message2"); } } @@ -252,17 +401,18 @@ impl Default for Alice { message0: Message0::default(), message1: Message1::default(), message2: Message2::default(), + message3: Message3::default(), identity, } } } fn calculate_amounts(btc: ::bitcoin::Amount) -> SwapAmounts { - const XMR_PER_BTC: u64 = 100; // TODO: Get this from an exchange. + // TODO: Get this from an exchange. + // This value corresponds to 100 XMR per BTC + const PICONERO_PER_SAT: u64 = 1_000_000; - // TODO: Check that this is correct. - // XMR uses 12 zerose BTC uses 8. - let picos = (btc.as_sat() * 10000) * XMR_PER_BTC; + let picos = btc.as_sat() * PICONERO_PER_SAT; let xmr = monero::Amount::from_piconero(picos); SwapAmounts { btc, xmr } diff --git a/swap/src/alice/amounts.rs b/swap/src/alice/amounts.rs index 39396861..33e230d5 100644 --- a/swap/src/alice/amounts.rs +++ b/swap/src/alice/amounts.rs @@ -1,11 +1,10 @@ -use anyhow::Result; use libp2p::{ request_response::{ - handler::RequestProtocol, ProtocolSupport, RequestId, RequestResponse, - RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, ResponseChannel, + handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, + RequestResponseEvent, RequestResponseMessage, ResponseChannel, }, swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, - NetworkBehaviour, PeerId, + NetworkBehaviour, }; use std::{ collections::VecDeque, @@ -14,7 +13,7 @@ use std::{ }; use tracing::{debug, error}; -use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Protocol, TIMEOUT}; +use crate::network::request_response::{AliceToBob, AmountsProtocol, BobToAlice, Codec, TIMEOUT}; #[derive(Debug)] pub enum OutEvent { @@ -29,7 +28,7 @@ pub enum OutEvent { #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Amounts { - rr: RequestResponse, + rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, } @@ -40,23 +39,11 @@ impl Amounts { self.rr.send_response(channel, msg); } - pub async fn request_amounts( - &mut self, - alice: PeerId, - btc: ::bitcoin::Amount, - ) -> Result { - let msg = BobToAlice::AmountsFromBtc(btc); - let id = self.rr.send_request(&alice, msg); - debug!("Request sent to: {}", alice); - - Ok(id) - } - fn poll( &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll, OutEvent>> { + ) -> Poll>, OutEvent>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } @@ -75,7 +62,7 @@ impl Default for Amounts { Self { rr: RequestResponse::new( Codec::default(), - vec![(Protocol, ProtocolSupport::Full)], + vec![(AmountsProtocol, ProtocolSupport::Full)], config, ), events: Default::default(), @@ -92,12 +79,12 @@ impl NetworkBehaviourEventProcess> request, channel, .. }, .. - } => match request { - BobToAlice::AmountsFromBtc(btc) => { + } => { + if let BobToAlice::AmountsFromBtc(btc) = request { + debug!("Received amounts request"); self.events.push_back(OutEvent::Btc { btc, channel }) } - other => debug!("got request: {:?}", other), - }, + } RequestResponseEvent::Message { message: RequestResponseMessage::Response { .. }, .. diff --git a/swap/src/alice/message0.rs b/swap/src/alice/message0.rs index 460c63e4..b10b3aca 100644 --- a/swap/src/alice/message0.rs +++ b/swap/src/alice/message0.rs @@ -15,7 +15,7 @@ use std::{ }; use tracing::{debug, error}; -use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Protocol, TIMEOUT}; +use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Message0Protocol, TIMEOUT}; use xmr_btc::{alice::State0, bob}; #[derive(Debug)] @@ -28,7 +28,7 @@ pub enum OutEvent { #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Message0 { - rr: RequestResponse, + rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, #[behaviour(ignore)] @@ -49,7 +49,7 @@ impl Message0 { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll, OutEvent>> { + ) -> Poll>, OutEvent>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } @@ -67,7 +67,7 @@ impl Default for Message0 { Self { rr: RequestResponse::new( Codec::default(), - vec![(Protocol, ProtocolSupport::Full)], + vec![(Message0Protocol, ProtocolSupport::Full)], config, ), events: Default::default(), @@ -85,8 +85,9 @@ impl NetworkBehaviourEventProcess> request, channel, .. }, .. - } => match request { - BobToAlice::Message0(msg) => { + } => { + if let BobToAlice::Message0(msg) = request { + debug!("Received Message0"); let response = match &self.state { None => panic!("No state, did you forget to set it?"), Some(state) => { @@ -95,10 +96,11 @@ impl NetworkBehaviourEventProcess> } }; self.rr.send_response(channel, response); + debug!("Sent Message0"); + self.events.push_back(OutEvent::Msg(msg)); } - other => debug!("got request: {:?}", other), - }, + } RequestResponseEvent::Message { message: RequestResponseMessage::Response { .. }, .. diff --git a/swap/src/alice/message1.rs b/swap/src/alice/message1.rs index 27df9e20..4d44b0a2 100644 --- a/swap/src/alice/message1.rs +++ b/swap/src/alice/message1.rs @@ -13,7 +13,7 @@ use std::{ }; use tracing::{debug, error}; -use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Protocol, TIMEOUT}; +use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Message1Protocol, TIMEOUT}; use xmr_btc::bob; #[derive(Debug)] @@ -31,7 +31,7 @@ pub enum OutEvent { #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Message1 { - rr: RequestResponse, + rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, } @@ -46,7 +46,7 @@ impl Message1 { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll, OutEvent>> { + ) -> Poll>, OutEvent>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } @@ -64,7 +64,7 @@ impl Default for Message1 { Self { rr: RequestResponse::new( Codec::default(), - vec![(Protocol, ProtocolSupport::Full)], + vec![(Message1Protocol, ProtocolSupport::Full)], config, ), events: Default::default(), @@ -81,12 +81,12 @@ impl NetworkBehaviourEventProcess> request, channel, .. }, .. - } => match request { - BobToAlice::Message1(msg) => { + } => { + if let BobToAlice::Message1(msg) = request { + debug!("Received Message1"); self.events.push_back(OutEvent::Msg { msg, channel }); } - other => debug!("got request: {:?}", other), - }, + } RequestResponseEvent::Message { message: RequestResponseMessage::Response { .. }, .. diff --git a/swap/src/alice/message2.rs b/swap/src/alice/message2.rs index a8704c0d..bab62d3e 100644 --- a/swap/src/alice/message2.rs +++ b/swap/src/alice/message2.rs @@ -13,12 +13,17 @@ use std::{ }; use tracing::{debug, error}; -use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Protocol, TIMEOUT}; +use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Message2Protocol, TIMEOUT}; use xmr_btc::bob; #[derive(Debug)] pub enum OutEvent { - Msg(bob::Message2), + Msg { + /// Received message from Bob. + msg: bob::Message2, + /// Channel to send back Alice's message 2. + channel: ResponseChannel, + }, } /// A `NetworkBehaviour` that represents receiving of message 2 from Bob. @@ -26,7 +31,7 @@ pub enum OutEvent { #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Message2 { - rr: RequestResponse, + rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, } @@ -41,7 +46,7 @@ impl Message2 { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll, OutEvent>> { + ) -> Poll>, OutEvent>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } @@ -59,7 +64,7 @@ impl Default for Message2 { Self { rr: RequestResponse::new( Codec::default(), - vec![(Protocol, ProtocolSupport::Full)], + vec![(Message2Protocol, ProtocolSupport::Full)], config, ), events: Default::default(), @@ -76,15 +81,12 @@ impl NetworkBehaviourEventProcess> request, channel, .. }, .. - } => match request { - BobToAlice::Message2(msg) => { - self.events.push_back(OutEvent::Msg(msg)); - // Send back empty response so that the request/response protocol completes. - let msg = AliceToBob::EmptyResponse; - self.rr.send_response(channel, msg); + } => { + if let BobToAlice::Message2(msg) = request { + debug!("Received Message2"); + self.events.push_back(OutEvent::Msg { msg, channel }); } - other => debug!("got request: {:?}", other), - }, + } RequestResponseEvent::Message { message: RequestResponseMessage::Response { .. }, .. diff --git a/swap/src/alice/message3.rs b/swap/src/alice/message3.rs new file mode 100644 index 00000000..0e4bd773 --- /dev/null +++ b/swap/src/alice/message3.rs @@ -0,0 +1,94 @@ +use libp2p::{ + request_response::{ + handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, + RequestResponseEvent, RequestResponseMessage, + }, + swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, + NetworkBehaviour, +}; +use std::{ + collections::VecDeque, + task::{Context, Poll}, + time::Duration, +}; +use tracing::{debug, error}; + +use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Message3Protocol, TIMEOUT}; +use xmr_btc::bob; + +#[derive(Debug)] +pub enum OutEvent { + Msg(bob::Message3), +} + +/// A `NetworkBehaviour` that represents receiving of message 3 from Bob. +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "OutEvent", poll_method = "poll")] +#[allow(missing_debug_implementations)] +pub struct Message3 { + rr: RequestResponse>, + #[behaviour(ignore)] + events: VecDeque, +} + +impl Message3 { + fn poll( + &mut self, + _: &mut Context<'_>, + _: &mut impl PollParameters, + ) -> Poll>, OutEvent>> { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + + Poll::Pending + } +} + +impl Default for Message3 { + fn default() -> Self { + let timeout = Duration::from_secs(TIMEOUT); + let mut config = RequestResponseConfig::default(); + config.set_request_timeout(timeout); + + Self { + rr: RequestResponse::new( + Codec::default(), + vec![(Message3Protocol, ProtocolSupport::Full)], + config, + ), + events: Default::default(), + } + } +} + +impl NetworkBehaviourEventProcess> for Message3 { + fn inject_event(&mut self, event: RequestResponseEvent) { + match event { + RequestResponseEvent::Message { + message: + RequestResponseMessage::Request { + request, channel, .. + }, + .. + } => { + if let BobToAlice::Message3(msg) = request { + debug!("Received Message3"); + self.events.push_back(OutEvent::Msg(msg)); + // Send back empty response so that the request/response protocol completes. + self.rr.send_response(channel, AliceToBob::Message3); + } + } + RequestResponseEvent::Message { + message: RequestResponseMessage::Response { .. }, + .. + } => panic!("Alice should not get a Response"), + RequestResponseEvent::InboundFailure { error, .. } => { + error!("Inbound failure: {:?}", error); + } + RequestResponseEvent::OutboundFailure { error, .. } => { + error!("Outbound failure: {:?}", error); + } + } + } +} diff --git a/swap/src/bitcoin.rs b/swap/src/bitcoin.rs index 2a4a8e8d..f9e1493d 100644 --- a/swap/src/bitcoin.rs +++ b/swap/src/bitcoin.rs @@ -1,14 +1,19 @@ +use std::time::Duration; + use anyhow::Result; use async_trait::async_trait; -use backoff::{future::FutureOperation as _, ExponentialBackoff}; +use backoff::{backoff::Constant as ConstantBackoff, future::FutureOperation as _}; use bitcoin::{util::psbt::PartiallySignedTransaction, Address, Transaction}; -use bitcoin_harness::bitcoind_rpc::PsbtBase64; +use bitcoin_harness::{bitcoind_rpc::PsbtBase64, Bitcoind}; use reqwest::Url; +use tokio::time; use xmr_btc::bitcoin::{ - Amount, BroadcastSignedTransaction, BuildTxLockPsbt, SignTxLock, TxLock, Txid, - WatchForRawTransaction, + Amount, BlockHeight, BroadcastSignedTransaction, BuildTxLockPsbt, SignTxLock, + TransactionBlockHeight, TxLock, Txid, WatchForRawTransaction, }; +pub const TX_LOCK_MINE_TIMEOUT: u64 = 3600; + // This is cut'n'paste from xmr_btc/tests/harness/wallet/bitcoin.rs #[derive(Debug)] @@ -41,6 +46,22 @@ impl Wallet { } } +pub async fn make_wallet( + name: &str, + bitcoind: &Bitcoind<'_>, + fund_amount: Amount, +) -> Result { + let wallet = Wallet::new(name, &bitcoind.node_url).await?; + let buffer = Amount::from_btc(1.0).unwrap(); + let amount = fund_amount + buffer; + + let address = wallet.0.new_address().await.unwrap(); + + bitcoind.mint(address, amount).await.unwrap(); + + Ok(wallet) +} + #[async_trait] impl BuildTxLockPsbt for Wallet { async fn build_tx_lock_psbt( @@ -81,19 +102,63 @@ impl SignTxLock for Wallet { impl BroadcastSignedTransaction for Wallet { async fn broadcast_signed_transaction(&self, transaction: Transaction) -> Result { let txid = self.0.send_raw_transaction(transaction).await?; + + // TODO: Instead of guessing how long it will take for the transaction to be + // mined we should ask bitcoind for the number of confirmations on `txid` + + // give time for transaction to be mined + time::delay_for(Duration::from_millis(1100)).await; + Ok(txid) } } +// TODO: For retry, use `backoff::ExponentialBackoff` in production as opposed +// to `ConstantBackoff`. + #[async_trait] impl WatchForRawTransaction for Wallet { async fn watch_for_raw_transaction(&self, txid: Txid) -> Transaction { (|| async { Ok(self.0.get_raw_transaction(txid).await?) }) - .retry(ExponentialBackoff { - max_elapsed_time: None, - ..Default::default() - }) + .retry(ConstantBackoff::new(Duration::from_secs(1))) .await .expect("transient errors to be retried") } } + +#[async_trait] +impl BlockHeight for Wallet { + async fn block_height(&self) -> u32 { + (|| async { Ok(self.0.block_height().await?) }) + .retry(ConstantBackoff::new(Duration::from_secs(1))) + .await + .expect("transient errors to be retried") + } +} + +#[async_trait] +impl TransactionBlockHeight for Wallet { + async fn transaction_block_height(&self, txid: Txid) -> u32 { + #[derive(Debug)] + enum Error { + Io, + NotYetMined, + } + + (|| async { + let block_height = self + .0 + .transaction_block_height(txid) + .await + .map_err(|_| backoff::Error::Transient(Error::Io))?; + + let block_height = + block_height.ok_or_else(|| backoff::Error::Transient(Error::NotYetMined))?; + + Result::<_, backoff::Error>::Ok(block_height) + }) + .retry(ConstantBackoff::new(Duration::from_secs(1))) + .await + .expect("transient errors to be retried") + } +} diff --git a/swap/src/bob.rs b/swap/src/bob.rs index a955d0fb..451fc200 100644 --- a/swap/src/bob.rs +++ b/swap/src/bob.rs @@ -1,46 +1,85 @@ //! Run an XMR/BTC swap in the role of Bob. //! Bob holds BTC and wishes receive XMR. use anyhow::Result; +use async_trait::async_trait; +use backoff::{backoff::Constant as ConstantBackoff, future::FutureOperation as _}; use futures::{ channel::mpsc::{Receiver, Sender}, - StreamExt, + FutureExt, StreamExt, }; +use genawaiter::GeneratorState; use libp2p::{core::identity::Keypair, Multiaddr, NetworkBehaviour, PeerId}; use rand::rngs::OsRng; -use std::{process, thread}; -use tracing::{debug, info}; +use std::{process, sync::Arc, time::Duration}; +use tokio::sync::Mutex; +use tracing::{debug, info, warn}; mod amounts; mod message0; mod message1; mod message2; +mod message3; -use self::{amounts::*, message0::*, message1::*, message2::*}; +use self::{amounts::*, message0::*, message1::*, message2::*, message3::*}; use crate::{ + bitcoin, + bitcoin::TX_LOCK_MINE_TIMEOUT, + monero, network::{ peer_tracker::{self, PeerTracker}, transport, TokioExecutor, }, - Cmd, Never, Rsp, SwapAmounts, PUNISH_TIMELOCK, REFUND_TIMELOCK, + Cmd, Rsp, SwapAmounts, PUNISH_TIMELOCK, REFUND_TIMELOCK, }; use xmr_btc::{ alice, - bitcoin::{BroadcastSignedTransaction, BuildTxLockPsbt, SignTxLock}, - bob::{self, State0}, + bitcoin::{BroadcastSignedTransaction, EncryptedSignature, SignTxLock}, + bob::{self, action_generator, ReceiveTransferProof, State0}, + monero::CreateWalletForOutput, }; -// FIXME: This whole function is horrible, needs total re-write. -pub async fn swap( +pub async fn swap( + bitcoin_wallet: Arc, + monero_wallet: Arc, btc: u64, addr: Multiaddr, mut cmd_tx: Sender, mut rsp_rx: Receiver, - refund_address: ::bitcoin::Address, - wallet: W, -) -> Result<()> -where - W: BuildTxLockPsbt + SignTxLock + BroadcastSignedTransaction + Send + Sync + 'static, -{ +) -> Result<()> { + struct Network(Swarm); + + // TODO: For retry, use `backoff::ExponentialBackoff` in production as opposed + // to `ConstantBackoff`. + + #[async_trait] + impl ReceiveTransferProof for Network { + async fn receive_transfer_proof(&mut self) -> monero::TransferProof { + #[derive(Debug)] + struct UnexpectedMessage; + + let future = self.0.next().shared(); + + let proof = (|| async { + let proof = match future.clone().await { + OutEvent::Message2(msg) => msg.tx_lock_proof, + other => { + warn!("Expected transfer proof, got: {:?}", other); + return Err(backoff::Error::Transient(UnexpectedMessage)); + } + }; + + Result::<_, backoff::Error>::Ok(proof) + }) + .retry(ConstantBackoff::new(Duration::from_secs(1))) + .await + .expect("transient errors to be retried"); + + info!("Received transfer proof"); + + proof + } + } + let mut swarm = new_swarm()?; libp2p::Swarm::dial_addr(&mut swarm, addr)?; @@ -48,27 +87,29 @@ where OutEvent::ConnectionEstablished(alice) => alice, other => panic!("unexpected event: {:?}", other), }; - info!("Connection established."); + info!("Connection established with: {}", alice); swarm.request_amounts(alice.clone(), btc); let (btc, xmr) = match swarm.next().await { OutEvent::Amounts(amounts) => { - debug!("Got amounts from Alice: {:?}", amounts); + info!("Got amounts from Alice: {:?}", amounts); let cmd = Cmd::VerifyAmounts(amounts); cmd_tx.try_send(cmd)?; let response = rsp_rx.next().await; if response == Some(Rsp::Abort) { - info!("Amounts no good, aborting ..."); + info!("User rejected amounts proposed by Alice, aborting..."); process::exit(0); } - info!("User verified amounts, continuing with swap ..."); + info!("User accepted amounts proposed by Alice"); (amounts.btc, amounts.xmr) } other => panic!("unexpected event: {:?}", other), }; + let refund_address = bitcoin_wallet.new_address().await?; + // TODO: Pass this in using let rng = &mut OsRng; let state0 = State0::new( @@ -80,13 +121,11 @@ where refund_address, ); + info!("Commencing handshake"); + swarm.send_message0(alice.clone(), state0.next_message(rng)); let state1 = match swarm.next().await { - OutEvent::Message0(msg) => { - // TODO: Verify the response message before calling receive() and handle any - // error gracefully. - state0.receive(&wallet, msg).await? - } + OutEvent::Message0(msg) => state0.receive(bitcoin_wallet.as_ref(), msg).await?, other => panic!("unexpected event: {:?}", other), }; @@ -100,10 +139,65 @@ where swarm.send_message2(alice.clone(), state2.next_message()); - info!("Handshake complete, we now have State2 for Bob."); + info!("Handshake complete"); - thread::park(); - Ok(()) + let network = Arc::new(Mutex::new(Network(swarm))); + + let mut action_generator = action_generator( + network.clone(), + monero_wallet.clone(), + bitcoin_wallet.clone(), + state2, + TX_LOCK_MINE_TIMEOUT, + ); + + loop { + let state = action_generator.async_resume().await; + + info!("Resumed execution of generator, got: {:?}", state); + + match state { + GeneratorState::Yielded(bob::Action::LockBtc(tx_lock)) => { + let signed_tx_lock = bitcoin_wallet.sign_tx_lock(tx_lock).await?; + let _ = bitcoin_wallet + .broadcast_signed_transaction(signed_tx_lock) + .await?; + } + GeneratorState::Yielded(bob::Action::SendBtcRedeemEncsig(tx_redeem_encsig)) => { + let mut guard = network.as_ref().lock().await; + guard.0.send_message3(alice.clone(), tx_redeem_encsig); + info!("Sent Bitcoin redeem encsig"); + + // TODO: Does Bob need to wait for Alice to send an empty response, or can we + // just continue? + match guard.0.next().shared().await { + OutEvent::Message3 => { + debug!("Got Message3 empty response"); + } + other => panic!("unexpected event: {:?}", other), + }; + } + GeneratorState::Yielded(bob::Action::CreateXmrWalletForOutput { + spend_key, + view_key, + }) => { + monero_wallet + .create_and_load_wallet_for_output(spend_key, view_key) + .await?; + } + GeneratorState::Yielded(bob::Action::CancelBtc(tx_cancel)) => { + let _ = bitcoin_wallet + .broadcast_signed_transaction(tx_cancel) + .await?; + } + GeneratorState::Yielded(bob::Action::RefundBtc(tx_refund)) => { + let _ = bitcoin_wallet + .broadcast_signed_transaction(tx_refund) + .await?; + } + GeneratorState::Complete(()) => return Ok(()), + } + } } pub type Swarm = libp2p::Swarm; @@ -137,12 +231,14 @@ fn new_swarm() -> Result { } #[allow(clippy::large_enum_variant)] -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum OutEvent { ConnectionEstablished(PeerId), Amounts(SwapAmounts), Message0(alice::Message0), Message1(alice::Message1), + Message2(alice::Message2), + Message3, } impl From for OutEvent { @@ -179,9 +275,19 @@ impl From for OutEvent { } } -impl From for OutEvent { - fn from(_: Never) -> Self { - panic!("this never happens") +impl From for OutEvent { + fn from(event: message2::OutEvent) -> Self { + match event { + message2::OutEvent::Msg(msg) => OutEvent::Message2(msg), + } + } +} + +impl From for OutEvent { + fn from(event: message3::OutEvent) -> Self { + match event { + message3::OutEvent::Msg => OutEvent::Message3, + } } } @@ -195,6 +301,7 @@ pub struct Bob { message0: Message0, message1: Message1, message2: Message2, + message3: Message3, #[behaviour(ignore)] identity: Keypair, } @@ -212,22 +319,32 @@ impl Bob { pub fn request_amounts(&mut self, alice: PeerId, btc: u64) { let btc = ::bitcoin::Amount::from_sat(btc); let _id = self.amounts.request_amounts(alice.clone(), btc); - debug!("Requesting amounts from: {}", alice); + info!("Requesting amounts from: {}", alice); } /// Sends Bob's first message to Alice. pub fn send_message0(&mut self, alice: PeerId, msg: bob::Message0) { - self.message0.send(alice, msg) + self.message0.send(alice, msg); + debug!("Sent Message0"); } /// Sends Bob's second message to Alice. pub fn send_message1(&mut self, alice: PeerId, msg: bob::Message1) { - self.message1.send(alice, msg) + self.message1.send(alice, msg); + debug!("Sent Message1"); } /// Sends Bob's third message to Alice. pub fn send_message2(&mut self, alice: PeerId, msg: bob::Message2) { - self.message2.send(alice, msg) + self.message2.send(alice, msg); + debug!("Sent Message2"); + } + + /// Sends Bob's fourth message to Alice. + pub fn send_message3(&mut self, alice: PeerId, tx_redeem_encsig: EncryptedSignature) { + let msg = bob::Message3 { tx_redeem_encsig }; + self.message3.send(alice, msg); + debug!("Sent Message3"); } /// Returns Alice's peer id if we are connected. @@ -246,6 +363,7 @@ impl Default for Bob { message0: Message0::default(), message1: Message1::default(), message2: Message2::default(), + message3: Message3::default(), identity, } } diff --git a/swap/src/bob/amounts.rs b/swap/src/bob/amounts.rs index 46c71920..7428ad30 100644 --- a/swap/src/bob/amounts.rs +++ b/swap/src/bob/amounts.rs @@ -15,7 +15,7 @@ use std::{ use tracing::{debug, error}; use crate::{ - network::request_response::{AliceToBob, BobToAlice, Codec, Protocol, TIMEOUT}, + network::request_response::{AliceToBob, AmountsProtocol, BobToAlice, Codec, TIMEOUT}, SwapAmounts, }; @@ -29,7 +29,7 @@ pub enum OutEvent { #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Amounts { - rr: RequestResponse, + rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, } @@ -46,7 +46,7 @@ impl Amounts { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll, OutEvent>> { + ) -> Poll>, OutEvent>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } @@ -65,7 +65,7 @@ impl Default for Amounts { Self { rr: RequestResponse::new( Codec::default(), - vec![(Protocol, ProtocolSupport::Full)], + vec![(AmountsProtocol, ProtocolSupport::Full)], config, ), events: Default::default(), @@ -83,10 +83,12 @@ impl NetworkBehaviourEventProcess> RequestResponseEvent::Message { message: RequestResponseMessage::Response { response, .. }, .. - } => match response { - AliceToBob::Amounts(p) => self.events.push_back(OutEvent::Amounts(p)), - other => debug!("got response: {:?}", other), - }, + } => { + if let AliceToBob::Amounts(p) = response { + debug!("Received amounts response"); + self.events.push_back(OutEvent::Amounts(p)); + } + } RequestResponseEvent::InboundFailure { error, .. } => { error!("Inbound failure: {:?}", error); } diff --git a/swap/src/bob/message0.rs b/swap/src/bob/message0.rs index 268244a1..89504fbb 100644 --- a/swap/src/bob/message0.rs +++ b/swap/src/bob/message0.rs @@ -13,7 +13,7 @@ use std::{ }; use tracing::{debug, error}; -use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Protocol, TIMEOUT}; +use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Message0Protocol, TIMEOUT}; use xmr_btc::{alice, bob}; #[derive(Debug)] @@ -26,7 +26,7 @@ pub enum OutEvent { #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Message0 { - rr: RequestResponse, + rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, } @@ -41,7 +41,7 @@ impl Message0 { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll, OutEvent>> { + ) -> Poll>, OutEvent>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } @@ -59,7 +59,7 @@ impl Default for Message0 { Self { rr: RequestResponse::new( Codec::default(), - vec![(Protocol, ProtocolSupport::Full)], + vec![(Message0Protocol, ProtocolSupport::Full)], config, ), events: Default::default(), @@ -77,10 +77,12 @@ impl NetworkBehaviourEventProcess> RequestResponseEvent::Message { message: RequestResponseMessage::Response { response, .. }, .. - } => match response { - AliceToBob::Message0(msg) => self.events.push_back(OutEvent::Msg(msg)), - other => debug!("got response: {:?}", other), - }, + } => { + if let AliceToBob::Message0(msg) = response { + debug!("Received Message0"); + self.events.push_back(OutEvent::Msg(msg)); + } + } RequestResponseEvent::InboundFailure { error, .. } => { error!("Inbound failure: {:?}", error); } diff --git a/swap/src/bob/message1.rs b/swap/src/bob/message1.rs index 9e85ce3d..32fd6c52 100644 --- a/swap/src/bob/message1.rs +++ b/swap/src/bob/message1.rs @@ -13,7 +13,7 @@ use std::{ }; use tracing::{debug, error}; -use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Protocol, TIMEOUT}; +use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Message1Protocol, TIMEOUT}; use xmr_btc::{alice, bob}; #[derive(Debug)] @@ -26,7 +26,7 @@ pub enum OutEvent { #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Message1 { - rr: RequestResponse, + rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, } @@ -41,7 +41,7 @@ impl Message1 { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll, OutEvent>> { + ) -> Poll>, OutEvent>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } @@ -59,7 +59,7 @@ impl Default for Message1 { Self { rr: RequestResponse::new( Codec::default(), - vec![(Protocol, ProtocolSupport::Full)], + vec![(Message1Protocol, ProtocolSupport::Full)], config, ), events: Default::default(), @@ -77,10 +77,12 @@ impl NetworkBehaviourEventProcess> RequestResponseEvent::Message { message: RequestResponseMessage::Response { response, .. }, .. - } => match response { - AliceToBob::Message1(msg) => self.events.push_back(OutEvent::Msg(msg)), - other => debug!("got response: {:?}", other), - }, + } => { + if let AliceToBob::Message1(msg) = response { + debug!("Received Message1"); + self.events.push_back(OutEvent::Msg(msg)); + } + } RequestResponseEvent::InboundFailure { error, .. } => { error!("Inbound failure: {:?}", error); } diff --git a/swap/src/bob/message2.rs b/swap/src/bob/message2.rs index e8fa295b..17425227 100644 --- a/swap/src/bob/message2.rs +++ b/swap/src/bob/message2.rs @@ -7,23 +7,28 @@ use libp2p::{ NetworkBehaviour, PeerId, }; use std::{ + collections::VecDeque, task::{Context, Poll}, time::Duration, }; use tracing::{debug, error}; -use crate::{ - network::request_response::{AliceToBob, BobToAlice, Codec, Protocol, TIMEOUT}, - Never, -}; -use xmr_btc::bob; +use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Message2Protocol, TIMEOUT}; +use xmr_btc::{alice, bob}; + +#[derive(Debug)] +pub enum OutEvent { + Msg(alice::Message2), +} /// A `NetworkBehaviour` that represents sending message 2 to Alice. #[derive(NetworkBehaviour)] -#[behaviour(out_event = "Never", poll_method = "poll")] +#[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Message2 { - rr: RequestResponse, + rr: RequestResponse>, + #[behaviour(ignore)] + events: VecDeque, } impl Message2 { @@ -32,13 +37,15 @@ impl Message2 { let _id = self.rr.send_request(&alice, msg); } - // TODO: Do we need a custom implementation if we are not bubbling any out - // events? fn poll( &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll, Never>> { + ) -> Poll>, OutEvent>> { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + Poll::Pending } } @@ -52,9 +59,10 @@ impl Default for Message2 { Self { rr: RequestResponse::new( Codec::default(), - vec![(Protocol, ProtocolSupport::Full)], + vec![(Message2Protocol, ProtocolSupport::Full)], config, ), + events: VecDeque::default(), } } } @@ -69,10 +77,12 @@ impl NetworkBehaviourEventProcess> RequestResponseEvent::Message { message: RequestResponseMessage::Response { response, .. }, .. - } => match response { - AliceToBob::EmptyResponse => debug!("Alice correctly responded to message 2"), - other => debug!("unexpected response: {:?}", other), - }, + } => { + if let AliceToBob::Message2(msg) = response { + debug!("Received Message2"); + self.events.push_back(OutEvent::Msg(msg)); + } + } RequestResponseEvent::InboundFailure { error, .. } => { error!("Inbound failure: {:?}", error); } diff --git a/swap/src/bob/message3.rs b/swap/src/bob/message3.rs new file mode 100644 index 00000000..20873bfa --- /dev/null +++ b/swap/src/bob/message3.rs @@ -0,0 +1,93 @@ +use libp2p::{ + request_response::{ + handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, + RequestResponseEvent, RequestResponseMessage, + }, + swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, + NetworkBehaviour, PeerId, +}; +use std::{ + collections::VecDeque, + task::{Context, Poll}, + time::Duration, +}; +use tracing::error; + +use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Message3Protocol, TIMEOUT}; +use xmr_btc::bob; + +#[derive(Debug)] +pub enum OutEvent { + Msg, +} + +/// A `NetworkBehaviour` that represents sending message 3 to Alice. +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "OutEvent", poll_method = "poll")] +#[allow(missing_debug_implementations)] +pub struct Message3 { + rr: RequestResponse>, + #[behaviour(ignore)] + events: VecDeque, +} + +impl Message3 { + pub fn send(&mut self, alice: PeerId, msg: bob::Message3) { + let msg = BobToAlice::Message3(msg); + let _id = self.rr.send_request(&alice, msg); + } + + fn poll( + &mut self, + _: &mut Context<'_>, + _: &mut impl PollParameters, + ) -> Poll>, OutEvent>> { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + + Poll::Pending + } +} + +impl Default for Message3 { + fn default() -> Self { + let timeout = Duration::from_secs(TIMEOUT); + let mut config = RequestResponseConfig::default(); + config.set_request_timeout(timeout); + + Self { + rr: RequestResponse::new( + Codec::default(), + vec![(Message3Protocol, ProtocolSupport::Full)], + config, + ), + events: Default::default(), + } + } +} + +impl NetworkBehaviourEventProcess> for Message3 { + fn inject_event(&mut self, event: RequestResponseEvent) { + match event { + RequestResponseEvent::Message { + message: RequestResponseMessage::Request { .. }, + .. + } => panic!("Bob should never get a request from Alice"), + RequestResponseEvent::Message { + message: RequestResponseMessage::Response { response, .. }, + .. + } => { + if let AliceToBob::Message3 = response { + self.events.push_back(OutEvent::Msg); + } + } + RequestResponseEvent::InboundFailure { error, .. } => { + error!("Inbound failure: {:?}", error); + } + RequestResponseEvent::OutboundFailure { error, .. } => { + error!("Outbound failure: {:?}", error); + } + } + } +} diff --git a/swap/src/lib.rs b/swap/src/lib.rs index 7a85eeed..0d682e77 100644 --- a/swap/src/lib.rs +++ b/swap/src/lib.rs @@ -4,15 +4,14 @@ use std::fmt::{self, Display}; pub mod alice; pub mod bitcoin; pub mod bob; +pub mod monero; pub mod network; pub mod storage; #[cfg(feature = "tor")] pub mod tor; -pub const ONE_BTC: u64 = 100_000_000; - const REFUND_TIMELOCK: u32 = 10; // Relative timelock, this is number of blocks. TODO: What should it be? -const PUNISH_TIMELOCK: u32 = 20; // FIXME: What should this be? +const PUNISH_TIMELOCK: u32 = 10; // FIXME: What should this be? pub type Never = std::convert::Infallible; diff --git a/swap/src/main.rs b/swap/src/main.rs index e981dccd..c2b0b4b6 100644 --- a/swap/src/main.rs +++ b/swap/src/main.rs @@ -13,28 +13,28 @@ #![forbid(unsafe_code)] use anyhow::{bail, Context, Result}; -use cli::Options; use futures::{channel::mpsc, StreamExt}; use libp2p::Multiaddr; use log::LevelFilter; -use std::{io, io::Write, process}; +use std::{io, io::Write, process, sync::Arc}; use structopt::StructOpt; -use swap::{alice, bitcoin::Wallet, bob, Cmd, Rsp, SwapAmounts}; use tracing::info; use url::Url; -use xmr_btc::bitcoin::{BroadcastSignedTransaction, BuildTxLockPsbt, SignTxLock}; mod cli; mod trace; +use cli::Options; +use swap::{alice, bitcoin, bob, monero, Cmd, Rsp, SwapAmounts}; + // TODO: Add root seed file instead of generating new seed each run. -// TODO: Remove all instances of the todo! macro // TODO: Add a config file with these in it. // Alice's address and port until we have a config file. pub const PORT: u16 = 9876; // Arbitrarily chosen. pub const ADDR: &str = "127.0.0.1"; pub const BITCOIND_JSON_RPC_URL: &str = "http://127.0.0.1:8332"; +pub const MONERO_WALLET_RPC_PORT: u16 = 18083; #[cfg(feature = "tor")] pub const TOR_PORT: u16 = PORT + 1; @@ -70,45 +70,37 @@ async fn main() -> Result<()> { } let url = Url::parse(BITCOIND_JSON_RPC_URL).expect("failed to parse url"); - let bitcoin_wallet = Wallet::new("alice", &url) + let bitcoin_wallet = bitcoin::Wallet::new("alice", &url) .await .expect("failed to create bitcoin wallet"); + let bitcoin_wallet = Arc::new(bitcoin_wallet); - let redeem = bitcoin_wallet - .new_address() - .await - .expect("failed to get new redeem address"); - let punish = bitcoin_wallet - .new_address() - .await - .expect("failed to get new punish address"); + let monero_wallet = Arc::new(monero::Wallet::localhost(MONERO_WALLET_RPC_PORT)); - swap_as_alice(alice.clone(), redeem, punish).await?; + swap_as_alice(bitcoin_wallet, monero_wallet, alice.clone()).await?; } else { info!("running swap node as Bob ..."); - let alice_address = match opt.alice_address { + let alice = match opt.alice_address { Some(addr) => addr, None => bail!("Address required to dial"), }; - let alice_address = multiaddr(&alice_address)?; + let alice = multiaddr(&alice)?; let url = Url::parse(BITCOIND_JSON_RPC_URL).expect("failed to parse url"); - let bitcoin_wallet = Wallet::new("bob", &url) + let bitcoin_wallet = bitcoin::Wallet::new("bob", &url) .await .expect("failed to create bitcoin wallet"); + let bitcoin_wallet = Arc::new(bitcoin_wallet); - let refund = bitcoin_wallet - .new_address() - .await - .expect("failed to get new address"); + let monero_wallet = Arc::new(monero::Wallet::localhost(MONERO_WALLET_RPC_PORT)); match (opt.piconeros, opt.satoshis) { (Some(_), Some(_)) => bail!("Please supply only a single amount to swap"), (None, None) => bail!("Please supply an amount to swap"), (Some(_picos), _) => todo!("support starting with picos"), (None, Some(sats)) => { - swap_as_bob(sats, alice_address, refund, bitcoin_wallet).await?; + swap_as_bob(bitcoin_wallet, monero_wallet, sats, alice).await?; } }; } @@ -135,32 +127,36 @@ async fn create_tor_service( } async fn swap_as_alice( + bitcoin_wallet: Arc, + monero_wallet: Arc, addr: Multiaddr, - redeem: bitcoin::Address, - punish: bitcoin::Address, ) -> Result<()> { #[cfg(not(feature = "tor"))] { - alice::swap(addr, None, redeem, punish).await + alice::swap(bitcoin_wallet, monero_wallet, addr, None).await } #[cfg(feature = "tor")] { - alice::swap(addr, Some(PORT), redeem, punish).await + alice::swap(bitcoin_wallet, monero_wallet, addr, Some(PORT)).await } } -async fn swap_as_bob( +async fn swap_as_bob( + bitcoin_wallet: Arc, + monero_wallet: Arc, sats: u64, alice: Multiaddr, - refund: bitcoin::Address, - wallet: W, -) -> Result<()> -where - W: BuildTxLockPsbt + SignTxLock + BroadcastSignedTransaction + Send + Sync + 'static, -{ +) -> Result<()> { let (cmd_tx, mut cmd_rx) = mpsc::channel(1); let (mut rsp_tx, rsp_rx) = mpsc::channel(1); - tokio::spawn(bob::swap(sats, alice, cmd_tx, rsp_rx, refund, wallet)); + tokio::spawn(bob::swap( + bitcoin_wallet, + monero_wallet, + sats, + alice, + cmd_tx, + rsp_rx, + )); loop { let read = cmd_rx.next().await; diff --git a/swap/src/monero.rs b/swap/src/monero.rs new file mode 100644 index 00000000..0d215738 --- /dev/null +++ b/swap/src/monero.rs @@ -0,0 +1,135 @@ +use anyhow::Result; +use async_trait::async_trait; +use backoff::{backoff::Constant as ConstantBackoff, future::FutureOperation as _}; +use monero::{Address, Network, PrivateKey}; +use monero_harness::rpc::wallet; +use std::{str::FromStr, time::Duration}; + +pub use xmr_btc::monero::{ + Amount, CreateWalletForOutput, InsufficientFunds, PrivateViewKey, PublicKey, PublicViewKey, + Transfer, TransferProof, TxHash, WatchForTransfer, *, +}; + +pub struct Wallet(pub wallet::Client); + +impl Wallet { + pub fn localhost(port: u16) -> Self { + Self(wallet::Client::localhost(port)) + } + + /// Get the balance of the primary account. + pub async fn get_balance(&self) -> Result { + let amount = self.0.get_balance(0).await?; + + Ok(Amount::from_piconero(amount)) + } +} + +#[async_trait] +impl Transfer for Wallet { + async fn transfer( + &self, + public_spend_key: PublicKey, + public_view_key: PublicViewKey, + amount: Amount, + ) -> Result<(TransferProof, Amount)> { + let destination_address = + Address::standard(Network::Mainnet, public_spend_key, public_view_key.into()); + + let res = self + .0 + .transfer(0, amount.as_piconero(), &destination_address.to_string()) + .await?; + + let tx_hash = TxHash(res.tx_hash); + let tx_key = PrivateKey::from_str(&res.tx_key)?; + + let fee = Amount::from_piconero(res.fee); + + Ok((TransferProof::new(tx_hash, tx_key), fee)) + } +} + +#[async_trait] +impl CreateWalletForOutput for Wallet { + async fn create_and_load_wallet_for_output( + &self, + private_spend_key: PrivateKey, + private_view_key: PrivateViewKey, + ) -> Result<()> { + let public_spend_key = PublicKey::from_private_key(&private_spend_key); + let public_view_key = PublicKey::from_private_key(&private_view_key.into()); + + let address = Address::standard(Network::Mainnet, public_spend_key, public_view_key); + + let _ = self + .0 + .generate_from_keys( + &address.to_string(), + &private_spend_key.to_string(), + &PrivateKey::from(private_view_key).to_string(), + ) + .await?; + + Ok(()) + } +} + +// TODO: For retry, use `backoff::ExponentialBackoff` in production as opposed +// to `ConstantBackoff`. + +#[async_trait] +impl WatchForTransfer for Wallet { + async fn watch_for_transfer( + &self, + public_spend_key: PublicKey, + public_view_key: PublicViewKey, + transfer_proof: TransferProof, + expected_amount: Amount, + expected_confirmations: u32, + ) -> Result<(), InsufficientFunds> { + enum Error { + TxNotFound, + InsufficientConfirmations, + InsufficientFunds { expected: Amount, actual: Amount }, + } + + let address = Address::standard(Network::Mainnet, public_spend_key, public_view_key.into()); + + let res = (|| async { + // NOTE: Currently, this is conflating IO errors with the transaction not being + // in the blockchain yet, or not having enough confirmations on it. All these + // errors warrant a retry, but the strategy should probably differ per case + let proof = self + .0 + .check_tx_key( + &String::from(transfer_proof.tx_hash()), + &transfer_proof.tx_key().to_string(), + &address.to_string(), + ) + .await + .map_err(|_| backoff::Error::Transient(Error::TxNotFound))?; + + if proof.received != expected_amount.as_piconero() { + return Err(backoff::Error::Permanent(Error::InsufficientFunds { + expected: expected_amount, + actual: Amount::from_piconero(proof.received), + })); + } + + if proof.confirmations < expected_confirmations { + return Err(backoff::Error::Transient(Error::InsufficientConfirmations)); + } + + Ok(proof) + }) + .retry(ConstantBackoff::new(Duration::from_secs(1))) + .await; + + if let Err(Error::InsufficientFunds { expected, actual }) = res { + return Err(InsufficientFunds { expected, actual }); + }; + + Ok(()) + } +} diff --git a/swap/src/network/request_response.rs b/swap/src/network/request_response.rs index f8d44deb..a042debc 100644 --- a/swap/src/network/request_response.rs +++ b/swap/src/network/request_response.rs @@ -5,7 +5,8 @@ use libp2p::{ request_response::{ProtocolName, RequestResponseCodec}, }; use serde::{Deserialize, Serialize}; -use std::{fmt::Debug, io}; +use std::{fmt::Debug, io, marker::PhantomData}; +use tracing::debug; use crate::SwapAmounts; use xmr_btc::{alice, bob, monero}; @@ -13,6 +14,9 @@ use xmr_btc::{alice, bob, monero}; /// Time to wait for a response back once we send a request. pub const TIMEOUT: u64 = 3600; // One hour. +/// Message receive buffer. +const BUF_SIZE: usize = 1024 * 1024; + // TODO: Think about whether there is a better way to do this, e.g., separate // Codec for each Message and a macro that implements them. @@ -26,6 +30,7 @@ pub enum BobToAlice { Message0(bob::Message0), Message1(bob::Message1), Message2(bob::Message2), + Message3(bob::Message3), } /// Messages Alice sends to Bob. @@ -35,25 +40,66 @@ pub enum AliceToBob { Amounts(SwapAmounts), Message0(alice::Message0), Message1(alice::Message1), - EmptyResponse, // This is sent back as response to Message2 from Bob. Message2(alice::Message2), + Message3, // empty response } #[derive(Debug, Clone, Copy, Default)] -pub struct Protocol; +pub struct AmountsProtocol; -impl ProtocolName for Protocol { +#[derive(Debug, Clone, Copy, Default)] +pub struct Message0Protocol; + +#[derive(Debug, Clone, Copy, Default)] +pub struct Message1Protocol; + +#[derive(Debug, Clone, Copy, Default)] +pub struct Message2Protocol; + +#[derive(Debug, Clone, Copy, Default)] +pub struct Message3Protocol; + +impl ProtocolName for AmountsProtocol { fn protocol_name(&self) -> &[u8] { - b"/xmr/btc/1.0.0" + b"/xmr/btc/amounts/1.0.0" + } +} + +impl ProtocolName for Message0Protocol { + fn protocol_name(&self) -> &[u8] { + b"/xmr/btc/message0/1.0.0" + } +} + +impl ProtocolName for Message1Protocol { + fn protocol_name(&self) -> &[u8] { + b"/xmr/btc/message1/1.0.0" + } +} + +impl ProtocolName for Message2Protocol { + fn protocol_name(&self) -> &[u8] { + b"/xmr/btc/message2/1.0.0" + } +} + +impl ProtocolName for Message3Protocol { + fn protocol_name(&self) -> &[u8] { + b"/xmr/btc/message3/1.0.0" } } #[derive(Clone, Copy, Debug, Default)] -pub struct Codec; +pub struct Codec

{ + phantom: PhantomData

, +} #[async_trait] -impl RequestResponseCodec for Codec { - type Protocol = Protocol; +impl

RequestResponseCodec for Codec

+where + P: Send + Sync + Clone + ProtocolName, +{ + type Protocol = P; type Request = BobToAlice; type Response = AliceToBob; @@ -61,11 +107,15 @@ impl RequestResponseCodec for Codec { where T: AsyncRead + Unpin + Send, { - let message = upgrade::read_one(io, 1024) + debug!("enter read_request"); + let message = upgrade::read_one(io, BUF_SIZE) .await .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - let mut de = serde_json::Deserializer::from_slice(&message); - let msg = BobToAlice::deserialize(&mut de)?; + let mut de = serde_cbor::Deserializer::from_slice(&message); + let msg = BobToAlice::deserialize(&mut de).map_err(|e| { + tracing::debug!("serde read_request error: {:?}", e); + io::Error::new(io::ErrorKind::Other, e) + })?; Ok(msg) } @@ -78,11 +128,15 @@ impl RequestResponseCodec for Codec { where T: AsyncRead + Unpin + Send, { - let message = upgrade::read_one(io, 1024) + debug!("enter read_response"); + let message = upgrade::read_one(io, BUF_SIZE) .await .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - let mut de = serde_json::Deserializer::from_slice(&message); - let msg = AliceToBob::deserialize(&mut de)?; + let mut de = serde_cbor::Deserializer::from_slice(&message); + let msg = AliceToBob::deserialize(&mut de).map_err(|e| { + tracing::debug!("serde read_response error: {:?}", e); + io::Error::new(io::ErrorKind::InvalidData, e) + })?; Ok(msg) } @@ -96,7 +150,9 @@ impl RequestResponseCodec for Codec { where T: AsyncWrite + Unpin + Send, { - let bytes = serde_json::to_vec(&req)?; + let bytes = + serde_cbor::to_vec(&req).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + upgrade::write_one(io, &bytes).await?; Ok(()) @@ -111,7 +167,11 @@ impl RequestResponseCodec for Codec { where T: AsyncWrite + Unpin + Send, { - let bytes = serde_json::to_vec(&res)?; + debug!("enter write_response"); + let bytes = serde_cbor::to_vec(&res).map_err(|e| { + tracing::debug!("serde write_reponse error: {:?}", e); + io::Error::new(io::ErrorKind::InvalidData, e) + })?; upgrade::write_one(io, &bytes).await?; Ok(()) diff --git a/swap/tests/e2e.rs b/swap/tests/e2e.rs new file mode 100644 index 00000000..09ba8fb0 --- /dev/null +++ b/swap/tests/e2e.rs @@ -0,0 +1,100 @@ +#[cfg(not(feature = "tor"))] +mod e2e_test { + use bitcoin_harness::Bitcoind; + use futures::{channel::mpsc, future::try_join}; + use libp2p::Multiaddr; + use monero_harness::Monero; + use std::sync::Arc; + use swap::{alice, bob}; + use testcontainers::clients::Cli; + use tracing_subscriber::util::SubscriberInitExt; + + #[tokio::test] + async fn swap() { + let _guard = tracing_subscriber::fmt() + .with_env_filter( + "swap=debug,xmr_btc=debug,hyper=off,reqwest=off,monero_harness=info,testcontainers=info,libp2p=debug", + ) + .with_ansi(false) + .set_default(); + + let alice_multiaddr: Multiaddr = "/ip4/127.0.0.1/tcp/9876" + .parse() + .expect("failed to parse Alice's address"); + + let cli = Cli::default(); + let bitcoind = Bitcoind::new(&cli, "0.19.1").unwrap(); + let _ = bitcoind.init(5).await; + + let btc = bitcoin::Amount::from_sat(1_000_000); + let btc_alice = bitcoin::Amount::ZERO; + let btc_bob = btc * 10; + + // this xmr value matches the logic of alice::calculate_amounts i.e. btc * + // 10_000 * 100 + let xmr = 1_000_000_000_000; + let xmr_alice = xmr * 10; + let xmr_bob = 0; + + let alice_btc_wallet = Arc::new( + swap::bitcoin::Wallet::new("alice", &bitcoind.node_url) + .await + .unwrap(), + ); + let bob_btc_wallet = Arc::new( + swap::bitcoin::Wallet::new("bob", &bitcoind.node_url) + .await + .unwrap(), + ); + bitcoind + .mint(bob_btc_wallet.0.new_address().await.unwrap(), btc_bob) + .await + .unwrap(); + + let (monero, _container) = Monero::new(&cli).unwrap(); + monero.init(xmr_alice, xmr_bob).await.unwrap(); + + let alice_xmr_wallet = Arc::new(swap::monero::Wallet(monero.alice_wallet_rpc_client())); + let bob_xmr_wallet = Arc::new(swap::monero::Wallet(monero.bob_wallet_rpc_client())); + + let alice_swap = alice::swap( + alice_btc_wallet.clone(), + alice_xmr_wallet.clone(), + alice_multiaddr.clone(), + None, + ); + + let (cmd_tx, mut _cmd_rx) = mpsc::channel(1); + let (mut rsp_tx, rsp_rx) = mpsc::channel(1); + let bob_swap = bob::swap( + bob_btc_wallet.clone(), + bob_xmr_wallet.clone(), + btc.as_sat(), + alice_multiaddr, + cmd_tx, + rsp_rx, + ); + + // automate the verification step by accepting any amounts sent over by Alice + rsp_tx.try_send(swap::Rsp::VerifiedAmounts).unwrap(); + + try_join(alice_swap, bob_swap).await.unwrap(); + + let btc_alice_final = alice_btc_wallet.as_ref().balance().await.unwrap(); + let btc_bob_final = bob_btc_wallet.as_ref().balance().await.unwrap(); + + let xmr_alice_final = alice_xmr_wallet.as_ref().get_balance().await.unwrap(); + + monero.wait_for_bob_wallet_block_height().await.unwrap(); + let xmr_bob_final = bob_xmr_wallet.as_ref().get_balance().await.unwrap(); + + assert_eq!( + btc_alice_final, + btc_alice + btc - bitcoin::Amount::from_sat(xmr_btc::bitcoin::TX_FEE) + ); + assert!(btc_bob_final <= btc_bob - btc); + + assert!(xmr_alice_final.as_piconero() <= xmr_alice - xmr); + assert_eq!(xmr_bob_final.as_piconero(), xmr_bob + xmr); + } +} diff --git a/xmr-btc/Cargo.toml b/xmr-btc/Cargo.toml index 22dfc30b..7d13407d 100644 --- a/xmr-btc/Cargo.toml +++ b/xmr-btc/Cargo.toml @@ -28,7 +28,7 @@ tracing = "0.1" [dev-dependencies] backoff = { version = "0.2", features = ["tokio"] } base64 = "0.12" -bitcoin-harness = { git = "https://github.com/coblox/bitcoin-harness-rs", rev = "7ff30a559ab57cc3aa71189e71433ef6b2a6c3a2" } +bitcoin-harness = { git = "https://github.com/coblox/bitcoin-harness-rs", rev = "f1bbe6a4540d0741f1f4f22577cfeeadbfd7aaaf" } futures = "0.3" monero-harness = { path = "../monero-harness" } reqwest = { version = "0.10", default-features = false } diff --git a/xmr-btc/src/alice.rs b/xmr-btc/src/alice.rs index 95e12ed6..b6392b04 100644 --- a/xmr-btc/src/alice.rs +++ b/xmr-btc/src/alice.rs @@ -24,8 +24,8 @@ use std::{ sync::Arc, time::Duration, }; -use tokio::time::timeout; -use tracing::error; +use tokio::{sync::Mutex, time::timeout}; +use tracing::{error, info}; pub mod message; pub use message::{Message, Message0, Message1, Message2}; @@ -62,7 +62,7 @@ pub trait ReceiveBitcoinRedeemEncsig { /// The argument `bitcoin_tx_lock_timeout` is used to determine how long we will /// wait for Bob, the counterparty, to lock up the bitcoin. pub fn action_generator( - mut network: N, + network: Arc>, bitcoin_client: Arc, // TODO: Replace this with a new, slimmer struct? State3 { @@ -86,7 +86,7 @@ pub fn action_generator( bitcoin_tx_lock_timeout: u64, ) -> GenBoxed where - N: ReceiveBitcoinRedeemEncsig + Send + Sync + 'static, + N: ReceiveBitcoinRedeemEncsig + Send + 'static, B: bitcoin::BlockHeight + bitcoin::TransactionBlockHeight + bitcoin::WatchForRawTransaction @@ -158,19 +158,26 @@ where // TODO: Watch for LockXmr using watch-only wallet. Doing so will prevent Alice // from cancelling/refunding unnecessarily. - let tx_redeem_encsig = match select( - network.receive_bitcoin_redeem_encsig(), - poll_until_btc_has_expired.clone(), - ) - .await - { - Either::Left((encsig, _)) => encsig, - Either::Right(_) => { - return Err(SwapFailed::AfterXmrLock { - reason: Reason::BtcExpired, - tx_lock_height, - }) - } + let tx_redeem_encsig = { + let mut guard = network.as_ref().lock().await; + let tx_redeem_encsig = match select( + guard.receive_bitcoin_redeem_encsig(), + poll_until_btc_has_expired.clone(), + ) + .await + { + Either::Left((encsig, _)) => encsig, + Either::Right(_) => { + return Err(SwapFailed::AfterXmrLock { + reason: Reason::BtcExpired, + tx_lock_height, + }) + } + }; + + tracing::debug!("select returned redeem encsig from message"); + + tx_redeem_encsig }; let (signed_tx_redeem, tx_redeem_txid) = { @@ -519,6 +526,7 @@ impl State0 { } pub fn next_message(&self, rng: &mut R) -> Message0 { + info!("Producing first message"); let dleq_proof_s_a = cross_curve_dleq::Proof::new(rng, &self.s_a); Message0 { diff --git a/xmr-btc/src/alice/message.rs b/xmr-btc/src/alice/message.rs index 30958a83..b82f6f9f 100644 --- a/xmr-btc/src/alice/message.rs +++ b/xmr-btc/src/alice/message.rs @@ -31,7 +31,7 @@ pub struct Message1 { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Message2 { - pub(crate) tx_lock_proof: monero::TransferProof, + pub tx_lock_proof: monero::TransferProof, } impl_try_from_parent_enum!(Message0, Message); diff --git a/xmr-btc/src/bob.rs b/xmr-btc/src/bob.rs index ac1180c9..8039a250 100644 --- a/xmr-btc/src/bob.rs +++ b/xmr-btc/src/bob.rs @@ -28,7 +28,7 @@ use std::{ sync::Arc, time::Duration, }; -use tokio::time::timeout; +use tokio::{sync::Mutex, time::timeout}; use tracing::error; pub mod message; @@ -62,7 +62,7 @@ pub trait ReceiveTransferProof { /// The argument `bitcoin_tx_lock_timeout` is used to determine how long we will /// wait for Bob, the caller of this function, to lock up the bitcoin. pub fn action_generator( - mut network: N, + network: Arc>, monero_client: Arc, bitcoin_client: Arc, // TODO: Replace this with a new, slimmer struct? @@ -85,7 +85,7 @@ pub fn action_generator( bitcoin_tx_lock_timeout: u64, ) -> GenBoxed where - N: ReceiveTransferProof + Send + Sync + 'static, + N: ReceiveTransferProof + Send + 'static, M: monero::WatchForTransfer + Send + Sync + 'static, B: bitcoin::BlockHeight + bitcoin::TransactionBlockHeight @@ -140,14 +140,21 @@ where .shared(); pin_mut!(poll_until_btc_has_expired); - let transfer_proof = match select( - network.receive_transfer_proof(), - poll_until_btc_has_expired.clone(), - ) - .await - { - Either::Left((proof, _)) => proof, - Either::Right(_) => return Err(SwapFailed::AfterBtcLock(Reason::BtcExpired)), + let transfer_proof = { + let mut guard = network.as_ref().lock().await; + let transfer_proof = match select( + guard.receive_transfer_proof(), + poll_until_btc_has_expired.clone(), + ) + .await + { + Either::Left((proof, _)) => proof, + Either::Right(_) => return Err(SwapFailed::AfterBtcLock(Reason::BtcExpired)), + }; + + tracing::debug!("select returned transfer proof from message"); + + transfer_proof }; let S_b_monero = monero::PublicKey::from_private_key(&monero::PrivateKey::from_scalar( diff --git a/xmr-btc/src/bob/message.rs b/xmr-btc/src/bob/message.rs index b6bed872..178c0218 100644 --- a/xmr-btc/src/bob/message.rs +++ b/xmr-btc/src/bob/message.rs @@ -33,9 +33,9 @@ pub struct Message2 { pub(crate) tx_cancel_sig: Signature, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Message3 { - pub(crate) tx_redeem_encsig: EncryptedSignature, + pub tx_redeem_encsig: EncryptedSignature, } impl_try_from_parent_enum!(Message0, Message); diff --git a/xmr-btc/tests/on_chain.rs b/xmr-btc/tests/on_chain.rs index f7293011..e61db076 100644 --- a/xmr-btc/tests/on_chain.rs +++ b/xmr-btc/tests/on_chain.rs @@ -1,7 +1,5 @@ pub mod harness; -use std::{convert::TryInto, sync::Arc}; - use anyhow::Result; use async_trait::async_trait; use futures::{ @@ -16,9 +14,10 @@ use harness::{ }; use monero_harness::Monero; use rand::rngs::OsRng; +use std::{convert::TryInto, sync::Arc}; use testcontainers::clients::Cli; +use tokio::sync::Mutex; use tracing::info; -use tracing_subscriber::util::SubscriberInitExt; use xmr_btc::{ alice::{self, ReceiveBitcoinRedeemEncsig}, bitcoin::{BroadcastSignedTransaction, EncryptedSignature, SignTxLock}, @@ -102,7 +101,7 @@ impl Default for BobBehaviour { } async fn swap_as_alice( - network: AliceNetwork, + network: Arc>, // FIXME: It would be more intuitive to have a single network/transport struct instead of // splitting into two, but Rust ownership rules make this tedious mut sender: Sender, @@ -168,7 +167,7 @@ async fn swap_as_alice( } async fn swap_as_bob( - network: BobNetwork, + network: Arc>, mut sender: Sender, monero_wallet: Arc, bitcoin_wallet: Arc, @@ -276,7 +275,7 @@ async fn on_chain_happy_path() { try_join( swap_as_alice( - alice_network, + Arc::new(Mutex::new(alice_network)), alice_sender, alice_monero_wallet.clone(), alice_bitcoin_wallet.clone(), @@ -284,7 +283,7 @@ async fn on_chain_happy_path() { alice, ), swap_as_bob( - bob_network, + Arc::new(Mutex::new(bob_network)), bob_sender, bob_monero_wallet.clone(), bob_bitcoin_wallet.clone(), @@ -367,7 +366,7 @@ async fn on_chain_both_refund_if_alice_never_redeems() { try_join( swap_as_alice( - alice_network, + Arc::new(Mutex::new(alice_network)), alice_sender, alice_monero_wallet.clone(), alice_bitcoin_wallet.clone(), @@ -378,7 +377,7 @@ async fn on_chain_both_refund_if_alice_never_redeems() { alice, ), swap_as_bob( - bob_network, + Arc::new(Mutex::new(bob_network)), bob_sender, bob_monero_wallet.clone(), bob_bitcoin_wallet.clone(), @@ -419,11 +418,6 @@ async fn on_chain_both_refund_if_alice_never_redeems() { #[tokio::test] async fn on_chain_alice_punishes_if_bob_never_acts_after_fund() { - let _guard = tracing_subscriber::fmt() - .with_env_filter("info") - .with_ansi(false) - .set_default(); - let cli = Cli::default(); let (monero, _container) = Monero::new(&cli).unwrap(); let bitcoind = init_bitcoind(&cli).await; @@ -461,7 +455,7 @@ async fn on_chain_alice_punishes_if_bob_never_acts_after_fund() { let (bob_network, alice_sender) = Network::::new(); let alice_swap = swap_as_alice( - alice_network, + Arc::new(Mutex::new(alice_network)), alice_sender, alice_monero_wallet.clone(), alice_bitcoin_wallet.clone(), @@ -469,7 +463,7 @@ async fn on_chain_alice_punishes_if_bob_never_acts_after_fund() { alice, ); let bob_swap = swap_as_bob( - bob_network, + Arc::new(Mutex::new(bob_network)), bob_sender, bob_monero_wallet.clone(), bob_bitcoin_wallet.clone(),