remove lifetimes, fix
parent
906043060b
commit
af8cf228b3
@ -1,24 +1,24 @@
|
||||
use crate::adapters::AdaptInfo;
|
||||
|
||||
// TODO: using iterator trait possible?? should basically be Iterator<AdaptInfo>
|
||||
pub trait AdaptedFilesIter {
|
||||
pub trait AdaptedFilesIter: Send {
|
||||
// next takes a 'a-lived reference and returns an AdaptInfo that lives as long as the reference
|
||||
fn next<'a>(&'a mut self) -> Option<AdaptInfo<'a>>;
|
||||
fn next<'a>(&'a mut self) -> Option<AdaptInfo>;
|
||||
}
|
||||
|
||||
/// A single AdaptInfo
|
||||
pub struct SingleAdaptedFileAsIter<'a> {
|
||||
ai: Option<AdaptInfo<'a>>,
|
||||
pub struct SingleAdaptedFileAsIter {
|
||||
ai: Option<AdaptInfo>,
|
||||
}
|
||||
impl SingleAdaptedFileAsIter<'_> {
|
||||
pub fn new<'a>(ai: AdaptInfo<'a>) -> SingleAdaptedFileAsIter<'a> {
|
||||
impl SingleAdaptedFileAsIter {
|
||||
pub fn new<'a>(ai: AdaptInfo) -> SingleAdaptedFileAsIter {
|
||||
SingleAdaptedFileAsIter { ai: Some(ai) }
|
||||
}
|
||||
}
|
||||
impl AdaptedFilesIter for SingleAdaptedFileAsIter<'_> {
|
||||
fn next<'a>(&'a mut self) -> Option<AdaptInfo<'a>> {
|
||||
impl AdaptedFilesIter for SingleAdaptedFileAsIter {
|
||||
fn next<'a>(&'a mut self) -> Option<AdaptInfo> {
|
||||
self.ai.take()
|
||||
}
|
||||
}
|
||||
|
||||
pub type AdaptedFilesIterBox<'a> = Box<dyn AdaptedFilesIter + 'a>;
|
||||
pub type AdaptedFilesIterBox = Box<dyn AdaptedFilesIter>;
|
||||
|
@ -1,112 +1,72 @@
|
||||
use std::{pin::Pin, task::Poll};
|
||||
use std::pin::Pin;
|
||||
|
||||
use anyhow::Result;
|
||||
use async_compression::tokio::write::ZstdEncoder;
|
||||
use async_stream::stream;
|
||||
|
||||
use log::*;
|
||||
use tokio::{
|
||||
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
|
||||
pin,
|
||||
};
|
||||
use tokio::io::{AsyncRead, AsyncWriteExt};
|
||||
use tokio_stream::StreamExt;
|
||||
use tokio_util::io::{ReaderStream, StreamReader};
|
||||
|
||||
use crate::adapters::ReadBox;
|
||||
|
||||
/**
|
||||
* wrap a writer so that it is passthrough,
|
||||
* wrap a AsyncRead so that it is passthrough,
|
||||
* but also the written data is compressed and written into a buffer,
|
||||
* unless more than max_cache_size bytes is written, then the cache is dropped and it is pure passthrough.
|
||||
*/
|
||||
pub struct CachingReader<R: AsyncRead> {
|
||||
pub fn async_read_and_write_to_cache<'a>(
|
||||
inp: impl AsyncRead + Send +'a,
|
||||
max_cache_size: usize,
|
||||
// set to none if the size goes over the limit
|
||||
zstd_writer: Option<ZstdEncoder<Vec<u8>>>,
|
||||
inp: Pin<Box<R>>,
|
||||
bytes_written: u64,
|
||||
compression_level: i32,
|
||||
on_finish: Box<dyn FnOnce((u64, Option<Vec<u8>>)) -> Result<()> + Send>,
|
||||
}
|
||||
impl<R: AsyncRead> CachingReader<R> {
|
||||
pub fn new(
|
||||
inp: R,
|
||||
max_cache_size: usize,
|
||||
compression_level: i32,
|
||||
on_finish: Box<dyn FnOnce((u64, Option<Vec<u8>>)) -> Result<()> + Send>,
|
||||
) -> Result<CachingReader<R>> {
|
||||
Ok(CachingReader {
|
||||
inp: Box::pin(inp),
|
||||
max_cache_size,
|
||||
zstd_writer: Some(ZstdEncoder::with_quality(
|
||||
Vec::new(),
|
||||
async_compression::Level::Precise(compression_level as u32),
|
||||
)),
|
||||
bytes_written: 0,
|
||||
on_finish,
|
||||
})
|
||||
}
|
||||
pub fn finish(
|
||||
&mut self,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::io::Result<(u64, Option<Vec<u8>>)> {
|
||||
if let Some(writer) = self.zstd_writer.take() {
|
||||
pin!(writer);
|
||||
writer.as_mut().poll_shutdown(cx)?;
|
||||
let res = writer.get_pin_mut().clone(); // TODO: without copying possible?
|
||||
if res.len() <= self.max_cache_size {
|
||||
return Ok((self.bytes_written, Some(res)));
|
||||
}
|
||||
}
|
||||
Ok((self.bytes_written, None))
|
||||
}
|
||||
async fn write_to_compressed(&mut self, buf: &[u8]) -> std::io::Result<()> {
|
||||
if let Some(writer) = self.zstd_writer.as_mut() {
|
||||
writer.write_all(buf).await?;
|
||||
let compressed_len = writer.get_ref().len();
|
||||
trace!("wrote {} to zstd, len now {}", buf.len(), compressed_len);
|
||||
if compressed_len > self.max_cache_size {
|
||||
debug!("cache longer than max, dropping");
|
||||
//writer.finish();
|
||||
self.zstd_writer.take();
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
impl<R> AsyncRead for CachingReader<R>
|
||||
where
|
||||
R: AsyncRead,
|
||||
{
|
||||
fn poll_read(
|
||||
mut self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
mut buf: &mut tokio::io::ReadBuf<'_>,
|
||||
) -> std::task::Poll<std::io::Result<()>> {
|
||||
let old_filled_len = buf.filled().len();
|
||||
match self.inp.as_mut().poll_read(cx, &mut buf) {
|
||||
/*Ok(0) => {
|
||||
) -> Result<Pin<Box<dyn AsyncRead + Send +'a>>> {
|
||||
let inp = Box::pin(inp);
|
||||
let mut zstd_writer = Some(ZstdEncoder::with_quality(
|
||||
Vec::new(),
|
||||
async_compression::Level::Precise(compression_level as u32),
|
||||
));
|
||||
let mut bytes_written = 0;
|
||||
|
||||
}
|
||||
Ok(read_bytes) => {
|
||||
self.write_to_compressed(&buf[0..read_bytes])?;
|
||||
self.bytes_written += read_bytes as u64;
|
||||
Ok(read_bytes)
|
||||
}*/
|
||||
Poll::Ready(rdy) => {
|
||||
if let Ok(()) = &rdy {
|
||||
let slice = buf.filled();
|
||||
let read_bytes = slice.len() - old_filled_len;
|
||||
if read_bytes == 0 {
|
||||
// EOF
|
||||
// move out of box, replace with noop lambda
|
||||
let on_finish =
|
||||
std::mem::replace(&mut self.on_finish, Box::new(|_| Ok(())));
|
||||
// EOF, finish!
|
||||
(on_finish)(self.finish(cx)?)
|
||||
.map(|()| 0)
|
||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
|
||||
} else {
|
||||
self.write_to_compressed(&slice[old_filled_len..]);
|
||||
self.bytes_written += read_bytes as u64;
|
||||
let s = stream! {
|
||||
let mut stream = ReaderStream::new(inp);
|
||||
while let Some(bytes) = stream.next().await {
|
||||
if let Ok(bytes) = &bytes {
|
||||
if let Some(writer) = zstd_writer.as_mut() {
|
||||
writer.write_all(&bytes).await?;
|
||||
bytes_written += bytes.len() as u64;
|
||||
let compressed_len = writer.get_ref().len();
|
||||
trace!("wrote {} to zstd, len now {}", bytes.len(), compressed_len);
|
||||
if compressed_len > max_cache_size {
|
||||
debug!("cache longer than max, dropping");
|
||||
//writer.finish();
|
||||
zstd_writer.take();
|
||||
}
|
||||
}
|
||||
Poll::Ready(rdy)
|
||||
}
|
||||
Poll::Pending => Poll::Pending,
|
||||
yield bytes;
|
||||
}
|
||||
}
|
||||
// EOF, call on_finish
|
||||
let finish = {
|
||||
if let Some(mut writer) = zstd_writer.take() {
|
||||
writer.shutdown().await?;
|
||||
let res = writer.into_inner();
|
||||
if res.len() <= max_cache_size {
|
||||
(bytes_written, Some(res))
|
||||
} else {
|
||||
(bytes_written, None)
|
||||
}
|
||||
} else {
|
||||
(bytes_written, None)
|
||||
}
|
||||
};
|
||||
|
||||
// EOF, finish!
|
||||
on_finish(finish)
|
||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
|
||||
|
||||
};
|
||||
|
||||
Ok(Box::pin(StreamReader::new(s)))
|
||||
}
|
||||
|
Loading…
Reference in New Issue