From 44e9622b7178320012f084011f2c7cead0018c65 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Wed, 20 Mar 2024 12:53:24 +0100 Subject: [PATCH] Add queue for incoming activities, to sort them by `published` --- Cargo.lock | 3 +- Cargo.toml | 2 +- crates/apub/src/activities/mod.rs | 9 +- crates/apub/src/http/inbox.rs | 171 +++++++++++++++++++++++ crates/apub/src/http/mod.rs | 26 +--- crates/apub/src/http/routes.rs | 2 +- crates/db_views/src/custom_emoji_view.rs | 2 +- src/lib.rs | 5 + 8 files changed, 189 insertions(+), 31 deletions(-) create mode 100644 crates/apub/src/http/inbox.rs diff --git a/Cargo.lock b/Cargo.lock index 0069d3510..129126f04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,8 +17,7 @@ checksum = "8f27d075294830fcab6f66e320dab524bc6d048f4a151698e153205559113772" [[package]] name = "activitypub_federation" version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a028034c642d3ed16b535f98f48b3df30397833c183d68852d79de16650d5ed5" +source = "git+https://github.com/LemmyNet/activitypub-federation-rust.git?branch=actix-inbox-parts#b393221b10ed6fdd879e78e6c842b9714baa0f95" dependencies = [ "activitystreams-kinds", "actix-web", diff --git a/Cargo.toml b/Cargo.toml index 66c3f9d47..eb8202cbc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -96,7 +96,7 @@ lemmy_routes = { version = "=0.19.3", path = "./crates/routes" } lemmy_db_views = { version = "=0.19.3", path = "./crates/db_views" } lemmy_db_views_actor = { version = "=0.19.3", path = "./crates/db_views_actor" } lemmy_db_views_moderator = { version = "=0.19.3", path = "./crates/db_views_moderator" } -activitypub_federation = { version = "0.5.2", default-features = false, features = [ +activitypub_federation = { git = "https://github.com/LemmyNet/activitypub-federation-rust.git", branch = "actix-inbox-parts", default-features = false, features = [ "actix-web", ] } diesel = "2.1.4" diff --git a/crates/apub/src/activities/mod.rs b/crates/apub/src/activities/mod.rs index 752568682..27064e289 100644 --- a/crates/apub/src/activities/mod.rs +++ b/crates/apub/src/activities/mod.rs @@ -45,6 +45,7 @@ use lemmy_db_schema::{ use lemmy_db_views_actor::structs::{CommunityPersonBanView, CommunityView}; use lemmy_utils::error::{LemmyError, LemmyErrorExt, LemmyErrorType, LemmyResult}; use serde::Serialize; +use serde_with::serde_derive::Deserialize; use tracing::info; use url::{ParseError, Url}; use uuid::Uuid; @@ -227,9 +228,9 @@ where /// Wrapper struct that adds `published` field with timestamp to outgoing activities. Important that /// the timestamp includes milliseconds and timezone. -#[derive(Serialize)] -struct WithPublished { - published: DateTime, +#[derive(Serialize, Deserialize)] +pub(crate) struct WithPublished { + pub(crate) published: Option>, #[serde(flatten)] inner: T, } @@ -237,7 +238,7 @@ struct WithPublished { impl WithPublished { pub fn new(inner: T) -> WithPublished { Self { - published: Local::now().into(), + published: Some(Local::now().into()), inner, } } diff --git a/crates/apub/src/http/inbox.rs b/crates/apub/src/http/inbox.rs new file mode 100644 index 000000000..b8ef837df --- /dev/null +++ b/crates/apub/src/http/inbox.rs @@ -0,0 +1,171 @@ +use crate::{ + activities::WithPublished, + activity_lists::SharedInboxActivities, + fetcher::user_or_community::UserOrCommunity, +}; +use activitypub_federation::{ + actix_web::inbox::{receive_activity, receive_activity_parts}, + config::Data, +}; +use actix_web::{http::header::HeaderMap, web::Bytes, HttpRequest, HttpResponse}; +use chrono::{DateTime, Local, TimeDelta, Utc}; +use http::{Method, Uri}; +use lemmy_api_common::context::LemmyContext; +use lemmy_utils::error::LemmyResult; +use once_cell::sync::Lazy; +use std::{ + cmp::Ordering, + collections::BinaryHeap, + sync::{Arc, Mutex}, + thread::available_parallelism, + time::Duration, +}; +use tokio::{spawn, task::JoinHandle, time::sleep}; +use tracing::info; + +/// Handle incoming activities. +pub async fn shared_inbox( + request: HttpRequest, + bytes: Bytes, + data: Data, +) -> LemmyResult { + match serde_json::from_slice::>(&bytes)?.published { + Some(published) => { + // includes published timestamp, insert to queue to ensure that activities are processed + // in correct order even when delivered out of order. + let request_parts = ( + request.headers().clone(), + request.method().clone(), + request.uri().clone(), + ); + ACTIVITY_QUEUE.lock().unwrap().push(InboxActivity { + request_parts, + bytes, + published, + }); + } + None => { + // no timestamp included, process immediately + receive_activity::( + request, bytes, &data, + ) + .await?; + } + }; + Ok(HttpResponse::Ok().finish()) +} + +/// Queue of incoming activities, ordered by oldest published first +static ACTIVITY_QUEUE: Lazy>>> = + Lazy::new(|| Arc::new(Mutex::new(BinaryHeap::new()))); + +/// Minimum age of an activity before it gets processed. This ensures that an activity which was +/// delayed still gets processed in correct order. +const RECEIVE_DELAY: Option = TimeDelta::try_seconds(1); + +pub fn handle_received_activities( + context: &Data, +) -> LemmyResult>> { + // launch one task per cpu core + let parallelism = available_parallelism()?.into(); + let workers = (0..parallelism) + .map(|_| { + let context = context.reset_request_count(); + spawn(async move { + loop { + let now = Local::now(); + if let Some(latest_timestamp) = peek_queue_timestamp() { + if latest_timestamp < now - RECEIVE_DELAY.unwrap() { + if let Some(a) = pop_queue() { + let parts = (&a.request_parts.0, &a.request_parts.1, &a.request_parts.2); + receive_activity_parts::( + parts, a.bytes, &context, + ) + .await + .inspect_err(|e| info!("Error receiving activity: {e}")) + .ok(); + } + } + } + // TODO: could sleep based on remaining time until head activity reaches 1s + // or simply use `WORK_FINISHED_RECHECK_DELAY` from lemmy_federate + sleep(Duration::from_millis(100)).await; + // TODO: need cancel? lemmy seems to shutdown just fine + } + }) + }) + .collect(); + + Ok(workers) +} + +fn peek_queue_timestamp() -> Option> { + ACTIVITY_QUEUE.lock().unwrap().peek().map(|i| i.published) +} + +fn pop_queue<'a>() -> Option { + ACTIVITY_QUEUE.lock().unwrap().pop() +} + +#[derive(Clone, Debug)] +struct InboxActivity { + // Need to store like this because HttpRequest is not Sync + request_parts: (HeaderMap, Method, Uri), + bytes: Bytes, + published: DateTime, +} + +impl PartialOrd for InboxActivity { + fn partial_cmp(&self, other: &Self) -> Option { + other.published.partial_cmp(&self.published) + } +} + +impl Ord for InboxActivity { + fn cmp(&self, other: &Self) -> Ordering { + other.published.cmp(&self.published) + } +} + +impl PartialEq for InboxActivity { + fn eq(&self, other: &Self) -> bool { + self.bytes.eq(&other.bytes) + } +} + +impl Eq for InboxActivity {} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn activity_queue_order() { + let activity1 = InboxActivity { + request_parts: Default::default(), + bytes: Default::default(), + published: Local::now().into(), + }; + let activity2 = InboxActivity { + request_parts: Default::default(), + bytes: Default::default(), + published: Local::now().into(), + }; + let activity3 = InboxActivity { + request_parts: Default::default(), + bytes: Default::default(), + published: Local::now().into(), + }; + let mut lock = ACTIVITY_QUEUE.lock().unwrap(); + + // insert in wrong order + lock.push(activity3.clone()); + lock.push(activity1.clone()); + lock.push(activity2.clone()); + + // should be popped in correct order + assert_eq!(activity1.published, lock.pop().unwrap().published); + assert_eq!(activity2.published, lock.pop().unwrap().published); + assert_eq!(activity3.published, lock.pop().unwrap().published); + } +} diff --git a/crates/apub/src/http/mod.rs b/crates/apub/src/http/mod.rs index b400e3dab..ece74fcd7 100644 --- a/crates/apub/src/http/mod.rs +++ b/crates/apub/src/http/mod.rs @@ -1,16 +1,6 @@ -use crate::{ - activity_lists::SharedInboxActivities, - fetcher::user_or_community::UserOrCommunity, - protocol::objects::tombstone::Tombstone, - FEDERATION_CONTEXT, -}; -use activitypub_federation::{ - actix_web::inbox::receive_activity, - config::Data, - protocol::context::WithContext, - FEDERATION_CONTENT_TYPE, -}; -use actix_web::{web, web::Bytes, HttpRequest, HttpResponse}; +use crate::{protocol::objects::tombstone::Tombstone, FEDERATION_CONTEXT}; +use activitypub_federation::{protocol::context::WithContext, FEDERATION_CONTENT_TYPE}; +use actix_web::{web, HttpResponse}; use http::{header::LOCATION, StatusCode}; use lemmy_api_common::context::LemmyContext; use lemmy_db_schema::{ @@ -25,20 +15,12 @@ use url::Url; mod comment; mod community; +pub mod inbox; mod person; mod post; pub mod routes; pub mod site; -pub async fn shared_inbox( - request: HttpRequest, - body: Bytes, - data: Data, -) -> LemmyResult { - receive_activity::(request, body, &data) - .await -} - /// Convert the data to json and turn it into an HTTP Response with the correct ActivityPub /// headers. /// diff --git a/crates/apub/src/http/routes.rs b/crates/apub/src/http/routes.rs index ab046afe1..6ef04831f 100644 --- a/crates/apub/src/http/routes.rs +++ b/crates/apub/src/http/routes.rs @@ -9,9 +9,9 @@ use crate::http::{ get_apub_community_outbox, }, get_activity, + inbox::shared_inbox, person::{get_apub_person_http, get_apub_person_outbox, person_inbox}, post::get_apub_post, - shared_inbox, site::{get_apub_site_http, get_apub_site_outbox}, }; use actix_web::{ diff --git a/crates/db_views/src/custom_emoji_view.rs b/crates/db_views/src/custom_emoji_view.rs index d83fa9912..4d2f1fd85 100644 --- a/crates/db_views/src/custom_emoji_view.rs +++ b/crates/db_views/src/custom_emoji_view.rs @@ -77,7 +77,7 @@ impl CustomEmojiView { } for emoji in &mut result { if let Some(keywords) = hash.get_mut(&emoji.custom_emoji.id) { - emoji.keywords = keywords.clone(); + emoji.keywords.clone_from(keywords); } } result diff --git a/src/lib.rs b/src/lib.rs index 1b2507f4e..bb688ceac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,6 +25,7 @@ use actix_web::{ }; use actix_web_prom::PrometheusMetricsBuilder; use clap::Parser; +use futures_util::future::join_all; use lemmy_api_common::{ context::LemmyContext, lemmy_db_views::structs::SiteView, @@ -37,6 +38,7 @@ use lemmy_api_common::{ }; use lemmy_apub::{ activities::{handle_outgoing_activities, match_outgoing_activities}, + http::inbox::handle_received_activities, objects::instance::ApubSite, VerifyUrlData, FEDERATION_HTTP_FETCH_LIMIT, @@ -192,6 +194,8 @@ pub async fn start_lemmy_server(args: CmdArgs) -> Result<(), LemmyError> { .expect("set function pointer"); let request_data = federation_config.to_request_data(); let outgoing_activities_task = tokio::task::spawn(handle_outgoing_activities(request_data)); + let request_data = federation_config.to_request_data(); + let handle_received_activities_task = handle_received_activities(&request_data); let server = if !args.disable_http_server { if let Some(startup_server_handle) = startup_server_handle { @@ -239,6 +243,7 @@ pub async fn start_lemmy_server(args: CmdArgs) -> Result<(), LemmyError> { // Wait for outgoing apub sends to complete ActivityChannel::close(outgoing_activities_task).await?; + join_all(handle_received_activities_task?).await; Ok(()) }