@ -27,12 +27,26 @@ use futures::lock::Mutex as FutureMutex;
use isahc ::prelude ::HttpClient ;
use isahc ::ResponseExt ;
use serde_json ::Value ;
use std ::collections ::{ BTreeMap, HashMap } ;
use std ::collections ::{ hash_map::DefaultHasher , BTreeMap, HashMap , HashSet } ;
use std ::convert ::TryFrom ;
use std ::hash ::{ Hash , Hasher } ;
use std ::str ::FromStr ;
use std ::sync ::{ Arc , Mutex , RwLock } ;
use std ::time ::Instant ;
macro_rules! tag_hash {
( $t :ident ) = > { {
let mut hasher = DefaultHasher ::default ( ) ;
$t . hash ( & mut hasher ) ;
hasher . finish ( )
} } ;
( $t :literal ) = > { {
let mut hasher = DefaultHasher ::default ( ) ;
$t . hash ( & mut hasher ) ;
hasher . finish ( )
} } ;
}
#[ macro_export ]
macro_rules! _impl {
( $( #[ $outer:meta ] ) * $field :ident : $t :ty ) = > {
@ -131,7 +145,7 @@ impl JmapServerConf {
}
}
struct IsSubscribedFn ( Box < dyn Fn ( & str ) -> bool + Send + Sync > ) ;
pub struct IsSubscribedFn ( Box < dyn Fn ( & str ) -> bool + Send + Sync > ) ;
impl std ::fmt ::Debug for IsSubscribedFn {
fn fmt ( & self , f : & mut std ::fmt ::Formatter ) -> std ::fmt ::Result {
@ -173,24 +187,116 @@ macro_rules! get_conf_val {
} ;
}
#[ derive(Debug , Default )]
#[ derive(Debug )]
pub struct Store {
byte_cache : HashMap < EnvelopeHash , EnvelopeCache > ,
id_store : HashMap < EnvelopeHash , Id > ,
blob_id_store : HashMap < EnvelopeHash , Id > ,
pub account_name : Arc < String > ,
pub account_hash : AccountHash ,
pub account_id : Arc < Mutex < String > > ,
pub byte_cache : Arc < Mutex < HashMap < EnvelopeHash , EnvelopeCache > > > ,
pub id_store : Arc < Mutex < HashMap < EnvelopeHash , Id > > > ,
pub reverse_id_store : Arc < Mutex < HashMap < Id , EnvelopeHash > > > ,
pub blob_id_store : Arc < Mutex < HashMap < EnvelopeHash , Id > > > ,
pub tag_index : Arc < RwLock < BTreeMap < u64 , String > > > ,
pub mailboxes : Arc < RwLock < HashMap < MailboxHash , JmapMailbox > > > ,
pub mailboxes_index : Arc < RwLock < HashMap < MailboxHash , HashSet < EnvelopeHash > > > > ,
pub object_set_states : Arc < Mutex < HashMap < & ' static str , String > > > ,
pub online_status : Arc < FutureMutex < ( Instant , Result < ( ) > ) > > ,
pub is_subscribed : Arc < IsSubscribedFn > ,
pub event_consumer : BackendEventConsumer ,
}
impl Store {
pub fn add_envelope ( & self , obj : EmailObject ) -> Envelope {
let mut tag_lck = self . tag_index . write ( ) . unwrap ( ) ;
let tags = obj
. keywords ( )
. keys ( )
. map ( | tag | {
let tag_hash = {
let mut hasher = DefaultHasher ::default ( ) ;
tag . hash ( & mut hasher ) ;
hasher . finish ( )
} ;
if ! tag_lck . contains_key ( & tag_hash ) {
tag_lck . insert ( tag_hash , tag . to_string ( ) ) ;
}
tag_hash
} )
. collect ::< SmallVec < [ u64 ; 1024 ] > > ( ) ;
let id = obj . id . clone ( ) ;
let mailbox_ids = obj . mailbox_ids . clone ( ) ;
let blob_id = obj . blob_id . clone ( ) ;
drop ( tag_lck ) ;
let mut ret : Envelope = obj . into ( ) ;
debug_assert_eq! ( tag_hash ! ( "$draft" ) , 6613915297903591176 ) ;
debug_assert_eq! ( tag_hash ! ( "$seen" ) , 1683863812294339685 ) ;
debug_assert_eq! ( tag_hash ! ( "$flagged" ) , 2714010747478170100 ) ;
debug_assert_eq! ( tag_hash ! ( "$answered" ) , 8940855303929342213 ) ;
debug_assert_eq! ( tag_hash ! ( "$junk" ) , 2656839745430720464 ) ;
debug_assert_eq! ( tag_hash ! ( "$notjunk" ) , 4091323799684325059 ) ;
let mut id_store_lck = self . id_store . lock ( ) . unwrap ( ) ;
let mut reverse_id_store_lck = self . reverse_id_store . lock ( ) . unwrap ( ) ;
let mut blob_id_store_lck = self . blob_id_store . lock ( ) . unwrap ( ) ;
let mailboxes_lck = self . mailboxes . read ( ) . unwrap ( ) ;
let mut mailboxes_index_lck = self . mailboxes_index . write ( ) . unwrap ( ) ;
for ( mailbox_id , _ ) in mailbox_ids {
if let Some ( ( mailbox_hash , _ ) ) = mailboxes_lck . iter ( ) . find ( | ( _ , m ) | m . id = = mailbox_id )
{
mailboxes_index_lck
. entry ( * mailbox_hash )
. or_default ( )
. insert ( ret . hash ( ) ) ;
}
}
reverse_id_store_lck . insert ( id . clone ( ) , ret . hash ( ) ) ;
id_store_lck . insert ( ret . hash ( ) , id ) ;
blob_id_store_lck . insert ( ret . hash ( ) , blob_id ) ;
for t in tags {
match t {
6613915297903591176 = > {
ret . set_flags ( ret . flags ( ) | Flag ::DRAFT ) ;
}
1683863812294339685 = > {
ret . set_flags ( ret . flags ( ) | Flag ::SEEN ) ;
}
2714010747478170100 = > {
ret . set_flags ( ret . flags ( ) | Flag ::FLAGGED ) ;
}
8940855303929342213 = > {
ret . set_flags ( ret . flags ( ) | Flag ::REPLIED ) ;
}
2656839745430720464 | 4091323799684325059 = > { /* ignore */ }
_ = > ret . labels_mut ( ) . push ( t ) ,
}
}
ret
}
pub fn remove_envelope (
& self ,
obj_id : Id ,
) -> Option < ( EnvelopeHash , SmallVec < [ MailboxHash ; 8 ] > ) > {
let env_hash = self . reverse_id_store . lock ( ) . unwrap ( ) . remove ( & obj_id ) ? ;
self . id_store . lock ( ) . unwrap ( ) . remove ( & env_hash ) ;
self . blob_id_store . lock ( ) . unwrap ( ) . remove ( & env_hash ) ;
self . byte_cache . lock ( ) . unwrap ( ) . remove ( & env_hash ) ;
let mut mailbox_hashes = SmallVec ::new ( ) ;
let mailboxes_lck = self . mailboxes . read ( ) . unwrap ( ) ;
for ( k , set ) in self . mailboxes_index . write ( ) . unwrap ( ) . iter_mut ( ) {
if set . remove ( & env_hash ) {
mailbox_hashes . push ( * k ) ;
}
}
Some ( ( env_hash , mailbox_hashes ) )
}
}
#[ derive(Debug) ]
pub struct JmapType {
account_name : String ,
account_hash : AccountHash ,
online : Arc < FutureMutex < ( Instant , Result < ( ) > ) > > ,
is_subscribed : Arc < IsSubscribedFn > ,
server_conf : JmapServerConf ,
connection : Arc < FutureMutex < JmapConnection > > ,
store : Arc < RwLock < Store > > ,
tag_index : Arc < RwLock < BTreeMap < u64 , String > > > ,
mailboxes : Arc < RwLock < HashMap < MailboxHash , JmapMailbox > > > ,
store : Arc < Store > ,
}
impl MailBackend for JmapType {
@ -207,7 +313,7 @@ impl MailBackend for JmapType {
}
fn is_online ( & self ) -> ResultFuture < ( ) > {
let online = self . online. clone ( ) ;
let online = self . store. online_status . clone ( ) ;
Ok ( Box ::pin ( async move {
//match timeout(std::time::Duration::from_secs(3), connection.lock()).await {
let online_lck = online . lock ( ) . await ;
@ -224,9 +330,7 @@ impl MailBackend for JmapType {
& mut self ,
mailbox_hash : MailboxHash ,
) -> Result < Pin < Box < dyn Stream < Item = Result < Vec < Envelope > > > + Send + ' static > > > {
let mailboxes = self . mailboxes . clone ( ) ;
let store = self . store . clone ( ) ;
let tag_index = self . tag_index . clone ( ) ;
let connection = self . connection . clone ( ) ;
Ok ( Box ::pin ( async_stream ::try_stream ! {
let mut conn = connection . lock ( ) . await ;
@ -234,8 +338,6 @@ impl MailBackend for JmapType {
let res = protocol ::fetch (
& conn ,
& store ,
& tag_index ,
& mailboxes ,
mailbox_hash ,
) . await ? ;
yield res ;
@ -243,7 +345,13 @@ impl MailBackend for JmapType {
}
fn refresh ( & mut self , _mailbox_hash : MailboxHash ) -> ResultFuture < ( ) > {
Err ( MeliError ::new ( "Unimplemented." ) )
let connection = self . connection . clone ( ) ;
Ok ( Box ::pin ( async move {
let mut conn = connection . lock ( ) . await ;
conn . connect ( ) . await ? ;
conn . email_changes ( ) . await ? ;
Ok ( ( ) )
} ) )
}
fn watch ( & self ) -> ResultFuture < ( ) > {
@ -251,17 +359,18 @@ impl MailBackend for JmapType {
}
fn mailboxes ( & self ) -> ResultFuture < HashMap < MailboxHash , Mailbox > > {
let mailboxes = self . mailboxes . clone ( ) ;
let store = self . store . clone ( ) ;
let connection = self . connection . clone ( ) ;
Ok ( Box ::pin ( async move {
let mut conn = connection . lock ( ) . await ;
conn . connect ( ) . await ? ;
if mailboxes. read ( ) . unwrap ( ) . is_empty ( ) {
if store. mailboxes. read ( ) . unwrap ( ) . is_empty ( ) {
let new_mailboxes = debug ! ( protocol ::get_mailboxes ( & conn ) . await ) ? ;
* mailboxes. write ( ) . unwrap ( ) = new_mailboxes ;
* store. mailboxes. write ( ) . unwrap ( ) = new_mailboxes ;
}
let ret = mailboxes
let ret = store
. mailboxes
. read ( )
. unwrap ( )
. iter ( )
@ -287,7 +396,7 @@ impl MailBackend for JmapType {
mailbox_hash : MailboxHash ,
_flags : Option < Flag > ,
) -> ResultFuture < ( ) > {
let mailboxes = self . mailboxes . clone ( ) ;
let store = self . store . clone ( ) ;
let connection = self . connection . clone ( ) ;
Ok ( Box ::pin ( async move {
let mut conn = connection . lock ( ) . await ;
@ -305,7 +414,7 @@ impl MailBackend for JmapType {
. await ? ;
let mailbox_id : String = {
let mailboxes_lck = mailboxes. read ( ) . unwrap ( ) ;
let mailboxes_lck = store. mailboxes. read ( ) . unwrap ( ) ;
if let Some ( mailbox ) = mailboxes_lck . get ( & mailbox_hash ) {
mailbox . id . clone ( )
} else {
@ -360,7 +469,7 @@ impl MailBackend for JmapType {
}
fn tags ( & self ) -> Option < Arc < RwLock < BTreeMap < u64 , String > > > > {
Some ( self . tag_index. clone ( ) )
Some ( self . store. tag_index. clone ( ) )
}
fn as_any ( & self ) -> & dyn Any {
@ -376,9 +485,12 @@ impl MailBackend for JmapType {
q : crate ::search ::Query ,
mailbox_hash : Option < MailboxHash > ,
) -> ResultFuture < SmallVec < [ EnvelopeHash ; 512 ] > > {
let store = self . store . clone ( ) ;
let connection = self . connection . clone ( ) ;
let filter = if let Some ( mailbox_hash ) = mailbox_hash {
let mailbox_id = self . mailboxes . read ( ) . unwrap ( ) [ & mailbox_hash ] . id . clone ( ) ;
let mailbox_id = self . store . mailboxes . read ( ) . unwrap ( ) [ & mailbox_hash ]
. id
. clone ( ) ;
let mut f = Filter ::Condition (
EmailFilterCondition ::new ( )
@ -412,14 +524,13 @@ impl MailBackend for JmapType {
let res_text = res . text_async ( ) . await ? ;
let mut v : MethodResponse = serde_json ::from_str ( & res_text ) . unwrap ( ) ;
* conn . online_status . lock ( ) . await = ( std ::time ::Instant ::now ( ) , Ok ( ( ) ) ) ;
* store . online_status . lock ( ) . await = ( std ::time ::Instant ::now ( ) , Ok ( ( ) ) ) ;
let m = QueryResponse ::< EmailObject > ::try_from ( v . method_responses . remove ( 0 ) ) ? ;
let QueryResponse ::< EmailObject > { ids , .. } = m ;
let ret = ids
. into_iter ( )
. map ( | id | {
use std ::hash ::Hasher ;
let mut h = std ::collections ::hash_map ::DefaultHasher ::new ( ) ;
let mut h = DefaultHasher ::new ( ) ;
h . write ( id . as_bytes ( ) ) ;
h . finish ( )
} )
@ -450,12 +561,11 @@ impl MailBackend for JmapType {
destination_mailbox_hash : MailboxHash ,
move_ : bool ,
) -> ResultFuture < ( ) > {
let mailboxes = self . mailboxes . clone ( ) ;
let store = self . store . clone ( ) ;
let connection = self . connection . clone ( ) ;
Ok ( Box ::pin ( async move {
let ( source_mailbox_id , destination_mailbox_id ) = {
let mailboxes_lck = mailboxes. read ( ) . unwrap ( ) ;
let mailboxes_lck = store. mailboxes. read ( ) . unwrap ( ) ;
if ! mailboxes_lck . contains_key ( & source_mailbox_hash ) {
return Err ( MeliError ::new ( format! (
"Could not find source mailbox with hash {}" ,
@ -489,9 +599,8 @@ impl MailBackend for JmapType {
) ;
}
{
let store_lck = store . read ( ) . unwrap ( ) ;
for env_hash in env_hashes . iter ( ) {
if let Some ( id ) = store _lck . id_store . get ( & env_hash ) {
if let Some ( id ) = store . id_store . lock ( ) . unwrap ( ) . get ( & env_hash ) {
ids . push ( id . clone ( ) ) ;
id_map . insert ( id . clone ( ) , env_hash ) ;
update_map . insert ( id . clone ( ) , serde_json ::json ! ( update_keywords . clone ( ) ) ) ;
@ -517,7 +626,7 @@ impl MailBackend for JmapType {
let res_text = res . text_async ( ) . await ? ;
let mut v : MethodResponse = serde_json ::from_str ( & res_text ) . unwrap ( ) ;
* conn . online_status . lock ( ) . await = ( std ::time ::Instant ::now ( ) , Ok ( ( ) ) ) ;
* store . online_status . lock ( ) . await = ( std ::time ::Instant ::now ( ) , Ok ( ( ) ) ) ;
let m = SetResponse ::< EmailObject > ::try_from ( v . method_responses . remove ( 0 ) ) ? ;
if let Some ( ids ) = m . not_updated {
if ! ids . is_empty ( ) {
@ -540,13 +649,10 @@ impl MailBackend for JmapType {
mailbox_hash : MailboxHash ,
flags : SmallVec < [ ( std ::result ::Result < Flag , String > , bool ) ; 8 ] > ,
) -> ResultFuture < ( ) > {
let mailboxes = self . mailboxes . clone ( ) ;
let store = self . store . clone ( ) ;
let account_hash = self . account_hash ;
let tag_index = self . tag_index . clone ( ) ;
let connection = self . connection . clone ( ) ;
Ok ( Box ::pin ( async move {
let mailbox_id = mailboxes. read ( ) . unwrap ( ) [ & mailbox_hash ] . id . clone ( ) ;
let mailbox_id = store. mailboxes. read ( ) . unwrap ( ) [ & mailbox_hash ] . id . clone ( ) ;
let mut update_map : HashMap < String , Value > = HashMap ::default ( ) ;
let mut ids : Vec < Id > = Vec ::with_capacity ( env_hashes . rest . len ( ) + 1 ) ;
let mut id_map : HashMap < Id , EnvelopeHash > = HashMap ::default ( ) ;
@ -587,9 +693,8 @@ impl MailBackend for JmapType {
}
}
{
let store_lck = store . read ( ) . unwrap ( ) ;
for hash in env_hashes . iter ( ) {
if let Some ( id ) = store _lck . id_store . get ( & hash ) {
if let Some ( id ) = store . id_store . lock ( ) . unwrap ( ) . get ( & hash ) {
ids . push ( id . clone ( ) ) ;
id_map . insert ( id . clone ( ) , hash ) ;
update_map . insert ( id . clone ( ) , serde_json ::json ! ( update_keywords . clone ( ) ) ) ;
@ -626,7 +731,7 @@ impl MailBackend for JmapType {
* /
//debug!("res_text = {}", &res_text);
let mut v : MethodResponse = serde_json ::from_str ( & res_text ) . unwrap ( ) ;
* conn . online_status . lock ( ) . await = ( std ::time ::Instant ::now ( ) , Ok ( ( ) ) ) ;
* store . online_status . lock ( ) . await = ( std ::time ::Instant ::now ( ) , Ok ( ( ) ) ) ;
let m = SetResponse ::< EmailObject > ::try_from ( v . method_responses . remove ( 0 ) ) ? ;
if let Some ( ids ) = m . not_updated {
return Err ( MeliError ::new (
@ -637,24 +742,45 @@ impl MailBackend for JmapType {
) ) ;
}
let mut tag_index_lck = tag_index . write ( ) . unwrap ( ) ;
for ( flag , value ) in flags . iter ( ) {
match flag {
Ok ( f ) = > { }
Err ( t ) = > {
if * value {
tag_index_lck . insert ( tag_hash ! ( t ) , t . clone ( ) ) ;
{
let mut tag_index_lck = store . tag_index . write ( ) . unwrap ( ) ;
for ( flag , value ) in flags . iter ( ) {
match flag {
Ok ( f ) = > { }
Err ( t ) = > {
if * value {
tag_index_lck . insert ( tag_hash ! ( t ) , t . clone ( ) ) ;
}
}
}
}
drop ( tag_index_lck ) ;
}
let e = GetResponse ::< EmailObject > ::try_from ( v . method_responses . pop ( ) . unwrap ( ) ) ? ;
let GetResponse ::< EmailObject > { list , state , .. } = e ;
//debug!(&list);
{
let c = store
. object_set_states
. lock ( )
. unwrap ( )
. get ( & EmailObject ::NAME )
. map ( | prev_state | * prev_state = = state ) ;
if let Some ( false ) = c {
conn . email_changes ( ) . await ? ;
} else {
debug ! ( "{:?}: inserting state {}" , EmailObject ::NAME , & state ) ;
store
. object_set_states
. lock ( )
. unwrap ( )
. insert ( EmailObject ::NAME , state ) ;
}
}
debug ! ( & list ) ;
for envobj in list {
let env_hash = id_map [ & envobj . id ] ;
conn . add_refresh_event ( RefreshEvent {
account_hash ,
account_hash : store . account_hash ,
mailbox_hash ,
kind : RefreshEventKind ::NewFlags (
env_hash ,
@ -673,34 +799,41 @@ impl JmapType {
is_subscribed : Box < dyn Fn ( & str ) -> bool + Send + Sync > ,
event_consumer : BackendEventConsumer ,
) -> Result < Box < dyn MailBackend > > {
let online = Arc ::new ( FutureMutex ::new ( (
let online _status = Arc ::new ( FutureMutex ::new ( (
std ::time ::Instant ::now ( ) ,
Err ( MeliError ::new ( "Account is uninitialised." ) ) ,
) ) ) ;
let server_conf = JmapServerConf ::new ( s ) ? ;
let account_hash = {
use std ::collections ::hash_map ::DefaultHasher ;
use std ::hash ::Hasher ;
let mut hasher = DefaultHasher ::new ( ) ;
hasher . write ( s . name . as_bytes ( ) ) ;
hasher . finish ( )
} ;
let store = Arc ::new ( Store {
account_name : Arc ::new ( s . name . clone ( ) ) ,
account_hash ,
account_id : Arc ::new ( Mutex ::new ( String ::new ( ) ) ) ,
online_status ,
event_consumer ,
is_subscribed : Arc ::new ( IsSubscribedFn ( is_subscribed ) ) ,
byte_cache : Default ::default ( ) ,
id_store : Default ::default ( ) ,
reverse_id_store : Default ::default ( ) ,
blob_id_store : Default ::default ( ) ,
tag_index : Default ::default ( ) ,
mailboxes : Default ::default ( ) ,
mailboxes_index : Default ::default ( ) ,
object_set_states : Default ::default ( ) ,
} ) ;
Ok ( Box ::new ( JmapType {
connection : Arc ::new ( FutureMutex ::new ( JmapConnection ::new (
& server_conf ,
account_hash ,
event_consumer ,
online . clone ( ) ,
store . clone ( ) ,
) ? ) ) ,
store : Arc ::new ( RwLock ::new ( Store ::default ( ) ) ) ,
tag_index : Arc ::new ( RwLock ::new ( Default ::default ( ) ) ) ,
mailboxes : Arc ::new ( RwLock ::new ( HashMap ::default ( ) ) ) ,
account_name : s . name . clone ( ) ,
account_hash ,
online ,
is_subscribed : Arc ::new ( IsSubscribedFn ( is_subscribed ) ) ,
store ,
server_conf ,
} ) )
}