@ -4,9 +4,10 @@ use crate::objects::{
person ::ApubPerson ,
post ::ApubPost ,
} ;
use activitypub_federation ::{ config ::Data , fetch ::object_id ::ObjectId };
use activitypub_federation ::{ config ::Data , fetch ::object_id ::ObjectId , traits ::Object };
use actix_web ::web ::Json ;
use futures ::{ future ::try_join_all , StreamExt } ;
use itertools ::Itertools ;
use lemmy_api_common ::{ context ::LemmyContext , SuccessResponse } ;
use lemmy_db_schema ::{
newtypes ::DbUrl ,
@ -30,8 +31,11 @@ use lemmy_utils::{
spawn_try_task ,
} ;
use serde ::{ Deserialize , Serialize } ;
use std ::future ::Future ;
use tracing ::info ;
const PARALLELISM : usize = 10 ;
/// Backup of user data. This struct should never be changed so that the data can be used as a
/// long-term backup in case the instance goes down unexpectedly. All fields are optional to allow
/// importing partial backups.
@ -167,141 +171,91 @@ pub async fn import_settings(
}
spawn_try_task ( async move {
const PARALLELISM : usize = 10 ;
let person_id = local_user_view . person . id ;
// These tasks fetch objects from remote instances which might be down.
// TODO: Would be nice if we could send a list of failed items with api response, but then
// the request would likely timeout.
let mut failed_items = vec! [ ] ;
info ! (
"Starting settings backup for {}",
"Starting settings import for {}" ,
local_user_view . person . name
) ;
futures ::stream ::iter (
data
. followed_communities
. clone ( )
. into_iter ( )
// reset_request_count works like clone, and is necessary to avoid running into request limit
. map ( | f | ( f , context . reset_request_count ( ) ) )
. map ( | ( followed , context ) | async move {
// need to reset outgoing request count to avoid running into limit
let community = followed . dereference ( & context ) . await ? ;
let form = CommunityFollowerForm {
person_id ,
community_id : community . id ,
pending : true ,
} ;
CommunityFollower ::follow ( & mut context . pool ( ) , & form ) . await ? ;
LemmyResult ::Ok ( ( ) )
} ) ,
)
. buffer_unordered ( PARALLELISM )
. collect ::< Vec < _ > > ( )
. await
. into_iter ( )
. enumerate ( )
. for_each ( | ( i , r ) | {
if let Err ( e ) = r {
failed_items . push ( data . followed_communities . get ( i ) . map ( | u | u . inner ( ) . clone ( ) ) ) ;
info ! ( "Failed to import followed community: {e}" ) ;
}
} ) ;
futures ::stream ::iter (
data
. saved_posts
. clone ( )
. into_iter ( )
. map ( | s | ( s , context . reset_request_count ( ) ) )
. map ( | ( saved , context ) | async move {
let post = saved . dereference ( & context ) . await ? ;
let form = PostSavedForm {
person_id ,
post_id : post . id ,
} ;
PostSaved ::save ( & mut context . pool ( ) , & form ) . await ? ;
LemmyResult ::Ok ( ( ) )
} ) ,
let failed_followed_communities = fetch_and_import (
data . followed_communities . clone ( ) ,
& context ,
| ( followed , context ) | async move {
let community = followed . dereference ( & context ) . await ? ;
let form = CommunityFollowerForm {
person_id ,
community_id : community . id ,
pending : true ,
} ;
CommunityFollower ::follow ( & mut context . pool ( ) , & form ) . await ? ;
LemmyResult ::Ok ( ( ) )
} ,
)
. buffer_unordered ( PARALLELISM )
. collect ::< Vec < _ > > ( )
. await
. into_iter ( )
. enumerate ( )
. for_each ( | ( i , r ) | {
if let Err ( e ) = r {
failed_items . push ( data . followed_communities . get ( i ) . map ( | u | u . inner ( ) . clone ( ) ) ) ;
info ! ( "Failed to import saved post community: {e}" ) ;
}
} ) ;
futures ::stream ::iter (
data
. saved_comments
. clone ( )
. into_iter ( )
. map ( | s | ( s , context . reset_request_count ( ) ) )
. map ( | ( saved , context ) | async move {
let comment = saved . dereference ( & context ) . await ? ;
let form = CommentSavedForm {
person_id ,
comment_id : comment . id ,
} ;
CommentSaved ::save ( & mut context . pool ( ) , & form ) . await ? ;
LemmyResult ::Ok ( ( ) )
} ) ,
. await ? ;
let failed_saved_posts = fetch_and_import (
data . saved_posts . clone ( ) ,
& context ,
| ( saved , context ) | async move {
let post = saved . dereference ( & context ) . await ? ;
let form = PostSavedForm {
person_id ,
post_id : post . id ,
} ;
PostSaved ::save ( & mut context . pool ( ) , & form ) . await ? ;
LemmyResult ::Ok ( ( ) )
} ,
)
. buffer_unordered ( PARALLELISM )
. collect ::< Vec < _ > > ( )
. await
. into_iter ( )
. enumerate ( )
. for_each ( | ( i , r ) | {
if let Err ( e ) = r {
failed_items . push ( data . followed_communities . get ( i ) . map ( | u | u . inner ( ) . clone ( ) ) ) ;
info ! ( "Failed to import saved comment community: {e}" ) ;
}
} ) ;
. await ? ;
let failed_items : Vec < _ > = failed_items . into_iter ( ) . flatten ( ) . collect ( ) ;
info ! (
"Finished settings backup for {}, failed items: {:#?}" ,
local_user_view . person . name , failed_items
) ;
let failed_saved_comments = fetch_and_import (
data . saved_comments . clone ( ) ,
& context ,
| ( saved , context ) | async move {
let comment = saved . dereference ( & context ) . await ? ;
let form = CommentSavedForm {
person_id ,
comment_id : comment . id ,
} ;
CommentSaved ::save ( & mut context . pool ( ) , & form ) . await ? ;
LemmyResult ::Ok ( ( ) )
} ,
)
. await ? ;
// These tasks don't connect to any remote instances but only insert directly in the database.
// That means the only error condition are db connection failures, so no extra error handling is
// needed.
try_join_all ( data . blocked_communities . iter ( ) . map ( | blocked | async {
// dont fetch unknown blocked objects from home server
let community = blocked . dereference_local ( & context ) . await ? ;
let form = CommunityBlockForm {
person_id ,
community_id : community . id ,
} ;
CommunityBlock ::block ( & mut context . pool ( ) , & form ) . await ? ;
LemmyResult ::Ok ( ( ) )
}) )
let failed_community_blocks = fetch_and_import (
data . blocked_communities . clone ( ) ,
& context ,
| ( blocked , context ) | async move {
let community = blocked . dereference ( & context ) . await ? ;
let form = CommunityBlockForm {
person_id ,
community_id : community . id,
} ;
CommunityBlock ::block ( & mut context . pool ( ) , & form ) . await ? ;
LemmyResult ::Ok ( ( ) )
} ,
)
. await ? ;
try_join_all ( data . blocked_users . iter ( ) . map ( | blocked | async {
// dont fetch unknown blocked objects from home server
let target = blocked . dereference_local ( & context ) . await ? ;
let form = PersonBlockForm {
person_id ,
target_id : target . id ,
} ;
PersonBlock ::block ( & mut context . pool ( ) , & form ) . await ? ;
LemmyResult ::Ok ( ( ) )
} ) )
let failed_user_blocks = fetch_and_import (
data . blocked_users . clone ( ) ,
& context ,
| ( blocked , context ) | async move {
let context = context . reset_request_count ( ) ;
let target = blocked . dereference ( & context ) . await ? ;
let form = PersonBlockForm {
person_id ,
target_id : target . id ,
} ;
PersonBlock ::block ( & mut context . pool ( ) , & form ) . await ? ;
LemmyResult ::Ok ( ( ) )
} ,
)
. await ? ;
try_join_all ( data . blocked_instances . iter ( ) . map ( | domain | async {
// dont fetch unknown blocked objects from home server
let instance = Instance ::read_or_create ( & mut context . pool ( ) , domain . clone ( ) ) . await ? ;
let form = InstanceBlockForm {
person_id ,
@ -312,12 +266,48 @@ pub async fn import_settings(
} ) )
. await ? ;
info ! ( "Settings import completed for {}, the following items failed: {failed_followed_communities}, {failed_saved_posts}, {failed_saved_comments}, {failed_community_blocks}, {failed_user_blocks}" ,
local_user_view . person . name ) ;
Ok ( ( ) )
} ) ;
Ok ( Json ( Default ::default ( ) ) )
}
async fn fetch_and_import < Kind , Fut > (
objects : Vec < ObjectId < Kind > > ,
context : & Data < LemmyContext > ,
import_fn : impl FnMut ( ( ObjectId < Kind > , Data < LemmyContext > ) ) -> Fut ,
) -> LemmyResult < String >
where
Kind : Object + Send + ' static ,
for < ' de2 > < Kind as Object > ::Kind : Deserialize < ' de2 > ,
Fut : Future < Output = LemmyResult < ( ) > > ,
{
let mut failed_items = vec! [ ] ;
futures ::stream ::iter (
objects
. clone ( )
. into_iter ( )
// need to reset outgoing request count to avoid running into limit
. map ( | s | ( s , context . reset_request_count ( ) ) )
. map ( import_fn ) ,
)
. buffer_unordered ( PARALLELISM )
. collect ::< Vec < _ > > ( )
. await
. into_iter ( )
. enumerate ( )
. for_each ( | ( i , r ) : ( usize , LemmyResult < ( ) > ) | {
if r . is_err ( ) {
if let Some ( object ) = objects . get ( i ) {
failed_items . push ( object . inner ( ) . clone ( ) ) ;
}
}
} ) ;
Ok ( failed_items . into_iter ( ) . join ( "," ) )
}
#[ cfg(test) ]
#[ allow(clippy::indexing_slicing) ]
mod tests {