Add ConnectionState & ConnectionWatcher; update server to drop connection on read error

pull/156/head
Chip Senkbeil 2 years ago
parent 1c393ef723
commit a544587bab
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -7,6 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Added
- New `ConnectionState` and `ConnectionWatcher` to support watching changes to
the client connection, supporting `clone_connection_watcher` and
`on_connection_change` methods for the client
### Changed
- Server will now drop the connection if it receives an error (other than
WouldBlock) while trying to read from the transport, rather than just logging
the error, regardless of whether the error is resumable
## [0.20.0-alpha.1] - 2022-11-19
**NOTE: This is incomplete as v0.20.0 is a near-complete rewrite internally.**

@ -11,7 +11,7 @@ use std::{
time::Duration,
};
use tokio::{
sync::{mpsc, oneshot},
sync::{mpsc, oneshot, watch},
task::JoinHandle,
};
@ -39,6 +39,9 @@ pub struct UntypedClient {
/// Used to send requests to a server.
channel: UntypedChannel,
/// Used to watch for changes in the connection state.
watcher: ConnectionWatcher,
/// Used to send shutdown request to inner task.
shutdown: Box<dyn Shutdown>,
@ -61,6 +64,7 @@ impl UntypedClient {
pub fn into_typed_client<T, U>(self) -> Client<T, U> {
Client {
channel: self.channel.into_typed_channel(),
watcher: self.watcher,
shutdown: self.shutdown,
task: self.task,
}
@ -102,6 +106,20 @@ impl UntypedClient {
self.shutdown.shutdown().await
}
/// Clones the underlying [`ConnectionStateWatcher`] for the client.
pub fn clone_connection_watcher(&self) -> ConnectionWatcher {
self.watcher.clone()
}
/// Spawns a new task that continually monitors for connection changes and invokes the function
/// `f` whenever a new change is detected.
pub fn on_connection_change<F>(&self, f: F) -> JoinHandle<()>
where
F: FnMut(ConnectionState) + Send + 'static,
{
self.watcher.on_change(f)
}
/// Returns true if client's underlying event processing has finished/terminated.
pub fn is_finished(&self) -> bool {
self.task.is_finished()
@ -143,6 +161,7 @@ impl UntypedClient {
// Start a task that continually checks for responses and delivers them using the
// post office
let shutdown_tx_2 = shutdown_tx.clone();
let (watcher_tx, watcher_rx) = watch::channel(ConnectionState::Connected);
let task = tokio::spawn(async move {
let mut needs_reconnect = false;
@ -163,10 +182,12 @@ impl UntypedClient {
match strategy.reconnect(&mut connection).await {
Ok(x) => {
needs_reconnect = false;
watcher_tx.send_replace(ConnectionState::Connected);
x
}
Err(x) => {
error!("Unable to re-establish connection: {x}");
watcher_tx.send_replace(ConnectionState::Disconnected);
break Err(x);
}
}
@ -178,6 +199,7 @@ impl UntypedClient {
debug!("Client got shutdown signal, so exiting event loop");
let cb = cb.expect("Impossible: shutdown channel closed!");
let _ = cb.send(Ok(()));
watcher_tx.send_replace(ConnectionState::Disconnected);
break Ok(());
}
result = connection.ready(Interest::READABLE | Interest::WRITABLE) => {
@ -186,6 +208,7 @@ impl UntypedClient {
Err(x) => {
error!("Failed to examine ready state: {x}");
needs_reconnect = true;
watcher_tx.send_replace(ConnectionState::Reconnecting);
continue;
}
}
@ -222,12 +245,14 @@ impl UntypedClient {
Ok(None) => {
debug!("Connection closed");
needs_reconnect = true;
watcher_tx.send_replace(ConnectionState::Reconnecting);
continue;
}
Err(x) if x.kind() == io::ErrorKind::WouldBlock => read_blocked = true,
Err(x) => {
error!("Failed to read next frame: {x}");
needs_reconnect = true;
watcher_tx.send_replace(ConnectionState::Reconnecting);
continue;
}
}
@ -250,6 +275,7 @@ impl UntypedClient {
Err(x) => {
error!("Failed to write frame: {x}");
needs_reconnect = true;
watcher_tx.send_replace(ConnectionState::Reconnecting);
continue;
}
}
@ -267,6 +293,7 @@ impl UntypedClient {
Err(x) => {
error!("Failed to flush outgoing data: {x}");
needs_reconnect = true;
watcher_tx.send_replace(ConnectionState::Reconnecting);
continue;
}
}
@ -287,6 +314,7 @@ impl UntypedClient {
Self {
channel,
watcher: ConnectionWatcher(watcher_rx),
shutdown: Box::new(shutdown_tx),
task,
}
@ -318,6 +346,9 @@ pub struct Client<T, U> {
/// Used to send requests to a server.
channel: Channel<T, U>,
/// Used to watch for changes in the connection state.
watcher: ConnectionWatcher,
/// Used to send shutdown request to inner task.
shutdown: Box<dyn Shutdown>,
@ -344,6 +375,7 @@ where
pub fn into_untyped_client(self) -> UntypedClient {
UntypedClient {
channel: self.channel.into_untyped_channel(),
watcher: self.watcher,
shutdown: self.shutdown,
task: self.task,
}
@ -440,6 +472,20 @@ impl<T, U> Client<T, U> {
self.shutdown.shutdown().await
}
/// Clones the underlying [`ConnectionStateWatcher`] for the client.
pub fn clone_connection_watcher(&self) -> ConnectionWatcher {
self.watcher.clone()
}
/// Spawns a new task that continually monitors for connection changes and invokes the function
/// `f` whenever a new change is detected.
pub fn on_connection_change<F>(&self, f: F) -> JoinHandle<()>
where
F: FnMut(ConnectionState) + Send + 'static,
{
self.watcher.on_change(f)
}
/// Returns true if client's underlying event processing has finished/terminated.
pub fn is_finished(&self) -> bool {
self.task.is_finished()

@ -1,6 +1,78 @@
use super::Reconnectable;
use std::io;
use std::time::Duration;
use strum::Display;
use tokio::sync::watch;
use tokio::task::JoinHandle;
/// Represents a watcher over a [`ConnectionState`].
#[derive(Clone)]
pub struct ConnectionWatcher(pub(super) watch::Receiver<ConnectionState>);
impl ConnectionWatcher {
/// Returns next [`ConnectionState`] after a change is detected, or `None` if no more changes
/// will be detected.
pub async fn next(&mut self) -> Option<ConnectionState> {
self.0.changed().await.ok()?;
Some(self.last())
}
/// Returns true if the connection state has changed.
pub fn has_changed(&self) -> bool {
self.0.has_changed().ok().unwrap_or(false)
}
/// Returns the last [`ConnectionState`] observed.
pub fn last(&self) -> ConnectionState {
*self.0.borrow()
}
/// Spawns a new task that continually monitors for connection state changes and invokes the
/// function `f` whenever a new change is detected.
pub fn on_change<F>(&self, mut f: F) -> JoinHandle<()>
where
F: FnMut(ConnectionState) + Send + 'static,
{
let rx = self.0.clone();
tokio::spawn(async move {
let mut watcher = Self(rx);
while let Some(state) = watcher.next().await {
f(state);
}
})
}
}
/// Represents the state of a connection.
#[derive(Copy, Clone, Debug, Display, PartialEq, Eq)]
#[strum(serialize_all = "snake_case")]
pub enum ConnectionState {
/// Connection is not active, but currently going through reconnection process.
Reconnecting,
/// Connection is active.
Connected,
/// Connection is not active.
Disconnected,
}
impl ConnectionState {
/// Returns true if reconnecting.
pub fn is_reconnecting(&self) -> bool {
matches!(self, Self::Reconnecting)
}
/// Returns true if connected.
pub fn is_connected(&self) -> bool {
matches!(self, Self::Connected)
}
/// Returns true if disconnected.
pub fn is_disconnected(&self) -> bool {
matches!(self, Self::Disconnected)
}
}
/// Represents the strategy to apply when attempting to reconnect the client to the server.
#[derive(Clone, Debug)]

@ -348,11 +348,7 @@ where
}
Err(x) if x.kind() == io::ErrorKind::WouldBlock => read_blocked = true,
Err(x) => {
// NOTE: We do NOT break out of the loop, as this could happen
// if someone sends bad data at any point, but does not
// mean that the reader itself has failed. This can
// happen from getting non-compliant typed data
error!("[Conn {id}] {x}");
terminate_connection!(@error "[Conn {id}] {x}");
}
}
}

@ -48,6 +48,12 @@ impl<T: AuthHandler + Clone> Client<T> {
/// the [`NetworkConfig`] provided to the client earlier. Will return a new instance
/// of the [`ManagerClient`] upon successful connection
pub async fn connect(self) -> anyhow::Result<ManagerClient> {
let client = self.connect_impl().await?;
client.on_connection_change(|state| debug!("Client is now {state}"));
Ok(client)
}
async fn connect_impl(self) -> anyhow::Result<ManagerClient> {
#[cfg(unix)]
{
let mut maybe_client = None;

Loading…
Cancel
Save