use chrono::{DateTime, TimeZone, Utc};
use clokwerk::{AsyncScheduler, TimeUnits as CTimeUnits};
use diesel::{
  dsl::IntervalDsl,
  sql_query,
  sql_types::{Integer, Timestamptz},
  ExpressionMethods,
  NullableExpressionMethods,
  QueryDsl,
  QueryableByName,
};
// Import week days and WeekDay
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use lemmy_api_common::context::LemmyContext;
use lemmy_db_schema::{
  schema::{
    captcha_answer,
    comment,
    community_person_ban,
    instance,
    person,
    post,
    received_activity,
    sent_activity,
  },
  source::instance::{Instance, InstanceForm},
  utils::{get_conn, naive_now, now, DbPool, DELETED_REPLACEMENT_TEXT},
};
use lemmy_routes::nodeinfo::NodeInfo;
use lemmy_utils::error::{LemmyError, LemmyResult};
use reqwest_middleware::ClientWithMiddleware;
use std::time::Duration;
use tracing::{error, info, warn};
/// Schedules various cleanup tasks for lemmy in a background thread
pub fn setup (
db_url : String ,
user_agent : String ,
context_1 : LemmyContext ,
) -> Result < ( ) , LemmyError > {
pub async fn setup ( context : LemmyContext ) -> Result < ( ) , LemmyError > {
// Setup the connections
let mut scheduler = Scheduler ::new ( ) ;
startup_jobs ( & db_url ) ;
let mut scheduler = AsyncScheduler ::new ( ) ;
startup_jobs ( & mut context . pool ( ) ) . await ;
let context_1 = context . clone ( ) ;
// Update active counts every hour
let url = db_url . clone ( ) ;
scheduler . every ( CTimeUnits ::hour ( 1 ) ) . run ( move | | {
PgConnection ::establish ( & url )
. map ( | mut conn | {
active_counts ( & mut conn ) ;
update_banned_when_expired ( & mut conn ) ;
} )
. map_err ( | e | {
error ! ( "Failed to establish db connection for active counts update: {e}" ) ;
} )
. ok ( ) ;
let context = context_1 . clone ( ) ;
async move {
active_counts ( & mut context . pool ( ) ) . await ;
update_banned_when_expired ( & mut context . pool ( ) ) . await ;
}
} ) ;
let context_1 = context . clone ( ) ;
// Update hot ranks every 15 minutes
let url = db_url . clone ( ) ;
scheduler . every ( CTimeUnits ::minutes ( 15 ) ) . run ( move | | {
PgConnection ::establish ( & url )
. map ( | mut conn | {
update_hot_ranks ( & mut conn ) ;
} )
. map_err ( | e | {
error ! ( "Failed to establish db connection for hot ranks update: {e}" ) ;
} )
. ok ( ) ;
scheduler . every ( CTimeUnits ::minutes ( 10 ) ) . run ( move | | {
let context = context_1 . clone ( ) ;
async move {
update_hot_ranks ( & mut context . pool ( ) ) . await ;
}
} ) ;
let context_1 = context . clone ( ) ;
// Delete any captcha answers older than ten minutes, every ten minutes
let url = db_url . clone ( ) ;
scheduler . every ( CTimeUnits ::minutes ( 10 ) ) . run ( move | | {
PgConnection ::establish ( & url )
. map ( | mut conn | {
delete_expired_captcha_answers ( & mut conn ) ;
} )
. map_err ( | e | {
error ! ( "Failed to establish db connection for captcha cleanup: {e}" ) ;
} )
. ok ( ) ;
let context = context_1 . clone ( ) ;
async move {
delete_expired_captcha_answers ( & mut context . pool ( ) ) . await ;
}
} ) ;
let context_1 = context . clone ( ) ;
// Clear old activities every week
let url = db_url . clone ( ) ;
scheduler . every ( CTimeUnits ::weeks ( 1 ) ) . run ( move | | {
PgConnection ::establish ( & url )
. map ( | mut conn | {
clear_old_activities ( & mut conn ) ;
} )
. map_err ( | e | {
error ! ( "Failed to establish db connection for activity cleanup: {e}" ) ;
} )
. ok ( ) ;
let context = context_1 . clone ( ) ;
async move {
clear_old_activities ( & mut context . pool ( ) ) . await ;
}
} ) ;
let context_1 = context . clone ( ) ;
// Remove old rate limit buckets after 1 to 2 hours of inactivity
scheduler . every ( CTimeUnits ::hour ( 1 ) ) . run ( move | | {
let hour = Duration ::from_secs ( 3600 ) ;
context_1 . settings_updated_channel ( ) . remove_older_than ( hour ) ;
let context = context_1 . clone ( ) ;
async move {
let hour = Duration ::from_secs ( 3600 ) ;
context . settings_updated_channel ( ) . remove_older_than ( hour ) ;
}
} ) ;
let context_1 = context . clone ( ) ;
// Overwrite deleted & removed posts and comments every day
let url = db_url . clone ( ) ;
scheduler . every ( CTimeUnits ::days ( 1 ) ) . run ( move | | {
PgConnection ::establish ( & db_url )
. map ( | mut conn | {
overwrite_deleted_posts_and_comments ( & mut conn ) ;
} )
. map_err ( | e | {
error ! ( "Failed to establish db connection for deleted content cleanup: {e}" ) ;
} )
. ok ( ) ;
let context = context_1 . clone ( ) ;
async move {
overwrite_deleted_posts_and_comments ( & mut context . pool ( ) ) . await ;
}
} ) ;
let context_1 = context . clone ( ) ;
// Update the Instance Software
scheduler . every ( CTimeUnits ::days ( 1 ) ) . run ( move | | {
PgConnection ::establish ( & url )
. map ( | mut conn | {
update_instance_software ( & mut conn , & user_agent )
. map_err ( | e | warn ! ( "Failed to update instance software: {e}" ) )
. ok ( ) ;
} )
. map_err ( | e | {
error ! ( "Failed to establish db connection for instance software update: {e}" ) ;
} )
. ok ( ) ;
let context = context_1 . clone ( ) ;
async move {
update_instance_software ( & mut context . pool ( ) , context . client ( ) )
. await
. map_err ( | e | warn ! ( "Failed to update instance software: {e}" ) )
. ok ( ) ;
}
} ) ;
// Manually run the scheduler in an event loop
loop {
scheduler . run_pending ( ) ;
t hread ::sleep ( Duration ::from_millis ( 1000 ) ) ;
scheduler . run_pending ( ) .await ;
t okio::time ::sleep ( Duration ::from_millis ( 1000 ) ) . await ;
}
}
/// Run these on server startup
fn startup_jobs ( db_url : & str ) {
let mut conn = PgConnection ::establish ( db_url ) . expect ( "could not establish connection" ) ;
active_counts ( & mut conn ) ;
update_hot_ranks ( & mut conn ) ;
update_banned_when_expired ( & mut conn ) ;
clear_old_activities ( & mut conn ) ;
overwrite_deleted_posts_and_comments ( & mut conn ) ;
async fn startup_jobs ( pool : & mut DbPool < ' _ > ) {
active_counts ( pool ) . await ;
update_hot_ranks ( pool ) . await ;
update_banned_when_expired ( pool ) . await ;
clear_old_activities ( pool ) . await ;
overwrite_deleted_posts_and_comments ( pool ) . await ;
}
/// Update the hot_rank columns for the aggregates tables
/// Runs in batches until all necessary rows are updated once
fn update_hot_ranks ( conn : & mut PgConnection ) {
async fn update_hot_ranks ( pool : & mut DbPool < ' _ > ) {
info ! ( "Updating hot ranks for all history..." ) ;
process_post_aggregates_ranks_in_batches ( conn ) ;
process_ranks_in_batches (
conn ,
"comment_aggregates" ,
"a.hot_rank != 0" ,
"SET hot_rank = hot_rank(a.score, a.published)" ,
) ;
process_ranks_in_batches (
conn ,
"community_aggregates" ,
"a.hot_rank != 0" ,
"SET hot_rank = hot_rank(a.subscribers, a.published)" ,
) ;
info ! ( "Finished hot ranks update!" ) ;
let conn = get_conn ( pool ) . await ;
match conn {
Ok ( mut conn ) = > {
process_post_aggregates_ranks_in_batches ( & mut conn ) . await ;
process_ranks_in_batches (
& mut conn ,
"comment_aggregates" ,
"a.hot_rank != 0" ,
"SET hot_rank = hot_rank(a.score, a.published)" ,
)
. await ;
process_ranks_in_batches (
& mut conn ,
"community_aggregates" ,
"a.hot_rank != 0" ,
"SET hot_rank = hot_rank(a.subscribers, a.published)" ,
)
. await ;
info ! ( "Finished hot ranks update!" ) ;
}
Err ( e ) = > {
error ! ( "Failed to get connection from pool: {e}" ) ;
}
}
}
#[ derive(QueryableByName) ]
@ -183,8 +173,8 @@ struct HotRanksUpdateResult {
/// In `where_clause` and `set_clause`, "a" will refer to the current aggregates table.
/// Locked rows are skipped in order to prevent deadlocks (they will likely get updated on the next
/// run)
fn process_ranks_in_batches (
conn : & mut PgConnection,
async fn process_ranks_in_batches (
conn : & mut Async PgConnection,
table_name : & str ,
where_clause : & str ,
set_clause : & str ,
@ -216,7 +206,8 @@ fn process_ranks_in_batches(
) )
. bind ::< Timestamptz , _ > ( previous_batch_last_published )
. bind ::< Integer , _ > ( update_batch_size )
. get_results ::< HotRanksUpdateResult > ( conn ) ;
. get_results ::< HotRanksUpdateResult > ( conn )
. await ;
match result {
Ok ( updated_rows ) = > {
@ -237,7 +228,7 @@ fn process_ranks_in_batches(
/// Post aggregates is a special case, since it needs to join to the community_aggregates
/// table, to get the active monthly user counts.
fn process_post_aggregates_ranks_in_batches ( conn : & mut PgConnection) {
async fn process_post_aggregates_ranks_in_batches ( conn : & mut Async PgConnection) {
let process_start_time : DateTime < Utc > = Utc
. timestamp_opt ( 0 , 0 )
. single ( )
@ -265,7 +256,8 @@ fn process_post_aggregates_ranks_in_batches(conn: &mut PgConnection) {
)
. bind ::< Timestamptz , _ > ( previous_batch_last_published )
. bind ::< Integer , _ > ( update_batch_size )
. get_results ::< HotRanksUpdateResult > ( conn ) ;
. get_results ::< HotRanksUpdateResult > ( conn )
. await ;
match result {
Ok ( updated_rows ) = > {
@ -284,185 +276,244 @@ fn process_post_aggregates_ranks_in_batches(conn: &mut PgConnection) {
) ;
}
fn delete_expired_captcha_answers ( conn : & mut PgConnection ) {
diesel ::delete (
captcha_answer ::table . filter ( captcha_answer ::published . lt ( now ( ) - IntervalDsl ::minutes ( 10 ) ) ) ,
)
. execute ( conn )
. map ( | _ | {
info ! ( "Done." ) ;
} )
. map_err ( | e | error ! ( "Failed to clear old captcha answers: {e}" ) )
. ok ( ) ;
async fn delete_expired_captcha_answers ( pool : & mut DbPool < ' _ > ) {
let conn = get_conn ( pool ) . await ;
match conn {
Ok ( mut conn ) = > {
diesel ::delete (
captcha_answer ::table
. filter ( captcha_answer ::published . lt ( now ( ) - IntervalDsl ::minutes ( 10 ) ) ) ,
)
. execute ( & mut conn )
. await
. map ( | _ | {
info ! ( "Done." ) ;
} )
. map_err ( | e | error ! ( "Failed to clear old captcha answers: {e}" ) )
. ok ( ) ;
}
Err ( e ) = > {
error ! ( "Failed to get connection from pool: {e}" ) ;
}
}
}
/// Clear old activities (this table gets very large)
fn clear_old_activities ( conn : & mut PgConnection ) {
async fn clear_old_activities ( pool: & mut DbPool < ' _ > ) {
info ! ( "Clearing old activities..." ) ;
diesel ::delete ( sent_activity ::table . filter ( sent_activity ::published . lt ( now ( ) - 3. months ( ) ) ) )
. execute ( conn )
. map_err ( | e | error ! ( "Failed to clear old sent activities: {e}" ) )
. ok ( ) ;
diesel ::delete (
received_activity ::table . filter ( received_activity ::published . lt ( now ( ) - 3. months ( ) ) ) ,
)
. execute ( conn )
. map ( | _ | info ! ( "Done." ) )
. map_err ( | e | error ! ( "Failed to clear old received activities: {e}" ) )
. ok ( ) ;
let conn = get_conn ( pool ) . await ;
match conn {
Ok ( mut conn ) = > {
diesel ::delete ( sent_activity ::table . filter ( sent_activity ::published . lt ( now ( ) - 3. months ( ) ) ) )
. execute ( & mut conn )
. await
. map_err ( | e | error ! ( "Failed to clear old sent activities: {e}" ) )
. ok ( ) ;
diesel ::delete (
received_activity ::table . filter ( received_activity ::published . lt ( now ( ) - 3. months ( ) ) ) ,
)
. execute ( & mut conn )
. await
. map ( | _ | info ! ( "Done." ) )
. map_err ( | e | error ! ( "Failed to clear old received activities: {e}" ) )
. ok ( ) ;
}
Err ( e ) = > {
error ! ( "Failed to get connection from pool: {e}" ) ;
}
}
}
/// overwrite posts and comments 30d after deletion
fn overwrite_deleted_posts_and_comments ( conn : & mut PgConnection ) {
async fn overwrite_deleted_posts_and_comments ( pool: & mut DbPool < ' _ > ) {
info ! ( "Overwriting deleted posts..." ) ;
diesel ::update (
post ::table
. filter ( post ::deleted . eq ( true ) )
. filter ( post ::updated . lt ( now ( ) . nullable ( ) - 1. months ( ) ) )
. filter ( post ::body . ne ( DELETED_REPLACEMENT_TEXT ) ) ,
)
. set ( (
post ::body . eq ( DELETED_REPLACEMENT_TEXT ) ,
post ::name . eq ( DELETED_REPLACEMENT_TEXT ) ,
) )
. execute ( conn )
. map ( | _ | {
info ! ( "Done." ) ;
} )
. map_err ( | e | error ! ( "Failed to overwrite deleted posts: {e}" ) )
. ok ( ) ;
info ! ( "Overwriting deleted comments..." ) ;
diesel ::update (
comment ::table
. filter ( comment ::deleted . eq ( true ) )
. filter ( comment ::updated . lt ( now ( ) . nullable ( ) - 1. months ( ) ) )
. filter ( comment ::content . ne ( DELETED_REPLACEMENT_TEXT ) ) ,
)
. set ( comment ::content . eq ( DELETED_REPLACEMENT_TEXT ) )
. execute ( conn )
. map ( | _ | {
info ! ( "Done." ) ;
} )
. map_err ( | e | error ! ( "Failed to overwrite deleted comments: {e}" ) )
. ok ( ) ;
let conn = get_conn ( pool ) . await ;
match conn {
Ok ( mut conn ) = > {
diesel ::update (
post ::table
. filter ( post ::deleted . eq ( true ) )
. filter ( post ::updated . lt ( now ( ) . nullable ( ) - 1. months ( ) ) )
. filter ( post ::body . ne ( DELETED_REPLACEMENT_TEXT ) ) ,
)
. set ( (
post ::body . eq ( DELETED_REPLACEMENT_TEXT ) ,
post ::name . eq ( DELETED_REPLACEMENT_TEXT ) ,
) )
. execute ( & mut conn )
. await
. map ( | _ | {
info ! ( "Done." ) ;
} )
. map_err ( | e | error ! ( "Failed to overwrite deleted posts: {e}" ) )
. ok ( ) ;
info ! ( "Overwriting deleted comments..." ) ;
diesel ::update (
comment ::table
. filter ( comment ::deleted . eq ( true ) )
. filter ( comment ::updated . lt ( now ( ) . nullable ( ) - 1. months ( ) ) )
. filter ( comment ::content . ne ( DELETED_REPLACEMENT_TEXT ) ) ,
)
. set ( comment ::content . eq ( DELETED_REPLACEMENT_TEXT ) )
. execute ( & mut conn )
. await
. map ( | _ | {
info ! ( "Done." ) ;
} )
. map_err ( | e | error ! ( "Failed to overwrite deleted comments: {e}" ) )
. ok ( ) ;
}
Err ( e ) = > {
error ! ( "Failed to get connection from pool: {e}" ) ;
}
}
}
/// Re-calculate the site and community active counts every 12 hours
fn active_counts ( conn : & mut PgConnection ) {
async fn active_counts ( pool : & mut DbPool < ' _ > ) {
info ! ( "Updating active site and community aggregates ..." ) ;
let intervals = vec! [
( "1 day" , "day" ) ,
( "1 week" , "week" ) ,
( "1 month" , "month" ) ,
( "6 months" , "half_year" ) ,
] ;
let conn = get_conn ( pool ) . await ;
for i in & intervals {
let update_site_stmt = format! (
match conn {
Ok ( mut conn ) = > {
let intervals = vec! [
( "1 day" , "day" ) ,
( "1 week" , "week" ) ,
( "1 month" , "month" ) ,
( "6 months" , "half_year" ) ,
] ;
for i in & intervals {
let update_site_stmt = format! (
"update site_aggregates set users_active_{} = (select * from site_aggregates_activity('{}')) where site_id = 1" ,
i . 1 , i . 0
) ;
sql_query ( update_site_stmt )
. execute ( conn )
. map_err ( | e | error ! ( "Failed to update site stats: {e}" ) )
. ok ( ) ;
sql_query ( update_site_stmt )
. execute ( & mut conn )
. await
. map_err ( | e | error ! ( "Failed to update site stats: {e}" ) )
. ok ( ) ;
let update_community_stmt = format! ( "update community_aggregates ca set users_active_{} = mv.count_ from community_aggregates_activity('{}') mv where ca.community_id = mv.community_id_" , i . 1 , i . 0 ) ;
sql_query ( update_community_stmt )
. execute ( conn )
. map_err ( | e | error ! ( "Failed to update community stats: {e}" ) )
. ok ( ) ;
}
let update_community_stmt = format! ( "update community_aggregates ca set users_active_{} = mv.count_ from community_aggregates_activity('{}') mv where ca.community_id = mv.community_id_" , i . 1 , i . 0 ) ;
sql_query ( update_community_stmt )
. execute ( & mut conn )
. await
. map_err ( | e | error ! ( "Failed to update community stats: {e}" ) )
. ok ( ) ;
}
info ! ( "Done." ) ;
info ! ( "Done." ) ;
}
Err ( e ) = > {
error ! ( "Failed to get connection from pool: {e}" ) ;
}
}
}
/// Set banned to false after ban expires
fn update_banned_when_expired ( conn : & mut PgConnection ) {
async fn update_banned_when_expired ( pool: & mut DbPool < ' _ > ) {
info ! ( "Updating banned column if it expires ..." ) ;
let conn = get_conn ( pool ) . await ;
match conn {
Ok ( mut conn ) = > {
diesel ::update (
person ::table
. filter ( person ::banned . eq ( true ) )
. filter ( person ::ban_expires . lt ( now ( ) . nullable ( ) ) ) ,
)
. set ( person ::banned . eq ( false ) )
. execute ( & mut conn )
. await
. map_err ( | e | error ! ( "Failed to update person.banned when expires: {e}" ) )
. ok ( ) ;
diesel ::update (
person ::table
. filter ( person ::banned . eq ( true ) )
. filter ( person ::ban_expires . lt ( now ( ) . nullable ( ) ) ) ,
)
. set ( person ::banned . eq ( false ) )
. execute ( conn )
. map_err ( | e | error ! ( "Failed to update person.banned when expires: {e}" ) )
. ok ( ) ;
diesel ::delete (
community_person_ban ::table . filter ( community_person_ban ::expires . lt ( now ( ) . nullable ( ) ) ) ,
)
. execute ( conn )
. map_err ( | e | error ! ( "Failed to remove community_ban expired rows: {e}" ) )
. ok ( ) ;
diesel ::delete (
community_person_ban ::table . filter ( community_person_ban ::expires . lt ( now ( ) . nullable ( ) ) ) ,
)
. execute ( & mut conn )
. await
. map_err ( | e | error ! ( "Failed to remove community_ban expired rows: {e}" ) )
. ok ( ) ;
}
Err ( e ) = > {
error ! ( "Failed to get connection from pool: {e}" ) ;
}
}
}
/// Updates the instance software and version
///
/// TODO: this should be async
/// TODO: if instance has been dead for a long time, it should be checked less frequently
fn update_instance_software ( conn : & mut PgConnection , user_agent : & str ) -> LemmyResult < ( ) > {
async fn update_instance_software (
pool : & mut DbPool < ' _ > ,
client : & ClientWithMiddleware ,
) -> LemmyResult < ( ) > {
info ! ( "Updating instances software and versions..." ) ;
let client = Client ::builder ( )
. user_agent ( user_agent )
. timeout ( REQWEST_TIMEOUT )
. connect_timeout ( REQWEST_TIMEOUT )
. build ( ) ? ;
let instances = instance ::table . get_results ::< Instance > ( conn ) ? ;
for instance in instances {
let node_info_url = format! ( "https://{}/nodeinfo/2.0.json" , instance . domain ) ;
// The `updated` column is used to check if instances are alive. If it is more than three days
// in the past, no outgoing activities will be sent to that instance. However not every
// Fediverse instance has a valid Nodeinfo endpoint (its not required for Activitypub). That's
// why we always need to mark instances as updated if they are alive.
let default_form = InstanceForm ::builder ( )
. domain ( instance . domain . clone ( ) )
. updated ( Some ( naive_now ( ) ) )
. build ( ) ;
let form = match client . get ( & node_info_url ) . send ( ) {
Ok ( res ) if res . status ( ) . is_client_error ( ) = > {
// Instance doesnt have nodeinfo but sent a response, consider it alive
Some ( default_form )
}
Ok ( res ) = > match res . json ::< NodeInfo > ( ) {
Ok ( node_info ) = > {
// Instance sent valid nodeinfo, write it to db
let software = node_info . software . as_ref ( ) ;
Some (
InstanceForm ::builder ( )
. domain ( instance . domain )
. updated ( Some ( naive_now ( ) ) )
. software ( software . and_then ( | s | s . name . clone ( ) ) )
. version ( software . and_then ( | s | s . version . clone ( ) ) )
. build ( ) ,
)
let conn = get_conn ( pool ) . await ;
match conn {
Ok ( mut conn ) = > {
let instances = instance ::table . get_results ::< Instance > ( & mut conn ) . await ? ;
for instance in instances {
let node_info_url = format! ( "https://{}/nodeinfo/2.0.json" , instance . domain ) ;
// The `updated` column is used to check if instances are alive. If it is more than three days
// in the past, no outgoing activities will be sent to that instance. However not every
// Fediverse instance has a valid Nodeinfo endpoint (its not required for Activitypub). That's
// why we always need to mark instances as updated if they are alive.
let default_form = InstanceForm ::builder ( )
. domain ( instance . domain . clone ( ) )
. updated ( Some ( naive_now ( ) ) )
. build ( ) ;
let form = match client . get ( & node_info_url ) . send ( ) . await {
Ok ( res ) if res . status ( ) . is_client_error ( ) = > {
// Instance doesnt have nodeinfo but sent a response, consider it alive
Some ( default_form )
}
Ok ( res ) = > match res . json ::< NodeInfo > ( ) . await {
Ok ( node_info ) = > {
// Instance sent valid nodeinfo, write it to db
let software = node_info . software . as_ref ( ) ;
Some (
InstanceForm ::builder ( )
. domain ( instance . domain )
. updated ( Some ( naive_now ( ) ) )
. software ( software . and_then ( | s | s . name . clone ( ) ) )
. version ( software . and_then ( | s | s . version . clone ( ) ) )
. build ( ) ,
)
}
Err ( _ ) = > {
// No valid nodeinfo but valid HTTP response, consider instance alive
Some ( default_form )
}
} ,
Err ( _ ) = > {
// dead instance, do nothing
None
}
} ;
if let Some ( form ) = form {
diesel ::update ( instance ::table . find ( instance . id ) )
. set ( form )
. execute ( & mut conn )
. await ? ;
}
Err ( _ ) = > {
// No valid nodeinfo but valid HTTP response, consider instance alive
Some ( default_form )
}
} ,
Err ( _ ) = > {
// dead instance, do nothing
None
}
} ;
if let Some ( form ) = form {
diesel ::update ( instance ::table . find ( instance . id ) )
. set ( form )
. execute ( conn ) ? ;
info ! ( "Finished updating instances software and versions..." ) ;
}
Err ( e ) = > {
error ! ( "Failed to get connection from pool: {e}" ) ;
}
}
info ! ( "Finished updating instances software and versions..." ) ;
Ok ( ( ) )
}