More work to write a client bridge to api

feat/RusshSupport
Chip Senkbeil 7 months ago
parent d67002421d
commit 608a4c0161
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -1,58 +1,55 @@
use std::any::TypeId;
use std::io;
use std::path::PathBuf;
use async_trait::async_trait;
use distant_core_protocol::*;
mod checker;
mod ctx;
mod unsupported;
pub use checker::*;
pub use ctx::*;
pub use unsupported::*;
/// Full API that represents a distant-compatible server.
#[async_trait]
pub trait Api {
/// Specific implementation of [`FileSystemApi`] associated with this [`Api`].
type FileSystem: FileSystemApi;
/// Specific implementation of [`ProcessApi`] associated with this [`Api`].
type Process: ProcessApi;
/// Specific implementation of [`SearchApi`] associated with this [`Api`].
type Search: SearchApi;
/// Specific implementation of [`SystemInfoApi`] associated with this [`Api`].
type SystemInfo: SystemInfoApi;
/// Specific implementation of [`VersionApi`] associated with this [`Api`].
type Version: VersionApi;
/// Specific implementation of [`WatchApi`] associated with this [`Api`].
type Watch: WatchApi;
/// Returns true if [`FileSystemApi`] is supported. This is checked by ensuring that the
/// implementation of the associated trait is not [`Unsupported`].
fn is_file_system_api_supported() -> bool {
TypeId::of::<Self::FileSystem>() != TypeId::of::<Unsupported>()
}
/// Returns true if [`ProcessApi`] is supported. This is checked by ensuring that the
/// implementation of the associated trait is not [`Unsupported`].
fn is_process_api_supported() -> bool {
TypeId::of::<Self::Process>() != TypeId::of::<Unsupported>()
}
/// Returns true if [`SearchApi`] is supported. This is checked by ensuring that the
/// implementation of the associated trait is not [`Unsupported`].
fn is_search_api_supported() -> bool {
TypeId::of::<Self::Search>() != TypeId::of::<Unsupported>()
}
/// Returns true if [`SystemInfoApi`] is supported. This is checked by ensuring that the
/// implementation of the associated trait is not [`Unsupported`].
fn is_system_info_api_supported() -> bool {
TypeId::of::<Self::SystemInfo>() != TypeId::of::<Unsupported>()
}
/// Returns true if [`VersionApi`] is supported. This is checked by ensuring that the
/// implementation of the associated trait is not [`Unsupported`].
fn is_version_api_supported() -> bool {
TypeId::of::<Self::Version>() != TypeId::of::<Unsupported>()
}
/// Returns true if [`WatchApi`] is supported. This is checked by ensuring that the
/// implementation of the associated trait is not [`Unsupported`].
fn is_watch_api_supported() -> bool {
TypeId::of::<Self::Watch>() != TypeId::of::<Unsupported>()
}
/// Returns a reference to the [`FileSystemApi`] implementation tied to this [`Api`].
fn file_system(&self) -> &Self::FileSystem;
/// Returns a reference to the [`ProcessApi`] implementation tied to this [`Api`].
fn process(&self) -> &Self::Process;
/// Returns a reference to the [`SearchApi`] implementation tied to this [`Api`].
fn search(&self) -> &Self::Search;
/// Returns a reference to the [`SystemInfoApi`] implementation tied to this [`Api`].
fn system_info(&self) -> &Self::SystemInfo;
/// Returns a reference to the [`VersionApi`] implementation tied to this [`Api`].
fn version(&self) -> &Self::Version;
/// Returns a reference to the [`WatchApi`] implementation tied to this [`Api`].
fn watch(&self) -> &Self::Watch;
}
/// API supporting filesystem operations.
@ -252,194 +249,3 @@ pub trait WatchApi {
/// * `path` - the path to the file or directory
async fn unwatch(&self, ctx: BoxedCtx, path: PathBuf) -> io::Result<()>;
}
pub use unsupported::Unsupported;
mod unsupported {
use super::*;
#[inline]
fn unsupported<T>(label: &str) -> io::Result<T> {
Err(io::Error::new(
io::ErrorKind::Unsupported,
format!("{label} is unsupported"),
))
}
/// Generic struct that implements all APIs as unsupported.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Unsupported;
#[async_trait]
impl Api for Unsupported {
type FileSystem = Self;
type Process = Self;
type Search = Self;
type SystemInfo = Self;
type Version = Self;
type Watch = Self;
}
#[async_trait]
impl FileSystemApi for Unsupported {
async fn read_file(&self, ctx: BoxedCtx, path: PathBuf) -> io::Result<Vec<u8>> {
unsupported("read_file")
}
async fn read_file_text(&self, ctx: BoxedCtx, path: PathBuf) -> io::Result<String> {
unsupported("read_file_text")
}
async fn write_file(&self, ctx: BoxedCtx, path: PathBuf, data: Vec<u8>) -> io::Result<()> {
unsupported("write_file")
}
async fn write_file_text(
&self,
ctx: BoxedCtx,
path: PathBuf,
data: String,
) -> io::Result<()> {
unsupported("write_file_text")
}
async fn append_file(&self, ctx: BoxedCtx, path: PathBuf, data: Vec<u8>) -> io::Result<()> {
unsupported("append_file")
}
async fn append_file_text(
&self,
ctx: BoxedCtx,
path: PathBuf,
data: String,
) -> io::Result<()> {
unsupported("append_file_text")
}
async fn read_dir(
&self,
ctx: BoxedCtx,
path: PathBuf,
depth: usize,
absolute: bool,
canonicalize: bool,
include_root: bool,
) -> io::Result<(Vec<DirEntry>, Vec<io::Error>)> {
unsupported("read_dir")
}
async fn create_dir(&self, ctx: BoxedCtx, path: PathBuf, all: bool) -> io::Result<()> {
unsupported("create_dir")
}
async fn copy(&self, ctx: BoxedCtx, src: PathBuf, dst: PathBuf) -> io::Result<()> {
unsupported("copy")
}
async fn remove(&self, ctx: BoxedCtx, path: PathBuf, force: bool) -> io::Result<()> {
unsupported("remove")
}
async fn rename(&self, ctx: BoxedCtx, src: PathBuf, dst: PathBuf) -> io::Result<()> {
unsupported("rename")
}
async fn exists(&self, ctx: BoxedCtx, path: PathBuf) -> io::Result<bool> {
unsupported("exists")
}
async fn metadata(
&self,
ctx: BoxedCtx,
path: PathBuf,
canonicalize: bool,
resolve_file_type: bool,
) -> io::Result<Metadata> {
unsupported("metadata")
}
async fn set_permissions(
&self,
ctx: BoxedCtx,
path: PathBuf,
permissions: Permissions,
options: SetPermissionsOptions,
) -> io::Result<()> {
unsupported("set_permissions")
}
}
#[async_trait]
impl ProcessApi for Unsupported {
async fn proc_spawn(
&self,
ctx: BoxedCtx,
cmd: String,
environment: Environment,
current_dir: Option<PathBuf>,
pty: Option<PtySize>,
) -> io::Result<ProcessId> {
unsupported("proc_spawn")
}
async fn proc_kill(&self, ctx: BoxedCtx, id: ProcessId) -> io::Result<()> {
unsupported("proc_kill")
}
async fn proc_stdin(&self, ctx: BoxedCtx, id: ProcessId, data: Vec<u8>) -> io::Result<()> {
unsupported("proc_stdin")
}
async fn proc_resize_pty(
&self,
ctx: BoxedCtx,
id: ProcessId,
size: PtySize,
) -> io::Result<()> {
unsupported("proc_resize_pty")
}
}
#[async_trait]
impl SearchApi for Unsupported {
async fn search(&self, ctx: BoxedCtx, query: SearchQuery) -> io::Result<SearchId> {
unsupported("search")
}
async fn cancel_search(&self, ctx: BoxedCtx, id: SearchId) -> io::Result<()> {
unsupported("cancel_search")
}
}
#[async_trait]
impl SystemInfoApi for Unsupported {
async fn system_info(&self, ctx: BoxedCtx) -> io::Result<SystemInfo> {
unsupported("system_info")
}
}
#[async_trait]
impl VersionApi for Unsupported {
async fn version(&self, ctx: BoxedCtx) -> io::Result<Version> {
unsupported("version")
}
}
#[async_trait]
impl WatchApi for Unsupported {
async fn watch(
&self,
ctx: BoxedCtx,
path: PathBuf,
recursive: bool,
only: Vec<ChangeKind>,
except: Vec<ChangeKind>,
) -> io::Result<()> {
unsupported("watch")
}
async fn unwatch(&self, ctx: BoxedCtx, path: PathBuf) -> io::Result<()> {
unsupported("unwatch")
}
}
}

