mirror of https://github.com/chipsenkbeil/distant
Complete shell support (#89)
parent
c6c07c5c2c
commit
050bb3496a
@ -0,0 +1,151 @@
|
||||
use crate::data::PtySize;
|
||||
use std::{future::Future, pin::Pin};
|
||||
use tokio::{io, sync::mpsc};
|
||||
|
||||
mod pty;
|
||||
pub use pty::*;
|
||||
|
||||
mod simple;
|
||||
pub use simple::*;
|
||||
|
||||
mod wait;
|
||||
pub use wait::{ExitStatus, WaitRx, WaitTx};
|
||||
|
||||
/// Alias to the return type of an async function (for use with traits)
|
||||
pub type FutureReturn<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
|
||||
|
||||
/// Represents a process on the remote server
|
||||
pub trait Process: ProcessKiller + ProcessPty {
|
||||
/// Represents the id of the process
|
||||
fn id(&self) -> usize;
|
||||
|
||||
/// Waits for the process to exit, returning the exit status
|
||||
///
|
||||
/// If the process has already exited, the status is returned immediately.
|
||||
fn wait(&mut self) -> FutureReturn<'_, io::Result<ExitStatus>>;
|
||||
|
||||
/// Returns a reference to stdin channel if the process still has it associated
|
||||
fn stdin(&self) -> Option<&dyn InputChannel>;
|
||||
|
||||
/// Returns a mutable reference to the stdin channel if the process still has it associated
|
||||
fn mut_stdin(&mut self) -> Option<&mut (dyn InputChannel + 'static)>;
|
||||
|
||||
/// Takes the stdin channel from the process if it is still associated
|
||||
fn take_stdin(&mut self) -> Option<Box<dyn InputChannel>>;
|
||||
|
||||
/// Returns a reference to stdout channel if the process still has it associated
|
||||
fn stdout(&self) -> Option<&dyn OutputChannel>;
|
||||
|
||||
/// Returns a mutable reference to the stdout channel if the process still has it associated
|
||||
fn mut_stdout(&mut self) -> Option<&mut (dyn OutputChannel + 'static)>;
|
||||
|
||||
/// Takes the stdout channel from the process if it is still associated
|
||||
fn take_stdout(&mut self) -> Option<Box<dyn OutputChannel>>;
|
||||
|
||||
/// Returns a reference to stderr channel if the process still has it associated
|
||||
fn stderr(&self) -> Option<&dyn OutputChannel>;
|
||||
|
||||
/// Returns a mutable reference to the stderr channel if the process still has it associated
|
||||
fn mut_stderr(&mut self) -> Option<&mut (dyn OutputChannel + 'static)>;
|
||||
|
||||
/// Takes the stderr channel from the process if it is still associated
|
||||
fn take_stderr(&mut self) -> Option<Box<dyn OutputChannel>>;
|
||||
}
|
||||
|
||||
/// Represents interface that can be used to work with a pty associated with a process
|
||||
pub trait ProcessPty: Send + Sync {
|
||||
/// Returns the current size of the process' pty if it has one
|
||||
fn pty_size(&self) -> Option<PtySize>;
|
||||
|
||||
/// Resize the pty associated with the process; returns an error if fails or if the
|
||||
/// process does not leverage a pty
|
||||
fn resize_pty(&self, size: PtySize) -> io::Result<()>;
|
||||
|
||||
/// Clone a process pty to support reading and updating pty independently
|
||||
fn clone_pty(&self) -> Box<dyn ProcessPty>;
|
||||
}
|
||||
|
||||
/// Trait that can be implemented to mark a process as not having a pty
|
||||
pub trait NoProcessPty: Send + Sync {}
|
||||
|
||||
/// Internal type so we can create a dummy instance that implements trait
|
||||
struct NoProcessPtyImpl {}
|
||||
impl NoProcessPty for NoProcessPtyImpl {}
|
||||
|
||||
impl<T: NoProcessPty> ProcessPty for T {
|
||||
fn pty_size(&self) -> Option<PtySize> {
|
||||
None
|
||||
}
|
||||
|
||||
fn resize_pty(&self, _size: PtySize) -> io::Result<()> {
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"Process does not use pty",
|
||||
))
|
||||
}
|
||||
|
||||
fn clone_pty(&self) -> Box<dyn ProcessPty> {
|
||||
Box::new(NoProcessPtyImpl {})
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents interface that can be used to kill a remote process
|
||||
pub trait ProcessKiller: Send + Sync {
|
||||
/// Kill the process
|
||||
///
|
||||
/// If the process is dead or has already been killed, this will return
|
||||
/// an error.
|
||||
fn kill(&mut self) -> FutureReturn<'_, io::Result<()>>;
|
||||
|
||||
/// Clone a process killer to support sending signals independently
|
||||
fn clone_killer(&self) -> Box<dyn ProcessKiller>;
|
||||
}
|
||||
|
||||
impl ProcessKiller for mpsc::Sender<()> {
|
||||
fn kill(&mut self) -> FutureReturn<'_, io::Result<()>> {
|
||||
async fn inner(this: &mut mpsc::Sender<()>) -> io::Result<()> {
|
||||
this.send(())
|
||||
.await
|
||||
.map_err(|x| io::Error::new(io::ErrorKind::BrokenPipe, x))
|
||||
}
|
||||
Box::pin(inner(self))
|
||||
}
|
||||
|
||||
fn clone_killer(&self) -> Box<dyn ProcessKiller> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents an input channel of a process such as stdin
|
||||
pub trait InputChannel: Send + Sync {
|
||||
/// Sends input through channel, returning unit if succeeds or an error if fails
|
||||
fn send<'a>(&'a mut self, data: &[u8]) -> FutureReturn<'a, io::Result<()>>;
|
||||
}
|
||||
|
||||
impl InputChannel for mpsc::Sender<Vec<u8>> {
|
||||
fn send<'a>(&'a mut self, data: &[u8]) -> FutureReturn<'a, io::Result<()>> {
|
||||
let data = data.to_vec();
|
||||
Box::pin(async move {
|
||||
match mpsc::Sender::send(self, data).await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(_) => Err(io::Error::new(
|
||||
io::ErrorKind::BrokenPipe,
|
||||
"Input channel closed",
|
||||
)),
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents an output channel of a process such as stdout or stderr
|
||||
pub trait OutputChannel: Send + Sync {
|
||||
/// Waits for next output from channel, returning Some(data) if there is output, None if
|
||||
/// the channel has been closed, or bubbles up an error if encountered
|
||||
fn recv(&mut self) -> FutureReturn<'_, io::Result<Option<Vec<u8>>>>;
|
||||
}
|
||||
|
||||
impl OutputChannel for mpsc::Receiver<Vec<u8>> {
|
||||
fn recv(&mut self) -> FutureReturn<'_, io::Result<Option<Vec<u8>>>> {
|
||||
Box::pin(async move { Ok(mpsc::Receiver::recv(self).await) })
|
||||
}
|
||||
}
|
@ -0,0 +1,278 @@
|
||||
use super::{
|
||||
wait, ExitStatus, FutureReturn, InputChannel, OutputChannel, Process, ProcessKiller,
|
||||
ProcessPty, PtySize, WaitRx,
|
||||
};
|
||||
use crate::constants::{MAX_PIPE_CHUNK_SIZE, READ_PAUSE_MILLIS};
|
||||
use portable_pty::{CommandBuilder, MasterPty, PtySize as PortablePtySize};
|
||||
use std::{
|
||||
ffi::OsStr,
|
||||
io::{self, Read, Write},
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
use tokio::{sync::mpsc, task::JoinHandle};
|
||||
|
||||
/// Represents a process that is associated with a pty
|
||||
pub struct PtyProcess {
|
||||
id: usize,
|
||||
pty_master: PtyProcessMaster,
|
||||
stdin: Option<Box<dyn InputChannel>>,
|
||||
stdout: Option<Box<dyn OutputChannel>>,
|
||||
stdin_task: Option<JoinHandle<()>>,
|
||||
stdout_task: Option<JoinHandle<io::Result<()>>>,
|
||||
kill_tx: mpsc::Sender<()>,
|
||||
wait: WaitRx,
|
||||
}
|
||||
|
||||
impl PtyProcess {
|
||||
/// Spawns a new simple process
|
||||
pub fn spawn<S, I, S2>(program: S, args: I, size: PtySize) -> io::Result<Self>
|
||||
where
|
||||
S: AsRef<OsStr>,
|
||||
I: IntoIterator<Item = S2>,
|
||||
S2: AsRef<OsStr>,
|
||||
{
|
||||
// Establish our new pty for the given size
|
||||
let pty_system = portable_pty::native_pty_system();
|
||||
let pty_pair = pty_system
|
||||
.openpty(PortablePtySize {
|
||||
rows: size.rows,
|
||||
cols: size.cols,
|
||||
pixel_width: size.pixel_width,
|
||||
pixel_height: size.pixel_height,
|
||||
})
|
||||
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))?;
|
||||
let pty_master = pty_pair.master;
|
||||
let pty_slave = pty_pair.slave;
|
||||
|
||||
// Spawn our process within the pty
|
||||
let mut cmd = CommandBuilder::new(program);
|
||||
cmd.args(args);
|
||||
let mut child = pty_slave
|
||||
.spawn_command(cmd)
|
||||
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))?;
|
||||
|
||||
// NOTE: Need to drop slave to close out file handles and avoid deadlock when waiting on
|
||||
// the child
|
||||
drop(pty_slave);
|
||||
|
||||
// Spawn a blocking task to process submitting stdin async
|
||||
let (stdin_tx, mut stdin_rx) = mpsc::channel::<Vec<u8>>(1);
|
||||
let mut stdin_writer = pty_master
|
||||
.try_clone_writer()
|
||||
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))?;
|
||||
let stdin_task = tokio::task::spawn_blocking(move || {
|
||||
while let Some(input) = stdin_rx.blocking_recv() {
|
||||
if stdin_writer.write_all(&input).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Spawn a blocking task to process receiving stdout async
|
||||
let (stdout_tx, stdout_rx) = mpsc::channel::<Vec<u8>>(1);
|
||||
let mut stdout_reader = pty_master
|
||||
.try_clone_reader()
|
||||
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))?;
|
||||
let stdout_task = tokio::task::spawn_blocking(move || {
|
||||
let mut buf: [u8; MAX_PIPE_CHUNK_SIZE] = [0; MAX_PIPE_CHUNK_SIZE];
|
||||
loop {
|
||||
match stdout_reader.read(&mut buf) {
|
||||
Ok(n) if n > 0 => {
|
||||
let _ = stdout_tx.blocking_send(buf[..n].to_vec()).map_err(|_| {
|
||||
io::Error::new(io::ErrorKind::BrokenPipe, "Output channel closed")
|
||||
})?;
|
||||
}
|
||||
Ok(_) => return Ok(()),
|
||||
Err(x) => return Err(x),
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let (kill_tx, mut kill_rx) = mpsc::channel(1);
|
||||
let (mut wait_tx, wait_rx) = wait::channel();
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match (child.try_wait(), kill_rx.try_recv()) {
|
||||
(Ok(Some(status)), _) => {
|
||||
// TODO: Keep track of io error
|
||||
let _ = wait_tx
|
||||
.send(ExitStatus {
|
||||
success: status.success(),
|
||||
code: None,
|
||||
})
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
(_, Ok(_)) => {
|
||||
// TODO: Keep track of io error
|
||||
let _ = wait_tx.kill().await;
|
||||
break;
|
||||
}
|
||||
(Err(x), _) => {
|
||||
// TODO: Keep track of io error
|
||||
let _ = wait_tx.send(x).await;
|
||||
break;
|
||||
}
|
||||
_ => {
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(READ_PAUSE_MILLIS))
|
||||
.await;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
id: rand::random(),
|
||||
pty_master: PtyProcessMaster(Arc::new(Mutex::new(pty_master))),
|
||||
stdin: Some(Box::new(stdin_tx)),
|
||||
stdout: Some(Box::new(stdout_rx)),
|
||||
stdin_task: Some(stdin_task),
|
||||
stdout_task: Some(stdout_task),
|
||||
kill_tx,
|
||||
wait: wait_rx,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Process for PtyProcess {
|
||||
fn id(&self) -> usize {
|
||||
self.id
|
||||
}
|
||||
|
||||
fn wait(&mut self) -> FutureReturn<'_, io::Result<ExitStatus>> {
|
||||
async fn inner(this: &mut PtyProcess) -> io::Result<ExitStatus> {
|
||||
let mut status = this.wait.recv().await?;
|
||||
|
||||
if let Some(task) = this.stdin_task.take() {
|
||||
task.abort();
|
||||
}
|
||||
if let Some(task) = this.stdout_task.take() {
|
||||
let _ = task.await;
|
||||
}
|
||||
|
||||
if status.success && status.code.is_none() {
|
||||
status.code = Some(0);
|
||||
}
|
||||
Ok(status)
|
||||
}
|
||||
Box::pin(inner(self))
|
||||
}
|
||||
|
||||
fn stdin(&self) -> Option<&dyn InputChannel> {
|
||||
self.stdin.as_deref()
|
||||
}
|
||||
|
||||
fn mut_stdin(&mut self) -> Option<&mut (dyn InputChannel + 'static)> {
|
||||
self.stdin.as_deref_mut()
|
||||
}
|
||||
|
||||
fn take_stdin(&mut self) -> Option<Box<dyn InputChannel>> {
|
||||
self.stdin.take()
|
||||
}
|
||||
|
||||
fn stdout(&self) -> Option<&dyn OutputChannel> {
|
||||
self.stdout.as_deref()
|
||||
}
|
||||
|
||||
fn mut_stdout(&mut self) -> Option<&mut (dyn OutputChannel + 'static)> {
|
||||
self.stdout.as_deref_mut()
|
||||
}
|
||||
|
||||
fn take_stdout(&mut self) -> Option<Box<dyn OutputChannel>> {
|
||||
self.stdout.take()
|
||||
}
|
||||
|
||||
fn stderr(&self) -> Option<&dyn OutputChannel> {
|
||||
None
|
||||
}
|
||||
|
||||
fn mut_stderr(&mut self) -> Option<&mut (dyn OutputChannel + 'static)> {
|
||||
None
|
||||
}
|
||||
|
||||
fn take_stderr(&mut self) -> Option<Box<dyn OutputChannel>> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl ProcessKiller for PtyProcess {
|
||||
fn kill(&mut self) -> FutureReturn<'_, io::Result<()>> {
|
||||
async fn inner(this: &mut PtyProcess) -> io::Result<()> {
|
||||
this.kill_tx
|
||||
.send(())
|
||||
.await
|
||||
.map_err(|x| io::Error::new(io::ErrorKind::BrokenPipe, x))
|
||||
}
|
||||
Box::pin(inner(self))
|
||||
}
|
||||
|
||||
fn clone_killer(&self) -> Box<dyn ProcessKiller> {
|
||||
Box::new(self.kill_tx.clone())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct PtyProcessKiller(mpsc::Sender<()>);
|
||||
|
||||
impl ProcessKiller for PtyProcessKiller {
|
||||
fn kill(&mut self) -> FutureReturn<'_, io::Result<()>> {
|
||||
async fn inner(this: &mut PtyProcessKiller) -> io::Result<()> {
|
||||
this.0
|
||||
.send(())
|
||||
.await
|
||||
.map_err(|x| io::Error::new(io::ErrorKind::BrokenPipe, x))
|
||||
}
|
||||
Box::pin(inner(self))
|
||||
}
|
||||
|
||||
fn clone_killer(&self) -> Box<dyn ProcessKiller> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl ProcessPty for PtyProcess {
|
||||
fn pty_size(&self) -> Option<PtySize> {
|
||||
self.pty_master.pty_size()
|
||||
}
|
||||
|
||||
fn resize_pty(&self, size: PtySize) -> io::Result<()> {
|
||||
self.pty_master.resize_pty(size)
|
||||
}
|
||||
|
||||
fn clone_pty(&self) -> Box<dyn ProcessPty> {
|
||||
self.pty_master.clone_pty()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct PtyProcessMaster(Arc<Mutex<Box<dyn MasterPty + Send>>>);
|
||||
|
||||
impl ProcessPty for PtyProcessMaster {
|
||||
fn pty_size(&self) -> Option<PtySize> {
|
||||
self.0.lock().unwrap().get_size().ok().map(|size| PtySize {
|
||||
rows: size.rows,
|
||||
cols: size.cols,
|
||||
pixel_width: size.pixel_width,
|
||||
pixel_height: size.pixel_height,
|
||||
})
|
||||
}
|
||||
|
||||
fn resize_pty(&self, size: PtySize) -> io::Result<()> {
|
||||
self.0
|
||||
.lock()
|
||||
.unwrap()
|
||||
.resize(PortablePtySize {
|
||||
rows: size.rows,
|
||||
cols: size.cols,
|
||||
pixel_width: size.pixel_width,
|
||||
pixel_height: size.pixel_height,
|
||||
})
|
||||
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))
|
||||
}
|
||||
|
||||
fn clone_pty(&self) -> Box<dyn ProcessPty> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
}
|
@ -0,0 +1,181 @@
|
||||
use super::{
|
||||
wait, ExitStatus, FutureReturn, InputChannel, NoProcessPty, OutputChannel, Process,
|
||||
ProcessKiller, WaitRx,
|
||||
};
|
||||
use std::{ffi::OsStr, process::Stdio};
|
||||
use tokio::{io, process::Command, sync::mpsc, task::JoinHandle};
|
||||
|
||||
mod tasks;
|
||||
|
||||
/// Represents a simple process that does not have a pty
|
||||
pub struct SimpleProcess {
|
||||
id: usize,
|
||||
stdin: Option<Box<dyn InputChannel>>,
|
||||
stdout: Option<Box<dyn OutputChannel>>,
|
||||
stderr: Option<Box<dyn OutputChannel>>,
|
||||
stdin_task: Option<JoinHandle<io::Result<()>>>,
|
||||
stdout_task: Option<JoinHandle<io::Result<()>>>,
|
||||
stderr_task: Option<JoinHandle<io::Result<()>>>,
|
||||
kill_tx: mpsc::Sender<()>,
|
||||
wait: WaitRx,
|
||||
}
|
||||
|
||||
impl SimpleProcess {
|
||||
/// Spawns a new simple process
|
||||
pub fn spawn<S, I, S2>(program: S, args: I) -> io::Result<Self>
|
||||
where
|
||||
S: AsRef<OsStr>,
|
||||
I: IntoIterator<Item = S2>,
|
||||
S2: AsRef<OsStr>,
|
||||
{
|
||||
let mut child = Command::new(program)
|
||||
.args(args)
|
||||
.stdin(Stdio::piped())
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()?;
|
||||
|
||||
let stdout = child.stdout.take().unwrap();
|
||||
let (stdout_task, stdout_ch) = tasks::spawn_read_task(stdout, 1);
|
||||
|
||||
let stderr = child.stderr.take().unwrap();
|
||||
let (stderr_task, stderr_ch) = tasks::spawn_read_task(stderr, 1);
|
||||
|
||||
let stdin = child.stdin.take().unwrap();
|
||||
let (stdin_task, stdin_ch) = tasks::spawn_write_task(stdin, 1);
|
||||
|
||||
let (kill_tx, mut kill_rx) = mpsc::channel(1);
|
||||
let (mut wait_tx, wait_rx) = wait::channel();
|
||||
|
||||
tokio::spawn(async move {
|
||||
tokio::select! {
|
||||
_ = kill_rx.recv() => {
|
||||
let status = match child.kill().await {
|
||||
Ok(_) => ExitStatus::killed(),
|
||||
Err(x) => ExitStatus::from(x),
|
||||
};
|
||||
|
||||
// TODO: Keep track of io error
|
||||
let _ = wait_tx.send(status).await;
|
||||
}
|
||||
status = child.wait() => {
|
||||
// TODO: Keep track of io error
|
||||
let _ = wait_tx.send(status).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
id: rand::random(),
|
||||
stdin: Some(Box::new(stdin_ch)),
|
||||
stdout: Some(Box::new(stdout_ch)),
|
||||
stderr: Some(Box::new(stderr_ch)),
|
||||
stdin_task: Some(stdin_task),
|
||||
stdout_task: Some(stdout_task),
|
||||
stderr_task: Some(stderr_task),
|
||||
kill_tx,
|
||||
wait: wait_rx,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Process for SimpleProcess {
|
||||
fn id(&self) -> usize {
|
||||
self.id
|
||||
}
|
||||
|
||||
fn wait(&mut self) -> FutureReturn<'_, io::Result<ExitStatus>> {
|
||||
async fn inner(this: &mut SimpleProcess) -> io::Result<ExitStatus> {
|
||||
let mut status = this.wait.recv().await?;
|
||||
|
||||
if let Some(task) = this.stdin_task.take() {
|
||||
task.abort();
|
||||
}
|
||||
if let Some(task) = this.stdout_task.take() {
|
||||
let _ = task.await;
|
||||
}
|
||||
if let Some(task) = this.stderr_task.take() {
|
||||
let _ = task.await;
|
||||
}
|
||||
|
||||
if status.success && status.code.is_none() {
|
||||
status.code = Some(0);
|
||||
}
|
||||
Ok(status)
|
||||
}
|
||||
Box::pin(inner(self))
|
||||
}
|
||||
|
||||
fn stdin(&self) -> Option<&dyn InputChannel> {
|
||||
self.stdin.as_deref()
|
||||
}
|
||||
|
||||
fn mut_stdin(&mut self) -> Option<&mut (dyn InputChannel + 'static)> {
|
||||
self.stdin.as_deref_mut()
|
||||
}
|
||||
|
||||
fn take_stdin(&mut self) -> Option<Box<dyn InputChannel>> {
|
||||
self.stdin.take()
|
||||
}
|
||||
|
||||
fn stdout(&self) -> Option<&dyn OutputChannel> {
|
||||
self.stdout.as_deref()
|
||||
}
|
||||
|
||||
fn mut_stdout(&mut self) -> Option<&mut (dyn OutputChannel + 'static)> {
|
||||
self.stdout.as_deref_mut()
|
||||
}
|
||||
|
||||
fn take_stdout(&mut self) -> Option<Box<dyn OutputChannel>> {
|
||||
self.stdout.take()
|
||||
}
|
||||
|
||||
fn stderr(&self) -> Option<&dyn OutputChannel> {
|
||||
self.stderr.as_deref()
|
||||
}
|
||||
|
||||
fn mut_stderr(&mut self) -> Option<&mut (dyn OutputChannel + 'static)> {
|
||||
self.stderr.as_deref_mut()
|
||||
}
|
||||
|
||||
fn take_stderr(&mut self) -> Option<Box<dyn OutputChannel>> {
|
||||
self.stderr.take()
|
||||
}
|
||||
}
|
||||
|
||||
impl NoProcessPty for SimpleProcess {}
|
||||
|
||||
impl ProcessKiller for SimpleProcess {
|
||||
fn kill(&mut self) -> FutureReturn<'_, io::Result<()>> {
|
||||
async fn inner(this: &mut SimpleProcess) -> io::Result<()> {
|
||||
this.kill_tx
|
||||
.send(())
|
||||
.await
|
||||
.map_err(|x| io::Error::new(io::ErrorKind::BrokenPipe, x))
|
||||
}
|
||||
Box::pin(inner(self))
|
||||
}
|
||||
|
||||
fn clone_killer(&self) -> Box<dyn ProcessKiller> {
|
||||
Box::new(self.kill_tx.clone())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SimpleProcessKiller(mpsc::Sender<()>);
|
||||
|
||||
impl ProcessKiller for SimpleProcessKiller {
|
||||
fn kill(&mut self) -> FutureReturn<'_, io::Result<()>> {
|
||||
async fn inner(this: &mut SimpleProcessKiller) -> io::Result<()> {
|
||||
this.0
|
||||
.send(())
|
||||
.await
|
||||
.map_err(|x| io::Error::new(io::ErrorKind::BrokenPipe, x))
|
||||
}
|
||||
Box::pin(inner(self))
|
||||
}
|
||||
|
||||
fn clone_killer(&self) -> Box<dyn ProcessKiller> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
}
|
@ -0,0 +1,71 @@
|
||||
use crate::constants::{MAX_PIPE_CHUNK_SIZE, READ_PAUSE_MILLIS};
|
||||
use tokio::{
|
||||
io::{self, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
|
||||
sync::mpsc,
|
||||
task::JoinHandle,
|
||||
};
|
||||
|
||||
pub fn spawn_read_task<R>(
|
||||
reader: R,
|
||||
buf: usize,
|
||||
) -> (JoinHandle<io::Result<()>>, mpsc::Receiver<Vec<u8>>)
|
||||
where
|
||||
R: AsyncRead + Send + Unpin + 'static,
|
||||
{
|
||||
let (tx, rx) = mpsc::channel(buf);
|
||||
let task = tokio::spawn(read_handler(reader, tx));
|
||||
(task, rx)
|
||||
}
|
||||
|
||||
/// Continually reads from some reader and fowards to the provided sender until the reader
|
||||
/// or channel is closed
|
||||
async fn read_handler<R>(mut reader: R, channel: mpsc::Sender<Vec<u8>>) -> io::Result<()>
|
||||
where
|
||||
R: AsyncRead + Unpin,
|
||||
{
|
||||
let mut buf: [u8; MAX_PIPE_CHUNK_SIZE] = [0; MAX_PIPE_CHUNK_SIZE];
|
||||
loop {
|
||||
match reader.read(&mut buf).await {
|
||||
Ok(n) if n > 0 => {
|
||||
let _ = channel.send(buf[..n].to_vec()).await.map_err(|_| {
|
||||
io::Error::new(io::ErrorKind::BrokenPipe, "Output channel closed")
|
||||
})?;
|
||||
|
||||
// Pause to allow buffer to fill up a little bit, avoiding
|
||||
// spamming with a lot of smaller responses
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(READ_PAUSE_MILLIS)).await;
|
||||
}
|
||||
Ok(_) => return Ok(()),
|
||||
Err(x) if x.kind() == io::ErrorKind::WouldBlock => {
|
||||
// Pause to allow buffer to fill up a little bit, avoiding
|
||||
// spamming with a lot of smaller responses
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(READ_PAUSE_MILLIS)).await;
|
||||
}
|
||||
Err(x) => return Err(x),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn_write_task<W>(
|
||||
writer: W,
|
||||
buf: usize,
|
||||
) -> (JoinHandle<io::Result<()>>, mpsc::Sender<Vec<u8>>)
|
||||
where
|
||||
W: AsyncWrite + Send + Unpin + 'static,
|
||||
{
|
||||
let (tx, rx) = mpsc::channel(buf);
|
||||
let task = tokio::spawn(write_handler(writer, rx));
|
||||
(task, tx)
|
||||
}
|
||||
|
||||
/// Continually writes to some writer by reading data from a provided receiver until the receiver
|
||||
/// or writer is closed
|
||||
async fn write_handler<W>(mut writer: W, mut channel: mpsc::Receiver<Vec<u8>>) -> io::Result<()>
|
||||
where
|
||||
W: AsyncWrite + Unpin,
|
||||
{
|
||||
while let Some(data) = channel.recv().await {
|
||||
let _ = writer.write_all(&data).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
@ -0,0 +1,136 @@
|
||||
use tokio::{io, sync::mpsc};
|
||||
|
||||
/// Exit status of a remote process
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
|
||||
pub struct ExitStatus {
|
||||
pub success: bool,
|
||||
pub code: Option<i32>,
|
||||
}
|
||||
|
||||
impl ExitStatus {
|
||||
/// Produces a new exit status representing a killed process
|
||||
pub fn killed() -> Self {
|
||||
Self {
|
||||
success: false,
|
||||
code: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, E> From<Result<T, E>> for ExitStatus
|
||||
where
|
||||
T: Into<ExitStatus>,
|
||||
E: Into<ExitStatus>,
|
||||
{
|
||||
fn from(res: Result<T, E>) -> Self {
|
||||
match res {
|
||||
Ok(x) => x.into(),
|
||||
Err(x) => x.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<io::Error> for ExitStatus {
|
||||
fn from(err: io::Error) -> Self {
|
||||
Self {
|
||||
success: false,
|
||||
code: err.raw_os_error(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::process::ExitStatus> for ExitStatus {
|
||||
fn from(status: std::process::ExitStatus) -> Self {
|
||||
Self {
|
||||
success: status.success(),
|
||||
code: status.code(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new channel for when the exit status will be ready
|
||||
pub fn channel() -> (WaitTx, WaitRx) {
|
||||
let (tx, rx) = mpsc::channel(1);
|
||||
(WaitTx::Pending(tx), WaitRx::Pending(rx))
|
||||
}
|
||||
|
||||
/// Represents a notifier for a specific waiting state
|
||||
#[derive(Debug)]
|
||||
pub enum WaitTx {
|
||||
/// Notification has been sent
|
||||
Done,
|
||||
|
||||
/// Notification has not been sent
|
||||
Pending(mpsc::Sender<ExitStatus>),
|
||||
}
|
||||
|
||||
impl WaitTx {
|
||||
/// Send exit status to receiving-side of wait
|
||||
pub async fn send<S>(&mut self, status: S) -> io::Result<()>
|
||||
where
|
||||
S: Into<ExitStatus>,
|
||||
{
|
||||
let status = status.into();
|
||||
|
||||
match self {
|
||||
Self::Done => Err(io::Error::new(
|
||||
io::ErrorKind::BrokenPipe,
|
||||
"Notifier is closed",
|
||||
)),
|
||||
Self::Pending(tx) => {
|
||||
let res = tx.send(status).await;
|
||||
*self = Self::Done;
|
||||
|
||||
match res {
|
||||
Ok(_) => Ok(()),
|
||||
Err(x) => Err(io::Error::new(io::ErrorKind::Other, x)),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Mark wait as completed using killed status
|
||||
pub async fn kill(&mut self) -> io::Result<()> {
|
||||
self.send(ExitStatus::killed()).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents the state of waiting for an exit status
|
||||
#[derive(Debug)]
|
||||
pub enum WaitRx {
|
||||
/// Exit status is ready
|
||||
Ready(ExitStatus),
|
||||
|
||||
/// If receiver for an exit status has been dropped without receiving the status
|
||||
Dropped,
|
||||
|
||||
/// Exit status is not ready and has a "oneshot" to be invoked when available
|
||||
Pending(mpsc::Receiver<ExitStatus>),
|
||||
}
|
||||
|
||||
impl WaitRx {
|
||||
/// Waits until the exit status is resolved; can be called repeatedly after being
|
||||
/// resolved to immediately return the exit status again
|
||||
pub async fn recv(&mut self) -> io::Result<ExitStatus> {
|
||||
match self {
|
||||
Self::Ready(status) => Ok(*status),
|
||||
Self::Dropped => Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"Internal resolver dropped",
|
||||
)),
|
||||
Self::Pending(rx) => match rx.recv().await {
|
||||
Some(status) => {
|
||||
*self = Self::Ready(status);
|
||||
Ok(status)
|
||||
}
|
||||
None => {
|
||||
*self = Self::Dropped;
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"Internal resolver dropped",
|
||||
))
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,519 @@
|
||||
use crate::common::{fixtures::*, lua, poll, session};
|
||||
use assert_fs::prelude::*;
|
||||
use mlua::chunk;
|
||||
use once_cell::sync::Lazy;
|
||||
use rstest::*;
|
||||
|
||||
static TEMP_SCRIPT_DIR: Lazy<assert_fs::TempDir> = Lazy::new(|| assert_fs::TempDir::new().unwrap());
|
||||
static SCRIPT_RUNNER: Lazy<String> = Lazy::new(|| String::from("bash"));
|
||||
|
||||
static ECHO_ARGS_TO_STDOUT_SH: Lazy<assert_fs::fixture::ChildPath> = Lazy::new(|| {
|
||||
let script = TEMP_SCRIPT_DIR.child("echo_args_to_stdout.sh");
|
||||
script
|
||||
.write_str(indoc::indoc!(
|
||||
r#"
|
||||
#/usr/bin/env bash
|
||||
printf "%s" "$*"
|
||||
"#
|
||||
))
|
||||
.unwrap();
|
||||
script
|
||||
});
|
||||
|
||||
static ECHO_ARGS_TO_STDERR_SH: Lazy<assert_fs::fixture::ChildPath> = Lazy::new(|| {
|
||||
let script = TEMP_SCRIPT_DIR.child("echo_args_to_stderr.sh");
|
||||
script
|
||||
.write_str(indoc::indoc!(
|
||||
r#"
|
||||
#/usr/bin/env bash
|
||||
printf "%s" "$*" 1>&2
|
||||
"#
|
||||
))
|
||||
.unwrap();
|
||||
script
|
||||
});
|
||||
|
||||
static ECHO_STDIN_TO_STDOUT_SH: Lazy<assert_fs::fixture::ChildPath> = Lazy::new(|| {
|
||||
let script = TEMP_SCRIPT_DIR.child("echo_stdin_to_stdout.sh");
|
||||
script
|
||||
.write_str(indoc::indoc!(
|
||||
r#"
|
||||
#/usr/bin/env bash
|
||||
while IFS= read; do echo "$REPLY"; done
|
||||
"#
|
||||
))
|
||||
.unwrap();
|
||||
script
|
||||
});
|
||||
|
||||
static SLEEP_SH: Lazy<assert_fs::fixture::ChildPath> = Lazy::new(|| {
|
||||
let script = TEMP_SCRIPT_DIR.child("sleep.sh");
|
||||
script
|
||||
.write_str(indoc::indoc!(
|
||||
r#"
|
||||
#!/usr/bin/env bash
|
||||
sleep "$1"
|
||||
"#
|
||||
))
|
||||
.unwrap();
|
||||
script
|
||||
});
|
||||
|
||||
static DOES_NOT_EXIST_BIN: Lazy<assert_fs::fixture::ChildPath> =
|
||||
Lazy::new(|| TEMP_SCRIPT_DIR.child("does_not_exist_bin"));
|
||||
|
||||
#[rstest]
|
||||
fn should_return_error_on_failure(ctx: &'_ DistantServerCtx) {
|
||||
let lua = lua::make().unwrap();
|
||||
let new_session = session::make_function(&lua, ctx).unwrap();
|
||||
let schedule_fn = poll::make_function(&lua).unwrap();
|
||||
|
||||
let cmd = DOES_NOT_EXIST_BIN.to_str().unwrap().to_string();
|
||||
let args: Vec<String> = Vec::new();
|
||||
|
||||
let result = lua
|
||||
.load(chunk! {
|
||||
local session = $new_session()
|
||||
local distant = require("distant_lua")
|
||||
local f = distant.utils.wrap_async(session.spawn_async, $schedule_fn)
|
||||
|
||||
// Because of our scheduler, the invocation turns async -> sync
|
||||
local err
|
||||
f(session, { cmd = $cmd, args = $args, pty = { rows = 24, cols = 80 } }, function(success, res)
|
||||
if not success then
|
||||
err = res
|
||||
end
|
||||
end)
|
||||
assert(err, "Unexpectedly succeeded")
|
||||
})
|
||||
.exec();
|
||||
assert!(result.is_ok(), "Failed: {}", result.unwrap_err());
|
||||
}
|
||||
|
||||
#[rstest]
|
||||
fn should_return_back_process_on_success(ctx: &'_ DistantServerCtx) {
|
||||
let lua = lua::make().unwrap();
|
||||
let new_session = session::make_function(&lua, ctx).unwrap();
|
||||
let schedule_fn = poll::make_function(&lua).unwrap();
|
||||
|
||||
let cmd = SCRIPT_RUNNER.to_string();
|
||||
let args = vec![ECHO_ARGS_TO_STDOUT_SH.to_str().unwrap().to_string()];
|
||||
|
||||
let result = lua
|
||||
.load(chunk! {
|
||||
local session = $new_session()
|
||||
local distant = require("distant_lua")
|
||||
local f = distant.utils.wrap_async(session.spawn_async, $schedule_fn)
|
||||
|
||||
// Because of our scheduler, the invocation turns async -> sync
|
||||
local err, proc
|
||||
f(session, { cmd = $cmd, args = $args, pty = { rows = 24, cols = 80 } }, function(success, res)
|
||||
if success then
|
||||
proc = res
|
||||
else
|
||||
err = res
|
||||
end
|
||||
end)
|
||||
assert(not err, "Unexpectedly failed to spawn process: " .. tostring(err))
|
||||
assert(proc.id >= 0, "Invalid process returned")
|
||||
})
|
||||
.exec();
|
||||
assert!(result.is_ok(), "Failed: {}", result.unwrap_err());
|
||||
}
|
||||
|
||||
// NOTE: Ignoring on windows because it's using WSL which wants a Linux path
|
||||
// with / but thinks it's on windows and is providing \
|
||||
#[rstest]
|
||||
#[cfg_attr(windows, ignore)]
|
||||
fn should_return_process_that_can_retrieve_stdout(ctx: &'_ DistantServerCtx) {
|
||||
let lua = lua::make().unwrap();
|
||||
let new_session = session::make_function(&lua, ctx).unwrap();
|
||||
let schedule_fn = poll::make_function(&lua).unwrap();
|
||||
|
||||
let cmd = SCRIPT_RUNNER.to_string();
|
||||
let args = vec![
|
||||
ECHO_ARGS_TO_STDOUT_SH.to_str().unwrap().to_string(),
|
||||
String::from("some stdout"),
|
||||
];
|
||||
|
||||
let wait_fn = lua
|
||||
.create_function(|_, ()| {
|
||||
std::thread::sleep(std::time::Duration::from_millis(50));
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let result = lua
|
||||
.load(chunk! {
|
||||
local session = $new_session()
|
||||
local distant = require("distant_lua")
|
||||
local f = distant.utils.wrap_async(session.spawn_async, $schedule_fn)
|
||||
|
||||
// Because of our scheduler, the invocation turns async -> sync
|
||||
local err, proc
|
||||
f(session, { cmd = $cmd, args = $args, pty = { rows = 24, cols = 80 } }, function(success, res)
|
||||
if success then
|
||||
proc = res
|
||||
else
|
||||
err = res
|
||||
end
|
||||
end)
|
||||
assert(not err, "Unexpectedly failed to spawn process: " .. tostring(err))
|
||||
assert(proc, "Missing proc")
|
||||
|
||||
// Wait briefly to ensure the process sends stdout
|
||||
$wait_fn()
|
||||
|
||||
local f = distant.utils.wrap_async(proc.read_stdout_async, $schedule_fn)
|
||||
local err, stdout
|
||||
f(proc, function(success, res)
|
||||
if success then
|
||||
stdout = res
|
||||
else
|
||||
err = res
|
||||
end
|
||||
end)
|
||||
assert(not err, "Unexpectedly failed reading stdout: " .. tostring(err))
|
||||
|
||||
stdout = string.char(unpack(stdout))
|
||||
assert(stdout == "some stdout", "Unexpected stdout: " .. stdout)
|
||||
})
|
||||
.exec();
|
||||
assert!(result.is_ok(), "Failed: {}", result.unwrap_err());
|
||||
}
|
||||
|
||||
// NOTE: Ignoring on windows because it's using WSL which wants a Linux path
|
||||
// with / but thinks it's on windows and is providing \
|
||||
#[rstest]
|
||||
#[cfg_attr(windows, ignore)]
|
||||
fn should_return_process_that_can_retrieve_stderr_as_part_of_stdout(ctx: &'_ DistantServerCtx) {
|
||||
let lua = lua::make().unwrap();
|
||||
let new_session = session::make_function(&lua, ctx).unwrap();
|
||||
let schedule_fn = poll::make_function(&lua).unwrap();
|
||||
|
||||
let cmd = SCRIPT_RUNNER.to_string();
|
||||
let args = vec![
|
||||
ECHO_ARGS_TO_STDERR_SH.to_str().unwrap().to_string(),
|
||||
String::from("some stderr"),
|
||||
];
|
||||
|
||||
let wait_fn = lua
|
||||
.create_function(|_, ()| {
|
||||
std::thread::sleep(std::time::Duration::from_millis(50));
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let result = lua
|
||||
.load(chunk! {
|
||||
local session = $new_session()
|
||||
local distant = require("distant_lua")
|
||||
local f = distant.utils.wrap_async(session.spawn_async, $schedule_fn)
|
||||
|
||||
// Because of our scheduler, the invocation turns async -> sync
|
||||
local err, proc
|
||||
f(session, { cmd = $cmd, args = $args, pty = { rows = 24, cols = 80 } }, function(success, res)
|
||||
if success then
|
||||
proc = res
|
||||
else
|
||||
err = res
|
||||
end
|
||||
end)
|
||||
assert(not err, "Unexpectedly failed to spawn process: " .. tostring(err))
|
||||
assert(proc, "Missing proc")
|
||||
|
||||
// Wait briefly to ensure the process sends stdout
|
||||
$wait_fn()
|
||||
|
||||
// stderr is a broken pipe as it does not exist for pty
|
||||
local f = distant.utils.wrap_async(proc.read_stderr_async, $schedule_fn)
|
||||
local err, stderr
|
||||
f(proc, function(success, res)
|
||||
if success then
|
||||
stderr = res
|
||||
else
|
||||
err = res
|
||||
end
|
||||
end)
|
||||
assert(err)
|
||||
|
||||
// in a pty process, stderr is part of stdout
|
||||
local f = distant.utils.wrap_async(proc.read_stdout_async, $schedule_fn)
|
||||
local err, stdout
|
||||
f(proc, function(success, res)
|
||||
if success then
|
||||
stdout = res
|
||||
else
|
||||
err = res
|
||||
end
|
||||
end)
|
||||
assert(not err, "Unexpectedly failed reading stdout: " .. tostring(err))
|
||||
|
||||
// stdout should match what we'd normally expect from stderr
|
||||
stdout = string.char(unpack(stdout))
|
||||
assert(stdout == "some stderr", "Unexpected stdout: " .. stdout)
|
||||
})
|
||||
.exec();
|
||||
assert!(result.is_ok(), "Failed: {}", result.unwrap_err());
|
||||
}
|
||||
|
||||
#[rstest]
|
||||
fn should_return_error_when_killing_dead_process(ctx: &'_ DistantServerCtx) {
|
||||
let lua = lua::make().unwrap();
|
||||
let new_session = session::make_function(&lua, ctx).unwrap();
|
||||
let schedule_fn = poll::make_function(&lua).unwrap();
|
||||
|
||||
// Spawn a process that will exit immediately, but is a valid process
|
||||
let cmd = SCRIPT_RUNNER.to_string();
|
||||
let args = vec![SLEEP_SH.to_str().unwrap().to_string(), String::from("0")];
|
||||
|
||||
let wait_fn = lua
|
||||
.create_function(|_, ()| {
|
||||
std::thread::sleep(std::time::Duration::from_millis(50));
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let result = lua
|
||||
.load(chunk! {
|
||||
local session = $new_session()
|
||||
local distant = require("distant_lua")
|
||||
local f = distant.utils.wrap_async(session.spawn_async, $schedule_fn)
|
||||
|
||||
// Because of our scheduler, the invocation turns async -> sync
|
||||
local err, proc
|
||||
f(session, { cmd = $cmd, args = $args, pty = { rows = 24, cols = 80 } }, function(success, res)
|
||||
if success then
|
||||
proc = res
|
||||
else
|
||||
err = res
|
||||
end
|
||||
end)
|
||||
assert(not err, "Unexpectedly failed to spawn process: " .. tostring(err))
|
||||
assert(proc, "Missing proc")
|
||||
|
||||
// Wait briefly to ensure the process dies
|
||||
$wait_fn()
|
||||
|
||||
local f = distant.utils.wrap_async(proc.kill_async, $schedule_fn)
|
||||
local err
|
||||
f(proc, function(success, res)
|
||||
if not success then
|
||||
err = res
|
||||
end
|
||||
end)
|
||||
assert(err, "Unexpectedly succeeded in killing dead process")
|
||||
})
|
||||
.exec();
|
||||
assert!(result.is_ok(), "Failed: {}", result.unwrap_err());
|
||||
}
|
||||
|
||||
#[rstest]
|
||||
fn should_support_killing_processing(ctx: &'_ DistantServerCtx) {
|
||||
let lua = lua::make().unwrap();
|
||||
let new_session = session::make_function(&lua, ctx).unwrap();
|
||||
let schedule_fn = poll::make_function(&lua).unwrap();
|
||||
|
||||
let cmd = SCRIPT_RUNNER.to_string();
|
||||
let args = vec![SLEEP_SH.to_str().unwrap().to_string(), String::from("1")];
|
||||
|
||||
let result = lua
|
||||
.load(chunk! {
|
||||
local session = $new_session()
|
||||
local distant = require("distant_lua")
|
||||
local f = distant.utils.wrap_async(session.spawn_async, $schedule_fn)
|
||||
|
||||
// Because of our scheduler, the invocation turns async -> sync
|
||||
local err, proc
|
||||
f(session, { cmd = $cmd, args = $args, pty = { rows = 24, cols = 80 } }, function(success, res)
|
||||
if success then
|
||||
proc = res
|
||||
else
|
||||
err = res
|
||||
end
|
||||
end)
|
||||
assert(not err, "Unexpectedly failed to spawn process: " .. tostring(err))
|
||||
assert(proc, "Missing proc")
|
||||
|
||||
local f = distant.utils.wrap_async(proc.kill_async, $schedule_fn)
|
||||
local err
|
||||
f(proc, function(success, res)
|
||||
if not success then
|
||||
err = res
|
||||
end
|
||||
end)
|
||||
assert(not err, "Unexpectedly failed to kill process: " .. tostring(err))
|
||||
})
|
||||
.exec();
|
||||
assert!(result.is_ok(), "Failed: {}", result.unwrap_err());
|
||||
}
|
||||
|
||||
#[rstest]
|
||||
fn should_return_error_if_sending_stdin_to_dead_process(ctx: &'_ DistantServerCtx) {
|
||||
let lua = lua::make().unwrap();
|
||||
let new_session = session::make_function(&lua, ctx).unwrap();
|
||||
let schedule_fn = poll::make_function(&lua).unwrap();
|
||||
|
||||
// Spawn a process that will exit immediately, but is a valid process
|
||||
let cmd = SCRIPT_RUNNER.to_string();
|
||||
let args = vec![SLEEP_SH.to_str().unwrap().to_string(), String::from("0")];
|
||||
|
||||
let wait_fn = lua
|
||||
.create_function(|_, ()| {
|
||||
std::thread::sleep(std::time::Duration::from_millis(50));
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let result = lua
|
||||
.load(chunk! {
|
||||
local session = $new_session()
|
||||
local distant = require("distant_lua")
|
||||
local f = distant.utils.wrap_async(session.spawn_async, $schedule_fn)
|
||||
|
||||
// Because of our scheduler, the invocation turns async -> sync
|
||||
local err, proc
|
||||
f(session, { cmd = $cmd, args = $args, pty = { rows = 24, cols = 80 } }, function(success, res)
|
||||
if success then
|
||||
proc = res
|
||||
else
|
||||
err = res
|
||||
end
|
||||
end)
|
||||
assert(not err, "Unexpectedly failed to spawn process: " .. tostring(err))
|
||||
assert(proc, "Missing proc")
|
||||
|
||||
// Wait briefly to ensure the process dies
|
||||
$wait_fn()
|
||||
|
||||
local f = distant.utils.wrap_async(proc.write_stdin_async, $schedule_fn)
|
||||
local err
|
||||
f(proc, "some text\n", function(success, res)
|
||||
if not success then
|
||||
err = res
|
||||
end
|
||||
end)
|
||||
assert(err, "Unexpectedly succeeded")
|
||||
})
|
||||
.exec();
|
||||
assert!(result.is_ok(), "Failed: {}", result.unwrap_err());
|
||||
}
|
||||
|
||||
// NOTE: Ignoring on windows because it's using WSL which wants a Linux path
|
||||
// with / but thinks it's on windows and is providing \
|
||||
#[rstest]
|
||||
#[cfg_attr(windows, ignore)]
|
||||
fn should_support_sending_stdin_to_spawned_process(ctx: &'_ DistantServerCtx) {
|
||||
let lua = lua::make().unwrap();
|
||||
let new_session = session::make_function(&lua, ctx).unwrap();
|
||||
let schedule_fn = poll::make_function(&lua).unwrap();
|
||||
|
||||
let cmd = SCRIPT_RUNNER.to_string();
|
||||
let args = vec![ECHO_STDIN_TO_STDOUT_SH.to_str().unwrap().to_string()];
|
||||
|
||||
let wait_fn = lua
|
||||
.create_function(|_, ()| {
|
||||
std::thread::sleep(std::time::Duration::from_millis(50));
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let result = lua
|
||||
.load(chunk! {
|
||||
local session = $new_session()
|
||||
local distant = require("distant_lua")
|
||||
local f = distant.utils.wrap_async(session.spawn_async, $schedule_fn)
|
||||
|
||||
// Because of our scheduler, the invocation turns async -> sync
|
||||
local err, proc
|
||||
f(session, { cmd = $cmd, args = $args, pty = { rows = 24, cols = 80 } }, function(success, res)
|
||||
if success then
|
||||
proc = res
|
||||
else
|
||||
err = res
|
||||
end
|
||||
end)
|
||||
assert(not err, "Unexpectedly failed spawning process: " .. tostring(err))
|
||||
assert(proc, "Missing proc")
|
||||
|
||||
local f = distant.utils.wrap_async(proc.write_stdin_async, $schedule_fn)
|
||||
local err
|
||||
f(proc, "some text\n", function(success, res)
|
||||
if not success then
|
||||
err = res
|
||||
end
|
||||
end)
|
||||
assert(not err, "Unexpectedly failed writing stdin: " .. tostring(err))
|
||||
|
||||
// Wait briefly to ensure that pty reflects everything
|
||||
$wait_fn()
|
||||
|
||||
local f = distant.utils.wrap_async(proc.read_stdout_async, $schedule_fn)
|
||||
local err, stdout
|
||||
f(proc, function(success, res)
|
||||
if success then
|
||||
stdout = res
|
||||
else
|
||||
err = res
|
||||
end
|
||||
end)
|
||||
assert(not err, "Unexpectedly failed reading stdout: " .. tostring(err))
|
||||
|
||||
// NOTE: We're removing whitespace as there's some issue with properly comparing
|
||||
// due to something else being captured from pty
|
||||
stdout = string.gsub(string.char(unpack(stdout)), "%s+", "")
|
||||
|
||||
// TODO: Sometimes this comes back as "sometextsometext" (double) and I'm assuming
|
||||
// this is part of pty output, but the tests seem to have a race condition
|
||||
// to produce it, so we're just checking for either right now
|
||||
assert(
|
||||
stdout == "sometext" or stdout == "sometextsometext",
|
||||
"Unexpected stdout received: " .. stdout
|
||||
)
|
||||
})
|
||||
.exec();
|
||||
assert!(result.is_ok(), "Failed: {}", result.unwrap_err());
|
||||
}
|
||||
|
||||
// NOTE: Ignoring on windows because it's using WSL which wants a Linux path
|
||||
// with / but thinks it's on windows and is providing \
|
||||
#[rstest]
|
||||
#[cfg_attr(windows, ignore)]
|
||||
fn should_support_resizing_pty(ctx: &'_ DistantServerCtx) {
|
||||
let lua = lua::make().unwrap();
|
||||
let new_session = session::make_function(&lua, ctx).unwrap();
|
||||
let schedule_fn = poll::make_function(&lua).unwrap();
|
||||
|
||||
let cmd = SCRIPT_RUNNER.to_string();
|
||||
let args: Vec<String> = Vec::new();
|
||||
|
||||
let result = lua
|
||||
.load(chunk! {
|
||||
local session = $new_session()
|
||||
local distant = require("distant_lua")
|
||||
local f = distant.utils.wrap_async(session.spawn_async, $schedule_fn)
|
||||
|
||||
// Because of our scheduler, the invocation turns async -> sync
|
||||
local err, proc
|
||||
f(session, { cmd = $cmd, args = $args, pty = { rows = 24, cols = 80 } }, function(success, res)
|
||||
if success then
|
||||
proc = res
|
||||
else
|
||||
err = res
|
||||
end
|
||||
end)
|
||||
assert(not err, "Unexpectedly failed spawning process: " .. tostring(err))
|
||||
assert(proc, "Missing proc")
|
||||
|
||||
local f = distant.utils.wrap_async(proc.resize_async, $schedule_fn)
|
||||
local err
|
||||
f(proc, { rows = 16, cols = 40 }, function(success, res)
|
||||
if not success then
|
||||
err = res
|
||||
end
|
||||
end)
|
||||
assert(not err, "Unexpectedly failed resizing proc: " .. tostring(err))
|
||||
})
|
||||
.exec();
|
||||
assert!(result.is_ok(), "Failed: {}", result.unwrap_err());
|
||||
}
|
@ -0,0 +1,324 @@
|
||||
use crate::common::{fixtures::*, lua, session};
|
||||
use assert_fs::prelude::*;
|
||||
use mlua::chunk;
|
||||
use once_cell::sync::Lazy;
|
||||
use rstest::*;
|
||||
|
||||
static TEMP_SCRIPT_DIR: Lazy<assert_fs::TempDir> = Lazy::new(|| assert_fs::TempDir::new().unwrap());
|
||||
static SCRIPT_RUNNER: Lazy<String> = Lazy::new(|| String::from("bash"));
|
||||
|
||||
static ECHO_ARGS_TO_STDOUT_SH: Lazy<assert_fs::fixture::ChildPath> = Lazy::new(|| {
|
||||
let script = TEMP_SCRIPT_DIR.child("echo_args_to_stdout.sh");
|
||||
script
|
||||
.write_str(indoc::indoc!(
|
||||
r#"
|
||||
#/usr/bin/env bash
|
||||
printf "%s" "$*"
|
||||
"#
|
||||
))
|
||||
.unwrap();
|
||||
script
|
||||
});
|
||||
|
||||
static ECHO_ARGS_TO_STDERR_SH: Lazy<assert_fs::fixture::ChildPath> = Lazy::new(|| {
|
||||
let script = TEMP_SCRIPT_DIR.child("echo_args_to_stderr.sh");
|
||||
script
|
||||
.write_str(indoc::indoc!(
|
||||
r#"
|
||||
#/usr/bin/env bash
|
||||
printf "%s" "$*" 1>&2
|
||||
"#
|
||||
))
|
||||
.unwrap();
|
||||
script
|
||||
});
|
||||
|
||||
static ECHO_STDIN_TO_STDOUT_SH: Lazy<assert_fs::fixture::ChildPath> = Lazy::new(|| {
|
||||
let script = TEMP_SCRIPT_DIR.child("echo_stdin_to_stdout.sh");
|
||||
script
|
||||
.write_str(indoc::indoc!(
|
||||
r#"
|
||||
#/usr/bin/env bash
|
||||
while IFS= read; do echo "$REPLY"; done
|
||||
"#
|
||||
))
|
||||
.unwrap();
|
||||
script
|
||||
});
|
||||
|
||||
static SLEEP_SH: Lazy<assert_fs::fixture::ChildPath> = Lazy::new(|| {
|
||||
let script = TEMP_SCRIPT_DIR.child("sleep.sh");
|
||||
script
|
||||
.write_str(indoc::indoc!(
|
||||
r#"
|
||||
#!/usr/bin/env bash
|
||||
sleep "$1"
|
||||
"#
|
||||
))
|
||||
.unwrap();
|
||||
script
|
||||
});
|
||||
|
||||
static DOES_NOT_EXIST_BIN: Lazy<assert_fs::fixture::ChildPath> =
|
||||
Lazy::new(|| TEMP_SCRIPT_DIR.child("does_not_exist_bin"));
|
||||
|
||||
#[rstest]
|
||||
fn should_return_error_on_failure(ctx: &'_ DistantServerCtx) {
|
||||
let lua = lua::make().unwrap();
|
||||
let new_session = session::make_function(&lua, ctx).unwrap();
|
||||
|
||||
let cmd = DOES_NOT_EXIST_BIN.to_str().unwrap().to_string();
|
||||
let args: Vec<String> = Vec::new();
|
||||
|
||||
let result = lua
|
||||
.load(chunk! {
|
||||
local session = $new_session()
|
||||
local status, _ = pcall(session.spawn, session, {
|
||||
cmd = $cmd,
|
||||
args = $args,
|
||||
pty = { rows = 24, cols = 80 }
|
||||
})
|
||||
assert(not status, "Unexpectedly succeeded!")
|
||||
})
|
||||
.exec();
|
||||
assert!(result.is_ok(), "Failed: {}", result.unwrap_err());
|
||||
}
|
||||
|
||||
#[rstest]
|
||||
fn should_return_back_process_on_success(ctx: &'_ DistantServerCtx) {
|
||||
let lua = lua::make().unwrap();
|
||||
let new_session = session::make_function(&lua, ctx).unwrap();
|
||||
|
||||
let cmd = SCRIPT_RUNNER.to_string();
|
||||
let args = vec![ECHO_ARGS_TO_STDOUT_SH.to_str().unwrap().to_string()];
|
||||
|
||||
let result = lua
|
||||
.load(chunk! {
|
||||
local session = $new_session()
|
||||
local proc = session:spawn({ cmd = $cmd, args = $args, pty = { rows = 24, cols = 80 } })
|
||||
assert(proc.id >= 0, "Invalid process returned")
|
||||
})
|
||||
.exec();
|
||||
assert!(result.is_ok(), "Failed: {}", result.unwrap_err());
|
||||
}
|
||||
|
||||
// NOTE: Ignoring on windows because it's using WSL which wants a Linux path
|
||||
// with / but thinks it's on windows and is providing \
|
||||
#[rstest]
|
||||
#[cfg_attr(windows, ignore)]
|
||||
fn should_return_process_that_can_retrieve_stdout(ctx: &'_ DistantServerCtx) {
|
||||
let lua = lua::make().unwrap();
|
||||
let new_session = session::make_function(&lua, ctx).unwrap();
|
||||
|
||||
let cmd = SCRIPT_RUNNER.to_string();
|
||||
let args = vec![
|
||||
ECHO_ARGS_TO_STDOUT_SH.to_str().unwrap().to_string(),
|
||||
String::from("some stdout"),
|
||||
];
|
||||
|
||||
let wait_fn = lua
|
||||
.create_function(|_, ()| {
|
||||
std::thread::sleep(std::time::Duration::from_millis(50));
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let result = lua
|
||||
.load(chunk! {
|
||||
local session = $new_session()
|
||||
local proc = session:spawn({ cmd = $cmd, args = $args, rows = 24, cols = 80 })
|
||||
|
||||
// Wait briefly to ensure the process sends stdout
|
||||
$wait_fn()
|
||||
|
||||
local stdout = proc:read_stdout()
|
||||
stdout = string.char(unpack(stdout))
|
||||
assert(stdout == "some stdout", "Unexpected stdout: " .. stdout)
|
||||
})
|
||||
.exec();
|
||||
assert!(result.is_ok(), "Failed: {}", result.unwrap_err());
|
||||
}
|
||||
|
||||
// NOTE: Ignoring on windows because it's using WSL which wants a Linux path
|
||||
// with / but thinks it's on windows and is providing \
|
||||
#[rstest]
|
||||
#[cfg_attr(windows, ignore)]
|
||||
fn should_return_process_that_can_retrieve_stderr(ctx: &'_ DistantServerCtx) {
|
||||
let lua = lua::make().unwrap();
|
||||
let new_session = session::make_function(&lua, ctx).unwrap();
|
||||
|
||||
let cmd = SCRIPT_RUNNER.to_string();
|
||||
let args = vec![
|
||||
ECHO_ARGS_TO_STDERR_SH.to_str().unwrap().to_string(),
|
||||
String::from("some stderr"),
|
||||
];
|
||||
|
||||
let wait_fn = lua
|
||||
.create_function(|_, ()| {
|
||||
std::thread::sleep(std::time::Duration::from_millis(50));
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let result = lua
|
||||
.load(chunk! {
|
||||
local session = $new_session()
|
||||
local proc = session:spawn({ cmd = $cmd, args = $args, rows = 24, cols = 80 })
|
||||
|
||||
// Wait briefly to ensure the process sends stdout
|
||||
$wait_fn()
|
||||
|
||||
local stderr = proc:read_stderr()
|
||||
stderr = string.char(unpack(stderr))
|
||||
assert(stderr == "some stderr", "Unexpected stderr: " .. stderr)
|
||||
})
|
||||
.exec();
|
||||
assert!(result.is_ok(), "Failed: {}", result.unwrap_err());
|
||||
}
|
||||
|
||||
#[rstest]
|
||||
fn should_return_error_when_killing_dead_process(ctx: &'_ DistantServerCtx) {
|
||||
let lua = lua::make().unwrap();
|
||||
let new_session = session::make_function(&lua, ctx).unwrap();
|
||||
|
||||
// Spawn a process that will exit immediately, but is a valid process
|
||||
let cmd = SCRIPT_RUNNER.to_string();
|
||||
let args = vec![SLEEP_SH.to_str().unwrap().to_string(), String::from("0")];
|
||||
|
||||
let wait_fn = lua
|
||||
.create_function(|_, ()| {
|
||||
std::thread::sleep(std::time::Duration::from_millis(50));
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let result = lua
|
||||
.load(chunk! {
|
||||
local session = $new_session()
|
||||
local proc = session:spawn({ cmd = $cmd, args = $args, rows = 24, cols = 80 })
|
||||
|
||||
// Wait briefly to ensure the process dies
|
||||
$wait_fn()
|
||||
|
||||
local status, _ = pcall(proc.kill, proc)
|
||||
assert(not status, "Unexpectedly succeeded")
|
||||
})
|
||||
.exec();
|
||||
assert!(result.is_ok(), "Failed: {}", result.unwrap_err());
|
||||
}
|
||||
|
||||
#[rstest]
|
||||
fn should_support_killing_processing(ctx: &'_ DistantServerCtx) {
|
||||
let lua = lua::make().unwrap();
|
||||
let new_session = session::make_function(&lua, ctx).unwrap();
|
||||
|
||||
let cmd = SCRIPT_RUNNER.to_string();
|
||||
let args = vec![SLEEP_SH.to_str().unwrap().to_string(), String::from("1")];
|
||||
|
||||
let result = lua
|
||||
.load(chunk! {
|
||||
local session = $new_session()
|
||||
local proc = session:spawn({ cmd = $cmd, args = $args, rows = 24, cols = 80 })
|
||||
proc:kill()
|
||||
})
|
||||
.exec();
|
||||
assert!(result.is_ok(), "Failed: {}", result.unwrap_err());
|
||||
}
|
||||
|
||||
#[rstest]
|
||||
fn should_return_error_if_sending_stdin_to_dead_process(ctx: &'_ DistantServerCtx) {
|
||||
let lua = lua::make().unwrap();
|
||||
let new_session = session::make_function(&lua, ctx).unwrap();
|
||||
|
||||
// Spawn a process that will exit immediately, but is a valid process
|
||||
let cmd = SCRIPT_RUNNER.to_string();
|
||||
let args = vec![SLEEP_SH.to_str().unwrap().to_string(), String::from("0")];
|
||||
|
||||
let wait_fn = lua
|
||||
.create_function(|_, ()| {
|
||||
std::thread::sleep(std::time::Duration::from_millis(50));
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let result = lua
|
||||
.load(chunk! {
|
||||
local session = $new_session()
|
||||
local proc = session:spawn({ cmd = $cmd, args = $args, rows = 24, cols = 80 })
|
||||
|
||||
// Wait briefly to ensure the process dies
|
||||
$wait_fn()
|
||||
|
||||
local status, _ = pcall(proc.write_stdin, proc, "some text")
|
||||
assert(not status, "Unexpectedly succeeded")
|
||||
})
|
||||
.exec();
|
||||
assert!(result.is_ok(), "Failed: {}", result.unwrap_err());
|
||||
}
|
||||
|
||||
// NOTE: Ignoring on windows because it's using WSL which wants a Linux path
|
||||
// with / but thinks it's on windows and is providing \
|
||||
#[rstest]
|
||||
#[cfg_attr(windows, ignore)]
|
||||
fn should_support_sending_stdin_to_spawned_process(ctx: &'_ DistantServerCtx) {
|
||||
let lua = lua::make().unwrap();
|
||||
let new_session = session::make_function(&lua, ctx).unwrap();
|
||||
|
||||
let cmd = SCRIPT_RUNNER.to_string();
|
||||
let args = vec![ECHO_STDIN_TO_STDOUT_SH.to_str().unwrap().to_string()];
|
||||
|
||||
let wait_fn = lua
|
||||
.create_function(|_, ()| {
|
||||
std::thread::sleep(std::time::Duration::from_millis(50));
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let result = lua
|
||||
.load(chunk! {
|
||||
local session = $new_session()
|
||||
local proc = session:spawn({ cmd = $cmd, args = $args, rows = 24, cols = 80 })
|
||||
proc:write_stdin("some text\n")
|
||||
|
||||
// Wait briefly to ensure the process echoes stdin
|
||||
$wait_fn()
|
||||
|
||||
local stdout = proc:read_stdout()
|
||||
stdout = string.char(unpack(stdout))
|
||||
assert(stdout == "some text\n", "Unexpected stdin sent: " .. stdout)
|
||||
})
|
||||
.exec();
|
||||
assert!(result.is_ok(), "Failed: {}", result.unwrap_err());
|
||||
}
|
||||
|
||||
// NOTE: Ignoring on windows because it's using WSL which wants a Linux path
|
||||
// with / but thinks it's on windows and is providing \
|
||||
#[rstest]
|
||||
#[cfg_attr(windows, ignore)]
|
||||
fn should_support_resizing_pty(ctx: &'_ DistantServerCtx) {
|
||||
let lua = lua::make().unwrap();
|
||||
let new_session = session::make_function(&lua, ctx).unwrap();
|
||||
|
||||
let cmd = SCRIPT_RUNNER.to_string();
|
||||
let args: Vec<String> = Vec::new();
|
||||
|
||||
let wait_fn = lua
|
||||
.create_function(|_, ()| {
|
||||
std::thread::sleep(std::time::Duration::from_millis(50));
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let result = lua
|
||||
.load(chunk! {
|
||||
local session = $new_session()
|
||||
local proc = session:spawn({ cmd = $cmd, args = $args, rows = 24, cols = 80 })
|
||||
|
||||
// Wait briefly to ensure the process starts
|
||||
$wait_fn()
|
||||
|
||||
proc:resize({ rows = 16, cols = 40 })
|
||||
})
|
||||
.exec();
|
||||
assert!(result.is_ok(), "Failed: {}", result.unwrap_err());
|
||||
}
|
@ -0,0 +1,411 @@
|
||||
use async_compat::CompatExt;
|
||||
use distant_core::{PtySize, ResponseData};
|
||||
use log::*;
|
||||
use std::{
|
||||
future::Future,
|
||||
io::{self, Read, Write},
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::{sync::mpsc, task::JoinHandle};
|
||||
use wezterm_ssh::{
|
||||
Child, ChildKiller, ExecResult, MasterPty, PtySize as PortablePtySize, Session, SshChildProcess,
|
||||
};
|
||||
|
||||
const MAX_PIPE_CHUNK_SIZE: usize = 8192;
|
||||
const THREAD_PAUSE_MILLIS: u64 = 50;
|
||||
|
||||
/// Result of spawning a process, containing means to send stdin, means to kill the process,
|
||||
/// and the initialization function to use to start processing stdin, stdout, and stderr
|
||||
pub struct SpawnResult {
|
||||
pub id: usize,
|
||||
pub stdin: mpsc::Sender<Vec<u8>>,
|
||||
pub killer: mpsc::Sender<()>,
|
||||
pub resizer: mpsc::Sender<PtySize>,
|
||||
pub initialize: Box<dyn FnOnce(mpsc::Sender<Vec<ResponseData>>) + Send>,
|
||||
}
|
||||
|
||||
/// Spawns a non-pty process, returning a function that initializes processing
|
||||
/// stdin, stdout, and stderr once called (for lazy processing)
|
||||
pub async fn spawn_simple<F, R>(session: &Session, cmd: &str, cleanup: F) -> io::Result<SpawnResult>
|
||||
where
|
||||
F: FnOnce(usize) -> R + Send + 'static,
|
||||
R: Future<Output = ()> + Send + 'static,
|
||||
{
|
||||
let ExecResult {
|
||||
mut stdin,
|
||||
mut stdout,
|
||||
mut stderr,
|
||||
mut child,
|
||||
} = session
|
||||
.exec(cmd, None)
|
||||
.compat()
|
||||
.await
|
||||
.map_err(to_other_error)?;
|
||||
|
||||
// Update to be nonblocking for reading and writing
|
||||
stdin.set_non_blocking(true).map_err(to_other_error)?;
|
||||
stdout.set_non_blocking(true).map_err(to_other_error)?;
|
||||
stderr.set_non_blocking(true).map_err(to_other_error)?;
|
||||
|
||||
// Check if the process died immediately and report
|
||||
// an error if that's the case
|
||||
if let Ok(Some(exit_status)) = child.try_wait() {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::BrokenPipe,
|
||||
format!("Process exited early: {:?}", exit_status),
|
||||
));
|
||||
}
|
||||
|
||||
let (stdin_tx, stdin_rx) = mpsc::channel(1);
|
||||
let (kill_tx, kill_rx) = mpsc::channel(1);
|
||||
|
||||
let id = rand::random();
|
||||
let session = session.clone();
|
||||
let initialize = Box::new(move |reply: mpsc::Sender<Vec<ResponseData>>| {
|
||||
let stdout_task = spawn_nonblocking_stdout_task(id, stdout, reply.clone());
|
||||
let stderr_task = spawn_nonblocking_stderr_task(id, stderr, reply.clone());
|
||||
let stdin_task = spawn_nonblocking_stdin_task(id, stdin, stdin_rx);
|
||||
let _ = spawn_cleanup_task(
|
||||
session,
|
||||
id,
|
||||
child,
|
||||
kill_rx,
|
||||
stdin_task,
|
||||
stdout_task,
|
||||
Some(stderr_task),
|
||||
reply,
|
||||
cleanup,
|
||||
);
|
||||
});
|
||||
|
||||
// Create a resizer that is already closed since a simple process does not resize
|
||||
let resizer = mpsc::channel(1).0;
|
||||
|
||||
Ok(SpawnResult {
|
||||
id,
|
||||
stdin: stdin_tx,
|
||||
killer: kill_tx,
|
||||
resizer,
|
||||
initialize,
|
||||
})
|
||||
}
|
||||
|
||||
/// Spawns a pty process, returning a function that initializes processing
|
||||
/// stdin and stdout/stderr once called (for lazy processing)
|
||||
pub async fn spawn_pty<F, R>(
|
||||
session: &Session,
|
||||
cmd: &str,
|
||||
size: PtySize,
|
||||
cleanup: F,
|
||||
) -> io::Result<SpawnResult>
|
||||
where
|
||||
F: FnOnce(usize) -> R + Send + 'static,
|
||||
R: Future<Output = ()> + Send + 'static,
|
||||
{
|
||||
// TODO: Do we need to support other terminal types for TERM?
|
||||
let (pty, mut child) = session
|
||||
.request_pty("xterm-256color", to_portable_size(size), Some(cmd), None)
|
||||
.compat()
|
||||
.await
|
||||
.map_err(to_other_error)?;
|
||||
|
||||
// Check if the process died immediately and report
|
||||
// an error if that's the case
|
||||
if let Ok(Some(exit_status)) = child.try_wait() {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::BrokenPipe,
|
||||
format!("Process exited early: {:?}", exit_status),
|
||||
));
|
||||
}
|
||||
|
||||
let reader = pty
|
||||
.try_clone_reader()
|
||||
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))?;
|
||||
let writer = pty
|
||||
.try_clone_writer()
|
||||
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))?;
|
||||
|
||||
let (stdin_tx, stdin_rx) = mpsc::channel(1);
|
||||
let (kill_tx, kill_rx) = mpsc::channel(1);
|
||||
|
||||
let id = rand::random();
|
||||
let session = session.clone();
|
||||
let initialize = Box::new(move |reply: mpsc::Sender<Vec<ResponseData>>| {
|
||||
let stdout_task = spawn_blocking_stdout_task(id, reader, reply.clone());
|
||||
let stdin_task = spawn_blocking_stdin_task(id, writer, stdin_rx);
|
||||
let _ = spawn_cleanup_task(
|
||||
session,
|
||||
id,
|
||||
child,
|
||||
kill_rx,
|
||||
stdin_task,
|
||||
stdout_task,
|
||||
None,
|
||||
reply,
|
||||
cleanup,
|
||||
);
|
||||
});
|
||||
|
||||
let (resize_tx, mut resize_rx) = mpsc::channel::<PtySize>(1);
|
||||
tokio::spawn(async move {
|
||||
while let Some(size) = resize_rx.recv().await {
|
||||
if pty.resize(to_portable_size(size)).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(SpawnResult {
|
||||
id,
|
||||
stdin: stdin_tx,
|
||||
killer: kill_tx,
|
||||
resizer: resize_tx,
|
||||
initialize,
|
||||
})
|
||||
}
|
||||
|
||||
fn spawn_blocking_stdout_task(
|
||||
id: usize,
|
||||
mut reader: impl Read + Send + 'static,
|
||||
tx: mpsc::Sender<Vec<ResponseData>>,
|
||||
) -> JoinHandle<()> {
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let mut buf: [u8; MAX_PIPE_CHUNK_SIZE] = [0; MAX_PIPE_CHUNK_SIZE];
|
||||
loop {
|
||||
match reader.read(&mut buf) {
|
||||
Ok(n) if n > 0 => {
|
||||
let payload = vec![ResponseData::ProcStdout {
|
||||
id,
|
||||
data: buf[..n].to_vec(),
|
||||
}];
|
||||
if tx.blocking_send(payload).is_err() {
|
||||
error!("<Ssh | Proc {}> Stdout channel closed", id);
|
||||
break;
|
||||
}
|
||||
|
||||
std::thread::sleep(Duration::from_millis(THREAD_PAUSE_MILLIS));
|
||||
}
|
||||
Ok(_) => break,
|
||||
Err(x) => {
|
||||
error!("<Ssh | Proc {}> Stdout unexpectedly closed: {}", id, x);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn spawn_nonblocking_stdout_task(
|
||||
id: usize,
|
||||
mut reader: impl Read + Send + 'static,
|
||||
tx: mpsc::Sender<Vec<ResponseData>>,
|
||||
) -> JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
let mut buf: [u8; MAX_PIPE_CHUNK_SIZE] = [0; MAX_PIPE_CHUNK_SIZE];
|
||||
loop {
|
||||
match reader.read(&mut buf) {
|
||||
Ok(n) if n > 0 => {
|
||||
let payload = vec![ResponseData::ProcStdout {
|
||||
id,
|
||||
data: buf[..n].to_vec(),
|
||||
}];
|
||||
if tx.send(payload).await.is_err() {
|
||||
error!("<Ssh | Proc {}> Stdout channel closed", id);
|
||||
break;
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(THREAD_PAUSE_MILLIS)).await;
|
||||
}
|
||||
Ok(_) => break,
|
||||
Err(x) if x.kind() == io::ErrorKind::WouldBlock => {
|
||||
tokio::time::sleep(Duration::from_millis(THREAD_PAUSE_MILLIS)).await;
|
||||
}
|
||||
Err(x) => {
|
||||
error!("<Ssh | Proc {}> Stdout unexpectedly closed: {}", id, x);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn spawn_nonblocking_stderr_task(
|
||||
id: usize,
|
||||
mut reader: impl Read + Send + 'static,
|
||||
tx: mpsc::Sender<Vec<ResponseData>>,
|
||||
) -> JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
let mut buf: [u8; MAX_PIPE_CHUNK_SIZE] = [0; MAX_PIPE_CHUNK_SIZE];
|
||||
loop {
|
||||
match reader.read(&mut buf) {
|
||||
Ok(n) if n > 0 => {
|
||||
let payload = vec![ResponseData::ProcStderr {
|
||||
id,
|
||||
data: buf[..n].to_vec(),
|
||||
}];
|
||||
if tx.send(payload).await.is_err() {
|
||||
error!("<Ssh | Proc {}> Stderr channel closed", id);
|
||||
break;
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(THREAD_PAUSE_MILLIS)).await;
|
||||
}
|
||||
Ok(_) => break,
|
||||
Err(x) if x.kind() == io::ErrorKind::WouldBlock => {
|
||||
tokio::time::sleep(Duration::from_millis(THREAD_PAUSE_MILLIS)).await;
|
||||
}
|
||||
Err(x) => {
|
||||
error!("<Ssh | Proc {}> Stderr unexpectedly closed: {}", id, x);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn spawn_blocking_stdin_task(
|
||||
id: usize,
|
||||
mut writer: impl Write + Send + 'static,
|
||||
mut rx: mpsc::Receiver<Vec<u8>>,
|
||||
) -> JoinHandle<()> {
|
||||
tokio::task::spawn_blocking(move || {
|
||||
while let Some(data) = rx.blocking_recv() {
|
||||
if let Err(x) = writer.write_all(&data) {
|
||||
error!("<Ssh | Proc {}> Failed to send stdin: {}", id, x);
|
||||
break;
|
||||
}
|
||||
|
||||
std::thread::sleep(Duration::from_millis(THREAD_PAUSE_MILLIS));
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn spawn_nonblocking_stdin_task(
|
||||
id: usize,
|
||||
mut writer: impl Write + Send + 'static,
|
||||
mut rx: mpsc::Receiver<Vec<u8>>,
|
||||
) -> JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
while let Some(data) = rx.recv().await {
|
||||
if let Err(x) = writer.write_all(&data) {
|
||||
// In non-blocking mode, we'll just pause and try again if
|
||||
// the IO would block here; otherwise, stop the task
|
||||
if x.kind() != io::ErrorKind::WouldBlock {
|
||||
error!("<Ssh | Proc {}> Failed to send stdin: {}", id, x);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(THREAD_PAUSE_MILLIS)).await;
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn spawn_cleanup_task<F, R>(
|
||||
session: Session,
|
||||
id: usize,
|
||||
mut child: SshChildProcess,
|
||||
mut kill_rx: mpsc::Receiver<()>,
|
||||
stdin_task: JoinHandle<()>,
|
||||
stdout_task: JoinHandle<()>,
|
||||
stderr_task: Option<JoinHandle<()>>,
|
||||
tx: mpsc::Sender<Vec<ResponseData>>,
|
||||
cleanup: F,
|
||||
) -> JoinHandle<()>
|
||||
where
|
||||
F: FnOnce(usize) -> R + Send + 'static,
|
||||
R: Future<Output = ()> + Send + 'static,
|
||||
{
|
||||
tokio::spawn(async move {
|
||||
let mut should_kill = false;
|
||||
let mut success = false;
|
||||
tokio::select! {
|
||||
_ = kill_rx.recv() => {
|
||||
should_kill = true;
|
||||
}
|
||||
result = child.async_wait().compat() => {
|
||||
match result {
|
||||
Ok(status) => {
|
||||
success = status.success();
|
||||
}
|
||||
Err(x) => {
|
||||
error!("<Ssh | Proc {}> Waiting on process failed: {}", id, x);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Force stdin task to abort if it hasn't exited as there is no
|
||||
// point to sending any more stdin
|
||||
stdin_task.abort();
|
||||
|
||||
if should_kill {
|
||||
debug!("<Ssh | Proc {}> Killing", id);
|
||||
|
||||
if let Err(x) = child.kill() {
|
||||
error!("<Ssh | Proc {}> Unable to kill process: {}", id, x);
|
||||
}
|
||||
|
||||
// NOTE: At the moment, child.kill does nothing for wezterm_ssh::SshChildProcess;
|
||||
// so, we need to manually run kill/taskkill to make sure that the
|
||||
// process is sent a kill signal
|
||||
if let Some(pid) = child.process_id() {
|
||||
let _ = session
|
||||
.exec(&format!("kill -9 {}", pid), None)
|
||||
.compat()
|
||||
.await;
|
||||
let _ = session
|
||||
.exec(&format!("taskkill /F /PID {}", pid), None)
|
||||
.compat()
|
||||
.await;
|
||||
}
|
||||
} else {
|
||||
debug!(
|
||||
"<Ssh | Proc {}> Completed and waiting on stdout & stderr tasks",
|
||||
id
|
||||
);
|
||||
}
|
||||
|
||||
// We're done with the child, so drop it
|
||||
drop(child);
|
||||
|
||||
if let Some(task) = stderr_task {
|
||||
if let Err(x) = task.await {
|
||||
error!("<Ssh | Proc {}> Join on stderr task failed: {}", id, x);
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(x) = stdout_task.await {
|
||||
error!("<Ssh | Proc {}> Join on stdout task failed: {}", id, x);
|
||||
}
|
||||
|
||||
cleanup(id).await;
|
||||
|
||||
let payload = vec![ResponseData::ProcDone {
|
||||
id,
|
||||
success: !should_kill && success,
|
||||
code: if success { Some(0) } else { None },
|
||||
}];
|
||||
|
||||
if tx.send(payload).await.is_err() {
|
||||
error!("<Ssh | Proc {}> Failed to send done", id,);
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn to_other_error<E>(err: E) -> io::Error
|
||||
where
|
||||
E: Into<Box<dyn std::error::Error + Send + Sync>>,
|
||||
{
|
||||
io::Error::new(io::ErrorKind::Other, err)
|
||||
}
|
||||
|
||||
fn to_portable_size(size: PtySize) -> PortablePtySize {
|
||||
PortablePtySize {
|
||||
rows: size.rows,
|
||||
cols: size.cols,
|
||||
pixel_width: size.pixel_width,
|
||||
pixel_height: size.pixel_height,
|
||||
}
|
||||
}
|
@ -0,0 +1,164 @@
|
||||
use crate::{
|
||||
exit::{ExitCode, ExitCodeError},
|
||||
link::RemoteProcessLink,
|
||||
opt::{CommonOpt, ShellSubcommand},
|
||||
subcommand::CommandRunner,
|
||||
utils,
|
||||
};
|
||||
use derive_more::{Display, Error, From};
|
||||
use distant_core::{LspData, PtySize, RemoteProcess, RemoteProcessError, RemoteStdin, Session};
|
||||
use log::*;
|
||||
use terminal_size::{terminal_size, Height, Width};
|
||||
use termwiz::{
|
||||
caps::Capabilities,
|
||||
input::{InputEvent, KeyCodeEncodeModes},
|
||||
terminal::{new_terminal, Terminal},
|
||||
};
|
||||
use tokio::{io, time::Duration};
|
||||
|
||||
#[derive(Debug, Display, Error, From)]
|
||||
pub enum Error {
|
||||
#[display(fmt = "Process failed with exit code: {}", _0)]
|
||||
BadProcessExit(#[error(not(source))] i32),
|
||||
Io(io::Error),
|
||||
RemoteProcess(RemoteProcessError),
|
||||
}
|
||||
|
||||
impl ExitCodeError for Error {
|
||||
fn is_silent(&self) -> bool {
|
||||
match self {
|
||||
Self::RemoteProcess(x) => x.is_silent(),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn to_exit_code(&self) -> ExitCode {
|
||||
match self {
|
||||
Self::BadProcessExit(x) => ExitCode::Custom(*x),
|
||||
Self::Io(x) => x.to_exit_code(),
|
||||
Self::RemoteProcess(x) => x.to_exit_code(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run(cmd: ShellSubcommand, opt: CommonOpt) -> Result<(), Error> {
|
||||
let rt = tokio::runtime::Runtime::new()?;
|
||||
|
||||
rt.block_on(async { run_async(cmd, opt).await })
|
||||
}
|
||||
|
||||
async fn run_async(cmd: ShellSubcommand, opt: CommonOpt) -> Result<(), Error> {
|
||||
let method = cmd.method;
|
||||
let timeout = opt.to_timeout_duration();
|
||||
let ssh_connection = cmd.ssh_connection.clone();
|
||||
let session_input = cmd.session;
|
||||
let session_file = cmd.session_data.session_file.clone();
|
||||
let session_socket = cmd.session_data.session_socket.clone();
|
||||
|
||||
CommandRunner {
|
||||
method,
|
||||
ssh_connection,
|
||||
session_input,
|
||||
session_file,
|
||||
session_socket,
|
||||
timeout,
|
||||
}
|
||||
.run(
|
||||
|session, _, lsp_data| Box::pin(start(cmd, session, lsp_data)),
|
||||
Error::Io,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn start(
|
||||
cmd: ShellSubcommand,
|
||||
session: Session,
|
||||
lsp_data: Option<LspData>,
|
||||
) -> Result<(), Error> {
|
||||
let mut proc = RemoteProcess::spawn(
|
||||
utils::new_tenant(),
|
||||
session.clone_channel(),
|
||||
cmd.cmd.unwrap_or_else(|| "/bin/sh".to_string()),
|
||||
cmd.args,
|
||||
cmd.detached,
|
||||
terminal_size().map(|(Width(cols), Height(rows))| PtySize::from_rows_and_cols(rows, cols)),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// If we also parsed an LSP's initialize request for its session, we want to forward
|
||||
// it along in the case of a process call
|
||||
if let Some(data) = lsp_data {
|
||||
proc.stdin
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.write(data.to_string().as_bytes())
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Create a new terminal in raw mode
|
||||
let mut terminal = new_terminal(
|
||||
Capabilities::new_from_env().map_err(|x| io::Error::new(io::ErrorKind::Other, x))?,
|
||||
)
|
||||
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))?;
|
||||
terminal
|
||||
.set_raw_mode()
|
||||
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))?;
|
||||
|
||||
let mut stdin = proc.stdin.take().unwrap();
|
||||
let resizer = proc.clone_resizer();
|
||||
tokio::spawn(async move {
|
||||
while let Ok(input) = terminal.poll_input(Some(Duration::new(0, 0))) {
|
||||
match input {
|
||||
Some(InputEvent::Key(ev)) => {
|
||||
if let Ok(input) = ev.key.encode(
|
||||
ev.modifiers,
|
||||
KeyCodeEncodeModes {
|
||||
enable_csi_u_key_encoding: false,
|
||||
application_cursor_keys: false,
|
||||
newline_mode: false,
|
||||
},
|
||||
) {
|
||||
if let Err(x) = stdin.write_str(input).await {
|
||||
error!("Failed to write to stdin of remote process: {}", x);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(InputEvent::Resized { cols, rows }) => {
|
||||
if let Err(x) = resizer
|
||||
.resize(PtySize::from_rows_and_cols(rows as u16, cols as u16))
|
||||
.await
|
||||
{
|
||||
error!("Failed to resize remote process: {}", x);
|
||||
break;
|
||||
}
|
||||
}
|
||||
Some(_) => continue,
|
||||
None => tokio::time::sleep(Duration::from_millis(1)).await,
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Now, map the remote LSP server's stdin/stdout/stderr to our own process
|
||||
let link = RemoteProcessLink::from_remote_pipes(
|
||||
RemoteStdin::disconnected(),
|
||||
proc.stdout.take().unwrap(),
|
||||
proc.stderr.take().unwrap(),
|
||||
);
|
||||
|
||||
// Continually loop to check for terminal resize changes while the process is still running
|
||||
let (success, exit_code) = proc.wait().await?;
|
||||
|
||||
// Shut down our link
|
||||
link.shutdown().await;
|
||||
|
||||
if !success {
|
||||
if let Some(code) = exit_code {
|
||||
return Err(Error::BadProcessExit(code));
|
||||
} else {
|
||||
return Err(Error::BadProcessExit(1));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
Loading…
Reference in New Issue