Need a bigger refactoring for speed

test_work_stealing
Benedikt Terhechte 3 years ago
parent e97ec16a04
commit 20d7a5f5de

141
Cargo.lock generated

@ -207,18 +207,12 @@ dependencies = [
[[package]]
name = "email-parser"
version = "0.5.0"
source = "git+https://github.com/terhechte/email-parser#da8582a266385b8f1b2f7e150ac891e3f850d41a"
dependencies = [
"textcode",
"timezone-abbreviations",
]
[[package]]
name = "eml-parser"
version = "0.1.2"
dependencies = [
"regex",
]
[[package]]
name = "eyre"
version = "0.6.5"
@ -287,13 +281,11 @@ dependencies = [
"crossbeam-channel",
"email-address-parser",
"email-parser",
"eml-parser",
"eyre",
"flate2",
"lazy_static",
"rayon",
"regex",
"rhymessage",
"rusqlite",
"serde",
"serde_json",
@ -423,17 +415,6 @@ dependencies = [
"autocfg",
]
[[package]]
name = "named_tuple"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "040c7794ae549f63b67c97aa325278ca37c8260226147a1be50d9af96f292430"
dependencies = [
"proc-macro2 0.4.30",
"quote 0.6.13",
"syn 0.15.44",
]
[[package]]
name = "num-integer"
version = "0.1.44"
@ -502,9 +483,9 @@ checksum = "99b8db626e31e5b81787b9783425769681b347011cc59471e33ea46d2ea0cf55"
dependencies = [
"pest",
"pest_meta",
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.77",
"proc-macro2",
"quote",
"syn",
]
[[package]]
@ -548,9 +529,9 @@ dependencies = [
"phf_generator",
"phf_shared",
"proc-macro-hack",
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.77",
"proc-macro2",
"quote",
"syn",
]
[[package]]
@ -586,22 +567,13 @@ version = "0.5.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
[[package]]
name = "proc-macro2"
version = "0.4.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf3d2011ab5c909338f7887f4fc896d35932e29146c12c8d01da6b22a80ba759"
dependencies = [
"unicode-xid 0.1.0",
]
[[package]]
name = "proc-macro2"
version = "1.0.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9f5105d4fdaab20335ca9565e106a5d9b82b6219b5ba735731124ac6711d23d"
dependencies = [
"unicode-xid 0.2.2",
"unicode-xid",
]
[[package]]
@ -613,22 +585,13 @@ dependencies = [
"memchr",
]
[[package]]
name = "quote"
version = "0.6.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ce23b6b870e8f94f81fb0a363d65d86675884b34a09043c81e5562f11c1f8e1"
dependencies = [
"proc-macro2 0.4.30",
]
[[package]]
name = "quote"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3d0b9745dc2debf507c8422de05d7226cc1f0644216dfdfead988f9b1ab32a7"
dependencies = [
"proc-macro2 1.0.29",
"proc-macro2",
]
[[package]]
@ -722,16 +685,6 @@ version = "0.6.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
[[package]]
name = "rhymessage"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e9d29b9f4112a4525d66aa9d6ec6c62dd48d7acdfd5ab414836727653a72d79"
dependencies = [
"named_tuple",
"thiserror",
]
[[package]]
name = "rusqlite"
version = "0.25.3"
@ -775,9 +728,9 @@ version = "1.0.130"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7bc1a1ab1961464eae040d96713baa5a724a8152c1222492465b54322ec508b"
dependencies = [
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.77",
"proc-macro2",
"quote",
"syn",
]
[[package]]
@ -820,9 +773,9 @@ checksum = "533494a8f9b724d33625ab53c6c4800f7cc445895924a8ef649222dcb76e938b"
[[package]]
name = "smallvec"
version = "1.6.1"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309"
[[package]]
name = "strum"
@ -837,20 +790,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d06aaeeee809dbc59eb4556183dd927df67db1540de5be8d3ec0b6636358a5ec"
dependencies = [
"heck",
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.77",
]
[[package]]
name = "syn"
version = "0.15.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ca4b3b69a77cbe1ffc9e198781b7acb0c7365a883670e8f1c1bc66fba79a5c5"
dependencies = [
"proc-macro2 0.4.30",
"quote 0.6.13",
"unicode-xid 0.1.0",
"proc-macro2",
"quote",
"syn",
]
[[package]]
@ -859,9 +801,9 @@ version = "1.0.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5239bc68e0fef57495900cfea4e8dc75596d9a319d7e16b1e0a440d24e6fe0a0"
dependencies = [
"proc-macro2 1.0.29",
"quote 1.0.9",
"unicode-xid 0.2.2",
"proc-macro2",
"quote",
"unicode-xid",
]
[[package]]
@ -885,9 +827,9 @@ version = "1.0.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bad553cc2c78e8de258400763a647e80e6d1b31ee237275d756f6836d204494c"
dependencies = [
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.77",
"proc-macro2",
"quote",
"syn",
]
[[package]]
@ -901,17 +843,20 @@ dependencies = [
[[package]]
name = "time"
version = "0.1.43"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438"
checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255"
dependencies = [
"libc",
"wasi",
"winapi",
]
[[package]]
name = "timezone-abbreviations"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ead19eae5d0834473ce509eb859282c262e5b869a0171641d1668cd54c594d8f"
dependencies = [
"phf",
]
@ -934,9 +879,9 @@ version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "98863d0dd09fa59a1b79c6750ad80dbda6b75f4e71c437a6a1a8cb91a8bcbd77"
dependencies = [
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.77",
"proc-macro2",
"quote",
"syn",
]
[[package]]
@ -1009,12 +954,6 @@ version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b"
[[package]]
name = "unicode-xid"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc"
[[package]]
name = "unicode-xid"
version = "0.2.2"
@ -1035,9 +974,9 @@ checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe"
[[package]]
name = "wasi"
version = "0.10.2+wasi-snapshot-preview1"
version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]]
name = "wasm-bindgen"
@ -1058,9 +997,9 @@ dependencies = [
"bumpalo",
"lazy_static",
"log",
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.77",
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-shared",
]
@ -1070,7 +1009,7 @@ version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d56146e7c495528bf6587663bea13a8eb588d39b36b679d83972e1a2dbbdacf9"
dependencies = [
"quote 1.0.9",
"quote",
"wasm-bindgen-macro-support",
]
@ -1080,9 +1019,9 @@ version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7803e0eea25835f8abdc585cd3021b3deb11543c6fe226dcd30b228857c5c5ab"
dependencies = [
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.77",
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]

@ -13,13 +13,11 @@ tracing-subscriber = "0.2.24"
rusqlite = {version = "0.25.3", features = ["chrono", "trace"]}
regex = "1.5.3"
flate2 = "1.0.22"
email-parser = { path = "../email-parser/email-parser", features = ["sender", "from", "date", "subject", "mime", "allow-duplicate-headers"]}
email-parser = { git = "https://github.com/terhechte/email-parser", features = ["sender", "from", "date", "subject", "mime", "allow-duplicate-headers"]}
rayon = "1.5.1"
chrono = "0.4.19"
eml-parser = { path = "../EmlParser" }
email-address-parser = "1.0.3"
lazy_static = "*"
rhymessage = "1.3.1"
serde_json = "*"
serde = { version = "*", features = ["derive"]}
strum = "0.21"

@ -64,13 +64,14 @@ CREATE TABLE IF NOT EXISTS emails (
year INTEGER NOT NULL,
month INTEGER NOT NULL,
day INTEGER NOT NULL,
kind TEXT NOT NULL
kind TEXT NOT NULL,
subject TEXT NOT NULL
);"#;
connection.execute(&emails_table, params![])?;
let errors_table = r#"
CREATE TABLE IF NOT EXISTS errors (
message TEXT NOT NULL,
path TEXT NOT NULL,
path TEXT NOT NULL
);"#;
connection.execute(&errors_table, params![])?;
Ok(())
@ -85,9 +86,12 @@ fn insert_mail(connection: &Connection, entry: &EmailEntry) -> Result<()> {
let month = entry.datetime.date().month();
let day = entry.datetime.date().day();
let kind = entry.parser.to_string();
let sql = "INSERT INTO emails (path, domain, local_part, year, month, day, kind) VALUES (?, ?, ?, ?, ?, ?, ?)";
let subject = entry.subject.to_string();
let sql = "INSERT INTO emails (path, domain, local_part, year, month, day, kind, subject) VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
let mut prepared = connection.prepare(sql)?;
prepared.execute(params![path, domain, local_part, year, month, day, kind])?;
prepared.execute(params![
path, domain, local_part, year, month, day, kind, subject
])?;
Ok(())
}

@ -1,11 +1,9 @@
use chrono::prelude::*;
use email_address_parser;
use eml_parser::eml::HeaderFieldValue;
use eyre::{bail, eyre, Result, WrapErr};
use flate2;
use flate2::read::GzDecoder;
use rayon::prelude::*;
use rhymessage;
use serde::Deserialize;
use serde_json;
use strum_macros;
@ -36,6 +34,7 @@ pub struct EmailEntry {
pub local_part: String,
pub datetime: chrono::DateTime<Utc>,
pub parser: ParserKind,
pub subject: String,
}
/// Raw representation of an email.
@ -187,19 +186,10 @@ fn read_emails(folder_path: &Path) -> Result<Vec<RawEmailEntry>> {
pub fn read_email(raw_entry: &RawEmailEntry) -> Result<EmailEntry> {
let content = unziped_content(&raw_entry.eml_path)?;
// We have to try multiple different email readers as each of them seems to fail in a different way
let email = parse_email_parser(&raw_entry, &content)
.or_else(|e| {
tracing::trace!("Parser Error: {:?}", &e);
parse_rhymessage(&raw_entry, &content)
})
.or_else(|e| {
tracing::trace!("Parser Error: {:?}", &e);
parse_eml(&raw_entry, &content)
})
.or_else(|e| {
tracing::trace!("Parser Error: {:?}", &e);
parse_meta(&raw_entry, &content)
});
let email = parse_email_parser(&raw_entry, &content).or_else(|e| {
tracing::trace!("Parser Error: {:?}", &e);
parse_meta(&raw_entry, &content)
});
Ok(email.wrap_err_with(|| {
format!(
@ -211,34 +201,13 @@ pub fn read_email(raw_entry: &RawEmailEntry) -> Result<EmailEntry> {
}
fn parse_email_parser(raw_entry: &RawEmailEntry, content: &Vec<u8>) -> Result<EmailEntry> {
let x = match email_parser::email::Email::parse(&content) {
match email_parser::email::Email::parse(&content) {
Ok(email) => (&raw_entry.eml_path, email).try_into(),
Err(error) => {
let content_string = String::from_utf8(content.clone())?;
//println!(
// "---\n{}\n---\n{:?}\n---\n{}",
// &content_string,
// &error,
// &raw_entry.eml_path.display()
//);
println!("{}|{}", &error, &raw_entry.eml_path.display());
//Err(eyre!("Could not `email_parser` email:\n{:?}", &error))
Err(eyre!("Could not `email_parser` email"))
//let content_string = String::from_utf8(content.clone())?;
//println!("{}|{}", &error, &raw_entry.eml_path.display());
Err(eyre!("Could not `email_parser` email:\n{:?}", &error))
}
};
x
//.unwrap();
//Ok(x)
}
fn parse_eml(raw_entry: &RawEmailEntry, content: &Vec<u8>) -> Result<EmailEntry> {
let content_string = String::from_utf8(content.clone())?;
match eml_parser::EmlParser::from_string(content_string)
.ignore_body()
.parse()
{
Ok(eml) => (&raw_entry.eml_path, eml).try_into(),
Err(error) => bail!("Could not `eml` parse email:\n{:?}", &error),
}
}
@ -248,6 +217,7 @@ fn parse_meta(raw_entry: &RawEmailEntry, _content: &Vec<u8>) -> Result<EmailEntr
struct Meta {
msg_id: String,
internal_date: i64,
subject: String,
}
let content = std::fs::read_to_string(&raw_entry.meta_path)?;
let meta: Meta = serde_json::from_str(&content)?;
@ -260,19 +230,10 @@ fn parse_meta(raw_entry: &RawEmailEntry, _content: &Vec<u8>) -> Result<EmailEntr
local_part: parsed.get_local_part().to_owned(),
datetime,
parser: ParserKind::Meta,
subject: meta.subject.clone(),
})
}
fn parse_rhymessage(raw_entry: &RawEmailEntry, content: &Vec<u8>) -> Result<EmailEntry> {
use rhymessage::MessageHeaders;
let mut headers = MessageHeaders::new();
match headers.parse(&content) {
Ok(_) => (),
Err(e) => bail!("Error Parsing Message: {:?}", &e),
}
Ok((&raw_entry.eml_path, headers).try_into()?)
}
impl<'a> TryFrom<(&PathBuf, email_parser::email::Email<'a>)> for EmailEntry {
type Error = eyre::Report;
fn try_from(content: (&PathBuf, email_parser::email::Email)) -> Result<Self, Self::Error> {
@ -280,6 +241,7 @@ impl<'a> TryFrom<(&PathBuf, email_parser::email::Email<'a>)> for EmailEntry {
let domain = email.sender.address.domain.to_string();
let local_part = email.sender.address.local_part.to_string();
let datetime = emaildatetime_to_chrono(&email.date);
let subject = email.subject.map(|e| e.to_string()).unwrap_or_default();
Ok(EmailEntry {
path: path.to_path_buf(),
@ -287,130 +249,18 @@ impl<'a> TryFrom<(&PathBuf, email_parser::email::Email<'a>)> for EmailEntry {
local_part,
datetime,
parser: ParserKind::EmailParser,
})
}
}
impl TryFrom<(&PathBuf, rhymessage::MessageHeaders)> for EmailEntry {
type Error = eyre::Report;
fn try_from(content: (&PathBuf, rhymessage::MessageHeaders)) -> Result<Self, Self::Error> {
let (path, headers) = content;
let mut address: Option<String> = None;
let mut date: Option<String> = None;
for entry in headers.headers() {
if address == None && SENDER_HEADER_NAMES.contains(&entry.name.as_ref()) {
address = Some(entry.value.to_string());
}
if date == None && DATE_HEADER_NAMES.contains(&entry.name.as_ref()) {
date = Some(entry.value.to_string());
}
if address.is_some() && date.is_some() {
break;
}
}
let address = address.ok_or(eyre!("Cannot find sender header"))?;
let date = date.ok_or(eyre!("Cannot find date header"))?;
let parsed_address = email_address_parser::EmailAddress::parse(&address, None)
.ok_or(eyre!("Cannot Parse Address: {}", &address))?;
let parsed_date = date
.parse::<DateTime<Utc>>()
.map_err(|e| eyre!("Cannot Parse Date {}: {:?}", &date, &e))?;
Ok(EmailEntry {
path: path.to_path_buf(),
domain: parsed_address.get_domain().to_string(),
local_part: parsed_address.get_local_part().to_string(),
datetime: parsed_date,
parser: ParserKind::Rhymessage,
})
}
}
impl TryFrom<(&PathBuf, eml_parser::eml::Eml)> for EmailEntry {
type Error = eyre::Report;
fn try_from(content: (&PathBuf, eml_parser::eml::Eml)) -> Result<Self, Self::Error> {
use eml_parser::eml::EmailAddress;
let (path, email) = content;
let headers = email.headers;
let sender = email
.from
.as_ref()
.or_else(|| {
// Try to find the address from some other field
headers
.iter()
.find(|f| SENDER_HEADER_NAMES.contains(&f.name.as_str()))
.map(|f| &f.value)
})
.ok_or(eyre!("Missing From Field"))?;
let datetime = headers
.iter()
.find(|f| DATE_HEADER_NAMES.contains(&f.name.as_str()))
.map(|f| match &f.value {
HeaderFieldValue::Unstructured(s) => Some(s.clone()),
_ => None,
})
.flatten()
.ok_or(eyre!("Missing Date Field"))?;
let parsed_date = datetime
.parse::<DateTime<Utc>>()
.map_err(|e| eyre!("Cannot Parse Date {}: {:?}", &datetime, &e))?;
use eml_parser::eml::HeaderFieldValue::*;
let address = match &sender {
SingleEmailAddress(e) => EmailAddress::AddressOnly {
address: extract_address(e),
},
MultipleEmailAddresses(e) if !e.is_empty() => EmailAddress::AddressOnly {
address: extract_address(e.get(0).unwrap()),
},
Unstructured(data) => {
parse_unstructured(&data).ok_or(eyre!("Invalid Unstructered Email: {}", &data))?
}
MultipleEmailAddresses(e) => {
bail!("Email has invalid amount of senders: {:?}", &e)
}
_ => bail!("Email has invalid amount of senders: {:?}", &sender),
};
let address = extract_address(&address);
let parsed = email_address_parser::EmailAddress::parse(&address, None)
.ok_or(eyre!("Cannot Parse Address: {}", &address))?;
Ok(EmailEntry {
path: path.to_path_buf(),
domain: parsed.get_domain().to_string(),
local_part: parsed.get_local_part().to_string(),
datetime: parsed_date,
parser: ParserKind::Eml,
subject,
})
}
}
fn emaildatetime_to_chrono(dt: &email_parser::time::DateTime) -> chrono::DateTime<Utc> {
use email_parser::time::Month::*;
let m = match dt.date.month {
January => 1,
February => 2,
March => 3,
April => 4,
May => 5,
June => 6,
July => 7,
August => 8,
September => 9,
October => 10,
November => 11,
December => 12,
};
Utc.ymd(dt.date.year as i32, m, dt.date.day as u32).and_hms(
Utc.ymd(
dt.date.year as i32,
dt.date.month_number() as u32,
dt.date.day as u32,
)
.and_hms(
dt.time.time.hour as u32,
dt.time.time.minute as u32,
dt.time.time.second as u32,
@ -427,34 +277,34 @@ fn unziped_content(path: &Path) -> Result<Vec<u8>> {
/// Try to parse unstructed data into some sort of
/// email address
fn parse_unstructured(data: &str) -> Option<eml_parser::eml::EmailAddress> {
use lazy_static::lazy_static;
use regex::Regex;
lazy_static! {
static ref EMAIL_RE: Regex = Regex::new(r#"(?:[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*|"(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21\x23-\x5b\x5d-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])*")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9-]*[a-z0-9]:(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21-\x5a\x53-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])+)\])"#).unwrap();
}
lazy_static! {
static ref RE: Regex = Regex::new("<(.*?)>").unwrap();
}
if let Some(capture) = RE.captures(&data).and_then(|f| f.get(1)) {
Some(eml_parser::eml::EmailAddress::AddressOnly {
address: capture.as_str().to_string(),
})
} else {
let capture = EMAIL_RE.captures(&data).and_then(|f| f.get(0))?;
Some(eml_parser::eml::EmailAddress::AddressOnly {
address: capture.as_str().to_string(),
})
}
}
//fn parse_unstructured(data: &str) -> Option<eml_parser::eml::EmailAddress> {
// use lazy_static::lazy_static;
// use regex::Regex;
// lazy_static! {
// static ref EMAIL_RE: Regex = Regex::new(r#"(?:[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*|"(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21\x23-\x5b\x5d-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])*")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9-]*[a-z0-9]:(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21-\x5a\x53-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])+)\])"#).unwrap();
// }
// lazy_static! {
// static ref RE: Regex = Regex::new("<(.*?)>").unwrap();
// }
// if let Some(capture) = RE.captures(&data).and_then(|f| f.get(1)) {
// Some(eml_parser::eml::EmailAddress::AddressOnly {
// address: capture.as_str().to_string(),
// })
// } else {
// let capture = EMAIL_RE.captures(&data).and_then(|f| f.get(0))?;
// Some(eml_parser::eml::EmailAddress::AddressOnly {
// address: capture.as_str().to_string(),
// })
// }
//}
fn extract_address(from: &eml_parser::eml::EmailAddress) -> String {
use eml_parser::eml::EmailAddress::*;
match from {
AddressOnly { address } => address.clone(),
NameAndEmailAddress { name: _, address } => address.clone(),
}
}
//fn extract_address(from: &eml_parser::eml::EmailAddress) -> String {
// use eml_parser::eml::EmailAddress::*;
// match from {
// AddressOnly { address } => address.clone(),
// NameAndEmailAddress { name: _, address } => address.clone(),
// }
//}
#[cfg(test)]
mod tests {
@ -463,29 +313,27 @@ mod tests {
use super::RawEmailEntry;
#[test]
fn test_weird_email1() {
let data = "No Reply <no-reply@evernote.com>, terhechte.5cffa@m.evernote.com";
let address = super::parse_unstructured(&data).unwrap();
assert_eq!(
address,
eml_parser::eml::EmailAddress::AddressOnly {
address: "no-reply@evernote.com".to_owned()
}
);
}
//fn test_weird_email1() {
// let data = "No Reply <no-reply@evernote.com>, terhechte.5cffa@m.evernote.com";
// let address = super::parse_unstructured(&data).unwrap();
// assert_eq!(
// address,
// eml_parser::eml::EmailAddress::AddressOnly {
// address: "no-reply@evernote.com".to_owned()
// }
// );
//}
#[test]
fn test_weird_email2() {
let data = r#"info@sport-news.denReply-To:info"@sport-news.denX-Mailer:Sport-News.de"#;
let address = super::parse_unstructured(&data).unwrap();
assert_eq!(
address,
eml_parser::eml::EmailAddress::AddressOnly {
address: "info@sport-news.den".to_owned()
}
);
}
//fn test_weird_email2() {
// let data = r#"info@sport-news.denReply-To:info"@sport-news.denX-Mailer:Sport-News.de"#;
// let address = super::parse_unstructured(&data).unwrap();
// assert_eq!(
// address,
// eml_parser::eml::EmailAddress::AddressOnly {
// address: "info@sport-news.den".to_owned()
// }
// );
//}
#[test]
fn test_weird_email3() {
crate::setup();

@ -1,3 +1,4 @@
use core::num;
use eyre::{bail, Result};
use rayon::prelude::*;
use std::io::prelude::*;
@ -5,9 +6,16 @@ use std::{io, path::PathBuf};
use thiserror;
use tracing_subscriber::EnvFilter;
use crossbeam_channel;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::{
io::{stdout, Write},
thread::sleep,
time::Duration,
};
use crate::database::Database;
mod database;
@ -23,7 +31,44 @@ fn main() -> Result<()> {
setup();
let arguments: Vec<String> = std::env::args().collect();
let folder = arguments.get(1).ok_or(GmailDBError::MissingFolder)?;
process_folder(&folder)?;
let receiver = process_folder(&folder)?;
let mut stdout = stdout();
let mut total: Option<usize> = None;
let mut counter = 0;
let mut done = false;
println!("Collecting Mails...");
while done == false {
if let Some(total) = total {
for entry in receiver.try_iter() {
let value = match entry {
Err(e) => {
panic!("{:?}", &e);
}
Ok(None) => {
done = true;
0
}
Ok(Some(n)) => n,
};
counter += value;
}
print!("\rProcessing {}/{}...", counter, total);
} else {
match receiver.recv()? {
Err(e) => {
panic!("{:?}", &e);
}
Ok(Some(n)) => {
total = Some(n);
}
Ok(None) => done = true,
};
}
stdout.flush().unwrap();
sleep(Duration::from_millis(20));
}
println!();
//process_email(&folder)?;
Ok(())
}
@ -34,34 +79,58 @@ fn process_email(path: &str) -> Result<()> {
Ok(())
}
fn process_folder(folder: &str) -> Result<()> {
let emails = emails::Emails::new(&folder)?;
let total = emails.len();
enum FolderProgress {
Total(usize),
Parsed,
}
println!("Done Loading {} emails", &total);
fn process_folder(folder: &str) -> Result<crossbeam_channel::Receiver<Result<Option<usize>>>> {
// We return the status
let (tx, rx) = crossbeam_channel::bounded(100);
let folder = folder.to_owned();
let mut database = Database::new().expect("Expect a valid database");
std::thread::spawn(move || {
let emails = match emails::Emails::new(&folder) {
Ok(n) => n,
Err(e) => {
tx.send(Err(e)).unwrap();
return;
}
};
let total = emails.len();
let sender = database.process();
tx.send(Ok(Some(total))).unwrap();
use database::DBMessage;
emails
.emails
//.par_iter()
.iter()
.map(|raw_mail| (&raw_mail, emails::read_email(&raw_mail)))
.for_each(|(raw_mail, entry)| {
if let Err(e) = match entry {
Ok(mail) => sender.send(DBMessage::Mail(mail)),
Err(e) => sender.send(DBMessage::Error(e, raw_mail.path())),
} {
tracing::info!("Error Inserting into Database: {:?}", &e);
}
});
println!("Done Loading {} emails", &total);
sender.send(database::DBMessage::Done).unwrap();
while !sender.is_empty() {}
Ok(())
let mut database = Database::new().expect("Expect a valid database");
let sender = database.process();
use database::DBMessage;
emails
.emails
.par_iter()
//.iter()
.map(|raw_mail| (raw_mail.path(), emails::read_email(&raw_mail)))
.for_each(|(path, entry)| {
tx.send(Ok(Some(1))).unwrap();
if let Err(e) = match entry {
Ok(mail) => sender.send(DBMessage::Mail(mail)),
Err(e) => sender.send(DBMessage::Error(e, path)),
} {
tracing::info!("Error Inserting into Database: {:?}", &e);
}
});
sender.send(database::DBMessage::Done).unwrap();
while !sender.is_empty() {
println!("left in sqlite: {}", sender.len());
sleep(Duration::from_millis(20));
}
tx.send(Ok(None)).unwrap();
});
Ok(rx)
}
fn setup() {

Loading…
Cancel
Save