From 2ff513f0cd9ce9852a7aea889fd1673c29723dce Mon Sep 17 00:00:00 2001 From: Jonathan G Rennison Date: Sun, 9 Apr 2023 01:34:04 +0100 Subject: [PATCH] Worker thread pool: Fix worker wake CV notify condition The worker wake CV was only notified when the job queue was empty. Now notify if the number of queued jobs is less than the number of waiting workers. --- src/worker_thread.cpp | 10 ++++++---- src/worker_thread.h | 3 ++- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/worker_thread.cpp b/src/worker_thread.cpp index 64ef0325d1..a138973a76 100644 --- a/src/worker_thread.cpp +++ b/src/worker_thread.cpp @@ -42,7 +42,7 @@ void WorkerThreadPool::Stop() { std::unique_lock lk(this->lock); this->exit = true; - this->empty_cv.notify_all(); + this->worker_wait_cv.notify_all(); this->done_cv.wait(lk, [this]() { return this->workers == 0; }); } @@ -55,10 +55,10 @@ void WorkerThreadPool::EnqueueJob(WorkerJobFunc *func, void *data1, void *data2, func(data1, data2, data3); return; } - bool notify = this->jobs.empty(); + bool notify = this->jobs.size() < (size_t)this->workers_waiting; this->jobs.push({ func, data1, data2, data3 }); lk.unlock(); - if (notify) this->empty_cv.notify_one(); + if (notify) this->worker_wait_cv.notify_one(); } void WorkerThreadPool::Run(WorkerThreadPool *pool) @@ -66,7 +66,9 @@ void WorkerThreadPool::Run(WorkerThreadPool *pool) std::unique_lock lk(pool->lock); while (!pool->exit || !pool->jobs.empty()) { if (pool->jobs.empty()) { - pool->empty_cv.wait(lk); + pool->workers_waiting++; + pool->worker_wait_cv.wait(lk); + pool->workers_waiting--; } else { WorkerJob job = pool->jobs.front(); pool->jobs.pop(); diff --git a/src/worker_thread.h b/src/worker_thread.h index 2a2b2134a6..6000af3c78 100644 --- a/src/worker_thread.h +++ b/src/worker_thread.h @@ -30,10 +30,11 @@ private: }; uint workers = 0; + uint workers_waiting = 0; bool exit = false; std::mutex lock; std::queue jobs; - std::condition_variable empty_cv; + std::condition_variable worker_wait_cv; std::condition_variable done_cv; static void Run(WorkerThreadPool *pool);