diff --git a/crates/apub/src/activities/mod.rs b/crates/apub/src/activities/mod.rs index 7a9020b50..7ae99c831 100644 --- a/crates/apub/src/activities/mod.rs +++ b/crates/apub/src/activities/mod.rs @@ -205,7 +205,17 @@ where ap_id: activity.id().clone().into(), data: serde_json::to_value(activity.clone())?, sensitive, - send_targets, + send_inboxes: send_targets + .inboxes + .into_iter() + .map(|e| Some(e.into())) + .collect(), + send_all_instances: send_targets.all_instances, + send_community_followers_of: send_targets + .community_followers_of + .into_iter() + .map(|e| Some(e.0)) + .collect(), actor_type: actor.actor_type(), actor_apub_id: actor.id().into(), }; diff --git a/crates/db_schema/src/impls/activity.rs b/crates/db_schema/src/impls/activity.rs index 576f85dc4..329f20e7a 100644 --- a/crates/db_schema/src/impls/activity.rs +++ b/crates/db_schema/src/impls/activity.rs @@ -70,10 +70,7 @@ mod tests { #![allow(clippy::indexing_slicing)] use super::*; - use crate::{ - source::activity::{ActivitySendTargets, ActorType}, - utils::build_db_pool_for_tests, - }; + use crate::{source::activity::ActorType, utils::build_db_pool_for_tests}; use serde_json::json; use serial_test::serial; use url::Url; @@ -117,7 +114,9 @@ mod tests { .unwrap() .into(), actor_type: ActorType::Person, - send_targets: ActivitySendTargets::empty(), + send_all_instances: false, + send_community_followers_of: vec![], + send_inboxes: vec![], }; SentActivity::create(pool, form).await.unwrap(); diff --git a/crates/db_schema/src/newtypes.rs b/crates/db_schema/src/newtypes.rs index 0ebf18396..abfcfa1f4 100644 --- a/crates/db_schema/src/newtypes.rs +++ b/crates/db_schema/src/newtypes.rs @@ -162,7 +162,7 @@ pub struct CustomEmojiId(i32); pub struct LtreeDef(pub String); #[repr(transparent)] -#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Debug)] +#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Debug, Hash)] #[cfg_attr(feature = "full", derive(AsExpression, FromSqlRow))] #[cfg_attr(feature = "full", diesel(sql_type = diesel::sql_types::Text))] pub struct DbUrl(pub(crate) Box); diff --git a/crates/db_schema/src/schema.rs b/crates/db_schema/src/schema.rs index 30f327f61..c69b003a5 100644 --- a/crates/db_schema/src/schema.rs +++ b/crates/db_schema/src/schema.rs @@ -1,21 +1,21 @@ // @generated automatically by Diesel CLI. pub mod sql_types { - #[derive(diesel::sql_types::SqlType)] - #[diesel(postgres_type(name = "actor_type_enum"))] - pub struct ActorTypeEnum; + #[derive(diesel::sql_types::SqlType)] + #[diesel(postgres_type(name = "actor_type_enum"))] + pub struct ActorTypeEnum; - #[derive(diesel::sql_types::SqlType)] - #[diesel(postgres_type(name = "listing_type_enum"))] - pub struct ListingTypeEnum; + #[derive(diesel::sql_types::SqlType)] + #[diesel(postgres_type(name = "listing_type_enum"))] + pub struct ListingTypeEnum; - #[derive(diesel::sql_types::SqlType)] - #[diesel(postgres_type(name = "registration_mode_enum"))] - pub struct RegistrationModeEnum; + #[derive(diesel::sql_types::SqlType)] + #[diesel(postgres_type(name = "registration_mode_enum"))] + pub struct RegistrationModeEnum; - #[derive(diesel::sql_types::SqlType)] - #[diesel(postgres_type(name = "sort_type_enum"))] - pub struct SortTypeEnum; + #[derive(diesel::sql_types::SqlType)] + #[diesel(postgres_type(name = "sort_type_enum"))] + pub struct SortTypeEnum; } diesel::table! { @@ -409,9 +409,9 @@ diesel::table! { totp_2fa_secret -> Nullable, totp_2fa_url -> Nullable, open_links_in_new_tab -> Bool, + infinite_scroll_enabled -> Bool, blur_nsfw -> Bool, auto_expand -> Bool, - infinite_scroll_enabled -> Bool, } } @@ -805,7 +805,9 @@ diesel::table! { data -> Json, sensitive -> Bool, published -> Timestamp, - send_targets -> Jsonb, + send_inboxes -> Array>, + send_community_followers_of -> Array>, + send_all_instances -> Bool, actor_type -> ActorTypeEnum, actor_apub_id -> Nullable, } @@ -953,69 +955,69 @@ diesel::joinable!(site_language -> site (site_id)); diesel::joinable!(tagline -> local_site (local_site_id)); diesel::allow_tables_to_appear_in_same_query!( - admin_purge_comment, - admin_purge_community, - admin_purge_person, - admin_purge_post, - captcha_answer, - comment, - comment_aggregates, - comment_like, - comment_reply, - comment_report, - comment_saved, - community, - community_aggregates, - community_block, - community_follower, - community_language, - community_moderator, - community_person_ban, - custom_emoji, - custom_emoji_keyword, - email_verification, - federation_allowlist, - federation_blocklist, - federation_queue_state, - instance, - language, - local_site, - local_site_rate_limit, - local_user, - local_user_language, - mod_add, - mod_add_community, - mod_ban, - mod_ban_from_community, - mod_feature_post, - mod_hide_community, - mod_lock_post, - mod_remove_comment, - mod_remove_community, - mod_remove_post, - mod_transfer_community, - password_reset_request, - person, - person_aggregates, - person_ban, - person_block, - person_follower, - person_mention, - person_post_aggregates, - post, - post_aggregates, - post_like, - post_read, - post_report, - post_saved, - private_message, - private_message_report, - received_activity, - registration_application, - secret, - sent_activity, - site, - site_aggregates, - site_language, - tagline, + admin_purge_comment, + admin_purge_community, + admin_purge_person, + admin_purge_post, + captcha_answer, + comment, + comment_aggregates, + comment_like, + comment_reply, + comment_report, + comment_saved, + community, + community_aggregates, + community_block, + community_follower, + community_language, + community_moderator, + community_person_ban, + custom_emoji, + custom_emoji_keyword, + email_verification, + federation_allowlist, + federation_blocklist, + federation_queue_state, + instance, + language, + local_site, + local_site_rate_limit, + local_user, + local_user_language, + mod_add, + mod_add_community, + mod_ban, + mod_ban_from_community, + mod_feature_post, + mod_hide_community, + mod_lock_post, + mod_remove_comment, + mod_remove_community, + mod_remove_post, + mod_transfer_community, + password_reset_request, + person, + person_aggregates, + person_ban, + person_block, + person_follower, + person_mention, + person_post_aggregates, + post, + post_aggregates, + post_like, + post_read, + post_report, + post_saved, + private_message, + private_message_report, + received_activity, + registration_application, + secret, + sent_activity, + site, + site_aggregates, + site_language, + tagline, ); diff --git a/crates/db_schema/src/source/activity.rs b/crates/db_schema/src/source/activity.rs index cef101ccc..a7b4e6687 100644 --- a/crates/db_schema/src/source/activity.rs +++ b/crates/db_schema/src/source/activity.rs @@ -3,13 +3,15 @@ use crate::{ schema::sent_activity, }; use diesel::{ + backend::Backend, deserialize::FromSql, pg::{Pg, PgValue}, serialize::{Output, ToSql}, - sql_types::Jsonb, + sql_types::{Array, Jsonb, Nullable}, + Queryable, }; use serde_json::Value; -use std::{collections::HashSet, fmt::Debug, io::Write}; +use std::{collections::HashSet, fmt::Debug, hash::Hash, io::Write}; use url::Url; #[derive( @@ -72,17 +74,50 @@ pub struct SentActivity { pub data: Value, pub sensitive: bool, pub published: chrono::NaiveDateTime, - pub send_targets: ActivitySendTargets, + #[diesel(deserialize_as = ArrayToHashSet)] + pub send_inboxes: HashSet, + #[diesel(deserialize_as = ArrayToHashSet)] + pub send_community_followers_of: HashSet, + pub send_all_instances: bool, pub actor_type: ActorType, pub actor_apub_id: Option, } + +// wrapper to remove optional from array values and convert to hashset +pub struct ArrayToHashSet(HashSet); + +impl Queryable>, DB> for ArrayToHashSet +where + DB: Backend, + T1: FromSql + Hash + Eq, + Vec>: FromSql>, DB>, +{ + type Row = Vec>; + + fn build(row: Self::Row) -> diesel::deserialize::Result { + let res: diesel::deserialize::Result> = row + .into_iter() + .map(|e| e.ok_or("array with null element".into())) + .collect(); + res.map(ArrayToHashSet) + } +} + +impl From> for HashSet { + fn from(val: ArrayToHashSet) -> Self { + val.0 + } +} + #[derive(Insertable)] #[diesel(table_name = sent_activity)] pub struct SentActivityForm { pub ap_id: DbUrl, pub data: Value, pub sensitive: bool, - pub send_targets: ActivitySendTargets, + pub send_inboxes: Vec>, + pub send_community_followers_of: Vec>, + pub send_all_instances: bool, pub actor_type: ActorType, pub actor_apub_id: DbUrl, } diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index a3d7ac379..86cacae7b 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -32,7 +32,7 @@ use std::{ use tokio::{sync::mpsc::UnboundedSender, time::sleep}; use tokio_util::sync::CancellationToken; /// save state to db every n sends if there's no failures (otherwise state is saved after every attempt) -static SAVE_STATE_EVERY_IT: i64 = 100; +static CHECK_SAVE_STATE_EVERY_IT: i64 = 100; static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(10); /// loop fetch new activities from db and send them to the inboxes of the given instances @@ -74,7 +74,7 @@ pub async fn instance_worker( } let mut processed_activities = 0; 'batch: while id < latest_id - && processed_activities < SAVE_STATE_EVERY_IT + && processed_activities < CHECK_SAVE_STATE_EVERY_IT && !stop.is_cancelled() { id += 1; @@ -167,23 +167,23 @@ fn get_inbox_urls( activity: &SentActivity, ) -> HashSet> { let mut inbox_urls = HashSet::new(); - let targets = &activity.send_targets; - if targets.all_instances { + + if activity.send_all_instances { if let Some(site) = &site { // todo: when does an instance not have a site? inbox_urls.insert(intern_url(Cow::Borrowed(site.inbox_url.deref()))); } } - for t in &targets.community_followers_of { + for t in &activity.send_community_followers_of { if let Some(urls) = followed_communities.get(t) { inbox_urls.extend(urls.iter().map(std::clone::Clone::clone)); } } - for inbox in &targets.inboxes { + for inbox in &activity.send_inboxes { if inbox.domain() != Some(&instance.domain) { continue; } - inbox_urls.insert(intern_url(Cow::Borrowed(inbox))); + inbox_urls.insert(intern_url(Cow::Borrowed(inbox.inner()))); } inbox_urls } diff --git a/migrations/2023-08-01-115243_persistent-activity-queue/down.sql b/migrations/2023-08-01-115243_persistent-activity-queue/down.sql index ea4ef719b..30800fe5b 100644 --- a/migrations/2023-08-01-115243_persistent-activity-queue/down.sql +++ b/migrations/2023-08-01-115243_persistent-activity-queue/down.sql @@ -1,5 +1,7 @@ ALTER TABLE sent_activity - DROP COLUMN send_targets, + DROP COLUMN send_inboxes, + DROP COLUMN send_community_followers_of, + DROP COLUMN send_all_instances, DROP COLUMN actor_apub_id, DROP COLUMN actor_type; diff --git a/migrations/2023-08-01-115243_persistent-activity-queue/up.sql b/migrations/2023-08-01-115243_persistent-activity-queue/up.sql index 454cc4660..0385fc830 100644 --- a/migrations/2023-08-01-115243_persistent-activity-queue/up.sql +++ b/migrations/2023-08-01-115243_persistent-activity-queue/up.sql @@ -4,15 +4,20 @@ CREATE TYPE actor_type_enum AS enum( 'person' ); --- actor_apub_id only null for old entries +-- actor_apub_id only null for old entries before this migration ALTER TABLE sent_activity - ADD COLUMN send_targets jsonb NOT NULL DEFAULT '{"inboxes": [], "community_followers_of": [], "all_instances": false}', + ADD COLUMN send_inboxes text[] NOT NULL DEFAULT '{}', -- list of specific inbox urls + ADD COLUMN send_community_followers_of integer[] NOT NULL DEFAULT '{}', + ADD COLUMN send_all_instances boolean NOT NULL DEFAULT FALSE, ADD COLUMN actor_type actor_type_enum NOT NULL DEFAULT 'person', ADD COLUMN actor_apub_id text DEFAULT NULL; ALTER TABLE sent_activity - ALTER COLUMN send_targets DROP DEFAULT, - ALTER COLUMN actor_type DROP DEFAULT; + ALTER COLUMN send_inboxes DROP DEFAULT, + ALTER COLUMN send_community_followers_of DROP DEFAULT, + ALTER COLUMN send_all_instances DROP DEFAULT, + ALTER COLUMN actor_type DROP DEFAULT, + ALTER COLUMN actor_apub_id DROP DEFAULT; CREATE TABLE federation_queue_state( domain text PRIMARY KEY,