Refactor codebase into cli and core modules, add unix socket support, bump to 0.5.0

pull/38/head v0.5.0
Chip Senkbeil 3 years ago
parent bb7829e3f0
commit d4775477aa
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

33
Cargo.lock generated

@ -177,33 +177,12 @@ dependencies = [
"generic-array",
]
[[package]]
name = "directories"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e69600ff1703123957937708eb27f7a564e48885c537782722ed0ba3189ce1d7"
dependencies = [
"dirs-sys",
]
[[package]]
name = "dirs-sys"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03d86534ed367a67548dc68113a0f5db55432fdfbb6e6f9d77704397d95d5780"
dependencies = [
"libc",
"redox_users",
"winapi",
]
[[package]]
name = "distant"
version = "0.4.0"
version = "0.5.0"
dependencies = [
"bytes",
"derive_more",
"directories",
"flexi_logger",
"fork",
"futures",
@ -772,16 +751,6 @@ dependencies = [
"bitflags",
]
[[package]]
name = "redox_users"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "528532f3d801c87aec9def2add9ca802fe569e44a544afe633765267840abe64"
dependencies = [
"getrandom",
"redox_syscall",
]
[[package]]
name = "regex"
version = "1.5.4"

@ -2,7 +2,7 @@
name = "distant"
description = "Operate on a remote computer through file and process manipulation"
categories = ["command-line-utilities"]
version = "0.4.0"
version = "0.5.0"
authors = ["Chip Senkbeil <chip@senkbeil.org>"]
edition = "2018"
homepage = "https://github.com/chipsenkbeil/distant"
@ -18,7 +18,6 @@ codegen-units = 1
[dependencies]
bytes = "1.0.1"
derive_more = { version = "0.99.16", default-features = false, features = ["display", "from", "error", "is_variant"] }
directories = "3.0.2"
futures = "0.3.16"
hex = "0.4.3"
k256 = { version = "0.9.6", features = ["ecdh"] }

@ -0,0 +1,2 @@
pub mod opt;
mod subcommand;

@ -1,4 +1,10 @@
use crate::{data::RequestPayload, subcommand};
use crate::{
cli::subcommand,
core::{
constants::{SESSION_FILE_PATH_STR, SESSION_SOCKET_PATH_STR},
data::RequestPayload,
},
};
use derive_more::{Display, Error, From, IsVariant};
use lazy_static::lazy_static;
use std::{
@ -48,6 +54,20 @@ pub struct CommonOpt {
pub log_file: Option<PathBuf>,
}
/// Contains options related sessions
#[derive(Debug, StructOpt)]
pub struct SessionOpt {
/// Represents the location of the file containing session information,
/// only useful when the session is set to "file"
#[structopt(long, default_value = &SESSION_FILE_PATH_STR)]
pub session_file: PathBuf,
/// Represents the location of the session's socket to communicate across,
/// only useful when the session is set to "socket"
#[structopt(long, default_value = &SESSION_SOCKET_PATH_STR)]
pub session_socket: PathBuf,
}
#[derive(Debug, StructOpt)]
pub enum Subcommand {
/// Performs some action on a remote machine
@ -58,9 +78,6 @@ pub enum Subcommand {
/// Begins listening for incoming requests
Listen(ListenSubcommand),
/// Performs some task related to the current session
Session(SessionSubcommand),
}
impl Subcommand {
@ -70,41 +87,12 @@ impl Subcommand {
Self::Action(cmd) => subcommand::action::run(cmd, opt)?,
Self::Launch(cmd) => subcommand::launch::run(cmd, opt)?,
Self::Listen(cmd) => subcommand::listen::run(cmd, opt)?,
Self::Session(cmd) => subcommand::session::run(cmd, opt)?,
}
Ok(())
}
}
/// Represents subcommand to operate on a session
#[derive(Debug, StructOpt)]
pub enum SessionSubcommand {
/// Clears the current session
Clear,
/// Reports whether or not a session exists
Exists,
/// Prints out information about the available sessions
Info {
/// Represents the format that results should be returned
///
/// Currently, there are two possible formats:
///
/// 1. "json": printing out JSON for external program usage
/// 2. "shell": printing out human-readable results for interactive shell usage
#[structopt(
short,
long,
case_insensitive = true,
default_value = Mode::Shell.into(),
possible_values = Mode::VARIANTS
)]
mode: Mode,
},
}
/// Represents the communication medium used for the send command
#[derive(
Copy,
@ -150,11 +138,15 @@ pub struct ActionSubcommand {
/// Represents the medium for retrieving a session for use in performing the action
#[structopt(
long,
default_value = SessionInput::File.into(),
default_value = SessionInput::default().into(),
possible_values = SessionInput::VARIANTS
)]
pub session: SessionInput,
/// Contains additional information related to sessions
#[structopt(flatten)]
pub session_data: SessionOpt,
/// If specified, commands to send are sent over stdin and responses are received
/// over stdout (and stderr if mode is shell)
#[structopt(short, long)]
@ -244,6 +236,25 @@ pub enum SessionOutput {
/// Session is stored and retrieved over anonymous pipes (stdout/stdin)
/// in form of `DISTANT DATA <host> <port> <auth key>`
Pipe,
/// Special scenario where the session is not shared but is instead kept within the
/// launch program, where the program now listens on a unix socket for input
#[cfg(unix)]
Socket,
}
impl Default for SessionOutput {
/// For unix-based systems, output defaults to a socket
#[cfg(unix)]
fn default() -> Self {
Self::Socket
}
/// For non-unix-based systems, output defaults to a file
#[cfg(not(unix))]
fn default() -> Self {
Self::File
}
}
/// Represents the means by which to consume a session when performing an action
@ -274,6 +285,25 @@ pub enum SessionInput {
/// Session is stored and retrieved over anonymous pipes (stdout/stdin)
/// in form of `DISTANT DATA <host> <port> <auth key>`
Pipe,
/// Session isn't directly available but instead there is a process listening
/// on a unix socket that will forward requests and responses
#[cfg(unix)]
Socket,
}
impl Default for SessionInput {
/// For unix-based systems, input defaults to a socket
#[cfg(unix)]
fn default() -> Self {
Self::Socket
}
/// For non-unix-based systems, input defaults to a file
#[cfg(not(unix))]
fn default() -> Self {
Self::File
}
}
/// Represents subcommand to launch a remote server
@ -282,11 +312,20 @@ pub struct LaunchSubcommand {
/// Represents the medium for sharing the session upon launching on a remote machine
#[structopt(
long,
default_value = SessionOutput::File.into(),
default_value = SessionOutput::default().into(),
possible_values = SessionOutput::VARIANTS
)]
pub session: SessionOutput,
/// Contains additional information related to sessions
#[structopt(flatten)]
pub session_data: SessionOpt,
/// Runs in background via daemon-mode (does nothing on windows); only applies
/// when session is socket
#[structopt(short, long)]
pub daemon: bool,
/// Represents the format that results should be returned when session is "keep",
/// causing the launcher to enter an interactive loop to handle input and output
/// itself rather than enabling other clients to connect
@ -299,13 +338,13 @@ pub struct LaunchSubcommand {
)]
pub mode: Mode,
/// Path to remote program to execute via ssh
#[structopt(short, long, default_value = "distant")]
pub remote_program: String,
/// Path to distant program to execute via ssh
#[structopt(long, default_value = "distant")]
pub distant: String,
/// Path to ssh program to execute
#[structopt(short, long, default_value = "ssh")]
pub ssh_program: String,
#[structopt(long, default_value = "ssh")]
pub ssh: String,
/// Control the IP address that the server binds to.
///

