diff --git a/Cargo.lock b/Cargo.lock index 129126f04..d22b1d04c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2712,6 +2712,7 @@ dependencies = [ "moka", "once_cell", "pretty_assertions", + "rand", "reqwest", "serde", "serde_json", @@ -2909,6 +2910,7 @@ dependencies = [ "pict-rs", "pretty_assertions", "prometheus", + "rand", "reqwest", "reqwest-middleware", "reqwest-tracing", diff --git a/Cargo.toml b/Cargo.toml index eb8202cbc..15ca34a83 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -202,6 +202,7 @@ prometheus = { version = "0.13.3", features = ["process"] } serial_test = { workspace = true } clap = { workspace = true } actix-web-prom = "0.7.0" +rand = "0.8.5" [dev-dependencies] pretty_assertions = { workspace = true } diff --git a/crates/apub/Cargo.toml b/crates/apub/Cargo.toml index 4c3189a09..f20dc507b 100644 --- a/crates/apub/Cargo.toml +++ b/crates/apub/Cargo.toml @@ -47,6 +47,7 @@ html2md = "0.2.14" html2text = "0.6.0" stringreader = "0.1.1" enum_delegate = "0.2.0" +rand = "0.8.5" [dev-dependencies] serial_test = { workspace = true } diff --git a/crates/apub/src/http/inbox.rs b/crates/apub/src/http/inbox.rs index 71215c032..87bc28791 100644 --- a/crates/apub/src/http/inbox.rs +++ b/crates/apub/src/http/inbox.rs @@ -11,17 +11,21 @@ use actix_web::{http::header::HeaderMap, web::Bytes, HttpRequest, HttpResponse}; use chrono::{DateTime, Local, TimeDelta, Utc}; use http::{Method, Uri}; use lemmy_api_common::context::LemmyContext; +use lemmy_db_schema::{newtypes::InstanceId, source::instance::Instance}; use lemmy_utils::error::LemmyResult; use once_cell::sync::Lazy; +use rand::seq::IteratorRandom; +use serde::Deserialize; use std::{ cmp::Ordering, - collections::BinaryHeap, + collections::{BinaryHeap, HashMap}, sync::{Arc, RwLock}, thread::available_parallelism, time::Duration, }; use tokio::{spawn, task::JoinHandle, time::sleep}; use tracing::info; +use url::Url; /// Handle incoming activities. pub async fn shared_inbox( @@ -38,7 +42,25 @@ pub async fn shared_inbox( request.method().clone(), request.uri().clone(), ); - ACTIVITY_QUEUE.write().unwrap().push(InboxActivity { + + #[derive(Deserialize)] + struct Id { + id: Url, + } + + let activity_id = serde_json::from_slice::(&bytes)?.id; + let domain = activity_id.domain().unwrap().to_string(); + let instance = Instance::read_or_create(&mut data.pool(), domain) + .await + .unwrap(); + + let mut lock = ACTIVITY_QUEUE.write().unwrap(); + let instance_queue = lock.entry(instance.id).or_insert(BinaryHeap::new()); + while instance_queue.len() > 5 { + // TODO: must not hold lock here + sleep(Duration::from_millis(100)).await; + } + instance_queue.push(InboxActivity { request_parts, bytes, published, @@ -56,8 +78,8 @@ pub async fn shared_inbox( } /// Queue of incoming activities, ordered by oldest published first -static ACTIVITY_QUEUE: Lazy>>> = - Lazy::new(|| Arc::new(RwLock::new(BinaryHeap::new()))); +static ACTIVITY_QUEUE: Lazy>>>> = + Lazy::new(|| Arc::new(RwLock::new(HashMap::new()))); /// Minimum age of an activity before it gets processed. This ensures that an activity which was /// delayed still gets processed in correct order. @@ -74,9 +96,13 @@ pub fn handle_received_activities( spawn(async move { loop { let now = Local::now(); - if let Some(latest_timestamp) = peek_queue_timestamp() { + let instance_id = { + let lock = ACTIVITY_QUEUE.read().unwrap(); + lock.keys().choose(&mut rand::thread_rng()).unwrap().clone() + }; + if let Some(latest_timestamp) = peek_queue_timestamp(&instance_id) { if latest_timestamp < now - RECEIVE_DELAY.unwrap() { - if let Some(a) = pop_queue() { + if let Some(a) = pop_queue(&instance_id) { let parts = (&a.request_parts.0, &a.request_parts.1, &a.request_parts.2); receive_activity_parts::( parts, a.bytes, &context, @@ -99,12 +125,23 @@ pub fn handle_received_activities( Ok(workers) } -fn peek_queue_timestamp() -> Option> { - ACTIVITY_QUEUE.read().unwrap().peek().map(|i| i.published) +fn peek_queue_timestamp(instance_id: &InstanceId) -> Option> { + ACTIVITY_QUEUE + .read() + .unwrap() + .get(instance_id) + .unwrap() + .peek() + .map(|i| i.published) } -fn pop_queue<'a>() -> Option { - ACTIVITY_QUEUE.write().unwrap().pop() +fn pop_queue<'a>(instance_id: &InstanceId) -> Option { + let mut lock = ACTIVITY_QUEUE.write().unwrap(); + let res = lock.get_mut(instance_id).unwrap().pop(); + if lock.is_empty() { + lock.remove(instance_id); + } + res } #[derive(Clone, Debug)]