remove one level of abstraction from spawning adapters

pull/152/head
phiresky 1 year ago
parent 3efc0a727c
commit 2e1c74909e

@ -1,14 +1,27 @@
use super::{
spawning::{SpawningFileAdapter, SpawningFileAdapterTrait},
AdapterMeta, GetMetadata,
use super::*;
use super::{AdaptInfo, AdapterMeta, FileAdapter, GetMetadata};
use crate::{
adapted_iter::AdaptedFilesIterBox,
expand::expand_str_ez,
matching::{FastFileMatcher, FileMatcher},
};
use crate::{matching::{FastFileMatcher, FileMatcher}, expand::expand_str_ez};
use anyhow::Result;
use async_stream::{stream, AsyncStream};
use bytes::{Buf, Bytes};
use lazy_static::lazy_static;
use log::debug;
use log::*;
use regex::{Captures, Regex};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::path::Path;
use tokio::process::Command;
use tokio_util::io::StreamReader;
use std::future::Future;
use std::process::{ExitStatus, Stdio};
use tokio::io::AsyncReadExt;
use tokio::process::Child;
// mostly the same as AdapterMeta + SpawningFileAdapter
#[derive(Debug, Deserialize, Serialize, JsonSchema, Default, PartialEq, Clone)]
@ -33,7 +46,11 @@ pub struct CustomAdapterConfig {
/// {}: the file path (TODO)
/// stdin of the program will be connected to the input file, and stdout is assumed to be the converted file
pub args: Vec<String>,
// TODO: make more flexible for inner matching (e.g. foo.tar.gz should be foo.tar after gunzipping)
/// The output path hint.
///
/// If not set, defaults to ${input_path}.txt
///
/// TODO: make more flexible for inner matching (e.g. foo.tar.gz should be foo.tar after gunzipping)
pub output_path_hint: Option<String>,
}
@ -94,7 +111,8 @@ lazy_static! {
"--markdown-headings=atx"
]),
disabled_by_default: None,
match_only_by_mime: None
match_only_by_mime: None,
output_path_hint: None
},
CustomAdapterConfig {
name: "poppler".to_owned(),
@ -108,12 +126,72 @@ lazy_static! {
binary: "pdftotext".to_string(),
args: strs(&["-", "-"]),
disabled_by_default: None,
match_only_by_mime: None
match_only_by_mime: None,
output_path_hint: Some("${input_path}.txt.asciipagebreaks".into())
// postprocessors: [{name: "add_page_numbers_by_pagebreaks"}]
}
];
}
/// replace a Command.spawn() error "File not found" with a more readable error
/// to indicate some program is not installed
pub fn map_exe_error(err: std::io::Error, exe_name: &str, help: &str) -> Error {
use std::io::ErrorKind::*;
match err.kind() {
NotFound => format_err!("Could not find executable \"{}\". {}", exe_name, help),
_ => Error::from(err),
}
}
/** waits for a process to finish, returns an io error if the process failed */
struct ProcWaitReader {
process: Option<Child>,
future: Option<Pin<Box<dyn Future<Output = std::io::Result<ExitStatus>>>>>,
}
impl ProcWaitReader {
fn new(cmd: Child) -> ProcWaitReader {
ProcWaitReader {
process: Some(cmd),
future: None,
}
}
}
fn proc_wait(mut child: Child) -> impl AsyncRead {
let s = stream! {
let res = child.wait().await?;
if res.success() {
yield std::io::Result::Ok(Bytes::new());
} else {
yield std::io::Result::Err(std::io::Error::new(
std::io::ErrorKind::Other,
format_err!("subprocess failed: {:?}", res),
));
}
};
StreamReader::new(s)
}
pub fn pipe_output<'a>(
_line_prefix: &str,
mut cmd: Command,
inp: ReadBox,
exe_name: &str,
help: &str,
) -> Result<ReadBox> {
let mut cmd = cmd
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.map_err(|e| map_exe_error(e, exe_name, help))?;
let mut stdi = cmd.stdin.take().expect("is piped");
let stdo = cmd.stdout.take().expect("is piped");
tokio::spawn(async move {
let mut z = inp;
tokio::io::copy(&mut z, &mut stdi).await.unwrap();
});
Ok(Box::pin(stdo.chain(proc_wait(cmd))))
}
pub struct CustomSpawningFileAdapter {
binary: String,
args: Vec<String>,
@ -127,13 +205,14 @@ impl GetMetadata for CustomSpawningFileAdapter {
}
fn arg_replacer(arg: &str, filepath_hint: &Path) -> Result<String> {
Ok(expand_str_ez(arg, |s| match s {
"file_extension" => &filepath_hint
"file_extension" => filepath_hint
.extension()
.map(|e| e.to_string_lossy())
.unwrap_or("".into()),
_ => panic!("unknown replacer"),
}))
}
impl SpawningFileAdapterTrait for CustomSpawningFileAdapter {
impl CustomSpawningFileAdapter {
fn get_exe(&self) -> &str {
&self.binary
}
@ -152,12 +231,53 @@ impl SpawningFileAdapterTrait for CustomSpawningFileAdapter {
Ok(command)
}
}
impl FileAdapter for CustomSpawningFileAdapter {
fn adapt<'a>(
&self,
ai: AdaptInfo,
_detection_reason: &FileMatcher,
) -> Result<AdaptedFilesIterBox> {
let AdaptInfo {
filepath_hint,
inp,
line_prefix,
archive_recursion_depth,
postprocess,
config,
..
} = ai;
let cmd = Command::new(&self.binary);
let cmd = self
.command(&filepath_hint, cmd)
.with_context(|| format!("Could not set cmd arguments for {}", self.binary))?;
debug!("executing {:?}", cmd);
let output = pipe_output(&line_prefix, cmd, inp, &self.binary, "")?;
Ok(Box::pin(tokio_stream::once(AdaptInfo {
filepath_hint: PathBuf::from(expand_str_ez(
self.output_path_hint
.as_deref()
.unwrap_or("${input_path}.txt"),
|r| match r {
"input_path" => filepath_hint.to_string_lossy(),
_ => panic!("unknown replacer in output_path_hint"),
},
)),
inp: output,
line_prefix,
is_real_file: false,
archive_recursion_depth,
postprocess,
config,
})))
}
}
impl CustomAdapterConfig {
pub fn to_adapter(&self) -> SpawningFileAdapter {
let ad = CustomSpawningFileAdapter {
pub fn to_adapter(&self) -> CustomSpawningFileAdapter {
CustomSpawningFileAdapter {
binary: self.binary.clone(),
args: self.args.clone(),
output_path_hint: self.output_path_hint,
output_path_hint: self.output_path_hint.clone(),
meta: AdapterMeta {
name: self.name.clone(),
version: self.version,
@ -182,8 +302,7 @@ impl CustomAdapterConfig {
keep_fast_matchers_if_accurate: !self.match_only_by_mime.unwrap_or(false),
disabled_by_default: self.disabled_by_default.unwrap_or(false),
},
};
SpawningFileAdapter::new(Box::new(ad))
}
}
}
@ -223,4 +342,52 @@ PREFIX:\u{c}
);
Ok(())
}
use std::io::Cursor;
use super::*;
use crate::adapters::FileAdapter;
use crate::{
adapters::custom::CustomAdapterConfig,
test_utils::{adapted_to_vec, simple_adapt_info},
};
#[tokio::test]
async fn streaming() -> anyhow::Result<()> {
// an adapter that converts input line by line (deadlocks if the parent process tries to write everything and only then read it)
let adapter = CustomAdapterConfig {
name: "simple text replacer".to_string(),
description: "oo".to_string(),
disabled_by_default: None,
version: 1,
extensions: vec!["txt".to_string()],
mimetypes: None,
match_only_by_mime: None,
binary: "sed".to_string(),
args: vec!["s/e/u/g".to_string()],
};
let adapter = adapter.to_adapter();
let input = r#"
This is the story of a
very strange lorry
with a long dead crew
and a witch with the flu
"#;
let input = format!("{0}{0}{0}{0}", input);
let input = format!("{0}{0}{0}{0}", input);
let input = format!("{0}{0}{0}{0}", input);
let input = format!("{0}{0}{0}{0}", input);
let input = format!("{0}{0}{0}{0}", input);
let input = format!("{0}{0}{0}{0}", input);
let (a, d) = simple_adapt_info(
&Path::new("foo.txt"),
Box::pin(Cursor::new(Vec::from(input))),
);
let output = adapter.adapt(a, &d).unwrap();
let oup = adapted_to_vec(output).await?;
println!("output: {}", String::from_utf8_lossy(&oup));
Ok(())
}
}

