|
|
|
@ -158,6 +158,19 @@ impl<T: Transport> FramedTransport<T> {
|
|
|
|
|
///
|
|
|
|
|
/// [`ErrorKind::WouldBlock`]: io::ErrorKind::WouldBlock
|
|
|
|
|
pub fn try_read_frame(&mut self) -> io::Result<Option<OwnedFrame>> {
|
|
|
|
|
// If we have data remaining in the buffer, we first try to parse it in case we received
|
|
|
|
|
// multiple frames from a previous call.
|
|
|
|
|
//
|
|
|
|
|
// NOTE: This exists to avoid the situation where there is a valid frame remaining in the
|
|
|
|
|
// incoming buffer, but it is never evaluated because a call to `try_read` returns
|
|
|
|
|
// `WouldBlock`, 0 bytes, or some other error.
|
|
|
|
|
if !self.incoming.is_empty() {
|
|
|
|
|
match Frame::read(&mut self.incoming) {
|
|
|
|
|
Ok(None) => (),
|
|
|
|
|
x => return x,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Continually read bytes into the incoming queue and then attempt to tease out a frame
|
|
|
|
|
let mut buf = [0; READ_BUF_SIZE];
|
|
|
|
|
|
|
|
|
@ -745,6 +758,47 @@ mod tests {
|
|
|
|
|
assert_eq!(transport.try_read_frame().unwrap().unwrap(), b"hello world");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn try_read_frame_should_return_next_available_frame_if_already_in_incoming_buffer() {
|
|
|
|
|
// Store two frames in our data to transmit
|
|
|
|
|
let data = {
|
|
|
|
|
let mut data = BytesMut::new();
|
|
|
|
|
Frame::new(b"hello world").write(&mut data).unwrap();
|
|
|
|
|
Frame::new(b"hello again").write(&mut data).unwrap();
|
|
|
|
|
data.freeze()
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Configure transport to return both frames in single read such that we have another
|
|
|
|
|
// complete frame to parse (in the case that an underlying try_read would block, but we had
|
|
|
|
|
// data available before that)
|
|
|
|
|
let mut transport = FramedTransport::new(
|
|
|
|
|
TestTransport {
|
|
|
|
|
f_try_read: Box::new(move |buf| {
|
|
|
|
|
static mut CNT: usize = 0;
|
|
|
|
|
unsafe {
|
|
|
|
|
CNT += 1;
|
|
|
|
|
if CNT == 2 {
|
|
|
|
|
Err(io::Error::from(io::ErrorKind::WouldBlock))
|
|
|
|
|
} else {
|
|
|
|
|
let n = data.len();
|
|
|
|
|
buf[..data.len()].copy_from_slice(data.as_ref());
|
|
|
|
|
Ok(n)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}),
|
|
|
|
|
f_ready: Box::new(|_| Ok(Ready::READABLE)),
|
|
|
|
|
..Default::default()
|
|
|
|
|
},
|
|
|
|
|
Box::new(OkCodec),
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// Read first frame
|
|
|
|
|
assert_eq!(transport.try_read_frame().unwrap().unwrap(), b"hello world");
|
|
|
|
|
|
|
|
|
|
// Read second frame
|
|
|
|
|
assert_eq!(transport.try_read_frame().unwrap().unwrap(), b"hello again");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn try_read_frame_should_keep_reading_until_a_frame_is_found() {
|
|
|
|
|
const STEP_SIZE: usize = Frame::HEADER_SIZE + 7;
|
|
|
|
|