From a544587bab3ec83fa4f4af6cd14f88b931ed1524 Mon Sep 17 00:00:00 2001 From: Chip Senkbeil Date: Sun, 20 Nov 2022 22:29:20 -0600 Subject: [PATCH] Add ConnectionState & ConnectionWatcher; update server to drop connection on read error --- CHANGELOG.md | 12 +++++ distant-net/src/client.rs | 48 ++++++++++++++++++- distant-net/src/client/reconnect.rs | 72 ++++++++++++++++++++++++++++ distant-net/src/server/connection.rs | 6 +-- src/cli/client.rs | 6 +++ 5 files changed, 138 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index df381d6..d500398 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- New `ConnectionState` and `ConnectionWatcher` to support watching changes to + the client connection, supporting `clone_connection_watcher` and + `on_connection_change` methods for the client + +### Changed + +- Server will now drop the connection if it receives an error (other than + WouldBlock) while trying to read from the transport, rather than just logging + the error, regardless of whether the error is resumable + ## [0.20.0-alpha.1] - 2022-11-19 **NOTE: This is incomplete as v0.20.0 is a near-complete rewrite internally.** diff --git a/distant-net/src/client.rs b/distant-net/src/client.rs index fa3fba3..aaa2e22 100644 --- a/distant-net/src/client.rs +++ b/distant-net/src/client.rs @@ -11,7 +11,7 @@ use std::{ time::Duration, }; use tokio::{ - sync::{mpsc, oneshot}, + sync::{mpsc, oneshot, watch}, task::JoinHandle, }; @@ -39,6 +39,9 @@ pub struct UntypedClient { /// Used to send requests to a server. channel: UntypedChannel, + /// Used to watch for changes in the connection state. + watcher: ConnectionWatcher, + /// Used to send shutdown request to inner task. shutdown: Box, @@ -61,6 +64,7 @@ impl UntypedClient { pub fn into_typed_client(self) -> Client { Client { channel: self.channel.into_typed_channel(), + watcher: self.watcher, shutdown: self.shutdown, task: self.task, } @@ -102,6 +106,20 @@ impl UntypedClient { self.shutdown.shutdown().await } + /// Clones the underlying [`ConnectionStateWatcher`] for the client. + pub fn clone_connection_watcher(&self) -> ConnectionWatcher { + self.watcher.clone() + } + + /// Spawns a new task that continually monitors for connection changes and invokes the function + /// `f` whenever a new change is detected. + pub fn on_connection_change(&self, f: F) -> JoinHandle<()> + where + F: FnMut(ConnectionState) + Send + 'static, + { + self.watcher.on_change(f) + } + /// Returns true if client's underlying event processing has finished/terminated. pub fn is_finished(&self) -> bool { self.task.is_finished() @@ -143,6 +161,7 @@ impl UntypedClient { // Start a task that continually checks for responses and delivers them using the // post office let shutdown_tx_2 = shutdown_tx.clone(); + let (watcher_tx, watcher_rx) = watch::channel(ConnectionState::Connected); let task = tokio::spawn(async move { let mut needs_reconnect = false; @@ -163,10 +182,12 @@ impl UntypedClient { match strategy.reconnect(&mut connection).await { Ok(x) => { needs_reconnect = false; + watcher_tx.send_replace(ConnectionState::Connected); x } Err(x) => { error!("Unable to re-establish connection: {x}"); + watcher_tx.send_replace(ConnectionState::Disconnected); break Err(x); } } @@ -178,6 +199,7 @@ impl UntypedClient { debug!("Client got shutdown signal, so exiting event loop"); let cb = cb.expect("Impossible: shutdown channel closed!"); let _ = cb.send(Ok(())); + watcher_tx.send_replace(ConnectionState::Disconnected); break Ok(()); } result = connection.ready(Interest::READABLE | Interest::WRITABLE) => { @@ -186,6 +208,7 @@ impl UntypedClient { Err(x) => { error!("Failed to examine ready state: {x}"); needs_reconnect = true; + watcher_tx.send_replace(ConnectionState::Reconnecting); continue; } } @@ -222,12 +245,14 @@ impl UntypedClient { Ok(None) => { debug!("Connection closed"); needs_reconnect = true; + watcher_tx.send_replace(ConnectionState::Reconnecting); continue; } Err(x) if x.kind() == io::ErrorKind::WouldBlock => read_blocked = true, Err(x) => { error!("Failed to read next frame: {x}"); needs_reconnect = true; + watcher_tx.send_replace(ConnectionState::Reconnecting); continue; } } @@ -250,6 +275,7 @@ impl UntypedClient { Err(x) => { error!("Failed to write frame: {x}"); needs_reconnect = true; + watcher_tx.send_replace(ConnectionState::Reconnecting); continue; } } @@ -267,6 +293,7 @@ impl UntypedClient { Err(x) => { error!("Failed to flush outgoing data: {x}"); needs_reconnect = true; + watcher_tx.send_replace(ConnectionState::Reconnecting); continue; } } @@ -287,6 +314,7 @@ impl UntypedClient { Self { channel, + watcher: ConnectionWatcher(watcher_rx), shutdown: Box::new(shutdown_tx), task, } @@ -318,6 +346,9 @@ pub struct Client { /// Used to send requests to a server. channel: Channel, + /// Used to watch for changes in the connection state. + watcher: ConnectionWatcher, + /// Used to send shutdown request to inner task. shutdown: Box, @@ -344,6 +375,7 @@ where pub fn into_untyped_client(self) -> UntypedClient { UntypedClient { channel: self.channel.into_untyped_channel(), + watcher: self.watcher, shutdown: self.shutdown, task: self.task, } @@ -440,6 +472,20 @@ impl Client { self.shutdown.shutdown().await } + /// Clones the underlying [`ConnectionStateWatcher`] for the client. + pub fn clone_connection_watcher(&self) -> ConnectionWatcher { + self.watcher.clone() + } + + /// Spawns a new task that continually monitors for connection changes and invokes the function + /// `f` whenever a new change is detected. + pub fn on_connection_change(&self, f: F) -> JoinHandle<()> + where + F: FnMut(ConnectionState) + Send + 'static, + { + self.watcher.on_change(f) + } + /// Returns true if client's underlying event processing has finished/terminated. pub fn is_finished(&self) -> bool { self.task.is_finished() diff --git a/distant-net/src/client/reconnect.rs b/distant-net/src/client/reconnect.rs index f3d1ffd..2162826 100644 --- a/distant-net/src/client/reconnect.rs +++ b/distant-net/src/client/reconnect.rs @@ -1,6 +1,78 @@ use super::Reconnectable; use std::io; use std::time::Duration; +use strum::Display; +use tokio::sync::watch; +use tokio::task::JoinHandle; + +/// Represents a watcher over a [`ConnectionState`]. +#[derive(Clone)] +pub struct ConnectionWatcher(pub(super) watch::Receiver); + +impl ConnectionWatcher { + /// Returns next [`ConnectionState`] after a change is detected, or `None` if no more changes + /// will be detected. + pub async fn next(&mut self) -> Option { + self.0.changed().await.ok()?; + Some(self.last()) + } + + /// Returns true if the connection state has changed. + pub fn has_changed(&self) -> bool { + self.0.has_changed().ok().unwrap_or(false) + } + + /// Returns the last [`ConnectionState`] observed. + pub fn last(&self) -> ConnectionState { + *self.0.borrow() + } + + /// Spawns a new task that continually monitors for connection state changes and invokes the + /// function `f` whenever a new change is detected. + pub fn on_change(&self, mut f: F) -> JoinHandle<()> + where + F: FnMut(ConnectionState) + Send + 'static, + { + let rx = self.0.clone(); + tokio::spawn(async move { + let mut watcher = Self(rx); + while let Some(state) = watcher.next().await { + f(state); + } + }) + } +} + +/// Represents the state of a connection. +#[derive(Copy, Clone, Debug, Display, PartialEq, Eq)] +#[strum(serialize_all = "snake_case")] +pub enum ConnectionState { + /// Connection is not active, but currently going through reconnection process. + Reconnecting, + + /// Connection is active. + Connected, + + /// Connection is not active. + Disconnected, +} + +impl ConnectionState { + /// Returns true if reconnecting. + pub fn is_reconnecting(&self) -> bool { + matches!(self, Self::Reconnecting) + } + + /// Returns true if connected. + pub fn is_connected(&self) -> bool { + matches!(self, Self::Connected) + } + + /// Returns true if disconnected. + pub fn is_disconnected(&self) -> bool { + matches!(self, Self::Disconnected) + } +} /// Represents the strategy to apply when attempting to reconnect the client to the server. #[derive(Clone, Debug)] diff --git a/distant-net/src/server/connection.rs b/distant-net/src/server/connection.rs index c5e3c11..8899ad9 100644 --- a/distant-net/src/server/connection.rs +++ b/distant-net/src/server/connection.rs @@ -348,11 +348,7 @@ where } Err(x) if x.kind() == io::ErrorKind::WouldBlock => read_blocked = true, Err(x) => { - // NOTE: We do NOT break out of the loop, as this could happen - // if someone sends bad data at any point, but does not - // mean that the reader itself has failed. This can - // happen from getting non-compliant typed data - error!("[Conn {id}] {x}"); + terminate_connection!(@error "[Conn {id}] {x}"); } } } diff --git a/src/cli/client.rs b/src/cli/client.rs index 25175c8..59ca261 100644 --- a/src/cli/client.rs +++ b/src/cli/client.rs @@ -48,6 +48,12 @@ impl Client { /// the [`NetworkConfig`] provided to the client earlier. Will return a new instance /// of the [`ManagerClient`] upon successful connection pub async fn connect(self) -> anyhow::Result { + let client = self.connect_impl().await?; + client.on_connection_change(|state| debug!("Client is now {state}")); + Ok(client) + } + + async fn connect_impl(self) -> anyhow::Result { #[cfg(unix)] { let mut maybe_client = None;