- Added call_get to ev.hpp to queue event loop operations w/ a return value
- de-mutexed NodeDB and made all operations via event loop. Some calls to NodeDB methods (like ::put_if_newer) were wrapped in call->get's, but some weren't. All function bodies were using mutex locks
pull/2204/head
dr7ana 8 months ago
parent c8dae875b5
commit 5ccec24470

@ -42,22 +42,22 @@ namespace llarp::dht
GetRouter()->rc_lookup_handler().check_rc(rc);
}
void
LookupIntroSetRelayed(
const Key_t& target,
const Key_t& whoasked,
uint64_t whoaskedTX,
const Key_t& askpeer,
uint64_t relayOrder,
service::EncryptedIntroSetLookupHandler result = nullptr) override;
void
LookupIntroSetDirect(
const Key_t& target,
const Key_t& whoasked,
uint64_t whoaskedTX,
const Key_t& askpeer,
service::EncryptedIntroSetLookupHandler result = nullptr) override;
// void
// LookupIntroSetRelayed(
// const Key_t& target,
// const Key_t& whoasked,
// uint64_t whoaskedTX,
// const Key_t& askpeer,
// uint64_t relayOrder,
// service::EncryptedIntroSetLookupHandler result = nullptr) override;
// void
// LookupIntroSetDirect(
// const Key_t& target,
// const Key_t& whoasked,
// uint64_t whoaskedTX,
// const Key_t& askpeer,
// service::EncryptedIntroSetLookupHandler result = nullptr) override;
/// on behalf of whoasked request router with public key target from dht
/// router with key askpeer

@ -1,6 +1,5 @@
#include "explorenetworkjob.hpp"
#include "context.hpp"
#include <llarp/dht/messages/findrouter.hpp>
#include <llarp/router/router.hpp>
@ -8,36 +7,33 @@
#include <llarp/tooling/dht_event.hpp>
namespace llarp
namespace llarp::dht
{
namespace dht
void
ExploreNetworkJob::Start(const TXOwner& peer)
{
void
ExploreNetworkJob::Start(const TXOwner& peer)
auto msg = new FindRouterMessage(peer.txid);
auto router = parent->GetRouter();
if (router)
{
auto msg = new FindRouterMessage(peer.txid);
auto router = parent->GetRouter();
if (router)
{
router->notify_router_event<tooling::FindRouterSentEvent>(router->pubkey(), *msg);
}
parent->DHTSendTo(peer.node.as_array(), msg);
router->notify_router_event<tooling::FindRouterSentEvent>(router->pubkey(), *msg);
}
parent->DHTSendTo(peer.node.as_array(), msg);
}
void
ExploreNetworkJob::SendReply()
{
llarp::LogDebug("got ", valuesFound.size(), " routers from exploration");
void
ExploreNetworkJob::SendReply()
{
llarp::LogDebug("got ", valuesFound.size(), " routers from exploration");
auto router = parent->GetRouter();
for (const auto& pk : valuesFound)
{
// lookup router
if (router and router->node_db()->Has(pk))
continue;
parent->LookupRouter(
pk, [router, pk](const auto& res) { router->HandleDHTLookupForExplore(pk, res); });
}
auto router = parent->GetRouter();
for (const auto& pk : valuesFound)
{
// lookup router
if (router and router->node_db()->Has(pk))
continue;
parent->LookupRouter(
pk, [router, pk](const auto& res) { router->HandleDHTLookupForExplore(pk, res); });
}
} // namespace dht
} // namespace llarp
}
} // namespace llarp::dht

@ -4,30 +4,27 @@
#include "tx.hpp"
#include <llarp/router_id.hpp>
namespace llarp
namespace llarp::dht
{
namespace dht
struct ExploreNetworkJob : public TX<RouterID, RouterID>
{
struct ExploreNetworkJob : public TX<RouterID, RouterID>
{
ExploreNetworkJob(const RouterID& peer, AbstractDHTMessageHandler* ctx)
: TX<RouterID, RouterID>(TXOwner{}, peer, ctx)
{}
ExploreNetworkJob(const RouterID& peer, AbstractDHTMessageHandler* ctx)
: TX<RouterID, RouterID>(TXOwner{}, peer, ctx)
{}
bool
Validate(const RouterID&) const override
{
// TODO: check with lokid
return true;
}
bool
Validate(const RouterID&) const override
{
// TODO: check with lokid
return true;
}
void
Start(const TXOwner& peer) override;
void
Start(const TXOwner& peer) override;
void
SendReply() override;
};
} // namespace dht
} // namespace llarp
void
SendReply() override;
};
} // namespace llarp::dht
#endif

@ -9,74 +9,71 @@
#include <set>
#include <vector>
namespace llarp
namespace llarp::dht
{
namespace dht
{
struct AbstractDHTMessageHandler;
struct AbstractDHTMessageHandler;
template <typename K, typename V>
struct TX
{
K target;
AbstractDHTMessageHandler* parent;
std::set<Key_t> peersAsked;
std::vector<V> valuesFound;
TXOwner whoasked;
template <typename K, typename V>
struct TX
{
K target;
AbstractDHTMessageHandler* parent;
std::set<Key_t> peersAsked;
std::vector<V> valuesFound;
TXOwner whoasked;
TX(const TXOwner& asker, const K& k, AbstractDHTMessageHandler* p)
: target(k), parent(p), whoasked(asker)
{}
TX(const TXOwner& asker, const K& k, AbstractDHTMessageHandler* p)
: target(k), parent(p), whoasked(asker)
{}
virtual ~TX() = default;
virtual ~TX() = default;
void
OnFound(const Key_t& askedPeer, const V& value);
void
OnFound(const Key_t& askedPeer, const V& value);
util::StatusObject
ExtractStatus() const
{
util::StatusObject obj{
{"whoasked", whoasked.ExtractStatus()}, {"target", target.ExtractStatus()}};
std::vector<util::StatusObject> foundObjs;
std::transform(
valuesFound.begin(),
valuesFound.end(),
std::back_inserter(foundObjs),
[](const auto& item) -> util::StatusObject { return item.ExtractStatus(); });
util::StatusObject
ExtractStatus() const
{
util::StatusObject obj{
{"whoasked", whoasked.ExtractStatus()}, {"target", target.ExtractStatus()}};
std::vector<util::StatusObject> foundObjs;
std::transform(
valuesFound.begin(),
valuesFound.end(),
std::back_inserter(foundObjs),
[](const auto& item) -> util::StatusObject { return item.ExtractStatus(); });
obj["found"] = foundObjs;
std::vector<std::string> asked;
std::transform(
peersAsked.begin(),
peersAsked.end(),
std::back_inserter(asked),
[](const auto& item) -> std::string { return item.ToString(); });
obj["asked"] = asked;
return obj;
}
obj["found"] = foundObjs;
std::vector<std::string> asked;
std::transform(
peersAsked.begin(),
peersAsked.end(),
std::back_inserter(asked),
[](const auto& item) -> std::string { return item.ToString(); });
obj["asked"] = asked;
return obj;
}
virtual bool
Validate(const V& value) const = 0;
virtual bool
Validate(const V& value) const = 0;
virtual void
Start(const TXOwner& peer) = 0;
virtual void
Start(const TXOwner& peer) = 0;
virtual void
SendReply() = 0;
};
virtual void
SendReply() = 0;
};
template <typename K, typename V>
inline void
TX<K, V>::OnFound(const Key_t& askedPeer, const V& value)
template <typename K, typename V>
inline void
TX<K, V>::OnFound(const Key_t& askedPeer, const V& value)
{
peersAsked.insert(askedPeer);
if (Validate(value))
{
peersAsked.insert(askedPeer);
if (Validate(value))
{
valuesFound.push_back(value);
}
valuesFound.push_back(value);
}
} // namespace dht
} // namespace llarp
}
} // namespace llarp::dht
#endif

@ -11,6 +11,8 @@
#include <future>
#include <utility>
using oxen::log::slns::source_location;
namespace uvw
{
class Loop;
@ -21,6 +23,21 @@ namespace llarp
struct SockAddr;
struct UDPHandle;
static auto loop_cat = llarp::log::Cat("ev-loop");
template <typename... T>
void
loop_trace_log(
const log::logger_ptr& cat_logger,
[[maybe_unused]] const source_location& location,
[[maybe_unused]] fmt::format_string<T...> fmt,
[[maybe_unused]] T&&... args)
{
if (cat_logger)
cat_logger->log(
log::detail::spdlog_sloc(location), log::Level::trace, fmt, std::forward<T>(args)...);
}
namespace vpn
{
class NetworkInterface;
@ -143,6 +160,35 @@ namespace llarp
});
}
/// Calls a function and synchronously obtains its return value. If called from within the
/// event loop, the function is called and returned immediately, otherwise a promise/future
/// is used with `call_soon` to block until the event loop comes around and calls the
/// function.
template <typename Callable, typename Ret = decltype(std::declval<Callable>()())>
Ret
call_get(Callable&& f, source_location src = source_location::current())
{
if (inEventLoop())
{
loop_trace_log(loop_cat, src, "Event loop calling `{}`", src.function_name());
return f();
}
std::promise<Ret> prom;
auto fut = prom.get_future();
call_soon([&f, &prom] {
try
{
prom.set_value(f());
}
catch (...)
{
prom.set_exception(std::current_exception());
}
});
return fut.get();
}
// Wraps a lambda with a lambda that triggers it to be called via loop->call()
// when invoked. E.g.:
//

