initial wack at lokimq

pull/1306/head
Jeff Becker 4 years ago
parent 91725a8530
commit 0006751d80
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

1
.gitmodules vendored

@ -30,3 +30,4 @@
[submodule "external/loki-mq"]
path = external/loki-mq
url = https://github.com/loki-project/loki-mq
branch = dev

@ -276,9 +276,6 @@ if(SUBMODULE_CHECK)
endif()
endif()
add_subdirectory(external/loki-mq)
include_directories(external/loki-mq)
include_directories(external/loki-mq/mapbox-variant/include)
# We only actually need pybind11 with WITH_HIVE, but if we don't load it here then something further
# down loads a broken PythonInterp that loads Python2, but Python2 headers are not C++17 compatible.

@ -62,9 +62,16 @@ elseif(DOWNLOAD_UV)
add_definitions(-D_LARGEFILE_SOURCE)
add_definitions(-D_FILE_OFFSET_BITS=64)
endif()
include_directories(${LIBUV_INCLUDE_DIRS})
#find_package(LokiMQ)
#if(LokiMQ_FOUND)
# message(STATUS "using system lokimq")
#else()
message(STATUS "using lokimq submodule")
add_subdirectory(${CMAKE_SOURCE_DIR}/external/loki-mq)
#endif()
if(EMBEDDED_CFG OR ${CMAKE_SYSTEM_NAME} MATCHES "Linux")
link_libatomic()
endif()

@ -221,7 +221,7 @@ if(WITH_HIVE)
target_sources(liblokinet PRIVATE tooling/router_hive.cpp)
endif()
target_link_libraries(liblokinet PUBLIC cxxopts lokinet-platform lokinet-util lokinet-cryptography)
target_link_libraries(liblokinet PUBLIC cxxopts lokinet-platform lokinet-util lokinet-cryptography lokimq)
target_link_libraries(liblokinet PRIVATE libunbound)

