From c2e588544f08e294d76b6117946d44d398dcb8ad Mon Sep 17 00:00:00 2001 From: Chip Senkbeil Date: Sun, 10 Oct 2021 23:09:42 -0500 Subject: [PATCH] Update wezterm-ssh dep to 0.2.0, fix ssh -> distant session, refactor session to have optional details included --- Cargo.lock | 47 ++------------ distant-core/src/client/session/mod.rs | 85 ++++++++++++++++++++++---- distant-core/src/net/transport/mod.rs | 24 ++++++++ distant-lua/src/session.rs | 12 ++-- distant-ssh2/Cargo.toml | 2 +- distant-ssh2/src/handler.rs | 11 +++- distant-ssh2/src/lib.rs | 78 +++++++++++++++++++++-- distant-ssh2/tests/sshd.rs | 2 +- src/subcommand/mod.rs | 5 +- 9 files changed, 196 insertions(+), 70 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7c3c671..9af6fb7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -183,14 +183,6 @@ version = "4.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" -[[package]] -name = "async_ossl" -version = "0.1.0" -source = "git+https://github.com/chipsenkbeil/wezterm#f25fbb737563666006c166233cab10daf853a3b1" -dependencies = [ - "openssl", -] - [[package]] name = "atomic-waker" version = "1.0.0" @@ -580,7 +572,8 @@ dependencies = [ [[package]] name = "filedescriptor" version = "0.8.1" -source = "git+https://github.com/chipsenkbeil/wezterm#f25fbb737563666006c166233cab10daf853a3b1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed3d8a5e20435ff00469e51a0d82049bae66504b5c429920dadf9bb54d47b3f" dependencies = [ "libc", "thiserror", @@ -647,21 +640,6 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" -[[package]] -name = "foreign-types" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" -dependencies = [ - "foreign-types-shared", -] - -[[package]] -name = "foreign-types-shared" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" - [[package]] name = "fork" version = "0.1.18" @@ -1128,20 +1106,6 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" -[[package]] -name = "openssl" -version = "0.10.36" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d9facdb76fec0b73c406f125d44d86fdad818d66fef0531eec9233ca425ff4a" -dependencies = [ - "bitflags", - "cfg-if", - "foreign-types", - "libc", - "once_cell", - "openssl-sys", -] - [[package]] name = "openssl-src" version = "111.16.0+1.1.1l" @@ -1247,7 +1211,8 @@ dependencies = [ [[package]] name = "portable-pty" version = "0.5.0" -source = "git+https://github.com/chipsenkbeil/wezterm#f25fbb737563666006c166233cab10daf853a3b1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b8383c3934bd6da733223ad1b22f8102c46e8bbced07800b2346cc34326ff83" dependencies = [ "anyhow", "bitflags", @@ -2053,10 +2018,10 @@ dependencies = [ [[package]] name = "wezterm-ssh" version = "0.2.0" -source = "git+https://github.com/chipsenkbeil/wezterm#f25fbb737563666006c166233cab10daf853a3b1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e233b00aaa22b6ef9b573534e529e9672da8662a8355c37acacedd88741ce1ec" dependencies = [ "anyhow", - "async_ossl", "base64", "bitflags", "camino", diff --git a/distant-core/src/client/session/mod.rs b/distant-core/src/client/session/mod.rs index 4a3c0e1..fc3d919 100644 --- a/distant-core/src/client/session/mod.rs +++ b/distant-core/src/client/session/mod.rs @@ -5,10 +5,12 @@ use crate::{ net::{Codec, DataStream, Transport, TransportError}, }; use log::*; +use serde::{Deserialize, Serialize}; use std::{ convert, net::SocketAddr, ops::{Deref, DerefMut}, + path::{Path, PathBuf}, sync::{Arc, Weak}, }; use tokio::{ @@ -29,13 +31,54 @@ mod mailbox; pub use mailbox::Mailbox; use mailbox::PostOffice; +/// Details about the session +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum SessionDetails { + /// Indicates session is a TCP type + Tcp { addr: SocketAddr, tag: String }, + + /// Indicates session is a Unix socket type + Socket { path: PathBuf, tag: String }, + + /// Indicates session type is inmemory + Inmemory { tag: String }, +} + +impl SessionDetails { + /// Represents the tag associated with the session + pub fn tag(&self) -> &str { + match self { + Self::Tcp { tag, .. } => tag.as_str(), + Self::Socket { tag, .. } => tag.as_str(), + Self::Inmemory { tag } => tag.as_str(), + } + } + + /// Represents the socket address associated with the session, if it has one + pub fn addr(&self) -> Option { + match self { + Self::Tcp { addr, .. } => Some(*addr), + _ => None, + } + } + + /// Represents the path associated with the session, if it has one + pub fn path(&self) -> Option<&Path> { + match self { + Self::Socket { path, .. } => Some(path.as_path()), + _ => None, + } + } +} + /// Represents a session with a remote server that can be used to send requests & receive responses pub struct Session { /// Used to send requests to a server channel: SessionChannel, - /// Textual description of the underlying connection - connection_tag: String, + /// Details about the session + details: Option, /// Contains the task that is running to send requests to a server request_task: JoinHandle<()>, @@ -54,6 +97,10 @@ impl Session { U: Codec + Send + 'static, { let transport = Transport::::connect(addr, codec).await?; + let details = SessionDetails::Tcp { + addr, + tag: transport.to_connection_tag(), + }; debug!( "Session has been established with {}", transport @@ -61,7 +108,7 @@ impl Session { .map(|x| x.to_string()) .unwrap_or_else(|_| String::from("???")) ); - Self::initialize(transport) + Self::initialize_with_details(transport, Some(details)) } /// Connect to a remote TCP server, timing out after duration has passed @@ -86,7 +133,12 @@ impl Session { where U: Codec + Send + 'static, { - let transport = Transport::::connect(path, codec).await?; + let p = path.as_ref(); + let transport = Transport::::connect(p, codec).await?; + let details = SessionDetails::Socket { + path: p.to_path_buf(), + tag: transport.to_connection_tag(), + }; debug!( "Session has been established with {}", transport @@ -94,7 +146,7 @@ impl Session { .map(|x| format!("{:?}", x)) .unwrap_or_else(|_| String::from("???")) ); - Self::initialize(transport) + Self::initialize_with_details(transport, Some(details)) } /// Connect to a proxy unix socket, timing out after duration has passed @@ -113,13 +165,24 @@ impl Session { } impl Session { - /// Initializes a session using the provided transport + /// Initializes a session using the provided transport and no extra details pub fn initialize(transport: Transport) -> io::Result where T: DataStream, U: Codec + Send + 'static, { - let connection_tag = transport.to_connection_tag(); + Self::initialize_with_details(transport, None) + } + + /// Initializes a session using the provided transport and extra details + pub fn initialize_with_details( + transport: Transport, + details: Option, + ) -> io::Result + where + T: DataStream, + U: Codec + Send + 'static, + { let (mut t_read, mut t_write) = transport.into_split(); let post_office = Arc::new(Mutex::new(PostOffice::new())); let weak_post_office = Arc::downgrade(&post_office); @@ -190,7 +253,7 @@ impl Session { Ok(Self { channel, - connection_tag, + details, request_task, response_task, prune_task, @@ -199,9 +262,9 @@ impl Session { } impl Session { - /// Returns a textual description of the underlying connection - pub fn connection_tag(&self) -> &str { - &self.connection_tag + /// Returns details about the session, if it has any + pub fn details(&self) -> Option<&SessionDetails> { + self.details.as_ref() } /// Waits for the session to terminate, which results when the receiving end of the network diff --git a/distant-core/src/net/transport/mod.rs b/distant-core/src/net/transport/mod.rs index 840f08e..7ccf66e 100644 --- a/distant-core/src/net/transport/mod.rs +++ b/distant-core/src/net/transport/mod.rs @@ -89,6 +89,30 @@ where self.0.get_ref().to_connection_tag() } + /// Returns a reference to the underlying I/O stream + /// + /// Note that care should be taken to not tamper with the underlying stream of data coming in + /// as it may corrupt the stream of frames otherwise being worked with + pub fn get_ref(&self) -> &T { + self.0.get_ref() + } + + /// Returns a reference to the underlying I/O stream + /// + /// Note that care should be taken to not tamper with the underlying stream of data coming in + /// as it may corrupt the stream of frames otherwise being worked with + pub fn get_mut(&mut self) -> &mut T { + self.0.get_mut() + } + + /// Consumes the transport, returning its underlying I/O stream + /// + /// Note that care should be taken to not tamper with the underlying stream of data coming in + /// as it may corrupt the stream of frames otherwise being worked with. + pub fn into_inner(self) -> T { + self.0.into_inner() + } + /// Splits transport into read and write halves pub fn into_split( self, diff --git a/distant-lua/src/session.rs b/distant-lua/src/session.rs index d060359..5b8b631 100644 --- a/distant-lua/src/session.rs +++ b/distant-lua/src/session.rs @@ -1,6 +1,6 @@ use crate::{runtime, utils}; use distant_core::{ - SecretKey32, Session as DistantSession, SessionChannel, XChaCha20Poly1305Codec, + SecretKey32, Session as DistantSession, SessionChannel, SessionDetails, XChaCha20Poly1305Codec, }; use distant_ssh2::{IntoDistantSessionOpts, Ssh2Session}; use log::*; @@ -118,8 +118,8 @@ fn with_session(id: usize, f: impl FnOnce(&DistantSession) -> T) -> LuaResult Ok(f(session)) } -fn get_session_connection_tag(id: usize) -> LuaResult { - with_session(id, |session| session.connection_tag().to_string()) +fn get_session_details(id: usize) -> LuaResult> { + with_session(id, |session| session.details().cloned()) } fn get_session_channel(id: usize) -> LuaResult { @@ -181,7 +181,7 @@ impl Session { }) .await .to_lua_err()?, - Mode::Ssh => ssh_session.into_ssh_client_session().to_lua_err()?, + Mode::Ssh => ssh_session.into_ssh_client_session().await.to_lua_err()?, }; // Fourth, store our current session in our global map and then return a reference @@ -276,8 +276,8 @@ macro_rules! impl_methods { impl UserData for Session { fn add_fields<'lua, F: UserDataFields<'lua, Self>>(fields: &mut F) { fields.add_field_method_get("id", |_, this| Ok(this.id)); - fields.add_field_method_get("connection_tag", |_, this| { - get_session_connection_tag(this.id) + fields.add_field_method_get("details", |lua, this| { + get_session_details(this.id).and_then(|x| to_value!(lua, &x)) }); } diff --git a/distant-ssh2/Cargo.toml b/distant-ssh2/Cargo.toml index a624a35..9c07532 100644 --- a/distant-ssh2/Cargo.toml +++ b/distant-ssh2/Cargo.toml @@ -20,7 +20,7 @@ rpassword = "5.0.1" shell-words = "1.0" smol = "1.2" tokio = { version = "1.12.0", features = ["full"] } -wezterm-ssh = { version = "0.2.0", features = ["vendored-openssl"], git = "https://github.com/chipsenkbeil/wezterm" } +wezterm-ssh = { version = "0.2.0", features = ["vendored-openssl"] } # Optional serde support for data structures serde = { version = "1.0.126", features = ["derive"], optional = true } diff --git a/distant-ssh2/src/handler.rs b/distant-ssh2/src/handler.rs index 045a2a4..f51e080 100644 --- a/distant-ssh2/src/handler.rs +++ b/distant-ssh2/src/handler.rs @@ -14,7 +14,9 @@ use std::{ sync::Arc, }; use tokio::sync::{mpsc, Mutex}; -use wezterm_ssh::{Child, ExecResult, OpenFileType, OpenOptions, Session as WezSession, WriteMode}; +use wezterm_ssh::{ + Child, ExecResult, FilePermissions, OpenFileType, OpenOptions, Session as WezSession, WriteMode, +}; const MAX_PIPE_CHUNK_SIZE: usize = 8192; const READ_PAUSE_MILLIS: u64 = 50; @@ -588,9 +590,12 @@ async fn metadata( Ok(Outgoing::from(ResponseData::Metadata(Metadata { canonicalized_path, file_type, - len: metadata.len(), + len: metadata.size.unwrap_or(0), // Check that owner, group, or other has write permission (if not, then readonly) - readonly: metadata.is_readonly(), + readonly: metadata + .permissions + .map(FilePermissions::is_readonly) + .unwrap_or(true), accessed: metadata.accessed.map(u128::from), modified: metadata.modified.map(u128::from), created: None, diff --git a/distant-ssh2/src/lib.rs b/distant-ssh2/src/lib.rs index 0794616..e8582fa 100644 --- a/distant-ssh2/src/lib.rs +++ b/distant-ssh2/src/lib.rs @@ -1,12 +1,14 @@ use async_compat::CompatExt; use distant_core::{ - Request, Session, SessionChannelExt, SessionInfo, Transport, XChaCha20Poly1305Codec, + Request, Session, SessionChannelExt, SessionDetails, SessionInfo, Transport, + XChaCha20Poly1305Codec, }; use log::*; use smol::channel::Receiver as SmolReceiver; use std::{ collections::BTreeMap, io::{self, Write}, + net::{IpAddr, SocketAddr}, path::PathBuf, sync::Arc, time::Duration, @@ -187,6 +189,8 @@ impl Default for Ssh2AuthHandler<'static> { pub struct Ssh2Session { session: WezSession, events: SmolReceiver, + host: String, + port: u16, authenticated: bool, } @@ -243,6 +247,13 @@ impl Ssh2Session { // Add in any of the other options provided config.extend(opts.other); + // Port should always exist, otherwise WezSession will panic from unwrap() + let port = config + .get("port") + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Missing port"))? + .parse::() + .map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x))?; + // Establish a connection let (session, events) = WezSession::connect(config).map_err(|x| io::Error::new(io::ErrorKind::Other, x))?; @@ -250,10 +261,22 @@ impl Ssh2Session { Ok(Self { session, events, + host: host.as_ref().to_string(), + port, authenticated: false, }) } + /// Host this session is connected to + pub fn host(&self) -> &str { + &self.host + } + + /// Port this session is connected to on remote host + pub fn port(&self) -> u16 { + self.port + } + #[inline] pub fn is_authenticated(&self) -> bool { self.authenticated @@ -328,7 +351,23 @@ impl Ssh2Session { )); } - let mut session = self.into_ssh_client_session()?; + // Determine distinct candidate ip addresses for connecting + let mut candidate_ips = tokio::net::lookup_host(format!("{}:{}", self.host, self.port)) + .await? + .into_iter() + .map(|addr| addr.ip()) + .collect::>(); + candidate_ips.sort_unstable(); + candidate_ips.dedup(); + if candidate_ips.is_empty() { + return Err(io::Error::new( + io::ErrorKind::AddrNotAvailable, + format!("Unable to resolve {}:{}", self.host, self.port), + )); + } + + // Turn our ssh connection into a client session so we can use it to spawn our server + let mut session = self.into_ssh_client_session().await?; // Build arguments for distant let mut args = vec![String::from("listen"), String::from("--daemon")]; @@ -358,6 +397,7 @@ impl Ssh2Session { // Spawn distant server and detach it so that we don't kill it when the // ssh session is closed + debug!("Executing {} {}", bin, args.join(" ")); let mut proc = session .spawn("", bin, args, true) .await @@ -380,15 +420,29 @@ impl Ssh2Session { while let Ok(data) = stdout.read().await { out.push_str(&data); } + let maybe_info = out .lines() .find_map(|line| line.parse::().ok()); match maybe_info { Some(info) => { - let addr = info.to_socket_addr().await?; let key = info.key; let codec = XChaCha20Poly1305Codec::from(key); - Session::tcp_connect_timeout(addr, codec, opts.timeout).await + + // Try each IP address with the same port to see if one works + let mut err = None; + for ip in candidate_ips { + let addr = SocketAddr::new(ip, info.port); + debug!("Attempting to connect to distant server @ {}", addr); + match Session::tcp_connect_timeout(addr, codec.clone(), opts.timeout).await + { + Ok(session) => return Ok(session), + Err(x) => err = Some(x), + } + } + + // If all failed, return the last error we got + Err(err.expect("Err set above")) } None => Err(io::Error::new( io::ErrorKind::InvalidData, @@ -415,7 +469,7 @@ impl Ssh2Session { /// Consume [`Ssh2Session`] and produce a distant [`Session`] that is powered by an ssh client /// underneath - pub fn into_ssh_client_session(self) -> io::Result { + pub async fn into_ssh_client_session(self) -> io::Result { // Exit early if not authenticated as this is a requirement if !self.authenticated { return Err(io::Error::new( @@ -425,7 +479,19 @@ impl Ssh2Session { } let (t1, t2) = Transport::pair(1); - let session = Session::initialize(t1)?; + let addr = tokio::net::lookup_host(format!("{}:{}", self.host, self.port)) + .await? + .next() + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::AddrNotAvailable, + format!("Failed to resolve host: {}", self.host), + ) + })?; + let tag = t1.to_connection_tag(); + + let session = + Session::initialize_with_details(t1, Some(SessionDetails::Tcp { addr, tag }))?; // Spawn tasks that forward requests to the ssh session // and send back responses from the ssh session diff --git a/distant-ssh2/tests/sshd.rs b/distant-ssh2/tests/sshd.rs index 7e6baa6..d1f3ea0 100644 --- a/distant-ssh2/tests/sshd.rs +++ b/distant-ssh2/tests/sshd.rs @@ -432,5 +432,5 @@ pub async fn session(sshd: &'_ Sshd, _logger: &'_ flexi_logger::LoggerHandle) -> .await .unwrap(); - ssh2_session.into_ssh_client_session().unwrap() + ssh2_session.into_ssh_client_session().await.unwrap() } diff --git a/src/subcommand/mod.rs b/src/subcommand/mod.rs index b4fbade..e09a964 100644 --- a/src/subcommand/mod.rs +++ b/src/subcommand/mod.rs @@ -66,7 +66,10 @@ impl CommandRunner { .await .map_err(wrap_err)?; - (session.into_ssh_client_session().map_err(wrap_err)?, None) + ( + session.into_ssh_client_session().await.map_err(wrap_err)?, + None, + ) } Method::Distant => {