@ -19,14 +19,14 @@ namespace llarp::exit
size_t hoplen,
EndpointBase* parent)
: llarp::path::Builder{r, numpaths, hoplen}
, m_ExitRouter{routerId}
, m_WritePacket{std::move(writepkt)}
, exit_router{routerId}
, packet_write_func{std::move(writepkt)}
, m_Counter{0}
, m_LastUse{r->now()}
, m_BundleRC{false}
, m_Parent{parent}
{
CryptoManager::instance()->identity_keygen(m_ExitIdentity);
CryptoManager::instance()->identity_keygen(exit_key);
}
BaseSession::~BaseSession() = default;
@ -42,16 +42,16 @@ namespace llarp::exit
{
auto obj = path::Builder::ExtractStatus();
obj["lastExitUse"] = to_json(m_LastUse);
auto pub = m_ExitIdentity.toPublic();
auto pub = exit_key.toPublic();
obj["exitIdentity"] = pub.ToString();
obj["endpoint"] = m_ExitRouter.ToString();
obj["endpoint"] = exit_router.ToString();
return obj;
}
bool
BaseSession::LoadIdentityFromFile(const char* fname)
{
return m_ExitIdentity.LoadFromFile(fname);
return exit_key.LoadFromFile(fname);
}
bool
@ -68,7 +68,7 @@ namespace llarp::exit
void
BaseSession::BlacklistSNode(const RouterID snode)
{
m_SnodeBlacklist.insert(std::move(snode));
snode_blacklist.insert(std::move(snode));
}
std::optional<std::vector<RouterContact>>
@ -76,12 +76,12 @@ namespace llarp::exit
{
if (numHops == 1)
{
if (auto maybe = router->node_db()->Get(m_ExitRouter))
if (auto maybe = router->node_db()->get_rc(exit_router))
return std::vector<RouterContact>{*maybe};
return std::nullopt;
}
else
return GetHopsAlignedToForBuild(m_ExitRouter);
return GetHopsAlignedToForBuild(exit_router);
}
bool
@ -103,13 +103,13 @@ namespace llarp::exit
obtain.sequence_number = p->NextSeqNo();
obtain.tx_id = llarp::randint();
PopulateRequest(obtain);
if (!obtain.Sign(m_ExitIdentity))
if (!obtain.Sign(exit_key))
{
llarp::LogError("Failed to sign exit request");
return;
}
if (p->SendExitRequest(obtain, router))
llarp::LogInfo("asking ", m_ExitRouter, " for exit");
llarp::LogInfo("asking ", exit_router, " for exit");
else
llarp::LogError("failed to send exit request");
}
@ -161,7 +161,7 @@ namespace llarp::exit
{
llarp::LogInfo(p->name(), " closing exit path");
routing::CloseExitMessage msg;
if (msg.Sign(m_ExitIdentity) && p->SendExitClose(msg, router))
if (msg.Sign(exit_key) && p->SendExitClose(msg, router))
{
p->ClearRoles(roles);
}
@ -182,7 +182,7 @@ namespace llarp::exit
{
LogInfo(p->name(), " closing exit path");
routing::CloseExitMessage msg;
if (!(msg.Sign(m_ExitIdentity) && p->SendExitClose(msg, router)))
if (!(msg.Sign(exit_key) && p->SendExitClose(msg, router)))
LogWarn(p->name(), " failed to send exit close message");
}
};
@ -209,7 +209,7 @@ namespace llarp::exit
return true;
}
if (m_WritePacket)
if (packet_write_func)
{
llarp::net::IPPacket pkt{buf.view_all()};
if (pkt.empty())
@ -224,7 +224,7 @@ namespace llarp::exit
bool
BaseSession::HandleTrafficDrop(llarp::path::Path_ptr p, const PathID_t& path, uint64_t s)
{
llarp::LogError("dropped traffic on exit ", m_ExitRouter, " S=", s, " P=", path);
llarp::LogError("dropped traffic on exit ", exit_router, " S=", s, " P=", path);
p->EnterState(path::ePathIgnore, router->now());
return true;
}
@ -309,16 +309,37 @@ namespace llarp::exit
if (numHops == 1)
{
auto r = router;
if (const auto maybe = r->node_db()->Get(m_ExitRouter); maybe.has_value())
r->TryConnectAsync(*maybe, 5);
if (const auto maybe = r->node_db()->get_rc(exit_router); maybe.has_value())
r->connect_to(*maybe);
else
r->LookupRouter(m_ExitRouter, [r](const std::vector<RouterContact>& results) {
if (results.size())
r->TryConnectAsync(results[0], 5);
r->lookup_router(exit_router, [r](oxen::quic::message m) mutable {
if (m)
{
std::string payload;
try
{
oxenc::bt_dict_consumer btdc{m.body()};
payload = btdc.require<std::string>("RC");
}
catch (...)
{
log::warning(link_cat, "Failed to parse Find Router response!");
throw;
}
RouterContact result{std::move(payload)};
r->node_db()->put_rc_if_newer(result);
r->connect_to(result);
}
else
{
r->link_manager().handle_find_router_error(std::move(m));
}
});
}
else if (UrgentBuild(now))
BuildOneAlignedTo(m_ExitRouter);
BuildOneAlignedTo(exit_router);
}
return true;
}
@ -328,8 +349,8 @@ namespace llarp::exit
{
while (m_Downstream.size())
{
if (m_WritePacket)
m_WritePacket(const_cast<net::IPPacket&>(m_Downstream.top().second).steal());
if (packet_write_func)
packet_write_func(const_cast<net::IPPacket&>(m_Downstream.top().second).steal());
m_Downstream.pop();
}
}
@ -346,20 +367,20 @@ namespace llarp::exit
{
if (useRouterSNodeKey)
{
m_ExitIdentity = r->identity();
exit_key = r->identity();
}
}
std::string
SNodeSession::Name() const
{
return "SNode::" + m_ExitRouter.ToString();
return "SNode::" + exit_router.ToString();
}
std::string
ExitSession::Name() const
{
return "Exit::" + m_ExitRouter.ToString();
return "Exit::" + exit_router.ToString();
}
void

@ -117,7 +117,7 @@ namespace llarp
const llarp::RouterID
Endpoint() const
{
return m_ExitRouter;
return exit_router;
}
std::optional<PathID_t>
@ -138,9 +138,9 @@ namespace llarp
AddReadyHook(SessionReadyFunc func);
protected:
llarp::RouterID m_ExitRouter;
llarp::SecretKey m_ExitIdentity;
std::function<bool(const llarp_buffer_t&)> m_WritePacket;
llarp::RouterID exit_router;
llarp::SecretKey exit_key;
std::function<bool(const llarp_buffer_t&)> packet_write_func;
virtual void
PopulateRequest(llarp::routing::ObtainExitMessage& msg) const = 0;
@ -159,11 +159,9 @@ namespace llarp
service::ProtocolType t);
private:
std::set<RouterID> m_SnodeBlacklist;
std::set<RouterID> snode_blacklist;
using UpstreamTrafficQueue_t = std::deque<llarp::routing::TransferTrafficMessage>;
using TieredQueue_t = std::map<uint8_t, UpstreamTrafficQueue_t>;
TieredQueue_t m_Upstream;
std::map<uint8_t, std::deque<routing::TransferTrafficMessage>> m_Upstream;
PathID_t m_CurrentPath;

@ -624,23 +624,39 @@ namespace llarp::handlers
RouterID snode;
if (snode.FromString(qname))
{
router()->LookupRouter(snode, [reply, msg = std::move(msg)](const auto& found) mutable {
if (found.empty())
{
msg.AddNXReply();
}
else
{
std::string recs;
for (const auto& rc : found)
recs += rc.ToTXTRecord();
msg.AddTXTReply(std::move(recs));
}
reply(msg);
});
router()->lookup_router(
snode, [r = router(), msg = std::move(msg), reply](oxen::quic::message m) mutable {
if (m)
{
std::string payload;
try
{
oxenc::bt_dict_consumer btdc{m.body()};
payload = btdc.require<std::string>("RC");
}
catch (...)
{
log::warning(link_cat, "Failed to parse Find Router response!");
throw;
}
r->node_db()->put_rc_if_newer(RouterContact{payload});
msg.AddTXTReply(payload);
}
else
{
msg.AddNXReply();
r->link_manager().handle_find_router_error(std::move(m));
}
reply(msg);
});
return true;
}
else if (msg.questions[0].IsLocalhost() and msg.questions[0].HasSubdomains())
if (msg.questions[0].IsLocalhost() and msg.questions[0].HasSubdomains())
{
const auto subdomain = msg.questions[0].Subdomains();
if (subdomain == "exit")

@ -70,19 +70,10 @@ namespace llarp
}
bool
Contacts::lookup_router(const RouterID& rid)
Contacts::lookup_router(const RouterID& rid, std::function<void(oxen::quic::message)> func)
{
dht::Key_t ask_peer;
if (not _rc_nodes->FindClosest(dht::Key_t{rid}, ask_peer))
return false;
_router.loop()->call([this, rid]() {
_router.send_control_message(
rid, "find_router", FindRouterMessage::serialize(rid, false, false));
});
return true;
return _router.send_control_message(
rid, "find_router", FindRouterMessage::serialize(rid, false, false), std::move(func));
}
void

