diff --git a/Cargo.lock b/Cargo.lock
index 129126f04..e3c1a3d03 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -17,7 +17,7 @@ checksum = "8f27d075294830fcab6f66e320dab524bc6d048f4a151698e153205559113772"
 [[package]]
 name = "activitypub_federation"
 version = "0.5.2"
-source = "git+https://github.com/LemmyNet/activitypub-federation-rust.git?branch=actix-inbox-parts#b393221b10ed6fdd879e78e6c842b9714baa0f95"
+source = "git+https://github.com/LemmyNet/activitypub-federation-rust.git?branch=actix-inbox-parts#d848b5981bf60bfec4239d00ba5c98155de0d505"
 dependencies = [
  "activitystreams-kinds",
  "actix-web",
diff --git a/api_tests/run-federation-test.sh b/api_tests/run-federation-test.sh
index 969a95b3e..060abd3a3 100755
--- a/api_tests/run-federation-test.sh
+++ b/api_tests/run-federation-test.sh
@@ -11,7 +11,7 @@ killall -s1 lemmy_server || true
 popd

 pnpm i
-pnpm api-test || true
+pnpm api-test-post || true

 killall -s1 lemmy_server || true
 killall -s1 pict-rs || true
diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs
index a4dc49536..c2885ae8b 100644
--- a/crates/federate/src/lib.rs
+++ b/crates/federate/src/lib.rs
@@ -73,19 +73,16 @@ async fn start_stop_federation_workers(
         // create new worker
         let config = federation_config.clone();
         let stats_sender = stats_sender.clone();
-        let pool = pool.clone();
         workers.insert(
           instance.id,
           CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| {
             let instance = instance.clone();
             let req_data = config.clone().to_request_data();
             let stats_sender = stats_sender.clone();
-            let pool = pool.clone();
             async move {
               InstanceWorker::init_and_loop(
                 instance,
                 req_data,
-                &mut DbPool::Pool(&pool),
                 stop,
                 stats_sender,
               )
diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs
index ff2a68e3c..1d5191471 100644
--- a/crates/federate/src/worker.rs
+++ b/crates/federate/src/worker.rs
@@ -30,9 +30,14 @@ use reqwest::Url;
 use std::{
   collections::{HashMap, HashSet},
   ops::{Add, Deref},
+  sync::{
+    atomic::{AtomicU8, Ordering},
+    Arc,
+    Mutex,
+  },
   time::Duration,
 };
-use tokio::{sync::mpsc::UnboundedSender, time::sleep};
+use tokio::{spawn, sync::mpsc::UnboundedSender, time::sleep};
 use tokio_util::sync::CancellationToken;

 /// Check whether to save state to db every n sends if there's no failures (during failures state is saved after every attempt)
@@ -57,67 +62,128 @@ static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy<chrono::TimeDelta> = Lazy::new(|| {
 /// This is expected to happen pretty rarely and updating it in a timely manner is not too important.
 static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy<chrono::TimeDelta> =
   Lazy::new(|| chrono::TimeDelta::try_hours(1).expect("TimeDelta out of bounds"));
+
+const MAX_INFLIGHT_REQUESTS: u8 = 5;
+
+#[derive(Clone)]
 pub(crate) struct InstanceWorker {
   instance: Instance,
+  context: Data<LemmyContext>,
+  data: Arc<Mutex<InstanceWorkerData>>,
+  stats_sender: UnboundedSender<(String, FederationQueueState)>,
+}
+impl InstanceWorker {
+  fn is_cancelled(&self) -> bool {
+    self.data.lock().unwrap().stop.is_cancelled()
+  }
+  async fn cancelled(&self) {
+    let stop = {
+      let lock = self.data.lock().unwrap();
+      lock.stop.clone()
+    };
+    stop.cancelled().await
+  }
+  fn state(&self) -> FederationQueueState {
+    self.data.lock().unwrap().state.clone()
+  }
+  fn last_state_insert(&self) -> DateTime<Utc> {
+    self.data.lock().unwrap().last_state_insert.clone()
+  }
+  async fn site(&self, pool: &mut DbPool<'_>) -> Option<Site> {
+    let site_loaded = {
+      let lock = self.data.lock().unwrap();
+      lock.site_loaded
+    };
+    if !site_loaded {
+      let site = Site::read_from_instance_id(pool, self.instance.id)
+        .await
+        .unwrap();
+      let mut lock = self.data.lock().unwrap();
+      lock.site = site;
+      lock.site_loaded = true;
+    }
+    self.data.lock().unwrap().site.clone()
+  }
+  async fn update_communities(&self, pool: &mut DbPool<'_>) {
+    let mut communities = {
+      let lock = self.data.lock().unwrap();
+      lock.communities.clone()
+    };
+    communities
+      .update_communities(self.instance.id, pool)
+      .await
+      .unwrap();
+    self.data.lock().unwrap().communities = communities;
+  }
+}
+
+struct InstanceWorkerData {
   // load site lazily because if an instance is first seen due to being on allowlist,
   // the corresponding row in `site` may not exist yet since that is only added once
   // `fetch_instance_actor_for_object` is called.
   // (this should be unlikely to be relevant outside of the federation tests)
   site_loaded: bool,
   site: Option<Site>,
-  followed_communities: HashMap<CommunityId, HashSet<Url>>,
   stop: CancellationToken,
-  context: Data<LemmyContext>,
-  stats_sender: UnboundedSender<(String, FederationQueueState)>,
-  last_full_communities_fetch: DateTime<Utc>,
-  last_incremental_communities_fetch: DateTime<Utc>,
   state: FederationQueueState,
   last_state_insert: DateTime<Utc>,
+  communities: InstanceWorkerCommunities,
+}
+
+#[derive(Clone)]
+struct InstanceWorkerCommunities {
+  followed_communities: HashMap<CommunityId, HashSet<Url>>,
+  last_full_communities_fetch: DateTime<Utc>,
+  last_incremental_communities_fetch: DateTime<Utc>,
 }

 impl InstanceWorker {
   pub(crate) async fn init_and_loop(
     instance: Instance,
     context: Data<LemmyContext>,
-    pool: &mut DbPool<'_>, // in theory there's a ref to the pool in context, but i couldn't get that to work wrt lifetimes
     stop: CancellationToken,
     stats_sender: UnboundedSender<(String, FederationQueueState)>,
   ) -> Result<(), anyhow::Error> {
-    let state = FederationQueueState::load(pool, instance.id).await?;
+    let state = FederationQueueState::load(&mut context.pool(), instance.id).await?;
     let mut worker = InstanceWorker {
       instance,
-      site_loaded: false,
-      site: None,
-      followed_communities: HashMap::new(),
-      stop,
-      context,
+      context: context.reset_request_count(),
       stats_sender,
-      last_full_communities_fetch: Utc.timestamp_nanos(0),
-      last_incremental_communities_fetch: Utc.timestamp_nanos(0),
-      state,
-      last_state_insert: Utc.timestamp_nanos(0),
+      data: Arc::new(Mutex::new(InstanceWorkerData {
+        site_loaded: false,
+        site: None,
+        stop,
+        state,
+        last_state_insert: Utc.timestamp_nanos(0),
+        communities: InstanceWorkerCommunities {
+          followed_communities: HashMap::new(),
+          last_full_communities_fetch: Utc.timestamp_nanos(0),
+          last_incremental_communities_fetch: Utc.timestamp_nanos(0),
+        },
+      })),
     };
-    worker.loop_until_stopped(pool).await
+    worker.loop_until_stopped(&context).await
   }

   /// loop fetch new activities from db and send them to the inboxes of the given instances
   /// this worker only returns if (a) there is an internal error or (b) the cancellation token is cancelled (graceful exit)
   pub(crate) async fn loop_until_stopped(
     &mut self,
-    pool: &mut DbPool<'_>,
+    context: &LemmyContext,
   ) -> Result<(), anyhow::Error> {
+    let pool = &mut context.pool();
     let save_state_every = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative");
-    self.update_communities(pool).await?;
+    self.update_communities(pool).await;
     self.initial_fail_sleep().await?;
-    while !self.stop.is_cancelled() {
-      self.loop_batch(pool).await?;
-      if self.stop.is_cancelled() {
+    while !self.is_cancelled() {
+      self.loop_batch(&context).await?;
+      if self.is_cancelled() {
         break;
       }
-      if (Utc::now() - self.last_state_insert) > save_state_every {
+      if (Utc::now() - self.last_state_insert()) > save_state_every {
         self.save_and_send_state(pool).await?;
       }
-      self.update_communities(pool).await?;
+      self.update_communities(pool).await;
     }
     // final update of state in db
     self.save_and_send_state(pool).await?;
@@ -126,74 +192,99 @@ impl InstanceWorker {

   async fn initial_fail_sleep(&mut self) -> Result<()> {
     // before starting queue, sleep remaining duration if last request failed
-    if self.state.fail_count > 0 {
+    if self.state().fail_count > 0 {
       let last_retry = self
-        .state
+        .state()
         .last_retry
         .context("impossible: if fail count set last retry also set")?;
       let elapsed = (Utc::now() - last_retry).to_std()?;
-      let required = federate_retry_sleep_duration(self.state.fail_count);
+      let required = federate_retry_sleep_duration(self.state().fail_count);
       if elapsed >= required {
         return Ok(());
       }
       let remaining = required - elapsed;
       tokio::select! {
         () = sleep(remaining) => {},
-        () = self.stop.cancelled() => {}
+        () = self.cancelled() => {}
       }
     }
     Ok(())
   }
   /// send out a batch of CHECK_SAVE_STATE_EVERY_IT activities
-  async fn loop_batch(&mut self, pool: &mut DbPool<'_>) -> Result<()> {
-    let latest_id = get_latest_activity_id(pool).await?;
-    let mut id = if let Some(id) = self.state.last_successful_id {
+  async fn loop_batch(&mut self, context: &LemmyContext) -> Result<()> {
+    let latest_id = get_latest_activity_id(&mut context.pool()).await?;
+    let mut id = if let Some(id) = self.state().last_successful_id {
       id
     } else {
       // this is the initial creation (instance first seen) of the federation queue for this instance
       // skip all past activities:
-      self.state.last_successful_id = Some(latest_id);
+      self.state().last_successful_id = Some(latest_id);
       // save here to ensure it's not read as 0 again later if no activities have happened
-      self.save_and_send_state(pool).await?;
+      self.save_and_send_state(&mut context.pool()).await?;
       latest_id
     };
     if id >= latest_id {
+      if latest_id.0 != 0 {
+        dbg!("work done", id, latest_id);
+      }
      // no more work to be done, wait before rechecking
      tokio::select! {
        () = sleep(*WORK_FINISHED_RECHECK_DELAY) => {},
-        () = self.stop.cancelled() => {}
+        () = self.cancelled() => {}
      }
      return Ok(());
    }
+    // TODO: somehow its not reaching here and not sending anything
+    dbg!("loop 1");
     let mut processed_activities = 0;
-    while id < latest_id
-      && processed_activities < CHECK_SAVE_STATE_EVERY_IT
-      && !self.stop.is_cancelled()
+    let inflight_requests = Arc::new(AtomicU8::new(0));
+    dbg!("loop 2");
+    while id < latest_id && processed_activities < CHECK_SAVE_STATE_EVERY_IT && !self.is_cancelled()
     {
+      dbg!("loop 3");
+      while dbg!(inflight_requests.load(Ordering::Relaxed)) >= MAX_INFLIGHT_REQUESTS {
+        dbg!("sleep");
+        sleep(Duration::from_millis(100)).await;
+      }
       id = ActivityId(id.0 + 1);
       processed_activities += 1;
-      let Some(ele) = get_activity_cached(pool, id)
+      dbg!("even before sending activity", id);
+      let Some(ele) = get_activity_cached(&mut context.pool(), id)
         .await
         .context("failed reading activity from db")?
       else {
+        dbg!("first send, set last successful to current");
         tracing::debug!("{}: {:?} does not exist", self.instance.domain, id);
-        self.state.last_successful_id = Some(id);
+        self.state().last_successful_id = Some(id);
         continue;
       };
-      if let Err(e) = self.send_retry_loop(pool, &ele.0, &ele.1).await {
-        tracing::warn!(
-          "sending {} errored internally, skipping activity: {:?}",
-          ele.0.ap_id,
-          e
-        );
-      }
-      if self.stop.is_cancelled() {
+      let context = context.clone();
+      let inflight_requests = inflight_requests.clone();
+      let mut self_ = self.clone();
+      dbg!(&ele);
+      dbg!("before sending activity", ele.0.id);
+      spawn(async move {
+        dbg!("during sending activity", ele.0.id);
+        dbg!(inflight_requests.fetch_add(1, Ordering::Relaxed) + 1);
+        if let Err(e) = self_
+          .send_retry_loop(&mut context.pool(), &ele.0, &ele.1)
+          .await
+        {
+          tracing::warn!(
+            "sending {} errored internally, skipping activity: {:?}",
+            ele.0.ap_id,
+            e
+          );
+        }
+        inflight_requests.fetch_sub(1, Ordering::Relaxed);
+        // send success!
+        self_.state().last_successful_id = Some(id);
+        self_.state().last_successful_published_time = Some(ele.0.published);
+        self_.state().fail_count = 0;
+      });
+      if self.is_cancelled() {
         return Ok(());
       }
-      // send success!
-      self.state.last_successful_id = Some(id);
-      self.state.last_successful_published_time = Some(ele.0.published);
-      self.state.fail_count = 0;
     }
     Ok(())
   }
@@ -212,8 +303,8 @@ impl InstanceWorker {
       .context("failed figuring out inbox urls")?;
     if inbox_urls.is_empty() {
       tracing::debug!("{}: {:?} no inboxes", self.instance.domain, activity.id);
-      self.state.last_successful_id = Some(activity.id);
-      self.state.last_successful_published_time = Some(activity.published);
+      self.state().last_successful_id = Some(activity.id);
+      self.state().last_successful_published_time = Some(activity.published);
       return Ok(());
     }
     let Some(actor_apub_id) = &activity.actor_apub_id else {
@@ -231,19 +322,19 @@ impl InstanceWorker {
       // usually only one due to shared inbox
       tracing::debug!("sending out {}", task);
       while let Err(e) = task.sign_and_send(&self.context).await {
-        self.state.fail_count += 1;
-        self.state.last_retry = Some(Utc::now());
-        let retry_delay: Duration = federate_retry_sleep_duration(self.state.fail_count);
+        self.state().fail_count += 1;
+        self.state().last_retry = Some(Utc::now());
+        let retry_delay: Duration = federate_retry_sleep_duration(self.state().fail_count);
         tracing::info!(
           "{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})",
           self.instance.domain,
           activity.id,
-          self.state.fail_count
+          self.state().fail_count
         );
         self.save_and_send_state(pool).await?;
         tokio::select! {
           () = sleep(retry_delay) => {},
-          () = self.stop.cancelled() => {
+          () = self.cancelled() => {
             // save state to db and exit
             return Ok(());
           }
@@ -277,17 +368,20 @@ impl InstanceWorker {
     let mut inbox_urls: HashSet<Url> = HashSet::new();

     if activity.send_all_instances {
-      if !self.site_loaded {
-        self.site = Site::read_from_instance_id(pool, self.instance.id).await?;
-        self.site_loaded = true;
-      }
-      if let Some(site) = &self.site {
+      if let Some(site) = &self.site(pool).await {
         // Nutomic: Most non-lemmy software wont have a site row. That means it cant handle these activities. So handling it like this is fine.
         inbox_urls.insert(site.inbox_url.inner().clone());
       }
     }
     if let Some(t) = &activity.send_community_followers_of {
-      if let Some(urls) = self.followed_communities.get(t) {
+      if let Some(urls) = self
+        .data
+        .lock()
+        .unwrap()
+        .communities
+        .followed_communities
+        .get(t)
+      {
         inbox_urls.extend(urls.iter().cloned());
       }
     }
@@ -302,22 +396,35 @@ impl InstanceWorker {
     Ok(inbox_urls)
   }

-  async fn update_communities(&mut self, pool: &mut DbPool<'_>) -> Result<()> {
+  async fn save_and_send_state(&mut self, pool: &mut DbPool<'_>) -> Result<()> {
+    {
+      self.data.lock().unwrap().last_state_insert = Utc::now();
+    }
+    FederationQueueState::upsert(pool, &self.state()).await?;
+    self
+      .stats_sender
+      .send((self.instance.domain.clone(), self.state().clone()))?;
+    Ok(())
+  }
+}
+
+impl InstanceWorkerCommunities {
+  async fn update_communities(
+    &mut self,
+    instance_id: InstanceId,
+    pool: &mut DbPool<'_>,
+  ) -> Result<()> {
     if (Utc::now() - self.last_full_communities_fetch) > *FOLLOW_REMOVALS_RECHECK_DELAY {
       // process removals every hour
       (self.followed_communities, self.last_full_communities_fetch) = self
-        .get_communities(pool, self.instance.id, Utc.timestamp_nanos(0))
+        .get_communities(pool, instance_id, Utc.timestamp_nanos(0))
         .await?;
       self.last_incremental_communities_fetch = self.last_full_communities_fetch;
     }
     if (Utc::now() - self.last_incremental_communities_fetch) > *FOLLOW_ADDITIONS_RECHECK_DELAY {
       // process additions every minute
       let (news, time) = self
-        .get_communities(
-          pool,
-          self.instance.id,
-          self.last_incremental_communities_fetch,
-        )
+        .get_communities(pool, instance_id, self.last_incremental_communities_fetch)
         .await?;
       self.followed_communities.extend(news);
       self.last_incremental_communities_fetch = time;
@@ -345,12 +452,4 @@ impl InstanceWorker {
       new_last_fetch,
     ))
   }
-  async fn save_and_send_state(&mut self, pool: &mut DbPool<'_>) -> Result<()> {
-    self.last_state_insert = Utc::now();
-    FederationQueueState::upsert(pool, &self.state).await?;
-    self
-      .stats_sender
-      .send((self.instance.domain.clone(), self.state.clone()))?;
-    Ok(())
-  }
 }