diff --git a/Cargo.lock b/Cargo.lock index 0e8f5fc6b..fa7041fe3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -24,12 +24,14 @@ dependencies = [ "derive_builder", "dyn-clone", "enum_delegate", + "futures", "futures-core", "http", "http-signature-normalization", "http-signature-normalization-reqwest", "httpdate", "itertools 0.10.5", + "moka", "once_cell", "openssl", "pin-project-lite", @@ -1334,15 +1336,15 @@ dependencies = [ [[package]] name = "dashmap" -version = "5.4.0" +version = "5.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" +checksum = "6943ae99c34386c84a470c499d3414f66502a41340aa895406e0d2e4a207b91d" dependencies = [ "cfg-if", - "hashbrown 0.12.3", + "hashbrown 0.14.0", "lock_api", "once_cell", - "parking_lot_core 0.9.4", + "parking_lot_core 0.9.8", ] [[package]] @@ -2765,6 +2767,7 @@ dependencies = [ name = "lemmy_db_views_actor" version = "0.18.1" dependencies = [ + "chrono", "diesel", "diesel-async", "lemmy_db_schema", @@ -2785,6 +2788,39 @@ dependencies = [ "ts-rs", ] +[[package]] +name = "lemmy_federate" +version = "0.18.1" +dependencies = [ + "activitypub_federation", + "anyhow", + "async-trait", + "bytes", + "chrono", + "dashmap", + "diesel", + "diesel-async", + "enum_delegate", + "futures", + "lemmy_api_common", + "lemmy_apub", + "lemmy_db_schema", + "lemmy_db_views_actor", + "lemmy_utils", + "moka", + "once_cell", + "openssl", + "reqwest", + "reqwest-middleware", + "reqwest-tracing", + "serde", + "serde_json", + "tokio", + "tokio-util", + "tracing", + "tracing-subscriber", +] + [[package]] name = "lemmy_routes" version = "0.18.1" @@ -3005,9 +3041,9 @@ checksum = "e34f76eb3611940e0e7d53a9aaa4e6a3151f69541a282fd0dad5571420c53ff1" [[package]] name = "lock_api" -version = "0.4.9" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df" +checksum = 
"c1cc9717a20b1bb222f333e6a92fd32f7d8a18ddc5a3191a11af45dcbf4dcd16" dependencies = [ "autocfg", "scopeguard", @@ -3615,7 +3651,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core 0.9.4", + "parking_lot_core 0.9.8", ] [[package]] @@ -3627,22 +3663,22 @@ dependencies = [ "cfg-if", "instant", "libc", - "redox_syscall", + "redox_syscall 0.2.16", "smallvec", "winapi", ] [[package]] name = "parking_lot_core" -version = "0.9.4" +version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dc9e0dc2adc1c69d09143aff38d3d30c5c3f0df0dad82e6d25547af174ebec0" +checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.3.5", "smallvec", - "windows-sys 0.42.0", + "windows-targets", ] [[package]] @@ -4293,6 +4329,15 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "redox_syscall" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "regex" version = "1.9.1" @@ -5164,7 +5209,7 @@ dependencies = [ "cfg-if", "fastrand", "libc", - "redox_syscall", + "redox_syscall 0.2.16", "remove_dir_all", "winapi", ] @@ -5393,9 +5438,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.4" +version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740" +checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" dependencies = [ "bytes", "futures-core", diff --git a/Cargo.toml b/Cargo.toml index f5268be24..062720420 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ members = [ "crates/db_views_actor", 
"crates/db_views_actor", "crates/routes", + "crates/federate", ] [workspace.dependencies] @@ -67,9 +68,7 @@ lemmy_routes = { version = "=0.18.1", path = "./crates/routes" } lemmy_db_views = { version = "=0.18.1", path = "./crates/db_views" } lemmy_db_views_actor = { version = "=0.18.1", path = "./crates/db_views_actor" } lemmy_db_views_moderator = { version = "=0.18.1", path = "./crates/db_views_moderator" } -activitypub_federation = { version = "0.4.6", default-features = false, features = [ - "actix-web", -] } +activitypub_federation = { version = "0.4.6", default-features = false, features = ["actix-web"], git= "https://github.com/phiresky/activitypub-federation-rust/", branch="raw-sending" } diesel = "2.1.0" diesel_migrations = "2.1.0" diesel-async = "0.3.1" @@ -88,7 +87,6 @@ tracing-error = "0.2.0" tracing-log = "0.1.3" tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } url = { version = "2.4.0", features = ["serde"] } -url_serde = "0.2.0" reqwest = { version = "0.11.18", features = ["json", "blocking"] } reqwest-middleware = "0.2.2" reqwest-tracing = "0.4.5" @@ -120,7 +118,6 @@ futures = "0.3.28" http = "0.2.9" percent-encoding = "2.3.0" rosetta-i18n = "0.1.3" -rand = "0.8.5" opentelemetry = { version = "0.19.0", features = ["rt-tokio"] } tracing-opentelemetry = { version = "0.19.0" } ts-rs = { version = "6.2", features = ["serde-compat", "chrono-impl"] } diff --git a/crates/apub/src/activities/block/block_user.rs b/crates/apub/src/activities/block/block_user.rs index 55642f862..207c39903 100644 --- a/crates/apub/src/activities/block/block_user.rs +++ b/crates/apub/src/activities/block/block_user.rs @@ -27,6 +27,7 @@ use lemmy_api_common::{ }; use lemmy_db_schema::{ source::{ + activity::ActivitySendTargets, community::{ CommunityFollower, CommunityFollowerForm, @@ -97,12 +98,13 @@ impl BlockUser { match target { SiteOrCommunity::Site(_) => { - let inboxes = remote_instance_inboxes(&mut context.pool()).await?; + let mut inboxes = 
ActivitySendTargets::empty(); + inboxes.set_all_instances(true); send_lemmy_activity(context, block, mod_, inboxes, false).await } SiteOrCommunity::Community(c) => { let activity = AnnouncableActivities::BlockUser(block); - let inboxes = vec![user.shared_inbox_or_inbox()]; + let inboxes = ActivitySendTargets::to_inbox(user.shared_inbox_or_inbox()); send_activity_in_community(activity, mod_, c, inboxes, true, context).await } } diff --git a/crates/apub/src/activities/block/undo_block_user.rs b/crates/apub/src/activities/block/undo_block_user.rs index f68349794..4ea67720e 100644 --- a/crates/apub/src/activities/block/undo_block_user.rs +++ b/crates/apub/src/activities/block/undo_block_user.rs @@ -20,6 +20,7 @@ use activitypub_federation::{ use lemmy_api_common::context::LemmyContext; use lemmy_db_schema::{ source::{ + activity::ActivitySendTargets, community::{CommunityPersonBan, CommunityPersonBanForm}, moderator::{ModBan, ModBanForm, ModBanFromCommunity, ModBanFromCommunityForm}, person::{Person, PersonUpdateForm}, @@ -59,10 +60,10 @@ impl UndoBlockUser { audience, }; - let mut inboxes = vec![user.shared_inbox_or_inbox()]; + let mut inboxes = ActivitySendTargets::to_inbox(user.shared_inbox_or_inbox()); match target { SiteOrCommunity::Site(_) => { - inboxes.append(&mut remote_instance_inboxes(&mut context.pool()).await?); + inboxes.set_all_instances(true); send_lemmy_activity(context, undo, mod_, inboxes, false).await } SiteOrCommunity::Community(c) => { diff --git a/crates/apub/src/activities/community/announce.rs b/crates/apub/src/activities/community/announce.rs index ed489158e..c3e149a24 100644 --- a/crates/apub/src/activities/community/announce.rs +++ b/crates/apub/src/activities/community/announce.rs @@ -21,6 +21,7 @@ use activitypub_federation::{ traits::{ActivityHandler, Actor}, }; use lemmy_api_common::context::LemmyContext; +use lemmy_db_schema::source::activity::ActivitySendTargets; use lemmy_utils::error::{LemmyError, LemmyErrorType}; use 
serde_json::Value; use url::Url; @@ -92,7 +93,7 @@ impl AnnounceActivity { context: &Data, ) -> Result<(), LemmyError> { let announce = AnnounceActivity::new(object.clone(), community, context)?; - let inboxes = community.get_follower_inboxes(context).await?; + let inboxes = ActivitySendTargets::to_local_community_followers(community.id); send_lemmy_activity(context, announce, community, inboxes.clone(), false).await?; // Pleroma and Mastodon can't handle activities like Announce/Create/Page. So for diff --git a/crates/apub/src/activities/community/collection_add.rs b/crates/apub/src/activities/community/collection_add.rs index c36a8f0da..98a646c79 100644 --- a/crates/apub/src/activities/community/collection_add.rs +++ b/crates/apub/src/activities/community/collection_add.rs @@ -30,6 +30,7 @@ use lemmy_api_common::{ use lemmy_db_schema::{ impls::community::CollectionType, source::{ + activity::ActivitySendTargets, community::{Community, CommunityModerator, CommunityModeratorForm}, moderator::{ModAddCommunity, ModAddCommunityForm}, person::Person, @@ -64,7 +65,7 @@ impl CollectionAdd { }; let activity = AnnouncableActivities::CollectionAdd(add); - let inboxes = vec![added_mod.shared_inbox_or_inbox()]; + let inboxes = ActivitySendTargets::to_inbox(added_mod.shared_inbox_or_inbox()); send_activity_in_community(activity, actor, community, inboxes, true, context).await } @@ -89,7 +90,15 @@ impl CollectionAdd { audience: Some(community.id().into()), }; let activity = AnnouncableActivities::CollectionAdd(add); - send_activity_in_community(activity, actor, community, vec![], true, context).await + send_activity_in_community( + activity, + actor, + community, + ActivitySendTargets::empty(), + true, + context, + ) + .await } } diff --git a/crates/apub/src/activities/community/collection_remove.rs b/crates/apub/src/activities/community/collection_remove.rs index 28214284b..933247dd1 100644 --- a/crates/apub/src/activities/community/collection_remove.rs +++ 
b/crates/apub/src/activities/community/collection_remove.rs @@ -24,6 +24,7 @@ use lemmy_api_common::{ use lemmy_db_schema::{ impls::community::CollectionType, source::{ + activity::ActivitySendTargets, community::{Community, CommunityModerator, CommunityModeratorForm}, moderator::{ModAddCommunity, ModAddCommunityForm}, post::{Post, PostUpdateForm}, @@ -57,7 +58,7 @@ impl CollectionRemove { }; let activity = AnnouncableActivities::CollectionRemove(remove); - let inboxes = vec![removed_mod.shared_inbox_or_inbox()]; + let inboxes = ActivitySendTargets::to_inbox(removed_mod.shared_inbox_or_inbox()); send_activity_in_community(activity, actor, community, inboxes, true, context).await } @@ -82,7 +83,15 @@ impl CollectionRemove { audience: Some(community.id().into()), }; let activity = AnnouncableActivities::CollectionRemove(remove); - send_activity_in_community(activity, actor, community, vec![], true, context).await + send_activity_in_community( + activity, + actor, + community, + ActivitySendTargets::empty(), + true, + context, + ) + .await } } diff --git a/crates/apub/src/activities/community/lock_page.rs b/crates/apub/src/activities/community/lock_page.rs index 94135ede9..67a69cee8 100644 --- a/crates/apub/src/activities/community/lock_page.rs +++ b/crates/apub/src/activities/community/lock_page.rs @@ -27,6 +27,7 @@ use lemmy_api_common::{ }; use lemmy_db_schema::{ source::{ + activity::ActivitySendTargets, community::Community, post::{Post, PostUpdateForm}, }, @@ -150,7 +151,7 @@ impl SendActivity for LockPost { activity, &local_user_view.person.into(), &community.into(), - vec![], + ActivitySendTargets::empty(), true, context, ) diff --git a/crates/apub/src/activities/community/mod.rs b/crates/apub/src/activities/community/mod.rs index 7a88b34b3..c654a5c79 100644 --- a/crates/apub/src/activities/community/mod.rs +++ b/crates/apub/src/activities/community/mod.rs @@ -6,9 +6,8 @@ use crate::{ }; use activitypub_federation::{config::Data, traits::Actor}; use 
lemmy_api_common::context::LemmyContext; -use lemmy_db_schema::source::person::PersonFollower; +use lemmy_db_schema::source::{activity::ActivitySendTargets, person::PersonFollower}; use lemmy_utils::error::LemmyError; -use url::Url; pub mod announce; pub mod collection_add; @@ -34,7 +33,7 @@ pub(crate) async fn send_activity_in_community( activity: AnnouncableActivities, actor: &ApubPerson, community: &ApubCommunity, - extra_inboxes: Vec, + extra_inboxes: ActivitySendTargets, is_mod_action: bool, context: &Data, ) -> Result<(), LemmyError> { @@ -43,8 +42,8 @@ pub(crate) async fn send_activity_in_community( // send to user followers if !is_mod_action { - inboxes.extend( - &mut PersonFollower::list_followers(&mut context.pool(), actor.id) + inboxes.add_inboxes( + PersonFollower::list_followers(&mut context.pool(), actor.id) .await? .into_iter() .map(|p| ApubPerson(p).shared_inbox_or_inbox()), @@ -56,7 +55,7 @@ pub(crate) async fn send_activity_in_community( AnnounceActivity::send(activity.clone().try_into()?, community, context).await?; } else { // send to the community, which will then forward to followers - inboxes.push(community.shared_inbox_or_inbox()); + inboxes.add_inbox(community.shared_inbox_or_inbox()); } send_lemmy_activity(context, activity.clone(), actor, inboxes, false).await?; diff --git a/crates/apub/src/activities/community/report.rs b/crates/apub/src/activities/community/report.rs index 67b84644e..6a0a40a8b 100644 --- a/crates/apub/src/activities/community/report.rs +++ b/crates/apub/src/activities/community/report.rs @@ -20,6 +20,7 @@ use lemmy_api_common::{ }; use lemmy_db_schema::{ source::{ + activity::ActivitySendTargets, comment_report::{CommentReport, CommentReportForm}, post_report::{PostReport, PostReportForm}, }, @@ -94,8 +95,8 @@ impl Report { id: id.clone(), audience: Some(community.id().into()), }; - - let inbox = vec![community.shared_inbox_or_inbox()]; + // todo: this should probably filter and only send if the community is remote? 
+ let inbox = ActivitySendTargets::to_inbox(community.shared_inbox_or_inbox()); send_lemmy_activity(context, report, actor, inbox, false).await } } diff --git a/crates/apub/src/activities/community/update.rs b/crates/apub/src/activities/community/update.rs index fe2477d6e..9621232df 100644 --- a/crates/apub/src/activities/community/update.rs +++ b/crates/apub/src/activities/community/update.rs @@ -22,7 +22,10 @@ use lemmy_api_common::{ context::LemmyContext, utils::local_user_view_from_jwt, }; -use lemmy_db_schema::{source::community::Community, traits::Crud}; +use lemmy_db_schema::{ + source::{activity::ActivitySendTargets, community::Community}, + traits::Crud, +}; use lemmy_utils::error::LemmyError; use url::Url; @@ -63,7 +66,15 @@ impl UpdateCommunity { }; let activity = AnnouncableActivities::UpdateCommunity(update); - send_activity_in_community(activity, actor, &community, vec![], true, context).await + send_activity_in_community( + activity, + actor, + &community, + ActivitySendTargets::empty(), + true, + context, + ) + .await } } diff --git a/crates/apub/src/activities/create_or_update/comment.rs b/crates/apub/src/activities/create_or_update/comment.rs index 51b87ed27..ca6e3b4d1 100644 --- a/crates/apub/src/activities/create_or_update/comment.rs +++ b/crates/apub/src/activities/create_or_update/comment.rs @@ -33,6 +33,7 @@ use lemmy_db_schema::{ aggregates::structs::CommentAggregates, newtypes::PersonId, source::{ + activity::ActivitySendTargets, comment::{Comment, CommentLike, CommentLikeForm}, community::Community, person::Person, @@ -128,10 +129,10 @@ impl CreateOrUpdateNote { .map(|t| t.href.clone()) .map(ObjectId::from) .collect(); - let mut inboxes = vec![]; + let mut inboxes = ActivitySendTargets::empty(); for t in tagged_users { let person = t.dereference(context).await?; - inboxes.push(person.shared_inbox_or_inbox()); + inboxes.add_inbox(person.shared_inbox_or_inbox()); } let activity = 
AnnouncableActivities::CreateOrUpdateComment(create_or_update); diff --git a/crates/apub/src/activities/create_or_update/post.rs b/crates/apub/src/activities/create_or_update/post.rs index 4767114f9..1f14b2df7 100644 --- a/crates/apub/src/activities/create_or_update/post.rs +++ b/crates/apub/src/activities/create_or_update/post.rs @@ -30,6 +30,7 @@ use lemmy_db_schema::{ aggregates::structs::PostAggregates, newtypes::PersonId, source::{ + activity::ActivitySendTargets, community::Community, person::Person, post::{Post, PostLike, PostLikeForm}, @@ -103,7 +104,7 @@ impl CreateOrUpdatePage { activity, &person, &community, - vec![], + ActivitySendTargets::empty(), is_mod_action, &context, ) diff --git a/crates/apub/src/activities/create_or_update/private_message.rs b/crates/apub/src/activities/create_or_update/private_message.rs index 3eaad2f71..afd15e8c6 100644 --- a/crates/apub/src/activities/create_or_update/private_message.rs +++ b/crates/apub/src/activities/create_or_update/private_message.rs @@ -19,7 +19,7 @@ use lemmy_api_common::{ }; use lemmy_db_schema::{ newtypes::PersonId, - source::{person::Person, private_message::PrivateMessage}, + source::{activity::ActivitySendTargets, person::Person, private_message::PrivateMessage}, traits::Crud, }; use lemmy_utils::error::LemmyError; @@ -89,7 +89,8 @@ impl CreateOrUpdateChatMessage { .await?, kind, }; - let inbox = vec![recipient.shared_inbox_or_inbox()]; + let inbox = ActivitySendTargets::to_inbox(recipient.shared_inbox_or_inbox()); + send_lemmy_activity(context, create_or_update, &sender, inbox, true).await } } diff --git a/crates/apub/src/activities/deletion/delete_user.rs b/crates/apub/src/activities/deletion/delete_user.rs index b388ed9e1..a4bd36919 100644 --- a/crates/apub/src/activities/deletion/delete_user.rs +++ b/crates/apub/src/activities/deletion/delete_user.rs @@ -16,6 +16,7 @@ use lemmy_api_common::{ person::{DeleteAccount, DeleteAccountResponse}, utils::{delete_user_account, local_user_view_from_jwt}, 
}; +use lemmy_db_schema::source::activity::ActivitySendTargets; use lemmy_utils::error::LemmyError; use url::Url; @@ -51,7 +52,9 @@ impl SendActivity for DeleteAccount { cc: vec![], }; - let inboxes = remote_instance_inboxes(&mut context.pool()).await?; + let mut inboxes = ActivitySendTargets::empty(); + inboxes.set_all_instances(true); + send_lemmy_activity(context, delete, &actor, inboxes, true).await?; Ok(()) } diff --git a/crates/apub/src/activities/deletion/mod.rs b/crates/apub/src/activities/deletion/mod.rs index 3b8c8b537..2199bc6ec 100644 --- a/crates/apub/src/activities/deletion/mod.rs +++ b/crates/apub/src/activities/deletion/mod.rs @@ -38,6 +38,7 @@ use lemmy_api_common::{ }; use lemmy_db_schema::{ source::{ + activity::ActivitySendTargets, comment::{Comment, CommentUpdateForm}, community::{Community, CommunityUpdateForm}, person::Person, @@ -238,7 +239,7 @@ async fn send_apub_delete_in_community( activity, &actor, &community.into(), - vec![], + ActivitySendTargets::empty(), is_mod_action, context, ) @@ -258,9 +259,9 @@ async fn send_apub_delete_private_message( .into(); let deletable = DeletableObjects::PrivateMessage(pm.into()); - let inbox = vec![recipient.shared_inbox_or_inbox()]; + let inbox = ActivitySendTargets::to_inbox(recipient.shared_inbox_or_inbox()); if deleted { - let delete = Delete::new(actor, deletable, recipient.id(), None, None, context)?; + let delete: Delete = Delete::new(actor, deletable, recipient.id(), None, None, context)?; send_lemmy_activity(context, delete, actor, inbox, true).await?; } else { let undo = UndoDelete::new(actor, deletable, recipient.id(), None, None, context)?; diff --git a/crates/apub/src/activities/following/accept.rs b/crates/apub/src/activities/following/accept.rs index adaad51d1..381b05930 100644 --- a/crates/apub/src/activities/following/accept.rs +++ b/crates/apub/src/activities/following/accept.rs @@ -10,7 +10,10 @@ use activitypub_federation::{ traits::{ActivityHandler, Actor}, }; use 
lemmy_api_common::context::LemmyContext; -use lemmy_db_schema::{source::community::CommunityFollower, traits::Followable}; +use lemmy_db_schema::{ + source::{activity::ActivitySendTargets, community::CommunityFollower}, + traits::Followable, +}; use lemmy_utils::error::LemmyError; use url::Url; @@ -29,7 +32,7 @@ impl AcceptFollow { &context.settings().get_protocol_and_hostname(), )?, }; - let inbox = vec![person.shared_inbox_or_inbox()]; + let inbox = ActivitySendTargets::to_inbox(person.shared_inbox_or_inbox()); send_lemmy_activity(context, accept, &user_or_community, inbox, true).await } } diff --git a/crates/apub/src/activities/following/follow.rs b/crates/apub/src/activities/following/follow.rs index 2f0f5037a..0649be16f 100644 --- a/crates/apub/src/activities/following/follow.rs +++ b/crates/apub/src/activities/following/follow.rs @@ -28,6 +28,7 @@ use lemmy_api_common::{ }; use lemmy_db_schema::{ source::{ + activity::ActivitySendTargets, community::{Community, CommunityFollower, CommunityFollowerForm}, person::{PersonFollower, PersonFollowerForm}, }, @@ -70,7 +71,8 @@ impl Follow { .ok(); let follow = Follow::new(actor, community, context)?; - let inbox = vec![community.shared_inbox_or_inbox()]; + // todo: this should probably filter and only send if the community is remote? 
+ let inbox = ActivitySendTargets::to_inbox(community.shared_inbox_or_inbox()); send_lemmy_activity(context, follow, actor, inbox, true).await } } diff --git a/crates/apub/src/activities/following/undo_follow.rs b/crates/apub/src/activities/following/undo_follow.rs index c36b36df8..b3718a88e 100644 --- a/crates/apub/src/activities/following/undo_follow.rs +++ b/crates/apub/src/activities/following/undo_follow.rs @@ -14,6 +14,7 @@ use activitypub_federation::{ use lemmy_api_common::context::LemmyContext; use lemmy_db_schema::{ source::{ + activity::ActivitySendTargets, community::{CommunityFollower, CommunityFollowerForm}, person::{PersonFollower, PersonFollowerForm}, }, @@ -40,7 +41,8 @@ impl UndoFollow { &context.settings().get_protocol_and_hostname(), )?, }; - let inbox = vec![community.shared_inbox_or_inbox()]; + // todo: this should probably filter and only send if the community is remote? + let inbox = ActivitySendTargets::to_inbox(community.shared_inbox_or_inbox()); send_lemmy_activity(context, undo, actor, inbox, true).await } } diff --git a/crates/apub/src/activities/mod.rs b/crates/apub/src/activities/mod.rs index 02ad0b6b1..e2513cb20 100644 --- a/crates/apub/src/activities/mod.rs +++ b/crates/apub/src/activities/mod.rs @@ -4,7 +4,6 @@ use crate::{ CONTEXT, }; use activitypub_federation::{ - activity_queue::send_activity, config::Data, fetch::object_id::ObjectId, kinds::public, @@ -12,17 +11,14 @@ use activitypub_federation::{ traits::{ActivityHandler, Actor}, }; use anyhow::anyhow; -use lemmy_api_common::{ - context::LemmyContext, - send_activity::{ActivityChannel, SendActivityData}, -}; +use lemmy_api_common::context::LemmyContext; use lemmy_db_schema::{ newtypes::CommunityId, source::{ - activity::{SentActivity, SentActivityForm}, + activity::{ActivityInsertForm, ActivitySendTargets, ActorType}, community::Community, - instance::Instance, }, + traits::Crud, }; use lemmy_db_views_actor::structs::{CommunityPersonBanView, CommunityView}; use lemmy_utils::{ 
@@ -163,17 +159,21 @@ where Url::parse(&id) } +pub(crate) trait GetActorType { + fn actor_type(&self) -> ActorType; +} + #[tracing::instrument(skip_all)] async fn send_lemmy_activity( data: &Data, activity: Activity, actor: &ActorT, - mut inbox: Vec, + send_targets: ActivitySendTargets, sensitive: bool, ) -> Result<(), LemmyError> where Activity: ActivityHandler + Serialize + Send + Sync + Clone, - ActorT: Actor, + ActorT: Actor + GetActorType, Activity: ActivityHandler, { static CACHE: Lazy>>> = Lazy::new(|| { @@ -199,6 +199,8 @@ where ap_id: activity.id().clone().into(), data: serde_json::to_value(activity.clone())?, sensitive, + send_targets, + actor_apub_id: actor.id().into(), }; SentActivity::create(&mut data.pool(), form).await?; send_activity(activity, actor, inbox, data).await?; diff --git a/crates/apub/src/activities/voting/mod.rs b/crates/apub/src/activities/voting/mod.rs index 24250c501..947ae14be 100644 --- a/crates/apub/src/activities/voting/mod.rs +++ b/crates/apub/src/activities/voting/mod.rs @@ -20,6 +20,7 @@ use lemmy_api_common::{ use lemmy_db_schema::{ newtypes::CommunityId, source::{ + activity::ActivitySendTargets, comment::{CommentLike, CommentLikeForm}, community::Community, person::Person, @@ -91,17 +92,18 @@ async fn send_activity( .await? .into(); + let empty = ActivitySendTargets::empty(); // score of 1 means upvote, -1 downvote, 0 undo a previous vote if score != 0 { let vote = Vote::new(object_id, &actor, &community, score.try_into()?, context)?; let activity = AnnouncableActivities::Vote(vote); - send_activity_in_community(activity, &actor, &community, vec![], false, context).await + send_activity_in_community(activity, &actor, &community, empty, false, context).await } else { // Lemmy API doesnt distinguish between Undo/Like and Undo/Dislike, so we hardcode it here. 
let vote = Vote::new(object_id, &actor, &community, VoteType::Like, context)?; let undo_vote = UndoVote::new(vote, &actor, &community, context)?; let activity = AnnouncableActivities::UndoVote(undo_vote); - send_activity_in_community(activity, &actor, &community, vec![], false, context).await + send_activity_in_community(activity, &actor, &community, empty, false, context).await } } diff --git a/crates/apub/src/fetcher/mod.rs b/crates/apub/src/fetcher/mod.rs index 701d10383..86b2d7fc2 100644 --- a/crates/apub/src/fetcher/mod.rs +++ b/crates/apub/src/fetcher/mod.rs @@ -12,6 +12,7 @@ use lemmy_utils::error::LemmyError; pub mod post_or_comment; pub mod search; +pub mod site_or_community_or_user; pub mod user_or_community; /// Resolve actor identifier like `!news@example.com` to user or community object. diff --git a/crates/apub/src/fetcher/site_or_community_or_user.rs b/crates/apub/src/fetcher/site_or_community_or_user.rs new file mode 100644 index 000000000..ed64306e3 --- /dev/null +++ b/crates/apub/src/fetcher/site_or_community_or_user.rs @@ -0,0 +1,108 @@ +use crate::{ + fetcher::user_or_community::{PersonOrGroup, UserOrCommunity}, + objects::instance::ApubSite, + protocol::objects::instance::Instance, +}; +use activitypub_federation::{ + config::Data, + traits::{Actor, Object}, +}; +use chrono::NaiveDateTime; +use lemmy_api_common::context::LemmyContext; +use lemmy_utils::error::LemmyError; +use reqwest::Url; +use serde::{Deserialize, Serialize}; + +// todo: maybe this enum should be somewhere else? 
+#[derive(Debug)] +pub enum SiteOrCommunityOrUser { + Site(ApubSite), + UserOrCommunity(UserOrCommunity), +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(untagged)] +pub enum SiteOrPersonOrGroup { + Instance(Instance), + PersonOrGroup(PersonOrGroup), +} + +#[async_trait::async_trait] +impl Object for SiteOrCommunityOrUser { + type DataType = LemmyContext; + type Kind = SiteOrPersonOrGroup; + type Error = LemmyError; + + fn last_refreshed_at(&self) -> Option { + Some(match self { + SiteOrCommunityOrUser::Site(p) => p.last_refreshed_at, + SiteOrCommunityOrUser::UserOrCommunity(p) => p.last_refreshed_at()?, + }) + } + + #[tracing::instrument(skip_all)] + async fn read_from_id( + object_id: Url, + data: &Data, + ) -> Result, LemmyError> { + unimplemented!(); + } + + #[tracing::instrument(skip_all)] + async fn delete(self, data: &Data) -> Result<(), LemmyError> { + match self { + SiteOrCommunityOrUser::Site(p) => p.delete(data).await, + SiteOrCommunityOrUser::UserOrCommunity(p) => p.delete(data).await, + } + } + + async fn into_json(self, _data: &Data) -> Result { + unimplemented!() + } + + #[tracing::instrument(skip_all)] + async fn verify( + apub: &Self::Kind, + expected_domain: &Url, + data: &Data, + ) -> Result<(), LemmyError> { + match apub { + SiteOrPersonOrGroup::Instance(a) => ApubSite::verify(a, expected_domain, data).await, + SiteOrPersonOrGroup::PersonOrGroup(a) => { + UserOrCommunity::verify(a, expected_domain, data).await + } + } + } + + #[tracing::instrument(skip_all)] + async fn from_json(apub: Self::Kind, data: &Data) -> Result { + unimplemented!(); + } +} + +impl Actor for SiteOrCommunityOrUser { + fn id(&self) -> Url { + match self { + SiteOrCommunityOrUser::Site(u) => u.id(), + SiteOrCommunityOrUser::UserOrCommunity(c) => c.id(), + } + } + + fn public_key_pem(&self) -> &str { + match self { + SiteOrCommunityOrUser::Site(p) => p.public_key_pem(), + SiteOrCommunityOrUser::UserOrCommunity(p) => p.public_key_pem(), + } + } + + fn 
private_key_pem(&self) -> Option { + match self { + SiteOrCommunityOrUser::Site(p) => p.private_key_pem(), + SiteOrCommunityOrUser::UserOrCommunity(p) => p.private_key_pem(), + } + } + + fn inbox(&self) -> Url { + unimplemented!() + } +} diff --git a/crates/apub/src/fetcher/user_or_community.rs b/crates/apub/src/fetcher/user_or_community.rs index d872c4e24..2763af2f0 100644 --- a/crates/apub/src/fetcher/user_or_community.rs +++ b/crates/apub/src/fetcher/user_or_community.rs @@ -1,4 +1,5 @@ use crate::{ + activities::GetActorType, objects::{community::ApubCommunity, person::ApubPerson}, protocol::objects::{group::Group, person::Person}, }; @@ -8,6 +9,7 @@ use activitypub_federation::{ }; use chrono::NaiveDateTime; use lemmy_api_common::context::LemmyContext; +use lemmy_db_schema::source::activity::ActorType; use lemmy_utils::error::LemmyError; use serde::{Deserialize, Serialize}; use url::Url; @@ -119,3 +121,12 @@ impl Actor for UserOrCommunity { unimplemented!() } } + +impl GetActorType for UserOrCommunity { + fn actor_type(&self) -> ActorType { + match self { + UserOrCommunity::User(p) => p.actor_type(), + UserOrCommunity::Community(p) => p.actor_type(), + } + } +} diff --git a/crates/apub/src/lib.rs b/crates/apub/src/lib.rs index 9a45284f2..5ce5182a6 100644 --- a/crates/apub/src/lib.rs +++ b/crates/apub/src/lib.rs @@ -13,7 +13,7 @@ use std::{sync::Arc, time::Duration}; use url::Url; pub mod activities; -pub(crate) mod activity_lists; +pub mod activity_lists; pub mod api; pub(crate) mod collections; pub mod fetcher; diff --git a/crates/apub/src/objects/community.rs b/crates/apub/src/objects/community.rs index 75eb941b1..665d9db8c 100644 --- a/crates/apub/src/objects/community.rs +++ b/crates/apub/src/objects/community.rs @@ -1,4 +1,5 @@ use crate::{ + activities::GetActorType, check_apub_id_valid, local_site_data_cached, objects::instance::fetch_instance_actor_for_object, @@ -20,6 +21,7 @@ use lemmy_api_common::{ }; use lemmy_db_schema::{ source::{ + 
activity::ActorType, actor_language::CommunityLanguage, community::{Community, CommunityUpdateForm}, }, @@ -178,6 +180,12 @@ impl Actor for ApubCommunity { } } +impl GetActorType for ApubCommunity { + fn actor_type(&self) -> ActorType { + ActorType::Community + } +} + impl ApubCommunity { /// For a given community, returns the inboxes of all followers. #[tracing::instrument(skip_all)] diff --git a/crates/apub/src/objects/instance.rs b/crates/apub/src/objects/instance.rs index 7933d4705..0a0793f5e 100644 --- a/crates/apub/src/objects/instance.rs +++ b/crates/apub/src/objects/instance.rs @@ -1,4 +1,5 @@ use crate::{ + activities::GetActorType, check_apub_id_valid_with_strictness, local_site_data_cached, objects::read_from_string_or_source_opt, @@ -20,6 +21,7 @@ use lemmy_api_common::{context::LemmyContext, utils::local_site_opt_to_slur_rege use lemmy_db_schema::{ newtypes::InstanceId, source::{ + activity::ActorType, actor_language::SiteLanguage, instance::Instance as DbInstance, site::{Site, SiteInsertForm}, @@ -168,6 +170,11 @@ impl Actor for ApubSite { self.inbox_url.clone().into() } } +impl GetActorType for ApubSite { + fn actor_type(&self) -> ActorType { + ActorType::Site + } +} /// Try to fetch the instance actor (to make things like instance rules available). 
pub(in crate::objects) async fn fetch_instance_actor_for_object + Clone>( diff --git a/crates/apub/src/objects/person.rs b/crates/apub/src/objects/person.rs index d28f8c7cf..f157c0192 100644 --- a/crates/apub/src/objects/person.rs +++ b/crates/apub/src/objects/person.rs @@ -1,4 +1,5 @@ use crate::{ + activities::GetActorType, check_apub_id_valid_with_strictness, local_site_data_cached, objects::{instance::fetch_instance_actor_for_object, read_from_string_or_source_opt}, @@ -22,7 +23,10 @@ use lemmy_api_common::{ utils::{generate_outbox_url, local_site_opt_to_slur_regex}, }; use lemmy_db_schema::{ - source::person::{Person as DbPerson, PersonInsertForm, PersonUpdateForm}, + source::{ + activity::ActorType, + person::{Person as DbPerson, PersonInsertForm, PersonUpdateForm}, + }, traits::{ApubActor, Crud}, utils::naive_now, }; @@ -193,6 +197,12 @@ impl Actor for ApubPerson { } } +impl GetActorType for ApubPerson { + fn actor_type(&self) -> ActorType { + ActorType::Person + } +} + #[cfg(test)] pub(crate) mod tests { #![allow(clippy::unwrap_used)] diff --git a/crates/db_schema/src/diesel_ltree.patch b/crates/db_schema/src/diesel_ltree.patch index ecbeb2193..ee6b241c6 100644 --- a/crates/db_schema/src/diesel_ltree.patch +++ b/crates/db_schema/src/diesel_ltree.patch @@ -2,10 +2,7 @@ diff --git a/crates/db_schema/src/schema.rs b/crates/db_schema/src/schema.rs index 255c6422..f2ccf5e2 100644 --- a/crates/db_schema/src/schema.rs +++ b/crates/db_schema/src/schema.rs -@@ -2,16 +2,12 @@ - - pub mod sql_types { - #[derive(diesel::sql_types::SqlType)] +@@ -9,10 +9,6 @@ pub mod sql_types { #[diesel(postgres_type(name = "listing_type_enum"))] pub struct ListingTypeEnum; @@ -16,9 +13,6 @@ index 255c6422..f2ccf5e2 100644 #[derive(diesel::sql_types::SqlType)] #[diesel(postgres_type(name = "registration_mode_enum"))] pub struct RegistrationModeEnum; - - #[derive(diesel::sql_types::SqlType)] - #[diesel(postgres_type(name = "sort_type_enum"))] @@ -76,13 +76,13 @@ diesel::table! 
{ published -> Timestamp, } diff --git a/crates/db_schema/src/impls/instance.rs b/crates/db_schema/src/impls/instance.rs index d6a23a712..b1f17ad0b 100644 --- a/crates/db_schema/src/impls/instance.rs +++ b/crates/db_schema/src/impls/instance.rs @@ -6,11 +6,13 @@ use crate::{ utils::{get_conn, naive_now, DbPool}, }; use diesel::{ - dsl::{insert_into, now}, + dsl::{count_star, insert_into, now}, result::Error, sql_types::{Nullable, Timestamp}, ExpressionMethods, + NullableExpressionMethods, QueryDsl, + SelectableHelper, }; use diesel_async::RunQueryDsl; @@ -94,6 +96,37 @@ impl Instance { .await } + /// returns a list of all instances, each with a flag of whether the instance is allowed or not + /// ordered by id + pub async fn read_all_with_blocked(pool: &mut DbPool<'_>) -> Result, Error> { + let conn = &mut get_conn(pool).await?; + let use_allowlist = federation_allowlist::table + .select(count_star().gt(0)) + .get_result::(conn) + .await?; + if use_allowlist { + instance::table + .left_join(federation_allowlist::table) + .select(( + Self::as_select(), + federation_allowlist::id.nullable().is_not_null(), + )) + .order_by(instance::id) + .get_results::<(Self, bool)>(conn) + .await + } else { + instance::table + .left_join(federation_blocklist::table) + .select(( + Self::as_select(), + federation_blocklist::id.nullable().is_null(), + )) + .order_by(instance::id) + .get_results::<(Self, bool)>(conn) + .await + } + } + pub async fn linked(pool: &mut DbPool<'_>) -> Result, Error> { let conn = &mut get_conn(pool).await?; instance::table diff --git a/crates/db_schema/src/impls/site.rs b/crates/db_schema/src/impls/site.rs index 2820e9cd5..6c5a7fe2c 100644 --- a/crates/db_schema/src/impls/site.rs +++ b/crates/db_schema/src/impls/site.rs @@ -1,6 +1,6 @@ use crate::{ - newtypes::{DbUrl, SiteId}, - schema::site::dsl::{actor_id, id, site}, + newtypes::{DbUrl, InstanceId, SiteId}, + schema::site::dsl::{actor_id, id, instance_id, site}, source::{ actor_language::SiteLanguage, 
site::{Site, SiteInsertForm, SiteUpdateForm}, @@ -8,7 +8,7 @@ use crate::{ traits::Crud, utils::{get_conn, DbPool}, }; -use diesel::{dsl::insert_into, result::Error, ExpressionMethods, QueryDsl}; +use diesel::{dsl::insert_into, result::Error, ExpressionMethods, OptionalExtension, QueryDsl}; use diesel_async::RunQueryDsl; use url::Url; @@ -66,19 +66,29 @@ impl Crud for Site { } impl Site { + pub async fn read_from_instance_id( + pool: &mut DbPool<'_>, + _instance_id: InstanceId, + ) -> Result, Error> { + let conn = &mut get_conn(pool).await?; + site + .filter(instance_id.eq(_instance_id)) + .get_result(conn) + .await + .optional() + } pub async fn read_from_apub_id( pool: &mut DbPool<'_>, object_id: &DbUrl, ) -> Result, Error> { let conn = &mut get_conn(pool).await?; - Ok( - site - .filter(actor_id.eq(object_id)) - .first::(conn) - .await - .ok() - .map(Into::into), - ) + + site + .filter(actor_id.eq(object_id)) + .first::(conn) + .await + .optional() + .map(Into::into) } pub async fn read_remote_sites(pool: &mut DbPool<'_>) -> Result, Error> { diff --git a/crates/db_schema/src/newtypes.rs b/crates/db_schema/src/newtypes.rs index f5958105c..0ebf18396 100644 --- a/crates/db_schema/src/newtypes.rs +++ b/crates/db_schema/src/newtypes.rs @@ -249,3 +249,9 @@ impl TS for DbUrl { true } } + +impl InstanceId { + pub fn inner(self) -> i32 { + self.0 + } +} diff --git a/crates/db_schema/src/schema.rs b/crates/db_schema/src/schema.rs index 17a0f99f8..afda31060 100644 --- a/crates/db_schema/src/schema.rs +++ b/crates/db_schema/src/schema.rs @@ -1,17 +1,21 @@ // @generated automatically by Diesel CLI. 
pub mod sql_types { - #[derive(diesel::sql_types::SqlType)] - #[diesel(postgres_type(name = "listing_type_enum"))] - pub struct ListingTypeEnum; + #[derive(diesel::sql_types::SqlType)] + #[diesel(postgres_type(name = "actor_type_enum"))] + pub struct ActorTypeEnum; - #[derive(diesel::sql_types::SqlType)] - #[diesel(postgres_type(name = "registration_mode_enum"))] - pub struct RegistrationModeEnum; + #[derive(diesel::sql_types::SqlType)] + #[diesel(postgres_type(name = "listing_type_enum"))] + pub struct ListingTypeEnum; - #[derive(diesel::sql_types::SqlType)] - #[diesel(postgres_type(name = "sort_type_enum"))] - pub struct SortTypeEnum; + #[derive(diesel::sql_types::SqlType)] + #[diesel(postgres_type(name = "registration_mode_enum"))] + pub struct RegistrationModeEnum; + + #[derive(diesel::sql_types::SqlType)] + #[diesel(postgres_type(name = "sort_type_enum"))] + pub struct SortTypeEnum; } diesel::table! { @@ -290,6 +294,15 @@ diesel::table! { } } +diesel::table! { + federation_queue_state (domain) { + domain -> Text, + last_successful_id -> Int4, + fail_count -> Int4, + last_retry -> Timestamptz, + } +} + diesel::table! 
{ instance (id) { id -> Int4, @@ -879,6 +892,7 @@ diesel::joinable!(custom_emoji_keyword -> custom_emoji (custom_emoji_id)); diesel::joinable!(email_verification -> local_user (local_user_id)); diesel::joinable!(federation_allowlist -> instance (instance_id)); diesel::joinable!(federation_blocklist -> instance (instance_id)); +diesel::joinable!(federation_queue_state -> activity (last_successful_id)); diesel::joinable!(local_site -> site (site_id)); diesel::joinable!(local_site_rate_limit -> local_site (local_site_id)); diesel::joinable!(local_user -> person (person_id)); @@ -930,68 +944,69 @@ diesel::joinable!(site_language -> site (site_id)); diesel::joinable!(tagline -> local_site (local_site_id)); diesel::allow_tables_to_appear_in_same_query!( - admin_purge_comment, - admin_purge_community, - admin_purge_person, - admin_purge_post, - captcha_answer, - comment, - comment_aggregates, - comment_like, - comment_reply, - comment_report, - comment_saved, - community, - community_aggregates, - community_block, - community_follower, - community_language, - community_moderator, - community_person_ban, - custom_emoji, - custom_emoji_keyword, - email_verification, - federation_allowlist, - federation_blocklist, - instance, - language, - local_site, - local_site_rate_limit, - local_user, - local_user_language, - mod_add, - mod_add_community, - mod_ban, - mod_ban_from_community, - mod_feature_post, - mod_hide_community, - mod_lock_post, - mod_remove_comment, - mod_remove_community, - mod_remove_post, - mod_transfer_community, - password_reset_request, - person, - person_aggregates, - person_ban, - person_block, - person_follower, - person_mention, - person_post_aggregates, - post, - post_aggregates, - post_like, - post_read, - post_report, - post_saved, - private_message, - private_message_report, - received_activity, - registration_application, - secret, - sent_activity, - site, - site_aggregates, - site_language, - tagline, + admin_purge_comment, + admin_purge_community, 
+ admin_purge_person, + admin_purge_post, + captcha_answer, + comment, + comment_aggregates, + comment_like, + comment_reply, + comment_report, + comment_saved, + community, + community_aggregates, + community_block, + community_follower, + community_language, + community_moderator, + community_person_ban, + custom_emoji, + custom_emoji_keyword, + email_verification, + federation_allowlist, + federation_blocklist, + federation_queue_state, + instance, + language, + local_site, + local_site_rate_limit, + local_user, + local_user_language, + mod_add, + mod_add_community, + mod_ban, + mod_ban_from_community, + mod_feature_post, + mod_hide_community, + mod_lock_post, + mod_remove_comment, + mod_remove_community, + mod_remove_post, + mod_transfer_community, + password_reset_request, + person, + person_aggregates, + person_ban, + person_block, + person_follower, + person_mention, + person_post_aggregates, + post, + post_aggregates, + post_like, + post_read, + post_report, + post_saved, + private_message, + private_message_report, + received_activity, + registration_application, + secret, + sent_activity, + site, + site_aggregates, + site_language, + tagline, ); diff --git a/crates/db_schema/src/source/activity.rs b/crates/db_schema/src/source/activity.rs index 85b193f51..9e81f5515 100644 --- a/crates/db_schema/src/source/activity.rs +++ b/crates/db_schema/src/source/activity.rs @@ -1,6 +1,68 @@ -use crate::{newtypes::DbUrl, schema::sent_activity}; +use crate::{ + newtypes::{CommunityId, DbUrl}, + schema::{activity, sent_activity}, +}; +use diesel::{ + deserialize::FromSql, + pg::{Pg, PgValue}, + serialize::{Output, ToSql}, + sql_types::Jsonb, +}; use serde_json::Value; -use std::fmt::Debug; +use std::{collections::HashSet, fmt::Debug, io::Write}; +use url::Url; + +#[derive( + FromSqlRow, + PartialEq, + Eq, + AsExpression, + serde::Serialize, + serde::Deserialize, + Debug, + Default, + Clone, +)] +#[diesel(sql_type = Jsonb)] +/// describes where an activity should be sent 
+pub struct ActivitySendTargets { + /// send to these inboxes explicitly + pub inboxes: HashSet, + /// send to all followers of these local communities + pub community_followers_of: HashSet, + /// send to all remote instances + pub all_instances: bool, +} + +// todo: in different file? +impl ActivitySendTargets { + pub fn empty() -> ActivitySendTargets { + ActivitySendTargets::default() + } + pub fn to_inbox(url: Url) -> ActivitySendTargets { + let mut a = ActivitySendTargets::empty(); + a.inboxes.insert(url); + a + } + pub fn to_local_community_followers(id: CommunityId) -> ActivitySendTargets { + let mut a = ActivitySendTargets::empty(); + a.add_local_community_followers(id); + a + } + pub fn add_local_community_followers(&mut self, id: CommunityId) { + self.community_followers_of.insert(id); + } + pub fn set_all_instances(&mut self, b: bool) { + self.all_instances = b; + } + + pub fn add_inbox(&mut self, inbox: Url) { + self.inboxes.insert(inbox); + } + pub fn add_inboxes(&mut self, inboxes: impl Iterator) { + self.inboxes.extend(inboxes); + } +} #[derive(PartialEq, Eq, Debug, Queryable)] #[diesel(table_name = sent_activity)] @@ -10,6 +72,9 @@ pub struct SentActivity { pub data: Value, pub sensitive: bool, pub published: chrono::NaiveDateTime, + pub send_targets: ActivitySendTargets, + pub actor_type: ActorType, + pub actor_apub_id: DbUrl, } #[derive(Insertable)] #[diesel(table_name = sent_activity)] @@ -17,6 +82,17 @@ pub struct SentActivityForm { pub ap_id: DbUrl, pub data: Value, pub sensitive: bool, + pub send_targets: ActivitySendTargets, + pub actor_type: ActorType, + pub actor_apub_id: DbUrl, +} + +#[derive(Clone, Copy, Debug, diesel_derive_enum::DbEnum, PartialEq, Eq)] +#[ExistingTypePath = "crate::schema::sql_types::ActorTypeEnum"] +pub enum ActorType { + Site, + Community, + Person, } #[derive(PartialEq, Eq, Debug, Queryable)] @@ -26,3 +102,20 @@ pub struct ReceivedActivity { pub ap_id: DbUrl, pub published: chrono::NaiveDateTime, } + +// 
https://vasilakisfil.social/blog/2020/05/09/rust-diesel-jsonb/ +impl FromSql for ActivitySendTargets { + fn from_sql(bytes: PgValue) -> diesel::deserialize::Result { + let value = >::from_sql(bytes)?; + Ok(serde_json::from_value(value)?) + } +} + +impl ToSql for ActivitySendTargets { + fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> diesel::serialize::Result { + out.write_all(&[1])?; + serde_json::to_writer(out, self) + .map(|_| diesel::serialize::IsNull::No) + .map_err(Into::into) + } +} diff --git a/crates/db_schema/src/source/instance.rs b/crates/db_schema/src/source/instance.rs index a75259c15..db3bc277c 100644 --- a/crates/db_schema/src/source/instance.rs +++ b/crates/db_schema/src/source/instance.rs @@ -9,7 +9,7 @@ use ts_rs::TS; use typed_builder::TypedBuilder; #[skip_serializing_none] -#[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] +#[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize, Selectable)] #[cfg_attr(feature = "full", derive(Queryable, Identifiable, TS))] #[cfg_attr(feature = "full", diesel(table_name = instance))] #[cfg_attr(feature = "full", ts(export))] diff --git a/crates/db_schema/src/utils.rs b/crates/db_schema/src/utils.rs index 94c867d6b..20a2b4072 100644 --- a/crates/db_schema/src/utils.rs +++ b/crates/db_schema/src/utils.rs @@ -374,6 +374,9 @@ pub mod functions { } sql_function!(fn lower(x: Text) -> Text); + + // really this function is variadic, this just adds the two-argument version + sql_function!(fn coalesce(x: diesel::sql_types::Nullable, y: T) -> T); } pub const DELETED_REPLACEMENT_TEXT: &str = "*Permanently Deleted*"; diff --git a/crates/db_views_actor/Cargo.toml b/crates/db_views_actor/Cargo.toml index 069013d71..03532438d 100644 --- a/crates/db_views_actor/Cargo.toml +++ b/crates/db_views_actor/Cargo.toml @@ -27,4 +27,5 @@ diesel-async = { workspace = true, features = [ ], optional = true } serde = { workspace = true } serde_with = { workspace = true } -ts-rs = { workspace = true, optional = true 
} +ts-rs = { workspace = true, optional = true } +chrono.workspace = true diff --git a/crates/db_views_actor/src/community_follower_view.rs b/crates/db_views_actor/src/community_follower_view.rs index c30503878..dfb5e3214 100644 --- a/crates/db_views_actor/src/community_follower_view.rs +++ b/crates/db_views_actor/src/community_follower_view.rs @@ -1,25 +1,45 @@ use crate::structs::CommunityFollowerView; +use chrono::Utc; use diesel::{ dsl::{count_star, not}, result::Error, - sql_function, ExpressionMethods, QueryDsl, }; use diesel_async::RunQueryDsl; use lemmy_db_schema::{ - newtypes::{CommunityId, DbUrl, PersonId}, + newtypes::{CommunityId, DbUrl, InstanceId, PersonId}, schema::{community, community_follower, person}, source::{community::Community, person::Person}, traits::JoinView, - utils::{get_conn, DbPool}, + utils::{functions::coalesce, get_conn, DbPool}, }; type CommunityFollowerViewTuple = (Community, Person); -sql_function!(fn coalesce(x: diesel::sql_types::Nullable, y: diesel::sql_types::Text) -> diesel::sql_types::Text); - impl CommunityFollowerView { + /// return a list of community ids and inboxes that at least one user of the given instance has followed + pub async fn get_instance_followed_community_inboxes( + pool: &mut DbPool<'_>, + instance_id: InstanceId, + published_since: chrono::DateTime, + ) -> Result, Error> { + let conn = &mut get_conn(pool).await?; + // todo: in most cases this will fetch the same url many times (the shared inbox url) + community_follower::table + .inner_join(community::table) + .inner_join(person::table) + .filter(person::instance_id.eq(instance_id)) + .filter(not(person::local)) + .filter(community_follower::published.gt(published_since.naive_utc())) + .select(( + community::id, + coalesce(person::shared_inbox_url, person::inbox_url), + )) + .distinct() // only need each community_id, inbox combination once + .load::<(CommunityId, DbUrl)>(conn) + .await + } pub async fn get_community_follower_inboxes( pool: &mut 
DbPool<'_>, community_id: CommunityId, diff --git a/crates/federate/Cargo.toml b/crates/federate/Cargo.toml new file mode 100644 index 000000000..06794499b --- /dev/null +++ b/crates/federate/Cargo.toml @@ -0,0 +1,40 @@ +[package] +name = "lemmy_federate" +version.workspace = true +edition.workspace = true +description.workspace = true +license.workspace = true +homepage.workspace = true +documentation.workspace = true +repository.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +activitypub_federation.workspace = true +anyhow.workspace = true +async-trait = "0.1.71" +bytes = "1.4.0" +chrono.workspace = true +dashmap = "5.5.0" +diesel = { workspace = true, features = ["postgres", "chrono", "serde_json"] } +diesel-async = { workspace = true, features = ["deadpool", "postgres"] } +enum_delegate = "0.2.0" +futures.workspace = true +lemmy_api_common.workspace = true +lemmy_apub.workspace = true +lemmy_db_schema = { workspace = true, features = ["full"] } +lemmy_db_views_actor.workspace = true +lemmy_utils.workspace = true +moka = { version = "0.11.2", features = ["future"] } +once_cell.workspace = true +openssl = "0.10.55" +reqwest.workspace = true +reqwest-middleware = "0.2.2" +reqwest-tracing = "0.4.5" +serde.workspace = true +serde_json.workspace = true +tokio = { workspace = true, features = ["full"] } +tokio-util = "0.7.8" +tracing.workspace = true +tracing-subscriber = "0.3.17" diff --git a/crates/federate/src/federation_queue_state.rs b/crates/federate/src/federation_queue_state.rs new file mode 100644 index 000000000..e048f120d --- /dev/null +++ b/crates/federate/src/federation_queue_state.rs @@ -0,0 +1,52 @@ +use crate::util::ActivityId; +use anyhow::Result; +use chrono::{DateTime, TimeZone, Utc}; +use diesel::prelude::*; +use diesel_async::RunQueryDsl; +use lemmy_db_schema::utils::{get_conn, DbPool}; + +#[derive(Queryable, Selectable, Insertable, AsChangeset, Clone)] 
+#[diesel(table_name = lemmy_db_schema::schema::federation_queue_state)] +#[diesel(check_for_backend(diesel::pg::Pg))] +pub struct FederationQueueState { + /// domain of the instance (primary key) + pub domain: String, + pub last_successful_id: ActivityId, // todo: i64 + pub fail_count: i32, + pub last_retry: DateTime, +} + +impl FederationQueueState { + /// load or return a default empty value + pub async fn load(pool: &mut DbPool<'_>, domain_: &str) -> Result { + use lemmy_db_schema::schema::federation_queue_state::dsl::*; + let conn = &mut get_conn(pool).await?; + Ok( + federation_queue_state + .find(&domain_) + .select(FederationQueueState::as_select()) + .get_result(conn) + .await + .optional()? + .unwrap_or(FederationQueueState { + domain: domain_.to_owned(), + fail_count: 0, + last_retry: Utc.timestamp_nanos(0), + last_successful_id: 0, // todo: start at current id not from beginning + }), + ) + } + pub async fn upsert(pool: &mut DbPool<'_>, state: &FederationQueueState) -> Result<()> { + let conn = &mut get_conn(pool).await?; + use lemmy_db_schema::schema::federation_queue_state::dsl::*; + + state + .insert_into(federation_queue_state) + .on_conflict(domain) + .do_update() + .set(state) + .execute(conn) + .await?; + Ok(()) + } +} diff --git a/crates/federate/src/main.rs b/crates/federate/src/main.rs new file mode 100644 index 000000000..b727324bd --- /dev/null +++ b/crates/federate/src/main.rs @@ -0,0 +1,174 @@ +use crate::{ + util::{retry_sleep_duration, spawn_cancellable}, + worker::instance_worker, +}; +use activitypub_federation::config::FederationConfig; +use chrono::{Local, Timelike}; +use federation_queue_state::FederationQueueState; +use lemmy_api_common::request::build_user_agent; +use lemmy_apub::{VerifyUrlData, FEDERATION_HTTP_FETCH_LIMIT}; +use lemmy_db_schema::{ + source::instance::Instance, + utils::{build_db_pool, DbPool}, +}; +use lemmy_utils::{error::LemmyErrorExt2, settings::SETTINGS, REQWEST_TIMEOUT}; +use reqwest::Client; +use 
reqwest_middleware::ClientBuilder; +use reqwest_tracing::TracingMiddleware; +use std::{collections::HashMap, time::Duration}; +use tokio::{ + signal::unix::SignalKind, + sync::mpsc::{unbounded_channel, UnboundedReceiver}, + time::sleep, +}; + +mod federation_queue_state; +mod util; +mod worker; + +static WORKER_EXIT_TIMEOUT: Duration = Duration::from_secs(30); + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + let settings = SETTINGS.to_owned(); + // TODO: wait until migrations are applied? or are they safe from race conditions and i can just call run_migrations here as well? + let pool = build_db_pool(&settings).await.into_anyhow()?; + let user_agent = build_user_agent(&settings); + let reqwest_client = Client::builder() + .user_agent(user_agent.clone()) + .timeout(REQWEST_TIMEOUT) + .connect_timeout(REQWEST_TIMEOUT) + .build()?; + + let client = ClientBuilder::new(reqwest_client.clone()) + .with(TracingMiddleware::default()) + .build(); + + let federation_config = FederationConfig::builder() + .domain(settings.hostname.clone()) + .app_data(()) + .client(client.clone()) + .http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT) + .http_signature_compat(true) + .url_verifier(Box::new(VerifyUrlData(pool.clone()))) + .build() + .await?; + let process_num = 1 - 1; // todo: pass these in via command line args + let process_count = 1; + let mut workers = HashMap::new(); + let mut pool2 = DbPool::from(&pool); + + let (stats_sender, stats_receiver) = unbounded_channel(); + let exit_print = tokio::spawn(receive_print_stats(&mut pool2, stats_receiver)); + let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?; + let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?; + loop { + for (instance, should_federate) in Instance::read_all_with_blocked(&mut pool2) + .await? 
+ .into_iter() + { + if instance.id.inner() % process_count != process_num { + continue; + } + if !workers.contains_key(&instance.id) && should_federate { + let stats_sender = stats_sender.clone(); + workers.insert( + instance.id, + spawn_cancellable(WORKER_EXIT_TIMEOUT, |stop| { + instance_worker( + pool2, + instance, + federation_config.to_request_data(), + stop, + stats_sender, + ) + }), + ); + } else if !should_federate { + if let Some(worker) = workers.remove(&instance.id) { + if let Err(e) = worker.await { + tracing::error!("error stopping worker: {e}"); + } + } + } + } + tokio::select! { + () = sleep(Duration::from_secs(60)) => {}, + _ = tokio::signal::ctrl_c() => { + tracing::warn!("Received ctrl-c, shutting down gracefully..."); + break; + } + _ = interrupt.recv() => { + tracing::warn!("Received interrupt, shutting down gracefully..."); + break; + } + _ = terminate.recv() => { + tracing::warn!("Received terminate, shutting down gracefully..."); + break; + } + } + } + drop(stats_sender); + tracing::warn!( + "Waiting for {} workers ({:.2?} max)", + workers.len(), + WORKER_EXIT_TIMEOUT + ); + futures::future::join_all(workers.into_values()).await; + exit_print.await?; + Ok(()) +} + +/// every 60s, print the state for every instance. exits if the receiver is done (all senders dropped) +async fn receive_print_stats( + mut pool: &mut DbPool<'_>, + mut receiver: UnboundedReceiver, +) { + let mut printerval = tokio::time::interval(Duration::from_secs(60)); + printerval.tick().await; // skip first + let mut stats = HashMap::new(); + loop { + tokio::select! { + ele = receiver.recv() => { + let Some(ele) = ele else { + tracing::info!("done. 
quitting"); + print_stats(&mut pool, &stats).await; + return; + }; + stats.insert(ele.domain.clone(), ele); + }, + _ = printerval.tick() => { + print_stats(&mut pool, &stats).await; + } + } + } +} +async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap) { + let last_id = crate::util::get_latest_activity_id(pool).await; + let Ok(last_id) = last_id else { + tracing::error!("could not get last id"); + return; + }; + // it's expected that the values are a bit out of date, everything < SAVE_STATE_EVERY should be considered up to date + tracing::info!( + "Federation state as of {}:", + Local::now().with_nanosecond(0).unwrap().to_rfc3339() + ); + // todo: less noisy output (only output failing instances and summary for successful) + // todo: more stats (act/sec, avg http req duration) + for stat in stats.values() { + let behind = last_id - stat.last_successful_id; + if stat.fail_count > 0 { + tracing::info!( + "{}: Warning. {} behind, {} consecutive fails, current retry delay {:.2?}", + stat.domain, + behind, + stat.fail_count, + retry_sleep_duration(stat.fail_count) + ); + } else { + tracing::info!("{}: Ok. 
{} behind", stat.domain, behind); + } + } +} diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs new file mode 100644 index 000000000..c2f589a86 --- /dev/null +++ b/crates/federate/src/util.rs @@ -0,0 +1,190 @@ +use anyhow::{anyhow, Context, Result}; +use dashmap::DashSet; +use diesel::{prelude::*, sql_types::Int8}; +use diesel_async::RunQueryDsl; +use lemmy_apub::{ + activity_lists::SharedInboxActivities, + fetcher::{site_or_community_or_user::SiteOrCommunityOrUser, user_or_community::UserOrCommunity}, +}; +use lemmy_db_schema::{ + source::{ + activity::{Activity, ActorType}, + community::Community, + person::Person, + site::Site, + }, + traits::{ApubActor, Crud}, + utils::{get_conn, DbPool}, +}; +use moka::future::Cache; +use once_cell::sync::Lazy; +use reqwest::Url; +use serde_json::Value; +use std::{ + borrow::{Borrow, Cow}, + future::Future, + sync::Arc, + time::Duration, +}; +use tokio::{task::JoinHandle, time::sleep}; +use tokio_util::sync::CancellationToken; + +/// spawn a task but with graceful shutdown +/// +/// only await the returned future when you want to cancel the task +pub fn spawn_cancellable( + timeout: Duration, + task: impl FnOnce(CancellationToken) -> F, +) -> impl Future> +where + F: Future> + Send + 'static, +{ + let stop = CancellationToken::new(); + let task = task(stop.clone()); + let task: JoinHandle> = tokio::spawn(async move { + match task.await { + Ok(o) => Ok(o), + Err(e) => { + tracing::error!("worker errored out: {e}"); + // todo: if this error happens, requeue worker creation in main + Err(e) + } + } + }); + let abort = task.abort_handle(); + async move { + tracing::info!("Shutting down task"); + stop.cancel(); + tokio::select! { + r = task => { + Ok(r.context("could not join")??) 
+ }, + _ = sleep(timeout) => { + abort.abort(); + tracing::warn!("Graceful shutdown timed out, aborting task"); + Err(anyhow!("task aborted due to timeout")) + } + } + } +} + +/// assuming apub priv key and ids are immutable, then we don't need to have TTL +/// TODO: capacity should be configurable maybe based on memory use +pub async fn get_actor_cached( + pool: &mut DbPool<'_>, + actor_type: ActorType, + actor_apub_id: &Url, +) -> Result> { + static CACHE: Lazy>> = + Lazy::new(|| Cache::builder().max_capacity(10000).build()); + CACHE + .try_get_with(actor_apub_id.clone(), async { + let url = actor_apub_id.clone().into(); + let person = match actor_type { + ActorType::Site => SiteOrCommunityOrUser::Site( + Site::read_from_apub_id(pool, &url) + .await? + .context("apub site not found")? + .into(), + ), + ActorType::Community => SiteOrCommunityOrUser::UserOrCommunity(UserOrCommunity::Community( + Community::read_from_apub_id(pool, &url) + .await? + .context("apub community not found")? + .into(), + )), + ActorType::Person => SiteOrCommunityOrUser::UserOrCommunity(UserOrCommunity::User( + Person::read_from_apub_id(pool, &url) + .await? + .context("apub person not found")? + .into(), + )), + }; + Result::<_, anyhow::Error>::Ok(Arc::new(person)) + }) + .await + .map_err(|e| anyhow::anyhow!("err getting actor: {e}")) +} + +/// intern urls to reduce memory usage +/// not sure if worth it +pub fn intern_url<'a>(url: impl Into>) -> Arc { + let url: Cow<'a, Url> = url.into(); + static INTERNED_URLS: Lazy>> = Lazy::new(DashSet::new); + return INTERNED_URLS + .get::(url.borrow()) + .map(|e| e.clone()) + .unwrap_or_else(|| { + let ret = Arc::new(url.into_owned()); + INTERNED_URLS.insert(ret.clone()); + ret + }); +} + +/// this should maybe be a newtype like all the other PersonId CommunityId etc. 
+/// also should be i64 +pub type ActivityId = i32; + +/// activities are immutable so cache does not need to have TTL +/// May return None if the corresponding id does not exist or is a received activity. +/// Holes in serials are expected behaviour in postgresql +/// todo: cache size should probably be configurable / dependent on desired memory usage +pub async fn get_activity_cached( + pool: &mut DbPool<'_>, + activity_id: ActivityId, +) -> Result>> { + static ACTIVITIES: Lazy>>> = + Lazy::new(|| Cache::builder().max_capacity(10000).build()); + ACTIVITIES + .try_get_with(activity_id, async { + let row = Activity::read(pool, activity_id) + .await + .optional() + .context("could not read activity")?; + let Some(mut row) = row else { return anyhow::Result::<_, anyhow::Error>::Ok(None) }; + if row.send_targets.is_none() { + // must be a received activity + return Ok(None); + } + // swap to avoid cloning + let mut data = Value::Null; + std::mem::swap(&mut row.data, &mut data); + let activity_actual: SharedInboxActivities = serde_json::from_value(data)?; + + Ok(Some(Arc::new((row, activity_actual)))) + }) + .await + .map_err(|e| anyhow::anyhow!("err getting activity: {e}")) +} + +/// return the most current activity id (with 1 second cache) +pub async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result { + static CACHE: Lazy> = Lazy::new(|| { + Cache::builder() + .time_to_live(Duration::from_secs(1)) + .build() + }); + CACHE + .try_get_with((), async { + let conn = &mut get_conn(pool).await?; + let Sequence { + last_value: latest_id, + } = diesel::sql_query("select last_value from activity_id_seq") + .get_result(conn) + .await?; + anyhow::Result::<_, anyhow::Error>::Ok(latest_id as ActivityId) + }) + .await + .map_err(|e| anyhow::anyhow!("err getting id: {e}")) +} + +/// how long to sleep based on how many retries have already happened +pub fn retry_sleep_duration(retry_count: i32) -> Duration { + Duration::from_secs_f64(10.0 * 2.0_f64.powf(retry_count as f64)) 
+}
+
+#[derive(QueryableByName)]
+struct Sequence {
+  #[diesel(sql_type = Int8)]
+  last_value: i64, // this value is bigint for some reason even if sequence is int4
+}
diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs
new file mode 100644
index 000000000..98e569943
--- /dev/null
+++ b/crates/federate/src/worker.rs
@@ -0,0 +1,243 @@
+use crate::{
+  federation_queue_state::FederationQueueState,
+  util::{
+    get_activity_cached,
+    get_actor_cached,
+    get_latest_activity_id,
+    intern_url,
+    retry_sleep_duration,
+  },
+};
+use activitypub_federation::{
+  activity_queue::{prepare_raw, send_raw, sign_raw},
+  config::Data,
+};
+use anyhow::Result;
+use chrono::{DateTime, TimeZone, Utc};
+use lemmy_db_schema::{
+  newtypes::{CommunityId, InstanceId},
+  source::{activity::Activity, instance::Instance, site::Site},
+  utils::DbPool,
+};
+use lemmy_db_views_actor::structs::CommunityFollowerView;
+use lemmy_utils::{error::LemmyErrorExt2, REQWEST_TIMEOUT};
+use reqwest::Url;
+use std::{
+  borrow::Cow,
+  collections::{HashMap, HashSet},
+  ops::Deref,
+  sync::Arc,
+  time::Duration,
+};
+use tokio::{sync::mpsc::UnboundedSender, time::sleep};
+use tokio_util::sync::CancellationToken;
+
+/// Upper bound on the number of activities handled in one batch before the worker checks whether
+/// the queue state should be persisted (after a failed send, state is saved on every attempt).
+const SAVE_STATE_EVERY_IT: i64 = 100;
+/// Minimum time between two periodic saves of the queue state.
+const SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(10);
+
+/// Loop: fetch new activities from the db and send them to the inboxes of the given instance.
+///
+/// This worker only returns if (a) there is an internal error or (b) the cancellation token is
+/// cancelled (graceful exit).
+///
+/// Progress is tracked in a [`FederationQueueState`] which is persisted to the db and published on
+/// `stats_sender` periodically and after every failed delivery attempt.
+pub async fn instance_worker(
+  mut pool: DbPool<'_>,
+  instance: Instance,
+  data: Data<()>,
+  stop: CancellationToken,
+  stats_sender: UnboundedSender<FederationQueueState>,
+) -> Result<(), anyhow::Error> {
+  let save_state_interval = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME)
+    .expect("SAVE_STATE_EVERY_TIME fits into a chrono::Duration");
+  let mut last_full_communities_fetch = Utc.timestamp_nanos(0);
+  let mut last_incremental_communities_fetch = Utc.timestamp_nanos(0);
+  let mut last_state_insert = Utc.timestamp_nanos(0);
+  let mut followed_communities: HashMap<CommunityId, HashSet<Arc<Url>>> = get_communities(
+    &mut pool,
+    instance.id,
+    &mut last_incremental_communities_fetch,
+  )
+  .await?;
+  let site = Site::read_from_instance_id(&mut pool, instance.id).await?;
+
+  let mut state = FederationQueueState::load(&mut pool, &instance.domain).await?;
+  if state.fail_count > 0 {
+    // before starting queue, sleep the remaining backoff duration.
+    // A `last_retry` in the future (clock adjustment) counts as zero elapsed time, and an already
+    // expired backoff gives a zero sleep instead of panicking on `Duration` underflow.
+    let elapsed = (Utc::now() - state.last_retry).to_std().unwrap_or_default();
+    let remaining = retry_sleep_duration(state.fail_count).saturating_sub(elapsed);
+    tokio::select! {
+      () = sleep(remaining) => {},
+      () = stop.cancelled() => { return Ok(()); }
+    }
+  }
+  while !stop.is_cancelled() {
+    let latest_id = get_latest_activity_id(&mut pool).await?;
+    let mut id = state.last_successful_id;
+    if id == latest_id {
+      // no more work to be done, wait before rechecking
+      tokio::select! {
+        () = sleep(Duration::from_secs(10)) => { continue; },
+        () = stop.cancelled() => { return Ok(()); }
+      }
+    }
+    let mut processed_activities = 0;
+    'batch: while id < latest_id
+      && processed_activities < SAVE_STATE_EVERY_IT
+      && !stop.is_cancelled()
+    {
+      id += 1;
+      processed_activities += 1;
+      let Some(ele) = get_activity_cached(&mut pool, id).await? else {
+        // no activity was returned for this id, mark it as handled and move on
+        state.last_successful_id = id;
+        continue;
+      };
+      let (activity, object) = (&ele.0, &ele.1);
+      let inbox_urls = get_inbox_urls(&instance, &site, &followed_communities, activity);
+      if inbox_urls.is_empty() {
+        // this instance is not a recipient of the activity
+        state.last_successful_id = id;
+        continue;
+      }
+      let actor = {
+        // these should always be set for sent activities
+        let (Some(actor_type), Some(apub_id)) = (activity.actor_type, &activity.actor_apub_id)
+        else {
+          tracing::warn!("activity {id} does not have actor_type or actor_apub_id set");
+          state.last_successful_id = id;
+          continue;
+        };
+        get_actor_cached(&mut pool, actor_type, apub_id.deref()).await?
+      };
+      let inbox_urls = inbox_urls.into_iter().map(|e| (*e).clone()).collect();
+      let requests = prepare_raw(object, actor.as_ref(), inbox_urls, &data)
+        .await
+        .into_anyhow()?;
+      for task in requests {
+        // usually only one due to shared inbox
+        let mut req = sign_raw(&task, &data, REQWEST_TIMEOUT).await?;
+        tracing::info!("sending out {}", task);
+        while let Err(e) = send_raw(&task, &data, req).await {
+          tracing::info!("{task} failed: {e}");
+          state.fail_count += 1;
+          state.last_retry = Utc::now();
+          stats_sender.send(state.clone())?;
+          FederationQueueState::upsert(&mut pool, &state).await?;
+          tokio::select! {
+            () = sleep(retry_sleep_duration(state.fail_count)) => {},
+            () = stop.cancelled() => {
+              // state was saved to db just above, exit
+              break 'batch;
+            }
+          }
+          // re-sign only after the backoff so the signature is fresh when the retry is sent
+          req = sign_raw(&task, &data, REQWEST_TIMEOUT).await?;
+        }
+      }
+      // send success!
+      state.last_successful_id = id;
+      state.fail_count = 0;
+    }
+
+    if Utc::now() - last_state_insert > save_state_interval {
+      last_state_insert = Utc::now();
+      FederationQueueState::upsert(&mut pool, &state).await?;
+      stats_sender.send(state.clone())?;
+    }
+    {
+      // update communities
+      if (Utc::now() - last_incremental_communities_fetch) > chrono::Duration::seconds(10) {
+        // process additions every 10s
+        let added = get_communities(
+          &mut pool,
+          instance.id,
+          &mut last_incremental_communities_fetch,
+        )
+        .await?;
+        // merge per community: `HashMap::extend` would replace the inbox set that is already
+        // known for a community with only the newly added inboxes
+        for (community_id, inboxes) in added {
+          followed_communities
+            .entry(community_id)
+            .or_default()
+            .extend(inboxes);
+        }
+      }
+      if (Utc::now() - last_full_communities_fetch) > chrono::Duration::seconds(300) {
+        // process removals every 5min
+        last_full_communities_fetch = Utc.timestamp_nanos(0);
+        followed_communities =
+          get_communities(&mut pool, instance.id, &mut last_full_communities_fetch).await?;
+        last_incremental_communities_fetch = last_full_communities_fetch;
+      }
+    }
+  }
+
+  Ok(())
+}
+
+/// get inbox urls of sending the given activity to the given instance
+/// most often this will return 0 values (if instance doesn't care about the activity)
+/// or 1 value (the shared inbox)
+/// > 1 values only happens for non-lemmy software
+fn get_inbox_urls(
+  instance: &Instance,
+  site: &Option<Site>,
+  followed_communities: &HashMap<CommunityId, HashSet<Arc<Url>>>,
+  activity: &Activity,
+) -> HashSet<Arc<Url>> {
+  let mut inbox_urls = HashSet::new();
+  let Some(targets) = &activity.send_targets else {
+    return inbox_urls;
+  };
+  if targets.all_instances {
+    if let Some(site) = site {
+      // todo: when does an instance not have a site?
+      inbox_urls.insert(intern_url(Cow::Borrowed(site.inbox_url.deref())));
+    }
+  }
+  for t in &targets.community_followers_of {
+    if let Some(urls) = followed_communities.get(t) {
+      inbox_urls.extend(urls.iter().cloned());
+    }
+  }
+  for inbox in &targets.inboxes {
+    if inbox.domain() != Some(&instance.domain) {
+      continue;
+    }
+    inbox_urls.insert(intern_url(Cow::Borrowed(inbox)));
+  }
+  inbox_urls
+}
+
+/// get a list of local communities with the remote inboxes on the given instance that cares about them
+///
+/// `last_fetch` is handed to the query as its time bound and is then advanced to the moment just
+/// before the query started, so that consecutive incremental fetches overlap instead of leaving gaps.
+async fn get_communities(
+  pool: &mut DbPool<'_>,
+  instance_id: InstanceId,
+  last_fetch: &mut DateTime<Utc>,
+) -> Result<HashMap<CommunityId, HashSet<Arc<Url>>>> {
+  let since = *last_fetch;
+  *last_fetch = Utc::now(); // update to time before fetch to ensure overlap
+  let followers =
+    CommunityFollowerView::get_instance_followed_community_inboxes(pool, instance_id, since)
+      .await?;
+  // explicitly typed so that `or_default` knows which set type to create
+  let mut communities: HashMap<CommunityId, HashSet<Arc<Url>>> = HashMap::new();
+  for (community_id, inbox_url) in followers {
+    communities
+      .entry(community_id)
+      .or_default()
+      .insert(intern_url(Cow::Owned(inbox_url.into())));
+  }
+  Ok(communities)
+}
diff --git a/crates/utils/src/error.rs b/crates/utils/src/error.rs
index ffc1723b4..46de182a4 100644
--- a/crates/utils/src/error.rs
+++ b/crates/utils/src/error.rs
@@ -236,6 +236,7 @@ impl<T, E: Into<anyhow::Error>> LemmyErrorExt<T, E> for Result<T, E> {
 }
 pub trait LemmyErrorExt2<T> {
   fn with_lemmy_type(self, error_type: LemmyErrorType) -> Result<T, LemmyError>;
+  fn into_anyhow(self) -> Result<T, anyhow::Error>;
 }
 
 impl<T> LemmyErrorExt2<T> for Result<T, LemmyError> {
@@ -245,6 +246,10 @@ impl<T> LemmyErrorExt2<T> for Result<T, LemmyError> {
       e
     })
   }
+  // this function can't be an impl From or similar because it would conflict with one of the other broad Into<> implementations
+  fn into_anyhow(self) -> Result<T, anyhow::Error> {
+    self.map_err(|e| e.inner)
+  }
 }
 
 #[cfg(test)]
diff --git a/migrations/2023-07-09-115243_persistent-activity-queue/down.sql b/migrations/2023-07-09-115243_persistent-activity-queue/down.sql
new file mode 100644
index 000000000..b90dd3ab8
--- /dev/null
+++ b/migrations/2023-07-09-115243_persistent-activity-queue/down.sql
@@ -0,0 +1,11 @@
+ALTER TABLE activity
+    DROP COLUMN send_targets,
+    DROP COLUMN actor_apub_id,
+    DROP COLUMN actor_type;
+
+DROP TYPE actor_type_enum;
+
+DROP TABLE federation_queue_state;
+
+DROP INDEX idx_community_follower_published;
+
diff --git a/migrations/2023-07-09-115243_persistent-activity-queue/up.sql b/migrations/2023-07-09-115243_persistent-activity-queue/up.sql
new file mode 100644
index 000000000..2db45766a
--- /dev/null
+++ b/migrations/2023-07-09-115243_persistent-activity-queue/up.sql
@@ -0,0 +1,21 @@
+CREATE TYPE actor_type_enum AS enum(
+    'site',
+    'community',
+    'person'
+);
+
+ALTER TABLE activity
+    ADD COLUMN send_targets jsonb DEFAULT NULL,
+    ADD COLUMN actor_type actor_type_enum DEFAULT NULL,
+    ADD COLUMN actor_apub_id text DEFAULT NULL;
+
+CREATE TABLE federation_queue_state(
+    domain text PRIMARY KEY,
+    last_successful_id integer NOT NULL,
+    fail_count integer NOT NULL,
+    last_retry timestamptz NOT NULL
+);
+
+-- for incremental fetches of followers
+CREATE INDEX idx_community_follower_published ON community_follower(published);
+