More progress

pull/146/head
Chip Senkbeil 2 years ago
parent d2a5781a40
commit 69ee27f03b
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -1,6 +1,6 @@
use crate::{
client::Client,
common::{ConnectionId, Destination, Map},
common::{authentication::AuthHandler, ConnectionId, Destination, Map},
manager::data::{
ConnectionInfo, ConnectionList, ManagerCapabilities, ManagerRequest, ManagerResponse,
},
@ -15,13 +15,17 @@ pub use channel::*;
pub type ManagerClient = Client<ManagerRequest, ManagerResponse>;
impl ManagerClient {
/// Request that the manager launches a new server at the given `destination`
/// with `options` being passed for destination-specific details, returning the new
/// `destination` of the spawned server.
/// Request that the manager launches a new server at the given `destination` with `options`
/// being passed for destination-specific details, returning the new `destination` of the
/// spawned server.
///
/// The provided `handler` will be used for any authentication requirements when connecting to
/// the remote machine to spawn the server.
pub async fn launch(
&mut self,
destination: impl Into<Destination>,
options: impl Into<Map>,
handler: impl AuthHandler,
) -> io::Result<Destination> {
let destination = Box::new(destination.into());
let options = options.into();
@ -44,29 +48,40 @@ impl ManagerClient {
}
/// Request that the manager establishes a new connection at the given `destination`
/// with `options` being passed for destination-specific details
/// with `options` being passed for destination-specific details.
///
/// The provided `handler` will be used for any authentication requirements when connecting to
/// the server.
pub async fn connect(
&mut self,
destination: impl Into<Destination>,
options: impl Into<Map>,
handler: impl AuthHandler,
) -> io::Result<ConnectionId> {
let destination = Box::new(destination.into());
let options = options.into();
trace!("connect({}, {})", destination, options);
let res = self
.send(ManagerRequest::Connect {
let mailbox = self
.mail(ManagerRequest::Connect {
destination,
options,
})
.await?;
match res.payload {
ManagerResponse::Connected { id } => Ok(id),
ManagerResponse::Error(x) => Err(x.into()),
x => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("Got unexpected response: {:?}", x),
)),
// Continue to process authentication challenges and other details until we are either
// connected or fail
while let Some(res) = mailbox.next().await {
match res.payload {
ManagerResponse::Connected { id } => return Ok(id),
ManagerResponse::Error(x) => return Err(x.into()),
x => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("Got unexpected response: {:?}", x),
))
}
}
}
}

@ -1,3 +1,5 @@
pub type ManagerChannelId = u32;
mod capabilities;
pub use capabilities::*;

@ -1,11 +1,11 @@
use crate::common::{ConnectionId, Destination, Map};
use super::ManagerChannelId;
use crate::common::{authentication::msg::AuthenticationResponse, ConnectionId, Destination, Map};
use derive_more::IsVariant;
use serde::{Deserialize, Serialize};
use strum::{AsRefStr, EnumDiscriminants, EnumIter, EnumMessage, EnumString};
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, EnumDiscriminants, Serialize, Deserialize)]
#[cfg_attr(feature = "clap", derive(clap::Subcommand))]
#[strum_discriminants(derive(
AsRefStr,
strum::Display,
@ -38,7 +38,6 @@ pub enum ManagerRequest {
destination: Box<Destination>,
/// Additional options specific to the connection
#[cfg_attr(feature = "clap", clap(short, long, action = clap::ArgAction::Append))]
options: Map,
},
@ -49,12 +48,20 @@ pub enum ManagerRequest {
destination: Box<Destination>,
/// Additional options specific to the connection
#[cfg_attr(feature = "clap", clap(short, long, action = clap::ArgAction::Append))]
options: Map,
},
/// Opens a channel for communication with a server
#[cfg_attr(feature = "clap", clap(skip))]
/// Submit some authentication message for the manager to use with an active connection
#[strum_discriminants(strum(message = "Supports authenticating with a remote server"))]
Authenticate {
/// Id of the authentication request that is being responded to
id: String,
/// Response being sent to some active connection
msg: AuthenticationResponse,
},
/// Opens a channel for communication with an already-connected server
#[strum_discriminants(strum(message = "Supports opening a channel with a remote server"))]
OpenChannel {
/// Id of the connection
@ -62,24 +69,22 @@ pub enum ManagerRequest {
},
/// Sends data through channel
#[cfg_attr(feature = "clap", clap(skip))]
#[strum_discriminants(strum(
message = "Supports sending data through a channel with a remote server"
))]
Channel {
/// Id of the channel
id: ChannelId,
id: ManagerChannelId,
/// Raw data to send through the channel
data: Vec<u8>,
},
/// Closes an open channel
#[cfg_attr(feature = "clap", clap(skip))]
#[strum_discriminants(strum(message = "Supports closing a channel with a remote server"))]
CloseChannel {
/// Id of the channel to close
id: ChannelId,
id: ManagerChannelId,
},
/// Retrieve information about a specific connection

