diff --git a/Cargo.lock b/Cargo.lock index 5589c610c..8c782c6fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2644,6 +2644,19 @@ dependencies = [ "uuid", ] +[[package]] +name = "lemmy_db_perf" +version = "0.19.0" +dependencies = [ + "clap", + "diesel", + "diesel-async", + "lemmy_db_schema", + "lemmy_db_views", + "lemmy_utils", + "tokio", +] + [[package]] name = "lemmy_db_schema" version = "0.19.0" diff --git a/Cargo.toml b/Cargo.toml index 75d2852c6..6985db965 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,7 @@ members = [ "crates/api_common", "crates/apub", "crates/utils", + "crates/db_perf", "crates/db_schema", "crates/db_views", "crates/db_views_actor", @@ -155,6 +156,7 @@ tokio-postgres = "0.7.10" tokio-postgres-rustls = "0.10.0" enum-map = "2.7" moka = { version = "0.12.1", features = ["future"] } +clap = { version = "4.4.11", features = ["derive"] } [dependencies] lemmy_api = { workspace = true } @@ -191,5 +193,5 @@ futures-util = { workspace = true } chrono = { workspace = true } prometheus = { version = "0.13.3", features = ["process"] } serial_test = { workspace = true } -clap = { version = "4.4.11", features = ["derive"] } +clap = { workspace = true } actix-web-prom = "0.7.0" diff --git a/crates/db_perf/Cargo.toml b/crates/db_perf/Cargo.toml new file mode 100644 index 000000000..7aa1566bf --- /dev/null +++ b/crates/db_perf/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "lemmy_db_perf" +version.workspace = true +edition.workspace = true +description.workspace = true +license.workspace = true +homepage.workspace = true +documentation.workspace = true +repository.workspace = true + + +[lints] +workspace = true + +[dependencies] +clap = { workspace = true } +diesel = { workspace = true } +diesel-async = { workspace = true } +lemmy_db_schema = { workspace = true } +lemmy_db_views = { workspace = true, features = ["full"] } +lemmy_utils = { workspace = true } +tokio = { workspace = true } diff --git a/crates/db_perf/src/main.rs b/crates/db_perf/src/main.rs new file mode 100644 index 000000000..5ec4e843c --- /dev/null +++ b/crates/db_perf/src/main.rs @@ -0,0 +1,156 @@ +use clap::{Parser, Subcommand}; +use diesel::{dsl, sql_query, sql_types, ExpressionMethods, IntoSql}; +use diesel_async::RunQueryDsl; +use lemmy_db_schema::{ + schema::post, + source::{ + community::{Community, CommunityInsertForm}, + instance::Instance, + person::{Person, PersonInsertForm}, + }, + traits::Crud, + utils::{ + build_db_pool, + get_conn, + series::{self, ValuesFromSeries}, + }, + SortType, +}; +use lemmy_db_views::{ + post_view::{PaginationCursorData, PostQuery}, + structs::PaginationCursor, +}; +use lemmy_utils::error::LemmyResult; +use std::num::NonZeroU32; + +#[derive(Parser, Debug)] +struct CmdArgs { + #[arg(long, default_value_t = 3.try_into().unwrap())] + communities: NonZeroU32, + #[arg(long, default_value_t = 3.try_into().unwrap())] + people: NonZeroU32, + #[arg(long, default_value_t = 100000.try_into().unwrap())] + posts: NonZeroU32, + #[arg(long)] + read_posts: bool, +} + +#[tokio::main] +async fn main() -> LemmyResult<()> { + let args = CmdArgs::parse(); + let pool = &build_db_pool().await?; + let pool = &mut pool.into(); + + let instance = Instance::read_or_create(pool, "reddit.com".to_owned()).await?; + + println!("🫃 creating {} people", args.people); + let mut person_ids = vec![]; + for i in 0..args.people.get() { + person_ids.push( + Person::create( + pool, + &PersonInsertForm::builder() + .name(format!("p{i}")) + .public_key("pubkey".to_owned()) + .instance_id(instance.id) + .build(), + ) + .await? + .id, + ); + } + + println!("🏠 creating {} communities", args.communities); + let mut community_ids = vec![]; + for i in 0..args.communities.get() { + community_ids.push( + Community::create( + pool, + &CommunityInsertForm::builder() + .name(format!("c{i}")) + .title(i.to_string()) + .instance_id(instance.id) + .build(), + ) + .await? + .id, + ); + } + + let post_batches = args.people.get() * args.communities.get(); + let posts_per_batch = args.posts.get() / post_batches; + let num_posts = post_batches * posts_per_batch; + println!( + "📢 creating {} posts ({} featured in community)", + num_posts, post_batches + ); + let mut num_inserted_posts = 0; + // TODO: progress bar + for person_id in &person_ids { + for community_id in &community_ids { + let n = dsl::insert_into(post::table) + .values(ValuesFromSeries { + start: 1, + stop: posts_per_batch.into(), + selection: ( + "AAAAAAAAAAA".into_sql::(), + person_id.into_sql::(), + community_id.into_sql::(), + series::current_value.eq(1), + ), + }) + .into_columns(( + post::name, + post::creator_id, + post::community_id, + post::featured_community, + )) + .execute(&mut get_conn(pool).await?) + .await?; + num_inserted_posts += n; + } + } + + // Lie detector for the println above + assert_eq!(num_inserted_posts, num_posts as usize); + + // Enable auto_explain + let conn = &mut get_conn(pool).await?; + sql_query("SET auto_explain.log_min_duration = 0") + .execute(conn) + .await?; + let pool = &mut conn.into(); + + if args.read_posts { + let mut page_after = None; + for page_num in 1..=2 { + println!( + "👀 getting page {page_num} of posts (pagination cursor used: {})", + page_after.is_some() + ); + // TODO: include local_user + let post_views = PostQuery { + community_id: community_ids.get(0).cloned(), + sort: Some(SortType::New), + limit: Some(20), + page_after, + ..Default::default() + } + .list(pool) + .await?; + if let Some(post_view) = post_views.into_iter().last() { + println!("👀 getting pagination cursor data for next page "); + let cursor_data = PaginationCursor::after_post(&post_view).read(pool).await?; + page_after = Some(cursor_data); + } else { + break; + } + } + } + + if let Ok(path) = std::env::var("PGDATA") { + println!("🪵 query plans written in {path}/log"); + } + + Ok(()) +} diff --git a/crates/db_schema/src/impls/post.rs b/crates/db_schema/src/impls/post.rs index 4f2f88cb2..cb76dc334 100644 --- a/crates/db_schema/src/impls/post.rs +++ b/crates/db_schema/src/impls/post.rs @@ -41,7 +41,14 @@ use crate::{ }; use ::url::Url; use chrono::{Duration, Utc}; -use diesel::{dsl::insert_into, result::Error, ExpressionMethods, QueryDsl, TextExpressionMethods}; +use diesel::{ + dsl::insert_into, + result::Error, + ExpressionMethods, + Insertable, + QueryDsl, + TextExpressionMethods, +}; use diesel_async::RunQueryDsl; use std::collections::HashSet; diff --git a/crates/db_schema/src/utils.rs b/crates/db_schema/src/utils.rs index 55667f5da..67e005273 100644 --- a/crates/db_schema/src/utils.rs +++ b/crates/db_schema/src/utils.rs @@ -1,3 +1,5 @@ +pub mod series; + use crate::{ diesel::Connection, diesel_migrations::MigrationHarness, diff --git a/crates/db_schema/src/utils/series.rs b/crates/db_schema/src/utils/series.rs new file mode 100644 index 000000000..c8f6831e6 --- /dev/null +++ b/crates/db_schema/src/utils/series.rs @@ -0,0 +1,81 @@ +use diesel::{ + dsl, + expression::{is_aggregate, ValidGrouping}, + pg::Pg, + query_builder::{AsQuery, AstPass, QueryFragment}, + result::Error, + sql_types, + AppearsOnTable, + Expression, + Insertable, + SelectableExpression, +}; + +#[derive(QueryId)] +pub struct ValuesFromSeries { + pub start: i64, + pub stop: i64, + pub selection: S, +} + +impl> QueryFragment for ValuesFromSeries { + fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> Result<(), Error> { + self.selection.walk_ast(out.reborrow())?; + out.push_sql(" FROM generate_series("); + out.push_bind_param::(&self.start)?; + out.push_sql(", "); + out.push_bind_param::(&self.stop)?; + out.push_sql(")"); + + Ok(()) + } +} + +impl Expression for ValuesFromSeries { + type SqlType = S::SqlType; +} + +impl> AppearsOnTable for ValuesFromSeries {} + +impl> SelectableExpression for ValuesFromSeries {} + +impl> Insertable for ValuesFromSeries +where + dsl::BareSelect: AsQuery + Insertable, +{ + type Values = as Insertable>::Values; + + fn values(self) -> Self::Values { + dsl::select(self).values() + } +} + +impl> ValidGrouping<()> + for ValuesFromSeries +{ + type IsAggregate = is_aggregate::No; +} + +#[allow(non_camel_case_types)] +#[derive(QueryId, Clone, Copy, Debug)] +pub struct current_value; + +impl QueryFragment for current_value { + fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> Result<(), Error> { + out.push_identifier("generate_series")?; + + Ok(()) + } +} + +impl Expression for current_value { + type SqlType = sql_types::BigInt; +} + +impl AppearsOnTable for current_value {} + +impl SelectableExpression for current_value {} + +impl ValidGrouping<()> for current_value { + type IsAggregate = is_aggregate::No; +} diff --git a/migrations/2023-12-19-210053_tolerable-batch-insert-speed/down.sql b/migrations/2023-12-19-210053_tolerable-batch-insert-speed/down.sql new file mode 100644 index 000000000..9c1ae769e --- /dev/null +++ b/migrations/2023-12-19-210053_tolerable-batch-insert-speed/down.sql @@ -0,0 +1,42 @@ +CREATE OR REPLACE FUNCTION post_aggregates_post () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + IF (TG_OP = 'INSERT') THEN + INSERT INTO post_aggregates (post_id, published, newest_comment_time, newest_comment_time_necro, community_id, creator_id, instance_id) + SELECT + NEW.id, + NEW.published, + NEW.published, + NEW.published, + NEW.community_id, + NEW.creator_id, + community.instance_id + FROM + community + WHERE + NEW.community_id = community.id; + ELSIF (TG_OP = 'DELETE') THEN + DELETE FROM post_aggregates + WHERE post_id = OLD.id; + END IF; + RETURN NULL; +END +$$; + +CREATE OR REPLACE TRIGGER post_aggregates_post + AFTER INSERT OR DELETE ON post + FOR EACH ROW + EXECUTE PROCEDURE post_aggregates_post (); + +CREATE OR REPLACE FUNCTION generate_unique_changeme () + RETURNS text + LANGUAGE sql + AS $$ + SELECT + 'http://changeme.invalid/' || substr(md5(random()::text), 0, 25); +$$; + +DROP SEQUENCE IF EXISTS changeme_seq; + diff --git a/migrations/2023-12-19-210053_tolerable-batch-insert-speed/up.sql b/migrations/2023-12-19-210053_tolerable-batch-insert-speed/up.sql new file mode 100644 index 000000000..0157e4bb7 --- /dev/null +++ b/migrations/2023-12-19-210053_tolerable-batch-insert-speed/up.sql @@ -0,0 +1,41 @@ +-- Change post_aggregates trigger to run once per statement instead of once per row. +-- The trigger doesn't need to handle deletion because the post_id column has ON DELETE CASCADE. + +CREATE OR REPLACE FUNCTION post_aggregates_post () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + INSERT INTO post_aggregates (post_id, published, newest_comment_time, newest_comment_time_necro, community_id, creator_id, instance_id) + SELECT + new_post.id, + new_post.published, + new_post.published, + new_post.published, + new_post.community_id, + new_post.creator_id, + (SELECT community.instance_id FROM community WHERE community.id = new_post.community_id LIMIT 1) + FROM + new_post; + RETURN NULL; +END +$$; + +CREATE OR REPLACE TRIGGER post_aggregates_post + AFTER INSERT ON post + REFERENCING NEW TABLE AS new_post + FOR EACH STATEMENT + EXECUTE PROCEDURE post_aggregates_post (); + +-- Avoid running hash function and random number generation for default ap_id + +CREATE SEQUENCE IF NOT EXISTS changeme_seq AS bigint CYCLE; + +CREATE OR REPLACE FUNCTION generate_unique_changeme () + RETURNS text + LANGUAGE sql + AS $$ + SELECT + 'http://changeme.invalid/seq/' || nextval('changeme_seq')::text; +$$; + diff --git a/scripts/db_perf.sh b/scripts/db_perf.sh new file mode 100755 index 000000000..9d06d5175 --- /dev/null +++ b/scripts/db_perf.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env bash +set -e + +CWD="$(cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd -P)" + +cd $CWD/../ + +source scripts/start_dev_db.sh + +export LEMMY_CONFIG_LOCATION=config/config.hjson +export RUST_BACKTRACE=1 + +cargo run --package lemmy_db_perf -- "$@" + +pg_ctl stop --silent + +# $PGDATA directory is kept so log can be seen diff --git a/scripts/start_dev_db.sh b/scripts/start_dev_db.sh index 31d3e0d3f..839b68b5e 100644 --- a/scripts/start_dev_db.sh +++ b/scripts/start_dev_db.sh @@ -11,7 +11,7 @@ then (pg_ctl status > /dev/null) || pg_status_exit_code=$? if [[ ${pg_status_exit_code} -ne 3 ]] then - pg_ctl stop + pg_ctl stop --silent fi rm -rf $PGDATA @@ -25,27 +25,23 @@ config_args=( # Write logs to a file in $PGDATA/log -c logging_collector=on - # Log all query plans by default + # Allow auto_explain to be turned on -c session_preload_libraries=auto_explain - -c auto_explain.log_min_duration=0 + #-c auto_explain.log_min_duration=0 # Include actual row amounts and run times for query plan nodes -c auto_explain.log_analyze=on - # Avoid sequential scans so query plans show what index scans can be done - # (index scan is normally avoided in some cases, such as the table being small enough) - -c enable_seqscan=off - # Don't log parameter values -c auto_explain.log_parameter_max_length=0 ) # Create cluster -initdb --username=postgres --auth=trust --no-instructions +pg_ctl init --silent --options="--username=postgres --auth=trust --no-instructions" # Start server that only listens to socket in current directory -pg_ctl start --options="${config_args[*]}" +pg_ctl start --silent --options="${config_args[*]}" # Setup database -psql -c "CREATE USER lemmy WITH PASSWORD 'password' SUPERUSER;" -U postgres -psql -c "CREATE DATABASE lemmy WITH OWNER lemmy;" -U postgres +psql --quiet -c "CREATE USER lemmy WITH PASSWORD 'password' SUPERUSER;" -U postgres +psql --quiet -c "CREATE DATABASE lemmy WITH OWNER lemmy;" -U postgres diff --git a/scripts/test.sh b/scripts/test.sh index 7a8bd1c29..efe9b1513 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -27,4 +27,5 @@ cargo test -p lemmy_utils --all-features --no-fail-fast # Add this to do printlns: -- --nocapture -pg_ctl stop +pg_ctl stop --silent +rm -rf $PGDATA