diff --git a/CHANGELOG.md b/CHANGELOG.md index 1fdda8e..0eb5ca9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Rename `GenericServerRef` to `ServerRef` and remove `ServerRef` trait, refactoring `TcpServerRef`, `UnixSocketServerRef`, and `WindowsPipeServerRef` to use the struct instead of `Box` +- Update `Reply` trait and associated implementations to be non-blocking & + synchronous as opposed to asynchronous to avoid deadlocks and also be more + performant ## [0.20.0-alpha.8] diff --git a/distant-core/src/api.rs b/distant-core/src/api.rs index 4c85a58..62042fd 100644 --- a/distant-core/src/api.rs +++ b/distant-core/src/api.rs @@ -495,12 +495,12 @@ where // Queue up our result to go before ANY of the other messages that might be sent. // This is important to avoid situations such as when a process is started, but before // the confirmation can be sent some stdout or stderr is captured and sent first. - if let Err(x) = reply.send_before(response).await { + if let Err(x) = reply.send_before(response) { error!("[Conn {}] Failed to send response: {}", connection_id, x); } // Flush out all of our replies thus far and toggle to no longer hold submissions - if let Err(x) = reply.flush(false).await { + if let Err(x) = reply.flush(false) { error!( "[Conn {}] Failed to flush response queue: {}", connection_id, x diff --git a/distant-core/src/api/reply.rs b/distant-core/src/api/reply.rs index 0c69822..78212a0 100644 --- a/distant-core/src/api/reply.rs +++ b/distant-core/src/api/reply.rs @@ -1,6 +1,4 @@ -use std::future::Future; use std::io; -use std::pin::Pin; use distant_net::server::Reply; @@ -19,14 +17,10 @@ impl From>>> for DistantS impl Reply for DistantSingleReply { type Data = protocol::Response; - fn send(&self, data: Self::Data) -> Pin> + Send + '_>> { + fn send(&self, data: Self::Data) -> io::Result<()> { self.0.send(protocol::Msg::Single(data)) } - fn blocking_send(&self, data: Self::Data) -> io::Result<()> { - self.0.blocking_send(protocol::Msg::Single(data)) - } - fn clone_reply(&self) -> Box> { Box::new(Self(self.0.clone_reply())) } diff --git a/distant-local/src/api.rs b/distant-local/src/api.rs index f61adb5..fd589bc 100644 --- a/distant-local/src/api.rs +++ b/distant-local/src/api.rs @@ -719,7 +719,7 @@ mod tests { const DEBOUNCE_TIMEOUT: Duration = Duration::from_millis(100); - async fn setup(buffer: usize) -> (Api, DistantCtx, mpsc::Receiver) { + async fn setup() -> (Api, DistantCtx, mpsc::UnboundedReceiver) { let api = Api::initialize(Config { watch: WatchConfig { debounce_timeout: DEBOUNCE_TIMEOUT, @@ -727,7 +727,7 @@ mod tests { }, }) .unwrap(); - let (reply, rx) = make_reply(buffer); + let (reply, rx) = make_reply(); let connection_id = rand::random(); DistantApi::on_connect(&api, connection_id).await.unwrap(); @@ -738,14 +738,17 @@ mod tests { (api, ctx, rx) } - fn make_reply(buffer: usize) -> (Box>, mpsc::Receiver) { - let (tx, rx) = mpsc::channel(buffer); + fn make_reply() -> ( + Box>, + mpsc::UnboundedReceiver, + ) { + let (tx, rx) = mpsc::unbounded_channel(); (Box::new(tx), rx) } #[test(tokio::test)] async fn read_file_should_fail_if_file_missing() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let path = temp.child("missing-file").path().to_path_buf(); @@ -754,7 +757,7 @@ mod tests { #[test(tokio::test)] async fn read_file_should_send_blob_with_file_contents() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let file = temp.child("test-file"); @@ -766,7 +769,7 @@ mod tests { #[test(tokio::test)] async fn read_file_text_should_send_error_if_fails_to_read_file() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let path = temp.child("missing-file").path().to_path_buf(); @@ -776,7 +779,7 @@ mod tests { #[test(tokio::test)] async fn read_file_text_should_send_text_with_file_contents() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let file = temp.child("test-file"); @@ -791,7 +794,7 @@ mod tests { #[test(tokio::test)] async fn write_file_should_send_error_if_fails_to_write_file() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; // Create a temporary path and add to it to ensure that there are // extra components that don't exist to cause writing to fail @@ -809,7 +812,7 @@ mod tests { #[test(tokio::test)] async fn write_file_should_send_ok_when_successful() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; // Path should point to a file that does not exist, but all // other components leading up to it do @@ -827,7 +830,7 @@ mod tests { #[test(tokio::test)] async fn write_file_text_should_send_error_if_fails_to_write_file() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; // Create a temporary path and add to it to ensure that there are // extra components that don't exist to cause writing to fail @@ -844,7 +847,7 @@ mod tests { #[test(tokio::test)] async fn write_file_text_should_send_ok_when_successful() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; // Path should point to a file that does not exist, but all // other components leading up to it do @@ -862,7 +865,7 @@ mod tests { #[test(tokio::test)] async fn append_file_should_send_error_if_fails_to_create_file() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; // Create a temporary path and add to it to ensure that there are // extra components that don't exist to cause writing to fail @@ -883,7 +886,7 @@ mod tests { #[test(tokio::test)] async fn append_file_should_create_file_if_missing() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; // Don't create the file directly, but define path // where the file should be @@ -907,7 +910,7 @@ mod tests { #[test(tokio::test)] async fn append_file_should_send_ok_when_successful() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; // Create a temporary file and fill it with some contents let temp = assert_fs::TempDir::new().unwrap(); @@ -931,7 +934,7 @@ mod tests { #[test(tokio::test)] async fn append_file_text_should_send_error_if_fails_to_create_file() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; // Create a temporary path and add to it to ensure that there are // extra components that don't exist to cause writing to fail @@ -953,7 +956,7 @@ mod tests { #[test(tokio::test)] async fn append_file_text_should_create_file_if_missing() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; // Don't create the file directly, but define path // where the file should be @@ -977,7 +980,7 @@ mod tests { #[test(tokio::test)] async fn append_file_text_should_send_ok_when_successful() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; // Create a temporary file and fill it with some contents let temp = assert_fs::TempDir::new().unwrap(); @@ -1001,7 +1004,7 @@ mod tests { #[test(tokio::test)] async fn dir_read_should_send_error_if_directory_does_not_exist() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let dir = temp.child("test-dir"); @@ -1042,7 +1045,7 @@ mod tests { #[test(tokio::test)] async fn dir_read_should_support_depth_limits() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; // Create directory with some nested items let root_dir = setup_dir().await; @@ -1076,7 +1079,7 @@ mod tests { #[test(tokio::test)] async fn dir_read_should_support_unlimited_depth_using_zero() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; // Create directory with some nested items let root_dir = setup_dir().await; @@ -1114,7 +1117,7 @@ mod tests { #[test(tokio::test)] async fn dir_read_should_support_including_directory_in_returned_entries() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; // Create directory with some nested items let root_dir = setup_dir().await; @@ -1153,7 +1156,7 @@ mod tests { #[test(tokio::test)] async fn dir_read_should_support_returning_absolute_paths() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; // Create directory with some nested items let root_dir = setup_dir().await; @@ -1188,7 +1191,7 @@ mod tests { #[test(tokio::test)] async fn dir_read_should_support_returning_canonicalized_paths() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; // Create directory with some nested items let root_dir = setup_dir().await; @@ -1223,7 +1226,7 @@ mod tests { #[test(tokio::test)] async fn create_dir_should_send_error_if_fails() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; // Make a path that has multiple non-existent components // so the creation will fail @@ -1241,7 +1244,7 @@ mod tests { #[test(tokio::test)] async fn create_dir_should_send_ok_when_successful() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let root_dir = setup_dir().await; let path = root_dir.path().join("new-dir"); @@ -1255,7 +1258,7 @@ mod tests { #[test(tokio::test)] async fn create_dir_should_support_creating_multiple_dir_components() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let root_dir = setup_dir().await; let path = root_dir.path().join("nested").join("new-dir"); @@ -1269,7 +1272,7 @@ mod tests { #[test(tokio::test)] async fn remove_should_send_error_on_failure() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let file = temp.child("missing-file"); @@ -1284,7 +1287,7 @@ mod tests { #[test(tokio::test)] async fn remove_should_support_deleting_a_directory() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let dir = temp.child("dir"); dir.create_dir_all().unwrap(); @@ -1299,7 +1302,7 @@ mod tests { #[test(tokio::test)] async fn remove_should_delete_nonempty_directory_if_force_is_true() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let dir = temp.child("dir"); dir.create_dir_all().unwrap(); @@ -1315,7 +1318,7 @@ mod tests { #[test(tokio::test)] async fn remove_should_support_deleting_a_single_file() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let file = temp.child("some-file"); file.touch().unwrap(); @@ -1330,7 +1333,7 @@ mod tests { #[test(tokio::test)] async fn copy_should_send_error_on_failure() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let src = temp.child("src"); let dst = temp.child("dst"); @@ -1346,7 +1349,7 @@ mod tests { #[test(tokio::test)] async fn copy_should_support_copying_an_entire_directory() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let src = temp.child("src"); @@ -1370,7 +1373,7 @@ mod tests { #[test(tokio::test)] async fn copy_should_support_copying_an_empty_directory() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let src = temp.child("src"); src.create_dir_all().unwrap(); @@ -1387,7 +1390,7 @@ mod tests { #[test(tokio::test)] async fn copy_should_support_copying_a_directory_that_only_contains_directories() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let src = temp.child("src"); @@ -1411,7 +1414,7 @@ mod tests { #[test(tokio::test)] async fn copy_should_support_copying_a_single_file() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let src = temp.child("src"); src.write_str("some text").unwrap(); @@ -1428,7 +1431,7 @@ mod tests { #[test(tokio::test)] async fn rename_should_fail_if_path_missing() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let src = temp.child("src"); let dst = temp.child("dst"); @@ -1444,7 +1447,7 @@ mod tests { #[test(tokio::test)] async fn rename_should_support_renaming_an_entire_directory() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let src = temp.child("src"); @@ -1468,7 +1471,7 @@ mod tests { #[test(tokio::test)] async fn rename_should_support_renaming_a_single_file() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let src = temp.child("src"); src.write_str("some text").unwrap(); @@ -1504,7 +1507,7 @@ mod tests { #[test(tokio::test)] async fn watch_should_support_watching_a_single_file() { // NOTE: Supporting multiple replies being sent back as part of creating, modifying, etc. - let (api, ctx, mut rx) = setup(100).await; + let (api, ctx, mut rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let file = temp.child("file"); @@ -1537,7 +1540,7 @@ mod tests { #[test(tokio::test)] async fn watch_should_support_watching_a_directory_recursively() { // NOTE: Supporting multiple replies being sent back as part of creating, modifying, etc. - let (api, ctx, mut rx) = setup(100).await; + let (api, ctx, mut rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let file = temp.child("file"); @@ -1614,9 +1617,9 @@ mod tests { #[test(tokio::test)] async fn watch_should_report_changes_using_the_ctx_replies() { // NOTE: Supporting multiple replies being sent back as part of creating, modifying, etc. - let (api, ctx_1, mut rx_1) = setup(100).await; + let (api, ctx_1, mut rx_1) = setup().await; let (ctx_2, mut rx_2) = { - let (reply, rx) = make_reply(100); + let (reply, rx) = make_reply(); let ctx = DistantCtx { connection_id: ctx_1.connection_id, reply, @@ -1685,7 +1688,7 @@ mod tests { #[test(tokio::test)] async fn exists_should_send_true_if_path_exists() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let file = temp.child("file"); file.touch().unwrap(); @@ -1696,7 +1699,7 @@ mod tests { #[test(tokio::test)] async fn exists_should_send_false_if_path_does_not_exist() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let file = temp.child("file"); @@ -1706,7 +1709,7 @@ mod tests { #[test(tokio::test)] async fn metadata_should_send_error_on_failure() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let file = temp.child("file"); @@ -1723,7 +1726,7 @@ mod tests { #[test(tokio::test)] async fn metadata_should_send_back_metadata_on_file_if_exists() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let file = temp.child("file"); file.write_str("some text").unwrap(); @@ -1757,7 +1760,7 @@ mod tests { #[cfg(unix)] #[test(tokio::test)] async fn metadata_should_include_unix_specific_metadata_on_unix_platform() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let file = temp.child("file"); file.write_str("some text").unwrap(); @@ -1787,7 +1790,7 @@ mod tests { #[cfg(windows)] #[test(tokio::test)] async fn metadata_should_include_windows_specific_metadata_on_windows_platform() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let file = temp.child("file"); file.write_str("some text").unwrap(); @@ -1816,7 +1819,7 @@ mod tests { #[test(tokio::test)] async fn metadata_should_send_back_metadata_on_dir_if_exists() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let dir = temp.child("dir"); dir.create_dir_all().unwrap(); @@ -1848,7 +1851,7 @@ mod tests { #[test(tokio::test)] async fn metadata_should_send_back_metadata_on_symlink_if_exists() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let file = temp.child("file"); file.write_str("some text").unwrap(); @@ -1883,7 +1886,7 @@ mod tests { #[test(tokio::test)] async fn metadata_should_include_canonicalized_path_if_flag_specified() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let file = temp.child("file"); file.write_str("some text").unwrap(); @@ -1918,7 +1921,7 @@ mod tests { #[test(tokio::test)] async fn metadata_should_resolve_file_type_of_symlink_if_flag_specified() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let file = temp.child("file"); file.write_str("some text").unwrap(); @@ -1951,7 +1954,7 @@ mod tests { #[test(tokio::test)] async fn set_permissions_should_set_readonly_flag_if_specified() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let file = temp.child("file"); file.write_str("some text").unwrap(); @@ -1988,7 +1991,7 @@ mod tests { { use std::os::unix::prelude::*; - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let file = temp.child("file"); file.write_str("some text").unwrap(); @@ -2031,7 +2034,7 @@ mod tests { #[test(tokio::test)] #[cfg_attr(unix, ignore)] async fn set_permissions_should_set_readonly_flag_if_not_on_unix_platform() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let file = temp.child("file"); file.write_str("some text").unwrap(); @@ -2071,7 +2074,7 @@ mod tests { #[test(tokio::test)] async fn set_permissions_should_not_recurse_if_option_false() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let file = temp.child("file"); file.write_str("some text").unwrap(); @@ -2145,7 +2148,7 @@ mod tests { #[test(tokio::test)] async fn set_permissions_should_traverse_symlinks_while_recursing_if_following_symlinks_enabled( ) { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let file = temp.child("file"); file.write_str("some text").unwrap(); @@ -2189,7 +2192,7 @@ mod tests { #[test(tokio::test)] async fn set_permissions_should_not_traverse_symlinks_while_recursing_if_following_symlinks_disabled( ) { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let file = temp.child("file"); file.write_str("some text").unwrap(); @@ -2235,7 +2238,7 @@ mod tests { #[test(tokio::test)] async fn set_permissions_should_skip_symlinks_if_exclude_symlinks_enabled() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let file = temp.child("file"); file.write_str("some text").unwrap(); @@ -2279,7 +2282,7 @@ mod tests { #[test(tokio::test)] async fn set_permissions_should_support_recursive_if_option_specified() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let file = temp.child("file"); file.write_str("some text").unwrap(); @@ -2330,7 +2333,7 @@ mod tests { #[test(tokio::test)] async fn set_permissions_should_support_following_explicit_symlink_if_option_specified() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let temp = assert_fs::TempDir::new().unwrap(); let file = temp.child("file"); file.write_str("some text").unwrap(); @@ -2390,7 +2393,7 @@ mod tests { #[test(tokio::test)] #[cfg_attr(windows, ignore)] async fn proc_spawn_should_send_error_on_failure() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let _ = api .proc_spawn( @@ -2409,7 +2412,7 @@ mod tests { #[test(tokio::test)] #[cfg_attr(windows, ignore)] async fn proc_spawn_should_return_id_of_spawned_process() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let id = api .proc_spawn( @@ -2434,7 +2437,7 @@ mod tests { #[test(tokio::test)] #[cfg_attr(windows, ignore)] async fn proc_spawn_should_send_back_stdout_periodically_when_available() { - let (api, ctx, mut rx) = setup(1).await; + let (api, ctx, mut rx) = setup().await; let proc_id = api .proc_spawn( @@ -2498,7 +2501,7 @@ mod tests { #[test(tokio::test)] #[cfg_attr(windows, ignore)] async fn proc_spawn_should_send_back_stderr_periodically_when_available() { - let (api, ctx, mut rx) = setup(1).await; + let (api, ctx, mut rx) = setup().await; let proc_id = api .proc_spawn( @@ -2562,7 +2565,7 @@ mod tests { #[test(tokio::test)] #[cfg_attr(windows, ignore)] async fn proc_spawn_should_send_done_signal_when_completed() { - let (api, ctx, mut rx) = setup(1).await; + let (api, ctx, mut rx) = setup().await; let proc_id = api .proc_spawn( @@ -2592,9 +2595,9 @@ mod tests { #[test(tokio::test)] #[cfg_attr(windows, ignore)] async fn proc_spawn_should_clear_process_from_state_when_killed() { - let (api, ctx_1, mut rx) = setup(1).await; + let (api, ctx_1, mut rx) = setup().await; let (ctx_2, _rx) = { - let (reply, rx) = make_reply(1); + let (reply, rx) = make_reply(); let ctx = DistantCtx { connection_id: ctx_1.connection_id, reply, @@ -2630,7 +2633,7 @@ mod tests { #[test(tokio::test)] async fn proc_kill_should_fail_if_given_non_existent_process() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; // Send kill to a non-existent process let _ = api.proc_kill(ctx, 0xDEADBEEF).await.unwrap_err(); @@ -2638,7 +2641,7 @@ mod tests { #[test(tokio::test)] async fn proc_stdin_should_fail_if_given_non_existent_process() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; // Send stdin to a non-existent process let _ = api @@ -2652,9 +2655,9 @@ mod tests { #[test(tokio::test)] #[cfg_attr(windows, ignore)] async fn proc_stdin_should_send_stdin_to_process() { - let (api, ctx_1, mut rx) = setup(1).await; + let (api, ctx_1, mut rx) = setup().await; let (ctx_2, _rx) = { - let (reply, rx) = make_reply(1); + let (reply, rx) = make_reply(); let ctx = DistantCtx { connection_id: ctx_1.connection_id, reply, @@ -2695,7 +2698,7 @@ mod tests { #[test(tokio::test)] async fn system_info_should_return_system_info_based_on_binary() { - let (api, ctx, _rx) = setup(1).await; + let (api, ctx, _rx) = setup().await; let system_info = api.system_info(ctx).await.unwrap(); assert_eq!( diff --git a/distant-local/src/api/state/process/instance.rs b/distant-local/src/api/state/process/instance.rs index 4d170fc..8049c5c 100644 --- a/distant-local/src/api/state/process/instance.rs +++ b/distant-local/src/api/state/process/instance.rs @@ -173,7 +173,7 @@ async fn stdout_task( loop { match stdout.recv().await { Ok(Some(data)) => { - reply.send(Response::ProcStdout { id, data }).await?; + reply.send(Response::ProcStdout { id, data })?; } Ok(None) => return Ok(()), Err(x) => return Err(x), @@ -189,7 +189,7 @@ async fn stderr_task( loop { match stderr.recv().await { Ok(Some(data)) => { - reply.send(Response::ProcStderr { id, data }).await?; + reply.send(Response::ProcStderr { id, data })?; } Ok(None) => return Ok(()), Err(x) => return Err(x), @@ -205,15 +205,11 @@ async fn wait_task( let status = child.wait().await; match status { - Ok(status) => { - reply - .send(Response::ProcDone { - id, - success: status.success, - code: status.code, - }) - .await - } - Err(x) => reply.send(Response::from(x)).await, + Ok(status) => reply.send(Response::ProcDone { + id, + success: status.success, + code: status.code, + }), + Err(x) => reply.send(Response::from(x)), } } diff --git a/distant-local/src/api/state/search.rs b/distant-local/src/api/state/search.rs index 7d9f33e..2c6f118 100644 --- a/distant-local/src/api/state/search.rs +++ b/distant-local/src/api/state/search.rs @@ -224,13 +224,10 @@ impl SearchQueryReporter { if let Some(len) = options.pagination { if matches.len() as u64 >= len { trace!("[Query {id}] Reached {len} paginated matches"); - if let Err(x) = reply - .send(Response::SearchResults { - id, - matches: std::mem::take(&mut matches), - }) - .await - { + if let Err(x) = reply.send(Response::SearchResults { + id, + matches: std::mem::take(&mut matches), + }) { error!("[Query {id}] Failed to send paginated matches: {x}"); } } @@ -240,14 +237,14 @@ impl SearchQueryReporter { // Send any remaining matches if !matches.is_empty() { trace!("[Query {id}] Sending {} remaining matches", matches.len()); - if let Err(x) = reply.send(Response::SearchResults { id, matches }).await { + if let Err(x) = reply.send(Response::SearchResults { id, matches }) { error!("[Query {id}] Failed to send final matches: {x}"); } } // Report that we are done trace!("[Query {id}] Reporting as done"); - if let Err(x) = reply.send(Response::SearchDone { id }).await { + if let Err(x) = reply.send(Response::SearchDone { id }) { error!("[Query {id}] Failed to send done status: {x}"); } } @@ -842,7 +839,7 @@ mod tests { let root = setup_dir(Vec::new()); let state = SearchState::new(); - let (reply, mut rx) = mpsc::channel(100); + let (reply, mut rx) = mpsc::unbounded_channel(); let query = SearchQuery { paths: vec![root.path().to_path_buf()], @@ -869,7 +866,7 @@ mod tests { ]); let state = SearchState::new(); - let (reply, mut rx) = mpsc::channel(100); + let (reply, mut rx) = mpsc::unbounded_channel(); let query = SearchQuery { paths: vec![root.path().to_path_buf()], @@ -946,7 +943,7 @@ mod tests { ]); let state = SearchState::new(); - let (reply, mut rx) = mpsc::channel(100); + let (reply, mut rx) = mpsc::unbounded_channel(); let query = SearchQuery { paths: vec![root.path().to_path_buf()], @@ -1021,7 +1018,7 @@ mod tests { ]); let state = SearchState::new(); - let (reply, mut rx) = mpsc::channel(100); + let (reply, mut rx) = mpsc::unbounded_channel(); let query = SearchQuery { paths: vec![root.path().to_path_buf()], @@ -1089,7 +1086,7 @@ mod tests { let root = setup_dir(vec![("path/to/file.txt", "aa ab ac\nba bb bc\nca cb cc")]); let state = SearchState::new(); - let (reply, mut rx) = mpsc::channel(100); + let (reply, mut rx) = mpsc::unbounded_channel(); let query = SearchQuery { paths: vec![root.path().to_path_buf()], @@ -1183,7 +1180,7 @@ mod tests { ]); let state = SearchState::new(); - let (reply, mut rx) = mpsc::channel(100); + let (reply, mut rx) = mpsc::unbounded_channel(); let query = SearchQuery { paths: vec![root.path().to_path_buf()], @@ -1276,7 +1273,7 @@ mod tests { ]); let state = SearchState::new(); - let (reply, mut rx) = mpsc::channel(100); + let (reply, mut rx) = mpsc::unbounded_channel(); let query = SearchQuery { paths: vec![root.path().to_path_buf()], @@ -1310,7 +1307,7 @@ mod tests { ]); let state = SearchState::new(); - let (reply, mut rx) = mpsc::channel(100); + let (reply, mut rx) = mpsc::unbounded_channel(); let query = SearchQuery { paths: vec![root.path().to_path_buf()], @@ -1353,7 +1350,7 @@ mod tests { expected_paths: Vec, ) { let state = SearchState::new(); - let (reply, mut rx) = mpsc::channel(100); + let (reply, mut rx) = mpsc::unbounded_channel(); let query = SearchQuery { paths: vec![root.path().to_path_buf()], target: SearchQueryTarget::Path, @@ -1441,7 +1438,7 @@ mod tests { ]); let state = SearchState::new(); - let (reply, mut rx) = mpsc::channel(100); + let (reply, mut rx) = mpsc::unbounded_channel(); let query = SearchQuery { paths: vec![root.path().to_path_buf()], @@ -1493,7 +1490,7 @@ mod tests { ]); let state = SearchState::new(); - let (reply, mut rx) = mpsc::channel(100); + let (reply, mut rx) = mpsc::unbounded_channel(); let query = SearchQuery { paths: vec![root.path().to_path_buf()], @@ -1559,7 +1556,7 @@ mod tests { .unwrap(); let state = SearchState::new(); - let (reply, mut rx) = mpsc::channel(100); + let (reply, mut rx) = mpsc::unbounded_channel(); // NOTE: We provide regex that matches an invalid UTF-8 character by disabling the u flag // and checking for 0x9F (159) @@ -1611,7 +1608,7 @@ mod tests { .unwrap(); let state = SearchState::new(); - let (reply, mut rx) = mpsc::channel(100); + let (reply, mut rx) = mpsc::unbounded_channel(); // NOTE: We provide regex that matches an invalid UTF-8 character by disabling the u flag // and checking for 0x9F (159) @@ -1647,7 +1644,7 @@ mod tests { expected_paths: Vec, ) { let state = SearchState::new(); - let (reply, mut rx) = mpsc::channel(100); + let (reply, mut rx) = mpsc::unbounded_channel(); let query = SearchQuery { paths: vec![root.path().to_path_buf()], @@ -1731,7 +1728,7 @@ mod tests { .unwrap(); let state = SearchState::new(); - let (reply, mut rx) = mpsc::channel(100); + let (reply, mut rx) = mpsc::unbounded_channel(); let query = SearchQuery { paths: vec![root.path().to_path_buf()], @@ -1786,7 +1783,7 @@ mod tests { .unwrap(); let state = SearchState::new(); - let (reply, mut rx) = mpsc::channel(100); + let (reply, mut rx) = mpsc::unbounded_channel(); // NOTE: Following symlobic links on its own does nothing, but when combined with a file // type filter, it will evaluate the underlying type of symbolic links and filter @@ -1834,7 +1831,7 @@ mod tests { ]); let state = SearchState::new(); - let (reply, mut rx) = mpsc::channel(100); + let (reply, mut rx) = mpsc::unbounded_channel(); let query = SearchQuery { paths: vec![ @@ -1918,7 +1915,7 @@ mod tests { expected_paths: Vec, ) { let state = SearchState::new(); - let (reply, mut rx) = mpsc::channel(100); + let (reply, mut rx) = mpsc::unbounded_channel(); let query = SearchQuery { paths: vec![path], target: SearchQueryTarget::Path, diff --git a/distant-local/src/api/state/watcher.rs b/distant-local/src/api/state/watcher.rs index 1e5db77..c612fd7 100644 --- a/distant-local/src/api/state/watcher.rs +++ b/distant-local/src/api/state/watcher.rs @@ -394,7 +394,7 @@ async fn watcher_task( extra: ev.info().map(ToString::to_string), }, }; - match registered_path.filter_and_send(change).await { + match registered_path.filter_and_send(change) { Ok(_) => (), Err(x) => error!( "[Conn {}] Failed to forward changes to paths: {}", @@ -410,10 +410,11 @@ async fn watcher_task( error!("Watcher encountered an error {} for {:?}", msg, err.paths); for registered_path in registered_paths.iter() { - match registered_path - .filter_and_send_error(&msg, &err.paths, !err.paths.is_empty()) - .await - { + match registered_path.filter_and_send_error( + &msg, + &err.paths, + !err.paths.is_empty(), + ) { Ok(_) => (), Err(x) => error!( "[Conn {}] Failed to forward changes to paths: {}", diff --git a/distant-local/src/api/state/watcher/path.rs b/distant-local/src/api/state/watcher/path.rs index 39dabe2..718631a 100644 --- a/distant-local/src/api/state/watcher/path.rs +++ b/distant-local/src/api/state/watcher/path.rs @@ -122,17 +122,14 @@ impl RegisteredPath { /// out any changes that are not applicable. /// /// Returns true if message was sent, and false if not. - pub async fn filter_and_send(&self, change: Change) -> io::Result { + pub fn filter_and_send(&self, change: Change) -> io::Result { if !self.allowed().contains(&change.kind) { return Ok(false); } // Only send if this registered path applies to the changed path if self.applies_to_path(&change.path) { - self.reply - .send(Response::Changed(change)) - .await - .map(|_| true) + self.reply.send(Response::Changed(change)).map(|_| true) } else { Ok(false) } @@ -142,7 +139,7 @@ impl RegisteredPath { /// no paths match and `skip_if_no_paths` is true. /// /// Returns true if message was sent, and false if not. - pub async fn filter_and_send_error( + pub fn filter_and_send_error( &self, msg: &str, paths: T, @@ -165,7 +162,6 @@ impl RegisteredPath { } else { Response::Error(Error::from(format!("{msg} about {paths:?}"))) }) - .await .map(|_| true) } else { Ok(false) diff --git a/distant-net/src/manager/server.rs b/distant-net/src/manager/server.rs index 75ad981..c223ee4 100644 --- a/distant-net/src/manager/server.rs +++ b/distant-net/src/manager/server.rs @@ -353,7 +353,7 @@ impl ServerHandler for ManagerServer { } }; - if let Err(x) = reply.send(response).await { + if let Err(x) = reply.send(response) { error!("[Conn {}] {}", connection_id, x); } } @@ -392,7 +392,7 @@ mod tests { let authenticator = ManagerAuthenticator { reply: ServerReply { origin_id: format!("{}", rand::random::()), - tx: mpsc::channel(1).0, + tx: mpsc::unbounded_channel().0, }, registry: Arc::clone(®istry), }; diff --git a/distant-net/src/manager/server/authentication.rs b/distant-net/src/manager/server/authentication.rs index 74ae48a..edc6963 100644 --- a/distant-net/src/manager/server/authentication.rs +++ b/distant-net/src/manager/server/authentication.rs @@ -29,19 +29,15 @@ impl ManagerAuthenticator { let id = rand::random(); self.registry.write().await.insert(id, tx); - self.reply - .send(ManagerResponse::Authenticate { id, msg }) - .await?; + self.reply.send(ManagerResponse::Authenticate { id, msg })?; rx.await .map_err(|x| io::Error::new(io::ErrorKind::Other, x)) } /// Sends an [`Authentication`] `msg` without expecting a reply. No callback is stored. - async fn fire(&self, msg: Authentication) -> io::Result<()> { + fn fire(&self, msg: Authentication) -> io::Result<()> { let id = rand::random(); - self.reply - .send(ManagerResponse::Authenticate { id, msg }) - .await?; + self.reply.send(ManagerResponse::Authenticate { id, msg })?; Ok(()) } } @@ -89,18 +85,18 @@ impl Authenticator for ManagerAuthenticator { } async fn info(&mut self, info: Info) -> io::Result<()> { - self.fire(Authentication::Info(info)).await + self.fire(Authentication::Info(info)) } async fn error(&mut self, error: Error) -> io::Result<()> { - self.fire(Authentication::Error(error)).await + self.fire(Authentication::Error(error)) } async fn start_method(&mut self, start_method: StartMethod) -> io::Result<()> { - self.fire(Authentication::StartMethod(start_method)).await + self.fire(Authentication::StartMethod(start_method)) } async fn finished(&mut self) -> io::Result<()> { - self.fire(Authentication::Finished).await + self.fire(Authentication::Finished) } } diff --git a/distant-net/src/manager/server/connection.rs b/distant-net/src/manager/server/connection.rs index d278788..2aaf921 100644 --- a/distant-net/src/manager/server/connection.rs +++ b/distant-net/src/manager/server/connection.rs @@ -225,19 +225,9 @@ async fn action_task( response: res, }; - // TODO: This seems to get stuck at times with some change recently, - // so we kick this off in a new task instead. The better solution - // is to switch most of our mpsc usage to be unbounded so we - // don't need an async call. The only bounded ones should be those - // externally facing to the API user, if even that. - // - // https://github.com/chipsenkbeil/distant/issues/205 - let reply = reply.clone(); - tokio::spawn(async move { - if let Err(x) = reply.send(response).await { - error!("[Conn {id}] {x}"); - } - }); + if let Err(x) = reply.send(response) { + error!("[Conn {id}] {x}"); + } } } Action::Write { id, mut req } => { diff --git a/distant-net/src/server.rs b/distant-net/src/server.rs index 548dfd2..6f5c677 100644 --- a/distant-net/src/server.rs +++ b/distant-net/src/server.rs @@ -262,7 +262,7 @@ mod tests { async fn on_request(&self, ctx: RequestCtx) { // Always send back "hello" - ctx.reply.send("hello".to_string()).await.unwrap(); + ctx.reply.send("hello".to_string()).unwrap(); } } diff --git a/distant-net/src/server/builder/tcp.rs b/distant-net/src/server/builder/tcp.rs index de75251..2129d69 100644 --- a/distant-net/src/server/builder/tcp.rs +++ b/distant-net/src/server/builder/tcp.rs @@ -76,10 +76,7 @@ mod tests { async fn on_request(&self, ctx: RequestCtx) { // Echo back what we received - ctx.reply - .send(ctx.request.payload.to_string()) - .await - .unwrap(); + ctx.reply.send(ctx.request.payload.to_string()).unwrap(); } } diff --git a/distant-net/src/server/builder/unix.rs b/distant-net/src/server/builder/unix.rs index 6e12ffb..4bddc1c 100644 --- a/distant-net/src/server/builder/unix.rs +++ b/distant-net/src/server/builder/unix.rs @@ -76,10 +76,7 @@ mod tests { async fn on_request(&self, ctx: RequestCtx) { // Echo back what we received - ctx.reply - .send(ctx.request.payload.to_string()) - .await - .unwrap(); + ctx.reply.send(ctx.request.payload.to_string()).unwrap(); } } diff --git a/distant-net/src/server/builder/windows.rs b/distant-net/src/server/builder/windows.rs index a5e7402..5d506fa 100644 --- a/distant-net/src/server/builder/windows.rs +++ b/distant-net/src/server/builder/windows.rs @@ -87,10 +87,7 @@ mod tests { async fn on_request(&self, ctx: RequestCtx) { // Echo back what we received - ctx.reply - .send(ctx.request.payload.to_string()) - .await - .unwrap(); + ctx.reply.send(ctx.request.payload.to_string()).unwrap(); } } diff --git a/distant-net/src/server/connection.rs b/distant-net/src/server/connection.rs index c66cfc3..43658f9 100644 --- a/distant-net/src/server/connection.rs +++ b/distant-net/src/server/connection.rs @@ -441,12 +441,12 @@ where } None => { warn!("[Conn {id}] Existing connection with id, but channels not saved"); - mpsc::channel::>(1) + mpsc::unbounded_channel::>() } }, None => { debug!("[Conn {id}] Marked as new connection"); - mpsc::channel::>(1) + mpsc::unbounded_channel::>() } }; @@ -609,7 +609,7 @@ mod tests { async fn on_request(&self, ctx: RequestCtx) { // Always send back "hello" - ctx.reply.send("hello".to_string()).await.unwrap(); + ctx.reply.send("hello".to_string()).unwrap(); } } diff --git a/distant-net/src/server/reply.rs b/distant-net/src/server/reply.rs index 0eca1ec..5adacac 100644 --- a/distant-net/src/server/reply.rs +++ b/distant-net/src/server/reply.rs @@ -1,9 +1,7 @@ -use std::future::Future; use std::io; -use std::pin::Pin; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; -use tokio::sync::{mpsc, Mutex}; +use tokio::sync::mpsc; use crate::common::{Id, Response}; @@ -11,29 +9,18 @@ use crate::common::{Id, Response}; pub trait Reply: Send + Sync { type Data; - /// Sends a reply out from the server - fn send(&self, data: Self::Data) -> Pin> + Send + '_>>; + /// Sends a reply out from the server. + fn send(&self, data: Self::Data) -> io::Result<()>; - /// Blocking version of sending a reply out from the server - fn blocking_send(&self, data: Self::Data) -> io::Result<()>; - - /// Clones this reply + /// Clones this reply. fn clone_reply(&self) -> Box>; } -impl Reply for mpsc::Sender { +impl Reply for mpsc::UnboundedSender { type Data = T; - fn send(&self, data: Self::Data) -> Pin> + Send + '_>> { - Box::pin(async move { - mpsc::Sender::send(self, data) - .await - .map_err(|x| io::Error::new(io::ErrorKind::Other, x.to_string())) - }) - } - - fn blocking_send(&self, data: Self::Data) -> io::Result<()> { - mpsc::Sender::blocking_send(self, data) + fn send(&self, data: Self::Data) -> io::Result<()> { + mpsc::UnboundedSender::send(self, data) .map_err(|x| io::Error::new(io::ErrorKind::Other, x.to_string())) } @@ -45,7 +32,7 @@ impl Reply for mpsc::Sender { /// Utility to send ad-hoc replies from the server back through the connection pub struct ServerReply { pub(crate) origin_id: Id, - pub(crate) tx: mpsc::Sender>, + pub(crate) tx: mpsc::UnboundedSender>, } impl Clone for ServerReply { @@ -58,16 +45,9 @@ impl Clone for ServerReply { } impl ServerReply { - pub async fn send(&self, data: T) -> io::Result<()> { + pub fn send(&self, data: T) -> io::Result<()> { self.tx .send(Response::new(self.origin_id.clone(), data)) - .await - .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "Connection reply closed")) - } - - pub fn blocking_send(&self, data: T) -> io::Result<()> { - self.tx - .blocking_send(Response::new(self.origin_id.clone(), data)) .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "Connection reply closed")) } @@ -87,12 +67,8 @@ impl ServerReply { impl Reply for ServerReply { type Data = T; - fn send(&self, data: Self::Data) -> Pin> + Send + '_>> { - Box::pin(ServerReply::send(self, data)) - } - - fn blocking_send(&self, data: Self::Data) -> io::Result<()> { - ServerReply::blocking_send(self, data) + fn send(&self, data: Self::Data) -> io::Result<()> { + ServerReply::send(self, data) } fn clone_reply(&self) -> Box> { @@ -125,38 +101,27 @@ impl QueuedServerReply { /// /// * If true, all messages are held until the queue is flushed /// * If false, messages are sent directly as they come in - pub async fn hold(&self, hold: bool) { - *self.hold.lock().await = hold; - } - - /// Send this message, adding it to a queue if holding messages - pub async fn send(&self, data: T) -> io::Result<()> { - if *self.hold.lock().await { - self.queue.lock().await.push(data); - Ok(()) - } else { - self.inner.send(data).await - } + pub fn hold(&self, hold: bool) { + *self.hold.lock().unwrap() = hold; } - /// Send this message, adding it to a queue if holding messages, blocking - /// for access to locks and other internals - pub fn blocking_send(&self, data: T) -> io::Result<()> { - if *self.hold.blocking_lock() { - self.queue.blocking_lock().push(data); + /// Send this message, adding it to a queue if holding messages. + pub fn send(&self, data: T) -> io::Result<()> { + if *self.hold.lock().unwrap() { + self.queue.lock().unwrap().push(data); Ok(()) } else { - self.inner.blocking_send(data) + self.inner.send(data) } } /// Send this message before anything else in the queue - pub async fn send_before(&self, data: T) -> io::Result<()> { - if *self.hold.lock().await { - self.queue.lock().await.insert(0, data); + pub fn send_before(&self, data: T) -> io::Result<()> { + if *self.hold.lock().unwrap() { + self.queue.lock().unwrap().insert(0, data); Ok(()) } else { - self.inner.send(data).await + self.inner.send(data) } } @@ -165,14 +130,14 @@ impl QueuedServerReply { /// Additionally, takes `hold` to indicate whether or not new msgs /// after the flush should continue to be held within the queue /// or if all future msgs will be sent immediately - pub async fn flush(&self, hold: bool) -> io::Result<()> { + pub fn flush(&self, hold: bool) -> io::Result<()> { // Lock hold so we can ensure that nothing gets sent // to the queue after we clear it - let mut hold_lock = self.hold.lock().await; + let mut hold_lock = self.hold.lock().unwrap(); // Clear the queue by sending everything - for data in self.queue.lock().await.drain(..) { - self.inner.send(data).await?; + for data in self.queue.lock().unwrap().drain(..) { + self.inner.send(data)?; } // Update hold to @@ -189,12 +154,8 @@ impl QueuedServerReply { impl Reply for QueuedServerReply { type Data = T; - fn send(&self, data: Self::Data) -> Pin> + Send + '_>> { - Box::pin(QueuedServerReply::send(self, data)) - } - - fn blocking_send(&self, data: Self::Data) -> io::Result<()> { - QueuedServerReply::blocking_send(self, data) + fn send(&self, data: Self::Data) -> io::Result<()> { + QueuedServerReply::send(self, data) } fn clone_reply(&self) -> Box> { diff --git a/distant-net/src/server/state.rs b/distant-net/src/server/state.rs index 935f945..ce89ab9 100644 --- a/distant-net/src/server/state.rs +++ b/distant-net/src/server/state.rs @@ -31,7 +31,7 @@ impl Default for ServerState { pub struct ConnectionState { shutdown_tx: oneshot::Sender<()>, - task: JoinHandle, mpsc::Receiver)>>, + task: JoinHandle, mpsc::UnboundedReceiver)>>, } impl ConnectionState { @@ -40,7 +40,7 @@ impl ConnectionState { #[allow(clippy::type_complexity)] pub fn channel() -> ( oneshot::Receiver<()>, - oneshot::Sender<(mpsc::Sender, mpsc::Receiver)>, + oneshot::Sender<(mpsc::UnboundedSender, mpsc::UnboundedReceiver)>, Self, ) { let (shutdown_tx, shutdown_rx) = oneshot::channel(); @@ -65,7 +65,9 @@ impl ConnectionState { self.task.is_finished() } - pub async fn shutdown_and_wait(self) -> Option<(mpsc::Sender, mpsc::Receiver)> { + pub async fn shutdown_and_wait( + self, + ) -> Option<(mpsc::UnboundedSender, mpsc::UnboundedReceiver)> { let _ = self.shutdown_tx.send(()); self.task.await.unwrap() } diff --git a/distant-net/tests/manager_tests.rs b/distant-net/tests/manager_tests.rs index a242fa8..370fade 100644 --- a/distant-net/tests/manager_tests.rs +++ b/distant-net/tests/manager_tests.rs @@ -20,7 +20,6 @@ impl ServerHandler for TestServerHandler { async fn on_request(&self, ctx: RequestCtx) { ctx.reply .send(format!("echo {}", ctx.request.payload)) - .await .expect("Failed to send response") } } diff --git a/distant-net/tests/typed_tests.rs b/distant-net/tests/typed_tests.rs index 266fc87..beecce7 100644 --- a/distant-net/tests/typed_tests.rs +++ b/distant-net/tests/typed_tests.rs @@ -19,7 +19,6 @@ impl ServerHandler for TestServerHandler { for i in 0..cnt { ctx.reply .send(format!("echo {i} {msg}")) - .await .expect("Failed to send response"); } } diff --git a/distant-net/tests/untyped_tests.rs b/distant-net/tests/untyped_tests.rs index 2440465..5abd2cf 100644 --- a/distant-net/tests/untyped_tests.rs +++ b/distant-net/tests/untyped_tests.rs @@ -19,7 +19,6 @@ impl ServerHandler for TestServerHandler { for i in 0..cnt { ctx.reply .send(format!("echo {i} {msg}")) - .await .expect("Failed to send response"); } } diff --git a/distant-ssh2/src/process.rs b/distant-ssh2/src/process.rs index e9993c2..251d63f 100644 --- a/distant-ssh2/src/process.rs +++ b/distant-ssh2/src/process.rs @@ -216,7 +216,7 @@ fn spawn_blocking_stdout_task( id, data: buf[..n].to_vec(), }; - if reply.blocking_send(payload).is_err() { + if reply.send(payload).is_err() { error!("[Ssh | Proc {}] Stdout channel closed", id); break; } @@ -247,7 +247,7 @@ fn spawn_nonblocking_stdout_task( id, data: buf[..n].to_vec(), }; - if reply.send(payload).await.is_err() { + if reply.send(payload).is_err() { error!("[Ssh | Proc {}] Stdout channel closed", id); break; } @@ -281,7 +281,7 @@ fn spawn_nonblocking_stderr_task( id, data: buf[..n].to_vec(), }; - if reply.send(payload).await.is_err() { + if reply.send(payload).is_err() { error!("[Ssh | Proc {}] Stderr channel closed", id); break; } @@ -423,7 +423,7 @@ where code: if success { Some(0) } else { None }, }; - if reply.send(payload).await.is_err() { + if reply.send(payload).is_err() { error!("[Ssh | Proc {}] Failed to send done", id,); } })