You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

683 lines
23 KiB

use crate::{
Capabilities, ChangeKind, DirEntry, Environment, Error, Metadata, ProcessId, PtySize,
SearchId, SearchQuery, SystemInfo,
DistantMsg, DistantRequestData, DistantResponseData,
use async_trait::async_trait;
use distant_net::common::ConnectionId;
use distant_net::server::{ConnectionCtx, Reply, ServerCtx, ServerHandler};
use log::*;
use std::{io, path::PathBuf, sync::Arc};
mod local;
pub use local::LocalDistantApi;
mod reply;
use reply::DistantSingleReply;
/// Represents the context provided to the [`DistantApi`] for incoming requests.
///
/// A fresh context is built for every request item by the server handler's `on_request`
/// and is passed by value into the matching [`DistantApi`] method.
pub struct DistantCtx<T> {
/// Identifier of the connection the request arrived on
pub connection_id: ConnectionId,
/// Reply handle whose data type is [`DistantResponseData`]
pub reply: Box<dyn Reply<Data = DistantResponseData>>,
/// Reference-counted local data of type `T` shared with the API function
pub local_data: Arc<T>,
/// Represents a [`ServerHandler`] that leverages an API compliant with `distant`.
///
/// * `T` - the [`DistantApi`] implementation that requests are dispatched to
/// * `D` - the local data type, tied to `T` through `DistantApi<LocalData = D>`
pub struct DistantApiServerHandler<T, D>
T: DistantApi<LocalData = D>,
/// The wrapped API implementation
api: T,
impl<T, D> DistantApiServerHandler<T, D>
T: DistantApi<LocalData = D>,
/// Creates a new server handler that wraps the provided `api`
pub fn new(api: T) -> Self {
Self { api }
impl DistantApiServerHandler<LocalDistantApi, <LocalDistantApi as DistantApi>::LocalData> {
/// Creates a new server using the [`LocalDistantApi`] implementation.
///
/// # Errors
///
/// Propagates any error returned by `LocalDistantApi::initialize`.
pub fn local() -> io::Result<Self> {
Ok(Self {
api: LocalDistantApi::initialize()?,
/// Helper for API functions that have no implementation: produces an [`io::Result`]
/// carrying the message `"{label} is unsupported"`.
///
/// * `label` - the name placed at the front of the message
fn unsupported<T>(label: &str) -> io::Result<T> {
format!("{label} is unsupported"),
/// Interface to support the suite of functionality available with distant,
/// which can be used to build other servers that are compatible with distant.
///
/// Every request-handling function receives a [`DistantCtx`] parameterized by
/// [`DistantApi::LocalData`] and reports its outcome as an [`io::Result`].
pub trait DistantApi {
/// Type of the local data handed to `on_accept` through [`ConnectionCtx`] and to every
/// other function through [`DistantCtx`]
type LocalData: Send + Sync;
/// Invoked whenever a new connection is established, providing a mutable reference to the
/// newly-created local data. This is a way to support modifying local data before it is used.
async fn on_accept(&self, ctx: ConnectionCtx<'_, Self::LocalData>) -> io::Result<()> {
/// Retrieves information about the server's capabilities.
///
/// *Override this, otherwise it will return "unsupported" as an error.*
async fn capabilities(&self, ctx: DistantCtx<Self::LocalData>) -> io::Result<Capabilities> {
/// Reads bytes from a file.
///
/// * `path` - the path to the file
///
/// *Override this, otherwise it will return "unsupported" as an error.*
async fn read_file(
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
) -> io::Result<Vec<u8>> {
/// Reads bytes from a file as text.
///
/// * `path` - the path to the file
///
/// *Override this, otherwise it will return "unsupported" as an error.*
async fn read_file_text(
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
) -> io::Result<String> {
/// Writes bytes to a file, overwriting the file if it exists.
///
/// * `path` - the path to the file
/// * `data` - the data to write
///
/// *Override this, otherwise it will return "unsupported" as an error.*
async fn write_file(
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
data: Vec<u8>,
) -> io::Result<()> {
/// Writes text to a file, overwriting the file if it exists.
///
/// * `path` - the path to the file
/// * `data` - the text to write
///
/// *Override this, otherwise it will return "unsupported" as an error.*
async fn write_file_text(
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
data: String,
) -> io::Result<()> {
/// Writes bytes to the end of a file, creating it if it is missing.
///
/// * `path` - the path to the file
/// * `data` - the data to append
///
/// *Override this, otherwise it will return "unsupported" as an error.*
async fn append_file(
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
data: Vec<u8>,
) -> io::Result<()> {
/// Writes text to the end of a file, creating it if it is missing.
///
/// * `path` - the path to the file
/// * `data` - the text to append
///
/// *Override this, otherwise it will return "unsupported" as an error.*
async fn append_file_text(
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
data: String,
) -> io::Result<()> {
/// Reads entries from a directory.
///
/// * `path` - the path to the directory
/// * `depth` - how far to traverse the directory, 0 being unlimited
/// * `absolute` - if true, will return absolute paths instead of relative paths
/// * `canonicalize` - if true, will canonicalize entry paths before returned
/// * `include_root` - if true, will include the directory specified in the entries
///
/// On success, yields the entries that were read alongside a list of [`io::Error`] values.
///
/// *Override this, otherwise it will return "unsupported" as an error.*
async fn read_dir(
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
depth: usize,
absolute: bool,
canonicalize: bool,
include_root: bool,
) -> io::Result<(Vec<DirEntry>, Vec<io::Error>)> {
/// Creates a directory.
///
/// * `path` - the path to the directory
/// * `all` - if true, will create all missing parent components
///
/// *Override this, otherwise it will return "unsupported" as an error.*
async fn create_dir(
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
all: bool,
) -> io::Result<()> {
/// Copies some file or directory.
///
/// * `src` - the path to the file or directory to copy
/// * `dst` - the path where the copy will be placed
///
/// *Override this, otherwise it will return "unsupported" as an error.*
async fn copy(
ctx: DistantCtx<Self::LocalData>,
src: PathBuf,
dst: PathBuf,
) -> io::Result<()> {
/// Removes some file or directory.
///
/// * `path` - the path to a file or directory
/// * `force` - if true, will remove non-empty directories
///
/// *Override this, otherwise it will return "unsupported" as an error.*
async fn remove(
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
force: bool,
) -> io::Result<()> {
/// Renames some file or directory.
///
/// * `src` - the path to the file or directory to rename
/// * `dst` - the new name for the file or directory
///
/// *Override this, otherwise it will return "unsupported" as an error.*
async fn rename(
ctx: DistantCtx<Self::LocalData>,
src: PathBuf,
dst: PathBuf,
) -> io::Result<()> {
/// Watches a file or directory for changes.
///
/// * `path` - the path to the file or directory
/// * `recursive` - if true, will watch for changes within subdirectories and beyond
/// * `only` - if non-empty, will limit reported changes to those included in this list
/// * `except` - if non-empty, will limit reported changes to those not included in this list
///
/// *Override this, otherwise it will return "unsupported" as an error.*
async fn watch(
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
recursive: bool,
only: Vec<ChangeKind>,
except: Vec<ChangeKind>,
) -> io::Result<()> {
/// Removes a file or directory from being watched.
///
/// * `path` - the path to the file or directory
///
/// *Override this, otherwise it will return "unsupported" as an error.*
async fn unwatch(&self, ctx: DistantCtx<Self::LocalData>, path: PathBuf) -> io::Result<()> {
/// Checks if the specified path exists.
///
/// * `path` - the path to the file or directory
///
/// *Override this, otherwise it will return "unsupported" as an error.*
async fn exists(&self, ctx: DistantCtx<Self::LocalData>, path: PathBuf) -> io::Result<bool> {
/// Reads metadata for a file or directory.
///
/// * `path` - the path to the file or directory
/// * `canonicalize` - if true, will include a canonicalized path in the metadata
/// * `resolve_file_type` - if true, will resolve symlinks to underlying type (file or dir)
///
/// *Override this, otherwise it will return "unsupported" as an error.*
async fn metadata(
ctx: DistantCtx<Self::LocalData>,
path: PathBuf,
canonicalize: bool,
resolve_file_type: bool,
) -> io::Result<Metadata> {
/// Searches files for matches based on a query, returning the id of the search.
///
/// * `query` - the specific query to perform
///
/// *Override this, otherwise it will return "unsupported" as an error.*
async fn search(
ctx: DistantCtx<Self::LocalData>,
query: SearchQuery,
) -> io::Result<SearchId> {
/// Cancels an actively-ongoing search.
///
/// * `id` - the id of the search to cancel
///
/// *Override this, otherwise it will return "unsupported" as an error.*
async fn cancel_search(
ctx: DistantCtx<Self::LocalData>,
id: SearchId,
) -> io::Result<()> {
/// Spawns a new process, returning its id.
///
/// * `cmd` - the full command to run as a new process (including arguments)
/// * `environment` - the environment variables to associate with the process
/// * `current_dir` - the alternative current directory to use with the process
/// * `pty` - if provided, will run the process within a PTY of the given size
///
/// *Override this, otherwise it will return "unsupported" as an error.*
async fn proc_spawn(
ctx: DistantCtx<Self::LocalData>,
cmd: String,
environment: Environment,
current_dir: Option<PathBuf>,
pty: Option<PtySize>,
) -> io::Result<ProcessId> {
/// Kills a running process by its id.
///
/// * `id` - the unique id of the process
///
/// *Override this, otherwise it will return "unsupported" as an error.*
async fn proc_kill(&self, ctx: DistantCtx<Self::LocalData>, id: ProcessId) -> io::Result<()> {
/// Sends data to the stdin of the process with the specified id.
///
/// * `id` - the unique id of the process
/// * `data` - the bytes to send to stdin
///
/// *Override this, otherwise it will return "unsupported" as an error.*
async fn proc_stdin(
ctx: DistantCtx<Self::LocalData>,
id: ProcessId,
data: Vec<u8>,
) -> io::Result<()> {
/// Resizes the PTY of the process with the specified id.
///
/// * `id` - the unique id of the process
/// * `size` - the new size of the pty
///
/// *Override this, otherwise it will return "unsupported" as an error.*
async fn proc_resize_pty(
ctx: DistantCtx<Self::LocalData>,
id: ProcessId,
size: PtySize,
) -> io::Result<()> {
/// Retrieves information about the system.
///
/// *Override this, otherwise it will return "unsupported" as an error.*
async fn system_info(&self, ctx: DistantCtx<Self::LocalData>) -> io::Result<SystemInfo> {
/// Bridges the generic [`ServerHandler`] interface to a [`DistantApi`] implementation:
/// requests arrive as [`DistantMsg`] values and are dispatched through `handle_request`.
impl<T, D> ServerHandler for DistantApiServerHandler<T, D>
T: DistantApi<LocalData = D> + Send + Sync,
D: Send + Sync,
/// Incoming messages: a single request or a batch of requests
type Request = DistantMsg<DistantRequestData>;
/// Outgoing messages: a single response or a batch of responses
type Response = DistantMsg<DistantResponseData>;
/// Local data is whatever the wrapped [`DistantApi`] declares
type LocalData = D;
/// Overridden to leverage [`DistantApi`] implementation of `on_accept`
async fn on_accept(&self, ctx: ConnectionCtx<'_, Self::LocalData>) -> io::Result<()> {
T::on_accept(&self.api, ctx).await
/// Handles an incoming request by running each contained [`DistantRequestData`] through
/// `handle_request`, then sending the resulting response ahead of any other replies
/// queued in the meantime.
///
/// Nothing is returned; a failure to send the response is reported through the `error!` log.
async fn on_request(&self, ctx: ServerCtx<Self::Request, Self::Response, Self::LocalData>) {
let ServerCtx {
} = ctx;
// Convert our reply to a queued reply so we can ensure that the result
// of an API function is sent back before anything else
let reply = reply.queue();
// Process single vs batch requests
let response = match request.payload {
DistantMsg::Single(data) => {
// The API function gets its own reply handle cloned from the queued reply
let ctx = DistantCtx {
reply: Box::new(DistantSingleReply::from(reply.clone_reply())),
let data = handle_request(self, ctx, data).await;
// Report outgoing errors in our debug logs
if let DistantResponseData::Error(x) = &data {
debug!("[Conn {}] {}", connection_id, x);
DistantMsg::Batch(list) => {
let mut out = Vec::new();
for data in list {
// Every item in the batch gets its own reply handle and a new reference
// to the shared local data
let ctx = DistantCtx {
reply: Box::new(DistantSingleReply::from(reply.clone_reply())),
local_data: Arc::clone(&local_data),
// TODO: This does not run in parallel, meaning that the next item in the
// batch will not be queued until the previous item completes! This
// would be useful if we wanted to chain requests where the previous
// request feeds into the current request, but not if we just want
// to run everything together. So we should instead rewrite this
// to spawn a task per request and then await completion of all tasks
let data = handle_request(self, ctx, data).await;
// Report outgoing errors in our debug logs
if let DistantResponseData::Error(x) = &data {
debug!("[Conn {}] {}", connection_id, x);
// Queue up our result to go before ANY of the other messages that might be sent.
// This is important to avoid situations such as when a process is started, but before
// the confirmation can be sent some stdout or stderr is captured and sent first.
if let Err(x) = reply.send_before(response).await {
error!("[Conn {}] Failed to send response: {}", connection_id, x);
// Flush out all of our replies thus far and toggle to no longer hold submissions
if let Err(x) = reply.flush(false).await {
"[Conn {}] Failed to flush response queue: {}",
connection_id, x
/// Processes an incoming request
async fn handle_request<T, D>(
server: &DistantApiServerHandler<T, D>,
ctx: DistantCtx<D>,
request: DistantRequestData,
) -> DistantResponseData
T: DistantApi<LocalData = D> + Send + Sync,
D: Send + Sync,
match request {
DistantRequestData::Capabilities {} => server
.map(|supported| DistantResponseData::Capabilities { supported })
DistantRequestData::FileRead { path } => server
.read_file(ctx, path)
.map(|data| DistantResponseData::Blob { data })
DistantRequestData::FileReadText { path } => server
.read_file_text(ctx, path)
.map(|data| DistantResponseData::Text { data })
DistantRequestData::FileWrite { path, data } => server
.write_file(ctx, path, data)
.map(|_| DistantResponseData::Ok)
DistantRequestData::FileWriteText { path, text } => server
.write_file_text(ctx, path, text)
.map(|_| DistantResponseData::Ok)
DistantRequestData::FileAppend { path, data } => server
.append_file(ctx, path, data)
.map(|_| DistantResponseData::Ok)
DistantRequestData::FileAppendText { path, text } => server
.append_file_text(ctx, path, text)
.map(|_| DistantResponseData::Ok)
DistantRequestData::DirRead {
} => server
.read_dir(ctx, path, depth, absolute, canonicalize, include_root)
.map(|(entries, errors)| DistantResponseData::DirEntries {
errors: errors.into_iter().map(Error::from).collect(),
DistantRequestData::DirCreate { path, all } => server
.create_dir(ctx, path, all)
.map(|_| DistantResponseData::Ok)
DistantRequestData::Remove { path, force } => server
.remove(ctx, path, force)
.map(|_| DistantResponseData::Ok)
DistantRequestData::Copy { src, dst } => server
.copy(ctx, src, dst)
.map(|_| DistantResponseData::Ok)
DistantRequestData::Rename { src, dst } => server
.rename(ctx, src, dst)
.map(|_| DistantResponseData::Ok)
DistantRequestData::Watch {
} => server
.watch(ctx, path, recursive, only, except)
.map(|_| DistantResponseData::Ok)
DistantRequestData::Unwatch { path } => server
.unwatch(ctx, path)
.map(|_| DistantResponseData::Ok)
DistantRequestData::Exists { path } => server
.exists(ctx, path)
.map(|value| DistantResponseData::Exists { value })
DistantRequestData::Metadata {
} => server
.metadata(ctx, path, canonicalize, resolve_file_type)
DistantRequestData::Search { query } => server
.search(ctx, query)
.map(|id| DistantResponseData::SearchStarted { id })
DistantRequestData::CancelSearch { id } => server
.cancel_search(ctx, id)
.map(|_| DistantResponseData::Ok)
DistantRequestData::ProcSpawn {
} => server
.proc_spawn(ctx, cmd.into(), environment, current_dir, pty)
.map(|id| DistantResponseData::ProcSpawned { id })
DistantRequestData::ProcKill { id } => server
.proc_kill(ctx, id)
.map(|_| DistantResponseData::Ok)
DistantRequestData::ProcStdin { id, data } => server
.proc_stdin(ctx, id, data)
.map(|_| DistantResponseData::Ok)
DistantRequestData::ProcResizePty { id, size } => server
.proc_resize_pty(ctx, id, size)
.map(|_| DistantResponseData::Ok)
DistantRequestData::SystemInfo {} => server