From 0006751d806a9c06e12227baa0a708906b69ca4c Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 19 May 2020 14:53:03 -0400 Subject: [PATCH] initial wack at lokimq --- .gitmodules | 1 + CMakeLists.txt | 3 - cmake/unix.cmake | 9 +- llarp/CMakeLists.txt | 2 +- llarp/router/abstractrouter.hpp | 12 ++- llarp/router/router.cpp | 12 ++- llarp/router/router.hpp | 12 ++- llarp/rpc/lokid_rpc_client.cpp | 176 ++++++++++++++++++++------------ llarp/rpc/lokid_rpc_client.hpp | 60 +++++------ llarp/util/aligned.hpp | 6 ++ llarp/util/encode.hpp | 14 +++ 11 files changed, 204 insertions(+), 103 deletions(-) diff --git a/.gitmodules b/.gitmodules index c50dd5725..18e3dc04b 100644 --- a/.gitmodules +++ b/.gitmodules @@ -30,3 +30,4 @@ [submodule "external/loki-mq"] path = external/loki-mq url = https://github.com/loki-project/loki-mq + branch = dev diff --git a/CMakeLists.txt b/CMakeLists.txt index a59abbd36..a9e1b9d91 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -276,9 +276,6 @@ if(SUBMODULE_CHECK) endif() endif() -add_subdirectory(external/loki-mq) -include_directories(external/loki-mq) -include_directories(external/loki-mq/mapbox-variant/include) # We only actually need pybind11 with WITH_HIVE, but if we don't load it here then something further # down loads a broken PythonInterp that loads Python2, but Python2 headers are not C++17 compatible. diff --git a/cmake/unix.cmake b/cmake/unix.cmake index 857392bd6..44a9ce430 100644 --- a/cmake/unix.cmake +++ b/cmake/unix.cmake @@ -62,9 +62,16 @@ elseif(DOWNLOAD_UV) add_definitions(-D_LARGEFILE_SOURCE) add_definitions(-D_FILE_OFFSET_BITS=64) endif() - include_directories(${LIBUV_INCLUDE_DIRS}) +#find_package(LokiMQ) +#if(LokiMQ_FOUND) +# message(STATUS "using system lokimq") +#else() +message(STATUS "using lokimq submodule") +add_subdirectory(${CMAKE_SOURCE_DIR}/external/loki-mq) +#endif() + if(EMBEDDED_CFG OR ${CMAKE_SYSTEM_NAME} MATCHES "Linux") link_libatomic() endif() diff --git a/llarp/CMakeLists.txt b/llarp/CMakeLists.txt index da40700d8..0517b1c0f 100644 --- a/llarp/CMakeLists.txt +++ b/llarp/CMakeLists.txt @@ -221,7 +221,7 @@ if(WITH_HIVE) target_sources(liblokinet PRIVATE tooling/router_hive.cpp) endif() -target_link_libraries(liblokinet PUBLIC cxxopts lokinet-platform lokinet-util lokinet-cryptography) +target_link_libraries(liblokinet PUBLIC cxxopts lokinet-platform lokinet-util lokinet-cryptography lokimq) target_link_libraries(liblokinet PRIVATE libunbound) diff --git a/llarp/router/abstractrouter.hpp b/llarp/router/abstractrouter.hpp index a49bcfd19..c988483c2 100644 --- a/llarp/router/abstractrouter.hpp +++ b/llarp/router/abstractrouter.hpp @@ -21,6 +21,11 @@ struct llarp_dht_context; struct llarp_nodedb; struct llarp_threadpool; +namespace lokimq +{ + class LokiMQ; +} + namespace llarp { class Logic; @@ -62,6 +67,8 @@ namespace llarp class ThreadPool; } + using LMQ_ptr = std::shared_ptr; + struct AbstractRouter { #ifdef LOKINET_HIVE @@ -73,6 +80,9 @@ namespace llarp virtual bool HandleRecvLinkMessageBuffer(ILinkSession* from, const llarp_buffer_t& msg) = 0; + virtual LMQ_ptr + lmq() const = 0; + virtual std::shared_ptr logic() const = 0; @@ -239,7 +249,7 @@ namespace llarp /// set router's service node whitelist virtual void - SetRouterWhitelist(const std::vector& routers) = 0; + SetRouterWhitelist(const std::vector routers) = 0; /// visit each connected link session virtual void diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 80d76b4ac..22932ae37 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -38,6 +38,8 @@ #include #endif +#include + static constexpr std::chrono::milliseconds ROUTER_TICK_INTERVAL = 1s; namespace llarp @@ -47,6 +49,7 @@ namespace llarp llarp_ev_loop_ptr __netloop, std::shared_ptr l) : ready(false) + , m_lmq(std::make_shared()) , _netloop(std::move(__netloop)) , cryptoworker(std::move(_tp)) , _logic(std::move(l)) @@ -61,6 +64,7 @@ namespace llarp #else , _randomStartDelay(std::chrono::seconds((llarp::randint() % 30) + 10)) #endif + , m_lokidRpcClient(m_lmq, this) { m_keyManager = std::make_shared(); @@ -394,7 +398,7 @@ namespace llarp // Lokid Config usingSNSeed = conf->lokid.usingSNSeed; whitelistRouters = conf->lokid.whitelistRouters; - lokidRPCAddr = IpAddress(conf->lokid.lokidRPCAddr); // TODO: make config's option an IpAddress + lokidRPCAddr = conf->lokid.lokidRPCAddr; lokidRPCUser = conf->lokid.lokidRPCUser; lokidRPCPassword = conf->lokid.lokidRPCPassword; @@ -808,7 +812,7 @@ namespace llarp } void - Router::SetRouterWhitelist(const std::vector& routers) + Router::SetRouterWhitelist(const std::vector routers) { _rcLookupHandler.SetRouterWhitelist(routers); } @@ -839,9 +843,11 @@ namespace llarp if (whitelistRouters) { - LogInfo("RPC Caller to ", lokidRPCAddr, " started"); + m_lokidRpcClient.ConnectAsync(std::string_view{lokidRPCAddr}); } + m_lmq->start(); + if (!cryptoworker->start()) { LogError("crypto worker failed to start"); diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp index 1772398cc..2b21fa07d 100644 --- a/llarp/router/router.hpp +++ b/llarp/router/router.hpp @@ -73,6 +73,14 @@ namespace llarp /// should we obey the service node whitelist? bool whitelistRouters = false; + LMQ_ptr m_lmq; + + LMQ_ptr + lmq() const override + { + return m_lmq; + } + std::shared_ptr logic() const override { @@ -113,7 +121,7 @@ namespace llarp } void - SetRouterWhitelist(const std::vector& routers) override; + SetRouterWhitelist(const std::vector routers) override; exit::Context& exitContext() override @@ -249,7 +257,7 @@ namespace llarp rpc::LokidRpcClient m_lokidRpcClient; - IpAddress lokidRPCAddr = IpAddress("127.0.0.1:22023"); + std::string lokidRPCAddr = "ipc://loki.sock"; std::string lokidRPCUser; std::string lokidRPCPassword; diff --git a/llarp/rpc/lokid_rpc_client.cpp b/llarp/rpc/lokid_rpc_client.cpp index b9c51a213..6eca11c7e 100644 --- a/llarp/rpc/lokid_rpc_client.cpp +++ b/llarp/rpc/lokid_rpc_client.cpp @@ -3,34 +3,19 @@ #include #include -#include +#include + +#include + +#include +#include namespace llarp { namespace rpc { - static ::LogLevel - fromLokiMQLogLevel(lokimq::LogLevel level) - { - switch (level) - { - case lokimq::LogLevel::fatal: - case lokimq::LogLevel::error: - return eLogError; - case lokimq::LogLevel::warn: - return eLogWarn; - case lokimq::LogLevel::info: - return eLogInfo; - case lokimq::LogLevel::debug: - case lokimq::LogLevel::trace: - return eLogDebug; - default: - return eLogNone; - } - } - static lokimq::LogLevel - toLokiMQLogLevel(::LogLevel level) + toLokiMQLogLevel(llarp::LogLevel level) { switch (level) { @@ -48,65 +33,130 @@ namespace llarp } } - static void - lokimqLogger(lokimq::LogLevel level, const char* file, std::string msg) + LokidRpcClient::LokidRpcClient(LMQ_ptr lmq, AbstractRouter* r) + : m_lokiMQ(std::move(lmq)), m_Router(r) { - switch (level) - { - case lokimq::LogLevel::fatal: - case lokimq::LogLevel::error: - LogError(msg); - break; - case lokimq::LogLevel::warn: - LogWarn(msg); - break; - case lokimq::LogLevel::info: - LogInfo(msg); - break; - case lokimq::LogLevel::debug: - case lokimq::LogLevel::trace: - LogDebug(msg); - break; - } + // m_lokiMQ->log_level(toLokiMQLogLevel(LogLevel::Instance().curLevel)); } - LokidRpcClient::LokidRpcClient(std::string lokidPubkey) - : m_lokiMQ(lokimqLogger, lokimq::LogLevel::debug), m_lokidPubkey(std::move(lokidPubkey)) + void + LokidRpcClient::ConnectAsync(std::string_view url) { - m_lokiMQ.log_level(toLokiMQLogLevel(LogLevel::Instance().curLevel)); + LogInfo("connecting to lokid via LMQ at ", url); + m_lokiMQ->connect_remote( + std::move(url), + [self = shared_from_this()](lokimq::ConnectionID c) { + self->m_Connection = std::move(c); + self->Connected(); + }, + [](lokimq::ConnectionID, std::string_view f) { + llarp::LogWarn("Failed to connect to lokid: ", f); + }); } void - LokidRpcClient::connect() + LokidRpcClient::Command(std::string_view cmd) { - m_lokidConnectionId = m_lokiMQ.connect_sn(m_lokidPubkey); // not a blocking call + LogDebug("lokid command: ", cmd); + m_lokiMQ->send(*m_Connection, std::move(cmd)); } - std::future - LokidRpcClient::ping() + void + LokidRpcClient::Connected() { - throw std::runtime_error("TODO: LokidRpcClient::ping()"); - } + constexpr auto PingInterval = 1min; + constexpr auto NodeListUpdateInterval = 90s; - std::future - LokidRpcClient::requestNextBlockHash() - { - throw std::runtime_error("TODO: LokidRpcClient::requestNextBlockHash()"); - } + LogInfo("we connected to lokid [", *m_Connection, "]"); + m_lokiMQ->add_timer( + [self = shared_from_this()]() { + LogInfo("Telling lokid we are live"); + self->Command("rpc.lokinet_ping"); + }, + PingInterval); - std::future> - LokidRpcClient::requestServiceNodeList() - { - throw std::runtime_error("TODO: LokidRpcClient::requestServiceNodeList()"); + m_lokiMQ->add_timer( + [self = shared_from_this()]() { + nlohmann::json request; + request["pubkey_ed25519"] = true; + request["active_only"] = true; + if (not self->m_CurrentBlockHash.empty()) + request["poll_block_hash"] = self->m_CurrentBlockHash; + self->Request( + "rpc.get_service_nodes", + [self](bool success, std::vector data) { + if (not success) + { + LogWarn("failed to update service node list"); + return; + } + if (data.size() < 2) + { + LogWarn("lokid gave empty reply for service node list"); + return; + } + try + { + self->HandleGotServiceNodeList(std::move(data[1])); + } + catch (std::exception& ex) + { + LogError("failed to process service node list: ", ex.what()); + } + }, + request.dump()); + }, + NodeListUpdateInterval); } void - LokidRpcClient::request() + LokidRpcClient::HandleGotServiceNodeList(std::string data) { - // TODO: ensure we are connected - // m_lokiMQ.request(m_lokidConnectionId, ...); + auto j = nlohmann::json::parse(std::move(data)); + { + const auto itr = j.find("block_hash"); + if (itr != j.end()) + { + m_CurrentBlockHash = itr->get(); + } + } + { + const auto itr = j.find("unchanged"); + if (itr != j.end()) + { + if (itr->get()) + { + LogDebug("service node list unchanged"); + return; + } + } + } + + std::vector nodeList; + { + const auto itr = j.find("service_node_states"); + if (itr != j.end() and itr->is_array()) + { + for (auto j_itr = itr->begin(); j_itr != itr->end(); j_itr++) + { + const auto ed_itr = j_itr->find("pubkey_ed25519"); + if (ed_itr == j_itr->end() or not ed_itr->is_string()) + continue; + RouterID rid; + if (rid.FromHex(ed_itr->get())) + nodeList.emplace_back(std::move(rid)); + } + } + } + + if (nodeList.empty()) + { + LogWarn("got empty service node list from lokid"); + return; + } - throw std::runtime_error("TODO: LokidRpcClient::request()"); + // inform router about the new list + LogicCall(m_Router->logic(), [r = m_Router, nodeList]() { r->SetRouterWhitelist(nodeList); }); } } // namespace rpc diff --git a/llarp/rpc/lokid_rpc_client.hpp b/llarp/rpc/lokid_rpc_client.hpp index aec34052c..a98da99e4 100644 --- a/llarp/rpc/lokid_rpc_client.hpp +++ b/llarp/rpc/lokid_rpc_client.hpp @@ -4,14 +4,16 @@ #include -#include - namespace llarp { + struct AbstractRouter; + namespace rpc { + using LMQ_ptr = std::shared_ptr; + /// The LokidRpcClient uses loki-mq to talk to make API requests to lokid. - struct LokidRpcClient + struct LokidRpcClient : public std::enable_shared_from_this { /// Not copyable or movable (because lokimq::LokiMQ is not copyable or movable). /// Consider wrapping in a std::unique_ptr or std::shared_ptr if you need to pass this around. @@ -23,36 +25,36 @@ namespace llarp operator=(LokidRpcClient&&) = delete; /// Constructor - /// TODO: take lokid pubkey and other auth parameters - LokidRpcClient(std::string lokidPubkey); + LokidRpcClient(LMQ_ptr lmq, AbstractRouter* r); - /// Connect to lokid + /// Connect to lokid async void - connect(); - - /// Initiates a ping request to lokid, currently used to let lokid know that lokinet is still - /// running (required to prevent a Service Node from being deregistered). - /// - /// This uses the "lokinet_ping" API endpoint. - std::future - ping(); - - /// Requests the most recent known block hash from lokid - /// - /// This uses the "poll_block_hash" API endpoint. - std::future - requestNextBlockHash(); - - /// Requests a full list of known service nodes from lokid - /// - /// This uses the "get_n_service_nodes" API endpoint. - std::future> - requestServiceNodeList(); + ConnectAsync(std::string_view url); private: - std::string m_lokidPubkey; - lokimq::ConnectionID m_lokidConnectionId; - lokimq::LokiMQ m_lokiMQ; + /// called when we have connected to lokid via lokimq + void + Connected(); + + /// do a lmq command on the current connection + void + Command(std::string_view cmd); + + template + void + Request(std::string_view cmd, HandlerFunc_t func, const Args_t& args) + { + m_lokiMQ->request(*m_Connection, std::move(cmd), std::move(func), args); + } + + void + HandleGotServiceNodeList(std::string json); + + std::optional m_Connection; + LMQ_ptr m_lokiMQ; + std::string m_CurrentBlockHash; + + AbstractRouter* const m_Router; void request(); diff --git a/llarp/util/aligned.hpp b/llarp/util/aligned.hpp index ac6904343..187447ecd 100644 --- a/llarp/util/aligned.hpp +++ b/llarp/util/aligned.hpp @@ -265,6 +265,12 @@ namespace llarp return ToHex().substr(0, 8); } + bool + FromHex(std::string_view str) + { + return HexDecode(str.begin(), str.end(), data(), size()); + } + std::ostream& print(std::ostream& stream, int level, int spaces) const { diff --git a/llarp/util/encode.hpp b/llarp/util/encode.hpp index 58cd1f09e..b0aab609e 100644 --- a/llarp/util/encode.hpp +++ b/llarp/util/encode.hpp @@ -150,6 +150,20 @@ namespace llarp return sz == 0; } + template + bool + HexDecode(InputIt begin, InputIt end, OutputIt target, size_t sz) + { + while (begin != end && sz) + { + const auto first = *begin++; + const auto second = *begin++; + *(target++) = char2int(first) * 16 + char2int(second); + --sz; + } + return sz == 0; + } + static const char base64_table[] = { 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f',