Update ssh launch to use pty (#157)

pull/172/head
Chip Senkbeil 1 year ago committed by GitHub
parent 9b2f0de0c5
commit 27dc5775f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -79,6 +79,7 @@ jobs:
runs-on: ${{ matrix.os }}
env:
RUSTFLAGS: --cfg ci
RUST_LOG: trace
strategy:
fail-fast: false
matrix:
@ -179,6 +180,9 @@ jobs:
ssh-launch-tests:
name: "Test ssh launch using Rust ${{ matrix.rust }} on ${{ matrix.os }}"
runs-on: ${{ matrix.os }}
env:
RUSTFLAGS: --cfg ci
RUST_LOG: trace
strategy:
fail-fast: false
matrix:

@ -30,6 +30,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Persist option now removed from `ProcSpawn` message and CLI
- Bump minimum Rust version to 1.64.0
### Removed
- `--no-shell` option is removed as we automatically detect and use the PTY of
the remote system using a default shell
## [0.20.0-alpha.2] - 2022-11-20
### Added

@ -95,8 +95,11 @@ impl<'de> Deserialize<'de> for DistantSingleKeyCredentials {
impl DistantSingleKeyCredentials {
/// Searches a str for `distant://[username]:{key}@{host}:{port}`, returning the first matching
/// credentials set if found
pub fn find(s: &str) -> Option<DistantSingleKeyCredentials> {
/// credentials set if found, failing if anything is found immediately before or after the
/// credentials that is not whitespace or control characters
///
/// If `strict` is false, then the scheme can be preceded by any character
pub fn find(s: &str, strict: bool) -> Option<DistantSingleKeyCredentials> {
let is_boundary = |c| char::is_whitespace(c) || char::is_control(c);
for (i, _) in s.match_indices(SCHEME_WITH_SEP) {
@ -105,11 +108,11 @@ impl DistantSingleKeyCredentials {
// Check character preceding the scheme to make sure it isn't a different scheme
// Only whitespace or control characters preceding are okay, anything else is skipped
if !before.is_empty() && !before.ends_with(is_boundary) {
if strict && !before.is_empty() && !before.ends_with(is_boundary) {
continue;
}
// Consume until we reach whitespace, which indicates the potential end
// Consume until we reach whitespace or control, which indicates the potential end
let s = match s.find(is_boundary) {
Some(i) => &s[..i],
None => s,
@ -124,6 +127,22 @@ impl DistantSingleKeyCredentials {
None
}
/// Equivalent to [`find(s, true)`].
///
/// [`find(s, true)`]: DistantSingleKeyCredentials::find
#[inline]
pub fn find_strict(s: &str) -> Option<DistantSingleKeyCredentials> {
Self::find(s, true)
}
/// Equivalent to [`find(s, false)`].
///
/// [`find(s, false)`]: DistantSingleKeyCredentials::find
#[inline]
pub fn find_lax(s: &str) -> Option<DistantSingleKeyCredentials> {
Self::find(s, false)
}
/// Converts credentials into a [`Destination`] of the form
/// `distant://[username]:{key}@{host}:{port}`, failing if the credentials would not produce a
/// valid [`Destination`]
@ -175,29 +194,29 @@ mod tests {
#[test]
fn find_should_return_some_key_if_string_is_exact_match() {
let credentials = DistantSingleKeyCredentials::find(CREDENTIALS_STR_NO_USER.as_str());
let credentials = DistantSingleKeyCredentials::find(CREDENTIALS_STR_NO_USER.as_str(), true);
assert_eq!(credentials.unwrap(), *CREDENTIALS_NO_USER);
let credentials = DistantSingleKeyCredentials::find(CREDENTIALS_STR_USER.as_str());
let credentials = DistantSingleKeyCredentials::find(CREDENTIALS_STR_USER.as_str(), true);
assert_eq!(credentials.unwrap(), *CREDENTIALS_USER);
}
#[test]
fn find_should_return_some_key_if_there_is_a_match_with_only_whitespace_on_either_side() {
let s = format!(" {} ", CREDENTIALS_STR_NO_USER.as_str());
let credentials = DistantSingleKeyCredentials::find(&s);
let credentials = DistantSingleKeyCredentials::find(&s, true);
assert_eq!(credentials.unwrap(), *CREDENTIALS_NO_USER);
let s = format!("\r{}\r", CREDENTIALS_STR_NO_USER.as_str());
let credentials = DistantSingleKeyCredentials::find(&s);
let credentials = DistantSingleKeyCredentials::find(&s, true);
assert_eq!(credentials.unwrap(), *CREDENTIALS_NO_USER);
let s = format!("\t{}\t", CREDENTIALS_STR_NO_USER.as_str());
let credentials = DistantSingleKeyCredentials::find(&s);
let credentials = DistantSingleKeyCredentials::find(&s, true);
assert_eq!(credentials.unwrap(), *CREDENTIALS_NO_USER);
let s = format!("\n{}\n", CREDENTIALS_STR_NO_USER.as_str());
let credentials = DistantSingleKeyCredentials::find(&s);
let credentials = DistantSingleKeyCredentials::find(&s, true);
assert_eq!(credentials.unwrap(), *CREDENTIALS_NO_USER);
}
@ -205,7 +224,7 @@ mod tests {
fn find_should_return_some_key_if_there_is_a_match_with_only_control_characters_on_either_side()
{
let s = format!("\x1b{} \x1b", CREDENTIALS_STR_NO_USER.as_str());
let credentials = DistantSingleKeyCredentials::find(&s);
let credentials = DistantSingleKeyCredentials::find(&s, true);
assert_eq!(credentials.unwrap(), *CREDENTIALS_NO_USER);
}
@ -216,7 +235,7 @@ mod tests {
CREDENTIALS_STR_NO_USER.as_str(),
CREDENTIALS_STR_USER.as_str()
);
let credentials = DistantSingleKeyCredentials::find(&s);
let credentials = DistantSingleKeyCredentials::find(&s, true);
assert_eq!(credentials.unwrap(), *CREDENTIALS_NO_USER);
}
@ -228,14 +247,29 @@ mod tests {
CREDENTIALS_STR_NO_USER.as_str(),
CREDENTIALS_STR_NO_USER.as_str()
);
let credentials = DistantSingleKeyCredentials::find(&s);
let credentials = DistantSingleKeyCredentials::find(&s, true);
assert_eq!(credentials.unwrap(), *CREDENTIALS_NO_USER);
}
#[test]
fn find_should_return_none_if_no_match_found() {
fn find_with_strict_false_should_ignore_any_character_preceding_scheme() {
let s = format!("a{}", CREDENTIALS_STR_NO_USER.as_str());
let credentials = DistantSingleKeyCredentials::find(&s, false);
assert_eq!(credentials.unwrap(), *CREDENTIALS_NO_USER);
let s = format!(
"a{} b{}",
CREDENTIALS_STR_NO_USER.as_str(),
CREDENTIALS_STR_NO_USER.as_str()
);
let credentials = DistantSingleKeyCredentials::find(&s, false);
assert_eq!(credentials.unwrap(), *CREDENTIALS_NO_USER);
}
#[test]
fn find_with_strict_true_should_not_find_if_non_whitespace_and_control_preceding_scheme() {
let s = format!("a{}", CREDENTIALS_STR_NO_USER.as_str());
let credentials = DistantSingleKeyCredentials::find(&s);
let credentials = DistantSingleKeyCredentials::find(&s, true);
assert_eq!(credentials, None);
let s = format!(
@ -243,7 +277,18 @@ mod tests {
CREDENTIALS_STR_NO_USER.as_str(),
CREDENTIALS_STR_NO_USER.as_str()
);
let credentials = DistantSingleKeyCredentials::find(&s);
let credentials = DistantSingleKeyCredentials::find(&s, true);
assert_eq!(credentials, None);
}
#[test]
fn find_should_return_none_if_no_match_found() {
let s = "abc";
let credentials = DistantSingleKeyCredentials::find(s, true);
assert_eq!(credentials, None);
let s = "abc";
let credentials = DistantSingleKeyCredentials::find(s, false);
assert_eq!(credentials, None);
}

@ -48,8 +48,11 @@ pub struct UntypedClient {
/// Used to send shutdown request to inner task.
shutdown: Box<dyn Shutdown>,
/// Indicates whether the client task will be shutdown when the client is dropped.
shutdown_on_drop: bool,
/// Contains the task that is running to send requests and receive responses from a server.
task: JoinHandle<io::Result<()>>,
task: Option<JoinHandle<io::Result<()>>>,
}
impl fmt::Debug for UntypedClient {
@ -58,24 +61,38 @@ impl fmt::Debug for UntypedClient {
.field("channel", &self.channel)
.field("shutdown", &"...")
.field("task", &self.task)
.field("shutdown_on_drop", &self.shutdown_on_drop)
.finish()
}
}
impl Drop for UntypedClient {
fn drop(&mut self) {
if self.shutdown_on_drop {
// TODO: Shutdown is an async operation, can we use it here?
if let Some(task) = self.task.take() {
debug!("Shutdown on drop = true, so aborting client task");
task.abort();
}
}
}
}
impl UntypedClient {
/// Consumes the client, returning a typed variant.
pub fn into_typed_client<T, U>(self) -> Client<T, U> {
pub fn into_typed_client<T, U>(mut self) -> Client<T, U> {
Client {
channel: self.channel.into_typed_channel(),
watcher: self.watcher,
shutdown: self.shutdown,
task: self.task,
channel: self.clone_channel().into_typed_channel(),
watcher: self.watcher.clone(),
shutdown: self.shutdown.clone(),
shutdown_on_drop: self.shutdown_on_drop,
task: self.task.take(),
}
}
/// Convert into underlying channel.
pub fn into_channel(self) -> UntypedChannel {
self.channel
self.clone_channel()
}
/// Clones the underlying channel for requests and returns the cloned instance.
@ -86,8 +103,8 @@ impl UntypedClient {
/// Waits for the client to terminate, which resolves when the receiving end of the network
/// connection is closed (or the client is shutdown). Returns whether or not the client exited
/// successfully or due to an error.
pub async fn wait(self) -> io::Result<()> {
match self.task.await {
pub async fn wait(mut self) -> io::Result<()> {
match self.task.take().unwrap().await {
Ok(x) => x,
Err(x) => Err(io::Error::new(io::ErrorKind::Other, x)),
}
@ -95,7 +112,9 @@ impl UntypedClient {
/// Abort the client's current connection by forcing its tasks to abort.
pub fn abort(&self) {
self.task.abort();
if let Some(task) = self.task.as_ref() {
task.abort();
}
}
/// Clones the underlying shutdown signaler for the client. This enables you to wait on the
@ -109,6 +128,18 @@ impl UntypedClient {
self.shutdown.shutdown().await
}
/// Returns whether the client should fully shutdown once it is dropped. If true, this will
/// result in all channels tied to the client no longer functioning once the client is dropped.
pub fn will_shutdown_on_drop(&mut self) -> bool {
self.shutdown_on_drop
}
/// Sets whether the client should fully shutdown once it is dropped. If true, this will result
/// in all channels tied to the client no longer functioning once the client is dropped.
pub fn shutdown_on_drop(&mut self, shutdown_on_drop: bool) {
self.shutdown_on_drop = shutdown_on_drop;
}
/// Clones the underlying [`ConnectionStateWatcher`] for the client.
pub fn clone_connection_watcher(&self) -> ConnectionWatcher {
self.watcher.clone()
@ -125,7 +156,7 @@ impl UntypedClient {
/// Returns true if client's underlying event processing has finished/terminated.
pub fn is_finished(&self) -> bool {
self.task.is_finished()
self.task.is_none() || self.task.as_ref().unwrap().is_finished()
}
/// Spawns a client using the provided [`FramedTransport`] of [`InmemoryTransport`] and a
@ -161,6 +192,12 @@ impl UntypedClient {
// Ensure that our transport starts off clean (nothing in buffers or backup)
connection.clear();
let ClientConfig {
mut reconnect_strategy,
shutdown_on_drop,
silence_duration,
} = config;
// Start a task that continually checks for responses and delivers them using the
// post office
let shutdown_tx_2 = shutdown_tx.clone();
@ -175,10 +212,6 @@ impl UntypedClient {
// would cause recv() to resolve immediately and result in the task shutting
// down.
let _shutdown_tx = shutdown_tx_2;
let ClientConfig {
mut reconnect_strategy,
silence_duration,
} = config;
loop {
// If we have flagged that a reconnect is needed, attempt to do so
@ -361,7 +394,8 @@ impl UntypedClient {
channel,
watcher: ConnectionWatcher(watcher_rx),
shutdown: Box::new(shutdown_tx),
task,
shutdown_on_drop,
task: Some(task),
}
}
}
@ -382,7 +416,7 @@ impl DerefMut for UntypedClient {
impl From<UntypedClient> for UntypedChannel {
fn from(client: UntypedClient) -> Self {
client.channel
client.into_channel()
}
}
@ -397,8 +431,11 @@ pub struct Client<T, U> {
/// Used to send shutdown request to inner task.
shutdown: Box<dyn Shutdown>,
/// Indicates whether the client task will be shutdown when the client is dropped.
shutdown_on_drop: bool,
/// Contains the task that is running to send requests and receive responses from a server.
task: JoinHandle<io::Result<()>>,
task: Option<JoinHandle<io::Result<()>>>,
}
impl<T, U> fmt::Debug for Client<T, U> {
@ -407,22 +444,36 @@ impl<T, U> fmt::Debug for Client<T, U> {
.field("channel", &self.channel)
.field("shutdown", &"...")
.field("task", &self.task)
.field("shutdown_on_drop", &self.shutdown_on_drop)
.finish()
}
}
impl<T, U> Drop for Client<T, U> {
fn drop(&mut self) {
if self.shutdown_on_drop {
// TODO: Shutdown is an async operation, can we use it here?
if let Some(task) = self.task.take() {
debug!("Shutdown on drop = true, so aborting client task");
task.abort();
}
}
}
}
impl<T, U> Client<T, U>
where
T: Send + Sync + Serialize + 'static,
U: Send + Sync + DeserializeOwned + 'static,
{
/// Consumes the client, returning an untyped variant.
pub fn into_untyped_client(self) -> UntypedClient {
pub fn into_untyped_client(mut self) -> UntypedClient {
UntypedClient {
channel: self.channel.into_untyped_channel(),
watcher: self.watcher,
shutdown: self.shutdown,
task: self.task,
channel: self.clone_channel().into_untyped_channel(),
watcher: self.watcher.clone(),
shutdown: self.shutdown.clone(),
shutdown_on_drop: self.shutdown_on_drop,
task: self.task.take(),
}
}
@ -483,7 +534,7 @@ impl Client<(), ()> {
impl<T, U> Client<T, U> {
/// Convert into underlying channel.
pub fn into_channel(self) -> Channel<T, U> {
self.channel
self.clone_channel()
}
/// Clones the underlying channel for requests and returns the cloned instance.
@ -494,8 +545,8 @@ impl<T, U> Client<T, U> {
/// Waits for the client to terminate, which resolves when the receiving end of the network
/// connection is closed (or the client is shutdown). Returns whether or not the client exited
/// successfully or due to an error.
pub async fn wait(self) -> io::Result<()> {
match self.task.await {
pub async fn wait(mut self) -> io::Result<()> {
match self.task.take().unwrap().await {
Ok(x) => x,
Err(x) => Err(io::Error::new(io::ErrorKind::Other, x)),
}
@ -503,7 +554,9 @@ impl<T, U> Client<T, U> {
/// Abort the client's current connection by forcing its tasks to abort.
pub fn abort(&self) {
self.task.abort();
if let Some(task) = self.task.as_ref() {
task.abort();
}
}
/// Clones the underlying shutdown signaler for the client. This enables you to wait on the
@ -517,6 +570,18 @@ impl<T, U> Client<T, U> {
self.shutdown.shutdown().await
}
/// Returns whether the client should fully shutdown once it is dropped. If true, this will
/// result in all channels tied to the client no longer functioning once the client is dropped.
pub fn will_shutdown_on_drop(&mut self) -> bool {
self.shutdown_on_drop
}
/// Sets whether the client should fully shutdown once it is dropped. If true, this will result
/// in all channels tied to the client no longer functioning once the client is dropped.
pub fn shutdown_on_drop(&mut self, shutdown_on_drop: bool) {
self.shutdown_on_drop = shutdown_on_drop;
}
/// Clones the underlying [`ConnectionStateWatcher`] for the client.
pub fn clone_connection_watcher(&self) -> ConnectionWatcher {
self.watcher.clone()
@ -533,7 +598,7 @@ impl<T, U> Client<T, U> {
/// Returns true if client's underlying event processing has finished/terminated.
pub fn is_finished(&self) -> bool {
self.task.is_finished()
self.task.is_none() || self.task.as_ref().unwrap().is_finished()
}
}
@ -553,7 +618,7 @@ impl<T, U> DerefMut for Client<T, U> {
impl<T, U> From<Client<T, U>> for Channel<T, U> {
fn from(client: Client<T, U>) -> Self {
client.channel
client.clone_channel()
}
}
@ -1238,6 +1303,7 @@ mod tests {
max_retries: Some(3),
timeout: None,
},
..Default::default()
},
);

@ -10,6 +10,10 @@ pub struct ClientConfig {
/// Strategy to use when reconnecting to a server.
pub reconnect_strategy: ReconnectStrategy,
/// If true, the client will shutdown its internal task once dropped, resulting in all channels
/// no longer receiving data.
pub shutdown_on_drop: bool,
/// A maximum duration to not receive any response/heartbeat from a server before deeming the
/// server as lost and triggering a reconnect.
pub silence_duration: Duration,
@ -19,6 +23,7 @@ impl ClientConfig {
pub fn with_maximum_silence_duration(self) -> Self {
Self {
reconnect_strategy: self.reconnect_strategy,
shutdown_on_drop: self.shutdown_on_drop,
silence_duration: MAXIMUM_SILENCE_DURATION,
}
}
@ -28,6 +33,7 @@ impl Default for ClientConfig {
fn default() -> Self {
Self {
reconnect_strategy: ReconnectStrategy::Fail,
shutdown_on_drop: false,
silence_duration: DEFAULT_SILENCE_DURATION,
}
}

@ -5,14 +5,13 @@ use async_compat::CompatExt;
use async_once_cell::OnceCell;
use async_trait::async_trait;
use distant_core::{
data::Environment,
net::{
client::{Client, ClientConfig},
common::authentication::{AuthHandlerMap, DummyAuthHandler, Verifier},
common::{InmemoryTransport, OneshotListener},
common::{Host, InmemoryTransport, OneshotListener},
server::{Server, ServerRef},
},
DistantApiServerHandler, DistantChannelExt, DistantClient, DistantSingleKeyCredentials,
DistantApiServerHandler, DistantClient, DistantSingleKeyCredentials,
};
use log::*;
use smol::channel::Receiver as SmolReceiver;
@ -25,7 +24,10 @@ use std::{
str::FromStr,
time::Duration,
};
use wezterm_ssh::{Config as WezConfig, Session as WezSession, SessionEvent as WezSessionEvent};
use wezterm_ssh::{
ChildKiller, Config as WezConfig, MasterPty, PtySize, Session as WezSession,
SessionEvent as WezSessionEvent,
};
mod api;
mod process;
@ -207,10 +209,6 @@ pub struct DistantLaunchOpts {
/// Arguments to supply to the distant server when starting it
pub args: String,
/// If true, launches via `echo distant listen ... | $SHELL -l`, otherwise attempts to launch
/// by directly invoking distant
pub use_login_shell: bool,
/// Timeout to use when connecting to the distant server
pub timeout: Duration,
}
@ -220,7 +218,6 @@ impl Default for DistantLaunchOpts {
Self {
binary: String::from("distant"),
args: String::new(),
use_login_shell: false,
timeout: Duration::from_secs(15),
}
}
@ -448,11 +445,13 @@ impl Ssh {
while let Ok(event) = self.events.recv().await {
match event {
WezSessionEvent::Banner(banner) => {
trace!("ssh banner: {banner:?}");
if let Some(banner) = banner {
handler.on_banner(banner.as_ref()).await;
}
}
WezSessionEvent::HostVerify(verify) => {
trace!("ssh host verify: {verify:?}");
let verified = handler.on_verify_host(verify.message.as_str()).await?;
verify
.answer(verified)
@ -461,6 +460,7 @@ impl Ssh {
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))?;
}
WezSessionEvent::Authenticate(mut auth) => {
trace!("ssh authenticate: {auth:?}");
let ev = SshAuthEvent {
username: auth.username.clone(),
instructions: auth.instructions.clone(),
@ -481,10 +481,14 @@ impl Ssh {
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))?;
}
WezSessionEvent::Error(err) => {
trace!("ssh error: {err:?}");
handler.on_error(&err).await;
return Err(io::Error::new(io::ErrorKind::PermissionDenied, err));
}
WezSessionEvent::Authenticated => break,
WezSessionEvent::Authenticated => {
trace!("ssh authenticated");
break;
}
}
}
@ -603,10 +607,17 @@ impl Ssh {
let family = self.detect_family().await?;
trace!("Detected family: {}", family.as_static_str());
let host = self.host().to_string();
let host = self
.host()
.parse::<Host>()
.map_err(|x| io::Error::new(io::ErrorKind::InvalidInput, x))?;
// Turn our ssh connection into a client/server pair so we can use it to spawn our server
let (mut client, server) = self.into_distant_pair().await?;
let (mut pty, mut child) = self
.session
.request_pty("xterm-256color", PtySize::default(), None, None)
.compat()
.await
.map_err(utils::to_other_error)?;
// Build arguments for distant to execute listen subcommand
let mut args = vec![
@ -622,70 +633,86 @@ impl Ssh {
.map_err(|x| io::Error::new(io::ErrorKind::InvalidInput, x))?,
});
// If we are using a login shell, we need to make the binary be sh so we can appropriately
// pipe into the login shell. This is only available on unix.
let cmd = match family {
SshFamily::Unix if opts.use_login_shell => format!(
"sh -c {}",
shell_words::quote(&format!(
"echo {} {} | $SHELL -l",
opts.binary,
args.join(" ")
))
),
_ => format!("{} {}", opts.binary, args.join(" ")),
};
// Write our command to stdin of pty to execute it
let cmd = format!("{} {}", opts.binary, args.join(" "));
debug!("Executing {cmd}");
pty.write_all(format!("{cmd}\r\n").as_bytes())?;
// Get credentials from execution
let credentials = {
// Spawn a blocking thread to continually read stdout from the pty
let mut reader = pty.try_clone_reader().map_err(utils::to_other_error)?;
let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<u8>>(1);
let read_task = tokio::task::spawn_blocking(move || {
let mut buf = [0u8; 1024];
while let Ok(n) = reader.read(&mut buf) {
if n == 0 {
break;
}
let _ = tx.blocking_send(buf[..n].to_vec());
}
});
// Spawn an async task to read the forwarded stdout and attempt to detect credentials
// from the received stdout thus far. This will fail after waiting at least as long as
// the configured timeout duration.
//
// NOTE: We don't use `tokio::time::timeout` so we can capture and report back the
// stdout in the case of an error. Since there is no way easy way to know if the
// executed command on the pty failed, we rely on a timeout.
let start_instant = std::time::Instant::now();
let timeout = opts.timeout;
tokio::spawn(async move {
let mut stdout = Vec::new();
loop {
// Continually process received stdout
while let Ok(bytes) = rx.try_recv() {
trace!("Received {} more bytes over stdout", bytes.len());
stdout.extend_from_slice(&bytes);
if let Some(mut credentials) =
DistantSingleKeyCredentials::find_lax(&String::from_utf8_lossy(&stdout))
{
credentials.host = host;
read_task.abort();
return Ok(credentials);
}
}
// Spawn distant server and detach it so that we don't kill it when the
// ssh client is closed
debug!("Executing {}", cmd);
let output = client.output(cmd, Environment::new(), None, None).await?;
debug!(
"Completed with success = {}, code = {:?}",
output.success, output.code
);
// We have waited at least as long as our timeout, so we fail
if start_instant.elapsed() >= timeout {
// Clean the bytes before including by removing anything that isn't ascii
// and isn't a control character (except whitespace)
stdout.retain(|b| {
b.is_ascii() && (b.is_ascii_whitespace() || !b.is_ascii_control())
});
read_task.abort();
return Err(io::Error::new(
io::ErrorKind::BrokenPipe,
format!(
"Failed to spawn server: '{}'",
shell_words::quote(&String::from_utf8_lossy(&stdout))
),
));
}
// Close out ssh client by killing the internal server and client
server.shutdown();
client.abort();
let _ = client.wait().await;
// If successful, grab the client information and establish a connection
// with the distant server
if output.success {
// Iterate over output as individual lines, looking for client info
trace!("Searching for credentials");
match DistantSingleKeyCredentials::find(&String::from_utf8_lossy(&output.stdout)) {
Some(mut info) => {
info.host = host
.parse()
.map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x))?;
Ok(info)
// Otherwise, wait some period of time before trying again
tokio::time::sleep(Duration::from_millis(50)).await;
}
None => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"Missing launch information: '{}'",
String::from_utf8_lossy(&output.stdout)
),
)),
}
} else {
Err(io::Error::new(
io::ErrorKind::Other,
format!(
"Spawning distant failed [{}]: {}",
output
.code
.map(|x| x.to_string())
.unwrap_or_else(|| String::from("???")),
match String::from_utf8(output.stderr) {
Ok(output) => output,
Err(x) => x.to_string(),
}
),
))
}
})
};
// Wait a maximum amount of time before failing
trace!("Waiting for credentials to appear");
let credentials = credentials.await??;
debug!("Got credentials");
// Attempt to kill the pty, but don't block if it fails
drop(pty);
let _ = child.kill();
Ok(credentials)
}
/// Consume [`Ssh`] and produce a [`DistantClient`] that is powered by an ssh client

@ -6,6 +6,7 @@ use derive_more::Display;
use derive_more::{Deref, DerefMut};
use distant_core::DistantClient;
use distant_ssh2::{DistantLaunchOpts, Ssh, SshAuthEvent, SshAuthHandler, SshOpts};
use log::*;
use once_cell::sync::Lazy;
use rstest::*;
use std::{
@ -19,7 +20,7 @@ use std::{
Mutex,
},
thread,
time::Duration,
time::{Duration, Instant},
};
#[cfg(unix)]
@ -27,11 +28,11 @@ use std::os::unix::fs::PermissionsExt;
#[derive(Deref, DerefMut)]
pub struct Ctx<T> {
pub sshd: Sshd,
#[deref]
#[deref_mut]
pub value: T,
pub sshd: Sshd,
}
// NOTE: Should find path
@ -52,6 +53,8 @@ const WAIT_AFTER_SPAWN: Duration = Duration::from_millis(300);
/// Maximum times to retry spawning sshd when it fails
const SPAWN_RETRY_CNT: usize = 3;
const MAX_DROP_WAIT_TIME: Duration = Duration::from_millis(500);
pub struct SshKeygen;
impl SshKeygen {
@ -421,7 +424,7 @@ impl Sshd {
// Otherwise, try next port
Err(_) | Ok(Err(_)) => {
eprintln!("sshd could not spawn on port {port}, so trying next port");
error!("sshd could not spawn on port {port}, so trying next port");
continue;
}
}
@ -477,7 +480,7 @@ impl Sshd {
true
}
Ok(Err((code, msg))) => {
eprintln!(
error!(
"sshd died w/ exit code {}: {msg}",
if let Some(code) = code {
code.to_string()
@ -488,41 +491,47 @@ impl Sshd {
false
}
Err(x) => {
eprintln!("Failed to check status of sshd: {x}");
error!("Failed to check status of sshd: {x}");
false
}
}
} else {
eprintln!("sshd is dead!");
error!("sshd is dead!");
false
}
}
fn print_log_file(&self) {
if let Ok(log) = std::fs::read_to_string(&self.log_file) {
eprintln!();
eprintln!("====================");
eprintln!("= SSHD LOG FILE ");
eprintln!("====================");
eprintln!();
eprintln!("{log}");
eprintln!();
eprintln!("====================");
eprintln!();
let mut out = String::new();
out.push('\n');
out.push_str("====================\n");
out.push_str("= SSHD LOG FILE \n");
out.push_str("====================\n");
out.push('\n');
out.push_str(&log);
out.push('\n');
out.push('\n');
out.push_str("====================\n");
out.push('\n');
error!("{out}");
}
}
fn print_config_file(&self) {
if let Ok(contents) = std::fs::read_to_string(&self.config_file) {
eprintln!();
eprintln!("====================");
eprintln!("= SSHD CONFIG FILE ");
eprintln!("====================");
eprintln!();
eprintln!("{contents}");
eprintln!();
eprintln!("====================");
eprintln!();
let mut out = String::new();
out.push('\n');
out.push_str("====================\n");
out.push_str("= SSHD CONFIG FILE \n");
out.push_str("====================\n");
out.push('\n');
out.push_str(&contents);
out.push('\n');
out.push('\n');
out.push_str("====================\n");
out.push('\n');
error!("{out}");
}
}
}
@ -530,9 +539,27 @@ impl Sshd {
impl Drop for Sshd {
/// Kills server upon drop
fn drop(&mut self) {
debug!("Dropping sshd");
if let Some(mut child) = self.child.lock().unwrap().take() {
let _ = child.kill();
let _ = child.wait();
// Wait for a maximum period of time
let start = Instant::now();
while start.elapsed() < MAX_DROP_WAIT_TIME {
match child.try_wait() {
Ok(Some(_)) => {
debug!("Sshd finished");
return;
}
Err(x) => {
error!("Failed to wait for sshd to quit: {x}");
return;
}
_ => thread::sleep(MAX_DROP_WAIT_TIME / 10),
}
}
error!("Timed out waiting for sshd to quit");
}
}
}
@ -543,21 +570,21 @@ pub struct MockSshAuthHandler;
#[async_trait]
impl SshAuthHandler for MockSshAuthHandler {
async fn on_authenticate(&self, event: SshAuthEvent) -> io::Result<Vec<String>> {
eprintln!("on_authenticate: {:?}", event);
debug!("on_authenticate: {:?}", event);
Ok(vec![String::new(); event.prompts.len()])
}
async fn on_verify_host(&self, host: &str) -> io::Result<bool> {
eprintln!("on_host_verify: {}", host);
debug!("on_host_verify: {}", host);
Ok(true)
}
async fn on_banner(&self, text: &str) {
eprintln!("on_banner: {:?}", text);
debug!("on_banner: {:?}", text);
}
async fn on_error(&self, text: &str) {
eprintln!("on_error: {:?}", text);
debug!("on_error: {:?}", text);
}
}
@ -603,11 +630,12 @@ pub fn sshd() -> Sshd {
#[fixture]
pub async fn client(sshd: Sshd) -> Ctx<DistantClient> {
let ssh_client = load_ssh_client(&sshd).await;
let client = ssh_client
let mut client = ssh_client
.into_distant_client()
.await
.context("Failed to convert into distant client")
.unwrap();
client.shutdown_on_drop(true);
Ctx {
sshd,
value: client,
@ -618,27 +646,28 @@ pub async fn client(sshd: Sshd) -> Ctx<DistantClient> {
#[fixture]
pub async fn launched_client(sshd: Sshd) -> Ctx<DistantClient> {
let binary = std::env::var("DISTANT_PATH").unwrap_or_else(|_| String::from("distant"));
eprintln!("Setting path to distant binary as {binary}");
debug!("Setting path to distant binary as {binary}");
// Attempt to launch the server and connect to it, using $DISTANT_PATH as the path to the
// binary if provided, defaulting to assuming the binary is on our ssh path otherwise
//
// NOTE: Wrapping in ctx does not fully clean up the test as the launched distant server
// is not cleaned up during drop. We don't know what the server's pid is, so our
// only option would be to look up all running distant servers and kill them on drop,
// but that would cause other tests to fail.
//
// Setting an expiration of 1s would clean up running servers and possibly be good enough
let ssh_client = load_ssh_client(&sshd).await;
let client = ssh_client
let mut client = ssh_client
.launch_and_connect(DistantLaunchOpts {
binary,
args: "--shutdown after=10".to_string(),
args: "--shutdown lonely=10".to_string(),
..Default::default()
})
.await
.context("Failed to launch and connect to distant server")
.unwrap();
// TODO: Wrapping in ctx does not fully clean up the test as the launched distant server
// is not cleaned up during drop. We don't know what the server's pid is, so our
// only option would be to look up all running distant servers and kill them on drop,
// but that would cause other tests to fail.
//
// Setting an expiration of 1s would clean up running servers and possibly be good enough
client.shutdown_on_drop(true);
Ctx {
sshd,
value: client,
@ -703,7 +732,7 @@ async fn load_ssh_client(sshd: &Sshd) -> Ssh {
// Check if still alive, which will print out messages
if sshd.check_is_alive() {
eprintln!("sshd is still alive, so something else is going on");
warn!("sshd is still alive, so something else is going on");
}
// We want to print out the log file from sshd in case it sheds clues on problem

@ -178,7 +178,6 @@ impl LaunchHandler for SshLaunchHandler {
DistantLaunchOpts {
binary: config.distant.bin.unwrap_or(opts.binary),
args: config.distant.args.unwrap_or(opts.args),
use_login_shell: !config.distant.no_shell,
timeout: match options.get("timeout") {
Some(s) => std::time::Duration::from_millis(
s.parse::<u64>().map_err(|_| invalid("timeout"))?,

@ -28,10 +28,6 @@ impl From<Map> for ClientLaunchConfig {
.remove("distant.bind_server")
.and_then(|x| x.parse::<BindAddress>().ok()),
args: map.remove("distant.args"),
no_shell: map
.remove("distant.no_shell")
.and_then(|x| x.parse::<bool>().ok())
.unwrap_or_default(),
},
options: map,
}
@ -54,11 +50,6 @@ impl From<ClientLaunchConfig> for Map {
this.insert("distant.args".to_string(), x);
}
this.insert(
"distant.no_shell".to_string(),
config.distant.no_shell.to_string(),
);
this.extend(config.options);
this
@ -90,8 +81,4 @@ pub struct ClientLaunchDistantConfig {
/// Additional arguments to provide to the server
#[clap(name = "distant-args", long, allow_hyphen_values(true))]
pub args: Option<String>,
/// If specified, will not launch distant using a login shell but instead execute it directly
#[clap(long)]
pub no_shell: bool,
}

@ -130,7 +130,9 @@ impl DistantManagerCtx {
let mut buf = [0u8; 1024];
while let Ok(n) = reader.read(&mut buf) {
lines.push_str(&String::from_utf8_lossy(&buf[..n]));
if let Some(credentials) = DistantSingleKeyCredentials::find(&lines) {
if let Some(credentials) =
DistantSingleKeyCredentials::find(&lines, /* strict */ false)
{
return credentials;
}
}

Loading…
Cancel
Save