2020-06-26 15:31:37 +00:00
|
|
|
/*
|
|
|
|
* meli - jobs executor
|
|
|
|
*
|
|
|
|
* Copyright 2020 Manos Pitsidianakis
|
|
|
|
*
|
|
|
|
* This file is part of meli.
|
|
|
|
*
|
|
|
|
* meli is free software: you can redistribute it and/or modify
|
|
|
|
* it under the terms of the GNU General Public License as published by
|
|
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
|
|
* (at your option) any later version.
|
|
|
|
*
|
|
|
|
* meli is distributed in the hope that it will be useful,
|
|
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
* GNU General Public License for more details.
|
|
|
|
*
|
|
|
|
* You should have received a copy of the GNU General Public License
|
|
|
|
* along with meli. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
*/
|
|
|
|
|
2020-06-28 21:18:24 +00:00
|
|
|
use melib::error::Result;
|
2020-06-27 18:40:46 +00:00
|
|
|
use melib::smol;
|
2020-06-26 15:31:37 +00:00
|
|
|
use std::future::Future;
|
|
|
|
use std::panic::catch_unwind;
|
|
|
|
use std::pin::Pin;
|
|
|
|
use std::sync::Arc;
|
|
|
|
use std::task::{Context, Poll};
|
|
|
|
use std::thread;
|
2020-06-27 18:40:46 +00:00
|
|
|
use std::time::Duration;
|
2020-06-26 15:31:37 +00:00
|
|
|
use uuid::Uuid;
|
|
|
|
|
2020-06-27 18:40:46 +00:00
|
|
|
use crate::types::ThreadEvent;
|
2020-07-04 12:59:09 +00:00
|
|
|
use crossbeam::deque::{Injector, Stealer, Worker};
|
2020-06-27 18:40:46 +00:00
|
|
|
use crossbeam::sync::{Parker, Unparker};
|
2020-06-26 15:31:37 +00:00
|
|
|
use crossbeam::Sender;
|
2020-06-28 21:18:24 +00:00
|
|
|
pub use futures::channel::oneshot;
|
2020-06-27 18:40:46 +00:00
|
|
|
use std::iter;
|
2020-06-26 15:31:37 +00:00
|
|
|
|
|
|
|
type AsyncTask = async_task::Task<()>;
|
|
|
|
|
2020-06-27 18:40:46 +00:00
|
|
|
fn find_task<T>(local: &Worker<T>, global: &Injector<T>, stealers: &[Stealer<T>]) -> Option<T> {
|
2020-06-26 15:31:37 +00:00
|
|
|
// Pop a task from the local queue, if not empty.
|
|
|
|
local.pop().or_else(|| {
|
|
|
|
// Otherwise, we need to look for a task elsewhere.
|
|
|
|
iter::repeat_with(|| {
|
|
|
|
// Try stealing a batch of tasks from the global queue.
|
2020-06-27 18:40:46 +00:00
|
|
|
global
|
|
|
|
.steal_batch_and_pop(local)
|
2020-06-26 15:31:37 +00:00
|
|
|
// Or try stealing a task from one of the other threads.
|
|
|
|
.or_else(|| stealers.iter().map(|s| s.steal()).collect())
|
|
|
|
})
|
|
|
|
// Loop while no task was stolen and any steal operation needs to be retried.
|
|
|
|
.find(|s| !s.is_retry())
|
|
|
|
// Extract the stolen task, if there is one.
|
|
|
|
.and_then(|s| s.success())
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
macro_rules! uuid_hash_type {
|
|
|
|
($n:ident) => {
|
2020-07-07 21:26:40 +00:00
|
|
|
#[derive(
|
|
|
|
PartialEq, Hash, Eq, Copy, Clone, Ord, PartialOrd, Serialize, Deserialize, Default,
|
|
|
|
)]
|
2020-06-26 15:31:37 +00:00
|
|
|
pub struct $n(Uuid);
|
|
|
|
|
|
|
|
impl core::fmt::Debug for $n {
|
|
|
|
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
|
|
|
|
write!(f, "{}", self.0.to_string())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl core::fmt::Display for $n {
|
|
|
|
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
|
|
|
|
write!(f, "{}", self.0.to_string())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl $n {
|
Add smtp client support for sending mail in UI
`mailer_command` was removed, and a new setting `send_mail` was added.
Its possible values are a string, consisting of a shell command to
execute, or settings to configure an smtp server connection. The
configuration I used for testing this is:
[composing]
send_mail = { hostname = "smtp.mail.tld", port = 587, auth = { type = "auto", username = "yoshi", password = { type = "command_eval", value = "gpg2 --no-tty -q -d ~/.passwords/msmtp/yoshi.gpg" } }, security = { type = "STARTTLS" } }
For local smtp server:
[composing]
send_mail = { hostname = "localhost", port = 25, auth = { type = "none" }, security = { type = "none" } }
2020-07-15 11:38:43 +00:00
|
|
|
pub fn new() -> Self {
|
2020-06-26 15:31:37 +00:00
|
|
|
$n(Uuid::new_v4())
|
|
|
|
}
|
|
|
|
pub fn null() -> Self {
|
|
|
|
$n(Uuid::nil())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
}
|
|
|
|
uuid_hash_type!(JobId);
|
|
|
|
|
|
|
|
/// A spawned future and its current state.
|
|
|
|
pub struct MeliTask {
|
|
|
|
task: AsyncTask,
|
|
|
|
id: JobId,
|
|
|
|
}
|
|
|
|
|
2020-06-27 18:40:46 +00:00
|
|
|
#[derive(Debug)]
|
2020-06-26 15:31:37 +00:00
|
|
|
pub struct JobExecutor {
|
|
|
|
active_jobs: Vec<JobId>,
|
|
|
|
global_queue: Arc<Injector<MeliTask>>,
|
|
|
|
workers: Vec<Stealer<MeliTask>>,
|
|
|
|
sender: Sender<ThreadEvent>,
|
|
|
|
parkers: Vec<Unparker>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl JobExecutor {
|
2020-06-27 18:40:46 +00:00
|
|
|
/// A queue that holds scheduled tasks.
|
2020-06-26 15:31:37 +00:00
|
|
|
pub fn new(sender: Sender<ThreadEvent>) -> Self {
|
2020-06-27 18:40:46 +00:00
|
|
|
// Create a queue.
|
2020-06-26 15:31:37 +00:00
|
|
|
let mut ret = JobExecutor {
|
|
|
|
active_jobs: vec![],
|
|
|
|
global_queue: Arc::new(Injector::new()),
|
|
|
|
workers: vec![],
|
2020-06-27 18:40:46 +00:00
|
|
|
parkers: vec![],
|
2020-06-26 15:31:37 +00:00
|
|
|
sender,
|
|
|
|
};
|
|
|
|
let mut workers = vec![];
|
2020-06-27 18:40:46 +00:00
|
|
|
for _ in 0..num_cpus::get().max(1) {
|
|
|
|
let new_worker = Worker::new_fifo();
|
|
|
|
ret.workers.push(new_worker.stealer());
|
|
|
|
let p = Parker::new();
|
|
|
|
ret.parkers.push(p.unparker().clone());
|
|
|
|
workers.push((new_worker, p));
|
|
|
|
}
|
2020-06-26 15:31:37 +00:00
|
|
|
|
2020-06-27 18:40:46 +00:00
|
|
|
// Reactor thread
|
2020-06-26 15:31:37 +00:00
|
|
|
thread::Builder::new()
|
2020-06-27 18:40:46 +00:00
|
|
|
.name("meli-reactor".to_string())
|
|
|
|
.spawn(move || {
|
|
|
|
smol::run(futures::future::pending::<()>());
|
2020-06-28 12:39:33 +00:00
|
|
|
})
|
|
|
|
.unwrap();
|
2020-06-27 18:40:46 +00:00
|
|
|
|
|
|
|
// Spawn executor threads the first time the queue is created.
|
|
|
|
for (i, (local, parker)) in workers.into_iter().enumerate() {
|
|
|
|
let global = ret.global_queue.clone();
|
|
|
|
let stealers = ret.workers.clone();
|
|
|
|
thread::Builder::new()
|
|
|
|
.name(format!("meli-executor-{}", i))
|
|
|
|
.spawn(move || loop {
|
|
|
|
parker.park_timeout(Duration::from_millis(100));
|
|
|
|
let task = find_task(&local, &global, stealers.as_slice());
|
|
|
|
if let Some(meli_task) = task {
|
|
|
|
let MeliTask { task, id } = meli_task;
|
|
|
|
debug!("Worker {} got task {:?}", i, id);
|
2020-06-28 12:39:33 +00:00
|
|
|
let _ = catch_unwind(|| task.run());
|
2020-07-05 10:22:48 +00:00
|
|
|
debug!("Worker {} returned after {:?}", i, id);
|
2020-06-26 15:31:37 +00:00
|
|
|
}
|
2020-06-28 12:39:33 +00:00
|
|
|
})
|
|
|
|
.unwrap();
|
2020-06-27 18:40:46 +00:00
|
|
|
}
|
|
|
|
ret
|
2020-06-26 15:31:37 +00:00
|
|
|
}
|
|
|
|
/// Spawns a future on the executor.
|
2020-06-28 21:18:24 +00:00
|
|
|
pub fn spawn<F>(&self, future: F) -> (JoinHandle, JobId)
|
2020-06-26 15:31:37 +00:00
|
|
|
where
|
2020-06-28 21:18:24 +00:00
|
|
|
F: Future<Output = Result<()>> + Send + 'static,
|
2020-06-26 15:31:37 +00:00
|
|
|
{
|
|
|
|
let job_id = JobId::new();
|
2020-07-13 15:49:27 +00:00
|
|
|
let _job_id = job_id;
|
|
|
|
let __job_id = job_id;
|
2020-06-27 18:40:46 +00:00
|
|
|
let finished_sender = self.sender.clone();
|
2020-06-26 15:31:37 +00:00
|
|
|
let injector = self.global_queue.clone();
|
|
|
|
// Create a task and schedule it for execution.
|
2020-06-27 18:40:46 +00:00
|
|
|
let (task, handle) = async_task::spawn(
|
|
|
|
async move {
|
2020-06-28 21:18:24 +00:00
|
|
|
let r = future.await;
|
2020-06-27 18:40:46 +00:00
|
|
|
finished_sender
|
|
|
|
.send(ThreadEvent::JobFinished(__job_id))
|
|
|
|
.unwrap();
|
2020-06-28 21:18:24 +00:00
|
|
|
r
|
2020-06-27 18:40:46 +00:00
|
|
|
},
|
|
|
|
move |task| injector.push(MeliTask { task, id: _job_id }),
|
|
|
|
(),
|
|
|
|
);
|
2020-06-26 15:31:37 +00:00
|
|
|
task.schedule();
|
|
|
|
for unparker in self.parkers.iter() {
|
2020-06-27 18:40:46 +00:00
|
|
|
unparker.unpark();
|
2020-06-26 15:31:37 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Return a join handle that retrieves the output of the future.
|
2020-06-28 21:18:24 +00:00
|
|
|
(JoinHandle(handle), job_id)
|
2020-06-26 15:31:37 +00:00
|
|
|
}
|
|
|
|
|
2020-06-27 18:40:46 +00:00
|
|
|
///// Spawns a future on the executor.
|
2020-07-15 08:02:53 +00:00
|
|
|
pub fn spawn_specialized<F, R>(&self, future: F) -> (oneshot::Receiver<R>, JoinHandle, JobId)
|
2020-06-27 18:40:46 +00:00
|
|
|
where
|
|
|
|
F: Future<Output = R> + Send + 'static,
|
2020-06-28 12:39:33 +00:00
|
|
|
R: Send + 'static,
|
2020-06-27 18:40:46 +00:00
|
|
|
{
|
|
|
|
let (sender, receiver) = oneshot::channel();
|
|
|
|
let finished_sender = self.sender.clone();
|
2020-06-26 15:31:37 +00:00
|
|
|
let job_id = JobId::new();
|
2020-07-13 15:49:27 +00:00
|
|
|
let _job_id = job_id;
|
|
|
|
let __job_id = job_id;
|
2020-06-26 15:31:37 +00:00
|
|
|
let injector = self.global_queue.clone();
|
|
|
|
// Create a task and schedule it for execution.
|
2020-07-15 08:02:53 +00:00
|
|
|
let (task, handle) = async_task::spawn(
|
2020-06-27 18:40:46 +00:00
|
|
|
async move {
|
|
|
|
let res = future.await;
|
2020-06-28 12:39:33 +00:00
|
|
|
let _ = sender.send(res);
|
2020-06-27 18:40:46 +00:00
|
|
|
finished_sender
|
|
|
|
.send(ThreadEvent::JobFinished(__job_id))
|
|
|
|
.unwrap();
|
2020-07-15 08:02:53 +00:00
|
|
|
Ok(())
|
2020-06-27 18:40:46 +00:00
|
|
|
},
|
|
|
|
move |task| injector.push(MeliTask { task, id: _job_id }),
|
|
|
|
(),
|
|
|
|
);
|
2020-06-26 15:31:37 +00:00
|
|
|
task.schedule();
|
|
|
|
for unparker in self.parkers.iter() {
|
2020-06-27 18:40:46 +00:00
|
|
|
unparker.unpark();
|
2020-06-26 15:31:37 +00:00
|
|
|
}
|
|
|
|
|
2020-07-15 08:02:53 +00:00
|
|
|
(receiver, JoinHandle(handle), job_id)
|
2020-06-27 18:40:46 +00:00
|
|
|
}
|
2020-07-16 19:53:16 +00:00
|
|
|
|
|
|
|
pub fn spawn_blocking<F, R>(&self, future: F) -> (oneshot::Receiver<R>, JoinHandle, JobId)
|
|
|
|
where
|
|
|
|
F: Future<Output = R> + Send + 'static,
|
|
|
|
R: Send + 'static,
|
|
|
|
{
|
|
|
|
self.spawn_specialized(smol::Task::blocking(async move { future.await }))
|
|
|
|
}
|
2020-06-26 15:31:37 +00:00
|
|
|
}
|
|
|
|
|
Add smtp client support for sending mail in UI
`mailer_command` was removed, and a new setting `send_mail` was added.
Its possible values are a string, consisting of a shell command to
execute, or settings to configure an smtp server connection. The
configuration I used for testing this is:
[composing]
send_mail = { hostname = "smtp.mail.tld", port = 587, auth = { type = "auto", username = "yoshi", password = { type = "command_eval", value = "gpg2 --no-tty -q -d ~/.passwords/msmtp/yoshi.gpg" } }, security = { type = "STARTTLS" } }
For local smtp server:
[composing]
send_mail = { hostname = "localhost", port = 25, auth = { type = "none" }, security = { type = "none" } }
2020-07-15 11:38:43 +00:00
|
|
|
pub type JobChannel<T> = oneshot::Receiver<Result<T>>;
|
|
|
|
|
2020-06-26 15:31:37 +00:00
|
|
|
///// Spawns a future on the executor.
|
|
|
|
//fn spawn<F, R>(future: F) -> JoinHandle<R>
|
|
|
|
//where
|
|
|
|
// F: Future<Output = R> + Send + 'static,
|
|
|
|
// R: Send + 'static,
|
|
|
|
//{
|
|
|
|
// // Create a task and schedule it for execution.
|
|
|
|
// let (task, handle) = async_task::spawn(future, |t| QUEUE.send(t).unwrap(), ());
|
|
|
|
// task.schedule();
|
|
|
|
//
|
|
|
|
// // Return a join handle that retrieves the output of the future.
|
|
|
|
// JoinHandle(handle)
|
|
|
|
//}
|
|
|
|
|
2020-06-28 21:18:24 +00:00
|
|
|
#[derive(Debug)]
|
2020-06-26 15:31:37 +00:00
|
|
|
/// Awaits the output of a spawned future.
|
2020-06-28 21:18:24 +00:00
|
|
|
pub struct JoinHandle(pub async_task::JoinHandle<Result<()>, ()>);
|
2020-06-26 15:31:37 +00:00
|
|
|
|
|
|
|
impl Future for JoinHandle {
|
2020-06-28 21:18:24 +00:00
|
|
|
type Output = Result<()>;
|
2020-06-26 15:31:37 +00:00
|
|
|
|
|
|
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
|
|
match Pin::new(&mut self.0).poll(cx) {
|
|
|
|
Poll::Pending => Poll::Pending,
|
|
|
|
Poll::Ready(output) => Poll::Ready(output.expect("task failed")),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
fn _test() {
|
|
|
|
let executor = JobExecutor::new();
|
|
|
|
futures::executor::block_on(async {
|
|
|
|
// Spawn a future.
|
|
|
|
let handle = executor.spawn(async {
|
|
|
|
println!("Running task...");
|
|
|
|
panic!();
|
|
|
|
});
|
|
|
|
|
|
|
|
// Await its output.
|
|
|
|
handle.await;
|
|
|
|
});
|
|
|
|
}
|
|
|
|
*/
|