Using unpooled connections in scheduler.

This commit is contained in:
Dessalines 2021-08-20 21:28:33 -04:00
parent 1ccae69a47
commit 08a66627db
4 changed files with 22 additions and 20 deletions

View File

@ -13,7 +13,7 @@ popd
yarn yarn
yarn api-test || true yarn api-test || true
killall lemmy_server killall -9 lemmy_server
for INSTANCE in lemmy_alpha lemmy_beta lemmy_gamma lemmy_delta lemmy_epsilon; do for INSTANCE in lemmy_alpha lemmy_beta lemmy_gamma lemmy_delta lemmy_epsilon; do
psql "$LEMMY_DATABASE_URL" -c "DROP DATABASE $INSTANCE" psql "$LEMMY_DATABASE_URL" -c "DROP DATABASE $INSTANCE"

View File

@ -283,8 +283,12 @@ pub fn establish_unpooled_connection() -> PgConnection {
e e
), ),
}; };
establish_unpooled_connection_with_db_url(&db_url)
}
pub fn establish_unpooled_connection_with_db_url(db_url: &str) -> PgConnection {
let conn = let conn =
PgConnection::establish(&db_url).unwrap_or_else(|_| panic!("Error connecting to {}", db_url)); PgConnection::establish(db_url).unwrap_or_else(|_| panic!("Error connecting to {}", db_url));
embedded_migrations::run(&conn).expect("load migrations"); embedded_migrations::run(&conn).expect("load migrations");
conn conn
} }

View File

@ -33,7 +33,7 @@ async fn main() -> Result<(), LemmyError> {
Err(_) => settings.get_database_url(), Err(_) => settings.get_database_url(),
}; };
let manager = Manager::new(db_url); let manager = Manager::new(&db_url);
let pool = Pool::new(manager, settings.database.pool_size); let pool = Pool::new(manager, settings.database.pool_size);
let conn = &pool.get().await?; let conn = &pool.get().await?;
@ -41,13 +41,7 @@ async fn main() -> Result<(), LemmyError> {
embedded_migrations::run(conn)?; embedded_migrations::run(conn)?;
run_advanced_migrations(conn)?; run_advanced_migrations(conn)?;
// TODO can't move the pool into clokwerk, it doesn't yet support async thread::spawn(move || scheduled_tasks::setup(&db_url));
let c1 = pool.get().await?;
let c2 = pool.get().await?;
let c3 = pool.get().await?;
thread::spawn(move || {
scheduled_tasks::setup(c1, c2, c3);
});
// Set up the rate limiter // Set up the rate limiter
let rate_limiter = RateLimit { let rate_limiter = RateLimit {

View File

@ -2,28 +2,32 @@
use clokwerk::{Scheduler, TimeUnits}; use clokwerk::{Scheduler, TimeUnits};
// Import week days and WeekDay // Import week days and WeekDay
use diesel::{sql_query, PgConnection, RunQueryDsl}; use diesel::{sql_query, PgConnection, RunQueryDsl};
use lemmy_db_queries::source::activity::Activity_; use lemmy_db_queries::{establish_unpooled_connection_with_db_url, source::activity::Activity_};
use lemmy_db_schema::source::activity::Activity; use lemmy_db_schema::source::activity::Activity;
use log::info; use log::info;
use std::{thread, time::Duration}; use std::{thread, time::Duration};
type DeadpoolPgConnection = deadpool_diesel::Connection<PgConnection>;
/// Schedules various cleanup tasks for lemmy in a background thread /// Schedules various cleanup tasks for lemmy in a background thread
pub fn setup(c1: DeadpoolPgConnection, c2: DeadpoolPgConnection, c3: DeadpoolPgConnection) { pub fn setup(db_url: &str) {
let mut scheduler = Scheduler::new(); let mut scheduler = Scheduler::new();
active_counts(&c1); let conn = &establish_unpooled_connection_with_db_url(db_url);
reindex_aggregates_tables(&c1);
clear_old_activities(&c1);
active_counts(conn);
reindex_aggregates_tables(conn);
clear_old_activities(conn);
let db_url2 = db_url.to_owned();
scheduler.every(1.hour()).run(move || { scheduler.every(1.hour()).run(move || {
active_counts(&c2); let conn = &establish_unpooled_connection_with_db_url(&db_url2);
reindex_aggregates_tables(&c2); active_counts(conn);
reindex_aggregates_tables(conn);
}); });
let db_url3 = db_url.to_owned();
scheduler.every(1.weeks()).run(move || { scheduler.every(1.weeks()).run(move || {
clear_old_activities(&c3); let conn = &establish_unpooled_connection_with_db_url(&db_url3);
clear_old_activities(conn);
}); });
// Manually run the scheduler in an event loop // Manually run the scheduler in an event loop