melib/notmuch: issue proper Refresh events in set_flags()

When using set_flags() to change notmuch tags and other mail flags, the
new state was not reflected on the UI, which only detects changes from
RefreshEvents.

Issue refresh events for mail moved to new mailboxes, removed from
mailboxes or just have tag modifications in general.

Closes #133

"Notmuch backend never updates the visible tags" https://git.meli.delivery/meli/meli/issues/133
pull/154/head
Manos Pitsidianakis 2 years ago
parent c6bdda03cf
commit 000b8feb90

@ -25,10 +25,7 @@ use crate::error::{MeliError, Result};
use crate::shellexpand::ShellExpandTrait; use crate::shellexpand::ShellExpandTrait;
use crate::{backends::*, Collection}; use crate::{backends::*, Collection};
use smallvec::SmallVec; use smallvec::SmallVec;
use std::collections::{ use std::collections::hash_map::{DefaultHasher, HashMap};
hash_map::{DefaultHasher, HashMap},
BTreeMap,
};
use std::error::Error; use std::error::Error;
use std::ffi::{CStr, CString, OsStr}; use std::ffi::{CStr, CString, OsStr};
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
@ -66,117 +63,38 @@ mod tags;
pub use tags::*; pub use tags::*;
mod thread; mod thread;
pub use thread::*; pub use thread::*;
mod query;
pub use query::*;
#[derive(Debug)] #[derive(Debug)]
pub struct DbConnection { pub struct DbConnection {
#[allow(dead_code)] pub state: Arc<NotmuchState>,
pub lib: Arc<libloading::Library>,
pub inner: Arc<RwLock<*mut notmuch_database_t>>, pub inner: Arc<RwLock<*mut notmuch_database_t>>,
pub revision_uuid: Arc<RwLock<u64>>,
pub database_ph: std::marker::PhantomData<&'static mut notmuch_database_t>, pub database_ph: std::marker::PhantomData<&'static mut notmuch_database_t>,
} }
impl DbConnection { impl DbConnection {
pub fn get_revision_uuid(&self) -> u64 { pub fn get_revision_uuid(&self) -> u64 {
unsafe { unsafe {
call!(self.lib, notmuch_database_get_revision)( call!(self.state.lib, notmuch_database_get_revision)(
*self.inner.read().unwrap(), *self.inner.read().unwrap(),
std::ptr::null_mut(), std::ptr::null_mut(),
) )
} }
} }
fn refresh( fn refresh(&mut self, new_revision_uuid: u64) -> Result<()> {
&mut self,
mailboxes: Arc<RwLock<HashMap<MailboxHash, NotmuchMailbox>>>,
index: Arc<RwLock<HashMap<EnvelopeHash, CString>>>,
mailbox_index: Arc<RwLock<HashMap<EnvelopeHash, SmallVec<[MailboxHash; 16]>>>>,
tag_index: Arc<RwLock<BTreeMap<u64, String>>>,
account_hash: AccountHash,
event_consumer: BackendEventConsumer,
new_revision_uuid: u64,
) -> Result<()> {
use RefreshEventKind::*;
let query_str = format!( let query_str = format!(
"lastmod:{}..{}", "lastmod:{}..{}",
*self.revision_uuid.read().unwrap(), *self.state.revision_uuid.read().unwrap(),
new_revision_uuid new_revision_uuid
); );
let query: Query = Query::new(self, &query_str)?; let query: Query = Query::new(self, &query_str)?;
let iter = query.search()?; let iter = query.search()?;
let mailbox_index_lck = mailbox_index.write().unwrap();
let mailboxes_lck = mailboxes.read().unwrap();
for message in iter { for message in iter {
let env_hash = message.env_hash(); self.state.update_message_status(self, message)?;
if let Some(mailbox_hashes) = mailbox_index_lck.get(&env_hash) {
let tags: (Flag, Vec<String>) = message.tags().collect_flags_and_tags();
let mut tag_lock = tag_index.write().unwrap();
for tag in tags.1.iter() {
let mut hasher = DefaultHasher::new();
hasher.write(tag.as_bytes());
let num = hasher.finish();
if !tag_lock.contains_key(&num) {
tag_lock.insert(num, tag.clone());
}
}
for &mailbox_hash in mailbox_hashes {
(event_consumer)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: NewFlags(env_hash, tags.clone()),
}),
);
}
} else {
let message_id = message.msg_id_cstr().to_string_lossy().to_string();
let env = message.into_envelope(&index, &tag_index);
for (&mailbox_hash, m) in mailboxes_lck.iter() {
let query_str = format!("{} id:{}", m.query_str.as_str(), &message_id);
let query: Query = Query::new(self, &query_str)?;
if query.count().unwrap_or(0) > 0 {
let mut total_lck = m.total.lock().unwrap();
let mut unseen_lck = m.unseen.lock().unwrap();
*total_lck += 1;
if !env.is_seen() {
*unseen_lck += 1;
}
(event_consumer)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Create(Box::new(env.clone())),
}),
);
}
}
}
} }
drop(query); drop(query);
index.write().unwrap().retain(|&env_hash, msg_id| {
if Message::find_message(self, msg_id).is_err() {
if let Some(mailbox_hashes) = mailbox_index_lck.get(&env_hash) {
for &mailbox_hash in mailbox_hashes {
let m = &mailboxes_lck[&mailbox_hash];
let mut total_lck = m.total.lock().unwrap();
*total_lck = total_lck.saturating_sub(1);
(event_consumer)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Remove(env_hash),
}),
);
}
}
false
} else {
true
}
});
Ok(()) Ok(())
} }
} }
@ -202,12 +120,17 @@ impl Drop for DbConnection {
fn drop(&mut self) { fn drop(&mut self) {
let inner = self.inner.write().unwrap(); let inner = self.inner.write().unwrap();
unsafe { unsafe {
if let Err(err) = try_call!(self.lib, call!(self.lib, notmuch_database_close)(*inner)) { if let Err(err) = try_call!(
self.state.lib,
call!(self.state.lib, notmuch_database_close)(*inner)
) {
debug!(err); debug!(err);
return; return;
} }
if let Err(err) = try_call!(self.lib, call!(self.lib, notmuch_database_destroy)(*inner)) if let Err(err) = try_call!(
{ self.state.lib,
call!(self.state.lib, notmuch_database_destroy)(*inner)
) {
debug!(err); debug!(err);
} }
} }
@ -216,22 +139,182 @@ impl Drop for DbConnection {
#[derive(Debug)] #[derive(Debug)]
pub struct NotmuchDb { pub struct NotmuchDb {
#[allow(dead_code)]
lib: Arc<libloading::Library>,
state: Arc<NotmuchState>,
collection: Collection,
path: PathBuf,
#[allow(dead_code)]
account_name: Arc<String>,
#[allow(dead_code)]
account_hash: AccountHash,
#[allow(dead_code)]
event_consumer: BackendEventConsumer,
save_messages_to: Option<PathBuf>,
}
unsafe impl Send for NotmuchDb {}
unsafe impl Sync for NotmuchDb {}
#[derive(Debug, Clone)]
pub struct NotmuchState {
#[allow(dead_code)] #[allow(dead_code)]
lib: Arc<libloading::Library>, lib: Arc<libloading::Library>,
revision_uuid: Arc<RwLock<u64>>, revision_uuid: Arc<RwLock<u64>>,
mailboxes: Arc<RwLock<HashMap<MailboxHash, NotmuchMailbox>>>, mailboxes: Arc<RwLock<HashMap<MailboxHash, NotmuchMailbox>>>,
index: Arc<RwLock<HashMap<EnvelopeHash, CString>>>, index: Arc<RwLock<HashMap<EnvelopeHash, CString>>>,
mailbox_index: Arc<RwLock<HashMap<EnvelopeHash, SmallVec<[MailboxHash; 16]>>>>,
collection: Collection, collection: Collection,
path: PathBuf, path: PathBuf,
_account_name: Arc<String>, #[allow(dead_code)]
account_name: Arc<String>,
#[allow(dead_code)]
account_hash: AccountHash, account_hash: AccountHash,
event_consumer: BackendEventConsumer, event_consumer: BackendEventConsumer,
#[allow(dead_code)]
save_messages_to: Option<PathBuf>, save_messages_to: Option<PathBuf>,
} }
unsafe impl Send for NotmuchDb {} impl NotmuchState {
unsafe impl Sync for NotmuchDb {} pub fn into_envelope(&self, message: Message<'_>) -> Envelope {
let env_hash = message.env_hash();
self.index
.write()
.unwrap()
.insert(env_hash, message.msg_id_cstr().into());
let mut tag_lock = self.collection.tag_index.write().unwrap();
let (_, tags) = TagIterator::new(&message).collect_flags_and_tags();
for tag in tags {
let mut hasher = DefaultHasher::new();
hasher.write(tag.as_bytes());
let num = hasher.finish();
tag_lock.entry(num).or_insert(tag);
}
message.into_envelope()
}
pub fn new_flags(&self, message: &Message<'_>) -> Result<()> {
let (_, tags): (Flag, Vec<String>) = message.tags().collect_flags_and_tags();
{
let mut tag_lock = self.collection.tag_index.write().unwrap();
for tag in tags.iter() {
let mut hasher = DefaultHasher::new();
hasher.write(tag.as_bytes());
let num = hasher.finish();
tag_lock.entry(num).or_insert_with(|| tag.clone());
}
}
Ok(())
}
pub fn update_message_status(
&self,
database: &DbConnection,
message: Message<'_>,
) -> Result<()> {
use RefreshEventKind::*;
self.new_flags(&message)?;
let account_hash = self.account_hash;
let message_id = message.msg_id_cstr().to_string_lossy();
let env_hash = message.env_hash();
for (&mailbox_hash, m) in self.mailboxes.read().unwrap().iter() {
let query_str = format!("{} id:{}", m.query_str.as_str(), &message_id);
let query: Query = Query::new(database, &query_str)?;
if query.count().unwrap_or(0) > 0 {
if m.contains(&env_hash) {
let tags: (Flag, Vec<String>) = message.tags().collect_flags_and_tags();
(self.event_consumer)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: RefreshEventKind::NewFlags(env_hash, tags.clone()),
}),
);
} else {
let env = self.into_envelope(message.clone());
let mut total_lck = m.total.lock().unwrap();
let mut unseen_lck = m.unseen.lock().unwrap();
*total_lck += 1;
if !env.is_seen() {
*unseen_lck += 1;
}
m.insert(env.hash());
(self.event_consumer)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
}),
);
}
} else if m.remove(&env_hash) {
(self.event_consumer)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Remove(env_hash),
}),
);
}
}
Ok(())
}
pub fn new_message(&self, database: &DbConnection, message: Message<'_>) -> Result<()> {
use RefreshEventKind::*;
let account_hash = self.account_hash;
let message_id = message.msg_id_cstr().to_string_lossy().to_string();
let env = self.into_envelope(message);
for (&mailbox_hash, m) in self.mailboxes.read().unwrap().iter() {
let query_str = format!("{} id:{}", m.query_str.as_str(), &message_id);
let query: Query = Query::new(database, &query_str)?;
if query.count().unwrap_or(0) > 0 {
let mut total_lck = m.total.lock().unwrap();
let mut unseen_lck = m.unseen.lock().unwrap();
*total_lck += 1;
if !env.is_seen() {
*unseen_lck += 1;
}
m.insert(env.hash());
(self.event_consumer)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Create(Box::new(env.clone())),
}),
);
}
}
Ok(())
}
pub fn remove(&self, env_hash: EnvelopeHash) {
use RefreshEventKind::*;
let account_hash = self.account_hash;
self.index.write().unwrap().remove(&env_hash);
for (&mailbox_hash, m) in self.mailboxes.write().unwrap().iter_mut() {
if m.remove(&env_hash) {
let mut total_lck = m.total.lock().unwrap();
//let mut unseen_lck = m.unseen.lock().unwrap();
*total_lck = total_lck.saturating_sub(1);
(self.event_consumer)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Remove(env_hash),
}),
);
}
}
}
}
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone, Default)]
struct NotmuchMailbox { struct NotmuchMailbox {
@ -242,7 +325,7 @@ struct NotmuchMailbox {
path: String, path: String,
query_str: String, query_str: String,
usage: Arc<RwLock<SpecialUsageMailbox>>, usage: Arc<RwLock<SpecialUsageMailbox>>,
envelopes: Arc<RwLock<BTreeSet<EnvelopeHash>>>,
total: Arc<Mutex<usize>>, total: Arc<Mutex<usize>>,
unseen: Arc<Mutex<usize>>, unseen: Arc<Mutex<usize>>,
} }
@ -303,6 +386,20 @@ impl BackendMailbox for NotmuchMailbox {
unsafe impl Send for NotmuchMailbox {} unsafe impl Send for NotmuchMailbox {}
unsafe impl Sync for NotmuchMailbox {} unsafe impl Sync for NotmuchMailbox {}
impl NotmuchMailbox {
pub fn contains(&self, env_hash: &EnvelopeHash) -> bool {
self.envelopes.read().unwrap().contains(env_hash)
}
pub fn insert(&self, env_hash: EnvelopeHash) {
self.envelopes.write().unwrap().insert(env_hash);
}
pub fn remove(&self, env_hash: &EnvelopeHash) -> bool {
self.envelopes.write().unwrap().remove(env_hash)
}
}
impl NotmuchDb { impl NotmuchDb {
pub fn new( pub fn new(
s: &AccountSettings, s: &AccountSettings,
@ -379,6 +476,7 @@ impl NotmuchDb {
parent: None, parent: None,
query_str: query_str.to_string(), query_str: query_str.to_string(),
usage: Arc::new(RwLock::new(SpecialUsageMailbox::Normal)), usage: Arc::new(RwLock::new(SpecialUsageMailbox::Normal)),
envelopes: Arc::new(RwLock::new(BTreeSet::default())),
total: Arc::new(Mutex::new(0)), total: Arc::new(Mutex::new(0)),
unseen: Arc::new(Mutex::new(0)), unseen: Arc::new(Mutex::new(0)),
}, },
@ -398,19 +496,30 @@ impl NotmuchDb {
hasher.write(s.name().as_bytes()); hasher.write(s.name().as_bytes());
hasher.finish() hasher.finish()
}; };
Ok(Box::new(NotmuchDb { let account_name = Arc::new(s.name().to_string());
lib,
revision_uuid: Arc::new(RwLock::new(0)),
path,
index: Arc::new(RwLock::new(Default::default())),
mailbox_index: Arc::new(RwLock::new(Default::default())),
collection: Collection::default(),
let collection = Collection::default();
let state = Arc::new(NotmuchState {
lib: lib.clone(),
revision_uuid: Arc::new(RwLock::new(0)),
mailboxes: Arc::new(RwLock::new(mailboxes)), mailboxes: Arc::new(RwLock::new(mailboxes)),
index: Arc::new(RwLock::new(Default::default())),
collection: collection.clone(),
path: path.clone(),
account_name: account_name.clone(),
account_hash,
event_consumer: event_consumer.clone(),
save_messages_to: None, save_messages_to: None,
_account_name: Arc::new(s.name().to_string()), });
Ok(Box::new(NotmuchDb {
lib,
state,
collection,
path,
account_name,
account_hash, account_hash,
event_consumer, event_consumer,
save_messages_to: None,
})) }))
} }
@ -465,17 +574,12 @@ impl NotmuchDb {
Ok(()) Ok(())
} }
fn new_connection( fn new_connection(state: Arc<NotmuchState>, write: bool) -> Result<DbConnection> {
path: &Path, let path_c = std::ffi::CString::new(state.path.to_str().unwrap()).unwrap();
revision_uuid: Arc<RwLock<u64>>,
lib: Arc<libloading::Library>,
write: bool,
) -> Result<DbConnection> {
let path_c = std::ffi::CString::new(path.to_str().unwrap()).unwrap();
let path_ptr = path_c.as_ptr(); let path_ptr = path_c.as_ptr();
let mut database: *mut notmuch_database_t = std::ptr::null_mut(); let mut database: *mut notmuch_database_t = std::ptr::null_mut();
let status = unsafe { let status = unsafe {
call!(lib, notmuch_database_open)( call!(state.lib, notmuch_database_open)(
path_ptr, path_ptr,
if write { if write {
notmuch_database_mode_t_NOTMUCH_DATABASE_MODE_READ_WRITE notmuch_database_mode_t_NOTMUCH_DATABASE_MODE_READ_WRITE
@ -488,20 +592,19 @@ impl NotmuchDb {
if status != 0 { if status != 0 {
return Err(MeliError::new(format!( return Err(MeliError::new(format!(
"Could not open notmuch database at path {}. notmuch_database_open returned {}.", "Could not open notmuch database at path {}. notmuch_database_open returned {}.",
path.display(), state.path.display(),
status status
))); )));
} }
assert!(!database.is_null()); assert!(!database.is_null());
let ret = DbConnection { let ret = DbConnection {
lib, state,
revision_uuid,
inner: Arc::new(RwLock::new(database)), inner: Arc::new(RwLock::new(database)),
database_ph: std::marker::PhantomData, database_ph: std::marker::PhantomData,
}; };
if *ret.revision_uuid.read().unwrap() == 0 { if *ret.state.revision_uuid.read().unwrap() == 0 {
let new = ret.get_revision_uuid(); let new = ret.get_revision_uuid();
*ret.revision_uuid.write().unwrap() = new; *ret.state.revision_uuid.write().unwrap() = new;
} }
Ok(ret) Ok(ret)
} }
@ -531,17 +634,13 @@ impl MailBackend for NotmuchDb {
struct FetchState { struct FetchState {
mailbox_hash: MailboxHash, mailbox_hash: MailboxHash,
database: Arc<DbConnection>, database: Arc<DbConnection>,
index: Arc<RwLock<HashMap<EnvelopeHash, CString>>>, state: Arc<NotmuchState>,
mailbox_index: Arc<RwLock<HashMap<EnvelopeHash, SmallVec<[MailboxHash; 16]>>>>,
mailboxes: Arc<RwLock<HashMap<u64, NotmuchMailbox>>>,
tag_index: Arc<RwLock<BTreeMap<u64, String>>>,
iter: std::vec::IntoIter<CString>, iter: std::vec::IntoIter<CString>,
} }
impl FetchState { impl FetchState {
async fn fetch(&mut self) -> Result<Option<Vec<Envelope>>> { async fn fetch(&mut self) -> Result<Option<Vec<Envelope>>> {
let mut unseen_count = 0; let mut unseen_count = 0;
let chunk_size = 250; let chunk_size = 250;
let mut mailbox_index_lck = self.mailbox_index.write().unwrap();
let mut ret: Vec<Envelope> = Vec::with_capacity(chunk_size); let mut ret: Vec<Envelope> = Vec::with_capacity(chunk_size);
let mut done: bool = false; let mut done: bool = false;
for _ in 0..chunk_size { for _ in 0..chunk_size {
@ -552,11 +651,7 @@ impl MailBackend for NotmuchDb {
} else { } else {
continue; continue;
}; };
let env = message.into_envelope(&self.index, &self.tag_index); let env = self.state.into_envelope(message);
mailbox_index_lck
.entry(env.hash())
.or_default()
.push(self.mailbox_hash);
if !env.is_seen() { if !env.is_seen() {
unseen_count += 1; unseen_count += 1;
} }
@ -567,7 +662,7 @@ impl MailBackend for NotmuchDb {
} }
} }
{ {
let mailboxes_lck = self.mailboxes.read().unwrap(); let mailboxes_lck = self.state.mailboxes.read().unwrap();
let mailbox = mailboxes_lck.get(&self.mailbox_hash).unwrap(); let mailbox = mailboxes_lck.get(&self.mailbox_hash).unwrap();
let mut unseen_lck = mailbox.unseen.lock().unwrap(); let mut unseen_lck = mailbox.unseen.lock().unwrap();
*unseen_lck += unseen_count; *unseen_lck += unseen_count;
@ -579,19 +674,11 @@ impl MailBackend for NotmuchDb {
} }
} }
} }
let database = Arc::new(NotmuchDb::new_connection( let database = Arc::new(NotmuchDb::new_connection(self.state.clone(), false)?);
self.path.as_path(),
self.revision_uuid.clone(),
self.lib.clone(),
false,
)?);
let index = self.index.clone();
let mailbox_index = self.mailbox_index.clone();
let tag_index = self.collection.tag_index.clone();
let mailboxes = self.mailboxes.clone();
let v: Vec<CString>; let v: Vec<CString>;
let state = self.state.clone();
{ {
let mailboxes_lck = mailboxes.read().unwrap(); let mailboxes_lck = state.mailboxes.read().unwrap();
let mailbox = mailboxes_lck.get(&mailbox_hash).unwrap(); let mailbox = mailboxes_lck.get(&mailbox_hash).unwrap();
let query: Query = Query::new(&database, mailbox.query_str.as_str())?; let query: Query = Query::new(&database, mailbox.query_str.as_str())?;
{ {
@ -600,7 +687,7 @@ impl MailBackend for NotmuchDb {
*total_lck = query.count()? as usize; *total_lck = query.count()? as usize;
*unseen_lck = 0; *unseen_lck = 0;
} }
let mut index_lck = index.write().unwrap(); let mut index_lck = state.index.write().unwrap();
v = query v = query
.search()? .search()?
.into_iter() .into_iter()
@ -611,48 +698,27 @@ impl MailBackend for NotmuchDb {
.collect(); .collect();
} }
let mut state = FetchState { let mut fetch_state = FetchState {
mailbox_hash, mailbox_hash,
mailboxes, state,
database, database,
index,
mailbox_index,
tag_index,
iter: v.into_iter(), iter: v.into_iter(),
}; };
Ok(Box::pin(async_stream::try_stream! { Ok(Box::pin(async_stream::try_stream! {
while let Some(res) = state.fetch().await.map_err(|err| { debug!("fetch err {:?}", &err); err})? { while let Some(res) = fetch_state.fetch().await.map_err(|err| { debug!("fetch err {:?}", &err); err})? {
yield res; yield res;
} }
})) }))
} }
fn refresh(&mut self, _mailbox_hash: MailboxHash) -> ResultFuture<()> { fn refresh(&mut self, _mailbox_hash: MailboxHash) -> ResultFuture<()> {
let account_hash = self.account_hash; let mut database = NotmuchDb::new_connection(self.state.clone(), false)?;
let mut database = NotmuchDb::new_connection( let state = self.state.clone();
self.path.as_path(),
self.revision_uuid.clone(),
self.lib.clone(),
false,
)?;
let mailboxes = self.mailboxes.clone();
let index = self.index.clone();
let mailbox_index = self.mailbox_index.clone();
let tag_index = self.collection.tag_index.clone();
let event_consumer = self.event_consumer.clone();
Ok(Box::pin(async move { Ok(Box::pin(async move {
let new_revision_uuid = database.get_revision_uuid(); let new_revision_uuid = database.get_revision_uuid();
if new_revision_uuid > *database.revision_uuid.read().unwrap() { if new_revision_uuid > *state.revision_uuid.read().unwrap() {
database.refresh( database.refresh(new_revision_uuid)?;
mailboxes, *state.revision_uuid.write().unwrap() = new_revision_uuid;
index,
mailbox_index,
tag_index,
account_hash,
event_consumer,
new_revision_uuid,
)?;
*database.revision_uuid.write().unwrap() = new_revision_uuid;
} }
Ok(()) Ok(())
})) }))
@ -662,15 +728,7 @@ impl MailBackend for NotmuchDb {
extern crate notify; extern crate notify;
use notify::{watcher, RecursiveMode, Watcher}; use notify::{watcher, RecursiveMode, Watcher};
let account_hash = self.account_hash; let state = self.state.clone();
let collection = self.collection.clone();
let lib = self.lib.clone();
let path = self.path.clone();
let revision_uuid = self.revision_uuid.clone();
let mailboxes = self.mailboxes.clone();
let index = self.index.clone();
let mailbox_index = self.mailbox_index.clone();
let event_consumer = self.event_consumer.clone();
let (tx, rx) = std::sync::mpsc::channel(); let (tx, rx) = std::sync::mpsc::channel();
let mut watcher = watcher(tx, std::time::Duration::from_secs(2)).unwrap(); let mut watcher = watcher(tx, std::time::Duration::from_secs(2)).unwrap();
@ -681,24 +739,11 @@ impl MailBackend for NotmuchDb {
loop { loop {
let _ = rx.recv().map_err(|err| err.to_string())?; let _ = rx.recv().map_err(|err| err.to_string())?;
{ {
let mut database = NotmuchDb::new_connection( let mut database = NotmuchDb::new_connection(state.clone(), false)?;
path.as_path(),
revision_uuid.clone(),
lib.clone(),
false,
)?;
let new_revision_uuid = database.get_revision_uuid(); let new_revision_uuid = database.get_revision_uuid();
if new_revision_uuid > *database.revision_uuid.read().unwrap() { if new_revision_uuid > *state.revision_uuid.read().unwrap() {
database.refresh( database.refresh(new_revision_uuid)?;
mailboxes.clone(), *state.revision_uuid.write().unwrap() = new_revision_uuid;
index.clone(),
mailbox_index.clone(),
collection.tag_index.clone(),
account_hash,
event_consumer.clone(),
new_revision_uuid,
)?;
*revision_uuid.write().unwrap() = new_revision_uuid;
} }
} }
} }
@ -707,6 +752,7 @@ impl MailBackend for NotmuchDb {
fn mailboxes(&self) -> ResultFuture<HashMap<MailboxHash, Mailbox>> { fn mailboxes(&self) -> ResultFuture<HashMap<MailboxHash, Mailbox>> {
let ret = Ok(self let ret = Ok(self
.state
.mailboxes .mailboxes
.read() .read()
.unwrap() .unwrap()
@ -718,15 +764,8 @@ impl MailBackend for NotmuchDb {
fn operation(&self, hash: EnvelopeHash) -> Result<Box<dyn BackendOp>> { fn operation(&self, hash: EnvelopeHash) -> Result<Box<dyn BackendOp>> {
Ok(Box::new(NotmuchOp { Ok(Box::new(NotmuchOp {
database: Arc::new(Self::new_connection( database: Arc::new(Self::new_connection(self.state.clone(), true)?),
self.path.as_path(),
self.revision_uuid.clone(),
self.lib.clone(),
true,
)?),
lib: self.lib.clone(),
hash, hash,
index: self.index.clone(),
bytes: None, bytes: None,
})) }))
} }
@ -763,28 +802,23 @@ impl MailBackend for NotmuchDb {
_mailbox_hash: MailboxHash, _mailbox_hash: MailboxHash,
flags: SmallVec<[(std::result::Result<Flag, String>, bool); 8]>, flags: SmallVec<[(std::result::Result<Flag, String>, bool); 8]>,
) -> ResultFuture<()> { ) -> ResultFuture<()> {
let database = Self::new_connection( let database = Self::new_connection(self.state.clone(), true)?;
self.path.as_path(), let state = self.state.clone();
self.revision_uuid.clone(),
self.lib.clone(),
true,
)?;
let collection = self.collection.clone();
let index = self.index.clone();
Ok(Box::pin(async move { Ok(Box::pin(async move {
let mut index_lck = index.write().unwrap();
for env_hash in env_hashes.iter() { for env_hash in env_hashes.iter() {
debug!(&env_hash); debug!(&env_hash);
let message = match Message::find_message(&database, &index_lck[&env_hash]) { let message =
Ok(v) => v, match Message::find_message(&database, &state.index.read().unwrap()[&env_hash])
Err(err) => { {
debug!("not found {}", err); Ok(v) => v,
continue; Err(err) => {
} debug!("not found {}", err);
}; continue;
}
};
let tags = debug!(message.tags().collect::<Vec<&CStr>>()); let tags = message.tags().collect::<Vec<&CStr>>();
//flags.set(f, value); //flags.set(f, value);
macro_rules! cstr { macro_rules! cstr {
@ -819,8 +853,6 @@ impl MailBackend for NotmuchDb {
for (f, v) in flags.iter() { for (f, v) in flags.iter() {
let value = *v; let value = *v;
debug!(&f);
debug!(&value);
match f { match f {
Ok(Flag::DRAFT) if value => add_tag!(b"draft\0"), Ok(Flag::DRAFT) if value => add_tag!(b"draft\0"),
Ok(Flag::DRAFT) => remove_tag!(b"draft\0"), Ok(Flag::DRAFT) => remove_tag!(b"draft\0"),
@ -848,21 +880,7 @@ impl MailBackend for NotmuchDb {
/* Update message filesystem path. */ /* Update message filesystem path. */
message.tags_to_maildir_flags()?; message.tags_to_maildir_flags()?;
state.update_message_status(&database, message)?;
let msg_id = message.msg_id_cstr();
if let Some(p) = index_lck.get_mut(&env_hash) {
*p = msg_id.into();
}
}
for (f, v) in flags.iter() {
if let (Err(tag), true) = (f, v) {
let hash = tag_hash!(tag);
collection
.tag_index
.write()
.unwrap()
.insert(hash, tag.to_string());
}
} }
Ok(()) Ok(())
@ -882,17 +900,12 @@ impl MailBackend for NotmuchDb {
melib_query: crate::search::Query, melib_query: crate::search::Query,
mailbox_hash: Option<MailboxHash>, mailbox_hash: Option<MailboxHash>,
) -> ResultFuture<SmallVec<[EnvelopeHash; 512]>> { ) -> ResultFuture<SmallVec<[EnvelopeHash; 512]>> {
let database = NotmuchDb::new_connection( let database = NotmuchDb::new_connection(self.state.clone(), false)?;
self.path.as_path(), let state = self.state.clone();
self.revision_uuid.clone(),
self.lib.clone(),
false,
)?;
let mailboxes = self.mailboxes.clone();
Ok(Box::pin(async move { Ok(Box::pin(async move {
let mut ret = SmallVec::new(); let mut ret = SmallVec::new();
let mut query_s = if let Some(mailbox_hash) = mailbox_hash { let mut query_s = if let Some(mailbox_hash) = mailbox_hash {
if let Some(m) = mailboxes.read().unwrap().get(&mailbox_hash) { if let Some(m) = state.mailboxes.read().unwrap().get(&mailbox_hash) {
let mut s = m.query_str.clone(); let mut s = m.query_str.clone();
s.push(' '); s.push(' ');
s s
@ -930,17 +943,16 @@ impl MailBackend for NotmuchDb {
#[derive(Debug)] #[derive(Debug)]
struct NotmuchOp { struct NotmuchOp {
hash: EnvelopeHash, hash: EnvelopeHash,
index: Arc<RwLock<HashMap<EnvelopeHash, CString>>>,
database: Arc<DbConnection>, database: Arc<DbConnection>,
bytes: Option<Vec<u8>>, bytes: Option<Vec<u8>>,
#[allow(dead_code)]
lib: Arc<libloading::Library>,
} }
impl BackendOp for NotmuchOp { impl BackendOp for NotmuchOp {
fn as_bytes(&mut self) -> ResultFuture<Vec<u8>> { fn as_bytes(&mut self) -> ResultFuture<Vec<u8>> {
let index_lck = self.index.write().unwrap(); let message = Message::find_message(
let message = Message::find_message(&self.database, &index_lck[&self.hash])?; &self.database,
&self.database.state.index.write().unwrap()[&self.hash],
)?;
let mut f = std::fs::File::open(message.get_filename())?; let mut f = std::fs::File::open(message.get_filename())?;
let mut response = Vec::new(); let mut response = Vec::new();
f.read_to_end(&mut response)?; f.read_to_end(&mut response)?;
@ -950,202 +962,11 @@ impl BackendOp for NotmuchOp {
} }
fn fetch_flags(&self) -> ResultFuture<Flag> { fn fetch_flags(&self) -> ResultFuture<Flag> {
let index_lck = self.index.write().unwrap(); let message = Message::find_message(
let message = Message::find_message(&self.database, &index_lck[&self.hash])?; &self.database,
&self.database.state.index.write().unwrap()[&self.hash],
)?;
let (flags, _tags) = message.tags().collect_flags_and_tags(); let (flags, _tags) = message.tags().collect_flags_and_tags();
Ok(Box::pin(async move { Ok(flags) })) Ok(Box::pin(async move { Ok(flags) }))
} }
} }
pub struct Query<'s> {
#[allow(dead_code)]
lib: Arc<libloading::Library>,
ptr: *mut notmuch_query_t,
query_str: &'s str,
}
impl<'s> Query<'s> {
fn new(database: &DbConnection, query_str: &'s str) -> Result<Self> {
let lib: Arc<libloading::Library> = database.lib.clone();
let query_cstr = std::ffi::CString::new(query_str)?;
let query: *mut notmuch_query_t = unsafe {
call!(lib, notmuch_query_create)(*database.inner.read().unwrap(), query_cstr.as_ptr())
};
if query.is_null() {
return Err(MeliError::new("Could not create query. Out of memory?"));
}
Ok(Query {
lib,
ptr: query,
query_str,
})
}
fn count(&self) -> Result<u32> {
let mut count = 0_u32;
unsafe {
try_call!(
self.lib,
call!(self.lib, notmuch_query_count_messages)(self.ptr, &mut count as *mut _)
)
.map_err(|err| err.0)?;
}
Ok(count)
}
fn search(&'s self) -> Result<MessageIterator<'s>> {
let mut messages: *mut notmuch_messages_t = std::ptr::null_mut();
let status = unsafe {
call!(self.lib, notmuch_query_search_messages)(self.ptr, &mut messages as *mut _)
};
if status != 0 {
return Err(MeliError::new(format!(
"Search for {} returned {}",
self.query_str, status,
)));
}
assert!(!messages.is_null());
Ok(MessageIterator {
messages,
lib: self.lib.clone(),
_ph: std::marker::PhantomData,
is_from_thread: false,
})
}
}
impl Drop for Query<'_> {
fn drop(&mut self) {
unsafe {
call!(self.lib, notmuch_query_destroy)(self.ptr);
}
}
}
pub trait MelibQueryToNotmuchQuery {
fn query_to_string(&self, ret: &mut String);
}
impl MelibQueryToNotmuchQuery for crate::search::Query {
fn query_to_string(&self, ret: &mut String) {
use crate::search::Query::*;
match self {
Before(timestamp) => {
ret.push_str("date:..@");
ret.push_str(&timestamp.to_string());
}
After(timestamp) => {
ret.push_str("date:@");
ret.push_str(&timestamp.to_string());
ret.push_str("..");
}
Between(a, b) => {
ret.push_str("date:@");
ret.push_str(&a.to_string());
ret.push_str("..@");
ret.push_str(&b.to_string());
}
On(timestamp) => {
ret.push_str("date:@");
ret.push_str(&timestamp.to_string());
}
/* * * * */
From(s) => {
ret.push_str("from:\"");
for c in s.chars() {
if c == '"' {
ret.push_str("\\\"");
} else {
ret.push(c);
}
}
ret.push('"');
}
To(s) | Cc(s) | Bcc(s) => {
ret.push_str("to:\"");
for c in s.chars() {
if c == '"' {
ret.push_str("\\\"");
} else {
ret.push(c);
}
}
ret.push('"');
}
InReplyTo(_s) | References(_s) | AllAddresses(_s) => {}
/* * * * */
Body(s) => {
ret.push_str("body:\"");
for c in s.chars() {
if c == '"' {
ret.push_str("\\\"");
} else {
ret.push(c);
}
}
ret.push('"');
}
Subject(s) => {
ret.push_str("subject:\"");
for c in s.chars() {
if c == '"' {
ret.push_str("\\\"");
} else {
ret.push(c);
}
}
ret.push('"');
}
AllText(s) => {
ret.push('"');
for c in s.chars() {
if c == '"' {
ret.push_str("\\\"");
} else {
ret.push(c);
}
}
ret.push('"');
}
/* * * * */
Flags(v) => {
for f in v {
ret.push_str("tag:\"");
for c in f.chars() {
if c == '"' {
ret.push_str("\\\"");
} else {
ret.push(c);
}
}
ret.push_str("\" ");
}
if !v.is_empty() {
ret.pop();
}
}
HasAttachment => {
ret.push_str("tag:attachment");
}
And(q1, q2) => {
ret.push('(');
q1.query_to_string(ret);
ret.push_str(") AND (");
q2.query_to_string(ret);
ret.push(')');
}
Or(q1, q2) => {
ret.push('(');
q1.query_to_string(ret);
ret.push_str(") OR (");
q2.query_to_string(ret);
ret.push(')');
}
Not(q) => {
ret.push_str("(NOT (");
q.query_to_string(ret);
ret.push_str("))");
}
}
}
}

@ -33,7 +33,7 @@ pub struct Message<'m> {
impl<'m> Message<'m> { impl<'m> Message<'m> {
pub fn find_message(db: &'m DbConnection, msg_id: &CStr) -> Result<Message<'m>> { pub fn find_message(db: &'m DbConnection, msg_id: &CStr) -> Result<Message<'m>> {
let mut message: *mut notmuch_message_t = std::ptr::null_mut(); let mut message: *mut notmuch_message_t = std::ptr::null_mut();
let lib = db.lib.clone(); let lib = db.state.lib.clone();
unsafe { unsafe {
call!(lib, notmuch_database_find_message)( call!(lib, notmuch_database_find_message)(
*db.inner.read().unwrap(), *db.inner.read().unwrap(),
@ -89,26 +89,14 @@ impl<'m> Message<'m> {
(unsafe { call!(self.lib, notmuch_message_get_date)(self.message) }) as u64 (unsafe { call!(self.lib, notmuch_message_get_date)(self.message) }) as u64
} }
pub fn into_envelope( pub fn into_envelope(self) -> Envelope {
self,
index: &RwLock<HashMap<EnvelopeHash, CString>>,
tag_index: &RwLock<BTreeMap<u64, String>>,
) -> Envelope {
let env_hash = self.env_hash(); let env_hash = self.env_hash();
let mut env = Envelope::new(env_hash); let mut env = Envelope::new(env_hash);
index
.write()
.unwrap()
.insert(env_hash, self.msg_id_cstr().into());
let mut tag_lock = tag_index.write().unwrap();
let (flags, tags) = TagIterator::new(&self).collect_flags_and_tags(); let (flags, tags) = TagIterator::new(&self).collect_flags_and_tags();
for tag in tags { for tag in tags {
let mut hasher = DefaultHasher::new(); let mut hasher = DefaultHasher::new();
hasher.write(tag.as_bytes()); hasher.write(tag.as_bytes());
let num = hasher.finish(); let num = hasher.finish();
if !tag_lock.contains_key(&num) {
tag_lock.insert(num, tag);
}
env.labels_mut().push(num); env.labels_mut().push(num);
} }
unsafe { unsafe {

@ -0,0 +1,214 @@
/*
* meli - notmuch backend
*
* Copyright 2019 - 2022 Manos Pitsidianakis
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use super::*;
pub trait MelibQueryToNotmuchQuery {
fn query_to_string(&self, ret: &mut String);
}
impl MelibQueryToNotmuchQuery for crate::search::Query {
fn query_to_string(&self, ret: &mut String) {
use crate::search::Query::*;
match self {
Before(timestamp) => {
ret.push_str("date:..@");
ret.push_str(&timestamp.to_string());
}
After(timestamp) => {
ret.push_str("date:@");
ret.push_str(&timestamp.to_string());
ret.push_str("..");
}
Between(a, b) => {
ret.push_str("date:@");
ret.push_str(&a.to_string());
ret.push_str("..@");
ret.push_str(&b.to_string());
}
On(timestamp) => {
ret.push_str("date:@");
ret.push_str(&timestamp.to_string());
}
/* * * * */
From(s) => {
ret.push_str("from:\"");
for c in s.chars() {
if c == '"' {
ret.push_str("\\\"");
} else {
ret.push(c);
}
}
ret.push('"');
}
To(s) | Cc(s) | Bcc(s) => {
ret.push_str("to:\"");
for c in s.chars() {
if c == '"' {
ret.push_str("\\\"");
} else {
ret.push(c);
}
}
ret.push('"');
}
InReplyTo(_s) | References(_s) | AllAddresses(_s) => {}
/* * * * */
Body(s) => {
ret.push_str("body:\"");
for c in s.chars() {
if c == '"' {
ret.push_str("\\\"");
} else {
ret.push(c);
}
}
ret.push('"');
}
Subject(s) => {
ret.push_str("subject:\"");
for c in s.chars() {
if c == '"' {
ret.push_str("\\\"");
} else {
ret.push(c);
}
}
ret.push('"');
}
AllText(s) => {
ret.push('"');
for c in s.chars() {
if c == '"' {
ret.push_str("\\\"");
} else {
ret.push(c);
}
}
ret.push('"');
}
/* * * * */
Flags(v) => {
for f in v {
ret.push_str("tag:\"");
for c in f.chars() {
if c == '"' {
ret.push_str("\\\"");
} else {
ret.push(c);
}
}
ret.push_str("\" ");
}
if !v.is_empty() {
ret.pop();
}
}
HasAttachment => {
ret.push_str("tag:attachment");
}
And(q1, q2) => {
ret.push('(');
q1.query_to_string(ret);
ret.push_str(") AND (");
q2.query_to_string(ret);
ret.push(')');
}
Or(q1, q2) => {
ret.push('(');
q1.query_to_string(ret);
ret.push_str(") OR (");
q2.query_to_string(ret);
ret.push(')');
}
Not(q) => {
ret.push_str("(NOT (");
q.query_to_string(ret);
ret.push_str("))");
}
}
}
}
pub struct Query<'s> {
#[allow(dead_code)]
pub lib: Arc<libloading::Library>,
pub ptr: *mut notmuch_query_t,
pub query_str: &'s str,
}
impl<'s> Query<'s> {
pub fn new(database: &DbConnection, query_str: &'s str) -> Result<Self> {
let lib: Arc<libloading::Library> = database.state.lib.clone();
let query_cstr = std::ffi::CString::new(query_str)?;
let query: *mut notmuch_query_t = unsafe {
call!(lib, notmuch_query_create)(*database.inner.read().unwrap(), query_cstr.as_ptr())
};
if query.is_null() {
return Err(MeliError::new("Could not create query. Out of memory?"));
}
Ok(Query {
lib,
ptr: query,
query_str,
})
}
pub fn count(&self) -> Result<u32> {
let mut count = 0_u32;
unsafe {
try_call!(
self.lib,
call!(self.lib, notmuch_query_count_messages)(self.ptr, &mut count as *mut _)
)
.map_err(|err| err.0)?;
}
Ok(count)
}
pub fn search(&'s self) -> Result<MessageIterator<'s>> {
let mut messages: *mut notmuch_messages_t = std::ptr::null_mut();
let status = unsafe {
call!(self.lib, notmuch_query_search_messages)(self.ptr, &mut messages as *mut _)
};
if status != 0 {
return Err(MeliError::new(format!(
"Search for {} returned {}",
self.query_str, status,
)));
}
assert!(!messages.is_null());
Ok(MessageIterator {
messages,
lib: self.lib.clone(),
_ph: std::marker::PhantomData,
is_from_thread: false,
})
}
}
impl Drop for Query<'_> {
fn drop(&mut self) {
unsafe {
call!(self.lib, notmuch_query_destroy)(self.ptr);
}
}
}

@ -42,6 +42,10 @@ impl<'q> Thread<'q> {
(unsafe { call!(self.lib, notmuch_thread_get_total_messages)(self.ptr) }) as usize (unsafe { call!(self.lib, notmuch_thread_get_total_messages)(self.ptr) }) as usize
} }
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn iter(&'q self) -> MessageIterator<'q> { pub fn iter(&'q self) -> MessageIterator<'q> {
let ptr = unsafe { call!(self.lib, notmuch_thread_get_messages)(self.ptr) }; let ptr = unsafe { call!(self.lib, notmuch_thread_get_messages)(self.ptr) };
MessageIterator { MessageIterator {

Loading…
Cancel
Save