Rename mode -> format; bump to 0.13.0

pull/38/head
Chip Senkbeil 3 years ago committed by Chip Senkbeil
parent 33abcbb5fb
commit 2b23cd379c
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

2
Cargo.lock generated

@ -179,7 +179,7 @@ dependencies = [
[[package]] [[package]]
name = "distant" name = "distant"
version = "0.12.0" version = "0.13.0"
dependencies = [ dependencies = [
"bytes", "bytes",
"derive_more", "derive_more",

@ -2,7 +2,7 @@
name = "distant" name = "distant"
description = "Operate on a remote computer through file and process manipulation" description = "Operate on a remote computer through file and process manipulation"
categories = ["command-line-utilities"] categories = ["command-line-utilities"]
version = "0.12.0" version = "0.13.0"
authors = ["Chip Senkbeil <chip@senkbeil.org>"] authors = ["Chip Senkbeil <chip@senkbeil.org>"]
edition = "2018" edition = "2018"
homepage = "https://github.com/chipsenkbeil/distant" homepage = "https://github.com/chipsenkbeil/distant"

@ -1,7 +1,10 @@
use crate::{ use crate::{
cli::subcommand, cli::subcommand,
core::{ core::{
constants::{SESSION_FILE_PATH_STR, SESSION_SOCKET_PATH_STR, TIMEOUT_STR}, constants::{
SERVER_CONN_MSG_CAPACITY_STR, SESSION_FILE_PATH_STR, SESSION_SOCKET_PATH_STR,
TIMEOUT_STR,
},
data::RequestData, data::RequestData,
}, },
ExitCodeError, ExitCodeError,
@ -107,7 +110,7 @@ impl Subcommand {
} }
} }
/// Represents the communication medium used for the send command /// Represents the format for data communicated to & from the client
#[derive( #[derive(
Copy, Copy,
Clone, Clone,
@ -121,7 +124,7 @@ impl Subcommand {
EnumVariantNames, EnumVariantNames,
)] )]
#[strum(serialize_all = "snake_case")] #[strum(serialize_all = "snake_case")]
pub enum Mode { pub enum Format {
/// Sends and receives data in JSON format /// Sends and receives data in JSON format
Json, Json,
@ -145,10 +148,10 @@ pub struct ActionSubcommand {
short, short,
long, long,
case_insensitive = true, case_insensitive = true,
default_value = Mode::Shell.into(), default_value = Format::Shell.into(),
possible_values = Mode::VARIANTS possible_values = Format::VARIANTS
)] )]
pub mode: Mode, pub format: Format,
/// Represents the medium for retrieving a session for use in performing the action /// Represents the medium for retrieving a session for use in performing the action
#[structopt( #[structopt(
@ -371,10 +374,10 @@ pub struct LaunchSubcommand {
short, short,
long, long,
case_insensitive = true, case_insensitive = true,
default_value = Mode::Shell.into(), default_value = Format::Shell.into(),
possible_values = Mode::VARIANTS possible_values = Format::VARIANTS
)] )]
pub mode: Mode, pub format: Format,
/// Path to distant program on remote machine to execute via ssh; /// Path to distant program on remote machine to execute via ssh;
/// by default, this program needs to be available within PATH as /// by default, this program needs to be available within PATH as
@ -524,7 +527,7 @@ pub struct ListenSubcommand {
pub use_ipv6: bool, pub use_ipv6: bool,
/// Maximum capacity for concurrent message handled by the server /// Maximum capacity for concurrent message handled by the server
#[structopt(long, default_value = "1000")] #[structopt(long, default_value = &SERVER_CONN_MSG_CAPACITY_STR)]
pub max_msg_capacity: u16, pub max_msg_capacity: u16,
/// The time in seconds before shutting down the server if there are no active /// The time in seconds before shutting down the server if there are no active

@ -1,5 +1,5 @@
use crate::{ use crate::{
cli::opt::Mode, cli::opt::Format,
core::{ core::{
constants::MAX_PIPE_CHUNK_SIZE, constants::MAX_PIPE_CHUNK_SIZE,
data::{Error, Request, RequestData, Response, ResponseData}, data::{Error, Request, RequestData, Response, ResponseData},
@ -27,7 +27,7 @@ pub enum LoopConfig {
Shell, Shell,
} }
impl From<LoopConfig> for Mode { impl From<LoopConfig> for Format {
fn from(config: LoopConfig) -> Self { fn from(config: LoopConfig) -> Self {
match config { match config {
LoopConfig::Json => Self::Json, LoopConfig::Json => Self::Json,
@ -59,7 +59,7 @@ where
while let Some(data) = rx.recv().await { while let Some(data) = rx.recv().await {
match config { match config {
// Special exit condition for interactive mode // Special exit condition for interactive format
_ if buf.trim() == "exit" => { _ if buf.trim() == "exit" => {
if let Err(_) = tx_stop.send(()) { if let Err(_) = tx_stop.send(()) {
error!("Failed to close interactive loop!"); error!("Failed to close interactive loop!");
@ -67,7 +67,7 @@ where
break; break;
} }
// For json mode, all stdin is treated as individual requests // For json format, all stdin is treated as individual requests
LoopConfig::Json => { LoopConfig::Json => {
buf.push_str(&data); buf.push_str(&data);
let (lines, new_buf) = buf.into_full_lines(); let (lines, new_buf) = buf.into_full_lines();
@ -81,7 +81,7 @@ where
.map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x)); .map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x));
match result { match result {
Ok(req) => match client.send(req).await { Ok(req) => match client.send(req).await {
Ok(res) => match format_response(Mode::Json, res) { Ok(res) => match format_response(Format::Json, res) {
Ok(out) => out.print(), Ok(out) => out.print(),
Err(x) => error!("Failed to format response: {}", x), Err(x) => error!("Failed to format response: {}", x),
}, },
@ -97,7 +97,7 @@ where
} }
} }
// For interactive shell mode, parse stdin as individual commands // For interactive shell format, parse stdin as individual commands
LoopConfig::Shell => { LoopConfig::Shell => {
buf.push_str(&data); buf.push_str(&data);
let (lines, new_buf) = buf.into_full_lines(); let (lines, new_buf) = buf.into_full_lines();
@ -124,7 +124,7 @@ where
.send(Request::new(tenant.as_str(), vec![data])) .send(Request::new(tenant.as_str(), vec![data]))
.await .await
{ {
Ok(res) => match format_response(Mode::Shell, res) { Ok(res) => match format_response(Format::Shell, res) {
Ok(out) => out.print(), Ok(out) => out.print(),
Err(x) => error!("Failed to format response: {}", x), Err(x) => error!("Failed to format response: {}", x),
}, },
@ -141,7 +141,7 @@ where
} }
} }
// For non-interactive shell mode, all stdin is treated as a proc's stdin // For non-interactive shell format, all stdin is treated as a proc's stdin
LoopConfig::Proc { id } => { LoopConfig::Proc { id } => {
debug!("Client sending stdin: {:?}", data); debug!("Client sending stdin: {:?}", data);
let req = let req =
@ -158,12 +158,7 @@ where
while let Err(TryRecvError::Empty) = rx_stop.try_recv() { while let Err(TryRecvError::Empty) = rx_stop.try_recv() {
if let Some(res) = stream.next().await { if let Some(res) = stream.next().await {
let res = res.map_err(|_| { let res = res.map_err(|x| io::Error::new(io::ErrorKind::BrokenPipe, x))?;
io::Error::new(
io::ErrorKind::BrokenPipe,
"Response stream no longer available",
)
})?;
// NOTE: If the loop is for a proxy process, we should assume that the payload // NOTE: If the loop is for a proxy process, we should assume that the payload
// is all-or-nothing for the done check // is all-or-nothing for the done check
@ -262,18 +257,18 @@ impl ResponseOut {
} }
} }
pub fn format_response(mode: Mode, res: Response) -> io::Result<ResponseOut> { pub fn format_response(format: Format, res: Response) -> io::Result<ResponseOut> {
let payload_cnt = res.payload.len(); let payload_cnt = res.payload.len();
Ok(match mode { Ok(match format {
Mode::Json => ResponseOut::StdoutLine(format!( Format::Json => ResponseOut::StdoutLine(format!(
"{}", "{}",
serde_json::to_string(&res) serde_json::to_string(&res)
.map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x))? .map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x))?
)), )),
// NOTE: For shell, we assume a singular entry in the response's payload // NOTE: For shell, we assume a singular entry in the response's payload
Mode::Shell if payload_cnt != 1 => { Format::Shell if payload_cnt != 1 => {
return Err(io::Error::new( return Err(io::Error::new(
io::ErrorKind::InvalidData, io::ErrorKind::InvalidData,
format!( format!(
@ -282,7 +277,7 @@ pub fn format_response(mode: Mode, res: Response) -> io::Result<ResponseOut> {
), ),
)) ))
} }
Mode::Shell => format_shell(res.payload.into_iter().next().unwrap()), Format::Shell => format_shell(res.payload.into_iter().next().unwrap()),
}) })
} }

@ -1,5 +1,5 @@
use crate::{ use crate::{
cli::opt::{ActionSubcommand, CommonOpt, Mode, SessionInput}, cli::opt::{ActionSubcommand, CommonOpt, Format, SessionInput},
core::{ core::{
data::{Request, RequestData, ResponseData}, data::{Request, RequestData, ResponseData},
lsp::LspData, lsp::LspData,
@ -136,7 +136,7 @@ where
proc_id = *id; proc_id = *id;
} }
inner::format_response(cmd.mode, res)?.print(); inner::format_response(cmd.format, res)?.print();
// If we also parsed an LSP's initialize request for its session, we want to forward // If we also parsed an LSP's initialize request for its session, we want to forward
// it along in the case of a process call // it along in the case of a process call
@ -164,10 +164,10 @@ where
// //
// If we are interactive, we want to continue looping regardless // If we are interactive, we want to continue looping regardless
if is_proc_req || cmd.interactive { if is_proc_req || cmd.interactive {
let config = match cmd.mode { let config = match cmd.format {
Mode::Json => inner::LoopConfig::Json, Format::Json => inner::LoopConfig::Json,
Mode::Shell if cmd.interactive => inner::LoopConfig::Shell, Format::Shell if cmd.interactive => inner::LoopConfig::Shell,
Mode::Shell => inner::LoopConfig::Proc { id: proc_id }, Format::Shell => inner::LoopConfig::Proc { id: proc_id },
}; };
inner::interactive_loop(client, tenant, config).await?; inner::interactive_loop(client, tenant, config).await?;
} }

@ -1,5 +1,5 @@
use crate::{ use crate::{
cli::opt::{CommonOpt, LaunchSubcommand, Mode, SessionOutput}, cli::opt::{CommonOpt, Format, LaunchSubcommand, SessionOutput},
core::{ core::{
constants::CLIENT_BROADCAST_CHANNEL_CAPACITY, constants::CLIENT_BROADCAST_CHANNEL_CAPACITY,
data::{Request, RequestData, Response, ResponseData}, data::{Request, RequestData, Response, ResponseData},
@ -51,7 +51,7 @@ struct ConnState {
pub fn run(cmd: LaunchSubcommand, opt: CommonOpt) -> Result<(), Error> { pub fn run(cmd: LaunchSubcommand, opt: CommonOpt) -> Result<(), Error> {
let rt = Runtime::new()?; let rt = Runtime::new()?;
let session_output = cmd.session; let session_output = cmd.session;
let mode = cmd.mode; let format = cmd.format;
let is_daemon = cmd.daemon; let is_daemon = cmd.daemon;
let session_file = cmd.session_data.session_file.clone(); let session_file = cmd.session_data.session_file.clone();
@ -70,7 +70,7 @@ pub fn run(cmd: LaunchSubcommand, opt: CommonOpt) -> Result<(), Error> {
} }
SessionOutput::Keep => { SessionOutput::Keep => {
debug!("Entering interactive loop over stdin"); debug!("Entering interactive loop over stdin");
rt.block_on(async { keep_loop(session, mode, timeout).await })? rt.block_on(async { keep_loop(session, format, timeout).await })?
} }
SessionOutput::Pipe => { SessionOutput::Pipe => {
debug!("Piping session to stdout"); debug!("Piping session to stdout");
@ -137,13 +137,13 @@ pub fn run(cmd: LaunchSubcommand, opt: CommonOpt) -> Result<(), Error> {
Ok(()) Ok(())
} }
async fn keep_loop(session: Session, mode: Mode, duration: Duration) -> io::Result<()> { async fn keep_loop(session: Session, format: Format, duration: Duration) -> io::Result<()> {
use crate::cli::subcommand::action::inner; use crate::cli::subcommand::action::inner;
match Client::tcp_connect_timeout(session, duration).await { match Client::tcp_connect_timeout(session, duration).await {
Ok(client) => { Ok(client) => {
let config = match mode { let config = match format {
Mode::Json => inner::LoopConfig::Json, Format::Json => inner::LoopConfig::Json,
Mode::Shell => inner::LoopConfig::Shell, Format::Shell => inner::LoopConfig::Shell,
}; };
inner::interactive_loop(client, utils::new_tenant(), config).await inner::interactive_loop(client, utils::new_tenant(), config).await
} }

@ -1,5 +1,5 @@
use crate::core::{ use crate::core::{
constants::MAX_PIPE_CHUNK_SIZE, constants::{MAX_PIPE_CHUNK_SIZE, READ_PAUSE_MILLIS},
data::{ data::{
self, DirEntry, FileType, Request, RequestData, Response, ResponseData, RunningProcess, self, DirEntry, FileType, Request, RequestData, Response, ResponseData, RunningProcess,
}, },
@ -397,6 +397,11 @@ async fn proc_run(
if let Err(_) = tx_2.send(res).await { if let Err(_) = tx_2.send(res).await {
break; break;
} }
// Pause to allow buffer to fill up a little bit, avoiding
// spamming with a lot of smaller responses
tokio::time::sleep(tokio::time::Duration::from_millis(READ_PAUSE_MILLIS))
.await;
} }
Err(x) => { Err(x) => {
error!("Invalid data read from stdout pipe: {}", x); error!("Invalid data read from stdout pipe: {}", x);
@ -433,6 +438,11 @@ async fn proc_run(
if let Err(_) = tx_2.send(res).await { if let Err(_) = tx_2.send(res).await {
break; break;
} }
// Pause to allow buffer to fill up a little bit, avoiding
// spamming with a lot of smaller responses
tokio::time::sleep(tokio::time::Duration::from_millis(READ_PAUSE_MILLIS))
.await;
} }
Err(x) => { Err(x) => {
error!("Invalid data read from stdout pipe: {}", x); error!("Invalid data read from stdout pipe: {}", x);

@ -6,13 +6,21 @@ pub const TIMEOUT: usize = 15000;
/// Capacity associated with a client broadcasting its received messages that /// Capacity associated with a client broadcasting its received messages that
/// do not have a callback associated /// do not have a callback associated
pub const CLIENT_BROADCAST_CHANNEL_CAPACITY: usize = 100; pub const CLIENT_BROADCAST_CHANNEL_CAPACITY: usize = 10000;
/// Capacity associated with a server receiving messages from a connection
/// with a client
pub const SERVER_CONN_MSG_CAPACITY: usize = 10000;
/// Represents the maximum size (in bytes) that data will be read from pipes /// Represents the maximum size (in bytes) that data will be read from pipes
/// per individual `read` call /// per individual `read` call
/// ///
/// Current setting is 1k size /// Current setting is 16k size
pub const MAX_PIPE_CHUNK_SIZE: usize = 1024; pub const MAX_PIPE_CHUNK_SIZE: usize = 16384;
/// Duration in milliseconds to sleep between reading stdout/stderr chunks
/// to avoid sending many small messages to clients
pub const READ_PAUSE_MILLIS: u64 = 50;
/// Represents the length of the salt to use for encryption /// Represents the length of the salt to use for encryption
pub const SALT_LEN: usize = 16; pub const SALT_LEN: usize = 16;
@ -26,6 +34,7 @@ pub mod test {
lazy_static::lazy_static! { lazy_static::lazy_static! {
pub static ref TIMEOUT_STR: String = TIMEOUT.to_string(); pub static ref TIMEOUT_STR: String = TIMEOUT.to_string();
pub static ref SERVER_CONN_MSG_CAPACITY_STR: String = SERVER_CONN_MSG_CAPACITY.to_string();
/// Represents the path to the global session file /// Represents the path to the global session file
pub static ref SESSION_FILE_PATH: PathBuf = env::temp_dir().join("distant.session"); pub static ref SESSION_FILE_PATH: PathBuf = env::temp_dir().join("distant.session");

Loading…
Cancel
Save