mirror of
https://git.meli.delivery/meli/meli
synced 2024-10-30 21:20:34 +00:00
melib: make Work use FnOnce closures
There was no need to use Fn() instead of FnOnce()
This commit is contained in:
parent
8de5a9412d
commit
21526b5faf
@ -37,7 +37,6 @@ use crossbeam::{
|
|||||||
select,
|
select,
|
||||||
};
|
};
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct WorkContext {
|
pub struct WorkContext {
|
||||||
@ -47,11 +46,10 @@ pub struct WorkContext {
|
|||||||
pub finished: Sender<std::thread::ThreadId>,
|
pub finished: Sender<std::thread::ThreadId>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct Work {
|
pub struct Work {
|
||||||
priority: u64,
|
priority: u64,
|
||||||
pub is_static: bool,
|
pub is_static: bool,
|
||||||
pub closure: Arc<Box<dyn Fn(WorkContext) -> () + Send + Sync>>,
|
pub closure: Box<dyn FnOnce(WorkContext) -> () + Send + Sync>,
|
||||||
name: String,
|
name: String,
|
||||||
status: String,
|
status: String,
|
||||||
}
|
}
|
||||||
@ -77,7 +75,7 @@ impl PartialEq for Work {
|
|||||||
impl Eq for Work {}
|
impl Eq for Work {}
|
||||||
|
|
||||||
impl Work {
|
impl Work {
|
||||||
pub fn compute(&self, work_context: WorkContext) {
|
pub fn compute(self, work_context: WorkContext) {
|
||||||
(self.closure)(work_context);
|
(self.closure)(work_context);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -118,9 +116,9 @@ pub struct AsyncBuilder<T: Send + Sync> {
|
|||||||
is_static: bool,
|
is_static: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Async<T: Send + Sync> {
|
pub struct Async<T: Send + Sync> {
|
||||||
work: Work,
|
work: Option<Work>,
|
||||||
active: bool,
|
active: bool,
|
||||||
tx: Sender<AsyncStatus<T>>,
|
tx: Sender<AsyncStatus<T>>,
|
||||||
rx: Receiver<AsyncStatus<T>>,
|
rx: Receiver<AsyncStatus<T>>,
|
||||||
@ -165,15 +163,15 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Returns an `Async<T>` object that contains a `Thread` join handle that returns a `T`
|
/// Returns an `Async<T>` object that contains a `Thread` join handle that returns a `T`
|
||||||
pub fn build(self, work: Box<dyn Fn(WorkContext) -> () + Send + Sync>) -> Async<T> {
|
pub fn build(self, work: Box<dyn FnOnce(WorkContext) -> () + Send + Sync>) -> Async<T> {
|
||||||
Async {
|
Async {
|
||||||
work: Work {
|
work: Some(Work {
|
||||||
priority: self.priority,
|
priority: self.priority,
|
||||||
is_static: self.is_static,
|
is_static: self.is_static,
|
||||||
closure: Arc::new(work),
|
closure: work,
|
||||||
name: String::new(),
|
name: String::new(),
|
||||||
status: String::new(),
|
status: String::new(),
|
||||||
},
|
}),
|
||||||
tx: self.tx,
|
tx: self.tx,
|
||||||
rx: self.rx,
|
rx: self.rx,
|
||||||
active: false,
|
active: false,
|
||||||
@ -188,7 +186,7 @@ where
|
|||||||
pub fn work(&mut self) -> Option<Work> {
|
pub fn work(&mut self) -> Option<Work> {
|
||||||
if !self.active {
|
if !self.active {
|
||||||
self.active = true;
|
self.active = true;
|
||||||
Some(self.work.clone())
|
self.work.take()
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
@ -185,8 +185,6 @@ impl MailBackend for ImapType {
|
|||||||
tx.send(AsyncStatus::Finished).unwrap();
|
tx.send(AsyncStatus::Finished).unwrap();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let connection = connection.clone();
|
|
||||||
let tx = tx.clone();
|
|
||||||
let mut response = String::with_capacity(8 * 1024);
|
let mut response = String::with_capacity(8 * 1024);
|
||||||
let conn = connection.lock();
|
let conn = connection.lock();
|
||||||
exit_on_error!(&tx, conn);
|
exit_on_error!(&tx, conn);
|
||||||
@ -246,7 +244,7 @@ impl MailBackend for ImapType {
|
|||||||
response.lines().collect::<Vec<&str>>().len()
|
response.lines().collect::<Vec<&str>>().len()
|
||||||
);
|
);
|
||||||
match protocol_parser::uid_fetch_responses(&response) {
|
match protocol_parser::uid_fetch_responses(&response) {
|
||||||
Ok((_, v)) => {
|
Ok((_, v, _)) => {
|
||||||
debug!("responses len is {}", v.len());
|
debug!("responses len is {}", v.len());
|
||||||
for UidFetchResponse {
|
for UidFetchResponse {
|
||||||
uid,
|
uid,
|
||||||
|
@ -333,6 +333,13 @@ impl ImapConnection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn read_response(&mut self, ret: &mut String) -> Result<()> {
|
pub fn read_response(&mut self, ret: &mut String) -> Result<()> {
|
||||||
|
if let (instant, ref mut status @ Ok(())) = *self.online.lock().unwrap() {
|
||||||
|
if Instant::now().duration_since(instant) >= std::time::Duration::new(60 * 30, 0) {
|
||||||
|
*status = Err(MeliError::new("Connection timed out"));
|
||||||
|
self.stream = Err(MeliError::new("Connection timed out"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if let Ok(ref mut stream) = self.stream {
|
if let Ok(ref mut stream) = self.stream {
|
||||||
if let Ok(_) = stream.read_response(ret) {
|
if let Ok(_) = stream.read_response(ret) {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
@ -418,7 +425,7 @@ impl ImapConnection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn send_command(&mut self, command: &[u8]) -> Result<usize> {
|
pub fn send_command(&mut self, command: &[u8]) -> Result<usize> {
|
||||||
if let (instant, ref mut status @ Ok(())) = *self.online.lock().unwrap() {
|
if let (instant, ref mut status) = *self.online.lock().unwrap() {
|
||||||
if Instant::now().duration_since(instant) >= std::time::Duration::new(60 * 30, 0) {
|
if Instant::now().duration_since(instant) >= std::time::Duration::new(60 * 30, 0) {
|
||||||
*status = Err(MeliError::new("Connection timed out"));
|
*status = Err(MeliError::new("Connection timed out"));
|
||||||
self.stream = Err(MeliError::new("Connection timed out"));
|
self.stream = Err(MeliError::new("Connection timed out"));
|
||||||
|
@ -5,7 +5,9 @@ use std::collections::hash_map::DefaultHasher;
|
|||||||
use std::hash::{Hash, Hasher};
|
use std::hash::{Hash, Hasher};
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
|
|
||||||
pub type ImapParseResult<'a, T> = Result<(&'a str, T)>;
|
#[derive(Debug)]
|
||||||
|
pub struct Alert(String);
|
||||||
|
pub type ImapParseResult<'a, T> = Result<(&'a str, T, Option<Alert>)>;
|
||||||
pub struct ImapLineIterator<'a> {
|
pub struct ImapLineIterator<'a> {
|
||||||
slice: &'a str,
|
slice: &'a str,
|
||||||
}
|
}
|
||||||
@ -328,13 +330,22 @@ pub fn uid_fetch_response(input: &str) -> ImapParseResult<UidFetchResponse<'_>>
|
|||||||
env.set_has_attachments(has_attachments);
|
env.set_has_attachments(has_attachments);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok((&input[i..], ret))
|
Ok((&input[i..], ret, None))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn uid_fetch_responses(mut input: &str) -> ImapParseResult<Vec<UidFetchResponse<'_>>> {
|
pub fn uid_fetch_responses(mut input: &str) -> ImapParseResult<Vec<UidFetchResponse<'_>>> {
|
||||||
let mut ret = Vec::new();
|
let mut ret = Vec::new();
|
||||||
|
let mut alert: Option<Alert> = None;
|
||||||
|
|
||||||
while let Ok((rest, el)) = uid_fetch_response(input) {
|
while let Ok((rest, el, el_alert)) = uid_fetch_response(input) {
|
||||||
|
if let Some(el_alert) = el_alert {
|
||||||
|
match &mut alert {
|
||||||
|
Some(Alert(ref mut alert)) => {
|
||||||
|
alert.extend(el_alert.0.chars());
|
||||||
|
}
|
||||||
|
a @ None => *a = Some(el_alert),
|
||||||
|
}
|
||||||
|
}
|
||||||
input = rest;
|
input = rest;
|
||||||
ret.push(el);
|
ret.push(el);
|
||||||
}
|
}
|
||||||
@ -345,7 +356,7 @@ pub fn uid_fetch_responses(mut input: &str) -> ImapParseResult<Vec<UidFetchRespo
|
|||||||
input
|
input
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
Ok((input, ret))
|
Ok((input, ret, None))
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -306,7 +306,7 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
|
|||||||
);
|
);
|
||||||
debug!(&response);
|
debug!(&response);
|
||||||
match protocol_parser::uid_fetch_responses(&response) {
|
match protocol_parser::uid_fetch_responses(&response) {
|
||||||
Ok((_, v)) => {
|
Ok((_, v, _)) => {
|
||||||
let len = v.len();
|
let len = v.len();
|
||||||
let mut ctr = 0;
|
let mut ctr = 0;
|
||||||
for UidFetchResponse {
|
for UidFetchResponse {
|
||||||
@ -421,7 +421,7 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
|
|||||||
conn.read_response(&mut response)
|
conn.read_response(&mut response)
|
||||||
);
|
);
|
||||||
match protocol_parser::uid_fetch_responses(&response) {
|
match protocol_parser::uid_fetch_responses(&response) {
|
||||||
Ok((_, v)) => {
|
Ok((_, v, _)) => {
|
||||||
let len = v.len();
|
let len = v.len();
|
||||||
let mut ctr = 0;
|
let mut ctr = 0;
|
||||||
for UidFetchResponse {
|
for UidFetchResponse {
|
||||||
@ -600,7 +600,7 @@ fn examine_updates(
|
|||||||
);
|
);
|
||||||
debug!(&response);
|
debug!(&response);
|
||||||
match protocol_parser::uid_fetch_responses(&response) {
|
match protocol_parser::uid_fetch_responses(&response) {
|
||||||
Ok((_, v)) => {
|
Ok((_, v, _)) => {
|
||||||
for UidFetchResponse {
|
for UidFetchResponse {
|
||||||
uid, flags, body, ..
|
uid, flags, body, ..
|
||||||
} in v
|
} in v
|
||||||
@ -679,7 +679,7 @@ fn examine_updates(
|
|||||||
conn.read_response(&mut response)
|
conn.read_response(&mut response)
|
||||||
);
|
);
|
||||||
match protocol_parser::uid_fetch_responses(&response) {
|
match protocol_parser::uid_fetch_responses(&response) {
|
||||||
Ok((_, v)) => {
|
Ok((_, v, _)) => {
|
||||||
for UidFetchResponse {
|
for UidFetchResponse {
|
||||||
uid, flags, body, ..
|
uid, flags, body, ..
|
||||||
} in v
|
} in v
|
||||||
|
@ -218,26 +218,13 @@ impl MailBackend for MaildirType {
|
|||||||
let sender = Arc::new(sender);
|
let sender = Arc::new(sender);
|
||||||
|
|
||||||
Box::new(move |work_context: crate::async_workers::WorkContext| {
|
Box::new(move |work_context: crate::async_workers::WorkContext| {
|
||||||
let cache_dir = cache_dir.clone();
|
|
||||||
let folder_index = folder_index.clone();
|
|
||||||
let root_path = root_path.clone();
|
|
||||||
let path = path.clone();
|
|
||||||
let name = name.clone();
|
|
||||||
let map = map.clone();
|
|
||||||
let sender = sender.clone();
|
|
||||||
work_context
|
work_context
|
||||||
.set_name
|
.set_name
|
||||||
.send((std::thread::current().id(), name.clone()))
|
.send((std::thread::current().id(), name.clone()))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let thunk = move |sender: &RefreshEventConsumer| {
|
let thunk = move |sender: &RefreshEventConsumer| {
|
||||||
debug!("refreshing");
|
debug!("refreshing");
|
||||||
let cache_dir = cache_dir.clone();
|
|
||||||
let map = map.clone();
|
|
||||||
let folder_index = folder_index.clone();
|
|
||||||
let folder_hash = folder_hash.clone();
|
|
||||||
let root_path = root_path.clone();
|
|
||||||
let mut path = path.clone();
|
let mut path = path.clone();
|
||||||
let cache_dir = cache_dir.clone();
|
|
||||||
path.push("new");
|
path.push("new");
|
||||||
for d in path.read_dir()? {
|
for d in path.read_dir()? {
|
||||||
if let Ok(p) = d {
|
if let Ok(p) = d {
|
||||||
@ -789,35 +776,23 @@ impl MaildirType {
|
|||||||
|
|
||||||
let handle = {
|
let handle = {
|
||||||
let tx = w.tx();
|
let tx = w.tx();
|
||||||
// TODO: Avoid clone
|
|
||||||
let folder: &MaildirFolder = &self.folders[&self.owned_folder_idx(folder)];
|
let folder: &MaildirFolder = &self.folders[&self.owned_folder_idx(folder)];
|
||||||
let folder_hash = folder.hash();
|
let folder_hash = folder.hash();
|
||||||
let unseen = folder.unseen.clone();
|
let unseen = folder.unseen.clone();
|
||||||
let total = folder.total.clone();
|
let total = folder.total.clone();
|
||||||
let tx_final = w.tx();
|
let tx_final = w.tx();
|
||||||
let path: PathBuf = folder.fs_path().into();
|
let mut path: PathBuf = folder.fs_path().into();
|
||||||
let name = format!("parsing {:?}", folder.name());
|
let name = format!("parsing {:?}", folder.name());
|
||||||
let root_path = self.path.to_path_buf();
|
let root_path = self.path.to_path_buf();
|
||||||
let map = self.hash_indexes.clone();
|
let map = self.hash_indexes.clone();
|
||||||
let folder_index = self.folder_index.clone();
|
let folder_index = self.folder_index.clone();
|
||||||
|
|
||||||
let closure = move |work_context: crate::async_workers::WorkContext| {
|
let closure = move |work_context: crate::async_workers::WorkContext| {
|
||||||
let unseen = unseen.clone();
|
|
||||||
let total = total.clone();
|
|
||||||
let name = name.clone();
|
|
||||||
work_context
|
work_context
|
||||||
.set_name
|
.set_name
|
||||||
.send((std::thread::current().id(), name.clone()))
|
.send((std::thread::current().id(), name.clone()))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let root_path = root_path.clone();
|
let mut thunk = move || {
|
||||||
let map = map.clone();
|
|
||||||
let folder_index = folder_index.clone();
|
|
||||||
let tx = tx.clone();
|
|
||||||
let cache_dir = cache_dir.clone();
|
|
||||||
let path = path.clone();
|
|
||||||
let thunk = move || {
|
|
||||||
let mut path = path.clone();
|
|
||||||
let cache_dir = cache_dir.clone();
|
|
||||||
path.push("new");
|
path.push("new");
|
||||||
for d in path.read_dir()? {
|
for d in path.read_dir()? {
|
||||||
if let Ok(p) = d {
|
if let Ok(p) = d {
|
||||||
@ -841,7 +816,6 @@ impl MaildirType {
|
|||||||
if !files.is_empty() {
|
if !files.is_empty() {
|
||||||
crossbeam::scope(|scope| {
|
crossbeam::scope(|scope| {
|
||||||
let mut threads = Vec::with_capacity(cores);
|
let mut threads = Vec::with_capacity(cores);
|
||||||
let cache_dir = cache_dir.clone();
|
|
||||||
let chunk_size = if count / cores > 0 {
|
let chunk_size = if count / cores > 0 {
|
||||||
count / cores
|
count / cores
|
||||||
} else {
|
} else {
|
||||||
|
@ -480,7 +480,7 @@ impl Account {
|
|||||||
work_context: &WorkContext,
|
work_context: &WorkContext,
|
||||||
notify_fn: Arc<NotifyFn>,
|
notify_fn: Arc<NotifyFn>,
|
||||||
) -> Worker {
|
) -> Worker {
|
||||||
let mailbox_handle = backend.write().unwrap().get(&folder);
|
let mut mailbox_handle = backend.write().unwrap().get(&folder);
|
||||||
let mut builder = AsyncBuilder::new();
|
let mut builder = AsyncBuilder::new();
|
||||||
let our_tx = builder.tx();
|
let our_tx = builder.tx();
|
||||||
let folder_hash = folder.hash();
|
let folder_hash = folder.hash();
|
||||||
@ -508,7 +508,6 @@ impl Account {
|
|||||||
builder.set_priority(priority).set_is_static(true);
|
builder.set_priority(priority).set_is_static(true);
|
||||||
let mut w = builder.build(Box::new(move |work_context| {
|
let mut w = builder.build(Box::new(move |work_context| {
|
||||||
let name = format!("Parsing {}", folder.path());
|
let name = format!("Parsing {}", folder.path());
|
||||||
let mut mailbox_handle = mailbox_handle.clone();
|
|
||||||
let work = mailbox_handle.work().unwrap();
|
let work = mailbox_handle.work().unwrap();
|
||||||
work_context.new_work.send(work).unwrap();
|
work_context.new_work.send(work).unwrap();
|
||||||
let thread_id = std::thread::current().id();
|
let thread_id = std::thread::current().id();
|
||||||
|
Loading…
Reference in New Issue
Block a user