Add some extra logging and complete proc-run cli tests

pull/38/head
Chip Senkbeil 3 years ago
parent b362ff5ab8
commit cf0193edc9
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -5,6 +5,7 @@ use crate::{
net::{DataStream, TransportError}, net::{DataStream, TransportError},
}; };
use derive_more::{Display, Error, From}; use derive_more::{Display, Error, From};
use log::*;
use tokio::{ use tokio::{
io, io,
sync::mpsc, sync::mpsc,
@ -200,7 +201,7 @@ async fn process_outgoing_requests<T>(
where where
T: DataStream, T: DataStream,
{ {
loop { let result = loop {
tokio::select! { tokio::select! {
data = stdin_rx.recv() => { data = stdin_rx.recv() => {
match data { match data {
@ -227,7 +228,10 @@ where
} }
} }
} }
} };
trace!("Process outgoing channel closed");
result
} }
/// Helper function that loops, processing incoming stdout & stderr requests from a remote process /// Helper function that loops, processing incoming stdout & stderr requests from a remote process
@ -280,6 +284,7 @@ async fn process_incoming_responses(
} }
} }
trace!("Process incoming channel closed");
result result
} }

@ -139,7 +139,10 @@ where
} }
} }
} }
Ok(None) => break, Ok(None) => {
debug!("Session closing response task as transport read-half closed!");
break;
}
Err(x) => { Err(x) => {
error!("{}", x); error!("{}", x);
break; break;

@ -112,13 +112,6 @@ pub(super) async fn process(
let res = Response::new(req.tenant, Some(req.id), payload); let res = Response::new(req.tenant, Some(req.id), payload);
debug!(
"<Conn @ {}> Sending response of type{} {}",
conn_id,
if res.payload.len() > 1 { "s" } else { "" },
res.to_payload_type_string()
);
// Send out our primary response from processing the request // Send out our primary response from processing the request
tx.send(res).await tx.send(res).await
} }
@ -391,13 +384,8 @@ async fn proc_run(
None, None,
vec![ResponseData::ProcStdout { id, data }], vec![ResponseData::ProcStdout { id, data }],
); );
debug!(
"<Conn @ {}> Sending response of type{} {}",
conn_id,
if res.payload.len() > 1 { "s" } else { "" },
res.to_payload_type_string()
);
if let Err(_) = tx_2.send(res).await { if let Err(_) = tx_2.send(res).await {
error!("<Conn @ {}> Stdout channel closed", conn_id);
break; break;
} }
@ -432,13 +420,8 @@ async fn proc_run(
None, None,
vec![ResponseData::ProcStderr { id, data }], vec![ResponseData::ProcStderr { id, data }],
); );
debug!(
"<Conn @ {}> Sending response of type{} {}",
conn_id,
if res.payload.len() > 1 { "s" } else { "" },
res.to_payload_type_string()
);
if let Err(_) = tx_2.send(res).await { if let Err(_) = tx_2.send(res).await {
error!("<Conn @ {}> Stderr channel closed", conn_id);
break; break;
} }
@ -464,7 +447,10 @@ async fn proc_run(
let stdin_task = tokio::spawn(async move { let stdin_task = tokio::spawn(async move {
while let Some(line) = stdin_rx.recv().await { while let Some(line) = stdin_rx.recv().await {
if let Err(x) = stdin.write_all(line.as_bytes()).await { if let Err(x) = stdin.write_all(line.as_bytes()).await {
error!("Failed to send stdin to process {}: {}", id, x); error!(
"<Conn @ {}> Failed to send stdin to process {}: {}",
conn_id, id, x
);
break; break;
} }
} }
@ -476,16 +462,18 @@ async fn proc_run(
let wait_task = tokio::spawn(async move { let wait_task = tokio::spawn(async move {
tokio::select! { tokio::select! {
status = child.wait() => { status = child.wait() => {
debug!("<Conn @ {}> Process {} done", conn_id, id);
// Force stdin task to abort if it hasn't exited as there is no // Force stdin task to abort if it hasn't exited as there is no
// point to sending any more stdin // point to sending any more stdin
stdin_task.abort(); stdin_task.abort();
if let Err(x) = stderr_task.await { if let Err(x) = stderr_task.await {
error!("Join on stderr task failed: {}", x); error!("<Conn @ {}> Join on stderr task failed: {}", conn_id, x);
} }
if let Err(x) = stdout_task.await { if let Err(x) = stdout_task.await {
error!("Join on stdout task failed: {}", x); error!("<Conn @ {}> Join on stdout task failed: {}", conn_id, x);
} }
match status { match status {
@ -497,34 +485,32 @@ async fn proc_run(
None, None,
vec![ResponseData::ProcDone { id, success, code }] vec![ResponseData::ProcDone { id, success, code }]
); );
debug!(
"<Conn @ {}> Sending response of type{} {}",
conn_id,
if res.payload.len() > 1 { "s" } else { "" },
res.to_payload_type_string()
);
if let Err(_) = tx.send(res).await { if let Err(_) = tx.send(res).await {
error!("Failed to send done for process {}!", id); error!(
"<Conn @ {}> Failed to send done for process {}!",
conn_id,
id,
);
} }
} }
Err(x) => { Err(x) => {
let res = Response::new(tenant.as_str(), None, vec![ResponseData::from(x)]); let res = Response::new(tenant.as_str(), None, vec![ResponseData::from(x)]);
debug!(
"<Conn @ {}> Sending response of type{} {}",
conn_id,
if res.payload.len() > 1 { "s" } else { "" },
res.to_payload_type_string()
);
if let Err(_) = tx.send(res).await { if let Err(_) = tx.send(res).await {
error!("Failed to send error for waiting on process {}!", id); error!(
"<Conn @ {}> Failed to send error for waiting on process {}!",
conn_id,
id,
);
} }
} }
} }
}, },
_ = kill_rx => { _ = kill_rx => {
debug!("<Conn @ {}> Process {} killed", conn_id, id);
if let Err(x) = child.kill().await { if let Err(x) = child.kill().await {
error!("Unable to kill process {}: {}", id, x); error!("<Conn @ {}> Unable to kill process {}: {}", conn_id, id, x);
} }
// Force stdin task to abort if it hasn't exited as there is no // Force stdin task to abort if it hasn't exited as there is no
@ -532,28 +518,22 @@ async fn proc_run(
stdin_task.abort(); stdin_task.abort();
if let Err(x) = stderr_task.await { if let Err(x) = stderr_task.await {
error!("Join on stderr task failed: {}", x); error!("<Conn @ {}> Join on stderr task failed: {}", conn_id, x);
} }
if let Err(x) = stdout_task.await { if let Err(x) = stdout_task.await {
error!("Join on stdout task failed: {}", x); error!("<Conn @ {}> Join on stdout task failed: {}", conn_id, x);
} }
let res = Response::new(tenant.as_str(), None, vec![ResponseData::ProcDone { let res = Response::new(tenant.as_str(), None, vec![ResponseData::ProcDone {
id, success: false, code: None id, success: false, code: None
}]); }]);
debug!(
"<Conn @ {}> Sending response of type{} {}",
conn_id,
if res.payload.len() > 1 { "s" } else { "" },
res.to_payload_type_string()
);
if let Err(_) = tx if let Err(_) = tx
.send(res) .send(res)
.await .await
{ {
error!("Failed to send done for process {}!", id); error!("<Conn @ {}> Failed to send done for process {}!", conn_id, id);
} }
} }
} }

@ -240,7 +240,7 @@ async fn request_loop<T>(
} }
} }
Ok(None) => { Ok(None) => {
info!("<Conn @ {}> Input from connection closed", conn_id); trace!("<Conn @ {}> Input from connection closed", conn_id);
break; break;
} }
Err(x) => { Err(x) => {
@ -264,11 +264,20 @@ async fn response_loop<T>(
T: AsyncWrite + Send + Unpin + 'static, T: AsyncWrite + Send + Unpin + 'static,
{ {
while let Some(res) = rx.recv().await { while let Some(res) = rx.recv().await {
debug!(
"<Conn @ {}> Sending response of type{} {}",
conn_id,
if res.payload.len() > 1 { "s" } else { "" },
res.to_payload_type_string()
);
if let Err(x) = transport.send(res).await { if let Err(x) = transport.send(res).await {
error!("<Conn @ {}> {}", conn_id, x); error!("<Conn @ {}> {}", conn_id, x);
break; break;
} }
} }
trace!("<Conn @ {}> Output to connection closed", conn_id);
} }
#[cfg(test)] #[cfg(test)]

@ -105,6 +105,10 @@ impl ExitCodeError for TransportError {
} }
impl ExitCodeError for RemoteProcessError { impl ExitCodeError for RemoteProcessError {
fn is_silent(&self) -> bool {
matches!(self, Self::BadResponse)
}
fn to_exit_code(&self) -> ExitCode { fn to_exit_code(&self) -> ExitCode {
match self { match self {
Self::BadResponse => ExitCode::DataErr, Self::BadResponse => ExitCode::DataErr,

@ -29,6 +29,7 @@ impl ExitCodeError for Error {
fn is_silent(&self) -> bool { fn is_silent(&self) -> bool {
match self { match self {
Self::BadProcessExit(_) | Self::OperationFailed => true, Self::BadProcessExit(_) | Self::OperationFailed => true,
Self::RemoteProcessError(x) => x.is_silent(),
_ => false, _ => false,
} }
} }

@ -20,6 +20,13 @@ pub enum Error {
} }
impl ExitCodeError for Error { impl ExitCodeError for Error {
fn is_silent(&self) -> bool {
match self {
Self::RemoteProcessError(x) => x.is_silent(),
_ => false,
}
}
fn to_exit_code(&self) -> ExitCode { fn to_exit_code(&self) -> ExitCode {
match self { match self {
Self::BadProcessExit(x) => ExitCode::Custom(*x), Self::BadProcessExit(x) => ExitCode::Custom(*x),

@ -76,7 +76,7 @@ fn should_forward_stdin_to_remote_process(mut action_cmd: Command) {
} }
#[rstest] #[rstest]
fn yield_an_error_when_fails(mut action_cmd: Command) { fn reflect_the_exit_code_of_the_process(mut action_cmd: Command) {
// distant action proc-run {cmd} [args] // distant action proc-run {cmd} [args]
action_cmd action_cmd
.args(&["proc-run", "--"]) .args(&["proc-run", "--"])
@ -88,6 +88,18 @@ fn yield_an_error_when_fails(mut action_cmd: Command) {
.stderr(""); .stderr("");
} }
#[rstest]
fn yield_an_error_when_fails(mut action_cmd: Command) {
// distant action proc-run {cmd} [args]
action_cmd
.args(&["proc-run", "--"])
.arg(DOES_NOT_EXIST_BIN.to_str().unwrap())
.assert()
.code(ExitCode::DataErr.to_i32())
.stdout("")
.stderr("");
}
#[rstest] #[rstest]
fn should_support_json_to_execute_program_and_return_exit_status(mut action_cmd: Command) { fn should_support_json_to_execute_program_and_return_exit_status(mut action_cmd: Command) {
let req = Request { let req = Request {

@ -1,11 +1,11 @@
use crate::cli::utils;
use assert_cmd::Command; use assert_cmd::Command;
use distant_core::*; use distant_core::*;
use rstest::*; use rstest::*;
use std::{ffi::OsStr, net::SocketAddr, thread}; use std::{ffi::OsStr, net::SocketAddr, thread};
use tokio::{runtime::Runtime, sync::mpsc}; use tokio::{runtime::Runtime, sync::mpsc};
use crate::cli::utils;
const LOG_PATH: &'static str = "/tmp/distant.server.log"; const LOG_PATH: &'static str = "/tmp/test.distant.server.log";
/// Context for some listening distant server /// Context for some listening distant server
pub struct DistantServerCtx { pub struct DistantServerCtx {
@ -65,7 +65,6 @@ impl DistantServerCtx {
let mut cmd = Command::cargo_bin(env!("CARGO_PKG_NAME")).unwrap(); let mut cmd = Command::cargo_bin(env!("CARGO_PKG_NAME")).unwrap();
cmd.arg(subcommand) cmd.arg(subcommand)
.args(&["--session", "environment"]) .args(&["--session", "environment"])
.args(&["-vvv", "--log-file", "/tmp/distant.client.log"])
.env("DISTANT_HOST", self.addr.ip().to_string()) .env("DISTANT_HOST", self.addr.ip().to_string())
.env("DISTANT_PORT", self.addr.port().to_string()) .env("DISTANT_PORT", self.addr.port().to_string())
.env("DISTANT_AUTH_KEY", self.auth_key.as_str()); .env("DISTANT_AUTH_KEY", self.auth_key.as_str());

Loading…
Cancel
Save