Added connection keepalive

- implemented ngtcp2 ping to keep connections alive
- fixed weird lambda captures
- fetch logic
- lets see what happens
pull/2232/head
dr7ana 5 months ago
parent caa7b7ad24
commit cc97fe1f5f

@ -34,10 +34,8 @@ namespace llarp
/// Constructor
KeyManager();
/// Initializes keys using the provided config, loading from disk
///
/// NOTE: Must be called prior to obtaining any keys.
/// NOTE: blocks on I/O
/// Initializes keys using the provided config, loading from disk. Must be called
/// prior to obtaining any keys and blocks on I/O
///
/// @param config should be a prepared config object
/// @param genIfAbsent determines whether or not we will create files if they

@ -5,6 +5,5 @@
namespace llarp
{
using namespace std::literals;
/// how big of a time skip before we reset network state
constexpr auto TimeskipDetectedDuration = 1min;
} // namespace llarp

@ -182,7 +182,7 @@ namespace llarp
{
if (router)
{
llarp::log::debug(logcat, "Handling SIGINT");
llarp::log::error(logcat, "Handling SIGINT");
/// async stop router on sigint
router->Stop();
}

@ -27,6 +27,9 @@ namespace llarp
if (auto itr = active_conns.find(rc.router_id()); itr != active_conns.end())
return itr->second;
// if (auto itr = pending_conns.find(rc.router_id()); itr != pending_conns.end())
// return itr->second;
return nullptr;
}
@ -36,6 +39,9 @@ namespace llarp
if (auto itr = active_conns.find(rid); itr != active_conns.end())
return itr->second;
// if (auto itr = pending_conns.find(rid); itr != pending_conns.end())
// return itr->second;
return nullptr;
}
@ -48,6 +54,12 @@ namespace llarp
return true;
}
if (auto itr = pending_conns.find(remote); itr != pending_conns.end())
{
if (not(itr->second->remote_is_relay and client_only))
return true;
}
return false;
}
@ -99,14 +111,21 @@ namespace llarp
Endpoint::close_connection(RouterID _rid)
{
assert(link_manager._router.loop()->inEventLoop());
auto itr = active_conns.find(_rid);
if (itr != active_conns.end())
return;
auto& conn = *itr->second->conn;
conn.close_connection();
connid_map.erase(conn.scid());
active_conns.erase(itr);
// deletion from pending_conns, pending_conn_msg_queue, active_conns, etc is taken care
// of by LinkManager::on_conn_closed
if (auto itr = active_conns.find(_rid); itr != active_conns.end())
{
auto& conn = *itr->second->conn;
conn.close_connection();
}
else if (auto itr = pending_conns.find(_rid); itr != pending_conns.end())
{
auto& conn = *itr->second->conn;
conn.close_connection();
}
else
return;
}
} // namespace link
@ -123,7 +142,8 @@ namespace llarp
}
void
LinkManager::register_commands(std::shared_ptr<oxen::quic::BTRequestStream>& s, const RouterID& router_id)
LinkManager::register_commands(
std::shared_ptr<oxen::quic::BTRequestStream>& s, const RouterID& router_id)
{
log::critical(logcat, "{} called", __PRETTY_FUNCTION__);
@ -206,8 +226,7 @@ namespace llarp
if (_router.is_bootstrap_seed())
{
// FIXME: remove "|| true", this is just for local testing!
if (node_db->whitelist().count(other) || true)
if (node_db->whitelist().count(other))
{
log::critical(logcat, "Saving bootstrap seed requester...");
auto [it, b] = node_db->seeds().emplace(other);
@ -232,6 +251,7 @@ namespace llarp
{
ep->listen(
tls_creds,
ROUTER_KEEP_ALIVE,
[&](oxen::quic::Connection& c,
oxen::quic::Endpoint& e,
std::optional<int64_t> id) -> std::shared_ptr<oxen::quic::Stream> {
@ -304,6 +324,8 @@ namespace llarp
ep.connid_map.emplace(scid, rid);
auto [it, b] = ep.active_conns.emplace(rid, nullptr);
it->second = std::move(itr->second);
ep.pending_conns.erase(itr);
log::critical(logcat, "Connection to RID:{} moved from pending to active conns!", rid);
}
else
@ -340,43 +362,46 @@ namespace llarp
que.pop_front();
}
return;
}
log::warning(logcat, "No pending queue to clear for RID:{}", rid);
log::warning(logcat, "Pending queue empty for RID:{}", rid);
});
};
void
LinkManager::on_conn_closed(oxen::quic::connection_interface& ci, uint64_t ec)
{
_router.loop()->call([this, &conn_interface = ci, error_code = ec]() {
const auto& scid = conn_interface.scid();
log::critical(quic_cat, "Purging quic connection CID:{} (ec: {})", scid, error_code);
if (const auto& c_itr = ep.connid_map.find(scid); c_itr != ep.connid_map.end())
{
const auto& rid = c_itr->second;
_router.loop()->call(
[this, scid = ci.scid(), _rid = RouterID{ci.remote_key()}, error_code = ec]() {
log::critical(quic_cat, "Purging quic connection CID:{} (ec: {})", scid, error_code);
// if (auto maybe = rids_pending_verification.find(rid);
// maybe != rids_pending_verification.end())
// rids_pending_verification.erase(maybe);
// a pending connection would not be in the connid_map
if (auto v_itr = ep.pending_conns.find(_rid); v_itr != ep.pending_conns.end())
{
ep.pending_conns.erase(v_itr);
// in case this didn't clear earlier, do it now
if (auto p_itr = pending_conn_msg_queue.find(rid); p_itr != pending_conn_msg_queue.end())
pending_conn_msg_queue.erase(p_itr);
// in case this didn't clear earlier, do it now
if (auto p_itr = pending_conn_msg_queue.find(_rid);
p_itr != pending_conn_msg_queue.end())
pending_conn_msg_queue.erase(p_itr);
if (auto c_itr = ep.pending_conns.find(rid); c_itr != ep.pending_conns.end())
ep.pending_conns.erase(c_itr);
log::critical(quic_cat, "Pending quic connection CID:{} purged successfully", scid);
}
else if (const auto& c_itr = ep.connid_map.find(scid); c_itr != ep.connid_map.end())
{
const auto& rid = c_itr->second;
assert(_rid == rid); // this should hold true
if (auto m_itr = ep.active_conns.find(rid); m_itr != ep.active_conns.end())
ep.active_conns.erase(m_itr);
if (auto m_itr = ep.active_conns.find(rid); m_itr != ep.active_conns.end())
ep.active_conns.erase(m_itr);
ep.connid_map.erase(c_itr);
ep.connid_map.erase(c_itr);
log::critical(quic_cat, "Quic connection CID:{} purged successfully", scid);
}
});
log::critical(quic_cat, "Quic connection CID:{} purged successfully", scid);
}
else
log::critical(quic_cat, "Nothing to purge for quic connection CID:{}", scid);
});
}
bool
@ -585,6 +610,12 @@ namespace llarp
return ep.get_random_connection(router);
}
/// Reports whether the local router is operating as a service node.
///
/// Thin accessor that forwards to `_router.is_service_node()`; in the visible
/// code it is used by link::Endpoint when opening an outbound connection to
/// choose between ROUTER_KEEP_ALIVE and CLIENT_KEEP_ALIVE.
///
/// @return the value reported by `_router.is_service_node()`
bool
LinkManager::is_service_node() const
{
return _router.is_service_node();
}
// TODO: this? perhaps no longer necessary in the same way?
void
LinkManager::check_persisting_conns(llarp_time_t)

