Add interactive loop for shell and json modes; fix some minor discrepancies

pull/38/head
Chip Senkbeil 3 years ago
parent c7b8db517c
commit b4706e88bc
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -202,6 +202,7 @@ impl From<ResponsePayload> for Response {
tag = "type",
content = "data"
)]
#[strum(serialize_all = "snake_case")]
pub enum ResponsePayload {
/// General okay with no extra data, returned in cases like
/// creating or removing a directory, copying a file, or renaming

@ -109,13 +109,19 @@ pub struct SendSubcommand {
short,
long,
case_insensitive = true,
default_value = "Shell",
default_value = "shell",
possible_values = SendMode::VARIANTS
)]
pub mode: SendMode,
/// If specified, commands to send are sent over stdin and responses are received
/// over stdout (and stderr if mode is shell)
#[structopt(short, long)]
pub interactive: bool,
/// Operation to send over the wire if not in interactive mode
#[structopt(subcommand)]
pub operation: RequestPayload,
pub operation: Option<RequestPayload>,
}
/// Represents options for binding a server to an IP address

@ -3,7 +3,7 @@ use crate::data::{
DirEntry, FileType, Request, RequestPayload, Response, ResponsePayload, RunningProcess,
};
use log::*;
use std::{error::Error, path::PathBuf, process::Stdio, sync::Arc};
use std::{error::Error, net::SocketAddr, path::PathBuf, process::Stdio, sync::Arc};
use tokio::{
io::{self, AsyncReadExt, AsyncWriteExt},
process::Command,
@ -16,13 +16,13 @@ type HState = Arc<Mutex<State>>;
/// Processes the provided request, sending replies using the given sender
pub(super) async fn process(
client_id: usize,
addr: SocketAddr,
state: HState,
req: Request,
tx: Reply,
) -> Result<(), mpsc::error::SendError<Response>> {
async fn inner(
client_id: usize,
addr: SocketAddr,
state: HState,
payload: RequestPayload,
tx: Reply,
@ -37,9 +37,7 @@ pub(super) async fn process(
RequestPayload::Remove { path, force } => remove(path, force).await,
RequestPayload::Copy { src, dst } => copy(src, dst).await,
RequestPayload::Rename { src, dst } => rename(src, dst).await,
RequestPayload::ProcRun { cmd, args } => {
proc_run(client_id, state, tx, cmd, args).await
}
RequestPayload::ProcRun { cmd, args } => proc_run(addr, state, tx, cmd, args).await,
RequestPayload::ProcKill { id } => proc_kill(state, id).await,
RequestPayload::ProcStdin { id, data } => proc_stdin(state, id, data).await,
RequestPayload::ProcList {} => proc_list(state).await,
@ -47,7 +45,7 @@ pub(super) async fn process(
}
let res = Response::from_payload_with_origin(
match inner(client_id, state, req.payload, tx.clone()).await {
match inner(addr, state, req.payload, tx.clone()).await {
Ok(payload) => payload,
Err(x) => ResponsePayload::Error {
description: x.to_string(),
@ -56,6 +54,12 @@ pub(super) async fn process(
req.id,
);
trace!(
"<Client @ {}> Sending response of type {}",
addr,
res.payload.as_ref()
);
// Send out our primary response from processing the request
tx.send(res).await
}
@ -187,7 +191,7 @@ async fn rename(src: PathBuf, dst: PathBuf) -> Result<ResponsePayload, Box<dyn E
}
async fn proc_run(
client_id: usize,
addr: SocketAddr,
state: HState,
tx: Reply,
cmd: String,
@ -210,10 +214,13 @@ async fn proc_run(
let mut data = Vec::new();
match stdout.read_to_end(&mut data).await {
Ok(n) if n > 0 => {
if let Err(_) = tx_2
.send(Response::from(ResponsePayload::ProcStdout { id, data }))
.await
{
let res = Response::from(ResponsePayload::ProcStdout { id, data });
trace!(
"<Client @ {}> Sending response of type {}",
addr,
res.payload.as_ref()
);
if let Err(_) = tx_2.send(res).await {
break;
}
}
@ -231,10 +238,13 @@ async fn proc_run(
let mut data = Vec::new();
match stderr.read_to_end(&mut data).await {
Ok(n) if n > 0 => {
if let Err(_) = tx_2
.send(Response::from(ResponsePayload::ProcStderr { id, data }))
.await
{
let res = Response::from(ResponsePayload::ProcStderr { id, data });
trace!(
"<Client @ {}> Sending response of type {}",
addr,
res.payload.as_ref()
);
if let Err(_) = tx_2.send(res).await {
break;
}
}
@ -275,18 +285,26 @@ async fn proc_run(
Ok(status) => {
let success = status.success();
let code = status.code();
if let Err(_) = tx
.send(Response::from(ResponsePayload::ProcDone { id, success, code }))
.await
{
let res = Response::from(ResponsePayload::ProcDone { id, success, code });
trace!(
"<Client @ {}> Sending response of type {}",
addr,
res.payload.as_ref()
);
if let Err(_) = tx.send(res).await {
error!("Failed to send done for process {}!", id);
}
}
Err(x) => {
if let Err(_) = tx
.send(Response::from(ResponsePayload::Error { description: x.to_string() }))
.await
{
let res = Response::from(ResponsePayload::Error {
description: x.to_string()
});
trace!(
"<Client @ {}> Sending response of type {}",
addr,
res.payload.as_ref()
);
if let Err(_) = tx.send(res).await {
error!("Failed to send error for waiting on process {}!", id);
}
}
@ -306,8 +324,17 @@ async fn proc_run(
error!("Join on stdout task failed: {}", x);
}
let res = Response::from(ResponsePayload::ProcDone {
id, success: false, code: None
});
trace!(
"<Client @ {}> Sending response of type {}",
addr,
res.payload.as_ref()
);
if let Err(_) = tx
.send(Response::from(ResponsePayload::ProcDone { id, success: false, code: None }))
.send(res)
.await
{
error!("Failed to send done for process {}!", id);
@ -330,7 +357,7 @@ async fn proc_run(
.lock()
.await
.client_processes
.entry(client_id)
.entry(addr)
.or_insert(Vec::new())
.push(id);

@ -7,7 +7,7 @@ use derive_more::{Display, Error, From};
use fork::{daemon, Fork};
use log::*;
use orion::aead::SecretKey;
use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap, net::SocketAddr, sync::Arc};
use tokio::{
io,
net::TcpListener,
@ -30,13 +30,13 @@ struct State {
processes: HashMap<usize, Process>,
/// List of processes that will be killed when a client drops
client_processes: HashMap<usize, Vec<usize>>,
client_processes: HashMap<SocketAddr, Vec<usize>>,
}
impl State {
/// Cleans up state associated with a particular client
pub async fn cleanup_client(&mut self, id: usize) {
if let Some(ids) = self.client_processes.remove(&id) {
pub async fn cleanup_client(&mut self, addr: SocketAddr) {
if let Some(ids) = self.client_processes.remove(&addr) {
for id in ids {
if let Some(process) = self.processes.remove(&id) {
if let Err(_) = process.kill_tx.send(()) {
@ -121,26 +121,22 @@ async fn run_async(cmd: ListenSubcommand, _opt: CommonOpt, is_forked: bool) -> R
// receiving data from the client
while let Ok((client, _)) = listener.accept().await {
// Grab the client's remote address for later logging purposes
let addr_string = match client.peer_addr() {
let addr = match client.peer_addr() {
Ok(addr) => {
let addr_string = addr.to_string();
info!("<Client @ {}> Established connection", addr_string);
addr_string
info!("<Client @ {}> Established connection", addr);
addr
}
Err(x) => {
error!("Unable to examine client's peer address: {}", x);
"???".to_string()
continue;
}
};
// Create a unique id for the client
let id = rand::random();
// Establish a proper connection via a handshake, discarding the connection otherwise
let transport = match Transport::from_handshake(client, Arc::clone(&key)).await {
Ok(transport) => transport,
Err(x) => {
error!("<Client @ {}> Failed handshake: {}", addr_string, x);
error!("<Client @ {}> Failed handshake: {}", addr, x);
continue;
}
};
@ -152,17 +148,17 @@ async fn run_async(cmd: ListenSubcommand, _opt: CommonOpt, is_forked: bool) -> R
// Spawn a new task that loops to handle requests from the client
tokio::spawn({
let f = request_loop(id, addr_string.to_string(), Arc::clone(&state), t_read, tx);
let f = request_loop(addr, Arc::clone(&state), t_read, tx);
let state = Arc::clone(&state);
async move {
f.await;
state.lock().await.cleanup_client(id).await;
state.lock().await.cleanup_client(addr).await;
}
});
// Spawn a new task that loops to handle responses to the client
tokio::spawn(async move { response_loop(addr_string, t_write, rx).await });
tokio::spawn(async move { response_loop(addr, t_write, rx).await });
}
Ok(())
@ -171,8 +167,7 @@ async fn run_async(cmd: ListenSubcommand, _opt: CommonOpt, is_forked: bool) -> R
/// Repeatedly reads in new requests, processes them, and sends their responses to the
/// response loop
async fn request_loop(
id: usize,
addr: String,
addr: SocketAddr,
state: Arc<Mutex<State>>,
mut transport: TransportReadHalf,
tx: mpsc::Sender<Response>,
@ -182,21 +177,21 @@ async fn request_loop(
Ok(Some(req)) => {
trace!(
"<Client @ {}> Received request of type {}",
addr.as_str(),
addr,
req.payload.as_ref()
);
if let Err(x) = handler::process(id, Arc::clone(&state), req, tx.clone()).await {
error!("<Client @ {}> {}", addr.as_str(), x);
if let Err(x) = handler::process(addr, Arc::clone(&state), req, tx.clone()).await {
error!("<Client @ {}> {}", addr, x);
break;
}
}
Ok(None) => {
info!("<Client @ {}> Closed connection", addr.as_str());
info!("<Client @ {}> Closed connection", addr);
break;
}
Err(x) => {
error!("<Client @ {}> {}", addr.as_str(), x);
error!("<Client @ {}> {}", addr, x);
break;
}
}
@ -204,13 +199,13 @@ async fn request_loop(
}
async fn response_loop(
addr: String,
addr: SocketAddr,
mut transport: TransportWriteHalf,
mut rx: mpsc::Receiver<Response>,
) {
while let Some(res) = rx.recv().await {
if let Err(x) = transport.send(res).await {
error!("<Client @ {}> {}", addr.as_str(), x);
error!("<Client @ {}> {}", addr, x);
break;
}
}

@ -6,7 +6,14 @@ use crate::{
};
use derive_more::{Display, Error, From};
use log::*;
use tokio::{io, sync::mpsc};
use structopt::StructOpt;
use tokio::{
io,
sync::{
mpsc,
oneshot::{self, error::TryRecvError},
},
};
use tokio_stream::StreamExt;
#[derive(Debug, Display, Error, From)]
@ -14,6 +21,9 @@ pub enum Error {
IoError(io::Error),
SessionError(SessionError),
TransportError(TransportError),
#[display(fmt = "Non-interactive but no operation supplied")]
MissingOperation,
}
pub fn run(cmd: SendSubcommand, opt: CommonOpt) -> Result<(), Error> {
@ -26,61 +36,34 @@ async fn run_async(cmd: SendSubcommand, _opt: CommonOpt) -> Result<(), Error> {
let session = Session::load().await?;
let mut client = Client::connect(session).await?;
let req = Request::from(cmd.operation);
if !cmd.interactive && cmd.operation.is_none() {
return Err(Error::MissingOperation);
}
// Special conditions for continuing to process responses
let is_proc_req = req.payload.is_proc_run();
let res = client.send(req).await?;
// Store the spawned process id for using in sending stdin (if we spawned a proc)
let proc_id = match &res.payload {
ResponsePayload::ProcStart { id } => *id,
_ => 0,
};
format_response(cmd.mode, res)?.print();
// If we are executing a process and not detaching, we want to continue receiving
// responses sent to us
if is_proc_req {
let mut stream = client.to_response_stream();
// We also want to spawn a task to handle sending stdin to the remote process
let mut rx = spawn_stdin_reader();
tokio::spawn(async move {
while let Some(line) = rx.recv().await {
trace!("Client sending stdin: {:?}", line);
let req = Request::from(RequestPayload::ProcStdin {
id: proc_id,
data: line.into_bytes(),
});
let result = client.send(req).await;
if let Err(x) = result {
error!(
"Failed to send stdin to remote process ({}): {}",
proc_id, x
);
}
}
});
let mut is_proc_req = false;
let mut proc_id = 0;
while let Some(res) = stream.next().await {
let res = res.map_err(|_| {
io::Error::new(
io::ErrorKind::BrokenPipe,
"Response stream no longer available",
)
})?;
let done = res.payload.is_proc_done();
if let Some(req) = cmd.operation.map(Request::from) {
is_proc_req = req.payload.is_proc_run();
format_response(cmd.mode, res)?.print();
let res = client.send(req).await?;
if done {
break;
}
}
// Store the spawned process id for using in sending stdin (if we spawned a proc)
proc_id = match &res.payload {
ResponsePayload::ProcStart { id } => *id,
_ => 0,
};
format_response(cmd.mode, res)?.print();
}
// If we are executing a process, we want to continue interacting via stdin and receiving
// results via stdout/stderr
//
// If we are interactive, we want to continue looping regardless
if is_proc_req || cmd.interactive {
interactive_loop(client, proc_id, cmd.mode, cmd.interactive).await?;
}
Ok(())
@ -112,6 +95,125 @@ fn spawn_stdin_reader() -> mpsc::Receiver<String> {
rx
}
async fn interactive_loop(
mut client: Client,
id: usize,
mode: SendMode,
interactive: bool,
) -> Result<(), Error> {
let mut stream = client.to_response_stream();
// Create a channel that can report when we should stop the loop based on a received request
let (tx_stop, mut rx_stop) = oneshot::channel::<()>();
// We also want to spawn a task to handle sending stdin to the remote process
let mut rx = spawn_stdin_reader();
tokio::spawn(async move {
while let Some(line) = rx.recv().await {
match mode {
// Special exit condition for interactive mode
_ if line.trim() == "exit" => {
if let Err(_) = tx_stop.send(()) {
error!("Failed to close interactive loop!");
}
break;
}
// For json mode, all stdin is treated as individual requests
SendMode::Json => {
trace!("Client sending request: {:?}", line);
let result = serde_json::from_str(&line)
.map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x));
match result {
Ok(req) => match client.send(req).await {
Ok(res) => match format_response(mode, res) {
Ok(out) => out.print(),
Err(x) => error!("Failed to format response: {}", x),
},
Err(x) => {
error!("Failed to send request to remote process ({}): {}", id, x)
}
},
Err(x) => {
error!("Failed to serialize request: {}", x);
}
}
}
// For interactive shell mode, parse stdin as individual commands
SendMode::Shell if interactive => {
if line.trim().is_empty() {
continue;
}
trace!("Client sending command: {:?}", line);
// NOTE: We have to stick something in as the first argument as clap/structopt
// expect the binary name as the first item in the iterator
let payload_result = RequestPayload::from_iter_safe(
std::iter::once("distant")
.chain(line.trim().split(' ').filter(|s| !s.trim().is_empty())),
);
match payload_result {
Ok(payload) => match client.send(Request::from(payload)).await {
Ok(res) => match format_response(mode, res) {
Ok(out) => out.print(),
Err(x) => error!("Failed to format response: {}", x),
},
Err(x) => {
error!("Failed to send request to remote process ({}): {}", id, x)
}
},
Err(x) => {
error!("Failed to parse command: {}", x);
}
}
}
// For non-interactive shell mode, all stdin is treated as a proc's stdin
SendMode::Shell => {
trace!("Client sending stdin: {:?}", line);
let req = Request::from(RequestPayload::ProcStdin {
id,
data: line.into_bytes(),
});
let result = client.send(req).await;
if let Err(x) = result {
error!("Failed to send stdin to remote process ({}): {}", id, x);
}
}
}
}
});
while let Err(TryRecvError::Empty) = rx_stop.try_recv() {
if let Some(res) = stream.next().await {
let res = res.map_err(|_| {
io::Error::new(
io::ErrorKind::BrokenPipe,
"Response stream no longer available",
)
})?;
let done = res.payload.is_proc_done() && !interactive;
format_response(mode, res)?.print();
// If we aren't interactive but are just running a proc and
// we've received the end of the proc, we should exit
if done {
break;
}
// If we have nothing else in our stream, we should also exit
} else {
break;
}
}
Ok(())
}
/// Represents the output content and destination
enum ResponseOut {
Stdout(String),
@ -131,10 +233,11 @@ impl ResponseOut {
fn format_response(mode: SendMode, res: Response) -> io::Result<ResponseOut> {
Ok(match mode {
SendMode::Json => ResponseOut::Stdout(
SendMode::Json => ResponseOut::Stdout(format!(
"{}\n",
serde_json::to_string(&res)
.map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x))?,
),
.map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x))?
)),
SendMode::Shell => format_shell(res),
})
}
@ -143,13 +246,14 @@ fn format_shell(res: Response) -> ResponseOut {
match res.payload {
ResponsePayload::Ok => ResponseOut::None,
ResponsePayload::Error { description } => {
ResponseOut::Stderr(format!("Failed: '{}'.", description))
ResponseOut::Stderr(format!("Failed: '{}'.\n", description))
}
ResponsePayload::Blob { data } => {
ResponseOut::Stdout(String::from_utf8_lossy(&data).to_string())
}
ResponsePayload::Text { data } => ResponseOut::Stdout(data),
ResponsePayload::DirEntries { entries } => ResponseOut::Stdout(
ResponsePayload::DirEntries { entries } => ResponseOut::Stdout(format!(
"{}\n",
entries
.into_iter()
.map(|entry| {
@ -165,14 +269,15 @@ fn format_shell(res: Response) -> ResponseOut {
})
.collect::<Vec<String>>()
.join("\n"),
),
ResponsePayload::ProcEntries { entries } => ResponseOut::Stdout(
)),
ResponsePayload::ProcEntries { entries } => ResponseOut::Stdout(format!(
"{}\n",
entries
.into_iter()
.map(|entry| format!("{}: {} {}", entry.id, entry.cmd, entry.args.join(" ")))
.collect::<Vec<String>>()
.join("\n"),
),
)),
ResponsePayload::ProcStart { .. } => ResponseOut::None,
ResponsePayload::ProcStdout { data, .. } => {
ResponseOut::Stdout(String::from_utf8_lossy(&data).to_string())
@ -184,9 +289,9 @@ fn format_shell(res: Response) -> ResponseOut {
if success {
ResponseOut::None
} else if let Some(code) = code {
ResponseOut::Stderr(format!("Proc {} failed with code {}", id, code))
ResponseOut::Stderr(format!("Proc {} failed with code {}\n", id, code))
} else {
ResponseOut::Stderr(format!("Proc {} failed", id))
ResponseOut::Stderr(format!("Proc {} failed\n", id))
}
}
}

Loading…
Cancel
Save