mirror of https://git.meli.delivery/meli/meli
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
388 lines
12 KiB
Rust
388 lines
12 KiB
Rust
/*
 * meli - jobs executor
 *
 * Copyright 2020 Manos Pitsidianakis
 *
 * This file is part of meli.
 *
 * meli is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * meli is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with meli. If not, see <http://www.gnu.org/licenses/>.
 */
|
|
|
|
//! Async job executor thread pool
|
|
|
|
use melib::smol;
|
|
use melib::uuid::Uuid;
|
|
use std::collections::HashMap;
|
|
use std::future::Future;
|
|
use std::panic::catch_unwind;
|
|
use std::sync::{Arc, Mutex};
|
|
use std::thread;
|
|
use std::time::Duration;
|
|
|
|
use crate::types::{ThreadEvent, UIEvent};
|
|
use crossbeam::channel::Sender;
|
|
use crossbeam::deque::{Injector, Stealer, Worker};
|
|
use crossbeam::sync::{Parker, Unparker};
|
|
pub use futures::channel::oneshot;
|
|
use std::iter;
|
|
|
|
/// Runnable handle of a spawned future; this is the type that gets queued inside a
/// `MeliTask` and executed by the worker threads.
type AsyncTask = async_task::Runnable;
|
|
|
|
fn find_task<T>(local: &Worker<T>, global: &Injector<T>, stealers: &[Stealer<T>]) -> Option<T> {
|
|
// Pop a task from the local queue, if not empty.
|
|
local.pop().or_else(|| {
|
|
// Otherwise, we need to look for a task elsewhere.
|
|
iter::repeat_with(|| {
|
|
// Try stealing a batch of tasks from the global queue.
|
|
global
|
|
.steal_batch_and_pop(local)
|
|
// Or try stealing a task from one of the other threads.
|
|
.or_else(|| stealers.iter().map(|s| s.steal()).collect())
|
|
})
|
|
// Loop while no task was stolen and any steal operation needs to be retried.
|
|
.find(|s| !s.is_retry())
|
|
// Extract the stolen task, if there is one.
|
|
.and_then(|s| s.success())
|
|
})
|
|
}
|
|
|
|
/// Declares a public newtype `$n` wrapping a [`Uuid`].
///
/// The generated type is `Copy`, hashable, ordered and (de)serializable. Both its `Debug`
/// and `Display` output are the wrapped UUID's own `Display` output. Note that the derived
/// `Default` is not the same as `new()`: `new()` generates a random (v4) UUID.
macro_rules! uuid_hash_type {
    ($n:ident) => {
        #[derive(
            PartialEq, Hash, Eq, Copy, Clone, Ord, PartialOrd, Serialize, Deserialize, Default,
        )]
        pub struct $n(Uuid);

        impl core::fmt::Debug for $n {
            fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
                // Format the UUID straight into `f`; no temporary `String` is needed.
                write!(f, "{}", self.0)
            }
        }

        impl core::fmt::Display for $n {
            fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
                write!(f, "{}", self.0)
            }
        }

        impl $n {
            /// Creates an identifier from a freshly generated random (v4) UUID.
            pub fn new() -> Self {
                $n(Uuid::new_v4())
            }

            /// Creates the identifier wrapping the nil UUID.
            pub fn null() -> Self {
                $n(Uuid::nil())
            }
        }
    };
}
|
|
// `JobId`: UUID-backed identifier given to every spawned job and timer task.
uuid_hash_type!(JobId);
|
|
|
|
/// A spawned future and its current state.
|
|
pub struct MeliTask {
|
|
task: AsyncTask,
|
|
id: JobId,
|
|
timer: bool,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct JobExecutor {
|
|
global_queue: Arc<Injector<MeliTask>>,
|
|
workers: Vec<Stealer<MeliTask>>,
|
|
sender: Sender<ThreadEvent>,
|
|
parkers: Vec<Unparker>,
|
|
timers: Arc<Mutex<HashMap<Uuid, TimerPrivate>>>,
|
|
}
|
|
|
|
#[derive(Debug, Default)]
|
|
struct TimerPrivate {
|
|
/// Interval for periodic timer.
|
|
interval: Duration,
|
|
/// Time until next expiration.
|
|
value: Duration,
|
|
active: bool,
|
|
handle: Option<async_task::Task<()>>,
|
|
cancel: Arc<Mutex<bool>>,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct Timer {
|
|
id: Uuid,
|
|
job_executor: Arc<JobExecutor>,
|
|
}
|
|
|
|
impl Timer {
|
|
pub fn id(&self) -> Uuid {
|
|
self.id
|
|
}
|
|
|
|
pub fn rearm(&self) {
|
|
self.job_executor.rearm(self.id);
|
|
}
|
|
|
|
pub fn disable(&self) {
|
|
self.job_executor.disable_timer(self.id);
|
|
}
|
|
|
|
pub fn set_interval(&self, new_val: Duration) {
|
|
self.job_executor.set_interval(self.id, new_val);
|
|
}
|
|
}
|
|
|
|
impl Drop for Timer {
|
|
fn drop(&mut self) {
|
|
self.disable();
|
|
}
|
|
}
|
|
|
|
impl JobExecutor {
|
|
/// A queue that holds scheduled tasks.
|
|
pub fn new(sender: Sender<ThreadEvent>) -> Self {
|
|
// Create a queue.
|
|
let mut ret = JobExecutor {
|
|
global_queue: Arc::new(Injector::new()),
|
|
workers: vec![],
|
|
parkers: vec![],
|
|
sender,
|
|
timers: Arc::new(Mutex::new(HashMap::default())),
|
|
};
|
|
let mut workers = vec![];
|
|
for _ in 0..num_cpus::get().max(1) {
|
|
let new_worker = Worker::new_fifo();
|
|
ret.workers.push(new_worker.stealer());
|
|
let p = Parker::new();
|
|
ret.parkers.push(p.unparker().clone());
|
|
workers.push((new_worker, p));
|
|
}
|
|
|
|
// Reactor thread
|
|
thread::Builder::new()
|
|
.name("meli-reactor".to_string())
|
|
.spawn(move || {
|
|
let ex = smol::Executor::new();
|
|
|
|
futures::executor::block_on(ex.run(futures::future::pending::<()>()));
|
|
})
|
|
.unwrap();
|
|
|
|
// Spawn executor threads the first time the queue is created.
|
|
for (i, (local, parker)) in workers.into_iter().enumerate() {
|
|
let global = ret.global_queue.clone();
|
|
let stealers = ret.workers.clone();
|
|
thread::Builder::new()
|
|
.name(format!("meli-executor-{}", i))
|
|
.spawn(move || loop {
|
|
parker.park_timeout(Duration::from_millis(100));
|
|
let task = find_task(&local, &global, stealers.as_slice());
|
|
if let Some(meli_task) = task {
|
|
let MeliTask { task, id, timer } = meli_task;
|
|
if !timer {
|
|
debug!("Worker {} got task {:?}", i, id);
|
|
}
|
|
let _ = catch_unwind(|| task.run());
|
|
if !timer {
|
|
debug!("Worker {} returned after {:?}", i, id);
|
|
}
|
|
}
|
|
})
|
|
.unwrap();
|
|
}
|
|
ret
|
|
}
|
|
|
|
/// Spawns a future with a generic return value `R`
|
|
pub fn spawn_specialized<F, R>(&self, future: F) -> JoinHandle<R>
|
|
where
|
|
F: Future<Output = R> + Send + 'static,
|
|
R: Send + 'static,
|
|
{
|
|
let (sender, receiver) = oneshot::channel();
|
|
let finished_sender = self.sender.clone();
|
|
let job_id = JobId::new();
|
|
let injector = self.global_queue.clone();
|
|
let cancel = Arc::new(Mutex::new(false));
|
|
let cancel2 = cancel.clone();
|
|
// Create a task and schedule it for execution.
|
|
let (handle, task) = async_task::spawn(
|
|
async move {
|
|
let res = future.await;
|
|
let _ = sender.send(res);
|
|
finished_sender
|
|
.send(ThreadEvent::JobFinished(job_id))
|
|
.unwrap();
|
|
},
|
|
move |task| {
|
|
if *cancel.lock().unwrap() {
|
|
return;
|
|
}
|
|
injector.push(MeliTask {
|
|
task,
|
|
id: job_id,
|
|
timer: false,
|
|
})
|
|
},
|
|
);
|
|
handle.schedule();
|
|
for unparker in self.parkers.iter() {
|
|
unparker.unpark();
|
|
}
|
|
|
|
JoinHandle {
|
|
task: Arc::new(Mutex::new(Some(task))),
|
|
cancel: cancel2,
|
|
chan: receiver,
|
|
job_id,
|
|
}
|
|
}
|
|
|
|
/// Spawns a future with a generic return value `R` that might block on a new thread
|
|
pub fn spawn_blocking<F, R>(&self, future: F) -> JoinHandle<R>
|
|
where
|
|
F: Future<Output = R> + Send + 'static,
|
|
R: Send + 'static,
|
|
{
|
|
self.spawn_specialized(smol::unblock(move || futures::executor::block_on(future)))
|
|
}
|
|
|
|
pub fn create_timer(self: Arc<JobExecutor>, interval: Duration, value: Duration) -> Timer {
|
|
let id = Uuid::new_v4();
|
|
let timer = TimerPrivate {
|
|
interval,
|
|
cancel: Arc::new(Mutex::new(false)),
|
|
value,
|
|
active: true,
|
|
handle: None,
|
|
};
|
|
self.timers.lock().unwrap().insert(id, timer);
|
|
self.arm_timer(id, value);
|
|
Timer {
|
|
id,
|
|
job_executor: self,
|
|
}
|
|
}
|
|
|
|
/// Arms the timer `timer_id` again using its stored initial delay.
///
/// Does nothing if no timer with that id is registered.
pub fn rearm(&self, timer_id: Uuid) {
    // Copy the delay out so that the lock is released before `arm_timer` takes it again.
    let initial_delay = self
        .timers
        .lock()
        .unwrap()
        .get(&timer_id)
        .map(|timer| timer.value);
    if let Some(value) = initial_delay {
        self.arm_timer(timer_id, value);
    }
}
|
|
|
|
fn arm_timer(&self, id: Uuid, value: Duration) {
|
|
let job_id = JobId::new();
|
|
let sender = self.sender.clone();
|
|
let injector = self.global_queue.clone();
|
|
let timers = self.timers.clone();
|
|
let cancel = Arc::new(Mutex::new(false));
|
|
let cancel2 = cancel.clone();
|
|
let (task, handle) = async_task::spawn(
|
|
async move {
|
|
let mut value = value;
|
|
loop {
|
|
smol::Timer::after(value).await;
|
|
sender
|
|
.send(ThreadEvent::UIEvent(UIEvent::Timer(id)))
|
|
.unwrap();
|
|
if let Some(interval) = timers.lock().unwrap().get(&id).and_then(|timer| {
|
|
if timer.interval.as_millis() == 0 && timer.interval.as_secs() == 0 {
|
|
None
|
|
} else if timer.active {
|
|
Some(timer.interval)
|
|
} else {
|
|
None
|
|
}
|
|
}) {
|
|
value = interval;
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
},
|
|
move |task| {
|
|
if *cancel.lock().unwrap() {
|
|
return;
|
|
}
|
|
injector.push(MeliTask {
|
|
task,
|
|
id: job_id,
|
|
timer: true,
|
|
})
|
|
},
|
|
);
|
|
self.timers.lock().unwrap().entry(id).and_modify(|timer| {
|
|
timer.handle = Some(handle);
|
|
timer.cancel = cancel2;
|
|
timer.active = true;
|
|
});
|
|
task.schedule();
|
|
for unparker in self.parkers.iter() {
|
|
unparker.unpark();
|
|
}
|
|
}
|
|
|
|
/// Marks the timer `id` as inactive and raises its cancel flag, so that its countdown task
/// is dropped instead of queued the next time it is scheduled.
///
/// Does nothing if no timer with that id is registered.
fn disable_timer(&self, id: Uuid) {
    if let Some(timer) = self.timers.lock().unwrap().get_mut(&id) {
        timer.active = false;
        *timer.cancel.lock().unwrap() = true;
    }
}
|
|
|
|
/// Sets the period of the timer `id` to `new_val`.
///
/// A running countdown reads the stored interval after each expiration. Does nothing if no
/// timer with that id is registered.
fn set_interval(&self, id: Uuid, new_val: Duration) {
    if let Some(timer) = self.timers.lock().unwrap().get_mut(&id) {
        timer.interval = new_val;
    }
}
|
|
}
|
|
|
|
/// Receiving half of the one-shot channel on which a spawned job delivers its output.
pub type JobChannel<T> = oneshot::Receiver<T>;
|
|
|
|
#[derive(Debug)]
|
|
/// JoinHandle for the future that allows us to cancel the task.
|
|
pub struct JoinHandle<T> {
|
|
pub task: Arc<Mutex<Option<async_task::Task<()>>>>,
|
|
pub chan: JobChannel<T>,
|
|
pub cancel: Arc<Mutex<bool>>,
|
|
pub job_id: JobId,
|
|
}
|
|
|
|
impl<T> JoinHandle<T> {
|
|
pub fn cancel(&self) {
|
|
*self.cancel.lock().unwrap() = true;
|
|
}
|
|
}
|
|
|
|
impl<T> std::cmp::PartialEq<JobId> for JoinHandle<T> {
|
|
fn eq(&self, other: &JobId) -> bool {
|
|
self.job_id == *other
|
|
}
|
|
}
|
|
|
|
/*
|
|
use std::pin::Pin;
|
|
use std::task::{Context, Poll};
|
|
impl Future for JoinHandle {
|
|
type Output = Result<()>;
|
|
|
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
match Pin::new(&mut self.inner).poll(cx) {
|
|
Poll::Pending => Poll::Pending,
|
|
Poll::Ready(output) => Poll::Ready(output.expect("task failed")),
|
|
}
|
|
}
|
|
}
|
|
*/
|