diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index df0b7676..a40e46f9 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -92,7 +92,7 @@ async fn main() -> Result<()> { bitcoin_wallet.new_address().await? ); - let kraken_rate_updates = kraken::connect().await?; + let kraken_rate_updates = kraken::connect()?; let (event_loop, _) = EventLoop::new( config.network.listen, diff --git a/swap/src/bin/kraken_ticker.rs b/swap/src/bin/kraken_ticker.rs index 2dca382a..9c0d3fdd 100644 --- a/swap/src/bin/kraken_ticker.rs +++ b/swap/src/bin/kraken_ticker.rs @@ -3,12 +3,10 @@ use anyhow::{Context, Result}; #[tokio::main] async fn main() -> Result<()> { tracing::subscriber::set_global_default( - tracing_subscriber::fmt().with_env_filter("trace").finish(), + tracing_subscriber::fmt().with_env_filter("debug").finish(), )?; - let mut ticker = swap::kraken::connect() - .await - .context("Failed to connect to kraken")?; + let mut ticker = swap::kraken::connect().context("Failed to connect to kraken")?; loop { match ticker.wait_for_update().await? { diff --git a/swap/src/kraken.rs b/swap/src/kraken.rs index e15c2f9f..69dc4bb5 100644 --- a/swap/src/kraken.rs +++ b/swap/src/kraken.rs @@ -1,93 +1,57 @@ use crate::asb::Rate; -use anyhow::Result; -use bitcoin::util::amount::ParseAmountError; -use futures::{SinkExt, StreamExt}; -use reqwest::Url; +use anyhow::{anyhow, Context, Result}; +use futures::{SinkExt, StreamExt, TryStreamExt}; use serde::{Deserialize, Serialize}; -use serde_json::Value; -use std::convert::TryFrom; +use std::convert::{Infallible, TryFrom}; +use std::sync::Arc; +use std::time::Duration; use tokio::sync::watch; -use tokio_tungstenite::tungstenite; -use tracing::{error, trace}; -pub async fn connect() -> Result { - let (rate_update, rate_update_receiver) = watch::channel(Err(Error::NotYetRetrieved)); - - let (rate_stream, _response) = - tokio_tungstenite::connect_async(Url::parse(KRAKEN_WS_URL).expect("valid url")).await?; - - let (mut rate_stream_sink, mut rate_stream) = rate_stream.split(); +/// Connect to Kraken websocket API for a constant stream of rate updates. +/// +/// If the connection fails, it will automatically be re-established. +pub fn connect() -> Result { + let (rate_update, rate_update_receiver) = watch::channel(Err(Error::NotYetAvailable)); + let rate_update = Arc::new(rate_update); tokio::spawn(async move { - while let Some(msg) = rate_stream.next().await { - let msg = match msg { - Ok(tungstenite::Message::Text(msg)) => msg, - Ok(tungstenite::Message::Close(close_frame)) => { - if let Some(tungstenite::protocol::CloseFrame { code, reason }) = close_frame { - error!( - "Kraken rate stream was closed with code {} and reason: {}", - code, reason - ); - } else { - error!("Kraken rate stream was closed without code and reason"); + let result = backoff::future::retry_notify::( + backoff::ExponentialBackoff::default(), + || { + let rate_update = rate_update.clone(); + async move { + let mut stream = connection::new().await?; + + while let Some(update) = stream.try_next().await.map_err(to_backoff)? { + let send_result = rate_update.send(Ok(update)); + + if send_result.is_err() { + return Err(backoff::Error::Permanent(anyhow!( + "receiver disconnected" + ))); + } } - let _ = rate_update.send(Err(Error::ConnectionClosed)); - continue; - } - Ok(msg) => { - trace!( - "Kraken rate stream returned non text message that will be ignored: {}", - msg - ); - continue; - } - Err(e) => { - error!(%e, "Error when reading from Kraken rate stream"); - let _ = rate_update.send(Err(e.into())); - continue; - } - }; - let update = match serde_json::from_str::(&msg) { - Ok(Event::SystemStatus) => { - tracing::debug!("Connected to Kraken websocket API"); - continue; - } - Ok(Event::SubscriptionStatus) => { - tracing::debug!("Subscribed to updates for ticker"); - continue; + Err(backoff::Error::Transient(anyhow!("stream ended"))) } - Ok(Event::Heartbeat) => { - tracing::trace!("Received heartbeat message"); - continue; - } - // if the message is not an event, it is a ticker update or an unknown event - Err(_) => match serde_json::from_str::(&msg) { - Ok(ticker) => ticker, - Err(e) => { - tracing::warn!(%e, "Failed to deserialize message '{}' as ticker update", msg); - let _ = rate_update.send(Err(Error::UnknownMessage { msg })); - continue; - } - }, - }; + }, + |error, next: Duration| { + tracing::info!(%error, "Kraken websocket connection failed, retrying in {}ms", next.as_millis()); + } + ) + .await; - let rate = match Rate::try_from(update) { - Ok(rate) => rate, - Err(e) => { - let _ = rate_update.send(Err(e)); - continue; - } - }; + match result { + Err(e) => { + tracing::warn!("Rate updates incurred an unrecoverable error: {:#}", e); - let _ = rate_update.send(Ok(rate)); + // in case the retries fail permanently, let the subscribers know + rate_update.send(Err(Error::PermanentFailure)) + } + Ok(never) => match never {}, } }); - rate_stream_sink - .send(SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD.into()) - .await?; - Ok(RateUpdateStream { inner: rate_update_receiver, }) @@ -95,143 +59,263 @@ pub async fn connect() -> Result { #[derive(Clone, Debug)] pub struct RateUpdateStream { - inner: watch::Receiver>, + inner: watch::Receiver, } impl RateUpdateStream { - pub async fn wait_for_update(&mut self) -> Result> { + pub async fn wait_for_update(&mut self) -> Result { self.inner.changed().await?; Ok(self.inner.borrow().clone()) } - pub fn latest_update(&mut self) -> Result { + pub fn latest_update(&mut self) -> RateUpdate { self.inner.borrow().clone() } } -const KRAKEN_WS_URL: &str = "wss://ws.kraken.com"; -const SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD: &str = r#" -{ "event": "subscribe", - "pair": [ "XMR/XBT" ], - "subscription": { - "name": "ticker" - } -}"#; - #[derive(Clone, Debug, thiserror::Error)] pub enum Error { - #[error("Rate has not yet been retrieved from Kraken websocket API")] - NotYetRetrieved, - #[error("The Kraken server closed the websocket connection")] - ConnectionClosed, - #[error("Websocket: {0}")] - WebSocket(String), - #[error("Received unknown message from Kraken: {msg}")] - UnknownMessage { msg: String }, - #[error("Data field is missing")] - DataFieldMissing, - #[error("Ask Rate Element is of unexpected type")] - UnexpectedAskRateElementType, - #[error("Ask Rate Element is missing")] - MissingAskRateElementType, - #[error("Bitcoin amount parse error: ")] - BitcoinParseAmount(#[from] ParseAmountError), + #[error("Rate is not yet available")] + NotYetAvailable, + #[error("Permanently failed to retrieve rate from Kraken")] + PermanentFailure, } -impl From for Error { - fn from(err: tungstenite::Error) -> Self { - Error::WebSocket(format!("{:#}", err)) +type RateUpdate = Result; + +/// Maps a [`connection::Error`] to a backoff error, effectively defining our +/// retry strategy. +fn to_backoff(e: connection::Error) -> backoff::Error { + use backoff::Error::*; + + match e { + // Connection closures and websocket errors will be retried + connection::Error::ConnectionClosed => Transient(anyhow::Error::from(e)), + connection::Error::WebSocket(_) => Transient(anyhow::Error::from(e)), + + // Failures while parsing a message are permanent because they most likely present a + // programmer error + connection::Error::Parse(_) => Permanent(anyhow::Error::from(e)), } } -#[derive(Debug, Serialize, Deserialize, PartialEq)] -#[serde(tag = "event")] -enum Event { - #[serde(rename = "systemStatus")] - SystemStatus, - #[serde(rename = "heartbeat")] - Heartbeat, - #[serde(rename = "subscriptionStatus")] - SubscriptionStatus, -} +/// Kraken websocket connection module. +/// +/// Responsible for establishing a connection to the Kraken websocket API and +/// transforming the received websocket frames into a stream of rate updates. +/// The connection may fail in which case it is simply terminated and the stream +/// ends. +mod connection { + use super::*; + use crate::kraken::wire; + use futures::stream::BoxStream; + use tokio_tungstenite::tungstenite; -#[derive(Debug, Serialize, Deserialize)] -#[serde(transparent)] -struct TickerUpdate(Vec); + pub async fn new() -> Result>> { + let (mut rate_stream, _) = tokio_tungstenite::connect_async("wss://ws.kraken.com") + .await + .context("Failed to connect to Kraken websocket API")?; -#[derive(Debug, Serialize, Deserialize)] -#[serde(untagged)] -enum TickerField { - Data(TickerData), - Metadata(Value), -} + rate_stream + .send(SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD.into()) + .await?; -#[derive(Debug, Serialize, Deserialize)] -struct TickerData { - #[serde(rename = "a")] - ask: Vec, - #[serde(rename = "b")] - bid: Vec, -} + let stream = rate_stream.err_into().try_filter_map(parse_message).boxed(); -#[derive(Debug, Serialize, Deserialize)] -#[serde(untagged)] -enum RateElement { - Text(String), - Number(u64), -} + Ok(stream) + } + + /// Parse a websocket message into a [`Rate`]. + /// + /// Messages which are not actually ticker updates are ignored and result in + /// `None` being returned. In the context of a [`TryStream`], these will + /// simply be filtered out. + async fn parse_message(msg: tungstenite::Message) -> Result, Error> { + let msg = match msg { + tungstenite::Message::Text(msg) => msg, + tungstenite::Message::Close(close_frame) => { + if let Some(tungstenite::protocol::CloseFrame { code, reason }) = close_frame { + tracing::debug!( + "Kraken rate stream was closed with code {} and reason: {}", + code, + reason + ); + } else { + tracing::debug!("Kraken rate stream was closed without code and reason"); + } + + return Err(Error::ConnectionClosed); + } + msg => { + tracing::trace!( + "Kraken rate stream returned non text message that will be ignored: {}", + msg + ); -impl TryFrom for Rate { - type Error = Error; - - fn try_from(value: TickerUpdate) -> Result { - let data = value - .0 - .iter() - .find_map(|field| match field { - TickerField::Data(data) => Some(data), - TickerField::Metadata(_) => None, - }) - .ok_or(Error::DataFieldMissing)?; - let ask = data.ask.first().ok_or(Error::MissingAskRateElementType)?; - let ask = match ask { - RateElement::Text(ask) => { - bitcoin::Amount::from_str_in(ask, ::bitcoin::Denomination::Bitcoin)? + return Ok(None); } - _ => return Err(Error::UnexpectedAskRateElementType), }; - Ok(Self { ask }) + let update = match serde_json::from_str::(&msg) { + Ok(wire::Event::SystemStatus) => { + tracing::debug!("Connected to Kraken websocket API"); + + return Ok(None); + } + Ok(wire::Event::SubscriptionStatus) => { + tracing::debug!("Subscribed to updates for ticker"); + + return Ok(None); + } + Ok(wire::Event::Heartbeat) => { + tracing::trace!("Received heartbeat message"); + + return Ok(None); + } + // if the message is not an event, it is a ticker update or an unknown event + Err(_) => match serde_json::from_str::(&msg) { + Ok(ticker) => ticker, + Err(e) => { + tracing::warn!(%e, "Failed to deserialize message '{}' as ticker update", msg); + + return Ok(None); + } + }, + }; + + let update = Rate::try_from(update)?; + + Ok(Some(update)) + } + + #[derive(Debug, thiserror::Error)] + pub enum Error { + #[error("The Kraken server closed the websocket connection")] + ConnectionClosed, + #[error("Failed to read message from websocket stream")] + WebSocket(#[from] tungstenite::Error), + #[error("Failed to parse rate from websocket message")] + Parse(#[from] wire::Error), } + + const SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD: &str = r#" + { "event": "subscribe", + "pair": [ "XMR/XBT" ], + "subscription": { + "name": "ticker" + } + }"#; } -#[cfg(test)] -mod tests { +/// Kraken websocket API wire module. +/// +/// Responsible for parsing websocket text messages to events and rate updates. +mod wire { use super::*; + use bitcoin::util::amount::ParseAmountError; + use serde_json::Value; + + #[derive(Debug, Serialize, Deserialize, PartialEq)] + #[serde(tag = "event")] + pub enum Event { + #[serde(rename = "systemStatus")] + SystemStatus, + #[serde(rename = "heartbeat")] + Heartbeat, + #[serde(rename = "subscriptionStatus")] + SubscriptionStatus, + } - #[test] - fn can_deserialize_system_status_event() { - let event = r#"{"connectionID":14859574189081089471,"event":"systemStatus","status":"online","version":"1.8.1"}"#; + #[derive(Clone, Debug, thiserror::Error)] + pub enum Error { + #[error("Data field is missing")] + DataFieldMissing, + #[error("Ask Rate Element is of unexpected type")] + UnexpectedAskRateElementType, + #[error("Ask Rate Element is missing")] + MissingAskRateElementType, + #[error("Failed to parse Bitcoin amount")] + BitcoinParseAmount(#[from] ParseAmountError), + } - let event = serde_json::from_str::(event).unwrap(); + #[derive(Debug, Serialize, Deserialize)] + #[serde(transparent)] + pub struct TickerUpdate(Vec); - assert_eq!(event, Event::SystemStatus) + #[derive(Debug, Serialize, Deserialize)] + #[serde(untagged)] + pub enum TickerField { + Data(TickerData), + Metadata(Value), } - #[test] - fn can_deserialize_subscription_status_event() { - let event = r#"{"channelID":980,"channelName":"ticker","event":"subscriptionStatus","pair":"XMR/XBT","status":"subscribed","subscription":{"name":"ticker"}}"#; + #[derive(Debug, Serialize, Deserialize)] + pub struct TickerData { + #[serde(rename = "a")] + ask: Vec, + #[serde(rename = "b")] + bid: Vec, + } + + #[derive(Debug, Serialize, Deserialize)] + #[serde(untagged)] + pub enum RateElement { + Text(String), + Number(u64), + } - let event = serde_json::from_str::(event).unwrap(); + impl TryFrom for Rate { + type Error = Error; + + fn try_from(value: TickerUpdate) -> Result { + let data = value + .0 + .iter() + .find_map(|field| match field { + TickerField::Data(data) => Some(data), + TickerField::Metadata(_) => None, + }) + .ok_or(Error::DataFieldMissing)?; + let ask = data.ask.first().ok_or(Error::MissingAskRateElementType)?; + let ask = match ask { + RateElement::Text(ask) => { + bitcoin::Amount::from_str_in(ask, ::bitcoin::Denomination::Bitcoin)? + } + _ => return Err(Error::UnexpectedAskRateElementType), + }; - assert_eq!(event, Event::SubscriptionStatus) + Ok(Self { ask }) + } } - #[test] - fn deserialize_ticker_update() { - let message = r#"[980,{"a":["0.00440700",7,"7.35318535"],"b":["0.00440200",7,"7.57416678"],"c":["0.00440700","0.22579000"],"v":["273.75489000","4049.91233351"],"p":["0.00446205","0.00441699"],"t":[123,1310],"l":["0.00439400","0.00429900"],"h":["0.00450000","0.00450000"],"o":["0.00449100","0.00433700"]},"ticker","XMR/XBT"]"#; + #[cfg(test)] + mod tests { + use super::*; + + #[test] + fn can_deserialize_system_status_event() { + let event = r#"{"connectionID":14859574189081089471,"event":"systemStatus","status":"online","version":"1.8.1"}"#; + + let event = serde_json::from_str::(event).unwrap(); - let _ = serde_json::from_str::(message).unwrap(); + assert_eq!(event, Event::SystemStatus) + } + + #[test] + fn can_deserialize_subscription_status_event() { + let event = r#"{"channelID":980,"channelName":"ticker","event":"subscriptionStatus","pair":"XMR/XBT","status":"subscribed","subscription":{"name":"ticker"}}"#; + + let event = serde_json::from_str::(event).unwrap(); + + assert_eq!(event, Event::SubscriptionStatus) + } + + #[test] + fn deserialize_ticker_update() { + let message = r#"[980,{"a":["0.00440700",7,"7.35318535"],"b":["0.00440200",7,"7.57416678"],"c":["0.00440700","0.22579000"],"v":["273.75489000","4049.91233351"],"p":["0.00446205","0.00441699"],"t":[123,1310],"l":["0.00439400","0.00429900"],"h":["0.00450000","0.00450000"],"o":["0.00449100","0.00433700"]},"ticker","XMR/XBT"]"#; + + let _ = serde_json::from_str::(message).unwrap(); + } } }