Merge #780
780: Sqlite database r=rishflab a=rishflab Closes #427, #762, #770 Co-authored-by: rishflab <rishflab@hotmail.com>pull/804/head^2
commit
9ea73a8e66
@ -0,0 +1,25 @@
|
||||
CREATE TABLE if NOT EXISTS swap_states
|
||||
(
|
||||
id INTEGER PRIMARY KEY autoincrement NOT NULL,
|
||||
swap_id TEXT NOT NULL,
|
||||
entered_at TEXT NOT NULL,
|
||||
state TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE if NOT EXISTS monero_addresses
|
||||
(
|
||||
swap_id TEXT PRIMARY KEY NOT NULL,
|
||||
address TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE if NOT EXISTS peers
|
||||
(
|
||||
swap_id TEXT PRIMARY KEY NOT NULL,
|
||||
peer_id TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE if NOT EXISTS peer_addresses
|
||||
(
|
||||
peer_id TEXT NOT NULL,
|
||||
address TEXT NOT NULL
|
||||
);
|
@ -0,0 +1,6 @@
|
||||
# crated temporary DB
|
||||
# run the migration scripts to create the tables
|
||||
# prepare the sqlx-data.json rust mappings
|
||||
DATABASE_URL=sqlite:tempdb cargo sqlx database create
|
||||
DATABASE_URL=sqlite:tempdb cargo sqlx migrate run
|
||||
DATABASE_URL=sqlite:./swap/tempdb cargo sqlx prepare -- --bin swap
|
@ -0,0 +1,139 @@
|
||||
{
|
||||
"db": "SQLite",
|
||||
"081c729a0f1ad6e4ff3e13d6702c946bc4d37d50f40670b4f51d2efcce595aa6": {
|
||||
"query": "\n SELECT peer_id\n FROM peers\n WHERE swap_id = ?\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"name": "peer_id",
|
||||
"ordinal": 0,
|
||||
"type_info": "Text"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Right": 1
|
||||
},
|
||||
"nullable": [
|
||||
false
|
||||
]
|
||||
}
|
||||
},
|
||||
"0ab84c094964968e96a3f2bf590d9ae92227d057386921e0e57165b887de3c75": {
|
||||
"query": "\n insert into peer_addresses (\n peer_id,\n address\n ) values (?, ?);\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Right": 2
|
||||
},
|
||||
"nullable": []
|
||||
}
|
||||
},
|
||||
"1ec38c85e7679b2eb42b3df75d9098772ce44fdb8db3012d3c2410d828b74157": {
|
||||
"query": "\n SELECT swap_id, state\n FROM (\n SELECT max(id), swap_id, state\n FROM swap_states\n GROUP BY swap_id\n )\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"name": "swap_id",
|
||||
"ordinal": 0,
|
||||
"type_info": "Text"
|
||||
},
|
||||
{
|
||||
"name": "state",
|
||||
"ordinal": 1,
|
||||
"type_info": "Text"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Right": 0
|
||||
},
|
||||
"nullable": [
|
||||
false,
|
||||
false
|
||||
]
|
||||
}
|
||||
},
|
||||
"2a356078a41b321234adf2aa385b501749f907f7c422945a8bdda2b6274f5225": {
|
||||
"query": "\n insert into peers (\n swap_id,\n peer_id\n ) values (?, ?);\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Right": 2
|
||||
},
|
||||
"nullable": []
|
||||
}
|
||||
},
|
||||
"50a5764546f69c118fa0b64120da50f51073d36257d49768de99ff863e3511e0": {
|
||||
"query": "\n insert into monero_addresses (\n swap_id,\n address\n ) values (?, ?);\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Right": 2
|
||||
},
|
||||
"nullable": []
|
||||
}
|
||||
},
|
||||
"88f761a4f7a0429cad1df0b1bebb1c0a27b2a45656549b23076d7542cfa21ecf": {
|
||||
"query": "\n SELECT state\n FROM swap_states\n WHERE swap_id = ?\n ORDER BY id desc\n LIMIT 1;\n\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"name": "state",
|
||||
"ordinal": 0,
|
||||
"type_info": "Text"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Right": 1
|
||||
},
|
||||
"nullable": [
|
||||
false
|
||||
]
|
||||
}
|
||||
},
|
||||
"a0eb85d04ee3842c52291dad4d225941d1141af735922fcbc665868997fce304": {
|
||||
"query": "\n SELECT address\n FROM peer_addresses\n WHERE peer_id = ?\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"name": "address",
|
||||
"ordinal": 0,
|
||||
"type_info": "Text"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Right": 1
|
||||
},
|
||||
"nullable": [
|
||||
false
|
||||
]
|
||||
}
|
||||
},
|
||||
"b703032b4ddc627a1124817477e7a8e5014bdc694c36a14053ef3bb2fc0c69b0": {
|
||||
"query": "\n insert into swap_states (\n swap_id,\n entered_at,\n state\n ) values (?, ?, ?);\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Right": 3
|
||||
},
|
||||
"nullable": []
|
||||
}
|
||||
},
|
||||
"ce270dd4a4b9615695a79864240c5401e2122077365e5e5a19408c068c7f9454": {
|
||||
"query": "\n SELECT address\n FROM monero_addresses\n WHERE swap_id = ?\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"name": "address",
|
||||
"ordinal": 0,
|
||||
"type_info": "Text"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Right": 1
|
||||
},
|
||||
"nullable": [
|
||||
false
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,400 @@
|
||||
use crate::database::Swap;
|
||||
use crate::protocol::{Database, State};
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use async_trait::async_trait;
|
||||
use libp2p::{Multiaddr, PeerId};
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::Serialize;
|
||||
use std::path::Path;
|
||||
use std::str::FromStr;
|
||||
use uuid::Uuid;
|
||||
|
||||
pub use crate::database::alice::Alice;
|
||||
pub use crate::database::bob::Bob;
|
||||
|
||||
pub struct SledDatabase {
|
||||
swaps: sled::Tree,
|
||||
peers: sled::Tree,
|
||||
addresses: sled::Tree,
|
||||
monero_addresses: sled::Tree,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Database for SledDatabase {
|
||||
async fn insert_peer_id(&self, swap_id: Uuid, peer_id: PeerId) -> Result<()> {
|
||||
let peer_id_str = peer_id.to_string();
|
||||
|
||||
let key = serialize(&swap_id)?;
|
||||
let value = serialize(&peer_id_str).context("Could not serialize peer-id")?;
|
||||
|
||||
self.peers.insert(key, value)?;
|
||||
|
||||
self.peers
|
||||
.flush_async()
|
||||
.await
|
||||
.map(|_| ())
|
||||
.context("Could not flush db")
|
||||
}
|
||||
|
||||
async fn get_peer_id(&self, swap_id: Uuid) -> Result<PeerId> {
|
||||
let key = serialize(&swap_id)?;
|
||||
|
||||
let encoded = self
|
||||
.peers
|
||||
.get(&key)?
|
||||
.ok_or_else(|| anyhow!("No peer-id found for swap id {} in database", swap_id))?;
|
||||
|
||||
let peer_id: String = deserialize(&encoded).context("Could not deserialize peer-id")?;
|
||||
Ok(PeerId::from_str(peer_id.as_str())?)
|
||||
}
|
||||
|
||||
async fn insert_monero_address(&self, swap_id: Uuid, address: monero::Address) -> Result<()> {
|
||||
let key = swap_id.as_bytes();
|
||||
let value = serialize(&address)?;
|
||||
|
||||
self.monero_addresses.insert(key, value)?;
|
||||
|
||||
self.monero_addresses
|
||||
.flush_async()
|
||||
.await
|
||||
.map(|_| ())
|
||||
.context("Could not flush db")
|
||||
}
|
||||
|
||||
async fn get_monero_address(&self, swap_id: Uuid) -> Result<monero::Address> {
|
||||
let encoded = self
|
||||
.monero_addresses
|
||||
.get(swap_id.as_bytes())?
|
||||
.ok_or_else(|| {
|
||||
anyhow!(
|
||||
"No Monero address found for swap id {} in database",
|
||||
swap_id
|
||||
)
|
||||
})?;
|
||||
|
||||
let monero_address = deserialize(&encoded)?;
|
||||
|
||||
Ok(monero_address)
|
||||
}
|
||||
|
||||
async fn insert_address(&self, peer_id: PeerId, address: Multiaddr) -> Result<()> {
|
||||
let key = peer_id.to_bytes();
|
||||
|
||||
let existing_addresses = self.addresses.get(&key)?;
|
||||
|
||||
let new_addresses = {
|
||||
let existing_addresses = existing_addresses.clone();
|
||||
|
||||
Some(match existing_addresses {
|
||||
Some(encoded) => {
|
||||
let mut addresses = deserialize::<Vec<Multiaddr>>(&encoded)?;
|
||||
addresses.push(address);
|
||||
|
||||
serialize(&addresses)?
|
||||
}
|
||||
None => serialize(&[address])?,
|
||||
})
|
||||
};
|
||||
|
||||
self.addresses
|
||||
.compare_and_swap(key, existing_addresses, new_addresses)??;
|
||||
|
||||
self.addresses
|
||||
.flush_async()
|
||||
.await
|
||||
.map(|_| ())
|
||||
.context("Could not flush db")
|
||||
}
|
||||
|
||||
async fn get_addresses(&self, peer_id: PeerId) -> Result<Vec<Multiaddr>> {
|
||||
let key = peer_id.to_bytes();
|
||||
|
||||
let addresses = match self.addresses.get(&key)? {
|
||||
Some(encoded) => deserialize(&encoded).context("Failed to deserialize addresses")?,
|
||||
None => vec![],
|
||||
};
|
||||
|
||||
Ok(addresses)
|
||||
}
|
||||
|
||||
async fn insert_latest_state(&self, swap_id: Uuid, state: State) -> Result<()> {
|
||||
let key = serialize(&swap_id)?;
|
||||
let swap = Swap::from(state);
|
||||
let new_value = serialize(&swap).context("Could not serialize new state value")?;
|
||||
|
||||
let old_value = self.swaps.get(&key)?;
|
||||
|
||||
self.swaps
|
||||
.compare_and_swap(key, old_value, Some(new_value))
|
||||
.context("Could not write in the DB")?
|
||||
.context("Stored swap somehow changed, aborting saving")?;
|
||||
|
||||
self.swaps
|
||||
.flush_async()
|
||||
.await
|
||||
.map(|_| ())
|
||||
.context("Could not flush db")
|
||||
}
|
||||
|
||||
async fn get_state(&self, swap_id: Uuid) -> Result<State> {
|
||||
let key = serialize(&swap_id)?;
|
||||
|
||||
let encoded = self
|
||||
.swaps
|
||||
.get(&key)?
|
||||
.ok_or_else(|| anyhow!("Swap with id {} not found in database", swap_id))?;
|
||||
|
||||
let swap = deserialize::<Swap>(&encoded).context("Could not deserialize state")?;
|
||||
|
||||
let state = State::from(swap);
|
||||
|
||||
Ok(state)
|
||||
}
|
||||
|
||||
async fn all(&self) -> Result<Vec<(Uuid, State)>> {
|
||||
self.all_iter().collect()
|
||||
}
|
||||
}
|
||||
|
||||
impl SledDatabase {
|
||||
pub async fn open(path: &Path) -> Result<Self> {
|
||||
tracing::debug!("Opening database at {}", path.display());
|
||||
|
||||
let db =
|
||||
sled::open(path).with_context(|| format!("Could not open the DB at {:?}", path))?;
|
||||
|
||||
let swaps = db.open_tree("swaps")?;
|
||||
let peers = db.open_tree("peers")?;
|
||||
let addresses = db.open_tree("addresses")?;
|
||||
let monero_addresses = db.open_tree("monero_addresses")?;
|
||||
|
||||
Ok(SledDatabase {
|
||||
swaps,
|
||||
peers,
|
||||
addresses,
|
||||
monero_addresses,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_all_peers(&self) -> impl Iterator<Item = Result<(Uuid, PeerId)>> {
|
||||
self.peers.iter().map(|item| {
|
||||
let (key, value) = item.context("Failed to retrieve peer id from DB")?;
|
||||
|
||||
let swap_id = deserialize::<Uuid>(&key)?;
|
||||
let peer_id_bytes =
|
||||
deserialize::<Vec<u8>>(&value).context("Failed to deserialize swap")?;
|
||||
|
||||
let peer_id = PeerId::from_bytes(&peer_id_bytes)?;
|
||||
|
||||
Ok((swap_id, peer_id))
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_all_addresses(&self) -> impl Iterator<Item = Result<(PeerId, Vec<Multiaddr>)>> {
|
||||
self.addresses.iter().map(|item| {
|
||||
let (key, value) = item.context("Failed to retrieve peer address from DB")?;
|
||||
|
||||
let peer_id_bytes = deserialize::<Vec<u8>>(&key)?;
|
||||
let addr =
|
||||
deserialize::<Vec<Multiaddr>>(&value).context("Failed to deserialize swap")?;
|
||||
|
||||
let peer_id = PeerId::from_bytes(&peer_id_bytes)?;
|
||||
|
||||
Ok((peer_id, addr))
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_all_monero_addresses(
|
||||
&self,
|
||||
) -> impl Iterator<Item = Result<(Uuid, monero::Address)>> {
|
||||
self.monero_addresses.iter().map(|item| {
|
||||
let (key, value) = item.context("Failed to retrieve monero address from DB")?;
|
||||
|
||||
let swap_id = deserialize::<Uuid>(&key)?;
|
||||
let addr =
|
||||
deserialize::<monero::Address>(&value).context("Failed to deserialize swap")?;
|
||||
|
||||
Ok((swap_id, addr))
|
||||
})
|
||||
}
|
||||
|
||||
fn all_iter(&self) -> impl Iterator<Item = Result<(Uuid, State)>> {
|
||||
self.swaps.iter().map(|item| {
|
||||
let (key, value) = item.context("Failed to retrieve swap from DB")?;
|
||||
|
||||
let swap_id = deserialize::<Uuid>(&key)?;
|
||||
let swap = deserialize::<Swap>(&value).context("Failed to deserialize swap")?;
|
||||
|
||||
let state = State::from(swap);
|
||||
|
||||
Ok((swap_id, state))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub fn serialize<T>(t: &T) -> Result<Vec<u8>>
|
||||
where
|
||||
T: Serialize,
|
||||
{
|
||||
Ok(serde_cbor::to_vec(t)?)
|
||||
}
|
||||
|
||||
pub fn deserialize<T>(v: &[u8]) -> Result<T>
|
||||
where
|
||||
T: DeserializeOwned,
|
||||
{
|
||||
Ok(serde_cbor::from_slice(&v)?)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::protocol::alice::AliceState;
|
||||
|
||||
#[tokio::test]
|
||||
async fn can_write_and_read_to_multiple_keys() {
|
||||
let db_dir = tempfile::tempdir().unwrap();
|
||||
let db = SledDatabase::open(db_dir.path()).await.unwrap();
|
||||
|
||||
let state_1 = State::from(AliceState::BtcRedeemed);
|
||||
let swap_id_1 = Uuid::new_v4();
|
||||
db.insert_latest_state(swap_id_1, state_1.clone())
|
||||
.await
|
||||
.expect("Failed to save second state");
|
||||
|
||||
let state_2 = State::from(AliceState::BtcPunished);
|
||||
let swap_id_2 = Uuid::new_v4();
|
||||
db.insert_latest_state(swap_id_2, state_2.clone())
|
||||
.await
|
||||
.expect("Failed to save first state");
|
||||
|
||||
let recovered_1 = db
|
||||
.get_state(swap_id_1)
|
||||
.await
|
||||
.expect("Failed to recover first state");
|
||||
|
||||
let recovered_2 = db
|
||||
.get_state(swap_id_2)
|
||||
.await
|
||||
.expect("Failed to recover second state");
|
||||
|
||||
assert_eq!(recovered_1, state_1);
|
||||
assert_eq!(recovered_2, state_2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn can_write_twice_to_one_key() {
|
||||
let db_dir = tempfile::tempdir().unwrap();
|
||||
let db = SledDatabase::open(db_dir.path()).await.unwrap();
|
||||
|
||||
let state = State::from(AliceState::SafelyAborted);
|
||||
|
||||
let swap_id = Uuid::new_v4();
|
||||
db.insert_latest_state(swap_id, state.clone())
|
||||
.await
|
||||
.expect("Failed to save state the first time");
|
||||
let recovered = db
|
||||
.get_state(swap_id)
|
||||
.await
|
||||
.expect("Failed to recover state the first time");
|
||||
|
||||
// We insert and recover twice to ensure database implementation allows the
|
||||
// caller to write to an existing key
|
||||
db.insert_latest_state(swap_id, recovered)
|
||||
.await
|
||||
.expect("Failed to save state the second time");
|
||||
let recovered = db
|
||||
.get_state(swap_id)
|
||||
.await
|
||||
.expect("Failed to recover state the second time");
|
||||
|
||||
assert_eq!(recovered, state);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn can_save_swap_state_and_peer_id_with_same_swap_id() -> Result<()> {
|
||||
let db_dir = tempfile::tempdir().unwrap();
|
||||
let db = SledDatabase::open(db_dir.path()).await.unwrap();
|
||||
|
||||
let alice_id = Uuid::new_v4();
|
||||
let alice_state = State::from(AliceState::BtcPunished);
|
||||
let peer_id = PeerId::random();
|
||||
|
||||
db.insert_latest_state(alice_id, alice_state.clone())
|
||||
.await?;
|
||||
db.insert_peer_id(alice_id, peer_id).await?;
|
||||
|
||||
let loaded_swap = db.get_state(alice_id).await?;
|
||||
let loaded_peer_id = db.get_peer_id(alice_id).await?;
|
||||
|
||||
assert_eq!(alice_state, loaded_swap);
|
||||
assert_eq!(peer_id, loaded_peer_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_reopen_db() -> Result<()> {
|
||||
let db_dir = tempfile::tempdir().unwrap();
|
||||
let alice_id = Uuid::new_v4();
|
||||
let alice_state = State::from(AliceState::BtcPunished);
|
||||
|
||||
let peer_id = PeerId::random();
|
||||
|
||||
{
|
||||
let db = SledDatabase::open(db_dir.path()).await.unwrap();
|
||||
db.insert_latest_state(alice_id, alice_state.clone())
|
||||
.await?;
|
||||
db.insert_peer_id(alice_id, peer_id).await?;
|
||||
}
|
||||
|
||||
let db = SledDatabase::open(db_dir.path()).await.unwrap();
|
||||
|
||||
let loaded_swap = db.get_state(alice_id).await?;
|
||||
let loaded_peer_id = db.get_peer_id(alice_id).await?;
|
||||
|
||||
assert_eq!(alice_state, loaded_swap);
|
||||
assert_eq!(peer_id, loaded_peer_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn save_and_load_addresses() -> Result<()> {
|
||||
let db_dir = tempfile::tempdir()?;
|
||||
let peer_id = PeerId::random();
|
||||
let home1 = "/ip4/127.0.0.1/tcp/1".parse::<Multiaddr>()?;
|
||||
let home2 = "/ip4/127.0.0.1/tcp/2".parse::<Multiaddr>()?;
|
||||
|
||||
{
|
||||
let db = SledDatabase::open(db_dir.path()).await?;
|
||||
db.insert_address(peer_id, home1.clone()).await?;
|
||||
db.insert_address(peer_id, home2.clone()).await?;
|
||||
}
|
||||
|
||||
let addresses = SledDatabase::open(db_dir.path())
|
||||
.await?
|
||||
.get_addresses(peer_id)
|
||||
.await?;
|
||||
|
||||
assert_eq!(addresses, vec![home1, home2]);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn save_and_load_monero_address() -> Result<()> {
|
||||
let db_dir = tempfile::tempdir()?;
|
||||
let swap_id = Uuid::new_v4();
|
||||
|
||||
SledDatabase::open(db_dir.path()).await?.insert_monero_address(swap_id, "53gEuGZUhP9JMEBZoGaFNzhwEgiG7hwQdMCqFxiyiTeFPmkbt1mAoNybEUvYBKHcnrSgxnVWgZsTvRBaHBNXPa8tHiCU51a".parse()?).await?;
|
||||
let loaded_monero_address = SledDatabase::open(db_dir.path())
|
||||
.await?
|
||||
.get_monero_address(swap_id)
|
||||
.await?;
|
||||
|
||||
assert_eq!(loaded_monero_address.to_string(), "53gEuGZUhP9JMEBZoGaFNzhwEgiG7hwQdMCqFxiyiTeFPmkbt1mAoNybEUvYBKHcnrSgxnVWgZsTvRBaHBNXPa8tHiCU51a");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -0,0 +1,383 @@
|
||||
use crate::database::Swap;
|
||||
use crate::monero::Address;
|
||||
use crate::protocol::{Database, State};
|
||||
use anyhow::{Context, Result};
|
||||
use async_trait::async_trait;
|
||||
use libp2p::{Multiaddr, PeerId};
|
||||
use sqlx::sqlite::Sqlite;
|
||||
use sqlx::{Pool, SqlitePool};
|
||||
use std::path::Path;
|
||||
use std::str::FromStr;
|
||||
use time::OffsetDateTime;
|
||||
use uuid::Uuid;
|
||||
|
||||
pub struct SqliteDatabase {
|
||||
pool: Pool<Sqlite>,
|
||||
}
|
||||
|
||||
impl SqliteDatabase {
|
||||
pub async fn open(path: impl AsRef<Path>) -> Result<Self>
|
||||
where
|
||||
Self: std::marker::Sized,
|
||||
{
|
||||
let path_str = format!("sqlite:{}", path.as_ref().display());
|
||||
let pool = SqlitePool::connect(&path_str).await?;
|
||||
let mut sqlite = Self { pool };
|
||||
sqlite.run_migrations().await?;
|
||||
Ok(sqlite)
|
||||
}
|
||||
|
||||
async fn run_migrations(&mut self) -> anyhow::Result<()> {
|
||||
sqlx::migrate!("./migrations").run(&self.pool).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Database for SqliteDatabase {
|
||||
async fn insert_peer_id(&self, swap_id: Uuid, peer_id: PeerId) -> Result<()> {
|
||||
let mut conn = self.pool.acquire().await?;
|
||||
|
||||
let swap_id = swap_id.to_string();
|
||||
let peer_id = peer_id.to_string();
|
||||
|
||||
sqlx::query!(
|
||||
r#"
|
||||
insert into peers (
|
||||
swap_id,
|
||||
peer_id
|
||||
) values (?, ?);
|
||||
"#,
|
||||
swap_id,
|
||||
peer_id
|
||||
)
|
||||
.execute(&mut conn)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_peer_id(&self, swap_id: Uuid) -> Result<PeerId> {
|
||||
let mut conn = self.pool.acquire().await?;
|
||||
|
||||
let swap_id = swap_id.to_string();
|
||||
|
||||
let row = sqlx::query!(
|
||||
r#"
|
||||
SELECT peer_id
|
||||
FROM peers
|
||||
WHERE swap_id = ?
|
||||
"#,
|
||||
swap_id
|
||||
)
|
||||
.fetch_one(&mut conn)
|
||||
.await?;
|
||||
|
||||
let peer_id = PeerId::from_str(&row.peer_id)?;
|
||||
Ok(peer_id)
|
||||
}
|
||||
|
||||
async fn insert_monero_address(&self, swap_id: Uuid, address: Address) -> Result<()> {
|
||||
let mut conn = self.pool.acquire().await?;
|
||||
|
||||
let swap_id = swap_id.to_string();
|
||||
let address = address.to_string();
|
||||
|
||||
sqlx::query!(
|
||||
r#"
|
||||
insert into monero_addresses (
|
||||
swap_id,
|
||||
address
|
||||
) values (?, ?);
|
||||
"#,
|
||||
swap_id,
|
||||
address
|
||||
)
|
||||
.execute(&mut conn)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_monero_address(&self, swap_id: Uuid) -> Result<Address> {
|
||||
let mut conn = self.pool.acquire().await?;
|
||||
|
||||
let swap_id = swap_id.to_string();
|
||||
|
||||
let row = sqlx::query!(
|
||||
r#"
|
||||
SELECT address
|
||||
FROM monero_addresses
|
||||
WHERE swap_id = ?
|
||||
"#,
|
||||
swap_id
|
||||
)
|
||||
.fetch_one(&mut conn)
|
||||
.await?;
|
||||
|
||||
let address = row.address.parse()?;
|
||||
|
||||
Ok(address)
|
||||
}
|
||||
|
||||
async fn insert_address(&self, peer_id: PeerId, address: Multiaddr) -> Result<()> {
|
||||
let mut conn = self.pool.acquire().await?;
|
||||
|
||||
let peer_id = peer_id.to_string();
|
||||
let address = address.to_string();
|
||||
|
||||
sqlx::query!(
|
||||
r#"
|
||||
insert into peer_addresses (
|
||||
peer_id,
|
||||
address
|
||||
) values (?, ?);
|
||||
"#,
|
||||
peer_id,
|
||||
address
|
||||
)
|
||||
.execute(&mut conn)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_addresses(&self, peer_id: PeerId) -> Result<Vec<Multiaddr>> {
|
||||
let mut conn = self.pool.acquire().await?;
|
||||
|
||||
let peer_id = peer_id.to_string();
|
||||
|
||||
let rows = sqlx::query!(
|
||||
r#"
|
||||
SELECT address
|
||||
FROM peer_addresses
|
||||
WHERE peer_id = ?
|
||||
"#,
|
||||
peer_id,
|
||||
)
|
||||
.fetch_all(&mut conn)
|
||||
.await?;
|
||||
|
||||
let addresses = rows
|
||||
.iter()
|
||||
.map(|row| {
|
||||
let multiaddr = Multiaddr::from_str(&row.address)?;
|
||||
Ok(multiaddr)
|
||||
})
|
||||
.collect::<Result<Vec<Multiaddr>>>();
|
||||
|
||||
addresses
|
||||
}
|
||||
|
||||
async fn insert_latest_state(&self, swap_id: Uuid, state: State) -> Result<()> {
|
||||
let mut conn = self.pool.acquire().await?;
|
||||
let entered_at = OffsetDateTime::now_utc();
|
||||
|
||||
let swap_id = swap_id.to_string();
|
||||
let swap = serde_json::to_string(&Swap::from(state))?;
|
||||
let entered_at = entered_at.to_string();
|
||||
|
||||
sqlx::query!(
|
||||
r#"
|
||||
insert into swap_states (
|
||||
swap_id,
|
||||
entered_at,
|
||||
state
|
||||
) values (?, ?, ?);
|
||||
"#,
|
||||
swap_id,
|
||||
entered_at,
|
||||
swap
|
||||
)
|
||||
.execute(&mut conn)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_state(&self, swap_id: Uuid) -> Result<State> {
|
||||
let mut conn = self.pool.acquire().await?;
|
||||
let swap_id = swap_id.to_string();
|
||||
let row = sqlx::query!(
|
||||
r#"
|
||||
SELECT state
|
||||
FROM swap_states
|
||||
WHERE swap_id = ?
|
||||
ORDER BY id desc
|
||||
LIMIT 1;
|
||||
|
||||
"#,
|
||||
swap_id
|
||||
)
|
||||
.fetch_all(&mut conn)
|
||||
.await?;
|
||||
|
||||
let row = row
|
||||
.first()
|
||||
.context(format!("No state in database for swap: {}", swap_id))?;
|
||||
let swap: Swap = serde_json::from_str(&row.state)?;
|
||||
|
||||
Ok(swap.into())
|
||||
}
|
||||
|
||||
async fn all(&self) -> Result<Vec<(Uuid, State)>> {
|
||||
let mut conn = self.pool.acquire().await?;
|
||||
let rows = sqlx::query!(
|
||||
r#"
|
||||
SELECT swap_id, state
|
||||
FROM (
|
||||
SELECT max(id), swap_id, state
|
||||
FROM swap_states
|
||||
GROUP BY swap_id
|
||||
)
|
||||
"#
|
||||
)
|
||||
.fetch_all(&mut conn)
|
||||
.await?;
|
||||
|
||||
let result = rows
|
||||
.iter()
|
||||
.map(|row| {
|
||||
let swap_id = Uuid::from_str(&row.swap_id)?;
|
||||
let state = match serde_json::from_str::<Swap>(&row.state) {
|
||||
Ok(a) => Ok(State::from(a)),
|
||||
Err(e) => Err(e),
|
||||
}?;
|
||||
Ok((swap_id, state))
|
||||
})
|
||||
.collect::<Result<Vec<(Uuid, State)>>>();
|
||||
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::protocol::alice::AliceState;
|
||||
use crate::protocol::bob::BobState;
|
||||
use std::fs::File;
|
||||
use tempfile::tempdir;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_insert_and_load_state() {
|
||||
let db = setup_test_db().await.unwrap();
|
||||
|
||||
let state_1 = State::Alice(AliceState::BtcRedeemed);
|
||||
let swap_id_1 = Uuid::new_v4();
|
||||
|
||||
db.insert_latest_state(swap_id_1, state_1).await.unwrap();
|
||||
|
||||
let state_1 = State::Alice(AliceState::BtcRedeemed);
|
||||
|
||||
db.insert_latest_state(swap_id_1, state_1.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let state_1_loaded = db.get_state(swap_id_1).await.unwrap();
|
||||
|
||||
assert_eq!(state_1, state_1_loaded);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_retrieve_all_latest_states() {
|
||||
let db = setup_test_db().await.unwrap();
|
||||
|
||||
let state_1 = State::Alice(AliceState::BtcRedeemed);
|
||||
let state_2 = State::Alice(AliceState::BtcPunished);
|
||||
let state_3 = State::Alice(AliceState::SafelyAborted);
|
||||
let state_4 = State::Bob(BobState::SafelyAborted);
|
||||
let swap_id_1 = Uuid::new_v4();
|
||||
let swap_id_2 = Uuid::new_v4();
|
||||
|
||||
db.insert_latest_state(swap_id_1, state_1.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
db.insert_latest_state(swap_id_1, state_2.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
db.insert_latest_state(swap_id_1, state_3.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
db.insert_latest_state(swap_id_2, state_4.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let latest_loaded = db.all().await.unwrap();
|
||||
|
||||
assert_eq!(latest_loaded.len(), 2);
|
||||
|
||||
assert!(latest_loaded.contains(&(swap_id_1, state_3)));
|
||||
assert!(latest_loaded.contains(&(swap_id_2, state_4)));
|
||||
|
||||
assert!(!latest_loaded.contains(&(swap_id_1, state_1)));
|
||||
assert!(!latest_loaded.contains(&(swap_id_1, state_2)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_insert_load_monero_address() -> Result<()> {
|
||||
let db = setup_test_db().await?;
|
||||
|
||||
let swap_id = Uuid::new_v4();
|
||||
let monero_address = "53gEuGZUhP9JMEBZoGaFNzhwEgiG7hwQdMCqFxiyiTeFPmkbt1mAoNybEUvYBKHcnrSgxnVWgZsTvRBaHBNXPa8tHiCU51a".parse()?;
|
||||
|
||||
db.insert_monero_address(swap_id, monero_address).await?;
|
||||
|
||||
let loaded_monero_address = db.get_monero_address(swap_id).await?;
|
||||
|
||||
assert_eq!(monero_address, loaded_monero_address);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_insert_and_load_multiaddr() -> Result<()> {
|
||||
let db = setup_test_db().await?;
|
||||
|
||||
let peer_id = PeerId::random();
|
||||
let multiaddr1 = "/ip4/127.0.0.1".parse::<Multiaddr>()?;
|
||||
let multiaddr2 = "/ip4/127.0.0.2".parse::<Multiaddr>()?;
|
||||
|
||||
db.insert_address(peer_id, multiaddr1.clone()).await?;
|
||||
db.insert_address(peer_id, multiaddr2.clone()).await?;
|
||||
|
||||
let loaded_multiaddr = db.get_addresses(peer_id).await?;
|
||||
|
||||
assert!(loaded_multiaddr.contains(&multiaddr1));
|
||||
assert!(loaded_multiaddr.contains(&multiaddr2));
|
||||
assert_eq!(loaded_multiaddr.len(), 2);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_insert_and_load_peer_id() -> Result<()> {
|
||||
let db = setup_test_db().await?;
|
||||
|
||||
let peer_id = PeerId::random();
|
||||
let multiaddr1 = "/ip4/127.0.0.1".parse::<Multiaddr>()?;
|
||||
let multiaddr2 = "/ip4/127.0.0.2".parse::<Multiaddr>()?;
|
||||
|
||||
db.insert_address(peer_id, multiaddr1.clone()).await?;
|
||||
db.insert_address(peer_id, multiaddr2.clone()).await?;
|
||||
|
||||
let loaded_multiaddr = db.get_addresses(peer_id).await?;
|
||||
|
||||
assert!(loaded_multiaddr.contains(&multiaddr1));
|
||||
assert!(loaded_multiaddr.contains(&multiaddr2));
|
||||
assert_eq!(loaded_multiaddr.len(), 2);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn setup_test_db() -> Result<SqliteDatabase> {
|
||||
let temp_db = tempdir().unwrap().into_path().join("tempdb");
|
||||
|
||||
// file has to exist in order to connect with sqlite
|
||||
File::create(temp_db.clone()).unwrap();
|
||||
|
||||
let db = SqliteDatabase::open(temp_db).await?;
|
||||
|
||||
Ok(db)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue