diff --git a/src/adapters/custom.rs b/src/adapters/custom.rs index ec6cd7e..d935296 100644 --- a/src/adapters/custom.rs +++ b/src/adapters/custom.rs @@ -2,12 +2,12 @@ use super::*; use super::{AdaptInfo, AdapterMeta, FileAdapter, GetMetadata}; use crate::adapted_iter::one_file; -use crate::join_handle_to_stream; use crate::{ adapted_iter::AdaptedFilesIterBox, expand::expand_str_ez, matching::{FastFileMatcher, FileMatcher}, }; +use crate::{join_handle_to_stream, to_io_err}; use anyhow::Result; use async_stream::stream; use bytes::Bytes; @@ -148,8 +148,7 @@ fn proc_wait(mut child: Child) -> impl AsyncRead { if res.success() { yield std::io::Result::Ok(Bytes::new()); } else { - yield std::io::Result::Err(std::io::Error::new( - std::io::ErrorKind::Other, + yield std::io::Result::Err(to_io_err( format_err!("subprocess failed: {:?}", res), )); } diff --git a/src/adapters/sqlite.rs b/src/adapters/sqlite.rs index ce677bf..836b330 100644 --- a/src/adapters/sqlite.rs +++ b/src/adapters/sqlite.rs @@ -1,18 +1,14 @@ -use crate::{adapted_iter::one_file, join_handle_to_stream, to_io_err}; - -use super::*; +use super::{writing::WritingFileAdapter, *}; use anyhow::Result; +use async_trait::async_trait; use lazy_static::lazy_static; use log::*; use rusqlite::types::ValueRef; use rusqlite::*; -use std::{convert::TryInto, io::Cursor}; -use tokio::{ - io::AsyncReadExt, - sync::mpsc::{self, Sender}, -}; -use tokio_stream::wrappers::ReceiverStream; -use tokio_util::io::StreamReader; +use std::{convert::TryInto, io::Write}; +use tokio::io::AsyncWrite; + +use tokio_util::io::SyncIoBridge; static EXTENSIONS: &[&str] = &["db", "db3", "sqlite", "sqlite3"]; @@ -67,7 +63,7 @@ fn format_blob(b: ValueRef) -> String { } } -fn yielder(ai: AdaptInfo, s: Sender>>>) -> Result<()> { +fn synchronous_dump_sqlite(ai: AdaptInfo, mut s: impl Write) -> Result<()> { let AdaptInfo { is_real_file, filepath_hint, @@ -77,9 +73,7 @@ fn yielder(ai: AdaptInfo, s: Sender>>>) -> Result if !is_real_file { // db is in an archive // todo: read to memory and then use that blob if size < max - s.blocking_send(Ok(Cursor::new( - format!("{}[rga: skipping sqlite in archive]\n", line_prefix).into_bytes(), - )))?; + writeln!(s, "{line_prefix}[rga: skipping sqlite in archive]",)?; return Ok(()); } let inp_fname = filepath_hint; @@ -107,43 +101,28 @@ fn yielder(ai: AdaptInfo, s: Sender>>>) -> Result // kind of shitty (lossy) output. maybe output real csv or something? while let Some(row) = z.next()? { - let str = format!( - "{}{}: {}\n", - line_prefix, - table, - col_names - .iter() - .enumerate() - .map(|(i, e)| Ok(format!("{}={}", e, format_blob(row.get_ref(i)?)))) - .collect::>>()? - .join(", ") - ); - s.blocking_send(Ok(Cursor::new(str.into_bytes())))?; + let row_str = col_names + .iter() + .enumerate() + .map(|(i, e)| Ok(format!("{}={}", e, format_blob(row.get_ref(i)?)))) + .collect::>>()? + .join(", "); + writeln!(s, "{line_prefix}{table}: {row_str}",)?; } } Ok(()) } -impl FileAdapter for SqliteAdapter { - fn adapt(&self, ai: AdaptInfo, _detection_reason: &FileMatcher) -> Result { - let (s, r) = mpsc::channel(10); - let filepath_hint = format!("{}.txt", ai.filepath_hint.to_string_lossy()); - let config = ai.config.clone(); - let line_prefix = ai.line_prefix.clone(); - let postprocess = ai.postprocess; - let archive_recursion_depth = ai.archive_recursion_depth; - let joiner = tokio::task::spawn_blocking(|| yielder(ai, s).map_err(to_io_err)); - Ok(one_file(AdaptInfo { - is_real_file: false, - filepath_hint: filepath_hint.into(), - archive_recursion_depth: archive_recursion_depth + 1, - config, - inp: Box::pin( - StreamReader::new(ReceiverStream::new(r)).chain(join_handle_to_stream(joiner)), - ), - line_prefix, - postprocess, - })) +#[async_trait] +impl WritingFileAdapter for SqliteAdapter { + async fn adapt_write( + ai: AdaptInfo, + _detection_reason: &FileMatcher, + oup: Pin>, + ) -> Result<()> { + let oup_sync = SyncIoBridge::new(oup); + tokio::task::spawn_blocking(|| synchronous_dump_sqlite(ai, oup_sync)).await??; + Ok(()) } } diff --git a/src/adapters/writing.rs b/src/adapters/writing.rs index 3a61ea2..548711c 100644 --- a/src/adapters/writing.rs +++ b/src/adapters/writing.rs @@ -1,11 +1,11 @@ use std::pin::Pin; -use crate::adapted_iter::one_file; +use crate::{adapted_iter::one_file, join_handle_to_stream, to_io_err}; use super::{AdaptInfo, FileAdapter, GetMetadata}; use anyhow::Result; use async_trait::async_trait; -use tokio::io::AsyncWrite; +use tokio::io::{AsyncReadExt, AsyncWrite}; #[async_trait] pub trait WritingFileAdapter: GetMetadata + Send + Sync + Clone { @@ -57,9 +57,9 @@ where let postprocess = a.postprocess; let line_prefix = a.line_prefix.clone(); let config = a.config.clone(); - tokio::spawn(async move { + let joiner = tokio::spawn(async move { let x = d2; - T::adapt_write(a, &x, Box::pin(w)).await.unwrap() + T::adapt_write(a, &x, Box::pin(w)).await.map_err(to_io_err) }); Ok(one_file(AdaptInfo { @@ -67,28 +67,9 @@ where filepath_hint: filepath_hint.into(), archive_recursion_depth, config, - inp: Box::pin(r), + inp: Box::pin(r.chain(join_handle_to_stream(joiner))), line_prefix, postprocess, })) } - /*fn adapt( - &self, - ai_outer: super::AdaptInfo, - detection_reason: &crate::matching::FileMatcher, - ) -> anyhow::Result { - let detc = detection_reason.clone(); - panic!("ooo");*/ - // cc.adapt_write(ai_outer, detc, ) - /*tokio::spawn(move || { - let mut oup = w; - let ai = ai_outer; - let res = cc.adapt_write(ai, &detc, &mut oup); - if let Err(e) = res { - oup.write_err(std::io::Error::new(std::io::ErrorKind::Other, e)) - .expect("could not write err"); - } - }); */ - - //Ok(Box::new(r)) } diff --git a/src/recurse.rs b/src/recurse.rs index 1772876..8994d7f 100644 --- a/src/recurse.rs +++ b/src/recurse.rs @@ -1,12 +1,12 @@ use tokio_util::io::{ReaderStream, StreamReader}; -use crate::{adapted_iter::AdaptedFilesIterBox, adapters::*}; +use crate::{adapted_iter::AdaptedFilesIterBox, adapters::*, to_io_err}; use async_stream::stream; pub fn concat_read_streams(input: AdaptedFilesIterBox) -> ReadBox { let s = stream! { for await output in input { - let o = output.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?.inp; + let o = output.map_err(to_io_err)?.inp; let stream = ReaderStream::new(o); for await bytes in stream { yield bytes;