federation: some comments

federation-send-parallel
phiresky 2 months ago
parent 539f06af97
commit 491daabaf2

@@ -93,15 +93,14 @@ struct SendSuccessInfo {
was_skipped: bool,
}
impl PartialOrd for SendSuccessInfo {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
other.activity_id.partial_cmp(&self.activity_id)
}
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
other.activity_id.partial_cmp(&self.activity_id)
}
}
impl Ord for SendSuccessInfo {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
other.activity_id.cmp(&self.activity_id)
}
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
other.activity_id.cmp(&self.activity_id)
}
}
enum SendActivityResult {
Success(SendSuccessInfo),
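The reversed comparisons in this hunk are what turn Rust's `BinaryHeap` (a max-heap) into a min-heap keyed on `activity_id`, so the lowest pending id can always be popped first. A minimal standalone sketch of that trick, using a hypothetical single-field `Item` in place of `SendSuccessInfo`:

```rust
use std::collections::BinaryHeap;

// Hypothetical stand-in for SendSuccessInfo: only the id takes part in ordering.
struct Item {
    id: i64,
}

impl PartialEq for Item {
    fn eq(&self, other: &Self) -> bool {
        self.id == other.id
    }
}
impl Eq for Item {}

// Reversed comparison: BinaryHeap is a max-heap, so flipping the operands
// makes pop() return the smallest id first.
impl Ord for Item {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        other.id.cmp(&self.id)
    }
}
impl PartialOrd for Item {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut heap = BinaryHeap::new();
    for id in [7_i64, 3, 5] {
        heap.push(Item { id });
    }
    // Prints 3, 5, 7: the lowest pending id always comes out first.
    while let Some(item) = heap.pop() {
        println!("{}", item.id);
    }
}
```

Popping lowest-first is what lets the worker find the contiguous prefix of successful sends that can safely be persisted as `last_successful_id`.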
@@ -115,13 +114,11 @@ impl InstanceWorker {
pub(crate) async fn init_and_loop(
instance: Instance,
config: FederationConfig<LemmyContext>,
// pool: ActualDbPool, // in theory there's a ref to the pool in context, but i couldn't get that to work wrt lifetimes
stop: CancellationToken,
stats_sender: UnboundedSender<(String, FederationQueueState)>,
) -> Result<(), anyhow::Error> {
let state =
FederationQueueState::load(&mut config.to_request_data().pool(), instance.id).await?;
let pool = config.to_request_data().inner_pool().clone();
let state = FederationQueueState::load(&mut DbPool::Pool(&pool), instance.id).await?;
let mut worker = InstanceWorker {
instance,
site_loaded: false,
@@ -147,28 +144,43 @@ impl InstanceWorker {
// activities that have been successfully sent but
// are not the lowest number and thus can't be written to the database yet
let mut successfuls = BinaryHeap::<SendSuccessInfo>::new();
// number of activities that currently have a task spawned to send them
let mut in_flight: i64 = 0;
let (report_inbox_result, mut receive_inbox_result) =
// each HTTP send will report back to this channel concurrently
let (report_send_result, mut receive_send_result) =
tokio::sync::mpsc::unbounded_channel::<SendActivityResult>();
while !self.stop.is_cancelled() {
// check if we need to wait for a send to finish before sending the next one
// we wait if (a) the last request failed and a request is already in flight (i.e. not at the start of the loop),
// (b) we have buffered too many successful sends in memory, or (c) too many sends are in flight
let need_wait_for_event = (in_flight != 0 && self.state.fail_count > 0)
|| successfuls.len() > MAX_SUCCESSFULS
|| successfuls.len() >= MAX_SUCCESSFULS
|| in_flight >= *CONCURRENT_SENDS;
if need_wait_for_event || receive_inbox_result.len() > 4 {
if need_wait_for_event || receive_send_result.len() > 4 {
// if len() > 0 then this does not block and allows us to write to db more often
// if len is 0 then this means we wait for something to change our above conditions,
// which can only happen by an event sent into the channel
self
.handle_send_results(&mut receive_inbox_result, &mut successfuls, &mut in_flight)
.handle_send_results(&mut receive_send_result, &mut successfuls, &mut in_flight)
.await?;
// handle_send_results does not guarantee that we are now in a condition where we want to send a new activity,
// so loop back and repeat this check until it no longer applies
continue;
} else {
// send a new activity if there is one
self.update_communities().await?;
let last_successful_id = self
.state
.last_successful_id
.map(|e| e.0)
.expect("set above");
let next_id = ActivityId(last_successful_id + (successfuls.len() as i64) + in_flight + 1);
let next_id = {
// calculate the next id to send based on the last successful id and the in-flight requests
let last_successful_id = self
.state
.last_successful_id
.map(|e| e.0)
.expect("set above");
ActivityId(last_successful_id + (successfuls.len() as i64) + in_flight + 1)
};
if next_id > latest_id {
// lazily fetch the latest id only once we have caught up
latest_id = self.get_latest_id().await?;
if next_id > latest_id {
// no more work to be done, wait before rechecking
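Read together, the wait condition and the next-id calculation in this hunk boil down to pure functions of the loop's counters. A sketch with hypothetical constant values standing in for `MAX_SUCCESSFULS` and `CONCURRENT_SENDS` (their real values live elsewhere in the worker):

```rust
// Assumed placeholder values, not the worker's real configuration.
const MAX_SUCCESSFULS: usize = 1000;
const CONCURRENT_SENDS: i64 = 8;

/// Returns true when the loop should drain send results instead of spawning
/// a new send: after a failure (once something is in flight), or when either
/// the successful-but-unwritten buffer or the in-flight count hits its cap.
fn need_wait_for_event(in_flight: i64, fail_count: i32, successfuls_len: usize) -> bool {
    (in_flight != 0 && fail_count > 0)
        || successfuls_len >= MAX_SUCCESSFULS
        || in_flight >= CONCURRENT_SENDS
}

/// The next activity id to send: everything up to `last_successful_id` is
/// persisted, `successfuls_len` ids are done but not yet written, and
/// `in_flight` ids are currently being sent, so the next untouched id comes
/// directly after those.
fn next_id(last_successful_id: i64, successfuls_len: usize, in_flight: i64) -> i64 {
    last_successful_id + successfuls_len as i64 + in_flight + 1
}

fn main() {
    assert!(!need_wait_for_event(0, 3, 0)); // failures alone don't block the first send
    assert!(need_wait_for_event(8, 0, 0)); // concurrency cap reached
    assert_eq!(next_id(100, 2, 3), 106);
}
```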
@@ -181,7 +193,7 @@ impl InstanceWorker {
}
in_flight += 1;
self
.spawn_send_if_needed(next_id, report_inbox_result.clone())
.spawn_send_if_needed(next_id, report_send_result.clone())
.await?;
}
}
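The spawn-and-report shape that drives the `in_flight` counter can be sketched in isolation: each send runs as a detached task and signals completion through the unbounded channel, which is what lets the loop keep several deliveries going at once. Everything below (the `SendResult` type, the fake delivery body) is a simplified stand-in, not the worker's real types:

```rust
use tokio::sync::mpsc;

// Hypothetical stripped-down result type; the real worker sends a
// SendActivityResult with success/failure details.
struct SendResult {
    activity_id: i64,
}

#[tokio::main]
async fn main() {
    let (report, mut receive) = mpsc::unbounded_channel::<SendResult>();
    let mut in_flight: i64 = 0;

    for activity_id in 1..=3_i64 {
        let report = report.clone();
        in_flight += 1;
        // Each send runs concurrently; completion is reported through the channel
        // rather than awaited inline, so the loop can keep dispatching work.
        tokio::spawn(async move {
            // ... perform the HTTP delivery here ...
            report.send(SendResult { activity_id }).ok();
        });
    }
    drop(report); // keep only the clones held by the spawned tasks

    while let Some(result) = receive.recv().await {
        in_flight -= 1;
        println!("activity {} done, {} still in flight", result.activity_id, in_flight);
    }
}
```

An unbounded channel is unlikely to grow without limit here, since the loop itself caps how many sends are in flight at a time.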
@@ -348,11 +360,12 @@ impl InstanceWorker {
let stop = self.stop.clone();
let domain = self.instance.domain.clone();
tokio::spawn(async move {
let mut report = report;
if let Err(e) = InstanceWorker::send_retry_loop(
&ele.0,
&ele.1,
inbox_urls,
report,
&mut report,
initial_fail_count,
domain,
data,
@@ -365,6 +378,11 @@ impl InstanceWorker {
ele.0.ap_id,
e
);
report.send(SendActivityResult::Success(SendSuccessInfo {
activity_id,
published: None,
was_skipped: true,
})).ok();
}
});
Ok(())
@@ -376,7 +394,7 @@ impl InstanceWorker {
activity: &SentActivity,
object: &SharedInboxActivities,
inbox_urls: Vec<Url>,
report: UnboundedSender<SendActivityResult>,
report: &mut UnboundedSender<SendActivityResult>,
initial_fail_count: i32,
domain: String,
context: Data<LemmyContext>,
@@ -384,7 +402,7 @@ impl InstanceWorker {
) -> Result<()> {
let pool = &mut context.pool();
let Some(actor_apub_id) = &activity.actor_apub_id else {
return Ok(()); // activity was inserted before persistent queue was activated
return Err(anyhow::anyhow!("activity is from before lemmy 0.19"));
};
let actor = get_actor_cached(pool, activity.actor_type, actor_apub_id)
.await
@@ -413,6 +431,7 @@ impl InstanceWorker {
() = sleep(retry_delay) => {},
() = stop.cancelled() => {
// save state to db and exit
// TODO: do we need to report state here to prevent hang on exit?
return Ok(());
}
}
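The `tokio::select!` in the last hunk is the standard cancellable-sleep pattern: wait out the retry backoff, but return early if the worker's `CancellationToken` fires. A freestanding sketch (assuming the `tokio` and `tokio-util` crates, which the worker already uses):

```rust
use std::time::Duration;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;

// A minimal sketch of the retry wait: sleep for the backoff delay, but wake up
// immediately if the worker is being shut down.
async fn wait_or_stop(retry_delay: Duration, stop: &CancellationToken) -> bool {
    tokio::select! {
        () = sleep(retry_delay) => true,   // keep retrying
        () = stop.cancelled() => false,    // save state and exit
    }
}

#[tokio::main]
async fn main() {
    let stop = CancellationToken::new();
    stop.cancel();
    // With the token already cancelled, this returns immediately with `false`.
    assert!(!wait_or_stop(Duration::from_secs(60), &stop).await);
}
```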
