Async Cursors WIP

pull/258/head
Manos Pitsidianakis 10 months ago
parent cf9a04a591
commit 486d01272e
No known key found for this signature in database
GPG Key ID: 7729C7707F7E09D0

@ -21,6 +21,7 @@
pub mod utf7;
use smallvec::SmallVec;
use uuid::Uuid;
#[cfg(feature = "imap_backend")]
pub mod imap;
@ -62,6 +63,7 @@ use super::email::{Envelope, EnvelopeHash, Flag};
use crate::{
conf::AccountSettings,
error::{Error, ErrorKind, Result},
search::Query,
LogLevel,
};
@ -351,6 +353,8 @@ pub enum MailBackendExtensionStatus {
}
pub type ResultFuture<T> = Result<Pin<Box<dyn Future<Output = Result<T>> + Send + 'static>>>;
#[allow(clippy::type_complexity)]
pub type EnvelopeStream = Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>;
pub trait MailBackend: ::std::fmt::Debug + Send + Sync {
fn capabilities(&self) -> MailBackendCapabilities;
@ -358,11 +362,10 @@ pub trait MailBackend: ::std::fmt::Debug + Send + Sync {
Ok(Box::pin(async { Ok(()) }))
}
#[allow(clippy::type_complexity)]
fn fetch(
&mut self,
mailbox_hash: MailboxHash,
) -> Result<Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>>;
fn fetch(&mut self, mailbox_hash: MailboxHash) -> Result<EnvelopeStream>;
fn cursor(&mut self, _mailbox_hash: MailboxHash) -> Result<MailCursor> {
Err(Error::new("Cursor not supported in this backend.").set_kind(ErrorKind::NotSupported))
}
fn refresh(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()>;
fn watch(&self) -> ResultFuture<()>;
@ -747,20 +750,6 @@ impl LazyCountSet {
}
}
#[test]
fn test_lazy_count_set() {
let mut new = LazyCountSet::default();
assert_eq!(new.len(), 0);
new.set_not_yet_seen(10);
assert_eq!(new.len(), 10);
for i in 0..10 {
assert!(new.insert_existing(EnvelopeHash(i)));
}
assert_eq!(new.len(), 10);
assert!(!new.insert_existing(EnvelopeHash(10)));
assert_eq!(new.len(), 10);
}
pub struct IsSubscribedFn(Box<dyn Fn(&str) -> bool + Send + Sync>);
impl std::fmt::Debug for IsSubscribedFn {
@ -775,3 +764,121 @@ impl std::ops::Deref for IsSubscribedFn {
&self.0
}
}
use cursor::MailCursor;
pub mod cursor {
use super::*;
use futures::StreamExt;
use std::sync::atomic::{AtomicUsize, Ordering};
pub trait Cursor: std::fmt::Debug + Send + Sync {
fn fetch(&self, start: usize, count: usize) -> Result<EnvelopeStream>;
fn total(&self) -> ResultFuture<Option<usize>>;
fn unseen(&self) -> ResultFuture<Option<usize>>;
fn has_updates(&self) -> ResultFuture<bool>;
fn reset(&self) -> ResultFuture<()>;
fn query(&self) -> Result<Arc<Query>>;
}
pub struct MailCursor {
pub(crate) id: Uuid,
pub(crate) inner: Arc<dyn Cursor>,
pub(crate) length: Arc<AtomicUsize>,
pub(crate) page_size: Arc<AtomicUsize>,
pub(crate) position: Arc<AtomicUsize>,
}
impl From<Box<dyn Cursor>> for MailCursor {
fn from(c: Box<dyn Cursor>) -> Self {
Self {
id: Uuid::new_v4(),
inner: c.into(),
length: Arc::new(0.into()),
page_size: Arc::new(0.into()),
position: Arc::new(0.into()),
}
}
}
impl MailCursor {
pub fn new(cursor: Box<dyn Cursor>) -> Self {
Self::from(cursor)
}
pub fn set_page_size(&self, new_value: usize) -> &Self {
self.page_size.store(new_value, Ordering::Relaxed);
self
}
pub fn set_position(&self, new_value: usize) -> &Self {
let length = self.length();
self.position.store(
std::cmp::min(length.saturating_sub(1), new_value),
Ordering::Relaxed,
);
self
}
pub fn length(&self) -> usize {
self.length.load(Ordering::Relaxed)
}
pub fn position(&self) -> usize {
self.position.load(Ordering::Relaxed)
}
pub fn page_size(&self) -> usize {
self.page_size.load(Ordering::Relaxed)
}
pub fn id(&self) -> Uuid {
self.id
}
pub async fn update(self) -> Result<()> {
if !self.inner.has_updates()?.await? {
return Ok(());
}
self.inner.reset()?.await?;
if let Some(length) = self.inner.total()?.await? {
self.length.store(length, Ordering::Relaxed);
self.set_position(self.position());
} else {
self.length.store(0, Ordering::Relaxed);
self.position.store(0, Ordering::Relaxed);
}
Ok(())
}
pub async fn fetch(self) -> Result<Option<EnvelopeStream>> {
if self.position() >= self.length().saturating_sub(1) {
//return Ok(None);
}
let stream = self.inner.fetch(self.position(), self.page_size())?;
self.set_position(self.position() + self.page_size());
Ok(Some(stream))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_lazy_count_set() {
let mut new = LazyCountSet::default();
assert_eq!(new.len(), 0);
new.set_not_yet_seen(10);
assert_eq!(new.len(), 10);
for i in 0..10 {
assert!(new.insert_existing(EnvelopeHash(i)));
}
assert_eq!(new.len(), 10);
assert!(!new.insert_existing(EnvelopeHash(10)));
assert_eq!(new.len(), 10);
}
}

@ -36,6 +36,7 @@ mod search;
pub use search::*;
mod cache;
use cache::{ImapCacheReset, ModSequence};
mod cursor;
pub mod error;
pub mod managesieve;
mod untagged;
@ -50,7 +51,10 @@ use std::{
time::{Duration, SystemTime},
};
use futures::{lock::Mutex as FutureMutex, stream::Stream};
use futures::{
lock::Mutex as FutureMutex,
stream::{Stream, StreamExt},
};
use imap_codec::{
command::CommandBody,
core::Literal,
@ -368,7 +372,11 @@ impl MailBackend for ImapType {
unseen.set_not_yet_seen(total);
}
};
let cursor = self.cursor(mailbox_hash)?;
Ok(Box::pin(async_stream::try_stream! {
if let Ok(Some(c)) = cursor.fetch().await{
dbg!(c.into_future().await.0);
}
#[cfg(debug_assertions)]
let id = state.connection.lock().await.id.clone();
{
@ -394,6 +402,18 @@ impl MailBackend for ImapType {
}))
}
fn cursor(&mut self, mailbox_hash: MailboxHash) -> Result<MailCursor> {
let imap_cursor = cursor::ImapCursor {
connection: self.connection.clone(),
mailbox_hash,
uid_store: self.uid_store.clone(),
};
Ok(MailCursor::from(
Box::new(imap_cursor) as Box<dyn crate::cursor::Cursor>
))
}
fn refresh(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()> {
let main_conn = self.connection.clone();
let uid_store = self.uid_store.clone();

@ -466,6 +466,7 @@ impl ImapStream {
ImapProtocol::ManageSieve => Vec::new(),
};
self.read_lines(ret, &id, true).await?;
log::trace!("response {}", String::from_utf8_lossy(&ret));
Ok(())
}
@ -584,6 +585,7 @@ impl ImapStream {
ImapProtocol::ManageSieve => {}
}
log::trace!("sending {}", String::from_utf8_lossy(command));
self.stream.write_all(command).await?;
self.stream.write_all(b"\r\n").await?;
self.stream.flush().await?;

@ -0,0 +1,80 @@
/*
* ____
*
* Copyright ____ Manos Pitsidianakis
*
* This file is part of ____.
*
* ____ is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* ____ is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with ____. If not, see <http://www.gnu.org/licenses/>.
*/
use super::*;
#[derive(Debug)]
pub struct ImapCursor {
pub connection: Arc<FutureMutex<ImapConnection>>,
pub mailbox_hash: MailboxHash,
pub uid_store: Arc<UIDStore>,
}
impl crate::cursor::Cursor for ImapCursor {
fn fetch(&self, start: usize, count: usize) -> Result<EnvelopeStream> {
let connection = self.connection.clone();
let mailbox_hash = self.mailbox_hash;
let uid_store = self.uid_store.clone();
Ok(Box::pin(async_stream::try_stream! {
let mut response = Vec::with_capacity(8 * 1024);
{
let f = &uid_store.mailboxes.lock().await[&mailbox_hash];
if f.no_select {
yield vec![];
return;
}
};
loop {
{
let mut conn = connection.lock().await;
conn.examine_mailbox(mailbox_hash, &mut response, false)
.await?;
conn.send_command_raw(b"UID SORT (REVERSE ARRIVAL) UTF-8 ALL").await?;
conn.read_response(&mut response, RequiredResponses::empty())
.await?;
}
yield vec![];
return;
}
}))
}
fn total(&self) -> ResultFuture<Option<usize>> {
Err(Error::new("Cursor not supported in this backend.").set_kind(ErrorKind::NotSupported))
}
fn unseen(&self) -> ResultFuture<Option<usize>> {
Err(Error::new("Cursor not supported in this backend.").set_kind(ErrorKind::NotSupported))
}
fn has_updates(&self) -> ResultFuture<bool> {
Err(Error::new("Cursor not supported in this backend.").set_kind(ErrorKind::NotSupported))
}
fn reset(&self) -> ResultFuture<()> {
Err(Error::new("Cursor not supported in this backend.").set_kind(ErrorKind::NotSupported))
}
fn query(&self) -> Result<Arc<Query>> {
Err(Error::new("Cursor not supported in this backend.").set_kind(ErrorKind::NotSupported))
}
}
Loading…
Cancel
Save