partial move to async

pull/149/head
phiresky 2 years ago
parent 94f06fba37
commit cde0e209d2

272
Cargo.lock generated

@ -70,6 +70,41 @@ version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4c527152e37cf757a3f78aae5a06fbeefdb07ccc535c980a3208ee3060dd544"
[[package]]
name = "async-compression"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "942c7cd7ae39e91bde4820d74132e9862e62c2f386c3aa90ccf55949f5bad63a"
dependencies = [
"futures-core",
"memchr",
"pin-project-lite",
"tokio",
"zstd",
"zstd-safe",
]
[[package]]
name = "async-stream"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e"
dependencies = [
"async-stream-impl",
"futures-core",
]
[[package]]
name = "async-stream-impl"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "atty"
version = "0.2.14"
@ -135,6 +170,12 @@ version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db"
[[package]]
name = "bzip2"
version = "0.4.3"
@ -598,6 +639,55 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "futures-core"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac"
[[package]]
name = "futures-io"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb"
[[package]]
name = "futures-macro"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9"
[[package]]
name = "futures-task"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea"
[[package]]
name = "futures-util"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6"
dependencies = [
"futures-core",
"futures-macro",
"futures-task",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]]
name = "generic-array"
version = "0.12.4"
@ -845,6 +935,16 @@ dependencies = [
"pkg-config",
]
[[package]]
name = "lock_api"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.17"
@ -895,6 +995,18 @@ dependencies = [
"adler",
]
[[package]]
name = "mio"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5d732bc30207a6423068df043e3d02e0735b155ad7ce1a6f76fe2baa5b158de"
dependencies = [
"libc",
"log",
"wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys 0.42.0",
]
[[package]]
name = "nom"
version = "7.1.1"
@ -969,6 +1081,16 @@ dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "num_threads"
version = "0.1.6"
@ -1014,6 +1136,29 @@ dependencies = [
"stable_deref_trait",
]
[[package]]
name = "parking_lot"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dc9e0dc2adc1c69d09143aff38d3d30c5c3f0df0dad82e6d25547af174ebec0"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-sys 0.42.0",
]
[[package]]
name = "password-hash"
version = "0.4.2"
@ -1065,6 +1210,18 @@ dependencies = [
"indexmap",
]
[[package]]
name = "pin-project-lite"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkg-config"
version = "0.3.26"
@ -1180,7 +1337,10 @@ name = "ripgrep_all"
version = "0.9.7-alpha.0"
dependencies = [
"anyhow",
"async-compression",
"async-stream",
"bincode",
"bytes",
"bzip2",
"chrono",
"clap 4.0.18",
@ -1213,6 +1373,9 @@ dependencies = [
"structopt",
"tar",
"tempfile",
"tokio",
"tokio-stream",
"tokio-util",
"tree_magic_mini",
"xz2",
"zip",
@ -1392,6 +1555,15 @@ dependencies = [
"digest",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0"
dependencies = [
"libc",
]
[[package]]
name = "size_format"
version = "1.0.2"
@ -1402,12 +1574,31 @@ dependencies = [
"num",
]
[[package]]
name = "slab"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef"
dependencies = [
"autocfg",
]
[[package]]
name = "smallvec"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
[[package]]
name = "socket2"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "02e2d2db9033d13a1567121ddd7a095ee144db4e1ca1b1bda3419bc0da294ebd"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
@ -1595,6 +1786,87 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.21.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9e03c497dc955702ba729190dc4aac6f2a0ce97f913e5b1b5912fc5039d9099"
dependencies = [
"autocfg",
"bytes",
"libc",
"memchr",
"mio",
"num_cpus",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"winapi",
]
[[package]]
name = "tokio-macros"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tokio-stream"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]
name = "tokio-util"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740"
dependencies = [
"bytes",
"futures-core",
"futures-io",
"futures-sink",
"futures-util",
"hashbrown",
"pin-project-lite",
"slab",
"tokio",
"tracing",
]
[[package]]
name = "tracing"
version = "0.1.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
dependencies = [
"cfg-if",
"pin-project-lite",
"tracing-core",
]
[[package]]
name = "tracing-core"
version = "0.1.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a"
dependencies = [
"once_cell",
]
[[package]]
name = "tree_magic_mini"
version = "3.0.3"

@ -53,6 +53,12 @@ dyn-clone = "1.0.2"
dyn-clonable = "0.9.0"
zip = "0.6.3"
owning_ref = "0.4.1"
tokio = { version = "1.21.2", features = ["full"] }
async-compression = { version = "0.3.15", features = ["tokio", "zstd"] }
tokio-stream = { version = "0.1.11", features = ["io-util", "tokio-util"] }
async-stream = "0.3.3"
bytes = "1.2.1"
tokio-util = { version = "0.7.4", features = ["io", "full"] }
[dev-dependencies]
ctor = "0.1.20"

@ -1,28 +1,29 @@
pub mod custom;
// pub mod decompress;
// pub mod ffmpeg;
pub mod postproc;
// pub mod postproc;
// pub mod pdfpages;
pub mod spawning;
// pub mod sqlite;
// pub mod tar;
// pub mod tesseract;
// pub mod writing;
pub mod zip;
// pub mod zip;
use crate::{adapted_iter::AdaptedFilesIterBox, config::RgaConfig, matching::*};
use anyhow::*;
use custom::builtin_spawning_adapters;
use custom::CustomAdapterConfig;
use log::*;
use tokio::io::AsyncRead;
use std::borrow::Cow;
use std::collections::HashMap;
use std::io::prelude::*;
use std::iter::Iterator;
use std::path::PathBuf;
use std::pin::Pin;
use std::rc::Rc;
pub type ReadBox<'a> = Box<dyn Read + 'a>;
pub type ReadBox<'a> = Pin<Box<dyn AsyncRead + 'a>>;
pub struct AdapterMeta {
/// unique short name of this adapter (a-z0-9 only)
pub name: String,
@ -115,7 +116,7 @@ pub fn get_all_adapters(custom_adapters: Option<Vec<CustomAdapterConfig>>) -> Ad
let internal_adapters: Vec<Rc<dyn FileAdapter>> = vec![
//Rc::new(ffmpeg::FFmpegAdapter::new()),
Rc::new(zip::ZipAdapter::new()),
// Rc::new(zip::ZipAdapter::new()),
//Rc::new(decompress::DecompressAdapter::new()),
// Rc::new(tar::TarAdapter::new()),
//Rc::new(sqlite::SqliteAdapter::new()),

@ -166,8 +166,8 @@ impl SpawningFileAdapterTrait for CustomSpawningFileAdapter {
fn command(
&self,
filepath_hint: &std::path::Path,
mut command: std::process::Command,
) -> Result<std::process::Command> {
mut command: tokio::process::Command,
) -> Result<tokio::process::Command> {
command.args(
self.args
.iter()
@ -218,10 +218,10 @@ mod test {
use super::*;
use crate::test_utils::*;
use anyhow::Result;
use std::fs::File;
use tokio::fs::File;
#[test]
fn poppler() -> Result<()> {
#[tokio::test]
async fn poppler() -> Result<()> {
let adapter = builtin_spawning_adapters
.iter()
.find(|e| e.name == "poppler")
@ -231,7 +231,7 @@ mod test {
let filepath = test_data_dir().join("short.pdf");
let (a, d) = simple_adapt_info(&filepath, Box::new(File::open(&filepath)?));
let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?));
let r = adapter.adapt(a, &d)?;
let o = adapted_to_vec(r)?;
assert_eq!(

@ -5,10 +5,10 @@
use anyhow::Context;
use anyhow::Result;
use encoding_rs_io::DecodeReaderBytesBuilder;
use tokio::io::AsyncRead;
use std::{
cmp::min,
io::{BufRead, BufReader, Read},
};
use crate::adapted_iter::{AdaptedFilesIterBox, SingleAdaptedFileAsIter};
@ -16,11 +16,11 @@ use crate::adapted_iter::{AdaptedFilesIterBox, SingleAdaptedFileAsIter};
use super::{AdaptInfo, AdapterMeta, FileAdapter, GetMetadata};
/** pass through, except adding \n at the end */
pub struct EnsureEndsWithNewline<R: Read> {
pub struct EnsureEndsWithNewline<R: AsyncRead> {
inner: R,
added_newline: bool,
}
impl<R: Read> EnsureEndsWithNewline<R> {
impl<R: AsyncRead> EnsureEndsWithNewline<R> {
pub fn new(r: R) -> EnsureEndsWithNewline<R> {
EnsureEndsWithNewline {
inner: r,
@ -28,8 +28,8 @@ impl<R: Read> EnsureEndsWithNewline<R> {
}
}
}
impl<R: Read> Read for EnsureEndsWithNewline<R> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
impl<R: AsyncRead> AsyncRead for EnsureEndsWithNewline<R> {
fn poll_read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
match self.inner.read(buf) {
Ok(0) => {
if self.added_newline {
@ -47,7 +47,7 @@ impl<R: Read> Read for EnsureEndsWithNewline<R> {
}
struct ByteReplacer<R>
where
R: Read,
R:AsyncRead,
{
inner: R,
next_read: Vec<u8>,
@ -57,7 +57,7 @@ where
impl<R> ByteReplacer<R>
where
R: Read,
R: AsyncRead,
{
fn output_next(&mut self, buf: &mut [u8], buf_valid_until: usize, replacement: &[u8]) -> usize {
let after_part1 = Vec::from(&buf[1..buf_valid_until]);
@ -80,11 +80,11 @@ where
}
}
impl<R> Read for ByteReplacer<R>
impl<R> AsyncRead for ByteReplacer<R>
where
R: Read,
R: AsyncRead,
{
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
fn poll_read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let read = if self.next_read.len() > 0 {
let count = std::cmp::min(self.next_read.len(), buf.len());
buf[0..count].copy_from_slice(&self.next_read[0..count]);
@ -158,10 +158,10 @@ impl Read for ReadErr {
}
}*/
pub fn postproc_encoding<'a, R: Read + 'a>(
pub fn postproc_encoding<'a, R: AsyncRead + 'a>(
line_prefix: &str,
inp: R,
) -> Result<Box<dyn Read + 'a>> {
) -> Result<Box<dyn AsyncRead + 'a>> {
// TODO: parse these options from ripgrep's configuration
let encoding = None; // detect bom but usually assume utf8
let bom_sniffing = true;
@ -202,7 +202,7 @@ pub fn postproc_encoding<'a, R: Read + 'a>(
))
}
pub fn postproc_prefix(line_prefix: &str, inp: impl Read) -> impl Read {
pub fn postproc_prefix(line_prefix: &str, inp: impl AsyncRead) -> impl AsyncRead {
let line_prefix = line_prefix.to_string(); // clone since we need it later
ByteReplacer {
inner: inp,
@ -212,7 +212,7 @@ pub fn postproc_prefix(line_prefix: &str, inp: impl Read) -> impl Read {
}
}
pub fn postproc_pagebreaks(line_prefix: &str, inp: impl Read) -> impl Read {
pub fn postproc_pagebreaks(line_prefix: &str, inp: impl AsyncRead) -> impl AsyncRead {
let line_prefix = line_prefix.to_string(); // clone since
let mut page_count = 1;

@ -1,14 +1,17 @@
use crate::adapted_iter::SingleAdaptedFileAsIter;
use super::*;
use anyhow::Result;
use log::*;
use std::process::Command;
use std::process::{Child, Stdio};
use std::{io::prelude::*, path::Path};
use crate::adapters::FileAdapter;
use std::future::Future;
use std::path::Path;
use std::process::{ExitStatus, Stdio};
use std::task::Poll;
use tokio::io::AsyncReadExt;
use tokio::process::Child;
use tokio::process::Command;
// TODO: don't separate the trait and the struct
pub trait SpawningFileAdapterTrait: GetMetadata {
@ -50,11 +53,29 @@ pub fn map_exe_error(err: std::io::Error, exe_name: &str, help: &str) -> Error {
/** waits for a process to finish, returns an io error if the process failed */
struct ProcWaitReader {
proce: Child,
proce: Pin<Box<dyn Future<Output = std::io::Result<ExitStatus>>>>,
}
impl Read for ProcWaitReader {
fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
let status = self.proce.wait()?;
impl AsyncRead for ProcWaitReader {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
match self.proce.as_mut().poll(cx) {
std::task::Poll::Ready(x) => {
let x = x?;
if x.success() {
Poll::Ready(std::io::Result::Ok(()))
} else {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
format_err!("subprocess failed: {:?}", x),
)))
}
}
Poll::Pending => std::task::Poll::Pending,
}
/*let status = self.proce.wait();
if status.success() {
std::io::Result::Ok(0)
} else {
@ -62,13 +83,13 @@ impl Read for ProcWaitReader {
std::io::ErrorKind::Other,
format_err!("subprocess failed: {:?}", status),
))
}
}*/
}
}
pub fn pipe_output<'a>(
_line_prefix: &str,
mut cmd: Command,
inp: &mut (dyn Read + 'a),
inp: ReadBox<'a>,
exe_name: &str,
help: &str,
) -> Result<ReadBox<'a>> {
@ -81,10 +102,14 @@ pub fn pipe_output<'a>(
let stdo = cmd.stdout.take().expect("is piped");
// TODO: deadlocks since this is run in the same thread as the thing reading from stdout of the process
std::io::copy(inp, &mut stdi)?;
drop(stdi);
Ok(Box::new(stdo.chain(ProcWaitReader { proce: cmd })))
tokio::task::spawn_local(async {
tokio::pin!(inp);
tokio::io::copy(&mut inp, &mut stdi).await;
});
Ok(Box::pin(stdo.chain(ProcWaitReader {
proce: Box::pin(cmd.wait()),
})))
}
impl FileAdapter for SpawningFileAdapter {
@ -109,7 +134,7 @@ impl FileAdapter for SpawningFileAdapter {
.command(&filepath_hint, cmd)
.with_context(|| format!("Could not set cmd arguments for {}", self.inner.get_exe()))?;
debug!("executing {:?}", cmd);
let output = pipe_output(&line_prefix, cmd, &mut inp, self.inner.get_exe(), "")?;
let output = pipe_output(&line_prefix, cmd, inp, self.inner.get_exe(), "")?;
Ok(Box::new(SingleAdaptedFileAsIter::new(AdaptInfo {
filepath_hint: PathBuf::from(format!("{}.txt", filepath_hint.to_string_lossy())), // TODO: customizable
inp: output,
@ -122,14 +147,16 @@ impl FileAdapter for SpawningFileAdapter {
}
}
#[cfg(test)]
mod test {
use std::io::Cursor;
use crate::{adapters::custom::CustomAdapterConfig, test_utils::{adapted_to_vec, simple_adapt_info}};
use super::*;
use crate::adapters::FileAdapter;
use crate::{
adapters::custom::CustomAdapterConfig,
test_utils::{adapted_to_vec, simple_adapt_info},
};
#[test]
fn streaming() {
@ -143,7 +170,7 @@ mod test {
mimetypes: None,
match_only_by_mime: None,
binary: "sed".to_string(),
args: vec!["s/e/u/g".to_string()]
args: vec!["s/e/u/g".to_string()],
};
let adapter = adapter.to_adapter();
@ -159,10 +186,13 @@ mod test {
let input = format!("{0}{0}{0}{0}", input);
let input = format!("{0}{0}{0}{0}", input);
let input = format!("{0}{0}{0}{0}", input);
let (a, d) = simple_adapt_info(&Path::new("foo.txt"), Box::new(Cursor::new(input.as_bytes())));
let (a, d) = simple_adapt_info(
&Path::new("foo.txt"),
Box::new(Cursor::new(input.as_bytes())),
);
let output = adapter.adapt(a, &d).unwrap();
let oup = adapted_to_vec(output).unwrap();
println!("output: {}", String::from_utf8_lossy(&oup));
}
}
}

@ -85,7 +85,7 @@ impl<'a> AdaptedFilesIter for ZipAdaptIter<'a> {
#[cfg(test)]
mod test {
use super::*;
use crate::{recurse::RecursingConcattyReader, test_utils::*};
use crate::test_utils::*;
fn create_zip(fname: &str, content: &str, add_inner: bool) -> Result<Vec<u8>> {
use ::zip::write::FileOptions;

@ -1,20 +1,24 @@
use std::{pin::Pin, task::Poll};
use anyhow::Result;
use log::*;
use std::io::{Read, Write};
use tokio::{io::{AsyncRead, AsyncWriteExt, AsyncWrite}, pin};
use async_compression::tokio::write::ZstdEncoder;
/**
* wrap a writer so that it is passthrough,
* but also the written data is compressed and written into a buffer,
* unless more than max_cache_size bytes is written, then the cache is dropped and it is pure passthrough.
*/
pub struct CachingReader<R: Read> {
pub struct CachingReader<R: AsyncRead> {
max_cache_size: usize,
zstd_writer: Option<zstd::stream::write::Encoder<'static, Vec<u8>>>,
inp: R,
// set to none if the size goes over the limit
zstd_writer: Option<ZstdEncoder<Vec<u8>>>,
inp: Pin<Box<R>>,
bytes_written: u64,
on_finish: Box<dyn FnOnce((u64, Option<Vec<u8>>)) -> Result<()> + Send>,
}
impl<R: Read> CachingReader<R> {
impl<R: AsyncRead> CachingReader<R> {
pub fn new(
inp: R,
max_cache_size: usize,
@ -22,56 +26,78 @@ impl<R: Read> CachingReader<R> {
on_finish: Box<dyn FnOnce((u64, Option<Vec<u8>>)) -> Result<()> + Send>,
) -> Result<CachingReader<R>> {
Ok(CachingReader {
inp,
inp: Box::pin(inp),
max_cache_size,
zstd_writer: Some(zstd::stream::write::Encoder::new(
zstd_writer: Some(ZstdEncoder::with_quality(
Vec::new(),
compression_level,
)?),
async_compression::Level::Precise(compression_level as u32),
)),
bytes_written: 0,
on_finish,
})
}
pub fn finish(&mut self) -> std::io::Result<(u64, Option<Vec<u8>>)> {
pub fn finish(&mut self, cx: &mut std::task::Context<'_>) -> std::io::Result<(u64, Option<Vec<u8>>)> {
if let Some(writer) = self.zstd_writer.take() {
let res = writer.finish()?;
pin!(writer);
writer.as_mut().poll_shutdown(cx)?;
let res = writer.into_inner();
if res.len() <= self.max_cache_size {
return Ok((self.bytes_written, Some(res)));
}
}
Ok((self.bytes_written, None))
}
fn write_to_compressed(&mut self, buf: &[u8]) -> std::io::Result<()> {
async fn write_to_compressed(&mut self, buf: &[u8]) -> std::io::Result<()> {
if let Some(writer) = self.zstd_writer.as_mut() {
let wrote = writer.write(buf)?;
let wrote = writer.write_all(buf).await?;
let compressed_len = writer.get_ref().len();
trace!("wrote {} to zstd, len now {}", wrote, compressed_len);
trace!("wrote {} to zstd, len now {}", buf.len(), compressed_len);
if compressed_len > self.max_cache_size {
debug!("cache longer than max, dropping");
//writer.finish();
self.zstd_writer.take().unwrap().finish()?;
self.zstd_writer.take();
}
}
Ok(())
}
}
impl<R: Read> Read for CachingReader<R> {
fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
match self.inp.read(&mut buf) {
Ok(0) => {
// move out of box, replace with noop lambda
let on_finish = std::mem::replace(&mut self.on_finish, Box::new(|_| Ok(())));
// EOF, finish!
(on_finish)(self.finish()?)
.map(|()| 0)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
impl<R> AsyncRead for CachingReader<R> where R: AsyncRead {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
let old_filled = buf.filled();
match self.inp.as_mut().poll_read(cx, &mut buf) {
/*Ok(0) => {
}
Ok(read_bytes) => {
self.write_to_compressed(&buf[0..read_bytes])?;
self.bytes_written += read_bytes as u64;
Ok(read_bytes)
}
Err(e) => Err(e),
}*/
Poll::Ready(rdy) => {
if let Ok(()) = &rdy {
let slice = buf.filled();
let read_bytes = slice.len() - old_filled.len();
if read_bytes == 0 {
// EOF
// move out of box, replace with noop lambda
let on_finish = std::mem::replace(&mut self.on_finish, Box::new(|_| Ok(())));
// EOF, finish!
(on_finish)(self.finish(cx)?)
.map(|()| 0)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
} else {
self.write_to_compressed(&slice[old_filled.len()..]);
self.bytes_written += read_bytes as u64;
}
}
Poll::Ready(rdy)
},
Poll::Pending => Poll::Pending,
}
}
}

@ -1,4 +1,6 @@
use crate::adapters::*;
use crate::config::RgaConfig;
use crate::recurse::concat_read_streams;
use crate::{matching::*, recurse::RecursingConcattyReader};
use crate::{
preproc_cache::{LmdbCache, PreprocCache},
@ -7,47 +9,32 @@ use crate::{
use anyhow::*;
use log::*;
use path_clean::PathClean;
use postproc::PostprocPrefix;
// use postproc::PostprocPrefix;
use std::convert::TryInto;
use std::io::{BufRead, BufReader};
use std::path::Path;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::io::{AsyncBufRead, AsyncRead};
use std::{rc::Rc, time::Instant};
/**
* preprocess a file as defined in `ai`.
*
* If a cache is passed, read/write to it.
*
*/
pub fn rga_preproc(ai: AdaptInfo) -> Result<ReadBox> {
let AdaptInfo {
filepath_hint,
is_real_file,
inp,
line_prefix,
config,
archive_recursion_depth,
postprocess,
} = ai;
debug!("path (hint) to preprocess: {:?}", filepath_hint);
let filtered_adapters =
get_adapters_filtered(config.custom_adapters.clone(), &config.adapters)?;
let adapters = adapter_matcher(&filtered_adapters, config.accurate)?;
type ActiveAdapters = Vec<(Rc<dyn FileAdapter>)>;
async fn choose_adapter(
config: &RgaConfig,
filepath_hint: &Path,
archive_recursion_depth: i32,
mut inp: &mut (impl AsyncBufRead + Unpin),
) -> Result<Option<(Rc<dyn FileAdapter>, FileMatcher, ActiveAdapters)>> {
let active_adapters = get_adapters_filtered(config.custom_adapters.clone(), &config.adapters)?;
let adapters = adapter_matcher(&active_adapters, config.accurate)?;
let filename = filepath_hint
.file_name()
.ok_or_else(|| format_err!("Empty filename"))?;
debug!("Archive recursion depth: {}", archive_recursion_depth);
if archive_recursion_depth >= config.max_archive_recursion.0 {
let s = format!("{}[rga: max archive recursion reached]", line_prefix).into_bytes();
return Ok(Box::new(std::io::Cursor::new(s)));
}
// todo: figure out when using a bufreader is a good idea and when it is not
// seems to be good for File::open() reads, but not sure about within archives (tar, zip)
let mut inp = BufReader::with_capacity(1 << 16, inp);
let mimetype = if config.accurate {
let buf = inp.fill_buf()?; // fill but do not consume!
let buf = inp.fill_buf().await?; // fill but do not consume!
let mimetype = tree_magic::from_u8(buf);
debug!("mimetype: {:?}", mimetype);
Some(mimetype)
@ -58,52 +45,105 @@ pub fn rga_preproc(ai: AdaptInfo) -> Result<ReadBox> {
mimetype,
lossy_filename: filename.to_string_lossy().to_string(),
});
let (adapter, detection_reason) = match adapter {
Some((a, d)) => (a, d),
Ok(adapter.map(|e| (e.0, e.1, active_adapters)))
}
/**
* preprocess a file as defined in `ai`.
*
* If a cache is passed, read/write to it.
*
*/
pub async fn rga_preproc(ai: AdaptInfo<'_>) -> Result<ReadBox<'_>> {
debug!("path (hint) to preprocess: {:?}", ai.filepath_hint);
/*todo: move if archive_recursion_depth >= config.max_archive_recursion.0 {
let s = format!("{}[rga: max archive recursion reached]", line_prefix).into_bytes();
return Ok(Box::new(std::io::Cursor::new(s)));
}*/
// todo: figure out when using a bufreader is a good idea and when it is not
// seems to be good for File::open() reads, but not sure about within archives (tar, zip)
let mut inp = BufReader::with_capacity(1 << 16, ai.inp);
let adapter = choose_adapter(
&ai.config,
&ai.filepath_hint,
ai.archive_recursion_depth,
&mut inp,
)
.await?;
let (adapter, detection_reason, active_adapters) = match adapter {
Some((a, d, e)) => (a, d, e),
None => {
// allow passthrough if the file is in an archive or accurate matching is enabled
// otherwise it should have been filtered out by rg pre-glob since rg can handle those better than us
let allow_cat = !is_real_file || config.accurate;
let allow_cat = !ai.is_real_file || ai.config.accurate;
if allow_cat {
if postprocess {
(
if ai.postprocess {
panic!("not implemented");
/* (
Rc::new(PostprocPrefix {}) as Rc<dyn FileAdapter>,
FileMatcher::Fast(FastFileMatcher::FileExtension("default".to_string())), // todo: separate enum value for this
)
)*/
} else {
return Ok(Box::new(inp));
return Ok(Box::pin(inp));
}
} else {
return Err(format_err!(
"No adapter found for file {:?}, passthrough disabled.",
filename
ai.filepath_hint
.file_name()
.ok_or_else(|| format_err!("Empty filename"))?
));
}
}
};
let path_hint_copy = filepath_hint.clone();
run_adapter(
let path_hint_copy = ai.filepath_hint.clone();
run_adapter_recursively(
AdaptInfo {
filepath_hint,
is_real_file,
inp: Box::new(inp),
line_prefix,
config,
archive_recursion_depth,
postprocess,
inp: Box::pin(inp),
..ai
},
adapter,
detection_reason,
&filtered_adapters,
active_adapters,
)
.await
.with_context(|| format!("run_adapter({})", &path_hint_copy.to_string_lossy()))
}
fn run_adapter<'a>(
fn compute_cache_key(
filepath_hint: &Path,
adapter: &dyn FileAdapter,
active_adapters: ActiveAdapters,
) -> Result<Vec<u8>> {
let clean_path = filepath_hint.to_owned().clean();
let meta = std::fs::metadata(&filepath_hint)
.with_context(|| format!("reading metadata for {}", filepath_hint.to_string_lossy()))?;
let modified = meta.modified().expect("weird OS that can't into mtime");
if adapter.metadata().recurses {
let active_adapters_cache_key = active_adapters
.iter()
.map(|a| (a.metadata().name.clone(), a.metadata().version))
.collect::<Vec<_>>();
let key = (active_adapters_cache_key, clean_path, modified);
debug!("Cache key (with recursion): {:?}", key);
bincode::serialize(&key).context("could not serialize path")
} else {
let key = (
adapter.metadata().name.clone(),
adapter.metadata().version,
clean_path,
modified,
);
debug!("Cache key (no recursion): {:?}", key);
bincode::serialize(&key).context("could not serialize path")
}
}
async fn run_adapter_recursively<'a>(
ai: AdaptInfo<'a>,
adapter: Rc<dyn FileAdapter>,
detection_reason: FileMatcher,
filtered_adapters: &Vec<Rc<dyn FileAdapter>>,
active_adapters: ActiveAdapters,
) -> Result<ReadBox<'a>> {
let AdaptInfo {
filepath_hint,
@ -134,116 +174,56 @@ fn run_adapter<'a>(
None
};
if let Some(mut cache) = cache {
let cache_key: Vec<u8> = {
let clean_path = filepath_hint.to_owned().clean();
let meta = std::fs::metadata(&filepath_hint).with_context(|| {
format!("reading metadata for {}", filepath_hint.to_string_lossy())
})?;
let modified = meta.modified().expect("weird OS that can't into mtime");
if adapter.metadata().recurses {
let key = (
filtered_adapters
.iter()
.map(|a| (a.metadata().name.clone(), a.metadata().version))
.collect::<Vec<_>>(),
clean_path,
modified,
);
debug!("Cache key (with recursion): {:?}", key);
bincode::serialize(&key).expect("could not serialize path")
} else {
let key = (
adapter.metadata().name.clone(),
adapter.metadata().version,
clean_path,
modified,
);
debug!("Cache key (no recursion): {:?}", key);
bincode::serialize(&key).expect("could not serialize path")
}
};
// let dbg_ctx = format!("adapter {}", &adapter.metadata().name);
let cached = cache.get(&db_name, &cache_key)?;
match cached {
Some(cached) => Ok(Box::new(
zstd::stream::read::Decoder::new(std::io::Cursor::new(cached))
.context("could not create zstd decoder")?,
)),
None => {
debug!("cache MISS, running adapter");
debug!("adapting with caching...");
let inp = adapter
.adapt(
AdaptInfo {
line_prefix,
filepath_hint: filepath_hint.clone(),
is_real_file,
inp: Box::new(inp),
archive_recursion_depth,
config,
postprocess,
},
&detection_reason,
let mut cache = cache.context("No cache?")?;
let cache_key: Vec<u8> = compute_cache_key(&filepath_hint, adapter.as_ref(), active_adapters)?;
// let dbg_ctx = format!("adapter {}", &adapter.metadata().name);
let cached = cache.get(&db_name, &cache_key)?;
match cached {
Some(cached) => Ok(Box::pin(
async_compression::tokio::bufread::ZstdDecoder::new(std::io::Cursor::new(cached)),
)),
None => {
debug!("cache MISS, running adapter");
debug!("adapting with caching...");
let inp = adapter
.adapt(
AdaptInfo {
line_prefix,
filepath_hint: filepath_hint.clone(),
is_real_file,
inp,
archive_recursion_depth,
config,
postprocess,
},
&detection_reason,
)
.with_context(|| {
format!(
"adapting {} via {} failed",
filepath_hint.to_string_lossy(),
meta.name
)
.with_context(|| {
format!(
"adapting {} via {} failed",
filepath_hint.to_string_lossy(),
meta.name
)
})?;
let inp = RecursingConcattyReader::concat(inp)?;
let inp = CachingReader::new(
inp,
cache_max_blob_len.0.try_into().unwrap(),
cache_compression_level.0.try_into().unwrap(),
Box::new(move |(uncompressed_size, compressed)| {
debug!(
"uncompressed output: {}",
print_bytes(uncompressed_size as f64)
);
if let Some(cached) = compressed {
debug!("compressed output: {}", print_bytes(cached.len() as f64));
cache.set(&db_name, &cache_key, &cached)?
}
Ok(())
}),
)?;
})?;
let inp = concat_read_streams(inp);
let inp = CachingReader::new(
inp,
cache_max_blob_len.0.try_into().unwrap(),
cache_compression_level.0.try_into().unwrap(),
Box::new(move |(uncompressed_size, compressed)| {
debug!(
"uncompressed output: {}",
print_bytes(uncompressed_size as f64)
);
if let Some(cached) = compressed {
debug!("compressed output: {}", print_bytes(cached.len() as f64));
cache.set(&db_name, &cache_key, &cached)?
}
Ok(())
}),
)?;
Ok(Box::new(inp))
}
Ok(Box::pin(inp))
}
} else {
// no cache arc - probably within archive
debug!("adapting without caching...");
let start = Instant::now();
let oread = adapter
.adapt(
AdaptInfo {
line_prefix,
filepath_hint: filepath_hint.clone(),
is_real_file,
inp,
archive_recursion_depth,
config,
postprocess,
},
&detection_reason,
)
.with_context(|| {
format!(
"adapting {} via {} without caching failed",
filepath_hint.to_string_lossy(),
meta.name
)
})?;
debug!(
"running adapter {} took {}",
adapter.metadata().name,
print_dur(start)
);
Ok(RecursingConcattyReader::concat(oread)?)
}
}

@ -1,14 +1,33 @@
use tokio::io::AsyncRead;
use tokio_util::io::{ReaderStream, StreamReader};
use crate::preproc::rga_preproc;
use crate::{adapted_iter::AdaptedFilesIterBox, adapters::*};
use std::io::Read;
use async_stream::stream;
use tokio_stream::Stream;
use bytes::Bytes;
use tokio_stream::StreamExt;
pub struct RecursingConcattyReader<'a> {
inp: AdaptedFilesIterBox<'a>,
cur: Option<ReadBox<'a>>,
}
pub fn concat_read_streams(
mut input: AdaptedFilesIterBox<'_>,
) -> ReadBox<'_> {
let s = stream! {
while let Some(output) = input.next() {
let mut stream = ReaderStream::new(output.inp);
while let Some(bytes) = stream.next().await {
yield bytes;
}
}
};
Box::pin(StreamReader::new(s))
}
/*
impl<'a> RecursingConcattyReader<'a> {
pub fn concat(inp: AdaptedFilesIterBox<'a>) -> anyhow::Result<Box<dyn Read + 'a>> {
pub fn concat(inp: AdaptedFilesIterBox<'a>) -> anyhow::Result<ReadBox<'a>> {
let mut r = RecursingConcattyReader { inp, cur: None };
r.ascend()?;
Ok(Box::new(r))
@ -28,7 +47,7 @@ impl<'a> RecursingConcattyReader<'a> {
Ok(())
}
}
impl<'a> Read for RecursingConcattyReader<'a> {
impl<'a> AsyncRead for RecursingConcattyReader<'a> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
match &mut self.cur {
None => Ok(0), // last file ended
@ -45,3 +64,4 @@ impl<'a> Read for RecursingConcattyReader<'a> {
}
}
}
*/
Loading…
Cancel
Save