pull/2232/head
dr7ana 4 months ago
parent b15f3e7a9d
commit 2d3e68a759

@ -22,9 +22,7 @@ namespace llarp
namespace link
{
Endpoint::Endpoint(std::shared_ptr<oxen::quic::Endpoint> ep, LinkManager& lm)
: endpoint{std::move(ep)},
link_manager{lm},
_is_service_node{link_manager.is_service_node()}
: endpoint{std::move(ep)}, link_manager{lm}, _is_service_node{link_manager.is_service_node()}
{}
std::shared_ptr<link::Connection> Endpoint::get_service_conn(const RouterID& rid) const
@ -56,47 +54,12 @@ namespace llarp
bool Endpoint::have_client_conn(const RouterID& remote) const
{
return link_manager.router().loop()->call_get(
[this, remote]() { return client_conns.count(remote); });
return link_manager.router().loop()->call_get([this, remote]() { return client_conns.count(remote); });
}
bool Endpoint::have_service_conn(const RouterID& remote) const
{
return link_manager.router().loop()->call_get(
[this, remote]() { return service_conns.count(remote); });
}
std::pair<size_t, size_t> Endpoint::num_in_out() const
{
size_t in{0}, out{0};
for (const auto& c : service_conns)
{
if (c.second->is_inbound())
++in;
else
++out;
}
for (const auto& c : client_conns)
{
if (c.second->is_inbound())
++in;
else
++out;
}
return {in, out};
}
size_t Endpoint::num_client_conns() const
{
return client_conns.size();
}
size_t Endpoint::num_router_conns() const
{
return service_conns.size();
return link_manager.router().loop()->call_get([this, remote]() { return service_conns.count(remote); });
}
void Endpoint::for_each_connection(std::function<void(link::Connection&)> func)
@ -133,8 +96,56 @@ namespace llarp
log::critical(logcat, "Could not find connection to RID:{} to close!", rid);
});
}
std::tuple<size_t, size_t, size_t, size_t> Endpoint::connection_stats() const
{
size_t in{0}, out{0};
for (const auto& c : service_conns)
{
if (c.second->is_inbound())
++in;
else
++out;
}
for (const auto& c : client_conns)
{
if (c.second->is_inbound())
++in;
else
++out;
}
return {in, out, service_conns.size(), client_conns.size()};
}
size_t Endpoint::num_client_conns() const
{
return client_conns.size();
}
size_t Endpoint::num_router_conns() const
{
return service_conns.size();
}
} // namespace link
std::tuple<size_t, size_t, size_t, size_t> LinkManager::connection_stats() const
{
return ep.connection_stats();
}
size_t LinkManager::get_num_connected_routers() const
{
return ep.num_router_conns();
}
size_t LinkManager::get_num_connected_clients() const
{
return ep.num_client_conns();
}
using messages::serialize_response;
void LinkManager::for_each_connection(std::function<void(link::Connection&)> func)
@ -151,51 +162,42 @@ namespace llarp
log::debug(logcat, "{} called", __PRETTY_FUNCTION__);
s->register_handler("bfetch_rcs"s, [this](oxen::quic::message m) {
_router.loop()->call([this, msg = std::move(m)]() mutable {
handle_fetch_bootstrap_rcs(std::move(msg));
});
_router.loop()->call([this, msg = std::move(m)]() mutable { handle_fetch_bootstrap_rcs(std::move(msg)); });
});
s->register_handler("fetch_rcs"s, [this](oxen::quic::message m) {
_router.loop()->call(
[this, msg = std::move(m)]() mutable { handle_fetch_rcs(std::move(msg)); });
_router.loop()->call([this, msg = std::move(m)]() mutable { handle_fetch_rcs(std::move(msg)); });
});
s->register_handler("fetch_rids"s, [this](oxen::quic::message m) {
_router.loop()->call(
[this, msg = std::move(m)]() mutable { handle_fetch_router_ids(std::move(msg)); });
_router.loop()->call([this, msg = std::move(m)]() mutable { handle_fetch_router_ids(std::move(msg)); });
});
s->register_handler("path_build"s, [this, rid = router_id](oxen::quic::message m) {
_router.loop()->call([this, &rid, msg = std::move(m)]() mutable {
handle_path_build(std::move(msg), rid);
});
_router.loop()->call(
[this, &rid, msg = std::move(m)]() mutable { handle_path_build(std::move(msg), rid); });
});
s->register_handler("path_control"s, [this, rid = router_id](oxen::quic::message m) {
_router.loop()->call([this, &rid, msg = std::move(m)]() mutable {
handle_path_control(std::move(msg), rid);
});
_router.loop()->call(
[this, &rid, msg = std::move(m)]() mutable { handle_path_control(std::move(msg), rid); });
});
s->register_handler("gossip_rc"s, [this](oxen::quic::message m) {
_router.loop()->call(
[this, msg = std::move(m)]() mutable { handle_gossip_rc(std::move(msg)); });
_router.loop()->call([this, msg = std::move(m)]() mutable { handle_gossip_rc(std::move(msg)); });
});
for (auto& method : direct_requests)
{
s->register_handler(
std::string{method.first},
[this, func = std::move(method.second)](oxen::quic::message m) {
_router.loop()->call(
[this, msg = std::move(m), func = std::move(func)]() mutable {
auto body = msg.body_str();
auto respond = [m = std::move(msg)](std::string response) mutable {
m.respond(std::move(response), m.is_error());
};
std::invoke(func, this, body, std::move(respond));
});
std::string{method.first}, [this, func = std::move(method.second)](oxen::quic::message m) {
_router.loop()->call([this, msg = std::move(m), func = std::move(func)]() mutable {
auto body = msg.body_str();
auto respond = [m = std::move(msg)](std::string response) mutable {
m.respond(std::move(response), m.is_error());
};
std::invoke(func, this, body, std::move(respond));
});
});
}
@ -232,20 +234,15 @@ namespace llarp
auto e = quic->endpoint(
_router.listen_addr(),
[this](oxen::quic::connection_interface& ci) { return on_conn_open(ci); },
[this](oxen::quic::connection_interface& ci, uint64_t ec) {
return on_conn_closed(ci, ec);
},
[this](oxen::quic::dgram_interface& di, bstring dgram) {
recv_data_message(di, dgram);
},
[this](oxen::quic::connection_interface& ci, uint64_t ec) { return on_conn_closed(ci, ec); },
[this](oxen::quic::dgram_interface& di, bstring dgram) { recv_data_message(di, dgram); },
is_service_node() ? alpns::SERVICE_INBOUND : alpns::CLIENT_INBOUND,
is_service_node() ? alpns::SERVICE_OUTBOUND : alpns::CLIENT_OUTBOUND);
// While only service nodes accept inbound connections, clients must have this key verify
// callback set. It will reject any attempted inbound connection to a lokinet client prior
// to handshake completion
tls_creds->set_key_verify_callback([this](
const ustring_view& key, const ustring_view& alpn) {
tls_creds->set_key_verify_callback([this](const ustring_view& key, const ustring_view& alpn) {
return _router.loop()->call_get([&]() {
RouterID other{key.data()};
auto us = router().is_bootstrap_seed() ? "Bootstrap seed node"s : "Service node"s;
@ -255,11 +252,7 @@ namespace llarp
{
if (alpn == alpns::C_ALPNS)
{
log::critical(
logcat,
"{} node accepting client connection (remote ID:{})!",
us,
other);
log::critical(logcat, "{} node accepting client connection (remote ID:{})!", us, other);
ep.client_conns.emplace(other, nullptr);
return true;
}
@ -294,17 +287,13 @@ namespace llarp
"(RID:{}); {}!",
us,
other,
defer_to_incoming ? "deferring to inbound"
: "rejecting in favor of outbound");
defer_to_incoming ? "deferring to inbound" : "rejecting in favor of outbound");
return defer_to_incoming;
}
log::critical(
logcat,
"{} node accepting inbound from registered remote (RID:{})",
us,
other);
logcat, "{} node accepting inbound from registered remote (RID:{})", us, other);
}
else
log::critical(
@ -318,8 +307,7 @@ namespace llarp
return result;
}
log::critical(
logcat, "{} node received unknown ALPN; rejecting connection!", us);
log::critical(logcat, "{} node received unknown ALPN; rejecting connection!", us);
return false;
}
@ -358,31 +346,29 @@ namespace llarp
auto control = make_control(ci, rid);
_router.loop()->call(
[this, ci_ptr = ci.shared_from_this(), bstream = std::move(control), rid]() {
if (auto it = ep.service_conns.find(rid); it != ep.service_conns.end())
{
log::critical(logcat, "Configuring inbound connection from relay RID:{}", rid);
_router.loop()->call([this, ci_ptr = ci.shared_from_this(), bstream = std::move(control), rid]() {
if (auto it = ep.service_conns.find(rid); it != ep.service_conns.end())
{
log::critical(logcat, "Configuring inbound connection from relay RID:{}", rid);
it->second = std::make_shared<link::Connection>(ci_ptr, std::move(bstream));
}
else if (auto it = ep.client_conns.find(rid); it != ep.client_conns.end())
{
log::critical(logcat, "Configuring inbound connection from client RID:{}", rid);
it->second =
std::make_shared<link::Connection>(ci_ptr, std::move(bstream), false);
}
else
{
log::critical(
logcat,
"ERROR: connection accepted from RID:{} that was not logged in key "
"verification!",
rid);
}
it->second = std::make_shared<link::Connection>(ci_ptr, std::move(bstream));
}
else if (auto it = ep.client_conns.find(rid); it != ep.client_conns.end())
{
log::critical(logcat, "Configuring inbound connection from client RID:{}", rid);
it->second = std::make_shared<link::Connection>(ci_ptr, std::move(bstream), false);
}
else
{
log::critical(
logcat,
"ERROR: connection accepted from RID:{} that was not logged in key "
"verification!",
rid);
}
log::critical(logcat, "Successfully configured inbound connection fom {}...", rid);
});
log::critical(logcat, "Successfully configured inbound connection fom {}...", rid);
});
}
void LinkManager::on_outbound_conn(oxen::quic::connection_interface& ci)
@ -431,36 +417,30 @@ namespace llarp
void LinkManager::on_conn_closed(oxen::quic::connection_interface& ci, uint64_t ec)
{
_router.loop()->call(
[this, ref_id = ci.reference_id(), rid = RouterID{ci.remote_key()}, error_code = ec]() {
log::critical(quic_cat, "Purging quic connection {} (ec:{})", ref_id, error_code);
_router.loop()->call([this, ref_id = ci.reference_id(), rid = RouterID{ci.remote_key()}, error_code = ec]() {
log::critical(quic_cat, "Purging quic connection {} (ec:{})", ref_id, error_code);
if (auto s_itr = ep.service_conns.find(rid); s_itr != ep.service_conns.end())
{
log::critical(
quic_cat, "Quic connection to relay RID:{} purged successfully", rid);
ep.service_conns.erase(s_itr);
}
else if (auto c_itr = ep.client_conns.find(rid); c_itr != ep.client_conns.end())
{
log::critical(
quic_cat, "Quic connection to client RID:{} purged successfully", rid);
ep.client_conns.erase(c_itr);
}
else
log::critical(quic_cat, "Nothing to purge for quic connection {}", ref_id);
});
if (auto s_itr = ep.service_conns.find(rid); s_itr != ep.service_conns.end())
{
log::critical(quic_cat, "Quic connection to relay RID:{} purged successfully", rid);
ep.service_conns.erase(s_itr);
}
else if (auto c_itr = ep.client_conns.find(rid); c_itr != ep.client_conns.end())
{
log::critical(quic_cat, "Quic connection to client RID:{} purged successfully", rid);
ep.client_conns.erase(c_itr);
}
else
log::critical(quic_cat, "Nothing to purge for quic connection {}", ref_id);
});
}
bool LinkManager::send_control_message(
const RouterID& remote,
std::string endpoint,
std::string body,
std::function<void(oxen::quic::message m)> func)
const RouterID& remote, std::string endpoint, std::string body, std::function<void(oxen::quic::message m)> func)
{
// DISCUSS: revisit if this assert makes sense. If so, there's no need to if (func) the
// next logic block
assert(func); // makes no sense to send control message and ignore response (maybe gossip?)
// assert(func); // makes no sense to send control message and ignore response (maybe gossip?)
if (is_stopping)
return false;
@ -468,8 +448,7 @@ namespace llarp
if (func)
{
func = [this, f = std::move(func)](oxen::quic::message m) mutable {
_router.loop()->call(
[func = std::move(f), msg = std::move(m)]() mutable { func(std::move(msg)); });
_router.loop()->call([func = std::move(f), msg = std::move(m)]() mutable { func(std::move(msg)); });
};
}
@ -482,13 +461,10 @@ namespace llarp
log::critical(logcat, "Queueing message to ");
_router.loop()->call([this,
remote,
endpoint = std::move(endpoint),
body = std::move(body),
f = std::move(func)]() {
connect_and_send(remote, std::move(endpoint), std::move(body), std::move(f));
});
_router.loop()->call(
[this, remote, endpoint = std::move(endpoint), body = std::move(body), f = std::move(func)]() {
connect_and_send(remote, std::move(endpoint), std::move(body), std::move(f));
});
return false;
}
@ -504,9 +480,8 @@ namespace llarp
return true;
}
_router.loop()->call([this, body = std::move(body), remote]() {
connect_and_send(remote, std::nullopt, std::move(body));
});
_router.loop()->call(
[this, body = std::move(body), remote]() { connect_and_send(remote, std::nullopt, std::move(body)); });
return false;
}
@ -516,8 +491,7 @@ namespace llarp
return ep.close_connection(rid);
}
void LinkManager::test_reachability(
const RouterID& rid, conn_open_hook on_open, conn_closed_hook on_close)
void LinkManager::test_reachability(const RouterID& rid, conn_open_hook on_open, conn_closed_hook on_close)
{
if (auto rc = node_db->get_rc(rid))
{
@ -554,14 +528,10 @@ namespace llarp
log::warning(quic_cat, "Failed to begin establishing connection to {}", remote_addr);
}
else
log::error(
quic_cat,
"Error: Could not find RC for connection to rid:{}, message not sent!",
router);
log::error(quic_cat, "Error: Could not find RC for connection to rid:{}, message not sent!", router);
}
void LinkManager::connect_to(
const RemoteRC& rc, conn_open_hook on_open, conn_closed_hook on_close)
void LinkManager::connect_to(const RemoteRC& rc, conn_open_hook on_open, conn_closed_hook on_close)
{
const auto& rid = rc.router_id();
@ -576,10 +546,7 @@ namespace llarp
const auto& remote_addr = rc.addr();
if (auto rv = ep.establish_connection(
oxen::quic::RemoteAddress{rid.ToView(), remote_addr},
rc,
std::move(on_open),
std::move(on_close));
oxen::quic::RemoteAddress{rid.ToView(), remote_addr}, rc, std::move(on_open), std::move(on_close));
rv)
{
log::info(quic_cat, "Begun establishing connection to {}", remote_addr);
@ -629,21 +596,6 @@ namespace llarp
}
}
std::pair<size_t, size_t> LinkManager::num_in_out() const
{
return ep.num_in_out();
}
size_t LinkManager::get_num_connected_routers() const
{
return ep.num_router_conns();
}
size_t LinkManager::get_num_connected_clients() const
{
return ep.num_client_conns();
}
bool LinkManager::is_service_node() const
{
return _is_service_node;
@ -687,8 +639,7 @@ namespace llarp
connect_to(rc);
}
else
log::warning(
logcat, "NodeDB query for {} random RCs for connection returned none", num_conns);
log::warning(logcat, "NodeDB query for {} random RCs for connection returned none", num_conns);
}
void LinkManager::recv_data_message(oxen::quic::dgram_interface&, bstring)
@ -709,10 +660,7 @@ namespace llarp
continue;
send_control_message(
rid,
"gossip_rc"s,
GossipRCMessage::serialize(last_sender, rc),
[](oxen::quic::message) mutable {
rid, "gossip_rc"s, GossipRCMessage::serialize(last_sender, rc), [](oxen::quic::message) {
log::trace(logcat, "PLACEHOLDER FOR GOSSIP RC RESPONSE HANDLER");
});
++count;
@ -785,8 +733,7 @@ namespace llarp
}
catch (const std::exception& e)
{
log::critical(
link_cat, "Exception handling RC Fetch request (body:{}): {}", m.body(), e.what());
log::critical(link_cat, "Exception handling RC Fetch request (body:{}): {}", m.body(), e.what());
m.respond(messages::ERROR_RESPONSE, true);
return;
}
@ -848,9 +795,7 @@ namespace llarp
}
void LinkManager::fetch_rcs(
const RouterID& source,
std::string payload,
std::function<void(oxen::quic::message m)> func)
const RouterID& source, std::string payload, std::function<void(oxen::quic::message m)> func)
{
// this handler should not be registered for service nodes
assert(not _router.is_service_node());
@ -900,8 +845,7 @@ namespace llarp
// Initial fetch: give me all the RC's
if (explicit_ids.empty())
{
log::critical(
logcat, "Returning ALL locally held RCs for initial FetchRC request...");
log::critical(logcat, "Returning ALL locally held RCs for initial FetchRC request...");
for (const auto& rc : rcs)
{
sublist.append_encoded(rc.view());
@ -961,8 +905,7 @@ namespace llarp
source,
"fetch_rids"s,
m.body_str(),
[source_rid = std::move(source),
original = std::move(m)](oxen::quic::message m) mutable {
[source_rid = std::move(source), original = std::move(m)](oxen::quic::message m) mutable {
original.respond(m.body_str(), m.is_error());
});
return;
@ -992,8 +935,7 @@ namespace llarp
m.respond(std::move(btdp).str());
}
void LinkManager::handle_find_name(
std::string_view body, std::function<void(std::string)> respond)
void LinkManager::handle_find_name(std::string_view body, std::function<void(std::string)> respond)
{
std::string name_hash;
@ -1012,13 +954,11 @@ namespace llarp
_router.rpc_client()->lookup_ons_hash(
name_hash,
[respond = std::move(respond)](
[[maybe_unused]] std::optional<service::EncryptedName> maybe) mutable {
[respond = std::move(respond)]([[maybe_unused]] std::optional<service::EncryptedName> maybe) mutable {
if (maybe)
respond(serialize_response({{"NAME", maybe->ciphertext}}));
else
respond(
serialize_response({{messages::STATUS_KEY, FindNameMessage::NOT_FOUND}}));
respond(serialize_response({{messages::STATUS_KEY, FindNameMessage::NOT_FOUND}}));
});
}
@ -1065,8 +1005,7 @@ namespace llarp
}
}
void LinkManager::handle_publish_intro(
std::string_view body, std::function<void(std::string)> respond)
void LinkManager::handle_publish_intro(std::string_view body, std::function<void(std::string)> respond)
{
std::string introset, derived_signing_key, payload, sig, nonce;
uint64_t is_relayed, relay_order;
@ -1101,32 +1040,24 @@ namespace llarp
if (not service::EncryptedIntroSet::verify(introset, derived_signing_key, sig))
{
log::error(
link_cat, "Received PublishIntroMessage with invalid introset: {}", introset);
respond(serialize_response(
{{messages::STATUS_KEY, PublishIntroMessage::INVALID_INTROSET}}));
log::error(link_cat, "Received PublishIntroMessage with invalid introset: {}", introset);
respond(serialize_response({{messages::STATUS_KEY, PublishIntroMessage::INVALID_INTROSET}}));
return;
}
if (now + service::MAX_INTROSET_TIME_DELTA > signed_at + path::DEFAULT_LIFETIME)
{
log::error(
link_cat, "Received PublishIntroMessage with expired introset: {}", introset);
log::error(link_cat, "Received PublishIntroMessage with expired introset: {}", introset);
respond(serialize_response({{messages::STATUS_KEY, PublishIntroMessage::EXPIRED}}));
return;
}
auto closest_rcs =
_router.node_db()->find_many_closest_to(addr, INTROSET_STORAGE_REDUNDANCY);
auto closest_rcs = _router.node_db()->find_many_closest_to(addr, INTROSET_STORAGE_REDUNDANCY);
if (closest_rcs.size() != INTROSET_STORAGE_REDUNDANCY)
{
log::error(
link_cat,
"Received PublishIntroMessage but only know {} nodes",
closest_rcs.size());
respond(
serialize_response({{messages::STATUS_KEY, PublishIntroMessage::INSUFFICIENT}}));
log::error(link_cat, "Received PublishIntroMessage but only know {} nodes", closest_rcs.size());
respond(serialize_response({{messages::STATUS_KEY, PublishIntroMessage::INSUFFICIENT}}));
return;
}
@ -1136,12 +1067,8 @@ namespace llarp
{
if (relay_order >= INTROSET_STORAGE_REDUNDANCY)
{
log::error(
link_cat,
"Received PublishIntroMessage with invalide relay order: {}",
relay_order);
respond(serialize_response(
{{messages::STATUS_KEY, PublishIntroMessage::INVALID_ORDER}}));
log::error(link_cat, "Received PublishIntroMessage with invalide relay order: {}", relay_order);
respond(serialize_response({{messages::STATUS_KEY, PublishIntroMessage::INVALID_ORDER}}));
return;
}
@ -1162,10 +1089,7 @@ namespace llarp
}
else
{
log::info(
link_cat,
"Received PublishIntroMessage; propagating to peer index {}",
relay_order);
log::info(link_cat, "Received PublishIntroMessage; propagating to peer index {}", relay_order);
send_control_message(
peer_key,
@ -1195,17 +1119,13 @@ namespace llarp
if (rc_index >= 0)
{
log::info(
link_cat, "Received PublishIntroMessage for {} (TXID: {}); we are candidate {}");
log::info(link_cat, "Received PublishIntroMessage for {} (TXID: {}); we are candidate {}");
_router.contacts().put_intro(std::move(enc));
respond(serialize_response({{messages::STATUS_KEY, ""}}));
}
else
log::warning(
link_cat,
"Received non-relayed PublishIntroMessage from {}; we are not the candidate",
addr);
log::warning(link_cat, "Received non-relayed PublishIntroMessage from {}; we are not the candidate", addr);
}
// DISCUSS: I feel like ::handle_publish_intro_response should be the callback that handles the
@ -1258,8 +1178,7 @@ namespace llarp
}
}
void LinkManager::handle_find_intro(
std::string_view body, std::function<void(std::string)> respond)
void LinkManager::handle_find_intro(std::string_view body, std::function<void(std::string)> respond)
{
ustring location;
uint64_t relay_order, is_relayed;
@ -1285,26 +1204,17 @@ namespace llarp
{
if (relay_order >= INTROSET_STORAGE_REDUNDANCY)
{
log::warning(
link_cat,
"Received FindIntroMessage with invalid relay order: {}",
relay_order);
respond(
serialize_response({{messages::STATUS_KEY, FindIntroMessage::INVALID_ORDER}}));
log::warning(link_cat, "Received FindIntroMessage with invalid relay order: {}", relay_order);
respond(serialize_response({{messages::STATUS_KEY, FindIntroMessage::INVALID_ORDER}}));
return;
}
auto closest_rcs =
_router.node_db()->find_many_closest_to(addr, INTROSET_STORAGE_REDUNDANCY);
auto closest_rcs = _router.node_db()->find_many_closest_to(addr, INTROSET_STORAGE_REDUNDANCY);
if (closest_rcs.size() != INTROSET_STORAGE_REDUNDANCY)
{
log::error(
link_cat,
"Received FindIntroMessage but only know {} nodes",
closest_rcs.size());
respond(serialize_response(
{{messages::STATUS_KEY, FindIntroMessage::INSUFFICIENT_NODES}}));
log::error(link_cat, "Received FindIntroMessage but only know {} nodes", closest_rcs.size());
respond(serialize_response({{messages::STATUS_KEY, FindIntroMessage::INSUFFICIENT_NODES}}));
return;
}
@ -1325,13 +1235,9 @@ namespace llarp
"to initial "
"requester");
else if (relay_response.timed_out)
log::critical(
link_cat,
"Relayed FindIntroMessage timed out! Notifying initial requester");
log::critical(link_cat, "Relayed FindIntroMessage timed out! Notifying initial requester");
else
log::critical(
link_cat,
"Relayed FindIntroMessage failed! Notifying initial requester");
log::critical(link_cat, "Relayed FindIntroMessage failed! Notifying initial requester");
respond(relay_response.body_str());
});
@ -1342,9 +1248,7 @@ namespace llarp
respond(serialize_response({{"INTROSET", maybe_intro->bt_encode()}}));
else
{
log::warning(
link_cat,
"Received FindIntroMessage with relayed == false and no local introset entry");
log::warning(link_cat, "Received FindIntroMessage with relayed == false and no local introset entry");
respond(serialize_response({{messages::STATUS_KEY, FindIntroMessage::NOT_FOUND}}));
}
}
@ -1389,8 +1293,7 @@ namespace llarp
if (!_router.path_context().is_transit_allowed())
{
log::warning(link_cat, "got path build request when not permitting transit");
m.respond(
serialize_response({{messages::STATUS_KEY, PathBuildMessage::NO_TRANSIT}}), true);
m.respond(serialize_response({{messages::STATUS_KEY, PathBuildMessage::NO_TRANSIT}}), true);
return;
}
@ -1400,9 +1303,7 @@ namespace llarp
if (payload_list.size() != path::MAX_LEN)
{
log::info(link_cat, "Path build message with wrong number of frames");
m.respond(
serialize_response({{messages::STATUS_KEY, PathBuildMessage::BAD_FRAMES}}),
true);
m.respond(serialize_response({{messages::STATUS_KEY, PathBuildMessage::BAD_FRAMES}}), true);
return;
}
@ -1417,13 +1318,10 @@ namespace llarp
SharedSecret shared;
// derive shared secret using ephemeral pubkey and our secret key (and nonce)
if (!crypto::dh_server(
shared.data(), other_pubkey.data(), _router.pubkey(), outer_nonce.data()))
if (!crypto::dh_server(shared.data(), other_pubkey.data(), _router.pubkey(), outer_nonce.data()))
{
log::info(link_cat, "DH server initialization failed during path build");
m.respond(
serialize_response({{messages::STATUS_KEY, PathBuildMessage::BAD_CRYPTO}}),
true);
m.respond(serialize_response({{messages::STATUS_KEY, PathBuildMessage::BAD_CRYPTO}}), true);
return;
}
@ -1432,28 +1330,21 @@ namespace llarp
if (!crypto::hmac(digest.data(), frame.data(), frame.size(), shared))
{
log::error(link_cat, "HMAC failed on path build request");
m.respond(
serialize_response({{messages::STATUS_KEY, PathBuildMessage::BAD_CRYPTO}}),
true);
m.respond(serialize_response({{messages::STATUS_KEY, PathBuildMessage::BAD_CRYPTO}}), true);
return;
}
if (!std::equal(digest.begin(), digest.end(), hash.data()))
{
log::info(link_cat, "HMAC mismatch on path build request");
m.respond(
serialize_response({{messages::STATUS_KEY, PathBuildMessage::BAD_CRYPTO}}),
true);
m.respond(serialize_response({{messages::STATUS_KEY, PathBuildMessage::BAD_CRYPTO}}), true);
return;
}
// decrypt frame with our hop info
if (!crypto::xchacha20(
hop_payload.data(), hop_payload.size(), shared.data(), outer_nonce.data()))
if (!crypto::xchacha20(hop_payload.data(), hop_payload.size(), shared.data(), outer_nonce.data()))
{
log::info(link_cat, "Decrypt failed on path build request");
m.respond(
serialize_response({{messages::STATUS_KEY, PathBuildMessage::BAD_CRYPTO}}),
true);
m.respond(serialize_response({{messages::STATUS_KEY, PathBuildMessage::BAD_CRYPTO}}), true);
return;
}
@ -1477,9 +1368,7 @@ namespace llarp
if (hop->info.txID.IsZero() || hop->info.rxID.IsZero())
{
log::warning(link_cat, "Invalid PathID; PathIDs must be non-zero");
m.respond(
serialize_response({{messages::STATUS_KEY, PathBuildMessage::BAD_PATHID}}),
true);
m.respond(serialize_response({{messages::STATUS_KEY, PathBuildMessage::BAD_PATHID}}), true);
return;
}
@ -1488,26 +1377,20 @@ namespace llarp
if (_router.path_context().has_transit_hop(hop->info))
{
log::warning(link_cat, "Invalid PathID; PathIDs must be unique");
m.respond(
serialize_response({{messages::STATUS_KEY, PathBuildMessage::BAD_PATHID}}),
true);
m.respond(serialize_response({{messages::STATUS_KEY, PathBuildMessage::BAD_PATHID}}), true);
return;
}
if (!crypto::dh_server(
hop->pathKey.data(), other_pubkey.data(), _router.pubkey(), inner_nonce.data()))
if (!crypto::dh_server(hop->pathKey.data(), other_pubkey.data(), _router.pubkey(), inner_nonce.data()))
{
log::warning(link_cat, "DH failed during path build.");
m.respond(
serialize_response({{messages::STATUS_KEY, PathBuildMessage::BAD_CRYPTO}}),
true);
m.respond(serialize_response({{messages::STATUS_KEY, PathBuildMessage::BAD_CRYPTO}}), true);
return;
}
// generate hash of hop key for nonce mutation
ShortHash xor_hash;
crypto::shorthash(xor_hash, hop->pathKey.data(), hop->pathKey.size());
hop->nonceXOR =
xor_hash.data(); // nonceXOR is 24 bytes, ShortHash is 32; this will truncate
hop->nonceXOR = xor_hash.data(); // nonceXOR is 24 bytes, ShortHash is 32; this will truncate
// set and check path lifetime
hop->lifetime = 1ms * lifetime;
@ -1515,9 +1398,7 @@ namespace llarp
if (hop->lifetime >= path::DEFAULT_LIFETIME)
{
log::warning(link_cat, "Path build attempt with too long of a lifetime.");
m.respond(
serialize_response({{messages::STATUS_KEY, PathBuildMessage::BAD_LIFETIME}}),
true);
m.respond(serialize_response({{messages::STATUS_KEY, PathBuildMessage::BAD_LIFETIME}}), true);
return;
}
@ -1543,8 +1424,7 @@ namespace llarp
// onion round to compute the return value, so we don't care about it.
for (auto& element : payload_list)
{
crypto::onion(
element.data(), element.size(), hop->pathKey, onion_nonce, onion_nonce);
crypto::onion(element.data(), element.size(), hop->pathKey, onion_nonce, onion_nonce);
}
// randomize final frame. could probably paste our frame on the end and onion it with
// the rest, but it gains nothing over random.
@ -1568,8 +1448,7 @@ namespace llarp
if (m.timed_out)
log::info(link_cat, "Upstream timed out on path build; relaying timeout");
else
log::info(
link_cat, "Upstream returned path build failure; relaying response");
log::info(link_cat, "Upstream returned path build failure; relaying response");
m.respond(m.body_str(), m.is_error());
});
@ -1663,8 +1542,7 @@ namespace llarp
}
RouterID target{pubkey.data()};
auto transit_hop =
_router.path_context().GetTransitHop(target, PathID_t{to_usv(tx_id).data()});
auto transit_hop = _router.path_context().GetTransitHop(target, PathID_t{to_usv(tx_id).data()});
const auto rx_id = transit_hop->info.rxID;
@ -1672,8 +1550,7 @@ namespace llarp
(crypto::verify(pubkey, to_usv(dict_data), sig)
and _router.exitContext().obtain_new_exit(PubKey{pubkey.data()}, rx_id, flag != 0));
m.respond(
ObtainExitMessage::sign_and_serialize_response(_router.identity(), tx_id), not success);
m.respond(ObtainExitMessage::sign_and_serialize_response(_router.identity(), tx_id), not success);
}
void LinkManager::handle_obtain_exit_response(oxen::quic::message m)
@ -1734,21 +1611,15 @@ namespace llarp
return;
}
auto transit_hop =
_router.path_context().GetTransitHop(_router.pubkey(), PathID_t{to_usv(tx_id).data()});
auto transit_hop = _router.path_context().GetTransitHop(_router.pubkey(), PathID_t{to_usv(tx_id).data()});
if (auto exit_ep =
_router.exitContext().find_endpoint_for_path(PathID_t{to_usv(path_id).data()}))
if (auto exit_ep = _router.exitContext().find_endpoint_for_path(PathID_t{to_usv(path_id).data()}))
{
if (crypto::verify(exit_ep->PubKey().data(), to_usv(dict_data), sig))
{
(exit_ep->UpdateLocalPath(transit_hop->info.rxID))
? m.respond(
UpdateExitMessage::sign_and_serialize_response(_router.identity(), tx_id))
: m.respond(
serialize_response(
{{messages::STATUS_KEY, UpdateExitMessage::UPDATE_FAILED}}),
true);
? m.respond(UpdateExitMessage::sign_and_serialize_response(_router.identity(), tx_id))
: m.respond(serialize_response({{messages::STATUS_KEY, UpdateExitMessage::UPDATE_FAILED}}), true);
}
// If we fail to verify the message, no-op
}
@ -1820,8 +1691,7 @@ namespace llarp
return;
}
auto transit_hop =
_router.path_context().GetTransitHop(_router.pubkey(), PathID_t{to_usv(tx_id).data()});
auto transit_hop = _router.path_context().GetTransitHop(_router.pubkey(), PathID_t{to_usv(tx_id).data()});
const auto rx_id = transit_hop->info.rxID;
@ -1834,8 +1704,7 @@ namespace llarp
}
}
m.respond(
serialize_response({{messages::STATUS_KEY, CloseExitMessage::UPDATE_FAILED}}), true);
m.respond(serialize_response({{messages::STATUS_KEY, CloseExitMessage::UPDATE_FAILED}}), true);
}
void LinkManager::handle_close_exit_response(oxen::quic::message m)
@ -1971,13 +1840,11 @@ namespace llarp
if (itr == path_requests.end())
{
log::info(
link_cat, "Received path control request \"{}\", which has no handler.", method);
log::info(link_cat, "Received path control request \"{}\", which has no handler.", method);
return;
}
auto respond = [m = std::move(m),
hop_weak = hop->weak_from_this()](std::string response) mutable {
auto respond = [m = std::move(m), hop_weak = hop->weak_from_this()](std::string response) mutable {
auto hop = hop_weak.lock();
if (not hop)
return; // transit hop gone, drop response

@ -88,15 +88,14 @@ namespace llarp
bool have_service_conn(const RouterID& remote) const;
std::pair<size_t, size_t> num_in_out() const;
std::tuple<size_t, size_t, size_t, size_t> connection_stats() const;
size_t num_client_conns() const;
size_t num_router_conns() const;
template <typename... Opt>
bool establish_connection(
const oxen::quic::RemoteAddress& remote, const RemoteRC& rc, Opt&&... opts);
bool establish_connection(const oxen::quic::RemoteAddress& remote, const RemoteRC& rc, Opt&&... opts);
template <typename... Opt>
bool establish_and_send(
@ -179,9 +178,7 @@ namespace llarp
std::shared_ptr<oxen::quic::Endpoint> startup_endpoint();
void register_commands(
const std::shared_ptr<oxen::quic::BTRequestStream>& s,
const RouterID& rid,
bool client_only = false);
const std::shared_ptr<oxen::quic::BTRequestStream>& s, const RouterID& rid, bool client_only = false);
public:
const link::Endpoint& endpoint() const
@ -198,24 +195,17 @@ namespace llarp
void handle_gossip_rc(oxen::quic::message m);
void fetch_rcs(
const RouterID& source,
std::string payload,
std::function<void(oxen::quic::message m)> func);
void fetch_rcs(const RouterID& source, std::string payload, std::function<void(oxen::quic::message m)> func);
void handle_fetch_rcs(oxen::quic::message m);
void fetch_router_ids(
const RouterID& via,
std::string payload,
std::function<void(oxen::quic::message m)> func);
const RouterID& via, std::string payload, std::function<void(oxen::quic::message m)> func);
void handle_fetch_router_ids(oxen::quic::message m);
void fetch_bootstrap_rcs(
const RemoteRC& source,
std::string payload,
std::function<void(oxen::quic::message m)> func);
const RemoteRC& source, std::string payload, std::function<void(oxen::quic::message m)> func);
void handle_fetch_bootstrap_rcs(oxen::quic::message m);
@ -241,7 +231,7 @@ namespace llarp
void set_conn_persist(const RouterID& remote, llarp_time_t until);
std::pair<size_t, size_t> num_in_out() const;
std::tuple<size_t, size_t, size_t, size_t> connection_stats() const;
size_t get_num_connected_routers() const;
@ -269,12 +259,9 @@ namespace llarp
private:
// DHT messages
void handle_find_name(
std::string_view body, std::function<void(std::string)> respond); // relay
void handle_find_intro(
std::string_view body, std::function<void(std::string)> respond); // relay
void handle_publish_intro(
std::string_view body, std::function<void(std::string)> respond); // relay
void handle_find_name(std::string_view body, std::function<void(std::string)> respond); // relay
void handle_find_intro(std::string_view body, std::function<void(std::string)> respond); // relay
void handle_publish_intro(std::string_view body, std::function<void(std::string)> respond); // relay
// Path messages
void handle_path_build(oxen::quic::message, const RouterID& from); // relay
@ -312,8 +299,7 @@ namespace llarp
// Path relaying
void handle_path_control(oxen::quic::message, const RouterID& from);
void handle_inner_request(
oxen::quic::message m, std::string payload, std::shared_ptr<path::TransitHop> hop);
void handle_inner_request(oxen::quic::message m, std::string payload, std::shared_ptr<path::TransitHop> hop);
// DHT responses
void handle_find_name_response(oxen::quic::message);
@ -355,12 +341,10 @@ namespace llarp
if (not b)
{
log::critical(
logcat,
"ERROR: attempting to establish an already-existing connection");
(is_control) ? itr->second->control_stream->command(
std::move(*ep), std::move(body), std::move(func))
: itr->second->conn->send_datagram(std::move(body));
log::critical(logcat, "ERROR: attempting to establish an already-existing connection");
(is_control)
? itr->second->control_stream->command(std::move(*ep), std::move(body), std::move(func))
: itr->second->conn->send_datagram(std::move(body));
return true;
}
@ -374,10 +358,7 @@ namespace llarp
std::shared_ptr<oxen::quic::BTRequestStream> control_stream =
conn_interface->template open_stream<oxen::quic::BTRequestStream>(
[rid = rid](oxen::quic::Stream&, uint64_t error_code) {
log::warning(
logcat,
"BTRequestStream closed unexpectedly (ec:{})",
error_code);
log::warning(logcat, "BTRequestStream closed unexpectedly (ec:{})", error_code);
});
if (is_snode)
@ -392,15 +373,12 @@ namespace llarp
is_control ? "control message (ep:{})"_format(ep) : "data message",
rid);
(is_control)
? control_stream->command(std::move(*ep), std::move(body), std::move(func))
: conn_interface->send_datagram(std::move(body));
(is_control) ? control_stream->command(std::move(*ep), std::move(body), std::move(func))
: conn_interface->send_datagram(std::move(body));
itr->second =
std::make_shared<link::Connection>(conn_interface, control_stream, true);
itr->second = std::make_shared<link::Connection>(conn_interface, control_stream, true);
log::critical(
logcat, "Outbound connection to RID:{} added to service conns...", rid);
log::critical(logcat, "Outbound connection to RID:{} added to service conns...", rid);
return true;
}
catch (...)
@ -412,8 +390,7 @@ namespace llarp
}
template <typename... Opt>
bool Endpoint::establish_connection(
const oxen::quic::RemoteAddress& remote, const RemoteRC& rc, Opt&&... opts)
bool Endpoint::establish_connection(const oxen::quic::RemoteAddress& remote, const RemoteRC& rc, Opt&&... opts)
{
return link_manager.router().loop()->call_get([&]() {
try
@ -427,9 +404,7 @@ namespace llarp
if (not b)
{
log::critical(
logcat,
"ERROR: attempting to establish an already-existing connection");
log::critical(logcat, "ERROR: attempting to establish an already-existing connection");
return b;
}
@ -439,24 +414,18 @@ namespace llarp
is_snode ? ROUTER_KEEP_ALIVE : CLIENT_KEEP_ALIVE,
std::forward<Opt>(opts)...);
auto control_stream =
conn_interface->template open_stream<oxen::quic::BTRequestStream>(
[rid = rid](oxen::quic::Stream&, uint64_t error_code) {
log::warning(
logcat,
"BTRequestStream closed unexpectedly (ec:{})",
error_code);
});
auto control_stream = conn_interface->template open_stream<oxen::quic::BTRequestStream>(
[rid = rid](oxen::quic::Stream&, uint64_t error_code) {
log::warning(logcat, "BTRequestStream closed unexpectedly (ec:{})", error_code);
});
if (is_snode)
link_manager.register_commands(control_stream, rid);
else
log::critical(logcat, "Client NOT registering BTStream commands!");
itr->second =
std::make_shared<link::Connection>(conn_interface, control_stream, true);
itr->second = std::make_shared<link::Connection>(conn_interface, control_stream, true);
log::critical(
logcat, "Outbound connection to RID:{} added to service conns...", rid);
log::critical(logcat, "Outbound connection to RID:{} added to service conns...", rid);
return true;
}
catch (...)

@ -42,6 +42,11 @@ namespace llarp
fetch_counters.clear();
}
std::tuple<size_t, size_t, size_t> NodeDB::db_stats() const
{
return {num_rcs(), num_rids(), num_bootstraps()};
}
std::optional<RemoteRC> NodeDB::get_rc_by_rid(const RouterID& rid)
{
if (auto itr = rc_lookup.find(rid); itr != rc_lookup.end())
@ -69,8 +74,7 @@ namespace llarp
return rand;
}
std::optional<RemoteRC> NodeDB::get_random_rc_conditional(
std::function<bool(RemoteRC)> hook) const
std::optional<RemoteRC> NodeDB::get_random_rc_conditional(std::function<bool(RemoteRC)> hook) const
{
std::optional<RemoteRC> rand = get_random_rc();
@ -206,9 +210,7 @@ namespace llarp
2) The ratio of "good" rcs to total received is above MIN_GOOD_RC_FETCH_THRESHOLD
*/
bool success = false;
if (success = inter_size > MIN_GOOD_RC_FETCH_TOTAL
and fetch_threshold > MIN_GOOD_RC_FETCH_THRESHOLD;
success)
if (success = inter_size > MIN_GOOD_RC_FETCH_TOTAL and fetch_threshold > MIN_GOOD_RC_FETCH_THRESHOLD; success)
{
// set rcs to be intersection set
rcs = std::move(confirmed_set);
@ -228,10 +230,7 @@ namespace llarp
auto success = process_fetched_rcs(rcs);
log::critical(
logcat,
"RCs returned by FetchRC {} by trust model",
success ? "approved" : "rejected");
log::critical(logcat, "RCs returned by FetchRC {} by trust model", success ? "approved" : "rejected");
return success;
}
@ -291,9 +290,7 @@ namespace llarp
receiving a sufficient amount to make a comparison of any sorts
*/
bool success = false;
if (success = (fetch_threshold > GOOD_RID_FETCH_THRESHOLD)
and (union_size > MIN_GOOD_RID_FETCH_TOTAL);
success)
if (success = (fetch_threshold > GOOD_RID_FETCH_THRESHOLD) and (union_size > MIN_GOOD_RID_FETCH_TOTAL); success)
{
process_results(std::move(unconfirmed_set), unconfirmed_rids, known_rids);
@ -409,8 +406,7 @@ namespace llarp
}
catch (const std::exception& e)
{
log::critical(
logcat, "Failed to parse RC fetch response from {}: {}", source, e.what());
log::critical(logcat, "Failed to parse RC fetch response from {}: {}", source, e.what());
fetch_rcs_result(initial, true);
return;
}
@ -433,8 +429,7 @@ namespace llarp
if (not initial and rid_sources.empty())
{
log::error(
logcat, "Attempting to fetch RouterIDs, but have no source from which to do so.");
log::error(logcat, "Attempting to fetch RouterIDs, but have no source from which to do so.");
fallback_to_bootstrap();
return;
}
@ -453,8 +448,8 @@ namespace llarp
[this, source = src, target, initial](oxen::quic::message m) mutable {
if (m.is_error())
{
auto err = "RID fetch from {} via {} {}"_format(
target, source, m.timed_out ? "timed out" : "failed");
auto err =
"RID fetch from {} via {} {}"_format(target, source, m.timed_out ? "timed out" : "failed");
log::critical(link_cat, err);
ingest_rid_fetch_responses(target);
fetch_rids_result(initial);
@ -468,14 +463,12 @@ namespace llarp
btdc.required("routers");
auto router_id_strings = btdc.consume_list<std::vector<ustring>>();
btdc.require_signature(
"signature", [&source](ustring_view msg, ustring_view sig) {
if (sig.size() != 64)
throw std::runtime_error{"Invalid signature: not 64 bytes"};
if (not crypto::verify(source, msg, sig))
throw std::runtime_error{
"Failed to verify signature for fetch RouterIDs response."};
});
btdc.require_signature("signature", [&source](ustring_view msg, ustring_view sig) {
if (sig.size() != 64)
throw std::runtime_error{"Invalid signature: not 64 bytes"};
if (not crypto::verify(source, msg, sig))
throw std::runtime_error{"Failed to verify signature for fetch RouterIDs response."};
});
std::set<RouterID> router_ids;
@ -484,10 +477,7 @@ namespace llarp
if (s.size() != RouterID::SIZE)
{
log::critical(
link_cat,
"RID fetch from {} via {} returned bad RouterID",
target,
source);
link_cat, "RID fetch from {} via {} returned bad RouterID", target, source);
ingest_rid_fetch_responses(target);
fetch_rids_result(initial);
return;
@ -502,8 +492,7 @@ namespace llarp
}
catch (const std::exception& e)
{
log::critical(
link_cat, "Error handling fetch RouterIDs response: {}", e.what());
log::critical(link_cat, "Error handling fetch RouterIDs response: {}", e.what());
ingest_rid_fetch_responses(target);
fetch_rids_result(initial);
}
@ -534,9 +523,8 @@ namespace llarp
// If we have passed the last last conditional, then it means we are not bootstrapping
// and the current fetch_source has more attempts before being rotated. As a result, we
// find new non-bootstrap RC fetch source and try again buddy
fetch_source = (initial)
? *std::next(known_rids.begin(), csrng() % known_rids.size())
: std::next(rc_lookup.begin(), csrng() % rc_lookup.size())->first;
fetch_source = (initial) ? *std::next(known_rids.begin(), csrng() % known_rids.size())
: std::next(rc_lookup.begin(), csrng() % rc_lookup.size())->first;
fetch_rcs(initial);
}
@ -565,8 +553,7 @@ namespace llarp
if (n_responses < RID_SOURCE_COUNT)
{
log::critical(
logcat, "Received {}/{} fetch RID requests", n_responses, RID_SOURCE_COUNT);
log::critical(logcat, "Received {}/{} fetch RID requests", n_responses, RID_SOURCE_COUNT);
return;
}
@ -574,11 +561,7 @@ namespace llarp
if (n_fails <= MAX_RID_ERRORS)
{
log::critical(
logcat,
"RID fetching was successful ({}/{} acceptable errors)",
n_fails,
MAX_RID_ERRORS);
log::critical(logcat, "RID fetching was successful ({}/{} acceptable errors)", n_fails, MAX_RID_ERRORS);
// this is where the trust model will do verification based on the similarity of the
// sets
@ -589,19 +572,14 @@ namespace llarp
return;
}
log::critical(
logcat,
"Accumulated RID's rejected by trust model, reselecting all RID sources...");
log::critical(logcat, "Accumulated RID's rejected by trust model, reselecting all RID sources...");
reselect_router_id_sources(rid_sources);
++fetch_failures;
}
else
{
// we had 4 or more failed requests, so we will need to rotate our rid sources
log::critical(
logcat,
"RID fetching found {} failures; reselecting failed RID sources...",
n_fails);
log::critical(logcat, "RID fetching found {} failures; reselecting failed RID sources...", n_fails);
++fetch_failures;
reselect_router_id_sources(fail_sources);
}
@ -655,8 +633,7 @@ namespace llarp
// bad spot; we are unable to do anything
if (_using_bootstrap_fallback)
{
auto err = fmt::format(
"ERROR: ALL BOOTSTRAPS ARE BAD... REATTEMPTING IN {}...", BOOTSTRAP_COOLDOWN);
auto err = fmt::format("ERROR: ALL BOOTSTRAPS ARE BAD... REATTEMPTING IN {}...", BOOTSTRAP_COOLDOWN);
log::error(logcat, err);
bootstrap_cooldown();
@ -676,15 +653,13 @@ namespace llarp
auto is_snode = _router.is_service_node();
auto num_needed =
is_snode ? SERVICE_NODE_BOOTSTRAP_SOURCE_COUNT : CLIENT_BOOTSTRAP_SOURCE_COUNT;
auto num_needed = is_snode ? SERVICE_NODE_BOOTSTRAP_SOURCE_COUNT : CLIENT_BOOTSTRAP_SOURCE_COUNT;
_router.link_manager().fetch_bootstrap_rcs(
rc,
BootstrapFetchMessage::serialize(
is_snode ? std::make_optional(_router.router_contact) : std::nullopt, num_needed),
[this, is_snode = _router.is_service_node(), src = rc.router_id()](
oxen::quic::message m) mutable {
[this, is_snode = _router.is_service_node(), src = rc.router_id()](oxen::quic::message m) mutable {
log::critical(logcat, "Received response to BootstrapRC fetch request...");
if (not m)
@ -731,11 +706,7 @@ namespace llarp
}
log::critical(
logcat,
"BootstrapRC fetch response from {} returned {}/{} needed RCs",
src,
num,
MIN_ACTIVE_RCS);
logcat, "BootstrapRC fetch response from {} returned {}/{} needed RCs", src, num, MIN_ACTIVE_RCS);
if (not is_snode)
{
@ -818,8 +789,7 @@ namespace llarp
{
if (not _router.is_service_node())
{
if (_pinned_edges.size() && _pinned_edges.count(remote) == 0
&& not _bootstraps.contains(remote))
if (_pinned_edges.size() && _pinned_edges.count(remote) == 0 && not _bootstraps.contains(remote))
return false;
}
@ -932,8 +902,7 @@ namespace llarp
{
auto cutoff_time = time_point_now();
cutoff_time -=
_router.is_service_node() ? RouterContact::OUTDATED_AGE : RouterContact::LIFETIME;
cutoff_time -= _router.is_service_node() ? RouterContact::OUTDATED_AGE : RouterContact::LIFETIME;
for (auto itr = rc_lookup.begin(); itr != rc_lookup.end();)
{
@ -1048,8 +1017,7 @@ namespace llarp
});
}
std::vector<RemoteRC> NodeDB::find_many_closest_to(
llarp::dht::Key_t location, uint32_t numRouters) const
std::vector<RemoteRC> NodeDB::find_many_closest_to(llarp::dht::Key_t location, uint32_t numRouters) const
{
return _router.loop()->call_get([this, location, numRouters]() -> std::vector<RemoteRC> {
std::vector<const RemoteRC*> all;
@ -1063,11 +1031,9 @@ namespace llarp
auto it_mid = numRouters < all.size() ? all.begin() + numRouters : all.end();
std::partial_sort(
all.begin(),
it_mid,
all.end(),
[compare = dht::XorMetric{location}](auto* a, auto* b) { return compare(*a, *b); });
std::partial_sort(all.begin(), it_mid, all.end(), [compare = dht::XorMetric{location}](auto* a, auto* b) {
return compare(*a, *b);
});
std::vector<RemoteRC> closest;
closest.reserve(numRouters);

@ -173,8 +173,8 @@ namespace llarp
*/
std::atomic<int> fetch_failures{0}, bootstrap_attempts{0};
std::atomic<bool> _using_bootstrap_fallback{false}, _needs_rebootstrap{false},
_needs_initial_fetch{true}, _fetching_initial{false}, _initial_completed{false};
std::atomic<bool> _using_bootstrap_fallback{false}, _needs_rebootstrap{false}, _needs_initial_fetch{true},
_fetching_initial{false}, _initial_completed{false};
bool want_rc(const RouterID& rid) const;
@ -185,12 +185,14 @@ namespace llarp
fs::path get_path_by_pubkey(const RouterID& pk) const;
public:
explicit NodeDB(
fs::path rootdir, std::function<void(std::function<void()>)> diskCaller, Router* r);
explicit NodeDB(fs::path rootdir, std::function<void(std::function<void()>)> diskCaller, Router* r);
/// in memory nodedb
NodeDB();
// returns {num_rcs, num_rids, num_bootstraps}
std::tuple<size_t, size_t, size_t> db_stats() const;
const std::set<RouterID>& get_known_rids() const
{
return known_rids;
@ -356,7 +358,7 @@ namespace llarp
/// explicit save all RCs to disk synchronously
void save_to_disk() const;
/// the number of RCs that are loaded from disk
/// the number of known RC's currently held
size_t num_rcs() const;
size_t num_rids() const;
@ -406,11 +408,7 @@ namespace llarp
// `target_size`) from population to refill it.
template <typename T, typename RNG>
void replace_subset(
std::set<T>& current,
const std::set<T>& replace,
std::set<T> population,
size_t target_size,
RNG&& rng)
std::set<T>& current, const std::set<T>& replace, std::set<T> population, size_t target_size, RNG&& rng)
{
// Remove the ones we are replacing from current:
current.erase(replace.begin(), replace.end());
@ -468,13 +466,8 @@ namespace llarp
template <
typename ID_t,
std::enable_if_t<
std::is_same_v<ID_t, RouterID> || std::is_same_v<ID_t, RemoteRC>,
int> = 0>
void process_results(
std::set<ID_t> unconfirmed,
std::set<Unconfirmed<ID_t>>& container,
std::set<ID_t>& known)
std::enable_if_t<std::is_same_v<ID_t, RouterID> || std::is_same_v<ID_t, RemoteRC>, int> = 0>
void process_results(std::set<ID_t> unconfirmed, std::set<Unconfirmed<ID_t>>& container, std::set<ID_t>& known)
{
// before we add the unconfirmed set, we check to see if our local set of unconfirmed
// rcs/rids appeared in the latest unconfirmed set; if so, we will increment their

@ -45,9 +45,8 @@ namespace llarp
_exit_context{this},
_disk_thread{_lmq->add_tagged_thread("disk")},
_rpc_server{nullptr},
_randomStartDelay{
platform::is_simulation ? std::chrono::milliseconds{(llarp::randint() % 1250) + 2000}
: 0s} // , _link_manager{*this}
_randomStartDelay{platform::is_simulation ? std::chrono::milliseconds{(llarp::randint() % 1250) + 2000} : 0s}
// , _link_manager{*this}
,
_hidden_service_context{this}
{
@ -147,8 +146,7 @@ namespace llarp
{
for (const auto& [key, value] : paths.items())
{
if (value.is_object() && value.at("status").is_string()
&& value.at("status") == "established")
if (value.is_object() && value.at("status").is_string() && value.at("status") == "established")
pathsCount++;
}
}
@ -184,6 +182,11 @@ namespace llarp
return stats;
}
bool Router::fully_meshed() const
{
return num_router_connections() >= _node_db->num_rcs();
}
bool Router::needs_initial_fetch() const
{
return _node_db->needs_initial_fetch();
@ -199,9 +202,7 @@ namespace llarp
if (is_service_node())
return;
for_each_connection([this](link::Connection& conn) {
loop()->call([&]() { conn.conn->close_connection(); });
});
for_each_connection([this](link::Connection& conn) { loop()->call([&]() { conn.conn->close_connection(); }); });
}
void Router::Thaw()
@ -211,9 +212,7 @@ namespace llarp
std::unordered_set<RouterID> peer_pubkeys;
for_each_connection([&peer_pubkeys](link::Connection& conn) {
peer_pubkeys.emplace(conn.conn->remote_key());
});
for_each_connection([&peer_pubkeys](link::Connection& conn) { peer_pubkeys.emplace(conn.conn->remote_key()); });
loop()->call([this, &peer_pubkeys]() {
for (auto& pk : peer_pubkeys)
@ -248,13 +247,9 @@ namespace llarp
}
bool Router::send_control_message(
const RouterID& remote,
std::string ep,
std::string body,
std::function<void(oxen::quic::message m)> func)
const RouterID& remote, std::string ep, std::string body, std::function<void(oxen::quic::message m)> func)
{
return _link_manager->send_control_message(
remote, std::move(ep), std::move(body), std::move(func));
return _link_manager->send_control_message(remote, std::move(ep), std::move(body), std::move(func));
}
void Router::for_each_connection(std::function<void(link::Connection&)> func)
@ -341,8 +336,7 @@ namespace llarp
RouterContact::ACTIVE_NETID = netid;
if (_testing_disabled and not _testnet)
throw std::runtime_error{
"Error: reachability testing can only be disabled on testnet!"};
throw std::runtime_error{"Error: reachability testing can only be disabled on testnet!"};
auto err = "Lokinet network ID set to {}, NOT mainnet! {}"_format(
netid,
@ -355,8 +349,7 @@ namespace llarp
auto log_type = conf.logging.type;
if (log_type == log::Type::File
&& (conf.logging.file == "stdout" || conf.logging.file == "-"
|| conf.logging.file.empty()))
&& (conf.logging.file == "stdout" || conf.logging.file == "-" || conf.logging.file.empty()))
log_type = log::Type::Print;
if (log::get_level_default() != log::Level::off)
@ -396,8 +389,7 @@ namespace llarp
_node_db = std::move(nodedb);
log::debug(
logcat, _is_service_node ? "Running as a relay (service node)" : "Running as a client");
log::debug(logcat, _is_service_node ? "Running as a relay (service node)" : "Running as a client");
if (_is_service_node)
_rpc_client->ConnectAsync(rpc_addr);
@ -454,20 +446,17 @@ namespace llarp
bool Router::appears_decommed() const
{
return _is_service_node and have_snode_whitelist()
and node_db()->greylist().count(pubkey());
return _is_service_node and have_snode_whitelist() and node_db()->greylist().count(pubkey());
}
bool Router::appears_funded() const
{
return _is_service_node and have_snode_whitelist()
and node_db()->is_connection_allowed(pubkey());
return _is_service_node and have_snode_whitelist() and node_db()->is_connection_allowed(pubkey());
}
bool Router::appears_registered() const
{
return _is_service_node and have_snode_whitelist()
and node_db()->registered_routers().count(pubkey());
return _is_service_node and have_snode_whitelist() and node_db()->registered_routers().count(pubkey());
}
bool Router::can_test_routers() const
@ -537,8 +526,7 @@ namespace llarp
else
{
if (paddr or pport)
throw std::runtime_error{
"Must specify [bind]:listen in config with public ip/addr!"};
throw std::runtime_error{"Must specify [bind]:listen in config with public ip/addr!"};
if (auto maybe_addr = net().get_best_public_address(true, DEFAULT_LISTEN_PORT))
_listen_address = std::move(*maybe_addr);
@ -546,9 +534,8 @@ namespace llarp
throw std::runtime_error{"Could not find net interface on current platform!"};
}
_public_address = (not paddr and not pport)
? _listen_address
: oxen::quic::Address{*paddr, pport ? *pport : DEFAULT_LISTEN_PORT};
_public_address = (not paddr and not pport) ? _listen_address
: oxen::quic::Address{*paddr, pport ? *pport : DEFAULT_LISTEN_PORT};
RouterContact::BLOCK_BOGONS = conf.router.block_bogons;
@ -563,8 +550,7 @@ namespace llarp
throw std::runtime_error("cannot use strict-connect option as service node");
if (val.size() < 2)
throw std::runtime_error(
"Must specify more than one strict-connect router if using strict-connect");
throw std::runtime_error("Must specify more than one strict-connect router if using strict-connect");
_node_db->pinned_edges().insert(val.begin(), val.end());
log::debug(logcat, "{} strict-connect routers configured", val.size());
@ -622,30 +608,22 @@ namespace llarp
return now - _last_stats_report > REPORT_STATS_INTERVAL;
}
std::string Router::_stats_line()
{
auto [_in, _out, _relay, _client] = _link_manager->connection_stats();
auto [_rcs, _rids, _bstraps] = _node_db->db_stats();
return "{} RCs, {} RIDs, {} bstraps, conns [{}:{} in:out, {}:{} relay:client"_format(
_rcs, _rids, _bstraps, _in, _out, _relay, _client);
}
void Router::report_stats()
{
const auto now = llarp::time_now_ms();
auto [in, out] = _link_manager->num_in_out();
auto num_bootstraps = _node_db->num_bootstraps();
auto num_rids = _node_db->num_rids();
auto num_rcs = _node_db->num_rcs();
auto num_router_conns = num_router_connections();
log::critical(logcat, "Local {}: {}", is_service_node() ? "Service Node" : "Client", _stats_line());
log::critical(
logcat,
"Local {} has {} RCs, {} RIDs, {} bootstrap peers, {}:{} (inbound:outbound) "
"conns ({} router, {} client)",
is_service_node() ? "Service Node" : "Client",
num_rcs,
num_rids,
num_bootstraps,
in,
out,
num_router_conns,
num_client_connections());
if (is_service_node() and num_router_connections() >= num_rcs)
if (is_service_node() and fully_meshed())
{
log::critical(logcat, "SERVICE NODE IS FULLY MESHED");
}
@ -660,55 +638,17 @@ namespace llarp
std::string Router::status_line()
{
std::string status;
auto out = std::back_inserter(status);
fmt::format_to(out, "v{}", fmt::join(llarp::LOKINET_VERSION, "."));
auto line = "v{}{}: {}"_format(
fmt::join(llarp::LOKINET_VERSION, "."), (_is_service_node) ? " snode: " : " client: ", _stats_line());
if (is_service_node())
{
fmt::format_to(
out,
" snode | known/svc/clients: {}/{}/{}",
node_db()->num_rcs(),
num_router_connections(),
num_client_connections());
fmt::format_to(
out,
" | {} active paths | block {} ",
path_context().CurrentTransitPaths(),
(_rpc_client ? _rpc_client->BlockHeight() : 0));
bool have_gossiped = last_rc_gossip == std::chrono::system_clock::time_point::min();
fmt::format_to(
out,
" | gossip: (next/last) {} / {}",
short_time_from_now(next_rc_gossip),
have_gossiped ? short_time_from_now(last_rc_gossip) : "never");
line += ", gossip [{}:{} next:last]"_format(
short_time_from_now(next_rc_gossip), have_gossiped ? short_time_from_now(last_rc_gossip) : "never");
}
else
{
fmt::format_to(
out,
" client | known/connected: {}/{}",
node_db()->num_rcs(),
num_router_connections());
if (auto ep = hidden_service_context().GetDefault())
{
fmt::format_to(
out,
" | paths/endpoints {}/{}",
path_context().CurrentOwnedPaths(),
ep->UniqueEndpoints());
if (auto success_rate = ep->CurrentBuildStats().SuccessRatio(); success_rate < 0.5)
{
fmt::format_to(
out,
" [ !!! Low Build Success Rate ({:.1f}%) !!! ]",
(100.0 * success_rate));
}
};
}
return status;
return line;
}
void Router::Tick()
@ -732,8 +672,7 @@ namespace llarp
const auto now = llarp::time_now_ms();
auto now_timepoint = std::chrono::system_clock::time_point(now);
if (const auto delta = now - _last_tick;
_last_tick != 0s and delta > NETWORK_RESET_SKIP_INTERVAL)
if (const auto delta = now - _last_tick; _last_tick != 0s and delta > NETWORK_RESET_SKIP_INTERVAL)
{
// we detected a time skip into the futre, thaw the network
log::error(logcat, "Timeskip of {} detected, resetting network state!", delta.count());
@ -767,8 +706,7 @@ namespace llarp
last_rc_gossip = now_timepoint;
// TESTNET: 0 to 3 minutes before testnet gossip interval
auto delta = std::chrono::seconds{
std::uniform_int_distribution<size_t>{0, 180}(llarp::csrng)};
auto delta = std::chrono::seconds{std::uniform_int_distribution<size_t>{0, 180}(llarp::csrng)};
next_rc_gossip = now_timepoint + TESTNET_GOSSIP_INTERVAL - delta;
}
@ -829,16 +767,14 @@ namespace llarp
// routers that are not whitelisted for first hops
if (not is_snode)
{
log::trace(
logcat, "Not removing {}: we are a client and it looks fine", rc.router_id());
log::trace(logcat, "Not removing {}: we are a client and it looks fine", rc.router_id());
return false;
}
// if we don't have the whitelist yet don't remove the entry
if (not whitelist_received)
{
log::debug(
logcat, "Skipping check on {}: don't have whitelist yet", rc.router_id());
log::debug(logcat, "Skipping check on {}: don't have whitelist yet", rc.router_id());
return false;
}
// if we have no whitelist enabled or we have
@ -887,9 +823,7 @@ namespace llarp
if (num_router_conns < num_rcs)
{
log::critical(
logcat,
"Service Node connecting to {} random routers to achieve full mesh",
FULL_MESH_ITERATION);
logcat, "Service Node connecting to {} random routers to achieve full mesh", FULL_MESH_ITERATION);
_link_manager->connect_to_random(FULL_MESH_ITERATION);
}
}
@ -907,8 +841,7 @@ namespace llarp
if (num_router_conns < min_client_conns)
{
size_t needed = min_client_conns - num_router_conns;
log::critical(
logcat, "Client connecting to {} random routers to keep alive", needed);
log::critical(logcat, "Client connecting to {} random routers to keep alive", needed);
_link_manager->connect_to_random(needed);
}
else
@ -964,8 +897,8 @@ namespace llarp
if (is_running || is_stopping)
return false;
router_contact = LocalRC::make(
identity(), _is_service_node and _public_address ? *_public_address : _listen_address);
router_contact =
LocalRC::make(identity(), _is_service_node and _public_address ? *_public_address : _listen_address);
_link_manager = LinkManager::make(*this);
@ -1007,12 +940,11 @@ namespace llarp
// if (our_rid == "55fxrybf3jtausbnmxpgwcsz9t8qkf5pr8t5f4xyto4omjrkorpy.snode"sv)
// {
// log::critical(logcat, "I am decaf20: upgrading 'quic' to log::debug and 'bparser' to log::trace");
// oxen::log::set_level("bparser", oxen::log::Level::trace);
// log::critical(logcat, "I am decaf20: upgrading 'quic' to log::debug and 'bparser' to
// log::trace"); oxen::log::set_level("bparser", oxen::log::Level::trace);
// oxen::log::set_level("quic", oxen::log::Level::debug);
// }
log::info(logcat, "Starting hidden service context...");
if (!hidden_service_context().StartAll())
@ -1039,69 +971,65 @@ namespace llarp
if (is_service_node() and not _testing_disabled)
{
// do service node testing if we are in service node whitelist mode
_loop->call_every(
consensus::REACHABILITY_TESTING_TIMER_INTERVAL, weak_from_this(), [this] {
// dont run tests if we are not running or we are stopping
if (not is_running)
return;
// dont run tests if we think we should not test other routers
// this occurs when we are deregistered or do not have the service node list
// yet when we expect to have one.
if (not can_test_routers())
return;
auto tests = router_testing.get_failing();
if (auto maybe = router_testing.next_random(this))
_loop->call_every(consensus::REACHABILITY_TESTING_TIMER_INTERVAL, weak_from_this(), [this] {
// dont run tests if we are not running or we are stopping
if (not is_running)
return;
// dont run tests if we think we should not test other routers
// this occurs when we are deregistered or do not have the service node list
// yet when we expect to have one.
if (not can_test_routers())
return;
auto tests = router_testing.get_failing();
if (auto maybe = router_testing.next_random(this))
{
tests.emplace_back(*maybe, 0);
}
for (const auto& [router, fails] : tests)
{
if (not SessionToRouterAllowed(router))
{
tests.emplace_back(*maybe, 0);
log::debug(
logcat,
"{} is no longer a registered service node; dropping from test "
"list",
router);
router_testing.remove_node_from_failing(router);
continue;
}
for (const auto& [router, fails] : tests)
{
if (not SessionToRouterAllowed(router))
{
log::debug(
log::critical(logcat, "Establishing session to {} for service node testing", router);
// try to make a session to this random router
// this will do a dht lookup if needed
_link_manager->test_reachability(
router,
[this, rid = router, previous = fails](oxen::quic::connection_interface& conn) {
log::info(
logcat,
"{} is no longer a registered service node; dropping from test "
"list",
router);
router_testing.remove_node_from_failing(router);
continue;
}
log::critical(
logcat, "Establishing session to {} for service node testing", router);
// try to make a session to this random router
// this will do a dht lookup if needed
_link_manager->test_reachability(
router,
[this, rid = router, previous = fails](
oxen::quic::connection_interface& conn) {
"Successful SN reachability test to {}{}",
rid,
previous ? "after {} previous failures"_format(previous) : "");
router_testing.remove_node_from_failing(rid);
_rpc_client->InformConnection(rid, true);
conn.close_connection();
},
[this, rid = router, previous = fails](oxen::quic::connection_interface&, uint64_t ec) {
if (ec != 0)
{
log::info(
logcat,
"Successful SN reachability test to {}{}",
"Unsuccessful SN reachability test to {} after {} previous "
"failures",
rid,
previous ? "after {} previous failures"_format(previous) : "");
router_testing.remove_node_from_failing(rid);
_rpc_client->InformConnection(rid, true);
conn.close_connection();
},
[this, rid = router, previous = fails](
oxen::quic::connection_interface&, uint64_t ec) {
if (ec != 0)
{
log::info(
logcat,
"Unsuccessful SN reachability test to {} after {} previous "
"failures",
rid,
previous);
router_testing.add_failing_node(rid, previous);
}
});
}
});
previous);
router_testing.add_failing_node(rid, previous);
}
});
}
});
}
llarp::sys::service_manager->ready();
@ -1177,8 +1105,7 @@ namespace llarp
is_stopping.store(true);
if (auto level = log::get_level_default();
level > log::Level::info and level != log::Level::off)
if (auto level = log::get_level_default(); level > log::Level::info and level != log::Level::off)
log::reset_level(log::Level::info);
log::info(logcat, "stopping service manager...");

@ -51,8 +51,7 @@ namespace llarp
/// number of dht locations handled per relay
inline constexpr size_t INTROSET_REQS_PER_RELAY{2};
inline constexpr size_t INTROSET_STORAGE_REDUNDANCY{
(INTROSET_RELAY_REDUNDANCY * INTROSET_REQS_PER_RELAY)};
inline constexpr size_t INTROSET_STORAGE_REDUNDANCY{(INTROSET_RELAY_REDUNDANCY * INTROSET_REQS_PER_RELAY)};
// TESTNET: these constants are shortened for testing purposes
inline constexpr std::chrono::milliseconds TESTNET_GOSSIP_INTERVAL{15min};
@ -152,6 +151,8 @@ namespace llarp
bool should_report_stats(llarp_time_t now) const;
std::string _stats_line();
void report_stats();
void save_rc();
@ -161,8 +162,7 @@ namespace llarp
bool insufficient_peers() const;
protected:
std::chrono::system_clock::time_point last_rc_gossip{
std::chrono::system_clock::time_point::min()};
std::chrono::system_clock::time_point last_rc_gossip{std::chrono::system_clock::time_point::min()};
std::chrono::system_clock::time_point next_rc_gossip{last_rc_gossip};
std::chrono::system_clock::time_point next_initial_fetch_attempt{last_rc_gossip};
std::chrono::system_clock::time_point last_rc_fetch{last_rc_gossip};
@ -170,6 +170,8 @@ namespace llarp
std::chrono::system_clock::time_point next_bootstrap_attempt{last_rc_gossip};
public:
bool fully_meshed() const;
bool testnet() const
{
return _testnet;

Loading…
Cancel
Save