diff --git a/Cargo.lock b/Cargo.lock index 4db1f73..ba61062 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -491,6 +491,7 @@ dependencies = [ "indoc", "log", "once_cell", + "portable-pty 0.7.0", "predicates", "rand", "serde", @@ -1256,6 +1257,24 @@ dependencies = [ "winapi", ] +[[package]] +name = "portable-pty" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e4d17ec050a6b7ea4b15c430183772bce8384072d3f328e0967e72b7eec46b5" +dependencies = [ + "anyhow", + "bitflags", + "filedescriptor", + "lazy_static", + "libc", + "log", + "serial", + "shared_library", + "shell-words", + "winapi", +] + [[package]] name = "ppv-lite86" version = "0.2.10" @@ -2049,7 +2068,7 @@ dependencies = [ "filedescriptor", "filenamegen", "log", - "portable-pty", + "portable-pty 0.5.0", "regex", "smol", "ssh2", diff --git a/distant-core/Cargo.toml b/distant-core/Cargo.toml index dd9dd7b..bc1c26f 100644 --- a/distant-core/Cargo.toml +++ b/distant-core/Cargo.toml @@ -20,6 +20,7 @@ futures = "0.3.16" hex = "0.4.3" log = "0.4.14" once_cell = "1.8.0" +portable-pty = "0.7.0" rand = { version = "0.8.4", features = ["getrandom"] } serde = { version = "1.0.126", features = ["derive"] } serde_json = "1.0.64" diff --git a/distant-core/src/client/lsp/data.rs b/distant-core/src/client/lsp/data.rs index 6591a51..1fa9936 100644 --- a/distant-core/src/client/lsp/data.rs +++ b/distant-core/src/client/lsp/data.rs @@ -178,6 +178,11 @@ impl LspData { Ok(Self { header, content }) } + + /// Converts into a vec of bytes representing the string format + pub fn to_bytes(&self) -> Vec { + self.to_string().into_bytes() + } } impl fmt::Display for LspData { diff --git a/distant-core/src/client/lsp/mod.rs b/distant-core/src/client/lsp/mod.rs index 27d8ac5..7b7bfc8 100644 --- a/distant-core/src/client/lsp/mod.rs +++ b/distant-core/src/client/lsp/mod.rs @@ -1,8 +1,7 @@ use super::{RemoteProcess, RemoteProcessError, RemoteStderr, RemoteStdin, RemoteStdout}; -use crate::client::SessionChannel; +use crate::{client::SessionChannel, data::PtySize}; use futures::stream::{Stream, StreamExt}; use std::{ - fmt::Write, io::{self, Cursor, Read}, ops::{Deref, DerefMut}, }; @@ -32,8 +31,9 @@ impl RemoteLspProcess { cmd: impl Into, args: Vec, detached: bool, + pty: Option, ) -> Result { - let mut inner = RemoteProcess::spawn(tenant, channel, cmd, args, detached).await?; + let mut inner = RemoteProcess::spawn(tenant, channel, cmd, args, detached, pty).await?; let stdin = inner.stdin.take().map(RemoteLspStdin::new); let stdout = inner.stdout.take().map(RemoteLspStdout::new); let stderr = inner.stderr.take().map(RemoteLspStderr::new); @@ -70,7 +70,7 @@ impl DerefMut for RemoteLspProcess { #[derive(Debug)] pub struct RemoteLspStdin { inner: RemoteStdin, - buf: Option, + buf: Option>, } impl RemoteLspStdin { @@ -79,7 +79,7 @@ impl RemoteLspStdin { } /// Tries to write data to the stdin of a specific remote process - pub fn try_write(&mut self, data: &str) -> io::Result<()> { + pub fn try_write(&mut self, data: &[u8]) -> io::Result<()> { let queue = self.update_and_read_messages(data)?; // Process and then send out each LSP message in our queue @@ -87,14 +87,18 @@ impl RemoteLspStdin { // Convert distant:// to file:// data.mut_content().convert_distant_scheme_to_local(); data.refresh_content_length(); - self.inner.try_write(&data.to_string())?; + self.inner.try_write_str(data.to_string())?; } Ok(()) } + pub fn try_write_str(&mut self, data: &str) -> io::Result<()> { + self.try_write(data.as_bytes()) + } + /// Writes data to the stdin of a specific remote process - pub async fn write(&mut self, data: &str) -> io::Result<()> { + pub async fn write(&mut self, data: &[u8]) -> io::Result<()> { let queue = self.update_and_read_messages(data)?; // Process and then send out each LSP message in our queue @@ -102,17 +106,21 @@ impl RemoteLspStdin { // Convert distant:// to file:// data.mut_content().convert_distant_scheme_to_local(); data.refresh_content_length(); - self.inner.write(&data.to_string()).await?; + self.inner.write_str(data.to_string()).await?; } Ok(()) } - fn update_and_read_messages(&mut self, data: &str) -> io::Result> { + pub async fn write_str(&mut self, data: &str) -> io::Result<()> { + self.write(data.as_bytes()).await + } + + fn update_and_read_messages(&mut self, data: &[u8]) -> io::Result> { // Create or insert into our buffer match &mut self.buf { - Some(buf) => buf.push_str(data), - None => self.buf = Some(data.to_string()), + Some(buf) => buf.extend(data), + None => self.buf = Some(data.to_vec()), } // Read LSP messages from our internal buffer @@ -137,7 +145,7 @@ impl RemoteLspStdin { #[derive(Debug)] pub struct RemoteLspStdout { read_task: JoinHandle<()>, - rx: mpsc::Receiver>, + rx: mpsc::Receiver>>, } impl RemoteLspStdout { @@ -157,7 +165,7 @@ impl RemoteLspStdout { /// Tries to read a complete LSP message over stdout, returning `None` if no complete message /// is available - pub fn try_read(&mut self) -> io::Result> { + pub fn try_read(&mut self) -> io::Result>> { match self.rx.try_recv() { Ok(Ok(data)) => Ok(Some(data)), Ok(Err(x)) => Err(x), @@ -166,13 +174,30 @@ impl RemoteLspStdout { } } + /// Same as `try_read`, but returns a string + pub fn try_read_string(&mut self) -> io::Result> { + self.try_read().and_then(|x| match x { + Some(data) => String::from_utf8(data) + .map(Some) + .map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x)), + None => Ok(None), + }) + } + /// Reads a complete LSP message over stdout - pub async fn read(&mut self) -> io::Result { + pub async fn read(&mut self) -> io::Result> { self.rx .recv() .await .ok_or_else(|| io::Error::from(io::ErrorKind::BrokenPipe))? } + + /// Same as `read`, but returns a string + pub async fn read_string(&mut self) -> io::Result { + self.read().await.and_then(|data| { + String::from_utf8(data).map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x)) + }) + } } impl Drop for RemoteLspStdout { @@ -186,7 +211,7 @@ impl Drop for RemoteLspStdout { #[derive(Debug)] pub struct RemoteLspStderr { read_task: JoinHandle<()>, - rx: mpsc::Receiver>, + rx: mpsc::Receiver>>, } impl RemoteLspStderr { @@ -206,7 +231,7 @@ impl RemoteLspStderr { /// Tries to read a complete LSP message over stderr, returning `None` if no complete message /// is available - pub fn try_read(&mut self) -> io::Result> { + pub fn try_read(&mut self) -> io::Result>> { match self.rx.try_recv() { Ok(Ok(data)) => Ok(Some(data)), Ok(Err(x)) => Err(x), @@ -215,13 +240,30 @@ impl RemoteLspStderr { } } + /// Same as `try_read`, but returns a string + pub fn try_read_string(&mut self) -> io::Result> { + self.try_read().and_then(|x| match x { + Some(data) => String::from_utf8(data) + .map(Some) + .map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x)), + None => Ok(None), + }) + } + /// Reads a complete LSP message over stderr - pub async fn read(&mut self) -> io::Result { + pub async fn read(&mut self) -> io::Result> { self.rx .recv() .await .ok_or_else(|| io::Error::from(io::ErrorKind::BrokenPipe))? } + + /// Same as `read`, but returns a string + pub async fn read_string(&mut self) -> io::Result { + self.read().await.and_then(|data| { + String::from_utf8(data).map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x)) + }) + } } impl Drop for RemoteLspStderr { @@ -231,18 +273,18 @@ impl Drop for RemoteLspStderr { } } -fn spawn_read_task(mut stream: S) -> (JoinHandle<()>, mpsc::Receiver>) +fn spawn_read_task(mut stream: S) -> (JoinHandle<()>, mpsc::Receiver>>) where - S: Stream + Send + Unpin + 'static, + S: Stream> + Send + Unpin + 'static, { - let (tx, rx) = mpsc::channel::>(1); + let (tx, rx) = mpsc::channel::>>(1); let read_task = tokio::spawn(async move { - let mut task_buf: Option = None; + let mut task_buf: Option> = None; while let Some(data) = stream.next().await { // Create or insert into our buffer match &mut task_buf { - Some(buf) => buf.push_str(&data), + Some(buf) => buf.extend(data), None => task_buf = Some(data), } @@ -259,12 +301,12 @@ where // Process and then add each LSP message as output if !queue.is_empty() { - let mut out = String::new(); + let mut out = Vec::new(); for mut data in queue { // Convert file:// to distant:// data.mut_content().convert_local_scheme_to_distant(); data.refresh_content_length(); - write!(&mut out, "{}", data).unwrap(); + out.extend(data.to_bytes()); } if tx.send(Ok(out)).await.is_err() { break; @@ -276,7 +318,7 @@ where (read_task, rx) } -fn read_lsp_messages(input: &str) -> io::Result<(Option, Vec)> { +fn read_lsp_messages(input: &[u8]) -> io::Result<(Option>, Vec)> { let mut queue = Vec::new(); // Continue to read complete messages from the input until we either fail to parse or we reach @@ -290,10 +332,10 @@ fn read_lsp_messages(input: &str) -> io::Result<(Option, Vec)> } cursor.set_position(pos); - // Keep remainder of string not processed as LSP message in buffer + // Keep remainder of bytes not processed as LSP message in buffer let remainder = if (cursor.position() as usize) < cursor.get_ref().len() { - let mut buf = String::new(); - cursor.read_to_string(&mut buf)?; + let mut buf = Vec::new(); + cursor.read_to_end(&mut buf)?; Some(buf) } else { None @@ -326,6 +368,7 @@ mod tests { String::from("cmd"), vec![String::from("arg")], false, + None, ) .await }); @@ -337,7 +380,7 @@ mod tests { t1.send(Response::new( "test-tenant", req.id, - vec![ResponseData::ProcStart { id: rand::random() }], + vec![ResponseData::ProcSpawned { id: rand::random() }], )) .await .unwrap(); @@ -347,12 +390,12 @@ mod tests { (t1, proc) } - fn make_lsp_msg(value: T) -> String + fn make_lsp_msg(value: T) -> Vec where T: serde::Serialize, { let content = serde_json::to_string_pretty(&value).unwrap(); - format!("Content-Length: {}\r\n\r\n{}", content.len(), content) + format!("Content-Length: {}\r\n\r\n{}", content.len(), content).into_bytes() } async fn timeout(duration: Duration, f: F) -> io::Result @@ -455,7 +498,7 @@ mod tests { proc.stdin .as_mut() .unwrap() - .write(&format!("{}{}", msg, extra)) + .write_str(&format!("{}{}", String::from_utf8(msg).unwrap(), extra)) .await .unwrap(); @@ -477,7 +520,7 @@ mod tests { // Also validate that the internal buffer still contains the extra assert_eq!( - proc.stdin.unwrap().buf.unwrap(), + String::from_utf8(proc.stdin.unwrap().buf.unwrap()).unwrap(), extra, "Extra was not retained" ); @@ -501,7 +544,11 @@ mod tests { proc.stdin .as_mut() .unwrap() - .write(&format!("{}{}", msg_1, msg_2)) + .write_str(&format!( + "{}{}", + String::from_utf8(msg_1).unwrap(), + String::from_utf8(msg_2).unwrap() + )) .await .unwrap(); @@ -620,7 +667,7 @@ mod tests { proc.origin_id(), vec![ResponseData::ProcStdout { id: proc.id(), - data: msg_a.to_string(), + data: msg_a.to_vec(), }], )) .await @@ -639,7 +686,7 @@ mod tests { proc.origin_id(), vec![ResponseData::ProcStdout { id: proc.id(), - data: msg_b.to_string(), + data: msg_b.to_vec(), }], )) .await @@ -674,7 +721,7 @@ mod tests { proc.origin_id(), vec![ResponseData::ProcStdout { id: proc.id(), - data: format!("{}{}", msg, extra), + data: format!("{}{}", String::from_utf8(msg).unwrap(), extra).into_bytes(), }], )) .await @@ -718,7 +765,12 @@ mod tests { proc.origin_id(), vec![ResponseData::ProcStdout { id: proc.id(), - data: format!("{}{}", msg_1, msg_2), + data: format!( + "{}{}", + String::from_utf8(msg_1).unwrap(), + String::from_utf8(msg_2).unwrap() + ) + .into_bytes(), }], )) .await @@ -730,15 +782,18 @@ mod tests { out, format!( "{}{}", - make_lsp_msg(serde_json::json!({ + String::from_utf8(make_lsp_msg(serde_json::json!({ "field1": "a", "field2": "b", - })), - make_lsp_msg(serde_json::json!({ + }))) + .unwrap(), + String::from_utf8(make_lsp_msg(serde_json::json!({ "field1": "c", "field2": "d", - })) + }))) + .unwrap() ) + .into_bytes() ); } @@ -821,7 +876,7 @@ mod tests { proc.origin_id(), vec![ResponseData::ProcStderr { id: proc.id(), - data: msg_a.to_string(), + data: msg_a.to_vec(), }], )) .await @@ -840,7 +895,7 @@ mod tests { proc.origin_id(), vec![ResponseData::ProcStderr { id: proc.id(), - data: msg_b.to_string(), + data: msg_b.to_vec(), }], )) .await @@ -875,7 +930,7 @@ mod tests { proc.origin_id(), vec![ResponseData::ProcStderr { id: proc.id(), - data: format!("{}{}", msg, extra), + data: format!("{}{}", String::from_utf8(msg).unwrap(), extra).into_bytes(), }], )) .await @@ -919,7 +974,12 @@ mod tests { proc.origin_id(), vec![ResponseData::ProcStderr { id: proc.id(), - data: format!("{}{}", msg_1, msg_2), + data: format!( + "{}{}", + String::from_utf8(msg_1).unwrap(), + String::from_utf8(msg_2).unwrap() + ) + .into_bytes(), }], )) .await @@ -931,15 +991,18 @@ mod tests { err, format!( "{}{}", - make_lsp_msg(serde_json::json!({ + String::from_utf8(make_lsp_msg(serde_json::json!({ "field1": "a", "field2": "b", - })), - make_lsp_msg(serde_json::json!({ + }))) + .unwrap(), + String::from_utf8(make_lsp_msg(serde_json::json!({ "field1": "c", "field2": "d", - })) + }))) + .unwrap() ) + .into_bytes() ); } diff --git a/distant-core/src/client/process.rs b/distant-core/src/client/process.rs index 5996295..a94dff1 100644 --- a/distant-core/src/client/process.rs +++ b/distant-core/src/client/process.rs @@ -1,7 +1,7 @@ use crate::{ client::{Mailbox, SessionChannel}, constants::CLIENT_PIPE_CAPACITY, - data::{Request, RequestData, ResponseData}, + data::{PtySize, Request, RequestData, ResponseData}, net::TransportError, }; use derive_more::{Display, Error, From}; @@ -79,6 +79,7 @@ impl RemoteProcess { cmd: impl Into, args: Vec, detached: bool, + pty: Option, ) -> Result { let tenant = tenant.into(); let cmd = cmd.into(); @@ -87,10 +88,11 @@ impl RemoteProcess { let mut mailbox = channel .mail(Request::new( tenant.as_str(), - vec![RequestData::ProcRun { + vec![RequestData::ProcSpawn { cmd, args, detached, + pty, }], )) .await?; @@ -105,7 +107,7 @@ impl RemoteProcess { Some(res) => { let origin_id = res.origin_id; match res.payload.into_iter().next().unwrap() { - ResponseData::ProcStart { id } => (id, origin_id), + ResponseData::ProcSpawned { id } => (id, origin_id), ResponseData::Error(x) => { return Err(RemoteProcessError::TransportError(TransportError::IoError( x.into(), @@ -243,13 +245,13 @@ impl RemoteProcess { /// A handle to a remote process' standard input (stdin) #[derive(Debug)] -pub struct RemoteStdin(mpsc::Sender); +pub struct RemoteStdin(mpsc::Sender>); impl RemoteStdin { /// Tries to write to the stdin of the remote process, returning ok if immediately /// successful, `WouldBlock` if would need to wait to send data, and `BrokenPipe` /// if stdin has been closed - pub fn try_write(&mut self, data: impl Into) -> io::Result<()> { + pub fn try_write(&mut self, data: impl Into>) -> io::Result<()> { match self.0.try_send(data.into()) { Ok(data) => Ok(data), Err(TrySendError::Full(_)) => Err(io::Error::from(io::ErrorKind::WouldBlock)), @@ -257,14 +259,24 @@ impl RemoteStdin { } } + /// Same as `try_write`, but with a string + pub fn try_write_str(&mut self, data: impl Into) -> io::Result<()> { + self.try_write(data.into().into_bytes()) + } + /// Writes data to the stdin of a specific remote process - pub async fn write(&mut self, data: impl Into) -> io::Result<()> { + pub async fn write(&mut self, data: impl Into>) -> io::Result<()> { self.0 .send(data.into()) .await .map_err(|x| io::Error::new(io::ErrorKind::BrokenPipe, x)) } + /// Same as `write`, but with a string + pub async fn write_str(&mut self, data: impl Into) -> io::Result<()> { + self.write(data.into().into_bytes()).await + } + /// Checks if stdin has been closed pub fn is_closed(&self) -> bool { self.0.is_closed() @@ -273,12 +285,12 @@ impl RemoteStdin { /// A handle to a remote process' standard output (stdout) #[derive(Debug)] -pub struct RemoteStdout(mpsc::Receiver); +pub struct RemoteStdout(mpsc::Receiver>); impl RemoteStdout { /// Tries to receive latest stdout for a remote process, yielding `None` /// if no stdout is available, and `BrokenPipe` if stdout has been closed - pub fn try_read(&mut self) -> io::Result> { + pub fn try_read(&mut self) -> io::Result>> { match self.0.try_recv() { Ok(data) => Ok(Some(data)), Err(TryRecvError::Empty) => Ok(None), @@ -286,24 +298,41 @@ impl RemoteStdout { } } + /// Same as `try_read`, but returns a string + pub fn try_read_string(&mut self) -> io::Result> { + self.try_read().and_then(|x| match x { + Some(data) => String::from_utf8(data) + .map(Some) + .map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x)), + None => Ok(None), + }) + } + /// Retrieves the latest stdout for a specific remote process, and `BrokenPipe` if stdout has /// been closed - pub async fn read(&mut self) -> io::Result { + pub async fn read(&mut self) -> io::Result> { self.0 .recv() .await .ok_or_else(|| io::Error::from(io::ErrorKind::BrokenPipe)) } + + /// Same as `read`, but returns a string + pub async fn read_string(&mut self) -> io::Result { + self.read().await.and_then(|data| { + String::from_utf8(data).map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x)) + }) + } } /// A handle to a remote process' stderr #[derive(Debug)] -pub struct RemoteStderr(mpsc::Receiver); +pub struct RemoteStderr(mpsc::Receiver>); impl RemoteStderr { /// Tries to receive latest stderr for a remote process, yielding `None` /// if no stderr is available, and `BrokenPipe` if stderr has been closed - pub fn try_read(&mut self) -> io::Result> { + pub fn try_read(&mut self) -> io::Result>> { match self.0.try_recv() { Ok(data) => Ok(Some(data)), Err(TryRecvError::Empty) => Ok(None), @@ -311,14 +340,31 @@ impl RemoteStderr { } } + /// Same as `try_read`, but returns a string + pub fn try_read_string(&mut self) -> io::Result> { + self.try_read().and_then(|x| match x { + Some(data) => String::from_utf8(data) + .map(Some) + .map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x)), + None => Ok(None), + }) + } + /// Retrieves the latest stderr for a specific remote process, and `BrokenPipe` if stderr has /// been closed - pub async fn read(&mut self) -> io::Result { + pub async fn read(&mut self) -> io::Result> { self.0 .recv() .await .ok_or_else(|| io::Error::from(io::ErrorKind::BrokenPipe)) } + + /// Same as `read`, but returns a string + pub async fn read_string(&mut self) -> io::Result { + self.read().await.and_then(|data| { + String::from_utf8(data).map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x)) + }) + } } /// Helper function that loops, processing outgoing stdin requests to a remote process as well as @@ -327,7 +373,7 @@ async fn process_outgoing_requests( tenant: String, id: usize, mut channel: SessionChannel, - mut stdin_rx: mpsc::Receiver, + mut stdin_rx: mpsc::Receiver>, mut kill_rx: mpsc::Receiver<()>, ) -> Result<(), RemoteProcessError> { let result = loop { @@ -365,8 +411,8 @@ async fn process_outgoing_requests( async fn process_incoming_responses( proc_id: usize, mut mailbox: Mailbox, - stdout_tx: mpsc::Sender, - stderr_tx: mpsc::Sender, + stdout_tx: mpsc::Sender>, + stderr_tx: mpsc::Sender>, kill_tx: mpsc::Sender<()>, ) -> Result<(bool, Option), RemoteProcessError> { while let Some(res) = mailbox.next().await { @@ -436,6 +482,7 @@ mod tests { String::from("cmd"), vec![String::from("arg")], false, + None, ) .await }); @@ -475,6 +522,7 @@ mod tests { String::from("cmd"), vec![String::from("arg")], false, + None, ) .await }); @@ -521,6 +569,7 @@ mod tests { String::from("cmd"), vec![String::from("arg")], false, + None, ) .await }); @@ -534,7 +583,7 @@ mod tests { .send(Response::new( "test-tenant", req.id, - vec![ResponseData::ProcStart { id }], + vec![ResponseData::ProcSpawned { id }], )) .await .unwrap(); @@ -567,6 +616,7 @@ mod tests { String::from("cmd"), vec![String::from("arg")], false, + None, ) .await }); @@ -580,7 +630,7 @@ mod tests { .send(Response::new( "test-tenant", req.id, - vec![ResponseData::ProcStart { id }], + vec![ResponseData::ProcSpawned { id }], )) .await .unwrap(); @@ -624,6 +674,7 @@ mod tests { String::from("cmd"), vec![String::from("arg")], false, + None, ) .await }); @@ -637,7 +688,7 @@ mod tests { .send(Response::new( "test-tenant", req.id, - vec![ResponseData::ProcStart { id }], + vec![ResponseData::ProcSpawned { id }], )) .await .unwrap(); @@ -661,7 +712,7 @@ mod tests { { RequestData::ProcStdin { id, data } => { assert_eq!(*id, 12345); - assert_eq!(data, "some input"); + assert_eq!(data, b"some input"); } x => panic!("Unexpected request: {:?}", x), } @@ -680,6 +731,7 @@ mod tests { String::from("cmd"), vec![String::from("arg")], false, + None, ) .await }); @@ -693,7 +745,7 @@ mod tests { .send(Response::new( "test-tenant", req.id, - vec![ResponseData::ProcStart { id }], + vec![ResponseData::ProcSpawned { id }], )) .await .unwrap(); @@ -707,14 +759,14 @@ mod tests { req.id, vec![ResponseData::ProcStdout { id, - data: String::from("some out"), + data: b"some out".to_vec(), }], )) .await .unwrap(); let out = proc.stdout.as_mut().unwrap().read().await.unwrap(); - assert_eq!(out, "some out"); + assert_eq!(out, b"some out"); } #[tokio::test] @@ -730,6 +782,7 @@ mod tests { String::from("cmd"), vec![String::from("arg")], false, + None, ) .await }); @@ -743,7 +796,7 @@ mod tests { .send(Response::new( "test-tenant", req.id, - vec![ResponseData::ProcStart { id }], + vec![ResponseData::ProcSpawned { id }], )) .await .unwrap(); @@ -757,14 +810,14 @@ mod tests { req.id, vec![ResponseData::ProcStderr { id, - data: String::from("some err"), + data: b"some err".to_vec(), }], )) .await .unwrap(); let out = proc.stderr.as_mut().unwrap().read().await.unwrap(); - assert_eq!(out, "some err"); + assert_eq!(out, b"some err"); } #[tokio::test] @@ -780,6 +833,7 @@ mod tests { String::from("cmd"), vec![String::from("arg")], false, + None, ) .await }); @@ -793,7 +847,7 @@ mod tests { .send(Response::new( "test-tenant", req.id, - vec![ResponseData::ProcStart { id }], + vec![ResponseData::ProcSpawned { id }], )) .await .unwrap(); @@ -818,6 +872,7 @@ mod tests { String::from("cmd"), vec![String::from("arg")], false, + None, ) .await }); @@ -831,7 +886,7 @@ mod tests { .send(Response::new( "test-tenant", req.id, - vec![ResponseData::ProcStart { id }], + vec![ResponseData::ProcSpawned { id }], )) .await .unwrap(); @@ -864,6 +919,7 @@ mod tests { String::from("cmd"), vec![String::from("arg")], false, + None, ) .await }); @@ -877,7 +933,7 @@ mod tests { .send(Response::new( "test-tenant", req.id, - vec![ResponseData::ProcStart { id }], + vec![ResponseData::ProcSpawned { id }], )) .await .unwrap(); @@ -919,6 +975,7 @@ mod tests { String::from("cmd"), vec![String::from("arg")], false, + None, ) .await }); @@ -932,7 +989,7 @@ mod tests { .send(Response::new( "test-tenant", req.id, - vec![ResponseData::ProcStart { id }], + vec![ResponseData::ProcSpawned { id }], )) .await .unwrap(); @@ -962,6 +1019,7 @@ mod tests { String::from("cmd"), vec![String::from("arg")], false, + None, ) .await }); @@ -975,7 +1033,7 @@ mod tests { .send(Response::new( "test-tenant", req.id, - vec![ResponseData::ProcStart { id }], + vec![ResponseData::ProcSpawned { id }], )) .await .unwrap(); @@ -1012,6 +1070,7 @@ mod tests { String::from("cmd"), vec![String::from("arg")], false, + None, ) .await }); @@ -1025,7 +1084,7 @@ mod tests { .send(Response::new( "test-tenant", req.id, - vec![ResponseData::ProcStart { id }], + vec![ResponseData::ProcSpawned { id }], )) .await .unwrap(); diff --git a/distant-core/src/client/session/ext.rs b/distant-core/src/client/session/ext.rs index 4754792..8eca6c5 100644 --- a/distant-core/src/client/session/ext.rs +++ b/distant-core/src/client/session/ext.rs @@ -1,6 +1,9 @@ use crate::{ client::{RemoteLspProcess, RemoteProcess, RemoteProcessError, SessionChannel}, - data::{DirEntry, Error as Failure, Metadata, Request, RequestData, ResponseData, SystemInfo}, + data::{ + DirEntry, Error as Failure, Metadata, PtySize, Request, RequestData, ResponseData, + SystemInfo, + }, net::TransportError, }; use derive_more::{Display, Error, From}; @@ -122,6 +125,7 @@ pub trait SessionChannelExt { cmd: impl Into, args: Vec>, detached: bool, + pty: Option, ) -> AsyncReturn<'_, RemoteProcess, RemoteProcessError>; /// Spawns an LSP process on the remote machine @@ -131,6 +135,7 @@ pub trait SessionChannelExt { cmd: impl Into, args: Vec>, detached: bool, + pty: Option, ) -> AsyncReturn<'_, RemoteLspProcess, RemoteProcessError>; /// Retrieves information about the remote system @@ -375,13 +380,14 @@ impl SessionChannelExt for SessionChannel { cmd: impl Into, args: Vec>, detached: bool, + pty: Option, ) -> AsyncReturn<'_, RemoteProcess, RemoteProcessError> { let tenant = tenant.into(); let cmd = cmd.into(); let args = args.into_iter().map(Into::into).collect(); - Box::pin( - async move { RemoteProcess::spawn(tenant, self.clone(), cmd, args, detached).await }, - ) + Box::pin(async move { + RemoteProcess::spawn(tenant, self.clone(), cmd, args, detached, pty).await + }) } fn spawn_lsp( @@ -390,13 +396,14 @@ impl SessionChannelExt for SessionChannel { cmd: impl Into, args: Vec>, detached: bool, + pty: Option, ) -> AsyncReturn<'_, RemoteLspProcess, RemoteProcessError> { let tenant = tenant.into(); let cmd = cmd.into(); let args = args.into_iter().map(Into::into).collect(); - Box::pin( - async move { RemoteLspProcess::spawn(tenant, self.clone(), cmd, args, detached).await }, - ) + Box::pin(async move { + RemoteLspProcess::spawn(tenant, self.clone(), cmd, args, detached, pty).await + }) } fn system_info(&mut self, tenant: impl Into) -> AsyncReturn<'_, SystemInfo> { diff --git a/distant-core/src/data.rs b/distant-core/src/data.rs index 873304e..6a2bcac 100644 --- a/distant-core/src/data.rs +++ b/distant-core/src/data.rs @@ -1,6 +1,6 @@ -use derive_more::{Display, IsVariant}; +use derive_more::{Display, Error, IsVariant}; use serde::{Deserialize, Serialize}; -use std::{io, path::PathBuf}; +use std::{io, num::ParseIntError, path::PathBuf, str::FromStr}; use strum::AsRefStr; /// Type alias for a vec of bytes @@ -209,9 +209,9 @@ pub enum RequestData { resolve_file_type: bool, }, - /// Runs a process on the remote machine + /// Spawns a new process on the remote machine #[cfg_attr(feature = "structopt", structopt(visible_aliases = &["run"]))] - ProcRun { + ProcSpawn { /// Name of the command to run cmd: String, @@ -222,6 +222,10 @@ pub enum RequestData { /// killed when the associated client disconnects #[cfg_attr(feature = "structopt", structopt(long))] detached: bool, + + /// If provided, will spawn process in a pty, otherwise spawns directly + #[cfg_attr(feature = "structopt", structopt(long))] + pty: Option, }, /// Kills a process running on the remote machine @@ -237,7 +241,16 @@ pub enum RequestData { id: usize, /// Data to send to a process's stdin pipe - data: String, + data: Vec, + }, + + /// Resize pty of remote process + ProcResizePty { + /// Id of the actively-running process whose pty to resize + id: usize, + + /// The new pty dimensions + size: PtySize, }, /// Retrieve a list of all processes being managed by the remote server @@ -328,7 +341,7 @@ pub enum ResponseData { Metadata(Metadata), /// Response to starting a new process - ProcStart { + ProcSpawned { /// Arbitrary id associated with running process id: usize, }, @@ -339,7 +352,7 @@ pub enum ResponseData { id: usize, /// Data read from a process' stdout pipe - data: String, + data: Vec, }, /// Actively-transmitted stderr as part of running process @@ -348,7 +361,7 @@ pub enum ResponseData { id: usize, /// Data read from a process' stderr pipe - data: String, + data: Vec, }, /// Response to a process finishing @@ -373,6 +386,83 @@ pub enum ResponseData { SystemInfo(SystemInfo), } +/// Represents the size associated with a remote PTY +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct PtySize { + /// Number of lines of text + pub rows: u16, + + /// Number of columns of text + pub cols: u16, + + /// Width of a cell in pixels. Note that some systems never fill this value and ignore it. + pub pixel_width: u16, + + /// Height of a cell in pixels. Note that some systems never fill this value and ignore it. + pub pixel_height: u16, +} + +impl PtySize { + /// Creates new size using just rows and columns + pub fn from_rows_and_cols(rows: u16, cols: u16) -> Self { + Self { + rows, + cols, + pixel_width: 0, + pixel_height: 0, + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Display, Error)] +pub enum PtySizeParseError { + MissingRows, + MissingColumns, + InvalidRows(ParseIntError), + InvalidColumns(ParseIntError), + InvalidPixelWidth(ParseIntError), + InvalidPixelHeight(ParseIntError), +} + +impl FromStr for PtySize { + type Err = PtySizeParseError; + + /// Attempts to parse a str into PtySize using one of the following formats: + /// + /// * rows,cols (defaults to 0 for pixel_width & pixel_height) + /// * rows,cols,pixel_width,pixel_height + fn from_str(s: &str) -> Result { + let mut tokens = s.split(','); + + Ok(Self { + rows: tokens + .next() + .ok_or(PtySizeParseError::MissingRows)? + .trim() + .parse() + .map_err(PtySizeParseError::InvalidRows)?, + cols: tokens + .next() + .ok_or(PtySizeParseError::MissingColumns)? + .trim() + .parse() + .map_err(PtySizeParseError::InvalidColumns)?, + pixel_width: tokens + .next() + .map(|s| s.trim().parse()) + .transpose() + .map_err(PtySizeParseError::InvalidPixelWidth)? + .unwrap_or(0), + pixel_height: tokens + .next() + .map(|s| s.trim().parse()) + .transpose() + .map_err(PtySizeParseError::InvalidPixelHeight)? + .unwrap_or(0), + }) + } +} + /// Represents metadata about some path on a remote machine #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct Metadata { @@ -495,6 +585,9 @@ pub struct RunningProcess { /// Whether or not the process was run in detached mode pub detached: bool, + /// Pty associated with running process if it has one + pub pty: Option, + /// Arbitrary id associated with running process /// /// Not the same as the process' pid! diff --git a/distant-core/src/server/distant/handler.rs b/distant-core/src/server/distant/handler.rs index 4709935..66059ef 100644 --- a/distant-core/src/server/distant/handler.rs +++ b/distant-core/src/server/distant/handler.rs @@ -1,7 +1,7 @@ use crate::{ constants::{MAX_PIPE_CHUNK_SIZE, READ_PAUSE_MILLIS}, data::{ - self, DirEntry, FileType, Metadata, Request, RequestData, Response, ResponseData, + self, DirEntry, FileType, Metadata, PtySize, Request, RequestData, Response, ResponseData, RunningProcess, SystemInfo, }, server::distant::state::{Process, State}, @@ -98,13 +98,17 @@ pub(super) async fn process( canonicalize, resolve_file_type, } => metadata(path, canonicalize, resolve_file_type).await, - RequestData::ProcRun { + RequestData::ProcSpawn { cmd, args, detached, - } => proc_run(conn_id, state, reply, cmd, args, detached).await, + pty, + } => proc_spawn(conn_id, state, reply, cmd, args, detached, pty).await, RequestData::ProcKill { id } => proc_kill(conn_id, state, id).await, RequestData::ProcStdin { id, data } => proc_stdin(conn_id, state, id, data).await, + RequestData::ProcResizePty { id, size } => { + proc_resize_pty(conn_id, state, id, size).await + } RequestData::ProcList {} => proc_list(state).await, RequestData::SystemInfo {} => system_info().await, } @@ -423,13 +427,14 @@ async fn metadata( }))) } -async fn proc_run( +async fn proc_spawn( conn_id: usize, state: HState, reply: F, cmd: String, args: Vec, detached: bool, + pty: Option, ) -> Result where F: FnMut(Vec) -> ReplyRet + Clone + Send + 'static, @@ -452,7 +457,7 @@ where state .lock() .await - .push_process(conn_id, Process::new(id, cmd, args, detached)); + .push_process(conn_id, Process::new(id, cmd, args, detached, pty)); let post_hook = Box::new(move |mut state_lock: MutexGuard<'_, State>| { // Spawn a task that sends stdout as a response @@ -462,29 +467,21 @@ where let mut buf: [u8; MAX_PIPE_CHUNK_SIZE] = [0; MAX_PIPE_CHUNK_SIZE]; loop { match stdout.read(&mut buf).await { - Ok(n) if n > 0 => match String::from_utf8(buf[..n].to_vec()) { - Ok(data) => { - let payload = vec![ResponseData::ProcStdout { id, data }]; - if !reply_2(payload).await { - error!(" Stdout channel closed", conn_id, id); - break; - } - - // Pause to allow buffer to fill up a little bit, avoiding - // spamming with a lot of smaller responses - tokio::time::sleep(tokio::time::Duration::from_millis( - READ_PAUSE_MILLIS, - )) - .await; - } - Err(x) => { - error!( - " Invalid data read from stdout pipe: {}", - conn_id, id, x - ); + Ok(n) if n > 0 => { + let payload = vec![ResponseData::ProcStdout { + id, + data: buf[..n].to_vec(), + }]; + if !reply_2(payload).await { + error!(" Stdout channel closed", conn_id, id); break; } - }, + + // Pause to allow buffer to fill up a little bit, avoiding + // spamming with a lot of smaller responses + tokio::time::sleep(tokio::time::Duration::from_millis(READ_PAUSE_MILLIS)) + .await; + } Ok(_) => break, Err(x) => { error!( @@ -504,29 +501,21 @@ where let mut buf: [u8; MAX_PIPE_CHUNK_SIZE] = [0; MAX_PIPE_CHUNK_SIZE]; loop { match stderr.read(&mut buf).await { - Ok(n) if n > 0 => match String::from_utf8(buf[..n].to_vec()) { - Ok(data) => { - let payload = vec![ResponseData::ProcStderr { id, data }]; - if !reply_2(payload).await { - error!(" Stderr channel closed", conn_id, id); - break; - } - - // Pause to allow buffer to fill up a little bit, avoiding - // spamming with a lot of smaller responses - tokio::time::sleep(tokio::time::Duration::from_millis( - READ_PAUSE_MILLIS, - )) - .await; - } - Err(x) => { - error!( - " Invalid data read from stdout pipe: {}", - conn_id, id, x - ); + Ok(n) if n > 0 => { + let payload = vec![ResponseData::ProcStderr { + id, + data: buf[..n].to_vec(), + }]; + if !reply_2(payload).await { + error!(" Stderr channel closed", conn_id, id); break; } - }, + + // Pause to allow buffer to fill up a little bit, avoiding + // spamming with a lot of smaller responses + tokio::time::sleep(tokio::time::Duration::from_millis(READ_PAUSE_MILLIS)) + .await; + } Ok(_) => break, Err(x) => { error!( @@ -541,10 +530,10 @@ where // Spawn a task that sends stdin to the process let mut stdin = child.stdin.take().unwrap(); - let (stdin_tx, mut stdin_rx) = mpsc::channel::(1); + let (stdin_tx, mut stdin_rx) = mpsc::channel::>(1); let stdin_task = tokio::spawn(async move { while let Some(line) = stdin_rx.recv().await { - if let Err(x) = stdin.write_all(line.as_bytes()).await { + if let Err(x) = stdin.write_all(&line).await { error!( " Failed to send stdin: {}", conn_id, id, x @@ -661,7 +650,7 @@ where conn_id, id ); Ok(Outgoing { - data: ResponseData::ProcStart { id }, + data: ResponseData::ProcSpawned { id }, post_hook: Some(post_hook), }) } @@ -686,7 +675,7 @@ async fn proc_stdin( conn_id: usize, state: HState, id: usize, - data: String, + data: Vec, ) -> Result { if let Some(process) = state.lock().await.processes.get(&id) { if process.send_stdin(data).await { @@ -703,6 +692,15 @@ async fn proc_stdin( ))) } +async fn proc_resize_pty( + conn_id: usize, + state: HState, + id: usize, + size: PtySize, +) -> Result { + todo!(); +} + async fn proc_list(state: HState) -> Result { Ok(Outgoing::from(ResponseData::ProcEntries { entries: state @@ -714,6 +712,7 @@ async fn proc_list(state: HState) -> Result { cmd: p.cmd.to_string(), args: p.args.clone(), detached: p.detached, + pty: p.pty.clone(), id: p.id, }) .collect(), @@ -2219,10 +2218,11 @@ mod tests { let req = Request::new( "test-tenant", - vec![RequestData::ProcRun { + vec![RequestData::ProcSpawn { cmd: DOES_NOT_EXIST_BIN.to_str().unwrap().to_string(), args: Vec::new(), detached: false, + pty: None, }], ); @@ -2245,10 +2245,11 @@ mod tests { let req = Request::new( "test-tenant", - vec![RequestData::ProcRun { + vec![RequestData::ProcSpawn { cmd: SCRIPT_RUNNER.to_string(), args: vec![ECHO_ARGS_TO_STDOUT_SH.to_str().unwrap().to_string()], detached: false, + pty: None, }], ); @@ -2259,7 +2260,7 @@ mod tests { let res = rx.recv().await.unwrap(); assert_eq!(res.payload.len(), 1, "Wrong payload size"); assert!( - matches!(&res.payload[0], ResponseData::ProcStart { .. }), + matches!(&res.payload[0], ResponseData::ProcSpawned { .. }), "Unexpected response: {:?}", res.payload[0] ); @@ -2275,13 +2276,14 @@ mod tests { // Run a program that echoes to stdout let req = Request::new( "test-tenant", - vec![RequestData::ProcRun { + vec![RequestData::ProcSpawn { cmd: SCRIPT_RUNNER.to_string(), args: vec![ ECHO_ARGS_TO_STDOUT_SH.to_str().unwrap().to_string(), String::from("some stdout"), ], detached: false, + pty: None, }], ); @@ -2292,7 +2294,7 @@ mod tests { let res = rx.recv().await.unwrap(); assert_eq!(res.payload.len(), 1, "Wrong payload size"); assert!( - matches!(&res.payload[0], ResponseData::ProcStart { .. }), + matches!(&res.payload[0], ResponseData::ProcSpawned { .. }), "Unexpected response: {:?}", res.payload[0] ); @@ -2314,7 +2316,7 @@ mod tests { assert_eq!(res.payload.len(), 1, "Wrong payload size"); match &res.payload[0] { ResponseData::ProcStdout { data, .. } => { - assert_eq!(data, "some stdout", "Got wrong stdout"); + assert_eq!(data, b"some stdout", "Got wrong stdout"); got_stdout = true; } ResponseData::ProcDone { success, .. } => { @@ -2341,13 +2343,14 @@ mod tests { // Run a program that echoes to stderr let req = Request::new( "test-tenant", - vec![RequestData::ProcRun { + vec![RequestData::ProcSpawn { cmd: SCRIPT_RUNNER.to_string(), args: vec![ ECHO_ARGS_TO_STDERR_SH.to_str().unwrap().to_string(), String::from("some stderr"), ], detached: false, + pty: None, }], ); @@ -2358,7 +2361,7 @@ mod tests { let res = rx.recv().await.unwrap(); assert_eq!(res.payload.len(), 1, "Wrong payload size"); assert!( - matches!(&res.payload[0], ResponseData::ProcStart { .. }), + matches!(&res.payload[0], ResponseData::ProcSpawned { .. }), "Unexpected response: {:?}", res.payload[0] ); @@ -2380,7 +2383,7 @@ mod tests { assert_eq!(res.payload.len(), 1, "Wrong payload size"); match &res.payload[0] { ResponseData::ProcStderr { data, .. } => { - assert_eq!(data, "some stderr", "Got wrong stderr"); + assert_eq!(data, b"some stderr", "Got wrong stderr"); got_stderr = true; } ResponseData::ProcDone { success, .. } => { @@ -2407,10 +2410,11 @@ mod tests { // Run a program that ends after a little bit let req = Request::new( "test-tenant", - vec![RequestData::ProcRun { + vec![RequestData::ProcSpawn { cmd: SCRIPT_RUNNER.to_string(), args: vec![SLEEP_SH.to_str().unwrap().to_string(), String::from("0.1")], detached: false, + pty: None, }], ); @@ -2421,7 +2425,7 @@ mod tests { let res = rx.recv().await.unwrap(); assert_eq!(res.payload.len(), 1, "Wrong payload size"); let id = match &res.payload[0] { - ResponseData::ProcStart { id } => *id, + ResponseData::ProcSpawned { id } => *id, x => panic!("Unexpected response: {:?}", x), }; @@ -2450,10 +2454,11 @@ mod tests { // Run a program that ends slowly let req = Request::new( "test-tenant", - vec![RequestData::ProcRun { + vec![RequestData::ProcSpawn { cmd: SCRIPT_RUNNER.to_string(), args: vec![SLEEP_SH.to_str().unwrap().to_string(), String::from("1")], detached: false, + pty: None, }], ); @@ -2464,7 +2469,7 @@ mod tests { let res = rx.recv().await.unwrap(); assert_eq!(res.payload.len(), 1, "Wrong payload size"); let id = match &res.payload[0] { - ResponseData::ProcStart { id } => *id, + ResponseData::ProcSpawned { id } => *id, x => panic!("Unexpected response: {:?}", x), }; @@ -2525,10 +2530,11 @@ mod tests { // First, run a program that sits around (sleep for 1 second) let req = Request::new( "test-tenant", - vec![RequestData::ProcRun { + vec![RequestData::ProcSpawn { cmd: SCRIPT_RUNNER.to_string(), args: vec![SLEEP_SH.to_str().unwrap().to_string(), String::from("1")], detached: false, + pty: None, }], ); @@ -2541,7 +2547,7 @@ mod tests { // Second, grab the id of the started process let id = match &res.payload[0] { - ResponseData::ProcStart { id } => *id, + ResponseData::ProcSpawned { id } => *id, x => panic!("Unexpected response: {:?}", x), }; @@ -2592,7 +2598,7 @@ mod tests { "test-tenant", vec![RequestData::ProcStdin { id: 0xDEADBEEF, - data: String::from("some input"), + data: b"some input".to_vec(), }], ); @@ -2621,10 +2627,11 @@ mod tests { // First, run a program that listens for stdin let req = Request::new( "test-tenant", - vec![RequestData::ProcRun { + vec![RequestData::ProcSpawn { cmd: SCRIPT_RUNNER.to_string(), args: vec![ECHO_STDIN_TO_STDOUT_SH.to_str().unwrap().to_string()], detached: false, + pty: None, }], ); @@ -2637,7 +2644,7 @@ mod tests { // Second, grab the id of the started process let id = match &res.payload[0] { - ResponseData::ProcStart { id } => *id, + ResponseData::ProcSpawned { id } => *id, x => panic!("Unexpected response: {:?}", x), }; @@ -2646,7 +2653,7 @@ mod tests { "test-tenant", vec![RequestData::ProcStdin { id, - data: String::from("hello world\n"), + data: b"hello world\n".to_vec(), }], ); @@ -2672,7 +2679,7 @@ mod tests { match &res.payload[0] { ResponseData::Ok => got_ok = true, ResponseData::ProcStdout { data, .. } => { - assert_eq!(data, "hello world\n", "Mirrored data didn't match"); + assert_eq!(data, b"hello world\n", "Mirrored data didn't match"); got_stdout = true; } x => panic!("Unexpected response: {:?}", x), @@ -2694,10 +2701,11 @@ mod tests { let req = Request::new( "test-tenant", vec![ - RequestData::ProcRun { + RequestData::ProcSpawn { cmd: SCRIPT_RUNNER.to_string(), args: vec![SLEEP_SH.to_str().unwrap().to_string(), String::from("1")], detached: false, + pty: None, }, RequestData::ProcList {}, ], @@ -2710,7 +2718,7 @@ mod tests { // Grab the id of the started process let id = match &res.payload[0] { - ResponseData::ProcStart { id } => *id, + ResponseData::ProcSpawned { id } => *id, x => panic!("Unexpected response: {:?}", x), }; @@ -2722,6 +2730,7 @@ mod tests { cmd: SCRIPT_RUNNER.to_string(), args: vec![SLEEP_SH.to_str().unwrap().to_string(), String::from("1")], detached: false, + pty: None, id, }], }, diff --git a/distant-core/src/server/distant/state.rs b/distant-core/src/server/distant/state.rs index b8a8e83..bdb95c1 100644 --- a/distant-core/src/server/distant/state.rs +++ b/distant-core/src/server/distant/state.rs @@ -1,3 +1,4 @@ +use crate::data::PtySize; use log::*; use std::{ collections::HashMap, @@ -105,9 +106,12 @@ pub struct Process { /// Whether or not this process was run detached pub detached: bool, + /// Dimensions of pty associated with process, if it has one + pub pty: Option, + /// Transport channel to send new input to the stdin of the process, /// one line at a time - stdin_tx: Option>, + stdin_tx: Option>>, /// Transport channel to report that the process should be killed kill_tx: Option>, @@ -117,12 +121,19 @@ pub struct Process { } impl Process { - pub fn new(id: usize, cmd: String, args: Vec, detached: bool) -> Self { + pub fn new( + id: usize, + cmd: String, + args: Vec, + detached: bool, + pty: Option, + ) -> Self { Self { id, cmd, args, detached, + pty, stdin_tx: None, kill_tx: None, wait_task: None, @@ -132,7 +143,7 @@ impl Process { /// Lazy initialization of process state pub(crate) fn initialize( &mut self, - stdin_tx: mpsc::Sender, + stdin_tx: mpsc::Sender>, kill_tx: oneshot::Sender<()>, wait_task: JoinHandle<()>, ) { @@ -141,7 +152,7 @@ impl Process { self.wait_task = Some(wait_task); } - pub async fn send_stdin(&self, input: impl Into) -> bool { + pub async fn send_stdin(&self, input: impl Into>) -> bool { if let Some(stdin) = self.stdin_tx.as_ref() { if stdin.send(input.into()).await.is_ok() { return true; diff --git a/distant-core/src/server/relay.rs b/distant-core/src/server/relay.rs index 17ea684..770160f 100644 --- a/distant-core/src/server/relay.rs +++ b/distant-core/src/server/relay.rs @@ -172,7 +172,7 @@ where { let mut p_lock = processes.lock().await; for data in res.payload.iter() { - if let ResponseData::ProcStart { id } = *data { + if let ResponseData::ProcSpawned { id } = *data { p_lock.push(id); } } diff --git a/distant-ssh2/src/handler.rs b/distant-ssh2/src/handler.rs index 2a57716..87339e7 100644 --- a/distant-ssh2/src/handler.rs +++ b/distant-ssh2/src/handler.rs @@ -99,7 +99,7 @@ pub(super) async fn process( canonicalize, resolve_file_type, } => metadata(session, path, canonicalize, resolve_file_type).await, - RequestData::ProcRun { + RequestData::ProcSpawn { cmd, args, detached, @@ -841,7 +841,7 @@ where id ); Ok(Outgoing { - data: ResponseData::ProcStart { id }, + data: ResponseData::ProcSpawned { id }, post_hook: Some(post_hook), }) } diff --git a/distant-ssh2/tests/ssh2/session.rs b/distant-ssh2/tests/ssh2/session.rs index c8e8ce8..603c5a4 100644 --- a/distant-ssh2/tests/ssh2/session.rs +++ b/distant-ssh2/tests/ssh2/session.rs @@ -1352,7 +1352,7 @@ async fn proc_run_should_send_error_over_stderr_on_failure(#[future] session: Se let mut session = session.await; let req = Request::new( "test-tenant", - vec![RequestData::ProcRun { + vec![RequestData::ProcSpawn { cmd: DOES_NOT_EXIST_BIN.to_str().unwrap().to_string(), args: Vec::new(), detached: false, @@ -1367,7 +1367,7 @@ async fn proc_run_should_send_error_over_stderr_on_failure(#[future] session: Se let res = mailbox.next().await.unwrap(); assert_eq!(res.payload.len(), 1, "Wrong payload size"); let proc_id = match &res.payload[0] { - ResponseData::ProcStart { id } => *id, + ResponseData::ProcSpawned { id } => *id, x => panic!("Unexpected response: {:?}", x), }; @@ -1398,7 +1398,7 @@ async fn proc_run_should_send_back_proc_start_on_success(#[future] session: Sess let mut session = session.await; let req = Request::new( "test-tenant", - vec![RequestData::ProcRun { + vec![RequestData::ProcSpawn { cmd: SCRIPT_RUNNER.to_string(), args: vec![ECHO_ARGS_TO_STDOUT_SH.to_str().unwrap().to_string()], detached: false, @@ -1408,7 +1408,7 @@ async fn proc_run_should_send_back_proc_start_on_success(#[future] session: Sess let res = session.send(req).await.unwrap(); assert_eq!(res.payload.len(), 1, "Wrong payload size"); assert!( - matches!(&res.payload[0], ResponseData::ProcStart { .. }), + matches!(&res.payload[0], ResponseData::ProcSpawned { .. }), "Unexpected response: {:?}", res.payload[0] ); @@ -1424,7 +1424,7 @@ async fn proc_run_should_send_back_stdout_periodically_when_available(#[future] // Run a program that echoes to stdout let req = Request::new( "test-tenant", - vec![RequestData::ProcRun { + vec![RequestData::ProcSpawn { cmd: SCRIPT_RUNNER.to_string(), args: vec![ ECHO_ARGS_TO_STDOUT_SH.to_str().unwrap().to_string(), @@ -1439,7 +1439,7 @@ async fn proc_run_should_send_back_stdout_periodically_when_available(#[future] let res = mailbox.next().await.unwrap(); assert_eq!(res.payload.len(), 1, "Wrong payload size"); assert!( - matches!(&res.payload[0], ResponseData::ProcStart { .. }), + matches!(&res.payload[0], ResponseData::ProcSpawned { .. }), "Unexpected response: {:?}", res.payload[0] ); @@ -1488,7 +1488,7 @@ async fn proc_run_should_send_back_stderr_periodically_when_available(#[future] // Run a program that echoes to stderr let req = Request::new( "test-tenant", - vec![RequestData::ProcRun { + vec![RequestData::ProcSpawn { cmd: SCRIPT_RUNNER.to_string(), args: vec![ ECHO_ARGS_TO_STDERR_SH.to_str().unwrap().to_string(), @@ -1503,7 +1503,7 @@ async fn proc_run_should_send_back_stderr_periodically_when_available(#[future] let res = mailbox.next().await.unwrap(); assert_eq!(res.payload.len(), 1, "Wrong payload size"); assert!( - matches!(&res.payload[0], ResponseData::ProcStart { .. }), + matches!(&res.payload[0], ResponseData::ProcSpawned { .. }), "Unexpected response: {:?}", res.payload[0] ); @@ -1552,7 +1552,7 @@ async fn proc_run_should_clear_process_from_state_when_done(#[future] session: S // Run a program that ends after a little bit let req = Request::new( "test-tenant", - vec![RequestData::ProcRun { + vec![RequestData::ProcSpawn { cmd: SCRIPT_RUNNER.to_string(), args: vec![SLEEP_SH.to_str().unwrap().to_string(), String::from("0.1")], detached: false, @@ -1563,7 +1563,7 @@ async fn proc_run_should_clear_process_from_state_when_done(#[future] session: S let res = mailbox.next().await.unwrap(); assert_eq!(res.payload.len(), 1, "Wrong payload size"); let id = match &res.payload[0] { - ResponseData::ProcStart { id } => *id, + ResponseData::ProcSpawned { id } => *id, x => panic!("Unexpected response: {:?}", x), }; @@ -1600,7 +1600,7 @@ async fn proc_run_should_clear_process_from_state_when_killed(#[future] session: // Run a program that ends slowly let req = Request::new( "test-tenant", - vec![RequestData::ProcRun { + vec![RequestData::ProcSpawn { cmd: SCRIPT_RUNNER.to_string(), args: vec![SLEEP_SH.to_str().unwrap().to_string(), String::from("1")], detached: false, @@ -1612,7 +1612,7 @@ async fn proc_run_should_clear_process_from_state_when_killed(#[future] session: let res = mailbox.next().await.unwrap(); assert_eq!(res.payload.len(), 1, "Wrong payload size"); let id = match &res.payload[0] { - ResponseData::ProcStart { id } => *id, + ResponseData::ProcSpawned { id } => *id, x => panic!("Unexpected response: {:?}", x), }; @@ -1674,7 +1674,7 @@ async fn proc_kill_should_send_ok_and_done_responses_on_success(#[future] sessio // First, run a program that sits around (sleep for 1 second) let req = Request::new( "test-tenant", - vec![RequestData::ProcRun { + vec![RequestData::ProcSpawn { cmd: SCRIPT_RUNNER.to_string(), args: vec![SLEEP_SH.to_str().unwrap().to_string(), String::from("1")], detached: false, @@ -1688,7 +1688,7 @@ async fn proc_kill_should_send_ok_and_done_responses_on_success(#[future] sessio // Second, grab the id of the started process let id = match &res.payload[0] { - ResponseData::ProcStart { id } => *id, + ResponseData::ProcSpawned { id } => *id, x => panic!("Unexpected response: {:?}", x), }; @@ -1749,7 +1749,7 @@ async fn proc_stdin_should_send_ok_on_success_and_properly_send_stdin_to_process // First, run a program that listens for stdin let req = Request::new( "test-tenant", - vec![RequestData::ProcRun { + vec![RequestData::ProcSpawn { cmd: SCRIPT_RUNNER.to_string(), args: vec![ECHO_STDIN_TO_STDOUT_SH.to_str().unwrap().to_string()], detached: false, @@ -1762,7 +1762,7 @@ async fn proc_stdin_should_send_ok_on_success_and_properly_send_stdin_to_process // Second, grab the id of the started process let id = match &res.payload[0] { - ResponseData::ProcStart { id } => *id, + ResponseData::ProcSpawned { id } => *id, x => panic!("Unexpected response: {:?}", x), }; @@ -1798,7 +1798,7 @@ async fn proc_list_should_send_proc_entry_list(#[future] session: Session) { let mut session = session.await; let req = Request::new( "test-tenant", - vec![RequestData::ProcRun { + vec![RequestData::ProcSpawn { cmd: SCRIPT_RUNNER.to_string(), args: vec![SLEEP_SH.to_str().unwrap().to_string(), String::from("10")], detached: false, @@ -1810,7 +1810,7 @@ async fn proc_list_should_send_proc_entry_list(#[future] session: Session) { // Grab the id of the started process let id = match &res.payload[0] { - ResponseData::ProcStart { id } => *id, + ResponseData::ProcSpawned { id } => *id, x => panic!("Unexpected response: {:?}", x), }; diff --git a/src/output.rs b/src/output.rs index d869dda..6ba97cd 100644 --- a/src/output.rs +++ b/src/output.rs @@ -144,7 +144,7 @@ fn format_shell(data: ResponseData) -> ResponseOut { .collect::>() .join("\n"), ), - ResponseData::ProcStart { .. } => ResponseOut::None, + ResponseData::ProcSpawned { .. } => ResponseOut::None, ResponseData::ProcStdout { data, .. } => ResponseOut::Stdout(data), ResponseData::ProcStderr { data, .. } => ResponseOut::Stderr(data), ResponseData::ProcDone { id, success, code } => { diff --git a/src/subcommand/action.rs b/src/subcommand/action.rs index 9d9102a..58d66ad 100644 --- a/src/subcommand/action.rs +++ b/src/subcommand/action.rs @@ -88,7 +88,7 @@ async fn start( // the stdin will be used for sending ProcStdin to remote process ( _, - Some(RequestData::ProcRun { + Some(RequestData::ProcSpawn { cmd, args, detached, diff --git a/tests/cli/action/proc_run.rs b/tests/cli/action/proc_run.rs index 284a627..0c4ab84 100644 --- a/tests/cli/action/proc_run.rs +++ b/tests/cli/action/proc_run.rs @@ -168,7 +168,7 @@ fn should_support_json_to_execute_program_and_return_exit_status(mut action_cmd: let req = Request { id: rand::random(), tenant: random_tenant(), - payload: vec![RequestData::ProcRun { + payload: vec![RequestData::ProcSpawn { cmd: SCRIPT_RUNNER.to_string(), args: vec![ECHO_ARGS_TO_STDOUT_SH.to_str().unwrap().to_string()], detached: false, @@ -186,7 +186,7 @@ fn should_support_json_to_execute_program_and_return_exit_status(mut action_cmd: let res: Response = serde_json::from_slice(&cmd.get_output().stdout).unwrap(); assert!( - matches!(res.payload[0], ResponseData::ProcStart { .. }), + matches!(res.payload[0], ResponseData::ProcSpawned { .. }), "Unexpected response: {:?}", res.payload[0], ); @@ -198,7 +198,7 @@ fn should_support_json_to_capture_and_print_stdout(ctx: &'_ DistantServerCtx) { let req = Request { id: rand::random(), tenant: random_tenant(), - payload: vec![RequestData::ProcRun { + payload: vec![RequestData::ProcSpawn { cmd: SCRIPT_RUNNER.to_string(), args: vec![ ECHO_ARGS_TO_STDOUT_SH.to_str().unwrap().to_string(), @@ -230,7 +230,7 @@ fn should_support_json_to_capture_and_print_stdout(ctx: &'_ DistantServerCtx) { friendly_recv_line(&stdout, Duration::from_secs(30)).expect("Failed to get proc start"); let res: Response = serde_json::from_str(&out).unwrap(); assert!( - matches!(res.payload[0], ResponseData::ProcStart { .. }), + matches!(res.payload[0], ResponseData::ProcSpawned { .. }), "Unexpected response: {:?}", res.payload[0] ); @@ -267,7 +267,7 @@ fn should_support_json_to_capture_and_print_stderr(ctx: &'_ DistantServerCtx) { let req = Request { id: rand::random(), tenant: random_tenant(), - payload: vec![RequestData::ProcRun { + payload: vec![RequestData::ProcSpawn { cmd: SCRIPT_RUNNER.to_string(), args: vec![ ECHO_ARGS_TO_STDERR_SH.to_str().unwrap().to_string(), @@ -299,7 +299,7 @@ fn should_support_json_to_capture_and_print_stderr(ctx: &'_ DistantServerCtx) { friendly_recv_line(&stdout, Duration::from_secs(30)).expect("Failed to get proc start"); let res: Response = serde_json::from_str(&out).unwrap(); assert!( - matches!(res.payload[0], ResponseData::ProcStart { .. }), + matches!(res.payload[0], ResponseData::ProcSpawned { .. }), "Unexpected response: {:?}", res.payload[0] ); @@ -335,7 +335,7 @@ fn should_support_json_to_forward_stdin_to_remote_process(ctx: &'_ DistantServer let req = Request { id: rand::random(), tenant: random_tenant(), - payload: vec![RequestData::ProcRun { + payload: vec![RequestData::ProcSpawn { cmd: SCRIPT_RUNNER.to_string(), args: vec![ECHO_STDIN_TO_STDOUT_SH.to_str().unwrap().to_string()], detached: false, @@ -364,7 +364,7 @@ fn should_support_json_to_forward_stdin_to_remote_process(ctx: &'_ DistantServer friendly_recv_line(&stdout, Duration::from_secs(30)).expect("Failed to get proc start"); let res: Response = serde_json::from_str(&out).unwrap(); let id = match &res.payload[0] { - ResponseData::ProcStart { id } => *id, + ResponseData::ProcSpawned { id } => *id, x => panic!("Unexpected response: {:?}", x), }; @@ -429,7 +429,7 @@ fn should_support_json_output_for_error(mut action_cmd: Command) { let req = Request { id: rand::random(), tenant: random_tenant(), - payload: vec![RequestData::ProcRun { + payload: vec![RequestData::ProcSpawn { cmd: DOES_NOT_EXIST_BIN.to_str().unwrap().to_string(), args: Vec::new(), detached: false,