diff --git a/Cargo.lock b/Cargo.lock index 407cb9aae..b55aeb6b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2633,6 +2633,7 @@ dependencies = [ name = "lemmy_api_common" version = "0.18.1" dependencies = [ + "activitypub_federation", "actix-web", "anyhow", "chrono", @@ -2644,6 +2645,7 @@ dependencies = [ "lemmy_db_views_actor", "lemmy_db_views_moderator", "lemmy_utils", + "once_cell", "percent-encoding", "regex", "reqwest", diff --git a/crates/api_common/Cargo.toml b/crates/api_common/Cargo.toml index bef5ab285..8a23a4cb2 100644 --- a/crates/api_common/Cargo.toml +++ b/crates/api_common/Cargo.toml @@ -22,6 +22,7 @@ full = [ "lemmy_db_views/full", "lemmy_db_views_actor/full", "lemmy_db_views_moderator/full", + "activitypub_federation", "percent-encoding", "encoding", "reqwest-middleware", @@ -32,6 +33,7 @@ full = [ "reqwest", "actix-web", "futures", + "once_cell", ] [dependencies] @@ -40,6 +42,7 @@ lemmy_db_views_moderator = { workspace = true } lemmy_db_views_actor = { workspace = true } lemmy_db_schema = { workspace = true } lemmy_utils = { workspace = true, optional = true } +activitypub_federation = { workspace = true, optional = true } serde = { workspace = true } serde_with = { workspace = true } url = { workspace = true } @@ -59,5 +62,7 @@ uuid = { workspace = true, optional = true } tokio = { workspace = true, optional = true } reqwest = { workspace = true, optional = true } ts-rs = { workspace = true, optional = true } +once_cell = { workspace = true, optional = true } actix-web = { workspace = true, optional = true } +# necessary for wasmt compilation getrandom = { version = "0.2.10", features = ["js"] } diff --git a/crates/api_common/src/build_response.rs b/crates/api_common/src/build_response.rs index acb7355bd..a61da7f0b 100644 --- a/crates/api_common/src/build_response.rs +++ b/crates/api_common/src/build_response.rs @@ -64,7 +64,7 @@ pub async fn build_community_response( } pub async fn build_post_response( - context: &Data, + context: &LemmyContext, community_id: CommunityId, person_id: PersonId, post_id: PostId, diff --git a/crates/api_common/src/lib.rs b/crates/api_common/src/lib.rs index 224e114a5..652cbaf43 100644 --- a/crates/api_common/src/lib.rs +++ b/crates/api_common/src/lib.rs @@ -10,6 +10,8 @@ pub mod post; pub mod private_message; #[cfg(feature = "full")] pub mod request; +#[cfg(feature = "full")] +pub mod send_activity; pub mod sensitive; pub mod site; #[cfg(feature = "full")] diff --git a/crates/api_common/src/send_activity.rs b/crates/api_common/src/send_activity.rs new file mode 100644 index 000000000..a2bc9a6de --- /dev/null +++ b/crates/api_common/src/send_activity.rs @@ -0,0 +1,58 @@ +use crate::context::LemmyContext; +use activitypub_federation::config::Data; +use futures::future::BoxFuture; +use lemmy_db_schema::source::post::Post; +use lemmy_utils::{error::LemmyResult, SYNCHRONOUS_FEDERATION}; +use once_cell::sync::{Lazy, OnceCell}; +use tokio::sync::{ + mpsc, + mpsc::{UnboundedReceiver, UnboundedSender}, + Mutex, +}; + +type MatchOutgoingActivitiesBoxed = + Box fn(SendActivityData, &'a Data) -> BoxFuture<'a, LemmyResult<()>>>; + +/// This static is necessary so that activities can be sent out synchronously for tests. +pub static MATCH_OUTGOING_ACTIVITIES: OnceCell = OnceCell::new(); + +#[derive(Debug)] +pub enum SendActivityData { + CreatePost(Post), +} + +static ACTIVITY_CHANNEL: Lazy = Lazy::new(|| { + let (sender, receiver) = mpsc::unbounded_channel(); + ActivityChannel { + sender, + receiver: Mutex::new(receiver), + } +}); + +pub struct ActivityChannel { + sender: UnboundedSender, + receiver: Mutex>, +} + +impl ActivityChannel { + pub async fn retrieve_activity() -> Option { + let mut lock = ACTIVITY_CHANNEL.receiver.lock().await; + lock.recv().await + } + + pub async fn submit_activity( + data: SendActivityData, + context: &Data, + ) -> LemmyResult<()> { + if *SYNCHRONOUS_FEDERATION { + MATCH_OUTGOING_ACTIVITIES + .get() + .expect("retrieve function pointer")(data, context) + .await?; + } else { + let lock = &ACTIVITY_CHANNEL.sender; + lock.send(data)?; + } + Ok(()) + } +} diff --git a/crates/api_crud/src/lib.rs b/crates/api_crud/src/lib.rs index b9449ca69..e79342865 100644 --- a/crates/api_crud/src/lib.rs +++ b/crates/api_crud/src/lib.rs @@ -5,7 +5,7 @@ use lemmy_utils::error::LemmyError; mod comment; mod community; mod custom_emoji; -mod post; +pub mod post; mod private_message; mod site; mod user; diff --git a/crates/api_crud/src/post/create.rs b/crates/api_crud/src/post/create.rs index a7aafe812..16a6f0004 100644 --- a/crates/api_crud/src/post/create.rs +++ b/crates/api_crud/src/post/create.rs @@ -1,10 +1,11 @@ -use crate::PerformCrud; -use actix_web::web::Data; +use activitypub_federation::config::Data; +use actix_web::web::Json; use lemmy_api_common::{ build_response::build_post_response, context::LemmyContext, post::{CreatePost, PostResponse}, request::fetch_site_data, + send_activity::{ActivityChannel, SendActivityData}, utils::{ check_community_ban, check_community_deleted_or_removed, @@ -40,147 +41,145 @@ use tracing::Instrument; use url::Url; use webmention::{Webmention, WebmentionError}; -#[async_trait::async_trait(?Send)] -impl PerformCrud for CreatePost { - type Response = PostResponse; - - #[tracing::instrument(skip(context))] - async fn perform(&self, context: &Data) -> Result { - let data: &CreatePost = self; - let local_user_view = local_user_view_from_jwt(&data.auth, context).await?; - let local_site = LocalSite::read(&mut context.pool()).await?; - - let slur_regex = local_site_to_slur_regex(&local_site); - check_slurs(&data.name, &slur_regex)?; - check_slurs_opt(&data.body, &slur_regex)?; - honeypot_check(&data.honeypot)?; - - let data_url = data.url.as_ref(); - let url = data_url.map(clean_url_params).map(Into::into); // TODO no good way to handle a "clear" - - is_valid_post_title(&data.name)?; - is_valid_body_field(&data.body, true)?; - check_url_scheme(&data.url)?; - - check_community_ban( - local_user_view.person.id, - data.community_id, +#[tracing::instrument(skip(context))] +pub async fn create_post( + data: Json, + context: Data, +) -> Result, LemmyError> { + let local_user_view = local_user_view_from_jwt(&data.auth, &context).await?; + let local_site = LocalSite::read(&mut context.pool()).await?; + + let slur_regex = local_site_to_slur_regex(&local_site); + check_slurs(&data.name, &slur_regex)?; + check_slurs_opt(&data.body, &slur_regex)?; + honeypot_check(&data.honeypot)?; + + let data_url = data.url.as_ref(); + let url = data_url.map(clean_url_params).map(Into::into); // TODO no good way to handle a "clear" + + is_valid_post_title(&data.name)?; + is_valid_body_field(&data.body, true)?; + check_url_scheme(&data.url)?; + + check_community_ban( + local_user_view.person.id, + data.community_id, + &mut context.pool(), + ) + .await?; + check_community_deleted_or_removed(data.community_id, &mut context.pool()).await?; + + let community_id = data.community_id; + let community = Community::read(&mut context.pool(), community_id).await?; + if community.posting_restricted_to_mods { + let community_id = data.community_id; + let is_mod = CommunityView::is_mod_or_admin( &mut context.pool(), + local_user_view.local_user.person_id, + community_id, ) .await?; - check_community_deleted_or_removed(data.community_id, &mut context.pool()).await?; + if !is_mod { + return Err(LemmyErrorType::OnlyModsCanPostInCommunity)?; + } + } - let community_id = data.community_id; - let community = Community::read(&mut context.pool(), community_id).await?; - if community.posting_restricted_to_mods { - let community_id = data.community_id; - let is_mod = CommunityView::is_mod_or_admin( + // Fetch post links and pictrs cached image + let (metadata_res, thumbnail_url) = + fetch_site_data(context.client(), context.settings(), data_url, true).await; + let (embed_title, embed_description, embed_video_url) = metadata_res + .map(|u| (u.title, u.description, u.embed_video_url)) + .unwrap_or_default(); + + let language_id = match data.language_id { + Some(lid) => Some(lid), + None => { + default_post_language( &mut context.pool(), - local_user_view.local_user.person_id, community_id, + local_user_view.local_user.id, ) - .await?; - if !is_mod { - return Err(LemmyErrorType::OnlyModsCanPostInCommunity)?; - } + .await? } - - // Fetch post links and pictrs cached image - let (metadata_res, thumbnail_url) = - fetch_site_data(context.client(), context.settings(), data_url, true).await; - let (embed_title, embed_description, embed_video_url) = metadata_res - .map(|u| (u.title, u.description, u.embed_video_url)) - .unwrap_or_default(); - - let language_id = match data.language_id { - Some(lid) => Some(lid), - None => { - default_post_language( - &mut context.pool(), - community_id, - local_user_view.local_user.id, - ) - .await? - } - }; - CommunityLanguage::is_allowed_community_language( - &mut context.pool(), - language_id, - community_id, - ) + }; + CommunityLanguage::is_allowed_community_language(&mut context.pool(), language_id, community_id) .await?; - let post_form = PostInsertForm::builder() - .name(data.name.trim().to_owned()) - .url(url) - .body(data.body.clone()) - .community_id(data.community_id) - .creator_id(local_user_view.person.id) - .nsfw(data.nsfw) - .embed_title(embed_title) - .embed_description(embed_description) - .embed_video_url(embed_video_url) - .language_id(language_id) - .thumbnail_url(thumbnail_url) - .build(); - - let inserted_post = Post::create(&mut context.pool(), &post_form) - .await - .with_lemmy_type(LemmyErrorType::CouldntCreatePost)?; - - let inserted_post_id = inserted_post.id; - let protocol_and_hostname = context.settings().get_protocol_and_hostname(); - let apub_id = generate_local_apub_endpoint( - EndpointType::Post, - &inserted_post_id.to_string(), - &protocol_and_hostname, - )?; - let updated_post = Post::update( - &mut context.pool(), - inserted_post_id, - &PostUpdateForm::builder().ap_id(Some(apub_id)).build(), - ) + let post_form = PostInsertForm::builder() + .name(data.name.trim().to_owned()) + .url(url) + .body(data.body.clone()) + .community_id(data.community_id) + .creator_id(local_user_view.person.id) + .nsfw(data.nsfw) + .embed_title(embed_title) + .embed_description(embed_description) + .embed_video_url(embed_video_url) + .language_id(language_id) + .thumbnail_url(thumbnail_url) + .build(); + + let inserted_post = Post::create(&mut context.pool(), &post_form) .await .with_lemmy_type(LemmyErrorType::CouldntCreatePost)?; - // They like their own post by default - let person_id = local_user_view.person.id; - let post_id = inserted_post.id; - let like_form = PostLikeForm { - post_id, - person_id, - score: 1, - }; + let inserted_post_id = inserted_post.id; + let protocol_and_hostname = context.settings().get_protocol_and_hostname(); + let apub_id = generate_local_apub_endpoint( + EndpointType::Post, + &inserted_post_id.to_string(), + &protocol_and_hostname, + )?; + let updated_post = Post::update( + &mut context.pool(), + inserted_post_id, + &PostUpdateForm::builder().ap_id(Some(apub_id)).build(), + ) + .await + .with_lemmy_type(LemmyErrorType::CouldntCreatePost)?; + + // They like their own post by default + let person_id = local_user_view.person.id; + let post_id = inserted_post.id; + let like_form = PostLikeForm { + post_id, + person_id, + score: 1, + }; + + PostLike::like(&mut context.pool(), &like_form) + .await + .with_lemmy_type(LemmyErrorType::CouldntLikePost)?; + + ActivityChannel::submit_activity(SendActivityData::CreatePost(updated_post.clone()), &context) + .await?; - PostLike::like(&mut context.pool(), &like_form) - .await - .with_lemmy_type(LemmyErrorType::CouldntLikePost)?; - - // Mark the post as read - mark_post_as_read(person_id, post_id, &mut context.pool()).await?; - - if let Some(url) = updated_post.url.clone() { - let task = async move { - let mut webmention = - Webmention::new::(updated_post.ap_id.clone().into(), url.clone().into())?; - webmention.set_checked(true); - match webmention - .send() - .instrument(tracing::info_span!("Sending webmention")) - .await - { - Err(WebmentionError::NoEndpointDiscovered(_)) => Ok(()), - Ok(_) => Ok(()), - Err(e) => Err(e).with_lemmy_type(LemmyErrorType::CouldntSendWebmention), - } - }; - if *SYNCHRONOUS_FEDERATION { - task.await?; - } else { - spawn_try_task(task); + // Mark the post as read + mark_post_as_read(person_id, post_id, &mut context.pool()).await?; + + if let Some(url) = updated_post.url.clone() { + let task = async move { + let mut webmention = + Webmention::new::(updated_post.ap_id.clone().into(), url.clone().into())?; + webmention.set_checked(true); + match webmention + .send() + .instrument(tracing::info_span!("Sending webmention")) + .await + { + Err(WebmentionError::NoEndpointDiscovered(_)) => Ok(()), + Ok(_) => Ok(()), + Err(e) => Err(e).with_lemmy_type(LemmyErrorType::CouldntSendWebmention), } }; + if *SYNCHRONOUS_FEDERATION { + task.await?; + } else { + spawn_try_task(task); + } + }; - build_post_response(context, community_id, person_id, post_id).await - } + Ok(Json( + build_post_response(&context, community_id, person_id, post_id).await?, + )) } diff --git a/crates/api_crud/src/post/mod.rs b/crates/api_crud/src/post/mod.rs index d3d789a02..437955561 100644 --- a/crates/api_crud/src/post/mod.rs +++ b/crates/api_crud/src/post/mod.rs @@ -1,4 +1,4 @@ -mod create; +pub mod create; mod delete; mod read; mod remove; diff --git a/crates/apub/src/activities/create_or_update/post.rs b/crates/apub/src/activities/create_or_update/post.rs index 77199056d..4767114f9 100644 --- a/crates/apub/src/activities/create_or_update/post.rs +++ b/crates/apub/src/activities/create_or_update/post.rs @@ -24,7 +24,7 @@ use activitypub_federation::{ }; use lemmy_api_common::{ context::LemmyContext, - post::{CreatePost, EditPost, PostResponse}, + post::{EditPost, PostResponse}, }; use lemmy_db_schema::{ aggregates::structs::PostAggregates, @@ -39,25 +39,6 @@ use lemmy_db_schema::{ use lemmy_utils::error::{LemmyError, LemmyErrorType}; use url::Url; -#[async_trait::async_trait] -impl SendActivity for CreatePost { - type Response = PostResponse; - - async fn send_activity( - _request: &Self, - response: &Self::Response, - context: &Data, - ) -> Result<(), LemmyError> { - CreateOrUpdatePage::send( - &response.post_view.post, - response.post_view.creator.id, - CreateOrUpdateType::Create, - context, - ) - .await - } -} - #[async_trait::async_trait] impl SendActivity for EditPost { type Response = PostResponse; @@ -68,10 +49,10 @@ impl SendActivity for EditPost { context: &Data, ) -> Result<(), LemmyError> { CreateOrUpdatePage::send( - &response.post_view.post, + response.post_view.post.clone(), response.post_view.creator.id, CreateOrUpdateType::Update, - context, + context.reset_request_count(), ) .await } @@ -102,12 +83,12 @@ impl CreateOrUpdatePage { #[tracing::instrument(skip_all)] pub(crate) async fn send( - post: &Post, + post: Post, person_id: PersonId, kind: CreateOrUpdateType, - context: &Data, + context: Data, ) -> Result<(), LemmyError> { - let post = ApubPost(post.clone()); + let post = ApubPost(post); let community_id = post.community_id; let person: ApubPerson = Person::read(&mut context.pool(), person_id).await?.into(); let community: ApubCommunity = Community::read(&mut context.pool(), community_id) @@ -115,8 +96,8 @@ impl CreateOrUpdatePage { .into(); let create_or_update = - CreateOrUpdatePage::new(post, &person, &community, kind, context).await?; - let is_mod_action = create_or_update.object.is_mod_action(context).await?; + CreateOrUpdatePage::new(post, &person, &community, kind, &context).await?; + let is_mod_action = create_or_update.object.is_mod_action(&context).await?; let activity = AnnouncableActivities::CreateOrUpdatePost(create_or_update); send_activity_in_community( activity, @@ -124,7 +105,7 @@ impl CreateOrUpdatePage { &community, vec![], is_mod_action, - context, + &context, ) .await?; Ok(()) diff --git a/crates/apub/src/activities/mod.rs b/crates/apub/src/activities/mod.rs index 4fd8da536..02ad0b6b1 100644 --- a/crates/apub/src/activities/mod.rs +++ b/crates/apub/src/activities/mod.rs @@ -1,5 +1,6 @@ use crate::{ objects::{community::ApubCommunity, person::ApubPerson}, + protocol::activities::{create_or_update::page::CreateOrUpdatePage, CreateOrUpdateType}, CONTEXT, }; use activitypub_federation::{ @@ -11,7 +12,10 @@ use activitypub_federation::{ traits::{ActivityHandler, Actor}, }; use anyhow::anyhow; -use lemmy_api_common::context::LemmyContext; +use lemmy_api_common::{ + context::LemmyContext, + send_activity::{ActivityChannel, SendActivityData}, +}; use lemmy_db_schema::{ newtypes::CommunityId, source::{ @@ -21,7 +25,11 @@ use lemmy_db_schema::{ }, }; use lemmy_db_views_actor::structs::{CommunityPersonBanView, CommunityView}; -use lemmy_utils::error::{LemmyError, LemmyErrorExt, LemmyErrorType}; +use lemmy_utils::{ + error::{LemmyError, LemmyErrorExt, LemmyErrorType, LemmyResult}, + spawn_try_task, + SYNCHRONOUS_FEDERATION, +}; use moka::future::Cache; use once_cell::sync::Lazy; use serde::Serialize; @@ -197,3 +205,33 @@ where Ok(()) } + +pub async fn handle_outgoing_activities(context: Data) -> LemmyResult<()> { + while let Some(data) = ActivityChannel::retrieve_activity().await { + match_outgoing_activities(data, &context.reset_request_count()).await? + } + Ok(()) +} + +pub async fn match_outgoing_activities( + data: SendActivityData, + context: &Data, +) -> LemmyResult<()> { + let fed_task = match data { + SendActivityData::CreatePost(post) => { + let creator_id = post.creator_id; + CreateOrUpdatePage::send( + post, + creator_id, + CreateOrUpdateType::Create, + context.reset_request_count(), + ) + } + }; + if *SYNCHRONOUS_FEDERATION { + fed_task.await?; + } else { + spawn_try_task(fed_task); + } + Ok(()) +} diff --git a/src/api_routes_http.rs b/src/api_routes_http.rs index cb735f807..bc4340e3c 100644 --- a/src/api_routes_http.rs +++ b/src/api_routes_http.rs @@ -52,7 +52,6 @@ use lemmy_api_common::{ VerifyEmail, }, post::{ - CreatePost, CreatePostLike, CreatePostReport, DeletePost, @@ -93,7 +92,7 @@ use lemmy_api_common::{ PurgePost, }, }; -use lemmy_api_crud::PerformCrud; +use lemmy_api_crud::{post::create::create_post, PerformCrud}; use lemmy_apub::{ api::{ list_comments::list_comments, @@ -175,7 +174,7 @@ pub fn config(cfg: &mut web::ServiceConfig, rate_limit: &RateLimitCell) { web::resource("/post") .guard(guard::Post()) .wrap(rate_limit.post()) - .route(web::post().to(route_post_crud::)), + .route(web::post().to(create_post)), ) .service( web::scope("/post") diff --git a/src/lib.rs b/src/lib.rs index 55bb91606..e07ae2685 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,12 +21,17 @@ use lemmy_api_common::{ context::LemmyContext, lemmy_db_views::structs::SiteView, request::build_user_agent, + send_activity::MATCH_OUTGOING_ACTIVITIES, utils::{ check_private_instance_and_federation_enabled, local_site_rate_limit_to_rate_limit_config, }, }; -use lemmy_apub::{VerifyUrlData, FEDERATION_HTTP_FETCH_LIMIT}; +use lemmy_apub::{ + activities::{handle_outgoing_activities, match_outgoing_activities}, + VerifyUrlData, + FEDERATION_HTTP_FETCH_LIMIT, +}; use lemmy_db_schema::{ source::secret::Secret, utils::{build_db_pool, get_database_url, run_migrations}, @@ -165,9 +170,17 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> { .build() .expect("Should always be buildable"); + MATCH_OUTGOING_ACTIVITIES + .set(Box::new(move |d, c| { + Box::pin(match_outgoing_activities(d, c)) + })) + .expect("set function pointer"); + let request_data = federation_config.to_request_data(); + let outgoing_activities_task = tokio::task::spawn(handle_outgoing_activities(request_data)); + // Create Http server with websocket support HttpServer::new(move || { - let cors_origin = std::env::var("LEMMY_CORS_ORIGIN"); + let cors_origin = env::var("LEMMY_CORS_ORIGIN"); let cors_config = match (cors_origin, cfg!(debug_assertions)) { (Ok(origin), false) => Cors::default() .allowed_origin(&origin) @@ -213,6 +226,9 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> { .run() .await?; + // Wait for outgoing apub sends to complete + outgoing_activities_task.await??; + Ok(()) }