Refactor RawTransport to Transport and delete UntypedTransport

pull/146/head
Chip Senkbeil 2 years ago
parent 3c345f86fe
commit df9405dcf4
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -1,5 +1,5 @@
use crate::{
Codec, FramedTransport, IntoSplit, RawTransport, RawTransportRead, RawTransportWrite, Request,
Codec, FramedTransport, IntoSplit, Transport, TransportRead, TransportWrite, Request,
Response, TypedAsyncRead, TypedAsyncWrite,
};
use serde::{de::DeserializeOwned, Serialize};

@ -64,7 +64,7 @@ impl Listener for TcpListener {
#[cfg(test)]
mod tests {
use super::*;
use crate::RawTransport;
use crate::Transport;
use std::net::{Ipv6Addr, SocketAddr};
use tokio::{sync::oneshot, task::JoinHandle};

@ -94,7 +94,7 @@ impl Listener for UnixSocketListener {
#[cfg(test)]
mod tests {
use super::*;
use crate::RawTransport;
use crate::Transport;
use tempfile::NamedTempFile;
use tokio::{sync::oneshot, task::JoinHandle};

@ -66,7 +66,7 @@ impl Listener for WindowsPipeListener {
#[cfg(test)]
mod tests {
use super::*;
use crate::RawTransport;
use crate::Transport;
use tokio::{sync::oneshot, task::JoinHandle};
#[tokio::test]

@ -3,14 +3,26 @@ use std::io;
// mod router;
mod raw;
pub use raw::*;
/*
mod typed;
pub use typed::*;
/* mod framed;
pub use framed::*; */
mod untyped;
pub use untyped::*; */
mod inmemory;
pub use inmemory::*;
mod tcp;
pub use tcp::*;
#[cfg(unix)]
mod unix;
#[cfg(unix)]
pub use unix::*;
#[cfg(windows)]
mod windows;
#[cfg(windows)]
pub use windows::*;
pub use tokio::io::{Interest, Ready};
@ -20,3 +32,106 @@ pub trait Reconnectable {
/// Attempts to reconnect an already-established connection
async fn reconnect(&mut self) -> io::Result<()>;
}
/// Interface representing a transport of raw bytes into and out of the system
#[async_trait]
pub trait Transport: Reconnectable {
/// Tries to read data from the transport into the provided buffer, returning how many bytes
/// were read
///
/// This call may return an error with [`ErrorKind::WouldBlock`] in the case that the transport
/// is not ready to read data.
///
/// [`ErrorKind::WouldBlock`]: io::ErrorKind::WouldBlock
fn try_read(&self, buf: &mut [u8]) -> io::Result<usize>;
/// Try to write a buffer to the transport, returning how many bytes were written
///
/// This call may return an error with [`ErrorKind::WouldBlock`] in the case that the transport
/// is not ready to write data.
///
/// [`ErrorKind::WouldBlock`]: io::ErrorKind::WouldBlock
fn try_write(&self, buf: &[u8]) -> io::Result<usize>;
/// Waits for the transport to be ready based on the given interest, returning the ready status
async fn ready(&self, interest: Interest) -> io::Result<Ready>;
/// Waits for the transport to be readable to follow up with `try_read`
async fn readable(&self) -> io::Result<()> {
let _ = self.ready(Interest::READABLE).await?;
Ok(())
}
/// Waits for the transport to be writeable to follow up with `try_write`
async fn writeable(&self) -> io::Result<()> {
let _ = self.ready(Interest::WRITABLE).await?;
Ok(())
}
/// Reads exactly `n` bytes where `n` is the length of `buf` by continuing to call [`try_read`]
/// until completed. Calls to [`readable`] are made to ensure the transport is ready. Returns
/// the total bytes read.
///
/// [`try_read`]: Transport::try_read
/// [`readable`]: Transport::readable
async fn read_exact(&self, buf: &mut [u8]) -> io::Result<usize> {
let mut i = 0;
while i < buf.len() {
self.readable().await?;
match self.try_read(&mut buf[i..]) {
// If we get 0 bytes read, this usually means that the underlying reader
// has closed, so we will return an EOF error to reflect that
//
// NOTE: `try_read` can also return 0 if the buf len is zero, but because we check
// that our index is < len, the situation where we call try_read with a buf
// of len 0 will never happen
Ok(0) => return Err(io::Error::from(io::ErrorKind::UnexpectedEof)),
Ok(n) => i += n,
// Because we are using `try_read`, it can be possible for it to return
// WouldBlock; so, if we encounter that then we just wait for next readable
Err(x) if x.kind() == io::ErrorKind::WouldBlock => continue,
Err(x) => return Err(x),
}
}
Ok(i)
}
/// Writes all of `buf` by continuing to call [`try_write`] until completed. Calls to
/// [`writeable`] are made to ensure the transport is ready.
///
/// [`try_write`]: Transport::try_write
/// [`writable`]: Transport::writable
async fn write_all(&self, buf: &[u8]) -> io::Result<()> {
let mut i = 0;
while i < buf.len() {
self.writeable().await?;
match self.try_write(&buf[i..]) {
// If we get 0 bytes written, this usually means that the underlying writer
// has closed, so we will return a broken pipe error to reflect that
//
// NOTE: `try_write` can also return 0 if the buf len is zero, but because we check
// that our index is < len, the situation where we call try_write with a buf
// of len 0 will never happen
Ok(0) => return Err(io::Error::from(io::ErrorKind::BrokenPipe)),
Ok(n) => i += n,
// Because we are using `try_write`, it can be possible for it to return
// WouldBlock; so, if we encounter that then we just wait for next writeable
Err(x) if x.kind() == io::ErrorKind::WouldBlock => continue,
Err(x) => return Err(x),
}
}
Ok(())
}
}

@ -1,16 +1,16 @@
use super::{Interest, RawTransport, Ready, Reconnectable};
use super::{Interest, Transport, Ready, Reconnectable};
use async_trait::async_trait;
use std::io;
mod codec;
pub use codec::*;
/// Represents a [`RawTransport`] that reads and writes using frames defined by a [`Codec`],
/// Represents a [`Transport`] that reads and writes using frames defined by a [`Codec`],
/// which provides the ability to guarantee that data is read and written completely and also
/// follows the format of the given codec such as encryption and authentication of bytes
pub struct FramedRawTransport<T, C>
pub struct FramedTransport<T, C>
where
T: RawTransport,
T: Transport,
C: Codec,
{
inner: T,
@ -18,9 +18,9 @@ where
}
#[async_trait]
impl<T, C> Reconnectable for FramedRawTransport<T, C>
impl<T, C> Reconnectable for FramedTransport<T, C>
where
T: RawTransport,
T: Transport,
C: Codec,
{
async fn reconnect(&mut self) -> io::Result<()> {
@ -29,9 +29,9 @@ where
}
#[async_trait]
impl<T, C> RawTransport for FramedRawTransport<T, C>
impl<T, C> Transport for FramedTransport<T, C>
where
T: RawTransport,
T: Transport,
C: Codec,
{
/// Tries to read a frame of data into `buf`
@ -49,7 +49,7 @@ where
}
}
impl FramedRawTransport<super::InmemoryTransport, PlainCodec> {
impl FramedTransport<super::InmemoryTransport, PlainCodec> {
/// Produces a pair of inmemory transports that are connected to each other using
/// a standard codec
///
@ -57,12 +57,12 @@ impl FramedRawTransport<super::InmemoryTransport, PlainCodec> {
pub fn pair(
buffer: usize,
) -> (
FramedRawTransport<super::InmemoryTransport, PlainCodec>,
FramedRawTransport<super::InmemoryTransport, PlainCodec>,
FramedTransport<super::InmemoryTransport, PlainCodec>,
FramedTransport<super::InmemoryTransport, PlainCodec>,
) {
let (a, b) = super::InmemoryTransport::pair(buffer);
let a = FramedRawTransport::new(a, PlainCodec::new());
let b = FramedRawTransport::new(b, PlainCodec::new());
let a = FramedTransport::new(a, PlainCodec::new());
let b = FramedTransport::new(b, PlainCodec::new());
(a, b)
}
}

@ -1,4 +1,4 @@
use super::{Interest, RawTransport, Ready, Reconnectable};
use super::{Interest, Ready, Reconnectable, Transport};
use async_trait::async_trait;
use std::{
io,
@ -9,7 +9,7 @@ use tokio::sync::mpsc::{
error::{TryRecvError, TrySendError},
};
/// Represents a [`RawTransport`] comprised of two inmemory channels
/// Represents a [`Transport`] comprised of two inmemory channels
#[derive(Debug)]
pub struct InmemoryTransport {
tx: mpsc::Sender<Vec<u8>>,
@ -92,7 +92,7 @@ impl Reconnectable for InmemoryTransport {
}
#[async_trait]
impl RawTransport for InmemoryTransport {
impl Transport for InmemoryTransport {
fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
// Lock our internal storage to ensure that nothing else mutates it for the lifetime of
// this call as we want to make sure that data is read and stored in order

@ -1,127 +0,0 @@
use super::{Interest, Ready, Reconnectable};
use async_trait::async_trait;
use std::io;
/* mod framed;
pub use framed::*; */
mod inmemory;
pub use inmemory::*;
mod tcp;
pub use tcp::*;
#[cfg(unix)]
mod unix;
#[cfg(unix)]
pub use unix::*;
#[cfg(windows)]
mod windows;
#[cfg(windows)]
pub use windows::*;
/// Interface representing a transport of raw bytes into and out of the system
#[async_trait]
pub trait RawTransport: Reconnectable {
/// Tries to read data from the transport into the provided buffer, returning how many bytes
/// were read
///
/// This call may return an error with [`ErrorKind::WouldBlock`] in the case that the transport
/// is not ready to read data.
///
/// [`ErrorKind::WouldBlock`]: io::ErrorKind::WouldBlock
fn try_read(&self, buf: &mut [u8]) -> io::Result<usize>;
/// Try to write a buffer to the transport, returning how many bytes were written
///
/// This call may return an error with [`ErrorKind::WouldBlock`] in the case that the transport
/// is not ready to write data.
///
/// [`ErrorKind::WouldBlock`]: io::ErrorKind::WouldBlock
fn try_write(&self, buf: &[u8]) -> io::Result<usize>;
/// Waits for the transport to be ready based on the given interest, returning the ready status
async fn ready(&self, interest: Interest) -> io::Result<Ready>;
/// Waits for the transport to be readable to follow up with `try_read`
async fn readable(&self) -> io::Result<()> {
let _ = self.ready(Interest::READABLE).await?;
Ok(())
}
/// Waits for the transport to be writeable to follow up with `try_write`
async fn writeable(&self) -> io::Result<()> {
let _ = self.ready(Interest::WRITABLE).await?;
Ok(())
}
/// Reads exactly `n` bytes where `n` is the length of `buf` by continuing to call [`try_read`]
/// until completed. Calls to [`readable`] are made to ensure the transport is ready. Returns
/// the total bytes read.
///
/// [`try_read`]: RawTransport::try_read
/// [`readable`]: RawTransport::readable
async fn read_exact(&self, buf: &mut [u8]) -> io::Result<usize> {
let mut i = 0;
while i < buf.len() {
self.readable().await?;
match self.try_read(&mut buf[i..]) {
// If we get 0 bytes read, this usually means that the underlying reader
// has closed, so we will return an EOF error to reflect that
//
// NOTE: `try_read` can also return 0 if the buf len is zero, but because we check
// that our index is < len, the situation where we call try_read with a buf
// of len 0 will never happen
Ok(0) => return Err(io::Error::from(io::ErrorKind::UnexpectedEof)),
Ok(n) => i += n,
// Because we are using `try_read`, it can be possible for it to return
// WouldBlock; so, if we encounter that then we just wait for next readable
Err(x) if x.kind() == io::ErrorKind::WouldBlock => continue,
Err(x) => return Err(x),
}
}
Ok(i)
}
/// Writes all of `buf` by continuing to call [`try_write`] until completed. Calls to
/// [`writeable`] are made to ensure the transport is ready.
///
/// [`try_write`]: RawTransport::try_write
/// [`writable`]: RawTransport::writable
async fn write_all(&self, buf: &[u8]) -> io::Result<()> {
let mut i = 0;
while i < buf.len() {
self.writeable().await?;
match self.try_write(&buf[i..]) {
// If we get 0 bytes written, this usually means that the underlying writer
// has closed, so we will return a broken pipe error to reflect that
//
// NOTE: `try_write` can also return 0 if the buf len is zero, but because we check
// that our index is < len, the situation where we call try_write with a buf
// of len 0 will never happen
Ok(0) => return Err(io::Error::from(io::ErrorKind::BrokenPipe)),
Ok(n) => i += n,
// Because we are using `try_write`, it can be possible for it to return
// WouldBlock; so, if we encounter that then we just wait for next writeable
Err(x) if x.kind() == io::ErrorKind::WouldBlock => continue,
Err(x) => return Err(x),
}
}
Ok(())
}
}

@ -1,9 +1,9 @@
use super::{Interest, RawTransport, Ready, Reconnectable};
use super::{Interest, Ready, Reconnectable, Transport};
use async_trait::async_trait;
use std::{fmt, io, net::IpAddr};
use tokio::net::{TcpStream, ToSocketAddrs};
/// Represents a [`RawTransport`] that leverages a TCP stream
/// Represents a [`Transport`] that leverages a TCP stream
pub struct TcpTransport {
pub(crate) addr: IpAddr,
pub(crate) port: u16,
@ -52,7 +52,7 @@ impl Reconnectable for TcpTransport {
}
#[async_trait]
impl RawTransport for TcpTransport {
impl Transport for TcpTransport {
fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.try_read(buf)
}

@ -1,4 +1,4 @@
use super::{Interest, RawTransport, Ready, Reconnectable};
use super::{Interest, Ready, Reconnectable, Transport};
use async_trait::async_trait;
use std::{
fmt, io,
@ -6,7 +6,7 @@ use std::{
};
use tokio::net::UnixStream;
/// Represents a [`RawTransport`] that leverages a Unix socket
/// Represents a [`Transport`] that leverages a Unix socket
pub struct UnixSocketTransport {
pub(crate) path: PathBuf,
pub(crate) inner: UnixStream,
@ -45,7 +45,7 @@ impl Reconnectable for UnixSocketTransport {
}
#[async_trait]
impl RawTransport for UnixSocketTransport {
impl Transport for UnixSocketTransport {
fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.try_read(buf)
}

@ -1,47 +0,0 @@
use super::{Interest, Ready, Reconnectable, TypedTransport};
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
use std::io;
/// Interface representing a transport that uses [`serde`] to serialize and deserialize data
/// as it is sent and received
#[async_trait]
pub trait UntypedTransport: Reconnectable {
/// Attempts to read some data as `T`, returning [`io::Error`] if unable to deserialize
/// or some other error occurs. `Some(T)` is returned if successful. `None` is
/// returned if no more data is available.
///
/// This call may return an error with [`ErrorKind::WouldBlock`] in the case that the transport
/// is not ready to read data.
///
/// [`ErrorKind::WouldBlock`]: io::ErrorKind::WouldBlock
fn try_read<T>(&self) -> io::Result<Option<T>>
where
T: DeserializeOwned;
/// Attempts to write some data `T` by serializing it into bytes, returning [`io::Error`] if
/// unable to serialize or some other error occurs
///
/// This call may return an error with [`ErrorKind::WouldBlock`] in the case that the transport
/// is not ready to write data.
///
/// [`ErrorKind::WouldBlock`]: io::ErrorKind::WouldBlock
fn try_write<T>(&self, value: &T) -> io::Result<()>
where
T: Serialize;
/// Waits for the transport to be ready based on the given interest, returning the ready status
async fn ready(&self, interest: Interest) -> io::Result<Ready>;
/// Waits for the transport to be readable to follow up with `try_read`
async fn readable(&self) -> io::Result<()> {
let _ = self.ready(Interest::READABLE).await?;
Ok(())
}
/// Waits for the transport to be writable to follow up with `try_write`
async fn writeable(&self) -> io::Result<()> {
let _ = self.ready(Interest::WRITABLE).await?;
Ok(())
}
}

@ -1,4 +1,4 @@
use super::{Interest, RawTransport, Ready, Reconnectable};
use super::{Interest, Transport, Ready, Reconnectable};
use async_trait::async_trait;
use std::{
ffi::{OsStr, OsString},
@ -8,7 +8,7 @@ use std::{
mod pipe;
pub use pipe::NamedPipe;
/// Represents a [`RawTransport`] that leverages a named Windows pipe (client or server)
/// Represents a [`Transport`] that leverages a named Windows pipe (client or server)
pub struct WindowsPipeTransport {
pub(crate) addr: OsString,
pub(crate) inner: NamedPipe,
@ -62,7 +62,7 @@ impl Reconnectable for WindowsPipeTransport {
}
#[async_trait]
impl RawTransport for WindowsPipeTransport {
impl Transport for WindowsPipeTransport {
fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
match &self.inner {
NamedPipe::Client(x) => x.try_read(buf),
Loading…
Cancel
Save