Tokio update, that makes things more complicated

pull/5/head
Frank Denis 5 years ago
parent 18fe23471d
commit 7ebcc7287e

@ -33,8 +33,8 @@ serde = "1.0.101"
serde_derive = "1.0.101" serde_derive = "1.0.101"
serde-big-array = "0.1.5" serde-big-array = "0.1.5"
siphasher = "0.3.1" siphasher = "0.3.1"
tokio = "=0.2.0-alpha.4" tokio = "=0.2.0-alpha.5"
tokio-net = "=0.2.0-alpha.4" tokio-net = "=0.2.0-alpha.5"
toml = "0.5.3" toml = "0.5.3"
[profile.release] [profile.release]

@ -47,6 +47,7 @@ use dnsstamps::{InformalProperty, WithInformalProperty};
use failure::{bail, ensure}; use failure::{bail, ensure};
use futures::join; use futures::join;
use futures::prelude::*; use futures::prelude::*;
use futures::FutureExt;
use parking_lot::Mutex; use parking_lot::Mutex;
use parking_lot::RwLock; use parking_lot::RwLock;
use privdrop::PrivDrop; use privdrop::PrivDrop;
@ -200,7 +201,7 @@ async fn handle_client_query(
async fn tls_proxy( async fn tls_proxy(
globals: Arc<Globals>, globals: Arc<Globals>,
binlen: [u8; 2], binlen: [u8; 2],
client_connection: TcpStream, mut client_connection: TcpStream,
) -> Result<(), Error> { ) -> Result<(), Error> {
let tls_upstream_addr = match &globals.tls_upstream_addr { let tls_upstream_addr = match &globals.tls_upstream_addr {
None => return Ok(()), None => return Ok(()),
@ -212,7 +213,7 @@ async fn tls_proxy(
}? }?
.bind(&globals.external_addr)? .bind(&globals.external_addr)?
.to_tcp_stream()?; .to_tcp_stream()?;
let ext_socket = let mut ext_socket =
TcpStream::connect_std(std_socket, tls_upstream_addr, &Handle::default()).await?; TcpStream::connect_std(std_socket, tls_upstream_addr, &Handle::default()).await?;
let (mut erh, mut ewh) = ext_socket.split(); let (mut erh, mut ewh) = ext_socket.split();
let (mut rh, mut wh) = client_connection.split(); let (mut rh, mut wh) = client_connection.split();
@ -267,8 +268,8 @@ async fn tcp_acceptor(globals: Arc<Globals>, tcp_listener: TcpListener) -> Resul
Ok(()) Ok(())
}; };
let fut_abort = rx; let fut_abort = rx;
let fut_all = future::select(fut.boxed(), fut_abort).timeout(timeout); let fut_all = future::select(FutureExt::boxed(fut), fut_abort).timeout(timeout);
runtime.spawn(fut_all.map(move |_| { runtime.spawn(FutureExt::map(fut_all, move |_| {
concurrent_connections.fetch_sub(1, Ordering::Relaxed); concurrent_connections.fetch_sub(1, Ordering::Relaxed);
})); }));
} }
@ -311,8 +312,8 @@ async fn udp_acceptor(
let concurrent_connections = concurrent_connections.clone(); let concurrent_connections = concurrent_connections.clone();
let fut = handle_client_query(globals, client_ctx, packet); let fut = handle_client_query(globals, client_ctx, packet);
let fut_abort = rx; let fut_abort = rx;
let fut_all = future::select(fut.boxed(), fut_abort).timeout(timeout); let fut_all = future::select(FutureExt::boxed(fut), fut_abort).timeout(timeout);
runtime.spawn(fut_all.map(move |_| { runtime.spawn(FutureExt::map(fut_all, move |_| {
concurrent_connections.fetch_sub(1, Ordering::Relaxed); concurrent_connections.fetch_sub(1, Ordering::Relaxed);
})); }));
} }
@ -324,8 +325,14 @@ async fn start(
listeners: Vec<(TcpListener, std::net::UdpSocket)>, listeners: Vec<(TcpListener, std::net::UdpSocket)>,
) -> Result<(), Error> { ) -> Result<(), Error> {
for listener in listeners { for listener in listeners {
runtime.spawn(tcp_acceptor(globals.clone(), listener.0).map(|_| {})); runtime.spawn(FutureExt::map(
runtime.spawn(udp_acceptor(globals.clone(), listener.1).map(|_| {})); tcp_acceptor(globals.clone(), listener.0),
|_| {},
));
runtime.spawn(FutureExt::map(
udp_acceptor(globals.clone(), listener.1),
|_| {},
));
} }
Ok(()) Ok(())
} }
@ -564,14 +571,13 @@ fn main() -> Result<(), Error> {
if !state_is_new { if !state_is_new {
updater.update(); updater.update();
} }
runtime.spawn( runtime.spawn(FutureExt::map(
start(globals, runtime.clone(), listeners) start(globals, runtime.clone(), listeners).map_err(|e| {
.map_err(|e| { error!("Unable to start the service: [{}]", e);
error!("Unable to start the service: [{}]", e); std::process::exit(1);
std::process::exit(1); }),
}) |_| (),
.map(|_| ()), ));
);
runtime.block_on(updater.run()); runtime.block_on(updater.run());
Ok(()) Ok(())

Loading…
Cancel
Save