diff --git a/swap/src/asb.rs b/swap/src/asb.rs index 78ac0dde..f755822e 100644 --- a/swap/src/asb.rs +++ b/swap/src/asb.rs @@ -1,13 +1,7 @@ -mod amounts; pub mod command; pub mod config; -pub mod fixed_rate; -pub mod kraken; +mod fixed_rate; +mod rate; -pub use amounts::Rate; - -pub trait LatestRate { - type Error: std::error::Error + Send + Sync + 'static; - - fn latest_rate(&mut self) -> Result; -} +pub use self::fixed_rate::FixedRate; +pub use self::rate::Rate; diff --git a/swap/src/asb/fixed_rate.rs b/swap/src/asb/fixed_rate.rs index 1bf56364..b78da380 100644 --- a/swap/src/asb/fixed_rate.rs +++ b/swap/src/asb/fixed_rate.rs @@ -1,23 +1,20 @@ -use crate::asb::{LatestRate, Rate}; -use std::convert::Infallible; +use crate::asb::Rate; -pub const RATE: f64 = 0.01; +#[derive(Clone, Debug)] +pub struct FixedRate(Rate); -#[derive(Clone)] -pub struct RateService(Rate); +impl FixedRate { + pub const RATE: f64 = 0.01; -impl LatestRate for RateService { - type Error = Infallible; - - fn latest_rate(&mut self) -> Result { - Ok(self.0) + pub fn value(&self) -> Rate { + self.0 } } -impl Default for RateService { +impl Default for FixedRate { fn default() -> Self { Self(Rate { - ask: bitcoin::Amount::from_btc(RATE).expect("Static value should never fail"), + ask: bitcoin::Amount::from_btc(Self::RATE).expect("Static value should never fail"), }) } } diff --git a/swap/src/asb/kraken.rs b/swap/src/asb/kraken.rs deleted file mode 100644 index f6188dcb..00000000 --- a/swap/src/asb/kraken.rs +++ /dev/null @@ -1,203 +0,0 @@ -use crate::asb::{LatestRate, Rate}; -use anyhow::Result; -use bitcoin::util::amount::ParseAmountError; -use futures::{SinkExt, StreamExt}; -use reqwest::Url; -use serde::{Deserialize, Serialize}; -use serde_json::Value; -use std::convert::TryFrom; -use tokio::sync::watch; -use tokio_tungstenite::tungstenite::protocol::CloseFrame; -use tokio_tungstenite::tungstenite::Message; -use tracing::{error, trace}; -use watch::Receiver; - -const KRAKEN_WS_URL: &str = "wss://ws.kraken.com"; -const SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD: &str = r#" -{ "event": "subscribe", - "pair": [ "XMR/XBT" ], - "subscription": { - "name": "ticker" - } -}"#; - -#[derive(Clone)] -pub struct RateService { - receiver: Receiver>, -} - -impl LatestRate for RateService { - type Error = Error; - - fn latest_rate(&mut self) -> Result { - (*self.receiver.borrow()).clone() - } -} - -#[derive(Clone, Debug, thiserror::Error)] -pub enum Error { - #[error("Rate has not yet been retrieved from Kraken websocket API")] - NotYetRetrieved, - #[error("Received close message from Kraken")] - CloseMessage, - #[error("Websocket: ")] - WebSocket(String), - #[error("Serde: ")] - Serde(String), - #[error("Data field is missing")] - DataFieldMissing, - #[error("Ask Rate Element is of unexpected type")] - UnexpectedAskRateElementType, - #[error("Ask Rate Element is missing")] - MissingAskRateElementType, - #[error("Bitcoin amount parse error: ")] - BitcoinParseAmount(#[from] ParseAmountError), -} - -impl From for Error { - fn from(err: tokio_tungstenite::tungstenite::Error) -> Self { - Error::WebSocket(format!("{:#}", err)) - } -} - -impl From for Error { - fn from(err: serde_json::Error) -> Self { - Error::Serde(format!("{:#}", err)) - } -} - -impl RateService { - pub async fn new() -> Result { - let (rate_update, rate_update_receiver) = watch::channel(Err(Error::NotYetRetrieved)); - - let (rate_stream, _response) = - tokio_tungstenite::connect_async(Url::parse(KRAKEN_WS_URL).expect("valid url")).await?; - - let (mut rate_stream_sink, mut rate_stream) = rate_stream.split(); - - tokio::spawn(async move { - while let Some(msg) = rate_stream.next().await { - let msg = match msg { - Ok(Message::Text(msg)) => msg, - Ok(Message::Close(close_frame)) => { - if let Some(CloseFrame { code, reason }) = close_frame { - error!( - "Kraken rate stream was closed with code {} and reason: {}", - code, reason - ); - } else { - error!("Kraken rate stream was closed without code and reason"); - } - let _ = rate_update.send(Err(Error::CloseMessage)); - continue; - } - Ok(msg) => { - trace!( - "Kraken rate stream returned non text message that will be ignored: {}", - msg - ); - continue; - } - Err(e) => { - error!("Error when reading from Kraken rate stream: {}", e); - let _ = rate_update.send(Err(e.into())); - continue; - } - }; - - // If we encounter a heartbeat we skip it and iterate again - if msg.eq(r#"{"event":"heartbeat"}"#) { - continue; - } - - let ticker = match serde_json::from_str::(&msg) { - Ok(ticker) => ticker, - Err(e) => { - let _ = rate_update.send(Err(e.into())); - continue; - } - }; - - let rate = match Rate::try_from(ticker) { - Ok(rate) => rate, - Err(e) => { - let _ = rate_update.send(Err(e)); - continue; - } - }; - - let _ = rate_update.send(Ok(rate)); - } - }); - - rate_stream_sink - .send(SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD.into()) - .await?; - - Ok(Self { - receiver: rate_update_receiver, - }) - } -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(transparent)] -struct TickerUpdate(Vec); - -#[derive(Debug, Serialize, Deserialize)] -#[serde(untagged)] -enum TickerField { - Data(TickerData), - Metadata(Value), -} - -#[derive(Debug, Serialize, Deserialize)] -struct TickerData { - #[serde(rename = "a")] - ask: Vec, - #[serde(rename = "b")] - bid: Vec, -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(untagged)] -enum RateElement { - Text(String), - Number(u64), -} - -impl TryFrom for Rate { - type Error = Error; - - fn try_from(value: TickerUpdate) -> Result { - let data = value - .0 - .iter() - .find_map(|field| match field { - TickerField::Data(data) => Some(data), - TickerField::Metadata(_) => None, - }) - .ok_or(Error::DataFieldMissing)?; - let ask = data.ask.first().ok_or(Error::MissingAskRateElementType)?; - let ask = match ask { - RateElement::Text(ask) => { - bitcoin::Amount::from_str_in(ask, ::bitcoin::Denomination::Bitcoin)? - } - _ => return Err(Error::UnexpectedAskRateElementType), - }; - - Ok(Self { ask }) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[tokio::test] - async fn deserialize_ticker_update() { - let sample_response = r#"[980,{"a":["0.00521900",4,"4.84775132"],"b":["0.00520600",70,"70.35668921"],"c":["0.00520700","0.00000186"],"v":["18530.40510860","18531.94887860"],"p":["0.00489493","0.00489490"],"t":[5017,5018],"l":["0.00448300","0.00448300"],"h":["0.00525000","0.00525000"],"o":["0.00450000","0.00451000"]},"ticker","XMR/XBT"]"#; - - let _ = serde_json::from_str::(sample_response).unwrap(); - } -} diff --git a/swap/src/asb/amounts.rs b/swap/src/asb/rate.rs similarity index 100% rename from swap/src/asb/amounts.rs rename to swap/src/asb/rate.rs diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index 8ffd3a93..94f144eb 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -23,7 +23,6 @@ use swap::asb::command::{Arguments, Command}; use swap::asb::config::{ initial_setup, query_user_for_initial_testnet_config, read_config, Config, ConfigNotInitialized, }; -use swap::asb::kraken; use swap::database::Database; use swap::execution_params::GetExecutionParams; use swap::fs::default_config_path; @@ -31,7 +30,7 @@ use swap::monero::Amount; use swap::protocol::alice::EventLoop; use swap::seed::Seed; use swap::trace::init_tracing; -use swap::{bitcoin, execution_params, monero}; +use swap::{bitcoin, execution_params, kraken, monero}; use tracing::{info, warn}; use tracing_subscriber::filter::LevelFilter; @@ -93,7 +92,7 @@ async fn main() -> Result<()> { bitcoin_wallet.new_address().await? ); - let rate_service = kraken::RateService::new().await?; + let kraken_rate_updates = kraken::connect().await?; let (event_loop, _) = EventLoop::new( config.network.listen, @@ -102,7 +101,7 @@ async fn main() -> Result<()> { Arc::new(bitcoin_wallet), Arc::new(monero_wallet), Arc::new(db), - rate_service, + kraken_rate_updates, max_buy, ) .unwrap(); diff --git a/swap/src/bin/kraken_ticker.rs b/swap/src/bin/kraken_ticker.rs new file mode 100644 index 00000000..2dca382a --- /dev/null +++ b/swap/src/bin/kraken_ticker.rs @@ -0,0 +1,19 @@ +use anyhow::{Context, Result}; + +#[tokio::main] +async fn main() -> Result<()> { + tracing::subscriber::set_global_default( + tracing_subscriber::fmt().with_env_filter("trace").finish(), + )?; + + let mut ticker = swap::kraken::connect() + .await + .context("Failed to connect to kraken")?; + + loop { + match ticker.wait_for_update().await? { + Ok(rate) => println!("Rate update: {}", rate), + Err(e) => println!("Error: {:#}", e), + } + } +} diff --git a/swap/src/kraken.rs b/swap/src/kraken.rs new file mode 100644 index 00000000..e15c2f9f --- /dev/null +++ b/swap/src/kraken.rs @@ -0,0 +1,237 @@ +use crate::asb::Rate; +use anyhow::Result; +use bitcoin::util::amount::ParseAmountError; +use futures::{SinkExt, StreamExt}; +use reqwest::Url; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::convert::TryFrom; +use tokio::sync::watch; +use tokio_tungstenite::tungstenite; +use tracing::{error, trace}; + +pub async fn connect() -> Result { + let (rate_update, rate_update_receiver) = watch::channel(Err(Error::NotYetRetrieved)); + + let (rate_stream, _response) = + tokio_tungstenite::connect_async(Url::parse(KRAKEN_WS_URL).expect("valid url")).await?; + + let (mut rate_stream_sink, mut rate_stream) = rate_stream.split(); + + tokio::spawn(async move { + while let Some(msg) = rate_stream.next().await { + let msg = match msg { + Ok(tungstenite::Message::Text(msg)) => msg, + Ok(tungstenite::Message::Close(close_frame)) => { + if let Some(tungstenite::protocol::CloseFrame { code, reason }) = close_frame { + error!( + "Kraken rate stream was closed with code {} and reason: {}", + code, reason + ); + } else { + error!("Kraken rate stream was closed without code and reason"); + } + let _ = rate_update.send(Err(Error::ConnectionClosed)); + continue; + } + Ok(msg) => { + trace!( + "Kraken rate stream returned non text message that will be ignored: {}", + msg + ); + continue; + } + Err(e) => { + error!(%e, "Error when reading from Kraken rate stream"); + let _ = rate_update.send(Err(e.into())); + continue; + } + }; + + let update = match serde_json::from_str::(&msg) { + Ok(Event::SystemStatus) => { + tracing::debug!("Connected to Kraken websocket API"); + continue; + } + Ok(Event::SubscriptionStatus) => { + tracing::debug!("Subscribed to updates for ticker"); + continue; + } + Ok(Event::Heartbeat) => { + tracing::trace!("Received heartbeat message"); + continue; + } + // if the message is not an event, it is a ticker update or an unknown event + Err(_) => match serde_json::from_str::(&msg) { + Ok(ticker) => ticker, + Err(e) => { + tracing::warn!(%e, "Failed to deserialize message '{}' as ticker update", msg); + let _ = rate_update.send(Err(Error::UnknownMessage { msg })); + continue; + } + }, + }; + + let rate = match Rate::try_from(update) { + Ok(rate) => rate, + Err(e) => { + let _ = rate_update.send(Err(e)); + continue; + } + }; + + let _ = rate_update.send(Ok(rate)); + } + }); + + rate_stream_sink + .send(SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD.into()) + .await?; + + Ok(RateUpdateStream { + inner: rate_update_receiver, + }) +} + +#[derive(Clone, Debug)] +pub struct RateUpdateStream { + inner: watch::Receiver>, +} + +impl RateUpdateStream { + pub async fn wait_for_update(&mut self) -> Result> { + self.inner.changed().await?; + + Ok(self.inner.borrow().clone()) + } + + pub fn latest_update(&mut self) -> Result { + self.inner.borrow().clone() + } +} + +const KRAKEN_WS_URL: &str = "wss://ws.kraken.com"; +const SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD: &str = r#" +{ "event": "subscribe", + "pair": [ "XMR/XBT" ], + "subscription": { + "name": "ticker" + } +}"#; + +#[derive(Clone, Debug, thiserror::Error)] +pub enum Error { + #[error("Rate has not yet been retrieved from Kraken websocket API")] + NotYetRetrieved, + #[error("The Kraken server closed the websocket connection")] + ConnectionClosed, + #[error("Websocket: {0}")] + WebSocket(String), + #[error("Received unknown message from Kraken: {msg}")] + UnknownMessage { msg: String }, + #[error("Data field is missing")] + DataFieldMissing, + #[error("Ask Rate Element is of unexpected type")] + UnexpectedAskRateElementType, + #[error("Ask Rate Element is missing")] + MissingAskRateElementType, + #[error("Bitcoin amount parse error: ")] + BitcoinParseAmount(#[from] ParseAmountError), +} + +impl From for Error { + fn from(err: tungstenite::Error) -> Self { + Error::WebSocket(format!("{:#}", err)) + } +} + +#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[serde(tag = "event")] +enum Event { + #[serde(rename = "systemStatus")] + SystemStatus, + #[serde(rename = "heartbeat")] + Heartbeat, + #[serde(rename = "subscriptionStatus")] + SubscriptionStatus, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(transparent)] +struct TickerUpdate(Vec); + +#[derive(Debug, Serialize, Deserialize)] +#[serde(untagged)] +enum TickerField { + Data(TickerData), + Metadata(Value), +} + +#[derive(Debug, Serialize, Deserialize)] +struct TickerData { + #[serde(rename = "a")] + ask: Vec, + #[serde(rename = "b")] + bid: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(untagged)] +enum RateElement { + Text(String), + Number(u64), +} + +impl TryFrom for Rate { + type Error = Error; + + fn try_from(value: TickerUpdate) -> Result { + let data = value + .0 + .iter() + .find_map(|field| match field { + TickerField::Data(data) => Some(data), + TickerField::Metadata(_) => None, + }) + .ok_or(Error::DataFieldMissing)?; + let ask = data.ask.first().ok_or(Error::MissingAskRateElementType)?; + let ask = match ask { + RateElement::Text(ask) => { + bitcoin::Amount::from_str_in(ask, ::bitcoin::Denomination::Bitcoin)? + } + _ => return Err(Error::UnexpectedAskRateElementType), + }; + + Ok(Self { ask }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn can_deserialize_system_status_event() { + let event = r#"{"connectionID":14859574189081089471,"event":"systemStatus","status":"online","version":"1.8.1"}"#; + + let event = serde_json::from_str::(event).unwrap(); + + assert_eq!(event, Event::SystemStatus) + } + + #[test] + fn can_deserialize_subscription_status_event() { + let event = r#"{"channelID":980,"channelName":"ticker","event":"subscriptionStatus","pair":"XMR/XBT","status":"subscribed","subscription":{"name":"ticker"}}"#; + + let event = serde_json::from_str::(event).unwrap(); + + assert_eq!(event, Event::SubscriptionStatus) + } + + #[test] + fn deserialize_ticker_update() { + let message = r#"[980,{"a":["0.00440700",7,"7.35318535"],"b":["0.00440200",7,"7.57416678"],"c":["0.00440700","0.22579000"],"v":["273.75489000","4049.91233351"],"p":["0.00446205","0.00441699"],"t":[123,1310],"l":["0.00439400","0.00429900"],"h":["0.00450000","0.00450000"],"o":["0.00449100","0.00433700"]},"ticker","XMR/XBT"]"#; + + let _ = serde_json::from_str::(message).unwrap(); + } +} diff --git a/swap/src/lib.rs b/swap/src/lib.rs index f484da3b..b769dc96 100644 --- a/swap/src/lib.rs +++ b/swap/src/lib.rs @@ -22,6 +22,7 @@ pub mod cli; pub mod database; pub mod execution_params; pub mod fs; +pub mod kraken; pub mod monero; pub mod network; pub mod protocol; diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 7df924ff..ad8b7c1f 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -1,4 +1,4 @@ -use crate::asb::LatestRate; +use crate::asb::{FixedRate, Rate}; use crate::database::Database; use crate::execution_params::ExecutionParams; use crate::monero::BalanceTooLow; @@ -8,70 +8,20 @@ use crate::protocol::alice; use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State3, Swap, TransferProof}; use crate::protocol::bob::EncryptedSignature; use crate::seed::Seed; -use crate::{bitcoin, monero}; +use crate::{bitcoin, kraken, monero}; use anyhow::{bail, Context, Result}; use futures::future::RemoteHandle; use libp2p::core::Multiaddr; use libp2p::futures::FutureExt; use libp2p::{PeerId, Swarm}; use rand::rngs::OsRng; +use std::convert::Infallible; use std::sync::Arc; use tokio::sync::mpsc::error::SendError; use tokio::sync::{broadcast, mpsc}; use tracing::{debug, error, trace}; use uuid::Uuid; -#[allow(missing_debug_implementations)] -pub struct MpscChannels { - sender: mpsc::Sender, - receiver: mpsc::Receiver, -} - -impl Default for MpscChannels { - fn default() -> Self { - let (sender, receiver) = mpsc::channel(100); - MpscChannels { sender, receiver } - } -} - -#[allow(missing_debug_implementations)] -pub struct BroadcastChannels -where - T: Clone, -{ - sender: broadcast::Sender, -} - -impl Default for BroadcastChannels -where - T: Clone, -{ - fn default() -> Self { - let (sender, _receiver) = broadcast::channel(100); - BroadcastChannels { sender } - } -} - -#[derive(Debug)] -pub struct EventLoopHandle { - recv_encrypted_signature: broadcast::Receiver, - send_transfer_proof: mpsc::Sender<(PeerId, TransferProof)>, -} - -impl EventLoopHandle { - pub async fn recv_encrypted_signature(&mut self) -> Result { - self.recv_encrypted_signature - .recv() - .await - .context("Failed to receive Bitcoin encrypted signature from Bob") - } - pub async fn send_transfer_proof(&mut self, bob: PeerId, msg: TransferProof) -> Result<()> { - let _ = self.send_transfer_proof.send((bob, msg)).await?; - - Ok(()) - } -} - #[allow(missing_debug_implementations)] pub struct EventLoop { swarm: libp2p::Swarm, @@ -80,7 +30,7 @@ pub struct EventLoop { bitcoin_wallet: Arc, monero_wallet: Arc, db: Arc, - rate_service: RS, + latest_rate: RS, max_buy: bitcoin::Amount, recv_encrypted_signature: broadcast::Sender, @@ -92,9 +42,15 @@ pub struct EventLoop { swap_handle_sender: mpsc::Sender>>, } -impl EventLoop +#[derive(Debug)] +pub struct EventLoopHandle { + recv_encrypted_signature: broadcast::Receiver, + send_transfer_proof: mpsc::Sender<(PeerId, TransferProof)>, +} + +impl EventLoop where - RS: LatestRate, + LR: LatestRate, { #[allow(clippy::too_many_arguments)] pub fn new( @@ -104,7 +60,7 @@ where bitcoin_wallet: Arc, monero_wallet: Arc, db: Arc, - rate_service: RS, + latest_rate: LR, max_buy: bitcoin::Amount, ) -> Result<(Self, mpsc::Receiver>>)> { let identity = seed.derive_libp2p_identity(); @@ -132,7 +88,7 @@ where bitcoin_wallet, monero_wallet, db, - rate_service, + latest_rate, recv_encrypted_signature: recv_encrypted_signature.sender, send_transfer_proof: send_transfer_proof.receiver, send_transfer_proof_sender: send_transfer_proof.sender, @@ -239,7 +195,7 @@ where monero_wallet: Arc, ) -> Result { let rate = self - .rate_service + .latest_rate .latest_rate() .context("Failed to get latest rate")?; @@ -265,7 +221,7 @@ where async fn make_quote(&mut self, max_buy: bitcoin::Amount) -> Result { let rate = self - .rate_service + .latest_rate .latest_rate() .context("Failed to get latest rate")?; @@ -313,9 +269,76 @@ where } } +pub trait LatestRate { + type Error: std::error::Error + Send + Sync + 'static; + + fn latest_rate(&mut self) -> Result; +} + +impl LatestRate for FixedRate { + type Error = Infallible; + + fn latest_rate(&mut self) -> Result { + Ok(self.value()) + } +} + +impl LatestRate for kraken::RateUpdateStream { + type Error = kraken::Error; + + fn latest_rate(&mut self) -> Result { + self.latest_update() + } +} + +impl EventLoopHandle { + pub async fn recv_encrypted_signature(&mut self) -> Result { + self.recv_encrypted_signature + .recv() + .await + .context("Failed to receive Bitcoin encrypted signature from Bob") + } + pub async fn send_transfer_proof(&mut self, bob: PeerId, msg: TransferProof) -> Result<()> { + let _ = self.send_transfer_proof.send((bob, msg)).await?; + + Ok(()) + } +} + #[derive(Debug, Clone, Copy, thiserror::Error)] #[error("Refusing to buy {actual} because the maximum configured limit is {max}")] pub struct MaximumBuyAmountExceeded { pub max: bitcoin::Amount, pub actual: bitcoin::Amount, } + +#[allow(missing_debug_implementations)] +struct MpscChannels { + sender: mpsc::Sender, + receiver: mpsc::Receiver, +} + +impl Default for MpscChannels { + fn default() -> Self { + let (sender, receiver) = mpsc::channel(100); + MpscChannels { sender, receiver } + } +} + +#[allow(missing_debug_implementations)] +struct BroadcastChannels +where + T: Clone, +{ + sender: broadcast::Sender, +} + +impl Default for BroadcastChannels +where + T: Clone, +{ + fn default() -> Self { + let (sender, _receiver) = broadcast::channel(100); + BroadcastChannels { sender } + } +} diff --git a/swap/tests/testutils/mod.rs b/swap/tests/testutils/mod.rs index ad16dbb8..c1b3d3bc 100644 --- a/swap/tests/testutils/mod.rs +++ b/swap/tests/testutils/mod.rs @@ -14,8 +14,7 @@ use std::convert::Infallible; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; -use swap::asb::fixed_rate; -use swap::asb::fixed_rate::RATE; +use swap::asb::FixedRate; use swap::bitcoin::{CancelTimelock, PunishTimelock}; use swap::database::Database; use swap::execution_params::{ExecutionParams, GetExecutionParams}; @@ -344,7 +343,7 @@ where let (monero, containers) = testutils::init_containers(&cli).await; let btc_amount = bitcoin::Amount::from_sat(1_000_000); - let xmr_amount = monero::Amount::from_monero(btc_amount.as_btc() / RATE).unwrap(); + let xmr_amount = monero::Amount::from_monero(btc_amount.as_btc() / FixedRate::RATE).unwrap(); let alice_starting_balances = StartingBalances { xmr: xmr_amount * 10, @@ -410,7 +409,7 @@ where alice_bitcoin_wallet.clone(), alice_monero_wallet.clone(), alice_db, - fixed_rate::RateService::default(), + FixedRate::default(), bitcoin::Amount::ONE_BTC, ) .unwrap();