This commit is contained in:
Benedikt Terhechte 2021-09-30 13:19:02 +02:00
parent e3a41158d0
commit 976004fbe3
7 changed files with 114 additions and 478 deletions

View File

@ -0,0 +1,20 @@
use rusqlite::{self, Error, Row};
pub trait RowConversion: Sized {
fn from_row<'stmt>(row: &Row<'stmt>) -> Result<Self, Error>;
}
/*impl RowConversion for EmailEntry {
fn from_row<'stmt>(row: &Row<'stmt>) -> Result<Self, Error> {
let path: String = row.get("path")?;
let domain: String = row.get("domain")?;
let local_part: String = row.get("local_part")?;
let year: usize = row.get("year")?;
let month: usize = row.get("month")?;
let day: usize = row.get("day")?;
let created = email_parser::time::DateTime::
Ok(EmailEntry {
path, domain, local_part, year, month, day
})
}
*/

View File

@ -1,40 +1,38 @@
use chrono::Datelike;
use crossbeam_channel::{unbounded, Sender};
use eyre::{Report, Result};
use rusqlite::{self, params, Connection, Statement};
use std::{ use std::{
path::{Path, PathBuf}, path::{Path, PathBuf},
thread::JoinHandle, thread::JoinHandle,
}; };
use super::{sql::*, DBMessage};
use crate::types::EmailEntry; use crate::types::EmailEntry;
use chrono::Datelike;
use crossbeam_channel::{unbounded, Receiver, Sender};
use eyre::{Report, Result};
use rusqlite::{self, params, Connection, Error, Row, Statement, Transaction};
#[derive(Debug)] #[derive(Debug)]
pub struct Database { pub struct Database {
connection: Option<Connection>, connection: Option<Connection>,
} }
pub enum DBMessage {
Mail(EmailEntry),
Error(Report, PathBuf),
Done,
}
impl Database { impl Database {
/// Create a in-memory db. /// Open database at path `Path`.
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> { pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
//let mut connection = Connection::open_in_memory()?; #[allow(unused_mut)]
let connection = Connection::open(path.as_ref())?; let mut connection = Connection::open(path.as_ref())?;
connection
.pragma_update(None, "journal_mode", &"memory") // Improve the insertion performance.
.unwrap(); connection.pragma_update(None, "journal_mode", &"memory")?;
connection connection.pragma_update(None, "synchronous", &"OFF")?;
.pragma_update(None, "synchronous", &"OFF")
.unwrap();
Self::create_tables(&connection)?; Self::create_tables(&connection)?;
//connection.trace(Some(|n| {
// println!("SQL: {}", &n); #[cfg(feature = "trace-sql")]
//})); connection.trace(Some(|query| {
tracing::trace!("SQL: {}", &query);
}));
Ok(Database { Ok(Database {
connection: Some(connection), connection: Some(connection),
}) })
@ -58,14 +56,17 @@ impl Database {
/// ``` /// ```
pub fn import(mut self) -> (Sender<DBMessage>, JoinHandle<Result<usize>>) { pub fn import(mut self) -> (Sender<DBMessage>, JoinHandle<Result<usize>>) {
let (sender, receiver) = unbounded(); let (sender, receiver) = unbounded();
// Import can only be called *once* on a database created with `new`.
// Therefore there should always be a value to unwrap;
let mut connection = self.connection.take().unwrap(); let mut connection = self.connection.take().unwrap();
let handle = std::thread::spawn(move || { let handle = std::thread::spawn(move || {
let mut counter = 0; let mut counter = 0;
{ {
let transaction = connection.transaction().unwrap(); let transaction = connection.transaction()?;
let sql = "INSERT INTO emails (path, domain, local_part, year, month, day, subject) VALUES (?, ?, ?, ?, ?, ?, ?)";
{ {
let mut prepared = transaction.prepare(sql).unwrap(); let mut mail_prepared = transaction.prepare(QUERY_EMAILS)?;
let mut error_prepared = transaction.prepare(QUERY_ERRORS)?;
loop { loop {
let next = match receiver.recv() { let next = match receiver.recv() {
Ok(n) => n, Ok(n) => n,
@ -74,29 +75,26 @@ impl Database {
panic!("should not happen"); panic!("should not happen");
} }
}; };
let result = match next { match next {
DBMessage::Mail(mail) => { DBMessage::Mail(mail) => {
counter += 1; counter += 1;
insert_mail(&transaction, &mut prepared, &mail) insert_mail(&mut mail_prepared, &mail)
} }
DBMessage::Error(report, path) => { DBMessage::Error(report, path) => {
insert_error(&transaction, &report, &path) insert_error(&mut error_prepared, &report, &path)
} }
DBMessage::Done => { DBMessage::Done => {
tracing::trace!("Received DBMessage::Done"); tracing::trace!("Received DBMessage::Done");
break; break;
} }
}; }?;
result.unwrap();
//if let Err(e) = result {
// tracing::error!("SQL Error: {:?}", &e);
//}
} }
} }
if let Err(e) = transaction.commit() { if let Err(e) = transaction.commit() {
return Err(eyre::eyre!("Transaction Error: {:?}", &e)); return Err(eyre::eyre!("Transaction Error: {:?}", &e));
} }
} }
// In case closing the database fails, we try again until we succeed
let mut c = connection; let mut c = connection;
loop { loop {
tracing::trace!("Attempting close"); tracing::trace!("Attempting close");
@ -112,32 +110,13 @@ impl Database {
} }
fn create_tables(connection: &Connection) -> Result<()> { fn create_tables(connection: &Connection) -> Result<()> {
let emails_table = r#" connection.execute(TBL_EMAILS, params![])?;
CREATE TABLE IF NOT EXISTS emails ( connection.execute(TBL_ERRORS, params![])?;
path TEXT NOT NULL,
domain TEXT NOT NULL,
local_part TEXT NOT NULL,
year INTEGER NOT NULL,
month INTEGER NOT NULL,
day INTEGER NOT NULL,
subject TEXT NOT NULL
);"#;
connection.execute(&emails_table, params![])?;
let errors_table = r#"
CREATE TABLE IF NOT EXISTS errors (
message TEXT NOT NULL,
path TEXT NOT NULL
);"#;
connection.execute(&errors_table, params![])?;
Ok(()) Ok(())
} }
} }
fn insert_mail( fn insert_mail(statement: &mut Statement, entry: &EmailEntry) -> Result<()> {
transaction: &Transaction,
statement: &mut Statement,
entry: &EmailEntry,
) -> Result<()> {
let path = entry.path.display().to_string(); let path = entry.path.display().to_string();
let domain = &entry.domain; let domain = &entry.domain;
let local_part = &entry.local_part; let local_part = &entry.local_part;
@ -145,35 +124,13 @@ fn insert_mail(
let month = entry.datetime.date().month(); let month = entry.datetime.date().month();
let day = entry.datetime.date().day(); let day = entry.datetime.date().day();
let subject = entry.subject.to_string(); let subject = entry.subject.to_string();
let r = statement.execute(params![path, domain, local_part, year, month, day, subject])?; statement.execute(params![path, domain, local_part, year, month, day, subject])?;
tracing::trace!("Insert Mail [{}] {}", r, &path); tracing::trace!("Insert Mail {}", &path);
Ok(()) Ok(())
} }
fn insert_error(transaction: &Transaction, message: &Report, path: &PathBuf) -> Result<()> { fn insert_error(statement: &mut Statement, message: &Report, path: &PathBuf) -> Result<()> {
let sql = "INSERT INTO errors (message, path) VALUES (?, ?)"; statement.execute(params![message.to_string(), path.display().to_string()])?;
tracing::trace!("Insert Error {}", &path.display()); tracing::trace!("Insert Error {}", &path.display());
let mut prepared = transaction.prepare(sql)?;
prepared.execute(params![message.to_string(), path.display().to_string()])?;
Ok(()) Ok(())
} }
pub trait RowConversion: Sized {
fn from_row<'stmt>(row: &Row<'stmt>) -> Result<Self, Error>;
fn to_row(&self) -> Result<String, Error>;
}
/*impl RowConversion for EmailEntry {
fn from_row<'stmt>(row: &Row<'stmt>) -> Result<Self, Error> {
let path: String = row.get("path")?;
let domain: String = row.get("domain")?;
let local_part: String = row.get("local_part")?;
let year: usize = row.get("year")?;
let month: usize = row.get("month")?;
let day: usize = row.get("day")?;
let created = email_parser::time::DateTime::
Ok(EmailEntry {
path, domain, local_part, year, month, day
})
}
*/

View File

@ -0,0 +1,16 @@
use std::path::PathBuf;
use eyre::Report;
use crate::types::EmailEntry;
/// Parameter for sending work to the database during `import`.
pub enum DBMessage {
/// Send for a successfuly parsed mail
Mail(EmailEntry),
/// Send for any kind of error during reading / parsing
Error(Report, PathBuf),
/// Send once all parsing is done.
/// This is used to break out of the receiving loop
Done,
}

8
src/database/mod.rs Normal file
View File

@ -0,0 +1,8 @@
mod conversion;
mod database;
mod db_message;
mod sql;
pub use conversion::RowConversion;
pub use database::Database;
pub use db_message::DBMessage;

30
src/database/sql.rs Normal file
View File

@ -0,0 +1,30 @@
pub const TBL_EMAILS: &str = r#"
CREATE TABLE IF NOT EXISTS emails (
path TEXT NOT NULL,
domain TEXT NOT NULL,
local_part TEXT NOT NULL,
year INTEGER NOT NULL,
month INTEGER NOT NULL,
day INTEGER NOT NULL,
subject TEXT NOT NULL
);"#;
pub const TBL_ERRORS: &str = r#"
CREATE TABLE IF NOT EXISTS errors (
message TEXT NOT NULL,
path TEXT NOT NULL
);"#;
pub const QUERY_EMAILS: &str = r#"
INSERT INTO emails
(path, domain, local_part, year, month, day, subject)
VALUES
(?, ?, ?, ?, ?, ?, ?)
"#;
pub const QUERY_ERRORS: &str = r#"
INSERT INTO errors
(message, path)
VALUES
(?, ?)
"#;

View File

@ -1,376 +0,0 @@
use chrono::prelude::*;
use email_address_parser;
use eyre::{bail, eyre, Result, WrapErr};
use flate2;
use flate2::read::GzDecoder;
use rayon::prelude::*;
use serde::Deserialize;
use serde_json;
use strum_macros;
const SENDER_HEADER_NAMES: &[&str] = &["Sender", "Reply-to", "From"];
const DATE_HEADER_NAMES: &[&str] = &["Received", "Date"];
use std::{
convert::{TryFrom, TryInto},
io::Read,
path::{Path, PathBuf},
};
/// We want to know which library was used to parse this email
#[derive(Debug, strum_macros::EnumString, strum_macros::ToString)]
pub enum ParserKind {
EmailParser,
Eml,
Rhymessage,
Meta,
}
/// Representation of an email
#[derive(Debug)]
pub struct EmailEntry {
pub path: PathBuf,
pub domain: String,
pub local_part: String,
pub datetime: chrono::DateTime<Utc>,
pub subject: String,
}
/// Raw representation of an email.
/// Contains the paths to the relevant files as well
/// as the name of the folder the email was in.
#[derive(Debug)]
pub struct RawEmailEntry {
folder_name: String,
eml_path: PathBuf,
meta_path: PathBuf,
}
impl RawEmailEntry {
pub fn new<P: AsRef<std::path::Path>>(path: P) -> RawEmailEntry {
let path = path.as_ref();
let folder_name = path
.parent()
.unwrap()
.file_name()
.unwrap()
.to_str()
.unwrap()
.to_owned();
let eml_path = path.to_path_buf();
let meta_path = path
.parent()
.unwrap()
.join(format!(
"{}.meta",
&path
.file_stem()
.unwrap()
.to_str()
.unwrap()
.replace(".eml", "")
))
.to_path_buf();
RawEmailEntry {
folder_name,
eml_path,
meta_path,
}
}
pub fn path(&self) -> PathBuf {
self.eml_path.clone()
}
}
pub struct Emails {
/// The current index in the Vec of emails
curr: usize,
/// The `Vec` with the `EmailEntry` entries
pub emails: Vec<RawEmailEntry>,
}
impl Emails {
pub fn new<A: AsRef<Path>>(folder: A) -> Result<Self> {
let folder = folder.as_ref();
if !folder.exists() {
bail!("Folder {} does not exist", &folder.display());
}
let emails = read_folders(&folder)?;
Ok(Emails { curr: 0, emails })
}
pub fn len(&self) -> usize {
self.emails.len()
}
}
//impl Iterator for Emails {
// // We can refer to this type using Self::Item
// type Item = Result<EmailEntry>;
//
// fn next(&mut self) -> Option<Self::Item> {
// let new_next = self.curr + 1;
// let entry = self.emails.get(self.curr)?;
// self.curr = new_next;
// let email = read_email(&entry);
// Some(email)
// }
//}
//impl ParallelIterator for Emails {
// type Item = Result<EmailEntry>;
//
// fn drive_unindexed<C>(self, consumer: C) -> C::Result
// where
// C: rayon::iter::plumbing::UnindexedConsumer<Self::Item>,
// {
// self.emails
// .into_par_iter()
// .map(|e| read_email(&e))
// .drive_unindexed(consumer)
// }
//}
fn read_folders(folder: &Path) -> Result<Vec<RawEmailEntry>> {
Ok(std::fs::read_dir(&folder)?
.into_iter()
.par_bridge()
.filter_map(|entry| {
let path = entry
.map_err(|e| tracing::error!("{} {:?}", &folder.display(), &e))
.ok()?
.path();
if !path.is_dir() {
return None;
}
read_emails(&path)
.map_err(|e| tracing::error!("{} {:?}", &path.display(), &e))
.ok()
})
.flatten()
.collect())
}
fn read_emails(folder_path: &Path) -> Result<Vec<RawEmailEntry>> {
Ok(std::fs::read_dir(folder_path)?
.into_iter()
.par_bridge()
.filter_map(|entry| {
let path = entry
.map_err(|e| tracing::error!("{} {:?}", &folder_path.display(), &e))
.ok()?
.path();
if path.is_dir() {
return None;
}
if !path.extension()?.eq("gz") {
return None;
}
Some(RawEmailEntry {
folder_name: folder_path.file_name()?.to_str()?.to_string(),
eml_path: path.clone(),
meta_path: path
.parent()?
.join(format!(
"{}.meta",
&path.file_stem()?.to_str()?.replace(".eml", "")
))
.to_path_buf(),
})
})
//.take(50)
.collect())
}
pub fn read_email(raw_entry: &RawEmailEntry) -> Result<EmailEntry> {
let content = unziped_content(&raw_entry.eml_path)?;
// We have to try multiple different email readers as each of them seems to fail in a different way
let email = parse_email_parser(&raw_entry, &content).or_else(|e| {
tracing::trace!("Parser Error: {:?}", &e);
parse_meta(&raw_entry, &content)
});
Ok(email.wrap_err_with(|| {
format!(
"{}\n{:?}",
String::from_utf8(content.clone()).unwrap(),
&raw_entry
)
})?)
}
fn parse_email_parser(raw_entry: &RawEmailEntry, content: &Vec<u8>) -> Result<EmailEntry> {
match email_parser::email::Email::parse(&content) {
Ok(email) => (&raw_entry.eml_path, email).try_into(),
Err(error) => {
//let content_string = String::from_utf8(content.clone())?;
//println!("{}|{}", &error, &raw_entry.eml_path.display());
Err(eyre!("Could not parse email: {:?}", &error))
}
}
}
fn parse_meta(raw_entry: &RawEmailEntry, _content: &Vec<u8>) -> Result<EmailEntry> {
use chrono::prelude::*;
#[derive(Deserialize)]
struct Meta {
msg_id: String,
internal_date: i64,
subject: String,
}
let content = std::fs::read_to_string(&raw_entry.meta_path)?;
let meta: Meta = serde_json::from_str(&content)?;
let parsed = email_address_parser::EmailAddress::parse(&meta.msg_id, None)
.ok_or(eyre!("Cannot Parse Address: {}", &meta.msg_id))?;
let datetime = Utc.timestamp(meta.internal_date, 0);
Ok(EmailEntry {
path: raw_entry.eml_path.to_path_buf(),
domain: parsed.get_domain().to_owned(),
local_part: parsed.get_local_part().to_owned(),
datetime,
subject: meta.subject.clone(),
})
}
impl<'a> TryFrom<(&PathBuf, email_parser::email::Email<'a>)> for EmailEntry {
type Error = eyre::Report;
fn try_from(content: (&PathBuf, email_parser::email::Email)) -> Result<Self, Self::Error> {
let (path, email) = content;
let domain = email.sender.address.domain.to_string();
let local_part = email.sender.address.local_part.to_string();
let datetime = emaildatetime_to_chrono(&email.date);
let subject = email.subject.map(|e| e.to_string()).unwrap_or_default();
Ok(EmailEntry {
path: path.to_path_buf(),
domain,
local_part,
datetime,
subject,
})
}
}
fn emaildatetime_to_chrono(dt: &email_parser::time::DateTime) -> chrono::DateTime<Utc> {
Utc.ymd(
dt.date.year as i32,
dt.date.month_number() as u32,
dt.date.day as u32,
)
.and_hms(
dt.time.time.hour as u32,
dt.time.time.minute as u32,
dt.time.time.second as u32,
)
}
fn unziped_content(path: &Path) -> Result<Vec<u8>> {
let reader = std::fs::File::open(path)?;
let mut decoder = GzDecoder::new(reader);
let mut buffer = Vec::new();
decoder.read_to_end(&mut buffer)?;
Ok(buffer)
}
/// Try to parse unstructed data into some sort of
/// email address
//fn parse_unstructured(data: &str) -> Option<eml_parser::eml::EmailAddress> {
// use lazy_static::lazy_static;
// use regex::Regex;
// lazy_static! {
// static ref EMAIL_RE: Regex = Regex::new(r#"(?:[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*|"(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21\x23-\x5b\x5d-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])*")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9-]*[a-z0-9]:(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21-\x5a\x53-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])+)\])"#).unwrap();
// }
// lazy_static! {
// static ref RE: Regex = Regex::new("<(.*?)>").unwrap();
// }
// if let Some(capture) = RE.captures(&data).and_then(|f| f.get(1)) {
// Some(eml_parser::eml::EmailAddress::AddressOnly {
// address: capture.as_str().to_string(),
// })
// } else {
// let capture = EMAIL_RE.captures(&data).and_then(|f| f.get(0))?;
// Some(eml_parser::eml::EmailAddress::AddressOnly {
// address: capture.as_str().to_string(),
// })
// }
//}
//fn extract_address(from: &eml_parser::eml::EmailAddress) -> String {
// use eml_parser::eml::EmailAddress::*;
// match from {
// AddressOnly { address } => address.clone(),
// NameAndEmailAddress { name: _, address } => address.clone(),
// }
//}
#[cfg(test)]
mod tests {
use std::{path::PathBuf, str::FromStr};
use super::RawEmailEntry;
#[test]
//fn test_weird_email1() {
// let data = "No Reply <no-reply@evernote.com>, terhechte.5cffa@m.evernote.com";
// let address = super::parse_unstructured(&data).unwrap();
// assert_eq!(
// address,
// eml_parser::eml::EmailAddress::AddressOnly {
// address: "no-reply@evernote.com".to_owned()
// }
// );
//}
#[test]
//fn test_weird_email2() {
// let data = r#"info@sport-news.denReply-To:info"@sport-news.denX-Mailer:Sport-News.de"#;
// let address = super::parse_unstructured(&data).unwrap();
// assert_eq!(
// address,
// eml_parser::eml::EmailAddress::AddressOnly {
// address: "info@sport-news.den".to_owned()
// }
// );
//}
#[test]
fn test_weird_email3() {
crate::setup();
let eml_path = PathBuf::from_str(
"/Users/terhechte/Documents/gmail_backup/db/2014-09/1479692635489080640.eml.gz",
)
.unwrap();
let meta_path = PathBuf::from_str(
"/Users/terhechte/Documents/gmail_backup/db/2014-09/1479692635489080640.meta",
)
.unwrap();
let r = RawEmailEntry {
folder_name: "2014-09".to_owned(),
eml_path,
meta_path,
};
//let result = super::read_email(&r).expect("");
let content = Vec::new();
let result = super::parse_meta(&r, &content).expect("");
dbg!(&result);
}
#[test]
fn test_weird_email4() {
crate::setup();
let eml_path = PathBuf::from_str(
"/Users/terhechte/Documents/gmail_backup/db/2014-08/1475705321427236077.eml.gz",
)
.unwrap();
let meta_path = PathBuf::from_str(
"/Users/terhechte/Documents/gmail_backup/db/2014-08/1475705321427236077.meta",
)
.unwrap();
let r = RawEmailEntry {
folder_name: "2014-08".to_owned(),
eml_path,
meta_path,
};
let result = super::read_email(&r).expect("");
dbg!(&result);
}
}

View File

@ -1,35 +1,16 @@
use core::num; use eyre::Result;
use eyre::{bail, Result};
use rayon::prelude::*;
use std::io::prelude::*;
use std::thread::JoinHandle;
use std::{io, path::PathBuf};
use thiserror;
use tracing_subscriber::EnvFilter;
use crossbeam_channel;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::{ use std::{
io::{stdout, Write}, io::{stdout, Write},
thread::sleep, thread::sleep,
time::Duration, time::Duration,
}; };
use tracing_subscriber::EnvFilter;
use crate::database::Database;
mod database; mod database;
mod filesystem; mod filesystem;
mod parse; mod parse;
mod types; mod types;
#[derive(Debug, thiserror::Error)]
enum GmailDBError {
#[error("Missing folder argument")]
MissingFolder,
}
fn main() -> Result<()> { fn main() -> Result<()> {
setup(); setup();
let arguments: Vec<String> = std::env::args().collect(); let arguments: Vec<String> = std::env::args().collect();