@ -1,10 +1,13 @@
use super::{ConnectionInfo, ConnectionList, Error, ManagerCapabilities};
use crate::common::{ConnectionId, Destination};
use super::{ConnectionInfo, ConnectionList, Error, ManagerCapabilities, ManagerChannelId};
use crate::common::{authentication::msg::Authentication, ConnectionId, Destination};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields, tag = "type")]
pub enum ManagerResponse {
/// Acknowledgement of a response being processed
Ok,
/// Acknowledgement that a connection was killed
Killed,
@ -23,6 +26,15 @@ pub enum ManagerResponse {
/// Confirmation of a connection being established
Connected { id: ConnectionId },
/// Authentication information being sent to a client
Authenticate {
/// Id tied to authentication information in case a response is needed
id: String,
/// Authentication message
msg: Authentication,
},
/// Information about a specific connection
Info(ConnectionInfo),
@ -32,7 +44,7 @@ pub enum ManagerResponse {
/// Forward a response back to a specific channel that made a request
Channel {
/// Id of the channel
id: ChannelId,
id: ManagerChannelId,
/// Raw data to send through the channel
data: Vec<u8>,
@ -41,12 +53,12 @@ pub enum ManagerResponse {
/// Indicates that a channel has been opened
ChannelOpened {
/// Id of the channel
id: ChannelId,
id: ManagerChannelId,
},
/// Indicates that a channel has been closed
ChannelClosed {
/// Id of the channel
id: ChannelId,
id: ManagerChannelId,
},
}

@ -1,6 +1,8 @@
use crate::{
client::Client,
common::{Listener, Map, MpscListener, Request, Response},
common::{
authentication::msg::AuthenticationResponse, Listener, Map, MpscListener, Request, Response,
},
manager::{
ChannelId, ConnectionId, ConnectionInfo, ConnectionList, Destination, ManagerCapabilities,
ManagerRequest, ManagerResponse,
@ -11,7 +13,7 @@ use async_trait::async_trait;
use log::*;
use std::{collections::HashMap, io, sync::Arc};
use tokio::{
sync::{mpsc, Mutex, RwLock},
sync::{mpsc, oneshot, Mutex, RwLock},
task::JoinHandle,
};
@ -31,6 +33,9 @@ pub struct ManagerServer {
/// Mapping of connection id -> connection
connections: RwLock<HashMap<ConnectionId, ManagerConnection>>,
/// Mapping of auth id -> callback
authentication: RwLock<HashMap<String, oneshot::Sender<AuthenticationResponse>>>,
}
impl ManagerServer {
@ -41,6 +46,7 @@ impl ManagerServer {
Server::new().handler(Self {
config,
connections: RwLock::new(HashMap::new()),
authentication: RwLock::new(HashMap::new()),
})
}
@ -48,19 +54,7 @@ impl ManagerServer {
/// and authentication client (if needed) to retrieve additional information needed to
/// enter the destination prior to starting the server, returning the destination of the
/// launched server
async fn launch(
&self,
destination: Destination,
options: Map,
auth: Option<&mut AuthClient>,
) -> io::Result<Destination> {
let auth = auth.ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
"Authentication client not initialized",
)
})?;
async fn launch(&self, destination: Destination, options: Map) -> io::Result<Destination> {
let scheme = match destination.scheme.as_deref() {
Some(scheme) => {
trace!("Using scheme {}", scheme);
@ -92,19 +86,7 @@ impl ManagerServer {
/// Connects to a new server at the specified `destination` using the given `options` information
/// and authentication client (if needed) to retrieve additional information needed to
/// establish the connection to the server
async fn connect(
&self,
destination: Destination,
options: Map,
auth: Option<&mut AuthClient>,
) -> io::Result<ConnectionId> {
let auth = auth.ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
"Authentication client not initialized",
)
})?;
async fn connect(&self, destination: Destination, options: Map) -> io::Result<ConnectionId> {
let scheme = match destination.scheme.as_deref() {
Some(scheme) => {
trace!("Using scheme {}", scheme);
@ -209,35 +191,27 @@ impl ServerHandler for ManagerServer {
ManagerRequest::Launch {
destination,
options,
} => {
let mut auth = match local_data.auth_client.as_ref() {
Some(client) => Some(client.lock().await),
None => None,
};
match self
.launch(*destination, options, auth.as_deref_mut())
.await
{
Ok(destination) => ManagerResponse::Launched { destination },
Err(x) => ManagerResponse::Error(x.into()),
}
}
} => match self.launch(*destination, options).await {
Ok(destination) => ManagerResponse::Launched { destination },
Err(x) => ManagerResponse::Error(x.into()),
},
ManagerRequest::Connect {
destination,
options,
} => {
let mut auth = match local_data.auth_client.as_ref() {
Some(client) => Some(client.lock().await),
None => None,
};
match self
.connect(*destination, options, auth.as_deref_mut())
.await
{
Ok(id) => ManagerResponse::Connected { id },
Err(x) => ManagerResponse::Error(x.into()),
} => match self.connect(*destination, options).await {
Ok(id) => ManagerResponse::Connected { id },
Err(x) => ManagerResponse::Error(x.into()),
},
ManagerRequest::Authenticate { id, msg } => {
match self.authentication.write().await.remove(&id) {
Some(cb) => match cb.send(msg) {
Ok(_) => ManagerResponse::Ok,
Err(x) => ManagerResponse::Error(x.into()),
},
None => ManagerResponse::Error(
io::Error::new(io::ErrorKind::InvalidInput, "Invalid authenticate id")
.into(),
),
}
}
ManagerRequest::OpenChannel { id } => match self.connections.read().await.get(&id) {

@ -3,7 +3,7 @@ use crate::{
ConnectionId, Destination, FramedTransport, Interest, Map, Request, Transport,
UntypedRequest, UntypedResponse,
},
manager::data::ManagerResponse,
manager::data::{ManagerChannelId, ManagerResponse},
server::{ServerRef, ServerReply},
};
use log::*;
@ -16,19 +16,19 @@ pub struct ManagerConnection {
pub id: ConnectionId,
pub destination: Destination,
pub options: Map,
tx: mpsc::Sender<Action>,
tx: mpsc::UnboundedSender<Action>,
transport_task: JoinHandle<()>,
action_task: JoinHandle<()>,
}
#[derive(Clone)]
pub struct ManagerChannel {
channel_id: ChannelId,
channel_id: ManagerChannelId,
tx: mpsc::Sender<Action>,
}
impl ManagerChannel {
pub fn id(&self) -> ChannelId {
pub fn id(&self) -> ManagerChannelId {
self.channel_id
}
@ -63,13 +63,23 @@ impl ManagerChannel {
}
impl ManagerConnection {
pub fn new<T: Transport>(
pub fn new<T: Transport + Send + Sync + 'static>(
destination: Destination,
options: Map,
transport: FramedTransport<T>,
) -> Self {
let connection_id = rand::random();
let (tx, mut rx) = mpsc::channel(1);
let (tx, mut rx) = mpsc::unbounded_channel();
let (outgoing_tx, outgoing_rx) = mpsc::unbounded_channel();
let transport_task = tokio::spawn(transport_task(
connection_id,
transport,
outgoing_rx,
tx.clone(),
Duration::from_millis(50),
));
let action_task = tokio::spawn(action_task(connection_id, rx, outgoing_tx));
Self {
id: connection_id,
@ -91,7 +101,6 @@ impl ManagerConnection {
id: channel_id,
reply,
})
.await
.map_err(|x| {
io::Error::new(
io::ErrorKind::BrokenPipe,
@ -114,12 +123,12 @@ impl Drop for ManagerConnection {
enum Action {
Register {
id: ChannelId,
id: ManagerChannelId,
reply: ServerReply<ManagerResponse>,
},
Unregister {
id: ChannelId,
id: ManagerChannelId,
},
Read {
@ -127,7 +136,7 @@ enum Action {
},
Write {
id: ChannelId,
id: ManagerChannelId,
data: Vec<u8>,
},
}
@ -138,11 +147,11 @@ enum Action {
/// * `transport` - the fully-authenticated transport.
/// * `rx` - used to receive outgoing data to send through the connection.
/// * `tx` - used to send new [`Action`]s to process.
async fn transport_task<T>(
async fn transport_task<T: Transport>(
id: ConnectionId,
transport: FramedTransport<T>,
mut transport: FramedTransport<T>,
mut rx: mpsc::UnboundedReceiver<Vec<u8>>,
mut tx: mpsc::UnboundedSender<Action>,
tx: mpsc::UnboundedSender<Action>,
sleep_duration: Duration,
) {
loop {
@ -165,12 +174,9 @@ async fn transport_task<T>(
if ready.is_readable() {
match transport.try_read_frame() {
Ok(Some(frame)) => {
if let Err(x) = tx
.send(Action::Read {
data: frame.into_item().into_owned(),
})
.await
{
if let Err(x) = tx.send(Action::Read {
data: frame.into_item().into_owned(),
}) {
error!("[Conn {id}] Failed to forward frame: {x}");
}
}
@ -226,7 +232,7 @@ async fn transport_task<T>(
async fn action_task(
id: ConnectionId,
mut rx: mpsc::UnboundedReceiver<Action>,
mut tx: mpsc::UnboundedSender<Vec<u8>>,
tx: mpsc::UnboundedSender<Vec<u8>>,
) {
let mut registered = HashMap::new();
@ -252,8 +258,8 @@ async fn action_task(
// update the origin id to match the request id only
let channel_id = match response.origin_id.split_once('_') {
Some((cid_str, oid_str)) => {
if let Ok(cid) = cid_str.parse::<ChannelId>() {
response.set_origin_id(oid_str);
if let Ok(cid) = cid_str.parse::<ManagerChannelId>() {
response.set_origin_id(oid_str.to_string());
cid
} else {
continue;
@ -286,7 +292,7 @@ async fn action_task(
// the response containing this in the origin id
request.set_id(format!("{id}_{}", request.id));
if let Err(x) = tx.send(request.to_bytes()).await {
if let Err(x) = tx.send(request.to_bytes()) {
error!("[Conn {id}] {x}");
}
}

Loading…
Cancel
Save