Merge remote-tracking branch 'upstream/master' into mbox-extractor

pull/104/head
FliegendeWurst 10 months ago
commit 2259730c67

@ -0,0 +1,27 @@
---
name: Bug report
about: Create a report to help us improve
title: ''
labels: bug
assignees: ''
---
**Describe the bug**
**To Reproduce**
Attach example file:
Run command:
**Output**
**Screenshots**
If applicable, add screenshots to help explain your problem.
**Operating System and Version**
**Output of `rga --version`**

@ -0,0 +1,20 @@
---
name: Feature request
about: Suggest an idea for this project
title: ''
labels: ''
assignees: ''
---
**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
**Describe the solution you'd like**
A clear and concise description of what you want to happen.
**Describe alternatives you've considered**
A clear and concise description of any alternative solutions or features you've considered.
**Additional context**
Add any other context or screenshots about the feature request here.

@ -1,75 +1,25 @@
# Based on https://github.com/actions-rs/meta/blob/master/recipes/quickstart.md # Based on https://github.com/actions-rs/meta/blob/master/recipes/quickstart.md
# #
# While our "example" application has the platform-specific code, # While our "example" application has platform-specific code,
# for simplicity we are compiling and testing everything on the Ubuntu environment only. # for simplicity we are compiling and testing everything in a nix-on-Linux environment only.
# For multi-OS testing see the `cross.yml` workflow.
on: [push, pull_request] on: [push, pull_request]
name: ci name: ci
jobs: jobs:
check: nix-flake-check:
name: Check name: nix flake check
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: Checkout sources - name: Checkout sources
uses: actions/checkout@v2 uses: actions/checkout@v3
- name: Install stable toolchain - name: Install nix
uses: actions-rs/toolchain@v1 uses: cachix/install-nix-action@v21
with:
profile: minimal
toolchain: stable
override: true
- name: Run cargo check - name: Ensure the build succeeds
uses: actions-rs/cargo@v1 run: nix build
with:
command: check
test: - name: Run `nix flake check` to run formatters, linters, and tests
name: Test Suite run: nix flake check --print-build-logs
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/checkout@v2
- name: Install stable toolchain
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
- name: Run cargo test
uses: actions-rs/cargo@v1
with:
command: test
lints:
name: Lints
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/checkout@v2
- name: Install stable toolchain
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
components: rustfmt, clippy
- name: Run cargo fmt
uses: actions-rs/cargo@v1
with:
command: fmt
args: --all -- --check
- name: Run cargo clippy
uses: actions-rs/cargo@v1
with:
command: clippy
args: -- -D warnings

@ -18,14 +18,14 @@ on:
# branches: # branches:
# - ag/release # - ag/release
tags: tags:
- 'v[0-9]+.[0-9]+.[0-9]+' - "v[0-9]+.[0-9]+.[0-9]+*"
jobs: jobs:
create-release: create-release:
name: create-release name: create-release
runs-on: ubuntu-latest runs-on: ubuntu-latest
# env: # env:
# Set to force version number, e.g., when no tag exists. # Set to force version number, e.g., when no tag exists.
# RG_VERSION: TEST-0.0.0 # RG_VERSION: TEST-0.0.0
steps: steps:
- name: Create artifacts directory - name: Create artifacts directory
run: mkdir artifacts run: mkdir artifacts
@ -62,7 +62,7 @@ jobs:
build-release: build-release:
name: build-release name: build-release
needs: ['create-release'] needs: ["create-release"]
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
env: env:
# For some builds, we use cross to test on 32-bit and big-endian # For some builds, we use cross to test on 32-bit and big-endian
@ -78,124 +78,124 @@ jobs:
matrix: matrix:
build: [linux, linux-arm, macos, win-msvc] build: [linux, linux-arm, macos, win-msvc]
include: include:
- build: linux - build: linux
os: ubuntu-18.04 os: ubuntu-22.04
rust: nightly rust: nightly
target: x86_64-unknown-linux-musl target: x86_64-unknown-linux-musl
- build: linux-arm - build: linux-arm
os: ubuntu-18.04 os: ubuntu-22.04
rust: nightly rust: nightly
target: arm-unknown-linux-gnueabihf target: arm-unknown-linux-gnueabihf
- build: macos - build: macos
os: macos-latest os: macos-latest
rust: nightly rust: nightly
target: x86_64-apple-darwin target: x86_64-apple-darwin
- build: win-msvc - build: win-msvc
os: windows-2019 os: windows-2019
rust: nightly rust: nightly
target: x86_64-pc-windows-msvc target: x86_64-pc-windows-msvc
#- build: win-gnu #- build: win-gnu
# os: windows-2019 # os: windows-2019
# rust: nightly-x86_64-gnu # rust: nightly-x86_64-gnu
# target: x86_64-pc-windows-gnu # target: x86_64-pc-windows-gnu
steps: steps:
- name: Checkout repository - name: Checkout repository
uses: actions/checkout@v2 uses: actions/checkout@v2
with: with:
fetch-depth: 1 fetch-depth: 1
- name: Install packages (Ubuntu) - name: Install packages (Ubuntu)
if: matrix.os == 'ubuntu-18.04' if: matrix.os == 'ubuntu-22.04'
run: | run: |
ci/ubuntu-install-packages ci/ubuntu-install-packages
- name: Install packages (macOS) - name: Install packages (macOS)
if: matrix.os == 'macos-latest' if: matrix.os == 'macos-latest'
run: | run: |
ci/macos-install-packages ci/macos-install-packages
- name: Install Rust - name: Install Rust
uses: actions-rs/toolchain@v1 uses: actions-rs/toolchain@v1
with: with:
toolchain: ${{ matrix.rust }} toolchain: ${{ matrix.rust }}
profile: minimal profile: minimal
override: true override: true
target: ${{ matrix.target }} target: ${{ matrix.target }}
- name: Use Cross - name: Use Cross
# if: matrix.os != 'windows-2019' shell: bash
run: | run: |
cargo install cross cargo install cross
echo "CARGO=cross" >> $GITHUB_ENV echo "CARGO=cross" >> $GITHUB_ENV
echo "TARGET_FLAGS=--target ${{ matrix.target }}" >> $GITHUB_ENV echo "TARGET_FLAGS=--target ${{ matrix.target }}" >> $GITHUB_ENV
echo "TARGET_DIR=./target/${{ matrix.target }}" >> $GITHUB_ENV echo "TARGET_DIR=./target/${{ matrix.target }}" >> $GITHUB_ENV
- name: Show command used for Cargo - name: Show command used for Cargo
run: | run: |
echo "cargo command is: ${{ env.CARGO }}" echo "cargo command is: ${{ env.CARGO }}"
echo "target flag is: ${{ env.TARGET_FLAGS }}" echo "target flag is: ${{ env.TARGET_FLAGS }}"
echo "target dir is: ${{ env.TARGET_DIR }}" echo "target dir is: ${{ env.TARGET_DIR }}"
- name: Get release download URL - name: Get release download URL
uses: actions/download-artifact@v1 uses: actions/download-artifact@v1
with: with:
name: artifacts name: artifacts
path: artifacts path: artifacts
- name: Set release upload URL and release version - name: Set release upload URL and release version
shell: bash shell: bash
run: | run: |
echo "RELEASE_UPLOAD_URL=$(cat artifacts/release-upload-url)" >> $GITHUB_ENV echo "RELEASE_UPLOAD_URL=$(cat artifacts/release-upload-url)" >> $GITHUB_ENV
echo "release upload url: $RELEASE_UPLOAD_URL" echo "release upload url: $RELEASE_UPLOAD_URL"
echo "RELEASE_VERSION=$(cat artifacts/release-version)" >> $GITHUB_ENV echo "RELEASE_VERSION=$(cat artifacts/release-version)" >> $GITHUB_ENV
echo "release version: $RELEASE_VERSION" echo "release version: $RELEASE_VERSION"
- name: Build release binary - name: Build release binary
run: ${{ env.CARGO }} build --verbose --release ${{ env.TARGET_FLAGS }} run: ${{ env.CARGO }} build --verbose --release ${{ env.TARGET_FLAGS }}
- name: Strip release binary (linux and macos) - name: Strip release binary (linux and macos)
if: matrix.build == 'linux' || matrix.build == 'macos' if: matrix.build == 'linux' || matrix.build == 'macos'
run: | run: |
strip "target/${{ matrix.target }}/release/rga" "target/${{ matrix.target }}/release/rga-preproc" strip "target/${{ matrix.target }}/release/rga" "target/${{ matrix.target }}/release/rga-preproc"
- name: Strip release binary (arm) - name: Strip release binary (arm)
if: matrix.build == 'linux-arm' if: matrix.build == 'linux-arm'
run: | run: |
docker run --rm -v \ docker run --rm -v \
"$PWD/target:/target:Z" \ "$PWD/target:/target:Z" \
rustembedded/cross:arm-unknown-linux-gnueabihf \ rustembedded/cross:arm-unknown-linux-gnueabihf \
arm-linux-gnueabihf-strip \ arm-linux-gnueabihf-strip \
/target/arm-unknown-linux-gnueabihf/release/rga \ /target/arm-unknown-linux-gnueabihf/release/rga \
/target/arm-unknown-linux-gnueabihf/release/rga-preproc /target/arm-unknown-linux-gnueabihf/release/rga-preproc
- name: Build archive - name: Build archive
shell: bash shell: bash
run: | run: |
staging="ripgrep_all-${{ env.RELEASE_VERSION }}-${{ matrix.target }}" staging="ripgrep_all-${{ env.RELEASE_VERSION }}-${{ matrix.target }}"
mkdir -p "$staging"/doc mkdir -p "$staging"/doc
cp {README.md,LICENSE.md} "$staging/" cp {README.md,LICENSE.md} "$staging/"
cp CHANGELOG.md "$staging/doc/" cp CHANGELOG.md "$staging/doc/"
if [ "${{ matrix.os }}" = "windows-2019" ]; then if [ "${{ matrix.os }}" = "windows-2019" ]; then
cp "target/${{ matrix.target }}/release/rga.exe" "$staging/" cp "target/${{ matrix.target }}/release/rga.exe" "$staging/"
cp "target/${{ matrix.target }}/release/rga-preproc.exe" "$staging/" cp "target/${{ matrix.target }}/release/rga-preproc.exe" "$staging/"
7z a "$staging.zip" "$staging" 7z a "$staging.zip" "$staging"
echo "ASSET=$staging.zip" >> $GITHUB_ENV echo "ASSET=$staging.zip" >> $GITHUB_ENV
else else
cp "target/${{ matrix.target }}/release/rga" "$staging/" cp "target/${{ matrix.target }}/release/rga" "$staging/"
cp "target/${{ matrix.target }}/release/rga-preproc" "$staging/" cp "target/${{ matrix.target }}/release/rga-preproc" "$staging/"
tar czf "$staging.tar.gz" "$staging" tar czf "$staging.tar.gz" "$staging"
echo "ASSET=$staging.tar.gz" >> $GITHUB_ENV echo "ASSET=$staging.tar.gz" >> $GITHUB_ENV
fi fi
- name: Upload release archive - name: Upload release archive
uses: actions/upload-release-asset@v1.0.1 uses: actions/upload-release-asset@v1.0.1
env: env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with: with:
upload_url: ${{ env.RELEASE_UPLOAD_URL }} upload_url: ${{ env.RELEASE_UPLOAD_URL }}
asset_path: ${{ env.ASSET }} asset_path: ${{ env.ASSET }}
asset_name: ${{ env.ASSET }} asset_name: ${{ env.ASSET }}
asset_content_type: application/octet-stream asset_content_type: application/octet-stream

1139
Cargo.lock generated

File diff suppressed because it is too large Load Diff

