diff --git a/llarp/iwp/session.cpp b/llarp/iwp/session.cpp index dd395f82b..82c659354 100644 --- a/llarp/iwp/session.cpp +++ b/llarp/iwp/session.cpp @@ -281,6 +281,13 @@ namespace llarp return false; } + SessionStats + Session::GetSessionStats() const + { + // TODO: thread safety + return m_Stats; + } + util::StatusObject Session::ExtractStatus() const { diff --git a/llarp/iwp/session.hpp b/llarp/iwp/session.hpp index 56044ffd8..00ae14510 100644 --- a/llarp/iwp/session.hpp +++ b/llarp/iwp/session.hpp @@ -115,6 +115,9 @@ namespace llarp bool ShouldPing() const override; + SessionStats + GetSessionStats() const override; + util::StatusObject ExtractStatus() const override; @@ -141,20 +144,7 @@ namespace llarp static std::string StateToString(State state); State m_State; - - struct Stats - { - // rate - uint64_t currentRateRX = 0; - uint64_t currentRateTX = 0; - - uint64_t totalPacketsRX = 0; - - uint64_t totalAckedTX = 0; - uint64_t totalDroppedTX = 0; - uint64_t totalInFlightTX = 0; - }; - Stats m_Stats; + SessionStats m_Stats; /// are we inbound session ? const bool m_Inbound; diff --git a/llarp/link/i_link_manager.hpp b/llarp/link/i_link_manager.hpp index 002a59f52..52b16c33a 100644 --- a/llarp/link/i_link_manager.hpp +++ b/llarp/link/i_link_manager.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include @@ -77,6 +78,9 @@ namespace llarp virtual void CheckPersistingSessions(llarp_time_t now) = 0; + virtual void + updatePeerDb(std::shared_ptr peerDb) = 0; + virtual util::StatusObject ExtractStatus() const = 0; }; diff --git a/llarp/link/link_manager.cpp b/llarp/link/link_manager.cpp index 9caf940c1..705652df5 100644 --- a/llarp/link/link_manager.cpp +++ b/llarp/link/link_manager.cpp @@ -327,6 +327,58 @@ namespace llarp } } + void + LinkManager::updatePeerDb(std::shared_ptr peerDb) + { + LogWarn("LinkManager::updatePeerDb()"); + + std::vector> statsToUpdate; + + int64_t diffTotalTX = 0; + + ForEachPeer([&](ILinkSession* session) { + // derive RouterID + RouterID id = RouterID(session->GetRemoteRC().pubkey); + LogWarn(" Session : ", id); + + SessionStats sessionStats = session->GetSessionStats(); + SessionStats diff; + SessionStats& lastStats = m_lastRouterStats[id]; + + // TODO: operator overloads / member func for diff + diff.currentRateRX = std::max(sessionStats.currentRateRX, lastStats.currentRateRX); + diff.currentRateTX = std::max(sessionStats.currentRateTX, lastStats.currentRateTX); + diff.totalPacketsRX = sessionStats.totalPacketsRX - lastStats.totalPacketsRX; + diff.totalAckedTX = sessionStats.totalAckedTX - lastStats.totalAckedTX; + diff.totalDroppedTX = sessionStats.totalDroppedTX - lastStats.totalDroppedTX; + + diffTotalTX = diff.totalAckedTX + diff.totalDroppedTX + diff.totalInFlightTX; + + lastStats = sessionStats; + + // TODO: if we have both inbound and outbound session, this will overwrite + statsToUpdate.push_back({id, diff}); + }); + + for (auto& routerStats : statsToUpdate) + { + peerDb->modifyPeerStats(routerStats.first, [&](PeerStats& stats) { + // TODO: store separate stats for up vs down + const auto& diff = routerStats.second; + + // note that 'currentRateRX' and 'currentRateTX' are per-second + stats.peakBandwidthBytesPerSec = std::max( + stats.peakBandwidthBytesPerSec, + (double)std::max(diff.currentRateRX, diff.currentRateTX)); + stats.numPacketsDropped += diff.totalDroppedTX; + stats.numPacketsSent = diff.totalAckedTX; + stats.numPacketsAttempted = diffTotalTX; + + // TODO: others -- we have slight mismatch on what we store + }); + } + } + util::StatusObject LinkManager::ExtractStatus() const { diff --git a/llarp/link/link_manager.hpp b/llarp/link/link_manager.hpp index ae9bfabf7..ddd546854 100644 --- a/llarp/link/link_manager.hpp +++ b/llarp/link/link_manager.hpp @@ -74,6 +74,9 @@ namespace llarp void CheckPersistingSessions(llarp_time_t now) override; + void + updatePeerDb(std::shared_ptr peerDb) override; + util::StatusObject ExtractStatus() const override; @@ -96,6 +99,8 @@ namespace llarp std::unordered_map m_PersistingSessions GUARDED_BY(_mutex); + std::unordered_map m_lastRouterStats; + IOutboundSessionMaker* _sessionMaker; }; diff --git a/llarp/link/session.hpp b/llarp/link/session.hpp index 199b14996..efb6f1dd5 100644 --- a/llarp/link/session.hpp +++ b/llarp/link/session.hpp @@ -15,6 +15,19 @@ namespace llarp struct ILinkMessage; struct ILinkLayer; + struct SessionStats + { + // rate + uint64_t currentRateRX = 0; + uint64_t currentRateTX = 0; + + uint64_t totalPacketsRX = 0; + + uint64_t totalAckedTX = 0; + uint64_t totalDroppedTX = 0; + uint64_t totalInFlightTX = 0; + }; + struct ILinkSession { virtual ~ILinkSession() = default; @@ -108,6 +121,10 @@ namespace llarp virtual bool ShouldPing() const = 0; + /// return the current stats for this session + virtual SessionStats + GetSessionStats() const = 0; + virtual util::StatusObject ExtractStatus() const = 0; }; diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 0dea08ae7..dbcf98de7 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -770,8 +770,18 @@ namespace llarp nodedb()->AsyncFlushToDisk(); } - if (m_peerDb and m_peerDb->shouldFlush(now)) - diskworker()->addJob([this]() { m_peerDb->flushDatabase(); }); + if (m_peerDb) + { + // TODO: throttle this? + // TODO: need to capture session stats when session terminates / is removed from link manager + _linkManager.updatePeerDb(m_peerDb); + + if (m_peerDb->shouldFlush(now)) + { + LogWarn("Queing database flush..."); + diskworker()->addJob([this]() { m_peerDb->flushDatabase(); }); + } + } // get connected peers std::set peersWeHave;