From 486d01272e9924e83ce3b7764a0d53619eb8d525 Mon Sep 17 00:00:00 2001 From: Manos Pitsidianakis Date: Fri, 14 Jul 2023 15:36:23 +0300 Subject: [PATCH] Async Cursors WIP --- melib/src/backends.rs | 145 ++++++++++++++++++++++---- melib/src/backends/imap.rs | 22 +++- melib/src/backends/imap/connection.rs | 2 + melib/src/backends/imap/cursor.rs | 80 ++++++++++++++ 4 files changed, 229 insertions(+), 20 deletions(-) create mode 100644 melib/src/backends/imap/cursor.rs diff --git a/melib/src/backends.rs b/melib/src/backends.rs index 6c673c90..5704c68a 100644 --- a/melib/src/backends.rs +++ b/melib/src/backends.rs @@ -21,6 +21,7 @@ pub mod utf7; use smallvec::SmallVec; +use uuid::Uuid; #[cfg(feature = "imap_backend")] pub mod imap; @@ -62,6 +63,7 @@ use super::email::{Envelope, EnvelopeHash, Flag}; use crate::{ conf::AccountSettings, error::{Error, ErrorKind, Result}, + search::Query, LogLevel, }; @@ -351,6 +353,8 @@ pub enum MailBackendExtensionStatus { } pub type ResultFuture = Result> + Send + 'static>>>; +#[allow(clippy::type_complexity)] +pub type EnvelopeStream = Pin>> + Send + 'static>>; pub trait MailBackend: ::std::fmt::Debug + Send + Sync { fn capabilities(&self) -> MailBackendCapabilities; @@ -358,11 +362,10 @@ pub trait MailBackend: ::std::fmt::Debug + Send + Sync { Ok(Box::pin(async { Ok(()) })) } - #[allow(clippy::type_complexity)] - fn fetch( - &mut self, - mailbox_hash: MailboxHash, - ) -> Result>> + Send + 'static>>>; + fn fetch(&mut self, mailbox_hash: MailboxHash) -> Result; + fn cursor(&mut self, _mailbox_hash: MailboxHash) -> Result { + Err(Error::new("Cursor not supported in this backend.").set_kind(ErrorKind::NotSupported)) + } fn refresh(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()>; fn watch(&self) -> ResultFuture<()>; @@ -747,20 +750,6 @@ impl LazyCountSet { } } -#[test] -fn test_lazy_count_set() { - let mut new = LazyCountSet::default(); - assert_eq!(new.len(), 0); - new.set_not_yet_seen(10); - assert_eq!(new.len(), 10); - for i in 0..10 { - assert!(new.insert_existing(EnvelopeHash(i))); - } - assert_eq!(new.len(), 10); - assert!(!new.insert_existing(EnvelopeHash(10))); - assert_eq!(new.len(), 10); -} - pub struct IsSubscribedFn(Box bool + Send + Sync>); impl std::fmt::Debug for IsSubscribedFn { @@ -775,3 +764,121 @@ impl std::ops::Deref for IsSubscribedFn { &self.0 } } + +use cursor::MailCursor; +pub mod cursor { + use super::*; + use futures::StreamExt; + use std::sync::atomic::{AtomicUsize, Ordering}; + + pub trait Cursor: std::fmt::Debug + Send + Sync { + fn fetch(&self, start: usize, count: usize) -> Result; + fn total(&self) -> ResultFuture>; + fn unseen(&self) -> ResultFuture>; + fn has_updates(&self) -> ResultFuture; + fn reset(&self) -> ResultFuture<()>; + fn query(&self) -> Result>; + } + + pub struct MailCursor { + pub(crate) id: Uuid, + pub(crate) inner: Arc, + pub(crate) length: Arc, + pub(crate) page_size: Arc, + pub(crate) position: Arc, + } + + impl From> for MailCursor { + fn from(c: Box) -> Self { + Self { + id: Uuid::new_v4(), + inner: c.into(), + length: Arc::new(0.into()), + page_size: Arc::new(0.into()), + position: Arc::new(0.into()), + } + } + } + + impl MailCursor { + pub fn new(cursor: Box) -> Self { + Self::from(cursor) + } + + pub fn set_page_size(&self, new_value: usize) -> &Self { + self.page_size.store(new_value, Ordering::Relaxed); + self + } + + pub fn set_position(&self, new_value: usize) -> &Self { + let length = self.length(); + self.position.store( + std::cmp::min(length.saturating_sub(1), new_value), + Ordering::Relaxed, + ); + self + } + + pub fn length(&self) -> usize { + self.length.load(Ordering::Relaxed) + } + + pub fn position(&self) -> usize { + self.position.load(Ordering::Relaxed) + } + + pub fn page_size(&self) -> usize { + self.page_size.load(Ordering::Relaxed) + } + + pub fn id(&self) -> Uuid { + self.id + } + + pub async fn update(self) -> Result<()> { + if !self.inner.has_updates()?.await? { + return Ok(()); + } + self.inner.reset()?.await?; + if let Some(length) = self.inner.total()?.await? { + self.length.store(length, Ordering::Relaxed); + self.set_position(self.position()); + } else { + self.length.store(0, Ordering::Relaxed); + self.position.store(0, Ordering::Relaxed); + } + + Ok(()) + } + + pub async fn fetch(self) -> Result> { + if self.position() >= self.length().saturating_sub(1) { + //return Ok(None); + } + + let stream = self.inner.fetch(self.position(), self.page_size())?; + self.set_position(self.position() + self.page_size()); + + Ok(Some(stream)) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_lazy_count_set() { + let mut new = LazyCountSet::default(); + assert_eq!(new.len(), 0); + new.set_not_yet_seen(10); + assert_eq!(new.len(), 10); + for i in 0..10 { + assert!(new.insert_existing(EnvelopeHash(i))); + } + assert_eq!(new.len(), 10); + assert!(!new.insert_existing(EnvelopeHash(10))); + assert_eq!(new.len(), 10); + } +} diff --git a/melib/src/backends/imap.rs b/melib/src/backends/imap.rs index 70e1d49a..39a478ed 100644 --- a/melib/src/backends/imap.rs +++ b/melib/src/backends/imap.rs @@ -36,6 +36,7 @@ mod search; pub use search::*; mod cache; use cache::{ImapCacheReset, ModSequence}; +mod cursor; pub mod error; pub mod managesieve; mod untagged; @@ -50,7 +51,10 @@ use std::{ time::{Duration, SystemTime}, }; -use futures::{lock::Mutex as FutureMutex, stream::Stream}; +use futures::{ + lock::Mutex as FutureMutex, + stream::{Stream, StreamExt}, +}; use imap_codec::{ command::CommandBody, core::Literal, @@ -368,7 +372,11 @@ impl MailBackend for ImapType { unseen.set_not_yet_seen(total); } }; + let cursor = self.cursor(mailbox_hash)?; Ok(Box::pin(async_stream::try_stream! { + if let Ok(Some(c)) = cursor.fetch().await{ + dbg!(c.into_future().await.0); + } #[cfg(debug_assertions)] let id = state.connection.lock().await.id.clone(); { @@ -394,6 +402,18 @@ impl MailBackend for ImapType { })) } + fn cursor(&mut self, mailbox_hash: MailboxHash) -> Result { + let imap_cursor = cursor::ImapCursor { + connection: self.connection.clone(), + mailbox_hash, + uid_store: self.uid_store.clone(), + }; + + Ok(MailCursor::from( + Box::new(imap_cursor) as Box + )) + } + fn refresh(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()> { let main_conn = self.connection.clone(); let uid_store = self.uid_store.clone(); diff --git a/melib/src/backends/imap/connection.rs b/melib/src/backends/imap/connection.rs index 21275848..ace2a267 100644 --- a/melib/src/backends/imap/connection.rs +++ b/melib/src/backends/imap/connection.rs @@ -466,6 +466,7 @@ impl ImapStream { ImapProtocol::ManageSieve => Vec::new(), }; self.read_lines(ret, &id, true).await?; + log::trace!("response {}", String::from_utf8_lossy(&ret)); Ok(()) } @@ -584,6 +585,7 @@ impl ImapStream { ImapProtocol::ManageSieve => {} } + log::trace!("sending {}", String::from_utf8_lossy(command)); self.stream.write_all(command).await?; self.stream.write_all(b"\r\n").await?; self.stream.flush().await?; diff --git a/melib/src/backends/imap/cursor.rs b/melib/src/backends/imap/cursor.rs new file mode 100644 index 00000000..0b82b046 --- /dev/null +++ b/melib/src/backends/imap/cursor.rs @@ -0,0 +1,80 @@ +/* + * ____ + * + * Copyright ____ Manos Pitsidianakis + * + * This file is part of ____. + * + * ____ is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * ____ is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with ____. If not, see . + */ + +use super::*; + +#[derive(Debug)] +pub struct ImapCursor { + pub connection: Arc>, + pub mailbox_hash: MailboxHash, + pub uid_store: Arc, +} + +impl crate::cursor::Cursor for ImapCursor { + fn fetch(&self, start: usize, count: usize) -> Result { + let connection = self.connection.clone(); + let mailbox_hash = self.mailbox_hash; + let uid_store = self.uid_store.clone(); + Ok(Box::pin(async_stream::try_stream! { + let mut response = Vec::with_capacity(8 * 1024); + { + let f = &uid_store.mailboxes.lock().await[&mailbox_hash]; + if f.no_select { + yield vec![]; + return; + } + }; + loop { + { + let mut conn = connection.lock().await; + conn.examine_mailbox(mailbox_hash, &mut response, false) + .await?; + conn.send_command_raw(b"UID SORT (REVERSE ARRIVAL) UTF-8 ALL").await?; + conn.read_response(&mut response, RequiredResponses::empty()) + .await?; + } + + yield vec![]; + return; + } + })) + } + + fn total(&self) -> ResultFuture> { + Err(Error::new("Cursor not supported in this backend.").set_kind(ErrorKind::NotSupported)) + } + + fn unseen(&self) -> ResultFuture> { + Err(Error::new("Cursor not supported in this backend.").set_kind(ErrorKind::NotSupported)) + } + + fn has_updates(&self) -> ResultFuture { + Err(Error::new("Cursor not supported in this backend.").set_kind(ErrorKind::NotSupported)) + } + + fn reset(&self) -> ResultFuture<()> { + Err(Error::new("Cursor not supported in this backend.").set_kind(ErrorKind::NotSupported)) + } + + fn query(&self) -> Result> { + Err(Error::new("Cursor not supported in this backend.").set_kind(ErrorKind::NotSupported)) + } +}