@ -0,0 +1,68 @@
use std::any::TypeId;
use super::{Api, Unsupported};
/// Utility class to check if various APIs are supported.
pub struct Checker;
impl Checker {
/// Returns true if [`FileSystemApi`] is supported. This is checked by ensuring that the
/// implementation of the associated trait is not [`Unsupported`].
pub fn has_file_system_support<T>() -> bool
where
T: Api,
T::FileSystem: 'static,
{
TypeId::of::<T::FileSystem>() != TypeId::of::<Unsupported>()
}
/// Returns true if [`ProcessApi`] is supported. This is checked by ensuring that the
/// implementation of the associated trait is not [`Unsupported`].
pub fn has_process_support<T>() -> bool
where
T: Api,
T::Process: 'static,
{
TypeId::of::<T::Process>() != TypeId::of::<Unsupported>()
}
/// Returns true if [`SearchApi`] is supported. This is checked by ensuring that the
/// implementation of the associated trait is not [`Unsupported`].
pub fn has_search_support<T>() -> bool
where
T: Api,
T::Search: 'static,
{
TypeId::of::<T::Search>() != TypeId::of::<Unsupported>()
}
/// Returns true if [`SystemInfoApi`] is supported. This is checked by ensuring that the
/// implementation of the associated trait is not [`Unsupported`].
pub fn has_system_info_support<T>() -> bool
where
T: Api,
T::SystemInfo: 'static,
{
TypeId::of::<T::SystemInfo>() != TypeId::of::<Unsupported>()
}
/// Returns true if [`VersionApi`] is supported. This is checked by ensuring that the
/// implementation of the associated trait is not [`Unsupported`].
pub fn has_version_support<T>() -> bool
where
T: Api,
T::Version: 'static,
{
TypeId::of::<T::Version>() != TypeId::of::<Unsupported>()
}
/// Returns true if [`WatchApi`] is supported. This is checked by ensuring that the
/// implementation of the associated trait is not [`Unsupported`].
pub fn has_watch_support<T>() -> bool
where
T: Api,
T::Watch: 'static,
{
TypeId::of::<T::Watch>() != TypeId::of::<Unsupported>()
}
}

