It works!

pull/2/head
Benedikt Terhechte 3 years ago
parent 2f94e5624a
commit e3a41158d0
No known key found for this signature in database
GPG Key ID: 4ED2131DAE9B5381

288
Cargo.lock generated

@ -49,51 +49,6 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "block-buffer"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0940dc441f31689269e10ac70eb1002a3a1d3ad1390e030043662eb7fe4688b"
dependencies = [
"block-padding",
"byte-tools",
"byteorder",
"generic-array",
]
[[package]]
name = "block-padding"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa79dedbb091f449f1f39e53edf88d5dbe95f895dae6135a8d7b881fb5af73f5"
dependencies = [
"byte-tools",
]
[[package]]
name = "bumpalo"
version = "3.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9df67f7bf9ef8498769f994239c45613ef0c5899415fb58e9add412d2c1a538"
[[package]]
name = "byte-tools"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7"
[[package]]
name = "byteorder"
version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "cfg-if"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]]
name = "cfg-if"
version = "1.0.0"
@ -113,23 +68,13 @@ dependencies = [
"winapi",
]
[[package]]
name = "console_error_panic_hook"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8d976903543e0c48546a91908f21588a680a8c8f984df9a5d69feccb2b2a211"
dependencies = [
"cfg-if 0.1.10",
"wasm-bindgen",
]
[[package]]
name = "crc32fast"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81156fece84ab6a9f2afdb109ce3ae577e42b1228441eded99bd77f627953b1a"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
]
[[package]]
@ -138,7 +83,7 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"crossbeam-utils",
]
@ -148,7 +93,7 @@ version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6455c0ca19f0d2fbf751b908d5c55c1f5cbc65e03c4225427254b46890bdde1e"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"crossbeam-epoch",
"crossbeam-utils",
]
@ -159,7 +104,7 @@ version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"crossbeam-utils",
"lazy_static",
"memoffset",
@ -172,38 +117,16 @@ version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"lazy_static",
]
[[package]]
name = "digest"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5"
dependencies = [
"generic-array",
]
[[package]]
name = "either"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
[[package]]
name = "email-address-parser"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1840503695adadbe314fe6cedd297fbc406d13f0fe06fd28d02e499a17c2a599"
dependencies = [
"console_error_panic_hook",
"pest",
"pest_derive",
"quick-xml",
"wasm-bindgen",
]
[[package]]
name = "email-parser"
version = "0.5.0"
@ -223,12 +146,6 @@ dependencies = [
"once_cell",
]
[[package]]
name = "fake-simd"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"
[[package]]
name = "fallible-iterator"
version = "0.2.0"
@ -247,28 +164,19 @@ version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e6988e897c1c9c485f43b47a529cef42fde0547f9d8d41a7062518f1d8fc53f"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"crc32fast",
"libc",
"miniz_oxide",
]
[[package]]
name = "generic-array"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffdf9f34f1447443d37393cc6c2b8313aebddcd96906caf34e54c68d8e57d7bd"
dependencies = [
"typenum",
]
[[package]]
name = "getrandom"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"libc",
"wasi",
]
@ -279,7 +187,6 @@ version = "0.1.0"
dependencies = [
"chrono",
"crossbeam-channel",
"email-address-parser",
"email-parser",
"eyre",
"flate2",
@ -289,8 +196,6 @@ dependencies = [
"rusqlite",
"serde",
"serde_json",
"strum",
"strum_macros",
"thiserror",
"tracing",
"tracing-subscriber",
@ -314,15 +219,6 @@ dependencies = [
"hashbrown",
]
[[package]]
name = "heck"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "hermit-abi"
version = "0.1.19"
@ -372,15 +268,9 @@ version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
]
[[package]]
name = "maplit"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
[[package]]
name = "matchers"
version = "0.0.1"
@ -450,55 +340,6 @@ version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56"
[[package]]
name = "opaque-debug"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c"
[[package]]
name = "pest"
version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10f4872ae94d7b90ae48754df22fd42ad52ce740b8f370b03da4835417403e53"
dependencies = [
"ucd-trie",
]
[[package]]
name = "pest_derive"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "833d1ae558dc601e9a60366421196a8d94bc0ac980476d0b67e1d0988d72b2d0"
dependencies = [
"pest",
"pest_generator",
]
[[package]]
name = "pest_generator"
version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99b8db626e31e5b81787b9783425769681b347011cc59471e33ea46d2ea0cf55"
dependencies = [
"pest",
"pest_meta",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pest_meta"
version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54be6e404f5317079812fc8f9f5279de376d8856929e21c184ecf6bbd692a11d"
dependencies = [
"maplit",
"pest",
"sha-1",
]
[[package]]
name = "phf"
version = "0.10.0"
@ -576,15 +417,6 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "quick-xml"
version = "0.18.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cc440ee4802a86e357165021e3e255a9143724da31db1e2ea540214c96a0f82"
dependencies = [
"memchr",
]
[[package]]
name = "quote"
version = "1.0.9"
@ -744,18 +576,6 @@ dependencies = [
"serde",
]
[[package]]
name = "sha-1"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7d94d0bede923b3cea61f3f1ff57ff8cdfd77b400fb8f9998949e0cf04163df"
dependencies = [
"block-buffer",
"digest",
"fake-simd",
"opaque-debug",
]
[[package]]
name = "sharded-slab"
version = "0.1.3"
@ -777,24 +597,6 @@ version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309"
[[package]]
name = "strum"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aaf86bbcfd1fa9670b7a129f64fc0c9fcbbfe4f1bc4210e9e98fe71ffc12cde2"
[[package]]
name = "strum_macros"
version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d06aaeeee809dbc59eb4556183dd927df67db1540de5be8d3ec0b6636358a5ec"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "syn"
version = "1.0.77"
@ -867,7 +669,7 @@ version = "0.1.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84f96e095c0c82419687c20ddf5cb3eadb61f4e1405923c9dc8e53a1adacbda8"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
@ -936,24 +738,6 @@ dependencies = [
"tracing-serde",
]
[[package]]
name = "typenum"
version = "1.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b63708a265f51345575b27fe43f9500ad611579e764c79edbc2037b1121959ec"
[[package]]
name = "ucd-trie"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56dee185309b50d1f11bfedef0fe6d036842e3fb77413abef29f8f8d1c5d4c1c"
[[package]]
name = "unicode-segmentation"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b"
[[package]]
name = "unicode-xid"
version = "0.2.2"
@ -978,60 +762,6 @@ version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]]
name = "wasm-bindgen"
version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "632f73e236b219150ea279196e54e610f5dbafa5d61786303d4da54f84e47fce"
dependencies = [
"cfg-if 1.0.0",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a317bf8f9fba2476b4b2c85ef4c4af8ff39c3c7f0cdfeed4f82c34a880aa837b"
dependencies = [
"bumpalo",
"lazy_static",
"log",
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d56146e7c495528bf6587663bea13a8eb588d39b36b679d83972e1a2dbbdacf9"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7803e0eea25835f8abdc585cd3021b3deb11543c6fe226dcd30b228857c5c5ab"
dependencies = [
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0237232789cf037d5480773fe568aac745bfe2afbc11a863e97901780a6b47cc"
[[package]]
name = "winapi"
version = "0.3.9"

@ -16,15 +16,12 @@ flate2 = "1.0.22"
email-parser = { git = "https://github.com/terhechte/email-parser", features = ["sender", "from", "date", "subject", "mime", "allow-duplicate-headers"]}
rayon = "1.5.1"
chrono = "0.4.19"
email-address-parser = "1.0.3"
lazy_static = "*"
serde_json = "*"
serde = { version = "*", features = ["derive"]}
strum = "0.21"
strum_macros = "0.21"
crossbeam-channel = "0.5.1"
[profile.release]
lto = "fat"
codegen-units = 1
panic = "abort"
#[profile.release]
#lto = "fat"
#codegen-units = 1
#panic = "abort"

@ -1,6 +1,9 @@
use std::{path::PathBuf, thread::JoinHandle};
use std::{
path::{Path, PathBuf},
thread::JoinHandle,
};
use crate::emails::EmailEntry;
use crate::types::EmailEntry;
use chrono::Datelike;
use crossbeam_channel::{unbounded, Receiver, Sender};
use eyre::{Report, Result};
@ -19,9 +22,9 @@ pub enum DBMessage {
impl Database {
/// Create an in-memory db.
pub fn new() -> Result<Self> {
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
//let mut connection = Connection::open_in_memory()?;
let connection = Connection::open("/tmp/db.sql")?;
let connection = Connection::open(path.as_ref())?;
connection
.pragma_update(None, "journal_mode", &"memory")
.unwrap();
@ -37,7 +40,23 @@ impl Database {
})
}
pub fn process(&mut self) -> (Sender<DBMessage>, JoinHandle<Result<usize>>) {
/// Begin the data import.
/// This will consume the `Database`. A new one has to be opened
/// afterwards in order to support multi-threading.
/// Returns an input `Sender` and a `JoinHandle`.
/// The `Sender` is used to submit work to the database via `DBMessage`
/// cases. The `JoinHandle` is used to wait for database completion.
///
/// # Examples
///
/// ```
/// let db = Database::new("db.sqlite").unwrap();
/// let (sender, handle) = db.import();
/// sender.send(DBMessage::Mail(m1)).unwrap();
/// sender.send(DBMessage::Mail(m2)).unwrap();
/// handle.join().unwrap();
/// ```
pub fn import(mut self) -> (Sender<DBMessage>, JoinHandle<Result<usize>>) {
let (sender, receiver) = unbounded();
let mut connection = self.connection.take().unwrap();
let handle = std::thread::spawn(move || {

@ -0,0 +1,135 @@
use eyre::{bail, eyre, Result};
use flate2::read::GzDecoder;
use rayon::prelude::*;
use std::io::Read;
use std::path::{Path, PathBuf};
use crate::types::Config;
/// Raw representation of an email.
/// Contains the paths to the relevant files as well
/// as the name of the folder the email was in.
#[derive(Debug)]
pub struct RawEmailEntry {
/// Name of the directory that directly contains the email file.
folder_name: String,
/// Path of the `.eml` or `.eml.gz` file itself.
eml_path: PathBuf,
/// Path of the sibling `.meta` file; only set when that file exists on disk
/// at the time the entry is created.
gmail_meta_path: Option<PathBuf>,
/// `true` when the file name ends in `.eml.gz`, i.e. the content has to be
/// gunzipped before it can be parsed.
is_compressed: bool,
/// Length of the email file as reported by the file system metadata.
size: u64,
}
impl RawEmailEntry {
pub fn path(&self) -> &Path {
self.eml_path.as_path()
}
pub fn read(&self) -> Result<Vec<u8>> {
if self.is_compressed {
let reader = std::fs::File::open(&self.eml_path)?;
let mut decoder = GzDecoder::new(reader);
let mut buffer = Vec::new();
decoder.read_to_end(&mut buffer)?;
Ok(buffer)
} else {
std::fs::read(&self.eml_path).map_err(|e| eyre!("IO Error: {}", &e))
}
}
pub fn has_gmail_meta(&self) -> bool {
self.gmail_meta_path.is_some()
}
pub fn read_gmail_meta(&self) -> Option<Result<Vec<u8>>> {
match &self.gmail_meta_path {
Some(p) => Some(std::fs::read(p).map_err(|e| eyre!("IO Error: {}", &e))),
None => None,
}
}
}
impl RawEmailEntry {
    /// Builds an entry for the email file at `path`.
    ///
    /// Returns `None` when
    /// - the file name is not valid UTF-8 or does not end in `.eml` / `.eml.gz`,
    /// - the path has no parent directory with a UTF-8 name, or
    /// - the file metadata cannot be read.
    ///
    /// A sibling file named `<base>.meta` is attached as Gmail meta
    /// information if it exists, where `<base>` is the file name without its
    /// `.eml` / `.eml.gz` extension.
    fn new<P: AsRef<std::path::Path>>(path: P) -> Option<RawEmailEntry> {
        let path = path.as_ref();
        let stem = path.file_stem()?.to_str()?;
        let name = path.file_name()?.to_str()?;

        let is_compressed = name.ends_with(".eml.gz");
        if !is_compressed && !name.ends_with(".eml") {
            return None;
        }

        let parent = path.parent()?;
        let folder_name = parent.file_name()?.to_str()?.to_owned();
        let eml_path = path.to_path_buf();
        let file_metadata = path.metadata().ok()?;

        // `file_stem` only removes the last extension, so `a.eml.gz` yields
        // `a.eml`. Strip exactly that one trailing `.eml`; a blanket
        // `replace` would also mangle names that contain `.eml` elsewhere.
        let base_name = if is_compressed {
            stem.strip_suffix(".eml").unwrap_or(stem)
        } else {
            stem
        };
        let meta_path = parent.join(format!("{}.meta", base_name));

        // Only embed it, if it exists
        let gmail_meta_path = if meta_path.exists() {
            Some(meta_path)
        } else {
            None
        };

        tracing::trace!(
            "Email [c?: {}] {} {:?}",
            is_compressed,
            eml_path.display(),
            gmail_meta_path
        );

        Some(RawEmailEntry {
            folder_name,
            eml_path,
            gmail_meta_path,
            is_compressed,
            size: file_metadata.len(),
        })
    }
}
pub fn read_emails(config: &Config) -> Result<Vec<RawEmailEntry>> {
let folder = config.emails_folder_path.as_path();
if !folder.exists() {
bail!("Folder {} does not exist", &folder.display());
}
Ok(std::fs::read_dir(&folder)?
.into_iter()
.par_bridge()
.filter_map(|entry| {
let path = entry
.map_err(|e| tracing::error!("{} {:?}", &folder.display(), &e))
.ok()?
.path();
if !path.is_dir() {
return None;
}
read_folder(&path)
.map_err(|e| tracing::error!("{} {:?}", &path.display(), &e))
.ok()
})
.flatten()
.collect())
}
fn read_folder(path: &Path) -> Result<Vec<RawEmailEntry>> {
Ok(std::fs::read_dir(path)?
.into_iter()
.par_bridge()
.filter_map(|entry| {
let path = entry
.map_err(|e| tracing::error!("{} {:?}", &path.display(), &e))
.ok()?
.path();
if path.is_dir() {
return None;
}
RawEmailEntry::new(path)
})
//.take(50)
.collect())
}

@ -20,7 +20,9 @@ use std::{
use crate::database::Database;
mod database;
mod emails;
mod filesystem;
mod parse;
mod types;
#[derive(Debug, thiserror::Error)]
enum GmailDBError {
@ -31,42 +33,47 @@ enum GmailDBError {
fn main() -> Result<()> {
setup();
let arguments: Vec<String> = std::env::args().collect();
let folder = arguments.get(1).ok_or(GmailDBError::MissingFolder)?;
let (receiver, handle) = process_folder(&folder)?;
let folder = arguments
.get(1)
.unwrap_or_else(|| panic!("Missing folder path argument"));
let database = arguments
.get(2)
.unwrap_or_else(|| panic!("Missing database path argument"));
let config = crate::types::Config::new(database, folder);
println!("Collecting Mails...");
let emails = filesystem::read_emails(&config)?;
println!("Begin Parsing Mails...");
let (receiver, handle) = crate::parse::emails::parse_emails(&config, emails)?;
let mut stdout = stdout();
let mut total: Option<usize> = None;
let mut counter = 0;
let mut done = false;
println!("Collecting Mails...");
while done == false {
if let Some(total) = total {
for entry in receiver.try_iter() {
let value = match entry {
Err(e) => {
panic!("{:?}", &e);
}
Ok(None) => {
done = true;
0
}
Ok(Some(n)) => n,
};
counter += value;
}
// FIXME: Bring back
//print!("\rProcessing {}/{}...", counter, total);
} else {
match receiver.recv()? {
'outer: while done == false {
for entry in receiver.try_iter() {
let message = match entry {
Ok(n) => n,
Err(e) => {
panic!("{:?}", &e);
}
Ok(Some(n)) => {
total = Some(n);
println!("Processing Error: {:?}", &e);
break 'outer;
}
Ok(None) => done = true,
};
use parse::emails::ParseMessage;
match message {
ParseMessage::Done => done = true,
ParseMessage::Total(n) => total = Some(n),
ParseMessage::ParsedOne => counter += 1,
};
}
if let Some(total) = total {
print!("\rProcessing {}/{}...", counter, total);
}
stdout.flush().unwrap();
sleep(Duration::from_millis(20));
}
@ -85,81 +92,6 @@ fn main() -> Result<()> {
Ok(())
}
fn process_email(path: &str) -> Result<()> {
let entry = emails::RawEmailEntry::new(&path);
let mail = emails::read_email(&entry).unwrap();
Ok(())
}
enum FolderProgress {
Total(usize),
Parsed,
}
type ProcessReceiver = crossbeam_channel::Receiver<Result<Option<usize>>>;
fn process_folder(folder: &str) -> Result<(ProcessReceiver, JoinHandle<Result<usize>>)> {
// We return the status
let (tx, rx) = crossbeam_channel::bounded(100);
let folder = folder.to_owned();
let handle = std::thread::spawn(move || {
let emails = emails::Emails::new(&folder)?;
//{
// Ok(n) => n,
// Err(e) => {
// tx.send(Err(e)).unwrap();
// return;
// }
//};
let total = emails.len();
tx.send(Ok(Some(total))).unwrap();
println!("Done Loading {} emails", &total);
let mut database = Database::new()?;
let (sender, handle) = database.process();
use database::DBMessage;
emails
.emails
.par_iter()
//.iter()
.map(|raw_mail| (raw_mail.path(), emails::read_email(&raw_mail)))
.for_each(|(path, entry)| {
tx.send(Ok(Some(1))).unwrap();
if let Err(e) = match entry {
Ok(mail) => sender.send(DBMessage::Mail(mail)),
Err(e) => sender.send(DBMessage::Error(e, path)),
} {
tracing::info!("Error Inserting into Database: {:?}", &e);
}
});
sender.send(database::DBMessage::Done).unwrap();
//while !sender.is_empty() {
// println!("left in sqlite: {}", sender.len());
// sleep(Duration::from_millis(20));
//}
tracing::trace!("Send none");
tx.send(Ok(None)).unwrap();
//sleep(Duration::from_millis(200000));
tracing::info!("Waiting for SQLite to finish");
match handle.join() {
Ok(Ok(count)) => Ok(count),
Ok(Err(e)) => Err(e),
Err(e) => Err(eyre::eyre!("Join Error: {:?}", &e)),
}
//handle
// .join()
// .map_err(|op| )
});
Ok((rx, handle))
}
fn setup() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info")

@ -0,0 +1,142 @@
use crate::database::{DBMessage, Database};
use crate::filesystem::RawEmailEntry;
use crate::types::{Config, EmailEntry};
use chrono::prelude::*;
use eyre::{bail, eyre, Result};
use rayon::prelude::*;
use std::thread::JoinHandle;
use std::{
convert::{TryFrom, TryInto},
path::Path,
};
/// Progress notifications emitted by `parse_emails` over its channel.
pub enum ParseMessage {
/// Sent once, before any parsing starts; carries the number of emails
/// that were handed to `parse_emails`.
Total(usize),
/// Sent once for every email that went through parsing, regardless of
/// whether parsing succeeded.
ParsedOne,
/// Sent after the database thread has been joined; no further messages follow.
Done,
}
/// Receiving end of the progress channel returned by `parse_emails`.
pub type ProcessReceiver = crossbeam_channel::Receiver<Result<ParseMessage>>;
pub fn parse_emails(
config: &Config,
emails: Vec<RawEmailEntry>,
) -> Result<(ProcessReceiver, JoinHandle<Result<usize>>)> {
// This channel is used to communicate the parsing progress.
let (tx, rx) = crossbeam_channel::bounded(100);
let config = config.clone();
// Spawn all work into a new thread so it doesn't block the main thread.
let handle = std::thread::spawn(move || {
let total = emails.len();
tracing::info!("Loaded {} emails", &total);
// First, communicate the total amount of mails received
if let Err(e) = tx.send(Ok(ParseMessage::Total(total))) {
bail!("Channel Failure {:?}", &e);
}
// Create a new database connection
let database = Database::new(config.database_path)?;
// Consume the connection to begin the import. It will return the `handle` to use for
// waiting for the database to finish importing, and the `sender` to submit work.
let (sender, handle) = database.import();
// Iterate over the mails..
emails
// in paralell..
.par_iter()
// parsing them
.map(|raw_mail| (raw_mail.path(), parse_email(&raw_mail)))
// and inserting them into SQLite
.for_each(|(path, entry)| {
if let Err(e) = tx.send(Ok(ParseMessage::ParsedOne)) {
tracing::error!("Channel Failure: {:?}", &e);
}
if let Err(e) = match entry {
Ok(mail) => sender.send(DBMessage::Mail(mail)),
Err(e) => sender.send(DBMessage::Error(e, path.to_path_buf())),
} {
tracing::error!("Error Inserting into Database: {:?}", &e);
}
});
// Tell SQLite there's no more work coming. This will exit the listening loop
if let Err(e) = sender.send(DBMessage::Done) {
bail!("Channel Failure {:?}", &e);
}
// Wait for SQLite to finish parsing
tracing::info!("Waiting for SQLite to finish");
let output = match handle.join() {
Ok(Ok(count)) => Ok(count),
Ok(Err(e)) => Err(e),
Err(e) => Err(eyre::eyre!("Join Error: {:?}", &e)),
};
// Tell the caller that we're done processing. This will allow leaving the
// display loop
if let Err(e) = tx.send(Ok(ParseMessage::Done)) {
bail!("Channel Failure {:?}", &e);
}
output
});
Ok((rx, handle))
}
/// Reads the bytes behind `raw_entry` and turns them into an `EmailEntry`.
///
/// Fails when the file cannot be read or its content cannot be parsed.
fn parse_email(raw_entry: &RawEmailEntry) -> Result<EmailEntry> {
    raw_entry
        .read()
        .and_then(|bytes| parse_email_parser(raw_entry, &bytes))
}
fn parse_email_parser(raw_entry: &RawEmailEntry, content: &Vec<u8>) -> Result<EmailEntry> {
match email_parser::email::Email::parse(&content) {
Ok(email) => (raw_entry.path(), email).try_into(),
Err(error) => {
//let content_string = String::from_utf8(content.clone())?;
//println!("{}|{}", &error, &raw_entry.eml_path.display());
Err(eyre!(
"Could not parse email: {:?} [{}]",
&error,
raw_entry.path().display()
))
}
}
}
impl<'a> TryFrom<(&Path, email_parser::email::Email<'a>)> for EmailEntry {
    type Error = eyre::Report;

    /// Builds an `EmailEntry` from a parsed email and the path it was read from.
    ///
    /// The sender address supplies `domain` and `local_part`; a missing
    /// subject becomes an empty string.
    fn try_from(content: (&Path, email_parser::email::Email)) -> Result<Self, Self::Error> {
        let (source_path, message) = content;
        let sender_address = &message.sender.address;
        Ok(EmailEntry {
            path: source_path.to_path_buf(),
            domain: sender_address.domain.to_string(),
            local_part: sender_address.local_part.to_string(),
            datetime: emaildatetime_to_chrono(&message.date),
            // Moved out last: this consumes `message.subject`.
            subject: message
                .subject
                .map(|text| text.to_string())
                .unwrap_or_default(),
        })
    }
}
/// Converts the date/time of a parsed email into a `chrono` UTC timestamp.
///
/// The calendar and clock fields are copied over one by one.
/// NOTE(review): only `dt.date` and `dt.time.time` are read here; if
/// `dt.time` also carries a zone offset it is not applied — confirm that
/// treating the values as UTC is intended.
fn emaildatetime_to_chrono(dt: &email_parser::time::DateTime) -> chrono::DateTime<Utc> {
    let year = dt.date.year as i32;
    let month = dt.date.month_number() as u32;
    let day = dt.date.day as u32;

    let hour = dt.time.time.hour as u32;
    let minute = dt.time.time.minute as u32;
    let second = dt.time.time.second as u32;

    Utc.ymd(year, month, day).and_hms(hour, minute, second)
}

@ -0,0 +1,35 @@
use chrono::prelude::*;
use eyre::{bail, Result};
use serde::Deserialize;
use serde_json;
use crate::filesystem::RawEmailEntry;
/// Contents of a `.meta` companion file, deserialized from JSON via `serde`.
#[derive(Deserialize, Debug, Clone)]
pub struct Meta {
/// Value of the `msg_id` key.
pub msg_id: String,
/// Value of the `subject` key.
pub subject: String,
/// Value of the `labels` key.
pub labels: Vec<String>,
/// Value of the `flags` key.
pub flags: Vec<String>,
/// Value of the `internal_date` key; `parse_meta` interprets it as a
/// timestamp in seconds when it fills in `created`.
internal_date: i64,
/// Not read from the JSON (`serde(skip)`); starts out as the current time
/// and is overwritten by `parse_meta` with the time derived from
/// `internal_date`.
#[serde(skip, default = "Utc::now")]
pub created: DateTime<Utc>,
}
impl Meta {
    /// Reports whether `labels` contains the exact string `\seen`.
    pub fn is_seen(&self) -> bool {
        self.labels.iter().any(|label| label.as_str() == "\\seen")
    }
}
fn parse_meta(raw_entry: &RawEmailEntry, _content: &Vec<u8>) -> Result<Meta> {
let content = match raw_entry.read_gmail_meta() {
None => bail!("No Gmail Meta Information Available"),
Some(content) => content?,
};
let mut meta: Meta = serde_json::from_slice(&content)?;
meta.created = Utc.timestamp(meta.internal_date, 0);
Ok(meta)
}

@ -0,0 +1,2 @@
pub mod emails;
pub mod gmail_meta;

@ -0,0 +1,30 @@
use std::path::{Path, PathBuf};
/// Locations the import works with.
#[derive(Debug, Clone)]
pub struct Config {
    /// Path of the database file; never an existing directory.
    pub database_path: PathBuf,
    /// Path of the folder that holds the emails; an existing directory at
    /// the time the `Config` was created.
    pub emails_folder_path: PathBuf,
}

impl Config {
    /// Creates a configuration from the database path `db` and the emails
    /// folder `mails`.
    ///
    /// The two arguments may be of different path-like types
    /// (e.g. a `&str` and a `PathBuf`).
    ///
    /// # Panics
    ///
    /// Panics when `db` points to a directory or when `mails` does not
    /// point to a directory.
    pub fn new<A: AsRef<Path>, B: AsRef<Path>>(db: A, mails: B) -> Self {
        let database_path = db.as_ref().to_path_buf();
        assert!(
            !database_path.is_dir(),
            "Database Path can't be a directory: {}",
            &database_path.display()
        );

        let emails_folder_path = mails.as_ref().to_path_buf();
        assert!(
            emails_folder_path.is_dir(),
            "Emails Folder Path is not a directory: {}",
            &emails_folder_path.display()
        );

        Config {
            database_path,
            emails_folder_path,
        }
    }
}

@ -0,0 +1,12 @@
use chrono::prelude::*;
use std::path::PathBuf;
/// Representation of an email
#[derive(Debug)]
pub struct EmailEntry {
/// Path of the file the email was read from.
pub path: PathBuf,
/// Domain part of an email address.
/// NOTE(review): presumably the sender's address — confirm against the code that builds this struct.
pub domain: String,
/// Local part (the portion before the `@`) of the same address.
pub local_part: String,
/// Date and time associated with the email, expressed in UTC.
pub datetime: chrono::DateTime<Utc>,
/// Subject line of the email.
pub subject: String,
}

@ -0,0 +1,5 @@
mod config;
mod email;
pub use config::Config;
pub use email::EmailEntry;
Loading…
Cancel
Save