diff --git a/Cargo.lock b/Cargo.lock index 1966bd228..cef3b7ac1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -181,7 +181,6 @@ dependencies = [ "actix-utils", "derive_more", "futures-core", - "http", "log", "pin-project-lite", "tokio-rustls", @@ -388,39 +387,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" -[[package]] -name = "awc" -version = "3.0.0-beta.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9fef22345ed9fc111adf13c3b1e48473136a2669a73680b82d18d6e71e720fe" -dependencies = [ - "actix-codec", - "actix-http", - "actix-rt", - "actix-service", - "actix-tls", - "actix-utils", - "ahash", - "base64 0.13.0", - "bytes", - "cfg-if", - "derive_more", - "futures-core", - "futures-util", - "h2", - "http", - "itoa", - "log", - "mime", - "percent-encoding", - "pin-project-lite", - "rand 0.8.4", - "serde", - "serde_json", - "serde_urlencoded", - "tokio", -] - [[package]] name = "background-jobs" version = "0.11.0" @@ -1726,7 +1692,6 @@ dependencies = [ "actix-web", "anyhow", "async-trait", - "awc", "background-jobs", "base64 0.13.0", "bcrypt", @@ -1788,7 +1753,6 @@ dependencies = [ "actix-web", "anyhow", "async-trait", - "awc", "background-jobs", "base64 0.13.0", "bcrypt", @@ -1834,7 +1798,6 @@ dependencies = [ "anyhow", "assert-json-diff", "async-trait", - "awc", "background-jobs", "bcrypt", "chrono", @@ -1968,9 +1931,9 @@ dependencies = [ "actix-web", "actix-web-actors", "anyhow", - "awc", "chrono", "diesel", + "futures", "lemmy_api_common", "lemmy_apub", "lemmy_db_schema", @@ -1979,10 +1942,13 @@ dependencies = [ "lemmy_utils", "lemmy_websocket", "once_cell", + "reqwest", + "reqwest-middleware", "rss", "serde", "sha2", "strum", + "tokio", "tracing", "url", ] @@ -3167,6 +3133,7 @@ dependencies = [ "serde_urlencoded", "tokio", "tokio-native-tls", + "tokio-util", "url", "wasm-bindgen", "wasm-bindgen-futures", diff --git a/crates/api/Cargo.toml b/crates/api/Cargo.toml index c2bc77faa..bec1de1a7 100644 --- a/crates/api/Cargo.toml +++ b/crates/api/Cargo.toml @@ -30,7 +30,6 @@ serde = { version = "1.0.130", features = ["derive"] } actix = "0.12.0" actix-web = { version = "4.0.0-beta.9", default-features = false } actix-rt = { version = "2.2.0", default-features = false } -awc = { version = "3.0.0-beta.8", default-features = false } rand = "0.8.4" strum = "0.21.0" strum_macros = "0.21.1" diff --git a/crates/api_crud/Cargo.toml b/crates/api_crud/Cargo.toml index a6d116573..9809ff3c3 100644 --- a/crates/api_crud/Cargo.toml +++ b/crates/api_crud/Cargo.toml @@ -25,7 +25,6 @@ serde = { version = "1.0.130", features = ["derive"] } actix = "0.12.0" actix-web = { version = "4.0.0-beta.9", default-features = false } actix-rt = { version = "2.2.0", default-features = false } -awc = { version = "3.0.0-beta.8", default-features = false } tracing = "0.1.29" rand = "0.8.4" strum = "0.21.0" diff --git a/crates/apub/Cargo.toml b/crates/apub/Cargo.toml index 47da38d60..7cd8608b1 100644 --- a/crates/apub/Cargo.toml +++ b/crates/apub/Cargo.toml @@ -30,7 +30,6 @@ serde_with = "1.10.0" actix = "0.12.0" actix-web = { version = "4.0.0-beta.9", default-features = false } actix-rt = { version = "2.2.0", default-features = false } -awc = { version = "3.0.0-beta.8", default-features = false } tracing = "0.1.29" rand = "0.8.4" strum = "0.21.0" diff --git a/crates/routes/Cargo.toml b/crates/routes/Cargo.toml index 25ab77f71..fd05ca7d1 100644 --- a/crates/routes/Cargo.toml +++ b/crates/routes/Cargo.toml @@ -26,10 +26,13 @@ actix-http = "3.0.0-beta.10" sha2 = "0.9.8" anyhow = "1.0.44" chrono = { version = "0.4.19", features = ["serde"] } +futures = "0.3.18" +reqwest = { version = "0.11.7", features = ["stream"] } +reqwest-middleware = "0.1.2" rss = "1.10.0" serde = { version = "1.0.130", features = ["derive"] } -awc = { version = "3.0.0-beta.8", default-features = false } url = { version = "2.2.2", features = ["serde"] } strum = "0.21.0" once_cell = "1.8.0" tracing = "0.1.29" +tokio = { version = "1", features = ["sync"] } diff --git a/crates/routes/src/images.rs b/crates/routes/src/images.rs index e5e33489d..d9e74e3e6 100644 --- a/crates/routes/src/images.rs +++ b/crates/routes/src/images.rs @@ -1,18 +1,18 @@ -use actix_http::http::header::ACCEPT_ENCODING; +use actix_http::{ + header::{HeaderName, HOST}, + http::header::ACCEPT_ENCODING, +}; use actix_web::{body::BodyStream, http::StatusCode, web::Data, *}; use anyhow::anyhow; -use awc::Client; +use futures::stream::{Stream, StreamExt}; use lemmy_utils::{claims::Claims, rate_limit::RateLimit, LemmyError}; use lemmy_websocket::LemmyContext; +use reqwest::Body; +use reqwest_middleware::{ClientWithMiddleware, RequestBuilder}; use serde::{Deserialize, Serialize}; use std::time::Duration; -pub fn config(cfg: &mut web::ServiceConfig, rate_limit: &RateLimit) { - let client = Client::builder() - .header("User-Agent", "pict-rs-frontend, v0.1.0") - .timeout(Duration::from_secs(30)) - .finish(); - +pub fn config(cfg: &mut web::ServiceConfig, client: ClientWithMiddleware, rate_limit: &RateLimit) { cfg .app_data(Data::new(client)) .service( @@ -43,10 +43,34 @@ struct PictrsParams { thumbnail: Option, } +fn adapt_request( + request: &HttpRequest, + client: &ClientWithMiddleware, + url: String, +) -> RequestBuilder { + // remove accept-encoding header so that pictrs doesnt compress the response + const INVALID_HEADERS: &[HeaderName] = &[ACCEPT_ENCODING, HOST]; + + let client_request = client + .request(request.method().clone(), url) + .timeout(Duration::from_secs(30)); + + request + .headers() + .iter() + .fold(client_request, |client_req, (key, value)| { + if INVALID_HEADERS.contains(key) { + client_req + } else { + client_req.header(key, value) + } + }) +} + async fn upload( req: HttpRequest, body: web::Payload, - client: web::Data, + client: web::Data, context: web::Data, ) -> Result { // TODO: check rate limit here @@ -58,32 +82,31 @@ async fn upload( return Ok(HttpResponse::Unauthorized().finish()); }; - let mut client_req = client.request_from( - format!("{}/image", pictrs_url(context.settings().pictrs_url)?), - req.head(), - ); - // remove content-encoding header so that pictrs doesnt send gzipped response - client_req.headers_mut().remove(ACCEPT_ENCODING); + let image_url = format!("{}/image", pictrs_url(context.settings().pictrs_url)?); + + let mut client_req = adapt_request(&req, &client, image_url); if let Some(addr) = req.head().peer_addr { - client_req = client_req.insert_header(("X-Forwarded-For", addr.to_string())) + client_req = client_req.header("X-Forwarded-For", addr.to_string()) }; - let mut res = client_req - .send_stream(body) + let res = client_req + .body(Body::wrap_stream(make_send(body))) + .send() .await .map_err(error::ErrorBadRequest)?; + let status = res.status(); let images = res.json::().await.map_err(error::ErrorBadRequest)?; - Ok(HttpResponse::build(res.status()).json(images)) + Ok(HttpResponse::build(status).json(images)) } async fn full_res( filename: web::Path, web::Query(params): web::Query, req: HttpRequest, - client: web::Data, + client: web::Data, context: web::Data, ) -> Result { let name = &filename.into_inner(); @@ -119,20 +142,19 @@ async fn full_res( async fn image( url: String, req: HttpRequest, - client: web::Data, + client: web::Data, ) -> Result { - let mut client_req = client.request_from(url, req.head()); - client_req.headers_mut().remove(ACCEPT_ENCODING); + let mut client_req = adapt_request(&req, &client, url); if let Some(addr) = req.head().peer_addr { - client_req = client_req.insert_header(("X-Forwarded-For", addr.to_string())) - }; + client_req = client_req.header("X-Forwarded-For", addr.to_string()); + } - let res = client_req - .no_decompress() - .send() - .await - .map_err(error::ErrorBadRequest)?; + if let Some(addr) = req.head().peer_addr { + client_req = client_req.header("X-Forwarded-For", addr.to_string()); + } + + let res = client_req.send().await.map_err(error::ErrorBadRequest)?; if res.status() == StatusCode::NOT_FOUND { return Ok(HttpResponse::NotFound().finish()); @@ -144,13 +166,13 @@ async fn image( client_res.insert_header((name.clone(), value.clone())); } - Ok(client_res.body(BodyStream::new(res))) + Ok(client_res.body(BodyStream::new(res.bytes_stream()))) } async fn delete( components: web::Path<(String, String)>, req: HttpRequest, - client: web::Data, + client: web::Data, context: web::Data, ) -> Result { let (token, file) = components.into_inner(); @@ -162,22 +184,59 @@ async fn delete( &file ); - let mut client_req = client.request_from(url, req.head()); - client_req.headers_mut().remove(ACCEPT_ENCODING); + let mut client_req = adapt_request(&req, &client, url); if let Some(addr) = req.head().peer_addr { - client_req = client_req.insert_header(("X-Forwarded-For", addr.to_string())) - }; + client_req = client_req.header("X-Forwarded-For", addr.to_string()); + } - let res = client_req - .no_decompress() - .send() - .await - .map_err(error::ErrorBadRequest)?; + let res = client_req.send().await.map_err(error::ErrorBadRequest)?; - Ok(HttpResponse::build(res.status()).body(BodyStream::new(res))) + Ok(HttpResponse::build(res.status()).body(BodyStream::new(res.bytes_stream()))) } fn pictrs_url(pictrs_url: Option) -> Result { pictrs_url.ok_or_else(|| anyhow!("images_disabled").into()) } + +fn make_send(mut stream: S) -> impl Stream + Send + Unpin + 'static +where + S: Stream + Unpin + 'static, + S::Item: Send, +{ + // NOTE: the 8 here is arbitrary + let (tx, rx) = tokio::sync::mpsc::channel(8); + + // NOTE: spawning stream into a new task can potentially hit this bug: + // - https://github.com/actix/actix-web/issues/1679 + // + // Since 4.0.0-beta.2 this issue is incredibly less frequent. I have not personally reproduced it. + // That said, it is still technically possible to encounter. + actix_web::rt::spawn(async move { + while let Some(res) = stream.next().await { + if tx.send(res).await.is_err() { + break; + } + } + }); + + SendStream { rx } +} + +struct SendStream { + rx: tokio::sync::mpsc::Receiver, +} + +impl Stream for SendStream +where + T: Send, +{ + type Item = T; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::pin::Pin::new(&mut self.rx).poll_recv(cx) + } +} diff --git a/src/main.rs b/src/main.rs index c65cf4479..cf29e7790 100644 --- a/src/main.rs +++ b/src/main.rs @@ -135,7 +135,7 @@ async fn main() -> Result<(), LemmyError> { .configure(|cfg| api_routes::config(cfg, &rate_limiter)) .configure(|cfg| lemmy_apub::http::routes::config(cfg, &settings)) .configure(feeds::config) - .configure(|cfg| images::config(cfg, &rate_limiter)) + .configure(|cfg| images::config(cfg, client.clone(), &rate_limiter)) .configure(nodeinfo::config) .configure(|cfg| webfinger::config(cfg, &settings)) })