2018-11-17 21:07:04 +00:00
|
|
|
#ifndef LLARP_THREAD_POOL_HPP
|
|
|
|
#define LLARP_THREAD_POOL_HPP
|
|
|
|
|
2019-07-09 00:06:22 +00:00
|
|
|
#include <util/string_view.hpp>
|
2019-09-01 13:26:16 +00:00
|
|
|
#include <util/thread/queue.hpp>
|
|
|
|
#include <util/thread/threading.hpp>
|
2018-11-17 21:07:04 +00:00
|
|
|
|
2019-03-03 20:51:47 +00:00
|
|
|
#include <atomic>
|
2018-11-17 21:07:04 +00:00
|
|
|
#include <functional>
|
|
|
|
#include <thread>
|
|
|
|
#include <vector>
|
|
|
|
|
|
|
|
namespace llarp
|
|
|
|
{
|
|
|
|
namespace thread
|
|
|
|
{
|
|
|
|
class ThreadPool
|
|
|
|
{
|
|
|
|
// Provide an efficient fixed size threadpool. The following attributes
|
|
|
|
// of the threadpool are fixed at construction time:
|
|
|
|
// - the max number of pending jobs
|
|
|
|
// - the number of threads
|
|
|
|
public:
|
|
|
|
using Job = std::function< void() >;
|
|
|
|
using JobQueue = Queue< Job >;
|
|
|
|
|
|
|
|
enum class Status
|
|
|
|
{
|
|
|
|
Stop,
|
|
|
|
Run,
|
|
|
|
Suspend,
|
|
|
|
Drain
|
|
|
|
};
|
|
|
|
|
|
|
|
private:
|
|
|
|
JobQueue m_queue; // The job queue
|
|
|
|
util::Semaphore m_semaphore; // The semaphore for the queue.
|
|
|
|
|
|
|
|
std::atomic_size_t m_idleThreads; // Number of idle threads
|
|
|
|
|
2019-03-03 20:51:47 +00:00
|
|
|
util::Mutex m_mutex;
|
2018-11-17 21:07:04 +00:00
|
|
|
|
|
|
|
std::atomic< Status > m_status;
|
|
|
|
|
2019-03-03 20:51:47 +00:00
|
|
|
size_t m_gateCount GUARDED_BY(m_gateMutex);
|
|
|
|
size_t m_numThreadsReady
|
|
|
|
GUARDED_BY(m_gateMutex); // Threads ready to go through the gate.
|
2018-11-17 21:07:04 +00:00
|
|
|
|
De-abseil, part 2: mutex, locks, (most) time
- util::Mutex is now a std::shared_timed_mutex, which is capable of
exclusive and shared locks.
- util::Lock is still present as a std::lock_guard<util::Mutex>.
- the locking annotations are preserved, but updated to the latest
supported by clang rather than using abseil's older/deprecated ones.
- ACQUIRE_LOCK macro is gone since we don't pass mutexes by pointer into
locks anymore (WTF abseil).
- ReleasableLock is gone. Instead there are now some llarp::util helper
methods to obtain unique and/or shared locks:
- `auto lock = util::unique_lock(mutex);` gets an RAII-but-also
unlockable object (std::unique_lock<T>, with T inferred from
`mutex`).
- `auto lock = util::shared_lock(mutex);` gets an RAII shared (i.e.
"reader") lock of the mutex.
- `auto lock = util::unique_locks(mutex1, mutex2, mutex3);` can be
used to atomically lock multiple mutexes at once (returning a
tuple of the locks).
This are templated on the mutex which makes them a bit more flexible
than using a concrete type: they can be used for any type of lockable
mutex, not only util::Mutex. (Some of the code here uses them for
getting locks around a std::mutex). Until C++17, using the RAII types
is painfully verbose:
```C++
// pre-C++17 - needing to figure out the mutex type here is annoying:
std::unique_lock<util::Mutex> lock(mutex);
// pre-C++17 and even more verbose (but at least the type isn't needed):
std::unique_lock<decltype(mutex)> lock(mutex);
// our compromise:
auto lock = util::unique_lock(mutex);
// C++17:
std::unique_lock lock(mutex);
```
All of these functions will also warn (under gcc or clang) if you
discard the return value. You can also do fancy things like
`auto l = util::unique_lock(mutex, std::adopt_lock)` (which lets a
lock take over an already-locked mutex).
- metrics code is gone, which also removes a big pile of code that was
only used by metrics:
- llarp::util::Scheduler
- llarp::thread::TimerQueue
- llarp::util::Stopwatch
2020-02-21 17:21:11 +00:00
|
|
|
std::mutex m_gateMutex;
|
|
|
|
std::condition_variable m_gateCV;
|
|
|
|
std::condition_variable m_numThreadsCV;
|
2018-11-17 21:07:04 +00:00
|
|
|
|
2019-07-09 00:06:22 +00:00
|
|
|
std::string m_name;
|
2018-11-17 21:07:04 +00:00
|
|
|
std::vector< std::thread > m_threads;
|
|
|
|
size_t m_createdThreads;
|
|
|
|
|
|
|
|
void
|
|
|
|
join();
|
|
|
|
|
|
|
|
void
|
|
|
|
runJobs();
|
|
|
|
|
|
|
|
void
|
|
|
|
drainQueue();
|
|
|
|
|
|
|
|
void
|
|
|
|
waitThreads();
|
|
|
|
|
|
|
|
void
|
|
|
|
releaseThreads();
|
|
|
|
|
|
|
|
void
|
|
|
|
interrupt();
|
|
|
|
|
|
|
|
void
|
|
|
|
worker();
|
|
|
|
|
|
|
|
bool
|
|
|
|
spawn();
|
|
|
|
|
2019-03-03 20:51:47 +00:00
|
|
|
bool
|
De-abseil, part 2: mutex, locks, (most) time
- util::Mutex is now a std::shared_timed_mutex, which is capable of
exclusive and shared locks.
- util::Lock is still present as a std::lock_guard<util::Mutex>.
- the locking annotations are preserved, but updated to the latest
supported by clang rather than using abseil's older/deprecated ones.
- ACQUIRE_LOCK macro is gone since we don't pass mutexes by pointer into
locks anymore (WTF abseil).
- ReleasableLock is gone. Instead there are now some llarp::util helper
methods to obtain unique and/or shared locks:
- `auto lock = util::unique_lock(mutex);` gets an RAII-but-also
unlockable object (std::unique_lock<T>, with T inferred from
`mutex`).
- `auto lock = util::shared_lock(mutex);` gets an RAII shared (i.e.
"reader") lock of the mutex.
- `auto lock = util::unique_locks(mutex1, mutex2, mutex3);` can be
used to atomically lock multiple mutexes at once (returning a
tuple of the locks).
This are templated on the mutex which makes them a bit more flexible
than using a concrete type: they can be used for any type of lockable
mutex, not only util::Mutex. (Some of the code here uses them for
getting locks around a std::mutex). Until C++17, using the RAII types
is painfully verbose:
```C++
// pre-C++17 - needing to figure out the mutex type here is annoying:
std::unique_lock<util::Mutex> lock(mutex);
// pre-C++17 and even more verbose (but at least the type isn't needed):
std::unique_lock<decltype(mutex)> lock(mutex);
// our compromise:
auto lock = util::unique_lock(mutex);
// C++17:
std::unique_lock lock(mutex);
```
All of these functions will also warn (under gcc or clang) if you
discard the return value. You can also do fancy things like
`auto l = util::unique_lock(mutex, std::adopt_lock)` (which lets a
lock take over an already-locked mutex).
- metrics code is gone, which also removes a big pile of code that was
only used by metrics:
- llarp::util::Scheduler
- llarp::thread::TimerQueue
- llarp::util::Stopwatch
2020-02-21 17:21:11 +00:00
|
|
|
allThreadsReady() const REQUIRES_SHARED(m_gateMutex)
|
2019-03-03 20:51:47 +00:00
|
|
|
{
|
|
|
|
return m_numThreadsReady == m_threads.size();
|
|
|
|
}
|
|
|
|
|
2018-11-17 21:07:04 +00:00
|
|
|
public:
|
2019-07-09 00:06:22 +00:00
|
|
|
ThreadPool(size_t numThreads, size_t maxJobs, string_view name);
|
2018-11-17 21:07:04 +00:00
|
|
|
|
|
|
|
~ThreadPool();
|
|
|
|
|
|
|
|
// Disable the threadpool. Calls to `addJob` and `tryAddJob` will fail.
|
|
|
|
// Jobs currently in the pool will not be affected.
|
|
|
|
void
|
|
|
|
disable();
|
|
|
|
|
|
|
|
void
|
|
|
|
enable();
|
|
|
|
|
|
|
|
// Add a job to the bool. Note this call will block if the underlying
|
|
|
|
// queue is full.
|
|
|
|
// Returns false if the queue is currently disabled.
|
|
|
|
bool
|
|
|
|
addJob(const Job& job);
|
|
|
|
bool
|
|
|
|
addJob(Job&& job);
|
|
|
|
|
|
|
|
// Try to add a job to the pool. If the queue is full, or the queue is
|
|
|
|
// disabled, return false.
|
|
|
|
// This call will not block.
|
|
|
|
bool
|
|
|
|
tryAddJob(const Job& job);
|
|
|
|
bool
|
|
|
|
tryAddJob(Job&& job);
|
|
|
|
|
|
|
|
// Wait until all current jobs are complete.
|
|
|
|
// If any jobs are submitted during this time, they **may** or **may not**
|
|
|
|
// run.
|
|
|
|
void
|
|
|
|
drain();
|
|
|
|
|
|
|
|
// Disable this pool, and cancel all pending jobs. After all currently
|
|
|
|
// running jobs are complete, join with the threads in the pool.
|
|
|
|
void
|
|
|
|
shutdown();
|
|
|
|
|
|
|
|
// Start this threadpool by spawning `threadCount()` threads.
|
|
|
|
bool
|
|
|
|
start();
|
|
|
|
|
|
|
|
// Disable queueing on this threadpool and wait until all pending jobs
|
|
|
|
// have finished.
|
|
|
|
void
|
|
|
|
stop();
|
|
|
|
|
|
|
|
bool
|
|
|
|
enabled() const;
|
|
|
|
|
|
|
|
bool
|
|
|
|
started() const;
|
|
|
|
|
|
|
|
size_t
|
|
|
|
activeThreadCount() const;
|
|
|
|
|
|
|
|
// Current number of queued jobs
|
|
|
|
size_t
|
|
|
|
jobCount() const;
|
|
|
|
|
|
|
|
// Number of threads passed in the constructor
|
|
|
|
size_t
|
|
|
|
threadCount() const;
|
|
|
|
|
|
|
|
// Number of threads currently started in the threadpool
|
|
|
|
size_t
|
|
|
|
startedThreadCount() const;
|
|
|
|
|
|
|
|
// Max number of queued jobs
|
|
|
|
size_t
|
|
|
|
capacity() const;
|
|
|
|
};
|
|
|
|
|
|
|
|
inline void
|
|
|
|
ThreadPool::disable()
|
|
|
|
{
|
|
|
|
m_queue.disable();
|
|
|
|
}
|
|
|
|
|
|
|
|
inline void
|
|
|
|
ThreadPool::enable()
|
|
|
|
{
|
|
|
|
m_queue.enable();
|
|
|
|
}
|
|
|
|
|
|
|
|
inline bool
|
|
|
|
ThreadPool::enabled() const
|
|
|
|
{
|
|
|
|
return m_queue.enabled();
|
|
|
|
}
|
|
|
|
|
|
|
|
inline size_t
|
|
|
|
ThreadPool::activeThreadCount() const
|
|
|
|
{
|
|
|
|
if(m_threads.size() == m_createdThreads)
|
|
|
|
{
|
|
|
|
return m_threads.size() - m_idleThreads.load(std::memory_order_relaxed);
|
|
|
|
}
|
2019-07-06 17:03:40 +00:00
|
|
|
|
|
|
|
return 0;
|
2018-11-17 21:07:04 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
inline size_t
|
|
|
|
ThreadPool::threadCount() const
|
|
|
|
{
|
|
|
|
return m_threads.size();
|
|
|
|
}
|
|
|
|
|
|
|
|
inline size_t
|
|
|
|
ThreadPool::startedThreadCount() const
|
|
|
|
{
|
|
|
|
return m_createdThreads;
|
|
|
|
}
|
|
|
|
|
|
|
|
inline size_t
|
|
|
|
ThreadPool::jobCount() const
|
|
|
|
{
|
|
|
|
return m_queue.size();
|
|
|
|
}
|
|
|
|
|
|
|
|
inline size_t
|
|
|
|
ThreadPool::capacity() const
|
|
|
|
{
|
|
|
|
return m_queue.capacity();
|
|
|
|
}
|
|
|
|
} // namespace thread
|
|
|
|
} // namespace llarp
|
|
|
|
|
|
|
|
#endif
|