partial migration to read->read

pull/71/head
phiresky 4 years ago
parent d0d74adfe9
commit 2f580b135a

24
Cargo.lock generated

@ -288,6 +288,27 @@ dependencies = [
"winapi",
]
[[package]]
name = "dyn-clonable"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e9232f0e607a262ceb9bd5141a3dfb3e4db6994b31989bbfd845878cba59fd4"
dependencies = [
"dyn-clonable-impl",
"dyn-clone",
]
[[package]]
name = "dyn-clonable-impl"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "558e40ea573c374cf53507fd240b7ee2f5477df7cfebdb97323ec61c719399c5"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "dyn-clone"
version = "1.0.1"
@ -950,8 +971,11 @@ dependencies = [
"chrono",
"clap",
"crossbeam",
"crossbeam-channel",
"derive_more",
"directories-next",
"dyn-clonable",
"dyn-clone",
"encoding_rs",
"encoding_rs_io",
"env_logger",

@ -49,3 +49,6 @@ directories-next = "1.0.1"
derive_more = "0.99.7"
pretty-bytes = "0.2.2"
memchr = "2.3.3"
crossbeam-channel = "0.4.2"
dyn-clone = "1.0.1"
dyn-clonable = "0.9.0"

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

@ -1,28 +1,31 @@
pub mod custom;
pub mod decompress;
pub mod ffmpeg;
//pub mod ffmpeg;
pub mod fns;
pub mod pdfpages;
//pub mod pdfpages;
pub mod poppler;
pub mod spawning;
pub mod sqlite;
pub mod tar;
pub mod tesseract;
pub mod zip;
//pub mod tar;
//pub mod tesseract;
pub mod writing;
// pub mod zip;
use crate::matching::*;
use crate::preproc::PreprocConfig;
use anyhow::*;
use custom::builtin_spawning_adapters;
use custom::CustomAdapterConfig;
use log::*;
use regex::Regex;
use std::borrow::Cow;
use std::collections::HashMap;
use std::io::prelude::*;
use std::iter::Iterator;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::rc::Rc;
pub type ReadBox = Box<dyn Read + Send>;
pub struct AdapterMeta {
/// unique short name of this adapter (a-z0-9 only)
pub name: String,
@ -63,22 +66,20 @@ pub trait FileAdapter: GetMetadata {
/// adapt a file.
///
/// detection_reason is the Matcher that was used to identify this file. Unless --rga-accurate was given, it is always a FastMatcher
fn adapt(&self, a: AdaptInfo, detection_reason: &SlowMatcher) -> Result<()>;
fn adapt(&self, a: AdaptInfo, detection_reason: &SlowMatcher) -> Result<ReadBox>;
}
pub struct AdaptInfo<'a> {
pub struct AdaptInfo {
/// file path. May not be an actual file on the file system (e.g. in an archive). Used for matching file extensions.
pub filepath_hint: &'a Path,
pub filepath_hint: PathBuf,
/// true if filepath_hint is an actual file on the file system
pub is_real_file: bool,
/// depth at which this file is in archives. 0 for real filesystem
pub archive_recursion_depth: i32,
/// stream to read the file from. can be from a file or from some decoder
pub inp: &'a mut dyn Read,
/// stream to write to. will be written to from a different thread
pub oup: &'a mut (dyn Write + Send),
pub inp: ReadBox,
/// prefix every output line with this string to better indicate the file's location if it is in some archive
pub line_prefix: &'a str,
pub config: PreprocConfig<'a>,
pub line_prefix: String,
pub config: PreprocConfig,
}
/// (enabledAdapters, disabledAdapters)
@ -94,13 +95,13 @@ pub fn get_all_adapters(custom_adapters: Option<Vec<CustomAdapterConfig>>) -> Ad
}
let internal_adapters: Vec<Rc<dyn FileAdapter>> = vec![
Rc::new(ffmpeg::FFmpegAdapter::new()),
Rc::new(zip::ZipAdapter::new()),
//Rc::new(ffmpeg::FFmpegAdapter::new()),
//Rc::new(zip::ZipAdapter::new()),
Rc::new(decompress::DecompressAdapter::new()),
Rc::new(tar::TarAdapter::new()),
// Rc::new(tar::TarAdapter::new()),
Rc::new(sqlite::SqliteAdapter::new()),
Rc::new(pdfpages::PdfPagesAdapter::new()),
Rc::new(tesseract::TesseractAdapter::new()),
// Rc::new(pdfpages::PdfPagesAdapter::new()),
//Rc::new(tesseract::TesseractAdapter::new()),
];
adapters.extend(
builtin_spawning_adapters

@ -1,4 +1,7 @@
use super::{spawning::SpawningFileAdapter, AdapterMeta, GetMetadata};
use super::{
spawning::{SpawningFileAdapter, SpawningFileAdapterTrait},
AdapterMeta, GetMetadata,
};
use crate::matching::{FastMatcher, SlowMatcher};
use lazy_static::lazy_static;
use schemars::JsonSchema;
@ -112,7 +115,7 @@ impl GetMetadata for CustomSpawningFileAdapter {
&self.meta
}
}
impl SpawningFileAdapter for CustomSpawningFileAdapter {
impl SpawningFileAdapterTrait for CustomSpawningFileAdapter {
fn get_exe(&self) -> &str {
&self.binary
}
@ -126,12 +129,12 @@ impl SpawningFileAdapter for CustomSpawningFileAdapter {
}
}
impl CustomAdapterConfig {
pub fn to_adapter(self) -> CustomSpawningFileAdapter {
CustomSpawningFileAdapter {
pub fn to_adapter(&self) -> SpawningFileAdapter {
let ad = CustomSpawningFileAdapter {
binary: self.binary.clone(),
args: self.args.clone(),
meta: AdapterMeta {
name: self.name,
name: self.name.clone(),
version: self.version,
description: format!(
"{}\nRuns: {} {}",
@ -145,7 +148,7 @@ impl CustomAdapterConfig {
.iter()
.map(|s| FastMatcher::FileExtension(s.to_string()))
.collect(),
slow_matchers: self.mimetypes.map(|mimetypes| {
slow_matchers: self.mimetypes.as_ref().map(|mimetypes| {
mimetypes
.iter()
.map(|s| SlowMatcher::MimeType(s.to_string()))
@ -153,6 +156,43 @@ impl CustomAdapterConfig {
}),
disabled_by_default: self.disabled_by_default.unwrap_or(false),
},
}
};
SpawningFileAdapter::new(Box::new(ad))
}
}
#[cfg(test)]
mod test {
use super::super::FileAdapter;
use super::*;
use crate::test_utils::*;
use anyhow::Result;
use std::fs::File;
#[test]
fn poppler() -> Result<()> {
let adapter = builtin_spawning_adapters
.iter()
.find(|e| e.name == "poppler")
.expect("no poppler adapter");
let adapter = adapter.to_adapter();
let filepath = test_data_dir().join("short.pdf");
let (a, d) = simple_adapt_info(&filepath, Box::new(File::open(&filepath)?));
let mut r = adapter.adapt(a, &d)?;
let mut o = Vec::new();
r.read_to_end(&mut o)?;
assert_eq!(
String::from_utf8(o)?,
"hello world
this is just a test.
1
\u{c}"
);
Ok(())
}
}

@ -1,6 +1,6 @@
use super::*;
use crate::preproc::rga_preproc;
use anyhow::*;
use anyhow::Result;
use lazy_static::lazy_static;
use std::path::PathBuf;
@ -47,16 +47,13 @@ impl GetMetadata for DecompressAdapter {
}
}
fn decompress_any<'a, R>(reason: &SlowMatcher, inp: &'a mut R) -> Result<Box<dyn Read + 'a>>
where
R: Read,
{
fn decompress_any(reason: &SlowMatcher, inp: ReadBox) -> Result<ReadBox> {
use FastMatcher::*;
use SlowMatcher::*;
let gz = |inp: &'a mut R| Box::new(flate2::read::MultiGzDecoder::new(inp));
let bz2 = |inp: &'a mut R| Box::new(bzip2::read::BzDecoder::new(inp));
let xz = |inp: &'a mut R| Box::new(xz2::read::XzDecoder::new_multi_decoder(inp));
let zst = |inp: &'a mut R| zstd::stream::read::Decoder::new(inp); // returns result
let gz = |inp: ReadBox| Box::new(flate2::read::MultiGzDecoder::new(inp));
let bz2 = |inp: ReadBox| Box::new(bzip2::read::BzDecoder::new(inp));
let xz = |inp: ReadBox| Box::new(xz2::read::XzDecoder::new_multi_decoder(inp));
let zst = |inp: ReadBox| zstd::stream::read::Decoder::new(inp); // returns result
Ok(match reason {
Fast(FileExtension(ext)) => match ext.as_ref() {
@ -92,35 +89,33 @@ fn get_inner_filename(filename: &Path) -> PathBuf {
}
impl FileAdapter for DecompressAdapter {
fn adapt(&self, ai: AdaptInfo, detection_reason: &SlowMatcher) -> Result<()> {
fn adapt(&self, ai: AdaptInfo, detection_reason: &SlowMatcher) -> Result<ReadBox> {
let AdaptInfo {
filepath_hint,
mut inp,
oup,
inp,
line_prefix,
archive_recursion_depth,
config,
..
} = ai;
let mut decompress = decompress_any(detection_reason, &mut inp)?;
let ai2: AdaptInfo = AdaptInfo {
filepath_hint: &get_inner_filename(filepath_hint),
filepath_hint: get_inner_filename(&filepath_hint),
is_real_file: false,
archive_recursion_depth: archive_recursion_depth + 1,
inp: &mut decompress,
oup,
inp: decompress_any(detection_reason, inp)?,
line_prefix,
config: config.clone(),
};
rga_preproc(ai2)?;
Ok(())
rga_preproc(ai2)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::*;
use std::fs::File;
#[test]
fn test_inner_filename() {
for (a, b) in &[
@ -135,4 +130,40 @@ mod tests {
assert_eq!(get_inner_filename(&PathBuf::from(a)), PathBuf::from(*b));
}
}
#[test]
fn gz() -> Result<()> {
let adapter = DecompressAdapter;
let filepath = test_data_dir().join("hello.gz");
let (a, d) = simple_adapt_info(&filepath, Box::new(File::open(&filepath)?));
let mut r = adapter.adapt(a, &d)?;
let mut o = Vec::new();
r.read_to_end(&mut o)?;
assert_eq!(String::from_utf8(o)?, "hello\n");
Ok(())
}
#[test]
fn pdf_gz() -> Result<()> {
let adapter = DecompressAdapter;
let filepath = test_data_dir().join("short.pdf.gz");
let (a, d) = simple_adapt_info(&filepath, Box::new(File::open(&filepath)?));
let mut r = adapter.adapt(a, &d)?;
let mut o = Vec::new();
r.read_to_end(&mut o)?;
assert_eq!(
String::from_utf8(o)?,
"hello world
this is just a test.
1
\u{c}"
);
Ok(())
}
}

@ -92,7 +92,7 @@ where
}
}
pub fn postprocB(line_prefix: &str, inp: impl Read) -> Result<impl Read> {
pub fn postprocB(_line_prefix: &str, inp: impl Read) -> Result<impl Read> {
let mut page_count = 1;
Ok(ByteReplacer {

@ -1,8 +1,8 @@
use super::*;
use lazy_static::lazy_static;
use spawning::SpawningFileAdapter;
use std::io::BufReader;
use std::process::Command;
/*
static EXTENSIONS: &[&str] = &["pdf"];

@ -5,7 +5,7 @@ use log::*;
use std::io::prelude::*;
use std::io::BufReader;
use std::process::Command;
use std::process::Stdio;
use std::process::{Child, Stdio};
/**
* Copy a Read to a Write, while prefixing every line with a prefix.
@ -53,15 +53,37 @@ pub fn postproc_line_prefix(
}
Ok(())
}
pub trait SpawningFileAdapter: GetMetadata {
pub trait SpawningFileAdapterTrait: GetMetadata {
fn get_exe(&self) -> &str;
fn command(&self, filepath_hint: &Path, command: Command) -> Command;
fn postproc(line_prefix: &str, inp: &mut dyn Read, oup: &mut dyn Write) -> Result<()> {
/*fn postproc(&self, line_prefix: &str, inp: &mut dyn Read, oup: &mut dyn Write) -> Result<()> {
postproc_line_prefix(line_prefix, inp, oup)
}*/
}
pub struct SpawningFileAdapter {
inner: Box<dyn SpawningFileAdapterTrait>,
}
impl SpawningFileAdapter {
pub fn new(inner: Box<dyn SpawningFileAdapterTrait>) -> SpawningFileAdapter {
SpawningFileAdapter { inner }
}
}
impl GetMetadata for SpawningFileAdapter {
fn metadata(&self) -> &AdapterMeta {
self.inner.metadata()
}
}
/*impl<T: SpawningFileAdapterTrait> From<T> for SpawningFileAdapter {
fn from(e: dyn T) -> Self {
SpawningFileAdapter { inner: Box::new(e) }
}
}*/
/// replace a Command.spawn() error "File not found" with a more readable error
/// to indicate some program is not installed
pub fn map_exe_error(err: std::io::Error, exe_name: &str, help: &str) -> Error {
@ -71,63 +93,61 @@ pub fn map_exe_error(err: std::io::Error, exe_name: &str, help: &str) -> Error {
_ => Error::from(err),
}
}
struct ProcWaitReader {
proce: Child,
}
impl Read for ProcWaitReader {
fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
let status = self.proce.wait()?;
if status.success() {
Ok(0)
} else {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
format_err!("subprocess failed: {:?}", status),
))
}
}
}
pub fn pipe_output(
line_prefix: &str,
_line_prefix: &str,
mut cmd: Command,
inp: &mut (dyn Read),
oup: &mut (dyn Write + Send),
exe_name: &str,
help: &str,
cp: fn(line_prefix: &str, &mut dyn Read, &mut dyn Write) -> Result<()>,
) -> Result<()> {
) -> Result<ReadBox> {
let mut cmd = cmd
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.map_err(|e| map_exe_error(e, exe_name, help))?;
let mut stdi = cmd.stdin.take().expect("is piped");
let mut stdo = cmd.stdout.take().expect("is piped");
let stdo = cmd.stdout.take().expect("is piped");
// TODO: how to handle this copying better?
// do we really need threads for this?
crossbeam::scope(|s| -> Result<()> {
s.spawn(|_| cp(line_prefix, &mut stdo, oup).unwrap()); // errors?
crossbeam::scope(|_s| -> Result<()> {
std::io::copy(inp, &mut stdi)?;
drop(stdi); // NEEDED! otherwise deadlock
Ok(())
})
.unwrap()?;
let status = cmd.wait()?;
if status.success() {
Ok(())
} else {
Err(format_err!("subprocess failed: {:?}", status))
}
Ok(Box::new(stdo.chain(ProcWaitReader { proce: cmd })))
}
impl<T> FileAdapter for T
where
T: SpawningFileAdapter,
{
fn adapt(&self, ai: AdaptInfo, _detection_reason: &SlowMatcher) -> Result<()> {
impl FileAdapter for SpawningFileAdapter {
fn adapt(&self, ai: AdaptInfo, _detection_reason: &SlowMatcher) -> Result<ReadBox> {
let AdaptInfo {
filepath_hint,
mut inp,
oup,
line_prefix,
..
} = ai;
let cmd = Command::new(self.get_exe());
let cmd = self.command(filepath_hint, cmd);
let cmd = Command::new(self.inner.get_exe());
let cmd = self.inner.command(&filepath_hint, cmd);
debug!("executing {:?}", cmd);
pipe_output(
line_prefix,
cmd,
&mut inp,
oup,
self.get_exe(),
"",
Self::postproc,
)
pipe_output(&line_prefix, cmd, &mut inp, self.inner.get_exe(), "")
}
}

@ -5,6 +5,7 @@ use log::*;
use rusqlite::types::ValueRef;
use rusqlite::*;
use std::convert::TryInto;
use writing::{WritingFileAdapter, WritingFileAdapterTrait};
static EXTENSIONS: &[&str] = &["db", "db3", "sqlite", "sqlite3"];
@ -27,12 +28,12 @@ lazy_static! {
};
}
#[derive(Default)]
#[derive(Default, Clone)]
pub struct SqliteAdapter;
impl SqliteAdapter {
pub fn new() -> SqliteAdapter {
SqliteAdapter
pub fn new() -> WritingFileAdapter {
WritingFileAdapter::new(Box::new(SqliteAdapter {}))
}
}
impl GetMetadata for SqliteAdapter {
@ -58,12 +59,16 @@ fn format_blob(b: ValueRef) -> String {
}
}
impl FileAdapter for SqliteAdapter {
fn adapt(&self, ai: AdaptInfo, _detection_reason: &SlowMatcher) -> Result<()> {
impl WritingFileAdapterTrait for SqliteAdapter {
fn adapt_write(
&self,
ai: AdaptInfo,
_detection_reason: &SlowMatcher,
oup: &mut dyn Write,
) -> Result<()> {
let AdaptInfo {
is_real_file,
filepath_hint,
oup,
line_prefix,
..
} = ai;
@ -116,3 +121,29 @@ impl FileAdapter for SqliteAdapter {
Ok(())
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::{test_utils::*};
use std::{fs::File};
#[test]
fn simple() -> Result<()> {
let adapter: Box<dyn FileAdapter> = Box::new(SqliteAdapter::new());
let fname = test_data_dir().join("hello.sqlite3");
let rd = File::open(&fname)?;
let (a, d) = simple_adapt_info(&fname, Box::new(rd));
let mut res = adapter.adapt(a, &d)?;
let mut buf = Vec::new();
res.read_to_end(&mut buf)?;
assert_eq!(
String::from_utf8(buf)?,
"PREFIX:tbl: greeting='hello', from='sqlite database!'\nPREFIX:tbl2: x=123, y=456.789\n",
);
Ok(())
}
}

@ -1,6 +1,6 @@
use super::*;
use lazy_static::lazy_static;
use spawning::SpawningFileAdapter;
use spawning::{SpawningFileAdapter, SpawningFileAdapterTrait};
use std::process::Command;
static EXTENSIONS: &[&str] = &["jpg", "png"];
@ -33,7 +33,7 @@ impl GetMetadata for TesseractAdapter {
&METADATA
}
}
impl SpawningFileAdapter for TesseractAdapter {
impl SpawningFileAdapterTrait for TesseractAdapter {
fn get_exe(&self) -> &str {
"tesseract"
}

@ -0,0 +1,51 @@
use super::{FileAdapter, GetMetadata, ReadBox};
use anyhow::Result;
use std::io::Write;
#[dyn_clonable::clonable]
pub trait WritingFileAdapterTrait: GetMetadata + Send + Clone {
fn adapt_write(
&self,
a: super::AdaptInfo,
detection_reason: &crate::matching::SlowMatcher,
oup: &mut dyn Write,
) -> Result<()>;
}
pub struct WritingFileAdapter {
inner: Box<dyn WritingFileAdapterTrait>,
}
impl WritingFileAdapter {
pub fn new(inner: Box<dyn WritingFileAdapterTrait>) -> WritingFileAdapter {
WritingFileAdapter { inner }
}
}
impl GetMetadata for WritingFileAdapter {
fn metadata(&self) -> &super::AdapterMeta {
self.inner.metadata()
}
}
impl FileAdapter for WritingFileAdapter {
fn adapt(
&self,
a: super::AdaptInfo,
detection_reason: &crate::matching::SlowMatcher,
) -> anyhow::Result<ReadBox> {
let (r, w) = crate::pipe::pipe();
let cc = self.inner.clone();
let detc = detection_reason.clone();
std::thread::spawn(move || {
let mut oup = w;
let ai = a;
let res = cc.adapt_write(ai, &detc, &mut oup);
if let Err(e) = res {
oup.write_err(std::io::Error::new(std::io::ErrorKind::Other, e))
.expect("could not write err");
}
});
Ok(Box::new(r))
}
}

@ -85,7 +85,7 @@ impl FromStr for CacheMaxBlobLen {
///
/// 1. describing the command line arguments using structopt+clap and for man page / readme generation
/// 2. describing the config file format (output as JSON schema via schemars)
#[derive(StructOpt, Debug, Deserialize, Serialize, JsonSchema, Default)]
#[derive(StructOpt, Debug, Deserialize, Serialize, JsonSchema, Default, Clone)]
#[structopt(
name = "ripgrep-all",
rename_all = "kebab-case",

@ -16,7 +16,7 @@ fn main() -> anyhow::Result<()> {
std::env::current_dir()?.join(&filepath)
};
let mut i = File::open(&path)?;
let i = File::open(&path)?;
let mut o = std::io::stdout();
let cache = if args.no_cache {
None
@ -24,14 +24,14 @@ fn main() -> anyhow::Result<()> {
Some(rga::preproc_cache::open().context("could not open cache")?)
};
let ai = AdaptInfo {
inp: &mut i,
filepath_hint: &path,
inp: Box::new(i),
filepath_hint: path,
is_real_file: true,
oup: &mut o,
line_prefix: "",
line_prefix: "".to_string(),
archive_recursion_depth: 0,
config: PreprocConfig { cache, args: &args },
config: PreprocConfig { cache, args },
};
rga_preproc(ai)?;
let mut oup = rga_preproc(ai)?;
std::io::copy(&mut oup, &mut o).context("copying adapter output to stdout")?;
Ok(())
}

@ -1,11 +1,16 @@
#![warn(clippy::all)]
#![feature(negative_impls)]
#![feature(specialization)]
pub mod adapters;
pub mod args;
mod caching_writer;
pub mod matching;
pub mod pipe;
pub mod preproc;
pub mod preproc_cache;
#[cfg(test)]
pub mod test_utils;
use anyhow::Context;
use anyhow::Result;
pub use caching_writer::CachingWriter;

@ -33,6 +33,12 @@ pub enum SlowMatcher {
MimeType(String),
}
impl From<FastMatcher> for SlowMatcher {
fn from(t: FastMatcher) -> Self {
SlowMatcher::Fast(t)
}
}
pub struct FileMeta {
// filename is not actually a utf8 string, but since we can't do regex on OsStr and can't get a &[u8] from OsStr either,
// and since we probably only want to do only matching on ascii stuff anyways, this is the filename as a string with non-valid bytes removed

@ -0,0 +1,196 @@
// https://github.com/arcnmx/pipe-rs/blob/master/src/lib.rs
// extended to support sending io errors
#![deny(missing_docs)]
#![doc(html_root_url = "https://docs.rs/pipe/0.3.0")]
#![cfg_attr(feature = "unstable-doc-cfg", feature(doc_cfg))]
//! Synchronous in-memory pipe
//!
//! ## Example
//!
//! ```
//! use std::thread::spawn;
//! use std::io::{Read, Write};
//!
//! let (mut read, mut write) = ripgrep_all::pipe::pipe();
//!
//! let message = "Hello, world!";
//! spawn(move || write.write_all(message.as_bytes()).unwrap());
//!
//! let mut s = String::new();
//! read.read_to_string(&mut s).unwrap();
//!
//! assert_eq!(&s, message);
//! ```
use crossbeam_channel::{Receiver, Sender};
use std::cmp::min;
use std::io::{self, BufRead, Read, Result, Write};
/// The `Read` end of a pipe (see `pipe()`)
pub struct PipeReader {
receiver: Receiver<Result<Vec<u8>>>,
buffer: Vec<u8>,
position: usize,
}
/// The `Write` end of a pipe (see `pipe()`)
#[derive(Clone)]
pub struct PipeWriter {
sender: Sender<Result<Vec<u8>>>,
}
/// Creates a synchronous memory pipe
pub fn pipe() -> (PipeReader, PipeWriter) {
let (sender, receiver) = crossbeam_channel::bounded(0);
(
PipeReader {
receiver,
buffer: Vec::new(),
position: 0,
},
PipeWriter { sender },
)
}
impl PipeWriter {
/// Extracts the inner `SyncSender` from the writer
pub fn into_inner(self) -> Sender<Result<Vec<u8>>> {
self.sender
}
/// Write any error into the pipe, will be handled as an IO error
pub fn write_err(&self, e: std::io::Error) -> Result<()> {
self.sender
.send(Err(e))
.map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "pipe reader has been dropped"))
}
}
impl PipeReader {
/// Extracts the inner `Receiver` from the writer, and any pending buffered data
pub fn into_inner(mut self) -> (Receiver<Result<Vec<u8>>>, Vec<u8>) {
self.buffer.drain(..self.position);
(self.receiver, self.buffer)
}
}
impl BufRead for PipeReader {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
while self.position >= self.buffer.len() {
match self.receiver.recv() {
// The only existing error is EOF
Err(_) => break,
Ok(Err(e)) => Err(e)?,
Ok(Ok(data)) => {
self.buffer = data;
self.position = 0;
}
}
}
Ok(&self.buffer[self.position..])
}
fn consume(&mut self, amt: usize) {
debug_assert!(self.buffer.len() - self.position >= amt);
self.position += amt
}
}
impl Read for PipeReader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if buf.is_empty() {
return Ok(0);
}
let internal = self.fill_buf()?;
let len = min(buf.len(), internal.len());
if len > 0 {
buf[..len].copy_from_slice(&internal[..len]);
self.consume(len);
}
Ok(len)
}
}
impl Write for PipeWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let data = buf.to_vec();
self.sender
.send(Ok(data))
.map(|_| buf.len())
.map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "pipe reader has been dropped"))
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::{Read, Write};
use std::thread::spawn;
#[test]
fn pipe_reader() {
let i = b"hello there";
let mut o = Vec::with_capacity(i.len());
let (mut r, mut w) = pipe();
let guard = spawn(move || {
w.write_all(&i[..5]).unwrap();
w.write_all(&i[5..]).unwrap();
drop(w);
});
r.read_to_end(&mut o).unwrap();
assert_eq!(i, &o[..]);
guard.join().unwrap();
}
#[test]
fn pipe_writer_fail() {
let i = b"hi";
let (r, mut w) = pipe();
let guard = spawn(move || {
drop(r);
});
assert!(w.write_all(i).is_err());
guard.join().unwrap();
}
#[test]
fn small_reads() {
let block_cnt = 20;
const BLOCK: usize = 20;
let (mut r, mut w) = pipe();
let guard = spawn(move || {
for _ in 0..block_cnt {
let data = &[0; BLOCK];
w.write_all(data).unwrap();
}
});
let mut buff = [0; BLOCK / 2];
let mut read = 0;
while let Ok(size) = r.read(&mut buff) {
// 0 means EOF
if size == 0 {
break;
}
read += size;
}
assert_eq!(block_cnt * BLOCK, read);
guard.join().unwrap();
}
}

@ -4,20 +4,20 @@ use crate::matching::*;
use crate::{print_bytes, print_dur, CachingWriter};
use anyhow::*;
use log::*;
use path_clean::PathClean;
use std::convert::TryInto;
use std::io::BufRead;
use std::io::BufReader;
use std::io::BufWriter;
use std::{
sync::{Arc, RwLock},
time::Instant,
};
#[derive(Clone)]
pub struct PreprocConfig<'a> {
pub struct PreprocConfig {
pub cache: Option<Arc<RwLock<dyn crate::preproc_cache::PreprocCache>>>,
pub args: &'a RgaConfig,
pub args: RgaConfig,
}
/**
* preprocess a file as defined in `ai`.
@ -25,19 +25,18 @@ pub struct PreprocConfig<'a> {
* If a cache is passed, read/write to it.
*
*/
pub fn rga_preproc(ai: AdaptInfo) -> Result<()> {
pub fn rga_preproc(ai: AdaptInfo) -> Result<ReadBox> {
let AdaptInfo {
filepath_hint,
is_real_file,
inp,
oup,
line_prefix,
config,
archive_recursion_depth,
..
} = ai;
debug!("path (hint) to preprocess: {:?}", filepath_hint);
let PreprocConfig { mut cache, args } = config;
let PreprocConfig { cache: _, args } = config;
let filtered_adapters = get_adapters_filtered(args.custom_adapters.clone(), &args.adapters)?;
let adapters = adapter_matcher(&filtered_adapters, args.accurate)?;
let filename = filepath_hint
@ -45,22 +44,22 @@ pub fn rga_preproc(ai: AdaptInfo) -> Result<()> {
.ok_or_else(|| format_err!("Empty filename"))?;
debug!("Archive recursion depth: {}", archive_recursion_depth);
if archive_recursion_depth >= args.max_archive_recursion.0 {
writeln!(oup, "{}[rga: max archive recursion reached]", line_prefix)?;
return Ok(());
let s = format!("{}[rga: max archive recursion reached]", line_prefix).into_bytes();
return Ok(Box::new(std::io::Cursor::new(s)));
}
// todo: figure out when using a bufreader is a good idea and when it is not
// seems to be good for File::open() reads, but not sure about within archives (tar, zip)
let inp = &mut BufReader::with_capacity(1 << 13, inp);
let inp = BufReader::with_capacity(1 << 16, inp);
let mimetype = if args.accurate {
let buf = inp.fill_buf()?; // fill but do not consume!
let mimetype = tree_magic::from_u8(buf);
debug!("mimetype: {:?}", mimetype);
Some(mimetype)
} else {
None
};
let mimetype = None; /*if args.accurate {
let buf = inp.fill_buf()?; // fill but do not consume!
let mimetype = tree_magic::from_u8(buf);
debug!("mimetype: {:?}", mimetype);
Some(mimetype)
} else {
None
};*/
let adapter = adapters(FileMeta {
mimetype,
lossy_filename: filename.to_string_lossy().to_string(),
@ -77,8 +76,8 @@ pub fn rga_preproc(ai: AdaptInfo) -> Result<()> {
filepath_hint.to_string_lossy(),
&meta.name
);
let db_name = format!("{}.v{}", meta.name, meta.version);
if let Some(cache) = cache.as_mut() {
let _db_name = format!("{}.v{}", meta.name, meta.version);
/*if let Some(cache) = cache.as_mut() {
let cache_key: Vec<u8> = {
let clean_path = filepath_hint.to_owned().clean();
let meta = std::fs::metadata(&filepath_hint)?;
@ -160,45 +159,43 @@ pub fn rga_preproc(ai: AdaptInfo) -> Result<()> {
}),
)?;
Ok(())
} else {
// no cache arc - probably within archive
debug!("adapting without caching...");
let start = Instant::now();
adapter
.adapt(
AdaptInfo {
line_prefix,
filepath_hint,
is_real_file,
inp,
oup,
archive_recursion_depth,
config: PreprocConfig { cache: None, args },
},
&detection_reason,
} else { */
// no cache arc - probably within archive
debug!("adapting without caching...");
let start = Instant::now();
let oread = adapter
.adapt(
AdaptInfo {
line_prefix,
filepath_hint: filepath_hint.clone(),
is_real_file,
inp: Box::new(inp),
archive_recursion_depth,
config: PreprocConfig { cache: None, args },
},
&detection_reason,
)
.with_context(|| {
format!(
"adapting {} via {} without caching failed",
filepath_hint.to_string_lossy(),
meta.name
)
.with_context(|| {
format!(
"adapting {} via {} without caching failed",
filepath_hint.to_string_lossy(),
meta.name
)
})?;
debug!(
"running adapter {} took {}",
adapter.metadata().name,
print_dur(start)
);
Ok(())
}
})?;
debug!(
"running adapter {} took {}",
adapter.metadata().name,
print_dur(start)
);
Ok(oread)
/* }*/
}
None => {
// allow passthrough if the file is in an archive or accurate matching is enabled
// otherwise it should have been filtered out by rg pre-glob since rg can handle those better than us
let allow_cat = !is_real_file || args.accurate;
if allow_cat {
spawning::postproc_line_prefix(line_prefix, inp, oup)?;
Ok(())
Ok(Box::new(inp))
} else {
Err(format_err!(
"No adapter found for file {:?}, passthrough disabled.",

@ -10,7 +10,7 @@ use std::{
pub fn open() -> Result<Arc<RwLock<dyn PreprocCache>>> {
Ok(Arc::new(RwLock::new(LmdbCache::open()?)))
}
pub trait PreprocCache {
pub trait PreprocCache: Send + Sync {
// possible without second lambda?
fn get_or_run<'a>(
&mut self,

@ -0,0 +1,33 @@
use crate::{
adapters::{AdaptInfo, ReadBox},
args::RgaConfig,
matching::{FastMatcher, SlowMatcher},
preproc::PreprocConfig,
};
use std::{
path::{Path, PathBuf},
};
pub fn test_data_dir() -> PathBuf {
let mut d = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
d.push("exampledir/test/");
d
}
pub fn simple_adapt_info(filepath: &Path, inp: ReadBox) -> (AdaptInfo, SlowMatcher) {
(
AdaptInfo {
filepath_hint: filepath.to_owned(),
is_real_file: true,
archive_recursion_depth: 0,
inp,
line_prefix: "PREFIX:".to_string(),
config: PreprocConfig {
cache: None,
args: RgaConfig::default(),
},
},
FastMatcher::FileExtension(filepath.extension().unwrap().to_string_lossy().into_owned())
.into(),
)
}
Loading…
Cancel
Save