Terrible code, but it works properly now. Next up: a major cleanup

pull/2/head
Benedikt Terhechte 3 years ago
parent 20d7a5f5de
commit 2f94e5624a

@ -22,4 +22,9 @@ serde_json = "*"
serde = { version = "*", features = ["derive"]}
strum = "0.21"
strum_macros = "0.21"
crossbeam-channel = "0.5.1"
crossbeam-channel = "0.5.1"
[profile.release]
lto = "fat"
codegen-units = 1
panic = "abort"

@ -0,0 +1,11 @@
```
Read: 632383, Processed: 632383, Inserted: 632382
________________________________________________________
Executed in 147.18 secs fish external
usr time 91.38 secs 157.00 micros 91.38 secs
sys time 193.30 secs 899.00 micros 193.30 secs
```
4301 Emails per Second!

@ -1,10 +1,10 @@
use std::path::PathBuf;
use std::{path::PathBuf, thread::JoinHandle};
use crate::emails::EmailEntry;
use chrono::Datelike;
use crossbeam_channel::{unbounded, Receiver, Sender};
use eyre::{Report, Result};
use rusqlite::{self, params, Connection, Error, Row};
use rusqlite::{self, params, Connection, Error, Row, Statement, Transaction};
#[derive(Debug)]
pub struct Database {
@ -21,7 +21,13 @@ impl Database {
/// Create an in-memory db.
pub fn new() -> Result<Self> {
//let mut connection = Connection::open_in_memory()?;
let mut connection = Connection::open("/tmp/db.sql")?;
let connection = Connection::open("/tmp/db.sql")?;
connection
.pragma_update(None, "journal_mode", &"memory")
.unwrap();
connection
.pragma_update(None, "synchronous", &"OFF")
.unwrap();
Self::create_tables(&connection)?;
//connection.trace(Some(|n| {
// println!("SQL: {}", &n);
@ -31,28 +37,59 @@ impl Database {
})
}
pub fn process(&mut self) -> Sender<DBMessage> {
pub fn process(&mut self) -> (Sender<DBMessage>, JoinHandle<Result<usize>>) {
let (sender, receiver) = unbounded();
let connection = self.connection.take().unwrap();
std::thread::spawn(move || loop {
let next = match receiver.recv() {
Ok(n) => n,
Err(e) => {
println!("Receiver error: {:?}", &e);
std::process::exit(0);
let mut connection = self.connection.take().unwrap();
let handle = std::thread::spawn(move || {
let mut counter = 0;
{
let transaction = connection.transaction().unwrap();
let sql = "INSERT INTO emails (path, domain, local_part, year, month, day, subject) VALUES (?, ?, ?, ?, ?, ?, ?)";
{
let mut prepared = transaction.prepare(sql).unwrap();
loop {
let next = match receiver.recv() {
Ok(n) => n,
Err(e) => {
println!("Receiver error: {:?}", &e);
panic!("should not happen");
}
};
let result = match next {
DBMessage::Mail(mail) => {
counter += 1;
insert_mail(&transaction, &mut prepared, &mail)
}
DBMessage::Error(report, path) => {
insert_error(&transaction, &report, &path)
}
DBMessage::Done => {
tracing::trace!("Received DBMessage::Done");
break;
}
};
result.unwrap();
//if let Err(e) = result {
// tracing::error!("SQL Error: {:?}", &e);
//}
}
}
};
let result = match next {
DBMessage::Mail(mail) => insert_mail(&connection, &mail),
DBMessage::Error(report, path) => insert_error(&connection, &report, &path),
DBMessage::Done => break,
};
result.unwrap();
//if let Err(e) = result {
// tracing::error!("SQL Error: {:?}", &e);
//}
if let Err(e) = transaction.commit() {
return Err(eyre::eyre!("Transaction Error: {:?}", &e));
}
}
let mut c = connection;
loop {
tracing::trace!("Attempting close");
match c.close() {
Ok(_n) => break,
Err((a, _b)) => c = a,
}
}
tracing::trace!("Finished SQLITE: {}", &counter);
Ok(counter)
});
sender
(sender, handle)
}
fn create_tables(connection: &Connection) -> Result<()> {
@ -64,7 +101,6 @@ CREATE TABLE IF NOT EXISTS emails (
year INTEGER NOT NULL,
month INTEGER NOT NULL,
day INTEGER NOT NULL,
kind TEXT NOT NULL,
subject TEXT NOT NULL
);"#;
connection.execute(&emails_table, params![])?;
@ -78,26 +114,27 @@ CREATE TABLE IF NOT EXISTS errors (
}
}
fn insert_mail(connection: &Connection, entry: &EmailEntry) -> Result<()> {
fn insert_mail(
transaction: &Transaction,
statement: &mut Statement,
entry: &EmailEntry,
) -> Result<()> {
let path = entry.path.display().to_string();
let domain = &entry.domain;
let local_part = &entry.local_part;
let year = entry.datetime.date().year();
let month = entry.datetime.date().month();
let day = entry.datetime.date().day();
let kind = entry.parser.to_string();
let subject = entry.subject.to_string();
let sql = "INSERT INTO emails (path, domain, local_part, year, month, day, kind, subject) VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
let mut prepared = connection.prepare(sql)?;
prepared.execute(params![
path, domain, local_part, year, month, day, kind, subject
])?;
let r = statement.execute(params![path, domain, local_part, year, month, day, subject])?;
tracing::trace!("Insert Mail [{}] {}", r, &path);
Ok(())
}
fn insert_error(connection: &Connection, message: &Report, path: &PathBuf) -> Result<()> {
fn insert_error(transaction: &Transaction, message: &Report, path: &PathBuf) -> Result<()> {
let sql = "INSERT INTO errors (message, path) VALUES (?, ?)";
let mut prepared = connection.prepare(sql)?;
tracing::trace!("Insert Error {}", &path.display());
let mut prepared = transaction.prepare(sql)?;
prepared.execute(params![message.to_string(), path.display().to_string()])?;
Ok(())
}

@ -33,7 +33,6 @@ pub struct EmailEntry {
pub domain: String,
pub local_part: String,
pub datetime: chrono::DateTime<Utc>,
pub parser: ParserKind,
pub subject: String,
}
@ -180,6 +179,7 @@ fn read_emails(folder_path: &Path) -> Result<Vec<RawEmailEntry>> {
.to_path_buf(),
})
})
//.take(50)
.collect())
}
@ -206,7 +206,7 @@ fn parse_email_parser(raw_entry: &RawEmailEntry, content: &Vec<u8>) -> Result<Em
Err(error) => {
//let content_string = String::from_utf8(content.clone())?;
//println!("{}|{}", &error, &raw_entry.eml_path.display());
Err(eyre!("Could not `email_parser` email:\n{:?}", &error))
Err(eyre!("Could not parse email: {:?}", &error))
}
}
}
@ -229,7 +229,6 @@ fn parse_meta(raw_entry: &RawEmailEntry, _content: &Vec<u8>) -> Result<EmailEntr
domain: parsed.get_domain().to_owned(),
local_part: parsed.get_local_part().to_owned(),
datetime,
parser: ParserKind::Meta,
subject: meta.subject.clone(),
})
}
@ -248,7 +247,6 @@ impl<'a> TryFrom<(&PathBuf, email_parser::email::Email<'a>)> for EmailEntry {
domain,
local_part,
datetime,
parser: ParserKind::EmailParser,
subject,
})
}

