159: Introduce "one shots" r=D4nte a=D4nte

Instead of Alice sending messages to Bob by replying on a `RequestResponse` channel, she just acks reception on the channel and open her own channel to send messages to Bob.

This means we do not need to carry the channel around anymore and instead carry Bob's peer id.

We take this opportunity to rename messages as per the sequence diagram.

At this stage we only touch message 4 & 5 as they are the one shots messages of the protocol.

Decoupling message 4 from message 3 (still current called 2) will allow the introduction of an await/async behaviour that handles message 1, 2, 3.

To introduce message 1, 2, 3 we will introduced the async/await behaviour using https://github.com/comit-network/rust-libp2p-async-await

First, we need to upgrade tokio and libp2p.

Please note that thanks to this change, Alice now uses Bob's peer id to send message meaning that recovery is improved (as before we could not save the channel to the db).

Co-authored-by: Franck Royer <franck@coblox.tech>
pull/162/head
bors[bot] 3 years ago committed by GitHub
commit 73f89fefda
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -46,10 +46,10 @@ group Execution
Alice ->> Monero: Lock
Alice -> Bob: Message4
Alice -> Bob: TransferProof
note right: xmr lock tx transfer proof\nThis can be removed if Bob watches the blockchain.
Bob -> Alice: Message5
Bob -> Alice: EncryptedSignature
note left: redeem tx enc sig S_a
Alice ->> Bitcoin: Redeem

