lokinet/llarp/util/thread/thread_pool.cpp
Jason Rhinelander b4440094b0 De-abseil, part 2: mutex, locks, (most) time
- util::Mutex is now a std::shared_timed_mutex, which is capable of
  exclusive and shared locks.

- util::Lock is still present as a std::lock_guard<util::Mutex>.

- the locking annotations are preserved, but updated to the latest
  supported by clang rather than using abseil's older/deprecated ones.

- ACQUIRE_LOCK macro is gone since we don't pass mutexes by pointer into
  locks anymore (WTF abseil).

- ReleasableLock is gone.  Instead there are now some llarp::util helper
  methods to obtain unique and/or shared locks:
    - `auto lock = util::unique_lock(mutex);` gets an RAII-but-also
      unlockable object (std::unique_lock<T>, with T inferred from
      `mutex`).
    - `auto lock = util::shared_lock(mutex);` gets an RAII shared (i.e.
      "reader") lock of the mutex.
    - `auto lock = util::unique_locks(mutex1, mutex2, mutex3);` can be
      used to atomically lock multiple mutexes at once (returning a
      tuple of the locks).
  This are templated on the mutex which makes them a bit more flexible
  than using a concrete type: they can be used for any type of lockable
  mutex, not only util::Mutex.  (Some of the code here uses them for
  getting locks around a std::mutex).  Until C++17, using the RAII types
  is painfully verbose:

  ```C++
  // pre-C++17 - needing to figure out the mutex type here is annoying:
  std::unique_lock<util::Mutex> lock(mutex);
  // pre-C++17 and even more verbose (but at least the type isn't needed):
  std::unique_lock<decltype(mutex)> lock(mutex);
  // our compromise:
  auto lock = util::unique_lock(mutex);
  // C++17:
  std::unique_lock lock(mutex);
  ```

  All of these functions will also warn (under gcc or clang) if you
  discard the return value.  You can also do fancy things like
  `auto l = util::unique_lock(mutex, std::adopt_lock)` (which lets a
  lock take over an already-locked mutex).

- metrics code is gone, which also removes a big pile of code that was
  only used by metrics:
  - llarp::util::Scheduler
  - llarp:🧵:TimerQueue
  - llarp::util::Stopwatch
2020-02-21 23:22:47 -04:00

335 lines
6.4 KiB
C++

