diff --git a/Cargo.lock b/Cargo.lock index 47e2012b0..41c3667c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5439,9 +5439,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.36.0" +version = "1.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" +checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" dependencies = [ "backtrace", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 57d568b94..155526a09 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -139,7 +139,7 @@ anyhow = { version = "1.0.81", features = [ diesel_ltree = "0.3.1" typed-builder = "0.18.1" serial_test = "2.0.0" -tokio = { version = "1.36.0", features = ["full"] } +tokio = { version = "1.37.0", features = ["full"] } regex = "1.10.3" once_cell = "1.19.0" diesel-derive-newtype = "2.1.0" diff --git a/config/defaults.hjson b/config/defaults.hjson index 1fbab1fb9..40dba0d14 100644 --- a/config/defaults.hjson +++ b/config/defaults.hjson @@ -106,10 +106,11 @@ port: 8536 # Whether the site is available over TLS. Needs to be true for federation to work. tls_enabled: true - # The number of activitypub federation workers that can be in-flight concurrently - worker_count: 0 - # The number of activitypub federation retry workers that can be in-flight concurrently - retry_count: 0 + federation: { + # Limit to the number of concurrent outgoing federation requests per target instance. + # Set this to a higher value than 1 (e.g. 6) only if you have a huge instance (>10 activities per second) and if a receiving instance is not keeping up. + concurrent_sends_per_instance: 1 + } prometheus: { bind: "127.0.0.1" port: 10002 diff --git a/crates/federate/src/inboxes.rs b/crates/federate/src/inboxes.rs new file mode 100644 index 000000000..dde096053 --- /dev/null +++ b/crates/federate/src/inboxes.rs @@ -0,0 +1,149 @@ +use crate::util::LEMMY_TEST_FAST_FEDERATION; +use anyhow::Result; +use chrono::{DateTime, TimeZone, Utc}; +use lemmy_db_schema::{ + newtypes::{CommunityId, InstanceId}, + source::{activity::SentActivity, site::Site}, + utils::{ActualDbPool, DbPool}, +}; +use lemmy_db_views_actor::structs::CommunityFollowerView; +use once_cell::sync::Lazy; +use reqwest::Url; +use std::collections::{HashMap, HashSet}; + +/// interval with which new additions to community_followers are queried. +/// +/// The first time some user on an instance follows a specific remote community (or, more precisely: the first time a (followed_community_id, follower_inbox_url) tuple appears), +/// this delay limits the maximum time until the follow actually results in activities from that community id being sent to that inbox url. +/// This delay currently needs to not be too small because the DB load is currently fairly high because of the current structure of storing inboxes for every person, not having a separate list of shared_inboxes, and the architecture of having every instance queue be fully separate. +/// (see https://github.com/LemmyNet/lemmy/issues/3958) +static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = Lazy::new(|| { + if *LEMMY_TEST_FAST_FEDERATION { + chrono::TimeDelta::try_seconds(1).expect("TimeDelta out of bounds") + } else { + chrono::TimeDelta::try_minutes(2).expect("TimeDelta out of bounds") + } +}); +/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance unfollows a specific remote community. 
+/// This is expected to happen pretty rarely, and updating it in a timely manner is not too important.
+static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy<chrono::TimeDelta> =
+  Lazy::new(|| chrono::TimeDelta::try_hours(1).expect("TimeDelta out of bounds"));
+
+pub(crate) struct CommunityInboxCollector {
+  // load site lazily because if an instance is first seen due to being on allowlist,
+  // the corresponding row in `site` may not exist yet since that is only added once
+  // `fetch_instance_actor_for_object` is called.
+  // (this should be unlikely to be relevant outside of the federation tests)
+  site_loaded: bool,
+  site: Option<Site>,
+  followed_communities: HashMap<CommunityId, HashSet<Url>>,
+  last_full_communities_fetch: DateTime<Utc>,
+  last_incremental_communities_fetch: DateTime<Utc>,
+  instance_id: InstanceId,
+  domain: String,
+  pool: ActualDbPool,
+}
+impl CommunityInboxCollector {
+  pub fn new(
+    pool: ActualDbPool,
+    instance_id: InstanceId,
+    domain: String,
+  ) -> CommunityInboxCollector {
+    CommunityInboxCollector {
+      pool,
+      site_loaded: false,
+      site: None,
+      followed_communities: HashMap::new(),
+      last_full_communities_fetch: Utc.timestamp_nanos(0),
+      last_incremental_communities_fetch: Utc.timestamp_nanos(0),
+      instance_id,
+      domain,
+    }
+  }
+  /// get inbox urls for sending the given activity to the given instance
+  /// most often this will return 0 values (if the instance doesn't care about the activity)
+  /// or 1 value (the shared inbox)
+  /// > 1 values only happen for non-lemmy software
+  pub async fn get_inbox_urls(&mut self, activity: &SentActivity) -> Result<Vec<Url>> {
+    let mut inbox_urls: HashSet<Url> = HashSet::new();
+
+    if activity.send_all_instances {
+      if !self.site_loaded {
+        self.site = Site::read_from_instance_id(&mut self.pool(), self.instance_id).await?;
+        self.site_loaded = true;
+      }
+      if let Some(site) = &self.site {
+        // Nutomic: Most non-lemmy software won't have a site row. That means it can't handle these activities. So handling it like this is fine.
+        inbox_urls.insert(site.inbox_url.inner().clone());
+      }
+    }
+    if let Some(t) = &activity.send_community_followers_of {
+      if let Some(urls) = self.followed_communities.get(t) {
+        inbox_urls.extend(urls.iter().cloned());
+      }
+    }
+    inbox_urls.extend(
+      activity
+        .send_inboxes
+        .iter()
+        .filter_map(std::option::Option::as_ref)
+        .filter(|&u| (u.domain() == Some(&self.domain)))
+        .map(|u| u.inner().clone()),
+    );
+    Ok(inbox_urls.into_iter().collect())
+  }
+
+  pub async fn update_communities(&mut self) -> Result<()> {
+    if (Utc::now() - self.last_full_communities_fetch) > *FOLLOW_REMOVALS_RECHECK_DELAY {
+      tracing::debug!("{}: fetching full list of communities", self.domain);
+      // process removals every hour
+      (self.followed_communities, self.last_full_communities_fetch) = self
+        .get_communities(self.instance_id, Utc.timestamp_nanos(0))
+        .await?;
+      self.last_incremental_communities_fetch = self.last_full_communities_fetch;
+    }
+    if (Utc::now() - self.last_incremental_communities_fetch) > *FOLLOW_ADDITIONS_RECHECK_DELAY {
+      // process additions every minute
+      let (news, time) = self
+        .get_communities(self.instance_id, self.last_incremental_communities_fetch)
+        .await?;
+      if !news.is_empty() {
+        tracing::debug!(
+          "{}: fetched {} incremental new followed communities",
+          self.domain,
+          news.len()
+        );
+      }
+      self.followed_communities.extend(news);
+      self.last_incremental_communities_fetch = time;
+    }
+    Ok(())
+  }
+
+  /// get a list of local communities with the remote inboxes on the given instance that care about them
+  async fn get_communities(
+    &mut self,
+    instance_id: InstanceId,
+    last_fetch: DateTime<Utc>,
+  ) -> Result<(HashMap<CommunityId, HashSet<Url>>, DateTime<Utc>)> {
+    // update to a time before this fetch to ensure overlap; subtract 10s extra in case published dates are not exact
+    let new_last_fetch =
+      Utc::now() - chrono::TimeDelta::try_seconds(10).expect("TimeDelta out of bounds");
+    Ok((
+      CommunityFollowerView::get_instance_followed_community_inboxes(
+        &mut self.pool(),
+        instance_id,
+        last_fetch,
+      )
+      .await?
+ .into_iter() + .fold(HashMap::new(), |mut map, (c, u)| { + map.entry(c).or_default().insert(u.into()); + map + }), + new_last_fetch, + )) + } + fn pool(&self) -> DbPool<'_> { + DbPool::Pool(&self.pool) + } +} diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index a4dc49536..cb2fd1fa6 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -1,7 +1,11 @@ use crate::{util::CancellableTask, worker::InstanceWorker}; use activitypub_federation::config::FederationConfig; use chrono::{Local, Timelike}; -use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration}; +use lemmy_api_common::{ + context::LemmyContext, + federate_retry_sleep_duration, + lemmy_utils::settings::structs::FederationWorkerConfig, +}; use lemmy_db_schema::{ newtypes::InstanceId, source::{federation_queue_state::FederationQueueState, instance::Instance}, @@ -14,6 +18,8 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; +mod inboxes; +mod send; mod util; mod worker; @@ -34,7 +40,8 @@ pub struct Opts { async fn start_stop_federation_workers( opts: Opts, pool: ActualDbPool, - federation_config: FederationConfig, + federation_lib_config: FederationConfig, + federation_worker_config: FederationWorkerConfig, cancel: CancellationToken, ) -> anyhow::Result<()> { let mut workers = HashMap::::new(); @@ -43,7 +50,9 @@ async fn start_stop_federation_workers( let exit_print = tokio::spawn(receive_print_stats(pool.clone(), stats_receiver)); let pool2 = &mut DbPool::Pool(&pool); let process_index = opts.process_index - 1; - let local_domain = federation_config.settings().get_hostname_without_port()?; + let local_domain = federation_lib_config + .settings() + .get_hostname_without_port()?; loop { let mut total_count = 0; let mut dead_count = 0; @@ -71,26 +80,19 @@ async fn start_stop_federation_workers( continue; } // create new worker - let config = federation_config.clone(); + let config = federation_lib_config.clone(); let stats_sender = stats_sender.clone(); - let pool = pool.clone(); + let federation_worker_config = federation_worker_config.clone(); workers.insert( instance.id, CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| { - let instance = instance.clone(); - let req_data = config.clone().to_request_data(); - let stats_sender = stats_sender.clone(); - let pool = pool.clone(); - async move { - InstanceWorker::init_and_loop( - instance, - req_data, - &mut DbPool::Pool(&pool), - stop, - stats_sender, - ) - .await - } + InstanceWorker::init_and_loop( + instance.clone(), + config.clone(), + federation_worker_config.clone(), + stop, + stats_sender.clone(), + ) }), ); } else if !should_federate { @@ -126,12 +128,16 @@ pub fn start_stop_federation_workers_cancellable( opts: Opts, pool: ActualDbPool, config: FederationConfig, + federation_worker_config: FederationWorkerConfig, ) -> CancellableTask { CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| { - let opts = opts.clone(); - let pool = pool.clone(); - let config = config.clone(); - async move { start_stop_federation_workers(opts, pool, config, stop).await } + start_stop_federation_workers( + opts.clone(), + pool.clone(), + config.clone(), + federation_worker_config.clone(), + stop, + ) }) } diff --git a/crates/federate/src/send.rs b/crates/federate/src/send.rs new file mode 100644 index 000000000..41516eef7 --- /dev/null +++ b/crates/federate/src/send.rs @@ -0,0 +1,117 @@ +use crate::util::get_actor_cached; +use activitypub_federation::{ + activity_sending::SendActivityTask, + config::Data, + 
+  protocol::context::WithContext,
+};
+use anyhow::{Context, Result};
+use chrono::{DateTime, Utc};
+use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration};
+use lemmy_apub::{activity_lists::SharedInboxActivities, FEDERATION_CONTEXT};
+use lemmy_db_schema::{newtypes::ActivityId, source::activity::SentActivity};
+use reqwest::Url;
+use std::ops::Deref;
+use tokio::{sync::mpsc::UnboundedSender, time::sleep};
+use tokio_util::sync::CancellationToken;
+
+#[derive(Debug, PartialEq, Eq)]
+pub(crate) struct SendSuccessInfo {
+  pub activity_id: ActivityId,
+  pub published: Option<DateTime<Utc>>,
+  pub was_skipped: bool,
+}
+// need to be able to order these for the binary heap in the worker;
+// the comparison is reversed so the heap yields the smallest activity id first
+impl PartialOrd for SendSuccessInfo {
+  fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+    other.activity_id.partial_cmp(&self.activity_id)
+  }
+}
+impl Ord for SendSuccessInfo {
+  fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+    other.activity_id.cmp(&self.activity_id)
+  }
+}
+pub(crate) enum SendActivityResult {
+  Success(SendSuccessInfo),
+  Failure {
+    fail_count: i32,
+    // activity_id: ActivityId,
+  },
+}
+
+pub(crate) struct SendRetryTask<'a> {
+  pub activity: &'a SentActivity,
+  pub object: &'a SharedInboxActivities,
+  /// must not be empty at this point
+  pub inbox_urls: Vec<Url>,
+  /// report to the main instance worker
+  pub report: &'a mut UnboundedSender<SendActivityResult>,
+  /// the first request is sent immediately, but the following ones are delayed according to the number of previous fails + 1
+  pub initial_fail_count: i32,
+  /// for logging
+  pub domain: String,
+  pub context: Data<LemmyContext>,
+  pub stop: CancellationToken,
+}
+
+impl<'a> SendRetryTask<'a> {
+  // this function will return successfully when (a) the send succeeded or (b) the worker was cancelled
+  // and will return an error if an internal error occurred (send failures are retried indefinitely and never cause a return)
+  pub async fn send_retry_loop(self) -> Result<()> {
+    let SendRetryTask {
+      activity,
+      object,
+      inbox_urls,
+      report,
+      initial_fail_count,
+      domain,
+      context,
+      stop,
+    } = self;
+    debug_assert!(!inbox_urls.is_empty());
+
+    let pool = &mut context.pool();
+    let Some(actor_apub_id) = &activity.actor_apub_id else {
+      return Err(anyhow::anyhow!("activity is from before lemmy 0.19"));
+    };
+    let actor = get_actor_cached(pool, activity.actor_type, actor_apub_id)
+      .await
+      .context("failed getting actor instance (was it marked deleted / removed?)")?;
+
+    let object = WithContext::new(object.clone(), FEDERATION_CONTEXT.deref().clone());
+    let requests = SendActivityTask::prepare(&object, actor.as_ref(), inbox_urls, &context).await?;
+    for task in requests {
+      // usually only one due to shared inbox
+      tracing::debug!("sending out {}", task);
+      let mut fail_count = initial_fail_count;
+      while let Err(e) = task.sign_and_send(&context).await {
+        fail_count += 1;
+        report.send(SendActivityResult::Failure {
+          fail_count,
+          // activity_id: activity.id,
+        })?;
+        let retry_delay = federate_retry_sleep_duration(fail_count);
+        tracing::info!(
+          "{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})",
+          domain,
+          activity.id,
+          fail_count
+        );
+        tokio::select! {
+          () = sleep(retry_delay) => {},
+          () = stop.cancelled() => {
+            // save state to db and exit
+            // TODO: do we need to report state here to prevent hang on exit?
+ return Ok(()); + } + } + } + } + report.send(SendActivityResult::Success(SendSuccessInfo { + activity_id: activity.id, + published: Some(activity.published), + was_skipped: false, + }))?; + Ok(()) + } +} diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index ff2a68e3c..c282e482a 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -1,126 +1,152 @@ -use crate::util::{ - get_activity_cached, - get_actor_cached, - get_latest_activity_id, - LEMMY_TEST_FAST_FEDERATION, - WORK_FINISHED_RECHECK_DELAY, -}; -use activitypub_federation::{ - activity_sending::SendActivityTask, - config::Data, - protocol::context::WithContext, +use crate::{ + inboxes::CommunityInboxCollector, + send::{SendActivityResult, SendRetryTask, SendSuccessInfo}, + util::{get_activity_cached, get_latest_activity_id, WORK_FINISHED_RECHECK_DELAY}, }; +use activitypub_federation::config::FederationConfig; use anyhow::{Context, Result}; use chrono::{DateTime, Days, TimeZone, Utc}; -use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration}; -use lemmy_apub::{activity_lists::SharedInboxActivities, FEDERATION_CONTEXT}; +use lemmy_api_common::{ + context::LemmyContext, + federate_retry_sleep_duration, + lemmy_utils::settings::structs::FederationWorkerConfig, +}; use lemmy_db_schema::{ - newtypes::{ActivityId, CommunityId, InstanceId}, + newtypes::ActivityId, source::{ - activity::SentActivity, federation_queue_state::FederationQueueState, instance::{Instance, InstanceForm}, - site::Site, }, - utils::{naive_now, DbPool}, + utils::{naive_now, ActualDbPool, DbPool}, }; -use lemmy_db_views_actor::structs::CommunityFollowerView; -use once_cell::sync::Lazy; -use reqwest::Url; -use std::{ - collections::{HashMap, HashSet}, - ops::{Add, Deref}, - time::Duration, +use std::{collections::BinaryHeap, ops::Add, time::Duration}; +use tokio::{ + sync::mpsc::{self, UnboundedSender}, + time::sleep, }; -use tokio::{sync::mpsc::UnboundedSender, time::sleep}; use tokio_util::sync::CancellationToken; -/// Check whether to save state to db every n sends if there's no failures (during failures state is saved after every attempt) -/// This determines the batch size for loop_batch. After a batch ends and SAVE_STATE_EVERY_TIME has passed, the federation_queue_state is updated in the DB. -static CHECK_SAVE_STATE_EVERY_IT: i64 = 100; /// Save state to db after this time has passed since the last state (so if the server crashes or is SIGKILLed, less than X seconds of activities are resent) static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60); -/// interval with which new additions to community_followers are queried. -/// -/// The first time some user on an instance follows a specific remote community (or, more precisely: the first time a (followed_community_id, follower_inbox_url) tuple appears), -/// this delay limits the maximum time until the follow actually results in activities from that community id being sent to that inbox url. -/// This delay currently needs to not be too small because the DB load is currently fairly high because of the current structure of storing inboxes for every person, not having a separate list of shared_inboxes, and the architecture of having every instance queue be fully separate. 
-/// (see https://github.com/LemmyNet/lemmy/issues/3958) -static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = Lazy::new(|| { - if *LEMMY_TEST_FAST_FEDERATION { - chrono::TimeDelta::try_seconds(1).expect("TimeDelta out of bounds") - } else { - chrono::TimeDelta::try_minutes(2).expect("TimeDelta out of bounds") - } -}); -/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance unfollows a specific remote community. -/// This is expected to happen pretty rarely and updating it in a timely manner is not too important. -static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy = - Lazy::new(|| chrono::TimeDelta::try_hours(1).expect("TimeDelta out of bounds")); + +/// Maximum number of successful sends to allow out of order +const MAX_SUCCESSFULS: usize = 1000; + pub(crate) struct InstanceWorker { instance: Instance, - // load site lazily because if an instance is first seen due to being on allowlist, - // the corresponding row in `site` may not exist yet since that is only added once - // `fetch_instance_actor_for_object` is called. - // (this should be unlikely to be relevant outside of the federation tests) - site_loaded: bool, - site: Option, - followed_communities: HashMap>, stop: CancellationToken, - context: Data, + federation_lib_config: FederationConfig, + federation_worker_config: FederationWorkerConfig, stats_sender: UnboundedSender<(String, FederationQueueState)>, - last_full_communities_fetch: DateTime, - last_incremental_communities_fetch: DateTime, state: FederationQueueState, last_state_insert: DateTime, + pool: ActualDbPool, + inbox_collector: CommunityInboxCollector, } impl InstanceWorker { pub(crate) async fn init_and_loop( instance: Instance, - context: Data, - pool: &mut DbPool<'_>, // in theory there's a ref to the pool in context, but i couldn't get that to work wrt lifetimes + config: FederationConfig, + federation_worker_config: FederationWorkerConfig, stop: CancellationToken, stats_sender: UnboundedSender<(String, FederationQueueState)>, ) -> Result<(), anyhow::Error> { - let state = FederationQueueState::load(pool, instance.id).await?; + let pool = config.to_request_data().inner_pool().clone(); + let state = FederationQueueState::load(&mut DbPool::Pool(&pool), instance.id).await?; let mut worker = InstanceWorker { + inbox_collector: CommunityInboxCollector::new( + pool.clone(), + instance.id, + instance.domain.clone(), + ), + federation_worker_config, instance, - site_loaded: false, - site: None, - followed_communities: HashMap::new(), stop, - context, + federation_lib_config: config, stats_sender, - last_full_communities_fetch: Utc.timestamp_nanos(0), - last_incremental_communities_fetch: Utc.timestamp_nanos(0), state, last_state_insert: Utc.timestamp_nanos(0), + pool, }; - worker.loop_until_stopped(pool).await + worker.loop_until_stopped().await } /// loop fetch new activities from db and send them to the inboxes of the given instances /// this worker only returns if (a) there is an internal error or (b) the cancellation token is cancelled (graceful exit) - pub(crate) async fn loop_until_stopped( - &mut self, - pool: &mut DbPool<'_>, - ) -> Result<(), anyhow::Error> { - let save_state_every = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative"); - - self.update_communities(pool).await?; + async fn loop_until_stopped(&mut self) -> Result<()> { self.initial_fail_sleep().await?; + let (mut last_sent_id, mut newest_id) = self.get_latest_ids().await?; + + // activities that have been successfully sent but + // that are not the 
+    // lowest number and thus can't be written to the database yet
+    let mut successfuls = BinaryHeap::<SendSuccessInfo>::new();
+    // number of activities that currently have a task spawned to send them
+    let mut in_flight: i64 = 0;
+
+    // each HTTP send will report back to this channel concurrently
+    let (report_send_result, mut receive_send_result) =
+      tokio::sync::mpsc::unbounded_channel::<SendActivityResult>();
     while !self.stop.is_cancelled() {
-      self.loop_batch(pool).await?;
-      if self.stop.is_cancelled() {
-        break;
-      }
-      if (Utc::now() - self.last_state_insert) > save_state_every {
-        self.save_and_send_state(pool).await?;
+      // check if we need to wait for a send to finish before sending the next one
+      // we wait if (a) the last request failed and a request is already in flight (i.e. not at the start of the loop),
+      // or (b) we have too many successfuls in memory, or (c) we have too many requests in flight
+      let need_wait_for_event = (in_flight != 0 && self.state.fail_count > 0)
+        || successfuls.len() >= MAX_SUCCESSFULS
+        || in_flight >= self.federation_worker_config.concurrent_sends_per_instance;
+      if need_wait_for_event || receive_send_result.len() > 4 {
+        // if len() > 0 then this does not block and allows us to write to the db more often
+        // if len is 0 then we wait for something to change the conditions above,
+        // which can only happen through an event sent into the channel
+        self
+          .handle_send_results(&mut receive_send_result, &mut successfuls, &mut in_flight)
+          .await?;
+        // handle_send_results does not guarantee that we are now in a condition where we want to send a new one,
+        // so repeat this check until the condition no longer applies
+        continue;
+      } else {
+        // send a new activity if there is one
+        self.inbox_collector.update_communities().await?;
+        let next_id_to_send = ActivityId(last_sent_id.0 + 1);
+        {
+          // sanity check: calculate the next id to send based on the last successful id and the in-flight requests
+          let last_successful_id = self
+            .state
+            .last_successful_id
+            .map(|e| e.0)
+            .expect("set above");
+          let expected_next_id = last_successful_id + (successfuls.len() as i64) + in_flight + 1;
+          // compare to the next id based on incrementing
+          if expected_next_id != next_id_to_send.0 {
+            anyhow::bail!(
+              "{}: next id to send is not as expected: {:?} != {:?}",
+              self.instance.domain,
+              expected_next_id,
+              next_id_to_send
+            )
+          }
+        }
+
+        if next_id_to_send > newest_id {
+          // lazily fetch the latest id only if we have caught up
+          newest_id = self.get_latest_ids().await?.1;
+          if next_id_to_send > newest_id {
+            // no more work to be done, wait before rechecking
+            tokio::select! {
+              () = sleep(*WORK_FINISHED_RECHECK_DELAY) => {},
+              () = self.stop.cancelled() => {}
+            }
+            continue;
+          }
+        }
+        in_flight += 1;
+        last_sent_id = next_id_to_send;
+        self
+          .spawn_send_if_needed(next_id_to_send, report_send_result.clone())
+          .await?;
       }
-      self.update_communities(pool).await?;
     }
-    // final update of state in db
-    self.save_and_send_state(pool).await?;
+    // final update of state in db on shutdown
+    self.save_and_send_state().await?;
     Ok(())
   }
 
@@ -137,6 +163,11 @@ impl InstanceWorker {
       return Ok(());
     }
    let remaining = required - elapsed;
+    tracing::debug!(
+      "{}: fail-sleeping for {:?} before starting queue",
+      self.instance.domain,
+      remaining
+    );
    tokio::select!
{ () = sleep(remaining) => {}, () = self.stop.cancelled() => {} @@ -144,213 +175,189 @@ impl InstanceWorker { } Ok(()) } - /// send out a batch of CHECK_SAVE_STATE_EVERY_IT activities - async fn loop_batch(&mut self, pool: &mut DbPool<'_>) -> Result<()> { - let latest_id = get_latest_activity_id(pool).await?; - let mut id = if let Some(id) = self.state.last_successful_id { - id + + /// return the last successfully sent id and the newest activity id in the database + /// sets last_successful_id in database if it's the first time this instance is seen + async fn get_latest_ids(&mut self) -> Result<(ActivityId, ActivityId)> { + let latest_id = get_latest_activity_id(&mut self.pool()).await?; + if let Some(last) = self.state.last_successful_id { + Ok((last, latest_id)) } else { // this is the initial creation (instance first seen) of the federation queue for this instance // skip all past activities: self.state.last_successful_id = Some(latest_id); // save here to ensure it's not read as 0 again later if no activities have happened - self.save_and_send_state(pool).await?; - latest_id - }; - if id >= latest_id { - // no more work to be done, wait before rechecking - tokio::select! { - () = sleep(*WORK_FINISHED_RECHECK_DELAY) => {}, - () = self.stop.cancelled() => {} - } - return Ok(()); + self.save_and_send_state().await?; + Ok((latest_id, latest_id)) } - let mut processed_activities = 0; - while id < latest_id - && processed_activities < CHECK_SAVE_STATE_EVERY_IT - && !self.stop.is_cancelled() - { - id = ActivityId(id.0 + 1); - processed_activities += 1; - let Some(ele) = get_activity_cached(pool, id) - .await - .context("failed reading activity from db")? - else { - tracing::debug!("{}: {:?} does not exist", self.instance.domain, id); - self.state.last_successful_id = Some(id); - continue; - }; - if let Err(e) = self.send_retry_loop(pool, &ele.0, &ele.1).await { - tracing::warn!( - "sending {} errored internally, skipping activity: {:?}", - ele.0.ap_id, - e - ); - } - if self.stop.is_cancelled() { - return Ok(()); + } + + async fn handle_send_results( + &mut self, + receive_inbox_result: &mut mpsc::UnboundedReceiver, + successfuls: &mut BinaryHeap, + in_flight: &mut i64, + ) -> Result<(), anyhow::Error> { + let mut force_write = false; + let mut events = Vec::new(); + // wait for at least one event but if there's multiple handle them all + receive_inbox_result.recv_many(&mut events, 1000).await; + for event in events { + match event { + SendActivityResult::Success(s) => { + self.state.fail_count = 0; + *in_flight -= 1; + if !s.was_skipped { + self.mark_instance_alive().await?; + } + successfuls.push(s); + } + SendActivityResult::Failure { fail_count, .. } => { + if fail_count > self.state.fail_count { + // override fail count - if multiple activities are currently sending this value may get conflicting info but that's fine + self.state.fail_count = fail_count; + self.state.last_retry = Some(Utc::now()); + force_write = true; + } + } } - // send success! - self.state.last_successful_id = Some(id); - self.state.last_successful_published_time = Some(ele.0.published); - self.state.fail_count = 0; } + self + .pop_successfuls_and_write(successfuls, force_write) + .await?; Ok(()) } + async fn mark_instance_alive(&mut self) -> Result<()> { + // Activity send successful, mark instance as alive if it hasn't been updated in a while. 
+ let updated = self.instance.updated.unwrap_or(self.instance.published); + if updated.add(Days::new(1)) < Utc::now() { + self.instance.updated = Some(Utc::now()); - // this function will return successfully when (a) send succeeded or (b) worker cancelled - // and will return an error if an internal error occurred (send errors cause an infinite loop) - async fn send_retry_loop( + let form = InstanceForm::builder() + .domain(self.instance.domain.clone()) + .updated(Some(naive_now())) + .build(); + Instance::update(&mut self.pool(), self.instance.id, form).await?; + } + Ok(()) + } + /// Checks that sequential activities `last_successful_id + 1`, `last_successful_id + 2` etc have been sent successfully. + /// In that case updates `last_successful_id` and saves the state to the database if the time since the last save is greater than `SAVE_STATE_EVERY_TIME`. + async fn pop_successfuls_and_write( &mut self, - pool: &mut DbPool<'_>, - activity: &SentActivity, - object: &SharedInboxActivities, + successfuls: &mut BinaryHeap, + force_write: bool, ) -> Result<()> { - let inbox_urls = self - .get_inbox_urls(pool, activity) - .await - .context("failed figuring out inbox urls")?; - if inbox_urls.is_empty() { - tracing::debug!("{}: {:?} no inboxes", self.instance.domain, activity.id); - self.state.last_successful_id = Some(activity.id); - self.state.last_successful_published_time = Some(activity.published); + let Some(mut last_id) = self.state.last_successful_id else { + tracing::warn!("{} should be impossible: last successful id is None", self.instance.domain); return Ok(()); - } - let Some(actor_apub_id) = &activity.actor_apub_id else { - return Ok(()); // activity was inserted before persistent queue was activated }; - let actor = get_actor_cached(pool, activity.actor_type, actor_apub_id) - .await - .context("failed getting actor instance (was it marked deleted / removed?)")?; - - let object = WithContext::new(object.clone(), FEDERATION_CONTEXT.deref().clone()); - let inbox_urls = inbox_urls.into_iter().collect(); - let requests = - SendActivityTask::prepare(&object, actor.as_ref(), inbox_urls, &self.context).await?; - for task in requests { - // usually only one due to shared inbox - tracing::debug!("sending out {}", task); - while let Err(e) = task.sign_and_send(&self.context).await { - self.state.fail_count += 1; - self.state.last_retry = Some(Utc::now()); - let retry_delay: Duration = federate_retry_sleep_duration(self.state.fail_count); - tracing::info!( - "{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})", - self.instance.domain, - activity.id, - self.state.fail_count - ); - self.save_and_send_state(pool).await?; - tokio::select! { - () = sleep(retry_delay) => {}, - () = self.stop.cancelled() => { - // save state to db and exit - return Ok(()); - } - } - } - - // Activity send successful, mark instance as alive if it hasn't been updated in a while. 
- let updated = self.instance.updated.unwrap_or(self.instance.published); - if updated.add(Days::new(1)) < Utc::now() { - self.instance.updated = Some(Utc::now()); + tracing::debug!( + "{} last: {:?}, next: {:?}, currently in successfuls: {:?}", + self.instance.domain, + last_id, + successfuls.peek(), + successfuls.iter() + ); + while successfuls + .peek() + .map(|a| &a.activity_id == &ActivityId(last_id.0 + 1)) + .unwrap_or(false) + { + let next = successfuls.pop().unwrap(); + last_id = next.activity_id; + self.state.last_successful_id = Some(next.activity_id); + self.state.last_successful_published_time = next.published; + } - let form = InstanceForm::builder() - .domain(self.instance.domain.clone()) - .updated(Some(naive_now())) - .build(); - Instance::update(pool, self.instance.id, form).await?; - } + let save_state_every = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative"); + if force_write || (Utc::now() - self.last_state_insert) > save_state_every { + self.save_and_send_state().await?; } Ok(()) } - /// get inbox urls of sending the given activity to the given instance - /// most often this will return 0 values (if instance doesn't care about the activity) - /// or 1 value (the shared inbox) - /// > 1 values only happens for non-lemmy software - async fn get_inbox_urls( + /// we collect the relevant inboxes in the main instance worker task, and only spawn the send task if we have inboxes to send to + /// this limits CPU usage and reduces overhead for the (many) cases where we don't have any inboxes + async fn spawn_send_if_needed( &mut self, - pool: &mut DbPool<'_>, - activity: &SentActivity, - ) -> Result> { - let mut inbox_urls: HashSet = HashSet::new(); - - if activity.send_all_instances { - if !self.site_loaded { - self.site = Site::read_from_instance_id(pool, self.instance.id).await?; - self.site_loaded = true; - } - if let Some(site) = &self.site { - // Nutomic: Most non-lemmy software wont have a site row. That means it cant handle these activities. So handling it like this is fine. - inbox_urls.insert(site.inbox_url.inner().clone()); - } + activity_id: ActivityId, + report: UnboundedSender, + ) -> Result<()> { + let Some(ele) = get_activity_cached(&mut self.pool(), activity_id) + .await + .context("failed reading activity from db")? 
+ else { + tracing::debug!("{}: {:?} does not exist", self.instance.domain, activity_id); + report.send(SendActivityResult::Success(SendSuccessInfo { + activity_id, + published: None, + was_skipped: true, + }))?; + return Ok(()); + }; + let activity = &ele.0; + let inbox_urls = self + .inbox_collector + .get_inbox_urls(activity) + .await + .context("failed figuring out inbox urls")?; + if inbox_urls.is_empty() { + tracing::debug!("{}: {:?} no inboxes", self.instance.domain, activity.id); + report.send(SendActivityResult::Success(SendSuccessInfo { + activity_id, + published: Some(activity.published), + was_skipped: true, + }))?; + return Ok(()); } - if let Some(t) = &activity.send_community_followers_of { - if let Some(urls) = self.followed_communities.get(t) { - inbox_urls.extend(urls.iter().cloned()); + let initial_fail_count = self.state.fail_count; + let data = self.federation_lib_config.to_request_data(); + let stop = self.stop.clone(); + let domain = self.instance.domain.clone(); + tokio::spawn(async move { + let mut report = report; + let res = SendRetryTask { + activity: &ele.0, + object: &ele.1, + inbox_urls, + report: &mut report, + initial_fail_count, + domain, + context: data, + stop, } - } - inbox_urls.extend( - activity - .send_inboxes - .iter() - .filter_map(std::option::Option::as_ref) - .filter(|&u| (u.domain() == Some(&self.instance.domain))) - .map(|u| u.inner().clone()), - ); - Ok(inbox_urls) - } - - async fn update_communities(&mut self, pool: &mut DbPool<'_>) -> Result<()> { - if (Utc::now() - self.last_full_communities_fetch) > *FOLLOW_REMOVALS_RECHECK_DELAY { - // process removals every hour - (self.followed_communities, self.last_full_communities_fetch) = self - .get_communities(pool, self.instance.id, Utc.timestamp_nanos(0)) - .await?; - self.last_incremental_communities_fetch = self.last_full_communities_fetch; - } - if (Utc::now() - self.last_incremental_communities_fetch) > *FOLLOW_ADDITIONS_RECHECK_DELAY { - // process additions every minute - let (news, time) = self - .get_communities( - pool, - self.instance.id, - self.last_incremental_communities_fetch, - ) - .await?; - self.followed_communities.extend(news); - self.last_incremental_communities_fetch = time; - } + .send_retry_loop() + .await; + if let Err(e) = res { + tracing::warn!( + "sending {} errored internally, skipping activity: {:?}", + ele.0.ap_id, + e + ); + report + .send(SendActivityResult::Success(SendSuccessInfo { + activity_id, + published: None, + was_skipped: true, + })) + .ok(); + } + }); Ok(()) } - /// get a list of local communities with the remote inboxes on the given instance that cares about them - async fn get_communities( - &mut self, - pool: &mut DbPool<'_>, - instance_id: InstanceId, - last_fetch: DateTime, - ) -> Result<(HashMap>, DateTime)> { - let new_last_fetch = - Utc::now() - chrono::TimeDelta::try_seconds(10).expect("TimeDelta out of bounds"); // update to time before fetch to ensure overlap. subtract 10s to ensure overlap even if published date is not exact - Ok(( - CommunityFollowerView::get_instance_followed_community_inboxes(pool, instance_id, last_fetch) - .await? 
- .into_iter() - .fold(HashMap::new(), |mut map, (c, u)| { - map.entry(c).or_default().insert(u.into()); - map - }), - new_last_fetch, - )) - } - async fn save_and_send_state(&mut self, pool: &mut DbPool<'_>) -> Result<()> { + async fn save_and_send_state(&mut self) -> Result<()> { + tracing::debug!("{}: saving and sending state", self.instance.domain); self.last_state_insert = Utc::now(); - FederationQueueState::upsert(pool, &self.state).await?; + FederationQueueState::upsert(&mut self.pool(), &self.state).await?; self .stats_sender .send((self.instance.domain.clone(), self.state.clone()))?; Ok(()) } + + fn pool(&self) -> DbPool<'_> { + DbPool::Pool(&self.pool) + } } diff --git a/crates/utils/src/settings/structs.rs b/crates/utils/src/settings/structs.rs index 91a5b37f4..10f1d7f05 100644 --- a/crates/utils/src/settings/structs.rs +++ b/crates/utils/src/settings/structs.rs @@ -42,12 +42,8 @@ pub struct Settings { #[default(None)] #[doku(skip)] pub opentelemetry_url: Option, - /// The number of activitypub federation workers that can be in-flight concurrently - #[default(0)] - pub worker_count: usize, - /// The number of activitypub federation retry workers that can be in-flight concurrently - #[default(0)] - pub retry_count: usize, + #[default(Default::default())] + pub federation: FederationWorkerConfig, // Prometheus configuration. #[default(None)] #[doku(example = "Some(Default::default())")] @@ -234,3 +230,13 @@ pub struct PrometheusConfig { #[doku(example = "10002")] pub port: i32, } + +#[derive(Debug, Deserialize, Serialize, Clone, SmartDefault, Document)] +#[serde(default)] +// named federation"worker"config to disambiguate from the activitypub library configuration +pub struct FederationWorkerConfig { + /// Limit to the number of concurrent outgoing federation requests per target instance. + /// Set this to a higher value than 1 (e.g. 6) only if you have a huge instance (>10 activities per second) and if a receiving instance is not keeping up. + #[default(1)] + pub concurrent_sends_per_instance: i64, +} diff --git a/src/lib.rs b/src/lib.rs index 633fd5313..0666efc9a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -213,6 +213,7 @@ pub async fn start_lemmy_server(args: CmdArgs) -> LemmyResult<()> { }, pool.clone(), federation_config.clone(), + SETTINGS.federation.clone(), ) }); let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?;
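
Note on the ordering trick in send.rs: std's BinaryHeap is a max-heap, so SendSuccessInfo compares `other` against `self` to make the heap yield the smallest activity id first, which is what pop_successfuls_and_write relies on to advance last_successful_id strictly in sequence. The following standalone sketch (not part of the patch; `Entry` is a simplified stand-in for SendSuccessInfo keyed by its activity id) illustrates the pattern:

use std::collections::BinaryHeap;

// Simplified stand-in for SendSuccessInfo, keyed by an i64 activity id.
#[derive(Debug, PartialEq, Eq)]
struct Entry(i64);

impl PartialOrd for Entry {
  fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
    Some(self.cmp(other))
  }
}
impl Ord for Entry {
  fn cmp(&self, other: &Self) -> std::cmp::Ordering {
    // reversed on purpose: the smallest id compares as "greatest",
    // so BinaryHeap::peek()/pop() return the lowest id first
    other.0.cmp(&self.0)
  }
}

fn main() {
  let mut heap = BinaryHeap::new();
  heap.push(Entry(3));
  heap.push(Entry(1));
  heap.push(Entry(2));
  // ids come out in ascending order, mirroring how the worker pops
  // consecutive successful sends to advance last_successful_id
  assert_eq!(heap.pop(), Some(Entry(1)));
  assert_eq!(heap.pop(), Some(Entry(2)));
  assert_eq!(heap.pop(), Some(Entry(3)));
}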