@ -1,121 +1,77 @@
use crate ::{
bitcoin ,
database ::Database ,
execution_params ::ExecutionParams ,
network ::{ transport ::SwapTransport , TokioExecutor } ,
monero , network ,
network ::{ transport , TokioExecutor } ,
protocol ::{
alice ::{ Behaviour , OutEvent , State0 , State3 , SwapResponse , TransferProof } ,
alice ,
alice ::{ Behaviour , Builder , OutEvent , State0 , State3 , SwapResponse , TransferProof } ,
bob ::{ EncryptedSignature , SwapRequest } ,
SwapAmounts ,
} ,
seed ::Seed ,
} ;
use anyhow ::{ anyhow , Context , Result } ;
use libp2p ::{
core ::Multiaddr , futures ::FutureExt , request_response ::ResponseChannel , PeerId , Swarm ,
} ;
use tokio ::{
sync ::mpsc ::{ Receiver , Sender } ,
time ::timeout ,
} ;
use tracing ::{ error , trace } ;
use rand ::rngs ::OsRng ;
use std ::{ collections ::HashMap , sync ::Arc } ;
use tokio ::sync ::{ broadcast , mpsc } ;
use tracing ::{ debug , error , trace } ;
use uuid ::Uuid ;
// TODO: Use dynamic
const RATE : u32 = 100 ;
#[ allow(missing_debug_implementations) ]
pub struct Channels < T > {
sender : Sender< T > ,
receiver : Receiver< T > ,
pub struct Mpsc Channels< T > {
sender : mpsc:: Sender< T > ,
receiver : mpsc:: Receiver< T > ,
}
impl < T > Channels< T > {
pub fn new ( ) -> Channels < T > {
let ( sender , receiver ) = tokio::sync :: mpsc::channel ( 100 ) ;
Channels { sender , receiver }
impl < T > Default for Mpsc Channels< T > {
fn default ( ) -> Self {
let ( sender , receiver ) = mpsc::channel ( 100 ) ;
Mpsc Channels { sender , receiver }
}
}
impl < T > Default for Channels < T > {
#[ allow(missing_debug_implementations) ]
pub struct BroadcastChannels < T >
where
T : Clone ,
{
sender : broadcast ::Sender < T > ,
}
impl < T > Default for BroadcastChannels < T >
where
T : Clone ,
{
fn default ( ) -> Self {
Self ::new ( )
let ( sender , _receiver ) = broadcast ::channel ( 100 ) ;
BroadcastChannels { sender }
}
}
#[ derive(Debug) ]
pub struct EventLoopHandle {
done_execution_setup : Receiver < Result < State3 > > ,
recv_encrypted_signature : Receiver < EncryptedSignature > ,
recv_swap_request : Receiver < ( SwapRequest , ResponseChannel < SwapResponse > ) > ,
conn_established : Receiver < PeerId > ,
send_swap_response : Sender < ( ResponseChannel < SwapResponse > , SwapResponse ) > ,
start_execution_setup : Sender < ( PeerId , State0 ) > ,
send_transfer_proof : Sender < ( PeerId , TransferProof ) > ,
recv_transfer_proof_ack : Receiver < ( ) > ,
recv_encrypted_signature : broadcast ::Receiver < EncryptedSignature > ,
send_transfer_proof : mpsc ::Sender < ( PeerId , TransferProof ) > ,
}
impl EventLoopHandle {
pub async fn recv_conn_established ( & mut self ) -> Result < PeerId > {
self . conn_established
. recv ( )
. await
. ok_or_else ( | | anyhow ! ( "Failed to receive connection established from Bob" ) )
}
pub async fn execution_setup ( & mut self , bob_peer_id : PeerId , state0 : State0 ) -> Result < State3 > {
let _ = self
. start_execution_setup
. send ( ( bob_peer_id , state0 ) )
. await ? ;
self . done_execution_setup
. recv ( )
. await
. ok_or_else ( | | anyhow ! ( "Failed to setup execution with Bob" ) ) ?
}
pub async fn recv_encrypted_signature ( & mut self ) -> Result < EncryptedSignature > {
self . recv_encrypted_signature
. recv ( )
. await
. ok_or_else( | | anyhow ! ( "Failed to receive Bitcoin encrypted signature from Bob" ) )
. context ( "Failed to receive Bitcoin encrypted signature from Bob" )
}
pub async fn recv_swap_request (
& mut self ,
) -> Result < ( SwapRequest , ResponseChannel < SwapResponse > ) > {
self . recv_swap_request
. recv ( )
. await
. ok_or_else ( | | anyhow ! ( "Failed to receive amounts request from Bob" ) )
}
pub async fn send_swap_response (
& mut self ,
channel : ResponseChannel < SwapResponse > ,
swap_response : SwapResponse ,
) -> Result < ( ) > {
let _ = self
. send_swap_response
. send ( ( channel , swap_response ) )
. await ? ;
Ok ( ( ) )
}
pub async fn send_transfer_proof (
& mut self ,
bob : PeerId ,
msg : TransferProof ,
execution_params : ExecutionParams ,
) -> Result < ( ) > {
pub async fn send_transfer_proof ( & mut self , bob : PeerId , msg : TransferProof ) -> Result < ( ) > {
let _ = self . send_transfer_proof . send ( ( bob , msg ) ) . await ? ;
// TODO: Re-evaluate if these acknowledges are necessary at all.
// If we don't use a timeout here and Alice fails to dial Bob she will wait
// indefinitely for this acknowledge.
if timeout (
execution_params . bob_time_to_act ,
self . recv_transfer_proof_ack . recv ( ) ,
)
. await
. is_err ( )
{
error ! ( "Failed to receive transfer proof ack from Bob" )
}
Ok ( ( ) )
}
}
@ -123,65 +79,74 @@ impl EventLoopHandle {
#[ allow(missing_debug_implementations) ]
pub struct EventLoop {
swarm : libp2p ::Swarm < Behaviour > ,
start_execution_setup : Receiver < ( PeerId , State0 ) > ,
done_execution_setup : Sender < Result < State3 > > ,
recv_encrypted_signature : Sender < EncryptedSignature > ,
recv_swap_request : Sender < ( SwapRequest , ResponseChannel < SwapResponse > ) > ,
conn_established : Sender < PeerId > ,
send_swap_response : Receiver < ( ResponseChannel < SwapResponse > , SwapResponse ) > ,
send_transfer_proof : Receiver < ( PeerId , TransferProof ) > ,
recv_transfer_proof_ack : Sender < ( ) > ,
peer_id : PeerId ,
execution_params : ExecutionParams ,
bitcoin_wallet : Arc < bitcoin ::Wallet > ,
monero_wallet : Arc < monero ::Wallet > ,
db : Arc < Database > ,
listen_address : Multiaddr ,
// Amounts agreed upon for swaps currently in the execution setup phase
// Note: We can do one execution setup per peer at a given time.
swap_amounts : HashMap < PeerId , SwapAmounts > ,
recv_encrypted_signature : broadcast ::Sender < EncryptedSignature > ,
send_transfer_proof : mpsc ::Receiver < ( PeerId , TransferProof ) > ,
// Only used to produce new handles
send_transfer_proof_sender : mpsc ::Sender < ( PeerId , TransferProof ) > ,
}
impl EventLoop {
pub fn new (
transport : SwapTransport ,
behaviour : Behaviour ,
listen : Multiaddr ,
peer_id : PeerId ,
) -> Result < ( Self , EventLoopHandle ) > {
listen_address : Multiaddr ,
seed : Seed ,
execution_params : ExecutionParams ,
bitcoin_wallet : Arc < bitcoin ::Wallet > ,
monero_wallet : Arc < monero ::Wallet > ,
db : Arc < Database > ,
) -> Result < Self > {
let identity = network ::Seed ::new ( seed ) . derive_libp2p_identity ( ) ;
let behaviour = Behaviour ::default ( ) ;
let transport = transport ::build ( & identity ) ? ;
let peer_id = PeerId ::from ( identity . public ( ) ) ;
let mut swarm = libp2p ::swarm ::SwarmBuilder ::new ( transport , behaviour , peer_id )
. executor ( Box ::new ( TokioExecutor {
handle : tokio ::runtime ::Handle ::current ( ) ,
} ) )
. build ( ) ;
Swarm ::listen_on ( & mut swarm , listen . clone ( ) )
. with_context ( | | format! ( "Address is not supported: {:#}" , listen ) ) ? ;
Swarm ::listen_on ( & mut swarm , listen _address . clone ( ) )
. with_context ( | | format! ( "Address is not supported: {:#}" , listen _address ) ) ? ;
let start_execution_setup = Channels ::new ( ) ;
let done_execution_setup = Channels ::new ( ) ;
let recv_encrypted_signature = Channels ::new ( ) ;
let request = Channels ::new ( ) ;
let conn_established = Channels ::new ( ) ;
let send_swap_response = Channels ::new ( ) ;
let send_transfer_proof = Channels ::new ( ) ;
let recv_transfer_proof_ack = Channels ::new ( ) ;
let recv_encrypted_signature = BroadcastChannels ::default ( ) ;
let send_transfer_proof = MpscChannels ::default ( ) ;
let driver = EventLoop {
Ok ( EventLoop {
swarm ,
start_execution_setup : start_execution_setup . receiver ,
done_execution_setup : done_execution_setup . sender ,
peer_id ,
execution_params ,
bitcoin_wallet ,
monero_wallet ,
db ,
listen_address ,
swap_amounts : Default ::default ( ) ,
recv_encrypted_signature : recv_encrypted_signature . sender ,
recv_swap_request : request . sender ,
conn_established : conn_established . sender ,
send_swap_response : send_swap_response . receiver ,
send_transfer_proof : send_transfer_proof . receiver ,
recv_transfer_proof_ack : recv_transfer_proof_ack . sender ,
} ;
let handle = EventLoopHandle {
start_execution_setup : start_execution_setup . sender ,
done_execution_setup : done_execution_setup . receiver ,
recv_encrypted_signature : recv_encrypted_signature . receiver ,
recv_swap_request : request . receiver ,
conn_established : conn_established . receiver ,
send_swap_response : send_swap_response . sender ,
send_transfer_proof : send_transfer_proof . sender ,
recv_transfer_proof_ack : recv_transfer_proof_ack . receiver ,
} ;
Ok ( ( driver , handle ) )
send_transfer_proof_sender : send_transfer_proof . sender ,
} )
}
pub fn new_handle ( & self ) -> EventLoopHandle {
EventLoopHandle {
recv_encrypted_signature : self . recv_encrypted_signature . subscribe ( ) ,
send_transfer_proof : self . send_transfer_proof_sender . clone ( ) ,
}
}
pub fn peer_id ( & self ) -> PeerId {
self . peer_id
}
pub async fn run ( & mut self ) {
@ -190,22 +155,21 @@ impl EventLoop {
swarm_event = self . swarm . next ( ) . fuse ( ) = > {
match swarm_event {
OutEvent ::ConnectionEstablished ( alice ) = > {
let _ = self . conn_established . send ( alice ) . await ;
debug ! ( "Connection Established with {}" , alice ) ;
}
OutEvent ::SwapRequest { msg , channel } = > {
let _ = self . recv_swap_request. send ( ( msg , channel ) ) . await ;
OutEvent ::SwapRequest { msg , channel , bob_peer_id } = > {
let _ = self . handle_swap_request( msg , channel , bob_peer_id ) . await ;
}
OutEvent ::ExecutionSetupDone (res ) = > {
let _ = self . done_execution_setup. send ( res . map ( | state | * state ) ) . await ;
OutEvent ::ExecutionSetupDone {bob_peer_id , state3 } = > {
let _ = self . handle_execution_setup_done( bob_peer_id , * state3 ) . await ;
}
OutEvent ::TransferProofAcknowledged = > {
trace ! ( "Bob acknowledged transfer proof" ) ;
let _ = self . recv_transfer_proof_ack . send ( ( ) ) . await ;
}
OutEvent ::EncryptedSignature { msg , channel } = > {
let _ = self . recv_encrypted_signature . send ( * msg ) .await ;
let _ = self . recv_encrypted_signature . send ( * msg ) ;
// Send back empty response so that the request/response protocol completes.
if let Err ( error ) = self . swarm . encrypted_signature. send _ack( channel ) {
if let Err ( error ) = self . swarm . send_ encrypted_signature_ack( channel ) {
error ! ( "Failed to send Encrypted Signature ack: {:?}" , error ) ;
}
}
@ -215,21 +179,6 @@ impl EventLoop {
}
}
} ,
swap_response = self . send_swap_response . recv ( ) . fuse ( ) = > {
if let Some ( ( channel , swap_response ) ) = swap_response {
let _ = self
. swarm
. send_swap_response ( channel , swap_response )
. map_err ( | err | error ! ( "Failed to send swap response: {:#}" , err ) ) ;
}
} ,
option = self . start_execution_setup . recv ( ) . fuse ( ) = > {
if let Some ( ( bob_peer_id , state0 ) ) = option {
let _ = self
. swarm
. start_execution_setup ( bob_peer_id , state0 ) ;
}
} ,
transfer_proof = self . send_transfer_proof . recv ( ) . fuse ( ) = > {
if let Some ( ( bob_peer_id , msg ) ) = transfer_proof {
self . swarm . send_transfer_proof ( bob_peer_id , msg ) ;
@ -238,4 +187,78 @@ impl EventLoop {
}
}
}
async fn handle_swap_request (
& mut self ,
swap_request : SwapRequest ,
channel : ResponseChannel < SwapResponse > ,
bob_peer_id : PeerId ,
) -> Result < ( ) > {
// 1. Check if acceptable request
// 2. Send response
let btc_amount = swap_request . btc_amount ;
let xmr_amount = btc_amount . as_btc ( ) * RATE as f64 ;
let xmr_amount = monero ::Amount ::from_monero ( xmr_amount ) ? ;
let swap_response = SwapResponse { xmr_amount } ;
self . swarm
. send_swap_response ( channel , swap_response )
. context ( "Failed to send swap response" ) ? ;
// 3. Start setup execution
let state0 = State0 ::new (
btc_amount ,
xmr_amount ,
self . execution_params ,
self . bitcoin_wallet . as_ref ( ) ,
& mut OsRng ,
)
. await ? ;
// if a node restart during execution setup, the swap is aborted (safely).
self . swap_amounts . insert ( bob_peer_id , SwapAmounts {
btc : btc_amount ,
xmr : xmr_amount ,
} ) ;
self . swarm . start_execution_setup ( bob_peer_id , state0 ) ;
// Continues once the execution setup protocol is done
Ok ( ( ) )
}
async fn handle_execution_setup_done (
& mut self ,
bob_peer_id : PeerId ,
state3 : State3 ,
) -> Result < ( ) > {
let swap_id = Uuid ::new_v4 ( ) ;
let handle = self . new_handle ( ) ;
let swap_amounts = self . swap_amounts . remove ( & bob_peer_id ) . ok_or_else ( | | {
anyhow ! (
"execution setup done for an unknown peer id: {}, node restarted in between?" ,
bob_peer_id
)
} ) ? ;
let swap = Builder ::new (
self . peer_id ,
self . execution_params ,
swap_id ,
self . bitcoin_wallet . clone ( ) ,
self . monero_wallet . clone ( ) ,
self . db . clone ( ) ,
self . listen_address . clone ( ) ,
handle ,
)
. with_init_params ( swap_amounts , bob_peer_id , state3 )
. build ( )
. await ? ;
tokio ::spawn ( async move { alice ::run ( swap ) . await } ) ;
Ok ( ( ) )
}
}