mirror of https://github.com/LemmyNet/lemmy
Rewrite collections to use new fetcher (#1861)
* Merge traits ToApub and FromApub into ApubObject * Rewrite community outbox to use new fetcher * Rewrite community moderators collection * Rewrite tombstoneremove_ansible
parent
d9ecabee87
commit
61189efe72
@ -0,0 +1,141 @@
|
||||
use crate::{
|
||||
collections::CommunityContext,
|
||||
context::lemmy_context,
|
||||
fetcher::object_id::ObjectId,
|
||||
generate_moderators_url,
|
||||
objects::person::ApubPerson,
|
||||
};
|
||||
use activitystreams::{
|
||||
base::AnyBase,
|
||||
chrono::NaiveDateTime,
|
||||
collection::kind::OrderedCollectionType,
|
||||
primitives::OneOrMany,
|
||||
url::Url,
|
||||
};
|
||||
use lemmy_api_common::blocking;
|
||||
use lemmy_apub_lib::{traits::ApubObject, verify::verify_domains_match};
|
||||
use lemmy_db_schema::{
|
||||
source::community::{CommunityModerator, CommunityModeratorForm},
|
||||
traits::Joinable,
|
||||
};
|
||||
use lemmy_db_views_actor::community_moderator_view::CommunityModeratorView;
|
||||
use lemmy_utils::LemmyError;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::skip_serializing_none;
|
||||
|
||||
#[skip_serializing_none]
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct GroupModerators {
|
||||
#[serde(rename = "@context")]
|
||||
context: OneOrMany<AnyBase>,
|
||||
r#type: OrderedCollectionType,
|
||||
id: Url,
|
||||
ordered_items: Vec<ObjectId<ApubPerson>>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct ApubCommunityModerators(pub(crate) Vec<CommunityModeratorView>);
|
||||
|
||||
#[async_trait::async_trait(?Send)]
|
||||
impl ApubObject for ApubCommunityModerators {
|
||||
type DataType = CommunityContext;
|
||||
type TombstoneType = ();
|
||||
type ApubType = GroupModerators;
|
||||
|
||||
fn last_refreshed_at(&self) -> Option<NaiveDateTime> {
|
||||
None
|
||||
}
|
||||
|
||||
async fn read_from_apub_id(
|
||||
_object_id: Url,
|
||||
data: &Self::DataType,
|
||||
) -> Result<Option<Self>, LemmyError> {
|
||||
// Only read from database if its a local community, otherwise fetch over http
|
||||
if data.0.local {
|
||||
let cid = data.0.id;
|
||||
let moderators = blocking(data.1.pool(), move |conn| {
|
||||
CommunityModeratorView::for_community(conn, cid)
|
||||
})
|
||||
.await??;
|
||||
Ok(Some(ApubCommunityModerators { 0: moderators }))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
async fn delete(self, _data: &Self::DataType) -> Result<(), LemmyError> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn to_apub(&self, data: &Self::DataType) -> Result<Self::ApubType, LemmyError> {
|
||||
let ordered_items = self
|
||||
.0
|
||||
.iter()
|
||||
.map(|m| ObjectId::<ApubPerson>::new(m.moderator.actor_id.clone().into_inner()))
|
||||
.collect();
|
||||
Ok(GroupModerators {
|
||||
context: lemmy_context(),
|
||||
r#type: OrderedCollectionType::OrderedCollection,
|
||||
id: generate_moderators_url(&data.0.actor_id)?.into(),
|
||||
ordered_items,
|
||||
})
|
||||
}
|
||||
|
||||
fn to_tombstone(&self) -> Result<Self::TombstoneType, LemmyError> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn from_apub(
|
||||
apub: &Self::ApubType,
|
||||
data: &Self::DataType,
|
||||
expected_domain: &Url,
|
||||
request_counter: &mut i32,
|
||||
) -> Result<Self, LemmyError> {
|
||||
verify_domains_match(expected_domain, &apub.id)?;
|
||||
let community_id = data.0.id;
|
||||
let current_moderators = blocking(data.1.pool(), move |conn| {
|
||||
CommunityModeratorView::for_community(conn, community_id)
|
||||
})
|
||||
.await??;
|
||||
// Remove old mods from database which arent in the moderators collection anymore
|
||||
for mod_user in ¤t_moderators {
|
||||
let mod_id = ObjectId::new(mod_user.moderator.actor_id.clone().into_inner());
|
||||
if !apub.ordered_items.contains(&mod_id) {
|
||||
let community_moderator_form = CommunityModeratorForm {
|
||||
community_id: mod_user.community.id,
|
||||
person_id: mod_user.moderator.id,
|
||||
};
|
||||
blocking(data.1.pool(), move |conn| {
|
||||
CommunityModerator::leave(conn, &community_moderator_form)
|
||||
})
|
||||
.await??;
|
||||
}
|
||||
}
|
||||
|
||||
// Add new mods to database which have been added to moderators collection
|
||||
for mod_id in &apub.ordered_items {
|
||||
let mod_id = ObjectId::new(mod_id.clone());
|
||||
let mod_user: ApubPerson = mod_id.dereference(&data.1, request_counter).await?;
|
||||
|
||||
if !current_moderators
|
||||
.clone()
|
||||
.iter()
|
||||
.map(|c| c.moderator.actor_id.clone())
|
||||
.any(|x| x == mod_user.actor_id)
|
||||
{
|
||||
let community_moderator_form = CommunityModeratorForm {
|
||||
community_id: data.0.id,
|
||||
person_id: mod_user.id,
|
||||
};
|
||||
blocking(data.1.pool(), move |conn| {
|
||||
CommunityModerator::join(conn, &community_moderator_form)
|
||||
})
|
||||
.await??;
|
||||
}
|
||||
}
|
||||
|
||||
// This return value is unused, so just set an empty vec
|
||||
Ok(ApubCommunityModerators { 0: vec![] })
|
||||
}
|
||||
}
|
@ -0,0 +1,128 @@
|
||||
use crate::{
|
||||
activities::{post::create_or_update::CreateOrUpdatePost, CreateOrUpdateType},
|
||||
collections::CommunityContext,
|
||||
context::lemmy_context,
|
||||
generate_outbox_url,
|
||||
objects::{person::ApubPerson, post::ApubPost},
|
||||
};
|
||||
use activitystreams::{
|
||||
base::AnyBase,
|
||||
chrono::NaiveDateTime,
|
||||
collection::kind::OrderedCollectionType,
|
||||
primitives::OneOrMany,
|
||||
url::Url,
|
||||
};
|
||||
use lemmy_api_common::blocking;
|
||||
use lemmy_apub_lib::{
|
||||
data::Data,
|
||||
traits::{ActivityHandler, ApubObject},
|
||||
verify::verify_domains_match,
|
||||
};
|
||||
use lemmy_db_schema::{
|
||||
source::{person::Person, post::Post},
|
||||
traits::Crud,
|
||||
};
|
||||
use lemmy_utils::LemmyError;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::skip_serializing_none;
|
||||
|
||||
#[skip_serializing_none]
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct GroupOutbox {
|
||||
#[serde(rename = "@context")]
|
||||
context: OneOrMany<AnyBase>,
|
||||
r#type: OrderedCollectionType,
|
||||
id: Url,
|
||||
ordered_items: Vec<CreateOrUpdatePost>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct ApubCommunityOutbox(Vec<ApubPost>);
|
||||
|
||||
#[async_trait::async_trait(?Send)]
|
||||
impl ApubObject for ApubCommunityOutbox {
|
||||
type DataType = CommunityContext;
|
||||
type TombstoneType = ();
|
||||
type ApubType = GroupOutbox;
|
||||
|
||||
fn last_refreshed_at(&self) -> Option<NaiveDateTime> {
|
||||
None
|
||||
}
|
||||
|
||||
async fn read_from_apub_id(
|
||||
_object_id: Url,
|
||||
data: &Self::DataType,
|
||||
) -> Result<Option<Self>, LemmyError> {
|
||||
// Only read from database if its a local community, otherwise fetch over http
|
||||
if data.0.local {
|
||||
let community_id = data.0.id;
|
||||
let post_list: Vec<ApubPost> = blocking(data.1.pool(), move |conn| {
|
||||
Post::list_for_community(conn, community_id)
|
||||
})
|
||||
.await??
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect();
|
||||
Ok(Some(ApubCommunityOutbox(post_list)))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
async fn delete(self, _data: &Self::DataType) -> Result<(), LemmyError> {
|
||||
// do nothing (it gets deleted automatically with the community)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn to_apub(&self, data: &Self::DataType) -> Result<Self::ApubType, LemmyError> {
|
||||
let mut ordered_items = vec![];
|
||||
for post in &self.0 {
|
||||
let actor = post.creator_id;
|
||||
let actor: ApubPerson = blocking(data.1.pool(), move |conn| Person::read(conn, actor))
|
||||
.await??
|
||||
.into();
|
||||
let a =
|
||||
CreateOrUpdatePost::new(post, &actor, &data.0, CreateOrUpdateType::Create, &data.1).await?;
|
||||
ordered_items.push(a);
|
||||
}
|
||||
|
||||
Ok(GroupOutbox {
|
||||
context: lemmy_context(),
|
||||
r#type: OrderedCollectionType::OrderedCollection,
|
||||
id: generate_outbox_url(&data.0.actor_id)?.into(),
|
||||
ordered_items,
|
||||
})
|
||||
}
|
||||
|
||||
fn to_tombstone(&self) -> Result<Self::TombstoneType, LemmyError> {
|
||||
// no tombstone for this, there is only a tombstone for the community
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn from_apub(
|
||||
apub: &Self::ApubType,
|
||||
data: &Self::DataType,
|
||||
expected_domain: &Url,
|
||||
request_counter: &mut i32,
|
||||
) -> Result<Self, LemmyError> {
|
||||
verify_domains_match(expected_domain, &apub.id)?;
|
||||
let mut outbox_activities = apub.ordered_items.clone();
|
||||
if outbox_activities.len() > 20 {
|
||||
outbox_activities = outbox_activities[0..20].to_vec();
|
||||
}
|
||||
|
||||
// We intentionally ignore errors here. This is because the outbox might contain posts from old
|
||||
// Lemmy versions, or from other software which we cant parse. In that case, we simply skip the
|
||||
// item and only parse the ones that work.
|
||||
for activity in outbox_activities {
|
||||
activity
|
||||
.receive(&Data::new(data.1.clone()), request_counter)
|
||||
.await
|
||||
.ok();
|
||||
}
|
||||
|
||||
// This return value is unused, so just set an empty vec
|
||||
Ok(ApubCommunityOutbox { 0: vec![] })
|
||||
}
|
||||
}
|
@ -0,0 +1,7 @@
|
||||
use crate::objects::community::ApubCommunity;
|
||||
use lemmy_websocket::LemmyContext;
|
||||
pub(crate) mod community_moderators;
|
||||
pub(crate) mod community_outbox;
|
||||
|
||||
/// Put community in the data, so we dont have to read it again from the database.
|
||||
pub(crate) struct CommunityContext(pub ApubCommunity, pub LemmyContext);
|
@ -1,144 +0,0 @@
|
||||
use crate::{
|
||||
activities::community::announce::AnnounceActivity,
|
||||
fetcher::{fetch::fetch_remote_object, object_id::ObjectId},
|
||||
objects::{community::Group, person::ApubPerson},
|
||||
};
|
||||
use activitystreams::{
|
||||
base::AnyBase,
|
||||
collection::{CollectionExt, OrderedCollection},
|
||||
};
|
||||
use anyhow::Context;
|
||||
use lemmy_api_common::blocking;
|
||||
use lemmy_apub_lib::{data::Data, traits::ActivityHandler};
|
||||
use lemmy_db_schema::{
|
||||
source::community::{Community, CommunityModerator, CommunityModeratorForm},
|
||||
traits::Joinable,
|
||||
};
|
||||
use lemmy_db_views_actor::community_moderator_view::CommunityModeratorView;
|
||||
use lemmy_utils::{location_info, LemmyError};
|
||||
use lemmy_websocket::LemmyContext;
|
||||
use url::Url;
|
||||
|
||||
pub(crate) async fn update_community_mods(
|
||||
group: &Group,
|
||||
community: &Community,
|
||||
context: &LemmyContext,
|
||||
request_counter: &mut i32,
|
||||
) -> Result<(), LemmyError> {
|
||||
let new_moderators = fetch_community_mods(context, group, request_counter).await?;
|
||||
let community_id = community.id;
|
||||
let current_moderators = blocking(context.pool(), move |conn| {
|
||||
CommunityModeratorView::for_community(conn, community_id)
|
||||
})
|
||||
.await??;
|
||||
// Remove old mods from database which arent in the moderators collection anymore
|
||||
for mod_user in ¤t_moderators {
|
||||
if !new_moderators.contains(&mod_user.moderator.actor_id.clone().into()) {
|
||||
let community_moderator_form = CommunityModeratorForm {
|
||||
community_id: mod_user.community.id,
|
||||
person_id: mod_user.moderator.id,
|
||||
};
|
||||
blocking(context.pool(), move |conn| {
|
||||
CommunityModerator::leave(conn, &community_moderator_form)
|
||||
})
|
||||
.await??;
|
||||
}
|
||||
}
|
||||
|
||||
// Add new mods to database which have been added to moderators collection
|
||||
for mod_id in new_moderators {
|
||||
let mod_id = ObjectId::new(mod_id);
|
||||
let mod_user: ApubPerson = mod_id.dereference(context, request_counter).await?;
|
||||
|
||||
if !current_moderators
|
||||
.clone()
|
||||
.iter()
|
||||
.map(|c| c.moderator.actor_id.clone())
|
||||
.any(|x| x == mod_user.actor_id)
|
||||
{
|
||||
let community_moderator_form = CommunityModeratorForm {
|
||||
community_id: community.id,
|
||||
person_id: mod_user.id,
|
||||
};
|
||||
blocking(context.pool(), move |conn| {
|
||||
CommunityModerator::join(conn, &community_moderator_form)
|
||||
})
|
||||
.await??;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn fetch_community_outbox(
|
||||
context: &LemmyContext,
|
||||
outbox: &Url,
|
||||
recursion_counter: &mut i32,
|
||||
) -> Result<(), LemmyError> {
|
||||
let outbox = fetch_remote_object::<OrderedCollection>(
|
||||
context.client(),
|
||||
&context.settings(),
|
||||
outbox,
|
||||
recursion_counter,
|
||||
)
|
||||
.await?;
|
||||
let outbox_activities = outbox.items().context(location_info!())?.clone();
|
||||
let mut outbox_activities = outbox_activities.many().context(location_info!())?;
|
||||
if outbox_activities.len() > 20 {
|
||||
outbox_activities = outbox_activities[0..20].to_vec();
|
||||
}
|
||||
|
||||
// We intentionally ignore errors here. This is because the outbox might contain posts from old
|
||||
// Lemmy versions, or from other software which we cant parse. In that case, we simply skip the
|
||||
// item and only parse the ones that work.
|
||||
for activity in outbox_activities {
|
||||
parse_outbox_item(activity, context, recursion_counter)
|
||||
.await
|
||||
.ok();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn parse_outbox_item(
|
||||
announce: AnyBase,
|
||||
context: &LemmyContext,
|
||||
request_counter: &mut i32,
|
||||
) -> Result<(), LemmyError> {
|
||||
// TODO: instead of converting like this, we should create a struct CommunityOutbox with
|
||||
// AnnounceActivity as inner type, but that gives me stackoverflow
|
||||
let ser = serde_json::to_string(&announce)?;
|
||||
let announce: AnnounceActivity = serde_json::from_str(&ser)?;
|
||||
announce
|
||||
.receive(&Data::new(context.clone()), request_counter)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn fetch_community_mods(
|
||||
context: &LemmyContext,
|
||||
group: &Group,
|
||||
recursion_counter: &mut i32,
|
||||
) -> Result<Vec<Url>, LemmyError> {
|
||||
if let Some(mods_url) = &group.moderators {
|
||||
let mods = fetch_remote_object::<OrderedCollection>(
|
||||
context.client(),
|
||||
&context.settings(),
|
||||
mods_url,
|
||||
recursion_counter,
|
||||
)
|
||||
.await?;
|
||||
let mods = mods
|
||||
.items()
|
||||
.map(|i| i.as_many())
|
||||
.flatten()
|
||||
.context(location_info!())?
|
||||
.iter()
|
||||
.filter_map(|i| i.as_xsd_any_uri())
|
||||
.map(|u| u.to_owned())
|
||||
.collect();
|
||||
Ok(mods)
|
||||
} else {
|
||||
Ok(vec![])
|
||||
}
|
||||
}
|
@ -1,49 +0,0 @@
|
||||
use crate::check_is_apub_id_valid;
|
||||
use anyhow::anyhow;
|
||||
use lemmy_apub_lib::APUB_JSON_CONTENT_TYPE;
|
||||
use lemmy_utils::{request::retry, settings::structs::Settings, LemmyError};
|
||||
use log::info;
|
||||
use reqwest::Client;
|
||||
use serde::Deserialize;
|
||||
use std::time::Duration;
|
||||
use url::Url;
|
||||
|
||||
/// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object
|
||||
/// fetch through the search).
|
||||
///
|
||||
/// A community fetch will load the outbox with up to 20 items, and fetch the creator for each item.
|
||||
/// So we are looking at a maximum of 22 requests (rounded up just to be safe).
|
||||
static MAX_REQUEST_NUMBER: i32 = 25;
|
||||
|
||||
/// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
|
||||
/// timeouts etc.
|
||||
pub(in crate::fetcher) async fn fetch_remote_object<Response>(
|
||||
client: &Client,
|
||||
settings: &Settings,
|
||||
url: &Url,
|
||||
recursion_counter: &mut i32,
|
||||
) -> Result<Response, LemmyError>
|
||||
where
|
||||
Response: for<'de> Deserialize<'de> + std::fmt::Debug,
|
||||
{
|
||||
*recursion_counter += 1;
|
||||
if *recursion_counter > MAX_REQUEST_NUMBER {
|
||||
return Err(anyhow!("Maximum recursion depth reached").into());
|
||||
}
|
||||
check_is_apub_id_valid(url, false, settings)?;
|
||||
|
||||
let timeout = Duration::from_secs(60);
|
||||
|
||||
let res = retry(|| {
|
||||
client
|
||||
.get(url.as_str())
|
||||
.header("Accept", APUB_JSON_CONTENT_TYPE)
|
||||
.timeout(timeout)
|
||||
.send()
|
||||
})
|
||||
.await?;
|
||||
|
||||
let object = res.json().await?;
|
||||
info!("Fetched remote object {}", url);
|
||||
Ok(object)
|
||||
}
|
@ -0,0 +1,33 @@
|
||||
use crate::context::lemmy_context;
|
||||
use activitystreams::{
|
||||
base::AnyBase,
|
||||
chrono::{DateTime, FixedOffset, NaiveDateTime},
|
||||
object::kind::TombstoneType,
|
||||
primitives::OneOrMany,
|
||||
};
|
||||
use lemmy_utils::utils::convert_datetime;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::skip_serializing_none;
|
||||
|
||||
#[skip_serializing_none]
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Tombstone {
|
||||
#[serde(rename = "@context")]
|
||||
context: OneOrMany<AnyBase>,
|
||||
#[serde(rename = "type")]
|
||||
kind: TombstoneType,
|
||||
former_type: String,
|
||||
deleted: DateTime<FixedOffset>,
|
||||
}
|
||||
|
||||
impl Tombstone {
|
||||
pub fn new<T: ToString>(former_type: T, updated_time: NaiveDateTime) -> Tombstone {
|
||||
Tombstone {
|
||||
context: lemmy_context(),
|
||||
kind: TombstoneType::Tombstone,
|
||||
former_type: former_type.to_string(),
|
||||
deleted: convert_datetime(updated_time),
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue