Upgrade tokio to 1.0

Upgrade bitcoin harness dependency to latest commit

Upgrade backoff to fix failing tests. The previous version of backoff had a broken version of the retry function. Upgraded to a newer comit which fixes this problem.

Upgrade hyper to 0.14 as the 0.13 was bringing in tokio 0.2.24

Upgraded bitcoin harness to version that uses tokio 1.0 and reqwest 0.11

Upgrade reqwest to 0.11. Reqwest 0.11 uses tokio 1.0

Upgrade libp2p to 0.34 in preparation for tokio 1.0 upgrade
pull/143/head
rishflab 3 years ago committed by Franck Royer
parent 73f89fefda
commit 77fc5743a2
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4

1060
Cargo.lock generated

File diff suppressed because it is too large Load Diff

@ -10,12 +10,12 @@ digest_auth = "0.2.3"
futures = "0.3" futures = "0.3"
port_check = "0.1" port_check = "0.1"
rand = "0.7" rand = "0.7"
reqwest = { version = "0.10", default-features = false, features = ["json", "native-tls"] } reqwest = { version = "0.11", default-features = false, features = ["json", "native-tls"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
spectral = "0.6" spectral = "0.6"
testcontainers = "0.11" testcontainers = "0.11"
tokio = { version = "0.2", default-features = false, features = ["blocking", "macros", "rt-core", "time"] } tokio = { version = "1.0", default-features = false, features = ["rt-multi-thread", "time", "macros"] }
tracing = "0.1" tracing = "0.1"
tracing-log = "0.1" tracing-log = "0.1"
tracing-subscriber = { version = "0.2", default-features = false, features = ["fmt", "ansi", "env-filter"] } tracing-subscriber = { version = "0.2", default-features = false, features = ["fmt", "ansi", "env-filter"] }

@ -266,7 +266,7 @@ impl<'c> MoneroWalletRpc {
// ~30 seconds // ~30 seconds
bail!("Wallet could not catch up with monerod after 30 retries.") bail!("Wallet could not catch up with monerod after 30 retries.")
} }
time::delay_for(Duration::from_millis(WAIT_WALLET_SYNC_MILLIS)).await; time::sleep(Duration::from_millis(WAIT_WALLET_SYNC_MILLIS)).await;
retry += 1; retry += 1;
} }
Ok(()) Ok(())
@ -293,7 +293,7 @@ impl<'c> MoneroWalletRpc {
/// Mine a block ever BLOCK_TIME_SECS seconds. /// Mine a block ever BLOCK_TIME_SECS seconds.
async fn mine(monerod: monerod::Client, reward_address: String) -> Result<()> { async fn mine(monerod: monerod::Client, reward_address: String) -> Result<()> {
loop { loop {
time::delay_for(Duration::from_secs(BLOCK_TIME_SECS)).await; time::sleep(Duration::from_secs(BLOCK_TIME_SECS)).await;
monerod.generate_blocks(1, &reward_address).await?; monerod.generate_blocks(1, &reward_address).await?;
} }
} }

