Merge pull request #610 from michael-loki/more_thread_pool

Convert more things to use thread::ThreadPool
This commit is contained in:
Jeff 2019-05-19 17:16:14 -04:00 committed by GitHub
commit 65766a501f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 93 additions and 138 deletions

View File

@ -2,7 +2,7 @@
#if defined(_WIN32) #if defined(_WIN32)
/** put win32 stuff here */ /** put win32 stuff here */
#else #else
#include <util/threadpool.h> #include <util/thread_pool.hpp>
#include <util/logger.hpp> #include <util/logger.hpp>
#include <sys/wait.h> #include <sys/wait.h>
#include <unistd.h> #include <unistd.h>
@ -29,13 +29,12 @@ namespace llarp
: public IBackend, : public IBackend,
public std::enable_shared_from_this< ExecShellHookBackend > public std::enable_shared_from_this< ExecShellHookBackend >
{ {
llarp_threadpool *m_ThreadPool; thread::ThreadPool m_ThreadPool;
std::vector< std::string > _args; std::vector< std::string > _args;
std::vector< char * > args; std::vector< char * > args;
ExecShellHookBackend(std::string script) ExecShellHookBackend(std::string script) : m_ThreadPool(1, 1000)
: m_ThreadPool(llarp_init_threadpool(1, script.c_str()))
{ {
do do
{ {
@ -55,21 +54,20 @@ namespace llarp
~ExecShellHookBackend() ~ExecShellHookBackend()
{ {
llarp_threadpool_stop(m_ThreadPool); m_ThreadPool.shutdown();
llarp_free_threadpool(&m_ThreadPool);
} }
bool bool
Start() override Start() override
{ {
llarp_threadpool_start(m_ThreadPool); m_ThreadPool.start();
return true; return true;
} }
bool bool
Stop() override Stop() override
{ {
llarp_threadpool_stop(m_ThreadPool); m_ThreadPool.stop();
return true; return true;
} }
@ -123,8 +121,8 @@ namespace llarp
return _m_env.data(); return _m_env.data();
} }
static void void
Exec(std::shared_ptr< ExecShellHookJob > self) Exec()
{ {
std::thread t([&]() { std::thread t([&]() {
int result = 0; int result = 0;
@ -134,8 +132,7 @@ namespace llarp
if(child) if(child)
::waitpid(child, &result, 0); ::waitpid(child, &result, 0);
else else
::execve(self->m_Parent->Exe(), self->m_Parent->Args(), ::execve(m_Parent->Exe(), m_Parent->Args(), Env());
self->Env());
}); });
t.join(); t.join();
} }
@ -147,7 +144,8 @@ namespace llarp
{ {
auto job = std::make_shared< ExecShellHookJob >(shared_from_this(), auto job = std::make_shared< ExecShellHookJob >(shared_from_this(),
std::move(params)); std::move(params));
m_ThreadPool->QueueFunc([=]() { ExecShellHookJob::Exec(job); });
m_ThreadPool.addJob(std::bind(&ExecShellHookJob::Exec, job));
} }
Backend_ptr Backend_ptr

View File

@ -8,6 +8,7 @@
#include <util/logger.hpp> #include <util/logger.hpp>
#include <util/logic.hpp> #include <util/logic.hpp>
#include <util/mem.hpp> #include <util/mem.hpp>
#include <util/thread_pool.hpp>
#include <fstream> #include <fstream>
#include <unordered_map> #include <unordered_map>
@ -48,31 +49,18 @@ llarp_nodedb::Get(const llarp::RouterID &pk, llarp::RouterContact &result)
return true; return true;
} }
// kill rcs from disk async
struct AsyncKillRCJobs
{
std::set< std::string > files;
static void
Work(void *u)
{
static_cast< AsyncKillRCJobs * >(u)->Kill();
}
void void
Kill() KillRCJobs(const std::set< std::string > &files)
{ {
for(const auto &file : files) for(const auto &file : files)
fs::remove(file); fs::remove(file);
delete this;
} }
};
void void
llarp_nodedb::RemoveIf( llarp_nodedb::RemoveIf(
std::function< bool(const llarp::RouterContact &rc) > filter) std::function< bool(const llarp::RouterContact &rc) > filter)
{ {
AsyncKillRCJobs *job = new AsyncKillRCJobs(); std::set< std::string > files;
{ {
llarp::util::Lock l(&access); llarp::util::Lock l(&access);
auto itr = entries.begin(); auto itr = entries.begin();
@ -80,14 +68,15 @@ llarp_nodedb::RemoveIf(
{ {
if(filter(itr->second)) if(filter(itr->second))
{ {
job->files.insert(getRCFilePath(itr->second.pubkey)); files.insert(getRCFilePath(itr->second.pubkey));
itr = entries.erase(itr); itr = entries.erase(itr);
} }
else else
++itr; ++itr;
} }
} }
llarp_threadpool_queue_job(disk, {job, &AsyncKillRCJobs::Work});
disk->addJob(std::bind(&KillRCJobs, files));
} }
bool bool
@ -118,38 +107,24 @@ llarp_nodedb::getRCFilePath(const llarp::RouterID &pubkey) const
return filepath.string(); return filepath.string();
} }
struct async_insert_rc
{
llarp_nodedb *nodedb;
llarp::RouterContact rc;
llarp::Logic *logic;
std::function< void(void) > completedHook;
async_insert_rc(llarp_nodedb *n, const llarp::RouterContact &r)
: nodedb(n), rc(r)
{
}
};
static void static void
handle_async_insert_rc(void *u) handle_async_insert_rc(llarp_nodedb *nodedb, const llarp::RouterContact &rc,
llarp::Logic *logic,
const std::function< void(void) > &completedHook)
{ {
async_insert_rc *job = static_cast< async_insert_rc * >(u); nodedb->Insert(rc);
job->nodedb->Insert(job->rc); if(logic && completedHook)
if(job->logic && job->completedHook)
{ {
job->logic->queue_func(job->completedHook); logic->queue_func(completedHook);
} }
delete job;
} }
void void
llarp_nodedb::InsertAsync(llarp::RouterContact rc, llarp::Logic *logic, llarp_nodedb::InsertAsync(llarp::RouterContact rc, llarp::Logic *logic,
std::function< void(void) > completionHandler) std::function< void(void) > completionHandler)
{ {
async_insert_rc *ctx = new async_insert_rc(this, rc); disk->addJob(
ctx->completedHook = completionHandler; std::bind(&handle_async_insert_rc, this, rc, logic, completionHandler));
ctx->logic = logic;
llarp_threadpool_queue_job(disk, {ctx, &handle_async_insert_rc});
} }
/// insert and write to disk /// insert and write to disk
@ -304,10 +279,8 @@ logic_threadworker_callback(void *user)
// write it to disk // write it to disk
void void
disk_threadworker_setRC(void *user) disk_threadworker_setRC(llarp_async_verify_rc *verify_request)
{ {
llarp_async_verify_rc *verify_request =
static_cast< llarp_async_verify_rc * >(user);
verify_request->valid = verify_request->nodedb->Insert(verify_request->rc); verify_request->valid = verify_request->nodedb->Insert(verify_request->rc);
if(verify_request->logic) if(verify_request->logic)
verify_request->logic->queue_job( verify_request->logic->queue_job(
@ -327,8 +300,8 @@ crypto_threadworker_verifyrc(void *user)
if(verify_request->valid && rc.IsPublicRouter()) if(verify_request->valid && rc.IsPublicRouter())
{ {
llarp::LogDebug("RC is valid, saving to disk"); llarp::LogDebug("RC is valid, saving to disk");
llarp_threadpool_queue_job(verify_request->diskworker, verify_request->diskworker->addJob(
{verify_request, &disk_threadworker_setRC}); std::bind(&disk_threadworker_setRC, verify_request));
} }
else else
{ {

View File

@ -28,6 +28,11 @@ namespace llarp
{ {
struct Crypto; struct Crypto;
class Logic; class Logic;
namespace thread
{
class ThreadPool;
}
} // namespace llarp } // namespace llarp
struct llarp_nodedb_iter struct llarp_nodedb_iter
@ -40,7 +45,7 @@ struct llarp_nodedb_iter
struct llarp_nodedb struct llarp_nodedb
{ {
llarp_nodedb(llarp::Crypto *c, llarp_threadpool *diskworker) llarp_nodedb(llarp::Crypto *c, llarp::thread::ThreadPool *diskworker)
: crypto(c), disk(diskworker) : crypto(c), disk(diskworker)
{ {
} }
@ -51,7 +56,7 @@ struct llarp_nodedb
} }
llarp::Crypto *crypto; llarp::Crypto *crypto;
llarp_threadpool *disk; llarp::thread::ThreadPool *disk;
mutable llarp::util::Mutex access; // protects entries mutable llarp::util::Mutex access; // protects entries
std::unordered_map< llarp::RouterID, llarp::RouterContact, std::unordered_map< llarp::RouterID, llarp::RouterContact,
llarp::RouterID::Hash > llarp::RouterID::Hash >
@ -146,13 +151,11 @@ struct llarp_async_verify_rc
/// async_verify_context /// async_verify_context
void *user; void *user;
/// nodedb storage /// nodedb storage
struct llarp_nodedb *nodedb; llarp_nodedb *nodedb;
// llarp::Logic for queue_job // llarp::Logic for queue_job
llarp::Logic *logic; // includes a llarp_threadpool llarp::Logic *logic; // includes a llarp_threadpool
// struct llarp::Crypto *crypto; // probably don't need this because we have llarp_threadpool *cryptoworker;
// it in the nodedb llarp::thread::ThreadPool *diskworker;
struct llarp_threadpool *cryptoworker;
struct llarp_threadpool *diskworker;
/// router contact /// router contact
llarp::RouterContact rc; llarp::RouterContact rc;
@ -181,11 +184,11 @@ struct llarp_async_load_rc
/// async_verify_context /// async_verify_context
void *user; void *user;
/// nodedb storage /// nodedb storage
struct llarp_nodedb *nodedb; llarp_nodedb *nodedb;
/// llarp::Logic for calling hook /// llarp::Logic for calling hook
llarp::Logic *logic; llarp::Logic *logic;
/// disk worker threadpool /// disk worker threadpool
struct llarp_threadpool *diskworker; llarp::thread::ThreadPool *diskworker;
/// target pubkey /// target pubkey
llarp::PubKey pubkey; llarp::PubKey pubkey;
/// router contact result /// router contact result

View File

@ -46,6 +46,11 @@ namespace llarp
struct Context; struct Context;
} }
namespace thread
{
class ThreadPool;
}
struct AbstractRouter struct AbstractRouter
{ {
virtual ~AbstractRouter() = 0; virtual ~AbstractRouter() = 0;
@ -96,7 +101,7 @@ namespace llarp
virtual llarp_threadpool * virtual llarp_threadpool *
threadpool() = 0; threadpool() = 0;
virtual llarp_threadpool * virtual thread::ThreadPool *
diskworker() = 0; diskworker() = 0;
virtual service::Context & virtual service::Context &

View File

@ -191,8 +191,8 @@ namespace llarp
{ {
if(!link->IsCompatable(remote)) if(!link->IsCompatable(remote))
continue; continue;
std::shared_ptr< TryConnectJob > job = std::make_shared< TryConnectJob >( std::shared_ptr< TryConnectJob > job =
remote, link, numretries, this); std::make_shared< TryConnectJob >(remote, link, numretries, this);
auto itr = pendingEstablishJobs.emplace(remote.pubkey, job); auto itr = pendingEstablishJobs.emplace(remote.pubkey, job);
if(itr.second) if(itr.second)
{ {
@ -223,6 +223,7 @@ namespace llarp
, _crypto(std::make_unique< sodium::CryptoLibSodium >()) , _crypto(std::make_unique< sodium::CryptoLibSodium >())
, paths(this) , paths(this)
, _exitContext(this) , _exitContext(this)
, disk(1, 1000)
, _dht(llarp_dht_context_new(this)) , _dht(llarp_dht_context_new(this))
, inbound_link_msg_parser(this) , inbound_link_msg_parser(this)
, _hiddenServiceContext(this) , _hiddenServiceContext(this)
@ -231,11 +232,6 @@ namespace llarp
this->ip4addr.sin_family = AF_INET; this->ip4addr.sin_family = AF_INET;
this->ip4addr.sin_port = htons(1090); this->ip4addr.sin_port = htons(1090);
#ifdef TESTNET
disk = tp;
#else
disk = llarp_init_threadpool(1, "llarp-diskio");
#endif
_stopping.store(false); _stopping.store(false);
_running.store(false); _running.store(false);
} }
@ -497,12 +493,11 @@ namespace llarp
} }
/// called in disk worker thread /// called in disk worker thread
static void void
HandleSaveRC(void *u) Router::HandleSaveRC() const
{ {
Router *self = static_cast< Router * >(u); std::string fname = our_rc_file.string();
std::string fname = self->our_rc_file.string(); _rc.Write(fname.c_str());
self->_rc.Write(fname.c_str());
} }
bool bool
@ -515,7 +510,7 @@ namespace llarp
LogError("RC is invalid, not saving"); LogError("RC is invalid, not saving");
return false; return false;
} }
llarp_threadpool_queue_job(diskworker(), {this, &HandleSaveRC}); diskworker()->addJob(std::bind(&Router::HandleSaveRC, this));
return true; return true;
} }
@ -532,6 +527,8 @@ namespace llarp
llarp_ev_loop_stop(_netloop.get()); llarp_ev_loop_stop(_netloop.get());
inboundLinks.clear(); inboundLinks.clear();
outboundLinks.clear(); outboundLinks.clear();
disk.stop();
disk.shutdown();
} }
void void
@ -1333,14 +1330,10 @@ namespace llarp
// save profiles async // save profiles async
if(routerProfiling().ShouldSave(now)) if(routerProfiling().ShouldSave(now))
{ {
llarp_threadpool_queue_job( diskworker()->addJob(
diskworker(), [&]() { routerProfiling().Save(routerProfilesFile.c_str()); });
{this, [](void *u) {
Router *self = static_cast< Router * >(u);
self->routerProfiling().Save(self->routerProfilesFile.c_str());
}});
}
} }
} // namespace llarp
bool bool
Router::Sign(Signature &sig, const llarp_buffer_t &buf) const Router::Sign(Signature &sig, const llarp_buffer_t &buf) const
@ -1504,7 +1497,7 @@ namespace llarp
job->nodedb = _nodedb; job->nodedb = _nodedb;
job->logic = _logic; job->logic = _logic;
job->cryptoworker = tp; job->cryptoworker = tp;
job->diskworker = disk; job->diskworker = &disk;
if(rc.IsPublicRouter()) if(rc.IsPublicRouter())
job->hook = &Router::on_verify_server_rc; job->hook = &Router::on_verify_server_rc;
else else
@ -1567,7 +1560,7 @@ namespace llarp
} }
llarp_threadpool_start(tp); llarp_threadpool_start(tp);
llarp_threadpool_start(disk); disk.start();
for(const auto &rc : bootstrapRCList) for(const auto &rc : bootstrapRCList)
{ {
@ -1959,8 +1952,8 @@ namespace llarp
if(outboundLinks.size() > 0) if(outboundLinks.size() > 0)
return true; return true;
static std::list< std::function< LinkLayer_ptr(Router *) > > static std::list< std::function< LinkLayer_ptr(Router *) > > linkFactories =
linkFactories = {utp::NewServerFromRouter, iwp::NewServerFromRouter}; {utp::NewServerFromRouter, iwp::NewServerFromRouter};
for(const auto &factory : linkFactories) for(const auto &factory : linkFactories)
{ {

View File

@ -170,10 +170,10 @@ namespace llarp
return tp; return tp;
} }
llarp_threadpool * thread::ThreadPool *
diskworker() override diskworker() override
{ {
return disk; return &disk;
} }
// our ipv4 public setting // our ipv4 public setting
@ -189,7 +189,7 @@ namespace llarp
exit::Context _exitContext; exit::Context _exitContext;
SecretKey _identity; SecretKey _identity;
SecretKey _encryption; SecretKey _encryption;
llarp_threadpool *disk; thread::ThreadPool disk;
llarp_dht_context *_dht = nullptr; llarp_dht_context *_dht = nullptr;
llarp_nodedb *_nodedb; llarp_nodedb *_nodedb;
llarp_time_t _startedAt; llarp_time_t _startedAt;
@ -385,6 +385,9 @@ namespace llarp
bool bool
ConnectionToRouterAllowed(const RouterID &router) const override; ConnectionToRouterAllowed(const RouterID &router) const override;
void
HandleSaveRC() const;
bool bool
SaveRC(); SaveRC();

View File

@ -3,7 +3,17 @@
namespace llarp namespace llarp
{ {
FileLogStream::FileLogStream(llarp_threadpool *disk, FILE *f, namespace
{
static void
Flush(const std::deque< std::string > &lines, FILE *const f)
{
for(const auto &line : lines)
fprintf(f, "%s\n", line.c_str());
fflush(f);
}
} // namespace
FileLogStream::FileLogStream(thread::ThreadPool *disk, FILE *f,
llarp_time_t flushInterval) llarp_time_t flushInterval)
: m_Disk(disk), m_File(f), m_FlushInterval(flushInterval) : m_Disk(disk), m_File(f), m_FlushInterval(flushInterval)
{ {
@ -66,24 +76,7 @@ namespace llarp
void void
FileLogStream::FlushLinesToDisk(llarp_time_t now) FileLogStream::FlushLinesToDisk(llarp_time_t now)
{ {
FlushEvent *ev = new FlushEvent(std::move(m_Lines), m_File); m_Disk->addJob(std::bind(&Flush, std::move(m_Lines), m_File));
llarp_threadpool_queue_job(m_Disk, {ev, &FlushEvent::HandleFlush});
m_LastFlush = now; m_LastFlush = now;
} }
void
FileLogStream::FlushEvent::HandleFlush(void *user)
{
static_cast< FileLogStream::FlushEvent * >(user)->Flush();
}
void
FileLogStream::FlushEvent::Flush()
{
for(const auto &line : lines)
fprintf(f, "%s\n", line.c_str());
fflush(f);
delete this;
}
} // namespace llarp } // namespace llarp

View File

@ -2,15 +2,18 @@
#define LLARP_UTIL_FILE_LOGGER_HPP #define LLARP_UTIL_FILE_LOGGER_HPP
#include <util/logstream.hpp> #include <util/logstream.hpp>
#include <util/threadpool.h> #include <util/thread_pool.hpp>
#include <util/time.hpp> #include <util/time.hpp>
#include <deque>
namespace llarp namespace llarp
{ {
/// flushable file based log stream /// flushable file based log stream
struct FileLogStream : public ILogStream struct FileLogStream : public ILogStream
{ {
FileLogStream(llarp_threadpool* disk, FILE* f, llarp_time_t flushInterval); FileLogStream(thread::ThreadPool* disk, FILE* f,
llarp_time_t flushInterval);
~FileLogStream(); ~FileLogStream();
@ -30,29 +33,13 @@ namespace llarp
} }
private: private:
struct FlushEvent
{
FlushEvent(std::deque< std::string > l, FILE* file)
: lines(std::move(l)), f(file)
{
}
const std::deque< std::string > lines;
FILE* const f;
void
Flush();
static void
HandleFlush(void*);
};
bool bool
ShouldFlush(llarp_time_t now) const; ShouldFlush(llarp_time_t now) const;
void void
FlushLinesToDisk(llarp_time_t now); FlushLinesToDisk(llarp_time_t now);
llarp_threadpool* m_Disk; thread::ThreadPool* m_Disk;
FILE* m_File; FILE* m_File;
const llarp_time_t m_FlushInterval; const llarp_time_t m_FlushInterval;
llarp_time_t m_LastFlush = 0; llarp_time_t m_LastFlush = 0;