Unfinished testing of inmemory transport

pull/146/head
Chip Senkbeil 2 years ago
parent ec28292396
commit b3271df559
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -5,11 +5,11 @@ use std::io;
mod raw;
pub use raw::*;
/*
mod typed;
pub use typed::*;
/* mod typed;
pub use typed::*; */
/* mod untyped;
mod untyped;
pub use untyped::*; */
pub use tokio::io::{Interest, Ready};

@ -5,8 +5,8 @@ use std::io;
/* mod framed;
pub use framed::*; */
/* mod inmemory;
pub use inmemory::*; */
mod inmemory;
pub use inmemory::*;
mod tcp;
pub use tcp::*;

@ -34,10 +34,12 @@ where
T: RawTransport,
C: Codec,
{
/// Tries to read a frame of data into `buf`
fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
todo!();
}
/// Tries to write `buf` as a frame of data
fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
todo!();
}

@ -13,7 +13,7 @@ use tokio::sync::mpsc::{
#[derive(Debug)]
pub struct InmemoryTransport {
tx: mpsc::Sender<Vec<u8>>,
rx: mpsc::Receiver<Vec<u8>>,
rx: Mutex<mpsc::Receiver<Vec<u8>>>,
/// Internal storage used when we get more data from a `try_read` than can be returned
buf: Mutex<Option<Vec<u8>>>,
@ -23,7 +23,7 @@ impl InmemoryTransport {
pub fn new(tx: mpsc::Sender<Vec<u8>>, rx: mpsc::Receiver<Vec<u8>>) -> Self {
Self {
tx,
rx,
rx: Mutex::new(rx),
buf: Mutex::new(None),
}
}
@ -57,12 +57,12 @@ impl InmemoryTransport {
///
/// Track https://github.com/tokio-rs/tokio/issues/4638 for future `is_closed` on rx
fn is_rx_closed(&self) -> bool {
match self.rx.try_recv() {
match self.rx.lock().unwrap().try_recv() {
Ok(mut data) => {
let buf_lock = self.buf.lock().unwrap();
let mut buf_lock = self.buf.lock().unwrap();
let data = match buf_lock.take() {
Some(existing) => {
Some(mut existing) => {
existing.append(&mut data);
existing
}
@ -71,7 +71,7 @@ impl InmemoryTransport {
*buf_lock = Some(data);
true
false
}
Err(TryRecvError::Empty) => false,
Err(TryRecvError::Disconnected) => true,
@ -96,17 +96,17 @@ impl RawTransport for InmemoryTransport {
fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
// Lock our internal storage to ensure that nothing else mutates it for the lifetime of
// this call as we want to make sure that data is read and stored in order
let buf_lock = self.buf.lock().unwrap();
let mut buf_lock = self.buf.lock().unwrap();
// Check if we have data in our internal buffer, and if so feed it into the outgoing buf
if let Some(data) = buf_lock.take() {
return Ok(copy_and_store(buf_lock, data, buf));
}
match self.rx.try_recv() {
match self.rx.lock().unwrap().try_recv() {
Ok(data) => Ok(copy_and_store(buf_lock, data, buf)),
Err(TryRecvError::Empty) => Err(io::Error::from(io::ErrorKind::WouldBlock)),
Err(TryRecvError::Disconnected) => Ok(None),
Err(TryRecvError::Disconnected) => Ok(0),
}
}
@ -114,7 +114,7 @@ impl RawTransport for InmemoryTransport {
match self.tx.try_send(buf.to_vec()) {
Ok(()) => Ok(buf.len()),
Err(TrySendError::Full(_)) => Err(io::Error::from(io::ErrorKind::WouldBlock)),
Err(TryRecvError::Closed(_)) => Err(io::Error::from(io::ErrorKind::BrokenPipe)),
Err(TrySendError::Closed(_)) => Ok(0),
}
}
@ -133,7 +133,7 @@ impl RawTransport for InmemoryTransport {
};
}
if interest.is_writeable() {
if interest.is_writable() {
status |= if self.tx.is_closed() {
Ready::WRITE_CLOSED
} else {
@ -147,7 +147,11 @@ impl RawTransport for InmemoryTransport {
/// Copies `data` into `out`, storing any overflow from `data` into the storage pointed to by the
/// mutex `buf_lock`
fn copy_and_store(buf_lock: MutexGuard<Option<Vec<u8>>>, data: Vec<u8>, out: &mut [u8]) -> usize {
fn copy_and_store(
mut buf_lock: MutexGuard<Option<Vec<u8>>>,
mut data: Vec<u8>,
out: &mut [u8],
) -> usize {
// NOTE: We can get data that is larger than the destination buf; so,
// we store as much as we can and queue up the rest in our temporary
// storage for future retrievals
@ -166,11 +170,15 @@ fn copy_and_store(buf_lock: MutexGuard<Option<Vec<u8>>>, data: Vec<u8>, out: &mu
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::test]
async fn ready_should_properly_report_if_channels_are_open() {
todo!();
}
#[tokio::test]
async fn make_should_return_sender_that_sends_data_to_transport() {
let (tx, _, mut transport) = InmemoryTransport::make(3);
let (tx, _, transport) = InmemoryTransport::make(3);
tx.send(b"test msg 1".to_vec()).await.unwrap();
tx.send(b"test msg 2".to_vec()).await.unwrap();
@ -178,11 +186,11 @@ mod tests {
// Should get data matching a singular message
let mut buf = [0; 256];
let len = transport.read(&mut buf).await.unwrap();
let len = transport.try_read(&mut buf).unwrap();
assert_eq!(&buf[..len], b"test msg 1");
// Next call would get the second message
let len = transport.read(&mut buf).await.unwrap();
let len = transport.try_read(&mut buf).unwrap();
assert_eq!(&buf[..len], b"test msg 2");
// When the last of the senders is dropped, we should still get
@ -190,16 +198,16 @@ mod tests {
// an indicator that there is no more data
drop(tx);
let len = transport.read(&mut buf).await.unwrap();
let len = transport.try_read(&mut buf).unwrap();
assert_eq!(&buf[..len], b"test msg 3");
let len = transport.read(&mut buf).await.unwrap();
let len = transport.try_read(&mut buf).unwrap();
assert_eq!(len, 0, "Unexpectedly got more data");
}
#[tokio::test]
async fn make_should_return_receiver_that_receives_data_from_transport() {
let (_, mut rx, mut transport) = InmemoryTransport::make(3);
let (_, mut rx, transport) = InmemoryTransport::make(3);
transport.write_all(b"test msg 1").await.unwrap();
transport.write_all(b"test msg 2").await.unwrap();

@ -1,6 +1,6 @@
use super::{Interest, Ready, Reconnectable, TypedTransport};
use async_trait::async_trait;
use std::io;
use std::{io, sync::Mutex};
use tokio::sync::mpsc::{
self,
error::{TryRecvError, TrySendError},
@ -14,12 +14,15 @@ use tokio::sync::mpsc::{
#[derive(Debug)]
pub struct InmemoryTypedTransport<T, U> {
tx: mpsc::Sender<T>,
rx: mpsc::Receiver<U>,
rx: Mutex<mpsc::Receiver<U>>,
}
impl<T, U> InmemoryTypedTransport<T, U> {
pub fn new(tx: mpsc::Sender<T>, rx: mpsc::Receiver<U>) -> Self {
Self { tx, rx }
Self {
tx,
rx: Mutex::new(rx),
}
}
/// Creates a pair of connected transports using `buffer` as maximum
@ -35,7 +38,11 @@ impl<T, U> InmemoryTypedTransport<T, U> {
}
#[async_trait]
impl<T, U> Reconnectable for InmemoryTypedTransport<T, U> {
impl<T, U> Reconnectable for InmemoryTypedTransport<T, U>
where
T: Send,
U: Send,
{
/// Once the underlying channels have closed, there is no way for this transport to
/// re-establish those channels; therefore, reconnecting will always fail with
/// [`ErrorKind::Unsupported`]
@ -47,12 +54,16 @@ impl<T, U> Reconnectable for InmemoryTypedTransport<T, U> {
}
#[async_trait]
impl<T, U> TypedTransport<T, U> for InmemoryTypedTransport<T, U> {
impl<T, U> TypedTransport for InmemoryTypedTransport<T, U>
where
T: Send,
U: Send,
{
type Input = U;
type Output = T;
fn try_read(&self) -> io::Result<Option<Self::Input>> {
match self.rx.try_recv() {
match self.rx.lock().unwrap().try_recv() {
Ok(x) => Ok(Some(x)),
Err(TryRecvError::Empty) => Err(io::Error::from(io::ErrorKind::WouldBlock)),
Err(TryRecvError::Disconnected) => Ok(None),
@ -63,11 +74,33 @@ impl<T, U> TypedTransport<T, U> for InmemoryTypedTransport<T, U> {
match self.tx.try_send(value) {
Ok(()) => Ok(()),
Err(TrySendError::Full(_)) => Err(io::Error::from(io::ErrorKind::WouldBlock)),
Err(TryRecvError::Closed(_)) => Err(io::Error::from(io::ErrorKind::BrokenPipe)),
Err(TrySendError::Closed(_)) => Err(io::Error::from(io::ErrorKind::BrokenPipe)),
}
}
async fn ready(&self, interest: Interest) -> io::Result<Ready> {
todo!();
let mut status = Ready::EMPTY;
if interest.is_readable() {
// TODO: Replace `self.is_rx_closed()` with `self.rx.is_closed()` once the tokio issue
// is resolved that adds `is_closed` to the `mpsc::Receiver`
//
// See https://github.com/tokio-rs/tokio/issues/4638
status |= if self.is_rx_closed() && self.buf.lock().unwrap().is_none() {
Ready::READ_CLOSED
} else {
Ready::READABLE
};
}
if interest.is_writable() {
status |= if self.tx.is_closed() {
Ready::WRITE_CLOSED
} else {
Ready::WRITABLE
};
}
Ok(status)
}
}

@ -45,26 +45,3 @@ pub trait UntypedTransport: Reconnectable {
Ok(())
}
}
#[async_trait]
impl<T, I, O> TypedTransport<I, O> for T
where
T: UntypedTransport,
I: DeserializeOwned,
O: Serialize,
{
/// Tries to read a value from the transport
fn try_read(&self) -> io::Result<Option<I>> {
UntypedTransport::try_read(self)
}
/// Try to write a value to the transport
fn try_write(&self, value: O) -> io::Result<()> {
UntypedTransport::try_write(self, &value)
}
/// Waits for the transport to be ready based on the given interest, returning the ready status
async fn ready(&self, interest: Interest) -> io::Result<Ready> {
UntypedTransport::ready(self, interest).await
}
}

Loading…
Cancel
Save