diff --git a/swap/src/database/alice.rs b/swap/src/database/alice.rs index cb6c30b6..b2424fba 100644 --- a/swap/src/database/alice.rs +++ b/swap/src/database/alice.rs @@ -4,7 +4,6 @@ use crate::monero::monero_private_key; use crate::protocol::alice; use crate::protocol::alice::AliceState; use ::bitcoin::hashes::core::fmt::Display; -use libp2p::PeerId; use monero_rpc::wallet::BlockHeight; use serde::{Deserialize, Serialize}; @@ -15,13 +14,9 @@ use serde::{Deserialize, Serialize}; pub enum Alice { Started { state3: alice::State3, - #[serde(with = "crate::serde_peer_id")] - bob_peer_id: PeerId, }, BtcLocked { state3: alice::State3, - #[serde(with = "crate::serde_peer_id")] - bob_peer_id: PeerId, }, XmrLocked { monero_wallet_restore_blockheight: BlockHeight, @@ -64,19 +59,11 @@ pub enum AliceEndState { impl From<&AliceState> for Alice { fn from(alice_state: &AliceState) -> Self { match alice_state { - AliceState::Started { - state3, - bob_peer_id, - } => Alice::Started { + AliceState::Started { state3 } => Alice::Started { state3: state3.as_ref().clone(), - bob_peer_id: *bob_peer_id, }, - AliceState::BtcLocked { - state3, - bob_peer_id, - } => Alice::BtcLocked { + AliceState::BtcLocked { state3 } => Alice::BtcLocked { state3: state3.as_ref().clone(), - bob_peer_id: *bob_peer_id, }, AliceState::XmrLocked { monero_wallet_restore_blockheight, @@ -137,18 +124,10 @@ impl From<&AliceState> for Alice { impl From for AliceState { fn from(db_state: Alice) -> Self { match db_state { - Alice::Started { - state3, - bob_peer_id, - } => AliceState::Started { - bob_peer_id, + Alice::Started { state3 } => AliceState::Started { state3: Box::new(state3), }, - Alice::BtcLocked { - state3, - bob_peer_id, - } => AliceState::BtcLocked { - bob_peer_id, + Alice::BtcLocked { state3 } => AliceState::BtcLocked { state3: Box::new(state3), }, Alice::XmrLocked { diff --git a/swap/src/lib.rs b/swap/src/lib.rs index b769dc96..eaab1fec 100644 --- a/swap/src/lib.rs +++ b/swap/src/lib.rs @@ -30,4 +30,3 @@ pub mod seed; pub mod trace; mod monero_ext; -mod serde_peer_id; diff --git a/swap/src/protocol/alice/behaviour.rs b/swap/src/protocol/alice/behaviour.rs index c3790392..97a20f1c 100644 --- a/swap/src/protocol/alice/behaviour.rs +++ b/swap/src/protocol/alice/behaviour.rs @@ -32,6 +32,7 @@ pub enum OutEvent { EncryptedSignature { msg: Box, channel: ResponseChannel<()>, + peer: PeerId, }, ResponseSent, // Same variant is used for all messages as no processing is done Failure(Error), @@ -140,9 +141,10 @@ impl From for OutEvent { fn from(event: encrypted_signature::OutEvent) -> Self { use crate::protocol::alice::encrypted_signature::OutEvent::*; match event { - MsgReceived { msg, channel } => OutEvent::EncryptedSignature { + MsgReceived { msg, channel, peer } => OutEvent::EncryptedSignature { msg: Box::new(msg), channel, + peer, }, AckSent => OutEvent::ResponseSent, Failure(err) => OutEvent::Failure(err.context("Failure with Encrypted Signature")), diff --git a/swap/src/protocol/alice/encrypted_signature.rs b/swap/src/protocol/alice/encrypted_signature.rs index dac19113..f7dee187 100644 --- a/swap/src/protocol/alice/encrypted_signature.rs +++ b/swap/src/protocol/alice/encrypted_signature.rs @@ -5,7 +5,7 @@ use libp2p::request_response::{ ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, ResponseChannel, }; -use libp2p::NetworkBehaviour; +use libp2p::{NetworkBehaviour, PeerId}; use std::time::Duration; use tracing::debug; @@ -14,6 +14,7 @@ pub enum OutEvent { MsgReceived { msg: EncryptedSignature, channel: ResponseChannel<()>, + peer: PeerId, }, AckSent, Failure(Error), @@ -67,6 +68,7 @@ impl From> for OutEvent { OutEvent::MsgReceived { msg: request, channel, + peer, } } RequestResponseEvent::Message { diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 84da64d9..41cbb910 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -9,13 +9,16 @@ use crate::protocol::bob::EncryptedSignature; use crate::seed::Seed; use crate::{bitcoin, kraken, monero}; use anyhow::{bail, Context, Result}; +use futures::future; +use futures::future::{BoxFuture, FutureExt}; +use futures::stream::{FuturesUnordered, StreamExt}; use libp2p::core::Multiaddr; -use libp2p::futures::FutureExt; use libp2p::{PeerId, Swarm}; use rand::rngs::OsRng; +use std::collections::HashMap; use std::convert::Infallible; use std::sync::Arc; -use tokio::sync::{broadcast, mpsc}; +use tokio::sync::{mpsc, oneshot}; use tracing::{debug, error, trace}; use uuid::Uuid; @@ -30,19 +33,19 @@ pub struct EventLoop { latest_rate: RS, max_buy: bitcoin::Amount, - recv_encrypted_signature: broadcast::Sender, - send_transfer_proof: mpsc::Receiver<(PeerId, TransferProof)>, - - // Only used to produce new handles - send_transfer_proof_sender: mpsc::Sender<(PeerId, TransferProof)>, + /// Stores a sender per peer for incoming [`EncryptedSignature`]s. + recv_encrypted_signature: HashMap>, + /// Stores a list of futures, waiting for transfer proof which will be sent + /// to the given peer. + send_transfer_proof: FuturesUnordered>>, swap_sender: mpsc::Sender, } #[derive(Debug)] pub struct EventLoopHandle { - recv_encrypted_signature: broadcast::Receiver, - send_transfer_proof: mpsc::Sender<(PeerId, TransferProof)>, + recv_encrypted_signature: Option>, + send_transfer_proof: Option>, } impl EventLoop @@ -74,8 +77,6 @@ where Swarm::listen_on(&mut swarm, listen_address.clone()) .with_context(|| format!("Address is not supported: {:#}", listen_address))?; - let recv_encrypted_signature = BroadcastChannels::default(); - let send_transfer_proof = MpscChannels::default(); let swap_channel = MpscChannels::default(); let event_loop = EventLoop { @@ -86,30 +87,26 @@ where monero_wallet, db, latest_rate, - recv_encrypted_signature: recv_encrypted_signature.sender, - send_transfer_proof: send_transfer_proof.receiver, - send_transfer_proof_sender: send_transfer_proof.sender, swap_sender: swap_channel.sender, max_buy, + recv_encrypted_signature: Default::default(), + send_transfer_proof: Default::default(), }; Ok((event_loop, swap_channel.receiver)) } - pub fn new_handle(&self) -> EventLoopHandle { - EventLoopHandle { - recv_encrypted_signature: self.recv_encrypted_signature.subscribe(), - send_transfer_proof: self.send_transfer_proof_sender.clone(), - } - } - pub fn peer_id(&self) -> PeerId { self.peer_id } pub async fn run(mut self) { + // ensure that the send_transfer_proof stream is NEVER empty, otherwise it will + // terminate forever. + self.send_transfer_proof.push(future::pending().boxed()); + loop { tokio::select! { - swarm_event = self.swarm.next().fuse() => { + swarm_event = self.swarm.next() => { match swarm_event { OutEvent::ConnectionEstablished(alice) => { debug!("Connection Established with {}", alice); @@ -164,9 +161,17 @@ where OutEvent::TransferProofAcknowledged => { trace!("Bob acknowledged transfer proof"); } - OutEvent::EncryptedSignature{ msg, channel } => { - let _ = self.recv_encrypted_signature.send(*msg); - // Send back empty response so that the request/response protocol completes. + OutEvent::EncryptedSignature{ msg, channel, peer } => { + match self.recv_encrypted_signature.remove(&peer) { + Some(sender) => { + // this failing just means the receiver is no longer interested ... + let _ = sender.send(*msg); + }, + None => { + tracing::warn!(%peer, "No sender for encrypted signature, maybe already handled?") + } + } + if let Err(error) = self.swarm.send_encrypted_signature_ack(channel) { error!("Failed to send Encrypted Signature ack: {:?}", error); } @@ -177,11 +182,19 @@ where } } }, - transfer_proof = self.send_transfer_proof.recv().fuse() => { - if let Some((bob_peer_id, msg)) = transfer_proof { - self.swarm.send_transfer_proof(bob_peer_id, msg); + next_transfer_proof = self.send_transfer_proof.next() => { + match next_transfer_proof { + Some(Ok((peer, transfer_proof))) => { + self.swarm.send_transfer_proof(peer, transfer_proof); + }, + Some(Err(_)) => { + tracing::debug!("A swap stopped without sending a transfer proof"); + } + None => { + unreachable!("stream of transfer proof receivers must never terminate") + } } - }, + } } } } @@ -230,11 +243,10 @@ where async fn handle_execution_setup_done(&mut self, bob_peer_id: PeerId, state3: State3) { let swap_id = Uuid::new_v4(); - let handle = self.new_handle(); + let handle = self.new_handle(bob_peer_id); let initial_state = AliceState::Started { state3: Box::new(state3), - bob_peer_id, }; let swap = Swap { @@ -251,6 +263,29 @@ where tracing::warn!(%swap_id, "Swap cannot be spawned: {}", error); } } + + /// Create a new [`EventLoopHandle`] that is scoped for communication with + /// the given peer. + fn new_handle(&mut self, peer: PeerId) -> EventLoopHandle { + let (send_transfer_proof_sender, send_transfer_proof_receiver) = oneshot::channel(); + let (recv_enc_sig_sender, recv_enc_sig_receiver) = oneshot::channel(); + + self.recv_encrypted_signature + .insert(peer, recv_enc_sig_sender); + self.send_transfer_proof.push( + async move { + let transfer_proof = send_transfer_proof_receiver.await?; + + Ok((peer, transfer_proof)) + } + .boxed(), + ); + + EventLoopHandle { + recv_encrypted_signature: Some(recv_enc_sig_receiver), + send_transfer_proof: Some(send_transfer_proof_sender), + } + } } pub trait LatestRate { @@ -277,13 +312,24 @@ impl LatestRate for kraken::RateUpdateStream { impl EventLoopHandle { pub async fn recv_encrypted_signature(&mut self) -> Result { - self.recv_encrypted_signature - .recv() - .await - .context("Failed to receive Bitcoin encrypted signature from Bob") + let signature = self + .recv_encrypted_signature + .take() + .context("Encrypted signature was already received")? + .await?; + + Ok(signature) } - pub async fn send_transfer_proof(&mut self, bob: PeerId, msg: TransferProof) -> Result<()> { - let _ = self.send_transfer_proof.send((bob, msg)).await?; + pub async fn send_transfer_proof(&mut self, msg: TransferProof) -> Result<()> { + if self + .send_transfer_proof + .take() + .context("Transfer proof was already sent")? + .send(msg) + .is_err() + { + bail!("Failed to send transfer proof, receiver no longer listening?") + } Ok(()) } @@ -308,21 +354,3 @@ impl Default for MpscChannels { MpscChannels { sender, receiver } } } - -#[allow(missing_debug_implementations)] -struct BroadcastChannels -where - T: Clone, -{ - sender: broadcast::Sender, -} - -impl Default for BroadcastChannels -where - T: Clone, -{ - fn default() -> Self { - let (sender, _receiver) = broadcast::channel(100); - BroadcastChannels { sender } - } -} diff --git a/swap/src/protocol/alice/state.rs b/swap/src/protocol/alice/state.rs index 263aee10..dfd2441e 100644 --- a/swap/src/protocol/alice/state.rs +++ b/swap/src/protocol/alice/state.rs @@ -7,7 +7,6 @@ use crate::protocol::bob::{Message0, Message2, Message4}; use crate::protocol::CROSS_CURVE_PROOF_SYSTEM; use crate::{bitcoin, monero}; use anyhow::{anyhow, bail, Context, Result}; -use libp2p::PeerId; use monero_rpc::wallet::BlockHeight; use rand::{CryptoRng, RngCore}; use serde::{Deserialize, Serialize}; @@ -17,11 +16,9 @@ use std::fmt; #[derive(Debug)] pub enum AliceState { Started { - bob_peer_id: PeerId, state3: Box, }, BtcLocked { - bob_peer_id: PeerId, state3: Box, }, XmrLocked { diff --git a/swap/src/protocol/alice/steps.rs b/swap/src/protocol/alice/steps.rs index 3f7b18e8..eec8a86b 100644 --- a/swap/src/protocol/alice/steps.rs +++ b/swap/src/protocol/alice/steps.rs @@ -7,10 +7,8 @@ use crate::protocol::alice::TransferProof; use crate::{bitcoin, monero}; use anyhow::{bail, Context, Result}; use futures::pin_mut; -use libp2p::PeerId; pub async fn lock_xmr( - bob_peer_id: PeerId, state3: alice::State3, event_loop_handle: &mut EventLoopHandle, monero_wallet: &monero::Wallet, @@ -30,7 +28,7 @@ pub async fn lock_xmr( // Otherwise Alice might publish the lock tx twice! event_loop_handle - .send_transfer_proof(bob_peer_id, TransferProof { + .send_transfer_proof(TransferProof { tx_lock_proof: transfer_proof, }) .await?; diff --git a/swap/src/protocol/alice/swap.rs b/swap/src/protocol/alice/swap.rs index 2d652416..33822dbb 100644 --- a/swap/src/protocol/alice/swap.rs +++ b/swap/src/protocol/alice/swap.rs @@ -76,10 +76,7 @@ async fn run_until_internal( Ok(state) } else { match state { - AliceState::Started { - state3, - bob_peer_id, - } => { + AliceState::Started { state3 } => { timeout( execution_params.bob_time_to_act, bitcoin_wallet @@ -94,10 +91,7 @@ async fn run_until_internal( }) .await?; - let state = AliceState::BtcLocked { - bob_peer_id, - state3, - }; + let state = AliceState::BtcLocked { state3 }; let db_state = (&state).into(); db.insert_latest_state(swap_id, database::Swap::Alice(db_state)) @@ -114,21 +108,12 @@ async fn run_until_internal( ) .await } - AliceState::BtcLocked { - bob_peer_id, - state3, - } => { + AliceState::BtcLocked { state3 } => { // Record the current monero wallet block height so we don't have to scan from // block 0 for scenarios where we create a refund wallet. let monero_wallet_restore_blockheight = monero_wallet.block_height().await?; - lock_xmr( - bob_peer_id, - *state3.clone(), - &mut event_loop_handle, - &monero_wallet, - ) - .await?; + lock_xmr(*state3.clone(), &mut event_loop_handle, &monero_wallet).await?; let state = AliceState::XmrLocked { state3, diff --git a/swap/src/serde_peer_id.rs b/swap/src/serde_peer_id.rs deleted file mode 100644 index 8dc9a3e9..00000000 --- a/swap/src/serde_peer_id.rs +++ /dev/null @@ -1,49 +0,0 @@ -//! A serde module that defines how we want to serialize PeerIds on the -//! HTTP-API. - -use libp2p::PeerId; -use serde::de::Error; -use serde::{Deserialize, Deserializer, Serializer}; - -pub fn serialize(peer_id: &PeerId, serializer: S) -> Result -where - S: Serializer, -{ - let string = peer_id.to_string(); - serializer.serialize_str(&string) -} - -#[allow(dead_code)] -pub fn deserialize<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - let string = String::deserialize(deserializer)?; - let peer_id = string.parse().map_err(D::Error::custom)?; - - Ok(peer_id) -} - -#[cfg(test)] -mod tests { - use super::*; - use serde::Serialize; - use spectral::prelude::*; - - #[derive(Serialize)] - struct SerializablePeerId(#[serde(with = "super")] PeerId); - - #[test] - fn maker_id_serializes_as_expected() { - let peer_id = SerializablePeerId( - "QmfUfpC2frwFvcDzpspnfZitHt5wct6n4kpG5jzgRdsxkY" - .parse() - .unwrap(), - ); - - let got = serde_json::to_string(&peer_id).expect("failed to serialize peer id"); - - assert_that(&got) - .is_equal_to(r#""QmfUfpC2frwFvcDzpspnfZitHt5wct6n4kpG5jzgRdsxkY""#.to_string()); - } -}