Fix some proc issues, but proc tests still not fully passing

pull/38/head
Chip Senkbeil 3 years ago
parent f66a234873
commit fc1c262f55
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -90,15 +90,19 @@ impl RemoteProcess {
let (stdout_tx, stdout_rx) = mpsc::channel(CLIENT_BROADCAST_CHANNEL_CAPACITY);
let (stderr_tx, stderr_rx) = mpsc::channel(CLIENT_BROADCAST_CHANNEL_CAPACITY);
// Used to terminate request task, either explicitly by the process or internally
// by the response task when it terminates
let (kill_tx, kill_rx) = mpsc::channel(1);
// Now we spawn a task to handle future responses that are async
// such as ProcStdout, ProcStderr, and ProcDone
let kill_tx_2 = kill_tx.clone();
let broadcast = session.broadcast.take().unwrap();
let res_task = tokio::spawn(async move {
process_incoming_responses(id, broadcast, stdout_tx, stderr_tx).await
process_incoming_responses(id, broadcast, stdout_tx, stderr_tx, kill_tx_2).await
});
// Spawn a task that takes stdin from our channel and forwards it to the remote process
let (kill_tx, kill_rx) = mpsc::channel(1);
let req_task = tokio::spawn(async move {
process_outgoing_requests(tenant, id, session, stdin_rx, kill_rx).await
});
@ -232,6 +236,7 @@ async fn process_incoming_responses(
mut broadcast: mpsc::Receiver<Response>,
stdout_tx: mpsc::Sender<String>,
stderr_tx: mpsc::Sender<String>,
kill_tx: mpsc::Sender<()>,
) -> Result<(bool, Option<i32>), RemoteProcessError> {
let mut result = Err(RemoteProcessError::UnexpectedEof);
@ -267,6 +272,10 @@ async fn process_incoming_responses(
// If we got a termination, then exit accordingly
if let Some((success, code)) = exit_status {
result = Ok((success, code));
// Flag that the other task should conclude
let _ = kill_tx.try_send(());
break;
}
}

@ -169,6 +169,8 @@ where
/// Sends a request and waits for a response
pub async fn send(&mut self, req: Request) -> Result<Response, TransportError> {
trace!("Sending request: {:?}", req);
// First, add a callback that will trigger when we get the response for this request
let (tx, rx) = oneshot::channel();
self.callbacks.lock().unwrap().insert(req.id, tx);
@ -197,6 +199,7 @@ where
///
/// Any response that would be received gets sent over the broadcast channel instead
pub async fn fire(&mut self, req: Request) -> Result<(), TransportError> {
trace!("Firing off request: {:?}", req);
self.t_write.send(req).await
}

@ -476,9 +476,9 @@ async fn proc_run(
let wait_task = tokio::spawn(async move {
tokio::select! {
status = child.wait() => {
if let Err(x) = stdin_task.await {
error!("Join on stdin task failed: {}", x);
}
// Force stdin task to abort if it hasn't exited as there is no
// point to sending any more stdin
stdin_task.abort();
if let Err(x) = stderr_task.await {
error!("Join on stderr task failed: {}", x);
@ -527,6 +527,10 @@ async fn proc_run(
error!("Unable to kill process {}: {}", id, x);
}
// Force stdin task to abort if it hasn't exited as there is no
// point to sending any more stdin
stdin_task.abort();
if let Err(x) = stderr_task.await {
error!("Join on stderr task failed: {}", x);
}
@ -560,7 +564,7 @@ async fn proc_run(
cmd,
args,
id,
stdin_tx,
stdin_tx: Some(stdin_tx),
kill_tx,
wait_task,
};
@ -584,9 +588,12 @@ async fn proc_kill(state: HState, id: usize) -> Result<ResponseData, ServerError
async fn proc_stdin(state: HState, id: usize, data: String) -> Result<ResponseData, ServerError> {
if let Some(process) = state.lock().await.processes.get(&id) {
process.stdin_tx.send(data).await.map_err(|_| {
io::Error::new(io::ErrorKind::BrokenPipe, "Unable to send stdin to process")
})?;
if !process.send_stdin(data).await {
return Err(ServerError::IoError(io::Error::new(
io::ErrorKind::BrokenPipe,
"Unable to send stdin to process",
)));
}
}
Ok(ResponseData::Ok)

@ -66,7 +66,7 @@ pub struct Process {
/// Transport channel to send new input to the stdin of the process,
/// one line at a time
pub stdin_tx: mpsc::Sender<String>,
pub stdin_tx: Option<mpsc::Sender<String>>,
/// Transport channel to report that the process should be killed
pub kill_tx: oneshot::Sender<()>,
@ -76,6 +76,20 @@ pub struct Process {
}
impl Process {
pub async fn send_stdin(&self, input: impl Into<String>) -> bool {
if let Some(stdin) = self.stdin_tx.as_ref() {
if stdin.send(input.into()).await.is_ok() {
return true;
}
}
false
}
pub fn close_stdin(&mut self) {
self.stdin_tx.take();
}
pub async fn kill_and_wait(self) -> Result<(), JoinError> {
let _ = self.kill_tx.send(());
self.wait_task.await

@ -81,9 +81,9 @@ fn yield_an_error_when_fails(mut action_cmd: Command) {
action_cmd
.args(&["proc-run", "--"])
.arg(EXIT_CODE_SH.to_str().unwrap())
.arg("3")
.arg("99")
.assert()
.code(3)
.code(99)
.stdout("")
.stderr("");
}

@ -3,6 +3,9 @@ use distant_core::*;
use rstest::*;
use std::{ffi::OsStr, net::SocketAddr, thread};
use tokio::{runtime::Runtime, sync::mpsc};
use crate::cli::utils;
const LOG_PATH: &'static str = "/tmp/distant.server.log";
/// Context for some listening distant server
pub struct DistantServerCtx {
@ -26,6 +29,7 @@ impl DistantServerCtx {
thread::spawn(move || match Runtime::new() {
Ok(rt) => {
rt.block_on(async move {
let logger = utils::init_logging(LOG_PATH);
let server = DistantServer::bind(ip_addr, "0".parse().unwrap(), None, 100)
.await
.unwrap();
@ -36,6 +40,8 @@ impl DistantServerCtx {
.unwrap();
let _ = done_rx.recv().await;
logger.flush();
logger.shutdown();
});
}
Err(x) => {
@ -59,6 +65,7 @@ impl DistantServerCtx {
let mut cmd = Command::cargo_bin(env!("CARGO_PKG_NAME")).unwrap();
cmd.arg(subcommand)
.args(&["--session", "environment"])
.args(&["-vvv", "--log-file", "/tmp/distant.client.log"])
.env("DISTANT_HOST", self.addr.ip().to_string())
.env("DISTANT_PORT", self.addr.port().to_string())
.env("DISTANT_AUTH_KEY", self.auth_key.as_str());

@ -1,4 +1,5 @@
use predicates::prelude::*;
use std::path::PathBuf;
lazy_static::lazy_static! {
/// Predicate that checks for a single line that is a failure
@ -15,3 +16,25 @@ pub fn regex_pred(s: &str) -> predicates::str::RegexPredicate {
pub fn random_tenant() -> String {
format!("test-tenant-{}", rand::random::<u16>())
}
/// Initializes logging (should only call once)
pub fn init_logging(path: impl Into<PathBuf>) -> flexi_logger::LoggerHandle {
use flexi_logger::{FileSpec, LevelFilter, LogSpecification, Logger};
let modules = &["distant", "distant_core"];
// Disable logging for everything but our binary, which is based on verbosity
let mut builder = LogSpecification::builder();
builder.default(LevelFilter::Off);
// For each module, configure logging
for module in modules {
builder.module(module, LevelFilter::Trace);
}
// Create our logger, but don't initialize yet
let logger = Logger::with(builder.build())
.format_for_files(flexi_logger::opt_format)
.log_to_file(FileSpec::try_from(path).expect("Failed to create log file spec"));
logger.start().expect("Failed to initialize logger")
}

Loading…
Cancel
Save