Merge pull request #1529 from majestrate/nodedb-refactor-2021-02-02

NodeDB refactor
pull/1531/head
Jeff 3 years ago committed by GitHub
commit c5a423d3f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -162,16 +162,16 @@ endif()
option(FORCE_LOKIMQ_SUBMODULE "force using lokimq submodule" OFF) option(FORCE_LOKIMQ_SUBMODULE "force using lokimq submodule" OFF)
if(NOT FORCE_LOKIMQ_SUBMODULE) if(NOT FORCE_LOKIMQ_SUBMODULE)
pkg_check_modules(LOKIMQ liblokimq>=1.2.2) pkg_check_modules(LOKIMQ liboxenmq>=1.2.2)
endif() endif()
if(LOKIMQ_FOUND) if(LOKIMQ_FOUND)
add_library(lokimq INTERFACE) add_library(oxenmq INTERFACE)
link_dep_libs(lokimq INTERFACE "${LOKIMQ_LIBRARY_DIRS}" ${LOKIMQ_LIBRARIES}) link_dep_libs(oxenmq INTERFACE "${LOKIMQ_LIBRARY_DIRS}" ${LOKIMQ_LIBRARIES})
target_include_directories(lokimq INTERFACE ${LOKIMQ_INCLUDE_DIRS}) target_include_directories(oxenmq INTERFACE ${LOKIMQ_INCLUDE_DIRS})
add_library(lokimq::lokimq ALIAS lokimq) add_library(oxenmq::oxenmq ALIAS oxenmq)
message(STATUS "Found system liblokimq ${LOKIMQ_VERSION}") message(STATUS "Found system liboxenmq ${LOKIMQ_VERSION}")
else() else()
message(STATUS "using lokimq submodule") message(STATUS "using oxenmq submodule")
add_subdirectory(${CMAKE_SOURCE_DIR}/external/loki-mq) add_subdirectory(${CMAKE_SOURCE_DIR}/external/loki-mq)
endif() endif()

@ -41,11 +41,11 @@ set(SODIUM_SOURCE libsodium-${SODIUM_VERSION}.tar.gz)
set(SODIUM_HASH SHA512=17e8638e46d8f6f7d024fe5559eccf2b8baf23e143fadd472a7d29d228b186d86686a5e6920385fe2020729119a5f12f989c3a782afbd05a8db4819bb18666ef set(SODIUM_HASH SHA512=17e8638e46d8f6f7d024fe5559eccf2b8baf23e143fadd472a7d29d228b186d86686a5e6920385fe2020729119a5f12f989c3a782afbd05a8db4819bb18666ef
CACHE STRING "libsodium source hash") CACHE STRING "libsodium source hash")
set(ZMQ_VERSION 4.3.3 CACHE STRING "libzmq version") set(ZMQ_VERSION 4.3.4 CACHE STRING "libzmq version")
set(ZMQ_MIRROR ${LOCAL_MIRROR} https://github.com/zeromq/libzmq/releases/download/v${ZMQ_VERSION} set(ZMQ_MIRROR ${LOCAL_MIRROR} https://github.com/zeromq/libzmq/releases/download/v${ZMQ_VERSION}
CACHE STRING "libzmq mirror(s)") CACHE STRING "libzmq mirror(s)")
set(ZMQ_SOURCE zeromq-${ZMQ_VERSION}.tar.gz) set(ZMQ_SOURCE zeromq-${ZMQ_VERSION}.tar.gz)
set(ZMQ_HASH SHA512=4c18d784085179c5b1fcb753a93813095a12c8d34970f2e1bfca6499be6c9d67769c71c68b7ca54ff181b20390043170e89733c22f76ff1ea46494814f7095b1 set(ZMQ_HASH SHA512=e198ef9f82d392754caadd547537666d4fba0afd7d027749b3adae450516bcf284d241d4616cad3cb4ad9af8c10373d456de92dc6d115b037941659f141e7c0e
CACHE STRING "libzmq source hash") CACHE STRING "libzmq source hash")
set(LIBUV_VERSION 1.40.0 CACHE STRING "libuv version") set(LIBUV_VERSION 1.40.0 CACHE STRING "libuv version")
@ -283,10 +283,7 @@ endif()
if(CMAKE_CROSSCOMPILING AND ARCH_TRIPLET MATCHES mingw) if(CMAKE_CROSSCOMPILING AND ARCH_TRIPLET MATCHES mingw)
set(zmq_patch set(zmq_patch
PATCH_COMMAND ${PROJECT_SOURCE_DIR}/contrib/apply-patches.sh ${PROJECT_SOURCE_DIR}/contrib/patches/libzmq-mingw-wepoll.patch) PATCH_COMMAND ${PROJECT_SOURCE_DIR}/contrib/apply-patches.sh ${PROJECT_SOURCE_DIR}/contrib/patches/libzmq-mingw-wepoll.patch ${PROJECT_SOURCE_DIR}/contrib/patches/libzmq-mingw-closesocket.patch)
if(ZMQ_VERSION VERSION_LESS 4.3.4)
set(zmq_patch ${zmq_patch} ${PROJECT_SOURCE_DIR}/contrib/patches/libzmq-mingw-closesocket.patch)
endif()
endif() endif()
build_external(zmq build_external(zmq

@ -1,5 +1,5 @@
#!/bin/bash #!/bin/bash
mkdir -p build-windows mkdir -p build-windows
cd build-windows cd build-windows
cmake -G Ninja -DCMAKE_CROSSCOMPILE=ON -DCMAKE_EXE_LINKER_FLAGS=-fstack-protector -DCMAKE_CXX_FLAGS=-fdiagnostics-color=always -DCMAKE_TOOLCHAIN_FILE=../contrib/cross/mingw64.cmake -DBUILD_STATIC_DEPS=ON -DBUILD_PACKAGE=ON -DBUILD_SHARED_LIBS=OFF -DBUILD_TESTING=OFF -DWITH_TESTS=OFF -DNATIVE_BUILD=OFF -DSTATIC_LINK=ON -DWITH_SYSTEMD=OFF -DFORCE_LOKIMQ_SUBMODULE=ON -DSUBMODULE_CHECK=OFF -DWITH_LTO=OFF -DCMAKE_BUILD_TYPE=Release -DCMAKE_CROSSCOMPLING=ON .. cmake -G Ninja -DCMAKE_EXE_LINKER_FLAGS=-fstack-protector -DCMAKE_CXX_FLAGS=-fdiagnostics-color=always -DCMAKE_TOOLCHAIN_FILE=../contrib/cross/mingw64.cmake -DBUILD_STATIC_DEPS=ON -DBUILD_PACKAGE=ON -DBUILD_SHARED_LIBS=OFF -DBUILD_TESTING=OFF -DWITH_TESTS=OFF -DNATIVE_BUILD=OFF -DSTATIC_LINK=ON -DWITH_SYSTEMD=OFF -DFORCE_LOKIMQ_SUBMODULE=ON -DSUBMODULE_CHECK=OFF -DWITH_LTO=OFF -DCMAKE_BUILD_TYPE=Release ..
ninja package ninja package

@ -259,21 +259,21 @@ run_main_context(std::optional<fs::path> confFile, const llarp::RuntimeOptions o
llarp::LogTrace("start of run_main_context()"); llarp::LogTrace("start of run_main_context()");
try try
{ {
std::unique_ptr<llarp::Config> conf; std::shared_ptr<llarp::Config> conf;
if (confFile.has_value()) if (confFile.has_value())
{ {
llarp::LogInfo("Using config file: ", *confFile); llarp::LogInfo("Using config file: ", *confFile);
conf = std::make_unique<llarp::Config>(confFile->parent_path()); conf = std::make_shared<llarp::Config>(confFile->parent_path());
} }
else else
{ {
conf = std::make_unique<llarp::Config>(llarp::GetDefaultDataDir()); conf = std::make_shared<llarp::Config>(llarp::GetDefaultDataDir());
} }
if (!conf->Load(confFile, opts.isRouter)) if (!conf->Load(confFile, opts.isRouter))
throw std::runtime_error{"Config file parsing failed"}; throw std::runtime_error{"Config file parsing failed"};
ctx = std::make_shared<llarp::Context>(); ctx = std::make_shared<llarp::Context>();
ctx->Configure(*conf); ctx->Configure(std::move(conf));
signal(SIGINT, handle_signal); signal(SIGINT, handle_signal);
signal(SIGTERM, handle_signal); signal(SIGTERM, handle_signal);
@ -339,13 +339,16 @@ class WindowsServiceStopped
LONG LONG
GenerateDump(EXCEPTION_POINTERS* pExceptionPointers) GenerateDump(EXCEPTION_POINTERS* pExceptionPointers)
{ {
const DWORD flags = MiniDumpWithFullMemory | MiniDumpWithFullMemoryInfo | MiniDumpWithHandleData
| MiniDumpWithUnloadedModules | MiniDumpWithThreadInfo;
std::stringstream ss; std::stringstream ss;
ss << "C:\\ProgramData\\lokinet\\crash-" << llarp::time_now_ms().count() << ".dmp"; ss << "C:\\ProgramData\\lokinet\\crash-" << llarp::time_now_ms().count() << ".dmp";
const std::string fname = ss.str(); const std::string fname = ss.str();
HANDLE hDumpFile; HANDLE hDumpFile;
SYSTEMTIME stLocalTime; SYSTEMTIME stLocalTime;
GetLocalTime(&stLocalTime); GetLocalTime(&stLocalTime);
MINIDUMP_EXCEPTION_INFORMATION ExpParam; MINIDUMP_EXCEPTION_INFORMATION ExpParam{};
hDumpFile = CreateFile( hDumpFile = CreateFile(
fname.c_str(), fname.c_str(),
@ -356,18 +359,11 @@ GenerateDump(EXCEPTION_POINTERS* pExceptionPointers)
0, 0,
0); 0);
ExpParam.ThreadId = GetCurrentThreadId();
ExpParam.ExceptionPointers = pExceptionPointers; ExpParam.ExceptionPointers = pExceptionPointers;
ExpParam.ClientPointers = TRUE; ExpParam.ClientPointers = TRUE;
MiniDumpWriteDump( MiniDumpWriteDump(
GetCurrentProcess(), GetCurrentProcess(), GetCurrentProcessId(), hDumpFile, flags, &ExpParam, NULL, NULL);
GetCurrentProcessId(),
hDumpFile,
MiniDumpWithDataSegs,
&ExpParam,
NULL,
NULL);
return 1; return 1;
} }

2
external/loki-mq vendored

@ -1 +1 @@
Subproject commit e7487fd0c8ee843b3a3df16563ee42dad2fde050 Subproject commit 506bd65b05b1cd5b11b4d1e3392306e4473f5ba3

@ -1,27 +1,31 @@
#ifndef LLARP_HPP #ifndef LLARP_HPP
#define LLARP_HPP #define LLARP_HPP
#include <llarp.h> #include <llarp.h>
#include <util/fs.hpp>
#include <util/types.hpp>
#include <ev/ev.hpp>
#include <ev/vpn.hpp>
#include <nodedb.hpp>
#include <crypto/crypto.hpp>
#include <router/abstractrouter.hpp>
#include <future>
#include <iostream> #include <iostream>
#include <map> #include <map>
#include <memory> #include <memory>
#include <string> #include <string>
#include <vector> #include <vector>
struct llarp_ev_loop;
namespace llarp namespace llarp
{ {
namespace vpn
{
class Platform;
}
class Logic; class Logic;
struct Config; struct Config;
struct RouterContact; struct RouterContact;
struct Config;
struct Crypto;
struct CryptoManager;
struct AbstractRouter;
struct EventLoop;
class NodeDB;
namespace thread namespace thread
{ {
class ThreadPool; class ThreadPool;
@ -36,12 +40,12 @@ namespace llarp
struct Context struct Context
{ {
std::unique_ptr<Crypto> crypto = nullptr; std::shared_ptr<Crypto> crypto = nullptr;
std::unique_ptr<CryptoManager> cryptoManager = nullptr; std::shared_ptr<CryptoManager> cryptoManager = nullptr;
std::unique_ptr<AbstractRouter> router = nullptr; std::shared_ptr<AbstractRouter> router = nullptr;
std::shared_ptr<Logic> logic = nullptr; std::shared_ptr<Logic> logic = nullptr;
std::unique_ptr<llarp_nodedb> nodedb = nullptr; std::shared_ptr<NodeDB> nodedb = nullptr;
llarp_ev_loop_ptr mainloop; std::shared_ptr<EventLoop> mainloop;
std::string nodedb_dir; std::string nodedb_dir;
virtual ~Context() = default; virtual ~Context() = default;
@ -49,9 +53,6 @@ namespace llarp
void void
Close(); Close();
int
LoadDatabase();
void void
Setup(const RuntimeOptions& opts); Setup(const RuntimeOptions& opts);
@ -62,10 +63,8 @@ namespace llarp
HandleSignal(int sig); HandleSignal(int sig);
/// Configure given the specified config. /// Configure given the specified config.
///
/// note: consider using std::move() when passing conf in.
void void
Configure(Config conf); Configure(std::shared_ptr<Config> conf);
/// handle SIGHUP /// handle SIGHUP
void void
@ -93,11 +92,11 @@ namespace llarp
/// Creates a router. Can be overridden to allow a different class of router /// Creates a router. Can be overridden to allow a different class of router
/// to be created instead. Defaults to llarp::Router. /// to be created instead. Defaults to llarp::Router.
virtual std::unique_ptr<AbstractRouter> virtual std::shared_ptr<AbstractRouter>
makeRouter(llarp_ev_loop_ptr __netloop, std::shared_ptr<Logic> logic); makeRouter(std::shared_ptr<EventLoop> __netloop, std::shared_ptr<Logic> logic);
/// create the vpn platform for use in creating network interfaces /// create the vpn platform for use in creating network interfaces
virtual std::unique_ptr<llarp::vpn::Platform> virtual std::shared_ptr<llarp::vpn::Platform>
makeVPNPlatform(); makeVPNPlatform();
protected: protected:

@ -39,7 +39,7 @@ target_link_libraries(lokinet-util PUBLIC
nlohmann_json::nlohmann_json nlohmann_json::nlohmann_json
filesystem filesystem
date::date date::date
lokimq oxenmq
sqlite3 sqlite3
) )

@ -881,7 +881,7 @@ namespace llarp
"but can use (non-default) TCP if lokid is configured that way:", "but can use (non-default) TCP if lokid is configured that way:",
" rpc=tcp://127.0.0.1:5678", " rpc=tcp://127.0.0.1:5678",
}, },
[this](std::string arg) { lokidRPCAddr = lokimq::address(arg); }); [this](std::string arg) { lokidRPCAddr = oxenmq::address(arg); });
// Deprecated options: // Deprecated options:
conf.defineOption<std::string>("lokid", "username", Deprecated); conf.defineOption<std::string>("lokid", "username", Deprecated);

@ -23,7 +23,7 @@
#include <vector> #include <vector>
#include <unordered_set> #include <unordered_set>
#include <lokimq/address.h> #include <oxenmq/address.h>
namespace llarp namespace llarp
{ {
@ -157,7 +157,7 @@ namespace llarp
{ {
bool whitelistRouters = false; bool whitelistRouters = false;
fs::path ident_keyfile; fs::path ident_keyfile;
lokimq::address lokidRPCAddr; oxenmq::address lokidRPCAddr;
void void
defineConfigOptions(ConfigDefinition& conf, const ConfigGenParameters& params); defineConfigOptions(ConfigDefinition& conf, const ConfigGenParameters& params);

@ -28,16 +28,16 @@ namespace llarp
} }
void void
Context::Configure(Config conf) Context::Configure(std::shared_ptr<Config> conf)
{ {
if (nullptr != config.get()) if (nullptr != config.get())
throw std::runtime_error("Config already exists"); throw std::runtime_error("Config already exists");
config = std::make_shared<Config>(std::move(conf)); config = std::move(conf);
logic = std::make_shared<Logic>(); logic = std::make_shared<Logic>();
nodedb_dir = fs::path(config->router.m_dataDir / nodedb_dirname).string(); nodedb_dir = fs::path{config->router.m_dataDir / nodedb_dirname}.string();
} }
bool bool
@ -52,13 +52,6 @@ namespace llarp
return router && router->LooksAlive(); return router && router->LooksAlive();
} }
int
Context::LoadDatabase()
{
llarp_nodedb::ensure_dir(nodedb_dir.c_str());
return 1;
}
void void
Context::Setup(const RuntimeOptions& opts) Context::Setup(const RuntimeOptions& opts)
{ {
@ -77,31 +70,25 @@ namespace llarp
mainloop->set_logic(logic); mainloop->set_logic(logic);
crypto = std::make_unique<sodium::CryptoLibSodium>(); crypto = std::make_shared<sodium::CryptoLibSodium>();
cryptoManager = std::make_unique<CryptoManager>(crypto.get()); cryptoManager = std::make_shared<CryptoManager>(crypto.get());
router = makeRouter(mainloop, logic); router = makeRouter(mainloop, logic);
nodedb = std::make_unique<llarp_nodedb>( nodedb = std::make_shared<NodeDB>(
nodedb_dir, [r = router.get()](auto call) { r->QueueDiskIO(std::move(call)); }); nodedb_dir, [r = router.get()](auto call) { r->QueueDiskIO(std::move(call)); });
if (!router->Configure(config, opts.isRouter, nodedb.get())) if (!router->Configure(config, opts.isRouter, nodedb))
throw std::runtime_error("Failed to configure router"); throw std::runtime_error("Failed to configure router");
// must be done after router is made so we can use its disk io worker
// must also be done after configure so that netid is properly set if it
// is provided by config
if (!this->LoadDatabase())
throw std::runtime_error("Config::Setup() failed to load database");
} }
std::unique_ptr<AbstractRouter> std::shared_ptr<AbstractRouter>
Context::makeRouter(llarp_ev_loop_ptr netloop, std::shared_ptr<Logic> logic) Context::makeRouter(std::shared_ptr<EventLoop> netloop, std::shared_ptr<Logic> logic)
{ {
return std::make_unique<Router>(netloop, logic, makeVPNPlatform()); return std::make_shared<Router>(netloop, logic, makeVPNPlatform());
} }
std::unique_ptr<vpn::Platform> std::shared_ptr<vpn::Platform>
Context::makeVPNPlatform() Context::makeVPNPlatform()
{ {
auto plat = vpn::MakeNativePlatform(this); auto plat = vpn::MakeNativePlatform(this);
@ -195,10 +182,10 @@ namespace llarp
config.reset(); config.reset();
llarp::LogDebug("free nodedb"); llarp::LogDebug("free nodedb");
nodedb.release(); nodedb.reset();
llarp::LogDebug("free router"); llarp::LogDebug("free router");
router.release(); router.reset();
llarp::LogDebug("free logic"); llarp::LogDebug("free logic");
logic.reset(); logic.reset();

@ -7,7 +7,7 @@
#include <iterator> #include <iterator>
#include <lokimq/hex.h> #include <oxenmq/hex.h>
#include <sodium/crypto_sign.h> #include <sodium/crypto_sign.h>
#include <sodium/crypto_sign_ed25519.h> #include <sodium/crypto_sign_ed25519.h>
@ -20,14 +20,14 @@ namespace llarp
{ {
if (str.size() != 2 * size()) if (str.size() != 2 * size())
return false; return false;
lokimq::from_hex(str.begin(), str.end(), begin()); oxenmq::from_hex(str.begin(), str.end(), begin());
return true; return true;
} }
std::string std::string
PubKey::ToString() const PubKey::ToString() const
{ {
return lokimq::to_hex(begin(), end()); return oxenmq::to_hex(begin(), end());
} }
bool bool

@ -228,7 +228,12 @@ namespace llarp
bool bool
GetRCFromNodeDB(const Key_t& k, llarp::RouterContact& rc) const override GetRCFromNodeDB(const Key_t& k, llarp::RouterContact& rc) const override
{ {
return router->nodedb()->Get(k.as_array(), rc); if (const auto maybe = router->nodedb()->Get(k.as_array()); maybe.has_value())
{
rc = *maybe;
return true;
}
return false;
} }
PendingIntrosetLookups _pendingIntrosetLookups; PendingIntrosetLookups _pendingIntrosetLookups;

@ -106,7 +106,7 @@ namespace llarp
} }
auto closestRCs = auto closestRCs =
dht.GetRouter()->nodedb()->FindClosestTo(location, IntroSetStorageRedundancy); dht.GetRouter()->nodedb()->FindManyClosestTo(location, IntroSetStorageRedundancy);
if (closestRCs.size() <= relayOrder) if (closestRCs.size() <= relayOrder)
{ {

@ -1,5 +1,5 @@
#include <dht/messages/findname.hpp> #include <dht/messages/findname.hpp>
#include <lokimq/bt_serialize.h> #include <oxenmq/bt_serialize.h>
#include <dht/context.hpp> #include <dht/context.hpp>
#include <dht/messages/gotname.hpp> #include <dht/messages/gotname.hpp>
#include <router/abstractrouter.hpp> #include <router/abstractrouter.hpp>
@ -16,8 +16,8 @@ namespace llarp::dht
bool bool
FindNameMessage::BEncode(llarp_buffer_t* buf) const FindNameMessage::BEncode(llarp_buffer_t* buf) const
{ {
const auto data = lokimq::bt_serialize( const auto data = oxenmq::bt_serialize(
lokimq::bt_dict{{"A", "N"sv}, oxenmq::bt_dict{{"A", "N"sv},
{"H", std::string_view{(char*)NameHash.data(), NameHash.size()}}, {"H", std::string_view{(char*)NameHash.data(), NameHash.size()}},
{"T", TxID}}); {"T", TxID}});
return buf->write(data.begin(), data.end()); return buf->write(data.begin(), data.end());

@ -1,5 +1,5 @@
#include <dht/messages/gotname.hpp> #include <dht/messages/gotname.hpp>
#include <lokimq/bt_serialize.h> #include <oxenmq/bt_serialize.h>
#include <dht/context.hpp> #include <dht/context.hpp>
#include <router/abstractrouter.hpp> #include <router/abstractrouter.hpp>
#include <path/path_context.hpp> #include <path/path_context.hpp>
@ -19,8 +19,8 @@ namespace llarp::dht
GotNameMessage::BEncode(llarp_buffer_t* buf) const GotNameMessage::BEncode(llarp_buffer_t* buf) const
{ {
const std::string nonce((const char*)result.nonce.data(), result.nonce.size()); const std::string nonce((const char*)result.nonce.data(), result.nonce.size());
const auto data = lokimq::bt_serialize( const auto data = oxenmq::bt_serialize(
lokimq::bt_dict{{"A", "M"sv}, {"D", result.ciphertext}, {"N", nonce}, {"T", TxID}}); oxenmq::bt_dict{{"A", "M"sv}, {"D", result.ciphertext}, {"N", nonce}, {"T", TxID}});
return buf->write(data.begin(), data.end()); return buf->write(data.begin(), data.end());
} }

@ -86,7 +86,8 @@ namespace llarp
} }
// identify closest 4 routers // identify closest 4 routers
auto closestRCs = dht.GetRouter()->nodedb()->FindClosestTo(addr, IntroSetStorageRedundancy); auto closestRCs =
dht.GetRouter()->nodedb()->FindManyClosestTo(addr, IntroSetStorageRedundancy);
if (closestRCs.size() != IntroSetStorageRedundancy) if (closestRCs.size() != IntroSetStorageRedundancy)
{ {
llarp::LogWarn("Received PublishIntroMessage but only know ", closestRCs.size(), " nodes"); llarp::LogWarn("Received PublishIntroMessage but only know ", closestRCs.size(), " nodes");

@ -81,7 +81,7 @@ namespace llarp::vpn
}; };
/// create native vpn platform /// create native vpn platform
std::unique_ptr<Platform> std::shared_ptr<Platform>
MakeNativePlatform(llarp::Context* ctx); MakeNativePlatform(llarp::Context* ctx);
} // namespace llarp::vpn } // namespace llarp::vpn

@ -70,30 +70,10 @@ namespace llarp
m_SnodeBlacklist.insert(std::move(snode)); m_SnodeBlacklist.insert(std::move(snode));
} }
bool std::optional<std::vector<RouterContact>>
BaseSession::SelectHop( BaseSession::GetHopsForBuild()
llarp_nodedb* db,
const std::set<RouterID>& prev,
RouterContact& cur,
size_t hop,
llarp::path::PathRole roles)
{ {
std::set<RouterID> exclude = prev; return GetHopsAlignedToForBuild(m_ExitRouter);
for (const auto& snode : m_SnodeBlacklist)
{
if (snode != m_ExitRouter)
exclude.insert(snode);
}
exclude.insert(m_ExitRouter);
if (hop == numHops - 1)
{
if (db->Get(m_ExitRouter, cur))
return true;
m_router->LookupRouter(m_ExitRouter, nullptr);
return false;
}
return path::Builder::SelectHop(db, exclude, cur, hop, roles);
} }
bool bool
@ -306,9 +286,8 @@ namespace llarp
if (numHops == 1) if (numHops == 1)
{ {
auto r = m_router; auto r = m_router;
RouterContact rc; if (const auto maybe = r->nodedb()->Get(m_ExitRouter); maybe.has_value())
if (r->nodedb()->Get(m_ExitRouter, rc)) r->TryConnectAsync(*maybe, 5);
r->TryConnectAsync(rc, 5);
else else
r->LookupRouter(m_ExitRouter, [r](const std::vector<RouterContact>& results) { r->LookupRouter(m_ExitRouter, [r](const std::vector<RouterContact>& results) {
if (results.size()) if (results.size())

@ -67,13 +67,8 @@ namespace llarp
bool bool
CheckPathDead(path::Path_ptr p, llarp_time_t dlt); CheckPathDead(path::Path_ptr p, llarp_time_t dlt);
bool std::optional<std::vector<RouterContact>>
SelectHop( GetHopsForBuild() override;
llarp_nodedb* db,
const std::set<RouterID>& prev,
RouterContact& cur,
size_t hop,
llarp::path::PathRole roles) override;
bool bool
ShouldBuildMore(llarp_time_t now) const override; ShouldBuildMore(llarp_time_t now) const override;

@ -305,20 +305,6 @@ namespace llarp
self->hop->info.upstream, self->hop->ExpireTime() + 10s); self->hop->info.upstream, self->hop->ExpireTime() + 10s);
// put hop // put hop
self->context->PutTransitHop(self->hop); self->context->PutTransitHop(self->hop);
// if we have an rc for this hop...
if (self->record.nextRC)
{
// ... and it matches the next hop ...
if (self->record.nextHop == self->record.nextRC->pubkey)
{
// ... and it's valid
const auto now = self->context->Router()->Now();
if (self->record.nextRC->IsPublicRouter() && self->record.nextRC->Verify(now))
{
self->context->Router()->nodedb()->UpdateAsyncIfNewer(*self->record.nextRC.get());
}
}
}
// forward to next hop // forward to next hop
using std::placeholders::_1; using std::placeholders::_1;
auto func = std::bind( auto func = std::bind(

@ -408,10 +408,10 @@ namespace llarp::net
if (parts[1].find_first_not_of('0') == std::string::npos and parts[0] != ifname) if (parts[1].find_first_not_of('0') == std::string::npos and parts[0] != ifname)
{ {
const auto& ip = parts[2]; const auto& ip = parts[2];
if ((ip.size() == sizeof(uint32_t) * 2) and lokimq::is_hex(ip)) if ((ip.size() == sizeof(uint32_t) * 2) and oxenmq::is_hex(ip))
{ {
huint32_t x{}; huint32_t x{};
lokimq::from_hex(ip.begin(), ip.end(), reinterpret_cast<char*>(&x.h)); oxenmq::from_hex(ip.begin(), ip.end(), reinterpret_cast<char*>(&x.h));
gateways.emplace_back(x.ToString()); gateways.emplace_back(x.ToString());
} }
} }

@ -19,490 +19,255 @@
static const char skiplist_subdirs[] = "0123456789abcdef"; static const char skiplist_subdirs[] = "0123456789abcdef";
static const std::string RC_FILE_EXT = ".signed"; static const std::string RC_FILE_EXT = ".signed";
llarp_nodedb::NetDBEntry::NetDBEntry(llarp::RouterContact value) namespace llarp
: rc(std::move(value)), inserted(llarp::time_now_ms())
{}
bool
llarp_nodedb::Remove(const llarp::RouterID& pk)
{
bool removed = false;
RemoveIf([&](const llarp::RouterContact& rc) -> bool {
if (rc.pubkey == pk)
{
removed = true;
return true;
}
return false;
});
return removed;
}
void
llarp_nodedb::Clear()
{ {
llarp::util::Lock lock(access); NodeDB::Entry::Entry(RouterContact value) : rc(std::move(value)), insertedAt(llarp::time_now_ms())
entries.clear(); {}
}
bool static void
llarp_nodedb::Get(const llarp::RouterID& pk, llarp::RouterContact& result) EnsureSkiplist(fs::path nodedbDir)
{
llarp::util::Lock l(access);
auto itr = entries.find(pk);
if (itr == entries.end())
return false;
result = itr->second.rc;
return true;
}
void
llarp_nodedb::RemoveIf(std::function<bool(const llarp::RouterContact& rc)> filter)
{
std::set<std::string> files;
{ {
llarp::util::Lock l(access); if (not fs::exists(nodedbDir))
auto itr = entries.begin();
while (itr != entries.end())
{ {
if (filter(itr->second.rc)) // if the old 'netdb' directory exists, move it to this one
{ fs::path parent = nodedbDir.parent_path();
files.insert(getRCFilePath(itr->second.rc.pubkey)); fs::path old = parent / "netdb";
itr = entries.erase(itr); if (fs::exists(old))
} fs::rename(old, nodedbDir);
else else
++itr; fs::create_directory(nodedbDir);
} }
}
disk([files = std::move(files)]() {
for (const auto& file : files)
fs::remove(file);
});
}
bool
llarp_nodedb::Has(const llarp::RouterID& pk)
{
llarp::util::Lock lock(access);
return entries.find(pk) != entries.end();
}
llarp::RouterContact
llarp_nodedb::FindClosestTo(const llarp::dht::Key_t& location)
{
llarp::RouterContact rc;
const llarp::dht::XorMetric compare(location);
visit([&rc, compare](const auto& otherRC) -> bool {
if (rc.pubkey.IsZero())
{
rc = otherRC;
return true;
}
if (compare(
llarp::dht::Key_t{otherRC.pubkey.as_array()}, llarp::dht::Key_t{rc.pubkey.as_array()}))
rc = otherRC;
return true;
});
return rc;
}
std::vector<llarp::RouterContact>
llarp_nodedb::FindClosestTo(const llarp::dht::Key_t& location, uint32_t numRouters)
{
llarp::util::Lock lock(access);
std::vector<const llarp::RouterContact*> all;
all.reserve(entries.size()); if (not fs::is_directory(nodedbDir))
for (auto& entry : entries) throw std::runtime_error(llarp::stringify("nodedb ", nodedbDir, " is not a directory"));
{
all.push_back(&entry.second.rc);
}
auto it_mid = numRouters < all.size() ? all.begin() + numRouters : all.end(); for (const char& ch : skiplist_subdirs)
std::partial_sort(
all.begin(),
it_mid,
all.end(),
[compare = llarp::dht::XorMetric{location}](auto* a, auto* b) { return compare(*a, *b); });
std::vector<llarp::RouterContact> closest;
closest.reserve(numRouters);
for (auto it = all.begin(); it != it_mid; ++it)
closest.push_back(**it);
return closest;
}
/// skiplist directory is hex encoded first nibble
/// skiplist filename is <base32encoded>.snode.signed
std::string
llarp_nodedb::getRCFilePath(const llarp::RouterID& pubkey) const
{
std::string hexString = lokimq::to_hex(pubkey.begin(), pubkey.end());
std::string skiplistDir;
llarp::RouterID r(pubkey);
std::string fname = r.ToString();
skiplistDir += hexString[0];
fname += RC_FILE_EXT;
fs::path filepath = nodePath / skiplistDir / fname;
return filepath.string();
}
void
llarp_nodedb::InsertAsync(
llarp::RouterContact rc,
std::shared_ptr<llarp::Logic> logic,
std::function<void(void)> completionHandler)
{
disk([this, rc, logic, completionHandler]() {
this->Insert(rc);
if (logic && completionHandler)
{ {
LogicCall(logic, completionHandler); // this seems to be a problem on all targets
// perhaps cpp17::fs is just as screwed-up
// attempting to create a folder with no name
// what does this mean...?
if (!ch)
continue;
fs::path sub = nodedbDir / std::string(&ch, 1);
fs::create_directory(sub);
} }
});
}
bool
llarp_nodedb::UpdateAsyncIfNewer(
llarp::RouterContact rc,
std::shared_ptr<llarp::Logic> logic,
std::function<void(void)> completionHandler)
{
llarp::util::Lock lock(access);
auto itr = entries.find(rc.pubkey);
if (itr == entries.end() || itr->second.rc.OtherIsNewer(rc))
{
InsertAsync(rc, logic, completionHandler);
return true;
}
if (itr != entries.end())
{
// insertion time is set on...insertion. But it should be updated here
// even if there is no insertion of a new RC, to show that the existing one
// is not "stale"
itr->second.inserted = llarp::time_now_ms();
} }
return false;
}
/// insert constexpr auto FlushInterval = 5min;
bool
llarp_nodedb::Insert(const llarp::RouterContact& rc)
{
llarp::util::Lock lock(access);
auto itr = entries.find(rc.pubkey.as_array());
if (itr != entries.end())
entries.erase(itr);
entries.emplace(rc.pubkey.as_array(), rc);
LogDebug(
"Added or updated RC for ",
llarp::RouterID(rc.pubkey),
" to nodedb. Current nodedb count is: ",
entries.size());
return true;
}
ssize_t
llarp_nodedb::Load(const fs::path& path)
{
std::error_code ec;
if (!fs::exists(path, ec))
{
return -1;
}
ssize_t loaded = 0;
for (const char& ch : skiplist_subdirs) NodeDB::NodeDB(fs::path root, std::function<void(std::function<void()>)> diskCaller)
: m_Root{std::move(root)}
, disk(std::move(diskCaller))
, m_NextFlushAt{time_now_ms() + FlushInterval}
{ {
if (!ch) EnsureSkiplist(m_Root);
continue;
std::string p;
p += ch;
fs::path sub = path / p;
ssize_t l = loadSubdir(sub);
if (l > 0)
loaded += l;
} }
m_NextSaveToDisk = llarp::time_now_ms() + m_SaveInterval;
return loaded;
}
void void
llarp_nodedb::SaveAll() NodeDB::Tick(llarp_time_t now)
{
std::array<byte_t, MAX_RC_SIZE> tmp;
llarp::util::Lock lock(access);
for (const auto& item : entries)
{ {
llarp_buffer_t buf(tmp); if (now > m_NextFlushAt)
{
if (!item.second.rc.BEncode(&buf)) m_NextFlushAt += FlushInterval;
continue; // make copy of all rcs
std::vector<RouterContact> copy;
buf.sz = buf.cur - buf.base; for (const auto& item : m_Entries)
const auto filepath = getRCFilePath(item.second.rc.pubkey); copy.push_back(item.second.rc);
auto optional_ofs = llarp::util::OpenFileStream<std::ofstream>( // flush them to disk in one big job
filepath, std::ofstream::out | std::ofstream::binary | std::ofstream::trunc); // TODO: split this up? idk maybe some day...
if (!optional_ofs) disk([this, data = std::move(copy)]() {
continue; for (const auto& rc : data)
auto& ofs = *optional_ofs; {
ofs.write((char*)buf.base, buf.sz); rc.Write(GetPathForPubkey(rc.pubkey));
ofs.flush(); }
ofs.close(); });
}
} }
}
bool fs::path
llarp_nodedb::ShouldSaveToDisk(llarp_time_t now) const NodeDB::GetPathForPubkey(RouterID pubkey) const
{ {
if (now == 0s) std::string hexString = oxenmq::to_hex(pubkey.begin(), pubkey.end());
now = llarp::time_now_ms(); std::string skiplistDir;
return m_NextSaveToDisk > 0s && m_NextSaveToDisk <= now;
}
void const llarp::RouterID r{pubkey};
llarp_nodedb::AsyncFlushToDisk() std::string fname = r.ToString();
{
disk([this]() { SaveAll(); });
m_NextSaveToDisk = llarp::time_now_ms() + m_SaveInterval;
}
ssize_t skiplistDir += hexString[0];
llarp_nodedb::loadSubdir(const fs::path& dir) fname += RC_FILE_EXT;
{ return m_Root / skiplistDir / fname;
ssize_t sz = 0;
llarp::util::IterDir(dir, [&](const fs::path& f) -> bool {
if (fs::is_regular_file(f) && loadfile(f))
sz++;
return true;
});
return sz;
}
bool
llarp_nodedb::loadfile(const fs::path& fpath)
{
if (fpath.extension() != RC_FILE_EXT)
return false;
llarp::RouterContact rc;
if (!rc.Read(fpath))
{
llarp::LogError("failed to read file ", fpath);
return false;
} }
if (!rc.Verify(llarp::time_now_ms()))
void
NodeDB::LoadFromDisk()
{ {
llarp::LogError(fpath, " contains invalid RC"); for (const char& ch : skiplist_subdirs)
return false; {
if (!ch)
continue;
std::string p;
p += ch;
fs::path sub = m_Root / p;
llarp::util::IterDir(sub, [&](const fs::path& f) -> bool {
if (fs::is_regular_file(f) and f.extension() == RC_FILE_EXT)
{
RouterContact rc{};
if (rc.Read(f) and rc.Verify(time_now_ms()))
m_Entries.emplace(rc.pubkey, rc);
}
return true;
});
}
} }
void
NodeDB::SaveToDisk() const
{ {
llarp::util::Lock lock(access); for (const auto& item : m_Entries)
entries.emplace(rc.pubkey.as_array(), rc); {
item.second.rc.Write(GetPathForPubkey(item.first));
}
} }
return true;
}
void bool
llarp_nodedb::visit(std::function<bool(const llarp::RouterContact&)> visit) NodeDB::Has(RouterID pk) const
{
llarp::util::Lock lock(access);
auto itr = entries.begin();
while (itr != entries.end())
{ {
if (!visit(itr->second.rc)) util::NullLock lock{m_Access};
return; return m_Entries.find(pk) != m_Entries.end();
++itr;
} }
}
void std::optional<RouterContact>
llarp_nodedb::VisitInsertedBefore( NodeDB::Get(RouterID pk) const
std::function<void(const llarp::RouterContact&)> visit, llarp_time_t insertedAfter)
{
llarp::util::Lock lock(access);
auto itr = entries.begin();
while (itr != entries.end())
{ {
if (itr->second.inserted < insertedAfter) util::NullLock lock{m_Access};
visit(itr->second.rc); const auto itr = m_Entries.find(pk);
++itr; if (itr == m_Entries.end())
return std::nullopt;
return itr->second.rc;
} }
}
void void
llarp_nodedb::RemoveStaleRCs(const std::set<llarp::RouterID>& keep, llarp_time_t cutoff) NodeDB::Remove(RouterID pk)
{
std::set<llarp::RouterID> removeStale;
// remove stale routers
VisitInsertedBefore(
[&](const llarp::RouterContact& rc) {
if (keep.find(rc.pubkey) != keep.end())
return;
LogInfo("removing stale router: ", llarp::RouterID(rc.pubkey));
removeStale.insert(rc.pubkey);
},
cutoff);
RemoveIf([&removeStale](const llarp::RouterContact& rc) -> bool {
return removeStale.count(rc.pubkey) > 0;
});
}
// write it to disk
void
disk_threadworker_setRC(llarp_async_verify_rc* verify_request)
{
verify_request->valid = verify_request->nodedb->Insert(verify_request->rc);
if (verify_request->logic)
{ {
LogicCall(verify_request->logic, [verify_request]() { util::NullLock lock{m_Access};
if (verify_request->hook) m_Entries.erase(pk);
verify_request->hook(verify_request); AsyncRemoveManyFromDisk({pk});
});
} }
}
// we run the crypto verify in the crypto threadpool worker void
void NodeDB::RemoveStaleRCs(std::unordered_set<RouterID> keep, llarp_time_t cutoff)
crypto_threadworker_verifyrc(llarp_async_verify_rc* verify_request)
{
llarp::RouterContact rc = verify_request->rc;
verify_request->valid = rc.Verify(llarp::time_now_ms());
// if it's valid we need to set it
if (verify_request->valid && rc.IsPublicRouter())
{ {
if (verify_request->disk) util::NullLock lock{m_Access};
std::unordered_set<RouterID> removed;
auto itr = m_Entries.begin();
while (itr != m_Entries.end())
{ {
llarp::LogDebug("RC is valid, saving to disk"); if (itr->second.insertedAt < cutoff and keep.count(itr->second.rc.pubkey) == 0)
verify_request->disk(std::bind(&disk_threadworker_setRC, verify_request)); {
return; removed.insert(itr->second.rc.pubkey);
itr = m_Entries.erase(itr);
}
else
++itr;
} }
if (not removed.empty())
AsyncRemoveManyFromDisk(std::move(removed));
} }
// callback to logic thread
LogicCall(verify_request->logic, [verify_request]() {
if (verify_request->hook)
verify_request->hook(verify_request);
});
}
void
llarp_nodedb_async_verify(struct llarp_async_verify_rc* job)
{
job->worker(std::bind(&crypto_threadworker_verifyrc, job));
}
void void
llarp_nodedb::ensure_dir(const fs::path& nodedbDir) NodeDB::Put(RouterContact rc)
{
if (not fs::exists(nodedbDir))
{ {
// if the old 'netdb' directory exists, move it to this one util::NullLock lock{m_Access};
fs::path parent = nodedbDir.parent_path(); m_Entries.erase(rc.pubkey);
fs::path old = parent / "netdb"; m_Entries.emplace(rc.pubkey, rc);
if (fs::exists(old))
fs::rename(old, nodedbDir);
else
fs::create_directory(nodedbDir);
} }
if (not fs::is_directory(nodedbDir)) size_t
throw std::runtime_error(llarp::stringify("nodedb ", nodedbDir, " is not a directory")); NodeDB::NumLoaded() const
for (const char& ch : skiplist_subdirs)
{ {
// this seems to be a problem on all targets util::NullLock lock{m_Access};
// perhaps cpp17::fs is just as screwed-up return m_Entries.size();
// attempting to create a folder with no name
// what does this mean...?
if (!ch)
continue;
fs::path sub = nodedbDir / std::string(&ch, 1);
fs::create_directory(sub);
} }
}
ssize_t
llarp_nodedb::LoadAll()
{
return Load(nodePath.c_str());
}
size_t
llarp_nodedb::num_loaded() const
{
llarp::util::Lock l{access};
return entries.size();
}
bool void
llarp_nodedb::select_random_exit(llarp::RouterContact& result) NodeDB::PutIfNewer(RouterContact rc)
{
llarp::util::Lock lock(access);
const auto sz = entries.size();
auto itr = entries.begin();
if (sz < 3)
return false;
auto idx = llarp::randint() % sz;
if (idx)
std::advance(itr, idx - 1);
while (itr != entries.end())
{ {
if (itr->second.rc.IsExit()) util::NullLock lock{m_Access};
auto itr = m_Entries.find(rc.pubkey);
if (itr == m_Entries.end() or itr->second.rc.OtherIsNewer(rc))
{ {
result = itr->second.rc; // delete if existing
return true; if (itr != m_Entries.end())
m_Entries.erase(itr);
// add new entry
m_Entries.emplace(rc.pubkey, rc);
} }
++itr;
} }
// wrap around
itr = entries.begin(); void
while (idx--) NodeDB::AsyncRemoveManyFromDisk(std::unordered_set<RouterID> remove) const
{ {
if (itr->second.rc.IsExit()) // build file list
std::set<fs::path> files;
for (auto id : remove)
{ {
result = itr->second.rc; files.emplace(GetPathForPubkey(std::move(id)));
return true;
} }
++itr; // remove them from the disk via the diskio thread
disk([files]() {
for (auto fpath : files)
fs::remove(fpath);
});
} }
return false;
}
bool llarp::RouterContact
llarp_nodedb::select_random_hop_excluding( NodeDB::FindClosestTo(llarp::dht::Key_t location) const
llarp::RouterContact& result, const std::set<llarp::RouterID>& exclude)
{
llarp::util::Lock lock(access);
/// checking for "guard" status for N = 0 is done by caller inside of
/// pathbuilder's scope
const size_t sz = entries.size();
if (sz < 3)
{ {
return false; util::NullLock lock{m_Access};
llarp::RouterContact rc;
const llarp::dht::XorMetric compare(location);
VisitAll([&rc, compare](const auto& otherRC) {
if (rc.pubkey.IsZero())
{
rc = otherRC;
return;
}
if (compare(
llarp::dht::Key_t{otherRC.pubkey.as_array()},
llarp::dht::Key_t{rc.pubkey.as_array()}))
rc = otherRC;
});
return rc;
} }
const size_t pos = llarp::randint() % sz; std::vector<RouterContact>
const auto start = std::next(entries.begin(), pos); NodeDB::FindManyClosestTo(llarp::dht::Key_t location, uint32_t numRouters) const
for (auto itr = start; itr != entries.end(); ++itr)
{ {
if (exclude.count(itr->first) == 0 and itr->second.rc.IsPublicRouter()) util::NullLock lock{m_Access};
{ std::vector<const RouterContact*> all;
result = itr->second.rc;
return true; const auto& entries = m_Entries;
}
} all.reserve(entries.size());
for (auto itr = entries.begin(); itr != start; ++itr) for (auto& entry : entries)
{
if (exclude.count(itr->first) == 0 and itr->second.rc.IsPublicRouter())
{ {
result = itr->second.rc; all.push_back(&entry.second.rc);
return true;
} }
auto it_mid = numRouters < all.size() ? all.begin() + numRouters : all.end();
std::partial_sort(
all.begin(), it_mid, all.end(), [compare = dht::XorMetric{location}](auto* a, auto* b) {
return compare(*a, *b);
});
std::vector<RouterContact> closest;
closest.reserve(numRouters);
for (auto it = all.begin(); it != it_mid; ++it)
closest.push_back(**it);
return closest;
} }
return false; } // namespace llarp
}

@ -8,221 +8,168 @@
#include <util/thread/threading.hpp> #include <util/thread/threading.hpp>
#include <util/thread/annotations.hpp> #include <util/thread/annotations.hpp>
#include <dht/key.hpp> #include <dht/key.hpp>
#include <crypto/crypto.hpp>
#include <set> #include <set>
#include <optional>
#include <unordered_set>
#include <unordered_map>
#include <utility> #include <utility>
#include <atomic>
/**
* nodedb.hpp
*
* persistent storage API for router contacts
*/
namespace llarp namespace llarp
{ {
class Logic; class Logic;
} // namespace llarp
struct llarp_nodedb
{
using DiskJob_t = std::function<void(void)>;
using DiskCaller_t = std::function<void(DiskJob_t)>;
using WorkJob_t = std::function<void(void)>;
using WorkCaller_t = std::function<void(WorkJob_t)>;
explicit llarp_nodedb(const std::string rootdir, DiskCaller_t diskCaller) class NodeDB
: disk(std::move(diskCaller)), nodePath(rootdir)
{}
~llarp_nodedb()
{ {
Clear(); struct Entry
} {
const RouterContact rc;
const DiskCaller_t disk; llarp_time_t insertedAt;
mutable llarp::util::Mutex access; // protects entries explicit Entry(RouterContact rc);
/// time for next save to disk event, 0 if never happened };
llarp_time_t m_NextSaveToDisk = 0s; using NodeMap = std::unordered_map<RouterID, Entry>;
/// how often to save to disk
const llarp_time_t m_SaveInterval = 5min; NodeMap m_Entries;
struct NetDBEntry const fs::path m_Root;
{
const llarp::RouterContact rc; const std::function<void(std::function<void()>)> disk;
llarp_time_t inserted;
llarp_time_t m_NextFlushAt;
NetDBEntry(llarp::RouterContact data);
mutable util::NullMutex m_Access;
/// asynchronously remove the files for a set of rcs on disk given their public ident key
void
AsyncRemoveManyFromDisk(std::unordered_set<RouterID> idents) const;
/// get filename of an RC file given its public ident key
fs::path
GetPathForPubkey(RouterID pk) const;
public:
explicit NodeDB(fs::path rootdir, std::function<void(std::function<void()>)> diskCaller);
/// load all entries from disk syncrhonously
void
LoadFromDisk();
/// explicit save all RCs to disk synchronously
void
SaveToDisk() const;
/// the number of RCs that are loaded from disk
size_t
NumLoaded() const;
/// do periodic tasks like flush to disk and expiration
void
Tick(llarp_time_t now);
/// find the absolute closets router to a dht location
RouterContact
FindClosestTo(dht::Key_t location) const;
/// find many routers closest to dht key
std::vector<RouterContact>
FindManyClosestTo(dht::Key_t location, uint32_t numRouters) const;
/// return true if we have an rc by its ident pubkey
bool
Has(RouterID pk) const;
/// maybe get an rc by its ident pubkey
std::optional<RouterContact>
Get(RouterID pk) const;
template <typename Filter>
std::optional<RouterContact>
GetRandom(Filter visit) const
{
util::NullLock lock{m_Access};
const auto sz = m_Entries.size();
if (sz < 3)
return std::nullopt;
const auto begin = m_Entries.begin();
const auto middle = std::next(m_Entries.begin(), randint() % sz);
for (auto itr = middle; itr != m_Entries.end(); ++itr)
{
if (visit(itr->second.rc))
return itr->second.rc;
}
for (auto itr = begin; itr != middle; ++itr)
{
if (visit(itr->second.rc))
return itr->second.rc;
}
return std::nullopt;
}
/// visit all entries
template <typename Visit>
void
VisitAll(Visit visit) const
{
util::NullLock lock{m_Access};
for (const auto& item : m_Entries)
{
visit(item.second.rc);
}
}
/// visit all entries inserted before a timestamp
template <typename Visit>
void
VisitInsertedBefore(Visit visit, llarp_time_t insertedBefore)
{
util::NullLock lock{m_Access};
for (const auto& item : m_Entries)
{
if (item.second.insertedAt < insertedBefore)
visit(item.second.rc);
}
}
/// remove an entry via its ident pubkey
void
Remove(RouterID pk);
/// remove an entry given a filter that inspects the rc
template <typename Filter>
void
RemoveIf(Filter visit)
{
util::NullLock lock{m_Access};
std::unordered_set<RouterID> removed;
auto itr = m_Entries.begin();
while (itr != m_Entries.end())
{
if (visit(itr->second.rc))
{
removed.insert(itr->second.rc.pubkey);
itr = m_Entries.erase(itr);
}
else
++itr;
}
if (not removed.empty())
AsyncRemoveManyFromDisk(std::move(removed));
}
/// remove rcs that are not in keep and have been inserted before cutoff
void
RemoveStaleRCs(std::unordered_set<RouterID> keep, llarp_time_t cutoff);
/// put this rc into the cache if it is not there or newer than the one there already
void
PutIfNewer(RouterContact rc);
/// unconditional put of rc into cache
void
Put(RouterContact rc);
}; };
} // namespace llarp
using NetDBMap_t = std::unordered_map<llarp::RouterID, NetDBEntry, llarp::RouterID::Hash>;
NetDBMap_t entries GUARDED_BY(access);
fs::path nodePath;
llarp::RouterContact
FindClosestTo(const llarp::dht::Key_t& location);
/// find the $numRouters closest routers to the given DHT key
std::vector<llarp::RouterContact>
FindClosestTo(const llarp::dht::Key_t& location, uint32_t numRouters);
/// return true if we should save our nodedb to disk
bool
ShouldSaveToDisk(llarp_time_t now = 0s) const;
bool
Remove(const llarp::RouterID& pk) EXCLUDES(access);
void
RemoveIf(std::function<bool(const llarp::RouterContact&)> filter) EXCLUDES(access);
void
Clear() EXCLUDES(access);
bool
Get(const llarp::RouterID& pk, llarp::RouterContact& result) EXCLUDES(access);
bool
Has(const llarp::RouterID& pk) EXCLUDES(access);
std::string
getRCFilePath(const llarp::RouterID& pubkey) const;
/// insert without writing to disk
bool
Insert(const llarp::RouterContact& rc) EXCLUDES(access);
/// invokes Insert() asynchronously with an optional completion
/// callback
void
InsertAsync(
llarp::RouterContact rc,
std::shared_ptr<llarp::Logic> l = nullptr,
std::function<void(void)> completionHandler = nullptr);
/// update rc if newer
/// return true if we started to put this rc in the database
/// retur false if not newer
bool
UpdateAsyncIfNewer(
llarp::RouterContact rc,
std::shared_ptr<llarp::Logic> l = nullptr,
std::function<void(void)> completionHandler = nullptr) EXCLUDES(access);
ssize_t
Load(const fs::path& path);
ssize_t
loadSubdir(const fs::path& dir);
/// save all entries to disk async
void
AsyncFlushToDisk();
bool
loadfile(const fs::path& fpath) EXCLUDES(access);
void
visit(std::function<bool(const llarp::RouterContact&)> visit) EXCLUDES(access);
void
set_dir(const char* dir);
ssize_t
LoadAll();
ssize_t
store_dir(const char* dir);
/// visit all entries inserted into nodedb cache before a timestamp
void
VisitInsertedBefore(
std::function<void(const llarp::RouterContact&)> visit, llarp_time_t insertedAfter)
EXCLUDES(access);
void
RemoveStaleRCs(const std::set<llarp::RouterID>& keep, llarp_time_t cutoff);
size_t
num_loaded() const EXCLUDES(access);
bool
select_random_exit(llarp::RouterContact& rc) EXCLUDES(access);
bool
select_random_hop_excluding(
llarp::RouterContact& result, const std::set<llarp::RouterID>& exclude) EXCLUDES(access);
/// Ensures that the given nodedb 'dir' exists
///
/// @param nodedbDir should be the desired nodedb directory
/// @throws on any filesistem error or if `nodedbDir` exists and is not a directory
static void
ensure_dir(const fs::path& nodedbDir);
void
SaveAll() EXCLUDES(access);
};
/// struct for async rc verification
struct llarp_async_verify_rc;
using llarp_async_verify_rc_hook_func = std::function<void(struct llarp_async_verify_rc*)>;
/// verify rc request
struct llarp_async_verify_rc
{
/// async_verify_context
void* user;
/// nodedb storage
llarp_nodedb* nodedb;
// llarp::Logic for queue_job
std::shared_ptr<llarp::Logic> logic;
llarp_nodedb::WorkCaller_t worker;
llarp_nodedb::DiskCaller_t disk;
/// router contact
llarp::RouterContact rc;
/// result
bool valid;
/// hook
llarp_async_verify_rc_hook_func hook;
};
/**
struct for async rc verification
data is loaded in disk io threadpool
crypto is done on the crypto worker threadpool
result is called on the logic thread
*/
void
llarp_nodedb_async_verify(struct llarp_async_verify_rc* job);
struct llarp_async_load_rc;
using llarp_async_load_rc_hook_func = std::function<void(struct llarp_async_load_rc*)>;
struct llarp_async_load_rc
{
/// async_verify_context
void* user;
/// nodedb storage
llarp_nodedb* nodedb;
/// llarp::Logic for calling hook
llarp::Logic* logic;
/// disk worker threadpool
llarp_nodedb::DiskCaller_t disk;
/// target pubkey
llarp::PubKey pubkey;
/// router contact result
llarp::RouterContact result;
/// set to true if we loaded the rc
bool loaded;
/// hook function called in logic thread
llarp_async_load_rc_hook_func hook;
};
/// asynchronously load an rc from disk
void
llarp_nodedb_async_load_rc(struct llarp_async_load_rc* job);
#endif #endif

@ -205,56 +205,37 @@ namespace llarp
return obj; return obj;
} }
bool std::optional<RouterContact>
Builder::SelectHop( Builder::SelectFirstHop() const
llarp_nodedb* db,
const std::set<RouterID>& exclude,
RouterContact& cur,
size_t hop,
PathRole roles)
{ {
(void)roles; std::optional<RouterContact> found = std::nullopt;
size_t tries = 10; m_router->ForEachPeer(
if (hop == 0) [&](const ILinkSession* s, bool isOutbound) {
{ if (s && s->IsEstablished() && isOutbound && not found.has_value())
if (m_router->NumberOfConnectedRouters() == 0) {
{ const RouterContact rc = s->GetRemoteRC();
return false; #ifndef TESTNET
} if (m_router->IsBootstrapNode(rc.pubkey))
bool got = false; return;
m_router->ForEachPeer(
[&](const ILinkSession* s, bool isOutbound) {
if (s && s->IsEstablished() && isOutbound && !got)
{
const RouterContact rc = s->GetRemoteRC();
#ifdef TESTNET
if (got || exclude.count(rc.pubkey))
#else
if (got || exclude.count(rc.pubkey) || m_router->IsBootstrapNode(rc.pubkey))
#endif #endif
return; found = rc;
cur = rc; }
got = true; },
} true);
}, return found;
true); }
return got;
}
do std::optional<std::vector<RouterContact>>
Builder::GetHopsForBuild()
{
auto filter = [r = m_router](const auto& rc) -> bool {
return not r->routerProfiling().IsBadForPath(rc.pubkey);
};
if (const auto maybe = m_router->nodedb()->GetRandom(filter))
{ {
cur.Clear(); return GetHopsAlignedToForBuild(maybe->pubkey);
--tries; }
std::set<RouterID> excluding = exclude; return std::nullopt;
if (db->select_random_hop_excluding(cur, excluding))
{
excluding.insert(cur.pubkey);
if (!m_router->routerProfiling().IsBadForPath(cur.pubkey))
return true;
}
} while (tries > 0);
return false;
} }
bool bool
@ -301,9 +282,8 @@ namespace llarp
void void
Builder::BuildOne(PathRole roles) Builder::BuildOne(PathRole roles)
{ {
std::vector<RouterContact> hops(numHops); if (const auto maybe = GetHopsForBuild(); maybe.has_value())
if (SelectHops(m_router->nodedb(), hops, roles)) Build(*maybe, roles);
Build(hops, roles);
} }
bool Builder::UrgentBuild(llarp_time_t) const bool Builder::UrgentBuild(llarp_time_t) const
@ -311,114 +291,60 @@ namespace llarp
return buildIntervalLimit > MIN_PATH_BUILD_INTERVAL * 4; return buildIntervalLimit > MIN_PATH_BUILD_INTERVAL * 4;
} }
bool std::optional<std::vector<RouterContact>>
Builder::DoUrgentBuildAlignedTo(const RouterID remote, std::vector<RouterContact>& hops) Builder::GetHopsAlignedToForBuild(RouterID endpoint)
{ {
const auto aligned = m_router->pathContext().FindOwnedPathsWithEndpoint(remote); std::vector<RouterContact> hops;
/// pick the lowest latency path that aligns to remote
/// note: peer exhaustion is made worse happen here
Path_ptr p;
llarp_time_t min = std::numeric_limits<llarp_time_t>::max();
for (const auto& path : aligned)
{
if (path->intro.latency < min && path->hops.size() == numHops)
{
p = path;
min = path->intro.latency;
}
}
if (p)
{ {
for (const auto& hop : p->hops) const auto maybe = SelectFirstHop();
{ if (not maybe.has_value())
if (hop.rc.pubkey.IsZero()) return std::nullopt;
return false; hops.emplace_back(*maybe);
hops.emplace_back(hop.rc); };
} for (size_t idx = hops.size(); idx < numHops; ++idx)
}
return true;
}
bool
Builder::DoBuildAlignedTo(const RouterID remote, std::vector<RouterContact>& hops)
{
std::set<RouterID> routers{remote};
hops.resize(numHops);
auto nodedb = m_router->nodedb();
for (size_t idx = 0; idx < hops.size(); idx++)
{ {
hops[idx].Clear(); if (idx + 1 == numHops)
if (idx == numHops - 1)
{ {
// last hop const auto maybe = m_router->nodedb()->Get(endpoint);
if (!nodedb->Get(remote, hops[idx])) if (maybe.has_value())
{ {
m_router->LookupRouter(remote, nullptr); hops.emplace_back(*maybe);
return false;
} }
else
return std::nullopt;
} }
else else
{ {
if (!SelectHop(nodedb, routers, hops[idx], idx, path::ePathRoleAny)) const auto maybe = m_router->nodedb()->GetRandom(
{ [&hops, r = m_router, endpoint](const auto& rc) -> bool {
return false; if (r->routerProfiling().IsBadForPath(rc.pubkey))
} return false;
for (const auto& hop : hops)
{
if (hop.pubkey == rc.pubkey)
return false;
}
return rc.pubkey != endpoint;
});
if (not maybe.has_value())
return std::nullopt;
hops.emplace_back(*maybe);
} }
if (hops[idx].pubkey.IsZero())
return false;
routers.insert(hops[idx].pubkey);
} }
return hops;
return true;
} }
bool bool
Builder::BuildOneAlignedTo(const RouterID remote) Builder::BuildOneAlignedTo(const RouterID remote)
{ {
std::vector<RouterContact> hops; if (const auto maybe = GetHopsAlignedToForBuild(remote); maybe.has_value())
/// if we really need this path build it "dangerously"
if (UrgentBuild(m_router->Now()))
{ {
if (!DoUrgentBuildAlignedTo(remote, hops)) LogInfo(Name(), " building path to ", remote);
{ Build(*maybe);
return false; return true;
}
}
if (hops.empty())
{
if (!DoBuildAlignedTo(remote, hops))
{
return false;
}
} }
LogInfo(Name(), " building path to ", remote); return false;
Build(hops);
return true;
}
bool
Builder::SelectHops(llarp_nodedb* nodedb, std::vector<RouterContact>& hops, PathRole roles)
{
std::set<RouterID> exclude;
for (size_t idx = 0; idx < hops.size(); ++idx)
{
hops[idx].Clear();
size_t tries = 32;
while (tries > 0 && !SelectHop(nodedb, exclude, hops[idx], idx, roles))
{
--tries;
}
if (tries == 0 || hops[idx].pubkey.IsZero())
{
LogWarn(Name(), " failed to select hop ", idx);
return false;
}
exclude.emplace(hops[idx].pubkey);
}
return true;
} }
llarp_time_t llarp_time_t
@ -428,7 +354,7 @@ namespace llarp
} }
void void
Builder::Build(const std::vector<RouterContact>& hops, PathRole roles) Builder::Build(std::vector<RouterContact> hops, PathRole roles)
{ {
if (IsStopped()) if (IsStopped())
return; return;

@ -30,12 +30,6 @@ namespace llarp
void void
DoPathBuildBackoff(); DoPathBuildBackoff();
bool
DoUrgentBuildAlignedTo(const RouterID remote, std::vector<RouterContact>& hops);
bool
DoBuildAlignedTo(const RouterID remote, std::vector<RouterContact>& hops);
public: public:
AbstractRouter* m_router; AbstractRouter* m_router;
SecretKey enckey; SecretKey enckey;
@ -51,14 +45,6 @@ namespace llarp
util::StatusObject util::StatusObject
ExtractStatus() const; ExtractStatus() const;
bool
SelectHop(
llarp_nodedb* db,
const std::set<RouterID>& prev,
RouterContact& cur,
size_t hop,
PathRole roles) override;
bool bool
ShouldBuildMore(llarp_time_t now) const override; ShouldBuildMore(llarp_time_t now) const override;
@ -107,11 +93,18 @@ namespace llarp
bool bool
BuildOneAlignedTo(const RouterID endpoint) override; BuildOneAlignedTo(const RouterID endpoint) override;
virtual std::optional<std::vector<RouterContact>>
GetHopsAlignedToForBuild(RouterID endpoint);
void void
Build(const std::vector<RouterContact>& hops, PathRole roles = ePathRoleAny) override; Build(std::vector<RouterContact> hops, PathRole roles = ePathRoleAny) override;
bool /// pick a first hop
SelectHops(llarp_nodedb* db, std::vector<RouterContact>& hops, PathRole roles = ePathRoleAny); virtual std::optional<RouterContact>
SelectFirstHop() const;
std::optional<std::vector<RouterContact>>
GetHopsForBuild() override;
void void
ManualRebuild(size_t N, PathRole roles = ePathRoleAny); ManualRebuild(size_t N, PathRole roles = ePathRoleAny);

@ -14,11 +14,10 @@
#include <map> #include <map>
#include <tuple> #include <tuple>
struct llarp_nodedb;
namespace llarp namespace llarp
{ {
struct RouterContact; struct RouterContact;
class NodeDB;
namespace dht namespace dht
{ {
@ -109,7 +108,7 @@ namespace llarp
/// manual build on these hops /// manual build on these hops
virtual void virtual void
Build(const std::vector<RouterContact>& hops, PathRole roles = ePathRoleAny) = 0; Build(std::vector<RouterContact> hops, PathRole roles = ePathRoleAny) = 0;
/// tick owned paths /// tick owned paths
virtual void virtual void
@ -261,20 +260,15 @@ namespace llarp
virtual void virtual void
ResetInternalState() = 0; ResetInternalState() = 0;
virtual bool
SelectHop(
llarp_nodedb* db,
const std::set<RouterID>& prev,
RouterContact& cur,
size_t hop,
PathRole roles) = 0;
virtual bool virtual bool
BuildOneAlignedTo(const RouterID endpoint) = 0; BuildOneAlignedTo(const RouterID endpoint) = 0;
virtual void virtual void
SendPacketToRemote(const llarp_buffer_t& pkt) = 0; SendPacketToRemote(const llarp_buffer_t& pkt) = 0;
virtual std::optional<std::vector<RouterContact>>
GetHopsForBuild() = 0;
void void
ForEachPath(std::function<void(const Path_ptr&)> visit) const ForEachPath(std::function<void(const Path_ptr&)> visit) const
{ {

@ -1,7 +1,7 @@
#include <peerstats/types.hpp> #include <peerstats/types.hpp>
#include <util/str.hpp> #include <util/str.hpp>
#include <lokimq/bt_serialize.h> #include <oxenmq/bt_serialize.h>
#include <stdexcept> #include <stdexcept>
namespace llarp namespace llarp
@ -103,7 +103,7 @@ namespace llarp
{ {
if (not buf) if (not buf)
throw std::runtime_error("PeerStats: Can't use null buf"); throw std::runtime_error("PeerStats: Can't use null buf");
const lokimq::bt_dict data = { const oxenmq::bt_dict data = {
{NumConnectionAttemptsKey, numConnectionAttempts}, {NumConnectionAttemptsKey, numConnectionAttempts},
{NumConnectionSuccessesKey, numConnectionSuccesses}, {NumConnectionSuccessesKey, numConnectionSuccesses},
{NumConnectionRejectionsKey, numConnectionRejections}, {NumConnectionRejectionsKey, numConnectionRejections},
@ -120,7 +120,7 @@ namespace llarp
{LeastRCRemainingLifetimeKey, leastRCRemainingLifetime.count()}, {LeastRCRemainingLifetimeKey, leastRCRemainingLifetime.count()},
{LastRCUpdatedKey, lastRCUpdated.count()}, {LastRCUpdatedKey, lastRCUpdated.count()},
}; };
const auto serialized = lokimq::bt_serialize(data); const auto serialized = oxenmq::bt_serialize(data);
if (not buf->write(serialized.begin(), serialized.end())) if (not buf->write(serialized.begin(), serialized.end()))
throw std::runtime_error("PeerStats: buffer too small"); throw std::runtime_error("PeerStats: buffer too small");
} }

@ -20,16 +20,15 @@
struct llarp_buffer_t; struct llarp_buffer_t;
struct llarp_dht_context; struct llarp_dht_context;
struct llarp_nodedb;
struct llarp_threadpool;
namespace lokimq namespace oxenmq
{ {
class LokiMQ; class OxenMQ;
} }
namespace llarp namespace llarp
{ {
class NodeDB;
class Logic; class Logic;
struct Config; struct Config;
struct RouterID; struct RouterID;
@ -80,7 +79,7 @@ namespace llarp
class Platform; class Platform;
} }
using LMQ_ptr = std::shared_ptr<lokimq::LokiMQ>; using LMQ_ptr = std::shared_ptr<oxenmq::OxenMQ>;
struct AbstractRouter struct AbstractRouter
{ {
@ -108,7 +107,7 @@ namespace llarp
virtual llarp_dht_context* virtual llarp_dht_context*
dht() const = 0; dht() const = 0;
virtual llarp_nodedb* virtual std::shared_ptr<NodeDB>
nodedb() const = 0; nodedb() const = 0;
virtual const path::PathContext& virtual const path::PathContext&
@ -178,7 +177,7 @@ namespace llarp
Sign(Signature& sig, const llarp_buffer_t& buf) const = 0; Sign(Signature& sig, const llarp_buffer_t& buf) const = 0;
virtual bool virtual bool
Configure(std::shared_ptr<Config> conf, bool isRouter, llarp_nodedb* nodedb) = 0; Configure(std::shared_ptr<Config> conf, bool isRouter, std::shared_ptr<NodeDB> nodedb) = 0;
virtual bool virtual bool
IsServiceNode() const = 0; IsServiceNode() const = 0;

@ -123,8 +123,14 @@ namespace llarp
std::set<RouterID> exclude; std::set<RouterID> exclude;
do do
{ {
auto filter = [exclude](const auto& rc) -> bool { return exclude.count(rc.pubkey) == 0; };
RouterContact other; RouterContact other;
if (not _nodedb->select_random_hop_excluding(other, exclude)) if (const auto maybe = _nodedb->GetRandom(filter))
{
other = *maybe;
}
else
break; break;
exclude.insert(other.pubkey); exclude.insert(other.pubkey);
@ -158,14 +164,13 @@ namespace llarp
I_RCLookupHandler* rcLookup, I_RCLookupHandler* rcLookup,
Profiling* profiler, Profiling* profiler,
std::shared_ptr<Logic> logic, std::shared_ptr<Logic> logic,
llarp_nodedb* nodedb,
WorkerFunc_t dowork) WorkerFunc_t dowork)
{ {
_router = router; _router = router;
_linkManager = linkManager; _linkManager = linkManager;
_rcLookup = rcLookup; _rcLookup = rcLookup;
_logic = logic; _logic = logic;
_nodedb = nodedb; _nodedb = router->nodedb();
_profiler = profiler; _profiler = profiler;
work = dowork; work = dowork;
} }

@ -13,8 +13,6 @@
#include <list> #include <list>
#include <memory> #include <memory>
struct llarp_nodedb;
namespace llarp namespace llarp
{ {
struct PendingSession; struct PendingSession;
@ -63,7 +61,6 @@ namespace llarp
I_RCLookupHandler* rcLookup, I_RCLookupHandler* rcLookup,
Profiling* profiler, Profiling* profiler,
std::shared_ptr<Logic> logic, std::shared_ptr<Logic> logic,
llarp_nodedb* nodedb,
WorkerFunc_t work); WorkerFunc_t work);
void void
@ -115,7 +112,7 @@ namespace llarp
ILinkManager* _linkManager = nullptr; ILinkManager* _linkManager = nullptr;
I_RCLookupHandler* _rcLookup = nullptr; I_RCLookupHandler* _rcLookup = nullptr;
Profiling* _profiler = nullptr; Profiling* _profiler = nullptr;
llarp_nodedb* _nodedb = nullptr; std::shared_ptr<NodeDB> _nodedb;
std::shared_ptr<Logic> _logic; std::shared_ptr<Logic> _logic;
WorkerFunc_t work; WorkerFunc_t work;
RouterID us; RouterID us;

@ -61,8 +61,9 @@ namespace llarp
RouterContact remoteRC; RouterContact remoteRC;
if (not forceLookup) if (not forceLookup)
{ {
if (_nodedb->Get(router, remoteRC)) if (const auto maybe = _nodedb->Get(router); maybe.has_value())
{ {
remoteRC = *maybe;
if (callback) if (callback)
{ {
callback(router, &remoteRC, RCRequestResult::Success); callback(router, &remoteRC, RCRequestResult::Success);
@ -155,7 +156,8 @@ namespace llarp
if (rc.IsPublicRouter()) if (rc.IsPublicRouter())
{ {
LogDebug("Adding or updating RC for ", RouterID(rc.pubkey), " to nodedb and dht."); LogDebug("Adding or updating RC for ", RouterID(rc.pubkey), " to nodedb and dht.");
_nodedb->UpdateAsyncIfNewer(rc); const RouterContact copy{rc};
LogicCall(_logic, [copy, n = _nodedb]() { n->PutIfNewer(copy); });
_dht->impl->PutRCNodeAsync(rc); _dht->impl->PutRCNodeAsync(rc);
} }
@ -210,7 +212,7 @@ namespace llarp
RCLookupHandler::PeriodicUpdate(llarp_time_t now) RCLookupHandler::PeriodicUpdate(llarp_time_t now)
{ {
// try looking up stale routers // try looking up stale routers
std::set<RouterID> routersToLookUp; std::unordered_set<RouterID> routersToLookUp;
_nodedb->VisitInsertedBefore( _nodedb->VisitInsertedBefore(
[&](const RouterContact& rc) { [&](const RouterContact& rc) {
@ -231,7 +233,7 @@ namespace llarp
void void
RCLookupHandler::ExploreNetwork() RCLookupHandler::ExploreNetwork()
{ {
const size_t known = _nodedb->num_loaded(); const size_t known = _nodedb->NumLoaded();
if (_bootstrapRCList.empty() && known == 0) if (_bootstrapRCList.empty() && known == 0)
{ {
LogError("we have no bootstrap nodes specified"); LogError("we have no bootstrap nodes specified");
@ -298,17 +300,19 @@ namespace llarp
void void
RCLookupHandler::Init( RCLookupHandler::Init(
llarp_dht_context* dht, llarp_dht_context* dht,
llarp_nodedb* nodedb, std::shared_ptr<NodeDB> nodedb,
std::shared_ptr<Logic> logic,
WorkerFunc_t dowork, WorkerFunc_t dowork,
ILinkManager* linkManager, ILinkManager* linkManager,
service::Context* hiddenServiceContext, service::Context* hiddenServiceContext,
const std::set<RouterID>& strictConnectPubkeys, const std::unordered_set<RouterID>& strictConnectPubkeys,
const std::set<RouterContact>& bootstrapRCList, const std::set<RouterContact>& bootstrapRCList,
bool useWhitelist_arg, bool useWhitelist_arg,
bool isServiceNode_arg) bool isServiceNode_arg)
{ {
_dht = dht; _dht = dht;
_nodedb = nodedb; _nodedb = nodedb;
_logic = logic;
_work = dowork; _work = dowork;
_hiddenServiceContext = hiddenServiceContext; _hiddenServiceContext = hiddenServiceContext;
_strictConnectPubkeys = strictConnectPubkeys; _strictConnectPubkeys = strictConnectPubkeys;

@ -8,13 +8,16 @@
#include <unordered_map> #include <unordered_map>
#include <set> #include <set>
#include <unordered_set>
#include <list> #include <list>
struct llarp_nodedb;
struct llarp_dht_context; struct llarp_dht_context;
namespace llarp namespace llarp
{ {
class NodeDB;
class Logic;
namespace service namespace service
{ {
struct Context; struct Context;
@ -72,11 +75,12 @@ namespace llarp
void void
Init( Init(
llarp_dht_context* dht, llarp_dht_context* dht,
llarp_nodedb* nodedb, std::shared_ptr<NodeDB> nodedb,
std::shared_ptr<Logic> logic,
WorkerFunc_t dowork, WorkerFunc_t dowork,
ILinkManager* linkManager, ILinkManager* linkManager,
service::Context* hiddenServiceContext, service::Context* hiddenServiceContext,
const std::set<RouterID>& strictConnectPubkeys, const std::unordered_set<RouterID>& strictConnectPubkeys,
const std::set<RouterContact>& bootstrapRCList, const std::set<RouterContact>& bootstrapRCList,
bool useWhitelist_arg, bool useWhitelist_arg,
bool isServiceNode_arg); bool isServiceNode_arg);
@ -98,17 +102,18 @@ namespace llarp
mutable util::Mutex _mutex; // protects pendingCallbacks, whitelistRouters mutable util::Mutex _mutex; // protects pendingCallbacks, whitelistRouters
llarp_dht_context* _dht = nullptr; llarp_dht_context* _dht = nullptr;
llarp_nodedb* _nodedb = nullptr; std::shared_ptr<NodeDB> _nodedb;
std::shared_ptr<Logic> _logic;
WorkerFunc_t _work = nullptr; WorkerFunc_t _work = nullptr;
service::Context* _hiddenServiceContext = nullptr; service::Context* _hiddenServiceContext = nullptr;
ILinkManager* _linkManager = nullptr; ILinkManager* _linkManager = nullptr;
/// explicit whitelist of routers we will connect to directly (not for /// explicit whitelist of routers we will connect to directly (not for
/// service nodes) /// service nodes)
std::set<RouterID> _strictConnectPubkeys; std::unordered_set<RouterID> _strictConnectPubkeys;
std::set<RouterContact> _bootstrapRCList; std::set<RouterContact> _bootstrapRCList;
std::set<RouterID> _bootstrapRouterIDList; std::unordered_set<RouterID> _bootstrapRouterIDList;
std::unordered_map<RouterID, CallbacksQueue, RouterID::Hash> pendingCallbacks std::unordered_map<RouterID, CallbacksQueue, RouterID::Hash> pendingCallbacks
GUARDED_BY(_mutex); GUARDED_BY(_mutex);
@ -116,7 +121,7 @@ namespace llarp
bool useWhitelist = false; bool useWhitelist = false;
bool isServiceNode = false; bool isServiceNode = false;
std::set<RouterID> whitelistRouters GUARDED_BY(_mutex); std::unordered_set<RouterID> whitelistRouters GUARDED_BY(_mutex);
using TimePoint = std::chrono::steady_clock::time_point; using TimePoint = std::chrono::steady_clock::time_point;
std::unordered_map<RouterID, TimePoint, RouterID::Hash> _routerLookupTimes; std::unordered_map<RouterID, TimePoint, RouterID::Hash> _routerLookupTimes;

@ -40,7 +40,7 @@
#include <systemd/sd-daemon.h> #include <systemd/sd-daemon.h>
#endif #endif
#include <lokimq/lokimq.h> #include <oxenmq/oxenmq.h>
static constexpr std::chrono::milliseconds ROUTER_TICK_INTERVAL = 1s; static constexpr std::chrono::milliseconds ROUTER_TICK_INTERVAL = 1s;
@ -49,9 +49,9 @@ namespace llarp
Router::Router( Router::Router(
llarp_ev_loop_ptr __netloop, llarp_ev_loop_ptr __netloop,
std::shared_ptr<Logic> l, std::shared_ptr<Logic> l,
std::unique_ptr<vpn::Platform> vpnPlatform) std::shared_ptr<vpn::Platform> vpnPlatform)
: ready(false) : ready(false)
, m_lmq(std::make_shared<lokimq::LokiMQ>()) , m_lmq(std::make_shared<oxenmq::OxenMQ>())
, _netloop(std::move(__netloop)) , _netloop(std::move(__netloop))
, _logic(std::move(l)) , _logic(std::move(l))
, _vpnPlatform(std::move(vpnPlatform)) , _vpnPlatform(std::move(vpnPlatform))
@ -93,7 +93,7 @@ namespace llarp
peerStatsObj = m_peerDb->ExtractStatus(); peerStatsObj = m_peerDb->ExtractStatus();
return util::StatusObject{{"running", true}, return util::StatusObject{{"running", true},
{"numNodesKnown", _nodedb->num_loaded()}, {"numNodesKnown", _nodedb->NumLoaded()},
{"dht", _dht->impl->ExtractStatus()}, {"dht", _dht->impl->ExtractStatus()},
{"services", _hiddenServiceContext.ExtractStatus()}, {"services", _hiddenServiceContext.ExtractStatus()},
{"exit", _exitContext.ExtractStatus()}, {"exit", _exitContext.ExtractStatus()},
@ -150,19 +150,12 @@ namespace llarp
return _rcLookupHandler.GetRandomWhitelistRouter(router); return _rcLookupHandler.GetRandomWhitelistRouter(router);
} }
auto pick_router = [&](auto& collection) -> bool { if (const auto maybe = nodedb()->GetRandom([](const auto&) -> bool { return true; }))
const auto sz = collection.size(); {
auto itr = collection.begin(); router = maybe->pubkey;
if (sz == 0)
return false;
if (sz > 1)
std::advance(itr, randint() % sz);
router = itr->first;
return true; return true;
}; }
return false;
util::Lock l{nodedb()->access};
return pick_router(nodedb()->entries);
} }
void void
@ -270,17 +263,17 @@ namespace llarp
} }
bool bool
Router::Configure(std::shared_ptr<Config> c, bool isRouter, llarp_nodedb* nodedb) Router::Configure(std::shared_ptr<Config> c, bool isRouter, std::shared_ptr<NodeDB> nodedb)
{ {
m_Config = c; m_Config = c;
auto& conf = *m_Config; auto& conf = *m_Config;
whitelistRouters = conf.lokid.whitelistRouters; whitelistRouters = conf.lokid.whitelistRouters;
if (whitelistRouters) if (whitelistRouters)
lokidRPCAddr = lokimq::address(conf.lokid.lokidRPCAddr); lokidRPCAddr = oxenmq::address(conf.lokid.lokidRPCAddr);
enableRPCServer = conf.api.m_enableRPCServer; enableRPCServer = conf.api.m_enableRPCServer;
if (enableRPCServer) if (enableRPCServer)
rpcBindAddr = lokimq::address(conf.api.m_rpcBindAddr); rpcBindAddr = oxenmq::address(conf.api.m_rpcBindAddr);
if (not StartRpcServer()) if (not StartRpcServer())
throw std::runtime_error("Failed to start rpc server"); throw std::runtime_error("Failed to start rpc server");
@ -461,7 +454,7 @@ namespace llarp
// Lokid Config // Lokid Config
whitelistRouters = conf.lokid.whitelistRouters; whitelistRouters = conf.lokid.whitelistRouters;
lokidRPCAddr = lokimq::address(conf.lokid.lokidRPCAddr); lokidRPCAddr = oxenmq::address(conf.lokid.lokidRPCAddr);
m_isServiceNode = conf.router.m_isRelay; m_isServiceNode = conf.router.m_isRelay;
@ -470,7 +463,7 @@ namespace llarp
/// build a set of strictConnectPubkeys ( /// build a set of strictConnectPubkeys (
/// TODO: make this consistent with config -- do we support multiple strict connections /// TODO: make this consistent with config -- do we support multiple strict connections
// or not? // or not?
std::set<RouterID> strictConnectPubkeys; std::unordered_set<RouterID> strictConnectPubkeys;
if (not networkConfig.m_strictConnect.empty()) if (not networkConfig.m_strictConnect.empty())
{ {
const auto& val = networkConfig.m_strictConnect; const auto& val = networkConfig.m_strictConnect;
@ -562,12 +555,12 @@ namespace llarp
&_rcLookupHandler, &_rcLookupHandler,
&_routerProfiling, &_routerProfiling,
_logic, _logic,
_nodedb,
util::memFn(&AbstractRouter::QueueWork, this)); util::memFn(&AbstractRouter::QueueWork, this));
_linkManager.Init(&_outboundSessionMaker); _linkManager.Init(&_outboundSessionMaker);
_rcLookupHandler.Init( _rcLookupHandler.Init(
_dht, _dht,
_nodedb, _nodedb,
_logic,
util::memFn(&AbstractRouter::QueueWork, this), util::memFn(&AbstractRouter::QueueWork, this),
&_linkManager, &_linkManager,
&_hiddenServiceContext, &_hiddenServiceContext,
@ -691,7 +684,7 @@ namespace llarp
Router::ReportStats() Router::ReportStats()
{ {
const auto now = Now(); const auto now = Now();
LogInfo(nodedb()->num_loaded(), " RCs loaded"); LogInfo(nodedb()->NumLoaded(), " RCs loaded");
LogInfo(bootstrapRCList.size(), " bootstrap peers"); LogInfo(bootstrapRCList.size(), " bootstrap peers");
LogInfo(NumberOfConnectedRouters(), " router connections"); LogInfo(NumberOfConnectedRouters(), " router connections");
if (IsServiceNode()) if (IsServiceNode())
@ -719,13 +712,13 @@ namespace llarp
ss << "WATCHDOG=1\nSTATUS=v" << llarp::VERSION_STR; ss << "WATCHDOG=1\nSTATUS=v" << llarp::VERSION_STR;
if (IsServiceNode()) if (IsServiceNode())
{ {
ss << " snode | known/svc/clients: " << nodedb()->num_loaded() << "/" ss << " snode | known/svc/clients: " << nodedb()->NumLoaded() << "/"
<< NumberOfConnectedRouters() << "/" << NumberOfConnectedClients() << " | " << NumberOfConnectedRouters() << "/" << NumberOfConnectedClients() << " | "
<< pathContext().CurrentTransitPaths() << " active paths"; << pathContext().CurrentTransitPaths() << " active paths";
} }
else else
{ {
ss << " client | known/connected: " << nodedb()->num_loaded() << "/" ss << " client | known/connected: " << nodedb()->NumLoaded() << "/"
<< NumberOfConnectedRouters() << " | path success: "; << NumberOfConnectedRouters() << " | path success: ";
hiddenServiceContext().ForEachService([&ss](const auto& name, const auto& ep) { hiddenServiceContext().ForEachService([&ss](const auto& name, const auto& ep) {
ss << " [" << name << " " << std::setprecision(4) ss << " [" << name << " " << std::setprecision(4)
@ -852,11 +845,8 @@ namespace llarp
{ {
QueueDiskIO([&]() { routerProfiling().Save(_profilesFile); }); QueueDiskIO([&]() { routerProfiling().Save(_profilesFile); });
} }
// save nodedb
if (nodedb()->ShouldSaveToDisk(now)) _nodedb->Tick(now);
{
nodedb()->AsyncFlushToDisk();
}
if (m_peerDb) if (m_peerDb)
{ {
@ -908,10 +898,11 @@ namespace llarp
LogInfo("Session to ", remote, " fully closed"); LogInfo("Session to ", remote, " fully closed");
if (IsServiceNode()) if (IsServiceNode())
return; return;
RouterContact rc; if (const auto maybe = nodedb()->Get(remote); maybe.has_value())
if (not nodedb()->Get(remote, rc)) {
return; for (const auto& addr : maybe->addrs)
m_RoutePoker.DelRoute(rc.addrs[0].toIpAddress().toIP()); m_RoutePoker.DelRoute(addr.toIpAddress().toIP());
}
} }
void void
@ -1097,31 +1088,20 @@ namespace llarp
} }
{ {
ssize_t loaded = _nodedb->LoadAll(); LogInfo("Loading nodedb from disk...");
llarp::LogInfo("loaded ", loaded, " RCs"); _nodedb->LoadFromDisk();
if (loaded < 0)
{
// shouldn't be possible
return false;
}
} }
llarp_dht_context_start(dht(), pubkey()); llarp_dht_context_start(dht(), pubkey());
for (const auto& rc : bootstrapRCList) for (const auto& rc : bootstrapRCList)
{ {
if (this->nodedb()->Insert(rc)) nodedb()->Put(rc);
{
LogInfo("added bootstrap node ", RouterID(rc.pubkey));
}
else
{
LogError("Failed to add bootstrap node ", RouterID(rc.pubkey));
}
_dht->impl->Nodes()->PutNode(rc); _dht->impl->Nodes()->PutNode(rc);
LogInfo("added bootstrap node ", RouterID{rc.pubkey});
} }
LogInfo("have ", _nodedb->num_loaded(), " routers"); LogInfo("have ", _nodedb->NumLoaded(), " routers");
#ifdef _WIN32 #ifdef _WIN32
// windows uses proactor event loop so we need to constantly pump // windows uses proactor event loop so we need to constantly pump
@ -1165,7 +1145,7 @@ namespace llarp
Router::AfterStopIssued() Router::AfterStopIssued()
{ {
StopLinks(); StopLinks();
nodedb()->AsyncFlushToDisk(); nodedb()->SaveToDisk();
_logic->call_later(200ms, std::bind(&Router::AfterStopLinks, this)); _logic->call_later(200ms, std::bind(&Router::AfterStopLinks, this));
} }

@ -46,7 +46,7 @@
#include <unordered_map> #include <unordered_map>
#include <vector> #include <vector>
#include <lokimq/address.h> #include <oxenmq/address.h>
namespace llarp namespace llarp
{ {
@ -103,7 +103,7 @@ namespace llarp
util::StatusObject util::StatusObject
ExtractStatus() const override; ExtractStatus() const override;
llarp_nodedb* std::shared_ptr<NodeDB>
nodedb() const override nodedb() const override
{ {
return _nodedb; return _nodedb;
@ -182,15 +182,15 @@ namespace llarp
llarp_ev_loop_ptr _netloop; llarp_ev_loop_ptr _netloop;
std::shared_ptr<Logic> _logic; std::shared_ptr<Logic> _logic;
std::unique_ptr<vpn::Platform> _vpnPlatform; std::shared_ptr<vpn::Platform> _vpnPlatform;
path::PathContext paths; path::PathContext paths;
exit::Context _exitContext; exit::Context _exitContext;
SecretKey _identity; SecretKey _identity;
SecretKey _encryption; SecretKey _encryption;
llarp_dht_context* _dht = nullptr; llarp_dht_context* _dht = nullptr;
llarp_nodedb* _nodedb; std::shared_ptr<NodeDB> _nodedb;
llarp_time_t _startedAt; llarp_time_t _startedAt;
const lokimq::TaggedThreadID m_DiskThread; const oxenmq::TaggedThreadID m_DiskThread;
llarp_time_t llarp_time_t
Uptime() const override; Uptime() const override;
@ -270,16 +270,16 @@ namespace llarp
void void
PumpLL() override; PumpLL() override;
const lokimq::address DefaultRPCBindAddr = lokimq::address::tcp("127.0.0.1", 1190); const oxenmq::address DefaultRPCBindAddr = oxenmq::address::tcp("127.0.0.1", 1190);
bool enableRPCServer = false; bool enableRPCServer = false;
lokimq::address rpcBindAddr = DefaultRPCBindAddr; oxenmq::address rpcBindAddr = DefaultRPCBindAddr;
std::unique_ptr<rpc::RpcServer> m_RPCServer; std::unique_ptr<rpc::RpcServer> m_RPCServer;
const llarp_time_t _randomStartDelay; const llarp_time_t _randomStartDelay;
std::shared_ptr<rpc::LokidRpcClient> m_lokidRpcClient; std::shared_ptr<rpc::LokidRpcClient> m_lokidRpcClient;
lokimq::address lokidRPCAddr; oxenmq::address lokidRPCAddr;
Profiling _routerProfiling; Profiling _routerProfiling;
fs::path _profilesFile; fs::path _profilesFile;
OutboundMessageHandler _outboundMessageHandler; OutboundMessageHandler _outboundMessageHandler;
@ -329,7 +329,7 @@ namespace llarp
explicit Router( explicit Router(
llarp_ev_loop_ptr __netloop, llarp_ev_loop_ptr __netloop,
std::shared_ptr<Logic> logic, std::shared_ptr<Logic> logic,
std::unique_ptr<vpn::Platform> vpnPlatform); std::shared_ptr<vpn::Platform> vpnPlatform);
virtual ~Router() override; virtual ~Router() override;
@ -358,7 +358,7 @@ namespace llarp
Close(); Close();
bool bool
Configure(std::shared_ptr<Config> conf, bool isRouter, llarp_nodedb* nodedb = nullptr) override; Configure(std::shared_ptr<Config> conf, bool isRouter, std::shared_ptr<NodeDB> nodedb) override;
bool bool
StartRpcServer() override; StartRpcServer() override;

@ -10,7 +10,7 @@
#include <util/printer.hpp> #include <util/printer.hpp>
#include <util/time.hpp> #include <util/time.hpp>
#include <lokimq/bt_serialize.h> #include <oxenmq/bt_serialize.h>
#include <fstream> #include <fstream>
#include <util/fs.hpp> #include <util/fs.hpp>
@ -250,7 +250,7 @@ namespace llarp
try try
{ {
std::string_view buf_view(reinterpret_cast<char*>(buf->cur), buf->size_left()); std::string_view buf_view(reinterpret_cast<char*>(buf->cur), buf->size_left());
lokimq::bt_list_consumer btlist(buf_view); oxenmq::bt_list_consumer btlist(buf_view);
uint64_t outer_version = btlist.consume_integer<uint64_t>(); uint64_t outer_version = btlist.consume_integer<uint64_t>();
@ -284,7 +284,7 @@ namespace llarp
} }
bool bool
RouterContact::DecodeVersion_1(lokimq::bt_list_consumer& btlist) RouterContact::DecodeVersion_1(oxenmq::bt_list_consumer& btlist)
{ {
auto signature_string = btlist.consume_string_view(); auto signature_string = btlist.consume_string_view();
signed_bt_dict = btlist.consume_dict_data(); signed_bt_dict = btlist.consume_dict_data();

@ -17,10 +17,10 @@
#define MAX_RC_SIZE (1024) #define MAX_RC_SIZE (1024)
#define NICKLEN (32) #define NICKLEN (32)
namespace lokimq namespace oxenmq
{ {
class bt_list_consumer; class bt_list_consumer;
} // namespace lokimq } // namespace oxenmq
namespace llarp namespace llarp
{ {
@ -230,7 +230,7 @@ namespace llarp
DecodeVersion_0(llarp_buffer_t* buf); DecodeVersion_0(llarp_buffer_t* buf);
bool bool
DecodeVersion_1(lokimq::bt_list_consumer& btlist); DecodeVersion_1(oxenmq::bt_list_consumer& btlist);
}; };
inline std::ostream& inline std::ostream&

@ -1,5 +1,5 @@
#include <router_id.hpp> #include <router_id.hpp>
#include <lokimq/base32z.h> #include <oxenmq/base32z.h>
namespace llarp namespace llarp
{ {
@ -8,7 +8,7 @@ namespace llarp
std::string std::string
RouterID::ToString() const RouterID::ToString() const
{ {
std::string b32 = lokimq::to_base32z(begin(), end()); std::string b32 = oxenmq::to_base32z(begin(), end());
b32 += SNODE_TLD; b32 += SNODE_TLD;
return b32; return b32;
} }
@ -17,7 +17,7 @@ namespace llarp
RouterID::ShortString() const RouterID::ShortString() const
{ {
// 5 bytes produces exactly 8 base32z characters: // 5 bytes produces exactly 8 base32z characters:
return lokimq::to_base32z(begin(), begin() + 5); return oxenmq::to_base32z(begin(), begin() + 5);
} }
util::StatusObject util::StatusObject
@ -38,9 +38,9 @@ namespace llarp
// - must end in a 1-bit value: 'o' or 'y' (i.e. 10000 or 00000) // - must end in a 1-bit value: 'o' or 'y' (i.e. 10000 or 00000)
// - must have 51 preceeding base32z chars // - must have 51 preceeding base32z chars
// - thus we get 51*5+1 = 256 bits = 32 bytes of output // - thus we get 51*5+1 = 256 bits = 32 bytes of output
if (str.size() != 52 || !lokimq::is_base32z(str) || !(str.back() == 'o' || str.back() == 'y')) if (str.size() != 52 || !oxenmq::is_base32z(str) || !(str.back() == 'o' || str.back() == 'y'))
return false; return false;
lokimq::from_base32z(str.begin(), str.end(), begin()); oxenmq::from_base32z(str.begin(), str.end(), begin());
return true; return true;
} }
} // namespace llarp } // namespace llarp

@ -57,4 +57,17 @@ namespace llarp
} // namespace llarp } // namespace llarp
namespace std
{
template <>
struct hash<llarp::RouterID>
{
size_t
operator()(const llarp::RouterID& id) const
{
const llarp::RouterID::Hash h{};
return h(id);
}
};
} // namespace std
#endif #endif

@ -23,11 +23,11 @@ namespace llarp::rpc
return; return;
m_LMQ->connect_remote( m_LMQ->connect_remote(
m_AuthURL, m_AuthURL,
[self = shared_from_this()](lokimq::ConnectionID c) { [self = shared_from_this()](oxenmq::ConnectionID c) {
self->m_Conn = std::move(c); self->m_Conn = std::move(c);
LogInfo("connected to endpoint auth server via ", *self->m_Conn); LogInfo("connected to endpoint auth server via ", *self->m_Conn);
}, },
[self = shared_from_this()](lokimq::ConnectionID, std::string_view fail) { [self = shared_from_this()](oxenmq::ConnectionID, std::string_view fail) {
LogWarn("failed to connect to endpoint auth server: ", fail); LogWarn("failed to connect to endpoint auth server: ", fail);
self->m_Endpoint->RouterLogic()->call_later(1s, [self]() { self->Start(); }); self->m_Endpoint->RouterLogic()->call_later(1s, [self]() { self->Start(); });
}); });

@ -1,7 +1,7 @@
#pragma once #pragma once
#include <service/auth.hpp> #include <service/auth.hpp>
#include <lokimq/lokimq.h> #include <oxenmq/oxenmq.h>
namespace llarp::service namespace llarp::service
{ {
@ -13,7 +13,7 @@ namespace llarp::rpc
struct EndpointAuthRPC : public llarp::service::IAuthPolicy, struct EndpointAuthRPC : public llarp::service::IAuthPolicy,
public std::enable_shared_from_this<EndpointAuthRPC> public std::enable_shared_from_this<EndpointAuthRPC>
{ {
using LMQ_ptr = std::shared_ptr<lokimq::LokiMQ>; using LMQ_ptr = std::shared_ptr<oxenmq::OxenMQ>;
using Endpoint_ptr = std::shared_ptr<llarp::service::Endpoint>; using Endpoint_ptr = std::shared_ptr<llarp::service::Endpoint>;
using Whitelist_t = std::unordered_set<llarp::service::Address, llarp::service::Address::Hash>; using Whitelist_t = std::unordered_set<llarp::service::Address, llarp::service::Address::Hash>;
@ -39,6 +39,6 @@ namespace llarp::rpc
const Whitelist_t m_AuthWhitelist; const Whitelist_t m_AuthWhitelist;
LMQ_ptr m_LMQ; LMQ_ptr m_LMQ;
Endpoint_ptr m_Endpoint; Endpoint_ptr m_Endpoint;
std::optional<lokimq::ConnectionID> m_Conn; std::optional<oxenmq::ConnectionID> m_Conn;
}; };
} // namespace llarp::rpc } // namespace llarp::rpc

@ -14,22 +14,22 @@ namespace llarp
{ {
namespace rpc namespace rpc
{ {
static lokimq::LogLevel static oxenmq::LogLevel
toLokiMQLogLevel(llarp::LogLevel level) toLokiMQLogLevel(llarp::LogLevel level)
{ {
switch (level) switch (level)
{ {
case eLogError: case eLogError:
return lokimq::LogLevel::error; return oxenmq::LogLevel::error;
case eLogWarn: case eLogWarn:
return lokimq::LogLevel::warn; return oxenmq::LogLevel::warn;
case eLogInfo: case eLogInfo:
return lokimq::LogLevel::info; return oxenmq::LogLevel::info;
case eLogDebug: case eLogDebug:
return lokimq::LogLevel::debug; return oxenmq::LogLevel::debug;
case eLogNone: case eLogNone:
default: default:
return lokimq::LogLevel::trace; return oxenmq::LogLevel::trace;
} }
} }
@ -39,13 +39,13 @@ namespace llarp
// m_lokiMQ->log_level(toLokiMQLogLevel(LogLevel::Instance().curLevel)); // m_lokiMQ->log_level(toLokiMQLogLevel(LogLevel::Instance().curLevel));
// TODO: proper auth here // TODO: proper auth here
auto lokidCategory = m_lokiMQ->add_category("lokid", lokimq::Access{lokimq::AuthLevel::none}); auto lokidCategory = m_lokiMQ->add_category("lokid", oxenmq::Access{oxenmq::AuthLevel::none});
lokidCategory.add_request_command( lokidCategory.add_request_command(
"get_peer_stats", [this](lokimq::Message& m) { HandleGetPeerStats(m); }); "get_peer_stats", [this](oxenmq::Message& m) { HandleGetPeerStats(m); });
} }
void void
LokidRpcClient::ConnectAsync(lokimq::address url) LokidRpcClient::ConnectAsync(oxenmq::address url)
{ {
if (not m_Router->IsServiceNode()) if (not m_Router->IsServiceNode())
{ {
@ -54,8 +54,8 @@ namespace llarp
LogInfo("connecting to lokid via LMQ at ", url); LogInfo("connecting to lokid via LMQ at ", url);
m_Connection = m_lokiMQ->connect_remote( m_Connection = m_lokiMQ->connect_remote(
url, url,
[self = shared_from_this()](lokimq::ConnectionID) { self->Connected(); }, [self = shared_from_this()](oxenmq::ConnectionID) { self->Connected(); },
[self = shared_from_this(), url](lokimq::ConnectionID, std::string_view f) { [self = shared_from_this(), url](oxenmq::ConnectionID, std::string_view f) {
llarp::LogWarn("Failed to connect to lokid: ", f); llarp::LogWarn("Failed to connect to lokid: ", f);
LogicCall(self->m_Router->logic(), [self, url]() { self->ConnectAsync(url); }); LogicCall(self->m_Router->logic(), [self, url]() { self->ConnectAsync(url); });
}); });
@ -236,8 +236,8 @@ namespace llarp
{ {
service::EncryptedName result; service::EncryptedName result;
const auto j = nlohmann::json::parse(data[1]); const auto j = nlohmann::json::parse(data[1]);
result.ciphertext = lokimq::from_hex(j["encrypted_value"].get<std::string>()); result.ciphertext = oxenmq::from_hex(j["encrypted_value"].get<std::string>());
const auto nonce = lokimq::from_hex(j["nonce"].get<std::string>()); const auto nonce = oxenmq::from_hex(j["nonce"].get<std::string>());
if (nonce.size() != result.nonce.size()) if (nonce.size() != result.nonce.size())
{ {
throw std::invalid_argument(stringify( throw std::invalid_argument(stringify(
@ -258,7 +258,7 @@ namespace llarp
} }
void void
LokidRpcClient::HandleGetPeerStats(lokimq::Message& msg) LokidRpcClient::HandleGetPeerStats(oxenmq::Message& msg)
{ {
LogInfo("Got request for peer stats (size: ", msg.data.size(), ")"); LogInfo("Got request for peer stats (size: ", msg.data.size(), ")");
for (auto str : msg.data) for (auto str : msg.data)
@ -290,7 +290,7 @@ namespace llarp
} }
std::vector<std::string> routerIdStrings; std::vector<std::string> routerIdStrings;
lokimq::bt_deserialize(msg.data[0], routerIdStrings); oxenmq::bt_deserialize(msg.data[0], routerIdStrings);
std::vector<RouterID> routerIds; std::vector<RouterID> routerIds;
routerIds.reserve(routerIdStrings.size()); routerIds.reserve(routerIdStrings.size());

@ -2,8 +2,8 @@
#include <router_id.hpp> #include <router_id.hpp>
#include <lokimq/lokimq.h> #include <oxenmq/oxenmq.h>
#include <lokimq/address.h> #include <oxenmq/address.h>
#include <crypto/types.hpp> #include <crypto/types.hpp>
#include <dht/key.hpp> #include <dht/key.hpp>
#include <service/name.hpp> #include <service/name.hpp>
@ -14,7 +14,7 @@ namespace llarp
namespace rpc namespace rpc
{ {
using LMQ_ptr = std::shared_ptr<lokimq::LokiMQ>; using LMQ_ptr = std::shared_ptr<oxenmq::OxenMQ>;
/// The LokidRpcClient uses loki-mq to talk to make API requests to lokid. /// The LokidRpcClient uses loki-mq to talk to make API requests to lokid.
struct LokidRpcClient : public std::enable_shared_from_this<LokidRpcClient> struct LokidRpcClient : public std::enable_shared_from_this<LokidRpcClient>
@ -23,7 +23,7 @@ namespace llarp
/// Connect to lokid async /// Connect to lokid async
void void
ConnectAsync(lokimq::address url); ConnectAsync(oxenmq::address url);
/// blocking request identity key from lokid /// blocking request identity key from lokid
/// throws on failure /// throws on failure
@ -66,9 +66,9 @@ namespace llarp
// Handles request from lokid for peer stats on a specific peer // Handles request from lokid for peer stats on a specific peer
void void
HandleGetPeerStats(lokimq::Message& msg); HandleGetPeerStats(oxenmq::Message& msg);
std::optional<lokimq::ConnectionID> m_Connection; std::optional<oxenmq::ConnectionID> m_Connection;
LMQ_ptr m_lokiMQ; LMQ_ptr m_lokiMQ;
std::string m_CurrentBlockHash; std::string m_CurrentBlockHash;

@ -17,7 +17,7 @@ namespace llarp::rpc
/// maybe parse json from message paramter at index /// maybe parse json from message paramter at index
std::optional<nlohmann::json> std::optional<nlohmann::json>
MaybeParseJSON(const lokimq::Message& msg, size_t index = 0) MaybeParseJSON(const oxenmq::Message& msg, size_t index = 0)
{ {
try try
{ {
@ -55,7 +55,7 @@ namespace llarp::rpc
void void
HandleJSONRequest( HandleJSONRequest(
lokimq::Message& msg, std::function<void(nlohmann::json, ReplyFunction_t)> handleRequest) oxenmq::Message& msg, std::function<void(nlohmann::json, ReplyFunction_t)> handleRequest)
{ {
const auto maybe = MaybeParseJSON(msg); const auto maybe = MaybeParseJSON(msg);
if (not maybe.has_value()) if (not maybe.has_value())
@ -70,10 +70,8 @@ namespace llarp::rpc
} }
try try
{ {
std::promise<std::string> reply; handleRequest(
handleRequest(*maybe, [&reply](std::string result) { reply.set_value(result); }); *maybe, [defer = msg.send_later()](std::string result) { defer.reply(result); });
auto ftr = reply.get_future();
msg.send_reply(ftr.get());
} }
catch (std::exception& ex) catch (std::exception& ex)
{ {
@ -82,13 +80,13 @@ namespace llarp::rpc
} }
void void
RpcServer::AsyncServeRPC(lokimq::address url) RpcServer::AsyncServeRPC(oxenmq::address url)
{ {
m_LMQ->listen_plain(url.zmq_address()); m_LMQ->listen_plain(url.zmq_address());
m_LMQ->add_category("llarp", lokimq::AuthLevel::none) m_LMQ->add_category("llarp", oxenmq::AuthLevel::none)
.add_command( .add_command(
"halt", "halt",
[&](lokimq::Message& msg) { [&](oxenmq::Message& msg) {
if (not m_Router->IsRunning()) if (not m_Router->IsRunning())
{ {
msg.send_reply(CreateJSONError("router is not running")); msg.send_reply(CreateJSONError("router is not running"));
@ -99,25 +97,30 @@ namespace llarp::rpc
}) })
.add_request_command( .add_request_command(
"version", "version",
[r = m_Router](lokimq::Message& msg) { [r = m_Router](oxenmq::Message& msg) {
util::StatusObject result{{"version", llarp::VERSION_FULL}, util::StatusObject result{{"version", llarp::VERSION_FULL},
{"uptime", to_json(r->Uptime())}}; {"uptime", to_json(r->Uptime())}};
msg.send_reply(CreateJSONResponse(result)); msg.send_reply(CreateJSONResponse(result));
}) })
.add_request_command( .add_request_command(
"status", "status",
[&](lokimq::Message& msg) { [&](oxenmq::Message& msg) {
std::promise<util::StatusObject> result; LogicCall(m_Router->logic(), [defer = msg.send_later(), r = m_Router]() {
LogicCall(m_Router->logic(), [&result, r = m_Router]() { std::string data;
const auto state = r->ExtractStatus(); if (r->IsRunning())
result.set_value(state); {
data = CreateJSONResponse(r->ExtractStatus());
}
else
{
data = CreateJSONError("router not yet ready");
}
defer.reply(data);
}); });
auto ftr = result.get_future();
msg.send_reply(CreateJSONResponse(ftr.get()));
}) })
.add_request_command( .add_request_command(
"exit", "exit",
[&](lokimq::Message& msg) { [&](oxenmq::Message& msg) {
HandleJSONRequest(msg, [r = m_Router](nlohmann::json obj, ReplyFunction_t reply) { HandleJSONRequest(msg, [r = m_Router](nlohmann::json obj, ReplyFunction_t reply) {
if (r->IsServiceNode()) if (r->IsServiceNode())
{ {
@ -261,7 +264,7 @@ namespace llarp::rpc
}); });
}); });
}) })
.add_request_command("config", [&](lokimq::Message& msg) { .add_request_command("config", [&](oxenmq::Message& msg) {
HandleJSONRequest(msg, [r = m_Router](nlohmann::json obj, ReplyFunction_t reply) { HandleJSONRequest(msg, [r = m_Router](nlohmann::json obj, ReplyFunction_t reply) {
{ {
const auto itr = obj.find("override"); const auto itr = obj.find("override");

@ -1,8 +1,8 @@
#pragma once #pragma once
#include <string_view> #include <string_view>
#include <lokimq/lokimq.h> #include <oxenmq/oxenmq.h>
#include <lokimq/address.h> #include <oxenmq/address.h>
namespace llarp namespace llarp
{ {
@ -11,14 +11,14 @@ namespace llarp
namespace llarp::rpc namespace llarp::rpc
{ {
using LMQ_ptr = std::shared_ptr<lokimq::LokiMQ>; using LMQ_ptr = std::shared_ptr<oxenmq::OxenMQ>;
struct RpcServer struct RpcServer
{ {
explicit RpcServer(LMQ_ptr, AbstractRouter*); explicit RpcServer(LMQ_ptr, AbstractRouter*);
~RpcServer() = default; ~RpcServer() = default;
void void
AsyncServeRPC(lokimq::address addr); AsyncServeRPC(oxenmq::address addr);
private: private:
LMQ_ptr m_LMQ; LMQ_ptr m_LMQ;

@ -1,77 +1,73 @@
#include <service/address.hpp> #include <service/address.hpp>
#include <crypto/crypto.hpp> #include <crypto/crypto.hpp>
#include <lokimq/base32z.h> #include <oxenmq/base32z.h>
#include <algorithm> #include <algorithm>
namespace llarp namespace llarp::service
{ {
namespace service const std::set<std::string> Address::AllowedTLDs = {".loki", ".snode"};
bool
Address::PermitTLD(const char* tld)
{ {
const std::set<std::string> Address::AllowedTLDs = {".loki", ".snode"}; std::string gtld(tld);
std::transform(gtld.begin(), gtld.end(), gtld.begin(), ::tolower);
return AllowedTLDs.count(gtld) != 0;
}
bool std::string
Address::PermitTLD(const char* tld) Address::ToString(const char* tld) const
{
if (!PermitTLD(tld))
return "";
std::string str;
if (subdomain.size())
{ {
std::string gtld(tld); str = subdomain;
std::transform(gtld.begin(), gtld.end(), gtld.begin(), ::tolower); str += '.';
return AllowedTLDs.count(gtld) != 0;
} }
str += oxenmq::to_base32z(begin(), end());
str += tld;
return str;
}
std::string bool
Address::ToString(const char* tld) const Address::FromString(std::string_view str, const char* tld)
{ {
if (!PermitTLD(tld)) if (!PermitTLD(tld))
return ""; return false;
std::string str; // Find, validate, and remove the .tld
if (subdomain.size()) const auto pos = str.find_last_of('.');
{ if (pos == std::string::npos)
str = subdomain; return false;
str += '.'; if (str.substr(pos) != tld)
} return false;
str += lokimq::to_base32z(begin(), end()); str = str.substr(0, pos);
str += tld;
return str;
}
bool // copy subdomains if they are there (and strip them off)
Address::FromString(std::string_view str, const char* tld) const auto idx = str.find_last_of('.');
if (idx != std::string::npos)
{ {
if (!PermitTLD(tld)) subdomain = str.substr(0, idx);
return false; str.remove_prefix(idx + 1);
// Find, validate, and remove the .tld
const auto pos = str.find_last_of('.');
if (pos == std::string::npos)
return false;
if (str.substr(pos) != tld)
return false;
str = str.substr(0, pos);
// copy subdomains if they are there (and strip them off)
const auto idx = str.find_last_of('.');
if (idx != std::string::npos)
{
subdomain = str.substr(0, idx);
str.remove_prefix(idx + 1);
}
// Ensure we have something valid:
// - must end in a 1-bit value: 'o' or 'y' (i.e. 10000 or 00000)
// - must have 51 preceeding base32z chars
// - thus we get 51*5+1 = 256 bits = 32 bytes of output
if (str.size() != 52 || !lokimq::is_base32z(str) || !(str.back() == 'o' || str.back() == 'y'))
return false;
lokimq::from_base32z(str.begin(), str.end(), begin());
return true;
} }
dht::Key_t // Ensure we have something valid:
Address::ToKey() const // - must end in a 1-bit value: 'o' or 'y' (i.e. 10000 or 00000)
{ // - must have 51 preceeding base32z chars
PubKey k; // - thus we get 51*5+1 = 256 bits = 32 bytes of output
CryptoManager::instance()->derive_subkey(k, PubKey(data()), 1); if (str.size() != 52 || !oxenmq::is_base32z(str) || !(str.back() == 'o' || str.back() == 'y'))
return dht::Key_t{k.as_array()}; return false;
}
oxenmq::from_base32z(str.begin(), str.end(), begin());
return true;
}
} // namespace service dht::Key_t
} // namespace llarp Address::ToKey() const
{
PubKey k;
CryptoManager::instance()->derive_subkey(k, PubKey(data()), 1);
return dht::Key_t{k.as_array()};
}
} // namespace llarp::service

@ -647,24 +647,65 @@ namespace llarp
} }
} }
bool std::optional<std::vector<RouterContact>>
Endpoint::SelectHop( Endpoint::GetHopsForBuild()
llarp_nodedb* db, {
const std::set<RouterID>& prev, std::unordered_set<RouterID> exclude;
RouterContact& cur, ForEachPath([&exclude](auto path) { exclude.insert(path->Endpoint()); });
size_t hop, const auto maybe = m_router->nodedb()->GetRandom(
path::PathRole roles) [exclude](const auto& rc) -> bool { return exclude.count(rc.pubkey) == 0; });
if (not maybe.has_value())
return std::nullopt;
return GetHopsForBuildWithEndpoint(maybe->pubkey);
}
std::optional<std::vector<RouterContact>>
Endpoint::GetHopsForBuildWithEndpoint(RouterID endpoint)
{ {
std::set<RouterID> exclude = prev; std::vector<RouterContact> hops;
for (const auto& snode : SnodeBlacklist()) // get first hop
exclude.insert(snode); if (const auto maybe = SelectFirstHop(); maybe.has_value())
if (hop == numHops - 1 and numHops > 1)
{ {
// diversify endpoints hops.emplace_back(*maybe);
ForEachPath([&exclude](const path::Path_ptr& path) { exclude.insert(path->Endpoint()); });
} }
return path::Builder::SelectHop(db, exclude, cur, hop, roles); else
return std::nullopt;
auto filter =
[endpoint, &hops, blacklist = SnodeBlacklist(), r = m_router](const auto& rc) -> bool {
if (blacklist.count(rc.pubkey) > 0)
return false;
if (r->routerProfiling().IsBadForPath(rc.pubkey))
return false;
for (const auto& hop : hops)
{
if (hop.pubkey == rc.pubkey)
return false;
}
return endpoint != rc.pubkey;
};
for (size_t idx = hops.size(); idx < numHops; ++idx)
{
if (idx + 1 == numHops)
{
if (const auto maybe = m_router->nodedb()->Get(endpoint))
{
hops.emplace_back(*maybe);
}
else
return std::nullopt;
}
else if (const auto maybe = m_router->nodedb()->GetRandom(filter))
{
hops.emplace_back(*maybe);
}
else
return std::nullopt;
}
return hops;
} }
void void
@ -712,19 +753,18 @@ namespace llarp
} }
void void
Endpoint::HandleVerifyGotRouter(dht::GotRouterMessage_constptr msg, llarp_async_verify_rc* j) Endpoint::HandleVerifyGotRouter(dht::GotRouterMessage_constptr msg, RouterID id, bool valid)
{ {
auto& pendingRouters = m_state->m_PendingRouters; auto& pendingRouters = m_state->m_PendingRouters;
auto itr = pendingRouters.find(j->rc.pubkey); auto itr = pendingRouters.find(id);
if (itr != pendingRouters.end()) if (itr != pendingRouters.end())
{ {
if (j->valid) if (valid)
itr->second.InformResult(msg->foundRCs); itr->second.InformResult(msg->foundRCs);
else else
itr->second.InformResult({}); itr->second.InformResult({});
pendingRouters.erase(itr); pendingRouters.erase(itr);
} }
delete j;
} }
bool bool
@ -732,16 +772,15 @@ namespace llarp
{ {
if (not msg->foundRCs.empty()) if (not msg->foundRCs.empty())
{ {
for (const auto& rc : msg->foundRCs) for (auto rc : msg->foundRCs)
{ {
llarp_async_verify_rc* job = new llarp_async_verify_rc(); Router()->QueueWork([rc = std::move(rc), logic = Router()->logic(), self = this, msg]() {
job->nodedb = Router()->nodedb(); bool valid = rc.Verify(llarp::time_now_ms());
job->worker = util::memFn(&AbstractRouter::QueueWork, Router()); LogicCall(logic, [self, valid, rc = std::move(rc), msg]() {
job->disk = util::memFn(&AbstractRouter::QueueDiskIO, Router()); self->Router()->nodedb()->PutIfNewer(rc);
job->logic = Router()->logic(); self->HandleVerifyGotRouter(msg, rc.pubkey, valid);
job->hook = std::bind(&Endpoint::HandleVerifyGotRouter, this, msg, std::placeholders::_1); });
job->rc = rc; });
llarp_nodedb_async_verify(job);
} }
} }
else else

@ -28,8 +28,6 @@
#define MIN_SHIFT_INTERVAL 5s #define MIN_SHIFT_INTERVAL 5s
#endif #endif
struct llarp_async_verify_rc;
namespace llarp namespace llarp
{ {
namespace service namespace service
@ -364,13 +362,11 @@ namespace llarp
bool bool
HasExit() const; HasExit() const;
bool std::optional<std::vector<RouterContact>>
SelectHop( GetHopsForBuild() override;
llarp_nodedb* db,
const std::set<RouterID>& prev, std::optional<std::vector<RouterContact>>
RouterContact& cur, GetHopsForBuildWithEndpoint(RouterID endpoint);
size_t hop,
path::PathRole roles) override;
virtual void virtual void
PathBuildStarted(path::Path_ptr path) override; PathBuildStarted(path::Path_ptr path) override;
@ -436,7 +432,7 @@ namespace llarp
private: private:
void void
HandleVerifyGotRouter(dht::GotRouterMessage_constptr msg, llarp_async_verify_rc* j); HandleVerifyGotRouter(dht::GotRouterMessage_constptr msg, RouterID id, bool valid);
bool bool
OnLookup(const service::Address& addr, std::optional<IntroSet> i, const RouterID& endpoint); OnLookup(const service::Address& addr, std::optional<IntroSet> i, const RouterID& endpoint);

@ -2,393 +2,389 @@
#include <crypto/crypto.hpp> #include <crypto/crypto.hpp>
#include <path/path.hpp> #include <path/path.hpp>
#include <lokimq/bt_serialize.h> #include <oxenmq/bt_serialize.h>
namespace llarp namespace llarp::service
{ {
namespace service util::StatusObject
EncryptedIntroSet::ExtractStatus() const
{ {
util::StatusObject const auto sz = introsetPayload.size();
EncryptedIntroSet::ExtractStatus() const return {
{ {"location", derivedSigningKey.ToString()}, {"signedAt", to_json(signedAt)}, {"size", sz}};
const auto sz = introsetPayload.size(); }
return {{"location", derivedSigningKey.ToString()},
{"signedAt", to_json(signedAt)}, bool
{"size", sz}}; EncryptedIntroSet::BEncode(llarp_buffer_t* buf) const
} {
if (not bencode_start_dict(buf))
return false;
if (not BEncodeWriteDictEntry("d", derivedSigningKey, buf))
return false;
if (not BEncodeWriteDictEntry("n", nounce, buf))
return false;
if (not BEncodeWriteDictInt("s", signedAt.count(), buf))
return false;
if (not bencode_write_bytestring(buf, "x", 1))
return false;
if (not bencode_write_bytestring(buf, introsetPayload.data(), introsetPayload.size()))
return false;
if (not BEncodeWriteDictEntry("z", sig, buf))
return false;
return bencode_end(buf);
}
bool bool
EncryptedIntroSet::BEncode(llarp_buffer_t* buf) const EncryptedIntroSet::DecodeKey(const llarp_buffer_t& key, llarp_buffer_t* buf)
{
bool read = false;
if (key == "x")
{ {
if (not bencode_start_dict(buf)) llarp_buffer_t strbuf;
if (not bencode_read_string(buf, &strbuf))
return false; return false;
if (not BEncodeWriteDictEntry("d", derivedSigningKey, buf)) if (strbuf.sz > MAX_INTROSET_SIZE)
return false; return false;
if (not BEncodeWriteDictEntry("n", nounce, buf)) introsetPayload.resize(strbuf.sz);
return false; std::copy_n(strbuf.base, strbuf.sz, introsetPayload.data());
if (not BEncodeWriteDictInt("s", signedAt.count(), buf)) return true;
return false;
if (not bencode_write_bytestring(buf, "x", 1))
return false;
if (not bencode_write_bytestring(buf, introsetPayload.data(), introsetPayload.size()))
return false;
if (not BEncodeWriteDictEntry("z", sig, buf))
return false;
return bencode_end(buf);
} }
if (not BEncodeMaybeReadDictEntry("d", derivedSigningKey, read, key, buf))
return false;
bool if (not BEncodeMaybeReadDictEntry("n", nounce, read, key, buf))
EncryptedIntroSet::DecodeKey(const llarp_buffer_t& key, llarp_buffer_t* buf) return false;
{
bool read = false;
if (key == "x")
{
llarp_buffer_t strbuf;
if (not bencode_read_string(buf, &strbuf))
return false;
if (strbuf.sz > MAX_INTROSET_SIZE)
return false;
introsetPayload.resize(strbuf.sz);
std::copy_n(strbuf.base, strbuf.sz, introsetPayload.data());
return true;
}
if (not BEncodeMaybeReadDictEntry("d", derivedSigningKey, read, key, buf))
return false;
if (not BEncodeMaybeReadDictEntry("n", nounce, read, key, buf)) if (not BEncodeMaybeReadDictInt("s", signedAt, read, key, buf))
return false; return false;
if (not BEncodeMaybeReadDictInt("s", signedAt, read, key, buf)) if (not BEncodeMaybeReadDictEntry("z", sig, read, key, buf))
return false; return false;
return read;
}
if (not BEncodeMaybeReadDictEntry("z", sig, read, key, buf)) bool
return false; EncryptedIntroSet::OtherIsNewer(const EncryptedIntroSet& other) const
return read; {
} return signedAt < other.signedAt;
}
bool std::ostream&
EncryptedIntroSet::OtherIsNewer(const EncryptedIntroSet& other) const EncryptedIntroSet::print(std::ostream& out, int levels, int spaces) const
{ {
return signedAt < other.signedAt; Printer printer(out, levels, spaces);
} printer.printAttribute("d", derivedSigningKey);
printer.printAttribute("n", nounce);
printer.printAttribute("s", signedAt.count());
printer.printAttribute("x", "[" + std::to_string(introsetPayload.size()) + " bytes]");
printer.printAttribute("z", sig);
return out;
}
std::optional<IntroSet>
EncryptedIntroSet::MaybeDecrypt(const PubKey& root) const
{
SharedSecret k(root);
IntroSet i;
std::vector<byte_t> payload = introsetPayload;
llarp_buffer_t buf(payload);
CryptoManager::instance()->xchacha20(buf, k, nounce);
if (not i.BDecode(&buf))
return {};
return i;
}
bool
EncryptedIntroSet::IsExpired(llarp_time_t now) const
{
return now >= signedAt + path::default_lifetime;
}
std::ostream& bool
EncryptedIntroSet::print(std::ostream& out, int levels, int spaces) const EncryptedIntroSet::Sign(const PrivateKey& k)
{ {
Printer printer(out, levels, spaces); signedAt = llarp::time_now_ms();
printer.printAttribute("d", derivedSigningKey); if (not k.toPublic(derivedSigningKey))
printer.printAttribute("n", nounce); return false;
printer.printAttribute("s", signedAt.count()); sig.Zero();
printer.printAttribute("x", "[" + std::to_string(introsetPayload.size()) + " bytes]"); std::array<byte_t, MAX_INTROSET_SIZE + 128> tmp;
printer.printAttribute("z", sig); llarp_buffer_t buf(tmp);
return out; if (not BEncode(&buf))
} return false;
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
if (not CryptoManager::instance()->sign(sig, k, buf))
return false;
LogDebug("signed encrypted introset: ", *this);
return true;
}
std::optional<IntroSet> bool
EncryptedIntroSet::MaybeDecrypt(const PubKey& root) const EncryptedIntroSet::Verify(llarp_time_t now) const
{ {
SharedSecret k(root); if (IsExpired(now))
IntroSet i; return false;
std::vector<byte_t> payload = introsetPayload; std::array<byte_t, MAX_INTROSET_SIZE + 128> tmp;
llarp_buffer_t buf(payload); llarp_buffer_t buf(tmp);
CryptoManager::instance()->xchacha20(buf, k, nounce); EncryptedIntroSet copy(*this);
if (not i.BDecode(&buf)) copy.sig.Zero();
return {}; if (not copy.BEncode(&buf))
return i; return false;
} LogDebug("verify encrypted introset: ", copy, " sig = ", sig);
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
return CryptoManager::instance()->verify(derivedSigningKey, buf, sig);
}
util::StatusObject
IntroSet::ExtractStatus() const
{
util::StatusObject obj{{"published", to_json(T)}};
std::vector<util::StatusObject> introsObjs;
std::transform(
I.begin(),
I.end(),
std::back_inserter(introsObjs),
[](const auto& intro) -> util::StatusObject { return intro.ExtractStatus(); });
obj["intros"] = introsObjs;
if (!topic.IsZero())
obj["topic"] = topic.ToString();
return obj;
}
bool
IntroSet::DecodeKey(const llarp_buffer_t& key, llarp_buffer_t* buf)
{
bool read = false;
if (!BEncodeMaybeReadDictEntry("a", A, read, key, buf))
return false;
bool if (key == "i")
EncryptedIntroSet::IsExpired(llarp_time_t now) const
{ {
return now >= signedAt + path::default_lifetime; return BEncodeReadList(I, buf);
} }
if (!BEncodeMaybeReadDictEntry("k", K, read, key, buf))
return false;
bool if (!BEncodeMaybeReadDictEntry("n", topic, read, key, buf))
EncryptedIntroSet::Sign(const PrivateKey& k) return false;
{
signedAt = llarp::time_now_ms();
if (not k.toPublic(derivedSigningKey))
return false;
sig.Zero();
std::array<byte_t, MAX_INTROSET_SIZE + 128> tmp;
llarp_buffer_t buf(tmp);
if (not BEncode(&buf))
return false;
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
if (not CryptoManager::instance()->sign(sig, k, buf))
return false;
LogDebug("signed encrypted introset: ", *this);
return true;
}
bool if (key == "s")
EncryptedIntroSet::Verify(llarp_time_t now) const
{ {
if (IsExpired(now)) byte_t* begin = buf->cur;
return false; if (not bencode_discard(buf))
std::array<byte_t, MAX_INTROSET_SIZE + 128> tmp;
llarp_buffer_t buf(tmp);
EncryptedIntroSet copy(*this);
copy.sig.Zero();
if (not copy.BEncode(&buf))
return false; return false;
LogDebug("verify encrypted introset: ", copy, " sig = ", sig);
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
return CryptoManager::instance()->verify(derivedSigningKey, buf, sig);
}
util::StatusObject byte_t* end = buf->cur;
IntroSet::ExtractStatus() const
{
util::StatusObject obj{{"published", to_json(T)}};
std::vector<util::StatusObject> introsObjs;
std::transform(
I.begin(),
I.end(),
std::back_inserter(introsObjs),
[](const auto& intro) -> util::StatusObject { return intro.ExtractStatus(); });
obj["intros"] = introsObjs;
if (!topic.IsZero())
obj["topic"] = topic.ToString();
return obj;
}
bool std::string_view srvString(reinterpret_cast<char*>(begin), end - begin);
IntroSet::DecodeKey(const llarp_buffer_t& key, llarp_buffer_t* buf)
{
bool read = false;
if (!BEncodeMaybeReadDictEntry("a", A, read, key, buf))
return false;
if (key == "i") try
{ {
return BEncodeReadList(I, buf); oxenmq::bt_deserialize(srvString, SRVs);
} }
if (!BEncodeMaybeReadDictEntry("k", K, read, key, buf)) catch (const oxenmq::bt_deserialize_invalid& err)
{
LogError("Error decoding SRV records from IntroSet: ", err.what());
return false; return false;
}
read = true;
}
if (!BEncodeMaybeReadDictEntry("n", topic, read, key, buf)) if (!BEncodeMaybeReadDictInt("t", T, read, key, buf))
return false; return false;
if (key == "s") if (key == "w")
{ {
byte_t* begin = buf->cur; W.emplace();
if (not bencode_discard(buf)) return bencode_decode_dict(*W, buf);
return false; }
byte_t* end = buf->cur; if (!BEncodeMaybeReadDictInt("v", version, read, key, buf))
return false;
std::string_view srvString(reinterpret_cast<char*>(begin), end - begin); if (!BEncodeMaybeReadDictEntry("z", Z, read, key, buf))
return false;
try if (read)
{ return true;
lokimq::bt_deserialize(srvString, SRVs);
}
catch (const lokimq::bt_deserialize_invalid& err)
{
LogError("Error decoding SRV records from IntroSet: ", err.what());
return false;
}
read = true;
}
if (!BEncodeMaybeReadDictInt("t", T, read, key, buf)) return bencode_discard(buf);
return false; }
if (key == "w") bool
{ IntroSet::BEncode(llarp_buffer_t* buf) const
W.emplace(); {
return bencode_decode_dict(*W, buf); if (!bencode_start_dict(buf))
} return false;
if (!BEncodeWriteDictEntry("a", A, buf))
return false;
// start introduction list
if (!bencode_write_bytestring(buf, "i", 1))
return false;
if (!BEncodeWriteList(I.begin(), I.end(), buf))
return false;
// end introduction list
if (!BEncodeMaybeReadDictInt("v", version, read, key, buf)) // pq pubkey
return false; if (!BEncodeWriteDictEntry("k", K, buf))
return false;
if (!BEncodeMaybeReadDictEntry("z", Z, read, key, buf)) // topic tag
if (topic.ToString().size())
{
if (!BEncodeWriteDictEntry("n", topic, buf))
return false; return false;
if (read)
return true;
return bencode_discard(buf);
} }
bool if (SRVs.size())
IntroSet::BEncode(llarp_buffer_t* buf) const
{ {
if (!bencode_start_dict(buf)) std::string serial = oxenmq::bt_serialize(SRVs);
return false; if (!bencode_write_bytestring(buf, "s", 1))
if (!BEncodeWriteDictEntry("a", A, buf))
return false;
// start introduction list
if (!bencode_write_bytestring(buf, "i", 1))
return false; return false;
if (!BEncodeWriteList(I.begin(), I.end(), buf)) if (!buf->write(serial.begin(), serial.end()))
return false; return false;
// end introduction list }
// Timestamp published
if (!BEncodeWriteDictInt("t", T.count(), buf))
return false;
// pq pubkey // write version
if (!BEncodeWriteDictEntry("k", K, buf)) if (!BEncodeWriteDictInt("v", version, buf))
return false;
if (W)
{
if (!BEncodeWriteDictEntry("w", *W, buf))
return false; return false;
}
if (!BEncodeWriteDictEntry("z", Z, buf))
return false;
// topic tag return bencode_end(buf);
if (topic.ToString().size()) }
{
if (!BEncodeWriteDictEntry("n", topic, buf))
return false;
}
if (SRVs.size()) bool
{ IntroSet::HasExpiredIntros(llarp_time_t now) const
std::string serial = lokimq::bt_serialize(SRVs); {
if (!bencode_write_bytestring(buf, "s", 1)) for (const auto& i : I)
return false; if (now >= i.expiresAt)
if (!buf->write(serial.begin(), serial.end())) return true;
return false; return false;
} }
// Timestamp published bool
if (!BEncodeWriteDictInt("t", T.count(), buf)) IntroSet::IsExpired(llarp_time_t now) const
return false; {
return GetNewestIntroExpiration() < now;
}
// write version std::vector<llarp::dns::SRVData>
if (!BEncodeWriteDictInt("v", version, buf)) IntroSet::GetMatchingSRVRecords(std::string_view service_proto) const
return false; {
if (W) std::vector<llarp::dns::SRVData> records;
for (const auto& tuple : SRVs)
{
if (std::get<0>(tuple) == service_proto)
{ {
if (!BEncodeWriteDictEntry("w", *W, buf)) records.push_back(llarp::dns::SRVData::fromTuple(tuple));
return false;
} }
if (!BEncodeWriteDictEntry("z", Z, buf))
return false;
return bencode_end(buf);
} }
bool return records;
IntroSet::HasExpiredIntros(llarp_time_t now) const }
bool
IntroSet::Verify(llarp_time_t now) const
{
std::array<byte_t, MAX_INTROSET_SIZE> tmp;
llarp_buffer_t buf(tmp);
IntroSet copy;
copy = *this;
copy.Z.Zero();
if (!copy.BEncode(&buf))
{ {
for (const auto& i : I)
if (now >= i.expiresAt)
return true;
return false; return false;
} }
// rewind and resize buffer
bool buf.sz = buf.cur - buf.base;
IntroSet::IsExpired(llarp_time_t now) const buf.cur = buf.base;
if (!A.Verify(buf, Z))
{ {
return GetNewestIntroExpiration() < now; return false;
} }
// validate PoW
std::vector<llarp::dns::SRVData> if (W && !W->IsValid(now))
IntroSet::GetMatchingSRVRecords(std::string_view service_proto) const
{ {
std::vector<llarp::dns::SRVData> records; return false;
for (const auto& tuple : SRVs)
{
if (std::get<0>(tuple) == service_proto)
{
records.push_back(llarp::dns::SRVData::fromTuple(tuple));
}
}
return records;
} }
// valid timestamps
bool // add max clock skew
IntroSet::Verify(llarp_time_t now) const now += MAX_INTROSET_TIME_DELTA;
for (const auto& intro : I)
{ {
std::array<byte_t, MAX_INTROSET_SIZE> tmp; if (intro.expiresAt > now && intro.expiresAt - now > path::default_lifetime)
llarp_buffer_t buf(tmp);
IntroSet copy;
copy = *this;
copy.Z.Zero();
if (!copy.BEncode(&buf))
{
return false;
}
// rewind and resize buffer
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
if (!A.Verify(buf, Z))
{
return false;
}
// validate PoW
if (W && !W->IsValid(now))
{ {
return false; if (!W)
}
// valid timestamps
// add max clock skew
now += MAX_INTROSET_TIME_DELTA;
for (const auto& intro : I)
{
if (intro.expiresAt > now && intro.expiresAt - now > path::default_lifetime)
{ {
if (!W) LogWarn("intro has too high expire time");
{ return false;
LogWarn("intro has too high expire time"); }
return false; if (intro.expiresAt - W->extendedLifetime > path::default_lifetime)
} {
if (intro.expiresAt - W->extendedLifetime > path::default_lifetime) return false;
{
return false;
}
} }
} }
if (IsExpired(now))
{
LogWarn("introset expired: ", *this);
return false;
}
return true;
} }
if (IsExpired(now))
llarp_time_t
IntroSet::GetNewestIntroExpiration() const
{ {
llarp_time_t t = 0s; LogWarn("introset expired: ", *this);
for (const auto& intro : I) return false;
t = std::max(intro.expiresAt, t);
return t;
} }
return true;
}
std::ostream& llarp_time_t
IntroSet::print(std::ostream& stream, int level, int spaces) const IntroSet::GetNewestIntroExpiration() const
{ {
Printer printer(stream, level, spaces); llarp_time_t t = 0s;
printer.printAttribute("A", A); for (const auto& intro : I)
printer.printAttribute("I", I); t = std::max(intro.expiresAt, t);
printer.printAttribute("K", K); return t;
}
std::ostream&
IntroSet::print(std::ostream& stream, int level, int spaces) const
{
Printer printer(stream, level, spaces);
printer.printAttribute("A", A);
printer.printAttribute("I", I);
printer.printAttribute("K", K);
std::string _topic = topic.ToString(); std::string _topic = topic.ToString();
if (!_topic.empty()) if (!_topic.empty())
{ {
printer.printAttribute("topic", _topic); printer.printAttribute("topic", _topic);
} }
else else
{ {
printer.printAttribute("topic", topic); printer.printAttribute("topic", topic);
} }
printer.printAttribute("T", T.count());
if (W)
{
printer.printAttribute("W", *W);
}
else
{
printer.printAttribute("W", "NULL");
}
printer.printAttribute("V", version);
printer.printAttribute("Z", Z);
return stream; printer.printAttribute("T", T.count());
if (W)
{
printer.printAttribute("W", *W);
} }
} // namespace service else
} // namespace llarp {
printer.printAttribute("W", "NULL");
}
printer.printAttribute("V", version);
printer.printAttribute("Z", Z);
return stream;
}
} // namespace llarp::service

@ -334,33 +334,16 @@ namespace llarp
: (now >= createdAt && now - createdAt > connectTimeout); : (now >= createdAt && now - createdAt > connectTimeout);
} }
bool std::optional<std::vector<RouterContact>>
OutboundContext::SelectHop( OutboundContext::GetHopsForBuild()
llarp_nodedb* db,
const std::set<RouterID>& prev,
RouterContact& cur,
size_t hop,
path::PathRole roles)
{ {
if (m_NextIntro.router.IsZero() || prev.count(m_NextIntro.router)) if (m_NextIntro.router.IsZero())
{ {
ShiftIntroduction(false); ShiftIntroduction(false);
} }
if (m_NextIntro.router.IsZero()) if (m_NextIntro.router.IsZero())
return false; return std::nullopt;
std::set<RouterID> exclude = prev; return GetHopsAlignedToForBuild(m_NextIntro.router);
exclude.insert(m_NextIntro.router);
for (const auto& snode : m_Endpoint->SnodeBlacklist())
exclude.insert(snode);
if (hop == numHops - 1)
{
m_Endpoint->EnsureRouterIsKnown(m_NextIntro.router);
if (db->Get(m_NextIntro.router, cur))
return true;
++m_BuildFails;
return false;
}
return path::Builder::SelectHop(db, exclude, cur, hop, roles);
} }
bool bool

@ -106,13 +106,8 @@ namespace llarp
void void
HandlePathBuildFailed(path::Path_ptr path) override; HandlePathBuildFailed(path::Path_ptr path) override;
bool std::optional<std::vector<RouterContact>>
SelectHop( GetHopsForBuild() override;
llarp_nodedb* db,
const std::set<RouterID>& prev,
RouterContact& cur,
size_t hop,
path::PathRole roles) override;
bool bool
HandleHiddenServiceFrame(path::Path_ptr p, const ProtocolFrame& frame); HandleHiddenServiceFrame(path::Path_ptr p, const ProtocolFrame& frame);

@ -7,10 +7,10 @@ namespace tooling
HiveContext::HiveContext(RouterHive* hive) : m_hive(hive) HiveContext::HiveContext(RouterHive* hive) : m_hive(hive)
{} {}
std::unique_ptr<llarp::AbstractRouter> std::shared_ptr<llarp::AbstractRouter>
HiveContext::makeRouter(llarp_ev_loop_ptr netloop, std::shared_ptr<llarp::Logic> logic) HiveContext::makeRouter(llarp_ev_loop_ptr netloop, std::shared_ptr<llarp::Logic> logic)
{ {
return std::make_unique<HiveRouter>(netloop, logic, makeVPNPlatform(), m_hive); return std::make_shared<HiveRouter>(netloop, logic, makeVPNPlatform(), m_hive);
} }
HiveRouter* HiveRouter*

@ -11,7 +11,7 @@ namespace tooling
{ {
HiveContext(RouterHive* hive); HiveContext(RouterHive* hive);
std::unique_ptr<llarp::AbstractRouter> std::shared_ptr<llarp::AbstractRouter>
makeRouter(llarp_ev_loop_ptr netloop, std::shared_ptr<llarp::Logic> logic) override; makeRouter(llarp_ev_loop_ptr netloop, std::shared_ptr<llarp::Logic> logic) override;
/// Get this context's router as a HiveRouter. /// Get this context's router as a HiveRouter.

@ -7,9 +7,9 @@ namespace tooling
HiveRouter::HiveRouter( HiveRouter::HiveRouter(
llarp_ev_loop_ptr netloop, llarp_ev_loop_ptr netloop,
std::shared_ptr<llarp::Logic> logic, std::shared_ptr<llarp::Logic> logic,
std::unique_ptr<llarp::vpn::Platform> plat, std::shared_ptr<llarp::vpn::Platform> plat,
RouterHive* hive) RouterHive* hive)
: Router(netloop, logic, std::move(plat)), m_hive(hive) : Router(netloop, logic, plat), m_hive(hive)
{} {}
bool bool

@ -13,7 +13,7 @@ namespace tooling
explicit HiveRouter( explicit HiveRouter(
llarp_ev_loop_ptr netloop, llarp_ev_loop_ptr netloop,
std::shared_ptr<llarp::Logic> logic, std::shared_ptr<llarp::Logic> logic,
std::unique_ptr<llarp::vpn::Platform> vpnPlatform, std::shared_ptr<llarp::vpn::Platform> vpnPlatform,
RouterHive* hive); RouterHive* hive);
virtual ~HiveRouter() = default; virtual ~HiveRouter() = default;

@ -23,7 +23,7 @@ namespace tooling
opts.isRouter = isRouter; opts.isRouter = isRouter;
Context_ptr context = std::make_shared<HiveContext>(this); Context_ptr context = std::make_shared<HiveContext>(this);
context->Configure(*config); context->Configure(config);
context->Setup(opts); context->Setup(opts);
auto routerId = llarp::RouterID(context->router->pubkey()); auto routerId = llarp::RouterID(context->router->pubkey());

@ -6,7 +6,7 @@
#include <util/meta/traits.hpp> #include <util/meta/traits.hpp>
#include <util/printer.hpp> #include <util/printer.hpp>
#include <lokimq/hex.h> #include <oxenmq/hex.h>
#include <array> #include <array>
#include <cstddef> #include <cstddef>
@ -71,7 +71,7 @@ namespace llarp
friend std::ostream& friend std::ostream&
operator<<(std::ostream& out, const AlignedBuffer& self) operator<<(std::ostream& out, const AlignedBuffer& self)
{ {
return out << lokimq::to_hex(self.begin(), self.end()); return out << oxenmq::to_hex(self.begin(), self.end());
} }
/// bitwise NOT /// bitwise NOT
@ -261,21 +261,21 @@ namespace llarp
std::string std::string
ToHex() const ToHex() const
{ {
return lokimq::to_hex(begin(), end()); return oxenmq::to_hex(begin(), end());
} }
std::string std::string
ShortHex() const ShortHex() const
{ {
return lokimq::to_hex(begin(), begin() + 4); return oxenmq::to_hex(begin(), begin() + 4);
} }
bool bool
FromHex(std::string_view str) FromHex(std::string_view str)
{ {
if (str.size() != 2 * size() || !lokimq::is_hex(str)) if (str.size() != 2 * size() || !oxenmq::is_hex(str))
return false; return false;
lokimq::from_hex(str.begin(), str.end(), begin()); oxenmq::from_hex(str.begin(), str.end(), begin());
return true; return true;
} }

@ -14,23 +14,23 @@
namespace llarp::vpn namespace llarp::vpn
{ {
std::unique_ptr<Platform> std::shared_ptr<Platform>
MakeNativePlatform(llarp::Context* ctx) MakeNativePlatform(llarp::Context* ctx)
{ {
(void)ctx; (void)ctx;
std::unique_ptr<Platform> plat; std::shared_ptr<Platform> plat;
#ifdef _WIN32 #ifdef _WIN32
plat = std::make_unique<vpn::Win32Platform>(); plat = std::make_shared<vpn::Win32Platform>();
#endif #endif
#ifdef __linux__ #ifdef __linux__
#ifdef ANDROID #ifdef ANDROID
plat = std::make_unique<vpn::AndroidPlatform>(); plat = std::make_shared<vpn::AndroidPlatform>();
#else #else
plat = std::make_unique<vpn::LinuxPlatform>(); plat = std::make_shared<vpn::LinuxPlatform>();
#endif #endif
#endif #endif
#ifdef __APPLE__ #ifdef __APPLE__
plat = std::make_unique<vpn::ApplePlatform>(); plat = std::make_shared<vpn::ApplePlatform>();
#endif #endif
return plat; return plat;
} }

@ -121,7 +121,7 @@ namespace llarp::vpn
{ {
std::atomic<bool> m_Run; std::atomic<bool> m_Run;
HANDLE m_Device, m_IOCP; HANDLE m_Device, m_IOCP;
std::vector<HANDLE> m_Threads; std::vector<std::thread> m_Threads;
thread::Queue<net::IPPacket> m_ReadQueue; thread::Queue<net::IPPacket> m_ReadQueue;
const InterfaceInfo m_Info; const InterfaceInfo m_Info;
@ -334,7 +334,7 @@ namespace llarp::vpn
CloseHandle(m_Device); CloseHandle(m_Device);
// close the reader threads // close the reader threads
for (auto& thread : m_Threads) for (auto& thread : m_Threads)
CloseHandle(thread); thread.join();
} }
int int
@ -355,21 +355,22 @@ namespace llarp::vpn
return ""; return "";
} }
static DWORD FAR PASCAL
Loop(void* u)
{
static_cast<Win32Interface*>(u)->ReadLoop();
return 0;
}
void void
Start() Start()
{ {
m_Run = true; m_Run = true;
const auto numThreads = std::thread::hardware_concurrency(); const auto numThreads = std::thread::hardware_concurrency();
m_IOCP = CreateIoCompletionPort(m_Device, nullptr, (ULONG_PTR)this, 1 + numThreads); // allocate packets
for (size_t idx = 0; idx < numThreads; ++idx) for (size_t idx = 0; idx < numThreads; ++idx)
m_Threads.push_back(CreateThread(nullptr, 0, &Loop, this, 0, nullptr)); m_Packets.emplace_back(new asio_evt_pkt{true});
// create completion port
m_IOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 0);
// attach the handle or some shit
CreateIoCompletionPort(m_Device, m_IOCP, 0, 0);
// spawn reader threads
for (size_t idx = 0; idx < numThreads; ++idx)
m_Threads.emplace_back([this, idx]() { ReadLoop(idx); });
} }
net::IPPacket net::IPPacket
@ -394,6 +395,8 @@ namespace llarp::vpn
} }
}; };
std::vector<std::unique_ptr<asio_evt_pkt>> m_Packets;
bool bool
WritePacket(net::IPPacket pkt) WritePacket(net::IPPacket pkt)
{ {
@ -406,9 +409,9 @@ namespace llarp::vpn
} }
void void
ReadLoop() ReadLoop(size_t packetIndex)
{ {
std::unique_ptr<asio_evt_pkt> ev = std::make_unique<asio_evt_pkt>(true); auto& ev = m_Packets[packetIndex];
ev->Read(m_Device); ev->Read(m_Device);
while (m_Run) while (m_Run)
{ {

@ -99,7 +99,7 @@ namespace llarp
.def_property( .def_property(
"lokidRPCAddr", "lokidRPCAddr",
[](LokidConfig& self) { return self.lokidRPCAddr.full_address().c_str(); }, [](LokidConfig& self) { return self.lokidRPCAddr.full_address().c_str(); },
[](LokidConfig& self, std::string arg) { self.lokidRPCAddr = lokimq::address(arg); }); [](LokidConfig& self, std::string arg) { self.lokidRPCAddr = oxenmq::address(arg); });
py::class_<BootstrapConfig>(mod, "BootstrapConfig") py::class_<BootstrapConfig>(mod, "BootstrapConfig")
.def(py::init<>()) .def(py::init<>())

@ -11,9 +11,9 @@ namespace llarp
.def( .def(
"FromHex", "FromHex",
[](RouterID* r, const std::string& hex) { [](RouterID* r, const std::string& hex) {
if (hex.size() != 2 * r->size() || !lokimq::is_hex(hex)) if (hex.size() != 2 * r->size() || !oxenmq::is_hex(hex))
throw std::runtime_error("FromHex requires a 64-digit hex string"); throw std::runtime_error("FromHex requires a 64-digit hex string");
lokimq::from_hex(hex.begin(), hex.end(), r->data()); oxenmq::from_hex(hex.begin(), hex.end(), r->data());
}) })
.def("__repr__", &RouterID::ToString) .def("__repr__", &RouterID::ToString)
.def("__str__", &RouterID::ToString) .def("__str__", &RouterID::ToString)

@ -11,18 +11,18 @@ static const llarp::RuntimeOptions opts = {.background = false, .debug = false,
std::shared_ptr<llarp::Context> std::shared_ptr<llarp::Context>
make_context() make_context()
{ {
llarp::Config conf{fs::current_path()}; auto conf = std::make_shared<llarp::Config>(fs::current_path());
conf.Load(std::nullopt, true); conf->Load(std::nullopt, true);
// set testing defaults // set testing defaults
conf.network.m_endpointType = "null"; conf->network.m_endpointType = "null";
conf.bootstrap.seednode = true; conf->bootstrap.seednode = true;
conf.api.m_enableRPCServer = false; conf->api.m_enableRPCServer = false;
conf.lokid.whitelistRouters = false; conf->lokid.whitelistRouters = false;
conf.router.m_publicAddress = llarp::IpAddress("1.1.1.1"); conf->router.m_publicAddress = llarp::IpAddress("1.1.1.1");
// make a fake inbound link // make a fake inbound link
conf.links.m_InboundLinks.emplace_back(); conf->links.m_InboundLinks.emplace_back();
auto& link = conf.links.m_InboundLinks.back(); auto& link = conf->links.m_InboundLinks.back();
link.interface = "0.0.0.0"; link.interface = "0.0.0.0";
link.addressFamily = AF_INET; link.addressFamily = AF_INET;
link.port = 0; link.port = 0;

@ -4,23 +4,25 @@
#include <router_contact.hpp> #include <router_contact.hpp>
#include <nodedb.hpp> #include <nodedb.hpp>
using llarp_nodedb = llarp::NodeDB;
TEST_CASE("FindClosestTo returns correct number of elements", "[nodedb][dht]") TEST_CASE("FindClosestTo returns correct number of elements", "[nodedb][dht]")
{ {
llarp_nodedb nodeDB("", nullptr); llarp_nodedb nodeDB{fs::current_path(), nullptr};
constexpr uint64_t numRCs = 3; constexpr uint64_t numRCs = 3;
for (uint64_t i = 0; i < numRCs; ++i) for (uint64_t i = 0; i < numRCs; ++i)
{ {
llarp::RouterContact rc; llarp::RouterContact rc;
rc.pubkey[0] = i; rc.pubkey[0] = i;
nodeDB.Insert(rc); nodeDB.Put(rc);
} }
REQUIRE(numRCs == nodeDB.num_loaded()); REQUIRE(numRCs == nodeDB.NumLoaded());
llarp::dht::Key_t key; llarp::dht::Key_t key;
std::vector<llarp::RouterContact> results = nodeDB.FindClosestTo(key, 4); std::vector<llarp::RouterContact> results = nodeDB.FindManyClosestTo(key, 4);
// we asked for more entries than nodedb had // we asked for more entries than nodedb had
REQUIRE(numRCs == results.size()); REQUIRE(numRCs == results.size());
@ -28,26 +30,26 @@ TEST_CASE("FindClosestTo returns correct number of elements", "[nodedb][dht]")
TEST_CASE("FindClosestTo returns properly ordered set", "[nodedb][dht]") TEST_CASE("FindClosestTo returns properly ordered set", "[nodedb][dht]")
{ {
llarp_nodedb nodeDB("", nullptr); llarp_nodedb nodeDB{fs::current_path(), nullptr};
// insert some RCs: a < b < c // insert some RCs: a < b < c
llarp::RouterContact a; llarp::RouterContact a;
a.pubkey[0] = 1; a.pubkey[0] = 1;
nodeDB.Insert(a); nodeDB.Put(a);
llarp::RouterContact b; llarp::RouterContact b;
b.pubkey[0] = 2; b.pubkey[0] = 2;
nodeDB.Insert(b); nodeDB.Put(b);
llarp::RouterContact c; llarp::RouterContact c;
c.pubkey[0] = 3; c.pubkey[0] = 3;
nodeDB.Insert(c); nodeDB.Put(c);
REQUIRE(3 == nodeDB.num_loaded()); REQUIRE(3 == nodeDB.NumLoaded());
llarp::dht::Key_t key; llarp::dht::Key_t key;
std::vector<llarp::RouterContact> results = nodeDB.FindClosestTo(key, 2); std::vector<llarp::RouterContact> results = nodeDB.FindManyClosestTo(key, 2);
REQUIRE(2 == results.size()); REQUIRE(2 == results.size());
// we xor'ed with 0x0, so order should be a,b,c // we xor'ed with 0x0, so order should be a,b,c
@ -57,7 +59,7 @@ TEST_CASE("FindClosestTo returns properly ordered set", "[nodedb][dht]")
llarp::dht::Key_t compKey; llarp::dht::Key_t compKey;
compKey.Fill(0xFF); compKey.Fill(0xFF);
results = nodeDB.FindClosestTo(compKey, 2); results = nodeDB.FindManyClosestTo(compKey, 2);
// we xor'ed with 0xF...F, so order should be inverted (c,b,a) // we xor'ed with 0xF...F, so order should be inverted (c,b,a)
REQUIRE(c.pubkey == results[0].pubkey); REQUIRE(c.pubkey == results[0].pubkey);

@ -11,12 +11,12 @@ llarp::RuntimeOptions opts = {false, false, false};
static std::shared_ptr<llarp::Context> static std::shared_ptr<llarp::Context>
make_context(std::optional<fs::path> keyfile) make_context(std::optional<fs::path> keyfile)
{ {
llarp::Config conf{fs::current_path()}; auto conf = std::make_shared<llarp::Config>(fs::current_path());
conf.Load(std::nullopt, opts.isRouter); conf->Load(std::nullopt, opts.isRouter);
conf.network.m_endpointType = "null"; conf->network.m_endpointType = "null";
conf.network.m_keyfile = keyfile; conf->network.m_keyfile = keyfile;
conf.bootstrap.seednode = true; conf->bootstrap.seednode = true;
conf.api.m_enableRPCServer = false; conf->api.m_enableRPCServer = false;
auto context = std::make_shared<llarp::Context>(); auto context = std::make_shared<llarp::Context>();
REQUIRE_NOTHROW(context->Configure(std::move(conf))); REQUIRE_NOTHROW(context->Configure(std::move(conf)));

Loading…
Cancel
Save