|
|
@ -37,8 +37,8 @@ use tokio_util::sync::CancellationToken;
|
|
|
|
use tracing::{debug, info, trace, warn};
|
|
|
|
use tracing::{debug, info, trace, warn};
|
|
|
|
|
|
|
|
|
|
|
|
/// Check whether to save state to db every n sends if there's no failures (during failures state is
|
|
|
|
/// Check whether to save state to db every n sends if there's no failures (during failures state is
|
|
|
|
/// saved after every attempt) This determines the batch size for loop_batch. After a batch ends and
|
|
|
|
/// saved after every attempt). This determines the batch size for loop_batch. After a batch ends
|
|
|
|
/// SAVE_STATE_EVERY_TIME has passed, the federation_queue_state is updated in the DB.
|
|
|
|
/// and SAVE_STATE_EVERY_TIME has passed, the federation_queue_state is updated in the DB.
|
|
|
|
static CHECK_SAVE_STATE_EVERY_IT: i64 = 100;
|
|
|
|
static CHECK_SAVE_STATE_EVERY_IT: i64 = 100;
|
|
|
|
/// Save state to db after this time has passed since the last state (so if the server crashes or is
|
|
|
|
/// Save state to db after this time has passed since the last state (so if the server crashes or is
|
|
|
|
/// SIGKILLed, less than X seconds of activities are resent)
|
|
|
|
/// SIGKILLed, less than X seconds of activities are resent)
|
|
|
@ -161,7 +161,9 @@ impl InstanceWorker {
|
|
|
|
id
|
|
|
|
id
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
// this is the initial creation (instance first seen) of the federation queue for this
|
|
|
|
// this is the initial creation (instance first seen) of the federation queue for this
|
|
|
|
// instance skip all past activities:
|
|
|
|
// instance
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// skip all past activities:
|
|
|
|
self.state.last_successful_id = Some(latest_id);
|
|
|
|
self.state.last_successful_id = Some(latest_id);
|
|
|
|
// save here to ensure it's not read as 0 again later if no activities have happened
|
|
|
|
// save here to ensure it's not read as 0 again later if no activities have happened
|
|
|
|
self.save_and_send_state(pool).await?;
|
|
|
|
self.save_and_send_state(pool).await?;
|
|
|
|