@ -22,7 +22,7 @@ async fn init_miner_and_mine_to_miner_address() {
let got_miner_balance = miner_wallet.balance().await.unwrap(); let got_miner_balance = miner_wallet.balance().await.unwrap();
assert_that!(got_miner_balance).is_greater_than(0); assert_that!(got_miner_balance).is_greater_than(0);
time::delay_for(Duration::from_millis(1010)).await; time::sleep(Duration::from_millis(1010)).await;
// after a bit more than 1 sec another block should have been mined // after a bit more than 1 sec another block should have been mined
let block_height = monerod.client().get_block_count().await.unwrap(); let block_height = monerod.client().get_block_count().await.unwrap();

@ -10,10 +10,10 @@ anyhow = "1"
async-recursion = "0.3.1" async-recursion = "0.3.1"
async-trait = "0.1" async-trait = "0.1"
atty = "0.2" atty = "0.2"
backoff = { version = "0.2", features = ["tokio"] } backoff = { git = "https://github.com/ihrwein/backoff", rev = "9d03992a83dfdc596be26276d4e5c5254a4b11a2", features = ["tokio"] }
base64 = "0.12" base64 = "0.12"
bitcoin = { version = "0.25", features = ["rand", "use-serde"] } bitcoin = { version = "0.25", features = ["rand", "use-serde"] }
bitcoin-harness = { git = "https://github.com/coblox/bitcoin-harness-rs", rev = "864b55fcba2e770105f135781dd2e3002c503d12" } bitcoin-harness = { git = "https://github.com/coblox/bitcoin-harness-rs", rev = "ae2f6cd547496e680941c0910018bbe884128799" }
conquer-once = "0.3" conquer-once = "0.3"
cross-curve-dleq = { git = "https://github.com/comit-network/cross-curve-dleq", rev = "eddcdea1d1f16fa33ef581d1744014ece535c920", features = ["serde"] } cross-curve-dleq = { git = "https://github.com/comit-network/cross-curve-dleq", rev = "eddcdea1d1f16fa33ef581d1744014ece535c920", features = ["serde"] }
curve25519-dalek = "2" curve25519-dalek = "2"
@ -21,8 +21,7 @@ derivative = "2"
ecdsa_fun = { git = "https://github.com/LLFourn/secp256kfun", rev = "cdfbc766045ea678a41780919d6228dd5acee3be", features = ["libsecp_compat", "serde"] } ecdsa_fun = { git = "https://github.com/LLFourn/secp256kfun", rev = "cdfbc766045ea678a41780919d6228dd5acee3be", features = ["libsecp_compat", "serde"] }
ed25519-dalek = { version = "1.0.0-pre.4", features = ["serde"] }# Cannot be 1 because they depend on curve25519-dalek version 3 ed25519-dalek = { version = "1.0.0-pre.4", features = ["serde"] }# Cannot be 1 because they depend on curve25519-dalek version 3
futures = { version = "0.3", default-features = false } futures = { version = "0.3", default-features = false }
libp2p = { version = "0.29", default-features = false, features = ["tcp-tokio", "yamux", "mplex", "dns", "noise", "request-response"] } libp2p = { version = "0.34", default-features = false, features = ["tcp-tokio", "yamux", "mplex", "dns", "noise", "request-response"] }
libp2p-tokio-socks5 = "0.4"
log = { version = "0.4", features = ["serde"] } log = { version = "0.4", features = ["serde"] }
miniscript = { version = "4", features = ["serde"] } miniscript = { version = "4", features = ["serde"] }
monero = { version = "0.9", features = ["serde_support"] } monero = { version = "0.9", features = ["serde_support"] }
@ -30,7 +29,7 @@ monero-harness = { path = "../monero-harness" }
pem = "0.8" pem = "0.8"
prettytable-rs = "0.8" prettytable-rs = "0.8"
rand = "0.7" rand = "0.7"
reqwest = { version = "0.10", default-features = false, features = ["socks"] } reqwest = { version = "0.11", default-features = false }
rust_decimal = "1.8" rust_decimal = "1.8"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_cbor = "0.11" serde_cbor = "0.11"
@ -43,7 +42,7 @@ strum = { version = "0.20", features = ["derive"] }
tempfile = "3" tempfile = "3"
thiserror = "1" thiserror = "1"
time = "0.2" time = "0.2"
tokio = { version = "0.2", features = ["rt-threaded", "time", "macros", "sync"] } tokio = { version = "1.0", features = ["rt-multi-thread", "time", "macros", "sync"] }
tracing = { version = "0.1", features = ["attributes"] } tracing = { version = "0.1", features = ["attributes"] }
tracing-core = "0.1" tracing-core = "0.1"
tracing-futures = { version = "0.2", features = ["std-future", "futures-03"] } tracing-futures = { version = "0.2", features = ["std-future", "futures-03"] }
@ -55,7 +54,7 @@ void = "1"
[dev-dependencies] [dev-dependencies]
get-port = "3" get-port = "3"
hyper = "0.13" hyper = "0.14"
port_check = "0.1" port_check = "0.1"
serde_cbor = "0.11" serde_cbor = "0.11"
spectral = "0.6" spectral = "0.6"

@ -252,7 +252,7 @@ where
B: GetBlockHeight, B: GetBlockHeight,
{ {
while client.get_block_height().await < target { while client.get_block_height().await < target {
tokio::time::delay_for(std::time::Duration::from_secs(1)).await; tokio::time::sleep(std::time::Duration::from_secs(1)).await;
} }
} }

@ -9,7 +9,7 @@ use crate::{
use ::bitcoin::{util::psbt::PartiallySignedTransaction, Txid}; use ::bitcoin::{util::psbt::PartiallySignedTransaction, Txid};
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use async_trait::async_trait; use async_trait::async_trait;
use backoff::{backoff::Constant as ConstantBackoff, future::FutureOperation as _}; use backoff::{backoff::Constant as ConstantBackoff, tokio::retry};
use bitcoin_harness::{bitcoind_rpc::PsbtBase64, BitcoindRpcApi}; use bitcoin_harness::{bitcoind_rpc::PsbtBase64, BitcoindRpcApi};
use reqwest::Url; use reqwest::Url;
use std::time::Duration; use std::time::Duration;
@ -112,10 +112,11 @@ impl BroadcastSignedTransaction for Wallet {
#[async_trait] #[async_trait]
impl WatchForRawTransaction for Wallet { impl WatchForRawTransaction for Wallet {
async fn watch_for_raw_transaction(&self, txid: Txid) -> Transaction { async fn watch_for_raw_transaction(&self, txid: Txid) -> Transaction {
(|| async { Ok(self.inner.get_raw_transaction(txid).await?) }) retry(ConstantBackoff::new(Duration::from_secs(1)), || async {
.retry(ConstantBackoff::new(Duration::from_secs(1))) Ok(self.inner.get_raw_transaction(txid).await?)
.await })
.expect("transient errors to be retried") .await
.expect("transient errors to be retried")
} }
} }
@ -130,10 +131,11 @@ impl GetRawTransaction for Wallet {
#[async_trait] #[async_trait]
impl GetBlockHeight for Wallet { impl GetBlockHeight for Wallet {
async fn get_block_height(&self) -> BlockHeight { async fn get_block_height(&self) -> BlockHeight {
let height = (|| async { Ok(self.inner.client.getblockcount().await?) }) let height = retry(ConstantBackoff::new(Duration::from_secs(1)), || async {
.retry(ConstantBackoff::new(Duration::from_secs(1))) Ok(self.inner.client.getblockcount().await?)
.await })
.expect("transient errors to be retried"); .await
.expect("transient errors to be retried");
BlockHeight::new(height) BlockHeight::new(height)
} }
@ -148,7 +150,7 @@ impl TransactionBlockHeight for Wallet {
NotYetMined, NotYetMined,
} }
let height = (|| async { let height = retry(ConstantBackoff::new(Duration::from_secs(1)), || async {
let block_height = self let block_height = self
.inner .inner
.transaction_block_height(txid) .transaction_block_height(txid)
@ -160,7 +162,6 @@ impl TransactionBlockHeight for Wallet {
Result::<_, backoff::Error<Error>>::Ok(block_height) Result::<_, backoff::Error<Error>>::Ok(block_height)
}) })
.retry(ConstantBackoff::new(Duration::from_secs(1)))
.await .await
.expect("transient errors to be retried"); .expect("transient errors to be retried");

@ -60,7 +60,7 @@ impl From<&AliceState> for Alice {
.. ..
} => Alice::Negotiated { } => Alice::Negotiated {
state3: state3.as_ref().clone(), state3: state3.as_ref().clone(),
bob_peer_id: bob_peer_id.clone(), bob_peer_id: *bob_peer_id,
}, },
AliceState::BtcLocked { AliceState::BtcLocked {
state3, state3,
@ -68,7 +68,7 @@ impl From<&AliceState> for Alice {
.. ..
} => Alice::BtcLocked { } => Alice::BtcLocked {
state3: state3.as_ref().clone(), state3: state3.as_ref().clone(),
bob_peer_id: bob_peer_id.clone(), bob_peer_id: *bob_peer_id,
}, },
AliceState::XmrLocked { state3 } => Alice::XmrLocked(state3.as_ref().clone()), AliceState::XmrLocked { state3 } => Alice::XmrLocked(state3.as_ref().clone()),
AliceState::EncSigLearned { AliceState::EncSigLearned {

@ -16,12 +16,13 @@ use crate::cli::{Command, Options, Resume};
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use config::Config; use config::Config;
use database::Database; use database::Database;
use log::LevelFilter;
use prettytable::{row, Table}; use prettytable::{row, Table};
use protocol::{alice, bob, bob::Builder, SwapAmounts}; use protocol::{alice, bob, bob::Builder, SwapAmounts};
use std::sync::Arc; use std::sync::Arc;
use structopt::StructOpt; use structopt::StructOpt;
use trace::init_tracing; use trace::init_tracing;
use tracing::{info, log::LevelFilter}; use tracing::info;
use uuid::Uuid; use uuid::Uuid;
pub mod bitcoin; pub mod bitcoin;

@ -5,7 +5,7 @@ use crate::monero::{
use ::monero::{Address, Network, PrivateKey, PublicKey}; use ::monero::{Address, Network, PrivateKey, PublicKey};
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use backoff::{backoff::Constant as ConstantBackoff, future::FutureOperation as _}; use backoff::{backoff::Constant as ConstantBackoff, tokio::retry};
use bitcoin::hashes::core::sync::atomic::AtomicU32; use bitcoin::hashes::core::sync::atomic::AtomicU32;
use monero_harness::rpc::wallet; use monero_harness::rpc::wallet;
use std::{ use std::{
@ -96,7 +96,6 @@ impl CreateWalletForOutput for Wallet {
// TODO: For retry, use `backoff::ExponentialBackoff` in production as opposed // TODO: For retry, use `backoff::ExponentialBackoff` in production as opposed
// to `ConstantBackoff`. // to `ConstantBackoff`.
#[async_trait] #[async_trait]
impl WatchForTransfer for Wallet { impl WatchForTransfer for Wallet {
async fn watch_for_transfer( async fn watch_for_transfer(
@ -117,46 +116,41 @@ impl WatchForTransfer for Wallet {
let wallet = self.inner.clone(); let wallet = self.inner.clone();
let confirmations = Arc::new(AtomicU32::new(0u32)); let confirmations = Arc::new(AtomicU32::new(0u32));
let res = (move || {
let confirmations = confirmations.clone(); let res = retry(ConstantBackoff::new(Duration::from_secs(1)), || async {
let transfer_proof = transfer_proof.clone(); // NOTE: Currently, this is conflicting IO errors with the transaction not being
let wallet = wallet.clone(); // in the blockchain yet, or not having enough confirmations on it. All these
async move { // errors warrant a retry, but the strategy should probably differ per case
// NOTE: Currently, this is conflicting IO errors with the transaction not being let proof = wallet
// in the blockchain yet, or not having enough confirmations on it. All these .check_tx_key(
// errors warrant a retry, but the strategy should probably differ per case &String::from(transfer_proof.tx_hash()),
let proof = wallet &transfer_proof.tx_key().to_string(),
.check_tx_key( &address.to_string(),
&String::from(transfer_proof.tx_hash()), )
&transfer_proof.tx_key().to_string(), .await
&address.to_string(), .map_err(|_| backoff::Error::Transient(Error::TxNotFound))?;
)
.await if proof.received != expected_amount.as_piconero() {
.map_err(|_| backoff::Error::Transient(Error::TxNotFound))?; return Err(backoff::Error::Permanent(Error::InsufficientFunds {
expected: expected_amount,
if proof.received != expected_amount.as_piconero() { actual: Amount::from_piconero(proof.received),
return Err(backoff::Error::Permanent(Error::InsufficientFunds { }));
expected: expected_amount, }
actual: Amount::from_piconero(proof.received),
})); if proof.confirmations > confirmations.load(Ordering::SeqCst) {
} confirmations.store(proof.confirmations, Ordering::SeqCst);
info!(
if proof.confirmations > confirmations.load(Ordering::SeqCst) { "Monero lock tx received {} out of {} confirmations",
confirmations.store(proof.confirmations, Ordering::SeqCst); proof.confirmations, expected_confirmations
info!( );
"Monero lock tx received {} out of {} confirmations",
proof.confirmations, expected_confirmations
);
}
if proof.confirmations < expected_confirmations {
return Err(backoff::Error::Transient(Error::InsufficientConfirmations));
}
Ok(proof)
} }
if proof.confirmations < expected_confirmations {
return Err(backoff::Error::Transient(Error::InsufficientConfirmations));
}
Ok(proof)
}) })
.retry(ConstantBackoff::new(Duration::from_secs(1)))
.await; .await;
if let Err(Error::InsufficientFunds { expected, actual }) = res { if let Err(Error::InsufficientFunds { expected, actual }) = res {

@ -12,7 +12,7 @@ use std::{
task::Poll, task::Poll,
}; };
#[derive(Debug)] #[derive(Debug, Copy, Clone)]
pub enum OutEvent { pub enum OutEvent {
ConnectionEstablished(PeerId), ConnectionEstablished(PeerId),
} }
@ -42,7 +42,7 @@ impl PeerTracker {
/// Returns the peer id of counterparty if we are connected. /// Returns the peer id of counterparty if we are connected.
pub fn counterparty_peer_id(&self) -> Option<PeerId> { pub fn counterparty_peer_id(&self) -> Option<PeerId> {
if let Some((id, _)) = &self.connected { if let Some((id, _)) = &self.connected {
return Some(id.clone()); return Some(*id);
} }
None None
} }
@ -50,7 +50,7 @@ impl PeerTracker {
/// Returns the peer_id and multiaddr of counterparty if we are connected. /// Returns the peer_id and multiaddr of counterparty if we are connected.
pub fn counterparty(&self) -> Option<(PeerId, Multiaddr)> { pub fn counterparty(&self) -> Option<(PeerId, Multiaddr)> {
if let Some((peer_id, addr)) = &self.connected { if let Some((peer_id, addr)) = &self.connected {
return Some((peer_id.clone(), addr.clone())); return Some((*peer_id, addr.clone()));
} }
None None
} }
@ -97,18 +97,18 @@ impl NetworkBehaviour for PeerTracker {
) { ) {
match point { match point {
ConnectedPoint::Dialer { address } => { ConnectedPoint::Dialer { address } => {
self.connected = Some((peer.clone(), address.clone())); self.connected = Some((*peer, address.clone()));
} }
ConnectedPoint::Listener { ConnectedPoint::Listener {
local_addr: _, local_addr: _,
send_back_addr, send_back_addr,
} => { } => {
self.connected = Some((peer.clone(), send_back_addr.clone())); self.connected = Some((*peer, send_back_addr.clone()));
} }
} }
self.events self.events
.push_back(OutEvent::ConnectionEstablished(peer.clone())); .push_back(OutEvent::ConnectionEstablished(*peer));
} }
fn inject_connection_closed(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) { fn inject_connection_closed(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {

@ -31,7 +31,7 @@ pub fn build(id_keys: identity::Keypair) -> Result<SwapTransport> {
.upgrade(Version::V1) .upgrade(Version::V1)
.authenticate(noise) .authenticate(noise)
.multiplex(SelectUpgrade::new( .multiplex(SelectUpgrade::new(
yamux::Config::default(), yamux::YamuxConfig::default(),
MplexConfig::new(), MplexConfig::new(),
)) ))
.map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer))) .map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer)))

@ -165,7 +165,7 @@ impl Builder {
} }
pub fn peer_id(&self) -> PeerId { pub fn peer_id(&self) -> PeerId {
self.peer_id.clone() self.peer_id
} }
pub fn listen_address(&self) -> Multiaddr { pub fn listen_address(&self) -> Multiaddr {
@ -211,7 +211,7 @@ impl Builder {
alice_transport, alice_transport,
alice_behaviour, alice_behaviour,
self.listen_address.clone(), self.listen_address.clone(),
self.peer_id.clone(), self.peer_id,
) )
} }
} }
@ -322,21 +322,32 @@ impl Behaviour {
&mut self, &mut self,
channel: ResponseChannel<AliceToBob>, channel: ResponseChannel<AliceToBob>,
swap_response: SwapResponse, swap_response: SwapResponse,
) { ) -> Result<()> {
self.amounts.send(channel, swap_response); self.amounts.send(channel, swap_response)?;
info!("Sent amounts response"); info!("Sent amounts response");
Ok(())
} }
/// Send Message0 to Bob in response to receiving his Message0. /// Send Message0 to Bob in response to receiving his Message0.
pub fn send_message0(&mut self, channel: ResponseChannel<AliceToBob>, msg: Message0) { pub fn send_message0(
self.message0.send(channel, msg); &mut self,
channel: ResponseChannel<AliceToBob>,
msg: Message0,
) -> Result<()> {
self.message0.send(channel, msg)?;
debug!("Sent Message0"); debug!("Sent Message0");
Ok(())
} }
/// Send Message1 to Bob in response to receiving his Message1. /// Send Message1 to Bob in response to receiving his Message1.
pub fn send_message1(&mut self, channel: ResponseChannel<AliceToBob>, msg: Message1) { pub fn send_message1(
self.message1.send(channel, msg); &mut self,
channel: ResponseChannel<AliceToBob>,
msg: Message1,
) -> Result<()> {
self.message1.send(channel, msg)?;
debug!("Sent Message1"); debug!("Sent Message1");
Ok(())
} }
/// Send Transfer Proof to Bob. /// Send Transfer Proof to Bob.

@ -82,7 +82,10 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<Request, Response>> for B
debug!("Received encrypted signature"); debug!("Received encrypted signature");
self.events.push_back(OutEvent::Msg(*msg)); self.events.push_back(OutEvent::Msg(*msg));
// Send back empty response so that the request/response protocol completes. // Send back empty response so that the request/response protocol completes.
let _ = self.rr.send_response(channel, Response::EncryptedSignature); if let Err(error) = self.rr.send_response(channel, Response::EncryptedSignature)
{
error!("Failed to sen Encrypted Signature ack: {:?}", error);
}
} }
} }
RequestResponseEvent::Message { RequestResponseEvent::Message {
@ -95,6 +98,9 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<Request, Response>> for B
RequestResponseEvent::OutboundFailure { error, .. } => { RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error); error!("Outbound failure: {:?}", error);
} }
RequestResponseEvent::ResponseSent { .. } => {
debug!("Alice has sent an Message3 response to Bob");
}
} }
} }
} }

@ -8,12 +8,11 @@ use crate::{
}, },
}; };
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use futures::FutureExt;
use libp2p::{ use libp2p::{
core::Multiaddr, futures::StreamExt, request_response::ResponseChannel, PeerId, Swarm, core::Multiaddr, futures::FutureExt, request_response::ResponseChannel, PeerId, Swarm,
}; };
use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::mpsc::{Receiver, Sender};
use tracing::trace; use tracing::{error, trace};
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct Channels<T> { pub struct Channels<T> {
@ -227,24 +226,33 @@ impl EventLoop {
} }
} }
}, },
swap_response = self.send_swap_response.next().fuse() => { swap_response = self.send_swap_response.recv().fuse() => {
if let Some((channel, swap_response)) = swap_response { if let Some((channel, swap_response)) = swap_response {
self.swarm.send_swap_response(channel, swap_response); let _ = self
.swarm
.send_swap_response(channel, swap_response)
.map_err(|err|error!("Failed to send swap response: {:#}", err));
} }
}, },
msg0 = self.send_message0.next().fuse() => { msg0 = self.send_message0.recv().fuse() => {
if let Some((channel, msg)) = msg0 { if let Some((channel, msg)) = msg0 {
self.swarm.send_message0(channel, msg); let _ = self
.swarm
.send_message0(channel, msg)
.map_err(|err|error!("Failed to send message0: {:#}", err));
} }
}, },
msg1 = self.send_message1.next().fuse() => { msg1 = self.send_message1.recv().fuse() => {
if let Some((channel, msg)) = msg1 { if let Some((channel, msg)) = msg1 {
self.swarm.send_message1(channel, msg); let _ = self
.swarm
.send_message1(channel, msg)
.map_err(|err|error!("Failed to send message1: {:#}", err));
} }
}, },
transfer_proof = self.send_transfer_proof.next().fuse() => { transfer_proof = self.send_transfer_proof.recv().fuse() => {
if let Some((bob_peer_id, msg)) = transfer_proof { if let Some((bob_peer_id, msg)) = transfer_proof {
self.swarm.send_transfer_proof(bob_peer_id, msg); self.swarm.send_transfer_proof(bob_peer_id, msg)
} }
}, },
} }

