Track traffic peerstats

pull/1312/head
Stephen Shelton 4 years ago
parent 77b98459dd
commit d897099e1d
No known key found for this signature in database
GPG Key ID: EE4BADACCE8B631C

@ -281,6 +281,13 @@ namespace llarp
return false;
}
SessionStats
Session::GetSessionStats() const
{
// TODO: thread safety
return m_Stats;
}
util::StatusObject
Session::ExtractStatus() const
{

@ -115,6 +115,9 @@ namespace llarp
bool
ShouldPing() const override;
SessionStats
GetSessionStats() const override;
util::StatusObject
ExtractStatus() const override;
@ -141,20 +144,7 @@ namespace llarp
static std::string
StateToString(State state);
State m_State;
struct Stats
{
// rate
uint64_t currentRateRX = 0;
uint64_t currentRateTX = 0;
uint64_t totalPacketsRX = 0;
uint64_t totalAckedTX = 0;
uint64_t totalDroppedTX = 0;
uint64_t totalInFlightTX = 0;
};
Stats m_Stats;
SessionStats m_Stats;
/// are we inbound session ?
const bool m_Inbound;

@ -4,6 +4,7 @@
#include <link/server.hpp>
#include <util/thread/logic.hpp>
#include <util/types.hpp>
#include <peerstats/peer_db.hpp>
#include <functional>
@ -77,6 +78,9 @@ namespace llarp
virtual void
CheckPersistingSessions(llarp_time_t now) = 0;
virtual void
updatePeerDb(std::shared_ptr<PeerDb> peerDb) = 0;
virtual util::StatusObject
ExtractStatus() const = 0;
};

@ -327,6 +327,58 @@ namespace llarp
}
}
void
LinkManager::updatePeerDb(std::shared_ptr<PeerDb> peerDb)
{
LogWarn("LinkManager::updatePeerDb()");
std::vector<std::pair<RouterID, SessionStats>> statsToUpdate;
int64_t diffTotalTX = 0;
ForEachPeer([&](ILinkSession* session) {
// derive RouterID
RouterID id = RouterID(session->GetRemoteRC().pubkey);
LogWarn(" Session : ", id);
SessionStats sessionStats = session->GetSessionStats();
SessionStats diff;
SessionStats& lastStats = m_lastRouterStats[id];
// TODO: operator overloads / member func for diff
diff.currentRateRX = std::max(sessionStats.currentRateRX, lastStats.currentRateRX);
diff.currentRateTX = std::max(sessionStats.currentRateTX, lastStats.currentRateTX);
diff.totalPacketsRX = sessionStats.totalPacketsRX - lastStats.totalPacketsRX;
diff.totalAckedTX = sessionStats.totalAckedTX - lastStats.totalAckedTX;
diff.totalDroppedTX = sessionStats.totalDroppedTX - lastStats.totalDroppedTX;
diffTotalTX = diff.totalAckedTX + diff.totalDroppedTX + diff.totalInFlightTX;
lastStats = sessionStats;
// TODO: if we have both inbound and outbound session, this will overwrite
statsToUpdate.push_back({id, diff});
});
for (auto& routerStats : statsToUpdate)
{
peerDb->modifyPeerStats(routerStats.first, [&](PeerStats& stats) {
// TODO: store separate stats for up vs down
const auto& diff = routerStats.second;
// note that 'currentRateRX' and 'currentRateTX' are per-second
stats.peakBandwidthBytesPerSec = std::max(
stats.peakBandwidthBytesPerSec,
(double)std::max(diff.currentRateRX, diff.currentRateTX));
stats.numPacketsDropped += diff.totalDroppedTX;
stats.numPacketsSent = diff.totalAckedTX;
stats.numPacketsAttempted = diffTotalTX;
// TODO: others -- we have slight mismatch on what we store
});
}
}
util::StatusObject
LinkManager::ExtractStatus() const
{

@ -74,6 +74,9 @@ namespace llarp
void
CheckPersistingSessions(llarp_time_t now) override;
void
updatePeerDb(std::shared_ptr<PeerDb> peerDb) override;
util::StatusObject
ExtractStatus() const override;
@ -96,6 +99,8 @@ namespace llarp
std::unordered_map<RouterID, llarp_time_t, RouterID::Hash> m_PersistingSessions
GUARDED_BY(_mutex);
std::unordered_map<RouterID, SessionStats, RouterID::Hash> m_lastRouterStats;
IOutboundSessionMaker* _sessionMaker;
};

@ -15,6 +15,19 @@ namespace llarp
struct ILinkMessage;
struct ILinkLayer;
struct SessionStats
{
// rate
uint64_t currentRateRX = 0;
uint64_t currentRateTX = 0;
uint64_t totalPacketsRX = 0;
uint64_t totalAckedTX = 0;
uint64_t totalDroppedTX = 0;
uint64_t totalInFlightTX = 0;
};
struct ILinkSession
{
virtual ~ILinkSession() = default;
@ -108,6 +121,10 @@ namespace llarp
virtual bool
ShouldPing() const = 0;
/// return the current stats for this session
virtual SessionStats
GetSessionStats() const = 0;
virtual util::StatusObject
ExtractStatus() const = 0;
};

@ -770,8 +770,18 @@ namespace llarp
nodedb()->AsyncFlushToDisk();
}
if (m_peerDb and m_peerDb->shouldFlush(now))
diskworker()->addJob([this]() { m_peerDb->flushDatabase(); });
if (m_peerDb)
{
// TODO: throttle this?
// TODO: need to capture session stats when session terminates / is removed from link manager
_linkManager.updatePeerDb(m_peerDb);
if (m_peerDb->shouldFlush(now))
{
LogWarn("Queing database flush...");
diskworker()->addJob([this]() { m_peerDb->flushDatabase(); });
}
}
// get connected peers
std::set<dht::Key_t> peersWeHave;

Loading…
Cancel
Save