build paths faster and limit path builds at edge router instead of via a time counter for all edges

pull/1576/head
Jeff Becker 3 years ago
parent ff2e79ce38
commit 59c9e997f2
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -172,6 +172,12 @@ namespace llarp
currentStatus = record.status; currentStatus = record.status;
if ((record.status & LR_StatusRecord::SUCCESS) != LR_StatusRecord::SUCCESS) if ((record.status & LR_StatusRecord::SUCCESS) != LR_StatusRecord::SUCCESS)
{ {
if (record.status & LR_StatusRecord::FAIL_CONGESTION and index == 0)
{
// first hop building too fast
failedAt = hops[index].rc.pubkey;
break;
}
// failed at next hop // failed at next hop
if (index + 1 < hops.size()) if (index + 1 < hops.size())
{ {
@ -245,8 +251,13 @@ namespace llarp
{ {
llarp::LogDebug("Path build failed for an unspecified reason"); llarp::LogDebug("Path build failed for an unspecified reason");
} }
r->loop()->call( RouterID edge{};
[r, self = shared_from_this()]() { self->EnterState(ePathFailed, r->Now()); }); if (failedAt)
edge = *failedAt;
r->loop()->call([r, self = shared_from_this(), edge]() {
self->EnterState(ePathFailed, r->Now());
self->m_PathSet->HandlePathBuildFailedAt(self, edge);
});
} }
// TODO: meaningful return value? // TODO: meaningful return value?
@ -259,7 +270,6 @@ namespace llarp
if (st == ePathFailed) if (st == ePathFailed)
{ {
_status = st; _status = st;
m_PathSet->HandlePathBuildFailed(shared_from_this());
return; return;
} }
if (st == ePathExpired && _status == ePathBuilding) if (st == ePathExpired && _status == ePathBuilding)

@ -161,7 +161,11 @@ namespace llarp
namespace path namespace path
{ {
Builder::Builder(AbstractRouter* p_router, size_t pathNum, size_t hops) Builder::Builder(AbstractRouter* p_router, size_t pathNum, size_t hops)
: path::PathSet(pathNum), _run(true), m_router(p_router), numHops(hops) : path::PathSet{pathNum}
, m_EdgeLimiter{MIN_PATH_BUILD_INTERVAL}
, _run{true}
, m_router{p_router}
, numHops{hops}
{ {
CryptoManager::instance()->encryption_keygen(enckey); CryptoManager::instance()->encryption_keygen(enckey);
} }
@ -169,13 +173,14 @@ namespace llarp
void void
Builder::ResetInternalState() Builder::ResetInternalState()
{ {
buildIntervalLimit = MIN_PATH_BUILD_INTERVAL; buildIntervalLimit = PATH_BUILD_RATE;
lastBuild = 0s; lastBuild = 0s;
} }
void Builder::Tick(llarp_time_t) void Builder::Tick(llarp_time_t)
{ {
const auto now = llarp::time_now_ms(); const auto now = llarp::time_now_ms();
m_EdgeLimiter.Decay(now);
ExpirePaths(now, m_router); ExpirePaths(now, m_router);
if (ShouldBuildMore(now)) if (ShouldBuildMore(now))
BuildOne(); BuildOne();
@ -221,6 +226,9 @@ namespace llarp
if (exclude.count(rc.pubkey)) if (exclude.count(rc.pubkey))
return; return;
if (m_EdgeLimiter.Contains(rc.pubkey))
return;
found = rc; found = rc;
} }
}, },
@ -266,6 +274,12 @@ namespace llarp
return enckey; return enckey;
} }
bool
Builder::BuildCooldownHit(RouterID edge) const
{
return m_EdgeLimiter.Contains(edge);
}
bool bool
Builder::BuildCooldownHit(llarp_time_t now) const Builder::BuildCooldownHit(llarp_time_t now) const
{ {
@ -379,6 +393,14 @@ namespace llarp
{ {
if (IsStopped()) if (IsStopped())
return; return;
const RouterID edge{hops[0].pubkey};
if (not m_EdgeLimiter.Insert(edge))
{
LogWarn(Name(), " building too fast to edge router ", edge);
return;
}
lastBuild = Now(); lastBuild = Now();
// async generate keys // async generate keys
auto ctx = std::make_shared<AsyncPathKeyExchangeContext>(); auto ctx = std::make_shared<AsyncPathKeyExchangeContext>();
@ -401,7 +423,7 @@ namespace llarp
void void
Builder::HandlePathBuilt(Path_ptr p) Builder::HandlePathBuilt(Path_ptr p)
{ {
buildIntervalLimit = MIN_PATH_BUILD_INTERVAL; buildIntervalLimit = PATH_BUILD_RATE;
m_router->routerProfiling().MarkPathSuccess(p.get()); m_router->routerProfiling().MarkPathSuccess(p.get());
LogInfo(p->Name(), " built latency=", p->intro.latency); LogInfo(p->Name(), " built latency=", p->intro.latency);
@ -409,11 +431,12 @@ namespace llarp
} }
void void
Builder::HandlePathBuildFailed(Path_ptr p) Builder::HandlePathBuildFailedAt(Path_ptr p, RouterID edge)
{ {
m_router->routerProfiling().MarkPathFail(p.get()); PathSet::HandlePathBuildFailedAt(p, edge);
PathSet::HandlePathBuildFailed(p);
DoPathBuildBackoff(); DoPathBuildBackoff();
/// add it to the edge limter even if it's not an edge for simplicity
m_EdgeLimiter.Insert(edge);
} }
void void
@ -421,7 +444,7 @@ namespace llarp
{ {
static constexpr std::chrono::milliseconds MaxBuildInterval = 30s; static constexpr std::chrono::milliseconds MaxBuildInterval = 30s;
// linear backoff // linear backoff
buildIntervalLimit = std::min(MIN_PATH_BUILD_INTERVAL + buildIntervalLimit, MaxBuildInterval); buildIntervalLimit = std::min(PATH_BUILD_RATE + buildIntervalLimit, MaxBuildInterval);
LogWarn(Name(), " build interval is now ", buildIntervalLimit); LogWarn(Name(), " build interval is now ", buildIntervalLimit);
} }

@ -2,6 +2,7 @@
#include "pathset.hpp" #include "pathset.hpp"
#include <llarp/util/status.hpp> #include <llarp/util/status.hpp>
#include <llarp/util/decaying_hashset.hpp>
#include <atomic> #include <atomic>
#include <set> #include <set>
@ -10,13 +11,15 @@ namespace llarp
{ {
namespace path namespace path
{ {
// milliseconds waiting between builds on a path // milliseconds waiting between builds on a path per router
static constexpr auto MIN_PATH_BUILD_INTERVAL = 500ms; static constexpr auto MIN_PATH_BUILD_INTERVAL = 500ms;
static constexpr auto PATH_BUILD_RATE = 100ms;
struct Builder : public PathSet struct Builder : public PathSet
{ {
private: private:
llarp_time_t m_LastWarn = 0s; llarp_time_t m_LastWarn = 0s;
util::DecayingHashSet<RouterID> m_EdgeLimiter;
protected: protected:
/// flag for PathSet::Stop() /// flag for PathSet::Stop()
@ -25,6 +28,10 @@ namespace llarp
virtual bool virtual bool
UrgentBuild(llarp_time_t now) const; UrgentBuild(llarp_time_t now) const;
/// return true if we hit our soft limit for building paths too fast on a first hop
bool
BuildCooldownHit(RouterID edge) const;
private: private:
void void
DoPathBuildBackoff(); DoPathBuildBackoff();
@ -118,7 +125,7 @@ namespace llarp
HandlePathBuildTimeout(Path_ptr p) override; HandlePathBuildTimeout(Path_ptr p) override;
virtual void virtual void
HandlePathBuildFailed(Path_ptr p) override; HandlePathBuildFailedAt(Path_ptr p, RouterID hop) override;
}; };
using Builder_ptr = std::shared_ptr<Builder>; using Builder_ptr = std::shared_ptr<Builder>;

@ -327,9 +327,9 @@ namespace llarp
} }
void void
PathSet::HandlePathBuildFailed(Path_ptr p) PathSet::HandlePathBuildFailedAt(Path_ptr p, RouterID hop)
{ {
LogWarn(Name(), " path build ", p->ShortName(), " failed"); LogWarn(Name(), " path build ", p->ShortName(), " failed at ", hop);
m_BuildStats.fails++; m_BuildStats.fails++;
} }

@ -138,7 +138,7 @@ namespace llarp
HandlePathBuildTimeout(Path_ptr path); HandlePathBuildTimeout(Path_ptr path);
virtual void virtual void
HandlePathBuildFailed(Path_ptr path); HandlePathBuildFailedAt(Path_ptr path, RouterID hop);
virtual void virtual void
PathBuildStarted(Path_ptr path); PathBuildStarted(Path_ptr path);

@ -761,9 +761,10 @@ namespace llarp::quic
} }
auto expiry = std::chrono::nanoseconds{static_cast<std::chrono::nanoseconds::rep>(exp)}; auto expiry = std::chrono::nanoseconds{static_cast<std::chrono::nanoseconds::rep>(exp)};
auto expires_in = std::max(0ms, auto expires_in = std::max(
0ms,
std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::duration_cast<std::chrono::milliseconds>(
expiry - get_time().time_since_epoch())); expiry - get_time().time_since_epoch()));
LogDebug("Next retransmit in ", expires_in.count(), "ms"); LogDebug("Next retransmit in ", expires_in.count(), "ms");
retransmit_timer->stop(); retransmit_timer->stop();
retransmit_timer->start(expires_in, 0ms); retransmit_timer->start(expires_in, 0ms);

@ -13,7 +13,7 @@ namespace llarp::quic
Server(EndpointBase& service_endpoint) : Endpoint{service_endpoint} Server(EndpointBase& service_endpoint) : Endpoint{service_endpoint}
{ {
default_stream_buffer_size = 0; // We don't currently use the endpoint ring buffer default_stream_buffer_size = 0; // We don't currently use the endpoint ring buffer
} }
// Stream callback: takes the server, the (just-created) stream, and the connection port. // Stream callback: takes the server, the (just-created) stream, and the connection port.

@ -331,17 +331,17 @@ namespace llarp::quic
} }
void void
Stream::hard_close() Stream::hard_close()
{
if (avail_trigger)
{ {
if (avail_trigger) avail_trigger->close();
{ avail_trigger.reset();
avail_trigger->close();
avail_trigger.reset();
}
if (!is_closing && close_callback)
close_callback(*this, STREAM_ERROR_CONNECTION_EXPIRED);
is_closing = is_shutdown = true;
} }
if (!is_closing && close_callback)
close_callback(*this, STREAM_ERROR_CONNECTION_EXPIRED);
is_closing = is_shutdown = true;
}
void void
Stream::data(std::shared_ptr<void> data) Stream::data(std::shared_ptr<void> data)

@ -319,7 +319,8 @@ namespace llarp::quic
// Called by the owning Connection to do a "hard" close of a stream during Connection // Called by the owning Connection to do a "hard" close of a stream during Connection
// destruction: unlike a regular close this doesn't try to transmit a close over the wire (which // destruction: unlike a regular close this doesn't try to transmit a close over the wire (which
// won't work since the Connection is dead), it just fires the close callback and cleans up. // won't work since the Connection is dead), it just fires the close callback and cleans up.
void hard_close(); void
hard_close();
// ngtcp2 stream_id, assigned during stream creation // ngtcp2 stream_id, assigned during stream creation
StreamID stream_id{-1}; StreamID stream_id{-1};

@ -56,7 +56,9 @@ namespace llarp::quic
on_incoming_data(Stream& stream, bstring_view bdata) on_incoming_data(Stream& stream, bstring_view bdata)
{ {
auto tcp = stream.data<uvw::TCPHandle>(); auto tcp = stream.data<uvw::TCPHandle>();
if (!tcp) return; // TCP connection is gone, which would have already sent a stream close, so just drop it. if (!tcp)
return; // TCP connection is gone, which would have already sent a stream close, so just
// drop it.
std::string_view data{reinterpret_cast<const char*>(bdata.data()), bdata.size()}; std::string_view data{reinterpret_cast<const char*>(bdata.data()), bdata.size()};
auto peer = tcp->peer(); auto peer = tcp->peer();
@ -78,11 +80,12 @@ namespace llarp::quic
} }
} }
void close_tcp_pair(quic::Stream& st, std::optional<uint64_t> /*errcode*/) { void
close_tcp_pair(quic::Stream& st, std::optional<uint64_t> /*errcode*/)
{
if (auto tcp = st.data<uvw::TCPHandle>()) if (auto tcp = st.data<uvw::TCPHandle>())
tcp->close(); tcp->close();
}; };
// Creates a new tcp handle that forwards incoming data/errors/closes into appropriate actions // Creates a new tcp handle that forwards incoming data/errors/closes into appropriate actions
// on the given quic stream. // on the given quic stream.
void void
@ -447,7 +450,7 @@ namespace llarp::quic
auto err_handler = auto err_handler =
tcp_tunnel->once<uvw::ErrorEvent>([&failed](auto& evt, auto&) { failed = evt.what(); }); tcp_tunnel->once<uvw::ErrorEvent>([&failed](auto& evt, auto&) { failed = evt.what(); });
tcp_tunnel->bind(*bind_addr.operator const sockaddr*()); tcp_tunnel->bind(*bind_addr.operator const sockaddr*());
tcp_tunnel->on<uvw::ListenEvent>([this] (const uvw::ListenEvent&, uvw::TCPHandle& tcp_tunnel) { tcp_tunnel->on<uvw::ListenEvent>([this](const uvw::ListenEvent&, uvw::TCPHandle& tcp_tunnel) {
auto client = tcp_tunnel.loop().resource<uvw::TCPHandle>(); auto client = tcp_tunnel.loop().resource<uvw::TCPHandle>();
tcp_tunnel.accept(*client); tcp_tunnel.accept(*client);
// Freeze the connection (after accepting) because we may need to stall it until a stream // Freeze the connection (after accepting) because we may need to stall it until a stream
@ -590,7 +593,8 @@ namespace llarp::quic
void void
TunnelManager::flush_pending_incoming(ClientTunnel& ct) TunnelManager::flush_pending_incoming(ClientTunnel& ct)
{ {
if (!ct.client) return; // Happens if we're still waiting for a path to build if (!ct.client)
return; // Happens if we're still waiting for a path to build
assert(ct.client->get_connection()); assert(ct.client->get_connection());
auto& conn = *ct.client->get_connection(); auto& conn = *ct.client->get_connection();
int available = conn.get_streams_available(); int available = conn.get_streams_available();

@ -42,7 +42,7 @@
#include <oxenmq/oxenmq.h> #include <oxenmq/oxenmq.h>
static constexpr std::chrono::milliseconds ROUTER_TICK_INTERVAL = 1s; static constexpr std::chrono::milliseconds ROUTER_TICK_INTERVAL = 100ms;
namespace llarp namespace llarp
{ {

@ -682,11 +682,7 @@ namespace llarp
while (i != range.second) while (i != range.second)
{ {
itr->second->SetReadyHook( itr->second->SetReadyHook(
[callback = i->second, addr](auto session) { [callback = i->second, addr](auto session) { callback(addr, session); }, left);
LogInfo(addr, " is ready to send");
callback(addr, session);
},
left);
++i; ++i;
} }
serviceLookups.erase(addr); serviceLookups.erase(addr);
@ -703,11 +699,7 @@ namespace llarp
if (itr != range.second) if (itr != range.second)
{ {
session->SetReadyHook( session->SetReadyHook(
[callback = itr->second, addr](auto session) { [callback = itr->second, addr](auto session) { callback(addr, session); }, left);
LogInfo(addr, " is ready to send");
callback(addr, session);
},
left);
++itr; ++itr;
} }
serviceLookups.erase(addr); serviceLookups.erase(addr);
@ -1428,6 +1420,8 @@ namespace llarp
std::optional<ConvoTag> ret = std::nullopt; std::optional<ConvoTag> ret = std::nullopt;
for (const auto& [tag, session] : Sessions()) for (const auto& [tag, session] : Sessions())
{ {
if (tag.IsZero())
continue;
if (session.remote.Addr() == *ptr and session.lastUsed >= time) if (session.remote.Addr() == *ptr and session.lastUsed >= time)
{ {
time = session.lastUsed; time = session.lastUsed;

@ -79,11 +79,9 @@ namespace llarp
{ {
if (remoteIntro != m_NextIntro) if (remoteIntro != m_NextIntro)
{ {
LogInfo(Name(), " swap intro to use ", RouterID{m_NextIntro.router});
remoteIntro = m_NextIntro; remoteIntro = m_NextIntro;
m_DataHandler->PutSenderFor(currentConvoTag, currentIntroSet.A, false); m_DataHandler->PutSenderFor(currentConvoTag, currentIntroSet.A, false);
m_DataHandler->PutIntroFor(currentConvoTag, remoteIntro); m_DataHandler->PutIntroFor(currentConvoTag, remoteIntro);
ShiftIntroduction(false);
} }
} }
@ -128,7 +126,9 @@ namespace llarp
{ {
if (markedBad) if (markedBad)
return false; return false;
return (!remoteIntro.router.IsZero()) && GetPathByRouter(remoteIntro.router) != nullptr; if (remoteIntro.router.IsZero())
return false;
return GetPathByRouter(remoteIntro.router) != nullptr;
} }
void void
@ -157,10 +157,14 @@ namespace llarp
} }
void void
OutboundContext::HandlePathBuildFailed(path::Path_ptr p) OutboundContext::HandlePathBuildFailedAt(path::Path_ptr p, RouterID hop)
{ {
ShiftIntroRouter(p->Endpoint()); if (p->Endpoint() == hop)
path::Builder::HandlePathBuildFailed(p); {
// shift intro when we fail at the pivot
ShiftIntroRouter(p->Endpoint());
}
path::Builder::HandlePathBuildFailedAt(p, hop);
} }
void void
@ -186,9 +190,11 @@ namespace llarp
{ {
if (sentIntro) if (sentIntro)
return; return;
if (remoteIntro.router.IsZero()) if (remoteIntro.router.IsZero())
SwapIntros(); {
LogWarn(Name(), " dropping intro frame we have no intro ready yet");
return;
}
auto path = m_PathSet->GetPathByRouter(remoteIntro.router); auto path = m_PathSet->GetPathByRouter(remoteIntro.router);
if (path == nullptr) if (path == nullptr)

@ -106,7 +106,7 @@ namespace llarp
HandlePathBuildTimeout(path::Path_ptr path) override; HandlePathBuildTimeout(path::Path_ptr path) override;
void void
HandlePathBuildFailed(path::Path_ptr path) override; HandlePathBuildFailedAt(path::Path_ptr path, RouterID hop) override;
std::optional<std::vector<RouterContact>> std::optional<std::vector<RouterContact>>
GetHopsForBuild() override; GetHopsForBuild() override;

Loading…
Cancel
Save