Unfinished migration of manager logic to distant-net

pull/146/head
Chip Senkbeil 2 years ago
parent 57e90a50a8
commit d2a5781a40
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

1
Cargo.lock generated

@ -824,6 +824,7 @@ dependencies = [
"serde",
"serde_bytes",
"sha2 0.10.2",
"strum",
"tempfile",
"test-log",
"tokio",

@ -1,108 +0,0 @@
use async_trait::async_trait;
use distant_net::common::authentication::{
msg::{ErrorKind, Question, VerificationKind},
AuthHandler,
};
use log::*;
use std::{collections::HashMap, io};
/// Configuration to use when creating a new [`DistantManagerClient`](super::DistantManagerClient)
pub struct DistantManagerClientConfig {
pub on_challenge:
Box<dyn FnMut(Vec<Question>, HashMap<String, String>) -> io::Result<Vec<String>>>,
pub on_verify: Box<dyn FnMut(VerificationKind, String) -> io::Result<bool>>,
pub on_info: Box<dyn FnMut(String) -> io::Result<()>>,
pub on_error: Box<dyn FnMut(ErrorKind, &str) -> io::Result<()>>,
}
#[async_trait]
impl AuthHandler for DistantManagerClientConfig {
async fn on_challenge(
&mut self,
questions: Vec<Question>,
options: HashMap<String, String>,
) -> io::Result<Vec<String>> {
(self.on_challenge)(questions, options)
}
async fn on_verification(&mut self, kind: VerificationKind, text: String) -> io::Result<bool> {
(self.on_verify)(kind, text)
}
async fn on_info(&mut self, text: String) -> io::Result<()> {
(self.on_info)(text)
}
async fn on_error(&mut self, kind: ErrorKind, text: &str) -> io::Result<()> {
(self.on_error)(kind, text)
}
}
impl DistantManagerClientConfig {
/// Creates a new config with prompts that return empty strings
pub fn with_empty_prompts() -> Self {
Self::with_prompts(|_| Ok("".to_string()), |_| Ok("".to_string()))
}
/// Creates a new config with two prompts
///
/// * `password_prompt` - used for prompting for a secret, and should not display what is typed
/// * `text_prompt` - used for general text, and is okay to display what is typed
pub fn with_prompts<PP, PT>(password_prompt: PP, text_prompt: PT) -> Self
where
PP: Fn(&str) -> io::Result<String> + Send + Sync + 'static,
PT: Fn(&str) -> io::Result<String> + Send + Sync + 'static,
{
Self {
on_challenge: Box::new(move |questions, _extra| {
trace!("[manager client] on_challenge({questions:?}, {_extra:?})");
let mut answers = Vec::new();
for question in questions.iter() {
// Contains all prompt lines including same line
let mut lines = question.text.split('\n').collect::<Vec<_>>();
// Line that is prompt on same line as answer
let line = lines.pop().unwrap();
// Go ahead and display all other lines
for line in lines.into_iter() {
eprintln!("{}", line);
}
// Get an answer from user input, or use a blank string as an answer
// if we fail to get input from the user
let answer = password_prompt(line).unwrap_or_default();
answers.push(answer);
}
Ok(answers)
}),
on_verify: Box::new(move |kind, text| {
trace!("[manager client] on_verify({kind}, {text})");
match kind {
VerificationKind::Host => {
eprintln!("{}", text);
let answer = text_prompt("Enter [y/N]> ")?;
trace!("Verify? Answer = '{answer}'");
Ok(matches!(answer.trim(), "y" | "Y" | "yes" | "YES"))
}
x => {
error!("Unsupported verify kind: {x}");
Ok(false)
}
}
}),
on_info: Box::new(|text| {
trace!("[manager client] on_info({text})");
println!("{}", text);
Ok(())
}),
on_error: Box::new(|kind, text| {
trace!("[manager client] on_error({kind}, {text})");
eprintln!("{}: {}", kind, text);
Ok(())
}),
}
}
}

@ -1,14 +0,0 @@
mod tcp;
pub use tcp::*;
#[cfg(unix)]
mod unix;
#[cfg(unix)]
pub use unix::*;
#[cfg(windows)]
mod windows;
#[cfg(windows)]
pub use windows::*;

@ -1,50 +0,0 @@
use crate::{DistantManagerClient, DistantManagerClientConfig};
use async_trait::async_trait;
use distant_net::common::{Codec, FramedTransport, TcpTransport};
use std::{convert, net::SocketAddr};
use tokio::{io, time::Duration};
#[async_trait]
pub trait TcpDistantManagerClientExt {
/// Connect to a remote TCP server using the provided information
async fn connect<C>(
config: DistantManagerClientConfig,
addr: SocketAddr,
codec: C,
) -> io::Result<DistantManagerClient>
where
C: Codec + Send + 'static;
/// Connect to a remote TCP server, timing out after duration has passed
async fn connect_timeout<C>(
config: DistantManagerClientConfig,
addr: SocketAddr,
codec: C,
duration: Duration,
) -> io::Result<DistantManagerClient>
where
C: Codec + Send + 'static,
{
tokio::time::timeout(duration, Self::connect(config, addr, codec))
.await
.map_err(|x| io::Error::new(io::ErrorKind::TimedOut, x))
.and_then(convert::identity)
}
}
#[async_trait]
impl TcpDistantManagerClientExt for DistantManagerClient {
/// Connect to a remote TCP server using the provided information
async fn connect<C>(
config: DistantManagerClientConfig,
addr: SocketAddr,
codec: C,
) -> io::Result<DistantManagerClient>
where
C: Codec + Send + 'static,
{
let transport = TcpTransport::connect(addr).await?;
let transport = FramedTransport::new(transport, codec);
Self::new(config, transport)
}
}

@ -1,54 +0,0 @@
use crate::{DistantManagerClient, DistantManagerClientConfig};
use async_trait::async_trait;
use distant_net::common::{Codec, FramedTransport, UnixSocketTransport};
use std::{convert, path::Path};
use tokio::{io, time::Duration};
#[async_trait]
pub trait UnixSocketDistantManagerClientExt {
/// Connect to a proxy unix socket
async fn connect<P, C>(
config: DistantManagerClientConfig,
path: P,
codec: C,
) -> io::Result<DistantManagerClient>
where
P: AsRef<Path> + Send,
C: Codec + Send + 'static;
/// Connect to a proxy unix socket, timing out after duration has passed
async fn connect_timeout<P, C>(
config: DistantManagerClientConfig,
path: P,
codec: C,
duration: Duration,
) -> io::Result<DistantManagerClient>
where
P: AsRef<Path> + Send,
C: Codec + Send + 'static,
{
tokio::time::timeout(duration, Self::connect(config, path, codec))
.await
.map_err(|x| io::Error::new(io::ErrorKind::TimedOut, x))
.and_then(convert::identity)
}
}
#[async_trait]
impl UnixSocketDistantManagerClientExt for DistantManagerClient {
/// Connect to a proxy unix socket
async fn connect<P, C>(
config: DistantManagerClientConfig,
path: P,
codec: C,
) -> io::Result<DistantManagerClient>
where
P: AsRef<Path> + Send,
C: Codec + Send + 'static,
{
let p = path.as_ref();
let transport = UnixSocketTransport::connect(p).await?;
let transport = FramedTransport::new(transport, codec);
Ok(DistantManagerClient::new(config, transport)?)
}
}

@ -1,91 +0,0 @@
use crate::{DistantManagerClient, DistantManagerClientConfig};
use async_trait::async_trait;
use distant_net::common::{Codec, FramedTransport, WindowsPipeTransport};
use std::{
convert,
ffi::{OsStr, OsString},
};
use tokio::{io, time::Duration};
#[async_trait]
pub trait WindowsPipeDistantManagerClientExt {
/// Connect to a server listening on a Windows pipe at the specified address
/// using the given codec
async fn connect<A, C>(
config: DistantManagerClientConfig,
addr: A,
codec: C,
) -> io::Result<DistantManagerClient>
where
A: AsRef<OsStr> + Send,
C: Codec + Send + 'static;
/// Connect to a server listening on a Windows pipe at the specified address
/// via `\\.\pipe\{name}` using the given codec
async fn connect_local<N, C>(
config: DistantManagerClientConfig,
name: N,
codec: C,
) -> io::Result<DistantManagerClient>
where
N: AsRef<OsStr> + Send,
C: Codec + Send + 'static,
{
let mut addr = OsString::from(r"\\.\pipe\");
addr.push(name.as_ref());
Self::connect(config, addr, codec).await
}
/// Connect to a server listening on a Windows pipe at the specified address
/// using the given codec, timing out after duration has passed
async fn connect_timeout<A, C>(
config: DistantManagerClientConfig,
addr: A,
codec: C,
duration: Duration,
) -> io::Result<DistantManagerClient>
where
A: AsRef<OsStr> + Send,
C: Codec + Send + 'static,
{
tokio::time::timeout(duration, Self::connect(config, addr, codec))
.await
.map_err(|x| io::Error::new(io::ErrorKind::TimedOut, x))
.and_then(convert::identity)
}
/// Connect to a server listening on a Windows pipe at the specified address
/// via `\\.\pipe\{name}` using the given codec, timing out after duration has passed
async fn connect_local_timeout<N, C>(
config: DistantManagerClientConfig,
name: N,
codec: C,
duration: Duration,
) -> io::Result<DistantManagerClient>
where
N: AsRef<OsStr> + Send,
C: Codec + Send + 'static,
{
let mut addr = OsString::from(r"\\.\pipe\");
addr.push(name.as_ref());
Self::connect_timeout(config, addr, codec, duration).await
}
}
#[async_trait]
impl WindowsPipeDistantManagerClientExt for DistantManagerClient {
async fn connect<A, C>(
config: DistantManagerClientConfig,
addr: A,
codec: C,
) -> io::Result<DistantManagerClient>
where
A: AsRef<OsStr> + Send,
C: Codec + Send + 'static,
{
let a = addr.as_ref();
let transport = WindowsPipeTransport::connect(a).await?;
let transport = FramedTransport::new(transport, codec);
Ok(DistantManagerClient::new(config, transport)?)
}
}

@ -1,5 +0,0 @@
/// Id associated with an active connection
pub type ConnectionId = u64;
/// Id associated with an open channel
pub type ChannelId = u64;

@ -1,14 +0,0 @@
mod tcp;
pub use tcp::*;
#[cfg(unix)]
mod unix;
#[cfg(unix)]
pub use unix::*;
#[cfg(windows)]
mod windows;
#[cfg(windows)]
pub use windows::*;

@ -1,31 +0,0 @@
use crate::{DistantManager, DistantManagerConfig};
use distant_net::{
common::{Codec, FramedTransport, MappedListener, PortRange, TcpListener},
server::TcpServerRef,
};
use std::{io, net::IpAddr};
impl DistantManager {
/// Start a new server by binding to the given IP address and one of the ports in the
/// specified range, mapping all connections to use the given codec
pub async fn start_tcp<P, C>(
config: DistantManagerConfig,
addr: IpAddr,
port: P,
codec: C,
) -> io::Result<TcpServerRef>
where
P: Into<PortRange> + Send,
C: Codec + Send + Sync + 'static,
{
let listener = TcpListener::bind(addr, port).await?;
let port = listener.port();
let listener = MappedListener::new(listener, move |transport| {
let transport = FramedTransport::new(transport, codec.clone());
transport.into_split()
});
let inner = DistantManager::start(config, listener)?;
Ok(TcpServerRef::new(addr, port, Box::new(inner)))
}
}

@ -1,51 +0,0 @@
use crate::{DistantManager, DistantManagerConfig};
use distant_net::{
common::{Codec, FramedTransport, MappedListener, UnixSocketListener},
server::UnixSocketServerRef,
};
use std::{io, path::Path};
impl DistantManager {
/// Start a new server using the specified path as a unix socket using default unix socket file
/// permissions
pub async fn start_unix_socket<P, C>(
config: DistantManagerConfig,
path: P,
codec: C,
) -> io::Result<UnixSocketServerRef>
where
P: AsRef<Path> + Send,
C: Codec + Send + Sync + 'static,
{
Self::start_unix_socket_with_permissions(
config,
path,
codec,
UnixSocketListener::default_unix_socket_file_permissions(),
)
.await
}
/// Start a new server using the specified path as a unix socket and `mode` as the unix socket
/// file permissions
pub async fn start_unix_socket_with_permissions<P, C>(
config: DistantManagerConfig,
path: P,
codec: C,
mode: u32,
) -> io::Result<UnixSocketServerRef>
where
P: AsRef<Path> + Send,
C: Codec + Send + Sync + 'static,
{
let listener = UnixSocketListener::bind_with_permissions(path, mode).await?;
let path = listener.path().to_path_buf();
let listener = MappedListener::new(listener, move |transport| {
let transport = FramedTransport::new(transport, codec.clone());
transport.into_split()
});
let inner = DistantManager::start(config, listener)?;
Ok(UnixSocketServerRef::new(path, Box::new(inner)))
}
}

@ -1,49 +0,0 @@
use crate::{DistantManager, DistantManagerConfig};
use distant_net::{
common::{Codec, FramedTransport, MappedListener, WindowsPipeListener },
server::WindowsPipeServerRef,
};
use std::{
ffi::{OsStr, OsString},
io,
};
impl DistantManager {
/// Start a new server at the specified address via `\\.\pipe\{name}` using the given codec
pub async fn start_local_named_pipe<N, C>(
config: DistantManagerConfig,
name: N,
codec: C,
) -> io::Result<WindowsPipeServerRef>
where
Self: Sized,
N: AsRef<OsStr> + Send,
C: Codec + Send + Sync + 'static,
{
let mut addr = OsString::from(r"\\.\pipe\");
addr.push(name.as_ref());
Self::start_named_pipe(config, addr, codec).await
}
/// Start a new server at the specified pipe address using the given codec
pub async fn start_named_pipe<A, C>(
config: DistantManagerConfig,
addr: A,
codec: C,
) -> io::Result<WindowsPipeServerRef>
where
A: AsRef<OsStr> + Send,
C: Codec + Send + Sync + 'static,
{
let a = addr.as_ref();
let listener = WindowsPipeListener::bind(a)?;
let addr = listener.addr().to_os_string();
let listener = MappedListener::new(listener, move |transport| {
let transport = FramedTransport::new(transport, codec.clone());
transport.into_split()
});
let inner = DistantManager::start(config, listener)?;
Ok(WindowsPipeServerRef::new(addr, Box::new(inner)))
}
}

@ -1,69 +0,0 @@
use super::{BoxedConnectHandler, BoxedLaunchHandler, ConnectHandler, LaunchHandler};
use distant_net::server::ServerRef;
use std::{collections::HashMap, io, sync::Weak};
use tokio::sync::RwLock;
/// Reference to a distant manager's server instance
pub struct DistantManagerRef {
/// Mapping of "scheme" -> handler
pub(crate) launch_handlers: Weak<RwLock<HashMap<String, BoxedLaunchHandler>>>,
/// Mapping of "scheme" -> handler
pub(crate) connect_handlers: Weak<RwLock<HashMap<String, BoxedConnectHandler>>>,
pub(crate) inner: Box<dyn ServerRef>,
}
impl DistantManagerRef {
/// Registers a new [`LaunchHandler`] for the specified scheme (e.g. "distant" or "ssh")
pub async fn register_launch_handler(
&self,
scheme: impl Into<String>,
handler: impl LaunchHandler + 'static,
) -> io::Result<()> {
let handlers = Weak::upgrade(&self.launch_handlers).ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
"Handler reference is no longer available",
)
})?;
handlers
.write()
.await
.insert(scheme.into(), Box::new(handler));
Ok(())
}
/// Registers a new [`ConnectHandler`] for the specified scheme (e.g. "distant" or "ssh")
pub async fn register_connect_handler(
&self,
scheme: impl Into<String>,
handler: impl ConnectHandler + 'static,
) -> io::Result<()> {
let handlers = Weak::upgrade(&self.connect_handlers).ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
"Handler reference is no longer available",
)
})?;
handlers
.write()
.await
.insert(scheme.into(), Box::new(handler));
Ok(())
}
}
impl ServerRef for DistantManagerRef {
fn is_finished(&self) -> bool {
self.inner.is_finished()
}
fn abort(&self) {
self.inner.abort();
}
}

