Upgrade to tokio 1.0.0

cart
Frank Denis 3 years ago
parent 76097b7f4a
commit 156adf46c3

@ -12,20 +12,20 @@ categories = ["asynchronous", "network-programming","command-line-utilities"]
readme = "README.md" readme = "README.md"
[dependencies] [dependencies]
anyhow = "1.0.33" anyhow = "1.0.36"
byteorder = "1.3.4" byteorder = "1.3.4"
clap = { version = "2.33.3", default-features = false, features = ["wrap_help"] } clap = { version = "2.33.3", default-features = false, features = ["wrap_help"] }
clockpro-cache = "0.1.8" clockpro-cache = "0.1.9"
coarsetime = "0.1.16" coarsetime = "0.1.18"
daemonize-simple = "0.1.4" daemonize-simple = "0.1.4"
derivative = "2.1.1" derivative = "2.1.1"
dnsstamps = "0.1.4" dnsstamps = "0.1.4"
env_logger = { version = "0.8.1", default-features = false, features = ["humantime"] } env_logger = { version = "0.8.1", default-features = false, features = ["humantime"] }
futures = { version = "0.3.6", features = ["async-await"] } futures = { version = "0.3.8", features = ["async-await"] }
hyper = { version = "0.13.8", default_features = false, optional = true } hyper = { version = "0.14.0", default_features = false, features = ["server", "http1"], optional = true }
ipext = "0.1.0" ipext = "0.1.0"
jemallocator = "0.3.2" jemallocator = "0.3.2"
libsodium-sys-stable= "1.19.11" libsodium-sys-stable= "1.19.12"
log = { version = "0.4.11", features = ["std", "release_max_level_debug"] } log = { version = "0.4.11", features = ["std", "release_max_level_debug"] }
socket2 = "0.3" socket2 = "0.3"
parking_lot = "0.11" parking_lot = "0.11"
@ -36,7 +36,7 @@ serde = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
serde-big-array = "0.3.0" serde-big-array = "0.3.0"
siphasher = "0.3" siphasher = "0.3"
tokio = { version = "0.2.22", features = ["fs", "rt-threaded", "time", "tcp", "udp", "stream", "parking_lot"] } tokio = { version = "1", features = ["net", "io-std", "io-util", "fs", "time", "rt-multi-thread", "parking_lot"] }
toml = "0.5" toml = "0.5"
[dependencies.prometheus] [dependencies.prometheus]

