diff --git a/CHANGELOG.md b/CHANGELOG.md index 7558ec4..f061722 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Fixed + +- `shutdown-after` cli parameter and config option now properly shuts down + server after N seconds with no connections + ## [0.17.5] - 2022-08-18 ### Fixed diff --git a/distant-core/src/api.rs b/distant-core/src/api.rs index f1e5546..a91fdc9 100644 --- a/distant-core/src/api.rs +++ b/distant-core/src/api.rs @@ -3,7 +3,7 @@ use crate::{ ConnectionId, DistantMsg, DistantRequestData, DistantResponseData, }; use async_trait::async_trait; -use distant_net::{Reply, Server, ServerCtx}; +use distant_net::{Reply, Server, ServerConfig, ServerCtx}; use log::*; use std::{io, path::PathBuf, sync::Arc}; @@ -39,9 +39,9 @@ where impl DistantApiServer::LocalData> { /// Creates a new server using the [`LocalDistantApi`] implementation - pub fn local() -> io::Result { + pub fn local(config: ServerConfig) -> io::Result { Ok(Self { - api: LocalDistantApi::initialize()?, + api: LocalDistantApi::initialize(config)?, }) } } @@ -60,6 +60,11 @@ fn unsupported(label: &str) -> io::Result { pub trait DistantApi { type LocalData: Send + Sync; + /// Returns config associated with API server + fn config(&self) -> ServerConfig { + ServerConfig::default() + } + /// Invoked whenever a new connection is established, providing a mutable reference to the /// newly-created local data. This is a way to support modifying local data before it is used. #[allow(unused_variables)] @@ -385,6 +390,11 @@ where type Response = DistantMsg; type LocalData = D; + /// Overridden to leverage [`DistantApi`] implementation of `config` + fn config(&self) -> ServerConfig { + T::config(&self.api) + } + /// Overridden to leverage [`DistantApi`] implementation of `on_accept` async fn on_accept(&self, local_data: &mut Self::LocalData) { T::on_accept(&self.api, local_data).await diff --git a/distant-core/src/api/local.rs b/distant-core/src/api/local.rs index 2fdc740..819d816 100644 --- a/distant-core/src/api/local.rs +++ b/distant-core/src/api/local.rs @@ -6,6 +6,7 @@ use crate::{ DistantApi, DistantCtx, }; use async_trait::async_trait; +use distant_net::ServerConfig; use log::*; use std::{ io, @@ -25,13 +26,15 @@ use state::*; /// impementation of the API instead of a proxy to another machine as seen with /// implementations on top of SSH and other protocol pub struct LocalDistantApi { + config: ServerConfig, state: GlobalState, } impl LocalDistantApi { /// Initialize the api instance - pub fn initialize() -> io::Result { + pub fn initialize(config: ServerConfig) -> io::Result { Ok(Self { + config, state: GlobalState::initialize()?, }) } @@ -41,6 +44,10 @@ impl LocalDistantApi { impl DistantApi for LocalDistantApi { type LocalData = ConnectionState; + fn config(&self) -> ServerConfig { + self.config.clone() + } + /// Injects the global channels into the local connection async fn on_accept(&self, local_data: &mut Self::LocalData) { local_data.process_channel = self.state.process.clone_channel(); @@ -547,7 +554,7 @@ mod tests { DistantCtx, mpsc::Receiver, ) { - let api = LocalDistantApi::initialize().unwrap(); + let api = LocalDistantApi::initialize(Default::default()).unwrap(); let (reply, rx) = make_reply(buffer); let mut local_data = ConnectionState::default(); DistantApi::on_accept(&api, &mut local_data).await; diff --git a/distant-core/tests/manager_tests.rs b/distant-core/tests/manager_tests.rs index 6dbdd2c..ab7fe30 100644 --- a/distant-core/tests/manager_tests.rs +++ b/distant-core/tests/manager_tests.rs @@ -33,7 +33,7 @@ async fn should_be_able_to_establish_a_single_connection_and_communicate() { let (t1, t2) = FramedTransport::pair(100); // Spawn a server on one end - let _ = DistantApiServer::local() + let _ = DistantApiServer::local(Default::default()) .unwrap() .start(OneshotListener::from_value(t2.into_split()))?; diff --git a/distant-core/tests/stress/fixtures.rs b/distant-core/tests/stress/fixtures.rs index 5e8884a..33d2657 100644 --- a/distant-core/tests/stress/fixtures.rs +++ b/distant-core/tests/stress/fixtures.rs @@ -25,7 +25,7 @@ impl DistantClientCtx { let key = SecretKey::default(); let codec = XChaCha20Poly1305Codec::from(key.clone()); - if let Ok(api) = LocalDistantApi::initialize() { + if let Ok(api) = LocalDistantApi::initialize(Default::default()) { let port: PortRange = "0".parse().unwrap(); let port = { let server_ref = DistantApiServer::new(api) diff --git a/distant-net/src/server.rs b/distant-net/src/server.rs index 448d184..9626733 100644 --- a/distant-net/src/server.rs +++ b/distant-net/src/server.rs @@ -1,6 +1,9 @@ use async_trait::async_trait; use serde::{de::DeserializeOwned, Serialize}; +mod config; +pub use config::*; + mod connection; pub use connection::*; @@ -31,6 +34,11 @@ pub trait Server: Send { /// Type of data to store locally tied to the specific connection type LocalData: Send + Sync; + /// Returns configuration tied to server instance + fn config(&self) -> ServerConfig { + ServerConfig::default() + } + /// Invoked immediately on server start, being provided the raw listener to use (untyped /// transport), and returning the listener when ready to start (enabling servers that need to /// tweak a listener to do so) diff --git a/distant-net/src/server/config.rs b/distant-net/src/server/config.rs new file mode 100644 index 0000000..b6b7436 --- /dev/null +++ b/distant-net/src/server/config.rs @@ -0,0 +1,10 @@ +use serde::{Deserialize, Serialize}; +use std::time::Duration; + +/// Represents a general-purpose set of properties tied with a server instance +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] +pub struct ServerConfig { + /// If provided, will cause server to shut down if duration is exceeded with no active + /// connections + pub shutdown_after: Option, +} diff --git a/distant-net/src/server/ext.rs b/distant-net/src/server/ext.rs index 39a240c..5bc558d 100644 --- a/distant-net/src/server/ext.rs +++ b/distant-net/src/server/ext.rs @@ -1,11 +1,14 @@ use crate::{ - GenericServerRef, Listener, Request, Response, Server, ServerConnection, ServerCtx, ServerRef, - ServerReply, ServerState, TypedAsyncRead, TypedAsyncWrite, + utils::Timer, GenericServerRef, Listener, Request, Response, Server, ServerConnection, + ServerCtx, ServerRef, ServerReply, ServerState, TypedAsyncRead, TypedAsyncWrite, }; use log::*; use serde::{de::DeserializeOwned, Serialize}; -use std::{io, sync::Arc}; -use tokio::sync::mpsc; +use std::{ + io, + sync::{Arc, Weak}, +}; +use tokio::sync::{mpsc, Mutex}; mod tcp; pub use tcp::*; @@ -72,91 +75,162 @@ where R: TypedAsyncRead> + Send + 'static, W: TypedAsyncWrite> + Send + 'static, { + // Grab a copy of our server's configuration so we can leverage it below + let config = server.config(); + + // Create the timer that will be used shutdown the server after duration elapsed + let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1); + + // NOTE: We do a manual map such that the shutdown sender is not captured and dropped when + // there is no shutdown after configured. This is because we need the future for the + // shutdown receiver to last forever in the event that there is no shutdown configured, + // not return immediately, which is what would happen if the sender was dropped. + #[allow(clippy::manual_map)] + let mut shutdown_timer = match config.shutdown_after { + Some(duration) => Some(Timer::new(duration, async move { + let _ = shutdown_tx.send(()).await; + })), + None => None, + }; + + if let Some(timer) = shutdown_timer.as_mut() { + info!( + "Server shutdown timer configured: {}s", + timer.duration().as_secs_f32() + ); + timer.start(); + } + + let mut shutdown_timer = shutdown_timer.map(|timer| Arc::new(Mutex::new(timer))); + loop { let server = Arc::clone(&server); - match listener.accept().await { - Ok((mut writer, mut reader)) => { - let mut connection = ServerConnection::new(); - let connection_id = connection.id; - let state = Arc::clone(&state); - - // Create some default data for the new connection and pass it - // to the callback prior to processing new requests - let local_data = { - let mut data = Data::default(); - server.on_accept(&mut data).await; - Arc::new(data) - }; - - // Start a writer task that reads from a channel and forwards all - // data through the writer - let (tx, mut rx) = mpsc::channel::>(1); - connection.writer_task = Some(tokio::spawn(async move { - while let Some(data) = rx.recv().await { - // trace!("[Conn {}] Sending {:?}", connection_id, data.payload); - if let Err(x) = writer.write(data).await { - error!("[Conn {}] Failed to send {:?}", connection_id, x); - break; - } - } - })); - - // Start a reader task that reads requests and processes them - // using the provided handler - connection.reader_task = Some(tokio::spawn(async move { - loop { - match reader.read().await { - Ok(Some(request)) => { - let reply = ServerReply { - origin_id: request.id.clone(), - tx: tx.clone(), - }; - - let ctx = ServerCtx { - connection_id, - request, - reply: reply.clone(), - local_data: Arc::clone(&local_data), - }; - - server.on_request(ctx).await; - } - Ok(None) => { - debug!("[Conn {}] Connection closed", connection_id); - break; - } - Err(x) => { - // NOTE: We do NOT break out of the loop, as this could happen - // if someone sends bad data at any point, but does not - // mean that the reader itself has failed. This can - // happen from getting non-compliant typed data - error!("[Conn {}] {}", connection_id, x); - } + + // Receive a new connection, exiting if no longer accepting connections or if the shutdown + // signal has been received + let (mut writer, mut reader) = tokio::select! { + result = listener.accept() => { + match result { + Ok(x) => x, + Err(x) => { + error!("Server no longer accepting connections: {}", x); + if let Some(timer) = shutdown_timer.take() { + timer.lock().await.abort(); } + break; } - })); - - state - .connections - .write() - .await - .insert(connection_id, connection); + } } - Err(x) => { - error!("Server no longer accepting connections: {}", x); + _ = shutdown_rx.recv() => { + info!( + "Server shutdown triggered after {}s", + config.shutdown_after.unwrap_or_default().as_secs_f32(), + ); break; } + }; + + let mut connection = ServerConnection::new(); + let connection_id = connection.id; + let state = Arc::clone(&state); + + // Ensure that the shutdown timer is cancelled now that we have a connection + if let Some(timer) = shutdown_timer.as_ref() { + timer.lock().await.stop(); } + + // Create some default data for the new connection and pass it + // to the callback prior to processing new requests + let local_data = { + let mut data = Data::default(); + server.on_accept(&mut data).await; + Arc::new(data) + }; + + // Start a writer task that reads from a channel and forwards all + // data through the writer + let (tx, mut rx) = mpsc::channel::>(1); + connection.writer_task = Some(tokio::spawn(async move { + while let Some(data) = rx.recv().await { + // trace!("[Conn {}] Sending {:?}", connection_id, data.payload); + if let Err(x) = writer.write(data).await { + error!("[Conn {}] Failed to send {:?}", connection_id, x); + break; + } + } + })); + + // Start a reader task that reads requests and processes them + // using the provided handler + let weak_state = Arc::downgrade(&state); + let weak_shutdown_timer = shutdown_timer + .as_ref() + .map(Arc::downgrade) + .unwrap_or_default(); + connection.reader_task = Some(tokio::spawn(async move { + loop { + match reader.read().await { + Ok(Some(request)) => { + let reply = ServerReply { + origin_id: request.id.clone(), + tx: tx.clone(), + }; + + let ctx = ServerCtx { + connection_id, + request, + reply: reply.clone(), + local_data: Arc::clone(&local_data), + }; + + server.on_request(ctx).await; + } + Ok(None) => { + debug!("[Conn {}] Connection closed", connection_id); + + // Remove the connection from our state if it has closed + if let Some(state) = Weak::upgrade(&weak_state) { + state.connections.write().await.remove(&connection_id); + + // If we have no more connections, start the timer + if let Some(timer) = Weak::upgrade(&weak_shutdown_timer) { + if state.connections.read().await.is_empty() { + timer.lock().await.start(); + } + } + } + break; + } + Err(x) => { + // NOTE: We do NOT break out of the loop, as this could happen + // if someone sends bad data at any point, but does not + // mean that the reader itself has failed. This can + // happen from getting non-compliant typed data + error!("[Conn {}] {}", connection_id, x); + } + } + } + })); + + state + .connections + .write() + .await + .insert(connection_id, connection); } } #[cfg(test)] mod tests { use super::*; - use crate::{IntoSplit, MpscListener, MpscTransport}; + use crate::{ + IntoSplit, MpscListener, MpscTransport, MpscTransportReadHalf, MpscTransportWriteHalf, + ServerConfig, + }; use async_trait::async_trait; + use std::time::Duration; - pub struct TestServer; + pub struct TestServer(ServerConfig); #[async_trait] impl Server for TestServer { @@ -164,16 +238,36 @@ mod tests { type Response = String; type LocalData = (); + fn config(&self) -> ServerConfig { + self.0.clone() + } + async fn on_request(&self, ctx: ServerCtx) { // Always send back "hello" ctx.reply.send("hello".to_string()).await.unwrap(); } } + #[allow(clippy::type_complexity)] + fn make_listener( + buffer: usize, + ) -> ( + mpsc::Sender<( + MpscTransportWriteHalf>, + MpscTransportReadHalf>, + )>, + MpscListener<( + MpscTransportWriteHalf>, + MpscTransportReadHalf>, + )>, + ) { + MpscListener::channel(buffer) + } + #[tokio::test] async fn should_invoke_handler_upon_receiving_a_request() { // Create a test listener where we will forward a connection - let (tx, listener) = MpscListener::channel(100); + let (tx, listener) = make_listener(100); // Make bounded transport pair and send off one of them to act as our connection let (mut transport, connection) = @@ -182,7 +276,8 @@ mod tests { .await .expect("Failed to feed listener a connection"); - let _server = ServerExt::start(TestServer, listener).expect("Failed to start server"); + let _server = ServerExt::start(TestServer(ServerConfig::default()), listener) + .expect("Failed to start server"); transport .write(Request::new(123)) @@ -192,4 +287,93 @@ mod tests { let response: Response = transport.read().await.unwrap().unwrap(); assert_eq!(response.payload, "hello"); } + + #[tokio::test] + async fn should_shutdown_if_no_connections_received_after_n_secs_when_config_set() { + let (_tx, listener) = make_listener(100); + + let server = ServerExt::start( + TestServer(ServerConfig { + shutdown_after: Some(Duration::from_millis(100)), + }), + listener, + ) + .expect("Failed to start server"); + + // Wait for some time + tokio::time::sleep(Duration::from_millis(300)).await; + + assert!(server.is_finished(), "Server shutdown not triggered!"); + } + + #[tokio::test] + async fn should_shutdown_if_last_connection_terminated_and_then_no_connections_after_n_secs() { + // Create a test listener where we will forward a connection + let (tx, listener) = make_listener(100); + + // Make bounded transport pair and send off one of them to act as our connection + let (transport, connection) = MpscTransport::, Response>::pair(100); + tx.send(connection.into_split()) + .await + .expect("Failed to feed listener a connection"); + + let server = ServerExt::start( + TestServer(ServerConfig { + shutdown_after: Some(Duration::from_millis(100)), + }), + listener, + ) + .expect("Failed to start server"); + + // Drop the connection by dropping the transport + drop(transport); + + // Wait for some time + tokio::time::sleep(Duration::from_millis(300)).await; + + assert!(server.is_finished(), "Server shutdown not triggered!"); + } + + #[tokio::test] + async fn should_not_shutdown_as_long_as_a_connection_exists() { + // Create a test listener where we will forward a connection + let (tx, listener) = make_listener(100); + + // Make bounded transport pair and send off one of them to act as our connection + let (_transport, connection) = MpscTransport::, Response>::pair(100); + tx.send(connection.into_split()) + .await + .expect("Failed to feed listener a connection"); + + let server = ServerExt::start( + TestServer(ServerConfig { + shutdown_after: Some(Duration::from_millis(100)), + }), + listener, + ) + .expect("Failed to start server"); + + // Wait for some time + tokio::time::sleep(Duration::from_millis(300)).await; + + assert!(!server.is_finished(), "Server shutdown when it should not!"); + } + + #[tokio::test] + async fn should_never_shutdown_if_config_not_set() { + let (_tx, listener) = make_listener(100); + + let server = ServerExt::start( + TestServer(ServerConfig { + shutdown_after: None, + }), + listener, + ) + .expect("Failed to start server"); + + // Wait for some time + tokio::time::sleep(Duration::from_millis(300)).await; + + assert!(!server.is_finished(), "Server shutdown when it should not!"); + } } diff --git a/distant-net/src/utils.rs b/distant-net/src/utils.rs index 34b9074..ab8e510 100644 --- a/distant-net/src/utils.rs +++ b/distant-net/src/utils.rs @@ -1,5 +1,6 @@ use serde::{de::DeserializeOwned, Serialize}; -use std::io; +use std::{future::Future, io, time::Duration}; +use tokio::{sync::mpsc, task::JoinHandle}; pub fn serialize_to_vec(value: &T) -> io::Result> { rmp_serde::encode::to_vec_named(value).map_err(|x| { @@ -18,3 +19,147 @@ pub fn deserialize_from_slice(slice: &[u8]) -> io::Result +where + T: Send + 'static, +{ + active_timer: Option>, + callback: JoinHandle, + duration: Duration, + trigger: mpsc::Sender, +} + +impl Timer +where + T: Send + 'static, +{ + /// Create a new callback to trigger `future` that will be executed after `duration` is + /// exceeded. The timer is not started yet until `start` is invoked + pub fn new(duration: Duration, future: F) -> Self + where + F: Future + Send + 'static, + { + let (trigger, mut trigger_rx) = mpsc::channel(1); + let callback = tokio::spawn(async move { + trigger_rx.recv().await; + future.await + }); + + Self { + active_timer: None, + callback, + duration, + trigger, + } + } + + /// Returns duration of the timer + pub fn duration(&self) -> Duration { + self.duration + } + + /// Starts the timer, re-starting the countdown if already running. If the callback has already + /// been completed, this timer will not invoke it again; however, this will start the timer + /// itself, which will wait the duration and then fail to trigger the callback + pub fn start(&mut self) { + // Cancel the active timer task + self.stop(); + + // Exit early if callback completed as starting will do nothing + if self.callback.is_finished() { + return; + } + + // Create a new active timer task + let duration = self.duration; + let trigger = self.trigger.clone(); + self.active_timer = Some(tokio::spawn(async move { + tokio::time::sleep(duration).await; + let _ = trigger.send(true).await; + })); + } + + /// Stops the timer, cancelling the internal task, but leaving the callback in place in case + /// the timer is re-started later + pub fn stop(&mut self) { + // Delete the active timer task + if let Some(task) = self.active_timer.take() { + task.abort(); + } + } + + /// Aborts the timer's callback task and internal task to trigger the callback, which means + /// that the timer will never complete the callback and starting will have no effect + pub fn abort(&self) { + if let Some(task) = self.active_timer.as_ref() { + task.abort(); + } + + self.callback.abort(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + mod timer { + use super::*; + + #[tokio::test] + async fn should_not_invoke_callback_regardless_of_time_if_not_started() { + let timer = Timer::new(Duration::default(), async {}); + + tokio::time::sleep(Duration::from_millis(300)).await; + + assert!( + !timer.callback.is_finished(), + "Callback completed unexpectedly" + ); + } + + #[tokio::test] + async fn should_not_invoke_callback_if_only_stop_called() { + let mut timer = Timer::new(Duration::default(), async {}); + timer.stop(); + + tokio::time::sleep(Duration::from_millis(300)).await; + + assert!( + !timer.callback.is_finished(), + "Callback completed unexpectedly" + ); + } + + #[tokio::test] + async fn should_finish_callback_but_not_trigger_it_if_abort_called() { + let (tx, mut rx) = mpsc::channel(1); + + let timer = Timer::new(Duration::default(), async move { + let _ = tx.send(()).await; + }); + timer.abort(); + + tokio::time::sleep(Duration::from_millis(300)).await; + + assert!(timer.callback.is_finished(), "Callback not finished"); + assert!(rx.try_recv().is_err(), "Callback triggered unexpectedly"); + } + + #[tokio::test] + async fn should_trigger_callback_after_time_elapses_once_started() { + let (tx, mut rx) = mpsc::channel(1); + + let mut timer = Timer::new(Duration::default(), async move { + let _ = tx.send(()).await; + }); + timer.start(); + + tokio::time::sleep(Duration::from_millis(300)).await; + + assert!(timer.callback.is_finished(), "Callback not finished"); + assert!(rx.try_recv().is_ok(), "Callback not triggered"); + } + } +} diff --git a/distant-ssh2/tests/sshd/mod.rs b/distant-ssh2/tests/sshd/mod.rs index ee5ba39..c4048eb 100644 --- a/distant-ssh2/tests/sshd/mod.rs +++ b/distant-ssh2/tests/sshd/mod.rs @@ -529,6 +529,7 @@ pub async fn launched_client( let client = ssh_client .launch_and_connect(DistantLaunchOpts { binary, + args: "--shutdown-after 10".to_string(), ..Default::default() }) .await diff --git a/src/cli/commands/server.rs b/src/cli/commands/server.rs index cb1b68e..1efcfda 100644 --- a/src/cli/commands/server.rs +++ b/src/cli/commands/server.rs @@ -5,11 +5,17 @@ use crate::{ use anyhow::Context; use clap::Subcommand; use distant_core::{ - net::{SecretKey32, ServerRef, TcpServerExt, XChaCha20Poly1305Codec}, + net::{ + SecretKey32, ServerConfig as NetServerConfig, ServerRef, TcpServerExt, + XChaCha20Poly1305Codec, + }, DistantApiServer, DistantSingleKeyCredentials, Host, }; use log::*; -use std::io::{self, Read, Write}; +use std::{ + io::{self, Read, Write}, + time::Duration, +}; #[derive(Debug, Subcommand)] pub enum ServerSubcommand { @@ -36,18 +42,18 @@ pub enum ServerSubcommand { } impl ServerSubcommand { - pub fn run(self, _config: ServerConfig) -> CliResult { + pub fn run(self, config: ServerConfig) -> CliResult { match &self { - Self::Listen { daemon, .. } if *daemon => Self::run_daemon(self), + Self::Listen { daemon, .. } if *daemon => Self::run_daemon(self, config), Self::Listen { .. } => { let rt = tokio::runtime::Runtime::new().context("Failed to start up runtime")?; - rt.block_on(Self::async_run(self, false)) + rt.block_on(Self::async_run(self, config, false)) } } } #[cfg(windows)] - fn run_daemon(self) -> CliResult { + fn run_daemon(self, _config: ServerConfig) -> CliResult { use crate::cli::Spawner; use distant_core::net::{Listener, WindowsPipeListener}; use std::ffi::OsString; @@ -96,7 +102,7 @@ impl ServerSubcommand { } #[cfg(unix)] - fn run_daemon(self) -> CliResult { + fn run_daemon(self, config: ServerConfig) -> CliResult { use fork::{daemon, Fork}; // NOTE: We keep the stdin, stdout, stderr open so we can print out the pid with the parent @@ -104,7 +110,7 @@ impl ServerSubcommand { match daemon(true, true) { Ok(Fork::Child) => { let rt = tokio::runtime::Runtime::new().context("Failed to start up runtime")?; - rt.block_on(async { Self::async_run(self, true).await })?; + rt.block_on(async { Self::async_run(self, config, true).await })?; Ok(()) } Ok(Fork::Parent(pid)) => { @@ -119,21 +125,30 @@ impl ServerSubcommand { } } - async fn async_run(self, _is_forked: bool) -> CliResult { + async fn async_run(self, config: ServerConfig, _is_forked: bool) -> CliResult { match self { Self::Listen { - config, + config: listen_config, key_from_stdin, #[cfg(windows)] output_to_local_pipe, .. } => { - let host = config.host.unwrap_or(BindAddress::Any); + macro_rules! get { + (@flag $field:ident) => {{ + config.listen.$field || listen_config.$field + }}; + ($field:ident) => {{ + config.listen.$field.or(listen_config.$field) + }}; + } + + let host = get!(host).unwrap_or(BindAddress::Any); trace!("Starting server using unresolved host '{}'", host); - let addr = host.resolve(config.use_ipv6)?; + let addr = host.resolve(get!(@flag use_ipv6))?; // If specified, change the current working directory of this program - if let Some(path) = config.current_dir.as_ref() { + if let Some(path) = get!(current_dir) { debug!("Setting current directory to {:?}", path); std::env::set_current_dir(path) .context("Failed to set new current directory")?; @@ -156,25 +171,26 @@ impl ServerSubcommand { debug!( "Starting local API server, binding to {} {}", addr, - match config.port { + match get!(port) { Some(range) => format!("with port in range {}", range), None => "using an ephemeral port".to_string(), } ); - let server = DistantApiServer::local() - .context("Failed to create local distant api")? - .start(addr, config.port.unwrap_or_else(|| 0.into()), codec) - .await - .with_context(|| { - format!( - "Failed to start server @ {} with {}", - addr, - config - .port - .map(|p| format!("port in range {p}")) - .unwrap_or_else(|| String::from("ephemeral port")) - ) - })?; + let server = DistantApiServer::local(NetServerConfig { + shutdown_after: get!(shutdown_after).map(Duration::from_secs_f32), + }) + .context("Failed to create local distant api")? + .start(addr, get!(port).unwrap_or_else(|| 0.into()), codec) + .await + .with_context(|| { + format!( + "Failed to start server @ {} with {}", + addr, + get!(port) + .map(|p| format!("port in range {p}")) + .unwrap_or_else(|| String::from("ephemeral port")) + ) + })?; let credentials = DistantSingleKeyCredentials { host: Host::from(addr),