Rewrote manager connection to use FramedTransport

pull/146/head
Chip Senkbeil 2 years ago
parent 642b57fdb0
commit 19ec95e978
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -152,10 +152,18 @@ impl PostOffice<UntypedResponse<'static>> {
}
}
/// Error encountered when invoking [`try_recv`] for [`MailboxReceiver`].
pub enum MailboxTryNextError {
Empty,
Closed,
}
#[async_trait]
trait MailboxReceiver: Send + Sync {
type Output;
fn try_recv(&mut self) -> Result<Self::Output, MailboxTryNextError>;
async fn recv(&mut self) -> Option<Self::Output>;
fn close(&mut self);
@ -165,6 +173,14 @@ trait MailboxReceiver: Send + Sync {
impl<T: Send> MailboxReceiver for mpsc::Receiver<T> {
type Output = T;
fn try_recv(&mut self) -> Result<Self::Output, MailboxTryNextError> {
match mpsc::Receiver::try_recv(self) {
Ok(x) => Ok(x),
Err(mpsc::error::TryRecvError::Empty) => Err(MailboxTryNextError::Empty),
Err(mpsc::error::TryRecvError::Disconnected) => Err(MailboxTryNextError::Closed),
}
}
async fn recv(&mut self) -> Option<Self::Output> {
mpsc::Receiver::recv(self).await
}
@ -183,6 +199,13 @@ struct MappedMailboxReceiver<T, U> {
impl<T: Send, U: Send> MailboxReceiver for MappedMailboxReceiver<T, U> {
type Output = U;
fn try_recv(&mut self) -> Result<Self::Output, MailboxTryNextError> {
match self.rx.try_recv() {
Ok(x) => Ok((self.f)(x)),
Err(x) => Err(x),
}
}
async fn recv(&mut self) -> Option<Self::Output> {
let value = self.rx.recv().await?;
Some((self.f)(value))
@ -202,6 +225,16 @@ struct MappedOptMailboxReceiver<T, U> {
impl<T: Send, U: Send> MailboxReceiver for MappedOptMailboxReceiver<T, U> {
type Output = U;
fn try_recv(&mut self) -> Result<Self::Output, MailboxTryNextError> {
match self.rx.try_recv() {
Ok(x) => match (self.f)(x) {
Some(x) => Ok(x),
None => Err(MailboxTryNextError::Empty),
},
Err(x) => Err(x),
}
}
async fn recv(&mut self) -> Option<Self::Output> {
// Continually receive a new value and convert it to Option<U>
// until Option<U> == Some(U) or we receive None from our inner receiver
@ -233,6 +266,11 @@ impl<T> Mailbox<T> {
&self.id
}
/// Tries to receive the next value in mailbox without blocking or waiting async
pub fn try_next(&mut self) -> Result<T, MailboxTryNextError> {
self.rx.try_recv()
}
/// Receives next value in mailbox
pub async fn next(&mut self) -> Option<T> {
self.rx.recv().await

@ -45,6 +45,13 @@ impl Frame<'_> {
&self.item
}
/// Writes the frame to a new [`Vec`] of bytes, returning them on success
pub fn try_to_bytes(&self) -> io::Result<Vec<u8>> {
let mut bytes = BytesMut::new();
self.write(&mut bytes)?;
Ok(bytes.to_vec())
}
/// Writes the frame to the end of `dst`, including the header representing the length of the
/// item as part of the written bytes
pub fn write(&self, dst: &mut BytesMut) -> io::Result<()> {

@ -1,6 +1,6 @@
use crate::{
client::{Client, ReconnectStrategy, UntypedClient},
common::{ConnectionId, FramedTransport, InmemoryTransport},
common::{ConnectionId, FramedTransport, InmemoryTransport, UntypedRequest},
manager::data::{ManagerRequest, ManagerResponse},
};
use log::*;
@ -9,20 +9,18 @@ use std::{
io,
ops::{Deref, DerefMut},
};
use tokio::{sync::oneshot, task::JoinHandle};
use tokio::task::JoinHandle;
/// Represents a raw channel between a manager client and server. Underneath, this routes incoming
/// and outgoing data from a proxied server to an inmemory transport.
pub struct RawChannel {
transport: InmemoryTransport,
forward_task: JoinHandle<()>,
mailbox_task: JoinHandle<()>,
transport: FramedTransport<InmemoryTransport>,
task: JoinHandle<()>,
}
impl RawChannel {
pub fn abort(&self) {
self.forward_task.abort();
self.mailbox_task.abort();
self.task.abort();
}
/// Consumes this channel, returning a typed client wrapping the transport.
@ -37,10 +35,7 @@ impl RawChannel {
T: Send + Sync + Serialize + 'static,
U: Send + Sync + DeserializeOwned + 'static,
{
Client::spawn_inmemory(
FramedTransport::plain(self.transport),
ReconnectStrategy::Fail,
)
Client::spawn_inmemory(self.transport, ReconnectStrategy::Fail)
}
/// Consumes this channel, returning an untyped client wrapping the transport.
@ -51,15 +46,12 @@ impl RawChannel {
/// performed during separate connection and this merely wraps an inmemory transport that maps
/// to the primary connection.
pub fn into_untyped_client(self) -> UntypedClient {
UntypedClient::spawn_inmemory(
FramedTransport::plain(self.transport),
ReconnectStrategy::Fail,
)
UntypedClient::spawn_inmemory(self.transport, ReconnectStrategy::Fail)
}
}
impl Deref for RawChannel {
type Target = InmemoryTransport;
type Target = FramedTransport<InmemoryTransport>;
fn deref(&self) -> &Self::Target {
&self.transport
@ -100,56 +92,60 @@ impl RawChannel {
}?;
// Spawn our channel proxy transport
let (tx, mut rx, transport) = InmemoryTransport::make(1);
let (channel_close_tx, mut channel_close_rx) = oneshot::channel();
let mailbox_task = tokio::spawn(async move {
while let Some(response) = mailbox.next().await {
match response.payload {
ManagerResponse::Channel { data, .. } => {
if let Err(x) = tx.send(data).await {
error!("[Conn {connection_id} :: Chan {channel_id}] {x}");
}
}
ManagerResponse::ChannelClosed { .. } => {
let _ = channel_close_tx.send(());
break;
}
_ => continue,
}
}
});
let (mut proxy, transport) = FramedTransport::pair(1);
let mut manager_channel = client.clone_channel();
let forward_task = tokio::spawn(async move {
let task = tokio::spawn(async move {
loop {
tokio::select! {
_ = &mut channel_close_rx => { break }
data = rx.recv() => {
match data {
Some(data) => {
maybe_response = mailbox.next() => {
if maybe_response.is_none() {
debug!("[Conn {connection_id} :: Chan {channel_id}] Closing from no more responses");
break;
}
match maybe_response.unwrap().payload {
ManagerResponse::Channel { response, .. } => {
if let Err(x) = proxy.write_frame_for(&response).await {
error!(
"[Conn {connection_id} :: Chan {channel_id}] Write response failed: {x}"
);
}
}
ManagerResponse::ChannelClosed { .. } => {
break;
}
_ => continue,
}
}
result = proxy.read_frame_as::<UntypedRequest>() => {
match result {
Ok(Some(request)) => {
// NOTE: In this situation, we do not expect a response to this
// request (even if the server sends something back)
if let Err(x) = manager_channel
.fire(ManagerRequest::Channel {
id: channel_id,
data,
request,
})
.await
{
error!("[Conn {connection_id} :: Chan {channel_id}] {x}");
error!("[Conn {connection_id} :: Chan {channel_id}] Forward failed: {x}");
}
}
None => break,
Ok(None) => {
debug!("[Conn {connection_id} :: Chan {channel_id}] Closing from no more requests");
break;
}
Err(x) => {
error!("[Conn {connection_id} :: Chan {channel_id}] Read request failed: {x}");
}
}
}
}
}
});
Ok(RawChannel {
transport,
forward_task,
mailbox_task,
})
Ok(RawChannel { transport, task })
}
}

@ -1,5 +1,7 @@
use super::{ManagerAuthenticationId, ManagerChannelId};
use crate::common::{authentication::msg::AuthenticationResponse, ConnectionId, Destination, Map};
use crate::common::{
authentication::msg::AuthenticationResponse, ConnectionId, Destination, Map, UntypedRequest,
};
use derive_more::IsVariant;
use serde::{Deserialize, Serialize};
use strum::{AsRefStr, EnumDiscriminants, EnumIter, EnumMessage, EnumString};
@ -76,8 +78,8 @@ pub enum ManagerRequest {
/// Id of the channel
id: ManagerChannelId,
/// Raw data to send through the channel
data: Vec<u8>,
/// Untyped request to send through the channel
request: UntypedRequest<'static>,
},
/// Closes an open channel

@ -1,7 +1,9 @@
use super::{
ConnectionInfo, ConnectionList, ManagerAuthenticationId, ManagerCapabilities, ManagerChannelId,
};
use crate::common::{authentication::msg::Authentication, ConnectionId, Destination};
use crate::common::{
authentication::msg::Authentication, ConnectionId, Destination, UntypedResponse,
};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
@ -45,8 +47,8 @@ pub enum ManagerResponse {
/// Id of the channel
id: ManagerChannelId,
/// Raw data to send through the channel
data: Vec<u8>,
/// Untyped response to send through the channel
response: UntypedResponse<'static>,
},
/// Indicates that a channel has been opened

@ -263,12 +263,12 @@ impl ServerHandler for ManagerServer {
"Connection does not exist",
)),
},
ManagerRequest::Channel { id, data } => {
ManagerRequest::Channel { id, request } => {
match local_data.channels.read().await.get(&id) {
// TODO: For now, we are NOT sending back a response to acknowledge
// a successful channel send. We could do this in order for
// the client to listen for a complete send, but is it worth it?
Some(channel) => match channel.send(data) {
Some(channel) => match channel.send(request) {
Ok(_) => return,
Err(x) => ManagerResponse::from(x),
},

@ -27,39 +27,32 @@ pub struct ManagerChannel {
}
impl ManagerChannel {
/// Returns the id associated with the channel.
pub fn id(&self) -> ManagerChannelId {
self.channel_id
}
pub fn send(&self, data: Vec<u8>) -> io::Result<()> {
let channel_id = self.channel_id;
let req = UntypedRequest::from_slice(&data)
.map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x))?
.into_owned();
/// Sends the untyped request to the server on the other side of the channel.
pub fn send(&self, req: UntypedRequest<'static>) -> io::Result<()> {
let id = self.channel_id;
self.tx
.send(Action::Write {
id: channel_id,
req,
})
.map_err(|x| {
io::Error::new(
io::ErrorKind::BrokenPipe,
format!("channel {channel_id} send failed: {x}"),
)
})
self.tx.send(Action::Write { id, req }).map_err(|x| {
io::Error::new(
io::ErrorKind::BrokenPipe,
format!("channel {id} send failed: {x}"),
)
})
}
/// Closes the channel, unregistering it with the connection.
pub fn close(&self) -> io::Result<()> {
let channel_id = self.channel_id;
self.tx
.send(Action::Unregister { id: channel_id })
.map_err(|x| {
io::Error::new(
io::ErrorKind::BrokenPipe,
format!("channel {channel_id} close failed: {x}"),
)
})
let id = self.channel_id;
self.tx.send(Action::Unregister { id }).map_err(|x| {
io::Error::new(
io::ErrorKind::BrokenPipe,
format!("channel {id} close failed: {x}"),
)
})
}
}
@ -204,7 +197,7 @@ async fn action_task(
if let Some(reply) = registered.get(&channel_id) {
let response = ManagerResponse::Channel {
id: channel_id,
data: res.to_bytes(),
response: res,
};
if let Err(x) = reply.send(response).await {
error!("[Conn {id}] {x}");

@ -26,7 +26,7 @@ impl ServerHandler for TestServerHandler {
}
#[test(tokio::test)]
async fn should_be_able_to_establish_a_single_connection_and_communicate() {
async fn should_be_able_to_establish_a_single_connection_and_communicate_with_a_manager() {
let (t1, t2) = InmemoryTransport::pair(100);
let mut config = Config::default();

Loading…
Cancel
Save