From 63cfcf22e0f3d5992dd81cfd428c07ac27101e12 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 25 Jun 2021 12:44:37 +1000 Subject: [PATCH] Simplify error handling --- Cargo.lock | 46 +++++----- swap/Cargo.toml | 4 +- swap/src/network/swap_setup.rs | 22 +++-- swap/src/protocol/alice/swap_setup.rs | 124 ++++++++++++-------------- 4 files changed, 92 insertions(+), 104 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a6b897a8..0b007335 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -294,6 +294,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d73a8ae8ce52d09395e4cafc83b5b81c3deb70a97740e907669c8683c4dd50a" +[[package]] +name = "bimap" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50ae17cabbc8a38a1e3e4c1a6a664e9a09672dc14d0896fa8d865d3a5a446b07" + [[package]] name = "bincode" version = "1.3.1" @@ -1832,9 +1838,8 @@ dependencies = [ [[package]] name = "libp2p-dns" -version = "0.28.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62e63dab8b5ff35e0c101a3e51e843ba782c07bbb1682f5fd827622e0d02b98b" +version = "0.29.0" +source = "git+https://github.com/comit-network/rust-libp2p.git?rev=96002105b0a7019330413826a332b3a12a7e8a57#96002105b0a7019330413826a332b3a12a7e8a57" dependencies = [ "futures", "libp2p-core", @@ -1860,9 +1865,8 @@ dependencies = [ [[package]] name = "libp2p-mplex" -version = "0.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85e9b544335d1ed30af71daa96edbefadef6f19c7a55f078b9fc92c87163105d" +version = "0.29.0" +source = "git+https://github.com/comit-network/rust-libp2p.git?rev=96002105b0a7019330413826a332b3a12a7e8a57#96002105b0a7019330413826a332b3a12a7e8a57" dependencies = [ "asynchronous-codec", "bytes 1.0.1", @@ -1878,9 +1882,8 @@ dependencies = [ [[package]] name = "libp2p-noise" -version = "0.31.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57a2aa6fc4e6855eaf9ea1941a14f7ec4df35636fb6b85951e17481df8dcecf6" +version = "0.32.0" +source = "git+https://github.com/comit-network/rust-libp2p.git?rev=96002105b0a7019330413826a332b3a12a7e8a57#96002105b0a7019330413826a332b3a12a7e8a57" dependencies = [ "bytes 1.0.1", "curve25519-dalek", @@ -1900,9 +1903,8 @@ dependencies = [ [[package]] name = "libp2p-ping" -version = "0.29.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf4bfaffac63bf3c7ec11ed9d8879d455966ddea7e78ee14737f0b6dce0d1cd1" +version = "0.30.0" +source = "git+https://github.com/comit-network/rust-libp2p.git?rev=96002105b0a7019330413826a332b3a12a7e8a57#96002105b0a7019330413826a332b3a12a7e8a57" dependencies = [ "futures", "libp2p-core", @@ -1937,9 +1939,8 @@ dependencies = [ [[package]] name = "libp2p-request-response" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cdbe172f08e6d0f95fa8634e273d4c4268c4063de2e33e7435194b0130c62e3" +version = "0.12.0" +source = "git+https://github.com/comit-network/rust-libp2p.git?rev=96002105b0a7019330413826a332b3a12a7e8a57#96002105b0a7019330413826a332b3a12a7e8a57" dependencies = [ "async-trait", "bytes 1.0.1", @@ -1981,9 +1982,8 @@ dependencies = [ [[package]] name = "libp2p-tcp" -version = "0.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b1a27d21c477951799e99d5c105d78868258502ce092988040a808d5a19bbd9" +version = "0.29.0" +source = "git+https://github.com/comit-network/rust-libp2p.git?rev=96002105b0a7019330413826a332b3a12a7e8a57#96002105b0a7019330413826a332b3a12a7e8a57" dependencies = [ "futures", "futures-timer", @@ -1998,9 +1998,8 @@ dependencies = [ [[package]] name = "libp2p-websocket" -version = "0.29.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cace60995ef6f637e4752cccbb2590f6bc358e8741a0d066307636c69a4b3a74" +version = "0.30.0" +source = "git+https://github.com/comit-network/rust-libp2p.git?rev=96002105b0a7019330413826a332b3a12a7e8a57#96002105b0a7019330413826a332b3a12a7e8a57" dependencies = [ "either", "futures", @@ -2016,9 +2015,8 @@ dependencies = [ [[package]] name = "libp2p-yamux" -version = "0.32.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f35da42cfc6d5cb0dcf3ad6881bc68d146cdf38f98655e09e33fbba4d13eabc4" +version = "0.33.0" +source = "git+https://github.com/comit-network/rust-libp2p.git?rev=96002105b0a7019330413826a332b3a12a7e8a57#96002105b0a7019330413826a332b3a12a7e8a57" dependencies = [ "futures", "libp2p-core", diff --git a/swap/Cargo.toml b/swap/Cargo.toml index 0658ada7..23a51b44 100644 --- a/swap/Cargo.toml +++ b/swap/Cargo.toml @@ -29,8 +29,8 @@ ecdsa_fun = { git = "https://github.com/LLFourn/secp256kfun", default-features = ed25519-dalek = "1" futures = { version = "0.3", default-features = false } itertools = "0.10" -libp2p = { git = "https://github.com/comit-network/rust-libp2p.git", rev = "96002105b0a7019330413826a332b3a12a7e8a57", default-features = false, features = ["rendezvous", "request-response", "websocket", "ping", "tcp-tokio", "yamux", "mplex", "dns-tokio", "noise", "identify"] } -miniscript = { version = "5", features = ["serde"] } +libp2p = { git = "https://github.com/comit-network/rust-libp2p.git", rev = "96002105b0a7019330413826a332b3a12a7e8a57", default-features = false, features = [ "rendezvous", "request-response", "websocket", "ping", "tcp-tokio", "yamux", "mplex", "dns-tokio", "noise", "identify" ] } +miniscript = { version = "5", features = [ "serde" ] } monero = { version = "0.12", features = [ "serde_support" ] } monero-rpc = { path = "../monero-rpc" } pem = "0.8" diff --git a/swap/src/network/swap_setup.rs b/swap/src/network/swap_setup.rs index 43d5ff49..fb5a5657 100644 --- a/swap/src/network/swap_setup.rs +++ b/swap/src/network/swap_setup.rs @@ -1,4 +1,5 @@ use crate::monero; +use anyhow::{Context, Result}; use libp2p::core::upgrade; use libp2p::swarm::NegotiatedSubstream; use serde::de::DeserializeOwned; @@ -82,26 +83,29 @@ pub enum SpotPriceError { Other, } -pub async fn read_cbor_message(substream: &mut NegotiatedSubstream) -> anyhow::Result +pub async fn read_cbor_message(substream: &mut NegotiatedSubstream) -> Result where T: DeserializeOwned, { - let bytes = upgrade::read_one(substream, BUF_SIZE).await?; + let bytes = upgrade::read_one(substream, BUF_SIZE) + .await + .context("Failed to read length-prefixed message from stream")?; let mut de = serde_cbor::Deserializer::from_slice(&bytes); - let message = T::deserialize(&mut de)?; + let message = + T::deserialize(&mut de).context("Failed to deserialize bytes into message using CBOR")?; Ok(message) } -pub async fn write_cbor_message( - substream: &mut NegotiatedSubstream, - message: T, -) -> anyhow::Result<()> +pub async fn write_cbor_message(substream: &mut NegotiatedSubstream, message: T) -> Result<()> where T: Serialize, { - let bytes = serde_cbor::to_vec(&message)?; - upgrade::write_with_len_prefix(substream, &bytes).await?; + let bytes = + serde_cbor::to_vec(&message).context("Failed to serialize message as bytes using CBOR")?; + upgrade::write_with_len_prefix(substream, &bytes) + .await + .context("Failed to write bytes as length-prefixed message")?; Ok(()) } diff --git a/swap/src/protocol/alice/swap_setup.rs b/swap/src/protocol/alice/swap_setup.rs index 1d228fcb..333a4769 100644 --- a/swap/src/protocol/alice/swap_setup.rs +++ b/swap/src/protocol/alice/swap_setup.rs @@ -1,9 +1,12 @@ -use std::collections::VecDeque; -use std::fmt::Debug; -use std::task::{Context, Poll}; -use std::time::Duration; - -use anyhow::{anyhow, Context as _, Result}; +use crate::network::swap_setup; +use crate::network::swap_setup::{ + protocol, BlockchainNetwork, SpotPriceError, SpotPriceRequest, SpotPriceResponse, +}; +use crate::protocol::alice::event_loop::LatestRate; +use crate::protocol::alice::{State0, State3}; +use crate::protocol::{alice, Message0, Message2, Message4}; +use crate::{bitcoin, env, monero}; +use anyhow::{anyhow, Context, Result}; use futures::future::{BoxFuture, OptionFuture}; use futures::FutureExt; use libp2p::core::connection::ConnectionId; @@ -13,19 +16,13 @@ use libp2p::swarm::{ ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, }; use libp2p::{Multiaddr, PeerId}; -use std::time::Instant; +use std::collections::VecDeque; +use std::fmt::Debug; +use std::task::Poll; +use std::time::{Duration, Instant}; use uuid::Uuid; use void::Void; -use crate::network::swap_setup; -use crate::network::swap_setup::{ - protocol, BlockchainNetwork, SpotPriceError, SpotPriceRequest, SpotPriceResponse, -}; -use crate::protocol::alice::event_loop::LatestRate; -use crate::protocol::alice::{State0, State3}; -use crate::protocol::{alice, Message0, Message2, Message4}; -use crate::{bitcoin, env, monero}; - #[derive(Debug)] #[allow(clippy::large_enum_variant)] pub enum OutEvent { @@ -39,7 +36,7 @@ pub enum OutEvent { }, Error { peer_id: PeerId, - error: Error, + error: anyhow::Error, }, } @@ -186,7 +183,7 @@ where fn poll( &mut self, - _cx: &mut Context<'_>, + _cx: &mut std::task::Context<'_>, _params: &mut impl PollParameters, ) -> Poll> { if let Some(event) = self.events.pop_front() { @@ -197,7 +194,7 @@ where } } -type InboundStream = BoxFuture<'static, anyhow::Result<(Uuid, alice::State3), Error>>; +type InboundStream = BoxFuture<'static, Result<(Uuid, alice::State3)>>; pub struct Handler { inbound_stream: OptionFuture, @@ -239,7 +236,7 @@ impl Handler { #[allow(clippy::large_enum_variant)] pub enum HandlerOutEvent { Initiated(bmrng::RequestReceiver), - Completed(anyhow::Result<(Uuid, alice::State3), Error>), + Completed(Result<(Uuid, alice::State3)>), } impl ProtocolsHandler for Handler @@ -278,11 +275,12 @@ where let protocol = tokio::time::timeout(self.timeout, async move { let request = swap_setup::read_cbor_message::(&mut substream) .await - .map_err(Error::Io)?; + .context("Failed to read spot price request")?; + let wallet_snapshot = sender .send_receive(request.btc) .await - .map_err(|e| Error::WalletSnapshotFailed(anyhow!(e)))?; + .context("Failed to receive wallet snapshot")?; // wrap all of these into another future so we can `return` from all the // different blocks @@ -334,24 +332,16 @@ where Ok(xmr) }; - let xmr = match validate.await { - Ok(xmr) => { - swap_setup::write_cbor_message(&mut substream, SpotPriceResponse::Xmr(xmr)) - .await - .map_err(Error::Io)?; + let result = validate.await; - xmr - } - Err(e) => { - swap_setup::write_cbor_message( - &mut substream, - SpotPriceResponse::Error(e.to_error_response()), - ) - .await - .map_err(Error::Io)?; - return Err(e); - } - }; + swap_setup::write_cbor_message( + &mut substream, + SpotPriceResponse::from_result_ref(&result), + ) + .await + .context("Failed to write spot price response")?; + + let xmr = result?; let state0 = State0::new( request.btc, @@ -366,35 +356,32 @@ where let message0 = swap_setup::read_cbor_message::(&mut substream) .await - .context("Failed to deserialize message0") - .map_err(Error::Io)?; - let (swap_id, state1) = state0.receive(message0).map_err(Error::Io)?; + .context("Failed to read message0")?; + let (swap_id, state1) = state0 + .receive(message0) + .context("Failed to transition state0 -> state1 using message0")?; swap_setup::write_cbor_message(&mut substream, state1.next_message()) .await - .map_err(Error::Io)?; + .context("Failed to send message1")?; let message2 = swap_setup::read_cbor_message::(&mut substream) .await - .context("Failed to deserialize message2") - .map_err(Error::Io)?; + .context("Failed to read message2")?; let state2 = state1 .receive(message2) - .context("Failed to receive Message2") - .map_err(Error::Io)?; + .context("Failed to transition state1 -> state2 using message2")?; swap_setup::write_cbor_message(&mut substream, state2.next_message()) .await - .map_err(Error::Io)?; + .context("Failed to send message3")?; let message4 = swap_setup::read_cbor_message::(&mut substream) .await - .context("Failed to deserialize message4") - .map_err(Error::Io)?; + .context("Failed to read message4")?; let state3 = state2 .receive(message4) - .context("Failed to receive Message4") - .map_err(Error::Io)?; + .context("Failed to transition state2 -> state3 using message4")?; Ok((swap_id, state3)) }); @@ -402,8 +389,8 @@ where let max_seconds = self.timeout.as_secs(); self.inbound_stream = OptionFuture::from(Some( async move { - protocol.await.map_err(|_| Error::Timeout { - seconds: max_seconds, + protocol.await.with_context(|| { + format!("Failed to complete execution setup within {}s", max_seconds) })? } .boxed(), @@ -413,7 +400,7 @@ where } fn inject_fully_negotiated_outbound(&mut self, _: Void, _: Self::OutboundOpenInfo) { - unreachable!("Alice does not support outbound in the hanlder") + unreachable!("Alice does not support outbound in the handler") } fn inject_event(&mut self, _: Self::InEvent) { @@ -435,7 +422,7 @@ where #[allow(clippy::type_complexity)] fn poll( &mut self, - cx: &mut Context<'_>, + cx: &mut std::task::Context<'_>, ) -> Poll< ProtocolsHandlerEvent< Self::OutboundProtocol, @@ -460,8 +447,15 @@ where } } -// TODO: Differentiate between errors that we send back and shit that happens on -// our side (IO, timeout) +impl SpotPriceResponse { + pub fn from_result_ref(result: &Result) -> Self { + match result { + Ok(amount) => SpotPriceResponse::Xmr(*amount), + Err(error) => SpotPriceResponse::Error(error.to_error_response()), + } + } +} + #[derive(Debug, thiserror::Error)] pub enum Error { #[error("ASB is running in resume-only mode")] @@ -490,12 +484,6 @@ pub enum Error { cli: BlockchainNetwork, asb: BlockchainNetwork, }, - #[error("Io Error")] - Io(#[source] anyhow::Error), - #[error("Failed to request wallet snapshot")] - WalletSnapshotFailed(#[source] anyhow::Error), - #[error("Failed to complete execution setup within {seconds}s")] - Timeout { seconds: u64 }, } impl Error { @@ -517,11 +505,9 @@ impl Error { asb: *asb, } } - Error::LatestRateFetchFailed(_) - | Error::SellQuoteCalculationFailed(_) - | Error::WalletSnapshotFailed(_) - | Error::Timeout { .. } - | Error::Io(_) => SpotPriceError::Other, + Error::LatestRateFetchFailed(_) | Error::SellQuoteCalculationFailed(_) => { + SpotPriceError::Other + } } } }