@ -3,6 +3,7 @@ use crate::{
network::request_response::{AliceToBob, BobToAlice, Codec, Message0Protocol, TIMEOUT}, network::request_response::{AliceToBob, BobToAlice, Codec, Message0Protocol, TIMEOUT},
protocol::bob, protocol::bob,
}; };
use anyhow::{anyhow, Result};
use libp2p::{ use libp2p::{
request_response::{ request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig,
@ -49,9 +50,11 @@ pub struct Behaviour {
} }
impl Behaviour { impl Behaviour {
pub fn send(&mut self, channel: ResponseChannel<AliceToBob>, msg: Message0) { pub fn send(&mut self, channel: ResponseChannel<AliceToBob>, msg: Message0) -> Result<()> {
let msg = AliceToBob::Message0(Box::new(msg)); let msg = AliceToBob::Message0(Box::new(msg));
self.rr.send_response(channel, msg); self.rr
.send_response(channel, msg)
.map_err(|_| anyhow!("Sending Amounts response failed"))
} }
fn poll( fn poll(
&mut self, &mut self,
@ -108,6 +111,9 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
RequestResponseEvent::OutboundFailure { error, .. } => { RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error); error!("Outbound failure: {:?}", error);
} }
RequestResponseEvent::ResponseSent { .. } => {
debug!("Alice has sent an Message0 response to Bob");
}
} }
} }
} }

