From ec1cc8100eac95c3f8fb0142adbc6b8e0363b30e Mon Sep 17 00:00:00 2001 From: Manos Pitsidianakis Date: Sat, 19 Aug 2023 09:16:44 +0300 Subject: [PATCH] WIP implement wait/delay for scheduled jobs This will allow scheduling reconnect attempts in the future instead of sleeping inside the async job. Then, the reconnect job won't appear active until its time to run has arrived. The frontend can then inspect reconnect attempts and display messages like "Retrying in X time" to the user. Signed-off-by: Manos Pitsidianakis --- meli/src/conf/accounts.rs | 91 ++++++++++++++------------------ meli/src/jobs.rs | 63 ++++++++++++++++++++-- meli/src/mail/listing/offline.rs | 27 +++++++++- 3 files changed, 126 insertions(+), 55 deletions(-) diff --git a/meli/src/conf/accounts.rs b/meli/src/conf/accounts.rs index 007bb197..dd36d23f 100644 --- a/meli/src/conf/accounts.rs +++ b/meli/src/conf/accounts.rs @@ -33,7 +33,7 @@ use std::{ pin::Pin, result, sync::{Arc, RwLock}, - time::Duration, + time::{Duration, Instant}, }; use futures::{ @@ -174,6 +174,7 @@ pub enum IsOnline { Err { value: Error, retries: u64, + wait: Option<(Instant, Duration)>, }, } @@ -189,9 +190,19 @@ impl IsOnline { *self = Self::Err { value: err, retries: 1, + wait: None, }; } } + + pub fn wait_left(&self) -> Option { + if let Self::Err { ref wait, .. } = self { + let wait = wait.as_ref()?; + Some(wait.1.saturating_sub(Instant::now() - wait.0)) + } else { + None + } + } } #[derive(Debug)] @@ -210,7 +221,7 @@ pub struct Account { pub main_loop_handler: MainLoopHandler, pub active_jobs: HashMap, - pub active_job_instants: BTreeMap, + pub active_job_instants: BTreeMap, pub event_queue: VecDeque<(MailboxHash, RefreshEvent)>, pub backend_capabilities: MailBackendCapabilities, } @@ -526,7 +537,7 @@ impl Account { }; let job_id = handle.job_id; active_jobs.insert(job_id, JobRequest::Mailboxes { handle }); - active_job_instants.insert(std::time::Instant::now(), job_id); + active_job_instants.insert(Instant::now(), job_id); main_loop_handler.send(ThreadEvent::UIEvent(UIEvent::StatusEvent( StatusEvent::NewJob(job_id), ))); @@ -737,8 +748,7 @@ impl Account { handle, }, ); - self.active_job_instants - .insert(std::time::Instant::now(), job_id); + self.active_job_instants.insert(Instant::now(), job_id); } } }); @@ -1640,6 +1650,7 @@ impl Account { IsOnline::Err { value, ref mut retries, + ref mut wait, } => { let ret = Err(value.clone()); if value.kind.is_authentication() @@ -1655,52 +1666,41 @@ impl Account { { return ret; } - let wait = if value.kind.is_timeout() + let new_wait = if value.kind.is_timeout() || value.kind.is_network_down() || value.kind.is_oserror() { let oldval = *retries; - if oldval != 8 { + if oldval != 4 { *retries *= 2; + Some(Duration::from_millis( + oldval * (4 * melib::utils::random::random_u8() as u64), + )) + } else { + Some(Duration::from_secs(5 * 60)) } - Some(Duration::from_millis( - oldval * (4 * melib::utils::random::random_u8() as u64), - )) } else { None }; + *wait = new_wait.map(|dur| (Instant::now(), dur)); - (ret, wait) + (ret, new_wait) } }; + if !self.active_jobs.values().any(JobRequest::is_online) { let online_fut = self.backend.read().unwrap().is_online(); if let Ok(online_fut) = online_fut { - use melib::utils::futures::sleep; - - let handle = match (wait, self.backend_capabilities.is_async) { - (Some(wait), true) => self.main_loop_handler.job_executor.spawn_specialized( - "is_online".into(), - async move { - sleep(wait).await; - online_fut.await - }, - ), - (None, true) => self - .main_loop_handler + let handle = if self.backend_capabilities.is_async { + self.main_loop_handler .job_executor - .spawn_specialized("is_online".into(), online_fut), - (Some(wait), false) => self.main_loop_handler.job_executor.spawn_blocking( + .spawn_specialized_delayed("is_online".into(), online_fut, wait) + } else { + self.main_loop_handler.job_executor.spawn_blocking_delayed( "is_online".into(), - async move { - sleep(wait).await; - online_fut.await - }, - ), - (None, false) => self - .main_loop_handler - .job_executor - .spawn_blocking("is_online".into(), online_fut), + online_fut, + wait, + ) }; self.insert_job(handle.job_id, JobRequest::IsOnline { handle }); } @@ -1908,23 +1908,15 @@ impl Account { return true; } Err(value) => { - self.is_online = IsOnline::Err { value, retries: 1 }; + self.is_online = IsOnline::Err { + value, + retries: 1, + wait: None, + }; } } } - let online_job = self.backend.read().unwrap().is_online(); - if let Ok(online_job) = online_job { - let handle = if self.backend_capabilities.is_async { - self.main_loop_handler - .job_executor - .spawn_specialized("is_online".into(), online_job) - } else { - self.main_loop_handler - .job_executor - .spawn_blocking("is_online".into(), online_job) - }; - self.insert_job(handle.job_id, JobRequest::IsOnline { handle }); - }; + _ = self.is_online(); } JobRequest::Refresh { ref mut handle, .. } => { match handle.chan.try_recv() { @@ -2333,8 +2325,7 @@ impl Account { pub fn insert_job(&mut self, job_id: JobId, job: JobRequest) { self.active_jobs.insert(job_id, job); - self.active_job_instants - .insert(std::time::Instant::now(), job_id); + self.active_job_instants.insert(Instant::now(), job_id); self.main_loop_handler .send(ThreadEvent::UIEvent(UIEvent::StatusEvent( StatusEvent::NewJob(job_id), diff --git a/meli/src/jobs.rs b/meli/src/jobs.rs index 8155e581..6830a817 100644 --- a/meli/src/jobs.rs +++ b/meli/src/jobs.rs @@ -38,7 +38,12 @@ use crossbeam::{ }; pub use futures::channel::oneshot; use indexmap::IndexMap; -use melib::{log, smol, utils::datetime, uuid::Uuid, UnixTimestamp}; +use melib::{ + log, smol, + utils::{datetime, futures::sleep}, + uuid::Uuid, + UnixTimestamp, +}; use crate::types::{ThreadEvent, UIEvent}; @@ -117,6 +122,7 @@ pub struct JobMetadata { pub desc: Cow<'static, str>, pub timer: bool, pub started: UnixTimestamp, + pub delay: Option, pub finished: Option, pub succeeded: bool, } @@ -233,8 +239,12 @@ impl JobExecutor { ret } - /// Spawns a future with a generic return value `R` - pub fn spawn_specialized(&self, desc: Cow<'static, str>, future: F) -> JoinHandle + fn inner_spawn_specialized( + &self, + desc: Cow<'static, str>, + future: F, + delay: Option, + ) -> JoinHandle where F: Future + Send + 'static, R: Send + 'static, @@ -252,6 +262,7 @@ impl JobExecutor { id: job_id, desc: desc.clone(), started: datetime::now(), + delay, finished: None, succeeded: true, timer: false, @@ -261,6 +272,10 @@ impl JobExecutor { // Create a task and schedule it for execution. let (handle, task) = async_task::spawn( async move { + if let Some(delay) = delay { + sleep(delay).await; + } + let res = future.await; let _ = sender.send(res); finished_sender @@ -293,6 +308,29 @@ impl JobExecutor { } } + /// Spawns a future with a generic return value `R` + pub fn spawn_specialized(&self, desc: Cow<'static, str>, future: F) -> JoinHandle + where + F: Future + Send + 'static, + R: Send + 'static, + { + self.inner_spawn_specialized(desc, future, None) + } + + /// Spawns a future with a generic return value `R` + pub fn spawn_specialized_delayed( + &self, + desc: Cow<'static, str>, + future: F, + delay: Option, + ) -> JoinHandle + where + F: Future + Send + 'static, + R: Send + 'static, + { + self.inner_spawn_specialized(desc, future, delay) + } + /// Spawns a future with a generic return value `R` that might block on a /// new thread pub fn spawn_blocking(&self, desc: Cow<'static, str>, future: F) -> JoinHandle @@ -306,6 +344,25 @@ impl JobExecutor { ) } + /// Spawns a future with a generic return value `R` that might block on a + /// new thread + pub fn spawn_blocking_delayed( + &self, + desc: Cow<'static, str>, + future: F, + delay: Option, + ) -> JoinHandle + where + F: Future + Send + 'static, + R: Send + 'static, + { + self.spawn_specialized_delayed( + desc, + smol::unblock(move || futures::executor::block_on(future)), + delay, + ) + } + pub fn create_timer(self: Arc, interval: Duration, value: Duration) -> Timer { let timer = TimerPrivate { interval, diff --git a/meli/src/mail/listing/offline.rs b/meli/src/mail/listing/offline.rs index 25d1509a..b5554f8c 100644 --- a/meli/src/mail/listing/offline.rs +++ b/meli/src/mail/listing/offline.rs @@ -141,6 +141,7 @@ impl Component for OfflineListing { let error_message = conf::value(context, "error_message"); clear_area(grid, area, theme_default); if let Err(err) = context.is_online(self.cursor_pos.0) { + let mut y_offset = 1; let (x, _) = write_string_to_grid( "offline: ", grid, @@ -159,6 +160,21 @@ impl Component for OfflineListing { (set_x(upper_left!(area), x + 1), bottom_right!(area)), Some(get_x(upper_left!(area))), ); + if let Some(wait) = context.accounts[&self.cursor_pos.0].is_online.wait_left() { + write_string_to_grid( + &format!("Retrying in {} seconds.", wait.as_secs()), + grid, + text_unfocused.fg, + text_unfocused.bg, + Attr::BOLD, + ( + pos_inc((0, y_offset), upper_left!(area)), + bottom_right!(area), + ), + None, + ); + y_offset += 1; + } if let Some(msg) = self.messages.last() { write_string_to_grid( msg, @@ -166,9 +182,13 @@ impl Component for OfflineListing { text_unfocused.fg, text_unfocused.bg, Attr::BOLD, - (pos_inc((0, 1), upper_left!(area)), bottom_right!(area)), + ( + pos_inc((0, y_offset), upper_left!(area)), + bottom_right!(area), + ), None, ); + y_offset += 1; } for (i, msg) in self.messages.iter().rev().skip(1).enumerate() { write_string_to_grid( @@ -177,7 +197,10 @@ impl Component for OfflineListing { text_unfocused.fg, text_unfocused.bg, text_unfocused.attrs, - (pos_inc((0, 2 + i), upper_left!(area)), bottom_right!(area)), + ( + pos_inc((0, y_offset + i), upper_left!(area)), + bottom_right!(area), + ), None, ); }