@ -144,7 +144,6 @@ pub fn postproc_prefix(line_prefix: &str, inp: impl AsyncRead + Send) -> impl As
Box::pin(StreamReader::new(oup_stream))
}
pub struct PostprocPageBreaks {}
impl GetMetadata for PostprocPageBreaks {
fn metadata(&self) -> &super::AdapterMeta {
@ -154,7 +153,7 @@ impl GetMetadata for PostprocPageBreaks {
version: 1,
description: "Adds the page number to each line for an input file that specifies page breaks as ascii page break character".to_owned(),
recurses: false,
fast_matchers: vec![FastFileMatcher::FileExtension("txtwithpagebreaks".to_string())],
fast_matchers: vec![FastFileMatcher::FileExtension("asciipagebreaks".to_string())],
slow_matchers: None,
keep_fast_matchers_if_accurate: false,
disabled_by_default: false
@ -221,9 +220,9 @@ pub fn postproc_pagebreaks(
mod tests {
use super::*;
use anyhow::Result;
use tokio::pin;
use tokio_test::io::Builder;
use tokio_test::io::Mock;
use tokio::pin;
#[tokio::test]
async fn test_with_pagebreaks() {

@ -1,186 +1 @@
use super::*;
use anyhow::Result;
use async_stream::{stream, AsyncStream};
use bytes::{Buf, Bytes};
use log::*;
use tokio_util::io::StreamReader;
use crate::adapters::FileAdapter;
use crate::expand::expand_str_ez;
use std::future::Future;
use std::path::Path;
use std::process::{ExitStatus, Stdio};
use tokio::io::AsyncReadExt;
use tokio::process::{Child, Command};
// TODO: don't separate the trait and the struct
pub trait SpawningFileAdapterTrait: GetMetadata + Send + Sync {
fn get_exe(&self) -> &str;
fn command(&self, filepath_hint: &Path, command: Command) -> Result<Command>;
}
pub struct SpawningFileAdapter {
inner: Box<dyn SpawningFileAdapterTrait>,
}
impl SpawningFileAdapter {
pub fn new(inner: Box<dyn SpawningFileAdapterTrait>) -> SpawningFileAdapter {
SpawningFileAdapter { inner }
}
}
impl GetMetadata for SpawningFileAdapter {
fn metadata(&self) -> &AdapterMeta {
self.inner.metadata()
}
}
/// replace a Command.spawn() error "File not found" with a more readable error
/// to indicate some program is not installed
pub fn map_exe_error(err: std::io::Error, exe_name: &str, help: &str) -> Error {
use std::io::ErrorKind::*;
match err.kind() {
NotFound => format_err!("Could not find executable \"{}\". {}", exe_name, help),
_ => Error::from(err),
}
}
/** waits for a process to finish, returns an io error if the process failed */
struct ProcWaitReader {
process: Option<Child>,
future: Option<Pin<Box<dyn Future<Output = std::io::Result<ExitStatus>>>>>,
}
impl ProcWaitReader {
fn new(cmd: Child) -> ProcWaitReader {
ProcWaitReader {
process: Some(cmd),
future: None,
}
}
}
fn proc_wait(mut child: Child) -> impl AsyncRead {
let s = stream! {
let res = child.wait().await?;
if res.success() {
yield std::io::Result::Ok(Bytes::new());
} else {
yield std::io::Result::Err(std::io::Error::new(
std::io::ErrorKind::Other,
format_err!("subprocess failed: {:?}", res),
));
}
};
StreamReader::new(s)
}
pub fn pipe_output<'a>(
_line_prefix: &str,
mut cmd: Command,
inp: ReadBox,
exe_name: &str,
help: &str,
) -> Result<ReadBox> {
let mut cmd = cmd
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.map_err(|e| map_exe_error(e, exe_name, help))?;
let mut stdi = cmd.stdin.take().expect("is piped");
let stdo = cmd.stdout.take().expect("is piped");
tokio::spawn(async move {
let mut z = inp;
tokio::io::copy(&mut z, &mut stdi).await.unwrap();
});
Ok(Box::pin(stdo.chain(proc_wait(cmd))))
}
impl FileAdapter for SpawningFileAdapter {
fn adapt<'a>(
&self,
ai: AdaptInfo,
_detection_reason: &FileMatcher,
) -> Result<AdaptedFilesIterBox> {
let AdaptInfo {
filepath_hint,
inp,
line_prefix,
archive_recursion_depth,
postprocess,
config,
..
} = ai;
let cmd = Command::new(self.inner.get_exe());
let cmd = self
.inner
.command(&filepath_hint, cmd)
.with_context(|| format!("Could not set cmd arguments for {}", self.inner.get_exe()))?;
debug!("executing {:?}", cmd);
let output = pipe_output(&line_prefix, cmd, inp, self.inner.get_exe(), "")?;
Ok(Box::pin(tokio_stream::once(AdaptInfo {
filepath_hint: PathBuf::from(
expand_str_ez(self.inner.output_path_hint, |r| match r {
"fullname" => &filepath_hint.to_string_lossy()
}
)),
inp: output,
line_prefix,
is_real_file: false,
archive_recursion_depth,
postprocess,
config,
})))
}
}
#[cfg(test)]
mod test {
use std::io::Cursor;
use super::*;
use crate::adapters::FileAdapter;
use crate::{
adapters::custom::CustomAdapterConfig,
test_utils::{adapted_to_vec, simple_adapt_info},
};
#[tokio::test]
async fn streaming() -> anyhow::Result<()> {
// an adapter that converts input line by line (deadlocks if the parent process tries to write everything and only then read it)
let adapter = CustomAdapterConfig {
name: "simple text replacer".to_string(),
description: "oo".to_string(),
disabled_by_default: None,
version: 1,
extensions: vec!["txt".to_string()],
mimetypes: None,
match_only_by_mime: None,
binary: "sed".to_string(),
args: vec!["s/e/u/g".to_string()],
};
let adapter = adapter.to_adapter();
let input = r#"
This is the story of a
very strange lorry
with a long dead crew
and a witch with the flu
"#;
let input = format!("{0}{0}{0}{0}", input);
let input = format!("{0}{0}{0}{0}", input);
let input = format!("{0}{0}{0}{0}", input);
let input = format!("{0}{0}{0}{0}", input);
let input = format!("{0}{0}{0}{0}", input);
let input = format!("{0}{0}{0}{0}", input);
let (a, d) = simple_adapt_info(
&Path::new("foo.txt"),
Box::pin(Cursor::new(Vec::from(input))),
);
let output = adapter.adapt(a, &d).unwrap();
let oup = adapted_to_vec(output).await?;
println!("output: {}", String::from_utf8_lossy(&oup));
Ok(())
}
}

@ -1,3 +1,5 @@
use std::borrow::Cow;
use regex::Captures;
// from https://github.com/phiresky/timetrackrs/blob/1c3df09ba2c1fda6065f2927045bd28dea0738d3/src/expand.rs
@ -27,7 +29,7 @@ pub fn get_capture<'a>(caps: &'a [Captures], reference: &str) -> Option<&'a str>
pub fn expand_str_captures(caps: &[Captures], replacement: &str) -> String {
let mut dst = String::new();
expand_str_lambda(
|reference: &str| get_capture(caps, reference).unwrap_or(""),
|reference: &str| Cow::Borrowed(get_capture(caps, reference).unwrap_or("")),
replacement,
&mut dst,
);
@ -36,7 +38,7 @@ pub fn expand_str_captures(caps: &[Captures], replacement: &str) -> String {
pub fn expand_str_ez<'a, F>(replacement: &'a str, lambda: F) -> String
where
F: Fn(&str) -> &'a str,
F: Fn(&str) -> Cow<'a, str>,
{
let mut dst = String::new();
expand_str_lambda(lambda, replacement, &mut dst);
@ -45,7 +47,7 @@ where
pub fn expand_str_lambda<'a, F>(cap: F, replacement: &'a str, dst: &mut String)
where
F: Fn(&str) -> &'a str,
F: Fn(&str) -> Cow<'a, str>,
{
let mut replacement = replacement;
while !replacement.is_empty() {
@ -71,7 +73,7 @@ where
}
};
replacement = &replacement[cap_ref.end..];
dst.push_str(cap(cap_ref.cap));
dst.push_str(cap(cap_ref.cap).as_ref());
}
dst.push_str(replacement);
}

Loading…
Cancel
Save