From 68f40b7570f606f231873e0415985b1a8f851794 Mon Sep 17 00:00:00 2001 From: Frank Denis Date: Thu, 11 Aug 2022 12:52:17 +0200 Subject: [PATCH] In a future, don't remove cancelled entries They have already been removed by the pop_back() call. Cancellation in Rust is such a clusterfuck... --- Cargo.lock | 6 +++--- Cargo.toml | 2 +- src/main.rs | 25 +++++++++++++++++++------ 3 files changed, 23 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3079d79..a71ca5a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -16,9 +16,9 @@ checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" [[package]] name = "anyhow" -version = "1.0.60" +version = "1.0.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c794e162a5eff65c72ef524dfe393eb923c354e350bb78b9c7383df13f3bc142" +checksum = "508b352bb5c066aac251f6daf6b36eccd03e8a88e8081cd44959ea277a3af9a8" [[package]] name = "autocfg" @@ -177,7 +177,7 @@ dependencies = [ [[package]] name = "encrypted-dns" -version = "0.9.7" +version = "0.9.8" dependencies = [ "anyhow", "byteorder", diff --git a/Cargo.toml b/Cargo.toml index 5350db3..c712da1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ categories = ["asynchronous", "network-programming", "command-line-utilities"] readme = "README.md" [dependencies] -anyhow = "1.0.60" +anyhow = "1.0.61" byteorder = "1.4.3" clap = { version = "3.2.16", default-features = false, features = [ "std", diff --git a/src/main.rs b/src/main.rs index d5d7796..302e54a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -57,6 +57,7 @@ use dnscrypt::*; use dnscrypt_certs::*; use dnsstamps::{InformalProperty, WithInformalProperty}; use errors::*; +use future::Either; use futures::join; use futures::prelude::*; use globals::*; @@ -339,12 +340,18 @@ async fn tcp_acceptor(globals: Arc, tcp_listener: TcpListener) -> Resul }; let fut_abort = rx; let fut_all = tokio::time::timeout(timeout, future::select(fut.boxed(), fut_abort)); - runtime_handle.spawn(fut_all.map(move |_| { + runtime_handle.spawn(fut_all.map(move |either| { let _count = concurrent_connections.fetch_sub(1, Ordering::Relaxed); #[cfg(feature = "metrics")] varz.inflight_tcp_queries.set(_count.saturating_sub(1) as _); - let mut active_connections = active_connections.lock(); - _ = active_connections.remove(tx_channel_index); + + if let Ok(Either::Right(_)) = either { + // Removing the active connection was already done during + // cancellation. + } else { + let mut active_connections = active_connections.lock(); + _ = active_connections.remove(tx_channel_index); + } })); } } @@ -394,12 +401,18 @@ async fn udp_acceptor( let fut = handle_client_query(globals, client_ctx, packet); let fut_abort = rx; let fut_all = tokio::time::timeout(timeout, future::select(fut.boxed(), fut_abort)); - runtime_handle.spawn(fut_all.map(move |_| { + runtime_handle.spawn(fut_all.map(move |either| { let _count = concurrent_connections.fetch_sub(1, Ordering::Relaxed); #[cfg(feature = "metrics")] varz.inflight_udp_queries.set(_count.saturating_sub(1) as _); - let mut active_connections = active_connections.lock(); - _ = active_connections.remove(tx_channel_index); + + if let Ok(Either::Right(_)) = either { + // Removing the active connection was already done during + // cancellation. + } else { + let mut active_connections = active_connections.lock(); + _ = active_connections.remove(tx_channel_index); + } })); } }