mirror of https://github.com/LemmyNet/lemmy
persistent activity queue
parent
102124b6d2
commit
4506309d83
@ -0,0 +1,108 @@
|
||||
use crate::{
|
||||
fetcher::user_or_community::{PersonOrGroup, UserOrCommunity},
|
||||
objects::instance::ApubSite,
|
||||
protocol::objects::instance::Instance,
|
||||
};
|
||||
use activitypub_federation::{
|
||||
config::Data,
|
||||
traits::{Actor, Object},
|
||||
};
|
||||
use chrono::NaiveDateTime;
|
||||
use lemmy_api_common::context::LemmyContext;
|
||||
use lemmy_utils::error::LemmyError;
|
||||
use reqwest::Url;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
// todo: maybe this enum should be somewhere else?
/// A signing actor for outgoing federation requests: either the instance-level
/// site actor or a user/community actor.
#[derive(Debug)]
pub enum SiteOrCommunityOrUser {
  /// the instance-level (site) actor
  Site(ApubSite),
  /// a person or community actor
  UserOrCommunity(UserOrCommunity),
}
|
||||
|
||||
/// Protocol (JSON) counterpart of [SiteOrCommunityOrUser]; deserialized
/// untagged, so the variant is chosen by which inner type matches.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(untagged)]
pub enum SiteOrPersonOrGroup {
  Instance(Instance),
  PersonOrGroup(PersonOrGroup),
}
|
||||
|
||||
#[async_trait::async_trait]
impl Object for SiteOrCommunityOrUser {
  type DataType = LemmyContext;
  type Kind = SiteOrPersonOrGroup;
  type Error = LemmyError;

  /// When the wrapped actor was last refreshed, delegating per variant.
  fn last_refreshed_at(&self) -> Option<NaiveDateTime> {
    Some(match self {
      SiteOrCommunityOrUser::Site(p) => p.last_refreshed_at,
      SiteOrCommunityOrUser::UserOrCommunity(p) => p.last_refreshed_at()?,
    })
  }

  // NOTE(review): unimplemented — presumably this wrapper is never fetched by
  // apub id directly; confirm no call path reaches this before relying on it.
  #[tracing::instrument(skip_all)]
  async fn read_from_id(
    object_id: Url,
    data: &Data<Self::DataType>,
  ) -> Result<Option<Self>, LemmyError> {
    unimplemented!();
  }

  /// Delete the wrapped actor, delegating per variant.
  #[tracing::instrument(skip_all)]
  async fn delete(self, data: &Data<Self::DataType>) -> Result<(), LemmyError> {
    match self {
      SiteOrCommunityOrUser::Site(p) => p.delete(data).await,
      SiteOrCommunityOrUser::UserOrCommunity(p) => p.delete(data).await,
    }
  }

  // NOTE(review): unimplemented — serialization appears to happen on the inner
  // types instead; confirm before calling.
  async fn into_json(self, _data: &Data<Self::DataType>) -> Result<Self::Kind, LemmyError> {
    unimplemented!()
  }

  /// Verify incoming JSON against the expected domain, delegating per variant.
  #[tracing::instrument(skip_all)]
  async fn verify(
    apub: &Self::Kind,
    expected_domain: &Url,
    data: &Data<Self::DataType>,
  ) -> Result<(), LemmyError> {
    match apub {
      SiteOrPersonOrGroup::Instance(a) => ApubSite::verify(a, expected_domain, data).await,
      SiteOrPersonOrGroup::PersonOrGroup(a) => {
        UserOrCommunity::verify(a, expected_domain, data).await
      }
    }
  }

  // NOTE(review): unimplemented — deserialization appears to happen on the
  // inner types instead; confirm before calling.
  #[tracing::instrument(skip_all)]
  async fn from_json(apub: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, LemmyError> {
    unimplemented!();
  }
}
|
||||
|
||||
impl Actor for SiteOrCommunityOrUser {
  /// apub id of the wrapped actor
  fn id(&self) -> Url {
    match self {
      SiteOrCommunityOrUser::Site(u) => u.id(),
      SiteOrCommunityOrUser::UserOrCommunity(c) => c.id(),
    }
  }

  /// public key used to verify this actor's signatures
  fn public_key_pem(&self) -> &str {
    match self {
      SiteOrCommunityOrUser::Site(p) => p.public_key_pem(),
      SiteOrCommunityOrUser::UserOrCommunity(p) => p.public_key_pem(),
    }
  }