@ -21,6 +21,11 @@ struct llarp_dht_context;
struct llarp_nodedb;
struct llarp_threadpool;
namespace lokimq
{
class LokiMQ;
}
namespace llarp
{
class Logic;
@ -62,6 +67,8 @@ namespace llarp
class ThreadPool;
}
using LMQ_ptr = std::shared_ptr<lokimq::LokiMQ>;
struct AbstractRouter
{
#ifdef LOKINET_HIVE
@ -73,6 +80,9 @@ namespace llarp
virtual bool
HandleRecvLinkMessageBuffer(ILinkSession* from, const llarp_buffer_t& msg) = 0;
virtual LMQ_ptr
lmq() const = 0;
virtual std::shared_ptr<Logic>
logic() const = 0;
@ -239,7 +249,7 @@ namespace llarp
/// set router's service node whitelist
virtual void
SetRouterWhitelist(const std::vector<RouterID>& routers) = 0;
SetRouterWhitelist(const std::vector<RouterID> routers) = 0;
/// visit each connected link session
virtual void

@ -38,6 +38,8 @@
#include <systemd/sd-daemon.h>
#endif
#include <lokimq/lokimq.h>
static constexpr std::chrono::milliseconds ROUTER_TICK_INTERVAL = 1s;
namespace llarp
@ -47,6 +49,7 @@ namespace llarp
llarp_ev_loop_ptr __netloop,
std::shared_ptr<Logic> l)
: ready(false)
, m_lmq(std::make_shared<lokimq::LokiMQ>())
, _netloop(std::move(__netloop))
, cryptoworker(std::move(_tp))
, _logic(std::move(l))
@ -61,6 +64,7 @@ namespace llarp
#else
, _randomStartDelay(std::chrono::seconds((llarp::randint() % 30) + 10))
#endif
, m_lokidRpcClient(m_lmq, this)
{
m_keyManager = std::make_shared<KeyManager>();
@ -394,7 +398,7 @@ namespace llarp
// Lokid Config
usingSNSeed = conf->lokid.usingSNSeed;
whitelistRouters = conf->lokid.whitelistRouters;
lokidRPCAddr = IpAddress(conf->lokid.lokidRPCAddr); // TODO: make config's option an IpAddress
lokidRPCAddr = conf->lokid.lokidRPCAddr;
lokidRPCUser = conf->lokid.lokidRPCUser;
lokidRPCPassword = conf->lokid.lokidRPCPassword;
@ -808,7 +812,7 @@ namespace llarp
}
void
Router::SetRouterWhitelist(const std::vector<RouterID>& routers)
Router::SetRouterWhitelist(const std::vector<RouterID> routers)
{
_rcLookupHandler.SetRouterWhitelist(routers);
}
@ -839,9 +843,11 @@ namespace llarp
if (whitelistRouters)
{
LogInfo("RPC Caller to ", lokidRPCAddr, " started");
m_lokidRpcClient.ConnectAsync(std::string_view{lokidRPCAddr});
}
m_lmq->start();
if (!cryptoworker->start())
{
LogError("crypto worker failed to start");

@ -73,6 +73,14 @@ namespace llarp
/// should we obey the service node whitelist?
bool whitelistRouters = false;
LMQ_ptr m_lmq;
LMQ_ptr
lmq() const override
{
return m_lmq;
}
std::shared_ptr<Logic>
logic() const override
{
@ -113,7 +121,7 @@ namespace llarp
}
void
SetRouterWhitelist(const std::vector<RouterID>& routers) override;
SetRouterWhitelist(const std::vector<RouterID> routers) override;
exit::Context&
exitContext() override
@ -249,7 +257,7 @@ namespace llarp
rpc::LokidRpcClient m_lokidRpcClient;
IpAddress lokidRPCAddr = IpAddress("127.0.0.1:22023");
std::string lokidRPCAddr = "ipc://loki.sock";
std::string lokidRPCUser;
std::string lokidRPCPassword;

@ -3,34 +3,19 @@
#include <util/logging/logger.h>
#include <util/logging/logger.hpp>
#include <future>
#include <router/abstractrouter.hpp>
#include <nlohmann/json.hpp>
#include <util/time.hpp>
#include <util/thread/logic.hpp>
namespace llarp
{
namespace rpc
{
static ::LogLevel
fromLokiMQLogLevel(lokimq::LogLevel level)
{
switch (level)
{
case lokimq::LogLevel::fatal:
case lokimq::LogLevel::error:
return eLogError;
case lokimq::LogLevel::warn:
return eLogWarn;
case lokimq::LogLevel::info:
return eLogInfo;
case lokimq::LogLevel::debug:
case lokimq::LogLevel::trace:
return eLogDebug;
default:
return eLogNone;
}
}
static lokimq::LogLevel
toLokiMQLogLevel(::LogLevel level)
toLokiMQLogLevel(llarp::LogLevel level)
{
switch (level)
{
@ -48,65 +33,130 @@ namespace llarp
}
}
static void
lokimqLogger(lokimq::LogLevel level, const char* file, std::string msg)
LokidRpcClient::LokidRpcClient(LMQ_ptr lmq, AbstractRouter* r)
: m_lokiMQ(std::move(lmq)), m_Router(r)
{
switch (level)
{
case lokimq::LogLevel::fatal:
case lokimq::LogLevel::error:
LogError(msg);
break;
case lokimq::LogLevel::warn:
LogWarn(msg);
break;
case lokimq::LogLevel::info:
LogInfo(msg);
break;
case lokimq::LogLevel::debug:
case lokimq::LogLevel::trace:
LogDebug(msg);
break;
}
// m_lokiMQ->log_level(toLokiMQLogLevel(LogLevel::Instance().curLevel));
}
LokidRpcClient::LokidRpcClient(std::string lokidPubkey)
: m_lokiMQ(lokimqLogger, lokimq::LogLevel::debug), m_lokidPubkey(std::move(lokidPubkey))
void
LokidRpcClient::ConnectAsync(std::string_view url)
{
m_lokiMQ.log_level(toLokiMQLogLevel(LogLevel::Instance().curLevel));
LogInfo("connecting to lokid via LMQ at ", url);
m_lokiMQ->connect_remote(
std::move(url),
[self = shared_from_this()](lokimq::ConnectionID c) {
self->m_Connection = std::move(c);
self->Connected();
},
[](lokimq::ConnectionID, std::string_view f) {
llarp::LogWarn("Failed to connect to lokid: ", f);
});
}
void
LokidRpcClient::connect()
LokidRpcClient::Command(std::string_view cmd)
{
m_lokidConnectionId = m_lokiMQ.connect_sn(m_lokidPubkey); // not a blocking call
LogDebug("lokid command: ", cmd);
m_lokiMQ->send(*m_Connection, std::move(cmd));
}
std::future<void>
LokidRpcClient::ping()
void
LokidRpcClient::Connected()
{
throw std::runtime_error("TODO: LokidRpcClient::ping()");
}
constexpr auto PingInterval = 1min;
constexpr auto NodeListUpdateInterval = 90s;
std::future<std::string>
LokidRpcClient::requestNextBlockHash()
{
throw std::runtime_error("TODO: LokidRpcClient::requestNextBlockHash()");
}
LogInfo("we connected to lokid [", *m_Connection, "]");
m_lokiMQ->add_timer(
[self = shared_from_this()]() {
LogInfo("Telling lokid we are live");
self->Command("rpc.lokinet_ping");
},
PingInterval);
std::future<std::vector<RouterID>>
LokidRpcClient::requestServiceNodeList()
{
throw std::runtime_error("TODO: LokidRpcClient::requestServiceNodeList()");
m_lokiMQ->add_timer(
[self = shared_from_this()]() {
nlohmann::json request;
request["pubkey_ed25519"] = true;
request["active_only"] = true;
if (not self->m_CurrentBlockHash.empty())
request["poll_block_hash"] = self->m_CurrentBlockHash;
self->Request(
"rpc.get_service_nodes",
[self](bool success, std::vector<std::string> data) {
if (not success)
{
LogWarn("failed to update service node list");
return;
}
if (data.size() < 2)
{
LogWarn("lokid gave empty reply for service node list");
return;
}
try
{
self->HandleGotServiceNodeList(std::move(data[1]));
}
catch (std::exception& ex)
{
LogError("failed to process service node list: ", ex.what());
}
},
request.dump());
},
NodeListUpdateInterval);
}
void
LokidRpcClient::request()
LokidRpcClient::HandleGotServiceNodeList(std::string data)
{
// TODO: ensure we are connected
// m_lokiMQ.request(m_lokidConnectionId, ...);
auto j = nlohmann::json::parse(std::move(data));
{
const auto itr = j.find("block_hash");
if (itr != j.end())
{
m_CurrentBlockHash = itr->get<std::string>();
}
}
{
const auto itr = j.find("unchanged");
if (itr != j.end())
{
if (itr->get<bool>())
{
LogDebug("service node list unchanged");
return;
}
}
}
std::vector<RouterID> nodeList;
{
const auto itr = j.find("service_node_states");
if (itr != j.end() and itr->is_array())
{
for (auto j_itr = itr->begin(); j_itr != itr->end(); j_itr++)
{
const auto ed_itr = j_itr->find("pubkey_ed25519");
if (ed_itr == j_itr->end() or not ed_itr->is_string())
continue;
RouterID rid;
if (rid.FromHex(ed_itr->get<std::string>()))
nodeList.emplace_back(std::move(rid));
}
}
}
if (nodeList.empty())
{
LogWarn("got empty service node list from lokid");
return;
}
throw std::runtime_error("TODO: LokidRpcClient::request()");
// inform router about the new list
LogicCall(m_Router->logic(), [r = m_Router, nodeList]() { r->SetRouterWhitelist(nodeList); });
}
} // namespace rpc

@ -4,14 +4,16 @@
#include <lokimq/lokimq.h>
#include <future>
namespace llarp
{
struct AbstractRouter;
namespace rpc
{
using LMQ_ptr = std::shared_ptr<lokimq::LokiMQ>;
/// The LokidRpcClient uses loki-mq to talk to make API requests to lokid.
struct LokidRpcClient
struct LokidRpcClient : public std::enable_shared_from_this<LokidRpcClient>
{
/// Not copyable or movable (because lokimq::LokiMQ is not copyable or movable).
/// Consider wrapping in a std::unique_ptr or std::shared_ptr if you need to pass this around.
@ -23,36 +25,36 @@ namespace llarp
operator=(LokidRpcClient&&) = delete;
/// Constructor
/// TODO: take lokid pubkey and other auth parameters
LokidRpcClient(std::string lokidPubkey);
LokidRpcClient(LMQ_ptr lmq, AbstractRouter* r);
/// Connect to lokid
/// Connect to lokid async
void
connect();
/// Initiates a ping request to lokid, currently used to let lokid know that lokinet is still
/// running (required to prevent a Service Node from being deregistered).
///
/// This uses the "lokinet_ping" API endpoint.
std::future<void>
ping();
/// Requests the most recent known block hash from lokid
///
/// This uses the "poll_block_hash" API endpoint.
std::future<std::string>
requestNextBlockHash();
/// Requests a full list of known service nodes from lokid
///
/// This uses the "get_n_service_nodes" API endpoint.
std::future<std::vector<RouterID>>
requestServiceNodeList();
ConnectAsync(std::string_view url);
private:
std::string m_lokidPubkey;
lokimq::ConnectionID m_lokidConnectionId;
lokimq::LokiMQ m_lokiMQ;
/// called when we have connected to lokid via lokimq
void
Connected();
/// do a lmq command on the current connection
void
Command(std::string_view cmd);
template <typename HandlerFunc_t, typename Args_t>
void
Request(std::string_view cmd, HandlerFunc_t func, const Args_t& args)
{
m_lokiMQ->request(*m_Connection, std::move(cmd), std::move(func), args);
}
void
HandleGotServiceNodeList(std::string json);
std::optional<lokimq::ConnectionID> m_Connection;
LMQ_ptr m_lokiMQ;
std::string m_CurrentBlockHash;
AbstractRouter* const m_Router;
void
request();

@ -265,6 +265,12 @@ namespace llarp
return ToHex().substr(0, 8);
}
bool
FromHex(std::string_view str)
{
return HexDecode(str.begin(), str.end(), data(), size());
}
std::ostream&
print(std::ostream& stream, int level, int spaces) const
{

@ -150,6 +150,20 @@ namespace llarp
return sz == 0;
}
template <typename InputIt, typename OutputIt>
bool
HexDecode(InputIt begin, InputIt end, OutputIt target, size_t sz)
{
while (begin != end && sz)
{
const auto first = *begin++;
const auto second = *begin++;
*(target++) = char2int(first) * 16 + char2int(second);
--sz;
}
return sz == 0;
}
static const char base64_table[] = {
'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f',

Loading…
Cancel
Save