@ -2,6 +2,7 @@ use crate::{
network::request_response::{AliceToBob, BobToAlice, Codec, Message1Protocol, TIMEOUT}, network::request_response::{AliceToBob, BobToAlice, Codec, Message1Protocol, TIMEOUT},
protocol::bob, protocol::bob,
}; };
use anyhow::{anyhow, Result};
use ecdsa_fun::{adaptor::EncryptedSignature, Signature}; use ecdsa_fun::{adaptor::EncryptedSignature, Signature};
use libp2p::{ use libp2p::{
request_response::{ request_response::{
@ -46,9 +47,11 @@ pub struct Behaviour {
} }
impl Behaviour { impl Behaviour {
pub fn send(&mut self, channel: ResponseChannel<AliceToBob>, msg: Message1) { pub fn send(&mut self, channel: ResponseChannel<AliceToBob>, msg: Message1) -> Result<()> {
let msg = AliceToBob::Message1(Box::new(msg)); let msg = AliceToBob::Message1(Box::new(msg));
self.rr.send_response(channel, msg); self.rr
.send_response(channel, msg)
.map_err(|_| anyhow!("Sending Amounts response failed"))
} }
fn poll( fn poll(
@ -106,6 +109,9 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
RequestResponseEvent::OutboundFailure { error, .. } => { RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error); error!("Outbound failure: {:?}", error);
} }
RequestResponseEvent::ResponseSent { .. } => {
debug!("Alice has sent an Message1 response to Bob");
}
} }
} }
} }

