From 27e6097dc964a365b7bee83aa1ed14a784d8bf35 Mon Sep 17 00:00:00 2001 From: Frank Denis Date: Tue, 1 Oct 2019 20:58:51 +0200 Subject: [PATCH] Prometheus metrics --- Cargo.toml | 8 +++---- example-encrypted-dns.toml | 14 +++++++++++ src/config.rs | 20 ++++++++++++---- src/globals.rs | 8 ++++++- src/main.rs | 23 ++++++++++++++++-- src/metrics.rs | 49 +++++++++++++++++++++++++++++++------- 6 files changed, 102 insertions(+), 20 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index eef8e8f..b87f866 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ derivative = "1.0.3" dnsstamps = "0.1.1" env_logger = { version="0.7.0", default-features = false, features = ["humantime"]} failure = "0.1.5" -futures-preview = { version = "=0.3.0-alpha.18", features = ["async-await", "nightly", "cfg-target-has-atomic"] } +futures-preview = { version = "=0.3.0-alpha.19", features = ["async-await", "unstable", "cfg-target-has-atomic"] } jemallocator = "0.3.2" libsodium-sys-stable="1.18.1" log = { version = "0.4.8", features = ["std", "release_max_level_debug"] } @@ -33,13 +33,13 @@ serde = "1.0.101" serde_derive = "1.0.101" serde-big-array = "0.1.5" siphasher = "0.3.1" -tokio = "=0.2.0-alpha.5" -tokio-net = "=0.2.0-alpha.5" +tokio = "=0.2.0-alpha.6" +tokio-net = "=0.2.0-alpha.6" toml = "0.5.3" [dependencies.hyper] optional = true -version = "0.13.0-alpha.2" +version = "0.13.0-alpha.3" default_features = false [dependencies.prometheus] diff --git a/example-encrypted-dns.toml b/example-encrypted-dns.toml index 8ae3dc6..e9f579f 100644 --- a/example-encrypted-dns.toml +++ b/example-encrypted-dns.toml @@ -166,3 +166,17 @@ key_cache_capacity = 10000 [filtering] # domain_blacklist = "/etc/domain_blacklist.txt" + + + +####################################### +# Server-side filtering # +####################################### + +[metrics] + +type = "prometheus" + +listen_addr = "0.0.0.0:9100" + +path = "/metrics" \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index dc36c8f..5b35cbc 100644 --- a/src/config.rs +++ b/src/config.rs @@ -9,7 +9,15 @@ use std::net::{IpAddr, SocketAddr}; use std::path::{Path, PathBuf}; use tokio::prelude::*; -#[derive(Serialize, Deserialize, Debug)] +#[cfg(feature = "metrics")] +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct MetricsConfig { + pub r#type: String, + pub listen_addr: SocketAddr, + pub path: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct DNSCryptConfig { pub provider_name: String, pub key_cache_capacity: usize, @@ -18,23 +26,23 @@ pub struct DNSCryptConfig { pub no_logs: bool, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct TLSConfig { pub upstream_addr: Option, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct ListenAddrConfig { pub local: SocketAddr, pub external: SocketAddr, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct FilteringConfig { pub domain_blacklist: Option, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct Config { pub listen_addrs: Vec, pub external_addr: IpAddr, @@ -57,6 +65,8 @@ pub struct Config { pub daemonize: bool, pub pid_file: Option, pub log_file: Option, + #[cfg(feature = "metrics")] + pub metrics: Option, } impl Config { diff --git a/src/globals.rs b/src/globals.rs index 4f96ea8..1f056b7 100644 --- a/src/globals.rs +++ b/src/globals.rs @@ -2,6 +2,8 @@ use crate::blacklist::*; use crate::cache::*; use crate::crypto::*; use crate::dnscrypt_certs::*; +#[cfg(feature = "metrics")] +use crate::varz::*; use parking_lot::{Mutex, RwLock}; use siphasher::sip128::SipHasher13; @@ -14,7 +16,8 @@ use std::time::Duration; use tokio::runtime::Runtime; use tokio::sync::oneshot; -#[derive(Debug, Clone)] +#[derive(Clone, Derivative)] +#[derivative(Debug)] pub struct Globals { pub runtime: Arc, pub state_file: PathBuf, @@ -37,4 +40,7 @@ pub struct Globals { pub hasher: SipHasher13, pub cache: Cache, pub blacklist: Option, + #[cfg(feature = "metrics")] + #[derivative(Debug = "ignore")] + pub varz: Varz, } diff --git a/src/main.rs b/src/main.rs index 0552f14..4c2cd64 100644 --- a/src/main.rs +++ b/src/main.rs @@ -46,6 +46,8 @@ use dnscrypt::*; use dnscrypt_certs::*; use errors::*; use globals::*; +#[cfg(feature = "metrics")] +use varz::*; use byteorder::{BigEndian, ByteOrder}; use clap::Arg; @@ -566,11 +568,30 @@ fn main() -> Result<(), Error> { hasher, cache, blacklist, + #[cfg(feature = "metrics")] + varz: Varz::default(), }); let updater = DNSCryptEncryptionParamsUpdater::new(globals.clone()); if !state_is_new { updater.update(); } + #[cfg(feature = "metrics")] + { + if let Some(metrics_config) = config.metrics { + runtime.spawn( + metrics::prometheus_service( + globals.varz.clone(), + metrics_config.clone(), + runtime.clone(), + ) + .map_err(|e| { + error!("Unable to start the metrics service: [{}]", e); + std::process::exit(1); + }) + .map(|_| ()), + ); + } + } runtime.spawn( start(globals, runtime.clone(), listeners) .map_err(|e| { @@ -579,8 +600,6 @@ fn main() -> Result<(), Error> { }) .map(|_| ()), ); - #[cfg(feature = "metrics")] - runtime.spawn(metrics::prometheus_service(runtime.clone()).map(|_| ())); runtime.block_on(updater.run()); Ok(()) } diff --git a/src/metrics.rs b/src/metrics.rs index 43c4a3e..d1a08e4 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,23 +1,56 @@ +use crate::config::*; use crate::errors::*; +use crate::varz::*; -use futures::FutureExt; +use futures::prelude::*; +use hyper::header::CONTENT_TYPE; use hyper::server::conn::Http; use hyper::service::service_fn; -use hyper::{Body, Request, Response}; +use hyper::{Body, Request, Response, StatusCode}; +use prometheus::{self, Encoder, TextEncoder}; use std::sync::Arc; use tokio::net::TcpListener; use tokio::runtime::Runtime; -async fn handle_client_connection(_req: Request) -> Result, Error> { - let res = Response::new(Body::from("OK\n")); - Ok(res) +async fn handle_client_connection( + req: Request, + varz: Varz, + path: Arc, +) -> Result, Error> { + let mut buffer = vec![]; + if req.uri().path() != path.as_str() { + let response = Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::empty())?; + return Ok(response); + } + let StartInstant(start_instant) = varz.start_instant; + let uptime = start_instant.elapsed().as_secs(); + varz.uptime.set(uptime as f64); + let client_queries = varz.client_queries_udp.get() + varz.client_queries_tcp.get(); + varz.client_queries.set(client_queries); + let metric_families = prometheus::gather(); + let encoder = TextEncoder::new(); + encoder.encode(&metric_families, &mut buffer)?; + let response = Response::builder() + .header(CONTENT_TYPE, encoder.format_type()) + .body(buffer.into())?; + Ok(response) } -pub async fn prometheus_service(runtime: Arc) -> Result<(), Error> { - let mut stream = TcpListener::bind("0.0.0.0:8000").await?; +pub async fn prometheus_service( + varz: Varz, + metrics_config: MetricsConfig, + runtime: Arc, +) -> Result<(), Error> { + let path = Arc::new(metrics_config.path); + let mut stream = TcpListener::bind(metrics_config.listen_addr).await?; loop { let (client, _client_addr) = stream.accept().await?; - let service = service_fn(handle_client_connection); + let path = path.clone(); + let varz = varz.clone(); + let service = + service_fn(move |req| handle_client_connection(req, varz.clone(), path.clone())); let connection = Http::new().serve_connection(client, service); runtime.spawn(connection.map(|_| {})); }