From 79fe86ae152044cbacbaa5f9bfbb77dac35c6dd0 Mon Sep 17 00:00:00 2001 From: Chip Senkbeil Date: Wed, 28 Jul 2021 03:31:21 -0500 Subject: [PATCH] Update into_split to reuse buffers; fix newline appearing after stdout/stderr from program --- src/net/transport/mod.rs | 42 ++++++++++-------------------------- src/subcommand/listen/mod.rs | 6 ++---- src/subcommand/send.rs | 13 +++++++---- 3 files changed, 22 insertions(+), 39 deletions(-) diff --git a/src/net/transport/mod.rs b/src/net/transport/mod.rs index ec03297..a7ffa3a 100644 --- a/src/net/transport/mod.rs +++ b/src/net/transport/mod.rs @@ -77,29 +77,25 @@ impl Transport { } /// Splits transport into read and write halves - #[allow(dead_code)] - pub fn split(self) -> (TransportReadHalf, TransportWriteHalf) { + pub fn into_split(self) -> (TransportReadHalf, TransportWriteHalf) { let key = self.key; let parts = self.inner.into_parts(); let (read_half, write_half) = parts.io.into_split(); - // TODO: I can't figure out a way to re-inject the read/write buffers from parts - // into the new framed instances. This means we are dropping our old buffer - // data (I think). This shouldn't be a problem since we are splitting - // immediately, but it would be nice to cover this properly one day - // - // From initial testing, this may actually be a problem where part of a frame - // arrives so quickly that we lose the first message. So recommendation for - // now is to create the frame halves separately first so we have no - // chance of building a partial frame - // - // See https://github.com/tokio-rs/tokio/issues/4000 + // Create our split read half and populate its buffer with existing contents + let mut f_read = FramedRead::new(read_half, parts.codec); + *f_read.read_buffer_mut() = parts.read_buf; + + // Create our split write half and populate its buffer with existing contents + let mut f_write = FramedWrite::new(write_half, parts.codec); + *f_write.write_buffer_mut() = parts.write_buf; + let t_read = TransportReadHalf { - inner: FramedRead::new(read_half, parts.codec), + inner: f_read, key: Arc::clone(&key), }; let t_write = TransportWriteHalf { - inner: FramedWrite::new(write_half, parts.codec), + inner: f_write, key, }; @@ -114,14 +110,6 @@ pub struct TransportWriteHalf { } impl TransportWriteHalf { - /// Creates a new transport write half directly from a TCP write half - pub fn new(write_half: tcp::OwnedWriteHalf, key: Arc) -> Self { - Self { - inner: FramedWrite::new(write_half, DistantCodec), - key, - } - } - /// Sends some data across the wire pub async fn send(&mut self, data: T) -> Result<(), TransportError> { // Serialize, encrypt, and then (TODO) sign @@ -143,14 +131,6 @@ pub struct TransportReadHalf { } impl TransportReadHalf { - /// Creates a new transport read half directly from a TCP read half - pub fn new(read_half: tcp::OwnedReadHalf, key: Arc) -> Self { - Self { - inner: FramedRead::new(read_half, DistantCodec), - key, - } - } - /// Receives some data from out on the wire, waiting until it's available, /// returning none if the transport is now closed pub async fn receive(&mut self) -> Result, TransportError> { diff --git a/src/subcommand/listen/mod.rs b/src/subcommand/listen/mod.rs index 719c045..6d463b7 100644 --- a/src/subcommand/listen/mod.rs +++ b/src/subcommand/listen/mod.rs @@ -1,6 +1,6 @@ use crate::{ data::{Request, Response}, - net::{TransportReadHalf, TransportWriteHalf}, + net::{Transport, TransportReadHalf, TransportWriteHalf}, opt::{CommonOpt, ConvertToIpAddrError, ListenSubcommand}, }; use derive_more::{Display, Error, From}; @@ -132,9 +132,7 @@ async fn run_async(cmd: ListenSubcommand, _opt: CommonOpt, is_forked: bool) -> R // Build a transport around the client, splitting into read and write halves so we can // handle input and output concurrently - let (r, w) = client.into_split(); - let t_read = TransportReadHalf::new(r, Arc::clone(&key)); - let t_write = TransportWriteHalf::new(w, Arc::clone(&key)); + let (t_read, t_write) = Transport::new(client, Arc::clone(&key)).into_split(); let (tx, rx) = mpsc::channel(cmd.max_msg_capacity as usize); // Spawn a new task that loops to handle requests from the client diff --git a/src/subcommand/send.rs b/src/subcommand/send.rs index a3726ce..0175c0f 100644 --- a/src/subcommand/send.rs +++ b/src/subcommand/send.rs @@ -60,17 +60,22 @@ fn print_response(fmt: ResponseFormat, res: Response) -> io::Result<()> { // to print out the results let is_fmt_program = fmt.is_program(); let is_type_stderr = res.payload.is_proc_stderr(); - let do_print = !is_fmt_program || is_type_stderr || res.payload.is_proc_stdout(); + let is_type_stdout = res.payload.is_proc_stdout(); + let do_print = !is_fmt_program || is_type_stderr || is_type_stdout; let out = format_response(fmt, res)?; // Print out our response if flagged to do so if do_print { - // If we are program format and got stderr, write it to stderr + // If we are program format and got stderr, write it to stderr without altering content if is_fmt_program && is_type_stderr { - eprintln!("{}", out); + eprint!("{}", out); - // Otherwise, always go to stdout + // Else, if we are program format and got stdout, write it to stdout without altering content + } else if is_fmt_program && is_type_stdout { + print!("{}", out); + + // Otherwise, always go to stdout with traditional println } else { println!("{}", out); }