@ -96,6 +96,9 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
RequestResponseEvent::OutboundFailure { error, .. } => { RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error); error!("Outbound failure: {:?}", error);
} }
RequestResponseEvent::ResponseSent { .. } => {
debug!("Alice has sent an Message2 response to Bob");
}
} }
} }
} }

@ -1,3 +1,4 @@
use anyhow::{anyhow, Result};
use libp2p::{ use libp2p::{
request_response::{ request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig,
@ -44,9 +45,11 @@ pub struct Behaviour {
impl Behaviour { impl Behaviour {
/// Alice always sends her messages as a response to a request from Bob. /// Alice always sends her messages as a response to a request from Bob.
pub fn send(&mut self, channel: ResponseChannel<AliceToBob>, msg: SwapResponse) { pub fn send(&mut self, channel: ResponseChannel<AliceToBob>, msg: SwapResponse) -> Result<()> {
let msg = AliceToBob::SwapResponse(Box::new(msg)); let msg = AliceToBob::SwapResponse(Box::new(msg));
self.rr.send_response(channel, msg); self.rr
.send_response(channel, msg)
.map_err(|_| anyhow!("Sending swap response failed"))
} }
fn poll( fn poll(
@ -105,6 +108,9 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
RequestResponseEvent::OutboundFailure { error, .. } => { RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error); error!("Outbound failure: {:?}", error);
} }
RequestResponseEvent::ResponseSent { .. } => {
debug!("Alice has sent an Amounts response to Bob");
}
} }
} }
} }

