full mesh proto implementation

pull/2232/head
dr7ana 5 months ago
parent 89975a0b01
commit 7f143bb52f

@ -46,23 +46,27 @@ namespace llarp
}
bool
Endpoint::have_conn(const RouterID& remote, bool client_only) const
Endpoint::have_client_conn(const RouterID& remote) const
{
if (auto itr = active_conns.find(remote); itr != active_conns.end())
{
if (not(itr->second->remote_is_relay and client_only))
return true;
return not itr->second->remote_is_relay;
}
if (auto itr = pending_conns.find(remote); itr != pending_conns.end())
{
if (not(itr->second->remote_is_relay and client_only))
return true;
return not itr->second->remote_is_relay;
}
return false;
}
bool
Endpoint::have_conn(const RouterID& remote) const
{
return active_conns.count(remote) or pending_conns.count(remote);
}
size_t
Endpoint::num_connected(bool clients_only) const
{
@ -535,8 +539,6 @@ namespace llarp
const auto& remote_addr = rc.addr();
const auto& rid = rc.router_id();
// rids_pending_verification[rid] = rc;
// TODO: confirm remote end is using the expected pubkey (RouterID).
// TODO: ALPN for "client" vs "relay" (could just be set on endpoint creation)
if (auto rv = ep.establish_connection(
@ -553,15 +555,15 @@ namespace llarp
}
bool
LinkManager::have_connection_to(const RouterID& remote, bool client_only) const
LinkManager::have_connection_to(const RouterID& remote) const
{
return ep.have_conn(remote, client_only);
return ep.have_conn(remote);
}
bool
LinkManager::have_client_connection_to(const RouterID& remote) const
{
return ep.have_conn(remote, true);
return ep.have_client_conn(remote);
}
void
@ -636,20 +638,17 @@ namespace llarp
{
is_stopping = false;
node_db = _router.node_db();
client_router_connections = _router.required_num_client_conns();
}
void
LinkManager::connect_to_random(int num_conns)
LinkManager::connect_to_random(int num_conns, bool client_only)
{
std::set<RouterID> exclude;
auto remainder = num_conns;
auto filter = [exclude](const RemoteRC& rc) -> bool {
return exclude.count(rc.router_id()) == 0;
auto filter = [this, client_only](const RemoteRC& rc) -> bool {
return client_only ? not ep.have_client_conn(rc.router_id())
: not ep.have_conn(rc.router_id());
};
if (auto maybe = node_db->get_n_random_rcs_conditional(remainder, filter))
if (auto maybe = node_db->get_n_random_rcs_conditional(num_conns, filter))
{
std::vector<RemoteRC>& rcs = *maybe;
@ -696,6 +695,8 @@ namespace llarp
void
LinkManager::handle_gossip_rc(oxen::quic::message m)
{
log::critical(logcat, "Handling GossipRC request...");
// RemoteRC constructor wraps deserialization in a try/catch
RemoteRC rc;
RouterID src, sender;
@ -864,7 +865,6 @@ namespace llarp
}
const auto& rcs = node_db->get_rcs();
const auto now = time_point_now();
oxenc::bt_dict_producer btdp;
const auto& last_time = node_db->get_last_rc_update_times();
@ -895,8 +895,6 @@ namespace llarp
}
}
btdp.append("time", now.time_since_epoch().count());
m.respond(std::move(btdp).str());
}

@ -67,7 +67,10 @@ namespace llarp
get_conn(const RouterID&) const;
bool
have_conn(const RouterID& remote, bool client_only) const;
have_client_conn(const RouterID& remote) const;
bool
have_conn(const RouterID& remote) const;
size_t
num_connected(bool clients_only) const;
@ -265,7 +268,7 @@ namespace llarp
handle_fetch_bootstrap_rcs(oxen::quic::message m);
bool
have_connection_to(const RouterID& remote, bool client_only = false) const;
have_connection_to(const RouterID& remote) const;
bool
have_client_connection_to(const RouterID& remote) const;
@ -318,7 +321,7 @@ namespace llarp
// check if we already have a connection to any of the random set, as making
// that thread safe would be slow...I think.
void
connect_to_random(int num_conns);
connect_to_random(int num_conns, bool client_only = false);
/// always maintain this many client connections to other routers
int client_router_connections = 4;

@ -245,7 +245,7 @@ namespace llarp
}
bool
NodeDB::ingest_fetched_rcs(std::set<RemoteRC> rcs, rc_time timestamp)
NodeDB::ingest_fetched_rcs(std::set<RemoteRC> rcs)
{
// if we are not bootstrapping, we should check the rc's against the ones we currently hold
if (not _using_bootstrap_fallback)
@ -255,7 +255,7 @@ namespace llarp
}
while (!rcs.empty())
put_rc_if_newer(std::move(rcs.extract(rcs.begin()).value()), timestamp);
put_rc_if_newer(std::move(rcs.extract(rcs.begin()).value()));
return true;
}
@ -405,7 +405,6 @@ namespace llarp
}
auto btlc = btdc.require<oxenc::bt_list_consumer>("rcs"sv);
auto timestamp = rc_time{std::chrono::seconds{btdc.require<int64_t>("time"sv)}};
std::set<RemoteRC> rcs;
@ -413,7 +412,7 @@ namespace llarp
rcs.emplace(btlc.consume_dict_data());
// if process_fetched_rcs returns false, then the trust model rejected the fetched RC's
fetch_rcs_result(initial, not ingest_fetched_rcs(std::move(rcs), timestamp));
fetch_rcs_result(initial, not ingest_fetched_rcs(std::move(rcs)));
}
catch (const std::exception& e)
{
@ -957,7 +956,16 @@ namespace llarp
}
std::optional<RemoteRC>
NodeDB::get_rc(RouterID pk) const
NodeDB::get_rc(const RemoteRC& pk) const
{
if (auto itr = known_rcs.find(pk); itr != known_rcs.end())
return *itr;
return std::nullopt;
}
std::optional<RemoteRC>
NodeDB::get_rc(const RouterID& pk) const
{
if (auto itr = rc_lookup.find(pk); itr != rc_lookup.end())
return itr->second;
@ -1036,10 +1044,13 @@ namespace llarp
}
bool
NodeDB::put_rc_if_newer(RemoteRC rc, rc_time now)
NodeDB::put_rc_if_newer(RemoteRC rc)
{
if (not has_rc(rc))
return put_rc(std::move(rc), now);
if (auto maybe = get_rc(rc))
{
if (maybe->other_is_newer(rc))
return put_rc(rc);
}
return false;
}

