|
|
|
@ -122,6 +122,129 @@ where
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<T, const CAPACITY: usize> FramedTransport<T, CAPACITY>
|
|
|
|
|
where
|
|
|
|
|
T: Transport,
|
|
|
|
|
{
|
|
|
|
|
/// Reads a frame of bytes by using the [`Codec`] tied to this transport. Returns
|
|
|
|
|
/// `Ok(Some(frame))` upon reading a frame, or `Ok(None)` if the underlying transport has
|
|
|
|
|
/// closed.
|
|
|
|
|
///
|
|
|
|
|
/// This call may return an error with [`ErrorKind::WouldBlock`] in the case that the transport
|
|
|
|
|
/// is not ready to read data or has not received a full frame before waiting.
|
|
|
|
|
///
|
|
|
|
|
/// [`ErrorKind::WouldBlock`]: io::ErrorKind::WouldBlock
|
|
|
|
|
pub fn try_read_frame(&mut self) -> io::Result<Option<OwnedFrame>> {
|
|
|
|
|
// Continually read bytes into the incoming queue and then attempt to tease out a frame
|
|
|
|
|
let mut buf = [0; CAPACITY];
|
|
|
|
|
|
|
|
|
|
loop {
|
|
|
|
|
match self.inner.try_read(&mut buf) {
|
|
|
|
|
// Getting 0 bytes on read indicates the channel has closed. If we were still
|
|
|
|
|
// expecting more bytes for our frame, then this is an error, otherwise if we
|
|
|
|
|
// have nothing remaining if our queue then this is an expected end and we
|
|
|
|
|
// return None
|
|
|
|
|
Ok(0) if self.incoming.is_empty() => return Ok(None),
|
|
|
|
|
Ok(0) => return Err(io::Error::from(io::ErrorKind::UnexpectedEof)),
|
|
|
|
|
|
|
|
|
|
// Got some additional bytes, which we will add to our queue and then attempt to
|
|
|
|
|
// decode into a frame
|
|
|
|
|
Ok(n) => {
|
|
|
|
|
self.incoming.extend_from_slice(&buf[..n]);
|
|
|
|
|
|
|
|
|
|
// Attempt to read a frame, returning the decoded frame if we get one,
|
|
|
|
|
// continuing to try to read more bytes if we don't find a frame, and returning
|
|
|
|
|
// any error that is encountered from reading frames or failing to decode
|
|
|
|
|
let frame = match Frame::read(&mut self.incoming) {
|
|
|
|
|
Ok(Some(frame)) => frame,
|
|
|
|
|
Ok(None) => continue,
|
|
|
|
|
Err(x) => return Err(x),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
return Ok(Some(self.codec.decode(frame)?.into_owned()));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Any error (including WouldBlock) will get bubbled up
|
|
|
|
|
Err(x) => return Err(x),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Continues to invoke [`try_read_frame`] until a frame is successfully read, an error is
|
|
|
|
|
/// encountered that is not [`ErrorKind::WouldBlock`], or the underlying transport has closed.
|
|
|
|
|
///
|
|
|
|
|
/// [`try_read_frame`]: FramedTransport::try_read_frame
|
|
|
|
|
/// [`ErrorKind::WouldBlock`]: io::ErrorKind::WouldBlock
|
|
|
|
|
pub async fn read_frame(&mut self) -> io::Result<Option<OwnedFrame>> {
|
|
|
|
|
loop {
|
|
|
|
|
self.readable().await?;
|
|
|
|
|
|
|
|
|
|
match self.try_read_frame() {
|
|
|
|
|
Err(x) if x.kind() == io::ErrorKind::WouldBlock => continue,
|
|
|
|
|
x => return x,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Writes a `frame` of bytes by using the [`Codec`] tied to this transport.
|
|
|
|
|
///
|
|
|
|
|
/// This is accomplished by continually calling the inner transport's `try_write`. If 0 is
|
|
|
|
|
/// returned from a call to `try_write`, this will fail with [`ErrorKind::WriteZero`].
|
|
|
|
|
///
|
|
|
|
|
/// This call may return an error with [`ErrorKind::WouldBlock`] in the case that the transport
|
|
|
|
|
/// is not ready to write data or has not written the entire frame before waiting.
|
|
|
|
|
///
|
|
|
|
|
/// [`ErrorKind::WriteZero`]: io::ErrorKind::WriteZero
|
|
|
|
|
/// [`ErrorKind::WouldBlock`]: io::ErrorKind::WouldBlock
|
|
|
|
|
pub fn try_write_frame<'a>(&mut self, frame: impl Into<Frame<'a>>) -> io::Result<()> {
|
|
|
|
|
// Encode the frame and store it in our outgoing queue
|
|
|
|
|
let frame = self.codec.encode(frame.into())?;
|
|
|
|
|
frame.write(&mut self.outgoing)?;
|
|
|
|
|
|
|
|
|
|
// Attempt to write everything in our queue
|
|
|
|
|
self.try_flush()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Invokes [`try_write_frame`] followed by a continuous calls to [`try_flush`] until a frame
|
|
|
|
|
/// is successfully written, an error is encountered that is not [`ErrorKind::WouldBlock`], or
|
|
|
|
|
/// the underlying transport has closed.
|
|
|
|
|
///
|
|
|
|
|
/// [`try_write_frame`]: FramedTransport::try_write_frame
|
|
|
|
|
/// [`try_flush`]: FramedTransport::try_flush
|
|
|
|
|
/// [`ErrorKind::WouldBlock`]: io::ErrorKind::WouldBlock
|
|
|
|
|
pub async fn write_frame<'a>(&mut self, frame: impl Into<Frame<'a>>) -> io::Result<()> {
|
|
|
|
|
self.writeable().await?;
|
|
|
|
|
|
|
|
|
|
match self.try_write_frame(frame) {
|
|
|
|
|
// Would block, so continually try to flush until good to go
|
|
|
|
|
Err(x) if x.kind() == io::ErrorKind::WouldBlock => loop {
|
|
|
|
|
self.writeable().await?;
|
|
|
|
|
match self.try_flush() {
|
|
|
|
|
Err(x) if x.kind() == io::ErrorKind::WouldBlock => continue,
|
|
|
|
|
x => return x,
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
// Already fully succeeded or failed
|
|
|
|
|
x => x,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Perform the client-side of a handshake. See [`handshake`] for more details.
|
|
|
|
|
///
|
|
|
|
|
/// [`handshake`]: FramedTransport::handshake
|
|
|
|
|
pub async fn client_handshake(&mut self) -> io::Result<()> {
|
|
|
|
|
self.handshake(Handshake::client()).await
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Perform the server-side of a handshake. See [`handshake`] for more details.
|
|
|
|
|
///
|
|
|
|
|
/// [`handshake`]: FramedTransport::handshake
|
|
|
|
|
pub async fn server_handshake(&mut self) -> io::Result<()> {
|
|
|
|
|
self.handshake(Handshake::server()).await
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Performs a handshake in order to establish a new codec to use between this transport and
|
|
|
|
|
/// the other side. The parameter `handshake` defines how the transport will handle the
|
|
|
|
@ -158,10 +281,7 @@ where
|
|
|
|
|
/// If a failure happens, the codec will be reset to what it was prior to the handshake
|
|
|
|
|
/// request, and all internal buffers will be cleared to avoid corruption.
|
|
|
|
|
///
|
|
|
|
|
pub async fn handshake(&mut self, handshake: Handshake) -> io::Result<()>
|
|
|
|
|
where
|
|
|
|
|
T: Transport,
|
|
|
|
|
{
|
|
|
|
|
pub async fn handshake(&mut self, handshake: Handshake) -> io::Result<()> {
|
|
|
|
|
// Place transport in plain text communication mode for start of handshake, and clear any
|
|
|
|
|
// data that is lingering within internal buffers
|
|
|
|
|
//
|
|
|
|
@ -300,9 +420,17 @@ where
|
|
|
|
|
|
|
|
|
|
// Bundle our compression and encryption codecs into a single, chained codec
|
|
|
|
|
let codec: BoxedCodec = match (compression_codec, encryption_codec) {
|
|
|
|
|
(Some(c), Some(e)) => Box::new(ChainCodec::new(c, e)),
|
|
|
|
|
// If we have both encryption and compression, do the encryption first and then
|
|
|
|
|
// compress in order to get smallest result
|
|
|
|
|
(Some(c), Some(e)) => Box::new(ChainCodec::new(e, c)),
|
|
|
|
|
|
|
|
|
|
// If we just have compression, pass along the compression codec
|
|
|
|
|
(Some(c), None) => Box::new(c),
|
|
|
|
|
|
|
|
|
|
// If we just have encryption, pass along the encryption codec
|
|
|
|
|
(None, Some(e)) => Box::new(e),
|
|
|
|
|
|
|
|
|
|
// If we have neither compression nor encryption, use a plaintext codec
|
|
|
|
|
(None, None) => Box::new(PlainCodec::new()),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
@ -310,115 +438,6 @@ where
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<T, const CAPACITY: usize> FramedTransport<T, CAPACITY>
|
|
|
|
|
where
|
|
|
|
|
T: Transport,
|
|
|
|
|
{
|
|
|
|
|
/// Reads a frame of bytes by using the [`Codec`] tied to this transport. Returns
|
|
|
|
|
/// `Ok(Some(frame))` upon reading a frame, or `Ok(None)` if the underlying transport has
|
|
|
|
|
/// closed.
|
|
|
|
|
///
|
|
|
|
|
/// This call may return an error with [`ErrorKind::WouldBlock`] in the case that the transport
|
|
|
|
|
/// is not ready to read data or has not received a full frame before waiting.
|
|
|
|
|
///
|
|
|
|
|
/// [`ErrorKind::WouldBlock`]: io::ErrorKind::WouldBlock
|
|
|
|
|
pub fn try_read_frame(&mut self) -> io::Result<Option<OwnedFrame>> {
|
|
|
|
|
// Continually read bytes into the incoming queue and then attempt to tease out a frame
|
|
|
|
|
let mut buf = [0; CAPACITY];
|
|
|
|
|
|
|
|
|
|
loop {
|
|
|
|
|
match self.inner.try_read(&mut buf) {
|
|
|
|
|
// Getting 0 bytes on read indicates the channel has closed. If we were still
|
|
|
|
|
// expecting more bytes for our frame, then this is an error, otherwise if we
|
|
|
|
|
// have nothing remaining if our queue then this is an expected end and we
|
|
|
|
|
// return None
|
|
|
|
|
Ok(0) if self.incoming.is_empty() => return Ok(None),
|
|
|
|
|
Ok(0) => return Err(io::Error::from(io::ErrorKind::UnexpectedEof)),
|
|
|
|
|
|
|
|
|
|
// Got some additional bytes, which we will add to our queue and then attempt to
|
|
|
|
|
// decode into a frame
|
|
|
|
|
Ok(n) => {
|
|
|
|
|
self.incoming.extend_from_slice(&buf[..n]);
|
|
|
|
|
|
|
|
|
|
// Attempt to read a frame, returning the decoded frame if we get one,
|
|
|
|
|
// continuing to try to read more bytes if we don't find a frame, and returning
|
|
|
|
|
// any error that is encountered from reading frames or failing to decode
|
|
|
|
|
let frame = match Frame::read(&mut self.incoming) {
|
|
|
|
|
Ok(Some(frame)) => frame,
|
|
|
|
|
Ok(None) => continue,
|
|
|
|
|
Err(x) => return Err(x),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
return Ok(Some(self.codec.decode(frame)?.into_owned()));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Any error (including WouldBlock) will get bubbled up
|
|
|
|
|
Err(x) => return Err(x),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Continues to invoke [`try_read_frame`] until a frame is successfully read, an error is
|
|
|
|
|
/// encountered that is not [`ErrorKind::WouldBlock`], or the underlying transport has closed.
|
|
|
|
|
///
|
|
|
|
|
/// [`try_read_frame`]: FramedTransport::try_read_frame
|
|
|
|
|
/// [`ErrorKind::WouldBlock`]: io::ErrorKind::WouldBlock
|
|
|
|
|
pub async fn read_frame(&mut self) -> io::Result<Option<OwnedFrame>> {
|
|
|
|
|
loop {
|
|
|
|
|
self.readable().await?;
|
|
|
|
|
|
|
|
|
|
match self.try_read_frame() {
|
|
|
|
|
Err(x) if x.kind() == io::ErrorKind::WouldBlock => continue,
|
|
|
|
|
x => return x,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Writes a `frame` of bytes by using the [`Codec`] tied to this transport.
|
|
|
|
|
///
|
|
|
|
|
/// This is accomplished by continually calling the inner transport's `try_write`. If 0 is
|
|
|
|
|
/// returned from a call to `try_write`, this will fail with [`ErrorKind::WriteZero`].
|
|
|
|
|
///
|
|
|
|
|
/// This call may return an error with [`ErrorKind::WouldBlock`] in the case that the transport
|
|
|
|
|
/// is not ready to write data or has not written the entire frame before waiting.
|
|
|
|
|
///
|
|
|
|
|
/// [`ErrorKind::WriteZero`]: io::ErrorKind::WriteZero
|
|
|
|
|
/// [`ErrorKind::WouldBlock`]: io::ErrorKind::WouldBlock
|
|
|
|
|
pub fn try_write_frame<'a>(&mut self, frame: impl Into<Frame<'a>>) -> io::Result<()> {
|
|
|
|
|
// Encode the frame and store it in our outgoing queue
|
|
|
|
|
let frame = self.codec.encode(frame.into())?;
|
|
|
|
|
frame.write(&mut self.outgoing)?;
|
|
|
|
|
|
|
|
|
|
// Attempt to write everything in our queue
|
|
|
|
|
self.try_flush()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Invokes [`try_write_frame`] followed by a continuous calls to [`try_flush`] until a frame
|
|
|
|
|
/// is successfully written, an error is encountered that is not [`ErrorKind::WouldBlock`], or
|
|
|
|
|
/// the underlying transport has closed.
|
|
|
|
|
///
|
|
|
|
|
/// [`try_write_frame`]: FramedTransport::try_write_frame
|
|
|
|
|
/// [`try_flush`]: FramedTransport::try_flush
|
|
|
|
|
/// [`ErrorKind::WouldBlock`]: io::ErrorKind::WouldBlock
|
|
|
|
|
pub async fn write_frame<'a>(&mut self, frame: impl Into<Frame<'a>>) -> io::Result<()> {
|
|
|
|
|
self.writeable().await?;
|
|
|
|
|
|
|
|
|
|
match self.try_write_frame(frame) {
|
|
|
|
|
// Would block, so continually try to flush until good to go
|
|
|
|
|
Err(x) if x.kind() == io::ErrorKind::WouldBlock => loop {
|
|
|
|
|
self.writeable().await?;
|
|
|
|
|
match self.try_flush() {
|
|
|
|
|
Err(x) if x.kind() == io::ErrorKind::WouldBlock => continue,
|
|
|
|
|
x => return x,
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
// Already fully succeeded or failed
|
|
|
|
|
x => x,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[async_trait]
|
|
|
|
|
impl<T, const CAPACITY: usize> Reconnectable for FramedTransport<T, CAPACITY>
|
|
|
|
|
where
|
|
|
|
@ -447,6 +466,19 @@ impl<const CAPACITY: usize> FramedTransport<super::InmemoryTransport, CAPACITY>
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
|
impl FramedTransport<super::InmemoryTransport> {
|
|
|
|
|
/// Generates a test pair with default capacity
|
|
|
|
|
pub fn test_pair(
|
|
|
|
|
buffer: usize,
|
|
|
|
|
) -> (
|
|
|
|
|
FramedTransport<super::InmemoryTransport>,
|
|
|
|
|
FramedTransport<super::InmemoryTransport>,
|
|
|
|
|
) {
|
|
|
|
|
Self::pair(buffer)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
|
mod tests {
|
|
|
|
|
use super::*;
|
|
|
|
|