@ -28,6 +28,7 @@ rmp-serde = "1.1.0"
sha2 = "0.10.2"
serde = { version = "1.0.142", features = ["derive"] }
serde_bytes = "0.11.7"
strum = { version = "0.24.1", features = ["derive"] }
tokio = { version = "1.20.1", features = ["full"] }
# Optional dependencies based on features

@ -45,7 +45,7 @@ where
U: Send + Sync + DeserializeOwned + 'static,
{
/// Spawns a client using the provided [`Connection`].
fn spawn<V>(mut connection: Connection<V>, mut strategy: ReconnectStrategy) -> Self
pub(crate) fn spawn<V>(mut connection: Connection<V>, mut strategy: ReconnectStrategy) -> Self
where
V: Transport + Send + Sync + 'static,
{

@ -1,7 +1,9 @@
mod any;
pub mod authentication;
mod connection;
mod destination;
mod listener;
mod map;
mod packet;
mod port;
mod transport;
@ -9,7 +11,9 @@ pub(crate) mod utils;
pub use any::*;
pub(crate) use connection::*;
pub use destination::*;
pub use listener::*;
pub use map::*;
pub use packet::*;
pub use port::*;
pub use transport::*;

@ -1,8 +1,9 @@
use super::msg::*;
use async_trait::async_trait;
use log::*;
use std::io;
/// Interface for a handler of authentication requests
/// Interface for a handler of authentication requests.
#[async_trait]
pub trait AuthHandler {
/// Callback when a challenge is received, returning answers to the given questions.
@ -66,3 +67,82 @@ impl AuthHandler for DummyAuthHandler {
Err(io::Error::from(io::ErrorKind::Unsupported))
}
}
/// Blocking implementation of [`AuthHandler`] that uses prompts to communicate challenge &
/// verification requests, receiving responses to relay back.
pub struct PromptAuthHandler<T, U> {
text_prompt: T,
password_prompt: U,
}
impl<T, U> PromptAuthHandler<T, U> {
pub fn new(text_prompt: T, password_prompt: U) -> Self {
Self {
text_prompt,
password_prompt,
}
}
}
#[async_trait]
impl<T, U> AuthHandler for PromptAuthHandler<T, U>
where
T: Fn(&str) -> io::Result<String> + Send + Sync + 'static,
U: Fn(&str) -> io::Result<String> + Send + Sync + 'static,
{
async fn on_challenge(&mut self, challenge: Challenge) -> io::Result<ChallengeResponse> {
trace!("on_challenge({challenge:?})");
let mut answers = Vec::new();
for question in challenge.questions.iter() {
// Contains all prompt lines including same line
let mut lines = question.text.split('\n').collect::<Vec<_>>();
// Line that is prompt on same line as answer
let line = lines.pop().unwrap();
// Go ahead and display all other lines
for line in lines.into_iter() {
eprintln!("{}", line);
}
// Get an answer from user input, or use a blank string as an answer
// if we fail to get input from the user
let answer = self.password_prompt(line).unwrap_or_default();
answers.push(answer);
}
Ok(answers)
}
async fn on_verification(
&mut self,
verification: Verification,
) -> io::Result<VerificationResponse> {
trace!("on_verify({verification:?})");
match verification.kind {
VerificationKind::Host => {
eprintln!("{}", verification.text);
let answer = self.text_prompt("Enter [y/N]> ")?;
trace!("Verify? Answer = '{answer}'");
Ok(matches!(answer.trim(), "y" | "Y" | "yes" | "YES"))
}
x => {
error!("Unsupported verify kind: {x}");
Ok(false)
}
}
}
async fn on_info(&mut self, info: Info) -> io::Result<()> {
trace!("on_info({info:?})");
println!("{}", info.text);
Ok(())
}
async fn on_error(&mut self, error: Error) -> io::Result<()> {
trace!("on_error({error:?})");
eprintln!("{}: {}", error.kind, error.text);
Ok(())
}
}

@ -1,4 +1,4 @@
use crate::serde_str::{deserialize_from_str, serialize_to_str};
use super::utils::{deserialize_from_str, serialize_to_str};
use serde::{de::Deserializer, ser::Serializer, Deserialize, Serialize};
use std::{fmt, hash::Hash, str::FromStr};
@ -38,17 +38,8 @@ pub struct Destination {
}
impl Destination {
/// Returns true if destination represents a distant server
pub fn is_distant(&self) -> bool {
self.scheme_eq("distant")
}
/// Returns true if destination represents an ssh server
pub fn is_ssh(&self) -> bool {
self.scheme_eq("ssh")
}
fn scheme_eq(&self, s: &str) -> bool {
/// Returns true if the destination's scheme represents the specified (case-insensitive).
pub fn scheme_eq(&self, s: &str) -> bool {
match self.scheme.as_ref() {
Some(scheme) => scheme.eq_ignore_ascii_case(s),
None => false,

@ -1,4 +1,4 @@
use crate::serde_str::{deserialize_from_str, serialize_to_str};
use super::{deserialize_from_str, serialize_to_str};
use derive_more::{Display, Error, From};
use serde::{de::Deserializer, ser::Serializer, Deserialize, Serialize};
use std::{

@ -1,5 +1,9 @@
use serde::{de::DeserializeOwned, Serialize};
use std::{future::Future, io, time::Duration};
use serde::{
de::{DeserializeOwned, Deserializer, Error as SerdeError, Visitor},
ser::Serializer,
Serialize,
};
use std::{fmt, future::Future, io, marker::PhantomData, str::FromStr, time::Duration};
use tokio::{sync::mpsc, task::JoinHandle};
pub fn serialize_to_vec<T: Serialize>(value: &T) -> io::Result<Vec<u8>> {
@ -20,6 +24,46 @@ pub fn deserialize_from_slice<T: DeserializeOwned>(slice: &[u8]) -> io::Result<T
})
}
/// From https://docs.rs/serde_with/1.14.0/src/serde_with/rust.rs.html#90-118
pub fn deserialize_from_str<'de, D, T>(deserializer: D) -> Result<T, D::Error>
where
D: Deserializer<'de>,
T: FromStr,
T::Err: fmt::Display,
{
struct Helper<S>(PhantomData<S>);
impl<'de, S> Visitor<'de> for Helper<S>
where
S: FromStr,
<S as FromStr>::Err: fmt::Display,
{
type Value = S;
fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(formatter, "a string")
}
fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
where
E: SerdeError,
{
value.parse::<Self::Value>().map_err(SerdeError::custom)
}
}
deserializer.deserialize_str(Helper(PhantomData))
}
/// From https://docs.rs/serde_with/1.14.0/src/serde_with/rust.rs.html#121-127
pub fn serialize_to_str<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
where
T: fmt::Display,
S: Serializer,
{
serializer.collect_str(&value)
}
pub(crate) struct Timer<T>
where
T: Send + 'static,

@ -1,6 +1,7 @@
pub mod client;
pub mod common;
pub mod server;
pub mod manager;
pub use client::{Client, ReconnectStrategy};
pub use server::Server;

@ -1,97 +1,23 @@
use super::data::{
ConnectionId, ConnectionInfo, ConnectionList, Destination, ManagerCapabilities, ManagerRequest,
ManagerResponse,
};
use crate::{DistantChannel, DistantClient, Map};
use distant_net::{
common::{FramedTransport, InmemoryTransport, Transport},
server::ServerRef,
Client,
use crate::{
client::Client,
common::{ConnectionId, Destination, Map},
manager::data::{
ConnectionInfo, ConnectionList, ManagerCapabilities, ManagerRequest, ManagerResponse,
},
};
use log::*;
use std::{
collections::HashMap,
io,
ops::{Deref, DerefMut},
};
use tokio::{sync::oneshot, task::JoinHandle};
mod config;
pub use config::*;
mod ext;
pub use ext::*;
use std::io;
/// Represents a client that can connect to a remote distant manager
pub struct DistantManagerClient {
client: Client<ManagerRequest, ManagerResponse>,
distant_clients: HashMap<ConnectionId, ClientHandle>,
}
mod channel;
pub use channel::*;
impl Drop for DistantManagerClient {
fn drop(&mut self) {
self.client.abort();
}
}
/// Represents a raw channel between a manager client and some remote server
pub struct RawDistantChannel {
pub transport: FramedTransport<InmemoryTransport>,
forward_task: JoinHandle<()>,
mailbox_task: JoinHandle<()>,
}
impl RawDistantChannel {
pub fn abort(&self) {
self.forward_task.abort();
self.mailbox_task.abort();
}
}
impl Deref for RawDistantChannel {
type Target = FramedTransport<InmemoryTransport>;
fn deref(&self) -> &Self::Target {
&self.transport
}
}
impl DerefMut for RawDistantChannel {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.transport
}
}
struct ClientHandle {
client: DistantClient,
forward_task: JoinHandle<()>,
mailbox_task: JoinHandle<()>,
}
impl Drop for ClientHandle {
fn drop(&mut self) {
self.forward_task.abort();
self.mailbox_task.abort();
}
}
impl DistantManagerClient {
/// Initializes a client using the provided [`Transport`]
pub async fn new<T>(config: DistantManagerClientConfig, transport: T) -> io::Result<Self>
where
T: Transport + 'static,
{
let transport = FramedTransport::from_client_handshake(transport).await?;
Ok(Self {
client: Client::new(transport),
distant_clients: HashMap::new(),
})
}
/// Represents a client that can connect to a remote server manager.
pub type ManagerClient = Client<ManagerRequest, ManagerResponse>;
impl ManagerClient {
/// Request that the manager launches a new server at the given `destination`
/// with `options` being passed for destination-specific details, returning the new
/// `destination` of the spawned server to connect to
/// `destination` of the spawned server.
pub async fn launch(
&mut self,
destination: impl Into<Destination>,
@ -102,7 +28,6 @@ impl DistantManagerClient {
trace!("launch({}, {})", destination, options);
let res = self
.client
.send(ManagerRequest::Launch {
destination,
options,
@ -130,7 +55,6 @@ impl DistantManagerClient {
trace!("connect({}, {})", destination, options);
let res = self
.client
.send(ManagerRequest::Connect {
destination,
options,
@ -147,127 +71,24 @@ impl DistantManagerClient {
}
/// Establishes a channel with the server represented by the `connection_id`,
/// returning a [`DistantChannel`] acting as the connection
///
/// ### Note
///
/// Multiple calls to open a channel against the same connection will result in
/// clones of the same [`DistantChannel`] rather than establishing a duplicate
/// remote connection to the same server
pub async fn open_channel(
&mut self,
connection_id: ConnectionId,
) -> io::Result<DistantChannel> {
trace!("open_channel({})", connection_id);
if let Some(handle) = self.distant_clients.get(&connection_id) {
Ok(handle.client.clone_channel())
} else {
let RawDistantChannel {
transport,
forward_task,
mailbox_task,
} = self.open_raw_channel(connection_id).await?;
let client = DistantClient::new(transport);
let channel = client.clone_channel();
self.distant_clients.insert(
connection_id,
ClientHandle {
client,
forward_task,
mailbox_task,
},
);
Ok(channel)
}
}
/// Establishes a channel with the server represented by the `connection_id`,
/// returning a [`Transport`] acting as the connection
/// returning a [`RawChannel`] acting as the connection.
///
/// ### Note
///
/// Multiple calls to open a channel against the same connection will result in establishing a
/// duplicate remote connections to the same server, so take care when using this method
/// duplicate channel to the same server, so take care when using this method.
pub async fn open_raw_channel(
&mut self,
connection_id: ConnectionId,
) -> io::Result<RawDistantChannel> {
) -> io::Result<RawChannel> {
trace!("open_raw_channel({})", connection_id);
let mut mailbox = self
.client
.mail(ManagerRequest::OpenChannel { id: connection_id })
.await?;
// Wait for the first response, which should be channel confirmation
let channel_id = match mailbox.next().await {
Some(response) => match response.payload {
ManagerResponse::ChannelOpened { id } => Ok(id),
ManagerResponse::Error(x) => Err(x.into()),
x => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("Got unexpected response: {x:?}"),
)),
},
None => Err(io::Error::new(
io::ErrorKind::ConnectionAborted,
"open_channel mailbox aborted",
)),
}?;
// Spawn reader and writer tasks to forward requests and replies
// using our opened channel
let (tx, rx, transport) = InmemoryTransport::make(1);
let (channel_close_tx, channel_close_rx) = oneshot::channel();
let mailbox_task = tokio::spawn(async move {
while let Some(response) = mailbox.next().await {
match response.payload {
ManagerResponse::Channel { data, .. } => {
if let Err(x) = tx.send(data).await {
error!("[Conn {connection_id}] {x}");
}
}
ManagerResponse::ChannelClosed { .. } => {
let _ = channel_close_tx.send(());
break;
}
_ => continue,
}
}
});
let mut manager_channel = self.client.clone_channel();
let forward_task = tokio::spawn(async move {
loop {
tokio::select! {
_ = channel_close_rx => { break }
data = rx.recv() => {
// NOTE: In this situation, we do not expect a response to this
// request (even if the server sends something back)
if let Err(x) = manager_channel
.fire(ManagerRequest::Channel {
id: channel_id,
data,
})
.await
{
error!("[Conn {connection_id}] {x}");
}
}
}
}
});
Ok(RawDistantChannel {
transport,
forward_task,
mailbox_task,
})
RawChannel::spawn(connection_id, self).await
}
/// Retrieves a list of supported capabilities
pub async fn capabilities(&mut self) -> io::Result<ManagerCapabilities> {
trace!("capabilities()");
let res = self.client.send(ManagerRequest::Capabilities).await?;
let res = self.send(ManagerRequest::Capabilities).await?;
match res.payload {
ManagerResponse::Capabilities { supported } => Ok(supported),
ManagerResponse::Error(x) => Err(x.into()),
@ -281,7 +102,7 @@ impl DistantManagerClient {
/// Retrieves information about a specific connection
pub async fn info(&mut self, id: ConnectionId) -> io::Result<ConnectionInfo> {
trace!("info({})", id);
let res = self.client.send(ManagerRequest::Info { id }).await?;
let res = self.send(ManagerRequest::Info { id }).await?;
match res.payload {
ManagerResponse::Info(info) => Ok(info),
ManagerResponse::Error(x) => Err(x.into()),
@ -295,7 +116,7 @@ impl DistantManagerClient {
/// Kills the specified connection
pub async fn kill(&mut self, id: ConnectionId) -> io::Result<()> {
trace!("kill({})", id);
let res = self.client.send(ManagerRequest::Kill { id }).await?;
let res = self.send(ManagerRequest::Kill { id }).await?;
match res.payload {
ManagerResponse::Killed => Ok(()),
ManagerResponse::Error(x) => Err(x.into()),
@ -309,7 +130,7 @@ impl DistantManagerClient {
/// Retrieves a list of active connections
pub async fn list(&mut self) -> io::Result<ConnectionList> {
trace!("list()");
let res = self.client.send(ManagerRequest::List).await?;
let res = self.send(ManagerRequest::List).await?;
match res.payload {
ManagerResponse::List(list) => Ok(list),
ManagerResponse::Error(x) => Err(x.into()),
@ -319,33 +140,20 @@ impl DistantManagerClient {
)),
}
}
/// Requests that the manager shuts down
pub async fn shutdown(&mut self) -> io::Result<()> {
trace!("shutdown()");
let res = self.client.send(ManagerRequest::Shutdown).await?;
match res.payload {
ManagerResponse::Shutdown => Ok(()),
ManagerResponse::Error(x) => Err(x.into()),
x => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("Got unexpected response: {:?}", x),
)),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::data::{Error, ErrorKind};
use distant_net::common::{Request, Response};
use crate::{
common::{Request, Response},
manager::data::{Error, ErrorKind},
};
fn setup() -> (DistantManagerClient, FramedTransport<InmemoryTransport>) {
fn setup() -> (ManagerClient, FramedTransport<InmemoryTransport>) {
let (t1, t2) = FramedTransport::test_pair(100);
let client =
DistantManagerClient::new(DistantManagerClientConfig::with_empty_prompts(), t1)
.unwrap();
ManagerClient::new(DistantManagerClientConfig::with_empty_prompts(), t1).unwrap();
(client, t2)
}
@ -668,73 +476,4 @@ mod tests {
client.kill(123).await.unwrap();
}
#[tokio::test]
async fn shutdown_should_report_error_if_receives_error_response() {
let (mut client, mut transport) = setup();
tokio::spawn(async move {
let request = transport
.read::<Request<ManagerRequest>>()
.await
.unwrap()
.unwrap();
transport
.write(Response::new(
request.id,
ManagerResponse::Connected { id: 0 },
))
.await
.unwrap();
});
let err = client.shutdown().await.unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::InvalidData);
}
#[tokio::test]
async fn shutdown_should_report_error_if_receives_unexpected_response() {
let (mut client, mut transport) = setup();
tokio::spawn(async move {
let request = transport
.read::<Request<ManagerRequest>>()
.await
.unwrap()
.unwrap();
transport
.write(Response::new(
request.id,
ManagerResponse::Error(test_error()),
))
.await
.unwrap();
});
let err = client.shutdown().await.unwrap_err();
assert_eq!(err.kind(), test_io_error().kind());
assert_eq!(err.to_string(), test_io_error().to_string());
}
#[tokio::test]
async fn shutdown_should_return_success_from_successful_response() {
let (mut client, mut transport) = setup();
tokio::spawn(async move {
let request = transport
.read::<Request<ManagerRequest>>()
.await
.unwrap()
.unwrap();
transport
.write(Response::new(request.id, ManagerResponse::Shutdown))
.await
.unwrap();
});
client.shutdown().await.unwrap();
}
}

@ -0,0 +1,143 @@
use crate::{
client::{Client, ReconnectStrategy},
common::{authentication::AuthHandler, Connection, ConnectionId, InmemoryTransport},
manager::data::{ManagerRequest, ManagerResponse},
};
use log::*;
use serde::{de::DeserializeOwned, Serialize};
use std::{
io,
ops::{Deref, DerefMut},
};
use tokio::{sync::oneshot, task::JoinHandle};
/// Represents a raw channel between a manager client and server. Underneath, this routes incoming
/// and outgoing data from a proxied server to an inmemory transport.
pub struct RawChannel {
transport: InmemoryTransport,
forward_task: JoinHandle<()>,
mailbox_task: JoinHandle<()>,
}
impl RawChannel {
pub fn abort(&self) {
self.forward_task.abort();
self.mailbox_task.abort();
}
/// Consumes this channel, returning a typed client wrapping the transport.
///
/// ### Note
///
/// This will perform necessary handshakes and authentication (via `handler`) with the server.
///
/// Because the underlying transport maps to the same, singular connection with the manager
/// of servers, the reconnect strategy is set up to fail immediately as the actual reconnect
/// logic is handled by the primary client connection with the manager, not the connection
/// with a proxied server.
pub async fn spawn_client<T, U>(
self,
handler: impl AuthHandler + Send,
) -> io::Result<Client<T, U>>
where
T: Send + Sync + Serialize + 'static,
U: Send + Sync + DeserializeOwned + 'static,
{
let connection = Connection::client(self.transport, handler).await?;
Ok(Client::spawn(connection, ReconnectStrategy::Fail))
}
}
impl Deref for RawChannel {
type Target = InmemoryTransport;
fn deref(&self) -> &Self::Target {
&self.transport
}
}
impl DerefMut for RawChannel {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.transport
}
}
impl RawChannel {
pub(super) async fn spawn(
connection_id: ConnectionId,
client: &mut Client<ManagerRequest, ManagerResponse>,
) -> io::Result<Self> {
let mut mailbox = client
.mail(ManagerRequest::OpenChannel { id: connection_id })
.await?;
// Wait for the first response, which should be channel confirmation
let channel_id = match mailbox.next().await {
Some(response) => match response.payload {
ManagerResponse::ChannelOpened { id } => Ok(id),
ManagerResponse::Error(x) => Err(x.into()),
x => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("[Conn {connection_id}] Raw channel open unexpected response: {x:?}"),
)),
},
None => Err(io::Error::new(
io::ErrorKind::ConnectionAborted,
format!("[Conn {connection_id}] Raw channel mailbox aborted"),
)),
}?;
// Spawn our channel proxy transport
let (tx, rx, transport) = InmemoryTransport::make(1);
let (channel_close_tx, channel_close_rx) = oneshot::channel();
let mailbox_task = tokio::spawn(async move {
while let Some(response) = mailbox.next().await {
match response.payload {
ManagerResponse::Channel { data, .. } => {
if let Err(x) = tx.send(data).await {
error!("[Conn {connection_id} :: Chan {channel_id}] {x}");
}
}
ManagerResponse::ChannelClosed { .. } => {
let _ = channel_close_tx.send(());
break;
}
_ => continue,
}
}
});
let mut manager_channel = client.clone_channel();
let forward_task = tokio::spawn(async move {
loop {
tokio::select! {
_ = channel_close_rx => { break }
data = rx.recv() => {
match data {
Some(data) => {
// NOTE: In this situation, we do not expect a response to this
// request (even if the server sends something back)
if let Err(x) = manager_channel
.fire(ManagerRequest::Channel {
id: channel_id,
data,
})
.await
{
error!("[Conn {connection_id} :: Chan {channel_id}] {x}");
}
}
None => break,
}
}
}
}
});
Ok(RawChannel {
transport,
forward_task,
mailbox_task,
})
}
}

@ -1,12 +1,6 @@
mod capabilities;
pub use capabilities::*;
mod destination;
pub use destination::*;
mod id;
pub use id::*;
mod info;
pub use info::*;

@ -1,5 +1,4 @@
use super::{ConnectionId, Destination};
use crate::data::Map;
use crate::common::{ConnectionId, Destination, Map};
use serde::{Deserialize, Serialize};
/// Information about a specific connection

@ -1,4 +1,4 @@
use super::{ConnectionId, Destination};
use crate::common::{ConnectionId, Destination};
use derive_more::IntoIterator;
use serde::{Deserialize, Serialize};
use std::{

@ -1,5 +1,4 @@
use super::{ChannelId, ConnectionId, Destination};
use crate::Map;
use crate::common::{ConnectionId, Destination, Map};
use derive_more::IsVariant;
use serde::{Deserialize, Serialize};
use strum::{AsRefStr, EnumDiscriminants, EnumIter, EnumMessage, EnumString};
@ -33,7 +32,7 @@ pub enum ManagerRequest {
Capabilities,
/// Launch a server using the manager
#[strum_discriminants(strum(message = "Supports launching distant on remote servers"))]
#[strum_discriminants(strum(message = "Supports launching a server on remote machines"))]
Launch {
// NOTE: Boxed per clippy's large_enum_variant warning
destination: Box<Destination>,
@ -94,8 +93,4 @@ pub enum ManagerRequest {
/// Retrieve list of connections being managed
#[strum_discriminants(strum(message = "Supports retrieving a list of managed connections"))]
List,
/// Signals the manager to shutdown
#[strum_discriminants(strum(message = "Supports being shut down on demand"))]
Shutdown,
}

@ -1,5 +1,5 @@
use crate::{data::Error, ConnectionInfo, ConnectionList, Destination, ManagerCapabilities};
use crate::{ChannelId, ConnectionId};
use super::{ConnectionInfo, ConnectionList, Error, ManagerCapabilities};
use crate::common::{ConnectionId, Destination};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
@ -8,16 +8,13 @@ pub enum ManagerResponse {
/// Acknowledgement that a connection was killed
Killed,
/// Broadcast that the manager is shutting down (not guaranteed to be sent)
Shutdown,
/// Indicates that some error occurred during a request
Error(Error),
/// Response to retrieving information about the manager's capabilities
Capabilities { supported: ManagerCapabilities },
/// Confirmation of a distant server being launched
/// Confirmation of a server being launched
Launched {
/// Updated location of the spawned server
destination: Destination,

@ -1,13 +1,13 @@
use crate::{
ChannelId, ConnectionId, ConnectionInfo, ConnectionList, Destination, ManagerCapabilities,
ManagerRequest, ManagerResponse, Map,
};
use async_trait::async_trait;
use distant_net::{
common::{Listener, MpscListener, Request, Response},
client::Client,
common::{Listener, Map, MpscListener, Request, Response},
manager::{
ChannelId, ConnectionId, ConnectionInfo, ConnectionList, Destination, ManagerCapabilities,
ManagerRequest, ManagerResponse,
},
server::{Server, ServerCtx, ServerHandler},
Client,
};
use async_trait::async_trait;
use log::*;
use std::{collections::HashMap, io, sync::Arc};
use tokio::{
@ -21,97 +21,26 @@ pub use config::*;
mod connection;
pub use connection::*;
mod ext;
pub use ext::*;
mod handler;
pub use handler::*;
mod r#ref;
pub use r#ref::*;
/// Represents a manager of multiple distant server connections
pub struct DistantManager {
/// Represents a manager of multiple server connections.
pub struct ManagerServer {
/// Configuration settings for the server
config: DistantManagerConfig,
config: Config,
/// Mapping of connection id -> connection
connections: RwLock<HashMap<ConnectionId, DistantManagerConnection>>,
/// Handlers for launch requests
launch_handlers: Arc<RwLock<HashMap<String, BoxedLaunchHandler>>>,
/// Handlers for connect requests
connect_handlers: Arc<RwLock<HashMap<String, BoxedConnectHandler>>>,
/// Primary task of server
task: JoinHandle<()>,
connections: RwLock<HashMap<ConnectionId, ManagerConnection>>,
}
impl DistantManager {
/// Initializes a new instance of [`DistantManagerServer`] using the provided [`UntypedTransport`]
pub fn start<L, T>(
mut config: DistantManagerConfig,
mut listener: L,
) -> io::Result<DistantManagerRef>
where
L: Listener<Output = T> + 'static,
T: IntoSplit + Send + 'static,
T::Read: UntypedTransportRead + 'static,
T::Write: UntypedTransportWrite + 'static,
{
let (conn_tx, mpsc_listener) = MpscListener::channel(config.connection_buffer_size);
let (auth_client_tx, auth_client_rx) = mpsc::channel(1);
// Spawn task that uses our input listener to get both auth and manager events,
// forwarding manager events to the internal mpsc listener
let task = tokio::spawn(async move {
while let Ok(transport) = listener.accept().await {
let DistantManagerRouter {
auth_transport,
manager_transport,
..
} = DistantManagerRouter::new(transport);
let (writer, reader) = auth_transport.into_split();
let client = match Client::new(writer, reader) {
Ok(client) => client,
Err(x) => {
error!("Creating auth client failed: {}", x);
continue;
}
};
let auth_client = AuthClient::from(client);
// Forward auth client for new connection in server
if auth_client_tx.send(auth_client).await.is_err() {
break;
}
// Forward connected and routed transport to server
if conn_tx.send(manager_transport.into_split()).await.is_err() {
break;
}
}
});
let launch_handlers = Arc::new(RwLock::new(config.launch_handlers.drain().collect()));
let weak_launch_handlers = Arc::downgrade(&launch_handlers);
let connect_handlers = Arc::new(RwLock::new(config.connect_handlers.drain().collect()));
let weak_connect_handlers = Arc::downgrade(&connect_handlers);
let server_ref = Self {
impl ManagerServer {
/// Creates a new [`Server`] starting with a default configuration and no authentication
/// methods. The provided `config` will be used to configure the launch and connect handlers
/// for the server as well as provide other defaults.
pub fn new(mut config: Config) -> Server<Self> {
Server::new().handler(Self {
config,
launch_handlers,
connect_handlers,
connections: RwLock::new(HashMap::new()),
task,
}
.start(mpsc_listener)?;
Ok(DistantManagerRef {
launch_handlers: weak_launch_handlers,
connect_handlers: weak_connect_handlers,
inner: server_ref,
})
}
@ -148,8 +77,7 @@ impl DistantManager {
.to_lowercase();
let credentials = {
let lock = self.launch_handlers.read().await;
let handler = lock.get(&scheme).ok_or_else(|| {
let handler = self.config.launch_handlers.get(&scheme).ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
format!("No launch handler registered for {}", scheme),
@ -193,8 +121,7 @@ impl DistantManager {
.to_lowercase();
let transport = {
let lock = self.connect_handlers.read().await;
let handler = lock.get(&scheme).ok_or_else(|| {
let handler = self.config.connect_handlers.get(&scheme).ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
format!("No connect handler registered for {}", scheme),
@ -203,7 +130,7 @@ impl DistantManager {
handler.connect(&destination, &options, auth).await?
};
let connection = DistantManagerConnection::new(destination, options, transport);
let connection = ManagerConnection::new(destination, options, transport);
let id = connection.id;
self.connections.write().await.insert(id, connection);
Ok(id)
@ -257,30 +184,15 @@ impl DistantManager {
pub struct DistantManagerServerConnection {
/// Holds on to open channels feeding data back from a server to some connected client,
/// enabling us to cancel the tasks on demand
channels: RwLock<HashMap<ChannelId, DistantManagerChannel>>,
channels: RwLock<HashMap<ChannelId, ManagerChannel>>,
}
#[async_trait]
impl ServerHandler for DistantManager {
impl ServerHandler for ManagerServer {
type Request = ManagerRequest;
type Response = ManagerResponse;
type LocalData = DistantManagerServerConnection;
async fn on_accept(&self, local_data: &mut Self::LocalData) {
local_data.auth_client = self
.auth_client_rx
.lock()
.await
.recv()
.await
.map(Mutex::new);
// Enable jit handshake
if let Some(auth_client) = local_data.auth_client.as_ref() {
auth_client.lock().await.set_jit_handshake(true);
}
}
async fn on_request(&self, ctx: ServerCtx<Self::Request, Self::Response, Self::LocalData>) {
let ServerCtx {
connection_id,
@ -386,22 +298,6 @@ impl ServerHandler for DistantManager {
Ok(()) => ManagerResponse::Killed,
Err(x) => ManagerResponse::Error(x.into()),
},
ManagerRequest::Shutdown => {
if let Err(x) = reply.send(ManagerResponse::Shutdown).await {
error!("[Conn {}] {}", connection_id, x);
}
// Clear out handler state in order to trigger drops
self.launch_handlers.write().await.clear();
self.connect_handlers.write().await.clear();
// Shutdown the primary server task
self.task.abort();
// TODO: Perform a graceful shutdown instead of this?
// Review https://tokio.rs/tokio/topics/shutdown
std::process::exit(0);
}
};
if let Err(x) = reply.send(response).await {
@ -413,14 +309,14 @@ impl ServerHandler for DistantManager {
#[cfg(test)]
mod tests {
use super::*;
use distant_net::{
use crate::{
common::{FramedTransport, InmemoryTransport, MappedListener, OneshotListener, PlainCodec},
server::ServerRef,
};
/// Create a new server, bypassing the start loop
fn setup() -> DistantManager {
DistantManager {
fn setup() -> ManagerServer {
ManagerServer {
config: Default::default(),
connections: RwLock::new(HashMap::new()),
launch_handlers: Arc::new(RwLock::new(HashMap::new())),
@ -612,7 +508,7 @@ mod tests {
let server = setup();
let (writer, reader) = dummy_distant_writer_reader();
let connection = DistantManagerConnection::new(
let connection = ManagerConnection::new(
"scheme://host".parse().unwrap(),
"key=value".parse().unwrap(),
writer,
@ -645,7 +541,7 @@ mod tests {
let server = setup();
let (writer, reader) = dummy_distant_writer_reader();
let connection = DistantManagerConnection::new(
let connection = ManagerConnection::new(
"scheme://host".parse().unwrap(),
"key=value".parse().unwrap(),
writer,
@ -655,7 +551,7 @@ mod tests {
server.connections.write().await.insert(id_1, connection);
let (writer, reader) = dummy_distant_writer_reader();
let connection = DistantManagerConnection::new(
let connection = ManagerConnection::new(
"other://host2".parse().unwrap(),
"key=value".parse().unwrap(),
writer,
@ -688,7 +584,7 @@ mod tests {
let server = setup();
let (writer, reader) = dummy_distant_writer_reader();
let connection = DistantManagerConnection::new(
let connection = ManagerConnection::new(
"scheme://host".parse().unwrap(),
"key=value".parse().unwrap(),
writer,

@ -1,7 +1,8 @@
use crate::{BoxedConnectHandler, BoxedLaunchHandler};
use super::{BoxedConnectHandler, BoxedLaunchHandler};
use std::collections::HashMap;
pub struct DistantManagerConfig {
/// Configuration settings for a manager.
pub struct Config {
/// Scheme to use when none is provided in a destination for launch
pub launch_fallback_scheme: String,
@ -21,7 +22,7 @@ pub struct DistantManagerConfig {
pub connect_handlers: HashMap<String, BoxedConnectHandler>,
}
impl Default for DistantManagerConfig {
impl Default for Config {
fn default() -> Self {
Self {
// Default to using ssh to launch distant

@ -1,18 +1,18 @@
use crate::{
data::Map,
manager::data::{ChannelId, ConnectionId, Destination},
DistantMsg, DistantRequestData, DistantResponseData, ManagerResponse,
};
use distant_net::{
common::{FramedTransport, Interest, Request, Transport},
common::{
ConnectionId, Destination, FramedTransport, Interest, Map, Request, Transport,
UntypedRequest, UntypedResponse,
},
manager::data::ManagerResponse,
server::{ServerRef, ServerReply},
};
use log::*;
use serde::Serialize;
use std::{collections::HashMap, io, time::Duration};
use tokio::{sync::mpsc, task::JoinHandle};
/// Represents a connection a distant manager has with some distant-compatible server
pub struct DistantManagerConnection {
pub struct ManagerConnection {
pub id: ConnectionId,
pub destination: Destination,
pub options: Map,
@ -22,17 +22,17 @@ pub struct DistantManagerConnection {
}
#[derive(Clone)]
pub struct DistantManagerChannel {
pub struct ManagerChannel {
channel_id: ChannelId,
tx: mpsc::Sender<Action>,
}
impl DistantManagerChannel {
impl ManagerChannel {
pub fn id(&self) -> ChannelId {
self.channel_id
}
pub async fn send(&self, request: Request<DistantMsg<DistantRequestData>>) -> io::Result<()> {
pub async fn send<T: Serialize>(&self, request: Request<T>) -> io::Result<()> {
let channel_id = self.channel_id;
self.tx
.send(Action::Write {
@ -62,7 +62,7 @@ impl DistantManagerChannel {
}
}
impl DistantManagerConnection {
impl ManagerConnection {
pub fn new<T: Transport>(
destination: Destination,
options: Map,
@ -84,7 +84,7 @@ impl DistantManagerConnection {
pub async fn open_channel(
&self,
reply: ServerReply<ManagerResponse>,
) -> io::Result<DistantManagerChannel> {
) -> io::Result<ManagerChannel> {
let channel_id = rand::random();
self.tx
.send(Action::Register {
@ -98,14 +98,14 @@ impl DistantManagerConnection {
format!("open_channel failed: {x}"),
)
})?;
Ok(DistantManagerChannel {
Ok(ManagerChannel {
channel_id,
tx: self.tx.clone(),
})
}
}
impl Drop for DistantManagerConnection {
impl Drop for ManagerConnection {
fn drop(&mut self) {
self.transport_task.abort();
self.action_task.abort();

@ -1,13 +1,11 @@
use crate::{data::Map, manager::data::Destination};
use crate::common::{authentication::Authenticator, Destination, FramedTransport, Map, Transport};
use async_trait::async_trait;
use distant_net::common::{authentication::Authenticator, FramedTransport, Transport};
use std::{future::Future, io};
pub type BoxedLaunchHandler = Box<dyn LaunchHandler>;
pub type BoxedConnectHandler = Box<dyn ConnectHandler>;
/// Represents an interface to start a server at some remote `destination` and then connect to the
/// started server.
/// Represents an interface to start a server at some remote `destination`.
///
/// * `destination` is the location where the server will be started.
/// * `options` is provided to include extra information needed to launch or establish the
Loading…
Cancel
Save