simplify sqlite adapter

pull/104/head
phiresky 1 year ago
parent 816e27802a
commit ddcfff9b4d

@ -2,12 +2,12 @@ use super::*;
use super::{AdaptInfo, AdapterMeta, FileAdapter, GetMetadata}; use super::{AdaptInfo, AdapterMeta, FileAdapter, GetMetadata};
use crate::adapted_iter::one_file; use crate::adapted_iter::one_file;
use crate::join_handle_to_stream;
use crate::{ use crate::{
adapted_iter::AdaptedFilesIterBox, adapted_iter::AdaptedFilesIterBox,
expand::expand_str_ez, expand::expand_str_ez,
matching::{FastFileMatcher, FileMatcher}, matching::{FastFileMatcher, FileMatcher},
}; };
use crate::{join_handle_to_stream, to_io_err};
use anyhow::Result; use anyhow::Result;
use async_stream::stream; use async_stream::stream;
use bytes::Bytes; use bytes::Bytes;
@ -148,8 +148,7 @@ fn proc_wait(mut child: Child) -> impl AsyncRead {
if res.success() { if res.success() {
yield std::io::Result::Ok(Bytes::new()); yield std::io::Result::Ok(Bytes::new());
} else { } else {
yield std::io::Result::Err(std::io::Error::new( yield std::io::Result::Err(to_io_err(
std::io::ErrorKind::Other,
format_err!("subprocess failed: {:?}", res), format_err!("subprocess failed: {:?}", res),
)); ));
} }

@ -1,18 +1,14 @@
use crate::{adapted_iter::one_file, join_handle_to_stream, to_io_err}; use super::{writing::WritingFileAdapter, *};
use super::*;
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use log::*; use log::*;
use rusqlite::types::ValueRef; use rusqlite::types::ValueRef;
use rusqlite::*; use rusqlite::*;
use std::{convert::TryInto, io::Cursor}; use std::{convert::TryInto, io::Write};
use tokio::{ use tokio::io::AsyncWrite;
io::AsyncReadExt,
sync::mpsc::{self, Sender}, use tokio_util::io::SyncIoBridge;
};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::io::StreamReader;
static EXTENSIONS: &[&str] = &["db", "db3", "sqlite", "sqlite3"]; static EXTENSIONS: &[&str] = &["db", "db3", "sqlite", "sqlite3"];
@ -67,7 +63,7 @@ fn format_blob(b: ValueRef) -> String {
} }
} }
fn yielder(ai: AdaptInfo, s: Sender<std::io::Result<Cursor<Vec<u8>>>>) -> Result<()> { fn synchronous_dump_sqlite(ai: AdaptInfo, mut s: impl Write) -> Result<()> {
let AdaptInfo { let AdaptInfo {
is_real_file, is_real_file,
filepath_hint, filepath_hint,
@ -77,9 +73,7 @@ fn yielder(ai: AdaptInfo, s: Sender<std::io::Result<Cursor<Vec<u8>>>>) -> Result
if !is_real_file { if !is_real_file {
// db is in an archive // db is in an archive
// todo: read to memory and then use that blob if size < max // todo: read to memory and then use that blob if size < max
s.blocking_send(Ok(Cursor::new( writeln!(s, "{line_prefix}[rga: skipping sqlite in archive]",)?;
format!("{}[rga: skipping sqlite in archive]\n", line_prefix).into_bytes(),
)))?;
return Ok(()); return Ok(());
} }
let inp_fname = filepath_hint; let inp_fname = filepath_hint;
@ -107,43 +101,28 @@ fn yielder(ai: AdaptInfo, s: Sender<std::io::Result<Cursor<Vec<u8>>>>) -> Result
// kind of shitty (lossy) output. maybe output real csv or something? // kind of shitty (lossy) output. maybe output real csv or something?
while let Some(row) = z.next()? { while let Some(row) = z.next()? {
let str = format!( let row_str = col_names
"{}{}: {}\n", .iter()
line_prefix, .enumerate()
table, .map(|(i, e)| Ok(format!("{}={}", e, format_blob(row.get_ref(i)?))))
col_names .collect::<Result<Vec<String>>>()?
.iter() .join(", ");
.enumerate() writeln!(s, "{line_prefix}{table}: {row_str}",)?;
.map(|(i, e)| Ok(format!("{}={}", e, format_blob(row.get_ref(i)?))))
.collect::<Result<Vec<String>>>()?
.join(", ")
);
s.blocking_send(Ok(Cursor::new(str.into_bytes())))?;
} }
} }
Ok(()) Ok(())
} }
impl FileAdapter for SqliteAdapter { #[async_trait]
fn adapt(&self, ai: AdaptInfo, _detection_reason: &FileMatcher) -> Result<AdaptedFilesIterBox> { impl WritingFileAdapter for SqliteAdapter {
let (s, r) = mpsc::channel(10); async fn adapt_write(
let filepath_hint = format!("{}.txt", ai.filepath_hint.to_string_lossy()); ai: AdaptInfo,
let config = ai.config.clone(); _detection_reason: &FileMatcher,
let line_prefix = ai.line_prefix.clone(); oup: Pin<Box<dyn AsyncWrite + Send>>,
let postprocess = ai.postprocess; ) -> Result<()> {
let archive_recursion_depth = ai.archive_recursion_depth; let oup_sync = SyncIoBridge::new(oup);
let joiner = tokio::task::spawn_blocking(|| yielder(ai, s).map_err(to_io_err)); tokio::task::spawn_blocking(|| synchronous_dump_sqlite(ai, oup_sync)).await??;
Ok(one_file(AdaptInfo { Ok(())
is_real_file: false,
filepath_hint: filepath_hint.into(),
archive_recursion_depth: archive_recursion_depth + 1,
config,
inp: Box::pin(
StreamReader::new(ReceiverStream::new(r)).chain(join_handle_to_stream(joiner)),
),
line_prefix,
postprocess,
}))
} }
} }

