Remove acknowledgements processing

We are aware of issues of timeouts when waiting for acknowledgements.
Also, to properly supports acks in a multiple swap context, we need to
revert to doing event processing on the behaviour so that we can link
leverage the `RequestResponse` libp2p behaviour and link the messages
requests ids to swap ids when receiving an ack or response.

Acks are usefully for specific scenarios where we queue a message on the
behaviour to be sent, save as sent in the DB but crash before the
message is actually sent. With acks we are able to resume the swap,
without ack, the swap will abort (refund).
pull/177/head
Franck Royer 4 years ago
parent cc8b855117
commit bfc19d5628
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4

@ -1,5 +1,4 @@
use crate::{ use crate::{
execution_params::ExecutionParams,
network::{transport, TokioExecutor}, network::{transport, TokioExecutor},
protocol::{ protocol::{
alice::{ alice::{
@ -13,10 +12,7 @@ use anyhow::{Context, Result};
use libp2p::{ use libp2p::{
core::Multiaddr, futures::FutureExt, request_response::ResponseChannel, PeerId, Swarm, core::Multiaddr, futures::FutureExt, request_response::ResponseChannel, PeerId, Swarm,
}; };
use tokio::{ use tokio::sync::{broadcast, mpsc};
sync::{broadcast, mpsc},
time::timeout,
};
use tracing::{debug, error, trace}; use tracing::{debug, error, trace};
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
@ -55,7 +51,6 @@ where
pub struct EventLoopHandle { pub struct EventLoopHandle {
recv_encrypted_signature: broadcast::Receiver<EncryptedSignature>, recv_encrypted_signature: broadcast::Receiver<EncryptedSignature>,
send_transfer_proof: mpsc::Sender<(PeerId, TransferProof)>, send_transfer_proof: mpsc::Sender<(PeerId, TransferProof)>,
recv_transfer_proof_ack: broadcast::Receiver<()>,
} }
impl EventLoopHandle { impl EventLoopHandle {
@ -65,27 +60,9 @@ impl EventLoopHandle {
.await .await
.context("Failed to receive Bitcoin encrypted signature from Bob") .context("Failed to receive Bitcoin encrypted signature from Bob")
} }
pub async fn send_transfer_proof( pub async fn send_transfer_proof(&mut self, bob: PeerId, msg: TransferProof) -> Result<()> {
&mut self,
bob: PeerId,
msg: TransferProof,
execution_params: ExecutionParams,
) -> Result<()> {
let _ = self.send_transfer_proof.send((bob, msg)).await?; let _ = self.send_transfer_proof.send((bob, msg)).await?;
// TODO: Re-evaluate if these acknowledges are necessary at all.
// If we don't use a timeout here and Alice fails to dial Bob she will wait
// indefinitely for this acknowledge.
if timeout(
execution_params.bob_time_to_act,
self.recv_transfer_proof_ack.recv(),
)
.await
.is_err()
{
error!("Failed to receive transfer proof ack from Bob")
}
Ok(()) Ok(())
} }
} }
@ -95,7 +72,6 @@ pub struct EventLoop {
swarm: libp2p::Swarm<Behaviour>, swarm: libp2p::Swarm<Behaviour>,
recv_encrypted_signature: broadcast::Sender<EncryptedSignature>, recv_encrypted_signature: broadcast::Sender<EncryptedSignature>,
send_transfer_proof: mpsc::Receiver<(PeerId, TransferProof)>, send_transfer_proof: mpsc::Receiver<(PeerId, TransferProof)>,
recv_transfer_proof_ack: broadcast::Sender<()>,
// Only used to clone further handles // Only used to clone further handles
handle: EventLoopHandle, handle: EventLoopHandle,
@ -121,26 +97,22 @@ impl EventLoop {
let recv_encrypted_signature = BroadcastChannels::default(); let recv_encrypted_signature = BroadcastChannels::default();
let send_transfer_proof = MpscChannels::default(); let send_transfer_proof = MpscChannels::default();
let recv_transfer_proof_ack = BroadcastChannels::default();
let handle_clone = EventLoopHandle { let handle_clone = EventLoopHandle {
recv_encrypted_signature: recv_encrypted_signature.sender.subscribe(), recv_encrypted_signature: recv_encrypted_signature.sender.subscribe(),
send_transfer_proof: send_transfer_proof.sender.clone(), send_transfer_proof: send_transfer_proof.sender.clone(),
recv_transfer_proof_ack: recv_transfer_proof_ack.sender.subscribe(),
}; };
let driver = EventLoop { let driver = EventLoop {
swarm, swarm,
recv_encrypted_signature: recv_encrypted_signature.sender, recv_encrypted_signature: recv_encrypted_signature.sender,
send_transfer_proof: send_transfer_proof.receiver, send_transfer_proof: send_transfer_proof.receiver,
recv_transfer_proof_ack: recv_transfer_proof_ack.sender,
handle: handle_clone, handle: handle_clone,
}; };
let handle = EventLoopHandle { let handle = EventLoopHandle {
recv_encrypted_signature: recv_encrypted_signature.receiver, recv_encrypted_signature: recv_encrypted_signature.receiver,
send_transfer_proof: send_transfer_proof.sender, send_transfer_proof: send_transfer_proof.sender,
recv_transfer_proof_ack: recv_transfer_proof_ack.receiver,
}; };
Ok((driver, handle)) Ok((driver, handle))
@ -150,7 +122,6 @@ impl EventLoop {
EventLoopHandle { EventLoopHandle {
recv_encrypted_signature: self.recv_encrypted_signature.subscribe(), recv_encrypted_signature: self.recv_encrypted_signature.subscribe(),
send_transfer_proof: self.handle.send_transfer_proof.clone(), send_transfer_proof: self.handle.send_transfer_proof.clone(),
recv_transfer_proof_ack: self.recv_transfer_proof_ack.subscribe(),
} }
} }
@ -170,7 +141,6 @@ impl EventLoop {
} }
OutEvent::TransferProofAcknowledged => { OutEvent::TransferProofAcknowledged => {
trace!("Bob acknowledged transfer proof"); trace!("Bob acknowledged transfer proof");
let _ = self.recv_transfer_proof_ack.send(());
} }
OutEvent::EncryptedSignature{ msg, channel } => { OutEvent::EncryptedSignature{ msg, channel } => {
let _ = self.recv_encrypted_signature.send(*msg); let _ = self.recv_encrypted_signature.send(*msg);

@ -60,7 +60,6 @@ pub async fn lock_xmr<W>(
state3: alice::State3, state3: alice::State3,
event_loop_handle: &mut EventLoopHandle, event_loop_handle: &mut EventLoopHandle,
monero_wallet: Arc<W>, monero_wallet: Arc<W>,
execution_params: ExecutionParams,
) -> Result<()> ) -> Result<()>
where where
W: Transfer, W: Transfer,
@ -82,13 +81,9 @@ where
// Otherwise Alice might publish the lock tx twice! // Otherwise Alice might publish the lock tx twice!
event_loop_handle event_loop_handle
.send_transfer_proof( .send_transfer_proof(bob_peer_id, TransferProof {
bob_peer_id, tx_lock_proof: transfer_proof,
TransferProof { })
tx_lock_proof: transfer_proof,
},
execution_params,
)
.await?; .await?;
Ok(()) Ok(())

@ -157,7 +157,6 @@ async fn run_until_internal(
*state3.clone(), *state3.clone(),
&mut event_loop_handle, &mut event_loop_handle,
monero_wallet.clone(), monero_wallet.clone(),
execution_params,
) )
.await?; .await?;

@ -43,7 +43,6 @@ pub struct EventLoopHandle {
dial_alice: Sender<()>, dial_alice: Sender<()>,
send_swap_request: Sender<SwapRequest>, send_swap_request: Sender<SwapRequest>,
send_encrypted_signature: Sender<EncryptedSignature>, send_encrypted_signature: Sender<EncryptedSignature>,
recv_encrypted_signature_ack: Receiver<()>,
} }
impl EventLoopHandle { impl EventLoopHandle {
@ -95,10 +94,6 @@ impl EventLoopHandle {
) -> Result<()> { ) -> Result<()> {
self.send_encrypted_signature.send(tx_redeem_encsig).await?; self.send_encrypted_signature.send(tx_redeem_encsig).await?;
self.recv_encrypted_signature_ack
.recv()
.await
.ok_or_else(|| anyhow!("Failed to receive encrypted signature ack from Alice"))?;
Ok(()) Ok(())
} }
} }
@ -116,7 +111,6 @@ pub struct EventLoop {
conn_established: Sender<PeerId>, conn_established: Sender<PeerId>,
send_swap_request: Receiver<SwapRequest>, send_swap_request: Receiver<SwapRequest>,
send_encrypted_signature: Receiver<EncryptedSignature>, send_encrypted_signature: Receiver<EncryptedSignature>,
recv_encrypted_signature_ack: Sender<()>,
} }
impl EventLoop { impl EventLoop {
@ -144,7 +138,6 @@ impl EventLoop {
let conn_established = Channels::new(); let conn_established = Channels::new();
let send_swap_request = Channels::new(); let send_swap_request = Channels::new();
let send_encrypted_signature = Channels::new(); let send_encrypted_signature = Channels::new();
let recv_encrypted_signature_ack = Channels::new();
let event_loop = EventLoop { let event_loop = EventLoop {
swarm, swarm,
@ -158,7 +151,6 @@ impl EventLoop {
dial_alice: dial_alice.receiver, dial_alice: dial_alice.receiver,
send_swap_request: send_swap_request.receiver, send_swap_request: send_swap_request.receiver,
send_encrypted_signature: send_encrypted_signature.receiver, send_encrypted_signature: send_encrypted_signature.receiver,
recv_encrypted_signature_ack: recv_encrypted_signature_ack.sender,
}; };
let handle = EventLoopHandle { let handle = EventLoopHandle {
@ -170,7 +162,6 @@ impl EventLoop {
dial_alice: dial_alice.sender, dial_alice: dial_alice.sender,
send_swap_request: send_swap_request.sender, send_swap_request: send_swap_request.sender,
send_encrypted_signature: send_encrypted_signature.sender, send_encrypted_signature: send_encrypted_signature.sender,
recv_encrypted_signature_ack: recv_encrypted_signature_ack.receiver,
}; };
Ok((event_loop, handle)) Ok((event_loop, handle))
@ -199,7 +190,6 @@ impl EventLoop {
} }
OutEvent::EncryptedSignatureAcknowledged => { OutEvent::EncryptedSignatureAcknowledged => {
debug!("Alice acknowledged encrypted signature"); debug!("Alice acknowledged encrypted signature");
let _ = self.recv_encrypted_signature_ack.send(()).await;
} }
OutEvent::ResponseSent => {} OutEvent::ResponseSent => {}
OutEvent::Failure(err) => { OutEvent::Failure(err) => {

Loading…
Cancel
Save