  /// private key used to sign outgoing activities, if one is available
  fn private_key_pem(&self) -> Option<String> {
    match self {
      SiteOrCommunityOrUser::Site(p) => p.private_key_pem(),
      SiteOrCommunityOrUser::UserOrCommunity(p) => p.private_key_pem(),
    }
  }

  // NOTE(review): unimplemented — this type looks like it is only used as the
  // *sender* of activities, so its own inbox should never be needed; confirm
  // before any code path calls this.
  fn inbox(&self) -> Url {
    unimplemented!()
  }
}
|
@ -0,0 +1,40 @@
|
||||
# Manifest for the lemmy_federate crate (persistent outgoing activity queue).
[package]
name = "lemmy_federate"
version.workspace = true
edition.workspace = true
description.workspace = true
license.workspace = true
homepage.workspace = true
documentation.workspace = true
repository.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
activitypub_federation.workspace = true
anyhow.workspace = true
async-trait = "0.1.71"
bytes = "1.4.0"
chrono.workspace = true
dashmap = "5.5.0"
diesel = { workspace = true, features = ["postgres", "chrono", "serde_json"] }
diesel-async = { workspace = true, features = ["deadpool", "postgres"] }
enum_delegate = "0.2.0"
futures.workspace = true
lemmy_api_common.workspace = true
lemmy_apub.workspace = true
lemmy_db_schema = { workspace = true, features = ["full"] }
lemmy_db_views_actor.workspace = true
lemmy_utils.workspace = true
moka = { version = "0.11.2", features = ["future"] }
once_cell.workspace = true
openssl = "0.10.55"
reqwest.workspace = true
reqwest-middleware = "0.2.2"
reqwest-tracing = "0.4.5"
serde.workspace = true
serde_json.workspace = true
tokio = { workspace = true, features = ["full"] }
tokio-util = "0.7.8"
tracing.workspace = true
tracing-subscriber = "0.3.17"
|
@ -0,0 +1,52 @@
|
||||
use crate::util::ActivityId;
|
||||
use anyhow::Result;
|
||||
use chrono::{DateTime, TimeZone, Utc};
|
||||
use diesel::prelude::*;
|
||||
use diesel_async::RunQueryDsl;
|
||||
use lemmy_db_schema::utils::{get_conn, DbPool};
|
||||
|
||||
/// Per-target-instance send queue state, persisted so the queue can resume
/// where it left off after a restart. One row per instance domain.
#[derive(Queryable, Selectable, Insertable, AsChangeset, Clone)]
#[diesel(table_name = lemmy_db_schema::schema::federation_queue_state)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct FederationQueueState {
  /// domain of the instance (primary key)
  pub domain: String,
  /// id of the last activity successfully delivered to this instance
  pub last_successful_id: ActivityId, // todo: i64
  /// number of consecutive delivery failures (0 = healthy)
  pub fail_count: i32,
  /// time of the most recent failed attempt (unix epoch if never failed, see load())
  pub last_retry: DateTime<Utc>,
}
|
||||
|
||||
impl FederationQueueState {
  /// load or return a default empty value
  pub async fn load(pool: &mut DbPool<'_>, domain_: &str) -> Result<FederationQueueState> {
    use lemmy_db_schema::schema::federation_queue_state::dsl::*;
    let conn = &mut get_conn(pool).await?;
    Ok(
      federation_queue_state
        .find(&domain_)
        .select(FederationQueueState::as_select())
        .get_result(conn)
        .await
        .optional()?
        // no row for this domain yet: start from a fresh state
        .unwrap_or(FederationQueueState {
          domain: domain_.to_owned(),
          fail_count: 0,
          last_retry: Utc.timestamp_nanos(0),
          last_successful_id: 0, // todo: start at current id not from beginning
        }),
    )
  }

