From 3fa4ffa82c9ce25ec8f65687f1edec2fbdbb2976 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Wed, 3 Feb 2021 16:45:43 +1100 Subject: [PATCH] Implement new behaviour for execution setup --- Cargo.lock | 10 +++ docs/sequence.puml | 10 +-- swap/Cargo.toml | 1 + swap/src/network/request_response.rs | 2 +- swap/src/protocol/alice.rs | 16 ++++ swap/src/protocol/alice/event_loop.rs | 34 +++++++- swap/src/protocol/alice/execution_setup.rs | 91 ++++++++++++++++++++ swap/src/protocol/alice/steps.rs | 31 +------ swap/src/protocol/bob.rs | 26 +++++- swap/src/protocol/bob/event_loop.rs | 36 +++++++- swap/src/protocol/bob/execution_setup.rs | 97 ++++++++++++++++++++++ swap/src/protocol/bob/swap.rs | 25 +----- 12 files changed, 318 insertions(+), 61 deletions(-) create mode 100644 swap/src/protocol/alice/execution_setup.rs create mode 100644 swap/src/protocol/bob/execution_setup.rs diff --git a/Cargo.lock b/Cargo.lock index 3853cca9..ed5439a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1637,6 +1637,15 @@ dependencies = [ "wasm-timer", ] +[[package]] +name = "libp2p-async-await" +version = "0.1.0" +source = "git+https://github.com/comit-network/rust-libp2p-async-await?rev=1429cd780204624b4d244e7d8179fe6ff77988c3#1429cd780204624b4d244e7d8179fe6ff77988c3" +dependencies = [ + "libp2p", + "log", +] + [[package]] name = "libp2p-core" version = "0.27.0" @@ -3415,6 +3424,7 @@ dependencies = [ "get-port", "hyper", "libp2p", + "libp2p-async-await", "log", "miniscript", "monero", diff --git a/docs/sequence.puml b/docs/sequence.puml index 11638d4b..3a59aeb4 100644 --- a/docs/sequence.puml +++ b/docs/sequence.puml @@ -20,22 +20,22 @@ end group Execution Setup group Phase A [Messages can be exchanged in any order] - Bob -> Alice: bob::Message0 + Bob -> Alice: Message0 note left: Pubkeys\ndleq proof s_b\nxmr viewkey v_b\nbtc refund addr - Alice -> Bob: alice::Message0 + Alice -> Bob: Message1 note right: Pubkeys\ndleq proof s_a\nxmr view key v_a\nbtc redeem addr\nbtc punish addr end group Phase B [Messages must be exchanged in the given order] - Bob -> Alice: Message1 + Bob -> Alice: Message2 note left: unsigned btc lock tx - Alice -> Bob: Message2 + Alice -> Bob: Message3 note right: btc cancel tx sig\nbtc refund tx enc sig S_b - Bob -> Alice: Message3 + Bob -> Alice: Message4 note left: btc punish tx sig\nbtc cancel tx sig end diff --git a/swap/Cargo.toml b/swap/Cargo.toml index 94eda471..85609dcc 100644 --- a/swap/Cargo.toml +++ b/swap/Cargo.toml @@ -25,6 +25,7 @@ ecdsa_fun = { git = "https://github.com/LLFourn/secp256kfun", rev = "cdfbc766045 ed25519-dalek = { version = "1.0.0-pre.4", features = ["serde"] }# Cannot be 1 because they depend on curve25519-dalek version 3 futures = { version = "0.3", default-features = false } libp2p = { version = "0.34", default-features = false, features = ["tcp-tokio", "yamux", "mplex", "dns", "noise", "request-response"] } +libp2p-async-await = { git = "https://github.com/comit-network/rust-libp2p-async-await", rev = "1429cd780204624b4d244e7d8179fe6ff77988c3" } log = { version = "0.4", features = ["serde"] } miniscript = { version = "4", features = ["serde"] } monero = { version = "0.9", features = ["serde_support"] } diff --git a/swap/src/network/request_response.rs b/swap/src/network/request_response.rs index f0a01664..f35ac3e0 100644 --- a/swap/src/network/request_response.rs +++ b/swap/src/network/request_response.rs @@ -12,7 +12,7 @@ use std::{fmt::Debug, io, marker::PhantomData}; pub const TIMEOUT: u64 = 3600; // One hour. /// Message receive buffer. -const BUF_SIZE: usize = 1024 * 1024; +pub const BUF_SIZE: usize = 1024 * 1024; // TODO: Think about whether there is a better way to do this, e.g., separate // Codec for each Message and a macro that implements them. diff --git a/swap/src/protocol/alice.rs b/swap/src/protocol/alice.rs index 6f5b02d4..1c95eb19 100644 --- a/swap/src/protocol/alice.rs +++ b/swap/src/protocol/alice.rs @@ -34,6 +34,7 @@ use uuid::Uuid; mod encrypted_signature; pub mod event_loop; +mod execution_setup; mod message0; mod message1; mod message2; @@ -236,6 +237,7 @@ pub enum OutEvent { msg: Box, bob_peer_id: PeerId, }, + ExecutionSetupDone(Result>), TransferProofAcknowledged, EncryptedSignature(EncryptedSignature), } @@ -286,6 +288,14 @@ impl From for OutEvent { } } +impl From for OutEvent { + fn from(event: execution_setup::OutEvent) -> Self { + match event { + execution_setup::OutEvent::Done(res) => OutEvent::ExecutionSetupDone(res.map(Box::new)), + } + } +} + impl From for OutEvent { fn from(event: transfer_proof::OutEvent) -> Self { match event { @@ -312,6 +322,7 @@ pub struct Behaviour { message0: message0::Behaviour, message1: message1::Behaviour, message2: message2::Behaviour, + execution_setup: execution_setup::Behaviour, transfer_proof: transfer_proof::Behaviour, encrypted_signature: encrypted_signature::Behaviour, } @@ -328,6 +339,11 @@ impl Behaviour { Ok(()) } + pub fn start_execution_setup(&mut self, bob_peer_id: PeerId, state0: State0) { + self.execution_setup.run(bob_peer_id, state0); + info!("Start execution setup with {}", bob_peer_id); + } + /// Send Message0 to Bob in response to receiving his Message0. pub fn send_message0( &mut self, diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 9e28934a..685f4606 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -2,7 +2,7 @@ use crate::{ network::{request_response::AliceToBob, transport::SwapTransport, TokioExecutor}, protocol::{ alice, - alice::{Behaviour, OutEvent, SwapResponse, TransferProof}, + alice::{Behaviour, OutEvent, State0, State3, SwapResponse, TransferProof}, bob, bob::EncryptedSignature, }, @@ -38,10 +38,12 @@ pub struct EventLoopHandle { recv_message0: Receiver<(bob::Message0, ResponseChannel)>, recv_message1: Receiver<(bob::Message1, ResponseChannel)>, recv_message2: Receiver, + done_execution_setup: Receiver>, recv_encrypted_signature: Receiver, request: Receiver, conn_established: Receiver, send_swap_response: Sender<(ResponseChannel, SwapResponse)>, + start_execution_setup: Sender<(PeerId, State0)>, send_message0: Sender<(ResponseChannel, alice::Message0)>, send_message1: Sender<(ResponseChannel, alice::Message1)>, send_transfer_proof: Sender<(PeerId, TransferProof)>, @@ -77,6 +79,18 @@ impl EventLoopHandle { .ok_or_else(|| anyhow!("Failed to receive message 2 from Bob")) } + pub async fn execution_setup(&mut self, bob_peer_id: PeerId, state0: State0) -> Result { + let _ = self + .start_execution_setup + .send((bob_peer_id, state0)) + .await?; + + self.done_execution_setup + .recv() + .await + .ok_or_else(|| anyhow!("Failed to setup execution with Bob"))? + } + pub async fn recv_encrypted_signature(&mut self) -> Result { self.recv_encrypted_signature .recv() @@ -140,6 +154,8 @@ pub struct EventLoop { recv_message0: Sender<(bob::Message0, ResponseChannel)>, recv_message1: Sender<(bob::Message1, ResponseChannel)>, recv_message2: Sender, + start_execution_setup: Receiver<(PeerId, State0)>, + done_execution_setup: Sender>, recv_encrypted_signature: Sender, request: Sender, conn_established: Sender, @@ -169,6 +185,8 @@ impl EventLoop { let recv_message0 = Channels::new(); let recv_message1 = Channels::new(); let recv_message2 = Channels::new(); + let start_execution_setup = Channels::new(); + let done_execution_setup = Channels::new(); let recv_encrypted_signature = Channels::new(); let request = Channels::new(); let conn_established = Channels::new(); @@ -183,6 +201,8 @@ impl EventLoop { recv_message0: recv_message0.sender, recv_message1: recv_message1.sender, recv_message2: recv_message2.sender, + start_execution_setup: start_execution_setup.receiver, + done_execution_setup: done_execution_setup.sender, recv_encrypted_signature: recv_encrypted_signature.sender, request: request.sender, conn_established: conn_established.sender, @@ -197,6 +217,8 @@ impl EventLoop { recv_message0: recv_message0.receiver, recv_message1: recv_message1.receiver, recv_message2: recv_message2.receiver, + start_execution_setup: start_execution_setup.sender, + done_execution_setup: done_execution_setup.receiver, recv_encrypted_signature: recv_encrypted_signature.receiver, request: request.receiver, conn_established: conn_established.receiver, @@ -227,6 +249,9 @@ impl EventLoop { OutEvent::Message2 { msg, bob_peer_id : _} => { let _ = self.recv_message2.send(*msg).await; } + OutEvent::ExecutionSetupDone(res) => { + let _ = self.done_execution_setup.send(res.map(|state|*state)).await; + } OutEvent::TransferProofAcknowledged => { trace!("Bob acknowledged transfer proof"); let _ = self.recv_transfer_proof_ack.send(()).await; @@ -247,6 +272,13 @@ impl EventLoop { .map_err(|err|error!("Failed to send swap response: {:#}", err)); } }, + option = self.start_execution_setup.recv().fuse() => { + if let Some((bob_peer_id, state0)) = option { + let _ = self + .swarm + .start_execution_setup(bob_peer_id, state0); + } + }, msg0 = self.send_message0.recv().fuse() => { if let Some((channel, msg)) = msg0 { let _ = self diff --git a/swap/src/protocol/alice/execution_setup.rs b/swap/src/protocol/alice/execution_setup.rs new file mode 100644 index 00000000..8360c1de --- /dev/null +++ b/swap/src/protocol/alice/execution_setup.rs @@ -0,0 +1,91 @@ +use crate::{ + network::request_response::BUF_SIZE, + protocol::{ + alice::{State0, State3}, + bob, + }, +}; +use anyhow::{Context, Error, Result}; +use libp2p::PeerId; +use libp2p_async_await::BehaviourOutEvent; + +#[derive(Debug)] +pub enum OutEvent { + Done(Result), +} + +impl From> for OutEvent { + fn from(event: BehaviourOutEvent) -> Self { + match event { + BehaviourOutEvent::Inbound(_, Ok(State3)) => OutEvent::Done(Ok(State3)), + BehaviourOutEvent::Inbound(_, Err(e)) => OutEvent::Done(Err(e)), + BehaviourOutEvent::Outbound(..) => unreachable!("Alice only supports inbound"), + } + } +} + +#[derive(libp2p::NetworkBehaviour)] +#[behaviour(out_event = "OutEvent", event_process = false)] +pub struct Behaviour { + inner: libp2p_async_await::Behaviour, +} + +impl Default for Behaviour { + fn default() -> Self { + Self { + inner: libp2p_async_await::Behaviour::new(b"/execution_setup/1.0.0"), + } + } +} + +impl Behaviour { + pub fn run(&mut self, bob: PeerId, state0: State0) { + self.inner + .do_protocol_listener(bob, move |mut substream| async move { + let alice_message0 = state0.next_message(); + + let state1 = { + let bob_message0 = serde_cbor::from_slice::( + &substream.read_message(BUF_SIZE).await?, + ) + .context("failed to deserialize message0")?; + state0.receive(bob_message0)? + }; + + substream + .write_message( + &serde_cbor::to_vec(&alice_message0) + .context("failed to serialize Message0")?, + ) + .await?; + + let state2 = { + let bob_message1 = serde_cbor::from_slice::( + &substream.read_message(BUF_SIZE).await?, + ) + .context("failed to deserialize message1")?; + state1.receive(bob_message1) + }; + + { + let alice_message2 = state2.next_message(); + substream + .write_message( + &serde_cbor::to_vec(&alice_message2) + .context("failed to serialize Message2")?, + ) + .await?; + } + + let state3 = { + let bob_message2 = serde_cbor::from_slice::( + &substream.read_message(BUF_SIZE).await?, + ) + .context("failed to deserialize message2")?; + state2.receive(bob_message2)? + }; + + Ok(state3) + }) + } +} diff --git a/swap/src/protocol/alice/steps.rs b/swap/src/protocol/alice/steps.rs index 3272d616..08f15cc0 100644 --- a/swap/src/protocol/alice/steps.rs +++ b/swap/src/protocol/alice/steps.rs @@ -55,39 +55,12 @@ pub async fn negotiate( .send_swap_response(event.channel, SwapResponse { xmr_amount }) .await?; - let (bob_message0, channel) = timeout( + let state3 = timeout( execution_params.bob_time_to_act, - event_loop_handle.recv_message0(), + event_loop_handle.execution_setup(bob_peer_id, state0), ) .await??; - let alice_message0 = state0.next_message(); - event_loop_handle - .send_message0(channel, alice_message0) - .await?; - - let state1 = state0.receive(bob_message0)?; - - let (bob_message1, channel) = timeout( - execution_params.bob_time_to_act, - event_loop_handle.recv_message1(), - ) - .await??; - - let state2 = state1.receive(bob_message1); - - event_loop_handle - .send_message1(channel, state2.next_message()) - .await?; - - let bob_message2 = timeout( - execution_params.bob_time_to_act, - event_loop_handle.recv_message2(), - ) - .await??; - - let state3 = state2.receive(bob_message2)?; - Ok((bob_peer_id, state3)) } diff --git a/swap/src/protocol/bob.rs b/swap/src/protocol/bob.rs index 810af8c2..8c7226c9 100644 --- a/swap/src/protocol/bob.rs +++ b/swap/src/protocol/bob.rs @@ -32,6 +32,7 @@ use crate::{execution_params::ExecutionParams, protocol::alice::TransferProof}; mod encrypted_signature; pub mod event_loop; +mod execution_setup; mod message0; mod message1; mod message2; @@ -162,6 +163,7 @@ impl Builder { } } } + fn init_event_loop( &self, ) -> Result<(bob::event_loop::EventLoop, bob::event_loop::EventLoopHandle)> { @@ -174,6 +176,7 @@ impl Builder { self.peer_id, self.alice_peer_id, self.alice_address.clone(), + self.bitcoin_wallet.clone(), ) } @@ -203,13 +206,14 @@ impl Builder { } } -#[derive(Debug, Clone)] +#[derive(Debug)] pub enum OutEvent { ConnectionEstablished(PeerId), SwapResponse(alice::SwapResponse), Message0(Box), Message1(Box), Message2, + ExecutionSetupDone(Result>), TransferProof(Box), EncryptedSignatureAcknowledged, } @@ -254,6 +258,14 @@ impl From for OutEvent { } } +impl From for OutEvent { + fn from(event: execution_setup::OutEvent) -> Self { + match event { + execution_setup::OutEvent::Done(res) => OutEvent::ExecutionSetupDone(res.map(Box::new)), + } + } +} + impl From for OutEvent { fn from(event: transfer_proof::OutEvent) -> Self { match event { @@ -280,6 +292,7 @@ pub struct Behaviour { message0: message0::Behaviour, message1: message1::Behaviour, message2: message2::Behaviour, + execution_setup: execution_setup::Behaviour, transfer_proof: transfer_proof::Behaviour, encrypted_signature: encrypted_signature::Behaviour, } @@ -291,6 +304,17 @@ impl Behaviour { info!("Requesting swap from: {}", alice); } + pub fn start_execution_setup( + &mut self, + alice_peer_id: PeerId, + state0: State0, + bitcoin_wallet: Arc, + ) { + self.execution_setup + .run(alice_peer_id, state0, bitcoin_wallet); + info!("Start execution setup with {}", alice_peer_id); + } + /// Sends Bob's first message to Alice. pub fn send_message0(&mut self, alice: PeerId, msg: bob::Message0) { self.message0.send(alice, msg); diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index 86f8bac6..abc646b6 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -1,15 +1,17 @@ use crate::{ + bitcoin, bitcoin::EncryptedSignature, network::{transport::SwapTransport, TokioExecutor}, protocol::{ alice, alice::{SwapResponse, TransferProof}, - bob::{self, Behaviour, OutEvent, SwapRequest}, + bob::{self, Behaviour, OutEvent, State0, State2, SwapRequest}, }, }; use anyhow::{anyhow, Result}; use futures::FutureExt; use libp2p::{core::Multiaddr, PeerId}; +use std::sync::Arc; use tokio::sync::mpsc::{Receiver, Sender}; use tracing::{debug, error, info}; @@ -37,6 +39,8 @@ pub struct EventLoopHandle { recv_swap_response: Receiver, recv_message0: Receiver, recv_message1: Receiver, + start_execution_setup: Sender, + done_execution_setup: Receiver>, recv_transfer_proof: Receiver, conn_established: Receiver, dial_alice: Sender<()>, @@ -70,6 +74,15 @@ impl EventLoopHandle { .ok_or_else(|| anyhow!("Failed to receive message 1 from Alice")) } + pub async fn execution_setup(&mut self, state0: State0) -> Result { + let _ = self.start_execution_setup.send(state0).await?; + + self.done_execution_setup + .recv() + .await + .ok_or_else(|| anyhow!("Failed to setup execution with Alice"))? + } + pub async fn recv_transfer_proof(&mut self) -> Result { self.recv_transfer_proof .recv() @@ -128,10 +141,13 @@ impl EventLoopHandle { #[allow(missing_debug_implementations)] pub struct EventLoop { swarm: libp2p::Swarm, + bitcoin_wallet: Arc, alice_peer_id: PeerId, recv_swap_response: Sender, recv_message0: Sender, recv_message1: Sender, + start_execution_setup: Receiver, + done_execution_setup: Sender>, recv_transfer_proof: Sender, dial_alice: Receiver<()>, conn_established: Sender, @@ -150,6 +166,7 @@ impl EventLoop { peer_id: PeerId, alice_peer_id: PeerId, alice_addr: Multiaddr, + bitcoin_wallet: Arc, ) -> Result<(Self, EventLoopHandle)> { let mut swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, peer_id) .executor(Box::new(TokioExecutor { @@ -162,6 +179,8 @@ impl EventLoop { let swap_response = Channels::new(); let recv_message0 = Channels::new(); let recv_message1 = Channels::new(); + let start_execution_setup = Channels::new(); + let done_execution_setup = Channels::new(); let recv_transfer_proof = Channels::new(); let dial_alice = Channels::new(); let conn_established = Channels::new(); @@ -175,9 +194,12 @@ impl EventLoop { let event_loop = EventLoop { swarm, alice_peer_id, + bitcoin_wallet, recv_swap_response: swap_response.sender, recv_message0: recv_message0.sender, recv_message1: recv_message1.sender, + start_execution_setup: start_execution_setup.receiver, + done_execution_setup: done_execution_setup.sender, recv_transfer_proof: recv_transfer_proof.sender, conn_established: conn_established.sender, dial_alice: dial_alice.receiver, @@ -193,6 +215,8 @@ impl EventLoop { recv_swap_response: swap_response.receiver, recv_message0: recv_message0.receiver, recv_message1: recv_message1.receiver, + start_execution_setup: start_execution_setup.sender, + done_execution_setup: done_execution_setup.receiver, recv_transfer_proof: recv_transfer_proof.receiver, conn_established: conn_established.receiver, dial_alice: dial_alice.sender, @@ -225,6 +249,9 @@ impl EventLoop { let _ = self.recv_message1.send(*msg).await; } OutEvent::Message2 => info!("Alice acknowledged message 2 received"), + OutEvent::ExecutionSetupDone(res) => { + let _ = self.done_execution_setup.send(res.map(|state|*state)).await; + } OutEvent::TransferProof(msg) => { let _ = self.recv_transfer_proof.send(*msg).await; } @@ -272,6 +299,13 @@ impl EventLoop { self.swarm.send_message2(self.alice_peer_id, msg); } }, + option = self.start_execution_setup.recv().fuse() => { + if let Some(state0) = option { + let _ = self + .swarm + .start_execution_setup(self.alice_peer_id, state0, self.bitcoin_wallet.clone()); + } + }, encrypted_signature = self.send_encrypted_signature.recv().fuse() => { if let Some(tx_redeem_encsig) = encrypted_signature { self.swarm.send_encrypted_signature(self.alice_peer_id, tx_redeem_encsig); diff --git a/swap/src/protocol/bob/execution_setup.rs b/swap/src/protocol/bob/execution_setup.rs new file mode 100644 index 00000000..3a0f3af0 --- /dev/null +++ b/swap/src/protocol/bob/execution_setup.rs @@ -0,0 +1,97 @@ +use crate::{ + network::request_response::BUF_SIZE, + protocol::{ + alice, + bob::{State0, State2}, + }, +}; +use anyhow::{Context, Error, Result}; +use libp2p::PeerId; +use libp2p_async_await::BehaviourOutEvent; +use std::sync::Arc; + +#[derive(Debug)] +pub enum OutEvent { + Done(Result), +} + +impl From> for OutEvent { + fn from(event: BehaviourOutEvent<(), State2, Error>) -> Self { + match event { + BehaviourOutEvent::Outbound(_, Ok(State2)) => OutEvent::Done(Ok(State2)), + BehaviourOutEvent::Outbound(_, Err(e)) => OutEvent::Done(Err(e)), + BehaviourOutEvent::Inbound(..) => unreachable!("Bob only supports outbound"), + } + } +} + +#[derive(libp2p::NetworkBehaviour)] +#[behaviour(out_event = "OutEvent", event_process = false)] +pub struct Behaviour { + inner: libp2p_async_await::Behaviour<(), State2, anyhow::Error>, +} + +impl Default for Behaviour { + fn default() -> Self { + Self { + inner: libp2p_async_await::Behaviour::new(b"/execution_setup/1.0.0"), + } + } +} + +impl Behaviour { + pub fn run( + &mut self, + alice: PeerId, + state0: State0, + bitcoin_wallet: Arc, + ) { + self.inner + .do_protocol_dialer(alice, move |mut substream| async move { + let bob_message0 = state0.next_message(); + + substream + .write_message( + &serde_cbor::to_vec(&bob_message0) + .context("failed to serialize message0")?, + ) + .await?; + + let alice_message0 = serde_cbor::from_slice::( + &substream.read_message(BUF_SIZE).await?, + ) + .context("failed to deserialize message0")?; + + let state1 = state0 + .receive(bitcoin_wallet.as_ref(), alice_message0) + .await?; + { + let bob_message1 = state1.next_message(); + substream + .write_message( + &serde_cbor::to_vec(&bob_message1) + .context("failed to serialize Message1")?, + ) + .await?; + } + + let alice_message1 = serde_cbor::from_slice::( + &substream.read_message(BUF_SIZE).await?, + ) + .context("failed to deserialize message1")?; + let state2 = state1.receive(alice_message1)?; + + { + let bob_message2 = state2.next_message(); + substream + .write_message( + &serde_cbor::to_vec(&bob_message2) + .context("failed to serialize Message2")?, + ) + .await?; + } + + Ok(state2) + }) + } +} diff --git a/swap/src/protocol/bob/swap.rs b/swap/src/protocol/bob/swap.rs index 400a5be8..5036b52c 100644 --- a/swap/src/protocol/bob/swap.rs +++ b/swap/src/protocol/bob/swap.rs @@ -70,13 +70,7 @@ async fn run_until_internal( BobState::Started { state0, amounts } => { event_loop_handle.dial().await?; - let state2 = negotiate( - state0, - amounts, - &mut event_loop_handle, - bitcoin_wallet.clone(), - ) - .await?; + let state2 = negotiate(state0, amounts, &mut event_loop_handle).await?; let state = BobState::Negotiated(state2); let db_state = state.clone().into(); @@ -378,7 +372,6 @@ pub async fn negotiate( state0: crate::protocol::bob::state::State0, amounts: SwapAmounts, event_loop_handle: &mut EventLoopHandle, - bitcoin_wallet: Arc, ) -> Result { tracing::trace!("Starting negotiate"); event_loop_handle @@ -391,21 +384,7 @@ pub async fn negotiate( // argument. let _swap_response = event_loop_handle.recv_swap_response().await?; - event_loop_handle - .send_message0(state0.next_message()) - .await?; - let msg0 = event_loop_handle.recv_message0().await?; - let state1 = state0.receive(bitcoin_wallet.as_ref(), msg0).await?; - - event_loop_handle - .send_message1(state1.next_message()) - .await?; - let msg1 = event_loop_handle.recv_message1().await?; - let state2 = state1.receive(msg1)?; - - event_loop_handle - .send_message2(state2.next_message()) - .await?; + let state2 = event_loop_handle.execution_setup(state0).await?; Ok(state2) }