From 539f06af977b9ced7880870920dfc4099050fdcf Mon Sep 17 00:00:00 2001 From: phiresky Date: Sat, 13 Apr 2024 23:18:28 +0200 Subject: [PATCH 1/9] federation: parallel sending --- Cargo.lock | 4 +- Cargo.toml | 2 +- crates/federate/src/lib.rs | 7 +- crates/federate/src/worker.rs | 424 ++++++++++++++++++++++++---------- 4 files changed, 303 insertions(+), 134 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 97504600b..979e9af10 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5439,9 +5439,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.36.0" +version = "1.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" +checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" dependencies = [ "backtrace", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 4d9485ca5..6737d3b9c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -139,7 +139,7 @@ anyhow = { version = "1.0.81", features = [ diesel_ltree = "0.3.1" typed-builder = "0.18.1" serial_test = "2.0.0" -tokio = { version = "1.36.0", features = ["full"] } +tokio = { version = "1.37.0", features = ["full"] } regex = "1.10.3" once_cell = "1.19.0" diesel-derive-newtype = "2.1.0" diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index a4dc49536..a48f31ed9 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -73,19 +73,16 @@ async fn start_stop_federation_workers( // create new worker let config = federation_config.clone(); let stats_sender = stats_sender.clone(); - let pool = pool.clone(); workers.insert( instance.id, CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| { let instance = instance.clone(); - let req_data = config.clone().to_request_data(); + let config = config.clone(); let stats_sender = stats_sender.clone(); - let pool = pool.clone(); async move { InstanceWorker::init_and_loop( instance, - req_data, - &mut DbPool::Pool(&pool), + config, stop, stats_sender, ) diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index ff2a68e3c..0dbf1c391 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -7,7 +7,7 @@ use crate::util::{ }; use activitypub_federation::{ activity_sending::SendActivityTask, - config::Data, + config::{Data, FederationConfig}, protocol::context::WithContext, }; use anyhow::{Context, Result}; @@ -22,22 +22,22 @@ use lemmy_db_schema::{ instance::{Instance, InstanceForm}, site::Site, }, - utils::{naive_now, DbPool}, + utils::{naive_now, ActualDbPool, DbPool}, }; use lemmy_db_views_actor::structs::CommunityFollowerView; use once_cell::sync::Lazy; use reqwest::Url; use std::{ - collections::{HashMap, HashSet}, + collections::{BinaryHeap, HashMap, HashSet}, ops::{Add, Deref}, time::Duration, }; -use tokio::{sync::mpsc::UnboundedSender, time::sleep}; +use tokio::{ + sync::mpsc::{self, UnboundedSender}, + time::sleep, +}; use tokio_util::sync::CancellationToken; -/// Check whether to save state to db every n sends if there's no failures (during failures state is saved after every attempt) -/// This determines the batch size for loop_batch. After a batch ends and SAVE_STATE_EVERY_TIME has passed, the federation_queue_state is updated in the DB. 
-static CHECK_SAVE_STATE_EVERY_IT: i64 = 100; /// Save state to db after this time has passed since the last state (so if the server crashes or is SIGKILLed, less than X seconds of activities are resent) static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60); /// interval with which new additions to community_followers are queried. @@ -57,6 +57,16 @@ static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = Lazy::new(|| { /// This is expected to happen pretty rarely and updating it in a timely manner is not too important. static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy = Lazy::new(|| chrono::TimeDelta::try_hours(1).expect("TimeDelta out of bounds")); + +static CONCURRENT_SENDS: Lazy = Lazy::new(|| { + std::env::var("LEMMY_FEDERATION_CONCURRENT_SENDS_PER_INSTANCE") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(8) +}); +/// Maximum number of successful sends to allow out of order +const MAX_SUCCESSFULS: usize = 1000; + pub(crate) struct InstanceWorker { instance: Instance, // load site lazily because if an instance is first seen due to being on allowlist, @@ -67,60 +77,116 @@ pub(crate) struct InstanceWorker { site: Option, followed_communities: HashMap>, stop: CancellationToken, - context: Data, + config: FederationConfig, stats_sender: UnboundedSender<(String, FederationQueueState)>, last_full_communities_fetch: DateTime, last_incremental_communities_fetch: DateTime, state: FederationQueueState, last_state_insert: DateTime, + pool: ActualDbPool, +} + +#[derive(Debug, PartialEq, Eq)] +struct SendSuccessInfo { + activity_id: ActivityId, + published: Option>, + was_skipped: bool, +} +impl PartialOrd for SendSuccessInfo { + fn partial_cmp(&self, other: &Self) -> Option { + other.activity_id.partial_cmp(&self.activity_id) + } +} +impl Ord for SendSuccessInfo { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + other.activity_id.cmp(&self.activity_id) + } + +} +enum SendActivityResult { + Success(SendSuccessInfo), + Failure { + fail_count: i32, + // activity_id: ActivityId, + }, } impl InstanceWorker { pub(crate) async fn init_and_loop( instance: Instance, - context: Data, - pool: &mut DbPool<'_>, // in theory there's a ref to the pool in context, but i couldn't get that to work wrt lifetimes + config: FederationConfig, + // pool: ActualDbPool, // in theory there's a ref to the pool in context, but i couldn't get that to work wrt lifetimes stop: CancellationToken, stats_sender: UnboundedSender<(String, FederationQueueState)>, ) -> Result<(), anyhow::Error> { - let state = FederationQueueState::load(pool, instance.id).await?; + let state = + FederationQueueState::load(&mut config.to_request_data().pool(), instance.id).await?; + let pool = config.to_request_data().inner_pool().clone(); let mut worker = InstanceWorker { instance, site_loaded: false, site: None, followed_communities: HashMap::new(), stop, - context, + config, stats_sender, last_full_communities_fetch: Utc.timestamp_nanos(0), last_incremental_communities_fetch: Utc.timestamp_nanos(0), state, last_state_insert: Utc.timestamp_nanos(0), + pool, }; - worker.loop_until_stopped(pool).await + worker.loop_until_stopped().await } /// loop fetch new activities from db and send them to the inboxes of the given instances /// this worker only returns if (a) there is an internal error or (b) the cancellation token is cancelled (graceful exit) - pub(crate) async fn loop_until_stopped( - &mut self, - pool: &mut DbPool<'_>, - ) -> Result<(), anyhow::Error> { - let save_state_every = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not 
negative"); - - self.update_communities(pool).await?; + async fn loop_until_stopped(&mut self) -> Result<()> { self.initial_fail_sleep().await?; + let mut latest_id = self.get_latest_id().await?; + + // activities that have been successfully sent but + // that are not the lowest number and thus can't be written to the database yet + let mut successfuls = BinaryHeap::::new(); + let mut in_flight: i64 = 0; + + let (report_inbox_result, mut receive_inbox_result) = + tokio::sync::mpsc::unbounded_channel::(); while !self.stop.is_cancelled() { - self.loop_batch(pool).await?; - if self.stop.is_cancelled() { - break; - } - if (Utc::now() - self.last_state_insert) > save_state_every { - self.save_and_send_state(pool).await?; + // check if we need to wait for a send to finish before sending the next one + let need_wait_for_event = (in_flight != 0 && self.state.fail_count > 0) + || successfuls.len() > MAX_SUCCESSFULS + || in_flight >= *CONCURRENT_SENDS; + if need_wait_for_event || receive_inbox_result.len() > 4 { + self + .handle_send_results(&mut receive_inbox_result, &mut successfuls, &mut in_flight) + .await?; + } else { + self.update_communities().await?; + let last_successful_id = self + .state + .last_successful_id + .map(|e| e.0) + .expect("set above"); + let next_id = ActivityId(last_successful_id + (successfuls.len() as i64) + in_flight + 1); + if next_id > latest_id { + latest_id = self.get_latest_id().await?; + if next_id > latest_id { + // no more work to be done, wait before rechecking + tokio::select! { + () = sleep(*WORK_FINISHED_RECHECK_DELAY) => {}, + () = self.stop.cancelled() => {} + } + continue; + } + } + in_flight += 1; + self + .spawn_send_if_needed(next_id, report_inbox_result.clone()) + .await?; } - self.update_communities(pool).await?; } - // final update of state in db - self.save_and_send_state(pool).await?; + // final update of state in db on shutdown + self.save_and_send_state().await?; Ok(()) } @@ -137,6 +203,11 @@ impl InstanceWorker { return Ok(()); } let remaining = required - elapsed; + tracing::debug!( + "{}: fail-sleeping for {:?} before starting queue", + self.instance.domain, + remaining + ); tokio::select! { () = sleep(remaining) => {}, () = self.stop.cancelled() => {} @@ -144,78 +215,174 @@ impl InstanceWorker { } Ok(()) } - /// send out a batch of CHECK_SAVE_STATE_EVERY_IT activities - async fn loop_batch(&mut self, pool: &mut DbPool<'_>) -> Result<()> { - let latest_id = get_latest_activity_id(pool).await?; - let mut id = if let Some(id) = self.state.last_successful_id { - id - } else { + + /// get newest activity id and set it as last_successful_id if it's the first time this instance is seen + async fn get_latest_id(&mut self) -> Result { + let latest_id = get_latest_activity_id(&mut self.pool()).await?; + if let None = self.state.last_successful_id { // this is the initial creation (instance first seen) of the federation queue for this instance // skip all past activities: self.state.last_successful_id = Some(latest_id); // save here to ensure it's not read as 0 again later if no activities have happened - self.save_and_send_state(pool).await?; - latest_id - }; - if id >= latest_id { - // no more work to be done, wait before rechecking - tokio::select! 
{ - () = sleep(*WORK_FINISHED_RECHECK_DELAY) => {}, - () = self.stop.cancelled() => {} + self.save_and_send_state().await?; + } + Ok(latest_id) + } + + async fn handle_send_results( + &mut self, + receive_inbox_result: &mut mpsc::UnboundedReceiver, + successfuls: &mut BinaryHeap, + in_flight: &mut i64, + ) -> Result<(), anyhow::Error> { + let force_write = false; + let mut events = Vec::new(); + // wait for at least one event but if there's multiple handle them all + receive_inbox_result.recv_many(&mut events, 1000).await; + for event in events { + match event { + SendActivityResult::Success(s) => { + self.state.fail_count = 0; + *in_flight -= 1; + if !s.was_skipped { + self.mark_instance_alive().await?; + } + successfuls.push(s); + } + SendActivityResult::Failure { fail_count, .. } => { + if fail_count > self.state.fail_count { + // override fail count - if multiple activities are currently sending this value may get conflicting info but that's fine + self.state.fail_count = fail_count; + self.state.last_retry = Some(Utc::now()); + } + } } - return Ok(()); } - let mut processed_activities = 0; - while id < latest_id - && processed_activities < CHECK_SAVE_STATE_EVERY_IT - && !self.stop.is_cancelled() + self + .pop_successfuls_and_write(successfuls, force_write) + .await?; + Ok(()) + } + async fn mark_instance_alive(&mut self) -> Result<()> { + // Activity send successful, mark instance as alive if it hasn't been updated in a while. + let updated = self.instance.updated.unwrap_or(self.instance.published); + if updated.add(Days::new(1)) < Utc::now() { + self.instance.updated = Some(Utc::now()); + + let form = InstanceForm::builder() + .domain(self.instance.domain.clone()) + .updated(Some(naive_now())) + .build(); + Instance::update(&mut self.pool(), self.instance.id, form).await?; + } + Ok(()) + } + /// checks whether the highest successful id can be updated and writes to db if so + async fn pop_successfuls_and_write( + &mut self, + successfuls: &mut BinaryHeap, + force_write: bool, + ) -> Result<()> { + let Some(mut last_id) = self.state.last_successful_id else { + tracing::warn!("should be impossible: last successful id is None"); + return Ok(()); + }; + tracing::debug!( + "last: {:?}, next: {:?}, currently in successfuls: {:?}", + last_id, + successfuls.peek(), + successfuls.iter() + ); + while successfuls + .peek() + .map(|a| &a.activity_id == &ActivityId(last_id.0 + 1)) + .unwrap_or(false) { - id = ActivityId(id.0 + 1); - processed_activities += 1; - let Some(ele) = get_activity_cached(pool, id) - .await - .context("failed reading activity from db")? - else { - tracing::debug!("{}: {:?} does not exist", self.instance.domain, id); - self.state.last_successful_id = Some(id); - continue; - }; - if let Err(e) = self.send_retry_loop(pool, &ele.0, &ele.1).await { + let next = successfuls.pop().unwrap(); + last_id = next.activity_id; + self.state.last_successful_id = Some(next.activity_id); + self.state.last_successful_published_time = next.published; + } + + let save_state_every = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative"); + if force_write || (Utc::now() - self.last_state_insert) > save_state_every { + self.save_and_send_state().await?; + } + Ok(()) + } + + async fn spawn_send_if_needed( + &mut self, + activity_id: ActivityId, + report: UnboundedSender, + ) -> Result<()> { + let Some(ele) = get_activity_cached(&mut self.pool(), activity_id) + .await + .context("failed reading activity from db")? 
+ else { + tracing::debug!("{}: {:?} does not exist", self.instance.domain, activity_id); + report.send(SendActivityResult::Success(SendSuccessInfo { + activity_id, + published: None, + was_skipped: true, + }))?; + return Ok(()); + }; + let activity = &ele.0; + let inbox_urls = self + .get_inbox_urls(activity) + .await + .context("failed figuring out inbox urls")?; + if inbox_urls.is_empty() { + tracing::debug!("{}: {:?} no inboxes", self.instance.domain, activity.id); + report.send(SendActivityResult::Success(SendSuccessInfo { + activity_id, + published: Some(activity.published), + was_skipped: true, + }))?; + return Ok(()); + } + let inbox_urls = inbox_urls.into_iter().collect(); + let initial_fail_count = self.state.fail_count; + let data = self.config.to_request_data(); + let stop = self.stop.clone(); + let domain = self.instance.domain.clone(); + tokio::spawn(async move { + if let Err(e) = InstanceWorker::send_retry_loop( + &ele.0, + &ele.1, + inbox_urls, + report, + initial_fail_count, + domain, + data, + stop, + ) + .await + { tracing::warn!( "sending {} errored internally, skipping activity: {:?}", ele.0.ap_id, e ); } - if self.stop.is_cancelled() { - return Ok(()); - } - // send success! - self.state.last_successful_id = Some(id); - self.state.last_successful_published_time = Some(ele.0.published); - self.state.fail_count = 0; - } + }); Ok(()) } // this function will return successfully when (a) send succeeded or (b) worker cancelled // and will return an error if an internal error occurred (send errors cause an infinite loop) async fn send_retry_loop( - &mut self, - pool: &mut DbPool<'_>, activity: &SentActivity, object: &SharedInboxActivities, + inbox_urls: Vec, + report: UnboundedSender, + initial_fail_count: i32, + domain: String, + context: Data, + stop: CancellationToken, ) -> Result<()> { - let inbox_urls = self - .get_inbox_urls(pool, activity) - .await - .context("failed figuring out inbox urls")?; - if inbox_urls.is_empty() { - tracing::debug!("{}: {:?} no inboxes", self.instance.domain, activity.id); - self.state.last_successful_id = Some(activity.id); - self.state.last_successful_published_time = Some(activity.published); - return Ok(()); - } + let pool = &mut context.pool(); let Some(actor_apub_id) = &activity.actor_apub_id else { return Ok(()); // activity was inserted before persistent queue was activated }; @@ -224,61 +391,50 @@ impl InstanceWorker { .context("failed getting actor instance (was it marked deleted / removed?)")?; let object = WithContext::new(object.clone(), FEDERATION_CONTEXT.deref().clone()); - let inbox_urls = inbox_urls.into_iter().collect(); - let requests = - SendActivityTask::prepare(&object, actor.as_ref(), inbox_urls, &self.context).await?; + let requests = SendActivityTask::prepare(&object, actor.as_ref(), inbox_urls, &context).await?; for task in requests { // usually only one due to shared inbox tracing::debug!("sending out {}", task); - while let Err(e) = task.sign_and_send(&self.context).await { - self.state.fail_count += 1; - self.state.last_retry = Some(Utc::now()); - let retry_delay: Duration = federate_retry_sleep_duration(self.state.fail_count); + let mut fail_count = initial_fail_count; + while let Err(e) = task.sign_and_send(&context).await { + fail_count += 1; + report.send(SendActivityResult::Failure { + fail_count, + // activity_id: activity.id, + })?; + let retry_delay: Duration = federate_retry_sleep_duration(fail_count); tracing::info!( "{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. 
({e})", - self.instance.domain, + domain, activity.id, - self.state.fail_count + fail_count ); - self.save_and_send_state(pool).await?; tokio::select! { () = sleep(retry_delay) => {}, - () = self.stop.cancelled() => { + () = stop.cancelled() => { // save state to db and exit return Ok(()); } } } - - // Activity send successful, mark instance as alive if it hasn't been updated in a while. - let updated = self.instance.updated.unwrap_or(self.instance.published); - if updated.add(Days::new(1)) < Utc::now() { - self.instance.updated = Some(Utc::now()); - - let form = InstanceForm::builder() - .domain(self.instance.domain.clone()) - .updated(Some(naive_now())) - .build(); - Instance::update(pool, self.instance.id, form).await?; - } } + report.send(SendActivityResult::Success(SendSuccessInfo { + activity_id: activity.id, + published: Some(activity.published), + was_skipped: false, + }))?; Ok(()) } - /// get inbox urls of sending the given activity to the given instance /// most often this will return 0 values (if instance doesn't care about the activity) /// or 1 value (the shared inbox) /// > 1 values only happens for non-lemmy software - async fn get_inbox_urls( - &mut self, - pool: &mut DbPool<'_>, - activity: &SentActivity, - ) -> Result> { + async fn get_inbox_urls(&mut self, activity: &SentActivity) -> Result> { let mut inbox_urls: HashSet = HashSet::new(); if activity.send_all_instances { if !self.site_loaded { - self.site = Site::read_from_instance_id(pool, self.instance.id).await?; + self.site = Site::read_from_instance_id(&mut self.pool(), self.instance.id).await?; self.site_loaded = true; } if let Some(site) = &self.site { @@ -302,23 +458,30 @@ impl InstanceWorker { Ok(inbox_urls) } - async fn update_communities(&mut self, pool: &mut DbPool<'_>) -> Result<()> { + async fn update_communities(&mut self) -> Result<()> { if (Utc::now() - self.last_full_communities_fetch) > *FOLLOW_REMOVALS_RECHECK_DELAY { + tracing::debug!( + "{}: fetching full list of communities", + self.instance.domain + ); // process removals every hour (self.followed_communities, self.last_full_communities_fetch) = self - .get_communities(pool, self.instance.id, Utc.timestamp_nanos(0)) + .get_communities(self.instance.id, Utc.timestamp_nanos(0)) .await?; self.last_incremental_communities_fetch = self.last_full_communities_fetch; } if (Utc::now() - self.last_incremental_communities_fetch) > *FOLLOW_ADDITIONS_RECHECK_DELAY { // process additions every minute let (news, time) = self - .get_communities( - pool, - self.instance.id, - self.last_incremental_communities_fetch, - ) + .get_communities(self.instance.id, self.last_incremental_communities_fetch) .await?; + if !news.is_empty() { + tracing::debug!( + "{}: fetched {} incremental new followed communities", + self.instance.domain, + news.len() + ); + } self.followed_communities.extend(news); self.last_incremental_communities_fetch = time; } @@ -328,29 +491,38 @@ impl InstanceWorker { /// get a list of local communities with the remote inboxes on the given instance that cares about them async fn get_communities( &mut self, - pool: &mut DbPool<'_>, instance_id: InstanceId, last_fetch: DateTime, ) -> Result<(HashMap>, DateTime)> { let new_last_fetch = Utc::now() - chrono::TimeDelta::try_seconds(10).expect("TimeDelta out of bounds"); // update to time before fetch to ensure overlap. subtract 10s to ensure overlap even if published date is not exact Ok(( - CommunityFollowerView::get_instance_followed_community_inboxes(pool, instance_id, last_fetch) - .await? 
- .into_iter() - .fold(HashMap::new(), |mut map, (c, u)| { - map.entry(c).or_default().insert(u.into()); - map - }), + CommunityFollowerView::get_instance_followed_community_inboxes( + &mut self.pool(), + instance_id, + last_fetch, + ) + .await? + .into_iter() + .fold(HashMap::new(), |mut map, (c, u)| { + map.entry(c).or_default().insert(u.into()); + map + }), new_last_fetch, )) } - async fn save_and_send_state(&mut self, pool: &mut DbPool<'_>) -> Result<()> { + async fn save_and_send_state(&mut self) -> Result<()> { + tracing::debug!("{}: saving and sending state", self.instance.domain); self.last_state_insert = Utc::now(); - FederationQueueState::upsert(pool, &self.state).await?; + FederationQueueState::upsert(&mut self.pool(), &self.state).await?; self .stats_sender .send((self.instance.domain.clone(), self.state.clone()))?; Ok(()) } + + fn pool(&self) -> DbPool<'_> { + //self.config.to_request_data() + DbPool::Pool(&self.pool) + } } From 491daabaf2c1eeeb59857433a41a1c484b384148 Mon Sep 17 00:00:00 2001 From: phiresky Date: Sat, 13 Apr 2024 23:48:32 +0200 Subject: [PATCH 2/9] federation: some comments --- crates/federate/src/worker.rs | 67 ++++++++++++++++++++++------------- 1 file changed, 43 insertions(+), 24 deletions(-) diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 0dbf1c391..58896750a 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -93,15 +93,14 @@ struct SendSuccessInfo { was_skipped: bool, } impl PartialOrd for SendSuccessInfo { - fn partial_cmp(&self, other: &Self) -> Option { - other.activity_id.partial_cmp(&self.activity_id) - } + fn partial_cmp(&self, other: &Self) -> Option { + other.activity_id.partial_cmp(&self.activity_id) + } } impl Ord for SendSuccessInfo { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - other.activity_id.cmp(&self.activity_id) - } - + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + other.activity_id.cmp(&self.activity_id) + } } enum SendActivityResult { Success(SendSuccessInfo), @@ -115,13 +114,11 @@ impl InstanceWorker { pub(crate) async fn init_and_loop( instance: Instance, config: FederationConfig, - // pool: ActualDbPool, // in theory there's a ref to the pool in context, but i couldn't get that to work wrt lifetimes stop: CancellationToken, stats_sender: UnboundedSender<(String, FederationQueueState)>, ) -> Result<(), anyhow::Error> { - let state = - FederationQueueState::load(&mut config.to_request_data().pool(), instance.id).await?; let pool = config.to_request_data().inner_pool().clone(); + let state = FederationQueueState::load(&mut DbPool::Pool(&pool), instance.id).await?; let mut worker = InstanceWorker { instance, site_loaded: false, @@ -147,28 +144,43 @@ impl InstanceWorker { // activities that have been successfully sent but // that are not the lowest number and thus can't be written to the database yet let mut successfuls = BinaryHeap::::new(); + // number of activities that currently have a task spawned to send it let mut in_flight: i64 = 0; - let (report_inbox_result, mut receive_inbox_result) = + // each HTTP send will report back to this channel concurrently + let (report_send_result, mut receive_send_result) = tokio::sync::mpsc::unbounded_channel::(); while !self.stop.is_cancelled() { // check if we need to wait for a send to finish before sending the next one + // we wait if (a) the last request failed, only if a request is already in flight (not at the start of the loop) + // or (b) if we have too many successfuls in memory or (c) if we have too 
many in flight
       let need_wait_for_event = (in_flight != 0 && self.state.fail_count > 0)
         || successfuls.len() >= MAX_SUCCESSFULS
         || in_flight >= *CONCURRENT_SENDS;
-      if need_wait_for_event || receive_inbox_result.len() > 4 {
+      if need_wait_for_event || receive_send_result.len() > 4 {
+        // if len() > 0 then this does not block and allows us to write to db more often
+        // if len is 0 then this means we wait for something to change our above conditions,
+        // which can only happen by an event sent into the channel
         self
-          .handle_send_results(&mut receive_inbox_result, &mut successfuls, &mut in_flight)
+          .handle_send_results(&mut receive_send_result, &mut successfuls, &mut in_flight)
           .await?;
+        // handle_send_results does not guarantee that we are now in a condition where we want to send a new one,
+        // so repeat this check until the condition no longer applies
+        continue;
       } else {
+        // send a new activity if there is one
         self.update_communities().await?;
+        let next_id = {
+          // calculate next id to send based on the last id and the in flight requests
           let last_successful_id = self
             .state
             .last_successful_id
             .map(|e| e.0)
             .expect("set above");
-        let next_id = ActivityId(last_successful_id + (successfuls.len() as i64) + in_flight + 1);
+          ActivityId(last_successful_id + (successfuls.len() as i64) + in_flight + 1)
+        };
         if next_id > latest_id {
+          // lazily fetch latest id only if we have caught up
           latest_id = self.get_latest_id().await?;
           if next_id > latest_id {
             // no more work to be done, wait before rechecking
@@ -181,7 +193,7 @@ impl InstanceWorker {
         }
         in_flight += 1;
         self
-          .spawn_send_if_needed(next_id, report_inbox_result.clone())
+          .spawn_send_if_needed(next_id, report_send_result.clone())
           .await?;
       }
     }
@@ -348,11 +360,12 @@ impl InstanceWorker {
     let stop = self.stop.clone();
     let domain = self.instance.domain.clone();
     tokio::spawn(async move {
+      let mut report = report;
       if let Err(e) = InstanceWorker::send_retry_loop(
         &ele.0,
         &ele.1,
         inbox_urls,
-        report,
+        &mut report,
         initial_fail_count,
         domain,
         data,
@@ -365,6 +378,11 @@ impl InstanceWorker {
           ele.0.ap_id,
           e
         );
+        report.send(SendActivityResult::Success(SendSuccessInfo {
+          activity_id,
+          published: None,
+          was_skipped: true,
+        })).ok();
       }
     });
     Ok(())
@@ -376,7 +394,7 @@ impl InstanceWorker {
     activity: &SentActivity,
     object: &SharedInboxActivities,
     inbox_urls: Vec<Url>,
-    report: UnboundedSender<SendActivityResult>,
+    report: &mut UnboundedSender<SendActivityResult>,
     initial_fail_count: i32,
     domain: String,
     context: Data<LemmyContext>,
@@ -384,7 +402,7 @@ impl InstanceWorker {
   ) -> Result<()> {
     let pool = &mut context.pool();
     let Some(actor_apub_id) = &activity.actor_apub_id else {
-      return Ok(()); // activity was inserted before persistent queue was activated
+      return Err(anyhow::anyhow!("activity is from before lemmy 0.19"));
     };
     let actor = get_actor_cached(pool, activity.actor_type, actor_apub_id)
       .await
@@ -413,6 +431,7 @@ impl InstanceWorker {
         () = sleep(retry_delay) => {},
         () = stop.cancelled() => {
           // save state to db and exit
+          // TODO: do we need to report state here to prevent hang on exit?
return Ok(()); } } From 987174a6c1c6e9b7f407737ded4d00ab96367660 Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 15 Apr 2024 17:59:26 +0200 Subject: [PATCH 3/9] lint and set force_write true when a request fails --- crates/federate/src/lib.rs | 10 +--------- crates/federate/src/worker.rs | 17 ++++++++++------- 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index a48f31ed9..6bcd453f1 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -79,15 +79,7 @@ async fn start_stop_federation_workers( let instance = instance.clone(); let config = config.clone(); let stats_sender = stats_sender.clone(); - async move { - InstanceWorker::init_and_loop( - instance, - config, - stop, - stats_sender, - ) - .await - } + async move { InstanceWorker::init_and_loop(instance, config, stop, stats_sender).await } }), ); } else if !should_federate { diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 58896750a..950458ad9 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -231,7 +231,7 @@ impl InstanceWorker { /// get newest activity id and set it as last_successful_id if it's the first time this instance is seen async fn get_latest_id(&mut self) -> Result { let latest_id = get_latest_activity_id(&mut self.pool()).await?; - if let None = self.state.last_successful_id { + if self.state.last_successful_id.is_none() { // this is the initial creation (instance first seen) of the federation queue for this instance // skip all past activities: self.state.last_successful_id = Some(latest_id); @@ -247,7 +247,7 @@ impl InstanceWorker { successfuls: &mut BinaryHeap, in_flight: &mut i64, ) -> Result<(), anyhow::Error> { - let force_write = false; + let mut force_write = false; let mut events = Vec::new(); // wait for at least one event but if there's multiple handle them all receive_inbox_result.recv_many(&mut events, 1000).await; @@ -266,6 +266,7 @@ impl InstanceWorker { // override fail count - if multiple activities are currently sending this value may get conflicting info but that's fine self.state.fail_count = fail_count; self.state.last_retry = Some(Utc::now()); + force_write = true; } } } @@ -378,11 +379,13 @@ impl InstanceWorker { ele.0.ap_id, e ); - report.send(SendActivityResult::Success(SendSuccessInfo { - activity_id, - published: None, - was_skipped: true, - })).ok(); + report + .send(SendActivityResult::Success(SendSuccessInfo { + activity_id, + published: None, + was_skipped: true, + })) + .ok(); } }); Ok(()) From a66aec69dd2b31dce7d91e5f2477b71903c66fc7 Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 15 Apr 2024 18:05:46 +0200 Subject: [PATCH 4/9] inbox_urls return vec --- crates/federate/src/worker.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 950458ad9..6f151257f 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -290,7 +290,8 @@ impl InstanceWorker { } Ok(()) } - /// checks whether the highest successful id can be updated and writes to db if so + /// Checks that sequential activities `last_successful_id + 1`, `last_successful_id + 2` etc have been sent successfully. + /// In that case updates `last_successful_id` and saves the state to the database if the time since the last save is greater than `SAVE_STATE_EVERY_TIME`. 
async fn pop_successfuls_and_write( &mut self, successfuls: &mut BinaryHeap, @@ -355,7 +356,6 @@ impl InstanceWorker { }))?; return Ok(()); } - let inbox_urls = inbox_urls.into_iter().collect(); let initial_fail_count = self.state.fail_count; let data = self.config.to_request_data(); let stop = self.stop.clone(); @@ -451,7 +451,7 @@ impl InstanceWorker { /// most often this will return 0 values (if instance doesn't care about the activity) /// or 1 value (the shared inbox) /// > 1 values only happens for non-lemmy software - async fn get_inbox_urls(&mut self, activity: &SentActivity) -> Result> { + async fn get_inbox_urls(&mut self, activity: &SentActivity) -> Result> { let mut inbox_urls: HashSet = HashSet::new(); if activity.send_all_instances { @@ -477,7 +477,7 @@ impl InstanceWorker { .filter(|&u| (u.domain() == Some(&self.instance.domain))) .map(|u| u.inner().clone()), ); - Ok(inbox_urls) + Ok(inbox_urls.into_iter().collect()) } async fn update_communities(&mut self) -> Result<()> { From a3d705f0d68e12b72d05f2123661650af8bc3895 Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 15 Apr 2024 18:39:07 +0200 Subject: [PATCH 5/9] split inbox functions into separate file --- crates/federate/src/inboxes.rs | 149 +++++++++++++++++++++++++++++++++ crates/federate/src/lib.rs | 1 + crates/federate/src/worker.rs | 148 +++++--------------------------- 3 files changed, 170 insertions(+), 128 deletions(-) create mode 100644 crates/federate/src/inboxes.rs diff --git a/crates/federate/src/inboxes.rs b/crates/federate/src/inboxes.rs new file mode 100644 index 000000000..dde096053 --- /dev/null +++ b/crates/federate/src/inboxes.rs @@ -0,0 +1,149 @@ +use crate::util::LEMMY_TEST_FAST_FEDERATION; +use anyhow::Result; +use chrono::{DateTime, TimeZone, Utc}; +use lemmy_db_schema::{ + newtypes::{CommunityId, InstanceId}, + source::{activity::SentActivity, site::Site}, + utils::{ActualDbPool, DbPool}, +}; +use lemmy_db_views_actor::structs::CommunityFollowerView; +use once_cell::sync::Lazy; +use reqwest::Url; +use std::collections::{HashMap, HashSet}; + +/// interval with which new additions to community_followers are queried. +/// +/// The first time some user on an instance follows a specific remote community (or, more precisely: the first time a (followed_community_id, follower_inbox_url) tuple appears), +/// this delay limits the maximum time until the follow actually results in activities from that community id being sent to that inbox url. +/// This delay currently needs to not be too small because the DB load is currently fairly high because of the current structure of storing inboxes for every person, not having a separate list of shared_inboxes, and the architecture of having every instance queue be fully separate. +/// (see https://github.com/LemmyNet/lemmy/issues/3958) +static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = Lazy::new(|| { + if *LEMMY_TEST_FAST_FEDERATION { + chrono::TimeDelta::try_seconds(1).expect("TimeDelta out of bounds") + } else { + chrono::TimeDelta::try_minutes(2).expect("TimeDelta out of bounds") + } +}); +/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance unfollows a specific remote community. +/// This is expected to happen pretty rarely and updating it in a timely manner is not too important. 
+static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy<chrono::TimeDelta> =
+  Lazy::new(|| chrono::TimeDelta::try_hours(1).expect("TimeDelta out of bounds"));
+
+pub(crate) struct CommunityInboxCollector {
+  // load site lazily because if an instance is first seen due to being on allowlist,
+  // the corresponding row in `site` may not exist yet since that is only added once
+  // `fetch_instance_actor_for_object` is called.
+  // (this should be unlikely to be relevant outside of the federation tests)
+  site_loaded: bool,
+  site: Option<Site>,
+  followed_communities: HashMap<CommunityId, HashSet<Url>>,
+  last_full_communities_fetch: DateTime<Utc>,
+  last_incremental_communities_fetch: DateTime<Utc>,
+  instance_id: InstanceId,
+  domain: String,
+  pool: ActualDbPool,
+}
+impl CommunityInboxCollector {
+  pub fn new(
+    pool: ActualDbPool,
+    instance_id: InstanceId,
+    domain: String,
+  ) -> CommunityInboxCollector {
+    CommunityInboxCollector {
+      pool,
+      site_loaded: false,
+      site: None,
+      followed_communities: HashMap::new(),
+      last_full_communities_fetch: Utc.timestamp_nanos(0),
+      last_incremental_communities_fetch: Utc.timestamp_nanos(0),
+      instance_id,
+      domain,
+    }
+  }
+  /// get inbox urls of sending the given activity to the given instance
+  /// most often this will return 0 values (if instance doesn't care about the activity)
+  /// or 1 value (the shared inbox)
+  /// > 1 values only happens for non-lemmy software
+  pub async fn get_inbox_urls(&mut self, activity: &SentActivity) -> Result<Vec<Url>> {
+    let mut inbox_urls: HashSet<Url> = HashSet::new();
+
+    if activity.send_all_instances {
+      if !self.site_loaded {
+        self.site = Site::read_from_instance_id(&mut self.pool(), self.instance_id).await?;
+        self.site_loaded = true;
+      }
+      if let Some(site) = &self.site {
+        // Nutomic: Most non-lemmy software won't have a site row. That means it can't handle these activities. So handling it like this is fine.
+ inbox_urls.insert(site.inbox_url.inner().clone()); + } + } + if let Some(t) = &activity.send_community_followers_of { + if let Some(urls) = self.followed_communities.get(t) { + inbox_urls.extend(urls.iter().cloned()); + } + } + inbox_urls.extend( + activity + .send_inboxes + .iter() + .filter_map(std::option::Option::as_ref) + .filter(|&u| (u.domain() == Some(&self.domain))) + .map(|u| u.inner().clone()), + ); + Ok(inbox_urls.into_iter().collect()) + } + + pub async fn update_communities(&mut self) -> Result<()> { + if (Utc::now() - self.last_full_communities_fetch) > *FOLLOW_REMOVALS_RECHECK_DELAY { + tracing::debug!("{}: fetching full list of communities", self.domain); + // process removals every hour + (self.followed_communities, self.last_full_communities_fetch) = self + .get_communities(self.instance_id, Utc.timestamp_nanos(0)) + .await?; + self.last_incremental_communities_fetch = self.last_full_communities_fetch; + } + if (Utc::now() - self.last_incremental_communities_fetch) > *FOLLOW_ADDITIONS_RECHECK_DELAY { + // process additions every minute + let (news, time) = self + .get_communities(self.instance_id, self.last_incremental_communities_fetch) + .await?; + if !news.is_empty() { + tracing::debug!( + "{}: fetched {} incremental new followed communities", + self.domain, + news.len() + ); + } + self.followed_communities.extend(news); + self.last_incremental_communities_fetch = time; + } + Ok(()) + } + + /// get a list of local communities with the remote inboxes on the given instance that cares about them + async fn get_communities( + &mut self, + instance_id: InstanceId, + last_fetch: DateTime, + ) -> Result<(HashMap>, DateTime)> { + let new_last_fetch = + Utc::now() - chrono::TimeDelta::try_seconds(10).expect("TimeDelta out of bounds"); // update to time before fetch to ensure overlap. subtract 10s to ensure overlap even if published date is not exact + Ok(( + CommunityFollowerView::get_instance_followed_community_inboxes( + &mut self.pool(), + instance_id, + last_fetch, + ) + .await? 
+ .into_iter() + .fold(HashMap::new(), |mut map, (c, u)| { + map.entry(c).or_default().insert(u.into()); + map + }), + new_last_fetch, + )) + } + fn pool(&self) -> DbPool<'_> { + DbPool::Pool(&self.pool) + } +} diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index 6bcd453f1..ebc6c783e 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -14,6 +14,7 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; +mod inboxes; mod util; mod worker; diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 6f151257f..5d952cc64 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -1,9 +1,11 @@ -use crate::util::{ - get_activity_cached, - get_actor_cached, - get_latest_activity_id, - LEMMY_TEST_FAST_FEDERATION, - WORK_FINISHED_RECHECK_DELAY, +use crate::{ + inboxes::CommunityInboxCollector, + util::{ + get_activity_cached, + get_actor_cached, + get_latest_activity_id, + WORK_FINISHED_RECHECK_DELAY, + }, }; use activitypub_federation::{ activity_sending::SendActivityTask, @@ -15,20 +17,18 @@ use chrono::{DateTime, Days, TimeZone, Utc}; use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration}; use lemmy_apub::{activity_lists::SharedInboxActivities, FEDERATION_CONTEXT}; use lemmy_db_schema::{ - newtypes::{ActivityId, CommunityId, InstanceId}, + newtypes::{ActivityId}, source::{ activity::SentActivity, federation_queue_state::FederationQueueState, instance::{Instance, InstanceForm}, - site::Site, }, utils::{naive_now, ActualDbPool, DbPool}, }; -use lemmy_db_views_actor::structs::CommunityFollowerView; use once_cell::sync::Lazy; use reqwest::Url; use std::{ - collections::{BinaryHeap, HashMap, HashSet}, + collections::{BinaryHeap}, ops::{Add, Deref}, time::Duration, }; @@ -40,23 +40,6 @@ use tokio_util::sync::CancellationToken; /// Save state to db after this time has passed since the last state (so if the server crashes or is SIGKILLed, less than X seconds of activities are resent) static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60); -/// interval with which new additions to community_followers are queried. -/// -/// The first time some user on an instance follows a specific remote community (or, more precisely: the first time a (followed_community_id, follower_inbox_url) tuple appears), -/// this delay limits the maximum time until the follow actually results in activities from that community id being sent to that inbox url. -/// This delay currently needs to not be too small because the DB load is currently fairly high because of the current structure of storing inboxes for every person, not having a separate list of shared_inboxes, and the architecture of having every instance queue be fully separate. -/// (see https://github.com/LemmyNet/lemmy/issues/3958) -static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = Lazy::new(|| { - if *LEMMY_TEST_FAST_FEDERATION { - chrono::TimeDelta::try_seconds(1).expect("TimeDelta out of bounds") - } else { - chrono::TimeDelta::try_minutes(2).expect("TimeDelta out of bounds") - } -}); -/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance unfollows a specific remote community. -/// This is expected to happen pretty rarely and updating it in a timely manner is not too important. 
-static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy = - Lazy::new(|| chrono::TimeDelta::try_hours(1).expect("TimeDelta out of bounds")); static CONCURRENT_SENDS: Lazy = Lazy::new(|| { std::env::var("LEMMY_FEDERATION_CONCURRENT_SENDS_PER_INSTANCE") @@ -69,21 +52,13 @@ const MAX_SUCCESSFULS: usize = 1000; pub(crate) struct InstanceWorker { instance: Instance, - // load site lazily because if an instance is first seen due to being on allowlist, - // the corresponding row in `site` may not exist yet since that is only added once - // `fetch_instance_actor_for_object` is called. - // (this should be unlikely to be relevant outside of the federation tests) - site_loaded: bool, - site: Option, - followed_communities: HashMap>, stop: CancellationToken, config: FederationConfig, stats_sender: UnboundedSender<(String, FederationQueueState)>, - last_full_communities_fetch: DateTime, - last_incremental_communities_fetch: DateTime, state: FederationQueueState, last_state_insert: DateTime, pool: ActualDbPool, + inbox_collector: CommunityInboxCollector, } #[derive(Debug, PartialEq, Eq)] @@ -120,15 +95,15 @@ impl InstanceWorker { let pool = config.to_request_data().inner_pool().clone(); let state = FederationQueueState::load(&mut DbPool::Pool(&pool), instance.id).await?; let mut worker = InstanceWorker { + inbox_collector: CommunityInboxCollector::new( + pool.clone(), + instance.id, + instance.domain.clone(), + ), instance, - site_loaded: false, - site: None, - followed_communities: HashMap::new(), stop, config, stats_sender, - last_full_communities_fetch: Utc.timestamp_nanos(0), - last_incremental_communities_fetch: Utc.timestamp_nanos(0), state, last_state_insert: Utc.timestamp_nanos(0), pool, @@ -169,7 +144,7 @@ impl InstanceWorker { continue; } else { // send a new activity if there is one - self.update_communities().await?; + self.inbox_collector.update_communities().await?; let next_id = { // calculate next id to send based on the last id and the in flight requests let last_successful_id = self @@ -344,6 +319,7 @@ impl InstanceWorker { }; let activity = &ele.0; let inbox_urls = self + .inbox_collector .get_inbox_urls(activity) .await .context("failed figuring out inbox urls")?; @@ -403,6 +379,8 @@ impl InstanceWorker { context: Data, stop: CancellationToken, ) -> Result<()> { + debug_assert!(!inbox_urls.is_empty()); + let pool = &mut context.pool(); let Some(actor_apub_id) = &activity.actor_apub_id else { return Err(anyhow::anyhow!("activity is from before lemmy 0.19")); @@ -447,92 +425,7 @@ impl InstanceWorker { }))?; Ok(()) } - /// get inbox urls of sending the given activity to the given instance - /// most often this will return 0 values (if instance doesn't care about the activity) - /// or 1 value (the shared inbox) - /// > 1 values only happens for non-lemmy software - async fn get_inbox_urls(&mut self, activity: &SentActivity) -> Result> { - let mut inbox_urls: HashSet = HashSet::new(); - - if activity.send_all_instances { - if !self.site_loaded { - self.site = Site::read_from_instance_id(&mut self.pool(), self.instance.id).await?; - self.site_loaded = true; - } - if let Some(site) = &self.site { - // Nutomic: Most non-lemmy software wont have a site row. That means it cant handle these activities. So handling it like this is fine. 
- inbox_urls.insert(site.inbox_url.inner().clone()); - } - } - if let Some(t) = &activity.send_community_followers_of { - if let Some(urls) = self.followed_communities.get(t) { - inbox_urls.extend(urls.iter().cloned()); - } - } - inbox_urls.extend( - activity - .send_inboxes - .iter() - .filter_map(std::option::Option::as_ref) - .filter(|&u| (u.domain() == Some(&self.instance.domain))) - .map(|u| u.inner().clone()), - ); - Ok(inbox_urls.into_iter().collect()) - } - - async fn update_communities(&mut self) -> Result<()> { - if (Utc::now() - self.last_full_communities_fetch) > *FOLLOW_REMOVALS_RECHECK_DELAY { - tracing::debug!( - "{}: fetching full list of communities", - self.instance.domain - ); - // process removals every hour - (self.followed_communities, self.last_full_communities_fetch) = self - .get_communities(self.instance.id, Utc.timestamp_nanos(0)) - .await?; - self.last_incremental_communities_fetch = self.last_full_communities_fetch; - } - if (Utc::now() - self.last_incremental_communities_fetch) > *FOLLOW_ADDITIONS_RECHECK_DELAY { - // process additions every minute - let (news, time) = self - .get_communities(self.instance.id, self.last_incremental_communities_fetch) - .await?; - if !news.is_empty() { - tracing::debug!( - "{}: fetched {} incremental new followed communities", - self.instance.domain, - news.len() - ); - } - self.followed_communities.extend(news); - self.last_incremental_communities_fetch = time; - } - Ok(()) - } - /// get a list of local communities with the remote inboxes on the given instance that cares about them - async fn get_communities( - &mut self, - instance_id: InstanceId, - last_fetch: DateTime, - ) -> Result<(HashMap>, DateTime)> { - let new_last_fetch = - Utc::now() - chrono::TimeDelta::try_seconds(10).expect("TimeDelta out of bounds"); // update to time before fetch to ensure overlap. subtract 10s to ensure overlap even if published date is not exact - Ok(( - CommunityFollowerView::get_instance_followed_community_inboxes( - &mut self.pool(), - instance_id, - last_fetch, - ) - .await? 
- .into_iter() - .fold(HashMap::new(), |mut map, (c, u)| { - map.entry(c).or_default().insert(u.into()); - map - }), - new_last_fetch, - )) - } async fn save_and_send_state(&mut self) -> Result<()> { tracing::debug!("{}: saving and sending state", self.instance.domain); self.last_state_insert = Utc::now(); @@ -544,7 +437,6 @@ impl InstanceWorker { } fn pool(&self) -> DbPool<'_> { - //self.config.to_request_data() DbPool::Pool(&self.pool) } } From 7eedcb7be233aac54a7260d2c62679f0fd196d81 Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 15 Apr 2024 18:45:57 +0200 Subject: [PATCH 6/9] cleanup --- crates/federate/src/worker.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 5d952cc64..acf52ca96 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -17,7 +17,7 @@ use chrono::{DateTime, Days, TimeZone, Utc}; use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration}; use lemmy_apub::{activity_lists::SharedInboxActivities, FEDERATION_CONTEXT}; use lemmy_db_schema::{ - newtypes::{ActivityId}, + newtypes::ActivityId, source::{ activity::SentActivity, federation_queue_state::FederationQueueState, @@ -28,7 +28,7 @@ use lemmy_db_schema::{ use once_cell::sync::Lazy; use reqwest::Url; use std::{ - collections::{BinaryHeap}, + collections::BinaryHeap, ops::{Add, Deref}, time::Duration, }; @@ -300,6 +300,8 @@ impl InstanceWorker { Ok(()) } + /// we collect the relevant inboxes in the main instance worker task, and only spawn the send task if we have inboxes to send to + /// this limits CPU usage and reduces overhead for the (many) cases where we don't have any inboxes async fn spawn_send_if_needed( &mut self, activity_id: ActivityId, @@ -338,7 +340,7 @@ impl InstanceWorker { let domain = self.instance.domain.clone(); tokio::spawn(async move { let mut report = report; - if let Err(e) = InstanceWorker::send_retry_loop( + let res = InstanceWorker::send_retry_loop( &ele.0, &ele.1, inbox_urls, @@ -348,8 +350,8 @@ impl InstanceWorker { data, stop, ) - .await - { + .await; + if let Err(e) = res { tracing::warn!( "sending {} errored internally, skipping activity: {:?}", ele.0.ap_id, From e719bafc9b413712ab36a715a4670bc2ff8e824c Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 15 Apr 2024 19:25:40 +0200 Subject: [PATCH 7/9] extract sending task code to separate file --- crates/federate/src/lib.rs | 1 + crates/federate/src/send.rs | 117 ++++++++++++++++++++++++++++++++++ crates/federate/src/worker.rs | 97 +++------------------------- 3 files changed, 126 insertions(+), 89 deletions(-) create mode 100644 crates/federate/src/send.rs diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index ebc6c783e..0a95c85aa 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -15,6 +15,7 @@ use tokio::{ use tokio_util::sync::CancellationToken; mod inboxes; +mod send; mod util; mod worker; diff --git a/crates/federate/src/send.rs b/crates/federate/src/send.rs new file mode 100644 index 000000000..6a2ac5364 --- /dev/null +++ b/crates/federate/src/send.rs @@ -0,0 +1,117 @@ +use crate::util::{get_activity_cached, get_actor_cached}; +use activitypub_federation::{ + activity_sending::SendActivityTask, + config::Data, + protocol::context::WithContext, +}; +use anyhow::{Context, Result}; +use chrono::{DateTime, Utc}; +use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration}; +use lemmy_apub::{activity_lists::SharedInboxActivities, 
FEDERATION_CONTEXT};
+use lemmy_db_schema::{newtypes::ActivityId, source::activity::SentActivity};
+use reqwest::Url;
+use std::{ops::Deref, sync::Arc, time::Duration};
+use tokio::{sync::mpsc::UnboundedSender, time::sleep};
+use tokio_util::sync::CancellationToken;
+
+#[derive(Debug, PartialEq, Eq)]
+pub(crate) struct SendSuccessInfo {
+  pub activity_id: ActivityId,
+  pub published: Option<DateTime<Utc>>,
+  pub was_skipped: bool,
+}
+// need to be able to order them for the binary heap in the worker
+impl PartialOrd for SendSuccessInfo {
+  fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+    other.activity_id.partial_cmp(&self.activity_id)
+  }
+}
+impl Ord for SendSuccessInfo {
+  fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+    other.activity_id.cmp(&self.activity_id)
+  }
+}
+pub(crate) enum SendActivityResult {
+  Success(SendSuccessInfo),
+  Failure {
+    fail_count: i32,
+    // activity_id: ActivityId,
+  },
+}
+
+pub(crate) struct SendRetryTask<'a> {
+  pub activity: &'a SentActivity,
+  pub object: &'a SharedInboxActivities,
+  /// must not be empty at this point
+  pub inbox_urls: Vec<Url>,
+  /// report to the main instance worker
+  pub report: &'a mut UnboundedSender<SendActivityResult>,
+  /// the first request will be sent immediately, but the next one will be delayed according to the number of previous fails + 1
+  pub initial_fail_count: i32,
+  /// for logging
+  pub domain: String,
+  pub context: Data<LemmyContext>,
+  pub stop: CancellationToken,
+}
+
+impl<'a> SendRetryTask<'a> {
+  // this function will return successfully when (a) send succeeded or (b) worker cancelled
+  // and will return an error if an internal error occurred (send errors cause an infinite loop)
+  pub async fn send_retry_loop(self) -> Result<()> {
+    let SendRetryTask {
+      activity,
+      object,
+      inbox_urls,
+      report,
+      initial_fail_count,
+      domain,
+      context,
+      stop,
+    } = self;
+    debug_assert!(!inbox_urls.is_empty());
+
+    let pool = &mut context.pool();
+    let Some(actor_apub_id) = &activity.actor_apub_id else {
+      return Err(anyhow::anyhow!("activity is from before lemmy 0.19"));
+    };
+    let actor = get_actor_cached(pool, activity.actor_type, actor_apub_id)
+      .await
+      .context("failed getting actor instance (was it marked deleted / removed?)")?;
+
+    let object = WithContext::new(object.clone(), FEDERATION_CONTEXT.deref().clone());
+    let requests = SendActivityTask::prepare(&object, actor.as_ref(), inbox_urls, &context).await?;
+    for task in requests {
+      // usually only one due to shared inbox
+      tracing::debug!("sending out {}", task);
+      let mut fail_count = initial_fail_count;
+      while let Err(e) = task.sign_and_send(&context).await {
+        fail_count += 1;
+        report.send(SendActivityResult::Failure {
+          fail_count,
+          // activity_id: activity.id,
+        })?;
+        let retry_delay = federate_retry_sleep_duration(fail_count);
+        tracing::info!(
+          "{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})",
+          domain,
+          activity.id,
+          fail_count
+        );
+        tokio::select! {
+          () = sleep(retry_delay) => {},
+          () = stop.cancelled() => {
+            // save state to db and exit
+            // TODO: do we need to report state here to prevent hang on exit?
+ return Ok(()); + } + } + } + } + report.send(SendActivityResult::Success(SendSuccessInfo { + activity_id: activity.id, + published: Some(activity.published), + was_skipped: false, + }))?; + Ok(()) + } +} diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index acf52ca96..5ed7c22d6 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -1,5 +1,6 @@ use crate::{ inboxes::CommunityInboxCollector, + send::{SendActivityResult, SendRetryTask, SendSuccessInfo}, util::{ get_activity_cached, get_actor_cached, @@ -61,30 +62,6 @@ pub(crate) struct InstanceWorker { inbox_collector: CommunityInboxCollector, } -#[derive(Debug, PartialEq, Eq)] -struct SendSuccessInfo { - activity_id: ActivityId, - published: Option>, - was_skipped: bool, -} -impl PartialOrd for SendSuccessInfo { - fn partial_cmp(&self, other: &Self) -> Option { - other.activity_id.partial_cmp(&self.activity_id) - } -} -impl Ord for SendSuccessInfo { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - other.activity_id.cmp(&self.activity_id) - } -} -enum SendActivityResult { - Success(SendSuccessInfo), - Failure { - fail_count: i32, - // activity_id: ActivityId, - }, -} - impl InstanceWorker { pub(crate) async fn init_and_loop( instance: Instance, @@ -340,16 +317,17 @@ impl InstanceWorker { let domain = self.instance.domain.clone(); tokio::spawn(async move { let mut report = report; - let res = InstanceWorker::send_retry_loop( - &ele.0, - &ele.1, + let res = SendRetryTask { + activity: &ele.0, + object: &ele.1, inbox_urls, - &mut report, + report: &mut report, initial_fail_count, domain, - data, + context: data, stop, - ) + } + .send_retry_loop() .await; if let Err(e) = res { tracing::warn!( @@ -369,65 +347,6 @@ impl InstanceWorker { Ok(()) } - // this function will return successfully when (a) send succeeded or (b) worker cancelled - // and will return an error if an internal error occurred (send errors cause an infinite loop) - async fn send_retry_loop( - activity: &SentActivity, - object: &SharedInboxActivities, - inbox_urls: Vec, - report: &mut UnboundedSender, - initial_fail_count: i32, - domain: String, - context: Data, - stop: CancellationToken, - ) -> Result<()> { - debug_assert!(!inbox_urls.is_empty()); - - let pool = &mut context.pool(); - let Some(actor_apub_id) = &activity.actor_apub_id else { - return Err(anyhow::anyhow!("activity is from before lemmy 0.19")); - }; - let actor = get_actor_cached(pool, activity.actor_type, actor_apub_id) - .await - .context("failed getting actor instance (was it marked deleted / removed?)")?; - - let object = WithContext::new(object.clone(), FEDERATION_CONTEXT.deref().clone()); - let requests = SendActivityTask::prepare(&object, actor.as_ref(), inbox_urls, &context).await?; - for task in requests { - // usually only one due to shared inbox - tracing::debug!("sending out {}", task); - let mut fail_count = initial_fail_count; - while let Err(e) = task.sign_and_send(&context).await { - fail_count += 1; - report.send(SendActivityResult::Failure { - fail_count, - // activity_id: activity.id, - })?; - let retry_delay: Duration = federate_retry_sleep_duration(fail_count); - tracing::info!( - "{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})", - domain, - activity.id, - fail_count - ); - tokio::select! { - () = sleep(retry_delay) => {}, - () = stop.cancelled() => { - // save state to db and exit - // TODO: do we need to report state here to prevent hang on exit? 
- return Ok(()); - } - } - } - } - report.send(SendActivityResult::Success(SendSuccessInfo { - activity_id: activity.id, - published: Some(activity.published), - was_skipped: false, - }))?; - Ok(()) - } - async fn save_and_send_state(&mut self) -> Result<()> { tracing::debug!("{}: saving and sending state", self.instance.domain); self.last_state_insert = Utc::now(); From 5e986ef5ddbe9fb207aa7dfc4e86446792062477 Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 15 Apr 2024 19:41:18 +0200 Subject: [PATCH 8/9] move federation concurrent config to config file --- config/defaults.hjson | 9 +++--- crates/federate/src/lib.rs | 39 +++++++++++++++-------- crates/federate/src/send.rs | 4 +-- crates/federate/src/worker.rs | 46 +++++++++------------------- crates/utils/src/settings/structs.rs | 18 +++++++---- src/lib.rs | 1 + 6 files changed, 62 insertions(+), 55 deletions(-) diff --git a/config/defaults.hjson b/config/defaults.hjson index c52f9055e..ce440ab69 100644 --- a/config/defaults.hjson +++ b/config/defaults.hjson @@ -106,10 +106,11 @@ port: 8536 # Whether the site is available over TLS. Needs to be true for federation to work. tls_enabled: true - # The number of activitypub federation workers that can be in-flight concurrently - worker_count: 0 - # The number of activitypub federation retry workers that can be in-flight concurrently - retry_count: 0 + federation: { + # Limit to the number of concurrent outgoing federation requests per target instance. + # Set this to a higher value than 1 (e.g. 6) only if you have a huge instance (>10 activities per second) and if a receiving instance is not keeping up. + concurrent_sends_per_instance: 1 + } prometheus: { bind: "127.0.0.1" port: 10002 diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index 0a95c85aa..cb2fd1fa6 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -1,7 +1,11 @@ use crate::{util::CancellableTask, worker::InstanceWorker}; use activitypub_federation::config::FederationConfig; use chrono::{Local, Timelike}; -use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration}; +use lemmy_api_common::{ + context::LemmyContext, + federate_retry_sleep_duration, + lemmy_utils::settings::structs::FederationWorkerConfig, +}; use lemmy_db_schema::{ newtypes::InstanceId, source::{federation_queue_state::FederationQueueState, instance::Instance}, @@ -36,7 +40,8 @@ pub struct Opts { async fn start_stop_federation_workers( opts: Opts, pool: ActualDbPool, - federation_config: FederationConfig, + federation_lib_config: FederationConfig, + federation_worker_config: FederationWorkerConfig, cancel: CancellationToken, ) -> anyhow::Result<()> { let mut workers = HashMap::::new(); @@ -45,7 +50,9 @@ async fn start_stop_federation_workers( let exit_print = tokio::spawn(receive_print_stats(pool.clone(), stats_receiver)); let pool2 = &mut DbPool::Pool(&pool); let process_index = opts.process_index - 1; - let local_domain = federation_config.settings().get_hostname_without_port()?; + let local_domain = federation_lib_config + .settings() + .get_hostname_without_port()?; loop { let mut total_count = 0; let mut dead_count = 0; @@ -73,15 +80,19 @@ async fn start_stop_federation_workers( continue; } // create new worker - let config = federation_config.clone(); + let config = federation_lib_config.clone(); let stats_sender = stats_sender.clone(); + let federation_worker_config = federation_worker_config.clone(); workers.insert( instance.id, CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| { - let 
instance = instance.clone(); - let config = config.clone(); - let stats_sender = stats_sender.clone(); - async move { InstanceWorker::init_and_loop(instance, config, stop, stats_sender).await } + InstanceWorker::init_and_loop( + instance.clone(), + config.clone(), + federation_worker_config.clone(), + stop, + stats_sender.clone(), + ) }), ); } else if !should_federate { @@ -117,12 +128,16 @@ pub fn start_stop_federation_workers_cancellable( opts: Opts, pool: ActualDbPool, config: FederationConfig, + federation_worker_config: FederationWorkerConfig, ) -> CancellableTask { CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| { - let opts = opts.clone(); - let pool = pool.clone(); - let config = config.clone(); - async move { start_stop_federation_workers(opts, pool, config, stop).await } + start_stop_federation_workers( + opts.clone(), + pool.clone(), + config.clone(), + federation_worker_config.clone(), + stop, + ) }) } diff --git a/crates/federate/src/send.rs b/crates/federate/src/send.rs index 6a2ac5364..41516eef7 100644 --- a/crates/federate/src/send.rs +++ b/crates/federate/src/send.rs @@ -1,4 +1,4 @@ -use crate::util::{get_activity_cached, get_actor_cached}; +use crate::util::get_actor_cached; use activitypub_federation::{ activity_sending::SendActivityTask, config::Data, @@ -10,7 +10,7 @@ use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration}; use lemmy_apub::{activity_lists::SharedInboxActivities, FEDERATION_CONTEXT}; use lemmy_db_schema::{newtypes::ActivityId, source::activity::SentActivity}; use reqwest::Url; -use std::{ops::Deref, sync::Arc, time::Duration}; +use std::ops::Deref; use tokio::{sync::mpsc::UnboundedSender, time::sleep}; use tokio_util::sync::CancellationToken; diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 5ed7c22d6..edf530fb3 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -1,38 +1,25 @@ use crate::{ inboxes::CommunityInboxCollector, send::{SendActivityResult, SendRetryTask, SendSuccessInfo}, - util::{ - get_activity_cached, - get_actor_cached, - get_latest_activity_id, - WORK_FINISHED_RECHECK_DELAY, - }, -}; -use activitypub_federation::{ - activity_sending::SendActivityTask, - config::{Data, FederationConfig}, - protocol::context::WithContext, + util::{get_activity_cached, get_latest_activity_id, WORK_FINISHED_RECHECK_DELAY}, }; +use activitypub_federation::config::FederationConfig; use anyhow::{Context, Result}; use chrono::{DateTime, Days, TimeZone, Utc}; -use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration}; -use lemmy_apub::{activity_lists::SharedInboxActivities, FEDERATION_CONTEXT}; +use lemmy_api_common::{ + context::LemmyContext, + federate_retry_sleep_duration, + lemmy_utils::settings::structs::FederationWorkerConfig, +}; use lemmy_db_schema::{ newtypes::ActivityId, source::{ - activity::SentActivity, federation_queue_state::FederationQueueState, instance::{Instance, InstanceForm}, }, utils::{naive_now, ActualDbPool, DbPool}, }; -use once_cell::sync::Lazy; -use reqwest::Url; -use std::{ - collections::BinaryHeap, - ops::{Add, Deref}, - time::Duration, -}; +use std::{collections::BinaryHeap, ops::Add, time::Duration}; use tokio::{ sync::mpsc::{self, UnboundedSender}, time::sleep, @@ -42,19 +29,14 @@ use tokio_util::sync::CancellationToken; /// Save state to db after this time has passed since the last state (so if the server crashes or is SIGKILLed, less than X seconds of activities are resent) static SAVE_STATE_EVERY_TIME: Duration = 
Duration::from_secs(60); -static CONCURRENT_SENDS: Lazy = Lazy::new(|| { - std::env::var("LEMMY_FEDERATION_CONCURRENT_SENDS_PER_INSTANCE") - .ok() - .and_then(|s| s.parse().ok()) - .unwrap_or(8) -}); /// Maximum number of successful sends to allow out of order const MAX_SUCCESSFULS: usize = 1000; pub(crate) struct InstanceWorker { instance: Instance, stop: CancellationToken, - config: FederationConfig, + federation_lib_config: FederationConfig, + federation_worker_config: FederationWorkerConfig, stats_sender: UnboundedSender<(String, FederationQueueState)>, state: FederationQueueState, last_state_insert: DateTime, @@ -66,6 +48,7 @@ impl InstanceWorker { pub(crate) async fn init_and_loop( instance: Instance, config: FederationConfig, + federation_worker_config: FederationWorkerConfig, stop: CancellationToken, stats_sender: UnboundedSender<(String, FederationQueueState)>, ) -> Result<(), anyhow::Error> { @@ -77,9 +60,10 @@ impl InstanceWorker { instance.id, instance.domain.clone(), ), + federation_worker_config, instance, stop, - config, + federation_lib_config: config, stats_sender, state, last_state_insert: Utc.timestamp_nanos(0), @@ -108,7 +92,7 @@ impl InstanceWorker { // or (b) if we have too many successfuls in memory or (c) if we have too many in flight let need_wait_for_event = (in_flight != 0 && self.state.fail_count > 0) || successfuls.len() >= MAX_SUCCESSFULS - || in_flight >= *CONCURRENT_SENDS; + || in_flight >= self.federation_worker_config.concurrent_sends_per_instance; if need_wait_for_event || receive_send_result.len() > 4 { // if len() > 0 then this does not block and allows us to write to db more often // if len is 0 then this means we wait for something to change our above conditions, @@ -312,7 +296,7 @@ impl InstanceWorker { return Ok(()); } let initial_fail_count = self.state.fail_count; - let data = self.config.to_request_data(); + let data = self.federation_lib_config.to_request_data(); let stop = self.stop.clone(); let domain = self.instance.domain.clone(); tokio::spawn(async move { diff --git a/crates/utils/src/settings/structs.rs b/crates/utils/src/settings/structs.rs index 4a8d8afb6..c5394a908 100644 --- a/crates/utils/src/settings/structs.rs +++ b/crates/utils/src/settings/structs.rs @@ -42,12 +42,8 @@ pub struct Settings { #[default(None)] #[doku(skip)] pub opentelemetry_url: Option, - /// The number of activitypub federation workers that can be in-flight concurrently - #[default(0)] - pub worker_count: usize, - /// The number of activitypub federation retry workers that can be in-flight concurrently - #[default(0)] - pub retry_count: usize, + #[default(Default::default())] + pub federation: FederationWorkerConfig, // Prometheus configuration. #[default(None)] #[doku(example = "Some(Default::default())")] @@ -234,3 +230,13 @@ pub struct PrometheusConfig { #[doku(example = "10002")] pub port: i32, } + +#[derive(Debug, Deserialize, Serialize, Clone, SmartDefault, Document)] +#[serde(default)] +// named federation"worker"config to disambiguate from the activitypub library configuration +pub struct FederationWorkerConfig { + /// Limit to the number of concurrent outgoing federation requests per target instance. + /// Set this to a higher value than 1 (e.g. 6) only if you have a huge instance (>10 activities per second) and if a receiving instance is not keeping up. 
+  #[default(1)]
+  pub concurrent_sends_per_instance: i64,
+}
diff --git a/src/lib.rs b/src/lib.rs
index 00960826a..e9ac06e62 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -213,6 +213,7 @@ pub async fn start_lemmy_server(args: CmdArgs) -> LemmyResult<()> {
       },
       pool.clone(),
       federation_config.clone(),
+      SETTINGS.federation.clone(),
     )
   });
   let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?;

From c1932f9009bb3969947ce6145a6cabcf821fb642 Mon Sep 17 00:00:00 2001
From: phiresky
Date: Mon, 15 Apr 2024 21:05:14 +0200
Subject: [PATCH 9/9] off-by-one issue

---
 crates/federate/src/worker.rs | 46 +++++++++++++++++++++++------------
 1 file changed, 31 insertions(+), 15 deletions(-)

diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs
index edf530fb3..c282e482a 100644
--- a/crates/federate/src/worker.rs
+++ b/crates/federate/src/worker.rs
@@ -75,7 +75,7 @@ impl InstanceWorker {
   /// this worker only returns if (a) there is an internal error or (b) the cancellation token is cancelled (graceful exit)
   async fn loop_until_stopped(&mut self) -> Result<()> {
     self.initial_fail_sleep().await?;
-    let mut latest_id = self.get_latest_id().await?;
+    let (mut last_sent_id, mut newest_id) = self.get_latest_ids().await?;

     // activities that have been successfully sent but
     // that are not the lowest number and thus can't be written to the database yet
@@ -106,19 +106,30 @@ impl InstanceWorker {
       } else {
         // send a new activity if there is one
         self.inbox_collector.update_communities().await?;
-        let next_id = {
-          // calculate next id to send based on the last id and the in flight requests
+        let next_id_to_send = ActivityId(last_sent_id.0 + 1);
+        {
+          // sanity check: calculate next id to send based on the last id and the in flight requests
           let last_successful_id = self
            .state
            .last_successful_id
            .map(|e| e.0)
            .expect("set above");
-          ActivityId(last_successful_id + (successfuls.len() as i64) + in_flight + 1)
-        };
-        if next_id > latest_id {
+          let expected_next_id = last_successful_id + (successfuls.len() as i64) + in_flight + 1;
+          // compare to next id based on incrementing
+          if expected_next_id != next_id_to_send.0 {
+            anyhow::bail!(
+              "{}: next id to send is not as expected: {:?} != {:?}",
+              self.instance.domain,
+              expected_next_id,
+              next_id_to_send
+            )
+          }
+        }
+
+        if next_id_to_send > newest_id {
           // lazily fetch latest id only if we have caught up
-          latest_id = self.get_latest_id().await?;
-          if next_id > latest_id {
+          newest_id = self.get_latest_ids().await?.1;
+          if next_id_to_send > newest_id {
             // no more work to be done, wait before rechecking
            tokio::select!
{ () = sleep(*WORK_FINISHED_RECHECK_DELAY) => {}, @@ -128,8 +139,9 @@ impl InstanceWorker { } } in_flight += 1; + last_sent_id = next_id_to_send; self - .spawn_send_if_needed(next_id, report_send_result.clone()) + .spawn_send_if_needed(next_id_to_send, report_send_result.clone()) .await?; } } @@ -164,17 +176,20 @@ impl InstanceWorker { Ok(()) } - /// get newest activity id and set it as last_successful_id if it's the first time this instance is seen - async fn get_latest_id(&mut self) -> Result { + /// return the last successfully sent id and the newest activity id in the database + /// sets last_successful_id in database if it's the first time this instance is seen + async fn get_latest_ids(&mut self) -> Result<(ActivityId, ActivityId)> { let latest_id = get_latest_activity_id(&mut self.pool()).await?; - if self.state.last_successful_id.is_none() { + if let Some(last) = self.state.last_successful_id { + Ok((last, latest_id)) + } else { // this is the initial creation (instance first seen) of the federation queue for this instance // skip all past activities: self.state.last_successful_id = Some(latest_id); // save here to ensure it's not read as 0 again later if no activities have happened self.save_and_send_state().await?; + Ok((latest_id, latest_id)) } - Ok(latest_id) } async fn handle_send_results( @@ -234,11 +249,12 @@ impl InstanceWorker { force_write: bool, ) -> Result<()> { let Some(mut last_id) = self.state.last_successful_id else { - tracing::warn!("should be impossible: last successful id is None"); + tracing::warn!("{} should be impossible: last successful id is None", self.instance.domain); return Ok(()); }; tracing::debug!( - "last: {:?}, next: {:?}, currently in successfuls: {:?}", + "{} last: {:?}, next: {:?}, currently in successfuls: {:?}", + self.instance.domain, last_id, successfuls.peek(), successfuls.iter()