  /// Insert the state row, or update it in place if one already exists
  /// for this domain (upsert on the primary key).
  pub async fn upsert(pool: &mut DbPool<'_>, state: &FederationQueueState) -> Result<()> {
    let conn = &mut get_conn(pool).await?;
    use lemmy_db_schema::schema::federation_queue_state::dsl::*;

    state
      .insert_into(federation_queue_state)
      .on_conflict(domain)
      .do_update()
      .set(state)
      .execute(conn)
      .await?;
    Ok(())
  }
}
|
@ -0,0 +1,174 @@
|
||||
use crate::{
|
||||
util::{retry_sleep_duration, spawn_cancellable},
|
||||
worker::instance_worker,
|
||||
};
|
||||
use activitypub_federation::config::FederationConfig;
|
||||
use chrono::{Local, Timelike};
|
||||
use federation_queue_state::FederationQueueState;
|
||||
use lemmy_api_common::request::build_user_agent;
|
||||
use lemmy_apub::{VerifyUrlData, FEDERATION_HTTP_FETCH_LIMIT};
|
||||
use lemmy_db_schema::{
|
||||
source::instance::Instance,
|
||||
utils::{build_db_pool, DbPool},
|
||||
};
|
||||
use lemmy_utils::{error::LemmyErrorExt2, settings::SETTINGS, REQWEST_TIMEOUT};
|
||||
use reqwest::Client;
|
||||
use reqwest_middleware::ClientBuilder;
|
||||
use reqwest_tracing::TracingMiddleware;
|
||||
use std::{collections::HashMap, time::Duration};
|
||||
use tokio::{
|
||||
signal::unix::SignalKind,
|
||||
sync::mpsc::{unbounded_channel, UnboundedReceiver},
|
||||
time::sleep,
|
||||
};
|
||||
|
||||
mod federation_queue_state;
|
||||
mod util;
|
||||
mod worker;
|
||||
|
||||
static WORKER_EXIT_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
|
||||
#[tokio::main]
async fn main() -> anyhow::Result<()> {
  tracing_subscriber::fmt::init();
  let settings = SETTINGS.to_owned();
  // TODO: wait until migrations are applied? or are they safe from race conditions and i can just call run_migrations here as well?
  let pool = build_db_pool(&settings).await.into_anyhow()?;
  let user_agent = build_user_agent(&settings);
  // http client used for outgoing federation requests
  let reqwest_client = Client::builder()
    .user_agent(user_agent.clone())
    .timeout(REQWEST_TIMEOUT)
    .connect_timeout(REQWEST_TIMEOUT)
    .build()?;

  let client = ClientBuilder::new(reqwest_client.clone())
    .with(TracingMiddleware::default())
    .build();

  // federation config with unit app data; workers get it via to_request_data()
  let federation_config = FederationConfig::builder()
    .domain(settings.hostname.clone())
    .app_data(())
    .client(client.clone())
    .http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT)
    .http_signature_compat(true)
    .url_verifier(Box::new(VerifyUrlData(pool.clone())))
    .build()
    .await?;
  // shard instances across processes by instance id modulo process_count
  let process_num = 1 - 1; // todo: pass these in via command line args
  let process_count = 1;
  // one worker task per federated instance, keyed by instance id
  let mut workers = HashMap::new();
  let mut pool2 = DbPool::from(&pool);

  // workers report their queue state on this channel; printed periodically
  let (stats_sender, stats_receiver) = unbounded_channel();
  let exit_print = tokio::spawn(receive_print_stats(&mut pool2, stats_receiver));
  let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?;
  let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?;
  loop {
    // reconcile the worker set with the current instance list (re-runs every 60s)
    for (instance, should_federate) in Instance::read_all_with_blocked(&mut pool2)
      .await?
      .into_iter()
    {
      // skip instances assigned to another process of this service
      if instance.id.inner() % process_count != process_num {
        continue;
      }
      if !workers.contains_key(&instance.id) && should_federate {
        let stats_sender = stats_sender.clone();
        workers.insert(
          instance.id,
          spawn_cancellable(WORKER_EXIT_TIMEOUT, |stop| {
            instance_worker(
              pool2,
              instance,
              federation_config.to_request_data(),
              stop,
              stats_sender,
            )
          }),
        );
      } else if !should_federate {
        // instance is now blocked: stop its worker gracefully
        if let Some(worker) = workers.remove(&instance.id) {
          if let Err(e) = worker.await {
            tracing::error!("error stopping worker: {e}");
          }
        }
      }
    }
    // sleep a minute between reconciliations, but wake immediately on shutdown signals
    tokio::select! {
      () = sleep(Duration::from_secs(60)) => {},
      _ = tokio::signal::ctrl_c() => {
        tracing::warn!("Received ctrl-c, shutting down gracefully...");
        break;
      }
      _ = interrupt.recv() => {
        tracing::warn!("Received interrupt, shutting down gracefully...");
        break;
      }
      _ = terminate.recv() => {
        tracing::warn!("Received terminate, shutting down gracefully...");
        break;
      }
    }
  }
  // closing the channel lets receive_print_stats exit once workers are done
  drop(stats_sender);
  tracing::warn!(
    "Waiting for {} workers ({:.2?} max)",
    workers.len(),
    WORKER_EXIT_TIMEOUT
  );
  // awaiting the futures returned by spawn_cancellable cancels the workers
  futures::future::join_all(workers.into_values()).await;
  exit_print.await?;
  Ok(())
}
|
||||
|
||||
/// every 60s, print the state for every instance. exits if the receiver is done (all senders dropped)
async fn receive_print_stats(
  mut pool: &mut DbPool<'_>,
  mut receiver: UnboundedReceiver<FederationQueueState>,
) {
  let mut printerval = tokio::time::interval(Duration::from_secs(60));
  printerval.tick().await; // skip first
  // most recent state per instance domain, as reported by the workers
  let mut stats = HashMap::new();
  loop {
    tokio::select! {
      ele = receiver.recv() => {
        let Some(ele) = ele else {
          // all senders dropped: print a final summary and exit
          tracing::info!("done. quitting");
          print_stats(&mut pool, &stats).await;
          return;
        };
        stats.insert(ele.domain.clone(), ele);
      },
      _ = printerval.tick() => {
        print_stats(&mut pool, &stats).await;
      }
    }
  }
}
|
||||
async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap<String, FederationQueueState>) {
|
||||
let last_id = crate::util::get_latest_activity_id(pool).await;
|
||||
let Ok(last_id) = last_id else {
|
||||
tracing::error!("could not get last id");
|
||||
return;
|
||||
};
|
||||
// it's expected that the values are a bit out of date, everything < SAVE_STATE_EVERY should be considered up to date
|
||||
tracing::info!(
|
||||
"Federation state as of {}:",
|
||||
Local::now().with_nanosecond(0).unwrap().to_rfc3339()
|
||||
);
|
||||
// todo: less noisy output (only output failing instances and summary for successful)
|
||||
// todo: more stats (act/sec, avg http req duration)
|
||||
for stat in stats.values() {
|
||||
let behind = last_id - stat.last_successful_id;
|
||||
if stat.fail_count > 0 {
|
||||
tracing::info!(
|
||||
"{}: Warning. {} behind, {} consecutive fails, current retry delay {:.2?}",
|
||||
stat.domain,
|
||||
behind,
|
||||
stat.fail_count,
|
||||
retry_sleep_duration(stat.fail_count)
|
||||
);
|
||||
} else {
|
||||
tracing::info!("{}: Ok. {} behind", stat.domain, behind);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,190 @@
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use dashmap::DashSet;
|
||||
use diesel::{prelude::*, sql_types::Int8};
|
||||
use diesel_async::RunQueryDsl;
|
||||
use lemmy_apub::{
|
||||
activity_lists::SharedInboxActivities,
|
||||
fetcher::{site_or_community_or_user::SiteOrCommunityOrUser, user_or_community::UserOrCommunity},
|
||||
};
|
||||
use lemmy_db_schema::{
|
||||
source::{
|
||||
activity::{Activity, ActorType},
|
||||
community::Community,
|
||||
person::Person,
|
||||
site::Site,
|
||||
},
|
||||
traits::{ApubActor, Crud},
|
||||
utils::{get_conn, DbPool},
|
||||
};
|
||||
use moka::future::Cache;
|
||||
use once_cell::sync::Lazy;
|
||||
use reqwest::Url;
|
||||
use serde_json::Value;
|
||||
use std::{
|
||||
borrow::{Borrow, Cow},
|
||||
future::Future,
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::{task::JoinHandle, time::sleep};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
/// spawn a task but with graceful shutdown
///
/// only await the returned future when you want to cancel the task
pub fn spawn_cancellable<R: Send + 'static, F>(
  timeout: Duration,
  task: impl FnOnce(CancellationToken) -> F,
) -> impl Future<Output = Result<R>>
where
  F: Future<Output = Result<R>> + Send + 'static,
{
  let stop = CancellationToken::new();
  let task = task(stop.clone());
  // spawn the task immediately; errors are logged here because the returned
  // handle may only be awaited much later (at shutdown)
  let task: JoinHandle<Result<R>> = tokio::spawn(async move {
    match task.await {
      Ok(o) => Ok(o),
      Err(e) => {
        tracing::error!("worker errored out: {e}");
        // todo: if this error happens, requeue worker creation in main
        Err(e)
      }
    }
  });
  let abort = task.abort_handle();
  // shutdown future: signal cancellation, then wait up to `timeout` for the
  // task to finish before force-aborting it
  async move {
    tracing::info!("Shutting down task");
    stop.cancel();
    tokio::select! {
      r = task => {
        // ?? flattens the join error and the task's own Result
        Ok(r.context("could not join")??)
      },
      _ = sleep(timeout) => {
        abort.abort();
        tracing::warn!("Graceful shutdown timed out, aborting task");
        Err(anyhow!("task aborted due to timeout"))
      }
    }
  }
}
|
||||
|
||||
/// assuming apub priv key and ids are immutable, then we don't need to have TTL
/// TODO: capacity should be configurable maybe based on memory use
///
/// Look up an actor (site, community or person) from the local db by apub id,
/// caching the result indefinitely (see immutability assumption above).
pub async fn get_actor_cached(
  pool: &mut DbPool<'_>,
  actor_type: ActorType,
  actor_apub_id: &Url,
) -> Result<Arc<SiteOrCommunityOrUser>> {
  static CACHE: Lazy<Cache<Url, Arc<SiteOrCommunityOrUser>>> =
    Lazy::new(|| Cache::builder().max_capacity(10000).build());
  CACHE
    .try_get_with(actor_apub_id.clone(), async {
      let url = actor_apub_id.clone().into();
      let person = match actor_type {
        ActorType::Site => SiteOrCommunityOrUser::Site(
          Site::read_from_apub_id(pool, &url)
            .await?
            .context("apub site not found")?
            .into(),
        ),
        ActorType::Community => SiteOrCommunityOrUser::UserOrCommunity(UserOrCommunity::Community(
          Community::read_from_apub_id(pool, &url)
            .await?
            .context("apub community not found")?
            .into(),
        )),
        ActorType::Person => SiteOrCommunityOrUser::UserOrCommunity(UserOrCommunity::User(
          Person::read_from_apub_id(pool, &url)
            .await?
            .context("apub person not found")?
            .into(),
        )),
      };
      Result::<_, anyhow::Error>::Ok(Arc::new(person))
    })
    .await
    // try_get_with wraps the error in an Arc; flatten it back into anyhow
    .map_err(|e| anyhow::anyhow!("err getting actor: {e}"))
}
|
||||
|
||||
/// intern urls to reduce memory usage
|
||||
/// not sure if worth it
|
||||
pub fn intern_url<'a>(url: impl Into<Cow<'a, Url>>) -> Arc<Url> {
|
||||
let url: Cow<'a, Url> = url.into();
|
||||
static INTERNED_URLS: Lazy<DashSet<Arc<Url>>> = Lazy::new(DashSet::new);
|
||||
return INTERNED_URLS
|
||||
.get::<Url>(url.borrow())
|
||||
.map(|e| e.clone())
|
||||
.unwrap_or_else(|| {
|
||||
let ret = Arc::new(url.into_owned());
|
||||
INTERNED_URLS.insert(ret.clone());
|
||||
ret
|
||||
});
|
||||
}
|
||||
|
||||
/// this should maybe be a newtype like all the other PersonId CommunityId etc.
|
||||
/// also should be i64
|
||||
pub type ActivityId = i32;
|
||||
|
||||
/// activities are immutable so cache does not need to have TTL
/// May return None if the corresponding id does not exist or is a received activity.
/// Holes in serials are expected behaviour in postgresql
/// todo: cache size should probably be configurable / dependent on desired memory usage
pub async fn get_activity_cached(
  pool: &mut DbPool<'_>,
  activity_id: ActivityId,
) -> Result<Option<Arc<(Activity, SharedInboxActivities)>>> {
  static ACTIVITIES: Lazy<Cache<ActivityId, Option<Arc<(Activity, SharedInboxActivities)>>>> =
    Lazy::new(|| Cache::builder().max_capacity(10000).build());
  ACTIVITIES
    .try_get_with(activity_id, async {
      let row = Activity::read(pool, activity_id)
        .await
        .optional()
        .context("could not read activity")?;
      let Some(mut row) = row else { return anyhow::Result::<_, anyhow::Error>::Ok(None) };
      if row.send_targets.is_none() {
        // must be a received activity
        return Ok(None);
      }
      // swap to avoid cloning
      // NOTE(review): this leaves `row.data` as Value::Null in the cached row;
      // the parsed form lives in the second tuple element instead
      let mut data = Value::Null;
      std::mem::swap(&mut row.data, &mut data);
      let activity_actual: SharedInboxActivities = serde_json::from_value(data)?;

      Ok(Some(Arc::new((row, activity_actual))))
    })
    .await
    .map_err(|e| anyhow::anyhow!("err getting activity: {e}"))
}
|
||||
|
||||
/// return the most current activity id (with 1 second cache)
|
||||
pub async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result<ActivityId> {
|
||||
static CACHE: Lazy<Cache<(), ActivityId>> = Lazy::new(|| {
|
||||
Cache::builder()
|
||||
.time_to_live(Duration::from_secs(1))
|
||||
.build()
|
||||
});
|
||||
CACHE
|
||||
.try_get_with((), async {
|
||||
let conn = &mut get_conn(pool).await?;
|
||||
let Sequence {
|
||||
last_value: latest_id,
|
||||
} = diesel::sql_query("select last_value from activity_id_seq")
|
||||
.get_result(conn)
|
||||
.await?;
|
||||
anyhow::Result::<_, anyhow::Error>::Ok(latest_id as ActivityId)
|
||||
})
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("err getting id: {e}"))
|
||||
}
|
||||
|
||||
/// how long to sleep based on how many retries have already happened
///
/// Exponential backoff: 10s, 20s, 40s, ... capped at one day. The cap keeps
/// the delay sane for long-failing instances and prevents the panic inside
/// `Duration::from_secs_f64`, which aborts on non-finite or out-of-range
/// values (10 * 2^retry_count overflows f64/Duration for large fail counts).
pub fn retry_sleep_duration(retry_count: i32) -> Duration {
  // one day; beyond this, waiting longer gains nothing
  const MAX_SLEEP_SECS: f64 = 60.0 * 60.0 * 24.0;
  let secs = 10.0 * 2.0_f64.powf(retry_count as f64);
  Duration::from_secs_f64(secs.min(MAX_SLEEP_SECS))
}
|
||||
|
||||
/// Row type for reading a single `last_value` column from a postgres sequence
/// via `sql_query` (see get_latest_activity_id).
#[derive(QueryableByName)]
struct Sequence {
  #[diesel(sql_type = Int8)]
  last_value: i64, // this value is bigint for some reason even if sequence is int4
}
|
@ -0,0 +1,218 @@
|
||||
use crate::{
|
||||
federation_queue_state::FederationQueueState,
|
||||
util::{
|
||||
get_activity_cached,
|
||||
get_actor_cached,
|
||||
get_latest_activity_id,
|
||||
intern_url,
|
||||
retry_sleep_duration,
|
||||
},
|
||||
};
|
||||
use activitypub_federation::{
|
||||
activity_queue::{prepare_raw, send_raw, sign_raw},
|
||||
config::Data,
|
||||
};
|
||||
use anyhow::Result;
|
||||
use chrono::{DateTime, TimeZone, Utc};
|
||||
use lemmy_db_schema::{
|
||||
newtypes::{CommunityId, InstanceId},
|
||||
source::{activity::Activity, instance::Instance, site::Site},
|
||||
utils::DbPool,
|
||||
};
|
||||
use lemmy_db_views_actor::structs::CommunityFollowerView;
|
||||
use lemmy_utils::{error::LemmyErrorExt2, REQWEST_TIMEOUT};
|
||||
use reqwest::Url;
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
collections::{HashMap, HashSet},
|
||||
ops::Deref,
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::{sync::mpsc::UnboundedSender, time::sleep};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
/// save state to db every n sends if there's no failures (otherwise state is saved after every attempt)
static SAVE_STATE_EVERY_IT: i64 = 100;
/// maximum time between db state saves while sends are succeeding
static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(10);
||||
|
||||
/// loop fetch new activities from db and send them to the inboxes of the given instances
|
||||
/// this worker only returns if (a) there is an internal error or (b) the cancellation token is cancelled (graceful exit)
|
||||
pub async fn instance_worker(
|
||||
mut pool: DbPool<'_>,
|
||||
instance: Instance,
|
||||
data: Data<()>,
|
||||
stop: CancellationToken,
|
||||
stats_sender: UnboundedSender<FederationQueueState>,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let mut last_full_communities_fetch = Utc.timestamp_nanos(0);
|
||||
let mut last_incremental_communities_fetch = Utc.timestamp_nanos(0);
|
||||
let mut last_state_insert = Utc.timestamp_nanos(0);
|
||||
let mut followed_communities: HashMap<CommunityId, HashSet<Arc<Url>>> = get_communities(
|
||||
&mut pool,
|
||||
instance.id,
|
||||
&mut last_incremental_communities_fetch,
|
||||
)
|
||||
.await?;
|
||||
let site = Site::read_from_instance_id(&mut pool, instance.id).await?;
|
||||
|
||||
let mut state = FederationQueueState::load(&mut pool, &instance.domain).await?;
|
||||
if state.fail_count > 0 {
|
||||
// before starting queue, sleep remaining duration
|
||||
let elapsed = (Utc::now() - state.last_retry).to_std()?;
|
||||
let remaining = retry_sleep_duration(state.fail_count) - elapsed;
|
||||
tokio::select! {
|
||||
() = sleep(remaining) => {},
|
||||
() = stop.cancelled() => { return Ok(()); }
|
||||
}
|
||||
}
|
||||
while !stop.is_cancelled() {
|
||||
let latest_id = get_latest_activity_id(&mut pool).await?;
|
||||
let mut id = state.last_successful_id;
|
||||
if id == latest_id {
|
||||
// no more work to be done, wait before rechecking
|
||||
tokio::select! {
|
||||
() = sleep(Duration::from_secs(10)) => { continue; },
|
||||
() = stop.cancelled() => { return Ok(()); }
|
||||
}
|
||||
}
|
||||
let mut processed_activities = 0;
|
||||
'batch: while id < latest_id
|
||||
&& processed_activities < SAVE_STATE_EVERY_IT
|
||||
&& !stop.is_cancelled()
|
||||
{
|
||||
id += 1;
|
||||
processed_activities += 1;
|
||||
let Some(ele) = get_activity_cached(&mut pool, id).await? else {
|
||||
state.last_successful_id = id;
|
||||
continue;
|
||||
};
|
||||
let (activity, object) = (&ele.0, &ele.1);
|
||||
let inbox_urls = get_inbox_urls(&instance, &site, &followed_communities, activity);
|
||||
if inbox_urls.is_empty() {
|
||||
state.last_successful_id = id;
|
||||
continue;
|
||||
}
|
||||
let actor = {
|
||||
// these should always be set for sent activities
|
||||
let (Some(actor_type), Some(apub_id)) = (activity.actor_type, &activity.actor_apub_id) else {
|
||||
tracing::warn!("activity {id} does not have actor_type or actor_apub_id set");
|
||||
state.last_successful_id = id;
|
||||
continue;
|
||||
};
|
||||
get_actor_cached(&mut pool, actor_type, apub_id.deref()).await?
|
||||
};
|
||||
let inbox_urls = inbox_urls.into_iter().map(|e| (*e).clone()).collect();
|
||||
let requests = prepare_raw(object, actor.as_ref(), inbox_urls, &data)
|
||||
.await
|
||||
.into_anyhow()?;
|
||||
for task in requests {
|
||||
// usually only one due to shared inbox
|
||||
let mut req = sign_raw(&task, &data, REQWEST_TIMEOUT).await?;
|
||||
tracing::info!("sending out {}", task);
|
||||
while let Err(e) = send_raw(&task, &data, req).await {
|
||||
tracing::info!("{task} failed: {e}");
|
||||
state.fail_count += 1;
|
||||
state.last_retry = Utc::now();
|
||||
stats_sender.send(state.clone())?;
|
||||
FederationQueueState::upsert(&mut pool, &state).await?;
|
||||
req = sign_raw(&task, &data, REQWEST_TIMEOUT).await?; // resign request
|
||||
tokio::select! {
|
||||
() = sleep(retry_sleep_duration(state.fail_count)) => {},
|
||||
() = stop.cancelled() => {
|
||||
// save state to db and exit
|
||||
break 'batch;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// send success!
|
||||
state.last_successful_id = id;
|
||||
state.fail_count = 0;
|
||||
}
|
||||
|
||||
if Utc::now() - last_state_insert > chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).unwrap() {
|
||||
last_state_insert = Utc::now();
|
||||
FederationQueueState::upsert(&mut pool, &state).await?;
|
||||
stats_sender.send(state.clone())?;
|
||||
}
|
||||
{
|
||||
// update communities
|
||||
if (Utc::now() - last_incremental_communities_fetch) > chrono::Duration::seconds(10) {
|
||||
// process additions every 10s
|
||||
followed_communities.extend(
|
||||
get_communities(
|
||||
&mut pool,
|
||||
instance.id,
|
||||
&mut last_incremental_communities_fetch,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
}
|
||||
if (Utc::now() - last_full_communities_fetch) > chrono::Duration::seconds(300) {
|
||||
// process removals every 5min
|
||||
last_full_communities_fetch = Utc.timestamp_nanos(0);
|
||||
followed_communities =
|
||||
get_communities(&mut pool, instance.id, &mut last_full_communities_fetch).await?;
|
||||
last_incremental_communities_fetch = last_full_communities_fetch.clone();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// get inbox urls of sending the given activity to the given instance
|
||||
/// most often this will return 0 values (if instance doesn't care about the activity)
|
||||
/// or 1 value (the shared inbox)
|
||||
/// > 1 values only happens for non-lemmy software
|
||||
fn get_inbox_urls(
|
||||
instance: &Instance,
|
||||
site: &Option<Site>,
|
||||
followed_communities: &HashMap<CommunityId, HashSet<Arc<Url>>>,
|
||||
activity: &Activity,
|
||||
) -> HashSet<Arc<Url>> {
|
||||
let mut inbox_urls = HashSet::new();
|
||||
let Some(targets) = &activity.send_targets else {
|
||||
return inbox_urls;
|
||||
};
|
||||
if targets.all_instances {
|
||||
if let Some(site) = &site {
|
||||
// todo: when does an instance not have a site?
|
||||
inbox_urls.insert(intern_url(Cow::Borrowed(site.inbox_url.deref())));
|
||||
}
|
||||
}
|
||||
for t in &targets.community_followers_of {
|
||||
if let Some(urls) = followed_communities.get(t) {
|
||||
inbox_urls.extend(urls.iter().map(|e| e.clone()));
|
||||
}
|
||||
}
|
||||
for inbox in &targets.inboxes {
|
||||
if inbox.domain() != Some(&instance.domain) {
|
||||
continue;
|
||||
}
|
||||
inbox_urls.insert(intern_url(Cow::Borrowed(inbox)));
|
||||
}
|
||||
inbox_urls
|
||||
}
|
||||
|
||||
/// get a list of local communities with the remote inboxes on the given instance that cares about them
|
||||
async fn get_communities(
|
||||
pool: &mut DbPool<'_>,
|
||||
instance_id: InstanceId,
|
||||
last_fetch: &mut DateTime<Utc>,
|
||||
) -> Result<HashMap<CommunityId, HashSet<Arc<Url>>>> {
|
||||
let e = *last_fetch;
|
||||
*last_fetch = Utc::now(); // update to time before fetch to ensure overlap
|
||||
Ok(
|
||||
CommunityFollowerView::get_instance_followed_community_inboxes(pool, instance_id, e)
|
||||
.await?
|
||||
.into_iter()
|
||||
.fold(HashMap::new(), |mut map, (c, u)| {
|
||||
map
|
||||
.entry(c)
|
||||
.or_insert_with(|| HashSet::new())
|
||||
.insert(intern_url(Cow::Owned(u.into())));
|
||||
map
|
||||
}),
|
||||
)
|
||||
}
|
@ -0,0 +1,11 @@
|
||||
-- revert the persistent federation queue schema changes

ALTER TABLE activity
    DROP COLUMN send_targets,
    DROP COLUMN actor_apub_id,
    DROP COLUMN actor_type;

DROP TYPE actor_type_enum;

DROP TABLE federation_queue_state;

DROP INDEX idx_community_follower_published;
||||
|
@ -0,0 +1,21 @@
|
||||
-- kind of actor that created an activity; used to look up the signing actor
-- when the queue resends it
CREATE TYPE actor_type_enum AS enum(
    'site',
    'community',
    'person'
);

-- extra columns needed to resend an activity later: where it should go and
-- which actor signs it (DEFAULT NULL: received activities have no send targets)
ALTER TABLE activity
    ADD COLUMN send_targets jsonb DEFAULT NULL,
    ADD COLUMN actor_type actor_type_enum DEFAULT NULL,
    ADD COLUMN actor_apub_id text DEFAULT NULL;

-- per-target-instance outgoing queue state, one row per domain
CREATE TABLE federation_queue_state(
    domain text PRIMARY KEY,
    last_successful_id integer NOT NULL,
    fail_count integer NOT NULL,
    last_retry timestamptz NOT NULL
);

-- for incremental fetches of followers
CREATE INDEX idx_community_follower_published ON community_follower(published);
||||
|
Loading…
Reference in New Issue