|
|
|
@ -18,8 +18,8 @@ pub struct InmemoryStream {
|
|
|
|
|
impl InmemoryStream {
|
|
|
|
|
pub fn new(incoming: mpsc::Receiver<Vec<u8>>, outgoing: mpsc::Sender<Vec<u8>>) -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
incoming: InmemoryStreamReadHalf(incoming),
|
|
|
|
|
outgoing: InmemoryStreamWriteHalf(outgoing),
|
|
|
|
|
incoming: InmemoryStreamReadHalf::new(incoming),
|
|
|
|
|
outgoing: InmemoryStreamWriteHalf::new(outgoing),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -73,7 +73,19 @@ impl AsyncWrite for InmemoryStream {
|
|
|
|
|
|
|
|
|
|
/// Read portion of an inmemory channel
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
pub struct InmemoryStreamReadHalf(mpsc::Receiver<Vec<u8>>);
|
|
|
|
|
pub struct InmemoryStreamReadHalf {
|
|
|
|
|
rx: mpsc::Receiver<Vec<u8>>,
|
|
|
|
|
overflow: Vec<u8>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl InmemoryStreamReadHalf {
|
|
|
|
|
pub fn new(rx: mpsc::Receiver<Vec<u8>>) -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
rx,
|
|
|
|
|
overflow: Vec::new(),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl AsyncRead for InmemoryStreamReadHalf {
|
|
|
|
|
fn poll_read(
|
|
|
|
@ -81,9 +93,37 @@ impl AsyncRead for InmemoryStreamReadHalf {
|
|
|
|
|
cx: &mut Context<'_>,
|
|
|
|
|
buf: &mut ReadBuf<'_>,
|
|
|
|
|
) -> Poll<io::Result<()>> {
|
|
|
|
|
self.0.poll_recv(cx).map(|x| match x {
|
|
|
|
|
Some(x) => {
|
|
|
|
|
buf.put_slice(&x);
|
|
|
|
|
// If we cannot fit any more into the buffer at the moment, we wait
|
|
|
|
|
if buf.remaining() == 0 {
|
|
|
|
|
return Poll::Ready(Err(io::Error::new(
|
|
|
|
|
io::ErrorKind::Other,
|
|
|
|
|
"Cannot poll as buf.remaining() == 0",
|
|
|
|
|
)));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// If we have overflow from the last poll, put that in the buffer
|
|
|
|
|
if !self.overflow.is_empty() {
|
|
|
|
|
if self.overflow.len() > buf.remaining() {
|
|
|
|
|
let extra = self.overflow.split_off(buf.remaining());
|
|
|
|
|
buf.put_slice(&self.overflow);
|
|
|
|
|
self.overflow = extra;
|
|
|
|
|
} else {
|
|
|
|
|
buf.put_slice(&self.overflow);
|
|
|
|
|
self.overflow.clear();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return Poll::Ready(Ok(()));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Otherwise, we poll for the next batch to read in
|
|
|
|
|
self.rx.poll_recv(cx).map(|x| match x {
|
|
|
|
|
Some(mut x) => {
|
|
|
|
|
if x.len() > buf.remaining() {
|
|
|
|
|
self.overflow = x.split_off(buf.remaining());
|
|
|
|
|
buf.put_slice(&x);
|
|
|
|
|
} else {
|
|
|
|
|
buf.put_slice(&x);
|
|
|
|
|
}
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
None => Ok(()),
|
|
|
|
@ -95,6 +135,12 @@ impl AsyncRead for InmemoryStreamReadHalf {
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
pub struct InmemoryStreamWriteHalf(mpsc::Sender<Vec<u8>>);
|
|
|
|
|
|
|
|
|
|
impl InmemoryStreamWriteHalf {
|
|
|
|
|
pub fn new(tx: mpsc::Sender<Vec<u8>>) -> Self {
|
|
|
|
|
Self(tx)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl AsyncWrite for InmemoryStreamWriteHalf {
|
|
|
|
|
fn poll_write(
|
|
|
|
|
self: Pin<&mut Self>,
|
|
|
|
|