Unfinished pty support, renaming ProcRun -> ProcSpawn and ProcStarted -> ProcSpawned, switch ProcStdin/ProcStdout/ProcStderr to use Vec<u8> instead of String, update RemoteProcess and RemoteLspProcess to support reading/writing string and vec<u8>

pull/96/head
Chip Senkbeil 2 years ago
parent 9365a92586
commit c6c07c5c2c
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

21
Cargo.lock generated

@ -491,6 +491,7 @@ dependencies = [
"indoc",
"log",
"once_cell",
"portable-pty 0.7.0",
"predicates",
"rand",
"serde",
@ -1256,6 +1257,24 @@ dependencies = [
"winapi",
]
[[package]]
name = "portable-pty"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e4d17ec050a6b7ea4b15c430183772bce8384072d3f328e0967e72b7eec46b5"
dependencies = [
"anyhow",
"bitflags",
"filedescriptor",
"lazy_static",
"libc",
"log",
"serial",
"shared_library",
"shell-words",
"winapi",
]
[[package]]
name = "ppv-lite86"
version = "0.2.10"
@ -2049,7 +2068,7 @@ dependencies = [
"filedescriptor",
"filenamegen",
"log",
"portable-pty",
"portable-pty 0.5.0",
"regex",
"smol",
"ssh2",

@ -20,6 +20,7 @@ futures = "0.3.16"
hex = "0.4.3"
log = "0.4.14"
once_cell = "1.8.0"
portable-pty = "0.7.0"
rand = { version = "0.8.4", features = ["getrandom"] }
serde = { version = "1.0.126", features = ["derive"] }
serde_json = "1.0.64"

@ -178,6 +178,11 @@ impl LspData {
Ok(Self { header, content })
}
/// Converts into a vec of bytes representing the string format
pub fn to_bytes(&self) -> Vec<u8> {
self.to_string().into_bytes()
}
}
impl fmt::Display for LspData {

@ -1,8 +1,7 @@
use super::{RemoteProcess, RemoteProcessError, RemoteStderr, RemoteStdin, RemoteStdout};
use crate::client::SessionChannel;
use crate::{client::SessionChannel, data::PtySize};
use futures::stream::{Stream, StreamExt};
use std::{
fmt::Write,
io::{self, Cursor, Read},
ops::{Deref, DerefMut},
};
@ -32,8 +31,9 @@ impl RemoteLspProcess {
cmd: impl Into<String>,
args: Vec<String>,
detached: bool,
pty: Option<PtySize>,
) -> Result<Self, RemoteProcessError> {
let mut inner = RemoteProcess::spawn(tenant, channel, cmd, args, detached).await?;
let mut inner = RemoteProcess::spawn(tenant, channel, cmd, args, detached, pty).await?;
let stdin = inner.stdin.take().map(RemoteLspStdin::new);
let stdout = inner.stdout.take().map(RemoteLspStdout::new);
let stderr = inner.stderr.take().map(RemoteLspStderr::new);
@ -70,7 +70,7 @@ impl DerefMut for RemoteLspProcess {
#[derive(Debug)]
pub struct RemoteLspStdin {
inner: RemoteStdin,
buf: Option<String>,
buf: Option<Vec<u8>>,
}
impl RemoteLspStdin {
@ -79,7 +79,7 @@ impl RemoteLspStdin {
}
/// Tries to write data to the stdin of a specific remote process
pub fn try_write(&mut self, data: &str) -> io::Result<()> {
pub fn try_write(&mut self, data: &[u8]) -> io::Result<()> {
let queue = self.update_and_read_messages(data)?;
// Process and then send out each LSP message in our queue
@ -87,14 +87,18 @@ impl RemoteLspStdin {
// Convert distant:// to file://
data.mut_content().convert_distant_scheme_to_local();
data.refresh_content_length();
self.inner.try_write(&data.to_string())?;
self.inner.try_write_str(data.to_string())?;
}
Ok(())
}
pub fn try_write_str(&mut self, data: &str) -> io::Result<()> {
self.try_write(data.as_bytes())
}
/// Writes data to the stdin of a specific remote process
pub async fn write(&mut self, data: &str) -> io::Result<()> {
pub async fn write(&mut self, data: &[u8]) -> io::Result<()> {
let queue = self.update_and_read_messages(data)?;
// Process and then send out each LSP message in our queue
@ -102,17 +106,21 @@ impl RemoteLspStdin {
// Convert distant:// to file://
data.mut_content().convert_distant_scheme_to_local();
data.refresh_content_length();
self.inner.write(&data.to_string()).await?;
self.inner.write_str(data.to_string()).await?;
}
Ok(())
}
fn update_and_read_messages(&mut self, data: &str) -> io::Result<Vec<LspData>> {
pub async fn write_str(&mut self, data: &str) -> io::Result<()> {
self.write(data.as_bytes()).await
}
fn update_and_read_messages(&mut self, data: &[u8]) -> io::Result<Vec<LspData>> {
// Create or insert into our buffer
match &mut self.buf {
Some(buf) => buf.push_str(data),
None => self.buf = Some(data.to_string()),
Some(buf) => buf.extend(data),
None => self.buf = Some(data.to_vec()),
}
// Read LSP messages from our internal buffer
@ -137,7 +145,7 @@ impl RemoteLspStdin {
#[derive(Debug)]
pub struct RemoteLspStdout {
read_task: JoinHandle<()>,
rx: mpsc::Receiver<io::Result<String>>,
rx: mpsc::Receiver<io::Result<Vec<u8>>>,
}
impl RemoteLspStdout {
@ -157,7 +165,7 @@ impl RemoteLspStdout {
/// Tries to read a complete LSP message over stdout, returning `None` if no complete message
/// is available
pub fn try_read(&mut self) -> io::Result<Option<String>> {
pub fn try_read(&mut self) -> io::Result<Option<Vec<u8>>> {
match self.rx.try_recv() {
Ok(Ok(data)) => Ok(Some(data)),
Ok(Err(x)) => Err(x),
@ -166,13 +174,30 @@ impl RemoteLspStdout {
}
}
/// Same as `try_read`, but returns a string
pub fn try_read_string(&mut self) -> io::Result<Option<String>> {
self.try_read().and_then(|x| match x {
Some(data) => String::from_utf8(data)
.map(Some)
.map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x)),
None => Ok(None),
})
}
/// Reads a complete LSP message over stdout
pub async fn read(&mut self) -> io::Result<String> {
pub async fn read(&mut self) -> io::Result<Vec<u8>> {
self.rx
.recv()
.await
.ok_or_else(|| io::Error::from(io::ErrorKind::BrokenPipe))?
}
/// Same as `read`, but returns a string
pub async fn read_string(&mut self) -> io::Result<String> {
self.read().await.and_then(|data| {
String::from_utf8(data).map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x))
})
}
}
impl Drop for RemoteLspStdout {
@ -186,7 +211,7 @@ impl Drop for RemoteLspStdout {
#[derive(Debug)]
pub struct RemoteLspStderr {
read_task: JoinHandle<()>,
rx: mpsc::Receiver<io::Result<String>>,
rx: mpsc::Receiver<io::Result<Vec<u8>>>,
}
impl RemoteLspStderr {
@ -206,7 +231,7 @@ impl RemoteLspStderr {
/// Tries to read a complete LSP message over stderr, returning `None` if no complete message
/// is available
pub fn try_read(&mut self) -> io::Result<Option<String>> {
pub fn try_read(&mut self) -> io::Result<Option<Vec<u8>>> {
match self.rx.try_recv() {
Ok(Ok(data)) => Ok(Some(data)),
Ok(Err(x)) => Err(x),
@ -215,13 +240,30 @@ impl RemoteLspStderr {
}
}
/// Same as `try_read`, but returns a string
pub fn try_read_string(&mut self) -> io::Result<Option<String>> {
self.try_read().and_then(|x| match x {
Some(data) => String::from_utf8(data)
.map(Some)
.map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x)),
None => Ok(None),
})
}
/// Reads a complete LSP message over stderr
pub async fn read(&mut self) -> io::Result<String> {
pub async fn read(&mut self) -> io::Result<Vec<u8>> {
self.rx
.recv()
.await
.ok_or_else(|| io::Error::from(io::ErrorKind::BrokenPipe))?
}
/// Same as `read`, but returns a string
pub async fn read_string(&mut self) -> io::Result<String> {
self.read().await.and_then(|data| {
String::from_utf8(data).map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x))
})
}
}
impl Drop for RemoteLspStderr {
@ -231,18 +273,18 @@ impl Drop for RemoteLspStderr {
}
}
fn spawn_read_task<S>(mut stream: S) -> (JoinHandle<()>, mpsc::Receiver<io::Result<String>>)
fn spawn_read_task<S>(mut stream: S) -> (JoinHandle<()>, mpsc::Receiver<io::Result<Vec<u8>>>)
where
S: Stream<Item = String> + Send + Unpin + 'static,
S: Stream<Item = Vec<u8>> + Send + Unpin + 'static,
{
let (tx, rx) = mpsc::channel::<io::Result<String>>(1);
let (tx, rx) = mpsc::channel::<io::Result<Vec<u8>>>(1);
let read_task = tokio::spawn(async move {
let mut task_buf: Option<String> = None;
let mut task_buf: Option<Vec<u8>> = None;
while let Some(data) = stream.next().await {
// Create or insert into our buffer
match &mut task_buf {
Some(buf) => buf.push_str(&data),
Some(buf) => buf.extend(data),
None => task_buf = Some(data),
}
@ -259,12 +301,12 @@ where
// Process and then add each LSP message as output
if !queue.is_empty() {
let mut out = String::new();
let mut out = Vec::new();
for mut data in queue {
// Convert file:// to distant://
data.mut_content().convert_local_scheme_to_distant();
data.refresh_content_length();
write!(&mut out, "{}", data).unwrap();
out.extend(data.to_bytes());
}
if tx.send(Ok(out)).await.is_err() {
break;
@ -276,7 +318,7 @@ where
(read_task, rx)
}
fn read_lsp_messages(input: &str) -> io::Result<(Option<String>, Vec<LspData>)> {
fn read_lsp_messages(input: &[u8]) -> io::Result<(Option<Vec<u8>>, Vec<LspData>)> {
let mut queue = Vec::new();
// Continue to read complete messages from the input until we either fail to parse or we reach
@ -290,10 +332,10 @@ fn read_lsp_messages(input: &str) -> io::Result<(Option<String>, Vec<LspData>)>
}
cursor.set_position(pos);
// Keep remainder of string not processed as LSP message in buffer
// Keep remainder of bytes not processed as LSP message in buffer
let remainder = if (cursor.position() as usize) < cursor.get_ref().len() {
let mut buf = String::new();
cursor.read_to_string(&mut buf)?;
let mut buf = Vec::new();
cursor.read_to_end(&mut buf)?;
Some(buf)
} else {
None
@ -326,6 +368,7 @@ mod tests {
String::from("cmd"),
vec![String::from("arg")],
false,
None,
)
.await
});
@ -337,7 +380,7 @@ mod tests {
t1.send(Response::new(
"test-tenant",
req.id,
vec![ResponseData::ProcStart { id: rand::random() }],
vec![ResponseData::ProcSpawned { id: rand::random() }],
))
.await
.unwrap();
@ -347,12 +390,12 @@ mod tests {
(t1, proc)
}
fn make_lsp_msg<T>(value: T) -> String
fn make_lsp_msg<T>(value: T) -> Vec<u8>
where
T: serde::Serialize,
{
let content = serde_json::to_string_pretty(&value).unwrap();
format!("Content-Length: {}\r\n\r\n{}", content.len(), content)
format!("Content-Length: {}\r\n\r\n{}", content.len(), content).into_bytes()
}
async fn timeout<F, R>(duration: Duration, f: F) -> io::Result<R>
@ -455,7 +498,7 @@ mod tests {
proc.stdin
.as_mut()
.unwrap()
.write(&format!("{}{}", msg, extra))
.write_str(&format!("{}{}", String::from_utf8(msg).unwrap(), extra))
.await
.unwrap();
@ -477,7 +520,7 @@ mod tests {
// Also validate that the internal buffer still contains the extra
assert_eq!(
proc.stdin.unwrap().buf.unwrap(),
String::from_utf8(proc.stdin.unwrap().buf.unwrap()).unwrap(),
extra,
"Extra was not retained"
);
@ -501,7 +544,11 @@ mod tests {
proc.stdin
.as_mut()
.unwrap()
.write(&format!("{}{}", msg_1, msg_2))
.write_str(&format!(
"{}{}",
String::from_utf8(msg_1).unwrap(),
String::from_utf8(msg_2).unwrap()
))
.await
.unwrap();
@ -620,7 +667,7 @@ mod tests {
proc.origin_id(),
vec![ResponseData::ProcStdout {
id: proc.id(),
data: msg_a.to_string(),
data: msg_a.to_vec(),
}],
))
.await
@ -639,7 +686,7 @@ mod tests {
proc.origin_id(),
vec![ResponseData::ProcStdout {
id: proc.id(),
data: msg_b.to_string(),
data: msg_b.to_vec(),
}],
))
.await
@ -674,7 +721,7 @@ mod tests {
proc.origin_id(),
vec![ResponseData::ProcStdout {
id: proc.id(),
data: format!("{}{}", msg, extra),
data: format!("{}{}", String::from_utf8(msg).unwrap(), extra).into_bytes(),
}],
))
.await
@ -718,7 +765,12 @@ mod tests {
proc.origin_id(),
vec![ResponseData::ProcStdout {
id: proc.id(),
data: format!("{}{}", msg_1, msg_2),
data: format!(
"{}{}",
String::from_utf8(msg_1).unwrap(),
String::from_utf8(msg_2).unwrap()
)
.into_bytes(),
}],
))
.await
@ -730,15 +782,18 @@ mod tests {
out,
format!(
"{}{}",
make_lsp_msg(serde_json::json!({
String::from_utf8(make_lsp_msg(serde_json::json!({
"field1": "a",
"field2": "b",
})),
make_lsp_msg(serde_json::json!({
})))
.unwrap(),
String::from_utf8(make_lsp_msg(serde_json::json!({
"field1": "c",
"field2": "d",
}))
})))
.unwrap()
)
.into_bytes()
);
}
@ -821,7 +876,7 @@ mod tests {
proc.origin_id(),
vec![ResponseData::ProcStderr {
id: proc.id(),
data: msg_a.to_string(),
data: msg_a.to_vec(),
}],
))
.await
@ -840,7 +895,7 @@ mod tests {
proc.origin_id(),
vec![ResponseData::ProcStderr {
id: proc.id(),
data: msg_b.to_string(),
data: msg_b.to_vec(),
}],
))
.await
@ -875,7 +930,7 @@ mod tests {
proc.origin_id(),
vec![ResponseData::ProcStderr {
id: proc.id(),
data: format!("{}{}", msg, extra),
data: format!("{}{}", String::from_utf8(msg).unwrap(), extra).into_bytes(),
}],
))
.await
@ -919,7 +974,12 @@ mod tests {
proc.origin_id(),
vec![ResponseData::ProcStderr {
id: proc.id(),
data: format!("{}{}", msg_1, msg_2),
data: format!(
"{}{}",
String::from_utf8(msg_1).unwrap(),
String::from_utf8(msg_2).unwrap()
)
.into_bytes(),
}],
))
.await
@ -931,15 +991,18 @@ mod tests {
err,
format!(
"{}{}",
make_lsp_msg(serde_json::json!({
String::from_utf8(make_lsp_msg(serde_json::json!({
"field1": "a",
"field2": "b",
})),
make_lsp_msg(serde_json::json!({
})))
.unwrap(),
String::from_utf8(make_lsp_msg(serde_json::json!({
"field1": "c",
"field2": "d",
}))
})))
.unwrap()
)
.into_bytes()
);
}

@ -1,7 +1,7 @@
use crate::{
client::{Mailbox, SessionChannel},
constants::CLIENT_PIPE_CAPACITY,
data::{Request, RequestData, ResponseData},
data::{PtySize, Request, RequestData, ResponseData},
net::TransportError,
};
use derive_more::{Display, Error, From};
@ -79,6 +79,7 @@ impl RemoteProcess {
cmd: impl Into<String>,
args: Vec<String>,
detached: bool,
pty: Option<PtySize>,
) -> Result<Self, RemoteProcessError> {
let tenant = tenant.into();
let cmd = cmd.into();
@ -87,10 +88,11 @@ impl RemoteProcess {
let mut mailbox = channel
.mail(Request::new(
tenant.as_str(),
vec![RequestData::ProcRun {
vec![RequestData::ProcSpawn {
cmd,
args,
detached,
pty,
}],
))
.await?;
@ -105,7 +107,7 @@ impl RemoteProcess {
Some(res) => {
let origin_id = res.origin_id;
match res.payload.into_iter().next().unwrap() {
ResponseData::ProcStart { id } => (id, origin_id),
ResponseData::ProcSpawned { id } => (id, origin_id),
ResponseData::Error(x) => {
return Err(RemoteProcessError::TransportError(TransportError::IoError(
x.into(),
@ -243,13 +245,13 @@ impl RemoteProcess {
/// A handle to a remote process' standard input (stdin)
#[derive(Debug)]
pub struct RemoteStdin(mpsc::Sender<String>);
pub struct RemoteStdin(mpsc::Sender<Vec<u8>>);
impl RemoteStdin {
/// Tries to write to the stdin of the remote process, returning ok if immediately
/// successful, `WouldBlock` if would need to wait to send data, and `BrokenPipe`
/// if stdin has been closed
pub fn try_write(&mut self, data: impl Into<String>) -> io::Result<()> {
pub fn try_write(&mut self, data: impl Into<Vec<u8>>) -> io::Result<()> {
match self.0.try_send(data.into()) {
Ok(data) => Ok(data),
Err(TrySendError::Full(_)) => Err(io::Error::from(io::ErrorKind::WouldBlock)),
@ -257,14 +259,24 @@ impl RemoteStdin {
}
}
/// Same as `try_write`, but with a string
pub fn try_write_str(&mut self, data: impl Into<String>) -> io::Result<()> {
self.try_write(data.into().into_bytes())
}
/// Writes data to the stdin of a specific remote process
pub async fn write(&mut self, data: impl Into<String>) -> io::Result<()> {
pub async fn write(&mut self, data: impl Into<Vec<u8>>) -> io::Result<()> {
self.0
.send(data.into())
.await
.map_err(|x| io::Error::new(io::ErrorKind::BrokenPipe, x))
}
/// Same as `write`, but with a string
pub async fn write_str(&mut self, data: impl Into<String>) -> io::Result<()> {
self.write(data.into().into_bytes()).await
}
/// Checks if stdin has been closed
pub fn is_closed(&self) -> bool {
self.0.is_closed()
@ -273,12 +285,12 @@ impl RemoteStdin {
/// A handle to a remote process' standard output (stdout)
#[derive(Debug)]
pub struct RemoteStdout(mpsc::Receiver<String>);
pub struct RemoteStdout(mpsc::Receiver<Vec<u8>>);
impl RemoteStdout {
/// Tries to receive latest stdout for a remote process, yielding `None`
/// if no stdout is available, and `BrokenPipe` if stdout has been closed
pub fn try_read(&mut self) -> io::Result<Option<String>> {
pub fn try_read(&mut self) -> io::Result<Option<Vec<u8>>> {
match self.0.try_recv() {
Ok(data) => Ok(Some(data)),
Err(TryRecvError::Empty) => Ok(None),
@ -286,24 +298,41 @@ impl RemoteStdout {
}
}
/// Same as `try_read`, but returns a string
pub fn try_read_string(&mut self) -> io::Result<Option<String>> {
self.try_read().and_then(|x| match x {
Some(data) => String::from_utf8(data)
.map(Some)
.map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x)),
None => Ok(None),
})
}
/// Retrieves the latest stdout for a specific remote process, and `BrokenPipe` if stdout has
/// been closed
pub async fn read(&mut self) -> io::Result<String> {
pub async fn read(&mut self) -> io::Result<Vec<u8>> {
self.0
.recv()
.await
.ok_or_else(|| io::Error::from(io::ErrorKind::BrokenPipe))
}
/// Same as `read`, but returns a string
pub async fn read_string(&mut self) -> io::Result<String> {
self.read().await.and_then(|data| {
String::from_utf8(data).map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x))
})
}
}
/// A handle to a remote process' stderr
#[derive(Debug)]
pub struct RemoteStderr(mpsc::Receiver<String>);
pub struct RemoteStderr(mpsc::Receiver<Vec<u8>>);
impl RemoteStderr {
/// Tries to receive latest stderr for a remote process, yielding `None`
/// if no stderr is available, and `BrokenPipe` if stderr has been closed
pub fn try_read(&mut self) -> io::Result<Option<String>> {
pub fn try_read(&mut self) -> io::Result<Option<Vec<u8>>> {
match self.0.try_recv() {
Ok(data) => Ok(Some(data)),
Err(TryRecvError::Empty) => Ok(None),
@ -311,14 +340,31 @@ impl RemoteStderr {
}
}
/// Same as `try_read`, but returns a string
pub fn try_read_string(&mut self) -> io::Result<Option<String>> {
self.try_read().and_then(|x| match x {
Some(data) => String::from_utf8(data)
.map(Some)
.map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x)),
None => Ok(None),
})
}
/// Retrieves the latest stderr for a specific remote process, and `BrokenPipe` if stderr has
/// been closed
pub async fn read(&mut self) -> io::Result<String> {
pub async fn read(&mut self) -> io::Result<Vec<u8>> {
self.0
.recv()
.await
.ok_or_else(|| io::Error::from(io::ErrorKind::BrokenPipe))
}
/// Same as `read`, but returns a string
pub async fn read_string(&mut self) -> io::Result<String> {
self.read().await.and_then(|data| {
String::from_utf8(data).map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x))
})
}
}
/// Helper function that loops, processing outgoing stdin requests to a remote process as well as
@ -327,7 +373,7 @@ async fn process_outgoing_requests(
tenant: String,
id: usize,
mut channel: SessionChannel,
mut stdin_rx: mpsc::Receiver<String>,
mut stdin_rx: mpsc::Receiver<Vec<u8>>,
mut kill_rx: mpsc::Receiver<()>,
) -> Result<(), RemoteProcessError> {
let result = loop {
@ -365,8 +411,8 @@ async fn process_outgoing_requests(
async fn process_incoming_responses(
proc_id: usize,
mut mailbox: Mailbox,
stdout_tx: mpsc::Sender<String>,
stderr_tx: mpsc::Sender<String>,
stdout_tx: mpsc::Sender<Vec<u8>>,
stderr_tx: mpsc::Sender<Vec<u8>>,
kill_tx: mpsc::Sender<()>,
) -> Result<(bool, Option<i32>), RemoteProcessError> {
while let Some(res) = mailbox.next().await {
@ -436,6 +482,7 @@ mod tests {
String::from("cmd"),
vec![String::from("arg")],
false,
None,
)
.await
});
@ -475,6 +522,7 @@ mod tests {
String::from("cmd"),
vec![String::from("arg")],
false,
None,
)
.await
});
@ -521,6 +569,7 @@ mod tests {
String::from("cmd"),
vec![String::from("arg")],
false,
None,
)
.await
});
@ -534,7 +583,7 @@ mod tests {
.send(Response::new(
"test-tenant",
req.id,
vec![ResponseData::ProcStart { id }],
vec![ResponseData::ProcSpawned { id }],
))
.await
.unwrap();
@ -567,6 +616,7 @@ mod tests {
String::from("cmd"),
vec![String::from("arg")],
false,
None,
)
.await
});
@ -580,7 +630,7 @@ mod tests {
.send(Response::new(
"test-tenant",
req.id,
vec![ResponseData::ProcStart { id }],
vec![ResponseData::ProcSpawned { id }],
))
.await
.unwrap();
@ -624,6 +674,7 @@ mod tests {
String::from("cmd"),
vec![String::from("arg")],
false,
None,
)
.await
});
@ -637,7 +688,7 @@ mod tests {
.send(Response::new(
"test-tenant",
req.id,
vec![ResponseData::ProcStart { id }],
vec![ResponseData::ProcSpawned { id }],
))
.await
.unwrap();
@ -661,7 +712,7 @@ mod tests {
{
RequestData::ProcStdin { id, data } => {
assert_eq!(*id, 12345);
assert_eq!(data, "some input");
assert_eq!(data, b"some input");
}
x => panic!("Unexpected request: {:?}", x),
}
@ -680,6 +731,7 @@ mod tests {
String::from("cmd"),
vec![String::from("arg")],
false,
None,
)
.await
});
@ -693,7 +745,7 @@ mod tests {
.send(Response::new(
"test-tenant",
req.id,
vec![ResponseData::ProcStart { id }],
vec![ResponseData::ProcSpawned { id }],
))
.await
.unwrap();
@ -707,14 +759,14 @@ mod tests {
req.id,
vec![ResponseData::ProcStdout {
id,
data: String::from("some out"),
data: b"some out".to_vec(),
}],
))
.await
.unwrap();
let out = proc.stdout.as_mut().unwrap().read().await.unwrap();
assert_eq!(out, "some out");
assert_eq!(out, b"some out");
}
#[tokio::test]
@ -730,6 +782,7 @@ mod tests {
String::from("cmd"),
vec![String::from("arg")],
false,
None,
)
.await
});
@ -743,7 +796,7 @@ mod tests {
.send(Response::new(
"test-tenant",
req.id,
vec![ResponseData::ProcStart { id }],
vec![ResponseData::ProcSpawned { id }],
))
.await
.unwrap();
@ -757,14 +810,14 @@ mod tests {
req.id,
vec![ResponseData::ProcStderr {
id,
data: String::from("some err"),
data: b"some err".to_vec(),
}],
))
.await
.unwrap();
let out = proc.stderr.as_mut().unwrap().read().await.unwrap();
assert_eq!(out, "some err");
assert_eq!(out, b"some err");
}
#[tokio::test]
@ -780,6 +833,7 @@ mod tests {
String::from("cmd"),
vec![String::from("arg")],
false,
None,
)
.await
});
@ -793,7 +847,7 @@ mod tests {
.send(Response::new(
"test-tenant",
req.id,
vec![ResponseData::ProcStart { id }],
vec![ResponseData::ProcSpawned { id }],
))
.await
.unwrap();
@ -818,6 +872,7 @@ mod tests {
String::from("cmd"),
vec![String::from("arg")],
false,
None,
)
.await
});
@ -831,7 +886,7 @@ mod tests {
.send(Response::new(
"test-tenant",
req.id,
vec![ResponseData::ProcStart { id }],
vec![ResponseData::ProcSpawned { id }],
))
.await
.unwrap();
@ -864,6 +919,7 @@ mod tests {
String::from("cmd"),
vec![String::from("arg")],
false,
None,
)
.await
});
@ -877,7 +933,7 @@ mod tests {
.send(Response::new(
"test-tenant",
req.id,
vec![ResponseData::ProcStart { id }],
vec![ResponseData::ProcSpawned { id }],
))
.await
.unwrap();
@ -919,6 +975,7 @@ mod tests {
String::from("cmd"),
vec![String::from("arg")],
false,
None,
)
.await
});
@ -932,7 +989,7 @@ mod tests {
.send(Response::new(
"test-tenant",
req.id,
vec![ResponseData::ProcStart { id }],
vec![ResponseData::ProcSpawned { id }],
))
.await
.unwrap();
@ -962,6 +1019,7 @@ mod tests {
String::from("cmd"),
vec![String::from("arg")],
false,
None,
)
.await
});
@ -975,7 +1033,7 @@ mod tests {
.send(Response::new(
"test-tenant",
req.id,
vec![ResponseData::ProcStart { id }],
vec![ResponseData::ProcSpawned { id }],
))
.await
.unwrap();
@ -1012,6 +1070,7 @@ mod tests {
String::from("cmd"),
vec![String::from("arg")],
false,
None,
)
.await
});
@ -1025,7 +1084,7 @@ mod tests {
.send(Response::new(
"test-tenant",
req.id,
vec![ResponseData::ProcStart { id }],
vec![ResponseData::ProcSpawned { id }],
))
.await
.unwrap();

@ -1,6 +1,9 @@
use crate::{
client::{RemoteLspProcess, RemoteProcess, RemoteProcessError, SessionChannel},
data::{DirEntry, Error as Failure, Metadata, Request, RequestData, ResponseData, SystemInfo},
data::{
DirEntry, Error as Failure, Metadata, PtySize, Request, RequestData, ResponseData,
SystemInfo,
},
net::TransportError,
};
use derive_more::{Display, Error, From};
@ -122,6 +125,7 @@ pub trait SessionChannelExt {
cmd: impl Into<String>,
args: Vec<impl Into<String>>,
detached: bool,
pty: Option<PtySize>,
) -> AsyncReturn<'_, RemoteProcess, RemoteProcessError>;
/// Spawns an LSP process on the remote machine
@ -131,6 +135,7 @@ pub trait SessionChannelExt {
cmd: impl Into<String>,
args: Vec<impl Into<String>>,
detached: bool,
pty: Option<PtySize>,
) -> AsyncReturn<'_, RemoteLspProcess, RemoteProcessError>;
/// Retrieves information about the remote system
@ -375,13 +380,14 @@ impl SessionChannelExt for SessionChannel {
cmd: impl Into<String>,
args: Vec<impl Into<String>>,
detached: bool,
pty: Option<PtySize>,
) -> AsyncReturn<'_, RemoteProcess, RemoteProcessError> {
let tenant = tenant.into();
let cmd = cmd.into();
let args = args.into_iter().map(Into::into).collect();
Box::pin(
async move { RemoteProcess::spawn(tenant, self.clone(), cmd, args, detached).await },
)
Box::pin(async move {
RemoteProcess::spawn(tenant, self.clone(), cmd, args, detached, pty).await
})
}
fn spawn_lsp(
@ -390,13 +396,14 @@ impl SessionChannelExt for SessionChannel {
cmd: impl Into<String>,
args: Vec<impl Into<String>>,
detached: bool,
pty: Option<PtySize>,
) -> AsyncReturn<'_, RemoteLspProcess, RemoteProcessError> {
let tenant = tenant.into();
let cmd = cmd.into();
let args = args.into_iter().map(Into::into).collect();
Box::pin(
async move { RemoteLspProcess::spawn(tenant, self.clone(), cmd, args, detached).await },
)
Box::pin(async move {
RemoteLspProcess::spawn(tenant, self.clone(), cmd, args, detached, pty).await
})
}
fn system_info(&mut self, tenant: impl Into<String>) -> AsyncReturn<'_, SystemInfo> {

@ -1,6 +1,6 @@
use derive_more::{Display, IsVariant};
use derive_more::{Display, Error, IsVariant};
use serde::{Deserialize, Serialize};
use std::{io, path::PathBuf};
use std::{io, num::ParseIntError, path::PathBuf, str::FromStr};
use strum::AsRefStr;
/// Type alias for a vec of bytes
@ -209,9 +209,9 @@ pub enum RequestData {
resolve_file_type: bool,
},
/// Runs a process on the remote machine
/// Spawns a new process on the remote machine
#[cfg_attr(feature = "structopt", structopt(visible_aliases = &["run"]))]
ProcRun {
ProcSpawn {
/// Name of the command to run
cmd: String,
@ -222,6 +222,10 @@ pub enum RequestData {
/// killed when the associated client disconnects
#[cfg_attr(feature = "structopt", structopt(long))]
detached: bool,
/// If provided, will spawn process in a pty, otherwise spawns directly
#[cfg_attr(feature = "structopt", structopt(long))]
pty: Option<PtySize>,
},
/// Kills a process running on the remote machine
@ -237,7 +241,16 @@ pub enum RequestData {
id: usize,
/// Data to send to a process's stdin pipe
data: String,
data: Vec<u8>,
},
/// Resize pty of remote process
ProcResizePty {
/// Id of the actively-running process whose pty to resize
id: usize,
/// The new pty dimensions
size: PtySize,
},
/// Retrieve a list of all processes being managed by the remote server
@ -328,7 +341,7 @@ pub enum ResponseData {
Metadata(Metadata),
/// Response to starting a new process
ProcStart {
ProcSpawned {
/// Arbitrary id associated with running process
id: usize,
},
@ -339,7 +352,7 @@ pub enum ResponseData {
id: usize,
/// Data read from a process' stdout pipe
data: String,
data: Vec<u8>,
},
/// Actively-transmitted stderr as part of running process
@ -348,7 +361,7 @@ pub enum ResponseData {
id: usize,
/// Data read from a process' stderr pipe
data: String,
data: Vec<u8>,
},
/// Response to a process finishing
@ -373,6 +386,83 @@ pub enum ResponseData {
SystemInfo(SystemInfo),
}
/// Represents the size associated with a remote PTY
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PtySize {
/// Number of lines of text
pub rows: u16,
/// Number of columns of text
pub cols: u16,
/// Width of a cell in pixels. Note that some systems never fill this value and ignore it.
pub pixel_width: u16,
/// Height of a cell in pixels. Note that some systems never fill this value and ignore it.
pub pixel_height: u16,
}
impl PtySize {
/// Creates new size using just rows and columns
pub fn from_rows_and_cols(rows: u16, cols: u16) -> Self {
Self {
rows,
cols,
pixel_width: 0,
pixel_height: 0,
}
}
}
#[derive(Clone, Debug, PartialEq, Eq, Display, Error)]
pub enum PtySizeParseError {
MissingRows,
MissingColumns,
InvalidRows(ParseIntError),
InvalidColumns(ParseIntError),
InvalidPixelWidth(ParseIntError),
InvalidPixelHeight(ParseIntError),
}
impl FromStr for PtySize {
type Err = PtySizeParseError;
/// Attempts to parse a str into PtySize using one of the following formats:
///
/// * rows,cols (defaults to 0 for pixel_width & pixel_height)
/// * rows,cols,pixel_width,pixel_height
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut tokens = s.split(',');
Ok(Self {
rows: tokens
.next()
.ok_or(PtySizeParseError::MissingRows)?
.trim()
.parse()
.map_err(PtySizeParseError::InvalidRows)?,
cols: tokens
.next()
.ok_or(PtySizeParseError::MissingColumns)?
.trim()
.parse()
.map_err(PtySizeParseError::InvalidColumns)?,
pixel_width: tokens
.next()
.map(|s| s.trim().parse())
.transpose()
.map_err(PtySizeParseError::InvalidPixelWidth)?
.unwrap_or(0),
pixel_height: tokens
.next()
.map(|s| s.trim().parse())
.transpose()
.map_err(PtySizeParseError::InvalidPixelHeight)?
.unwrap_or(0),
})
}
}
/// Represents metadata about some path on a remote machine
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Metadata {
@ -495,6 +585,9 @@ pub struct RunningProcess {
/// Whether or not the process was run in detached mode
pub detached: bool,
/// Pty associated with running process if it has one
pub pty: Option<PtySize>,
/// Arbitrary id associated with running process
///
/// Not the same as the process' pid!

@ -1,7 +1,7 @@
use crate::{
constants::{MAX_PIPE_CHUNK_SIZE, READ_PAUSE_MILLIS},
data::{
self, DirEntry, FileType, Metadata, Request, RequestData, Response, ResponseData,
self, DirEntry, FileType, Metadata, PtySize, Request, RequestData, Response, ResponseData,
RunningProcess, SystemInfo,
},
server::distant::state::{Process, State},
@ -98,13 +98,17 @@ pub(super) async fn process(
canonicalize,
resolve_file_type,
} => metadata(path, canonicalize, resolve_file_type).await,
RequestData::ProcRun {
RequestData::ProcSpawn {
cmd,
args,
detached,
} => proc_run(conn_id, state, reply, cmd, args, detached).await,
pty,
} => proc_spawn(conn_id, state, reply, cmd, args, detached, pty).await,
RequestData::ProcKill { id } => proc_kill(conn_id, state, id).await,
RequestData::ProcStdin { id, data } => proc_stdin(conn_id, state, id, data).await,
RequestData::ProcResizePty { id, size } => {
proc_resize_pty(conn_id, state, id, size).await
}
RequestData::ProcList {} => proc_list(state).await,
RequestData::SystemInfo {} => system_info().await,
}
@ -423,13 +427,14 @@ async fn metadata(
})))
}
async fn proc_run<F>(
async fn proc_spawn<F>(
conn_id: usize,
state: HState,
reply: F,
cmd: String,
args: Vec<String>,
detached: bool,
pty: Option<PtySize>,
) -> Result<Outgoing, ServerError>
where
F: FnMut(Vec<ResponseData>) -> ReplyRet + Clone + Send + 'static,
@ -452,7 +457,7 @@ where
state
.lock()
.await
.push_process(conn_id, Process::new(id, cmd, args, detached));
.push_process(conn_id, Process::new(id, cmd, args, detached, pty));
let post_hook = Box::new(move |mut state_lock: MutexGuard<'_, State>| {
// Spawn a task that sends stdout as a response
@ -462,29 +467,21 @@ where
let mut buf: [u8; MAX_PIPE_CHUNK_SIZE] = [0; MAX_PIPE_CHUNK_SIZE];
loop {
match stdout.read(&mut buf).await {
Ok(n) if n > 0 => match String::from_utf8(buf[..n].to_vec()) {
Ok(data) => {
let payload = vec![ResponseData::ProcStdout { id, data }];
if !reply_2(payload).await {
error!("<Conn @ {} | Proc {}> Stdout channel closed", conn_id, id);
break;
}
// Pause to allow buffer to fill up a little bit, avoiding
// spamming with a lot of smaller responses
tokio::time::sleep(tokio::time::Duration::from_millis(
READ_PAUSE_MILLIS,
))
.await;
}
Err(x) => {
error!(
"<Conn @ {} | Proc {}> Invalid data read from stdout pipe: {}",
conn_id, id, x
);
Ok(n) if n > 0 => {
let payload = vec![ResponseData::ProcStdout {
id,
data: buf[..n].to_vec(),
}];
if !reply_2(payload).await {
error!("<Conn @ {} | Proc {}> Stdout channel closed", conn_id, id);
break;
}
},
// Pause to allow buffer to fill up a little bit, avoiding
// spamming with a lot of smaller responses
tokio::time::sleep(tokio::time::Duration::from_millis(READ_PAUSE_MILLIS))
.await;
}
Ok(_) => break,
Err(x) => {
error!(
@ -504,29 +501,21 @@ where
let mut buf: [u8; MAX_PIPE_CHUNK_SIZE] = [0; MAX_PIPE_CHUNK_SIZE];
loop {
match stderr.read(&mut buf).await {
Ok(n) if n > 0 => match String::from_utf8(buf[..n].to_vec()) {
Ok(data) => {
let payload = vec![ResponseData::ProcStderr { id, data }];
if !reply_2(payload).await {
error!("<Conn @ {} | Proc {}> Stderr channel closed", conn_id, id);
break;
}
// Pause to allow buffer to fill up a little bit, avoiding
// spamming with a lot of smaller responses
tokio::time::sleep(tokio::time::Duration::from_millis(
READ_PAUSE_MILLIS,
))
.await;
}
Err(x) => {
error!(
"<Conn @ {} | Proc {}> Invalid data read from stdout pipe: {}",
conn_id, id, x
);
Ok(n) if n > 0 => {
let payload = vec![ResponseData::ProcStderr {
id,
data: buf[..n].to_vec(),
}];
if !reply_2(payload).await {
error!("<Conn @ {} | Proc {}> Stderr channel closed", conn_id, id);
break;
}
},
// Pause to allow buffer to fill up a little bit, avoiding
// spamming with a lot of smaller responses
tokio::time::sleep(tokio::time::Duration::from_millis(READ_PAUSE_MILLIS))
.await;
}
Ok(_) => break,
Err(x) => {
error!(
@ -541,10 +530,10 @@ where
// Spawn a task that sends stdin to the process
let mut stdin = child.stdin.take().unwrap();
let (stdin_tx, mut stdin_rx) = mpsc::channel::<String>(1);
let (stdin_tx, mut stdin_rx) = mpsc::channel::<Vec<u8>>(1);
let stdin_task = tokio::spawn(async move {
while let Some(line) = stdin_rx.recv().await {
if let Err(x) = stdin.write_all(line.as_bytes()).await {
if let Err(x) = stdin.write_all(&line).await {
error!(
"<Conn @ {} | Proc {}> Failed to send stdin: {}",
conn_id, id, x
@ -661,7 +650,7 @@ where
conn_id, id
);
Ok(Outgoing {
data: ResponseData::ProcStart { id },
data: ResponseData::ProcSpawned { id },
post_hook: Some(post_hook),
})
}
@ -686,7 +675,7 @@ async fn proc_stdin(
conn_id: usize,
state: HState,
id: usize,
data: String,
data: Vec<u8>,
) -> Result<Outgoing, ServerError> {
if let Some(process) = state.lock().await.processes.get(&id) {
if process.send_stdin(data).await {
@ -703,6 +692,15 @@ async fn proc_stdin(
)))
}
async fn proc_resize_pty(
conn_id: usize,
state: HState,
id: usize,
size: PtySize,
) -> Result<Outgoing, ServerError> {
todo!();
}
async fn proc_list(state: HState) -> Result<Outgoing, ServerError> {
Ok(Outgoing::from(ResponseData::ProcEntries {
entries: state
@ -714,6 +712,7 @@ async fn proc_list(state: HState) -> Result<Outgoing, ServerError> {
cmd: p.cmd.to_string(),
args: p.args.clone(),
detached: p.detached,
pty: p.pty.clone(),
id: p.id,
})
.collect(),
@ -2219,10 +2218,11 @@ mod tests {
let req = Request::new(
"test-tenant",
vec![RequestData::ProcRun {
vec![RequestData::ProcSpawn {
cmd: DOES_NOT_EXIST_BIN.to_str().unwrap().to_string(),
args: Vec::new(),
detached: false,
pty: None,
}],
);
@ -2245,10 +2245,11 @@ mod tests {
let req = Request::new(
"test-tenant",
vec![RequestData::ProcRun {
vec![RequestData::ProcSpawn {
cmd: SCRIPT_RUNNER.to_string(),
args: vec![ECHO_ARGS_TO_STDOUT_SH.to_str().unwrap().to_string()],
detached: false,
pty: None,
}],
);
@ -2259,7 +2260,7 @@ mod tests {
let res = rx.recv().await.unwrap();
assert_eq!(res.payload.len(), 1, "Wrong payload size");
assert!(
matches!(&res.payload[0], ResponseData::ProcStart { .. }),
matches!(&res.payload[0], ResponseData::ProcSpawned { .. }),
"Unexpected response: {:?}",
res.payload[0]
);
@ -2275,13 +2276,14 @@ mod tests {
// Run a program that echoes to stdout
let req = Request::new(
"test-tenant",
vec![RequestData::ProcRun {
vec![RequestData::ProcSpawn {
cmd: SCRIPT_RUNNER.to_string(),
args: vec![
ECHO_ARGS_TO_STDOUT_SH.to_str().unwrap().to_string(),
String::from("some stdout"),
],
detached: false,
pty: None,
}],
);
@ -2292,7 +2294,7 @@ mod tests {
let res = rx.recv().await.unwrap();
assert_eq!(res.payload.len(), 1, "Wrong payload size");
assert!(
matches!(&res.payload[0], ResponseData::ProcStart { .. }),
matches!(&res.payload[0], ResponseData::ProcSpawned { .. }),
"Unexpected response: {:?}",
res.payload[0]
);
@ -2314,7 +2316,7 @@ mod tests {
assert_eq!(res.payload.len(), 1, "Wrong payload size");
match &res.payload[0] {
ResponseData::ProcStdout { data, .. } => {
assert_eq!(data, "some stdout", "Got wrong stdout");
assert_eq!(data, b"some stdout", "Got wrong stdout");
got_stdout = true;
}
ResponseData::ProcDone { success, .. } => {
@ -2341,13 +2343,14 @@ mod tests {
// Run a program that echoes to stderr
let req = Request::new(
"test-tenant",
vec![RequestData::ProcRun {
vec![RequestData::ProcSpawn {
cmd: SCRIPT_RUNNER.to_string(),
args: vec![
ECHO_ARGS_TO_STDERR_SH.to_str().unwrap().to_string(),
String::from("some stderr"),
],
detached: false,
pty: None,
}],
);
@ -2358,7 +2361,7 @@ mod tests {
let res = rx.recv().await.unwrap();
assert_eq!(res.payload.len(), 1, "Wrong payload size");
assert!(
matches!(&res.payload[0], ResponseData::ProcStart { .. }),
matches!(&res.payload[0], ResponseData::ProcSpawned { .. }),
"Unexpected response: {:?}",
res.payload[0]
);
@ -2380,7 +2383,7 @@ mod tests {
assert_eq!(res.payload.len(), 1, "Wrong payload size");
match &res.payload[0] {
ResponseData::ProcStderr { data, .. } => {
assert_eq!(data, "some stderr", "Got wrong stderr");
assert_eq!(data, b"some stderr", "Got wrong stderr");
got_stderr = true;
}
ResponseData::ProcDone { success, .. } => {
@ -2407,10 +2410,11 @@ mod tests {
// Run a program that ends after a little bit
let req = Request::new(
"test-tenant",
vec![RequestData::ProcRun {
vec![RequestData::ProcSpawn {
cmd: SCRIPT_RUNNER.to_string(),
args: vec![SLEEP_SH.to_str().unwrap().to_string(), String::from("0.1")],
detached: false,
pty: None,
}],
);
@ -2421,7 +2425,7 @@ mod tests {
let res = rx.recv().await.unwrap();
assert_eq!(res.payload.len(), 1, "Wrong payload size");
let id = match &res.payload[0] {
ResponseData::ProcStart { id } => *id,
ResponseData::ProcSpawned { id } => *id,
x => panic!("Unexpected response: {:?}", x),
};
@ -2450,10 +2454,11 @@ mod tests {
// Run a program that ends slowly
let req = Request::new(
"test-tenant",
vec![RequestData::ProcRun {
vec![RequestData::ProcSpawn {
cmd: SCRIPT_RUNNER.to_string(),
args: vec![SLEEP_SH.to_str().unwrap().to_string(), String::from("1")],
detached: false,
pty: None,
}],
);
@ -2464,7 +2469,7 @@ mod tests {
let res = rx.recv().await.unwrap();
assert_eq!(res.payload.len(), 1, "Wrong payload size");
let id = match &res.payload[0] {
ResponseData::ProcStart { id } => *id,
ResponseData::ProcSpawned { id } => *id,
x => panic!("Unexpected response: {:?}", x),
};
@ -2525,10 +2530,11 @@ mod tests {
// First, run a program that sits around (sleep for 1 second)
let req = Request::new(
"test-tenant",
vec![RequestData::ProcRun {
vec![RequestData::ProcSpawn {
cmd: SCRIPT_RUNNER.to_string(),
args: vec![SLEEP_SH.to_str().unwrap().to_string(), String::from("1")],
detached: false,
pty: None,
}],
);
@ -2541,7 +2547,7 @@ mod tests {
// Second, grab the id of the started process
let id = match &res.payload[0] {
ResponseData::ProcStart { id } => *id,
ResponseData::ProcSpawned { id } => *id,
x => panic!("Unexpected response: {:?}", x),
};
@ -2592,7 +2598,7 @@ mod tests {
"test-tenant",
vec![RequestData::ProcStdin {
id: 0xDEADBEEF,
data: String::from("some input"),
data: b"some input".to_vec(),
}],
);
@ -2621,10 +2627,11 @@ mod tests {
// First, run a program that listens for stdin
let req = Request::new(
"test-tenant",
vec![RequestData::ProcRun {
vec![RequestData::ProcSpawn {
cmd: SCRIPT_RUNNER.to_string(),
args: vec![ECHO_STDIN_TO_STDOUT_SH.to_str().unwrap().to_string()],
detached: false,
pty: None,
}],
);
@ -2637,7 +2644,7 @@ mod tests {
// Second, grab the id of the started process
let id = match &res.payload[0] {
ResponseData::ProcStart { id } => *id,
ResponseData::ProcSpawned { id } => *id,
x => panic!("Unexpected response: {:?}", x),
};
@ -2646,7 +2653,7 @@ mod tests {
"test-tenant",
vec![RequestData::ProcStdin {
id,
data: String::from("hello world\n"),
data: b"hello world\n".to_vec(),
}],
);
@ -2672,7 +2679,7 @@ mod tests {
match &res.payload[0] {
ResponseData::Ok => got_ok = true,
ResponseData::ProcStdout { data, .. } => {
assert_eq!(data, "hello world\n", "Mirrored data didn't match");
assert_eq!(data, b"hello world\n", "Mirrored data didn't match");
got_stdout = true;
}
x => panic!("Unexpected response: {:?}", x),
@ -2694,10 +2701,11 @@ mod tests {
let req = Request::new(
"test-tenant",
vec![
RequestData::ProcRun {
RequestData::ProcSpawn {
cmd: SCRIPT_RUNNER.to_string(),
args: vec![SLEEP_SH.to_str().unwrap().to_string(), String::from("1")],
detached: false,
pty: None,
},
RequestData::ProcList {},
],
@ -2710,7 +2718,7 @@ mod tests {
// Grab the id of the started process
let id = match &res.payload[0] {
ResponseData::ProcStart { id } => *id,
ResponseData::ProcSpawned { id } => *id,
x => panic!("Unexpected response: {:?}", x),
};
@ -2722,6 +2730,7 @@ mod tests {
cmd: SCRIPT_RUNNER.to_string(),
args: vec![SLEEP_SH.to_str().unwrap().to_string(), String::from("1")],
detached: false,
pty: None,
id,
}],
},

@ -1,3 +1,4 @@
use crate::data::PtySize;
use log::*;
use std::{
collections::HashMap,
@ -105,9 +106,12 @@ pub struct Process {
/// Whether or not this process was run detached
pub detached: bool,
/// Dimensions of pty associated with process, if it has one
pub pty: Option<PtySize>,
/// Transport channel to send new input to the stdin of the process,
/// one line at a time
stdin_tx: Option<mpsc::Sender<String>>,
stdin_tx: Option<mpsc::Sender<Vec<u8>>>,
/// Transport channel to report that the process should be killed
kill_tx: Option<oneshot::Sender<()>>,
@ -117,12 +121,19 @@ pub struct Process {
}
impl Process {
pub fn new(id: usize, cmd: String, args: Vec<String>, detached: bool) -> Self {
pub fn new(
id: usize,
cmd: String,
args: Vec<String>,
detached: bool,
pty: Option<PtySize>,
) -> Self {
Self {
id,
cmd,
args,
detached,
pty,
stdin_tx: None,
kill_tx: None,
wait_task: None,
@ -132,7 +143,7 @@ impl Process {
/// Lazy initialization of process state
pub(crate) fn initialize(
&mut self,
stdin_tx: mpsc::Sender<String>,
stdin_tx: mpsc::Sender<Vec<u8>>,
kill_tx: oneshot::Sender<()>,
wait_task: JoinHandle<()>,
) {
@ -141,7 +152,7 @@ impl Process {
self.wait_task = Some(wait_task);
}
pub async fn send_stdin(&self, input: impl Into<String>) -> bool {
pub async fn send_stdin(&self, input: impl Into<Vec<u8>>) -> bool {
if let Some(stdin) = self.stdin_tx.as_ref() {
if stdin.send(input.into()).await.is_ok() {
return true;

@ -172,7 +172,7 @@ where
{
let mut p_lock = processes.lock().await;
for data in res.payload.iter() {
if let ResponseData::ProcStart { id } = *data {
if let ResponseData::ProcSpawned { id } = *data {
p_lock.push(id);
}
}

@ -99,7 +99,7 @@ pub(super) async fn process(
canonicalize,
resolve_file_type,
} => metadata(session, path, canonicalize, resolve_file_type).await,
RequestData::ProcRun {
RequestData::ProcSpawn {
cmd,
args,
detached,
@ -841,7 +841,7 @@ where
id
);
Ok(Outgoing {
data: ResponseData::ProcStart { id },
data: ResponseData::ProcSpawned { id },
post_hook: Some(post_hook),
})
}

@ -1352,7 +1352,7 @@ async fn proc_run_should_send_error_over_stderr_on_failure(#[future] session: Se
let mut session = session.await;
let req = Request::new(
"test-tenant",
vec![RequestData::ProcRun {
vec![RequestData::ProcSpawn {
cmd: DOES_NOT_EXIST_BIN.to_str().unwrap().to_string(),
args: Vec::new(),
detached: false,
@ -1367,7 +1367,7 @@ async fn proc_run_should_send_error_over_stderr_on_failure(#[future] session: Se
let res = mailbox.next().await.unwrap();
assert_eq!(res.payload.len(), 1, "Wrong payload size");
let proc_id = match &res.payload[0] {
ResponseData::ProcStart { id } => *id,
ResponseData::ProcSpawned { id } => *id,
x => panic!("Unexpected response: {:?}", x),
};
@ -1398,7 +1398,7 @@ async fn proc_run_should_send_back_proc_start_on_success(#[future] session: Sess
let mut session = session.await;
let req = Request::new(
"test-tenant",
vec![RequestData::ProcRun {
vec![RequestData::ProcSpawn {
cmd: SCRIPT_RUNNER.to_string(),
args: vec![ECHO_ARGS_TO_STDOUT_SH.to_str().unwrap().to_string()],
detached: false,
@ -1408,7 +1408,7 @@ async fn proc_run_should_send_back_proc_start_on_success(#[future] session: Sess
let res = session.send(req).await.unwrap();
assert_eq!(res.payload.len(), 1, "Wrong payload size");
assert!(
matches!(&res.payload[0], ResponseData::ProcStart { .. }),
matches!(&res.payload[0], ResponseData::ProcSpawned { .. }),
"Unexpected response: {:?}",
res.payload[0]
);
@ -1424,7 +1424,7 @@ async fn proc_run_should_send_back_stdout_periodically_when_available(#[future]
// Run a program that echoes to stdout
let req = Request::new(
"test-tenant",
vec![RequestData::ProcRun {
vec![RequestData::ProcSpawn {
cmd: SCRIPT_RUNNER.to_string(),
args: vec![
ECHO_ARGS_TO_STDOUT_SH.to_str().unwrap().to_string(),
@ -1439,7 +1439,7 @@ async fn proc_run_should_send_back_stdout_periodically_when_available(#[future]
let res = mailbox.next().await.unwrap();
assert_eq!(res.payload.len(), 1, "Wrong payload size");
assert!(
matches!(&res.payload[0], ResponseData::ProcStart { .. }),
matches!(&res.payload[0], ResponseData::ProcSpawned { .. }),
"Unexpected response: {:?}",
res.payload[0]
);
@ -1488,7 +1488,7 @@ async fn proc_run_should_send_back_stderr_periodically_when_available(#[future]
// Run a program that echoes to stderr
let req = Request::new(
"test-tenant",
vec![RequestData::ProcRun {
vec![RequestData::ProcSpawn {
cmd: SCRIPT_RUNNER.to_string(),
args: vec![
ECHO_ARGS_TO_STDERR_SH.to_str().unwrap().to_string(),
@ -1503,7 +1503,7 @@ async fn proc_run_should_send_back_stderr_periodically_when_available(#[future]
let res = mailbox.next().await.unwrap();
assert_eq!(res.payload.len(), 1, "Wrong payload size");
assert!(
matches!(&res.payload[0], ResponseData::ProcStart { .. }),
matches!(&res.payload[0], ResponseData::ProcSpawned { .. }),
"Unexpected response: {:?}",
res.payload[0]
);
@ -1552,7 +1552,7 @@ async fn proc_run_should_clear_process_from_state_when_done(#[future] session: S
// Run a program that ends after a little bit
let req = Request::new(
"test-tenant",
vec![RequestData::ProcRun {
vec![RequestData::ProcSpawn {
cmd: SCRIPT_RUNNER.to_string(),
args: vec![SLEEP_SH.to_str().unwrap().to_string(), String::from("0.1")],
detached: false,
@ -1563,7 +1563,7 @@ async fn proc_run_should_clear_process_from_state_when_done(#[future] session: S
let res = mailbox.next().await.unwrap();
assert_eq!(res.payload.len(), 1, "Wrong payload size");
let id = match &res.payload[0] {
ResponseData::ProcStart { id } => *id,
ResponseData::ProcSpawned { id } => *id,
x => panic!("Unexpected response: {:?}", x),
};
@ -1600,7 +1600,7 @@ async fn proc_run_should_clear_process_from_state_when_killed(#[future] session:
// Run a program that ends slowly
let req = Request::new(
"test-tenant",
vec![RequestData::ProcRun {
vec![RequestData::ProcSpawn {
cmd: SCRIPT_RUNNER.to_string(),
args: vec![SLEEP_SH.to_str().unwrap().to_string(), String::from("1")],
detached: false,
@ -1612,7 +1612,7 @@ async fn proc_run_should_clear_process_from_state_when_killed(#[future] session:
let res = mailbox.next().await.unwrap();
assert_eq!(res.payload.len(), 1, "Wrong payload size");
let id = match &res.payload[0] {
ResponseData::ProcStart { id } => *id,
ResponseData::ProcSpawned { id } => *id,
x => panic!("Unexpected response: {:?}", x),
};
@ -1674,7 +1674,7 @@ async fn proc_kill_should_send_ok_and_done_responses_on_success(#[future] sessio
// First, run a program that sits around (sleep for 1 second)
let req = Request::new(
"test-tenant",
vec![RequestData::ProcRun {
vec![RequestData::ProcSpawn {
cmd: SCRIPT_RUNNER.to_string(),
args: vec![SLEEP_SH.to_str().unwrap().to_string(), String::from("1")],
detached: false,
@ -1688,7 +1688,7 @@ async fn proc_kill_should_send_ok_and_done_responses_on_success(#[future] sessio
// Second, grab the id of the started process
let id = match &res.payload[0] {
ResponseData::ProcStart { id } => *id,
ResponseData::ProcSpawned { id } => *id,
x => panic!("Unexpected response: {:?}", x),
};
@ -1749,7 +1749,7 @@ async fn proc_stdin_should_send_ok_on_success_and_properly_send_stdin_to_process
// First, run a program that listens for stdin
let req = Request::new(
"test-tenant",
vec![RequestData::ProcRun {
vec![RequestData::ProcSpawn {
cmd: SCRIPT_RUNNER.to_string(),
args: vec![ECHO_STDIN_TO_STDOUT_SH.to_str().unwrap().to_string()],
detached: false,
@ -1762,7 +1762,7 @@ async fn proc_stdin_should_send_ok_on_success_and_properly_send_stdin_to_process
// Second, grab the id of the started process
let id = match &res.payload[0] {
ResponseData::ProcStart { id } => *id,
ResponseData::ProcSpawned { id } => *id,
x => panic!("Unexpected response: {:?}", x),
};
@ -1798,7 +1798,7 @@ async fn proc_list_should_send_proc_entry_list(#[future] session: Session) {
let mut session = session.await;
let req = Request::new(
"test-tenant",
vec![RequestData::ProcRun {
vec![RequestData::ProcSpawn {
cmd: SCRIPT_RUNNER.to_string(),
args: vec![SLEEP_SH.to_str().unwrap().to_string(), String::from("10")],
detached: false,
@ -1810,7 +1810,7 @@ async fn proc_list_should_send_proc_entry_list(#[future] session: Session) {
// Grab the id of the started process
let id = match &res.payload[0] {
ResponseData::ProcStart { id } => *id,
ResponseData::ProcSpawned { id } => *id,
x => panic!("Unexpected response: {:?}", x),
};

@ -144,7 +144,7 @@ fn format_shell(data: ResponseData) -> ResponseOut {
.collect::<Vec<String>>()
.join("\n"),
),
ResponseData::ProcStart { .. } => ResponseOut::None,
ResponseData::ProcSpawned { .. } => ResponseOut::None,
ResponseData::ProcStdout { data, .. } => ResponseOut::Stdout(data),
ResponseData::ProcStderr { data, .. } => ResponseOut::Stderr(data),
ResponseData::ProcDone { id, success, code } => {

@ -88,7 +88,7 @@ async fn start(
// the stdin will be used for sending ProcStdin to remote process
(
_,
Some(RequestData::ProcRun {
Some(RequestData::ProcSpawn {
cmd,
args,
detached,

@ -168,7 +168,7 @@ fn should_support_json_to_execute_program_and_return_exit_status(mut action_cmd:
let req = Request {
id: rand::random(),
tenant: random_tenant(),
payload: vec![RequestData::ProcRun {
payload: vec![RequestData::ProcSpawn {
cmd: SCRIPT_RUNNER.to_string(),
args: vec![ECHO_ARGS_TO_STDOUT_SH.to_str().unwrap().to_string()],
detached: false,
@ -186,7 +186,7 @@ fn should_support_json_to_execute_program_and_return_exit_status(mut action_cmd:
let res: Response = serde_json::from_slice(&cmd.get_output().stdout).unwrap();
assert!(
matches!(res.payload[0], ResponseData::ProcStart { .. }),
matches!(res.payload[0], ResponseData::ProcSpawned { .. }),
"Unexpected response: {:?}",
res.payload[0],
);
@ -198,7 +198,7 @@ fn should_support_json_to_capture_and_print_stdout(ctx: &'_ DistantServerCtx) {
let req = Request {
id: rand::random(),
tenant: random_tenant(),
payload: vec![RequestData::ProcRun {
payload: vec![RequestData::ProcSpawn {
cmd: SCRIPT_RUNNER.to_string(),
args: vec![
ECHO_ARGS_TO_STDOUT_SH.to_str().unwrap().to_string(),
@ -230,7 +230,7 @@ fn should_support_json_to_capture_and_print_stdout(ctx: &'_ DistantServerCtx) {
friendly_recv_line(&stdout, Duration::from_secs(30)).expect("Failed to get proc start");
let res: Response = serde_json::from_str(&out).unwrap();
assert!(
matches!(res.payload[0], ResponseData::ProcStart { .. }),
matches!(res.payload[0], ResponseData::ProcSpawned { .. }),
"Unexpected response: {:?}",
res.payload[0]
);
@ -267,7 +267,7 @@ fn should_support_json_to_capture_and_print_stderr(ctx: &'_ DistantServerCtx) {
let req = Request {
id: rand::random(),
tenant: random_tenant(),
payload: vec![RequestData::ProcRun {
payload: vec![RequestData::ProcSpawn {
cmd: SCRIPT_RUNNER.to_string(),
args: vec![
ECHO_ARGS_TO_STDERR_SH.to_str().unwrap().to_string(),
@ -299,7 +299,7 @@ fn should_support_json_to_capture_and_print_stderr(ctx: &'_ DistantServerCtx) {
friendly_recv_line(&stdout, Duration::from_secs(30)).expect("Failed to get proc start");
let res: Response = serde_json::from_str(&out).unwrap();
assert!(
matches!(res.payload[0], ResponseData::ProcStart { .. }),
matches!(res.payload[0], ResponseData::ProcSpawned { .. }),
"Unexpected response: {:?}",
res.payload[0]
);
@ -335,7 +335,7 @@ fn should_support_json_to_forward_stdin_to_remote_process(ctx: &'_ DistantServer
let req = Request {
id: rand::random(),
tenant: random_tenant(),
payload: vec![RequestData::ProcRun {
payload: vec![RequestData::ProcSpawn {
cmd: SCRIPT_RUNNER.to_string(),
args: vec![ECHO_STDIN_TO_STDOUT_SH.to_str().unwrap().to_string()],
detached: false,
@ -364,7 +364,7 @@ fn should_support_json_to_forward_stdin_to_remote_process(ctx: &'_ DistantServer
friendly_recv_line(&stdout, Duration::from_secs(30)).expect("Failed to get proc start");
let res: Response = serde_json::from_str(&out).unwrap();
let id = match &res.payload[0] {
ResponseData::ProcStart { id } => *id,
ResponseData::ProcSpawned { id } => *id,
x => panic!("Unexpected response: {:?}", x),
};
@ -429,7 +429,7 @@ fn should_support_json_output_for_error(mut action_cmd: Command) {
let req = Request {
id: rand::random(),
tenant: random_tenant(),
payload: vec![RequestData::ProcRun {
payload: vec![RequestData::ProcSpawn {
cmd: DOES_NOT_EXIST_BIN.to_str().unwrap().to_string(),
args: Vec::new(),
detached: false,

Loading…
Cancel
Save