From cf0193edc9d885fd19a5ba628e2c0b909db37ef7 Mon Sep 17 00:00:00 2001 From: Chip Senkbeil Date: Mon, 30 Aug 2021 22:46:51 -0500 Subject: [PATCH] Add some extra logging and complete proc-run cli tests --- core/src/client/process.rs | 9 +++- core/src/client/session/mod.rs | 5 ++- core/src/server/distant/handler.rs | 72 +++++++++++------------------- core/src/server/distant/mod.rs | 11 ++++- src/exit.rs | 4 ++ src/subcommand/action.rs | 1 + src/subcommand/lsp.rs | 7 +++ tests/cli/action/proc_run.rs | 14 +++++- tests/cli/fixtures.rs | 5 +-- 9 files changed, 74 insertions(+), 54 deletions(-) diff --git a/core/src/client/process.rs b/core/src/client/process.rs index 8663498..a5f2772 100644 --- a/core/src/client/process.rs +++ b/core/src/client/process.rs @@ -5,6 +5,7 @@ use crate::{ net::{DataStream, TransportError}, }; use derive_more::{Display, Error, From}; +use log::*; use tokio::{ io, sync::mpsc, @@ -200,7 +201,7 @@ async fn process_outgoing_requests( where T: DataStream, { - loop { + let result = loop { tokio::select! { data = stdin_rx.recv() => { match data { @@ -227,7 +228,10 @@ where } } } - } + }; + + trace!("Process outgoing channel closed"); + result } /// Helper function that loops, processing incoming stdout & stderr requests from a remote process @@ -280,6 +284,7 @@ async fn process_incoming_responses( } } + trace!("Process incoming channel closed"); result } diff --git a/core/src/client/session/mod.rs b/core/src/client/session/mod.rs index 04c1101..31727a3 100644 --- a/core/src/client/session/mod.rs +++ b/core/src/client/session/mod.rs @@ -139,7 +139,10 @@ where } } } - Ok(None) => break, + Ok(None) => { + debug!("Session closing response task as transport read-half closed!"); + break; + } Err(x) => { error!("{}", x); break; diff --git a/core/src/server/distant/handler.rs b/core/src/server/distant/handler.rs index c4c4640..aedf9ea 100644 --- a/core/src/server/distant/handler.rs +++ b/core/src/server/distant/handler.rs @@ -112,13 +112,6 @@ pub(super) async fn process( let res = Response::new(req.tenant, Some(req.id), payload); - debug!( - " Sending response of type{} {}", - conn_id, - if res.payload.len() > 1 { "s" } else { "" }, - res.to_payload_type_string() - ); - // Send out our primary response from processing the request tx.send(res).await } @@ -391,13 +384,8 @@ async fn proc_run( None, vec![ResponseData::ProcStdout { id, data }], ); - debug!( - " Sending response of type{} {}", - conn_id, - if res.payload.len() > 1 { "s" } else { "" }, - res.to_payload_type_string() - ); if let Err(_) = tx_2.send(res).await { + error!(" Stdout channel closed", conn_id); break; } @@ -432,13 +420,8 @@ async fn proc_run( None, vec![ResponseData::ProcStderr { id, data }], ); - debug!( - " Sending response of type{} {}", - conn_id, - if res.payload.len() > 1 { "s" } else { "" }, - res.to_payload_type_string() - ); if let Err(_) = tx_2.send(res).await { + error!(" Stderr channel closed", conn_id); break; } @@ -464,7 +447,10 @@ async fn proc_run( let stdin_task = tokio::spawn(async move { while let Some(line) = stdin_rx.recv().await { if let Err(x) = stdin.write_all(line.as_bytes()).await { - error!("Failed to send stdin to process {}: {}", id, x); + error!( + " Failed to send stdin to process {}: {}", + conn_id, id, x + ); break; } } @@ -476,16 +462,18 @@ async fn proc_run( let wait_task = tokio::spawn(async move { tokio::select! { status = child.wait() => { + debug!(" Process {} done", conn_id, id); + // Force stdin task to abort if it hasn't exited as there is no // point to sending any more stdin stdin_task.abort(); if let Err(x) = stderr_task.await { - error!("Join on stderr task failed: {}", x); + error!(" Join on stderr task failed: {}", conn_id, x); } if let Err(x) = stdout_task.await { - error!("Join on stdout task failed: {}", x); + error!(" Join on stdout task failed: {}", conn_id, x); } match status { @@ -497,34 +485,32 @@ async fn proc_run( None, vec![ResponseData::ProcDone { id, success, code }] ); - debug!( - " Sending response of type{} {}", - conn_id, - if res.payload.len() > 1 { "s" } else { "" }, - res.to_payload_type_string() - ); if let Err(_) = tx.send(res).await { - error!("Failed to send done for process {}!", id); + error!( + " Failed to send done for process {}!", + conn_id, + id, + ); } } Err(x) => { let res = Response::new(tenant.as_str(), None, vec![ResponseData::from(x)]); - debug!( - " Sending response of type{} {}", - conn_id, - if res.payload.len() > 1 { "s" } else { "" }, - res.to_payload_type_string() - ); if let Err(_) = tx.send(res).await { - error!("Failed to send error for waiting on process {}!", id); + error!( + " Failed to send error for waiting on process {}!", + conn_id, + id, + ); } } } }, _ = kill_rx => { + debug!(" Process {} killed", conn_id, id); + if let Err(x) = child.kill().await { - error!("Unable to kill process {}: {}", id, x); + error!(" Unable to kill process {}: {}", conn_id, id, x); } // Force stdin task to abort if it hasn't exited as there is no @@ -532,28 +518,22 @@ async fn proc_run( stdin_task.abort(); if let Err(x) = stderr_task.await { - error!("Join on stderr task failed: {}", x); + error!(" Join on stderr task failed: {}", conn_id, x); } if let Err(x) = stdout_task.await { - error!("Join on stdout task failed: {}", x); + error!(" Join on stdout task failed: {}", conn_id, x); } let res = Response::new(tenant.as_str(), None, vec![ResponseData::ProcDone { id, success: false, code: None }]); - debug!( - " Sending response of type{} {}", - conn_id, - if res.payload.len() > 1 { "s" } else { "" }, - res.to_payload_type_string() - ); if let Err(_) = tx .send(res) .await { - error!("Failed to send done for process {}!", id); + error!(" Failed to send done for process {}!", conn_id, id); } } } diff --git a/core/src/server/distant/mod.rs b/core/src/server/distant/mod.rs index dfb5024..9cb9075 100644 --- a/core/src/server/distant/mod.rs +++ b/core/src/server/distant/mod.rs @@ -240,7 +240,7 @@ async fn request_loop( } } Ok(None) => { - info!(" Input from connection closed", conn_id); + trace!(" Input from connection closed", conn_id); break; } Err(x) => { @@ -264,11 +264,20 @@ async fn response_loop( T: AsyncWrite + Send + Unpin + 'static, { while let Some(res) = rx.recv().await { + debug!( + " Sending response of type{} {}", + conn_id, + if res.payload.len() > 1 { "s" } else { "" }, + res.to_payload_type_string() + ); + if let Err(x) = transport.send(res).await { error!(" {}", conn_id, x); break; } } + + trace!(" Output to connection closed", conn_id); } #[cfg(test)] diff --git a/src/exit.rs b/src/exit.rs index 69b461f..f8967ff 100644 --- a/src/exit.rs +++ b/src/exit.rs @@ -105,6 +105,10 @@ impl ExitCodeError for TransportError { } impl ExitCodeError for RemoteProcessError { + fn is_silent(&self) -> bool { + matches!(self, Self::BadResponse) + } + fn to_exit_code(&self) -> ExitCode { match self { Self::BadResponse => ExitCode::DataErr, diff --git a/src/subcommand/action.rs b/src/subcommand/action.rs index 93cd8f9..85cce04 100644 --- a/src/subcommand/action.rs +++ b/src/subcommand/action.rs @@ -29,6 +29,7 @@ impl ExitCodeError for Error { fn is_silent(&self) -> bool { match self { Self::BadProcessExit(_) | Self::OperationFailed => true, + Self::RemoteProcessError(x) => x.is_silent(), _ => false, } } diff --git a/src/subcommand/lsp.rs b/src/subcommand/lsp.rs index ad459da..9ea294d 100644 --- a/src/subcommand/lsp.rs +++ b/src/subcommand/lsp.rs @@ -20,6 +20,13 @@ pub enum Error { } impl ExitCodeError for Error { + fn is_silent(&self) -> bool { + match self { + Self::RemoteProcessError(x) => x.is_silent(), + _ => false, + } + } + fn to_exit_code(&self) -> ExitCode { match self { Self::BadProcessExit(x) => ExitCode::Custom(*x), diff --git a/tests/cli/action/proc_run.rs b/tests/cli/action/proc_run.rs index 2ffb721..2ff1b4a 100644 --- a/tests/cli/action/proc_run.rs +++ b/tests/cli/action/proc_run.rs @@ -76,7 +76,7 @@ fn should_forward_stdin_to_remote_process(mut action_cmd: Command) { } #[rstest] -fn yield_an_error_when_fails(mut action_cmd: Command) { +fn reflect_the_exit_code_of_the_process(mut action_cmd: Command) { // distant action proc-run {cmd} [args] action_cmd .args(&["proc-run", "--"]) @@ -88,6 +88,18 @@ fn yield_an_error_when_fails(mut action_cmd: Command) { .stderr(""); } +#[rstest] +fn yield_an_error_when_fails(mut action_cmd: Command) { + // distant action proc-run {cmd} [args] + action_cmd + .args(&["proc-run", "--"]) + .arg(DOES_NOT_EXIST_BIN.to_str().unwrap()) + .assert() + .code(ExitCode::DataErr.to_i32()) + .stdout("") + .stderr(""); +} + #[rstest] fn should_support_json_to_execute_program_and_return_exit_status(mut action_cmd: Command) { let req = Request { diff --git a/tests/cli/fixtures.rs b/tests/cli/fixtures.rs index 49b4056..c189d3f 100644 --- a/tests/cli/fixtures.rs +++ b/tests/cli/fixtures.rs @@ -1,11 +1,11 @@ +use crate::cli::utils; use assert_cmd::Command; use distant_core::*; use rstest::*; use std::{ffi::OsStr, net::SocketAddr, thread}; use tokio::{runtime::Runtime, sync::mpsc}; -use crate::cli::utils; -const LOG_PATH: &'static str = "/tmp/distant.server.log"; +const LOG_PATH: &'static str = "/tmp/test.distant.server.log"; /// Context for some listening distant server pub struct DistantServerCtx { @@ -65,7 +65,6 @@ impl DistantServerCtx { let mut cmd = Command::cargo_bin(env!("CARGO_PKG_NAME")).unwrap(); cmd.arg(subcommand) .args(&["--session", "environment"]) - .args(&["-vvv", "--log-file", "/tmp/distant.client.log"]) .env("DISTANT_HOST", self.addr.ip().to_string()) .env("DISTANT_PORT", self.addr.port().to_string()) .env("DISTANT_AUTH_KEY", self.auth_key.as_str());