pull/146/head
Chip Senkbeil 2 years ago
parent 32df51336e
commit 3e5741050f
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

File diff suppressed because it is too large Load Diff

@ -1,5 +1,7 @@
use crate::common::{Request, Response};
use std::{convert, io, sync::Weak};
use crate::common::{Request, Response, UntypedRequest, UntypedResponse};
use log::*;
use serde::{de::DeserializeOwned, Serialize};
use std::{convert, fmt, io, marker::PhantomData, sync::Weak};
use tokio::{sync::mpsc, time::Duration};
mod mailbox;
@ -14,48 +16,198 @@ const CHANNEL_MAILBOX_CAPACITY: usize = 10000;
///
/// [`Client`]: crate::client::Client
pub struct Channel<T, U> {
/// Used to send requests to a server
pub(crate) tx: mpsc::Sender<Request<T>>,
/// Collection of mailboxes for receiving responses to requests
pub(crate) post_office: Weak<PostOffice<Response<U>>>,
inner: UntypedChannel,
_request: PhantomData<T>,
_response: PhantomData<U>,
}
// NOTE: Implemented manually to avoid needing clone to be defined on generic types
impl<T, U> Clone for Channel<T, U> {
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
post_office: Weak::clone(&self.post_office),
inner: self.inner.clone(),
_request: self._request,
_response: self._response,
}
}
}
impl<T, U> fmt::Debug for Channel<T, U> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Channel")
.field("tx", &self.inner.tx)
.field("post_office", &self.inner.post_office)
.field("_request", &self._request)
.field("_response", &self._response)
.finish()
}
}
impl<T, U> Channel<T, U>
where
T: Send + Sync,
U: Send + Sync + 'static,
T: Send + Sync + Serialize + 'static,
U: Send + Sync + DeserializeOwned + 'static,
{
/// Returns true if no more requests can be transferred
pub fn is_closed(&self) -> bool {
self.tx.is_closed()
self.inner.is_closed()
}
/// Consumes this channel, returning an untyped variant
pub fn into_untyped_channel(self) -> UntypedChannel {
self.inner
}
/// Sends a request and returns a mailbox that can receive one or more responses, failing if
/// unable to send a request or if the session's receiving line to the remote server has
/// already been severed
pub async fn mail(&mut self, req: impl Into<Request<T>>) -> io::Result<Mailbox<Response<U>>> {
let req = req.into();
Ok(self
.inner
.mail(req.into().to_untyped_request()?)
.await?
.map_opt(|res| match res.to_typed_response() {
Ok(res) => Some(res),
Err(x) => {
if log::log_enabled!(Level::Trace) {
trace!(
"Invalid response payload: {}",
String::from_utf8_lossy(&res.payload)
);
}
error!(
"Unable to parse response payload into {}: {x}",
std::any::type_name::<U>()
);
None
}
}))
}
/// Sends a request and returns a mailbox, timing out after duration has passed
pub async fn mail_timeout(
&mut self,
req: impl Into<Request<T>>,
duration: impl Into<Option<Duration>>,
) -> io::Result<Mailbox<Response<U>>> {
match duration.into() {
Some(duration) => tokio::time::timeout(duration, self.mail(req))
.await
.map_err(|x| io::Error::new(io::ErrorKind::TimedOut, x))
.and_then(convert::identity),
None => self.mail(req).await,
}
}
/// Sends a request and waits for a response, failing if unable to send a request or if
/// the session's receiving line to the remote server has already been severed
pub async fn send(&mut self, req: impl Into<Request<T>>) -> io::Result<Response<U>> {
// Send mail and get back a mailbox
let mut mailbox = self.mail(req).await?;
// Wait for first response, and then drop the mailbox
mailbox
.next()
.await
.ok_or_else(|| io::Error::from(io::ErrorKind::ConnectionAborted))
}
/// Sends a request and waits for a response, timing out after duration has passed
pub async fn send_timeout(
&mut self,
req: impl Into<Request<T>>,
duration: impl Into<Option<Duration>>,
) -> io::Result<Response<U>> {
match duration.into() {
Some(duration) => tokio::time::timeout(duration, self.send(req))
.await
.map_err(|x| io::Error::new(io::ErrorKind::TimedOut, x))
.and_then(convert::identity),
None => self.send(req).await,
}
}
/// Sends a request without waiting for a response; this method is able to be used even
/// if the session's receiving line to the remote server has been severed
pub async fn fire(&mut self, req: impl Into<Request<T>>) -> io::Result<()> {
self.inner.fire(req.into().to_untyped_request()?).await
}
/// Sends a request without waiting for a response, timing out after duration has passed
pub async fn fire_timeout(
&mut self,
req: impl Into<Request<T>>,
duration: impl Into<Option<Duration>>,
) -> io::Result<()> {
match duration.into() {
Some(duration) => tokio::time::timeout(duration, self.fire(req))
.await
.map_err(|x| io::Error::new(io::ErrorKind::TimedOut, x))
.and_then(convert::identity),
None => self.fire(req).await,
}
}
}
/// Represents a sender of requests tied to a session, holding onto a weak reference of
/// mailboxes to relay responses, meaning that once the [`Client`] is closed or dropped,
/// any sent request will no longer be able to receive responses.
///
/// In contrast to [`Channel`], this implementation is untyped, meaning that the payload of
/// requests and responses are not validated.
///
/// [`Client`]: crate::client::Client
#[derive(Debug)]
pub struct UntypedChannel {
/// Used to send requests to a server
pub(crate) tx: mpsc::Sender<UntypedRequest<'static>>,
/// Collection of mailboxes for receiving responses to requests
pub(crate) post_office: Weak<PostOffice<UntypedResponse<'static>>>,
}
// NOTE: Implemented manually to avoid needing clone to be defined on generic types
impl Clone for UntypedChannel {
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
post_office: Weak::clone(&self.post_office),
}
}
}
impl UntypedChannel {
/// Returns true if no more requests can be transferred
pub fn is_closed(&self) -> bool {
self.tx.is_closed()
}
/// Consumes this channel, returning a typed variant
pub fn into_typed_channel<T, U>(self) -> Channel<T, U> {
Channel {
inner: self,
_request: PhantomData,
_response: PhantomData,
}
}
/// Sends a request and returns a mailbox that can receive one or more responses, failing if
/// unable to send a request or if the session's receiving line to the remote server has
/// already been severed
pub async fn mail(
&mut self,
req: UntypedRequest<'_>,
) -> io::Result<Mailbox<UntypedResponse<'static>>> {
// First, create a mailbox using the request's id
let mailbox = Weak::upgrade(&self.post_office)
.ok_or_else(|| {
io::Error::new(
io::ErrorKind::NotConnected,
"Session's post office is no longer available",
"Channel's post office is no longer available",
)
})?
.make_mailbox(req.id.clone(), CHANNEL_MAILBOX_CAPACITY)
.make_mailbox(req.id.clone().into_owned(), CHANNEL_MAILBOX_CAPACITY)
.await;
// Second, send the request
@ -68,9 +220,9 @@ where
/// Sends a request and returns a mailbox, timing out after duration has passed
pub async fn mail_timeout(
&mut self,
req: impl Into<Request<T>>,
req: UntypedRequest<'_>,
duration: impl Into<Option<Duration>>,
) -> io::Result<Mailbox<Response<U>>> {
) -> io::Result<Mailbox<UntypedResponse<'static>>> {
match duration.into() {
Some(duration) => tokio::time::timeout(duration, self.mail(req))
.await
@ -82,7 +234,7 @@ where
/// Sends a request and waits for a response, failing if unable to send a request or if
/// the session's receiving line to the remote server has already been severed
pub async fn send(&mut self, req: impl Into<Request<T>>) -> io::Result<Response<U>> {
pub async fn send(&mut self, req: UntypedRequest<'_>) -> io::Result<UntypedResponse<'static>> {
// Send mail and get back a mailbox
let mut mailbox = self.mail(req).await?;
@ -96,9 +248,9 @@ where
/// Sends a request and waits for a response, timing out after duration has passed
pub async fn send_timeout(
&mut self,
req: impl Into<Request<T>>,
req: UntypedRequest<'_>,
duration: impl Into<Option<Duration>>,
) -> io::Result<Response<U>> {
) -> io::Result<UntypedResponse<'static>> {
match duration.into() {
Some(duration) => tokio::time::timeout(duration, self.send(req))
.await
@ -110,9 +262,9 @@ where
/// Sends a request without waiting for a response; this method is able to be used even
/// if the session's receiving line to the remote server has been severed
pub async fn fire(&mut self, req: impl Into<Request<T>>) -> io::Result<()> {
pub async fn fire(&mut self, req: UntypedRequest<'_>) -> io::Result<()> {
self.tx
.send(req.into())
.send(req.into_owned())
.await
.map_err(|x| io::Error::new(io::ErrorKind::BrokenPipe, x.to_string()))
}
@ -120,7 +272,7 @@ where
/// Sends a request without waiting for a response, timing out after duration has passed
pub async fn fire_timeout(
&mut self,
req: impl Into<Request<T>>,
req: UntypedRequest<'_>,
duration: impl Into<Option<Duration>>,
) -> io::Result<()> {
match duration.into() {
@ -136,103 +288,227 @@ where
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::time::Duration;
use test_log::test;
type TestChannel = Channel<u8, u8>;
type Setup = (
TestChannel,
mpsc::Receiver<Request<u8>>,
Arc<PostOffice<Response<u8>>>,
);
mod typed {
use super::*;
use std::sync::Arc;
use std::time::Duration;
use test_log::test;
type TestChannel = Channel<u8, u8>;
type Setup = (
TestChannel,
mpsc::Receiver<UntypedRequest<'static>>,
Arc<PostOffice<UntypedResponse<'static>>>,
);
fn setup(buffer: usize) -> Setup {
let post_office = Arc::new(PostOffice::default());
let (tx, rx) = mpsc::channel(buffer);
let channel = {
let post_office = Arc::downgrade(&post_office);
TestChannel { tx, post_office }
};
fn setup(buffer: usize) -> Setup {
let post_office = Arc::new(PostOffice::default());
let (tx, rx) = mpsc::channel(buffer);
let channel = {
let post_office = Arc::downgrade(&post_office);
UntypedChannel { tx, post_office }
};
(channel, rx, post_office)
}
(channel.into_typed_channel(), rx, post_office)
}
#[test(tokio::test)]
async fn mail_should_return_mailbox_that_receives_responses_until_post_office_drops_it() {
let (mut channel, _server, post_office) = setup(100);
#[test(tokio::test)]
async fn mail_should_return_mailbox_that_receives_responses_until_post_office_drops_it() {
let (mut channel, _server, post_office) = setup(100);
let req = Request::new(0);
let res = Response::new(req.id.clone(), 1);
let mut mailbox = channel.mail(req).await.unwrap();
// Send and receive first response
assert!(
post_office
.deliver_untyped_response(res.to_untyped_response().unwrap().into_owned())
.await,
"Failed to deliver: {res:?}"
);
assert_eq!(mailbox.next().await, Some(res.clone()));
// Send and receive second response
assert!(
post_office
.deliver_untyped_response(res.to_untyped_response().unwrap().into_owned())
.await,
"Failed to deliver: {res:?}"
);
assert_eq!(mailbox.next().await, Some(res.clone()));
// Trigger the mailbox to wait BEFORE closing our mailbox to ensure that
// we don't get stuck if the mailbox was already waiting
let next_task = tokio::spawn(async move { mailbox.next().await });
tokio::task::yield_now().await;
// Close our specific mailbox
post_office.cancel(&res.origin_id).await;
match next_task.await {
Ok(None) => {}
x => panic!("Unexpected response: {:?}", x),
}
}
let req = Request::new(0);
let res = Response::new(req.id.clone(), 1);
#[test(tokio::test)]
async fn send_should_wait_until_response_received() {
let (mut channel, _server, post_office) = setup(100);
let req = Request::new(0);
let res = Response::new(req.id.clone(), 1);
let (actual, _) = tokio::join!(
channel.send(req),
post_office
.deliver_untyped_response(res.to_untyped_response().unwrap().into_owned())
);
match actual {
Ok(actual) => assert_eq!(actual, res),
x => panic!("Unexpected response: {:?}", x),
}
}
let mut mailbox = channel.mail(req).await.unwrap();
#[test(tokio::test)]
async fn send_timeout_should_fail_if_response_not_received_in_time() {
let (mut channel, mut server, _post_office) = setup(100);
// Send and receive first response
assert!(
post_office.deliver_response(res.clone()).await,
"Failed to deliver: {res:?}"
);
assert_eq!(mailbox.next().await, Some(res.clone()));
let req = Request::new(0);
match channel.send_timeout(req, Duration::from_millis(30)).await {
Err(x) => assert_eq!(x.kind(), io::ErrorKind::TimedOut),
x => panic!("Unexpected response: {:?}", x),
}
// Send and receive second response
assert!(
post_office.deliver_response(res.clone()).await,
"Failed to deliver: {res:?}"
);
assert_eq!(mailbox.next().await, Some(res.clone()));
let _frame = server.recv().await.unwrap();
}
// Trigger the mailbox to wait BEFORE closing our mailbox to ensure that
// we don't get stuck if the mailbox was already waiting
let next_task = tokio::spawn(async move { mailbox.next().await });
tokio::task::yield_now().await;
#[test(tokio::test)]
async fn fire_should_send_request_and_not_wait_for_response() {
let (mut channel, mut server, _post_office) = setup(100);
// Close our specific mailbox
post_office.cancel(&res.origin_id).await;
let req = Request::new(0);
match channel.fire(req).await {
Ok(_) => {}
x => panic!("Unexpected response: {:?}", x),
}
match next_task.await {
Ok(None) => {}
x => panic!("Unexpected response: {:?}", x),
let _frame = server.recv().await.unwrap();
}
}
#[test(tokio::test)]
async fn send_should_wait_until_response_received() {
let (mut channel, _server, post_office) = setup(100);
mod untyped {
use super::*;
use std::sync::Arc;
use std::time::Duration;
use test_log::test;
type TestChannel = UntypedChannel;
type Setup = (
TestChannel,
mpsc::Receiver<UntypedRequest<'static>>,
Arc<PostOffice<UntypedResponse<'static>>>,
);
let req = Request::new(0);
let res = Response::new(req.id.clone(), 1);
fn setup(buffer: usize) -> Setup {
let post_office = Arc::new(PostOffice::default());
let (tx, rx) = mpsc::channel(buffer);
let channel = {
let post_office = Arc::downgrade(&post_office);
TestChannel { tx, post_office }
};
let (actual, _) =
tokio::join!(channel.send(req), post_office.deliver_response(res.clone()));
match actual {
Ok(actual) => assert_eq!(actual, res),
x => panic!("Unexpected response: {:?}", x),
(channel, rx, post_office)
}
}
#[test(tokio::test)]
async fn send_timeout_should_fail_if_response_not_received_in_time() {
let (mut channel, mut server, _post_office) = setup(100);
#[test(tokio::test)]
async fn mail_should_return_mailbox_that_receives_responses_until_post_office_drops_it() {
let (mut channel, _server, post_office) = setup(100);
let req = Request::new(0).to_untyped_request().unwrap().into_owned();
let res = Response::new(req.id.clone().into_owned(), 1)
.to_untyped_response()
.unwrap()
.into_owned();
let mut mailbox = channel.mail(req).await.unwrap();
// Send and receive first response
assert!(
post_office.deliver_untyped_response(res.clone()).await,
"Failed to deliver: {res:?}"
);
assert_eq!(mailbox.next().await, Some(res.clone()));
// Send and receive second response
assert!(
post_office.deliver_untyped_response(res.clone()).await,
"Failed to deliver: {res:?}"
);
assert_eq!(mailbox.next().await, Some(res.clone()));
// Trigger the mailbox to wait BEFORE closing our mailbox to ensure that
// we don't get stuck if the mailbox was already waiting
let next_task = tokio::spawn(async move { mailbox.next().await });
tokio::task::yield_now().await;
// Close our specific mailbox
post_office
.cancel(&res.origin_id.clone().into_owned())
.await;
match next_task.await {
Ok(None) => {}
x => panic!("Unexpected response: {:?}", x),
}
}
let req = Request::new(0);
match channel.send_timeout(req, Duration::from_millis(30)).await {
Err(x) => assert_eq!(x.kind(), io::ErrorKind::TimedOut),
x => panic!("Unexpected response: {:?}", x),
#[test(tokio::test)]
async fn send_should_wait_until_response_received() {
let (mut channel, _server, post_office) = setup(100);
let req = Request::new(0).to_untyped_request().unwrap().into_owned();
let res = Response::new(req.id.clone().into_owned(), 1)
.to_untyped_response()
.unwrap()
.into_owned();
let (actual, _) = tokio::join!(
channel.send(req),
post_office.deliver_untyped_response(res.clone())
);
match actual {
Ok(actual) => assert_eq!(actual, res),
x => panic!("Unexpected response: {:?}", x),
}
}
let _frame = server.recv().await.unwrap();
}
#[test(tokio::test)]
async fn send_timeout_should_fail_if_response_not_received_in_time() {
let (mut channel, mut server, _post_office) = setup(100);
#[test(tokio::test)]
async fn fire_should_send_request_and_not_wait_for_response() {
let (mut channel, mut server, _post_office) = setup(100);
let req = Request::new(0).to_untyped_request().unwrap().into_owned();
match channel.send_timeout(req, Duration::from_millis(30)).await {
Err(x) => assert_eq!(x.kind(), io::ErrorKind::TimedOut),
x => panic!("Unexpected response: {:?}", x),
}
let req = Request::new(0);
match channel.fire(req).await {
Ok(_) => {}
x => panic!("Unexpected response: {:?}", x),
let _frame = server.recv().await.unwrap();
}
let _frame = server.recv().await.unwrap();
#[test(tokio::test)]
async fn fire_should_send_request_and_not_wait_for_response() {
let (mut channel, mut server, _post_office) = setup(100);
let req = Request::new(0).to_untyped_request().unwrap().into_owned();
match channel.fire(req).await {
Ok(_) => {}
x => panic!("Unexpected response: {:?}", x),
}
let _frame = server.recv().await.unwrap();
}
}
}

@ -1,4 +1,5 @@
use crate::common::{Id, Response};
use crate::common::{Id, Response, UntypedResponse};
use async_trait::async_trait;
use std::{
collections::HashMap,
sync::{Arc, Weak},
@ -10,7 +11,7 @@ use tokio::{
time,
};
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct PostOffice<T> {
mailboxes: Arc<Mutex<HashMap<Id, mpsc::Sender<T>>>>,
}
@ -60,7 +61,10 @@ where
let (tx, rx) = mpsc::channel(buffer);
self.mailboxes.lock().await.insert(id.clone(), tx);
Mailbox { id, rx }
Mailbox {
id,
rx: Box::new(rx),
}
}
/// Delivers some value to appropriate mailbox, returning false if no mailbox is found
@ -110,13 +114,87 @@ where
}
}
impl PostOffice<UntypedResponse<'static>> {
/// Delivers some response to appropriate mailbox, returning false if no mailbox is found
/// for the response's origin or if the mailbox is no longer receiving values
pub async fn deliver_untyped_response(&self, res: UntypedResponse<'static>) -> bool {
self.deliver(&res.origin_id.clone().into_owned(), res).await
}
}
#[async_trait]
trait MailboxReceiver: Send + Sync {
type Output;
async fn recv(&mut self) -> Option<Self::Output>;
fn close(&mut self);
}
#[async_trait]
impl<T: Send> MailboxReceiver for mpsc::Receiver<T> {
type Output = T;
async fn recv(&mut self) -> Option<Self::Output> {
mpsc::Receiver::recv(self).await
}
fn close(&mut self) {
mpsc::Receiver::close(self)
}
}
struct MappedMailboxReceiver<T, U> {
rx: Box<dyn MailboxReceiver<Output = T>>,
f: Box<dyn Fn(T) -> U + Send + Sync>,
}
#[async_trait]
impl<T: Send, U: Send> MailboxReceiver for MappedMailboxReceiver<T, U> {
type Output = U;
async fn recv(&mut self) -> Option<Self::Output> {
let value = self.rx.recv().await?;
Some((self.f)(value))
}
fn close(&mut self) {
self.rx.close()
}
}
struct MappedOptMailboxReceiver<T, U> {
rx: Box<dyn MailboxReceiver<Output = T>>,
f: Box<dyn Fn(T) -> Option<U> + Send + Sync>,
}
#[async_trait]
impl<T: Send, U: Send> MailboxReceiver for MappedOptMailboxReceiver<T, U> {
type Output = U;
async fn recv(&mut self) -> Option<Self::Output> {
// Continually receive a new value and convert it to Option<U>
// until Option<U> == Some(U) or we receive None from our inner receiver
loop {
let value = self.rx.recv().await?;
if let Some(x) = (self.f)(value) {
return Some(x);
}
}
}
fn close(&mut self) {
self.rx.close()
}
}
/// Represents a destination for responses
pub struct Mailbox<T> {
/// Represents id associated with the mailbox
id: Id,
/// Underlying mailbox storage
rx: mpsc::Receiver<T>,
rx: Box<dyn MailboxReceiver<Output = T>>,
}
impl<T> Mailbox<T> {
@ -144,3 +222,31 @@ impl<T> Mailbox<T> {
self.rx.close()
}
}
impl<T: Send + 'static> Mailbox<T> {
/// Maps the results of each mailbox value into a new type `U`
pub fn map<U: Send + 'static>(self, f: impl Fn(T) -> U + Send + Sync + 'static) -> Mailbox<U> {
Mailbox {
id: self.id,
rx: Box::new(MappedMailboxReceiver {
rx: self.rx,
f: Box::new(f),
}),
}
}
/// Maps the results of each mailbox value into a new type `U` by returning an `Option<U>`
/// where the option is `None` in the case that `T` cannot be converted into `U`
pub fn map_opt<U: Send + 'static>(
self,
f: impl Fn(T) -> Option<U> + Send + Sync + 'static,
) -> Mailbox<U> {
Mailbox {
id: self.id,
rx: Box::new(MappedOptMailboxReceiver {
rx: self.rx,
f: Box::new(f),
}),
}
}
}

@ -112,7 +112,7 @@ impl ManagerServer {
}
.to_lowercase();
let transport = {
let client = {
let handler = self.config.connect_handlers.get(&scheme).ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
@ -124,7 +124,7 @@ impl ManagerServer {
.await?
};
let connection = ManagerConnection::new(destination, options, transport);
let connection = ManagerConnection::new(destination, options, client);
let id = connection.id;
self.connections.write().await.insert(id, connection);
Ok(id)

@ -1,4 +1,5 @@
use crate::{
client::UntypedClient,
common::{
ConnectionId, Destination, FramedTransport, Interest, Map, Transport, UntypedRequest,
UntypedResponse,
@ -60,18 +61,14 @@ impl ManagerChannel {
}
impl ManagerConnection {
pub fn new<T: Transport + 'static>(
destination: Destination,
options: Map,
transport: FramedTransport<T>,
) -> Self {
pub fn new(destination: Destination, options: Map, client: UntypedClient) -> Self {
let connection_id = rand::random();
let (tx, rx) = mpsc::unbounded_channel();
let (outgoing_tx, outgoing_rx) = mpsc::unbounded_channel();
let transport_task = tokio::spawn(transport_task(
connection_id,
transport,
client,
outgoing_rx,
tx.clone(),
Duration::from_millis(50),
@ -141,18 +138,15 @@ enum Action {
/// * `transport` - the fully-authenticated transport.
/// * `rx` - used to receive outgoing data to send through the connection.
/// * `tx` - used to send new [`Action`]s to process.
async fn transport_task<T: Transport>(
async fn transport_task(
id: ConnectionId,
mut transport: FramedTransport<T>,
mut client: UntypedClient,
mut rx: mpsc::UnboundedReceiver<Vec<u8>>,
tx: mpsc::UnboundedSender<Action>,
sleep_duration: Duration,
) {
loop {
let ready = match transport
.ready(Interest::READABLE | Interest::WRITABLE)
.await
{
let ready = match client.ready(Interest::READABLE | Interest::WRITABLE).await {
Ok(ready) => ready,
Err(x) => {
error!("[Conn {id}] Querying ready status failed: {x}");
@ -166,7 +160,7 @@ async fn transport_task<T: Transport>(
// If transport is readable, attempt to read a frame and forward it to our action task
if ready.is_readable() {
match transport.try_read_frame() {
match client.try_read_frame() {
Ok(Some(frame)) => {
if let Err(x) = tx.send(Action::Read {
data: frame.into_item().into_owned(),
@ -188,7 +182,7 @@ async fn transport_task<T: Transport>(
// If transport is writable, check if we have something to write
if ready.is_writable() {
if let Ok(data) = rx.try_recv() {
match transport.try_write_frame(data) {
match client.try_write_frame(data) {
Ok(()) => (),
Err(x) if x.kind() == io::ErrorKind::WouldBlock => write_blocked = true,
Err(x) => error!("[Conn {id}] Send failed: {x}"),
@ -200,7 +194,7 @@ async fn transport_task<T: Transport>(
// 1. When flush did not write any bytes, which can happen when the buffer
// is empty
// 2. When the call to write bytes blocks
match transport.try_flush() {
match client.try_flush() {
Ok(0) => write_blocked = true,
Ok(_) => (),
Err(x) if x.kind() == io::ErrorKind::WouldBlock => write_blocked = true,

@ -1,4 +1,5 @@
use crate::common::{authentication::Authenticator, Destination, FramedTransport, Map, Transport};
use crate::client::UntypedClient;
use crate::common::{authentication::Authenticator, Destination, Map};
use async_trait::async_trait;
use std::{future::Future, io};
@ -86,7 +87,7 @@ macro_rules! boxed_launch_handler {
/// * `options` is provided to include extra information needed to establish the connection.
/// * `authenticator` is provided to support a challenge-based authentication while connecting.
///
/// Returns a [`FramedTransport`] representing the connection.
/// Returns an [`UntypedClient`] representing the connection.
#[async_trait]
pub trait ConnectHandler: Send + Sync {
async fn connect(
@ -94,21 +95,21 @@ pub trait ConnectHandler: Send + Sync {
destination: &Destination,
options: &Map,
authenticator: &mut dyn Authenticator,
) -> io::Result<FramedTransport<Box<dyn Transport>>>;
) -> io::Result<UntypedClient>;
}
#[async_trait]
impl<F, R> ConnectHandler for F
where
F: Fn(&Destination, &Map, &mut dyn Authenticator) -> R + Send + Sync + 'static,
R: Future<Output = io::Result<FramedTransport<Box<dyn Transport>>>> + Send + 'static,
R: Future<Output = io::Result<UntypedClient>> + Send + 'static,
{
async fn connect(
&self,
destination: &Destination,
options: &Map,
authenticator: &mut dyn Authenticator,
) -> io::Result<FramedTransport<Box<dyn Transport>>> {
) -> io::Result<UntypedClient> {
self(destination, options, authenticator).await
}
}

Loading…
Cancel
Save