wip: try to fix activity routing

pull/1652/head
Felix Ableitner 3 years ago
parent ff7c64ea6f
commit 3d2e19f3c3

7
Cargo.lock generated

@ -1876,7 +1876,6 @@ dependencies = [
"strum_macros 0.20.1",
"thiserror",
"tokio 0.3.7",
"trait_enum",
"url",
"uuid",
]
@ -3690,12 +3689,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "trait_enum"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "130dd741d3c71f76d031e58caffff3624eaaa2db9bd8c4b05406a71885300fc7"
[[package]]
name = "trust-dns-proto"
version = "0.19.6"

@ -9,7 +9,7 @@
"scripts": {
"lint": "tsc --noEmit && eslint --report-unused-disable-directives --ext .js,.ts,.tsx src",
"fix": "prettier --write src && eslint --fix src",
"api-test": "jest src/ -i --verbose"
"api-test": "jest src/post.spec.ts -i --verbose"
},
"devDependencies": {
"@types/jest": "^26.0.23",

@ -3,6 +3,7 @@ set -e
export LEMMY_TEST_SEND_SYNC=1
export RUST_BACKTRACE=1
export RUST_LOG="warn,lemmy_server=debug,lemmy_api=debug,lemmy_api_common=debug,lemmy_api_crud=debug,lemmy_apub=debug,lemmy_apub_receive=debug,lemmy_db_queries=debug,lemmy_db_schema=debug,lemmy_db_views=debug,lemmy_db_views_actor=debug,lemmy_db_views_moderator=debug,lemmy_routes=debug,lemmy_utils=debug,lemmy_websocket=debug"
for INSTANCE in lemmy_alpha lemmy_beta lemmy_gamma lemmy_delta lemmy_epsilon; do
psql "${LEMMY_DATABASE_URL}/lemmy" -c "DROP DATABASE IF EXISTS $INSTANCE"

@ -88,7 +88,7 @@ test('Create a post', async () => {
let searchEpsilon = await searchPost(epsilon, postRes.post_view.post);
expect(searchEpsilon.posts[0]).toBeUndefined();
});
/*
test('Create a post in a non-existent community', async () => {
let postRes = await createPost(alpha, -2);
expect(postRes).toStrictEqual({ error: 'couldnt_create_post' });
@ -363,3 +363,4 @@ test('Enforce community ban for federated user', async () => {
let betaPost = searchBeta.posts[0];
expect(betaPost).toBeDefined();
});
*/

@ -50,4 +50,3 @@ thiserror = "1.0.26"
background-jobs = "0.8.0"
reqwest = { version = "0.10.10", features = ["json"] }
backtrace = "0.3.56"
trait_enum = "0.5.0"

@ -49,7 +49,7 @@ where
if check_is_apub_id_valid(&inbox, false).is_ok() {
debug!(
"Sending activity {:?} to {}",
&activity.id_unchecked(),
&activity.id_unchecked().map(ToString::to_string),
&inbox
);
send_activity_internal(context, activity, creator, vec![inbox], true, true).await?;
@ -88,7 +88,7 @@ where
.collect();
debug!(
"Sending activity {:?} to followers of {}",
&activity.id_unchecked().map(|i| i.to_string()),
&activity.id_unchecked().map(ToString::to_string),
&community.actor_id
);
@ -127,7 +127,7 @@ where
check_is_apub_id_valid(&inbox, false)?;
debug!(
"Sending activity {:?} to community {}",
&activity.id_unchecked(),
&activity.id_unchecked().map(ToString::to_string),
&community.actor_id
);
// dont send to object_actor here, as that is responsibility of the community itself

@ -14,10 +14,7 @@ use crate::{
};
use chrono::NaiveDateTime;
use http::StatusCode;
use lemmy_db_schema::{
naive_now,
source::{community::Community, person::Person},
};
use lemmy_db_schema::naive_now;
use lemmy_utils::LemmyError;
use lemmy_websocket::LemmyContext;
use serde::Deserialize;
@ -40,47 +37,26 @@ where
false
}
trait_enum! {
pub enum Actor: ActorType {
// TODO: remove this
pub enum Actor {
Person,
Community,
}
}
/*
impl ActorType for Actor {
fn is_local(&self) -> bool {
self.
self.is_local()
}
fn actor_id(&self) -> Url {
self.actor_id()
}
fn name(&self) -> String {
self.name()
}
fn public_key(&self) -> Option<String> {
self.public_key()
}
fn private_key(&self) -> Option<String> {
self.private_key()
}
fn get_shared_inbox_or_inbox_url(&self) -> Url {
self.get_shared_inbox_or_inbox_url()
}
}
*/
/// Get a remote actor from its apub ID (either a person or a community). Thin wrapper around
/// `get_or_fetch_and_upsert_person()` and `get_or_fetch_and_upsert_community()`.
///
/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
/// Otherwise it is fetched from the remote instance, stored and returned.
pub async fn get_or_fetch_and_upsert_actor(
apub_id: &Url,
context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<Actor, LemmyError> {
) -> Result<Box<dyn ActorType>, LemmyError> {
let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await;
let actor: Actor = match community {
Ok(c) => Actor::Community(c),
Err(_) => {
Actor::Person(get_or_fetch_and_upsert_person(apub_id, context, recursion_counter).await?)
}
let actor: Box<dyn ActorType> = match community {
Ok(c) => Box::new(c),
Err(_) => Box::new(get_or_fetch_and_upsert_person(apub_id, context, recursion_counter).await?),
};
Ok(actor)
}

@ -1,7 +1,5 @@
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate trait_enum;
pub mod activities;
pub mod activity_queue;

@ -1,4 +1,9 @@
use activitystreams::error::DomainError;
use activitystreams::{
base::AnyBase,
error::DomainError,
primitives::OneOrMany,
unparsed::Unparsed,
};
use lemmy_utils::LemmyError;
use lemmy_websocket::LemmyContext;
use std::marker::PhantomData;
@ -31,6 +36,42 @@ pub enum PublicUrl {
Public,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ActivityCommonFields {
#[serde(rename = "@context")]
pub context: OneOrMany<AnyBase>,
id: Url,
pub actor: Url,
// unparsed fields
#[serde(flatten)]
pub unparsed: Unparsed,
}
impl ActivityCommonFields {
pub fn id_unchecked(&self) -> &Url {
&self.id
}
}
#[async_trait::async_trait(?Send)]
pub trait ActivityHandlerNew {
// TODO: also need to check for instance/community blocks in here
async fn verify(
&self,
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError>;
async fn receive(
&self,
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError>;
fn common(&self) -> &ActivityCommonFields;
}
#[async_trait::async_trait(?Send)]
pub trait ActivityHandler {
type Actor;

@ -1,10 +1,18 @@
use crate::activities::{following::follow::FollowCommunity, LemmyActivity};
use activitystreams::activity::kind::AcceptType;
use lemmy_api_common::blocking;
use lemmy_apub::{check_is_apub_id_valid, fetcher::person::get_or_fetch_and_upsert_person};
use lemmy_apub_lib::{verify_domains_match, ActivityHandler};
use lemmy_apub::{
check_is_apub_id_valid,
fetcher::{community::get_or_fetch_and_upsert_community, person::get_or_fetch_and_upsert_person},
};
use lemmy_apub_lib::{
verify_domains_match,
ActivityCommonFields,
ActivityHandler,
ActivityHandlerNew,
};
use lemmy_db_queries::Followable;
use lemmy_db_schema::source::community::{Community, CommunityFollower};
use lemmy_db_schema::source::community::CommunityFollower;
use lemmy_utils::LemmyError;
use lemmy_websocket::LemmyContext;
use url::Url;
@ -16,32 +24,37 @@ pub struct AcceptFollowCommunity {
object: LemmyActivity<FollowCommunity>,
#[serde(rename = "type")]
kind: AcceptType,
#[serde(flatten)]
common: ActivityCommonFields,
}
/// Handle accepted follows
#[async_trait::async_trait(?Send)]
impl ActivityHandler for LemmyActivity<AcceptFollowCommunity> {
type Actor = Community;
async fn verify(&self, context: &LemmyContext) -> Result<(), LemmyError> {
verify_domains_match(&self.actor, self.id_unchecked())?;
check_is_apub_id_valid(&self.actor, false)?;
self.inner.object.verify(context).await
impl ActivityHandlerNew for AcceptFollowCommunity {
async fn verify(&self, context: &LemmyContext, _: &mut i32) -> Result<(), LemmyError> {
verify_domains_match(&self.common.actor, &self.common.id_unchecked())?;
check_is_apub_id_valid(&self.common.actor, false)?;
self.object.verify(context).await
}
async fn receive(
&self,
actor: Self::Actor,
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let person = get_or_fetch_and_upsert_person(&self.inner.to, context, request_counter).await?;
let actor =
get_or_fetch_and_upsert_community(&self.common.actor, context, request_counter).await?;
let to = get_or_fetch_and_upsert_person(&self.to, context, request_counter).await?;
// This will throw an error if no follow was requested
blocking(context.pool(), move |conn| {
CommunityFollower::follow_accepted(conn, actor.id, person.id)
CommunityFollower::follow_accepted(conn, actor.id, to.id)
})
.await??;
Ok(())
}
fn common(&self) -> &ActivityCommonFields {
&self.common
}
}

@ -14,6 +14,7 @@ pub mod following;
pub mod post;
pub mod private_message;
// TODO: remove this
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct LemmyActivity<Kind> {

@ -1,8 +1,14 @@
use crate::activities::{post::send_websocket_message, LemmyActivity};
use crate::activities::post::send_websocket_message;
use activitystreams::{activity::kind::CreateType, base::BaseExt};
use lemmy_apub::{check_is_apub_id_valid, objects::FromApub, ActorType, PageExt};
use lemmy_apub_lib::{verify_domains_match, ActivityHandler, PublicUrl};
use lemmy_db_schema::source::{person::Person, post::Post};
use lemmy_apub::{
check_is_apub_id_valid,
fetcher::person::get_or_fetch_and_upsert_person,
objects::FromApub,
ActorType,
PageExt,
};
use lemmy_apub_lib::{verify_domains_match, ActivityCommonFields, ActivityHandlerNew, PublicUrl};
use lemmy_db_schema::source::post::Post;
use lemmy_utils::LemmyError;
use lemmy_websocket::{LemmyContext, UserOperationCrud};
use url::Url;
@ -15,26 +21,27 @@ pub struct CreatePost {
cc: Vec<Url>,
#[serde(rename = "type")]
kind: CreateType,
#[serde(flatten)]
common: ActivityCommonFields,
}
#[async_trait::async_trait(?Send)]
impl ActivityHandler for LemmyActivity<CreatePost> {
type Actor = Person;
async fn verify(&self, _context: &LemmyContext) -> Result<(), LemmyError> {
verify_domains_match(self.id_unchecked(), &self.actor)?;
self.inner.object.id(self.actor.as_str())?;
check_is_apub_id_valid(&self.actor, false)
impl ActivityHandlerNew for CreatePost {
async fn verify(&self, _context: &LemmyContext, _: &mut i32) -> Result<(), LemmyError> {
verify_domains_match(self.common.id_unchecked(), &self.common.actor)?;
self.object.id(self.common.actor.as_str())?;
check_is_apub_id_valid(&self.common.actor, false)
}
async fn receive(
&self,
actor: Self::Actor,
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let actor =
get_or_fetch_and_upsert_person(&self.common.actor, context, request_counter).await?;
let post = Post::from_apub(
&self.inner.object,
&self.object,
context,
actor.actor_id(),
request_counter,
@ -44,4 +51,8 @@ impl ActivityHandler for LemmyActivity<CreatePost> {
send_websocket_message(post.id, UserOperationCrud::CreatePost, context).await
}
fn common(&self) -> &ActivityCommonFields {
&self.common
}
}

@ -1,8 +1,7 @@
use crate::activities::{post::like_or_dislike_post, LemmyActivity};
use crate::activities::post::like_or_dislike_post;
use activitystreams::activity::kind::LikeType;
use lemmy_apub::check_is_apub_id_valid;
use lemmy_apub_lib::{verify_domains_match, ActivityHandler, PublicUrl};
use lemmy_db_schema::source::person::Person;
use lemmy_apub::{check_is_apub_id_valid, fetcher::person::get_or_fetch_and_upsert_person};
use lemmy_apub_lib::{verify_domains_match, ActivityCommonFields, ActivityHandlerNew, PublicUrl};
use lemmy_utils::LemmyError;
use lemmy_websocket::LemmyContext;
use url::Url;
@ -15,23 +14,28 @@ pub struct LikePost {
cc: [Url; 1],
#[serde(rename = "type")]
kind: LikeType,
#[serde(flatten)]
common: ActivityCommonFields,
}
#[async_trait::async_trait(?Send)]
impl ActivityHandler for LemmyActivity<LikePost> {
type Actor = Person;
async fn verify(&self, _context: &LemmyContext) -> Result<(), LemmyError> {
verify_domains_match(&self.actor, self.id_unchecked())?;
check_is_apub_id_valid(&self.actor, false)
impl ActivityHandlerNew for LikePost {
async fn verify(&self, _context: &LemmyContext, _: &mut i32) -> Result<(), LemmyError> {
verify_domains_match(&self.common.actor, self.common.id_unchecked())?;
check_is_apub_id_valid(&self.common.actor, false)
}
async fn receive(
&self,
actor: Self::Actor,
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
like_or_dislike_post(1, &actor, &self.inner.object, context, request_counter).await
let actor =
get_or_fetch_and_upsert_person(&self.common.actor, context, request_counter).await?;
like_or_dislike_post(1, &actor, &self.object, context, request_counter).await
}
fn common(&self) -> &ActivityCommonFields {
&self.common
}
}

@ -26,8 +26,8 @@ impl ActivityHandler for LemmyActivity<UndoLikePost> {
async fn verify(&self, context: &LemmyContext) -> Result<(), LemmyError> {
verify_domains_match(&self.actor, self.id_unchecked())?;
verify_domains_match(&self.actor, &self.inner.object.inner.object)?;
check_is_apub_id_valid(&self.actor, false)?;
self.inner.object.verify(context).await
check_is_apub_id_valid(&self.actor, false)
//self.inner.object.verify(context).await
}
async fn receive(

@ -61,7 +61,8 @@ pub async fn community_inbox(
path: web::Path<String>,
context: web::Data<LemmyContext>,
) -> Result<HttpResponse, LemmyError> {
receive_activity(request, input.into_inner(), Some(path.0), context).await
//receive_activity(request, input.into_inner(), Some(path.0), context).await
todo!()
}
/// Returns an empty followers collection, only populating the size (for privacy).

@ -49,6 +49,7 @@ use lemmy_websocket::LemmyContext;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(untagged)]
pub enum PersonInboxActivities {
AcceptFollowCommunity(AcceptFollowCommunity),
CreatePrivateMessage(CreatePrivateMessage),
@ -59,6 +60,7 @@ pub enum PersonInboxActivities {
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(untagged)]
pub enum GroupInboxActivities {
FollowCommunity(FollowCommunity),
UndoFollowCommunity(UndoFollowCommunity),
@ -94,6 +96,7 @@ pub enum GroupInboxActivities {
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(untagged)]
pub enum SharedInboxActivities {
// received by person
AcceptFollowCommunity(AcceptFollowCommunity),

@ -1,24 +1,28 @@
use actix_web::{body::Body, web, HttpRequest, HttpResponse};
use http::StatusCode;
use serde::{Deserialize, Serialize};
use url::Url;
use crate::{activities::LemmyActivity, http::inbox_enums::SharedInboxActivities};
use crate::activities::{
following::accept::AcceptFollowCommunity,
post::{create::CreatePost, like::LikePost},
};
use actix_web::{body::Body, web, web::Bytes, HttpRequest, HttpResponse};
use anyhow::{anyhow, Context};
use futures::StreamExt;
use http::StatusCode;
use lemmy_api_common::blocking;
use lemmy_apub::{
check_is_apub_id_valid,
extensions::signatures::verify_signature,
fetcher::{get_or_fetch_and_upsert_actor, Actor},
fetcher::get_or_fetch_and_upsert_actor,
insert_activity,
APUB_JSON_CONTENT_TYPE,
};
use lemmy_apub_lib::ActivityHandler;
use lemmy_apub_lib::{ActivityCommonFields, ActivityHandlerNew};
use lemmy_db_queries::{source::activity::Activity_, DbPool};
use lemmy_db_schema::source::activity::Activity;
use lemmy_utils::{location_info, settings::structs::Settings, LemmyError};
use lemmy_websocket::LemmyContext;
use std::fmt::Debug;
use log::debug;
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, io::Read};
use url::Url;
pub mod comment;
pub mod community;
@ -26,51 +30,99 @@ pub mod inbox_enums;
pub mod person;
pub mod post;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(untagged)]
enum Ac {
CreatePost(CreatePost),
LikePost(LikePost),
AcceptFollowCommunity(AcceptFollowCommunity),
}
// TODO: write a derive trait which creates this
#[async_trait::async_trait(?Send)]
impl ActivityHandlerNew for Ac {
async fn verify(
&self,
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
match self {
Ac::CreatePost(a) => a.verify(context, request_counter).await,
Ac::LikePost(a) => a.verify(context, request_counter).await,
Ac::AcceptFollowCommunity(a) => a.verify(context, request_counter).await,
}
}
async fn receive(
&self,
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError> {
match self {
Ac::CreatePost(a) => a.receive(context, request_counter).await,
Ac::LikePost(a) => a.receive(context, request_counter).await,
Ac::AcceptFollowCommunity(a) => a.receive(context, request_counter).await,
}
}
fn common(&self) -> &ActivityCommonFields {
match self {
Ac::CreatePost(a) => a.common(),
Ac::LikePost(a) => a.common(),
Ac::AcceptFollowCommunity(a) => a.common(),
}
}
}
pub async fn shared_inbox(
request: HttpRequest,
input: web::Json<LemmyActivity<SharedInboxActivities>>,
mut body: web::Payload,
context: web::Data<LemmyContext>,
) -> Result<HttpResponse, LemmyError> {
receive_activity(request, input.into_inner(), None, context).await
let mut bytes = web::BytesMut::new();
while let Some(item) = body.next().await {
bytes.extend_from_slice(&item?);
}
let mut unparsed: String = String::new();
Bytes::from(bytes).as_ref().read_to_string(&mut unparsed)?;
receive_activity::<Ac>(request, &unparsed, None, context).await
}
async fn receive_activity<T>(
async fn receive_activity<'a, T>(
request: HttpRequest,
activity: LemmyActivity<T>,
activity: &'a str,
expected_name: Option<String>,
context: web::Data<LemmyContext>,
) -> Result<HttpResponse, LemmyError>
where
T: ActivityHandler<Actor = lemmy_apub::fetcher::Actor>
+ Clone
+ Serialize
+ std::fmt::Debug
+ Send
+ 'static,
T: ActivityHandlerNew + Clone + Deserialize<'a> + Serialize + std::fmt::Debug + Send + 'static,
{
debug!("Received activity {}", activity);
let activity = serde_json::from_str::<T>(activity)?;
let activity_data = activity.common();
// TODO: which order to check things?
// Do nothing if we received the same activity before
if is_activity_already_known(context.pool(), activity.id_unchecked()).await? {
if is_activity_already_known(context.pool(), activity_data.id_unchecked()).await? {
return Ok(HttpResponse::Ok().finish());
}
assert_activity_not_local(&activity)?;
check_is_apub_id_valid(&activity.actor, false)?;
activity.inner.verify(&context).await?;
check_is_apub_id_valid(&activity_data.actor, false)?;
let request_counter = &mut 0;
let actor: Actor =
get_or_fetch_and_upsert_actor(&activity.actor, &context, request_counter).await?;
let actor =
get_or_fetch_and_upsert_actor(&activity_data.actor, &context, request_counter).await?;
if let Some(expected) = expected_name {
if expected != actor.name() {
return Ok(HttpResponse::BadRequest().finish());
}
}
verify_signature(&request, &actor.public_key().context(location_info!())?)?;
activity.verify(&context, request_counter).await?;
// Log the activity, so we avoid receiving and parsing it twice. Note that this could still happen
// if we receive the same activity twice in very quick succession.
insert_activity(
activity.id_unchecked(),
activity_data.id_unchecked(),
activity.clone(),
false,
true,
@ -78,10 +130,7 @@ where
)
.await?;
activity
.inner
.receive(actor, &context, request_counter)
.await?;
activity.receive(&context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
@ -153,10 +202,14 @@ pub(crate) async fn is_activity_already_known(
}
}
pub(in crate::http) fn assert_activity_not_local<T: Debug>(
activity: &LemmyActivity<T>,
fn assert_activity_not_local<T: Debug + ActivityHandlerNew>(
activity: &T,
) -> Result<(), LemmyError> {
let activity_domain = activity.id_unchecked().domain().context(location_info!())?;
let activity_domain = activity
.common()
.id_unchecked()
.domain()
.context(location_info!())?;
if activity_domain == Settings::get().hostname() {
return Err(

@ -53,7 +53,8 @@ pub async fn person_inbox(
path: web::Path<String>,
context: web::Data<LemmyContext>,
) -> Result<HttpResponse, LemmyError> {
receive_activity(request, input.into_inner(), Some(path.0), context).await
//receive_activity(request, input.into_inner(), Some(path.0), context).await
todo!()
}
pub(crate) async fn get_apub_person_outbox(

Loading…
Cancel
Save