Fix CPU pegging of server (unnecessary transport loop), add extra server args option, and adjust default port settings to be TCP friendly

pull/38/head
Chip Senkbeil 3 years ago
parent e4bdde8aae
commit f6fa3e606e
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -47,7 +47,6 @@ impl Transport {
pub async fn send<T: Serialize>(&mut self, data: T) -> Result<(), TransportError> {
// Serialize, encrypt, and then (TODO) sign
// NOTE: Cannot used packed implementation for now due to issues with deserialization
// let data = serde_cbor::ser::to_vec_packed(&data)?;
let data = serde_cbor::to_vec(&data)?;
let data = aead::seal(&self.key, &data)?;
@ -57,24 +56,18 @@ impl Transport {
.map_err(TransportError::CodecError)
}
/// Receives some data from out on the wire, waiting until it's available
pub async fn receive<T: DeserializeOwned>(&mut self) -> Result<T, TransportError> {
loop {
if let Some(data) = self.try_receive().await? {
break Ok(data);
}
}
}
/// Attempts to receive some data from out on the wire, returning that data if available
/// or none if unavailable
pub async fn try_receive<T: DeserializeOwned>(&mut self) -> Result<Option<T>, TransportError> {
/// Receives some data from out on the wire, waiting until it's available,
/// returning none if the transport is now closed
pub async fn receive<T: DeserializeOwned>(&mut self) -> Result<Option<T>, TransportError> {
// If data is received, we process like usual
if let Some(data) = self.inner.next().await {
// Validate (TODO), decrypt, and then deserialize
// Validate (TODO) signature, decrypt, and then deserialize
let data = data?;
let data = aead::open(&self.key, &data)?;
let data = serde_cbor::from_slice(&data)?;
Ok(Some(data))
// Otherwise, if no data is received, this means that our socket has closed
} else {
Ok(None)
}

@ -187,7 +187,7 @@ pub struct LaunchSubcommand {
#[structopt(short, long, default_value = "ssh")]
pub ssh_program: String,
/// Control the IP address that the mosh-server binds to.
/// Control the IP address that the server binds to.
///
/// The default is `ssh', in which case the server will reply from the IP address that the SSH
/// connection came from (as found in the SSH_CONNECTION environment variable). This is
@ -201,14 +201,9 @@ pub struct LaunchSubcommand {
#[structopt(long, value_name = "ssh|any|IP", default_value = "ssh")]
pub bind_server: BindAddress,
/// If specified, will write server logs to a file instead of discarding them
#[structopt(long)]
pub server_log_file: Option<PathBuf>,
/// If specified, will set the server's log level (0 is warning and above, 1 is info, 2 is
/// debug, and 3 or higher is trace)
#[structopt(long, default_value = "0")]
pub server_log_level: u8,
/// Additional arguments to provide to the server
#[structopt(long, allow_hyphen_values(true))]
pub extra_server_args: Option<String>,
/// If specified, will bind server to the ipv6 interface if host is "any" instead of ipv4
#[structopt(short = "6", long)]
@ -333,7 +328,7 @@ pub struct ListenSubcommand {
short,
long,
value_name = "PORT[:PORT2]",
default_value = "60000:61000"
default_value = "8080:8099"
)]
pub port: PortRange,
}

@ -4,7 +4,6 @@ use crate::{
};
use derive_more::{Display, Error, From};
use hex::FromHexError;
use log::*;
use orion::{aead::SecretKey, errors::UnknownCryptoError};
use std::string::FromUtf8Error;
use tokio::{io, process::Command};
@ -27,18 +26,10 @@ pub fn run(cmd: LaunchSubcommand, opt: CommonOpt) -> Result<(), Error> {
async fn run_async(cmd: LaunchSubcommand, _opt: CommonOpt) -> Result<(), Error> {
let remote_command = format!(
"{} listen --daemon --host {} {} {} {}",
"{} listen --daemon --host {} {}",
cmd.remote_program,
cmd.bind_server,
if cmd.use_ipv6 { "-6" } else { "" },
cmd.server_log_file
.as_ref()
.map(|path| format!("--log-file {:?}", path))
.unwrap_or_default(),
match cmd.server_log_level {
0 => String::new(),
n => format!("-{}", "v".repeat(n as usize)),
},
cmd.extra_server_args.unwrap_or_default(),
);
let ssh_command = format!(
"{} -o StrictHostKeyChecking=no ssh://{}@{}:{} {} {}",
@ -98,7 +89,7 @@ async fn run_async(cmd: LaunchSubcommand, _opt: CommonOpt) -> Result<(), Error>
session.save().await?;
if cmd.print_startup_data {
info!("DISTANT DATA {} {}", port, session.to_hex_key());
println!("DISTANT DATA {} {}", port, session.to_hex_key());
}
Ok(())

@ -68,16 +68,14 @@ async fn run_async(cmd: ListenSubcommand, _opt: CommonOpt, is_forked: bool) -> R
}
}
// Begin our listen loop
loop {
// Wait for a client connection
let (client, _) = listener.accept().await?;
// Wait for a client connection, then spawn a new task to handle
// receiving data from the client
while let Ok((client, _)) = listener.accept().await {
// Grab the client's remote address for later logging purposes
let addr_string = match client.peer_addr() {
Ok(addr) => {
let addr_string = addr.to_string();
info!("New client from {}", addr_string);
info!("<Client @ {}> Established connection", addr_string);
addr_string
}
Err(x) => {
@ -93,9 +91,9 @@ async fn run_async(cmd: ListenSubcommand, _opt: CommonOpt, is_forked: bool) -> R
tokio::spawn(async move {
loop {
match transport.receive::<Operation>().await {
Ok(request) => {
Ok(Some(request)) => {
trace!(
"[{}] Received request of type {}",
"<Client @ {}> Received request of type {}",
addr_string.as_str(),
request.as_ref()
);
@ -105,12 +103,16 @@ async fn run_async(cmd: ListenSubcommand, _opt: CommonOpt, is_forked: bool) -> R
};
if let Err(x) = transport.send(response).await {
error!("{}", x);
error!("<Client @ {}> {}", addr_string.as_str(), x);
break;
}
}
Ok(None) => {
info!("<Client @ {}> Closed connection", addr_string.as_str());
break;
}
Err(x) => {
error!("{}", x);
error!("<Client @ {}> {}", addr_string.as_str(), x);
break;
}
}
@ -118,7 +120,6 @@ async fn run_async(cmd: ListenSubcommand, _opt: CommonOpt, is_forked: bool) -> R
});
}
#[allow(unreachable_code)]
Ok(())
}

Loading…
Cancel
Save