@ -2,6 +2,7 @@ use core::num;
use eyre::{bail, Result};
use rayon::prelude::*;
use std::io::prelude::*;
use std::thread::JoinHandle;
use std::{io, path::PathBuf};
use thiserror;
use tracing_subscriber::EnvFilter;
@ -31,7 +32,7 @@ fn main() -> Result<()> {
setup();
let arguments: Vec<String> = std::env::args().collect();
let folder = arguments.get(1).ok_or(GmailDBError::MissingFolder)?;
let receiver = process_folder(&folder)?;
let (receiver, handle) = process_folder(&folder)?;
let mut stdout = stdout();
let mut total: Option<usize> = None;
@ -53,7 +54,8 @@ fn main() -> Result<()> {
};
counter += value;
}
print!("\rProcessing {}/{}...", counter, total);
// FIXME: Bring back
//print!("\rProcessing {}/{}...", counter, total);
} else {
match receiver.recv()? {
Err(e) => {
@ -68,8 +70,18 @@ fn main() -> Result<()> {
stdout.flush().unwrap();
sleep(Duration::from_millis(20));
}
let result = handle.join().map_err(|op| eyre::eyre!("{:?}", &op))??;
println!(
"Read: {}, Processed: {}, Inserted: {}",
total.unwrap_or_default(),
counter,
result
);
println!();
//process_email(&folder)?;
tracing::trace!("Exit Program");
Ok(())
}
@ -84,28 +96,32 @@ enum FolderProgress {
Parsed,
}
fn process_folder(folder: &str) -> Result<crossbeam_channel::Receiver<Result<Option<usize>>>> {
type ProcessReceiver = crossbeam_channel::Receiver<Result<Option<usize>>>;
fn process_folder(folder: &str) -> Result<(ProcessReceiver, JoinHandle<Result<usize>>)> {
// We return the status
let (tx, rx) = crossbeam_channel::bounded(100);
let folder = folder.to_owned();
std::thread::spawn(move || {
let emails = match emails::Emails::new(&folder) {
Ok(n) => n,
Err(e) => {
tx.send(Err(e)).unwrap();
return;
}
};
let handle = std::thread::spawn(move || {
let emails = emails::Emails::new(&folder)?;
//{
// Ok(n) => n,
// Err(e) => {
// tx.send(Err(e)).unwrap();
// return;
// }
//};
let total = emails.len();
tx.send(Ok(Some(total))).unwrap();
println!("Done Loading {} emails", &total);
let mut database = Database::new().expect("Expect a valid database");
let mut database = Database::new()?;
let sender = database.process();
let (sender, handle) = database.process();
use database::DBMessage;
emails
@ -124,18 +140,29 @@ fn process_folder(folder: &str) -> Result<crossbeam_channel::Receiver<Result<Opt
});
sender.send(database::DBMessage::Done).unwrap();
while !sender.is_empty() {
println!("left in sqlite: {}", sender.len());
sleep(Duration::from_millis(20));
}
//while !sender.is_empty() {
// println!("left in sqlite: {}", sender.len());
// sleep(Duration::from_millis(20));
//}
tracing::trace!("Send none");
tx.send(Ok(None)).unwrap();
//sleep(Duration::from_millis(200000));
tracing::info!("Waiting for SQLite to finish");
match handle.join() {
Ok(Ok(count)) => Ok(count),
Ok(Err(e)) => Err(e),
Err(e) => Err(eyre::eyre!("Join Error: {:?}", &e)),
}
//handle
// .join()
// .map_err(|op| )
});
Ok(rx)
Ok((rx, handle))
}
fn setup() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "error")
std::env::set_var("RUST_LOG", "info")
}
tracing_subscriber::fmt::fmt()
.with_env_filter(EnvFilter::from_default_env())

Loading…
Cancel
Save