diff --git a/Cargo.lock b/Cargo.lock index 7e540ec..388e8ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -288,6 +288,27 @@ dependencies = [ "winapi", ] +[[package]] +name = "dyn-clonable" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e9232f0e607a262ceb9bd5141a3dfb3e4db6994b31989bbfd845878cba59fd4" +dependencies = [ + "dyn-clonable-impl", + "dyn-clone", +] + +[[package]] +name = "dyn-clonable-impl" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "558e40ea573c374cf53507fd240b7ee2f5477df7cfebdb97323ec61c719399c5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "dyn-clone" version = "1.0.1" @@ -950,8 +971,11 @@ dependencies = [ "chrono", "clap", "crossbeam", + "crossbeam-channel", "derive_more", "directories-next", + "dyn-clonable", + "dyn-clone", "encoding_rs", "encoding_rs_io", "env_logger", diff --git a/Cargo.toml b/Cargo.toml index 373af1d..cddb686 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,3 +49,6 @@ directories-next = "1.0.1" derive_more = "0.99.7" pretty-bytes = "0.2.2" memchr = "2.3.3" +crossbeam-channel = "0.4.2" +dyn-clone = "1.0.1" +dyn-clonable = "0.9.0" diff --git a/exampledir/test.djvu b/exampledir/test.djvu new file mode 100644 index 0000000..3028bb5 Binary files /dev/null and b/exampledir/test.djvu differ diff --git a/exampledir/test/hello.gz b/exampledir/test/hello.gz new file mode 100644 index 0000000..33a6d70 Binary files /dev/null and b/exampledir/test/hello.gz differ diff --git a/exampledir/test/hello.sqlite3 b/exampledir/test/hello.sqlite3 new file mode 100644 index 0000000..8faee19 Binary files /dev/null and b/exampledir/test/hello.sqlite3 differ diff --git a/exampledir/test/short.pdf b/exampledir/test/short.pdf new file mode 100644 index 0000000..0371069 Binary files /dev/null and b/exampledir/test/short.pdf differ diff --git a/exampledir/test/short.pdf.gz b/exampledir/test/short.pdf.gz new file mode 100644 index 0000000..40a9ef9 Binary files /dev/null and b/exampledir/test/short.pdf.gz differ diff --git a/src/adapters.rs b/src/adapters.rs index 39be834..8584a91 100644 --- a/src/adapters.rs +++ b/src/adapters.rs @@ -1,28 +1,31 @@ pub mod custom; pub mod decompress; -pub mod ffmpeg; +//pub mod ffmpeg; pub mod fns; -pub mod pdfpages; +//pub mod pdfpages; pub mod poppler; pub mod spawning; pub mod sqlite; -pub mod tar; -pub mod tesseract; -pub mod zip; +//pub mod tar; +//pub mod tesseract; +pub mod writing; +// pub mod zip; use crate::matching::*; use crate::preproc::PreprocConfig; use anyhow::*; use custom::builtin_spawning_adapters; use custom::CustomAdapterConfig; use log::*; -use regex::Regex; + use std::borrow::Cow; use std::collections::HashMap; use std::io::prelude::*; use std::iter::Iterator; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::rc::Rc; +pub type ReadBox = Box; + pub struct AdapterMeta { /// unique short name of this adapter (a-z0-9 only) pub name: String, @@ -63,22 +66,20 @@ pub trait FileAdapter: GetMetadata { /// adapt a file. /// /// detection_reason is the Matcher that was used to identify this file. Unless --rga-accurate was given, it is always a FastMatcher - fn adapt(&self, a: AdaptInfo, detection_reason: &SlowMatcher) -> Result<()>; + fn adapt(&self, a: AdaptInfo, detection_reason: &SlowMatcher) -> Result; } -pub struct AdaptInfo<'a> { +pub struct AdaptInfo { /// file path. May not be an actual file on the file system (e.g. in an archive). Used for matching file extensions. - pub filepath_hint: &'a Path, + pub filepath_hint: PathBuf, /// true if filepath_hint is an actual file on the file system pub is_real_file: bool, /// depth at which this file is in archives. 0 for real filesystem pub archive_recursion_depth: i32, /// stream to read the file from. can be from a file or from some decoder - pub inp: &'a mut dyn Read, - /// stream to write to. will be written to from a different thread - pub oup: &'a mut (dyn Write + Send), + pub inp: ReadBox, /// prefix every output line with this string to better indicate the file's location if it is in some archive - pub line_prefix: &'a str, - pub config: PreprocConfig<'a>, + pub line_prefix: String, + pub config: PreprocConfig, } /// (enabledAdapters, disabledAdapters) @@ -94,13 +95,13 @@ pub fn get_all_adapters(custom_adapters: Option>) -> Ad } let internal_adapters: Vec> = vec![ - Rc::new(ffmpeg::FFmpegAdapter::new()), - Rc::new(zip::ZipAdapter::new()), + //Rc::new(ffmpeg::FFmpegAdapter::new()), + //Rc::new(zip::ZipAdapter::new()), Rc::new(decompress::DecompressAdapter::new()), - Rc::new(tar::TarAdapter::new()), + // Rc::new(tar::TarAdapter::new()), Rc::new(sqlite::SqliteAdapter::new()), - Rc::new(pdfpages::PdfPagesAdapter::new()), - Rc::new(tesseract::TesseractAdapter::new()), + // Rc::new(pdfpages::PdfPagesAdapter::new()), + //Rc::new(tesseract::TesseractAdapter::new()), ]; adapters.extend( builtin_spawning_adapters diff --git a/src/adapters/custom.rs b/src/adapters/custom.rs index 7c46d96..71d7c6c 100644 --- a/src/adapters/custom.rs +++ b/src/adapters/custom.rs @@ -1,4 +1,7 @@ -use super::{spawning::SpawningFileAdapter, AdapterMeta, GetMetadata}; +use super::{ + spawning::{SpawningFileAdapter, SpawningFileAdapterTrait}, + AdapterMeta, GetMetadata, +}; use crate::matching::{FastMatcher, SlowMatcher}; use lazy_static::lazy_static; use schemars::JsonSchema; @@ -112,7 +115,7 @@ impl GetMetadata for CustomSpawningFileAdapter { &self.meta } } -impl SpawningFileAdapter for CustomSpawningFileAdapter { +impl SpawningFileAdapterTrait for CustomSpawningFileAdapter { fn get_exe(&self) -> &str { &self.binary } @@ -126,12 +129,12 @@ impl SpawningFileAdapter for CustomSpawningFileAdapter { } } impl CustomAdapterConfig { - pub fn to_adapter(self) -> CustomSpawningFileAdapter { - CustomSpawningFileAdapter { + pub fn to_adapter(&self) -> SpawningFileAdapter { + let ad = CustomSpawningFileAdapter { binary: self.binary.clone(), args: self.args.clone(), meta: AdapterMeta { - name: self.name, + name: self.name.clone(), version: self.version, description: format!( "{}\nRuns: {} {}", @@ -145,7 +148,7 @@ impl CustomAdapterConfig { .iter() .map(|s| FastMatcher::FileExtension(s.to_string())) .collect(), - slow_matchers: self.mimetypes.map(|mimetypes| { + slow_matchers: self.mimetypes.as_ref().map(|mimetypes| { mimetypes .iter() .map(|s| SlowMatcher::MimeType(s.to_string())) @@ -153,6 +156,43 @@ impl CustomAdapterConfig { }), disabled_by_default: self.disabled_by_default.unwrap_or(false), }, - } + }; + SpawningFileAdapter::new(Box::new(ad)) + } +} + +#[cfg(test)] +mod test { + use super::super::FileAdapter; + use super::*; + use crate::test_utils::*; + use anyhow::Result; + use std::fs::File; + + #[test] + fn poppler() -> Result<()> { + let adapter = builtin_spawning_adapters + .iter() + .find(|e| e.name == "poppler") + .expect("no poppler adapter"); + + let adapter = adapter.to_adapter(); + + let filepath = test_data_dir().join("short.pdf"); + + let (a, d) = simple_adapt_info(&filepath, Box::new(File::open(&filepath)?)); + let mut r = adapter.adapt(a, &d)?; + let mut o = Vec::new(); + r.read_to_end(&mut o)?; + assert_eq!( + String::from_utf8(o)?, + "hello world +this is just a test. + +1 + +\u{c}" + ); + Ok(()) } } diff --git a/src/adapters/decompress.rs b/src/adapters/decompress.rs index 2b2bf84..50399c8 100644 --- a/src/adapters/decompress.rs +++ b/src/adapters/decompress.rs @@ -1,6 +1,6 @@ use super::*; use crate::preproc::rga_preproc; -use anyhow::*; +use anyhow::Result; use lazy_static::lazy_static; use std::path::PathBuf; @@ -47,16 +47,13 @@ impl GetMetadata for DecompressAdapter { } } -fn decompress_any<'a, R>(reason: &SlowMatcher, inp: &'a mut R) -> Result> -where - R: Read, -{ +fn decompress_any(reason: &SlowMatcher, inp: ReadBox) -> Result { use FastMatcher::*; use SlowMatcher::*; - let gz = |inp: &'a mut R| Box::new(flate2::read::MultiGzDecoder::new(inp)); - let bz2 = |inp: &'a mut R| Box::new(bzip2::read::BzDecoder::new(inp)); - let xz = |inp: &'a mut R| Box::new(xz2::read::XzDecoder::new_multi_decoder(inp)); - let zst = |inp: &'a mut R| zstd::stream::read::Decoder::new(inp); // returns result + let gz = |inp: ReadBox| Box::new(flate2::read::MultiGzDecoder::new(inp)); + let bz2 = |inp: ReadBox| Box::new(bzip2::read::BzDecoder::new(inp)); + let xz = |inp: ReadBox| Box::new(xz2::read::XzDecoder::new_multi_decoder(inp)); + let zst = |inp: ReadBox| zstd::stream::read::Decoder::new(inp); // returns result Ok(match reason { Fast(FileExtension(ext)) => match ext.as_ref() { @@ -92,35 +89,33 @@ fn get_inner_filename(filename: &Path) -> PathBuf { } impl FileAdapter for DecompressAdapter { - fn adapt(&self, ai: AdaptInfo, detection_reason: &SlowMatcher) -> Result<()> { + fn adapt(&self, ai: AdaptInfo, detection_reason: &SlowMatcher) -> Result { let AdaptInfo { filepath_hint, - mut inp, - oup, + inp, line_prefix, archive_recursion_depth, config, .. } = ai; - let mut decompress = decompress_any(detection_reason, &mut inp)?; let ai2: AdaptInfo = AdaptInfo { - filepath_hint: &get_inner_filename(filepath_hint), + filepath_hint: get_inner_filename(&filepath_hint), is_real_file: false, archive_recursion_depth: archive_recursion_depth + 1, - inp: &mut decompress, - oup, + inp: decompress_any(detection_reason, inp)?, line_prefix, config: config.clone(), }; - rga_preproc(ai2)?; - Ok(()) + rga_preproc(ai2) } } #[cfg(test)] mod tests { use super::*; + use crate::test_utils::*; + use std::fs::File; #[test] fn test_inner_filename() { for (a, b) in &[ @@ -135,4 +130,40 @@ mod tests { assert_eq!(get_inner_filename(&PathBuf::from(a)), PathBuf::from(*b)); } } + + #[test] + fn gz() -> Result<()> { + let adapter = DecompressAdapter; + + let filepath = test_data_dir().join("hello.gz"); + + let (a, d) = simple_adapt_info(&filepath, Box::new(File::open(&filepath)?)); + let mut r = adapter.adapt(a, &d)?; + let mut o = Vec::new(); + r.read_to_end(&mut o)?; + assert_eq!(String::from_utf8(o)?, "hello\n"); + Ok(()) + } + + #[test] + fn pdf_gz() -> Result<()> { + let adapter = DecompressAdapter; + + let filepath = test_data_dir().join("short.pdf.gz"); + + let (a, d) = simple_adapt_info(&filepath, Box::new(File::open(&filepath)?)); + let mut r = adapter.adapt(a, &d)?; + let mut o = Vec::new(); + r.read_to_end(&mut o)?; + assert_eq!( + String::from_utf8(o)?, + "hello world +this is just a test. + +1 + +\u{c}" + ); + Ok(()) + } } diff --git a/src/adapters/fns.rs b/src/adapters/fns.rs index 1ef2ba3..4f6e9e8 100644 --- a/src/adapters/fns.rs +++ b/src/adapters/fns.rs @@ -92,7 +92,7 @@ where } } -pub fn postprocB(line_prefix: &str, inp: impl Read) -> Result { +pub fn postprocB(_line_prefix: &str, inp: impl Read) -> Result { let mut page_count = 1; Ok(ByteReplacer { diff --git a/src/adapters/poppler.rs b/src/adapters/poppler.rs index addc9df..1439eb8 100644 --- a/src/adapters/poppler.rs +++ b/src/adapters/poppler.rs @@ -1,8 +1,8 @@ -use super::*; -use lazy_static::lazy_static; -use spawning::SpawningFileAdapter; -use std::io::BufReader; -use std::process::Command; + + + + + /* static EXTENSIONS: &[&str] = &["pdf"]; diff --git a/src/adapters/spawning.rs b/src/adapters/spawning.rs index d111950..0281963 100644 --- a/src/adapters/spawning.rs +++ b/src/adapters/spawning.rs @@ -5,7 +5,7 @@ use log::*; use std::io::prelude::*; use std::io::BufReader; use std::process::Command; -use std::process::Stdio; +use std::process::{Child, Stdio}; /** * Copy a Read to a Write, while prefixing every line with a prefix. @@ -53,15 +53,37 @@ pub fn postproc_line_prefix( } Ok(()) } -pub trait SpawningFileAdapter: GetMetadata { +pub trait SpawningFileAdapterTrait: GetMetadata { fn get_exe(&self) -> &str; fn command(&self, filepath_hint: &Path, command: Command) -> Command; - fn postproc(line_prefix: &str, inp: &mut dyn Read, oup: &mut dyn Write) -> Result<()> { + /*fn postproc(&self, line_prefix: &str, inp: &mut dyn Read, oup: &mut dyn Write) -> Result<()> { postproc_line_prefix(line_prefix, inp, oup) + }*/ +} + +pub struct SpawningFileAdapter { + inner: Box, +} + +impl SpawningFileAdapter { + pub fn new(inner: Box) -> SpawningFileAdapter { + SpawningFileAdapter { inner } + } +} + +impl GetMetadata for SpawningFileAdapter { + fn metadata(&self) -> &AdapterMeta { + self.inner.metadata() } } +/*impl From for SpawningFileAdapter { + fn from(e: dyn T) -> Self { + SpawningFileAdapter { inner: Box::new(e) } + } +}*/ + /// replace a Command.spawn() error "File not found" with a more readable error /// to indicate some program is not installed pub fn map_exe_error(err: std::io::Error, exe_name: &str, help: &str) -> Error { @@ -71,63 +93,61 @@ pub fn map_exe_error(err: std::io::Error, exe_name: &str, help: &str) -> Error { _ => Error::from(err), } } + +struct ProcWaitReader { + proce: Child, +} +impl Read for ProcWaitReader { + fn read(&mut self, _buf: &mut [u8]) -> std::io::Result { + let status = self.proce.wait()?; + if status.success() { + Ok(0) + } else { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + format_err!("subprocess failed: {:?}", status), + )) + } + } +} pub fn pipe_output( - line_prefix: &str, + _line_prefix: &str, mut cmd: Command, inp: &mut (dyn Read), - oup: &mut (dyn Write + Send), exe_name: &str, help: &str, - cp: fn(line_prefix: &str, &mut dyn Read, &mut dyn Write) -> Result<()>, -) -> Result<()> { +) -> Result { let mut cmd = cmd .stdin(Stdio::piped()) .stdout(Stdio::piped()) .spawn() .map_err(|e| map_exe_error(e, exe_name, help))?; let mut stdi = cmd.stdin.take().expect("is piped"); - let mut stdo = cmd.stdout.take().expect("is piped"); + let stdo = cmd.stdout.take().expect("is piped"); // TODO: how to handle this copying better? // do we really need threads for this? - crossbeam::scope(|s| -> Result<()> { - s.spawn(|_| cp(line_prefix, &mut stdo, oup).unwrap()); // errors? + crossbeam::scope(|_s| -> Result<()> { std::io::copy(inp, &mut stdi)?; drop(stdi); // NEEDED! otherwise deadlock Ok(()) }) .unwrap()?; - let status = cmd.wait()?; - if status.success() { - Ok(()) - } else { - Err(format_err!("subprocess failed: {:?}", status)) - } + Ok(Box::new(stdo.chain(ProcWaitReader { proce: cmd }))) } -impl FileAdapter for T -where - T: SpawningFileAdapter, -{ - fn adapt(&self, ai: AdaptInfo, _detection_reason: &SlowMatcher) -> Result<()> { +impl FileAdapter for SpawningFileAdapter { + fn adapt(&self, ai: AdaptInfo, _detection_reason: &SlowMatcher) -> Result { let AdaptInfo { filepath_hint, mut inp, - oup, line_prefix, .. } = ai; - let cmd = Command::new(self.get_exe()); - let cmd = self.command(filepath_hint, cmd); + + let cmd = Command::new(self.inner.get_exe()); + let cmd = self.inner.command(&filepath_hint, cmd); debug!("executing {:?}", cmd); - pipe_output( - line_prefix, - cmd, - &mut inp, - oup, - self.get_exe(), - "", - Self::postproc, - ) + pipe_output(&line_prefix, cmd, &mut inp, self.inner.get_exe(), "") } } diff --git a/src/adapters/sqlite.rs b/src/adapters/sqlite.rs index 762a5fa..e6218af 100644 --- a/src/adapters/sqlite.rs +++ b/src/adapters/sqlite.rs @@ -5,6 +5,7 @@ use log::*; use rusqlite::types::ValueRef; use rusqlite::*; use std::convert::TryInto; +use writing::{WritingFileAdapter, WritingFileAdapterTrait}; static EXTENSIONS: &[&str] = &["db", "db3", "sqlite", "sqlite3"]; @@ -27,12 +28,12 @@ lazy_static! { }; } -#[derive(Default)] +#[derive(Default, Clone)] pub struct SqliteAdapter; impl SqliteAdapter { - pub fn new() -> SqliteAdapter { - SqliteAdapter + pub fn new() -> WritingFileAdapter { + WritingFileAdapter::new(Box::new(SqliteAdapter {})) } } impl GetMetadata for SqliteAdapter { @@ -58,12 +59,16 @@ fn format_blob(b: ValueRef) -> String { } } -impl FileAdapter for SqliteAdapter { - fn adapt(&self, ai: AdaptInfo, _detection_reason: &SlowMatcher) -> Result<()> { +impl WritingFileAdapterTrait for SqliteAdapter { + fn adapt_write( + &self, + ai: AdaptInfo, + _detection_reason: &SlowMatcher, + oup: &mut dyn Write, + ) -> Result<()> { let AdaptInfo { is_real_file, filepath_hint, - oup, line_prefix, .. } = ai; @@ -116,3 +121,29 @@ impl FileAdapter for SqliteAdapter { Ok(()) } } + +#[cfg(test)] +mod test { + use super::*; + use crate::{test_utils::*}; + use std::{fs::File}; + + #[test] + fn simple() -> Result<()> { + let adapter: Box = Box::new(SqliteAdapter::new()); + let fname = test_data_dir().join("hello.sqlite3"); + let rd = File::open(&fname)?; + let (a, d) = simple_adapt_info(&fname, Box::new(rd)); + let mut res = adapter.adapt(a, &d)?; + + let mut buf = Vec::new(); + res.read_to_end(&mut buf)?; + + assert_eq!( + String::from_utf8(buf)?, + "PREFIX:tbl: greeting='hello', from='sqlite database!'\nPREFIX:tbl2: x=123, y=456.789\n", + ); + + Ok(()) + } +} diff --git a/src/adapters/tesseract.rs b/src/adapters/tesseract.rs index 810feba..88ecd4c 100644 --- a/src/adapters/tesseract.rs +++ b/src/adapters/tesseract.rs @@ -1,6 +1,6 @@ use super::*; use lazy_static::lazy_static; -use spawning::SpawningFileAdapter; +use spawning::{SpawningFileAdapter, SpawningFileAdapterTrait}; use std::process::Command; static EXTENSIONS: &[&str] = &["jpg", "png"]; @@ -33,7 +33,7 @@ impl GetMetadata for TesseractAdapter { &METADATA } } -impl SpawningFileAdapter for TesseractAdapter { +impl SpawningFileAdapterTrait for TesseractAdapter { fn get_exe(&self) -> &str { "tesseract" } diff --git a/src/adapters/writing.rs b/src/adapters/writing.rs new file mode 100644 index 0000000..d0c95da --- /dev/null +++ b/src/adapters/writing.rs @@ -0,0 +1,51 @@ +use super::{FileAdapter, GetMetadata, ReadBox}; +use anyhow::Result; +use std::io::Write; + +#[dyn_clonable::clonable] +pub trait WritingFileAdapterTrait: GetMetadata + Send + Clone { + fn adapt_write( + &self, + a: super::AdaptInfo, + detection_reason: &crate::matching::SlowMatcher, + oup: &mut dyn Write, + ) -> Result<()>; +} + +pub struct WritingFileAdapter { + inner: Box, +} +impl WritingFileAdapter { + pub fn new(inner: Box) -> WritingFileAdapter { + WritingFileAdapter { inner } + } +} + +impl GetMetadata for WritingFileAdapter { + fn metadata(&self) -> &super::AdapterMeta { + self.inner.metadata() + } +} + +impl FileAdapter for WritingFileAdapter { + fn adapt( + &self, + a: super::AdaptInfo, + detection_reason: &crate::matching::SlowMatcher, + ) -> anyhow::Result { + let (r, w) = crate::pipe::pipe(); + let cc = self.inner.clone(); + let detc = detection_reason.clone(); + std::thread::spawn(move || { + let mut oup = w; + let ai = a; + let res = cc.adapt_write(ai, &detc, &mut oup); + if let Err(e) = res { + oup.write_err(std::io::Error::new(std::io::ErrorKind::Other, e)) + .expect("could not write err"); + } + }); + + Ok(Box::new(r)) + } +} diff --git a/src/args.rs b/src/args.rs index 73b1258..ea8a661 100644 --- a/src/args.rs +++ b/src/args.rs @@ -85,7 +85,7 @@ impl FromStr for CacheMaxBlobLen { /// /// 1. describing the command line arguments using structopt+clap and for man page / readme generation /// 2. describing the config file format (output as JSON schema via schemars) -#[derive(StructOpt, Debug, Deserialize, Serialize, JsonSchema, Default)] +#[derive(StructOpt, Debug, Deserialize, Serialize, JsonSchema, Default, Clone)] #[structopt( name = "ripgrep-all", rename_all = "kebab-case", diff --git a/src/bin/rga-preproc.rs b/src/bin/rga-preproc.rs index 10ff557..53a4ace 100644 --- a/src/bin/rga-preproc.rs +++ b/src/bin/rga-preproc.rs @@ -16,7 +16,7 @@ fn main() -> anyhow::Result<()> { std::env::current_dir()?.join(&filepath) }; - let mut i = File::open(&path)?; + let i = File::open(&path)?; let mut o = std::io::stdout(); let cache = if args.no_cache { None @@ -24,14 +24,14 @@ fn main() -> anyhow::Result<()> { Some(rga::preproc_cache::open().context("could not open cache")?) }; let ai = AdaptInfo { - inp: &mut i, - filepath_hint: &path, + inp: Box::new(i), + filepath_hint: path, is_real_file: true, - oup: &mut o, - line_prefix: "", + line_prefix: "".to_string(), archive_recursion_depth: 0, - config: PreprocConfig { cache, args: &args }, + config: PreprocConfig { cache, args }, }; - rga_preproc(ai)?; + let mut oup = rga_preproc(ai)?; + std::io::copy(&mut oup, &mut o).context("copying adapter output to stdout")?; Ok(()) } diff --git a/src/lib.rs b/src/lib.rs index 9aca539..685e382 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,11 +1,16 @@ #![warn(clippy::all)] +#![feature(negative_impls)] +#![feature(specialization)] pub mod adapters; pub mod args; mod caching_writer; pub mod matching; +pub mod pipe; pub mod preproc; pub mod preproc_cache; +#[cfg(test)] +pub mod test_utils; use anyhow::Context; use anyhow::Result; pub use caching_writer::CachingWriter; diff --git a/src/matching.rs b/src/matching.rs index 9256943..90707d4 100644 --- a/src/matching.rs +++ b/src/matching.rs @@ -33,6 +33,12 @@ pub enum SlowMatcher { MimeType(String), } +impl From for SlowMatcher { + fn from(t: FastMatcher) -> Self { + SlowMatcher::Fast(t) + } +} + pub struct FileMeta { // filename is not actually a utf8 string, but since we can't do regex on OsStr and can't get a &[u8] from OsStr either, // and since we probably only want to do only matching on ascii stuff anyways, this is the filename as a string with non-valid bytes removed diff --git a/src/pipe.rs b/src/pipe.rs new file mode 100644 index 0000000..127524d --- /dev/null +++ b/src/pipe.rs @@ -0,0 +1,196 @@ +// https://github.com/arcnmx/pipe-rs/blob/master/src/lib.rs +// extended to support sending io errors + +#![deny(missing_docs)] +#![doc(html_root_url = "https://docs.rs/pipe/0.3.0")] +#![cfg_attr(feature = "unstable-doc-cfg", feature(doc_cfg))] + +//! Synchronous in-memory pipe +//! +//! ## Example +//! +//! ``` +//! use std::thread::spawn; +//! use std::io::{Read, Write}; +//! +//! let (mut read, mut write) = ripgrep_all::pipe::pipe(); +//! +//! let message = "Hello, world!"; +//! spawn(move || write.write_all(message.as_bytes()).unwrap()); +//! +//! let mut s = String::new(); +//! read.read_to_string(&mut s).unwrap(); +//! +//! assert_eq!(&s, message); +//! ``` + +use crossbeam_channel::{Receiver, Sender}; +use std::cmp::min; +use std::io::{self, BufRead, Read, Result, Write}; + +/// The `Read` end of a pipe (see `pipe()`) +pub struct PipeReader { + receiver: Receiver>>, + buffer: Vec, + position: usize, +} + +/// The `Write` end of a pipe (see `pipe()`) +#[derive(Clone)] +pub struct PipeWriter { + sender: Sender>>, +} + +/// Creates a synchronous memory pipe +pub fn pipe() -> (PipeReader, PipeWriter) { + let (sender, receiver) = crossbeam_channel::bounded(0); + + ( + PipeReader { + receiver, + buffer: Vec::new(), + position: 0, + }, + PipeWriter { sender }, + ) +} + +impl PipeWriter { + /// Extracts the inner `SyncSender` from the writer + pub fn into_inner(self) -> Sender>> { + self.sender + } + + /// Write any error into the pipe, will be handled as an IO error + pub fn write_err(&self, e: std::io::Error) -> Result<()> { + self.sender + .send(Err(e)) + .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "pipe reader has been dropped")) + } +} + +impl PipeReader { + /// Extracts the inner `Receiver` from the writer, and any pending buffered data + pub fn into_inner(mut self) -> (Receiver>>, Vec) { + self.buffer.drain(..self.position); + (self.receiver, self.buffer) + } +} + +impl BufRead for PipeReader { + fn fill_buf(&mut self) -> io::Result<&[u8]> { + while self.position >= self.buffer.len() { + match self.receiver.recv() { + // The only existing error is EOF + Err(_) => break, + Ok(Err(e)) => Err(e)?, + Ok(Ok(data)) => { + self.buffer = data; + self.position = 0; + } + } + } + + Ok(&self.buffer[self.position..]) + } + + fn consume(&mut self, amt: usize) { + debug_assert!(self.buffer.len() - self.position >= amt); + self.position += amt + } +} + +impl Read for PipeReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + if buf.is_empty() { + return Ok(0); + } + + let internal = self.fill_buf()?; + + let len = min(buf.len(), internal.len()); + if len > 0 { + buf[..len].copy_from_slice(&internal[..len]); + self.consume(len); + } + Ok(len) + } +} + +impl Write for PipeWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + let data = buf.to_vec(); + + self.sender + .send(Ok(data)) + .map(|_| buf.len()) + .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "pipe reader has been dropped")) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::{Read, Write}; + use std::thread::spawn; + + #[test] + fn pipe_reader() { + let i = b"hello there"; + let mut o = Vec::with_capacity(i.len()); + let (mut r, mut w) = pipe(); + let guard = spawn(move || { + w.write_all(&i[..5]).unwrap(); + w.write_all(&i[5..]).unwrap(); + drop(w); + }); + + r.read_to_end(&mut o).unwrap(); + assert_eq!(i, &o[..]); + + guard.join().unwrap(); + } + + #[test] + fn pipe_writer_fail() { + let i = b"hi"; + let (r, mut w) = pipe(); + let guard = spawn(move || { + drop(r); + }); + + assert!(w.write_all(i).is_err()); + + guard.join().unwrap(); + } + + #[test] + fn small_reads() { + let block_cnt = 20; + const BLOCK: usize = 20; + let (mut r, mut w) = pipe(); + let guard = spawn(move || { + for _ in 0..block_cnt { + let data = &[0; BLOCK]; + w.write_all(data).unwrap(); + } + }); + + let mut buff = [0; BLOCK / 2]; + let mut read = 0; + while let Ok(size) = r.read(&mut buff) { + // 0 means EOF + if size == 0 { + break; + } + read += size; + } + assert_eq!(block_cnt * BLOCK, read); + + guard.join().unwrap(); + } +} diff --git a/src/preproc.rs b/src/preproc.rs index 943569b..e45a63a 100644 --- a/src/preproc.rs +++ b/src/preproc.rs @@ -4,20 +4,20 @@ use crate::matching::*; use crate::{print_bytes, print_dur, CachingWriter}; use anyhow::*; use log::*; -use path_clean::PathClean; -use std::convert::TryInto; -use std::io::BufRead; + + + use std::io::BufReader; -use std::io::BufWriter; + use std::{ sync::{Arc, RwLock}, time::Instant, }; #[derive(Clone)] -pub struct PreprocConfig<'a> { +pub struct PreprocConfig { pub cache: Option>>, - pub args: &'a RgaConfig, + pub args: RgaConfig, } /** * preprocess a file as defined in `ai`. @@ -25,19 +25,18 @@ pub struct PreprocConfig<'a> { * If a cache is passed, read/write to it. * */ -pub fn rga_preproc(ai: AdaptInfo) -> Result<()> { +pub fn rga_preproc(ai: AdaptInfo) -> Result { let AdaptInfo { filepath_hint, is_real_file, inp, - oup, line_prefix, config, archive_recursion_depth, .. } = ai; debug!("path (hint) to preprocess: {:?}", filepath_hint); - let PreprocConfig { mut cache, args } = config; + let PreprocConfig { cache: _, args } = config; let filtered_adapters = get_adapters_filtered(args.custom_adapters.clone(), &args.adapters)?; let adapters = adapter_matcher(&filtered_adapters, args.accurate)?; let filename = filepath_hint @@ -45,22 +44,22 @@ pub fn rga_preproc(ai: AdaptInfo) -> Result<()> { .ok_or_else(|| format_err!("Empty filename"))?; debug!("Archive recursion depth: {}", archive_recursion_depth); if archive_recursion_depth >= args.max_archive_recursion.0 { - writeln!(oup, "{}[rga: max archive recursion reached]", line_prefix)?; - return Ok(()); + let s = format!("{}[rga: max archive recursion reached]", line_prefix).into_bytes(); + return Ok(Box::new(std::io::Cursor::new(s))); } // todo: figure out when using a bufreader is a good idea and when it is not // seems to be good for File::open() reads, but not sure about within archives (tar, zip) - let inp = &mut BufReader::with_capacity(1 << 13, inp); + let inp = BufReader::with_capacity(1 << 16, inp); - let mimetype = if args.accurate { - let buf = inp.fill_buf()?; // fill but do not consume! - let mimetype = tree_magic::from_u8(buf); - debug!("mimetype: {:?}", mimetype); - Some(mimetype) - } else { - None - }; + let mimetype = None; /*if args.accurate { + let buf = inp.fill_buf()?; // fill but do not consume! + let mimetype = tree_magic::from_u8(buf); + debug!("mimetype: {:?}", mimetype); + Some(mimetype) + } else { + None + };*/ let adapter = adapters(FileMeta { mimetype, lossy_filename: filename.to_string_lossy().to_string(), @@ -77,8 +76,8 @@ pub fn rga_preproc(ai: AdaptInfo) -> Result<()> { filepath_hint.to_string_lossy(), &meta.name ); - let db_name = format!("{}.v{}", meta.name, meta.version); - if let Some(cache) = cache.as_mut() { + let _db_name = format!("{}.v{}", meta.name, meta.version); + /*if let Some(cache) = cache.as_mut() { let cache_key: Vec = { let clean_path = filepath_hint.to_owned().clean(); let meta = std::fs::metadata(&filepath_hint)?; @@ -160,45 +159,43 @@ pub fn rga_preproc(ai: AdaptInfo) -> Result<()> { }), )?; Ok(()) - } else { - // no cache arc - probably within archive - debug!("adapting without caching..."); - let start = Instant::now(); - adapter - .adapt( - AdaptInfo { - line_prefix, - filepath_hint, - is_real_file, - inp, - oup, - archive_recursion_depth, - config: PreprocConfig { cache: None, args }, - }, - &detection_reason, + } else { */ + // no cache arc - probably within archive + debug!("adapting without caching..."); + let start = Instant::now(); + let oread = adapter + .adapt( + AdaptInfo { + line_prefix, + filepath_hint: filepath_hint.clone(), + is_real_file, + inp: Box::new(inp), + archive_recursion_depth, + config: PreprocConfig { cache: None, args }, + }, + &detection_reason, + ) + .with_context(|| { + format!( + "adapting {} via {} without caching failed", + filepath_hint.to_string_lossy(), + meta.name ) - .with_context(|| { - format!( - "adapting {} via {} without caching failed", - filepath_hint.to_string_lossy(), - meta.name - ) - })?; - debug!( - "running adapter {} took {}", - adapter.metadata().name, - print_dur(start) - ); - Ok(()) - } + })?; + debug!( + "running adapter {} took {}", + adapter.metadata().name, + print_dur(start) + ); + Ok(oread) + /* }*/ } None => { // allow passthrough if the file is in an archive or accurate matching is enabled // otherwise it should have been filtered out by rg pre-glob since rg can handle those better than us let allow_cat = !is_real_file || args.accurate; if allow_cat { - spawning::postproc_line_prefix(line_prefix, inp, oup)?; - Ok(()) + Ok(Box::new(inp)) } else { Err(format_err!( "No adapter found for file {:?}, passthrough disabled.", diff --git a/src/preproc_cache.rs b/src/preproc_cache.rs index 27915c6..b2cb13a 100644 --- a/src/preproc_cache.rs +++ b/src/preproc_cache.rs @@ -10,7 +10,7 @@ use std::{ pub fn open() -> Result>> { Ok(Arc::new(RwLock::new(LmdbCache::open()?))) } -pub trait PreprocCache { +pub trait PreprocCache: Send + Sync { // possible without second lambda? fn get_or_run<'a>( &mut self, diff --git a/src/test_utils.rs b/src/test_utils.rs new file mode 100644 index 0000000..09f03b8 --- /dev/null +++ b/src/test_utils.rs @@ -0,0 +1,33 @@ +use crate::{ + adapters::{AdaptInfo, ReadBox}, + args::RgaConfig, + matching::{FastMatcher, SlowMatcher}, + preproc::PreprocConfig, +}; +use std::{ + path::{Path, PathBuf}, +}; + +pub fn test_data_dir() -> PathBuf { + let mut d = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + d.push("exampledir/test/"); + d +} + +pub fn simple_adapt_info(filepath: &Path, inp: ReadBox) -> (AdaptInfo, SlowMatcher) { + ( + AdaptInfo { + filepath_hint: filepath.to_owned(), + is_real_file: true, + archive_recursion_depth: 0, + inp, + line_prefix: "PREFIX:".to_string(), + config: PreprocConfig { + cache: None, + args: RgaConfig::default(), + }, + }, + FastMatcher::FileExtension(filepath.extension().unwrap().to_string_lossy().into_owned()) + .into(), + ) +}