@ -2,7 +2,7 @@
[package] [package]
authors = ["phiresky <phireskyde+git@gmail.com>"] authors = ["phiresky <phireskyde+git@gmail.com>"]
description = "rga: ripgrep, but also search in PDFs, E-Books, Office documents, zip, tar.gz, etc." description = "rga: ripgrep, but also search in PDFs, E-Books, Office documents, zip, tar.gz, etc."
edition = "2018" edition = "2021"
exclude = [ exclude = [
"exampledir/*", "exampledir/*",
] ]
@ -11,56 +11,57 @@ license = "AGPL-3.0-or-later"
name = "ripgrep_all" name = "ripgrep_all"
readme = "README.md" readme = "README.md"
repository = "https://github.com/phiresky/ripgrep-all" repository = "https://github.com/phiresky/ripgrep-all"
version = "1.0.0-alpha.2" version = "1.0.0-alpha.5"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
anyhow = "1.0.32" anyhow = {version = "1.0.71", features = ["backtrace"]}
async-compression = {version = "0.3.15", features = ["all", "all-algorithms", "tokio"]} async-compression = { version = "0.4.0", features = ["all", "all-algorithms", "tokio"] }
async-stream = "0.3.3" async-stream = "0.3.5"
async-trait = "0.1.64" async-trait = "0.1.68"
async_zip = "0.0.9" async_zip = {version = "0.0.12", features = ["full"]}
bincode = "1.3.1" bincode = "1.3.3"
bytes = "1.2.1" bytes = "1.4.0"
clap = {version = "4.0.18", features = ["wrap_help"]} clap = {version = "4.3.0", features = ["wrap_help"]}
crossbeam = "0.8.1" crossbeam = "0.8.2"
crossbeam-channel = "0.5.1" crossbeam-channel = "0.5.8"
derive_more = "0.99.9" derive_more = "0.99.17"
directories-next = "2.0.0" directories-next = "2.0.0"
dyn-clonable = "0.9.0" dyn-clonable = "0.9.0"
dyn-clone = "1.0.2" dyn-clone = "1.0.11"
encoding_rs = "0.8.24" encoding_rs = "0.8.32"
encoding_rs_io = "0.1.7" encoding_rs_io = "0.1.7"
env_logger = "0.9.0" env_logger = "0.10.0"
glob = "0.3.0" glob = "0.3.1"
json_comments = "0.2.1" json_comments = "0.2.1"
lazy_static = "1.4.0" lazy_static = "1.4.0"
log = "0.4.11" log = "0.4.17"
mailbox = "0.2.0" mailbox = "0.2.0"
mailparse = "0.14.0" mailparse = "0.14.0"
memchr = "2.3.3" memchr = "2.5.0"
mime2ext = "0.1.52" mime2ext = "0.1.52"
paste = "1.0.0" paste = "1.0.12"
path-clean = "0.1.0" path-clean = "1.0.1"
pretty-bytes = "0.2.2" pretty-bytes = "0.2.2"
regex = "1.3.9" regex = "1.8.2"
rkv = "0.17" rusqlite = {version = "0.29.0", features = ["vtab", "bundled"]}
rusqlite = {version = "0.28.0", features = ["vtab", "bundled"]} schemars = {version = "0.8.12", features = ["preserve_order"]}
schemars = {version = "0.8.0-alpha-4", features = ["preserve_order"]} serde = {version = "1.0.163", features = ["derive"]}
serde = {version = "1.0.115", features = ["derive"]} serde_json = "1.0.96"
serde_json = "1.0.57"
size_format = "1.0.2" size_format = "1.0.2"
structopt = "0.3.17" structopt = "0.3.26"
tempfile = "3.1.0" tempfile = "3.5.0"
tokio = {version = "1.21.2", features = ["full"]} tokio = {version = "1.28.1", features = ["full"]}
tokio-stream = {version = "0.1.11", features = ["io-util", "tokio-util"]} tokio-rusqlite = "0.4.0"
tokio-stream = {version = "0.1.14", features = ["io-util", "tokio-util"]}
tokio-tar = { git = "https://github.com/vorot93/tokio-tar", version = "0.3.0" } tokio-tar = { git = "https://github.com/vorot93/tokio-tar", version = "0.3.0" }
tokio-util = {version = "0.7.4", features = ["io", "full"]} tokio-util = {version = "0.7.8", features = ["io", "full"]}
tree_magic = {package = "tree_magic_mini", version = "3.0.0"} tree_magic = {package = "tree_magic_mini", version = "3.0.3"}
[dev-dependencies] [dev-dependencies]
async-recursion = "1.0.0" async-recursion = "1.0.4"
ctor = "0.1.20" ctor = "0.2.0"
pretty_assertions = "1.3.0" pretty_assertions = "1.3.0"
tempfile = "3.5.0"
tokio-test = "0.4.2" tokio-test = "0.4.2"

