diff --git a/Cargo.lock b/Cargo.lock index 1b0893d..18a5c9b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -40,6 +40,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "ansi_term" version = "0.12.1" @@ -67,10 +82,18 @@ version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "942c7cd7ae39e91bde4820d74132e9862e62c2f386c3aa90ccf55949f5bad63a" dependencies = [ + "brotli", + "bytes 0.5.6", + "bzip2", + "flate2", "futures-core", + "futures-io", "memchr", - "pin-project-lite", - "tokio", + "pin-project-lite 0.2.9", + "tokio 0.2.25", + "tokio 0.3.7", + "tokio 1.23.0", + "xz2", "zstd", "zstd-safe", ] @@ -143,6 +166,27 @@ dependencies = [ "generic-array 0.14.6", ] +[[package]] +name = "brotli" +version = "3.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1a0b1dbcc8ae29329621f8d4f0d835787c1c38bb1401979b49d13b0b305ff68" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ad2d4653bf5ca36ae797b1f4bb4dbddb60ce49ca4aed8a2ce4829f60425b80" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bytecount" version = "0.6.3" @@ -155,6 +199,12 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +[[package]] +name = "bytes" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" + [[package]] name = "bytes" version = "1.3.0" @@ -599,7 +649,7 @@ dependencies = [ "futures-core", "futures-macro", "futures-task", - "pin-project-lite", + "pin-project-lite 0.2.9", "pin-utils", "slab", ] @@ -1100,6 +1150,12 @@ dependencies = [ "indexmap", ] +[[package]] +name = "pin-project-lite" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "257b64915a082f7811703966789728173279bdebb956b143dbcd23f6f970a777" + [[package]] name = "pin-project-lite" version = "0.2.9" @@ -1242,8 +1298,7 @@ dependencies = [ "async-compression", "async-stream", "bincode", - "bytes", - "bzip2", + "bytes 1.3.0", "clap 4.0.32", "crossbeam", "crossbeam-channel", @@ -1255,7 +1310,6 @@ dependencies = [ "encoding_rs", "encoding_rs_io", "env_logger", - "flate2", "glob", "lazy_static", "log", @@ -1274,14 +1328,12 @@ dependencies = [ "structopt", "tar", "tempfile", - "tokio", + "tokio 1.23.0", "tokio-stream", "tokio-test", "tokio-util", "tree_magic_mini", - "xz2", "zip", - "zstd", ] [[package]] @@ -1663,6 +1715,26 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" +[[package]] +name = "tokio" +version = "0.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6703a273949a90131b290be1fe7b039d0fc884aa1935860dfcbe056f28cd8092" +dependencies = [ + "bytes 0.5.6", + "pin-project-lite 0.1.12", +] + +[[package]] +name = "tokio" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46409491c9375a693ce7032101970a54f8a2010efb77e13f70788f0d84489e39" +dependencies = [ + "autocfg", + "pin-project-lite 0.2.9", +] + [[package]] name = "tokio" version = "1.23.0" @@ -1670,13 +1742,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eab6d665857cc6ca78d6e80303a02cea7a7851e85dfbd77cbdc09bd129f1ef46" dependencies = [ "autocfg", - "bytes", + "bytes 1.3.0", "libc", "memchr", "mio", "num_cpus", "parking_lot", - "pin-project-lite", + "pin-project-lite 0.2.9", "signal-hook-registry", "socket2", "tokio-macros", @@ -1701,8 +1773,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce" dependencies = [ "futures-core", - "pin-project-lite", - "tokio", + "pin-project-lite 0.2.9", + "tokio 1.23.0", "tokio-util", ] @@ -1713,9 +1785,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53474327ae5e166530d17f2d956afcb4f8a004de581b3cae10f12006bc8163e3" dependencies = [ "async-stream", - "bytes", + "bytes 1.3.0", "futures-core", - "tokio", + "tokio 1.23.0", "tokio-stream", ] @@ -1725,15 +1797,15 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740" dependencies = [ - "bytes", + "bytes 1.3.0", "futures-core", "futures-io", "futures-sink", "futures-util", "hashbrown", - "pin-project-lite", + "pin-project-lite 0.2.9", "slab", - "tokio", + "tokio 1.23.0", "tracing", ] @@ -1744,7 +1816,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if", - "pin-project-lite", + "pin-project-lite 0.2.9", "tracing-core", ] diff --git a/Cargo.toml b/Cargo.toml index bcff273..101f7f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,11 +17,10 @@ version = "0.9.7-alpha.0" [dependencies] anyhow = "1.0.32" -async-compression = {version = "0.3.15", features = ["tokio", "zstd"]} +async-compression = {version = "0.3.15", features = ["all", "all-algorithms", "tokio"]} async-stream = "0.3.3" bincode = "1.3.1" bytes = "1.2.1" -bzip2 = "0.4.1" clap = {version = "4.0.18", features = ["wrap_help"]} crossbeam = "0.8.1" crossbeam-channel = "0.5.1" @@ -32,7 +31,6 @@ dyn-clone = "1.0.2" encoding_rs = "0.8.24" encoding_rs_io = "0.1.7" env_logger = "0.9.0" -flate2 = "1.0.14" glob = "0.3.0" lazy_static = "1.4.0" log = "0.4.11" @@ -54,9 +52,7 @@ tokio = {version = "1.21.2", features = ["full"]} tokio-stream = {version = "0.1.11", features = ["io-util", "tokio-util"]} tokio-util = {version = "0.7.4", features = ["io", "full"]} tree_magic = {package = "tree_magic_mini", version = "3.0.0"} -xz2 = "0.1.6" zip = "0.6.3" -zstd = "0.11.2" [dev-dependencies] ctor = "0.1.20" diff --git a/src/adapters.rs b/src/adapters.rs index bc5c3ee..ab29664 100644 --- a/src/adapters.rs +++ b/src/adapters.rs @@ -1,5 +1,5 @@ pub mod custom; -// pub mod decompress; +pub mod decompress; // pub mod ffmpeg; pub mod postproc; // pub mod pdfpages; @@ -117,7 +117,7 @@ pub fn get_all_adapters(custom_adapters: Option>) -> Ad Arc::new(PostprocPageBreaks::default()), //Rc::new(ffmpeg::FFmpegAdapter::new()), // Rc::new(zip::ZipAdapter::new()), - //Rc::new(decompress::DecompressAdapter::new()), + Arc::new(decompress::DecompressAdapter::new()), // Rc::new(tar::TarAdapter::new()), //Rc::new(sqlite::SqliteAdapter::new()), // Rc::new(pdfpages::PdfPagesAdapter::new()), diff --git a/src/adapters/decompress.rs b/src/adapters/decompress.rs index e9dcc1e..0ba70fa 100644 --- a/src/adapters/decompress.rs +++ b/src/adapters/decompress.rs @@ -1,9 +1,10 @@ use super::*; -use crate::preproc::rga_preproc; + use anyhow::Result; use lazy_static::lazy_static; +use tokio::io::BufReader; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; static EXTENSIONS: &[&str] = &["tgz", "tbz", "tbz2", "gz", "bz2", "xz", "zst"]; static MIME_TYPES: &[&str] = &[ @@ -49,26 +50,27 @@ impl GetMetadata for DecompressAdapter { } fn decompress_any(reason: &FileMatcher, inp: ReadBox) -> Result { + use async_compression::tokio::bufread; use FastFileMatcher::*; use FileMatcher::*; - let gz = |inp: ReadBox| Box::new(flate2::read::MultiGzDecoder::new(inp)); - let bz2 = |inp: ReadBox| Box::new(bzip2::read::BzDecoder::new(inp)); - let xz = |inp: ReadBox| Box::new(xz2::read::XzDecoder::new_multi_decoder(inp)); - let zst = |inp: ReadBox| zstd::stream::read::Decoder::new(inp); // returns result + let gz = |inp: ReadBox| Box::pin(bufread::GzipDecoder::new(BufReader::new(inp))); + let bz2 = |inp: ReadBox| Box::pin(bufread::BzDecoder::new(BufReader::new(inp))); + let xz = |inp: ReadBox| Box::pin(bufread::XzDecoder::new(BufReader::new(inp))); + let zst = |inp: ReadBox| Box::pin(bufread::ZstdDecoder::new(BufReader::new(inp))); Ok(match reason { Fast(FileExtension(ext)) => match ext.as_ref() { "tgz" | "gz" => gz(inp), "tbz" | "tbz2" | "bz2" => bz2(inp), "xz" => xz(inp), - "zst" => Box::new(zst(inp)?), + "zst" => zst(inp), ext => Err(format_err!("don't know how to decompress {}", ext))?, }, MimeType(mime) => match mime.as_ref() { "application/gzip" => gz(inp), "application/x-bzip" => bz2(inp), "application/x-xz" => xz(inp), - "application/zstd" => Box::new(zst(inp)?), + "application/zstd" => zst(inp), mime => Err(format_err!("don't know how to decompress mime {}", mime))?, }, }) @@ -76,13 +78,13 @@ fn decompress_any(reason: &FileMatcher, inp: ReadBox) -> Result { fn get_inner_filename(filename: &Path) -> PathBuf { let extension = filename .extension() - .map(|e| e.to_string_lossy().to_owned()) + .map(|e| e.to_string_lossy()) .unwrap_or(Cow::Borrowed("")); let stem = filename .file_stem() .expect("no filename given?") .to_string_lossy(); - let new_extension = match extension.to_owned().as_ref() { + let new_extension = match extension.as_ref() { "tgz" | "tbz" | "tbz2" => ".tar", _other => "", }; @@ -90,33 +92,27 @@ fn get_inner_filename(filename: &Path) -> PathBuf { } impl FileAdapter for DecompressAdapter { - fn adapt(&self, ai: AdaptInfo, detection_reason: &FileMatcher) -> Result { - let AdaptInfo { - filepath_hint, - inp, - line_prefix, - archive_recursion_depth, - config, - .. - } = ai; - - let ai2: AdaptInfo = AdaptInfo { - filepath_hint: get_inner_filename(&filepath_hint), + fn adapt(&self, ai: AdaptInfo, detection_reason: &FileMatcher) -> Result { + Ok(Box::pin(tokio_stream::once(AdaptInfo { + filepath_hint: get_inner_filename(&ai.filepath_hint), is_real_file: false, - archive_recursion_depth: archive_recursion_depth + 1, - inp: decompress_any(detection_reason, inp)?, - line_prefix, - config: config.clone(), - }; - rga_preproc(ai2) + archive_recursion_depth: ai.archive_recursion_depth + 1, + inp: decompress_any(detection_reason, ai.inp)?, + line_prefix: ai.line_prefix, + config: ai.config.clone(), + postprocess: ai.postprocess, + }))) } } #[cfg(test)] mod tests { use super::*; + use crate::preproc::loop_adapt; use crate::test_utils::*; - use std::fs::File; + use pretty_assertions::assert_eq; + use tokio::fs::File; + #[test] fn test_inner_filename() { for (a, b) in &[ @@ -132,38 +128,38 @@ mod tests { } } - #[test] - fn gz() -> Result<()> { + #[tokio::test] + async fn gz() -> Result<()> { let adapter = DecompressAdapter; let filepath = test_data_dir().join("hello.gz"); - let (a, d) = simple_adapt_info(&filepath, Box::new(File::open(&filepath)?)); - let mut r = adapter.adapt(a, &d)?; - let mut o = Vec::new(); - r.read_to_end(&mut o)?; + let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?)); + let r = adapter.adapt(a, &d)?; + let o = adapted_to_vec(r).await?; assert_eq!(String::from_utf8(o)?, "hello\n"); Ok(()) } - #[test] - fn pdf_gz() -> Result<()> { + #[tokio::test] + async fn pdf_gz() -> Result<()> { let adapter = DecompressAdapter; let filepath = test_data_dir().join("short.pdf.gz"); - let (a, d) = simple_adapt_info(&filepath, Box::new(File::open(&filepath)?)); - let mut r = adapter.adapt(a, &d)?; - let mut o = Vec::new(); - r.read_to_end(&mut o)?; + let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?)); + let r = loop_adapt(&adapter, d, a)?; + let o = adapted_to_vec(r).await?; assert_eq!( String::from_utf8(o)?, - "hello world -this is just a test. - -1 - -\u{c}" + "PREFIX:Page 1:hello world +PREFIX:Page 1:this is just a test. +PREFIX:Page 1: +PREFIX:Page 1:1 +PREFIX:Page 1: +PREFIX:Page 1: +PREFIX:Page 2: +" ); Ok(()) }