@ -97,6 +97,7 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<Request, Response>> for B
RequestResponseEvent::OutboundFailure { error, .. } => { RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error); error!("Outbound failure: {:?}", error);
} }
RequestResponseEvent::ResponseSent { .. } => {}
} }
} }
} }

@ -173,8 +173,8 @@ impl Builder {
bob::event_loop::EventLoop::new( bob::event_loop::EventLoop::new(
bob_transport, bob_transport,
bob_behaviour, bob_behaviour,
self.peer_id.clone(), self.peer_id,
self.alice_peer_id.clone(), self.alice_peer_id,
self.alice_address.clone(), self.alice_address.clone(),
) )
} }
@ -289,7 +289,7 @@ pub struct Behaviour {
impl Behaviour { impl Behaviour {
/// Sends a swap request to Alice to negotiate the swap. /// Sends a swap request to Alice to negotiate the swap.
pub fn send_swap_request(&mut self, alice: PeerId, swap_request: SwapRequest) { pub fn send_swap_request(&mut self, alice: PeerId, swap_request: SwapRequest) {
let _id = self.swap_request.send(alice.clone(), swap_request); let _id = self.swap_request.send(alice, swap_request);
info!("Requesting swap from: {}", alice); info!("Requesting swap from: {}", alice);
} }

@ -96,6 +96,9 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<Request, Response>> for B
RequestResponseEvent::OutboundFailure { error, .. } => { RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error); error!("Outbound failure: {:?}", error);
} }
RequestResponseEvent::ResponseSent { .. } => {
unreachable!("Bob should never send a Amounts response to Alice");
}
} }
} }
} }

