In a future, don't remove cancelled entries

They have already been removed by the pop_back() call.

Cancellation in Rust is such a clusterfuck...
pull/114/head
Frank Denis 2 years ago
parent 7c04ba000f
commit 68f40b7570

6
Cargo.lock generated

@ -16,9 +16,9 @@ checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234"
[[package]] [[package]]
name = "anyhow" name = "anyhow"
version = "1.0.60" version = "1.0.61"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c794e162a5eff65c72ef524dfe393eb923c354e350bb78b9c7383df13f3bc142" checksum = "508b352bb5c066aac251f6daf6b36eccd03e8a88e8081cd44959ea277a3af9a8"
[[package]] [[package]]
name = "autocfg" name = "autocfg"
@ -177,7 +177,7 @@ dependencies = [
[[package]] [[package]]
name = "encrypted-dns" name = "encrypted-dns"
version = "0.9.7" version = "0.9.8"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"byteorder", "byteorder",

@ -12,7 +12,7 @@ categories = ["asynchronous", "network-programming", "command-line-utilities"]
readme = "README.md" readme = "README.md"
[dependencies] [dependencies]
anyhow = "1.0.60" anyhow = "1.0.61"
byteorder = "1.4.3" byteorder = "1.4.3"
clap = { version = "3.2.16", default-features = false, features = [ clap = { version = "3.2.16", default-features = false, features = [
"std", "std",

@ -57,6 +57,7 @@ use dnscrypt::*;
use dnscrypt_certs::*; use dnscrypt_certs::*;
use dnsstamps::{InformalProperty, WithInformalProperty}; use dnsstamps::{InformalProperty, WithInformalProperty};
use errors::*; use errors::*;
use future::Either;
use futures::join; use futures::join;
use futures::prelude::*; use futures::prelude::*;
use globals::*; use globals::*;
@ -339,12 +340,18 @@ async fn tcp_acceptor(globals: Arc<Globals>, tcp_listener: TcpListener) -> Resul
}; };
let fut_abort = rx; let fut_abort = rx;
let fut_all = tokio::time::timeout(timeout, future::select(fut.boxed(), fut_abort)); let fut_all = tokio::time::timeout(timeout, future::select(fut.boxed(), fut_abort));
runtime_handle.spawn(fut_all.map(move |_| { runtime_handle.spawn(fut_all.map(move |either| {
let _count = concurrent_connections.fetch_sub(1, Ordering::Relaxed); let _count = concurrent_connections.fetch_sub(1, Ordering::Relaxed);
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
varz.inflight_tcp_queries.set(_count.saturating_sub(1) as _); varz.inflight_tcp_queries.set(_count.saturating_sub(1) as _);
let mut active_connections = active_connections.lock();
_ = active_connections.remove(tx_channel_index); if let Ok(Either::Right(_)) = either {
// Removing the active connection was already done during
// cancellation.
} else {
let mut active_connections = active_connections.lock();
_ = active_connections.remove(tx_channel_index);
}
})); }));
} }
} }
@ -394,12 +401,18 @@ async fn udp_acceptor(
let fut = handle_client_query(globals, client_ctx, packet); let fut = handle_client_query(globals, client_ctx, packet);
let fut_abort = rx; let fut_abort = rx;
let fut_all = tokio::time::timeout(timeout, future::select(fut.boxed(), fut_abort)); let fut_all = tokio::time::timeout(timeout, future::select(fut.boxed(), fut_abort));
runtime_handle.spawn(fut_all.map(move |_| { runtime_handle.spawn(fut_all.map(move |either| {
let _count = concurrent_connections.fetch_sub(1, Ordering::Relaxed); let _count = concurrent_connections.fetch_sub(1, Ordering::Relaxed);
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
varz.inflight_udp_queries.set(_count.saturating_sub(1) as _); varz.inflight_udp_queries.set(_count.saturating_sub(1) as _);
let mut active_connections = active_connections.lock();
_ = active_connections.remove(tx_channel_index); if let Ok(Either::Right(_)) = either {
// Removing the active connection was already done during
// cancellation.
} else {
let mut active_connections = active_connections.lock();
_ = active_connections.remove(tx_channel_index);
}
})); }));
} }
} }

Loading…
Cancel
Save