287: Properly deal with messages on kraken websocket connection r=da-kami a=thomaseizinger



Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
pull/289/head
bors[bot] 3 years ago committed by GitHub
commit b7709e1200
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,13 +1,7 @@
mod amounts;
pub mod command;
pub mod config;
pub mod fixed_rate;
pub mod kraken;
mod fixed_rate;
mod rate;
pub use amounts::Rate;
pub trait LatestRate {
type Error: std::error::Error + Send + Sync + 'static;
fn latest_rate(&mut self) -> Result<Rate, Self::Error>;
}
pub use self::fixed_rate::FixedRate;
pub use self::rate::Rate;

@ -1,23 +1,20 @@
use crate::asb::{LatestRate, Rate};
use std::convert::Infallible;
use crate::asb::Rate;
pub const RATE: f64 = 0.01;
#[derive(Clone, Debug)]
pub struct FixedRate(Rate);
#[derive(Clone)]
pub struct RateService(Rate);
impl FixedRate {
pub const RATE: f64 = 0.01;
impl LatestRate for RateService {
type Error = Infallible;
fn latest_rate(&mut self) -> Result<Rate, Infallible> {
Ok(self.0)
pub fn value(&self) -> Rate {
self.0
}
}
impl Default for RateService {
impl Default for FixedRate {
fn default() -> Self {
Self(Rate {
ask: bitcoin::Amount::from_btc(RATE).expect("Static value should never fail"),
ask: bitcoin::Amount::from_btc(Self::RATE).expect("Static value should never fail"),
})
}
}

@ -1,203 +0,0 @@
use crate::asb::{LatestRate, Rate};
use anyhow::Result;
use bitcoin::util::amount::ParseAmountError;
use futures::{SinkExt, StreamExt};
use reqwest::Url;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::convert::TryFrom;
use tokio::sync::watch;
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
use tokio_tungstenite::tungstenite::Message;
use tracing::{error, trace};
use watch::Receiver;
const KRAKEN_WS_URL: &str = "wss://ws.kraken.com";
const SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD: &str = r#"
{ "event": "subscribe",
"pair": [ "XMR/XBT" ],
"subscription": {
"name": "ticker"
}
}"#;
#[derive(Clone)]
pub struct RateService {
receiver: Receiver<Result<Rate, Error>>,
}
impl LatestRate for RateService {
type Error = Error;
fn latest_rate(&mut self) -> Result<Rate, Self::Error> {
(*self.receiver.borrow()).clone()
}
}
#[derive(Clone, Debug, thiserror::Error)]
pub enum Error {
#[error("Rate has not yet been retrieved from Kraken websocket API")]
NotYetRetrieved,
#[error("Received close message from Kraken")]
CloseMessage,
#[error("Websocket: ")]
WebSocket(String),
#[error("Serde: ")]
Serde(String),
#[error("Data field is missing")]
DataFieldMissing,
#[error("Ask Rate Element is of unexpected type")]
UnexpectedAskRateElementType,
#[error("Ask Rate Element is missing")]
MissingAskRateElementType,
#[error("Bitcoin amount parse error: ")]
BitcoinParseAmount(#[from] ParseAmountError),
}
impl From<tokio_tungstenite::tungstenite::Error> for Error {
fn from(err: tokio_tungstenite::tungstenite::Error) -> Self {
Error::WebSocket(format!("{:#}", err))
}
}
impl From<serde_json::Error> for Error {
fn from(err: serde_json::Error) -> Self {
Error::Serde(format!("{:#}", err))
}
}
impl RateService {
pub async fn new() -> Result<Self> {
let (rate_update, rate_update_receiver) = watch::channel(Err(Error::NotYetRetrieved));
let (rate_stream, _response) =
tokio_tungstenite::connect_async(Url::parse(KRAKEN_WS_URL).expect("valid url")).await?;
let (mut rate_stream_sink, mut rate_stream) = rate_stream.split();
tokio::spawn(async move {
while let Some(msg) = rate_stream.next().await {
let msg = match msg {
Ok(Message::Text(msg)) => msg,
Ok(Message::Close(close_frame)) => {
if let Some(CloseFrame { code, reason }) = close_frame {
error!(
"Kraken rate stream was closed with code {} and reason: {}",
code, reason
);
} else {
error!("Kraken rate stream was closed without code and reason");
}
let _ = rate_update.send(Err(Error::CloseMessage));
continue;
}
Ok(msg) => {
trace!(
"Kraken rate stream returned non text message that will be ignored: {}",
msg
);
continue;
}
Err(e) => {
error!("Error when reading from Kraken rate stream: {}", e);
let _ = rate_update.send(Err(e.into()));
continue;
}
};
// If we encounter a heartbeat we skip it and iterate again
if msg.eq(r#"{"event":"heartbeat"}"#) {
continue;
}
let ticker = match serde_json::from_str::<TickerUpdate>(&msg) {
Ok(ticker) => ticker,
Err(e) => {
let _ = rate_update.send(Err(e.into()));
continue;
}
};
let rate = match Rate::try_from(ticker) {
Ok(rate) => rate,
Err(e) => {
let _ = rate_update.send(Err(e));
continue;
}
};
let _ = rate_update.send(Ok(rate));
}
});
rate_stream_sink
.send(SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD.into())
.await?;
Ok(Self {
receiver: rate_update_receiver,
})
}
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(transparent)]
struct TickerUpdate(Vec<TickerField>);
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
enum TickerField {
Data(TickerData),
Metadata(Value),
}
#[derive(Debug, Serialize, Deserialize)]
struct TickerData {
#[serde(rename = "a")]
ask: Vec<RateElement>,
#[serde(rename = "b")]
bid: Vec<RateElement>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
enum RateElement {
Text(String),
Number(u64),
}
impl TryFrom<TickerUpdate> for Rate {
type Error = Error;
fn try_from(value: TickerUpdate) -> Result<Self, Error> {
let data = value
.0
.iter()
.find_map(|field| match field {
TickerField::Data(data) => Some(data),
TickerField::Metadata(_) => None,
})
.ok_or(Error::DataFieldMissing)?;
let ask = data.ask.first().ok_or(Error::MissingAskRateElementType)?;
let ask = match ask {
RateElement::Text(ask) => {
bitcoin::Amount::from_str_in(ask, ::bitcoin::Denomination::Bitcoin)?
}
_ => return Err(Error::UnexpectedAskRateElementType),
};
Ok(Self { ask })
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn deserialize_ticker_update() {
let sample_response = r#"[980,{"a":["0.00521900",4,"4.84775132"],"b":["0.00520600",70,"70.35668921"],"c":["0.00520700","0.00000186"],"v":["18530.40510860","18531.94887860"],"p":["0.00489493","0.00489490"],"t":[5017,5018],"l":["0.00448300","0.00448300"],"h":["0.00525000","0.00525000"],"o":["0.00450000","0.00451000"]},"ticker","XMR/XBT"]"#;
let _ = serde_json::from_str::<TickerUpdate>(sample_response).unwrap();
}
}

@ -23,7 +23,6 @@ use swap::asb::command::{Arguments, Command};
use swap::asb::config::{
initial_setup, query_user_for_initial_testnet_config, read_config, Config, ConfigNotInitialized,
};
use swap::asb::kraken;
use swap::database::Database;
use swap::execution_params::GetExecutionParams;
use swap::fs::default_config_path;
@ -31,7 +30,7 @@ use swap::monero::Amount;
use swap::protocol::alice::EventLoop;
use swap::seed::Seed;
use swap::trace::init_tracing;
use swap::{bitcoin, execution_params, monero};
use swap::{bitcoin, execution_params, kraken, monero};
use tracing::{info, warn};
use tracing_subscriber::filter::LevelFilter;
@ -93,7 +92,7 @@ async fn main() -> Result<()> {
bitcoin_wallet.new_address().await?
);
let rate_service = kraken::RateService::new().await?;
let kraken_rate_updates = kraken::connect().await?;
let (event_loop, _) = EventLoop::new(
config.network.listen,
@ -102,7 +101,7 @@ async fn main() -> Result<()> {
Arc::new(bitcoin_wallet),
Arc::new(monero_wallet),
Arc::new(db),
rate_service,
kraken_rate_updates,
max_buy,
)
.unwrap();

@ -0,0 +1,19 @@
use anyhow::{Context, Result};
#[tokio::main]
async fn main() -> Result<()> {
tracing::subscriber::set_global_default(
tracing_subscriber::fmt().with_env_filter("trace").finish(),
)?;
let mut ticker = swap::kraken::connect()
.await
.context("Failed to connect to kraken")?;
loop {
match ticker.wait_for_update().await? {
Ok(rate) => println!("Rate update: {}", rate),
Err(e) => println!("Error: {:#}", e),
}
}
}

@ -0,0 +1,237 @@
use crate::asb::Rate;
use anyhow::Result;
use bitcoin::util::amount::ParseAmountError;
use futures::{SinkExt, StreamExt};
use reqwest::Url;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::convert::TryFrom;
use tokio::sync::watch;
use tokio_tungstenite::tungstenite;
use tracing::{error, trace};
pub async fn connect() -> Result<RateUpdateStream> {
let (rate_update, rate_update_receiver) = watch::channel(Err(Error::NotYetRetrieved));
let (rate_stream, _response) =
tokio_tungstenite::connect_async(Url::parse(KRAKEN_WS_URL).expect("valid url")).await?;
let (mut rate_stream_sink, mut rate_stream) = rate_stream.split();
tokio::spawn(async move {
while let Some(msg) = rate_stream.next().await {
let msg = match msg {
Ok(tungstenite::Message::Text(msg)) => msg,
Ok(tungstenite::Message::Close(close_frame)) => {
if let Some(tungstenite::protocol::CloseFrame { code, reason }) = close_frame {
error!(
"Kraken rate stream was closed with code {} and reason: {}",
code, reason
);
} else {
error!("Kraken rate stream was closed without code and reason");
}
let _ = rate_update.send(Err(Error::ConnectionClosed));
continue;
}
Ok(msg) => {
trace!(
"Kraken rate stream returned non text message that will be ignored: {}",
msg
);
continue;
}
Err(e) => {
error!(%e, "Error when reading from Kraken rate stream");
let _ = rate_update.send(Err(e.into()));
continue;
}
};
let update = match serde_json::from_str::<Event>(&msg) {
Ok(Event::SystemStatus) => {
tracing::debug!("Connected to Kraken websocket API");
continue;
}
Ok(Event::SubscriptionStatus) => {
tracing::debug!("Subscribed to updates for ticker");
continue;
}
Ok(Event::Heartbeat) => {
tracing::trace!("Received heartbeat message");
continue;
}
// if the message is not an event, it is a ticker update or an unknown event
Err(_) => match serde_json::from_str::<TickerUpdate>(&msg) {
Ok(ticker) => ticker,
Err(e) => {
tracing::warn!(%e, "Failed to deserialize message '{}' as ticker update", msg);
let _ = rate_update.send(Err(Error::UnknownMessage { msg }));
continue;
}
},
};
let rate = match Rate::try_from(update) {
Ok(rate) => rate,
Err(e) => {
let _ = rate_update.send(Err(e));
continue;
}
};
let _ = rate_update.send(Ok(rate));
}
});
rate_stream_sink
.send(SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD.into())
.await?;
Ok(RateUpdateStream {
inner: rate_update_receiver,
})
}
#[derive(Clone, Debug)]
pub struct RateUpdateStream {
inner: watch::Receiver<Result<Rate, Error>>,
}
impl RateUpdateStream {
pub async fn wait_for_update(&mut self) -> Result<Result<Rate, Error>> {
self.inner.changed().await?;
Ok(self.inner.borrow().clone())
}
pub fn latest_update(&mut self) -> Result<Rate, Error> {
self.inner.borrow().clone()
}
}
const KRAKEN_WS_URL: &str = "wss://ws.kraken.com";
const SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD: &str = r#"
{ "event": "subscribe",
"pair": [ "XMR/XBT" ],
"subscription": {
"name": "ticker"
}
}"#;
#[derive(Clone, Debug, thiserror::Error)]
pub enum Error {
#[error("Rate has not yet been retrieved from Kraken websocket API")]
NotYetRetrieved,
#[error("The Kraken server closed the websocket connection")]
ConnectionClosed,
#[error("Websocket: {0}")]
WebSocket(String),
#[error("Received unknown message from Kraken: {msg}")]
UnknownMessage { msg: String },
#[error("Data field is missing")]
DataFieldMissing,
#[error("Ask Rate Element is of unexpected type")]
UnexpectedAskRateElementType,
#[error("Ask Rate Element is missing")]
MissingAskRateElementType,
#[error("Bitcoin amount parse error: ")]
BitcoinParseAmount(#[from] ParseAmountError),
}
impl From<tungstenite::Error> for Error {
fn from(err: tungstenite::Error) -> Self {
Error::WebSocket(format!("{:#}", err))
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "event")]
enum Event {
#[serde(rename = "systemStatus")]
SystemStatus,
#[serde(rename = "heartbeat")]
Heartbeat,
#[serde(rename = "subscriptionStatus")]
SubscriptionStatus,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(transparent)]
struct TickerUpdate(Vec<TickerField>);
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
enum TickerField {
Data(TickerData),
Metadata(Value),
}
#[derive(Debug, Serialize, Deserialize)]
struct TickerData {
#[serde(rename = "a")]
ask: Vec<RateElement>,
#[serde(rename = "b")]
bid: Vec<RateElement>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
enum RateElement {
Text(String),
Number(u64),
}
impl TryFrom<TickerUpdate> for Rate {
type Error = Error;
fn try_from(value: TickerUpdate) -> Result<Self, Error> {
let data = value
.0
.iter()
.find_map(|field| match field {
TickerField::Data(data) => Some(data),
TickerField::Metadata(_) => None,
})
.ok_or(Error::DataFieldMissing)?;
let ask = data.ask.first().ok_or(Error::MissingAskRateElementType)?;
let ask = match ask {
RateElement::Text(ask) => {
bitcoin::Amount::from_str_in(ask, ::bitcoin::Denomination::Bitcoin)?
}
_ => return Err(Error::UnexpectedAskRateElementType),
};
Ok(Self { ask })
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn can_deserialize_system_status_event() {
let event = r#"{"connectionID":14859574189081089471,"event":"systemStatus","status":"online","version":"1.8.1"}"#;
let event = serde_json::from_str::<Event>(event).unwrap();
assert_eq!(event, Event::SystemStatus)
}
#[test]
fn can_deserialize_subscription_status_event() {
let event = r#"{"channelID":980,"channelName":"ticker","event":"subscriptionStatus","pair":"XMR/XBT","status":"subscribed","subscription":{"name":"ticker"}}"#;
let event = serde_json::from_str::<Event>(event).unwrap();
assert_eq!(event, Event::SubscriptionStatus)
}
#[test]
fn deserialize_ticker_update() {
let message = r#"[980,{"a":["0.00440700",7,"7.35318535"],"b":["0.00440200",7,"7.57416678"],"c":["0.00440700","0.22579000"],"v":["273.75489000","4049.91233351"],"p":["0.00446205","0.00441699"],"t":[123,1310],"l":["0.00439400","0.00429900"],"h":["0.00450000","0.00450000"],"o":["0.00449100","0.00433700"]},"ticker","XMR/XBT"]"#;
let _ = serde_json::from_str::<TickerUpdate>(message).unwrap();
}
}

@ -22,6 +22,7 @@ pub mod cli;
pub mod database;
pub mod execution_params;
pub mod fs;
pub mod kraken;
pub mod monero;
pub mod network;
pub mod protocol;

@ -1,4 +1,4 @@
use crate::asb::LatestRate;
use crate::asb::{FixedRate, Rate};
use crate::database::Database;
use crate::execution_params::ExecutionParams;
use crate::monero::BalanceTooLow;
@ -8,70 +8,20 @@ use crate::protocol::alice;
use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State3, Swap, TransferProof};
use crate::protocol::bob::EncryptedSignature;
use crate::seed::Seed;
use crate::{bitcoin, monero};
use crate::{bitcoin, kraken, monero};
use anyhow::{bail, Context, Result};
use futures::future::RemoteHandle;
use libp2p::core::Multiaddr;
use libp2p::futures::FutureExt;
use libp2p::{PeerId, Swarm};
use rand::rngs::OsRng;
use std::convert::Infallible;
use std::sync::Arc;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::{broadcast, mpsc};
use tracing::{debug, error, trace};
use uuid::Uuid;
#[allow(missing_debug_implementations)]
pub struct MpscChannels<T> {
sender: mpsc::Sender<T>,
receiver: mpsc::Receiver<T>,
}
impl<T> Default for MpscChannels<T> {
fn default() -> Self {
let (sender, receiver) = mpsc::channel(100);
MpscChannels { sender, receiver }
}
}
#[allow(missing_debug_implementations)]
pub struct BroadcastChannels<T>
where
T: Clone,
{
sender: broadcast::Sender<T>,
}
impl<T> Default for BroadcastChannels<T>
where
T: Clone,
{
fn default() -> Self {
let (sender, _receiver) = broadcast::channel(100);
BroadcastChannels { sender }
}
}
#[derive(Debug)]
pub struct EventLoopHandle {
recv_encrypted_signature: broadcast::Receiver<EncryptedSignature>,
send_transfer_proof: mpsc::Sender<(PeerId, TransferProof)>,
}
impl EventLoopHandle {
pub async fn recv_encrypted_signature(&mut self) -> Result<EncryptedSignature> {
self.recv_encrypted_signature
.recv()
.await
.context("Failed to receive Bitcoin encrypted signature from Bob")
}
pub async fn send_transfer_proof(&mut self, bob: PeerId, msg: TransferProof) -> Result<()> {
let _ = self.send_transfer_proof.send((bob, msg)).await?;
Ok(())
}
}
#[allow(missing_debug_implementations)]
pub struct EventLoop<RS> {
swarm: libp2p::Swarm<Behaviour>,
@ -80,7 +30,7 @@ pub struct EventLoop<RS> {
bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>,
db: Arc<Database>,
rate_service: RS,
latest_rate: RS,
max_buy: bitcoin::Amount,
recv_encrypted_signature: broadcast::Sender<EncryptedSignature>,
@ -92,9 +42,15 @@ pub struct EventLoop<RS> {
swap_handle_sender: mpsc::Sender<RemoteHandle<Result<AliceState>>>,
}
impl<RS> EventLoop<RS>
#[derive(Debug)]
pub struct EventLoopHandle {
recv_encrypted_signature: broadcast::Receiver<EncryptedSignature>,
send_transfer_proof: mpsc::Sender<(PeerId, TransferProof)>,
}
impl<LR> EventLoop<LR>
where
RS: LatestRate,
LR: LatestRate,
{
#[allow(clippy::too_many_arguments)]
pub fn new(
@ -104,7 +60,7 @@ where
bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>,
db: Arc<Database>,
rate_service: RS,
latest_rate: LR,
max_buy: bitcoin::Amount,
) -> Result<(Self, mpsc::Receiver<RemoteHandle<Result<AliceState>>>)> {
let identity = seed.derive_libp2p_identity();
@ -132,7 +88,7 @@ where
bitcoin_wallet,
monero_wallet,
db,
rate_service,
latest_rate,
recv_encrypted_signature: recv_encrypted_signature.sender,
send_transfer_proof: send_transfer_proof.receiver,
send_transfer_proof_sender: send_transfer_proof.sender,
@ -239,7 +195,7 @@ where
monero_wallet: Arc<monero::Wallet>,
) -> Result<monero::Amount> {
let rate = self
.rate_service
.latest_rate
.latest_rate()
.context("Failed to get latest rate")?;
@ -265,7 +221,7 @@ where
async fn make_quote(&mut self, max_buy: bitcoin::Amount) -> Result<BidQuote> {
let rate = self
.rate_service
.latest_rate
.latest_rate()
.context("Failed to get latest rate")?;
@ -313,9 +269,76 @@ where
}
}
pub trait LatestRate {
type Error: std::error::Error + Send + Sync + 'static;
fn latest_rate(&mut self) -> Result<Rate, Self::Error>;
}
impl LatestRate for FixedRate {
type Error = Infallible;
fn latest_rate(&mut self) -> Result<Rate, Self::Error> {
Ok(self.value())
}
}
impl LatestRate for kraken::RateUpdateStream {
type Error = kraken::Error;
fn latest_rate(&mut self) -> Result<Rate, Self::Error> {
self.latest_update()
}
}
impl EventLoopHandle {
pub async fn recv_encrypted_signature(&mut self) -> Result<EncryptedSignature> {
self.recv_encrypted_signature
.recv()
.await
.context("Failed to receive Bitcoin encrypted signature from Bob")
}
pub async fn send_transfer_proof(&mut self, bob: PeerId, msg: TransferProof) -> Result<()> {
let _ = self.send_transfer_proof.send((bob, msg)).await?;
Ok(())
}
}
#[derive(Debug, Clone, Copy, thiserror::Error)]
#[error("Refusing to buy {actual} because the maximum configured limit is {max}")]
pub struct MaximumBuyAmountExceeded {
pub max: bitcoin::Amount,
pub actual: bitcoin::Amount,
}
#[allow(missing_debug_implementations)]
struct MpscChannels<T> {
sender: mpsc::Sender<T>,
receiver: mpsc::Receiver<T>,
}
impl<T> Default for MpscChannels<T> {
fn default() -> Self {
let (sender, receiver) = mpsc::channel(100);
MpscChannels { sender, receiver }
}
}
#[allow(missing_debug_implementations)]
struct BroadcastChannels<T>
where
T: Clone,
{
sender: broadcast::Sender<T>,
}
impl<T> Default for BroadcastChannels<T>
where
T: Clone,
{
fn default() -> Self {
let (sender, _receiver) = broadcast::channel(100);
BroadcastChannels { sender }
}
}

@ -14,8 +14,7 @@ use std::convert::Infallible;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use swap::asb::fixed_rate;
use swap::asb::fixed_rate::RATE;
use swap::asb::FixedRate;
use swap::bitcoin::{CancelTimelock, PunishTimelock};
use swap::database::Database;
use swap::execution_params::{ExecutionParams, GetExecutionParams};
@ -344,7 +343,7 @@ where
let (monero, containers) = testutils::init_containers(&cli).await;
let btc_amount = bitcoin::Amount::from_sat(1_000_000);
let xmr_amount = monero::Amount::from_monero(btc_amount.as_btc() / RATE).unwrap();
let xmr_amount = monero::Amount::from_monero(btc_amount.as_btc() / FixedRate::RATE).unwrap();
let alice_starting_balances = StartingBalances {
xmr: xmr_amount * 10,
@ -410,7 +409,7 @@ where
alice_bitcoin_wallet.clone(),
alice_monero_wallet.clone(),
alice_db,
fixed_rate::RateService::default(),
FixedRate::default(),
bitcoin::Amount::ONE_BTC,
)
.unwrap();

Loading…
Cancel
Save