mirror of
https://github.com/comit-network/xmr-btc-swap.git
synced 2024-11-17 15:26:14 +00:00
Properly handle concurrent messages to and from peers
Previously, we were forwarding incoming messages from peers to all swaps that were currently running. That is obviously wrong. The new design scopes an `EventLoopHandle` to a specific PeerId to avoid this problem.
This commit is contained in:
parent
95acbc6277
commit
a57f88d1b4
@ -4,7 +4,6 @@ use crate::monero::monero_private_key;
|
|||||||
use crate::protocol::alice;
|
use crate::protocol::alice;
|
||||||
use crate::protocol::alice::AliceState;
|
use crate::protocol::alice::AliceState;
|
||||||
use ::bitcoin::hashes::core::fmt::Display;
|
use ::bitcoin::hashes::core::fmt::Display;
|
||||||
use libp2p::PeerId;
|
|
||||||
use monero_rpc::wallet::BlockHeight;
|
use monero_rpc::wallet::BlockHeight;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
@ -15,13 +14,9 @@ use serde::{Deserialize, Serialize};
|
|||||||
pub enum Alice {
|
pub enum Alice {
|
||||||
Started {
|
Started {
|
||||||
state3: alice::State3,
|
state3: alice::State3,
|
||||||
#[serde(with = "crate::serde_peer_id")]
|
|
||||||
bob_peer_id: PeerId,
|
|
||||||
},
|
},
|
||||||
BtcLocked {
|
BtcLocked {
|
||||||
state3: alice::State3,
|
state3: alice::State3,
|
||||||
#[serde(with = "crate::serde_peer_id")]
|
|
||||||
bob_peer_id: PeerId,
|
|
||||||
},
|
},
|
||||||
XmrLocked {
|
XmrLocked {
|
||||||
monero_wallet_restore_blockheight: BlockHeight,
|
monero_wallet_restore_blockheight: BlockHeight,
|
||||||
@ -64,19 +59,11 @@ pub enum AliceEndState {
|
|||||||
impl From<&AliceState> for Alice {
|
impl From<&AliceState> for Alice {
|
||||||
fn from(alice_state: &AliceState) -> Self {
|
fn from(alice_state: &AliceState) -> Self {
|
||||||
match alice_state {
|
match alice_state {
|
||||||
AliceState::Started {
|
AliceState::Started { state3 } => Alice::Started {
|
||||||
state3,
|
|
||||||
bob_peer_id,
|
|
||||||
} => Alice::Started {
|
|
||||||
state3: state3.as_ref().clone(),
|
state3: state3.as_ref().clone(),
|
||||||
bob_peer_id: *bob_peer_id,
|
|
||||||
},
|
},
|
||||||
AliceState::BtcLocked {
|
AliceState::BtcLocked { state3 } => Alice::BtcLocked {
|
||||||
state3,
|
|
||||||
bob_peer_id,
|
|
||||||
} => Alice::BtcLocked {
|
|
||||||
state3: state3.as_ref().clone(),
|
state3: state3.as_ref().clone(),
|
||||||
bob_peer_id: *bob_peer_id,
|
|
||||||
},
|
},
|
||||||
AliceState::XmrLocked {
|
AliceState::XmrLocked {
|
||||||
monero_wallet_restore_blockheight,
|
monero_wallet_restore_blockheight,
|
||||||
@ -137,18 +124,10 @@ impl From<&AliceState> for Alice {
|
|||||||
impl From<Alice> for AliceState {
|
impl From<Alice> for AliceState {
|
||||||
fn from(db_state: Alice) -> Self {
|
fn from(db_state: Alice) -> Self {
|
||||||
match db_state {
|
match db_state {
|
||||||
Alice::Started {
|
Alice::Started { state3 } => AliceState::Started {
|
||||||
state3,
|
|
||||||
bob_peer_id,
|
|
||||||
} => AliceState::Started {
|
|
||||||
bob_peer_id,
|
|
||||||
state3: Box::new(state3),
|
state3: Box::new(state3),
|
||||||
},
|
},
|
||||||
Alice::BtcLocked {
|
Alice::BtcLocked { state3 } => AliceState::BtcLocked {
|
||||||
state3,
|
|
||||||
bob_peer_id,
|
|
||||||
} => AliceState::BtcLocked {
|
|
||||||
bob_peer_id,
|
|
||||||
state3: Box::new(state3),
|
state3: Box::new(state3),
|
||||||
},
|
},
|
||||||
Alice::XmrLocked {
|
Alice::XmrLocked {
|
||||||
|
@ -30,4 +30,3 @@ pub mod seed;
|
|||||||
pub mod trace;
|
pub mod trace;
|
||||||
|
|
||||||
mod monero_ext;
|
mod monero_ext;
|
||||||
mod serde_peer_id;
|
|
||||||
|
@ -32,6 +32,7 @@ pub enum OutEvent {
|
|||||||
EncryptedSignature {
|
EncryptedSignature {
|
||||||
msg: Box<EncryptedSignature>,
|
msg: Box<EncryptedSignature>,
|
||||||
channel: ResponseChannel<()>,
|
channel: ResponseChannel<()>,
|
||||||
|
peer: PeerId,
|
||||||
},
|
},
|
||||||
ResponseSent, // Same variant is used for all messages as no processing is done
|
ResponseSent, // Same variant is used for all messages as no processing is done
|
||||||
Failure(Error),
|
Failure(Error),
|
||||||
@ -140,9 +141,10 @@ impl From<encrypted_signature::OutEvent> for OutEvent {
|
|||||||
fn from(event: encrypted_signature::OutEvent) -> Self {
|
fn from(event: encrypted_signature::OutEvent) -> Self {
|
||||||
use crate::protocol::alice::encrypted_signature::OutEvent::*;
|
use crate::protocol::alice::encrypted_signature::OutEvent::*;
|
||||||
match event {
|
match event {
|
||||||
MsgReceived { msg, channel } => OutEvent::EncryptedSignature {
|
MsgReceived { msg, channel, peer } => OutEvent::EncryptedSignature {
|
||||||
msg: Box::new(msg),
|
msg: Box::new(msg),
|
||||||
channel,
|
channel,
|
||||||
|
peer,
|
||||||
},
|
},
|
||||||
AckSent => OutEvent::ResponseSent,
|
AckSent => OutEvent::ResponseSent,
|
||||||
Failure(err) => OutEvent::Failure(err.context("Failure with Encrypted Signature")),
|
Failure(err) => OutEvent::Failure(err.context("Failure with Encrypted Signature")),
|
||||||
|
@ -5,7 +5,7 @@ use libp2p::request_response::{
|
|||||||
ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
|
ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
|
||||||
RequestResponseMessage, ResponseChannel,
|
RequestResponseMessage, ResponseChannel,
|
||||||
};
|
};
|
||||||
use libp2p::NetworkBehaviour;
|
use libp2p::{NetworkBehaviour, PeerId};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
|
|
||||||
@ -14,6 +14,7 @@ pub enum OutEvent {
|
|||||||
MsgReceived {
|
MsgReceived {
|
||||||
msg: EncryptedSignature,
|
msg: EncryptedSignature,
|
||||||
channel: ResponseChannel<()>,
|
channel: ResponseChannel<()>,
|
||||||
|
peer: PeerId,
|
||||||
},
|
},
|
||||||
AckSent,
|
AckSent,
|
||||||
Failure(Error),
|
Failure(Error),
|
||||||
@ -67,6 +68,7 @@ impl From<RequestResponseEvent<EncryptedSignature, ()>> for OutEvent {
|
|||||||
OutEvent::MsgReceived {
|
OutEvent::MsgReceived {
|
||||||
msg: request,
|
msg: request,
|
||||||
channel,
|
channel,
|
||||||
|
peer,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
RequestResponseEvent::Message {
|
RequestResponseEvent::Message {
|
||||||
|
@ -9,13 +9,16 @@ use crate::protocol::bob::EncryptedSignature;
|
|||||||
use crate::seed::Seed;
|
use crate::seed::Seed;
|
||||||
use crate::{bitcoin, kraken, monero};
|
use crate::{bitcoin, kraken, monero};
|
||||||
use anyhow::{bail, Context, Result};
|
use anyhow::{bail, Context, Result};
|
||||||
|
use futures::future;
|
||||||
|
use futures::future::{BoxFuture, FutureExt};
|
||||||
|
use futures::stream::{FuturesUnordered, StreamExt};
|
||||||
use libp2p::core::Multiaddr;
|
use libp2p::core::Multiaddr;
|
||||||
use libp2p::futures::FutureExt;
|
|
||||||
use libp2p::{PeerId, Swarm};
|
use libp2p::{PeerId, Swarm};
|
||||||
use rand::rngs::OsRng;
|
use rand::rngs::OsRng;
|
||||||
|
use std::collections::HashMap;
|
||||||
use std::convert::Infallible;
|
use std::convert::Infallible;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::{broadcast, mpsc};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
use tracing::{debug, error, trace};
|
use tracing::{debug, error, trace};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
@ -30,19 +33,19 @@ pub struct EventLoop<RS> {
|
|||||||
latest_rate: RS,
|
latest_rate: RS,
|
||||||
max_buy: bitcoin::Amount,
|
max_buy: bitcoin::Amount,
|
||||||
|
|
||||||
recv_encrypted_signature: broadcast::Sender<EncryptedSignature>,
|
/// Stores a sender per peer for incoming [`EncryptedSignature`]s.
|
||||||
send_transfer_proof: mpsc::Receiver<(PeerId, TransferProof)>,
|
recv_encrypted_signature: HashMap<PeerId, oneshot::Sender<EncryptedSignature>>,
|
||||||
|
/// Stores a list of futures, waiting for transfer proof which will be sent
|
||||||
// Only used to produce new handles
|
/// to the given peer.
|
||||||
send_transfer_proof_sender: mpsc::Sender<(PeerId, TransferProof)>,
|
send_transfer_proof: FuturesUnordered<BoxFuture<'static, Result<(PeerId, TransferProof)>>>,
|
||||||
|
|
||||||
swap_sender: mpsc::Sender<Swap>,
|
swap_sender: mpsc::Sender<Swap>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct EventLoopHandle {
|
pub struct EventLoopHandle {
|
||||||
recv_encrypted_signature: broadcast::Receiver<EncryptedSignature>,
|
recv_encrypted_signature: Option<oneshot::Receiver<EncryptedSignature>>,
|
||||||
send_transfer_proof: mpsc::Sender<(PeerId, TransferProof)>,
|
send_transfer_proof: Option<oneshot::Sender<TransferProof>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<LR> EventLoop<LR>
|
impl<LR> EventLoop<LR>
|
||||||
@ -74,8 +77,6 @@ where
|
|||||||
Swarm::listen_on(&mut swarm, listen_address.clone())
|
Swarm::listen_on(&mut swarm, listen_address.clone())
|
||||||
.with_context(|| format!("Address is not supported: {:#}", listen_address))?;
|
.with_context(|| format!("Address is not supported: {:#}", listen_address))?;
|
||||||
|
|
||||||
let recv_encrypted_signature = BroadcastChannels::default();
|
|
||||||
let send_transfer_proof = MpscChannels::default();
|
|
||||||
let swap_channel = MpscChannels::default();
|
let swap_channel = MpscChannels::default();
|
||||||
|
|
||||||
let event_loop = EventLoop {
|
let event_loop = EventLoop {
|
||||||
@ -86,30 +87,26 @@ where
|
|||||||
monero_wallet,
|
monero_wallet,
|
||||||
db,
|
db,
|
||||||
latest_rate,
|
latest_rate,
|
||||||
recv_encrypted_signature: recv_encrypted_signature.sender,
|
|
||||||
send_transfer_proof: send_transfer_proof.receiver,
|
|
||||||
send_transfer_proof_sender: send_transfer_proof.sender,
|
|
||||||
swap_sender: swap_channel.sender,
|
swap_sender: swap_channel.sender,
|
||||||
max_buy,
|
max_buy,
|
||||||
|
recv_encrypted_signature: Default::default(),
|
||||||
|
send_transfer_proof: Default::default(),
|
||||||
};
|
};
|
||||||
Ok((event_loop, swap_channel.receiver))
|
Ok((event_loop, swap_channel.receiver))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_handle(&self) -> EventLoopHandle {
|
|
||||||
EventLoopHandle {
|
|
||||||
recv_encrypted_signature: self.recv_encrypted_signature.subscribe(),
|
|
||||||
send_transfer_proof: self.send_transfer_proof_sender.clone(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn peer_id(&self) -> PeerId {
|
pub fn peer_id(&self) -> PeerId {
|
||||||
self.peer_id
|
self.peer_id
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run(mut self) {
|
pub async fn run(mut self) {
|
||||||
|
// ensure that the send_transfer_proof stream is NEVER empty, otherwise it will
|
||||||
|
// terminate forever.
|
||||||
|
self.send_transfer_proof.push(future::pending().boxed());
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
swarm_event = self.swarm.next().fuse() => {
|
swarm_event = self.swarm.next() => {
|
||||||
match swarm_event {
|
match swarm_event {
|
||||||
OutEvent::ConnectionEstablished(alice) => {
|
OutEvent::ConnectionEstablished(alice) => {
|
||||||
debug!("Connection Established with {}", alice);
|
debug!("Connection Established with {}", alice);
|
||||||
@ -164,9 +161,17 @@ where
|
|||||||
OutEvent::TransferProofAcknowledged => {
|
OutEvent::TransferProofAcknowledged => {
|
||||||
trace!("Bob acknowledged transfer proof");
|
trace!("Bob acknowledged transfer proof");
|
||||||
}
|
}
|
||||||
OutEvent::EncryptedSignature{ msg, channel } => {
|
OutEvent::EncryptedSignature{ msg, channel, peer } => {
|
||||||
let _ = self.recv_encrypted_signature.send(*msg);
|
match self.recv_encrypted_signature.remove(&peer) {
|
||||||
// Send back empty response so that the request/response protocol completes.
|
Some(sender) => {
|
||||||
|
// this failing just means the receiver is no longer interested ...
|
||||||
|
let _ = sender.send(*msg);
|
||||||
|
},
|
||||||
|
None => {
|
||||||
|
tracing::warn!(%peer, "No sender for encrypted signature, maybe already handled?")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if let Err(error) = self.swarm.send_encrypted_signature_ack(channel) {
|
if let Err(error) = self.swarm.send_encrypted_signature_ack(channel) {
|
||||||
error!("Failed to send Encrypted Signature ack: {:?}", error);
|
error!("Failed to send Encrypted Signature ack: {:?}", error);
|
||||||
}
|
}
|
||||||
@ -177,11 +182,19 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
transfer_proof = self.send_transfer_proof.recv().fuse() => {
|
next_transfer_proof = self.send_transfer_proof.next() => {
|
||||||
if let Some((bob_peer_id, msg)) = transfer_proof {
|
match next_transfer_proof {
|
||||||
self.swarm.send_transfer_proof(bob_peer_id, msg);
|
Some(Ok((peer, transfer_proof))) => {
|
||||||
|
self.swarm.send_transfer_proof(peer, transfer_proof);
|
||||||
|
},
|
||||||
|
Some(Err(_)) => {
|
||||||
|
tracing::debug!("A swap stopped without sending a transfer proof");
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
unreachable!("stream of transfer proof receivers must never terminate")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
},
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -230,11 +243,10 @@ where
|
|||||||
|
|
||||||
async fn handle_execution_setup_done(&mut self, bob_peer_id: PeerId, state3: State3) {
|
async fn handle_execution_setup_done(&mut self, bob_peer_id: PeerId, state3: State3) {
|
||||||
let swap_id = Uuid::new_v4();
|
let swap_id = Uuid::new_v4();
|
||||||
let handle = self.new_handle();
|
let handle = self.new_handle(bob_peer_id);
|
||||||
|
|
||||||
let initial_state = AliceState::Started {
|
let initial_state = AliceState::Started {
|
||||||
state3: Box::new(state3),
|
state3: Box::new(state3),
|
||||||
bob_peer_id,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let swap = Swap {
|
let swap = Swap {
|
||||||
@ -251,6 +263,29 @@ where
|
|||||||
tracing::warn!(%swap_id, "Swap cannot be spawned: {}", error);
|
tracing::warn!(%swap_id, "Swap cannot be spawned: {}", error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create a new [`EventLoopHandle`] that is scoped for communication with
|
||||||
|
/// the given peer.
|
||||||
|
fn new_handle(&mut self, peer: PeerId) -> EventLoopHandle {
|
||||||
|
let (send_transfer_proof_sender, send_transfer_proof_receiver) = oneshot::channel();
|
||||||
|
let (recv_enc_sig_sender, recv_enc_sig_receiver) = oneshot::channel();
|
||||||
|
|
||||||
|
self.recv_encrypted_signature
|
||||||
|
.insert(peer, recv_enc_sig_sender);
|
||||||
|
self.send_transfer_proof.push(
|
||||||
|
async move {
|
||||||
|
let transfer_proof = send_transfer_proof_receiver.await?;
|
||||||
|
|
||||||
|
Ok((peer, transfer_proof))
|
||||||
|
}
|
||||||
|
.boxed(),
|
||||||
|
);
|
||||||
|
|
||||||
|
EventLoopHandle {
|
||||||
|
recv_encrypted_signature: Some(recv_enc_sig_receiver),
|
||||||
|
send_transfer_proof: Some(send_transfer_proof_sender),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait LatestRate {
|
pub trait LatestRate {
|
||||||
@ -277,13 +312,24 @@ impl LatestRate for kraken::RateUpdateStream {
|
|||||||
|
|
||||||
impl EventLoopHandle {
|
impl EventLoopHandle {
|
||||||
pub async fn recv_encrypted_signature(&mut self) -> Result<EncryptedSignature> {
|
pub async fn recv_encrypted_signature(&mut self) -> Result<EncryptedSignature> {
|
||||||
self.recv_encrypted_signature
|
let signature = self
|
||||||
.recv()
|
.recv_encrypted_signature
|
||||||
.await
|
.take()
|
||||||
.context("Failed to receive Bitcoin encrypted signature from Bob")
|
.context("Encrypted signature was already received")?
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(signature)
|
||||||
}
|
}
|
||||||
pub async fn send_transfer_proof(&mut self, bob: PeerId, msg: TransferProof) -> Result<()> {
|
pub async fn send_transfer_proof(&mut self, msg: TransferProof) -> Result<()> {
|
||||||
let _ = self.send_transfer_proof.send((bob, msg)).await?;
|
if self
|
||||||
|
.send_transfer_proof
|
||||||
|
.take()
|
||||||
|
.context("Transfer proof was already sent")?
|
||||||
|
.send(msg)
|
||||||
|
.is_err()
|
||||||
|
{
|
||||||
|
bail!("Failed to send transfer proof, receiver no longer listening?")
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -308,21 +354,3 @@ impl<T> Default for MpscChannels<T> {
|
|||||||
MpscChannels { sender, receiver }
|
MpscChannels { sender, receiver }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(missing_debug_implementations)]
|
|
||||||
struct BroadcastChannels<T>
|
|
||||||
where
|
|
||||||
T: Clone,
|
|
||||||
{
|
|
||||||
sender: broadcast::Sender<T>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Default for BroadcastChannels<T>
|
|
||||||
where
|
|
||||||
T: Clone,
|
|
||||||
{
|
|
||||||
fn default() -> Self {
|
|
||||||
let (sender, _receiver) = broadcast::channel(100);
|
|
||||||
BroadcastChannels { sender }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -7,7 +7,6 @@ use crate::protocol::bob::{Message0, Message2, Message4};
|
|||||||
use crate::protocol::CROSS_CURVE_PROOF_SYSTEM;
|
use crate::protocol::CROSS_CURVE_PROOF_SYSTEM;
|
||||||
use crate::{bitcoin, monero};
|
use crate::{bitcoin, monero};
|
||||||
use anyhow::{anyhow, bail, Context, Result};
|
use anyhow::{anyhow, bail, Context, Result};
|
||||||
use libp2p::PeerId;
|
|
||||||
use monero_rpc::wallet::BlockHeight;
|
use monero_rpc::wallet::BlockHeight;
|
||||||
use rand::{CryptoRng, RngCore};
|
use rand::{CryptoRng, RngCore};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
@ -17,11 +16,9 @@ use std::fmt;
|
|||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum AliceState {
|
pub enum AliceState {
|
||||||
Started {
|
Started {
|
||||||
bob_peer_id: PeerId,
|
|
||||||
state3: Box<State3>,
|
state3: Box<State3>,
|
||||||
},
|
},
|
||||||
BtcLocked {
|
BtcLocked {
|
||||||
bob_peer_id: PeerId,
|
|
||||||
state3: Box<State3>,
|
state3: Box<State3>,
|
||||||
},
|
},
|
||||||
XmrLocked {
|
XmrLocked {
|
||||||
|
@ -7,10 +7,8 @@ use crate::protocol::alice::TransferProof;
|
|||||||
use crate::{bitcoin, monero};
|
use crate::{bitcoin, monero};
|
||||||
use anyhow::{bail, Context, Result};
|
use anyhow::{bail, Context, Result};
|
||||||
use futures::pin_mut;
|
use futures::pin_mut;
|
||||||
use libp2p::PeerId;
|
|
||||||
|
|
||||||
pub async fn lock_xmr(
|
pub async fn lock_xmr(
|
||||||
bob_peer_id: PeerId,
|
|
||||||
state3: alice::State3,
|
state3: alice::State3,
|
||||||
event_loop_handle: &mut EventLoopHandle,
|
event_loop_handle: &mut EventLoopHandle,
|
||||||
monero_wallet: &monero::Wallet,
|
monero_wallet: &monero::Wallet,
|
||||||
@ -30,7 +28,7 @@ pub async fn lock_xmr(
|
|||||||
// Otherwise Alice might publish the lock tx twice!
|
// Otherwise Alice might publish the lock tx twice!
|
||||||
|
|
||||||
event_loop_handle
|
event_loop_handle
|
||||||
.send_transfer_proof(bob_peer_id, TransferProof {
|
.send_transfer_proof(TransferProof {
|
||||||
tx_lock_proof: transfer_proof,
|
tx_lock_proof: transfer_proof,
|
||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
|
@ -76,10 +76,7 @@ async fn run_until_internal(
|
|||||||
Ok(state)
|
Ok(state)
|
||||||
} else {
|
} else {
|
||||||
match state {
|
match state {
|
||||||
AliceState::Started {
|
AliceState::Started { state3 } => {
|
||||||
state3,
|
|
||||||
bob_peer_id,
|
|
||||||
} => {
|
|
||||||
timeout(
|
timeout(
|
||||||
execution_params.bob_time_to_act,
|
execution_params.bob_time_to_act,
|
||||||
bitcoin_wallet
|
bitcoin_wallet
|
||||||
@ -94,10 +91,7 @@ async fn run_until_internal(
|
|||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let state = AliceState::BtcLocked {
|
let state = AliceState::BtcLocked { state3 };
|
||||||
bob_peer_id,
|
|
||||||
state3,
|
|
||||||
};
|
|
||||||
|
|
||||||
let db_state = (&state).into();
|
let db_state = (&state).into();
|
||||||
db.insert_latest_state(swap_id, database::Swap::Alice(db_state))
|
db.insert_latest_state(swap_id, database::Swap::Alice(db_state))
|
||||||
@ -114,21 +108,12 @@ async fn run_until_internal(
|
|||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
AliceState::BtcLocked {
|
AliceState::BtcLocked { state3 } => {
|
||||||
bob_peer_id,
|
|
||||||
state3,
|
|
||||||
} => {
|
|
||||||
// Record the current monero wallet block height so we don't have to scan from
|
// Record the current monero wallet block height so we don't have to scan from
|
||||||
// block 0 for scenarios where we create a refund wallet.
|
// block 0 for scenarios where we create a refund wallet.
|
||||||
let monero_wallet_restore_blockheight = monero_wallet.block_height().await?;
|
let monero_wallet_restore_blockheight = monero_wallet.block_height().await?;
|
||||||
|
|
||||||
lock_xmr(
|
lock_xmr(*state3.clone(), &mut event_loop_handle, &monero_wallet).await?;
|
||||||
bob_peer_id,
|
|
||||||
*state3.clone(),
|
|
||||||
&mut event_loop_handle,
|
|
||||||
&monero_wallet,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let state = AliceState::XmrLocked {
|
let state = AliceState::XmrLocked {
|
||||||
state3,
|
state3,
|
||||||
|
@ -1,49 +0,0 @@
|
|||||||
//! A serde module that defines how we want to serialize PeerIds on the
|
|
||||||
//! HTTP-API.
|
|
||||||
|
|
||||||
use libp2p::PeerId;
|
|
||||||
use serde::de::Error;
|
|
||||||
use serde::{Deserialize, Deserializer, Serializer};
|
|
||||||
|
|
||||||
pub fn serialize<S>(peer_id: &PeerId, serializer: S) -> Result<S::Ok, S::Error>
|
|
||||||
where
|
|
||||||
S: Serializer,
|
|
||||||
{
|
|
||||||
let string = peer_id.to_string();
|
|
||||||
serializer.serialize_str(&string)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[allow(dead_code)]
|
|
||||||
pub fn deserialize<'de, D>(deserializer: D) -> Result<PeerId, D::Error>
|
|
||||||
where
|
|
||||||
D: Deserializer<'de>,
|
|
||||||
{
|
|
||||||
let string = String::deserialize(deserializer)?;
|
|
||||||
let peer_id = string.parse().map_err(D::Error::custom)?;
|
|
||||||
|
|
||||||
Ok(peer_id)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
use serde::Serialize;
|
|
||||||
use spectral::prelude::*;
|
|
||||||
|
|
||||||
#[derive(Serialize)]
|
|
||||||
struct SerializablePeerId(#[serde(with = "super")] PeerId);
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn maker_id_serializes_as_expected() {
|
|
||||||
let peer_id = SerializablePeerId(
|
|
||||||
"QmfUfpC2frwFvcDzpspnfZitHt5wct6n4kpG5jzgRdsxkY"
|
|
||||||
.parse()
|
|
||||||
.unwrap(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let got = serde_json::to_string(&peer_id).expect("failed to serialize peer id");
|
|
||||||
|
|
||||||
assert_that(&got)
|
|
||||||
.is_equal_to(r#""QmfUfpC2frwFvcDzpspnfZitHt5wct6n4kpG5jzgRdsxkY""#.to_string());
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user