593: Integrate rendezvous protocol r=thomaseizinger a=rishflab

Open things:

- [x] Default parameters for CLI
- [x] Print a human-readable table of sellers if `--json` is not passed
- [x] Unresolved comments of review

Co-authored-by: Daniel Karzel <daniel@comit.network>
Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
Co-authored-by: rishflab <rishflab@hotmail.com>
pull/609/head
bors[bot] 3 years ago committed by GitHub
commit 3a99b753ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -14,6 +14,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Configuration setting for the websocket URL that the ASB connects to in order to receive price ticker updates.
Can be configured manually by editing the config.toml file directly.
It is expected that the server behind the url follows the same protocol as the [Kraken websocket api](https://docs.kraken.com/websockets/).
- Registration and discovery of ASBs using the [libp2p rendezvous protocol](https://github.com/libp2p/specs/blob/master/rendezvous/README.md).
ASBs can register with a rendezvous node upon startup and, once registered, can be automatically discovered by the CLI using the `list-sellers` command.
The rendezvous node address (`rendezvous_point`), as well as the ASB's external addresses (`external_addresses`) to be registered, is configured in the `network` section of the ASB config file.
A rendezvous node is provided at `/dnsaddr/rendezvous.coblox.tech/p2p/12D3KooWQUt9DkNZxEn2R5ymJzWj15MpG6mTW84kyd8vDaRZi46o` which is used as default for discovery in the CLI.
Upon discovery using `list-sellers` CLI users are provided with quote and connection information for each ASB discovered through the rendezvous node.
### Fixed

556
Cargo.lock generated

File diff suppressed because it is too large Load Diff

@ -19,6 +19,7 @@ bdk = "0.8"
big-bytes = "1"
bitcoin = { version = "0.26", features = [ "rand", "use-serde" ] }
bmrng = "0.5"
comfy-table = "4.0.0"
config = { version = "0.11", default-features = false, features = [ "toml" ] }
conquer-once = "0.3"
curve25519-dalek = { package = "curve25519-dalek-ng", version = "4" }
@ -29,12 +30,11 @@ ecdsa_fun = { git = "https://github.com/LLFourn/secp256kfun", default-features =
ed25519-dalek = "1"
futures = { version = "0.3", default-features = false }
itertools = "0.10"
libp2p = { version = "0.38", default-features = false, features = [ "tcp-tokio", "yamux", "mplex", "dns-tokio", "noise", "request-response", "websocket", "ping" ] }
libp2p = { git = "https://github.com/comit-network/rust-libp2p", branch = "rendezvous", default-features = false, features = [ "tcp-tokio", "yamux", "mplex", "dns-tokio", "noise", "request-response", "websocket", "ping", "rendezvous" ] }
miniscript = { version = "5", features = [ "serde" ] }
monero = { version = "0.12", features = [ "serde_support" ] }
monero-rpc = { path = "../monero-rpc" }
pem = "0.8"
prettytable-rs = "0.8"
proptest = "1"
qrcode = "0.12"
rand = "0.8"
@ -45,6 +45,7 @@ rust_decimal_macros = "1"
serde = { version = "1", features = [ "derive" ] }
serde_cbor = "0.11"
serde_json = "1"
serde_with = { version = "1.9.4", features = [ "macros" ] }
sha2 = "0.9"
sigma_fun = { git = "https://github.com/LLFourn/secp256kfun", default-features = false, features = [ "ed25519", "serde" ] }
sled = "0.34"

@ -1,14 +1,14 @@
mod behaviour;
pub mod command;
pub mod config;
mod event_loop;
mod network;
mod rate;
mod recovery;
pub mod tracing;
pub mod transport;
pub use behaviour::{Behaviour, OutEvent};
pub use event_loop::{EventLoop, EventLoopHandle, FixedRate, KrakenRate, LatestRate};
pub use network::behaviour::{Behaviour, OutEvent};
pub use network::transport;
pub use rate::Rate;
pub use recovery::cancel::cancel;
pub use recovery::punish::punish;
@ -16,3 +16,6 @@ pub use recovery::redeem::{redeem, Finality};
pub use recovery::refund::refund;
pub use recovery::safely_abort::safely_abort;
pub use recovery::{cancel, refund};
#[cfg(test)]
pub use network::rendezous;

@ -1,116 +0,0 @@
use crate::asb::event_loop::LatestRate;
use crate::env;
use crate::network::quote::BidQuote;
use crate::network::swap_setup::alice;
use crate::network::swap_setup::alice::WalletSnapshot;
use crate::network::{encrypted_signature, quote, transfer_proof};
use crate::protocol::alice::State3;
use anyhow::{anyhow, Error};
use libp2p::ping::{Ping, PingEvent};
use libp2p::request_response::{RequestId, ResponseChannel};
use libp2p::{NetworkBehaviour, PeerId};
use uuid::Uuid;
#[derive(Debug)]
pub enum OutEvent {
SwapSetupInitiated {
send_wallet_snapshot: bmrng::RequestReceiver<bitcoin::Amount, WalletSnapshot>,
},
SwapSetupCompleted {
peer_id: PeerId,
swap_id: Uuid,
state3: Box<State3>,
},
SwapDeclined {
peer: PeerId,
error: alice::Error,
},
QuoteRequested {
channel: ResponseChannel<BidQuote>,
peer: PeerId,
},
TransferProofAcknowledged {
peer: PeerId,
id: RequestId,
},
EncryptedSignatureReceived {
msg: Box<encrypted_signature::Request>,
channel: ResponseChannel<()>,
peer: PeerId,
},
Failure {
peer: PeerId,
error: Error,
},
/// "Fallback" variant that allows the event mapping code to swallow certain
/// events that we don't want the caller to deal with.
Other,
}
impl OutEvent {
pub fn unexpected_request(peer: PeerId) -> OutEvent {
OutEvent::Failure {
peer,
error: anyhow!("Unexpected request received"),
}
}
pub fn unexpected_response(peer: PeerId) -> OutEvent {
OutEvent::Failure {
peer,
error: anyhow!("Unexpected response received"),
}
}
}
/// A `NetworkBehaviour` that represents an XMR/BTC swap node as Alice.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", event_process = false)]
#[allow(missing_debug_implementations)]
pub struct Behaviour<LR>
where
LR: LatestRate + Send + 'static,
{
pub quote: quote::Behaviour,
pub swap_setup: alice::Behaviour<LR>,
pub transfer_proof: transfer_proof::Behaviour,
pub encrypted_signature: encrypted_signature::Behaviour,
/// Ping behaviour that ensures that the underlying network connection is
/// still alive. If the ping fails a connection close event will be
/// emitted that is picked up as swarm event.
ping: Ping,
}
impl<LR> Behaviour<LR>
where
LR: LatestRate + Send + 'static,
{
pub fn new(
min_buy: bitcoin::Amount,
max_buy: bitcoin::Amount,
latest_rate: LR,
resume_only: bool,
env_config: env::Config,
) -> Self {
Self {
quote: quote::asb(),
swap_setup: alice::Behaviour::new(
min_buy,
max_buy,
env_config,
latest_rate,
resume_only,
),
transfer_proof: transfer_proof::alice(),
encrypted_signature: encrypted_signature::alice(),
ping: Ping::default(),
}
}
}
impl From<PingEvent> for OutEvent {
fn from(_: PingEvent) -> Self {
OutEvent::Other
}
}

@ -1,5 +1,6 @@
use crate::env::{Mainnet, Testnet};
use crate::fs::{ensure_directory_exists, system_config_dir, system_data_dir};
use crate::network::rendezvous::DEFAULT_RENDEZVOUS_ADDRESS;
use crate::tor::{DEFAULT_CONTROL_PORT, DEFAULT_SOCKS5_PORT};
use anyhow::{bail, Context, Result};
use config::ConfigError;
@ -86,6 +87,7 @@ const DEFAULT_MAX_BUY_AMOUNT: f64 = 0.02f64;
const DEFAULT_SPREAD: f64 = 0.02f64;
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct Config {
pub data: Data,
pub network: Network,
@ -118,6 +120,10 @@ pub struct Data {
#[serde(deny_unknown_fields)]
pub struct Network {
pub listen: Vec<Multiaddr>,
#[serde(default)]
pub rendezvous_point: Option<Multiaddr>,
#[serde(default)]
pub external_addresses: Vec<Multiaddr>,
}
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
@ -285,12 +291,25 @@ pub fn query_user_for_initial_config(testnet: bool) -> Result<Config> {
}
let ask_spread = Decimal::from_f64(ask_spread).context("Unable to parse spread")?;
let rendezvous_address = Input::with_theme(&ColorfulTheme::default())
.with_prompt("Do you want to advertise your ASB instance with a rendezvous node? Enter an empty string if not.")
.default(DEFAULT_RENDEZVOUS_ADDRESS.to_string())
.interact_text()?;
let rendezvous_point = if rendezvous_address.is_empty() {
None
} else {
Some(Multiaddr::from_str(&rendezvous_address)?)
};
println!();
Ok(Config {
data: Data { dir: data_dir },
network: Network {
listen: listen_addresses,
rendezvous_point,
external_addresses: vec![],
},
bitcoin: Bitcoin {
electrum_rpc_url,
@ -340,6 +359,8 @@ mod tests {
},
network: Network {
listen: vec![defaults.listen_address_tcp, defaults.listen_address_ws],
rendezvous_point: Some(DEFAULT_RENDEZVOUS_ADDRESS.parse().unwrap()),
external_addresses: vec![],
},
monero: Monero {
@ -381,6 +402,8 @@ mod tests {
},
network: Network {
listen: vec![defaults.listen_address_tcp, defaults.listen_address_ws],
rendezvous_point: Some(DEFAULT_RENDEZVOUS_ADDRESS.parse().unwrap()),
external_addresses: vec![],
},
monero: Monero {

@ -1,12 +1,10 @@
use crate::asb::behaviour::{Behaviour, OutEvent};
use crate::asb::Rate;
use crate::asb::{Behaviour, OutEvent, Rate};
use crate::database::Database;
use crate::env::Config;
use crate::network::quote::BidQuote;
use crate::network::swap_setup::alice::WalletSnapshot;
use crate::network::transfer_proof;
use crate::protocol::alice::{AliceState, State3, Swap};
use crate::{bitcoin, kraken, monero};
use crate::{bitcoin, env, kraken, monero};
use anyhow::{Context, Result};
use futures::future;
use futures::future::{BoxFuture, FutureExt};
@ -38,7 +36,7 @@ where
LR: LatestRate + Send + 'static + Debug + Clone,
{
swarm: libp2p::Swarm<Behaviour<LR>>,
env_config: Config,
env_config: env::Config,
bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>,
db: Arc<Database>,
@ -70,7 +68,7 @@ where
#[allow(clippy::too_many_arguments)]
pub fn new(
swarm: Swarm<Behaviour<LR>>,
env_config: Config,
env_config: env::Config,
bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>,
db: Arc<Database>,
@ -149,7 +147,7 @@ where
loop {
tokio::select! {
swarm_event = self.swarm.next_event() => {
swarm_event = self.swarm.select_next_some() => {
match swarm_event {
SwarmEvent::Behaviour(OutEvent::SwapSetupInitiated { mut send_wallet_snapshot }) => {
@ -173,7 +171,7 @@ where
let _ = responder.respond(wallet_snapshot);
}
SwarmEvent::Behaviour(OutEvent::SwapSetupCompleted{peer_id, swap_id, state3}) => {
let _ = self.handle_execution_setup_done(peer_id, swap_id, *state3).await;
let _ = self.handle_execution_setup_done(peer_id, swap_id, state3).await;
}
SwarmEvent::Behaviour(OutEvent::SwapDeclined { peer, error }) => {
tracing::warn!(%peer, "Ignoring spot price request because: {}", error);
@ -246,6 +244,12 @@ where
channel
}.boxed());
}
SwarmEvent::Behaviour(OutEvent::Rendezvous(libp2p::rendezvous::Event::Registered { .. })) => {
tracing::info!("Successfully registered with rendezvous node");
}
SwarmEvent::Behaviour(OutEvent::Rendezvous(libp2p::rendezvous::Event::RegisterFailed(error))) => {
tracing::error!("Registration with rendezvous node failed: {:#}", error);
}
SwarmEvent::Behaviour(OutEvent::Failure {peer, error}) => {
tracing::error!(
%peer,

@ -0,0 +1,441 @@
use crate::asb::event_loop::LatestRate;
use crate::env;
use crate::network::quote::BidQuote;
use crate::network::rendezvous::XmrBtcNamespace;
use crate::network::swap_setup::alice;
use crate::network::swap_setup::alice::WalletSnapshot;
use crate::network::transport::authenticate_and_multiplex;
use crate::network::{encrypted_signature, quote, transfer_proof};
use crate::protocol::alice::State3;
use anyhow::{anyhow, Error, Result};
use futures::FutureExt;
use libp2p::core::connection::ConnectionId;
use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::transport::Boxed;
use libp2p::dns::TokioDnsConfig;
use libp2p::ping::{Ping, PingEvent};
use libp2p::request_response::{RequestId, ResponseChannel};
use libp2p::swarm::{
DialPeerCondition, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction,
PollParameters, ProtocolsHandler,
};
use libp2p::tcp::TokioTcpConfig;
use libp2p::websocket::WsConfig;
use libp2p::{identity, Multiaddr, NetworkBehaviour, PeerId, Transport};
use std::task::Poll;
use std::time::Duration;
use uuid::Uuid;
pub mod transport {
use super::*;
/// Creates the libp2p transport for the ASB.
pub fn new(identity: &identity::Keypair) -> Result<Boxed<(PeerId, StreamMuxerBox)>> {
let tcp = TokioTcpConfig::new().nodelay(true);
let tcp_with_dns = TokioDnsConfig::system(tcp)?;
let websocket_with_dns = WsConfig::new(tcp_with_dns.clone());
let transport = tcp_with_dns.or_transport(websocket_with_dns).boxed();
authenticate_and_multiplex(transport, identity)
}
}
pub mod behaviour {
use super::*;
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum OutEvent {
SwapSetupInitiated {
send_wallet_snapshot: bmrng::RequestReceiver<bitcoin::Amount, WalletSnapshot>,
},
SwapSetupCompleted {
peer_id: PeerId,
swap_id: Uuid,
state3: State3,
},
SwapDeclined {
peer: PeerId,
error: alice::Error,
},
QuoteRequested {
channel: ResponseChannel<BidQuote>,
peer: PeerId,
},
TransferProofAcknowledged {
peer: PeerId,
id: RequestId,
},
EncryptedSignatureReceived {
msg: encrypted_signature::Request,
channel: ResponseChannel<()>,
peer: PeerId,
},
Rendezvous(libp2p::rendezvous::Event),
Failure {
peer: PeerId,
error: Error,
},
/// "Fallback" variant that allows the event mapping code to swallow
/// certain events that we don't want the caller to deal with.
Other,
}
impl OutEvent {
pub fn unexpected_request(peer: PeerId) -> OutEvent {
OutEvent::Failure {
peer,
error: anyhow!("Unexpected request received"),
}
}
pub fn unexpected_response(peer: PeerId) -> OutEvent {
OutEvent::Failure {
peer,
error: anyhow!("Unexpected response received"),
}
}
}
/// A `NetworkBehaviour` that represents an XMR/BTC swap node as Alice.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", event_process = false)]
#[allow(missing_debug_implementations)]
pub struct Behaviour<LR>
where
LR: LatestRate + Send + 'static,
{
pub rendezvous: libp2p::swarm::toggle::Toggle<rendezous::Behaviour>,
pub quote: quote::Behaviour,
pub swap_setup: alice::Behaviour<LR>,
pub transfer_proof: transfer_proof::Behaviour,
pub encrypted_signature: encrypted_signature::Behaviour,
/// Ping behaviour that ensures that the underlying network connection
/// is still alive. If the ping fails a connection close event
/// will be emitted that is picked up as swarm event.
ping: Ping,
}
impl<LR> Behaviour<LR>
where
LR: LatestRate + Send + 'static,
{
pub fn new(
min_buy: bitcoin::Amount,
max_buy: bitcoin::Amount,
latest_rate: LR,
resume_only: bool,
env_config: env::Config,
rendezvous_params: Option<(identity::Keypair, PeerId, Multiaddr, XmrBtcNamespace)>,
) -> Self {
Self {
rendezvous: libp2p::swarm::toggle::Toggle::from(rendezvous_params.map(
|(identity, rendezvous_peer_id, rendezvous_address, namespace)| {
rendezous::Behaviour::new(
identity,
rendezvous_peer_id,
rendezvous_address,
namespace,
None, // use default ttl on rendezvous point
)
},
)),
quote: quote::asb(),
swap_setup: alice::Behaviour::new(
min_buy,
max_buy,
env_config,
latest_rate,
resume_only,
),
transfer_proof: transfer_proof::alice(),
encrypted_signature: encrypted_signature::alice(),
ping: Ping::default(),
}
}
}
impl From<PingEvent> for OutEvent {
fn from(_: PingEvent) -> Self {
OutEvent::Other
}
}
impl From<libp2p::rendezvous::Event> for OutEvent {
fn from(event: libp2p::rendezvous::Event) -> Self {
OutEvent::Rendezvous(event)
}
}
}
pub mod rendezous {
use super::*;
use std::pin::Pin;
#[derive(PartialEq)]
enum ConnectionStatus {
Disconnected,
Dialling,
Connected,
}
enum RegistrationStatus {
RegisterOnNextConnection,
Pending,
Registered {
re_register_in: Pin<Box<tokio::time::Sleep>>,
},
}
pub struct Behaviour {
inner: libp2p::rendezvous::Rendezvous,
rendezvous_point: Multiaddr,
rendezvous_peer_id: PeerId,
namespace: XmrBtcNamespace,
registration_status: RegistrationStatus,
connection_status: ConnectionStatus,
registration_ttl: Option<u64>,
}
impl Behaviour {
pub fn new(
identity: identity::Keypair,
rendezvous_peer_id: PeerId,
rendezvous_address: Multiaddr,
namespace: XmrBtcNamespace,
registration_ttl: Option<u64>,
) -> Self {
Self {
inner: libp2p::rendezvous::Rendezvous::new(
identity,
libp2p::rendezvous::Config::default(),
),
rendezvous_point: rendezvous_address,
rendezvous_peer_id,
namespace,
registration_status: RegistrationStatus::RegisterOnNextConnection,
connection_status: ConnectionStatus::Disconnected,
registration_ttl,
}
}
fn register(&mut self) {
self.inner.register(
self.namespace.into(),
self.rendezvous_peer_id,
self.registration_ttl,
);
}
}
impl NetworkBehaviour for Behaviour {
type ProtocolsHandler =
<libp2p::rendezvous::Rendezvous as NetworkBehaviour>::ProtocolsHandler;
type OutEvent = libp2p::rendezvous::Event;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
self.inner.new_handler()
}
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
if peer_id == &self.rendezvous_peer_id {
return vec![self.rendezvous_point.clone()];
}
vec![]
}
fn inject_connected(&mut self, peer_id: &PeerId) {
if peer_id == &self.rendezvous_peer_id {
self.connection_status = ConnectionStatus::Connected;
match &self.registration_status {
RegistrationStatus::RegisterOnNextConnection => {
self.register();
self.registration_status = RegistrationStatus::Pending;
}
RegistrationStatus::Registered { .. } => {}
RegistrationStatus::Pending => {}
}
}
}
fn inject_disconnected(&mut self, peer_id: &PeerId) {
if peer_id == &self.rendezvous_peer_id {
self.connection_status = ConnectionStatus::Disconnected;
}
}
fn inject_event(
&mut self,
peer_id: PeerId,
connection: ConnectionId,
event: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
) {
self.inner.inject_event(peer_id, connection, event)
}
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
if peer_id == &self.rendezvous_peer_id {
self.connection_status = ConnectionStatus::Disconnected;
}
}
#[allow(clippy::type_complexity)]
fn poll(&mut self, cx: &mut std::task::Context<'_>, params: &mut impl PollParameters) -> Poll<NetworkBehaviourAction<<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>>{
match &mut self.registration_status {
RegistrationStatus::RegisterOnNextConnection => match self.connection_status {
ConnectionStatus::Disconnected => {
self.connection_status = ConnectionStatus::Dialling;
return Poll::Ready(NetworkBehaviourAction::DialPeer {
peer_id: self.rendezvous_peer_id,
condition: DialPeerCondition::Disconnected,
});
}
ConnectionStatus::Dialling => {}
ConnectionStatus::Connected => {
self.registration_status = RegistrationStatus::Pending;
self.register();
}
},
RegistrationStatus::Registered { re_register_in } => {
if let Poll::Ready(()) = re_register_in.poll_unpin(cx) {
match self.connection_status {
ConnectionStatus::Connected => {
self.registration_status = RegistrationStatus::Pending;
self.register();
}
ConnectionStatus::Disconnected => {
self.registration_status =
RegistrationStatus::RegisterOnNextConnection;
return Poll::Ready(NetworkBehaviourAction::DialPeer {
peer_id: self.rendezvous_peer_id,
condition: DialPeerCondition::Disconnected,
});
}
ConnectionStatus::Dialling => {}
}
}
}
RegistrationStatus::Pending => {}
}
let inner_poll = self.inner.poll(cx, params);
// reset the timer if we successfully registered
if let Poll::Ready(NetworkBehaviourAction::GenerateEvent(
libp2p::rendezvous::Event::Registered { ttl, .. },
)) = &inner_poll
{
let half_of_ttl = Duration::from_secs(*ttl) / 2;
self.registration_status = RegistrationStatus::Registered {
re_register_in: Box::pin(tokio::time::sleep(half_of_ttl)),
};
}
inner_poll
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::network::test::{new_swarm, SwarmExt};
use futures::StreamExt;
use libp2p::swarm::SwarmEvent;
#[tokio::test]
async fn given_no_initial_connection_when_constructed_asb_connects_and_registers_with_rendezvous_node(
) {
let mut rendezvous_node = new_swarm(|_, identity| {
libp2p::rendezvous::Rendezvous::new(identity, libp2p::rendezvous::Config::default())
});
let rendezvous_address = rendezvous_node.listen_on_random_memory_address().await;
let mut asb = new_swarm(|_, identity| {
rendezous::Behaviour::new(
identity,
*rendezvous_node.local_peer_id(),
rendezvous_address,
XmrBtcNamespace::Testnet,
None,
)
});
asb.listen_on_random_memory_address().await; // this adds an external address
tokio::spawn(async move {
loop {
rendezvous_node.next().await;
}
});
let asb_registered = tokio::spawn(async move {
loop {
if let SwarmEvent::Behaviour(libp2p::rendezvous::Event::Registered { .. }) =
asb.select_next_some().await
{
break;
}
}
});
tokio::time::timeout(Duration::from_secs(10), asb_registered)
.await
.unwrap()
.unwrap();
}
#[tokio::test]
async fn asb_automatically_re_registers() {
let min_ttl = 5;
let mut rendezvous_node = new_swarm(|_, identity| {
libp2p::rendezvous::Rendezvous::new(
identity,
libp2p::rendezvous::Config::default().with_min_ttl(min_ttl),
)
});
let rendezvous_address = rendezvous_node.listen_on_random_memory_address().await;
let mut asb = new_swarm(|_, identity| {
rendezous::Behaviour::new(
identity,
*rendezvous_node.local_peer_id(),
rendezvous_address,
XmrBtcNamespace::Testnet,
Some(5),
)
});
asb.listen_on_random_memory_address().await; // this adds an external address
tokio::spawn(async move {
loop {
rendezvous_node.next().await;
}
});
let asb_registered_three_times = tokio::spawn(async move {
let mut number_of_registrations = 0;
loop {
if let SwarmEvent::Behaviour(libp2p::rendezvous::Event::Registered { .. }) =
asb.select_next_some().await
{
number_of_registrations += 1
}
if number_of_registrations == 3 {
break;
}
}
});
tokio::time::timeout(Duration::from_secs(30), asb_registered_three_times)
.await
.unwrap()
.unwrap();
}
}
}

@ -1,19 +0,0 @@
use crate::network::transport::authenticate_and_multiplex;
use anyhow::Result;
use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::transport::Boxed;
use libp2p::dns::TokioDnsConfig;
use libp2p::tcp::TokioTcpConfig;
use libp2p::websocket::WsConfig;
use libp2p::{identity, PeerId, Transport};
/// Creates the libp2p transport for the ASB.
pub fn new(identity: &identity::Keypair) -> Result<Boxed<(PeerId, StreamMuxerBox)>> {
let tcp = TokioTcpConfig::new().nodelay(true);
let tcp_with_dns = TokioDnsConfig::system(tcp)?;
let websocket_with_dns = WsConfig::new(tcp_with_dns.clone());
let transport = tcp_with_dns.or_transport(websocket_with_dns).boxed();
authenticate_and_multiplex(transport, identity)
}

@ -13,10 +13,11 @@
#![allow(non_snake_case)]
use anyhow::{bail, Context, Result};
use comfy_table::Table;
use libp2p::core::multiaddr::Protocol;
use libp2p::core::Multiaddr;
use libp2p::swarm::AddressScore;
use libp2p::Swarm;
use prettytable::{row, Table};
use std::env;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
@ -29,6 +30,7 @@ use swap::asb::config::{
use swap::asb::{cancel, punish, redeem, refund, safely_abort, EventLoop, Finality, KrakenRate};
use swap::database::Database;
use swap::monero::Amount;
use swap::network::rendezvous::XmrBtcNamespace;
use swap::network::swarm;
use swap::protocol::alice::run;
use swap::seed::Seed;
@ -37,9 +39,6 @@ use swap::{asb, bitcoin, kraken, monero, tor};
use tracing::{debug, info, warn};
use tracing_subscriber::filter::LevelFilter;
#[macro_use]
extern crate prettytable;
const DEFAULT_WALLET_NAME: &str = "asb-wallet";
#[tokio::main]
@ -124,7 +123,7 @@ async fn main() -> Result<()> {
info!(%monero_balance, "Initialized Monero wallet");
}
let kraken_price_updates = kraken::connect(config.maker.price_ticker_ws_url)?;
let kraken_price_updates = kraken::connect(config.maker.price_ticker_ws_url.clone())?;
// setup Tor hidden services
let tor_client =
@ -151,15 +150,33 @@ async fn main() -> Result<()> {
kraken_rate.clone(),
resume_only,
env_config,
config.network.rendezvous_point.map(|rendezvous_point| {
(
rendezvous_point,
if testnet {
XmrBtcNamespace::Testnet
} else {
XmrBtcNamespace::Mainnet
},
)
}),
)?;
for listen in config.network.listen {
for listen in config.network.listen.clone() {
Swarm::listen_on(&mut swarm, listen.clone())
.with_context(|| format!("Failed to listen on network interface {}", listen))?;
}
tracing::info!(peer_id = %swarm.local_peer_id(), "Network layer initialized");
for external_address in config.network.external_addresses {
let _ = Swarm::add_external_address(
&mut swarm,
external_address,
AddressScore::Infinite,
);
}
let (event_loop, mut swap_receiver) = EventLoop::new(
swarm,
env_config,
@ -194,14 +211,13 @@ async fn main() -> Result<()> {
Command::History => {
let mut table = Table::new();
table.add_row(row!["SWAP ID", "STATE"]);
table.set_header(vec!["SWAP ID", "STATE"]);
for (swap_id, state) in db.all_alice()? {
table.add_row(row![swap_id, state]);
table.add_row(vec![swap_id.to_string(), state.to_string()]);
}
// Print the table to stdout
table.printstd();
println!("{}", table);
}
Command::WithdrawBtc { amount, address } => {
let bitcoin_wallet = init_bitcoin_wallet(&config, &seed, env_config).await?;

@ -13,7 +13,7 @@
#![allow(non_snake_case)]
use anyhow::{bail, Context, Result};
use prettytable::{row, Table};
use comfy_table::Table;
use qrcode::render::unicode;
use qrcode::QrCode;
use std::cmp::min;
@ -24,7 +24,7 @@ use std::sync::Arc;
use std::time::Duration;
use swap::bitcoin::TxLock;
use swap::cli::command::{parse_args_and_apply_defaults, Arguments, Command, ParseResult};
use swap::cli::EventLoop;
use swap::cli::{list_sellers, EventLoop};
use swap::database::Database;
use swap::env::Config;
use swap::libp2p_ext::MultiAddrExt;
@ -38,9 +38,6 @@ use tracing::{debug, error, info, warn};
use url::Url;
use uuid::Uuid;
#[macro_use]
extern crate prettytable;
#[tokio::main]
async fn main() -> Result<()> {
let Arguments {
@ -68,7 +65,7 @@ async fn main() -> Result<()> {
} => {
let swap_id = Uuid::new_v4();
cli::tracing::init(debug, json, data_dir.join("logs"), swap_id)?;
cli::tracing::init(debug, json, data_dir.join("logs"), Some(swap_id))?;
let db = Database::open(data_dir.join("database").as_path())
.context("Failed to open database")?;
let seed = Seed::from_file_or_generate(data_dir.as_path())
@ -91,14 +88,9 @@ async fn main() -> Result<()> {
.context("Seller address must contain peer ID")?;
db.insert_address(seller_peer_id, seller.clone()).await?;
let mut swarm = swarm::cli(
&seed,
seller_peer_id,
tor_socks5_port,
env_config,
bitcoin_wallet.clone(),
)
.await?;
let behaviour = cli::Behaviour::new(seller_peer_id, env_config, bitcoin_wallet.clone());
let mut swarm =
swarm::cli(seed.derive_libp2p_identity(), tor_socks5_port, behaviour).await?;
swarm.behaviour_mut().add_address(seller_peer_id, seller);
tracing::debug!(peer_id = %swarm.local_peer_id(), "Network layer initialized");
@ -149,14 +141,13 @@ async fn main() -> Result<()> {
let mut table = Table::new();
table.add_row(row!["SWAP ID", "STATE"]);
table.set_header(vec!["SWAP ID", "STATE"]);
for (swap_id, state) in db.all_bob()? {
table.add_row(row![swap_id, state]);
table.add_row(vec![swap_id.to_string(), state.to_string()]);
}
// Print the table to stdout
table.printstd();
println!("{}", table);
}
Command::Resume {
swap_id,
@ -166,7 +157,7 @@ async fn main() -> Result<()> {
monero_daemon_address,
tor_socks5_port,
} => {
cli::tracing::init(debug, json, data_dir.join("logs"), swap_id)?;
cli::tracing::init(debug, json, data_dir.join("logs"), Some(swap_id))?;
let db = Database::open(data_dir.join("database").as_path())
.context("Failed to open database")?;
let seed = Seed::from_file_or_generate(data_dir.as_path())
@ -191,14 +182,9 @@ async fn main() -> Result<()> {
let seller_peer_id = db.get_peer_id(swap_id)?;
let seller_addresses = db.get_addresses(seller_peer_id)?;
let mut swarm = swarm::cli(
&seed,
seller_peer_id,
tor_socks5_port,
env_config,
bitcoin_wallet.clone(),
)
.await?;
let behaviour = cli::Behaviour::new(seller_peer_id, env_config, bitcoin_wallet.clone());
let mut swarm =
swarm::cli(seed.derive_libp2p_identity(), tor_socks5_port, behaviour).await?;
let our_peer_id = swarm.local_peer_id();
tracing::debug!(peer_id = %our_peer_id, "Initializing network module");
@ -237,7 +223,7 @@ async fn main() -> Result<()> {
bitcoin_electrum_rpc_url,
bitcoin_target_block,
} => {
cli::tracing::init(debug, json, data_dir.join("logs"), swap_id)?;
cli::tracing::init(debug, json, data_dir.join("logs"), Some(swap_id))?;
let db = Database::open(data_dir.join("database").as_path())
.context("Failed to open database")?;
let seed = Seed::from_file_or_generate(data_dir.as_path())
@ -269,7 +255,7 @@ async fn main() -> Result<()> {
bitcoin_electrum_rpc_url,
bitcoin_target_block,
} => {
cli::tracing::init(debug, json, data_dir.join("logs"), swap_id)?;
cli::tracing::init(debug, json, data_dir.join("logs"), Some(swap_id))?;
let db = Database::open(data_dir.join("database").as_path())
.context("Failed to open database")?;
let seed = Seed::from_file_or_generate(data_dir.as_path())
@ -286,6 +272,50 @@ async fn main() -> Result<()> {
cli::refund(swap_id, Arc::new(bitcoin_wallet), db, force).await??;
}
Command::ListSellers {
rendezvous_node_addr,
namespace,
tor_socks5_port,
} => {
let rendezvous_node_peer_id = rendezvous_node_addr
.extract_peer_id()
.context("Rendezvous node address must contain peer ID")?;
cli::tracing::init(debug, json, data_dir.join("logs"), None)?;
let seed = Seed::from_file_or_generate(data_dir.as_path())
.context("Failed to read in seed file")?;
let identity = seed.derive_libp2p_identity();
let sellers = list_sellers(
rendezvous_node_peer_id,
rendezvous_node_addr,
namespace,
tor_socks5_port,
identity,
)
.await?;
if json {
for seller in sellers {
println!("{}", serde_json::to_string(&seller)?);
}
} else {
let mut table = Table::new();
table.set_header(vec!["PRICE", "MIN_QUANTITY", "MAX_QUANTITY", "ADDRESS"]);
for seller in sellers {
table.add_row(vec![
seller.quote.price.to_string(),
seller.quote.min_quantity.to_string(),
seller.quote.max_quantity.to_string(),
seller.multiaddr.to_string(),
]);
}
println!("{}", table);
}
}
};
Ok(())
}

@ -2,6 +2,7 @@ mod behaviour;
pub mod cancel;
pub mod command;
mod event_loop;
mod list_sellers;
pub mod refund;
pub mod tracing;
pub mod transport;
@ -9,4 +10,170 @@ pub mod transport;
pub use behaviour::{Behaviour, OutEvent};
pub use cancel::cancel;
pub use event_loop::{EventLoop, EventLoopHandle};
pub use list_sellers::list_sellers;
pub use refund::refund;
#[cfg(test)]
mod tests {
use super::*;
use crate::asb;
use crate::cli::list_sellers::Seller;
use crate::network::quote;
use crate::network::quote::BidQuote;
use crate::network::rendezvous::XmrBtcNamespace;
use crate::network::test::{new_swarm, SwarmExt};
use futures::StreamExt;
use libp2p::multiaddr::Protocol;
use libp2p::request_response::RequestResponseEvent;
use libp2p::swarm::{AddressScore, NetworkBehaviourEventProcess};
use libp2p::{identity, Multiaddr, PeerId};
use std::collections::HashSet;
use std::iter::FromIterator;
use std::time::Duration;
#[tokio::test]
async fn list_sellers_should_report_all_registered_asbs_with_a_quote() {
let namespace = XmrBtcNamespace::Mainnet;
let (rendezvous_address, rendezvous_peer_id) = setup_rendezvous_point().await;
let expected_seller_1 =
setup_asb(rendezvous_peer_id, rendezvous_address.clone(), namespace).await;
let expected_seller_2 =
setup_asb(rendezvous_peer_id, rendezvous_address.clone(), namespace).await;
let list_sellers = list_sellers(
rendezvous_peer_id,
rendezvous_address,
namespace,
0,
identity::Keypair::generate_ed25519(),
);
let sellers = tokio::time::timeout(Duration::from_secs(15), list_sellers)
.await
.unwrap()
.unwrap();
assert_eq!(
HashSet::<Seller>::from_iter(sellers),
HashSet::<Seller>::from_iter([expected_seller_1, expected_seller_2])
)
}
async fn setup_rendezvous_point() -> (Multiaddr, PeerId) {
let mut rendezvous_node = new_swarm(|_, identity| RendezvousPointBehaviour {
rendezvous: libp2p::rendezvous::Rendezvous::new(
identity,
libp2p::rendezvous::Config::default(),
),
ping: Default::default(),
});
let rendezvous_address = rendezvous_node.listen_on_tcp_localhost().await;
let rendezvous_peer_id = *rendezvous_node.local_peer_id();
tokio::spawn(async move {
loop {
rendezvous_node.next().await;
}
});
(rendezvous_address, rendezvous_peer_id)
}
async fn setup_asb(
rendezvous_peer_id: PeerId,
rendezvous_address: Multiaddr,
namespace: XmrBtcNamespace,
) -> Seller {
let static_quote = BidQuote {
price: bitcoin::Amount::from_sat(1337),
min_quantity: bitcoin::Amount::from_sat(42),
max_quantity: bitcoin::Amount::from_sat(9001),
};
let mut asb = new_swarm(|_, identity| StaticQuoteAsbBehaviour {
rendezvous: asb::rendezous::Behaviour::new(
identity,
rendezvous_peer_id,
rendezvous_address,
namespace,
None,
),
ping: Default::default(),
quote: quote::asb(),
static_quote,
registered: false,
});
let asb_address = asb.listen_on_tcp_localhost().await;
asb.add_external_address(asb_address.clone(), AddressScore::Infinite);
let asb_peer_id = *asb.local_peer_id();
// avoid race condition where `list_sellers` tries to discover before we are
// registered block this function until we are registered
while !asb.behaviour().registered {
asb.next().await;
}
tokio::spawn(async move {
loop {
asb.next().await;
}
});
Seller {
multiaddr: asb_address.with(Protocol::P2p(asb_peer_id.into())),
quote: static_quote,
}
}
#[derive(libp2p::NetworkBehaviour)]
struct StaticQuoteAsbBehaviour {
rendezvous: asb::rendezous::Behaviour,
// Support `Ping` as a workaround until https://github.com/libp2p/rust-libp2p/issues/2109 is fixed.
ping: libp2p::ping::Ping,
quote: quote::Behaviour,
#[behaviour(ignore)]
static_quote: BidQuote,
#[behaviour(ignore)]
registered: bool,
}
impl NetworkBehaviourEventProcess<libp2p::rendezvous::Event> for StaticQuoteAsbBehaviour {
fn inject_event(&mut self, event: libp2p::rendezvous::Event) {
if let libp2p::rendezvous::Event::Registered { .. } = event {
self.registered = true;
}
}
}
impl NetworkBehaviourEventProcess<libp2p::ping::PingEvent> for StaticQuoteAsbBehaviour {
fn inject_event(&mut self, _: libp2p::ping::PingEvent) {}
}
impl NetworkBehaviourEventProcess<quote::OutEvent> for StaticQuoteAsbBehaviour {
fn inject_event(&mut self, event: quote::OutEvent) {
if let RequestResponseEvent::Message {
message: quote::Message::Request { channel, .. },
..
} = event
{
self.quote
.send_response(channel, self.static_quote)
.unwrap();
}
}
}
#[derive(libp2p::NetworkBehaviour)]
struct RendezvousPointBehaviour {
rendezvous: libp2p::rendezvous::Rendezvous,
// Support `Ping` as a workaround until https://github.com/libp2p/rust-libp2p/issues/2109 is fixed.
ping: libp2p::ping::Ping,
}
impl NetworkBehaviourEventProcess<libp2p::rendezvous::Event> for RendezvousPointBehaviour {
fn inject_event(&mut self, _: libp2p::rendezvous::Event) {}
}
impl NetworkBehaviourEventProcess<libp2p::ping::PingEvent> for RendezvousPointBehaviour {
fn inject_event(&mut self, _: libp2p::ping::PingEvent) {}
}
}

@ -1,5 +1,6 @@
use crate::env::GetConfig;
use crate::fs::system_data_dir;
use crate::network::rendezvous::{XmrBtcNamespace, DEFAULT_RENDEZVOUS_ADDRESS};
use crate::{env, monero};
use anyhow::{Context, Result};
use libp2p::core::Multiaddr;
@ -188,6 +189,20 @@ where
bitcoin_target_block: bitcoin_target_block_from(bitcoin_target_block, is_testnet),
},
},
RawCommand::ListSellers {
rendezvous_node_addr,
tor: Tor { tor_socks5_port },
} => Arguments {
env_config: env_config_from(is_testnet),
debug,
json,
data_dir: data::data_dir_from(data, is_testnet)?,
cmd: Command::ListSellers {
rendezvous_node_addr,
namespace: rendezvous_namespace_from(is_testnet),
tor_socks5_port,
},
},
};
Ok(ParseResult::Arguments(arguments))
@ -224,6 +239,11 @@ pub enum Command {
bitcoin_electrum_rpc_url: Url,
bitcoin_target_block: usize,
},
ListSellers {
rendezvous_node_addr: Multiaddr,
namespace: XmrBtcNamespace,
tor_socks5_port: u16,
},
}
#[derive(structopt::StructOpt, Debug)]
@ -311,6 +331,17 @@ pub enum RawCommand {
#[structopt(flatten)]
bitcoin: Bitcoin,
},
ListSellers {
#[structopt(
long,
help = "The multiaddr (including peer-id) of a rendezvous node that sellers register with",
default_value = DEFAULT_RENDEZVOUS_ADDRESS
)]
rendezvous_node_addr: Multiaddr,
#[structopt(flatten)]
tor: Tor,
},
}
#[derive(structopt::StructOpt, Debug)]
@ -406,6 +437,14 @@ fn bitcoin_electrum_rpc_url_from(url: Option<Url>, testnet: bool) -> Result<Url>
}
}
fn rendezvous_namespace_from(is_testnet: bool) -> XmrBtcNamespace {
if is_testnet {
XmrBtcNamespace::Testnet
} else {
XmrBtcNamespace::Mainnet
}
}
fn bitcoin_target_block_from(target_block: Option<usize>, testnet: bool) -> usize {
if let Some(target_block) = target_block {
target_block

@ -94,7 +94,7 @@ impl EventLoop {
loop {
// Note: We are making very elaborate use of `select!` macro's feature here. Make sure to read the documentation thoroughly: https://docs.rs/tokio/1.4.0/tokio/macro.select.html
tokio::select! {
swarm_event = self.swarm.next_event().fuse() => {
swarm_event = self.swarm.select_next_some() => {
match swarm_event {
SwarmEvent::Behaviour(OutEvent::QuoteReceived { id, response }) => {
if let Some(responder) = self.inflight_quote_requests.remove(&id) {

@ -0,0 +1,281 @@
use crate::network::quote::BidQuote;
use crate::network::rendezvous::XmrBtcNamespace;
use crate::network::{quote, swarm};
use anyhow::{Context, Result};
use futures::StreamExt;
use libp2p::multiaddr::Protocol;
use libp2p::ping::{Ping, PingConfig, PingEvent};
use libp2p::rendezvous::{Namespace, Rendezvous};
use libp2p::request_response::{RequestResponseEvent, RequestResponseMessage};
use libp2p::swarm::SwarmEvent;
use libp2p::{identity, rendezvous, Multiaddr, PeerId, Swarm};
use serde::Serialize;
use serde_with::{serde_as, DisplayFromStr};
use std::collections::HashMap;
use std::time::Duration;
pub async fn list_sellers(
rendezvous_node_peer_id: PeerId,
rendezvous_node_addr: Multiaddr,
namespace: XmrBtcNamespace,
tor_socks5_port: u16,
identity: identity::Keypair,
) -> Result<Vec<Seller>> {
let behaviour = Behaviour {
rendezvous: Rendezvous::new(identity.clone(), rendezvous::Config::default()),
quote: quote::cli(),
ping: Ping::new(
PingConfig::new()
.with_keep_alive(false)
.with_interval(Duration::from_secs(86_400)),
),
};
let mut swarm = swarm::cli(identity, tor_socks5_port, behaviour).await?;
swarm
.behaviour_mut()
.quote
.add_address(&rendezvous_node_peer_id, rendezvous_node_addr.clone());
swarm
.dial(&rendezvous_node_peer_id)
.context("Failed to dial rendezvous node")?;
let event_loop = EventLoop::new(
swarm,
rendezvous_node_peer_id,
rendezvous_node_addr,
namespace,
);
let sellers = event_loop.run().await;
Ok(sellers)
}
#[serde_as]
#[derive(Debug, Serialize, PartialEq, Eq, Hash)]
pub struct Seller {
#[serde_as(as = "DisplayFromStr")]
pub multiaddr: Multiaddr,
pub quote: BidQuote,
}
#[derive(Debug)]
enum OutEvent {
Rendezvous(rendezvous::Event),
Quote(quote::OutEvent),
Ping(PingEvent),
}
impl From<rendezvous::Event> for OutEvent {
fn from(event: rendezvous::Event) -> Self {
OutEvent::Rendezvous(event)
}
}
impl From<quote::OutEvent> for OutEvent {
fn from(event: quote::OutEvent) -> Self {
OutEvent::Quote(event)
}
}
#[derive(libp2p::NetworkBehaviour)]
#[behaviour(event_process = false)]
#[behaviour(out_event = "OutEvent")]
struct Behaviour {
rendezvous: Rendezvous,
quote: quote::Behaviour,
ping: Ping,
}
#[derive(Debug)]
enum QuoteStatus {
Pending,
Received(BidQuote),
}
#[derive(Debug)]
enum State {
WaitForDiscovery,
WaitForQuoteCompletion,
}
struct EventLoop {
swarm: Swarm<Behaviour>,
rendezvous_peer_id: PeerId,
rendezvous_addr: Multiaddr,
namespace: XmrBtcNamespace,
asb_address: HashMap<PeerId, Multiaddr>,
asb_quote_status: HashMap<PeerId, QuoteStatus>,
state: State,
}
impl EventLoop {
fn new(
swarm: Swarm<Behaviour>,
rendezvous_peer_id: PeerId,
rendezvous_addr: Multiaddr,
namespace: XmrBtcNamespace,
) -> Self {
Self {
swarm,
rendezvous_peer_id,
rendezvous_addr,
namespace,
asb_address: Default::default(),
asb_quote_status: Default::default(),
state: State::WaitForDiscovery,
}
}
async fn run(mut self) -> Vec<Seller> {
loop {
tokio::select! {
swarm_event = self.swarm.select_next_some() => {
match swarm_event {
SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => {
if peer_id == self.rendezvous_peer_id{
tracing::info!(
"Connected to rendezvous point, discovering nodes in '{}' namespace ...",
self.namespace
);
self.swarm.behaviour_mut().rendezvous.discover(
Some(Namespace::new(self.namespace.to_string()).expect("our namespace to be a correct string")),
None,
None,
self.rendezvous_peer_id,
);
} else {
let address = endpoint.get_remote_address();
self.asb_address.insert(peer_id, address.clone());
}
}
SwarmEvent::UnreachableAddr { peer_id, error, address, .. } => {
if address == self.rendezvous_addr {
tracing::error!(
"Failed to connect to rendezvous point at {}: {}",
address,
error
);
// if the rendezvous node is unreachable we just stop
return Vec::new();
} else {
tracing::debug!(
"Failed to connect to peer at {}: {}",
address,
error
);
// if a different peer than the rendezvous node is unreachable (i.e. a seller) we remove that seller from the quote status state
self.asb_quote_status.remove(&peer_id);
}
}
SwarmEvent::Behaviour(OutEvent::Rendezvous(
rendezvous::Event::Discovered { registrations, .. },
)) => {
self.state = State::WaitForQuoteCompletion;
for registration in registrations {
let peer = registration.record.peer_id();
for address in registration.record.addresses() {
tracing::info!("Discovered peer {} at {}", peer, address);
let p2p_suffix = Protocol::P2p(*peer.as_ref());
let _address_with_p2p = if !address
.ends_with(&Multiaddr::empty().with(p2p_suffix.clone()))
{
address.clone().with(p2p_suffix)
} else {
address.clone()
};
self.asb_quote_status.insert(peer, QuoteStatus::Pending);
// add all external addresses of that peer to the quote behaviour
self.swarm.behaviour_mut().quote.add_address(&peer, address.clone());
}
// request the quote, if we are not connected to the peer it will be dialed automatically
let _request_id = self.swarm.behaviour_mut().quote.send_request(&peer, ());
}
}
SwarmEvent::Behaviour(OutEvent::Quote(quote_response)) => {
match quote_response {
RequestResponseEvent::Message { peer, message } => {
match message {
RequestResponseMessage::Response { response, .. } => {
if self.asb_quote_status.insert(peer, QuoteStatus::Received(response)).is_none() {
tracing::error!(%peer, "Received bid quote from unexpected peer, this record will be removed!");
self.asb_quote_status.remove(&peer);
}
}
RequestResponseMessage::Request { .. } => unreachable!()
}
}
RequestResponseEvent::OutboundFailure { peer, error, .. } => {
if peer == self.rendezvous_peer_id {
tracing::debug!(%peer, "Outbound failure when communicating with rendezvous node: {:#}", error);
} else {
tracing::debug!(%peer, "Ignoring seller, because unable to request quote: {:#}", error);
self.asb_quote_status.remove(&peer);
}
}
RequestResponseEvent::InboundFailure { peer, error, .. } => {
if peer == self.rendezvous_peer_id {
tracing::debug!(%peer, "Inbound failure when communicating with rendezvous node: {:#}", error);
} else {
tracing::debug!(%peer, "Ignoring seller, because unable to request quote: {:#}", error);
self.asb_quote_status.remove(&peer);
}
},
RequestResponseEvent::ResponseSent { .. } => unreachable!()
}
}
_ => {}
}
}
}
match self.state {
State::WaitForDiscovery => {
continue;
}
State::WaitForQuoteCompletion => {
let all_quotes_fetched = self
.asb_quote_status
.iter()
.map(|(peer_id, quote_status)| match quote_status {
QuoteStatus::Pending => Err(StillPending {}),
QuoteStatus::Received(quote) => {
let address = self
.asb_address
.get(&peer_id)
.expect("if we got a quote we must have stored an address");
Ok(Seller {
multiaddr: address.clone(),
quote: *quote,
})
}
})
.collect::<Result<Vec<_>, _>>();
match all_quotes_fetched {
Ok(sellers) => break sellers,
Err(StillPending {}) => continue,
}
}
}
}
}
}
#[derive(Debug)]
struct StillPending {}
impl From<PingEvent> for OutEvent {
fn from(event: PingEvent) -> Self {
OutEvent::Ping(event)
}
}

@ -1,4 +1,5 @@
use anyhow::Result;
use std::option::Option::Some;
use std::path::Path;
use tracing::subscriber::set_global_default;
use tracing::{Event, Level, Subscriber};
@ -8,7 +9,7 @@ use tracing_subscriber::layer::{Context, SubscriberExt};
use tracing_subscriber::{fmt, EnvFilter, FmtSubscriber, Layer, Registry};
use uuid::Uuid;
pub fn init(debug: bool, json: bool, dir: impl AsRef<Path>, swap_id: Uuid) -> Result<()> {
pub fn init(debug: bool, json: bool, dir: impl AsRef<Path>, swap_id: Option<Uuid>) -> Result<()> {
if json {
let level = if debug { Level::DEBUG } else { Level::INFO };
@ -24,7 +25,7 @@ pub fn init(debug: bool, json: bool, dir: impl AsRef<Path>, swap_id: Uuid) -> Re
.init();
Ok(())
} else {
} else if let Some(swap_id) = swap_id {
let level_filter = EnvFilter::try_new("swap=debug")?;
let registry = Registry::default().with(level_filter);
@ -45,6 +46,19 @@ pub fn init(debug: bool, json: bool, dir: impl AsRef<Path>, swap_id: Uuid) -> Re
set_global_default(registry.with(file_logger).with(info_terminal_printer()))?;
}
Ok(())
} else {
let level = if debug { Level::DEBUG } else { Level::INFO };
let is_terminal = atty::is(atty::Stream::Stderr);
FmtSubscriber::builder()
.with_env_filter(format!("swap={}", level))
.with_writer(std::io::stderr)
.with_ansi(is_terminal)
.with_timer(ChronoLocal::with_format("%F %T".to_owned()))
.with_target(false)
.init();
Ok(())
}
}

@ -7,6 +7,7 @@ pub mod encrypted_signature;
pub mod json_pull_codec;
pub mod quote;
pub mod redial;
pub mod rendezvous;
pub mod swap_setup;
pub mod swarm;
pub mod tor_transport;

@ -52,7 +52,7 @@ impl From<(PeerId, Message)> for asb::OutEvent {
Message::Request {
request, channel, ..
} => Self::EncryptedSignatureReceived {
msg: Box::new(request),
msg: request,
channel,
peer,
},

@ -9,8 +9,8 @@ use libp2p::PeerId;
use serde::{Deserialize, Serialize};
const PROTOCOL: &str = "/comit/xmr/btc/bid-quote/1.0.0";
type OutEvent = RequestResponseEvent<(), BidQuote>;
type Message = RequestResponseMessage<(), BidQuote>;
pub type OutEvent = RequestResponseEvent<(), BidQuote>;
pub type Message = RequestResponseMessage<(), BidQuote>;
pub type Behaviour = RequestResponse<JsonPullCodec<BidQuoteProtocol, BidQuote>>;
@ -24,7 +24,7 @@ impl ProtocolName for BidQuoteProtocol {
}
/// Represents a quote for buying XMR.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct BidQuote {
/// The price at which the maker is willing to buy at.
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]

@ -0,0 +1,32 @@
use libp2p::rendezvous::Namespace;
use std::fmt;
pub const DEFAULT_RENDEZVOUS_ADDRESS: &str =
"/dnsaddr/rendezvous.coblox.tech/p2p/12D3KooWQUt9DkNZxEn2R5ymJzWj15MpG6mTW84kyd8vDaRZi46o";
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum XmrBtcNamespace {
Mainnet,
Testnet,
}
const MAINNET: &str = "xmr-btc-swap-mainnet";
const TESTNET: &str = "xmr-btc-swap-testnet";
impl fmt::Display for XmrBtcNamespace {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
XmrBtcNamespace::Mainnet => write!(f, "{}", MAINNET),
XmrBtcNamespace::Testnet => write!(f, "{}", TESTNET),
}
}
}
impl From<XmrBtcNamespace> for Namespace {
fn from(namespace: XmrBtcNamespace) -> Self {
match namespace {
XmrBtcNamespace::Mainnet => Namespace::from_static(MAINNET),
XmrBtcNamespace::Testnet => Namespace::from_static(TESTNET),
}
}
}

@ -96,7 +96,7 @@ impl From<OutEvent> for asb::OutEvent {
} => asb::OutEvent::SwapSetupCompleted {
peer_id: bob_peer_id,
swap_id,
state3: Box::new(state3),
state3,
},
OutEvent::Error { peer_id, error } => asb::OutEvent::Failure {
peer: peer_id,

@ -1,11 +1,12 @@
use crate::asb::LatestRate;
use crate::libp2p_ext::MultiAddrExt;
use crate::network::rendezvous::XmrBtcNamespace;
use crate::seed::Seed;
use crate::{asb, bitcoin, cli, env, tor};
use anyhow::Result;
use libp2p::swarm::SwarmBuilder;
use libp2p::{PeerId, Swarm};
use anyhow::{Context, Result};
use libp2p::swarm::{NetworkBehaviour, SwarmBuilder};
use libp2p::{identity, Multiaddr, Swarm};
use std::fmt::Debug;
use std::sync::Arc;
#[allow(clippy::too_many_arguments)]
pub fn asb<LR>(
@ -15,13 +16,32 @@ pub fn asb<LR>(
latest_rate: LR,
resume_only: bool,
env_config: env::Config,
rendezvous_params: Option<(Multiaddr, XmrBtcNamespace)>,
) -> Result<Swarm<asb::Behaviour<LR>>>
where
LR: LatestRate + Send + 'static + Debug + Clone,
{
let behaviour = asb::Behaviour::new(min_buy, max_buy, latest_rate, resume_only, env_config);
let identity = seed.derive_libp2p_identity();
let rendezvous_params = if let Some((address, namespace)) = rendezvous_params {
let peer_id = address
.extract_peer_id()
.context("Rendezvous node address must contain peer ID")?;
Some((identity.clone(), peer_id, address, namespace))
} else {
None
};
let behaviour = asb::Behaviour::new(
min_buy,
max_buy,
latest_rate,
resume_only,
env_config,
rendezvous_params,
);
let transport = asb::transport::new(&identity)?;
let peer_id = identity.public().into_peer_id();
@ -34,21 +54,19 @@ where
Ok(swarm)
}
pub async fn cli(
seed: &Seed,
alice: PeerId,
pub async fn cli<T>(
identity: identity::Keypair,
tor_socks5_port: u16,
env_config: env::Config,
bitcoin_wallet: Arc<bitcoin::Wallet>,
) -> Result<Swarm<cli::Behaviour>> {
behaviour: T,
) -> Result<Swarm<T>>
where
T: NetworkBehaviour,
{
let maybe_tor_socks5_port = match tor::Client::new(tor_socks5_port).assert_tor_running().await {
Ok(()) => Some(tor_socks5_port),
Err(_) => None,
};
let behaviour = cli::Behaviour::new(alice, env_config, bitcoin_wallet);
let identity = seed.derive_libp2p_identity();
let transport = cli::transport::new(&identity, maybe_tor_socks5_port)?;
let peer_id = identity.public().into_peer_id();

@ -1,19 +1,19 @@
use futures::future;
use async_trait::async_trait;
use futures::stream::FusedStream;
use futures::{future, Future, Stream, StreamExt};
use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::transport::memory::MemoryTransport;
use libp2p::core::upgrade::{SelectUpgrade, Version};
use libp2p::core::{Executor, Multiaddr};
use libp2p::core::transport::upgrade::Version;
use libp2p::core::transport::MemoryTransport;
use libp2p::core::upgrade::SelectUpgrade;
use libp2p::core::{identity, Executor, Multiaddr, PeerId, Transport};
use libp2p::mplex::MplexConfig;
use libp2p::noise::{self, NoiseConfig, X25519Spec};
use libp2p::swarm::{
IntoProtocolsHandler, NetworkBehaviour, ProtocolsHandler, SwarmBuilder, SwarmEvent,
};
use libp2p::{identity, yamux, PeerId, Swarm, Transport};
use libp2p::noise::{Keypair, NoiseConfig, X25519Spec};
use libp2p::swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent};
use libp2p::tcp::TokioTcpConfig;
use libp2p::yamux::YamuxConfig;
use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
use tokio::time;
/// An adaptor struct for libp2p that spawns futures into the current
/// thread-local runtime.
@ -25,138 +25,193 @@ impl Executor for GlobalSpawnTokioExecutor {
}
}
#[allow(missing_debug_implementations)]
pub struct Actor<B: NetworkBehaviour> {
pub swarm: Swarm<B>,
pub addr: Multiaddr,
pub peer_id: PeerId,
}
pub async fn new_connected_swarm_pair<B, F>(behaviour_fn: F) -> (Actor<B>, Actor<B>)
pub fn new_swarm<B, F>(behaviour_fn: F) -> Swarm<B>
where
B: NetworkBehaviour,
F: Fn(PeerId, identity::Keypair) -> B + Clone,
<<<B as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent: Clone,
<B as NetworkBehaviour>::OutEvent: Debug{
let (swarm, addr, peer_id) = new_swarm(behaviour_fn.clone());
let mut alice = Actor {
swarm,
addr,
peer_id,
};
let (swarm, addr, peer_id) = new_swarm(behaviour_fn);
let mut bob = Actor {
swarm,
addr,
peer_id,
};
connect(&mut alice.swarm, &mut bob.swarm).await;
(alice, bob)
}
pub fn new_swarm<B: NetworkBehaviour, F: Fn(PeerId, identity::Keypair) -> B>(
behaviour_fn: F,
) -> (Swarm<B>, Multiaddr, PeerId)
where
<B as NetworkBehaviour>::OutEvent: Debug,
B: NetworkBehaviour,
F: FnOnce(PeerId, identity::Keypair) -> B,
{
let id_keys = identity::Keypair::generate_ed25519();
let peer_id = PeerId::from(id_keys.public());
let identity = identity::Keypair::generate_ed25519();
let peer_id = PeerId::from(identity.public());
let dh_keys = noise::Keypair::<X25519Spec>::new()
.into_authentic(&id_keys)
let dh_keys = Keypair::<X25519Spec>::new()
.into_authentic(&identity)
.expect("failed to create dh_keys");
let noise = NoiseConfig::xx(dh_keys).into_authenticated();
let transport = MemoryTransport::default()
.or_transport(TokioTcpConfig::new())
.upgrade(Version::V1)
.authenticate(noise)
.multiplex(SelectUpgrade::new(
yamux::YamuxConfig::default(),
YamuxConfig::default(),
MplexConfig::new(),
))
.timeout(Duration::from_secs(5))
.map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer)))
.boxed();
let mut swarm: Swarm<B> = SwarmBuilder::new(transport, behaviour_fn(peer_id, id_keys), peer_id)
SwarmBuilder::new(transport, behaviour_fn(peer_id, identity), peer_id)
.executor(Box::new(GlobalSpawnTokioExecutor))
.build();
.build()
}
fn get_rand_memory_address() -> Multiaddr {
let address_port = rand::random::<u64>();
let addr = format!("/memory/{}", address_port)
.parse::<Multiaddr>()
.unwrap();
Swarm::listen_on(&mut swarm, addr.clone()).unwrap();
format!("/memory/{}", address_port).parse().unwrap()
}
(swarm, addr, peer_id)
async fn get_local_tcp_address() -> Multiaddr {
let random_port = {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
listener.local_addr().unwrap().port()
};
format!("/ip4/127.0.0.1/tcp/{}", random_port)
.parse()
.unwrap()
}
pub async fn await_events_or_timeout<A, B>(
alice_event: impl Future<Output = A>,
bob_event: impl Future<Output = B>,
) -> (A, B) {
time::timeout(
Duration::from_secs(10),
future::join(alice_event, bob_event),
pub async fn await_events_or_timeout<A, B, E1, E2>(
swarm_1: &mut (impl Stream<Item = SwarmEvent<A, E1>> + FusedStream + Unpin),
swarm_2: &mut (impl Stream<Item = SwarmEvent<B, E2>> + FusedStream + Unpin),
) -> (SwarmEvent<A, E1>, SwarmEvent<B, E2>)
where
SwarmEvent<A, E1>: Debug,
SwarmEvent<B, E2>: Debug,
{
tokio::time::timeout(
Duration::from_secs(30),
future::join(
swarm_1
.inspect(|event| tracing::debug!("Swarm1 emitted {:?}", event))
.select_next_some(),
swarm_2
.inspect(|event| tracing::debug!("Swarm2 emitted {:?}", event))
.select_next_some(),
),
)
.await
.expect("network behaviours to emit an event within 10 seconds")
}
/// Connects two swarms with each other.
///
/// This assumes the transport that is in use can be used by Bob to connect to
/// the listen address that is emitted by Alice. In other words, they have to be
/// on the same network. The memory transport used by the above `new_swarm`
/// function fulfills this.
///
/// We also assume that the swarms don't emit any behaviour events during the
/// connection phase. Any event emitted is considered a bug from this functions
/// PoV because they would be lost.
pub async fn connect<BA, BB>(alice: &mut Swarm<BA>, bob: &mut Swarm<BB>)
/// An extension trait for [`Swarm`] that makes it easier to set up a network of
/// [`Swarm`]s for tests.
#[async_trait]
pub trait SwarmExt {
/// Establishes a connection to the given [`Swarm`], polling both of them
/// until the connection is established.
async fn block_on_connection<T>(&mut self, other: &mut Swarm<T>)
where
T: NetworkBehaviour,
<T as NetworkBehaviour>::OutEvent: Debug;
/// Listens on a random memory address, polling the [`Swarm`] until the
/// transport is ready to accept connections.
async fn listen_on_random_memory_address(&mut self) -> Multiaddr;
/// Listens on a TCP port for localhost only, polling the [`Swarm`] until
/// the transport is ready to accept connections.
async fn listen_on_tcp_localhost(&mut self) -> Multiaddr;
}
#[async_trait]
impl<B> SwarmExt for Swarm<B>
where
BA: NetworkBehaviour,
BB: NetworkBehaviour,
<BA as NetworkBehaviour>::OutEvent: Debug,
<BB as NetworkBehaviour>::OutEvent: Debug,
B: NetworkBehaviour,
<B as NetworkBehaviour>::OutEvent: Debug,
{
let mut alice_connected = false;
let mut bob_connected = false;
while !alice_connected && !bob_connected {
let (alice_event, bob_event) = future::join(alice.next_event(), bob.next_event()).await;
match alice_event {
SwarmEvent::ConnectionEstablished { .. } => {
alice_connected = true;
}
SwarmEvent::NewListenAddr(addr) => {
bob.dial_addr(addr).unwrap();
async fn block_on_connection<T>(&mut self, other: &mut Swarm<T>)
where
T: NetworkBehaviour,
<T as NetworkBehaviour>::OutEvent: Debug,
{
let addr_to_dial = other.external_addresses().next().unwrap().addr.clone();
self.dial_addr(addr_to_dial.clone()).unwrap();
let mut dialer_done = false;
let mut listener_done = false;
loop {
let dialer_event_fut = self.select_next_some();
tokio::select! {
dialer_event = dialer_event_fut => {
match dialer_event {
SwarmEvent::ConnectionEstablished { .. } => {
dialer_done = true;
}
SwarmEvent::UnknownPeerUnreachableAddr { address, error } if address == addr_to_dial => {
panic!("Failed to dial address {}: {}", addr_to_dial, error)
}
other => {
tracing::debug!("Ignoring {:?}", other);
}
}
},
listener_event = other.select_next_some() => {
match listener_event {
SwarmEvent::ConnectionEstablished { .. } => {
listener_done = true;
}
SwarmEvent::IncomingConnectionError { error, .. } => {
panic!("Failure in incoming connection {}", error);
}
other => {
tracing::debug!("Ignoring {:?}", other);
}
}
}
}
SwarmEvent::Behaviour(event) => {
panic!(
"alice unexpectedly emitted a behaviour event during connection: {:?}",
event
);
if dialer_done && listener_done {
return;
}
_ => {}
}
match bob_event {
SwarmEvent::ConnectionEstablished { .. } => {
bob_connected = true;
}
async fn listen_on_random_memory_address(&mut self) -> Multiaddr {
let multiaddr = get_rand_memory_address();
self.listen_on(multiaddr.clone()).unwrap();
block_until_listening_on(self, &multiaddr).await;
// Memory addresses are externally reachable because they all share the same
// memory-space.
self.add_external_address(multiaddr.clone(), AddressScore::Infinite);
multiaddr
}
async fn listen_on_tcp_localhost(&mut self) -> Multiaddr {
let multiaddr = get_local_tcp_address().await;
self.listen_on(multiaddr.clone()).unwrap();
block_until_listening_on(self, &multiaddr).await;
multiaddr
}
}
async fn block_until_listening_on<B>(swarm: &mut Swarm<B>, multiaddr: &Multiaddr)
where
B: NetworkBehaviour,
<B as NetworkBehaviour>::OutEvent: Debug,
{
loop {
match swarm.select_next_some().await {
SwarmEvent::NewListenAddr(addr) if &addr == multiaddr => {
break;
}
SwarmEvent::Behaviour(event) => {
panic!(
"bob unexpectedly emitted a behaviour event during connection: {:?}",
event
other => {
tracing::debug!(
"Ignoring {:?} while waiting for listening to succeed",
other
);
}
_ => {}
}
}
}

@ -236,6 +236,7 @@ async fn start_alice(
latest_rate,
resume_only,
env_config,
None,
)
.unwrap();
swarm.listen_on(listen_address).unwrap();
@ -445,12 +446,16 @@ impl BobParams {
) -> Result<(cli::EventLoop, cli::EventLoopHandle)> {
let tor_socks5_port = get_port()
.expect("We don't care about Tor in the tests so we get a free port to disable it.");
let mut swarm = swarm::cli(
&self.seed,
let behaviour = cli::Behaviour::new(
self.alice_peer_id,
tor_socks5_port,
self.env_config,
self.bitcoin_wallet.clone(),
);
let mut swarm = swarm::cli(
self.seed.derive_libp2p_identity(),
tor_socks5_port,
behaviour,
)
.await?;
swarm

Loading…
Cancel
Save