Add auto-migration from sled to sqlite on startup

This commit is contained in:
rishflab 2021-09-23 11:18:39 +10:00
parent a738c9df8a
commit 0f7876c107
4 changed files with 93 additions and 24 deletions

View File

@ -29,12 +29,11 @@ use swap::asb::config::{
initial_setup, query_user_for_initial_config, read_config, Config, ConfigNotInitialized, initial_setup, query_user_for_initial_config, read_config, Config, ConfigNotInitialized,
}; };
use swap::asb::{cancel, punish, redeem, refund, safely_abort, EventLoop, Finality, KrakenRate}; use swap::asb::{cancel, punish, redeem, refund, safely_abort, EventLoop, Finality, KrakenRate};
use swap::database::SledDatabase; use swap::database::open_db;
use swap::monero::Amount; use swap::monero::Amount;
use swap::network::rendezvous::XmrBtcNamespace; use swap::network::rendezvous::XmrBtcNamespace;
use swap::network::swarm; use swap::network::swarm;
use swap::protocol::alice::{run, AliceState}; use swap::protocol::alice::{run, AliceState};
use swap::protocol::Database;
use swap::seed::Seed; use swap::seed::Seed;
use swap::tor::AuthenticatedClient; use swap::tor::AuthenticatedClient;
use swap::{asb, bitcoin, kraken, monero, tor}; use swap::{asb, bitcoin, kraken, monero, tor};
@ -93,10 +92,9 @@ async fn main() -> Result<()> {
} }
let db_path = config.data.dir.join("database"); let db_path = config.data.dir.join("database");
let sled_path = config.data.dir.join(db_path);
let db = SledDatabase::open(config.data.dir.join(db_path).as_path()) let db = open_db(sled_path, config.data.dir.join("sqlite"), true).await?;
.await
.context("Could not open database")?;
let seed = let seed =
Seed::from_file_or_generate(&config.data.dir).expect("Could not retrieve/initialize seed"); Seed::from_file_or_generate(&config.data.dir).expect("Could not retrieve/initialize seed");
@ -180,7 +178,7 @@ async fn main() -> Result<()> {
env_config, env_config,
Arc::new(bitcoin_wallet), Arc::new(bitcoin_wallet),
Arc::new(monero_wallet), Arc::new(monero_wallet),
Arc::new(db), db,
kraken_rate.clone(), kraken_rate.clone(),
config.maker.min_buy_btc, config.maker.min_buy_btc,
config.maker.max_buy_btc, config.maker.max_buy_btc,
@ -252,7 +250,7 @@ async fn main() -> Result<()> {
Command::Cancel { swap_id } => { Command::Cancel { swap_id } => {
let bitcoin_wallet = init_bitcoin_wallet(&config, &seed, env_config).await?; let bitcoin_wallet = init_bitcoin_wallet(&config, &seed, env_config).await?;
let (txid, _) = cancel(swap_id, Arc::new(bitcoin_wallet), Arc::new(db)).await?; let (txid, _) = cancel(swap_id, Arc::new(bitcoin_wallet), db).await?;
tracing::info!("Cancel transaction successfully published with id {}", txid); tracing::info!("Cancel transaction successfully published with id {}", txid);
} }
@ -264,7 +262,7 @@ async fn main() -> Result<()> {
swap_id, swap_id,
Arc::new(bitcoin_wallet), Arc::new(bitcoin_wallet),
Arc::new(monero_wallet), Arc::new(monero_wallet),
Arc::new(db), db,
) )
.await?; .await?;
@ -273,12 +271,12 @@ async fn main() -> Result<()> {
Command::Punish { swap_id } => { Command::Punish { swap_id } => {
let bitcoin_wallet = init_bitcoin_wallet(&config, &seed, env_config).await?; let bitcoin_wallet = init_bitcoin_wallet(&config, &seed, env_config).await?;
let (txid, _) = punish(swap_id, Arc::new(bitcoin_wallet), Arc::new(db)).await?; let (txid, _) = punish(swap_id, Arc::new(bitcoin_wallet), db).await?;
tracing::info!("Punish transaction successfully published with id {}", txid); tracing::info!("Punish transaction successfully published with id {}", txid);
} }
Command::SafelyAbort { swap_id } => { Command::SafelyAbort { swap_id } => {
safely_abort(swap_id, Arc::new(db)).await?; safely_abort(swap_id, db).await?;
tracing::info!("Swap safely aborted"); tracing::info!("Swap safely aborted");
} }
@ -291,7 +289,7 @@ async fn main() -> Result<()> {
let (txid, _) = redeem( let (txid, _) = redeem(
swap_id, swap_id,
Arc::new(bitcoin_wallet), Arc::new(bitcoin_wallet),
Arc::new(db), db,
Finality::from_bool(do_not_await_finality), Finality::from_bool(do_not_await_finality),
) )
.await?; .await?;

View File

@ -26,13 +26,13 @@ use std::time::Duration;
use swap::bitcoin::TxLock; use swap::bitcoin::TxLock;
use swap::cli::command::{parse_args_and_apply_defaults, Arguments, Command, ParseResult}; use swap::cli::command::{parse_args_and_apply_defaults, Arguments, Command, ParseResult};
use swap::cli::{list_sellers, EventLoop, SellerStatus}; use swap::cli::{list_sellers, EventLoop, SellerStatus};
use swap::database::SledDatabase; use swap::database::open_db;
use swap::env::Config; use swap::env::Config;
use swap::libp2p_ext::MultiAddrExt; use swap::libp2p_ext::MultiAddrExt;
use swap::network::quote::BidQuote; use swap::network::quote::BidQuote;
use swap::network::swarm; use swap::network::swarm;
use swap::protocol::bob;
use swap::protocol::bob::{BobState, Swap}; use swap::protocol::bob::{BobState, Swap};
use swap::protocol::{bob, Database};
use swap::seed::Seed; use swap::seed::Seed;
use swap::{bitcoin, cli, monero}; use swap::{bitcoin, cli, monero};
use url::Url; use url::Url;
@ -54,11 +54,7 @@ async fn main() -> Result<()> {
} }
}; };
let db = Arc::new( let db = open_db(data_dir.join("database"), data_dir.join("sqlite"), true).await?;
SledDatabase::open(data_dir.join("database").as_path())
.await
.context("Failed to open database")?,
);
match cmd { match cmd {
Command::BuyXmr { Command::BuyXmr {

View File

@ -3,10 +3,13 @@ pub use alice::Alice;
pub use bob::Bob; pub use bob::Bob;
pub use sqlite::SqliteDatabase; pub use sqlite::SqliteDatabase;
use crate::protocol::State; use crate::fs::ensure_directory_exists;
use crate::protocol::{Database, State};
use anyhow::{bail, Result}; use anyhow::{bail, Result};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fmt::Display; use std::fmt::Display;
use std::path::Path;
use std::sync::Arc;
mod alice; mod alice;
mod bob; mod bob;
@ -81,3 +84,75 @@ impl Swap {
} }
} }
} }
pub async fn open_db(
sled_path: impl AsRef<Path>,
sqlite_path: impl AsRef<Path>,
force_sled: bool,
) -> Result<Arc<dyn Database + Send + Sync>> {
// if sled exists and sqlite doesnt exist try and migrate
// if sled and sqlite exists and the sled flag is set, use sled
// if sled and sqlite exists, use sqlite
match (
sled_path.as_ref().exists(),
sqlite_path.as_ref().exists(),
force_sled,
) {
(true, false, false) => {
tracing::info!("Attempting to migrate old data to the new sqlite database...");
let sled_db = SledDatabase::open(sled_path.as_ref()).await?;
ensure_directory_exists(sqlite_path.as_ref())?;
tokio::fs::File::create(&sqlite_path).await?;
let sqlite = SqliteDatabase::open(sqlite_path).await?;
let swap_states = sled_db.all().await?;
for (swap_id, state) in swap_states.iter() {
sqlite.insert_latest_state(*swap_id, state.clone()).await?;
}
let monero_addresses = sled_db.get_all_monero_addresses();
for (swap_id, monero_address) in monero_addresses.flatten() {
sqlite
.insert_monero_address(swap_id, monero_address)
.await?;
}
let peer_addresses = sled_db.get_all_addresses();
for (peer_id, addresses) in peer_addresses.flatten() {
for address in addresses {
sqlite.insert_address(peer_id, address).await?;
}
}
let peers = sled_db.get_all_peers();
for (swap_id, peer_id) in peers.flatten() {
sqlite.insert_peer_id(swap_id, peer_id).await?;
}
tracing::info!("Sucessfully migrated data to sqlite! Using sqlite.");
Ok(Arc::new(sqlite))
}
(_, false, false) => {
tracing::debug!("Creating and using new sqlite database.");
ensure_directory_exists(sqlite_path.as_ref())?;
tokio::fs::File::create(&sqlite_path).await?;
let sqlite = SqliteDatabase::open(sqlite_path).await?;
Ok(Arc::new(sqlite))
}
(_, true, false) => {
tracing::debug!("Using existing sqlite database.");
let sqlite = SqliteDatabase::open(sqlite_path).await?;
Ok(Arc::new(sqlite))
}
(false, _, true) => {
bail!("Sled database does not exist at specified location")
}
(true, _, true) => {
tracing::debug!("Sled flag set. Using sled database.");
let sled = SledDatabase::open(sled_path.as_ref()).await?;
Ok(Arc::new(sled))
}
}
}

View File

@ -22,10 +22,12 @@ impl SqliteDatabase {
{ {
let path_str = format!("sqlite:{}", path.as_ref().display()); let path_str = format!("sqlite:{}", path.as_ref().display());
let pool = SqlitePool::connect(&path_str).await?; let pool = SqlitePool::connect(&path_str).await?;
Ok(Self { pool }) let mut sqlite = Self { pool };
sqlite.run_migrations().await?;
Ok(sqlite)
} }
pub async fn run_migrations(&mut self) -> anyhow::Result<()> { async fn run_migrations(&mut self) -> anyhow::Result<()> {
sqlx::migrate!("./migrations").run(&self.pool).await?; sqlx::migrate!("./migrations").run(&self.pool).await?;
Ok(()) Ok(())
} }
@ -374,9 +376,7 @@ mod tests {
// file has to exist in order to connect with sqlite // file has to exist in order to connect with sqlite
File::create(temp_db.clone()).unwrap(); File::create(temp_db.clone()).unwrap();
let mut db = SqliteDatabase::open(temp_db).await?; let db = SqliteDatabase::open(temp_db).await?;
db.run_migrations().await.unwrap();
Ok(db) Ok(db)
} }