@ -1,11 +1,11 @@
use std::pin::Pin; use std::pin::Pin;
use crate::adapted_iter::one_file; use crate::{adapted_iter::one_file, join_handle_to_stream, to_io_err};
use super::{AdaptInfo, FileAdapter, GetMetadata}; use super::{AdaptInfo, FileAdapter, GetMetadata};
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use tokio::io::AsyncWrite; use tokio::io::{AsyncReadExt, AsyncWrite};
#[async_trait] #[async_trait]
pub trait WritingFileAdapter: GetMetadata + Send + Sync + Clone { pub trait WritingFileAdapter: GetMetadata + Send + Sync + Clone {
@ -57,9 +57,9 @@ where
let postprocess = a.postprocess; let postprocess = a.postprocess;
let line_prefix = a.line_prefix.clone(); let line_prefix = a.line_prefix.clone();
let config = a.config.clone(); let config = a.config.clone();
tokio::spawn(async move { let joiner = tokio::spawn(async move {
let x = d2; let x = d2;
T::adapt_write(a, &x, Box::pin(w)).await.unwrap() T::adapt_write(a, &x, Box::pin(w)).await.map_err(to_io_err)
}); });
Ok(one_file(AdaptInfo { Ok(one_file(AdaptInfo {
@ -67,28 +67,9 @@ where
filepath_hint: filepath_hint.into(), filepath_hint: filepath_hint.into(),
archive_recursion_depth, archive_recursion_depth,
config, config,
inp: Box::pin(r), inp: Box::pin(r.chain(join_handle_to_stream(joiner))),
line_prefix, line_prefix,
postprocess, postprocess,
})) }))
} }
/*fn adapt(
&self,
ai_outer: super::AdaptInfo,
detection_reason: &crate::matching::FileMatcher,
) -> anyhow::Result<ReadBox> {
let detc = detection_reason.clone();
panic!("ooo");*/
// cc.adapt_write(ai_outer, detc, )
/*tokio::spawn(move || {
let mut oup = w;
let ai = ai_outer;
let res = cc.adapt_write(ai, &detc, &mut oup);
if let Err(e) = res {
oup.write_err(std::io::Error::new(std::io::ErrorKind::Other, e))
.expect("could not write err");
}
}); */
//Ok(Box::new(r))
} }

@ -1,12 +1,12 @@
use tokio_util::io::{ReaderStream, StreamReader}; use tokio_util::io::{ReaderStream, StreamReader};
use crate::{adapted_iter::AdaptedFilesIterBox, adapters::*}; use crate::{adapted_iter::AdaptedFilesIterBox, adapters::*, to_io_err};
use async_stream::stream; use async_stream::stream;
pub fn concat_read_streams(input: AdaptedFilesIterBox) -> ReadBox { pub fn concat_read_streams(input: AdaptedFilesIterBox) -> ReadBox {
let s = stream! { let s = stream! {
for await output in input { for await output in input {
let o = output.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?.inp; let o = output.map_err(to_io_err)?.inp;
let stream = ReaderStream::new(o); let stream = ReaderStream::new(o);
for await bytes in stream { for await bytes in stream {
yield bytes; yield bytes;

Loading…
Cancel
Save