diff --git a/Cargo.toml b/Cargo.toml index 65a55ae..5c52f54 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,8 +33,8 @@ serde = "1.0.101" serde_derive = "1.0.101" serde-big-array = "0.1.5" siphasher = "0.3.1" -tokio = "=0.2.0-alpha.4" -tokio-net = "=0.2.0-alpha.4" +tokio = "=0.2.0-alpha.5" +tokio-net = "=0.2.0-alpha.5" toml = "0.5.3" [profile.release] diff --git a/src/main.rs b/src/main.rs index 960e9db..4ead4bc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -47,6 +47,7 @@ use dnsstamps::{InformalProperty, WithInformalProperty}; use failure::{bail, ensure}; use futures::join; use futures::prelude::*; +use futures::FutureExt; use parking_lot::Mutex; use parking_lot::RwLock; use privdrop::PrivDrop; @@ -200,7 +201,7 @@ async fn handle_client_query( async fn tls_proxy( globals: Arc, binlen: [u8; 2], - client_connection: TcpStream, + mut client_connection: TcpStream, ) -> Result<(), Error> { let tls_upstream_addr = match &globals.tls_upstream_addr { None => return Ok(()), @@ -212,7 +213,7 @@ async fn tls_proxy( }? .bind(&globals.external_addr)? .to_tcp_stream()?; - let ext_socket = + let mut ext_socket = TcpStream::connect_std(std_socket, tls_upstream_addr, &Handle::default()).await?; let (mut erh, mut ewh) = ext_socket.split(); let (mut rh, mut wh) = client_connection.split(); @@ -267,8 +268,8 @@ async fn tcp_acceptor(globals: Arc, tcp_listener: TcpListener) -> Resul Ok(()) }; let fut_abort = rx; - let fut_all = future::select(fut.boxed(), fut_abort).timeout(timeout); - runtime.spawn(fut_all.map(move |_| { + let fut_all = future::select(FutureExt::boxed(fut), fut_abort).timeout(timeout); + runtime.spawn(FutureExt::map(fut_all, move |_| { concurrent_connections.fetch_sub(1, Ordering::Relaxed); })); } @@ -311,8 +312,8 @@ async fn udp_acceptor( let concurrent_connections = concurrent_connections.clone(); let fut = handle_client_query(globals, client_ctx, packet); let fut_abort = rx; - let fut_all = future::select(fut.boxed(), fut_abort).timeout(timeout); - runtime.spawn(fut_all.map(move |_| { + let fut_all = future::select(FutureExt::boxed(fut), fut_abort).timeout(timeout); + runtime.spawn(FutureExt::map(fut_all, move |_| { concurrent_connections.fetch_sub(1, Ordering::Relaxed); })); } @@ -324,8 +325,14 @@ async fn start( listeners: Vec<(TcpListener, std::net::UdpSocket)>, ) -> Result<(), Error> { for listener in listeners { - runtime.spawn(tcp_acceptor(globals.clone(), listener.0).map(|_| {})); - runtime.spawn(udp_acceptor(globals.clone(), listener.1).map(|_| {})); + runtime.spawn(FutureExt::map( + tcp_acceptor(globals.clone(), listener.0), + |_| {}, + )); + runtime.spawn(FutureExt::map( + udp_acceptor(globals.clone(), listener.1), + |_| {}, + )); } Ok(()) } @@ -564,14 +571,13 @@ fn main() -> Result<(), Error> { if !state_is_new { updater.update(); } - runtime.spawn( - start(globals, runtime.clone(), listeners) - .map_err(|e| { - error!("Unable to start the service: [{}]", e); - std::process::exit(1); - }) - .map(|_| ()), - ); + runtime.spawn(FutureExt::map( + start(globals, runtime.clone(), listeners).map_err(|e| { + error!("Unable to start the service: [{}]", e); + std::process::exit(1); + }), + |_| (), + )); runtime.block_on(updater.run()); Ok(())