Convert router diskworker to use a modern ThreadPool

pull/610/head
Michael 5 years ago
parent 5f823f8ba3
commit 636bb2a17d
No known key found for this signature in database
GPG Key ID: 2D51757B47E2434C

@ -8,6 +8,7 @@
#include <util/logger.hpp>
#include <util/logic.hpp>
#include <util/mem.hpp>
#include <util/thread_pool.hpp>
#include <fstream>
#include <unordered_map>
@ -48,31 +49,18 @@ llarp_nodedb::Get(const llarp::RouterID &pk, llarp::RouterContact &result)
return true;
}
// kill rcs from disk async
struct AsyncKillRCJobs
void
KillRCJobs(const std::set< std::string > &files)
{
std::set< std::string > files;
static void
Work(void *u)
{
static_cast< AsyncKillRCJobs * >(u)->Kill();
}
void
Kill()
{
for(const auto &file : files)
fs::remove(file);
delete this;
}
};
for(const auto &file : files)
fs::remove(file);
}
void
llarp_nodedb::RemoveIf(
std::function< bool(const llarp::RouterContact &rc) > filter)
{
AsyncKillRCJobs *job = new AsyncKillRCJobs();
std::set< std::string > files;
{
llarp::util::Lock l(&access);
auto itr = entries.begin();
@ -80,14 +68,15 @@ llarp_nodedb::RemoveIf(
{
if(filter(itr->second))
{
job->files.insert(getRCFilePath(itr->second.pubkey));
files.insert(getRCFilePath(itr->second.pubkey));
itr = entries.erase(itr);
}
else
++itr;
}
}
llarp_threadpool_queue_job(disk, {job, &AsyncKillRCJobs::Work});
disk->addJob(std::bind(&KillRCJobs, files));
}
bool
@ -118,38 +107,24 @@ llarp_nodedb::getRCFilePath(const llarp::RouterID &pubkey) const
return filepath.string();
}
struct async_insert_rc
{
llarp_nodedb *nodedb;
llarp::RouterContact rc;
llarp::Logic *logic;
std::function< void(void) > completedHook;
async_insert_rc(llarp_nodedb *n, const llarp::RouterContact &r)
: nodedb(n), rc(r)
{
}
};
static void
handle_async_insert_rc(void *u)
handle_async_insert_rc(llarp_nodedb *nodedb, const llarp::RouterContact &rc,
llarp::Logic *logic,
const std::function< void(void) > &completedHook)
{
async_insert_rc *job = static_cast< async_insert_rc * >(u);
job->nodedb->Insert(job->rc);
if(job->logic && job->completedHook)
nodedb->Insert(rc);
if(logic && completedHook)
{
job->logic->queue_func(job->completedHook);
logic->queue_func(completedHook);
}
delete job;
}
void
llarp_nodedb::InsertAsync(llarp::RouterContact rc, llarp::Logic *logic,
std::function< void(void) > completionHandler)
{
async_insert_rc *ctx = new async_insert_rc(this, rc);
ctx->completedHook = completionHandler;
ctx->logic = logic;
llarp_threadpool_queue_job(disk, {ctx, &handle_async_insert_rc});
disk->addJob(
std::bind(&handle_async_insert_rc, this, rc, logic, completionHandler));
}
/// insert and write to disk
@ -304,10 +279,8 @@ logic_threadworker_callback(void *user)
// write it to disk
void
disk_threadworker_setRC(void *user)
disk_threadworker_setRC(llarp_async_verify_rc *verify_request)
{
llarp_async_verify_rc *verify_request =
static_cast< llarp_async_verify_rc * >(user);
verify_request->valid = verify_request->nodedb->Insert(verify_request->rc);
if(verify_request->logic)
verify_request->logic->queue_job(
@ -327,8 +300,8 @@ crypto_threadworker_verifyrc(void *user)
if(verify_request->valid && rc.IsPublicRouter())
{
llarp::LogDebug("RC is valid, saving to disk");
llarp_threadpool_queue_job(verify_request->diskworker,
{verify_request, &disk_threadworker_setRC});
verify_request->diskworker->addJob(
std::bind(&disk_threadworker_setRC, verify_request));
}
else
{

@ -28,6 +28,11 @@ namespace llarp
{
struct Crypto;
class Logic;
namespace thread
{
class ThreadPool;
}
} // namespace llarp
struct llarp_nodedb_iter
@ -40,7 +45,7 @@ struct llarp_nodedb_iter
struct llarp_nodedb
{
llarp_nodedb(llarp::Crypto *c, llarp_threadpool *diskworker)
llarp_nodedb(llarp::Crypto *c, llarp::thread::ThreadPool *diskworker)
: crypto(c), disk(diskworker)
{
}
@ -51,7 +56,7 @@ struct llarp_nodedb
}
llarp::Crypto *crypto;
llarp_threadpool *disk;
llarp::thread::ThreadPool *disk;
mutable llarp::util::Mutex access; // protects entries
std::unordered_map< llarp::RouterID, llarp::RouterContact,
llarp::RouterID::Hash >
@ -146,13 +151,11 @@ struct llarp_async_verify_rc
/// async_verify_context
void *user;
/// nodedb storage
struct llarp_nodedb *nodedb;
llarp_nodedb *nodedb;
// llarp::Logic for queue_job
llarp::Logic *logic; // includes a llarp_threadpool
// struct llarp::Crypto *crypto; // probably don't need this because we have
// it in the nodedb
struct llarp_threadpool *cryptoworker;
struct llarp_threadpool *diskworker;
llarp_threadpool *cryptoworker;
llarp::thread::ThreadPool *diskworker;
/// router contact
llarp::RouterContact rc;
@ -181,11 +184,11 @@ struct llarp_async_load_rc
/// async_verify_context
void *user;
/// nodedb storage
struct llarp_nodedb *nodedb;
llarp_nodedb *nodedb;
/// llarp::Logic for calling hook
llarp::Logic *logic;
/// disk worker threadpool
struct llarp_threadpool *diskworker;
llarp::thread::ThreadPool *diskworker;
/// target pubkey
llarp::PubKey pubkey;
/// router contact result

@ -46,6 +46,11 @@ namespace llarp
struct Context;
}
namespace thread
{
class ThreadPool;
}
struct AbstractRouter
{
virtual ~AbstractRouter() = 0;
@ -96,7 +101,7 @@ namespace llarp
virtual llarp_threadpool *
threadpool() = 0;
virtual llarp_threadpool *
virtual thread::ThreadPool *
diskworker() = 0;
virtual service::Context &

@ -116,7 +116,7 @@ struct TryConnectJob
};
static void
on_try_connecting(std::shared_ptr<TryConnectJob> j)
on_try_connecting(std::shared_ptr< TryConnectJob > j)
{
if(j->Attempt())
j->router->pendingEstablishJobs.erase(j->rc.pubkey);
@ -191,8 +191,8 @@ namespace llarp
{
if(!link->IsCompatable(remote))
continue;
std::shared_ptr< TryConnectJob > job = std::make_shared< TryConnectJob >(
remote, link, numretries, this);
std::shared_ptr< TryConnectJob > job =
std::make_shared< TryConnectJob >(remote, link, numretries, this);
auto itr = pendingEstablishJobs.emplace(remote.pubkey, job);
if(itr.second)
{
@ -223,6 +223,7 @@ namespace llarp
, _crypto(std::make_unique< sodium::CryptoLibSodium >())
, paths(this)
, _exitContext(this)
, disk(1, 1000)
, _dht(llarp_dht_context_new(this))
, inbound_link_msg_parser(this)
, _hiddenServiceContext(this)
@ -231,11 +232,6 @@ namespace llarp
this->ip4addr.sin_family = AF_INET;
this->ip4addr.sin_port = htons(1090);
#ifdef TESTNET
disk = tp;
#else
disk = llarp_init_threadpool(1, "llarp-diskio");
#endif
_stopping.store(false);
_running.store(false);
}
@ -499,12 +495,11 @@ namespace llarp
}
/// called in disk worker thread
static void
HandleSaveRC(void *u)
void
Router::HandleSaveRC() const
{
Router *self = static_cast< Router * >(u);
std::string fname = self->our_rc_file.string();
self->_rc.Write(fname.c_str());
std::string fname = our_rc_file.string();
_rc.Write(fname.c_str());
}
bool
@ -517,7 +512,7 @@ namespace llarp
LogError("RC is invalid, not saving");
return false;
}
llarp_threadpool_queue_job(diskworker(), {this, &HandleSaveRC});
diskworker()->addJob(std::bind(&Router::HandleSaveRC, this));
return true;
}
@ -1335,14 +1330,10 @@ namespace llarp
// save profiles async
if(routerProfiling().ShouldSave(now))
{
llarp_threadpool_queue_job(
diskworker(),
{this, [](void *u) {
Router *self = static_cast< Router * >(u);
self->routerProfiling().Save(self->routerProfilesFile.c_str());
}});
diskworker()->addJob(
[&]() { routerProfiling().Save(routerProfilesFile.c_str()); });
}
}
} // namespace llarp
bool
Router::Sign(Signature &sig, const llarp_buffer_t &buf) const
@ -1506,7 +1497,7 @@ namespace llarp
job->nodedb = _nodedb;
job->logic = _logic;
job->cryptoworker = tp;
job->diskworker = disk;
job->diskworker = &disk;
if(rc.IsPublicRouter())
job->hook = &Router::on_verify_server_rc;
else
@ -1569,7 +1560,7 @@ namespace llarp
}
llarp_threadpool_start(tp);
llarp_threadpool_start(disk);
disk.start();
for(const auto &rc : bootstrapRCList)
{
@ -1961,8 +1952,8 @@ namespace llarp
if(outboundLinks.size() > 0)
return true;
static std::list< std::function< LinkLayer_ptr(Router *) > >
linkFactories = {utp::NewServerFromRouter, iwp::NewServerFromRouter};
static std::list< std::function< LinkLayer_ptr(Router *) > > linkFactories =
{utp::NewServerFromRouter, iwp::NewServerFromRouter};
for(const auto &factory : linkFactories)
{

@ -170,10 +170,10 @@ namespace llarp
return tp;
}
llarp_threadpool *
thread::ThreadPool *
diskworker() override
{
return disk;
return &disk;
}
// our ipv4 public setting
@ -189,7 +189,7 @@ namespace llarp
exit::Context _exitContext;
SecretKey _identity;
SecretKey _encryption;
llarp_threadpool *disk;
thread::ThreadPool disk;
llarp_dht_context *_dht = nullptr;
llarp_nodedb *_nodedb;
llarp_time_t _startedAt;
@ -385,6 +385,9 @@ namespace llarp
bool
ConnectionToRouterAllowed(const RouterID &router) const override;
void
HandleSaveRC() const;
bool
SaveRC();

@ -3,7 +3,17 @@
namespace llarp
{
FileLogStream::FileLogStream(llarp_threadpool *disk, FILE *f,
namespace
{
static void
Flush(const std::deque< std::string > &lines, FILE *const f)
{
for(const auto &line : lines)
fprintf(f, "%s\n", line.c_str());
fflush(f);
}
} // namespace
FileLogStream::FileLogStream(thread::ThreadPool *disk, FILE *f,
llarp_time_t flushInterval)
: m_Disk(disk), m_File(f), m_FlushInterval(flushInterval)
{
@ -66,24 +76,7 @@ namespace llarp
void
FileLogStream::FlushLinesToDisk(llarp_time_t now)
{
FlushEvent *ev = new FlushEvent(std::move(m_Lines), m_File);
llarp_threadpool_queue_job(m_Disk, {ev, &FlushEvent::HandleFlush});
m_Disk->addJob(std::bind(&Flush, std::move(m_Lines), m_File));
m_LastFlush = now;
}
void
FileLogStream::FlushEvent::HandleFlush(void *user)
{
static_cast< FileLogStream::FlushEvent * >(user)->Flush();
}
void
FileLogStream::FlushEvent::Flush()
{
for(const auto &line : lines)
fprintf(f, "%s\n", line.c_str());
fflush(f);
delete this;
}
} // namespace llarp
} // namespace llarp

@ -2,15 +2,18 @@
#define LLARP_UTIL_FILE_LOGGER_HPP
#include <util/logstream.hpp>
#include <util/threadpool.h>
#include <util/thread_pool.hpp>
#include <util/time.hpp>
#include <deque>
namespace llarp
{
/// flushable file based log stream
struct FileLogStream : public ILogStream
{
FileLogStream(llarp_threadpool* disk, FILE* f, llarp_time_t flushInterval);
FileLogStream(thread::ThreadPool* disk, FILE* f,
llarp_time_t flushInterval);
~FileLogStream();
@ -30,29 +33,13 @@ namespace llarp
}
private:
struct FlushEvent
{
FlushEvent(std::deque< std::string > l, FILE* file)
: lines(std::move(l)), f(file)
{
}
const std::deque< std::string > lines;
FILE* const f;
void
Flush();
static void
HandleFlush(void*);
};
bool
ShouldFlush(llarp_time_t now) const;
void
FlushLinesToDisk(llarp_time_t now);
llarp_threadpool* m_Disk;
thread::ThreadPool* m_Disk;
FILE* m_File;
const llarp_time_t m_FlushInterval;
llarp_time_t m_LastFlush = 0;

Loading…
Cancel
Save