@ -33,6 +33,11 @@ namespace llarp
using stream_open_hook = oxen::quic::stream_open_callback;
using stream_closed_hook = oxen::quic::stream_close_callback;
using keep_alive = oxen::quic::opt::keep_alive;
inline const keep_alive ROUTER_KEEP_ALIVE{10s};
inline const keep_alive CLIENT_KEEP_ALIVE{0s};
namespace link
{
struct Connection;
@ -292,6 +297,9 @@ namespace llarp
bool
get_random_connected(RemoteRC& router) const;
bool
is_service_node() const;
void
check_persisting_conns(llarp_time_t now);
@ -401,8 +409,13 @@ namespace llarp
const auto& rid = rc.router_id();
log::critical(logcat, "Establishing connection to RID:{}", rid);
auto conn_interface =
endpoint->connect(remote, link_manager.tls_creds, std::forward<Opt>(opts)...);
bool is_snode = link_manager.is_service_node();
auto conn_interface = endpoint->connect(
remote,
link_manager.tls_creds,
is_snode ? ROUTER_KEEP_ALIVE : CLIENT_KEEP_ALIVE,
std::forward<Opt>(opts)...);
// add to pending conns
auto [itr, b] = pending_conns.emplace(rid, nullptr);

@ -232,8 +232,8 @@ namespace llarp
return;
}
// NOTE: this potentially involves multiple memory allocations,
// reimplement without split() if it is performance bottleneck
// TOFIX: This potentially involves multiple memory allocations,
// reimplement without split() if it is performance bottleneck
auto splits = split(str, ":");
// TODO: having ":port" at the end makes this ambiguous with IPv6

