re-abstraction for client connections

- pending_conns removed in favor of direct creation of link::Connection objects in link::Endpoint::{service,client}_conn containers
- conn lookup maps removed, they were pointless
pull/2232/head
dr7ana 5 months ago
parent 3e9d5a97a8
commit 9cc053608a

@ -3,9 +3,10 @@
namespace llarp::link
{
Connection::Connection(
const std::shared_ptr<oxen::quic::connection_interface>& c,
std::shared_ptr<oxen::quic::BTRequestStream>& s)
: conn{c}, control_stream{s}, inbound{conn->is_inbound()}
std::shared_ptr<oxen::quic::connection_interface> c,
std::shared_ptr<oxen::quic::BTRequestStream> s,
bool is_relay)
: conn{std::move(c)}, control_stream{std::move(s)}, remote_is_relay{is_relay}
{}
} // namespace llarp::link

@ -12,13 +12,18 @@ namespace llarp::link
std::shared_ptr<oxen::quic::connection_interface> conn;
std::shared_ptr<oxen::quic::BTRequestStream> control_stream;
// one side of a connection will be responsible for some things, e.g. heartbeat
bool inbound{false};
bool remote_is_relay{true};
bool
is_inbound() const
{
return conn->is_inbound();
}
Connection(
const std::shared_ptr<oxen::quic::connection_interface>& c,
std::shared_ptr<oxen::quic::BTRequestStream>& s);
std::shared_ptr<oxen::quic::connection_interface> c,
std::shared_ptr<oxen::quic::BTRequestStream> s,
bool is_relay = true);
};
} // namespace llarp::link

