diff --git a/swap/src/network/quote.rs b/swap/src/network/quote.rs index 7fb999ff..46fdcc2a 100644 --- a/swap/src/network/quote.rs +++ b/swap/src/network/quote.rs @@ -1,6 +1,6 @@ use crate::bitcoin; use crate::network::json_pull_codec::JsonPullCodec; -use crate::protocol::{alice, bob}; +use crate::protocol::{bob}; use libp2p::core::ProtocolName; use libp2p::request_response::{ ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, @@ -10,10 +10,10 @@ use libp2p::PeerId; use serde::{Deserialize, Serialize}; const PROTOCOL: &str = "/comit/xmr/btc/bid-quote/1.0.0"; -type OutEvent = RequestResponseEvent<(), BidQuote>; -type Message = RequestResponseMessage<(), BidQuote>; +pub type OutEvent = RequestResponseEvent<(), Response>; +type Message = RequestResponseMessage<(), Response>; -pub type Behaviour = RequestResponse>; +pub type Behaviour = RequestResponse>; #[derive(Debug, Clone, Copy, Default)] pub struct BidQuoteProtocol; @@ -24,6 +24,12 @@ impl ProtocolName for BidQuoteProtocol { } } +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum Response { + Quote(BidQuote), + Error +} + /// Represents a quote for buying XMR. #[derive(Serialize, Deserialize, Debug, Clone)] pub struct BidQuote { @@ -38,17 +44,6 @@ pub struct BidQuote { pub max_quantity: bitcoin::Amount, } -/// Constructs a new instance of the `quote` behaviour to be used by Alice. -/// -/// Alice only supports inbound connections, i.e. handing out quotes. -pub fn alice() -> Behaviour { - Behaviour::new( - JsonPullCodec::default(), - vec![(BidQuoteProtocol, ProtocolSupport::Inbound)], - RequestResponseConfig::default(), - ) -} - /// Constructs a new instance of the `quote` behaviour to be used by Bob. /// /// Bob only supports outbound connections, i.e. requesting quotes. @@ -60,16 +55,6 @@ pub fn bob() -> Behaviour { ) } -impl From<(PeerId, Message)> for alice::OutEvent { - fn from((peer, message): (PeerId, Message)) -> Self { - match message { - Message::Request { channel, .. } => Self::QuoteRequested { channel, peer }, - Message::Response { .. } => Self::unexpected_response(peer), - } - } -} -crate::impl_from_rr_event!(OutEvent, alice::OutEvent, PROTOCOL); - impl From<(PeerId, Message)> for bob::OutEvent { fn from((peer, message): (PeerId, Message)) -> Self { match message { @@ -77,7 +62,7 @@ impl From<(PeerId, Message)> for bob::OutEvent { Message::Response { response, request_id, - } => Self::QuoteReceived { + } => Self::QuoteResponse { id: request_id, response, }, diff --git a/swap/src/network/swarm.rs b/swap/src/network/swarm.rs index 37e9f219..cd053063 100644 --- a/swap/src/network/swarm.rs +++ b/swap/src/network/swarm.rs @@ -19,7 +19,7 @@ pub fn asb( env_config: env::Config, ) -> Result>> where - LR: LatestRate + Send + 'static + Debug, + LR: LatestRate + Clone + Send + 'static + Debug, { let behaviour = alice::Behaviour::new( balance, diff --git a/swap/src/protocol/alice.rs b/swap/src/protocol/alice.rs index 9d5394ba..f5ff4d61 100644 --- a/swap/src/protocol/alice.rs +++ b/swap/src/protocol/alice.rs @@ -24,6 +24,7 @@ mod recovery; mod spot_price; pub mod state; pub mod swap; +pub mod quote; pub struct Swap { pub state: AliceState, diff --git a/swap/src/protocol/alice/behaviour.rs b/swap/src/protocol/alice/behaviour.rs index ce7334b9..f5790c6a 100644 --- a/swap/src/protocol/alice/behaviour.rs +++ b/swap/src/protocol/alice/behaviour.rs @@ -1,13 +1,14 @@ -use crate::network::quote::BidQuote; -use crate::network::{encrypted_signature, quote, transfer_proof}; +use crate::network::{encrypted_signature, transfer_proof}; use crate::protocol::alice::event_loop::LatestRate; -use crate::protocol::alice::{execution_setup, spot_price, State3}; +use crate::protocol::alice::{execution_setup, spot_price, State3, quote}; use crate::{env, monero}; use anyhow::{anyhow, Error}; use libp2p::ping::{Ping, PingEvent}; use libp2p::request_response::{RequestId, ResponseChannel}; use libp2p::{NetworkBehaviour, PeerId}; use uuid::Uuid; +use crate::network::quote::BidQuote; +use crate::protocol::alice; #[derive(Debug)] pub enum OutEvent { @@ -20,9 +21,13 @@ pub enum OutEvent { btc: bitcoin::Amount, xmr: monero::Amount, }, - QuoteRequested { - channel: ResponseChannel, + QuoteSent { peer: PeerId, + quote: BidQuote + }, + QuoteError { + peer: PeerId, + error: alice::quote::Error }, ExecutionSetupDone { bob_peer_id: PeerId, @@ -71,7 +76,7 @@ pub struct Behaviour where LR: LatestRate + Send + 'static, { - pub quote: quote::Behaviour, + pub quote: quote::Behaviour, pub spot_price: spot_price::Behaviour, pub execution_setup: execution_setup::Behaviour, pub transfer_proof: transfer_proof::Behaviour, @@ -85,7 +90,7 @@ where impl Behaviour where - LR: LatestRate + Send + 'static, + LR: LatestRate + Clone + Send + 'static, { pub fn new( balance: monero::Amount, @@ -97,7 +102,7 @@ where env_config: env::Config, ) -> Self { Self { - quote: quote::alice(), + quote: alice::quote::Behaviour::new(balance, lock_fee, max_buy, latest_rate.clone(), resume_only), spot_price: spot_price::Behaviour::new( balance, lock_fee, diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 49f32d73..eb4b19c2 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -1,7 +1,6 @@ use crate::asb::Rate; use crate::database::Database; use crate::env::Config; -use crate::network::quote::BidQuote; use crate::network::transfer_proof; use crate::protocol::alice::spot_price::Error; use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State0, State3, Swap}; @@ -21,6 +20,7 @@ use std::fmt::Debug; use std::sync::Arc; use tokio::sync::mpsc; use uuid::Uuid; +use crate::network::quote::BidQuote; /// A future that resolves to a tuple of `PeerId`, `transfer_proof::Request` and /// `Responder`. @@ -222,29 +222,52 @@ where } } } - SwarmEvent::Behaviour(OutEvent::QuoteRequested { channel, peer }) => { - // TODO: Move the spot-price update into dedicated update stream to decouple it from quote requests - let current_balance = self.monero_wallet.get_balance().await; - match current_balance { - Ok(balance) => { - self.swarm.behaviour_mut().spot_price.update_balance(balance); - } - Err(e) => { - tracing::error!("Failed to fetch Monero balance: {:#}", e); - } - } - - let quote = match self.make_quote(self.min_buy, self.max_buy).await { - Ok(quote) => quote, - Err(error) => { - tracing::warn!(%peer, "Failed to make quote. Error {:#}", error); - continue; - } - }; - - if self.swarm.behaviour_mut().quote.send_response(channel, quote).is_err() { - tracing::debug!(%peer, "Failed to respond with quote"); - } + SwarmEvent::Behaviour(OutEvent::QuoteSent { peer, quote }) => { + // TODO: Move the balance for quote/spot-price update into dedicated update stream to decouple it from quote requests + + // TODO: move commented code into quote behaviour + + // let current_balance = self.monero_wallet.get_balance().await; + // match current_balance { + // Ok(balance) => { + // self.swarm.behaviour_mut().spot_price.update_balance(balance); + // } + // Err(e) => { + // tracing::error!("Failed to fetch Monero balance: {:#}", e); + // } + // } + // + // let rate = match self.latest_rate.latest_rate() { + // Ok(rate) => rate, + // Err(e) => { + // self.decline(peer, channel, Error::LatestRateFetchFailed(Box::new(e))); + // return; + // } + // }; + // let xmr = match rate.sell_quote(self.max_buy) { + // Ok(xmr) => xmr, + // Err(e) => { + // self.decline(peer, channel, Error::SellQuoteCalculationFailed(e)); + // return; + // } + // }; + // + // // TODO: Lock fee should be taken into account here too... + // if current_balance < xmr { + // + // } + // + // let quote = match self.make_quote(self.min_buy, self.max_buy).await { + // Ok(quote) => quote, + // Err(error) => { + // tracing::warn!(%peer, "Failed to make quote. Error {:#}", error); + // continue; + // } + // }; + // + // if self.swarm.behaviour_mut().quote.send_response(channel, quote::Response::Quote(quote)).is_err() { + // tracing::debug!(%peer, "Failed to respond with quote"); + // } } SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone{bob_peer_id, swap_id, state3}) => { let _ = self.handle_execution_setup_done(bob_peer_id, swap_id, *state3).await; diff --git a/swap/src/protocol/alice/quote.rs b/swap/src/protocol/alice/quote.rs new file mode 100644 index 00000000..6426beb5 --- /dev/null +++ b/swap/src/protocol/alice/quote.rs @@ -0,0 +1,153 @@ +use libp2p::{NetworkBehaviour, PeerId}; +use crate::network::quote::{BidQuote, BidQuoteProtocol}; +use crate::protocol::alice::event_loop::LatestRate; +use std::collections::VecDeque; +use libp2p::request_response::{RequestResponseConfig, ProtocolSupport, ResponseChannel}; +use crate::network::json_pull_codec::JsonPullCodec; +use crate::monero; +use std::task::{Context, Poll}; +use libp2p::swarm::{PollParameters, NetworkBehaviourAction, NetworkBehaviourEventProcess}; +use crate::protocol::alice; +use crate::network::quote; + +#[derive(Debug)] +pub enum OutEvent { + QuoteSent { + peer: PeerId, + quote: BidQuote + }, + Error { + peer: PeerId, + error: Error, + }, +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("ASB is running in resume-only mode")] + FailedToCreateQuote, + #[error("Balance is {xmr_balance} which is insufficient to fulfill max buy of {max_buy} at price {price}")] + InsufficientFunds { + max_buy: bitcoin::Amount, + price: bitcoin::Amount, + xmr_balance: monero::Amount + }, +} + +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "OutEvent", poll_method = "poll", event_process = true)] +#[allow(missing_debug_implementations)] +pub struct Behaviour + where + LR: LatestRate + Send + 'static, +{ + behaviour: quote::Behaviour, + + #[behaviour(ignore)] + events: VecDeque, + + #[behaviour(ignore)] + balance: monero::Amount, + #[behaviour(ignore)] + lock_fee: monero::Amount, + #[behaviour(ignore)] + max_buy: bitcoin::Amount, + #[behaviour(ignore)] + latest_rate: LR, + #[behaviour(ignore)] + resume_only: bool, +} + +/// Behaviour that handles quotes +/// All the logic how to react to a quote request is contained here, events +/// reporting the successful handling of a quote request or a failure are +/// bubbled up to the parent behaviour. +impl Behaviour + where + LR: LatestRate + Send + 'static, +{ + pub fn new( + balance: monero::Amount, + lock_fee: monero::Amount, + max_buy: bitcoin::Amount, + latest_rate: LR, + resume_only: bool, + ) -> Self { + Self { + behaviour: quote::Behaviour::new( + JsonPullCodec::default(), + vec![(BidQuoteProtocol, ProtocolSupport::Inbound)], + RequestResponseConfig::default(), + ), + events: Default::default(), + balance, + lock_fee, + max_buy, + latest_rate, + resume_only, + } + } + + pub fn update_balance(&mut self, balance: monero::Amount) { + self.balance = balance; + } + + fn decline( + &mut self, + peer: PeerId, + channel: ResponseChannel, + error: Error, + ) { + if self + .behaviour + .send_response( + channel, + quote::Response::Error, + ) + .is_err() + { + tracing::debug!(%peer, "Unable to send error response for quote request"); + } + + self.events.push_back(OutEvent::Error { peer, error }); + } + + fn poll( + &mut self, + _cx: &mut Context<'_>, + _params: &mut impl PollParameters, + ) -> Poll> { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + + // We trust in libp2p to poll us. + Poll::Pending + } +} + +impl NetworkBehaviourEventProcess for Behaviour + where + LR: LatestRate + Send + 'static, +{ + fn inject_event(&mut self, event: quote::OutEvent) { + + // TODO: Move the quote from the event loop into here + + todo!() + } +} + +impl From for alice::OutEvent { + fn from(event: OutEvent) -> Self { + match event { + OutEvent::QuoteSent { peer, quote } => { + Self::QuoteSent { peer, quote } + } + OutEvent::Error { peer, error } => Self::QuoteError { peer, error }, + } + } +} + + +// TODO: Add tests similar to spot price diff --git a/swap/src/protocol/bob/behaviour.rs b/swap/src/protocol/bob/behaviour.rs index 8f156e1b..f1bd72ab 100644 --- a/swap/src/protocol/bob/behaviour.rs +++ b/swap/src/protocol/bob/behaviour.rs @@ -1,4 +1,3 @@ -use crate::network::quote::BidQuote; use crate::network::{encrypted_signature, quote, redial, spot_price, transfer_proof}; use crate::protocol::bob; use crate::protocol::bob::{execution_setup, State2}; @@ -11,9 +10,9 @@ use std::time::Duration; #[derive(Debug)] pub enum OutEvent { - QuoteReceived { + QuoteResponse { id: RequestId, - response: BidQuote, + response: quote::Response, }, SpotPriceReceived { id: RequestId, diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index 4fe347bf..d7bf6212 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -1,5 +1,6 @@ use crate::bitcoin::EncryptedSignature; use crate::network::quote::BidQuote; +use crate::network::quote; use crate::network::spot_price::{BlockchainNetwork, Response}; use crate::network::{encrypted_signature, spot_price}; use crate::protocol::bob; @@ -111,9 +112,16 @@ impl EventLoop { let _ = responder.respond(response); } } - SwarmEvent::Behaviour(OutEvent::QuoteReceived { id, response }) => { - if let Some(responder) = self.inflight_quote_requests.remove(&id) { - let _ = responder.respond(response); + SwarmEvent::Behaviour(OutEvent::QuoteResponse { id, response }) => { + match response { + quote::Response::Quote(bid_quote) => { + if let Some(responder) = self.inflight_quote_requests.remove(&id) { + let _ = responder.respond(bid_quote); + } + } + quote::Response::Error => { + tracing::warn!("This ASB currently does not accept trades"); + } } } SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone(response)) => {