Rewrite fetcher (#1792)

* Use new fetcher implementation for post/comment

* rewrite person fetch to use new fetcher

* rewrite community to use new fetcher

* rename new_fetcher to dereference_object_id

* make ObjectId a newtype

* handle deletion in new fetcher

* rewrite apub object search to be generic

* move upsert() method out of ApubObject trait

* simplify ObjectId::new (and fix clippy)
remove_settings_and_secret_singletons_squashed
Nutomic 3 years ago committed by GitHub
parent 21346eb786
commit 527eefbe92
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -135,7 +135,8 @@ test('Update a post', async () => {
test('Sticky a post', async () => {
let postRes = await createPost(alpha, betaCommunity.community.id);
let stickiedPostRes = await stickyPost(alpha, true, postRes.post_view.post);
let betaPost1 = (await resolvePost(beta, postRes.post_view.post)).post;
let stickiedPostRes = await stickyPost(beta, true, betaPost1.post);
expect(stickiedPostRes.post_view.post.stickied).toBe(true);
// Make sure that post is stickied on beta
@ -145,7 +146,7 @@ test('Sticky a post', async () => {
expect(betaPost.post.stickied).toBe(true);
// Unsticky a post
let unstickiedPost = await stickyPost(alpha, false, postRes.post_view.post);
let unstickiedPost = await stickyPost(beta, false, betaPost1.post);
expect(unstickiedPost.post_view.post.stickied).toBe(false);
// Make sure that post is unstickied on beta

@ -13,7 +13,7 @@ use lemmy_apub::{
undo_vote::UndoVote,
vote::{Vote, VoteType},
},
PostOrComment,
fetcher::post_or_comment::PostOrComment,
};
use lemmy_db_queries::{source::comment::Comment_, Likeable, Saveable};
use lemmy_db_schema::{source::comment::*, LocalUserId};

@ -19,7 +19,7 @@ use lemmy_apub::{
},
CreateOrUpdateType,
},
PostOrComment,
fetcher::post_or_comment::PostOrComment,
};
use lemmy_db_queries::{source::post::Post_, Crud, Likeable, Saveable};
use lemmy_db_schema::source::{moderator::*, post::*};

@ -1,6 +1,7 @@
use crate::Perform;
use actix_web::web::Data;
use anyhow::Context;
use diesel::NotFound;
use lemmy_api_common::{
blocking,
build_federated_instances,
@ -9,24 +10,32 @@ use lemmy_api_common::{
is_admin,
site::*,
};
use lemmy_apub::{build_actor_id_from_shortname, fetcher::search::search_by_apub_id, EndpointType};
use lemmy_apub::{
build_actor_id_from_shortname,
fetcher::search::{search_by_apub_id, SearchableObjects},
EndpointType,
};
use lemmy_db_queries::{
from_opt_str_to_opt_enum,
source::site::Site_,
Crud,
DbPool,
DeleteableOrRemoveable,
ListingType,
SearchType,
SortType,
};
use lemmy_db_schema::source::{moderator::*, site::Site};
use lemmy_db_schema::{
source::{moderator::*, site::Site},
PersonId,
};
use lemmy_db_views::{
comment_view::CommentQueryBuilder,
post_view::PostQueryBuilder,
comment_view::{CommentQueryBuilder, CommentView},
post_view::{PostQueryBuilder, PostView},
site_view::SiteView,
};
use lemmy_db_views_actor::{
community_view::CommunityQueryBuilder,
community_view::{CommunityQueryBuilder, CommunityView},
person_view::{PersonQueryBuilder, PersonViewSafe},
};
use lemmy_db_views_moderator::{
@ -376,11 +385,52 @@ impl Perform for ResolveObject {
_websocket_id: Option<ConnectionId>,
) -> Result<ResolveObjectResponse, LemmyError> {
let local_user_view = get_local_user_view_from_jwt_opt(&self.auth, context.pool()).await?;
let res = search_by_apub_id(&self.q, local_user_view, context)
let res = search_by_apub_id(&self.q, context)
.await
.map_err(|_| ApiError::err("couldnt_find_object"))?;
Ok(res)
convert_response(res, local_user_view.map(|l| l.person.id), context.pool())
.await
.map_err(|_| ApiError::err("couldnt_find_object").into())
}
}
async fn convert_response(
object: SearchableObjects,
user_id: Option<PersonId>,
pool: &DbPool,
) -> Result<ResolveObjectResponse, LemmyError> {
let removed_or_deleted;
let mut res = ResolveObjectResponse {
comment: None,
post: None,
community: None,
person: None,
};
use SearchableObjects::*;
match object {
Person(p) => {
removed_or_deleted = p.deleted;
res.person = Some(blocking(pool, move |conn| PersonViewSafe::read(conn, p.id)).await??)
}
Community(c) => {
removed_or_deleted = c.deleted || c.removed;
res.community =
Some(blocking(pool, move |conn| CommunityView::read(conn, c.id, user_id)).await??)
}
Post(p) => {
removed_or_deleted = p.deleted || p.removed;
res.post = Some(blocking(pool, move |conn| PostView::read(conn, p.id, user_id)).await??)
}
Comment(c) => {
removed_or_deleted = c.deleted || c.removed;
res.comment = Some(blocking(pool, move |conn| CommentView::read(conn, c.id, user_id)).await??)
}
};
// if the object was deleted from database, dont return it
if removed_or_deleted {
return Err(NotFound {}.into());
}
Ok(res)
}
#[async_trait::async_trait(?Send)]

@ -15,9 +15,9 @@ use lemmy_apub::{
voting::vote::{Vote, VoteType},
CreateOrUpdateType,
},
fetcher::post_or_comment::PostOrComment,
generate_apub_endpoint,
EndpointType,
PostOrComment,
};
use lemmy_db_queries::{source::comment::Comment_, Crud, Likeable};
use lemmy_db_schema::source::comment::*;

@ -13,9 +13,9 @@ use lemmy_apub::{
voting::vote::{Vote, VoteType},
CreateOrUpdateType,
},
fetcher::post_or_comment::PostOrComment,
generate_apub_endpoint,
EndpointType,
PostOrComment,
};
use lemmy_db_queries::{source::post::Post_, Crud, Likeable};
use lemmy_db_schema::source::post::*;

@ -10,6 +10,7 @@ use crate::{
},
activity_queue::send_to_community_new,
extensions::context::lemmy_context,
fetcher::object_id::ObjectId,
objects::{comment::Note, FromApub, ToApub},
ActorType,
};
@ -26,7 +27,7 @@ use url::Url;
#[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)]
#[serde(rename_all = "camelCase")]
pub struct CreateOrUpdateComment {
actor: Url,
actor: ObjectId<Person>,
to: [PublicUrl; 1],
object: Note,
cc: Vec<Url>,
@ -60,7 +61,7 @@ impl CreateOrUpdateComment {
let maa = collect_non_local_mentions(comment, &community, context).await?;
let create_or_update = CreateOrUpdateComment {
actor: actor.actor_id(),
actor: ObjectId::new(actor.actor_id()),
to: [PublicUrl::Public],
object: comment.to_apub(context.pool()).await?,
cc: maa.ccs,
@ -84,11 +85,11 @@ impl ActivityHandler for CreateOrUpdateComment {
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let community = extract_community(&self.cc, context, request_counter).await?;
let community_id = ObjectId::new(community.actor_id());
verify_activity(self)?;
verify_person_in_community(&self.actor, &community.actor_id(), context, request_counter)
.await?;
verify_domains_match(&self.actor, self.object.id_unchecked())?;
verify_person_in_community(&self.actor, &community_id, context, request_counter).await?;
verify_domains_match(self.actor.inner(), self.object.id_unchecked())?;
// TODO: should add a check that the correct community is in cc (probably needs changes to
// comment deserialization)
self.object.verify(context, request_counter).await?;
@ -100,7 +101,8 @@ impl ActivityHandler for CreateOrUpdateComment {
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let comment = Comment::from_apub(&self.object, context, &self.actor, request_counter).await?;
let comment =
Comment::from_apub(&self.object, context, self.actor.inner(), request_counter).await?;
let recipients = get_notif_recipients(&self.actor, &comment, context, request_counter).await?;
let notif_type = match self.kind {
CreateOrUpdateType::Create => UserOperationCrud::CreateComment,

@ -1,4 +1,4 @@
use crate::{fetcher::person::get_or_fetch_and_upsert_person, ActorType};
use crate::{fetcher::object_id::ObjectId, ActorType};
use activitystreams::{
base::BaseExt,
link::{LinkExt, Mention},
@ -26,14 +26,14 @@ use url::Url;
pub mod create_or_update;
async fn get_notif_recipients(
actor: &Url,
actor: &ObjectId<Person>,
comment: &Comment,
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<Vec<LocalUserId>, LemmyError> {
let post_id = comment.post_id;
let post = blocking(context.pool(), move |conn| Post::read(conn, post_id)).await??;
let actor = get_or_fetch_and_upsert_person(actor, context, request_counter).await?;
let actor = actor.dereference(context, request_counter).await?;
// Note:
// Although mentions could be gotten from the post tags (they are included there), or the ccs,
@ -76,14 +76,17 @@ pub async fn collect_non_local_mentions(
for mention in &mentions {
// TODO should it be fetching it every time?
if let Ok(actor_id) = fetch_webfinger_url(mention, context.client()).await {
let actor_id: ObjectId<Person> = ObjectId::new(actor_id);
debug!("mention actor_id: {}", actor_id);
addressed_ccs.push(actor_id.to_owned().to_string().parse()?);
let mention_person = get_or_fetch_and_upsert_person(&actor_id, context, &mut 0).await?;
let mention_person = actor_id.dereference(context, &mut 0).await?;
inboxes.push(mention_person.get_shared_inbox_or_inbox_url());
let mut mention_tag = Mention::new();
mention_tag.set_href(actor_id).set_name(mention.full_name());
mention_tag
.set_href(actor_id.into())
.set_name(mention.full_name());
tags.push(mention_tag);
}
}

@ -9,7 +9,7 @@ use crate::{
},
activity_queue::send_to_community_new,
extensions::context::lemmy_context,
fetcher::{community::get_or_fetch_and_upsert_community, person::get_or_fetch_and_upsert_person},
fetcher::object_id::ObjectId,
generate_moderators_url,
ActorType,
};
@ -34,11 +34,11 @@ use url::Url;
#[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)]
#[serde(rename_all = "camelCase")]
pub struct AddMod {
actor: Url,
actor: ObjectId<Person>,
to: [PublicUrl; 1],
object: Url,
object: ObjectId<Person>,
target: Url,
cc: [Url; 1],
cc: [ObjectId<Community>; 1],
#[serde(rename = "type")]
kind: AddType,
id: Url,
@ -57,11 +57,11 @@ impl AddMod {
) -> Result<(), LemmyError> {
let id = generate_activity_id(AddType::Add)?;
let add = AddMod {
actor: actor.actor_id(),
actor: ObjectId::new(actor.actor_id()),
to: [PublicUrl::Public],
object: added_mod.actor_id(),
object: ObjectId::new(added_mod.actor_id()),
target: generate_moderators_url(&community.actor_id)?.into(),
cc: [community.actor_id()],
cc: [ObjectId::new(community.actor_id())],
kind: AddType::Add,
id: id.clone(),
context: lemmy_context(),
@ -84,7 +84,7 @@ impl ActivityHandler for AddMod {
verify_activity(self)?;
verify_person_in_community(&self.actor, &self.cc[0], context, request_counter).await?;
verify_mod_action(&self.actor, self.cc[0].clone(), context).await?;
verify_add_remove_moderator_target(&self.target, self.cc[0].clone())?;
verify_add_remove_moderator_target(&self.target, &self.cc[0])?;
Ok(())
}
@ -93,9 +93,8 @@ impl ActivityHandler for AddMod {
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let community =
get_or_fetch_and_upsert_community(&self.cc[0], context, request_counter).await?;
let new_mod = get_or_fetch_and_upsert_person(&self.object, context, request_counter).await?;
let community = self.cc[0].dereference(context, request_counter).await?;
let new_mod = self.object.dereference(context, request_counter).await?;
// If we had to refetch the community while parsing the activity, then the new mod has already
// been added. Skip it here as it would result in a duplicate key error.

@ -19,6 +19,7 @@ use crate::{
},
activity_queue::send_activity_new,
extensions::context::lemmy_context,
fetcher::object_id::ObjectId,
http::is_activity_already_known,
insert_activity,
ActorType,
@ -57,7 +58,7 @@ pub enum AnnouncableActivities {
#[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)]
#[serde(rename_all = "camelCase")]
pub struct AnnounceActivity {
actor: Url,
actor: ObjectId<Community>,
to: [PublicUrl; 1],
object: AnnouncableActivities,
cc: Vec<Url>,
@ -78,7 +79,7 @@ impl AnnounceActivity {
context: &LemmyContext,
) -> Result<(), LemmyError> {
let announce = AnnounceActivity {
actor: community.actor_id(),
actor: ObjectId::new(community.actor_id()),
to: [PublicUrl::Public],
object,
cc: vec![community.followers_url()],

@ -8,7 +8,7 @@ use crate::{
},
activity_queue::send_to_community_new,
extensions::context::lemmy_context,
fetcher::{community::get_or_fetch_and_upsert_community, person::get_or_fetch_and_upsert_person},
fetcher::object_id::ObjectId,
ActorType,
};
use activitystreams::{
@ -38,10 +38,10 @@ use url::Url;
#[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)]
#[serde(rename_all = "camelCase")]
pub struct BlockUserFromCommunity {
actor: Url,
actor: ObjectId<Person>,
to: [PublicUrl; 1],
pub(in crate::activities::community) object: Url,
cc: [Url; 1],
pub(in crate::activities::community) object: ObjectId<Person>,
cc: [ObjectId<Community>; 1],
#[serde(rename = "type")]
kind: BlockType,
id: Url,
@ -58,10 +58,10 @@ impl BlockUserFromCommunity {
actor: &Person,
) -> Result<BlockUserFromCommunity, LemmyError> {
Ok(BlockUserFromCommunity {
actor: actor.actor_id(),
actor: ObjectId::new(actor.actor_id()),
to: [PublicUrl::Public],
object: target.actor_id(),
cc: [community.actor_id()],
object: ObjectId::new(target.actor_id()),
cc: [ObjectId::new(community.actor_id())],
kind: BlockType::Block,
id: generate_activity_id(BlockType::Block)?,
context: lemmy_context(),
@ -102,10 +102,8 @@ impl ActivityHandler for BlockUserFromCommunity {
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let community =
get_or_fetch_and_upsert_community(&self.cc[0], context, request_counter).await?;
let blocked_user =
get_or_fetch_and_upsert_person(&self.object, context, request_counter).await?;
let community = self.cc[0].dereference(context, request_counter).await?;
let blocked_user = self.object.dereference(context, request_counter).await?;
let community_user_ban_form = CommunityPersonBanForm {
community_id: community.id,

@ -10,7 +10,7 @@ use crate::{
},
activity_queue::send_to_community_new,
extensions::context::lemmy_context,
fetcher::{community::get_or_fetch_and_upsert_community, person::get_or_fetch_and_upsert_person},
fetcher::object_id::ObjectId,
generate_moderators_url,
ActorType,
};
@ -35,10 +35,10 @@ use url::Url;
#[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)]
#[serde(rename_all = "camelCase")]
pub struct RemoveMod {
actor: Url,
actor: ObjectId<Person>,
to: [PublicUrl; 1],
pub(in crate::activities) object: Url,
cc: [Url; 1],
pub(in crate::activities) object: ObjectId<Person>,
cc: [ObjectId<Community>; 1],
#[serde(rename = "type")]
kind: RemoveType,
// if target is set, this is means remove mod from community
@ -59,13 +59,13 @@ impl RemoveMod {
) -> Result<(), LemmyError> {
let id = generate_activity_id(RemoveType::Remove)?;
let remove = RemoveMod {
actor: actor.actor_id(),
actor: ObjectId::new(actor.actor_id()),
to: [PublicUrl::Public],
object: removed_mod.actor_id(),
object: ObjectId::new(removed_mod.actor_id()),
target: Some(generate_moderators_url(&community.actor_id)?.into()),
id: id.clone(),
context: lemmy_context(),
cc: [community.actor_id()],
cc: [ObjectId::new(community.actor_id())],
kind: RemoveType::Remove,
unparsed: Default::default(),
};
@ -87,10 +87,10 @@ impl ActivityHandler for RemoveMod {
if let Some(target) = &self.target {
verify_person_in_community(&self.actor, &self.cc[0], context, request_counter).await?;
verify_mod_action(&self.actor, self.cc[0].clone(), context).await?;
verify_add_remove_moderator_target(target, self.cc[0].clone())?;
verify_add_remove_moderator_target(target, &self.cc[0])?;
} else {
verify_delete_activity(
&self.object,
self.object.inner(),
self,
&self.cc[0],
true,
@ -108,10 +108,8 @@ impl ActivityHandler for RemoveMod {
request_counter: &mut i32,
) -> Result<(), LemmyError> {
if self.target.is_some() {
let community =
get_or_fetch_and_upsert_community(&self.cc[0], context, request_counter).await?;
let remove_mod =
get_or_fetch_and_upsert_person(&self.object, context, request_counter).await?;
let community = self.cc[0].dereference(context, request_counter).await?;
let remove_mod = self.object.dereference(context, request_counter).await?;
let form = CommunityModeratorForm {
community_id: community.id,
@ -124,7 +122,14 @@ impl ActivityHandler for RemoveMod {
// TODO: send websocket notification about removed mod
Ok(())
} else {
receive_remove_action(&self.actor, &self.object, None, context, request_counter).await
receive_remove_action(
&self.actor,
self.object.inner(),
None,
context,
request_counter,
)
.await
}
}
}

@ -8,7 +8,7 @@ use crate::{
},
activity_queue::send_to_community_new,
extensions::context::lemmy_context,
fetcher::{community::get_or_fetch_and_upsert_community, person::get_or_fetch_and_upsert_person},
fetcher::object_id::ObjectId,
ActorType,
};
use activitystreams::{
@ -32,10 +32,10 @@ use url::Url;
#[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)]
#[serde(rename_all = "camelCase")]
pub struct UndoBlockUserFromCommunity {
actor: Url,
actor: ObjectId<Person>,
to: [PublicUrl; 1],
object: BlockUserFromCommunity,
cc: [Url; 1],
cc: [ObjectId<Community>; 1],
#[serde(rename = "type")]
kind: UndoType,
id: Url,
@ -56,10 +56,10 @@ impl UndoBlockUserFromCommunity {
let id = generate_activity_id(UndoType::Undo)?;
let undo = UndoBlockUserFromCommunity {
actor: actor.actor_id(),
actor: ObjectId::new(actor.actor_id()),
to: [PublicUrl::Public],
object: block,
cc: [community.actor_id()],
cc: [ObjectId::new(community.actor_id())],
kind: UndoType::Undo,
id: id.clone(),
context: lemmy_context(),
@ -91,10 +91,12 @@ impl ActivityHandler for UndoBlockUserFromCommunity {
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let community =
get_or_fetch_and_upsert_community(&self.cc[0], context, request_counter).await?;
let blocked_user =
get_or_fetch_and_upsert_person(&self.object.object, context, request_counter).await?;
let community = self.cc[0].dereference(context, request_counter).await?;
let blocked_user = self
.object
.object
.dereference(context, request_counter)
.await?;
let community_user_ban_form = CommunityPersonBanForm {
community_id: community.id,

@ -8,6 +8,7 @@ use crate::{
},
activity_queue::send_to_community_new,
extensions::context::lemmy_context,
fetcher::object_id::ObjectId,
objects::{community::Group, ToApub},
ActorType,
};
@ -34,11 +35,11 @@ use url::Url;
#[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)]
#[serde(rename_all = "camelCase")]
pub struct UpdateCommunity {
actor: Url,
actor: ObjectId<Person>,
to: [PublicUrl; 1],
// TODO: would be nice to use a separate struct here, which only contains the fields updated here
object: Group,
cc: [Url; 1],
cc: [ObjectId<Community>; 1],
#[serde(rename = "type")]
kind: UpdateType,
id: Url,
@ -56,10 +57,10 @@ impl UpdateCommunity {
) -> Result<(), LemmyError> {
let id = generate_activity_id(UpdateType::Update)?;
let update = UpdateCommunity {
actor: actor.actor_id(),
actor: ObjectId::new(actor.actor_id()),
to: [PublicUrl::Public],
object: community.to_apub(context.pool()).await?,
cc: [community.actor_id()],
cc: [ObjectId::new(community.actor_id())],
kind: UpdateType::Update,
id: id.clone(),
context: lemmy_context(),

@ -12,7 +12,7 @@ use crate::{
},
activity_queue::send_to_community_new,
extensions::context::lemmy_context,
fetcher::person::get_or_fetch_and_upsert_person,
fetcher::object_id::ObjectId,
ActorType,
};
use activitystreams::{
@ -49,6 +49,7 @@ use lemmy_websocket::{
UserOperationCrud,
};
use serde::{Deserialize, Serialize};
use serde_with::skip_serializing_none;
use url::Url;
/// This is very confusing, because there are four distinct cases to handle:
@ -59,13 +60,14 @@ use url::Url;
///
/// TODO: we should probably change how community deletions work to simplify this. Probably by
/// wrapping it in an announce just like other activities, instead of having the community send it.
#[skip_serializing_none]
#[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)]
#[serde(rename_all = "camelCase")]
pub struct Delete {
actor: Url,
actor: ObjectId<Person>,
to: [PublicUrl; 1],
pub(in crate::activities::deletion) object: Url,
pub(in crate::activities::deletion) cc: [Url; 1],
pub(in crate::activities::deletion) cc: [ObjectId<Community>; 1],
#[serde(rename = "type")]
kind: DeleteType,
/// If summary is present, this is a mod action (Remove in Lemmy terms). Otherwise, its a user
@ -138,10 +140,10 @@ impl Delete {
summary: Option<String>,
) -> Result<Delete, LemmyError> {
Ok(Delete {
actor: actor.actor_id(),
actor: ObjectId::new(actor.actor_id()),
to: [PublicUrl::Public],
object: object_id,
cc: [community.actor_id()],
cc: [ObjectId::new(community.actor_id())],
kind: DeleteType::Delete,
summary,
id: generate_activity_id(DeleteType::Delete)?,
@ -165,13 +167,13 @@ impl Delete {
}
pub(in crate::activities) async fn receive_remove_action(
actor: &Url,
actor: &ObjectId<Person>,
object: &Url,
reason: Option<String>,
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let actor = get_or_fetch_and_upsert_person(actor, context, request_counter).await?;
let actor = actor.dereference(context, request_counter).await?;
use UserOperationCrud::*;
match DeletableObjects::read_from_db(object, context).await? {
DeletableObjects::Community(community) => {

@ -4,7 +4,7 @@ use crate::{
verify_mod_action,
verify_person_in_community,
},
fetcher::person::get_or_fetch_and_upsert_person,
fetcher::object_id::ObjectId,
ActorType,
};
use lemmy_api_common::blocking;
@ -99,22 +99,22 @@ impl DeletableObjects {
pub(in crate::activities) async fn verify_delete_activity(
object: &Url,
activity: &dyn ActivityFields,
community_id: &Url,
community_id: &ObjectId<Community>,
is_mod_action: bool,
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let object = DeletableObjects::read_from_db(object, context).await?;
let actor = ObjectId::new(activity.actor().clone());
match object {
DeletableObjects::Community(c) => {
if c.local {
// can only do this check for local community, in remote case it would try to fetch the
// deleted community (which fails)
verify_person_in_community(activity.actor(), community_id, context, request_counter)
.await?;
verify_person_in_community(&actor, community_id, context, request_counter).await?;
}
// community deletion is always a mod (or admin) action
verify_mod_action(activity.actor(), c.actor_id(), context).await?;
verify_mod_action(&actor, ObjectId::new(c.actor_id()), context).await?;
}
DeletableObjects::Post(p) => {
verify_delete_activity_post_or_comment(
@ -145,14 +145,15 @@ pub(in crate::activities) async fn verify_delete_activity(
async fn verify_delete_activity_post_or_comment(
activity: &dyn ActivityFields,
object_id: &Url,
community_id: &Url,
community_id: &ObjectId<Community>,
is_mod_action: bool,
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
verify_person_in_community(activity.actor(), community_id, context, request_counter).await?;
let actor = ObjectId::new(activity.actor().clone());
verify_person_in_community(&actor, community_id, context, request_counter).await?;
if is_mod_action {
verify_mod_action(activity.actor(), community_id.clone(), context).await?;
verify_mod_action(&actor, community_id.clone(), context).await?;
} else {
// domain of post ap_id and post.creator ap_id are identical, so we just check the former
verify_domains_match(activity.actor(), object_id)?;
@ -171,7 +172,7 @@ struct WebsocketMessages {
/// because of the mod log
async fn receive_delete_action(
object: &Url,
actor: &Url,
actor: &ObjectId<Person>,
ws_messages: WebsocketMessages,
deleted: bool,
context: &LemmyContext,
@ -180,7 +181,7 @@ async fn receive_delete_action(
match DeletableObjects::read_from_db(object, context).await? {
DeletableObjects::Community(community) => {
if community.local {
let mod_ = get_or_fetch_and_upsert_person(actor, context, request_counter).await?;
let mod_ = actor.dereference(context, request_counter).await?;
let object = community.actor_id();
send_apub_delete(&mod_, &community.clone(), object, true, context).await?;
}

@ -13,6 +13,7 @@ use crate::{
},
activity_queue::send_to_community_new,
extensions::context::lemmy_context,
fetcher::object_id::ObjectId,
ActorType,
};
use activitystreams::{
@ -38,10 +39,10 @@ use url::Url;
#[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)]
#[serde(rename_all = "camelCase")]
pub struct UndoDelete {
actor: Url,
actor: ObjectId<Person>,
to: [PublicUrl; 1],
object: Delete,
cc: [Url; 1],
cc: [ObjectId<Community>; 1],
#[serde(rename = "type")]
kind: UndoType,
id: Url,
@ -109,10 +110,10 @@ impl UndoDelete {
let id = generate_activity_id(UndoType::Undo)?;
let undo = UndoDelete {
actor: actor.actor_id(),
actor: ObjectId::new(actor.actor_id()),
to: [PublicUrl::Public],
object,
cc: [community.actor_id()],
cc: [ObjectId::new(community.actor_id())],
kind: UndoType::Undo,
id: id.clone(),
context: lemmy_context(),

@ -7,7 +7,7 @@ use crate::{
},
activity_queue::send_activity_new,
extensions::context::lemmy_context,
fetcher::{community::get_or_fetch_and_upsert_community, person::get_or_fetch_and_upsert_person},
fetcher::object_id::ObjectId,
ActorType,
};
use activitystreams::{
@ -31,8 +31,8 @@ use url::Url;
#[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)]
#[serde(rename_all = "camelCase")]
pub struct AcceptFollowCommunity {
actor: Url,
to: Url,
actor: ObjectId<Community>,
to: ObjectId<Person>,
object: FollowCommunity,
#[serde(rename = "type")]
kind: AcceptType,
@ -57,8 +57,8 @@ impl AcceptFollowCommunity {
.await??;
let accept = AcceptFollowCommunity {
actor: community.actor_id(),
to: person.actor_id(),
actor: ObjectId::new(community.actor_id()),
to: ObjectId::new(person.actor_id()),
object: follow,
kind: AcceptType::Accept,
id: generate_activity_id(AcceptType::Accept)?,
@ -78,8 +78,8 @@ impl ActivityHandler for AcceptFollowCommunity {
request_counter: &mut i32,
) -> Result<(), LemmyError> {
verify_activity(self)?;
verify_urls_match(&self.to, self.object.actor())?;
verify_urls_match(&self.actor, &self.object.to)?;
verify_urls_match(self.to.inner(), self.object.actor())?;
verify_urls_match(self.actor(), self.object.to.inner())?;
verify_community(&self.actor, context, request_counter).await?;
self.object.verify(context, request_counter).await?;
Ok(())
@ -90,8 +90,8 @@ impl ActivityHandler for AcceptFollowCommunity {
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let actor = get_or_fetch_and_upsert_community(&self.actor, context, request_counter).await?;
let to = get_or_fetch_and_upsert_person(&self.to, context, request_counter).await?;
let actor = self.actor.dereference(context, request_counter).await?;
let to = self.to.dereference(context, request_counter).await?;
// This will throw an error if no follow was requested
blocking(context.pool(), move |conn| {
CommunityFollower::follow_accepted(conn, actor.id, to.id)

@ -7,7 +7,7 @@ use crate::{
},
activity_queue::send_activity_new,
extensions::context::lemmy_context,
fetcher::{community::get_or_fetch_and_upsert_community, person::get_or_fetch_and_upsert_person},
fetcher::object_id::ObjectId,
ActorType,
};
use activitystreams::{
@ -31,9 +31,10 @@ use url::Url;
#[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)]
#[serde(rename_all = "camelCase")]
pub struct FollowCommunity {
actor: Url,
pub(in crate::activities::following) to: Url,
pub(in crate::activities::following) object: Url,
actor: ObjectId<Person>,
// TODO: is there any reason to put the same community id twice, in to and object?
pub(in crate::activities::following) to: ObjectId<Community>,
pub(in crate::activities::following) object: ObjectId<Community>,
#[serde(rename = "type")]
kind: FollowType,
id: Url,
@ -49,9 +50,9 @@ impl FollowCommunity {
community: &Community,
) -> Result<FollowCommunity, LemmyError> {
Ok(FollowCommunity {
actor: actor.actor_id(),
to: community.actor_id(),
object: community.actor_id(),
actor: ObjectId::new(actor.actor_id()),
to: ObjectId::new(community.actor_id()),
object: ObjectId::new(community.actor_id()),
kind: FollowType::Follow,
id: generate_activity_id(FollowType::Follow)?,
context: lemmy_context(),
@ -87,7 +88,7 @@ impl ActivityHandler for FollowCommunity {
request_counter: &mut i32,
) -> Result<(), LemmyError> {
verify_activity(self)?;
verify_urls_match(&self.to, &self.object)?;
verify_urls_match(self.to.inner(), self.object.inner())?;
verify_person(&self.actor, context, request_counter).await?;
Ok(())
}
@ -97,9 +98,8 @@ impl ActivityHandler for FollowCommunity {
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let actor = get_or_fetch_and_upsert_person(&self.actor, context, request_counter).await?;
let community =
get_or_fetch_and_upsert_community(&self.object, context, request_counter).await?;
let actor = self.actor.dereference(context, request_counter).await?;
let community = self.object.dereference(context, request_counter).await?;
let community_follower_form = CommunityFollowerForm {
community_id: community.id,
person_id: actor.id,

@ -7,7 +7,7 @@ use crate::{
},
activity_queue::send_activity_new,
extensions::context::lemmy_context,
fetcher::{community::get_or_fetch_and_upsert_community, person::get_or_fetch_and_upsert_person},
fetcher::object_id::ObjectId,
ActorType,
};
use activitystreams::{
@ -31,8 +31,8 @@ use url::Url;
#[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)]
#[serde(rename_all = "camelCase")]
pub struct UndoFollowCommunity {
actor: Url,
to: Url,
actor: ObjectId<Person>,
to: ObjectId<Community>,
object: FollowCommunity,
#[serde(rename = "type")]
kind: UndoType,
@ -51,8 +51,8 @@ impl UndoFollowCommunity {
) -> Result<(), LemmyError> {
let object = FollowCommunity::new(actor, community)?;
let undo = UndoFollowCommunity {
actor: actor.actor_id(),
to: community.actor_id(),
actor: ObjectId::new(actor.actor_id()),
to: ObjectId::new(community.actor_id()),
object,
kind: UndoType::Undo,
id: generate_activity_id(UndoType::Undo)?,
@ -72,8 +72,8 @@ impl ActivityHandler for UndoFollowCommunity {
request_counter: &mut i32,
) -> Result<(), LemmyError> {
verify_activity(self)?;
verify_urls_match(&self.to, &self.object.object)?;
verify_urls_match(&self.actor, self.object.actor())?;
verify_urls_match(self.to.inner(), self.object.object.inner())?;
verify_urls_match(self.actor(), self.object.actor())?;
verify_person(&self.actor, context, request_counter).await?;
self.object.verify(context, request_counter).await?;
Ok(())
@ -84,8 +84,8 @@ impl ActivityHandler for UndoFollowCommunity {
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let actor = get_or_fetch_and_upsert_person(&self.actor, context, request_counter).await?;
let community = get_or_fetch_and_upsert_community(&self.to, context, request_counter).await?;
let actor = self.actor.dereference(context, request_counter).await?;
let community = self.to.dereference(context, request_counter).await?;
let community_follower_form = CommunityFollowerForm {
community_id: community.id,

@ -1,7 +1,7 @@
use crate::{
check_community_or_site_ban,
check_is_apub_id_valid,
fetcher::{community::get_or_fetch_and_upsert_community, person::get_or_fetch_and_upsert_person},
fetcher::object_id::ObjectId,
generate_moderators_url,
};
use anyhow::anyhow;
@ -39,11 +39,11 @@ pub enum CreateOrUpdateType {
/// Checks that the specified Url actually identifies a Person (by fetching it), and that the person
/// doesn't have a site ban.
async fn verify_person(
person_id: &Url,
person_id: &ObjectId<Person>,
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let person = get_or_fetch_and_upsert_person(person_id, context, request_counter).await?;
let person = person_id.dereference(context, request_counter).await?;
if person.banned {
return Err(anyhow!("Person {} is banned", person_id).into());
}
@ -58,7 +58,8 @@ pub(crate) async fn extract_community(
let mut cc_iter = cc.iter();
loop {
if let Some(cid) = cc_iter.next() {
if let Ok(c) = get_or_fetch_and_upsert_community(cid, context, request_counter).await {
let cid = ObjectId::new(cid.clone());
if let Ok(c) = cid.dereference(context, request_counter).await {
break Ok(c);
}
} else {
@ -70,23 +71,23 @@ pub(crate) async fn extract_community(
/// Fetches the person and community to verify their type, then checks if person is banned from site
/// or community.
pub(crate) async fn verify_person_in_community(
person_id: &Url,
community_id: &Url,
person_id: &ObjectId<Person>,
community_id: &ObjectId<Community>,
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let community = get_or_fetch_and_upsert_community(community_id, context, request_counter).await?;
let person = get_or_fetch_and_upsert_person(person_id, context, request_counter).await?;
let community = community_id.dereference(context, request_counter).await?;
let person = person_id.dereference(context, request_counter).await?;
check_community_or_site_ban(&person, community.id, context.pool()).await
}
/// Simply check that the url actually refers to a valid group.
async fn verify_community(
community_id: &Url,
community_id: &ObjectId<Community>,
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
get_or_fetch_and_upsert_community(community_id, context, request_counter).await?;
community_id.dereference(context, request_counter).await?;
Ok(())
}
@ -100,12 +101,12 @@ fn verify_activity(activity: &dyn ActivityFields) -> Result<(), LemmyError> {
/// because in case of remote communities, admins can also perform mod actions. As admin status
/// is not federated, we cant verify their actions remotely.
pub(crate) async fn verify_mod_action(
actor_id: &Url,
community: Url,
actor_id: &ObjectId<Person>,
community_id: ObjectId<Community>,
context: &LemmyContext,
) -> Result<(), LemmyError> {
let community = blocking(context.pool(), move |conn| {
Community::read_from_apub_id(conn, &community.into())
Community::read_from_apub_id(conn, &community_id.into())
})
.await??;
@ -133,8 +134,11 @@ pub(crate) async fn verify_mod_action(
/// For Add/Remove community moderator activities, check that the target field actually contains
/// /c/community/moderators. Any different values are unsupported.
fn verify_add_remove_moderator_target(target: &Url, community: Url) -> Result<(), LemmyError> {
if target != &generate_moderators_url(&community.into())?.into_inner() {
fn verify_add_remove_moderator_target(
target: &Url,
community: &ObjectId<Community>,
) -> Result<(), LemmyError> {
if target != &generate_moderators_url(&community.clone().into())?.into_inner() {
return Err(anyhow!("Unkown target url").into());
}
Ok(())

@ -1,7 +1,6 @@
use crate::{
activities::{
community::announce::AnnouncableActivities,
extract_community,
generate_activity_id,
verify_activity,
verify_mod_action,
@ -10,7 +9,7 @@ use crate::{
},
activity_queue::send_to_community_new,
extensions::context::lemmy_context,
fetcher::person::get_or_fetch_and_upsert_person,
fetcher::object_id::ObjectId,
objects::{post::Page, FromApub, ToApub},
ActorType,
};
@ -34,10 +33,10 @@ use url::Url;
#[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)]
#[serde(rename_all = "camelCase")]
pub struct CreateOrUpdatePost {
actor: Url,
actor: ObjectId<Person>,
to: [PublicUrl; 1],
object: Page,
cc: [Url; 1],
cc: [ObjectId<Community>; 1],
#[serde(rename = "type")]
kind: CreateOrUpdateType,
id: Url,
@ -62,10 +61,10 @@ impl CreateOrUpdatePost {
let id = generate_activity_id(kind.clone())?;
let create_or_update = CreateOrUpdatePost {
actor: actor.actor_id(),
actor: ObjectId::new(actor.actor_id()),
to: [PublicUrl::Public],
object: post.to_apub(context.pool()).await?,
cc: [community.actor_id()],
cc: [ObjectId::new(community.actor_id())],
kind,
id: id.clone(),
context: lemmy_context(),
@ -85,13 +84,12 @@ impl ActivityHandler for CreateOrUpdatePost {
request_counter: &mut i32,
) -> Result<(), LemmyError> {
verify_activity(self)?;
let community = extract_community(&self.cc, context, request_counter).await?;
let community_id = community.actor_id();
verify_person_in_community(&self.actor, &community_id, context, request_counter).await?;
let community = self.cc[0].dereference(context, request_counter).await?;
verify_person_in_community(&self.actor, &self.cc[0], context, request_counter).await?;
match self.kind {
CreateOrUpdateType::Create => {
verify_domains_match(&self.actor, self.object.id_unchecked())?;
verify_urls_match(&self.actor, &self.object.attributed_to)?;
verify_domains_match(self.actor.inner(), self.object.id_unchecked())?;
verify_urls_match(self.actor(), self.object.attributed_to.inner())?;
// Check that the post isnt locked or stickied, as that isnt possible for newly created posts.
// However, when fetching a remote post we generate a new create activity with the current
// locked/stickied value, so this check may fail. So only check if its a local community,
@ -105,10 +103,10 @@ impl ActivityHandler for CreateOrUpdatePost {
CreateOrUpdateType::Update => {
let is_mod_action = self.object.is_mod_action(context.pool()).await?;
if is_mod_action {
verify_mod_action(&self.actor, community_id, context).await?;
verify_mod_action(&self.actor, self.cc[0].clone(), context).await?;
} else {
verify_domains_match(&self.actor, self.object.id_unchecked())?;
verify_urls_match(&self.actor, &self.object.attributed_to)?;
verify_domains_match(self.actor.inner(), self.object.id_unchecked())?;
verify_urls_match(self.actor(), self.object.attributed_to.inner())?;
}
}
}
@ -121,7 +119,7 @@ impl ActivityHandler for CreateOrUpdatePost {
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let actor = get_or_fetch_and_upsert_person(&self.actor, context, request_counter).await?;
let actor = self.actor.dereference(context, request_counter).await?;
let post = Post::from_apub(&self.object, context, &actor.actor_id(), request_counter).await?;
let notif_type = match self.kind {

@ -2,6 +2,7 @@ use crate::{
activities::{generate_activity_id, verify_activity, verify_person, CreateOrUpdateType},
activity_queue::send_activity_new,
extensions::context::lemmy_context,
fetcher::object_id::ObjectId,
objects::{private_message::Note, FromApub, ToApub},
ActorType,
};
@ -21,9 +22,8 @@ pub struct CreateOrUpdatePrivateMessage {
#[serde(rename = "@context")]
pub context: OneOrMany<AnyBase>,
id: Url,
actor: Url,
to: Url,
cc: [Url; 0],
actor: ObjectId<Person>,
to: ObjectId<Person>,
object: Note,
#[serde(rename = "type")]
kind: CreateOrUpdateType,
@ -46,9 +46,8 @@ impl CreateOrUpdatePrivateMessage {
let create_or_update = CreateOrUpdatePrivateMessage {
context: lemmy_context(),
id: id.clone(),
actor: actor.actor_id(),
to: recipient.actor_id(),
cc: [],
actor: ObjectId::new(actor.actor_id()),
to: ObjectId::new(recipient.actor_id()),
object: private_message.to_apub(context.pool()).await?,
kind,
unparsed: Default::default(),
@ -66,7 +65,7 @@ impl ActivityHandler for CreateOrUpdatePrivateMessage {
) -> Result<(), LemmyError> {
verify_activity(self)?;
verify_person(&self.actor, context, request_counter).await?;
verify_domains_match(&self.actor, self.object.id_unchecked())?;
verify_domains_match(self.actor.inner(), self.object.id_unchecked())?;
self.object.verify(context, request_counter).await?;
Ok(())
}
@ -77,7 +76,7 @@ impl ActivityHandler for CreateOrUpdatePrivateMessage {
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let private_message =
PrivateMessage::from_apub(&self.object, context, &self.actor, request_counter).await?;
PrivateMessage::from_apub(&self.object, context, self.actor.inner(), request_counter).await?;
let notif_type = match self.kind {
CreateOrUpdateType::Create => UserOperationCrud::CreatePrivateMessage,

@ -2,6 +2,7 @@ use crate::{
activities::{generate_activity_id, verify_activity, verify_person},
activity_queue::send_activity_new,
extensions::context::lemmy_context,
fetcher::object_id::ObjectId,
ActorType,
};
use activitystreams::{
@ -22,8 +23,8 @@ use url::Url;
#[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)]
#[serde(rename_all = "camelCase")]
pub struct DeletePrivateMessage {
actor: Url,
to: Url,
actor: ObjectId<Person>,
to: ObjectId<Person>,
pub(in crate::activities::private_message) object: Url,
#[serde(rename = "type")]
kind: DeleteType,
@ -40,8 +41,8 @@ impl DeletePrivateMessage {
pm: &PrivateMessage,
) -> Result<DeletePrivateMessage, LemmyError> {
Ok(DeletePrivateMessage {
actor: actor.actor_id(),
to: actor.actor_id(),
actor: ObjectId::new(actor.actor_id()),
to: ObjectId::new(actor.actor_id()),
object: pm.ap_id.clone().into(),
kind: DeleteType::Delete,
id: generate_activity_id(DeleteType::Delete)?,
@ -74,7 +75,7 @@ impl ActivityHandler for DeletePrivateMessage {
) -> Result<(), LemmyError> {
verify_activity(self)?;
verify_person(&self.actor, context, request_counter).await?;
verify_domains_match(&self.actor, &self.object)?;
verify_domains_match(self.actor.inner(), &self.object)?;
Ok(())
}

@ -7,6 +7,7 @@ use crate::{
},
activity_queue::send_activity_new,
extensions::context::lemmy_context,
fetcher::object_id::ObjectId,
ActorType,
};
use activitystreams::{
@ -27,8 +28,8 @@ use url::Url;
#[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)]
#[serde(rename_all = "camelCase")]
pub struct UndoDeletePrivateMessage {
actor: Url,
to: Url,
actor: ObjectId<Person>,
to: ObjectId<Person>,
object: DeletePrivateMessage,
#[serde(rename = "type")]
kind: UndoType,
@ -52,8 +53,8 @@ impl UndoDeletePrivateMessage {
let object = DeletePrivateMessage::new(actor, pm)?;
let id = generate_activity_id(UndoType::Undo)?;
let undo = UndoDeletePrivateMessage {
actor: actor.actor_id(),
to: recipient.actor_id(),
actor: ObjectId::new(actor.actor_id()),
to: ObjectId::new(recipient.actor_id()),
object,
kind: UndoType::Undo,
id: id.clone(),
@ -74,8 +75,8 @@ impl ActivityHandler for UndoDeletePrivateMessage {
) -> Result<(), LemmyError> {
verify_activity(self)?;
verify_person(&self.actor, context, request_counter).await?;
verify_urls_match(&self.actor, self.object.actor())?;
verify_domains_match(&self.actor, &self.object.object)?;
verify_urls_match(self.actor(), self.object.actor())?;
verify_domains_match(self.actor(), &self.object.object)?;
self.object.verify(context, request_counter).await?;
Ok(())
}

@ -1,7 +1,10 @@
use crate::activities::{
community::remove_mod::RemoveMod,
deletion::{undo_delete::UndoDelete, verify_delete_activity},
verify_activity,
use crate::{
activities::{
community::remove_mod::RemoveMod,
deletion::{undo_delete::UndoDelete, verify_delete_activity},
verify_activity,
},
fetcher::object_id::ObjectId,
};
use activitystreams::{
activity::kind::UndoType,
@ -10,6 +13,7 @@ use activitystreams::{
unparsed::Unparsed,
};
use lemmy_apub_lib::{values::PublicUrl, ActivityFields, ActivityHandler};
use lemmy_db_schema::source::{community::Community, person::Person};
use lemmy_utils::LemmyError;
use lemmy_websocket::LemmyContext;
use serde::{Deserialize, Serialize};
@ -18,11 +22,11 @@ use url::Url;
#[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)]
#[serde(rename_all = "camelCase")]
pub struct UndoRemovePostCommentOrCommunity {
actor: Url,
actor: ObjectId<Person>,
to: [PublicUrl; 1],
// Note, there is no such thing as Undo/Remove/Mod, so we ignore that
object: RemoveMod,
cc: [Url; 1],
cc: [ObjectId<Community>; 1],
#[serde(rename = "type")]
kind: UndoType,
id: Url,
@ -43,7 +47,7 @@ impl ActivityHandler for UndoRemovePostCommentOrCommunity {
self.object.verify(context, request_counter).await?;
verify_delete_activity(
&self.object.object,
self.object.object.inner(),
self,
&self.cc[0],
true,
@ -59,6 +63,6 @@ impl ActivityHandler for UndoRemovePostCommentOrCommunity {
context: &LemmyContext,
_request_counter: &mut i32,
) -> Result<(), LemmyError> {
UndoDelete::receive_undo_remove_action(&self.object.object, context).await
UndoDelete::receive_undo_remove_action(self.object.object.inner(), context).await
}
}

@ -12,10 +12,7 @@ use crate::{
},
activity_queue::send_to_community_new,
extensions::context::lemmy_context,
fetcher::{
objects::get_or_fetch_and_insert_post_or_comment,
person::get_or_fetch_and_upsert_person,
},
fetcher::object_id::ObjectId,
ActorType,
PostOrComment,
};
@ -41,10 +38,10 @@ use url::Url;
#[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)]
#[serde(rename_all = "camelCase")]
pub struct UndoVote {
actor: Url,
actor: ObjectId<Person>,
to: [PublicUrl; 1],
object: Vote,
cc: [Url; 1],
cc: [ObjectId<Community>; 1],
#[serde(rename = "type")]
kind: UndoType,
id: Url,
@ -70,10 +67,10 @@ impl UndoVote {
let object = Vote::new(object, actor, &community, kind.clone())?;
let id = generate_activity_id(UndoType::Undo)?;
let undo_vote = UndoVote {
actor: actor.actor_id(),
actor: ObjectId::new(actor.actor_id()),
to: [PublicUrl::Public],
object,
cc: [community.actor_id()],
cc: [ObjectId::new(community.actor_id())],
kind: UndoType::Undo,
id: id.clone(),
context: lemmy_context(),
@ -93,7 +90,7 @@ impl ActivityHandler for UndoVote {
) -> Result<(), LemmyError> {
verify_activity(self)?;
verify_person_in_community(&self.actor, &self.cc[0], context, request_counter).await?;
verify_urls_match(&self.actor, self.object.actor())?;
verify_urls_match(self.actor(), self.object.actor())?;
self.object.verify(context, request_counter).await?;
Ok(())
}
@ -103,10 +100,12 @@ impl ActivityHandler for UndoVote {
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let actor = get_or_fetch_and_upsert_person(&self.actor, context, request_counter).await?;
let object =
get_or_fetch_and_insert_post_or_comment(&self.object.object, context, request_counter)
.await?;
let actor = self.actor.dereference(context, request_counter).await?;
let object = self
.object
.object
.dereference(context, request_counter)
.await?;
match object {
PostOrComment::Post(p) => undo_vote_post(actor, p.deref(), context).await,
PostOrComment::Comment(c) => undo_vote_comment(actor, c.deref(), context).await,

@ -8,10 +8,7 @@ use crate::{
},
activity_queue::send_to_community_new,
extensions::context::lemmy_context,
fetcher::{
objects::get_or_fetch_and_insert_post_or_comment,
person::get_or_fetch_and_upsert_person,
},
fetcher::object_id::ObjectId,
ActorType,
PostOrComment,
};
@ -61,10 +58,10 @@ impl From<&VoteType> for i16 {
#[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)]
#[serde(rename_all = "camelCase")]
pub struct Vote {
actor: Url,
actor: ObjectId<Person>,
to: [PublicUrl; 1],
pub(in crate::activities::voting) object: Url,
cc: [Url; 1],
pub(in crate::activities::voting) object: ObjectId<PostOrComment>,
cc: [ObjectId<Community>; 1],
#[serde(rename = "type")]
pub(in crate::activities::voting) kind: VoteType,
id: Url,
@ -82,10 +79,10 @@ impl Vote {
kind: VoteType,
) -> Result<Vote, LemmyError> {
Ok(Vote {
actor: actor.actor_id(),
actor: ObjectId::new(actor.actor_id()),
to: [PublicUrl::Public],
object: object.ap_id(),
cc: [community.actor_id()],
object: ObjectId::new(object.ap_id()),
cc: [ObjectId::new(community.actor_id())],
kind: kind.clone(),
id: generate_activity_id(kind)?,
context: lemmy_context(),
@ -129,9 +126,8 @@ impl ActivityHandler for Vote {
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let actor = get_or_fetch_and_upsert_person(&self.actor, context, request_counter).await?;
let object =
get_or_fetch_and_insert_post_or_comment(&self.object, context, request_counter).await?;
let actor = self.actor.dereference(context, request_counter).await?;
let object = self.object.dereference(context, request_counter).await?;
match object {
PostOrComment::Post(p) => vote_post(&self.kind, actor, p.deref(), context).await,
PostOrComment::Comment(c) => vote_comment(&self.kind, actor, c.deref(), context).await,

@ -1,92 +1,23 @@
use crate::{
activities::community::announce::AnnounceActivity,
fetcher::{
fetch::fetch_remote_object,
is_deleted,
person::get_or_fetch_and_upsert_person,
should_refetch_actor,
},
objects::{community::Group, FromApub},
fetcher::{fetch::fetch_remote_object, object_id::ObjectId},
objects::community::Group,
};
use activitystreams::collection::{CollectionExt, OrderedCollection};
use anyhow::Context;
use diesel::result::Error::NotFound;
use lemmy_api_common::blocking;
use lemmy_apub_lib::ActivityHandler;
use lemmy_db_queries::{source::community::Community_, ApubObject, Joinable};
use lemmy_db_schema::source::community::{Community, CommunityModerator, CommunityModeratorForm};
use lemmy_db_queries::Joinable;
use lemmy_db_schema::source::{
community::{Community, CommunityModerator, CommunityModeratorForm},
person::Person,
};
use lemmy_db_views_actor::community_moderator_view::CommunityModeratorView;
use lemmy_utils::{location_info, LemmyError};
use lemmy_websocket::LemmyContext;
use log::debug;
use url::Url;
/// Get a community from its apub ID.
///
/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
/// Otherwise it is fetched from the remote instance, stored and returned.
pub(crate) async fn get_or_fetch_and_upsert_community(
apub_id: &Url,
context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<Community, LemmyError> {
let apub_id_owned = apub_id.to_owned();
let community = blocking(context.pool(), move |conn| {
Community::read_from_apub_id(conn, &apub_id_owned.into())
})
.await?;
match community {
Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
debug!("Fetching and updating from remote community: {}", apub_id);
fetch_remote_community(apub_id, context, Some(c), recursion_counter).await
}
Ok(c) => Ok(c),
Err(NotFound {}) => {
debug!("Fetching and creating remote community: {}", apub_id);
fetch_remote_community(apub_id, context, None, recursion_counter).await
}
Err(e) => Err(e.into()),
}
}
/// Request a community by apub ID from a remote instance, including moderators. If `old_community`,
/// is set, this is an update for a community which is already known locally. If not, we don't know
/// the community yet and also pull the outbox, to get some initial posts.
async fn fetch_remote_community(
apub_id: &Url,
context: &LemmyContext,
old_community: Option<Community>,
request_counter: &mut i32,
) -> Result<Community, LemmyError> {
let group = fetch_remote_object::<Group>(context.client(), apub_id, request_counter).await;
if let Some(c) = old_community.to_owned() {
if is_deleted(&group) {
blocking(context.pool(), move |conn| {
Community::update_deleted(conn, c.id, true)
})
.await??;
} else if group.is_err() {
// If fetching failed, return the existing data.
return Ok(c);
}
}
let group = group?;
let community = Community::from_apub(&group, context, apub_id, request_counter).await?;
update_community_mods(&group, &community, context, request_counter).await?;
// only fetch outbox for new communities, otherwise this can create an infinite loop
if old_community.is_none() {
fetch_community_outbox(context, &group.outbox, request_counter).await?
}
Ok(community)
}
async fn update_community_mods(
pub(crate) async fn update_community_mods(
group: &Group,
community: &Community,
context: &LemmyContext,
@ -113,8 +44,9 @@ async fn update_community_mods(
}
// Add new mods to database which have been added to moderators collection
for mod_uri in new_moderators {
let mod_user = get_or_fetch_and_upsert_person(&mod_uri, context, request_counter).await?;
for mod_id in new_moderators {
let mod_id = ObjectId::new(mod_id);
let mod_user: Person = mod_id.dereference(context, request_counter).await?;
if !current_moderators
.clone()
@ -136,7 +68,7 @@ async fn update_community_mods(
Ok(())
}
async fn fetch_community_outbox(
pub(crate) async fn fetch_community_outbox(
context: &LemmyContext,
outbox: &Url,
recursion_counter: &mut i32,
@ -160,7 +92,7 @@ async fn fetch_community_outbox(
Ok(())
}
pub(crate) async fn fetch_community_mods(
async fn fetch_community_mods(
context: &LemmyContext,
group: &Group,
recursion_counter: &mut i32,

@ -0,0 +1,85 @@
use crate::fetcher::post_or_comment::PostOrComment;
use lemmy_api_common::blocking;
use lemmy_db_queries::source::{
comment::Comment_,
community::Community_,
person::Person_,
post::Post_,
};
use lemmy_db_schema::source::{comment::Comment, community::Community, person::Person, post::Post};
use lemmy_utils::LemmyError;
use lemmy_websocket::LemmyContext;
// TODO: merge this trait with ApubObject (means that db_schema needs to depend on apub_lib)
#[async_trait::async_trait(?Send)]
pub trait DeletableApubObject {
// TODO: pass in tombstone with summary field, to decide between remove/delete
async fn delete(self, context: &LemmyContext) -> Result<(), LemmyError>;
}
#[async_trait::async_trait(?Send)]
impl DeletableApubObject for Community {
async fn delete(self, context: &LemmyContext) -> Result<(), LemmyError> {
let id = self.id;
blocking(context.pool(), move |conn| {
Community::update_deleted(conn, id, true)
})
.await??;
Ok(())
}
}
#[async_trait::async_trait(?Send)]
impl DeletableApubObject for Person {
async fn delete(self, context: &LemmyContext) -> Result<(), LemmyError> {
let id = self.id;
blocking(context.pool(), move |conn| Person::delete_account(conn, id)).await??;
Ok(())
}
}
#[async_trait::async_trait(?Send)]
impl DeletableApubObject for Post {
async fn delete(self, context: &LemmyContext) -> Result<(), LemmyError> {
let id = self.id;
blocking(context.pool(), move |conn| {
Post::update_deleted(conn, id, true)
})
.await??;
Ok(())
}
}
#[async_trait::async_trait(?Send)]
impl DeletableApubObject for Comment {
async fn delete(self, context: &LemmyContext) -> Result<(), LemmyError> {
let id = self.id;
blocking(context.pool(), move |conn| {
Comment::update_deleted(conn, id, true)
})
.await??;
Ok(())
}
}
#[async_trait::async_trait(?Send)]
impl DeletableApubObject for PostOrComment {
async fn delete(self, context: &LemmyContext) -> Result<(), LemmyError> {
match self {
PostOrComment::Comment(c) => {
blocking(context.pool(), move |conn| {
Comment::update_deleted(conn, c.id, true)
})
.await??;
}
PostOrComment::Post(p) => {
blocking(context.pool(), move |conn| {
Post::update_deleted(conn, p.id, true)
})
.await??;
}
}
Ok(())
}
}

@ -2,10 +2,9 @@ use crate::{check_is_apub_id_valid, APUB_JSON_CONTENT_TYPE};
use anyhow::anyhow;
use lemmy_utils::{request::retry, LemmyError};
use log::info;
use reqwest::{Client, StatusCode};
use reqwest::Client;
use serde::Deserialize;
use std::time::Duration;
use thiserror::Error;
use url::Url;
/// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object
@ -15,50 +14,19 @@ use url::Url;
/// So we are looking at a maximum of 22 requests (rounded up just to be safe).
static MAX_REQUEST_NUMBER: i32 = 25;
#[derive(Debug, Error)]
pub(in crate::fetcher) struct FetchError {
pub inner: anyhow::Error,
pub status_code: Option<StatusCode>,
}
impl From<LemmyError> for FetchError {
fn from(t: LemmyError) -> Self {
FetchError {
inner: t.inner,
status_code: None,
}
}
}
impl From<reqwest::Error> for FetchError {
fn from(t: reqwest::Error) -> Self {
let status = t.status();
FetchError {
inner: t.into(),
status_code: status,
}
}
}
impl std::fmt::Display for FetchError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
std::fmt::Display::fmt(&self, f)
}
}
/// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
/// timeouts etc.
pub(in crate::fetcher) async fn fetch_remote_object<Response>(
client: &Client,
url: &Url,
recursion_counter: &mut i32,
) -> Result<Response, FetchError>
) -> Result<Response, LemmyError>
where
Response: for<'de> Deserialize<'de> + std::fmt::Debug,
{
*recursion_counter += 1;
if *recursion_counter > MAX_REQUEST_NUMBER {
return Err(LemmyError::from(anyhow!("Maximum recursion depth reached")).into());
return Err(anyhow!("Maximum recursion depth reached").into());
}
check_is_apub_id_valid(url, false)?;
@ -73,14 +41,6 @@ where
})
.await?;
if res.status() == StatusCode::GONE {
info!("Fetched remote object {} which was deleted", url);
return Err(FetchError {
inner: anyhow!("Fetched remote object {} which was deleted", url),
status_code: Some(res.status()),
});
}
let object = res.json().await?;
info!("Fetched remote object {}", url);
Ok(object)

@ -1,56 +1,42 @@
pub mod community;
pub mod deletable_apub_object;
mod fetch;
pub mod objects;
pub mod person;
pub mod object_id;
pub mod post_or_comment;
pub mod search;
use crate::{
fetcher::{
community::get_or_fetch_and_upsert_community,
fetch::FetchError,
person::get_or_fetch_and_upsert_person,
},
ActorType,
};
use crate::{fetcher::object_id::ObjectId, ActorType};
use chrono::NaiveDateTime;
use http::StatusCode;
use lemmy_db_schema::naive_now;
use lemmy_db_schema::{
naive_now,
source::{community::Community, person::Person},
};
use lemmy_utils::LemmyError;
use lemmy_websocket::LemmyContext;
use serde::Deserialize;
use url::Url;
static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
fn is_deleted<Response>(fetch_response: &Result<Response, FetchError>) -> bool
where
Response: for<'de> Deserialize<'de>,
{
if let Err(e) = fetch_response {
if let Some(status) = e.status_code {
if status == StatusCode::GONE {
return true;
}
}
}
false
}
/// Get a remote actor from its apub ID (either a person or a community). Thin wrapper around
/// `get_or_fetch_and_upsert_person()` and `get_or_fetch_and_upsert_community()`.
///
/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
/// Otherwise it is fetched from the remote instance, stored and returned.
pub(crate) async fn get_or_fetch_and_upsert_actor(
apub_id: &Url,
apub_id: Url,
context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<Box<dyn ActorType>, LemmyError> {
let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await;
let community_id = ObjectId::<Community>::new(apub_id.clone());
let community = community_id.dereference(context, recursion_counter).await;
let actor: Box<dyn ActorType> = match community {
Ok(c) => Box::new(c),
Err(_) => Box::new(get_or_fetch_and_upsert_person(apub_id, context, recursion_counter).await?),
Err(_) => {
let person_id = ObjectId::new(apub_id);
let person: Person = person_id.dereference(context, recursion_counter).await?;
Box::new(person)
}
};
Ok(actor)
}

@ -0,0 +1,157 @@
use crate::{
fetcher::{deletable_apub_object::DeletableApubObject, should_refetch_actor},
objects::FromApub,
APUB_JSON_CONTENT_TYPE,
};
use anyhow::anyhow;
use diesel::NotFound;
use lemmy_api_common::blocking;
use lemmy_db_queries::{ApubObject, DbPool};
use lemmy_db_schema::DbUrl;
use lemmy_utils::{request::retry, settings::structs::Settings, LemmyError};
use lemmy_websocket::LemmyContext;
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
use std::{
fmt::{Debug, Display, Formatter},
marker::PhantomData,
time::Duration,
};
use url::Url;
/// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object
/// fetch through the search). This should be configurable.
static REQUEST_LIMIT: i32 = 25;
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
pub struct ObjectId<Kind>(Url, #[serde(skip)] PhantomData<Kind>)
where
Kind: FromApub + ApubObject + DeletableApubObject + Send + 'static,
for<'de2> <Kind as FromApub>::ApubType: serde::Deserialize<'de2>;
impl<Kind> ObjectId<Kind>
where
Kind: FromApub + ApubObject + DeletableApubObject + Send + 'static,
for<'de> <Kind as FromApub>::ApubType: serde::Deserialize<'de>,
{
pub fn new<T>(url: T) -> Self
where
T: Into<Url>,
{
ObjectId(url.into(), PhantomData::<Kind>)
}
pub fn inner(&self) -> &Url {
&self.0
}
/// Fetches an activitypub object, either from local database (if possible), or over http.
pub(crate) async fn dereference(
&self,
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<Kind, LemmyError> {
let db_object = self.dereference_locally(context.pool()).await?;
// if its a local object, only fetch it from the database and not over http
if self.0.domain() == Some(&Settings::get().get_hostname_without_port()?) {
return match db_object {
None => Err(NotFound {}.into()),
Some(o) => Ok(o),
};
}
if let Some(object) = db_object {
if let Some(last_refreshed_at) = object.last_refreshed_at() {
// TODO: rename to should_refetch_object()
if should_refetch_actor(last_refreshed_at) {
return self
.dereference_remotely(context, request_counter, Some(object))
.await;
}
}
Ok(object)
} else {
self
.dereference_remotely(context, request_counter, None)
.await
}
}
/// returning none means the object was not found in local db
async fn dereference_locally(&self, pool: &DbPool) -> Result<Option<Kind>, LemmyError> {
let id: DbUrl = self.0.clone().into();
let object = blocking(pool, move |conn| ApubObject::read_from_apub_id(conn, &id)).await?;
match object {
Ok(o) => Ok(Some(o)),
Err(NotFound {}) => Ok(None),
Err(e) => Err(e.into()),
}
}
async fn dereference_remotely(
&self,
context: &LemmyContext,
request_counter: &mut i32,
db_object: Option<Kind>,
) -> Result<Kind, LemmyError> {
// dont fetch local objects this way
debug_assert!(self.0.domain() != Some(&Settings::get().hostname));
*request_counter += 1;
if *request_counter > REQUEST_LIMIT {
return Err(LemmyError::from(anyhow!("Request limit reached")));
}
let res = retry(|| {
context
.client()
.get(self.0.as_str())
.header("Accept", APUB_JSON_CONTENT_TYPE)
.timeout(Duration::from_secs(60))
.send()
})
.await?;
if res.status() == StatusCode::GONE {
if let Some(db_object) = db_object {
db_object.delete(context).await?;
}
return Err(anyhow!("Fetched remote object {} which was deleted", self).into());
}
let res2: Kind::ApubType = res.json().await?;
Ok(Kind::from_apub(&res2, context, self.inner(), request_counter).await?)
}
}
impl<Kind> Display for ObjectId<Kind>
where
Kind: FromApub + ApubObject + DeletableApubObject + Send + 'static,
for<'de> <Kind as FromApub>::ApubType: serde::Deserialize<'de>,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.to_string())
}
}
impl<Kind> From<ObjectId<Kind>> for Url
where
Kind: FromApub + ApubObject + DeletableApubObject + Send + 'static,
for<'de> <Kind as FromApub>::ApubType: serde::Deserialize<'de>,
{
fn from(id: ObjectId<Kind>) -> Self {
id.0
}
}
impl<Kind> From<ObjectId<Kind>> for DbUrl
where
Kind: FromApub + ApubObject + DeletableApubObject + Send + 'static,
for<'de> <Kind as FromApub>::ApubType: serde::Deserialize<'de>,
{
fn from(id: ObjectId<Kind>) -> Self {
id.0.into()
}
}

@ -1,97 +0,0 @@
use crate::{
fetcher::fetch::fetch_remote_object,
objects::{comment::Note, post::Page, FromApub},
PostOrComment,
};
use anyhow::anyhow;
use diesel::result::Error::NotFound;
use lemmy_api_common::blocking;
use lemmy_db_queries::{ApubObject, Crud};
use lemmy_db_schema::source::{comment::Comment, post::Post};
use lemmy_utils::LemmyError;
use lemmy_websocket::LemmyContext;
use log::debug;
use url::Url;
/// Gets a post by its apub ID. If it exists locally, it is returned directly. Otherwise it is
/// pulled from its apub ID, inserted and returned.
///
/// The parent community is also pulled if necessary. Comments are not pulled.
pub(crate) async fn get_or_fetch_and_insert_post(
post_ap_id: &Url,
context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<Post, LemmyError> {
let post_ap_id_owned = post_ap_id.to_owned();
let post = blocking(context.pool(), move |conn| {
Post::read_from_apub_id(conn, &post_ap_id_owned.into())
})
.await?;
match post {
Ok(p) => Ok(p),
Err(NotFound {}) => {
debug!("Fetching and creating remote post: {}", post_ap_id);
let page =
fetch_remote_object::<Page>(context.client(), post_ap_id, recursion_counter).await?;
let post = Post::from_apub(&page, context, post_ap_id, recursion_counter).await?;
Ok(post)
}
Err(e) => Err(e.into()),
}
}
/// Gets a comment by its apub ID. If it exists locally, it is returned directly. Otherwise it is
/// pulled from its apub ID, inserted and returned.
///
/// The parent community, post and comment are also pulled if necessary.
pub(crate) async fn get_or_fetch_and_insert_comment(
comment_ap_id: &Url,
context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<Comment, LemmyError> {
let comment_ap_id_owned = comment_ap_id.to_owned();
let comment = blocking(context.pool(), move |conn| {
Comment::read_from_apub_id(conn, &comment_ap_id_owned.into())
})
.await?;
match comment {
Ok(p) => Ok(p),
Err(NotFound {}) => {
debug!(
"Fetching and creating remote comment and its parents: {}",
comment_ap_id
);
let comment =
fetch_remote_object::<Note>(context.client(), comment_ap_id, recursion_counter).await?;
let comment = Comment::from_apub(&comment, context, comment_ap_id, recursion_counter).await?;
let post_id = comment.post_id;
let post = blocking(context.pool(), move |conn| Post::read(conn, post_id)).await??;
if post.locked {
return Err(anyhow!("Post is locked").into());
}
Ok(comment)
}
Err(e) => Err(e.into()),
}
}
pub(crate) async fn get_or_fetch_and_insert_post_or_comment(
ap_id: &Url,
context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<PostOrComment, LemmyError> {
Ok(
match get_or_fetch_and_insert_post(ap_id, context, recursion_counter).await {
Ok(p) => PostOrComment::Post(Box::new(p)),
Err(_) => {
let c = get_or_fetch_and_insert_comment(ap_id, context, recursion_counter).await?;
PostOrComment::Comment(Box::new(c))
}
},
)
}

@ -1,70 +0,0 @@
use crate::{
fetcher::{fetch::fetch_remote_object, is_deleted, should_refetch_actor},
objects::{person::Person as ApubPerson, FromApub},
};
use anyhow::anyhow;
use diesel::result::Error::NotFound;
use lemmy_api_common::blocking;
use lemmy_db_queries::{source::person::Person_, ApubObject};
use lemmy_db_schema::source::person::Person;
use lemmy_utils::LemmyError;
use lemmy_websocket::LemmyContext;
use log::debug;
use url::Url;
/// Get a person from its apub ID.
///
/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
/// Otherwise it is fetched from the remote instance, stored and returned.
pub(crate) async fn get_or_fetch_and_upsert_person(
apub_id: &Url,
context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<Person, LemmyError> {
let apub_id_owned = apub_id.to_owned();
let person = blocking(context.pool(), move |conn| {
Person::read_from_apub_id(conn, &apub_id_owned.into())
})
.await?;
match person {
// If its older than a day, re-fetch it
Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => {
debug!("Fetching and updating from remote person: {}", apub_id);
let person =
fetch_remote_object::<ApubPerson>(context.client(), apub_id, recursion_counter).await;
if is_deleted(&person) {
// TODO: use Person::update_deleted() once implemented
blocking(context.pool(), move |conn| {
Person::delete_account(conn, u.id)
})
.await??;
return Err(anyhow!("Person was deleted by remote instance").into());
} else if person.is_err() {
return Ok(u);
}
let person = Person::from_apub(&person?, context, apub_id, recursion_counter).await?;
let person_id = person.id;
blocking(context.pool(), move |conn| {
Person::mark_as_updated(conn, person_id)
})
.await??;
Ok(person)
}
Ok(u) => Ok(u),
Err(NotFound {}) => {
debug!("Fetching and creating remote person: {}", apub_id);
let person =
fetch_remote_object::<ApubPerson>(context.client(), apub_id, recursion_counter).await?;
let person = Person::from_apub(&person, context, apub_id, recursion_counter).await?;
Ok(person)
}
Err(e) => Err(e.into()),
}
}

@ -0,0 +1,85 @@
use crate::objects::{comment::Note, post::Page, FromApub};
use activitystreams::chrono::NaiveDateTime;
use diesel::{result::Error, PgConnection};
use lemmy_db_queries::ApubObject;
use lemmy_db_schema::{
source::{
comment::{Comment, CommentForm},
post::{Post, PostForm},
},
DbUrl,
};
use lemmy_utils::LemmyError;
use lemmy_websocket::LemmyContext;
use serde::Deserialize;
use url::Url;
#[derive(Clone, Debug)]
pub enum PostOrComment {
Comment(Box<Comment>),
Post(Box<Post>),
}
pub enum PostOrCommentForm {
PostForm(PostForm),
CommentForm(CommentForm),
}
#[derive(Deserialize)]
pub enum PageOrNote {
Page(Page),
Note(Note),
}
#[async_trait::async_trait(?Send)]
impl ApubObject for PostOrComment {
fn last_refreshed_at(&self) -> Option<NaiveDateTime> {
None
}
// TODO: this can probably be implemented using a single sql query
fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result<Self, Error>
where
Self: Sized,
{
let post = Post::read_from_apub_id(conn, object_id);
Ok(match post {
Ok(o) => PostOrComment::Post(Box::new(o)),
Err(_) => PostOrComment::Comment(Box::new(Comment::read_from_apub_id(conn, object_id)?)),
})
}
}
#[async_trait::async_trait(?Send)]
impl FromApub for PostOrComment {
type ApubType = PageOrNote;
async fn from_apub(
apub: &PageOrNote,
context: &LemmyContext,
expected_domain: &Url,
request_counter: &mut i32,
) -> Result<Self, LemmyError>
where
Self: Sized,
{
Ok(match apub {
PageOrNote::Page(p) => PostOrComment::Post(Box::new(
Post::from_apub(p, context, expected_domain, request_counter).await?,
)),
PageOrNote::Note(n) => PostOrComment::Comment(Box::new(
Comment::from_apub(n, context, expected_domain, request_counter).await?,
)),
})
}
}
impl PostOrComment {
pub(crate) fn ap_id(&self) -> Url {
match self {
PostOrComment::Post(p) => p.ap_id.clone(),
PostOrComment::Comment(c) => c.ap_id.clone(),
}
.into()
}
}

@ -1,52 +1,27 @@
use crate::{
fetcher::{
community::get_or_fetch_and_upsert_community,
fetch::fetch_remote_object,
is_deleted,
person::get_or_fetch_and_upsert_person,
},
find_object_by_id,
fetcher::{deletable_apub_object::DeletableApubObject, object_id::ObjectId},
objects::{comment::Note, community::Group, person::Person as ApubPerson, post::Page, FromApub},
Object,
};
use activitystreams::chrono::NaiveDateTime;
use anyhow::anyhow;
use diesel::{result::Error, PgConnection};
use itertools::Itertools;
use lemmy_api_common::{blocking, site::ResolveObjectResponse};
use lemmy_api_common::blocking;
use lemmy_apub_lib::webfinger::{webfinger_resolve_actor, WebfingerType};
use lemmy_db_queries::source::{
comment::Comment_,
community::Community_,
person::Person_,
post::Post_,
private_message::PrivateMessage_,
use lemmy_db_queries::{
source::{community::Community_, person::Person_},
ApubObject,
DbPool,
};
use lemmy_db_schema::source::{
comment::Comment,
community::Community,
person::Person,
post::Post,
private_message::PrivateMessage,
use lemmy_db_schema::{
source::{comment::Comment, community::Community, person::Person, post::Post},
DbUrl,
};
use lemmy_db_views::{
comment_view::CommentView,
local_user_view::LocalUserView,
post_view::PostView,
};
use lemmy_db_views_actor::{community_view::CommunityView, person_view::PersonViewSafe};
use lemmy_utils::LemmyError;
use lemmy_websocket::LemmyContext;
use serde::Deserialize;
use url::Url;
/// The types of ActivityPub objects that can be fetched directly by searching for their ID.
#[derive(serde::Deserialize, Debug)]
#[serde(untagged)]
enum SearchAcceptedObjects {
Person(Box<ApubPerson>),
Group(Box<Group>),
Page(Box<Page>),
Comment(Box<Note>),
}
/// Attempt to parse the query as URL, and fetch an ActivityPub object from it.
///
/// Some working examples for use with the `docker/federation/` setup:
@ -56,9 +31,8 @@ enum SearchAcceptedObjects {
/// http://lemmy_delta:8571/comment/2
pub async fn search_by_apub_id(
query: &str,
local_user_view: Option<LocalUserView>,
context: &LemmyContext,
) -> Result<ResolveObjectResponse, LemmyError> {
) -> Result<SearchableObjects, LemmyError> {
let query_url = match Url::parse(query) {
Ok(u) => u,
Err(_) => {
@ -75,142 +49,113 @@ pub async fn search_by_apub_id(
}
// local actor, read from database and return
else {
let name: String = name.into();
return match kind {
WebfingerType::Group => {
let res = blocking(context.pool(), move |conn| {
let community = Community::read_from_name(conn, &name)?;
CommunityView::read(conn, community.id, local_user_view.map(|l| l.person.id))
})
.await??;
Ok(ResolveObjectResponse {
community: Some(res),
..ResolveObjectResponse::default()
})
}
WebfingerType::Person => {
let res = blocking(context.pool(), move |conn| {
let person = Person::find_by_name(conn, &name)?;
PersonViewSafe::read(conn, person.id)
})
.await??;
Ok(ResolveObjectResponse {
person: Some(res),
..ResolveObjectResponse::default()
})
}
};
return find_local_actor_by_name(name, kind, context.pool()).await;
}
}
};
let request_counter = &mut 0;
// this does a fetch (even for local objects), just to determine its type and fetch it again
// below. we need to fix this when rewriting the fetcher.
let fetch_response =
fetch_remote_object::<SearchAcceptedObjects>(context.client(), &query_url, request_counter)
.await;
if is_deleted(&fetch_response) {
delete_object_locally(&query_url, context).await?;
return Err(anyhow!("Object was deleted").into());
}
ObjectId::new(query_url)
.dereference(context, request_counter)
.await
}
// Necessary because we get a stack overflow using FetchError
let fet_res = fetch_response.map_err(|e| LemmyError::from(e.inner))?;
build_response(fet_res, query_url, request_counter, context).await
async fn find_local_actor_by_name(
name: &str,
kind: WebfingerType,
pool: &DbPool,
) -> Result<SearchableObjects, LemmyError> {
let name: String = name.into();
Ok(match kind {
WebfingerType::Group => SearchableObjects::Community(
blocking(pool, move |conn| Community::read_from_name(conn, &name)).await??,
),
WebfingerType::Person => SearchableObjects::Person(
blocking(pool, move |conn| Person::find_by_name(conn, &name)).await??,
),
})
}
async fn build_response(
fetch_response: SearchAcceptedObjects,
query_url: Url,
recursion_counter: &mut i32,
context: &LemmyContext,
) -> Result<ResolveObjectResponse, LemmyError> {
use ResolveObjectResponse as ROR;
Ok(match fetch_response {
SearchAcceptedObjects::Person(p) => {
let person_uri = p.id(&query_url)?;
/// The types of ActivityPub objects that can be fetched directly by searching for their ID.
#[derive(Debug)]
pub enum SearchableObjects {
Person(Person),
Community(Community),
Post(Post),
Comment(Comment),
}
let person = get_or_fetch_and_upsert_person(person_uri, context, recursion_counter).await?;
ROR {
person: blocking(context.pool(), move |conn| {
PersonViewSafe::read(conn, person.id)
})
.await?
.ok(),
..ROR::default()
}
}
SearchAcceptedObjects::Group(g) => {
let community_uri = g.id(&query_url)?;
let community =
get_or_fetch_and_upsert_community(community_uri, context, recursion_counter).await?;
ROR {
community: blocking(context.pool(), move |conn| {
CommunityView::read(conn, community.id, None)
})
.await?
.ok(),
..ROR::default()
}
}
SearchAcceptedObjects::Page(p) => {
let p = Post::from_apub(&p, context, &query_url, recursion_counter).await?;
ROR {
post: blocking(context.pool(), move |conn| PostView::read(conn, p.id, None))
.await?
.ok(),
..ROR::default()
}
}
SearchAcceptedObjects::Comment(c) => {
let c = Comment::from_apub(&c, context, &query_url, recursion_counter).await?;
ROR {
comment: blocking(context.pool(), move |conn| {
CommentView::read(conn, c.id, None)
})
.await?
.ok(),
..ROR::default()
}
}
})
#[derive(Deserialize)]
#[serde(untagged)]
pub enum SearchableApubTypes {
Group(Group),
Person(ApubPerson),
Page(Page),
Note(Note),
}
async fn delete_object_locally(query_url: &Url, context: &LemmyContext) -> Result<(), LemmyError> {
let res = find_object_by_id(context, query_url.to_owned()).await?;
match res {
Object::Comment(c) => {
blocking(context.pool(), move |conn| {
Comment::update_deleted(conn, c.id, true)
})
.await??;
impl ApubObject for SearchableObjects {
fn last_refreshed_at(&self) -> Option<NaiveDateTime> {
match self {
SearchableObjects::Person(p) => p.last_refreshed_at(),
SearchableObjects::Community(c) => c.last_refreshed_at(),
SearchableObjects::Post(p) => p.last_refreshed_at(),
SearchableObjects::Comment(c) => c.last_refreshed_at(),
}
Object::Post(p) => {
blocking(context.pool(), move |conn| {
Post::update_deleted(conn, p.id, true)
})
.await??;
}
// TODO: this is inefficient, because if the object is not in local db, it will run 4 db queries
// before finally returning an error. it would be nice if we could check all 4 tables in
// a single query.
// we could skip this and always return an error, but then it would not be able to mark
// objects as deleted that were deleted by remote server.
fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result<Self, Error> {
let c = Community::read_from_apub_id(conn, object_id);
if let Ok(c) = c {
return Ok(SearchableObjects::Community(c));
}
Object::Person(u) => {
// TODO: implement update_deleted() for user, move it to ApubObject trait
blocking(context.pool(), move |conn| {
Person::delete_account(conn, u.id)
})
.await??;
let p = Person::read_from_apub_id(conn, object_id);
if let Ok(p) = p {
return Ok(SearchableObjects::Person(p));
}
Object::Community(c) => {
blocking(context.pool(), move |conn| {
Community::update_deleted(conn, c.id, true)
})
.await??;
let p = Post::read_from_apub_id(conn, object_id);
if let Ok(p) = p {
return Ok(SearchableObjects::Post(p));
}
Object::PrivateMessage(pm) => {
blocking(context.pool(), move |conn| {
PrivateMessage::update_deleted(conn, pm.id, true)
})
.await??;
let c = Comment::read_from_apub_id(conn, object_id);
Ok(SearchableObjects::Comment(c?))
}
}
#[async_trait::async_trait(?Send)]
impl FromApub for SearchableObjects {
type ApubType = SearchableApubTypes;
async fn from_apub(
apub: &Self::ApubType,
context: &LemmyContext,
ed: &Url,
rc: &mut i32,
) -> Result<Self, LemmyError> {
use SearchableApubTypes as SAT;
use SearchableObjects as SO;
Ok(match apub {
SAT::Group(g) => SO::Community(Community::from_apub(g, context, ed, rc).await?),
SAT::Person(p) => SO::Person(Person::from_apub(p, context, ed, rc).await?),
SAT::Page(p) => SO::Post(Post::from_apub(p, context, ed, rc).await?),
SAT::Note(n) => SO::Comment(Comment::from_apub(n, context, ed, rc).await?),
})
}
}
#[async_trait::async_trait(?Send)]
impl DeletableApubObject for SearchableObjects {
async fn delete(self, context: &LemmyContext) -> Result<(), LemmyError> {
match self {
SearchableObjects::Person(p) => p.delete(context).await,
SearchableObjects::Community(c) => c.delete(context).await,
SearchableObjects::Post(p) => p.delete(context).await,
SearchableObjects::Comment(c) => c.delete(context).await,
}
}
Ok(())
}

@ -90,7 +90,8 @@ where
+ 'static,
{
let request_counter = &mut 0;
let actor = get_or_fetch_and_upsert_actor(activity.actor(), context, request_counter).await?;
let actor =
get_or_fetch_and_upsert_actor(activity.actor().clone(), context, request_counter).await?;
verify_signature(&request, &actor.public_key().context(location_info!())?)?;
// Do nothing if we received the same activity before

@ -9,26 +9,17 @@ pub mod http;
pub mod migrations;
pub mod objects;
use crate::extensions::signatures::PublicKey;
use crate::{extensions::signatures::PublicKey, fetcher::post_or_comment::PostOrComment};
use anyhow::{anyhow, Context};
use diesel::NotFound;
use lemmy_api_common::blocking;
use lemmy_db_queries::{source::activity::Activity_, ApubObject, DbPool};
use lemmy_db_queries::{source::activity::Activity_, DbPool};
use lemmy_db_schema::{
source::{
activity::Activity,
comment::Comment,
community::Community,
person::{Person as DbPerson, Person},
post::Post,
private_message::PrivateMessage,
},
source::{activity::Activity, person::Person},
CommunityId,
DbUrl,
};
use lemmy_db_views_actor::community_person_ban_view::CommunityPersonBanView;
use lemmy_utils::{location_info, settings::structs::Settings, LemmyError};
use lemmy_websocket::LemmyContext;
use serde::Serialize;
use std::net::IpAddr;
use url::{ParseError, Url};
@ -244,96 +235,6 @@ where
Ok(())
}
pub enum PostOrComment {
Comment(Box<Comment>),
Post(Box<Post>),
}
impl PostOrComment {
pub(crate) fn ap_id(&self) -> Url {
match self {
PostOrComment::Post(p) => p.ap_id.clone(),
PostOrComment::Comment(c) => c.ap_id.clone(),
}
.into()
}
}
/// Tries to find a post or comment in the local database, without any network requests.
/// This is used to handle deletions and removals, because in case we dont have the object, we can
/// simply ignore the activity.
pub(crate) async fn find_post_or_comment_by_id(
context: &LemmyContext,
apub_id: Url,
) -> Result<PostOrComment, LemmyError> {
let ap_id = apub_id.clone();
let post = blocking(context.pool(), move |conn| {
Post::read_from_apub_id(conn, &ap_id.into())
})
.await?;
if let Ok(p) = post {
return Ok(PostOrComment::Post(Box::new(p)));
}
let ap_id = apub_id.clone();
let comment = blocking(context.pool(), move |conn| {
Comment::read_from_apub_id(conn, &ap_id.into())
})
.await?;
if let Ok(c) = comment {
return Ok(PostOrComment::Comment(Box::new(c)));
}
Err(NotFound.into())
}
#[derive(Debug)]
enum Object {
Comment(Box<Comment>),
Post(Box<Post>),
Community(Box<Community>),
Person(Box<DbPerson>),
PrivateMessage(Box<PrivateMessage>),
}
async fn find_object_by_id(context: &LemmyContext, apub_id: Url) -> Result<Object, LemmyError> {
let ap_id = apub_id.clone();
if let Ok(pc) = find_post_or_comment_by_id(context, ap_id.to_owned()).await {
return Ok(match pc {
PostOrComment::Post(p) => Object::Post(Box::new(*p)),
PostOrComment::Comment(c) => Object::Comment(Box::new(*c)),
});
}
let ap_id = apub_id.clone();
let person = blocking(context.pool(), move |conn| {
DbPerson::read_from_apub_id(conn, &ap_id.into())
})
.await?;
if let Ok(u) = person {
return Ok(Object::Person(Box::new(u)));
}
let ap_id = apub_id.clone();
let community = blocking(context.pool(), move |conn| {
Community::read_from_apub_id(conn, &ap_id.into())
})
.await?;
if let Ok(c) = community {
return Ok(Object::Community(Box::new(c)));
}
let private_message = blocking(context.pool(), move |conn| {
PrivateMessage::read_from_apub_id(conn, &apub_id.into())
})
.await?;
if let Ok(pm) = private_message {
return Ok(Object::PrivateMessage(Box::new(pm)));
}
Err(NotFound.into())
}
async fn check_community_or_site_ban(
person: &Person,
community_id: CommunityId,

@ -1,3 +1,4 @@
use crate::fetcher::{object_id::ObjectId, post_or_comment::PostOrComment};
use serde::{Deserialize, Serialize};
use url::Url;
@ -13,7 +14,7 @@ use url::Url;
#[serde(untagged)]
pub enum CommentInReplyToMigration {
Old(Vec<Url>),
New(Url),
New(ObjectId<PostOrComment>),
}
// Another migration we are doing is to handle all deletions and removals using Delete activity.

@ -1,13 +1,9 @@
use crate::{
activities::verify_person_in_community,
extensions::context::lemmy_context,
fetcher::objects::{
get_or_fetch_and_insert_comment,
get_or_fetch_and_insert_post,
get_or_fetch_and_insert_post_or_comment,
},
fetcher::object_id::ObjectId,
migrations::CommentInReplyToMigration,
objects::{create_tombstone, get_or_fetch_and_upsert_person, FromApub, Source, ToApub},
objects::{create_tombstone, FromApub, Source, ToApub},
ActorType,
PostOrComment,
};
@ -24,7 +20,7 @@ use lemmy_apub_lib::{
values::{MediaTypeHtml, MediaTypeMarkdown, PublicUrl},
verify_domains_match,
};
use lemmy_db_queries::{ApubObject, Crud, DbPool};
use lemmy_db_queries::{source::comment::Comment_, Crud, DbPool};
use lemmy_db_schema::{
source::{
comment::{Comment, CommentForm},
@ -53,7 +49,7 @@ pub struct Note {
context: OneOrMany<AnyBase>,
r#type: NoteType,
id: Url,
pub(crate) attributed_to: Url,
pub(crate) attributed_to: ObjectId<Person>,
/// Indicates that the object is publicly readable. Unlike [`Post.to`], this one doesn't contain
/// the community ID, as it would be incompatible with Pleroma (and we can get the community from
/// the post in [`in_reply_to`]).
@ -86,23 +82,15 @@ impl Note {
CommentInReplyToMigration::Old(in_reply_to) => {
// This post, or the parent comment might not yet exist on this server yet, fetch them.
let post_id = in_reply_to.get(0).context(location_info!())?;
let post = Box::pin(get_or_fetch_and_insert_post(
post_id,
context,
request_counter,
))
.await?;
let post_id = ObjectId::new(post_id.clone());
let post = Box::pin(post_id.dereference(context, request_counter)).await?;
// The 2nd item, if it exists, is the parent comment apub_id
// Nested comments will automatically get fetched recursively
let parent_id: Option<CommentId> = match in_reply_to.get(1) {
Some(parent_comment_uri) => {
let parent_comment = Box::pin(get_or_fetch_and_insert_comment(
parent_comment_uri,
context,
request_counter,
))
.await?;
Some(comment_id) => {
let comment_id = ObjectId::<Comment>::new(comment_id.clone());
let parent_comment = Box::pin(comment_id.dereference(context, request_counter)).await?;
Some(parent_comment.id)
}
@ -112,9 +100,7 @@ impl Note {
Ok((post, parent_id))
}
CommentInReplyToMigration::New(in_reply_to) => {
let parent = Box::pin(
get_or_fetch_and_insert_post_or_comment(in_reply_to, context, request_counter).await?,
);
let parent = Box::pin(in_reply_to.dereference(context, request_counter).await?);
match parent.deref() {
PostOrComment::Post(p) => {
// Workaround because I cant figure ut how to get the post out of the box (and we dont
@ -148,10 +134,10 @@ impl Note {
if post.locked {
return Err(anyhow!("Post is locked").into());
}
verify_domains_match(&self.attributed_to, &self.id)?;
verify_domains_match(self.attributed_to.inner(), &self.id)?;
verify_person_in_community(
&self.attributed_to,
&community.actor_id(),
&ObjectId::new(community.actor_id()),
context,
request_counter,
)
@ -185,7 +171,7 @@ impl ToApub for Comment {
context: lemmy_context(),
r#type: NoteType::Note,
id: self.ap_id.to_owned().into_inner(),
attributed_to: creator.actor_id.into_inner(),
attributed_to: ObjectId::new(creator.actor_id),
to: PublicUrl::Public,
content: self.content.clone(),
media_type: MediaTypeHtml::Html,
@ -226,9 +212,14 @@ impl FromApub for Comment {
request_counter: &mut i32,
) -> Result<Comment, LemmyError> {
let ap_id = Some(note.id(expected_domain)?.clone().into());
let creator =
get_or_fetch_and_upsert_person(&note.attributed_to, context, request_counter).await?;
let creator = note
.attributed_to
.dereference(context, request_counter)
.await?;
let (post, parent_comment_id) = note.get_parents(context, request_counter).await?;
if post.locked {
return Err(anyhow!("Post is locked").into());
}
let content = &note.source.content;
let content_slurs_removed = remove_slurs(content);

@ -1,6 +1,6 @@
use crate::{
extensions::{context::lemmy_context, signatures::PublicKey},
fetcher::community::fetch_community_mods,
fetcher::community::{fetch_community_outbox, update_community_mods},
generate_moderators_url,
objects::{create_tombstone, FromApub, ImageObject, Source, ToApub},
ActorType,
@ -18,7 +18,7 @@ use lemmy_apub_lib::{
values::{MediaTypeHtml, MediaTypeMarkdown},
verify_domains_match,
};
use lemmy_db_queries::{ApubObject, DbPool};
use lemmy_db_queries::{source::community::Community_, DbPool};
use lemmy_db_schema::{
naive_now,
source::community::{Community, CommunityForm},
@ -175,10 +175,14 @@ impl FromApub for Community {
expected_domain: &Url,
request_counter: &mut i32,
) -> Result<Community, LemmyError> {
fetch_community_mods(context, group, request_counter).await?;
let form = Group::from_apub_to_form(group, expected_domain).await?;
let community = blocking(context.pool(), move |conn| Community::upsert(conn, &form)).await??;
update_community_mods(group, &community, context, request_counter).await?;
// TODO: doing this unconditionally might cause infinite loop for some reason
fetch_community_outbox(context, &group.outbox, request_counter).await?;
Ok(community)
}
}

@ -1,4 +1,3 @@
use crate::fetcher::person::get_or_fetch_and_upsert_person;
use activitystreams::{
base::BaseExt,
object::{kind::ImageType, Tombstone, TombstoneExt},
@ -26,7 +25,7 @@ pub(crate) trait ToApub {
}
#[async_trait::async_trait(?Send)]
pub(crate) trait FromApub {
pub trait FromApub {
type ApubType;
/// Converts an object from ActivityPub type to Lemmy internal type.
///

@ -17,7 +17,7 @@ use lemmy_apub_lib::{
values::{MediaTypeHtml, MediaTypeMarkdown},
verify_domains_match,
};
use lemmy_db_queries::{ApubObject, DbPool};
use lemmy_db_queries::{source::person::Person_, DbPool};
use lemmy_db_schema::{
naive_now,
source::person::{Person as DbPerson, PersonForm},

@ -1,7 +1,7 @@
use crate::{
activities::{extract_community, verify_person_in_community},
extensions::context::lemmy_context,
fetcher::person::get_or_fetch_and_upsert_person,
fetcher::object_id::ObjectId,
objects::{create_tombstone, FromApub, ImageObject, Source, ToApub},
ActorType,
};
@ -21,7 +21,7 @@ use lemmy_apub_lib::{
values::{MediaTypeHtml, MediaTypeMarkdown},
verify_domains_match,
};
use lemmy_db_queries::{ApubObject, Crud, DbPool};
use lemmy_db_queries::{source::post::Post_, ApubObject, Crud, DbPool};
use lemmy_db_schema::{
self,
source::{
@ -48,7 +48,7 @@ pub struct Page {
context: OneOrMany<AnyBase>,
r#type: PageType,
id: Url,
pub(crate) attributed_to: Url,
pub(crate) attributed_to: ObjectId<Person>,
to: [Url; 2],
name: String,
content: Option<String>,
@ -101,10 +101,10 @@ impl Page {
let community = extract_community(&self.to, context, request_counter).await?;
check_slurs(&self.name)?;
verify_domains_match(&self.attributed_to, &self.id)?;
verify_domains_match(self.attributed_to.inner(), &self.id)?;
verify_person_in_community(
&self.attributed_to,
&community.actor_id(),
&ObjectId::new(community.actor_id()),
context,
request_counter,
)
@ -137,7 +137,7 @@ impl ToApub for Post {
context: lemmy_context(),
r#type: PageType::Page,
id: self.ap_id.clone().into(),
attributed_to: creator.actor_id.into(),
attributed_to: ObjectId::new(creator.actor_id),
to: [community.actor_id.into(), public()],
name: self.name.clone(),
content: self.body.as_ref().map(|b| markdown_to_html(b)),
@ -183,8 +183,10 @@ impl FromApub for Post {
page.id(expected_domain)?
};
let ap_id = Some(ap_id.clone().into());
let creator =
get_or_fetch_and_upsert_person(&page.attributed_to, context, request_counter).await?;
let creator = page
.attributed_to
.dereference(context, request_counter)
.await?;
let community = extract_community(&page.to, context, request_counter).await?;
let thumbnail_url: Option<Url> = page.image.clone().map(|i| i.url);

@ -1,6 +1,6 @@
use crate::{
extensions::context::lemmy_context,
fetcher::person::get_or_fetch_and_upsert_person,
fetcher::object_id::ObjectId,
objects::{create_tombstone, FromApub, Source, ToApub},
};
use activitystreams::{
@ -16,7 +16,7 @@ use lemmy_apub_lib::{
values::{MediaTypeHtml, MediaTypeMarkdown},
verify_domains_match,
};
use lemmy_db_queries::{ApubObject, Crud, DbPool};
use lemmy_db_queries::{source::private_message::PrivateMessage_, Crud, DbPool};
use lemmy_db_schema::source::{
person::Person,
private_message::{PrivateMessage, PrivateMessageForm},
@ -35,8 +35,8 @@ pub struct Note {
context: OneOrMany<AnyBase>,
r#type: NoteType,
id: Url,
pub(crate) attributed_to: Url,
to: Url,
pub(crate) attributed_to: ObjectId<Person>,
to: ObjectId<Person>,
content: String,
media_type: MediaTypeHtml,
source: Source,
@ -60,9 +60,11 @@ impl Note {
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
verify_domains_match(&self.attributed_to, &self.id)?;
let person =
get_or_fetch_and_upsert_person(&self.attributed_to, context, request_counter).await?;
verify_domains_match(self.attributed_to.inner(), &self.id)?;
let person = self
.attributed_to
.dereference(context, request_counter)
.await?;
if person.banned {
return Err(anyhow!("Person is banned from site").into());
}
@ -85,8 +87,8 @@ impl ToApub for PrivateMessage {
context: lemmy_context(),
r#type: NoteType::Note,
id: self.ap_id.clone().into(),
attributed_to: creator.actor_id.into_inner(),
to: recipient.actor_id.into(),
attributed_to: ObjectId::new(creator.actor_id),
to: ObjectId::new(recipient.actor_id),
content: self.content.clone(),
media_type: MediaTypeHtml::Html,
source: Source {
@ -121,9 +123,11 @@ impl FromApub for PrivateMessage {
request_counter: &mut i32,
) -> Result<PrivateMessage, LemmyError> {
let ap_id = Some(note.id(expected_domain)?.clone().into());
let creator =
get_or_fetch_and_upsert_person(&note.attributed_to, context, request_counter).await?;
let recipient = get_or_fetch_and_upsert_person(&note.to, context, request_counter).await?;
let creator = note
.attributed_to
.dereference(context, request_counter)
.await?;
let recipient = note.to.dereference(context, request_counter).await?;
let form = PrivateMessageForm {
creator_id: creator.id,

@ -145,14 +145,14 @@ pub fn derive_activity_fields(input: proc_macro::TokenStream) -> proc_macro::Tok
unimplemented!()
};
let cc_impl = if has_cc {
quote! {self.cc.clone().into()}
quote! {self.cc.iter().map(|i| i.clone().into()).collect()}
} else {
quote! {vec![]}
};
quote! {
impl #impl_generics lemmy_apub_lib::ActivityFields for #name #ty_generics #where_clause {
fn id_unchecked(&self) -> &url::Url { &self.id }
fn actor(&self) -> &url::Url { &self.actor }
fn actor(&self) -> &url::Url { &self.actor.inner() }
fn cc(&self) -> Vec<url::Url> { #cc_impl }
}
}

@ -12,6 +12,7 @@ extern crate diesel_migrations;
#[cfg(test)]
extern crate serial_test;
use chrono::NaiveDateTime;
use diesel::{result::Error, *};
use lemmy_db_schema::{CommunityId, DbUrl, PersonId};
use lemmy_utils::ApiError;
@ -145,14 +146,14 @@ pub trait DeleteableOrRemoveable {
fn blank_out_deleted_or_removed_info(self) -> Self;
}
// TODO: move this to apub lib
pub trait ApubObject {
type Form;
/// If this object should be refetched after a certain interval, it should return the last refresh
/// time here. This is mainly used to update remote actors.
fn last_refreshed_at(&self) -> Option<NaiveDateTime>;
fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result<Self, Error>
where
Self: Sized;
fn upsert(conn: &PgConnection, user_form: &Self::Form) -> Result<Self, Error>
where
Self: Sized;
}
pub trait MaybeOptional<T> {

@ -1,7 +1,6 @@
use crate::Crud;
use diesel::{dsl::*, result::Error, sql_types::Text, *};
use lemmy_db_schema::{source::activity::*, DbUrl};
use log::debug;
use serde::Serialize;
use serde_json::Value;
use std::{
@ -72,7 +71,6 @@ impl Activity_ for Activity {
where
T: Serialize + Debug,
{
debug!("{}", serde_json::to_string_pretty(&data)?);
let activity_form = ActivityForm {
ap_id,
data: serde_json::to_value(&data)?,

@ -1,4 +1,5 @@
use crate::{ApubObject, Crud, DeleteableOrRemoveable, Likeable, Saveable};
use chrono::NaiveDateTime;
use diesel::{dsl::*, result::Error, *};
use lemmy_db_schema::{
naive_now,
@ -50,6 +51,7 @@ pub trait Comment_ {
comment_id: CommentId,
new_content: &str,
) -> Result<Comment, Error>;
fn upsert(conn: &PgConnection, comment_form: &CommentForm) -> Result<Comment, Error>;
}
impl Comment_ for Comment {
@ -133,6 +135,16 @@ impl Comment_ for Comment {
.set((content.eq(new_content), updated.eq(naive_now())))
.get_result::<Self>(conn)
}
fn upsert(conn: &PgConnection, comment_form: &CommentForm) -> Result<Comment, Error> {
use lemmy_db_schema::schema::comment::dsl::*;
insert_into(comment)
.values(comment_form)
.on_conflict(ap_id)
.do_update()
.set(comment_form)
.get_result::<Self>(conn)
}
}
impl Crud for Comment {
@ -168,20 +180,13 @@ impl Crud for Comment {
}
impl ApubObject for Comment {
type Form = CommentForm;
fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result<Self, Error> {
use lemmy_db_schema::schema::comment::dsl::*;
comment.filter(ap_id.eq(object_id)).first::<Self>(conn)
fn last_refreshed_at(&self) -> Option<NaiveDateTime> {
None
}
fn upsert(conn: &PgConnection, comment_form: &CommentForm) -> Result<Self, Error> {
fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result<Self, Error> {
use lemmy_db_schema::schema::comment::dsl::*;
insert_into(comment)
.values(comment_form)
.on_conflict(ap_id)
.do_update()
.set(comment_form)
.get_result::<Self>(conn)
comment.filter(ap_id.eq(object_id)).first::<Self>(conn)
}
}

@ -1,4 +1,5 @@
use crate::{ApubObject, Bannable, Crud, DeleteableOrRemoveable, Followable, Joinable};
use chrono::NaiveDateTime;
use diesel::{dsl::*, result::Error, *};
use lemmy_db_schema::{
naive_now,
@ -93,23 +94,16 @@ impl Crud for Community {
}
impl ApubObject for Community {
type Form = CommunityForm;
fn last_refreshed_at(&self) -> Option<NaiveDateTime> {
Some(self.last_refreshed_at)
}
fn read_from_apub_id(conn: &PgConnection, for_actor_id: &DbUrl) -> Result<Self, Error> {
use lemmy_db_schema::schema::community::dsl::*;
community
.filter(actor_id.eq(for_actor_id))
.first::<Self>(conn)
}
fn upsert(conn: &PgConnection, community_form: &CommunityForm) -> Result<Community, Error> {
use lemmy_db_schema::schema::community::dsl::*;
insert_into(community)
.values(community_form)
.on_conflict(actor_id)
.do_update()
.set(community_form)
.get_result::<Self>(conn)
}
}
pub trait Community_ {
@ -129,6 +123,7 @@ pub trait Community_ {
conn: &PgConnection,
followers_url: &DbUrl,
) -> Result<Community, Error>;
fn upsert(conn: &PgConnection, community_form: &CommunityForm) -> Result<Community, Error>;
}
impl Community_ for Community {
@ -176,6 +171,16 @@ impl Community_ for Community {
.filter(followers_url.eq(followers_url_))
.first::<Self>(conn)
}
fn upsert(conn: &PgConnection, community_form: &CommunityForm) -> Result<Community, Error> {
use lemmy_db_schema::schema::community::dsl::*;
insert_into(community)
.values(community_form)
.on_conflict(actor_id)
.do_update()
.set(community_form)
.get_result::<Self>(conn)
}
}
impl Joinable for CommunityModerator {

@ -1,4 +1,5 @@
use crate::{ApubObject, Crud};
use chrono::NaiveDateTime;
use diesel::{dsl::*, result::Error, *};
use lemmy_db_schema::{
naive_now,
@ -181,7 +182,10 @@ impl Crud for Person {
}
impl ApubObject for Person {
type Form = PersonForm;
fn last_refreshed_at(&self) -> Option<NaiveDateTime> {
Some(self.last_refreshed_at)
}
fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result<Self, Error> {
use lemmy_db_schema::schema::person::dsl::*;
person
@ -189,15 +193,6 @@ impl ApubObject for Person {
.filter(actor_id.eq(object_id))
.first::<Self>(conn)
}
fn upsert(conn: &PgConnection, person_form: &PersonForm) -> Result<Person, Error> {
insert_into(person)
.values(person_form)
.on_conflict(actor_id)
.do_update()
.set(person_form)
.get_result::<Self>(conn)
}
}
pub trait Person_ {
@ -206,6 +201,7 @@ pub trait Person_ {
fn find_by_name(conn: &PgConnection, name: &str) -> Result<Person, Error>;
fn mark_as_updated(conn: &PgConnection, person_id: PersonId) -> Result<Person, Error>;
fn delete_account(conn: &PgConnection, person_id: PersonId) -> Result<Person, Error>;
fn upsert(conn: &PgConnection, person_form: &PersonForm) -> Result<Person, Error>;
}
impl Person_ for Person {
@ -256,6 +252,15 @@ impl Person_ for Person {
))
.get_result::<Self>(conn)
}
fn upsert(conn: &PgConnection, person_form: &PersonForm) -> Result<Person, Error> {
insert_into(person)
.values(person_form)
.on_conflict(actor_id)
.do_update()
.set(person_form)
.get_result::<Self>(conn)
}
}
#[cfg(test)]

@ -1,4 +1,5 @@
use crate::{ApubObject, Crud, DeleteableOrRemoveable, Likeable, Readable, Saveable};
use chrono::NaiveDateTime;
use diesel::{dsl::*, result::Error, *};
use lemmy_db_schema::{
naive_now,
@ -72,6 +73,7 @@ pub trait Post_ {
new_stickied: bool,
) -> Result<Post, Error>;
fn is_post_creator(person_id: PersonId, post_creator_id: PersonId) -> bool;
fn upsert(conn: &PgConnection, post_form: &PostForm) -> Result<Post, Error>;
}
impl Post_ for Post {
@ -179,14 +181,6 @@ impl Post_ for Post {
fn is_post_creator(person_id: PersonId, post_creator_id: PersonId) -> bool {
person_id == post_creator_id
}
}
impl ApubObject for Post {
type Form = PostForm;
fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result<Self, Error> {
use lemmy_db_schema::schema::post::dsl::*;
post.filter(ap_id.eq(object_id)).first::<Self>(conn)
}
fn upsert(conn: &PgConnection, post_form: &PostForm) -> Result<Post, Error> {
use lemmy_db_schema::schema::post::dsl::*;
@ -199,6 +193,17 @@ impl ApubObject for Post {
}
}
impl ApubObject for Post {
fn last_refreshed_at(&self) -> Option<NaiveDateTime> {
None
}
fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result<Self, Error> {
use lemmy_db_schema::schema::post::dsl::*;
post.filter(ap_id.eq(object_id)).first::<Self>(conn)
}
}
impl Likeable for PostLike {
type Form = PostLikeForm;
type IdType = PostId;

@ -1,4 +1,5 @@
use crate::{ApubObject, Crud, DeleteableOrRemoveable};
use chrono::NaiveDateTime;
use diesel::{dsl::*, result::Error, *};
use lemmy_db_schema::{naive_now, source::private_message::*, DbUrl, PersonId, PrivateMessageId};
@ -30,7 +31,10 @@ impl Crud for PrivateMessage {
}
impl ApubObject for PrivateMessage {
type Form = PrivateMessageForm;
fn last_refreshed_at(&self) -> Option<NaiveDateTime> {
None
}
fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result<Self, Error>
where
Self: Sized,
@ -40,16 +44,6 @@ impl ApubObject for PrivateMessage {
.filter(ap_id.eq(object_id))
.first::<Self>(conn)
}
fn upsert(conn: &PgConnection, private_message_form: &PrivateMessageForm) -> Result<Self, Error> {
use lemmy_db_schema::schema::private_message::dsl::*;
insert_into(private_message)
.values(private_message_form)
.on_conflict(ap_id)
.do_update()
.set(private_message_form)
.get_result::<Self>(conn)
}
}
pub trait PrivateMessage_ {
@ -77,6 +71,10 @@ pub trait PrivateMessage_ {
conn: &PgConnection,
for_recipient_id: PersonId,
) -> Result<Vec<PrivateMessage>, Error>;
fn upsert(
conn: &PgConnection,
private_message_form: &PrivateMessageForm,
) -> Result<PrivateMessage, Error>;
}
impl PrivateMessage_ for PrivateMessage {
@ -138,6 +136,19 @@ impl PrivateMessage_ for PrivateMessage {
.set(read.eq(true))
.get_results::<Self>(conn)
}
fn upsert(
conn: &PgConnection,
private_message_form: &PrivateMessageForm,
) -> Result<PrivateMessage, Error> {
use lemmy_db_schema::schema::private_message::dsl::*;
insert_into(private_message)
.values(private_message_form)
.on_conflict(ap_id)
.do_update()
.set(private_message_form)
.get_result::<Self>(conn)
}
}
impl DeleteableOrRemoveable for PrivateMessage {

Loading…
Cancel
Save