diff --git a/llarp/hook/shell.cpp b/llarp/hook/shell.cpp index 9fa4ce587..c232f8726 100644 --- a/llarp/hook/shell.cpp +++ b/llarp/hook/shell.cpp @@ -2,7 +2,7 @@ #if defined(_WIN32) /** put win32 stuff here */ #else -#include +#include #include #include #include @@ -29,13 +29,12 @@ namespace llarp : public IBackend, public std::enable_shared_from_this< ExecShellHookBackend > { - llarp_threadpool *m_ThreadPool; + thread::ThreadPool m_ThreadPool; std::vector< std::string > _args; std::vector< char * > args; - ExecShellHookBackend(std::string script) - : m_ThreadPool(llarp_init_threadpool(1, script.c_str())) + ExecShellHookBackend(std::string script) : m_ThreadPool(1, 1000) { do { @@ -55,21 +54,20 @@ namespace llarp ~ExecShellHookBackend() { - llarp_threadpool_stop(m_ThreadPool); - llarp_free_threadpool(&m_ThreadPool); + m_ThreadPool.shutdown(); } bool Start() override { - llarp_threadpool_start(m_ThreadPool); + m_ThreadPool.start(); return true; } bool Stop() override { - llarp_threadpool_stop(m_ThreadPool); + m_ThreadPool.stop(); return true; } @@ -123,8 +121,8 @@ namespace llarp return _m_env.data(); } - static void - Exec(std::shared_ptr< ExecShellHookJob > self) + void + Exec() { std::thread t([&]() { int result = 0; @@ -134,8 +132,7 @@ namespace llarp if(child) ::waitpid(child, &result, 0); else - ::execve(self->m_Parent->Exe(), self->m_Parent->Args(), - self->Env()); + ::execve(m_Parent->Exe(), m_Parent->Args(), Env()); }); t.join(); } @@ -147,7 +144,8 @@ namespace llarp { auto job = std::make_shared< ExecShellHookJob >(shared_from_this(), std::move(params)); - m_ThreadPool->QueueFunc([=]() { ExecShellHookJob::Exec(job); }); + + m_ThreadPool.addJob(std::bind(&ExecShellHookJob::Exec, job)); } Backend_ptr diff --git a/llarp/nodedb.cpp b/llarp/nodedb.cpp index c32523a88..b382a62b8 100644 --- a/llarp/nodedb.cpp +++ b/llarp/nodedb.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -48,31 +49,18 @@ llarp_nodedb::Get(const llarp::RouterID &pk, llarp::RouterContact &result) return true; } -// kill rcs from disk async -struct AsyncKillRCJobs +void +KillRCJobs(const std::set< std::string > &files) { - std::set< std::string > files; - - static void - Work(void *u) - { - static_cast< AsyncKillRCJobs * >(u)->Kill(); - } - - void - Kill() - { - for(const auto &file : files) - fs::remove(file); - delete this; - } -}; + for(const auto &file : files) + fs::remove(file); +} void llarp_nodedb::RemoveIf( std::function< bool(const llarp::RouterContact &rc) > filter) { - AsyncKillRCJobs *job = new AsyncKillRCJobs(); + std::set< std::string > files; { llarp::util::Lock l(&access); auto itr = entries.begin(); @@ -80,14 +68,15 @@ llarp_nodedb::RemoveIf( { if(filter(itr->second)) { - job->files.insert(getRCFilePath(itr->second.pubkey)); + files.insert(getRCFilePath(itr->second.pubkey)); itr = entries.erase(itr); } else ++itr; } } - llarp_threadpool_queue_job(disk, {job, &AsyncKillRCJobs::Work}); + + disk->addJob(std::bind(&KillRCJobs, files)); } bool @@ -118,38 +107,24 @@ llarp_nodedb::getRCFilePath(const llarp::RouterID &pubkey) const return filepath.string(); } -struct async_insert_rc -{ - llarp_nodedb *nodedb; - llarp::RouterContact rc; - llarp::Logic *logic; - std::function< void(void) > completedHook; - async_insert_rc(llarp_nodedb *n, const llarp::RouterContact &r) - : nodedb(n), rc(r) - { - } -}; - static void -handle_async_insert_rc(void *u) +handle_async_insert_rc(llarp_nodedb *nodedb, const llarp::RouterContact &rc, + llarp::Logic *logic, + const std::function< void(void) > &completedHook) { - async_insert_rc *job = static_cast< async_insert_rc * >(u); - job->nodedb->Insert(job->rc); - if(job->logic && job->completedHook) + nodedb->Insert(rc); + if(logic && completedHook) { - job->logic->queue_func(job->completedHook); + logic->queue_func(completedHook); } - delete job; } void llarp_nodedb::InsertAsync(llarp::RouterContact rc, llarp::Logic *logic, std::function< void(void) > completionHandler) { - async_insert_rc *ctx = new async_insert_rc(this, rc); - ctx->completedHook = completionHandler; - ctx->logic = logic; - llarp_threadpool_queue_job(disk, {ctx, &handle_async_insert_rc}); + disk->addJob( + std::bind(&handle_async_insert_rc, this, rc, logic, completionHandler)); } /// insert and write to disk @@ -304,10 +279,8 @@ logic_threadworker_callback(void *user) // write it to disk void -disk_threadworker_setRC(void *user) +disk_threadworker_setRC(llarp_async_verify_rc *verify_request) { - llarp_async_verify_rc *verify_request = - static_cast< llarp_async_verify_rc * >(user); verify_request->valid = verify_request->nodedb->Insert(verify_request->rc); if(verify_request->logic) verify_request->logic->queue_job( @@ -327,8 +300,8 @@ crypto_threadworker_verifyrc(void *user) if(verify_request->valid && rc.IsPublicRouter()) { llarp::LogDebug("RC is valid, saving to disk"); - llarp_threadpool_queue_job(verify_request->diskworker, - {verify_request, &disk_threadworker_setRC}); + verify_request->diskworker->addJob( + std::bind(&disk_threadworker_setRC, verify_request)); } else { diff --git a/llarp/nodedb.hpp b/llarp/nodedb.hpp index f6aba9597..2850b0e75 100644 --- a/llarp/nodedb.hpp +++ b/llarp/nodedb.hpp @@ -28,6 +28,11 @@ namespace llarp { struct Crypto; class Logic; + + namespace thread + { + class ThreadPool; + } } // namespace llarp struct llarp_nodedb_iter @@ -40,7 +45,7 @@ struct llarp_nodedb_iter struct llarp_nodedb { - llarp_nodedb(llarp::Crypto *c, llarp_threadpool *diskworker) + llarp_nodedb(llarp::Crypto *c, llarp::thread::ThreadPool *diskworker) : crypto(c), disk(diskworker) { } @@ -51,7 +56,7 @@ struct llarp_nodedb } llarp::Crypto *crypto; - llarp_threadpool *disk; + llarp::thread::ThreadPool *disk; mutable llarp::util::Mutex access; // protects entries std::unordered_map< llarp::RouterID, llarp::RouterContact, llarp::RouterID::Hash > @@ -146,13 +151,11 @@ struct llarp_async_verify_rc /// async_verify_context void *user; /// nodedb storage - struct llarp_nodedb *nodedb; + llarp_nodedb *nodedb; // llarp::Logic for queue_job llarp::Logic *logic; // includes a llarp_threadpool - // struct llarp::Crypto *crypto; // probably don't need this because we have - // it in the nodedb - struct llarp_threadpool *cryptoworker; - struct llarp_threadpool *diskworker; + llarp_threadpool *cryptoworker; + llarp::thread::ThreadPool *diskworker; /// router contact llarp::RouterContact rc; @@ -181,11 +184,11 @@ struct llarp_async_load_rc /// async_verify_context void *user; /// nodedb storage - struct llarp_nodedb *nodedb; + llarp_nodedb *nodedb; /// llarp::Logic for calling hook llarp::Logic *logic; /// disk worker threadpool - struct llarp_threadpool *diskworker; + llarp::thread::ThreadPool *diskworker; /// target pubkey llarp::PubKey pubkey; /// router contact result diff --git a/llarp/router/abstractrouter.hpp b/llarp/router/abstractrouter.hpp index c7d94d0a7..59fcd8b22 100644 --- a/llarp/router/abstractrouter.hpp +++ b/llarp/router/abstractrouter.hpp @@ -46,6 +46,11 @@ namespace llarp struct Context; } + namespace thread + { + class ThreadPool; + } + struct AbstractRouter { virtual ~AbstractRouter() = 0; @@ -96,7 +101,7 @@ namespace llarp virtual llarp_threadpool * threadpool() = 0; - virtual llarp_threadpool * + virtual thread::ThreadPool * diskworker() = 0; virtual service::Context & diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 6991a2690..a047b2d95 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -116,7 +116,7 @@ struct TryConnectJob }; static void -on_try_connecting(std::shared_ptr j) +on_try_connecting(std::shared_ptr< TryConnectJob > j) { if(j->Attempt()) j->router->pendingEstablishJobs.erase(j->rc.pubkey); @@ -191,8 +191,8 @@ namespace llarp { if(!link->IsCompatable(remote)) continue; - std::shared_ptr< TryConnectJob > job = std::make_shared< TryConnectJob >( - remote, link, numretries, this); + std::shared_ptr< TryConnectJob > job = + std::make_shared< TryConnectJob >(remote, link, numretries, this); auto itr = pendingEstablishJobs.emplace(remote.pubkey, job); if(itr.second) { @@ -223,6 +223,7 @@ namespace llarp , _crypto(std::make_unique< sodium::CryptoLibSodium >()) , paths(this) , _exitContext(this) + , disk(1, 1000) , _dht(llarp_dht_context_new(this)) , inbound_link_msg_parser(this) , _hiddenServiceContext(this) @@ -231,11 +232,6 @@ namespace llarp this->ip4addr.sin_family = AF_INET; this->ip4addr.sin_port = htons(1090); -#ifdef TESTNET - disk = tp; -#else - disk = llarp_init_threadpool(1, "llarp-diskio"); -#endif _stopping.store(false); _running.store(false); } @@ -497,12 +493,11 @@ namespace llarp } /// called in disk worker thread - static void - HandleSaveRC(void *u) + void + Router::HandleSaveRC() const { - Router *self = static_cast< Router * >(u); - std::string fname = self->our_rc_file.string(); - self->_rc.Write(fname.c_str()); + std::string fname = our_rc_file.string(); + _rc.Write(fname.c_str()); } bool @@ -515,7 +510,7 @@ namespace llarp LogError("RC is invalid, not saving"); return false; } - llarp_threadpool_queue_job(diskworker(), {this, &HandleSaveRC}); + diskworker()->addJob(std::bind(&Router::HandleSaveRC, this)); return true; } @@ -532,6 +527,8 @@ namespace llarp llarp_ev_loop_stop(_netloop.get()); inboundLinks.clear(); outboundLinks.clear(); + disk.stop(); + disk.shutdown(); } void @@ -1333,14 +1330,10 @@ namespace llarp // save profiles async if(routerProfiling().ShouldSave(now)) { - llarp_threadpool_queue_job( - diskworker(), - {this, [](void *u) { - Router *self = static_cast< Router * >(u); - self->routerProfiling().Save(self->routerProfilesFile.c_str()); - }}); + diskworker()->addJob( + [&]() { routerProfiling().Save(routerProfilesFile.c_str()); }); } - } + } // namespace llarp bool Router::Sign(Signature &sig, const llarp_buffer_t &buf) const @@ -1504,7 +1497,7 @@ namespace llarp job->nodedb = _nodedb; job->logic = _logic; job->cryptoworker = tp; - job->diskworker = disk; + job->diskworker = &disk; if(rc.IsPublicRouter()) job->hook = &Router::on_verify_server_rc; else @@ -1567,7 +1560,7 @@ namespace llarp } llarp_threadpool_start(tp); - llarp_threadpool_start(disk); + disk.start(); for(const auto &rc : bootstrapRCList) { @@ -1959,8 +1952,8 @@ namespace llarp if(outboundLinks.size() > 0) return true; - static std::list< std::function< LinkLayer_ptr(Router *) > > - linkFactories = {utp::NewServerFromRouter, iwp::NewServerFromRouter}; + static std::list< std::function< LinkLayer_ptr(Router *) > > linkFactories = + {utp::NewServerFromRouter, iwp::NewServerFromRouter}; for(const auto &factory : linkFactories) { diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp index 417e6792f..96bad2cf5 100644 --- a/llarp/router/router.hpp +++ b/llarp/router/router.hpp @@ -170,10 +170,10 @@ namespace llarp return tp; } - llarp_threadpool * + thread::ThreadPool * diskworker() override { - return disk; + return &disk; } // our ipv4 public setting @@ -189,7 +189,7 @@ namespace llarp exit::Context _exitContext; SecretKey _identity; SecretKey _encryption; - llarp_threadpool *disk; + thread::ThreadPool disk; llarp_dht_context *_dht = nullptr; llarp_nodedb *_nodedb; llarp_time_t _startedAt; @@ -385,6 +385,9 @@ namespace llarp bool ConnectionToRouterAllowed(const RouterID &router) const override; + void + HandleSaveRC() const; + bool SaveRC(); diff --git a/llarp/util/file_logger.cpp b/llarp/util/file_logger.cpp index d73bd8bb4..3561685b3 100644 --- a/llarp/util/file_logger.cpp +++ b/llarp/util/file_logger.cpp @@ -3,7 +3,17 @@ namespace llarp { - FileLogStream::FileLogStream(llarp_threadpool *disk, FILE *f, + namespace + { + static void + Flush(const std::deque< std::string > &lines, FILE *const f) + { + for(const auto &line : lines) + fprintf(f, "%s\n", line.c_str()); + fflush(f); + } + } // namespace + FileLogStream::FileLogStream(thread::ThreadPool *disk, FILE *f, llarp_time_t flushInterval) : m_Disk(disk), m_File(f), m_FlushInterval(flushInterval) { @@ -66,24 +76,7 @@ namespace llarp void FileLogStream::FlushLinesToDisk(llarp_time_t now) { - FlushEvent *ev = new FlushEvent(std::move(m_Lines), m_File); - llarp_threadpool_queue_job(m_Disk, {ev, &FlushEvent::HandleFlush}); + m_Disk->addJob(std::bind(&Flush, std::move(m_Lines), m_File)); m_LastFlush = now; } - - void - FileLogStream::FlushEvent::HandleFlush(void *user) - { - static_cast< FileLogStream::FlushEvent * >(user)->Flush(); - } - - void - FileLogStream::FlushEvent::Flush() - { - for(const auto &line : lines) - fprintf(f, "%s\n", line.c_str()); - fflush(f); - delete this; - } - -} // namespace llarp \ No newline at end of file +} // namespace llarp diff --git a/llarp/util/file_logger.hpp b/llarp/util/file_logger.hpp index 743193e17..0233b260b 100644 --- a/llarp/util/file_logger.hpp +++ b/llarp/util/file_logger.hpp @@ -2,15 +2,18 @@ #define LLARP_UTIL_FILE_LOGGER_HPP #include -#include +#include #include +#include + namespace llarp { /// flushable file based log stream struct FileLogStream : public ILogStream { - FileLogStream(llarp_threadpool* disk, FILE* f, llarp_time_t flushInterval); + FileLogStream(thread::ThreadPool* disk, FILE* f, + llarp_time_t flushInterval); ~FileLogStream(); @@ -30,29 +33,13 @@ namespace llarp } private: - struct FlushEvent - { - FlushEvent(std::deque< std::string > l, FILE* file) - : lines(std::move(l)), f(file) - { - } - - const std::deque< std::string > lines; - FILE* const f; - - void - Flush(); - static void - HandleFlush(void*); - }; - bool ShouldFlush(llarp_time_t now) const; void FlushLinesToDisk(llarp_time_t now); - llarp_threadpool* m_Disk; + thread::ThreadPool* m_Disk; FILE* m_File; const llarp_time_t m_FlushInterval; llarp_time_t m_LastFlush = 0;