pull/5/head
Frank Denis 5 years ago
parent 46c933e398
commit f7b2a1777a

@ -14,8 +14,9 @@ env_logger = "0.6.2"
failure = "0.1.5"
futures-preview = { version = "=0.3.0-alpha.18", features = ["compat", "async-await", "nightly", "io-compat", "cfg-target-has-atomic"] }
jemallocator = "0.3.2"
libsodium-sys="0.2.3"
libsodium-sys="0.2.4"
log = "0.4.8"
parking_lot = "0.9.0"
tokio = "=0.2.0-alpha.4"
[profile.release]

@ -34,59 +34,119 @@ use dnsstamps::{InformalProperty, WithInformalProperty};
use failure::{bail, ensure};
use futures::prelude::*;
use futures::{FutureExt, StreamExt};
use parking_lot::RwLock;
use std::convert::TryFrom;
use std::mem;
use std::net::SocketAddr;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::sync::Arc;
use tokio::net::{TcpListener, UdpSocket};
use tokio::net::{TcpListener, TcpStream, UdpSocket};
use tokio::prelude::*;
use tokio::runtime::Runtime;
use tokio::runtime::{current_thread::Handle, Runtime};
const DNSCRYPT_QUERY_MIN_SIZE: usize = 12;
const DNSCRYPT_QUERY_MAX_SIZE: usize = 512;
#[derive(Debug)]
struct UdpClientCtx {
udp_socket_fd: RawFd,
packet: Vec<u8>,
client_addr: SocketAddr,
}
#[derive(Debug)]
struct TcpClientCtx {
packet: Vec<u8>,
client_connection: TcpStream,
}
#[derive(Debug)]
enum ClientCtx {
Udp(UdpClientCtx),
Tcp(TcpClientCtx),
}
async fn respond_to_query(client_ctx: ClientCtx) -> Result<(), Error> {
match client_ctx {
ClientCtx::Udp(client_ctx) => {
let packet = client_ctx.packet;
let udp_socket = unsafe { std::net::UdpSocket::from_raw_fd(client_ctx.udp_socket_fd) };
let _ = udp_socket.send_to(&packet, client_ctx.client_addr);
mem::forget(udp_socket);
}
ClientCtx::Tcp(client_ctx) => {}
}
Ok(())
}
async fn handle_client_query(client_ctx: ClientCtx) -> Result<(), Error> {
// if let Some(synth_packet) =
// serve_certificates(&packet, &globals.provider_name, &globals.dnscrypt_certs)?
// {
// let _ = udp_socket.send_to(&synth_packet, client_addr).await;
// continue;
// }
// truncate(&mut packet);
// let _ = udp_socket.send_to(&packet, client_addr).await;
dbg!(&client_ctx);
respond_to_query(client_ctx).await
}
async fn tcp_acceptor(globals: Arc<Globals>, tcp_listener: TcpListener) -> Result<(), Error> {
let runtime = globals.runtime.clone();
let mut tcp_listener = tcp_listener.incoming();
while let Some(client) = tcp_listener.next().await {
let mut client = match client {
Ok(client) => client,
Err(_) => continue,
let mut client_connection: TcpStream = match client {
Ok(client_connection) => client_connection,
Err(e) => bail!(e),
};
let mut binlen = [0u8, 0];
client.read_exact(&mut binlen).await?;
let packet_len = BigEndian::read_u16(&binlen) as usize;
ensure!(
(DNSCRYPT_QUERY_MIN_SIZE..=DNSCRYPT_QUERY_MAX_SIZE).contains(&packet_len),
"Unexpected query size"
runtime.spawn(
async {
let mut binlen = [0u8, 0];
client_connection.read_exact(&mut binlen).await?;
let packet_len = BigEndian::read_u16(&binlen) as usize;
ensure!(
(DNSCRYPT_QUERY_MIN_SIZE..=DNSCRYPT_QUERY_MAX_SIZE).contains(&packet_len),
"Unexpected query size"
);
let mut packet = vec![0u8; packet_len];
client_connection.read_exact(&mut packet).await?;
let client_ctx = ClientCtx::Tcp(TcpClientCtx {
packet,
client_connection,
});
let _ = handle_client_query(client_ctx).await;
Ok(())
}
.map(|_| ()),
);
let mut packet = vec![0u8; packet_len];
client.read_exact(&mut packet).await?;
dbg!(packet);
}
Ok(())
}
async fn udp_acceptor(globals: Arc<Globals>, mut udp_listener: UdpSocket) -> Result<(), Error> {
async fn udp_acceptor(globals: Arc<Globals>, mut udp_socket: UdpSocket) -> Result<(), Error> {
let runtime = globals.runtime.clone();
loop {
let mut packet = vec![0u8; DNSCRYPT_QUERY_MAX_SIZE];
let (packet_len, client_addr) = udp_listener.recv_from(&mut packet).await?;
dbg!(&packet);
let mut packet = &mut packet[..packet_len];
if let Some(synth_packet) =
serve_certificates(&packet, &globals.provider_name, &globals.dnscrypt_certs)?
{
let _ = udp_listener.send_to(&synth_packet, client_addr).await;
continue;
}
truncate(&mut packet);
let _ = udp_listener.send_to(&packet, client_addr).await;
let (packet_len, client_addr) = udp_socket.recv_from(&mut packet).await?;
let udp_socket_fd = udp_socket.as_raw_fd();
packet.truncate(packet_len);
let client_ctx = ClientCtx::Udp(UdpClientCtx {
udp_socket_fd,
packet,
client_addr,
});
runtime.spawn(async { handle_client_query(client_ctx).await }.map(|_| ()));
}
}
async fn start(globals: Arc<Globals>, runtime: Arc<Runtime>) -> Result<(), Error> {
let socket_addr: SocketAddr = globals.listen_addr;
let tcp_listener = TcpListener::bind(&socket_addr).await?;
let udp_listener = UdpSocket::bind(&socket_addr).await?;
let udp_socket = UdpSocket::bind(&socket_addr).await?;
runtime.spawn(tcp_acceptor(globals.clone(), tcp_listener).map(|_| {}));
runtime.spawn(udp_acceptor(globals.clone(), udp_listener).map(|_| {}));
runtime.spawn(udp_acceptor(globals.clone(), udp_socket).map(|_| {}));
Ok(())
}

Loading…
Cancel
Save