@ -33,45 +33,7 @@ demo/
![rga-fzf](doc/rga-fzf.gif) ![rga-fzf](doc/rga-fzf.gif)
You can use rga interactively via fzf. Add the following to your ~/.{bash,zsh}rc: See [the wiki](https://github.com/phiresky/ripgrep-all/wiki/fzf-Integration) for instructions of integrating rga with fzf.
```bash
rga-fzf() {
RG_PREFIX="rga --files-with-matches"
local file
file="$(
FZF_DEFAULT_COMMAND="$RG_PREFIX '$1'" \
fzf --sort --preview="[[ ! -z {} ]] && rga --pretty --context 5 {q} {}" \
--phony -q "$1" \
--bind "change:reload:$RG_PREFIX {q}" \
--preview-window="70%:wrap"
)" &&
echo "opening $file" &&
xdg-open "$file"
}
```
And for your `~/.config/fish/config.fish`:
```
function rga-fzf
set RG_PREFIX 'rga --files-with-matches'
if test (count $argv) -gt 1
set RG_PREFIX "$RG_PREFIX $argv[1..-2]"
end
set -l file $file
set file (
FZF_DEFAULT_COMMAND="$RG_PREFIX '$argv[-1]'" \
fzf --sort \
--preview='test ! -z {} && \
rga --pretty --context 5 {q} {}' \
--phony -q "$argv[-1]" \
--bind "change:reload:$RG_PREFIX {q}" \
--preview-window='50%:wrap'
) && \
echo "opening $file" && \
open "$file"
end
```
## INSTALLATION ## INSTALLATION
@ -86,9 +48,11 @@ Linux x64, macOS and Windows binaries are available [in GitHub Releases][latestr
`pacman -S ripgrep-all`. `pacman -S ripgrep-all`.
#### Nix #### Nix
`nix-env -iA nixpkgs.ripgrep-all` `nix-env -iA nixpkgs.ripgrep-all`
#### Debian-based #### Debian-based
download the [rga binary][latestrelease] and get the dependencies like this: download the [rga binary][latestrelease] and get the dependencies like this:
`apt install ripgrep pandoc poppler-utils ffmpeg` `apt install ripgrep pandoc poppler-utils ffmpeg`
@ -117,7 +81,7 @@ If you get an error like `VCRUNTIME140.DLL could not be found`, you need to inst
To install the dependencies that are each not strictly necessary but very useful: To install the dependencies that are each not strictly necessary but very useful:
`brew install pandoc poppler tesseract ffmpeg` `brew install pandoc poppler ffmpeg`
### Compile from source ### Compile from source
@ -131,58 +95,58 @@ rga should compile with stable Rust (v1.36.0+, check with `rustc --version`). To
## Available Adapters ## Available Adapters
rga works with _adapters_ that adapt various file formats. It comes with a few adapters integrated:
``` ```
rga --rga-list-adapters rga --rga-list-adapters
``` ```
You can also add **custom adapters**. See [the wiki](https://github.com/phiresky/ripgrep-all/wiki) for more information.
<!-- this part generated by update-readme.sh --> <!-- this part generated by update-readme.sh -->
Adapters: Adapters:
- **ffmpeg** - **pandoc**
Uses ffmpeg to extract video metadata/chapters and subtitles. Uses pandoc to convert binary/unreadable text documents to plain markdown-like text
Extensions: `.mkv`, `.mp4`, `.avi` Runs: pandoc --from= --to=plain --wrap=none --markdown-headings=atx
Extensions: .epub, .odt, .docx, .fb2, .ipynb
* **pandoc**
Uses pandoc to convert binary/unreadable text documents to plain markdown-like text.
Extensions: `.epub`, `.odt`, `.docx`, `.fb2`, `.ipynb`
- **poppler** - **poppler**
Uses pdftotext (from poppler-utils) to extract plain text from PDF files. Uses pdftotext (from poppler-utils) to extract plain text from PDF files
Extensions: `.pdf` Runs: pdftotext - -
Mime Types: `application/pdf` Extensions: .pdf
Mime Types: application/pdf
- **zip** - **postprocpagebreaks**
Reads a zip file as a stream and recurses down into its contents. Adds the page number to each line for an input file that specifies page breaks as ascii page break character.
Extensions: `.zip` Mainly to be used internally by the poppler adapter.
Mime Types: `application/zip` Extensions: .asciipagebreaks
- **decompress** - **ffmpeg**
Reads compressed file as a stream and runs a different extractor on the contents. Uses ffmpeg to extract video metadata/chapters, subtitles, lyrics, and other metadata
Extensions: `.tgz`, `.tbz`, `.tbz2`, `.gz`, `.bz2`, `.xz`, `.zst` Extensions: .mkv, .mp4, .avi, .mp3, .ogg, .flac, .webm
Mime Types: `application/gzip`, `application/x-bzip`, `application/x-xz`, `application/zstd`
- **tar** - **zip**
Reads a tar file as a stream and recurses down into its contents. Reads a zip file as a stream and recurses down into its contents
Extensions: `.tar` Extensions: .zip, .jar
Mime Types: application/zip
* **sqlite** - **decompress**
Uses sqlite bindings to convert sqlite databases into a simple plain text format. Reads compressed file as a stream and runs a different extractor on the contents.
Extensions: `.db`, `.db3`, `.sqlite`, `.sqlite3` Extensions: .tgz, .tbz, .tbz2, .gz, .bz2, .xz, .zst
Mime Types: `application/x-sqlite3` Mime Types: application/gzip, application/x-bzip, application/x-xz, application/zstd
The following adapters are disabled by default, and can be enabled using `--rga-adapters=+pdfpages,tesseract`: - **tar**
Reads a tar file as a stream and recurses down into its contents
Extensions: .tar
- **pdfpages** - **sqlite**
Converts a pdf to its individual pages as png files. Only useful in combination with tesseract. Uses sqlite bindings to convert sqlite databases into a simple plain text format
Extensions: `.pdf` Extensions: .db, .db3, .sqlite, .sqlite3
Mime Types: `application/pdf` Mime Types: application/x-sqlite3
- **tesseract** The following adapters are disabled by default, and can be enabled using '--rga-adapters=+foo,bar':
Uses tesseract to run OCR on images to make them searchable.
May need `-j1` to prevent overloading the system.
Make sure you have tesseract installed.
Extensions: `.jpg`, `.png`
## USAGE: ## USAGE:
@ -202,6 +166,17 @@ The following adapters are disabled by default, and can be enabled using `--rga-
> Detection is only done on the first 8KiB of the file, since we can\'t > Detection is only done on the first 8KiB of the file, since we can\'t
> always seek on the input (in archives). > always seek on the input (in archives).
**\--rga-no-cache**
> Disable caching of results
>
> By default, rga caches the extracted text, if it is small enough, to a
> database in \${XDG*CACHE_DIR-\~/.cache}/ripgrep-all on Linux,
> *\~/Library/Caches/ripgrep-all\_ on macOS, or
> C:\\Users\\username\\AppData\\Local\\ripgrep-all on Windows. This way,
> repeated searches on the same set of files will be much faster. If you
> pass this flag, all caching will be disabled.
**-h**, **\--help** **-h**, **\--help**
> Prints help information > Prints help information
@ -210,15 +185,9 @@ The following adapters are disabled by default, and can be enabled using `--rga-
> List all known adapters > List all known adapters
**\--rga-no-cache** **\--rga-print-config-schema**
> Disable caching of results > Print the JSON Schema of the configuration file
>
> By default, rga caches the extracted text, if it is small enough, to a
> database in \~/.cache/rga on Linux, _\~/Library/Caches/rga_ on macOS,
> or C:\\Users\\username\\AppData\\Local\\rga on Windows. This way,
> repeated searches on the same set of files will be much faster. If you
> pass this flag, all caching will be disabled.
**\--rg-help** **\--rg-help**
@ -242,24 +211,31 @@ The following adapters are disabled by default, and can be enabled using `--rga-
> use all default adapters except for bar and baz. \"+bar,baz\" means > use all default adapters except for bar and baz. \"+bar,baz\" means
> use all default adapters and also bar and baz. > use all default adapters and also bar and baz.
**\--rga-cache-compression-level=**\<cache-compression-level\> **\--rga-cache-compression-level=**\<compression-level\>
> ZSTD compression level to apply to adapter outputs before storing in > ZSTD compression level to apply to adapter outputs before storing in
> cache db > cache db
> >
> Ranges from 1 - 22 \[default: 12\] > Ranges from 1 - 22 \[default: 12\]
**\--rga-cache-max-blob-len=**\<cache-max-blob-len\> **\--rga-config-file=**\<config-file-path\>
**\--rga-max-archive-recursion=**\<max-archive-recursion\>
> Maximum nestedness of archives to recurse into \[default: 4\]
**\--rga-cache-max-blob-len=**\<max-blob-len\>
> Max compressed size to cache > Max compressed size to cache
> >
> Longest byte length (after compression) to store in cache. Longer > Longest byte length (after compression) to store in cache. Longer
> adapter outputs will not be cached and recomputed every time. Allowed > adapter outputs will not be cached and recomputed every time.
> suffixes: k M G \[default: 2000000\] >
> Allowed suffixes on command line: k M G \[default: 2000000\]
**\--rga-max-archive-recursion=**\<max-archive-recursion\> **\--rga-cache-path=**\<path\>
> Maximum nestedness of archives to recurse into \[default: 4\] > Path to store cache db \[default: /home/phire/.cache/ripgrep-all\]
**-h** shows a concise overview, **\--help** shows more detail and **-h** shows a concise overview, **\--help** shows more detail and
advanced options. advanced options.
@ -287,6 +263,7 @@ to debug the adapters.
You can use the provided [`flake.nix`](./flake.nix) to setup all build- and You can use the provided [`flake.nix`](./flake.nix) to setup all build- and
run-time dependencies: run-time dependencies:
1. Enable [Flakes](https://nixos.wiki/wiki/Flakes) in your Nix configuration. 1. Enable [Flakes](https://nixos.wiki/wiki/Flakes) in your Nix configuration.
1. Add [`direnv`](https://direnv.net/) to your profile: 1. Add [`direnv`](https://direnv.net/) to your profile:
`nix profile install nixpkgs#direnv` `nix profile install nixpkgs#direnv`

@ -7,7 +7,7 @@
// https://github.com/phiresky/ripgrep-all/blob/master/doc/config.default.jsonc // https://github.com/phiresky/ripgrep-all/blob/master/doc/config.default.jsonc
// The config options are the same as the command line options, // The config options are the same as the command line options,
// but with --rga- prefix removed and - replaced with _. // but with --rga- prefix removed and - and . replaced with _.
// e.g. --rga-no-cache becomes `"no_cache": true. // e.g. --rga-no-cache becomes `"no_cache": true.
// The only exception is the `custom_adapters` option, which can only be set in this file. // The only exception is the `custom_adapters` option, which can only be set in this file.

@ -5,7 +5,7 @@ content=$(
<!-- this part generated by update-readme.sh --> <!-- this part generated by update-readme.sh -->
$(cargo run --bin rga -- --rga-list-adapters) $(cargo run --bin rga -- --rga-list-adapters)
$(help2man -N "cargo run --bin rga --" | pandoc -f man -t markdown --atx-headers | rg --multiline "## USAGE:(.|\n)*") $(help2man -N "cargo run --bin rga --" | pandoc -f man -t markdown --markdown-headings=atx | rg --multiline "## USAGE:(.|\n)*")
<!-- end of part generated by update-readme.sh --> <!-- end of part generated by update-readme.sh -->
END END
) )

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.9 MiB

@ -3,11 +3,11 @@
"advisory-db": { "advisory-db": {
"flake": false, "flake": false,
"locked": { "locked": {
"lastModified": 1670452192, "lastModified": 1685821301,
"narHash": "sha256-f8NIFbqSbCzpssgDUK4srfgKaVaMhDScEptw4uuxGAc=", "narHash": "sha256-4XRcnSboLJw1XKjDpg2jBU70jEw/8Bgx4nUmnq3kXbY=",
"owner": "rustsec", "owner": "rustsec",
"repo": "advisory-db", "repo": "advisory-db",
"rev": "0a2faeb87195392b23333a8097309d29f2c5d31d", "rev": "af3f3d503f82056785841bee49997bae65eba1c0",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -26,11 +26,11 @@
"rust-overlay": "rust-overlay" "rust-overlay": "rust-overlay"
}, },
"locked": { "locked": {
"lastModified": 1670546681, "lastModified": 1684981077,
"narHash": "sha256-S33bhME0zPHPEZyZPCsrdQL/4WW/A020PwN+a3z7Q+I=", "narHash": "sha256-68X9cFm0RTZm8u0rXPbeBzOVUH5OoUGAfeHHVoxGd9o=",
"owner": "ipetkov", "owner": "ipetkov",
"repo": "crane", "repo": "crane",
"rev": "63f80ee278897e72a1468090278716b5befa5128", "rev": "35110cccf28823320f4fd697fcafcb5038683982",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -42,11 +42,11 @@
"flake-compat": { "flake-compat": {
"flake": false, "flake": false,
"locked": { "locked": {
"lastModified": 1668681692, "lastModified": 1673956053,
"narHash": "sha256-Ht91NGdewz8IQLtWZ9LCeNXMSXHUss+9COoqu6JLmXU=", "narHash": "sha256-4gtG9iQuiKITOjNQQeQIpoIB6b16fm+504Ch3sNKLd8=",
"owner": "edolstra", "owner": "edolstra",
"repo": "flake-compat", "repo": "flake-compat",
"rev": "009399224d5e398d03b22badca40a37ac85412a1", "rev": "35bb57c0c8d8b62bbfd284272c928ceb64ddbde9",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -58,11 +58,11 @@
"flake-compat_2": { "flake-compat_2": {
"flake": false, "flake": false,
"locked": { "locked": {
"lastModified": 1668681692, "lastModified": 1673956053,
"narHash": "sha256-Ht91NGdewz8IQLtWZ9LCeNXMSXHUss+9COoqu6JLmXU=", "narHash": "sha256-4gtG9iQuiKITOjNQQeQIpoIB6b16fm+504Ch3sNKLd8=",
"owner": "edolstra", "owner": "edolstra",
"repo": "flake-compat", "repo": "flake-compat",
"rev": "009399224d5e398d03b22badca40a37ac85412a1", "rev": "35bb57c0c8d8b62bbfd284272c928ceb64ddbde9",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -72,12 +72,15 @@
} }
}, },
"flake-utils": { "flake-utils": {
"inputs": {
"systems": "systems"
},
"locked": { "locked": {
"lastModified": 1667395993, "lastModified": 1681202837,
"narHash": "sha256-nuEHfE/LcWyuSWnS8t12N1wc105Qtau+/OdUAjtQ0rA=", "narHash": "sha256-H+Rh19JDwRtpVPAWp64F+rlEtxUWBAQW28eAi3SRSzg=",
"owner": "numtide", "owner": "numtide",
"repo": "flake-utils", "repo": "flake-utils",
"rev": "5aed5285a952e0b949eb3ba02c12fa4fcfef535f", "rev": "cfacdce06f30d2b68473a46042957675eebb3401",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -87,27 +90,15 @@
} }
}, },
"flake-utils_2": { "flake-utils_2": {
"locked": { "inputs": {
"lastModified": 1667395993, "systems": "systems_2"
"narHash": "sha256-nuEHfE/LcWyuSWnS8t12N1wc105Qtau+/OdUAjtQ0rA=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "5aed5285a952e0b949eb3ba02c12fa4fcfef535f",
"type": "github"
}, },
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"flake-utils_3": {
"locked": { "locked": {
"lastModified": 1667395993, "lastModified": 1685518550,
"narHash": "sha256-nuEHfE/LcWyuSWnS8t12N1wc105Qtau+/OdUAjtQ0rA=", "narHash": "sha256-o2d0KcvaXzTrPRIo0kOLV0/QXHhDQ5DTi+OxcjO8xqY=",
"owner": "numtide", "owner": "numtide",
"repo": "flake-utils", "repo": "flake-utils",
"rev": "5aed5285a952e0b949eb3ba02c12fa4fcfef535f", "rev": "a1720a10a6cfe8234c0e93907ffe81be440f4cef",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -139,48 +130,31 @@
}, },
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1670525689, "lastModified": 1685860998,
"narHash": "sha256-YIjGzxrRQa5LYO0zlnH/ardcwXsRgsnHe3TkGkvCxbc=", "narHash": "sha256-ZexAPe8yvJaLvn5aVgjW0vY41RnmJGbgOdGBJk1yDIE=",
"owner": "NixOS", "owner": "NixOS",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "f21f11aa2a02cb78651c6d57546c7d7541f9240c", "rev": "45d47b647d7bbaede5121d731cbee78f6093b6d6",
"type": "github" "type": "github"
}, },
"original": { "original": {
"owner": "NixOS", "owner": "NixOS",
"ref": "nixpkgs-unstable",
"repo": "nixpkgs", "repo": "nixpkgs",
"type": "github" "type": "github"
} }
}, },
"nixpkgs-stable": { "nixpkgs-stable": {
"locked": { "locked": {
"lastModified": 1668984258, "lastModified": 1678872516,
"narHash": "sha256-0gDMJ2T3qf58xgcSbYoXiRGUkPWmKyr5C3vcathWhKs=", "narHash": "sha256-/E1YwtMtFAu2KUQKV/1+KFuReYPANM2Rzehk84VxVoc=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "cf63ade6f74bbc9d2a017290f1b2e33e8fbfa70a",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-22.05",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs_2": {
"locked": {
"lastModified": 1668994630,
"narHash": "sha256-1lqx6HLyw6fMNX/hXrrETG1vMvZRGm2XVC9O/Jt0T6c=",
"owner": "NixOS", "owner": "NixOS",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "af50806f7c6ab40df3e6b239099e8f8385f6c78b", "rev": "9b8e5abb18324c7fe9f07cb100c3cd4a29cda8b8",
"type": "github" "type": "github"
}, },
"original": { "original": {
"owner": "NixOS", "owner": "NixOS",
"ref": "nixos-unstable", "ref": "nixos-22.11",
"repo": "nixpkgs", "repo": "nixpkgs",
"type": "github" "type": "github"
} }
@ -188,17 +162,21 @@
"pre-commit-hooks": { "pre-commit-hooks": {
"inputs": { "inputs": {
"flake-compat": "flake-compat_2", "flake-compat": "flake-compat_2",
"flake-utils": "flake-utils_3", "flake-utils": [
"flake-utils"
],
"gitignore": "gitignore", "gitignore": "gitignore",
"nixpkgs": "nixpkgs_2", "nixpkgs": [
"nixpkgs"
],
"nixpkgs-stable": "nixpkgs-stable" "nixpkgs-stable": "nixpkgs-stable"
}, },
"locked": { "locked": {
"lastModified": 1670413394, "lastModified": 1685361114,
"narHash": "sha256-M7sWqrKtOqUv9euX1t3HCxis8cPy9MNiZxQmUf0KF1o=", "narHash": "sha256-4RjrlSb+OO+e1nzTExKW58o3WRwVGpXwj97iCta8aj4=",
"owner": "cachix", "owner": "cachix",
"repo": "pre-commit-hooks.nix", "repo": "pre-commit-hooks.nix",
"rev": "1303a1a76e9eb074075bfe566518c413f6fc104e", "rev": "ca2fdbf3edda2a38140184da6381d49f8206eaf4",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -229,11 +207,11 @@
] ]
}, },
"locked": { "locked": {
"lastModified": 1670034122, "lastModified": 1683080331,
"narHash": "sha256-EqmuOKucPWtMvCZtHraHr3Q3bgVszq1x2PoZtQkUuEk=", "narHash": "sha256-nGDvJ1DAxZIwdn6ww8IFwzoHb2rqBP4wv/65Wt5vflk=",
"owner": "oxalica", "owner": "oxalica",
"repo": "rust-overlay", "repo": "rust-overlay",
"rev": "a0d5773275ecd4f141d792d3a0376277c0fc0b65", "rev": "d59c3fa0cba8336e115b376c2d9e91053aa59e56",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -252,11 +230,11 @@
] ]
}, },
"locked": { "locked": {
"lastModified": 1670552927, "lastModified": 1685846256,
"narHash": "sha256-lCE51eAGrAFS4k9W5aDGFpVtOAwQQ/rFMN80PCDh0vo=", "narHash": "sha256-G4aYK4VqlMHImvZ0lUnLHw1A+Cx28T0sBMvAKZBcGpk=",
"owner": "oxalica", "owner": "oxalica",
"repo": "rust-overlay", "repo": "rust-overlay",
"rev": "a0fdafd18c9cf599fde17fbaf07dbb20fa57eecb", "rev": "1ef3c6de6127a1cba94cc5492cdde52e33d06ea4",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -264,6 +242,36 @@
"repo": "rust-overlay", "repo": "rust-overlay",
"type": "github" "type": "github"
} }
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
},
"systems_2": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
} }
}, },
"root": "root", "root": "root",

@ -3,7 +3,7 @@
"ripgrep, but also search in PDFs, E-Books, Office documents, zip, tar.gz, etc."; "ripgrep, but also search in PDFs, E-Books, Office documents, zip, tar.gz, etc.";
inputs = { inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable"; nixpkgs.url = "github:NixOS/nixpkgs";
crane = { crane = {
url = "github:ipetkov/crane"; url = "github:ipetkov/crane";
@ -25,7 +25,13 @@
flake = false; flake = false;
}; };
pre-commit-hooks.url = "github:cachix/pre-commit-hooks.nix"; pre-commit-hooks = {
url = "github:cachix/pre-commit-hooks.nix";
inputs = {
nixpkgs.follows = "nixpkgs";
flake-utils.follows = "flake-utils";
};
};
}; };
outputs = { self, nixpkgs, crane, flake-utils, rust-overlay, advisory-db outputs = { self, nixpkgs, crane, flake-utils, rust-overlay, advisory-db
@ -36,14 +42,16 @@
inherit system; inherit system;
overlays = [ (import rust-overlay) ]; overlays = [ (import rust-overlay) ];
}; };
inherit (pkgs) lib;
craneLib = crane.lib.${system}; craneLib = crane.lib.${system};
src = craneLib.cleanCargoSource ./.; src = pkgs.lib.cleanSourceWith {
src = craneLib.path ./.;
filter = pkgs.lib.cleanSourceFilter;
};
buildInputs = with pkgs; buildInputs = with pkgs;
[ ffmpeg imagemagick pandoc poppler_utils ripgrep tesseract ] [ ffmpeg imagemagick pandoc poppler_utils ripgrep tesseract ]
++ lib.optionals pkgs.stdenv.isDarwin [ ++ pkgs.lib.optionals pkgs.stdenv.isDarwin [
# Additional darwin specific inputs can be set here # Additional darwin specific inputs can be set here
pkgs.libiconv pkgs.libiconv
]; ];
@ -54,10 +62,7 @@
# Build the actual crate itself, reusing the dependency # Build the actual crate itself, reusing the dependency
# artifacts from above. # artifacts from above.
rga = craneLib.buildPackage { rga = craneLib.buildPackage { inherit cargoArtifacts src buildInputs; };
inherit cargoArtifacts src buildInputs;
doCheck = false;
};
pre-commit = pre-commit-hooks.lib."${system}".run; pre-commit = pre-commit-hooks.lib."${system}".run;
in { in {
@ -97,18 +102,20 @@
hooks = { hooks = {
nixfmt.enable = true; nixfmt.enable = true;
rustfmt.enable = true; rustfmt.enable = true;
cargo-check.enable = true; typos = {
enable = true;
types = [ "text" ];
excludes = [ "exampledir/.*" ];
};
}; };
}; };
} // lib.optionalAttrs (system == "x86_64-linux") {
# NB: cargo-tarpaulin only supports x86_64 systems
# Check code coverage (note: this will not upload coverage anywhere)
rga-coverage =
craneLib.cargoTarpaulin { inherit cargoArtifacts src; };
}; };
# `nix build` # `nix build`
packages.default = rga; packages = {
inherit rga; # `nix build .#rga`
default = rga; # `nix build`
};
# `nix run` # `nix run`
apps.default = flake-utils.lib.mkApp { drv = rga; }; apps.default = flake-utils.lib.mkApp { drv = rga; };

@ -10,6 +10,7 @@ pub mod writing;
pub mod zip; pub mod zip;
use crate::{adapted_iter::AdaptedFilesIterBox, config::RgaConfig, matching::*}; use crate::{adapted_iter::AdaptedFilesIterBox, config::RgaConfig, matching::*};
use anyhow::{format_err, Context, Result}; use anyhow::{format_err, Context, Result};
use async_trait::async_trait;
use custom::CustomAdapterConfig; use custom::CustomAdapterConfig;
use custom::BUILTIN_SPAWNING_ADAPTERS; use custom::BUILTIN_SPAWNING_ADAPTERS;
use log::*; use log::*;
@ -77,11 +78,17 @@ impl AdapterMeta {
pub trait GetMetadata { pub trait GetMetadata {
fn metadata(&self) -> &AdapterMeta; fn metadata(&self) -> &AdapterMeta;
} }
#[async_trait]
pub trait FileAdapter: GetMetadata + Send + Sync { pub trait FileAdapter: GetMetadata + Send + Sync {
/// adapt a file. /// adapt a file.
/// ///
/// detection_reason is the Matcher that was used to identify this file. Unless --rga-accurate was given, it is always a FastMatcher /// detection_reason is the Matcher that was used to identify this file. Unless --rga-accurate was given, it is always a FastMatcher
fn adapt(&self, a: AdaptInfo, detection_reason: &FileMatcher) -> Result<AdaptedFilesIterBox>; async fn adapt(
&self,
a: AdaptInfo,
detection_reason: &FileMatcher,
) -> Result<AdaptedFilesIterBox>;
} }
pub struct AdaptInfo { pub struct AdaptInfo {

@ -49,8 +49,9 @@ pub struct CustomAdapterConfig {
pub args: Vec<String>, pub args: Vec<String>,
/// The output path hint. The placeholders are the same as for `.args` /// The output path hint. The placeholders are the same as for `.args`
/// ///
/// If not set, defaults to ${input_virtual_path}.txt /// If not set, defaults to "${input_virtual_path}.txt"
/// ///
/// Setting this is useful if the output format is not plain text (.txt) but instead some other format that should be passed to another adapter
pub output_path_hint: Option<String>, pub output_path_hint: Option<String>,
} }
@ -128,7 +129,6 @@ lazy_static! {
disabled_by_default: None, disabled_by_default: None,
match_only_by_mime: None, match_only_by_mime: None,
output_path_hint: Some("${input_virtual_path}.txt.asciipagebreaks".into()) output_path_hint: Some("${input_virtual_path}.txt.asciipagebreaks".into())
// postprocessors: [{name: "add_page_numbers_by_pagebreaks"}]
} }
]; ];
} }
@ -143,15 +143,13 @@ pub fn map_exe_error(err: std::io::Error, exe_name: &str, help: &str) -> anyhow:
} }
} }
fn proc_wait(mut child: Child) -> impl AsyncRead { fn proc_wait(mut child: Child, context: impl FnOnce() -> String) -> impl AsyncRead {
let s = stream! { let s = stream! {
let res = child.wait().await?; let res = child.wait().await?;
if res.success() { if res.success() {
yield std::io::Result::Ok(Bytes::new()); yield std::io::Result::Ok(Bytes::new());
} else { } else {
yield std::io::Result::Err(to_io_err( Err(format_err!("{:?}", res)).with_context(context).map_err(to_io_err)?;
format_err!("subprocess failed: {:?}", res),
));
} }
}; };
StreamReader::new(s) StreamReader::new(s)
@ -164,6 +162,7 @@ pub fn pipe_output(
exe_name: &str, exe_name: &str,
help: &str, help: &str,
) -> Result<ReadBox> { ) -> Result<ReadBox> {
let cmd_log = format!("{:?}", cmd); // todo: perf
let mut cmd = cmd let mut cmd = cmd
.stdin(Stdio::piped()) .stdin(Stdio::piped())
.stdout(Stdio::piped()) .stdout(Stdio::piped())
@ -177,10 +176,9 @@ pub fn pipe_output(
tokio::io::copy(&mut z, &mut stdi).await?; tokio::io::copy(&mut z, &mut stdi).await?;
std::io::Result::Ok(()) std::io::Result::Ok(())
}); });
Ok(Box::pin(stdo.chain(
Ok(Box::pin( proc_wait(cmd, move || format!("subprocess: {cmd_log}")).chain(join_handle_to_stream(join)),
stdo.chain(proc_wait(cmd).chain(join_handle_to_stream(join))), )))
))
} }
pub struct CustomSpawningFileAdapter { pub struct CustomSpawningFileAdapter {
@ -224,8 +222,9 @@ impl CustomSpawningFileAdapter {
Ok(command) Ok(command)
} }
} }
#[async_trait]
impl FileAdapter for CustomSpawningFileAdapter { impl FileAdapter for CustomSpawningFileAdapter {
fn adapt<'a>( async fn adapt(
&self, &self,
ai: AdaptInfo, ai: AdaptInfo,
_detection_reason: &FileMatcher, _detection_reason: &FileMatcher,
@ -314,7 +313,7 @@ mod test {
let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?)); let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?));
// let r = adapter.adapt(a, &d)?; // let r = adapter.adapt(a, &d)?;
let r = loop_adapt(&adapter, d, a)?; let r = loop_adapt(&adapter, d, a).await?;
let o = adapted_to_vec(r).await?; let o = adapted_to_vec(r).await?;
assert_eq!( assert_eq!(
String::from_utf8(o)?, String::from_utf8(o)?,
@ -368,7 +367,7 @@ PREFIX:Page 1:
Path::new("foo.txt"), Path::new("foo.txt"),
Box::pin(Cursor::new(Vec::from(input))), Box::pin(Cursor::new(Vec::from(input))),
); );
let output = adapter.adapt(a, &d).unwrap(); let output = adapter.adapt(a, &d).await.unwrap();
let oup = adapted_to_vec(output).await?; let oup = adapted_to_vec(output).await?;
println!("output: {}", String::from_utf8_lossy(&oup)); println!("output: {}", String::from_utf8_lossy(&oup));

