Fix 100% CPU pegging caused by try_flush succeeding with zero bytes written when the outgoing buffer is empty

pull/146/head
Chip Senkbeil 2 years ago
parent a6952a98bb
commit 78a5383dc3

@@ -90,7 +90,6 @@ where
let mut read_blocked = false;
let mut write_blocked = false;
let mut flush_blocked = false;
if ready.is_readable() {
match transport.try_read_frame() {
@@ -121,7 +120,10 @@ where
}
}
}
Ok(None) => (),
Ok(None) => {
debug!("Connection closed");
break;
}
Err(x) if x.kind() == io::ErrorKind::WouldBlock => read_blocked = true,
Err(x) => {
error!("Failed to read next frame: {x}");
@@ -130,6 +132,9 @@ where
}
if ready.is_writable() {
// If we get more data to write, attempt to write it, which will result in
// writing any queued bytes as well. Otherwise, we attempt to flush any pending
// outgoing bytes that weren't sent earlier.
if let Ok(request) = rx.try_recv() {
match request.to_vec() {
Ok(data) => match transport.try_write_frame(data) {
@@ -143,23 +148,26 @@ where
error!("Unable to serialize outgoing request: {x}");
}
}
}
match transport.try_flush() {
Ok(()) => (),
Err(x) if x.kind() == io::ErrorKind::WouldBlock => flush_blocked = true,
Err(x) => {
error!("Failed to flush outgoing data: {x}");
} else {
// In the case of flushing, there are two scenarios in which we want to
// mark no write occurring:
//
// 1. When flush did not write any bytes, which can happen when the buffer
// is empty
// 2. When the call to write bytes blocks
match transport.try_flush() {
Ok(0) => write_blocked = true,
Ok(_) => (),
Err(x) if x.kind() == io::ErrorKind::WouldBlock => write_blocked = true,
Err(x) => {
error!("Failed to flush outgoing data: {x}");
}
}
}
}
// If we did not read or write anything, sleep a bit to offload CPU usage
if read_blocked && write_blocked && flush_blocked {
trace!(
"Client blocked on read and write, so sleeping {}s",
SLEEP_DURATION.as_secs_f32()
);
if read_blocked && write_blocked {
tokio::time::sleep(SLEEP_DURATION).await;
}
}
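
The writable branch above reduces to one rule: a flush that writes zero bytes is treated the same as a blocked write, so the loop is allowed to sleep. Below is a condensed, hypothetical sketch of that rule and of why the old io::Result<()> signature caused the bug; it uses a stand-in MockTransport and a std sleep rather than the crate's real types and tokio, and is an illustration, not code from this commit.

use std::io;
use std::time::Duration;

// Hypothetical stand-in for the framed transport: just an outgoing byte queue.
struct MockTransport {
    outgoing: Vec<u8>,
}

impl MockTransport {
    // Mirrors the new signature: report how many bytes the flush wrote.
    // An empty queue flushes zero bytes without that being an error.
    fn try_flush(&mut self) -> io::Result<usize> {
        let n = self.outgoing.len();
        self.outgoing.clear();
        Ok(n)
    }
}

fn main() {
    let mut transport = MockTransport { outgoing: Vec::new() };
    let sleep_duration = Duration::from_millis(1);

    for _ in 0..3 {
        let mut write_blocked = false;

        // Writable branch with nothing newly queued: we only flush.
        match transport.try_flush() {
            // Zero bytes flushed means there was nothing to write; treat it
            // like a blocked write so the back-off below can run.
            Ok(0) => write_blocked = true,
            Ok(n) => println!("flushed {n} bytes"),
            Err(x) if x.kind() == io::ErrorKind::WouldBlock => write_blocked = true,
            Err(x) => eprintln!("flush failed: {x}"),
        }

        // With the old io::Result<()> return, Ok(()) on an empty buffer left
        // every blocked flag unset, so this sleep never ran and the task spun
        // at 100% CPU. (The real code sleeps via tokio::time::sleep.)
        if write_blocked {
            std::thread::sleep(sleep_duration);
        }
    }
}

Running the sketch, the empty queue takes the Ok(0) arm on every pass, so each iteration backs off instead of spinning.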

@@ -134,17 +134,25 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::{auth::Authenticator, Client, FramedTransport};
use crate::{Client, FramedTransport, InmemoryTransport};
use std::time::Duration;
use test_log::test;
type TestClient = Client<u8, u8>;
/// Set up two connected transports without any handshake or authentication. This should be
/// okay since we are creating a raw client that does not require a handshake or authentication.
async fn setup(buffer: usize) -> (TestClient, FramedTransport<InmemoryTransport>) {
let (t1, t2) = FramedTransport::pair(buffer);
let client = TestClient::new(t1);
(client, t2)
}
#[test(tokio::test)]
async fn mail_should_return_mailbox_that_receives_responses_until_transport_closes() {
let (t1, mut t2) = FramedTransport::test_pair(100);
let session = TestClient::new(t1);
let mut channel = session.clone_channel();
let (client, mut server) = setup(100).await;
let mut channel = client.clone_channel();
let req = Request::new(0);
let res = Response::new(req.id.clone(), 1);
@@ -152,13 +160,13 @@ mod tests {
let mut mailbox = channel.mail(req).await.unwrap();
// Get first response
match tokio::join!(mailbox.next(), t2.write_frame(res.to_vec().unwrap())) {
match tokio::join!(mailbox.next(), server.write_frame(res.to_vec().unwrap())) {
(Some(actual), _) => assert_eq!(actual, res),
x => panic!("Unexpected response: {:?}", x),
}
// Get second response
match tokio::join!(mailbox.next(), t2.write_frame(res.to_vec().unwrap())) {
match tokio::join!(mailbox.next(), server.write_frame(res.to_vec().unwrap())) {
(Some(actual), _) => assert_eq!(actual, res),
x => panic!("Unexpected response: {:?}", x),
}
@@ -168,7 +176,7 @@ mod tests {
let next_task = tokio::spawn(async move { mailbox.next().await });
tokio::task::yield_now().await;
drop(t2);
drop(server);
match next_task.await {
Ok(None) => {}
x => panic!("Unexpected response: {:?}", x),
@@ -177,14 +185,14 @@ mod tests {
#[test(tokio::test)]
async fn send_should_wait_until_response_received() {
let (t1, mut t2) = FramedTransport::test_pair(100);
let session = TestClient::new(t1);
let mut channel = session.clone_channel();
let (client, mut server) = setup(100).await;
let mut channel = client.clone_channel();
let req = Request::new(0);
let res = Response::new(req.id.clone(), 1);
let (actual, _) = tokio::join!(channel.send(req), t2.write_frame(res.to_vec().unwrap()));
let (actual, _) =
tokio::join!(channel.send(req), server.write_frame(res.to_vec().unwrap()));
match actual {
Ok(actual) => assert_eq!(actual, res),
x => panic!("Unexpected response: {:?}", x),
@@ -193,9 +201,8 @@ mod tests {
#[test(tokio::test)]
async fn send_timeout_should_fail_if_response_not_received_in_time() {
let (t1, mut t2) = FramedTransport::test_pair(100);
let session: TestClient = Client::new(t1);
let mut channel = session.clone_channel();
let (client, mut server) = setup(100).await;
let mut channel = client.clone_channel();
let req = Request::new(0);
match channel.send_timeout(req, Duration::from_millis(30)).await {
@@ -203,15 +210,14 @@ mod tests {
x => panic!("Unexpected response: {:?}", x),
}
let frame = t2.try_read_frame().unwrap().unwrap();
let frame = server.read_frame().await.unwrap().unwrap();
let _req: Request<u8> = Request::from_slice(frame.as_item()).unwrap();
}
#[test(tokio::test)]
async fn fire_should_send_request_and_not_wait_for_response() {
let (t1, mut t2) = FramedTransport::test_pair(100);
let session: TestClient = Client::new(t1);
let mut channel = session.clone_channel();
let (client, mut server) = setup(100).await;
let mut channel = client.clone_channel();
let req = Request::new(0);
match channel.fire(req).await {
@@ -219,7 +225,7 @@ mod tests {
x => panic!("Unexpected response: {:?}", x),
}
let frame = t2.try_read_frame().unwrap().unwrap();
let frame = server.read_frame().await.unwrap().unwrap();
let _req: Request<u8> = Request::from_slice(frame.as_item()).unwrap();
}
}

@@ -234,7 +234,6 @@ where
// Keep track of whether we read or wrote anything
let mut read_blocked = false;
let mut write_blocked = false;
let mut flush_blocked = false;
if ready.is_readable() {
match transport.try_read_frame() {
@@ -300,6 +299,9 @@ where
// If our socket is ready to be written to, we try to get the next item from
// the queue and process it
if ready.is_writable() {
// If we get more data to write, attempt to write it, which will result in writing
// any queued bytes as well. Otherwise, we attempt to flush any pending outgoing
// bytes that weren't sent earlier.
if let Ok(response) = rx.try_recv() {
// Log our message as a string, which can be expensive
if log_enabled!(Level::Trace) {
@@ -324,23 +326,26 @@ where
);
}
}
}
match transport.try_flush() {
Ok(()) => (),
Err(x) if x.kind() == io::ErrorKind::WouldBlock => flush_blocked = true,
Err(x) => {
error!("[Conn {connection_id}] Failed to flush outgoing data: {x}");
} else {
// In the case of flushing, there are two scenarios in which we want to
// mark no write occurring:
//
// 1. When flush did not write any bytes, which can happen when the buffer
// is empty
// 2. When the call to write bytes blocks
match transport.try_flush() {
Ok(0) => write_blocked = true,
Ok(_) => (),
Err(x) if x.kind() == io::ErrorKind::WouldBlock => write_blocked = true,
Err(x) => {
error!("[Conn {connection_id}] Failed to flush outgoing data: {x}");
}
}
}
}
// If we did not read or write anything, sleep a bit to offload CPU usage
if read_blocked && write_blocked && flush_blocked {
trace!(
"[Conn {connection_id}] Blocked on read and write, so sleeping {}s",
SLEEP_DURATION.as_secs_f32()
);
if read_blocked && write_blocked {
tokio::time::sleep(SLEEP_DURATION).await;
}
}

@@ -115,7 +115,10 @@ impl<T: Transport> FramedTransport<T> {
Ok(())
}
/// Attempts to flush any remaining bytes in the outgoing queue.
/// Attempts to flush any remaining bytes in the outgoing queue, returning the total bytes
/// written as a result of the flush. Note that a return of 0 bytes does not indicate that the
/// underlying transport has closed, but rather that no bytes were flushed such as when the
/// outgoing queue is empty.
///
/// This is accomplished by continually calling the inner transport's `try_write`. If 0 is
/// returned from a call to `try_write`, this will fail with [`ErrorKind::WriteZero`].
@@ -124,18 +127,19 @@ impl<T: Transport> FramedTransport<T> {
/// is not ready to write data.
///
/// [`ErrorKind::WouldBlock`]: io::ErrorKind::WouldBlock
pub fn try_flush(&mut self) -> io::Result<()> {
pub fn try_flush(&mut self) -> io::Result<usize> {
let mut bytes_written = 0;
// Continue to send from the outgoing buffer until we either finish or fail
while !self.outgoing.is_empty() {
trace!("try_flush({} bytes)", self.outgoing.len());
match self.inner.try_write(self.outgoing.as_ref()) {
// Getting 0 bytes on write indicates the channel has closed
Ok(0) => return Err(io::Error::from(io::ErrorKind::WriteZero)),
// Successful write will advance the outgoing buffer
Ok(n) => {
trace!("try_flush() successfully flushed {n} bytes");
self.outgoing.advance(n);
bytes_written += n;
}
// Any error (including WouldBlock) will get bubbled up
@@ -143,7 +147,7 @@ impl<T: Transport> FramedTransport<T> {
}
}
Ok(())
Ok(bytes_written)
}
/// Reads a frame of bytes by using the [`Codec`] tied to this transport. Returns
@@ -202,10 +206,6 @@ impl<T: Transport> FramedTransport<T> {
match self.try_read_frame() {
Err(x) if x.kind() == io::ErrorKind::WouldBlock => {
// NOTE: We sleep for a little bit before trying again to avoid pegging CPU
trace!(
"read_frame() blocked, so sleeping {}s",
SLEEP_DURATION.as_secs_f32()
);
tokio::time::sleep(SLEEP_DURATION).await
}
x => return x,
@@ -237,7 +237,9 @@ impl<T: Transport> FramedTransport<T> {
frame.write(&mut self.outgoing)?;
// Attempt to write everything in our queue
self.try_flush()
self.try_flush()?;
Ok(())
}
/// Invokes [`try_write_frame`] followed by a continuous calls to [`try_flush`] until a frame
@@ -261,13 +263,10 @@ impl<T: Transport> FramedTransport<T> {
match self.try_flush() {
Err(x) if x.kind() == io::ErrorKind::WouldBlock => {
// NOTE: We sleep for a little bit before trying again to avoid pegging CPU
trace!(
"write_frame() blocked, so sleeping {}s",
SLEEP_DURATION.as_secs_f32()
);
tokio::time::sleep(SLEEP_DURATION).await
}
x => return x,
Err(x) => return Err(x),
Ok(_) => return Ok(()),
}
},
@@ -512,7 +511,7 @@ where
impl FramedTransport<InmemoryTransport> {
/// Produces a pair of inmemory transports that are connected to each other using
/// a standard codec
/// a standard codec.
///
/// Sets the buffer for message passing for each underlying transport to the given buffer size
pub fn pair(
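
With try_flush now returning io::Result<usize> as documented above, a caller can tell "nothing was queued" apart from "the transport is not ready" and from a hard failure. The sketch below is only illustrative: it assumes the crate name distant_net with FramedTransport and InmemoryTransport re-exported at the root (as the test imports earlier in this diff suggest), and the helper name flush_once is hypothetical rather than part of the API.

use std::io;

use distant_net::{FramedTransport, InmemoryTransport};

// Attempt one flush and report whether any bytes actually went out.
fn flush_once(transport: &mut FramedTransport<InmemoryTransport>) -> io::Result<bool> {
    match transport.try_flush() {
        // Nothing was queued, so nothing was written; callers should treat
        // this like a blocked write and back off rather than loop again.
        Ok(0) => Ok(false),
        // Some bytes from the outgoing queue were written to the transport.
        Ok(n) => {
            println!("flushed {n} bytes");
            Ok(true)
        }
        // The underlying transport is not ready to accept writes right now.
        Err(x) if x.kind() == io::ErrorKind::WouldBlock => Ok(false),
        // ErrorKind::WriteZero (the inner try_write returned 0) and any other
        // failure still bubble up as real errors.
        Err(x) => Err(x),
    }
}

fn main() -> io::Result<()> {
    // pair() comes from the hunk above; 100 is the in-memory channel capacity.
    let (mut t1, _t2) = FramedTransport::pair(100);
    // With an empty outgoing queue, the flush is expected to report 0 bytes.
    let wrote = flush_once(&mut t1)?;
    assert!(!wrote);
    Ok(())
}

Per the updated doc comment, Ok(0) simply means the outgoing queue was empty; a closed transport still surfaces as ErrorKind::WriteZero from the write path rather than as a zero return.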
