make libuv event loop logic queue size configurable.

remove logic constructor that is no-op.
add constant for default logic queue size
add constant for transit hop queue size
pull/1272/head
Jeff Becker 4 years ago
parent 8a003a1144
commit acecb23eb3
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -0,0 +1,7 @@
#pragma once
namespace llarp
{
/// default queue length for logic jobs
constexpr std::size_t event_loop_queue_size = 1024;
} // namespace llarp

@ -34,6 +34,10 @@ namespace llarp
constexpr auto latency_interval = 20s;
/// if a path is inactive for this amount of time it's dead
constexpr auto alive_timeout = latency_interval * 1.5;
/// how big transit hop traffic queues are
constexpr std::size_t transit_hop_queue_size = 256;
} // namespace path
} // namespace llarp

@ -45,10 +45,7 @@ namespace llarp
if (threads <= 0)
threads = 1;
worker = std::make_shared<llarp::thread::ThreadPool>(threads, 1024, "llarp-worker");
auto jobQueueSize = config->router.m_JobQueueSize;
if (jobQueueSize < 1024)
jobQueueSize = 1024;
logic = std::make_shared<Logic>(jobQueueSize);
logic = std::make_shared<Logic>();
nodedb_dir = config->router.m_dataDir / nodedb_dirname;
@ -80,7 +77,12 @@ namespace llarp
llarp::LogInfo(llarp::VERSION_FULL, " ", llarp::RELEASE_MOTTO);
llarp::LogInfo("starting up");
if (mainloop == nullptr)
mainloop = llarp_make_ev_loop();
{
auto jobQueueSize = config->router.m_JobQueueSize;
if (jobQueueSize < 1024)
jobQueueSize = 1024;
mainloop = llarp_make_ev_loop(jobQueueSize);
}
logic->set_event_loop(mainloop.get());
mainloop->set_logic(logic);

@ -15,9 +15,9 @@
#endif
llarp_ev_loop_ptr
llarp_make_ev_loop()
llarp_make_ev_loop(size_t queueLength)
{
llarp_ev_loop_ptr r = std::make_shared<libuv::Loop>();
llarp_ev_loop_ptr r = std::make_shared<libuv::Loop>(queueLength);
r->init();
r->update_time();
return r;

@ -25,6 +25,8 @@
#include <uv.h>
#endif
#include <constants/evloop.hpp>
/**
* ev.h
*
@ -47,8 +49,9 @@ using llarp_ev_loop_ptr = std::shared_ptr<llarp_ev_loop>;
/// make an event loop using our baked in event loop on Windows
/// make an event loop using libuv otherwise.
/// @param queue_size how big the logic job queue is
llarp_ev_loop_ptr
llarp_make_ev_loop();
llarp_make_ev_loop(std::size_t queue_size = llarp::event_loop_queue_size);
// run mainloop
void

@ -753,7 +753,8 @@ namespace libuv
llarp::LogContext::Instance().logStream->Tick(loop->time_now());
}
Loop::Loop() : llarp_ev_loop(), m_LogicCalls(1024), m_timerQueue(20), m_timerCancelQueue(20)
Loop::Loop(size_t queueLength)
: llarp_ev_loop(), m_LogicCalls(queueLength), m_timerQueue(20), m_timerCancelQueue(20)
{
}

@ -24,7 +24,7 @@ namespace libuv
Callback callback;
};
Loop();
Loop(size_t queueSize);
bool
init() override;

@ -32,7 +32,8 @@ namespace llarp
return stream;
}
TransitHop::TransitHop() : m_UpstreamGather(512), m_DownstreamGather(512)
TransitHop::TransitHop()
: m_UpstreamGather(transit_hop_queue_size), m_DownstreamGather(transit_hop_queue_size)
{
m_UpstreamGather.enable();
m_DownstreamGather.enable();
@ -119,7 +120,6 @@ namespace llarp
void
TransitHop::DownstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
{
m_DownstreamWorkCounter++;
auto flushIt = [self = shared_from_this(), r]() {
std::vector<RelayDownstreamMessage> msgs;
do
@ -161,7 +161,6 @@ namespace llarp
void
TransitHop::UpstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
{
m_UpstreamWorkCounter++;
auto flushIt = [self = shared_from_this(), r]() {
std::vector<RelayUpstreamMessage> msgs;
do
@ -251,19 +250,28 @@ namespace llarp
void
TransitHop::FlushUpstream(AbstractRouter* r)
{
if (m_UpstreamQueue && !m_UpstreamQueue->empty())
r->threadpool()->addJob(std::bind(
&TransitHop::UpstreamWork, shared_from_this(), std::move(m_UpstreamQueue), r));
if (m_UpstreamQueue && not m_UpstreamQueue->empty())
{
if (r->threadpool()->addJob(std::bind(
&TransitHop::UpstreamWork, shared_from_this(), std::move(m_UpstreamQueue), r)))
{
m_UpstreamWorkCounter++;
}
}
m_UpstreamQueue = nullptr;
}
void
TransitHop::FlushDownstream(AbstractRouter* r)
{
if (m_DownstreamQueue && !m_DownstreamQueue->empty())
r->threadpool()->addJob(std::bind(
&TransitHop::DownstreamWork, shared_from_this(), std::move(m_DownstreamQueue), r));
if (m_DownstreamQueue && not m_DownstreamQueue->empty())
{
if (r->threadpool()->addJob(std::bind(
&TransitHop::DownstreamWork, shared_from_this(), std::move(m_DownstreamQueue), r)))
{
m_DownstreamWorkCounter++;
}
}
m_DownstreamQueue = nullptr;
}

@ -153,12 +153,11 @@ namespace llarp
void
Router::PumpLL()
{
static constexpr size_t PumpJobThreshhold = 50;
static constexpr auto PumpInterval = 25ms;
const auto now = Now();
if (_stopping.load())
return;
if (_logic->numPendingJobs() >= PumpJobThreshhold && _lastPump + PumpInterval >= now)
if (_lastPump + PumpInterval >= now)
{
return;
}

@ -6,20 +6,6 @@
namespace llarp
{
Logic::Logic(size_t)
{
}
Logic::~Logic()
{
}
size_t
Logic::numPendingJobs() const
{
return 0;
}
bool
Logic::queue_job(struct llarp_thread_job job)
{

@ -11,10 +11,6 @@ namespace llarp
class Logic
{
public:
Logic(size_t queueLength = size_t{1024 * 8});
~Logic();
/// stop all operation and wait for that to die
void
stop();
@ -34,9 +30,6 @@ namespace llarp
void
remove_call(uint32_t id);
size_t
numPendingJobs() const;
bool
can_flush() const;

@ -12,8 +12,6 @@ You can view documentation on how to get started [here](https://loki-project.git
[![Build Status](https://drone.lokinet.dev/api/badges/loki-project/loki-network/status.svg?ref=refs/heads/master)](https://drone.lokinet.dev/loki-project/loki-network)
You can find Bleeding edge builds [here](https://builds.lokinet.dev) (these builds may eat your first born).
## Usage
See the [documentation](https://loki-project.github.io/loki-docs/Lokinet/LokinetOverview/) on how to get started.

Loading…
Cancel
Save