diff --git a/distant-net/src/client.rs b/distant-net/src/client.rs index 0fce532..05350d3 100644 --- a/distant-net/src/client.rs +++ b/distant-net/src/client.rs @@ -212,16 +212,11 @@ impl UntypedClient { // down. let _shutdown_tx = shutdown_tx_2; - // Keep track of block status so we can log appropriately - let mut was_blocked = false; - loop { // If we have flagged that a reconnect is needed, attempt to do so if needs_reconnect { info!("Client encountered issue, attempting to reconnect"); - if log::log_enabled!(log::Level::Debug) { - debug!("Using strategy {reconnect_strategy:?}"); - } + debug!("Using strategy {reconnect_strategy:?}"); match reconnect_strategy.reconnect(&mut connection).await { Ok(()) => { info!("Client successfully reconnected!"); @@ -239,7 +234,7 @@ impl UntypedClient { macro_rules! silence_needs_reconnect { () => {{ - debug!( + info!( "Client exceeded {}s without server activity, so attempting to reconnect", silence_duration.as_secs_f32(), ); @@ -263,7 +258,7 @@ impl UntypedClient { let ready = tokio::select! { // NOTE: This should NEVER return None as we never allow the channel to close. cb = shutdown_rx.recv() => { - debug!("Client got shutdown signal, so exiting event loop"); + info!("Client got shutdown signal, so exiting event loop"); let cb = cb.expect("Impossible: shutdown channel closed!"); let _ = cb.send(Ok(())); watcher_tx.send_replace(ConnectionState::Disconnected); @@ -338,7 +333,7 @@ impl UntypedClient { } Ok(None) => { - debug!("Connection closed"); + info!("Connection closed"); needs_reconnect = true; watcher_tx.send_replace(ConnectionState::Reconnecting); continue; @@ -398,18 +393,6 @@ impl UntypedClient { // If we did not read or write anything, sleep a bit to offload CPU usage if read_blocked && write_blocked { tokio::time::sleep(SLEEP_DURATION).await; - - if !was_blocked { - trace!("Client entering blocked state"); - } - - was_blocked = true; - } else { - if was_blocked { - trace!("Client exiting blocked state"); - } - - was_blocked = false; } } }); diff --git a/distant-net/src/manager/server.rs b/distant-net/src/manager/server.rs index 3af11ba..75ad981 100644 --- a/distant-net/src/manager/server.rs +++ b/distant-net/src/manager/server.rs @@ -188,6 +188,7 @@ impl ServerHandler for ManagerServer { type Response = ManagerResponse; async fn on_request(&self, ctx: RequestCtx) { + debug!("manager::on_request({ctx:?})"); let RequestCtx { connection_id, request, @@ -195,113 +196,161 @@ impl ServerHandler for ManagerServer { } = ctx; let response = match request.payload { - ManagerRequest::Capabilities {} => match self.capabilities().await { - Ok(supported) => ManagerResponse::Capabilities { supported }, - Err(x) => ManagerResponse::from(x), - }, + ManagerRequest::Capabilities {} => { + debug!("Looking up capabilities"); + match self.capabilities().await { + Ok(supported) => ManagerResponse::Capabilities { supported }, + Err(x) => ManagerResponse::from(x), + } + } ManagerRequest::Launch { destination, options, - } => match self - .launch( - *destination, - options, - ManagerAuthenticator { - reply: reply.clone(), - registry: Arc::clone(&self.registry), - }, - ) - .await - { - Ok(destination) => ManagerResponse::Launched { destination }, - Err(x) => ManagerResponse::from(x), - }, + } => { + info!("Launching {destination} with {options}"); + match self + .launch( + *destination, + options, + ManagerAuthenticator { + reply: reply.clone(), + registry: Arc::clone(&self.registry), + }, + ) + .await + { + Ok(destination) => ManagerResponse::Launched { destination }, + Err(x) => ManagerResponse::from(x), + } + } ManagerRequest::Connect { destination, options, - } => match self - .connect( - *destination, - options, - ManagerAuthenticator { - reply: reply.clone(), - registry: Arc::clone(&self.registry), - }, - ) - .await - { - Ok(id) => ManagerResponse::Connected { id }, - Err(x) => ManagerResponse::from(x), - }, + } => { + info!("Connecting to {destination} with {options}"); + match self + .connect( + *destination, + options, + ManagerAuthenticator { + reply: reply.clone(), + registry: Arc::clone(&self.registry), + }, + ) + .await + { + Ok(id) => ManagerResponse::Connected { id }, + Err(x) => ManagerResponse::from(x), + } + } ManagerRequest::Authenticate { id, msg } => { + trace!("Retrieving authentication callback registry"); match self.registry.write().await.remove(&id) { - Some(cb) => match cb.send(msg) { - Ok(_) => return, - Err(_) => ManagerResponse::Error { - description: "Unable to forward authentication callback".to_string(), - }, - }, + Some(cb) => { + trace!("Sending {msg:?} through authentication callback"); + match cb.send(msg) { + Ok(_) => return, + Err(_) => ManagerResponse::Error { + description: "Unable to forward authentication callback" + .to_string(), + }, + } + } None => ManagerResponse::from(io::Error::new( io::ErrorKind::InvalidInput, "Invalid authentication id", )), } } - ManagerRequest::OpenChannel { id } => match self.connections.read().await.get(&id) { - Some(connection) => match connection.open_channel(reply.clone()) { - Ok(channel) => { - debug!("[Conn {id}] Channel {} has been opened", channel.id()); - let id = channel.id(); - self.channels.write().await.insert(id, channel); - ManagerResponse::ChannelOpened { id } + ManagerRequest::OpenChannel { id } => { + debug!("Attempting to retrieve connection {id}"); + match self.connections.read().await.get(&id) { + Some(connection) => { + debug!("Opening channel through connection {id}"); + match connection.open_channel(reply.clone()) { + Ok(channel) => { + info!("[Conn {id}] Channel {} has been opened", channel.id()); + let id = channel.id(); + self.channels.write().await.insert(id, channel); + ManagerResponse::ChannelOpened { id } + } + Err(x) => ManagerResponse::from(x), + } } - Err(x) => ManagerResponse::from(x), - }, - None => ManagerResponse::from(io::Error::new( - io::ErrorKind::NotConnected, - "Connection does not exist", - )), - }, + None => ManagerResponse::from(io::Error::new( + io::ErrorKind::NotConnected, + "Connection does not exist", + )), + } + } ManagerRequest::Channel { id, request } => { + debug!("Attempting to retrieve channel {id}"); match self.channels.read().await.get(&id) { // TODO: For now, we are NOT sending back a response to acknowledge // a successful channel send. We could do this in order for // the client to listen for a complete send, but is it worth it? - Some(channel) => match channel.send(request) { - Ok(_) => return, - Err(x) => ManagerResponse::from(x), - }, + Some(channel) => { + debug!("Sending {request:?} through channel {id}"); + match channel.send(request) { + Ok(_) => return, + Err(x) => ManagerResponse::from(x), + } + } + None => ManagerResponse::from(io::Error::new( + io::ErrorKind::NotConnected, + "Channel is not open or does not exist", + )), + } + } + ManagerRequest::CloseChannel { id } => { + debug!("Attempting to remove channel {id}"); + match self.channels.write().await.remove(&id) { + Some(channel) => { + debug!("Removed channel {}", channel.id()); + match channel.close() { + Ok(_) => { + info!("Channel {id} has been closed"); + ManagerResponse::ChannelClosed { id } + } + Err(x) => ManagerResponse::from(x), + } + } None => ManagerResponse::from(io::Error::new( io::ErrorKind::NotConnected, "Channel is not open or does not exist", )), } } - ManagerRequest::CloseChannel { id } => match self.channels.write().await.remove(&id) { - Some(channel) => match channel.close() { - Ok(_) => { - debug!("Channel {id} has been closed"); - ManagerResponse::ChannelClosed { id } + ManagerRequest::Info { id } => { + debug!("Attempting to retrieve information for connection {id}"); + match self.info(id).await { + Ok(info) => { + info!("Retrieved information for connection {id}"); + ManagerResponse::Info(info) } Err(x) => ManagerResponse::from(x), - }, - None => ManagerResponse::from(io::Error::new( - io::ErrorKind::NotConnected, - "Channel is not open or does not exist", - )), - }, - ManagerRequest::Info { id } => match self.info(id).await { - Ok(info) => ManagerResponse::Info(info), - Err(x) => ManagerResponse::from(x), - }, - ManagerRequest::List => match self.list().await { - Ok(list) => ManagerResponse::List(list), - Err(x) => ManagerResponse::from(x), - }, - ManagerRequest::Kill { id } => match self.kill(id).await { - Ok(()) => ManagerResponse::Killed, - Err(x) => ManagerResponse::from(x), - }, + } + } + ManagerRequest::List => { + debug!("Attempting to retrieve the list of connections"); + match self.list().await { + Ok(list) => { + info!("Retrieved list of connections"); + ManagerResponse::List(list) + } + Err(x) => ManagerResponse::from(x), + } + } + ManagerRequest::Kill { id } => { + debug!("Attempting to kill connection {id}"); + match self.kill(id).await { + Ok(()) => { + info!("Killed connection {id}"); + ManagerResponse::Killed + } + Err(x) => ManagerResponse::from(x), + } + } }; if let Err(x) = reply.send(response).await { diff --git a/distant-net/src/server/connection.rs b/distant-net/src/server/connection.rs index e6be9c6..c66cfc3 100644 --- a/distant-net/src/server/connection.rs +++ b/distant-net/src/server/connection.rs @@ -453,9 +453,6 @@ where // Store our connection details state.connections.write().await.insert(id, connection_state); - // Keep track of block status so we can log appropriately - let mut was_blocked = false; - debug!("[Conn {id}] Beginning read/write loop"); loop { let ready = match await_or_shutdown!( @@ -477,10 +474,14 @@ where Ok(Some(frame)) => match UntypedRequest::from_slice(frame.as_item()) { Ok(request) => match request.to_typed_request() { Ok(request) => { - debug!( - "[Conn {id}] New request {} | header {}", - request.id, request.header - ); + if log::log_enabled!(Level::Debug) { + let debug_header = if !request.header.is_empty() { + format!(" | header {}", request.header) + } else { + String::new() + }; + debug!("[Conn {id}] New request {}{debug_header}", request.id); + } let origin_id = request.id.clone(); let ctx = RequestCtx { connection_id: id, @@ -580,18 +581,6 @@ where // If we did not read or write anything, sleep a bit to offload CPU usage if read_blocked && write_blocked { tokio::time::sleep(sleep_duration).await; - - if !was_blocked { - trace!("[Conn {id}] Entering blocked state"); - } - - was_blocked = true; - } else { - if was_blocked { - trace!("[Conn {id}] Exiting blocked state"); - } - - was_blocked = false; } } } diff --git a/distant-net/src/server/context.rs b/distant-net/src/server/context.rs index b6bc63a..dd36065 100644 --- a/distant-net/src/server/context.rs +++ b/distant-net/src/server/context.rs @@ -1,5 +1,6 @@ use super::ServerReply; use crate::common::{ConnectionId, Request}; +use std::fmt; /// Represents contextual information for working with an inbound request. pub struct RequestCtx { @@ -12,3 +13,16 @@ pub struct RequestCtx { /// Used to send replies back to be sent out by the server. pub reply: ServerReply, } + +impl fmt::Debug for RequestCtx +where + T: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RequestCtx") + .field("connection_id", &self.connection_id) + .field("request", &self.request) + .field("reply", &"...") + .finish() + } +}