diff --git a/Cargo.lock b/Cargo.lock index 08fbd70b3..c012290eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11,7 +11,7 @@ checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3" [[package]] name = "activitypub_federation" version = "0.4.6" -source = "git+https://github.com/phiresky/activitypub-federation-rust/?branch=raw-sending#7991eab37536067d312d75d4fbf2a769286d4dbe" +source = "git+https://github.com/phiresky/activitypub-federation-rust/?branch=raw-sending#76519bd094603b410ac939ffe64c19d34fd6121f" dependencies = [ "activitystreams-kinds", "actix-web", diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 76236f761..ca80debf3 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -2,10 +2,7 @@ use crate::{ federation_queue_state::FederationQueueState, util::{get_activity_cached, get_actor_cached, get_latest_activity_id, retry_sleep_duration}, }; -use activitypub_federation::{ - activity_queue::{prepare_raw, send_raw, sign_raw}, - config::Data, -}; +use activitypub_federation::{activity_sending::SendActivityTask, config::Data}; use anyhow::Result; use chrono::{DateTime, TimeZone, Utc}; use lemmy_api_common::context::LemmyContext; @@ -16,7 +13,7 @@ use lemmy_db_schema::{ utils::DbPool, }; use lemmy_db_views_actor::structs::CommunityFollowerView; -use lemmy_utils::{error::LemmyErrorExt2, REQWEST_TIMEOUT}; +use lemmy_utils::error::LemmyErrorExt2; use reqwest::Url; use std::{ collections::{HashMap, HashSet}, @@ -157,14 +154,13 @@ impl InstanceWorker { let actor = get_actor_cached(pool, activity.actor_type, actor_apub_id).await?; let inbox_urls = inbox_urls.into_iter().collect(); - let requests = prepare_raw(object, actor.as_ref(), inbox_urls, &self.context) + let requests = SendActivityTask::prepare(object, actor.as_ref(), inbox_urls, &self.context) .await .into_anyhow()?; for task in requests { // usually only one due to shared inbox - let mut req = sign_raw(&task, &self.context, REQWEST_TIMEOUT).await?; tracing::info!("sending out {}", task); - while let Err(e) = send_raw(&task, &self.context, req).await { + while let Err(e) = task.sign_and_send(&self.context).await { self.state.fail_count += 1; self.state.last_retry = Utc::now(); let retry_delay: Duration = retry_sleep_duration(self.state.fail_count); @@ -175,7 +171,6 @@ impl InstanceWorker { self.state.fail_count ); self.save_and_send_state(pool).await?; - req = sign_raw(&task, &self.context, REQWEST_TIMEOUT).await?; // resign request tokio::select! { () = sleep(retry_delay) => {}, () = self.stop.cancelled() => { diff --git a/src/lib.rs b/src/lib.rs index 4072df0f3..4e6730b46 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -173,8 +173,6 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> { .app_data(context.clone()) .client(client.clone()) .http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT) - .worker_count(settings.worker_count) - .retry_count(settings.retry_count) .debug(*SYNCHRONOUS_FEDERATION) .http_signature_compat(true) .url_verifier(Box::new(VerifyUrlData(context.inner_pool().clone())))