jobs: make `cancel` flag an AtomicBool

Signed-off-by: Manos Pitsidianakis <manos@pitsidianak.is>
pull/473/head
Manos Pitsidianakis 2 months ago
parent 4bbf446bc1
commit 9c1b442452
No known key found for this signature in database
GPG Key ID: 7729C7707F7E09D0

@ -1323,9 +1323,20 @@ impl Account {
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
StatusEvent::JobFinished(*job_id),
)));
let job_id = *job_id;
macro_rules! is_canceled {
($handle:expr) => {{
if $handle.is_canceled() {
self.main_loop_handler
.job_executor
.set_job_success(job_id, false);
/* canceled */
return true;
}
}};
}
if let Some(mut job) = self.active_jobs.remove(job_id) {
let job_id = *job_id;
if let Some(mut job) = self.active_jobs.remove(&job_id) {
match job {
JobRequest::Mailbox(inner) => {
self.process_mailbox_event(job_id, inner);
@ -1336,6 +1347,7 @@ impl Account {
ref mut handle,
..
} => {
is_canceled! { handle };
log::trace!("got payload in status for {}", mailbox_hash);
match handle.chan.try_recv() {
Err(_) => {
@ -1439,6 +1451,7 @@ impl Account {
{
return true;
}
is_canceled! { handle };
if let Ok(Some(is_online)) = handle.chan.try_recv() {
self.main_loop_handler.send(ThreadEvent::UIEvent(
UIEvent::AccountStatusChange(self.hash, None),
@ -1473,6 +1486,7 @@ impl Account {
{
return true;
}
is_canceled! { handle };
match handle.chan.try_recv() {
Err(_) => { /* canceled */ }
Ok(None) => {}
@ -1502,70 +1516,77 @@ impl Account {
ref env_hashes,
ref mailbox_hash,
ref flags,
} => match handle.chan.try_recv() {
Ok(Some(Err(err))) => {
self.main_loop_handler
.job_executor
.set_job_success(job_id, false);
self.main_loop_handler
.send(ThreadEvent::UIEvent(UIEvent::Notification {
title: Some(format!("{}: could not set flag", &self.name).into()),
source: None,
body: err.to_string().into(),
kind: Some(NotificationType::Error(err.kind)),
}));
}
Ok(Some(Ok(()))) => {
for env_hash in env_hashes.iter() {
if !self.collection.contains_key(&env_hash) {
continue;
}
let mut env_lck = self.collection.envelopes.write().unwrap();
env_lck.entry(env_hash).and_modify(|entry| {
for op in flags.iter() {
match op {
FlagOp::Set(f) => {
let mut flags = entry.flags();
flags.set(*f, true);
entry.set_flags(flags);
}
FlagOp::UnSet(f) => {
let mut flags = entry.flags();
flags.set(*f, false);
entry.set_flags(flags);
}
FlagOp::SetTag(t) => {
entry
.tags_mut()
.insert(TagHash::from_bytes(t.as_bytes()));
}
FlagOp::UnSetTag(t) => {
entry
.tags_mut()
.shift_remove(&TagHash::from_bytes(t.as_bytes()));
} => {
is_canceled! { handle };
match handle.chan.try_recv() {
Ok(Some(Err(err))) => {
self.main_loop_handler
.job_executor
.set_job_success(job_id, false);
self.main_loop_handler.send(ThreadEvent::UIEvent(
UIEvent::Notification {
title: Some(
format!("{}: could not set flag", &self.name).into(),
),
source: None,
body: err.to_string().into(),
kind: Some(NotificationType::Error(err.kind)),
},
));
}
Ok(Some(Ok(()))) => {
for env_hash in env_hashes.iter() {
if !self.collection.contains_key(&env_hash) {
continue;
}
let mut env_lck = self.collection.envelopes.write().unwrap();
env_lck.entry(env_hash).and_modify(|entry| {
for op in flags.iter() {
match op {
FlagOp::Set(f) => {
let mut flags = entry.flags();
flags.set(*f, true);
entry.set_flags(flags);
}
FlagOp::UnSet(f) => {
let mut flags = entry.flags();
flags.set(*f, false);
entry.set_flags(flags);
}
FlagOp::SetTag(t) => {
entry
.tags_mut()
.insert(TagHash::from_bytes(t.as_bytes()));
}
FlagOp::UnSetTag(t) => {
entry.tags_mut().shift_remove(
&TagHash::from_bytes(t.as_bytes()),
);
}
}
}
});
#[cfg(feature = "sqlite3")]
if let Some(env) = env_lck.get(&env_hash).cloned() {
drop(env_lck);
self.update_cached_env(env, None);
}
});
#[cfg(feature = "sqlite3")]
if let Some(env) = env_lck.get(&env_hash).cloned() {
drop(env_lck);
self.update_cached_env(env, None);
}
for env_hash in env_hashes.iter() {
self.collection.update_flags(env_hash, *mailbox_hash);
self.main_loop_handler
.send(ThreadEvent::UIEvent(UIEvent::EnvelopeUpdate(env_hash)));
}
}
for env_hash in env_hashes.iter() {
self.collection.update_flags(env_hash, *mailbox_hash);
self.main_loop_handler
.send(ThreadEvent::UIEvent(UIEvent::EnvelopeUpdate(env_hash)));
}
Err(_) | Ok(None) => {}
}
Err(_) | Ok(None) => {}
},
}
JobRequest::SaveMessage {
ref mut handle,
ref bytes,
..
} => {
is_canceled! { handle };
if let Ok(Some(Err(err))) = handle.chan.try_recv() {
self.main_loop_handler
.job_executor
@ -1607,6 +1628,7 @@ impl Account {
}
JobRequest::SendMessage => {}
JobRequest::SendMessageBackground { ref mut handle, .. } => {
is_canceled! { handle };
if let Ok(Some(Err(err))) = handle.chan.try_recv() {
self.main_loop_handler
.job_executor
@ -1621,6 +1643,7 @@ impl Account {
}
}
JobRequest::DeleteMessages { ref mut handle, .. } => {
is_canceled! { handle };
if let Ok(Some(Err(err))) = handle.chan.try_recv() {
self.main_loop_handler
.job_executor
@ -1670,6 +1693,7 @@ impl Account {
ref mut on_finish,
log_level,
} => {
is_canceled! { handle };
match handle.chan.try_recv() {
Ok(Some(Err(err))) => {
self.main_loop_handler

@ -26,7 +26,7 @@ use futures::stream::Stream;
use melib::{backends::*, email::*, error::Result, LogLevel};
use smallvec::SmallVec;
use crate::{is_variant, jobs::JoinHandle};
use crate::{is_variant, jobs::JoinHandle, StatusEvent};
pub enum MailboxJobRequest {
Mailboxes {
@ -94,6 +94,19 @@ impl std::fmt::Display for MailboxJobRequest {
}
}
impl MailboxJobRequest {
pub fn cancel(&self) -> Option<StatusEvent> {
match self {
Self::Mailboxes { handle } => handle.cancel(),
Self::CreateMailbox { handle, .. } => handle.cancel(),
Self::DeleteMailbox { handle, .. } => handle.cancel(),
Self::RenameMailbox { handle, .. } => handle.cancel(),
Self::SetMailboxPermissions { handle, .. } => handle.cancel(),
Self::SetMailboxSubscription { handle, .. } => handle.cancel(),
}
}
}
pub enum JobRequest {
Fetch {
mailbox_hash: MailboxHash,
@ -208,10 +221,27 @@ impl std::fmt::Display for JobRequest {
impl JobRequest {
is_variant! { is_watch, Watch { .. } }
is_variant! { is_online, IsOnline { .. } }
is_variant! { is_any_fetch, Fetch { .. } }
pub fn is_fetch(&self, mailbox_hash: MailboxHash) -> bool {
matches!(self, Self::Fetch {
mailbox_hash: h, ..
} if *h == mailbox_hash)
}
pub fn cancel(&self) -> Option<StatusEvent> {
match self {
Self::Generic { handle, .. } => handle.cancel(),
Self::Mailbox(inner) => inner.cancel(),
Self::Fetch { handle, .. } => handle.cancel(),
Self::IsOnline { handle, .. } => handle.cancel(),
Self::Refresh { handle, .. } => handle.cancel(),
Self::SetFlags { handle, .. } => handle.cancel(),
Self::SaveMessage { handle, .. } => handle.cancel(),
Self::DeleteMessages { handle, .. } => handle.cancel(),
Self::Watch { handle, .. } => handle.cancel(),
Self::SendMessage => None,
Self::SendMessageBackground { handle, .. } => handle.cancel(),
}
}
}

@ -26,7 +26,10 @@ use std::{
future::Future,
iter,
panic::catch_unwind,
sync::{Arc, Mutex},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Mutex,
},
thread,
time::Duration,
};
@ -49,6 +52,7 @@ pub enum IsAsync {
}
type AsyncTask = async_task::Runnable;
type AtomicUnixTimestamp = Arc<AtomicU64>;
fn find_task(
local: &Worker<MeliTask>,
@ -119,12 +123,47 @@ pub struct MeliTask {
#[derive(Clone, Debug)]
/// A spawned future's metadata for book-keeping.
pub struct JobMetadata {
pub id: JobId,
pub desc: Cow<'static, str>,
pub timer: bool,
pub started: UnixTimestamp,
pub finished: Option<UnixTimestamp>,
pub succeeded: bool,
id: JobId,
desc: Cow<'static, str>,
timer: bool,
started: UnixTimestamp,
finished: AtomicUnixTimestamp,
succeeded: bool,
}
impl JobMetadata {
pub fn id(&self) -> &JobId {
&self.id
}
pub fn description(&self) -> &str {
&self.desc
}
pub fn is_timer(&self) -> bool {
self.timer
}
pub fn started(&self) -> UnixTimestamp {
self.started
}
pub fn finished(&self) -> Option<UnixTimestamp> {
let value = self.finished.load(Ordering::SeqCst);
if value == 0 {
return None;
}
Some(value)
}
fn set_finished(&self, new_value: Option<UnixTimestamp>) {
let new_value = new_value.unwrap_or_default();
self.finished.store(new_value, Ordering::SeqCst);
}
pub fn succeeded(&self) -> bool {
self.succeeded
}
}
#[derive(Debug)]
@ -145,7 +184,7 @@ struct TimerPrivate {
value: Duration,
active: bool,
handle: Option<async_task::Task<()>>,
cancel: Arc<Mutex<bool>>,
cancel: Arc<AtomicBool>,
}
#[derive(Debug)]
@ -272,8 +311,8 @@ impl JobExecutor {
let finished_sender = self.sender.clone();
let job_id = JobId::new();
let injector = self.global_queue.clone();
let cancel = Arc::new(Mutex::new(false));
let cancel2 = cancel.clone();
let finished = Arc::new(AtomicU64::new(0));
let cancel = Arc::new(AtomicBool::new(false));
self.jobs.lock().unwrap().insert(
job_id,
@ -281,34 +320,39 @@ impl JobExecutor {
id: job_id,
desc: desc.clone(),
started: datetime::now(),
finished: None,
finished: finished.clone(),
succeeded: true,
timer: false,
},
);
// Create a task and schedule it for execution.
let (handle, task) = async_task::spawn(
async move {
let res = future.await;
let _ = sender.send(res);
finished_sender
.send(ThreadEvent::JobFinished(job_id))
.unwrap();
},
move |task| {
if *cancel.lock().unwrap() {
return;
}
let desc = desc.clone();
injector.push(MeliTask {
task,
id: job_id,
desc,
timer: false,
})
},
);
let (handle, task) = {
let cancel = cancel.clone();
let finished = finished.clone();
async_task::spawn(
async move {
let res = future.await;
let _ = sender.send(res);
finished.store(datetime::now(), Ordering::SeqCst);
finished_sender
.send(ThreadEvent::JobFinished(job_id))
.unwrap();
},
move |task| {
if cancel.load(Ordering::SeqCst) {
return;
}
let desc = desc.clone();
injector.push(MeliTask {
task,
id: job_id,
desc,
timer: false,
})
},
)
};
handle.schedule();
for unparker in self.parkers.iter() {
unparker.unpark();
@ -316,7 +360,8 @@ impl JobExecutor {
JoinHandle {
task: Arc::new(Mutex::new(Some(task))),
cancel: cancel2,
cancel,
finished,
chan: receiver,
job_id,
}
@ -339,7 +384,7 @@ impl JobExecutor {
pub fn create_timer(self: Arc<Self>, interval: Duration, value: Duration) -> Timer {
let timer = TimerPrivate {
interval,
cancel: Arc::new(Mutex::new(false)),
cancel: Arc::new(AtomicBool::new(false)),
value,
active: true,
handle: None,
@ -367,46 +412,48 @@ impl JobExecutor {
let sender = self.sender.clone();
let injector = self.global_queue.clone();
let timers = self.timers.clone();
let cancel = Arc::new(Mutex::new(false));
let cancel2 = cancel.clone();
let (task, handle) = async_task::spawn(
async move {
let mut value = value;
loop {
smol::Timer::after(value).await;
sender
.send(ThreadEvent::UIEvent(UIEvent::Timer(id)))
.unwrap();
if let Some(interval) = timers.lock().unwrap().get(&id).and_then(|timer| {
if timer.interval.as_millis() == 0 && timer.interval.as_secs() == 0 {
None
} else if timer.active {
Some(timer.interval)
let cancel = Arc::new(AtomicBool::new(false));
let (task, handle) = {
let cancel = cancel.clone();
async_task::spawn(
async move {
let mut value = value;
loop {
smol::Timer::after(value).await;
sender
.send(ThreadEvent::UIEvent(UIEvent::Timer(id)))
.unwrap();
if let Some(interval) = timers.lock().unwrap().get(&id).and_then(|timer| {
if timer.interval.as_millis() == 0 && timer.interval.as_secs() == 0 {
None
} else if timer.active {
Some(timer.interval)
} else {
None
}
}) {
value = interval;
} else {
None
break;
}
}) {
value = interval;
} else {
break;
}
}
},
move |task| {
if *cancel.lock().unwrap() {
return;
}
injector.push(MeliTask {
task,
id: job_id,
desc: "timer".into(),
timer: true,
})
},
);
},
move |task| {
if cancel.load(Ordering::SeqCst) {
return;
}
injector.push(MeliTask {
task,
id: job_id,
desc: Cow::Borrowed("timer"),
timer: true,
})
},
)
};
self.timers.lock().unwrap().entry(id).and_modify(|timer| {
timer.handle = Some(handle);
timer.cancel = cancel2;
timer.cancel = cancel;
timer.active = true;
});
task.schedule();
@ -419,7 +466,7 @@ impl JobExecutor {
let mut timers_lck = self.timers.lock().unwrap();
if let Some(timer) = timers_lck.get_mut(&id) {
timer.active = false;
*timer.cancel.lock().unwrap() = true;
timer.cancel.store(true, Ordering::SeqCst);
}
}
@ -432,7 +479,7 @@ impl JobExecutor {
pub fn set_job_finished(&self, id: JobId) {
self.jobs.lock().unwrap().entry(id).and_modify(|entry| {
entry.finished = Some(datetime::now());
entry.set_finished(Some(datetime::now()));
});
}
@ -450,20 +497,38 @@ pub type JobChannel<T> = oneshot::Receiver<T>;
pub struct JoinHandle<T> {
pub task: Arc<Mutex<Option<async_task::Task<()>>>>,
pub chan: JobChannel<T>,
pub cancel: Arc<Mutex<bool>>,
pub cancel: Arc<AtomicBool>,
finished: AtomicUnixTimestamp,
pub job_id: JobId,
}
impl<T> JoinHandle<T> {
pub fn cancel(&self) -> Option<StatusEvent> {
let mut lck = self.cancel.lock().unwrap();
if !*lck {
*lck = true;
let was_active = self.cancel.swap(true, Ordering::SeqCst);
if was_active {
self.set_finished(Some(datetime::now()));
Some(StatusEvent::JobCanceled(self.job_id))
} else {
None
}
}
pub fn finished(&self) -> Option<UnixTimestamp> {
let value = self.finished.load(Ordering::SeqCst);
if value == 0 {
return None;
}
Some(value)
}
fn set_finished(&self, new_value: Option<UnixTimestamp>) {
let new_value = new_value.unwrap_or_default();
self.finished.store(new_value, Ordering::SeqCst);
}
pub fn is_canceled(&self) -> bool {
self.cancel.load(Ordering::SeqCst)
}
}
impl<T> std::cmp::PartialEq<JobId> for JoinHandle<T> {

@ -117,23 +117,23 @@ impl JobManager {
self.length = entries.len();
entries.sort_by(|_, a, _, b| match (self.sort_col, self.sort_order) {
(Column::_0, SortOrder::Asc) => a.id.cmp(&b.id),
(Column::_0, SortOrder::Desc) => b.id.cmp(&b.id),
(Column::_1, SortOrder::Asc) => a.desc.cmp(&b.desc),
(Column::_1, SortOrder::Desc) => b.desc.cmp(&a.desc),
(Column::_2, SortOrder::Asc) => a.started.cmp(&b.started),
(Column::_2, SortOrder::Desc) => b.started.cmp(&a.started),
(Column::_3, SortOrder::Asc) => a.finished.cmp(&b.finished),
(Column::_3, SortOrder::Desc) => b.finished.cmp(&a.finished),
(Column::_4, SortOrder::Asc) if a.finished.is_some() && b.finished.is_some() => {
a.succeeded.cmp(&b.succeeded)
(Column::_0, SortOrder::Asc) => a.id().cmp(b.id()),
(Column::_0, SortOrder::Desc) => b.id().cmp(b.id()),
(Column::_1, SortOrder::Asc) => a.description().cmp(b.description()),
(Column::_1, SortOrder::Desc) => b.description().cmp(a.description()),
(Column::_2, SortOrder::Asc) => a.started().cmp(&b.started()),
(Column::_2, SortOrder::Desc) => b.started().cmp(&a.started()),
(Column::_3, SortOrder::Asc) => a.finished().cmp(&b.finished()),
(Column::_3, SortOrder::Desc) => b.finished().cmp(&a.finished()),
(Column::_4, SortOrder::Asc) if a.finished().is_some() && b.finished().is_some() => {
a.succeeded().cmp(&b.succeeded())
}
(Column::_4, SortOrder::Desc) if a.finished.is_some() && b.finished.is_some() => {
b.succeeded.cmp(&a.succeeded)
(Column::_4, SortOrder::Desc) if a.finished().is_some() && b.finished().is_some() => {
b.succeeded().cmp(&a.succeeded())
}
(Column::_4, SortOrder::Asc) if a.finished.is_none() => std::cmp::Ordering::Less,
(Column::_4, SortOrder::Asc) if a.finished().is_none() => std::cmp::Ordering::Less,
(Column::_4, SortOrder::Asc) => std::cmp::Ordering::Greater,
(Column::_4, SortOrder::Desc) if a.finished.is_none() => std::cmp::Ordering::Greater,
(Column::_4, SortOrder::Desc) if a.finished().is_none() => std::cmp::Ordering::Greater,
(Column::_4, SortOrder::Desc) => std::cmp::Ordering::Less,
});
self.entries = entries;
@ -147,9 +147,9 @@ impl JobManager {
for c in self.entries.values() {
// title
self.min_width[0] = self.min_width[0].max(c.id.to_string().len());
self.min_width[0] = self.min_width[0].max(c.id().to_string().len());
// desc
self.min_width[1] = self.min_width[1].max(c.desc.len());
self.min_width[1] = self.min_width[1].max(c.description().len());
}
self.min_width[2] = "1970-01-01 00:00:00".len();
self.min_width[3] = self.min_width[2];
@ -193,7 +193,7 @@ impl JobManager {
{
let area = self.data_columns.columns[0].area().nth_row(idx);
self.data_columns.columns[0].grid_mut().write_string(
&e.id.to_string(),
&e.id().to_string(),
self.theme_default.fg,
self.theme_default.bg,
self.theme_default.attrs,
@ -206,7 +206,7 @@ impl JobManager {
{
let area = self.data_columns.columns[1].area().nth_row(idx);
self.data_columns.columns[1].grid_mut().write_string(
&e.desc,
e.description(),
self.theme_default.fg,
self.theme_default.bg,
self.theme_default.attrs,
@ -220,7 +220,7 @@ impl JobManager {
let area = self.data_columns.columns[2].area().nth_row(idx);
self.data_columns.columns[2].grid_mut().write_string(
&datetime::timestamp_to_string(
e.started,
e.started(),
Some(RFC3339_DATETIME_AND_SPACE),
true,
),
@ -236,7 +236,7 @@ impl JobManager {
{
let area = self.data_columns.columns[3].area().nth_row(idx);
self.data_columns.columns[3].grid_mut().write_string(
&if let Some(t) = e.finished {
&if let Some(t) = e.finished() {
Cow::Owned(datetime::timestamp_to_string(
t,
Some(RFC3339_DATETIME_AND_SPACE),
@ -257,8 +257,8 @@ impl JobManager {
{
let area = self.data_columns.columns[4].area().nth_row(idx);
self.data_columns.columns[4].grid_mut().write_string(
&if e.finished.is_some() {
Cow::Owned(format!("{:?}", e.succeeded))
&if e.finished().is_some() {
Cow::Owned(format!("{:?}", e.succeeded()))
} else {
Cow::Borrowed("-")
},

Loading…
Cancel
Save