From 7214f242e0992fc14a91827ca576b87b03b0503a Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Mon, 19 Nov 2018 06:56:40 -0500 Subject: [PATCH] rip out old threadpool code --- llarp/service/endpoint.cpp | 11 +- llarp/threadpool.cpp | 235 +++---------------------------------- llarp/threadpool.hpp | 60 +--------- 3 files changed, 22 insertions(+), 284 deletions(-) diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 037fa7f84..411bd0dca 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -38,11 +38,6 @@ namespace llarp if(addr.FromString(v)) m_PrefetchAddrs.insert(addr); } - if(k == "netns") - { - m_NetNS = v; - m_OnInit.push_back(std::bind(&Endpoint::IsolateNetwork, this)); - } if(k == "min-latency") { auto val = atoi(v.c_str()); @@ -55,11 +50,7 @@ namespace llarp bool Endpoint::IsolateNetwork() { - llarp::LogInfo("isolating network to namespace ", m_NetNS); - m_IsolatedWorker = llarp_init_isolated_net_threadpool( - m_NetNS.c_str(), &SetupIsolatedNetwork, &RunIsolatedMainLoop, this); - m_IsolatedLogic = llarp_init_single_process_logic(m_IsolatedWorker); - return true; + return false; } llarp_ev_loop* diff --git a/llarp/threadpool.cpp b/llarp/threadpool.cpp index c090d3db5..2959dfffe 100644 --- a/llarp/threadpool.cpp +++ b/llarp/threadpool.cpp @@ -1,5 +1,4 @@ #include "threadpool.hpp" -#include #include #include @@ -8,215 +7,21 @@ #include "logger.hpp" -#if(__FreeBSD__) || (__OpenBSD__) || (__NetBSD__) -#include -#endif - -#ifdef __linux__ -#ifndef ANDROID -#include -#endif -#endif - -namespace llarp -{ - namespace thread - { - void - Pool::Spawn(size_t workers, const char *name) - { - stop = false; - while(workers--) - { - threads.emplace_back([this, name] { - if(name) - { -#if(__APPLE__ && __MACH__) - pthread_setname_np(name); -#elif(__FreeBSD__) || (__OpenBSD__) || (__NetBSD__) - pthread_set_name_np(pthread_self(), name); -#elif(__linux__) || (__MINGW32__) - pthread_setname_np(pthread_self(), name); -#endif - } - for(;;) - { - Job_t job; - { - lock_t lock(this->queue_mutex); - this->condition.WaitUntil( - lock, [this] { return this->stop || !this->jobs.empty(); }); - if(this->stop) - { - // discard pending jobs - while(this->jobs.size()) - { - this->jobs.pop(); - } - return; - } - job = std::move(this->jobs.top()); - this->jobs.pop(); - } - // do work - job(); - } - }); - } - } - - void - Pool::Stop() - { - { - lock_t lock(queue_mutex); - stop = true; - } - condition.NotifyAll(); - } - - void - Pool::Join() - { - for(auto &t : threads) - t.join(); - threads.clear(); - done.NotifyAll(); - } - - void - Pool::QueueJob(const llarp_thread_job &job) - { - { - lock_t lock(queue_mutex); - - // don't allow enqueueing after stopping the pool - if(stop) - return; - jobs.emplace(ids++, job); - } - condition.NotifyOne(); - } - - void - IsolatedPool::Spawn(size_t workers, const char *name) - { - IsolatedPool *self = this; - self->IsolatedName = name; - self->m_IsolatedWorkers = workers; - m_isolated = new std::thread([self] { - if(!self->IsolateCurrentProcess()) - { - llarp::LogError("isolation failed: ", strerror(errno)); - self->Fail(); - return; - } - llarp::LogInfo("spawning isolated environment"); - self->Pool::Spawn(self->m_IsolatedWorkers, self->IsolatedName); - if(self->Isolated()) - { - self->MainLoop(); - } - }); - } - - void - IsolatedPool::Join() - { - Pool::Join(); - if(m_isolated) - { - m_isolated->join(); - delete m_isolated; - m_isolated = nullptr; - } - } - - _NetIsolatedPool::_NetIsolatedPool( - std::function< bool(void *, bool) > setupNet, - std::function< void(void *) > runMain, void *user) - : IsolatedPool(0) - - { - m_NetSetup = setupNet; - m_RunMain = runMain; - m_user = user; - } - -#ifdef __linux__ -#if defined(ANDROID) || defined(RPI) -#else - struct LinuxNetNSIsolatedPool : public _NetIsolatedPool - { - LinuxNetNSIsolatedPool(std::function< bool(void *, bool) > setup, - std::function< void(void *) > run, void *user) - : _NetIsolatedPool(setup, run, user) - { - } - - bool - IsolateNetwork() - { - return ::llarp::GNULinux::NetNSSwitch(IsolatedName); - } - }; - - typedef LinuxNetNSIsolatedPool NetIsolatedPool; -#define NET_ISOLATION_SUPPORTED -#endif -#endif - -#if defined(__FreeBSD__) - struct FreeBSDJailedThreadPool : public _NetIsolatedPool - { - FreeBSDJailedThreadPool(std::function< bool(void *, bool) > setup, - std::function< void(void *) > run, void *user) - : _NetIsolatedPool(setup, run, user) - { - } - - bool - IsolateNetwork() - { - // TODO: implement me - return false; - } - }; - typedef FreeBSDJailedThreadPool NetIsolatedPool; -#define NET_ISOLATION_SUPPORTED -#endif - - } // namespace thread -} // namespace llarp - struct llarp_threadpool { - llarp::thread::Pool *impl; + std::unique_ptr< llarp::thread::Pool > impl; llarp::util::Mutex m_access; uint32_t ids = 0; - std::queue< llarp::thread::Pool::Job_t > jobs; + std::queue< std::function< void(void) > > jobs; - llarp_threadpool(int workers, const char *name, bool isolate, - __attribute__((unused)) setup_net_func setup = nullptr, - __attribute__((unused)) run_main_func runmain = nullptr, - __attribute__((unused)) void *user = nullptr) + llarp_threadpool(int workers, const char *name) { -#ifdef NET_ISOLATION_SUPPORTED - if(isolate) - impl = new llarp::thread::NetIsolatedPool(setup, runmain, user); - else -#else - if(isolate) - { - llarp::LogError("network isolation not supported"); - } -#endif - impl = new llarp::thread::Pool(); - impl->Spawn(workers, name); + (void)name; + impl.reset(new llarp::thread::Pool(workers, workers * 128)); } - llarp_threadpool() : impl(nullptr) + llarp_threadpool() { } }; @@ -226,7 +31,7 @@ llarp_init_threadpool(int workers, const char *name) { if(workers <= 0) workers = 1; - return new llarp_threadpool(workers, name, false); + return new llarp_threadpool(workers, name); } struct llarp_threadpool * @@ -235,24 +40,19 @@ llarp_init_same_process_threadpool() return new llarp_threadpool(); } -struct llarp_threadpool * -llarp_init_isolated_net_threadpool(const char *name, setup_net_func setup, - run_main_func runmain, void *context) -{ - return new llarp_threadpool(1, name, true, setup, runmain, context); -} - void llarp_threadpool_join(struct llarp_threadpool *pool) { llarp::LogDebug("threadpool join"); if(pool->impl) - pool->impl->Join(); + pool->impl->drain(); } void -llarp_threadpool_start(__attribute__((unused)) struct llarp_threadpool *pool) -{ /** no op */ +llarp_threadpool_start(struct llarp_threadpool *pool) +{ + if(pool->impl) + pool->impl->start(); } void @@ -260,7 +60,7 @@ llarp_threadpool_stop(struct llarp_threadpool *pool) { llarp::LogDebug("threadpool stop"); if(pool->impl) - pool->impl->Stop(); + pool->impl->stop(); } void @@ -270,8 +70,7 @@ llarp_threadpool_wait(struct llarp_threadpool *pool) llarp::LogDebug("threadpool wait"); if(pool->impl) { - llarp::util::Lock lock(mtx); - pool->impl->done.Wait(lock); + pool->impl->drain(); } } @@ -280,12 +79,12 @@ llarp_threadpool_queue_job(struct llarp_threadpool *pool, struct llarp_thread_job job) { if(pool->impl) - pool->impl->QueueJob(job); + pool->impl->addJob(std::bind(job.work, job.user)); else { // single threaded mode llarp::util::Lock lock(pool->m_access); - pool->jobs.emplace(++pool->ids, job); + pool->jobs.emplace(std::bind(job.work, job.user)); } } @@ -294,7 +93,7 @@ llarp_threadpool_tick(struct llarp_threadpool *pool) { while(pool->jobs.size()) { - llarp::thread::Pool::Job_t job; + std::function< void(void) > job; { llarp::util::Lock lock(pool->m_access); job = std::move(pool->jobs.front()); diff --git a/llarp/threadpool.hpp b/llarp/threadpool.hpp index 353bfc56b..6d6398c78 100644 --- a/llarp/threadpool.hpp +++ b/llarp/threadpool.hpp @@ -4,11 +4,7 @@ #include #include -#include -#include - -#include -#include +#include "thread_pool.hpp" namespace llarp { @@ -16,64 +12,16 @@ namespace llarp { typedef util::Mutex mtx_t; typedef util::Lock lock_t; - struct Pool - { - virtual void - Spawn(size_t sz, const char* name); - - void - QueueJob(const llarp_thread_job& job); - - virtual void - Join(); - void - Stop(); - std::vector< std::thread > threads; - - struct Job_t - { - uint32_t id; - void* user; - llarp_thread_work_func work; - - Job_t() = default; - - Job_t(uint32_t jobid, const llarp_thread_job& j) - : id(jobid), user(j.user), work(j.work) - { - } - - bool - operator<(const Job_t& j) const - { - return id < j.id; - } - - void - operator()() const - { - work(user); - } - }; - - std::priority_queue< Job_t > jobs; - uint32_t ids = 0; - mtx_t queue_mutex; - util::Condition condition; - util::Condition done; - bool stop; - }; + using Pool = ThreadPool; struct IsolatedPool : public Pool { - IsolatedPool(int flags) : Pool(), m_flags(flags) + IsolatedPool(size_t workers, int flags) + : Pool(workers, workers * 128), m_flags(flags) { } - virtual void - Spawn(size_t workers, const char* name); - void Join();