@ -1,6 +1,7 @@
use std::io;
use async_trait::async_trait;
use distant_core_protocol::Response;
/// Type abstraction of a boxed [`Ctx`].
pub type BoxedCtx = Box<dyn Ctx>;
@ -16,5 +17,5 @@ pub trait Ctx: Send {
fn clone_ctx(&self) -> BoxedCtx;
/// Sends some response back.
fn send(&self, data: Vec<u8>) -> io::Result<()>;
fn send(&self, response: Response) -> io::Result<()>;
}

@ -0,0 +1,212 @@
use async_trait::async_trait;
use super::*;
#[inline]
fn unsupported<T>(label: &str) -> io::Result<T> {
Err(io::Error::new(
io::ErrorKind::Unsupported,
format!("{label} is unsupported"),
))
}
/// Generic struct that implements all APIs as unsupported.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Unsupported;
#[async_trait]
impl Api for Unsupported {
type FileSystem = Self;
type Process = Self;
type Search = Self;
type SystemInfo = Self;
type Version = Self;
type Watch = Self;
fn file_system(&self) -> &Self::FileSystem {
self
}
fn process(&self) -> &Self::Process {
self
}
fn search(&self) -> &Self::Search {
self
}
fn system_info(&self) -> &Self::SystemInfo {
self
}
fn version(&self) -> &Self::Version {
self
}
fn watch(&self) -> &Self::Watch {
self
}
}
#[async_trait]
impl FileSystemApi for Unsupported {
async fn read_file(&self, _ctx: BoxedCtx, _path: PathBuf) -> io::Result<Vec<u8>> {
unsupported("read_file")
}
async fn read_file_text(&self, _ctx: BoxedCtx, _path: PathBuf) -> io::Result<String> {
unsupported("read_file_text")
}
async fn write_file(&self, _ctx: BoxedCtx, _path: PathBuf, _data: Vec<u8>) -> io::Result<()> {
unsupported("write_file")
}
async fn write_file_text(
&self,
_ctx: BoxedCtx,
_path: PathBuf,
_data: String,
) -> io::Result<()> {
unsupported("write_file_text")
}
async fn append_file(&self, _ctx: BoxedCtx, _path: PathBuf, _data: Vec<u8>) -> io::Result<()> {
unsupported("append_file")
}
async fn append_file_text(
&self,
_ctx: BoxedCtx,
_path: PathBuf,
_data: String,
) -> io::Result<()> {
unsupported("append_file_text")
}
async fn read_dir(
&self,
_ctx: BoxedCtx,
_path: PathBuf,
_depth: usize,
_absolute: bool,
_canonicalize: bool,
_include_root: bool,
) -> io::Result<(Vec<DirEntry>, Vec<io::Error>)> {
unsupported("read_dir")
}
async fn create_dir(&self, _ctx: BoxedCtx, _path: PathBuf, _all: bool) -> io::Result<()> {
unsupported("create_dir")
}
async fn copy(&self, _ctx: BoxedCtx, _src: PathBuf, _dst: PathBuf) -> io::Result<()> {
unsupported("copy")
}
async fn remove(&self, _ctx: BoxedCtx, _path: PathBuf, _force: bool) -> io::Result<()> {
unsupported("remove")
}
async fn rename(&self, _ctx: BoxedCtx, _src: PathBuf, _dst: PathBuf) -> io::Result<()> {
unsupported("rename")
}
async fn exists(&self, _ctx: BoxedCtx, _path: PathBuf) -> io::Result<bool> {
unsupported("exists")
}
async fn metadata(
&self,
_ctx: BoxedCtx,
_path: PathBuf,
_canonicalize: bool,
_resolve_file_type: bool,
) -> io::Result<Metadata> {
unsupported("metadata")
}
async fn set_permissions(
&self,
_ctx: BoxedCtx,
_path: PathBuf,
_permissions: Permissions,
_options: SetPermissionsOptions,
) -> io::Result<()> {
unsupported("set_permissions")
}
}
#[async_trait]
impl ProcessApi for Unsupported {
async fn proc_spawn(
&self,
_ctx: BoxedCtx,
_cmd: String,
_environment: Environment,
_current_dir: Option<PathBuf>,
_pty: Option<PtySize>,
) -> io::Result<ProcessId> {
unsupported("proc_spawn")
}
async fn proc_kill(&self, _ctx: BoxedCtx, _id: ProcessId) -> io::Result<()> {
unsupported("proc_kill")
}
async fn proc_stdin(&self, _ctx: BoxedCtx, _id: ProcessId, _data: Vec<u8>) -> io::Result<()> {
unsupported("proc_stdin")
}
async fn proc_resize_pty(
&self,
_ctx: BoxedCtx,
_id: ProcessId,
_size: PtySize,
) -> io::Result<()> {
unsupported("proc_resize_pty")
}
}
#[async_trait]
impl SearchApi for Unsupported {
async fn search(&self, _ctx: BoxedCtx, _query: SearchQuery) -> io::Result<SearchId> {
unsupported("search")
}
async fn cancel_search(&self, _ctx: BoxedCtx, _id: SearchId) -> io::Result<()> {
unsupported("cancel_search")
}
}
#[async_trait]
impl SystemInfoApi for Unsupported {
async fn system_info(&self, _ctx: BoxedCtx) -> io::Result<SystemInfo> {
unsupported("system_info")
}
}
#[async_trait]
impl VersionApi for Unsupported {
async fn version(&self, _ctx: BoxedCtx) -> io::Result<Version> {
unsupported("version")
}
}
#[async_trait]
impl WatchApi for Unsupported {
async fn watch(
&self,
_ctx: BoxedCtx,
_path: PathBuf,
_recursive: bool,
_only: Vec<ChangeKind>,
_except: Vec<ChangeKind>,
) -> io::Result<()> {
unsupported("watch")
}
async fn unwatch(&self, _ctx: BoxedCtx, _path: PathBuf) -> io::Result<()> {
unsupported("unwatch")
}
}