@ -46,7 +46,7 @@ namespace llarp
ExtractStatus() const;
bool
lookup_router(const RouterID&);
lookup_router(const RouterID&, std::function<void(oxen::quic::message)> = nullptr);
void
put_rc_node_async(const dht::RCNode& val);

@ -54,7 +54,7 @@ namespace llarp
auto& c = itr->second;
auto& _scid = c->conn->scid();
link_manager.router.loop()->call([this, scid = _scid, rid = _rid]() {
link_manager._router.loop()->call([this, scid = _scid, rid = _rid]() {
endpoint->close_connection(scid);
conns.erase(rid);
@ -111,7 +111,7 @@ namespace llarp
auto& c = itr->second;
auto& _scid = c->conn->scid();
link_manager.router.loop()->call([this, scid = _scid, rid = _rid]() {
link_manager._router.loop()->call([this, scid = _scid, rid = _rid]() {
endpoint->close_connection(scid);
conns.erase(rid);
@ -137,7 +137,7 @@ namespace llarp
for (const auto& [name, func] : rpc_commands)
{
s->register_command(name, [this, f = func](oxen::quic::message m) {
router.loop()->call([this, func = f, msg = std::move(m)]() mutable {
_router.loop()->call([this, func = f, msg = std::move(m)]() mutable {
std::invoke(func, this, std::move(msg));
});
});
@ -155,7 +155,7 @@ namespace llarp
- will return a BTRequestStream on the first call to get_new_stream<BTRequestStream>
*/
auto ep = quic->endpoint(
router.public_ip(),
_router.public_ip(),
[this](oxen::quic::connection_interface& ci) { return on_conn_open(ci); },
[this](oxen::quic::connection_interface& ci, uint64_t ec) {
return on_conn_closed(ci, ec);
@ -178,11 +178,11 @@ namespace llarp
}
LinkManager::LinkManager(Router& r)
: router{r}
: _router{r}
, quic{std::make_unique<oxen::quic::Network>()}
, tls_creds{oxen::quic::GNUTLSCreds::make_from_ed_keys(
{reinterpret_cast<const char*>(router.identity().data()), size_t{32}},
{reinterpret_cast<const char*>(router.identity().toPublic().data()), size_t{32}})}
{reinterpret_cast<const char*>(_router.identity().data()), size_t{32}},
{reinterpret_cast<const char*>(_router.identity().toPublic().data()), size_t{32}})}
, ep{startup_endpoint(), *this}
{}
@ -203,7 +203,7 @@ namespace llarp
if (func)
{
func = [this, f = std::move(func)](oxen::quic::message m) mutable {
router.loop()->call([func = std::move(f), msg = std::move(m)]() mutable { func(msg); });
_router.loop()->call([func = std::move(f), msg = std::move(m)]() mutable { func(msg); });
};
}
@ -226,7 +226,7 @@ namespace llarp
return true;
}
router.loop()->call([this, remote, endpoint, body, f = std::move(func)]() {
_router.loop()->call([this, remote, endpoint, body, f = std::move(func)]() {
auto pending = PendingControlMessage(body, endpoint, f);
auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue());
@ -260,7 +260,7 @@ namespace llarp
return true;
}
router.loop()->call([&]() {
_router.loop()->call([&]() {
auto pending = PendingDataMessage(body);
auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue());
@ -289,7 +289,7 @@ namespace llarp
}
void
LinkManager::connect_to(RouterID rid)
LinkManager::connect_to(const RouterID& rid)
{
rc_lookup->get_rc(
rid,
@ -308,7 +308,7 @@ namespace llarp
// This function assumes the RC has already had its signature verified and connection is allowed.
void
LinkManager::connect_to(RouterContact rc)
LinkManager::connect_to(const RouterContact& rc)
{
if (auto conn = ep.get_conn(rc.pubkey); conn)
{
@ -333,7 +333,7 @@ namespace llarp
void
LinkManager::on_conn_open(oxen::quic::connection_interface& ci)
{
router.loop()->call([this, &conn_interface = ci]() {
_router.loop()->call([this, &conn_interface = ci]() {
const auto& scid = conn_interface.scid();
const auto& rid = ep.connid_map[scid];
@ -367,7 +367,7 @@ namespace llarp
void
LinkManager::on_conn_closed(oxen::quic::connection_interface& ci, uint64_t ec)
{
router.loop()->call([this, &conn_interface = ci, error_code = ec]() {
_router.loop()->call([this, &conn_interface = ci, error_code = ec]() {
const auto& scid = conn_interface.scid();
log::debug(quic_cat, "Purging quic connection CID:{} (ec: {})", scid, error_code);
@ -488,7 +488,7 @@ namespace llarp
{
is_stopping = false;
rc_lookup = rcLookup;
node_db = router.node_db();
node_db = _router.node_db();
}
void
@ -543,7 +543,7 @@ namespace llarp
m.respond(serialize_response({{"STATUS", FindNameMessage::EXCEPTION}}), true);
}
router.rpc_client()->lookup_ons_hash(
_router.rpc_client()->lookup_ons_hash(
name_hash, [this, msg = std::move(m)](std::optional<service::EncryptedName> maybe) mutable {
if (maybe.has_value())
msg.respond(serialize_response({{"NAME", maybe->ciphertext}}));
@ -616,7 +616,7 @@ namespace llarp
{
log::warning(link_cat, "Exception: {}", e.what());
m.respond(
serialize_response({{"STATUS", FindRouterMessage::EXCEPTION}, {"RECIPIENT", ""}}), true);
serialize_response({{"STATUS", FindRouterMessage::EXCEPTION}, {"TARGET", ""}}), true);
return;
}
@ -626,31 +626,32 @@ namespace llarp
target_rid.FromString(target_key);
const auto target_addr = dht::Key_t{reinterpret_cast<uint8_t*>(target_key.data())};
const auto& local_rid = router.rc().pubkey;
const auto& local_rid = _router.rc().pubkey;
const auto local_key = dht::Key_t{local_rid};
if (is_exploratory)
{
std::string neighbors{};
const auto closest_rcs =
router.node_db()->FindManyClosestTo(target_addr, RC_LOOKUP_STORAGE_REDUNDANCY);
_router.node_db()->find_many_closest_to(target_addr, RC_LOOKUP_STORAGE_REDUNDANCY);
for (const auto& rc : closest_rcs)
{
const auto& rid = rc.pubkey;
if (router.router_profiling().IsBadForConnect(rid) || target_rid == rid || local_rid == rid)
if (_router.router_profiling().IsBadForConnect(rid) || target_rid == rid
|| local_rid == rid)
continue;
neighbors += oxenc::bt_serialize(rid.ToString());
neighbors += rid.bt_encode();
}
m.respond(
serialize_response({{"STATUS", FindRouterMessage::RETRY_EXP}, {"RECIPIENT", neighbors}}),
serialize_response({{"STATUS", FindRouterMessage::RETRY_EXP}, {"TARGET", neighbors}}),
true);
}
else
{
const auto closest_rc = router.node_db()->FindClosestTo(target_addr);
const auto closest_rc = _router.node_db()->find_closest_to(target_addr);
const auto& closest_rid = closest_rc.pubkey;
const auto closest_key = dht::Key_t{closest_rid};
@ -668,7 +669,7 @@ namespace llarp
}
else
{
m.respond(serialize_response({{"RC", closest_rc.ToString()}}));
m.respond(serialize_response({{"RC", closest_rc.bt_encode()}}));
}
}
else if (not is_iterative)
@ -687,7 +688,7 @@ namespace llarp
{
m.respond(
serialize_response(
{{"STATUS", FindRouterMessage::RETRY_ITER}, {"RECIPIENT", target_addr.data()}}),
{{"STATUS", FindRouterMessage::RETRY_ITER}, {"TARGET", target_addr.data()}}),
true);
}
}
@ -696,7 +697,7 @@ namespace llarp
m.respond(
serialize_response(
{{"STATUS", FindRouterMessage::RETRY_NEW},
{"RECIPIENT", reinterpret_cast<const char*>(closest_rid.data())}}),
{"TARGET", reinterpret_cast<const char*>(closest_rid.data())}}),
true);
}
}
@ -721,8 +722,8 @@ namespace llarp
payload = btdc.require<std::string>("RC");
else
{
status = btdc.require<std::string>("STATUS");
payload = btdc.require<std::string>("RECIPIENT");
status = btdc.require<std::string>("TARGET");
}
}
catch (const std::exception& e)
@ -733,7 +734,7 @@ namespace llarp
if (m)
{
router.node_db()->PutIfNewer(RouterContact{payload});
_router.node_db()->put_rc_if_newer(RouterContact{payload});
}
else
{
@ -767,6 +768,53 @@ namespace llarp
}
}
void
LinkManager::handle_find_router_error(oxen::quic::message&& m)
{
std::string status, payload;
try
{
oxenc::bt_dict_consumer btdc{m.body()};
payload = btdc.require<std::string>("RECIPIENT");
status = btdc.require<std::string>("TARGET");
}
catch (const std::exception& e)
{
log::warning(link_cat, "Exception: {}", e.what());
return;
}
if (status == FindRouterMessage::EXCEPTION)
{
log::info(link_cat, "FindRouterMessage failed with remote exception!");
// Do something smart here probably
return;
}
RouterID target{reinterpret_cast<uint8_t*>(payload.data())};
if (status == FindRouterMessage::RETRY_EXP)
{
log::info(link_cat, "FindRouterMessage failed, retrying as exploratory!");
send_control_message(
target, "find_router", FindRouterMessage::serialize(target, false, true));
}
else if (status == FindRouterMessage::RETRY_ITER)
{
log::info(link_cat, "FindRouterMessage failed, retrying as iterative!");
send_control_message(
target, "find_router", FindRouterMessage::serialize(target, true, false));
}
else if (status == FindRouterMessage::RETRY_NEW)
{
log::info(link_cat, "FindRouterMessage failed, retrying with new recipient!");
send_control_message(
target, "find_router", FindRouterMessage::serialize(target, false, false));
}
}
void
LinkManager::handle_publish_intro(oxen::quic::message m)
{
@ -797,9 +845,9 @@ namespace llarp
return;
}
const auto now = router.now();
const auto now = _router.now();
const auto addr = dht::Key_t{reinterpret_cast<uint8_t*>(derived_signing_key.data())};
const auto local_key = router.rc().pubkey;
const auto local_key = _router.rc().pubkey;
if (not service::EncryptedIntroSet::verify(introset, derived_signing_key, sig))
{
@ -815,7 +863,7 @@ namespace llarp
return;
}
auto closest_rcs = router.node_db()->FindManyClosestTo(addr, INTROSET_STORAGE_REDUNDANCY);
auto closest_rcs = _router.node_db()->find_many_closest_to(addr, INTROSET_STORAGE_REDUNDANCY);
if (closest_rcs.size() != INTROSET_STORAGE_REDUNDANCY)
{
@ -849,7 +897,7 @@ namespace llarp
"Received PublishIntroMessage in which we are peer index {}.. storing introset",
relay_order);
router.contacts()->services()->PutNode(dht::ISNode{std::move(enc)});
_router.contacts()->services()->PutNode(dht::ISNode{std::move(enc)});
m.respond(serialize_response({{"STATUS", ""}}));
}
else
@ -882,7 +930,7 @@ namespace llarp
{
log::info(link_cat, "Received PublishIntroMessage for {} (TXID: {}); we are candidate {}");
router.contacts()->services()->PutNode(dht::ISNode{std::move(enc)});
_router.contacts()->services()->PutNode(dht::ISNode{std::move(enc)});
m.respond(serialize_response());
}
else
@ -973,7 +1021,7 @@ namespace llarp
return;
}
auto closest_rcs = router.node_db()->FindManyClosestTo(addr, INTROSET_STORAGE_REDUNDANCY);
auto closest_rcs = _router.node_db()->find_many_closest_to(addr, INTROSET_STORAGE_REDUNDANCY);
if (closest_rcs.size() != INTROSET_STORAGE_REDUNDANCY)
{
@ -1010,7 +1058,7 @@ namespace llarp
}
else
{
if (auto maybe_intro = router.contacts()->get_introset_by_location(addr))
if (auto maybe_intro = _router.contacts()->get_introset_by_location(addr))
m.respond(serialize_response({{"INTROSET", maybe_intro->bt_encode()}}));
else
{
@ -1048,7 +1096,7 @@ namespace llarp
if (m)
{
service::EncryptedIntroSet enc{payload};
router.contacts()->services()->PutNode(std::move(enc));
_router.contacts()->services()->PutNode(std::move(enc));
}
else
{
@ -1060,7 +1108,7 @@ namespace llarp
void
LinkManager::handle_path_build(oxen::quic::message m)
{
if (!router.path_context().AllowingTransit())
if (!_router.path_context().AllowingTransit())
{
log::warning("got path build request when not permitting transit");
m.respond(serialize_response({{"STATUS", PathBuildMessage::NO_TRANSIT}}), true);
@ -1100,7 +1148,7 @@ namespace llarp
SharedSecret shared;
// derive shared secret using ephemeral pubkey and our secret key (and nonce)
if (!crypto->dh_server(shared, otherPubkey, router.identity(), nonce))
if (!crypto->dh_server(shared, otherPubkey, _router.identity(), nonce))
{
log::info("DH failed during path build.");
m.respond(serialize_response({{"STATUS", PathBuildMessage::BAD_CRYPTO}}), true);
@ -1162,7 +1210,7 @@ namespace llarp
// a different upstream, that would be "unique" but we wouldn't know where
// to route messages (nevermind that messages don't currently know the RouterID
// they came from).
if (router.path_context.HasTransitHop(hop->info))
if (_router.path_context.HasTransitHop(hop->info))
{
log::info("Invalid PathID; PathIDs must be unique.");
m.respond(serialize_response({{"STATUS", PathBuildMessage::BAD_PATHID}}), true);
@ -1172,7 +1220,7 @@ namespace llarp
otherPubkey.from_string_view(hop_dict.require<std::string_view>("commkey"));
nonce.from_string_view(hop_dict.require<std::string_view>("nonce"));
if (!crypto->dh_server(hop->pathKey, otherPubkey, router.identity(), nonce))
if (!crypto->dh_server(hop->pathKey, otherPubkey, _router.identity(), nonce))
{
log::info("DH failed during path build.");
m.respond(serialize_response({{"STATUS", PathBuildMessage::BAD_CRYPTO}}), true);
@ -1189,13 +1237,13 @@ namespace llarp
m.respond(serialize_response({{"STATUS", PathBuildMessage::BAD_LIFETIME}}), true);
return;
}
hop->started = router.now();
router.persist_connection_until(hop->info.downstream, hop->ExpireTime() + 10s);
hop->started = _router.now();
_router.persist_connection_until(hop->info.downstream, hop->ExpireTime() + 10s);
if (hop->info.upstream == router.pubkey())
if (hop->info.upstream == _router.pubkey())
{
// we are terminal hop and everything is okay
router.path_context.PutTransitHop(hop);
_router.path_context.PutTransitHop(hop);
m.respond(serialize_response({{"STATUS", PathBuildMessage::OK}}), false);
return;
}
@ -1215,7 +1263,7 @@ namespace llarp
link_cat,
"Upstream returned successful path build response; giving hop info to Router, "
"then relaying response");
router.path_context.PutTransitHop(hop);
_router.path_context.PutTransitHop(hop);
m.respond(response.body_str(), false);
return;
}
@ -1361,17 +1409,17 @@ namespace llarp
RouterID target{pubkey.data()};
auto transit_hop = std::static_pointer_cast<path::TransitHop>(
router.path_context().GetByUpstream(target, PathID_t{to_usv(tx_id).data()}));
_router.path_context().GetByUpstream(target, PathID_t{to_usv(tx_id).data()}));
const auto rx_id = transit_hop->info.rxID;
const auto next_seqno = transit_hop->NextSeqNo();
auto success =
(CryptoManager::instance()->verify(pubkey, to_usv(dict_data), sig)
and router.exitContext().ObtainNewExit(PubKey{pubkey.data()}, rx_id, flag != 0));
and _router.exitContext().ObtainNewExit(PubKey{pubkey.data()}, rx_id, flag != 0));
m.respond(
ObtainExit::sign_and_serialize_response(router.identity(), next_seqno, tx_id),
ObtainExit::sign_and_serialize_response(_router.identity(), next_seqno, tx_id),
not success);
}
catch (const std::exception& e)
@ -1408,9 +1456,9 @@ namespace llarp
tx_id = btdc.require<std::string_view>("T");
auto path_ptr = std::static_pointer_cast<path::Path>(
router.path_context().GetByDownstream(router.pubkey(), PathID_t{to_usv(tx_id).data()}));
_router.path_context().GetByDownstream(_router.pubkey(), PathID_t{to_usv(tx_id).data()}));
if (CryptoManager::instance()->verify(router.pubkey(), to_usv(dict_data), sig))
if (CryptoManager::instance()->verify(_router.pubkey(), to_usv(dict_data), sig))
path_ptr->enable_exit_traffic();
}
catch (const std::exception& e)
@ -1437,17 +1485,18 @@ namespace llarp
tx_id = btdc.require<std::string_view>("T");
auto transit_hop = std::static_pointer_cast<path::TransitHop>(
router.path_context().GetByUpstream(router.pubkey(), PathID_t{to_usv(tx_id).data()}));
_router.path_context().GetByUpstream(_router.pubkey(), PathID_t{to_usv(tx_id).data()}));
const auto next_seqno = transit_hop->NextSeqNo();
if (auto exit_ep = router.exitContext().FindEndpointForPath(PathID_t{to_usv(path_id).data()}))
if (auto exit_ep =
_router.exitContext().FindEndpointForPath(PathID_t{to_usv(path_id).data()}))
{
if (CryptoManager::instance()->verify(exit_ep->PubKey().data(), to_usv(dict_data), sig))
{
(exit_ep->UpdateLocalPath(transit_hop->info.rxID))
? m.respond(
UpdateExit::sign_and_serialize_response(router.identity(), next_seqno, tx_id))
UpdateExit::sign_and_serialize_response(_router.identity(), next_seqno, tx_id))
: m.respond(serialize_response({{"STATUS", UpdateExit::UPDATE_FAILED}}), true);
}
// If we fail to verify the message, no-op
@ -1487,9 +1536,9 @@ namespace llarp
tx_id = btdc.require<std::string_view>("T");
auto path_ptr = std::static_pointer_cast<path::Path>(
router.path_context().GetByDownstream(router.pubkey(), PathID_t{to_usv(tx_id).data()}));
_router.path_context().GetByDownstream(_router.pubkey(), PathID_t{to_usv(tx_id).data()}));
if (CryptoManager::instance()->verify(router.pubkey(), to_usv(dict_data), sig))
if (CryptoManager::instance()->verify(_router.pubkey(), to_usv(dict_data), sig))
{
if (path_ptr->update_exit(std::stoul(tx_id)))
{
@ -1523,7 +1572,7 @@ namespace llarp
tx_id = btdc.require<std::string_view>("T");
auto transit_hop = std::static_pointer_cast<path::TransitHop>(
router.path_context().GetByUpstream(router.pubkey(), PathID_t{to_usv(tx_id).data()}));
_router.path_context().GetByUpstream(_router.pubkey(), PathID_t{to_usv(tx_id).data()}));
const auto rx_id = transit_hop->info.rxID;
const auto next_seqno = transit_hop->NextSeqNo();
@ -1533,7 +1582,7 @@ namespace llarp
if (CryptoManager::instance()->verify(exit_ep->PubKey().data(), to_usv(dict_data), sig))
{
exit_ep->Close();
m.respond(CloseExit::sign_and_serialize_response(router.identity(), next_seqno, tx_id));
m.respond(CloseExit::sign_and_serialize_response(_router.identity(), next_seqno, tx_id));
}
}
@ -1574,10 +1623,10 @@ namespace llarp
nonce = btdc.require<std::string_view>("Y");
auto path_ptr = std::static_pointer_cast<path::Path>(
router.path_context().GetByDownstream(router.pubkey(), PathID_t{to_usv(tx_id).data()}));
_router.path_context().GetByDownstream(_router.pubkey(), PathID_t{to_usv(tx_id).data()}));
if (path_ptr->SupportsAnyRoles(path::ePathRoleExit | path::ePathRoleSVC)
and CryptoManager::instance()->verify(router.pubkey(), to_usv(dict_data), sig))
and CryptoManager::instance()->verify(_router.pubkey(), to_usv(dict_data), sig))
path_ptr->close_exit();
}
catch (const std::exception& e)

@ -63,13 +63,11 @@ namespace llarp
bool
get_random_connection(RouterContact& router) const;
// DISCUSS: added template to forward callbacks/etc to endpoint->connect(...).
// This would be useful after combining link_manager with the redundant classes
// listed below. As a result, link_manager would be holding all the relevant
// callbacks, tls_creds, and other context required for endpoint management
template <typename... Opt>
bool
establish_connection(const oxen::quic::Address& remote, RouterContact& rc, Opt&&... opts);
establish_connection(
const oxen::quic::Address& remote, const RouterContact& rc, Opt&&... opts);
void
for_each_connection(std::function<void(link::Connection&)> func);
@ -151,6 +149,12 @@ namespace llarp
bool
send_data_message(const RouterID& remote, std::string data);
Router&
router() const
{
return _router;
}
private:
bool
send_control_message_impl(
@ -178,7 +182,7 @@ namespace llarp
oxen::quic::Address addr;
Router& router;
Router& _router;
// FIXME: Lokinet currently expects to be able to kill all network functionality before
// finishing other shutdown things, including destroying this class, and that is all in
@ -228,10 +232,10 @@ namespace llarp
deregister_peer(RouterID remote);
void
connect_to(RouterID router);
connect_to(const RouterID& router);
void
connect_to(RouterContact rc);
connect_to(const RouterContact& rc);
void
close_connection(RouterID rid);
@ -342,6 +346,12 @@ namespace llarp
std::string
serialize_response(oxenc::bt_dict supplement = {});
public:
// Public response functions and error handling functions invoked elsehwere. These take
// r-value references s.t. that message is taken out of calling scope
void
handle_find_router_error(oxen::quic::message&& m);
};
namespace link
@ -349,7 +359,7 @@ namespace llarp
template <typename... Opt>
bool
Endpoint::establish_connection(
const oxen::quic::Address& remote, RouterContact& rc, Opt&&... opts)
const oxen::quic::Address& remote, const RouterContact& rc, Opt&&... opts)
{
try
{

@ -1,8 +1,8 @@
#include "nodedb.hpp"
#include "router_contact.hpp"
#include "crypto/crypto.hpp"
#include "crypto/types.hpp"
#include "router_contact.hpp"
#include "util/buffer.hpp"
#include "util/fs.hpp"
#include "util/logging.hpp"
@ -58,15 +58,14 @@ namespace llarp
constexpr auto FlushInterval = 5min;
NodeDB::NodeDB(fs::path root, std::function<void(std::function<void()>)> diskCaller)
: m_Root{std::move(root)}
NodeDB::NodeDB(fs::path root, std::function<void(std::function<void()>)> diskCaller, Router* r)
: router{*r}
, m_Root{std::move(root)}
, disk(std::move(diskCaller))
, m_NextFlushAt{time_now_ms() + FlushInterval}
{
EnsureSkiplist(m_Root);
}
NodeDB::NodeDB() : m_Root{}, disk{[](auto) {}}, m_NextFlushAt{0s}
{}
void
NodeDB::Tick(llarp_time_t now)
@ -79,21 +78,21 @@ namespace llarp
m_NextFlushAt += FlushInterval;
// make copy of all rcs
std::vector<RouterContact> copy;
for (const auto& item : m_Entries)
for (const auto& item : entries)
copy.push_back(item.second.rc);
// flush them to disk in one big job
// TODO: split this up? idk maybe some day...
disk([this, data = std::move(copy)]() {
for (const auto& rc : data)
{
rc.Write(GetPathForPubkey(rc.pubkey));
rc.Write(get_path_by_pubkey(rc.pubkey));
}
});
}
}
fs::path
NodeDB::GetPathForPubkey(RouterID pubkey) const
NodeDB::get_path_by_pubkey(RouterID pubkey) const
{
std::string hexString = oxenc::to_hex(pubkey.begin(), pubkey.end());
std::string skiplistDir;
@ -107,7 +106,7 @@ namespace llarp
}
void
NodeDB::LoadFromDisk()
NodeDB::load_from_disk()
{
if (m_Root.empty())
return;
@ -151,7 +150,7 @@ namespace llarp
// validate signature and purge entries with invalid signatures
// load ones with valid signatures
if (rc.VerifySignature())
m_Entries.emplace(rc.pubkey, rc);
entries.emplace(rc.pubkey, rc);
else
purge.emplace(f);
@ -169,94 +168,97 @@ namespace llarp
}
void
NodeDB::SaveToDisk() const
NodeDB::save_to_disk() const
{
if (m_Root.empty())
return;
for (const auto& item : m_Entries)
{
item.second.rc.Write(GetPathForPubkey(item.first));
}
router.loop()->call([this]() {
for (const auto& item : entries)
item.second.rc.Write(get_path_by_pubkey(item.first));
});
}
bool
NodeDB::Has(RouterID pk) const
NodeDB::has_router(RouterID pk) const
{
util::NullLock lock{m_Access};
return m_Entries.find(pk) != m_Entries.end();
return router.loop()->call_get([this, pk]() { return entries.find(pk) != entries.end(); });
}
std::optional<RouterContact>
NodeDB::Get(RouterID pk) const
NodeDB::get_rc(RouterID pk) const
{
util::NullLock lock{m_Access};
const auto itr = m_Entries.find(pk);
if (itr == m_Entries.end())
return std::nullopt;
return itr->second.rc;
return router.loop()->call_get([this, pk]() -> std::optional<RouterContact> {
const auto itr = entries.find(pk);
if (itr == entries.end())
return std::nullopt;
return itr->second.rc;
});
}
void
NodeDB::Remove(RouterID pk)
NodeDB::remove_router(RouterID pk)
{
util::NullLock lock{m_Access};
m_Entries.erase(pk);
AsyncRemoveManyFromDisk({pk});
router.loop()->call([this, pk]() {
entries.erase(pk);
remove_many_from_disk_async({pk});
});
}
void
NodeDB::RemoveStaleRCs(std::unordered_set<RouterID> keep, llarp_time_t cutoff)
NodeDB::remove_stale_rcs(std::unordered_set<RouterID> keep, llarp_time_t cutoff)
{
util::NullLock lock{m_Access};
std::unordered_set<RouterID> removed;
auto itr = m_Entries.begin();
while (itr != m_Entries.end())
{
if (itr->second.insertedAt < cutoff and keep.count(itr->second.rc.pubkey) == 0)
router.loop()->call([this, keep, cutoff]() {
std::unordered_set<RouterID> removed;
auto itr = entries.begin();
while (itr != entries.end())
{
removed.insert(itr->second.rc.pubkey);
itr = m_Entries.erase(itr);
if (itr->second.insertedAt < cutoff and keep.count(itr->second.rc.pubkey) == 0)
{
removed.insert(itr->second.rc.pubkey);
itr = entries.erase(itr);
}
else
++itr;
}
else
++itr;
}
if (not removed.empty())
AsyncRemoveManyFromDisk(std::move(removed));
if (not removed.empty())
remove_many_from_disk_async(std::move(removed));
});
}
void
NodeDB::Put(RouterContact rc)
NodeDB::put_rc(RouterContact rc)
{
util::NullLock lock{m_Access};
m_Entries.erase(rc.pubkey);
m_Entries.emplace(rc.pubkey, rc);
router.loop()->call([this, rc]() {
entries.erase(rc.pubkey);
entries.emplace(rc.pubkey, rc);
});
}
size_t
NodeDB::NumLoaded() const
NodeDB::num_loaded() const
{
util::NullLock lock{m_Access};
return m_Entries.size();
return router.loop()->call_get([this]() { return entries.size(); });
}
void
NodeDB::PutIfNewer(RouterContact rc)
NodeDB::put_rc_if_newer(RouterContact rc)
{
util::NullLock lock{m_Access};
auto itr = m_Entries.find(rc.pubkey);
if (itr == m_Entries.end() or itr->second.rc.OtherIsNewer(rc))
{
// delete if existing
if (itr != m_Entries.end())
m_Entries.erase(itr);
// add new entry
m_Entries.emplace(rc.pubkey, rc);
}
router.loop()->call([this, rc]() {
auto itr = entries.find(rc.pubkey);
if (itr == entries.end() or itr->second.rc.OtherIsNewer(rc))
{
// delete if existing
if (itr != entries.end())
entries.erase(itr);
// add new entry
entries.emplace(rc.pubkey, rc);
}
});
}
void
NodeDB::AsyncRemoveManyFromDisk(std::unordered_set<RouterID> remove) const
NodeDB::remove_many_from_disk_async(std::unordered_set<RouterID> remove) const
{
if (m_Root.empty())
return;
@ -264,7 +266,7 @@ namespace llarp
std::set<fs::path> files;
for (auto id : remove)
{
files.emplace(GetPathForPubkey(std::move(id)));
files.emplace(get_path_by_pubkey(std::move(id)));
}
// remove them from the disk via the diskio thread
disk([files]() {
@ -274,50 +276,50 @@ namespace llarp
}
llarp::RouterContact
NodeDB::FindClosestTo(llarp::dht::Key_t location) const
NodeDB::find_closest_to(llarp::dht::Key_t location) const
{
util::NullLock lock{m_Access};
llarp::RouterContact rc;
const llarp::dht::XorMetric compare(location);
VisitAll([&rc, compare](const auto& otherRC) {
if (rc.pubkey.IsZero())
{
rc = otherRC;
return;
}
if (compare(
llarp::dht::Key_t{otherRC.pubkey.as_array()},
llarp::dht::Key_t{rc.pubkey.as_array()}))
rc = otherRC;
return router.loop()->call_get([this, location]() {
llarp::RouterContact rc;
const llarp::dht::XorMetric compare(location);
VisitAll([&rc, compare](const auto& otherRC) {
if (rc.pubkey.IsZero())
{
rc = otherRC;
return;
}
if (compare(
llarp::dht::Key_t{otherRC.pubkey.as_array()},
llarp::dht::Key_t{rc.pubkey.as_array()}))
rc = otherRC;
});
return rc;
});
return rc;
}
std::vector<RouterContact>
NodeDB::FindManyClosestTo(llarp::dht::Key_t location, uint32_t numRouters) const
NodeDB::find_many_closest_to(llarp::dht::Key_t location, uint32_t numRouters) const
{
util::NullLock lock{m_Access};
std::vector<const RouterContact*> all;
return router.loop()->call_get([this, location, numRouters]() {
std::vector<const RouterContact*> all;
const auto& entries = m_Entries;
all.reserve(entries.size());
for (auto& entry : entries)
{
all.push_back(&entry.second.rc);
}
all.reserve(entries.size());
for (auto& entry : entries)
{
all.push_back(&entry.second.rc);
}
auto it_mid = numRouters < all.size() ? all.begin() + numRouters : all.end();
std::partial_sort(
all.begin(), it_mid, all.end(), [compare = dht::XorMetric{location}](auto* a, auto* b) {
return compare(*a, *b);
});
auto it_mid = numRouters < all.size() ? all.begin() + numRouters : all.end();
std::partial_sort(
all.begin(), it_mid, all.end(), [compare = dht::XorMetric{location}](auto* a, auto* b) {
return compare(*a, *b);
});
std::vector<RouterContact> closest;
closest.reserve(numRouters);
for (auto it = all.begin(); it != it_mid; ++it)
closest.push_back(**it);
std::vector<RouterContact> closest;
closest.reserve(numRouters);
for (auto it = all.begin(); it != it_mid; ++it)
closest.push_back(**it);
return closest;
return closest;
});
}
} // namespace llarp

@ -2,12 +2,13 @@
#include "router_contact.hpp"
#include "router_id.hpp"
#include "dht/key.hpp"
#include "crypto/crypto.hpp"
#include "util/common.hpp"
#include "util/fs.hpp"
#include "util/thread/threading.hpp"
#include "util/thread/annotations.hpp"
#include "dht/key.hpp"
#include "crypto/crypto.hpp"
#include <llarp/router/router.hpp>
#include <set>
#include <optional>
@ -19,6 +20,8 @@
namespace llarp
{
struct Router;
class NodeDB
{
struct Entry
@ -27,12 +30,13 @@ namespace llarp
llarp_time_t insertedAt;
explicit Entry(RouterContact rc);
};
using NodeMap = std::unordered_map<RouterID, Entry>;
NodeMap m_Entries;
NodeMap entries;
const Router& router;
const fs::path m_Root;
const std::function<void(std::function<void()>)> disk;
llarp_time_t m_NextFlushAt;
@ -41,29 +45,30 @@ namespace llarp
/// asynchronously remove the files for a set of rcs on disk given their public ident key
void
AsyncRemoveManyFromDisk(std::unordered_set<RouterID> idents) const;
remove_many_from_disk_async(std::unordered_set<RouterID> idents) const;
/// get filename of an RC file given its public ident key
fs::path
GetPathForPubkey(RouterID pk) const;
get_path_by_pubkey(RouterID pk) const;
public:
explicit NodeDB(fs::path rootdir, std::function<void(std::function<void()>)> diskCaller);
explicit NodeDB(
fs::path rootdir, std::function<void(std::function<void()>)> diskCaller, Router* r);
/// in memory nodedb
NodeDB();
/// load all entries from disk syncrhonously
void
LoadFromDisk();
load_from_disk();
/// explicit save all RCs to disk synchronously
void
SaveToDisk() const;
save_to_disk() const;
/// the number of RCs that are loaded from disk
size_t
NumLoaded() const;
num_loaded() const;
/// do periodic tasks like flush to disk and expiration
void
@ -71,39 +76,39 @@ namespace llarp
/// find the absolute closets router to a dht location
RouterContact
FindClosestTo(dht::Key_t location) const;
find_closest_to(dht::Key_t location) const;
/// find many routers closest to dht key
std::vector<RouterContact>
FindManyClosestTo(dht::Key_t location, uint32_t numRouters) const;
find_many_closest_to(dht::Key_t location, uint32_t numRouters) const;
/// return true if we have an rc by its ident pubkey
bool
Has(RouterID pk) const;
has_router(RouterID pk) const;
/// maybe get an rc by its ident pubkey
std::optional<RouterContact>
Get(RouterID pk) const;
get_rc(RouterID pk) const;
template <typename Filter>
std::optional<RouterContact>
GetRandom(Filter visit) const
{
util::NullLock lock{m_Access};
return router.loop()->call_get([this, visit]() -> std::optional<RouterContact> {
std::vector<const decltype(entries)::value_type*> entries;
for (const auto& entry : entries)
entries.push_back(entry);
std::vector<const decltype(m_Entries)::value_type*> entries;
for (const auto& entry : m_Entries)
entries.push_back(&entry);
std::shuffle(entries.begin(), entries.end(), llarp::CSRNG{});
std::shuffle(entries.begin(), entries.end(), llarp::CSRNG{});
for (const auto entry : entries)
{
if (visit(entry->second.rc))
return entry->second.rc;
}
for (const auto entry : entries)
{
if (visit(entry->second.rc))
return entry->second.rc;
}
return std::nullopt;
return std::nullopt;
});
}
/// visit all entries
@ -111,11 +116,10 @@ namespace llarp
void
VisitAll(Visit visit) const
{
util::NullLock lock{m_Access};
for (const auto& item : m_Entries)
{
visit(item.second.rc);
}
router.loop()->call([this, visit]() {
for (const auto& item : entries)
visit(item.second.rc);
});
}
/// visit all entries inserted before a timestamp
@ -123,50 +127,52 @@ namespace llarp
void
VisitInsertedBefore(Visit visit, llarp_time_t insertedBefore)
{
util::NullLock lock{m_Access};
for (const auto& item : m_Entries)
{
if (item.second.insertedAt < insertedBefore)
visit(item.second.rc);
}
router.loop()->call([this, visit, insertedBefore]() {
for (const auto& item : entries)
{
if (item.second.insertedAt < insertedBefore)
visit(item.second.rc);
}
});
}
/// remove an entry via its ident pubkey
void
Remove(RouterID pk);
remove_router(RouterID pk);
/// remove an entry given a filter that inspects the rc
template <typename Filter>
void
RemoveIf(Filter visit)
{
util::NullLock lock{m_Access};
std::unordered_set<RouterID> removed;
auto itr = m_Entries.begin();
while (itr != m_Entries.end())
{
if (visit(itr->second.rc))
router.loop()->call([this, visit]() {
std::unordered_set<RouterID> removed;
auto itr = entries.begin();
while (itr != entries.end())
{
removed.insert(itr->second.rc.pubkey);
itr = m_Entries.erase(itr);
if (visit(itr->second.rc))
{
removed.insert(itr->second.rc.pubkey);
itr = entries.erase(itr);
}
else
++itr;
}
else
++itr;
}
if (not removed.empty())
AsyncRemoveManyFromDisk(std::move(removed));
if (not removed.empty())
remove_many_from_disk_async(std::move(removed));
});
}
/// remove rcs that are not in keep and have been inserted before cutoff
void
RemoveStaleRCs(std::unordered_set<RouterID> keep, llarp_time_t cutoff);
remove_stale_rcs(std::unordered_set<RouterID> keep, llarp_time_t cutoff);
/// put this rc into the cache if it is not there or newer than the one there already
void
PutIfNewer(RouterContact rc);
put_rc_if_newer(RouterContact rc);
/// unconditional put of rc into cache
void
Put(RouterContact rc);
put_rc(RouterContact rc);
};
} // namespace llarp

@ -277,7 +277,7 @@ namespace llarp::path
{
r->loop()->call([nodedb = r->node_db(), router = *failedAt]() {
LogInfo("router ", router, " is deregistered so we remove it");
nodedb->Remove(router);
nodedb->remove_router(router);
});
}
}

@ -332,7 +332,7 @@ namespace llarp
};
RouterContact endpointRC;
if (const auto maybe = router->node_db()->Get(endpoint))
if (const auto maybe = router->node_db()->get_rc(endpoint))
{
endpointRC = *maybe;
}
@ -526,14 +526,14 @@ namespace llarp
{
log::info(path_cat, "Refreshed RouterContact for {}", rid);
;
r->node_db()->PutIfNewer(*rc);
r->node_db()->put_rc_if_newer(*rc);
}
else
{
// remove all connections to this router as it's probably not registered anymore
log::warning(path_cat, "Removing router {} due to path build timeout", rid);
r->link_manager().deregister_peer(rid);
r->node_db()->Remove(rid);
r->node_db()->remove_router(rid);
}
},
true);

@ -68,7 +68,7 @@ namespace llarp
RouterContact remoteRC;
if (not forceLookup)
{
if (const auto maybe = node_db->Get(router); maybe.has_value())
if (const auto maybe = node_db->get_rc(router); maybe.has_value())
{
remoteRC = *maybe;
if (callback)
@ -95,14 +95,43 @@ namespace llarp
if (shouldDoLookup)
{
auto lookup_cb = [this](oxen::quic::message m) mutable {
auto& r = link_manager->router();
if (m)
{
std::string payload;
try
{
oxenc::bt_dict_consumer btdc{m.body()};
payload = btdc.require<std::string>("RC");
}
catch (...)
{
log::warning(link_cat, "Failed to parse Find Router response!");
throw;
}
RouterContact result{std::move(payload)};
r.node_db()->put_rc_if_newer(result);
r.connect_to(result);
}
else
{
link_manager->handle_find_router_error(std::move(m));
}
};
// if we are a client try using the hidden service endpoints
if (!isServiceNode)
{
bool sent = false;
LogInfo("Lookup ", router, " anonymously");
hidden_service_context->ForEachService(
[&](const std::string&, const std::shared_ptr<service::Endpoint>& ep) -> bool {
const bool success = ep->lookup_router(router);
[&, cb = lookup_cb](
const std::string&, const std::shared_ptr<service::Endpoint>& ep) -> bool {
const bool success = ep->lookup_router(router, cb);
sent = sent || success;
return !success;
});
@ -111,7 +140,7 @@ namespace llarp
LogWarn("cannot lookup ", router, " anonymously");
}
if (not contacts->lookup_router(router))
if (not contacts->lookup_router(router, lookup_cb))
finalize_request(router, nullptr, RCRequestResult::RouterNotFound);
else
router_lookup_times[router] = std::chrono::steady_clock::now();
@ -203,7 +232,7 @@ namespace llarp
if (rc.IsPublicRouter())
{
LogDebug("Adding or updating RC for ", RouterID(rc.pubkey), " to nodedb and dht.");
loop->call([rc, n = node_db] { n->PutIfNewer(rc); });
loop->call([rc, n = node_db] { n->put_rc_if_newer(rc); });
contacts->put_rc_node_async(rc);
}
@ -273,13 +302,13 @@ namespace llarp
get_rc(router, nullptr, true);
}
node_db->RemoveStaleRCs(boostrap_rid_list, now - RouterContact::StaleInsertionAge);
node_db->remove_stale_rcs(boostrap_rid_list, now - RouterContact::StaleInsertionAge);
}
void
RCLookupHandler::explore_network()
{
const size_t known = node_db->NumLoaded();
const size_t known = node_db->num_loaded();
if (bootstrap_rc_list.empty() && known == 0)
{
LogError("we have no bootstrap nodes specified");
@ -302,19 +331,20 @@ namespace llarp
std::vector<RouterID> lookupRouters;
lookupRouters.reserve(LookupPerTick);
const auto now = std::chrono::steady_clock::now();
std::vector<RouterID> lrs = link_manager->router().loop()->call_get([this]() {
std::vector<RouterID> lookups;
lookups.reserve(LookupPerTick);
const auto now = std::chrono::steady_clock::now();
{
// if we are using a whitelist look up a few routers we don't have
util::Lock l(_mutex);
for (const auto& r : router_whitelist)
{
if (now > router_lookup_times[r] + RerequestInterval and not node_db->Has(r))
{
lookupRouters.emplace_back(r);
}
if (now > router_lookup_times[r] + RerequestInterval and not node_db->has_router(r))
lookups.emplace_back(r);
}
}
return lookups;
});
if (lookupRouters.size() > LookupPerTick)
{

@ -1,31 +1,32 @@
#include <memory>
#include "router.hpp"
#include <llarp/nodedb.hpp>
#include <llarp/config/config.hpp>
#include <llarp/constants/proto.hpp>
#include <llarp/constants/files.hpp>
#include <llarp/constants/time.hpp>
#include <llarp/crypto/crypto.hpp>
#include <llarp/dht/node.hpp>
#include <llarp/ev/ev.hpp>
#include <llarp/link/contacts.hpp>
#include <llarp/messages/dht.hpp>
#include <llarp/messages/link_message.hpp>
#include <llarp/net/net.hpp>
#include <stdexcept>
#include <llarp/tooling/peer_stats_event.hpp>
#include <llarp/tooling/router_event.hpp>
#include <llarp/util/buffer.hpp>
#include <llarp/util/logging.hpp>
#include <llarp/util/meta/memfn.hpp>
#include <llarp/util/str.hpp>
#include <llarp/ev/ev.hpp>
#include <llarp/tooling/peer_stats_event.hpp>
#include <llarp/tooling/router_event.hpp>
#include <llarp/util/status.hpp>
#include <memory>
#include <fstream>
#include <cstdlib>
#include <iterator>
#include <unordered_map>
#include <utility>
#include <stdexcept>
#if defined(ANDROID) || defined(IOS)
#include <unistd.h>
#endif
@ -95,7 +96,7 @@ namespace llarp
return util::StatusObject{
{"running", true},
{"numNodesKnown", _node_db->NumLoaded()},
{"numNodesKnown", _node_db->num_loaded()},
{"contacts", _contacts->ExtractStatus()},
{"services", _hidden_service_context.ExtractStatus()},
{"exit", _exit_context.ExtractStatus()},
@ -185,7 +186,7 @@ namespace llarp
{"uptime", to_json(Uptime())},
{"numPathsBuilt", pathsCount},
{"numPeersConnected", peers},
{"numRoutersKnown", _node_db->NumLoaded()},
{"numRoutersKnown", _node_db->num_loaded()},
{"ratio", ratio},
{"txRate", tx_rate},
{"rxRate", rx_rate},
@ -268,6 +269,28 @@ namespace llarp
loop_wakeup->Trigger();
}
void
Router::connect_to(const RouterID& rid)
{
_link_manager.connect_to(rid);
}
void
Router::connect_to(const RouterContact& rc)
{
_link_manager.connect_to(rc);
}
void
Router::lookup_router(RouterID rid, std::function<void(oxen::quic::message)> func)
{
_link_manager.send_control_message(
rid,
"find_router",
FindRouterMessage::serialize(std::move(rid), false, false),
std::move(func));
}
bool
Router::send_data_message(const RouterID& remote, const AbstractDataMessage& msg)
{
@ -285,23 +308,6 @@ namespace llarp
remote, std::move(ep), std::move(body), std::move(func));
}
void
Router::try_connect(fs::path rcfile)
{
RouterContact remote;
if (!remote.Read(rcfile.string().c_str()))
{
LogError("failure to decode or verify of remote RC");
return;
}
if (remote.Verify(now()))
{
LogDebug("verified signature");
_link_manager.connect_to(remote);
}
else
LogError(rcfile, " contains invalid RC");
}
void
Router::for_each_connection(std::function<void(link::Connection&)> func)
{
@ -456,7 +462,7 @@ namespace llarp
return false;
}
if (is_service_node)
_node_db->Put(router_contact);
_node_db->put_rc(router_contact);
queue_disk_io([&]() { HandleSaveRC(); });
return true;
}
@ -471,7 +477,7 @@ namespace llarp
Router::insufficient_peers() const
{
constexpr int KnownPeerWarningThreshold = 5;
return node_db()->NumLoaded() < KnownPeerWarningThreshold;
return node_db()->num_loaded() < KnownPeerWarningThreshold;
}
std::optional<std::string>
@ -826,7 +832,7 @@ namespace llarp
Router::report_stats()
{
const auto now = llarp::time_now_ms();
LogInfo(node_db()->NumLoaded(), " RCs loaded");
LogInfo(node_db()->num_loaded(), " RCs loaded");
LogInfo(bootstrap_rc_list.size(), " bootstrap peers");
LogInfo(NumberOfConnectedRouters(), " router connections");
if (IsServiceNode())
@ -851,7 +857,7 @@ namespace llarp
fmt::format_to(
out,
" snode | known/svc/clients: {}/{}/{}",
node_db()->NumLoaded(),
node_db()->num_loaded(),
NumberOfConnectedRouters(),
NumberOfConnectedClients());
fmt::format_to(
@ -871,7 +877,7 @@ namespace llarp
fmt::format_to(
out,
" client | known/connected: {}/{}",
node_db()->NumLoaded(),
node_db()->num_loaded(),
NumberOfConnectedRouters());
if (auto ep = hidden_service_context().GetDefault())
@ -1051,7 +1057,7 @@ namespace llarp
log::error(
logcat,
"We appear to be an active service node, but have only {} known peers.",
node_db()->NumLoaded());
node_db()->num_loaded());
_next_decomm_warning = now + DecommissionWarnInterval;
}
}
@ -1139,26 +1145,6 @@ namespace llarp
}
}
// TODO: refactor callers and remove this function
void
Router::LookupRouter(RouterID remote, RouterLookupHandler resultHandler)
{
_rc_lookup_handler.get_rc(
remote,
[=](const RouterID& id, const RouterContact* const rc, const RCRequestResult result) {
(void)id;
if (resultHandler)
{
std::vector<RouterContact> routers;
if (result == RCRequestResult::Success && rc != nullptr)
{
routers.push_back(*rc);
}
resultHandler(routers);
}
});
}
void
Router::set_router_whitelist(
const std::vector<RouterID>& whitelist,
@ -1255,19 +1241,19 @@ namespace llarp
{
LogInfo("Loading nodedb from disk...");
_node_db->LoadFromDisk();
_node_db->load_from_disk();
}
_contacts = std::make_shared<Contacts>(llarp::dht::Key_t(pubkey()), *this);
for (const auto& rc : bootstrap_rc_list)
{
node_db()->Put(rc);
node_db()->put_rc(rc);
_contacts->rc_nodes()->PutNode(rc);
LogInfo("added bootstrap node ", RouterID{rc.pubkey});
}
LogInfo("have ", _node_db->NumLoaded(), " routers");
LogInfo("have ", _node_db->num_loaded(), " routers");
_loop->call_every(ROUTER_TICK_INTERVAL, weak_from_this(), [this] { Tick(); });
_route_poker->start();
@ -1386,7 +1372,7 @@ namespace llarp
log::debug(logcat, "stopping links");
StopLinks();
log::debug(logcat, "saving nodedb to disk");
node_db()->SaveToDisk();
node_db()->save_to_disk();
_loop->call_later(200ms, [this] { AfterStopLinks(); });
}
@ -1486,26 +1472,6 @@ namespace llarp
return true;
}
bool
Router::TryConnectAsync(RouterContact rc, uint16_t tries)
{
(void)tries;
if (rc.pubkey == pubkey())
{
return false;
}
if (not _rc_lookup_handler.is_session_allowed(rc.pubkey))
{
return false;
}
_link_manager.connect_to(rc);
return true;
}
void
Router::queue_work(std::function<void(void)> func)
{

@ -13,7 +13,6 @@
#include <llarp/exit/context.hpp>
#include <llarp/handlers/tun.hpp>
#include <llarp/link/link_manager.hpp>
#include <llarp/nodedb.hpp>
#include <llarp/path/path_context.hpp>
#include <llarp/peerstats/peer_db.hpp>
#include <llarp/profiling.hpp>
@ -68,16 +67,6 @@ namespace llarp
struct Contacts;
class RouteManager final /* : public Router */
{
public:
std::shared_ptr<oxen::quic::connection_interface>
get_or_connect();
private:
std::shared_ptr<oxen::quic::Endpoint> ep;
};
struct Router : std::enable_shared_from_this<Router>
{
explicit Router(EventLoop_ptr loop, std::shared_ptr<vpn::Platform> vpnPlatform);
@ -186,6 +175,15 @@ namespace llarp
void
for_each_connection(std::function<void(link::Connection&)> func);
void
lookup_router(RouterID rid, std::function<void(oxen::quic::message)> = nullptr);
void
connect_to(const RouterID& rid);
void
connect_to(const RouterContact& rc);
Contacts*
contacts() const
{
@ -496,12 +494,6 @@ namespace llarp
return seckey_topublic(_identity);
}
void
try_connect(fs::path rcfile);
bool
TryConnectAsync(RouterContact rc, uint16_t tries);
/// send to remote router or queue for sending
/// returns false on overflow
/// returns true on successful queue
@ -564,9 +556,6 @@ namespace llarp
void
HandleDHTLookupForExplore(RouterID remote, const std::vector<RouterContact>& results);
void
LookupRouter(RouterID remote, RouterLookupHandler resultHandler);
bool
HasSessionTo(const RouterID& remote) const;

@ -240,7 +240,7 @@ namespace llarp
// handles when we resolved a .snode
auto handleResolvedSNodeName = [resultHandler, nodedb = router()->node_db()](auto router_id) {
std::vector<dns::SRVData> result{};
if (auto maybe_rc = nodedb->Get(router_id))
if (auto maybe_rc = nodedb->get_rc(router_id))
{
result = maybe_rc->srvRecords;
}
@ -924,7 +924,7 @@ namespace llarp
{
if (rid.IsZero())
return;
if (!router()->node_db()->Has(rid))
if (!router()->node_db()->has_router(rid))
{
lookup_router(rid);
}

Loading…
Cancel
Save