diff --git a/distant-plugin/src/client.rs b/distant-plugin/src/client.rs index 8907c83..0e133d9 100644 --- a/distant-plugin/src/client.rs +++ b/distant-plugin/src/client.rs @@ -8,27 +8,29 @@ use tokio::sync::mpsc; use crate::api::{ Api, Ctx, FileSystemApi, ProcessApi, SearchApi, SystemInfoApi, VersionApi, WatchApi, }; +use crate::common::Stream; /// Full API for a distant-compatible client. #[async_trait] pub trait Client { /// Sends a request and returns a stream of responses, failing if unable to send a request or /// if the session's receiving line to the remote server has already been severed. - async fn mail(&mut self, request: Request) -> io::Result>; + async fn send(&mut self, request: Request) -> io::Result>>; - /// Sends a request and waits for a response, failing if unable to send a request or if + /// Sends a request and waits for a single response, failing if unable to send a request or if /// the session's receiving line to the remote server has already been severed. - async fn send(&mut self, request: Request) -> io::Result { - let mut rx = self.mail(request).await?; - rx.recv() + async fn send_once(&mut self, request: Request) -> io::Result { + self.send(request) + .await? + .next() .await .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Channel has closed")) } - /// Sends a request without waiting for a response; this method is able to be used even + /// Sends a request without waiting for any response; this method is able to be used even /// if the session's receiving line to the remote server has been severed. async fn fire(&mut self, request: Request) -> io::Result<()> { - let _ = self.send(request).await?; + let _ = self.send_once(request).await?; Ok(()) } } @@ -52,7 +54,7 @@ impl ClientBridge { #[async_trait] impl Client for ClientBridge { - async fn mail(&mut self, request: Request) -> io::Result> { + async fn send(&mut self, request: Request) -> io::Result>> { #[derive(Clone, Debug)] struct __Ctx(u32, mpsc::UnboundedSender); @@ -87,7 +89,7 @@ impl Client for ClientBridge { } }); - Ok(rx) + Ok(Box::new(rx)) } } diff --git a/distant-plugin/src/common.rs b/distant-plugin/src/common.rs index 5f8f628..1b724ae 100644 --- a/distant-plugin/src/common.rs +++ b/distant-plugin/src/common.rs @@ -1,6 +1,8 @@ mod destination; mod map; +mod stream; mod utils; pub use destination::{Destination, Host, HostParseError}; pub use map::{Map, MapParseError}; +pub use stream::Stream; diff --git a/distant-plugin/src/common/stream.rs b/distant-plugin/src/common/stream.rs new file mode 100644 index 0000000..db24659 --- /dev/null +++ b/distant-plugin/src/common/stream.rs @@ -0,0 +1,30 @@ +use async_trait::async_trait; +use tokio::sync::mpsc; + +/// Interface to an asynchronous stream of items. +#[async_trait] +pub trait Stream: Send { + type Item: Send; + + /// Retrieves the next item from the stream, returning `None` if no more items are available + /// from the stream. + async fn next(&mut self) -> Option; +} + +#[async_trait] +impl Stream for mpsc::UnboundedReceiver { + type Item = T; + + async fn next(&mut self) -> Option { + self.recv().await + } +} + +#[async_trait] +impl Stream for mpsc::Receiver { + type Item = T; + + async fn next(&mut self) -> Option { + self.recv().await + } +}