Update inmemory reconnect to work when both channels open and fail with connection refused if either closed

pull/146/head
Chip Senkbeil 2 years ago
parent ecc86cf8a4
commit e0c8d94592
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -92,12 +92,16 @@ impl InmemoryTransport {
#[async_trait]
impl Reconnectable for InmemoryTransport {
/// Once the underlying channels have closed, there is no way for this transport to
/// re-establish those channels; therefore, reconnecting will always fail with
/// [`ErrorKind::Unsupported`]
/// re-establish those channels; therefore, reconnecting will fail with
/// [`ErrorKind::ConnectionRefused`] if either underlying channel has closed.
///
/// [`ErrorKind::Unsupported`]: io::ErrorKind::Unsupported
/// [`ErrorKind::ConnectionRefused`]: io::ErrorKind::ConnectionRefused
async fn reconnect(&mut self) -> io::Result<()> {
Err(io::Error::from(io::ErrorKind::Unsupported))
if self.tx.is_closed() || self.is_rx_closed() {
Err(io::Error::from(io::ErrorKind::ConnectionRefused))
} else {
Ok(())
}
}
}
@ -333,17 +337,38 @@ mod tests {
}
#[test(tokio::test)]
async fn reconnect_should_fail_as_unsupported() {
async fn reconnect_should_fail_if_read_channel_closed() {
let (write_tx, _write_rx) = mpsc::channel(1);
let (_, read_rx) = mpsc::channel(1);
let mut transport = InmemoryTransport::new(write_tx, read_rx);
assert_eq!(
transport.reconnect().await.unwrap_err().kind(),
io::ErrorKind::ConnectionRefused
);
}
#[test(tokio::test)]
async fn reconnect_should_fail_if_write_channel_closed() {
let (write_tx, _) = mpsc::channel(1);
let (_read_tx, read_rx) = mpsc::channel(1);
let mut transport = InmemoryTransport::new(write_tx, read_rx);
assert_eq!(
transport.reconnect().await.unwrap_err().kind(),
io::ErrorKind::Unsupported
io::ErrorKind::ConnectionRefused
);
}
#[test(tokio::test)]
async fn reconnect_should_succeed_if_both_channels_open() {
let (write_tx, _write_rx) = mpsc::channel(1);
let (_read_tx, read_rx) = mpsc::channel(1);
let mut transport = InmemoryTransport::new(write_tx, read_rx);
transport.reconnect().await.unwrap();
}
#[test(tokio::test)]
async fn ready_should_report_read_closed_if_channel_closed_and_internal_buf_empty() {
let (write_tx, _write_rx) = mpsc::channel(1);

Loading…
Cancel
Save