re-abstraction for client connections

- pending_conns removed in favor of direct creation of link::Connection objects in link::Endpoint::{service,client}_conn containers
- conn lookup maps removed, they were pointless
pull/2228/head
dr7ana 6 months ago
parent 3451a30d0e
commit 4437d0b373

@ -3,9 +3,10 @@
namespace llarp::link namespace llarp::link
{ {
Connection::Connection( Connection::Connection(
const std::shared_ptr<oxen::quic::connection_interface>& c, std::shared_ptr<oxen::quic::connection_interface> c,
std::shared_ptr<oxen::quic::BTRequestStream>& s) std::shared_ptr<oxen::quic::BTRequestStream> s,
: conn{c}, control_stream{s}, inbound{conn->is_inbound()} bool is_relay)
: conn{std::move(c)}, control_stream{std::move(s)}, remote_is_relay{is_relay}
{} {}
} // namespace llarp::link } // namespace llarp::link

@ -12,13 +12,18 @@ namespace llarp::link
std::shared_ptr<oxen::quic::connection_interface> conn; std::shared_ptr<oxen::quic::connection_interface> conn;
std::shared_ptr<oxen::quic::BTRequestStream> control_stream; std::shared_ptr<oxen::quic::BTRequestStream> control_stream;
// one side of a connection will be responsible for some things, e.g. heartbeat
bool inbound{false};
bool remote_is_relay{true}; bool remote_is_relay{true};
bool
is_inbound() const
{
return conn->is_inbound();
}
Connection( Connection(
const std::shared_ptr<oxen::quic::connection_interface>& c, std::shared_ptr<oxen::quic::connection_interface> c,
std::shared_ptr<oxen::quic::BTRequestStream>& s); std::shared_ptr<oxen::quic::BTRequestStream> s,
bool is_relay = true);
}; };
} // namespace llarp::link } // namespace llarp::link

