read+discard on archive recursion

pull/201/head
phiresky 4 months ago
parent dd2d2b7784
commit 1e93a36cb5

.vscode/launch.json (vendored)

@@ -1,78 +1,84 @@
 {
     // Use IntelliSense to learn about possible attributes.
     // Hover to view descriptions of existing attributes.
     // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
     "version": "0.2.0",
     "configurations": [
         {
             "type": "lldb",
+            "request": "attach",
+            "name": "Attach",
+            "program": "${workspaceFolder}/target/release/rga-preproc"
+        },
+        {
+            "type": "lldb",
             "request": "launch",
             "name": "Debug unit tests in library 'rga'",
             "cargo": {
                 "args": ["test", "--no-run", "--lib", "--package=rga"],
                 "filter": {
                     "name": "rga",
                     "kind": "lib"
                 }
             },
             "args": [],
             "cwd": "${workspaceFolder}"
         },
         {
             "type": "lldb",
             "request": "launch",
             "name": "Debug executable 'rga'",
             "cargo": {
-                "args": ["build", "--bin=rga", "--package=rga"],
+                "args": ["build", "--bin=rga"],
                 "filter": {
                     "name": "rga",
                     "kind": "bin"
                 }
             },
             "args": [],
             "cwd": "${workspaceFolder}"
         },
         {
             "type": "lldb",
             "request": "launch",
             "name": "Debug unit tests in executable 'rga'",
             "cargo": {
-                "args": ["test", "--no-run", "--bin=rga", "--package=rga"],
+                "args": ["test", "--no-run", "--bin=rga", "--package=ripgrep-all"],
                 "filter": {
                     "name": "rga",
                     "kind": "bin"
                 }
             },
             "args": [],
             "cwd": "${workspaceFolder}"
         },
         {
             "type": "lldb",
             "request": "launch",
             "name": "Debug executable 'rga-preproc'",
             "cargo": {
-                "args": ["build", "--bin=rga-preproc", "--package=rga"],
+                "args": ["build", "--bin=rga-preproc"],
                 "filter": {
                     "name": "rga-preproc",
                     "kind": "bin"
                 }
             },
-            "args": ["exampledir/short.pdf"],
+            "args": ["exampledir/tar/test.tar.bz2"],
             "cwd": "${workspaceFolder}"
         },
         {
             "type": "lldb",
             "request": "launch",
             "name": "Debug unit tests in executable 'rga-preproc'",
             "cargo": {
                 "args": ["test", "--no-run", "--bin=rga-preproc", "--package=rga"],
                 "filter": {
                     "name": "rga-preproc",
                     "kind": "bin"
                 }
             },
             "args": [],
             "cwd": "${workspaceFolder}"
         }
     ]
 }

@@ -62,3 +62,7 @@ ctor = "0.2.0"
 pretty_assertions = "1.3.0"
 tempfile = "3.5.0"
 tokio-test = "0.4.2"
+
+[profile.release]
+debug = true
+split-debuginfo = "packed"

@@ -103,7 +103,9 @@ impl FileAdapter for ZipAdapter {
         let mut zip = ZipFileReader::new(inp);
         let s = stream! {
+            trace!("begin zip");
             while let Some(mut entry) = zip.next_entry().await? {
+                trace!("zip next entry");
                 let file = entry.entry();
                 if file.filename().ends_with('/') {
                     zip = entry.skip().await?;
@@ -143,6 +145,7 @@ impl FileAdapter for ZipAdapter {
                 zip = entry.done().await.context("going to next file in zip but entry was not read fully")?;
             }
+            trace!("zip over");
         };
         Ok(Box::pin(s))
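
The "entry was not read fully" context above is the invariant behind this whole commit: a zip reader driven off a sequential, non-seekable stream can only find the next entry header once the current entry's payload has been consumed to its end. Here is a minimal self-contained sketch of why that is, using a hypothetical length-prefixed entry format rather than the real async zip API (SeqArchive and next_entry are illustrative names, not rga's):

    use tokio::io::{AsyncRead, AsyncReadExt};

    // Entries sit back-to-back as [len: u32-le][payload]. next_entry() parses
    // whatever bytes the cursor currently points at, so it only yields a real
    // header if the previous payload was drained completely.
    struct SeqArchive<R> {
        inp: R,
    }

    impl<R: AsyncRead + Unpin> SeqArchive<R> {
        async fn next_entry(&mut self) -> std::io::Result<Option<u32>> {
            let mut len = [0u8; 4];
            match self.inp.read_exact(&mut len).await {
                Ok(_) => Ok(Some(u32::from_le_bytes(len))),
                // EOF before a full header: treat as end of archive (simplified)
                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => Ok(None),
                Err(e) => Err(e),
            }
        }
    }

If a consumer abandons an entry halfway, the next "length" would be parsed out of leftover payload bytes; the read_discard() added in the preprocessing loop below exists to rule that state out.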

@@ -33,6 +33,7 @@ pub fn async_read_and_write_to_cache<'a>(
     let s = stream! {
         let mut stream = ReaderStream::new(inp);
         while let Some(bytes) = stream.next().await {
+            trace!("read bytes: {:?}", bytes);
             if let Ok(bytes) = &bytes {
                 if let Some(writer) = zstd_writer.as_mut() {
                     writer.write_all(bytes).await?;
@@ -48,14 +49,18 @@
             }
             yield bytes;
         }
+        trace!("eof");
         // EOF, call on_finish
         let finish = {
             if let Some(mut writer) = zstd_writer.take() {
                 writer.shutdown().await?;
                 let res = writer.into_inner();
+                trace!("EOF");
                 if res.len() <= max_cache_size {
+                    trace!("writing {} bytes to cache", res.len());
                     (bytes_written, Some(res))
                 } else {
+                    trace!("cache longer than max, dropping");
                     (bytes_written, None)
                 }
             } else {
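
For orientation: async_read_and_write_to_cache tees its input, yielding each chunk downstream unchanged while mirroring it into an in-memory zstd buffer, and gives up on caching once that buffer outgrows max_cache_size. A stripped-down sketch of the same tee-with-cap shape (assumed names, compression omitted, so the cap here applies to raw rather than compressed bytes):

    use tokio::io::{AsyncRead, AsyncReadExt};

    // Forward every chunk to the consumer, mirror it into a cache buffer,
    // and drop the cache copy as soon as it exceeds the size limit.
    // Returns Some(cache) at EOF only if the input stayed under the cap.
    async fn tee_with_cap<R: AsyncRead + Unpin>(
        mut inp: R,
        max_cache_size: usize,
        mut on_chunk: impl FnMut(&[u8]),
    ) -> std::io::Result<Option<Vec<u8>>> {
        let mut cache = Some(Vec::new());
        let mut buf = [0u8; 8192];
        loop {
            let n = inp.read(&mut buf).await?;
            if n == 0 {
                return Ok(cache); // EOF
            }
            on_chunk(&buf[..n]); // the real code yields this from a stream!
            if let Some(c) = cache.as_mut() {
                c.extend_from_slice(&buf[..n]);
                if c.len() > max_cache_size {
                    cache = None; // too big to cache; keep streaming without it
                }
            }
        }
    }

In the real function the size check runs against the compressed buffer (res.len() above), which is why even large but highly compressible inputs can still land in the cache.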

@@ -20,9 +20,9 @@ use std::io::Cursor;
 use std::path::Path;
 use std::pin::Pin;
 use std::sync::Arc;
-use tokio::io::AsyncBufRead;
 use tokio::io::AsyncBufReadExt;
 use tokio::io::BufReader;
+use tokio::io::{AsyncBufRead, AsyncReadExt};
 
 pub type ActiveAdapters = Vec<Arc<dyn FileAdapter>>;
@@ -185,6 +185,17 @@ async fn adapt_caching(
     }
 }
 
+async fn read_discard(mut x: ReadBox) -> Result<()> {
+    let mut buf = [0u8; 1 << 16];
+    loop {
+        let n = x.read(&mut buf).await?;
+        if n == 0 {
+            break;
+        }
+    }
+    Ok(())
+}
+
 pub fn loop_adapt(
     adapter: &dyn FileAdapter,
     detection_reason: FileMatcher,
@@ -213,9 +224,12 @@ pub async fn loop_adapt_inner(
     };
     let s = stream! {
         for await file in inp {
+            trace!("next file");
             match buf_choose_adapter(file?).await? {
                 Ret::Recurse(ai, adapter, detection_reason, _active_adapters) => {
                     if ai.archive_recursion_depth >= ai.config.max_archive_recursion.0 {
+                        // some adapters (esp. zip) assume that the entry is read fully and might hang otherwise
+                        read_discard(ai.inp).await?;
                         let s = format!("{}[rga: max archive recursion reached ({})]\n", ai.line_prefix, ai.archive_recursion_depth).into_bytes();
                         yield Ok(AdaptInfo {
                             inp: Box::pin(Cursor::new(s)),
@@ -241,7 +255,9 @@ pub async fn loop_adapt_inner(
                     yield Ok(ai);
                 }
             }
+            trace!("done with files");
         }
+        trace!("stream ended");
     };
     Ok(Box::pin(s))
 }
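
This hunk is the actual fix: previously, on hitting max_archive_recursion, ai.inp was dropped without being read, and a zip adapter further up the stack could then wait forever in entry.done(). read_discard() drains the reader to EOF first. A quick usage sketch (not part of the commit; assumes ReadBox = Pin<Box<dyn AsyncRead + Send>>, matching the Box::pin(Cursor::new(...)) pattern used above):

    #[tokio::test]
    async fn read_discard_drains_to_eof() -> anyhow::Result<()> {
        // 200 kB input: several times read_discard's 64 KiB buffer, so the
        // loop has to iterate; it returns Ok(()) only once read() reports EOF.
        let data = vec![42u8; 200_000];
        let inp: ReadBox = Box::pin(std::io::Cursor::new(data));
        read_discard(inp).await?;
        Ok(())
    }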

@@ -2,10 +2,7 @@ use crate::{adapters::FileAdapter, preproc::ActiveAdapters};
 use anyhow::{Context, Result};
 use path_clean::PathClean;
 use rusqlite::{named_params, OptionalExtension};
-use std::{
-    path::Path,
-    time::{Duration, UNIX_EPOCH},
-};
+use std::{path::Path, time::UNIX_EPOCH};
 use tokio_rusqlite::Connection;
 
 #[derive(Clone)]
@@ -58,7 +55,7 @@ async fn connect_pragmas(db: &Connection) -> Result<()> {
     //db.execute(&format!("pragma page_size = {};", want_page_size))
     //    .context("setup pragma 1")?;
     db.call(|db| {
-        db.busy_timeout(Duration::from_secs(10))?;
+        // db.busy_timeout(Duration::from_secs(10))?;
         db.pragma_update(None, "journal_mode", "wal")?;
         db.pragma_update(None, "foreign_keys", "on")?;
         db.pragma_update(None, "temp_store", "memory")?;
@@ -145,6 +142,12 @@ impl PreprocCache for SqliteCache {
     async fn set(&mut self, key: &CacheKey, value: Vec<u8>) -> Result<()> {
         let key = (*key).clone(); // todo: without cloning
+        log::trace!(
+            "Writing to cache: {}, {}, {} byte",
+            key.adapter,
+            key.file_path,
+            value.len()
+        );
         Ok(self
             .db
             .call(move |db| {
