@ -7,7 +7,8 @@ use serde::{Deserialize, Serialize};
use serde_json ::Value ;
use serde_json ::Value ;
use std ::convert ::TryFrom ;
use std ::convert ::TryFrom ;
use tokio ::sync ::watch ;
use tokio ::sync ::watch ;
use tokio_tungstenite ::tungstenite ::Message ;
use tokio_tungstenite ::tungstenite ::{ protocol ::CloseFrame , Message } ;
use tracing ::{ error , trace } ;
use watch ::Receiver ;
use watch ::Receiver ;
const KRAKEN_WS_URL : & str = "wss://ws.kraken.com" ;
const KRAKEN_WS_URL : & str = "wss://ws.kraken.com" ;
@ -36,8 +37,8 @@ impl LatestRate for RateService {
pub enum Error {
pub enum Error {
#[ error( " Rate has not yet been retrieved from Kraken websocket API " ) ]
#[ error( " Rate has not yet been retrieved from Kraken websocket API " ) ]
NotYetRetrieved ,
NotYetRetrieved ,
#[ error( " Message is not text " ) ]
#[ error( " Received close message from Kraken " ) ]
NonText Message,
Close Message,
#[ error( " Websocket: " ) ]
#[ error( " Websocket: " ) ]
WebSocket ( String ) ,
WebSocket ( String ) ,
#[ error( " Serde: " ) ]
#[ error( " Serde: " ) ]
@ -77,11 +78,27 @@ impl RateService {
while let Some ( msg ) = rate_stream . next ( ) . await {
while let Some ( msg ) = rate_stream . next ( ) . await {
let msg = match msg {
let msg = match msg {
Ok ( Message ::Text ( msg ) ) = > msg ,
Ok ( Message ::Text ( msg ) ) = > msg ,
Ok ( _ ) = > {
Ok ( Message ::Close ( close_frame ) ) = > {
let _ = rate_update . send ( Err ( Error ::NonTextMessage ) ) ;
if let Some ( CloseFrame { code , reason } ) = close_frame {
error ! (
"Kraken rate stream was closed with code {} and reason: {}" ,
code , reason
) ;
} else {
error ! ( "Kraken rate stream was closed without code and reason" ) ;
}
let _ = rate_update . send ( Err ( Error ::CloseMessage ) ) ;
continue ;
}
Ok ( msg ) = > {
trace ! (
"Kraken rate stream returned non text message that will be ignored: {}" ,
msg
) ;
continue ;
continue ;
}
}
Err ( e ) = > {
Err ( e ) = > {
error ! ( "Error when reading from Kraken rate stream: {}" , e ) ;
let _ = rate_update . send ( Err ( e . into ( ) ) ) ;
let _ = rate_update . send ( Err ( e . into ( ) ) ) ;
continue ;
continue ;
}
}