diff --git a/swap/src/asb/kraken.rs b/swap/src/asb/kraken.rs index 4f8e36b0..27020597 100644 --- a/swap/src/asb/kraken.rs +++ b/swap/src/asb/kraken.rs @@ -7,7 +7,8 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use std::convert::TryFrom; use tokio::sync::watch; -use tokio_tungstenite::tungstenite::Message; +use tokio_tungstenite::tungstenite::{protocol::CloseFrame, Message}; +use tracing::{error, trace}; use watch::Receiver; const KRAKEN_WS_URL: &str = "wss://ws.kraken.com"; @@ -36,8 +37,8 @@ impl LatestRate for RateService { pub enum Error { #[error("Rate has not yet been retrieved from Kraken websocket API")] NotYetRetrieved, - #[error("Message is not text")] - NonTextMessage, + #[error("Received close message from Kraken")] + CloseMessage, #[error("Websocket: ")] WebSocket(String), #[error("Serde: ")] @@ -77,11 +78,27 @@ impl RateService { while let Some(msg) = rate_stream.next().await { let msg = match msg { Ok(Message::Text(msg)) => msg, - Ok(_) => { - let _ = rate_update.send(Err(Error::NonTextMessage)); + Ok(Message::Close(close_frame)) => { + if let Some(CloseFrame { code, reason }) = close_frame { + error!( + "Kraken rate stream was closed with code {} and reason: {}", + code, reason + ); + } else { + error!("Kraken rate stream was closed without code and reason"); + } + let _ = rate_update.send(Err(Error::CloseMessage)); + continue; + } + Ok(msg) => { + trace!( + "Kraken rate stream returned non text message that will be ignored: {}", + msg + ); continue; } Err(e) => { + error!("Error when reading from Kraken rate stream: {}", e); let _ = rate_update.send(Err(e.into())); continue; }