diff --git a/src/database/conversion.rs b/src/database/conversion.rs new file mode 100644 index 0000000..d3a9c33 --- /dev/null +++ b/src/database/conversion.rs @@ -0,0 +1,20 @@ +use rusqlite::{self, Error, Row}; + +pub trait RowConversion: Sized { + fn from_row<'stmt>(row: &Row<'stmt>) -> Result; +} + +/*impl RowConversion for EmailEntry { +fn from_row<'stmt>(row: &Row<'stmt>) -> Result { + let path: String = row.get("path")?; + let domain: String = row.get("domain")?; + let local_part: String = row.get("local_part")?; + let year: usize = row.get("year")?; + let month: usize = row.get("month")?; + let day: usize = row.get("day")?; + let created = email_parser::time::DateTime:: + Ok(EmailEntry { + path, domain, local_part, year, month, day + }) +} +*/ diff --git a/src/database.rs b/src/database/database.rs similarity index 51% rename from src/database.rs rename to src/database/database.rs index b87ec55..864482d 100644 --- a/src/database.rs +++ b/src/database/database.rs @@ -1,40 +1,38 @@ +use chrono::Datelike; +use crossbeam_channel::{unbounded, Sender}; +use eyre::{Report, Result}; +use rusqlite::{self, params, Connection, Statement}; + use std::{ path::{Path, PathBuf}, thread::JoinHandle, }; +use super::{sql::*, DBMessage}; use crate::types::EmailEntry; -use chrono::Datelike; -use crossbeam_channel::{unbounded, Receiver, Sender}; -use eyre::{Report, Result}; -use rusqlite::{self, params, Connection, Error, Row, Statement, Transaction}; #[derive(Debug)] pub struct Database { connection: Option, } -pub enum DBMessage { - Mail(EmailEntry), - Error(Report, PathBuf), - Done, -} - impl Database { - /// Create a in-memory db. + /// Open database at path `Path`. 
pub fn new>(path: P) -> Result { - //let mut connection = Connection::open_in_memory()?; - let connection = Connection::open(path.as_ref())?; - connection - .pragma_update(None, "journal_mode", &"memory") - .unwrap(); - connection - .pragma_update(None, "synchronous", &"OFF") - .unwrap(); + #[allow(unused_mut)] + let mut connection = Connection::open(path.as_ref())?; + + // Improve the insertion performance. + connection.pragma_update(None, "journal_mode", &"memory")?; + connection.pragma_update(None, "synchronous", &"OFF")?; + Self::create_tables(&connection)?; - //connection.trace(Some(|n| { - // println!("SQL: {}", &n); - //})); + + #[cfg(feature = "trace-sql")] + connection.trace(Some(|query| { + tracing::trace!("SQL: {}", &query); + })); + Ok(Database { connection: Some(connection), }) @@ -58,14 +56,17 @@ impl Database { /// ``` pub fn import(mut self) -> (Sender, JoinHandle>) { let (sender, receiver) = unbounded(); + + // Import can only be called *once* on a database created with `new`. 
+ // Therefore there should always be a value to unwrap; let mut connection = self.connection.take().unwrap(); let handle = std::thread::spawn(move || { let mut counter = 0; { - let transaction = connection.transaction().unwrap(); - let sql = "INSERT INTO emails (path, domain, local_part, year, month, day, subject) VALUES (?, ?, ?, ?, ?, ?, ?)"; + let transaction = connection.transaction()?; { - let mut prepared = transaction.prepare(sql).unwrap(); + let mut mail_prepared = transaction.prepare(QUERY_EMAILS)?; + let mut error_prepared = transaction.prepare(QUERY_ERRORS)?; loop { let next = match receiver.recv() { Ok(n) => n, @@ -74,29 +75,26 @@ impl Database { panic!("should not happen"); } }; - let result = match next { + match next { DBMessage::Mail(mail) => { counter += 1; - insert_mail(&transaction, &mut prepared, &mail) + insert_mail(&mut mail_prepared, &mail) } DBMessage::Error(report, path) => { - insert_error(&transaction, &report, &path) + insert_error(&mut error_prepared, &report, &path) } DBMessage::Done => { tracing::trace!("Received DBMessage::Done"); break; } - }; - result.unwrap(); - //if let Err(e) = result { - // tracing::error!("SQL Error: {:?}", &e); - //} + }?; } } if let Err(e) = transaction.commit() { return Err(eyre::eyre!("Transaction Error: {:?}", &e)); } } + // In case closing the database fails, we try again until we succeed let mut c = connection; loop { tracing::trace!("Attempting close"); @@ -112,32 +110,13 @@ impl Database { } fn create_tables(connection: &Connection) -> Result<()> { - let emails_table = r#" -CREATE TABLE IF NOT EXISTS emails ( - path TEXT NOT NULL, - domain TEXT NOT NULL, - local_part TEXT NOT NULL, - year INTEGER NOT NULL, - month INTEGER NOT NULL, - day INTEGER NOT NULL, - subject TEXT NOT NULL -);"#; - connection.execute(&emails_table, params![])?; - let errors_table = r#" -CREATE TABLE IF NOT EXISTS errors ( - message TEXT NOT NULL, - path TEXT NOT NULL -);"#; - connection.execute(&errors_table, params![])?; + 
connection.execute(TBL_EMAILS, params![])?; + connection.execute(TBL_ERRORS, params![])?; Ok(()) } } -fn insert_mail( - transaction: &Transaction, - statement: &mut Statement, - entry: &EmailEntry, -) -> Result<()> { +fn insert_mail(statement: &mut Statement, entry: &EmailEntry) -> Result<()> { let path = entry.path.display().to_string(); let domain = &entry.domain; let local_part = &entry.local_part; @@ -145,35 +124,13 @@ fn insert_mail( let month = entry.datetime.date().month(); let day = entry.datetime.date().day(); let subject = entry.subject.to_string(); - let r = statement.execute(params![path, domain, local_part, year, month, day, subject])?; - tracing::trace!("Insert Mail [{}] {}", r, &path); + statement.execute(params![path, domain, local_part, year, month, day, subject])?; + tracing::trace!("Insert Mail {}", &path); Ok(()) } -fn insert_error(transaction: &Transaction, message: &Report, path: &PathBuf) -> Result<()> { - let sql = "INSERT INTO errors (message, path) VALUES (?, ?)"; +fn insert_error(statement: &mut Statement, message: &Report, path: &PathBuf) -> Result<()> { + statement.execute(params![message.to_string(), path.display().to_string()])?; tracing::trace!("Insert Error {}", &path.display()); - let mut prepared = transaction.prepare(sql)?; - prepared.execute(params![message.to_string(), path.display().to_string()])?; Ok(()) } - -pub trait RowConversion: Sized { - fn from_row<'stmt>(row: &Row<'stmt>) -> Result; - fn to_row(&self) -> Result; -} - -/*impl RowConversion for EmailEntry { -fn from_row<'stmt>(row: &Row<'stmt>) -> Result { - let path: String = row.get("path")?; - let domain: String = row.get("domain")?; - let local_part: String = row.get("local_part")?; - let year: usize = row.get("year")?; - let month: usize = row.get("month")?; - let day: usize = row.get("day")?; - let created = email_parser::time::DateTime:: - Ok(EmailEntry { - path, domain, local_part, year, month, day - }) -} -*/ diff --git a/src/database/db_message.rs 
b/src/database/db_message.rs new file mode 100644 index 0000000..162fbab --- /dev/null +++ b/src/database/db_message.rs @@ -0,0 +1,16 @@ +use std::path::PathBuf; + +use eyre::Report; + +use crate::types::EmailEntry; + +/// Parameter for sending work to the database during `import`. +pub enum DBMessage { + /// Send for a successfully parsed mail + Mail(EmailEntry), + /// Send for any kind of error during reading / parsing + Error(Report, PathBuf), + /// Send once all parsing is done. + /// This is used to break out of the receiving loop + Done, +} diff --git a/src/database/mod.rs b/src/database/mod.rs new file mode 100644 index 0000000..ebe8dc5 --- /dev/null +++ b/src/database/mod.rs @@ -0,0 +1,8 @@ +mod conversion; +mod database; +mod db_message; +mod sql; + +pub use conversion::RowConversion; +pub use database::Database; +pub use db_message::DBMessage; diff --git a/src/database/sql.rs b/src/database/sql.rs new file mode 100644 index 0000000..9acdd10 --- /dev/null +++ b/src/database/sql.rs @@ -0,0 +1,30 @@ +pub const TBL_EMAILS: &str = r#" +CREATE TABLE IF NOT EXISTS emails ( + path TEXT NOT NULL, + domain TEXT NOT NULL, + local_part TEXT NOT NULL, + year INTEGER NOT NULL, + month INTEGER NOT NULL, + day INTEGER NOT NULL, + subject TEXT NOT NULL +);"#; + +pub const TBL_ERRORS: &str = r#" +CREATE TABLE IF NOT EXISTS errors ( + message TEXT NOT NULL, + path TEXT NOT NULL +);"#; + +pub const QUERY_EMAILS: &str = r#" +INSERT INTO emails + (path, domain, local_part, year, month, day, subject) +VALUES + (?, ?, ?, ?, ?, ?, ?) +"#; + +pub const QUERY_ERRORS: &str = r#" +INSERT INTO errors + (message, path) +VALUES + (?, ?) 
+"#; diff --git a/src/emails.rs b/src/emails.rs deleted file mode 100644 index 06fae85..0000000 --- a/src/emails.rs +++ /dev/null @@ -1,376 +0,0 @@ -use chrono::prelude::*; -use email_address_parser; -use eyre::{bail, eyre, Result, WrapErr}; -use flate2; -use flate2::read::GzDecoder; -use rayon::prelude::*; -use serde::Deserialize; -use serde_json; -use strum_macros; - -const SENDER_HEADER_NAMES: &[&str] = &["Sender", "Reply-to", "From"]; -const DATE_HEADER_NAMES: &[&str] = &["Received", "Date"]; - -use std::{ - convert::{TryFrom, TryInto}, - io::Read, - path::{Path, PathBuf}, -}; - -/// We want to know which library was used to parse this email -#[derive(Debug, strum_macros::EnumString, strum_macros::ToString)] -pub enum ParserKind { - EmailParser, - Eml, - Rhymessage, - Meta, -} - -/// Representation of an email -#[derive(Debug)] -pub struct EmailEntry { - pub path: PathBuf, - pub domain: String, - pub local_part: String, - pub datetime: chrono::DateTime, - pub subject: String, -} - -/// Raw representation of an email. -/// Contains the paths to the relevant files as well -/// as the name of the folder the email was in. 
-#[derive(Debug)] -pub struct RawEmailEntry { - folder_name: String, - eml_path: PathBuf, - meta_path: PathBuf, -} - -impl RawEmailEntry { - pub fn new>(path: P) -> RawEmailEntry { - let path = path.as_ref(); - let folder_name = path - .parent() - .unwrap() - .file_name() - .unwrap() - .to_str() - .unwrap() - .to_owned(); - let eml_path = path.to_path_buf(); - let meta_path = path - .parent() - .unwrap() - .join(format!( - "{}.meta", - &path - .file_stem() - .unwrap() - .to_str() - .unwrap() - .replace(".eml", "") - )) - .to_path_buf(); - RawEmailEntry { - folder_name, - eml_path, - meta_path, - } - } - - pub fn path(&self) -> PathBuf { - self.eml_path.clone() - } -} - -pub struct Emails { - /// The current index in the Vec of emails - curr: usize, - /// The `Vec` with the `EmailEntry` entries - pub emails: Vec, -} - -impl Emails { - pub fn new>(folder: A) -> Result { - let folder = folder.as_ref(); - if !folder.exists() { - bail!("Folder {} does not exist", &folder.display()); - } - let emails = read_folders(&folder)?; - Ok(Emails { curr: 0, emails }) - } - - pub fn len(&self) -> usize { - self.emails.len() - } -} - -//impl Iterator for Emails { -// // We can refer to this type using Self::Item -// type Item = Result; -// -// fn next(&mut self) -> Option { -// let new_next = self.curr + 1; -// let entry = self.emails.get(self.curr)?; -// self.curr = new_next; -// let email = read_email(&entry); -// Some(email) -// } -//} - -//impl ParallelIterator for Emails { -// type Item = Result; -// -// fn drive_unindexed(self, consumer: C) -> C::Result -// where -// C: rayon::iter::plumbing::UnindexedConsumer, -// { -// self.emails -// .into_par_iter() -// .map(|e| read_email(&e)) -// .drive_unindexed(consumer) -// } -//} - -fn read_folders(folder: &Path) -> Result> { - Ok(std::fs::read_dir(&folder)? - .into_iter() - .par_bridge() - .filter_map(|entry| { - let path = entry - .map_err(|e| tracing::error!("{} {:?}", &folder.display(), &e)) - .ok()? 
- .path(); - if !path.is_dir() { - return None; - } - read_emails(&path) - .map_err(|e| tracing::error!("{} {:?}", &path.display(), &e)) - .ok() - }) - .flatten() - .collect()) -} - -fn read_emails(folder_path: &Path) -> Result> { - Ok(std::fs::read_dir(folder_path)? - .into_iter() - .par_bridge() - .filter_map(|entry| { - let path = entry - .map_err(|e| tracing::error!("{} {:?}", &folder_path.display(), &e)) - .ok()? - .path(); - if path.is_dir() { - return None; - } - if !path.extension()?.eq("gz") { - return None; - } - Some(RawEmailEntry { - folder_name: folder_path.file_name()?.to_str()?.to_string(), - eml_path: path.clone(), - meta_path: path - .parent()? - .join(format!( - "{}.meta", - &path.file_stem()?.to_str()?.replace(".eml", "") - )) - .to_path_buf(), - }) - }) - //.take(50) - .collect()) -} - -pub fn read_email(raw_entry: &RawEmailEntry) -> Result { - let content = unziped_content(&raw_entry.eml_path)?; - // We have to try multiple different email readers as each of them seems to fail in a different way - let email = parse_email_parser(&raw_entry, &content).or_else(|e| { - tracing::trace!("Parser Error: {:?}", &e); - parse_meta(&raw_entry, &content) - }); - - Ok(email.wrap_err_with(|| { - format!( - "{}\n{:?}", - String::from_utf8(content.clone()).unwrap(), - &raw_entry - ) - })?) 
-} - -fn parse_email_parser(raw_entry: &RawEmailEntry, content: &Vec) -> Result { - match email_parser::email::Email::parse(&content) { - Ok(email) => (&raw_entry.eml_path, email).try_into(), - Err(error) => { - //let content_string = String::from_utf8(content.clone())?; - //println!("{}|{}", &error, &raw_entry.eml_path.display()); - Err(eyre!("Could not parse email: {:?}", &error)) - } - } -} - -fn parse_meta(raw_entry: &RawEmailEntry, _content: &Vec) -> Result { - use chrono::prelude::*; - #[derive(Deserialize)] - struct Meta { - msg_id: String, - internal_date: i64, - subject: String, - } - let content = std::fs::read_to_string(&raw_entry.meta_path)?; - let meta: Meta = serde_json::from_str(&content)?; - let parsed = email_address_parser::EmailAddress::parse(&meta.msg_id, None) - .ok_or(eyre!("Cannot Parse Address: {}", &meta.msg_id))?; - let datetime = Utc.timestamp(meta.internal_date, 0); - Ok(EmailEntry { - path: raw_entry.eml_path.to_path_buf(), - domain: parsed.get_domain().to_owned(), - local_part: parsed.get_local_part().to_owned(), - datetime, - subject: meta.subject.clone(), - }) -} - -impl<'a> TryFrom<(&PathBuf, email_parser::email::Email<'a>)> for EmailEntry { - type Error = eyre::Report; - fn try_from(content: (&PathBuf, email_parser::email::Email)) -> Result { - let (path, email) = content; - let domain = email.sender.address.domain.to_string(); - let local_part = email.sender.address.local_part.to_string(); - let datetime = emaildatetime_to_chrono(&email.date); - let subject = email.subject.map(|e| e.to_string()).unwrap_or_default(); - - Ok(EmailEntry { - path: path.to_path_buf(), - domain, - local_part, - datetime, - subject, - }) - } -} - -fn emaildatetime_to_chrono(dt: &email_parser::time::DateTime) -> chrono::DateTime { - Utc.ymd( - dt.date.year as i32, - dt.date.month_number() as u32, - dt.date.day as u32, - ) - .and_hms( - dt.time.time.hour as u32, - dt.time.time.minute as u32, - dt.time.time.second as u32, - ) -} - -fn unziped_content(path: 
&Path) -> Result> { - let reader = std::fs::File::open(path)?; - let mut decoder = GzDecoder::new(reader); - let mut buffer = Vec::new(); - decoder.read_to_end(&mut buffer)?; - Ok(buffer) -} - -/// Try to parse unstructed data into some sort of -/// email address -//fn parse_unstructured(data: &str) -> Option { -// use lazy_static::lazy_static; -// use regex::Regex; -// lazy_static! { -// static ref EMAIL_RE: Regex = Regex::new(r#"(?:[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*|"(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21\x23-\x5b\x5d-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])*")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9-]*[a-z0-9]:(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21-\x5a\x53-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])+)\])"#).unwrap(); -// } -// lazy_static! { -// static ref RE: Regex = Regex::new("<(.*?)>").unwrap(); -// } -// if let Some(capture) = RE.captures(&data).and_then(|f| f.get(1)) { -// Some(eml_parser::eml::EmailAddress::AddressOnly { -// address: capture.as_str().to_string(), -// }) -// } else { -// let capture = EMAIL_RE.captures(&data).and_then(|f| f.get(0))?; -// Some(eml_parser::eml::EmailAddress::AddressOnly { -// address: capture.as_str().to_string(), -// }) -// } -//} - -//fn extract_address(from: &eml_parser::eml::EmailAddress) -> String { -// use eml_parser::eml::EmailAddress::*; -// match from { -// AddressOnly { address } => address.clone(), -// NameAndEmailAddress { name: _, address } => address.clone(), -// } -//} - -#[cfg(test)] -mod tests { - use std::{path::PathBuf, str::FromStr}; - - use super::RawEmailEntry; - - #[test] - //fn test_weird_email1() { - // let data = "No Reply , terhechte.5cffa@m.evernote.com"; - // let address = super::parse_unstructured(&data).unwrap(); - // assert_eq!( - // address, - // eml_parser::eml::EmailAddress::AddressOnly { - // address: 
"no-reply@evernote.com".to_owned() - // } - // ); - //} - #[test] - //fn test_weird_email2() { - // let data = r#"info@sport-news.denReply-To:info"@sport-news.denX-Mailer:Sport-News.de"#; - // let address = super::parse_unstructured(&data).unwrap(); - // assert_eq!( - // address, - // eml_parser::eml::EmailAddress::AddressOnly { - // address: "info@sport-news.den".to_owned() - // } - // ); - //} - #[test] - fn test_weird_email3() { - crate::setup(); - let eml_path = PathBuf::from_str( - "/Users/terhechte/Documents/gmail_backup/db/2014-09/1479692635489080640.eml.gz", - ) - .unwrap(); - let meta_path = PathBuf::from_str( - "/Users/terhechte/Documents/gmail_backup/db/2014-09/1479692635489080640.meta", - ) - .unwrap(); - let r = RawEmailEntry { - folder_name: "2014-09".to_owned(), - eml_path, - meta_path, - }; - //let result = super::read_email(&r).expect(""); - let content = Vec::new(); - let result = super::parse_meta(&r, &content).expect(""); - dbg!(&result); - } - - #[test] - fn test_weird_email4() { - crate::setup(); - let eml_path = PathBuf::from_str( - "/Users/terhechte/Documents/gmail_backup/db/2014-08/1475705321427236077.eml.gz", - ) - .unwrap(); - let meta_path = PathBuf::from_str( - "/Users/terhechte/Documents/gmail_backup/db/2014-08/1475705321427236077.meta", - ) - .unwrap(); - let r = RawEmailEntry { - folder_name: "2014-08".to_owned(), - eml_path, - meta_path, - }; - let result = super::read_email(&r).expect(""); - dbg!(&result); - } -} diff --git a/src/main.rs b/src/main.rs index 46abc0c..5f36d83 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,35 +1,16 @@ -use core::num; -use eyre::{bail, Result}; -use rayon::prelude::*; -use std::io::prelude::*; -use std::thread::JoinHandle; -use std::{io, path::PathBuf}; -use thiserror; -use tracing_subscriber::EnvFilter; - -use crossbeam_channel; -use std::path::Path; -use std::sync::{Arc, Mutex}; - +use eyre::Result; use std::{ io::{stdout, Write}, thread::sleep, time::Duration, }; - -use crate::database::Database; 
+use tracing_subscriber::EnvFilter; mod database; mod filesystem; mod parse; mod types; -#[derive(Debug, thiserror::Error)] -enum GmailDBError { - #[error("Missing folder argument")] - MissingFolder, -} - fn main() -> Result<()> { setup(); let arguments: Vec = std::env::args().collect();