Fix loop logic for client/server to mark blocked status if not readable/writable

pull/146/head
Chip Senkbeil 2 years ago
parent 4b75e8a2bd
commit 371c6538b7
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -88,8 +88,8 @@ where
}
};
let mut read_blocked = false;
let mut write_blocked = false;
let mut read_blocked = !ready.is_readable();
let mut write_blocked = !ready.is_writable();
if ready.is_readable() {
match transport.try_read_frame() {

@ -214,6 +214,7 @@ where
let (tx, mut rx) = mpsc::channel::<Response<H::Response>>(1);
// Perform a handshake to ensure that the connection is properly established
debug!("[Conn {id}] Performing handshake");
let mut transport: FramedTransport<T> =
match FramedTransport::from_server_handshake(transport).await {
Ok(x) => x,
@ -223,6 +224,7 @@ where
};
// Perform authentication to ensure the connection is valid
debug!("[Conn {id}] Verifying connection");
match Weak::upgrade(&verifier) {
Some(verifier) => {
if let Err(x) = verifier.verify(&mut transport).await {
@ -236,6 +238,7 @@ where
// Create local data for the connection and then process it as well as perform
// authentication and any other tasks on first connecting
debug!("[Conn {id}] Accepting connection");
let mut local_data = H::LocalData::default();
if let Err(x) = handler
.on_accept(ConnectionCtx {
@ -261,8 +264,8 @@ where
};
// Keep track of whether we read or wrote anything
let mut read_blocked = false;
let mut write_blocked = false;
let mut read_blocked = !ready.is_readable();
let mut write_blocked = !ready.is_writable();
if ready.is_readable() {
match transport.try_read_frame() {

@ -99,7 +99,27 @@ impl<T> fmt::Debug for FramedTransport<T> {
impl<T: Transport> FramedTransport<T> {
/// Waits for the transport to be ready based on the given interest, returning the ready status
pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
Transport::ready(&self.inner, interest).await
// If interest includes reading, we check if we already have a frame in our queue,
// as there can be a scenario where a frame was received and then the connection
// was closed, and we still want to be able to read the next frame is if it is
// available in the connection.
let ready = if interest.is_readable() && Frame::available(&self.incoming) {
Ready::READABLE
} else {
Ready::EMPTY
};
// If we know that we are readable and not checking for write status, we can short-circuit
// to avoid an async call by returning immediately that we are readable
if !interest.is_writable() && ready.is_readable() {
return Ok(ready);
}
// Otherwise, we need to check the status using the underlying transport and merge it with
// our current understanding based on internal state
Transport::ready(&self.inner, interest)
.await
.map(|r| r | ready)
}
/// Waits for the transport to be readable to follow up with `try_read`

@ -100,6 +100,12 @@ impl Frame<'_> {
Ok(Some(Frame::from(item)))
}
/// Checks if a full frame is available from `src`, returning true if a frame was found false
/// if the current `src` does not contain a frame. Does not consume the frame.
pub fn available(src: &BytesMut) -> bool {
matches!(Frame::read(&mut src.clone()), Ok(Some(_)))
}
/// Returns a new frame which is identical but has a lifetime tied to this frame.
pub fn as_borrowed(&self) -> Frame<'_> {
let item = match &self.item {

Loading…
Cancel
Save