diff --git a/crates/apub/src/activities/send/community.rs b/crates/apub/src/activities/send/community.rs index f5d321428..4f7e68dad 100644 --- a/crates/apub/src/activities/send/community.rs +++ b/crates/apub/src/activities/send/community.rs @@ -3,7 +3,11 @@ use crate::{ activity_queue::{send_activity_single_dest, send_to_community, send_to_community_followers}, check_is_apub_id_valid, extensions::context::lemmy_context, - fetcher::{get_or_fetch_and_upsert_actor, person::get_or_fetch_and_upsert_person}, + fetcher::{ + community::ReceiveAnnounceFunction, + get_or_fetch_and_upsert_actor, + person::get_or_fetch_and_upsert_person, + }, generate_moderators_url, insert_activity, objects::ToApub, @@ -244,6 +248,7 @@ impl CommunityType for Community { activity: AnyBase, object_actor: Option, context: &LemmyContext, + receive_announce: ReceiveAnnounceFunction<'_>, ) -> Result<(), LemmyError> { let inner_id = activity.id().context(location_info!())?; if inner_id.domain() == Some(&Settings::get().get_hostname_without_port()?) { @@ -255,7 +260,7 @@ impl CommunityType for Community { if let Some(actor_id) = object_actor { // Ignore errors, maybe its not actually an actor // TODO: should pass the actual request counter in, but that seems complicated - let actor = get_or_fetch_and_upsert_actor(&actor_id, context, &mut 0) + let actor = get_or_fetch_and_upsert_actor(&actor_id, context, &mut 0, receive_announce) .await .ok(); if let Some(actor) = actor { diff --git a/crates/apub/src/activity_queue.rs b/crates/apub/src/activity_queue.rs index 924b3dd61..e1a098d34 100644 --- a/crates/apub/src/activity_queue.rs +++ b/crates/apub/src/activity_queue.rs @@ -1,6 +1,7 @@ use crate::{ check_is_apub_id_valid, extensions::signatures::sign_and_send, + fetcher::community::ReceiveAnnounceFunction, insert_activity, ActorType, CommunityType, @@ -111,6 +112,7 @@ pub(crate) async fn send_to_community( community: &Community, object_actor: Option, context: &LemmyContext, + receive_announce: ReceiveAnnounceFunction<'_>, ) -> Result<(), LemmyError> where T: AsObject + Extends + Debug + BaseExt, @@ -120,7 +122,12 @@ where // if this is a local community, we need to do an announce from the community instead if community.local { community - .send_announce(activity.into_any_base()?, object_actor, context) + .send_announce( + activity.into_any_base()?, + object_actor, + context, + receive_announce, + ) .await?; } else { let inbox = community.get_shared_inbox_or_inbox_url(); diff --git a/crates/apub/src/fetcher/community.rs b/crates/apub/src/fetcher/community.rs index 6187ac10c..cf38976ed 100644 --- a/crates/apub/src/fetcher/community.rs +++ b/crates/apub/src/fetcher/community.rs @@ -6,10 +6,12 @@ use crate::{ should_refetch_actor, }, objects::FromApub, + ActorType, GroupExt, }; use activitystreams::{ actor::ApActorExt, + base::AnyBase, collection::{CollectionExt, OrderedCollection}, }; use anyhow::Context; @@ -24,8 +26,17 @@ use lemmy_db_views_actor::community_moderator_view::CommunityModeratorView; use lemmy_utils::{location_info, LemmyError}; use lemmy_websocket::LemmyContext; use log::debug; +use std::{future::Future, pin::Pin}; use url::Url; +pub(crate) type ReceiveAnnounceFunction<'a> = + fn( + &LemmyContext, + AnyBase, + &dyn ActorType, + &mut i32, + ) -> Pin> + 'a>>; + /// Get a community from its apub ID. /// /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database. @@ -34,6 +45,7 @@ pub async fn get_or_fetch_and_upsert_community( apub_id: &Url, context: &LemmyContext, recursion_counter: &mut i32, + receive_announce: ReceiveAnnounceFunction<'_>, ) -> Result { let apub_id_owned = apub_id.to_owned(); let community = blocking(context.pool(), move |conn| { @@ -44,12 +56,19 @@ pub async fn get_or_fetch_and_upsert_community( match community { Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => { debug!("Fetching and updating from remote community: {}", apub_id); - fetch_remote_community(apub_id, context, Some(c), recursion_counter).await + fetch_remote_community( + apub_id, + context, + Some(c), + recursion_counter, + receive_announce, + ) + .await } Ok(c) => Ok(c), Err(NotFound {}) => { debug!("Fetching and creating remote community: {}", apub_id); - fetch_remote_community(apub_id, context, None, recursion_counter).await + fetch_remote_community(apub_id, context, None, recursion_counter, receive_announce).await } Err(e) => Err(e.into()), } @@ -63,6 +82,7 @@ async fn fetch_remote_community( context: &LemmyContext, old_community: Option, request_counter: &mut i32, + receive_announce: ReceiveAnnounceFunction<'_>, ) -> Result { let group = fetch_remote_object::(context.client(), apub_id, request_counter).await; @@ -87,7 +107,14 @@ async fn fetch_remote_community( // only fetch outbox for new communities, otherwise this can create an infinite loop if old_community.is_none() { let outbox = group.inner.outbox()?.context(location_info!())?; - fetch_community_outbox(context, outbox, &community, request_counter).await? + fetch_community_outbox( + context, + outbox, + &community, + request_counter, + receive_announce, + ) + .await? } Ok(community) @@ -147,6 +174,7 @@ async fn fetch_community_outbox( outbox: &Url, community: &Community, recursion_counter: &mut i32, + receive_announce: ReceiveAnnounceFunction<'_>, ) -> Result<(), LemmyError> { let outbox = fetch_remote_object::(context.client(), outbox, recursion_counter).await?; @@ -157,8 +185,7 @@ async fn fetch_community_outbox( } for activity in outbox_activities { - todo!("{:?} {:?} {:?}", activity, community, recursion_counter); - //receive_announce(context, activity, community, recursion_counter).await?; + receive_announce(context, activity, community, recursion_counter).await?; } Ok(()) diff --git a/crates/apub/src/fetcher/mod.rs b/crates/apub/src/fetcher/mod.rs index 477cecc87..8d423794b 100644 --- a/crates/apub/src/fetcher/mod.rs +++ b/crates/apub/src/fetcher/mod.rs @@ -6,7 +6,7 @@ pub mod search; use crate::{ fetcher::{ - community::get_or_fetch_and_upsert_community, + community::{get_or_fetch_and_upsert_community, ReceiveAnnounceFunction}, fetch::FetchError, person::get_or_fetch_and_upsert_person, }, @@ -46,8 +46,10 @@ pub async fn get_or_fetch_and_upsert_actor( apub_id: &Url, context: &LemmyContext, recursion_counter: &mut i32, + receive_announce: ReceiveAnnounceFunction<'_>, ) -> Result, LemmyError> { - let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await; + let community = + get_or_fetch_and_upsert_community(apub_id, context, recursion_counter, receive_announce).await; let actor: Box = match community { Ok(c) => Box::new(c), Err(_) => Box::new(get_or_fetch_and_upsert_person(apub_id, context, recursion_counter).await?), diff --git a/crates/apub/src/lib.rs b/crates/apub/src/lib.rs index 89109318a..78dcb2512 100644 --- a/crates/apub/src/lib.rs +++ b/crates/apub/src/lib.rs @@ -14,7 +14,7 @@ use crate::{ person_extension::PersonExtension, signatures::{PublicKey, PublicKeyExtension}, }, - fetcher::community::get_or_fetch_and_upsert_community, + fetcher::community::{get_or_fetch_and_upsert_community, ReceiveAnnounceFunction}, }; use activitystreams::{ activity::Follow, @@ -217,6 +217,7 @@ pub trait CommunityType { activity: AnyBase, object: Option, context: &LemmyContext, + receive_announce: ReceiveAnnounceFunction<'_>, ) -> Result<(), LemmyError>; async fn send_add_mod( @@ -469,12 +470,14 @@ pub async fn get_community_from_to_or_cc( activity: &T, context: &LemmyContext, request_counter: &mut i32, + receive_announce: ReceiveAnnounceFunction<'_>, ) -> Result where T: AsObject, { for cid in get_activity_to_and_cc(activity) { - let community = get_or_fetch_and_upsert_community(&cid, context, request_counter).await; + let community = + get_or_fetch_and_upsert_community(&cid, context, request_counter, receive_announce).await; if community.is_ok() { return community; } diff --git a/crates/apub_receive/src/inbox/person_inbox.rs b/crates/apub_receive/src/inbox/person_inbox.rs index 66b6c95ac..e810b73fe 100644 --- a/crates/apub_receive/src/inbox/person_inbox.rs +++ b/crates/apub_receive/src/inbox/person_inbox.rs @@ -254,7 +254,8 @@ async fn receive_accept( .context(location_info!())?; let community = - get_or_fetch_and_upsert_community(&community_uri, context, request_counter).await?; + get_or_fetch_and_upsert_community(&community_uri, context, request_counter, receive_announce) + .await?; let community_id = community.id; let person_id = person.id;