From f59ae7f6ed3f30846ecde0402c3a41d4b811299e Mon Sep 17 00:00:00 2001 From: Chip Senkbeil Date: Wed, 28 Jul 2021 15:51:42 -0500 Subject: [PATCH] Fix dropped messages on client side and lockup of transport when trying to read and write concurrently --- Cargo.lock | 200 +++++++++++++++++++++++++++++++ Cargo.toml | 1 + src/constants.rs | 3 + src/lib.rs | 1 + src/net/mod.rs | 59 ++++++--- src/net/transport/mod.rs | 9 +- src/subcommand/launch.rs | 2 +- src/subcommand/listen/handler.rs | 20 +++- src/subcommand/send.rs | 8 +- src/utils.rs | 4 +- 10 files changed, 281 insertions(+), 26 deletions(-) create mode 100644 src/constants.rs diff --git a/Cargo.lock b/Cargo.lock index 80a43cf..e10fa5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -43,6 +43,15 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" +[[package]] +name = "block-buffer" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" +dependencies = [ + "generic-array", +] + [[package]] name = "bumpalo" version = "3.7.0" @@ -89,18 +98,64 @@ dependencies = [ "vec_map", ] +[[package]] +name = "const-oid" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44c32f031ea41b4291d695026c023b95d59db2d8a2c7640800ed56bc8f510f22" + [[package]] name = "convert_case" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" +[[package]] +name = "cpufeatures" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66c99696f6c9dd7f35d486b9d04d7e6e202aa3e8c40d553f2fdf5e7e0c6a71ef" +dependencies = [ + "libc", +] + +[[package]] +name = "crypto-bigint" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b32a398eb1ccfbe7e4f452bc749c44d38dd732e9a253f19da224c416f00ee7f4" +dependencies = [ + "generic-array", + "rand_core", + "subtle", + "zeroize", +] + +[[package]] +name = "crypto-mac" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1d1a86f49236c215f271d40892d5fc950490551400b02ef360692c29815c714" +dependencies = [ + "generic-array", + "subtle", +] + [[package]] name = "ct-codecs" version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3b7eb4404b8195a9abb6356f4ac07d8ba267045c8d6d220ac4dc992e6cc75df" +[[package]] +name = "der" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49f215f706081a44cb702c71c39a52c05da637822e9c1645a50b7202689e982d" +dependencies = [ + "const-oid", +] + [[package]] name = "derive_more" version = "0.99.16" @@ -113,6 +168,15 @@ dependencies = [ "syn", ] +[[package]] +name = "digest" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" +dependencies = [ + "generic-array", +] + [[package]] name = "directories" version = "3.0.2" @@ -144,6 +208,7 @@ dependencies = [ "fork", "futures", "hex", + "k256", "lazy_static", "log", "orion", @@ -160,6 +225,44 @@ dependencies = [ "whoami", ] +[[package]] +name = "ecdsa" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "713c32426287891008edb98f8b5c6abb2130aa043c93a818728fcda78606f274" +dependencies = [ + "der", + "elliptic-curve", + "hmac", + "signature", +] + +[[package]] +name = "elliptic-curve" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "069397e10739989e400628cbc0556a817a8a64119d7a2315767f4456e1332c23" +dependencies = [ + "crypto-bigint", + "ff", + "generic-array", + "group", + "pkcs8", + "rand_core", + "subtle", + "zeroize", +] + +[[package]] +name = "ff" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63eec06c61e487eecf0f7e6e6372e596a81922c28d33e645d6983ca6493a1af0" +dependencies = [ + "rand_core", + "subtle", +] + [[package]] name = "flexi_logger" version = "0.18.0" @@ -279,6 +382,16 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-array" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "501466ecc8a30d1d3b7fc9229b122b2ce8ed6e9d9223f1138d4babb253e51817" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "getrandom" version = "0.2.3" @@ -296,6 +409,17 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" +[[package]] +name = "group" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c363a5301b8f153d80747126a04b3c82073b9fe3130571a9d170cacdeaf7912" +dependencies = [ + "ff", + "rand_core", + "subtle", +] + [[package]] name = "half" version = "1.7.1" @@ -326,6 +450,16 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hmac" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a2a2320eb7ec0ebe8da8f744d7812d9fc4cb4d09344ac01898dbcb6a20ae69b" +dependencies = [ + "crypto-mac", + "digest", +] + [[package]] name = "instant" version = "0.1.10" @@ -350,6 +484,18 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "k256" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "903ae2481bcdfdb7b68e0a9baa4b7c9aff600b9ae2e8e5bb5833b8c91ab851ea" +dependencies = [ + "cfg-if", + "ecdsa", + "elliptic-curve", + "sha2", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -452,6 +598,12 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56" +[[package]] +name = "opaque-debug" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" + [[package]] name = "orion" version = "0.16.0" @@ -501,6 +653,16 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkcs8" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbee84ed13e44dd82689fa18348a49934fa79cc774a344c42fc9b301c71b140a" +dependencies = [ + "der", + "spki", +] + [[package]] name = "ppv-lite86" version = "0.2.10" @@ -699,6 +861,19 @@ dependencies = [ "serde", ] +[[package]] +name = "sha2" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b362ae5752fd2137731f9fa25fd4d9058af34666ca1966fb969119cc35719f12" +dependencies = [ + "block-buffer", + "cfg-if", + "cpufeatures", + "digest", + "opaque-debug", +] + [[package]] name = "signal-hook-registry" version = "1.4.0" @@ -708,6 +883,16 @@ dependencies = [ "libc", ] +[[package]] +name = "signature" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c19772be3c4dd2ceaacf03cb41d5885f2a02c4d8804884918e3a258480803335" +dependencies = [ + "digest", + "rand_core", +] + [[package]] name = "slab" version = "0.4.3" @@ -720,6 +905,15 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" +[[package]] +name = "spki" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "987637c5ae6b3121aba9d513f869bd2bff11c4cc086c22473befd6649c0bd521" +dependencies = [ + "der", +] + [[package]] name = "strsim" version = "0.8.0" @@ -884,6 +1078,12 @@ dependencies = [ "tokio", ] +[[package]] +name = "typenum" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879f6906492a7cd215bfa4cf595b600146ccfac0c79bcbd1f3000162af5e8b06" + [[package]] name = "unicode-segmentation" version = "1.8.0" diff --git a/Cargo.toml b/Cargo.toml index d4e1967..2c3ff10 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ derive_more = { version = "0.99.16", default-features = false, features = ["disp directories = "3.0.2" futures = "0.3.16" hex = "0.4.3" +k256 = { version = "0.9.6", features = ["ecdh"] } log = "0.4.14" orion = "0.16.0" rand = "0.8.4" diff --git a/src/constants.rs b/src/constants.rs new file mode 100644 index 0000000..b6e6f70 --- /dev/null +++ b/src/constants.rs @@ -0,0 +1,3 @@ +/// Capacity associated with a client broadcasting its received messages that +/// do not have a callback associated +pub static CLIENT_BROADCAST_CHANNEL_CAPACITY: usize = 100; diff --git a/src/lib.rs b/src/lib.rs index 1f5a584..2e3050d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +mod constants; mod data; mod net; mod opt; diff --git a/src/net/mod.rs b/src/net/mod.rs index e4a8995..d58cb30 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -2,7 +2,8 @@ mod transport; pub use transport::{Transport, TransportError, TransportReadHalf, TransportWriteHalf}; use crate::{ - data::{Request, Response, ResponsePayload}, + constants::CLIENT_BROADCAST_CHANNEL_CAPACITY, + data::{Request, Response}, utils::Session, }; use log::*; @@ -12,41 +13,54 @@ use std::{ }; use tokio::{ io, - sync::{oneshot, watch}, + sync::{broadcast, oneshot}, }; -use tokio_stream::wrappers::WatchStream; +use tokio_stream::wrappers::BroadcastStream; type Callbacks = Arc>>>; /// Represents a client that can make requests against a server pub struct Client { /// Underlying transport used by client - transport: Arc>, + t_write: TransportWriteHalf, /// Collection of callbacks to be invoked upon receiving a response to a request callbacks: Callbacks, /// Callback to trigger when a response is received without an origin or with an origin /// not found in the list of callbacks - rx: watch::Receiver, + broadcast: broadcast::Sender, + + /// Represents an initial receiver for broadcasted responses that can capture responses + /// prior to a stream being established and consumed + init_broadcast_receiver: Option>, } impl Client { /// Establishes a connection using the provided session pub async fn connect(session: Session) -> io::Result { - let transport = Arc::new(tokio::sync::Mutex::new(Transport::connect(session).await?)); + let transport = Transport::connect(session).await?; + debug!( + "Client has connected to {}", + transport + .peer_addr() + .map(|x| x.to_string()) + .unwrap_or_else(|_| String::from("???")) + ); + + let (mut t_read, t_write) = transport.into_split(); let callbacks: Callbacks = Arc::new(Mutex::new(HashMap::new())); - let (tx, rx) = watch::channel(Response::from(ResponsePayload::Error { - description: String::from("Fake server response"), - })); + let (broadcast, init_broadcast_receiver) = + broadcast::channel(CLIENT_BROADCAST_CHANNEL_CAPACITY); // Start a task that continually checks for responses and triggers callbacks - let transport_2 = Arc::clone(&transport); let callbacks_2 = Arc::clone(&callbacks); + let broadcast_2 = broadcast.clone(); tokio::spawn(async move { loop { - match transport_2.lock().await.receive::().await { + match t_read.receive::().await { Ok(Some(res)) => { + trace!("Client got response: {:?}", res); let maybe_callback = res .origin_id .as_ref() @@ -54,14 +68,16 @@ impl Client { // If there is an origin to this response, trigger the callback if let Some(tx) = maybe_callback { + trace!("Client has callback! Triggering!"); if let Err(res) = tx.send(res) { error!("Failed to trigger callback for response {}", res.id); } // Otherwise, this goes into the junk draw of response handlers } else { - if let Err(x) = tx.send(res) { - error!("Failed to trigger watch: {}", x); + trace!("Client does not have callback! Broadcasting!"); + if let Err(x) = broadcast_2.send(res) { + error!("Failed to trigger broadcast: {}", x); } } } @@ -75,20 +91,21 @@ impl Client { }); Ok(Self { - transport, + t_write, callbacks, - rx, + broadcast, + init_broadcast_receiver: Some(init_broadcast_receiver), }) } /// Sends a request and waits for a response - pub async fn send(&self, req: Request) -> Result { + pub async fn send(&mut self, req: Request) -> Result { // First, add a callback that will trigger when we get the response for this request let (tx, rx) = oneshot::channel(); self.callbacks.lock().unwrap().insert(req.id, tx); // Second, send the request - self.transport.lock().await.send(req).await?; + self.t_write.send(req).await?; // Third, wait for the response rx.await @@ -96,7 +113,11 @@ impl Client { } /// Creates and returns a new stream of responses that are received with no originating request - pub fn to_response_stream(&self) -> WatchStream { - WatchStream::new(self.rx.clone()) + pub fn to_response_stream(&mut self) -> BroadcastStream { + BroadcastStream::new( + self.init_broadcast_receiver + .take() + .unwrap_or_else(|| self.broadcast.subscribe()), + ) } } diff --git a/src/net/transport/mod.rs b/src/net/transport/mod.rs index a7ffa3a..0c9343d 100644 --- a/src/net/transport/mod.rs +++ b/src/net/transport/mod.rs @@ -7,7 +7,7 @@ use orion::{ errors::UnknownCryptoError, }; use serde::{de::DeserializeOwned, Serialize}; -use std::sync::Arc; +use std::{net::SocketAddr, sync::Arc}; use tokio::{ io, net::{tcp, TcpStream}, @@ -46,7 +46,13 @@ impl Transport { Ok(Self::new(stream, Arc::new(session.key))) } + /// Returns the address of the peer the transport is connected to + pub fn peer_addr(&self) -> io::Result { + self.inner.get_ref().peer_addr() + } + /// Sends some data across the wire + #[allow(dead_code)] pub async fn send(&mut self, data: T) -> Result<(), TransportError> { // Serialize, encrypt, and then (TODO) sign // NOTE: Cannot used packed implementation for now due to issues with deserialization @@ -61,6 +67,7 @@ impl Transport { /// Receives some data from out on the wire, waiting until it's available, /// returning none if the transport is now closed + #[allow(dead_code)] pub async fn receive(&mut self) -> Result, TransportError> { // If data is received, we process like usual if let Some(data) = self.inner.next().await { diff --git a/src/subcommand/launch.rs b/src/subcommand/launch.rs index ccffa53..2fcba44 100644 --- a/src/subcommand/launch.rs +++ b/src/subcommand/launch.rs @@ -89,7 +89,7 @@ async fn run_async(cmd: LaunchSubcommand, _opt: CommonOpt) -> Result<(), Error> session.save().await?; if cmd.print_startup_data { - println!("DISTANT DATA {} {}", port, session.to_hex_key()); + println!("DISTANT DATA {} {}", port, session.to_unprotected_hex_key()); } Ok(()) diff --git a/src/subcommand/listen/handler.rs b/src/subcommand/listen/handler.rs index 3cebcdf..79530a2 100644 --- a/src/subcommand/listen/handler.rs +++ b/src/subcommand/listen/handler.rs @@ -207,7 +207,7 @@ async fn proc_run( // Spawn a task that sends stdout as a response let tx_2 = tx.clone(); let mut stdout = child.stdout.take().unwrap(); - tokio::spawn(async move { + let stdout_task = tokio::spawn(async move { loop { let mut data = Vec::new(); match stdout.read_to_end(&mut data).await { @@ -228,7 +228,7 @@ async fn proc_run( // Spawn a task that sends stderr as a response let tx_2 = tx.clone(); let mut stderr = child.stderr.take().unwrap(); - tokio::spawn(async move { + let stderr_task = tokio::spawn(async move { loop { let mut data = Vec::new(); match stderr.read_to_end(&mut data).await { @@ -265,6 +265,14 @@ async fn proc_run( tokio::spawn(async move { tokio::select! { status = child.wait() => { + if let Err(x) = stderr_task.await { + error!("Join on stderr task failed: {}", x); + } + + if let Err(x) = stdout_task.await { + error!("Join on stdout task failed: {}", x); + } + match status { Ok(status) => { let success = status.success(); @@ -292,6 +300,14 @@ async fn proc_run( error!("Unable to kill process {}: {}", id, x); } + if let Err(x) = stderr_task.await { + error!("Join on stderr task failed: {}", x); + } + + if let Err(x) = stdout_task.await { + error!("Join on stdout task failed: {}", x); + } + if let Err(_) = tx .send(Response::from(ResponsePayload::ProcDone { id, success: false, code: None })) .await diff --git a/src/subcommand/send.rs b/src/subcommand/send.rs index e7b6c0a..ff89be4 100644 --- a/src/subcommand/send.rs +++ b/src/subcommand/send.rs @@ -23,7 +23,7 @@ pub fn run(cmd: SendSubcommand, opt: CommonOpt) -> Result<(), Error> { async fn run_async(cmd: SendSubcommand, _opt: CommonOpt) -> Result<(), Error> { let session = Session::load().await?; - let client = Client::connect(session).await?; + let mut client = Client::connect(session).await?; let req = Request::from(cmd.operation); @@ -43,6 +43,12 @@ async fn run_async(cmd: SendSubcommand, _opt: CommonOpt) -> Result<(), Error> { if is_proc_req && not_detach { let mut stream = client.to_response_stream(); while let Some(res) = stream.next().await { + let res = res.map_err(|_| { + io::Error::new( + io::ErrorKind::BrokenPipe, + "Response stream no longer available", + ) + })?; let done = res.payload.is_proc_done(); print_response(cmd.format, res)?; diff --git a/src/utils.rs b/src/utils.rs index 2f617c3..0386c8f 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -42,7 +42,7 @@ pub struct Session { impl Session { /// Returns a string representing the secret key as hex - pub fn to_hex_key(&self) -> String { + pub fn to_unprotected_hex_key(&self) -> String { hex::encode(self.key.unprotected_as_bytes()) } @@ -75,7 +75,7 @@ impl Session { /// Saves a session to disk pub async fn save(&self) -> io::Result<()> { - let key_hex_str = self.to_hex_key(); + let key_hex_str = self.to_unprotected_hex_key(); // Ensure our cache directory exists let cache_dir = PROJECT_DIRS.cache_dir();