Refactor client interface to return a boxed stream

feat/RusshSupport
Chip Senkbeil 7 months ago
parent 2098edb7ca
commit bdb07a5ac8
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -8,27 +8,29 @@ use tokio::sync::mpsc;
use crate::api::{
Api, Ctx, FileSystemApi, ProcessApi, SearchApi, SystemInfoApi, VersionApi, WatchApi,
};
use crate::common::Stream;
/// Full API for a distant-compatible client.
#[async_trait]
pub trait Client {
/// Sends a request and returns a stream of responses, failing if unable to send a request or
/// if the session's receiving line to the remote server has already been severed.
async fn mail(&mut self, request: Request) -> io::Result<mpsc::UnboundedReceiver<Response>>;
async fn send(&mut self, request: Request) -> io::Result<Box<dyn Stream<Item = Response>>>;
/// Sends a request and waits for a response, failing if unable to send a request or if
/// Sends a request and waits for a single response, failing if unable to send a request or if
/// the session's receiving line to the remote server has already been severed.
async fn send(&mut self, request: Request) -> io::Result<Response> {
let mut rx = self.mail(request).await?;
rx.recv()
async fn send_once(&mut self, request: Request) -> io::Result<Response> {
self.send(request)
.await?
.next()
.await
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Channel has closed"))
}
/// Sends a request without waiting for a response; this method is able to be used even
/// Sends a request without waiting for any response; this method is able to be used even
/// if the session's receiving line to the remote server has been severed.
async fn fire(&mut self, request: Request) -> io::Result<()> {
let _ = self.send(request).await?;
let _ = self.send_once(request).await?;
Ok(())
}
}
@ -52,7 +54,7 @@ impl<T: Api> ClientBridge<T> {
#[async_trait]
impl<T: Api + 'static> Client for ClientBridge<T> {
async fn mail(&mut self, request: Request) -> io::Result<mpsc::UnboundedReceiver<Response>> {
async fn send(&mut self, request: Request) -> io::Result<Box<dyn Stream<Item = Response>>> {
#[derive(Clone, Debug)]
struct __Ctx(u32, mpsc::UnboundedSender<Response>);
@ -87,7 +89,7 @@ impl<T: Api + 'static> Client for ClientBridge<T> {
}
});
Ok(rx)
Ok(Box::new(rx))
}
}

@ -1,6 +1,8 @@
mod destination;
mod map;
mod stream;
mod utils;
pub use destination::{Destination, Host, HostParseError};
pub use map::{Map, MapParseError};
pub use stream::Stream;

@ -0,0 +1,30 @@
use async_trait::async_trait;
use tokio::sync::mpsc;
/// Interface to an asynchronous stream of items.
#[async_trait]
pub trait Stream: Send {
type Item: Send;
/// Retrieves the next item from the stream, returning `None` if no more items are available
/// from the stream.
async fn next(&mut self) -> Option<Self::Item>;
}
#[async_trait]
impl<T: Send> Stream for mpsc::UnboundedReceiver<T> {
type Item = T;
async fn next(&mut self) -> Option<Self::Item> {
self.recv().await
}
}
#[async_trait]
impl<T: Send> Stream for mpsc::Receiver<T> {
type Item = T;
async fn next(&mut self) -> Option<Self::Item> {
self.recv().await
}
}
Loading…
Cancel
Save