diff --git a/.drone.jsonnet b/.drone.jsonnet index 07c8e44e5..b5bdb3f67 100644 --- a/.drone.jsonnet +++ b/.drone.jsonnet @@ -1,4 +1,4 @@ -local default_deps_base='libsystemd-dev python3-dev libcurl4-openssl-dev libuv1-dev libunbound-dev nettle-dev libssl-dev libevent-dev'; +local default_deps_base='libsystemd-dev python3-dev libcurl4-openssl-dev libuv1-dev libunbound-dev nettle-dev libssl-dev libevent-dev libsqlite3-dev'; local default_deps_nocxx='libsodium-dev ' + default_deps_base; // libsodium-dev needs to be >= 1.0.18 local default_deps='g++ ' + default_deps_nocxx; // g++ sometimes needs replacement local default_windows_deps='mingw-w64-binutils mingw-w64-gcc mingw-w64-crt mingw-w64-headers mingw-w64-winpthreads perl openssh zip bash'; // deps for windows cross compile diff --git a/.gitmodules b/.gitmodules index 18e3dc04b..f8462c270 100644 --- a/.gitmodules +++ b/.gitmodules @@ -31,3 +31,6 @@ path = external/loki-mq url = https://github.com/loki-project/loki-mq branch = dev +[submodule "external/sqlite_orm"] + path = external/sqlite_orm + url = https://github.com/fnc12/sqlite_orm diff --git a/CMakeLists.txt b/CMakeLists.txt index a0a9bc955..04ef7ba82 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -305,6 +305,7 @@ if(SUBMODULE_CHECK) check_submodule(external/ghc-filesystem) check_submodule(external/date) check_submodule(external/pybind11) + check_submodule(external/sqlite_orm) if (NOT WIN32) # we grab libuv for windows separately in win32-setup/libuv. see note in cmake/win32.cmake. check_submodule(external/libuv) endif() @@ -324,6 +325,8 @@ add_subdirectory(external/nlohmann EXCLUDE_FROM_ALL) add_subdirectory(external/cxxopts EXCLUDE_FROM_ALL) add_subdirectory(external/date EXCLUDE_FROM_ALL) +include_directories(SYSTEM external/sqlite_orm/include) + add_subdirectory(vendor) if(ANDROID) diff --git a/cmake/StaticBuild.cmake b/cmake/StaticBuild.cmake index 3bb312231..ae1748cb8 100644 --- a/cmake/StaticBuild.cmake +++ b/cmake/StaticBuild.cmake @@ -25,6 +25,13 @@ set(UNBOUND_SOURCE unbound-${UNBOUND_VERSION}.tar.gz) set(UNBOUND_HASH SHA256=b73677c21a71cf92f15cc8cfe76a3d875e40f65b6150081c39620b286582d536 CACHE STRING "unbound source hash") +set(SQLITE3_VERSION 3320200 CACHE STRING "sqlite3 version") +set(SQLITE3_MIRROR ${LOCAL_MIRROR} https://www.sqlite.org/2020 + CACHE STRING "sqlite3 download mirror(s)") +set(SQLITE3_SOURCE sqlite-autoconf-${SQLITE3_VERSION}.tar.gz) +set(SQLITE3_HASH SHA512=5b551a1366ce4fd5dfaa687e5021194d34315935b26dd7d71f8abc9935d03c3caea323263a8330fb42038c487cd399e95de68e451cc26d573f852f219c00a02f + CACHE STRING "sqlite3 source hash") + set(SODIUM_VERSION 1.0.18 CACHE STRING "libsodium version") set(SODIUM_MIRROR ${LOCAL_MIRROR} https://download.libsodium.org/libsodium/releases @@ -193,6 +200,9 @@ endif() build_external(sodium) add_static_target(sodium sodium_external libsodium.a) +build_external(sqlite3) +add_static_target(sqlite3 sqlite3_external libsqlite3.a) + if(ZMQ_VERSION VERSION_LESS 4.3.3 AND CMAKE_CROSSCOMPILING AND ARCH_TRIPLET MATCHES mingw) set(zmq_patch PATCH_COMMAND patch -p1 -i ${PROJECT_SOURCE_DIR}/contrib/cross/patches/libzmq-pr3601-mingw-build-fix.patch diff --git a/daemon/main.cpp b/daemon/main.cpp index 0b43f49c2..5bc328d2f 100644 --- a/daemon/main.cpp +++ b/daemon/main.cpp @@ -98,8 +98,8 @@ run_main_context(const fs::path confFile, const llarp::RuntimeOptions opts) llarp::Config conf; conf.Load(confFile, opts.isRouter, confFile.parent_path()); - ctx = std::shared_ptr(); - ctx->Configure(opts, {}); + ctx = std::make_shared(); + ctx->Configure(conf); signal(SIGINT, handle_signal); signal(SIGTERM, handle_signal); @@ -297,7 +297,23 @@ main(int argc, char* argv[]) } while (ftr.wait_for(std::chrono::seconds(1)) != std::future_status::ready); main_thread.join(); - const auto code = ftr.get(); + + int code = 0; + + try + { + code = ftr.get(); + } + catch (const std::exception& e) + { + std::cerr << "main thread threw exception: " << e.what() << std::endl; + code = 1; + } + catch (...) + { + std::cerr << "main thread threw non-standard exception" << std::endl; + code = 2; + } llarp::LogContext::Instance().ImmediateFlush(); #ifdef _WIN32 diff --git a/external/sqlite_orm b/external/sqlite_orm new file mode 160000 index 000000000..f7ef17a6b --- /dev/null +++ b/external/sqlite_orm @@ -0,0 +1 @@ +Subproject commit f7ef17a6bde6162e8b487deb36519bace412920a diff --git a/include/llarp.hpp b/include/llarp.hpp index 762bf38b1..5485b4730 100644 --- a/include/llarp.hpp +++ b/include/llarp.hpp @@ -5,6 +5,8 @@ #include #include #include +#include +#include #include #include @@ -14,20 +16,10 @@ struct llarp_ev_loop; -#ifdef LOKINET_HIVE -namespace tooling -{ - struct RouterHive; -} // namespace tooling -#endif - namespace llarp { class Logic; - struct AbstractRouter; struct Config; - struct Crypto; - struct CryptoManager; struct RouterContact; namespace thread { @@ -43,12 +35,11 @@ namespace llarp struct Context { - std::unique_ptr crypto; - std::unique_ptr cryptoManager; - std::unique_ptr router; - std::shared_ptr logic; - std::unique_ptr config; - std::unique_ptr nodedb; + std::unique_ptr crypto = nullptr; + std::unique_ptr cryptoManager = nullptr; + std::unique_ptr router = nullptr; + std::shared_ptr logic = nullptr; + std::unique_ptr nodedb = nullptr; llarp_ev_loop_ptr mainloop; std::string nodedb_dir; @@ -67,8 +58,11 @@ namespace llarp void HandleSignal(int sig); - bool - Configure(const RuntimeOptions& opts, std::optional dataDir); + /// Configure given the specified config. + /// + /// note: consider using std::move() when passing conf in. + void + Configure(Config conf); bool IsUp() const; @@ -90,16 +84,20 @@ namespace llarp bool CallSafe(std::function f); -#ifdef LOKINET_HIVE - void - InjectHive(tooling::RouterHive* hive); -#endif + /// Creates a router. Can be overridden to allow a different class of router + /// to be created instead. Defaults to llarp::Router. + virtual std::unique_ptr + makeRouter( + llarp_ev_loop_ptr __netloop, + std::shared_ptr logic); + + protected: + std::unique_ptr config = nullptr; private: void SigINT(); - std::string configfile; std::unique_ptr> closeWaiter; }; diff --git a/llarp/CMakeLists.txt b/llarp/CMakeLists.txt index e87cb6673..5cdd64b84 100644 --- a/llarp/CMakeLists.txt +++ b/llarp/CMakeLists.txt @@ -28,19 +28,25 @@ add_dependencies(lokinet-util genversion) target_include_directories(lokinet-util PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} ${PROJECT_SOURCE_DIR}/include) +if(NOT TARGET sqlite3) + add_library(sqlite3 INTERFACE) + pkg_check_modules(SQLITE3 REQUIRED IMPORTED_TARGET sqlite3) + target_link_libraries(sqlite3 INTERFACE PkgConfig::SQLITE3) +endif() + target_link_libraries(lokinet-util PUBLIC lokinet-cryptography nlohmann_json::nlohmann_json filesystem date::date lokimq + sqlite3 ) if(ANDROID) target_link_libraries(lokinet-util PUBLIC log) endif() - add_library(lokinet-platform # for networking ev/ev.cpp @@ -156,6 +162,8 @@ add_library(liblokinet path/pathbuilder.cpp path/pathset.cpp path/transit_hop.cpp + peerstats/peer_db.cpp + peerstats/types.cpp pow.cpp profiling.cpp router/outbound_message_handler.cpp @@ -219,7 +227,11 @@ if(TESTNET) endif() if(WITH_HIVE) - target_sources(liblokinet PRIVATE tooling/router_hive.cpp) + target_sources(liblokinet PRIVATE + tooling/router_hive.cpp + tooling/hive_router.cpp + tooling/hive_context.cpp + ) endif() target_link_libraries(liblokinet PUBLIC cxxopts lokinet-platform lokinet-util lokinet-cryptography) diff --git a/llarp/config/config.cpp b/llarp/config/config.cpp index b718de338..a5b65caeb 100644 --- a/llarp/config/config.cpp +++ b/llarp/config/config.cpp @@ -36,6 +36,7 @@ namespace llarp constexpr int DefaultWorkerThreads = 1; constexpr int DefaultNetThreads = 1; constexpr bool DefaultBlockBogons = true; + constexpr bool DefaultEnablePeerStats = false; conf.defineOption("router", "job-queue-size", false, DefaultJobQueueSize, [this](int arg) { if (arg < 1024) @@ -128,6 +129,21 @@ namespace llarp conf.defineOption( "router", "transport-privkey", false, "", AssignmentAcceptor(m_transportKeyFile)); + + if (not params.isRelay) + { + // TODO: remove this -- all service nodes should run peer db + conf.defineOption( + "router", + "enable-peer-stats", + false, + DefaultEnablePeerStats, + AssignmentAcceptor(m_enablePeerStats)); + } + else + { + m_enablePeerStats = true; + } } void @@ -987,6 +1003,40 @@ namespace llarp "File containing service node's seed.", }); + // extra [network] options + // TODO: probably better to create an [exit] section and only allow it for routers + def.addOptionComments( + "network", + "exit", + { + "Whether or not we should act as an exit node. Beware that this increases demand", + "on the server and may pose liability concerns. Enable at your own risk.", + }); + + // TODO: define the order of precedence (e.g. is whitelist applied before blacklist?) + // additionally, what's default? What if I don't whitelist anything? + def.addOptionComments( + "network", + "exit-whitelist", + { + "List of destination protocol:port pairs to whitelist, example: udp:*", + "or tcp:80. Multiple values supported.", + }); + + def.addOptionComments( + "network", + "exit-blacklist", + { + "Blacklist of destinations (same format as whitelist).", + }); + + def.addOptionComments( + "router", + "enable-peer-stats", + { + "Enable collection of SNode peer stats", + }); + return def.generateINIConfig(true); } diff --git a/llarp/config/config.hpp b/llarp/config/config.hpp index 1a2733a95..5f92571e9 100644 --- a/llarp/config/config.hpp +++ b/llarp/config/config.hpp @@ -24,8 +24,6 @@ #include -struct llarp_config; - namespace llarp { using SectionValues_t = llarp::ConfigParser::SectionValues_t; @@ -64,6 +62,8 @@ namespace llarp std::string m_identityKeyFile; std::string m_transportKeyFile; + bool m_enablePeerStats = false; + void defineConfigOptions(ConfigDefinition& conf, const ConfigGenParameters& params); }; @@ -222,9 +222,6 @@ namespace llarp std::string generateBaseRouterConfig(fs::path defaultDataDir); - - llarp_config* - Copy() const; }; void diff --git a/llarp/context.cpp b/llarp/context.cpp index 7daa00818..f37eee983 100644 --- a/llarp/context.cpp +++ b/llarp/context.cpp @@ -27,31 +27,17 @@ namespace llarp return logic && LogicCall(logic, f); } - bool - Context::Configure(const RuntimeOptions& opts, std::optional dataDir) + void + Context::Configure(Config conf) { - if (config) - throw std::runtime_error("Re-configure not supported"); - - config = std::make_unique(); + if (nullptr != config.get()) + throw std::runtime_error("Config already exists"); - fs::path defaultDataDir = dataDir ? *dataDir : GetDefaultDataDir(); - - if (configfile.size()) - { - if (!config->Load(configfile.c_str(), opts.isRouter, defaultDataDir)) - { - config.release(); - llarp::LogError("failed to load config file ", configfile); - return false; - } - } + config = std::make_unique(std::move(conf)); logic = std::make_shared(); nodedb_dir = fs::path(config->router.m_dataDir / nodedb_dirname).string(); - - return true; } bool @@ -76,6 +62,10 @@ namespace llarp void Context::Setup(const RuntimeOptions& opts) { + /// Call one of the Configure() methods before calling Setup() + if (not config) + throw std::runtime_error("Cannot call Setup() on context without a Config"); + llarp::LogInfo(llarp::VERSION_FULL, " ", llarp::RELEASE_MOTTO); llarp::LogInfo("starting up"); if (mainloop == nullptr) @@ -90,12 +80,12 @@ namespace llarp crypto = std::make_unique(); cryptoManager = std::make_unique(crypto.get()); - router = std::make_unique(mainloop, logic); + router = makeRouter(mainloop, logic); nodedb = std::make_unique( nodedb_dir, [r = router.get()](auto call) { r->QueueDiskIO(std::move(call)); }); - if (!router->Configure(config.get(), opts.isRouter, nodedb.get())) + if (!router->Configure(*config.get(), opts.isRouter, nodedb.get())) throw std::runtime_error("Failed to configure router"); // must be done after router is made so we can use its disk io worker @@ -105,6 +95,14 @@ namespace llarp throw std::runtime_error("Config::Setup() failed to load database"); } + std::unique_ptr + Context::makeRouter( + llarp_ev_loop_ptr netloop, + std::shared_ptr logic) + { + return std::make_unique(netloop, logic); + } + int Context::Run(const RuntimeOptions& opts) { @@ -197,14 +195,6 @@ namespace llarp llarp::LogDebug("free logic"); logic.reset(); } - -#ifdef LOKINET_HIVE - void - Context::InjectHive(tooling::RouterHive* hive) - { - router->hive = hive; - } -#endif } // namespace llarp extern "C" diff --git a/llarp/dht/messages/gotrouter.cpp b/llarp/dht/messages/gotrouter.cpp index fef4666ca..17ea95bfb 100644 --- a/llarp/dht/messages/gotrouter.cpp +++ b/llarp/dht/messages/gotrouter.cpp @@ -127,6 +127,10 @@ namespace llarp auto* router = dht.GetRouter(); router->NotifyRouterEvent(router->pubkey(), rc); router->GossipRCIfNeeded(rc); + + auto peerDb = router->peerDb(); + if (peerDb) + peerDb->handleGossipedRC(rc); } } return true; diff --git a/llarp/iwp/session.cpp b/llarp/iwp/session.cpp index dd395f82b..06ba7dc41 100644 --- a/llarp/iwp/session.cpp +++ b/llarp/iwp/session.cpp @@ -76,7 +76,7 @@ namespace llarp GotLIM = util::memFn(&Session::GotRenegLIM, this); m_RemoteRC = msg->rc; m_Parent->MapAddr(m_RemoteRC.pubkey, this); - return m_Parent->SessionEstablished(this); + return m_Parent->SessionEstablished(this, true); } bool @@ -96,7 +96,7 @@ namespace llarp { self->m_State = State::Ready; self->m_Parent->MapAddr(self->m_RemoteRC.pubkey, self.get()); - self->m_Parent->SessionEstablished(self.get()); + self->m_Parent->SessionEstablished(self.get(), false); } }); return true; @@ -281,6 +281,13 @@ namespace llarp return false; } + SessionStats + Session::GetSessionStats() const + { + // TODO: thread safety + return m_Stats; + } + util::StatusObject Session::ExtractStatus() const { diff --git a/llarp/iwp/session.hpp b/llarp/iwp/session.hpp index 56044ffd8..00ae14510 100644 --- a/llarp/iwp/session.hpp +++ b/llarp/iwp/session.hpp @@ -115,6 +115,9 @@ namespace llarp bool ShouldPing() const override; + SessionStats + GetSessionStats() const override; + util::StatusObject ExtractStatus() const override; @@ -141,20 +144,7 @@ namespace llarp static std::string StateToString(State state); State m_State; - - struct Stats - { - // rate - uint64_t currentRateRX = 0; - uint64_t currentRateTX = 0; - - uint64_t totalPacketsRX = 0; - - uint64_t totalAckedTX = 0; - uint64_t totalDroppedTX = 0; - uint64_t totalInFlightTX = 0; - }; - Stats m_Stats; + SessionStats m_Stats; /// are we inbound session ? const bool m_Inbound; diff --git a/llarp/link/i_link_manager.hpp b/llarp/link/i_link_manager.hpp index 002a59f52..52b16c33a 100644 --- a/llarp/link/i_link_manager.hpp +++ b/llarp/link/i_link_manager.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include @@ -77,6 +78,9 @@ namespace llarp virtual void CheckPersistingSessions(llarp_time_t now) = 0; + virtual void + updatePeerDb(std::shared_ptr peerDb) = 0; + virtual util::StatusObject ExtractStatus() const = 0; }; diff --git a/llarp/link/link_manager.cpp b/llarp/link/link_manager.cpp index 9caf940c1..447b7d9d7 100644 --- a/llarp/link/link_manager.cpp +++ b/llarp/link/link_manager.cpp @@ -327,6 +327,55 @@ namespace llarp } } + void + LinkManager::updatePeerDb(std::shared_ptr peerDb) + { + std::vector> statsToUpdate; + + int64_t diffTotalTX = 0; + + ForEachPeer([&](ILinkSession* session) { + // derive RouterID + RouterID id = RouterID(session->GetRemoteRC().pubkey); + + SessionStats sessionStats = session->GetSessionStats(); + SessionStats diff; + SessionStats& lastStats = m_lastRouterStats[id]; + + // TODO: operator overloads / member func for diff + diff.currentRateRX = std::max(sessionStats.currentRateRX, lastStats.currentRateRX); + diff.currentRateTX = std::max(sessionStats.currentRateTX, lastStats.currentRateTX); + diff.totalPacketsRX = sessionStats.totalPacketsRX - lastStats.totalPacketsRX; + diff.totalAckedTX = sessionStats.totalAckedTX - lastStats.totalAckedTX; + diff.totalDroppedTX = sessionStats.totalDroppedTX - lastStats.totalDroppedTX; + + diffTotalTX = diff.totalAckedTX + diff.totalDroppedTX + diff.totalInFlightTX; + + lastStats = sessionStats; + + // TODO: if we have both inbound and outbound session, this will overwrite + statsToUpdate.push_back({id, diff}); + }); + + for (auto& routerStats : statsToUpdate) + { + peerDb->modifyPeerStats(routerStats.first, [&](PeerStats& stats) { + // TODO: store separate stats for up vs down + const auto& diff = routerStats.second; + + // note that 'currentRateRX' and 'currentRateTX' are per-second + stats.peakBandwidthBytesPerSec = std::max( + stats.peakBandwidthBytesPerSec, + (double)std::max(diff.currentRateRX, diff.currentRateTX)); + stats.numPacketsDropped += diff.totalDroppedTX; + stats.numPacketsSent = diff.totalAckedTX; + stats.numPacketsAttempted = diffTotalTX; + + // TODO: others -- we have slight mismatch on what we store + }); + } + } + util::StatusObject LinkManager::ExtractStatus() const { diff --git a/llarp/link/link_manager.hpp b/llarp/link/link_manager.hpp index ae9bfabf7..ddd546854 100644 --- a/llarp/link/link_manager.hpp +++ b/llarp/link/link_manager.hpp @@ -74,6 +74,9 @@ namespace llarp void CheckPersistingSessions(llarp_time_t now) override; + void + updatePeerDb(std::shared_ptr peerDb) override; + util::StatusObject ExtractStatus() const override; @@ -96,6 +99,8 @@ namespace llarp std::unordered_map m_PersistingSessions GUARDED_BY(_mutex); + std::unordered_map m_lastRouterStats; + IOutboundSessionMaker* _sessionMaker; }; diff --git a/llarp/link/server.hpp b/llarp/link/server.hpp index 8c9005e5c..55df4fa6f 100644 --- a/llarp/link/server.hpp +++ b/llarp/link/server.hpp @@ -17,34 +17,53 @@ namespace llarp { - /// handle a link layer message + /// handle a link layer message. this allows for the message to be handled by "upper layers" + /// + /// currently called from iwp::Session when messages are sent or received. using LinkMessageHandler = std::function; - /// sign a buffer with identity key + /// sign a buffer with identity key. this function should take the given `llarp_buffer_t` and + /// sign it, prividing the signature in the out variable `Signature&`. + /// + /// currently called from iwp::Session for signing LIMs (link introduction messages) using SignBufferFunc = std::function; /// handle connection timeout + /// + /// currently called from ILinkLayer::Pump() when an unestablished session times out using TimeoutHandler = std::function; /// get our RC + /// + /// currently called by iwp::Session to include as part of a LIM (link introduction message) using GetRCFunc = std::function; /// handler of session established /// return false to reject /// return true to accept - using SessionEstablishedHandler = std::function; + /// + /// currently called in iwp::Session when a valid LIM is received. + using SessionEstablishedHandler = std::function; /// f(new, old) /// handler of session renegotiation /// returns true if the new rc is valid /// returns false otherwise and the session is terminated + /// + /// currently called from iwp::Session when we receive a renegotiation LIM using SessionRenegotiateHandler = std::function; /// handles close of all sessions with pubkey + /// + /// Note that this handler is called while m_AuthedLinksMutex is held + /// + /// currently called from iwp::ILinkSession when a previously established session times out using SessionClosedHandler = std::function; /// notifies router that a link session has ended its pump and we should flush /// messages to upper layers + /// + /// currently called at the end of every iwp::Session::Pump() call using PumpDoneHandler = std::function; using Work_t = std::function; diff --git a/llarp/link/session.hpp b/llarp/link/session.hpp index 199b14996..efb6f1dd5 100644 --- a/llarp/link/session.hpp +++ b/llarp/link/session.hpp @@ -15,6 +15,19 @@ namespace llarp struct ILinkMessage; struct ILinkLayer; + struct SessionStats + { + // rate + uint64_t currentRateRX = 0; + uint64_t currentRateTX = 0; + + uint64_t totalPacketsRX = 0; + + uint64_t totalAckedTX = 0; + uint64_t totalDroppedTX = 0; + uint64_t totalInFlightTX = 0; + }; + struct ILinkSession { virtual ~ILinkSession() = default; @@ -108,6 +121,10 @@ namespace llarp virtual bool ShouldPing() const = 0; + /// return the current stats for this session + virtual SessionStats + GetSessionStats() const = 0; + virtual util::StatusObject ExtractStatus() const = 0; }; diff --git a/llarp/peerstats/orm.hpp b/llarp/peerstats/orm.hpp new file mode 100644 index 000000000..53670cd27 --- /dev/null +++ b/llarp/peerstats/orm.hpp @@ -0,0 +1,137 @@ +#pragma once + +#include + +#include + +/// Contains some code to help deal with sqlite_orm in hopes of keeping other headers clean + +namespace llarp +{ + inline auto + initStorage(const std::string& file) + { + using namespace sqlite_orm; + return make_storage( + file, + make_table( + "peerstats", + make_column("routerId", &PeerStats::routerId, primary_key(), unique()), + make_column("numConnectionAttempts", &PeerStats::numConnectionAttempts), + make_column("numConnectionSuccesses", &PeerStats::numConnectionSuccesses), + make_column("numConnectionRejections", &PeerStats::numConnectionRejections), + make_column("numConnectionTimeouts", &PeerStats::numConnectionTimeouts), + make_column("numPathBuilds", &PeerStats::numPathBuilds), + make_column("numPacketsAttempted", &PeerStats::numPacketsAttempted), + make_column("numPacketsSent", &PeerStats::numPacketsSent), + make_column("numPacketsDropped", &PeerStats::numPacketsDropped), + make_column("numPacketsResent", &PeerStats::numPacketsResent), + make_column("numDistinctRCsReceived", &PeerStats::numDistinctRCsReceived), + make_column("numLateRCs", &PeerStats::numLateRCs), + make_column("peakBandwidthBytesPerSec", &PeerStats::peakBandwidthBytesPerSec), + make_column("longestRCReceiveInterval", &PeerStats::longestRCReceiveInterval), + make_column("leastRCRemainingLifetime", &PeerStats::leastRCRemainingLifetime))); + } + + using PeerDbStorage = decltype(initStorage("")); + +} // namespace llarp + +/// "custom" types for sqlite_orm +/// reference: https://github.com/fnc12/sqlite_orm/blob/master/examples/enum_binding.cpp +namespace sqlite_orm +{ + /// llarp_time_t serialization + template <> + struct type_printer : public integer_printer + { + }; + + template <> + struct statement_binder + { + int + bind(sqlite3_stmt* stmt, int index, const llarp_time_t& value) + { + return statement_binder().bind(stmt, index, value.count()); + } + }; + + template <> + struct field_printer + { + std::string + operator()(const llarp_time_t& value) const + { + std::stringstream stream; + stream << value.count(); + return stream.str(); + } + }; + + template <> + struct row_extractor + { + llarp_time_t + extract(const char* row_value) + { + int64_t raw = static_cast(atoi(row_value)); + return llarp_time_t(raw); + } + + llarp_time_t + extract(sqlite3_stmt* stmt, int columnIndex) + { + auto str = sqlite3_column_text(stmt, columnIndex); + return this->extract((const char*)str); + } + }; + + /// RouterID serialization + template <> + struct type_printer : public text_printer + { + }; + + template <> + struct statement_binder + { + int + bind(sqlite3_stmt* stmt, int index, const llarp::RouterID& value) + { + return statement_binder().bind(stmt, index, value.ToString()); + } + }; + + template <> + struct field_printer + { + std::string + operator()(const llarp::RouterID& value) const + { + return value.ToString(); + } + }; + + template <> + struct row_extractor + { + llarp::RouterID + extract(const char* row_value) + { + llarp::RouterID id; + if (not id.FromString(row_value)) + throw std::runtime_error("Invalid RouterID in sqlite3 database"); + + return id; + } + + llarp::RouterID + extract(sqlite3_stmt* stmt, int columnIndex) + { + auto str = sqlite3_column_text(stmt, columnIndex); + return this->extract((const char*)str); + } + }; + +} // namespace sqlite_orm diff --git a/llarp/peerstats/peer_db.cpp b/llarp/peerstats/peer_db.cpp new file mode 100644 index 000000000..3ca8d4328 --- /dev/null +++ b/llarp/peerstats/peer_db.cpp @@ -0,0 +1,303 @@ +#include + +#include +#include +#include + +namespace llarp +{ + PeerDb::PeerDb() + { + m_lastFlush.store({}); + } + + void + PeerDb::loadDatabase(std::optional file) + { + std::lock_guard guard(m_statsLock); + + if (m_storage) + throw std::runtime_error("Reloading database not supported"); // TODO + + m_peerStats.clear(); + + // sqlite_orm treats empty-string as an indicator to load a memory-backed database, which we'll + // use if file is an empty-optional + std::string fileString; + if (file.has_value()) + { + fileString = file->string(); + LogInfo("Loading PeerDb from file ", fileString); + } + else + { + LogInfo("Loading memory-backed PeerDb"); + } + + m_storage = std::make_unique(initStorage(fileString)); + m_storage->sync_schema(true); // true for "preserve" as in "don't nuke" (how cute!) + + auto allStats = m_storage->get_all(); + LogInfo("Loading ", allStats.size(), " PeerStats from table peerstats..."); + for (PeerStats& stats : allStats) + { + // we cleared m_peerStats, and the database should enforce that routerId is unique... + assert(m_peerStats.find(stats.routerId) == m_peerStats.end()); + + stats.stale = false; + m_peerStats[stats.routerId] = stats; + } + } + + void + PeerDb::flushDatabase() + { + LogDebug("flushing PeerDb..."); + + auto start = time_now_ms(); + if (not shouldFlush(start)) + { + LogWarn("Call to flushDatabase() while already in progress, ignoring"); + return; + } + + if (not m_storage) + throw std::runtime_error("Cannot flush database before it has been loaded"); + + std::vector staleStats; + + { + std::lock_guard guard(m_statsLock); + + // copy all stale entries + for (auto& entry : m_peerStats) + { + if (entry.second.stale) + { + staleStats.push_back(entry.second); + entry.second.stale = false; + } + } + } + + LogInfo("Updating ", staleStats.size(), " stats"); + + { + auto guard = m_storage->transaction_guard(); + + for (const auto& stats : staleStats) + { + m_storage->replace(stats); + } + + guard.commit(); + } + + auto end = time_now_ms(); + + auto elapsed = end - start; + LogInfo("PeerDb flush took about ", elapsed, " seconds"); + + m_lastFlush.store(end); + } + + void + PeerDb::accumulatePeerStats(const RouterID& routerId, const PeerStats& delta) + { + if (routerId != delta.routerId) + throw std::invalid_argument( + stringify("routerId ", routerId, " doesn't match ", delta.routerId)); + + std::lock_guard guard(m_statsLock); + auto itr = m_peerStats.find(routerId); + if (itr == m_peerStats.end()) + itr = m_peerStats.insert({routerId, delta}).first; + else + itr->second += delta; + + itr->second.stale = true; + } + + void + PeerDb::modifyPeerStats(const RouterID& routerId, std::function callback) + { + std::lock_guard guard(m_statsLock); + + PeerStats& stats = m_peerStats[routerId]; + stats.routerId = routerId; + stats.stale = true; + callback(stats); + } + + std::optional + PeerDb::getCurrentPeerStats(const RouterID& routerId) const + { + std::lock_guard guard(m_statsLock); + auto itr = m_peerStats.find(routerId); + if (itr == m_peerStats.end()) + return std::nullopt; + else + return itr->second; + } + + std::vector + PeerDb::listAllPeerStats() const + { + std::lock_guard guard(m_statsLock); + + std::vector statsList; + statsList.reserve(m_peerStats.size()); + + for (const auto& [routerId, stats] : m_peerStats) + { + statsList.push_back(stats); + } + + return statsList; + } + + std::vector + PeerDb::listPeerStats(const std::vector& ids) const + { + std::lock_guard guard(m_statsLock); + + std::vector statsList; + statsList.reserve(ids.size()); + + for (const auto& id : ids) + { + const auto itr = m_peerStats.find(id); + if (itr != m_peerStats.end()) + statsList.push_back(itr->second); + } + + return statsList; + } + + /// Assume we receive an RC at some point `R` in time which was signed at some point `S` in time + /// and expires at some point `E` in time, as depicted below: + /// + /// +-----------------------------+ + /// | signed rc | <- useful lifetime of RC + /// +-----------------------------+ + /// ^ [ . . . . . . . . ] <----------- window in which we receive this RC gossiped to us + /// | ^ ^ + /// | | | + /// S R E + /// + /// One useful metric from this is the difference between (E - R), the useful contact time of this + /// RC. As we track this metric over time, the high and low watermarks serve to tell us how + /// quickly we receive signed RCs from a given router and how close to expiration they are when + /// we receive them. The latter is particularly useful, and should always be a positive number for + /// a healthy router. A negative number indicates that we are receiving an expired RC. + /// + /// TODO: we actually discard expired RCs, so we currently would not detect a negative value for + /// (E - R) + /// + /// Another related metric is the distance between a newly received RC and the previous RC's + /// expiration, which represents how close we came to having no useful RC to work with. This + /// should be a high (positive) number for a healthy router, and if negative indicates that we + /// had no way to contact this router for a period of time. + /// + /// E1 E2 E3 + /// | | | + /// v | | + /// +-----------------------------+ | | + /// | signed rc 1 | | | + /// +-----------------------------+ | | + /// [ . . . . . ] v | + /// ^ +-----------------------------+ | + /// | | signed rc 2 | | + /// | +-----------------------------+ | + /// | [ . . . . . . . . . . ] v + /// | ^ +-----------------------------+ + /// | | | signed rc 3 | + /// | | +-----------------------------+ + /// | | [ . . ] + /// | | ^ + /// | | | + /// R1 R2 R3 + /// + /// Example: the delta between (E1 - R2) is healthy, but the delta between (E2 - R3) is indicates + /// that we had a brief period of time where we had no valid (non-expired) RC for this router + /// (because it is negative). + void + PeerDb::handleGossipedRC(const RouterContact& rc, llarp_time_t now) + { + std::lock_guard guard(m_statsLock); + + RouterID id(rc.pubkey); + auto& stats = m_peerStats[id]; + stats.routerId = id; + + const bool isNewRC = (stats.lastRCUpdated < rc.last_updated); + + if (isNewRC) + { + stats.numDistinctRCsReceived++; + + if (stats.numDistinctRCsReceived > 1) + { + auto prevRCExpiration = (stats.lastRCUpdated + RouterContact::Lifetime); + + // we track max expiry as the delta between (last expiration time - time received), + // and this value will be negative for an unhealthy router + // TODO: handle case where new RC is also expired? just ignore? + auto expiry = prevRCExpiration - now; + + if (stats.numDistinctRCsReceived == 2) + stats.leastRCRemainingLifetime = expiry; + else + stats.leastRCRemainingLifetime = std::min(stats.leastRCRemainingLifetime, expiry); + } + + stats.lastRCUpdated = rc.last_updated; + stats.stale = true; + } + } + + void + PeerDb::configure(const RouterConfig& routerConfig) + { + if (not routerConfig.m_enablePeerStats) + throw std::runtime_error("[router]:enable-peer-stats is not enabled"); + + fs::path dbPath = routerConfig.m_dataDir / "peerstats.sqlite"; + + loadDatabase(dbPath); + } + + bool + PeerDb::shouldFlush(llarp_time_t now) + { + constexpr llarp_time_t TargetFlushInterval = 30s; + return (now - m_lastFlush.load() >= TargetFlushInterval); + } + + util::StatusObject + PeerDb::ExtractStatus() const + { + std::lock_guard guard(m_statsLock); + + bool loaded = (m_storage.get() != nullptr); + util::StatusObject dbFile = nullptr; + if (loaded) + dbFile = m_storage->filename(); + + std::vector statsObjs; + statsObjs.reserve(m_peerStats.size()); + for (const auto& pair : m_peerStats) + { + statsObjs.push_back(pair.second.toJson()); + } + + util::StatusObject obj{ + {"dbLoaded", loaded}, + {"dbFile", dbFile}, + {"lastFlushMs", m_lastFlush.load().count()}, + {"stats", statsObjs}, + }; + return obj; + } + +}; // namespace llarp diff --git a/llarp/peerstats/peer_db.hpp b/llarp/peerstats/peer_db.hpp new file mode 100644 index 000000000..d5d4a1a05 --- /dev/null +++ b/llarp/peerstats/peer_db.hpp @@ -0,0 +1,138 @@ +#pragma once + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + +namespace llarp +{ + /// Maintains a database of stats collected about the connections with our Service Node peers. + /// This uses a sqlite3 database behind the scenes as persistance, but this database is + /// periodically flushed to, meaning that it will become stale as PeerDb accumulates stats without + /// a flush. + struct PeerDb + { + /// Constructor + PeerDb(); + + /// Loads the database from disk using the provided filepath. If the file is equal to + /// `std::nullopt`, the database will be loaded into memory (useful for testing). + /// + /// This must be called prior to calling flushDatabase(), and will truncate any existing data. + /// + /// This is a blocking call, both in the sense that it blocks on disk/database I/O and that it + /// will sit on a mutex while the database is loaded. + /// + /// @param file is an optional file which doesn't have to exist but must be writable, if a value + /// is provided. If no value is provided, the database will be memory-backed. + /// @throws if sqlite_orm/sqlite3 is unable to open or create a database at the given file + void + loadDatabase(std::optional file); + + /// Flushes the database. Must be called after loadDatabase(). This call will block during I/O + /// and should be called in an appropriate threading context. However, it will make a temporary + /// copy of the peer stats so as to avoid sitting on a mutex lock during disk I/O. + /// + /// @throws if the database could not be written to (esp. if loadDatabase() has not been called) + void + flushDatabase(); + + /// Add the given stats to the cummulative stats for the given peer. For cummulative stats, the + /// stats are added together; for watermark stats, the max is kept. + /// + /// This is intended to be used in the following pattern: + /// + /// 1) Initialize an empty PeerStats + /// 2) Collect relevant stats + /// 3) Call accumulatePeerStats() with the stats + /// 4) Reset the stats to 0 + /// 5) + /// + /// @param routerId is the id of the router whose stats should be modified. + /// @param delta is the stats to add to the existing stats + void + accumulatePeerStats(const RouterID& routerId, const PeerStats& delta); + + /// Allows write-access to the stats for a given peer while appropriate mutex lock is held. This + /// is an alternative means of incrementing peer stats that is suitable for one-off + /// modifications. + /// + /// Note that this holds m_statsLock during the callback invocation, so the callback should + /// return as quickly as possible. + /// + /// @param routerId is the id of the router whose stats should be modified. + /// @param callback is a function which will be called immediately with mutex held + void + modifyPeerStats(const RouterID& routerId, std::function callback); + + /// Provides a snapshot of the most recent PeerStats we have for the given peer. If we don't + /// have any stats for the peer, std::nullopt + /// + /// @param routerId is the RouterID of the requested peer + /// @return a copy of the most recent peer stats or an empty one if no such peer is known + std::optional + getCurrentPeerStats(const RouterID& routerId) const; + + /// Lists all peer stats. This essentially dumps the database into a list of PeerStats objects. + /// + /// Note that this avoids disk I/O by copying from our cached map of peers. + /// + /// @return a list of all PeerStats we have maintained + std::vector + listAllPeerStats() const; + + /// Lists specific peer stats. + /// + /// @param peers is list of RouterIDs which are desired + /// @return a list of the requested peers. Peers not found will be omitted. + std::vector + listPeerStats(const std::vector& ids) const; + + /// Handles a new gossiped RC, updating stats as needed. The database tracks the last + /// advertised update time, so it knows whether this is a new RC or not. + /// + /// The given RC is assumed to be valid. + /// + /// @param rc is the RouterContact to handle + /// @param now is an optional time representing the current time + void + handleGossipedRC(const RouterContact& rc, llarp_time_t now = time_now_ms()); + + /// Configures the PeerDb based on RouterConfig + /// + /// @param routerConfig + void + configure(const RouterConfig& routerConfig); + + /// Returns whether or not we should flush, as determined by the last time we flushed and the + /// configured flush interval. + /// + /// @param now is the current[-ish] time + bool + shouldFlush(llarp_time_t now); + + /// Get JSON status for API + /// + /// @return JSON object representing our current status + util::StatusObject + ExtractStatus() const; + + private: + std::unordered_map m_peerStats; + mutable std::mutex m_statsLock; + + std::unique_ptr m_storage; + + std::atomic m_lastFlush; + }; + +} // namespace llarp diff --git a/llarp/peerstats/types.cpp b/llarp/peerstats/types.cpp new file mode 100644 index 000000000..af78d70c0 --- /dev/null +++ b/llarp/peerstats/types.cpp @@ -0,0 +1,159 @@ +#include + +#include + +#include + +namespace llarp +{ + + constexpr auto RouterIdKey = "routerId"; + constexpr auto NumConnectionAttemptsKey = "numConnectionAttempts"; + constexpr auto NumConnectionSuccessesKey = "numConnectionSuccesses"; + constexpr auto NumConnectionRejectionsKey = "numConnectionRejections"; + constexpr auto NumConnectionTimeoutsKey = "numConnectionTimeouts"; + constexpr auto NumPathBuildsKey = "numPathBuilds"; + constexpr auto NumPacketsAttemptedKey = "numPacketsAttempted"; + constexpr auto NumPacketsSentKey = "numPacketsSent"; + constexpr auto NumPacketsDroppedKey = "numPacketsDropped"; + constexpr auto NumPacketsResentKey = "numPacketsResent"; + constexpr auto NumDistinctRCsReceivedKey = "numDistinctRCsReceived"; + constexpr auto NumLateRCsKey = "numLateRCs"; + constexpr auto PeakBandwidthBytesPerSecKey = "peakBandwidthBytesPerSec"; + constexpr auto LongestRCReceiveIntervalKey = "longestRCReceiveInterval"; + constexpr auto LeastRCRemainingLifetimeKey = "leastRCRemainingLifetime"; + constexpr auto LastRCUpdatedKey = "lastRCUpdated"; + + PeerStats::PeerStats() = default; + + PeerStats::PeerStats(const RouterID& routerId_) : routerId(routerId_) + { + } + + PeerStats& + PeerStats::operator+=(const PeerStats& other) + { + numConnectionAttempts += other.numConnectionAttempts; + numConnectionSuccesses += other.numConnectionSuccesses; + numConnectionRejections += other.numConnectionRejections; + numConnectionTimeouts += other.numConnectionTimeouts; + + numPathBuilds += other.numPathBuilds; + numPacketsAttempted += other.numPacketsAttempted; + numPacketsSent += other.numPacketsSent; + numPacketsDropped += other.numPacketsDropped; + numPacketsResent += other.numPacketsResent; + + numDistinctRCsReceived += other.numDistinctRCsReceived; + numLateRCs += other.numLateRCs; + + peakBandwidthBytesPerSec = std::max(peakBandwidthBytesPerSec, other.peakBandwidthBytesPerSec); + longestRCReceiveInterval = std::max(longestRCReceiveInterval, other.longestRCReceiveInterval); + leastRCRemainingLifetime = std::max(leastRCRemainingLifetime, other.leastRCRemainingLifetime); + lastRCUpdated = std::max(lastRCUpdated, other.lastRCUpdated); + + return *this; + } + + bool + PeerStats::operator==(const PeerStats& other) const + { + return routerId == other.routerId and numConnectionAttempts == other.numConnectionAttempts + and numConnectionSuccesses == other.numConnectionSuccesses + and numConnectionRejections == other.numConnectionRejections + and numConnectionTimeouts == other.numConnectionTimeouts + + and numPathBuilds == other.numPathBuilds + and numPacketsAttempted == other.numPacketsAttempted + and numPacketsSent == other.numPacketsSent and numPacketsDropped == other.numPacketsDropped + and numPacketsResent == other.numPacketsResent + + and numDistinctRCsReceived == other.numDistinctRCsReceived + and numLateRCs == other.numLateRCs + + and peakBandwidthBytesPerSec == other.peakBandwidthBytesPerSec + and longestRCReceiveInterval == other.longestRCReceiveInterval + and leastRCRemainingLifetime == other.leastRCRemainingLifetime + and lastRCUpdated == other.lastRCUpdated; + } + + util::StatusObject + PeerStats::toJson() const + { + return { + {RouterIdKey, routerId.ToString()}, + {NumConnectionAttemptsKey, numConnectionAttempts}, + {NumConnectionSuccessesKey, numConnectionSuccesses}, + {NumConnectionRejectionsKey, numConnectionRejections}, + {NumConnectionTimeoutsKey, numConnectionTimeouts}, + {NumPathBuildsKey, numPathBuilds}, + {NumPacketsAttemptedKey, numPacketsAttempted}, + {NumPacketsSentKey, numPacketsSent}, + {NumPacketsDroppedKey, numPacketsDropped}, + {NumPacketsResentKey, numPacketsResent}, + {NumDistinctRCsReceivedKey, numDistinctRCsReceived}, + {NumLateRCsKey, numLateRCs}, + {PeakBandwidthBytesPerSecKey, peakBandwidthBytesPerSec}, + {LongestRCReceiveIntervalKey, longestRCReceiveInterval.count()}, + {LeastRCRemainingLifetimeKey, leastRCRemainingLifetime.count()}, + {LastRCUpdatedKey, lastRCUpdated.count()}, + }; + } + + void + PeerStats::BEncode(llarp_buffer_t* buf) const + { + if (not buf) + throw std::runtime_error("PeerStats: Can't use null buf"); + + auto encodeUint64Entry = [&](std::string_view key, uint64_t value) { + if (not bencode_write_uint64_entry(buf, key.data(), key.size(), value)) + throw std::runtime_error(stringify("PeerStats: Could not encode ", key)); + }; + + if (not bencode_start_dict(buf)) + throw std::runtime_error("PeerStats: Could not create bencode dict"); + + // TODO: we don't have bencode support for dict entries other than uint64...? + + // encodeUint64Entry(RouterIdKey, routerId); + encodeUint64Entry(NumConnectionAttemptsKey, numConnectionAttempts); + encodeUint64Entry(NumConnectionSuccessesKey, numConnectionSuccesses); + encodeUint64Entry(NumConnectionRejectionsKey, numConnectionRejections); + encodeUint64Entry(NumConnectionTimeoutsKey, numConnectionTimeouts); + encodeUint64Entry(NumPathBuildsKey, numPathBuilds); + encodeUint64Entry(NumPacketsAttemptedKey, numPacketsAttempted); + encodeUint64Entry(NumPacketsSentKey, numPacketsSent); + encodeUint64Entry(NumPacketsDroppedKey, numPacketsDropped); + encodeUint64Entry(NumPacketsResentKey, numPacketsResent); + encodeUint64Entry(NumDistinctRCsReceivedKey, numDistinctRCsReceived); + encodeUint64Entry(NumLateRCsKey, numLateRCs); + encodeUint64Entry(PeakBandwidthBytesPerSecKey, (uint64_t)peakBandwidthBytesPerSec); + encodeUint64Entry(LongestRCReceiveIntervalKey, longestRCReceiveInterval.count()); + encodeUint64Entry(LeastRCRemainingLifetimeKey, leastRCRemainingLifetime.count()); + encodeUint64Entry(LastRCUpdatedKey, lastRCUpdated.count()); + + if (not bencode_end(buf)) + throw std::runtime_error("PeerStats: Could not end bencode dict"); + + } + + void + PeerStats::BEncodeList(const std::vector& statsList, llarp_buffer_t* buf) + { + if (not buf) + throw std::runtime_error("PeerStats: Can't use null buf"); + + if (not bencode_start_list(buf)) + throw std::runtime_error("PeerStats: Could not create bencode dict"); + + for (const auto& stats : statsList) + { + stats.BEncode(buf); + } + + if (not bencode_end(buf)) + throw std::runtime_error("PeerStats: Could not end bencode dict"); + } + +}; // namespace llarp diff --git a/llarp/peerstats/types.hpp b/llarp/peerstats/types.hpp new file mode 100644 index 000000000..78684a61a --- /dev/null +++ b/llarp/peerstats/types.hpp @@ -0,0 +1,59 @@ +#pragma once + +#include +#include + +#include +#include +#include + +/// Types stored in our peerstats database are declared here + +namespace llarp +{ + // Struct containing stats we know about a peer + struct PeerStats + { + RouterID routerId; + + int32_t numConnectionAttempts = 0; + int32_t numConnectionSuccesses = 0; + int32_t numConnectionRejections = 0; + int32_t numConnectionTimeouts = 0; + + int32_t numPathBuilds = 0; + int64_t numPacketsAttempted = 0; + int64_t numPacketsSent = 0; + int64_t numPacketsDropped = 0; + int64_t numPacketsResent = 0; + + int32_t numDistinctRCsReceived = 0; + int32_t numLateRCs = 0; + + double peakBandwidthBytesPerSec = 0; + llarp_time_t longestRCReceiveInterval = 0ms; + llarp_time_t leastRCRemainingLifetime = 0ms; + llarp_time_t lastRCUpdated = 0ms; + + // not serialized + bool stale = true; + + PeerStats(); + PeerStats(const RouterID& routerId); + + PeerStats& + operator+=(const PeerStats& other); + bool + operator==(const PeerStats& other) const; + + util::StatusObject + toJson() const; + + void + BEncode(llarp_buffer_t* buf) const; + + static void + BEncodeList(const std::vector& statsList, llarp_buffer_t* buf); + }; + +} // namespace llarp diff --git a/llarp/router/abstractrouter.hpp b/llarp/router/abstractrouter.hpp index 90ae1e952..c7a581c31 100644 --- a/llarp/router/abstractrouter.hpp +++ b/llarp/router/abstractrouter.hpp @@ -11,9 +11,10 @@ #include #include #include +#include #ifdef LOKINET_HIVE -#include "tooling/router_hive.hpp" +#include "tooling/router_event.hpp" #endif struct llarp_buffer_t; @@ -151,11 +152,14 @@ namespace llarp virtual I_RCLookupHandler& rcLookupHandler() = 0; + virtual std::shared_ptr + peerDb() = 0; + virtual bool Sign(Signature& sig, const llarp_buffer_t& buf) const = 0; virtual bool - Configure(Config* conf, bool isRouter, llarp_nodedb* nodedb) = 0; + Configure(const Config& conf, bool isRouter, llarp_nodedb* nodedb) = 0; virtual bool IsServiceNode() const = 0; @@ -193,19 +197,10 @@ namespace llarp /// connect to N random routers virtual void ConnectToRandomRouters(int N) = 0; - /// inject configuration and reconfigure router - virtual bool - Reconfigure(Config* conf) = 0; virtual bool TryConnectAsync(RouterContact rc, uint16_t tries) = 0; - /// validate new configuration against old one - /// return true on 100% valid - /// return false if not 100% valid - virtual bool - ValidateConfig(Config* conf) const = 0; - /// called by link when a remote session has no more sessions open virtual void SessionClosed(RouterID remote) = 0; @@ -288,14 +283,23 @@ namespace llarp virtual void GossipRCIfNeeded(const RouterContact rc) = 0; + /// Templated convenience function to generate a RouterHive event and + /// delegate to non-templated (and overridable) function for handling. template void NotifyRouterEvent([[maybe_unused]] Params&&... args) const { -#ifdef LOKINET_HIVE - hive->NotifyEvent(std::make_unique(std::forward(args)...)); -#endif + // TODO: no-op when appropriate + auto event = std::make_unique(args...); + HandleRouterEvent(std::move(event)); } + + protected: + /// Virtual function to handle RouterEvent. HiveRouter overrides this in + /// order to inject the event. The default implementation in Router simply + /// logs it. + virtual void + HandleRouterEvent(tooling::RouterEventPtr event) const = 0; }; } // namespace llarp diff --git a/llarp/router/outbound_session_maker.cpp b/llarp/router/outbound_session_maker.cpp index f7e50185d..c03dc34f7 100644 --- a/llarp/router/outbound_session_maker.cpp +++ b/llarp/router/outbound_session_maker.cpp @@ -1,5 +1,7 @@ #include +#include +#include #include #include #include @@ -152,6 +154,7 @@ namespace llarp void OutboundSessionMaker::Init( + AbstractRouter* router, ILinkManager* linkManager, I_RCLookupHandler* rcLookup, Profiling* profiler, @@ -159,6 +162,7 @@ namespace llarp llarp_nodedb* nodedb, WorkerFunc_t dowork) { + _router = router; _linkManager = linkManager; _rcLookup = rcLookup; _logic = logic; @@ -298,8 +302,18 @@ namespace llarp void OutboundSessionMaker::CreatePendingSession(const RouterID& router) { - util::Lock l(_mutex); - pendingSessions.emplace(router, nullptr); + { + util::Lock l(_mutex); + pendingSessions.emplace(router, nullptr); + } + + auto peerDb = _router->peerDb(); + if (peerDb) + { + peerDb->modifyPeerStats(router, [](PeerStats& stats) { stats.numConnectionAttempts++; }); + } + + _router->NotifyRouterEvent(_router->pubkey(), router); } void diff --git a/llarp/router/outbound_session_maker.hpp b/llarp/router/outbound_session_maker.hpp index 63069f65e..063594cbd 100644 --- a/llarp/router/outbound_session_maker.hpp +++ b/llarp/router/outbound_session_maker.hpp @@ -58,6 +58,7 @@ namespace llarp void Init( + AbstractRouter* router, ILinkManager* linkManager, I_RCLookupHandler* rcLookup, Profiling* profiler, @@ -110,6 +111,7 @@ namespace llarp std::unordered_map pendingCallbacks GUARDED_BY(_mutex); + AbstractRouter* _router = nullptr; ILinkManager* _linkManager = nullptr; I_RCLookupHandler* _rcLookup = nullptr; Profiling* _profiler = nullptr; diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 7b8f0d336..7bbc82ecf 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -21,8 +21,10 @@ #include #include #include +#include #include "tooling/router_event.hpp" +#include "util/status.hpp" #include #include @@ -80,13 +82,18 @@ namespace llarp { if (_running) { + util::StatusObject peerStatsObj = nullptr; + if (m_peerDb) + peerStatsObj = m_peerDb->ExtractStatus(); + return util::StatusObject{{"running", true}, {"numNodesKnown", _nodedb->num_loaded()}, {"dht", _dht->impl->ExtractStatus()}, {"services", _hiddenServiceContext.ExtractStatus()}, {"exit", _exitContext.ExtractStatus()}, {"links", _linkManager.ExtractStatus()}, - {"outboundMessages", _outboundMessageHandler.ExtractStatus()}}; + {"outboundMessages", _outboundMessageHandler.ExtractStatus()}, + {"peerStats", peerStatsObj}}; } else { @@ -117,6 +124,9 @@ namespace llarp void Router::GossipRCIfNeeded(const RouterContact rc) { + if (disableGossipingRC_TestingOnly()) + return; + /// if we are not a service node forget about gossip if (not IsServiceNode()) return; @@ -212,7 +222,31 @@ namespace llarp return false; #endif #endif - _identity = RpcClient()->ObtainIdentityKey(); + constexpr int maxTries = 5; + int numTries = 0; + while (numTries < maxTries) + { + numTries++; + try + { + _identity = RpcClient()->ObtainIdentityKey(); + LogWarn("Obtained lokid identity keys"); + break; + } + catch (const std::exception& e) + { + LogWarn( + "Failed attempt ", + numTries, + " of ", + maxTries, + " to get lokid identity keys because: ", + e.what()); + + if (numTries == maxTries) + throw; + } + } } else { @@ -228,16 +262,16 @@ namespace llarp } bool - Router::Configure(Config* conf, bool isRouter, llarp_nodedb* nodedb) + Router::Configure(const Config& conf, bool isRouter, llarp_nodedb* nodedb) { - // we need this first so we can start lmq to fetch keys - if (conf) - { - enableRPCServer = conf->api.m_enableRPCServer; - rpcBindAddr = lokimq::address(conf->api.m_rpcBindAddr); - whitelistRouters = conf->lokid.whitelistRouters; - lokidRPCAddr = lokimq::address(conf->lokid.lokidRPCAddr); - } + whitelistRouters = conf.lokid.whitelistRouters; + if (whitelistRouters) + lokidRPCAddr = lokimq::address(conf.lokid.lokidRPCAddr); + + enableRPCServer = conf.api.m_enableRPCServer; + if (enableRPCServer) + rpcBindAddr = lokimq::address(conf.api.m_rpcBindAddr); + if (not StartRpcServer()) throw std::runtime_error("Failed to start rpc server"); @@ -251,13 +285,11 @@ namespace llarp } // fetch keys - if (conf) - { - if (not m_keyManager->initialize(*conf, true, isRouter)) - throw std::runtime_error("KeyManager failed to initialize"); - if (!FromConfig(conf)) - throw std::runtime_error("FromConfig() failed"); - } + if (not m_keyManager->initialize(conf, true, isRouter)) + throw std::runtime_error("KeyManager failed to initialize"); + if (!FromConfig(conf)) + throw std::runtime_error("FromConfig() failed"); + if (!InitOutboundLinks()) throw std::runtime_error("InitOutboundLinks() failed"); @@ -374,12 +406,12 @@ namespace llarp } bool - Router::FromConfig(Config* conf) + Router::FromConfig(const Config& conf) { // Set netid before anything else - if (!conf->router.m_netId.empty() && strcmp(conf->router.m_netId.c_str(), llarp::DEFAULT_NETID)) + if (!conf.router.m_netId.empty() && strcmp(conf.router.m_netId.c_str(), llarp::DEFAULT_NETID)) { - const auto& netid = conf->router.m_netId; + const auto& netid = conf.router.m_netId; llarp::LogWarn( "!!!! you have manually set netid to be '", netid, @@ -394,36 +426,36 @@ namespace llarp } // IWP config - m_OutboundPort = conf->links.m_OutboundLink.port; + m_OutboundPort = conf.links.m_OutboundLink.port; // Router config - _rc.SetNick(conf->router.m_nickname); - _outboundSessionMaker.maxConnectedRouters = conf->router.m_maxConnectedRouters; - _outboundSessionMaker.minConnectedRouters = conf->router.m_minConnectedRouters; + _rc.SetNick(conf.router.m_nickname); + _outboundSessionMaker.maxConnectedRouters = conf.router.m_maxConnectedRouters; + _outboundSessionMaker.minConnectedRouters = conf.router.m_minConnectedRouters; encryption_keyfile = m_keyManager->m_encKeyPath; our_rc_file = m_keyManager->m_rcPath; transport_keyfile = m_keyManager->m_transportKeyPath; ident_keyfile = m_keyManager->m_idKeyPath; - _ourAddress = conf->router.m_publicAddress; + _ourAddress = conf.router.m_publicAddress; - RouterContact::BlockBogons = conf->router.m_blockBogons; + RouterContact::BlockBogons = conf.router.m_blockBogons; // Lokid Config - usingSNSeed = conf->lokid.usingSNSeed; - whitelistRouters = conf->lokid.whitelistRouters; - lokidRPCAddr = lokimq::address(conf->lokid.lokidRPCAddr); + usingSNSeed = conf.lokid.usingSNSeed; + whitelistRouters = conf.lokid.whitelistRouters; + lokidRPCAddr = lokimq::address(conf.lokid.lokidRPCAddr); if (usingSNSeed) - ident_keyfile = conf->lokid.ident_keyfile; + ident_keyfile = conf.lokid.ident_keyfile; // TODO: add config flag for "is service node" - if (conf->links.m_InboundLinks.size()) + if (conf.links.m_InboundLinks.size()) { m_isServiceNode = true; } - networkConfig = conf->network; + networkConfig = conf.network; /// build a set of strictConnectPubkeys ( /// TODO: make this consistent with config -- do we support multiple strict connections @@ -446,21 +478,21 @@ namespace llarp throw std::invalid_argument(stringify("invalid key for strict-connect: ", val)); } - std::vector configRouters = conf->connect.routers; + std::vector configRouters = conf.connect.routers; configRouters.insert( - configRouters.end(), conf->bootstrap.routers.begin(), conf->bootstrap.routers.end()); + configRouters.end(), conf.bootstrap.routers.begin(), conf.bootstrap.routers.end()); // if our conf had no bootstrap files specified, try the default location of // /bootstrap.signed. If this isn't present, leave a useful error message if (configRouters.size() == 0 and not m_isServiceNode) { // TODO: use constant - fs::path defaultBootstrapFile = conf->router.m_dataDir / "bootstrap.signed"; + fs::path defaultBootstrapFile = conf.router.m_dataDir / "bootstrap.signed"; if (fs::exists(defaultBootstrapFile)) { configRouters.push_back(defaultBootstrapFile); } - else if (not conf->bootstrap.skipBootstrap) + else if (not conf.bootstrap.skipBootstrap) { LogError("No bootstrap files specified in config file, and the default"); LogError("bootstrap file ", defaultBootstrapFile, " does not exist."); @@ -515,6 +547,7 @@ namespace llarp // Init components after relevant config settings loaded _outboundMessageHandler.Init(&_linkManager, _logic); _outboundSessionMaker.Init( + this, &_linkManager, &_rcLookupHandler, &_routerProfiling, @@ -534,16 +567,16 @@ namespace llarp m_isServiceNode); // create inbound links, if we are a service node - for (const LinksConfig::LinkInfo& serverConfig : conf->links.m_InboundLinks) + for (const LinksConfig::LinkInfo& serverConfig : conf.links.m_InboundLinks) { auto server = iwp::NewInboundLink( m_keyManager, util::memFn(&AbstractRouter::rc, this), util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, this), util::memFn(&AbstractRouter::Sign, this), - util::memFn(&IOutboundSessionMaker::OnSessionEstablished, &_outboundSessionMaker), + util::memFn(&Router::ConnectionEstablished, this), util::memFn(&AbstractRouter::CheckRenegotiateValid, this), - util::memFn(&IOutboundSessionMaker::OnConnectTimeout, &_outboundSessionMaker), + util::memFn(&Router::ConnectionTimedOut, this), util::memFn(&AbstractRouter::SessionClosed, this), util::memFn(&AbstractRouter::PumpLL, this), util::memFn(&AbstractRouter::QueueWork, this)); @@ -559,15 +592,15 @@ namespace llarp } // Network config - if (conf->network.m_enableProfiling.has_value() and not*conf->network.m_enableProfiling) + if (conf.network.m_enableProfiling.has_value() and not*conf.network.m_enableProfiling) { routerProfiling().Disable(); LogWarn("router profiling explicitly disabled"); } - if (!conf->network.m_routerProfilesFile.empty()) + if (!conf.network.m_routerProfilesFile.empty()) { - routerProfilesFile = conf->network.m_routerProfilesFile; + routerProfilesFile = conf.network.m_routerProfilesFile; routerProfiling().Load(routerProfilesFile.c_str()); llarp::LogInfo("setting profiles to ", routerProfilesFile); } @@ -575,15 +608,27 @@ namespace llarp // API config if (not IsServiceNode()) { - hiddenServiceContext().AddEndpoint(*conf); + hiddenServiceContext().AddEndpoint(conf); + } + + // peer stats + if (conf.router.m_enablePeerStats) + { + LogInfo("Initializing peerdb..."); + m_peerDb = std::make_shared(); + m_peerDb->configure(conf.router); + } + else + { + assert(not IsServiceNode()); // enable peer stats must be enabled for service nodes } // Logging config LogContext::Instance().Initialize( - conf->logging.m_logLevel, - conf->logging.m_logType, - conf->logging.m_logFile, - conf->router.m_nickname, + conf.logging.m_logLevel, + conf.logging.m_logType, + conf.logging.m_logFile, + conf.router.m_nickname, util::memFn(&AbstractRouter::QueueDiskIO, this)); return true; @@ -754,6 +799,20 @@ namespace llarp { nodedb()->AsyncFlushToDisk(); } + + if (m_peerDb) + { + // TODO: throttle this? + // TODO: need to capture session stats when session terminates / is removed from link manager + _linkManager.updatePeerDb(m_peerDb); + + if (m_peerDb->shouldFlush(now)) + { + LogWarn("Queing database flush..."); + QueueDiskIO([this]() { m_peerDb->flushDatabase(); }); + } + } + // get connected peers std::set peersWeHave; _linkManager.ForEachPeer([&peersWeHave](ILinkSession* s) { @@ -791,6 +850,31 @@ namespace llarp LogInfo("Session to ", remote, " fully closed"); } + void + Router::ConnectionTimedOut(ILinkSession* session) + { + if (m_peerDb) + { + RouterID id{session->GetPubKey()}; + // TODO: make sure this is a public router (on whitelist)? + m_peerDb->modifyPeerStats(id, [&](PeerStats& stats) { stats.numConnectionTimeouts++; }); + } + _outboundSessionMaker.OnConnectTimeout(session); + } + + bool + Router::ConnectionEstablished(ILinkSession* session, bool inbound) + { + RouterID id{session->GetPubKey()}; + if (m_peerDb) + { + // TODO: make sure this is a public router (on whitelist)? + m_peerDb->modifyPeerStats(id, [&](PeerStats& stats) { stats.numConnectionSuccesses++; }); + } + NotifyRouterEvent(pubkey(), id, inbound); + return _outboundSessionMaker.OnSessionEstablished(session); + } + bool Router::GetRandomConnectedRouter(RouterContact& result) const { @@ -1101,19 +1185,6 @@ namespace llarp return true; } - bool - Router::ValidateConfig(Config* /*conf*/) const - { - return true; - } - - bool - Router::Reconfigure(Config*) - { - // TODO: implement me - return true; - } - bool Router::TryConnectAsync(RouterContact rc, uint16_t tries) { @@ -1142,9 +1213,9 @@ namespace llarp util::memFn(&AbstractRouter::rc, this), util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, this), util::memFn(&AbstractRouter::Sign, this), - util::memFn(&IOutboundSessionMaker::OnSessionEstablished, &_outboundSessionMaker), + util::memFn(&Router::ConnectionEstablished, this), util::memFn(&AbstractRouter::CheckRenegotiateValid, this), - util::memFn(&IOutboundSessionMaker::OnConnectTimeout, &_outboundSessionMaker), + util::memFn(&Router::ConnectionTimedOut, this), util::memFn(&AbstractRouter::SessionClosed, this), util::memFn(&AbstractRouter::PumpLL, this), util::memFn(&AbstractRouter::QueueWork, this)); @@ -1178,4 +1249,11 @@ namespace llarp LogDebug("Message failed sending to ", remote); } } + + void + Router::HandleRouterEvent(tooling::RouterEventPtr event) const + { + LogDebug(event->ToString()); + } + } // namespace llarp diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp index 22dcb3514..ccf4d1f33 100644 --- a/llarp/router/router.hpp +++ b/llarp/router/router.hpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -48,7 +49,7 @@ namespace llarp { - struct Router final : public AbstractRouter + struct Router : public AbstractRouter { llarp_time_t _lastPump = 0s; bool ready; @@ -306,12 +307,18 @@ namespace llarp return _rcLookupHandler; } + std::shared_ptr + peerDb() override + { + return m_peerDb; + } + void GossipRCIfNeeded(const RouterContact rc) override; explicit Router(llarp_ev_loop_ptr __netloop, std::shared_ptr logic); - ~Router() override; + virtual ~Router() override; bool HandleRecvLinkMessageBuffer(ILinkSession* from, const llarp_buffer_t& msg) override; @@ -338,7 +345,7 @@ namespace llarp Close(); bool - Configure(Config* conf, bool isRouter, llarp_nodedb* nodedb = nullptr) override; + Configure(const Config& conf, bool isRouter, llarp_nodedb* nodedb = nullptr) override; bool StartRpcServer() override; @@ -385,19 +392,9 @@ namespace llarp void try_connect(fs::path rcfile); - /// inject configuration and reconfigure router - bool - Reconfigure(Config* conf) override; - bool TryConnectAsync(RouterContact rc, uint16_t tries) override; - /// validate new configuration against old one - /// return true on 100% valid - /// return false if not 100% valid - bool - ValidateConfig(Config* conf) const override; - /// send to remote router or queue for sending /// returns false on overflow /// returns true on successful queue @@ -427,6 +424,14 @@ namespace llarp void SessionClosed(RouterID remote) override; + /// called by link when an unestablished connection times out + void + ConnectionTimedOut(ILinkSession* session); + + /// called by link when session is fully established + bool + ConnectionEstablished(ILinkSession* session, bool inbound); + /// call internal router ticker void Tick(); @@ -495,6 +500,7 @@ namespace llarp llarp_time_t m_LastStatsReport = 0s; std::shared_ptr m_keyManager; + std::shared_ptr m_peerDb; uint32_t path_build_count = 0; @@ -508,10 +514,20 @@ namespace llarp UpdateOurRC(bool rotateKeys = false); bool - FromConfig(Config* conf); + FromConfig(const Config& conf); void MessageSent(const RouterID& remote, SendStatus status); + + protected: + virtual void + HandleRouterEvent(tooling::RouterEventPtr event) const override; + + virtual bool + disableGossipingRC_TestingOnly() + { + return false; + }; }; } // namespace llarp diff --git a/llarp/rpc/lokid_rpc_client.cpp b/llarp/rpc/lokid_rpc_client.cpp index 304331125..54f74b6e6 100644 --- a/llarp/rpc/lokid_rpc_client.cpp +++ b/llarp/rpc/lokid_rpc_client.cpp @@ -1,5 +1,6 @@ #include +#include #include #include @@ -36,18 +37,20 @@ namespace llarp : m_lokiMQ(std::move(lmq)), m_Router(r) { // m_lokiMQ->log_level(toLokiMQLogLevel(LogLevel::Instance().curLevel)); + + // TODO: proper auth here + auto lokidCategory = m_lokiMQ->add_category("lokid", lokimq::Access{lokimq::AuthLevel::none}); + lokidCategory.add_request_command( + "get_peer_stats", [this](lokimq::Message& m) { HandleGetPeerStats(m); }); } void LokidRpcClient::ConnectAsync(lokimq::address url) { LogInfo("connecting to lokid via LMQ at ", url); - m_lokiMQ->connect_remote( + m_Connection = m_lokiMQ->connect_remote( url, - [self = shared_from_this()](lokimq::ConnectionID c) { - self->m_Connection = std::move(c); - self->Connected(); - }, + [self = shared_from_this()](lokimq::ConnectionID) { self->Connected(); }, [self = shared_from_this(), url](lokimq::ConnectionID, std::string_view f) { llarp::LogWarn("Failed to connect to lokid: ", f); LogicCall(self->m_Router->logic(), [self, url]() { self->ConnectAsync(url); }); @@ -100,10 +103,18 @@ namespace llarp constexpr auto PingInterval = 1min; constexpr auto NodeListUpdateInterval = 30s; - LogInfo("we connected to lokid [", *m_Connection, "]"); - Command("admin.lokinet_ping"); - m_lokiMQ->add_timer( - [self = shared_from_this()]() { self->Command("admin.lokinet_ping"); }, PingInterval); + auto makePingRequest = [self = shared_from_this()]() { + nlohmann::json payload = {{"version", {VERSION[0], VERSION[1], VERSION[2]}}}; + self->Request( + "admin.lokinet_ping", + [](bool success, std::vector data) { + (void)data; + LogDebug("Received response for ping. Successful: ", success); + }, + payload.dump()); + }; + makePingRequest(); + m_lokiMQ->add_timer(makePingRequest, PingInterval); m_lokiMQ->add_timer( [self = shared_from_this()]() { self->UpdateServiceNodeList(); }, NodeListUpdateInterval); UpdateServiceNodeList(); @@ -175,13 +186,13 @@ namespace llarp "failed to get private key request " "failed"); } - if (data.empty()) + if (data.empty() or data.size() < 2) { throw std::runtime_error( "failed to get private key request " "data empty"); } - const auto j = nlohmann::json::parse(data[0]); + const auto j = nlohmann::json::parse(data[1]); SecretKey k; if (not k.FromHex(j.at("service_node_ed25519_privkey").get())) { @@ -189,8 +200,14 @@ namespace llarp } promise.set_value(k); } + catch (const std::exception& e) + { + LogWarn("Caught exception while trying to request admin keys: ", e.what()); + promise.set_exception(std::current_exception()); + } catch (...) { + LogWarn("Caught non-standard exception while trying to request admin keys"); promise.set_exception(std::current_exception()); } }); @@ -198,5 +215,74 @@ namespace llarp return ftr.get(); } + void + LokidRpcClient::HandleGetPeerStats(lokimq::Message& msg) + { + LogInfo("Got request for peer stats (size: ", msg.data.size(), ")"); + for (auto str : msg.data) + { + LogInfo(" :", str); + } + + assert(m_Router != nullptr); + + if (not m_Router->peerDb()) + { + LogWarn("HandleGetPeerStats called when router has no peerDb set up."); + + // TODO: this can sometimes occur if lokid hits our API before we're done configuring + // (mostly an issue in a loopback testnet) + msg.send_reply("EAGAIN"); + return; + } + + try + { + // msg.data[0] is expected to contain a bt list of router ids (in our preferred string + // format) + if (msg.data.empty()) + { + LogWarn("lokid requested peer stats with no request body"); + msg.send_reply("peer stats request requires list of router IDs"); + return; + } + + std::vector routerIdStrings; + lokimq::bt_deserialize(msg.data[0], routerIdStrings); + + std::vector routerIds; + routerIds.reserve(routerIdStrings.size()); + + for (const auto& routerIdString : routerIdStrings) + { + RouterID id; + if (not id.FromString(routerIdString)) + { + LogWarn("lokid sent us an invalid router id: ", routerIdString); + msg.send_reply("Invalid router id"); + return; + } + + routerIds.push_back(std::move(id)); + } + + auto statsList = m_Router->peerDb()->listPeerStats(routerIds); + + int32_t bufSize = + 256 + (statsList.size() * 1024); // TODO: tune this or allow to grow dynamically + auto buf = std::unique_ptr(new uint8_t[bufSize]); + llarp_buffer_t llarpBuf(buf.get(), bufSize); + + PeerStats::BEncodeList(statsList, &llarpBuf); + + msg.send_reply(std::string_view((const char*)llarpBuf.base, llarpBuf.cur - llarpBuf.base)); + } + catch (const std::exception& e) + { + LogError("Failed to handle get_peer_stats request: ", e.what()); + msg.send_reply("server error"); + } + } + } // namespace rpc } // namespace llarp diff --git a/llarp/rpc/lokid_rpc_client.hpp b/llarp/rpc/lokid_rpc_client.hpp index 682c143ed..b8cef923e 100644 --- a/llarp/rpc/lokid_rpc_client.hpp +++ b/llarp/rpc/lokid_rpc_client.hpp @@ -59,6 +59,10 @@ namespace llarp void HandleGotServiceNodeList(std::string json); + // Handles request from lokid for peer stats on a specific peer + void + HandleGetPeerStats(lokimq::Message& msg); + std::optional m_Connection; LMQ_ptr m_lokiMQ; std::string m_CurrentBlockHash; diff --git a/llarp/tooling/hive_context.cpp b/llarp/tooling/hive_context.cpp new file mode 100644 index 000000000..76f436c13 --- /dev/null +++ b/llarp/tooling/hive_context.cpp @@ -0,0 +1,33 @@ +#include + +#include + +namespace tooling +{ + HiveContext::HiveContext(RouterHive* hive) : m_hive(hive) + { + } + + std::unique_ptr + HiveContext::makeRouter( + llarp_ev_loop_ptr netloop, + std::shared_ptr logic) + { + return std::make_unique(netloop, logic, m_hive); + } + + HiveRouter* + HiveContext::getRouterAsHiveRouter() + { + if (not router) + return nullptr; + + HiveRouter* hiveRouter = dynamic_cast(router.get()); + + if (hiveRouter == nullptr) + throw std::runtime_error("HiveContext has a router not of type HiveRouter"); + + return hiveRouter; + } + +} // namespace tooling diff --git a/llarp/tooling/hive_context.hpp b/llarp/tooling/hive_context.hpp new file mode 100644 index 000000000..6489705f1 --- /dev/null +++ b/llarp/tooling/hive_context.hpp @@ -0,0 +1,30 @@ +#pragma once + +#include +#include + +namespace tooling +{ + /// HiveContext is a subclass of llarp::Context which allows RouterHive to + /// perform custom behavior which might be undesirable in production code. + struct HiveContext : public llarp::Context + { + HiveContext(RouterHive* hive); + + std::unique_ptr + makeRouter( + llarp_ev_loop_ptr netloop, + std::shared_ptr logic) override; + + /// Get this context's router as a HiveRouter. + /// + /// Returns nullptr if there is no router or throws an exception if the + /// router is somehow not an instance of HiveRouter. + HiveRouter* + getRouterAsHiveRouter(); + + protected: + RouterHive* m_hive = nullptr; + }; + +} // namespace tooling diff --git a/llarp/tooling/hive_router.cpp b/llarp/tooling/hive_router.cpp new file mode 100644 index 000000000..3a40e2e05 --- /dev/null +++ b/llarp/tooling/hive_router.cpp @@ -0,0 +1,37 @@ +#include + +#include + +namespace tooling +{ + HiveRouter::HiveRouter( + llarp_ev_loop_ptr netloop, std::shared_ptr logic, RouterHive* hive) + : Router(netloop, logic), m_hive(hive) + { + } + + bool + HiveRouter::disableGossipingRC_TestingOnly() + { + return m_disableGossiping; + } + + void + HiveRouter::disableGossiping() + { + m_disableGossiping = true; + } + + void + HiveRouter::enableGossiping() + { + m_disableGossiping = false; + } + + void + HiveRouter::HandleRouterEvent(RouterEventPtr event) const + { + m_hive->NotifyEvent(std::move(event)); + } + +} // namespace tooling diff --git a/llarp/tooling/hive_router.hpp b/llarp/tooling/hive_router.hpp new file mode 100644 index 000000000..f8af35a0f --- /dev/null +++ b/llarp/tooling/hive_router.hpp @@ -0,0 +1,38 @@ +#pragma once + +#include + +namespace tooling +{ + /// HiveRouter is a subclass of Router which overrides specific behavior in + /// order to perform testing-related functions. It exists largely to prevent + /// this behavior (which may often be "dangerous") from leaking into release + /// code. + struct HiveRouter : public llarp::Router + { + HiveRouter( + llarp_ev_loop_ptr netloop, + std::shared_ptr logic, + RouterHive* hive); + + virtual ~HiveRouter() = default; + + /// Override logic to prevent base Router class from gossiping its RC. + virtual bool + disableGossipingRC_TestingOnly() override; + + void + disableGossiping(); + + void + enableGossiping(); + + protected: + bool m_disableGossiping = false; + RouterHive* m_hive = nullptr; + + virtual void + HandleRouterEvent(RouterEventPtr event) const override; + }; + +} // namespace tooling diff --git a/llarp/tooling/peer_stats_event.hpp b/llarp/tooling/peer_stats_event.hpp new file mode 100644 index 000000000..34d1f6519 --- /dev/null +++ b/llarp/tooling/peer_stats_event.hpp @@ -0,0 +1,44 @@ +#pragma once + +#include "router_event.hpp" + +namespace tooling +{ + struct LinkSessionEstablishedEvent : public RouterEvent + { + llarp::RouterID remoteId; + bool inbound = false; + + LinkSessionEstablishedEvent( + const llarp::RouterID& ourRouterId, const llarp::RouterID& remoteId_, bool inbound_) + : RouterEvent("Link: LinkSessionEstablishedEvent", ourRouterId, false) + , remoteId(remoteId_) + , inbound(inbound_) + { + } + + std::string + ToString() const + { + return RouterEvent::ToString() + (inbound ? "inbound" : "outbound") + + " : LinkSessionEstablished with " + remoteId.ToString(); + } + }; + + struct ConnectionAttemptEvent : public RouterEvent + { + llarp::RouterID remoteId; + + ConnectionAttemptEvent(const llarp::RouterID& ourRouterId, const llarp::RouterID& remoteId_) + : RouterEvent("Link: ConnectionAttemptEvent", ourRouterId, false), remoteId(remoteId_) + { + } + + std::string + ToString() const + { + return RouterEvent::ToString() + " : LinkSessionEstablished with " + remoteId.ToString(); + } + }; + +} // namespace tooling diff --git a/llarp/tooling/router_hive.cpp b/llarp/tooling/router_hive.cpp index 613db5a9a..647153772 100644 --- a/llarp/tooling/router_hive.cpp +++ b/llarp/tooling/router_hive.cpp @@ -14,45 +14,43 @@ using namespace std::chrono_literals; namespace tooling { void - RouterHive::AddRouter( - const std::shared_ptr& config, std::vector* routers, bool isRelay) + RouterHive::AddRouter(const std::shared_ptr& config, bool isRouter) { - llarp_main* ctx = llarp_main_init_from_config(config->Copy(), isRelay); - auto result = llarp_main_setup(ctx, isRelay); - if (result == 0) - { - llarp::Context::Get(ctx)->InjectHive(this); - routers->push_back(ctx); - } - else - { - throw std::runtime_error(llarp::stringify( - "Failed to add RouterHive ", - (isRelay ? "relay" : "client"), - ", llarp_main_setup() returned ", - result)); - } + auto& container = (isRouter ? relays : clients); + + llarp::RuntimeOptions opts; + opts.isRouter = isRouter; + + Context_ptr context = std::make_shared(this); + context->Configure(*config); + context->Setup(opts); + + auto routerId = llarp::RouterID(context->router->pubkey()); + container[routerId] = context; + std::cout << "Generated router with ID " << routerId << std::endl; } void RouterHive::AddRelay(const std::shared_ptr& config) { - AddRouter(config, &relays, true); + AddRouter(config, true); } void RouterHive::AddClient(const std::shared_ptr& config) { - AddRouter(config, &clients, false); + AddRouter(config, false); } void - RouterHive::StartRouters(std::vector* routers, bool isRelay) + RouterHive::StartRouters(bool isRelay) { - for (llarp_main* ctx : *routers) + auto& container = (isRelay ? relays : clients); + + for (auto [routerId, ctx] : container) { routerMainThreads.emplace_back([=]() { - llarp_main_run(ctx, llarp_main_runtime_opts{false, false, false, isRelay}); + ctx->Run(llarp::RuntimeOptions{false, false, isRelay}); }); std::this_thread::sleep_for(2ms); } @@ -61,39 +59,39 @@ namespace tooling void RouterHive::StartRelays() { - StartRouters(&relays, true); + StartRouters(true); } void RouterHive::StartClients() { - StartRouters(&clients, false); + StartRouters(false); } void RouterHive::StopRouters() { llarp::LogInfo("Signalling all routers to stop"); - for (llarp_main* ctx : relays) + for (auto [routerId, ctx] : relays) { - llarp_main_signal(ctx, 2 /* SIGINT */); + LogicCall(ctx->logic, [ctx]() { ctx->HandleSignal(SIGINT); }); } - for (llarp_main* ctx : clients) + for (auto [routerId, ctx] : clients) { - llarp_main_signal(ctx, 2 /* SIGINT */); + LogicCall(ctx->logic, [ctx]() { ctx->HandleSignal(SIGINT); }); } llarp::LogInfo("Waiting on routers to be stopped"); - for (llarp_main* ctx : relays) + for (auto [routerId, ctx] : relays) { - while (llarp_main_is_running(ctx)) + while (ctx->IsUp()) { std::this_thread::sleep_for(10ms); } } - for (llarp_main* ctx : clients) + for (auto [routerId, ctx] : clients) { - while (llarp_main_is_running(ctx)) + while (ctx->IsUp()) { std::this_thread::sleep_for(10ms); } @@ -148,46 +146,40 @@ namespace tooling } void - RouterHive::VisitRouter(llarp_main* router, std::function visit) + RouterHive::VisitRouter(Context_ptr ctx, std::function visit) { - auto ctx = llarp::Context::Get(router); - LogicCall(ctx->logic, [visit, ctx]() { visit(ctx); }); + // TODO: this should be called from each router's appropriate Logic thread, e.g.: + // LogicCall(ctx->logic, [visit, ctx]() { visit(ctx); }); + // however, this causes visit calls to be deferred + visit(ctx); } - void - RouterHive::VisitRelay(size_t index, std::function visit) + HiveRouter* + RouterHive::GetRelay(const llarp::RouterID& id, bool needMutexLock) { - if (index >= relays.size()) - { - visit(nullptr); - return; - } - VisitRouter(relays[index], visit); - } + auto guard = + needMutexLock ? std::make_optional>(routerMutex) : std::nullopt; - void - RouterHive::VisitClient(size_t index, std::function visit) - { - if (index >= clients.size()) - { - visit(nullptr); - return; - } - VisitRouter(clients[index], visit); + auto itr = relays.find(id); + if (itr == relays.end()) + return nullptr; + + auto ctx = itr->second; + return ctx->getRouterAsHiveRouter(); } std::vector RouterHive::RelayConnectedRelays() { + std::lock_guard guard{routerMutex}; std::vector results; results.resize(relays.size()); std::mutex results_lock; size_t i = 0; size_t done_count = 0; - for (auto relay : relays) + for (auto [routerId, ctx] : relays) { - auto ctx = llarp::Context::Get(relay); LogicCall(ctx->logic, [&, i, ctx]() { size_t count = ctx->router->NumberOfConnectedRouters(); std::lock_guard guard{results_lock}; @@ -216,17 +208,43 @@ namespace tooling std::vector RouterHive::GetRelayRCs() { + std::lock_guard guard{routerMutex}; std::vector results; results.resize(relays.size()); size_t i = 0; - for (auto relay : relays) + for (auto [routerId, ctx] : relays) { - auto ctx = llarp::Context::Get(relay); results[i] = ctx->router->rc(); i++; } return results; } + void + RouterHive::ForEachRelay(std::function visit) + { + for (auto [routerId, ctx] : relays) + { + VisitRouter(ctx, visit); + } + } + + void + RouterHive::ForEachClient(std::function visit) + { + for (auto [routerId, ctx] : clients) + { + VisitRouter(ctx, visit); + } + } + + /// safely visit every router context + void + RouterHive::ForEachRouter(std::function visit) + { + ForEachRelay(visit); + ForEachClient(visit); + } + } // namespace tooling diff --git a/llarp/tooling/router_hive.hpp b/llarp/tooling/router_hive.hpp index 9e9982e35..8cb8fa37b 100644 --- a/llarp/tooling/router_hive.hpp +++ b/llarp/tooling/router_hive.hpp @@ -4,6 +4,7 @@ #include #include +#include #include #include @@ -16,35 +17,26 @@ struct llarp_main; namespace llarp { struct Context; -} +} // namespace llarp namespace tooling { + struct HiveRouter; // Hive's version of Router + struct RouterHive { - using Context_ptr = std::shared_ptr; + using Context_ptr = std::shared_ptr; private: void - StartRouters(std::vector* routers, bool isRelay); - - void - AddRouter( - const std::shared_ptr& config, - std::vector* routers, - bool isRelay); + StartRouters(bool isRelay); - /// safely visit router void - VisitRouter(llarp_main* router, std::function visit); + AddRouter(const std::shared_ptr& config, bool isRelay); - /// safely visit relay at index N + /// safely visit router (asynchronously) void - VisitRelay(size_t index, std::function visit); - - /// safely visit client at index N - void - VisitClient(size_t index, std::function visit); + VisitRouter(Context_ptr ctx, std::function visit); public: RouterHive() = default; @@ -73,31 +65,16 @@ namespace tooling std::deque GetAllEvents(); + // functions to safely visit each relay and/or client's HiveContext void - ForEachRelay(std::function visit) - { - for (size_t idx = 0; idx < relays.size(); ++idx) - { - VisitRelay(idx, visit); - } - } - + ForEachRelay(std::function visit); void - ForEachClient(std::function visit) - { - for (size_t idx = 0; idx < clients.size(); ++idx) - { - VisitClient(idx, visit); - } - } - - /// safely visit every router context + ForEachClient(std::function visit); void - ForEachRouter(std::function visit) - { - ForEachRelay(visit); - ForEachClient(visit); - } + ForEachRouter(std::function visit); + + HiveRouter* + GetRelay(const llarp::RouterID& id, bool needMutexLock = true); std::vector RelayConnectedRelays(); @@ -105,8 +82,9 @@ namespace tooling std::vector GetRelayRCs(); - std::vector relays; - std::vector clients; + std::mutex routerMutex; + std::unordered_map relays; + std::unordered_map clients; std::vector routerMainThreads; diff --git a/pybind/CMakeLists.txt b/pybind/CMakeLists.txt index e307f67cd..a8706ff50 100644 --- a/pybind/CMakeLists.txt +++ b/pybind/CMakeLists.txt @@ -1,11 +1,13 @@ pybind11_add_module(pyllarp MODULE module.cpp llarp/context.cpp + llarp/router.cpp llarp/router_id.cpp llarp/router_contact.cpp llarp/crypto/types.cpp llarp/config.cpp llarp/logger.cpp + llarp/peerstats.cpp llarp/dht/dht_types.cpp llarp/path/path_types.cpp llarp/path/path_hop_config.cpp diff --git a/pybind/common.hpp b/pybind/common.hpp index 93faecd6e..6d1508882 100644 --- a/pybind/common.hpp +++ b/pybind/common.hpp @@ -20,6 +20,9 @@ namespace llarp void CryptoTypes_Init(py::module& mod); + void + AbstractRouter_Init(py::module& mod); + void RouterID_Init(py::module& mod); @@ -32,6 +35,12 @@ namespace llarp void PathTypes_Init(py::module& mod); + void + PeerDb_Init(py::module& mod); + + void + PeerStats_Init(py::module& mod); + namespace dht { void @@ -65,4 +74,10 @@ namespace tooling void RouterEvent_Init(py::module& mod); + + void + HiveContext_Init(py::module& mod); + + void + HiveRouter_Init(py::module& mod); } // namespace tooling diff --git a/pybind/llarp/config.cpp b/pybind/llarp/config.cpp index 2ca9934ee..b14575013 100644 --- a/pybind/llarp/config.cpp +++ b/pybind/llarp/config.cpp @@ -48,7 +48,8 @@ namespace llarp }) .def_readwrite("workerThreads", &RouterConfig::m_workerThreads) .def_readwrite("numNetThreads", &RouterConfig::m_numNetThreads) - .def_readwrite("JobQueueSize", &RouterConfig::m_JobQueueSize); + .def_readwrite("JobQueueSize", &RouterConfig::m_JobQueueSize) + .def_readwrite("enablePeerStats", &RouterConfig::m_enablePeerStats); py::class_(mod, "NetworkConfig") .def(py::init<>()) @@ -101,7 +102,10 @@ namespace llarp .def_readwrite("usingSNSeed", &LokidConfig::usingSNSeed) .def_readwrite("whitelistRouters", &LokidConfig::whitelistRouters) .def_readwrite("ident_keyfile", &LokidConfig::ident_keyfile) - .def_readwrite("lokidRPCAddr", &LokidConfig::lokidRPCAddr); + .def_property( + "lokidRPCAddr", + [](LokidConfig& self) { return self.lokidRPCAddr.full_address().c_str(); }, + [](LokidConfig& self, std::string arg) { self.lokidRPCAddr = lokimq::address(arg); }); py::class_(mod, "BootstrapConfig") .def(py::init<>()) diff --git a/pybind/llarp/context.cpp b/pybind/llarp/context.cpp index e2e8084be..b15b67bae 100644 --- a/pybind/llarp/context.cpp +++ b/pybind/llarp/context.cpp @@ -1,5 +1,6 @@ #include "common.hpp" #include +#include #include #include "llarp/handlers/pyhandler.hpp" namespace llarp @@ -11,8 +12,10 @@ namespace llarp py::class_(mod, "Context") .def( "Setup", - [](Context_ptr self, bool isRelay) -> bool { return self->Setup(isRelay) == 0; }) - .def("Run", [](Context_ptr self) -> int { return self->Run(llarp_main_runtime_opts{}); }) + [](Context_ptr self, bool isRouter) { + self->Setup({false, false, isRouter}); + }) + .def("Run", [](Context_ptr self) -> int { return self->Run(RuntimeOptions{}); }) .def("Stop", [](Context_ptr self) { self->CloseAsync(); }) .def("IsUp", &Context::IsUp) .def("IsRelay", [](Context_ptr self) -> bool { return self->router->IsServiceNode(); }) @@ -34,4 +37,19 @@ namespace llarp }) .def("CallSafe", &Context::CallSafe); } + } // namespace llarp + +namespace tooling +{ + void + HiveContext_Init(py::module& mod) + { + using HiveContext_ptr = std::shared_ptr; + py::class_(mod, "HiveContext") + .def( + "getRouterAsHiveRouter", + &tooling::HiveContext::getRouterAsHiveRouter, + py::return_value_policy::reference); + } +} // namespace tooling diff --git a/pybind/llarp/peerstats.cpp b/pybind/llarp/peerstats.cpp new file mode 100644 index 000000000..eebca2059 --- /dev/null +++ b/pybind/llarp/peerstats.cpp @@ -0,0 +1,40 @@ +#include "common.hpp" +#include "config/config.hpp" +#include "peerstats/peer_db.hpp" +#include "peerstats/types.hpp" + +#include + +namespace llarp +{ + void + PeerDb_Init(py::module& mod) + { + using PeerDb_ptr = std::shared_ptr; + py::class_(mod, "PeerDb") + .def("getCurrentPeerStats", &PeerDb::getCurrentPeerStats); + } + + void + PeerStats_Init(py::module& mod) + { + py::class_(mod, "PeerStats") + .def_readwrite("routerId", &PeerStats::routerId) + .def_readwrite("numConnectionAttempts", &PeerStats::numConnectionAttempts) + .def_readwrite("numConnectionSuccesses", &PeerStats::numConnectionSuccesses) + .def_readwrite("numConnectionRejections", &PeerStats::numConnectionRejections) + .def_readwrite("numConnectionTimeouts", &PeerStats::numConnectionTimeouts) + .def_readwrite("numPathBuilds", &PeerStats::numPathBuilds) + .def_readwrite("numPacketsAttempted", &PeerStats::numPacketsAttempted) + .def_readwrite("numPacketsSent", &PeerStats::numPacketsSent) + .def_readwrite("numPacketsDropped", &PeerStats::numPacketsDropped) + .def_readwrite("numPacketsResent", &PeerStats::numPacketsResent) + .def_readwrite("numDistinctRCsReceived", &PeerStats::numDistinctRCsReceived) + .def_readwrite("numLateRCs", &PeerStats::numLateRCs) + .def_readwrite("peakBandwidthBytesPerSec", &PeerStats::peakBandwidthBytesPerSec) + .def_readwrite("longestRCReceiveInterval", &PeerStats::longestRCReceiveInterval) + .def_readwrite("leastRCRemainingLifetime", &PeerStats::leastRCRemainingLifetime) + .def_readwrite("lastRCUpdated", &PeerStats::lastRCUpdated) + .def_readwrite("stale", &PeerStats::stale); + } +} // namespace llarp diff --git a/pybind/llarp/router.cpp b/pybind/llarp/router.cpp new file mode 100644 index 000000000..ad668f4b8 --- /dev/null +++ b/pybind/llarp/router.cpp @@ -0,0 +1,28 @@ +#include "common.hpp" + +#include "router/abstractrouter.hpp" +#include "tooling/hive_router.hpp" + +namespace llarp +{ + void + AbstractRouter_Init(py::module& mod) + { + py::class_(mod, "AbstractRouter") + .def("rc", &AbstractRouter::rc) + .def("Stop", &AbstractRouter::Stop) + .def("peerDb", &AbstractRouter::peerDb); + } + +} // namespace llarp + +namespace tooling +{ + void + HiveRouter_Init(py::module& mod) + { + py::class_(mod, "HiveRouter") + .def("disableGossiping", &HiveRouter::disableGossiping) + .def("enableGossiping", &HiveRouter::enableGossiping); + } +} // namespace tooling diff --git a/pybind/llarp/router_id.cpp b/pybind/llarp/router_id.cpp index 6c15f8814..a04784d52 100644 --- a/pybind/llarp/router_id.cpp +++ b/pybind/llarp/router_id.cpp @@ -18,8 +18,6 @@ namespace llarp .def("__repr__", &RouterID::ToString) .def("__str__", &RouterID::ToString) .def("ShortString", &RouterID::ShortString) - .def("__eq__", [](const RouterID* const lhs, const RouterID* const rhs) { - return *lhs == *rhs; - }); + .def("__eq__", [](const RouterID& lhs, const RouterID& rhs) { return lhs == rhs; }); } } // namespace llarp diff --git a/pybind/llarp/tooling/peer_stats_event.hpp b/pybind/llarp/tooling/peer_stats_event.hpp new file mode 100644 index 000000000..92e7c7381 --- /dev/null +++ b/pybind/llarp/tooling/peer_stats_event.hpp @@ -0,0 +1,28 @@ +#pragma once + +#include "router_event.hpp" + +namespace tooling +{ + struct LinkSessionEstablishedEvent : public RouterEvent + { + llarp::RouterID remoteId; + bool inbound = false; + + LinkSessionEstablishedEvent( + const llarp::RouterID& ourRouterId, const llarp::RouterID& remoteId_, bool inbound_) + : RouterEvent("Link: LinkSessionEstablishedEvent", ourRouterId, false) + , remoteId(remoteId_) + , inbound(inbound_) + { + } + + std::string + ToString() const + { + return RouterEvent::ToString() + (inbound ? "inbound" : "outbound") + + " : LinkSessionEstablished with " + remoteId.ToString(); + } + }; + +} // namespace tooling diff --git a/pybind/llarp/tooling/router_event.cpp b/pybind/llarp/tooling/router_event.cpp index 8098d4283..3fe62db18 100644 --- a/pybind/llarp/tooling/router_event.cpp +++ b/pybind/llarp/tooling/router_event.cpp @@ -5,6 +5,7 @@ #include "tooling/dht_event.hpp" #include "tooling/path_event.hpp" #include "tooling/rc_event.hpp" +#include "tooling/peer_stats_event.hpp" #include #include @@ -73,6 +74,13 @@ namespace tooling mod, "FindRouterReceivedEvent"); py::class_(mod, "FindRouterSentEvent"); + + py::class_(mod, "LinkSessionEstablishedEvent") + .def_readonly("remoteId", &LinkSessionEstablishedEvent::remoteId) + .def_readonly("inbound", &LinkSessionEstablishedEvent::inbound); + + py::class_(mod, "ConnectionAttemptEvent") + .def_readonly("remoteId", &ConnectionAttemptEvent::remoteId); } } // namespace tooling diff --git a/pybind/llarp/tooling/router_hive.cpp b/pybind/llarp/tooling/router_hive.cpp index 63f7e6c44..b5c23d825 100644 --- a/pybind/llarp/tooling/router_hive.cpp +++ b/pybind/llarp/tooling/router_hive.cpp @@ -3,13 +3,18 @@ #include "pybind11/iostream.h" #include +#include "router/abstractrouter.hpp" #include "llarp.hpp" + namespace tooling { void RouterHive_Init(py::module& mod) { using RouterHive_ptr = std::shared_ptr; + using Context_ptr = RouterHive::Context_ptr; + using ContextVisitor = std::function; + py::class_(mod, "RouterHive") .def(py::init<>()) .def("AddRelay", &RouterHive::AddRelay) @@ -17,12 +22,34 @@ namespace tooling .def("StartRelays", &RouterHive::StartRelays) .def("StartClients", &RouterHive::StartClients) .def("StopAll", &RouterHive::StopRouters) - .def("ForEachRelay", &RouterHive::ForEachRelay) - .def("ForEachClient", &RouterHive::ForEachClient) - .def("ForEachRouter", &RouterHive::ForEachRouter) + .def( + "ForEachRelay", + [](RouterHive& hive, ContextVisitor visit) { + hive.ForEachRelay([visit](Context_ptr ctx) { + py::gil_scoped_acquire acquire; + visit(std::move(ctx)); + }); + }) + .def( + "ForEachClient", + [](RouterHive& hive, ContextVisitor visit) { + hive.ForEachClient([visit](Context_ptr ctx) { + py::gil_scoped_acquire acquire; + visit(std::move(ctx)); + }); + }) + .def( + "ForEachRouter", + [](RouterHive& hive, ContextVisitor visit) { + hive.ForEachRouter([visit](Context_ptr ctx) { + py::gil_scoped_acquire acquire; + visit(std::move(ctx)); + }); + }) .def("GetNextEvent", &RouterHive::GetNextEvent) .def("GetAllEvents", &RouterHive::GetAllEvents) .def("RelayConnectedRelays", &RouterHive::RelayConnectedRelays) - .def("GetRelayRCs", &RouterHive::GetRelayRCs); + .def("GetRelayRCs", &RouterHive::GetRelayRCs) + .def("GetRelay", &RouterHive::GetRelay); } } // namespace tooling diff --git a/pybind/module.cpp b/pybind/module.cpp index e53894179..605d83844 100644 --- a/pybind/module.cpp +++ b/pybind/module.cpp @@ -5,10 +5,15 @@ PYBIND11_MODULE(pyllarp, m) { tooling::RouterHive_Init(m); tooling::RouterEvent_Init(m); + llarp::AbstractRouter_Init(m); + tooling::HiveRouter_Init(m); + llarp::PeerDb_Init(m); + llarp::PeerStats_Init(m); llarp::RouterID_Init(m); llarp::RouterContact_Init(m); llarp::CryptoTypes_Init(m); llarp::Context_Init(m); + tooling::HiveContext_Init(m); llarp::Config_Init(m); llarp::dht::DHTTypes_Init(m); llarp::PathTypes_Init(m); diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index bd3714233..b921350a5 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -68,6 +68,8 @@ add_executable(catchAll util/test_llarp_util_printer.cpp util/test_llarp_util_str.cpp util/test_llarp_util_decaying_hashset.cpp + peerstats/test_peer_db.cpp + peerstats/test_peer_types.cpp config/test_llarp_config_definition.cpp config/test_llarp_config_output.cpp net/test_ip_address.cpp diff --git a/test/exit/test_llarp_exit_context.cpp b/test/exit/test_llarp_exit_context.cpp index 2e2e30c26..cf93e60ac 100644 --- a/test/exit/test_llarp_exit_context.cpp +++ b/test/exit/test_llarp_exit_context.cpp @@ -6,39 +6,40 @@ #include #include -llarp_main* +static const llarp::RuntimeOptions opts = {.background = false, .debug = false, .isRouter = true}; + +std::shared_ptr make_context() { - // make config - auto config = new llarp_config(); - REQUIRE(config != nullptr); - REQUIRE(config->impl.LoadDefault(true, fs::current_path())); + llarp::Config conf; + conf.LoadDefault(true, fs::current_path()); + // set testing defaults - config->impl.network.m_endpointType = "null"; - config->impl.bootstrap.skipBootstrap = true; - config->impl.api.m_enableRPCServer = false; + conf.network.m_endpointType = "null"; + conf.bootstrap.skipBootstrap = true; + conf.api.m_enableRPCServer = false; // make a fake inbound link - config->impl.links.m_InboundLinks.emplace_back(); - auto& link = config->impl.links.m_InboundLinks.back(); + conf.links.m_InboundLinks.emplace_back(); + auto& link = conf.links.m_InboundLinks.back(); link.interface = llarp::net::LoopbackInterfaceName(); link.addressFamily = AF_INET; link.port = 0; // configure - auto ptr = llarp_main_init_from_config(config, true); - REQUIRE(ptr != nullptr); - llarp_config_free(config); - return ptr; + + auto context = std::make_shared(); + REQUIRE_NOTHROW(context->Configure(std::move(conf))); + + return context; } TEST_CASE("ensure snode address allocation", "[snode]") { llarp::LogSilencer shutup; auto ctx = make_context(); - REQUIRE(llarp_main_setup(ctx, true) == 0); - auto ctx_pp = llarp::Context::Get(ctx); - ctx_pp->CallSafe([ctx_pp]() { - REQUIRE(ctx_pp->router->IsServiceNode()); - auto& context = ctx_pp->router->exitContext(); + REQUIRE_NOTHROW(ctx->Setup(opts)); + ctx->CallSafe([ctx]() { + REQUIRE(ctx->router->IsServiceNode()); + auto& context = ctx->router->exitContext(); llarp::PubKey pk; pk.Randomize(); @@ -51,8 +52,9 @@ TEST_CASE("ensure snode address allocation", "[snode]") REQUIRE( context.FindEndpointForPath(firstPath)->LocalIP() == context.FindEndpointForPath(secondPath)->LocalIP()); - ctx_pp->CloseAsync(); + ctx->CloseAsync(); }); - REQUIRE(llarp_main_run(ctx, llarp_main_runtime_opts{.isRelay = true}) == 0); - llarp_main_free(ctx); + REQUIRE(ctx->Run(opts) == 0); + + ctx.reset(); } diff --git a/test/hive/conftest.py b/test/hive/conftest.py index e44427ee8..371f62740 100644 --- a/test/hive/conftest.py +++ b/test/hive/conftest.py @@ -33,3 +33,15 @@ def HiveArbitrary(): router_hive.Stop() +@pytest.fixture() +def HiveForPeerStats(): + router_hive = None + def _make(n_relays, n_clients, netid): + nonlocal router_hive + router_hive = hive.RouterHive(n_relays, n_clients, netid) + router_hive.Start() + return router_hive + + yield _make + + router_hive.Stop() diff --git a/test/hive/hive.py b/test/hive/hive.py index 3adda4eef..fa6fb88e0 100644 --- a/test/hive/hive.py +++ b/test/hive/hive.py @@ -65,6 +65,7 @@ class RouterHive(object): config.router.nickname = "Router%d" % index config.router.overrideAddress('127.0.0.1:{}'.format(port)) config.router.blockBogons = False + config.router.enablePeerStats = True config.network.enableProfiling = False config.network.routerProfilesFile = "%s/profiles.dat" % dirname @@ -80,6 +81,8 @@ class RouterHive(object): config.api.enableRPCServer = False + config.lokid.whitelistRouters = False + print("adding relay at index %d" % port); self.hive.AddRelay(config) @@ -110,6 +113,8 @@ class RouterHive(object): config.api.enableRPCServer = False + config.lokid.whitelistRouters = False + self.hive.AddClient(config) def InitFirstRC(self): diff --git a/test/hive/test_peer_stats.py b/test/hive/test_peer_stats.py new file mode 100644 index 000000000..c8f934053 --- /dev/null +++ b/test/hive/test_peer_stats.py @@ -0,0 +1,96 @@ +import pyllarp +from time import time + +def test_peer_stats(HiveForPeerStats): + + numRelays = 12 + + hive = HiveForPeerStats(n_relays=numRelays, n_clients=0, netid="hive") + someRouterId = None + + def collectStatsForAWhile(duration): + print("collecting router hive stats for {} seconds...", duration) + + start_time = time() + cur_time = start_time + + # we track the number of attempted sessions and inbound/outbound established sessions + numInbound = 0 + numOutbound = 0 + numAttempts = 0 + + nonlocal someRouterId + + while cur_time < start_time + duration: + hive.CollectAllEvents() + + for event in hive.events: + event_name = event.__class__.__name__ + + if event_name == "LinkSessionEstablishedEvent": + if event.inbound: + numInbound += 1 + else: + numOutbound += 1 + + if event_name == "ConnectionAttemptEvent": + numAttempts += 1 + + # we pick an arbitrary router out of our routers + if someRouterId is None: + someRouterId = event.remoteId; + + hive.events = [] + cur_time = time() + + # these should be strictly equal, although there is variation because of + # the time we sample + print("test duration exceeded") + print("in: {} out: {} attempts: {}", numInbound, numOutbound, numAttempts); + totalReceived = tally_rc_received_for_peer(hive.hive, someRouterId) + + # every router should have received this relay's RC exactly once + print("total times RC received: {} numRelays: {}", totalReceived, numRelays) + + return { + "numInbound": numInbound, + "numOutbound": numOutbound, + "numAttempts": numAttempts, + "totalTargetRCsReceived": totalReceived, + }; + + results1 = collectStatsForAWhile(30); + assert(results1["totalTargetRCsReceived"] == numRelays) + + # stop our router from gossiping + router = hive.hive.GetRelay(someRouterId, True) + router.disableGossiping(); + + ignore = collectStatsForAWhile(30); + + # ensure that no one hears a fresh RC from this router again + print("Starting second (longer) stats collection...") + results2 = collectStatsForAWhile(3600); + assert(results2["totalTargetRCsReceived"] == numRelays) # should not have increased + +def tally_rc_received_for_peer(hive, routerId): + + numFound = 0 + + def visit(context): + nonlocal numFound + + peerDb = context.getRouterAsHiveRouter().peerDb() + stats = peerDb.getCurrentPeerStats(routerId); + + assert(stats.routerId == routerId) + + numFound += stats.numDistinctRCsReceived + + hive.ForEachRelay(visit) + + return numFound; + + +if __name__ == "__main__": + main() diff --git a/test/iwp/test_iwp_session.cpp b/test/iwp/test_iwp_session.cpp index 0510e73d3..3d2f26394 100644 --- a/test/iwp/test_iwp_session.cpp +++ b/test/iwp/test_iwp_session.cpp @@ -86,8 +86,9 @@ struct IWPLinkContext return true; }, // established handler - [established](llarp::ILinkSession* s) { + [established](llarp::ILinkSession* s, bool linkIsInbound) { REQUIRE(s != nullptr); + REQUIRE(inbound == linkIsInbound); established(s); return true; }, diff --git a/test/peerstats/test_peer_db.cpp b/test/peerstats/test_peer_db.cpp new file mode 100644 index 000000000..b3baa3a5b --- /dev/null +++ b/test/peerstats/test_peer_db.cpp @@ -0,0 +1,217 @@ +#include +#include + +#include +#include +#include "peerstats/types.hpp" +#include "router_contact.hpp" +#include "util/time.hpp" + +TEST_CASE("Test PeerDb PeerStats memory storage", "[PeerDb]") +{ + const llarp::RouterID id = llarp::test::makeBuf(0x01); + const llarp::PeerStats empty(id); + + llarp::PeerDb db; + CHECK(db.getCurrentPeerStats(id).has_value() == false); + + llarp::PeerStats delta(id); + delta.numConnectionAttempts = 4; + delta.peakBandwidthBytesPerSec = 5; + db.accumulatePeerStats(id, delta); + CHECK(* db.getCurrentPeerStats(id) == delta); + + delta = llarp::PeerStats(id); + delta.numConnectionAttempts = 5; + delta.peakBandwidthBytesPerSec = 6; + db.accumulatePeerStats(id, delta); + + llarp::PeerStats expected(id); + expected.numConnectionAttempts = 9; + expected.peakBandwidthBytesPerSec = 6; + CHECK(* db.getCurrentPeerStats(id) == expected); +} + +TEST_CASE("Test PeerDb flush before load", "[PeerDb]") +{ + llarp::PeerDb db; + CHECK_THROWS_WITH(db.flushDatabase(), "Cannot flush database before it has been loaded"); +} + +TEST_CASE("Test PeerDb load twice", "[PeerDb]") +{ + llarp::PeerDb db; + CHECK_NOTHROW(db.loadDatabase(std::nullopt)); + CHECK_THROWS_WITH(db.loadDatabase(std::nullopt), "Reloading database not supported"); +} + +TEST_CASE("Test PeerDb nukes stats on load", "[PeerDb]") +{ + const llarp::RouterID id = llarp::test::makeBuf(0x01); + + llarp::PeerDb db; + + llarp::PeerStats stats(id); + stats.numConnectionAttempts = 1; + + db.accumulatePeerStats(id, stats); + CHECK(* db.getCurrentPeerStats(id) == stats); + + db.loadDatabase(std::nullopt); + + CHECK(db.getCurrentPeerStats(id).has_value() == false); +} + +TEST_CASE("Test PeerDb file-backed database reloads properly", "[PeerDb]") +{ + const std::string filename = "/tmp/peerdb_test_tmp2.db.sqlite"; + const llarp::RouterID id = llarp::test::makeBuf(0x02); + + { + llarp::PeerDb db; + db.loadDatabase(filename); + + llarp::PeerStats stats(id); + stats.numConnectionAttempts = 43; + + db.accumulatePeerStats(id, stats); + + db.flushDatabase(); + } + + { + llarp::PeerDb db; + db.loadDatabase(filename); + + auto stats = db.getCurrentPeerStats(id); + CHECK(stats.has_value() == true); + CHECK(stats->numConnectionAttempts == 43); + } + + fs::remove(filename); +} + +TEST_CASE("Test PeerDb modifyPeerStats", "[PeerDb]") +{ + const llarp::RouterID id = llarp::test::makeBuf(0xF2); + + int numTimesCalled = 0; + + llarp::PeerDb db; + db.loadDatabase(std::nullopt); + + db.modifyPeerStats(id, [&](llarp::PeerStats& stats) { + numTimesCalled++; + + stats.numPathBuilds += 42; + }); + + db.flushDatabase(); + + CHECK(numTimesCalled == 1); + + auto stats = db.getCurrentPeerStats(id); + CHECK(stats.has_value()); + CHECK(stats->numPathBuilds == 42); +} + +TEST_CASE("Test PeerDb handleGossipedRC", "[PeerDb]") +{ + const llarp::RouterID id = llarp::test::makeBuf(0xCA); + + auto rcLifetime = llarp::RouterContact::Lifetime; + llarp_time_t now = 0s; + + llarp::RouterContact rc; + rc.pubkey = llarp::PubKey(id); + rc.last_updated = 10s; + + llarp::PeerDb db; + db.handleGossipedRC(rc, now); + + auto stats = db.getCurrentPeerStats(id); + CHECK(stats.has_value()); + CHECK(stats->leastRCRemainingLifetime == 0ms); // not calculated on first received RC + CHECK(stats->numDistinctRCsReceived == 1); + CHECK(stats->lastRCUpdated == 10000ms); + + now = 9s; + db.handleGossipedRC(rc, now); + stats = db.getCurrentPeerStats(id); + CHECK(stats.has_value()); + // these values should remain unchanged, this is not a new RC + CHECK(stats->leastRCRemainingLifetime == 0ms); + CHECK(stats->numDistinctRCsReceived == 1); + CHECK(stats->lastRCUpdated == 10000ms); + + rc.last_updated = 11s; + + db.handleGossipedRC(rc, now); + stats = db.getCurrentPeerStats(id); + // should be (previous expiration time - new received time) + CHECK(stats->leastRCRemainingLifetime == ((10s + rcLifetime) - now)); + CHECK(stats->numDistinctRCsReceived == 2); + CHECK(stats->lastRCUpdated == 11000ms); +} + +TEST_CASE("Test PeerDb handleGossipedRC expiry calcs", "[PeerDb]") +{ + const llarp::RouterID id = llarp::test::makeBuf(0xF9); + + // see comments in peer_db.cpp above PeerDb::handleGossipedRC() for some context around these + // tests and esp. these numbers + const llarp_time_t ref = 48h; + const llarp_time_t rcLifetime = llarp::RouterContact::Lifetime; + + // rc1, first rc received + const llarp_time_t s1 = ref; + const llarp_time_t r1 = s1 + 30s; + const llarp_time_t e1 = s1 + rcLifetime; + llarp::RouterContact rc1; + rc1.pubkey = llarp::PubKey(id); + rc1.last_updated = s1; + + // rc2, second rc received + // received "healthily", with lots of room to spare before rc1 expires + const llarp_time_t s2 = s1 + 8h; + const llarp_time_t r2 = s2 + 30s; // healthy recv time + const llarp_time_t e2 = s2 + rcLifetime; + llarp::RouterContact rc2; + rc2.pubkey = llarp::PubKey(id); + rc2.last_updated = s2; + + // rc3, third rc received + // received "unhealthily" (after rc2 expires) + const llarp_time_t s3 = s2 + 8h; + const llarp_time_t r3 = e2 + 1h; // received after e2 + llarp::RouterContact rc3; + rc3.pubkey = llarp::PubKey(id); + rc3.last_updated = s3; + + llarp::PeerDb db; + + db.handleGossipedRC(rc1, r1); + auto stats1 = db.getCurrentPeerStats(id); + CHECK(stats1.has_value()); + CHECK(stats1->leastRCRemainingLifetime == 0ms); + CHECK(stats1->numDistinctRCsReceived == 1); + CHECK(stats1->lastRCUpdated == s1); + + db.handleGossipedRC(rc2, r2); + auto stats2 = db.getCurrentPeerStats(id); + CHECK(stats2.has_value()); + CHECK(stats2->leastRCRemainingLifetime == (e1 - r2)); + CHECK(stats2->leastRCRemainingLifetime > 0ms); // ensure positive indicates healthy + CHECK(stats2->numDistinctRCsReceived == 2); + CHECK(stats2->lastRCUpdated == s2); + + db.handleGossipedRC(rc3, r3); + auto stats3 = db.getCurrentPeerStats(id); + CHECK(stats3.has_value()); + CHECK(stats3->leastRCRemainingLifetime == (e2 - r3)); + CHECK( + stats3->leastRCRemainingLifetime + < 0ms); // ensure negative indicates unhealthy and we use min() + CHECK(stats3->numDistinctRCsReceived == 3); + CHECK(stats3->lastRCUpdated == s3); +} diff --git a/test/peerstats/test_peer_types.cpp b/test/peerstats/test_peer_types.cpp new file mode 100644 index 000000000..e957d9dbf --- /dev/null +++ b/test/peerstats/test_peer_types.cpp @@ -0,0 +1,77 @@ +#include +#include +#include + +#include + +TEST_CASE("Test PeerStats operator+=", "[PeerStats]") +{ + llarp::RouterID id = {}; + + // TODO: test all members + llarp::PeerStats stats(id); + stats.numConnectionAttempts = 1; + stats.peakBandwidthBytesPerSec = 12; + + llarp::PeerStats delta(id); + delta.numConnectionAttempts = 2; + delta.peakBandwidthBytesPerSec = 4; + + stats += delta; + + CHECK(stats.numConnectionAttempts == 3); + CHECK(stats.peakBandwidthBytesPerSec == 12); // should take max(), not add +} + +TEST_CASE("Test PeerStats BEncode", "[PeerStats]") +{ + llarp::RouterID id = llarp::test::makeBuf(0x01); + + llarp::PeerStats stats(id); + + stats.numConnectionAttempts = 1; + stats.numConnectionSuccesses = 2; + stats.numConnectionRejections = 3; + stats.numConnectionTimeouts = 4; + stats.numPathBuilds = 5; + stats.numPacketsAttempted = 6; + stats.numPacketsSent = 7; + stats.numPacketsDropped = 8; + stats.numPacketsResent = 9; + stats.numDistinctRCsReceived = 10; + stats.numLateRCs = 11; + stats.peakBandwidthBytesPerSec = 12.1; // should truncate to 12 + stats.longestRCReceiveInterval = 13ms; + stats.leastRCRemainingLifetime = 14ms; + stats.lastRCUpdated = 15ms; + + constexpr int bufSize = 4096; + byte_t* raw = new byte_t[bufSize]; + llarp_buffer_t buf(raw, bufSize); + + CHECK_NOTHROW(stats.BEncode(&buf)); + + std::string asString = (const char*)raw; + constexpr std::string_view expected = + "d" + "21:numConnectionAttempts" "i1e" + "22:numConnectionSuccesses" "i2e" + "23:numConnectionRejections" "i3e" + "21:numConnectionTimeouts" "i4e" + "13:numPathBuilds" "i5e" + "19:numPacketsAttempted" "i6e" + "14:numPacketsSent" "i7e" + "17:numPacketsDropped" "i8e" + "16:numPacketsResent" "i9e" + "22:numDistinctRCsReceived" "i10e" + "10:numLateRCs" "i11e" + "24:peakBandwidthBytesPerSec" "i12e" + "24:longestRCReceiveInterval" "i13e" + "24:leastRCRemainingLifetime" "i14e" + "13:lastRCUpdated" "i15e" + "e"; + + CHECK(asString == expected); + + delete [] raw; +} diff --git a/test/regress/2020-06-08-key-backup-bug.cpp b/test/regress/2020-06-08-key-backup-bug.cpp index d10f47da0..bc7e30f7f 100644 --- a/test/regress/2020-06-08-key-backup-bug.cpp +++ b/test/regress/2020-06-08-key-backup-bug.cpp @@ -11,13 +11,15 @@ llarp::RuntimeOptions opts = {false, false, false}; static std::shared_ptr make_context(std::optional keyfile) { - auto context = std::make_shared(); - context->Configure(opts, {}); + llarp::Config conf; + conf.LoadDefault(opts.isRouter, {}); + conf.network.m_endpointType = "null"; + conf.network.m_keyfile = keyfile; + conf.bootstrap.skipBootstrap = true; + conf.api.m_enableRPCServer = false; - context->config->network.m_endpointType = "null"; - context->config->network.m_keyfile = keyfile; - context->config->bootstrap.skipBootstrap = true; - context->config->api.m_enableRPCServer = false; + auto context = std::make_shared(); + REQUIRE_NOTHROW(context->Configure(std::move(conf))); return context; }