use limited size queue for logic and singled threaded threadpool

pull/618/head
Jeff Becker 5 years ago
parent 64c7ed42fc
commit c355d37beb
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -12,8 +12,8 @@ namespace llarp
llarp_ev_loop_ptr clientLoop, Logic_ptr clientLogic,
IQueryHandler* h)
: m_ServerLoop(serverLoop)
, m_ServerLogic(serverLogic)
, m_ClientLoop(clientLoop)
, m_ServerLogic(serverLogic)
, m_ClientLogic(clientLogic)
, m_QueryHandler(h)
{

@ -30,7 +30,7 @@ namespace llarp
Logic::queue_job(struct llarp_thread_job job)
{
if(job.user && job.work)
llarp_threadpool_queue_job(this->thread, {job.user, job.work});
queue_func(std::bind(job.work, job.user));
}
void
@ -58,7 +58,26 @@ namespace llarp
bool
Logic::queue_func(std::function< void(void) > f)
{
return this->thread->QueueFunc(f);
size_t left = 1000;
while(!this->thread->QueueFunc(f))
{
// our queue is full
if(this->can_flush())
{
// we can flush the queue here so let's do it
this->tick(llarp::time_now_ms());
}
else
{
// wait a bit and retry queuing because we are not in the same thread as
// we are calling the jobs in
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
left--;
if(left == 0) // too many retries
return false;
}
return true;
}
uint32_t
@ -83,4 +102,10 @@ namespace llarp
llarp_timer_remove_job(this->timer, id);
}
bool
Logic::can_flush() const
{
return ourPID && ourPID == ::getpid();
}
} // namespace llarp

@ -12,14 +12,17 @@ namespace llarp
public:
struct llarp_threadpool* thread;
struct llarp_timer_context* timer;
const pid_t ourPID;
Logic()
: thread(llarp_init_same_process_threadpool())
, timer(llarp_init_timer())
, ourPID(::getpid())
{
}
Logic(struct llarp_threadpool* tp) : thread(tp), timer(llarp_init_timer())
Logic(struct llarp_threadpool* tp)
: thread(tp), timer(llarp_init_timer()), ourPID(0)
{
}
@ -54,6 +57,9 @@ namespace llarp
void
remove_call(uint32_t id);
bool
can_flush() const;
};
} // namespace llarp

@ -41,6 +41,8 @@ llarp_threadpool_stop(struct llarp_threadpool *pool)
llarp::LogDebug("threadpool stop");
if(pool->impl)
pool->impl->stop();
if(pool->jobs)
pool->jobs->disable();
}
void
@ -67,8 +69,16 @@ llarp_threadpool_queue_job(struct llarp_threadpool *pool,
else
{
// single threaded mode
llarp::util::Lock lock(&pool->m_access);
pool->jobs.emplace(std::bind(job.work, job.user));
while(pool->jobs->tryPushBack(std::bind(job.work, job.user))
!= llarp::thread::QueueReturn::Success)
{
if(!pool->jobs->enabled())
return;
if(::getpid() == pool->callingPID)
llarp_threadpool_tick(pool);
else
std::this_thread::sleep_for(std::chrono::microseconds(1000));
}
}
}
@ -77,15 +87,10 @@ llarp_threadpool_tick(struct llarp_threadpool *pool)
{
while(pool->size())
{
std::function< void(void) > job;
{
llarp::util::Lock lock(&pool->m_access);
job = std::move(pool->jobs.front());
pool->jobs.pop();
}
auto job = pool->jobs->tryPopFront();
if(job)
{
job();
(*job)();
}
}
}

@ -3,6 +3,7 @@
#include <util/threading.hpp>
#include <util/threadpool.hpp>
#include <util/queue.hpp>
#include <absl/base/thread_annotations.h>
#include <memory>
@ -11,39 +12,40 @@
struct llarp_threadpool
{
std::unique_ptr< llarp::thread::ThreadPool > impl;
mutable llarp::util::Mutex m_access; // protects jobs
std::queue< std::function< void(void) > > jobs GUARDED_BY(m_access);
std::unique_ptr< llarp::thread::Queue< std::function< void(void) > > > jobs;
const pid_t callingPID;
llarp_threadpool(int workers, const char *name)
: impl(
std::make_unique< llarp::thread::ThreadPool >(workers, workers * 128))
, jobs(nullptr)
, callingPID(0)
{
(void)name;
}
llarp_threadpool()
: jobs(new llarp::thread::Queue< std::function< void(void) > >(128))
, callingPID(::getpid())
{
jobs->enable();
}
size_t
size() const LOCKS_EXCLUDED(m_access)
size() const
{
absl::ReaderMutexLock l(&m_access);
return jobs.size();
if(jobs)
return jobs->size();
return 0;
}
bool
QueueFunc(std::function< void(void) > f) LOCKS_EXCLUDED(m_access)
QueueFunc(std::function< void(void) > f)
{
if(impl)
return impl->tryAddJob(f);
else
{
llarp::util::Lock lock(&m_access);
jobs.emplace(f);
return true;
}
return jobs->tryPushBack(f) == llarp::thread::QueueReturn::Success;
}
};
@ -54,14 +56,6 @@ llarp_init_threadpool(int workers, const char *name);
struct llarp_threadpool *
llarp_init_same_process_threadpool();
typedef bool (*setup_net_func)(void *, bool);
typedef void (*run_main_func)(void *);
/// for network isolation
struct llarp_threadpool *
llarp_init_isolated_net_threadpool(const char *name, setup_net_func setupNet,
run_main_func runMain, void *context);
void
llarp_free_threadpool(struct llarp_threadpool **tp);

Loading…
Cancel
Save