@ -19,12 +19,14 @@
* along with meli . If not , see < http ://www.gnu.org/licenses/>.
* /
use crate ::async_workers ::{ Async , AsyncBuilder, AsyncStatus , WorkContext} ;
use crate ::async_workers ::{ Async , WorkContext} ;
use crate ::backends ::* ;
use crate ::conf ::AccountSettings ;
use crate ::email ::* ;
use crate ::error ::{ MeliError , Result } ;
use reqwest ::blocking ::Client ;
use futures ::lock ::Mutex as FutureMutex ;
use isahc ::prelude ::HttpClient ;
use isahc ::ResponseExt ;
use std ::collections ::{ BTreeMap , HashMap } ;
use std ::convert ::TryFrom ;
use std ::str ::FromStr ;
@ -181,10 +183,10 @@ pub struct Store {
#[ derive(Debug) ]
pub struct JmapType {
account_name : String ,
online : Arc < Mutex< ( Instant , Result < ( ) > ) > > ,
online : Arc < Future Mutex< ( Instant , Result < ( ) > ) > > ,
is_subscribed : Arc < IsSubscribedFn > ,
server_conf : JmapServerConf ,
connection : Arc < JmapConnection> ,
connection : Arc < FutureMutex< JmapConnection> > ,
store : Arc < RwLock < Store > > ,
tag_index : Arc < RwLock < BTreeMap < u64 , String > > > ,
mailboxes : Arc < RwLock < HashMap < MailboxHash , JmapMailbox > > > ,
@ -193,7 +195,7 @@ pub struct JmapType {
impl MailBackend for JmapType {
fn capabilities ( & self ) -> MailBackendCapabilities {
const CAPABILITIES : MailBackendCapabilities = MailBackendCapabilities {
is_async : fals e,
is_async : tru e,
is_remote : true ,
supports_search : true ,
extensions : None ,
@ -202,61 +204,67 @@ impl MailBackend for JmapType {
CAPABILITIES
}
fn is_online ( & self ) -> Result < ( ) > {
if self . online . lock ( ) . unwrap ( ) . 1. is_err ( )
& & Instant ::now ( ) . duration_since ( self . online . lock ( ) . unwrap ( ) . 0 )
> = std ::time ::Duration ::new ( 2 , 0 )
{
let _ = self . mailboxes ( ) ;
}
self . online . lock ( ) . unwrap ( ) . 1. clone ( )
fn is_online_async ( & self ) -> ResultFuture < ( ) > {
let online = self . online . clone ( ) ;
Ok ( Box ::pin ( async move {
//match timeout(std::time::Duration::from_secs(3), connection.lock()).await {
let online_lck = online . lock ( ) . await ;
if online_lck . 1. is_err ( )
& & Instant ::now ( ) . duration_since ( online_lck . 0 ) > = std ::time ::Duration ::new ( 2 , 0 )
{
//let _ = self.mailboxes();
}
online_lck . 1. clone ( )
} ) )
}
fn fetch ( & mut self , mailbox_hash : MailboxHash ) -> Result < Async < Result < Vec < Envelope > > > > {
let mut w = AsyncBuilder ::new ( ) ;
fn fetch_async (
& mut self ,
mailbox_hash : MailboxHash ,
) -> Result < Pin < Box < dyn Stream < Item = Result < Vec < Envelope > > > + Send + ' static > > > {
let mailboxes = self . mailboxes . clone ( ) ;
let store = self . store . clone ( ) ;
let tag_index = self . tag_index . clone ( ) ;
let connection = self . connection . clone ( ) ;
let handle = {
let tx = w . tx ( ) ;
let closure = move | _work_context | {
tx . send ( AsyncStatus ::Payload ( protocol ::get (
& connection ,
& store ,
& tag_index ,
& mailboxes . read ( ) . unwrap ( ) [ & mailbox_hash ] ,
) ) )
. unwrap ( ) ;
tx . send ( AsyncStatus ::Finished ) . unwrap ( ) ;
} ;
Box ::new ( closure )
} ;
Ok ( w . build ( handle ) )
Ok ( Box ::pin ( async_stream ::try_stream ! {
let mut conn = connection . lock ( ) . await ;
conn . connect ( ) . await ? ;
let res = protocol ::fetch (
& conn ,
& store ,
& tag_index ,
& mailboxes ,
mailbox_hash ,
) . await ? ;
yield res ;
} ) )
}
fn watch (
& self ,
_sender : RefreshEventConsumer ,
_work_context : WorkContext ,
) -> Result < std ::thread ::ThreadId > {
fn watch_async ( & self , _sender : RefreshEventConsumer ) -> ResultFuture < ( ) > {
Err ( MeliError ::from ( "JMAP watch for updates is unimplemented" ) )
}
fn mailboxes ( & self ) -> Result < HashMap < MailboxHash , Mailbox > > {
if self . mailboxes . read ( ) . unwrap ( ) . is_empty ( ) {
let mailboxes = debug ! ( protocol ::get_mailboxes ( & self . connection ) ) ? ;
* self . mailboxes . write ( ) . unwrap ( ) = mailboxes ;
}
fn mailboxes_async ( & self ) -> ResultFuture < HashMap < MailboxHash , Mailbox > > {
let mailboxes = self . mailboxes . clone ( ) ;
let connection = self . connection . clone ( ) ;
Ok ( Box ::pin ( async move {
let mut conn = connection . lock ( ) . await ;
conn . connect ( ) . await ? ;
if mailboxes . read ( ) . unwrap ( ) . is_empty ( ) {
let new_mailboxes = debug ! ( protocol ::get_mailboxes ( & conn ) . await ) ? ;
* mailboxes . write ( ) . unwrap ( ) = new_mailboxes ;
}
let ret = mailboxes
. read ( )
. unwrap ( )
. iter ( )
. filter ( | ( _ , f ) | f . is_subscribed )
. map ( | ( & h , f ) | ( h , BackendMailbox ::clone ( f ) as Mailbox ) )
. collect ( ) ;
Ok ( self
. mailboxes
. read ( )
. unwrap ( )
. iter ( )
. filter ( | ( _ , f ) | f . is_subscribed )
. map ( | ( & h , f ) | ( h , BackendMailbox ::clone ( f ) as Mailbox ) )
. collect ( ) )
Ok ( ret )
} ) )
}
fn operation ( & self , hash : EnvelopeHash ) -> Result < Box < dyn BackendOp > > {
@ -289,7 +297,7 @@ impl MailBackend for JmapType {
q : crate ::search ::Query ,
mailbox_hash : Option < MailboxHash > ,
) -> ResultFuture < SmallVec < [ EnvelopeHash ; 512 ] > > {
let conn = self . connection . clone ( ) ;
let conn ection = self . connection . clone ( ) ;
let filter = if let Some ( mailbox_hash ) = mailbox_hash {
let mailbox_id = self . mailboxes . read ( ) . unwrap ( ) [ & mailbox_hash ] . id . clone ( ) ;
@ -304,44 +312,57 @@ impl MailBackend for JmapType {
Filter ::< EmailFilterCondition , EmailObject > ::from ( q )
} ;
let email_call : EmailQuery = EmailQuery ::new (
Query ::new ( )
. account_id ( conn . mail_account_id ( ) . to_string ( ) )
. filter ( Some ( filter ) )
. position ( 0 ) ,
)
. collapse_threads ( false ) ;
let mut req = Request ::new ( conn . request_no . clone ( ) ) ;
req . add_call ( & email_call ) ;
let res = conn
. client
. lock ( )
. unwrap ( )
. post ( & conn . session . api_url )
. basic_auth (
& conn . server_conf . server_username ,
Some ( & conn . server_conf . server_password ) ,
Ok ( Box ::pin ( async move {
let mut conn = connection . lock ( ) . await ;
conn . connect ( ) . await ? ;
let email_call : EmailQuery = EmailQuery ::new (
Query ::new ( )
. account_id ( conn . mail_account_id ( ) . to_string ( ) )
. filter ( Some ( filter ) )
. position ( 0 ) ,
)
. json ( & req )
. send ( ) ;
let res_text = res ? . text ( ) ? ;
let mut v : MethodResponse = serde_json ::from_str ( & res_text ) . unwrap ( ) ;
* conn . online_status . lock ( ) . unwrap ( ) = ( std ::time ::Instant ::now ( ) , Ok ( ( ) ) ) ;
let m = QueryResponse ::< EmailObject > ::try_from ( v . method_responses . remove ( 0 ) ) ? ;
let QueryResponse ::< EmailObject > { ids , .. } = m ;
let ret = ids
. into_iter ( )
. map ( | id | {
use std ::hash ::Hasher ;
let mut h = std ::collections ::hash_map ::DefaultHasher ::new ( ) ;
h . write ( id . as_bytes ( ) ) ;
h . finish ( )
} )
. collect ( ) ;
Ok ( Box ::pin ( async move { Ok ( ret ) } ) )
. collapse_threads ( false ) ;
let mut req = Request ::new ( conn . request_no . clone ( ) ) ;
req . add_call ( & email_call ) ;
let mut res = conn
. client
. post_async ( & conn . session . api_url , serde_json ::to_string ( & req ) ? )
. await ? ;
let res_text = res . text_async ( ) . await ? ;
let mut v : MethodResponse = serde_json ::from_str ( & res_text ) . unwrap ( ) ;
* conn . online_status . lock ( ) . await = ( std ::time ::Instant ::now ( ) , Ok ( ( ) ) ) ;
let m = QueryResponse ::< EmailObject > ::try_from ( v . method_responses . remove ( 0 ) ) ? ;
let QueryResponse ::< EmailObject > { ids , .. } = m ;
let ret = ids
. into_iter ( )
. map ( | id | {
use std ::hash ::Hasher ;
let mut h = std ::collections ::hash_map ::DefaultHasher ::new ( ) ;
h . write ( id . as_bytes ( ) ) ;
h . finish ( )
} )
. collect ( ) ;
Ok ( ret )
} ) )
}
fn fetch ( & mut self , _mailbox_hash : MailboxHash ) -> Result < Async < Result < Vec < Envelope > > > > {
Err ( MeliError ::new ( "Unimplemented." ) )
}
fn watch (
& self ,
_sender : RefreshEventConsumer ,
_work_context : WorkContext ,
) -> Result < std ::thread ::ThreadId > {
Err ( MeliError ::new ( "Unimplemented." ) )
}
fn mailboxes ( & self ) -> Result < HashMap < MailboxHash , Mailbox > > {
Err ( MeliError ::new ( "Unimplemented." ) )
}
}
@ -350,14 +371,17 @@ impl JmapType {
s : & AccountSettings ,
is_subscribed : Box < dyn Fn ( & str ) -> bool + Send + Sync > ,
) -> Result < Box < dyn MailBackend > > {
let online = Arc ::new ( Mutex::new ( (
let online = Arc ::new ( Future Mutex::new ( (
std ::time ::Instant ::now ( ) ,
Err ( MeliError ::new ( "Account is uninitialised." ) ) ,
) ) ) ;
let server_conf = JmapServerConf ::new ( s ) ? ;
Ok ( Box ::new ( JmapType {
connection : Arc ::new ( JmapConnection ::new ( & server_conf , online . clone ( ) ) ? ) ,
connection : Arc ::new ( FutureMutex ::new ( JmapConnection ::new (
& server_conf ,
online . clone ( ) ,
) ? ) ) ,
store : Arc ::new ( RwLock ::new ( Store ::default ( ) ) ) ,
tag_index : Arc ::new ( RwLock ::new ( Default ::default ( ) ) ) ,
mailboxes : Arc ::new ( RwLock ::new ( HashMap ::default ( ) ) ) ,