mirror of https://github.com/oxen-io/lokinet
Merge remote-tracking branch 'origin/stable' into debian/sid
commit
9dd951b14b
@ -0,0 +1,7 @@
|
|||||||
|
#pragma once

// Decoration applied to the public C API functions of the library.
//
// NOTE(review): on Windows this expands to the __cdecl *calling convention*,
// not a DLL export attribute such as __declspec(dllexport) -- confirm that
// this is intentional (e.g. exports handled via a .def file instead).
#ifdef _WIN32
#define EXPORT __cdecl
#else
#define EXPORT
#endif
|
@ -0,0 +1,156 @@
|
|||||||
|
|
||||||
|
#include "reachability_testing.hpp"
|
||||||
|
#include <chrono>
|
||||||
|
#include <llarp/router/abstractrouter.hpp>
|
||||||
|
#include <llarp/util/logging/logger.hpp>
|
||||||
|
#include <llarp/crypto/crypto.hpp>
|
||||||
|
|
||||||
|
using std::chrono::steady_clock;

namespace llarp::consensus
{
// Float-valued durations: fseconds converts the RNG-drawn test intervals
// (normal_distribution<float> of seconds) into a chrono duration; fminutes
// is used for human-readable elapsed-time log output.
using fseconds = std::chrono::duration<float, std::chrono::seconds::period>;
using fminutes = std::chrono::duration<float, std::chrono::minutes::period>;
||||||
|
// Shared implementation for warning about missing incoming reachability pings.
//
// Compares the time elapsed since the later of `startup` and the last received
// ping against MAX_TIME_WITHOUT_PING, and logs ("whines") when the failing
// state flips in either direction or, while still failing, at most once per
// WHINING_INTERVAL.
//
// `name`     - label used in log messages (e.g. "lokinet")
// `now`      - current steady-clock time point
// `startup`  - when this node started; gives a fresh node a full grace period
//              before we complain about never having been pinged
// `incoming` - per-protocol ping-tracking state; `last_test`, `last_whine`,
//              and `was_failing` are updated in place
static void
check_incoming_tests_impl(
    std::string_view name,
    const time_point_t& now,
    const time_point_t& startup,
    detail::incoming_test_state& incoming)
{
  // Measure from startup if we have never been pinged (last_test is the
  // default-constructed epoch), so we do not warn immediately after boot.
  const auto elapsed = now - std::max(startup, incoming.last_test);
  bool failing = elapsed > reachability_testing::MAX_TIME_WITHOUT_PING;
  // Whine on any state transition, or periodically while still failing.
  bool whine = failing != incoming.was_failing
      || (failing && now - incoming.last_whine > reachability_testing::WHINING_INTERVAL);

  incoming.was_failing = failing;

  if (whine)
  {
    incoming.last_whine = now;
    if (!failing)
    {
      // Transitioned failing -> ok.
      LogInfo(name, " ping received; port is likely reachable again");
    }
    else
    {
      // A default-constructed last_test means we have never seen a ping.
      if (incoming.last_test.time_since_epoch() == 0s)
      {
        LogWarn("Have NEVER received ", name, " pings!");
      }
      else
      {
        LogWarn(
            "Have not received ",
            name,
            " pings for a long time: ",
            fminutes{elapsed}.count(),
            " minutes");
      }
      LogWarn(
          "Please check your ",
          name,
          " port. Not being reachable "
          "over ",
          name,
          " may result in a deregistration!");
    }
  }
}
|
||||||
|
|
||||||
|
// Checks whether this node has received reachability pings recently and logs
// a warning (or a recovery notice) via check_incoming_tests_impl, using the
// node's startup time and its incoming-ping state (`last`).
void
reachability_testing::check_incoming_tests(const time_point_t& now)
{
  check_incoming_tests_impl("lokinet", now, startup, last);
}
|
||||||
|
|
||||||
|
// Records `now` as the most recent time another node successfully pinged us;
// check_incoming_tests uses this to decide whether to warn about being
// unreachable.
void
reachability_testing::incoming_ping(const time_point_t& now)
{
  last.last_test = now;
}
|
||||||
|
|
||||||
|
std::optional<RouterID>
|
||||||
|
reachability_testing::next_random(AbstractRouter* router, const time_point_t& now, bool requeue)
|
||||||
|
{
|
||||||
|
if (next_general_test > now)
|
||||||
|
return std::nullopt;
|
||||||
|
CSRNG rng;
|
||||||
|
next_general_test =
|
||||||
|
now + std::chrono::duration_cast<time_point_t::duration>(fseconds(TESTING_INTERVAL(rng)));
|
||||||
|
|
||||||
|
// Pull the next element off the queue, but skip ourself, any that are no longer registered, and
|
||||||
|
// any that are currently known to be failing (those are queued for testing separately).
|
||||||
|
RouterID my_pk{router->pubkey()};
|
||||||
|
while (!testing_queue.empty())
|
||||||
|
{
|
||||||
|
auto& pk = testing_queue.back();
|
||||||
|
std::optional<RouterID> sn;
|
||||||
|
if (pk != my_pk && !failing.count(pk))
|
||||||
|
sn = pk;
|
||||||
|
testing_queue.pop_back();
|
||||||
|
if (sn)
|
||||||
|
return sn;
|
||||||
|
}
|
||||||
|
if (!requeue)
|
||||||
|
return std::nullopt;
|
||||||
|
|
||||||
|
// FIXME: when a *new* node comes online we need to inject it into a random position in the SN
|
||||||
|
// list with probability (L/N) [L = current list size, N = potential list size]
|
||||||
|
//
|
||||||
|
// (FIXME: put this FIXME in a better place ;-) )
|
||||||
|
|
||||||
|
// We exhausted the queue so repopulate it and try again
|
||||||
|
|
||||||
|
testing_queue.clear();
|
||||||
|
const auto all = router->GetRouterWhitelist();
|
||||||
|
testing_queue.insert(testing_queue.begin(), all.begin(), all.end());
|
||||||
|
|
||||||
|
std::shuffle(testing_queue.begin(), testing_queue.end(), rng);
|
||||||
|
|
||||||
|
// Recurse with the rebuild list, but don't let it try rebuilding again
|
||||||
|
return next_random(router, now, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<std::pair<RouterID, int>>
|
||||||
|
reachability_testing::get_failing(const time_point_t& now)
|
||||||
|
{
|
||||||
|
// Our failing_queue puts the oldest retest times at the top, so pop them off into our result
|
||||||
|
// until the top node should be retested sometime in the future
|
||||||
|
std::vector<std::pair<RouterID, int>> result;
|
||||||
|
while (result.size() < MAX_RETESTS_PER_TICK && !failing_queue.empty())
|
||||||
|
{
|
||||||
|
auto& [pk, retest_time, failures] = failing_queue.top();
|
||||||
|
if (retest_time > now)
|
||||||
|
break;
|
||||||
|
result.emplace_back(pk, failures);
|
||||||
|
failing_queue.pop();
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
reachability_testing::add_failing_node(const RouterID& pk, int previous_failures)
|
||||||
|
{
|
||||||
|
using namespace std::chrono;
|
||||||
|
|
||||||
|
if (previous_failures < 0)
|
||||||
|
previous_failures = 0;
|
||||||
|
CSRNG rng;
|
||||||
|
auto next_test_in = duration_cast<time_point_t::duration>(
|
||||||
|
previous_failures * TESTING_BACKOFF + fseconds{TESTING_INTERVAL(rng)});
|
||||||
|
if (next_test_in > TESTING_BACKOFF_MAX)
|
||||||
|
next_test_in = TESTING_BACKOFF_MAX;
|
||||||
|
|
||||||
|
failing.insert(pk);
|
||||||
|
failing_queue.emplace(pk, steady_clock::now() + next_test_in, previous_failures + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Removes `pk` from the failing set (e.g. after a successful retest), so it
// becomes eligible for general testing again.  Any entry still sitting in
// failing_queue for this key will simply be retested as a normal candidate.
void
reachability_testing::remove_node_from_failing(const RouterID& pk)
{
  failing.erase(pk);
}
|
||||||
|
|
||||||
|
} // namespace llarp::consensus
|
@ -0,0 +1,148 @@
|
|||||||
|
#pragma once

#include <chrono>
#include <functional>     // std::greater (nth_greater comparator)
#include <optional>       // std::optional return of next_random
#include <queue>
#include <random>
#include <tuple>          // FailingPK, std::get / std::tuple_element_t
#include <unordered_map>
#include <unordered_set>
#include <utility>        // std::pair return of get_failing
#include <vector>

#include <llarp/util/time.hpp>
#include <llarp/router_id.hpp>
|
namespace llarp
{
  // Forward declaration; only used here as an opaque pointer parameter.
  struct AbstractRouter;
}
||||||
|
|
||||||
|
namespace llarp::consensus
|
||||||
|
{
|
||||||
|
namespace detail
|
||||||
|
{
|
||||||
|
using clock_t = std::chrono::steady_clock;
|
||||||
|
using time_point_t = std::chrono::time_point<clock_t>;
|
||||||
|
|
||||||
|
// Returns std::greater on the std::get<N>(v)th element value.
|
||||||
|
template <typename T, size_t N>
|
||||||
|
struct nth_greater
|
||||||
|
{
|
||||||
|
constexpr bool
|
||||||
|
operator()(const T& lhs, const T& rhs) const
|
||||||
|
{
|
||||||
|
return std::greater<std::tuple_element_t<N, T>>{}(std::get<N>(lhs), std::get<N>(rhs));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct incoming_test_state
|
||||||
|
{
|
||||||
|
time_point_t last_test{};
|
||||||
|
time_point_t last_whine{};
|
||||||
|
bool was_failing = false;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace detail
|
||||||
|
using time_point_t = detail::time_point_t;
|
||||||
|
using clock_t = detail::clock_t;
|
||||||
|
|
||||||
|
// How often we tick the timer to check whether we need to do any tests.
|
||||||
|
constexpr auto REACHABILITY_TESTING_TIMER_INTERVAL = 50ms;
|
||||||
|
|
||||||
|
class reachability_testing
{
 public:
  // Distribution for the seconds between node tests: we throw in some randomness to avoid
  // potential clustering of tests. (Note that there is some granularity here as the test timer
  // only runs every REACHABILITY_TESTING_TIMER_INTERVAL).
  std::normal_distribution<float> TESTING_INTERVAL{10.0, 3.0};

  // The linear backoff after each consecutive test failure before we re-test. Specifically we
  // schedule the next re-test for (TESTING_BACKOFF*previous_failures) + TESTING_INTERVAL(rng).
  inline static constexpr auto TESTING_BACKOFF = 10s;

  // The upper bound for the re-test interval.
  inline static constexpr auto TESTING_BACKOFF_MAX = 2min;

  // The maximum number of nodes that we will re-test at once (i.e. per
  // REACHABILITY_TESTING_TIMER_INTERVAL tick); mainly intended to throttle ourselves if, for
  // instance, our own connectivity loss makes us accumulate tons of nodes to test all at once.
  // (Despite the random intervals, this can happen if we also get decommissioned during which we
  // can't test at all but still have lots of failing nodes we want to test right away when we
  // get recommissioned).
  inline static constexpr int MAX_RETESTS_PER_TICK = 4;

  // Maximum time without a ping before we start whining about it.
  //
  // We have a probability of about 0.368* of *not* getting pinged within a ping interval (10s),
  // and so the probability of not getting a ping for 2 minutes (i.e. 12 test spans) just because
  // we haven't been selected is extremely small (0.0000061). It also coincides nicely with
  // blockchain time (i.e. two minutes) and our max testing backoff.
  //
  // * = approx value of ((n-1)/n)^n for non-tiny values of n
  inline static constexpr auto MAX_TIME_WITHOUT_PING = 2min;

  // How often we whine in the logs about being unreachable
  inline static constexpr auto WHINING_INTERVAL = 2min;

 private:
  // Queue of pubkeys of service nodes to test; we pop off the back of this until the queue
  // empties then we refill it with a shuffled list of all pubkeys then pull off of it until it
  // is empty again, etc.
  std::vector<RouterID> testing_queue;

  // The next time for a general test; time_point_t::min() means a test is due immediately.
  time_point_t next_general_test = time_point_t::min();

  // When we started, so that we know to hold off on whining about no pings for a while.
  const time_point_t startup = clock_t::now();

  // Pubkeys, next test times, and sequential failure counts of service nodes that are currently
  // in "failed" status; we retest them first after TESTING_BACKOFF then back off linearly by an
  // additional TESTING_BACKOFF per failure, up to a max testing interval of TESTING_BACKOFF_MAX,
  // until we get a successful response.
  using FailingPK = std::tuple<RouterID, time_point_t, int>;
  // Min-heap on the next-test time (tuple element 1): the soonest-due node sits on top.
  std::priority_queue<FailingPK, std::vector<FailingPK>, detail::nth_greater<FailingPK, 1>>
      failing_queue;
  // Membership set mirroring failing_queue, for O(1) "is this node failing?" lookups.
  std::unordered_set<RouterID> failing;

  // Track the last time *this node* was tested by other network nodes; used to detect and warn
  // about possible network issues.
  detail::incoming_test_state last;

 public:
  // If it is time to perform another random test, this returns the next node to test from the
  // testing queue and returns it, also updating the timer for the next test. If it is not yet
  // time, or if the queue is empty and cannot currently be replenished, returns std::nullopt.
  // If the queue empties then this builds a new one by shuffling current public keys in the
  // swarm's "all nodes" then starts using the new queue for this and subsequent calls.
  //
  // `requeue` is mainly for internal use: if false it avoids rebuilding the queue if we run
  // out (and instead just return nullopt).
  std::optional<RouterID>
  next_random(
      AbstractRouter* router, const time_point_t& now = clock_t::now(), bool requeue = true);

  // Removes and returns up to MAX_RETESTS_PER_TICK nodes that are due to be tested (i.e.
  // next-testing-time <= now). Returns [snrecord, #previous-failures] for each.
  std::vector<std::pair<RouterID, int>>
  get_failing(const time_point_t& now = clock_t::now());

  // Adds a bad node pubkey to the failing list, to be re-tested soon (with a backoff depending
  // on `failures`; see TESTING_BACKOFF). `previous_failures` should be the number of previous
  // failures *before* this one, i.e. 0 for a random general test; or the failure count returned
  // by `get_failing` for repeated failures.
  void
  add_failing_node(const RouterID& pk, int previous_failures = 0);

  /// removes the public key from the failing set
  void
  remove_node_from_failing(const RouterID& pk);

  // Called when this router receives an incoming session (counts as an incoming ping).
  void
  incoming_ping(const time_point_t& now = clock_t::now());

  // Check whether we received incoming pings recently
  void
  check_incoming_tests(const time_point_t& now = clock_t::now());
};
|
||||||
|
|
||||||
|
} // namespace llarp::consensus
|
Loading…
Reference in New Issue