@@ -120,11 +122,13 @@ impl<'a> From<&'a ActualDbPool> for DbPool<'a> {
}
}
/// Runs multiple async functions that take `&mut DbPool<'_>` as input and return `Result`. Only
/// works when the `futures` crate is listed in `Cargo.toml`.
///
/// `$pool` is the value given to each function.
///
/// A `Result` is returned (not in a `Future`, so don't use `.await`). The `Ok` variant contains a
/// tuple with the values returned by the given functions.
///
/// The functions run concurrently if `$pool` has the `DbPool::Pool` variant.
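///
/// A hypothetical usage sketch (the macro name `try_join_with_pool!` and the
/// called functions are assumptions, not confirmed by this diff):
///
/// ```ignore
/// let (person, site) = try_join_with_pool!(pool => (
///   |pool| Person::read(pool, person_id),
///   |pool| Site::read_local(pool),
/// ))?;
/// ```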
// the results are correct no matter which community we fetch these for, since it basically
// covers the "worst case" of the whole page consisting of posts from one community
// but using the largest community decreases the pagination-frame, making the real query more
// efficient.
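// Hedged illustration (diesel table/column names are assumptions, not this
// PR's code): pick the followed community with the most posts as the
// worst-case community for the prefetch.
let largest_community_id = community_follower::table
  .inner_join(community_aggregates::table.on(
    community_aggregates::community_id.eq(community_follower::community_id),
  ))
  .filter(community_follower::person_id.eq(person_id))
  .order_by(community_aggregates::posts.desc())
  .select(community_follower::community_id)
  .first::<CommunityId>(conn)
  .await?;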
// In most cases this will fetch the same url many times (the shared inbox url)
// PG will only send a single copy to rust, but it has to scan through all follower rows (same
// as it was before). So on the PG side it would be possible to optimize this further by
// adding e.g. a new table community_followed_instances (community_id, instance_id)
// that would work for all instances that support fully shared inboxes.
// It would be a bit more complicated though to keep it in sync.
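// Hedged sketch of that suggested table (not part of this PR; the column
// types are assumptions), written as a diesel schema definition:
diesel::table! {
  community_followed_instances (community_id, instance_id) {
    community_id -> Int4,
    instance_id -> Int4,
  }
}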
@@ -31,7 +33,8 @@ impl CommunityFollowerView {
      .inner_join(community::table)
      .inner_join(person::table)
      .filter(person::instance_id.eq(instance_id))
      .filter(community::local) // this should be a no-op since community_followers table only has
      // local-person+remote-community or remote-person+local-community
      ListingType::Subscribed => query.filter(community_follower::pending.is_not_null()), /* TODO could be this: and(community_follower::person_id.eq(person_id_join)), */
@@ -36,17 +36,22 @@ use tokio::{sync::mpsc::UnboundedSender, time::sleep};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, trace, warn};
/// Check whether to save state to db every n sends if there are no failures (during failures,
/// state is saved after every attempt). This determines the batch size for loop_batch. After a
/// batch ends and SAVE_STATE_EVERY_TIME has passed, the federation_queue_state is updated in the DB.
static CHECK_SAVE_STATE_EVERY_IT: i64 = 100;
/// Save state to db after this time has passed since the last state (so if the server crashes or is
/// SIGKILLed, less than X seconds of activities are resent)
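// A minimal sketch of how these two thresholds interact (the loop shape,
// `send`, `save_and_send_state`, and `state` are assumptions, not the real
// queue code; SAVE_STATE_EVERY_TIME is assumed to be a chrono::TimeDelta):
let mut last_state_insert = Utc::now();
for (i, activity) in batch.iter().enumerate() {
  send(activity).await?;
  // every CHECK_SAVE_STATE_EVERY_IT successful sends, persist state if the
  // save interval has also elapsed
  if i as i64 % CHECK_SAVE_STATE_EVERY_IT == 0
    && Utc::now() - last_state_insert > SAVE_STATE_EVERY_TIME
  {
    save_and_send_state(pool, &state).await?; // updates federation_queue_state in the DB
    last_state_insert = Utc::now();
  }
}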
/// Interval with which new additions to community_followers are queried.
///
/// The first time some user on an instance follows a specific remote community (or, more precisely:
/// the first time a (followed_community_id, follower_inbox_url) tuple appears), this delay limits
/// the maximum time until the follow actually results in activities from that community id being
/// sent to that inbox url. This delay currently needs to not be too small, because DB load is
/// fairly high due to the current structure of storing inboxes for every person, not having a
/// separate list of shared_inboxes, and the architecture of having every instance queue be fully
/// separate. (see https://github.com/LemmyNet/lemmy/issues/3958)
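// Hedged sketch of the recheck loop this interval drives (the constant name
// `FOLLOW_ADDITIONS_RECHECK_DELAY`, `last_fetch`, and the exact view method
// are assumptions):
loop {
  // fetch (community_id, inbox_url) pairs added since the last check
  let new_follows = CommunityFollowerView::get_instance_followed_community_inboxes(
    pool, instance_id, last_fetch,
  )
  .await?;
  last_fetch = Utc::now();
  // merge the new pairs into the in-memory map of inboxes to send to ...
  sleep(FOLLOW_ADDITIONS_RECHECK_DELAY).await;
}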
Utc::now() - chrono::TimeDelta::try_seconds(10).expect("TimeDelta out of bounds"); // update to time before fetch; subtract 10s to ensure
// overlap even if published date is not exact
// This is the default log format save for the usage of %{r}a over %a to guarantee to
// record the client's (forwarded) IP and not the last peer address, since the latter is
// frequently just a reverse proxy
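// Hedged sketch of that Logger setup: actix-web's documented default format
// with %a swapped for %{r}a (the realip / forwarded remote address):
let logger = actix_web::middleware::Logger::new(
  "%{r}a \"%r\" %s %b \"%{Referer}i\" \"%{User-Agent}i\" %T",
);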