@ -78,7 +78,7 @@ pub async fn handle_anonymized_dns(
!= ANONYMIZED_DNSCRYPT_QUERY_MAGIC, != ANONYMIZED_DNSCRYPT_QUERY_MAGIC,
"Loop detected" "Loop detected"
); );
let mut ext_socket = match globals.external_addr { let ext_socket = match globals.external_addr {
Some(x) => UdpSocket::bind(x).await?, Some(x) => UdpSocket::bind(x).await?,
None => match upstream_address { None => match upstream_address {
SocketAddr::V4(_) => { SocketAddr::V4(_) => {

@ -6,7 +6,7 @@ use std::fs;
use std::mem; use std::mem;
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use tokio::prelude::*; use tokio::io::AsyncWriteExt;
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub struct AccessControlConfig { pub struct AccessControlConfig {

@ -69,8 +69,8 @@ use std::path::Path;
use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::net::{TcpListener, TcpStream, UdpSocket}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::prelude::*; use tokio::net::{TcpListener, TcpSocket, TcpStream, UdpSocket};
use tokio::runtime::Handle; use tokio::runtime::Handle;
use tokio::sync::oneshot; use tokio::sync::oneshot;
@ -127,7 +127,7 @@ pub async fn respond_to_query(client_ctx: ClientCtx, response: Vec<u8>) -> Resul
BigEndian::write_u16(&mut binlen[..], response_len as u16); BigEndian::write_u16(&mut binlen[..], response_len as u16);
client_connection.write_all(&binlen).await?; client_connection.write_all(&binlen).await?;
client_connection.write_all(&response).await?; client_connection.write_all(&response).await?;
client_connection.flush(); client_connection.flush().await?;
} }
} }
Ok(()) Ok(())
@ -249,41 +249,25 @@ async fn tls_proxy(
None => return Ok(()), None => return Ok(()),
Some(tls_upstream_addr) => tls_upstream_addr, Some(tls_upstream_addr) => tls_upstream_addr,
}; };
let std_socket = match globals.external_addr { let socket = match globals.external_addr {
Some(x @ SocketAddr::V4(_)) => { Some(x @ SocketAddr::V4(_)) => {
let kindy = socket2::Socket::new( let socket = TcpSocket::new_v4()?;
socket2::Domain::ipv4(), socket.set_reuseaddr(true).ok();
socket2::Type::stream(), socket.bind(x)?;
Some(socket2::Protocol::tcp()), socket
)?;
kindy.bind(&x.into())?;
kindy.into_tcp_stream()
} }
Some(x @ SocketAddr::V6(_)) => { Some(x @ SocketAddr::V6(_)) => {
let kindy = socket2::Socket::new( let socket = TcpSocket::new_v6()?;
socket2::Domain::ipv6(), socket.set_reuseaddr(true).ok();
socket2::Type::stream(), socket.bind(x)?;
Some(socket2::Protocol::tcp()), socket
)?;
kindy.bind(&x.into())?;
kindy.into_tcp_stream()
} }
None => match tls_upstream_addr { None => match tls_upstream_addr {
SocketAddr::V4(_) => socket2::Socket::new( SocketAddr::V4(_) => TcpSocket::new_v4()?,
socket2::Domain::ipv4(), SocketAddr::V6(_) => TcpSocket::new_v6()?,
socket2::Type::stream(),
Some(socket2::Protocol::tcp()),
)?
.into_tcp_stream(),
SocketAddr::V6(_) => socket2::Socket::new(
socket2::Domain::ipv6(),
socket2::Type::stream(),
Some(socket2::Protocol::tcp()),
)?
.into_tcp_stream(),
}, },
}; };
let mut ext_socket = TcpStream::connect_std(std_socket, tls_upstream_addr).await?; let mut ext_socket = socket.connect(*tls_upstream_addr).await?;
let (mut erh, mut ewh) = ext_socket.split(); let (mut erh, mut ewh) = ext_socket.split();
let (mut rh, mut wh) = client_connection.split(); let (mut rh, mut wh) = client_connection.split();
ewh.write_all(&binlen).await?; ewh.write_all(&binlen).await?;
@ -295,20 +279,12 @@ async fn tls_proxy(
} }
} }
async fn tcp_acceptor(globals: Arc<Globals>, mut tcp_listener: TcpListener) -> Result<(), Error> { async fn tcp_acceptor(globals: Arc<Globals>, tcp_listener: TcpListener) -> Result<(), Error> {
let runtime_handle = globals.runtime_handle.clone(); let runtime_handle = globals.runtime_handle.clone();
let mut tcp_listener = tcp_listener.incoming();
let timeout = globals.tcp_timeout; let timeout = globals.tcp_timeout;
let concurrent_connections = globals.tcp_concurrent_connections.clone(); let concurrent_connections = globals.tcp_concurrent_connections.clone();
let active_connections = globals.tcp_active_connections.clone(); let active_connections = globals.tcp_active_connections.clone();
while let Some(client) = tcp_listener.next().await { while let Ok((mut client_connection, _client_addr)) = tcp_listener.accept().await {
let mut client_connection: TcpStream = match client {
Ok(client_connection) => client_connection,
Err(e) => {
debug!("{}", e);
continue;
}
};
let (tx, rx) = oneshot::channel::<()>(); let (tx, rx) = oneshot::channel::<()>();
{ {
let mut active_connections = active_connections.lock(); let mut active_connections = active_connections.lock();
@ -363,7 +339,7 @@ async fn udp_acceptor(
net_udp_socket: std::net::UdpSocket, net_udp_socket: std::net::UdpSocket,
) -> Result<(), Error> { ) -> Result<(), Error> {
let runtime_handle = globals.runtime_handle.clone(); let runtime_handle = globals.runtime_handle.clone();
let mut tokio_udp_socket = UdpSocket::try_from(net_udp_socket.try_clone()?)?; let tokio_udp_socket = UdpSocket::try_from(net_udp_socket.try_clone()?)?;
let timeout = globals.udp_timeout; let timeout = globals.udp_timeout;
let concurrent_connections = globals.udp_concurrent_connections.clone(); let concurrent_connections = globals.udp_concurrent_connections.clone();
let active_connections = globals.udp_active_connections.clone(); let active_connections = globals.udp_active_connections.clone();
@ -456,6 +432,7 @@ fn bind_listeners(
kindy.into_tcp_listener() kindy.into_tcp_listener()
} }
}; };
tcp_listener.set_nonblocking(true)?;
let udp_socket = match listen_addr { let udp_socket = match listen_addr {
SocketAddr::V4(_) => { SocketAddr::V4(_) => {
let kindy = socket2::Socket::new( let kindy = socket2::Socket::new(
@ -479,6 +456,7 @@ fn bind_listeners(
kindy.into_udp_socket() kindy.into_udp_socket()
} }
}; };
udp_socket.set_nonblocking(true)?;
sockets.push((tcp_listener, udp_socket)) sockets.push((tcp_listener, udp_socket))
} }
Ok(sockets) Ok(sockets)
@ -565,11 +543,10 @@ fn main() -> Result<(), Error> {
}; };
let external_addr = config.external_addr.map(|addr| SocketAddr::new(addr, 0)); let external_addr = config.external_addr.map(|addr| SocketAddr::new(addr, 0));
let mut runtime_builder = tokio::runtime::Builder::new(); let mut runtime_builder = tokio::runtime::Builder::new_multi_thread();
runtime_builder.enable_all(); runtime_builder.enable_all();
runtime_builder.threaded_scheduler();
runtime_builder.thread_name("encrypted-dns-"); runtime_builder.thread_name("encrypted-dns-");
let mut runtime = runtime_builder.build()?; let runtime = runtime_builder.build()?;
let listen_addrs: Vec<_> = config.listen_addrs.iter().map(|x| x.local).collect(); let listen_addrs: Vec<_> = config.listen_addrs.iter().map(|x| x.local).collect();
let listeners = bind_listeners(&listen_addrs) let listeners = bind_listeners(&listen_addrs)

@ -77,7 +77,8 @@ pub async fn prometheus_service(
kindy.into_tcp_listener() kindy.into_tcp_listener()
} }
}; };
let mut stream = TcpListener::from_std(std_socket)?; std_socket.set_nonblocking(true)?;
let stream = TcpListener::from_std(std_socket)?;
let concurrent_connections = Arc::new(AtomicU32::new(0)); let concurrent_connections = Arc::new(AtomicU32::new(0));
loop { loop {
let (client, _client_addr) = stream.accept().await?; let (client, _client_addr) = stream.accept().await?;

@ -10,8 +10,8 @@ use siphasher::sip128::Hasher128;
use std::cmp; use std::cmp;
use std::hash::Hasher; use std::hash::Hasher;
use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6}; use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6};
use tokio::net::{TcpStream, UdpSocket}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::prelude::*; use tokio::net::{TcpSocket, UdpSocket};
pub async fn resolve_udp( pub async fn resolve_udp(
globals: &Globals, globals: &Globals,
@ -67,7 +67,8 @@ pub async fn resolve_udp(
} }
}, },
}; };
let mut ext_socket = UdpSocket::from_std(std_socket)?; std_socket.set_nonblocking(true)?;
let ext_socket = UdpSocket::from_std(std_socket)?;
ext_socket.connect(&globals.upstream_addr).await?; ext_socket.connect(&globals.upstream_addr).await?;
dns::set_edns_max_payload_size(&mut packet, DNS_MAX_PACKET_SIZE as u16)?; dns::set_edns_max_payload_size(&mut packet, DNS_MAX_PACKET_SIZE as u16)?;
let mut response; let mut response;
@ -110,47 +111,31 @@ pub async fn resolve_tcp(
packet_qname: &[u8], packet_qname: &[u8],
tid: u16, tid: u16,
) -> Result<Vec<u8>, Error> { ) -> Result<Vec<u8>, Error> {
let std_socket = match globals.external_addr { let socket = match globals.external_addr {
Some(x @ SocketAddr::V4(_)) => { Some(x @ SocketAddr::V4(_)) => {
let kindy = socket2::Socket::new( let socket = TcpSocket::new_v4()?;
socket2::Domain::ipv4(), socket.set_reuseaddr(true).ok();
socket2::Type::stream(), socket.bind(x)?;
Some(socket2::Protocol::tcp()), socket
)?;
kindy.bind(&x.into())?;
kindy.into_tcp_stream()
} }
Some(x @ SocketAddr::V6(_)) => { Some(x @ SocketAddr::V6(_)) => {
let kindy = socket2::Socket::new( let socket = TcpSocket::new_v6()?;
socket2::Domain::ipv6(), socket.set_reuseaddr(true).ok();
socket2::Type::stream(), socket.bind(x)?;
Some(socket2::Protocol::tcp()), socket
)?;
kindy.bind(&x.into())?;
kindy.into_tcp_stream()
} }
None => match globals.upstream_addr { None => match globals.upstream_addr {
SocketAddr::V4(_) => socket2::Socket::new( SocketAddr::V4(_) => TcpSocket::new_v4()?,
socket2::Domain::ipv4(), SocketAddr::V6(_) => TcpSocket::new_v6()?,
socket2::Type::stream(),
Some(socket2::Protocol::tcp()),
)?
.into_tcp_stream(),
SocketAddr::V6(_) => socket2::Socket::new(
socket2::Domain::ipv6(),
socket2::Type::stream(),
Some(socket2::Protocol::tcp()),
)?
.into_tcp_stream(),
}, },
}; };
let mut ext_socket = TcpStream::connect_std(std_socket, &globals.upstream_addr).await?; let mut ext_socket = socket.connect(globals.upstream_addr).await?;
ext_socket.set_nodelay(true)?; ext_socket.set_nodelay(true)?;
let mut binlen = [0u8, 0]; let mut binlen = [0u8, 0];
BigEndian::write_u16(&mut binlen[..], packet.len() as u16); BigEndian::write_u16(&mut binlen[..], packet.len() as u16);
ext_socket.write_all(&binlen).await?; ext_socket.write_all(&binlen).await?;
ext_socket.write_all(&packet).await?; ext_socket.write_all(&packet).await?;
ext_socket.flush(); ext_socket.flush().await?;
ext_socket.read_exact(&mut binlen).await?; ext_socket.read_exact(&mut binlen).await?;
let response_len = BigEndian::read_u16(&binlen) as usize; let response_len = BigEndian::read_u16(&binlen) as usize;
ensure!( ensure!(

Loading…
Cancel
Save