@ -28,7 +28,7 @@ use crate::jobs1::{JobExecutor, JobId, JoinHandle};
use melib ::async_workers ::{ Async , AsyncBuilder , AsyncStatus , WorkContext } ;
use melib ::backends ::{
AccountHash , BackendOp , Backends , MailBackend , Mailbox , MailboxHash , NotifyFn , ReadOnlyOp ,
RefreshEvent , RefreshEventConsumer , RefreshEventKind , SpecialUsageMailbox,
RefreshEvent , RefreshEventConsumer , RefreshEventKind , ResultFuture, SpecialUsageMailbox,
} ;
use melib ::email ::* ;
use melib ::error ::{ MeliError , Result } ;
@ -43,6 +43,7 @@ use crate::types::UIEvent::{self, EnvelopeRemove, EnvelopeRename, EnvelopeUpdate
use crate ::{ StatusEvent , ThreadEvent } ;
use crossbeam ::Sender ;
use futures ::channel ::oneshot ;
use futures ::future ::FutureExt ;
pub use futures ::stream ::Stream ;
use futures ::stream ::StreamExt ;
use std ::collections ::VecDeque ;
@ -131,13 +132,13 @@ pub struct Account {
pub ( crate ) backend : Arc < RwLock < Box < dyn MailBackend > > > ,
pub job_executor : Arc < JobExecutor > ,
active_jobs : HashMap < JobId , JobRequest > ,
pub active_jobs : HashMap < JobId , JobRequest > ,
sender : Sender < ThreadEvent > ,
event_queue : VecDeque < ( MailboxHash , RefreshEvent ) > ,
notify_fn : Arc < NotifyFn > ,
}
enum JobRequest {
pub enum JobRequest {
Mailboxes ( oneshot ::Receiver < Result < HashMap < MailboxHash , Mailbox > > > ) ,
Get (
MailboxHash ,
@ -154,11 +155,7 @@ enum JobRequest {
CreateMailbox ( oneshot ::Receiver < Result < ( MailboxHash , HashMap < MailboxHash , Mailbox > ) > > ) ,
DeleteMailbox ( oneshot ::Receiver < Result < HashMap < MailboxHash , Mailbox > > > ) ,
//RenameMailbox,
Search (
crate ::search ::Query ,
Option < MailboxHash > ,
oneshot ::Receiver < Result < SmallVec < [ EnvelopeHash ; 512 ] > > > ,
) ,
Search ,
SetMailboxPermissions ( MailboxHash , oneshot ::Receiver < Result < ( ) > > ) ,
SetMailboxSubscription ( MailboxHash , oneshot ::Receiver < Result < ( ) > > ) ,
Watch ( JoinHandle ) ,
@ -177,7 +174,7 @@ impl core::fmt::Debug for JobRequest {
JobRequest ::CreateMailbox ( _ ) = > write! ( f , "{}" , "JobRequest::CreateMailbox" ) ,
JobRequest ::DeleteMailbox ( _ ) = > write! ( f , "{}" , "JobRequest::DeleteMailbox" ) ,
//JobRequest::RenameMailbox,
JobRequest ::Search ( _ , _ , _ ) = > write! ( f , "{}" , "JobRequest::Search" ) ,
JobRequest ::Search = > write! ( f , "{}" , "JobRequest::Search" ) ,
JobRequest ::SetMailboxPermissions ( _ , _ ) = > {
write! ( f , "{}" , "JobRequest::SetMailboxPermissions" )
}
@ -189,6 +186,22 @@ impl core::fmt::Debug for JobRequest {
}
}
impl JobRequest {
fn is_get ( & self , mailbox_hash : MailboxHash ) -> bool {
match self {
JobRequest ::Get ( h , _ ) if * h = = mailbox_hash = > true ,
_ = > false ,
}
}
fn is_online ( & self ) -> bool {
match self {
JobRequest ::IsOnline ( _ ) = > true ,
_ = > false ,
}
}
}
impl Drop for Account {
fn drop ( & mut self ) {
if let Ok ( data_dir ) = xdg ::BaseDirectories ::with_profile ( "meli" , & self . name ) {
@ -294,14 +307,17 @@ impl Account {
let mut active_jobs = HashMap ::default ( ) ;
if settings . conf . is_async {
if let Ok ( mailboxes_job ) = backend . mailboxes_async ( ) {
if let Ok ( online_job ) = backend . is_online_async ( ) {
let ( rcvr , job_id ) =
job_executor . spawn_specialized ( online_job . then ( | _ | mailboxes_job ) ) ;
active_jobs . insert ( job_id , JobRequest ::Mailboxes ( rcvr ) ) ;
}
}
if let Ok ( online_job ) = backend . is_online_async ( ) {
let ( rcvr , job_id ) = job_executor . spawn_specialized ( online_job ) ;
active_jobs . insert ( job_id , JobRequest ::IsOnline ( rcvr ) ) ;
}
if let Ok ( mailboxes_job ) = backend . mailboxes_async ( ) {
let ( rcvr , job_id ) = job_executor . spawn_specialized ( mailboxes_job ) ;
active_jobs . insert ( job_id , JobRequest ::Mailboxes ( rcvr ) ) ;
}
}
Ok ( Account {
@ -460,6 +476,11 @@ impl Account {
if let Ok ( mailbox_job ) = self . backend . write ( ) . unwrap ( ) . get_async ( & f ) {
let mailbox_job = mailbox_job . into_future ( ) ;
let ( rcvr , job_id ) = self . job_executor . spawn_specialized ( mailbox_job ) ;
self . sender
. send ( ThreadEvent ::UIEvent ( UIEvent ::StatusEvent (
StatusEvent ::NewJob ( job_id . clone ( ) ) ,
) ) )
. unwrap ( ) ;
self . active_jobs . insert ( job_id , JobRequest ::Get ( * h , rcvr ) ) ;
}
} else {
@ -790,6 +811,11 @@ impl Account {
if self . settings . conf . is_async {
if let Ok ( refresh_job ) = self . backend . write ( ) . unwrap ( ) . refresh_async ( mailbox_hash , r ) {
let ( rcvr , job_id ) = self . job_executor . spawn_specialized ( refresh_job ) ;
self . sender
. send ( ThreadEvent ::UIEvent ( UIEvent ::StatusEvent (
StatusEvent ::NewJob ( job_id . clone ( ) ) ,
) ) )
. unwrap ( ) ;
self . active_jobs
. insert ( job_id , JobRequest ::Refresh ( mailbox_hash , rcvr ) ) ;
}
@ -882,18 +908,25 @@ impl Account {
}
MailboxStatus ::None = > {
if self . settings . conf . is_async {
if let Ok ( mailbox_job ) =
self . backend . write ( ) . unwrap ( ) . get_async (
& & self . mailbox_entries [ & mailbox_hash ] . ref_mailbox ,
)
{
let mailbox_job = mailbox_job . into_future ( ) ;
let ( rcvr , job_id ) =
self . job_executor . spawn_specialized ( mailbox_job ) ;
self . active_jobs
. insert ( job_id , JobRequest ::Get ( mailbox_hash , rcvr ) ) ;
if ! self . active_jobs . values ( ) . any ( | j | j . is_get ( mailbox_hash ) ) {
if let Ok ( mailbox_job ) =
self . backend . write ( ) . unwrap ( ) . get_async (
& & self . mailbox_entries [ & mailbox_hash ] . ref_mailbox ,
)
{
let mailbox_job = mailbox_job . into_future ( ) ;
let ( rcvr , job_id ) =
self . job_executor . spawn_specialized ( mailbox_job ) ;
self . sender
. send ( ThreadEvent ::UIEvent ( UIEvent ::StatusEvent (
StatusEvent ::NewJob ( job_id . clone ( ) ) ,
) ) )
. unwrap ( ) ;
self . active_jobs
. insert ( job_id , JobRequest ::Get ( mailbox_hash , rcvr ) ) ;
}
}
} else {
} else if self . mailbox_entries [ & mailbox_hash ] . worker . is_none ( ) {
let handle = Account ::new_worker (
self . mailbox_entries [ & mailbox_hash ] . ref_mailbox . clone ( ) ,
& mut self . backend ,
@ -991,7 +1024,7 @@ impl Account {
}
pub fn save_special (
& self ,
& mut self ,
bytes : & [ u8 ] ,
mailbox_type : SpecialUsageMailbox ,
flags : Flag ,
@ -1042,27 +1075,55 @@ impl Account {
}
}
pub fn save ( & self , bytes : & [ u8 ] , mailbox_hash : MailboxHash , flags : Option < Flag > ) -> Result < ( ) > {
pub fn save (
& mut self ,
bytes : & [ u8 ] ,
mailbox_hash : MailboxHash ,
flags : Option < Flag > ,
) -> Result < ( ) > {
if self . settings . account . read_only ( ) {
return Err ( MeliError ::new ( format! (
"Account {} is read-only." ,
self . name . as_str ( )
) ) ) ;
}
self . backend
let job = self
. backend
. write ( )
. unwrap ( )
. save ( bytes , mailbox_hash , flags )
. save ( bytes . to_vec ( ) , mailbox_hash , flags ) ? ;
let ( rcvr , job_id ) = self . job_executor . spawn_specialized ( job ) ;
self . sender
. send ( ThreadEvent ::UIEvent ( UIEvent ::StatusEvent (
StatusEvent ::NewJob ( job_id . clone ( ) ) ,
) ) )
. unwrap ( ) ;
self . active_jobs
. insert ( job_id , JobRequest ::SaveMessage ( mailbox_hash , rcvr ) ) ;
Ok ( ( ) )
}
pub fn delete ( & self , env_hash : EnvelopeHash , mailbox_hash : MailboxHash ) -> Result < ( ) > {
pub fn delete ( & mut self , env_hash : EnvelopeHash , mailbox_hash : MailboxHash ) -> Result < ( ) > {
if self . settings . account . read_only ( ) {
return Err ( MeliError ::new ( format! (
"Account {} is read-only." ,
self . name . as_str ( )
) ) ) ;
}
self . backend . write ( ) . unwrap ( ) . delete ( env_hash , mailbox_hash )
let job = self
. backend
. write ( )
. unwrap ( )
. delete ( env_hash , mailbox_hash ) ? ;
let ( rcvr , job_id ) = self . job_executor . spawn_specialized ( job ) ;
self . sender
. send ( ThreadEvent ::UIEvent ( UIEvent ::StatusEvent (
StatusEvent ::NewJob ( job_id . clone ( ) ) ,
) ) )
. unwrap ( ) ;
self . active_jobs
. insert ( job_id , JobRequest ::DeleteMessage ( env_hash , rcvr ) ) ;
Ok ( ( ) )
}
pub fn contains_key ( & self , h : EnvelopeHash ) -> bool {
@ -1091,11 +1152,12 @@ impl Account {
}
match op {
MailboxOperation ::Create ( path ) = > {
let ( mailbox_hash , mut mailboxes ) = self
. backend
. write ( )
. unwrap ( )
. create_mailbox ( path . to_string ( ) ) ? ;
let ( mailbox_hash , mut mailboxes ) = futures ::executor ::block_on (
self . backend
. write ( )
. unwrap ( )
. create_mailbox ( path . to_string ( ) ) ? ,
) ? ;
self . sender
. send ( ThreadEvent ::UIEvent ( UIEvent ::MailboxCreate ( (
self . index ,
@ -1159,7 +1221,9 @@ impl Account {
return Err ( MeliError ::new ( "Cannot delete only mailbox." ) ) ;
}
let mailbox_hash = self . mailbox_by_path ( & path ) ? ;
let mut mailboxes = self . backend . write ( ) . unwrap ( ) . delete_mailbox ( mailbox_hash ) ? ;
let mut mailboxes = futures ::executor ::block_on (
self . backend . write ( ) . unwrap ( ) . delete_mailbox ( mailbox_hash ) ? ,
) ? ;
self . sender
. send ( ThreadEvent ::UIEvent ( UIEvent ::MailboxDelete ( (
self . index ,
@ -1254,13 +1318,7 @@ impl Account {
if self . is_online {
return Ok ( ( ) ) ;
}
if ! self . active_jobs . values ( ) . any ( | j | {
if let JobRequest ::IsOnline ( _ ) = j {
true
} else {
false
}
} ) {
if ! self . active_jobs . values ( ) . any ( JobRequest ::is_online ) {
if let Ok ( online_job ) = self . backend . read ( ) . unwrap ( ) . is_online_async ( ) {
let ( rcvr , job_id ) = self . job_executor . spawn_specialized ( online_job ) ;
self . active_jobs . insert ( job_id , JobRequest ::IsOnline ( rcvr ) ) ;
@ -1270,7 +1328,7 @@ impl Account {
} else {
let ret = self . backend . read ( ) . unwrap ( ) . is_online ( ) ;
if ret . is_ok ( ) ! = self . is_online & & ret . is_ok ( ) {
//self.init()?;
self . init ( None ) ? ;
}
self . is_online = ret . is_ok ( ) ;
if ! self . is_online {
@ -1285,33 +1343,14 @@ impl Account {
search_term : & str ,
sort : ( SortField , SortOrder ) ,
mailbox_hash : MailboxHash ,
) -> Result < SmallVec < [ EnvelopeHash ; 512 ] > > {
if self . settings . account ( ) . format ( ) = = "imap" {
use melib ::parsec ::Parser ;
let query = melib ::search ::query ( ) . parse ( search_term ) ? . 1 ;
return self
. backend
. read ( )
. unwrap ( )
. search ( query , Some ( mailbox_hash ) ) ;
}
#[ cfg(feature = " notmuch " ) ]
{
if self . settings . account ( ) . format ( ) = = "notmuch" {
let backend_lck = self . backend . read ( ) . unwrap ( ) ;
let b = ( * backend_lck ) . as_any ( ) ;
return if let Some ( notmuch_backend ) = b . downcast_ref ::< melib ::backends ::NotmuchDb > ( )
{
notmuch_backend . search ( search_term )
} else {
Err ( MeliError ::new (
"Internal error: Could not downcast backend to NotmuchDb" ,
) )
} ;
}
}
) -> ResultFuture < SmallVec < [ EnvelopeHash ; 512 ] > > {
use melib ::parsec ::Parser ;
let query = melib ::search ::query ( ) . parse ( search_term ) ? . 1 ;
self . backend
. read ( )
. unwrap ( )
. search ( query , Some ( mailbox_hash ) )
/*
#[ cfg(feature = " sqlite3 " ) ]
{
crate ::sqlite3 ::search ( search_term , sort )
@ -1346,6 +1385,7 @@ impl Account {
}
Ok ( ret )
}
* /
}
pub fn mailbox_by_path ( & self , path : & str ) -> Result < MailboxHash > {
@ -1379,6 +1419,12 @@ impl Account {
}
}
JobRequest ::Get ( mailbox_hash , mut chan ) = > {
self . sender
. send ( ThreadEvent ::UIEvent ( UIEvent ::StatusEvent (
StatusEvent ::JobFinished ( job_id . clone ( ) ) ,
) ) )
. unwrap ( ) ;
let ( payload , rest ) : ( Option < Result < Vec < Envelope > > > , _ ) =
chan . try_recv ( ) . unwrap ( ) . unwrap ( ) ;
debug ! ( "got payload in status for {}" , mailbox_hash ) ;
@ -1398,8 +1444,12 @@ impl Account {
. unwrap ( ) ;
return true ;
}
let ( rcvr , job_id ) = self . job_executor . spawn_specialized ( rest . into_future ( ) ) ;
self . sender
. send ( ThreadEvent ::UIEvent ( UIEvent ::StatusEvent (
StatusEvent ::NewJob ( job_id . clone ( ) ) ,
) ) )
. unwrap ( ) ;
self . active_jobs
. insert ( job_id , JobRequest ::Get ( mailbox_hash , rcvr ) ) ;
let payload = payload . unwrap ( ) ;
@ -1465,18 +1515,11 @@ impl Account {
if r . is_some ( ) & & r . unwrap ( ) . is_ok ( ) {
self . is_online = true ;
}
}
JobRequest ::Refresh ( _ , mut chan ) = > {
let r = chan . try_recv ( ) . unwrap ( ) ;
if let Some ( Err ( err ) ) = r {
self . sender
. send ( ThreadEvent ::UIEvent ( UIEvent ::Notification (
Some ( format! ( "{} refresh exited with error" , & self . name ) ) ,
err . to_string ( ) ,
Some ( crate ::types ::NotificationType ::ERROR ) ,
) ) )
. expect ( "Could not send event on main channel" ) ;
}
self . sender
. send ( ThreadEvent ::UIEvent ( UIEvent ::StatusEvent (
StatusEvent ::JobFinished ( job_id . clone ( ) ) ,
) ) )
. unwrap ( ) ;
}
JobRequest ::SetFlags ( _ , mut chan ) = > {
let r = chan . try_recv ( ) . unwrap ( ) ;
@ -1558,21 +1601,12 @@ impl Account {
}
}
//JobRequest::RenameMailbox,
JobRequest ::Search ( _ , _ , mut chan ) = > {
let r = chan . try_recv ( ) . unwrap ( ) ;
match r {
Some ( Err ( err ) ) = > {
self . sender
. send ( ThreadEvent ::UIEvent ( UIEvent ::Notification (
Some ( format! ( "{}: could not perform search" , & self . name ) ) ,
err . to_string ( ) ,
Some ( crate ::types ::NotificationType ::ERROR ) ,
) ) )
. expect ( "Could not send event on main channel" ) ;
}
Some ( Ok ( v ) ) = > unimplemented! ( ) ,
None = > { }
}
JobRequest ::Search = > {
self . sender
. send ( ThreadEvent ::UIEvent ( UIEvent ::StatusEvent (
StatusEvent ::JobFinished ( job_id . clone ( ) ) ,
) ) )
. unwrap ( ) ;
}
JobRequest ::SetMailboxPermissions ( _ , mut chan ) = > {
let r = chan . try_recv ( ) . unwrap ( ) ;