|
|
|
@ -1,11 +1,12 @@
|
|
|
|
|
use crate::{
|
|
|
|
|
client::Client,
|
|
|
|
|
common::{
|
|
|
|
|
authentication::msg::AuthenticationResponse, Listener, Map, MpscListener, Request, Response,
|
|
|
|
|
authentication::msg::AuthenticationResponse, ConnectionId, Destination, Listener, Map,
|
|
|
|
|
MpscListener, Request, Response,
|
|
|
|
|
},
|
|
|
|
|
manager::{
|
|
|
|
|
ChannelId, ConnectionId, ConnectionInfo, ConnectionList, Destination, ManagerCapabilities,
|
|
|
|
|
ManagerRequest, ManagerResponse,
|
|
|
|
|
ConnectionInfo, ConnectionList, ManagerAuthenticationId, ManagerCapabilities,
|
|
|
|
|
ManagerChannelId, ManagerRequest, ManagerResponse,
|
|
|
|
|
},
|
|
|
|
|
server::{Server, ServerCtx, ServerHandler},
|
|
|
|
|
};
|
|
|
|
@ -17,6 +18,9 @@ use tokio::{
|
|
|
|
|
task::JoinHandle,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
mod authentication;
|
|
|
|
|
pub use authentication::*;
|
|
|
|
|
|
|
|
|
|
mod config;
|
|
|
|
|
pub use config::*;
|
|
|
|
|
|
|
|
|
@ -35,7 +39,8 @@ pub struct ManagerServer {
|
|
|
|
|
connections: RwLock<HashMap<ConnectionId, ManagerConnection>>,
|
|
|
|
|
|
|
|
|
|
/// Mapping of auth id -> callback
|
|
|
|
|
authentication: RwLock<HashMap<String, oneshot::Sender<AuthenticationResponse>>>,
|
|
|
|
|
registry:
|
|
|
|
|
Arc<RwLock<HashMap<ManagerAuthenticationId, oneshot::Sender<AuthenticationResponse>>>>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl ManagerServer {
|
|
|
|
@ -46,7 +51,7 @@ impl ManagerServer {
|
|
|
|
|
Server::new().handler(Self {
|
|
|
|
|
config,
|
|
|
|
|
connections: RwLock::new(HashMap::new()),
|
|
|
|
|
authentication: RwLock::new(HashMap::new()),
|
|
|
|
|
registry: Arc::new(RwLock::new(HashMap::new())),
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -54,7 +59,12 @@ impl ManagerServer {
|
|
|
|
|
/// and authentication client (if needed) to retrieve additional information needed to
|
|
|
|
|
/// enter the destination prior to starting the server, returning the destination of the
|
|
|
|
|
/// launched server
|
|
|
|
|
async fn launch(&self, destination: Destination, options: Map) -> io::Result<Destination> {
|
|
|
|
|
async fn launch(
|
|
|
|
|
&self,
|
|
|
|
|
destination: Destination,
|
|
|
|
|
options: Map,
|
|
|
|
|
mut authenticator: ManagerAuthenticator,
|
|
|
|
|
) -> io::Result<Destination> {
|
|
|
|
|
let scheme = match destination.scheme.as_deref() {
|
|
|
|
|
Some(scheme) => {
|
|
|
|
|
trace!("Using scheme {}", scheme);
|
|
|
|
@ -77,7 +87,9 @@ impl ManagerServer {
|
|
|
|
|
format!("No launch handler registered for {}", scheme),
|
|
|
|
|
)
|
|
|
|
|
})?;
|
|
|
|
|
handler.launch(&destination, &options, auth).await?
|
|
|
|
|
handler
|
|
|
|
|
.launch(&destination, &options, &mut authenticator)
|
|
|
|
|
.await?
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
Ok(credentials)
|
|
|
|
@ -86,7 +98,12 @@ impl ManagerServer {
|
|
|
|
|
/// Connects to a new server at the specified `destination` using the given `options` information
|
|
|
|
|
/// and authentication client (if needed) to retrieve additional information needed to
|
|
|
|
|
/// establish the connection to the server
|
|
|
|
|
async fn connect(&self, destination: Destination, options: Map) -> io::Result<ConnectionId> {
|
|
|
|
|
async fn connect(
|
|
|
|
|
&self,
|
|
|
|
|
destination: Destination,
|
|
|
|
|
options: Map,
|
|
|
|
|
mut authenticator: ManagerAuthenticator,
|
|
|
|
|
) -> io::Result<ConnectionId> {
|
|
|
|
|
let scheme = match destination.scheme.as_deref() {
|
|
|
|
|
Some(scheme) => {
|
|
|
|
|
trace!("Using scheme {}", scheme);
|
|
|
|
@ -109,7 +126,9 @@ impl ManagerServer {
|
|
|
|
|
format!("No connect handler registered for {}", scheme),
|
|
|
|
|
)
|
|
|
|
|
})?;
|
|
|
|
|
handler.connect(&destination, &options, auth).await?
|
|
|
|
|
handler
|
|
|
|
|
.connect(&destination, &options, &mut authenticator)
|
|
|
|
|
.await?
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let connection = ManagerConnection::new(destination, options, transport);
|
|
|
|
@ -191,25 +210,45 @@ impl ServerHandler for ManagerServer {
|
|
|
|
|
ManagerRequest::Launch {
|
|
|
|
|
destination,
|
|
|
|
|
options,
|
|
|
|
|
} => match self.launch(*destination, options).await {
|
|
|
|
|
} => match self
|
|
|
|
|
.launch(
|
|
|
|
|
*destination,
|
|
|
|
|
options,
|
|
|
|
|
ManagerAuthenticator {
|
|
|
|
|
reply,
|
|
|
|
|
registry: Arc::clone(&self.registry),
|
|
|
|
|
},
|
|
|
|
|
)
|
|
|
|
|
.await
|
|
|
|
|
{
|
|
|
|
|
Ok(destination) => ManagerResponse::Launched { destination },
|
|
|
|
|
Err(x) => ManagerResponse::Error(x.into()),
|
|
|
|
|
},
|
|
|
|
|
ManagerRequest::Connect {
|
|
|
|
|
destination,
|
|
|
|
|
options,
|
|
|
|
|
} => match self.connect(*destination, options).await {
|
|
|
|
|
} => match self
|
|
|
|
|
.connect(
|
|
|
|
|
*destination,
|
|
|
|
|
options,
|
|
|
|
|
ManagerAuthenticator {
|
|
|
|
|
reply,
|
|
|
|
|
registry: Arc::clone(&self.registry),
|
|
|
|
|
},
|
|
|
|
|
)
|
|
|
|
|
.await
|
|
|
|
|
{
|
|
|
|
|
Ok(id) => ManagerResponse::Connected { id },
|
|
|
|
|
Err(x) => ManagerResponse::Error(x.into()),
|
|
|
|
|
},
|
|
|
|
|
ManagerRequest::Authenticate { id, msg } => {
|
|
|
|
|
match self.authentication.write().await.remove(&id) {
|
|
|
|
|
match self.registry.write().await.remove(&id) {
|
|
|
|
|
Some(cb) => match cb.send(msg) {
|
|
|
|
|
Ok(_) => ManagerResponse::Ok,
|
|
|
|
|
Err(x) => ManagerResponse::Error(x.into()),
|
|
|
|
|
},
|
|
|
|
|
None => ManagerResponse::Error(
|
|
|
|
|
io::Error::new(io::ErrorKind::InvalidInput, "Invalid authenticate id")
|
|
|
|
|
io::Error::new(io::ErrorKind::InvalidInput, "Invalid authentication id")
|
|
|
|
|
.into(),
|
|
|
|
|
),
|
|
|
|
|
}
|
|
|
|
@ -283,65 +322,48 @@ impl ServerHandler for ManagerServer {
|
|
|
|
|
#[cfg(test)]
|
|
|
|
|
mod tests {
|
|
|
|
|
use super::*;
|
|
|
|
|
use crate::{
|
|
|
|
|
common::{FramedTransport, InmemoryTransport, MappedListener, OneshotListener, PlainCodec},
|
|
|
|
|
server::ServerRef,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/// Create a new server, bypassing the start loop
|
|
|
|
|
fn setup() -> ManagerServer {
|
|
|
|
|
ManagerServer {
|
|
|
|
|
config: Default::default(),
|
|
|
|
|
connections: RwLock::new(HashMap::new()),
|
|
|
|
|
launch_handlers: Arc::new(RwLock::new(HashMap::new())),
|
|
|
|
|
connect_handlers: Arc::new(RwLock::new(HashMap::new())),
|
|
|
|
|
task: tokio::spawn(async move {}),
|
|
|
|
|
use crate::server::ServerReply;
|
|
|
|
|
|
|
|
|
|
fn test_config() -> Config {
|
|
|
|
|
Config {
|
|
|
|
|
launch_fallback_scheme: "ssh".to_string(),
|
|
|
|
|
connect_fallback_scheme: "distant".to_string(),
|
|
|
|
|
connection_buffer_size: 100,
|
|
|
|
|
user: false,
|
|
|
|
|
launch_handlers: HashMap::new(),
|
|
|
|
|
connect_handlers: HashMap::new(),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Creates a connected [`AuthClient`] with a launched auth server that blindly responds
|
|
|
|
|
fn auth_client_server() -> (AuthClient, Box<dyn ServerRef>) {
|
|
|
|
|
let (t1, t2) = FramedTransport::pair(1);
|
|
|
|
|
let client = AuthClient::from(Client::from_framed_transport(t1).unwrap());
|
|
|
|
|
|
|
|
|
|
// Create a server that does nothing, but will support
|
|
|
|
|
let server = HeapAuthServer {
|
|
|
|
|
on_challenge: Box::new(|_, _| Vec::new()),
|
|
|
|
|
on_verify: Box::new(|_, _| false),
|
|
|
|
|
on_info: Box::new(|_| ()),
|
|
|
|
|
on_error: Box::new(|_, _| ()),
|
|
|
|
|
}
|
|
|
|
|
.start(MappedListener::new(OneshotListener::from_value(t2), |t| {
|
|
|
|
|
t.into_split()
|
|
|
|
|
}))
|
|
|
|
|
.unwrap();
|
|
|
|
|
/// Create a new server and authenticator
|
|
|
|
|
fn setup(config: Config) -> (ManagerServer, ManagerAuthenticator) {
|
|
|
|
|
let registry = Arc::new(RwLock::new(HashMap::new()));
|
|
|
|
|
|
|
|
|
|
(client, server)
|
|
|
|
|
}
|
|
|
|
|
let authenticator = ManagerAuthenticator {
|
|
|
|
|
reply: ServerReply {
|
|
|
|
|
origin_id: format!("{}", rand::random::<u8>()),
|
|
|
|
|
tx: mpsc::channel(1).0,
|
|
|
|
|
},
|
|
|
|
|
registry: Arc::clone(®istry),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
fn dummy_distant_writer_reader() -> (BoxedDistantWriter, BoxedDistantReader) {
|
|
|
|
|
setup_distant_writer_reader().0
|
|
|
|
|
}
|
|
|
|
|
let server = ManagerServer {
|
|
|
|
|
config,
|
|
|
|
|
connections: RwLock::new(HashMap::new()),
|
|
|
|
|
registry,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/// Creates a writer & reader with a connected transport
|
|
|
|
|
fn setup_distant_writer_reader() -> (
|
|
|
|
|
(BoxedDistantWriter, BoxedDistantReader),
|
|
|
|
|
FramedTransport<InmemoryTransport, PlainCodec>,
|
|
|
|
|
) {
|
|
|
|
|
let (t1, t2) = FramedTransport::pair(1);
|
|
|
|
|
let (writer, reader) = t1.into_split();
|
|
|
|
|
((Box::new(writer), Box::new(reader)), t2)
|
|
|
|
|
(server, authenticator)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn launch_should_fail_if_destination_scheme_is_unsupported() {
|
|
|
|
|
let server = setup();
|
|
|
|
|
let (server, authenticator) = setup(test_config());
|
|
|
|
|
|
|
|
|
|
let destination = "scheme://host".parse::<Destination>().unwrap();
|
|
|
|
|
let options = "".parse::<Map>().unwrap();
|
|
|
|
|
let (mut auth, _auth_server) = auth_client_server();
|
|
|
|
|
let err = server
|
|
|
|
|
.launch(destination, options, Some(&mut auth))
|
|
|
|
|
.launch(destination, options, authenticator)
|
|
|
|
|
.await
|
|
|
|
|
.unwrap_err();
|
|
|
|
|
assert_eq!(err.kind(), io::ErrorKind::InvalidInput, "{:?}", err);
|
|
|
|
@ -349,23 +371,19 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn launch_should_fail_if_handler_tied_to_scheme_fails() {
|
|
|
|
|
let server = setup();
|
|
|
|
|
let mut config = test_config();
|
|
|
|
|
|
|
|
|
|
let handler: Box<dyn LaunchHandler> = Box::new(|_: &_, _: &_, _: &mut _| async {
|
|
|
|
|
Err(io::Error::new(io::ErrorKind::Other, "test failure"))
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
server
|
|
|
|
|
.launch_handlers
|
|
|
|
|
.write()
|
|
|
|
|
.await
|
|
|
|
|
.insert("scheme".to_string(), handler);
|
|
|
|
|
config.launch_handlers.insert("scheme".to_string(), handler);
|
|
|
|
|
|
|
|
|
|
let (server, authenticator) = setup(config);
|
|
|
|
|
let destination = "scheme://host".parse::<Destination>().unwrap();
|
|
|
|
|
let options = "".parse::<Map>().unwrap();
|
|
|
|
|
let (mut auth, _auth_server) = auth_client_server();
|
|
|
|
|
let err = server
|
|
|
|
|
.launch(destination, options, Some(&mut auth))
|
|
|
|
|
.launch(destination, options, authenticator)
|
|
|
|
|
.await
|
|
|
|
|
.unwrap_err();
|
|
|
|
|
assert_eq!(err.kind(), io::ErrorKind::Other);
|
|
|
|
@ -374,7 +392,7 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn launch_should_return_new_destination_on_success() {
|
|
|
|
|
let server = setup();
|
|
|
|
|
let mut config = test_config();
|
|
|
|
|
|
|
|
|
|
let handler: Box<dyn LaunchHandler> = {
|
|
|
|
|
Box::new(|_: &_, _: &_, _: &mut _| async {
|
|
|
|
@ -382,17 +400,13 @@ mod tests {
|
|
|
|
|
})
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
server
|
|
|
|
|
.launch_handlers
|
|
|
|
|
.write()
|
|
|
|
|
.await
|
|
|
|
|
.insert("scheme".to_string(), handler);
|
|
|
|
|
config.launch_handlers.insert("scheme".to_string(), handler);
|
|
|
|
|
|
|
|
|
|
let (server, authenticator) = setup(config);
|
|
|
|
|
let destination = "scheme://host".parse::<Destination>().unwrap();
|
|
|
|
|
let options = "key=value".parse::<Map>().unwrap();
|
|
|
|
|
let (mut auth, _auth_server) = auth_client_server();
|
|
|
|
|
let destination = server
|
|
|
|
|
.launch(destination, options, Some(&mut auth))
|
|
|
|
|
.launch(destination, options, authenticator)
|
|
|
|
|
.await
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
@ -404,13 +418,12 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn connect_should_fail_if_destination_scheme_is_unsupported() {
|
|
|
|
|
let server = setup();
|
|
|
|
|
let (server, authenticator) = setup(test_config());
|
|
|
|
|
|
|
|
|
|
let destination = "scheme://host".parse::<Destination>().unwrap();
|
|
|
|
|
let options = "".parse::<Map>().unwrap();
|
|
|
|
|
let (mut auth, _auth_server) = auth_client_server();
|
|
|
|
|
let err = server
|
|
|
|
|
.connect(destination, options, Some(&mut auth))
|
|
|
|
|
.connect(destination, options, authenticator)
|
|
|
|
|
.await
|
|
|
|
|
.unwrap_err();
|
|
|
|
|
assert_eq!(err.kind(), io::ErrorKind::InvalidInput, "{:?}", err);
|
|
|
|
@ -418,23 +431,21 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn connect_should_fail_if_handler_tied_to_scheme_fails() {
|
|
|
|
|
let server = setup();
|
|
|
|
|
let mut config = test_config();
|
|
|
|
|
|
|
|
|
|
let handler: Box<dyn ConnectHandler> = Box::new(|_: &_, _: &_, _: &mut _| async {
|
|
|
|
|
Err(io::Error::new(io::ErrorKind::Other, "test failure"))
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
server
|
|
|
|
|
config
|
|
|
|
|
.connect_handlers
|
|
|
|
|
.write()
|
|
|
|
|
.await
|
|
|
|
|
.insert("scheme".to_string(), handler);
|
|
|
|
|
|
|
|
|
|
let (server, authenticator) = setup(config);
|
|
|
|
|
let destination = "scheme://host".parse::<Destination>().unwrap();
|
|
|
|
|
let options = "".parse::<Map>().unwrap();
|
|
|
|
|
let (mut auth, _auth_server) = auth_client_server();
|
|
|
|
|
let err = server
|
|
|
|
|
.connect(destination, options, Some(&mut auth))
|
|
|
|
|
.connect(destination, options, authenticator)
|
|
|
|
|
.await
|
|
|
|
|
.unwrap_err();
|
|
|
|
|
assert_eq!(err.kind(), io::ErrorKind::Other);
|
|
|
|
@ -443,22 +454,20 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn connect_should_return_id_of_new_connection_on_success() {
|
|
|
|
|
let server = setup();
|
|
|
|
|
let mut config = test_config();
|
|
|
|
|
|
|
|
|
|
let handler: Box<dyn ConnectHandler> =
|
|
|
|
|
Box::new(|_: &_, _: &_, _: &mut _| async { Ok(dummy_distant_writer_reader()) });
|
|
|
|
|
|
|
|
|
|
server
|
|
|
|
|
config
|
|
|
|
|
.connect_handlers
|
|
|
|
|
.write()
|
|
|
|
|
.await
|
|
|
|
|
.insert("scheme".to_string(), handler);
|
|
|
|
|
|
|
|
|
|
let (server, authenticator) = setup(config);
|
|
|
|
|
let destination = "scheme://host".parse::<Destination>().unwrap();
|
|
|
|
|
let options = "key=value".parse::<Map>().unwrap();
|
|
|
|
|
let (mut auth, _auth_server) = auth_client_server();
|
|
|
|
|
let id = server
|
|
|
|
|
.connect(destination, options, Some(&mut auth))
|
|
|
|
|
.connect(destination, options, authenticator)
|
|
|
|
|
.await
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
@ -471,7 +480,7 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn info_should_fail_if_no_connection_found_for_specified_id() {
|
|
|
|
|
let server = setup();
|
|
|
|
|
let (server, _) = setup(test_config());
|
|
|
|
|
|
|
|
|
|
let err = server.info(999).await.unwrap_err();
|
|
|
|
|
assert_eq!(err.kind(), io::ErrorKind::NotConnected, "{:?}", err);
|
|
|
|
@ -479,7 +488,7 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn info_should_return_information_about_established_connection() {
|
|
|
|
|
let server = setup();
|
|
|
|
|
let (server, _) = setup(test_config());
|
|
|
|
|
|
|
|
|
|
let (writer, reader) = dummy_distant_writer_reader();
|
|
|
|
|
let connection = ManagerConnection::new(
|
|
|
|
@ -504,7 +513,7 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn list_should_return_empty_connection_list_if_no_established_connections() {
|
|
|
|
|
let server = setup();
|
|
|
|
|
let (server, _) = setup(test_config());
|
|
|
|
|
|
|
|
|
|
let list = server.list().await.unwrap();
|
|
|
|
|
assert_eq!(list, ConnectionList(HashMap::new()));
|
|
|
|
@ -512,7 +521,7 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn list_should_return_a_list_of_established_connections() {
|
|
|
|
|
let server = setup();
|
|
|
|
|
let (server, _) = setup(test_config());
|
|
|
|
|
|
|
|
|
|
let (writer, reader) = dummy_distant_writer_reader();
|
|
|
|
|
let connection = ManagerConnection::new(
|
|
|
|
@ -547,7 +556,7 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn kill_should_fail_if_no_connection_found_for_specified_id() {
|
|
|
|
|
let server = setup();
|
|
|
|
|
let (server, _) = setup(test_config());
|
|
|
|
|
|
|
|
|
|
let err = server.kill(999).await.unwrap_err();
|
|
|
|
|
assert_eq!(err.kind(), io::ErrorKind::NotConnected, "{:?}", err);
|
|
|
|
@ -555,7 +564,7 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn kill_should_terminate_established_connection_and_remove_it_from_the_list() {
|
|
|
|
|
let server = setup();
|
|
|
|
|
let (server, _) = setup(test_config());
|
|
|
|
|
|
|
|
|
|
let (writer, reader) = dummy_distant_writer_reader();
|
|
|
|
|
let connection = ManagerConnection::new(
|
|
|
|
|