Comment out auth skip code and remove local data from server

unused/TracingSupport
Chip Senkbeil 11 months ago
parent 2d9363fe6a
commit 29be68c49d
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -4,7 +4,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use distant_net::common::ConnectionId;
use distant_net::server::{ConnectionCtx, Reply, ServerCtx, ServerHandler};
use distant_net::server::{Reply, RequestCtx, ServerHandler};
use log::*;
use crate::protocol::{
@ -16,23 +16,22 @@ mod reply;
use reply::DistantSingleReply;
/// Represents the context provided to the [`DistantApi`] for incoming requests
pub struct DistantCtx<T> {
pub struct DistantCtx {
pub connection_id: ConnectionId,
pub reply: Box<dyn Reply<Data = protocol::Response>>,
pub local_data: Arc<T>,
}
/// Represents a [`ServerHandler`] that leverages an API compliant with `distant`
pub struct DistantApiServerHandler<T, D>
pub struct DistantApiServerHandler<T>
where
T: DistantApi<LocalData = D>,
T: DistantApi,
{
api: Arc<T>,
}
impl<T, D> DistantApiServerHandler<T, D>
impl<T> DistantApiServerHandler<T>
where
T: DistantApi<LocalData = D>,
T: DistantApi,
{
pub fn new(api: T) -> Self {
Self { api: Arc::new(api) }
@ -51,12 +50,15 @@ fn unsupported<T>(label: &str) -> io::Result<T> {
/// which can be used to build other servers that are compatible with distant
#[async_trait]
pub trait DistantApi {
type LocalData: Send + Sync;
/// Invoked whenever a new connection is established.
#[allow(unused_variables)]
async fn on_connect(&self, id: ConnectionId) -> io::Result<()> {
Ok(())
}
/// Invoked whenever a new connection is established, providing a mutable reference to the
/// newly-created local data. This is a way to support modifying local data before it is used.
/// Invoked whenever an existing connection is dropped.
#[allow(unused_variables)]
async fn on_accept(&self, ctx: ConnectionCtx<'_, Self::LocalData>) -> io::Result<()> {
async fn on_disconnect(&self, id: ConnectionId) -> io::Result<()> {
Ok(())
}
@ -64,7 +66,7 @@ pub trait DistantApi {
///
/// *Override this, otherwise it will return "unsupported" as an error.*
#[allow(unused_variables)]
async fn version(&self, ctx: DistantCtx<Self::LocalData>) -> io::Result<Version> {
async fn version(&self, ctx: DistantCtx) -> io::Result<Version> {
unsupported("version")
}
@ -74,11 +76,7 @@ pub trait DistantApi {
///
/// *Override this, otherwise it will return "unsupported" as an error.*
#[allow(unused_variables)]
async fn read_file(
&self,
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
) -> io::Result<Vec<u8>> {
async fn read_file(&self, ctx: DistantCtx, path: PathBuf) -> io::Result<Vec<u8>> {
unsupported("read_file")
}
@ -88,11 +86,7 @@ pub trait DistantApi {
///
/// *Override this, otherwise it will return "unsupported" as an error.*
#[allow(unused_variables)]
async fn read_file_text(
&self,
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
) -> io::Result<String> {
async fn read_file_text(&self, ctx: DistantCtx, path: PathBuf) -> io::Result<String> {
unsupported("read_file_text")
}
@ -103,12 +97,7 @@ pub trait DistantApi {
///
/// *Override this, otherwise it will return "unsupported" as an error.*
#[allow(unused_variables)]
async fn write_file(
&self,
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
data: Vec<u8>,
) -> io::Result<()> {
async fn write_file(&self, ctx: DistantCtx, path: PathBuf, data: Vec<u8>) -> io::Result<()> {
unsupported("write_file")
}
@ -121,7 +110,7 @@ pub trait DistantApi {
#[allow(unused_variables)]
async fn write_file_text(
&self,
ctx: DistantCtx<Self::LocalData>,
ctx: DistantCtx,
path: PathBuf,
data: String,
) -> io::Result<()> {
@ -135,12 +124,7 @@ pub trait DistantApi {
///
/// *Override this, otherwise it will return "unsupported" as an error.*
#[allow(unused_variables)]
async fn append_file(
&self,
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
data: Vec<u8>,
) -> io::Result<()> {
async fn append_file(&self, ctx: DistantCtx, path: PathBuf, data: Vec<u8>) -> io::Result<()> {
unsupported("append_file")
}
@ -153,7 +137,7 @@ pub trait DistantApi {
#[allow(unused_variables)]
async fn append_file_text(
&self,
ctx: DistantCtx<Self::LocalData>,
ctx: DistantCtx,
path: PathBuf,
data: String,
) -> io::Result<()> {
@ -172,7 +156,7 @@ pub trait DistantApi {
#[allow(unused_variables)]
async fn read_dir(
&self,
ctx: DistantCtx<Self::LocalData>,
ctx: DistantCtx,
path: PathBuf,
depth: usize,
absolute: bool,
@ -189,12 +173,7 @@ pub trait DistantApi {
///
/// *Override this, otherwise it will return "unsupported" as an error.*
#[allow(unused_variables)]
async fn create_dir(
&self,
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
all: bool,
) -> io::Result<()> {
async fn create_dir(&self, ctx: DistantCtx, path: PathBuf, all: bool) -> io::Result<()> {
unsupported("create_dir")
}
@ -205,12 +184,7 @@ pub trait DistantApi {
///
/// *Override this, otherwise it will return "unsupported" as an error.*
#[allow(unused_variables)]
async fn copy(
&self,
ctx: DistantCtx<Self::LocalData>,
src: PathBuf,
dst: PathBuf,
) -> io::Result<()> {
async fn copy(&self, ctx: DistantCtx, src: PathBuf, dst: PathBuf) -> io::Result<()> {
unsupported("copy")
}
@ -221,12 +195,7 @@ pub trait DistantApi {
///
/// *Override this, otherwise it will return "unsupported" as an error.*
#[allow(unused_variables)]
async fn remove(
&self,
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
force: bool,
) -> io::Result<()> {
async fn remove(&self, ctx: DistantCtx, path: PathBuf, force: bool) -> io::Result<()> {
unsupported("remove")
}
@ -237,12 +206,7 @@ pub trait DistantApi {
///
/// *Override this, otherwise it will return "unsupported" as an error.*
#[allow(unused_variables)]
async fn rename(
&self,
ctx: DistantCtx<Self::LocalData>,
src: PathBuf,
dst: PathBuf,
) -> io::Result<()> {
async fn rename(&self, ctx: DistantCtx, src: PathBuf, dst: PathBuf) -> io::Result<()> {
unsupported("rename")
}
@ -257,7 +221,7 @@ pub trait DistantApi {
#[allow(unused_variables)]
async fn watch(
&self,
ctx: DistantCtx<Self::LocalData>,
ctx: DistantCtx,
path: PathBuf,
recursive: bool,
only: Vec<ChangeKind>,
@ -272,7 +236,7 @@ pub trait DistantApi {
///
/// *Override this, otherwise it will return "unsupported" as an error.*
#[allow(unused_variables)]
async fn unwatch(&self, ctx: DistantCtx<Self::LocalData>, path: PathBuf) -> io::Result<()> {
async fn unwatch(&self, ctx: DistantCtx, path: PathBuf) -> io::Result<()> {
unsupported("unwatch")
}
@ -282,7 +246,7 @@ pub trait DistantApi {
///
/// *Override this, otherwise it will return "unsupported" as an error.*
#[allow(unused_variables)]
async fn exists(&self, ctx: DistantCtx<Self::LocalData>, path: PathBuf) -> io::Result<bool> {
async fn exists(&self, ctx: DistantCtx, path: PathBuf) -> io::Result<bool> {
unsupported("exists")
}
@ -296,7 +260,7 @@ pub trait DistantApi {
#[allow(unused_variables)]
async fn metadata(
&self,
ctx: DistantCtx<Self::LocalData>,
ctx: DistantCtx,
path: PathBuf,
canonicalize: bool,
resolve_file_type: bool,
@ -314,7 +278,7 @@ pub trait DistantApi {
#[allow(unused_variables)]
async fn set_permissions(
&self,
ctx: DistantCtx<Self::LocalData>,
ctx: DistantCtx,
path: PathBuf,
permissions: Permissions,
options: SetPermissionsOptions,
@ -328,11 +292,7 @@ pub trait DistantApi {
///
/// *Override this, otherwise it will return "unsupported" as an error.*
#[allow(unused_variables)]
async fn search(
&self,
ctx: DistantCtx<Self::LocalData>,
query: SearchQuery,
) -> io::Result<SearchId> {
async fn search(&self, ctx: DistantCtx, query: SearchQuery) -> io::Result<SearchId> {
unsupported("search")
}
@ -342,11 +302,7 @@ pub trait DistantApi {
///
/// *Override this, otherwise it will return "unsupported" as an error.*
#[allow(unused_variables)]
async fn cancel_search(
&self,
ctx: DistantCtx<Self::LocalData>,
id: SearchId,
) -> io::Result<()> {
async fn cancel_search(&self, ctx: DistantCtx, id: SearchId) -> io::Result<()> {
unsupported("cancel_search")
}
@ -361,7 +317,7 @@ pub trait DistantApi {
#[allow(unused_variables)]
async fn proc_spawn(
&self,
ctx: DistantCtx<Self::LocalData>,
ctx: DistantCtx,
cmd: String,
environment: Environment,
current_dir: Option<PathBuf>,
@ -376,7 +332,7 @@ pub trait DistantApi {
///
/// *Override this, otherwise it will return "unsupported" as an error.*
#[allow(unused_variables)]
async fn proc_kill(&self, ctx: DistantCtx<Self::LocalData>, id: ProcessId) -> io::Result<()> {
async fn proc_kill(&self, ctx: DistantCtx, id: ProcessId) -> io::Result<()> {
unsupported("proc_kill")
}
@ -387,12 +343,7 @@ pub trait DistantApi {
///
/// *Override this, otherwise it will return "unsupported" as an error.*
#[allow(unused_variables)]
async fn proc_stdin(
&self,
ctx: DistantCtx<Self::LocalData>,
id: ProcessId,
data: Vec<u8>,
) -> io::Result<()> {
async fn proc_stdin(&self, ctx: DistantCtx, id: ProcessId, data: Vec<u8>) -> io::Result<()> {
unsupported("proc_stdin")
}
@ -405,7 +356,7 @@ pub trait DistantApi {
#[allow(unused_variables)]
async fn proc_resize_pty(
&self,
ctx: DistantCtx<Self::LocalData>,
ctx: DistantCtx,
id: ProcessId,
size: PtySize,
) -> io::Result<()> {
@ -416,32 +367,34 @@ pub trait DistantApi {
///
/// *Override this, otherwise it will return "unsupported" as an error.*
#[allow(unused_variables)]
async fn system_info(&self, ctx: DistantCtx<Self::LocalData>) -> io::Result<SystemInfo> {
async fn system_info(&self, ctx: DistantCtx) -> io::Result<SystemInfo> {
unsupported("system_info")
}
}
#[async_trait]
impl<T, D> ServerHandler for DistantApiServerHandler<T, D>
impl<T> ServerHandler for DistantApiServerHandler<T>
where
T: DistantApi<LocalData = D> + Send + Sync + 'static,
D: Send + Sync + 'static,
T: DistantApi + Send + Sync + 'static,
{
type LocalData = D;
type Request = protocol::Msg<protocol::Request>;
type Response = protocol::Msg<protocol::Response>;
/// Overridden to leverage [`DistantApi`] implementation of `on_accept`
async fn on_accept(&self, ctx: ConnectionCtx<'_, Self::LocalData>) -> io::Result<()> {
T::on_accept(&self.api, ctx).await
/// Overridden to leverage [`DistantApi`] implementation of `on_connect`.
async fn on_connect(&self, id: ConnectionId) -> io::Result<()> {
T::on_connect(&self.api, id).await
}
/// Overridden to leverage [`DistantApi`] implementation of `on_disconnect`.
async fn on_disconnect(&self, id: ConnectionId) -> io::Result<()> {
T::on_disconnect(&self.api, id).await
}
async fn on_request(&self, ctx: ServerCtx<Self::Request, Self::Response, Self::LocalData>) {
let ServerCtx {
async fn on_request(&self, ctx: RequestCtx<Self::Request, Self::Response>) {
let RequestCtx {
connection_id,
request,
reply,
local_data,
} = ctx;
// Convert our reply to a queued reply so we can ensure that the result
@ -454,7 +407,6 @@ where
let ctx = DistantCtx {
connection_id,
reply: Box::new(DistantSingleReply::from(reply.clone_reply())),
local_data,
};
let data = handle_request(Arc::clone(&self.api), ctx, data).await;
@ -485,7 +437,6 @@ where
let ctx = DistantCtx {
connection_id,
reply: Box::new(DistantSingleReply::from(reply.clone_reply())),
local_data: Arc::clone(&local_data),
};
let data = handle_request(Arc::clone(&self.api), ctx, data).await;
@ -513,7 +464,6 @@ where
let ctx = DistantCtx {
connection_id,
reply: Box::new(DistantSingleReply::from(reply.clone_reply())),
local_data: Arc::clone(&local_data),
};
let task = tokio::spawn(async move {
@ -560,14 +510,13 @@ where
}
/// Processes an incoming request
async fn handle_request<T, D>(
async fn handle_request<T>(
api: Arc<T>,
ctx: DistantCtx<D>,
ctx: DistantCtx,
request: protocol::Request,
) -> protocol::Response
where
T: DistantApi<LocalData = D> + Send + Sync,
D: Send + Sync,
T: DistantApi + Send + Sync,
{
match request {
protocol::Request::Version {} => api

@ -11,9 +11,7 @@ use distant_net::common::{InmemoryTransport, OneshotListener};
use distant_net::server::{Server, ServerRef};
/// Stands up an inmemory client and server using the given api.
async fn setup(
api: impl DistantApi<LocalData = ()> + Send + Sync + 'static,
) -> (DistantClient, ServerRef) {
async fn setup(api: impl DistantApi + Send + Sync + 'static) -> (DistantClient, ServerRef) {
let (t1, t2) = InmemoryTransport::pair(100);
let server = Server::new()
@ -42,13 +40,7 @@ mod single {
#[async_trait]
impl DistantApi for TestDistantApi {
type LocalData = ();
async fn read_file(
&self,
_ctx: DistantCtx<Self::LocalData>,
_path: PathBuf,
) -> io::Result<Vec<u8>> {
async fn read_file(&self, _ctx: DistantCtx, _path: PathBuf) -> io::Result<Vec<u8>> {
Err(io::Error::new(io::ErrorKind::NotFound, "test error"))
}
}
@ -66,13 +58,7 @@ mod single {
#[async_trait]
impl DistantApi for TestDistantApi {
type LocalData = ();
async fn read_file(
&self,
_ctx: DistantCtx<Self::LocalData>,
_path: PathBuf,
) -> io::Result<Vec<u8>> {
async fn read_file(&self, _ctx: DistantCtx, _path: PathBuf) -> io::Result<Vec<u8>> {
Ok(b"hello world".to_vec())
}
}
@ -97,13 +83,7 @@ mod batch_parallel {
#[async_trait]
impl DistantApi for TestDistantApi {
type LocalData = ();
async fn read_file(
&self,
_ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
) -> io::Result<Vec<u8>> {
async fn read_file(&self, _ctx: DistantCtx, path: PathBuf) -> io::Result<Vec<u8>> {
if path.to_str().unwrap() == "slow" {
tokio::time::sleep(Duration::from_millis(500)).await;
}
@ -155,13 +135,7 @@ mod batch_parallel {
#[async_trait]
impl DistantApi for TestDistantApi {
type LocalData = ();
async fn read_file(
&self,
_ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
) -> io::Result<Vec<u8>> {
async fn read_file(&self, _ctx: DistantCtx, path: PathBuf) -> io::Result<Vec<u8>> {
if path.to_str().unwrap() == "fail" {
return Err(io::Error::new(io::ErrorKind::Other, "test error"));
}
@ -223,13 +197,7 @@ mod batch_sequence {
#[async_trait]
impl DistantApi for TestDistantApi {
type LocalData = ();
async fn read_file(
&self,
_ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
) -> io::Result<Vec<u8>> {
async fn read_file(&self, _ctx: DistantCtx, path: PathBuf) -> io::Result<Vec<u8>> {
if path.to_str().unwrap() == "slow" {
tokio::time::sleep(Duration::from_millis(500)).await;
}
@ -284,13 +252,7 @@ mod batch_sequence {
#[async_trait]
impl DistantApi for TestDistantApi {
type LocalData = ();
async fn read_file(
&self,
_ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
) -> io::Result<Vec<u8>> {
async fn read_file(&self, _ctx: DistantCtx, path: PathBuf) -> io::Result<Vec<u8>> {
if path.to_str().unwrap() == "fail" {
return Err(io::Error::new(io::ErrorKind::Other, "test error"));
}

@ -39,13 +39,7 @@ impl Api {
#[async_trait]
impl DistantApi for Api {
type LocalData = ();
async fn read_file(
&self,
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
) -> io::Result<Vec<u8>> {
async fn read_file(&self, ctx: DistantCtx, path: PathBuf) -> io::Result<Vec<u8>> {
debug!(
"[Conn {}] Reading bytes from file {:?}",
ctx.connection_id, path
@ -54,11 +48,7 @@ impl DistantApi for Api {
tokio::fs::read(path).await
}
async fn read_file_text(
&self,
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
) -> io::Result<String> {
async fn read_file_text(&self, ctx: DistantCtx, path: PathBuf) -> io::Result<String> {
debug!(
"[Conn {}] Reading text from file {:?}",
ctx.connection_id, path
@ -67,12 +57,7 @@ impl DistantApi for Api {
tokio::fs::read_to_string(path).await
}
async fn write_file(
&self,
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
data: Vec<u8>,
) -> io::Result<()> {
async fn write_file(&self, ctx: DistantCtx, path: PathBuf, data: Vec<u8>) -> io::Result<()> {
debug!(
"[Conn {}] Writing bytes to file {:?}",
ctx.connection_id, path
@ -83,7 +68,7 @@ impl DistantApi for Api {
async fn write_file_text(
&self,
ctx: DistantCtx<Self::LocalData>,
ctx: DistantCtx,
path: PathBuf,
data: String,
) -> io::Result<()> {
@ -95,12 +80,7 @@ impl DistantApi for Api {
tokio::fs::write(path, data).await
}
async fn append_file(
&self,
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
data: Vec<u8>,
) -> io::Result<()> {
async fn append_file(&self, ctx: DistantCtx, path: PathBuf, data: Vec<u8>) -> io::Result<()> {
debug!(
"[Conn {}] Appending bytes to file {:?}",
ctx.connection_id, path
@ -116,7 +96,7 @@ impl DistantApi for Api {
async fn append_file_text(
&self,
ctx: DistantCtx<Self::LocalData>,
ctx: DistantCtx,
path: PathBuf,
data: String,
) -> io::Result<()> {
@ -135,7 +115,7 @@ impl DistantApi for Api {
async fn read_dir(
&self,
ctx: DistantCtx<Self::LocalData>,
ctx: DistantCtx,
path: PathBuf,
depth: usize,
absolute: bool,
@ -228,12 +208,7 @@ impl DistantApi for Api {
Ok((entries, errors))
}
async fn create_dir(
&self,
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
all: bool,
) -> io::Result<()> {
async fn create_dir(&self, ctx: DistantCtx, path: PathBuf, all: bool) -> io::Result<()> {
debug!(
"[Conn {}] Creating directory {:?} {{all: {}}}",
ctx.connection_id, path, all
@ -245,12 +220,7 @@ impl DistantApi for Api {
}
}
async fn remove(
&self,
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
force: bool,
) -> io::Result<()> {
async fn remove(&self, ctx: DistantCtx, path: PathBuf, force: bool) -> io::Result<()> {
debug!(
"[Conn {}] Removing {:?} {{force: {}}}",
ctx.connection_id, path, force
@ -267,12 +237,7 @@ impl DistantApi for Api {
}
}
async fn copy(
&self,
ctx: DistantCtx<Self::LocalData>,
src: PathBuf,
dst: PathBuf,
) -> io::Result<()> {
async fn copy(&self, ctx: DistantCtx, src: PathBuf, dst: PathBuf) -> io::Result<()> {
debug!(
"[Conn {}] Copying {:?} to {:?}",
ctx.connection_id, src, dst
@ -329,12 +294,7 @@ impl DistantApi for Api {
Ok(())
}
async fn rename(
&self,
ctx: DistantCtx<Self::LocalData>,
src: PathBuf,
dst: PathBuf,
) -> io::Result<()> {
async fn rename(&self, ctx: DistantCtx, src: PathBuf, dst: PathBuf) -> io::Result<()> {
debug!(
"[Conn {}] Renaming {:?} to {:?}",
ctx.connection_id, src, dst
@ -344,7 +304,7 @@ impl DistantApi for Api {
async fn watch(
&self,
ctx: DistantCtx<Self::LocalData>,
ctx: DistantCtx,
path: PathBuf,
recursive: bool,
only: Vec<ChangeKind>,
@ -372,7 +332,7 @@ impl DistantApi for Api {
Ok(())
}
async fn unwatch(&self, ctx: DistantCtx<Self::LocalData>, path: PathBuf) -> io::Result<()> {
async fn unwatch(&self, ctx: DistantCtx, path: PathBuf) -> io::Result<()> {
debug!("[Conn {}] Unwatching {:?}", ctx.connection_id, path);
self.state
@ -382,7 +342,7 @@ impl DistantApi for Api {
Ok(())
}
async fn exists(&self, ctx: DistantCtx<Self::LocalData>, path: PathBuf) -> io::Result<bool> {
async fn exists(&self, ctx: DistantCtx, path: PathBuf) -> io::Result<bool> {
debug!("[Conn {}] Checking if {:?} exists", ctx.connection_id, path);
// Following experimental `std::fs::try_exists`, which checks the error kind of the
@ -396,7 +356,7 @@ impl DistantApi for Api {
async fn metadata(
&self,
ctx: DistantCtx<Self::LocalData>,
ctx: DistantCtx,
path: PathBuf,
canonicalize: bool,
resolve_file_type: bool,
@ -469,7 +429,7 @@ impl DistantApi for Api {
async fn set_permissions(
&self,
_ctx: DistantCtx<Self::LocalData>,
_ctx: DistantCtx,
path: PathBuf,
permissions: Permissions,
options: SetPermissionsOptions,
@ -596,11 +556,7 @@ impl DistantApi for Api {
}
}
async fn search(
&self,
ctx: DistantCtx<Self::LocalData>,
query: SearchQuery,
) -> io::Result<SearchId> {
async fn search(&self, ctx: DistantCtx, query: SearchQuery) -> io::Result<SearchId> {
debug!(
"[Conn {}] Performing search via {query:?}",
ctx.connection_id,
@ -609,11 +565,7 @@ impl DistantApi for Api {
self.state.search.start(query, ctx.reply).await
}
async fn cancel_search(
&self,
ctx: DistantCtx<Self::LocalData>,
id: SearchId,
) -> io::Result<()> {
async fn cancel_search(&self, ctx: DistantCtx, id: SearchId) -> io::Result<()> {
debug!("[Conn {}] Cancelling search {id}", ctx.connection_id,);
self.state.search.cancel(id).await
@ -621,7 +573,7 @@ impl DistantApi for Api {
async fn proc_spawn(
&self,
ctx: DistantCtx<Self::LocalData>,
ctx: DistantCtx,
cmd: String,
environment: Environment,
current_dir: Option<PathBuf>,
@ -637,17 +589,12 @@ impl DistantApi for Api {
.await
}
async fn proc_kill(&self, ctx: DistantCtx<Self::LocalData>, id: ProcessId) -> io::Result<()> {
async fn proc_kill(&self, ctx: DistantCtx, id: ProcessId) -> io::Result<()> {
debug!("[Conn {}] Killing process {}", ctx.connection_id, id);
self.state.process.kill(id).await
}
async fn proc_stdin(
&self,
ctx: DistantCtx<Self::LocalData>,
id: ProcessId,
data: Vec<u8>,
) -> io::Result<()> {
async fn proc_stdin(&self, ctx: DistantCtx, id: ProcessId, data: Vec<u8>) -> io::Result<()> {
debug!(
"[Conn {}] Sending stdin to process {}",
ctx.connection_id, id
@ -657,7 +604,7 @@ impl DistantApi for Api {
async fn proc_resize_pty(
&self,
ctx: DistantCtx<Self::LocalData>,
ctx: DistantCtx,
id: ProcessId,
size: PtySize,
) -> io::Result<()> {
@ -668,7 +615,7 @@ impl DistantApi for Api {
self.state.process.resize_pty(id, size).await
}
async fn system_info(&self, ctx: DistantCtx<Self::LocalData>) -> io::Result<SystemInfo> {
async fn system_info(&self, ctx: DistantCtx) -> io::Result<SystemInfo> {
debug!("[Conn {}] Reading system information", ctx.connection_id);
Ok(SystemInfo {
family: env::consts::FAMILY.to_string(),
@ -685,7 +632,7 @@ impl DistantApi for Api {
})
}
async fn version(&self, ctx: DistantCtx<Self::LocalData>) -> io::Result<Version> {
async fn version(&self, ctx: DistantCtx) -> io::Result<Version> {
debug!("[Conn {}] Querying version", ctx.connection_id);
Ok(Version {
@ -698,11 +645,10 @@ impl DistantApi for Api {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use assert_fs::prelude::*;
use distant_core::net::server::{ConnectionCtx, Reply};
use distant_core::net::server::Reply;
use distant_core::protocol::Response;
use once_cell::sync::Lazy;
use predicates::prelude::*;
@ -773,7 +719,7 @@ mod tests {
const DEBOUNCE_TIMEOUT: Duration = Duration::from_millis(100);
async fn setup(buffer: usize) -> (Api, DistantCtx<()>, mpsc::Receiver<Response>) {
async fn setup(buffer: usize) -> (Api, DistantCtx, mpsc::Receiver<Response>) {
let api = Api::initialize(Config {
watch: WatchConfig {
debounce_timeout: DEBOUNCE_TIMEOUT,
@ -784,19 +730,10 @@ mod tests {
let (reply, rx) = make_reply(buffer);
let connection_id = rand::random();
DistantApi::on_accept(
&api,
ConnectionCtx {
connection_id,
local_data: &mut (),
},
)
.await
.unwrap();
DistantApi::on_connect(&api, connection_id).await.unwrap();
let ctx = DistantCtx {
connection_id,
reply,
local_data: Arc::new(()),
};
(api, ctx, rx)
}
@ -1683,7 +1620,6 @@ mod tests {
let ctx = DistantCtx {
connection_id: ctx_1.connection_id,
reply,
local_data: Arc::clone(&ctx_1.local_data),
};
(ctx, rx)
};
@ -2662,7 +2598,6 @@ mod tests {
let ctx = DistantCtx {
connection_id: ctx_1.connection_id,
reply,
local_data: Arc::clone(&ctx_1.local_data),
};
(ctx, rx)
};
@ -2723,7 +2658,6 @@ mod tests {
let ctx = DistantCtx {
connection_id: ctx_1.connection_id,
reply,
local_data: Arc::clone(&ctx_1.local_data),
};
(ctx, rx)
};

@ -9,10 +9,10 @@ mod config;
mod constants;
pub use api::Api;
pub use config::*;
use distant_core::{DistantApi, DistantApiServerHandler};
use distant_core::DistantApiServerHandler;
/// Implementation of [`DistantApiServerHandler`] using [`Api`].
pub type Handler = DistantApiServerHandler<Api, <Api as DistantApi>::LocalData>;
pub type Handler = DistantApiServerHandler<Api>;
/// Initializes a new [`Handler`].
pub fn new_handler(config: Config) -> std::io::Result<Handler> {

@ -12,7 +12,7 @@ use crate::manager::{
ConnectionInfo, ConnectionList, ManagerAuthenticationId, ManagerCapabilities, ManagerChannelId,
ManagerRequest, ManagerResponse,
};
use crate::server::{Server, ServerCtx, ServerHandler};
use crate::server::{RequestCtx, Server, ServerHandler};
mod authentication;
pub use authentication::*;
@ -31,6 +31,10 @@ pub struct ManagerServer {
/// Configuration settings for the server
config: Config,
/// Holds on to open channels feeding data back from a server to some connected client,
/// enabling us to cancel the tasks on demand
channels: RwLock<HashMap<ManagerChannelId, ManagerChannel>>,
/// Mapping of connection id -> connection
connections: RwLock<HashMap<ConnectionId, ManagerConnection>>,
@ -46,6 +50,7 @@ impl ManagerServer {
pub fn new(config: Config) -> Server<Self> {
Server::new().handler(Self {
config,
channels: RwLock::new(HashMap::new()),
connections: RwLock::new(HashMap::new()),
registry: Arc::new(RwLock::new(HashMap::new())),
})
@ -177,25 +182,16 @@ impl ManagerServer {
}
}
#[derive(Default)]
pub struct DistantManagerServerConnection {
/// Holds on to open channels feeding data back from a server to some connected client,
/// enabling us to cancel the tasks on demand
channels: RwLock<HashMap<ManagerChannelId, ManagerChannel>>,
}
#[async_trait]
impl ServerHandler for ManagerServer {
type LocalData = DistantManagerServerConnection;
type Request = ManagerRequest;
type Response = ManagerResponse;
async fn on_request(&self, ctx: ServerCtx<Self::Request, Self::Response, Self::LocalData>) {
let ServerCtx {
async fn on_request(&self, ctx: RequestCtx<Self::Request, Self::Response>) {
let RequestCtx {
connection_id,
request,
reply,
local_data,
} = ctx;
let response = match request.payload {
@ -256,7 +252,7 @@ impl ServerHandler for ManagerServer {
Ok(channel) => {
debug!("[Conn {id}] Channel {} has been opened", channel.id());
let id = channel.id();
local_data.channels.write().await.insert(id, channel);
self.channels.write().await.insert(id, channel);
ManagerResponse::ChannelOpened { id }
}
Err(x) => ManagerResponse::from(x),
@ -267,7 +263,7 @@ impl ServerHandler for ManagerServer {
)),
},
ManagerRequest::Channel { id, request } => {
match local_data.channels.read().await.get(&id) {
match self.channels.read().await.get(&id) {
// TODO: For now, we are NOT sending back a response to acknowledge
// a successful channel send. We could do this in order for
// the client to listen for a complete send, but is it worth it?
@ -281,21 +277,19 @@ impl ServerHandler for ManagerServer {
)),
}
}
ManagerRequest::CloseChannel { id } => {
match local_data.channels.write().await.remove(&id) {
Some(channel) => match channel.close() {
Ok(_) => {
debug!("Channel {id} has been closed");
ManagerResponse::ChannelClosed { id }
}
Err(x) => ManagerResponse::from(x),
},
None => ManagerResponse::from(io::Error::new(
io::ErrorKind::NotConnected,
"Channel is not open or does not exist",
)),
}
}
ManagerRequest::CloseChannel { id } => match self.channels.write().await.remove(&id) {
Some(channel) => match channel.close() {
Ok(_) => {
debug!("Channel {id} has been closed");
ManagerResponse::ChannelClosed { id }
}
Err(x) => ManagerResponse::from(x),
},
None => ManagerResponse::from(io::Error::new(
io::ErrorKind::NotConnected,
"Channel is not open or does not exist",
)),
},
ManagerRequest::Info { id } => match self.info(id).await {
Ok(info) => ManagerResponse::Info(info),
Err(x) => ManagerResponse::from(x),
@ -356,6 +350,7 @@ mod tests {
let server = ManagerServer {
config,
channels: RwLock::new(HashMap::new()),
connections: RwLock::new(HashMap::new()),
registry,
};

@ -9,7 +9,7 @@ use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::sync::{broadcast, RwLock};
use crate::common::{Listener, Response, Transport};
use crate::common::{ConnectionId, Listener, Response, Transport};
mod builder;
pub use builder::*;
@ -56,23 +56,21 @@ pub trait ServerHandler: Send {
/// Type of data sent back by the server
type Response;
/// Type of data to store locally tied to the specific connection
type LocalData: Send;
/// Invoked upon a new connection becoming established.
///
/// ### Note
///
/// This can be useful in performing some additional initialization on the connection's local
/// data prior to it being used anywhere else.
#[allow(unused_variables)]
async fn on_accept(&self, ctx: ConnectionCtx<'_, Self::LocalData>) -> io::Result<()> {
async fn on_connect(&self, id: ConnectionId) -> io::Result<()> {
Ok(())
}
/// Invoked upon an existing connection getting dropped.
#[allow(unused_variables)]
async fn on_disconnect(&self, id: ConnectionId) -> io::Result<()> {
Ok(())
}
/// Invoked upon receiving a request from a client. The server should process this
/// request, which can be found in `ctx`, and send one or more replies in response.
async fn on_request(&self, ctx: ServerCtx<Self::Request, Self::Response, Self::LocalData>);
async fn on_request(&self, ctx: RequestCtx<Self::Request, Self::Response>);
}
impl Server<()> {
@ -144,7 +142,6 @@ where
T: ServerHandler + Sync + 'static,
T::Request: DeserializeOwned + Send + Sync + 'static,
T::Response: Serialize + Send + 'static,
T::LocalData: Default + Send + Sync + 'static,
{
/// Consumes the server, starting a task to process connections from the `listener` and
/// returning a [`ServerRef`] that can be used to control the active server instance.
@ -257,15 +254,10 @@ mod tests {
#[async_trait]
impl ServerHandler for TestServerHandler {
type LocalData = ();
type Request = u16;
type Response = String;
async fn on_accept(&self, _: ConnectionCtx<'_, Self::LocalData>) -> io::Result<()> {
Ok(())
}
async fn on_request(&self, ctx: ServerCtx<Self::Request, Self::Response, Self::LocalData>) {
async fn on_request(&self, ctx: RequestCtx<Self::Request, Self::Response>) {
// Always send back "hello"
ctx.reply.send("hello".to_string()).await.unwrap();
}

@ -42,7 +42,6 @@ where
T: ServerHandler + Sync + 'static,
T::Request: DeserializeOwned + Send + Sync + 'static,
T::Response: Serialize + Send + 'static,
T::LocalData: Default + Send + Sync + 'static,
{
pub async fn start<P>(self, addr: IpAddr, port: P) -> io::Result<TcpServerRef>
where
@ -66,17 +65,16 @@ mod tests {
use super::*;
use crate::client::Client;
use crate::common::Request;
use crate::server::ServerCtx;
use crate::server::RequestCtx;
pub struct TestServerHandler;
#[async_trait]
impl ServerHandler for TestServerHandler {
type LocalData = ();
type Request = String;
type Response = String;
async fn on_request(&self, ctx: ServerCtx<Self::Request, Self::Response, Self::LocalData>) {
async fn on_request(&self, ctx: RequestCtx<Self::Request, Self::Response>) {
// Echo back what we received
ctx.reply
.send(ctx.request.payload.to_string())

@ -42,7 +42,6 @@ where
T: ServerHandler + Sync + 'static,
T::Request: DeserializeOwned + Send + Sync + 'static,
T::Response: Serialize + Send + 'static,
T::LocalData: Default + Send + Sync + 'static,
{
pub async fn start<P>(self, path: P) -> io::Result<UnixSocketServerRef>
where
@ -66,17 +65,16 @@ mod tests {
use super::*;
use crate::client::Client;
use crate::common::Request;
use crate::server::ServerCtx;
use crate::server::RequestCtx;
pub struct TestServerHandler;
#[async_trait]
impl ServerHandler for TestServerHandler {
type LocalData = ();
type Request = String;
type Response = String;
async fn on_request(&self, ctx: ServerCtx<Self::Request, Self::Response, Self::LocalData>) {
async fn on_request(&self, ctx: RequestCtx<Self::Request, Self::Response>) {
// Echo back what we received
ctx.reply
.send(ctx.request.payload.to_string())

@ -42,7 +42,6 @@ where
T: ServerHandler + Sync + 'static,
T::Request: DeserializeOwned + Send + Sync + 'static,
T::Response: Serialize + Send + 'static,
T::LocalData: Default + Send + Sync + 'static,
{
/// Start a new server at the specified address using the given codec
pub async fn start<A>(self, addr: A) -> io::Result<WindowsPipeServerRef>
@ -83,11 +82,10 @@ mod tests {
#[async_trait]
impl ServerHandler for TestServerHandler {
type LocalData = ();
type Request = String;
type Response = String;
async fn on_request(&self, ctx: ServerCtx<Self::Request, Self::Response, Self::LocalData>) {
async fn on_request(&self, ctx: ServerCtx<Self::Request, Self::Response>) {
// Echo back what we received
ctx.reply
.send(ctx.request.payload.to_string())

@ -12,10 +12,7 @@ use serde::Serialize;
use tokio::sync::{broadcast, mpsc, oneshot, RwLock};
use tokio::task::JoinHandle;
use super::{
ConnectionCtx, ConnectionState, ServerCtx, ServerHandler, ServerReply, ServerState,
ShutdownTimer,
};
use super::{ConnectionState, RequestCtx, ServerHandler, ServerReply, ServerState, ShutdownTimer};
use crate::common::{
Backup, Connection, Frame, Interest, Keychain, Response, Transport, UntypedRequest,
};
@ -226,7 +223,6 @@ where
H: ServerHandler + Sync + 'static,
H::Request: DeserializeOwned + Send + Sync + 'static,
H::Response: Serialize + Send + 'static,
H::LocalData: Default + Send + Sync + 'static,
T: Transport + 'static,
{
pub fn spawn(self) -> ConnectionTask {
@ -430,15 +426,10 @@ where
// Create local data for the connection and then process it
debug!("[Conn {id}] Officially accepting connection");
let mut local_data = H::LocalData::default();
if let Err(x) = await_or_shutdown!(handler.on_accept(ConnectionCtx {
connection_id: id,
local_data: &mut local_data
})) {
if let Err(x) = await_or_shutdown!(handler.on_connect(id)) {
terminate_connection!(@fatal "[Conn {id}] Accepting connection failed: {x}");
}
let local_data = Arc::new(local_data);
let mut last_heartbeat = Instant::now();
// Restore our connection's channels if we have them, otherwise make new ones
@ -487,14 +478,13 @@ where
Ok(request) => match request.to_typed_request() {
Ok(request) => {
let origin_id = request.id.clone();
let ctx = ServerCtx {
let ctx = RequestCtx {
connection_id: id,
request,
reply: ServerReply {
origin_id,
tx: tx.clone(),
},
local_data: Arc::clone(&local_data),
};
// Spawn a new task to run the request handler so we don't block
@ -615,21 +605,16 @@ mod tests {
use crate::common::{
HeapSecretKey, InmemoryTransport, Ready, Reconnectable, Request, Response,
};
use crate::server::Shutdown;
use crate::server::{ConnectionId, Shutdown};
struct TestServerHandler;
#[async_trait]
impl ServerHandler for TestServerHandler {
type LocalData = ();
type Request = u16;
type Response = String;
async fn on_accept(&self, _: ConnectionCtx<'_, Self::LocalData>) -> io::Result<()> {
Ok(())
}
async fn on_request(&self, ctx: ServerCtx<Self::Request, Self::Response, Self::LocalData>) {
async fn on_request(&self, ctx: RequestCtx<Self::Request, Self::Response>) {
// Always send back "hello"
ctx.reply.send("hello".to_string()).await.unwrap();
}
@ -750,18 +735,14 @@ mod tests {
#[async_trait]
impl ServerHandler for BadAcceptServerHandler {
type LocalData = ();
type Request = u16;
type Response = String;
async fn on_accept(&self, _: ConnectionCtx<'_, Self::LocalData>) -> io::Result<()> {
Err(io::Error::new(io::ErrorKind::Other, "bad accept"))
async fn on_connect(&self, _: ConnectionId) -> io::Result<()> {
Err(io::Error::new(io::ErrorKind::Other, "bad connect"))
}
async fn on_request(
&self,
_: ServerCtx<Self::Request, Self::Response, Self::LocalData>,
) {
async fn on_request(&self, _: RequestCtx<Self::Request, Self::Response>) {
unreachable!();
}
}
@ -1042,20 +1023,16 @@ mod tests {
#[async_trait]
impl ServerHandler for HangingAcceptServerHandler {
type LocalData = ();
type Request = ();
type Response = ();
async fn on_accept(&self, _: ConnectionCtx<'_, Self::LocalData>) -> io::Result<()> {
async fn on_connect(&self, _: ConnectionId) -> io::Result<()> {
// Wait "forever" so we can ensure that we fail at this step
tokio::time::sleep(Duration::MAX).await;
Err(io::Error::new(io::ErrorKind::Other, "bad accept"))
Err(io::Error::new(io::ErrorKind::Other, "bad connect"))
}
async fn on_request(
&self,
_: ServerCtx<Self::Request, Self::Response, Self::LocalData>,
) {
async fn on_request(&self, _: RequestCtx<Self::Request, Self::Response>) {
unreachable!();
}
}
@ -1098,19 +1075,15 @@ mod tests {
#[async_trait]
impl ServerHandler for AcceptServerHandler {
type LocalData = ();
type Request = ();
type Response = ();
async fn on_accept(&self, _: ConnectionCtx<'_, Self::LocalData>) -> io::Result<()> {
async fn on_connect(&self, _: ConnectionId) -> io::Result<()> {
self.tx.send(()).await.unwrap();
Ok(())
}
async fn on_request(
&self,
_: ServerCtx<Self::Request, Self::Response, Self::LocalData>,
) {
async fn on_request(&self, _: RequestCtx<Self::Request, Self::Response>) {
unreachable!();
}
}

@ -1,28 +1,14 @@
use std::sync::Arc;
use super::ServerReply;
use crate::common::{ConnectionId, Request};
/// Represents contextual information for working with an inbound request
pub struct ServerCtx<T, U, D> {
/// Unique identifer associated with the connection that sent the request
/// Represents contextual information for working with an inbound request.
pub struct RequestCtx<T, U> {
/// Unique identifer associated with the connection that sent the request.
pub connection_id: ConnectionId,
/// The request being handled
/// The request being handled.
pub request: Request<T>,
/// Used to send replies back to be sent out by the server
/// Used to send replies back to be sent out by the server.
pub reply: ServerReply<U>,
/// Reference to the connection's local data
pub local_data: Arc<D>,
}
/// Represents contextual information for working with an inbound connection
pub struct ConnectionCtx<'a, D> {
/// Unique identifer associated with the connection
pub connection_id: ConnectionId,
/// Reference to the connection's local data
pub local_data: &'a mut D,
}

@ -6,7 +6,7 @@ use distant_net::boxed_connect_handler;
use distant_net::client::Client;
use distant_net::common::{Destination, InmemoryTransport, Map, OneshotListener};
use distant_net::manager::{Config, ManagerClient, ManagerServer};
use distant_net::server::{Server, ServerCtx, ServerHandler};
use distant_net::server::{RequestCtx, Server, ServerHandler};
use log::*;
use test_log::test;
@ -14,11 +14,10 @@ struct TestServerHandler;
#[async_trait]
impl ServerHandler for TestServerHandler {
type LocalData = ();
type Request = String;
type Response = String;
async fn on_request(&self, ctx: ServerCtx<Self::Request, Self::Response, Self::LocalData>) {
async fn on_request(&self, ctx: RequestCtx<Self::Request, Self::Response>) {
ctx.reply
.send(format!("echo {}", ctx.request.payload))
.await

@ -2,7 +2,7 @@ use async_trait::async_trait;
use distant_auth::{DummyAuthHandler, Verifier};
use distant_net::client::Client;
use distant_net::common::{InmemoryTransport, OneshotListener};
use distant_net::server::{Server, ServerCtx, ServerHandler};
use distant_net::server::{RequestCtx, Server, ServerHandler};
use log::*;
use test_log::test;
@ -10,11 +10,10 @@ struct TestServerHandler;
#[async_trait]
impl ServerHandler for TestServerHandler {
type LocalData = ();
type Request = (u8, String);
type Response = String;
async fn on_request(&self, ctx: ServerCtx<Self::Request, Self::Response, Self::LocalData>) {
async fn on_request(&self, ctx: RequestCtx<Self::Request, Self::Response>) {
let (cnt, msg) = ctx.request.payload;
for i in 0..cnt {

@ -2,7 +2,7 @@ use async_trait::async_trait;
use distant_auth::{DummyAuthHandler, Verifier};
use distant_net::client::Client;
use distant_net::common::{InmemoryTransport, OneshotListener, Request};
use distant_net::server::{Server, ServerCtx, ServerHandler};
use distant_net::server::{RequestCtx, Server, ServerHandler};
use log::*;
use test_log::test;
@ -10,11 +10,10 @@ struct TestServerHandler;
#[async_trait]
impl ServerHandler for TestServerHandler {
type LocalData = ();
type Request = (u8, String);
type Response = String;
async fn on_request(&self, ctx: ServerCtx<Self::Request, Self::Response, Self::LocalData>) {
async fn on_request(&self, ctx: RequestCtx<Self::Request, Self::Response>) {
let (cnt, msg) = ctx.request.payload;
for i in 0..cnt {

@ -1,4 +1,4 @@
use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::{HashMap, VecDeque};
use std::io;
use std::path::PathBuf;
use std::sync::{Arc, Weak};
@ -7,7 +7,6 @@ use std::time::Duration;
use async_compat::CompatExt;
use async_once_cell::OnceCell;
use async_trait::async_trait;
use distant_core::net::server::ConnectionCtx;
use distant_core::protocol::{
Capabilities, CapabilityKind, DirEntry, Environment, FileType, Metadata, Permissions,
ProcessId, PtySize, SetPermissionsOptions, SystemInfo, UnixMetadata, Version, PROTOCOL_VERSION,
@ -25,16 +24,6 @@ use crate::utils::{self, to_other_error};
/// Time after copy completes to wait for stdout/stderr to close
const COPY_COMPLETE_TIMEOUT: Duration = Duration::from_secs(1);
#[derive(Default)]
pub struct ConnectionState {
/// List of process ids that will be killed when the connection terminates
processes: Arc<RwLock<HashSet<ProcessId>>>,
/// Internal reference to global process list for removals
/// NOTE: Initialized during `on_accept` of [`DistantApi`]
global_processes: Weak<RwLock<HashMap<ProcessId, Process>>>,
}
struct Process {
stdin_tx: mpsc::Sender<Vec<u8>>,
kill_tx: mpsc::Sender<()>,
@ -72,18 +61,7 @@ impl SshDistantApi {
#[async_trait]
impl DistantApi for SshDistantApi {
type LocalData = ConnectionState;
async fn on_accept(&self, ctx: ConnectionCtx<'_, Self::LocalData>) -> io::Result<()> {
ctx.local_data.global_processes = Arc::downgrade(&self.processes);
Ok(())
}
async fn read_file(
&self,
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
) -> io::Result<Vec<u8>> {
async fn read_file(&self, ctx: DistantCtx, path: PathBuf) -> io::Result<Vec<u8>> {
debug!(
"[Conn {}] Reading bytes from file {:?}",
ctx.connection_id, path
@ -103,11 +81,7 @@ impl DistantApi for SshDistantApi {
Ok(contents.into_bytes())
}
async fn read_file_text(
&self,
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
) -> io::Result<String> {
async fn read_file_text(&self, ctx: DistantCtx, path: PathBuf) -> io::Result<String> {
debug!(
"[Conn {}] Reading text from file {:?}",
ctx.connection_id, path
@ -127,12 +101,7 @@ impl DistantApi for SshDistantApi {
Ok(contents)
}
async fn write_file(
&self,
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
data: Vec<u8>,
) -> io::Result<()> {
async fn write_file(&self, ctx: DistantCtx, path: PathBuf, data: Vec<u8>) -> io::Result<()> {
debug!(
"[Conn {}] Writing bytes to file {:?}",
ctx.connection_id, path
@ -154,7 +123,7 @@ impl DistantApi for SshDistantApi {
async fn write_file_text(
&self,
ctx: DistantCtx<Self::LocalData>,
ctx: DistantCtx,
path: PathBuf,
data: String,
) -> io::Result<()> {
@ -177,12 +146,7 @@ impl DistantApi for SshDistantApi {
Ok(())
}
async fn append_file(
&self,
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
data: Vec<u8>,
) -> io::Result<()> {
async fn append_file(&self, ctx: DistantCtx, path: PathBuf, data: Vec<u8>) -> io::Result<()> {
debug!(
"[Conn {}] Appending bytes to file {:?}",
ctx.connection_id, path
@ -213,7 +177,7 @@ impl DistantApi for SshDistantApi {
async fn append_file_text(
&self,
ctx: DistantCtx<Self::LocalData>,
ctx: DistantCtx,
path: PathBuf,
data: String,
) -> io::Result<()> {
@ -247,7 +211,7 @@ impl DistantApi for SshDistantApi {
async fn read_dir(
&self,
ctx: DistantCtx<Self::LocalData>,
ctx: DistantCtx,
path: PathBuf,
depth: usize,
absolute: bool,
@ -375,12 +339,7 @@ impl DistantApi for SshDistantApi {
Ok((entries, errors))
}
async fn create_dir(
&self,
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
all: bool,
) -> io::Result<()> {
async fn create_dir(&self, ctx: DistantCtx, path: PathBuf, all: bool) -> io::Result<()> {
debug!(
"[Conn {}] Creating directory {:?} {{all: {}}}",
ctx.connection_id, path, all
@ -436,12 +395,7 @@ impl DistantApi for SshDistantApi {
Ok(())
}
async fn remove(
&self,
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
force: bool,
) -> io::Result<()> {
async fn remove(&self, ctx: DistantCtx, path: PathBuf, force: bool) -> io::Result<()> {
debug!(
"[Conn {}] Removing {:?} {{force: {}}}",
ctx.connection_id, path, force
@ -526,12 +480,7 @@ impl DistantApi for SshDistantApi {
Ok(())
}
async fn copy(
&self,
ctx: DistantCtx<Self::LocalData>,
src: PathBuf,
dst: PathBuf,
) -> io::Result<()> {
async fn copy(&self, ctx: DistantCtx, src: PathBuf, dst: PathBuf) -> io::Result<()> {
debug!(
"[Conn {}] Copying {:?} to {:?}",
ctx.connection_id, src, dst
@ -573,12 +522,7 @@ impl DistantApi for SshDistantApi {
}
}
async fn rename(
&self,
ctx: DistantCtx<Self::LocalData>,
src: PathBuf,
dst: PathBuf,
) -> io::Result<()> {
async fn rename(&self, ctx: DistantCtx, src: PathBuf, dst: PathBuf) -> io::Result<()> {
debug!(
"[Conn {}] Renaming {:?} to {:?}",
ctx.connection_id, src, dst
@ -594,7 +538,7 @@ impl DistantApi for SshDistantApi {
Ok(())
}
async fn exists(&self, ctx: DistantCtx<Self::LocalData>, path: PathBuf) -> io::Result<bool> {
async fn exists(&self, ctx: DistantCtx, path: PathBuf) -> io::Result<bool> {
debug!("[Conn {}] Checking if {:?} exists", ctx.connection_id, path);
// NOTE: SFTP does not provide a means to check if a path exists that can be performed
@ -612,7 +556,7 @@ impl DistantApi for SshDistantApi {
async fn metadata(
&self,
ctx: DistantCtx<Self::LocalData>,
ctx: DistantCtx,
path: PathBuf,
canonicalize: bool,
resolve_file_type: bool,
@ -676,7 +620,7 @@ impl DistantApi for SshDistantApi {
#[allow(unreachable_code)]
async fn set_permissions(
&self,
ctx: DistantCtx<Self::LocalData>,
ctx: DistantCtx,
path: PathBuf,
permissions: Permissions,
options: SetPermissionsOptions,
@ -805,7 +749,7 @@ impl DistantApi for SshDistantApi {
async fn proc_spawn(
&self,
ctx: DistantCtx<Self::LocalData>,
ctx: DistantCtx,
cmd: String,
environment: Environment,
current_dir: Option<PathBuf>,
@ -817,14 +761,10 @@ impl DistantApi for SshDistantApi {
);
let global_processes = Arc::downgrade(&self.processes);
let local_processes = Arc::downgrade(&ctx.local_data.processes);
let cleanup = |id: ProcessId| async move {
if let Some(processes) = Weak::upgrade(&global_processes) {
processes.write().await.remove(&id);
}
if let Some(processes) = Weak::upgrade(&local_processes) {
processes.write().await.remove(&id);
}
};
let SpawnResult {
@ -874,7 +814,7 @@ impl DistantApi for SshDistantApi {
Ok(id)
}
async fn proc_kill(&self, ctx: DistantCtx<Self::LocalData>, id: ProcessId) -> io::Result<()> {
async fn proc_kill(&self, ctx: DistantCtx, id: ProcessId) -> io::Result<()> {
debug!("[Conn {}] Killing process {}", ctx.connection_id, id);
if let Some(process) = self.processes.read().await.get(&id) {
@ -892,12 +832,7 @@ impl DistantApi for SshDistantApi {
))
}
async fn proc_stdin(
&self,
ctx: DistantCtx<Self::LocalData>,
id: ProcessId,
data: Vec<u8>,
) -> io::Result<()> {
async fn proc_stdin(&self, ctx: DistantCtx, id: ProcessId, data: Vec<u8>) -> io::Result<()> {
debug!(
"[Conn {}] Sending stdin to process {}",
ctx.connection_id, id
@ -920,7 +855,7 @@ impl DistantApi for SshDistantApi {
async fn proc_resize_pty(
&self,
ctx: DistantCtx<Self::LocalData>,
ctx: DistantCtx,
id: ProcessId,
size: PtySize,
) -> io::Result<()> {
@ -944,7 +879,7 @@ impl DistantApi for SshDistantApi {
))
}
async fn system_info(&self, ctx: DistantCtx<Self::LocalData>) -> io::Result<SystemInfo> {
async fn system_info(&self, ctx: DistantCtx) -> io::Result<SystemInfo> {
// We cache each of these requested values since they should not change for the
// lifetime of the ssh connection
static CURRENT_DIR: OnceCell<PathBuf> = OnceCell::new();
@ -998,7 +933,7 @@ impl DistantApi for SshDistantApi {
})
}
async fn version(&self, ctx: DistantCtx<Self::LocalData>) -> io::Result<Version> {
async fn version(&self, ctx: DistantCtx) -> io::Result<Version> {
debug!("[Conn {}] Querying capabilities", ctx.connection_id);
let mut capabilities = Capabilities::all();

@ -174,7 +174,7 @@ impl AuthHandler for JsonAuthHandler {
// NOTE: This is a hack to skip the need for authentication prompting when a "none"
// method is available as the server should then reply with the on_finished
// status automatically.
if initialization
/*if initialization
.methods
.iter()
.any(|id| id == NoneAuthenticationMethod::ID)
@ -186,7 +186,7 @@ impl AuthHandler for JsonAuthHandler {
return Ok(InitializationResponse {
methods: vec![NoneAuthenticationMethod::ID.to_string()],
});
}
}*/
self.tx
.send_blocking(&Authentication::Initialization(initialization))?;
@ -202,9 +202,9 @@ impl AuthHandler for JsonAuthHandler {
}
async fn on_start_method(&mut self, start_method: StartMethod) -> io::Result<()> {
if self.skip {
/*if self.skip {
return Ok(());
}
}*/
self.tx
.send_blocking(&Authentication::StartMethod(start_method))?;
@ -212,9 +212,9 @@ impl AuthHandler for JsonAuthHandler {
}
async fn on_finished(&mut self) -> io::Result<()> {
if self.skip {
/*if self.skip {
return Ok(());
}
}*/
self.tx.send_blocking(&Authentication::Finished)?;
Ok(())

Loading…
Cancel
Save