@ -337,15 +337,20 @@ namespace llarp
}
void
NodeDB::fetch_initial()
NodeDB::fetch_initial(bool is_snode)
{
auto sz = num_rcs();
if (num_rcs() < MIN_ACTIVE_RCS)
if (sz < MIN_ACTIVE_RCS)
{
log::critical(logcat, "{}/{} RCs held locally... BOOTSTRAP TIME", sz, MIN_ACTIVE_RCS);
fallback_to_bootstrap();
}
else if (is_snode)
{
// service nodes who have sufficient local RC's can bypass initial fetching
_needs_initial_fetch = false;
}
else
{
// Set fetch source as random selection of known active client routers
@ -675,7 +680,7 @@ namespace llarp
_router.link_manager().fetch_bootstrap_rcs(
rc,
BootstrapFetchMessage::serialize(_router.router_contact, BOOTSTRAP_SOURCE_COUNT),
[this](oxen::quic::message m) mutable {
[this, is_snode = _router.is_service_node()](oxen::quic::message m) mutable {
log::critical(logcat, "Received response to BootstrapRC fetch request...");
if (not m)
@ -736,8 +741,19 @@ namespace llarp
fetch_source,
num,
BOOTSTRAP_SOURCE_COUNT);
// known_rids.merge(rids);
fetch_initial();
if (not is_snode)
{
log::critical(
logcat,
"Client completed processing BootstrapRC fetch; proceeding to initial fetch");
fetch_initial();
}
else
{
log::critical(logcat, "Service node completed processing BootstrapRC fetch!");
post_snode_bootstrap();
}
// FIXME: when moving to testnet, uncomment this
// if (rids.size() == BOOTSTRAP_SOURCE_COUNT)
@ -760,6 +776,14 @@ namespace llarp
});
}
/// Clears the bootstrap-related state flags once a service node has finished
/// processing a BootstrapRC fetch response.
///
/// In the visible code this is called from the bootstrap fetch callback when
/// the local router is a service node, in place of the `fetch_initial()` call
/// made for clients.
void
NodeDB::post_snode_bootstrap()
{
// no further bootstrap attempt is pending
_needs_rebootstrap = false;
// no longer relying on the bootstrap fallback path
_using_bootstrap_fallback = false;
// service nodes skip the initial fetch after bootstrapping
_needs_initial_fetch = false;
}
void
NodeDB::bootstrap_cooldown()
{
@ -805,22 +829,21 @@ namespace llarp
std::optional<RouterID>
NodeDB::get_random_whitelist_router() const
{
// TODO: this should be checking whitelist not known_rcs
if (auto rc = get_random_rc())
return rc->router_id();
std::optional<RouterID> rand = std::nullopt;
return std::nullopt;
std::sample(router_whitelist.begin(), router_whitelist.end(), &*rand, 1, csrng);
return rand;
}
bool
NodeDB::is_connection_allowed(const RouterID& remote) const
{
if (_pinned_edges.size() && _pinned_edges.count(remote) == 0
&& not _bootstraps.contains(remote))
return false;
if (not _router.is_service_node())
return true;
{
if (_pinned_edges.size() && _pinned_edges.count(remote) == 0
&& not _bootstraps.contains(remote))
return false;
}
return known_rids.count(remote) or router_greylist.count(remote);
}

@ -22,6 +22,8 @@ namespace llarp
{
struct Router;
// TESTNET: the following constants have been shortened for testing purposes
/* RC Fetch Constants */
inline constexpr size_t MIN_ACTIVE_RCS{6};
// max number of attempts we make in non-bootstrap fetch requests
@ -242,7 +244,7 @@ namespace llarp
process_fetched_rids();
void
fetch_initial();
fetch_initial(bool is_snode = false);
// RouterContact fetching
void
@ -264,6 +266,8 @@ namespace llarp
void
fallback_to_bootstrap();
void
post_snode_bootstrap();
void
bootstrap_cooldown();
// Populate rid_sources with random sample from known_rids. A set of rids is passed

@ -58,12 +58,8 @@ namespace llarp
loop_wakeup = _loop->make_waker([this]() { PumpLL(); });
}
Router::~Router()
{}
// TODO: investigate changes needed for libquic integration
// still needed at all?
// TODO: No. The answer is No.
// TONUKE: EVERYTHING ABOUT THIS
void
@ -237,14 +233,10 @@ namespace llarp
Router::GetRandomGoodRouter()
{
if (is_service_node())
{
return node_db()->get_random_whitelist_router();
}
if (auto maybe = node_db()->get_random_rc())
{
return maybe->router_id();
}
return std::nullopt;
}
@ -742,23 +734,13 @@ namespace llarp
bool
Router::is_bootstrap_node(const RouterID r) const
{
if (_node_db->has_bootstraps())
{
const auto& b = _node_db->bootstrap_list();
return std::count_if(
b.begin(),
b.end(),
[r](const RemoteRC& rc) -> bool { return rc.router_id() == r; })
> 0;
}
return false;
return _node_db->has_bootstraps() ? _node_db->bootstrap_list().contains(r) : false;
}
bool
Router::should_report_stats(llarp_time_t now) const
{
static constexpr auto ReportStatsInterval = 1h;
return now - _last_stats_report > ReportStatsInterval;
return now - _last_stats_report > REPORT_STATS_INTERVAL;
}
void
@ -766,23 +748,29 @@ namespace llarp
{
const auto now = llarp::time_now_ms();
log::critical(
logcat,
"{} RCs loaded with {} RIDs, {} bootstrap peers, and {} router connections!",
_node_db->num_rcs(),
_node_db->num_rids(),
_node_db->num_bootstraps(),
num_router_connections());
if (is_service_node())
{
log::info(
log::critical(
logcat,
"Local service node has {} client connections since last RC update ({} to expiry)",
"Local Service Node has {} RCs, {} RIDs, {} bootstrap peers, {} router "
"connections, and {} client connections since last RC update ({} to expiry)",
_node_db->num_rcs(),
_node_db->num_rids(),
_node_db->num_bootstraps(),
num_router_connections(),
num_client_connections(),
router_contact.age(now),
router_contact.time_to_expiry(now));
}
else
{
log::critical(
logcat,
"{} RCs loaded with {} RIDs, {} bootstrap peers, and {} router connections!",
_node_db->num_rcs(),
_node_db->num_rids(),
_node_db->num_bootstraps(),
num_router_connections());
}
if (_last_stats_report > 0s)
log::info(logcat, "Last reported stats time {}", now - _last_stats_report);
@ -844,12 +832,18 @@ namespace llarp
{
if (is_stopping)
return;
// LogDebug("tick router");
const bool is_snode = is_service_node();
const bool is_decommed = appears_decommed();
const auto now = llarp::time_now_ms();
if (const auto delta = now - _last_tick; _last_tick != 0s and delta > TimeskipDetectedDuration)
auto now_timepoint = std::chrono::system_clock::time_point(now);
if (const auto delta = now - _last_tick;
_last_tick != 0s and delta > NETWORK_RESET_SKIP_INTERVAL)
{
// we detected a time skip into the future, thaw the network
LogWarn("Timeskip of ", ToString(delta), " detected. Resetting network state");
log::warning(logcat, "Timeskip of {} detected, resetting network state!", delta.count());
Thaw();
}
@ -864,18 +858,13 @@ namespace llarp
report_stats();
}
const bool is_snode = is_service_node();
const bool is_decommed = appears_decommed();
// (relay-only) if we have fetched the relay list from oxend and
// we are registered and funded, we want to gossip our RC periodically
auto now_timepoint = std::chrono::system_clock::time_point(now);
if (is_snode)
{
if (appears_funded() and now_timepoint > next_rc_gossip)
if (now_timepoint > next_rc_gossip)
{
log::info(logcat, "regenerating and gossiping RC");
log::critical(logcat, "Regenerating and gossiping RC...");
router_contact.resign();
save_rc();
@ -887,11 +876,15 @@ namespace llarp
last_rc_gossip = now_timepoint;
// 1min to 5min before "stale time" is next gossip time
// TESTNET: 1 to 2 minutes before testnet gossip interval
auto random_delta =
std::chrono::seconds{std::uniform_int_distribution<size_t>{60, 300}(llarp::csrng)};
// 1min to 5min before "stale time" is next gossip time
// auto random_delta =
// std::chrono::seconds{std::uniform_int_distribution<size_t>{60, 300}(llarp::csrng)};
next_rc_gossip = now_timepoint + RouterContact::STALE_AGE - random_delta;
next_rc_gossip = now_timepoint + TESTNET_GOSSIP_INTERVAL - random_delta;
// next_rc_gossip = now_timepoint + RouterContact::STALE_AGE - random_delta;
}
report_stats();
@ -900,13 +893,13 @@ namespace llarp
if (needs_initial_fetch())
{
if (not _config->bootstrap.seednode)
node_db()->fetch_initial();
node_db()->fetch_initial(is_service_node());
}
else if (needs_rebootstrap() and now_timepoint > next_bootstrap_attempt)
{
node_db()->fallback_to_bootstrap();
}
else
else if (not is_snode)
{
// (client-only) periodically fetch updated RCs
if (now_timepoint - last_rc_fetch > RC_UPDATE_INTERVAL)
@ -916,8 +909,9 @@ namespace llarp
}
// (client-only) periodically fetch updated RouterID list
if (not is_snode and now_timepoint - last_rid_fetch > ROUTERID_UPDATE_INTERVAL)
if (now_timepoint - last_rid_fetch > ROUTERID_UPDATE_INTERVAL)
{
log::critical(logcat, "Time to fetch RIDs!");
node_db()->fetch_rids();
}
}
@ -1005,7 +999,6 @@ namespace llarp
if (is_snode and now >= _next_decomm_warning)
{
constexpr auto DecommissionWarnInterval = 5min;
if (auto registered = appears_registered(), funded = appears_funded();
not(registered and funded and not is_decommed))
{
@ -1016,7 +1009,7 @@ namespace llarp
not registered ? "deregistered"
: is_decommed ? "decommissioned"
: "not fully staked");
_next_decomm_warning = now + DecommissionWarnInterval;
_next_decomm_warning = now + DECOMM_WARNING_INTERVAL;
}
else if (insufficient_peers())
{
@ -1024,7 +1017,7 @@ namespace llarp
logcat,
"We appear to be an active service node, but have only {} known peers.",
node_db()->num_rcs());
_next_decomm_warning = now + DecommissionWarnInterval;
_next_decomm_warning = now + DECOMM_WARNING_INTERVAL;
}
}
@ -1033,7 +1026,7 @@ namespace llarp
if (connected < connectToNum and (appears_funded() or not is_snode))
{
size_t dlt = connectToNum - connected;
LogDebug("connecting to ", dlt, " random routers to keep alive");
log::debug(logcat, "Connecting to {} random routers to keep alive", dlt);
_link_manager->connect_to_random(dlt);
}

@ -40,16 +40,27 @@
namespace llarp
{
/// number of routers to publish to
static constexpr size_t INTROSET_RELAY_REDUNDANCY = 2;
inline constexpr size_t INTROSET_RELAY_REDUNDANCY{2};
/// number of dht locations handled per relay
static constexpr size_t INTROSET_REQS_PER_RELAY = 2;
inline constexpr size_t INTROSET_REQS_PER_RELAY{2};
static constexpr size_t INTROSET_STORAGE_REDUNDANCY =
(INTROSET_RELAY_REDUNDANCY * INTROSET_REQS_PER_RELAY);
inline constexpr size_t INTROSET_STORAGE_REDUNDANCY{
(INTROSET_RELAY_REDUNDANCY * INTROSET_REQS_PER_RELAY)};
static const std::chrono::seconds RC_UPDATE_INTERVAL = 4min;
static const std::chrono::seconds ROUTERID_UPDATE_INTERVAL = 1h;
// TESTNET: these constants are shortened for testing purposes
inline constexpr std::chrono::milliseconds TESTNET_GOSSIP_INTERVAL{4min};
inline constexpr std::chrono::milliseconds RC_UPDATE_INTERVAL{4min};
inline constexpr std::chrono::milliseconds ROUTERID_UPDATE_INTERVAL{1h};
// DISCUSS: ask tom and jason about this
// how big of a time skip before we reset network state
inline constexpr std::chrono::milliseconds NETWORK_RESET_SKIP_INTERVAL{1min};
inline constexpr std::chrono::milliseconds REPORT_STATS_INTERVAL{1h};
inline constexpr std::chrono::milliseconds DECOMM_WARNING_INTERVAL{5min};
struct Contacts;
@ -59,7 +70,7 @@ namespace llarp
explicit Router(EventLoop_ptr loop, std::shared_ptr<vpn::Platform> vpnPlatform);
~Router();
~Router() = default;
private:
std::shared_ptr<RoutePoker> _route_poker;

@ -53,8 +53,6 @@ namespace llarp
static inline constexpr size_t MAX_RC_SIZE = 1024;
/// Timespans for RCs:
/// How long (from its signing time) before an RC is considered "stale". Relays republish
/// their RCs slightly more frequently than this so that ideally this won't happen.
static constexpr auto STALE_AGE = 6h;

Loading…
Cancel
Save