From c355d37beb7bfc3b4b3fe2dc4d55eb5102ea21e7 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Wed, 22 May 2019 13:18:19 -0400 Subject: [PATCH] use limited size queue for logic and singled threaded threadpool --- llarp/dns/server.cpp | 2 +- llarp/util/logic.cpp | 29 +++++++++++++++++++++++++++-- llarp/util/logic.hpp | 8 +++++++- llarp/util/threadpool.cpp | 23 ++++++++++++++--------- llarp/util/threadpool.h | 34 ++++++++++++++-------------------- 5 files changed, 63 insertions(+), 33 deletions(-) diff --git a/llarp/dns/server.cpp b/llarp/dns/server.cpp index 17d84e9ce..c1d6230a8 100644 --- a/llarp/dns/server.cpp +++ b/llarp/dns/server.cpp @@ -12,8 +12,8 @@ namespace llarp llarp_ev_loop_ptr clientLoop, Logic_ptr clientLogic, IQueryHandler* h) : m_ServerLoop(serverLoop) - , m_ServerLogic(serverLogic) , m_ClientLoop(clientLoop) + , m_ServerLogic(serverLogic) , m_ClientLogic(clientLogic) , m_QueryHandler(h) { diff --git a/llarp/util/logic.cpp b/llarp/util/logic.cpp index a0475b2af..d04cb7bc2 100644 --- a/llarp/util/logic.cpp +++ b/llarp/util/logic.cpp @@ -30,7 +30,7 @@ namespace llarp Logic::queue_job(struct llarp_thread_job job) { if(job.user && job.work) - llarp_threadpool_queue_job(this->thread, {job.user, job.work}); + queue_func(std::bind(job.work, job.user)); } void @@ -58,7 +58,26 @@ namespace llarp bool Logic::queue_func(std::function< void(void) > f) { - return this->thread->QueueFunc(f); + size_t left = 1000; + while(!this->thread->QueueFunc(f)) + { + // our queue is full + if(this->can_flush()) + { + // we can flush the queue here so let's do it + this->tick(llarp::time_now_ms()); + } + else + { + // wait a bit and retry queuing because we are not in the same thread as + // we are calling the jobs in + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } + left--; + if(left == 0) // too many retries + return false; + } + return true; } uint32_t @@ -83,4 +102,10 @@ namespace llarp llarp_timer_remove_job(this->timer, id); } + bool + Logic::can_flush() const + { + return ourPID && ourPID == ::getpid(); + } + } // namespace llarp diff --git a/llarp/util/logic.hpp b/llarp/util/logic.hpp index 42e4e1598..c39de9982 100644 --- a/llarp/util/logic.hpp +++ b/llarp/util/logic.hpp @@ -12,14 +12,17 @@ namespace llarp public: struct llarp_threadpool* thread; struct llarp_timer_context* timer; + const pid_t ourPID; Logic() : thread(llarp_init_same_process_threadpool()) , timer(llarp_init_timer()) + , ourPID(::getpid()) { } - Logic(struct llarp_threadpool* tp) : thread(tp), timer(llarp_init_timer()) + Logic(struct llarp_threadpool* tp) + : thread(tp), timer(llarp_init_timer()), ourPID(0) { } @@ -54,6 +57,9 @@ namespace llarp void remove_call(uint32_t id); + + bool + can_flush() const; }; } // namespace llarp diff --git a/llarp/util/threadpool.cpp b/llarp/util/threadpool.cpp index 93244532c..5072a7579 100644 --- a/llarp/util/threadpool.cpp +++ b/llarp/util/threadpool.cpp @@ -41,6 +41,8 @@ llarp_threadpool_stop(struct llarp_threadpool *pool) llarp::LogDebug("threadpool stop"); if(pool->impl) pool->impl->stop(); + if(pool->jobs) + pool->jobs->disable(); } void @@ -67,8 +69,16 @@ llarp_threadpool_queue_job(struct llarp_threadpool *pool, else { // single threaded mode - llarp::util::Lock lock(&pool->m_access); - pool->jobs.emplace(std::bind(job.work, job.user)); + while(pool->jobs->tryPushBack(std::bind(job.work, job.user)) + != llarp::thread::QueueReturn::Success) + { + if(!pool->jobs->enabled()) + return; + if(::getpid() == pool->callingPID) + llarp_threadpool_tick(pool); + else + std::this_thread::sleep_for(std::chrono::microseconds(1000)); + } } } @@ -77,15 +87,10 @@ llarp_threadpool_tick(struct llarp_threadpool *pool) { while(pool->size()) { - std::function< void(void) > job; - { - llarp::util::Lock lock(&pool->m_access); - job = std::move(pool->jobs.front()); - pool->jobs.pop(); - } + auto job = pool->jobs->tryPopFront(); if(job) { - job(); + (*job)(); } } } diff --git a/llarp/util/threadpool.h b/llarp/util/threadpool.h index 095eaf5ec..396e3a35f 100644 --- a/llarp/util/threadpool.h +++ b/llarp/util/threadpool.h @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -11,39 +12,40 @@ struct llarp_threadpool { std::unique_ptr< llarp::thread::ThreadPool > impl; - - mutable llarp::util::Mutex m_access; // protects jobs - std::queue< std::function< void(void) > > jobs GUARDED_BY(m_access); + std::unique_ptr< llarp::thread::Queue< std::function< void(void) > > > jobs; + const pid_t callingPID; llarp_threadpool(int workers, const char *name) : impl( std::make_unique< llarp::thread::ThreadPool >(workers, workers * 128)) + , jobs(nullptr) + , callingPID(0) { (void)name; } llarp_threadpool() + : jobs(new llarp::thread::Queue< std::function< void(void) > >(128)) + , callingPID(::getpid()) { + jobs->enable(); } size_t - size() const LOCKS_EXCLUDED(m_access) + size() const { - absl::ReaderMutexLock l(&m_access); - return jobs.size(); + if(jobs) + return jobs->size(); + return 0; } bool - QueueFunc(std::function< void(void) > f) LOCKS_EXCLUDED(m_access) + QueueFunc(std::function< void(void) > f) { if(impl) return impl->tryAddJob(f); else - { - llarp::util::Lock lock(&m_access); - jobs.emplace(f); - return true; - } + return jobs->tryPushBack(f) == llarp::thread::QueueReturn::Success; } }; @@ -54,14 +56,6 @@ llarp_init_threadpool(int workers, const char *name); struct llarp_threadpool * llarp_init_same_process_threadpool(); -typedef bool (*setup_net_func)(void *, bool); -typedef void (*run_main_func)(void *); - -/// for network isolation -struct llarp_threadpool * -llarp_init_isolated_net_threadpool(const char *name, setup_net_func setupNet, - run_main_func runMain, void *context); - void llarp_free_threadpool(struct llarp_threadpool **tp);