@ -3,27 +3,46 @@ use crate::network::quote::BidQuote;
use crate ::network ::{ encrypted_signature , spot_price , transfer_proof } ;
use crate ::protocol ::bob ::{ Behaviour , OutEvent , State0 , State2 } ;
use crate ::{ bitcoin , monero } ;
use anyhow ::{ anyhow , Result } ;
use futures ::FutureExt ;
use anyhow ::{ Context , Result } ;
use futures ::future ::{ BoxFuture , OptionFuture } ;
use futures ::{ FutureExt , StreamExt } ;
use libp2p ::request_response ::{ RequestId , ResponseChannel } ;
use libp2p ::swarm ::SwarmEvent ;
use libp2p ::{ PeerId , Swarm } ;
use std ::collections ::HashMap ;
use std ::sync ::Arc ;
use tokio ::sync ::mpsc ::{ Receiver , Sender } ;
use tracing ::{ debug , error } ;
use std ::time ::Duration ;
#[ allow(missing_debug_implementations) ]
pub struct EventLoop {
swarm : libp2p ::Swarm < Behaviour > ,
bitcoin_wallet : Arc < bitcoin ::Wallet > ,
alice_peer_id : PeerId ,
request_spot_price : Receiver < spot_price ::Request > ,
recv_spot_price : Sender < spot_price ::Response > ,
start_execution_setup : Receiver < State0 > ,
done_execution_setup : Sender < Result < State2 > > ,
recv_transfer_proof : Sender < transfer_proof ::Request > ,
send_encrypted_signature : Receiver < encrypted_signature ::Request > ,
request_quote : Receiver < ( ) > ,
recv_quote : Sender < BidQuote > ,
// these streams represents outgoing requests that we have to make
quote_requests : bmrng ::RequestReceiverStream < ( ) , BidQuote > ,
spot_price_requests : bmrng ::RequestReceiverStream < spot_price ::Request , spot_price ::Response > ,
encrypted_signature_requests : bmrng ::RequestReceiverStream < encrypted_signature ::Request , ( ) > ,
execution_setup_requests : bmrng ::RequestReceiverStream < State0 , Result < State2 > > ,
// these represents requests that are currently in-flight.
// once we get a response to a matching [`RequestId`], we will use the responder to relay the
// response.
inflight_spot_price_requests : HashMap < RequestId , bmrng ::Responder < spot_price ::Response > > ,
inflight_quote_requests : HashMap < RequestId , bmrng ::Responder < BidQuote > > ,
inflight_encrypted_signature_requests : HashMap < RequestId , bmrng ::Responder < ( ) > > ,
inflight_execution_setup : Option < bmrng ::Responder < Result < State2 > > > ,
/// The sender we will use to relay incoming transfer proofs.
transfer_proof : bmrng ::RequestSender < transfer_proof ::Request , ( ) > ,
/// The future representing the successful handling of an incoming transfer
/// proof.
///
/// Once we've sent a transfer proof to the ongoing swap, this future waits
/// until the swap took it "out" of the `EventLoopHandle`. As this future
/// resolves, we use the `ResponseChannel` returned from it to send an ACK
/// to Alice that we have successfully processed the transfer proof.
pending_transfer_proof : OptionFuture < BoxFuture < ' static , ResponseChannel < ( ) > > > ,
}
impl EventLoop {
@ -32,38 +51,34 @@ impl EventLoop {
alice_peer_id : PeerId ,
bitcoin_wallet : Arc < bitcoin ::Wallet > ,
) -> Result < ( Self , EventLoopHandle ) > {
let start_execution_setup = Channels ::new ( ) ;
let done_execution_setup = Channels ::new ( ) ;
let recv_transfer_proof = Channels ::new ( ) ;
let send_encrypted_signature = Channels ::new ( ) ;
let request_spot_price = Channels ::new ( ) ;
let recv_spot_price = Channels ::new ( ) ;
let request_quote = Channels ::new ( ) ;
let recv_quote = Channels ::new ( ) ;
let execution_setup = bmrng ::channel_with_timeout ( 1 , Duration ::from_secs ( 30 ) ) ;
let transfer_proof = bmrng ::channel_with_timeout ( 1 , Duration ::from_secs ( 30 ) ) ;
let encrypted_signature = bmrng ::channel_with_timeout ( 1 , Duration ::from_secs ( 30 ) ) ;
let spot_price = bmrng ::channel_with_timeout ( 1 , Duration ::from_secs ( 30 ) ) ;
let quote = bmrng ::channel_with_timeout ( 1 , Duration ::from_secs ( 30 ) ) ;
let event_loop = EventLoop {
swarm ,
alice_peer_id ,
bitcoin_wallet ,
start_execution_setup : start_execution_setup . receiver ,
done_execution_setup : done_execution_setup . sender ,
recv_transfer_proof : recv_transfer_proof . sender ,
send_encrypted_signature : send_encrypted_signature . receiver ,
request_spot_price : request_spot_price . receiver ,
recv_spot_price : recv_spot_price . sender ,
request_quote : request_quote . receiver ,
recv_quote : recv_quote . sender ,
execution_setup_requests : execution_setup . 1. into ( ) ,
transfer_proof : transfer_proof . 0 ,
encrypted_signature_requests : encrypted_signature . 1. into ( ) ,
spot_price_requests : spot_price . 1. into ( ) ,
quote_requests : quote . 1. into ( ) ,
inflight_spot_price_requests : HashMap ::default ( ) ,
inflight_quote_requests : HashMap ::default ( ) ,
inflight_execution_setup : None ,
inflight_encrypted_signature_requests : HashMap ::default ( ) ,
pending_transfer_proof : OptionFuture ::from ( None ) ,
} ;
let handle = EventLoopHandle {
start_execution_setup : start_execution_setup . sender ,
done_execution_setup : done_execution_setup . receiver ,
recv_transfer_proof : recv_transfer_proof . receiver ,
send_encrypted_signature : send_encrypted_signature . sender ,
request_spot_price : request_spot_price . sender ,
recv_spot_price : recv_spot_price . receiver ,
request_quote : request_quote . sender ,
recv_quote : recv_quote . receiver ,
execution_setup : execution_setup . 0 ,
transfer_proof : transfer_proof . 1 ,
encrypted_signature : encrypted_signature . 0 ,
spot_price : spot_price . 0 ,
quote : quote . 0 ,
} ;
Ok ( ( event_loop , handle ) )
@ -76,24 +91,40 @@ impl EventLoop {
tokio ::select ! {
swarm_event = self . swarm . next_event ( ) . fuse ( ) = > {
match swarm_event {
SwarmEvent ::Behaviour ( OutEvent ::SpotPriceReceived ( msg ) ) = > {
let _ = self . recv_spot_price . send ( msg ) . await ;
}
SwarmEvent ::Behaviour ( OutEvent ::QuoteReceived ( msg ) ) = > {
let _ = self . recv_quote . send ( msg ) . await ;
SwarmEvent ::Behaviour ( OutEvent ::SpotPriceReceived { id , response } ) = > {
if let Some ( responder ) = self . inflight_spot_price_requests . remove ( & id ) {
let _ = responder . respond ( response ) ;
}
}
SwarmEvent ::Behaviour ( OutEvent ::ExecutionSetupDone ( res ) ) = > {
let _ = self . done_execution_setup . send ( res . map ( | state | * state ) ) . await ;
SwarmEvent ::Behaviour ( OutEvent ::QuoteReceived { id , response } ) = > {
if let Some ( responder ) = self . inflight_quote_requests . remove ( & id ) {
let _ = responder . respond ( response ) ;
}
}
SwarmEvent ::Behaviour ( OutEvent ::TransferProofReceived { msg , channel } ) = > {
let _ = self . recv_transfer_proof . send ( * msg ) . await ;
// Send back empty response so that the request/response protocol completes.
if let Err ( error ) = self . swarm . transfer_proof . send_response ( channel , ( ) ) {
error ! ( "Failed to send Transfer Proof ack: {:?}" , error ) ;
SwarmEvent ::Behaviour ( OutEvent ::ExecutionSetupDone ( response ) ) = > {
if let Some ( responder ) = self . inflight_execution_setup . take ( ) {
let _ = responder . respond ( * response ) ;
}
}
SwarmEvent ::Behaviour ( OutEvent ::EncryptedSignatureAcknowledged ) = > {
debug ! ( "Alice acknowledged encrypted signature" ) ;
SwarmEvent ::Behaviour ( OutEvent ::TransferProofReceived { msg , channel } ) = > {
let mut responder = match self . transfer_proof . send ( * msg ) . await {
Ok ( responder ) = > responder ,
Err ( _ ) = > {
tracing ::warn ! ( "Failed to pass on transfer proof" ) ;
continue ;
}
} ;
self . pending_transfer_proof = OptionFuture ::from ( Some ( async move {
let _ = responder . recv ( ) . await ;
channel
} . boxed ( ) ) ) ;
}
SwarmEvent ::Behaviour ( OutEvent ::EncryptedSignatureAcknowledged { id } ) = > {
if let Some ( responder ) = self . inflight_encrypted_signature_requests . remove ( & id ) {
let _ = responder . respond ( ( ) ) ;
}
}
SwarmEvent ::Behaviour ( OutEvent ::ResponseSent ) = > {
@ -133,27 +164,26 @@ impl EventLoop {
_ = > { }
}
} ,
spot_price_request = self . request_spot_price . recv ( ) . fuse ( ) = > {
if let Some ( request ) = spot_price_request {
self . swarm . spot_price . send_request ( & self . alice_peer_id , request ) ;
}
Some ( ( request , responder ) ) = self . spot_price_requests . next ( ) . fuse ( ) = > {
let id = self . swarm . spot_price . send_request ( & self . alice_peer_id , request ) ;
self . inflight_spot_price_requests . insert ( id , responder ) ;
} ,
quote_request = self . request_quote . recv ( ) . fuse ( ) = > {
if quote_request . is_some ( ) {
self . swarm . quote . send_request ( & self . alice_peer_id , ( ) ) ;
}
Some ( ( ( ) , responder ) ) = self . quote_requests . next ( ) . fuse ( ) = > {
let id = self . swarm . quote . send_request ( & self . alice_peer_id , ( ) ) ;
self . inflight_quote_requests . insert ( id , responder ) ;
} ,
option = self . start_execution_setup . recv ( ) . fuse ( ) = > {
if let Some ( state0 ) = option {
let _ = self
. swarm
. execution_setup . run ( self . alice_peer_id , state0 , self . bitcoin_wallet . clone ( ) ) ;
}
Some ( ( request , responder ) ) = self . execution_setup_requests . next ( ) . fuse ( ) = > {
self . swarm . execution_setup . run ( self . alice_peer_id , request , self . bitcoin_wallet . clone ( ) ) ;
self . inflight_execution_setup = Some ( responder ) ;
} ,
encrypted_signature = self . send_encrypted_signature . recv ( ) . fuse ( ) = > {
if let Some ( tx_redeem_encsig ) = encrypted_signature {
self . swarm . encrypted_signature . send_request ( & self . alice_peer_id , tx_redeem_encsig ) ;
}
Some ( ( request , responder ) ) = self . encrypted_signature_requests . next ( ) . fuse ( ) = > {
let id = self . swarm . encrypted_signature . send_request ( & self . alice_peer_id , request ) ;
self . inflight_encrypted_signature_requests . insert ( id , responder ) ;
} ,
Some ( response_channel ) = & mut self . pending_transfer_proof = > {
let _ = self . swarm . transfer_proof . send_response ( response_channel , ( ) ) ;
self . pending_transfer_proof = OptionFuture ::from ( None ) ;
}
}
}
@ -162,87 +192,50 @@ impl EventLoop {
#[ derive(Debug) ]
pub struct EventLoopHandle {
start_execution_setup : Sender < State0 > ,
done_execution_setup : Receiver < Result < State2 > > ,
recv_transfer_proof : Receiver < transfer_proof ::Request > ,
send_encrypted_signature : Sender < encrypted_signature ::Request > ,
request_spot_price : Sender < spot_price ::Request > ,
recv_spot_price : Receiver < spot_price ::Response > ,
request_quote : Sender < ( ) > ,
recv_quote : Receiver < BidQuote > ,
execution_setup : bmrng ::RequestSender < State0 , Result < State2 > > ,
transfer_proof : bmrng ::RequestReceiver < transfer_proof ::Request , ( ) > ,
encrypted_signature : bmrng ::RequestSender < encrypted_signature ::Request , ( ) > ,
spot_price : bmrng ::RequestSender < spot_price ::Request , spot_price ::Response > ,
quote : bmrng ::RequestSender < ( ) , BidQuote > ,
}
impl EventLoopHandle {
pub async fn execution_setup ( & mut self , state0 : State0 ) -> Result < State2 > {
let _ = self . start_execution_setup . send ( state0 ) . await ? ;
self . done_execution_setup
. recv ( )
. await
. ok_or_else ( | | anyhow ! ( "Failed to setup execution with Alice" ) ) ?
self . execution_setup . send_receive ( state0 ) . await ?
}
pub async fn recv_transfer_proof ( & mut self ) -> Result < transfer_proof ::Request > {
self . recv_transfer_proof
let ( request , responder ) = self
. transfer_proof
. recv ( )
. await
. ok_or_else ( | | anyhow ! ( "Failed to receive transfer proof from Alice" ) )
. context ( "Failed to receive transfer proof" ) ? ;
responder
. respond ( ( ) )
. context ( "Failed to acknowledge receipt of transfer proof" ) ? ;
Ok ( request )
}
pub async fn request_spot_price ( & mut self , btc : bitcoin ::Amount ) -> Result < monero ::Amount > {
let _ = self
. request_spot_price
. send ( spot_price ::Request { btc } )
. await ? ;
let response = self
. recv_spot_price
. recv ( )
. await
. ok_or_else ( | | anyhow ! ( "Failed to receive spot price from Alice" ) ) ? ;
Ok ( response . xmr )
Ok ( self
. spot_price
. send_receive ( spot_price ::Request { btc } )
. await ?
. xmr )
}
pub async fn request_quote ( & mut self ) -> Result < BidQuote > {
let _ = self . request_quote . send ( ( ) ) . await ? ;
let quote = self
. recv_quote
. recv ( )
. await
. ok_or_else ( | | anyhow ! ( "Failed to receive quote from Alice" ) ) ? ;
Ok ( quote )
Ok ( self . quote . send_receive ( ( ) ) . await ? )
}
pub async fn send_encrypted_signature (
& mut self ,
tx_redeem_encsig : EncryptedSignature ,
) -> Result < ( ) > {
self . send_encrypted_signature
. send ( encrypted_signature ::Request { tx_redeem_encsig } )
. await ? ;
Ok ( ( ) )
}
}
#[ derive(Debug) ]
struct Channels < T > {
sender : Sender < T > ,
receiver : Receiver < T > ,
}
impl < T > Channels < T > {
fn new ( ) -> Channels < T > {
let ( sender , receiver ) = tokio ::sync ::mpsc ::channel ( 100 ) ;
Channels { sender , receiver }
}
}
impl < T > Default for Channels < T > {
fn default ( ) -> Self {
Self ::new ( )
Ok ( self
. encrypted_signature
. send_receive ( encrypted_signature ::Request { tx_redeem_encsig } )
. await ? )
}
}