lokinet/llarp/peerstats/peer_db.cpp

366 lines
9.7 KiB
C++
Raw Normal View History

#include "peer_db.hpp"
2020-05-20 17:26:45 +00:00
#include <llarp/util/logging.hpp>
#include <llarp/util/status.hpp>
#include <llarp/util/str.hpp>
2020-05-20 17:26:45 +00:00
namespace llarp
{
#ifdef LOKINET_PEERSTATS_BACKEND
2020-05-26 17:03:21 +00:00
/// Construct an empty PeerDb. No storage is opened here; the last-flush timestamp starts out
/// value-initialized.
PeerDb::PeerDb()
{
  // `{}` value-initializes the stored timestamp type
  m_lastFlush.store({});
}
/// Open the peer stats database and load every stored PeerStats row into the in-memory map.
///
/// @param file path of the database file; an empty optional selects a memory-backed database
/// @throws std::runtime_error if a database has already been loaded (reloading is unsupported)
///
/// Nothing is published to m_storage / m_peerStats until the whole load has succeeded. If
/// opening the storage, syncing the schema or reading the rows throws, this object is left
/// exactly as it was, so the caller may retry instead of being stuck with a half-initialized
/// storage that then rejects every further load attempt.
void
PeerDb::loadDatabase(std::optional<fs::path> file)
{
  std::lock_guard guard(m_statsLock);

  if (m_storage)
    throw std::runtime_error("Reloading database not supported");  // TODO

  // sqlite_orm treats empty-string as an indicator to load a memory-backed database, which we'll
  // use if file is an empty-optional
  std::string fileString;
  if (file.has_value())
  {
    fileString = file->string();
    LogInfo("Loading PeerDb from file ", fileString);
  }
  else
  {
    LogInfo("Loading memory-backed PeerDb");
  }

  // work on a local handle first; it is only swapped into m_storage once loading succeeded
  auto storage = std::make_unique<PeerDbStorage>(initStorage(fileString));
  storage->sync_schema(true);  // true for "preserve" as in "don't nuke" (how cute!)

  auto allStats = storage->get_all<PeerStats>();
  LogInfo("Loading ", allStats.size(), " PeerStats from table peerstats...");

  decltype(m_peerStats) loadedStats;
  for (PeerStats& stats : allStats)
  {
    // the map starts out empty, and the database should enforce that routerId is unique...
    assert(loadedStats.find(stats.routerId) == loadedStats.end());

    // freshly loaded rows match what is on disk, so they do not need flushing
    stats.stale = false;
    loadedStats[stats.routerId] = stats;
  }

  // commit point: replace whatever was in memory with the loaded rows and publish the storage
  m_peerStats.swap(loadedStats);
  m_storage.swap(storage);
}
void
PeerDb::flushDatabase()
2020-05-20 17:26:45 +00:00
{
2020-05-26 17:03:21 +00:00
LogDebug("flushing PeerDb...");
auto start = time_now_ms();
if (not shouldFlush(start))
2020-05-28 16:03:07 +00:00
{
LogWarn("Call to flushDatabase() while already in progress, ignoring");
return;
}
2020-05-26 17:03:21 +00:00
if (not m_storage)
throw std::runtime_error("Cannot flush database before it has been loaded");
2020-05-28 16:03:07 +00:00
std::vector<PeerStats> staleStats;
2020-05-20 17:26:45 +00:00
{
2020-06-05 16:50:32 +00:00
std::lock_guard guard(m_statsLock);
2020-05-28 16:03:07 +00:00
// copy all stale entries
for (auto& entry : m_peerStats)
{
if (entry.second.stale)
{
staleStats.push_back(entry.second);
entry.second.stale = false;
}
}
}
2020-05-20 17:26:45 +00:00
LogDebug("Updating ", staleStats.size(), " stats");
2020-05-28 16:03:07 +00:00
{
2020-05-28 16:03:07 +00:00
auto guard = m_storage->transaction_guard();
2020-05-20 17:26:45 +00:00
2020-05-28 16:03:07 +00:00
for (const auto& stats : staleStats)
{
m_storage->replace(stats);
}
guard.commit();
}
2020-05-26 17:03:21 +00:00
auto end = time_now_ms();
auto elapsed = end - start;
LogDebug("PeerDb flush took about ", elapsed, " seconds");
2020-05-26 17:03:21 +00:00
m_lastFlush.store(end);
2020-05-20 17:26:45 +00:00
}
void
PeerDb::accumulatePeerStats(const RouterID& routerId, const PeerStats& delta)
{
if (routerId != delta.routerId)
throw std::invalid_argument{
fmt::format("routerId {} doesn't match {}", routerId, delta.routerId)};
2020-06-05 16:50:32 +00:00
std::lock_guard guard(m_statsLock);
auto itr = m_peerStats.find(routerId);
if (itr == m_peerStats.end())
itr = m_peerStats.insert({routerId, delta}).first;
else
itr->second += delta;
2020-05-28 16:03:07 +00:00
itr->second.stale = true;
2020-05-20 17:26:45 +00:00
}
2020-05-26 19:17:51 +00:00
void
PeerDb::modifyPeerStats(const RouterID& routerId, std::function<void(PeerStats&)> callback)
{
2020-06-05 16:50:32 +00:00
std::lock_guard guard(m_statsLock);
2020-05-26 19:17:51 +00:00
PeerStats& stats = m_peerStats[routerId];
stats.routerId = routerId;
2020-05-28 16:03:07 +00:00
stats.stale = true;
2020-05-26 19:17:51 +00:00
callback(stats);
}
std::optional<PeerStats>
2020-05-20 17:26:45 +00:00
PeerDb::getCurrentPeerStats(const RouterID& routerId) const
{
2020-06-05 16:50:32 +00:00
std::lock_guard guard(m_statsLock);
2020-05-20 17:26:45 +00:00
auto itr = m_peerStats.find(routerId);
if (itr == m_peerStats.end())
return std::nullopt;
2020-05-20 17:26:45 +00:00
else
return itr->second;
}
std::vector<PeerStats>
PeerDb::listAllPeerStats() const
{
std::lock_guard guard(m_statsLock);
std::vector<PeerStats> statsList;
statsList.reserve(m_peerStats.size());
for (const auto& [routerId, stats] : m_peerStats)
{
statsList.push_back(stats);
}
return statsList;
}
std::vector<PeerStats>
PeerDb::listPeerStats(const std::vector<RouterID>& ids) const
{
std::lock_guard guard(m_statsLock);
std::vector<PeerStats> statsList;
statsList.reserve(ids.size());
for (const auto& id : ids)
{
const auto itr = m_peerStats.find(id);
if (itr != m_peerStats.end())
statsList.push_back(itr->second);
}
return statsList;
}
/// Assume we receive an RC at some point `R` in time which was signed at some point `S` in time
/// and expires at some point `E` in time, as depicted below:
///
/// +-----------------------------+
/// | signed rc | <- useful lifetime of RC
/// +-----------------------------+
/// ^ [ . . . . . . . . ] <----------- window in which we receive this RC gossiped to us
/// | ^ ^
/// | | |
/// S R E
///
/// One useful metric from this is the difference between (E - R), the useful contact time of this
/// RC. As we track this metric over time, the high and low watermarks serve to tell us how
/// quickly we receive signed RCs from a given router and how close to expiration they are when
/// we receive them. The latter is particularly useful, and should always be a positive number for
/// a healthy router. A negative number indicates that we are receiving an expired RC.
///
/// TODO: we actually discard expired RCs, so we currently would not detect a negative value for
/// (E - R)
///
/// Another related metric is the distance between a newly received RC and the previous RC's
/// expiration, which represents how close we came to having no useful RC to work with. This
/// should be a high (positive) number for a healthy router, and if negative indicates that we
/// had no way to contact this router for a period of time.
///
/// E1 E2 E3
/// | | |
/// v | |
/// +-----------------------------+ | |
/// | signed rc 1 | | |
/// +-----------------------------+ | |
/// [ . . . . . ] v |
/// ^ +-----------------------------+ |
/// | | signed rc 2 | |
/// | +-----------------------------+ |
/// | [ . . . . . . . . . . ] v
/// | ^ +-----------------------------+
/// | | | signed rc 3 |
/// | | +-----------------------------+
/// | | [ . . ]
/// | | ^
/// | | |
/// R1 R2 R3
///
/// Example: the delta (E1 - R2) is positive and therefore healthy, but the delta (E2 - R3) is
/// negative, which indicates that we had a brief period of time where we had no valid
/// (non-expired) RC for this router.
2020-05-27 01:57:27 +00:00
void
PeerDb::handleGossipedRC(const RouterContact& rc, llarp_time_t now)
{
2020-06-05 16:50:32 +00:00
std::lock_guard guard(m_statsLock);
2020-05-27 01:57:27 +00:00
RouterID id(rc.pubkey);
auto& stats = m_peerStats[id];
stats.routerId = id;
2020-05-27 01:57:27 +00:00
const bool isNewRC = (stats.lastRCUpdated < rc.last_updated);
if (isNewRC)
2020-05-27 01:57:27 +00:00
{
stats.numDistinctRCsReceived++;
if (stats.numDistinctRCsReceived > 1)
{
auto prevRCExpiration = (stats.lastRCUpdated + RouterContact::Lifetime);
// we track max expiry as the delta between (last expiration time - time received),
// and this value will be negative for an unhealthy router
// TODO: handle case where new RC is also expired? just ignore?
auto expiry = prevRCExpiration - now;
2020-05-27 01:57:27 +00:00
if (stats.numDistinctRCsReceived == 2)
stats.leastRCRemainingLifetime = expiry;
else
stats.leastRCRemainingLifetime = std::min(stats.leastRCRemainingLifetime, expiry);
}
stats.lastRCUpdated = rc.last_updated;
2020-05-28 16:03:07 +00:00
stats.stale = true;
2020-05-27 01:57:27 +00:00
}
}
2020-05-26 17:03:21 +00:00
void
PeerDb::configure(const RouterConfig& routerConfig)
{
fs::path dbPath = routerConfig.m_dataDir / "peerstats.sqlite";
loadDatabase(dbPath);
}
/// Decide whether enough time has passed since the last flush.
///
/// @param now the current time
/// @return true once at least 30 seconds lie between `now` and the recorded last flush
bool
PeerDb::shouldFlush(llarp_time_t now)
{
  constexpr llarp_time_t TargetFlushInterval = 30s;

  const auto sinceLastFlush = now - m_lastFlush.load();
  return sinceLastFlush >= TargetFlushInterval;
}
/// Build a status object describing this PeerDb.
///
/// @return an object with the keys "dbLoaded" (whether a storage is open), "dbFile" (the
///         storage's filename, or null when nothing is loaded), "lastFlushMs" (tick count of
///         the last flush timestamp) and "stats" (one toJson() object per tracked router)
util::StatusObject
PeerDb::ExtractStatus() const
{
  std::lock_guard guard(m_statsLock);

  const bool loaded = (m_storage != nullptr);

  // stays null unless a storage is open
  util::StatusObject dbFile = nullptr;
  if (loaded)
    dbFile = m_storage->filename();

  std::vector<util::StatusObject> statsObjs;
  statsObjs.reserve(m_peerStats.size());
  for (const auto& kv : m_peerStats)
    statsObjs.push_back(kv.second.toJson());

  return util::StatusObject{
      {"dbLoaded", loaded},
      {"dbFile", dbFile},
      {"lastFlushMs", m_lastFlush.load().count()},
      {"stats", statsObjs},
  };
}
#else  // !LOKINET_PEERSTATS_BACKEND
// Empty stubs, used when the peer stats backend is not compiled in
/// Stub constructor for builds without LOKINET_PEERSTATS_BACKEND: always throws.
///
/// @throws std::logic_error unconditionally
PeerDb::PeerDb()
{
  throw std::logic_error("Peer stats backend not enabled!");
}
/// Stub for builds without LOKINET_PEERSTATS_BACKEND: ignores its argument and does nothing.
void
PeerDb::loadDatabase(std::optional<fs::path>)
{
  // intentionally empty
}
/// Stub for builds without LOKINET_PEERSTATS_BACKEND: does nothing.
void
PeerDb::flushDatabase()
{
  // intentionally empty
}
/// Stub for builds without LOKINET_PEERSTATS_BACKEND: ignores its arguments and does nothing.
void
PeerDb::accumulatePeerStats(const RouterID&, const PeerStats&)
{
  // intentionally empty
}
/// Stub for builds without LOKINET_PEERSTATS_BACKEND: the callback is never invoked.
void
PeerDb::modifyPeerStats(const RouterID&, std::function<void(PeerStats&)>)
{
  // intentionally empty
}
/// Stub for builds without LOKINET_PEERSTATS_BACKEND.
///
/// @return always an empty optional
std::optional<PeerStats>
PeerDb::getCurrentPeerStats(const RouterID&) const
{
  return std::optional<PeerStats>{};
}
/// Stub for builds without LOKINET_PEERSTATS_BACKEND.
///
/// @return always an empty vector
std::vector<PeerStats>
PeerDb::listAllPeerStats() const
{
  return std::vector<PeerStats>{};
}
std::vector<PeerStats>
PeerDb::listPeerStats(const std::vector<RouterID>&) const
{
return {};
}
/// Stub for builds without LOKINET_PEERSTATS_BACKEND: ignores its arguments and does nothing.
void
PeerDb::handleGossipedRC(const RouterContact&, llarp_time_t)
{
  // intentionally empty
}
void
2022-08-30 18:53:40 +00:00
PeerDb::configure(const RouterConfig&)
{}
2022-08-30 18:53:40 +00:00
/// Stub for builds without LOKINET_PEERSTATS_BACKEND.
///
/// @return always false
bool
PeerDb::shouldFlush(llarp_time_t)
{
  return false;
}
/// Stub for builds without LOKINET_PEERSTATS_BACKEND.
///
/// @return a default (empty-brace) initialized status object
util::StatusObject
PeerDb::ExtractStatus() const
{
  // no backend, nothing to report
  return {};
}
#endif
2020-05-20 17:26:45 +00:00
}; // namespace llarp