diff --git a/Cargo.lock b/Cargo.lock index fbbec2b2..c10cf499 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -481,6 +481,17 @@ dependencies = [ "byte-tools", ] +[[package]] +name = "bmrng" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ba7ad917af2fc43efa0b20d2bf3b2c1bd1090fa2a6b8c73847458c8335dea2b" +dependencies = [ + "futures", + "loom", + "tokio 1.4.0", +] + [[package]] name = "bs58" version = "0.4.0" @@ -1181,6 +1192,19 @@ dependencies = [ "byteorder", ] +[[package]] +name = "generator" +version = "0.6.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "061d3be1afec479d56fa3bd182bf966c7999ec175fcfdb87ac14d417241366c6" +dependencies = [ + "cc", + "libc", + "log 0.4.14", + "rustversion", + "winapi 0.3.9", +] + [[package]] name = "generic-array" version = "0.12.4" @@ -1909,6 +1933,20 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "loom" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d44c73b4636e497b4917eb21c33539efa3816741a2d3ff26c6316f1b529481a4" +dependencies = [ + "cfg-if 1.0.0", + "futures-util", + "generator", + "scoped-tls", + "serde", + "serde_json", +] + [[package]] name = "lru" version = "0.6.5" @@ -3014,6 +3052,12 @@ dependencies = [ "webpki", ] +[[package]] +name = "rustversion" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb5d2a036dc6d2d8fd16fde3498b04306e29bd193bf306a57427019b823d5acd" + [[package]] name = "rw-stream-sink" version = "0.2.1" @@ -3037,6 +3081,12 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef703b7cb59335eae2eb93ceb664c0eb7ea6bf567079d843e09420219668e072" +[[package]] +name = "scoped-tls" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2" + [[package]] name = "scopeguard" version = "1.1.0" @@ -3522,6 +3572,7 @@ dependencies = [ "big-bytes", "bitcoin", "bitcoin-harness", + "bmrng", "config", "conquer-once", "curve25519-dalek", diff --git a/swap/Cargo.toml b/swap/Cargo.toml index d36589dc..916ba533 100644 --- a/swap/Cargo.toml +++ b/swap/Cargo.toml @@ -18,6 +18,7 @@ base64 = "0.13" bdk = { version = "0.5" } big-bytes = "1" bitcoin = { version = "0.26", features = ["rand", "use-serde"] } +bmrng = "0.5" config = { version = "0.11", default-features = false, features = ["toml"] } conquer-once = "0.3" curve25519-dalek = "3" diff --git a/swap/src/protocol/bob.rs b/swap/src/protocol/bob.rs index f9ea8142..bb411560 100644 --- a/swap/src/protocol/bob.rs +++ b/swap/src/protocol/bob.rs @@ -5,7 +5,9 @@ use crate::protocol::bob; use crate::{bitcoin, monero}; use anyhow::{anyhow, Error, Result}; use libp2p::core::Multiaddr; -use libp2p::request_response::{RequestResponseEvent, RequestResponseMessage, ResponseChannel}; +use libp2p::request_response::{ + RequestId, RequestResponseEvent, RequestResponseMessage, ResponseChannel, +}; use libp2p::{NetworkBehaviour, PeerId}; use std::sync::Arc; use uuid::Uuid; @@ -107,14 +109,22 @@ impl Builder { #[derive(Debug)] pub enum OutEvent { - QuoteReceived(BidQuote), - SpotPriceReceived(spot_price::Response), - ExecutionSetupDone(Result>), + QuoteReceived { + id: RequestId, + response: BidQuote, + }, + SpotPriceReceived { + id: RequestId, + response: spot_price::Response, + }, + ExecutionSetupDone(Box>), TransferProofReceived { msg: Box, channel: ResponseChannel<()>, }, - EncryptedSignatureAcknowledged, + EncryptedSignatureAcknowledged { + id: RequestId, + }, ResponseSent, // Same variant is used for all messages as no processing is done CommunicationError(Error), } @@ -133,7 +143,13 @@ impl From for OutEvent { fn from(message: quote::Message) -> Self { match message { quote::Message::Request { .. } => OutEvent::unexpected_request(), - quote::Message::Response { response, .. } => OutEvent::QuoteReceived(response), + quote::Message::Response { + response, + request_id, + } => OutEvent::QuoteReceived { + id: request_id, + response, + }, } } } @@ -142,7 +158,13 @@ impl From for OutEvent { fn from(message: spot_price::Message) -> Self { match message { spot_price::Message::Request { .. } => OutEvent::unexpected_request(), - spot_price::Message::Response { response, .. } => OutEvent::SpotPriceReceived(response), + spot_price::Message::Response { + response, + request_id, + } => OutEvent::SpotPriceReceived { + id: request_id, + response, + }, } } } @@ -165,8 +187,8 @@ impl From for OutEvent { fn from(message: encrypted_signature::Message) -> Self { match message { encrypted_signature::Message::Request { .. } => OutEvent::unexpected_request(), - encrypted_signature::Message::Response { .. } => { - OutEvent::EncryptedSignatureAcknowledged + encrypted_signature::Message::Response { request_id, .. } => { + OutEvent::EncryptedSignatureAcknowledged { id: request_id } } } } @@ -221,7 +243,7 @@ where impl From for OutEvent { fn from(event: execution_setup::OutEvent) -> Self { match event { - execution_setup::OutEvent::Done(res) => OutEvent::ExecutionSetupDone(res.map(Box::new)), + execution_setup::OutEvent::Done(res) => OutEvent::ExecutionSetupDone(Box::new(res)), } } } diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index b6e71a64..de5bd87d 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -3,27 +3,46 @@ use crate::network::quote::BidQuote; use crate::network::{encrypted_signature, spot_price, transfer_proof}; use crate::protocol::bob::{Behaviour, OutEvent, State0, State2}; use crate::{bitcoin, monero}; -use anyhow::{anyhow, Result}; -use futures::FutureExt; +use anyhow::{Context, Result}; +use futures::future::{BoxFuture, OptionFuture}; +use futures::{FutureExt, StreamExt}; +use libp2p::request_response::{RequestId, ResponseChannel}; use libp2p::swarm::SwarmEvent; use libp2p::{PeerId, Swarm}; +use std::collections::HashMap; use std::sync::Arc; -use tokio::sync::mpsc::{Receiver, Sender}; -use tracing::{debug, error}; +use std::time::Duration; #[allow(missing_debug_implementations)] pub struct EventLoop { swarm: libp2p::Swarm, bitcoin_wallet: Arc, alice_peer_id: PeerId, - request_spot_price: Receiver, - recv_spot_price: Sender, - start_execution_setup: Receiver, - done_execution_setup: Sender>, - recv_transfer_proof: Sender, - send_encrypted_signature: Receiver, - request_quote: Receiver<()>, - recv_quote: Sender, + + // these streams represents outgoing requests that we have to make + quote_requests: bmrng::RequestReceiverStream<(), BidQuote>, + spot_price_requests: bmrng::RequestReceiverStream, + encrypted_signature_requests: bmrng::RequestReceiverStream, + execution_setup_requests: bmrng::RequestReceiverStream>, + + // these represents requests that are currently in-flight. + // once we get a response to a matching [`RequestId`], we will use the responder to relay the + // response. + inflight_spot_price_requests: HashMap>, + inflight_quote_requests: HashMap>, + inflight_encrypted_signature_requests: HashMap>, + inflight_execution_setup: Option>>, + + /// The sender we will use to relay incoming transfer proofs. + transfer_proof: bmrng::RequestSender, + /// The future representing the successful handling of an incoming transfer + /// proof. + /// + /// Once we've sent a transfer proof to the ongoing swap, this future waits + /// until the swap took it "out" of the `EventLoopHandle`. As this future + /// resolves, we use the `ResponseChannel` returned from it to send an ACK + /// to Alice that we have successfully processed the transfer proof. + pending_transfer_proof: OptionFuture>>, } impl EventLoop { @@ -32,38 +51,34 @@ impl EventLoop { alice_peer_id: PeerId, bitcoin_wallet: Arc, ) -> Result<(Self, EventLoopHandle)> { - let start_execution_setup = Channels::new(); - let done_execution_setup = Channels::new(); - let recv_transfer_proof = Channels::new(); - let send_encrypted_signature = Channels::new(); - let request_spot_price = Channels::new(); - let recv_spot_price = Channels::new(); - let request_quote = Channels::new(); - let recv_quote = Channels::new(); + let execution_setup = bmrng::channel_with_timeout(1, Duration::from_secs(30)); + let transfer_proof = bmrng::channel_with_timeout(1, Duration::from_secs(30)); + let encrypted_signature = bmrng::channel_with_timeout(1, Duration::from_secs(30)); + let spot_price = bmrng::channel_with_timeout(1, Duration::from_secs(30)); + let quote = bmrng::channel_with_timeout(1, Duration::from_secs(30)); let event_loop = EventLoop { swarm, alice_peer_id, bitcoin_wallet, - start_execution_setup: start_execution_setup.receiver, - done_execution_setup: done_execution_setup.sender, - recv_transfer_proof: recv_transfer_proof.sender, - send_encrypted_signature: send_encrypted_signature.receiver, - request_spot_price: request_spot_price.receiver, - recv_spot_price: recv_spot_price.sender, - request_quote: request_quote.receiver, - recv_quote: recv_quote.sender, + execution_setup_requests: execution_setup.1.into(), + transfer_proof: transfer_proof.0, + encrypted_signature_requests: encrypted_signature.1.into(), + spot_price_requests: spot_price.1.into(), + quote_requests: quote.1.into(), + inflight_spot_price_requests: HashMap::default(), + inflight_quote_requests: HashMap::default(), + inflight_execution_setup: None, + inflight_encrypted_signature_requests: HashMap::default(), + pending_transfer_proof: OptionFuture::from(None), }; let handle = EventLoopHandle { - start_execution_setup: start_execution_setup.sender, - done_execution_setup: done_execution_setup.receiver, - recv_transfer_proof: recv_transfer_proof.receiver, - send_encrypted_signature: send_encrypted_signature.sender, - request_spot_price: request_spot_price.sender, - recv_spot_price: recv_spot_price.receiver, - request_quote: request_quote.sender, - recv_quote: recv_quote.receiver, + execution_setup: execution_setup.0, + transfer_proof: transfer_proof.1, + encrypted_signature: encrypted_signature.0, + spot_price: spot_price.0, + quote: quote.0, }; Ok((event_loop, handle)) @@ -76,24 +91,40 @@ impl EventLoop { tokio::select! { swarm_event = self.swarm.next_event().fuse() => { match swarm_event { - SwarmEvent::Behaviour(OutEvent::SpotPriceReceived(msg)) => { - let _ = self.recv_spot_price.send(msg).await; - } - SwarmEvent::Behaviour(OutEvent::QuoteReceived(msg)) => { - let _ = self.recv_quote.send(msg).await; + SwarmEvent::Behaviour(OutEvent::SpotPriceReceived { id, response }) => { + if let Some(responder) = self.inflight_spot_price_requests.remove(&id) { + let _ = responder.respond(response); + } } - SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone(res)) => { - let _ = self.done_execution_setup.send(res.map(|state|*state)).await; + SwarmEvent::Behaviour(OutEvent::QuoteReceived { id, response }) => { + if let Some(responder) = self.inflight_quote_requests.remove(&id) { + let _ = responder.respond(response); + } } - SwarmEvent::Behaviour(OutEvent::TransferProofReceived{ msg, channel }) => { - let _ = self.recv_transfer_proof.send(*msg).await; - // Send back empty response so that the request/response protocol completes. - if let Err(error) = self.swarm.transfer_proof.send_response(channel, ()) { - error!("Failed to send Transfer Proof ack: {:?}", error); + SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone(response)) => { + if let Some(responder) = self.inflight_execution_setup.take() { + let _ = responder.respond(*response); } } - SwarmEvent::Behaviour(OutEvent::EncryptedSignatureAcknowledged) => { - debug!("Alice acknowledged encrypted signature"); + SwarmEvent::Behaviour(OutEvent::TransferProofReceived { msg, channel }) => { + let mut responder = match self.transfer_proof.send(*msg).await { + Ok(responder) => responder, + Err(_) => { + tracing::warn!("Failed to pass on transfer proof"); + continue; + } + }; + + self.pending_transfer_proof = OptionFuture::from(Some(async move { + let _ = responder.recv().await; + + channel + }.boxed())); + } + SwarmEvent::Behaviour(OutEvent::EncryptedSignatureAcknowledged { id }) => { + if let Some(responder) = self.inflight_encrypted_signature_requests.remove(&id) { + let _ = responder.respond(()); + } } SwarmEvent::Behaviour(OutEvent::ResponseSent) => { @@ -133,27 +164,26 @@ impl EventLoop { _ => {} } }, - spot_price_request = self.request_spot_price.recv().fuse() => { - if let Some(request) = spot_price_request { - self.swarm.spot_price.send_request(&self.alice_peer_id, request); - } + Some((request, responder)) = self.spot_price_requests.next().fuse() => { + let id = self.swarm.spot_price.send_request(&self.alice_peer_id, request); + self.inflight_spot_price_requests.insert(id, responder); }, - quote_request = self.request_quote.recv().fuse() => { - if quote_request.is_some() { - self.swarm.quote.send_request(&self.alice_peer_id, ()); - } + Some(((), responder)) = self.quote_requests.next().fuse() => { + let id = self.swarm.quote.send_request(&self.alice_peer_id, ()); + self.inflight_quote_requests.insert(id, responder); }, - option = self.start_execution_setup.recv().fuse() => { - if let Some(state0) = option { - let _ = self - .swarm - .execution_setup.run(self.alice_peer_id, state0, self.bitcoin_wallet.clone()); - } + Some((request, responder)) = self.execution_setup_requests.next().fuse() => { + self.swarm.execution_setup.run(self.alice_peer_id, request, self.bitcoin_wallet.clone()); + self.inflight_execution_setup = Some(responder); }, - encrypted_signature = self.send_encrypted_signature.recv().fuse() => { - if let Some(tx_redeem_encsig) = encrypted_signature { - self.swarm.encrypted_signature.send_request(&self.alice_peer_id, tx_redeem_encsig); - } + Some((request, responder)) = self.encrypted_signature_requests.next().fuse() => { + let id = self.swarm.encrypted_signature.send_request(&self.alice_peer_id, request); + self.inflight_encrypted_signature_requests.insert(id, responder); + }, + Some(response_channel) = &mut self.pending_transfer_proof => { + let _ = self.swarm.transfer_proof.send_response(response_channel, ()); + + self.pending_transfer_proof = OptionFuture::from(None); } } } @@ -162,87 +192,50 @@ impl EventLoop { #[derive(Debug)] pub struct EventLoopHandle { - start_execution_setup: Sender, - done_execution_setup: Receiver>, - recv_transfer_proof: Receiver, - send_encrypted_signature: Sender, - request_spot_price: Sender, - recv_spot_price: Receiver, - request_quote: Sender<()>, - recv_quote: Receiver, + execution_setup: bmrng::RequestSender>, + transfer_proof: bmrng::RequestReceiver, + encrypted_signature: bmrng::RequestSender, + spot_price: bmrng::RequestSender, + quote: bmrng::RequestSender<(), BidQuote>, } impl EventLoopHandle { pub async fn execution_setup(&mut self, state0: State0) -> Result { - let _ = self.start_execution_setup.send(state0).await?; - - self.done_execution_setup - .recv() - .await - .ok_or_else(|| anyhow!("Failed to setup execution with Alice"))? + self.execution_setup.send_receive(state0).await? } pub async fn recv_transfer_proof(&mut self) -> Result { - self.recv_transfer_proof + let (request, responder) = self + .transfer_proof .recv() .await - .ok_or_else(|| anyhow!("Failed to receive transfer proof from Alice")) + .context("Failed to receive transfer proof")?; + responder + .respond(()) + .context("Failed to acknowledge receipt of transfer proof")?; + + Ok(request) } pub async fn request_spot_price(&mut self, btc: bitcoin::Amount) -> Result { - let _ = self - .request_spot_price - .send(spot_price::Request { btc }) - .await?; - - let response = self - .recv_spot_price - .recv() - .await - .ok_or_else(|| anyhow!("Failed to receive spot price from Alice"))?; - - Ok(response.xmr) + Ok(self + .spot_price + .send_receive(spot_price::Request { btc }) + .await? + .xmr) } pub async fn request_quote(&mut self) -> Result { - let _ = self.request_quote.send(()).await?; - - let quote = self - .recv_quote - .recv() - .await - .ok_or_else(|| anyhow!("Failed to receive quote from Alice"))?; - - Ok(quote) + Ok(self.quote.send_receive(()).await?) } pub async fn send_encrypted_signature( &mut self, tx_redeem_encsig: EncryptedSignature, ) -> Result<()> { - self.send_encrypted_signature - .send(encrypted_signature::Request { tx_redeem_encsig }) - .await?; - - Ok(()) - } -} - -#[derive(Debug)] -struct Channels { - sender: Sender, - receiver: Receiver, -} - -impl Channels { - fn new() -> Channels { - let (sender, receiver) = tokio::sync::mpsc::channel(100); - Channels { sender, receiver } - } -} - -impl Default for Channels { - fn default() -> Self { - Self::new() + Ok(self + .encrypted_signature + .send_receive(encrypted_signature::Request { tx_redeem_encsig }) + .await?) } }