Beginning work on adding tokio-diesel. #1684

feature/tokio_diesel
Dessalines 3 years ago
parent b8d7f00d58
commit f6926f8af2

166
Cargo.lock generated

@ -36,7 +36,7 @@ dependencies = [
"actix-rt",
"actix_derive",
"bitflags",
"bytes",
"bytes 1.0.1",
"crossbeam-channel",
"futures-core",
"futures-sink",
@ -45,9 +45,9 @@ dependencies = [
"log",
"once_cell",
"parking_lot",
"pin-project-lite",
"pin-project-lite 0.2.7",
"smallvec",
"tokio",
"tokio 1.8.0",
"tokio-util",
]
@ -58,12 +58,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d5dbeb2d9e51344cb83ca7cc170f1217f9fe25bfc50160e6e200b5c31c1019a"
dependencies = [
"bitflags",
"bytes",
"bytes 1.0.1",
"futures-core",
"futures-sink",
"log",
"pin-project-lite",
"tokio",
"pin-project-lite 0.2.7",
"tokio 1.8.0",
"tokio-util",
]
@ -81,7 +81,7 @@ dependencies = [
"ahash 0.7.4",
"base64 0.13.0",
"bitflags",
"bytes",
"bytes 1.0.1",
"bytestring",
"derive_more",
"encoding_rs",
@ -98,14 +98,14 @@ dependencies = [
"once_cell",
"percent-encoding",
"pin-project",
"pin-project-lite",
"pin-project-lite 0.2.7",
"rand 0.8.4",
"regex",
"serde",
"sha-1 0.9.6",
"smallvec",
"time 0.2.27",
"tokio",
"tokio 1.8.0",
]
[[package]]
@ -139,7 +139,7 @@ checksum = "bc7d7cd957c9ed92288a7c3c96af81fa5291f65247a76a34dac7b6af74e52ba0"
dependencies = [
"actix-macros",
"futures-core",
"tokio",
"tokio 1.8.0",
]
[[package]]
@ -156,7 +156,7 @@ dependencies = [
"mio",
"num_cpus",
"slab",
"tokio",
"tokio 1.8.0",
]
[[package]]
@ -167,7 +167,7 @@ checksum = "77f5f9d66a8730d0fae62c26f3424f5751e5518086628a40b7ab6fca4a705034"
dependencies = [
"futures-core",
"paste",
"pin-project-lite",
"pin-project-lite 0.2.7",
]
[[package]]
@ -196,7 +196,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e491cbaac2e7fc788dfff99ff48ef317e23b3cf63dbaf7aaab6418f40f92aa94"
dependencies = [
"local-waker",
"pin-project-lite",
"pin-project-lite 0.2.7",
]
[[package]]
@ -216,7 +216,7 @@ dependencies = [
"actix-utils",
"actix-web-codegen",
"ahash 0.7.4",
"bytes",
"bytes 1.0.1",
"cfg-if",
"cookie",
"derive_more",
@ -251,11 +251,11 @@ dependencies = [
"actix-codec",
"actix-http",
"actix-web",
"bytes",
"bytes 1.0.1",
"bytestring",
"futures-core",
"pin-project",
"tokio",
"tokio 1.8.0",
]
[[package]]
@ -393,7 +393,7 @@ dependencies = [
"actix-rt",
"actix-service",
"base64 0.13.0",
"bytes",
"bytes 1.0.1",
"cfg-if",
"derive_more",
"futures-core",
@ -401,7 +401,7 @@ dependencies = [
"log",
"mime",
"percent-encoding",
"pin-project-lite",
"pin-project-lite 0.2.7",
"rand 0.8.4",
"serde",
"serde_json",
@ -435,7 +435,7 @@ dependencies = [
"serde",
"serde_json",
"thiserror",
"tokio",
"tokio 1.8.0",
"uuid",
]
@ -454,7 +454,7 @@ dependencies = [
"serde",
"serde_json",
"thiserror",
"tokio",
"tokio 1.8.0",
"uuid",
]
@ -585,6 +585,12 @@ version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38"
[[package]]
name = "bytes"
version = "1.0.1"
@ -597,7 +603,7 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90706ba19e97b90786e19dc0d5e2abd80008d99d4c0c5d1ad0b5e72cec7c494d"
dependencies = [
"bytes",
"bytes 1.0.1",
]
[[package]]
@ -1176,7 +1182,7 @@ dependencies = [
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-project-lite 0.2.7",
"pin-utils",
"proc-macro-hack",
"proc-macro-nested",
@ -1252,7 +1258,7 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "825343c4eef0b63f541f8903f395dc5beb362a979b5799a84062527ef1e37726"
dependencies = [
"bytes",
"bytes 1.0.1",
"fnv",
"futures-core",
"futures-sink",
@ -1260,7 +1266,7 @@ dependencies = [
"http",
"indexmap",
"slab",
"tokio",
"tokio 1.8.0",
"tokio-util",
"tracing",
]
@ -1321,7 +1327,7 @@ version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "527e8c9ac747e28542699a951517aa9a6945af506cd1f2e1b53a576c17b6cc11"
dependencies = [
"bytes",
"bytes 1.0.1",
"fnv",
"itoa",
]
@ -1332,9 +1338,9 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60daa14be0e0786db0f03a9e57cb404c9d756eed2b6c62b9ea98ec5743ec75a9"
dependencies = [
"bytes",
"bytes 1.0.1",
"http",
"pin-project-lite",
"pin-project-lite 0.2.7",
]
[[package]]
@ -1371,7 +1377,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a1e24f3c8b2c8b5eddb82d4cdf07dafe01f5b87f92b81a369dd520a107d33e8"
dependencies = [
"base64 0.13.0",
"bytes",
"bytes 1.0.1",
"chrono",
"futures",
"http",
@ -1379,7 +1385,7 @@ dependencies = [
"reqwest",
"sha2",
"thiserror",
"tokio",
"tokio 1.8.0",
]
[[package]]
@ -1406,7 +1412,7 @@ version = "0.14.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07d6baa1b441335f3ce5098ac421fb6547c46dda735ca1bc6d0153c838f9dd83"
dependencies = [
"bytes",
"bytes 1.0.1",
"futures-channel",
"futures-core",
"futures-util",
@ -1416,9 +1422,9 @@ dependencies = [
"httparse",
"httpdate",
"itoa",
"pin-project-lite",
"pin-project-lite 0.2.7",
"socket2",
"tokio",
"tokio 1.8.0",
"tower-service",
"tracing",
"want",
@ -1430,10 +1436,10 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [
"bytes",
"bytes 1.0.1",
"hyper",
"native-tls",
"tokio",
"tokio 1.8.0",
"tokio-native-tls",
]
@ -1597,7 +1603,7 @@ dependencies = [
"strum",
"strum_macros",
"thiserror",
"tokio",
"tokio 1.8.0",
"url",
"uuid",
]
@ -1660,7 +1666,8 @@ dependencies = [
"strum",
"strum_macros",
"thiserror",
"tokio",
"tokio 1.8.0",
"tokio-diesel 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"url",
"uuid",
]
@ -1708,7 +1715,7 @@ dependencies = [
"strum",
"strum_macros",
"thiserror",
"tokio",
"tokio 1.8.0",
"url",
"uuid",
]
@ -1757,6 +1764,8 @@ dependencies = [
"sha2",
"strum",
"strum_macros",
"tokio 1.8.0",
"tokio-diesel 0.3.0 (git+https://github.com/mehcode/tokio-diesel)",
"url",
]
@ -1867,7 +1876,7 @@ dependencies = [
"serde",
"serde_json",
"strum",
"tokio",
"tokio 1.8.0",
"url",
]
@ -1901,7 +1910,7 @@ dependencies = [
"strum",
"strum_macros",
"thiserror",
"tokio",
"tokio 1.8.0",
"url",
]
@ -1927,7 +1936,7 @@ dependencies = [
"serde_json",
"strum",
"strum_macros",
"tokio",
"tokio 1.8.0",
]
[[package]]
@ -2399,6 +2408,12 @@ dependencies = [
"syn 1.0.73",
]
[[package]]
name = "pin-project-lite"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "257b64915a082f7811703966789728173279bdebb956b143dbcd23f6f970a777"
[[package]]
name = "pin-project-lite"
version = "0.2.7"
@ -2697,7 +2712,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "246e9f61b9bb77df069a947682be06e31ac43ea37862e244a69f177694ea6d22"
dependencies = [
"base64 0.13.0",
"bytes",
"bytes 1.0.1",
"encoding_rs",
"futures-core",
"futures-util",
@ -2712,11 +2727,11 @@ dependencies = [
"mime",
"native-tls",
"percent-encoding",
"pin-project-lite",
"pin-project-lite 0.2.7",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio 1.8.0",
"tokio-native-tls",
"url",
"wasm-bindgen",
@ -3265,6 +3280,17 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "0.2.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6703a273949a90131b290be1fe7b039d0fc884aa1935860dfcbe056f28cd8092"
dependencies = [
"bytes 0.5.6",
"pin-project-lite 0.1.12",
"slab",
]
[[package]]
name = "tokio"
version = "1.8.0"
@ -3272,17 +3298,55 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "570c2eb13b3ab38208130eccd41be92520388791207fde783bda7c1e8ace28d4"
dependencies = [
"autocfg",
"bytes",
"bytes 1.0.1",
"libc",
"memchr",
"mio",
"num_cpus",
"once_cell",
"parking_lot",
"pin-project-lite",
"pin-project-lite 0.2.7",
"signal-hook-registry",
"tokio-macros",
"winapi",
]
[[package]]
name = "tokio-diesel"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "713f06058e12ed5adb542401b9d20f2b18b3fccf6184c5d77b57a31b3d21cc69"
dependencies = [
"async-trait",
"diesel",
"futures",
"r2d2",
"tokio 0.2.25",
]
[[package]]
name = "tokio-diesel"
version = "0.3.0"
source = "git+https://github.com/mehcode/tokio-diesel#f4af42558246ab323600622ba8d08803d3c18842"
dependencies = [
"async-trait",
"diesel",
"futures",
"r2d2",
"tokio 1.8.0",
]
[[package]]
name = "tokio-macros"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54473be61f4ebe4efd09cec9bd5d16fa51d70ea0192213d754d2d500457db110"
dependencies = [
"proc-macro2 1.0.27",
"quote 1.0.9",
"syn 1.0.73",
]
[[package]]
name = "tokio-native-tls"
version = "0.3.0"
@ -3290,7 +3354,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b"
dependencies = [
"native-tls",
"tokio",
"tokio 1.8.0",
]
[[package]]
@ -3300,7 +3364,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6"
dependencies = [
"rustls",
"tokio",
"tokio 1.8.0",
"webpki",
]
@ -3310,12 +3374,12 @@ version = "0.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1caa0b0c8d94a049db56b5acf8cba99dc0623aab1b26d5b5f5e2d945846b3592"
dependencies = [
"bytes",
"bytes 1.0.1",
"futures-core",
"futures-sink",
"log",
"pin-project-lite",
"tokio",
"pin-project-lite 0.2.7",
"tokio 1.8.0",
]
[[package]]
@ -3340,7 +3404,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d"
dependencies = [
"cfg-if",
"pin-project-lite",
"pin-project-lite 0.2.7",
"tracing-core",
]

@ -42,3 +42,4 @@ anyhow = "1.0.41"
thiserror = "1.0.26"
background-jobs = "0.9.0"
reqwest = { version = "0.11.4", features = ["json"] }
tokio-diesel = "0.3.0"

@ -24,6 +24,8 @@ url = { version = "2.2.2", features = ["serde"] }
lazy_static = "1.4.0"
regex = "1.5.4"
bcrypt = "0.10.0"
tokio = { version = "1.8.0", features = ["rt", "macros"] }
tokio-diesel = { git = "https://github.com/mehcode/tokio-diesel" }
[dev-dependencies]
serial_test = "0.5.1"

@ -14,10 +14,10 @@ pub struct CommentAggregates {
}
impl CommentAggregates {
pub fn read(conn: &PgConnection, comment_id: CommentId) -> Result<Self, Error> {
pub fn read(pool: &PgConnection, comment_id: CommentId) -> Result<Self, Error> {
comment_aggregates::table
.filter(comment_aggregates::comment_id.eq(comment_id))
.first::<Self>(conn)
.first::<Self>(pool)
}
}
@ -25,7 +25,7 @@ impl CommentAggregates {
mod tests {
use crate::{
aggregates::comment_aggregates::CommentAggregates,
establish_unpooled_connection,
setup_connection_pool_for_tests,
Crud,
Likeable,
};
@ -40,21 +40,21 @@ mod tests {
#[test]
#[serial]
fn test_crud() {
let conn = establish_unpooled_connection();
let pool = setup_connection_pool_for_tests();
let new_person = PersonForm {
name: "thommy_comment_agg".into(),
..PersonForm::default()
};
let inserted_person = Person::create(&conn, &new_person).unwrap();
let inserted_person = Person::create(&pool, &new_person).unwrap();
let another_person = PersonForm {
name: "jerry_comment_agg".into(),
..PersonForm::default()
};
let another_inserted_person = Person::create(&conn, &another_person).unwrap();
let another_inserted_person = Person::create(&pool, &another_person).unwrap();
let new_community = CommunityForm {
name: "TIL_comment_agg".into(),
@ -62,7 +62,7 @@ mod tests {
..CommunityForm::default()
};
let inserted_community = Community::create(&conn, &new_community).unwrap();
let inserted_community = Community::create(&pool, &new_community).unwrap();
let new_post = PostForm {
name: "A test post".into(),
@ -71,7 +71,7 @@ mod tests {
..PostForm::default()
};
let inserted_post = Post::create(&conn, &new_post).unwrap();
let inserted_post = Post::create(&pool, &new_post).unwrap();
let comment_form = CommentForm {
content: "A test comment".into(),
@ -80,7 +80,7 @@ mod tests {
..CommentForm::default()
};
let inserted_comment = Comment::create(&conn, &comment_form).unwrap();
let inserted_comment = Comment::create(&pool, &comment_form).unwrap();
let child_comment_form = CommentForm {
content: "A test comment".into(),
@ -90,7 +90,7 @@ mod tests {
..CommentForm::default()
};
let _inserted_child_comment = Comment::create(&conn, &child_comment_form).unwrap();
let _inserted_child_comment = Comment::create(&pool, &child_comment_form).unwrap();
let comment_like = CommentLikeForm {
comment_id: inserted_comment.id,
@ -99,9 +99,9 @@ mod tests {
score: 1,
};
CommentLike::like(&conn, &comment_like).unwrap();
CommentLike::like(&pool, &comment_like).unwrap();
let comment_aggs_before_delete = CommentAggregates::read(&conn, inserted_comment.id).unwrap();
let comment_aggs_before_delete = CommentAggregates::read(&pool, inserted_comment.id).unwrap();
assert_eq!(1, comment_aggs_before_delete.score);
assert_eq!(1, comment_aggs_before_delete.upvotes);
@ -115,35 +115,35 @@ mod tests {
score: -1,
};
CommentLike::like(&conn, &comment_dislike).unwrap();
CommentLike::like(&pool, &comment_dislike).unwrap();
let comment_aggs_after_dislike = CommentAggregates::read(&conn, inserted_comment.id).unwrap();
let comment_aggs_after_dislike = CommentAggregates::read(&pool, inserted_comment.id).unwrap();
assert_eq!(0, comment_aggs_after_dislike.score);
assert_eq!(1, comment_aggs_after_dislike.upvotes);
assert_eq!(1, comment_aggs_after_dislike.downvotes);
// Remove the first comment like
CommentLike::remove(&conn, inserted_person.id, inserted_comment.id).unwrap();
let after_like_remove = CommentAggregates::read(&conn, inserted_comment.id).unwrap();
CommentLike::remove(&pool, inserted_person.id, inserted_comment.id).unwrap();
let after_like_remove = CommentAggregates::read(&pool, inserted_comment.id).unwrap();
assert_eq!(-1, after_like_remove.score);
assert_eq!(0, after_like_remove.upvotes);
assert_eq!(1, after_like_remove.downvotes);
// Remove the parent post
Post::delete(&conn, inserted_post.id).unwrap();
Post::delete(&pool, inserted_post.id).unwrap();
// Should be none found, since the post was deleted
let after_delete = CommentAggregates::read(&conn, inserted_comment.id);
let after_delete = CommentAggregates::read(&pool, inserted_comment.id);
assert!(after_delete.is_err());
// This should delete all the associated rows, and fire triggers
Person::delete(&conn, another_inserted_person.id).unwrap();
let person_num_deleted = Person::delete(&conn, inserted_person.id).unwrap();
Person::delete(&pool, another_inserted_person.id).unwrap();
let person_num_deleted = Person::delete(&pool, inserted_person.id).unwrap();
assert_eq!(1, person_num_deleted);
// Delete the community
let community_num_deleted = Community::delete(&conn, inserted_community.id).unwrap();
let community_num_deleted = Community::delete(&pool, inserted_community.id).unwrap();
assert_eq!(1, community_num_deleted);
}
}

@ -12,30 +12,34 @@ extern crate diesel_migrations;
#[cfg(test)]
extern crate serial_test;
use diesel::{result::Error, *};
use diesel::{*, r2d2::{ConnectionManager, Pool}, result::Error};
use lemmy_db_schema::{CommunityId, DbUrl, PersonId};
use lemmy_utils::ApiError;
use lemmy_utils::{ApiError, settings::structs::Settings};
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::{env, env::VarError};
use url::Url;
use core::pin::Pin;
use core::future::Future;
use tokio_diesel::*;
pub mod aggregates;
pub mod source;
pub type DbPool = diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<diesel::PgConnection>>;
pub type TokioDieselFuture<'a, T>= Pin<Box<dyn Future<Output = Result<T, AsyncError>> + Send + 'a>>;
pub trait Crud<Form, IdType> {
fn create(conn: &PgConnection, form: &Form) -> Result<Self, Error>
pub trait Crud<'a, Form, IdType> {
fn create(pool: &'a DbPool, form: &'a Form) -> TokioDieselFuture<'a, Self>
where
Self: Sized;
fn read(conn: &PgConnection, id: IdType) -> Result<Self, Error>
fn read(pool: &'a DbPool, id: IdType) -> TokioDieselFuture<'a, Self>
where
Self: Sized;
fn update(conn: &PgConnection, id: IdType, form: &Form) -> Result<Self, Error>
fn update(pool: &'a DbPool, id: IdType, form: &'a Form) -> TokioDieselFuture<'a, Self>
where
Self: Sized;
fn delete(_conn: &PgConnection, _id: IdType) -> Result<usize, Error>
fn delete(_pool: &'a DbPool, _id: IdType) -> TokioDieselFuture<'a, usize>
where
Self: Sized,
{
@ -43,76 +47,76 @@ pub trait Crud<Form, IdType> {
}
}
pub trait Followable<Form> {
fn follow(conn: &PgConnection, form: &Form) -> Result<Self, Error>
pub trait Followable<'a, Form> {
fn follow(pool: &'a DbPool, form: &Form) -> TokioDieselFuture<'a, Self>
where
Self: Sized;
fn follow_accepted(
conn: &PgConnection,
pool: &'a DbPool,
community_id: CommunityId,
person_id: PersonId,
) -> Result<Self, Error>
) -> TokioDieselFuture<'a, Self>
where
Self: Sized;
fn unfollow(conn: &PgConnection, form: &Form) -> Result<usize, Error>
fn unfollow(pool: &'a DbPool, form: &Form) -> TokioDieselFuture<'a, usize>
where
Self: Sized;
fn has_local_followers(conn: &PgConnection, community_id: CommunityId) -> Result<bool, Error>;
fn has_local_followers(pool: &'a DbPool, community_id: CommunityId) -> Result<bool, Error>;
}
pub trait Joinable<Form> {
fn join(conn: &PgConnection, form: &Form) -> Result<Self, Error>
pub trait Joinable<'a, Form> {
fn join(pool: &'a DbPool, form: &Form) -> TokioDieselFuture<'a, Self>
where
Self: Sized;
fn leave(conn: &PgConnection, form: &Form) -> Result<usize, Error>
fn leave(pool: &'a DbPool, form: &Form) -> TokioDieselFuture<'a, usize>
where
Self: Sized;
}
pub trait Likeable<Form, IdType> {
fn like(conn: &PgConnection, form: &Form) -> Result<Self, Error>
pub trait Likeable<'a, Form, IdType> {
fn like(pool: &'a DbPool, form: &'a Form) -> TokioDieselFuture<'a, Self>
where
Self: Sized;
fn remove(conn: &PgConnection, person_id: PersonId, item_id: IdType) -> Result<usize, Error>
fn remove(pool: &'a DbPool, person_id: PersonId, item_id: IdType) -> TokioDieselFuture<'a, usize>
where
Self: Sized;
}
pub trait Bannable<Form> {
fn ban(conn: &PgConnection, form: &Form) -> Result<Self, Error>
pub trait Bannable<'a, Form> {
fn ban(pool: &'a DbPool, form: &'a Form) -> TokioDieselFuture<'a, Self>
where
Self: Sized;
fn unban(conn: &PgConnection, form: &Form) -> Result<usize, Error>
fn unban(pool: &'a DbPool, form: &'a Form) -> TokioDieselFuture<'a, usize>
where
Self: Sized;
}
pub trait Saveable<Form> {
fn save(conn: &PgConnection, form: &Form) -> Result<Self, Error>
pub trait Saveable<'a, Form> {
fn save(pool: &'a DbPool, form: &'a Form) -> TokioDieselFuture<'a, Self>
where
Self: Sized;
fn unsave(conn: &PgConnection, form: &Form) -> Result<usize, Error>
fn unsave(pool: &'a DbPool, form: &'a Form) -> TokioDieselFuture<'a, usize>
where
Self: Sized;
}
pub trait Readable<Form> {
fn mark_as_read(conn: &PgConnection, form: &Form) -> Result<Self, Error>
pub trait Readable<'a, Form> {
fn mark_as_read(pool: &'a DbPool, form: &'a Form) -> TokioDieselFuture<'a, Self>
where
Self: Sized;
fn mark_as_unread(conn: &PgConnection, form: &Form) -> Result<usize, Error>
fn mark_as_unread(pool: &'a DbPool, form: &'a Form) -> TokioDieselFuture<'a, usize>
where
Self: Sized;
}
pub trait Reportable<Form> {
fn report(conn: &PgConnection, form: &Form) -> Result<Self, Error>
pub trait Reportable<'a, Form> {
fn report(pool: &'a DbPool, form: &'a Form) -> TokioDieselFuture<'a, Self>
where
Self: Sized;
fn resolve(conn: &PgConnection, report_id: i32, resolver_id: PersonId) -> Result<usize, Error>
fn resolve(pool: &'a DbPool, report_id: i32, resolver_id: PersonId) -> TokioDieselFuture<'a, usize>
where
Self: Sized;
fn unresolve(conn: &PgConnection, report_id: i32, resolver_id: PersonId) -> Result<usize, Error>
fn unresolve(pool: &'a DbPool, report_id: i32, resolver_id: PersonId) -> TokioDieselFuture<'a, usize>
where
Self: Sized;
}
@ -121,11 +125,11 @@ pub trait DeleteableOrRemoveable {
fn blank_out_deleted_or_removed_info(self) -> Self;
}
pub trait ApubObject<Form> {
fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result<Self, Error>
pub trait ApubObject<'a, Form> {
fn read_from_apub_id(pool: &'a DbPool, object_id: &'a DbUrl) -> TokioDieselFuture<'a, Self>
where
Self: Sized;
fn upsert(conn: &PgConnection, user_form: &Form) -> Result<Self, Error>
fn upsert(pool: &'a DbPool, user_form: &'a Form) -> TokioDieselFuture<'a, Self>
where
Self: Sized;
}
@ -249,7 +253,17 @@ pub fn diesel_option_overwrite_to_url(
embed_migrations!();
pub fn establish_unpooled_connection() -> PgConnection {
/// Set up the r2d2 connection pool
pub fn setup_connection_pool() -> DbPool {
let db_url = match get_database_url_from_env() {
Ok(url) => url,
Err(_) => Settings::get().get_database_url(),
};
build_connection_pool(&db_url, Settings::get().database().pool_size())
}
/// Set up the r2d2 connection pool for tests
pub fn setup_connection_pool_for_tests() -> DbPool {
let db_url = match get_database_url_from_env() {
Ok(url) => url,
Err(e) => panic!(
@ -257,10 +271,21 @@ pub fn establish_unpooled_connection() -> PgConnection {
e
),
};
let conn =
PgConnection::establish(&db_url).unwrap_or_else(|_| panic!("Error connecting to {}", db_url));
build_connection_pool(&db_url, 10)
}
fn build_connection_pool(db_url: &str, pool_size: u32) -> DbPool {
let manager = ConnectionManager::<PgConnection>::new(db_url);
let pool = Pool::builder()
.max_size(pool_size)
.build(manager)
.unwrap_or_else(|_| panic!("Error connecting to {}", db_url));
let conn = pool.get().expect("Missing connection in pool");
// Run the migrations
embedded_migrations::run(&conn).expect("load migrations");
conn
pool
}
lazy_static! {

@ -1,4 +1,5 @@
use crate::{ApubObject, Crud, DeleteableOrRemoveable, Likeable, Readable, Saveable};
use crate::{ApubObject, Crud, DbPool, TokioDieselFuture, DeleteableOrRemoveable, Likeable, Readable, Saveable};
use tokio_diesel::*;
use diesel::{dsl::*, result::Error, *};
use lemmy_db_schema::{
naive_now,
@ -18,86 +19,85 @@ use lemmy_db_schema::{
PostId,
};
impl Crud<PostForm, PostId> for Post {
fn read(conn: &PgConnection, post_id: PostId) -> Result<Self, Error> {
impl<'a> Crud<'a, PostForm, PostId> for Post {
fn read(pool: &'a DbPool, post_id: PostId) -> TokioDieselFuture<'a, Self> {
use lemmy_db_schema::schema::post::dsl::*;
post.find(post_id).first::<Self>(conn)
post.find(post_id).first_async(pool)
}
fn delete(conn: &PgConnection, post_id: PostId) -> Result<usize, Error> {
fn delete(pool: &'a DbPool, post_id: PostId) -> TokioDieselFuture<'a, usize> {
use lemmy_db_schema::schema::post::dsl::*;
diesel::delete(post.find(post_id)).execute(conn)
diesel::delete(post.find(post_id)).execute_async(pool)
}
fn create(conn: &PgConnection, new_post: &PostForm) -> Result<Self, Error> {
fn create(pool: &'a DbPool, new_post: &'a PostForm) -> TokioDieselFuture<'a, Self> {
use lemmy_db_schema::schema::post::dsl::*;
insert_into(post).values(new_post).get_result::<Self>(conn)
insert_into(post).values(new_post).get_result_async(pool)
}
fn update(conn: &PgConnection, post_id: PostId, new_post: &PostForm) -> Result<Self, Error> {
fn update(pool: &'a DbPool, post_id: PostId, new_post: &'a PostForm) -> TokioDieselFuture<'a, Self> {
use lemmy_db_schema::schema::post::dsl::*;
diesel::update(post.find(post_id))
.set(new_post)
.get_result::<Self>(conn)
.get_result_async(pool)
}
}
pub trait Post_ {
//fn read(conn: &PgConnection, post_id: i32) -> Result<Post, Error>;
pub trait Post_<'a>{
fn list_for_community(
conn: &PgConnection,
pool: &'a DbPool,
the_community_id: CommunityId,
) -> Result<Vec<Post>, Error>;
fn update_ap_id(conn: &PgConnection, post_id: PostId, apub_id: DbUrl) -> Result<Post, Error>;
) -> TokioDieselFuture<'a, Vec<Post>>;
fn update_ap_id(pool: &'a DbPool, post_id: PostId, apub_id: DbUrl) -> TokioDieselFuture<'a, Post>;
fn permadelete_for_creator(
conn: &PgConnection,
pool: &'a DbPool,
for_creator_id: PersonId,
) -> Result<Vec<Post>, Error>;
fn update_deleted(conn: &PgConnection, post_id: PostId, new_deleted: bool)
-> Result<Post, Error>;
fn update_removed(conn: &PgConnection, post_id: PostId, new_removed: bool)
-> Result<Post, Error>;
) -> TokioDieselFuture<'a, Vec<Post>>;
fn update_deleted(pool: &'a DbPool, post_id: PostId, new_deleted: bool)
-> TokioDieselFuture<'a, Post>;
fn update_removed(pool: &'a DbPool, post_id: PostId, new_removed: bool)
-> TokioDieselFuture<'a, Post>;
fn update_removed_for_creator(
conn: &PgConnection,
pool: &'a DbPool,
for_creator_id: PersonId,
for_community_id: Option<CommunityId>,
new_removed: bool,
) -> Result<Vec<Post>, Error>;
fn update_locked(conn: &PgConnection, post_id: PostId, new_locked: bool) -> Result<Post, Error>;
) -> TokioDieselFuture<'a, Vec<Post>>;
fn update_locked(pool: &'a DbPool, post_id: PostId, new_locked: bool) -> TokioDieselFuture<'a, Post>;
fn update_stickied(
conn: &PgConnection,
pool: &'a DbPool,
post_id: PostId,
new_stickied: bool,
) -> Result<Post, Error>;
) -> TokioDieselFuture<'a, Post>;
fn is_post_creator(person_id: PersonId, post_creator_id: PersonId) -> bool;
}
impl Post_ for Post {
impl<'a> Post_<'a> for Post {
fn list_for_community(
conn: &PgConnection,
pool: &'a DbPool,
the_community_id: CommunityId,
) -> Result<Vec<Self>, Error> {
) -> TokioDieselFuture<'a, Vec<Self>> {
use lemmy_db_schema::schema::post::dsl::*;
post
.filter(community_id.eq(the_community_id))
.then_order_by(published.desc())
.then_order_by(stickied.desc())
.limit(20)
.load::<Self>(conn)
.load_async(pool)
}
fn update_ap_id(conn: &PgConnection, post_id: PostId, apub_id: DbUrl) -> Result<Self, Error> {
fn update_ap_id(pool: &'a DbPool, post_id: PostId, apub_id: DbUrl) -> TokioDieselFuture<'a, Self> {
use lemmy_db_schema::schema::post::dsl::*;
diesel::update(post.find(post_id))
.set(ap_id.eq(apub_id))
.get_result::<Self>(conn)
.get_result_async(pool)
}
fn permadelete_for_creator(
conn: &PgConnection,
pool: &'a DbPool,
for_creator_id: PersonId,
) -> Result<Vec<Self>, Error> {
) -> TokioDieselFuture<'a, Vec<Self>> {
use lemmy_db_schema::schema::post::dsl::*;
let perma_deleted = "*Permananently Deleted*";
@ -111,37 +111,37 @@ impl Post_ for Post {
deleted.eq(true),
updated.eq(naive_now()),
))
.get_results::<Self>(conn)
.get_results_async(pool)
}
fn update_deleted(
conn: &PgConnection,
pool: &'a DbPool,
post_id: PostId,
new_deleted: bool,
) -> Result<Self, Error> {
) -> TokioDieselFuture<'a, Self> {
use lemmy_db_schema::schema::post::dsl::*;
diesel::update(post.find(post_id))
.set((deleted.eq(new_deleted), updated.eq(naive_now())))
.get_result::<Self>(conn)
.get_result_async(pool)
}
fn update_removed(
conn: &PgConnection,
pool: &'a DbPool,
post_id: PostId,
new_removed: bool,
) -> Result<Self, Error> {
) -> TokioDieselFuture<'a, Self> {
use lemmy_db_schema::schema::post::dsl::*;
diesel::update(post.find(post_id))
.set((removed.eq(new_removed), updated.eq(naive_now())))
.get_result::<Self>(conn)
.get_result_async(pool)
}
fn update_removed_for_creator(
conn: &PgConnection,
pool: &'a DbPool,
for_creator_id: PersonId,
for_community_id: Option<CommunityId>,
new_removed: bool,
) -> Result<Vec<Self>, Error> {
) -> TokioDieselFuture<'a, Vec<Self>> {
use lemmy_db_schema::schema::post::dsl::*;
let mut update = diesel::update(post).into_boxed();
@ -153,25 +153,25 @@ impl Post_ for Post {
update
.set((removed.eq(new_removed), updated.eq(naive_now())))
.get_results::<Self>(conn)
.get_results_async(pool)
}
fn update_locked(conn: &PgConnection, post_id: PostId, new_locked: bool) -> Result<Self, Error> {
fn update_locked(pool: &'a DbPool, post_id: PostId, new_locked: bool) -> TokioDieselFuture<'a, Self> {
use lemmy_db_schema::schema::post::dsl::*;
diesel::update(post.find(post_id))
.set(locked.eq(new_locked))
.get_result::<Self>(conn)
.get_result_async(pool)
}
fn update_stickied(
conn: &PgConnection,
pool: &'a DbPool,
post_id: PostId,
new_stickied: bool,
) -> Result<Self, Error> {
) -> TokioDieselFuture<'a, Self> {
use lemmy_db_schema::schema::post::dsl::*;
diesel::update(post.find(post_id))
.set(stickied.eq(new_stickied))
.get_result::<Self>(conn)
.get_result_async(pool)
}
fn is_post_creator(person_id: PersonId, post_creator_id: PersonId) -> bool {
@ -179,84 +179,84 @@ impl Post_ for Post {
}
}
impl ApubObject<PostForm> for Post {
fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result<Self, Error> {
impl<'a> ApubObject<'a, PostForm> for Post {
fn read_from_apub_id(pool: &'a DbPool, object_id: &'a DbUrl) -> TokioDieselFuture<'a, Self> {
use lemmy_db_schema::schema::post::dsl::*;
post.filter(ap_id.eq(object_id)).first::<Self>(conn)
post.filter(ap_id.eq(object_id)).first_async(pool)
}
fn upsert(conn: &PgConnection, post_form: &PostForm) -> Result<Post, Error> {
fn upsert(pool: &'a DbPool, post_form: &'a PostForm) -> TokioDieselFuture<'a, Post> {
use lemmy_db_schema::schema::post::dsl::*;
insert_into(post)
.values(post_form)
.on_conflict(ap_id)
.do_update()
.set(post_form)
.get_result::<Self>(conn)
.get_result_async(pool)
}
}
impl Likeable<PostLikeForm, PostId> for PostLike {
fn like(conn: &PgConnection, post_like_form: &PostLikeForm) -> Result<Self, Error> {
impl<'a> Likeable<'a, PostLikeForm, PostId> for PostLike {
fn like(pool: &'a DbPool, post_like_form: &'a PostLikeForm) -> TokioDieselFuture<'a, Self> {
use lemmy_db_schema::schema::post_like::dsl::*;
insert_into(post_like)
.values(post_like_form)
.on_conflict((post_id, person_id))
.do_update()
.set(post_like_form)
.get_result::<Self>(conn)
.get_result_async(pool)
}
fn remove(conn: &PgConnection, person_id: PersonId, post_id: PostId) -> Result<usize, Error> {
fn remove(pool: &'a DbPool, person_id: PersonId, post_id: PostId) -> TokioDieselFuture<'a, usize> {
use lemmy_db_schema::schema::post_like::dsl;
diesel::delete(
dsl::post_like
.filter(dsl::post_id.eq(post_id))
.filter(dsl::person_id.eq(person_id)),
)
.execute(conn)
.execute_async(pool)
}
}
impl Saveable<PostSavedForm> for PostSaved {
fn save(conn: &PgConnection, post_saved_form: &PostSavedForm) -> Result<Self, Error> {
impl<'a> Saveable<'a, PostSavedForm> for PostSaved {
fn save(pool: &'a DbPool, post_saved_form: &'a PostSavedForm) -> TokioDieselFuture<'a, Self> {
use lemmy_db_schema::schema::post_saved::dsl::*;
insert_into(post_saved)
.values(post_saved_form)
.on_conflict((post_id, person_id))
.do_update()
.set(post_saved_form)
.get_result::<Self>(conn)
.get_result_async(pool)
}
fn unsave(conn: &PgConnection, post_saved_form: &PostSavedForm) -> Result<usize, Error> {
fn unsave(pool: &'a DbPool, post_saved_form: &PostSavedForm) -> TokioDieselFuture<'a, usize> {
use lemmy_db_schema::schema::post_saved::dsl::*;
diesel::delete(
post_saved
.filter(post_id.eq(post_saved_form.post_id))
.filter(person_id.eq(post_saved_form.person_id)),
)
.execute(conn)
.execute_async(pool)
}
}
impl Readable<PostReadForm> for PostRead {
fn mark_as_read(conn: &PgConnection, post_read_form: &PostReadForm) -> Result<Self, Error> {
impl<'a> Readable<'a, PostReadForm> for PostRead {
fn mark_as_read(pool: &'a DbPool, post_read_form: &'a PostReadForm) -> TokioDieselFuture<'a, Self> {
use lemmy_db_schema::schema::post_read::dsl::*;
insert_into(post_read)
.values(post_read_form)
.on_conflict((post_id, person_id))
.do_update()
.set(post_read_form)
.get_result::<Self>(conn)
.get_result_async(pool)
}
fn mark_as_unread(conn: &PgConnection, post_read_form: &PostReadForm) -> Result<usize, Error> {
fn mark_as_unread(pool: &'a DbPool, post_read_form: &'a PostReadForm) -> TokioDieselFuture<'a, usize> {
use lemmy_db_schema::schema::post_read::dsl::*;
diesel::delete(
post_read
.filter(post_id.eq(post_read_form.post_id))
.filter(person_id.eq(post_read_form.person_id)),
)
.execute(conn)
.execute_async(pool)
}
}
@ -276,24 +276,22 @@ impl DeleteableOrRemoveable for Post {
#[cfg(test)]
mod tests {
use crate::{establish_unpooled_connection, source::post::*};
use crate::{setup_connection_pool_for_tests, source::post::*};
use lemmy_db_schema::source::{
community::{Community, CommunityForm},
person::*,
};
use serial_test::serial;
#[test]
#[serial]
fn test_crud() {
let conn = establish_unpooled_connection();
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_crud() {
let pool = setup_connection_pool_for_tests();
let new_person = PersonForm {
name: "jim".into(),
..PersonForm::default()
};
let inserted_person = Person::create(&conn, &new_person).unwrap();
let inserted_person = Person::create(&pool, &new_person).await.unwrap();
let new_community = CommunityForm {
name: "test community_3".to_string(),
@ -301,7 +299,7 @@ mod tests {
..CommunityForm::default()
};
let inserted_community = Community::create(&conn, &new_community).unwrap();
let inserted_community = Community::create(&pool, &new_community).await.unwrap();
let new_post = PostForm {
name: "A test post".into(),
@ -310,7 +308,7 @@ mod tests {
..PostForm::default()
};
let inserted_post = Post::create(&conn, &new_post).unwrap();
let inserted_post = Post::create(&pool, &new_post).await.unwrap();
let expected_post = Post {
id: inserted_post.id,
@ -341,7 +339,7 @@ mod tests {
score: 1,
};
let inserted_post_like = PostLike::like(&conn, &post_like_form).unwrap();
let inserted_post_like = PostLike::like(&pool, &post_like_form).await.unwrap();
let expected_post_like = PostLike {
id: inserted_post_like.id,
@ -357,7 +355,7 @@ mod tests {
person_id: inserted_person.id,
};
let inserted_post_saved = PostSaved::save(&conn, &post_saved_form).unwrap();
let inserted_post_saved = PostSaved::save(&pool, &post_saved_form).await.unwrap();
let expected_post_saved = PostSaved {
id: inserted_post_saved.id,
@ -372,7 +370,7 @@ mod tests {
person_id: inserted_person.id,
};
let inserted_post_read = PostRead::mark_as_read(&conn, &post_read_form).unwrap();
let inserted_post_read = PostRead::mark_as_read(&pool, &post_read_form).await.unwrap();
let expected_post_read = PostRead {
id: inserted_post_read.id,
@ -381,14 +379,14 @@ mod tests {
published: inserted_post_read.published,
};
let read_post = Post::read(&conn, inserted_post.id).unwrap();
let updated_post = Post::update(&conn, inserted_post.id, &new_post).unwrap();
let like_removed = PostLike::remove(&conn, inserted_person.id, inserted_post.id).unwrap();
let saved_removed = PostSaved::unsave(&conn, &post_saved_form).unwrap();
let read_removed = PostRead::mark_as_unread(&conn, &post_read_form).unwrap();
let num_deleted = Post::delete(&conn, inserted_post.id).unwrap();
Community::delete(&conn, inserted_community.id).unwrap();
Person::delete(&conn, inserted_person.id).unwrap();
let read_post = Post::read(&pool, inserted_post.id).await.unwrap();
let updated_post = Post::update(&pool, inserted_post.id, &new_post).await.unwrap();
let like_removed = PostLike::remove(&pool, inserted_person.id, inserted_post.id).await.unwrap();
let saved_removed = PostSaved::unsave(&pool, &post_saved_form).await.unwrap();
let read_removed = PostRead::mark_as_unread(&pool, &post_read_form).await.unwrap();
let num_deleted = Post::delete(&pool, inserted_post.id).await.unwrap();
Community::delete(&pool, inserted_community.id).await.unwrap();
Person::delete(&pool, inserted_person.id).await.unwrap();
assert_eq!(expected_post, read_post);
assert_eq!(expected_post, inserted_post);

@ -3,15 +3,11 @@ extern crate diesel_migrations;
use actix::prelude::*;
use actix_web::{web::Data, *};
use diesel::{
r2d2::{ConnectionManager, Pool},
PgConnection,
};
use lemmy_api::match_websocket_operation;
use lemmy_api_common::blocking;
use lemmy_api_crud::match_websocket_operation_crud;
use lemmy_apub::activity_queue::create_activity_queue;
use lemmy_db_queries::get_database_url_from_env;
use lemmy_db_queries::setup_connection_pool;
use lemmy_routes::{feeds, images, nodeinfo, webfinger};
use lemmy_server::{api_routes, code_migrations::run_advanced_migrations, scheduled_tasks};
use lemmy_utils::{
@ -32,18 +28,11 @@ async fn main() -> Result<(), LemmyError> {
let settings = Settings::get();
// Set up the r2d2 connection pool
let db_url = match get_database_url_from_env() {
Ok(url) => url,
Err(_) => settings.get_database_url(),
};
let manager = ConnectionManager::<PgConnection>::new(&db_url);
let pool = Pool::builder()
.max_size(settings.database().pool_size())
.build(manager)
.unwrap_or_else(|_| panic!("Error connecting to {}", db_url));
let pool = setup_connection_pool();
// Run the migrations from code
blocking(&pool, move |conn| {
// TODO this is already done from the pool
embedded_migrations::run(conn)?;
run_advanced_migrations(conn)?;
Ok(()) as Result<(), LemmyError>

Loading…
Cancel
Save