Remove `actix_rt` & use standard tokio spawn (#3158)

* Remove `actix_rt` & use standard tokio spawn

* Adjust rust log back down

* Format correctly

* Update cargo lock

* Add DB settings

* Change name and update to latest rev

* Clean up formatting changes

* Move `worker_count` and `worker_retry_count` to settings

* Update defaults

* Use `0.4.4` instead of git branch
remove-docker-networks
cetra3 12 months ago committed by GitHub
parent 63d3759c48
commit d7da911a48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

4
Cargo.lock generated

@ -2543,7 +2543,6 @@ dependencies = [
name = "lemmy_api_common" name = "lemmy_api_common"
version = "0.18.0" version = "0.18.0"
dependencies = [ dependencies = [
"actix-rt",
"actix-web", "actix-web",
"anyhow", "anyhow",
"chrono", "chrono",
@ -2561,6 +2560,7 @@ dependencies = [
"rosetta-i18n", "rosetta-i18n",
"serde", "serde",
"serde_with", "serde_with",
"tokio",
"tracing", "tracing",
"ts-rs", "ts-rs",
"url", "url",
@ -2592,7 +2592,6 @@ name = "lemmy_apub"
version = "0.18.0" version = "0.18.0"
dependencies = [ dependencies = [
"activitypub_federation", "activitypub_federation",
"actix-rt",
"actix-web", "actix-web",
"anyhow", "anyhow",
"assert-json-diff", "assert-json-diff",
@ -2620,6 +2619,7 @@ dependencies = [
"sha2", "sha2",
"strum_macros", "strum_macros",
"task-local-extensions", "task-local-extensions",
"tokio",
"tracing", "tracing",
"url", "url",
"uuid", "uuid",

@ -89,7 +89,7 @@ anyhow = "1.0.71"
diesel_ltree = "0.3.0" diesel_ltree = "0.3.0"
typed-builder = "0.10.0" typed-builder = "0.10.0"
serial_test = "0.9.0" serial_test = "0.9.0"
tokio = "1.28.2" tokio = { version = "1.28.2", features = ["full"] }
sha2 = "0.10.6" sha2 = "0.10.6"
regex = "1.8.4" regex = "1.8.4"
once_cell = "1.18.0" once_cell = "1.18.0"

@ -76,4 +76,8 @@
port: 8536 port: 8536
# Whether the site is available over TLS. Needs to be true for federation to work. # Whether the site is available over TLS. Needs to be true for federation to work.
tls_enabled: true tls_enabled: true
# The number of activitypub federation workers that can be in-flight concurrently
worker_count: 0
# The number of activitypub federation retry workers that can be in-flight concurrently
retry_count: 0
} }

@ -38,7 +38,7 @@ encoding = { version = "0.2.33", optional = true }
anyhow = { workspace = true } anyhow = { workspace = true }
futures = { workspace = true } futures = { workspace = true }
uuid = { workspace = true } uuid = { workspace = true }
actix-rt = { workspace = true } tokio = { workspace = true }
reqwest = { workspace = true } reqwest = { workspace = true }
ts-rs = { workspace = true, optional = true } ts-rs = { workspace = true, optional = true }
actix-web = { workspace = true } actix-web = { workspace = true }

@ -271,7 +271,7 @@ mod tests {
use url::Url; use url::Url;
// These helped with testing // These helped with testing
#[actix_rt::test] #[tokio::test]
async fn test_site_metadata() { async fn test_site_metadata() {
let settings = &SETTINGS.clone(); let settings = &SETTINGS.clone();
let client = reqwest::Client::builder() let client = reqwest::Client::builder()

@ -177,7 +177,6 @@ pub struct CreateSite {
pub rate_limit_search_per_second: Option<i32>, pub rate_limit_search_per_second: Option<i32>,
pub federation_enabled: Option<bool>, pub federation_enabled: Option<bool>,
pub federation_debug: Option<bool>, pub federation_debug: Option<bool>,
pub federation_worker_count: Option<i32>,
pub captcha_enabled: Option<bool>, pub captcha_enabled: Option<bool>,
pub captcha_difficulty: Option<String>, pub captcha_difficulty: Option<String>,
pub allowed_instances: Option<Vec<String>>, pub allowed_instances: Option<Vec<String>>,
@ -250,8 +249,6 @@ pub struct EditSite {
pub federation_enabled: Option<bool>, pub federation_enabled: Option<bool>,
/// Enables federation debugging. /// Enables federation debugging.
pub federation_debug: Option<bool>, pub federation_debug: Option<bool>,
/// The number of federation workers.
pub federation_worker_count: Option<i32>,
/// Whether to enable captchas for signups. /// Whether to enable captchas for signups.
pub captcha_enabled: Option<bool>, pub captcha_enabled: Option<bool>,
/// The captcha difficulty. Can be easy, medium, or hard /// The captcha difficulty. Can be easy, medium, or hard

@ -122,7 +122,6 @@ impl PerformCrud for CreateSite {
.slur_filter_regex(diesel_option_overwrite(&data.slur_filter_regex)) .slur_filter_regex(diesel_option_overwrite(&data.slur_filter_regex))
.actor_name_max_length(data.actor_name_max_length) .actor_name_max_length(data.actor_name_max_length)
.federation_enabled(data.federation_enabled) .federation_enabled(data.federation_enabled)
.federation_worker_count(data.federation_worker_count)
.captcha_enabled(data.captcha_enabled) .captcha_enabled(data.captcha_enabled)
.captcha_difficulty(data.captcha_difficulty.clone()) .captcha_difficulty(data.captcha_difficulty.clone())
.build(); .build();

@ -123,7 +123,6 @@ impl PerformCrud for EditSite {
.slur_filter_regex(diesel_option_overwrite(&data.slur_filter_regex)) .slur_filter_regex(diesel_option_overwrite(&data.slur_filter_regex))
.actor_name_max_length(data.actor_name_max_length) .actor_name_max_length(data.actor_name_max_length)
.federation_enabled(data.federation_enabled) .federation_enabled(data.federation_enabled)
.federation_worker_count(data.federation_worker_count)
.captcha_enabled(data.captcha_enabled) .captcha_enabled(data.captcha_enabled)
.captcha_difficulty(data.captcha_difficulty.clone()) .captcha_difficulty(data.captcha_difficulty.clone())
.reports_email_admins(data.reports_email_admins) .reports_email_admins(data.reports_email_admins)

@ -25,7 +25,7 @@ chrono = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }
serde = { workspace = true } serde = { workspace = true }
actix-web = { workspace = true } actix-web = { workspace = true }
actix-rt = { workspace = true } tokio = {workspace = true}
tracing = { workspace = true } tracing = { workspace = true }
strum_macros = { workspace = true } strum_macros = { workspace = true }
url = { workspace = true } url = { workspace = true }

@ -120,7 +120,7 @@ mod tests {
}; };
use serial_test::serial; use serial_test::serial;
#[actix_rt::test] #[tokio::test]
#[serial] #[serial]
async fn test_parse_lemmy_community_moderators() { async fn test_parse_lemmy_community_moderators() {
let context = init_context().await; let context = init_context().await;

@ -223,7 +223,7 @@ pub(crate) mod tests {
LocalSite::delete(context.pool()).await.unwrap(); LocalSite::delete(context.pool()).await.unwrap();
} }
#[actix_rt::test] #[tokio::test]
#[serial] #[serial]
pub(crate) async fn test_parse_lemmy_comment() { pub(crate) async fn test_parse_lemmy_comment() {
let context = init_context().await; let context = init_context().await;
@ -249,7 +249,7 @@ pub(crate) mod tests {
cleanup(data, &context).await; cleanup(data, &context).await;
} }
#[actix_rt::test] #[tokio::test]
#[serial] #[serial]
async fn test_parse_pleroma_comment() { async fn test_parse_pleroma_comment() {
let context = init_context().await; let context = init_context().await;
@ -279,7 +279,7 @@ pub(crate) mod tests {
cleanup(data, &context).await; cleanup(data, &context).await;
} }
#[actix_rt::test] #[tokio::test]
#[serial] #[serial]
async fn test_html_to_markdown_sanitize() { async fn test_html_to_markdown_sanitize() {
let parsed = parse_html("<script></script><b>hello</b>"); let parsed = parse_html("<script></script><b>hello</b>");

@ -242,7 +242,7 @@ pub(crate) mod tests {
community community
} }
#[actix_rt::test] #[tokio::test]
#[serial] #[serial]
async fn test_parse_lemmy_community() { async fn test_parse_lemmy_community() {
let context = init_context().await; let context = init_context().await;

@ -221,7 +221,7 @@ pub(crate) mod tests {
site site
} }
#[actix_rt::test] #[tokio::test]
#[serial] #[serial]
async fn test_parse_lemmy_instance() { async fn test_parse_lemmy_instance() {
let context = init_context().await; let context = init_context().await;

@ -223,7 +223,7 @@ pub(crate) mod tests {
(person, site) (person, site)
} }
#[actix_rt::test] #[tokio::test]
#[serial] #[serial]
async fn test_parse_lemmy_person() { async fn test_parse_lemmy_person() {
let context = init_context().await; let context = init_context().await;
@ -236,7 +236,7 @@ pub(crate) mod tests {
cleanup((person, site), &context).await; cleanup((person, site), &context).await;
} }
#[actix_rt::test] #[tokio::test]
#[serial] #[serial]
async fn test_parse_pleroma_person() { async fn test_parse_pleroma_person() {
let context = init_context().await; let context = init_context().await;

@ -281,7 +281,7 @@ mod tests {
use lemmy_db_schema::source::site::Site; use lemmy_db_schema::source::site::Site;
use serial_test::serial; use serial_test::serial;
#[actix_rt::test] #[tokio::test]
#[serial] #[serial]
async fn test_parse_lemmy_post() { async fn test_parse_lemmy_post() {
let context = init_context().await; let context = init_context().await;

@ -187,7 +187,7 @@ mod tests {
Site::delete(context.pool(), data.2.id).await.unwrap(); Site::delete(context.pool(), data.2.id).await.unwrap();
} }
#[actix_rt::test] #[tokio::test]
#[serial] #[serial]
async fn test_parse_lemmy_pm() { async fn test_parse_lemmy_pm() {
let context = init_context().await; let context = init_context().await;
@ -213,7 +213,7 @@ mod tests {
cleanup(data, &context).await; cleanup(data, &context).await;
} }
#[actix_rt::test] #[tokio::test]
#[serial] #[serial]
async fn test_parse_pleroma_pm() { async fn test_parse_pleroma_pm() {
let context = init_context().await; let context = init_context().await;

@ -339,7 +339,6 @@ diesel::table! {
slur_filter_regex -> Nullable<Text>, slur_filter_regex -> Nullable<Text>,
actor_name_max_length -> Int4, actor_name_max_length -> Int4,
federation_enabled -> Bool, federation_enabled -> Bool,
federation_worker_count -> Int4,
captcha_enabled -> Bool, captcha_enabled -> Bool,
#[max_length = 255] #[max_length = 255]
captcha_difficulty -> Varchar, captcha_difficulty -> Varchar,

@ -50,8 +50,6 @@ pub struct LocalSite {
pub actor_name_max_length: i32, pub actor_name_max_length: i32,
/// Whether federation is enabled. /// Whether federation is enabled.
pub federation_enabled: bool, pub federation_enabled: bool,
/// The number of concurrent federation http workers.
pub federation_worker_count: i32,
/// Whether captcha is enabled. /// Whether captcha is enabled.
pub captcha_enabled: bool, pub captcha_enabled: bool,
/// The captcha difficulty. /// The captcha difficulty.
@ -85,7 +83,6 @@ pub struct LocalSiteInsertForm {
pub slur_filter_regex: Option<String>, pub slur_filter_regex: Option<String>,
pub actor_name_max_length: Option<i32>, pub actor_name_max_length: Option<i32>,
pub federation_enabled: Option<bool>, pub federation_enabled: Option<bool>,
pub federation_worker_count: Option<i32>,
pub captcha_enabled: Option<bool>, pub captcha_enabled: Option<bool>,
pub captcha_difficulty: Option<String>, pub captcha_difficulty: Option<String>,
pub registration_mode: Option<RegistrationMode>, pub registration_mode: Option<RegistrationMode>,
@ -112,7 +109,6 @@ pub struct LocalSiteUpdateForm {
pub slur_filter_regex: Option<Option<String>>, pub slur_filter_regex: Option<Option<String>>,
pub actor_name_max_length: Option<i32>, pub actor_name_max_length: Option<i32>,
pub federation_enabled: Option<bool>, pub federation_enabled: Option<bool>,
pub federation_worker_count: Option<i32>,
pub captcha_enabled: Option<bool>, pub captcha_enabled: Option<bool>,
pub captcha_difficulty: Option<String>, pub captcha_difficulty: Option<String>,
pub registration_mode: Option<RegistrationMode>, pub registration_mode: Option<RegistrationMode>,

@ -39,6 +39,12 @@ pub struct Settings {
#[default(None)] #[default(None)]
#[doku(skip)] #[doku(skip)]
pub opentelemetry_url: Option<Url>, pub opentelemetry_url: Option<Url>,
/// The number of activitypub federation workers that can be in-flight concurrently
#[default(0)]
pub worker_count: usize,
/// The number of activitypub federation retry workers that can be in-flight concurrently
#[default(0)]
pub retry_count: usize,
} }
#[derive(Debug, Deserialize, Serialize, Clone, SmartDefault, Document)] #[derive(Debug, Deserialize, Serialize, Clone, SmartDefault, Document)]

@ -0,0 +1 @@
alter table local_site add column federation_worker_count int default 64 not null;

@ -0,0 +1 @@
alter table local_site drop column federation_worker_count;

@ -139,24 +139,23 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> {
}); });
} }
let settings_bind = settings.clone();
let federation_config = FederationConfig::builder() let federation_config = FederationConfig::builder()
.domain(settings.hostname.clone()) .domain(settings.hostname.clone())
.app_data(context.clone()) .app_data(context.clone())
.client(client.clone()) .client(client.clone())
.http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT) .http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT)
.worker_count(local_site.federation_worker_count as usize) .worker_count(settings.worker_count)
.retry_count(settings.retry_count)
.debug(cfg!(debug_assertions)) .debug(cfg!(debug_assertions))
.http_signature_compat(true) .http_signature_compat(true)
.url_verifier(Box::new(VerifyUrlData(context.pool().clone()))) .url_verifier(Box::new(VerifyUrlData(context.pool().clone())))
.build() .build()
.await .await?;
.expect("configure federation");
// Create Http server with websocket support // Create Http server with websocket support
let settings_bind = settings.clone();
HttpServer::new(move || { HttpServer::new(move || {
let context = context.clone();
let cors_config = if cfg!(debug_assertions) { let cors_config = if cfg!(debug_assertions) {
Cors::permissive() Cors::permissive()
} else { } else {
@ -173,7 +172,7 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> {
)) ))
.wrap(cors_config) .wrap(cors_config)
.wrap(TracingLogger::<QuieterRootSpanBuilder>::new()) .wrap(TracingLogger::<QuieterRootSpanBuilder>::new())
.app_data(Data::new(context)) .app_data(Data::new(context.clone()))
.app_data(Data::new(rate_limit_cell.clone())) .app_data(Data::new(rate_limit_cell.clone()))
.wrap(FederationMiddleware::new(federation_config.clone())) .wrap(FederationMiddleware::new(federation_config.clone()))
// The routes // The routes

@ -1,7 +1,7 @@
use lemmy_server::{init_logging, start_lemmy_server}; use lemmy_server::{init_logging, start_lemmy_server};
use lemmy_utils::{error::LemmyError, settings::SETTINGS}; use lemmy_utils::{error::LemmyError, settings::SETTINGS};
#[actix_web::main] #[tokio::main]
pub async fn main() -> Result<(), LemmyError> { pub async fn main() -> Result<(), LemmyError> {
init_logging(&SETTINGS.opentelemetry_url)?; init_logging(&SETTINGS.opentelemetry_url)?;
#[cfg(not(feature = "embed-pictrs"))] #[cfg(not(feature = "embed-pictrs"))]

Loading…
Cancel
Save