melib/maildir: refactor MaildirOp and watch()

Refactor to remove unwraps and add None checks with returning errors
where appropriate.

Hopefully addresses #426

Signed-off-by: Manos Pitsidianakis <manos@pitsidianak.is>
pull/428/head
Manos Pitsidianakis 4 months ago
parent a82d1e1ebe
commit 5110813e87
No known key found for this signature in database
GPG Key ID: 7729C7707F7E09D0

@ -25,38 +25,36 @@
//! specification. <https://cr.yp.to/proto/maildir.html> //! specification. <https://cr.yp.to/proto/maildir.html>
use std::{ use std::{
collections::{hash_map::DefaultHasher, HashMap, HashSet}, collections::{HashMap, HashSet},
ffi::OsStr,
fs, fs,
hash::{Hash, Hasher},
io::{self, Read, Write}, io::{self, Read, Write},
ops::{Deref, DerefMut}, ops::{Deref, DerefMut},
os::unix::fs::PermissionsExt, os::unix::fs::PermissionsExt,
path::{Component, Path, PathBuf}, path::{Path, PathBuf},
sync::{mpsc::channel, Arc, Mutex}, sync::{mpsc::channel, Arc, Mutex},
time::Duration, time::Duration,
}; };
use notify::{event::EventKind as NotifyEvent, RecommendedWatcher, RecursiveMode, Watcher}; use notify::{RecommendedWatcher, RecursiveMode, Watcher};
use super::{MaildirMailbox, MaildirOp, MaildirPathTrait}; use super::{watch, MaildirMailbox, MaildirOp, MaildirPathTrait};
use crate::{ use crate::{
backends::{prelude::*, RefreshEventKind::*}, backends::{prelude::*, RefreshEventKind::*},
error::{Error, ErrorKind, IntoError, Result}, error::{Error, ErrorKind, IntoError, Result},
utils::shellexpand::ShellExpandTrait, utils::shellexpand::ShellExpandTrait,
}; };
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, Eq, PartialEq)]
pub(super) enum PathMod { pub enum PathMod {
Path(PathBuf), Path(PathBuf),
Hash(EnvelopeHash), Hash(EnvelopeHash),
} }
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct MaildirPath { pub struct MaildirPath {
pub(super) buf: PathBuf, pub buf: PathBuf,
pub(super) modified: Option<PathMod>, pub modified: Option<PathMod>,
pub(super) removed: bool, pub removed: bool,
} }
impl Deref for MaildirPath { impl Deref for MaildirPath {
@ -110,47 +108,13 @@ pub type HashIndexes = Arc<Mutex<HashMap<MailboxHash, HashIndex>>>;
/// The maildir backend instance type. /// The maildir backend instance type.
#[derive(Debug)] #[derive(Debug)]
pub struct MaildirType { pub struct MaildirType {
name: String, pub name: String,
mailboxes: HashMap<MailboxHash, MaildirMailbox>, pub mailboxes: HashMap<MailboxHash, MaildirMailbox>,
mailbox_index: Arc<Mutex<HashMap<EnvelopeHash, MailboxHash>>>, pub mailbox_index: Arc<Mutex<HashMap<EnvelopeHash, MailboxHash>>>,
hash_indexes: HashIndexes, pub hash_indexes: HashIndexes,
event_consumer: BackendEventConsumer, pub event_consumer: BackendEventConsumer,
collection: Collection, pub collection: Collection,
path: PathBuf, pub path: PathBuf,
}
macro_rules! path_is_new {
($path:expr) => {
if $path.is_dir() {
false
} else {
let mut iter = $path.components().rev();
iter.next();
iter.next() == Some(Component::Normal(OsStr::new("new")))
}
};
}
macro_rules! get_path_hash {
($path:expr) => {{
let mut path = $path.clone();
if path.is_dir() {
if path.ends_with("cur") | path.ends_with("new") {
path.pop();
}
} else {
path.pop();
path.pop();
};
crate::get_path_hash!(path)
}};
}
pub fn get_file_hash(file: &Path) -> EnvelopeHash {
let mut hasher = DefaultHasher::default();
file.hash(&mut hasher);
EnvelopeHash(hasher.finish())
} }
pub fn move_to_cur(p: &Path) -> Result<PathBuf> { pub fn move_to_cur(p: &Path) -> Result<PathBuf> {
@ -231,7 +195,7 @@ impl MailBackend for MaildirType {
map.keys().cloned().collect::<HashSet<EnvelopeHash>>() map.keys().cloned().collect::<HashSet<EnvelopeHash>>()
}; };
for file in files { for file in files {
let hash = get_file_hash(&file); let hash = file.to_envelope_hash();
{ {
let mut map = map.lock().unwrap(); let mut map = map.lock().unwrap();
let map = map.entry(mailbox_hash).or_default(); let map = map.entry(mailbox_hash).or_default();
@ -294,9 +258,7 @@ impl MailBackend for MaildirType {
} }
fn watch(&self) -> ResultFuture<()> { fn watch(&self) -> ResultFuture<()> {
let account_hash = AccountHash::from_bytes(self.name.as_bytes());
let root_mailbox = self.path.to_path_buf(); let root_mailbox = self.path.to_path_buf();
let sender = self.event_consumer.clone();
let (tx, rx) = channel(); let (tx, rx) = channel();
let watcher = RecommendedWatcher::new( let watcher = RecommendedWatcher::new(
tx, tx,
@ -304,11 +266,9 @@ impl MailBackend for MaildirType {
) )
.and_then(|mut watcher| { .and_then(|mut watcher| {
watcher.watch(&root_mailbox, RecursiveMode::Recursive)?; watcher.watch(&root_mailbox, RecursiveMode::Recursive)?;
Ok(watcher) Ok(Box::new(watcher))
}) })
.map_err(|err| err.set_err_details("Failed to create file change monitor."))?; .map_err(|err| err.set_err_details("Failed to create file change monitor."))?;
let hash_indexes = self.hash_indexes.clone();
let mailbox_index = self.mailbox_index.clone();
let root_mailbox_hash: MailboxHash = self let root_mailbox_hash: MailboxHash = self
.mailboxes .mailboxes
.values() .values()
@ -320,454 +280,18 @@ impl MailBackend for MaildirType {
.iter() .iter()
.map(|(&k, v)| (k, (v.unseen.clone(), v.total.clone()))) .map(|(&k, v)| (k, (v.unseen.clone(), v.total.clone())))
.collect::<HashMap<MailboxHash, (Arc<Mutex<usize>>, Arc<Mutex<usize>>)>>(); .collect::<HashMap<MailboxHash, (Arc<Mutex<usize>>, Arc<Mutex<usize>>)>>();
Ok(Box::pin(async move { let watch_state = watch::MaildirWatch {
// Move `watcher` in the closure's scope so that it doesn't get dropped. watcher,
let _watcher = watcher; account_hash: AccountHash::from_bytes(self.name.as_bytes()),
let mut buf = Vec::with_capacity(4096); event_consumer: self.event_consumer.clone(),
loop { root_mailbox,
match rx.recv() { rx,
Ok(Ok(event)) => match event.kind { hash_indexes: self.hash_indexes.clone(),
/* Create */ mailbox_index: self.mailbox_index.clone(),
NotifyEvent::Create(_) => { root_mailbox_hash,
log::debug!("Create events: (path = {:?})", event.paths); mailbox_counts,
for mut pathbuf in event.paths { };
if path_is_new!(pathbuf) { Ok(Box::pin(async move { watch_state.watch().await }))
// This creates a Rename event that we will receive later
pathbuf = match move_to_cur(&pathbuf) {
Ok(p) => p,
Err(err) => {
log::error!(
"Could not move {} to /cur: {}",
pathbuf.display(),
err
);
pathbuf
}
};
}
let mailbox_hash = MailboxHash(get_path_hash!(pathbuf));
let file_name = pathbuf
.as_path()
.strip_prefix(&root_mailbox)
.unwrap()
.to_path_buf();
if let Ok(env) = add_path_to_index(
&hash_indexes,
mailbox_hash,
pathbuf.as_path(),
file_name,
&mut buf,
) {
mailbox_index
.lock()
.unwrap()
.insert(env.hash(), mailbox_hash);
log::debug!(
"Create event {} {} {}",
env.hash(),
env.subject(),
pathbuf.display()
);
if !env.is_seen() {
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
}
*mailbox_counts[&mailbox_hash].1.lock().unwrap() += 1;
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
}),
);
}
}
}
NotifyEvent::Modify(
notify::event::ModifyKind::Any
| notify::event::ModifyKind::Data(_)
| notify::event::ModifyKind::Other,
) => {
log::debug!("Modify events: (path = {:?})", event.paths);
for pathbuf in event.paths {
let mailbox_hash = MailboxHash(get_path_hash!(pathbuf));
let mut hash_indexes_lock = hash_indexes.lock().unwrap();
let index_lock =
&mut hash_indexes_lock.entry(mailbox_hash).or_default();
let file_name = pathbuf
.as_path()
.strip_prefix(&root_mailbox)
.unwrap()
.to_path_buf();
/* Linear search in hash_index to find old hash */
let old_hash: EnvelopeHash = {
if let Some((k, v)) =
index_lock.iter_mut().find(|(_, v)| *v.buf == pathbuf)
{
*v = pathbuf.clone().into();
*k
} else {
drop(hash_indexes_lock);
/* Did we just miss a Create event? In any case, create
* envelope. */
if let Ok(env) = add_path_to_index(
&hash_indexes,
mailbox_hash,
pathbuf.as_path(),
file_name,
&mut buf,
) {
mailbox_index
.lock()
.unwrap()
.insert(env.hash(), mailbox_hash);
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
}),
);
}
continue;
}
};
let new_hash: EnvelopeHash = get_file_hash(pathbuf.as_path());
let mut reader = io::BufReader::new(fs::File::open(&pathbuf)?);
buf.clear();
reader.read_to_end(&mut buf)?;
if index_lock.get_mut(&new_hash).is_none() {
if let Ok(mut env) =
Envelope::from_bytes(buf.as_slice(), Some(pathbuf.flags()))
{
env.set_hash(new_hash);
index_lock.insert(new_hash, pathbuf.into());
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Update(old_hash, Box::new(env)),
}),
);
}
}
}
}
NotifyEvent::Remove(_) => {
for pathbuf in event.paths {
log::debug!("NotifyEvent::Remove(path = {:?}", pathbuf);
let mailbox_hash = MailboxHash(get_path_hash!(pathbuf));
let mut hash_indexes_lock = hash_indexes.lock().unwrap();
let index_lock = hash_indexes_lock.entry(mailbox_hash).or_default();
let hash: EnvelopeHash = if let Some((k, _)) =
index_lock.iter().find(|(_, v)| *v.buf == pathbuf)
{
*k
} else {
log::debug!("removed but not contained in index");
continue;
};
if let Some(ref modif) = &index_lock[&hash].modified {
match modif {
PathMod::Path(path) => log::trace!(
"envelope {} has modified path set {}",
hash,
path.display()
),
PathMod::Hash(hash) => log::trace!(
"envelope {} has modified path set {}",
hash,
&index_lock[hash].buf.display()
),
}
index_lock.entry(hash).and_modify(|e| {
e.removed = false;
});
continue;
}
{
let mut lck = mailbox_counts[&mailbox_hash].1.lock().unwrap();
*lck = lck.saturating_sub(1);
}
if !pathbuf.flags().contains(Flag::SEEN) {
let mut lck = mailbox_counts[&mailbox_hash].0.lock().unwrap();
*lck = lck.saturating_sub(1);
}
index_lock.entry(hash).and_modify(|e| {
e.removed = true;
});
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Remove(hash),
}),
);
}
}
NotifyEvent::Modify(notify::event::ModifyKind::Name(
notify::event::RenameMode::Both,
)) if event.paths.len() == 2 => {
let [ref src, ref dest] = event.paths[..] else {
unreachable!()
};
log::debug!("NotifyEvent::Rename(src = {:?}, dest = {:?})", src, dest);
let mailbox_hash = MailboxHash(get_path_hash!(src));
let dest_mailbox = {
let dest_mailbox = MailboxHash(get_path_hash!(dest));
if dest_mailbox == mailbox_hash {
None
} else {
Some(dest_mailbox)
}
};
let old_hash: EnvelopeHash = get_file_hash(src.as_path());
let new_hash: EnvelopeHash = get_file_hash(dest.as_path());
let mut hash_indexes_lock = hash_indexes.lock().unwrap();
let index_lock = hash_indexes_lock.entry(mailbox_hash).or_default();
let old_flags = src.flags();
let new_flags = dest.flags();
let was_seen: bool = old_flags.contains(Flag::SEEN);
let is_seen: bool = new_flags.contains(Flag::SEEN);
if index_lock.contains_key(&old_hash) && !index_lock[&old_hash].removed
{
if let Some(dest_mailbox) = dest_mailbox {
index_lock.entry(old_hash).and_modify(|e| {
e.removed = true;
});
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Remove(old_hash),
}),
);
let file_name = dest
.as_path()
.strip_prefix(&root_mailbox)
.unwrap()
.to_path_buf();
drop(hash_indexes_lock);
if let Ok(env) = add_path_to_index(
&hash_indexes,
dest_mailbox,
dest.as_path(),
file_name,
&mut buf,
) {
mailbox_index
.lock()
.unwrap()
.insert(env.hash(), dest_mailbox);
log::trace!(
"Create event {} {} {}",
env.hash(),
env.subject(),
dest.display()
);
if !env.is_seen() {
*mailbox_counts[&dest_mailbox].0.lock().unwrap() += 1;
}
*mailbox_counts[&dest_mailbox].1.lock().unwrap() += 1;
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: dest_mailbox,
kind: Create(Box::new(env)),
}),
);
}
} else {
index_lock.entry(old_hash).and_modify(|e| {
e.modified = Some(PathMod::Hash(new_hash));
});
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Rename(old_hash, new_hash),
}),
);
if !was_seen && is_seen {
let mut lck =
mailbox_counts[&mailbox_hash].0.lock().unwrap();
*lck = lck.saturating_sub(1);
} else if was_seen && !is_seen {
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
}
if old_flags != new_flags {
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: NewFlags(new_hash, (new_flags, vec![])),
}),
);
}
mailbox_index.lock().unwrap().insert(new_hash, mailbox_hash);
index_lock.insert(new_hash, dest.to_path_buf().into());
}
continue;
} else if !index_lock.contains_key(&new_hash)
&& index_lock
.get(&old_hash)
.map(|e| e.removed)
.unwrap_or(false)
{
if index_lock
.get(&old_hash)
.map(|e| e.removed)
.unwrap_or(false)
{
index_lock.entry(old_hash).and_modify(|e| {
e.modified = Some(PathMod::Hash(new_hash));
e.removed = false;
});
log::trace!(
"contains_old_key, key was marked as removed (by external \
source)"
);
} else {
log::trace!("not contains_new_key");
}
let file_name = dest
.as_path()
.strip_prefix(&root_mailbox)
.unwrap()
.to_path_buf();
log::trace!("filename = {:?}", file_name);
drop(hash_indexes_lock);
if let Ok(env) = add_path_to_index(
&hash_indexes,
dest_mailbox.unwrap_or(mailbox_hash),
dest.as_path(),
file_name,
&mut buf,
) {
mailbox_index
.lock()
.unwrap()
.insert(env.hash(), dest_mailbox.unwrap_or(mailbox_hash));
log::trace!(
"Create event {} {} {}",
env.hash(),
env.subject(),
dest.display()
);
if !env.is_seen() {
*mailbox_counts[&dest_mailbox.unwrap_or(mailbox_hash)]
.0
.lock()
.unwrap() += 1;
}
*mailbox_counts[&dest_mailbox.unwrap_or(mailbox_hash)]
.1
.lock()
.unwrap() += 1;
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: dest_mailbox.unwrap_or(mailbox_hash),
kind: Create(Box::new(env)),
}),
);
continue;
} else {
log::trace!("not valid email");
}
} else if let Some(dest_mailbox) = dest_mailbox {
drop(hash_indexes_lock);
let file_name = dest
.as_path()
.strip_prefix(&root_mailbox)
.unwrap()
.to_path_buf();
if let Ok(env) = add_path_to_index(
&hash_indexes,
dest_mailbox,
dest.as_path(),
file_name,
&mut buf,
) {
mailbox_index
.lock()
.unwrap()
.insert(env.hash(), dest_mailbox);
log::trace!(
"Create event {} {} {}",
env.hash(),
env.subject(),
dest.display()
);
if !env.is_seen() {
*mailbox_counts[&dest_mailbox].0.lock().unwrap() += 1;
}
*mailbox_counts[&dest_mailbox].1.lock().unwrap() += 1;
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: dest_mailbox,
kind: Create(Box::new(env)),
}),
);
}
} else {
if was_seen && !is_seen {
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
}
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Rename(old_hash, new_hash),
}),
);
log::trace!("contains_new_key");
if old_flags != new_flags {
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: NewFlags(new_hash, (new_flags, vec![])),
}),
);
}
}
}
_ => {
log::debug!("Received unexpected fs watcher notify event: {:?}", event);
/* Trigger rescan of mailbox */
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: root_mailbox_hash,
kind: Rescan,
}),
);
}
},
Ok(Err(e)) => log::debug!("watch error: {:?}", e),
Err(e) => log::debug!("watch error: {:?}", e),
}
}
}))
} }
fn operation(&self, hash: EnvelopeHash) -> Result<Box<dyn BackendOp>> { fn operation(&self, hash: EnvelopeHash) -> Result<Box<dyn BackendOp>> {
@ -976,7 +500,7 @@ impl MailBackend for MaildirType {
.map(|item| *item.0) .map(|item| *item.0)
}); });
let mailbox_hash = MailboxHash(get_path_hash!(&path)); let mailbox_hash: MailboxHash = path.to_mailbox_hash();
if let Some(parent) = parent { if let Some(parent) = parent {
self.mailboxes self.mailboxes
.entry(parent) .entry(parent)
@ -1324,31 +848,3 @@ impl MaildirType {
Ok(files) Ok(files)
} }
} }
fn add_path_to_index(
hash_index: &HashIndexes,
mailbox_hash: MailboxHash,
path: &Path,
file_name: PathBuf,
buf: &mut Vec<u8>,
) -> Result<Envelope> {
log::trace!("add_path_to_index path {:?} filename{:?}", path, file_name);
let env_hash = get_file_hash(path);
hash_index
.lock()
.unwrap()
.entry(mailbox_hash)
.or_default()
.insert(env_hash, path.to_path_buf().into());
let mut reader = io::BufReader::new(fs::File::open(path)?);
buf.clear();
reader.read_to_end(buf)?;
let mut env = Envelope::from_bytes(buf.as_slice(), Some(path.flags()))?;
env.set_hash(env_hash);
log::trace!(
"add_path_to_index gen {}\t{}",
env_hash,
file_name.display()
);
Ok(env)
}

