diff --git a/src/pipe_reader.rs b/src/pipe_reader.rs index 19716a8..c053dca 100644 --- a/src/pipe_reader.rs +++ b/src/pipe_reader.rs @@ -1,47 +1,28 @@ -use crate::app::{ExternalMsg, MsgIn, Task}; +use crate::app::ExternalMsg; use anyhow::Result; use std::fs; use std::io::prelude::*; -use std::sync::mpsc::Sender; -pub fn read_all(pipe: &str, tx: Sender) -> Result<()> { - match fs::OpenOptions::new() +pub fn read_all(pipe: &str) -> Result> { + let mut file = fs::OpenOptions::new() .read(true) .write(true) .create(false) - .open(&pipe) - { - Ok(mut file) => { - let mut in_str = String::new(); - file.read_to_string(&mut in_str)?; - file.set_len(0)?; + .open(&pipe)?; - if !in_str.is_empty() { - let msgs = in_str - .lines() - .map(|s| serde_yaml::from_str::(s.trim())); + let mut in_str = String::new(); + file.read_to_string(&mut in_str)?; + file.set_len(0)?; - for msg in msgs { - match msg { - Ok(m) => { - tx.send(Task::new(MsgIn::External(m), None))?; - } - Err(e) => tx.send(Task::new( - MsgIn::External(ExternalMsg::LogError(e.to_string())), - None, - ))?, - } - } - } + if !in_str.is_empty() { + let mut msgs = vec![]; + for msg in in_str.lines().map(|s| serde_yaml::from_str(s.trim())) { + msgs.push(msg?); } - Err(err) => { - tx.send(Task::new( - MsgIn::External(ExternalMsg::LogError(err.to_string())), - None, - ))?; - } - }; - Ok(()) + Ok(msgs) + } else { + Ok(vec![]) + } } // pub fn keep_reading(pipe: String, tx: Sender) { diff --git a/src/runner.rs b/src/runner.rs index a01b6a3..0eba62d 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -225,7 +225,20 @@ pub fn run( }) .unwrap_or_else(|e| Err(e.to_string())); - pipe_reader::read_all(app.pipe().msg_in(), tx_msg_in.clone())?; + match pipe_reader::read_all(app.pipe().msg_in()) { + Ok(msgs) => { + for msg in msgs { + app = app.handle_task(app::Task::new( + app::MsgIn::External(msg), + None, + ))?; + } + } + Err(err) => { + app = app.log_error(err.to_string())?; + } + }; + app.cleanup_pipes()?; if let Err(e) = status { @@ -291,7 +304,20 @@ pub fn run( }) .unwrap_or_else(|e| Err(e.to_string())); - pipe_reader::read_all(app.pipe().msg_in(), tx_msg_in.clone())?; + match pipe_reader::read_all(app.pipe().msg_in()) { + Ok(msgs) => { + for msg in msgs { + app = app.handle_task(app::Task::new( + app::MsgIn::External(msg), + None, + ))?; + } + } + Err(err) => { + app = app.log_error(err.to_string())?; + } + }; + app.cleanup_pipes()?; if let Err(e) = status {