@ -10,10 +10,7 @@ use crate::{
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use futures::FutureExt; use futures::FutureExt;
use libp2p::{core::Multiaddr, PeerId}; use libp2p::{core::Multiaddr, PeerId};
use tokio::{ use tokio::sync::mpsc::{Receiver, Sender};
stream::StreamExt,
sync::mpsc::{Receiver, Sender},
};
use tracing::{debug, error, info}; use tracing::{debug, error, info};
#[derive(Debug)] #[derive(Debug)]
@ -153,7 +150,7 @@ impl EventLoop {
})) }))
.build(); .build();
swarm.add_address(alice_peer_id.clone(), alice_addr); swarm.add_address(alice_peer_id, alice_addr);
let swap_response = Channels::new(); let swap_response = Channels::new();
let recv_message0 = Channels::new(); let recv_message0 = Channels::new();
@ -224,9 +221,9 @@ impl EventLoop {
OutEvent::EncryptedSignature => info!("Alice acknowledged encrypted signature received"), OutEvent::EncryptedSignature => info!("Alice acknowledged encrypted signature received"),
} }
}, },
option = self.dial_alice.next().fuse() => { option = self.dial_alice.recv().fuse() => {
if option.is_some() { if option.is_some() {
let peer_id = self.alice_peer_id.clone(); let peer_id = self.alice_peer_id;
if self.swarm.pt.is_connected(&peer_id) { if self.swarm.pt.is_connected(&peer_id) {
debug!("Already connected to Alice: {}", peer_id); debug!("Already connected to Alice: {}", peer_id);
let _ = self.conn_established.send(peer_id).await; let _ = self.conn_established.send(peer_id).await;
@ -240,31 +237,31 @@ impl EventLoop {
} }
} }
}, },
swap_request = self.send_swap_request.next().fuse() => { swap_request = self.send_swap_request.recv().fuse() => {
if let Some(swap_request) = swap_request { if let Some(swap_request) = swap_request {
self.swarm.send_swap_request(self.alice_peer_id.clone(), swap_request); self.swarm.send_swap_request(self.alice_peer_id, swap_request);
} }
}, },
msg0 = self.send_message0.next().fuse() => { msg0 = self.send_message0.recv().fuse() => {
if let Some(msg) = msg0 { if let Some(msg) = msg0 {
self.swarm.send_message0(self.alice_peer_id.clone(), msg); self.swarm.send_message0(self.alice_peer_id, msg);
} }
} }
msg1 = self.send_message1.next().fuse() => { msg1 = self.send_message1.recv().fuse() => {
if let Some(msg) = msg1 { if let Some(msg) = msg1 {
self.swarm.send_message1(self.alice_peer_id.clone(), msg); self.swarm.send_message1(self.alice_peer_id, msg);
} }
}, },
msg2 = self.send_message2.next().fuse() => { msg2 = self.send_message2.recv().fuse() => {
if let Some(msg) = msg2 { if let Some(msg) = msg2 {
self.swarm.send_message2(self.alice_peer_id.clone(), msg); self.swarm.send_message2(self.alice_peer_id, msg);
} }
}, },
encrypted_signature = self.send_encrypted_signature.next().fuse() => { encrypted_signature = self.send_encrypted_signature.recv().fuse() => {
if let Some(tx_redeem_encsig) = encrypted_signature { if let Some(tx_redeem_encsig) = encrypted_signature {
self.swarm.send_encrypted_signature(self.alice_peer_id.clone(), tx_redeem_encsig); self.swarm.send_encrypted_signature(self.alice_peer_id, tx_redeem_encsig);
} }
} }
} }