@ -22,6 +22,7 @@
#[macro_use] #[macro_use]
mod backend; mod backend;
pub use self::backend::*; pub use self::backend::*;
pub mod watch;
mod stream; mod stream;
use std::{ use std::{
@ -67,26 +68,33 @@ impl MaildirOp {
hash, hash,
} }
} }
fn path(&self) -> Result<PathBuf> {
pub fn path(&self) -> Option<PathBuf> {
let map = self.hash_index.lock().unwrap(); let map = self.hash_index.lock().unwrap();
let map = &map[&self.mailbox_hash]; let map = map.get(&self.mailbox_hash)?;
debug!("looking for {} in {} map", self.hash, self.mailbox_hash); log::trace!("looking for {} in {} map", self.hash, self.mailbox_hash);
if !map.contains_key(&self.hash) { let mut hash = self.hash;
debug!("doesn't contain it though len = {}\n{:#?}", map.len(), map); loop {
for e in map.iter() { let Some(p) = map.get(&hash) else {
debug!("{:#?}", e); log::trace!("doesn't contain it though len = {}\n{:#?}", map.len(), map);
for e in map.iter() {
log::debug!("{:#?}", e);
}
return None;
};
if let Some(ref modif) = p.modified {
match modif {
PathMod::Path(ref path) => return Some(path.to_path_buf()),
PathMod::Hash(next_hash) => {
hash = *next_hash;
}
}
} else if p.removed {
return None;
} else {
return Some(p.buf.to_path_buf());
} }
return Err(Error::new("File not found"));
} }
Ok(if let Some(modif) = &map[&self.hash].modified {
match modif {
PathMod::Path(ref path) => path.clone(),
PathMod::Hash(hash) => map[hash].to_path_buf(),
}
} else {
map.get(&self.hash).unwrap().to_path_buf()
})
} }
} }
@ -95,10 +103,15 @@ impl BackendOp for MaildirOp {
let _self = self.clone(); let _self = self.clone();
Ok(Box::pin(async move { Ok(Box::pin(async move {
smol::unblock(move || { smol::unblock(move || {
let Some(path) = _self.path() else {
return Err(Error::new("Not found")
.set_summary(format!("Message with hash {} was not found.", _self.hash))
.set_kind(ErrorKind::NotFound));
};
let file = std::fs::OpenOptions::new() let file = std::fs::OpenOptions::new()
.read(true) .read(true)
.write(false) .write(false)
.open(_self.path()?)?; .open(path)?;
let mut buf_reader = BufReader::new(file); let mut buf_reader = BufReader::new(file);
let mut contents = Vec::new(); let mut contents = Vec::new();
buf_reader.read_to_end(&mut contents)?; buf_reader.read_to_end(&mut contents)?;
@ -255,6 +268,9 @@ impl BackendMailbox for MaildirMailbox {
pub trait MaildirPathTrait { pub trait MaildirPathTrait {
fn flags(&self) -> Flag; fn flags(&self) -> Flag;
fn to_mailbox_hash(&self) -> MailboxHash;
fn to_envelope_hash(&self) -> EnvelopeHash;
fn is_in_new(&self) -> bool;
} }
impl MaildirPathTrait for Path { impl MaildirPathTrait for Path {
@ -286,4 +302,38 @@ impl MaildirPathTrait for Path {
flag flag
} }
fn to_mailbox_hash(&self) -> MailboxHash {
let mut path = self.to_path_buf();
if path.is_file() {
path.pop();
}
if path.is_dir() && (path.ends_with("cur") || path.ends_with("tmp") | path.ends_with("new"))
{
path.pop();
}
let mut hasher = DefaultHasher::new();
path.hash(&mut hasher);
MailboxHash(hasher.finish())
}
fn to_envelope_hash(&self) -> EnvelopeHash {
debug_assert!(self.is_file());
let mut hasher = DefaultHasher::default();
self.hash(&mut hasher);
EnvelopeHash(hasher.finish())
}
fn is_in_new(&self) -> bool {
use std::{ffi::OsStr, path::Component};
if self.is_dir() {
false
} else {
let mut iter = self.components().rev();
iter.next();
iter.next() == Some(Component::Normal(OsStr::new("new")))
}
}
} }

@ -95,7 +95,7 @@ impl MaildirStream {
let mut unseen_total: usize = 0; let mut unseen_total: usize = 0;
let mut buf = Vec::with_capacity(4096); let mut buf = Vec::with_capacity(4096);
for file in chunk { for file in chunk {
let env_hash = get_file_hash(&file); let env_hash = file.to_envelope_hash();
{ {
map.lock() map.lock()
.unwrap() .unwrap()

@ -0,0 +1,504 @@
//
// meli
//
// Copyright 2017 Emmanouil Pitsidianakis <manos@pitsidianak.is>
//
// This file is part of meli.
//
// meli is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// meli is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with meli. If not, see <http://www.gnu.org/licenses/>.
//
// SPDX-License-Identifier: EUPL-1.2 OR GPL-3.0-or-later
use std::{
collections::HashMap,
fs,
io::{self, Read},
path::{Path, PathBuf},
sync::{mpsc, Arc, Mutex},
};
use notify::{self, event::EventKind as NotifyEvent};
use crate::{
backends::{prelude::*, RefreshEventKind::*},
error,
maildir::{move_to_cur, HashIndexes, MaildirPathTrait, PathMod},
};
pub struct MaildirWatch {
pub watcher: Box<dyn notify::Watcher + Send>,
pub account_hash: AccountHash,
pub event_consumer: BackendEventConsumer,
pub root_mailbox: PathBuf,
pub rx: mpsc::Receiver<std::result::Result<notify::Event, notify::Error>>,
pub hash_indexes: HashIndexes,
pub mailbox_index: Arc<Mutex<HashMap<EnvelopeHash, MailboxHash>>>,
pub root_mailbox_hash: MailboxHash,
#[allow(clippy::type_complexity)]
pub mailbox_counts: HashMap<MailboxHash, (Arc<Mutex<usize>>, Arc<Mutex<usize>>)>,
}
impl MaildirWatch {
pub async fn watch(self) -> error::Result<()> {
let Self {
watcher: _watcher,
account_hash,
event_consumer,
root_mailbox: _root_mailbox,
rx,
hash_indexes,
mailbox_index,
root_mailbox_hash,
mailbox_counts,
} = self;
let mut buf = Vec::with_capacity(4096);
loop {
match rx.recv() {
Ok(Ok(event)) => match event.kind {
NotifyEvent::Create(_) => {
log::trace!("Create events: (path = {:?})", event.paths);
for mut pathbuf in event.paths {
if pathbuf.is_in_new() {
// This creates a Rename event that we will receive later
pathbuf = match move_to_cur(&pathbuf) {
Ok(p) => p,
Err(err) => {
log::error!(
"Could not move {} to /cur: {}",
pathbuf.display(),
err
);
pathbuf
}
};
}
let mailbox_hash: MailboxHash = pathbuf.to_mailbox_hash();
if let Ok(env) = add_path_to_index(
&hash_indexes,
mailbox_hash,
pathbuf.as_path(),
&mut buf,
) {
mailbox_index
.lock()
.unwrap()
.insert(env.hash(), mailbox_hash);
log::trace!(
"Create event {} {} {}",
env.hash(),
env.subject(),
pathbuf.display()
);
if !env.is_seen() {
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
}
*mailbox_counts[&mailbox_hash].1.lock().unwrap() += 1;
(event_consumer)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
}),
);
}
}
}
NotifyEvent::Modify(
notify::event::ModifyKind::Any
| notify::event::ModifyKind::Data(_)
| notify::event::ModifyKind::Other,
) => {
log::trace!("Modify events: (path = {:?})", event.paths);
for pathbuf in event.paths {
let mailbox_hash: MailboxHash = pathbuf.to_mailbox_hash();
let mut hash_indexes_lock = hash_indexes.lock().unwrap();
let index_lock =
&mut hash_indexes_lock.entry(mailbox_hash).or_default();
// Linear search in hash_index to find old hash
let old_hash: EnvelopeHash = {
if let Some((k, v)) =
index_lock.iter_mut().find(|(_, v)| *v.buf == pathbuf)
{
*v = pathbuf.clone().into();
*k
} else {
drop(hash_indexes_lock);
// Did we just miss a Create event? In any case, create
// envelope.
if let Ok(env) = add_path_to_index(
&hash_indexes,
mailbox_hash,
pathbuf.as_path(),
&mut buf,
) {
mailbox_index
.lock()
.unwrap()
.insert(env.hash(), mailbox_hash);
(event_consumer)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
}),
);
}
continue;
}
};
let new_hash: EnvelopeHash = pathbuf.to_envelope_hash();
let mut reader = io::BufReader::new(fs::File::open(&pathbuf)?);
buf.clear();
reader.read_to_end(&mut buf)?;
if index_lock.get_mut(&new_hash).is_none() {
if let Ok(mut env) =
Envelope::from_bytes(buf.as_slice(), Some(pathbuf.flags()))
{
env.set_hash(new_hash);
index_lock.insert(new_hash, pathbuf.into());
(event_consumer)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Update(old_hash, Box::new(env)),
}),
);
}
}
}
}
NotifyEvent::Remove(_) => {
for pathbuf in event.paths {
log::trace!("NotifyEvent::Remove(path = {:?}", pathbuf);
let mailbox_hash: MailboxHash = pathbuf.to_mailbox_hash();
let mut hash_indexes_lock = hash_indexes.lock().unwrap();
let index_lock = hash_indexes_lock.entry(mailbox_hash).or_default();
let hash: EnvelopeHash = if let Some((k, _)) =
index_lock.iter().find(|(_, v)| *v.buf == pathbuf)
{
*k
} else {
log::trace!("removed but not contained in index");
continue;
};
if let Some(ref modif) = &index_lock[&hash].modified {
match modif {
PathMod::Path(path) => log::trace!(
"envelope {} has modified path set {}",
hash,
path.display()
),
PathMod::Hash(hash) => log::trace!(
"envelope {} has modified path set {}",
hash,
&index_lock[hash].buf.display()
),
}
index_lock.entry(hash).and_modify(|e| {
e.removed = false;
});
continue;
}
{
let mut lck = mailbox_counts[&mailbox_hash].1.lock().unwrap();
*lck = lck.saturating_sub(1);
}
if !pathbuf.flags().contains(Flag::SEEN) {
let mut lck = mailbox_counts[&mailbox_hash].0.lock().unwrap();
*lck = lck.saturating_sub(1);
}
index_lock.entry(hash).and_modify(|e| {
e.removed = true;
});
(event_consumer)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Remove(hash),
}),
);
}
}
NotifyEvent::Modify(notify::event::ModifyKind::Name(
notify::event::RenameMode::Both,
)) if event.paths.len() == 2 => {
let [ref src, ref dest] = event.paths[..] else {
unreachable!()
};
log::trace!("NotifyEvent::Rename(src = {:?}, dest = {:?})", src, dest);
let mailbox_hash: MailboxHash = src.to_mailbox_hash();
let dest_mailbox = {
let dest_mailbox: MailboxHash = dest.to_mailbox_hash();
if dest_mailbox == mailbox_hash {
None
} else {
Some(dest_mailbox)
}
};
let old_hash: EnvelopeHash = src.to_envelope_hash();
let new_hash: EnvelopeHash = dest.to_envelope_hash();
let mut hash_indexes_lock = hash_indexes.lock().unwrap();
let index_lock = hash_indexes_lock.entry(mailbox_hash).or_default();
let old_flags = src.flags();
let new_flags = dest.flags();
let was_seen: bool = old_flags.contains(Flag::SEEN);
let is_seen: bool = new_flags.contains(Flag::SEEN);
if index_lock.contains_key(&old_hash) && !index_lock[&old_hash].removed {
if let Some(dest_mailbox) = dest_mailbox {
index_lock.entry(old_hash).and_modify(|e| {
e.removed = true;
});
(event_consumer)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Remove(old_hash),
}),
);
drop(hash_indexes_lock);
if let Ok(env) = add_path_to_index(
&hash_indexes,
dest_mailbox,
dest.as_path(),
&mut buf,
) {
mailbox_index
.lock()
.unwrap()
.insert(env.hash(), dest_mailbox);
log::trace!(
"Create event {} {} {}",
env.hash(),
env.subject(),
dest.display()
);
if !env.is_seen() {
*mailbox_counts[&dest_mailbox].0.lock().unwrap() += 1;
}
*mailbox_counts[&dest_mailbox].1.lock().unwrap() += 1;
(event_consumer)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: dest_mailbox,
kind: Create(Box::new(env)),
}),
);
}
} else {
index_lock.entry(old_hash).and_modify(|e| {
e.modified = Some(PathMod::Hash(new_hash));
});
(event_consumer)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Rename(old_hash, new_hash),
}),
);
if !was_seen && is_seen {
let mut lck = mailbox_counts[&mailbox_hash].0.lock().unwrap();
*lck = lck.saturating_sub(1);
} else if was_seen && !is_seen {
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
}
if old_flags != new_flags {
(event_consumer)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: NewFlags(new_hash, (new_flags, vec![])),
}),
);
}
mailbox_index.lock().unwrap().insert(new_hash, mailbox_hash);
index_lock.insert(new_hash, dest.to_path_buf().into());
}
continue;
} else if !index_lock.contains_key(&new_hash)
&& index_lock
.get(&old_hash)
.map(|e| e.removed)
.unwrap_or(false)
{
if index_lock
.get(&old_hash)
.map(|e| e.removed)
.unwrap_or(false)
{
index_lock.entry(old_hash).and_modify(|e| {
e.modified = Some(PathMod::Hash(new_hash));
e.removed = false;
});
log::trace!(
"contains_old_key, key was marked as removed (by external \
source)"
);
} else {
log::trace!("not contains_new_key");
}
drop(hash_indexes_lock);
if let Ok(env) = add_path_to_index(
&hash_indexes,
dest_mailbox.unwrap_or(mailbox_hash),
dest.as_path(),
&mut buf,
) {
mailbox_index
.lock()
.unwrap()
.insert(env.hash(), dest_mailbox.unwrap_or(mailbox_hash));
log::trace!(
"Create event {} {} {}",
env.hash(),
env.subject(),
dest.display()
);
if !env.is_seen() {
*mailbox_counts[&dest_mailbox.unwrap_or(mailbox_hash)]
.0
.lock()
.unwrap() += 1;
}
*mailbox_counts[&dest_mailbox.unwrap_or(mailbox_hash)]
.1
.lock()
.unwrap() += 1;
(event_consumer)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: dest_mailbox.unwrap_or(mailbox_hash),
kind: Create(Box::new(env)),
}),
);
continue;
} else {
log::trace!("not valid email");
}
} else if let Some(dest_mailbox) = dest_mailbox {
drop(hash_indexes_lock);
if let Ok(env) = add_path_to_index(
&hash_indexes,
dest_mailbox,
dest.as_path(),
&mut buf,
) {
mailbox_index
.lock()
.unwrap()
.insert(env.hash(), dest_mailbox);
log::trace!(
"Create event {} {} {}",
env.hash(),
env.subject(),
dest.display()
);
if !env.is_seen() {
*mailbox_counts[&dest_mailbox].0.lock().unwrap() += 1;
}
*mailbox_counts[&dest_mailbox].1.lock().unwrap() += 1;
(event_consumer)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: dest_mailbox,
kind: Create(Box::new(env)),
}),
);
}
} else {
if was_seen && !is_seen {
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
}
(event_consumer)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Rename(old_hash, new_hash),
}),
);
log::trace!("contains_new_key");
if old_flags != new_flags {
(event_consumer)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: NewFlags(new_hash, (new_flags, vec![])),
}),
);
}
}
}
_ => {
log::debug!("Received unexpected fs watcher notify event: {:?}", event);
// Trigger rescan of mailbox.
(event_consumer)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: root_mailbox_hash,
kind: Rescan,
}),
);
}
},
Ok(Err(e)) => log::debug!("watch error: {:?}", e),
Err(e) => log::debug!("watch error: {:?}", e),
}
}
}
}
fn add_path_to_index(
hash_index: &HashIndexes,
mailbox_hash: MailboxHash,
path: &Path,
buf: &mut Vec<u8>,
) -> error::Result<Envelope> {
log::trace!(
"add_path_to_index path {:?} filename{:?}",
path,
path.file_name()
);
let env_hash = path.to_envelope_hash();
hash_index
.lock()
.unwrap()
.entry(mailbox_hash)
.or_default()
.insert(env_hash, path.to_path_buf().into());
let mut reader = io::BufReader::new(fs::File::open(path)?);
buf.clear();
reader.read_to_end(buf)?;
let mut env = Envelope::from_bytes(buf.as_slice(), Some(path.flags()))?;
env.set_hash(env_hash);
log::trace!("add_path_to_index gen {}\t{:?}", env_hash, path.file_name());
Ok(env)
}
Loading…
Cancel
Save