From 59c9e997f29d53bcf8a1207109d21ebad6a6abb7 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Wed, 31 Mar 2021 06:54:28 -0400 Subject: [PATCH] build paths faster and limit path builds at edge router instead of via a time counter for all edges --- llarp/path/path.cpp | 16 ++++++++++--- llarp/path/pathbuilder.cpp | 37 ++++++++++++++++++++++++------ llarp/path/pathbuilder.hpp | 11 +++++++-- llarp/path/pathset.cpp | 4 ++-- llarp/path/pathset.hpp | 2 +- llarp/quic/connection.cpp | 5 ++-- llarp/quic/server.hpp | 2 +- llarp/quic/stream.cpp | 18 +++++++-------- llarp/quic/stream.hpp | 3 ++- llarp/quic/tunnel.cpp | 14 +++++++---- llarp/router/router.cpp | 2 +- llarp/service/endpoint.cpp | 14 ++++------- llarp/service/outbound_context.cpp | 22 +++++++++++------- llarp/service/outbound_context.hpp | 2 +- 14 files changed, 99 insertions(+), 53 deletions(-) diff --git a/llarp/path/path.cpp b/llarp/path/path.cpp index f147b336d..93821e0d3 100644 --- a/llarp/path/path.cpp +++ b/llarp/path/path.cpp @@ -172,6 +172,12 @@ namespace llarp currentStatus = record.status; if ((record.status & LR_StatusRecord::SUCCESS) != LR_StatusRecord::SUCCESS) { + if (record.status & LR_StatusRecord::FAIL_CONGESTION and index == 0) + { + // first hop building too fast + failedAt = hops[index].rc.pubkey; + break; + } // failed at next hop if (index + 1 < hops.size()) { @@ -245,8 +251,13 @@ namespace llarp { llarp::LogDebug("Path build failed for an unspecified reason"); } - r->loop()->call( - [r, self = shared_from_this()]() { self->EnterState(ePathFailed, r->Now()); }); + RouterID edge{}; + if (failedAt) + edge = *failedAt; + r->loop()->call([r, self = shared_from_this(), edge]() { + self->EnterState(ePathFailed, r->Now()); + self->m_PathSet->HandlePathBuildFailedAt(self, edge); + }); } // TODO: meaningful return value? @@ -259,7 +270,6 @@ namespace llarp if (st == ePathFailed) { _status = st; - m_PathSet->HandlePathBuildFailed(shared_from_this()); return; } if (st == ePathExpired && _status == ePathBuilding) diff --git a/llarp/path/pathbuilder.cpp b/llarp/path/pathbuilder.cpp index 4f19e04c3..ad27cb2aa 100644 --- a/llarp/path/pathbuilder.cpp +++ b/llarp/path/pathbuilder.cpp @@ -161,7 +161,11 @@ namespace llarp namespace path { Builder::Builder(AbstractRouter* p_router, size_t pathNum, size_t hops) - : path::PathSet(pathNum), _run(true), m_router(p_router), numHops(hops) + : path::PathSet{pathNum} + , m_EdgeLimiter{MIN_PATH_BUILD_INTERVAL} + , _run{true} + , m_router{p_router} + , numHops{hops} { CryptoManager::instance()->encryption_keygen(enckey); } @@ -169,13 +173,14 @@ namespace llarp void Builder::ResetInternalState() { - buildIntervalLimit = MIN_PATH_BUILD_INTERVAL; + buildIntervalLimit = PATH_BUILD_RATE; lastBuild = 0s; } void Builder::Tick(llarp_time_t) { const auto now = llarp::time_now_ms(); + m_EdgeLimiter.Decay(now); ExpirePaths(now, m_router); if (ShouldBuildMore(now)) BuildOne(); @@ -221,6 +226,9 @@ namespace llarp if (exclude.count(rc.pubkey)) return; + if (m_EdgeLimiter.Contains(rc.pubkey)) + return; + found = rc; } }, @@ -266,6 +274,12 @@ namespace llarp return enckey; } + bool + Builder::BuildCooldownHit(RouterID edge) const + { + return m_EdgeLimiter.Contains(edge); + } + bool Builder::BuildCooldownHit(llarp_time_t now) const { @@ -379,6 +393,14 @@ namespace llarp { if (IsStopped()) return; + + const RouterID edge{hops[0].pubkey}; + if (not m_EdgeLimiter.Insert(edge)) + { + LogWarn(Name(), " building too fast to edge router ", edge); + return; + } + lastBuild = Now(); // async generate keys auto ctx = std::make_shared(); @@ -401,7 +423,7 @@ namespace llarp void Builder::HandlePathBuilt(Path_ptr p) { - buildIntervalLimit = MIN_PATH_BUILD_INTERVAL; + buildIntervalLimit = PATH_BUILD_RATE; m_router->routerProfiling().MarkPathSuccess(p.get()); LogInfo(p->Name(), " built latency=", p->intro.latency); @@ -409,11 +431,12 @@ namespace llarp } void - Builder::HandlePathBuildFailed(Path_ptr p) + Builder::HandlePathBuildFailedAt(Path_ptr p, RouterID edge) { - m_router->routerProfiling().MarkPathFail(p.get()); - PathSet::HandlePathBuildFailed(p); + PathSet::HandlePathBuildFailedAt(p, edge); DoPathBuildBackoff(); + /// add it to the edge limter even if it's not an edge for simplicity + m_EdgeLimiter.Insert(edge); } void @@ -421,7 +444,7 @@ namespace llarp { static constexpr std::chrono::milliseconds MaxBuildInterval = 30s; // linear backoff - buildIntervalLimit = std::min(MIN_PATH_BUILD_INTERVAL + buildIntervalLimit, MaxBuildInterval); + buildIntervalLimit = std::min(PATH_BUILD_RATE + buildIntervalLimit, MaxBuildInterval); LogWarn(Name(), " build interval is now ", buildIntervalLimit); } diff --git a/llarp/path/pathbuilder.hpp b/llarp/path/pathbuilder.hpp index 8e2fd8705..fa1c1d367 100644 --- a/llarp/path/pathbuilder.hpp +++ b/llarp/path/pathbuilder.hpp @@ -2,6 +2,7 @@ #include "pathset.hpp" #include +#include #include #include @@ -10,13 +11,15 @@ namespace llarp { namespace path { - // milliseconds waiting between builds on a path + // milliseconds waiting between builds on a path per router static constexpr auto MIN_PATH_BUILD_INTERVAL = 500ms; + static constexpr auto PATH_BUILD_RATE = 100ms; struct Builder : public PathSet { private: llarp_time_t m_LastWarn = 0s; + util::DecayingHashSet m_EdgeLimiter; protected: /// flag for PathSet::Stop() @@ -25,6 +28,10 @@ namespace llarp virtual bool UrgentBuild(llarp_time_t now) const; + /// return true if we hit our soft limit for building paths too fast on a first hop + bool + BuildCooldownHit(RouterID edge) const; + private: void DoPathBuildBackoff(); @@ -118,7 +125,7 @@ namespace llarp HandlePathBuildTimeout(Path_ptr p) override; virtual void - HandlePathBuildFailed(Path_ptr p) override; + HandlePathBuildFailedAt(Path_ptr p, RouterID hop) override; }; using Builder_ptr = std::shared_ptr; diff --git a/llarp/path/pathset.cpp b/llarp/path/pathset.cpp index f49acfb55..0b8da15a6 100644 --- a/llarp/path/pathset.cpp +++ b/llarp/path/pathset.cpp @@ -327,9 +327,9 @@ namespace llarp } void - PathSet::HandlePathBuildFailed(Path_ptr p) + PathSet::HandlePathBuildFailedAt(Path_ptr p, RouterID hop) { - LogWarn(Name(), " path build ", p->ShortName(), " failed"); + LogWarn(Name(), " path build ", p->ShortName(), " failed at ", hop); m_BuildStats.fails++; } diff --git a/llarp/path/pathset.hpp b/llarp/path/pathset.hpp index 88a54a2bb..7e2cfea13 100644 --- a/llarp/path/pathset.hpp +++ b/llarp/path/pathset.hpp @@ -138,7 +138,7 @@ namespace llarp HandlePathBuildTimeout(Path_ptr path); virtual void - HandlePathBuildFailed(Path_ptr path); + HandlePathBuildFailedAt(Path_ptr path, RouterID hop); virtual void PathBuildStarted(Path_ptr path); diff --git a/llarp/quic/connection.cpp b/llarp/quic/connection.cpp index 641f9e5c8..c32f61282 100644 --- a/llarp/quic/connection.cpp +++ b/llarp/quic/connection.cpp @@ -761,9 +761,10 @@ namespace llarp::quic } auto expiry = std::chrono::nanoseconds{static_cast(exp)}; - auto expires_in = std::max(0ms, + auto expires_in = std::max( + 0ms, std::chrono::duration_cast( - expiry - get_time().time_since_epoch())); + expiry - get_time().time_since_epoch())); LogDebug("Next retransmit in ", expires_in.count(), "ms"); retransmit_timer->stop(); retransmit_timer->start(expires_in, 0ms); diff --git a/llarp/quic/server.hpp b/llarp/quic/server.hpp index 75953aa14..aededc032 100644 --- a/llarp/quic/server.hpp +++ b/llarp/quic/server.hpp @@ -13,7 +13,7 @@ namespace llarp::quic Server(EndpointBase& service_endpoint) : Endpoint{service_endpoint} { - default_stream_buffer_size = 0; // We don't currently use the endpoint ring buffer + default_stream_buffer_size = 0; // We don't currently use the endpoint ring buffer } // Stream callback: takes the server, the (just-created) stream, and the connection port. diff --git a/llarp/quic/stream.cpp b/llarp/quic/stream.cpp index 08790a212..77b6e9766 100644 --- a/llarp/quic/stream.cpp +++ b/llarp/quic/stream.cpp @@ -331,17 +331,17 @@ namespace llarp::quic } void - Stream::hard_close() + Stream::hard_close() + { + if (avail_trigger) { - if (avail_trigger) - { - avail_trigger->close(); - avail_trigger.reset(); - } - if (!is_closing && close_callback) - close_callback(*this, STREAM_ERROR_CONNECTION_EXPIRED); - is_closing = is_shutdown = true; + avail_trigger->close(); + avail_trigger.reset(); } + if (!is_closing && close_callback) + close_callback(*this, STREAM_ERROR_CONNECTION_EXPIRED); + is_closing = is_shutdown = true; + } void Stream::data(std::shared_ptr data) diff --git a/llarp/quic/stream.hpp b/llarp/quic/stream.hpp index ecc98d0d5..cd731c583 100644 --- a/llarp/quic/stream.hpp +++ b/llarp/quic/stream.hpp @@ -319,7 +319,8 @@ namespace llarp::quic // Called by the owning Connection to do a "hard" close of a stream during Connection // destruction: unlike a regular close this doesn't try to transmit a close over the wire (which // won't work since the Connection is dead), it just fires the close callback and cleans up. - void hard_close(); + void + hard_close(); // ngtcp2 stream_id, assigned during stream creation StreamID stream_id{-1}; diff --git a/llarp/quic/tunnel.cpp b/llarp/quic/tunnel.cpp index c81b43b7c..4bec1fa31 100644 --- a/llarp/quic/tunnel.cpp +++ b/llarp/quic/tunnel.cpp @@ -56,7 +56,9 @@ namespace llarp::quic on_incoming_data(Stream& stream, bstring_view bdata) { auto tcp = stream.data(); - if (!tcp) return; // TCP connection is gone, which would have already sent a stream close, so just drop it. + if (!tcp) + return; // TCP connection is gone, which would have already sent a stream close, so just + // drop it. std::string_view data{reinterpret_cast(bdata.data()), bdata.size()}; auto peer = tcp->peer(); @@ -78,11 +80,12 @@ namespace llarp::quic } } - void close_tcp_pair(quic::Stream& st, std::optional /*errcode*/) { + void + close_tcp_pair(quic::Stream& st, std::optional /*errcode*/) + { if (auto tcp = st.data()) tcp->close(); }; - // Creates a new tcp handle that forwards incoming data/errors/closes into appropriate actions // on the given quic stream. void @@ -447,7 +450,7 @@ namespace llarp::quic auto err_handler = tcp_tunnel->once([&failed](auto& evt, auto&) { failed = evt.what(); }); tcp_tunnel->bind(*bind_addr.operator const sockaddr*()); - tcp_tunnel->on([this] (const uvw::ListenEvent&, uvw::TCPHandle& tcp_tunnel) { + tcp_tunnel->on([this](const uvw::ListenEvent&, uvw::TCPHandle& tcp_tunnel) { auto client = tcp_tunnel.loop().resource(); tcp_tunnel.accept(*client); // Freeze the connection (after accepting) because we may need to stall it until a stream @@ -590,7 +593,8 @@ namespace llarp::quic void TunnelManager::flush_pending_incoming(ClientTunnel& ct) { - if (!ct.client) return; // Happens if we're still waiting for a path to build + if (!ct.client) + return; // Happens if we're still waiting for a path to build assert(ct.client->get_connection()); auto& conn = *ct.client->get_connection(); int available = conn.get_streams_available(); diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 0eb49772f..dfaf0dec9 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -42,7 +42,7 @@ #include -static constexpr std::chrono::milliseconds ROUTER_TICK_INTERVAL = 1s; +static constexpr std::chrono::milliseconds ROUTER_TICK_INTERVAL = 100ms; namespace llarp { diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 56a3d4034..b12bb8f1f 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -682,11 +682,7 @@ namespace llarp while (i != range.second) { itr->second->SetReadyHook( - [callback = i->second, addr](auto session) { - LogInfo(addr, " is ready to send"); - callback(addr, session); - }, - left); + [callback = i->second, addr](auto session) { callback(addr, session); }, left); ++i; } serviceLookups.erase(addr); @@ -703,11 +699,7 @@ namespace llarp if (itr != range.second) { session->SetReadyHook( - [callback = itr->second, addr](auto session) { - LogInfo(addr, " is ready to send"); - callback(addr, session); - }, - left); + [callback = itr->second, addr](auto session) { callback(addr, session); }, left); ++itr; } serviceLookups.erase(addr); @@ -1428,6 +1420,8 @@ namespace llarp std::optional ret = std::nullopt; for (const auto& [tag, session] : Sessions()) { + if (tag.IsZero()) + continue; if (session.remote.Addr() == *ptr and session.lastUsed >= time) { time = session.lastUsed; diff --git a/llarp/service/outbound_context.cpp b/llarp/service/outbound_context.cpp index f07812390..0f5d1a160 100644 --- a/llarp/service/outbound_context.cpp +++ b/llarp/service/outbound_context.cpp @@ -79,11 +79,9 @@ namespace llarp { if (remoteIntro != m_NextIntro) { - LogInfo(Name(), " swap intro to use ", RouterID{m_NextIntro.router}); remoteIntro = m_NextIntro; m_DataHandler->PutSenderFor(currentConvoTag, currentIntroSet.A, false); m_DataHandler->PutIntroFor(currentConvoTag, remoteIntro); - ShiftIntroduction(false); } } @@ -128,7 +126,9 @@ namespace llarp { if (markedBad) return false; - return (!remoteIntro.router.IsZero()) && GetPathByRouter(remoteIntro.router) != nullptr; + if (remoteIntro.router.IsZero()) + return false; + return GetPathByRouter(remoteIntro.router) != nullptr; } void @@ -157,10 +157,14 @@ namespace llarp } void - OutboundContext::HandlePathBuildFailed(path::Path_ptr p) + OutboundContext::HandlePathBuildFailedAt(path::Path_ptr p, RouterID hop) { - ShiftIntroRouter(p->Endpoint()); - path::Builder::HandlePathBuildFailed(p); + if (p->Endpoint() == hop) + { + // shift intro when we fail at the pivot + ShiftIntroRouter(p->Endpoint()); + } + path::Builder::HandlePathBuildFailedAt(p, hop); } void @@ -186,9 +190,11 @@ namespace llarp { if (sentIntro) return; - if (remoteIntro.router.IsZero()) - SwapIntros(); + { + LogWarn(Name(), " dropping intro frame we have no intro ready yet"); + return; + } auto path = m_PathSet->GetPathByRouter(remoteIntro.router); if (path == nullptr) diff --git a/llarp/service/outbound_context.hpp b/llarp/service/outbound_context.hpp index 1bb2f5c18..a7d7e3fac 100644 --- a/llarp/service/outbound_context.hpp +++ b/llarp/service/outbound_context.hpp @@ -106,7 +106,7 @@ namespace llarp HandlePathBuildTimeout(path::Path_ptr path) override; void - HandlePathBuildFailed(path::Path_ptr path) override; + HandlePathBuildFailedAt(path::Path_ptr path, RouterID hop) override; std::optional> GetHopsForBuild() override;