@ -1,8 +1,14 @@
use std::io;
use std::sync::mpsc;
use std::sync::{mpsc, Arc};
use async_trait::async_trait;
use distant_core_protocol::{Request, Response};
use distant_core_protocol::{Error, Request, Response};
use crate::api::{
Api, BoxedCtx, Ctx, FileSystemApi, ProcessApi, SearchApi, SystemInfoApi, VersionApi, WatchApi,
};
pub type BoxedClient = Box<dyn Client>;
/// Full API for a distant-compatible client.
#[async_trait]
@ -20,3 +26,269 @@ pub trait Client {
/// the session's receiving line to the remote server has already been severed
async fn send(&mut self, request: Request) -> io::Result<Response>;
}
/// Represents a bridge between a [`Client`] and an [`Api`] implementation that maps requests to
/// the API and forwards responses back.
///
/// This can be used to run an Api implementation locally, such as when you want to translate some
/// other platform (e.g. ssh, docker) into a distant-compatible form.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ClientBridge<T: Api>(Arc<T>);
impl<T: Api> ClientBridge<T> {
/// Creates a new bridge wrapping around the provided api.
pub fn new(api: T) -> Self {
Self(Arc::new(api))
}
}
#[async_trait]
impl<T: Api + Send + Sync> Client for ClientBridge<T> {
async fn fire(&mut self, request: Request) -> io::Result<()> {
let _ = self.send(request).await?;
Ok(())
}
async fn mail(&mut self, request: Request) -> io::Result<mpsc::Receiver<Response>> {
#[derive(Clone, Debug)]
struct __Ctx(u32, mpsc::Sender<Response>);
impl Ctx for __Ctx {
fn connection(&self) -> u32 {
self.0
}
fn clone_ctx(&self) -> BoxedCtx {
Box::new(__Ctx(self.0, self.1.clone()))
}
fn send(&self, response: Response) -> io::Result<()> {
self.1
.send(response)
.map_err(|_| io::Error::new(io::ErrorKind::Other, "Bridge has closed"))
}
}
let (tx, rx) = mpsc::channel();
let ctx = Box::new(__Ctx(0, tx));
// TODO: This is blocking! How can we make this async? Do we REALLY need to import tokio?
//
// We would need to import tokio to spawn a task to run this...
let _response = handle_request(Arc::clone(&self.0), ctx, request).await;
Ok(rx)
}
async fn send(&mut self, request: Request) -> io::Result<Response> {
let rx = self.mail(request).await?;
// TODO: This is blocking! How can we make this async? Do we REALLY need to import tokio?
rx.recv()
.map_err(|_| io::Error::new(io::ErrorKind::Other, "Bridge has closed"))
}
}
/// Processes an incoming request.
async fn handle_request<T>(api: Arc<T>, ctx: BoxedCtx, request: Request) -> Response
where
T: Api + Send,
{
match request {
Request::Version {} => {
let api = api.version();
api.version(ctx)
.await
.map(Response::Version)
.unwrap_or_else(Response::from)
}
Request::FileRead { path } => {
let api = api.file_system();
api.read_file(ctx, path)
.await
.map(|data| Response::Blob { data })
.unwrap_or_else(Response::from)
}
Request::FileReadText { path } => {
let api = api.file_system();
api.read_file_text(ctx, path)
.await
.map(|data| Response::Text { data })
.unwrap_or_else(Response::from)
}
Request::FileWrite { path, data } => {
let api = api.file_system();
api.write_file(ctx, path, data)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::FileWriteText { path, text } => {
let api = api.file_system();
api.write_file_text(ctx, path, text)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::FileAppend { path, data } => {
let api = api.file_system();
api.append_file(ctx, path, data)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::FileAppendText { path, text } => {
let api = api.file_system();
api.append_file_text(ctx, path, text)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::DirRead {
path,
depth,
absolute,
canonicalize,
include_root,
} => {
let api = api.file_system();
api.read_dir(ctx, path, depth, absolute, canonicalize, include_root)
.await
.map(|(entries, errors)| Response::DirEntries {
entries,
errors: errors.into_iter().map(Error::from).collect(),
})
.unwrap_or_else(Response::from)
}
Request::DirCreate { path, all } => {
let api = api.file_system();
api.create_dir(ctx, path, all)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::Remove { path, force } => {
let api = api.file_system();
api.remove(ctx, path, force)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::Copy { src, dst } => {
let api = api.file_system();
api.copy(ctx, src, dst)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::Rename { src, dst } => {
let api = api.file_system();
api.rename(ctx, src, dst)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::Watch {
path,
recursive,
only,
except,
} => {
let api = api.watch();
api.watch(ctx, path, recursive, only, except)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::Unwatch { path } => {
let api = api.watch();
api.unwatch(ctx, path)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::Exists { path } => {
let api = api.file_system();
api.exists(ctx, path)
.await
.map(|value| Response::Exists { value })
.unwrap_or_else(Response::from)
}
Request::Metadata {
path,
canonicalize,
resolve_file_type,
} => {
let api = api.file_system();
api.metadata(ctx, path, canonicalize, resolve_file_type)
.await
.map(Response::Metadata)
.unwrap_or_else(Response::from)
}
Request::SetPermissions {
path,
permissions,
options,
} => {
let api = api.file_system();
api.set_permissions(ctx, path, permissions, options)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::Search { query } => {
let api = api.search();
api.search(ctx, query)
.await
.map(|id| Response::SearchStarted { id })
.unwrap_or_else(Response::from)
}
Request::CancelSearch { id } => {
let api = api.search();
api.cancel_search(ctx, id)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::ProcSpawn {
cmd,
environment,
current_dir,
pty,
} => {
let api = api.process();
api.proc_spawn(ctx, cmd.into(), environment, current_dir, pty)
.await
.map(|id| Response::ProcSpawned { id })
.unwrap_or_else(Response::from)
}
Request::ProcKill { id } => {
let api = api.process();
api.proc_kill(ctx, id)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::ProcStdin { id, data } => {
let api = api.process();
api.proc_stdin(ctx, id, data)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::ProcResizePty { id, size } => {
let api = api.process();
api.proc_resize_pty(ctx, id, size)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::SystemInfo {} => {
let api = api.system_info();
api.system_info(ctx)
.await
.map(Response::SystemInfo)
.unwrap_or_else(Response::from)
}
}
}

Loading…
Cancel
Save