@ -5,6 +5,7 @@ use crate::{
protocol::{alice, alice::AliceState, SwapAmounts},
};
use ::bitcoin::hashes::core::fmt::Display;
use libp2p::PeerId;
use serde::{Deserialize, Serialize};
// Large enum variant is fine because this is only used for database
@ -16,8 +17,16 @@ pub enum Alice {
amounts: SwapAmounts,
state0: alice::State0,
},
Negotiated(alice::State3),
BtcLocked(alice::State3),
Negotiated {
state3: alice::State3,
#[serde(with = "crate::serde_peer_id")]
bob_peer_id: PeerId,
},
BtcLocked {
state3: alice::State3,
#[serde(with = "crate::serde_peer_id")]
bob_peer_id: PeerId,
},
XmrLocked(alice::State3),
EncSigLearned {
encrypted_signature: EncryptedSignature,
@ -45,8 +54,22 @@ pub enum AliceEndState {
impl From<&AliceState> for Alice {
fn from(alice_state: &AliceState) -> Self {
match alice_state {
AliceState::Negotiated { state3, .. } => Alice::Negotiated(state3.as_ref().clone()),
AliceState::BtcLocked { state3, .. } => Alice::BtcLocked(state3.as_ref().clone()),
AliceState::Negotiated {
state3,
bob_peer_id,
..
} => Alice::Negotiated {
state3: state3.as_ref().clone(),
bob_peer_id: bob_peer_id.clone(),
},
AliceState::BtcLocked {
state3,
bob_peer_id,
..
} => Alice::BtcLocked {
state3: state3.as_ref().clone(),
bob_peer_id: bob_peer_id.clone(),
},
AliceState::XmrLocked { state3 } => Alice::XmrLocked(state3.as_ref().clone()),
AliceState::EncSigLearned {
state3,
@ -82,16 +105,22 @@ impl From<Alice> for AliceState {
fn from(db_state: Alice) -> Self {
match db_state {
Alice::Started { amounts, state0 } => AliceState::Started { amounts, state0 },
Alice::Negotiated(state3) => AliceState::Negotiated {
channel: None,
Alice::Negotiated {
state3,
bob_peer_id,
} => AliceState::Negotiated {
bob_peer_id,
amounts: SwapAmounts {
btc: state3.btc,
xmr: state3.xmr,
},
state3: Box::new(state3),
},
Alice::BtcLocked(state3) => AliceState::BtcLocked {
channel: None,
Alice::BtcLocked {
state3,
bob_peer_id,
} => AliceState::BtcLocked {
bob_peer_id,
amounts: SwapAmounts {
btc: state3.btc,
xmr: state3.xmr,
@ -157,8 +186,8 @@ impl Display for Alice {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Alice::Started { .. } => write!(f, "Started"),
Alice::Negotiated(_) => f.write_str("Negotiated"),
Alice::BtcLocked(_) => f.write_str("Bitcoin locked"),
Alice::Negotiated { .. } => f.write_str("Negotiated"),
Alice::BtcLocked { .. } => f.write_str("Bitcoin locked"),
Alice::XmrLocked(_) => f.write_str("Monero locked"),
Alice::CancelTimelockExpired(_) => f.write_str("Cancel timelock is expired"),
Alice::BtcCancelled(_) => f.write_str("Bitcoin cancel transaction published"),

@ -26,3 +26,4 @@ pub mod seed;
pub mod trace;
mod fs;
mod serde_peer_id;

@ -25,16 +25,18 @@ use tracing::{info, log::LevelFilter};
use uuid::Uuid;
pub mod bitcoin;
mod cli;
pub mod config;
pub mod database;
mod fs;
pub mod monero;
pub mod network;
pub mod protocol;
pub mod seed;
pub mod trace;
mod cli;
mod fs;
mod serde_peer_id;
#[macro_use]
extern crate prettytable;

@ -1,8 +1,8 @@
use crate::protocol::{alice, bob};
use crate::protocol::{alice, alice::TransferProof, bob, bob::EncryptedSignature};
use async_trait::async_trait;
use futures::prelude::*;
use libp2p::{
core::upgrade,
core::{upgrade, upgrade::ReadOneError},
request_response::{ProtocolName, RequestResponseCodec},
};
use serde::{Deserialize, Serialize};
@ -21,21 +21,34 @@ const BUF_SIZE: usize = 1024 * 1024;
/// Messages Bob sends to Alice.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum BobToAlice {
SwapRequest(bob::SwapRequest),
SwapRequest(Box<bob::SwapRequest>),
Message0(Box<bob::Message0>),
Message1(bob::Message1),
Message2(bob::Message2),
Message3(bob::Message3),
Message1(Box<bob::Message1>),
Message2(Box<bob::Message2>),
}
/// Messages Alice sends to Bob.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum AliceToBob {
SwapResponse(alice::SwapResponse),
SwapResponse(Box<alice::SwapResponse>),
Message0(Box<alice::Message0>),
Message1(Box<alice::Message1>),
Message2(alice::Message2),
Message3, // empty response
Message2,
}
/// Messages sent from one party to the other.
/// All responses are empty
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Request {
TransferProof(Box<TransferProof>),
EncryptedSignature(Box<EncryptedSignature>),
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
/// Response are only used for acknowledgement purposes.
pub enum Response {
TransferProof,
EncryptedSignature,
}
#[derive(Debug, Clone, Copy, Default)]
@ -51,7 +64,10 @@ pub struct Message1Protocol;
pub struct Message2Protocol;
#[derive(Debug, Clone, Copy, Default)]
pub struct Message3Protocol;
pub struct TransferProofProtocol;
#[derive(Debug, Clone, Copy, Default)]
pub struct EncryptedSignatureProtocol;
impl ProtocolName for Swap {
fn protocol_name(&self) -> &[u8] {
@ -77,9 +93,15 @@ impl ProtocolName for Message2Protocol {
}
}
impl ProtocolName for Message3Protocol {
impl ProtocolName for TransferProofProtocol {
fn protocol_name(&self) -> &[u8] {
b"/xmr/btc/transfer_proof/1.0.0"
}
}
impl ProtocolName for EncryptedSignatureProtocol {
fn protocol_name(&self) -> &[u8] {
b"/xmr/btc/message3/1.0.0"
b"/xmr/btc/encrypted_signature/1.0.0"
}
}
@ -171,3 +193,93 @@ where
Ok(())
}
}
#[derive(Clone, Copy, Debug, Default)]
pub struct OneShotCodec<P> {
phantom: PhantomData<P>,
}
#[async_trait]
impl<P> RequestResponseCodec for OneShotCodec<P>
where
P: Send + Sync + Clone + ProtocolName,
{
type Protocol = P;
type Request = Request;
type Response = Response;
async fn read_request<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Self::Request>
where
T: AsyncRead + Unpin + Send,
{
debug!("enter read_request");
let message = upgrade::read_one(io, BUF_SIZE).await.map_err(|e| match e {
ReadOneError::Io(err) => err,
e => io::Error::new(io::ErrorKind::Other, e),
})?;
let mut de = serde_cbor::Deserializer::from_slice(&message);
let msg = Request::deserialize(&mut de).map_err(|e| {
tracing::debug!("serde read_request error: {:?}", e);
io::Error::new(io::ErrorKind::Other, e)
})?;
Ok(msg)
}
async fn read_response<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
) -> io::Result<Self::Response>
where
T: AsyncRead + Unpin + Send,
{
debug!("enter read_response");
let message = upgrade::read_one(io, BUF_SIZE)
.await
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
let mut de = serde_cbor::Deserializer::from_slice(&message);
let msg = Response::deserialize(&mut de).map_err(|e| {
tracing::debug!("serde read_response error: {:?}", e);
io::Error::new(io::ErrorKind::InvalidData, e)
})?;
Ok(msg)
}
async fn write_request<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
req: Self::Request,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
let bytes =
serde_cbor::to_vec(&req).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
upgrade::write_one(io, &bytes).await?;
Ok(())
}
async fn write_response<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
res: Self::Response,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
debug!("enter write_response");
let bytes = serde_cbor::to_vec(&res).map_err(|e| {
tracing::debug!("serde write_reponse error: {:?}", e);
io::Error::new(io::ErrorKind::InvalidData, e)
})?;
upgrade::write_one(io, &bytes).await?;
Ok(())
}
}

@ -4,10 +4,10 @@ pub use self::{
event_loop::{EventLoop, EventLoopHandle},
message0::Message0,
message1::Message1,
message2::Message2,
state::*,
swap::{run, run_until},
swap_response::*,
transfer_proof::TransferProof,
};
use crate::{
bitcoin,
@ -21,7 +21,7 @@ use crate::{
transport::build,
Seed as NetworkSeed,
},
protocol::{bob, SwapAmounts},
protocol::{bob, bob::EncryptedSignature, SwapAmounts},
seed::Seed,
};
use anyhow::{bail, Result};
@ -33,15 +33,16 @@ use std::{path::PathBuf, sync::Arc};
use tracing::{debug, info};
use uuid::Uuid;
mod encrypted_signature;
pub mod event_loop;
mod message0;
mod message1;
mod message2;
mod message3;
pub mod state;
mod steps;
pub mod swap;
mod swap_response;
mod transfer_proof;
pub struct Swap {
pub state: AliceState,
@ -232,10 +233,11 @@ pub enum OutEvent {
channel: ResponseChannel<AliceToBob>,
},
Message2 {
msg: bob::Message2,
channel: ResponseChannel<AliceToBob>,
msg: Box<bob::Message2>,
bob_peer_id: PeerId,
},
Message3(bob::Message3),
TransferProof,
EncryptedSignature(EncryptedSignature),
}
impl From<peer_tracker::OutEvent> for OutEvent {
@ -276,15 +278,26 @@ impl From<message1::OutEvent> for OutEvent {
impl From<message2::OutEvent> for OutEvent {
fn from(event: message2::OutEvent) -> Self {
match event {
message2::OutEvent::Msg { msg, channel } => OutEvent::Message2 { msg, channel },
message2::OutEvent::Msg { msg, bob_peer_id } => OutEvent::Message2 {
msg: Box::new(msg),
bob_peer_id,
},
}
}
}
impl From<transfer_proof::OutEvent> for OutEvent {
fn from(event: transfer_proof::OutEvent) -> Self {
match event {
transfer_proof::OutEvent::Msg => OutEvent::TransferProof,
}
}
}
impl From<message3::OutEvent> for OutEvent {
fn from(event: message3::OutEvent) -> Self {
impl From<encrypted_signature::OutEvent> for OutEvent {
fn from(event: encrypted_signature::OutEvent) -> Self {
match event {
message3::OutEvent::Msg(msg) => OutEvent::Message3(msg),
encrypted_signature::OutEvent::Msg(msg) => OutEvent::EncryptedSignature(msg),
}
}
}
@ -299,7 +312,8 @@ pub struct Behaviour {
message0: message0::Behaviour,
message1: message1::Behaviour,
message2: message2::Behaviour,
message3: message3::Behaviour,
transfer_proof: transfer_proof::Behaviour,
encrypted_signature: encrypted_signature::Behaviour,
}
impl Behaviour {
@ -325,9 +339,9 @@ impl Behaviour {
debug!("Sent Message1");
}
/// Send Message2 to Bob in response to receiving his Message2.
pub fn send_message2(&mut self, channel: ResponseChannel<AliceToBob>, msg: Message2) {
self.message2.send(channel, msg);
debug!("Sent Message2");
/// Send Transfer Proof to Bob.
pub fn send_transfer_proof(&mut self, bob: PeerId, msg: TransferProof) {
self.transfer_proof.send(bob, msg);
debug!("Sent Transfer Proof");
}
}

@ -1,6 +1,8 @@
use crate::{
network::request_response::{AliceToBob, BobToAlice, Codec, Message3Protocol, TIMEOUT},
protocol::bob,
network::request_response::{
EncryptedSignatureProtocol, OneShotCodec, Request, Response, TIMEOUT,
},
protocol::bob::EncryptedSignature,
};
use libp2p::{
request_response::{
@ -19,15 +21,16 @@ use tracing::{debug, error};
#[derive(Debug)]
pub enum OutEvent {
Msg(bob::Message3),
Msg(EncryptedSignature),
}
/// A `NetworkBehaviour` that represents receiving of message 3 from Bob.
/// A `NetworkBehaviour` that represents receiving the Bitcoin encrypted
/// signature from Bob.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")]
#[allow(missing_debug_implementations)]
pub struct Behaviour {
rr: RequestResponse<Codec<Message3Protocol>>,
rr: RequestResponse<OneShotCodec<EncryptedSignatureProtocol>>,
#[behaviour(ignore)]
events: VecDeque<OutEvent>,
}
@ -37,7 +40,9 @@ impl Behaviour {
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<RequestProtocol<Codec<Message3Protocol>>, OutEvent>> {
) -> Poll<
NetworkBehaviourAction<RequestProtocol<OneShotCodec<EncryptedSignatureProtocol>>, OutEvent>,
> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
@ -54,8 +59,8 @@ impl Default for Behaviour {
Self {
rr: RequestResponse::new(
Codec::default(),
vec![(Message3Protocol, ProtocolSupport::Full)],
OneShotCodec::default(),
vec![(EncryptedSignatureProtocol, ProtocolSupport::Inbound)],
config,
),
events: Default::default(),
@ -63,8 +68,8 @@ impl Default for Behaviour {
}
}
impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>> for Behaviour {
fn inject_event(&mut self, event: RequestResponseEvent<BobToAlice, AliceToBob>) {
impl NetworkBehaviourEventProcess<RequestResponseEvent<Request, Response>> for Behaviour {
fn inject_event(&mut self, event: RequestResponseEvent<Request, Response>) {
match event {
RequestResponseEvent::Message {
message:
@ -73,11 +78,11 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
},
..
} => {
if let BobToAlice::Message3(msg) = request {
debug!("Received Message3");
self.events.push_back(OutEvent::Msg(msg));
if let Request::EncryptedSignature(msg) = request {
debug!("Received encrypted signature");
self.events.push_back(OutEvent::Msg(*msg));
// Send back empty response so that the request/response protocol completes.
self.rr.send_response(channel, AliceToBob::Message3);
let _ = self.rr.send_response(channel, Response::EncryptedSignature);
}
}
RequestResponseEvent::Message {

@ -2,8 +2,9 @@ use crate::{
network::{request_response::AliceToBob, transport::SwapTransport, TokioExecutor},
protocol::{
alice,
alice::{Behaviour, OutEvent, SwapResponse},
alice::{Behaviour, OutEvent, SwapResponse, TransferProof},
bob,
bob::EncryptedSignature,
},
};
use anyhow::{anyhow, Context, Result};
@ -12,6 +13,7 @@ use libp2p::{
core::Multiaddr, futures::StreamExt, request_response::ResponseChannel, PeerId, Swarm,
};
use tokio::sync::mpsc::{Receiver, Sender};
use tracing::trace;
#[allow(missing_debug_implementations)]
pub struct Channels<T> {
@ -34,16 +36,16 @@ impl<T> Default for Channels<T> {
#[derive(Debug)]
pub struct EventLoopHandle {
msg0: Receiver<(bob::Message0, ResponseChannel<AliceToBob>)>,
msg1: Receiver<(bob::Message1, ResponseChannel<AliceToBob>)>,
msg2: Receiver<(bob::Message2, ResponseChannel<AliceToBob>)>,
msg3: Receiver<bob::Message3>,
recv_message0: Receiver<(bob::Message0, ResponseChannel<AliceToBob>)>,
recv_message1: Receiver<(bob::Message1, ResponseChannel<AliceToBob>)>,
recv_message2: Receiver<bob::Message2>,
recv_encrypted_signature: Receiver<EncryptedSignature>,
request: Receiver<crate::protocol::alice::swap_response::OutEvent>,
conn_established: Receiver<PeerId>,
send_swap_response: Sender<(ResponseChannel<AliceToBob>, SwapResponse)>,
send_msg0: Sender<(ResponseChannel<AliceToBob>, alice::Message0)>,
send_msg1: Sender<(ResponseChannel<AliceToBob>, alice::Message1)>,
send_msg2: Sender<(ResponseChannel<AliceToBob>, alice::Message2)>,
send_message0: Sender<(ResponseChannel<AliceToBob>, alice::Message0)>,
send_message1: Sender<(ResponseChannel<AliceToBob>, alice::Message1)>,
send_transfer_proof: Sender<(PeerId, TransferProof)>,
}
impl EventLoopHandle {
@ -55,28 +57,28 @@ impl EventLoopHandle {
}
pub async fn recv_message0(&mut self) -> Result<(bob::Message0, ResponseChannel<AliceToBob>)> {
self.msg0
self.recv_message0
.recv()
.await
.ok_or_else(|| anyhow!("Failed to receive message 0 from Bob"))
}
pub async fn recv_message1(&mut self) -> Result<(bob::Message1, ResponseChannel<AliceToBob>)> {
self.msg1
self.recv_message1
.recv()
.await
.ok_or_else(|| anyhow!("Failed to receive message 1 from Bob"))
}
pub async fn recv_message2(&mut self) -> Result<(bob::Message2, ResponseChannel<AliceToBob>)> {
self.msg2
pub async fn recv_message2(&mut self) -> Result<bob::Message2> {
self.recv_message2
.recv()
.await
.ok_or_else(|| anyhow!("Failed o receive message 2 from Bob"))
.ok_or_else(|| anyhow!("Failed to receive message 2 from Bob"))
}
pub async fn recv_message3(&mut self) -> Result<bob::Message3> {
self.msg3
pub async fn recv_encrypted_signature(&mut self) -> Result<EncryptedSignature> {
self.recv_encrypted_signature
.recv()
.await
.ok_or_else(|| anyhow!("Failed to receive Bitcoin encrypted signature from Bob"))
@ -108,7 +110,7 @@ impl EventLoopHandle {
channel: ResponseChannel<AliceToBob>,
msg: alice::Message0,
) -> Result<()> {
let _ = self.send_msg0.send((channel, msg)).await?;
let _ = self.send_message0.send((channel, msg)).await?;
Ok(())
}
@ -117,16 +119,12 @@ impl EventLoopHandle {
channel: ResponseChannel<AliceToBob>,
msg: alice::Message1,
) -> Result<()> {
let _ = self.send_msg1.send((channel, msg)).await?;
let _ = self.send_message1.send((channel, msg)).await?;
Ok(())
}
pub async fn send_message2(
&mut self,
channel: ResponseChannel<AliceToBob>,
msg: alice::Message2,
) -> Result<()> {
let _ = self.send_msg2.send((channel, msg)).await?;
pub async fn send_transfer_proof(&mut self, bob: PeerId, msg: TransferProof) -> Result<()> {
let _ = self.send_transfer_proof.send((bob, msg)).await?;
Ok(())
}
}
@ -134,16 +132,16 @@ impl EventLoopHandle {
#[allow(missing_debug_implementations)]
pub struct EventLoop {
swarm: libp2p::Swarm<Behaviour>,
msg0: Sender<(bob::Message0, ResponseChannel<AliceToBob>)>,
msg1: Sender<(bob::Message1, ResponseChannel<AliceToBob>)>,
msg2: Sender<(bob::Message2, ResponseChannel<AliceToBob>)>,
msg3: Sender<bob::Message3>,
recv_message0: Sender<(bob::Message0, ResponseChannel<AliceToBob>)>,
recv_message1: Sender<(bob::Message1, ResponseChannel<AliceToBob>)>,
recv_message2: Sender<bob::Message2>,
recv_encrypted_signature: Sender<EncryptedSignature>,
request: Sender<crate::protocol::alice::swap_response::OutEvent>,
conn_established: Sender<PeerId>,
send_swap_response: Receiver<(ResponseChannel<AliceToBob>, SwapResponse)>,
send_msg0: Receiver<(ResponseChannel<AliceToBob>, alice::Message0)>,
send_msg1: Receiver<(ResponseChannel<AliceToBob>, alice::Message1)>,
send_msg2: Receiver<(ResponseChannel<AliceToBob>, alice::Message2)>,
send_message0: Receiver<(ResponseChannel<AliceToBob>, alice::Message0)>,
send_message1: Receiver<(ResponseChannel<AliceToBob>, alice::Message1)>,
send_transfer_proof: Receiver<(PeerId, TransferProof)>,
}
impl EventLoop {
@ -162,42 +160,42 @@ impl EventLoop {
Swarm::listen_on(&mut swarm, listen.clone())
.with_context(|| format!("Address is not supported: {:#}", listen))?;
let msg0 = Channels::new();
let msg1 = Channels::new();
let msg2 = Channels::new();
let msg3 = Channels::new();
let recv_message0 = Channels::new();
let recv_message1 = Channels::new();
let recv_message2 = Channels::new();
let recv_encrypted_signature = Channels::new();
let request = Channels::new();
let conn_established = Channels::new();
let send_swap_response = Channels::new();
let send_msg0 = Channels::new();
let send_msg1 = Channels::new();
let send_msg2 = Channels::new();
let send_message0 = Channels::new();
let send_message1 = Channels::new();
let send_transfer_proof = Channels::new();
let driver = EventLoop {
swarm,
msg0: msg0.sender,
msg1: msg1.sender,
msg2: msg2.sender,
msg3: msg3.sender,
recv_message0: recv_message0.sender,
recv_message1: recv_message1.sender,
recv_message2: recv_message2.sender,
recv_encrypted_signature: recv_encrypted_signature.sender,
request: request.sender,
conn_established: conn_established.sender,
send_swap_response: send_swap_response.receiver,
send_msg0: send_msg0.receiver,
send_msg1: send_msg1.receiver,
send_msg2: send_msg2.receiver,
send_message0: send_message0.receiver,
send_message1: send_message1.receiver,
send_transfer_proof: send_transfer_proof.receiver,
};
let handle = EventLoopHandle {
msg0: msg0.receiver,
msg1: msg1.receiver,
msg2: msg2.receiver,
msg3: msg3.receiver,
recv_message0: recv_message0.receiver,
recv_message1: recv_message1.receiver,
recv_message2: recv_message2.receiver,
recv_encrypted_signature: recv_encrypted_signature.receiver,
request: request.receiver,
conn_established: conn_established.receiver,
send_swap_response: send_swap_response.sender,
send_msg0: send_msg0.sender,
send_msg1: send_msg1.sender,
send_msg2: send_msg2.sender,
send_message0: send_message0.sender,
send_message1: send_message1.sender,
send_transfer_proof: send_transfer_proof.sender,
};
Ok((driver, handle))
@ -212,16 +210,17 @@ impl EventLoop {
let _ = self.conn_established.send(alice).await;
}
OutEvent::Message0 { msg, channel } => {
let _ = self.msg0.send((*msg, channel)).await;
let _ = self.recv_message0.send((*msg, channel)).await;
}
OutEvent::Message1 { msg, channel } => {
let _ = self.msg1.send((msg, channel)).await;
let _ = self.recv_message1.send((msg, channel)).await;
}
OutEvent::Message2 { msg, channel } => {
let _ = self.msg2.send((msg, channel)).await;
OutEvent::Message2 { msg, bob_peer_id : _} => {
let _ = self.recv_message2.send(*msg).await;
}
OutEvent::Message3(msg) => {
let _ = self.msg3.send(msg).await;
OutEvent::TransferProof => trace!("Bob ack'd receiving the transfer proof"),
OutEvent::EncryptedSignature(msg) => {
let _ = self.recv_encrypted_signature.send(msg).await;
}
OutEvent::Request(event) => {
let _ = self.request.send(*event).await;
@ -233,19 +232,19 @@ impl EventLoop {
self.swarm.send_swap_response(channel, swap_response);
}
},
msg0 = self.send_msg0.next().fuse() => {
msg0 = self.send_message0.next().fuse() => {
if let Some((channel, msg)) = msg0 {
self.swarm.send_message0(channel, msg);
}
},
msg1 = self.send_msg1.next().fuse() => {
msg1 = self.send_message1.next().fuse() => {
if let Some((channel, msg)) = msg1 {
self.swarm.send_message1(channel, msg);
}
},
msg2 = self.send_msg2.next().fuse() => {
if let Some((channel, msg)) = msg2 {
self.swarm.send_message2(channel, msg);
transfer_proof = self.send_transfer_proof.next().fuse() => {
if let Some((bob_peer_id, msg)) = transfer_proof {
self.swarm.send_transfer_proof(bob_peer_id, msg);
}
},
}

@ -93,7 +93,7 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
} => {
if let BobToAlice::Message1(msg) = request {
debug!("Received Message1");
self.events.push_back(OutEvent::Msg { msg, channel });
self.events.push_back(OutEvent::Msg { msg: *msg, channel });
}
}
RequestResponseEvent::Message {

@ -1,17 +1,15 @@
use crate::{
monero,
network::request_response::{AliceToBob, BobToAlice, Codec, Message2Protocol, TIMEOUT},
protocol::bob,
};
use libp2p::{
request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig,
RequestResponseEvent, RequestResponseMessage, ResponseChannel,
RequestResponseEvent, RequestResponseMessage,
},
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour,
NetworkBehaviour, PeerId,
};
use serde::{Deserialize, Serialize};
use std::{
collections::VecDeque,
task::{Context, Poll},
@ -22,18 +20,11 @@ use tracing::{debug, error};
#[derive(Debug)]
pub enum OutEvent {
Msg {
/// Received message from Bob.
msg: bob::Message2,
/// Channel to send back Alice's message 2.
channel: ResponseChannel<AliceToBob>,
bob_peer_id: PeerId,
},
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Message2 {
pub tx_lock_proof: monero::TransferProof,
}
/// A `NetworkBehaviour` that represents receiving of message 2 from Bob.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")]
@ -45,11 +36,6 @@ pub struct Behaviour {
}
impl Behaviour {
pub fn send(&mut self, channel: ResponseChannel<AliceToBob>, msg: Message2) {
let msg = AliceToBob::Message2(msg);
self.rr.send_response(channel, msg);
}
fn poll(
&mut self,
_: &mut Context<'_>,
@ -84,15 +70,20 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
fn inject_event(&mut self, event: RequestResponseEvent<BobToAlice, AliceToBob>) {
match event {
RequestResponseEvent::Message {
peer,
message:
RequestResponseMessage::Request {
request, channel, ..
},
..
} => {
if let BobToAlice::Message2(msg) = request {
debug!("Received Message2");
self.events.push_back(OutEvent::Msg { msg, channel });
debug!("Received Message 2");
self.events.push_back(OutEvent::Msg {
msg: *msg,
bob_peer_id: peer,
});
// Send back empty response so that the request/response protocol completes.
let _ = self.rr.send_response(channel, AliceToBob::Message2);
}
}
RequestResponseEvent::Message {

@ -8,15 +8,11 @@ use crate::{
},
monero,
monero::CreateWalletForOutput,
network::request_response::AliceToBob,
protocol::{alice, bob, SwapAmounts},
protocol::{alice, alice::TransferProof, bob, bob::EncryptedSignature, SwapAmounts},
};
use anyhow::{anyhow, Context, Result};
use ecdsa_fun::{
adaptor::{Adaptor, EncryptedSignature},
nonce::Deterministic,
};
use libp2p::request_response::ResponseChannel;
use ecdsa_fun::{adaptor::Adaptor, nonce::Deterministic};
use libp2p::PeerId;
use rand::{CryptoRng, RngCore};
use serde::{Deserialize, Serialize};
use sha2::Sha256;
@ -30,12 +26,12 @@ pub enum AliceState {
state0: State0,
},
Negotiated {
channel: Option<ResponseChannel<AliceToBob>>,
bob_peer_id: PeerId,
amounts: SwapAmounts,
state3: Box<State3>,
},
BtcLocked {
channel: Option<ResponseChannel<AliceToBob>>,
bob_peer_id: PeerId,
amounts: SwapAmounts,
state3: Box<State3>,
},
@ -43,7 +39,7 @@ pub enum AliceState {
state3: Box<State3>,
},
EncSigLearned {
encrypted_signature: EncryptedSignature,
encrypted_signature: bitcoin::EncryptedSignature,
state3: Box<State3>,
},
BtcRedeemed,
@ -476,13 +472,13 @@ pub struct State5 {
}
impl State5 {
pub fn next_message(&self) -> alice::Message2 {
alice::Message2 {
pub fn next_message(&self) -> TransferProof {
TransferProof {
tx_lock_proof: self.tx_lock_proof.clone(),
}
}
pub fn receive(self, msg: bob::Message3) -> State6 {
pub fn receive(self, msg: EncryptedSignature) -> State6 {
State6 {
a: self.a,
B: self.B,
@ -554,7 +550,7 @@ pub struct State6 {
tx_lock: bitcoin::TxLock,
tx_punish_sig_bob: bitcoin::Signature,
tx_redeem_encsig: EncryptedSignature,
tx_redeem_encsig: bitcoin::EncryptedSignature,
lock_xmr_fee: monero::Amount,
}

@ -10,10 +10,9 @@ use crate::{
config::Config,
monero,
monero::Transfer,
network::request_response::AliceToBob,
protocol::{
alice,
alice::{event_loop::EventLoopHandle, SwapResponse},
alice::{event_loop::EventLoopHandle, SwapResponse, TransferProof},
SwapAmounts,
},
};
@ -23,7 +22,7 @@ use futures::{
future::{select, Either},
pin_mut,
};
use libp2p::request_response::ResponseChannel;
use libp2p::PeerId;
use rand::rngs::OsRng;
use sha2::Sha256;
use std::sync::Arc;
@ -35,11 +34,11 @@ pub async fn negotiate(
xmr_amount: monero::Amount,
event_loop_handle: &mut EventLoopHandle,
config: Config,
) -> Result<(ResponseChannel<AliceToBob>, alice::State3)> {
) -> Result<(PeerId, alice::State3)> {
trace!("Starting negotiate");
// todo: we can move this out, we dont need to timeout here
let _peer_id = timeout(
let bob_peer_id = timeout(
config.bob_time_to_act,
event_loop_handle.recv_conn_established(),
)
@ -73,12 +72,11 @@ pub async fn negotiate(
.send_message1(channel, state2.next_message())
.await?;
let (bob_message2, channel) =
timeout(config.bob_time_to_act, event_loop_handle.recv_message2()).await??;
let bob_message2 = timeout(config.bob_time_to_act, event_loop_handle.recv_message2()).await??;
let state3 = state2.receive(bob_message2)?;
Ok((channel, state3))
Ok((bob_peer_id, state3))
}
// TODO(Franck): Use helper functions from xmr-btc instead of re-writing them
@ -108,7 +106,7 @@ where
}
pub async fn lock_xmr<W>(
channel: ResponseChannel<AliceToBob>,
bob_peer_id: PeerId,
amounts: SwapAmounts,
state3: alice::State3,
event_loop_handle: &mut EventLoopHandle,
@ -134,7 +132,7 @@ where
// Otherwise Alice might publish the lock tx twice!
event_loop_handle
.send_message2(channel, alice::Message2 {
.send_transfer_proof(bob_peer_id, TransferProof {
tx_lock_proof: transfer_proof,
})
.await?;
@ -146,7 +144,7 @@ pub async fn wait_for_bitcoin_encrypted_signature(
event_loop_handle: &mut EventLoopHandle,
) -> Result<EncryptedSignature> {
let msg3 = event_loop_handle
.recv_message3()
.recv_encrypted_signature()
.await
.context("Failed to receive Bitcoin encrypted signature from Bob")?;

@ -91,11 +91,11 @@ async fn run_until_internal(
} else {
match state {
AliceState::Started { amounts, state0 } => {
let (channel, state3) =
let (bob_peer_id, state3) =
negotiate(state0, amounts.xmr, &mut event_loop_handle, config).await?;
let state = AliceState::Negotiated {
channel: Some(channel),
bob_peer_id,
amounts,
state3: Box::new(state3),
};
@ -117,30 +117,17 @@ async fn run_until_internal(
}
AliceState::Negotiated {
state3,
channel,
bob_peer_id,
amounts,
} => {
let state = match channel {
Some(channel) => {
let _ = wait_for_locked_bitcoin(
state3.tx_lock.txid(),
bitcoin_wallet.clone(),
config,
)
let _ =
wait_for_locked_bitcoin(state3.tx_lock.txid(), bitcoin_wallet.clone(), config)
.await?;
AliceState::BtcLocked {
channel: Some(channel),
amounts,
state3,
}
}
None => {
tracing::info!("Cannot resume swap from negotiated state, aborting");
// Alice did not lock Xmr yet
AliceState::SafelyAborted
}
let state = AliceState::BtcLocked {
bob_peer_id,
amounts,
state3,
};
let db_state = (&state).into();
@ -159,30 +146,20 @@ async fn run_until_internal(
.await
}
AliceState::BtcLocked {
channel,
bob_peer_id,
amounts,
state3,
} => {
let state = match channel {
Some(channel) => {
lock_xmr(
channel,
amounts,
*state3.clone(),
&mut event_loop_handle,
monero_wallet.clone(),
)
.await?;
AliceState::XmrLocked { state3 }
}
None => {
tracing::info!("Cannot resume swap from BTC locked state, aborting");
lock_xmr(
bob_peer_id,
amounts,
*state3.clone(),
&mut event_loop_handle,
monero_wallet.clone(),
)
.await?;
// Alice did not lock Xmr yet
AliceState::SafelyAborted
}
};
let state = AliceState::XmrLocked { state3 };
let db_state = (&state).into();
db.insert_latest_state(swap_id, database::Swap::Alice(db_state))

@ -45,7 +45,7 @@ pub struct Behaviour {
impl Behaviour {
/// Alice always sends her messages as a response to a request from Bob.
pub fn send(&mut self, channel: ResponseChannel<AliceToBob>, msg: SwapResponse) {
let msg = AliceToBob::SwapResponse(msg);
let msg = AliceToBob::SwapResponse(Box::new(msg));
self.rr.send_response(channel, msg);
}
@ -92,7 +92,7 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
} => {
if let BobToAlice::SwapRequest(msg) = request {
debug!("Received swap request");
self.events.push_back(OutEvent { msg, channel })
self.events.push_back(OutEvent { msg: *msg, channel })
}
}
RequestResponseEvent::Message {

@ -0,0 +1,102 @@
use crate::{
monero,
network::request_response::{OneShotCodec, Request, Response, TransferProofProtocol, TIMEOUT},
};
use libp2p::{
request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig,
RequestResponseEvent, RequestResponseMessage,
},
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour, PeerId,
};
use serde::{Deserialize, Serialize};
use std::{
collections::VecDeque,
task::{Context, Poll},
time::Duration,
};
use tracing::error;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TransferProof {
pub tx_lock_proof: monero::TransferProof,
}
#[derive(Debug, Copy, Clone)]
pub enum OutEvent {
Msg,
}
/// A `NetworkBehaviour` that represents sending the Monero transfer proof to
/// Bob.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")]
#[allow(missing_debug_implementations)]
pub struct Behaviour {
rr: RequestResponse<OneShotCodec<TransferProofProtocol>>,
#[behaviour(ignore)]
events: VecDeque<OutEvent>,
}
impl Behaviour {
pub fn send(&mut self, bob: PeerId, msg: TransferProof) {
let msg = Request::TransferProof(Box::new(msg));
let _id = self.rr.send_request(&bob, msg);
}
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<RequestProtocol<OneShotCodec<TransferProofProtocol>>, OutEvent>>
{
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending
}
}
impl Default for Behaviour {
fn default() -> Self {
let timeout = Duration::from_secs(TIMEOUT);
let mut config = RequestResponseConfig::default();
config.set_request_timeout(timeout);
Self {
rr: RequestResponse::new(
OneShotCodec::default(),
vec![(TransferProofProtocol, ProtocolSupport::Outbound)],
config,
),
events: Default::default(),
}
}
}
impl NetworkBehaviourEventProcess<RequestResponseEvent<Request, Response>> for Behaviour {
fn inject_event(&mut self, event: RequestResponseEvent<Request, Response>) {
match event {
RequestResponseEvent::Message {
message: RequestResponseMessage::Request { .. },
..
} => panic!("Alice should never get a transfer proof request from Bob"),
RequestResponseEvent::Message {
message: RequestResponseMessage::Response { response, .. },
..
} => {
if let Response::TransferProof = response {
self.events.push_back(OutEvent::Msg);
}
}
RequestResponseEvent::InboundFailure { error, .. } => {
error!("Inbound failure: {:?}", error);
}
RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error);
}
}
}
}

@ -2,7 +2,6 @@
//! Bob holds BTC and wishes receive XMR.
use crate::{
bitcoin,
bitcoin::EncryptedSignature,
config::Config,
database,
database::Database,
@ -22,24 +21,26 @@ use tracing::{debug, info};
use uuid::Uuid;
pub use self::{
encrypted_signature::EncryptedSignature,
event_loop::{EventLoop, EventLoopHandle},
message0::Message0,
message1::Message1,
message2::Message2,
message3::Message3,
state::*,
swap::{run, run_until},
swap_request::*,
};
use crate::protocol::alice::TransferProof;
mod encrypted_signature;
pub mod event_loop;
mod message0;
mod message1;
mod message2;
mod message3;
pub mod state;
pub mod swap;
mod swap_request;
mod transfer_proof;
pub struct Swap {
pub state: BobState,
@ -210,8 +211,9 @@ pub enum OutEvent {
SwapResponse(alice::SwapResponse),
Message0(Box<alice::Message0>),
Message1(Box<alice::Message1>),
Message2(alice::Message2),
Message3,
Message2,
TransferProof(Box<TransferProof>),
EncryptedSignature,
}
impl From<peer_tracker::OutEvent> for OutEvent {
@ -249,15 +251,23 @@ impl From<message1::OutEvent> for OutEvent {
impl From<message2::OutEvent> for OutEvent {
fn from(event: message2::OutEvent) -> Self {
match event {
message2::OutEvent::Msg(msg) => OutEvent::Message2(msg),
message2::OutEvent::Msg => OutEvent::Message2,
}
}
}
impl From<message3::OutEvent> for OutEvent {
fn from(event: message3::OutEvent) -> Self {
impl From<transfer_proof::OutEvent> for OutEvent {
fn from(event: transfer_proof::OutEvent) -> Self {
match event {
message3::OutEvent::Msg => OutEvent::Message3,
transfer_proof::OutEvent::Msg(msg) => OutEvent::TransferProof(Box::new(msg)),
}
}
}
impl From<encrypted_signature::OutEvent> for OutEvent {
fn from(event: encrypted_signature::OutEvent) -> Self {
match event {
encrypted_signature::OutEvent::Msg => OutEvent::EncryptedSignature,
}
}
}
@ -272,7 +282,8 @@ pub struct Behaviour {
message0: message0::Behaviour,
message1: message1::Behaviour,
message2: message2::Behaviour,
message3: message3::Behaviour,
transfer_proof: transfer_proof::Behaviour,
encrypted_signature: encrypted_signature::Behaviour,
}
impl Behaviour {
@ -301,9 +312,13 @@ impl Behaviour {
}
/// Sends Bob's fourth message to Alice.
pub fn send_message3(&mut self, alice: PeerId, tx_redeem_encsig: EncryptedSignature) {
let msg = bob::Message3 { tx_redeem_encsig };
self.message3.send(alice, msg);
pub fn send_encrypted_signature(
&mut self,
alice: PeerId,
tx_redeem_encsig: bitcoin::EncryptedSignature,
) {
let msg = EncryptedSignature { tx_redeem_encsig };
self.encrypted_signature.send(alice, msg);
debug!("Sent Message3");
}

@ -1,6 +1,5 @@
use crate::{
bitcoin::EncryptedSignature,
network::request_response::{AliceToBob, BobToAlice, Codec, Message3Protocol, TIMEOUT},
use crate::network::request_response::{
EncryptedSignatureProtocol, OneShotCodec, Request, Response, TIMEOUT,
};
use libp2p::{
request_response::{
@ -19,8 +18,8 @@ use std::{
use tracing::error;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Message3 {
pub tx_redeem_encsig: EncryptedSignature,
pub struct EncryptedSignature {
pub tx_redeem_encsig: crate::bitcoin::EncryptedSignature,
}
#[derive(Debug, Copy, Clone)]
@ -28,19 +27,19 @@ pub enum OutEvent {
Msg,
}
/// A `NetworkBehaviour` that represents sending message 3 to Alice.
/// A `NetworkBehaviour` that represents sending encrypted signature to Alice.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")]
#[allow(missing_debug_implementations)]
pub struct Behaviour {
rr: RequestResponse<Codec<Message3Protocol>>,
rr: RequestResponse<OneShotCodec<EncryptedSignatureProtocol>>,
#[behaviour(ignore)]
events: VecDeque<OutEvent>,
}
impl Behaviour {
pub fn send(&mut self, alice: PeerId, msg: Message3) {
let msg = BobToAlice::Message3(msg);
pub fn send(&mut self, alice: PeerId, msg: EncryptedSignature) {
let msg = Request::EncryptedSignature(Box::new(msg));
let _id = self.rr.send_request(&alice, msg);
}
@ -48,7 +47,9 @@ impl Behaviour {
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<RequestProtocol<Codec<Message3Protocol>>, OutEvent>> {
) -> Poll<
NetworkBehaviourAction<RequestProtocol<OneShotCodec<EncryptedSignatureProtocol>>, OutEvent>,
> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
@ -65,8 +66,8 @@ impl Default for Behaviour {
Self {
rr: RequestResponse::new(
Codec::default(),
vec![(Message3Protocol, ProtocolSupport::Full)],
OneShotCodec::default(),
vec![(EncryptedSignatureProtocol, ProtocolSupport::Outbound)],
config,
),
events: Default::default(),
@ -74,8 +75,8 @@ impl Default for Behaviour {
}
}
impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>> for Behaviour {
fn inject_event(&mut self, event: RequestResponseEvent<BobToAlice, AliceToBob>) {
impl NetworkBehaviourEventProcess<RequestResponseEvent<Request, Response>> for Behaviour {
fn inject_event(&mut self, event: RequestResponseEvent<Request, Response>) {
match event {
RequestResponseEvent::Message {
message: RequestResponseMessage::Request { .. },
@ -85,7 +86,7 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
message: RequestResponseMessage::Response { response, .. },
..
} => {
if let AliceToBob::Message3 = response {
if let Response::EncryptedSignature = response {
self.events.push_back(OutEvent::Msg);
}
}

@ -3,7 +3,7 @@ use crate::{
network::{transport::SwapTransport, TokioExecutor},
protocol::{
alice,
alice::SwapResponse,
alice::{SwapResponse, TransferProof},
bob::{self, Behaviour, OutEvent, SwapRequest},
},
};
@ -37,46 +37,46 @@ impl<T> Default for Channels<T> {
#[derive(Debug)]
pub struct EventLoopHandle {
swap_response: Receiver<SwapResponse>,
msg0: Receiver<alice::Message0>,
msg1: Receiver<alice::Message1>,
msg2: Receiver<alice::Message2>,
recv_swap_response: Receiver<SwapResponse>,
recv_message0: Receiver<alice::Message0>,
recv_message1: Receiver<alice::Message1>,
recv_transfer_proof: Receiver<TransferProof>,
conn_established: Receiver<PeerId>,
dial_alice: Sender<()>,
send_swap_request: Sender<SwapRequest>,
send_msg0: Sender<bob::Message0>,
send_msg1: Sender<bob::Message1>,
send_msg2: Sender<bob::Message2>,
send_msg3: Sender<EncryptedSignature>,
send_message0: Sender<bob::Message0>,
send_message1: Sender<bob::Message1>,
send_message2: Sender<bob::Message2>,
send_encrypted_signature: Sender<EncryptedSignature>,
}
impl EventLoopHandle {
pub async fn recv_swap_response(&mut self) -> Result<SwapResponse> {
self.swap_response
self.recv_swap_response
.recv()
.await
.ok_or_else(|| anyhow!("Failed to receive swap response from Alice"))
}
pub async fn recv_message0(&mut self) -> Result<alice::Message0> {
self.msg0
self.recv_message0
.recv()
.await
.ok_or_else(|| anyhow!("Failed to receive message 0 from Alice"))
}
pub async fn recv_message1(&mut self) -> Result<alice::Message1> {
self.msg1
self.recv_message1
.recv()
.await
.ok_or_else(|| anyhow!("Failed to receive message 1 from Alice"))
}
pub async fn recv_message2(&mut self) -> Result<alice::Message2> {
self.msg2
pub async fn recv_transfer_proof(&mut self) -> Result<TransferProof> {
self.recv_transfer_proof
.recv()
.await
.ok_or_else(|| anyhow!("Failed o receive message 2 from Alice"))
.ok_or_else(|| anyhow!("Failed to receive transfer proof from Alice"))
}
/// Dials other party and wait for the connection to be established.
@ -99,22 +99,25 @@ impl EventLoopHandle {
}
pub async fn send_message0(&mut self, msg: bob::Message0) -> Result<()> {
let _ = self.send_msg0.send(msg).await?;
let _ = self.send_message0.send(msg).await?;
Ok(())
}
pub async fn send_message1(&mut self, msg: bob::Message1) -> Result<()> {
let _ = self.send_msg1.send(msg).await?;
let _ = self.send_message1.send(msg).await?;
Ok(())
}
pub async fn send_message2(&mut self, msg: bob::Message2) -> Result<()> {
let _ = self.send_msg2.send(msg).await?;
let _ = self.send_message2.send(msg).await?;
Ok(())
}
pub async fn send_message3(&mut self, tx_redeem_encsig: EncryptedSignature) -> Result<()> {
let _ = self.send_msg3.send(tx_redeem_encsig).await?;
pub async fn send_encrypted_signature(
&mut self,
tx_redeem_encsig: EncryptedSignature,
) -> Result<()> {
let _ = self.send_encrypted_signature.send(tx_redeem_encsig).await?;
Ok(())
}
}
@ -123,17 +126,17 @@ impl EventLoopHandle {
pub struct EventLoop {
swarm: libp2p::Swarm<Behaviour>,
alice_peer_id: PeerId,
swap_response: Sender<SwapResponse>,
msg0: Sender<alice::Message0>,
msg1: Sender<alice::Message1>,
msg2: Sender<alice::Message2>,
conn_established: Sender<PeerId>,
recv_swap_response: Sender<SwapResponse>,
recv_message0: Sender<alice::Message0>,
recv_message1: Sender<alice::Message1>,
recv_transfer_proof: Sender<TransferProof>,
dial_alice: Receiver<()>,
conn_established: Sender<PeerId>,
send_swap_request: Receiver<SwapRequest>,
send_msg0: Receiver<bob::Message0>,
send_msg1: Receiver<bob::Message1>,
send_msg2: Receiver<bob::Message2>,
send_msg3: Receiver<EncryptedSignature>,
send_message0: Receiver<bob::Message0>,
send_message1: Receiver<bob::Message1>,
send_message2: Receiver<bob::Message2>,
send_encrypted_signature: Receiver<EncryptedSignature>,
}
impl EventLoop {
@ -153,45 +156,45 @@ impl EventLoop {
swarm.add_address(alice_peer_id.clone(), alice_addr);
let swap_response = Channels::new();
let msg0 = Channels::new();
let msg1 = Channels::new();
let msg2 = Channels::new();
let conn_established = Channels::new();
let recv_message0 = Channels::new();
let recv_message1 = Channels::new();
let recv_transfer_proof = Channels::new();
let dial_alice = Channels::new();
let conn_established = Channels::new();
let send_swap_request = Channels::new();
let send_msg0 = Channels::new();
let send_msg1 = Channels::new();
let send_msg2 = Channels::new();
let send_msg3 = Channels::new();
let send_message0 = Channels::new();
let send_message1 = Channels::new();
let send_message2 = Channels::new();
let send_encrypted_signature = Channels::new();
let event_loop = EventLoop {
swarm,
alice_peer_id,
swap_response: swap_response.sender,
msg0: msg0.sender,
msg1: msg1.sender,
msg2: msg2.sender,
recv_swap_response: swap_response.sender,
recv_message0: recv_message0.sender,
recv_message1: recv_message1.sender,
recv_transfer_proof: recv_transfer_proof.sender,
conn_established: conn_established.sender,
dial_alice: dial_alice.receiver,
send_swap_request: send_swap_request.receiver,
send_msg0: send_msg0.receiver,
send_msg1: send_msg1.receiver,
send_msg2: send_msg2.receiver,
send_msg3: send_msg3.receiver,
send_message0: send_message0.receiver,
send_message1: send_message1.receiver,
send_message2: send_message2.receiver,
send_encrypted_signature: send_encrypted_signature.receiver,
};
let handle = EventLoopHandle {
swap_response: swap_response.receiver,
msg0: msg0.receiver,
msg1: msg1.receiver,
msg2: msg2.receiver,
recv_swap_response: swap_response.receiver,
recv_message0: recv_message0.receiver,
recv_message1: recv_message1.receiver,
recv_transfer_proof: recv_transfer_proof.receiver,
conn_established: conn_established.receiver,
dial_alice: dial_alice.sender,
send_swap_request: send_swap_request.sender,
send_msg0: send_msg0.sender,
send_msg1: send_msg1.sender,
send_msg2: send_msg2.sender,
send_msg3: send_msg3.sender,
send_message0: send_message0.sender,
send_message1: send_message1.sender,
send_message2: send_message2.sender,
send_encrypted_signature: send_encrypted_signature.sender,
};
Ok((event_loop, handle))
@ -206,18 +209,19 @@ impl EventLoop {
let _ = self.conn_established.send(peer_id).await;
}
OutEvent::SwapResponse(msg) => {
let _ = self.swap_response.send(msg).await;
let _ = self.recv_swap_response.send(msg).await;
},
OutEvent::Message0(msg) => {
let _ = self.msg0.send(*msg).await;
let _ = self.recv_message0.send(*msg).await;
}
OutEvent::Message1(msg) => {
let _ = self.msg1.send(*msg).await;
let _ = self.recv_message1.send(*msg).await;
}
OutEvent::Message2(msg) => {
let _ = self.msg2.send(msg).await;
OutEvent::Message2 => info!("Alice acknowledged message 2 received"),
OutEvent::TransferProof(msg) => {
let _ = self.recv_transfer_proof.send(*msg).await;
}
OutEvent::Message3 => info!("Alice acknowledged message 3 received"),
OutEvent::EncryptedSignature => info!("Alice acknowledged encrypted signature received"),
}
},
option = self.dial_alice.next().fuse() => {
@ -242,25 +246,25 @@ impl EventLoop {
}
},
msg0 = self.send_msg0.next().fuse() => {
msg0 = self.send_message0.next().fuse() => {
if let Some(msg) = msg0 {
self.swarm.send_message0(self.alice_peer_id.clone(), msg);
}
}
msg1 = self.send_msg1.next().fuse() => {
msg1 = self.send_message1.next().fuse() => {
if let Some(msg) = msg1 {
self.swarm.send_message1(self.alice_peer_id.clone(), msg);
}
},
msg2 = self.send_msg2.next().fuse() => {
msg2 = self.send_message2.next().fuse() => {
if let Some(msg) = msg2 {
self.swarm.send_message2(self.alice_peer_id.clone(), msg);
}
},
msg3 = self.send_msg3.next().fuse() => {
if let Some(tx_redeem_encsig) = msg3 {
self.swarm.send_message3(self.alice_peer_id.clone(), tx_redeem_encsig);
encrypted_signature = self.send_encrypted_signature.next().fuse() => {
if let Some(tx_redeem_encsig) = encrypted_signature {
self.swarm.send_encrypted_signature(self.alice_peer_id.clone(), tx_redeem_encsig);
}
}
}

@ -41,7 +41,7 @@ pub struct Behaviour {
impl Behaviour {
pub fn send(&mut self, alice: PeerId, msg: Message1) {
let msg = BobToAlice::Message1(msg);
let msg = BobToAlice::Message1(Box::new(msg));
let _id = self.rr.send_request(&alice, msg);
}

@ -1,7 +1,4 @@
use crate::{
network::request_response::{AliceToBob, BobToAlice, Codec, Message2Protocol, TIMEOUT},
protocol::alice,
};
use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Message2Protocol, TIMEOUT};
use ecdsa_fun::Signature;
use libp2p::{
request_response::{
@ -25,9 +22,9 @@ pub struct Message2 {
pub(crate) tx_cancel_sig: Signature,
}
#[derive(Debug)]
#[derive(Clone, Copy, Debug)]
pub enum OutEvent {
Msg(alice::Message2),
Msg,
}
/// A `NetworkBehaviour` that represents sending message 2 to Alice.
@ -42,7 +39,7 @@ pub struct Behaviour {
impl Behaviour {
pub fn send(&mut self, alice: PeerId, msg: Message2) {
let msg = BobToAlice::Message2(msg);
let msg = BobToAlice::Message2(Box::new(msg));
let _id = self.rr.send_request(&alice, msg);
}
@ -87,9 +84,9 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
message: RequestResponseMessage::Response { response, .. },
..
} => {
if let AliceToBob::Message2(msg) = response {
debug!("Received Message2");
self.events.push_back(OutEvent::Msg(msg));
if let AliceToBob::Message2 = response {
debug!("Received Message 2 acknowledgement");
self.events.push_back(OutEvent::Msg);
}
}
RequestResponseEvent::InboundFailure { error, .. } => {

@ -9,14 +9,10 @@ use crate::{
config::Config,
monero,
monero::{monero_private_key, TransferProof},
protocol::{alice, bob, SwapAmounts},
protocol::{alice, bob, bob::EncryptedSignature, SwapAmounts},
};
use anyhow::{anyhow, Result};
use ecdsa_fun::{
adaptor::{Adaptor, EncryptedSignature},
nonce::Deterministic,
Signature,
};
use ecdsa_fun::{adaptor::Adaptor, nonce::Deterministic, Signature};
use monero_harness::rpc::wallet::BlockHeight;
use rand::{CryptoRng, RngCore};
use serde::{Deserialize, Serialize};
@ -244,7 +240,7 @@ pub struct State2 {
pub punish_address: bitcoin::Address,
pub tx_lock: bitcoin::TxLock,
pub tx_cancel_sig_a: Signature,
pub tx_refund_encsig: EncryptedSignature,
pub tx_refund_encsig: bitcoin::EncryptedSignature,
pub min_monero_confirmations: u32,
}
@ -313,7 +309,7 @@ pub struct State3 {
punish_address: bitcoin::Address,
pub tx_lock: bitcoin::TxLock,
pub tx_cancel_sig_a: Signature,
pub tx_refund_encsig: EncryptedSignature,
pub tx_refund_encsig: bitcoin::EncryptedSignature,
pub min_monero_confirmations: u32,
}
@ -433,19 +429,19 @@ pub struct State4 {
punish_address: bitcoin::Address,
pub tx_lock: bitcoin::TxLock,
pub tx_cancel_sig_a: Signature,
pub tx_refund_encsig: EncryptedSignature,
pub tx_refund_encsig: bitcoin::EncryptedSignature,
pub monero_wallet_restore_blockheight: u32,
}
impl State4 {
pub fn next_message(&self) -> bob::Message3 {
pub fn next_message(&self) -> EncryptedSignature {
let tx_redeem = bitcoin::TxRedeem::new(&self.tx_lock, &self.redeem_address);
let tx_redeem_encsig = self.b.encsign(self.S_a_bitcoin, tx_redeem.digest());
bob::Message3 { tx_redeem_encsig }
EncryptedSignature { tx_redeem_encsig }
}
pub fn tx_redeem_encsig(&self) -> EncryptedSignature {
pub fn tx_redeem_encsig(&self) -> bitcoin::EncryptedSignature {
let tx_redeem = bitcoin::TxRedeem::new(&self.tx_lock, &self.redeem_address);
self.b.encsign(self.S_a_bitcoin, tx_redeem.digest())
}
@ -615,7 +611,7 @@ pub struct State5 {
pub redeem_address: bitcoin::Address,
punish_address: bitcoin::Address,
pub tx_lock: bitcoin::TxLock,
tx_refund_encsig: EncryptedSignature,
tx_refund_encsig: bitcoin::EncryptedSignature,
tx_cancel_sig: Signature,
pub monero_wallet_restore_blockheight: u32,
}

@ -130,7 +130,7 @@ where
{
event_loop_handle.dial().await?;
let msg2_watcher = event_loop_handle.recv_message2();
let transfer_proof_watcher = event_loop_handle.recv_transfer_proof();
let cancel_timelock_expires =
state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref());
@ -143,13 +143,11 @@ where
monero_wallet.inner.block_height().await?;
select! {
msg2 = msg2_watcher => {
let msg2 = msg2?;
transfer_proof = transfer_proof_watcher => {
let transfer_proof = transfer_proof?;
BobState::XmrLockProofReceived {
state: state3,
lock_transfer_proof: msg2.tx_lock_proof,
lock_transfer_proof: transfer_proof.tx_lock_proof,
monero_wallet_restore_blockheight
}
},
@ -235,7 +233,8 @@ where
let state4_clone = state.clone();
let enc_sig_sent_watcher = event_loop_handle.send_message3(tx_redeem_encsig);
let enc_sig_sent_watcher =
event_loop_handle.send_encrypted_signature(tx_redeem_encsig);
let bitcoin_wallet = bitcoin_wallet.clone();
let cancel_timelock_expires =
state4_clone.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref());

@ -42,7 +42,7 @@ pub struct Behaviour {
impl Behaviour {
pub fn send(&mut self, alice: PeerId, swap_request: SwapRequest) -> Result<RequestId> {
let msg = BobToAlice::SwapRequest(swap_request);
let msg = BobToAlice::SwapRequest(Box::new(swap_request));
let id = self.rr.send_request(&alice, msg);
Ok(id)
@ -71,7 +71,7 @@ impl Default for Behaviour {
Self {
rr: RequestResponse::new(
Codec::default(),
vec![(Swap, ProtocolSupport::Full)],
vec![(Swap, ProtocolSupport::Outbound)],
config,
),
events: Default::default(),
@ -92,7 +92,9 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
} => {
if let AliceToBob::SwapResponse(swap_response) = response {
debug!("Received swap response");
self.events.push_back(OutEvent { swap_response });
self.events.push_back(OutEvent {
swap_response: *swap_response,
});
}
}
RequestResponseEvent::InboundFailure { error, .. } => {

@ -0,0 +1,97 @@
use crate::{
network::request_response::{OneShotCodec, Request, Response, TransferProofProtocol, TIMEOUT},
protocol::alice::TransferProof,
};
use libp2p::{
request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig,
RequestResponseEvent, RequestResponseMessage,
},
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour,
};
use std::{
collections::VecDeque,
task::{Context, Poll},
time::Duration,
};
use tracing::{debug, error};
#[derive(Debug)]
pub enum OutEvent {
Msg(TransferProof),
}
/// A `NetworkBehaviour` that represents receiving the transfer proof from
/// Alice.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")]
#[allow(missing_debug_implementations)]
pub struct Behaviour {
rr: RequestResponse<OneShotCodec<TransferProofProtocol>>,
#[behaviour(ignore)]
events: VecDeque<OutEvent>,
}
impl Behaviour {
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<RequestProtocol<OneShotCodec<TransferProofProtocol>>, OutEvent>>
{
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending
}
}
impl Default for Behaviour {
fn default() -> Self {
let timeout = Duration::from_secs(TIMEOUT);
let mut config = RequestResponseConfig::default();
config.set_request_timeout(timeout);
Self {
rr: RequestResponse::new(
OneShotCodec::default(),
vec![(TransferProofProtocol, ProtocolSupport::Inbound)],
config,
),
events: Default::default(),
}
}
}
impl NetworkBehaviourEventProcess<RequestResponseEvent<Request, Response>> for Behaviour {
fn inject_event(&mut self, event: RequestResponseEvent<Request, Response>) {
match event {
RequestResponseEvent::Message {
message:
RequestResponseMessage::Request {
request, channel, ..
},
..
} => {
if let Request::TransferProof(msg) = request {
debug!("Received Transfer Proof");
self.events.push_back(OutEvent::Msg(*msg));
// Send back empty response so that the request/response protocol completes.
let _ = self.rr.send_response(channel, Response::TransferProof);
}
}
RequestResponseEvent::Message {
message: RequestResponseMessage::Response { .. },
..
} => panic!("Bob should not get a Response"),
RequestResponseEvent::InboundFailure { error, .. } => {
error!("Inbound failure: {:?}", error);
}
RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error);
}
}
}
}

@ -0,0 +1,48 @@
//! A serde module that defines how we want to serialize PeerIds on the
//! HTTP-API.
use libp2p::PeerId;
use serde::{de::Error, Deserialize, Deserializer, Serializer};
pub fn serialize<S>(peer_id: &PeerId, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let string = peer_id.to_string();
serializer.serialize_str(&string)
}
#[allow(dead_code)]
pub fn deserialize<'de, D>(deserializer: D) -> Result<PeerId, D::Error>
where
D: Deserializer<'de>,
{
let string = String::deserialize(deserializer)?;
let peer_id = string.parse().map_err(D::Error::custom)?;
Ok(peer_id)
}
#[cfg(test)]
mod tests {
use super::*;
use serde::Serialize;
use spectral::prelude::*;
#[derive(Serialize)]
struct SerializablePeerId(#[serde(with = "super")] PeerId);
#[test]
fn maker_id_serializes_as_expected() {
let peer_id = SerializablePeerId(
"QmfUfpC2frwFvcDzpspnfZitHt5wct6n4kpG5jzgRdsxkY"
.parse()
.unwrap(),
);
let got = serde_json::to_string(&peer_id).expect("failed to serialize peer id");
assert_that(&got)
.is_equal_to(r#""QmfUfpC2frwFvcDzpspnfZitHt5wct6n4kpG5jzgRdsxkY""#.to_string());
}
}
Loading…
Cancel
Save