@ -93,8 +93,13 @@ fn get_inner_filename(filename: &Path) -> PathBuf {
filename.with_file_name(format!("{}{}", stem, new_extension)) filename.with_file_name(format!("{}{}", stem, new_extension))
} }
#[async_trait]
impl FileAdapter for DecompressAdapter { impl FileAdapter for DecompressAdapter {
fn adapt(&self, ai: AdaptInfo, detection_reason: &FileMatcher) -> Result<AdaptedFilesIterBox> { async fn adapt(
&self,
ai: AdaptInfo,
detection_reason: &FileMatcher,
) -> Result<AdaptedFilesIterBox> {
Ok(one_file(AdaptInfo { Ok(one_file(AdaptInfo {
filepath_hint: get_inner_filename(&ai.filepath_hint), filepath_hint: get_inner_filename(&ai.filepath_hint),
is_real_file: false, is_real_file: false,
@ -137,7 +142,7 @@ mod tests {
let filepath = test_data_dir().join("hello.gz"); let filepath = test_data_dir().join("hello.gz");
let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?)); let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?));
let r = adapter.adapt(a, &d)?; let r = adapter.adapt(a, &d).await?;
let o = adapted_to_vec(r).await?; let o = adapted_to_vec(r).await?;
assert_eq!(String::from_utf8(o)?, "hello\n"); assert_eq!(String::from_utf8(o)?, "hello\n");
Ok(()) Ok(())
@ -150,7 +155,7 @@ mod tests {
let filepath = test_data_dir().join("short.pdf.gz"); let filepath = test_data_dir().join("short.pdf.gz");
let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?)); let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?));
let r = loop_adapt(&adapter, d, a)?; let r = loop_adapt(&adapter, d, a).await?;
let o = adapted_to_vec(r).await?; let o = adapted_to_vec(r).await?;
assert_eq!( assert_eq!(
String::from_utf8(o)?, String::from_utf8(o)?,

@ -14,13 +14,15 @@ use writing::WritingFileAdapter;
// maybe todo: read list of extensions from // maybe todo: read list of extensions from
// ffmpeg -demuxers | tail -n+5 | awk '{print $2}' | while read demuxer; do echo MUX=$demuxer; ffmpeg -h demuxer=$demuxer | grep 'Common extensions'; done 2>/dev/null // ffmpeg -demuxers | tail -n+5 | awk '{print $2}' | while read demuxer; do echo MUX=$demuxer; ffmpeg -h demuxer=$demuxer | grep 'Common extensions'; done 2>/dev/null
// but really, the probability of getting useful information from a .flv is low // but really, the probability of getting useful information from a .flv is low
static EXTENSIONS: &[&str] = &["mkv", "mp4", "avi"]; static EXTENSIONS: &[&str] = &["mkv", "mp4", "avi", "mp3", "ogg", "flac", "webm"];
lazy_static! { lazy_static! {
static ref METADATA: AdapterMeta = AdapterMeta { static ref METADATA: AdapterMeta = AdapterMeta {
name: "ffmpeg".to_owned(), name: "ffmpeg".to_owned(),
version: 1, version: 1,
description: "Uses ffmpeg to extract video metadata/chapters and subtitles".to_owned(), description:
"Uses ffmpeg to extract video metadata/chapters, subtitles, lyrics, and other metadata"
.to_owned(),
recurses: false, recurses: false,
fast_matchers: EXTENSIONS fast_matchers: EXTENSIONS
.iter() .iter()
@ -52,7 +54,7 @@ struct FFprobeOutput {
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct FFprobeStream { struct FFprobeStream {
codec_type: String, // video,audio,subtitle index: i32, // stream index
} }
#[async_trait] #[async_trait]
@ -78,17 +80,17 @@ impl WritingFileAdapter for FFmpegAdapter {
} }
let inp_fname = filepath_hint; let inp_fname = filepath_hint;
let spawn_fail = |e| map_exe_error(e, "ffprobe", "Make sure you have ffmpeg installed."); let spawn_fail = |e| map_exe_error(e, "ffprobe", "Make sure you have ffmpeg installed.");
let has_subtitles = { let subtitle_streams = {
let probe = Command::new("ffprobe") let probe = Command::new("ffprobe")
.args(vec![ .args(vec![
"-v", "-v",
"error", "error", // show all errors
"-select_streams", "-select_streams",
"s", "s", // show only subtitle streams
"-of", "-of",
"json", "json", // use json as output format
"-show_entries", "-show_entries",
"stream=codec_type", "stream=index", // show index of subtitle streams
]) ])
.arg("-i") .arg("-i")
.arg(&inp_fname) .arg(&inp_fname)
@ -96,10 +98,14 @@ impl WritingFileAdapter for FFmpegAdapter {
.await .await
.map_err(spawn_fail)?; .map_err(spawn_fail)?;
if !probe.status.success() { if !probe.status.success() {
return Err(format_err!("ffprobe failed: {:?}", probe.status)); return Err(format_err!(
"ffprobe failed: {:?}\n{}",
probe.status,
String::from_utf8_lossy(&probe.stderr)
));
} }
let p: FFprobeOutput = serde_json::from_slice(&probe.stdout)?; let p: FFprobeOutput = serde_json::from_slice(&probe.stdout)?;
!p.streams.is_empty() p.streams
}; };
{ {
// extract file metadata (especially chapter names in a greppable format) // extract file metadata (especially chapter names in a greppable format)
@ -124,6 +130,7 @@ impl WritingFileAdapter for FFmpegAdapter {
.spawn()?; .spawn()?;
let mut lines = BufReader::new(probe.stdout.as_mut().unwrap()).lines(); let mut lines = BufReader::new(probe.stdout.as_mut().unwrap()).lines();
while let Some(line) = lines.next_line().await? { while let Some(line) = lines.next_line().await? {
let line = line.replace("\\r\\n", "\n").replace("\\n", "\n"); // just unescape newlines
async_writeln!(oup, "metadata: {line}")?; async_writeln!(oup, "metadata: {line}")?;
} }
let exit = probe.wait().await?; let exit = probe.wait().await?;
@ -131,31 +138,35 @@ impl WritingFileAdapter for FFmpegAdapter {
return Err(format_err!("ffprobe failed: {:?}", exit)); return Err(format_err!("ffprobe failed: {:?}", exit));
} }
} }
if has_subtitles { if !subtitle_streams.is_empty() {
// extract subtitles for probe_stream in subtitle_streams.iter() {
let mut cmd = Command::new("ffmpeg"); // extract subtitles
cmd.arg("-hide_banner") let mut cmd = Command::new("ffmpeg");
.arg("-loglevel") cmd.arg("-hide_banner")
.arg("panic") .arg("-loglevel")
.arg("-i") .arg("panic")
.arg(&inp_fname) .arg("-i")
.arg("-f") .arg(&inp_fname)
.arg("webvtt") .arg("-map")
.arg("-"); .arg(format!("0:{}", probe_stream.index)) // 0 for first input
let mut cmd = cmd.stdout(Stdio::piped()).spawn().map_err(spawn_fail)?; .arg("-f")
let stdo = cmd.stdout.as_mut().expect("is piped"); .arg("webvtt")
let time_re = Regex::new(r".*\d.*-->.*\d.*").unwrap(); .arg("-");
let mut time: String = "".to_owned(); let mut cmd = cmd.stdout(Stdio::piped()).spawn().map_err(spawn_fail)?;
// rewrite subtitle times so they are shown as a prefix in every line let stdo = cmd.stdout.as_mut().expect("is piped");
let mut lines = BufReader::new(stdo).lines(); let time_re = Regex::new(r".*\d.*-->.*\d.*").unwrap();
while let Some(line) = lines.next_line().await? { let mut time: String = "".to_owned();
// 09:55.195 --> 09:56.730 // rewrite subtitle times so they are shown as a prefix in every line
if time_re.is_match(&line) { let mut lines = BufReader::new(stdo).lines();
time = line.to_owned(); while let Some(line) = lines.next_line().await? {
} else if line.is_empty() { // 09:55.195 --> 09:56.730
async_writeln!(oup)?; if time_re.is_match(&line) {
} else { time = line.to_owned();
async_writeln!(oup, "{time}: {line}")?; } else if line.is_empty() {
async_writeln!(oup)?;
} else {
async_writeln!(oup, "{time}: {line}")?;
}
} }
} }
} }

@ -4,7 +4,11 @@
use anyhow::Result; use anyhow::Result;
use async_stream::stream; use async_stream::stream;
use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
use encoding_rs::Encoding;
use encoding_rs_io::DecodeReaderBytesBuilder;
use tokio_util::io::SyncIoBridge;
use std::io::Cursor; use std::io::Cursor;
use std::path::PathBuf; use std::path::PathBuf;
@ -41,15 +45,16 @@ impl GetMetadata for PostprocPrefix {
&METADATA &METADATA
} }
} }
#[async_trait]
impl FileAdapter for PostprocPrefix { impl FileAdapter for PostprocPrefix {
fn adapt<'a>( async fn adapt(
&self, &self,
a: super::AdaptInfo, a: super::AdaptInfo,
_detection_reason: &crate::matching::FileMatcher, _detection_reason: &crate::matching::FileMatcher,
) -> Result<AdaptedFilesIterBox> { ) -> Result<AdaptedFilesIterBox> {
let read = add_newline(postproc_prefix( let read = add_newline(postproc_prefix(
&a.line_prefix, &a.line_prefix,
postproc_encoding(&a.line_prefix, a.inp)?, postproc_encoding(&a.line_prefix, a.inp).await?,
)); ));
// keep adapt info (filename etc) except replace inp // keep adapt info (filename etc) except replace inp
let ai = AdaptInfo { let ai = AdaptInfo {
@ -74,50 +79,53 @@ impl Read for ReadErr {
* Detects and converts encodings other than utf-8 to utf-8. * Detects and converts encodings other than utf-8 to utf-8.
* If the input stream does not contain valid text, returns the string `[rga: binary data]` instead * If the input stream does not contain valid text, returns the string `[rga: binary data]` instead
*/ */
pub fn postproc_encoding( async fn postproc_encoding(
_line_prefix: &str, _line_prefix: &str,
inp: impl AsyncRead + Send + 'static, inp: Pin<Box<dyn AsyncRead + Send>>,
) -> Result<Pin<Box<dyn AsyncRead + Send>>> { ) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
Ok(Box::pin(inp))
// panic!("todo: implement");
/*// TODO: parse these options from ripgrep's configuration
let encoding = None; // detect bom but usually assume utf8
let bom_sniffing = true;
let mut decode_builder = DecodeReaderBytesBuilder::new();
// https://github.com/BurntSushi/ripgrep/blob/a7d26c8f144a4957b75f71087a66692d0b25759a/grep-searcher/src/searcher/mod.rs#L706
// this detects utf-16 BOMs and transcodes to utf-8 if they are present
// it does not detect any other char encodings. that would require https://github.com/hsivonen/chardetng or similar but then binary detection is hard (?)
let inp = decode_builder
.encoding(encoding)
.utf8_passthru(true)
.strip_bom(bom_sniffing)
.bom_override(true)
.bom_sniffing(bom_sniffing)
.build(inp);
// check for binary content in first 8kB // check for binary content in first 8kB
// read the first 8kB into a buffer, check for null bytes, then return the buffer concatenated with the rest of the file // read the first 8kB into a buffer, check for null bytes, then return the buffer concatenated with the rest of the file
let mut fourk = Vec::with_capacity(1 << 13); let mut fourk = Vec::with_capacity(1 << 13);
let mut beginning = inp.take(1 << 13); let mut beginning = inp.take(1 << 13);
beginning.read_to_end(&mut fourk)?; beginning.read_to_end(&mut fourk).await?;
let has_binary = fourk.contains(&0u8);
if fourk.contains(&0u8) {
log::debug!("detected binary"); let enc = Encoding::for_bom(&fourk);
let v = "[rga: binary data]"; let inp = Cursor::new(fourk).chain(beginning.into_inner());
return Ok(Box::new(std::io::Cursor::new(v))); match enc {
/*let err = std::io::Error::new( Some((enc, _)) if enc != encoding_rs::UTF_8 => {
std::io::ErrorKind::InvalidData, // detected UTF16LE or UTF16BE, convert to UTF8 in separate thread
format!("{}[rga: binary data]", line_prefix), // TODO: parse these options from ripgrep's configuration
); let encoding = None; // detect bom but usually assume utf8
return Err(err).context(""); let bom_sniffing = true;
return ReadErr { let mut decode_builder = DecodeReaderBytesBuilder::new();
err, // https://github.com/BurntSushi/ripgrep/blob/a7d26c8f144a4957b75f71087a66692d0b25759a/grep-searcher/src/searcher/mod.rs#L706
};*/ // this detects utf-16 BOMs and transcodes to utf-8 if they are present
// it does not detect any other char encodings. that would require https://github.com/hsivonen/chardetng or similar but then binary detection is hard (?)
let mut inp = decode_builder
.encoding(encoding)
.utf8_passthru(true)
.strip_bom(bom_sniffing)
.bom_override(true)
.bom_sniffing(bom_sniffing)
.build(SyncIoBridge::new(inp));
let oup = tokio::task::spawn_blocking(move || -> Result<Vec<u8>> {
let mut oup = Vec::new();
std::io::Read::read_to_end(&mut inp, &mut oup)?;
Ok(oup)
})
.await??;
Ok(Box::pin(Cursor::new(oup)))
}
_ => {
if has_binary {
log::debug!("detected binary");
return Ok(Box::pin(Cursor::new("[rga: binary data]")));
}
Ok(Box::pin(inp))
}
} }
Ok(Box::new(
std::io::Cursor::new(fourk).chain(beginning.into_inner()),
))*/
} }
/// Adds the given prefix to each line in an `AsyncRead`. /// Adds the given prefix to each line in an `AsyncRead`.
@ -164,13 +172,14 @@ impl GetMetadata for PostprocPageBreaks {
&METADATA &METADATA
} }
} }
#[async_trait]
impl FileAdapter for PostprocPageBreaks { impl FileAdapter for PostprocPageBreaks {
fn adapt<'a>( async fn adapt(
&self, &self,
a: super::AdaptInfo, a: super::AdaptInfo,
_detection_reason: &crate::matching::FileMatcher, _detection_reason: &crate::matching::FileMatcher,
) -> Result<AdaptedFilesIterBox> { ) -> Result<AdaptedFilesIterBox> {
let read = postproc_pagebreaks(postproc_encoding(&a.line_prefix, a.inp)?); let read = postproc_pagebreaks(postproc_encoding(&a.line_prefix, a.inp).await?);
// keep adapt info (filename etc) except replace inp // keep adapt info (filename etc) except replace inp
let ai = AdaptInfo { let ai = AdaptInfo {
inp: Box::pin(read), inp: Box::pin(read),
@ -282,7 +291,7 @@ mod tests {
let fname = test_data_dir().join("twoblankpages.pdf"); let fname = test_data_dir().join("twoblankpages.pdf");
let rd = File::open(&fname).await?; let rd = File::open(&fname).await?;
let (a, d) = simple_adapt_info(&fname, Box::pin(rd)); let (a, d) = simple_adapt_info(&fname, Box::pin(rd));
let res = loop_adapt(&adapter, d, a)?; let res = loop_adapt(&adapter, d, a).await?;
let buf = adapted_to_vec(res).await?; let buf = adapted_to_vec(res).await?;
@ -327,7 +336,8 @@ PREFIX:Page 3:
b: &str, b: &str,
) -> Result<()> { ) -> Result<()> {
let mut oup = Vec::new(); let mut oup = Vec::new();
let inp = postproc_encoding("", a)?; let inp = Box::pin(Cursor::new(a));
let inp = postproc_encoding("", inp).await?;
if pagebreaks { if pagebreaks {
postproc_pagebreaks(inp).read_to_end(&mut oup).await?; postproc_pagebreaks(inp).read_to_end(&mut oup).await?;
} else { } else {
@ -341,6 +351,23 @@ PREFIX:Page 3:
Ok(()) Ok(())
} }
#[tokio::test]
async fn test_utf16() -> Result<()> {
let utf16lebom: &[u8] = &[
0xff, 0xfe, 0x68, 0x00, 0x65, 0x00, 0x6c, 0x00, 0x6c, 0x00, 0x6f, 0x00, 0x20, 0x00,
0x77, 0x00, 0x6f, 0x00, 0x72, 0x00, 0x6c, 0x00, 0x64, 0x00, 0x20, 0x00, 0x3d, 0xd8,
0xa9, 0xdc, 0x0a, 0x00,
];
let utf16bebom: &[u8] = &[
0xfe, 0xff, 0x00, 0x68, 0x00, 0x65, 0x00, 0x6c, 0x00, 0x6c, 0x00, 0x6f, 0x00, 0x20,
0x00, 0x77, 0x00, 0x6f, 0x00, 0x72, 0x00, 0x6c, 0x00, 0x64, 0x00, 0x20, 0xd8, 0x3d,
0xdc, 0xa9, 0x00, 0x0a,
];
test_from_bytes(false, "", utf16lebom, "hello world 💩\n").await?;
test_from_bytes(false, "", utf16bebom, "hello world 💩\n").await?;
Ok(())
}
#[tokio::test] #[tokio::test]
async fn post1() -> Result<()> { async fn post1() -> Result<()> {
let inp = "What is this\nThis is a test\nFoo"; let inp = "What is this\nThis is a test\nFoo";
@ -362,20 +389,19 @@ PREFIX:Page 3:
Ok(()) Ok(())
} }
/*
todo: uncomment when fixed
#[tokio::test] #[tokio::test]
async fn test_binary_content() -> Result<()> { async fn test_binary_content() -> Result<()> {
test_from_strs( test_from_strs(
false, false,
"foo:", "foo:",
"this is a test \n\n \0 foo", "this is a test \n\n \0 foo",
"foo:[rga: binary data]", "foo:[rga: binary data]",
) )
.await?; .await?;
test_from_strs(false, "foo:", "\0", "foo:[rga: binary data]").await?; test_from_strs(false, "foo:", "\0", "foo:[rga: binary data]").await?;
Ok(()) Ok(())
}*/ }
/*#[test] /*#[test]
fn chardet() -> Result<()> { fn chardet() -> Result<()> {

@ -77,11 +77,13 @@ fn synchronous_dump_sqlite(ai: AdaptInfo, mut s: impl Write) -> Result<()> {
return Ok(()); return Ok(());
} }
let inp_fname = filepath_hint; let inp_fname = filepath_hint;
let conn = Connection::open_with_flags(&inp_fname, OpenFlags::SQLITE_OPEN_READ_ONLY)
let conn = Connection::open_with_flags(inp_fname, OpenFlags::SQLITE_OPEN_READ_ONLY)?; .with_context(|| format!("opening sqlite connection to {}", inp_fname.display()))?;
let tables: Vec<String> = conn let tables: Vec<String> = conn
.prepare("select name from sqlite_master where type='table'")? .prepare("select name from sqlite_master where type='table'")
.query_map([], |r| r.get::<_, String>(0))? .context("while preparing query")?
.query_map([], |r| r.get::<_, String>(0))
.context("while executing query")?
.filter_map(|e| e.ok()) .filter_map(|e| e.ok())
.collect(); .collect();
debug!("db has {} tables", tables.len()); debug!("db has {} tables", tables.len());
@ -121,7 +123,9 @@ impl WritingFileAdapter for SqliteAdapter {
oup: Pin<Box<dyn AsyncWrite + Send>>, oup: Pin<Box<dyn AsyncWrite + Send>>,
) -> Result<()> { ) -> Result<()> {
let oup_sync = SyncIoBridge::new(oup); let oup_sync = SyncIoBridge::new(oup);
tokio::task::spawn_blocking(|| synchronous_dump_sqlite(ai, oup_sync)).await??; tokio::task::spawn_blocking(|| synchronous_dump_sqlite(ai, oup_sync))
.await?
.context("in synchronous sqlite task")?;
Ok(()) Ok(())
} }
} }
@ -134,10 +138,10 @@ mod test {
#[tokio::test] #[tokio::test]
async fn simple() -> Result<()> { async fn simple() -> Result<()> {
let adapter: Box<dyn FileAdapter> = Box::new(SqliteAdapter::default()); let adapter: Box<dyn FileAdapter> = Box::<SqliteAdapter>::default();
let fname = test_data_dir().join("hello.sqlite3"); let fname = test_data_dir().join("hello.sqlite3");
let (a, d) = simple_fs_adapt_info(&fname).await?; let (a, d) = simple_fs_adapt_info(&fname).await?;
let res = adapter.adapt(a, &d)?; let res = adapter.adapt(a, &d).await?;
let buf = adapted_to_vec(res).await?; let buf = adapted_to_vec(res).await?;

@ -6,6 +6,7 @@ use crate::{
}; };
use anyhow::*; use anyhow::*;
use async_stream::stream; use async_stream::stream;
use async_trait::async_trait;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use log::*; use log::*;
use std::path::PathBuf; use std::path::PathBuf;
@ -45,8 +46,13 @@ impl GetMetadata for TarAdapter {
} }
} }
#[async_trait]
impl FileAdapter for TarAdapter { impl FileAdapter for TarAdapter {
fn adapt(&self, ai: AdaptInfo, _detection_reason: &FileMatcher) -> Result<AdaptedFilesIterBox> { async fn adapt(
&self,
ai: AdaptInfo,
_detection_reason: &FileMatcher,
) -> Result<AdaptedFilesIterBox> {
let AdaptInfo { let AdaptInfo {
filepath_hint, filepath_hint,
inp, inp,
@ -103,7 +109,7 @@ mod tests {
let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?)); let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?));
let adapter = TarAdapter::new(); let adapter = TarAdapter::new();
let r = loop_adapt(&adapter, d, a).context("adapt")?; let r = loop_adapt(&adapter, d, a).await.context("adapt")?;
let o = adapted_to_vec(r).await.context("adapted_to_vec")?; let o = adapted_to_vec(r).await.context("adapted_to_vec")?;
assert_eq!( assert_eq!(
String::from_utf8(o).context("parsing utf8")?, String::from_utf8(o).context("parsing utf8")?,

@ -3,7 +3,7 @@ use std::pin::Pin;
use crate::{adapted_iter::one_file, join_handle_to_stream, to_io_err}; use crate::{adapted_iter::one_file, join_handle_to_stream, to_io_err};
use super::{AdaptInfo, FileAdapter, GetMetadata}; use super::{AdaptInfo, FileAdapter, GetMetadata};
use anyhow::Result; use anyhow::{Context, Result};
use async_trait::async_trait; use async_trait::async_trait;
use tokio::io::{AsyncReadExt, AsyncWrite}; use tokio::io::{AsyncReadExt, AsyncWrite};
@ -41,15 +41,17 @@ macro_rules! async_writeln {
} }
pub(crate) use async_writeln; pub(crate) use async_writeln;
#[async_trait]
impl<T> FileAdapter for T impl<T> FileAdapter for T
where where
T: WritingFileAdapter, T: WritingFileAdapter,
{ {
fn adapt( async fn adapt(
&self, &self,
a: super::AdaptInfo, a: super::AdaptInfo,
detection_reason: &crate::matching::FileMatcher, detection_reason: &crate::matching::FileMatcher,
) -> Result<crate::adapted_iter::AdaptedFilesIterBox> { ) -> Result<crate::adapted_iter::AdaptedFilesIterBox> {
let name = self.metadata().name.clone();
let (w, r) = tokio::io::duplex(128 * 1024); let (w, r) = tokio::io::duplex(128 * 1024);
let d2 = detection_reason.clone(); let d2 = detection_reason.clone();
let archive_recursion_depth = a.archive_recursion_depth + 1; let archive_recursion_depth = a.archive_recursion_depth + 1;
@ -59,7 +61,10 @@ where
let config = a.config.clone(); let config = a.config.clone();
let joiner = tokio::spawn(async move { let joiner = tokio::spawn(async move {
let x = d2; let x = d2;
T::adapt_write(a, &x, Box::pin(w)).await.map_err(to_io_err) T::adapt_write(a, &x, Box::pin(w))
.await
.with_context(|| format!("in {}.adapt_write", name))
.map_err(to_io_err)
}); });
Ok(one_file(AdaptInfo { Ok(one_file(AdaptInfo {

@ -5,7 +5,7 @@ use async_stream::stream;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use log::*; use log::*;
static EXTENSIONS: &[&str] = &["zip"]; static EXTENSIONS: &[&str] = &["zip", "jar"];
lazy_static! { lazy_static! {
static ref METADATA: AdapterMeta = AdapterMeta { static ref METADATA: AdapterMeta = AdapterMeta {
@ -36,8 +36,13 @@ impl GetMetadata for ZipAdapter {
} }
} }
#[async_trait]
impl FileAdapter for ZipAdapter { impl FileAdapter for ZipAdapter {
fn adapt(&self, ai: AdaptInfo, _detection_reason: &FileMatcher) -> Result<AdaptedFilesIterBox> { async fn adapt(
&self,
ai: AdaptInfo,
_detection_reason: &FileMatcher,
) -> Result<AdaptedFilesIterBox> {
// let (s, r) = mpsc::channel(1); // let (s, r) = mpsc::channel(1);
let AdaptInfo { let AdaptInfo {
inp, inp,
@ -52,11 +57,11 @@ impl FileAdapter for ZipAdapter {
if is_real_file { if is_real_file {
use async_zip::read::fs::ZipFileReader; use async_zip::read::fs::ZipFileReader;
let zip = ZipFileReader::new(&filepath_hint).await?;
let s = stream! { let s = stream! {
let zip = ZipFileReader::new(&filepath_hint).await?; for i in 0..zip.file().entries().len() {
for i in 0..zip.entries().len() { let file = zip.get_entry(i)?;
let reader = zip.entry_reader(i).await?; let reader = zip.entry(i).await?;
let file = reader.entry();
if file.filename().ends_with('/') { if file.filename().ends_with('/') {
continue; continue;
} }
@ -98,10 +103,11 @@ impl FileAdapter for ZipAdapter {
let mut zip = ZipFileReader::new(inp); let mut zip = ZipFileReader::new(inp);
let s = stream! { let s = stream! {
while !zip.finished() { while let Some(mut entry) = zip.next_entry().await? {
if let Some(reader) = zip.entry_reader().await? { let file = entry.entry();
let file = reader.entry();
if file.filename().ends_with('/') { if file.filename().ends_with('/') {
zip = entry.skip().await?;
continue; continue;
} }
debug!( debug!(
@ -114,6 +120,7 @@ impl FileAdapter for ZipAdapter {
); );
let new_line_prefix = format!("{}{}: ", line_prefix, file.filename()); let new_line_prefix = format!("{}{}: ", line_prefix, file.filename());
let fname = PathBuf::from(file.filename()); let fname = PathBuf::from(file.filename());
let reader = entry.reader();
tokio::pin!(reader); tokio::pin!(reader);
// SAFETY: this should be solvable without unsafe but idk how :( // SAFETY: this should be solvable without unsafe but idk how :(
// the issue is that ZipEntryReader borrows from ZipFileReader, but we need to yield it here into the stream // the issue is that ZipEntryReader borrows from ZipFileReader, but we need to yield it here into the stream
@ -133,7 +140,8 @@ impl FileAdapter for ZipAdapter {
postprocess, postprocess,
config: config.clone(), config: config.clone(),
}); });
} zip = entry.done().await.context("going to next file in zip but entry was not read fully")?;
} }
}; };
@ -182,7 +190,6 @@ impl<'a> AdaptedFilesIter for ZipAdaptIter<'a> {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use async_zip::{write::ZipFileWriter, Compression, ZipEntryBuilder}; use async_zip::{write::ZipFileWriter, Compression, ZipEntryBuilder};
use super::*; use super::*;
use crate::{preproc::loop_adapt, test_utils::*}; use crate::{preproc::loop_adapt, test_utils::*};
@ -213,7 +220,7 @@ mod test {
async fn only_seek_zip_fs() -> Result<()> { async fn only_seek_zip_fs() -> Result<()> {
let zip = test_data_dir().join("only-seek-zip.zip"); let zip = test_data_dir().join("only-seek-zip.zip");
let (a, d) = simple_fs_adapt_info(&zip).await?; let (a, d) = simple_fs_adapt_info(&zip).await?;
let _v = adapted_to_vec(loop_adapt(&ZipAdapter::new(), d, a)?).await?; let _v = adapted_to_vec(loop_adapt(&ZipAdapter::new(), d, a).await?).await?;
// assert_eq!(String::from_utf8(v)?, ""); // assert_eq!(String::from_utf8(v)?, "");
Ok(()) Ok(())
@ -236,7 +243,7 @@ mod test {
&PathBuf::from("outer.zip"), &PathBuf::from("outer.zip"),
Box::pin(std::io::Cursor::new(zipfile)), Box::pin(std::io::Cursor::new(zipfile)),
); );
let buf = adapted_to_vec(loop_adapt(&adapter, d, a)?).await?; let buf = adapted_to_vec(loop_adapt(&adapter, d, a).await?).await?;
assert_eq!( assert_eq!(
String::from_utf8(buf)?, String::from_utf8(buf)?,

@ -43,7 +43,7 @@ async fn main() -> anyhow::Result<()> {
// happens if e.g. ripgrep detects binary data in the pipe so it cancels reading // happens if e.g. ripgrep detects binary data in the pipe so it cancels reading
debug!("output cancelled (broken pipe)"); debug!("output cancelled (broken pipe)");
} else { } else {
Err(e).context("copying adapter output to stdout {}")?; Err(e).context("copying adapter output to stdout")?;
} }
} }
debug!("running adapter took {} total", print_dur(start)); debug!("running adapter took {} total", print_dur(start));

@ -1,17 +1,17 @@
use std::pin::Pin; use std::{future::Future, pin::Pin};
use anyhow::Result; use anyhow::{Context, Result};
use async_compression::tokio::write::ZstdEncoder; use async_compression::tokio::write::ZstdEncoder;
use async_stream::stream; use async_stream::stream;
use crate::to_io_err;
use log::*; use log::*;
use tokio::io::{AsyncRead, AsyncWriteExt}; use tokio::io::{AsyncRead, AsyncWriteExt};
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use tokio_util::io::{ReaderStream, StreamReader}; use tokio_util::io::{ReaderStream, StreamReader};
use crate::to_io_err; type FinishHandler =
dyn FnOnce((u64, Option<Vec<u8>>)) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send;
type FinishHandler = dyn FnOnce((u64, Option<Vec<u8>>)) -> Result<()> + Send;
/** /**
* wrap a AsyncRead so that it is passthrough, * wrap a AsyncRead so that it is passthrough,
* but also the written data is compressed and written into a buffer, * but also the written data is compressed and written into a buffer,
@ -26,7 +26,7 @@ pub fn async_read_and_write_to_cache<'a>(
let inp = Box::pin(inp); let inp = Box::pin(inp);
let mut zstd_writer = Some(ZstdEncoder::with_quality( let mut zstd_writer = Some(ZstdEncoder::with_quality(
Vec::new(), Vec::new(),
async_compression::Level::Precise(compression_level as u32), async_compression::Level::Precise(compression_level),
)); ));
let mut bytes_written = 0; let mut bytes_written = 0;
@ -64,7 +64,7 @@ pub fn async_read_and_write_to_cache<'a>(
}; };
// EOF, finish! // EOF, finish!
on_finish(finish) on_finish(finish).await.context("write_to_cache on_finish")
.map_err(to_io_err)?; .map_err(to_io_err)?;
}; };

@ -108,6 +108,7 @@ impl FromStr for CacheMaxBlobLen {
rename_all = "kebab-case", rename_all = "kebab-case",
about = env!("CARGO_PKG_DESCRIPTION"), about = env!("CARGO_PKG_DESCRIPTION"),
author = env!("CARGO_PKG_HOMEPAGE"), author = env!("CARGO_PKG_HOMEPAGE"),
long_about="rga: ripgrep, but also search in PDFs, E-Books, Office documents, zip, tar.gz, etc.",
// TODO: long_about does not seem to work to only show this on short help // TODO: long_about does not seem to work to only show this on short help
after_help = "-h shows a concise overview, --help shows more detail and advanced options.\n\nAll other options not shown here are passed directly to rg, especially [PATTERN] and [PATH ...]", after_help = "-h shows a concise overview, --help shows more detail and advanced options.\n\nAll other options not shown here are passed directly to rg, especially [PATTERN] and [PATH ...]",
usage = "rga [RGA OPTIONS] [RG OPTIONS] PATTERN [PATH ...]" usage = "rga [RGA OPTIONS] [RG OPTIONS] PATTERN [PATH ...]"
@ -197,9 +198,9 @@ pub struct CacheConfig {
/// Disable caching of results /// Disable caching of results
/// ///
/// By default, rga caches the extracted text, if it is small enough, /// By default, rga caches the extracted text, if it is small enough,
/// to a database in ~/.cache/rga on Linux, /// to a database in ${XDG_CACHE_DIR-~/.cache}/ripgrep-all on Linux,
/// ~/Library/Caches/rga on macOS, /// ~/Library/Caches/ripgrep-all on macOS,
/// or C:\Users\username\AppData\Local\rga on Windows. /// or C:\Users\username\AppData\Local\ripgrep-all on Windows.
/// This way, repeated searches on the same set of files will be much faster. /// This way, repeated searches on the same set of files will be much faster.
/// If you pass this flag, all caching will be disabled. /// If you pass this flag, all caching will be disabled.
#[serde(default, skip_serializing_if = "is_default")] #[serde(default, skip_serializing_if = "is_default")]
@ -208,7 +209,9 @@ pub struct CacheConfig {
/// Max compressed size to cache /// Max compressed size to cache
/// ///
/// Longest byte length (after compression) to store in cache. Longer adapter outputs will not be cached and recomputed every time. Allowed suffixes: k M G /// Longest byte length (after compression) to store in cache. Longer adapter outputs will not be cached and recomputed every time.
///
/// Allowed suffixes on command line: k M G
#[serde(default, skip_serializing_if = "is_default")] #[serde(default, skip_serializing_if = "is_default")]
#[structopt( #[structopt(
default_value, default_value,

@ -3,25 +3,28 @@ use crate::adapters::*;
use crate::caching_writer::async_read_and_write_to_cache; use crate::caching_writer::async_read_and_write_to_cache;
use crate::config::RgaConfig; use crate::config::RgaConfig;
use crate::matching::*; use crate::matching::*;
use crate::preproc_cache::CacheKey;
use crate::recurse::concat_read_streams; use crate::recurse::concat_read_streams;
use crate::{ use crate::{
preproc_cache::{LmdbCache, PreprocCache}, preproc_cache::{open_cache_db, PreprocCache},
print_bytes, print_bytes,
}; };
use anyhow::*; use anyhow::*;
use async_compression::tokio::bufread::ZstdDecoder; use async_compression::tokio::bufread::ZstdDecoder;
use async_stream::stream; use async_stream::stream;
// use futures::future::{BoxFuture, FutureExt};
use log::*; use log::*;
use path_clean::PathClean;
use postproc::PostprocPrefix; use postproc::PostprocPrefix;
use std::future::Future;
use std::io::Cursor; use std::io::Cursor;
use std::path::Path; use std::path::Path;
use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use tokio::io::AsyncBufRead; use tokio::io::AsyncBufRead;
use tokio::io::AsyncBufReadExt; use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader; use tokio::io::BufReader;
type ActiveAdapters = Vec<Arc<dyn FileAdapter>>; pub type ActiveAdapters = Vec<Arc<dyn FileAdapter>>;
async fn choose_adapter( async fn choose_adapter(
config: &RgaConfig, config: &RgaConfig,
@ -120,36 +123,6 @@ pub async fn rga_preproc(ai: AdaptInfo) -> Result<ReadBox> {
.with_context(|| format!("run_adapter({})", &path_hint_copy.to_string_lossy())) .with_context(|| format!("run_adapter({})", &path_hint_copy.to_string_lossy()))
} }
fn compute_cache_key(
filepath_hint: &Path,
adapter: &dyn FileAdapter,
active_adapters: ActiveAdapters,
) -> Result<Vec<u8>> {
let clean_path = filepath_hint.to_owned().clean();
let meta = std::fs::metadata(filepath_hint)
.with_context(|| format!("reading metadata for {}", filepath_hint.to_string_lossy()))?;
let modified = meta.modified().expect("weird OS that can't into mtime");
if adapter.metadata().recurses {
let active_adapters_cache_key = active_adapters
.iter()
.map(|a| (a.metadata().name.clone(), a.metadata().version))
.collect::<Vec<_>>();
let key = (active_adapters_cache_key, clean_path, modified);
debug!("Cache key (with recursion): {:?}", key);
bincode::serialize(&key).context("could not serialize path")
} else {
let key = (
adapter.metadata().name.clone(),
adapter.metadata().version,
clean_path,
modified,
);
debug!("Cache key (no recursion): {:?}", key);
bincode::serialize(&key).context("could not serialize path")
}
}
async fn adapt_caching( async fn adapt_caching(
ai: AdaptInfo, ai: AdaptInfo,
adapter: Arc<dyn FileAdapter>, adapter: Arc<dyn FileAdapter>,
@ -166,41 +139,44 @@ async fn adapt_caching(
ai.filepath_hint.to_string_lossy(), ai.filepath_hint.to_string_lossy(),
&meta.name &meta.name
); );
let db_name = format!("{}.v{}", meta.name, meta.version);
let cache_compression_level = ai.config.cache.compression_level; let cache_compression_level = ai.config.cache.compression_level;
let cache_max_blob_len = ai.config.cache.max_blob_len; let cache_max_blob_len = ai.config.cache.max_blob_len;
let cache = if ai.is_real_file { let cache = if ai.is_real_file && !ai.config.cache.disabled {
LmdbCache::open(&ai.config.cache)? Some(open_cache_db(Path::new(&ai.config.cache.path.0)).await?)
} else { } else {
None None
}; };
let mut cache = cache.context("No cache?")?; let mut cache = cache.context("No cache?")?;
let cache_key: Vec<u8> = let cache_key = CacheKey::new(&ai.filepath_hint, adapter.as_ref(), &active_adapters)?;
compute_cache_key(&ai.filepath_hint, adapter.as_ref(), active_adapters)?;
// let dbg_ctx = format!("adapter {}", &adapter.metadata().name); // let dbg_ctx = format!("adapter {}", &adapter.metadata().name);
let cached = cache.get(&db_name, &cache_key)?; let cached = cache.get(&cache_key).await.context("cache.get")?;
match cached { match cached {
Some(cached) => Ok(Box::pin(ZstdDecoder::new(Cursor::new(cached)))), Some(cached) => Ok(Box::pin(ZstdDecoder::new(Cursor::new(cached)))),
None => { None => {
debug!("cache MISS, running adapter with caching..."); debug!("cache MISS, running adapter with caching...");
let inp = loop_adapt(adapter.as_ref(), detection_reason, ai)?; let inp = loop_adapt(adapter.as_ref(), detection_reason, ai).await?;
let inp = concat_read_streams(inp); let inp = concat_read_streams(inp);
let inp = async_read_and_write_to_cache( let inp = async_read_and_write_to_cache(
inp, inp,
cache_max_blob_len.0, cache_max_blob_len.0,
cache_compression_level.0, cache_compression_level.0,
Box::new(move |(uncompressed_size, compressed)| { Box::new(move |(uncompressed_size, compressed)| {
debug!( Box::pin(async move {
"uncompressed output: {}", debug!(
print_bytes(uncompressed_size as f64) "uncompressed output: {}",
); print_bytes(uncompressed_size as f64)
if let Some(cached) = compressed { );
debug!("compressed output: {}", print_bytes(cached.len() as f64)); if let Some(cached) = compressed {
cache.set(&db_name, &cache_key, &cached)? debug!("compressed output: {}", print_bytes(cached.len() as f64));
} cache
Ok(()) .set(&cache_key, cached)
.await
.context("writing to cache")?
}
Ok(())
})
}), }),
)?; )?;
@ -213,21 +189,34 @@ pub fn loop_adapt(
adapter: &dyn FileAdapter, adapter: &dyn FileAdapter,
detection_reason: FileMatcher, detection_reason: FileMatcher,
ai: AdaptInfo, ai: AdaptInfo,
) -> Pin<Box<dyn Future<Output = anyhow::Result<AdaptedFilesIterBox>> + Send + '_>> {
Box::pin(async move { loop_adapt_inner(adapter, detection_reason, ai).await })
}
pub async fn loop_adapt_inner(
adapter: &dyn FileAdapter,
detection_reason: FileMatcher,
ai: AdaptInfo,
) -> anyhow::Result<AdaptedFilesIterBox> { ) -> anyhow::Result<AdaptedFilesIterBox> {
let fph = ai.filepath_hint.clone(); let fph = ai.filepath_hint.clone();
let inp = adapter.adapt(ai, &detection_reason).with_context(|| { let inp = adapter.adapt(ai, &detection_reason).await;
format!( let inp = if adapter.metadata().name == "postprocprefix" {
"adapting {} via {} failed", // don't add confusing error context
fph.to_string_lossy(), inp?
adapter.metadata().name } else {
) inp.with_context(|| {
})?; format!(
"adapting {} via {} failed",
fph.to_string_lossy(),
adapter.metadata().name
)
})?
};
let s = stream! { let s = stream! {
for await file in inp { for await file in inp {
match buf_choose_adapter(file?).await? { match buf_choose_adapter(file?).await? {
Ret::Recurse(ai, adapter, detection_reason, _active_adapters) => { Ret::Recurse(ai, adapter, detection_reason, _active_adapters) => {
if ai.archive_recursion_depth >= ai.config.max_archive_recursion.0 { if ai.archive_recursion_depth >= ai.config.max_archive_recursion.0 {
let s = format!("{}[rga: max archive recursion reached ({})]", ai.line_prefix, ai.archive_recursion_depth).into_bytes(); let s = format!("{}[rga: max archive recursion reached ({})]\n", ai.line_prefix, ai.archive_recursion_depth).into_bytes();
yield Ok(AdaptInfo { yield Ok(AdaptInfo {
inp: Box::pin(Cursor::new(s)), inp: Box::pin(Cursor::new(s)),
..ai ..ai
@ -243,7 +232,7 @@ pub fn loop_adapt(
ai.filepath_hint.to_string_lossy(), ai.filepath_hint.to_string_lossy(),
&adapter.metadata().name &adapter.metadata().name
); );
for await ifile in loop_adapt(adapter.as_ref(), detection_reason, ai)? { for await ifile in loop_adapt(adapter.as_ref(), detection_reason, ai).await? {
yield ifile; yield ifile;
} }
} }

@ -1,135 +1,188 @@
use crate::{config::CacheConfig, print_bytes, print_dur}; use crate::{adapters::FileAdapter, preproc::ActiveAdapters};
use anyhow::{format_err, Context, Result}; use anyhow::{Context, Result};
use log::*; use path_clean::PathClean;
use rkv::backend::{BackendEnvironmentBuilder, LmdbEnvironment}; use rusqlite::{named_params, OptionalExtension};
use std::{fmt::Display, path::Path, time::Instant}; use std::{path::Path, time::UNIX_EPOCH};
use tokio_rusqlite::Connection;
pub trait PreprocCache: Send + Sync { #[derive(Clone)]
/*/// gets cache at specified key. pub struct CacheKey {
/// if cache hit, return the resulting data adapter: String,
/// else, run the given lambda, and store its result in the cache if present adapter_version: i32,
fn get_or_run<'a>( active_adapters: String,
&mut self, file_path: String,
db_name: &str, file_mtime_unix_ms: i64,
key: &[u8],
debug_name: &str,
runner: Box<dyn FnOnce() -> Result<Option<Vec<u8>>> + 'a>,
) -> Result<Option<Vec<u8>>>;*/
fn get(&self, db_name: &str, key: &[u8]) -> Result<Option<Vec<u8>>>;
fn set(&mut self, db_name: &str, key: &[u8], value: &[u8]) -> Result<()>;
} }
impl CacheKey {
/// opens a LMDB cache pub fn new(
fn open_cache_db( filepath_hint: &Path,
path: &Path, adapter: &dyn FileAdapter,
) -> Result<std::sync::Arc<std::sync::RwLock<rkv::Rkv<LmdbEnvironment>>>> { active_adapters: &ActiveAdapters,
std::fs::create_dir_all(path)?; ) -> Result<CacheKey> {
// use rkv::backend::LmdbEnvironmentFlags; let meta = std::fs::metadata(filepath_hint)
.with_context(|| format!("reading metadata for {}", filepath_hint.to_string_lossy()))?;
rkv::Manager::<LmdbEnvironment>::singleton() let modified = meta.modified().expect("weird OS that can't into mtime");
.write() let file_mtime_unix_ms = modified.duration_since(UNIX_EPOCH)?.as_millis() as i64;
.map_err(|_| format_err!("could not write cache db manager"))? let active_adapters = if adapter.metadata().recurses {
.get_or_create(path, |p| { serde_json::to_string(
let mut builder = rkv::Rkv::environment_builder::<rkv::backend::Lmdb>(); &active_adapters
builder .iter()
.set_flags(rkv::EnvironmentFlags::NO_SYNC) .map(|a| format!("{}.v{}", a.metadata().name, a.metadata().version))
.set_flags(rkv::EnvironmentFlags::WRITE_MAP) // not durable cuz it's a cache .collect::<Vec<_>>(),
// i'm not sure why NO_TLS is needed. otherwise LMDB transactions (open readers) will keep piling up until it fails with )?
// LmdbError(ReadersFull). Those "open readers" stay even after the corresponding processes exit. } else {
// hope setting this doesn't break integrity "null".to_string()
.set_flags(rkv::EnvironmentFlags::NO_TLS) };
// sometimes, this seems to cause the data.mdb file to appear as 2GB in size (with holes), but sometimes not? Ok(CacheKey {
.set_map_size(2 * 1024 * 1024 * 1024) adapter: adapter.metadata().name.clone(),
.set_max_dbs(100) adapter_version: adapter.metadata().version,
.set_max_readers(128); file_path: filepath_hint.clean().to_string_lossy().to_string(),
rkv::Rkv::from_builder(p, builder) file_mtime_unix_ms,
active_adapters,
}) })
.map_err(|e| format_err!("could not get/create cache db: {}", e)) }
} }
pub struct LmdbCache { #[async_trait::async_trait]
db_arc: std::sync::Arc<std::sync::RwLock<rkv::Rkv<LmdbEnvironment>>>, pub trait PreprocCache {
async fn get(&self, key: &CacheKey) -> Result<Option<Vec<u8>>>;
async fn set(&mut self, key: &CacheKey, value: Vec<u8>) -> Result<()>;
} }
impl LmdbCache { async fn connect_pragmas(db: &Connection) -> Result<()> {
pub fn open(config: &CacheConfig) -> Result<Option<LmdbCache>> { // https://phiresky.github.io/blog/2020/sqlite-performance-tuning/
if config.disabled { //let want_page_size = 32768;
return Ok(None); //db.execute(&format!("pragma page_size = {};", want_page_size))
} // .context("setup pragma 1")?;
let path = Path::new(&config.path.0); db.call(|db| {
Ok(Some(LmdbCache { db.execute_batch(
db_arc: open_cache_db(path)?, "
})) pragma journal_mode = WAL;
pragma foreign_keys = on;
pragma temp_store = memory;
pragma synchronous = off; -- integrity isn't very important here
pragma mmap_size = 30000000000;
create table if not exists preproc_cache (
adapter text not null,
adapter_version integer not null,
created_unix_ms integer not null default (unixepoch() * 1000),
active_adapters text not null, -- 'null' if adapter cannot recurse
file_path text not null,
file_mtime_unix_ms integer not null,
text_content_zstd blob not null
) strict;
create unique index if not exists preproc_cache_idx on preproc_cache (adapter, adapter_version, file_path, active_adapters);
",
)
})
.await.context("connect_pragmas")?;
let jm: i64 = db
.call(|db| db.pragma_query_value(None, "application_id", |r| r.get(0)))
.await?;
if jm != 924716026 {
// (probably) newly created db
create_pragmas(db).await.context("create_pragmas")?;
} }
Ok(())
} }
#[derive(Debug)] async fn create_pragmas(db: &Connection) -> Result<()> {
struct RkvErrWrap(rkv::StoreError); db.call(|db| {
impl Display for RkvErrWrap { db.execute_batch(
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { "
self.0.fmt(f) pragma application_id = 924716026;
} pragma user_version = 2; -- todo: on upgrade clear db if version is unexpected
",
)
})
.await?;
Ok(())
} }
impl std::error::Error for RkvErrWrap {} struct SqliteCache {
db: Connection,
}
impl SqliteCache {
async fn new(path: &Path) -> Result<SqliteCache> {
let db = Connection::open(path.join("cache.sqlite3")).await?;
connect_pragmas(&db).await?;
impl PreprocCache for LmdbCache { Ok(SqliteCache { db })
fn get(&self, db_name: &str, key: &[u8]) -> Result<Option<Vec<u8>>> { }
let start = Instant::now(); }
let db_env = self
.db_arc
.read()
.map_err(|_| anyhow::anyhow!("Could not open lock, some lock writer panicked"))?;
let db = db_env
.open_single(db_name, rkv::store::Options::create())
.map_err(RkvErrWrap)
.context("could not open cache db store")?;
let reader = db_env.read().expect("could not get reader"); #[async_trait::async_trait]
let cached = db impl PreprocCache for SqliteCache {
.get(&reader, key) async fn get(&self, key: &CacheKey) -> Result<Option<Vec<u8>>> {
.map_err(RkvErrWrap) let key = (*key).clone(); // todo: without cloning
.context("could not read from db")?; Ok(self
.db
.call(move |db| {
db.query_row(
"select text_content_zstd from preproc_cache where
adapter = :adapter
and adapter_version = :adapter_version
and active_adapters = :active_adapters
and file_path = :file_path
and file_mtime_unix_ms = :file_mtime_unix_ms
",
named_params! {
":adapter": &key.adapter,
":adapter_version": &key.adapter_version,
":active_adapters": &key.active_adapters,
":file_path": &key.file_path,
":file_mtime_unix_ms": &key.file_mtime_unix_ms
},
|r| r.get::<_, Vec<u8>>(0),
)
.optional()
})
.await
.context("reading from cache")?)
}
match cached { async fn set(&mut self, key: &CacheKey, value: Vec<u8>) -> Result<()> {
Some(rkv::Value::Blob(cached)) => { let key = (*key).clone(); // todo: without cloning
debug!( Ok(self
"cache HIT, reading {} (compressed) from cache", .db
print_bytes(cached.len() as f64) .call(move |db| {
); db.execute(
debug!("reading from cache took {}", print_dur(start)); "insert into preproc_cache (adapter, adapter_version, active_adapters, file_path, file_mtime_unix_ms, text_content_zstd) values
Ok(Some(Vec::from(cached))) (:adapter, :adapter_version, :active_adapters, :file_path, :file_mtime_unix_ms, :text_content_zstd)
} on conflict (adapter, adapter_version, active_adapters, file_path) do update set
Some(_) => Err(format_err!("Integrity: value not blob"))?, file_mtime_unix_ms = :file_mtime_unix_ms,
None => Ok(None), created_unix_ms = unixepoch() * 1000,
} text_content_zstd = :text_content_zstd",
named_params! {
":adapter": &key.adapter,
":adapter_version": &key.adapter_version,
":active_adapters": &key.active_adapters,
":file_path": &key.file_path,
":file_mtime_unix_ms": &key.file_mtime_unix_ms,
":text_content_zstd": value
}
).map(|_| ())
})
.await?)
} }
fn set(&mut self, db_name: &str, key: &[u8], got: &[u8]) -> Result<()> { }
let start = Instant::now(); /// opens a default cache
debug!("writing {} to cache", print_bytes(got.len() as f64)); pub async fn open_cache_db(path: &Path) -> Result<impl PreprocCache> {
let db_env = self std::fs::create_dir_all(path)?;
.db_arc SqliteCache::new(path).await
.read() }
.map_err(|_| anyhow::anyhow!("Could not open lock, some lock writer panicked"))?;
let db = db_env #[cfg(test)]
.open_single(db_name, rkv::store::Options::create()) mod test {
.map_err(RkvErrWrap)
.context("could not open cache db store")?;
let mut writer = db_env use crate::preproc_cache::*;
.write()
.map_err(RkvErrWrap)
.with_context(|| format_err!("could not open write handle to cache"))?;
db.put(&mut writer, key, &rkv::Value::Blob(got)) #[tokio::test]
.map_err(RkvErrWrap) async fn test_read_write() -> anyhow::Result<()> {
.with_context(|| format_err!("could not write to cache"))?; let path = tempfile::tempdir()?;
writer let _db = open_cache_db(&path.path().join("foo.sqlite3")).await?;
.commit() // db.set();
.map_err(RkvErrWrap)
.with_context(|| "could not write cache".to_string())?;
debug!("writing to cache took {}", print_dur(start));
Ok(()) Ok(())
} }
} }

Loading…
Cancel
Save