Merge pull request #1036 from majestrate/reduce-disk-io-2020-01-12

reduce disk io
Jason Rhinelander authored 4 years ago, committed by GitHub
commit 991c9e1cfd

@@ -151,33 +151,35 @@ llarp_nodedb::UpdateAsyncIfNewer(llarp::RouterContact rc,
 /// insert and write to disk
 bool
-llarp_nodedb::Insert(const llarp::RouterContact &rc)
+llarp_nodedb::Insert(const llarp::RouterContact &rc, bool writeToDisk)
 {
-  std::array< byte_t, MAX_RC_SIZE > tmp;
-  llarp_buffer_t buf(tmp);
-  if(!rc.BEncode(&buf))
-    return false;
-  buf.sz = buf.cur - buf.base;
-  auto filepath = getRCFilePath(rc.pubkey);
-  llarp::LogDebug("saving RC.pubkey ", filepath);
-  auto optional_ofs = llarp::util::OpenFileStream< std::ofstream >(
-      filepath,
-      std::ofstream::out | std::ofstream::binary | std::ofstream::trunc);
-  if(!optional_ofs)
-    return false;
-  auto &ofs = optional_ofs.value();
-  ofs.write((char *)buf.base, buf.sz);
-  ofs.flush();
-  ofs.close();
-  if(!ofs)
+  if(writeToDisk)
   {
-    llarp::LogError("Failed to write: ", filepath);
-    return false;
+    std::array< byte_t, MAX_RC_SIZE > tmp;
+    llarp_buffer_t buf(tmp);
+    if(!rc.BEncode(&buf))
+      return false;
+    buf.sz = buf.cur - buf.base;
+    auto filepath = getRCFilePath(rc.pubkey);
+    llarp::LogDebug("saving RC.pubkey ", filepath);
+    auto optional_ofs = llarp::util::OpenFileStream< std::ofstream >(
+        filepath,
+        std::ofstream::out | std::ofstream::binary | std::ofstream::trunc);
+    if(!optional_ofs)
+      return false;
+    auto &ofs = optional_ofs.value();
+    ofs.write((char *)buf.base, buf.sz);
+    ofs.flush();
+    ofs.close();
+    if(!ofs)
+    {
+      llarp::LogError("Failed to write: ", filepath);
+      return false;
+    }
+    llarp::LogDebug("saved RC.pubkey: ", filepath);
   }
-  llarp::LogDebug("saved RC.pubkey: ", filepath);
   // save rc after writing to disk
   {
     llarp::util::Lock lock(&access);
     auto itr = entries.find(rc.pubkey.as_array());
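
The new writeToDisk flag means Insert can add an RC to the in-memory map without touching the filesystem; only callers that opt in pay the per-RC file write, and everything else is picked up by the next batched flush. A rough sketch of the two call styles, assuming an already-constructed llarp_nodedb named nodedb and a verified llarp::RouterContact rc (the surrounding setup is hypothetical, not part of this commit):

    // in-memory only: the RC lands in entries and is written out by a later flush
    nodedb.Insert(rc);        // writeToDisk defaults to false

    // opt-in immediate persistence: bencode the RC and write its file right now
    nodedb.Insert(rc, true);

    // background flush: queues SaveAll() on the disk threadpool and pushes the
    // next flush deadline one m_SaveInterval into the future
    nodedb.AsyncFlushToDisk();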
@@ -212,6 +214,7 @@ llarp_nodedb::Load(const fs::path &path)
     if(l > 0)
       loaded += l;
   }
+  m_NextSaveToDisk = llarp::time_now_ms() + m_SaveInterval;
   return loaded;
 }
@@ -241,10 +244,19 @@ llarp_nodedb::SaveAll()
   }
 }

+bool
+llarp_nodedb::ShouldSaveToDisk(llarp_time_t now) const
+{
+  if(now == 0)
+    now = llarp::time_now_ms();
+  return m_NextSaveToDisk > 0 && m_NextSaveToDisk <= now;
+}
+
 void
 llarp_nodedb::AsyncFlushToDisk()
 {
   disk->addJob(std::bind(&llarp_nodedb::SaveAll, this));
+  m_NextSaveToDisk = llarp::time_now_ms() + m_SaveInterval;
 }

 ssize_t
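
ShouldSaveToDisk is a pure time gate: it only returns true once a flush deadline has been scheduled (m_NextSaveToDisk becomes non-zero in Load and after each AsyncFlushToDisk) and that deadline has passed. The same pattern in isolation, as a standalone sketch using a plain integer in place of llarp_time_t (the PeriodicSaver name is illustrative, not from the codebase):

    #include <cstdint>

    using Ms = std::uint64_t;  // milliseconds, standing in for llarp_time_t

    struct PeriodicSaver
    {
      Ms nextSave = 0;  // 0 means no flush has been scheduled yet
      static constexpr Ms interval = 60 * 5 * 1000;  // five minutes, like m_SaveInterval

      // true only when a deadline exists and has elapsed
      bool ShouldSave(Ms now) const
      {
        return nextSave > 0 && nextSave <= now;
      }

      // called after scheduling a flush: push the deadline one interval ahead
      void Rescheduled(Ms now)
      {
        nextSave = now + interval;
      }
    };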
@@ -375,16 +387,17 @@ crypto_threadworker_verifyrc(void *user)
   // if it's valid we need to set it
   if(verify_request->valid && rc.IsPublicRouter())
   {
-    llarp::LogDebug("RC is valid, saving to disk");
-    verify_request->diskworker->addJob(
-        std::bind(&disk_threadworker_setRC, verify_request));
-  }
-  else
-  {
-    // callback to logic thread
-    verify_request->logic->queue_job(
-        {verify_request, &logic_threadworker_callback});
+    if(verify_request->diskworker)
+    {
+      llarp::LogDebug("RC is valid, saving to disk");
+      verify_request->diskworker->addJob(
+          std::bind(&disk_threadworker_setRC, verify_request));
+      return;
+    }
   }
+  // callback to logic thread
+  verify_request->logic->queue_job(
+      {verify_request, &logic_threadworker_callback});
 }

 void
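
With the disk worker now optional, the verify path has a single fallthrough exit: when a disk threadpool is present the verified RC is queued there (disk_threadworker_setRC then takes care of completion), otherwise, and for invalid or non-public RCs, the result goes straight back to the logic thread. The shape of that flow as a standalone sketch, with std::function standing in for the threadpool and logic queues (names here are illustrative, not lokinet API):

    #include <functional>

    void
    FinishVerify(bool rcValid, bool isPublicRouter, bool haveDiskWorker,
                 const std::function<void()> &queueDiskJob,
                 const std::function<void()> &queueLogicCallback)
    {
      if(rcValid && isPublicRouter && haveDiskWorker)
      {
        // the disk job is expected to post the completion callback itself
        queueDiskJob();
        return;
      }
      // invalid RC, non-public router, or no disk worker: notify the caller now
      queueLogicCallback();
    }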
@@ -394,6 +407,12 @@ nodedb_inform_load_rc(void *user)
   job->hook(job);
 }

+void
+llarp_nodedb_async_verify(struct llarp_async_verify_rc *job)
+{
+  job->cryptoworker->addJob(std::bind(&crypto_threadworker_verifyrc, job));
+}
+
 void
 nodedb_async_load_rc(void *user)
 {
@@ -458,25 +477,6 @@ llarp_nodedb::load_dir(const char *dir)
   return Load(dir);
 }

-/// maybe rename to verify_and_set
-void
-llarp_nodedb_async_verify(struct llarp_async_verify_rc *job)
-{
-  // switch to crypto threadpool and continue with
-  // crypto_threadworker_verifyrc
-  job->cryptoworker->addJob(std::bind(&crypto_threadworker_verifyrc, job));
-}
-
-// disabled for now
-/*
-void
-llarp_nodedb_async_load_rc(struct llarp_async_load_rc *job)
-{
-  // call in the disk io thread so we don't bog down the others
-  llarp_threadpool_queue_job(job->diskworker, {job, &nodedb_async_load_rc});
-}
-*/
-
 size_t
 llarp_nodedb::num_loaded() const
 {

@@ -52,6 +52,10 @@ struct llarp_nodedb
   std::shared_ptr< llarp::thread::ThreadPool > disk;
   mutable llarp::util::Mutex access;  // protects entries

+  /// time for next save to disk event, 0 if never happened
+  llarp_time_t m_NextSaveToDisk = 0;
+  /// how often to save to disk
+  const llarp_time_t m_SaveInterval = 60 * 5 * 1000;
+
   struct NetDBEntry
   {
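
The interval constant is in milliseconds: 60 * 5 * 1000 = 300000 ms, so the nodedb is flushed to disk at most once every five minutes. A trivial standalone check of that arithmetic (not project code):

    static_assert(60 * 5 * 1000 == 300000, "m_SaveInterval is 300000 ms");
    static_assert(300000 / 1000 / 60 == 5, "which is five minutes");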
@@ -67,6 +71,10 @@ struct llarp_nodedb
   NetDBMap_t entries GUARDED_BY(access);
   fs::path nodePath;

+  /// return true if we should save our nodedb to disk
+  bool
+  ShouldSaveToDisk(llarp_time_t now = 0) const;
+
   bool
   Remove(const llarp::RouterID &pk) LOCKS_EXCLUDED(access);
@@ -87,9 +95,10 @@ struct llarp_nodedb
   std::string
   getRCFilePath(const llarp::RouterID &pubkey) const;

-  /// insert and write to disk
+  /// insert and optionally write to disk
   bool
-  Insert(const llarp::RouterContact &rc) LOCKS_EXCLUDED(access);
+  Insert(const llarp::RouterContact &rc, bool writeToDisk = false)
+      LOCKS_EXCLUDED(access);

   /// unconditional insert and write to disk in background
   /// updates the inserted time of the entry

@@ -727,12 +727,17 @@ namespace llarp
     if(rpcCaller)
       rpcCaller->Tick(now);

-    // save profiles async
+    // save profiles
     if(routerProfiling().ShouldSave(now))
     {
       diskworker()->addJob(
           [&]() { routerProfiling().Save(routerProfilesFile.c_str()); });
     }
+    // save nodedb
+    if(nodedb()->ShouldSaveToDisk(now))
+    {
+      nodedb()->AsyncFlushToDisk();
+    }

     // get connected peers
     std::set< dht::Key_t > peersWeHave;
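
Put together, any number of Insert calls between ticks now results in at most one batched SaveAll pass per interval, driven from the router's event loop just like profile saving. A toy standalone simulation of that behaviour (plain C++, nothing from lokinet):

    #include <cstdint>
    #include <iostream>

    int main()
    {
      using Ms = std::uint64_t;
      constexpr Ms interval = 60 * 5 * 1000;  // five minutes, like m_SaveInterval
      Ms nextSave = interval;                 // as if Load() just scheduled the first flush
      int flushes = 0;

      // simulate one hour of router ticks, one per second
      for(Ms now = 0; now <= 60 * 60 * 1000; now += 1000)
      {
        if(nextSave > 0 && nextSave <= now)   // ShouldSaveToDisk(now)
        {
          ++flushes;                          // stands in for AsyncFlushToDisk()
          nextSave = now + interval;          // reschedule the next flush
        }
      }
      std::cout << "flushes in one hour: " << flushes << "\n";  // prints 12
      return 0;
    }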