@ -1,13 +1,16 @@
use crate::{
data::{Request, RequestPayload, Response, ResponsePayload},
net::Client,
opt::Mode,
cli::opt::Mode,
core::{
data::{Request, RequestPayload, Response, ResponsePayload},
net::{Client, DataStream},
},
};
use derive_more::IsVariant;
use log::*;
use std::marker::Unpin;
use structopt::StructOpt;
use tokio::{
io,
io::{self, AsyncRead, AsyncWrite},
sync::{
mpsc,
oneshot::{self, error::TryRecvError},
@ -34,8 +37,15 @@ impl From<LoopConfig> for Mode {
/// Starts a new action loop that processes requests and receives responses
///
/// id represents the id of a remote process
pub async fn interactive_loop(mut client: Client, config: LoopConfig) -> io::Result<()> {
let mut stream = client.to_response_stream();
pub async fn interactive_loop<T>(
mut client: Client<T>,
tenant: String,
config: LoopConfig,
) -> io::Result<()>
where
T: AsyncRead + AsyncWrite + DataStream + Unpin + 'static,
{
let mut stream = client.to_response_broadcast_stream();
// Create a channel that can report when we should stop the loop based on a received request
let (tx_stop, mut rx_stop) = oneshot::channel::<()>();
@ -55,7 +65,7 @@ pub async fn interactive_loop(mut client: Client, config: LoopConfig) -> io::Res
// For json mode, all stdin is treated as individual requests
LoopConfig::Json => {
trace!("Client sending request: {:?}", line);
debug!("Client sending request: {:?}", line);
let result = serde_json::from_str(&line)
.map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x));
match result {
@ -80,7 +90,7 @@ pub async fn interactive_loop(mut client: Client, config: LoopConfig) -> io::Res
continue;
}
trace!("Client sending command: {:?}", line);
debug!("Client sending command: {:?}", line);
// NOTE: We have to stick something in as the first argument as clap/structopt
// expect the binary name as the first item in the iterator
@ -89,15 +99,17 @@ pub async fn interactive_loop(mut client: Client, config: LoopConfig) -> io::Res
.chain(line.trim().split(' ').filter(|s| !s.trim().is_empty())),
);
match payload_result {
Ok(payload) => match client.send(Request::from(payload)).await {
Ok(res) => match format_response(Mode::Shell, res) {
Ok(out) => out.print(),
Err(x) => error!("Failed to format response: {}", x),
},
Err(x) => {
error!("Failed to send request: {}", x)
Ok(payload) => {
match client.send(Request::new(tenant.as_str(), payload)).await {
Ok(res) => match format_response(Mode::Shell, res) {
Ok(out) => out.print(),
Err(x) => error!("Failed to format response: {}", x),
},
Err(x) => {
error!("Failed to send request: {}", x)
}
}
},
}
Err(x) => {
error!("Failed to parse command: {}", x);
}
@ -106,11 +118,14 @@ pub async fn interactive_loop(mut client: Client, config: LoopConfig) -> io::Res
// For non-interactive shell mode, all stdin is treated as a proc's stdin
LoopConfig::Proc { id } => {
trace!("Client sending stdin: {:?}", line);
let req = Request::from(RequestPayload::ProcStdin {
id,
data: line.into_bytes(),
});
debug!("Client sending stdin: {:?}", line);
let req = Request::new(
tenant.as_str(),
RequestPayload::ProcStdin {
id,
data: line.into_bytes(),
},
);
let result = client.send(req).await;
if let Err(x) = result {

@ -1,8 +1,11 @@
use crate::{
data::{Request, ResponsePayload},
net::{Client, TransportError},
opt::{ActionSubcommand, CommonOpt, Mode, SessionInput},
session::{Session, SessionFile},
cli::opt::{ActionSubcommand, CommonOpt, Mode, SessionInput},
core::{
data::{Request, ResponsePayload},
net::{Client, DataStream, TransportError},
session::{Session, SessionFile},
utils,
},
};
use derive_more::{Display, Error, From};
use log::*;
@ -26,26 +29,55 @@ pub fn run(cmd: ActionSubcommand, opt: CommonOpt) -> Result<(), Error> {
}
async fn run_async(cmd: ActionSubcommand, _opt: CommonOpt) -> Result<(), Error> {
let session = match cmd.session {
SessionInput::Environment => Session::from_environment()?,
SessionInput::File => SessionFile::load().await?.into(),
SessionInput::Pipe => Session::from_stdin()?,
};
let mut client = Client::connect(session).await?;
match cmd.session {
SessionInput::Environment => {
start(
cmd,
Client::tcp_connect(Session::from_environment()?).await?,
)
.await
}
SessionInput::File => {
let path = cmd.session_data.session_file.clone();
start(
cmd,
Client::tcp_connect(SessionFile::load_from(path).await?.into()).await?,
)
.await
}
SessionInput::Pipe => start(cmd, Client::tcp_connect(Session::from_stdin()?).await?).await,
#[cfg(unix)]
SessionInput::Socket => {
let path = cmd.session_data.session_socket.clone();
start(cmd, Client::unix_connect(path, None).await?).await
}
#[cfg(not(unix))]
SessionInput::Socket => unreachable!(),
}
}
async fn start<T>(cmd: ActionSubcommand, mut client: Client<T>) -> Result<(), Error>
where
T: DataStream + 'static,
{
if !cmd.interactive && cmd.operation.is_none() {
return Err(Error::MissingOperation);
}
// Make up a tenant name
let tenant = utils::new_tenant();
// Special conditions for continuing to process responses
let mut is_proc_req = false;
let mut proc_id = 0;
if let Some(req) = cmd.operation.map(Request::from) {
if let Some(req) = cmd
.operation
.map(|payload| Request::new(tenant.as_str(), payload))
{
is_proc_req = req.payload.is_proc_run();
trace!("Client sending request: {:?}", req);
debug!("Client sending request: {:?}", req);
let res = client.send(req).await?;
// Store the spawned process id for using in sending stdin (if we spawned a proc)
@ -67,7 +99,7 @@ async fn run_async(cmd: ActionSubcommand, _opt: CommonOpt) -> Result<(), Error>
Mode::Shell if cmd.interactive => inner::LoopConfig::Shell,
Mode::Shell => inner::LoopConfig::Proc { id: proc_id },
};
inner::interactive_loop(client, config).await?;
inner::interactive_loop(client, tenant, config).await?;
}
Ok(())

@ -0,0 +1,301 @@
use crate::{
cli::opt::{CommonOpt, LaunchSubcommand, Mode, SessionOutput},
core::{
constants::CLIENT_BROADCAST_CHANNEL_CAPACITY,
data::{Request, Response},
net::{Client, Transport, TransportReadHalf, TransportWriteHalf},
session::{Session, SessionFile},
utils,
},
};
use derive_more::{Display, Error, From};
use fork::{daemon, Fork};
use hex::FromHexError;
use log::*;
use orion::errors::UnknownCryptoError;
use std::{marker::Unpin, path::Path, string::FromUtf8Error};
use tokio::{
io::{self, AsyncRead, AsyncWrite},
process::Command,
sync::{broadcast, mpsc, oneshot},
};
#[derive(Debug, Display, Error, From)]
pub enum Error {
#[display(fmt = "Missing data for session")]
MissingSessionData,
ForkError(#[error(not(source))] i32),
BadKey(UnknownCryptoError),
HexError(FromHexError),
IoError(io::Error),
Utf8Error(FromUtf8Error),
}
pub fn run(cmd: LaunchSubcommand, opt: CommonOpt) -> Result<(), Error> {
let rt = tokio::runtime::Runtime::new()?;
let session_output = cmd.session;
let mode = cmd.mode;
let is_daemon = cmd.daemon;
let session_file = cmd.session_data.session_file.clone();
let session_socket = cmd.session_data.session_socket.clone();
let session = rt.block_on(async { spawn_remote_server(cmd, opt).await })?;
// Handle sharing resulting session in different ways
match session_output {
SessionOutput::File => {
debug!("Outputting session to {:?}", session_file);
rt.block_on(async { SessionFile::new(session_file, session).save().await })?
}
SessionOutput::Keep => {
debug!("Entering interactive loop over stdin");
rt.block_on(async { keep_loop(session, mode).await })?
}
SessionOutput::Pipe => {
debug!("Piping session to stdout");
println!("{}", session.to_unprotected_string())
}
SessionOutput::Socket if is_daemon => {
debug!(
"Forking and entering interactive loop over unix socket {:?}",
session_socket
);
match daemon(false, false) {
Ok(Fork::Child) => {
rt.block_on(async { socket_loop(session_socket, session).await })?
}
Ok(_) => {}
Err(x) => return Err(Error::ForkError(x)),
}
}
#[cfg(unix)]
SessionOutput::Socket => {
debug!(
"Entering interactive loop over unix socket {:?}",
session_socket
);
rt.block_on(async { socket_loop(session_socket, session).await })?
}
#[cfg(not(unix))]
SessionOutput::Socket => {
debug!(concat!(
"Trying to enter interactive loop over unix socket, ",
"but not on unix platform!"
));
unreachable!()
}
}
Ok(())
}
async fn keep_loop(session: Session, mode: Mode) -> io::Result<()> {
use crate::cli::subcommand::action::inner;
match Client::tcp_connect(session).await {
Ok(client) => {
let config = match mode {
Mode::Json => inner::LoopConfig::Json,
Mode::Shell => inner::LoopConfig::Shell,
};
inner::interactive_loop(client, utils::new_tenant(), config).await
}
Err(x) => Err(x),
}
}
#[cfg(unix)]
async fn socket_loop(socket_path: impl AsRef<Path>, session: Session) -> io::Result<()> {
// We need to form a connection with the actual server to forward requests
// and responses between connections
let mut client = Client::tcp_connect(session).await?;
// Get a copy of our client's broadcaster so we can have each connection
// subscribe to it for new messages filtered by tenant
let broadcaster = client.to_response_broadcaster();
// Spawn task to send to the server requests from connections
let (req_tx, mut req_rx) = mpsc::channel::<Request>(CLIENT_BROADCAST_CHANNEL_CAPACITY);
tokio::spawn(async move {
while let Some(req) = req_rx.recv().await {
debug!(
"Forwarding request of type {} to server",
req.payload.as_ref()
);
if let Err(x) = client.fire(req).await {
error!("Client failed to send request: {:?}", x);
break;
}
}
});
// Continue to receive connections over the unix socket, store them in our
// connection mapping
let listener = tokio::net::UnixListener::bind(socket_path)?;
while let Ok((conn, addr)) = listener.accept().await {
// Establish a proper connection via a handshake, discarding the connection otherwise
let transport = match Transport::from_handshake(conn, None).await {
Ok(transport) => transport,
Err(x) => {
error!("<Client @ {:?}> Failed handshake: {}", addr, x);
continue;
}
};
let (t_read, t_write) = transport.into_split();
// Used to alert our response task of the connection's tenant name
// based on the first
let (tenant_tx, tenant_rx) = oneshot::channel();
// Spawn task to continually receive responses from the client that
// may or may not be relevant to the connection, which will filter
// by tenant and then along any response that matches
let res_rx = broadcaster.subscribe();
tokio::spawn(async move {
handle_conn_outgoing(addr, t_write, tenant_rx, res_rx).await;
});
// Spawn task to continually read requests from connection and forward
// them along to be sent via the client
let req_tx = req_tx.clone();
tokio::spawn(async move {
handle_conn_incoming(t_read, tenant_tx, req_tx).await;
});
}
Ok(())
}
/// Conn::Request -> Client::Fire
async fn handle_conn_incoming<T>(
mut reader: TransportReadHalf<T>,
tenant_tx: oneshot::Sender<String>,
req_tx: mpsc::Sender<Request>,
) where
T: AsyncRead + Unpin,
{
macro_rules! process_req {
($on_success:expr) => {
match reader.receive::<Request>().await {
Ok(Some(req)) => {
$on_success(&req);
if let Err(x) = req_tx.send(req).await {
error!(
"Failed to pass along request received on unix socket: {:?}",
x
);
return;
}
}
Ok(None) => return,
Err(x) => {
error!("Failed to receive request from unix stream: {:?}", x);
return;
}
}
};
}
// NOTE: Have to acquire our first request outside our loop since the oneshot
// sender of the tenant's name is consuming
process_req!(|req: &Request| {
if let Err(x) = tenant_tx.send(req.tenant.clone()) {
error!("Failed to send along acquired tenant name: {:?}", x);
return;
}
});
loop {
process_req!(|_| {});
}
}
async fn handle_conn_outgoing<T>(
addr: tokio::net::unix::SocketAddr,
mut writer: TransportWriteHalf<T>,
tenant_rx: oneshot::Receiver<String>,
mut res_rx: broadcast::Receiver<Response>,
) where
T: AsyncWrite + Unpin,
{
// We wait for the tenant to be identified by the first request
// before processing responses to be sent back; this is easier
// to implement and yields the same result as we would be dropping
// all responses before we know the tenant
if let Ok(tenant) = tenant_rx.await {
debug!("Associated tenant {} with conn {:?}", tenant, addr);
loop {
match res_rx.recv().await {
// Forward along responses that are for our connection
Ok(res) if res.tenant == tenant => {
debug!(
"Conn {:?} being sent response of type {}",
addr,
res.payload.as_ref()
);
if let Err(x) = writer.send(res).await {
error!("Failed to send response through unix connection: {}", x);
break;
}
}
// Skip responses that are not for our connection
Ok(_) => {}
Err(x) => {
error!(
"Conn {:?} failed to receive broadcast response: {}",
addr, x
);
break;
}
}
}
}
}
/// Spawns a remote server that listens for requests
///
/// Returns the session associated with the server
async fn spawn_remote_server(cmd: LaunchSubcommand, _opt: CommonOpt) -> Result<Session, Error> {
let distant_command = format!(
"{} listen --daemon --host {} {}",
cmd.distant,
cmd.bind_server,
cmd.extra_server_args.unwrap_or_default(),
);
let ssh_command = format!(
"{} -o StrictHostKeyChecking=no ssh://{}@{}:{} {} {}",
cmd.ssh,
cmd.username,
cmd.host.as_str(),
cmd.port,
cmd.identity_file
.map(|f| format!("-i {}", f.as_path().display()))
.unwrap_or_default(),
distant_command.trim(),
);
let out = Command::new("sh")
.arg("-c")
.arg(ssh_command)
.output()
.await?;
// If our attempt to run the program via ssh failed, report it
if !out.status.success() {
return Err(Error::from(io::Error::new(
io::ErrorKind::Other,
String::from_utf8(out.stderr)?.trim().to_string(),
)));
}
// Parse our output for the specific session line
// NOTE: The host provided on this line isn't valid, so we fill it in with our actual host
let out = String::from_utf8(out.stdout)?.trim().to_string();
let mut session = out
.lines()
.find_map(|line| line.parse::<Session>().ok())
.ok_or(Error::MissingSessionData)?;
session.host = cmd.host;
Ok(session)
}

@ -1,5 +1,5 @@
use super::{Process, State};
use crate::data::{
use crate::core::data::{
DirEntry, FileType, Metadata, Request, RequestPayload, Response, ResponsePayload,
RunningProcess,
};
@ -25,6 +25,7 @@ pub(super) async fn process(
tx: Reply,
) -> Result<(), mpsc::error::SendError<Response>> {
async fn inner(
tenant: String,
addr: SocketAddr,
state: HState,
payload: RequestPayload,
@ -43,24 +44,28 @@ pub(super) async fn process(
RequestPayload::Copy { src, dst } => copy(src, dst).await,
RequestPayload::Rename { src, dst } => rename(src, dst).await,
RequestPayload::Metadata { path } => metadata(path).await,
RequestPayload::ProcRun { cmd, args } => proc_run(addr, state, tx, cmd, args).await,
RequestPayload::ProcRun { cmd, args } => {
proc_run(tenant.to_string(), addr, state, tx, cmd, args).await
}
RequestPayload::ProcKill { id } => proc_kill(state, id).await,
RequestPayload::ProcStdin { id, data } => proc_stdin(state, id, data).await,
RequestPayload::ProcList {} => proc_list(state).await,
}
}
let res = Response::from_payload_with_origin(
match inner(addr, state, req.payload, tx.clone()).await {
let tenant = req.tenant.clone();
let res = Response::new(
req.tenant,
Some(req.id),
match inner(tenant, addr, state, req.payload, tx.clone()).await {
Ok(payload) => payload,
Err(x) => ResponsePayload::Error {
description: x.to_string(),
},
},
req.id,
);
trace!(
debug!(
"<Client @ {}> Sending response of type {}",
addr,
res.payload.as_ref()
@ -236,6 +241,7 @@ async fn metadata(path: PathBuf) -> Result<ResponsePayload, Box<dyn Error>> {
}
async fn proc_run(
tenant: String,
addr: SocketAddr,
state: HState,
tx: Reply,
@ -253,14 +259,19 @@ async fn proc_run(
// Spawn a task that sends stdout as a response
let tx_2 = tx.clone();
let tenant_2 = tenant.clone();
let mut stdout = child.stdout.take().unwrap();
let stdout_task = tokio::spawn(async move {
loop {
let mut data = Vec::new();
match stdout.read_to_end(&mut data).await {
Ok(n) if n > 0 => {
let res = Response::from(ResponsePayload::ProcStdout { id, data });
trace!(
let res = Response::new(
tenant_2.as_str(),
None,
ResponsePayload::ProcStdout { id, data },
);
debug!(
"<Client @ {}> Sending response of type {}",
addr,
res.payload.as_ref()
@ -277,14 +288,19 @@ async fn proc_run(
// Spawn a task that sends stderr as a response
let tx_2 = tx.clone();
let tenant_2 = tenant.clone();
let mut stderr = child.stderr.take().unwrap();
let stderr_task = tokio::spawn(async move {
loop {
let mut data = Vec::new();
match stderr.read_to_end(&mut data).await {
Ok(n) if n > 0 => {
let res = Response::from(ResponsePayload::ProcStderr { id, data });
trace!(
let res = Response::new(
tenant_2.as_str(),
None,
ResponsePayload::ProcStderr { id, data },
);
debug!(
"<Client @ {}> Sending response of type {}",
addr,
res.payload.as_ref()
@ -329,8 +345,12 @@ async fn proc_run(
Ok(status) => {
let success = status.success();
let code = status.code();
let res = Response::from(ResponsePayload::ProcDone { id, success, code });
trace!(
let res = Response::new(
tenant.as_str(),
None,
ResponsePayload::ProcDone { id, success, code }
);
debug!(
"<Client @ {}> Sending response of type {}",
addr,
res.payload.as_ref()
@ -340,10 +360,10 @@ async fn proc_run(
}
}
Err(x) => {
let res = Response::from(ResponsePayload::Error {
let res = Response::new(tenant.as_str(), None, ResponsePayload::Error {
description: x.to_string()
});
trace!(
debug!(
"<Client @ {}> Sending response of type {}",
addr,
res.payload.as_ref()
@ -369,10 +389,10 @@ async fn proc_run(
}
let res = Response::from(ResponsePayload::ProcDone {
let res = Response::new(tenant.as_str(), None, ResponsePayload::ProcDone {
id, success: false, code: None
});
trace!(
debug!(
"<Client @ {}> Sending response of type {}",
addr,
res.payload.as_ref()

@ -1,8 +1,10 @@
use crate::{
data::{Request, Response},
net::{Transport, TransportReadHalf, TransportWriteHalf},
opt::{CommonOpt, ConvertToIpAddrError, ListenSubcommand},
session::Session,
cli::opt::{CommonOpt, ConvertToIpAddrError, ListenSubcommand},
core::{
data::{Request, Response},
net::{Transport, TransportReadHalf, TransportWriteHalf},
session::Session,
},
};
use derive_more::{Display, Error, From};
use fork::{daemon, Fork};
@ -11,7 +13,7 @@ use orion::aead::SecretKey;
use std::{collections::HashMap, net::SocketAddr, sync::Arc};
use tokio::{
io,
net::TcpListener,
net::{tcp, TcpListener},
sync::{mpsc, oneshot, Mutex},
};
@ -126,21 +128,9 @@ async fn run_async(cmd: ListenSubcommand, _opt: CommonOpt, is_forked: bool) -> R
// Wait for a client connection, then spawn a new task to handle
// receiving data from the client
while let Ok((client, _)) = listener.accept().await {
// Grab the client's remote address for later logging purposes
let addr = match client.peer_addr() {
Ok(addr) => {
info!("<Client @ {}> Established connection", addr);
addr
}
Err(x) => {
error!("Unable to examine client's peer address: {}", x);
continue;
}
};
while let Ok((client, addr)) = listener.accept().await {
// Establish a proper connection via a handshake, discarding the connection otherwise
let transport = match Transport::from_handshake(client, Arc::clone(&key)).await {
let transport = match Transport::from_handshake(client, Some(Arc::clone(&key))).await {
Ok(transport) => transport,
Err(x) => {
error!("<Client @ {}> Failed handshake: {}", addr, x);
@ -176,13 +166,13 @@ async fn run_async(cmd: ListenSubcommand, _opt: CommonOpt, is_forked: bool) -> R
async fn request_loop(
addr: SocketAddr,
state: Arc<Mutex<State>>,
mut transport: TransportReadHalf,
mut transport: TransportReadHalf<tcp::OwnedReadHalf>,
tx: mpsc::Sender<Response>,
) {
loop {
match transport.receive::<Request>().await {
Ok(Some(req)) => {
trace!(
debug!(
"<Client @ {}> Received request of type {}",
addr,
req.payload.as_ref()
@ -208,7 +198,7 @@ async fn request_loop(
/// Repeatedly sends responses out over the wire
async fn response_loop(
addr: SocketAddr,
mut transport: TransportWriteHalf,
mut transport: TransportWriteHalf<tcp::OwnedWriteHalf>,
mut rx: mpsc::Receiver<Response>,
) {
while let Some(res) = rx.recv().await {

@ -1,4 +1,3 @@
pub mod action;
pub mod launch;
pub mod listen;
pub mod session;

@ -1,6 +0,0 @@
/// Capacity associated with a client broadcasting its received messages that
/// do not have a callback associated
pub static CLIENT_BROADCAST_CHANNEL_CAPACITY: usize = 100;
/// Represents the length of the salt to use for encryption
pub static SALT_LEN: usize = 16;

@ -0,0 +1,18 @@
use std::{env, path::PathBuf};
/// Capacity associated with a client broadcasting its received messages that
/// do not have a callback associated
pub static CLIENT_BROADCAST_CHANNEL_CAPACITY: usize = 100;
/// Represents the length of the salt to use for encryption
pub static SALT_LEN: usize = 16;
lazy_static::lazy_static! {
/// Represents the path to the global session file
pub static ref SESSION_FILE_PATH: PathBuf = env::temp_dir().join("distant.session.txt");
pub static ref SESSION_FILE_PATH_STR: String = SESSION_FILE_PATH.to_string_lossy().to_string();
/// Represents the path to a socket to communicate instead of a session file
pub static ref SESSION_SOCKET_PATH: PathBuf = env::temp_dir().join("distant.session.sock");
pub static ref SESSION_SOCKET_PATH_STR: String = SESSION_SOCKET_PATH.to_string_lossy().to_string();
}

@ -8,6 +8,9 @@ use strum::AsRefStr;
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct Request {
/// A name tied to the requester (tenant)
pub tenant: String,
/// A unique id associated with the request
pub id: usize,
@ -15,11 +18,15 @@ pub struct Request {
pub payload: RequestPayload,
}
impl From<RequestPayload> for Request {
/// Produces a new request with the given payload and a randomly-generated id
fn from(payload: RequestPayload) -> Self {
impl Request {
/// Creates a new request, generating a unique id for it
pub fn new(tenant: impl Into<String>, payload: RequestPayload) -> Self {
let id = rand::random();
Self { id, payload }
Self {
tenant: tenant.into(),
id,
payload,
}
}
}
@ -173,6 +180,9 @@ pub enum RequestPayload {
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct Response {
/// A name tied to the requester (tenant)
pub tenant: String,
/// A unique id associated with the response
pub id: usize,
@ -185,25 +195,17 @@ pub struct Response {
}
impl Response {
/// Produces a new response with the given payload and origin id while supplying
/// randomly-generated id
pub fn from_payload_with_origin(payload: ResponsePayload, origin_id: usize) -> Self {
let id = rand::random();
Self {
id,
origin_id: Some(origin_id),
payload,
}
}
}
impl From<ResponsePayload> for Response {
/// Produces a new response with the given payload, no origin id, and a randomly-generated id
fn from(payload: ResponsePayload) -> Self {
/// Creates a new response, generating a unique id for it
pub fn new(
tenant: impl Into<String>,
origin_id: Option<usize>,
payload: ResponsePayload,
) -> Self {
let id = rand::random();
Self {
tenant: tenant.into(),
id,
origin_id: None,
origin_id,
payload,
}
}

@ -0,0 +1,5 @@
pub mod constants;
pub mod data;
pub mod net;
pub mod session;
pub mod utils;

@ -1,7 +1,7 @@
mod transport;
pub use transport::{Transport, TransportError, TransportReadHalf, TransportWriteHalf};
pub use transport::{DataStream, Transport, TransportError, TransportReadHalf, TransportWriteHalf};
use crate::{
use crate::core::{
constants::CLIENT_BROADCAST_CHANNEL_CAPACITY,
data::{Request, Response},
session::Session,
@ -13,6 +13,7 @@ use std::{
};
use tokio::{
io,
net::TcpStream,
sync::{broadcast, oneshot},
};
use tokio_stream::wrappers::BroadcastStream;
@ -20,9 +21,12 @@ use tokio_stream::wrappers::BroadcastStream;
type Callbacks = Arc<Mutex<HashMap<usize, oneshot::Sender<Response>>>>;
/// Represents a client that can make requests against a server
pub struct Client {
pub struct Client<T>
where
T: DataStream,
{
/// Underlying transport used by client
t_write: TransportWriteHalf,
t_write: TransportWriteHalf<T::Write>,
/// Collection of callbacks to be invoked upon receiving a response to a request
callbacks: Callbacks,
@ -36,10 +40,10 @@ pub struct Client {
init_broadcast_receiver: Option<broadcast::Receiver<Response>>,
}
impl Client {
/// Establishes a connection using the provided session
pub async fn connect(session: Session) -> io::Result<Self> {
let transport = Transport::connect(session).await?;
impl Client<TcpStream> {
/// Connect to a remote TCP session
pub async fn tcp_connect(session: Session) -> io::Result<Self> {
let transport = Transport::<TcpStream>::connect(session).await?;
debug!(
"Client has connected to {}",
transport
@ -47,7 +51,35 @@ impl Client {
.map(|x| x.to_string())
.unwrap_or_else(|_| String::from("???"))
);
Self::inner_connect(transport).await
}
}
#[cfg(unix)]
impl Client<tokio::net::UnixStream> {
/// Connect to a proxy unix socket
pub async fn unix_connect(
path: impl AsRef<std::path::Path>,
auth_key: Option<Arc<orion::aead::SecretKey>>,
) -> io::Result<Self> {
let transport = Transport::<tokio::net::UnixStream>::connect(path, auth_key).await?;
debug!(
"Client has connected to {}",
transport
.peer_addr()
.map(|x| format!("{:?}", x))
.unwrap_or_else(|_| String::from("???"))
);
Self::inner_connect(transport).await
}
}
impl<T> Client<T>
where
T: DataStream,
{
/// Establishes a connection using the provided session
async fn inner_connect(transport: Transport<T>) -> io::Result<Self> {
let (mut t_read, t_write) = transport.into_split();
let callbacks: Callbacks = Arc::new(Mutex::new(HashMap::new()));
let (broadcast, init_broadcast_receiver) =
@ -112,8 +144,21 @@ impl Client {
.map_err(|x| TransportError::from(io::Error::new(io::ErrorKind::ConnectionAborted, x)))
}
/// Creates and returns a new stream of responses that are received with no originating request
pub fn to_response_stream(&mut self) -> BroadcastStream<Response> {
/// Sends a request without waiting for a response
///
/// Any response that would be received gets sent over the broadcast channel instead
pub async fn fire(&mut self, req: Request) -> Result<(), TransportError> {
self.t_write.send(req).await
}
/// Clones a new instance of the broadcaster used by the client
pub fn to_response_broadcaster(&self) -> broadcast::Sender<Response> {
self.broadcast.clone()
}
/// Creates and returns a new stream of responses that are received that do not match the
/// response to a `send` request
pub fn to_response_broadcast_stream(&mut self) -> BroadcastStream<Response> {
BroadcastStream::new(
self.init_broadcast_receiver
.take()

@ -1,8 +1,9 @@
use crate::{constants::SALT_LEN, session::Session};
use crate::core::{constants::SALT_LEN, session::Session};
use codec::DistantCodec;
use derive_more::{Display, Error, From};
use futures::SinkExt;
use k256::{ecdh::EphemeralSecret, EncodedPoint, PublicKey};
use log::*;
use orion::{
aead::{self, SecretKey},
auth::{self, Tag},
@ -11,10 +12,10 @@ use orion::{
pwhash::Password,
};
use serde::{de::DeserializeOwned, Serialize};
use std::{net::SocketAddr, sync::Arc};
use std::{marker::Unpin, net::SocketAddr, sync::Arc};
use tokio::{
io,
net::{tcp, TcpStream},
io::{self, AsyncRead, AsyncWrite},
net::{self, tcp, TcpStream},
};
use tokio_stream::StreamExt;
use tokio_util::codec::{Framed, FramedRead, FramedWrite};
@ -24,34 +25,87 @@ mod codec;
#[derive(Debug, Display, Error, From)]
pub enum TransportError {
#[from(ignore)]
#[display(fmt = "Authentication Error: {}", _0)]
AuthError(UnknownCryptoError),
#[from(ignore)]
#[display(fmt = "Encryption Error: {}", _0)]
EncryptError(UnknownCryptoError),
IoError(io::Error),
SerializeError(serde_cbor::Error),
}
/// Interface representing a two-way data stream
///
/// Enables splitting into separate, functioning halves that can read and write respectively
pub trait DataStream: AsyncRead + AsyncWrite + Unpin {
type Read: AsyncRead + Send + Unpin + 'static;
type Write: AsyncWrite + Send + Unpin + 'static;
/// Returns a textual description of the connection associated with this stream
fn to_connection_tag(&self) -> String;
/// Splits this stream into read and write halves
fn into_split(self) -> (Self::Read, Self::Write);
}
impl DataStream for TcpStream {
type Read = tcp::OwnedReadHalf;
type Write = tcp::OwnedWriteHalf;
fn to_connection_tag(&self) -> String {
self.peer_addr()
.map(|addr| format!("{}", addr))
.unwrap_or_else(|_| String::from("--"))
}
fn into_split(self) -> (Self::Read, Self::Write) {
TcpStream::into_split(self)
}
}
#[cfg(unix)]
impl DataStream for net::UnixStream {
type Read = net::unix::OwnedReadHalf;
type Write = net::unix::OwnedWriteHalf;
fn to_connection_tag(&self) -> String {
self.peer_addr()
.map(|addr| format!("{:?}", addr))
.unwrap_or_else(|_| String::from("--"))
}
fn into_split(self) -> (Self::Read, Self::Write) {
net::UnixStream::into_split(self)
}
}
/// Represents a transport of data across the network
pub struct Transport {
pub struct Transport<T>
where
T: DataStream,
{
/// Underlying connection to some remote system
conn: Framed<TcpStream, DistantCodec>,
conn: Framed<T, DistantCodec>,
/// Used to sign and validate messages
auth_key: Arc<SecretKey>,
auth_key: Option<Arc<SecretKey>>,
/// Used to encrypt and decrypt messages
crypt_key: Arc<SecretKey>,
}
impl Transport {
impl<T> Transport<T>
where
T: DataStream,
{
/// Takes a pre-existing connection and performs a handshake to build out the encryption key
/// with the remote system, returning a transport ready to communicate with the other side
pub async fn from_handshake(stream: TcpStream, auth_key: Arc<SecretKey>) -> io::Result<Self> {
let addr_str = stream
.peer_addr()
.map(|x| x.to_string())
.unwrap_or_else(|_| String::from("???"));
log::trace!("Beginning handshake @ {}", addr_str);
///
/// An optional authentication key can be provided that will be used alongside encryption
/// when communicating across the wire
pub async fn from_handshake(stream: T, auth_key: Option<Arc<SecretKey>>) -> io::Result<Self> {
let connection_tag = stream.to_connection_tag();
trace!("Beginning handshake for {}", connection_tag);
// First, wrap the raw stream in our framed codec
let mut conn = Framed::new(stream, DistantCodec);
@ -112,7 +166,7 @@ impl Transport {
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))?;
let crypt_key = Arc::new(derived_key);
log::trace!("Completed handshake @ {}", addr_str);
trace!("Finished handshake for {}", connection_tag);
Ok(Self {
conn,
@ -120,22 +174,14 @@ impl Transport {
crypt_key,
})
}
}
/// Establishes a connection using the provided session and performs a handshake to establish
/// means of encryption, returning a transport ready to communicate with the other side
pub async fn connect(session: Session) -> io::Result<Self> {
let stream = TcpStream::connect(session.to_socket_addr().await?).await?;
Self::from_handshake(stream, Arc::new(session.auth_key)).await
}
/// Returns the address of the peer the transport is connected to
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.conn.get_ref().peer_addr()
}
impl<T> Transport<T>
where
T: AsyncRead + AsyncWrite + DataStream + Unpin,
{
/// Splits transport into read and write halves
pub fn into_split(self) -> (TransportReadHalf, TransportWriteHalf) {
let auth_key = self.auth_key;
pub fn into_split(self) -> (TransportReadHalf<T::Read>, TransportWriteHalf<T::Write>) {
let crypt_key = self.crypt_key;
let parts = self.conn.into_parts();
let (read_half, write_half) = parts.io.into_split();
@ -150,12 +196,12 @@ impl Transport {
let t_read = TransportReadHalf {
conn: f_read,
auth_key: Arc::clone(&auth_key),
auth_key: self.auth_key.as_ref().map(Arc::clone),
crypt_key: Arc::clone(&crypt_key),
};
let t_write = TransportWriteHalf {
conn: f_write,
auth_key,
auth_key: self.auth_key.as_ref().map(Arc::clone),
crypt_key,
};
@ -163,55 +209,122 @@ impl Transport {
}
}
impl Transport<TcpStream> {
/// Establishes a connection using the provided session and performs a handshake to establish
/// means of encryption, returning a transport ready to communicate with the other side
///
/// TCP Streams will always use a session's authentication key
pub async fn connect(session: Session) -> io::Result<Self> {
let stream = TcpStream::connect(session.to_socket_addr().await?).await?;
Self::from_handshake(stream, Some(Arc::new(session.auth_key))).await
}
/// Returns the address of the peer the transport is connected to
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.conn.get_ref().peer_addr()
}
}
#[cfg(unix)]
impl Transport<net::UnixStream> {
/// Establishes a connection using the provided session and performs a handshake to establish
/// means of encryption, returning a transport ready to communicate with the other side
///
/// Takes an optional authentication key
pub async fn connect(
path: impl AsRef<std::path::Path>,
auth_key: Option<Arc<SecretKey>>,
) -> io::Result<Self> {
let stream = net::UnixStream::connect(path.as_ref()).await?;
Self::from_handshake(stream, auth_key).await
}
/// Returns the address of the peer the transport is connected to
pub fn peer_addr(&self) -> io::Result<net::unix::SocketAddr> {
self.conn.get_ref().peer_addr()
}
}
/// Represents a transport of data out to the network
pub struct TransportWriteHalf {
pub struct TransportWriteHalf<T>
where
T: AsyncWrite + Unpin,
{
/// Underlying connection to some remote system
conn: FramedWrite<tcp::OwnedWriteHalf, DistantCodec>,
conn: FramedWrite<T, DistantCodec>,
/// Used to sign and validate messages
auth_key: Arc<SecretKey>,
/// Used to sign and validate messages; if none then no sign/validation occurs
auth_key: Option<Arc<SecretKey>>,
/// Used to encrypt and decrypt messages
crypt_key: Arc<SecretKey>,
}
impl TransportWriteHalf {
impl<T> TransportWriteHalf<T>
where
T: AsyncWrite + Unpin,
{
/// Sends some data across the wire, waiting for it to completely send
pub async fn send<T: Serialize>(&mut self, data: T) -> Result<(), TransportError> {
pub async fn send<D: Serialize>(&mut self, data: D) -> Result<(), TransportError> {
// Serialize, encrypt, and then sign
// NOTE: Cannot used packed implementation for now due to issues with deserialization
trace!("Serializing data");
let data = serde_cbor::to_vec(&data)?;
let data = aead::seal(&self.crypt_key, &data).map_err(TransportError::EncryptError)?;
let tag = auth::authenticate(&self.auth_key, &data).map_err(TransportError::AuthError)?;
// Send {TAG LEN}{TAG}{ENCRYPTED DATA}
trace!("Encrypting data of len {}", data.len());
let data = aead::seal(&self.crypt_key, &data).map_err(TransportError::EncryptError)?;
let tag = self
.auth_key
.as_ref()
.map(|key| auth::authenticate(key, &data))
.transpose()
.map_err(TransportError::AuthError)?;
// Send {TAG LEN}{TAG}{ENCRYPTED DATA} if we have an auth key,
// otherwise just send the encrypted data on its own
let mut out: Vec<u8> = Vec::new();
out.push(tag.unprotected_as_bytes().len() as u8);
out.extend_from_slice(tag.unprotected_as_bytes());
if let Some(tag) = tag {
trace!("Signing data of len {}", data.len());
let tag_len = tag.unprotected_as_bytes().len() as u8;
trace!("Tag len {}", tag_len);
out.push(tag_len);
out.extend_from_slice(tag.unprotected_as_bytes());
}
out.extend(data);
trace!("Sending out data of len {}", out.len());
self.conn.send(&out).await.map_err(TransportError::from)
}
}
/// Represents a transport of data in from the network
pub struct TransportReadHalf {
pub struct TransportReadHalf<T>
where
T: AsyncRead + Unpin,
{
/// Underlying connection to some remote system
conn: FramedRead<tcp::OwnedReadHalf, DistantCodec>,
conn: FramedRead<T, DistantCodec>,
/// Used to sign and validate messages
auth_key: Arc<SecretKey>,
auth_key: Option<Arc<SecretKey>>,
/// Used to encrypt and decrypt messages
crypt_key: Arc<SecretKey>,
}
impl TransportReadHalf {
impl<T> TransportReadHalf<T>
where
T: AsyncRead + Unpin,
{
/// Receives some data from out on the wire, waiting until it's available,
/// returning none if the transport is now closed
pub async fn receive<T: DeserializeOwned>(&mut self) -> Result<Option<T>, TransportError> {
pub async fn receive<R: DeserializeOwned>(&mut self) -> Result<Option<R>, TransportError> {
// If data is received, we process like usual
if let Some(data) = self.conn.next().await {
let mut data = data?;
trace!("Received data of len {}", data.len());
if data.is_empty() {
return Err(TransportError::from(io::Error::new(
io::ErrorKind::InvalidData,
@ -220,15 +333,36 @@ impl TransportReadHalf {
}
// Retrieve in form {TAG LEN}{TAG}{ENCRYPTED DATA}
let tag_len = data[0];
let tag =
Tag::from_slice(&data[1..=tag_len as usize]).map_err(TransportError::AuthError)?;
let data = data.split_off(tag_len as usize + 1);
// Validate signature, decrypt, and then deserialize
auth::authenticate_verify(&tag, &self.auth_key, &data)
.map_err(TransportError::AuthError)?;
// with the tag len and tag being optional
if let Some(auth_key) = self.auth_key.as_ref() {
trace!("Verifying signature on data of len {}", data.len());
// Parse the tag from the length, protecting against bad lengths
let tag_len = data[0];
if data.len() <= tag_len as usize {
return Err(TransportError::from(io::Error::new(
io::ErrorKind::InvalidData,
format!("Tag len {} > Data len {}", tag_len, data.len()),
)));
}
trace!("Tag len {}", tag_len);
let tag = Tag::from_slice(&data[1..=tag_len as usize])
.map_err(TransportError::AuthError)?;
// Update data with the content after the tag by mutating
// the current data to point to the return from split_off
data = data.split_off(tag_len as usize + 1);
// Validate signature, decrypt, and then deserialize
auth::authenticate_verify(&tag, auth_key, &data)
.map_err(TransportError::AuthError)?;
}
trace!("Decrypting data of len {}", data.len());
let data = aead::open(&self.crypt_key, &data).map_err(TransportError::EncryptError)?;
trace!("Deserializing decrypted data of len {}", data.len());
let data = serde_cbor::from_slice(&data)?;
Ok(Some(data))

@ -1,11 +1,10 @@
use crate::{PROJECT_DIRS, SESSION_PATH};
use derive_more::{Display, Error};
use orion::aead::SecretKey;
use std::{
env,
net::{IpAddr, SocketAddr},
ops::Deref,
path::Path,
path::{Path, PathBuf},
str::FromStr,
};
use tokio::{io, net::lookup_host};
@ -160,11 +159,20 @@ impl Session {
}
/// Provides operations related to working with a session that is disk-based
pub struct SessionFile(Session);
pub struct SessionFile {
path: PathBuf,
session: Session,
}
impl AsRef<Path> for SessionFile {
fn as_ref(&self) -> &Path {
self.as_path()
}
}
impl AsRef<Session> for SessionFile {
fn as_ref(&self) -> &Session {
&self.0
self.as_session()
}
}
@ -172,56 +180,60 @@ impl Deref for SessionFile {
type Target = Session;
fn deref(&self) -> &Self::Target {
&self.0
&self.session
}
}
impl From<SessionFile> for Session {
fn from(sf: SessionFile) -> Self {
sf.0
sf.session
}
}
impl From<Session> for SessionFile {
fn from(session: Session) -> Self {
Self(session)
impl SessionFile {
/// Creates a new inmemory pointer to a session and its file
pub fn new(path: impl Into<PathBuf>, session: Session) -> Self {
Self {
path: path.into(),
session,
}
}
}
impl SessionFile {
/// Clears the global session file
pub async fn clear() -> io::Result<()> {
tokio::fs::remove_file(SESSION_PATH.as_path()).await
/// Returns a reference to the path to the session file
pub fn as_path(&self) -> &Path {
self.path.as_path()
}
/// Returns true if the global session file exists
pub fn exists() -> bool {
SESSION_PATH.exists()
/// Returns a reference to the session
pub fn as_session(&self) -> &Session {
&self.session
}
/// Saves a session to the global session file
/// Saves a session by overwriting its current
pub async fn save(&self) -> io::Result<()> {
// Ensure our cache directory exists
let cache_dir = PROJECT_DIRS.cache_dir();
tokio::fs::create_dir_all(cache_dir).await?;
self.save_to(SESSION_PATH.as_path()).await
self.save_to(self.as_path(), true).await
}
/// Saves a session to to a file at the specified path
pub async fn save_to(&self, path: impl AsRef<Path>) -> io::Result<()> {
tokio::fs::write(path.as_ref(), self.0.to_unprotected_string()).await
}
///
/// If all is true, will create all directories leading up to file's location
pub async fn save_to(&self, path: impl AsRef<Path>, all: bool) -> io::Result<()> {
if all {
if let Some(dir) = path.as_ref().parent() {
tokio::fs::create_dir_all(dir).await?;
}
}
/// Loads a session from the global session file
pub async fn load() -> io::Result<Self> {
Self::load_from(SESSION_PATH.as_path()).await
tokio::fs::write(path.as_ref(), self.session.to_unprotected_string()).await
}
/// Loads a session from a file at the specified path
pub async fn load_from(path: impl AsRef<Path>) -> io::Result<Self> {
let text = tokio::fs::read_to_string(path.as_ref()).await?;
Ok(Self(text.parse()?))
Ok(Self {
path: path.as_ref().to_path_buf(),
session: text.parse()?,
})
}
}

@ -0,0 +1,4 @@
// Generates a new tenant name
pub fn new_tenant() -> String {
format!("tenant_{}{}", rand::random::<u16>(), rand::random::<u8>())
}

@ -1,27 +1,11 @@
mod constants;
mod data;
mod net;
mod opt;
mod session;
mod subcommand;
mod cli;
mod core;
use log::error;
pub use opt::Opt;
use std::path::PathBuf;
lazy_static::lazy_static! {
static ref PROJECT_DIRS: directories::ProjectDirs =
directories::ProjectDirs::from(
"org",
"senkbeil",
"distant",
).expect("Failed to find valid home directory path");
static ref SESSION_PATH: PathBuf = PROJECT_DIRS.cache_dir().join("session");
}
/// Main entrypoint into the program
pub fn run() {
let opt = Opt::load();
let opt = cli::opt::Opt::load();
init_logging(&opt.common);
if let Err(x) = opt.subcommand.run(opt.common) {
error!("{}", x);
@ -29,7 +13,7 @@ pub fn run() {
}
}
pub fn init_logging(opt: &opt::CommonOpt) {
fn init_logging(opt: &cli::opt::CommonOpt) {
use flexi_logger::{FileSpec, LevelFilter, LogSpecification, Logger};
let module = "distant";

@ -1,85 +0,0 @@
use crate::{
net::Client,
opt::{CommonOpt, LaunchSubcommand, Mode, SessionOutput},
session::{Session, SessionFile},
};
use derive_more::{Display, Error, From};
use hex::FromHexError;
use orion::errors::UnknownCryptoError;
use std::string::FromUtf8Error;
use tokio::{io, process::Command};
#[derive(Debug, Display, Error, From)]
pub enum Error {
#[display(fmt = "Missing data for session")]
MissingSessionData,
BadKey(UnknownCryptoError),
HexError(FromHexError),
IoError(io::Error),
Utf8Error(FromUtf8Error),
}
pub fn run(cmd: LaunchSubcommand, opt: CommonOpt) -> Result<(), Error> {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async { run_async(cmd, opt).await })
}
async fn run_async(cmd: LaunchSubcommand, _opt: CommonOpt) -> Result<(), Error> {
let remote_command = format!(
"{} listen --daemon --host {} {}",
cmd.remote_program,
cmd.bind_server,
cmd.extra_server_args.unwrap_or_default(),
);
let ssh_command = format!(
"{} -o StrictHostKeyChecking=no ssh://{}@{}:{} {} {}",
cmd.ssh_program,
cmd.username,
cmd.host.as_str(),
cmd.port,
cmd.identity_file
.map(|f| format!("-i {}", f.as_path().display()))
.unwrap_or_default(),
remote_command.trim(),
);
let out = Command::new("sh")
.arg("-c")
.arg(ssh_command)
.output()
.await?;
// If our attempt to run the program via ssh failed, report it
if !out.status.success() {
return Err(Error::from(io::Error::new(
io::ErrorKind::Other,
String::from_utf8(out.stderr)?.trim().to_string(),
)));
}
// Parse our output for the specific session line
// NOTE: The host provided on this line isn't valid, so we fill it in with our actual host
let out = String::from_utf8(out.stdout)?.trim().to_string();
let mut session = out
.lines()
.find_map(|line| line.parse::<Session>().ok())
.ok_or(Error::MissingSessionData)?;
session.host = cmd.host;
// Handle sharing resulting session in different ways
match cmd.session {
SessionOutput::File => SessionFile::from(session).save().await?,
SessionOutput::Keep => {
use crate::subcommand::action::inner;
let client = Client::connect(session).await?;
let config = match cmd.mode {
Mode::Json => inner::LoopConfig::Json,
Mode::Shell => inner::LoopConfig::Shell,
};
inner::interactive_loop(client, config).await?;
}
SessionOutput::Pipe => println!("{}", session.to_unprotected_string()),
}
Ok(())
}

@ -1,46 +0,0 @@
use crate::{
opt::{CommonOpt, Mode, SessionSubcommand},
session::SessionFile,
};
use tokio::io;
pub fn run(cmd: SessionSubcommand, _opt: CommonOpt) -> Result<(), io::Error> {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async {
match cmd {
SessionSubcommand::Clear => SessionFile::clear().await,
SessionSubcommand::Exists => {
if SessionFile::exists() {
Ok(())
} else {
Err(io::Error::new(
io::ErrorKind::NotFound,
"No session available",
))
}
}
SessionSubcommand::Info { mode } => {
let session = SessionFile::load()
.await
.map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x))?;
match mode {
Mode::Json => {
println!(
"{}",
serde_json::to_string(&serde_json::json!({
"host": session.host,
"port": session.port,
}))
.unwrap()
);
}
Mode::Shell => {
println!("Host: {}", session.host);
println!("Port: {}", session.port);
}
}
Ok(())
}
}
})
}
Loading…
Cancel
Save