Fix federate loop (#4330)

* make activity channel infallible

* clippy

* federate: make cancellabletask loop itself
reduce-pool-size^2
phiresky 4 months ago committed by GitHub
parent 53147596b4
commit 024ab7d530
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -23,6 +23,7 @@ static INSTANCES_RECHECK_DELAY: Duration = Duration::from_secs(5);
#[cfg(not(debug_assertions))]
static INSTANCES_RECHECK_DELAY: Duration = Duration::from_secs(60);
#[derive(Clone)]
pub struct Opts {
/// how many processes you are starting in total
pub process_count: i32,
@ -36,7 +37,7 @@ async fn start_stop_federation_workers(
federation_config: FederationConfig<LemmyContext>,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let mut workers = HashMap::<InstanceId, CancellableTask<_>>::new();
let mut workers = HashMap::<InstanceId, CancellableTask>::new();
let (stats_sender, stats_receiver) = unbounded_channel();
let exit_print = tokio::spawn(receive_print_stats(pool.clone(), stats_receiver));
@ -66,40 +67,30 @@ async fn start_stop_federation_workers(
let should_federate = allowed && !is_dead;
if should_federate {
if workers.contains_key(&instance.id) {
if workers
.get(&instance.id)
.map(util::CancellableTask::has_ended)
.unwrap_or(false)
{
// task must have errored out, remove and recreated it
let worker = workers
.remove(&instance.id)
.expect("just checked contains_key");
tracing::error!(
"worker for {} has stopped, recreating: {:?}",
instance.domain,
worker.cancel().await
);
} else {
continue;
}
// worker already running
continue;
}
// create new worker
let config = federation_config.clone();
let stats_sender = stats_sender.clone();
let context = federation_config.to_request_data();
let pool = pool.clone();
workers.insert(
instance.id,
CancellableTask::spawn(WORKER_EXIT_TIMEOUT, |stop| async move {
InstanceWorker::init_and_loop(
instance,
context,
&mut DbPool::Pool(&pool),
stop,
stats_sender,
)
.await?;
Ok(())
CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| {
let instance = instance.clone();
let req_data = config.clone().to_request_data();
let stats_sender = stats_sender.clone();
let pool = pool.clone();
async move {
InstanceWorker::init_and_loop(
instance,
req_data,
&mut DbPool::Pool(&pool),
stop,
stats_sender,
)
.await
}
}),
);
} else if !should_federate {
@ -135,9 +126,12 @@ pub fn start_stop_federation_workers_cancellable(
opts: Opts,
pool: ActualDbPool,
config: FederationConfig<LemmyContext>,
) -> CancellableTask<()> {
CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |c| {
start_stop_federation_workers(opts, pool, config, c)
) -> CancellableTask {
CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| {
let opts = opts.clone();
let pool = pool.clone();
let config = config.clone();
async move { start_stop_federation_workers(opts, pool, config, stop).await }
})
}

@ -20,12 +20,7 @@ use moka::future::Cache;
use once_cell::sync::Lazy;
use reqwest::Url;
use serde_json::Value;
use std::{
future::Future,
pin::Pin,
sync::{Arc, RwLock},
time::Duration,
};
use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc, time::Duration};
use tokio::{task::JoinHandle, time::sleep};
use tokio_util::sync::CancellationToken;
@ -49,41 +44,41 @@ pub(crate) static WORK_FINISHED_RECHECK_DELAY: Lazy<Duration> = Lazy::new(|| {
}
});
pub struct CancellableTask<R: Send + 'static> {
f: Pin<Box<dyn Future<Output = Result<R, anyhow::Error>> + Send + 'static>>,
ended: Arc<RwLock<bool>>,
/// A task that will be run in an infinite loop, unless it is cancelled.
/// If the task exits without being cancelled, an error will be logged and the task will be restarted.
pub struct CancellableTask {
f: Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'static>>,
}
impl<R: Send + 'static> CancellableTask<R> {
impl CancellableTask {
/// spawn a task but with graceful shutdown
pub fn spawn<F>(
pub fn spawn<F, R: Debug>(
timeout: Duration,
task: impl FnOnce(CancellationToken) -> F,
) -> CancellableTask<R>
task: impl Fn(CancellationToken) -> F + Send + 'static,
) -> CancellableTask
where
F: Future<Output = Result<R>> + Send + 'static,
F: Future<Output = R> + Send + 'static,
{
let stop = CancellationToken::new();
let task = task(stop.clone());
let ended = Arc::new(RwLock::new(false));
let ended_write = ended.clone();
let task: JoinHandle<Result<R>> = tokio::spawn(async move {
match task.await {
Ok(o) => Ok(o),
Err(e) => {
*ended_write.write().expect("poisoned") = true;
Err(e)
let stop2 = stop.clone();
let task: JoinHandle<()> = tokio::spawn(async move {
loop {
let res = task(stop2.clone()).await;
if stop2.is_cancelled() {
return;
} else {
tracing::warn!("task exited, restarting: {res:?}");
}
}
});
let abort = task.abort_handle();
CancellableTask {
ended,
f: Box::pin(async move {
stop.cancel();
tokio::select! {
r = task => {
Ok(r.context("could not join")??)
r.context("could not join")?;
Ok(())
},
_ = sleep(timeout) => {
abort.abort();
@ -96,12 +91,9 @@ impl<R: Send + 'static> CancellableTask<R> {
}
/// cancel the cancel signal, wait for timeout for the task to stop gracefully, otherwise abort it
pub async fn cancel(self) -> Result<R, anyhow::Error> {
pub async fn cancel(self) -> Result<(), anyhow::Error> {
self.f.await
}
pub fn has_ended(&self) -> bool {
*self.ended.read().expect("poisoned")
}
}
/// assuming apub priv key and ids are immutable, then we don't need to have TTL

@ -206,6 +206,7 @@ impl InstanceWorker {
.await
.context("failed figuring out inbox urls")?;
if inbox_urls.is_empty() {
tracing::debug!("{}: {:?} no inboxes", self.instance.domain, activity.id);
self.state.last_successful_id = Some(activity.id);
self.state.last_successful_published_time = Some(activity.published);
return Ok(());

Loading…
Cancel
Save