2020-07-30 17:58:53 +00:00
/*
 * meli - nntp module.
 *
 * Copyright 2019 Manos Pitsidianakis
 *
 * This file is part of meli.
 *
 * meli is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * meli is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with meli. If not, see <http://www.gnu.org/licenses/>.
 */
use crate ::get_conf_val ;
use crate ::get_path_hash ;
use smallvec ::SmallVec ;
#[ macro_use ]
mod protocol_parser ;
pub use protocol_parser ::* ;
mod mailbox ;
pub use mailbox ::* ;
mod operations ;
pub use operations ::* ;
mod connection ;
pub use connection ::* ;
use crate ::conf ::AccountSettings ;
2021-01-10 20:32:25 +00:00
use crate ::connections ::timeout ;
2020-07-30 17:58:53 +00:00
use crate ::email ::* ;
2020-08-01 21:44:45 +00:00
use crate ::error ::{ MeliError , Result , ResultIntoMeliError } ;
2020-08-10 11:24:21 +00:00
use crate ::{ backends ::* , Collection } ;
2020-07-30 17:58:53 +00:00
use futures ::lock ::Mutex as FutureMutex ;
use futures ::stream ::Stream ;
2020-08-10 11:24:21 +00:00
use std ::collections ::{ hash_map ::DefaultHasher , BTreeSet , HashMap , HashSet } ;
2020-07-30 17:58:53 +00:00
use std ::hash ::Hasher ;
use std ::pin ::Pin ;
use std ::str ::FromStr ;
2020-08-10 11:24:21 +00:00
use std ::sync ::{ Arc , Mutex } ;
2021-01-10 20:32:25 +00:00
use std ::time ::{ Duration , Instant } ;
2020-07-30 17:58:53 +00:00
/// Numeric identifier stored next to a `MailboxHash` in `UIDStore`'s indexes; it is filled
/// with the number parsed out of each `OVER` response line.
pub type UID = usize ;
2021-09-04 18:48:21 +00:00
/// Looks up a key in the account's `extra` configuration map.
///
/// * `get_conf_val!(s["key"])` evaluates to a `Result` holding a reference to the stored
///   value; a missing key becomes a `MeliError` naming the account and the field.
/// * `get_conf_val!(s["key"], default)` parses the stored value through `FromStr` when the
///   key is present and evaluates to `Ok(default)` when it is absent; a value that fails to
///   parse becomes a `MeliError` that quotes the offending value and the parser's message.
macro_rules! get_conf_val {
    ($s:ident[$var:literal]) => {
        match $s.extra.get($var) {
            Some(value) => Ok(value),
            None => Err(MeliError::new(format!(
                " Configuration error ({}): NNTP connection requires the field `{}` set ",
                $s.name.as_str(),
                $var
            ))),
        }
    };
    ($s:ident[$var:literal], $default:expr) => {
        match $s.extra.get($var) {
            Some(raw) => <_>::from_str(raw).map_err(|err| {
                MeliError::new(format!(
                    " Configuration error ({}) NNTP: Invalid value for field `{}`: {} \n {} ",
                    $s.name.as_str(),
                    $var,
                    raw,
                    err
                ))
            }),
            None => Ok($default),
        }
    };
}
2020-07-30 17:58:53 +00:00
/// Capability labels that `MailBackend::capabilities` reports as enabled whenever the server's
/// capability set contains them.
pub static SUPPORTED_CAPABILITIES : & [ & str ] = & [
#[ cfg(feature = " deflate_compression " ) ]
" COMPRESS DEFLATE " ,
" VERSION 2 " ,
2021-09-05 09:09:29 +00:00
" NEWNEWS " ,
" POST " ,
" OVER " ,
" OVER MSGID " ,
" READER " ,
" STARTTLS " ,
" HDR " ,
" AUTHINFO USER " ,
2020-07-30 17:58:53 +00:00
] ;
/// Connection settings for one NNTP account, as assembled by `NntpType::new` from the
/// account's `extra` configuration map.
#[ derive(Debug, Clone) ]
pub struct NntpServerConf {
/// Value of the mandatory `server_hostname` setting.
pub server_hostname : String ,
/// Value of `server_username`; mandatory only when `require_auth` is set, empty by default.
pub server_username : String ,
2020-08-01 21:44:45 +00:00
/// Value of `server_password`; mandatory only when `require_auth` is set, empty by default.
pub server_password : String ,
2020-07-30 17:58:53 +00:00
/// Value of `server_port` (defaults to 119).
pub server_port : u16 ,
/// `use_starttls` setting; `NntpType::new` forces it to `false` whenever `use_tls` is `false`.
pub use_starttls : bool ,
/// `use_tls` setting; defaults to `true` only when the port is 563.
pub use_tls : bool ,
2020-08-01 21:44:45 +00:00
/// `require_auth` setting (defaults to `false`).
pub require_auth : bool ,
2020-07-30 17:58:53 +00:00
/// `danger_accept_invalid_certs` setting (defaults to `false`).
pub danger_accept_invalid_certs : bool ,
/// Per-extension switches chosen by the user (e.g. `use_deflate`).
pub extension_use : NntpExtensionUse ,
}
2020-08-01 09:36:47 +00:00
/// Set of capability labels, as held by `UIDStore::capabilities`.
type Capabilities = HashSet < String > ;
2020-07-30 17:58:53 +00:00
/// State shared between the backend, its connection and its fetch/refresh tasks.
#[ derive(Debug) ]
pub struct UIDStore {
/// Hash identifying the account; attached to every emitted refresh event.
account_hash : AccountHash ,
/// Account name, used in error messages.
account_name : Arc < String > ,
/// Always initialised to `false` in this file.
offline_cache : bool ,
/// Capability labels consulted by `capabilities` and `refresh`.
capabilities : Arc < Mutex < Capabilities > > ,
2021-09-05 09:09:29 +00:00
/// Maps an envelope's displayed message-id to its envelope hash.
message_id_index : Arc < Mutex < HashMap < String , EnvelopeHash > > > ,
2020-07-30 17:58:53 +00:00
/// Maps an envelope hash to the article number and mailbox it was fetched from.
hash_index : Arc < Mutex < HashMap < EnvelopeHash , ( UID , MailboxHash ) > > > ,
/// Reverse of `hash_index`: maps (mailbox, article number) to the envelope hash.
uid_index : Arc < Mutex < HashMap < ( MailboxHash , UID ) , EnvelopeHash > > > ,
2020-08-10 11:24:21 +00:00
/// Collection handle returned by `MailBackend::collection`.
collection : Collection ,
2020-07-30 17:58:53 +00:00
/// Configured newsgroups keyed by mailbox hash.
mailboxes : Arc < FutureMutex < HashMap < MailboxHash , NntpMailbox > > > ,
/// Timestamped online status; starts as an "uninitialised" error.
is_online : Arc < Mutex < ( Instant , Result < ( ) > ) > > ,
2020-08-19 22:55:24 +00:00
/// Callback invoked with backend events such as newly discovered articles.
event_consumer : BackendEventConsumer ,
2020-07-30 17:58:53 +00:00
}
2020-08-19 22:55:24 +00:00
impl UIDStore {
fn new (
account_hash : AccountHash ,
account_name : Arc < String > ,
event_consumer : BackendEventConsumer ,
) -> Self {
2020-07-30 17:58:53 +00:00
UIDStore {
2020-08-19 22:55:24 +00:00
account_hash ,
account_name ,
event_consumer ,
2020-07-30 17:58:53 +00:00
offline_cache : false ,
capabilities : Default ::default ( ) ,
2021-09-05 09:09:29 +00:00
message_id_index : Default ::default ( ) ,
2020-07-30 17:58:53 +00:00
hash_index : Default ::default ( ) ,
uid_index : Default ::default ( ) ,
mailboxes : Arc ::new ( FutureMutex ::new ( Default ::default ( ) ) ) ,
2020-08-10 11:24:21 +00:00
collection : Collection ::new ( ) ,
2020-07-30 17:58:53 +00:00
is_online : Arc ::new ( Mutex ::new ( (
Instant ::now ( ) ,
Err ( MeliError ::new ( " Account is uninitialised. " ) ) ,
) ) ) ,
}
}
}
/// NNTP mail backend: one shared connection plus the shared `UIDStore` state.
#[ derive(Debug) ]
pub struct NntpType {
// Wraps the subscription predicate handed to `NntpType::new`.
is_subscribed : Arc < IsSubscribedFn > ,
// The single connection shared by every operation of this backend.
connection : Arc < FutureMutex < NntpConnection > > ,
// Settings parsed from the account configuration.
server_conf : NntpServerConf ,
// Indexes, mailboxes and capabilities shared with the connection.
uid_store : Arc < UIDStore > ,
// Initialised to `false` by `NntpType::new`; not read anywhere in this file.
can_create_flags : Arc < Mutex < bool > > ,
}
impl MailBackend for NntpType {
fn capabilities ( & self ) -> MailBackendCapabilities {
2020-08-01 09:36:47 +00:00
let mut extensions = self
. uid_store
. capabilities
. lock ( )
. unwrap ( )
. iter ( )
. map ( | c | {
(
c . to_string ( ) ,
MailBackendExtensionStatus ::Unsupported { comment : None } ,
)
} )
. collect ::< Vec < ( String , MailBackendExtensionStatus ) > > ( ) ;
2021-09-03 21:32:57 +00:00
let mut supports_submission = false ;
2020-08-01 09:36:47 +00:00
let NntpExtensionUse {
#[ cfg(feature = " deflate_compression " ) ]
deflate ,
} = self . server_conf . extension_use ;
{
for ( name , status ) in extensions . iter_mut ( ) {
match name . as_str ( ) {
2021-09-03 21:32:57 +00:00
s if s . eq_ignore_ascii_case ( " POST " ) = > {
supports_submission = true ;
* status = MailBackendExtensionStatus ::Enabled { comment : None } ;
}
2020-08-01 09:36:47 +00:00
" COMPRESS DEFLATE " = > {
2020-08-25 18:34:22 +00:00
#[ cfg(feature = " deflate_compression " ) ]
{
2020-08-01 09:36:47 +00:00
if deflate {
* status = MailBackendExtensionStatus ::Enabled { comment : None } ;
} else {
* status = MailBackendExtensionStatus ::Supported {
comment : Some ( " Disabled by user configuration " ) ,
} ;
}
2020-08-25 18:34:22 +00:00
}
#[ cfg(not(feature = " deflate_compression " )) ]
{
2020-08-01 09:36:47 +00:00
* status = MailBackendExtensionStatus ::Unsupported {
comment : Some ( " melib not compiled with DEFLATE. " ) ,
} ;
}
}
_ = > {
if SUPPORTED_CAPABILITIES . contains ( & name . as_str ( ) ) {
* status = MailBackendExtensionStatus ::Enabled { comment : None } ;
}
}
}
}
}
extensions . sort_by ( | a , b | a . 0. cmp ( & b . 0 ) ) ;
MailBackendCapabilities {
2020-07-30 17:58:53 +00:00
is_async : true ,
is_remote : true ,
supports_search : false ,
2020-08-01 09:36:47 +00:00
extensions : Some ( extensions ) ,
2020-07-30 17:58:53 +00:00
supports_tags : false ,
2021-09-03 21:32:57 +00:00
supports_submission ,
2020-08-01 09:36:47 +00:00
}
2020-07-30 17:58:53 +00:00
}
2020-08-20 14:37:19 +00:00
fn fetch (
2020-07-30 17:58:53 +00:00
& mut self ,
mailbox_hash : MailboxHash ,
) -> Result < Pin < Box < dyn Stream < Item = Result < Vec < Envelope > > > + Send + 'static > > > {
2020-08-09 18:23:13 +00:00
let mut state = FetchState {
mailbox_hash ,
uid_store : self . uid_store . clone ( ) ,
connection : self . connection . clone ( ) ,
high_low_total : None ,
} ;
2020-07-30 17:58:53 +00:00
Ok ( Box ::pin ( async_stream ::try_stream! {
{
2020-08-09 18:23:13 +00:00
let f = & state . uid_store . mailboxes . lock ( ) . await [ & state . mailbox_hash ] ;
2020-07-30 17:58:53 +00:00
f . exists . lock ( ) . unwrap ( ) . clear ( ) ;
f . unseen . lock ( ) . unwrap ( ) . clear ( ) ;
} ;
2020-08-09 18:23:13 +00:00
loop {
if let Some ( ret ) = state . fetch_envs ( ) . await ? {
yield ret ;
continue ;
}
break ;
}
2020-07-30 17:58:53 +00:00
} ) )
}
2021-09-05 09:09:29 +00:00
fn refresh ( & mut self , mailbox_hash : MailboxHash ) -> ResultFuture < ( ) > {
let uid_store = self . uid_store . clone ( ) ;
let connection = self . connection . clone ( ) ;
Ok ( Box ::pin ( async move {
/* To get updates, either issue NEWNEWS if it's supported by the server, and fallback
* to OVER otherwise * /
let mbox : NntpMailbox = uid_store . mailboxes . lock ( ) . await . get ( & mailbox_hash ) . map ( std ::clone ::Clone ::clone ) . ok_or_else ( | | MeliError ::new ( format! ( " Mailbox with hash {} not found in NNTP connection, this could possibly be a bug or it was deleted. " , mailbox_hash ) ) ) ? ;
let mut latest_article : Option < crate ::UnixTimestamp > =
mbox . latest_article . lock ( ) . unwrap ( ) . clone ( ) ;
let ( over_msgid_support , newnews_support ) : ( bool , bool ) = {
let caps = uid_store . capabilities . lock ( ) . unwrap ( ) ;
(
caps . iter ( ) . any ( | c | c . eq_ignore_ascii_case ( " OVER MSGID " ) ) ,
caps . iter ( ) . any ( | c | c . eq_ignore_ascii_case ( " NEWNEWS " ) ) ,
)
} ;
let mut res = String ::with_capacity ( 8 * 1024 ) ;
let mut conn = timeout ( Some ( Duration ::from_secs ( 60 * 16 ) ) , connection . lock ( ) ) . await ? ;
if let Some ( ref mut latest_article ) = latest_article {
let timestamp = * latest_article - 10 * 60 ;
let datetime_str =
crate ::datetime ::timestamp_to_string ( timestamp , Some ( " %Y%m%d %H%M%S " ) , true ) ;
if newnews_support {
conn . send_command (
format! ( " NEWNEWS {} {} " , & mbox . nntp_path , datetime_str ) . as_bytes ( ) ,
)
. await ? ;
conn . read_response ( & mut res , true , & [ " 230 " ] ) . await ? ;
let message_ids = {
let message_id_lck = uid_store . message_id_index . lock ( ) . unwrap ( ) ;
res . split_rn ( )
. skip ( 1 )
. map ( | s | s . trim ( ) )
. filter ( | msg_id | ! message_id_lck . contains_key ( * msg_id ) )
. map ( str ::to_string )
. collect ::< Vec < String > > ( )
} ;
if message_ids . is_empty ( ) | | ! over_msgid_support {
return Ok ( ( ) ) ;
}
for msg_id in message_ids {
conn . send_command ( format! ( " OVER {} " , msg_id ) . as_bytes ( ) )
. await ? ;
conn . read_response ( & mut res , true , & [ " 224 " ] ) . await ? ;
let mut message_id_lck = uid_store . message_id_index . lock ( ) . unwrap ( ) ;
let mut hash_index_lck = uid_store . hash_index . lock ( ) . unwrap ( ) ;
let mut uid_index_lck = uid_store . uid_index . lock ( ) . unwrap ( ) ;
for l in res . split_rn ( ) . skip ( 1 ) {
let ( _ , ( num , env ) ) = protocol_parser ::over_article ( & l ) ? ;
message_id_lck . insert ( env . message_id_display ( ) . to_string ( ) , env . hash ( ) ) ;
hash_index_lck . insert ( env . hash ( ) , ( num , mailbox_hash ) ) ;
uid_index_lck . insert ( ( mailbox_hash , num ) , env . hash ( ) ) ;
* latest_article = std ::cmp ::max ( * latest_article , env . timestamp ) ;
( uid_store . event_consumer ) (
uid_store . account_hash ,
crate ::backends ::BackendEvent ::Refresh ( RefreshEvent {
mailbox_hash ,
account_hash : uid_store . account_hash ,
kind : RefreshEventKind ::Create ( Box ::new ( env ) ) ,
} ) ,
) ;
}
}
return Ok ( ( ) ) ;
}
}
//conn.select_group(mailbox_hash, false, &mut res).await?;
Ok ( ( ) )
} ) )
2020-07-30 17:58:53 +00:00
}
2020-08-20 14:37:19 +00:00
fn mailboxes ( & self ) -> ResultFuture < HashMap < MailboxHash , Mailbox > > {
2020-07-30 17:58:53 +00:00
let uid_store = self . uid_store . clone ( ) ;
let connection = self . connection . clone ( ) ;
Ok ( Box ::pin ( async move {
NntpType ::nntp_mailboxes ( & connection ) . await ? ;
let mailboxes_lck = uid_store . mailboxes . lock ( ) . await ;
let ret = mailboxes_lck
. iter ( )
. map ( | ( h , f ) | ( * h , Box ::new ( Clone ::clone ( f ) ) as Mailbox ) )
. collect ( ) ;
Ok ( ret )
} ) )
}
2020-08-20 14:37:19 +00:00
fn is_online ( & self ) -> ResultFuture < ( ) > {
2020-07-30 17:58:53 +00:00
let connection = self . connection . clone ( ) ;
Ok ( Box ::pin ( async move {
2021-01-10 20:32:25 +00:00
match timeout ( Some ( Duration ::from_secs ( 60 * 16 ) ) , connection . lock ( ) ) . await {
2020-07-30 17:58:53 +00:00
Ok ( mut conn ) = > {
2020-08-20 14:37:19 +00:00
debug! ( " is_online " ) ;
2021-01-10 20:32:25 +00:00
match debug! ( timeout ( Some ( Duration ::from_secs ( 60 * 16 ) ) , conn . connect ( ) ) . await )
{
2020-07-30 17:58:53 +00:00
Ok ( Ok ( ( ) ) ) = > Ok ( ( ) ) ,
Err ( err ) | Ok ( Err ( err ) ) = > {
conn . stream = Err ( err . clone ( ) ) ;
debug! ( conn . connect ( ) . await )
}
}
}
Err ( err ) = > Err ( err ) ,
}
} ) )
}
2020-08-20 14:37:19 +00:00
fn watch ( & self ) -> ResultFuture < ( ) > {
2021-09-05 09:39:15 +00:00
Err ( MeliError ::new ( " Unimplemented. " ) . set_kind ( ErrorKind ::NotImplemented ) )
2020-07-30 17:58:53 +00:00
}
/// Creates the operation object for the envelope `env_hash`.
///
/// The article number and mailbox are looked up in `hash_index`; an envelope that is not
/// indexed yields an error.
fn operation(&self, env_hash: EnvelopeHash) -> Result<Box<dyn BackendOp>> {
    let indexed = self
        .uid_store
        .hash_index
        .lock()
        .unwrap()
        .get(&env_hash)
        .copied();
    match indexed {
        Some((uid, mailbox_hash)) => Ok(Box::new(NntpOp::new(
            uid,
            mailbox_hash,
            self.connection.clone(),
            self.uid_store.clone(),
        ))),
        None => Err(MeliError::new(
            " Message not found in local cache, it might have been deleted before you requested it. ",
        )),
    }
}
fn save (
& self ,
_bytes : Vec < u8 > ,
_mailbox_hash : MailboxHash ,
_flags : Option < Flag > ,
) -> ResultFuture < ( ) > {
2020-10-13 10:57:04 +00:00
Err ( MeliError ::new ( " NNTP doesn't support saving. " ) )
2020-07-30 17:58:53 +00:00
}
fn copy_messages (
& mut self ,
_env_hashes : EnvelopeHashBatch ,
_source_mailbox_hash : MailboxHash ,
_destination_mailbox_hash : MailboxHash ,
_move_ : bool ,
) -> ResultFuture < ( ) > {
2020-10-13 10:57:04 +00:00
Err ( MeliError ::new ( " NNTP doesn't support copying/moving. " ) )
2020-07-30 17:58:53 +00:00
}
fn set_flags (
& mut self ,
_env_hashes : EnvelopeHashBatch ,
_mailbox_hash : MailboxHash ,
_flags : SmallVec < [ ( std ::result ::Result < Flag , String > , bool ) ; 8 ] > ,
) -> ResultFuture < ( ) > {
2020-10-13 10:57:04 +00:00
Err ( MeliError ::new ( " NNTP doesn't support flags. " ) )
}
fn delete_messages (
& mut self ,
_env_hashes : EnvelopeHashBatch ,
_mailbox_hash : MailboxHash ,
) -> ResultFuture < ( ) > {
Err ( MeliError ::new ( " NNTP doesn't support deletion. " ) )
2020-07-30 17:58:53 +00:00
}
2020-08-20 18:25:12 +00:00
fn as_any ( & self ) -> & dyn Any {
2020-07-30 17:58:53 +00:00
self
}
2020-08-20 18:25:12 +00:00
fn as_any_mut ( & mut self ) -> & mut dyn Any {
self
2020-07-30 17:58:53 +00:00
}
2020-08-10 11:24:21 +00:00
/// Returns a clone of the `Collection` held by the `UIDStore`.
fn collection(&self) -> Collection {
    let shared = &self.uid_store.collection;
    shared.clone()
}
2020-07-30 17:58:53 +00:00
/// Creating mailboxes is not implemented; always returns an error.
fn create_mailbox(
    &mut self,
    _path: String,
) -> ResultFuture<(MailboxHash, HashMap<MailboxHash, Mailbox>)> {
    let unimplemented = MeliError::new(" Unimplemented. ");
    Err(unimplemented)
}
/// Deleting mailboxes is not implemented; always returns an error.
fn delete_mailbox(
    &mut self,
    _mailbox_hash: MailboxHash,
) -> ResultFuture<HashMap<MailboxHash, Mailbox>> {
    let unimplemented = MeliError::new(" Unimplemented. ");
    Err(unimplemented)
}
/// Changing a mailbox subscription is not implemented; always returns an error.
fn set_mailbox_subscription(
    &mut self,
    _mailbox_hash: MailboxHash,
    _new_val: bool,
) -> ResultFuture<()> {
    let unimplemented = MeliError::new(" Unimplemented. ");
    Err(unimplemented)
}
/// Renaming mailboxes is not implemented; always returns an error.
fn rename_mailbox(
    &mut self,
    _mailbox_hash: MailboxHash,
    _new_path: String,
) -> ResultFuture<Mailbox> {
    let unimplemented = MeliError::new(" Unimplemented. ");
    Err(unimplemented)
}
/// Changing mailbox permissions is not implemented; always returns an error.
fn set_mailbox_permissions(
    &mut self,
    _mailbox_hash: MailboxHash,
    _val: crate::backends::MailboxPermissions,
) -> ResultFuture<()> {
    let unimplemented = MeliError::new(" Unimplemented. ");
    Err(unimplemented)
}
/// Searching is not implemented; always returns an error.
fn search(
    &self,
    _query: crate::search::Query,
    _mailbox_hash: Option<MailboxHash>,
) -> ResultFuture<SmallVec<[EnvelopeHash; 512]>> {
    let unimplemented = MeliError::new(" Unimplemented. ");
    Err(unimplemented)
}
2021-09-03 21:32:57 +00:00
fn submit (
& self ,
bytes : Vec < u8 > ,
mailbox_hash : Option < MailboxHash > ,
2021-09-04 13:52:17 +00:00
_flags : Option < Flag > ,
2021-09-03 21:32:57 +00:00
) -> ResultFuture < ( ) > {
let connection = self . connection . clone ( ) ;
Ok ( Box ::pin ( async move {
match timeout ( Some ( Duration ::from_secs ( 60 * 16 ) ) , connection . lock ( ) ) . await {
Ok ( mut conn ) = > {
match & conn . stream {
Ok ( stream ) = > {
if ! stream . supports_submission {
return Err ( MeliError ::new ( " Server prohibits posting. " ) ) ;
}
}
Err ( err ) = > return Err ( err . clone ( ) ) ,
}
let mut res = String ::with_capacity ( 8 * 1024 ) ;
if let Some ( mailbox_hash ) = mailbox_hash {
conn . select_group ( mailbox_hash , false , & mut res ) . await ? ;
}
conn . send_command ( b " POST " ) . await ? ;
conn . read_response ( & mut res , false , & [ " 340 " ] ) . await ? ;
conn . send_multiline_data_block ( & bytes ) . await ? ;
conn . read_response ( & mut res , false , & [ " 240 " ] ) . await ? ;
Ok ( ( ) )
}
Err ( err ) = > Err ( err ) ,
}
} ) )
}
2020-07-30 17:58:53 +00:00
}
impl NntpType {
pub fn new (
s : & AccountSettings ,
is_subscribed : Box < dyn Fn ( & str ) -> bool + Send + Sync > ,
2020-08-19 22:55:24 +00:00
event_consumer : BackendEventConsumer ,
2020-07-30 17:58:53 +00:00
) -> Result < Box < dyn MailBackend > > {
let server_hostname = get_conf_val! ( s [ " server_hostname " ] ) ? ;
/* let server_username = get_conf_val!(s["server_username"], "")?;
let server_password = if ! s . extra . contains_key ( " server_password_command " ) {
get_conf_val! ( s [ " server_password " ] , " " ) ? . to_string ( )
} else {
let invocation = get_conf_val! ( s [ " server_password_command " ] ) ? ;
let output = std ::process ::Command ::new ( " sh " )
. args ( & [ " -c " , invocation ] )
. stdin ( std ::process ::Stdio ::piped ( ) )
. stdout ( std ::process ::Stdio ::piped ( ) )
. stderr ( std ::process ::Stdio ::piped ( ) )
. output ( ) ? ;
if ! output . status . success ( ) {
return Err ( MeliError ::new ( format! (
" ({}) server_password_command `{}` returned {}: {} " ,
s . name ,
get_conf_val! ( s [ " server_password_command " ] ) ? ,
output . status ,
String ::from_utf8_lossy ( & output . stderr )
) ) ) ;
}
std ::str ::from_utf8 ( & output . stdout ) ? . trim_end ( ) . to_string ( )
} ;
* /
let server_port = get_conf_val! ( s [ " server_port " ] , 119 ) ? ;
2020-08-01 21:44:45 +00:00
let use_tls = get_conf_val! ( s [ " use_tls " ] , server_port = = 563 ) ? ;
2021-09-04 18:48:21 +00:00
let use_starttls = use_tls & & get_conf_val! ( s [ " use_starttls " ] , false ) ? ;
2020-07-30 17:58:53 +00:00
let danger_accept_invalid_certs : bool =
get_conf_val! ( s [ " danger_accept_invalid_certs " ] , false ) ? ;
2021-09-04 18:48:21 +00:00
let require_auth = get_conf_val! ( s [ " require_auth " ] , false ) ? ;
2020-07-30 17:58:53 +00:00
let server_conf = NntpServerConf {
server_hostname : server_hostname . to_string ( ) ,
2020-08-01 21:44:45 +00:00
server_username : if require_auth {
get_conf_val! ( s [ " server_username " ] ) ? . to_string ( )
} else {
get_conf_val! ( s [ " server_username " ] , String ::new ( ) ) ?
} ,
server_password : if require_auth {
get_conf_val! ( s [ " server_password " ] ) ? . to_string ( )
} else {
get_conf_val! ( s [ " server_password " ] , String ::new ( ) ) ?
} ,
require_auth ,
2020-07-30 17:58:53 +00:00
server_port ,
use_tls ,
use_starttls ,
danger_accept_invalid_certs ,
2020-08-01 09:36:47 +00:00
extension_use : NntpExtensionUse {
#[ cfg(feature = " deflate_compression " ) ]
2021-09-04 18:48:21 +00:00
deflate : get_conf_val ! ( s [ " use_deflate " ] , false ) ? ,
2020-08-01 09:36:47 +00:00
} ,
2020-07-30 17:58:53 +00:00
} ;
let account_hash = {
let mut hasher = DefaultHasher ::new ( ) ;
hasher . write ( s . name . as_bytes ( ) ) ;
hasher . finish ( )
} ;
let account_name = Arc ::new ( s . name ( ) . to_string ( ) ) ;
let mut mailboxes = HashMap ::default ( ) ;
for ( k , _f ) in s . mailboxes . iter ( ) {
let mailbox_hash = get_path_hash! ( & k ) ;
mailboxes . insert (
mailbox_hash ,
NntpMailbox {
hash : mailbox_hash ,
nntp_path : k . to_string ( ) ,
high_watermark : Arc ::new ( Mutex ::new ( 0 ) ) ,
low_watermark : Arc ::new ( Mutex ::new ( 0 ) ) ,
2021-09-05 09:09:29 +00:00
latest_article : Arc ::new ( Mutex ::new ( None ) ) ,
2020-07-30 17:58:53 +00:00
exists : Default ::default ( ) ,
unseen : Default ::default ( ) ,
} ,
) ;
}
if mailboxes . is_empty ( ) {
return Err ( MeliError ::new ( format! (
" {} has no newsgroups configured. " ,
account_name
) ) ) ;
}
let uid_store : Arc < UIDStore > = Arc ::new ( UIDStore {
2020-08-01 21:44:45 +00:00
offline_cache : false , //get_conf_val!(s["X_header_caching"], false)?,
2020-07-30 17:58:53 +00:00
mailboxes : Arc ::new ( FutureMutex ::new ( mailboxes ) ) ,
2020-08-19 22:55:24 +00:00
.. UIDStore ::new ( account_hash , account_name , event_consumer )
2020-07-30 17:58:53 +00:00
} ) ;
let connection = NntpConnection ::new_connection ( & server_conf , uid_store . clone ( ) ) ;
Ok ( Box ::new ( NntpType {
server_conf ,
is_subscribed : Arc ::new ( IsSubscribedFn ( is_subscribed ) ) ,
can_create_flags : Arc ::new ( Mutex ::new ( false ) ) ,
connection : Arc ::new ( FutureMutex ::new ( connection ) ) ,
uid_store ,
} ) )
}
pub async fn nntp_mailboxes ( connection : & Arc < FutureMutex < NntpConnection > > ) -> Result < ( ) > {
let mut res = String ::with_capacity ( 8 * 1024 ) ;
let mut conn = connection . lock ( ) . await ;
let command = {
let mailboxes_lck = conn . uid_store . mailboxes . lock ( ) . await ;
mailboxes_lck
. values ( )
. fold ( " LIST ACTIVE " . to_string ( ) , | mut acc , x | {
if acc . len ( ) ! = " LIST ACTIVE " . len ( ) {
acc . push ( ',' ) ;
}
acc . push_str ( x . name ( ) ) ;
acc
} )
} ;
conn . send_command ( command . as_bytes ( ) ) . await ? ;
2020-08-01 21:44:45 +00:00
conn . read_response ( & mut res , true , & [ " 215 " ] )
. await
. chain_err_summary ( | | {
format! (
" Could not get newsgroups {}: expected LIST ACTIVE response but got: {} " ,
& conn . uid_store . account_name , res
)
} ) ? ;
2020-07-30 17:58:53 +00:00
debug! ( & res ) ;
let mut mailboxes_lck = conn . uid_store . mailboxes . lock ( ) . await ;
for l in res . split_rn ( ) . skip ( 1 ) {
let s = l . split_whitespace ( ) . collect ::< SmallVec < [ & str ; 4 ] > > ( ) ;
if s . len ( ) ! = 3 {
continue ;
}
let mailbox_hash = get_path_hash! ( & s [ 0 ] ) ;
mailboxes_lck . entry ( mailbox_hash ) . and_modify ( | m | {
* m . high_watermark . lock ( ) . unwrap ( ) = usize ::from_str ( s [ 1 ] ) . unwrap_or ( 0 ) ;
* m . low_watermark . lock ( ) . unwrap ( ) = usize ::from_str ( s [ 2 ] ) . unwrap_or ( 0 ) ;
} ) ;
}
Ok ( ( ) )
}
pub fn validate_config ( s : & AccountSettings ) -> Result < ( ) > {
2021-09-04 18:48:21 +00:00
let mut keys : HashSet < & 'static str > = Default ::default ( ) ;
macro_rules ! get_conf_val {
( $s :ident [ $var :literal ] ) = > { {
keys . insert ( $var ) ;
$s . extra . get ( $var ) . ok_or_else ( | | {
MeliError ::new ( format! (
" Configuration error ({}): NNTP connection requires the field `{}` set " ,
$s . name . as_str ( ) ,
$var
) )
} )
} } ;
( $s :ident [ $var :literal ] , $default :expr ) = > { {
keys . insert ( $var ) ;
$s . extra
. get ( $var )
. map ( | v | {
< _ > ::from_str ( v ) . map_err ( | e | {
MeliError ::new ( format! (
" Configuration error ({}) NNTP: Invalid value for field `{}`: {} \n {} " ,
$s . name . as_str ( ) ,
$var ,
v ,
e
) )
} )
} )
. unwrap_or_else ( | | Ok ( $default ) )
} } ;
}
get_conf_val! ( s [ " require_auth " ] , false ) ? ;
2020-07-30 17:58:53 +00:00
get_conf_val! ( s [ " server_hostname " ] ) ? ;
get_conf_val! ( s [ " server_username " ] , String ::new ( ) ) ? ;
if ! s . extra . contains_key ( " server_password_command " ) {
get_conf_val! ( s [ " server_password " ] , String ::new ( ) ) ? ;
} else if s . extra . contains_key ( " server_password " ) {
return Err ( MeliError ::new ( format! (
" Configuration error ({}): both server_password and server_password_command are set, cannot choose " ,
s . name . as_str ( ) ,
) ) ) ;
}
let server_port = get_conf_val! ( s [ " server_port " ] , 119 ) ? ;
2020-08-01 21:44:45 +00:00
let use_tls = get_conf_val! ( s [ " use_tls " ] , server_port = = 563 ) ? ;
let use_starttls = get_conf_val! ( s [ " use_starttls " ] , ! ( server_port = = 563 ) ) ? ;
2020-07-30 17:58:53 +00:00
if ! use_tls & & use_starttls {
return Err ( MeliError ::new ( format! (
" Configuration error ({}): incompatible use_tls and use_starttls values: use_tls = false, use_starttls = true " ,
s . name . as_str ( ) ,
) ) ) ;
}
2020-08-01 21:44:45 +00:00
#[ cfg(feature = " deflate_compression " ) ]
2021-09-04 18:48:21 +00:00
get_conf_val! ( s [ " use_deflate " ] , false ) ? ;
2020-08-01 21:44:45 +00:00
#[ cfg(not(feature = " deflate_compression " )) ]
if s . extra . contains_key ( " use_deflate " ) {
return Err ( MeliError ::new ( format! (
" Configuration error ({}): setting `use_deflate` is set but this version of meli isn't compiled with DEFLATE support. " ,
s . name . as_str ( ) ,
) ) ) ;
}
2020-07-30 17:58:53 +00:00
get_conf_val! ( s [ " danger_accept_invalid_certs " ] , false ) ? ;
2021-09-04 18:48:21 +00:00
let extra_keys = s
. extra
. keys ( )
. map ( String ::as_str )
. collect ::< HashSet < & str > > ( ) ;
let diff = extra_keys . difference ( & keys ) . collect ::< Vec < & & str > > ( ) ;
if ! diff . is_empty ( ) {
return Err ( MeliError ::new ( format! (
" Configuration error ({}) NNTP: the following flags are set but are not recognized: {:?}. " ,
s . name . as_str ( ) , diff
) ) ) ;
}
2020-07-30 17:58:53 +00:00
Ok ( ( ) )
}
pub fn capabilities ( & self ) -> Vec < String > {
self . uid_store
. capabilities
. lock ( )
. unwrap ( )
. iter ( )
2020-08-01 09:36:47 +00:00
. map ( | c | c . clone ( ) )
2020-07-30 17:58:53 +00:00
. collect ::< Vec < String > > ( )
}
}
2020-08-09 18:23:13 +00:00
/// Progress of one chunked fetch of a newsgroup, driven by `FetchState::fetch_envs`.
struct FetchState {
2020-07-30 17:58:53 +00:00
// Newsgroup being fetched.
mailbox_hash : MailboxHash ,
// Connection used to issue the commands.
connection : Arc < FutureMutex < NntpConnection > > ,
2020-08-09 18:23:13 +00:00
// Shared indexes and mailboxes that receive the fetched envelopes.
uid_store : Arc < UIDStore > ,
// `(high, low, total)` as last recorded by `fetch_envs`; `None` until the group is selected.
high_low_total : Option < ( usize , usize , usize ) > ,
}
impl FetchState {
async fn fetch_envs ( & mut self ) -> Result < Option < Vec < Envelope > > > {
let FetchState {
mailbox_hash ,
ref connection ,
ref uid_store ,
ref mut high_low_total ,
} = self ;
let mailbox_hash = * mailbox_hash ;
let mut res = String ::with_capacity ( 8 * 1024 ) ;
let mut conn = connection . lock ( ) . await ;
if high_low_total . is_none ( ) {
conn . select_group ( mailbox_hash , true , & mut res ) . await ? ;
/*
* Parameters
group Name of newsgroup
number Estimated number of articles in the group
low Reported low water mark
high Reported high water mark
* /
let s = res . split_whitespace ( ) . collect ::< SmallVec < [ & str ; 6 ] > > ( ) ;
let path = conn . uid_store . mailboxes . lock ( ) . await [ & mailbox_hash ]
. name ( )
. to_string ( ) ;
if s . len ( ) ! = 5 {
return Err ( MeliError ::new ( format! (
" {} Could not select newsgroup {}: expected GROUP response but got: {} " ,
& uid_store . account_name , path , res
) ) ) ;
}
let total = usize ::from_str ( & s [ 1 ] ) . unwrap_or ( 0 ) ;
let _low = usize ::from_str ( & s [ 2 ] ) . unwrap_or ( 0 ) ;
let high = usize ::from_str ( & s [ 3 ] ) . unwrap_or ( 0 ) ;
* high_low_total = Some ( ( high , _low , total ) ) ;
{
let f = & uid_store . mailboxes . lock ( ) . await [ & mailbox_hash ] ;
f . exists . lock ( ) . unwrap ( ) . set_not_yet_seen ( total ) ;
f . unseen . lock ( ) . unwrap ( ) . set_not_yet_seen ( total ) ;
} ;
}
2020-08-25 12:55:21 +00:00
let ( high , low , _ ) = high_low_total . unwrap ( ) ;
2020-08-09 18:23:13 +00:00
if high < = low {
return Ok ( None ) ;
}
2021-09-04 13:06:42 +00:00
const CHUNK_SIZE : usize = 50000 ;
2020-08-09 18:23:13 +00:00
let new_low = std ::cmp ::max ( low , high . saturating_sub ( CHUNK_SIZE ) ) ;
high_low_total . as_mut ( ) . unwrap ( ) . 0 = new_low ;
2021-09-05 09:09:29 +00:00
// FIXME: server might not implement OVER capability
2020-08-09 18:23:13 +00:00
conn . send_command ( format! ( " OVER {} - {} " , new_low , high ) . as_bytes ( ) )
. await ? ;
conn . read_response ( & mut res , true , command_to_replycodes ( " OVER " ) )
. await
. chain_err_summary ( | | {
format! (
" {} Could not select newsgroup: expected OVER response but got: {} " ,
& uid_store . account_name , res
)
} ) ? ;
let mut ret = Vec ::with_capacity ( high - new_low ) ;
//hash_index: Arc<Mutex<HashMap<EnvelopeHash, (UID, MailboxHash)>>>,
//uid_index: Arc<Mutex<HashMap<(MailboxHash, UID), EnvelopeHash>>>,
2021-09-05 09:09:29 +00:00
let mut latest_article : Option < crate ::UnixTimestamp > = None ;
2020-08-09 18:23:13 +00:00
{
2021-09-05 09:09:29 +00:00
let mut message_id_lck = uid_store . message_id_index . lock ( ) . unwrap ( ) ;
2020-08-09 18:23:13 +00:00
let mut hash_index_lck = uid_store . hash_index . lock ( ) . unwrap ( ) ;
let mut uid_index_lck = uid_store . uid_index . lock ( ) . unwrap ( ) ;
for l in res . split_rn ( ) . skip ( 1 ) {
let ( _ , ( num , env ) ) = protocol_parser ::over_article ( & l ) ? ;
2021-09-05 09:09:29 +00:00
message_id_lck . insert ( env . message_id_display ( ) . to_string ( ) , env . hash ( ) ) ;
2020-08-09 18:23:13 +00:00
hash_index_lck . insert ( env . hash ( ) , ( num , mailbox_hash ) ) ;
uid_index_lck . insert ( ( mailbox_hash , num ) , env . hash ( ) ) ;
2021-09-05 09:09:29 +00:00
if let Some ( ref mut v ) = latest_article {
* v = std ::cmp ::max ( * v , env . timestamp ) ;
} else {
latest_article = Some ( env . timestamp ) ;
}
2020-08-09 18:23:13 +00:00
ret . push ( env ) ;
}
}
{
let hash_set : BTreeSet < EnvelopeHash > = ret . iter ( ) . map ( | env | env . hash ( ) ) . collect ( ) ;
let f = & uid_store . mailboxes . lock ( ) . await [ & mailbox_hash ] ;
2021-09-05 09:09:29 +00:00
* f . latest_article . lock ( ) . unwrap ( ) = latest_article ;
2020-08-09 18:23:13 +00:00
f . exists
. lock ( )
. unwrap ( )
. insert_existing_set ( hash_set . clone ( ) ) ;
f . unseen . lock ( ) . unwrap ( ) . insert_existing_set ( hash_set ) ;
} ;
Ok ( Some ( ret ) )
}
2020-07-30 17:58:53 +00:00
}