@ -28,14 +28,11 @@ namespace llarp
{}
std::shared_ptr<link::Connection>
Endpoint::get_conn(const RemoteRC& rc) const
Endpoint::get_service_conn(const RouterID& rid) const
{
if (auto itr = service_conns.find(rc.router_id()); itr != service_conns.end())
if (auto itr = service_conns.find(rid); itr != service_conns.end())
return itr->second;
// if (auto itr = pending_conns.find(rc.router_id()); itr != pending_conns.end())
// return itr->second;
return nullptr;
}
@ -45,32 +42,31 @@ namespace llarp
if (auto itr = service_conns.find(rid); itr != service_conns.end())
return itr->second;
// if (auto itr = pending_conns.find(rid); itr != pending_conns.end())
// return itr->second;
if (_is_service_node)
{
if (auto itr = client_conns.find(rid); itr != client_conns.end())
return itr->second;
}
return nullptr;
}
bool
Endpoint::have_client_conn(const RouterID& remote) const
Endpoint::have_conn(const RouterID& remote) const
{
if (auto itr = service_conns.find(remote); itr != service_conns.end())
{
return not itr->second->remote_is_relay;
}
if (auto itr = pending_conns.find(remote); itr != pending_conns.end())
{
return not itr->second->remote_is_relay;
}
return have_service_conn(remote) or have_client_conn(remote);
}
return false;
bool
Endpoint::have_client_conn(const RouterID& remote) const
{
return client_conns.count(remote);
}
bool
Endpoint::have_conn(const RouterID& remote) const
Endpoint::have_service_conn(const RouterID& remote) const
{
return service_conns.count(remote) or pending_conns.count(remote);
return service_conns.count(remote);
}
std::pair<size_t, size_t>
@ -80,7 +76,15 @@ namespace llarp
for (const auto& c : service_conns)
{
if (c.second->inbound)
if (c.second->is_inbound())
++in;
else
++out;
}
for (const auto& c : client_conns)
{
if (c.second->is_inbound())
++in;
else
++out;
@ -92,38 +96,7 @@ namespace llarp
size_t
Endpoint::num_connected(bool clients_only) const
{
size_t count = 0;
for (const auto& c : service_conns)
{
if (not(c.second->remote_is_relay and clients_only))
count += 1;
}
return count;
}
bool
Endpoint::get_random_connection(RemoteRC& router) const
{
if (const auto size = service_conns.size(); size)
{
auto itr = service_conns.begin();
std::advance(itr, randint() % size);
RouterID rid{itr->second->conn->remote_key()};
if (auto maybe = link_manager.node_db->get_rc(rid))
{
router = *maybe;
return true;
}
return false;
}
log::warning(quic_cat, "Error: failed to fetch random connection");
return false;
return clients_only ? client_conns.size() : client_conns.size() + service_conns.size();
}
void
@ -131,27 +104,35 @@ namespace llarp
{
for (const auto& [rid, conn] : service_conns)
func(*conn);
if (_is_service_node)
{
for (const auto& [rid, conn] : client_conns)
func(*conn);
}
}
void
Endpoint::close_connection(RouterID _rid)
{
// assert(link_manager._router.loop()->inEventLoop());
link_manager._router.loop()->call([this, rid = _rid]() {
// deletion from pending_conns, pending_conn_msg_queue, active_conns, etc is taken care
// of by LinkManager::on_conn_closed
if (auto itr = service_conns.find(rid); itr != service_conns.end())
{
log::critical(logcat, "Closing connection to relay RID:{}", rid);
auto& conn = *itr->second->conn;
conn.close_connection();
}
else if (auto itr = pending_conns.find(rid); itr != pending_conns.end())
else if (_is_service_node)
{
auto& conn = *itr->second->conn;
conn.close_connection();
if (auto itr = client_conns.find(rid); itr != client_conns.end())
{
log::critical(logcat, "Closing connection to client RID:{}", rid);
auto& conn = *itr->second->conn;
conn.close_connection();
}
}
else
return;
log::critical(logcat, "Could not find connection to RID:{} to close!", rid);
});
}
} // namespace link
@ -169,9 +150,9 @@ namespace llarp
void
LinkManager::register_commands(
std::shared_ptr<oxen::quic::BTRequestStream>& s, const RouterID& router_id)
std::shared_ptr<oxen::quic::BTRequestStream>& s, const RouterID& router_id, bool)
{
log::critical(logcat, "{} called", __PRETTY_FUNCTION__);
log::debug(logcat, "{} called", __PRETTY_FUNCTION__);
s->register_command("bfetch_rcs"s, [this](oxen::quic::message m) {
_router.loop()->call(
@ -240,7 +221,7 @@ namespace llarp
- bt stream construction contains a stream close callback that shuts down the connection
if the btstream closes unexpectedly
*/
auto ep = quic->endpoint(
auto e = quic->endpoint(
_router.listen_addr(),
[this](oxen::quic::connection_interface& ci) { return on_conn_open(ci); },
[this](oxen::quic::connection_interface& ci, uint64_t ec) {
@ -250,18 +231,20 @@ namespace llarp
is_service_node() ? alpns::SERVICE_INBOUND : alpns::CLIENT_INBOUND,
is_service_node() ? alpns::SERVICE_OUTBOUND : alpns::CLIENT_OUTBOUND);
// While only service nodes accept inbound connections, clients must have this key verify
// callback set. It will reject any attempted inbound connection to a lokinet client prior to
// handshake completion
tls_creds->set_key_verify_callback([this](const ustring_view& key, const ustring_view& alpn) {
auto is_snode = is_service_node();
RouterID other{key.data()};
auto us = router().is_bootstrap_seed() ? "Bootstrap seed node"s : "Service node"s;
auto is_snode = is_service_node();
if (is_snode)
{
if (alpn == alpns::C_ALPNS)
{
log::critical(logcat, "{} node accepting client connection (remote ID:{})!", us, other);
ep.client_conns.emplace(other, nullptr);
return true;
}
if (alpn == alpns::SN_ALPNS)
@ -271,10 +254,14 @@ namespace llarp
log::critical(
logcat,
"{} node was {} to confirm remote (RID:{}) is registered; allowing connection!",
"{} node was {} to confirm remote (RID:{}) is registered; {} connection!",
us,
result ? "able" : "unable",
other);
other,
result ? "allowing" : "rejecting");
if (result)
ep.service_conns.emplace(other, nullptr);
return result;
}
@ -283,81 +270,76 @@ namespace llarp
return false;
}
// TESTNET: change this to an error message later; just because someone tries to erroneously
// connect to a local lokinet client doesn't mean we should kill the program?
throw std::runtime_error{"Clients should not be validating inbound connections!"};
});
if (_router.is_service_node())
{
ep->listen(tls_creds, ROUTER_KEEP_ALIVE);
e->listen(tls_creds, ROUTER_KEEP_ALIVE);
}
return ep;
return e;
}
void
LinkManager::on_inbound_conn(oxen::quic::connection_interface& ci)
std::shared_ptr<oxen::quic::BTRequestStream>
LinkManager::make_control(oxen::quic::connection_interface& ci, const RouterID& rid)
{
const auto& scid = ci.scid();
RouterID rid{ci.remote_key()};
ep.service_connid_map.emplace(scid, rid);
auto [itr, b] = ep.service_conns.emplace(rid, nullptr);
log::critical(logcat, "Queueing BTStream to be opened...");
auto control_stream = ci.queue_incoming_stream<oxen::quic::BTRequestStream>(
[this, rid = rid](oxen::quic::Stream&, uint64_t error_code) {
log::warning(
logcat,
"BTRequestStream closed unexpectedly (ec:{}); closing connection...",
"BTRequestStream closed unexpectedly (ec:{}); closing inbound connection...",
error_code);
ep.close_connection(rid);
});
log::critical(logcat, "Queued BTStream to be opened ID:{}", control_stream->stream_id());
log::critical(logcat, "Queued BTStream to be opened (ID:{})", control_stream->stream_id());
assert(control_stream->stream_id() == 0);
register_commands(control_stream, rid);
itr->second = std::make_shared<link::Connection>(ci.shared_from_this(), control_stream);
log::critical(logcat, "Successfully configured inbound connection fom {}...", rid);
return control_stream;
}
// TODO: should we add routes here now that Router::SessionOpen is gone?
void
LinkManager::on_conn_open(oxen::quic::connection_interface& ci)
LinkManager::on_inbound_conn(oxen::quic::connection_interface& ci)
{
_router.loop()->call([this, &conn_interface = ci]() {
const auto rid = RouterID{conn_interface.remote_key()};
const auto& remote = conn_interface.remote();
const auto& scid = conn_interface.scid();
if (conn_interface.is_inbound())
{
log::critical(logcat, "Inbound connection fom {} (remote:{})", rid, remote);
on_inbound_conn(conn_interface);
}
else
{
log::critical(logcat, "Searching for RID:{} in pending conns...", rid);
if (auto itr = ep.pending_conns.find(rid); itr != ep.pending_conns.end())
{
ep.service_connid_map.emplace(scid, rid);
auto [it, b] = ep.service_conns.emplace(rid, nullptr);
it->second = std::move(itr->second);
ep.pending_conns.erase(itr);
log::critical(logcat, "Connection to RID:{} moved from pending to active conns!", rid);
}
else
throw std::runtime_error{"Could not find newly established connection in pending conns!"};
}
assert(_is_service_node);
RouterID rid{ci.remote_key()};
if (auto it = ep.service_conns.find(rid); it != ep.service_conns.end())
{
log::critical(logcat, "Configuring inbound connection from relay RID:{}", rid);
it->second = std::make_shared<link::Connection>(ci.shared_from_this(), make_control(ci, rid));
}
else if (auto it = ep.client_conns.find(rid); it != ep.client_conns.end())
{
log::critical(logcat, "Configuring inbound connection from client RID:{}", rid);
it->second =
std::make_shared<link::Connection>(ci.shared_from_this(), make_control(ci, rid), false);
}
else
{
log::critical(
logcat,
"SERVICE NODE (RID:{}) ESTABLISHED CONNECTION TO RID:{}",
_router.local_rid(),
"ERROR: connection accepted from RID:{} that was not logged in key verification!",
rid);
}
log::critical(logcat, "Successfully configured inbound connection fom {}...", rid);
}
void
LinkManager::on_outbound_conn(oxen::quic::connection_interface& ci)
{
RouterID rid{ci.remote_key()};
if (auto it = ep.service_conns.find(rid); it != ep.service_conns.end())
{
log::critical(logcat, "Fetched configured outbound connection to relay RID:{}", rid);
auto& conn = it->second->conn;
auto& str = it->second->control_stream;
// check to see if this connection was established while we were attempting to queue
// messages to the remote
if (auto itr = pending_conn_msg_queue.find(rid); itr != pending_conn_msg_queue.end())
{
log::critical(logcat, "Clearing pending queue for RID:{}", rid);
@ -370,12 +352,12 @@ namespace llarp
if (msg.is_control)
{
log::critical(logcat, "Dispatching {} request!", *msg.endpoint);
ep.service_conns[rid]->control_stream->command(
std::move(*msg.endpoint), std::move(msg.body), std::move(msg.func));
str->command(std::move(*msg.endpoint), std::move(msg.body), std::move(msg.func));
}
else
{
conn_interface.send_datagram(std::move(msg.body));
log::critical(logcat, "DIspatching data message: {}", msg.body);
conn->send_datagram(std::move(msg.body));
}
que.pop_front();
@ -383,6 +365,40 @@ namespace llarp
}
log::warning(logcat, "Pending queue empty for RID:{}", rid);
}
else
{
log::critical(
logcat,
"ERROR: connection established to RID:{} that was not logged in key verifrication!",
rid);
}
}
// TODO: should we add routes here now that Router::SessionOpen is gone?
void
LinkManager::on_conn_open(oxen::quic::connection_interface& ci)
{
_router.loop()->call([this, &conn_interface = ci]() {
const auto rid = RouterID{conn_interface.remote_key()};
const auto& remote = conn_interface.remote();
if (conn_interface.is_inbound())
{
log::critical(logcat, "Inbound connection fom {} (remote:{})", rid, remote);
on_inbound_conn(conn_interface);
}
else
{
log::critical(logcat, "Outbound connection fom {} (remote:{})", rid, remote);
on_outbound_conn(conn_interface);
}
log::critical(
logcat,
"SERVICE NODE (RID:{}) ESTABLISHED CONNECTION TO RID:{}",
_router.local_rid(),
rid);
});
};
@ -390,33 +406,22 @@ namespace llarp
LinkManager::on_conn_closed(oxen::quic::connection_interface& ci, uint64_t ec)
{
_router.loop()->call(
[this, scid = ci.scid(), _rid = RouterID{ci.remote_key()}, error_code = ec]() {
log::critical(quic_cat, "Purging quic connection CID:{} (ec: {})", scid, error_code);
// a pending connection would not be in the connid_map
if (auto v_itr = ep.pending_conns.find(_rid); v_itr != ep.pending_conns.end())
{
ep.pending_conns.erase(v_itr);
[this, scid = ci.scid(), rid = RouterID{ci.remote_key()}, error_code = ec]() {
log::critical(quic_cat, "Purging quic connection CID:{} (ec:{})", scid, error_code);
// in case this didn't clear earlier, do it now
if (auto p_itr = pending_conn_msg_queue.find(_rid);
p_itr != pending_conn_msg_queue.end())
pending_conn_msg_queue.erase(p_itr);
// in case this didn't clear earlier, do it now
if (auto p_itr = pending_conn_msg_queue.find(rid); p_itr != pending_conn_msg_queue.end())
pending_conn_msg_queue.erase(p_itr);
log::critical(quic_cat, "Pending quic connection CID:{} purged successfully", scid);
if (auto s_itr = ep.service_conns.find(rid); s_itr != ep.service_conns.end())
{
log::critical(quic_cat, "Quic connection to relay RID:{} purged successfully", rid);
ep.service_conns.erase(s_itr);
}
else if (const auto& c_itr = ep.service_connid_map.find(scid);
c_itr != ep.service_connid_map.end())
else if (auto c_itr = ep.client_conns.find(rid); c_itr != ep.client_conns.end())
{
const auto& rid = c_itr->second;
assert(_rid == rid); // this should hold true
if (auto m_itr = ep.service_conns.find(rid); m_itr != ep.service_conns.end())
ep.service_conns.erase(m_itr);
ep.service_connid_map.erase(c_itr);
log::critical(quic_cat, "Quic connection CID:{} purged successfully", scid);
log::critical(quic_cat, "Quic connection to client RID:{} purged successfully", rid);
ep.client_conns.erase(c_itr);
}
else
log::critical(quic_cat, "Nothing to purge for quic connection CID:{}", scid);
@ -466,19 +471,15 @@ namespace llarp
f = std::move(func)]() {
auto pending = PendingMessage(std::move(body), std::move(endpoint), std::move(f));
if (auto it1 = ep.pending_conns.find(remote); it1 != ep.pending_conns.end())
if (auto it = pending_conn_msg_queue.find(remote); it != pending_conn_msg_queue.end())
{
if (auto it2 = pending_conn_msg_queue.find(remote); it2 != pending_conn_msg_queue.end())
{
it2->second.push_back(std::move(pending));
log::critical(
logcat, "Connection (RID:{}) is pending; message appended to send queue!", remote);
}
it->second.push_back(std::move(pending));
log::critical(
logcat, "Connection to RID:{} is pending; message appended to send queue!", remote);
}
else
{
log::critical(
logcat, "Connection (RID:{}) not found in pending conns; creating send queue!", remote);
log::critical(logcat, "Connection to RID:{} is pending; creating send queue!", remote);
auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue());
itr->second.push_back(std::move(pending));
connect_to(remote);
@ -494,7 +495,7 @@ namespace llarp
if (is_stopping)
return false;
if (auto conn = ep.get_conn(remote); conn)
if (auto conn = ep.get_service_conn(remote); conn)
{
conn->conn->send_datagram(std::move(body));
return true;
@ -539,13 +540,12 @@ namespace llarp
log::warning(quic_cat, "Could not find RouterContact for connection to rid:{}", rid);
}
// This function assumes the RC has already had its signature verified and connection is allowed.
void
LinkManager::connect_to(const RemoteRC& rc, conn_open_hook on_open, conn_closed_hook on_close)
{
const auto& rid = rc.router_id();
if (ep.have_conn(rid))
if (ep.have_service_conn(rid))
{
log::warning(logcat, "We already have a connection to {}!", rid);
// TODO: should implement some connection failed logic, but not the same logic that
@ -576,6 +576,12 @@ namespace llarp
return ep.have_conn(remote);
}
bool
LinkManager::have_service_connection_to(const RouterID& remote) const
{
return ep.have_service_conn(remote);
}
bool
LinkManager::have_client_connection_to(const RouterID& remote) const
{
@ -628,12 +634,6 @@ namespace llarp
return get_num_connected(true);
}
bool
LinkManager::get_random_connected(RemoteRC& router) const
{
return ep.get_random_connection(router);
}
bool
LinkManager::is_service_node() const
{
@ -666,10 +666,10 @@ namespace llarp
LinkManager::connect_to_random(int num_conns, bool client_only)
{
auto filter = [this, client_only](const RemoteRC& rc) -> bool {
auto res =
client_only ? not ep.have_client_conn(rc.router_id()) : not ep.have_conn(rc.router_id());
const auto& rid = rc.router_id();
auto res = client_only ? not ep.have_client_conn(rid) : not ep.have_conn(rid);
log::debug(logcat, "RID:{} {}", rc.router_id(), res ? "ACCEPTED" : "REJECTED");
log::debug(logcat, "RID:{} {}", rid, res ? "ACCEPTED" : "REJECTED");
return res;
};
@ -766,17 +766,19 @@ namespace llarp
};
}
if (auto conn = ep.get_conn(source); conn)
const auto& rid = source.router_id();
if (auto conn = ep.get_service_conn(rid); conn)
{
conn->control_stream->command("bfetch_rcs"s, std::move(payload), std::move(f));
log::critical(logcat, "Dispatched bootstrap fetch request!");
return;
}
log::critical(logcat, "Queuing bootstrap fetch request to {}", source.router_id());
log::critical(logcat, "Queuing bootstrap fetch request to {}", rid);
auto pending = PendingMessage(std::move(payload), "bfetch_rcs"s, std::move(f));
auto [itr, b] = pending_conn_msg_queue.emplace(source.router_id(), MessageQueue());
auto [itr, b] = pending_conn_msg_queue.emplace(rid, MessageQueue());
itr->second.push_back(std::move(pending));
connect_to(source);

@ -63,29 +63,31 @@ namespace llarp
std::shared_ptr<oxen::quic::Endpoint> endpoint;
LinkManager& link_manager;
// for outgoing packets, we route via RouterID; map RouterID->Connection
// for incoming packets, we get a ConnectionID; map ConnectionID->RouterID
/** Connection containers:
- service_conns: holds all connections where the remote (from the perspective
of the local lokinet instance) is a service node. This means all relay to
relay connections are held here; clients will also hold their connections to
relays here as well
- client_conns: holds all connections wehre the remote is a client. This is only
used by service nodes to store their client connections
*/
std::unordered_map<RouterID, std::shared_ptr<link::Connection>> service_conns;
std::unordered_map<oxen::quic::ConnectionID, RouterID> service_connid_map;
std::unordered_map<RouterID, std::shared_ptr<link::Connection>> client_conns;
std::unordered_map<oxen::quic::ConnectionID, RouterID> client_connid_map;
// for pending connections, cleared in LinkManager::on_conn_open
std::unordered_map<RouterID, std::shared_ptr<link::Connection>> pending_conns;
// TODO: see which of these is actually useful and delete the other
std::shared_ptr<link::Connection>
get_conn(const RemoteRC&) const;
get_conn(const RouterID&) const;
std::shared_ptr<link::Connection>
get_conn(const RouterID&) const;
get_service_conn(const RouterID&) const;
bool
have_conn(const RouterID& remote) const;
bool
have_client_conn(const RouterID& remote) const;
bool
have_conn(const RouterID& remote) const;
have_service_conn(const RouterID& remote) const;
std::pair<size_t, size_t>
num_in_out() const;
@ -93,9 +95,6 @@ namespace llarp
size_t
num_connected(bool clients_only) const;
bool
get_random_connection(RemoteRC& router) const;
template <typename... Opt>
bool
establish_connection(
@ -229,9 +228,15 @@ namespace llarp
void
recv_control_message(oxen::quic::message msg);
std::shared_ptr<oxen::quic::BTRequestStream>
make_control(oxen::quic::connection_interface& ci, const RouterID& rid);
void
on_inbound_conn(oxen::quic::connection_interface& ci);
void
on_outbound_conn(oxen::quic::connection_interface& ci);
void
on_conn_open(oxen::quic::connection_interface& ci);
@ -242,7 +247,10 @@ namespace llarp
startup_endpoint();
void
register_commands(std::shared_ptr<oxen::quic::BTRequestStream>& s, const RouterID& rid);
register_commands(
std::shared_ptr<oxen::quic::BTRequestStream>& s,
const RouterID& rid,
bool client_only = false);
public:
const link::Endpoint&
@ -291,6 +299,9 @@ namespace llarp
bool
have_connection_to(const RouterID& remote) const;
bool
have_service_connection_to(const RouterID& remote) const;
bool
have_client_connection_to(const RouterID& remote) const;
@ -321,9 +332,6 @@ namespace llarp
size_t
get_num_connected_clients() const;
bool
get_random_connected(RemoteRC& router) const;
bool
is_service_node() const;
@ -444,22 +452,24 @@ namespace llarp
is_snode ? ROUTER_KEEP_ALIVE : CLIENT_KEEP_ALIVE,
std::forward<Opt>(opts)...);
// add to pending conns
auto [itr, b] = pending_conns.emplace(rid, nullptr);
// add to service conns
auto [itr, b] = service_conns.emplace(rid, nullptr);
auto control_stream = conn_interface->template open_stream<oxen::quic::BTRequestStream>(
[this, rid = rid](oxen::quic::Stream&, uint64_t error_code) {
log::warning(
logcat,
"BTRequestStream closed unexpectedly (ec:{}); closing connection...",
"BTRequestStream closed unexpectedly (ec:{}); closing outbound connection...",
error_code);
close_connection(rid);
});
log::critical(logcat, "Opened BTStream (ID:{})", control_stream->stream_id());
assert(control_stream->stream_id() == 0);
link_manager.register_commands(control_stream, rid);
itr->second = std::make_shared<link::Connection>(conn_interface, control_stream);
itr->second = std::make_shared<link::Connection>(conn_interface, control_stream, true);
log::critical(logcat, "Connection to RID:{} added to pending connections...", rid);
log::critical(logcat, "Outbound connection to RID:{} added to service conns...", rid);
return true;
}
catch (...)
@ -469,50 +479,4 @@ namespace llarp
}
}
} // namespace link
} // namespace llarp
/*
- Refactor RouterID to use gnutls info and maybe ConnectionID
- Combine routerID and connectionID to simplify mapping in llarp/link/endpoint.hpp
- Combine llarp/link/session.hpp into llarp/link/connection.hpp::Connection
- Combine llarp/link/server.hpp::ILinkLayer into llarp/link/endpoint.hpp::Endpoint
- must maintain metadata storage, callbacks, etc
- If: one endpoint for ipv4 and ipv6
- Then: can potentially combine:
- llarp/link/endpoint.hpp
- llarp/link/link_manager.hpp
- llarp/link/outbound_message_handler.hpp
- llarp/link/outbound_session_maker.hpp
-> Yields mega-combo endpoint managing object?
- Can avoid "kitchen sink" by greatly reducing complexity of implementation
llarp/router/outbound_message_handler.hpp
- pendingsessionmessagequeue
- establish queue of messages to be sent on a connection we are creating
- upon creation, send these messages in the connection established callback
- if connection times out, flush queue
- TOCHECK: is priority used at all??
std::unordered_map<std::string, void (llarp::link::LinkManager::*)(oxen::quic::message)>
rpc_commands = {
{"find_name", &handle_find_name},
// ...
};
for (const auto& [name, mfn] : rpc_commands)
bparser.add_command(name, [this, mfn] (oxen::quic::message m) {
router->call([this, mfn, m=std::move(m)] mutable {
try {
std::invoke(mfn, this, std::move(m));
} catch (const std::exception& e) {
m.respond("Error: "s + e.what(), true);
}
});
});
*/

@ -287,7 +287,7 @@ namespace llarp
const auto fetch_threshold = (double)union_size / num_received;
/** We are checking 2, potentially 3 things here:
/** We are checking 2 things here:
1) The ratio of received/accepted to total received is above GOOD_RID_FETCH_THRESHOLD.
This tells us how well the rid source's sets of rids "agree" with one another
2) The total number received is above MIN_RID_FETCH_TOTAL. This ensures that we are
@ -790,6 +790,13 @@ namespace llarp
const std::vector<RouterID>& greylist,
const std::vector<RouterID>& greenlist)
{
log::critical(
logcat,
"Oxend provided {}/{}/{} (white/gray/green) routers",
whitelist.size(),
greylist.size(),
greenlist.size());
if (whitelist.empty())
return;
@ -807,7 +814,7 @@ namespace llarp
log::critical(
logcat,
"Oxend provided {}:{} (whitelist:registered)",
"Service node holding {}:{} (whitelist:registered) after oxend integration",
_router_whitelist.size(),
_registered_routers.size());
}
@ -1039,7 +1046,7 @@ namespace llarp
RemoteRC rc{};
const llarp::dht::XorMetric compare(location);
VisitAll([&rc, compare](const auto& otherRC) {
visit_all([&rc, compare](const auto& otherRC) {
const auto& rid = rc.router_id();
if (rid.IsZero() || compare(dht::Key_t{otherRC.router_id()}, dht::Key_t{rid}))

@ -481,7 +481,7 @@ namespace llarp
/// visit all known_rcs
template <typename Visit>
void
VisitAll(Visit visit) const
visit_all(Visit visit) const
{
_router.loop()->call([this, visit]() {
for (const auto& item : known_rcs)
@ -496,7 +496,7 @@ namespace llarp
/// remove an entry given a filter that inspects the rc
template <typename Filter>
void
RemoveIf(Filter visit)
remove_if(Filter visit)
{
_router.loop()->call([this, visit]() {
std::unordered_set<RouterID> removed;

@ -847,7 +847,7 @@ namespace llarp
_last_tick != 0s and delta > NETWORK_RESET_SKIP_INTERVAL)
{
// we detected a time skip into the futre, thaw the network
log::warning(logcat, "Timeskip of {} detected, resetting network state!", delta.count());
log::error(logcat, "Timeskip of {} detected, resetting network state!", delta.count());
Thaw();
}
@ -877,28 +877,28 @@ namespace llarp
last_rc_gossip = now_timepoint;
// TESTNET: 1 to 4 minutes before testnet gossip interval
auto random_delta =
// TESTNET: 1 to 5 minutes before testnet gossip interval
auto delta =
std::chrono::seconds{std::uniform_int_distribution<size_t>{60, 300}(llarp::csrng)};
// 1min to 5min before "stale time" is next gossip time
// auto random_delta =
// std::chrono::seconds{std::uniform_int_distribution<size_t>{60, 300}(llarp::csrng)};
next_rc_gossip = now_timepoint + TESTNET_GOSSIP_INTERVAL - random_delta;
next_rc_gossip = now_timepoint + TESTNET_GOSSIP_INTERVAL - delta;
// next_rc_gossip = now_timepoint + RouterContact::STALE_AGE - random_delta;
}
report_stats();
}
if (needs_initial_fetch())
if (needs_rebootstrap() and now_timepoint > next_bootstrap_attempt)
{
if (not _config->bootstrap.seednode)
node_db()->fetch_initial(is_service_node());
node_db()->fallback_to_bootstrap();
}
else if (needs_rebootstrap() and now_timepoint > next_bootstrap_attempt)
else if (needs_initial_fetch())
{
node_db()->fallback_to_bootstrap();
if (not _config->bootstrap.seednode)
node_db()->fetch_initial(is_service_node());
}
else if (not is_snode)
{
@ -918,13 +918,14 @@ namespace llarp
}
// remove RCs for nodes that are no longer allowed by network policy
node_db()->RemoveIf([&](const RemoteRC& rc) -> bool {
node_db()->remove_if([&](const RemoteRC& rc) -> bool {
// don't purge bootstrap nodes from nodedb
if (is_bootstrap_node(rc.router_id()))
{
log::trace(logcat, "Not removing {}: is bootstrap node", rc.router_id());
return false;
}
// if for some reason we stored an RC that isn't a valid router
// purge this entry
if (not rc.is_public_addressable())
@ -932,12 +933,14 @@ namespace llarp
log::debug(logcat, "Removing {}: not a valid router", rc.router_id());
return true;
}
/// clear out a fully expired RC
// clear out a fully expired RC
if (rc.is_expired(now))
{
log::debug(logcat, "Removing {}: RC is expired", rc.router_id());
return true;
}
// clients have no notion of a whilelist
// we short circuit logic here so we dont remove
// routers that are not whitelisted for first hops
@ -965,26 +968,6 @@ namespace llarp
return false;
});
/* TODO: this behavior seems incorrect, but fixing it will require discussion
*
if (not is_snode or not whitelist_received)
{
// find all deregistered relays
std::unordered_set<RouterID> close_peers;
for_each_connection([this, &close_peers](link::Connection& conn) {
const auto& pk = conn.remote_rc.router_id();
if (conn.remote_is_relay and not _rc_lookup_handler.is_session_allowed(pk))
close_peers.insert(pk);
});
// mark peers as de-registered
for (auto& peer : close_peers)
_link_manager.close_connection(peer);
}
*/
_link_manager->check_persisting_conns(now);
auto num_conns = num_router_connections();
@ -1064,12 +1047,6 @@ namespace llarp
_last_tick = llarp::time_now_ms();
}
bool
Router::GetRandomConnectedRouter(RemoteRC& result) const
{
return _link_manager->get_random_connected(result);
}
const std::set<RouterID>&
Router::get_whitelist() const
{
@ -1324,12 +1301,6 @@ namespace llarp
_loop->call_later(200ms, [this] { AfterStopIssued(); });
}
bool
Router::HasSessionTo(const RouterID& remote) const
{
return _link_manager->have_connection_to(remote);
}
std::string
Router::ShortName() const
{

@ -540,12 +540,6 @@ namespace llarp
size_t
num_client_connections() const;
bool
GetRandomConnectedRouter(RemoteRC& result) const;
bool
HasSessionTo(const RouterID& remote) const;
std::string
ShortName() const;

Loading…
Cancel
Save