From 5e192acac29b67c15e9366ea88cc2d96434c4546 Mon Sep 17 00:00:00 2001 From: Byron Hambly Date: Tue, 1 Aug 2023 15:42:10 +0200 Subject: [PATCH] feat(asb): allow asb to register with mulitple rendezvous nodes --- CHANGELOG.md | 3 +- docs/asb/README.md | 7 +- swap/src/asb.rs | 3 +- swap/src/asb/config.rs | 45 +++-- swap/src/asb/event_loop.rs | 4 +- swap/src/asb/network.rs | 330 ++++++++++++++++++++++++------------- swap/src/bin/asb.rs | 15 +- swap/src/cli.rs | 35 ++-- swap/src/network/swarm.rs | 25 +-- swap/tests/harness/mod.rs | 2 +- 10 files changed, 299 insertions(+), 170 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 51a1b67a..16f4730b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed -- Minimum Supported Rust Version (MSRV) bumped to 1.63 +- Minimum Supported Rust Version (MSRV) bumped to 1.67 +- ASB can now register with multiple rendezvous nodes. The `rendezvous_point` option in `config.toml` can be a string with comma separated addresses, or a toml array of address strings. ## [0.12.1] - 2023-01-09 diff --git a/docs/asb/README.md b/docs/asb/README.md index e0991d8d..1c924252 100644 --- a/docs/asb/README.md +++ b/docs/asb/README.md @@ -42,13 +42,16 @@ Since the ASB is a long running task we specify the person running an ASB as ser The ASB daemon supports the libp2p [rendezvous-protocol](https://github.com/libp2p/specs/tree/master/rendezvous). Usage of the rendezvous functionality is entirely optional. -You can configure a rendezvous point in the `[network]` section of your config file. +You can configure one or more rendezvous point in the `[network]` section of your config file. For the registration to be successful, you also need to configure the externally reachable addresses within the `[network]` section. For example: ```toml [network] -rendezvous_point = "/dns4/discover.unstoppableswap.net/tcp/8888/p2p/12D3KooWA6cnqJpVnreBVnoro8midDL9Lpzmg8oJPoAGi7YYaamE" +rendezvous_point = [ + "/dns4/discover.unstoppableswap.net/tcp/8888/p2p/12D3KooWA6cnqJpVnreBVnoro8midDL9Lpzmg8oJPoAGi7YYaamE", + "/dns4/eratosthen.es/tcp/7798/p2p/12D3KooWAh7EXXa2ZyegzLGdjvj1W4G3EXrTGrf6trraoT1MEobs", +] external_addresses = ["/dns4/example.com/tcp/9939"] ``` diff --git a/swap/src/asb.rs b/swap/src/asb.rs index fccd2f8b..b5ed8ac1 100644 --- a/swap/src/asb.rs +++ b/swap/src/asb.rs @@ -8,6 +8,7 @@ pub mod tracing; pub use event_loop::{EventLoop, EventLoopHandle, FixedRate, KrakenRate, LatestRate}; pub use network::behaviour::{Behaviour, OutEvent}; +pub use network::rendezvous::RendezvousNode; pub use network::transport; pub use rate::Rate; pub use recovery::cancel::cancel; @@ -18,4 +19,4 @@ pub use recovery::safely_abort::safely_abort; pub use recovery::{cancel, refund}; #[cfg(test)] -pub use network::rendezous; +pub use network::rendezvous; diff --git a/swap/src/asb/config.rs b/swap/src/asb/config.rs index 7e8492ec..ac651c71 100644 --- a/swap/src/asb/config.rs +++ b/swap/src/asb/config.rs @@ -134,8 +134,8 @@ pub struct Data { pub struct Network { #[serde(deserialize_with = "addr_list::deserialize")] pub listen: Vec, - #[serde(default)] - pub rendezvous_point: Option, + #[serde(default, deserialize_with = "addr_list::deserialize")] + pub rendezvous_point: Vec, #[serde(default, deserialize_with = "addr_list::deserialize")] pub external_addresses: Vec, } @@ -156,7 +156,7 @@ mod addr_list { let list: Result, _> = s .split(',') .filter(|s| !s.is_empty()) - .map(|s| s.parse().map_err(de::Error::custom)) + .map(|s| s.trim().parse().map_err(de::Error::custom)) .collect(); Ok(list?) } @@ -165,7 +165,7 @@ mod addr_list { .iter() .map(|v| { if let Value::String(s) = v { - s.parse().map_err(de::Error::custom) + s.trim().parse().map_err(de::Error::custom) } else { Err(de::Error::custom("expected a string")) } @@ -347,10 +347,27 @@ pub fn query_user_for_initial_config(testnet: bool) -> Result { } let ask_spread = Decimal::from_f64(ask_spread).context("Unable to parse spread")?; - let rendezvous_point = Input::::with_theme(&ColorfulTheme::default()) - .with_prompt("Do you want to advertise your ASB instance with a rendezvous node? Enter an empty string if not.") - .allow_empty(true) - .interact_text()?; + let mut number = 1; + let mut done = false; + let mut rendezvous_points = Vec::new(); + println!("ASB can register with multiple rendezvous nodes for discoverability. This can also be edited in the config file later."); + while !done { + let prompt = format!( + "Enter the address for rendezvous node ({number}). Or just hit Enter to continue." + ); + let rendezvous_addr = Input::::with_theme(&ColorfulTheme::default()) + .with_prompt(prompt) + .allow_empty(true) + .interact_text()?; + if rendezvous_addr.is_empty() { + done = true; + } else if rendezvous_points.contains(&rendezvous_addr) { + println!("That rendezvous address is already in the list."); + } else { + rendezvous_points.push(rendezvous_addr); + number += 1; + } + } println!(); @@ -358,11 +375,7 @@ pub fn query_user_for_initial_config(testnet: bool) -> Result { data: Data { dir: data_dir }, network: Network { listen: listen_addresses, - rendezvous_point: if rendezvous_point.is_empty() { - None - } else { - Some(rendezvous_point) - }, + rendezvous_point: rendezvous_points, // keeping the singular key name for backcompat external_addresses: vec![], }, bitcoin: Bitcoin { @@ -417,7 +430,7 @@ mod tests { }, network: Network { listen: vec![defaults.listen_address_tcp, defaults.listen_address_ws], - rendezvous_point: None, + rendezvous_point: vec![], external_addresses: vec![], }, monero: Monero { @@ -461,7 +474,7 @@ mod tests { }, network: Network { listen: vec![defaults.listen_address_tcp, defaults.listen_address_ws], - rendezvous_point: None, + rendezvous_point: vec![], external_addresses: vec![], }, monero: Monero { @@ -515,7 +528,7 @@ mod tests { }, network: Network { listen, - rendezvous_point: None, + rendezvous_point: vec![], external_addresses, }, monero: Monero { diff --git a/swap/src/asb/event_loop.rs b/swap/src/asb/event_loop.rs index 130494bc..4083f130 100644 --- a/swap/src/asb/event_loop.rs +++ b/swap/src/asb/event_loop.rs @@ -253,8 +253,8 @@ where channel }.boxed()); } - SwarmEvent::Behaviour(OutEvent::Rendezvous(libp2p::rendezvous::client::Event::Registered { .. })) => { - tracing::info!("Successfully registered with rendezvous node"); + SwarmEvent::Behaviour(OutEvent::Rendezvous(libp2p::rendezvous::client::Event::Registered { rendezvous_node, ttl, namespace })) => { + tracing::info!("Successfully registered with rendezvous node: {} with namespace: {} and TTL: {:?}", rendezvous_node, namespace, ttl); } SwarmEvent::Behaviour(OutEvent::Rendezvous(libp2p::rendezvous::client::Event::RegisterFailed(error))) => { tracing::error!("Registration with rendezvous node failed: {:?}", error); diff --git a/swap/src/asb/network.rs b/swap/src/asb/network.rs index 41b62c31..181ec9bc 100644 --- a/swap/src/asb/network.rs +++ b/swap/src/asb/network.rs @@ -44,7 +44,9 @@ pub mod transport { } pub mod behaviour { - use super::*; + use libp2p::swarm::behaviour::toggle::Toggle; + + use super::{rendezvous::RendezvousNode, *}; #[allow(clippy::large_enum_variant)] #[derive(Debug)] @@ -108,7 +110,7 @@ pub mod behaviour { where LR: LatestRate + Send + 'static, { - pub rendezvous: libp2p::swarm::behaviour::toggle::Toggle, + pub rendezvous: Toggle, pub quote: quote::Behaviour, pub swap_setup: alice::Behaviour, pub transfer_proof: transfer_proof::Behaviour, @@ -132,25 +134,22 @@ pub mod behaviour { resume_only: bool, env_config: env::Config, identify_params: (identity::Keypair, XmrBtcNamespace), - rendezvous_params: Option<(identity::Keypair, PeerId, Multiaddr, XmrBtcNamespace)>, + rendezvous_nodes: Vec, ) -> Self { - let agentVersion = format!("asb/{} ({})", env!("CARGO_PKG_VERSION"), identify_params.1); - let protocolVersion = "/comit/xmr/btc/1.0.0".to_string(); - let identifyConfig = IdentifyConfig::new(protocolVersion, identify_params.0.public()) - .with_agent_version(agentVersion); + let (identity, namespace) = identify_params; + let agent_version = format!("asb/{} ({})", env!("CARGO_PKG_VERSION"), namespace); + let protocol_version = "/comit/xmr/btc/1.0.0".to_string(); + let identifyConfig = IdentifyConfig::new(protocol_version, identity.public()) + .with_agent_version(agent_version); + + let behaviour = if rendezvous_nodes.is_empty() { + None + } else { + Some(rendezvous::Behaviour::new(identity, rendezvous_nodes)) + }; Self { - rendezvous: libp2p::swarm::behaviour::toggle::Toggle::from(rendezvous_params.map( - |(identity, rendezvous_peer_id, rendezvous_address, namespace)| { - rendezous::Behaviour::new( - identity, - rendezvous_peer_id, - rendezvous_address, - namespace, - None, // use default ttl on rendezvous point - ) - }, - )), + rendezvous: Toggle::from(behaviour), quote: quote::asb(), swap_setup: alice::Behaviour::new( min_buy, @@ -186,13 +185,14 @@ pub mod behaviour { } } -pub mod rendezous { +pub mod rendezvous { use super::*; use libp2p::swarm::dial_opts::DialOpts; use libp2p::swarm::DialError; + use std::collections::VecDeque; use std::pin::Pin; - #[derive(PartialEq)] + #[derive(Clone, PartialEq)] enum ConnectionStatus { Disconnected, Dialling, @@ -209,39 +209,59 @@ pub mod rendezous { pub struct Behaviour { inner: libp2p::rendezvous::client::Behaviour, - rendezvous_point: Multiaddr, - rendezvous_peer_id: PeerId, - namespace: XmrBtcNamespace, - registration_status: RegistrationStatus, + rendezvous_nodes: Vec, + to_dial: VecDeque, + } + + pub struct RendezvousNode { + pub address: Multiaddr, connection_status: ConnectionStatus, - registration_ttl: Option, + pub peer_id: PeerId, + registration_status: RegistrationStatus, + pub registration_ttl: Option, + pub namespace: XmrBtcNamespace, } - impl Behaviour { + impl RendezvousNode { pub fn new( - identity: identity::Keypair, - rendezvous_peer_id: PeerId, - rendezvous_address: Multiaddr, + address: &Multiaddr, + peer_id: PeerId, namespace: XmrBtcNamespace, registration_ttl: Option, ) -> Self { Self { - inner: libp2p::rendezvous::client::Behaviour::new(identity), - rendezvous_point: rendezvous_address, - rendezvous_peer_id, + address: address.to_owned(), + connection_status: ConnectionStatus::Disconnected, namespace, + peer_id, registration_status: RegistrationStatus::RegisterOnNextConnection, - connection_status: ConnectionStatus::Disconnected, registration_ttl, } } - fn register(&mut self) { - self.inner.register( - self.namespace.into(), - self.rendezvous_peer_id, - self.registration_ttl, - ); + fn set_connection(&mut self, status: ConnectionStatus) { + self.connection_status = status; + } + + fn set_registration(&mut self, status: RegistrationStatus) { + self.registration_status = status; + } + } + + impl Behaviour { + pub fn new(identity: identity::Keypair, rendezvous_nodes: Vec) -> Self { + Self { + inner: libp2p::rendezvous::client::Behaviour::new(identity), + rendezvous_nodes, + to_dial: VecDeque::new(), + } + } + + /// Calls the rendezvous register method of the node at node_index in the Vec of rendezvous nodes + fn register(&mut self, node_index: usize) { + let node = &self.rendezvous_nodes[node_index]; + self.inner + .register(node.namespace.into(), node.peer_id, node.registration_ttl); } } @@ -255,31 +275,37 @@ pub mod rendezous { } fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - if peer_id == &self.rendezvous_peer_id { - return vec![self.rendezvous_point.clone()]; + for node in self.rendezvous_nodes.iter() { + if peer_id == &node.peer_id { + return vec![node.address.clone()]; + } } vec![] } fn inject_connected(&mut self, peer_id: &PeerId) { - if peer_id == &self.rendezvous_peer_id { - self.connection_status = ConnectionStatus::Connected; - - match &self.registration_status { - RegistrationStatus::RegisterOnNextConnection => { - self.register(); - self.registration_status = RegistrationStatus::Pending; + for i in 0..self.rendezvous_nodes.len() { + if peer_id == &self.rendezvous_nodes[i].peer_id { + self.rendezvous_nodes[i].set_connection(ConnectionStatus::Connected); + match &self.rendezvous_nodes[i].registration_status { + RegistrationStatus::RegisterOnNextConnection => { + self.register(i); + self.rendezvous_nodes[i].set_registration(RegistrationStatus::Pending); + } + RegistrationStatus::Registered { .. } => {} + RegistrationStatus::Pending => {} } - RegistrationStatus::Registered { .. } => {} - RegistrationStatus::Pending => {} } } } fn inject_disconnected(&mut self, peer_id: &PeerId) { - if peer_id == &self.rendezvous_peer_id { - self.connection_status = ConnectionStatus::Disconnected; + for i in 0..self.rendezvous_nodes.len() { + let mut node = &mut self.rendezvous_nodes[i]; + if peer_id == &node.peer_id { + node.connection_status = ConnectionStatus::Disconnected; + } } } @@ -298,9 +324,12 @@ pub mod rendezous { _handler: Self::ProtocolsHandler, _error: &DialError, ) { - if let Some(id) = peer_id { - if id == self.rendezvous_peer_id { - self.connection_status = ConnectionStatus::Disconnected; + for i in 0..self.rendezvous_nodes.len() { + let mut node = &mut self.rendezvous_nodes[i]; + if let Some(id) = peer_id { + if id == node.peer_id { + node.connection_status = ConnectionStatus::Disconnected; + } } } } @@ -311,62 +340,73 @@ pub mod rendezous { cx: &mut std::task::Context<'_>, params: &mut impl PollParameters, ) -> Poll> { - match &mut self.registration_status { - RegistrationStatus::RegisterOnNextConnection => match self.connection_status { - ConnectionStatus::Disconnected => { - self.connection_status = ConnectionStatus::Dialling; - - return Poll::Ready(NetworkBehaviourAction::Dial { - opts: DialOpts::peer_id(self.rendezvous_peer_id) - .condition(PeerCondition::Disconnected) - .build(), - - handler: Self::ProtocolsHandler::new(Duration::from_secs(30)), - }); - } - ConnectionStatus::Dialling => {} - ConnectionStatus::Connected => { - self.registration_status = RegistrationStatus::Pending; - self.register(); - } - }, - RegistrationStatus::Registered { re_register_in } => { - if let Poll::Ready(()) = re_register_in.poll_unpin(cx) { - match self.connection_status { - ConnectionStatus::Connected => { - self.registration_status = RegistrationStatus::Pending; - self.register(); - } - ConnectionStatus::Disconnected => { - self.registration_status = - RegistrationStatus::RegisterOnNextConnection; - - return Poll::Ready(NetworkBehaviourAction::Dial { - opts: DialOpts::peer_id(self.rendezvous_peer_id) - .condition(PeerCondition::Disconnected) - .build(), - handler: Self::ProtocolsHandler::new(Duration::from_secs(30)), - }); + if let Some(peer_id) = self.to_dial.pop_front() { + return Poll::Ready(NetworkBehaviourAction::Dial { + opts: DialOpts::peer_id(peer_id) + .condition(PeerCondition::Disconnected) + .build(), + + handler: Self::ProtocolsHandler::new(Duration::from_secs(30)), + }); + } + // check the status of each rendezvous node + for i in 0..self.rendezvous_nodes.len() { + let connection_status = self.rendezvous_nodes[i].connection_status.clone(); + match &mut self.rendezvous_nodes[i].registration_status { + RegistrationStatus::RegisterOnNextConnection => match connection_status { + ConnectionStatus::Disconnected => { + self.rendezvous_nodes[i].set_connection(ConnectionStatus::Dialling); + self.to_dial.push_back(self.rendezvous_nodes[i].peer_id); + } + ConnectionStatus::Dialling => {} + ConnectionStatus::Connected => { + self.rendezvous_nodes[i].set_registration(RegistrationStatus::Pending); + self.register(i); + } + }, + RegistrationStatus::Registered { re_register_in } => { + if let Poll::Ready(()) = re_register_in.poll_unpin(cx) { + match connection_status { + ConnectionStatus::Connected => { + self.rendezvous_nodes[i] + .set_registration(RegistrationStatus::Pending); + self.register(i); + } + ConnectionStatus::Disconnected => { + self.rendezvous_nodes[i].set_registration( + RegistrationStatus::RegisterOnNextConnection, + ); + self.to_dial.push_back(self.rendezvous_nodes[i].peer_id); + } + ConnectionStatus::Dialling => {} } - ConnectionStatus::Dialling => {} } } + RegistrationStatus::Pending => {} } - RegistrationStatus::Pending => {} } let inner_poll = self.inner.poll(cx, params); - // reset the timer if we successfully registered + // reset the timer for the specific rendezvous node if we successfully registered if let Poll::Ready(NetworkBehaviourAction::GenerateEvent( - libp2p::rendezvous::client::Event::Registered { ttl, .. }, + libp2p::rendezvous::client::Event::Registered { + ttl, + rendezvous_node, + .. + }, )) = &inner_poll { - let half_of_ttl = Duration::from_secs(*ttl) / 2; - - self.registration_status = RegistrationStatus::Registered { - re_register_in: Box::pin(tokio::time::sleep(half_of_ttl)), - }; + if let Some(i) = self + .rendezvous_nodes + .iter() + .position(|n| &n.peer_id == rendezvous_node) + { + let half_of_ttl = Duration::from_secs(*ttl) / 2; + let re_register_in = Box::pin(tokio::time::sleep(half_of_ttl)); + let status = RegistrationStatus::Registered { re_register_in }; + self.rendezvous_nodes[i].set_registration(status); + } } inner_poll @@ -380,6 +420,7 @@ pub mod rendezous { use futures::StreamExt; use libp2p::rendezvous; use libp2p::swarm::SwarmEvent; + use std::collections::HashMap; #[tokio::test] async fn given_no_initial_connection_when_constructed_asb_connects_and_registers_with_rendezvous_node( @@ -387,16 +428,16 @@ pub mod rendezous { let mut rendezvous_node = new_swarm(|_, _| { rendezvous::server::Behaviour::new(rendezvous::server::Config::default()) }); - let rendezvous_address = rendezvous_node.listen_on_random_memory_address().await; + let address = rendezvous_node.listen_on_random_memory_address().await; + let rendezvous_point = RendezvousNode::new( + &address, + rendezvous_node.local_peer_id().to_owned(), + XmrBtcNamespace::Testnet, + None, + ); let mut asb = new_swarm(|_, identity| { - rendezous::Behaviour::new( - identity, - *rendezvous_node.local_peer_id(), - rendezvous_address, - XmrBtcNamespace::Testnet, - None, - ) + super::rendezvous::Behaviour::new(identity, vec![rendezvous_point]) }); asb.listen_on_random_memory_address().await; // this adds an external address @@ -428,16 +469,16 @@ pub mod rendezous { rendezvous::server::Config::default().with_min_ttl(2), ) }); - let rendezvous_address = rendezvous_node.listen_on_random_memory_address().await; + let address = rendezvous_node.listen_on_random_memory_address().await; + let rendezvous_point = RendezvousNode::new( + &address, + rendezvous_node.local_peer_id().to_owned(), + XmrBtcNamespace::Testnet, + Some(5), + ); let mut asb = new_swarm(|_, identity| { - rendezous::Behaviour::new( - identity, - *rendezvous_node.local_peer_id(), - rendezvous_address, - XmrBtcNamespace::Testnet, - Some(5), - ) + super::rendezvous::Behaviour::new(identity, vec![rendezvous_point]) }); asb.listen_on_random_memory_address().await; // this adds an external address @@ -467,5 +508,62 @@ pub mod rendezous { .unwrap() .unwrap(); } + + #[tokio::test] + async fn asb_registers_multiple() { + let registration_ttl = Some(10); + let mut rendezvous_nodes = Vec::new(); + let mut registrations = HashMap::new(); + // register with 5 rendezvous nodes + for _ in 0..5 { + let mut rendezvous = new_swarm(|_, _| { + rendezvous::server::Behaviour::new( + rendezvous::server::Config::default().with_min_ttl(2), + ) + }); + let address = rendezvous.listen_on_random_memory_address().await; + let id = *rendezvous.local_peer_id(); + registrations.insert(id, 0); + rendezvous_nodes.push(RendezvousNode::new( + &address, + *rendezvous.local_peer_id(), + XmrBtcNamespace::Testnet, + registration_ttl, + )); + tokio::spawn(async move { + loop { + rendezvous.next().await; + } + }); + } + + let mut asb = new_swarm(|_, identity| { + super::rendezvous::Behaviour::new(identity, rendezvous_nodes) + }); + asb.listen_on_random_memory_address().await; // this adds an external address + + let handle = tokio::spawn(async move { + loop { + if let SwarmEvent::Behaviour(rendezvous::client::Event::Registered { + rendezvous_node, + .. + }) = asb.select_next_some().await + { + registrations + .entry(rendezvous_node) + .and_modify(|counter| *counter += 1); + } + + if registrations.iter().all(|(_, &count)| count >= 4) { + break; + } + } + }); + + tokio::time::timeout(Duration::from_secs(30), handle) + .await + .unwrap() + .unwrap(); + } } } diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index d05a0dbb..ff3b0e4c 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -102,6 +102,19 @@ async fn main() -> Result<()> { match cmd { Command::Start { resume_only } => { + // check and warn for duplicate rendezvous points + let mut rendezvous_addrs = config.network.rendezvous_point.clone(); + let prev_len = rendezvous_addrs.len(); + rendezvous_addrs.sort(); + rendezvous_addrs.dedup(); + let new_len = rendezvous_addrs.len(); + if new_len < prev_len { + tracing::warn!( + "`rendezvous_point` config has {} duplicate entries, they are being ignored.", + prev_len - new_len + ); + } + let monero_wallet = init_monero_wallet(&config, env_config).await?; let monero_address = monero_wallet.get_main_address(); tracing::info!(%monero_address, "Monero wallet address"); @@ -161,7 +174,7 @@ async fn main() -> Result<()> { resume_only, env_config, namespace, - config.network.rendezvous_point, + &rendezvous_addrs, )?; for listen in config.network.listen.clone() { diff --git a/swap/src/cli.rs b/swap/src/cli.rs index c98634d2..f0faf146 100644 --- a/swap/src/cli.rs +++ b/swap/src/cli.rs @@ -15,6 +15,7 @@ pub use list_sellers::{list_sellers, Seller, Status as SellerStatus}; mod tests { use super::*; use crate::asb; + use crate::asb::rendezvous::RendezvousNode; use crate::cli::list_sellers::{Seller, Status}; use crate::network::quote; use crate::network::quote::BidQuote; @@ -33,10 +34,8 @@ mod tests { async fn list_sellers_should_report_all_registered_asbs_with_a_quote() { let namespace = XmrBtcNamespace::Mainnet; let (rendezvous_address, rendezvous_peer_id) = setup_rendezvous_point().await; - let expected_seller_1 = - setup_asb(rendezvous_peer_id, rendezvous_address.clone(), namespace).await; - let expected_seller_2 = - setup_asb(rendezvous_peer_id, rendezvous_address.clone(), namespace).await; + let expected_seller_1 = setup_asb(rendezvous_peer_id, &rendezvous_address, namespace).await; + let expected_seller_2 = setup_asb(rendezvous_peer_id, &rendezvous_address, namespace).await; let list_sellers = list_sellers( rendezvous_peer_id, @@ -72,7 +71,7 @@ mod tests { async fn setup_asb( rendezvous_peer_id: PeerId, - rendezvous_address: Multiaddr, + rendezvous_address: &Multiaddr, namespace: XmrBtcNamespace, ) -> Seller { let static_quote = BidQuote { @@ -81,18 +80,18 @@ mod tests { max_quantity: bitcoin::Amount::from_sat(9001), }; - let mut asb = new_swarm(|_, identity| StaticQuoteAsbBehaviour { - rendezvous: asb::rendezous::Behaviour::new( - identity, - rendezvous_peer_id, - rendezvous_address, - namespace, - None, - ), - ping: Default::default(), - quote: quote::asb(), - static_quote, - registered: false, + let mut asb = new_swarm(|_, identity| { + let rendezvous_node = + RendezvousNode::new(rendezvous_address, rendezvous_peer_id, namespace, None); + let rendezvous = asb::rendezvous::Behaviour::new(identity, vec![rendezvous_node]); + + StaticQuoteAsbBehaviour { + rendezvous, + ping: Default::default(), + quote: quote::asb(), + static_quote, + registered: false, + } }); let asb_address = asb.listen_on_tcp_localhost().await; @@ -121,7 +120,7 @@ mod tests { #[derive(libp2p::NetworkBehaviour)] #[behaviour(event_process = true)] struct StaticQuoteAsbBehaviour { - rendezvous: asb::rendezous::Behaviour, + rendezvous: asb::rendezvous::Behaviour, // Support `Ping` as a workaround until https://github.com/libp2p/rust-libp2p/issues/2109 is fixed. ping: libp2p::ping::Ping, quote: quote::Behaviour, diff --git a/swap/src/network/swarm.rs b/swap/src/network/swarm.rs index 21bbfc4d..37bb0a5e 100644 --- a/swap/src/network/swarm.rs +++ b/swap/src/network/swarm.rs @@ -1,9 +1,9 @@ -use crate::asb::LatestRate; +use crate::asb::{LatestRate, RendezvousNode}; use crate::libp2p_ext::MultiAddrExt; use crate::network::rendezvous::XmrBtcNamespace; use crate::seed::Seed; use crate::{asb, bitcoin, cli, env, tor}; -use anyhow::{Context, Result}; +use anyhow::Result; use libp2p::swarm::{NetworkBehaviour, SwarmBuilder}; use libp2p::{identity, Multiaddr, Swarm}; use std::fmt::Debug; @@ -17,22 +17,23 @@ pub fn asb( resume_only: bool, env_config: env::Config, namespace: XmrBtcNamespace, - rendezvous_point: Option, + rendezvous_addrs: &[Multiaddr], ) -> Result>> where LR: LatestRate + Send + 'static + Debug + Clone, { let identity = seed.derive_libp2p_identity(); - let rendezvous_params = if let Some(address) = rendezvous_point { - let peer_id = address - .extract_peer_id() - .context("Rendezvous node address must contain peer ID")?; + let rendezvous_nodes = rendezvous_addrs + .iter() + .map(|addr| { + let peer_id = addr + .extract_peer_id() + .expect("Rendezvous node address must contain peer ID"); - Some((identity.clone(), peer_id, address, namespace)) - } else { - None - }; + RendezvousNode::new(addr, peer_id, namespace, None) + }) + .collect(); let behaviour = asb::Behaviour::new( min_buy, @@ -41,7 +42,7 @@ where resume_only, env_config, (identity.clone(), namespace), - rendezvous_params, + rendezvous_nodes, ); let transport = asb::transport::new(&identity)?; diff --git a/swap/tests/harness/mod.rs b/swap/tests/harness/mod.rs index 0c896055..4f3f5fee 100644 --- a/swap/tests/harness/mod.rs +++ b/swap/tests/harness/mod.rs @@ -248,7 +248,7 @@ async fn start_alice( resume_only, env_config, XmrBtcNamespace::Testnet, - None, + &[], ) .unwrap(); swarm.listen_on(listen_address).unwrap();