@ -28,14 +28,11 @@ namespace llarp
{} {}
std::shared_ptr<link::Connection> std::shared_ptr<link::Connection>
Endpoint::get_conn(const RemoteRC& rc) const Endpoint::get_service_conn(const RouterID& rid) const
{ {
if (auto itr = service_conns.find(rc.router_id()); itr != service_conns.end()) if (auto itr = service_conns.find(rid); itr != service_conns.end())
return itr->second; return itr->second;
// if (auto itr = pending_conns.find(rc.router_id()); itr != pending_conns.end())
// return itr->second;
return nullptr; return nullptr;
} }
@ -45,32 +42,31 @@ namespace llarp
if (auto itr = service_conns.find(rid); itr != service_conns.end()) if (auto itr = service_conns.find(rid); itr != service_conns.end())
return itr->second; return itr->second;
// if (auto itr = pending_conns.find(rid); itr != pending_conns.end()) if (_is_service_node)
// return itr->second; {
if (auto itr = client_conns.find(rid); itr != client_conns.end())
return itr->second;
}
return nullptr; return nullptr;
} }
bool bool
Endpoint::have_client_conn(const RouterID& remote) const Endpoint::have_conn(const RouterID& remote) const
{ {
if (auto itr = service_conns.find(remote); itr != service_conns.end()) return have_service_conn(remote) or have_client_conn(remote);
{ }
return not itr->second->remote_is_relay;
}
if (auto itr = pending_conns.find(remote); itr != pending_conns.end())
{
return not itr->second->remote_is_relay;
}
return false; bool
Endpoint::have_client_conn(const RouterID& remote) const
{
return client_conns.count(remote);
} }
bool bool
Endpoint::have_conn(const RouterID& remote) const Endpoint::have_service_conn(const RouterID& remote) const
{ {
return service_conns.count(remote) or pending_conns.count(remote); return service_conns.count(remote);
} }
std::pair<size_t, size_t> std::pair<size_t, size_t>
@ -80,7 +76,15 @@ namespace llarp
for (const auto& c : service_conns) for (const auto& c : service_conns)
{ {
if (c.second->inbound) if (c.second->is_inbound())
++in;
else
++out;
}
for (const auto& c : client_conns)
{
if (c.second->is_inbound())
++in; ++in;
else else
++out; ++out;
@ -92,38 +96,7 @@ namespace llarp
size_t size_t
Endpoint::num_connected(bool clients_only) const Endpoint::num_connected(bool clients_only) const
{ {
size_t count = 0; return clients_only ? client_conns.size() : client_conns.size() + service_conns.size();
for (const auto& c : service_conns)
{
if (not(c.second->remote_is_relay and clients_only))
count += 1;
}
return count;
}
bool
Endpoint::get_random_connection(RemoteRC& router) const
{
if (const auto size = service_conns.size(); size)
{
auto itr = service_conns.begin();
std::advance(itr, randint() % size);
RouterID rid{itr->second->conn->remote_key()};
if (auto maybe = link_manager.node_db->get_rc(rid))
{
router = *maybe;
return true;
}
return false;
}
log::warning(quic_cat, "Error: failed to fetch random connection");
return false;
} }
void void
@ -131,27 +104,35 @@ namespace llarp
{ {
for (const auto& [rid, conn] : service_conns) for (const auto& [rid, conn] : service_conns)
func(*conn); func(*conn);
if (_is_service_node)
{
for (const auto& [rid, conn] : client_conns)
func(*conn);
}
} }
void void
Endpoint::close_connection(RouterID _rid) Endpoint::close_connection(RouterID _rid)
{ {
// assert(link_manager._router.loop()->inEventLoop());
link_manager._router.loop()->call([this, rid = _rid]() { link_manager._router.loop()->call([this, rid = _rid]() {
// deletion from pending_conns, pending_conn_msg_queue, active_conns, etc is taken care
// of by LinkManager::on_conn_closed
if (auto itr = service_conns.find(rid); itr != service_conns.end()) if (auto itr = service_conns.find(rid); itr != service_conns.end())
{ {
log::critical(logcat, "Closing connection to relay RID:{}", rid);
auto& conn = *itr->second->conn; auto& conn = *itr->second->conn;
conn.close_connection(); conn.close_connection();
} }
else if (auto itr = pending_conns.find(rid); itr != pending_conns.end()) else if (_is_service_node)
{ {
auto& conn = *itr->second->conn; if (auto itr = client_conns.find(rid); itr != client_conns.end())
conn.close_connection(); {
log::critical(logcat, "Closing connection to client RID:{}", rid);
auto& conn = *itr->second->conn;
conn.close_connection();
}
} }
else else
return; log::critical(logcat, "Could not find connection to RID:{} to close!", rid);
}); });
} }
} // namespace link } // namespace link
@ -169,9 +150,9 @@ namespace llarp
void void
LinkManager::register_commands( LinkManager::register_commands(
std::shared_ptr<oxen::quic::BTRequestStream>& s, const RouterID& router_id) std::shared_ptr<oxen::quic::BTRequestStream>& s, const RouterID& router_id, bool)
{ {
log::critical(logcat, "{} called", __PRETTY_FUNCTION__); log::debug(logcat, "{} called", __PRETTY_FUNCTION__);
s->register_command("bfetch_rcs"s, [this](oxen::quic::message m) { s->register_command("bfetch_rcs"s, [this](oxen::quic::message m) {
_router.loop()->call( _router.loop()->call(
@ -240,7 +221,7 @@ namespace llarp
- bt stream construction contains a stream close callback that shuts down the connection - bt stream construction contains a stream close callback that shuts down the connection
if the btstream closes unexpectedly if the btstream closes unexpectedly
*/ */
auto ep = quic->endpoint( auto e = quic->endpoint(
_router.listen_addr(), _router.listen_addr(),
[this](oxen::quic::connection_interface& ci) { return on_conn_open(ci); }, [this](oxen::quic::connection_interface& ci) { return on_conn_open(ci); },
[this](oxen::quic::connection_interface& ci, uint64_t ec) { [this](oxen::quic::connection_interface& ci, uint64_t ec) {
@ -250,18 +231,20 @@ namespace llarp
is_service_node() ? alpns::SERVICE_INBOUND : alpns::CLIENT_INBOUND, is_service_node() ? alpns::SERVICE_INBOUND : alpns::CLIENT_INBOUND,
is_service_node() ? alpns::SERVICE_OUTBOUND : alpns::CLIENT_OUTBOUND); is_service_node() ? alpns::SERVICE_OUTBOUND : alpns::CLIENT_OUTBOUND);
// While only service nodes accept inbound connections, clients must have this key verify
// callback set. It will reject any attempted inbound connection to a lokinet client prior to
// handshake completion
tls_creds->set_key_verify_callback([this](const ustring_view& key, const ustring_view& alpn) { tls_creds->set_key_verify_callback([this](const ustring_view& key, const ustring_view& alpn) {
auto is_snode = is_service_node();
RouterID other{key.data()}; RouterID other{key.data()};
auto us = router().is_bootstrap_seed() ? "Bootstrap seed node"s : "Service node"s; auto us = router().is_bootstrap_seed() ? "Bootstrap seed node"s : "Service node"s;
auto is_snode = is_service_node();
if (is_snode) if (is_snode)
{ {
if (alpn == alpns::C_ALPNS) if (alpn == alpns::C_ALPNS)
{ {
log::critical(logcat, "{} node accepting client connection (remote ID:{})!", us, other); log::critical(logcat, "{} node accepting client connection (remote ID:{})!", us, other);
ep.client_conns.emplace(other, nullptr);
return true; return true;
} }
if (alpn == alpns::SN_ALPNS) if (alpn == alpns::SN_ALPNS)
@ -271,10 +254,14 @@ namespace llarp
log::critical( log::critical(
logcat, logcat,
"{} node was {} to confirm remote (RID:{}) is registered; allowing connection!", "{} node was {} to confirm remote (RID:{}) is registered; {} connection!",
us, us,
result ? "able" : "unable", result ? "able" : "unable",
other); other,
result ? "allowing" : "rejecting");
if (result)
ep.service_conns.emplace(other, nullptr);
return result; return result;
} }
@ -283,81 +270,76 @@ namespace llarp
return false; return false;
} }
// TESTNET: change this to an error message later; just because someone tries to erroneously
// connect to a local lokinet client doesn't mean we should kill the program?
throw std::runtime_error{"Clients should not be validating inbound connections!"}; throw std::runtime_error{"Clients should not be validating inbound connections!"};
}); });
if (_router.is_service_node()) if (_router.is_service_node())
{ {
ep->listen(tls_creds, ROUTER_KEEP_ALIVE); e->listen(tls_creds, ROUTER_KEEP_ALIVE);
} }
return ep; return e;
} }
void std::shared_ptr<oxen::quic::BTRequestStream>
LinkManager::on_inbound_conn(oxen::quic::connection_interface& ci) LinkManager::make_control(oxen::quic::connection_interface& ci, const RouterID& rid)
{ {
const auto& scid = ci.scid();
RouterID rid{ci.remote_key()};
ep.service_connid_map.emplace(scid, rid);
auto [itr, b] = ep.service_conns.emplace(rid, nullptr);
log::critical(logcat, "Queueing BTStream to be opened...");
auto control_stream = ci.queue_incoming_stream<oxen::quic::BTRequestStream>( auto control_stream = ci.queue_incoming_stream<oxen::quic::BTRequestStream>(
[this, rid = rid](oxen::quic::Stream&, uint64_t error_code) { [this, rid = rid](oxen::quic::Stream&, uint64_t error_code) {
log::warning( log::warning(
logcat, logcat,
"BTRequestStream closed unexpectedly (ec:{}); closing connection...", "BTRequestStream closed unexpectedly (ec:{}); closing inbound connection...",
error_code); error_code);
ep.close_connection(rid); ep.close_connection(rid);
}); });
log::critical(logcat, "Queued BTStream to be opened ID:{}", control_stream->stream_id()); log::critical(logcat, "Queued BTStream to be opened (ID:{})", control_stream->stream_id());
assert(control_stream->stream_id() == 0); assert(control_stream->stream_id() == 0);
register_commands(control_stream, rid); register_commands(control_stream, rid);
itr->second = std::make_shared<link::Connection>(ci.shared_from_this(), control_stream); return control_stream;
log::critical(logcat, "Successfully configured inbound connection fom {}...", rid);
} }
// TODO: should we add routes here now that Router::SessionOpen is gone?
void void
LinkManager::on_conn_open(oxen::quic::connection_interface& ci) LinkManager::on_inbound_conn(oxen::quic::connection_interface& ci)
{ {
_router.loop()->call([this, &conn_interface = ci]() { assert(_is_service_node);
const auto rid = RouterID{conn_interface.remote_key()}; RouterID rid{ci.remote_key()};
const auto& remote = conn_interface.remote();
const auto& scid = conn_interface.scid();
if (conn_interface.is_inbound())
{
log::critical(logcat, "Inbound connection fom {} (remote:{})", rid, remote);
on_inbound_conn(conn_interface);
}
else
{
log::critical(logcat, "Searching for RID:{} in pending conns...", rid);
if (auto itr = ep.pending_conns.find(rid); itr != ep.pending_conns.end())
{
ep.service_connid_map.emplace(scid, rid);
auto [it, b] = ep.service_conns.emplace(rid, nullptr);
it->second = std::move(itr->second);
ep.pending_conns.erase(itr);
log::critical(logcat, "Connection to RID:{} moved from pending to active conns!", rid);
}
else
throw std::runtime_error{"Could not find newly established connection in pending conns!"};
}
if (auto it = ep.service_conns.find(rid); it != ep.service_conns.end())
{
log::critical(logcat, "Configuring inbound connection from relay RID:{}", rid);
it->second = std::make_shared<link::Connection>(ci.shared_from_this(), make_control(ci, rid));
}
else if (auto it = ep.client_conns.find(rid); it != ep.client_conns.end())
{
log::critical(logcat, "Configuring inbound connection from client RID:{}", rid);
it->second =
std::make_shared<link::Connection>(ci.shared_from_this(), make_control(ci, rid), false);
}
else
{
log::critical( log::critical(
logcat, logcat,
"SERVICE NODE (RID:{}) ESTABLISHED CONNECTION TO RID:{}", "ERROR: connection accepted from RID:{} that was not logged in key verification!",
_router.local_rid(),
rid); rid);
}
log::critical(logcat, "Successfully configured inbound connection fom {}...", rid);
}
void
LinkManager::on_outbound_conn(oxen::quic::connection_interface& ci)
{
RouterID rid{ci.remote_key()};
if (auto it = ep.service_conns.find(rid); it != ep.service_conns.end())
{
log::critical(logcat, "Fetched configured outbound connection to relay RID:{}", rid);
auto& conn = it->second->conn;
auto& str = it->second->control_stream;
// check to see if this connection was established while we were attempting to queue
// messages to the remote
if (auto itr = pending_conn_msg_queue.find(rid); itr != pending_conn_msg_queue.end()) if (auto itr = pending_conn_msg_queue.find(rid); itr != pending_conn_msg_queue.end())
{ {
log::critical(logcat, "Clearing pending queue for RID:{}", rid); log::critical(logcat, "Clearing pending queue for RID:{}", rid);
@ -370,12 +352,12 @@ namespace llarp
if (msg.is_control) if (msg.is_control)
{ {
log::critical(logcat, "Dispatching {} request!", *msg.endpoint); log::critical(logcat, "Dispatching {} request!", *msg.endpoint);
ep.service_conns[rid]->control_stream->command( str->command(std::move(*msg.endpoint), std::move(msg.body), std::move(msg.func));
std::move(*msg.endpoint), std::move(msg.body), std::move(msg.func));
} }
else else
{ {
conn_interface.send_datagram(std::move(msg.body)); log::critical(logcat, "DIspatching data message: {}", msg.body);
conn->send_datagram(std::move(msg.body));
} }
que.pop_front(); que.pop_front();
@ -383,6 +365,40 @@ namespace llarp
} }
log::warning(logcat, "Pending queue empty for RID:{}", rid); log::warning(logcat, "Pending queue empty for RID:{}", rid);
}
else
{
log::critical(
logcat,
"ERROR: connection established to RID:{} that was not logged in key verifrication!",
rid);
}
}
// TODO: should we add routes here now that Router::SessionOpen is gone?
void
LinkManager::on_conn_open(oxen::quic::connection_interface& ci)
{
_router.loop()->call([this, &conn_interface = ci]() {
const auto rid = RouterID{conn_interface.remote_key()};
const auto& remote = conn_interface.remote();
if (conn_interface.is_inbound())
{
log::critical(logcat, "Inbound connection fom {} (remote:{})", rid, remote);
on_inbound_conn(conn_interface);
}
else
{
log::critical(logcat, "Outbound connection fom {} (remote:{})", rid, remote);
on_outbound_conn(conn_interface);
}
log::critical(
logcat,
"SERVICE NODE (RID:{}) ESTABLISHED CONNECTION TO RID:{}",
_router.local_rid(),
rid);
}); });
}; };
@ -390,33 +406,22 @@ namespace llarp
LinkManager::on_conn_closed(oxen::quic::connection_interface& ci, uint64_t ec) LinkManager::on_conn_closed(oxen::quic::connection_interface& ci, uint64_t ec)
{ {
_router.loop()->call( _router.loop()->call(
[this, scid = ci.scid(), _rid = RouterID{ci.remote_key()}, error_code = ec]() { [this, scid = ci.scid(), rid = RouterID{ci.remote_key()}, error_code = ec]() {
log::critical(quic_cat, "Purging quic connection CID:{} (ec: {})", scid, error_code); log::critical(quic_cat, "Purging quic connection CID:{} (ec:{})", scid, error_code);
// a pending connection would not be in the connid_map
if (auto v_itr = ep.pending_conns.find(_rid); v_itr != ep.pending_conns.end())
{
ep.pending_conns.erase(v_itr);
// in case this didn't clear earlier, do it now // in case this didn't clear earlier, do it now
if (auto p_itr = pending_conn_msg_queue.find(_rid); if (auto p_itr = pending_conn_msg_queue.find(rid); p_itr != pending_conn_msg_queue.end())
p_itr != pending_conn_msg_queue.end()) pending_conn_msg_queue.erase(p_itr);
pending_conn_msg_queue.erase(p_itr);
log::critical(quic_cat, "Pending quic connection CID:{} purged successfully", scid); if (auto s_itr = ep.service_conns.find(rid); s_itr != ep.service_conns.end())
{
log::critical(quic_cat, "Quic connection to relay RID:{} purged successfully", rid);
ep.service_conns.erase(s_itr);
} }
else if (const auto& c_itr = ep.service_connid_map.find(scid); else if (auto c_itr = ep.client_conns.find(rid); c_itr != ep.client_conns.end())
c_itr != ep.service_connid_map.end())
{ {
const auto& rid = c_itr->second; log::critical(quic_cat, "Quic connection to client RID:{} purged successfully", rid);
assert(_rid == rid); // this should hold true ep.client_conns.erase(c_itr);
if (auto m_itr = ep.service_conns.find(rid); m_itr != ep.service_conns.end())
ep.service_conns.erase(m_itr);
ep.service_connid_map.erase(c_itr);
log::critical(quic_cat, "Quic connection CID:{} purged successfully", scid);
} }
else else
log::critical(quic_cat, "Nothing to purge for quic connection CID:{}", scid); log::critical(quic_cat, "Nothing to purge for quic connection CID:{}", scid);
@ -466,19 +471,15 @@ namespace llarp
f = std::move(func)]() { f = std::move(func)]() {
auto pending = PendingMessage(std::move(body), std::move(endpoint), std::move(f)); auto pending = PendingMessage(std::move(body), std::move(endpoint), std::move(f));
if (auto it1 = ep.pending_conns.find(remote); it1 != ep.pending_conns.end()) if (auto it = pending_conn_msg_queue.find(remote); it != pending_conn_msg_queue.end())
{ {
if (auto it2 = pending_conn_msg_queue.find(remote); it2 != pending_conn_msg_queue.end()) it->second.push_back(std::move(pending));
{ log::critical(
it2->second.push_back(std::move(pending)); logcat, "Connection to RID:{} is pending; message appended to send queue!", remote);
log::critical(
logcat, "Connection (RID:{}) is pending; message appended to send queue!", remote);
}
} }
else else
{ {
log::critical( log::critical(logcat, "Connection to RID:{} is pending; creating send queue!", remote);
logcat, "Connection (RID:{}) not found in pending conns; creating send queue!", remote);
auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue()); auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue());
itr->second.push_back(std::move(pending)); itr->second.push_back(std::move(pending));
connect_to(remote); connect_to(remote);
@ -494,7 +495,7 @@ namespace llarp
if (is_stopping) if (is_stopping)
return false; return false;
if (auto conn = ep.get_conn(remote); conn) if (auto conn = ep.get_service_conn(remote); conn)
{ {
conn->conn->send_datagram(std::move(body)); conn->conn->send_datagram(std::move(body));
return true; return true;
@ -539,13 +540,12 @@ namespace llarp
log::warning(quic_cat, "Could not find RouterContact for connection to rid:{}", rid); log::warning(quic_cat, "Could not find RouterContact for connection to rid:{}", rid);
} }
// This function assumes the RC has already had its signature verified and connection is allowed.
void void
LinkManager::connect_to(const RemoteRC& rc, conn_open_hook on_open, conn_closed_hook on_close) LinkManager::connect_to(const RemoteRC& rc, conn_open_hook on_open, conn_closed_hook on_close)
{ {
const auto& rid = rc.router_id(); const auto& rid = rc.router_id();
if (ep.have_conn(rid)) if (ep.have_service_conn(rid))
{ {
log::warning(logcat, "We already have a connection to {}!", rid); log::warning(logcat, "We already have a connection to {}!", rid);
// TODO: should implement some connection failed logic, but not the same logic that // TODO: should implement some connection failed logic, but not the same logic that
@ -576,6 +576,12 @@ namespace llarp
return ep.have_conn(remote); return ep.have_conn(remote);
} }
bool
LinkManager::have_service_connection_to(const RouterID& remote) const
{
return ep.have_service_conn(remote);
}
bool bool
LinkManager::have_client_connection_to(const RouterID& remote) const LinkManager::have_client_connection_to(const RouterID& remote) const
{ {
@ -628,12 +634,6 @@ namespace llarp
return get_num_connected(true); return get_num_connected(true);
} }
bool
LinkManager::get_random_connected(RemoteRC& router) const
{
return ep.get_random_connection(router);
}
bool bool
LinkManager::is_service_node() const LinkManager::is_service_node() const
{ {
@ -666,10 +666,10 @@ namespace llarp
LinkManager::connect_to_random(int num_conns, bool client_only) LinkManager::connect_to_random(int num_conns, bool client_only)
{ {
auto filter = [this, client_only](const RemoteRC& rc) -> bool { auto filter = [this, client_only](const RemoteRC& rc) -> bool {
auto res = const auto& rid = rc.router_id();
client_only ? not ep.have_client_conn(rc.router_id()) : not ep.have_conn(rc.router_id()); auto res = client_only ? not ep.have_client_conn(rid) : not ep.have_conn(rid);
log::debug(logcat, "RID:{} {}", rc.router_id(), res ? "ACCEPTED" : "REJECTED"); log::debug(logcat, "RID:{} {}", rid, res ? "ACCEPTED" : "REJECTED");
return res; return res;
}; };
@ -766,17 +766,19 @@ namespace llarp
}; };
} }
if (auto conn = ep.get_conn(source); conn) const auto& rid = source.router_id();
if (auto conn = ep.get_service_conn(rid); conn)
{ {
conn->control_stream->command("bfetch_rcs"s, std::move(payload), std::move(f)); conn->control_stream->command("bfetch_rcs"s, std::move(payload), std::move(f));
log::critical(logcat, "Dispatched bootstrap fetch request!"); log::critical(logcat, "Dispatched bootstrap fetch request!");
return; return;
} }
log::critical(logcat, "Queuing bootstrap fetch request to {}", source.router_id()); log::critical(logcat, "Queuing bootstrap fetch request to {}", rid);
auto pending = PendingMessage(std::move(payload), "bfetch_rcs"s, std::move(f)); auto pending = PendingMessage(std::move(payload), "bfetch_rcs"s, std::move(f));
auto [itr, b] = pending_conn_msg_queue.emplace(source.router_id(), MessageQueue()); auto [itr, b] = pending_conn_msg_queue.emplace(rid, MessageQueue());
itr->second.push_back(std::move(pending)); itr->second.push_back(std::move(pending));
connect_to(source); connect_to(source);

@ -63,29 +63,31 @@ namespace llarp
std::shared_ptr<oxen::quic::Endpoint> endpoint; std::shared_ptr<oxen::quic::Endpoint> endpoint;
LinkManager& link_manager; LinkManager& link_manager;
// for outgoing packets, we route via RouterID; map RouterID->Connection /** Connection containers:
// for incoming packets, we get a ConnectionID; map ConnectionID->RouterID - service_conns: holds all connections where the remote (from the perspective
of the local lokinet instance) is a service node. This means all relay to
relay connections are held here; clients will also hold their connections to
relays here as well
- client_conns: holds all connections wehre the remote is a client. This is only
used by service nodes to store their client connections
*/
std::unordered_map<RouterID, std::shared_ptr<link::Connection>> service_conns; std::unordered_map<RouterID, std::shared_ptr<link::Connection>> service_conns;
std::unordered_map<oxen::quic::ConnectionID, RouterID> service_connid_map;
std::unordered_map<RouterID, std::shared_ptr<link::Connection>> client_conns; std::unordered_map<RouterID, std::shared_ptr<link::Connection>> client_conns;
std::unordered_map<oxen::quic::ConnectionID, RouterID> client_connid_map;
// for pending connections, cleared in LinkManager::on_conn_open
std::unordered_map<RouterID, std::shared_ptr<link::Connection>> pending_conns;
// TODO: see which of these is actually useful and delete the other
std::shared_ptr<link::Connection> std::shared_ptr<link::Connection>
get_conn(const RemoteRC&) const; get_conn(const RouterID&) const;
std::shared_ptr<link::Connection> std::shared_ptr<link::Connection>
get_conn(const RouterID&) const; get_service_conn(const RouterID&) const;
bool
have_conn(const RouterID& remote) const;
bool bool
have_client_conn(const RouterID& remote) const; have_client_conn(const RouterID& remote) const;
bool bool
have_conn(const RouterID& remote) const; have_service_conn(const RouterID& remote) const;
std::pair<size_t, size_t> std::pair<size_t, size_t>
num_in_out() const; num_in_out() const;
@ -93,9 +95,6 @@ namespace llarp
size_t size_t
num_connected(bool clients_only) const; num_connected(bool clients_only) const;
bool
get_random_connection(RemoteRC& router) const;
template <typename... Opt> template <typename... Opt>
bool bool
establish_connection( establish_connection(
@ -229,9 +228,15 @@ namespace llarp
void void
recv_control_message(oxen::quic::message msg); recv_control_message(oxen::quic::message msg);
std::shared_ptr<oxen::quic::BTRequestStream>
make_control(oxen::quic::connection_interface& ci, const RouterID& rid);
void void
on_inbound_conn(oxen::quic::connection_interface& ci); on_inbound_conn(oxen::quic::connection_interface& ci);
void
on_outbound_conn(oxen::quic::connection_interface& ci);
void void
on_conn_open(oxen::quic::connection_interface& ci); on_conn_open(oxen::quic::connection_interface& ci);
@ -242,7 +247,10 @@ namespace llarp
startup_endpoint(); startup_endpoint();
void void
register_commands(std::shared_ptr<oxen::quic::BTRequestStream>& s, const RouterID& rid); register_commands(
std::shared_ptr<oxen::quic::BTRequestStream>& s,
const RouterID& rid,
bool client_only = false);
public: public:
const link::Endpoint& const link::Endpoint&
@ -291,6 +299,9 @@ namespace llarp
bool bool
have_connection_to(const RouterID& remote) const; have_connection_to(const RouterID& remote) const;
bool
have_service_connection_to(const RouterID& remote) const;
bool bool
have_client_connection_to(const RouterID& remote) const; have_client_connection_to(const RouterID& remote) const;
@ -321,9 +332,6 @@ namespace llarp
size_t size_t
get_num_connected_clients() const; get_num_connected_clients() const;
bool
get_random_connected(RemoteRC& router) const;
bool bool
is_service_node() const; is_service_node() const;
@ -444,22 +452,24 @@ namespace llarp
is_snode ? ROUTER_KEEP_ALIVE : CLIENT_KEEP_ALIVE, is_snode ? ROUTER_KEEP_ALIVE : CLIENT_KEEP_ALIVE,
std::forward<Opt>(opts)...); std::forward<Opt>(opts)...);
// add to pending conns // add to service conns
auto [itr, b] = pending_conns.emplace(rid, nullptr); auto [itr, b] = service_conns.emplace(rid, nullptr);
auto control_stream = conn_interface->template open_stream<oxen::quic::BTRequestStream>( auto control_stream = conn_interface->template open_stream<oxen::quic::BTRequestStream>(
[this, rid = rid](oxen::quic::Stream&, uint64_t error_code) { [this, rid = rid](oxen::quic::Stream&, uint64_t error_code) {
log::warning( log::warning(
logcat, logcat,
"BTRequestStream closed unexpectedly (ec:{}); closing connection...", "BTRequestStream closed unexpectedly (ec:{}); closing outbound connection...",
error_code); error_code);
close_connection(rid); close_connection(rid);
}); });
log::critical(logcat, "Opened BTStream (ID:{})", control_stream->stream_id());
assert(control_stream->stream_id() == 0);
link_manager.register_commands(control_stream, rid); link_manager.register_commands(control_stream, rid);
itr->second = std::make_shared<link::Connection>(conn_interface, control_stream); itr->second = std::make_shared<link::Connection>(conn_interface, control_stream, true);
log::critical(logcat, "Connection to RID:{} added to pending connections...", rid); log::critical(logcat, "Outbound connection to RID:{} added to service conns...", rid);
return true; return true;
} }
catch (...) catch (...)
@ -469,50 +479,4 @@ namespace llarp
} }
} }
} // namespace link } // namespace link
} // namespace llarp } // namespace llarp
/*
- Refactor RouterID to use gnutls info and maybe ConnectionID
- Combine routerID and connectionID to simplify mapping in llarp/link/endpoint.hpp
- Combine llarp/link/session.hpp into llarp/link/connection.hpp::Connection
- Combine llarp/link/server.hpp::ILinkLayer into llarp/link/endpoint.hpp::Endpoint
- must maintain metadata storage, callbacks, etc
- If: one endpoint for ipv4 and ipv6
- Then: can potentially combine:
- llarp/link/endpoint.hpp
- llarp/link/link_manager.hpp
- llarp/link/outbound_message_handler.hpp
- llarp/link/outbound_session_maker.hpp
-> Yields mega-combo endpoint managing object?
- Can avoid "kitchen sink" by greatly reducing complexity of implementation
llarp/router/outbound_message_handler.hpp
- pendingsessionmessagequeue
- establish queue of messages to be sent on a connection we are creating
- upon creation, send these messages in the connection established callback
- if connection times out, flush queue
- TOCHECK: is priority used at all??
std::unordered_map<std::string, void (llarp::link::LinkManager::*)(oxen::quic::message)>
rpc_commands = {
{"find_name", &handle_find_name},
// ...
};
for (const auto& [name, mfn] : rpc_commands)
bparser.add_command(name, [this, mfn] (oxen::quic::message m) {
router->call([this, mfn, m=std::move(m)] mutable {
try {
std::invoke(mfn, this, std::move(m));
} catch (const std::exception& e) {
m.respond("Error: "s + e.what(), true);
}
});
});
*/

@ -287,7 +287,7 @@ namespace llarp
const auto fetch_threshold = (double)union_size / num_received; const auto fetch_threshold = (double)union_size / num_received;
/** We are checking 2, potentially 3 things here: /** We are checking 2 things here:
1) The ratio of received/accepted to total received is above GOOD_RID_FETCH_THRESHOLD. 1) The ratio of received/accepted to total received is above GOOD_RID_FETCH_THRESHOLD.
This tells us how well the rid source's sets of rids "agree" with one another This tells us how well the rid source's sets of rids "agree" with one another
2) The total number received is above MIN_RID_FETCH_TOTAL. This ensures that we are 2) The total number received is above MIN_RID_FETCH_TOTAL. This ensures that we are
@ -790,6 +790,13 @@ namespace llarp
const std::vector<RouterID>& greylist, const std::vector<RouterID>& greylist,
const std::vector<RouterID>& greenlist) const std::vector<RouterID>& greenlist)
{ {
log::critical(
logcat,
"Oxend provided {}/{}/{} (white/gray/green) routers",
whitelist.size(),
greylist.size(),
greenlist.size());
if (whitelist.empty()) if (whitelist.empty())
return; return;
@ -807,7 +814,7 @@ namespace llarp
log::critical( log::critical(
logcat, logcat,
"Oxend provided {}:{} (whitelist:registered)", "Service node holding {}:{} (whitelist:registered) after oxend integration",
_router_whitelist.size(), _router_whitelist.size(),
_registered_routers.size()); _registered_routers.size());
} }
@ -1039,7 +1046,7 @@ namespace llarp
RemoteRC rc{}; RemoteRC rc{};
const llarp::dht::XorMetric compare(location); const llarp::dht::XorMetric compare(location);
VisitAll([&rc, compare](const auto& otherRC) { visit_all([&rc, compare](const auto& otherRC) {
const auto& rid = rc.router_id(); const auto& rid = rc.router_id();
if (rid.IsZero() || compare(dht::Key_t{otherRC.router_id()}, dht::Key_t{rid})) if (rid.IsZero() || compare(dht::Key_t{otherRC.router_id()}, dht::Key_t{rid}))

@ -481,7 +481,7 @@ namespace llarp
/// visit all known_rcs /// visit all known_rcs
template <typename Visit> template <typename Visit>
void void
VisitAll(Visit visit) const visit_all(Visit visit) const
{ {
_router.loop()->call([this, visit]() { _router.loop()->call([this, visit]() {
for (const auto& item : known_rcs) for (const auto& item : known_rcs)
@ -496,7 +496,7 @@ namespace llarp
/// remove an entry given a filter that inspects the rc /// remove an entry given a filter that inspects the rc
template <typename Filter> template <typename Filter>
void void
RemoveIf(Filter visit) remove_if(Filter visit)
{ {
_router.loop()->call([this, visit]() { _router.loop()->call([this, visit]() {
std::unordered_set<RouterID> removed; std::unordered_set<RouterID> removed;

@ -847,7 +847,7 @@ namespace llarp
_last_tick != 0s and delta > NETWORK_RESET_SKIP_INTERVAL) _last_tick != 0s and delta > NETWORK_RESET_SKIP_INTERVAL)
{ {
// we detected a time skip into the futre, thaw the network // we detected a time skip into the futre, thaw the network
log::warning(logcat, "Timeskip of {} detected, resetting network state!", delta.count()); log::error(logcat, "Timeskip of {} detected, resetting network state!", delta.count());
Thaw(); Thaw();
} }
@ -877,28 +877,28 @@ namespace llarp
last_rc_gossip = now_timepoint; last_rc_gossip = now_timepoint;
// TESTNET: 1 to 4 minutes before testnet gossip interval // TESTNET: 1 to 5 minutes before testnet gossip interval
auto random_delta = auto delta =
std::chrono::seconds{std::uniform_int_distribution<size_t>{60, 300}(llarp::csrng)}; std::chrono::seconds{std::uniform_int_distribution<size_t>{60, 300}(llarp::csrng)};
// 1min to 5min before "stale time" is next gossip time // 1min to 5min before "stale time" is next gossip time
// auto random_delta = // auto random_delta =
// std::chrono::seconds{std::uniform_int_distribution<size_t>{60, 300}(llarp::csrng)}; // std::chrono::seconds{std::uniform_int_distribution<size_t>{60, 300}(llarp::csrng)};
next_rc_gossip = now_timepoint + TESTNET_GOSSIP_INTERVAL - random_delta; next_rc_gossip = now_timepoint + TESTNET_GOSSIP_INTERVAL - delta;
// next_rc_gossip = now_timepoint + RouterContact::STALE_AGE - random_delta; // next_rc_gossip = now_timepoint + RouterContact::STALE_AGE - random_delta;
} }
report_stats(); report_stats();
} }
if (needs_initial_fetch()) if (needs_rebootstrap() and now_timepoint > next_bootstrap_attempt)
{ {
if (not _config->bootstrap.seednode) node_db()->fallback_to_bootstrap();
node_db()->fetch_initial(is_service_node());
} }
else if (needs_rebootstrap() and now_timepoint > next_bootstrap_attempt) else if (needs_initial_fetch())
{ {
node_db()->fallback_to_bootstrap(); if (not _config->bootstrap.seednode)
node_db()->fetch_initial(is_service_node());
} }
else if (not is_snode) else if (not is_snode)
{ {
@ -918,13 +918,14 @@ namespace llarp
} }
// remove RCs for nodes that are no longer allowed by network policy // remove RCs for nodes that are no longer allowed by network policy
node_db()->RemoveIf([&](const RemoteRC& rc) -> bool { node_db()->remove_if([&](const RemoteRC& rc) -> bool {
// don't purge bootstrap nodes from nodedb // don't purge bootstrap nodes from nodedb
if (is_bootstrap_node(rc.router_id())) if (is_bootstrap_node(rc.router_id()))
{ {
log::trace(logcat, "Not removing {}: is bootstrap node", rc.router_id()); log::trace(logcat, "Not removing {}: is bootstrap node", rc.router_id());
return false; return false;
} }
// if for some reason we stored an RC that isn't a valid router // if for some reason we stored an RC that isn't a valid router
// purge this entry // purge this entry
if (not rc.is_public_addressable()) if (not rc.is_public_addressable())
@ -932,12 +933,14 @@ namespace llarp
log::debug(logcat, "Removing {}: not a valid router", rc.router_id()); log::debug(logcat, "Removing {}: not a valid router", rc.router_id());
return true; return true;
} }
/// clear out a fully expired RC
// clear out a fully expired RC
if (rc.is_expired(now)) if (rc.is_expired(now))
{ {
log::debug(logcat, "Removing {}: RC is expired", rc.router_id()); log::debug(logcat, "Removing {}: RC is expired", rc.router_id());
return true; return true;
} }
// clients have no notion of a whilelist // clients have no notion of a whilelist
// we short circuit logic here so we dont remove // we short circuit logic here so we dont remove
// routers that are not whitelisted for first hops // routers that are not whitelisted for first hops
@ -965,26 +968,6 @@ namespace llarp
return false; return false;
}); });
/* TODO: this behavior seems incorrect, but fixing it will require discussion
*
if (not is_snode or not whitelist_received)
{
// find all deregistered relays
std::unordered_set<RouterID> close_peers;
for_each_connection([this, &close_peers](link::Connection& conn) {
const auto& pk = conn.remote_rc.router_id();
if (conn.remote_is_relay and not _rc_lookup_handler.is_session_allowed(pk))
close_peers.insert(pk);
});
// mark peers as de-registered
for (auto& peer : close_peers)
_link_manager.close_connection(peer);
}
*/
_link_manager->check_persisting_conns(now); _link_manager->check_persisting_conns(now);
auto num_conns = num_router_connections(); auto num_conns = num_router_connections();
@ -1064,12 +1047,6 @@ namespace llarp
_last_tick = llarp::time_now_ms(); _last_tick = llarp::time_now_ms();
} }
bool
Router::GetRandomConnectedRouter(RemoteRC& result) const
{
return _link_manager->get_random_connected(result);
}
const std::set<RouterID>& const std::set<RouterID>&
Router::get_whitelist() const Router::get_whitelist() const
{ {
@ -1324,12 +1301,6 @@ namespace llarp
_loop->call_later(200ms, [this] { AfterStopIssued(); }); _loop->call_later(200ms, [this] { AfterStopIssued(); });
} }
bool
Router::HasSessionTo(const RouterID& remote) const
{
return _link_manager->have_connection_to(remote);
}
std::string std::string
Router::ShortName() const Router::ShortName() const
{ {

@ -540,12 +540,6 @@ namespace llarp
size_t size_t
num_client_connections() const; num_client_connections() const;
bool
GetRandomConnectedRouter(RemoteRC& result) const;
bool
HasSessionTo(const RouterID& remote) const;
std::string std::string
ShortName() const; ShortName() const;

Loading…
Cancel
Save