@ -1,5 +1,5 @@
use crate ::{
client ::{ Mailbox , Session , Session Channel} ,
client ::{ Mailbox , Session Channel} ,
constants ::CLIENT_MAILBOX_CAPACITY ,
data ::{ Request , RequestData , ResponseData } ,
net ::TransportError ,
@ -61,7 +61,7 @@ impl RemoteProcess {
/// Spawns the specified process on the remote machine using the given session
pub async fn spawn (
tenant : impl Into < String > ,
session : & mut Session ,
mut channel : SessionChannel ,
cmd : impl Into < String > ,
args : Vec < String > ,
) -> Result < Self , RemoteProcessError > {
@ -69,7 +69,7 @@ impl RemoteProcess {
let cmd = cmd . into ( ) ;
// Submit our run request and get back a mailbox for responses
let mut mailbox = session
let mut mailbox = channel
. mail ( Request ::new (
tenant . as_str ( ) ,
vec! [ RequestData ::ProcRun { cmd , args } ] ,
@ -121,7 +121,6 @@ impl RemoteProcess {
} ) ;
// Spawn a task that takes stdin from our channel and forwards it to the remote process
let channel = session . clone_channel ( ) ;
let req_task = tokio ::spawn ( async move {
process_outgoing_requests ( tenant , id , channel , stdin_rx , kill_rx ) . await
} ) ;
@ -302,6 +301,7 @@ async fn process_incoming_responses(
mod tests {
use super ::* ;
use crate ::{
client ::Session ,
data ::{ Error , ErrorKind , Response } ,
net ::{ InmemoryStream , PlainCodec , Transport } ,
} ;
@ -313,14 +313,14 @@ mod tests {
#[ tokio::test ]
async fn spawn_should_return_invalid_data_if_payload_size_unexpected ( ) {
let ( mut transport , mut session ) = make_session ( ) ;
let ( mut transport , session ) = make_session ( ) ;
// Create a task for process spawning as we need to handle the request and a response
// in a separate async block
let spawn_task = tokio ::spawn ( async move {
RemoteProcess ::spawn (
String ::from ( "test-tenant" ) ,
& mut session ,
session . clone_channel ( ) ,
String ::from ( "cmd" ) ,
vec! [ String ::from ( "arg" ) ] ,
)
@ -351,14 +351,14 @@ mod tests {
#[ tokio::test ]
async fn spawn_should_return_invalid_data_if_did_not_get_a_indicator_that_process_started ( ) {
let ( mut transport , mut session ) = make_session ( ) ;
let ( mut transport , session ) = make_session ( ) ;
// Create a task for process spawning as we need to handle the request and a response
// in a separate async block
let spawn_task = tokio ::spawn ( async move {
RemoteProcess ::spawn (
String ::from ( "test-tenant" ) ,
& mut session ,
session . clone_channel ( ) ,
String ::from ( "cmd" ) ,
vec! [ String ::from ( "arg" ) ] ,
)
@ -396,14 +396,14 @@ mod tests {
#[ tokio::test ]
async fn kill_should_return_error_if_internal_tasks_already_completed ( ) {
let ( mut transport , mut session ) = make_session ( ) ;
let ( mut transport , session ) = make_session ( ) ;
// Create a task for process spawning as we need to handle the request and a response
// in a separate async block
let spawn_task = tokio ::spawn ( async move {
RemoteProcess ::spawn (
String ::from ( "test-tenant" ) ,
& mut session ,
session . clone_channel ( ) ,
String ::from ( "cmd" ) ,
vec! [ String ::from ( "arg" ) ] ,
)
@ -441,14 +441,14 @@ mod tests {
#[ tokio::test ]
async fn kill_should_send_proc_kill_request_and_then_cause_stdin_forwarding_to_close ( ) {
let ( mut transport , mut session ) = make_session ( ) ;
let ( mut transport , session ) = make_session ( ) ;
// Create a task for process spawning as we need to handle the request and a response
// in a separate async block
let spawn_task = tokio ::spawn ( async move {
RemoteProcess ::spawn (
String ::from ( "test-tenant" ) ,
& mut session ,
session . clone_channel ( ) ,
String ::from ( "cmd" ) ,
vec! [ String ::from ( "arg" ) ] ,
)
@ -497,14 +497,14 @@ mod tests {
#[ tokio::test ]
async fn stdin_should_be_forwarded_from_receiver_field ( ) {
let ( mut transport , mut session ) = make_session ( ) ;
let ( mut transport , session ) = make_session ( ) ;
// Create a task for process spawning as we need to handle the request and a response
// in a separate async block
let spawn_task = tokio ::spawn ( async move {
RemoteProcess ::spawn (
String ::from ( "test-tenant" ) ,
& mut session ,
session . clone_channel ( ) ,
String ::from ( "cmd" ) ,
vec! [ String ::from ( "arg" ) ] ,
)
@ -552,14 +552,14 @@ mod tests {
#[ tokio::test ]
async fn stdout_should_be_forwarded_to_receiver_field ( ) {
let ( mut transport , mut session ) = make_session ( ) ;
let ( mut transport , session ) = make_session ( ) ;
// Create a task for process spawning as we need to handle the request and a response
// in a separate async block
let spawn_task = tokio ::spawn ( async move {
RemoteProcess ::spawn (
String ::from ( "test-tenant" ) ,
& mut session ,
session . clone_channel ( ) ,
String ::from ( "cmd" ) ,
vec! [ String ::from ( "arg" ) ] ,
)
@ -601,14 +601,14 @@ mod tests {
#[ tokio::test ]
async fn stderr_should_be_forwarded_to_receiver_field ( ) {
let ( mut transport , mut session ) = make_session ( ) ;
let ( mut transport , session ) = make_session ( ) ;
// Create a task for process spawning as we need to handle the request and a response
// in a separate async block
let spawn_task = tokio ::spawn ( async move {
RemoteProcess ::spawn (
String ::from ( "test-tenant" ) ,
& mut session ,
session . clone_channel ( ) ,
String ::from ( "cmd" ) ,
vec! [ String ::from ( "arg" ) ] ,
)
@ -650,14 +650,14 @@ mod tests {
#[ tokio::test ]
async fn wait_should_return_error_if_internal_tasks_fail ( ) {
let ( mut transport , mut session ) = make_session ( ) ;
let ( mut transport , session ) = make_session ( ) ;
// Create a task for process spawning as we need to handle the request and a response
// in a separate async block
let spawn_task = tokio ::spawn ( async move {
RemoteProcess ::spawn (
String ::from ( "test-tenant" ) ,
& mut session ,
session . clone_channel ( ) ,
String ::from ( "cmd" ) ,
vec! [ String ::from ( "arg" ) ] ,
)
@ -692,14 +692,14 @@ mod tests {
#[ tokio::test ]
async fn wait_should_return_error_if_connection_terminates_before_receiving_done_response ( ) {
let ( mut transport , mut session ) = make_session ( ) ;
let ( mut transport , session ) = make_session ( ) ;
// Create a task for process spawning as we need to handle the request and a response
// in a separate async block
let spawn_task = tokio ::spawn ( async move {
RemoteProcess ::spawn (
String ::from ( "test-tenant" ) ,
& mut session ,
session . clone_channel ( ) ,
String ::from ( "cmd" ) ,
vec! [ String ::from ( "arg" ) ] ,
)
@ -741,14 +741,14 @@ mod tests {
#[ tokio::test ]
async fn receiving_done_response_should_result_in_wait_returning_exit_information ( ) {
let ( mut transport , mut session ) = make_session ( ) ;
let ( mut transport , session ) = make_session ( ) ;
// Create a task for process spawning as we need to handle the request and a response
// in a separate async block
let spawn_task = tokio ::spawn ( async move {
RemoteProcess ::spawn (
String ::from ( "test-tenant" ) ,
& mut session ,
session . clone_channel ( ) ,
String ::from ( "cmd" ) ,
vec! [ String ::from ( "arg" ) ] ,
)