llarp/profiling: refactor to use oxenc producer/consumer

No more llarp_buffer_t here!

(I was tracking down a segfault which led me in here and it was easier
to rewrite this to use bt_dict_{consumer,producer} than to decipher all
the cursed llarp_buffer_t and bencode callback nest).
pull/2005/head
Jason Rhinelander 2 years ago
parent d335527a70
commit f641c08e80
No known key found for this signature in database
GPG Key ID: C4992CE7A88D4262

@ -1,53 +1,51 @@
#include "profiling.hpp" #include "profiling.hpp"
#include <oxenc/bt_producer.h>
#include <oxenc/bt_serialize.h>
#include <fstream> #include "util/file.hpp"
#include "util/fs.hpp" #include "util/logging.hpp"
using oxenc::bt_dict_consumer;
using oxenc::bt_dict_producer;
namespace llarp namespace llarp
{ {
bool static auto logcat = log::Cat("profiling");
RouterProfile::BEncode(llarp_buffer_t* buf) const
{
if (!bencode_start_dict(buf))
return false;
if (!BEncodeWriteDictInt("g", connectGoodCount, buf)) RouterProfile::RouterProfile(bt_dict_consumer dict)
return false; {
if (!BEncodeWriteDictInt("p", pathSuccessCount, buf)) BDecode(std::move(dict));
return false; }
if (!BEncodeWriteDictInt("q", pathTimeoutCount, buf))
return false;
if (!BEncodeWriteDictInt("s", pathFailCount, buf))
return false;
if (!BEncodeWriteDictInt("t", connectTimeoutCount, buf))
return false;
if (!BEncodeWriteDictInt("u", lastUpdated.count(), buf))
return false;
if (!BEncodeWriteDictInt("v", version, buf))
return false;
return bencode_end(buf); void
RouterProfile::BEncode(bt_dict_producer& dict) const
{
dict.append("g", connectGoodCount);
dict.append("p", pathSuccessCount);
dict.append("q", pathTimeoutCount);
dict.append("s", pathFailCount);
dict.append("t", connectTimeoutCount);
dict.append("u", lastUpdated.count());
dict.append("v", version);
} }
bool void
RouterProfile::DecodeKey(const llarp_buffer_t& k, llarp_buffer_t* buf) RouterProfile::BDecode(bt_dict_consumer dict)
{ {
bool read = false; if (dict.skip_until("g"))
if (!BEncodeMaybeReadDictInt("g", connectGoodCount, read, k, buf)) connectGoodCount = dict.consume_integer<uint64_t>();
return false; if (dict.skip_until("p"))
if (!BEncodeMaybeReadDictInt("t", connectTimeoutCount, read, k, buf)) pathSuccessCount = dict.consume_integer<uint64_t>();
return false; if (dict.skip_until("q"))
if (!BEncodeMaybeReadDictInt("u", lastUpdated, read, k, buf)) pathTimeoutCount = dict.consume_integer<uint64_t>();
return false; if (dict.skip_until("s"))
if (!BEncodeMaybeReadDictInt("v", version, read, k, buf)) pathFailCount = dict.consume_integer<uint64_t>();
return false; if (dict.skip_until("t"))
if (!BEncodeMaybeReadDictInt("s", pathFailCount, read, k, buf)) connectTimeoutCount = dict.consume_integer<uint64_t>();
return false; if (dict.skip_until("u"))
if (!BEncodeMaybeReadDictInt("p", pathSuccessCount, read, k, buf)) lastUpdated = llarp_time_t{dict.consume_integer<uint64_t>()};
return false; if (dict.skip_until("v"))
if (!BEncodeMaybeReadDictInt("q", pathTimeoutCount, read, k, buf)) version = dict.consume_integer<uint64_t>();
return false;
return read;
} }
void void
@ -156,23 +154,26 @@ namespace llarp
Profiling::Tick() Profiling::Tick()
{ {
util::Lock lock(m_ProfilesMutex); util::Lock lock(m_ProfilesMutex);
std::for_each(m_Profiles.begin(), m_Profiles.end(), [](auto& item) { item.second.Tick(); }); for (auto& [rid, profile] : m_Profiles)
profile.Tick();
} }
void void
Profiling::MarkConnectTimeout(const RouterID& r) Profiling::MarkConnectTimeout(const RouterID& r)
{ {
util::Lock lock{m_ProfilesMutex}; util::Lock lock{m_ProfilesMutex};
m_Profiles[r].connectTimeoutCount += 1; auto& profile = m_Profiles[r];
m_Profiles[r].lastUpdated = llarp::time_now_ms(); profile.connectTimeoutCount += 1;
profile.lastUpdated = llarp::time_now_ms();
} }
void void
Profiling::MarkConnectSuccess(const RouterID& r) Profiling::MarkConnectSuccess(const RouterID& r)
{ {
util::Lock lock{m_ProfilesMutex}; util::Lock lock{m_ProfilesMutex};
m_Profiles[r].connectGoodCount += 1; auto& profile = m_Profiles[r];
m_Profiles[r].lastUpdated = llarp::time_now_ms(); profile.connectGoodCount += 1;
profile.lastUpdated = llarp::time_now_ms();
} }
void void
@ -186,24 +187,27 @@ namespace llarp
Profiling::MarkHopFail(const RouterID& r) Profiling::MarkHopFail(const RouterID& r)
{ {
util::Lock lock{m_ProfilesMutex}; util::Lock lock{m_ProfilesMutex};
m_Profiles[r].pathFailCount += 1; auto& profile = m_Profiles[r];
m_Profiles[r].lastUpdated = llarp::time_now_ms(); profile.pathFailCount += 1;
profile.lastUpdated = llarp::time_now_ms();
} }
void void
Profiling::MarkPathFail(path::Path* p) Profiling::MarkPathFail(path::Path* p)
{ {
util::Lock lock{m_ProfilesMutex}; util::Lock lock{m_ProfilesMutex};
size_t idx = 0; bool first = true;
for (const auto& hop : p->hops) for (const auto& hop : p->hops)
{ {
// don't mark first hop as failure because we are connected to it directly // don't mark first hop as failure because we are connected to it directly
if (idx) if (first)
first = false;
else
{ {
m_Profiles[hop.rc.pubkey].pathFailCount += 1; auto& profile = m_Profiles[hop.rc.pubkey];
m_Profiles[hop.rc.pubkey].lastUpdated = llarp::time_now_ms(); profile.pathFailCount += 1;
profile.lastUpdated = llarp::time_now_ms();
} }
++idx;
} }
} }
@ -213,8 +217,9 @@ namespace llarp
util::Lock lock{m_ProfilesMutex}; util::Lock lock{m_ProfilesMutex};
for (const auto& hop : p->hops) for (const auto& hop : p->hops)
{ {
m_Profiles[hop.rc.pubkey].pathTimeoutCount += 1; auto& profile = m_Profiles[hop.rc.pubkey];
m_Profiles[hop.rc.pubkey].lastUpdated = llarp::time_now_ms(); profile.pathTimeoutCount += 1;
profile.lastUpdated = llarp::time_now_ms();
} }
} }
@ -225,88 +230,82 @@ namespace llarp
const auto sz = p->hops.size(); const auto sz = p->hops.size();
for (const auto& hop : p->hops) for (const auto& hop : p->hops)
{ {
auto& profile = m_Profiles[hop.rc.pubkey];
// redeem previous fails by halfing the fail count and setting timeout to zero // redeem previous fails by halfing the fail count and setting timeout to zero
m_Profiles[hop.rc.pubkey].pathFailCount /= 2; profile.pathFailCount /= 2;
m_Profiles[hop.rc.pubkey].pathTimeoutCount = 0; profile.pathTimeoutCount = 0;
// mark success at hop // mark success at hop
m_Profiles[hop.rc.pubkey].pathSuccessCount += sz; profile.pathSuccessCount += sz;
m_Profiles[hop.rc.pubkey].lastUpdated = llarp::time_now_ms(); profile.lastUpdated = llarp::time_now_ms();
} }
} }
bool bool
Profiling::Save(const fs::path fpath) Profiling::Save(const fs::path fpath)
{ {
const size_t sz = (m_Profiles.size() * (RouterProfile::MaxSize + 32 + 8)) + 8; std::string buf;
std::vector<byte_t> tmp(sz, 0);
llarp_buffer_t buf(tmp);
{ {
util::Lock lock{m_ProfilesMutex}; util::Lock lock{m_ProfilesMutex};
if (not BEncode(&buf)) buf.resize((m_Profiles.size() * (RouterProfile::MaxSize + 32 + 8)) + 8);
bt_dict_producer d{buf.data(), buf.size()};
try
{
BEncode(d);
}
catch (const std::exception& e)
{
log::warning(logcat, "Failed to encode profiling data: {}", e.what());
return false; return false;
}
buf.resize(d.end() - buf.data());
} }
buf.sz = buf.cur - buf.base; try
auto optional_f = util::OpenFileStream<std::ofstream>(fpath, std::ios::binary); {
if (!optional_f) util::dump_file(fpath, buf);
return false; }
auto& f = *optional_f; catch (const std::exception& e)
if (not f.is_open()) {
log::warning(logcat, "Failed to save profiling data to {}: {}", fpath, e.what());
return false; return false;
}
f.write(reinterpret_cast<const char*>(buf.base), buf.sz);
if (not f.good())
return false;
m_LastSave = llarp::time_now_ms(); m_LastSave = llarp::time_now_ms();
return true; return true;
} }
bool void
Profiling::BEncode(llarp_buffer_t* buf) const Profiling::BEncode(bt_dict_producer& dict) const
{
if (!bencode_start_dict(buf))
return false;
auto itr = m_Profiles.begin();
while (itr != m_Profiles.end())
{
if (!itr->first.BEncode(buf))
return false;
if (!itr->second.BEncode(buf))
return false;
++itr;
}
return bencode_end(buf);
}
bool
Profiling::DecodeKey(const llarp_buffer_t& k, llarp_buffer_t* buf)
{ {
if (k.sz != 32) for (const auto& [r_id, profile] : m_Profiles)
return false; profile.BEncode(dict.append_dict(r_id.ToView()));
RouterProfile profile;
if (!bencode_decode_dict(profile, buf))
return false;
RouterID pk = k.base;
return m_Profiles.emplace(pk, profile).second;
} }
bool void
Profiling::BDecode(llarp_buffer_t* buf) Profiling::BDecode(bt_dict_consumer dict)
{ {
return bencode_decode_dict(*this, buf); m_Profiles.clear();
while (dict)
{
auto [rid, subdict] = dict.next_dict_consumer();
if (rid.size() != RouterID::SIZE)
throw std::invalid_argument{"invalid RouterID"};
m_Profiles.emplace(reinterpret_cast<const byte_t*>(rid.data()), subdict);
}
} }
bool bool
Profiling::Load(const fs::path fname) Profiling::Load(const fs::path fname)
{ {
util::Lock lock{m_ProfilesMutex}; try
m_Profiles.clear(); {
if (!BDecodeReadFile(fname, *this)) std::string data = util::slurp_file(fname);
util::Lock lock{m_ProfilesMutex};
BDecode(bt_dict_consumer{data});
}
catch (const std::exception& e)
{ {
llarp::LogWarn("failed to load router profiles from ", fname); log::warning(logcat, "failed to load router profiles from {}: {}", fname, e.what());
return false; return false;
} }
m_LastSave = llarp::time_now_ms(); m_LastSave = llarp::time_now_ms();

@ -8,6 +8,12 @@
#include "util/thread/annotations.hpp" #include "util/thread/annotations.hpp"
#include <map> #include <map>
namespace oxenc
{
class bt_dict_consumer;
class bt_dict_producer;
} // namespace oxenc
namespace llarp namespace llarp
{ {
struct RouterProfile struct RouterProfile
@ -22,11 +28,19 @@ namespace llarp
llarp_time_t lastDecay = 0s; llarp_time_t lastDecay = 0s;
uint64_t version = llarp::constants::proto_version; uint64_t version = llarp::constants::proto_version;
bool RouterProfile() = default;
BEncode(llarp_buffer_t* buf) const; RouterProfile(oxenc::bt_dict_consumer dict);
bool void
DecodeKey(const llarp_buffer_t& k, llarp_buffer_t* buf); BEncode(oxenc::bt_dict_producer& dict) const;
void
BEncode(oxenc::bt_dict_producer&& dict) const
{
BEncode(dict);
}
void
BDecode(oxenc::bt_dict_consumer dict);
bool bool
IsGood(uint64_t chances) const; IsGood(uint64_t chances) const;
@ -89,16 +103,6 @@ namespace llarp
void void
Tick() EXCLUDES(m_ProfilesMutex); Tick() EXCLUDES(m_ProfilesMutex);
bool
BEncode(llarp_buffer_t* buf) const;
bool
BDecode(llarp_buffer_t* buf);
bool
DecodeKey(const llarp_buffer_t& k, llarp_buffer_t* buf) NO_THREAD_SAFETY_ANALYSIS;
// disabled because we do load -> bencode::BDecodeReadFromFile -> DecodeKey
bool bool
Load(const fs::path fname) EXCLUDES(m_ProfilesMutex); Load(const fs::path fname) EXCLUDES(m_ProfilesMutex);
@ -115,6 +119,12 @@ namespace llarp
Enable(); Enable();
private: private:
void
BEncode(oxenc::bt_dict_producer& dict) const;
void
BDecode(oxenc::bt_dict_consumer dict);
mutable util::Mutex m_ProfilesMutex; // protects m_Profiles mutable util::Mutex m_ProfilesMutex; // protects m_Profiles
std::map<RouterID, RouterProfile> m_Profiles GUARDED_BY(m_ProfilesMutex); std::map<RouterID, RouterProfile> m_Profiles GUARDED_BY(m_ProfilesMutex);
llarp_time_t m_LastSave = 0s; llarp_time_t m_LastSave = 0s;

@ -260,6 +260,12 @@ namespace llarp
return FromBytestring(&strbuf); return FromBytestring(&strbuf);
} }
std::string_view
ToView() const
{
return {reinterpret_cast<const char*>(data()), sz};
}
std::string std::string
ToHex() const ToHex() const
{ {

Loading…
Cancel
Save