#include <util/thread/thread_pool.hpp>
#include <util/thread/threading.hpp>
namespace llarp
{
namespace thread
{
void
ThreadPool::join()
{
for(auto& t : m_threads)
{
if(t.joinable())
{
t.join();
}
}
m_createdThreads = 0;
}
void
ThreadPool::runJobs()
{
while(m_status.load(std::memory_order_relaxed) == Status::Run)
{
auto functor = m_queue.tryPopFront();
if(functor.has_value())
{
functor.value()();
}
else
{
m_idleThreads++;
if(m_status == Status::Run && m_queue.empty())
{
m_semaphore.wait();
}
m_idleThreads.fetch_sub(1, std::memory_order_relaxed);
}
}
}
void
ThreadPool::drainQueue()
{
while(m_status.load(std::memory_order_relaxed) == Status::Drain)
{
auto functor = m_queue.tryPopFront();
if(!functor)
{
return;
}
functor.value()();
}
}
void
ThreadPool::waitThreads()
{
std::unique_lock< std::mutex > lock(m_gateMutex);
m_numThreadsCV.wait(lock, [this] { return allThreadsReady(); });
}
void
ThreadPool::releaseThreads()
{
{
std::lock_guard< std::mutex > lock(m_gateMutex);
m_numThreadsReady = 0;
++m_gateCount;
}
m_gateCV.notify_all();
}
void
ThreadPool::interrupt()
{
std::lock_guard< std::mutex > lock(m_gateMutex);
size_t count = m_idleThreads;
for(size_t i = 0; i < count; ++i)
{
m_semaphore.notify();
}
}
void
ThreadPool::worker()
{
// Lock will be valid until the end of the statement
size_t gateCount =
(std::lock_guard< std::mutex >(m_gateMutex), m_gateCount);
util::SetThreadName(m_name);
for(;;)
{
{
std::unique_lock< std::mutex > lock(m_gateMutex);
++m_numThreadsReady;
m_numThreadsCV.notify_one();
m_gateCV.wait(lock, [&] { return gateCount != m_gateCount; });
gateCount = m_gateCount;
}
Status status = m_status.load(std::memory_order_relaxed);
// Can't use a switch here as we want to load and fall through.
if(status == Status::Run)
{
runJobs();
status = m_status;
}
if(status == Status::Drain)
{
drainQueue();
}
else if(status == Status::Suspend)
{
continue;
}
else
{
assert(status == Status::Stop);
return;
}
}
}
bool
ThreadPool::spawn()
{
try
{
m_threads.at(m_createdThreads) =
std::thread(std::bind(&ThreadPool::worker, this));
++m_createdThreads;
return true;
}
catch(const std::system_error&)
{
return false;
}
}
ThreadPool::ThreadPool(size_t numThreads, size_t maxJobs, string_view name)
: m_queue(maxJobs)
, m_semaphore(0)
, m_idleThreads(0)
, m_status(Status::Stop)
, m_gateCount(0)
, m_numThreadsReady(0)
, m_name(name)
, m_threads(numThreads)
, m_createdThreads(0)
{
assert(numThreads != 0);
assert(maxJobs != 0);
disable();
}
ThreadPool::~ThreadPool()
{
shutdown();
}
bool
ThreadPool::addJob(const Job& job)
{
assert(job);
QueueReturn ret = m_queue.pushBack(job);
if(ret == QueueReturn::Success && m_idleThreads > 0)
{
m_semaphore.notify();
}
return ret == QueueReturn::Success;
}
bool
ThreadPool::addJob(Job&& job)
{
assert(job);
QueueReturn ret = m_queue.pushBack(std::move(job));
if(ret == QueueReturn::Success && m_idleThreads > 0)
{
m_semaphore.notify();
}
return ret == QueueReturn::Success;
}
bool
ThreadPool::tryAddJob(const Job& job)
{
assert(job);
QueueReturn ret = m_queue.tryPushBack(job);
if(ret == QueueReturn::Success && m_idleThreads > 0)
{
m_semaphore.notify();
}
return ret == QueueReturn::Success;
}
bool
ThreadPool::tryAddJob(Job&& job)
{
assert(job);
QueueReturn ret = m_queue.tryPushBack(std::move(job));
if(ret == QueueReturn::Success && m_idleThreads > 0)
{
m_semaphore.notify();
}
return ret == QueueReturn::Success;
}
void
ThreadPool::drain()
{
util::Lock lock(m_mutex);
if(m_status.load(std::memory_order_relaxed) == Status::Run)
{
m_status = Status::Drain;
interrupt();
waitThreads();
m_status = Status::Run;
releaseThreads();
}
}
void
ThreadPool::shutdown()
{
util::Lock lock(m_mutex);
if(m_status.load(std::memory_order_relaxed) == Status::Run)
{
m_queue.disable();
m_status = Status::Stop;
interrupt();
m_queue.removeAll();
join();
}
}
bool
ThreadPool::start()
{
util::Lock lock(m_mutex);
if(m_status.load(std::memory_order_relaxed) != Status::Stop)
{
return true;
}
for(auto it = (m_threads.begin() + m_createdThreads);
it != m_threads.end(); ++it)
{
if(!spawn())
{
releaseThreads();
join();
return false;
}
}
waitThreads();
m_queue.enable();
m_status = Status::Run;
// `releaseThreads` has a release barrier so workers don't return from
// wait and not see the above store.
releaseThreads();
return true;
}
void
ThreadPool::stop()
{
util::Lock lock(m_mutex);
if(m_status.load(std::memory_order_relaxed) == Status::Run)
{
m_queue.disable();
m_status = Status::Drain;
// `interrupt` has an acquire barrier (locks a mutex), so nothing will
// be executed before the above store to `status`.
interrupt();
waitThreads();
m_status = Status::Stop;
// `releaseThreads` has a release barrier so workers don't return from
// wait and not see the above store.
releaseThreads();
join();
}
}
} // namespace thread
} // namespace llarp