@ -102,6 +102,9 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
RequestResponseEvent::OutboundFailure { error, .. } => { RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error); error!("Outbound failure: {:?}", error);
} }
RequestResponseEvent::ResponseSent { .. } => {
unreachable!("Bob should never send a Amounts response to Alice");
}
} }
} }
} }

@ -97,6 +97,9 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
RequestResponseEvent::OutboundFailure { error, .. } => { RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error); error!("Outbound failure: {:?}", error);
} }
RequestResponseEvent::ResponseSent { .. } => {
unreachable!("Bob should never send a Amounts response to Alice");
}
} }
} }
} }

@ -95,6 +95,9 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
RequestResponseEvent::OutboundFailure { error, .. } => { RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error); error!("Outbound failure: {:?}", error);
} }
RequestResponseEvent::ResponseSent { .. } => {
unreachable!("Bob should never send a Amounts response to Alice");
}
} }
} }
} }

@ -103,6 +103,9 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
RequestResponseEvent::OutboundFailure { error, .. } => { RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error); error!("Outbound failure: {:?}", error);
} }
RequestResponseEvent::ResponseSent { .. } => {
error!("Bob should never send a Amounts response to Alice");
}
} }
} }
} }

@ -92,6 +92,7 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<Request, Response>> for B
RequestResponseEvent::OutboundFailure { error, .. } => { RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error); error!("Outbound failure: {:?}", error);
} }
RequestResponseEvent::ResponseSent { .. } => debug!("Bob ack'd transfer proof message"),
} }
} }
} }

@ -4,7 +4,7 @@ use tracing::{info, subscriber};
use tracing_log::LogTracer; use tracing_log::LogTracer;
use tracing_subscriber::FmtSubscriber; use tracing_subscriber::FmtSubscriber;
pub fn init_tracing(level: log::LevelFilter) -> anyhow::Result<()> { pub fn init_tracing(level: LevelFilter) -> anyhow::Result<()> {
if level == LevelFilter::Off { if level == LevelFilter::Off {
return Ok(()); return Ok(());
} }
@ -15,8 +15,8 @@ pub fn init_tracing(level: log::LevelFilter) -> anyhow::Result<()> {
let is_terminal = atty::is(atty::Stream::Stderr); let is_terminal = atty::is(atty::Stream::Stderr);
let subscriber = FmtSubscriber::builder() let subscriber = FmtSubscriber::builder()
.with_env_filter(format!( .with_env_filter(format!(
"swap={},xmr_btc={},monero_harness={},bitcoin_harness={},http=warn,warp=warn", "swap={},monero_harness={},bitcoin_harness={},http=warn,warp=warn",
level, level, level, level level, level, level,
)) ))
.with_writer(std::io::stderr) .with_writer(std::io::stderr)
.with_ansi(is_terminal) .with_ansi(is_terminal)

@ -73,7 +73,7 @@ impl BobParams {
self.bitcoin_wallet.clone(), self.bitcoin_wallet.clone(),
self.monero_wallet.clone(), self.monero_wallet.clone(),
self.alice_address.clone(), self.alice_address.clone(),
self.alice_peer_id.clone(), self.alice_peer_id,
self.config, self.config,
) )
} }

Loading…
Cancel
Save