diff --git a/src/net/transport/mod.rs b/src/net/transport/mod.rs index 2b34c84..ec03297 100644 --- a/src/net/transport/mod.rs +++ b/src/net/transport/mod.rs @@ -77,6 +77,7 @@ impl Transport { } /// Splits transport into read and write halves + #[allow(dead_code)] pub fn split(self) -> (TransportReadHalf, TransportWriteHalf) { let key = self.key; let parts = self.inner.into_parts(); @@ -86,6 +87,13 @@ impl Transport { // into the new framed instances. This means we are dropping our old buffer // data (I think). This shouldn't be a problem since we are splitting // immediately, but it would be nice to cover this properly one day + // + // From initial testing, this may actually be a problem where part of a frame + // arrives so quickly that we lose the first message. So recommendation for + // now is to create the frame halves separately first so we have no + // chance of building a partial frame + // + // See https://github.com/tokio-rs/tokio/issues/4000 let t_read = TransportReadHalf { inner: FramedRead::new(read_half, parts.codec), key: Arc::clone(&key), @@ -106,6 +114,14 @@ pub struct TransportWriteHalf { } impl TransportWriteHalf { + /// Creates a new transport write half directly from a TCP write half + pub fn new(write_half: tcp::OwnedWriteHalf, key: Arc) -> Self { + Self { + inner: FramedWrite::new(write_half, DistantCodec), + key, + } + } + /// Sends some data across the wire pub async fn send(&mut self, data: T) -> Result<(), TransportError> { // Serialize, encrypt, and then (TODO) sign @@ -127,6 +143,14 @@ pub struct TransportReadHalf { } impl TransportReadHalf { + /// Creates a new transport read half directly from a TCP read half + pub fn new(read_half: tcp::OwnedReadHalf, key: Arc) -> Self { + Self { + inner: FramedRead::new(read_half, DistantCodec), + key, + } + } + /// Receives some data from out on the wire, waiting until it's available, /// returning none if the transport is now closed pub async fn receive(&mut self) -> Result, TransportError> { diff --git a/src/subcommand/listen/mod.rs b/src/subcommand/listen/mod.rs index a0b2c78..719c045 100644 --- a/src/subcommand/listen/mod.rs +++ b/src/subcommand/listen/mod.rs @@ -1,6 +1,6 @@ use crate::{ data::{Request, Response}, - net::{Transport, TransportReadHalf, TransportWriteHalf}, + net::{TransportReadHalf, TransportWriteHalf}, opt::{CommonOpt, ConvertToIpAddrError, ListenSubcommand}, }; use derive_more::{Display, Error, From}; @@ -132,7 +132,9 @@ async fn run_async(cmd: ListenSubcommand, _opt: CommonOpt, is_forked: bool) -> R // Build a transport around the client, splitting into read and write halves so we can // handle input and output concurrently - let (t_read, t_write) = Transport::new(client, Arc::clone(&key)).split(); + let (r, w) = client.into_split(); + let t_read = TransportReadHalf::new(r, Arc::clone(&key)); + let t_write = TransportWriteHalf::new(w, Arc::clone(&key)); let (tx, rx) = mpsc::channel(cmd.max_msg_capacity as usize); // Spawn a new task that loops to handle requests from the client