diff --git a/llarp/constants/evloop.hpp b/llarp/constants/evloop.hpp new file mode 100644 index 000000000..4824f1b9c --- /dev/null +++ b/llarp/constants/evloop.hpp @@ -0,0 +1,7 @@ +#pragma once + +namespace llarp +{ + /// default queue length for logic jobs + constexpr std::size_t event_loop_queue_size = 1024; +} // namespace llarp diff --git a/llarp/constants/path.hpp b/llarp/constants/path.hpp index 92758507d..9d6da0865 100644 --- a/llarp/constants/path.hpp +++ b/llarp/constants/path.hpp @@ -34,6 +34,10 @@ namespace llarp constexpr auto latency_interval = 20s; /// if a path is inactive for this amount of time it's dead constexpr auto alive_timeout = latency_interval * 1.5; + + /// how big transit hop traffic queues are + constexpr std::size_t transit_hop_queue_size = 256; + } // namespace path } // namespace llarp diff --git a/llarp/context.cpp b/llarp/context.cpp index 2e5e07108..f25d41eea 100644 --- a/llarp/context.cpp +++ b/llarp/context.cpp @@ -45,10 +45,7 @@ namespace llarp if (threads <= 0) threads = 1; worker = std::make_shared(threads, 1024, "llarp-worker"); - auto jobQueueSize = config->router.m_JobQueueSize; - if (jobQueueSize < 1024) - jobQueueSize = 1024; - logic = std::make_shared(jobQueueSize); + logic = std::make_shared(); nodedb_dir = config->router.m_dataDir / nodedb_dirname; @@ -80,7 +77,12 @@ namespace llarp llarp::LogInfo(llarp::VERSION_FULL, " ", llarp::RELEASE_MOTTO); llarp::LogInfo("starting up"); if (mainloop == nullptr) - mainloop = llarp_make_ev_loop(); + { + auto jobQueueSize = config->router.m_JobQueueSize; + if (jobQueueSize < 1024) + jobQueueSize = 1024; + mainloop = llarp_make_ev_loop(jobQueueSize); + } logic->set_event_loop(mainloop.get()); mainloop->set_logic(logic); diff --git a/llarp/ev/ev.cpp b/llarp/ev/ev.cpp index 50b2ee6f2..4a08e64b2 100644 --- a/llarp/ev/ev.cpp +++ b/llarp/ev/ev.cpp @@ -15,9 +15,9 @@ #endif llarp_ev_loop_ptr -llarp_make_ev_loop() +llarp_make_ev_loop(size_t queueLength) { - llarp_ev_loop_ptr r = std::make_shared(); + llarp_ev_loop_ptr r = std::make_shared(queueLength); r->init(); r->update_time(); return r; diff --git a/llarp/ev/ev.h b/llarp/ev/ev.h index 98f5168ff..e1c7b9456 100644 --- a/llarp/ev/ev.h +++ b/llarp/ev/ev.h @@ -25,6 +25,8 @@ #include #endif +#include + /** * ev.h * @@ -47,8 +49,9 @@ using llarp_ev_loop_ptr = std::shared_ptr; /// make an event loop using our baked in event loop on Windows /// make an event loop using libuv otherwise. +/// @param queue_size how big the logic job queue is llarp_ev_loop_ptr -llarp_make_ev_loop(); +llarp_make_ev_loop(std::size_t queue_size = llarp::event_loop_queue_size); // run mainloop void diff --git a/llarp/ev/ev_libuv.cpp b/llarp/ev/ev_libuv.cpp index 9e45e84ed..5eacbf354 100644 --- a/llarp/ev/ev_libuv.cpp +++ b/llarp/ev/ev_libuv.cpp @@ -753,7 +753,8 @@ namespace libuv llarp::LogContext::Instance().logStream->Tick(loop->time_now()); } - Loop::Loop() : llarp_ev_loop(), m_LogicCalls(1024), m_timerQueue(20), m_timerCancelQueue(20) + Loop::Loop(size_t queueLength) + : llarp_ev_loop(), m_LogicCalls(queueLength), m_timerQueue(20), m_timerCancelQueue(20) { } diff --git a/llarp/ev/ev_libuv.hpp b/llarp/ev/ev_libuv.hpp index deb9afc38..ff016e48b 100644 --- a/llarp/ev/ev_libuv.hpp +++ b/llarp/ev/ev_libuv.hpp @@ -24,7 +24,7 @@ namespace libuv Callback callback; }; - Loop(); + Loop(size_t queueSize); bool init() override; diff --git a/llarp/path/transit_hop.cpp b/llarp/path/transit_hop.cpp index edae826c1..a9f446955 100644 --- a/llarp/path/transit_hop.cpp +++ b/llarp/path/transit_hop.cpp @@ -32,7 +32,8 @@ namespace llarp return stream; } - TransitHop::TransitHop() : m_UpstreamGather(512), m_DownstreamGather(512) + TransitHop::TransitHop() + : m_UpstreamGather(transit_hop_queue_size), m_DownstreamGather(transit_hop_queue_size) { m_UpstreamGather.enable(); m_DownstreamGather.enable(); @@ -119,7 +120,6 @@ namespace llarp void TransitHop::DownstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r) { - m_DownstreamWorkCounter++; auto flushIt = [self = shared_from_this(), r]() { std::vector msgs; do @@ -161,7 +161,6 @@ namespace llarp void TransitHop::UpstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r) { - m_UpstreamWorkCounter++; auto flushIt = [self = shared_from_this(), r]() { std::vector msgs; do @@ -251,19 +250,28 @@ namespace llarp void TransitHop::FlushUpstream(AbstractRouter* r) { - if (m_UpstreamQueue && !m_UpstreamQueue->empty()) - r->threadpool()->addJob(std::bind( - &TransitHop::UpstreamWork, shared_from_this(), std::move(m_UpstreamQueue), r)); - + if (m_UpstreamQueue && not m_UpstreamQueue->empty()) + { + if (r->threadpool()->addJob(std::bind( + &TransitHop::UpstreamWork, shared_from_this(), std::move(m_UpstreamQueue), r))) + { + m_UpstreamWorkCounter++; + } + } m_UpstreamQueue = nullptr; } void TransitHop::FlushDownstream(AbstractRouter* r) { - if (m_DownstreamQueue && !m_DownstreamQueue->empty()) - r->threadpool()->addJob(std::bind( - &TransitHop::DownstreamWork, shared_from_this(), std::move(m_DownstreamQueue), r)); + if (m_DownstreamQueue && not m_DownstreamQueue->empty()) + { + if (r->threadpool()->addJob(std::bind( + &TransitHop::DownstreamWork, shared_from_this(), std::move(m_DownstreamQueue), r))) + { + m_DownstreamWorkCounter++; + } + } m_DownstreamQueue = nullptr; } diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 0a55904c1..e0e7dd0d2 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -153,12 +153,11 @@ namespace llarp void Router::PumpLL() { - static constexpr size_t PumpJobThreshhold = 50; static constexpr auto PumpInterval = 25ms; const auto now = Now(); if (_stopping.load()) return; - if (_logic->numPendingJobs() >= PumpJobThreshhold && _lastPump + PumpInterval >= now) + if (_lastPump + PumpInterval >= now) { return; } diff --git a/llarp/util/thread/logic.cpp b/llarp/util/thread/logic.cpp index 2021d45ea..16e84a838 100644 --- a/llarp/util/thread/logic.cpp +++ b/llarp/util/thread/logic.cpp @@ -6,20 +6,6 @@ namespace llarp { - Logic::Logic(size_t) - { - } - - Logic::~Logic() - { - } - - size_t - Logic::numPendingJobs() const - { - return 0; - } - bool Logic::queue_job(struct llarp_thread_job job) { diff --git a/llarp/util/thread/logic.hpp b/llarp/util/thread/logic.hpp index d65a889e6..91e850251 100644 --- a/llarp/util/thread/logic.hpp +++ b/llarp/util/thread/logic.hpp @@ -11,10 +11,6 @@ namespace llarp class Logic { public: - Logic(size_t queueLength = size_t{1024 * 8}); - - ~Logic(); - /// stop all operation and wait for that to die void stop(); @@ -34,9 +30,6 @@ namespace llarp void remove_call(uint32_t id); - size_t - numPendingJobs() const; - bool can_flush() const; diff --git a/readme.md b/readme.md index b8252fdd3..d388ed463 100644 --- a/readme.md +++ b/readme.md @@ -12,8 +12,6 @@ You can view documentation on how to get started [here](https://loki-project.git [![Build Status](https://drone.lokinet.dev/api/badges/loki-project/loki-network/status.svg?ref=refs/heads/master)](https://drone.lokinet.dev/loki-project/loki-network) -You can find Bleeding edge builds [here](https://builds.lokinet.dev) (these builds may eat your first born). - ## Usage See the [documentation](https://loki-project.github.io/loki-docs/Lokinet/LokinetOverview/) on how to get started.