From 654cfff2a87248cc14ed8d820f984c1cf849fb64 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 1 Apr 2021 12:09:51 +1100 Subject: [PATCH] Make `kraken` module emit `PriceUpdate`s instead of `Rate`s --- swap/src/bin/asb.rs | 4 ++-- swap/src/bin/kraken_ticker.rs | 4 ++-- swap/src/kraken.rs | 31 +++++++++++++-------------- swap/src/protocol/alice/event_loop.rs | 7 ++++-- 4 files changed, 24 insertions(+), 22 deletions(-) diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index 637e659f..6694e35d 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -92,7 +92,7 @@ async fn main() -> Result<()> { info!("Monero balance: {}", monero_balance); } - let kraken_rate_updates = kraken::connect()?; + let kraken_price_updates = kraken::connect()?; let mut swarm = swarm::new::(&seed)?; Swarm::listen_on(&mut swarm, config.network.listen) @@ -104,7 +104,7 @@ async fn main() -> Result<()> { Arc::new(bitcoin_wallet), Arc::new(monero_wallet), Arc::new(db), - kraken_rate_updates, + kraken_price_updates, max_buy, ) .unwrap(); diff --git a/swap/src/bin/kraken_ticker.rs b/swap/src/bin/kraken_ticker.rs index 9c0d3fdd..a28bac50 100644 --- a/swap/src/bin/kraken_ticker.rs +++ b/swap/src/bin/kraken_ticker.rs @@ -9,8 +9,8 @@ async fn main() -> Result<()> { let mut ticker = swap::kraken::connect().context("Failed to connect to kraken")?; loop { - match ticker.wait_for_update().await? { - Ok(rate) => println!("Rate update: {}", rate), + match ticker.wait_for_next_update().await? { + Ok(update) => println!("Price update: {}", update.ask), Err(e) => println!("Error: {:#}", e), } } diff --git a/swap/src/kraken.rs b/swap/src/kraken.rs index 7aee6c5c..57e2861c 100644 --- a/swap/src/kraken.rs +++ b/swap/src/kraken.rs @@ -1,4 +1,3 @@ -use crate::asb::Rate; use anyhow::{anyhow, Context, Result}; use futures::{SinkExt, StreamExt, TryStreamExt}; use serde::Deserialize; @@ -10,9 +9,9 @@ use tokio::sync::watch; /// Connect to Kraken websocket API for a constant stream of rate updates. /// /// If the connection fails, it will automatically be re-established. -pub fn connect() -> Result { - let (rate_update, rate_update_receiver) = watch::channel(Err(Error::NotYetAvailable)); - let rate_update = Arc::new(rate_update); +pub fn connect() -> Result { + let (price_update, price_update_receiver) = watch::channel(Err(Error::NotYetAvailable)); + let price_update = Arc::new(price_update); tokio::spawn(async move { // The default backoff config is fine for us apart from one thing: @@ -26,12 +25,12 @@ pub fn connect() -> Result { let result = backoff::future::retry_notify::( backoff, || { - let rate_update = rate_update.clone(); + let price_update = price_update.clone(); async move { let mut stream = connection::new().await?; while let Some(update) = stream.try_next().await.map_err(to_backoff)? { - let send_result = rate_update.send(Ok(Rate::new(update.ask))); + let send_result = price_update.send(Ok(update)); if send_result.is_err() { return Err(backoff::Error::Permanent(anyhow!( @@ -54,30 +53,30 @@ pub fn connect() -> Result { tracing::warn!("Rate updates incurred an unrecoverable error: {:#}", e); // in case the retries fail permanently, let the subscribers know - rate_update.send(Err(Error::PermanentFailure)) + price_update.send(Err(Error::PermanentFailure)) } Ok(never) => match never {}, } }); - Ok(RateUpdateStream { - inner: rate_update_receiver, + Ok(PriceUpdates { + inner: price_update_receiver, }) } #[derive(Clone, Debug)] -pub struct RateUpdateStream { - inner: watch::Receiver, +pub struct PriceUpdates { + inner: watch::Receiver, } -impl RateUpdateStream { - pub async fn wait_for_update(&mut self) -> Result { +impl PriceUpdates { + pub async fn wait_for_next_update(&mut self) -> Result { self.inner.changed().await?; Ok(self.inner.borrow().clone()) } - pub fn latest_update(&mut self) -> RateUpdate { + pub fn latest_update(&mut self) -> PriceUpdate { self.inner.borrow().clone() } } @@ -90,7 +89,7 @@ pub enum Error { PermanentFailure, } -type RateUpdate = Result; +type PriceUpdate = Result; /// Maps a [`connection::Error`] to a backoff error, effectively defining our /// retry strategy. @@ -246,7 +245,7 @@ mod wire { } /// Represents an update within the price ticker. - #[derive(Debug, Deserialize)] + #[derive(Clone, Debug, Deserialize)] #[serde(try_from = "TickerUpdate")] pub struct PriceUpdate { pub ask: bitcoin::Amount, diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 5179fb6f..f48fd080 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -354,11 +354,14 @@ impl LatestRate for FixedRate { } } -impl LatestRate for kraken::RateUpdateStream { +impl LatestRate for kraken::PriceUpdates { type Error = kraken::Error; fn latest_rate(&mut self) -> Result { - self.latest_update() + let update = self.latest_update()?; + let rate = Rate::new(update.ask); + + Ok(rate) } }