@ -232,7 +232,7 @@ namespace llarp
ingest_bootstrap_seed();
bool
ingest_fetched_rcs(std::set<RemoteRC> rcs, rc_time timestamp);
ingest_fetched_rcs(std::set<RemoteRC> rcs);
bool
process_fetched_rcs(std::set<RemoteRC>& rcs);
@ -432,7 +432,10 @@ namespace llarp
/// maybe get an rc by its ident pubkey
std::optional<RemoteRC>
get_rc(RouterID pk) const;
get_rc(const RouterID& pk) const;
std::optional<RemoteRC>
get_rc(const RemoteRC& pk) const;
std::optional<RemoteRC>
get_random_rc() const;
@ -586,7 +589,7 @@ namespace llarp
/// put this rc into the cache if it is not there or is newer than the one there already
/// returns true if the rc was inserted
bool
put_rc_if_newer(RemoteRC rc, rc_time now = time_point_now());
put_rc_if_newer(RemoteRC rc);
bool
verify_store_gossip_rc(const RemoteRC& rc);

@ -986,48 +986,64 @@ namespace llarp
_link_manager->check_persisting_conns(now);
size_t connected = num_router_connections();
auto num_conns = num_router_connections();
size_t connectToNum = _link_manager->client_router_connections;
const auto& pinned_edges = _node_db->pinned_edges();
const auto pinned_count = pinned_edges.size();
const auto& num_rcs = node_db()->num_rcs();
if (pinned_count > 0 && connectToNum > pinned_count)
{
connectToNum = pinned_count;
}
if (is_snode and now >= _next_decomm_warning)
if (is_snode)
{
if (auto registered = appears_registered(), funded = appears_funded();
not(registered and funded and not is_decommed))
if (now >= _next_decomm_warning)
{
// complain about being deregistered/decommed/unfunded
log::error(
logcat,
"We are running as a service node but we seem to be {}",
not registered ? "deregistered"
: is_decommed ? "decommissioned"
: "not fully staked");
_next_decomm_warning = now + DECOMM_WARNING_INTERVAL;
if (auto registered = appears_registered(), funded = appears_funded();
not(registered and funded and not is_decommed))
{
// complain about being deregistered/decommed/unfunded
log::error(
logcat,
"We are running as a service node but we seem to be {}",
not registered ? "deregistered"
: is_decommed ? "decommissioned"
: "not fully staked");
_next_decomm_warning = now + DECOMM_WARNING_INTERVAL;
}
else if (insufficient_peers())
{
log::error(
logcat,
"We appear to be an active service node, but have only {} known peers.",
node_db()->num_rcs());
_next_decomm_warning = now + DECOMM_WARNING_INTERVAL;
}
}
else if (insufficient_peers())
if (num_conns < num_rcs)
{
log::error(
log::critical(
logcat,
"We appear to be an active service node, but have only {} known peers.",
node_db()->num_rcs());
_next_decomm_warning = now + DECOMM_WARNING_INTERVAL;
"Service Node connecting to {} random routers to achieve full mesh",
FULL_MESH_ITERATION);
_link_manager->connect_to_random(FULL_MESH_ITERATION);
}
else
log::critical(logcat, "SERVICE NODE IS FULLY MESHED");
}
// if we need more sessions to routers and we are not a service node kicked from the network or
// we are a client we shall connect out to others
if (connected < connectToNum and (appears_funded() or not is_snode))
else
{
size_t dlt = connectToNum - connected;
log::debug(logcat, "Connecting to {} random routers to keep alive", dlt);
_link_manager->connect_to_random(dlt);
size_t min_client_conns = _link_manager->client_router_connections;
const auto& pinned_edges = _node_db->pinned_edges();
const auto pinned_count = pinned_edges.size();
if (pinned_count > 0 && min_client_conns > pinned_count)
min_client_conns = pinned_count;
// if we need more sessions to routers and we are not a service node kicked from the network
// or we are a client we shall connect out to others
if (num_conns < min_client_conns)
{
size_t needed = min_client_conns - num_conns;
log::critical(logcat, "Client connecting to {} random routers to keep alive", needed);
_link_manager->connect_to_random(needed);
}
}
_hidden_service_context.Tick(now);

@ -51,7 +51,8 @@ namespace llarp
// TESTNET: these constants are shortened for testing purposes
inline constexpr std::chrono::milliseconds TESTNET_GOSSIP_INTERVAL{4min};
inline constexpr std::chrono::milliseconds RC_UPDATE_INTERVAL{4min};
// as we advance towards full mesh, we try to connect to this number per tick
inline constexpr int FULL_MESH_ITERATION{1};
inline constexpr std::chrono::milliseconds ROUTERID_UPDATE_INTERVAL{1h};
// DISCUSS: ask tom and jason about this

Loading…
Cancel
Save