/*
* meli - jobs executor
*
* Copyright 2020 Manos Pitsidianakis
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
//! Async job executor thread pool
use std::{
borrow::Cow,
future::Future,
iter,
panic::catch_unwind,
sync::{Arc, Mutex},
thread,
time::Duration,
};
use crossbeam::{
channel::Sender,
deque::{Injector, Stealer, Worker},
sync::{Parker, Unparker},
};
pub use futures::channel::oneshot;
use indexmap::IndexMap;
use melib::{log, smol, utils::datetime, uuid::Uuid, UnixTimestamp};
use crate::types::{StatusEvent, ThreadEvent, UIEvent};
/// Shorthand for `async_task::Runnable`, the type of the `task` field of
/// `MeliTask`.
type AsyncTask = async_task::Runnable;
fn find_task(
local: &Worker,
global: &Injector,
stealers: &[Stealer],
) -> Option {
// Pop a task from the local queue, if not empty.
local.pop().or_else(|| {
// Otherwise, we need to look for a task elsewhere.
iter::repeat_with(|| {
// Try stealing a batch of tasks from the global queue.
global
.steal_batch_and_pop(local)
// Or try stealing a task from one of the other threads.
.or_else(|| stealers.iter().map(|s| s.steal()).collect())
})
// Loop while no task was stolen and any steal operation needs to be retried.
.find(|s| !s.is_retry())
// Extract the stolen task, if there is one.
.and_then(|s| s.success())
})
}
/// Declares a public identifier newtype named `$n` that wraps a `Uuid`.
///
/// The generated type derives equality, ordering, hashing, `Copy`/`Clone` and
/// `Serialize`/`Deserialize`. Its `Debug` and `Display` output are the same:
/// the `Display` form of the wrapped `Uuid`.
macro_rules! uuid_hash_type {
    ($n:ident) => {
        #[derive(PartialEq, Hash, Eq, Copy, Clone, Ord, PartialOrd, Serialize, Deserialize)]
        pub struct $n(Uuid);

        impl std::fmt::Debug for $n {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                // Format the inner value directly; going through `to_string()`
                // would allocate a temporary `String` for the same output.
                write!(f, "{}", self.0)
            }
        }

        impl std::fmt::Display for $n {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                write!(f, "{}", self.0)
            }
        }

        impl Default for $n {
            /// Equivalent to `new()`: a freshly generated identifier, not the
            /// nil one.
            fn default() -> Self {
                Self::new()
            }
        }

        impl $n {
            /// Creates an identifier from `Uuid::new_v4()`.
            pub fn new() -> Self {
                Self(Uuid::new_v4())
            }

            /// Creates the identifier that wraps `Uuid::nil()`.
            pub fn null() -> Self {
                Self(Uuid::nil())
            }
        }
    };
}

uuid_hash_type!(JobId);
uuid_hash_type!(TimerId);
/// A spawned future and its current state.
///
/// Values of this type are what the worker threads take off the queues: a
/// worker destructures the value and calls `run()` on `task`.
pub struct MeliTask {
/// The runnable a worker thread executes by calling `run()` on it.
task: AsyncTask,
/// Identifier of the job; printed in the workers' trace log lines.
id: JobId,
/// Description of the job; printed in the workers' trace log lines.
desc: Cow<'static, str>,
/// When `true`, the workers skip their trace log lines for this task.
timer: bool,
}
#[derive(Clone, Debug)]
/// A spawned future's metadata for book-keeping.
pub struct JobMetadata {
pub id: JobId,
pub desc: Cow<'static, str>,
pub timer: bool,
pub started: UnixTimestamp,
pub finished: Option,
pub succeeded: bool,
}
#[derive(Debug)]
pub struct JobExecutor {
global_queue: Arc>,
workers: Vec>,
sender: Sender,
parkers: Vec,
timers: Arc>>,
pub jobs: Arc>>,
}
// Private per-timer state: interval, remaining time, an activity flag, a
// handle and a cancel field.
//
// NOTE(review): the type arguments of `handle` and `cancel` are not visible
// in this copy of the file — confirm both against version control.
#[derive(Debug, Default)]
struct TimerPrivate {
/// Interval for periodic timer.
interval: Duration,
/// Time until next expiration.
value: Duration,
// NOTE(review): presumably whether the timer is currently armed — confirm.
active: bool,
// NOTE(review): presumably the handle of the task that drives the timer,
// absent when no such task exists — confirm.
handle: Option>,
// NOTE(review): presumably a cancellation flag shared with the task that
// drives the timer — confirm.
cancel: Arc>,
}
#[derive(Debug)]
pub struct Timer {
id: TimerId,
job_executor: Arc,
}
impl Timer {
    /// Returns the id of this timer.
    pub fn id(&self) -> TimerId {
        let Self { id, .. } = self;
        *id
    }

    /// Forwards to the executor's `rearm` with this timer's id.
    pub fn rearm(&self) {
        let executor = &self.job_executor;
        executor.rearm(self.id);
    }

    /// Forwards to the executor's `disable_timer` with this timer's id.
    pub fn disable(&self) {
        let executor = &self.job_executor;
        executor.disable_timer(self.id);
    }

    /// Forwards to the executor's `set_interval` with this timer's id and
    /// `new_val`.
    pub fn set_interval(&self, new_val: Duration) {
        let executor = &self.job_executor;
        executor.set_interval(self.id, new_val);
    }
}
impl Drop for Timer {
fn drop(&mut self) {
self.disable();
}
}
impl JobExecutor {
/// A queue that holds scheduled tasks.
pub fn new(sender: Sender) -> Self {
// Create a queue.
let mut ret = Self {
global_queue: Arc::new(Injector::new()),
workers: vec![],
parkers: vec![],
sender,
timers: Arc::new(Mutex::new(IndexMap::default())),
jobs: Arc::new(Mutex::new(IndexMap::default())),
};
let mut workers = vec![];
for _ in 0..num_cpus::get().max(1) {
let new_worker = Worker::new_fifo();
ret.workers.push(new_worker.stealer());
let p = Parker::new();
ret.parkers.push(p.unparker().clone());
workers.push((new_worker, p));
}
// Reactor thread
thread::Builder::new()
.name("meli-reactor".to_string())
.spawn(move || {
let ex = smol::Executor::new();
futures::executor::block_on(ex.run(futures::future::pending::<()>()));
})
.unwrap();
// Spawn executor threads the first time the queue is created.
for (i, (local, parker)) in workers.into_iter().enumerate() {
let global = ret.global_queue.clone();
let stealers = ret.workers.clone();
thread::Builder::new()
.name(format!("meli-executor-{}", i))
.spawn(move || loop {
parker.park_timeout(Duration::from_millis(100));
let task = find_task(&local, &global, stealers.as_slice());
if let Some(meli_task) = task {
let MeliTask {
task,
id,
timer,
desc,
} = meli_task;
if !timer {
log::trace!("Worker {} got task {:?} {:?}", i, desc, id);
}
let _ = catch_unwind(|| task.run());
if !timer {
log::trace!("Worker {} returned after {:?} {:?}", i, desc, id);
}
}
})
.unwrap();
}
ret
}
/// Spawns a future with a generic return value `R`
#[inline(always)]
pub fn spawn_specialized(&self, desc: Cow<'static, str>, future: F) -> JoinHandle
where
F: Future