From 27dc5775f9b89a96b27df12b5ec43c9a1dcac03d Mon Sep 17 00:00:00 2001 From: Chip Senkbeil Date: Sun, 11 Dec 2022 04:15:26 -0600 Subject: [PATCH] Update ssh launch to use pty (#157) --- .github/workflows/ci.yml | 4 + CHANGELOG.md | 5 + distant-core/src/credentials.rs | 77 +++++++++--- distant-net/src/client.rs | 124 ++++++++++++++----- distant-net/src/client/config.rs | 6 + distant-ssh2/src/lib.rs | 175 ++++++++++++++++----------- distant-ssh2/tests/sshd/mod.rs | 113 ++++++++++------- src/cli/commands/manager/handlers.rs | 1 - src/config/client/launch.rs | 13 -- tests/cli/fixtures.rs | 4 +- 10 files changed, 346 insertions(+), 176 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ff8218a..951944b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -79,6 +79,7 @@ jobs: runs-on: ${{ matrix.os }} env: RUSTFLAGS: --cfg ci + RUST_LOG: trace strategy: fail-fast: false matrix: @@ -179,6 +180,9 @@ jobs: ssh-launch-tests: name: "Test ssh launch using Rust ${{ matrix.rust }} on ${{ matrix.os }}" runs-on: ${{ matrix.os }} + env: + RUSTFLAGS: --cfg ci + RUST_LOG: trace strategy: fail-fast: false matrix: diff --git a/CHANGELOG.md b/CHANGELOG.md index ebea231..a9087fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Persist option now removed from `ProcSpawn` message and CLI - Bump minimum Rust version to 1.64.0 +### Removed + +- `--no-shell` option is removed as we automatically detect and use the PTY of + the remote system using a default shell + ## [0.20.0-alpha.2] - 2022-11-20 ### Added diff --git a/distant-core/src/credentials.rs b/distant-core/src/credentials.rs index 7f88dbc..59360f9 100644 --- a/distant-core/src/credentials.rs +++ b/distant-core/src/credentials.rs @@ -95,8 +95,11 @@ impl<'de> Deserialize<'de> for DistantSingleKeyCredentials { impl DistantSingleKeyCredentials { /// Searches a str for `distant://[username]:{key}@{host}:{port}`, returning the first matching - /// credentials set if found - pub fn find(s: &str) -> Option { + /// credentials set if found, failing if anything is found immediately before or after the + /// credentials that is not whitespace or control characters + /// + /// If `strict` is false, then the scheme can be preceded by any character + pub fn find(s: &str, strict: bool) -> Option { let is_boundary = |c| char::is_whitespace(c) || char::is_control(c); for (i, _) in s.match_indices(SCHEME_WITH_SEP) { @@ -105,11 +108,11 @@ impl DistantSingleKeyCredentials { // Check character preceding the scheme to make sure it isn't a different scheme // Only whitespace or control characters preceding are okay, anything else is skipped - if !before.is_empty() && !before.ends_with(is_boundary) { + if strict && !before.is_empty() && !before.ends_with(is_boundary) { continue; } - // Consume until we reach whitespace, which indicates the potential end + // Consume until we reach whitespace or control, which indicates the potential end let s = match s.find(is_boundary) { Some(i) => &s[..i], None => s, @@ -124,6 +127,22 @@ impl DistantSingleKeyCredentials { None } + /// Equivalent to [`find(s, true)`]. + /// + /// [`find(s, true)`]: DistantSingleKeyCredentials::find + #[inline] + pub fn find_strict(s: &str) -> Option { + Self::find(s, true) + } + + /// Equivalent to [`find(s, false)`]. + /// + /// [`find(s, false)`]: DistantSingleKeyCredentials::find + #[inline] + pub fn find_lax(s: &str) -> Option { + Self::find(s, false) + } + /// Converts credentials into a [`Destination`] of the form /// `distant://[username]:{key}@{host}:{port}`, failing if the credentials would not produce a /// valid [`Destination`] @@ -175,29 +194,29 @@ mod tests { #[test] fn find_should_return_some_key_if_string_is_exact_match() { - let credentials = DistantSingleKeyCredentials::find(CREDENTIALS_STR_NO_USER.as_str()); + let credentials = DistantSingleKeyCredentials::find(CREDENTIALS_STR_NO_USER.as_str(), true); assert_eq!(credentials.unwrap(), *CREDENTIALS_NO_USER); - let credentials = DistantSingleKeyCredentials::find(CREDENTIALS_STR_USER.as_str()); + let credentials = DistantSingleKeyCredentials::find(CREDENTIALS_STR_USER.as_str(), true); assert_eq!(credentials.unwrap(), *CREDENTIALS_USER); } #[test] fn find_should_return_some_key_if_there_is_a_match_with_only_whitespace_on_either_side() { let s = format!(" {} ", CREDENTIALS_STR_NO_USER.as_str()); - let credentials = DistantSingleKeyCredentials::find(&s); + let credentials = DistantSingleKeyCredentials::find(&s, true); assert_eq!(credentials.unwrap(), *CREDENTIALS_NO_USER); let s = format!("\r{}\r", CREDENTIALS_STR_NO_USER.as_str()); - let credentials = DistantSingleKeyCredentials::find(&s); + let credentials = DistantSingleKeyCredentials::find(&s, true); assert_eq!(credentials.unwrap(), *CREDENTIALS_NO_USER); let s = format!("\t{}\t", CREDENTIALS_STR_NO_USER.as_str()); - let credentials = DistantSingleKeyCredentials::find(&s); + let credentials = DistantSingleKeyCredentials::find(&s, true); assert_eq!(credentials.unwrap(), *CREDENTIALS_NO_USER); let s = format!("\n{}\n", CREDENTIALS_STR_NO_USER.as_str()); - let credentials = DistantSingleKeyCredentials::find(&s); + let credentials = DistantSingleKeyCredentials::find(&s, true); assert_eq!(credentials.unwrap(), *CREDENTIALS_NO_USER); } @@ -205,7 +224,7 @@ mod tests { fn find_should_return_some_key_if_there_is_a_match_with_only_control_characters_on_either_side() { let s = format!("\x1b{} \x1b", CREDENTIALS_STR_NO_USER.as_str()); - let credentials = DistantSingleKeyCredentials::find(&s); + let credentials = DistantSingleKeyCredentials::find(&s, true); assert_eq!(credentials.unwrap(), *CREDENTIALS_NO_USER); } @@ -216,7 +235,7 @@ mod tests { CREDENTIALS_STR_NO_USER.as_str(), CREDENTIALS_STR_USER.as_str() ); - let credentials = DistantSingleKeyCredentials::find(&s); + let credentials = DistantSingleKeyCredentials::find(&s, true); assert_eq!(credentials.unwrap(), *CREDENTIALS_NO_USER); } @@ -228,14 +247,29 @@ mod tests { CREDENTIALS_STR_NO_USER.as_str(), CREDENTIALS_STR_NO_USER.as_str() ); - let credentials = DistantSingleKeyCredentials::find(&s); + let credentials = DistantSingleKeyCredentials::find(&s, true); assert_eq!(credentials.unwrap(), *CREDENTIALS_NO_USER); } #[test] - fn find_should_return_none_if_no_match_found() { + fn find_with_strict_false_should_ignore_any_character_preceding_scheme() { + let s = format!("a{}", CREDENTIALS_STR_NO_USER.as_str()); + let credentials = DistantSingleKeyCredentials::find(&s, false); + assert_eq!(credentials.unwrap(), *CREDENTIALS_NO_USER); + + let s = format!( + "a{} b{}", + CREDENTIALS_STR_NO_USER.as_str(), + CREDENTIALS_STR_NO_USER.as_str() + ); + let credentials = DistantSingleKeyCredentials::find(&s, false); + assert_eq!(credentials.unwrap(), *CREDENTIALS_NO_USER); + } + + #[test] + fn find_with_strict_true_should_not_find_if_non_whitespace_and_control_preceding_scheme() { let s = format!("a{}", CREDENTIALS_STR_NO_USER.as_str()); - let credentials = DistantSingleKeyCredentials::find(&s); + let credentials = DistantSingleKeyCredentials::find(&s, true); assert_eq!(credentials, None); let s = format!( @@ -243,7 +277,18 @@ mod tests { CREDENTIALS_STR_NO_USER.as_str(), CREDENTIALS_STR_NO_USER.as_str() ); - let credentials = DistantSingleKeyCredentials::find(&s); + let credentials = DistantSingleKeyCredentials::find(&s, true); + assert_eq!(credentials, None); + } + + #[test] + fn find_should_return_none_if_no_match_found() { + let s = "abc"; + let credentials = DistantSingleKeyCredentials::find(s, true); + assert_eq!(credentials, None); + + let s = "abc"; + let credentials = DistantSingleKeyCredentials::find(s, false); assert_eq!(credentials, None); } diff --git a/distant-net/src/client.rs b/distant-net/src/client.rs index 5c324c1..a99c5a1 100644 --- a/distant-net/src/client.rs +++ b/distant-net/src/client.rs @@ -48,8 +48,11 @@ pub struct UntypedClient { /// Used to send shutdown request to inner task. shutdown: Box, + /// Indicates whether the client task will be shutdown when the client is dropped. + shutdown_on_drop: bool, + /// Contains the task that is running to send requests and receive responses from a server. - task: JoinHandle>, + task: Option>>, } impl fmt::Debug for UntypedClient { @@ -58,24 +61,38 @@ impl fmt::Debug for UntypedClient { .field("channel", &self.channel) .field("shutdown", &"...") .field("task", &self.task) + .field("shutdown_on_drop", &self.shutdown_on_drop) .finish() } } +impl Drop for UntypedClient { + fn drop(&mut self) { + if self.shutdown_on_drop { + // TODO: Shutdown is an async operation, can we use it here? + if let Some(task) = self.task.take() { + debug!("Shutdown on drop = true, so aborting client task"); + task.abort(); + } + } + } +} + impl UntypedClient { /// Consumes the client, returning a typed variant. - pub fn into_typed_client(self) -> Client { + pub fn into_typed_client(mut self) -> Client { Client { - channel: self.channel.into_typed_channel(), - watcher: self.watcher, - shutdown: self.shutdown, - task: self.task, + channel: self.clone_channel().into_typed_channel(), + watcher: self.watcher.clone(), + shutdown: self.shutdown.clone(), + shutdown_on_drop: self.shutdown_on_drop, + task: self.task.take(), } } /// Convert into underlying channel. pub fn into_channel(self) -> UntypedChannel { - self.channel + self.clone_channel() } /// Clones the underlying channel for requests and returns the cloned instance. @@ -86,8 +103,8 @@ impl UntypedClient { /// Waits for the client to terminate, which resolves when the receiving end of the network /// connection is closed (or the client is shutdown). Returns whether or not the client exited /// successfully or due to an error. - pub async fn wait(self) -> io::Result<()> { - match self.task.await { + pub async fn wait(mut self) -> io::Result<()> { + match self.task.take().unwrap().await { Ok(x) => x, Err(x) => Err(io::Error::new(io::ErrorKind::Other, x)), } @@ -95,7 +112,9 @@ impl UntypedClient { /// Abort the client's current connection by forcing its tasks to abort. pub fn abort(&self) { - self.task.abort(); + if let Some(task) = self.task.as_ref() { + task.abort(); + } } /// Clones the underlying shutdown signaler for the client. This enables you to wait on the @@ -109,6 +128,18 @@ impl UntypedClient { self.shutdown.shutdown().await } + /// Returns whether the client should fully shutdown once it is dropped. If true, this will + /// result in all channels tied to the client no longer functioning once the client is dropped. + pub fn will_shutdown_on_drop(&mut self) -> bool { + self.shutdown_on_drop + } + + /// Sets whether the client should fully shutdown once it is dropped. If true, this will result + /// in all channels tied to the client no longer functioning once the client is dropped. + pub fn shutdown_on_drop(&mut self, shutdown_on_drop: bool) { + self.shutdown_on_drop = shutdown_on_drop; + } + /// Clones the underlying [`ConnectionStateWatcher`] for the client. pub fn clone_connection_watcher(&self) -> ConnectionWatcher { self.watcher.clone() @@ -125,7 +156,7 @@ impl UntypedClient { /// Returns true if client's underlying event processing has finished/terminated. pub fn is_finished(&self) -> bool { - self.task.is_finished() + self.task.is_none() || self.task.as_ref().unwrap().is_finished() } /// Spawns a client using the provided [`FramedTransport`] of [`InmemoryTransport`] and a @@ -161,6 +192,12 @@ impl UntypedClient { // Ensure that our transport starts off clean (nothing in buffers or backup) connection.clear(); + let ClientConfig { + mut reconnect_strategy, + shutdown_on_drop, + silence_duration, + } = config; + // Start a task that continually checks for responses and delivers them using the // post office let shutdown_tx_2 = shutdown_tx.clone(); @@ -175,10 +212,6 @@ impl UntypedClient { // would cause recv() to resolve immediately and result in the task shutting // down. let _shutdown_tx = shutdown_tx_2; - let ClientConfig { - mut reconnect_strategy, - silence_duration, - } = config; loop { // If we have flagged that a reconnect is needed, attempt to do so @@ -361,7 +394,8 @@ impl UntypedClient { channel, watcher: ConnectionWatcher(watcher_rx), shutdown: Box::new(shutdown_tx), - task, + shutdown_on_drop, + task: Some(task), } } } @@ -382,7 +416,7 @@ impl DerefMut for UntypedClient { impl From for UntypedChannel { fn from(client: UntypedClient) -> Self { - client.channel + client.into_channel() } } @@ -397,8 +431,11 @@ pub struct Client { /// Used to send shutdown request to inner task. shutdown: Box, + /// Indicates whether the client task will be shutdown when the client is dropped. + shutdown_on_drop: bool, + /// Contains the task that is running to send requests and receive responses from a server. - task: JoinHandle>, + task: Option>>, } impl fmt::Debug for Client { @@ -407,22 +444,36 @@ impl fmt::Debug for Client { .field("channel", &self.channel) .field("shutdown", &"...") .field("task", &self.task) + .field("shutdown_on_drop", &self.shutdown_on_drop) .finish() } } +impl Drop for Client { + fn drop(&mut self) { + if self.shutdown_on_drop { + // TODO: Shutdown is an async operation, can we use it here? + if let Some(task) = self.task.take() { + debug!("Shutdown on drop = true, so aborting client task"); + task.abort(); + } + } + } +} + impl Client where T: Send + Sync + Serialize + 'static, U: Send + Sync + DeserializeOwned + 'static, { /// Consumes the client, returning an untyped variant. - pub fn into_untyped_client(self) -> UntypedClient { + pub fn into_untyped_client(mut self) -> UntypedClient { UntypedClient { - channel: self.channel.into_untyped_channel(), - watcher: self.watcher, - shutdown: self.shutdown, - task: self.task, + channel: self.clone_channel().into_untyped_channel(), + watcher: self.watcher.clone(), + shutdown: self.shutdown.clone(), + shutdown_on_drop: self.shutdown_on_drop, + task: self.task.take(), } } @@ -483,7 +534,7 @@ impl Client<(), ()> { impl Client { /// Convert into underlying channel. pub fn into_channel(self) -> Channel { - self.channel + self.clone_channel() } /// Clones the underlying channel for requests and returns the cloned instance. @@ -494,8 +545,8 @@ impl Client { /// Waits for the client to terminate, which resolves when the receiving end of the network /// connection is closed (or the client is shutdown). Returns whether or not the client exited /// successfully or due to an error. - pub async fn wait(self) -> io::Result<()> { - match self.task.await { + pub async fn wait(mut self) -> io::Result<()> { + match self.task.take().unwrap().await { Ok(x) => x, Err(x) => Err(io::Error::new(io::ErrorKind::Other, x)), } @@ -503,7 +554,9 @@ impl Client { /// Abort the client's current connection by forcing its tasks to abort. pub fn abort(&self) { - self.task.abort(); + if let Some(task) = self.task.as_ref() { + task.abort(); + } } /// Clones the underlying shutdown signaler for the client. This enables you to wait on the @@ -517,6 +570,18 @@ impl Client { self.shutdown.shutdown().await } + /// Returns whether the client should fully shutdown once it is dropped. If true, this will + /// result in all channels tied to the client no longer functioning once the client is dropped. + pub fn will_shutdown_on_drop(&mut self) -> bool { + self.shutdown_on_drop + } + + /// Sets whether the client should fully shutdown once it is dropped. If true, this will result + /// in all channels tied to the client no longer functioning once the client is dropped. + pub fn shutdown_on_drop(&mut self, shutdown_on_drop: bool) { + self.shutdown_on_drop = shutdown_on_drop; + } + /// Clones the underlying [`ConnectionStateWatcher`] for the client. pub fn clone_connection_watcher(&self) -> ConnectionWatcher { self.watcher.clone() @@ -533,7 +598,7 @@ impl Client { /// Returns true if client's underlying event processing has finished/terminated. pub fn is_finished(&self) -> bool { - self.task.is_finished() + self.task.is_none() || self.task.as_ref().unwrap().is_finished() } } @@ -553,7 +618,7 @@ impl DerefMut for Client { impl From> for Channel { fn from(client: Client) -> Self { - client.channel + client.clone_channel() } } @@ -1238,6 +1303,7 @@ mod tests { max_retries: Some(3), timeout: None, }, + ..Default::default() }, ); diff --git a/distant-net/src/client/config.rs b/distant-net/src/client/config.rs index ad6c13b..2e48cd3 100644 --- a/distant-net/src/client/config.rs +++ b/distant-net/src/client/config.rs @@ -10,6 +10,10 @@ pub struct ClientConfig { /// Strategy to use when reconnecting to a server. pub reconnect_strategy: ReconnectStrategy, + /// If true, the client will shutdown its internal task once dropped, resulting in all channels + /// no longer receiving data. + pub shutdown_on_drop: bool, + /// A maximum duration to not receive any response/heartbeat from a server before deeming the /// server as lost and triggering a reconnect. pub silence_duration: Duration, @@ -19,6 +23,7 @@ impl ClientConfig { pub fn with_maximum_silence_duration(self) -> Self { Self { reconnect_strategy: self.reconnect_strategy, + shutdown_on_drop: self.shutdown_on_drop, silence_duration: MAXIMUM_SILENCE_DURATION, } } @@ -28,6 +33,7 @@ impl Default for ClientConfig { fn default() -> Self { Self { reconnect_strategy: ReconnectStrategy::Fail, + shutdown_on_drop: false, silence_duration: DEFAULT_SILENCE_DURATION, } } diff --git a/distant-ssh2/src/lib.rs b/distant-ssh2/src/lib.rs index 9967003..2ffd5c8 100644 --- a/distant-ssh2/src/lib.rs +++ b/distant-ssh2/src/lib.rs @@ -5,14 +5,13 @@ use async_compat::CompatExt; use async_once_cell::OnceCell; use async_trait::async_trait; use distant_core::{ - data::Environment, net::{ client::{Client, ClientConfig}, common::authentication::{AuthHandlerMap, DummyAuthHandler, Verifier}, - common::{InmemoryTransport, OneshotListener}, + common::{Host, InmemoryTransport, OneshotListener}, server::{Server, ServerRef}, }, - DistantApiServerHandler, DistantChannelExt, DistantClient, DistantSingleKeyCredentials, + DistantApiServerHandler, DistantClient, DistantSingleKeyCredentials, }; use log::*; use smol::channel::Receiver as SmolReceiver; @@ -25,7 +24,10 @@ use std::{ str::FromStr, time::Duration, }; -use wezterm_ssh::{Config as WezConfig, Session as WezSession, SessionEvent as WezSessionEvent}; +use wezterm_ssh::{ + ChildKiller, Config as WezConfig, MasterPty, PtySize, Session as WezSession, + SessionEvent as WezSessionEvent, +}; mod api; mod process; @@ -207,10 +209,6 @@ pub struct DistantLaunchOpts { /// Arguments to supply to the distant server when starting it pub args: String, - /// If true, launches via `echo distant listen ... | $SHELL -l`, otherwise attempts to launch - /// by directly invoking distant - pub use_login_shell: bool, - /// Timeout to use when connecting to the distant server pub timeout: Duration, } @@ -220,7 +218,6 @@ impl Default for DistantLaunchOpts { Self { binary: String::from("distant"), args: String::new(), - use_login_shell: false, timeout: Duration::from_secs(15), } } @@ -448,11 +445,13 @@ impl Ssh { while let Ok(event) = self.events.recv().await { match event { WezSessionEvent::Banner(banner) => { + trace!("ssh banner: {banner:?}"); if let Some(banner) = banner { handler.on_banner(banner.as_ref()).await; } } WezSessionEvent::HostVerify(verify) => { + trace!("ssh host verify: {verify:?}"); let verified = handler.on_verify_host(verify.message.as_str()).await?; verify .answer(verified) @@ -461,6 +460,7 @@ impl Ssh { .map_err(|x| io::Error::new(io::ErrorKind::Other, x))?; } WezSessionEvent::Authenticate(mut auth) => { + trace!("ssh authenticate: {auth:?}"); let ev = SshAuthEvent { username: auth.username.clone(), instructions: auth.instructions.clone(), @@ -481,10 +481,14 @@ impl Ssh { .map_err(|x| io::Error::new(io::ErrorKind::Other, x))?; } WezSessionEvent::Error(err) => { + trace!("ssh error: {err:?}"); handler.on_error(&err).await; return Err(io::Error::new(io::ErrorKind::PermissionDenied, err)); } - WezSessionEvent::Authenticated => break, + WezSessionEvent::Authenticated => { + trace!("ssh authenticated"); + break; + } } } @@ -603,10 +607,17 @@ impl Ssh { let family = self.detect_family().await?; trace!("Detected family: {}", family.as_static_str()); - let host = self.host().to_string(); + let host = self + .host() + .parse::() + .map_err(|x| io::Error::new(io::ErrorKind::InvalidInput, x))?; - // Turn our ssh connection into a client/server pair so we can use it to spawn our server - let (mut client, server) = self.into_distant_pair().await?; + let (mut pty, mut child) = self + .session + .request_pty("xterm-256color", PtySize::default(), None, None) + .compat() + .await + .map_err(utils::to_other_error)?; // Build arguments for distant to execute listen subcommand let mut args = vec![ @@ -622,70 +633,86 @@ impl Ssh { .map_err(|x| io::Error::new(io::ErrorKind::InvalidInput, x))?, }); - // If we are using a login shell, we need to make the binary be sh so we can appropriately - // pipe into the login shell. This is only available on unix. - let cmd = match family { - SshFamily::Unix if opts.use_login_shell => format!( - "sh -c {}", - shell_words::quote(&format!( - "echo {} {} | $SHELL -l", - opts.binary, - args.join(" ") - )) - ), - _ => format!("{} {}", opts.binary, args.join(" ")), - }; + // Write our command to stdin of pty to execute it + let cmd = format!("{} {}", opts.binary, args.join(" ")); + debug!("Executing {cmd}"); + pty.write_all(format!("{cmd}\r\n").as_bytes())?; + + // Get credentials from execution + let credentials = { + // Spawn a blocking thread to continually read stdout from the pty + let mut reader = pty.try_clone_reader().map_err(utils::to_other_error)?; + let (tx, mut rx) = tokio::sync::mpsc::channel::>(1); + let read_task = tokio::task::spawn_blocking(move || { + let mut buf = [0u8; 1024]; + while let Ok(n) = reader.read(&mut buf) { + if n == 0 { + break; + } + let _ = tx.blocking_send(buf[..n].to_vec()); + } + }); + + // Spawn an async task to read the forwarded stdout and attempt to detect credentials + // from the received stdout thus far. This will fail after waiting at least as long as + // the configured timeout duration. + // + // NOTE: We don't use `tokio::time::timeout` so we can capture and report back the + // stdout in the case of an error. Since there is no way easy way to know if the + // executed command on the pty failed, we rely on a timeout. + let start_instant = std::time::Instant::now(); + let timeout = opts.timeout; + tokio::spawn(async move { + let mut stdout = Vec::new(); + loop { + // Continually process received stdout + while let Ok(bytes) = rx.try_recv() { + trace!("Received {} more bytes over stdout", bytes.len()); + stdout.extend_from_slice(&bytes); + + if let Some(mut credentials) = + DistantSingleKeyCredentials::find_lax(&String::from_utf8_lossy(&stdout)) + { + credentials.host = host; + read_task.abort(); + return Ok(credentials); + } + } - // Spawn distant server and detach it so that we don't kill it when the - // ssh client is closed - debug!("Executing {}", cmd); - let output = client.output(cmd, Environment::new(), None, None).await?; - debug!( - "Completed with success = {}, code = {:?}", - output.success, output.code - ); + // We have waited at least as long as our timeout, so we fail + if start_instant.elapsed() >= timeout { + // Clean the bytes before including by removing anything that isn't ascii + // and isn't a control character (except whitespace) + stdout.retain(|b| { + b.is_ascii() && (b.is_ascii_whitespace() || !b.is_ascii_control()) + }); + + read_task.abort(); + return Err(io::Error::new( + io::ErrorKind::BrokenPipe, + format!( + "Failed to spawn server: '{}'", + shell_words::quote(&String::from_utf8_lossy(&stdout)) + ), + )); + } - // Close out ssh client by killing the internal server and client - server.shutdown(); - client.abort(); - let _ = client.wait().await; - - // If successful, grab the client information and establish a connection - // with the distant server - if output.success { - // Iterate over output as individual lines, looking for client info - trace!("Searching for credentials"); - match DistantSingleKeyCredentials::find(&String::from_utf8_lossy(&output.stdout)) { - Some(mut info) => { - info.host = host - .parse() - .map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x))?; - Ok(info) + // Otherwise, wait some period of time before trying again + tokio::time::sleep(Duration::from_millis(50)).await; } - None => Err(io::Error::new( - io::ErrorKind::InvalidData, - format!( - "Missing launch information: '{}'", - String::from_utf8_lossy(&output.stdout) - ), - )), - } - } else { - Err(io::Error::new( - io::ErrorKind::Other, - format!( - "Spawning distant failed [{}]: {}", - output - .code - .map(|x| x.to_string()) - .unwrap_or_else(|| String::from("???")), - match String::from_utf8(output.stderr) { - Ok(output) => output, - Err(x) => x.to_string(), - } - ), - )) - } + }) + }; + + // Wait a maximum amount of time before failing + trace!("Waiting for credentials to appear"); + let credentials = credentials.await??; + debug!("Got credentials"); + + // Attempt to kill the pty, but don't block if it fails + drop(pty); + let _ = child.kill(); + + Ok(credentials) } /// Consume [`Ssh`] and produce a [`DistantClient`] that is powered by an ssh client diff --git a/distant-ssh2/tests/sshd/mod.rs b/distant-ssh2/tests/sshd/mod.rs index a9738a2..09a49a0 100644 --- a/distant-ssh2/tests/sshd/mod.rs +++ b/distant-ssh2/tests/sshd/mod.rs @@ -6,6 +6,7 @@ use derive_more::Display; use derive_more::{Deref, DerefMut}; use distant_core::DistantClient; use distant_ssh2::{DistantLaunchOpts, Ssh, SshAuthEvent, SshAuthHandler, SshOpts}; +use log::*; use once_cell::sync::Lazy; use rstest::*; use std::{ @@ -19,7 +20,7 @@ use std::{ Mutex, }, thread, - time::Duration, + time::{Duration, Instant}, }; #[cfg(unix)] @@ -27,11 +28,11 @@ use std::os::unix::fs::PermissionsExt; #[derive(Deref, DerefMut)] pub struct Ctx { - pub sshd: Sshd, - #[deref] #[deref_mut] pub value: T, + + pub sshd: Sshd, } // NOTE: Should find path @@ -52,6 +53,8 @@ const WAIT_AFTER_SPAWN: Duration = Duration::from_millis(300); /// Maximum times to retry spawning sshd when it fails const SPAWN_RETRY_CNT: usize = 3; +const MAX_DROP_WAIT_TIME: Duration = Duration::from_millis(500); + pub struct SshKeygen; impl SshKeygen { @@ -421,7 +424,7 @@ impl Sshd { // Otherwise, try next port Err(_) | Ok(Err(_)) => { - eprintln!("sshd could not spawn on port {port}, so trying next port"); + error!("sshd could not spawn on port {port}, so trying next port"); continue; } } @@ -477,7 +480,7 @@ impl Sshd { true } Ok(Err((code, msg))) => { - eprintln!( + error!( "sshd died w/ exit code {}: {msg}", if let Some(code) = code { code.to_string() @@ -488,41 +491,47 @@ impl Sshd { false } Err(x) => { - eprintln!("Failed to check status of sshd: {x}"); + error!("Failed to check status of sshd: {x}"); false } } } else { - eprintln!("sshd is dead!"); + error!("sshd is dead!"); false } } fn print_log_file(&self) { if let Ok(log) = std::fs::read_to_string(&self.log_file) { - eprintln!(); - eprintln!("===================="); - eprintln!("= SSHD LOG FILE "); - eprintln!("===================="); - eprintln!(); - eprintln!("{log}"); - eprintln!(); - eprintln!("===================="); - eprintln!(); + let mut out = String::new(); + out.push('\n'); + out.push_str("====================\n"); + out.push_str("= SSHD LOG FILE \n"); + out.push_str("====================\n"); + out.push('\n'); + out.push_str(&log); + out.push('\n'); + out.push('\n'); + out.push_str("====================\n"); + out.push('\n'); + error!("{out}"); } } fn print_config_file(&self) { if let Ok(contents) = std::fs::read_to_string(&self.config_file) { - eprintln!(); - eprintln!("===================="); - eprintln!("= SSHD CONFIG FILE "); - eprintln!("===================="); - eprintln!(); - eprintln!("{contents}"); - eprintln!(); - eprintln!("===================="); - eprintln!(); + let mut out = String::new(); + out.push('\n'); + out.push_str("====================\n"); + out.push_str("= SSHD CONFIG FILE \n"); + out.push_str("====================\n"); + out.push('\n'); + out.push_str(&contents); + out.push('\n'); + out.push('\n'); + out.push_str("====================\n"); + out.push('\n'); + error!("{out}"); } } } @@ -530,9 +539,27 @@ impl Sshd { impl Drop for Sshd { /// Kills server upon drop fn drop(&mut self) { + debug!("Dropping sshd"); if let Some(mut child) = self.child.lock().unwrap().take() { let _ = child.kill(); - let _ = child.wait(); + + // Wait for a maximum period of time + let start = Instant::now(); + while start.elapsed() < MAX_DROP_WAIT_TIME { + match child.try_wait() { + Ok(Some(_)) => { + debug!("Sshd finished"); + return; + } + Err(x) => { + error!("Failed to wait for sshd to quit: {x}"); + return; + } + _ => thread::sleep(MAX_DROP_WAIT_TIME / 10), + } + } + + error!("Timed out waiting for sshd to quit"); } } } @@ -543,21 +570,21 @@ pub struct MockSshAuthHandler; #[async_trait] impl SshAuthHandler for MockSshAuthHandler { async fn on_authenticate(&self, event: SshAuthEvent) -> io::Result> { - eprintln!("on_authenticate: {:?}", event); + debug!("on_authenticate: {:?}", event); Ok(vec![String::new(); event.prompts.len()]) } async fn on_verify_host(&self, host: &str) -> io::Result { - eprintln!("on_host_verify: {}", host); + debug!("on_host_verify: {}", host); Ok(true) } async fn on_banner(&self, text: &str) { - eprintln!("on_banner: {:?}", text); + debug!("on_banner: {:?}", text); } async fn on_error(&self, text: &str) { - eprintln!("on_error: {:?}", text); + debug!("on_error: {:?}", text); } } @@ -603,11 +630,12 @@ pub fn sshd() -> Sshd { #[fixture] pub async fn client(sshd: Sshd) -> Ctx { let ssh_client = load_ssh_client(&sshd).await; - let client = ssh_client + let mut client = ssh_client .into_distant_client() .await .context("Failed to convert into distant client") .unwrap(); + client.shutdown_on_drop(true); Ctx { sshd, value: client, @@ -618,27 +646,28 @@ pub async fn client(sshd: Sshd) -> Ctx { #[fixture] pub async fn launched_client(sshd: Sshd) -> Ctx { let binary = std::env::var("DISTANT_PATH").unwrap_or_else(|_| String::from("distant")); - eprintln!("Setting path to distant binary as {binary}"); + debug!("Setting path to distant binary as {binary}"); // Attempt to launch the server and connect to it, using $DISTANT_PATH as the path to the // binary if provided, defaulting to assuming the binary is on our ssh path otherwise + // + // NOTE: Wrapping in ctx does not fully clean up the test as the launched distant server + // is not cleaned up during drop. We don't know what the server's pid is, so our + // only option would be to look up all running distant servers and kill them on drop, + // but that would cause other tests to fail. + // + // Setting an expiration of 1s would clean up running servers and possibly be good enough let ssh_client = load_ssh_client(&sshd).await; - let client = ssh_client + let mut client = ssh_client .launch_and_connect(DistantLaunchOpts { binary, - args: "--shutdown after=10".to_string(), + args: "--shutdown lonely=10".to_string(), ..Default::default() }) .await .context("Failed to launch and connect to distant server") .unwrap(); - - // TODO: Wrapping in ctx does not fully clean up the test as the launched distant server - // is not cleaned up during drop. We don't know what the server's pid is, so our - // only option would be to look up all running distant servers and kill them on drop, - // but that would cause other tests to fail. - // - // Setting an expiration of 1s would clean up running servers and possibly be good enough + client.shutdown_on_drop(true); Ctx { sshd, value: client, @@ -703,7 +732,7 @@ async fn load_ssh_client(sshd: &Sshd) -> Ssh { // Check if still alive, which will print out messages if sshd.check_is_alive() { - eprintln!("sshd is still alive, so something else is going on"); + warn!("sshd is still alive, so something else is going on"); } // We want to print out the log file from sshd in case it sheds clues on problem diff --git a/src/cli/commands/manager/handlers.rs b/src/cli/commands/manager/handlers.rs index 42cee96..fd1a972 100644 --- a/src/cli/commands/manager/handlers.rs +++ b/src/cli/commands/manager/handlers.rs @@ -178,7 +178,6 @@ impl LaunchHandler for SshLaunchHandler { DistantLaunchOpts { binary: config.distant.bin.unwrap_or(opts.binary), args: config.distant.args.unwrap_or(opts.args), - use_login_shell: !config.distant.no_shell, timeout: match options.get("timeout") { Some(s) => std::time::Duration::from_millis( s.parse::().map_err(|_| invalid("timeout"))?, diff --git a/src/config/client/launch.rs b/src/config/client/launch.rs index 0e2287d..132a5fc 100644 --- a/src/config/client/launch.rs +++ b/src/config/client/launch.rs @@ -28,10 +28,6 @@ impl From for ClientLaunchConfig { .remove("distant.bind_server") .and_then(|x| x.parse::().ok()), args: map.remove("distant.args"), - no_shell: map - .remove("distant.no_shell") - .and_then(|x| x.parse::().ok()) - .unwrap_or_default(), }, options: map, } @@ -54,11 +50,6 @@ impl From for Map { this.insert("distant.args".to_string(), x); } - this.insert( - "distant.no_shell".to_string(), - config.distant.no_shell.to_string(), - ); - this.extend(config.options); this @@ -90,8 +81,4 @@ pub struct ClientLaunchDistantConfig { /// Additional arguments to provide to the server #[clap(name = "distant-args", long, allow_hyphen_values(true))] pub args: Option, - - /// If specified, will not launch distant using a login shell but instead execute it directly - #[clap(long)] - pub no_shell: bool, } diff --git a/tests/cli/fixtures.rs b/tests/cli/fixtures.rs index 779a25c..aa833fc 100644 --- a/tests/cli/fixtures.rs +++ b/tests/cli/fixtures.rs @@ -130,7 +130,9 @@ impl DistantManagerCtx { let mut buf = [0u8; 1024]; while let Ok(n) = reader.read(&mut buf) { lines.push_str(&String::from_utf8_lossy(&buf[..n])); - if let Some(credentials) = DistantSingleKeyCredentials::find(&lines) { + if let Some(credentials) = + DistantSingleKeyCredentials::find(&lines, /* strict */ false) + { return credentials; } }