diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index d9e43b80f..605de51c7 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -101,11 +101,16 @@ namespace llarp { const auto now = llarp::time_now_ms(); m_LastIntrosetRegenAttempt = now; - std::set introset; - if (!GetCurrentIntroductionsWithFilter( - introset, [now](const service::Introduction& intro) -> bool { - return not intro.ExpiresSoon(now, path::min_intro_lifetime); + std::set intros; + if (const auto maybe = + GetCurrentIntroductionsWithFilter([now](const service::Introduction& intro) -> bool { + return not intro.ExpiresSoon( + now, path::default_lifetime - path::min_intro_lifetime); })) + { + intros.insert(maybe->begin(), maybe->end()); + } + else { LogWarn( "could not publish descriptors for endpoint ", @@ -146,9 +151,10 @@ namespace llarp } introSet().intros.clear(); - for (auto& intro : introset) + for (auto& intro : intros) { - introSet().intros.emplace_back(std::move(intro)); + if (introSet().intros.size() < numDesiredPaths) + introSet().intros.emplace_back(std::move(intro)); } if (introSet().intros.empty()) { @@ -710,8 +716,10 @@ namespace llarp return false; auto next_pub = m_state->m_LastPublishAttempt - + (m_state->m_IntroSet.HasExpiredIntros(now) ? INTROSET_PUBLISH_RETRY_INTERVAL - : INTROSET_PUBLISH_INTERVAL); + + (m_state->m_IntroSet.HasStaleIntros( + now, path::default_lifetime - path::intro_path_spread) + ? IntrosetPublishRetryCooldown + : IntrosetPublishInterval); return now >= next_pub and m_LastIntrosetRegenAttempt + 1s <= now; } @@ -739,8 +747,11 @@ namespace llarp { std::unordered_set exclude; ForEachPath([&exclude](auto path) { exclude.insert(path->Endpoint()); }); - const auto maybe = m_router->nodedb()->GetRandom( - [exclude](const auto& rc) -> bool { return exclude.count(rc.pubkey) == 0; }); + const auto maybe = + m_router->nodedb()->GetRandom([exclude, r = m_router](const auto& rc) -> bool { + return exclude.count(rc.pubkey) == 0 + and not r->routerProfiling().IsBadForPath(rc.pubkey); + }); if (not maybe.has_value()) return std::nullopt; return GetHopsForBuildWithEndpoint(maybe->pubkey); @@ -758,46 +769,27 @@ namespace llarp path::Builder::PathBuildStarted(path); } - constexpr auto MaxOutboundContextPerRemote = 4; + constexpr auto MaxOutboundContextPerRemote = 1; void Endpoint::PutNewOutboundContext(const service::IntroSet& introset, llarp_time_t left) { - Address addr{introset.addressKeys.Addr()}; + const Address addr{introset.addressKeys.Addr()}; auto& remoteSessions = m_state->m_RemoteSessions; - auto& serviceLookups = m_state->m_PendingServiceLookups; - if (remoteSessions.count(addr) >= MaxOutboundContextPerRemote) + if (remoteSessions.count(addr) < MaxOutboundContextPerRemote) { - auto itr = remoteSessions.find(addr); - - auto range = serviceLookups.equal_range(addr); - auto i = range.first; - while (i != range.second) - { - itr->second->SetReadyHook( - [callback = i->second, addr](auto session) { callback(addr, session); }, left); - ++i; - } - serviceLookups.erase(addr); - return; + remoteSessions.emplace(addr, std::make_shared(introset, this)); + LogInfo("Created New outbound context for ", addr.ToString()); } - auto session = std::make_shared(introset, this); - remoteSessions.emplace(addr, session); - LogInfo("Created New outbound context for ", addr.ToString()); - - // inform pending - auto range = serviceLookups.equal_range(addr); - auto itr = range.first; - if (itr != range.second) + auto sessionRange = remoteSessions.equal_range(addr); + for (auto itr = sessionRange.first; itr != sessionRange.second; ++itr) { - session->SetReadyHook( - [callback = itr->second, addr](auto session) { callback(addr, session); }, left); - ++itr; + itr->second->AddReadyHook( + [addr, this](auto session) { InformPathToService(addr, session); }, left); } - serviceLookups.erase(addr); } void @@ -924,7 +916,7 @@ namespace llarp paths.insert(path); }); - constexpr size_t min_unique_lns_endpoints = 3; + constexpr size_t min_unique_lns_endpoints = 2; // not enough paths if (paths.size() < min_unique_lns_endpoints) @@ -1066,11 +1058,11 @@ namespace llarp void Endpoint::QueueRecvData(RecvDataEvent ev) { - if (m_RecvQueue.full() || m_RecvQueue.empty()) + if (m_RecvQueue.full() or m_RecvQueue.empty()) { - m_router->loop()->call([this] { FlushRecvData(); }); + m_router->loop()->call_soon([this] { FlushRecvData(); }); } - m_RecvQueue.pushBack(std::move(ev)); + m_RecvQueue.tryPushBack(std::move(ev)); } bool @@ -1079,13 +1071,16 @@ namespace llarp { msg->sender.UpdateAddr(); if (not HasOutboundConvo(msg->sender.Addr())) + { PutSenderFor(msg->tag, msg->sender, true); - PutReplyIntroFor(msg->tag, path->intro); - Introduction intro; + } + Introduction intro{}; intro.pathID = from; intro.router = PubKey{path->Endpoint()}; intro.expiresAt = std::min(path->ExpireTime(), msg->introReply.expiresAt); + intro.latency = path->intro.latency; PutIntroFor(msg->tag, intro); + PutReplyIntroFor(msg->tag, path->intro); ConvoTagRX(msg->tag); return ProcessDataMessage(msg); } @@ -1178,10 +1173,11 @@ namespace llarp // not applicable because we are not an exit or don't have an endpoint auth policy if ((not m_state->m_ExitEnabled) or m_AuthPolicy == nullptr) return; - ProtocolFrame f; + ProtocolFrame f{}; f.R = AuthResultCodeAsInt(result.code); f.T = tag; f.F = path->intro.pathID; + f.N.Randomize(); if (result.code == AuthResultCode::eAuthAccepted) { ProtocolMessage msg; @@ -1189,10 +1185,7 @@ namespace llarp std::vector reason{}; reason.resize(result.reason.size()); std::copy_n(result.reason.c_str(), reason.size(), reason.data()); - msg.PutBuffer(reason); - f.N.Randomize(); - f.C.Zero(); if (m_AuthPolicy) msg.proto = ProtocolType::Auth; else @@ -1234,6 +1227,23 @@ namespace llarp Sessions().erase(t); } + void + Endpoint::ResetConvoTag(ConvoTag tag, path::Path_ptr p, PathID_t from) + { + // send reset convo tag message + ProtocolFrame f{}; + f.R = 1; + f.T = tag; + f.F = p->intro.pathID; + f.Sign(m_Identity); + { + LogWarn("invalidating convotag T=", tag); + RemoveConvoTag(tag); + m_SendQueue.tryPushBack( + SendEvent_t{std::make_shared(f, from), p}); + } + } + bool Endpoint::HandleHiddenServiceFrame(path::Path_ptr p, const ProtocolFrame& frame) { @@ -1253,23 +1263,7 @@ namespace llarp } if (not frame.AsyncDecryptAndVerify(Router()->loop(), p, m_Identity, this)) { - LogError("Failed to decrypt protocol frame"); - if (not frame.C.IsZero()) - { - // send reset convo tag message - ProtocolFrame f; - f.R = 1; - f.T = frame.T; - f.F = p->intro.pathID; - - f.Sign(m_Identity); - { - LogWarn("invalidating convotag T=", frame.T); - RemoveConvoTag(frame.T); - m_SendQueue.tryPushBack( - SendEvent_t{std::make_shared(f, frame.F), p}); - } - } + ResetConvoTag(frame.T, p, frame.F); } return true; } @@ -1279,8 +1273,8 @@ namespace llarp { m_router->routerProfiling().MarkPathTimeout(p.get()); ManualRebuild(1); - RegenAndPublishIntroSet(); path::Builder::HandlePathDied(p); + RegenAndPublishIntroSet(); } bool @@ -1294,22 +1288,54 @@ namespace llarp const Address& addr, std::optional introset, const RouterID& endpoint, - llarp_time_t timeLeft) + llarp_time_t timeLeft, + uint64_t relayOrder) { + // tell all our existing remote sessions about this introset update + const auto now = Router()->Now(); - auto& fails = m_state->m_ServiceLookupFails; auto& lookups = m_state->m_PendingServiceLookups; + if (introset) + { + auto& sessions = m_state->m_RemoteSessions; + auto range = sessions.equal_range(addr); + auto itr = range.first; + while (itr != range.second) + { + itr->second->OnIntroSetUpdate(addr, introset, endpoint, timeLeft, relayOrder); + // we got a successful lookup + if (itr->second->ReadyToSend() and not introset->IsExpired(now)) + { + // inform all lookups + auto lookup_range = lookups.equal_range(addr); + auto i = lookup_range.first; + while (i != lookup_range.second) + { + i->second(addr, itr->second.get()); + ++i; + } + lookups.erase(addr); + } + ++itr; + } + } + auto& fails = m_state->m_ServiceLookupFails; if (not introset or introset->IsExpired(now)) { LogError(Name(), " failed to lookup ", addr.ToString(), " from ", endpoint); fails[endpoint] = fails[endpoint] + 1; - // inform one - auto range = lookups.equal_range(addr); - auto itr = range.first; - if (itr != range.second) + // inform one if applicable + // when relay order is non zero we can be pretty sure that it's a fail as when relay order + // is zero that can sometimes yield a fail because it isn't always the closets in keyspace. + if (relayOrder > 0) { - itr->second(addr, nullptr); - itr = lookups.erase(itr); + auto range = lookups.equal_range(addr); + auto itr = range.first; + if (itr != range.second) + { + itr->second(addr, nullptr); + itr = lookups.erase(itr); + } } return false; } @@ -1334,6 +1360,20 @@ namespace llarp return m_state->m_OutboundSessions.count(addr) > 0; } + void + Endpoint::InformPathToService(const Address remote, OutboundContext* ctx) + { + auto& serviceLookups = m_state->m_PendingServiceLookups; + auto range = serviceLookups.equal_range(remote); + auto itr = range.first; + while (itr != range.second) + { + itr->second(remote, ctx); + ++itr; + } + serviceLookups.erase(remote); + } + bool Endpoint::EnsurePathToService(const Address remote, PathEnsureHook hook, llarp_time_t timeout) { @@ -1343,6 +1383,8 @@ namespace llarp static constexpr size_t RequestsPerLookup = 2; MarkAddressOutbound(remote); + // add response hook to list for address. + m_state->m_PendingServiceLookups.emplace(remote, hook); auto& sessions = m_state->m_RemoteSessions; { @@ -1352,20 +1394,17 @@ namespace llarp { if (itr->second->ReadyToSend()) { - hook(remote, itr->second.get()); + InformPathToService(remote, itr->second.get()); return true; } ++itr; } } - // add response hook to list for address. - m_state->m_PendingServiceLookups.emplace(remote, hook); - /// check replay filter if (not m_IntrosetLookupFilter.Insert(remote)) return true; - const auto paths = GetManyPathsWithUniqueEndpoints(this, NumParallelLookups); + const auto paths = GetManyPathsWithUniqueEndpoints(this, NumParallelLookups, remote.ToKey()); using namespace std::placeholders; const dht::Key_t location = remote.ToKey(); @@ -1425,14 +1464,8 @@ namespace llarp bool Endpoint::EnsurePathToSNode(const RouterID snode, SNodeEnsureHook h) { - static constexpr size_t MaxConcurrentSNodeSessions = 16; auto& nodeSessions = m_state->m_SNodeSessions; - if (nodeSessions.size() >= MaxConcurrentSNodeSessions) - { - // a quick client side work arround before we do proper limiting - LogError(Name(), " has too many snode sessions"); - return false; - } + using namespace std::placeholders; if (nodeSessions.count(snode) == 0) { @@ -1799,8 +1832,7 @@ namespace llarp LogError("failed to encrypt and sign"); return; } - self->m_SendQueue.pushBack(SendEvent_t{transfer, p}); - ; + self->m_SendQueue.tryPushBack(SendEvent_t{transfer, p}); }); return true; } @@ -1883,10 +1915,13 @@ namespace llarp bool Endpoint::ShouldBuildMore(llarp_time_t now) const { - if (not path::Builder::ShouldBuildMore(now)) + if (BuildCooldownHit(now)) + return false; + const auto requiredPaths = std::max(numDesiredPaths, path::min_intro_paths); + if (NumInStatus(path::ePathBuilding) >= requiredPaths) return false; - return ((now - lastBuild) > path::intro_path_spread) - || NumInStatus(path::ePathEstablished) < path::min_intro_paths; + return NumPathsExistingAt(now + (path::default_lifetime - path::intro_path_spread)) + < requiredPaths; } AbstractRouter* diff --git a/llarp/service/endpoint.hpp b/llarp/service/endpoint.hpp index 254a46e77..b8287dab3 100644 --- a/llarp/service/endpoint.hpp +++ b/llarp/service/endpoint.hpp @@ -49,12 +49,13 @@ namespace llarp struct OutboundContext; /// minimum interval for publishing introsets - static constexpr auto INTROSET_PUBLISH_INTERVAL = - std::chrono::milliseconds(path::default_lifetime) / 4; + static constexpr auto IntrosetPublishInterval = path::intro_path_spread / 2; - static constexpr auto INTROSET_PUBLISH_RETRY_INTERVAL = 5s; + /// how agressively should we retry publishing introset on failure + static constexpr auto IntrosetPublishRetryCooldown = 1s; - static constexpr auto INTROSET_LOOKUP_RETRY_COOLDOWN = 3s; + /// how aggressively should we retry looking up introsets + static constexpr auto IntrosetLookupCooldown = 250ms; struct Endpoint : public path::Builder, public ILookupHolder, @@ -330,6 +331,9 @@ namespace llarp using SNodeEnsureHook = std::function; + void + InformPathToService(const Address remote, OutboundContext* ctx); + /// ensure a path to a service node by public key bool EnsurePathToSNode(const RouterID remote, SNodeEnsureHook h); @@ -415,6 +419,9 @@ namespace llarp uint64_t GenTXID(); + void + ResetConvoTag(ConvoTag tag, path::Path_ptr path, PathID_t from); + const std::set& SnodeBlacklist() const; diff --git a/llarp/service/endpoint_util.cpp b/llarp/service/endpoint_util.cpp index ee0a613e7..782e2968e 100644 --- a/llarp/service/endpoint_util.cpp +++ b/llarp/service/endpoint_util.cpp @@ -106,6 +106,10 @@ namespace llarp ++itr; } } + for (auto& item : deadSessions) + { + item.second->Tick(now); + } } void diff --git a/llarp/service/endpoint_util.hpp b/llarp/service/endpoint_util.hpp index 2aa66bfd4..9b2c46965 100644 --- a/llarp/service/endpoint_util.hpp +++ b/llarp/service/endpoint_util.hpp @@ -43,15 +43,31 @@ namespace llarp template static path::Path::UniqueEndpointSet_t - GetManyPathsWithUniqueEndpoints(Endpoint_t* ep, size_t N, size_t tries = 10) + GetManyPathsWithUniqueEndpoints( + Endpoint_t* ep, + size_t N, + std::optional maybeLocation = std::nullopt, + size_t tries = 10) { + std::unordered_set exclude; path::Path::UniqueEndpointSet_t paths; do { --tries; - const auto path = ep->PickRandomEstablishedPath(); + path::Path_ptr path; + if (maybeLocation) + { + path = ep->GetEstablishedPathClosestTo(RouterID{maybeLocation->as_array()}, exclude); + } + else + { + path = ep->PickRandomEstablishedPath(); + } if (path and path->IsReady()) + { paths.emplace(path); + exclude.insert(path->Endpoint()); + } } while (tries > 0 and paths.size() < N); return paths; } diff --git a/llarp/service/intro.cpp b/llarp/service/intro.cpp index fd868ed0d..b7fde5556 100644 --- a/llarp/service/intro.cpp +++ b/llarp/service/intro.cpp @@ -9,6 +9,7 @@ namespace llarp { util::StatusObject obj{ {"router", router.ToHex()}, + {"path", pathID.ToHex()}, {"expiresAt", to_json(expiresAt)}, {"latency", to_json(latency)}, {"version", uint64_t(version)}}; @@ -66,8 +67,9 @@ namespace llarp std::ostream& Introduction::print(std::ostream& stream, int level, int spaces) const { + const RouterID r{router}; Printer printer(stream, level, spaces); - printer.printAttribute("k", RouterID(router)); + printer.printAttribute("k", r.ToString()); printer.printAttribute("l", latency.count()); printer.printAttribute("p", pathID); printer.printAttribute("v", version); diff --git a/llarp/service/intro.hpp b/llarp/service/intro.hpp index 1fa5fd790..bd0b928de 100644 --- a/llarp/service/intro.hpp +++ b/llarp/service/intro.hpp @@ -77,6 +77,16 @@ namespace llarp { return i.print(out, -1, -1); } + + /// comparitor for introset timestamp + struct CompareIntroTimestamp + { + bool + operator()(const Introduction& left, const Introduction& right) const + { + return left.expiresAt > right.expiresAt; + } + }; } // namespace service } // namespace llarp diff --git a/llarp/service/intro_set.cpp b/llarp/service/intro_set.cpp index 8bd8ae66d..ca6e58414 100644 --- a/llarp/service/intro_set.cpp +++ b/llarp/service/intro_set.cpp @@ -351,6 +351,15 @@ namespace llarp::service return false; } + bool + IntroSet::HasStaleIntros(llarp_time_t now, llarp_time_t delta) const + { + for (const auto& intro : intros) + if (intro.ExpiresSoon(now, delta)) + return true; + return false; + } + bool IntroSet::IsExpired(llarp_time_t now) const { diff --git a/llarp/service/intro_set.hpp b/llarp/service/intro_set.hpp index d0fbca60a..66e88fb14 100644 --- a/llarp/service/intro_set.hpp +++ b/llarp/service/intro_set.hpp @@ -69,6 +69,10 @@ namespace llarp bool HasExpiredIntros(llarp_time_t now) const; + /// return true if any of our intros expires soon given a delta + bool + HasStaleIntros(llarp_time_t now, llarp_time_t delta) const; + bool IsExpired(llarp_time_t now) const; diff --git a/llarp/service/outbound_context.cpp b/llarp/service/outbound_context.cpp index 5e280115d..9837248a1 100644 --- a/llarp/service/outbound_context.cpp +++ b/llarp/service/outbound_context.cpp @@ -48,6 +48,7 @@ namespace llarp MarkCurrentIntroBad(Now()); ShiftIntroduction(false); UpdateIntroSet(); + SwapIntros(); } return true; } @@ -58,17 +59,18 @@ namespace llarp : path::Builder{parent->Router(), OutboundContextNumPaths, parent->numHops} , SendContext{introset.addressKeys, {}, this, parent} , location{introset.addressKeys.Addr().ToKey()} + , addr{introset.addressKeys.Addr()} , currentIntroSet{introset} { updatingIntroSet = false; for (const auto& intro : introset.intros) { - if (intro.expiresAt > m_NextIntro.expiresAt) + if (m_NextIntro.latency == 0s or m_NextIntro.latency > intro.latency) m_NextIntro = intro; } - currentConvoTag.Randomize(); + lastShift = Now(); } OutboundContext::~OutboundContext() = default; @@ -151,8 +153,8 @@ namespace llarp } if (selectedIntro.router.IsZero() || selectedIntro.ExpiresSoon(now)) return; - LogWarn(Name(), " shfiting intro off of ", r, " to ", RouterID(selectedIntro.router)); m_NextIntro = selectedIntro; + lastShift = now; } void @@ -183,7 +185,7 @@ namespace llarp p->SetDataHandler(util::memFn(&OutboundContext::HandleHiddenServiceFrame, this)); p->SetDropHandler(util::memFn(&OutboundContext::HandleDataDrop, this)); // we now have a path to the next intro, swap intros - if (p->Endpoint() == m_NextIntro.router or p->Endpoint() == remoteIntro.router) + if (p->Endpoint() == m_NextIntro.router) SwapIntros(); else { @@ -194,7 +196,7 @@ namespace llarp void OutboundContext::AsyncGenIntro(const llarp_buffer_t& payload, ProtocolType t) { - if (sentIntro) + if (generatedIntro) return; if (remoteIntro.router.IsZero()) { @@ -202,21 +204,14 @@ namespace llarp return; } - auto path = m_PathSet->GetPathByRouter(remoteIntro.router); + auto path = GetPathByRouter(remoteIntro.router); if (path == nullptr) { - // try parent as fallback - path = m_Endpoint->GetPathByRouter(remoteIntro.router); - if (path == nullptr) - { - if (!BuildCooldownHit(Now())) - BuildOneAlignedTo(remoteIntro.router); - LogWarn(Name(), " dropping intro frame, no path to ", remoteIntro.router); - return; - } + LogError(Name(), " has no path to ", remoteIntro.router, " when we should have had one"); + return; } - sentIntro = true; auto frame = std::make_shared(); + frame->Clear(); auto ex = std::make_shared( m_Endpoint->Loop(), remoteIdent, @@ -228,22 +223,28 @@ namespace llarp t); ex->hook = [self = shared_from_this(), path](auto frame) { - self->Send(std::move(frame), path); + if (not self->Send(std::move(frame), path)) + return; + self->m_Endpoint->Loop()->call_later(100ms, [self]() { self->sentIntro = true; }); }; ex->msg.PutBuffer(payload); ex->msg.introReply = path->intro; frame->F = ex->msg.introReply.pathID; frame->R = 0; + generatedIntro = true; + // ensure we have a sender put for this convo tag + m_DataHandler->PutSenderFor(currentConvoTag, currentIntroSet.addressKeys, false); + // encrypt frame async m_Endpoint->Router()->QueueWork([ex, frame] { return AsyncKeyExchange::Encrypt(ex, frame); }); - LogInfo("send intro frame"); + + LogInfo(Name(), " send intro frame T=", currentConvoTag); } std::string OutboundContext::Name() const { - return "OBContext:" + m_Endpoint->Name() + "-" - + currentIntroSet.addressKeys.Addr().ToString(); + return "OBContext:" + currentIntroSet.addressKeys.Addr().ToString(); } void @@ -255,10 +256,9 @@ namespace llarp return; LogInfo(Name(), " updating introset"); m_LastIntrosetUpdateAt = now; - const auto addr = currentIntroSet.addressKeys.Addr(); // we want to use the parent endpoint's paths because outbound context // does not implement path::PathSet::HandleGotIntroMessage - const auto paths = GetManyPathsWithUniqueEndpoints(m_Endpoint, 2); + const auto paths = GetManyPathsWithUniqueEndpoints(m_Endpoint, 2, location); uint64_t relayOrder = 0; for (const auto& path : paths) { @@ -291,7 +291,7 @@ namespace llarp obj["seqno"] = sequenceNo; obj["markedBad"] = markedBad; obj["lastShift"] = to_json(lastShift); - obj["remoteIdentity"] = remoteIdent.Addr().ToString(); + obj["remoteIdentity"] = addr.ToString(); obj["currentRemoteIntroset"] = currentIntroSet.ExtractStatus(); obj["nextIntro"] = m_NextIntro.ExtractStatus(); obj["readyToSend"] = ReadyToSend(); @@ -318,32 +318,31 @@ namespace llarp if (m_LookupFails > 16 || m_BuildFails > 10) return true; - constexpr auto InboundTrafficTimeout = 5s; - if (ReadyToSend() and remoteIntro.router.IsZero()) { SwapIntros(); } - if (m_GotInboundTraffic and m_LastInboundTraffic + InboundTrafficTimeout <= now) + if ((remoteIntro.router.IsZero() or m_BadIntros.count(remoteIntro)) + and GetPathByRouter(m_NextIntro.router)) + SwapIntros(); + + if (m_GotInboundTraffic and m_LastInboundTraffic + sendTimeout <= now) { - if (std::chrono::abs(now - lastGoodSend) < InboundTrafficTimeout) - { - // timeout on other side - MarkCurrentIntroBad(now); - } + // timeout on other side + UpdateIntroSet(); + MarkCurrentIntroBad(now); + ShiftIntroRouter(remoteIntro.router); } - - // check for expiration - if (remoteIntro.ExpiresSoon(now)) + // check for stale intros + // update the introset if we think we need to + if (currentIntroSet.HasStaleIntros(now, path::intro_path_spread)) { UpdateIntroSet(); - // shift intro if it expires "soon" - if (ShiftIntroduction()) - SwapIntros(); // swap intros if we shifted } // lookup router in intro if set and unknown - m_Endpoint->EnsureRouterIsKnown(remoteIntro.router); + if (not m_NextIntro.router.IsZero()) + m_Endpoint->EnsureRouterIsKnown(m_NextIntro.router); // expire bad intros auto itr = m_BadIntros.begin(); while (itr != m_BadIntros.end()) @@ -354,7 +353,7 @@ namespace llarp ++itr; } - if (ReadyToSend() and m_ReadyHook) + if (ReadyToSend() and not m_ReadyHooks.empty()) { const auto path = GetPathByRouter(remoteIntro.router); if (not path) @@ -362,32 +361,40 @@ namespace llarp LogWarn(Name(), " ready but no path to ", remoteIntro.router, " ???"); return true; } - m_ReadyHook(this); - m_ReadyHook = nullptr; + for (const auto& hook : m_ReadyHooks) + hook(this); + m_ReadyHooks.clear(); } - if (lastGoodSend > 0s and now >= lastGoodSend + (sendTimeout / 2)) + const auto timeout = std::min(lastGoodSend, m_LastInboundTraffic); + if (lastGoodSend > 0s and now >= timeout + (sendTimeout / 2)) { // send a keep alive to keep this session alive KeepAlive(); } - // if we are dead return true so we are removed - return lastGoodSend > 0s ? (now >= lastGoodSend && now - lastGoodSend > sendTimeout) - : (now >= createdAt && now - createdAt > connectTimeout); + return timeout > 0s ? (now >= timeout && now - timeout > sendTimeout) + : (now >= createdAt && now - createdAt > connectTimeout); } void - OutboundContext::SetReadyHook(std::function hook, llarp_time_t timeout) + OutboundContext::AddReadyHook(std::function hook, llarp_time_t timeout) { - if (m_ReadyHook) + if (ReadyToSend()) + { + hook(this); return; - m_ReadyHook = hook; - m_router->loop()->call_later(timeout, [this]() { - if (m_ReadyHook) - m_ReadyHook(nullptr); - m_ReadyHook = nullptr; - }); + } + if (m_ReadyHooks.empty()) + { + m_router->loop()->call_later(timeout, [this]() { + LogWarn(Name(), " did not obtain session in time"); + for (const auto& hook : m_ReadyHooks) + hook(nullptr); + m_ReadyHooks.clear(); + }); + } + m_ReadyHooks.push_back(hook); } std::optional> @@ -405,16 +412,22 @@ namespace llarp bool OutboundContext::ShouldBuildMore(llarp_time_t now) const { - if (markedBad || not path::Builder::ShouldBuildMore(now)) + if (markedBad or path::Builder::BuildCooldownHit(now)) return false; if (NumInStatus(path::ePathBuilding) >= numDesiredPaths) return false; - llarp_time_t t = 0s; - ForEachPath([&t](path::Path_ptr path) { - if (path->IsReady()) - t = std::max(path->ExpireTime(), t); + + if (m_BadIntros.count(remoteIntro)) + return true; + + size_t numValidPaths = 0; + ForEachPath([now, &numValidPaths](path::Path_ptr path) { + if (not path->IsReady()) + return; + if (not path->intro.ExpiresSoon(now, path::default_lifetime - path::intro_path_spread)) + numValidPaths++; }); - return t >= now + path::default_lifetime / 4; + return numValidPaths < numDesiredPaths; } void @@ -430,12 +443,24 @@ namespace llarp m_BadIntros[intro] = now; } + bool + OutboundContext::IntroSent() const + { + return sentIntro; + } + + bool + OutboundContext::IntroGenerated() const + { + return sentIntro; + } + bool OutboundContext::ShiftIntroduction(bool rebuild) { bool success = false; - auto now = Now(); - if (now - lastShift < MIN_SHIFT_INTERVAL) + const auto now = Now(); + if (abs(now - lastShift) < shiftTimeout) return false; bool shifted = false; std::vector intros = currentIntroSet.intros; @@ -496,7 +521,7 @@ namespace llarp { // unconditionally update introset UpdateIntroSet(); - const RouterID endpoint(path->Endpoint()); + const RouterID endpoint{path->Endpoint()}; // if a path to our current intro died... if (endpoint == remoteIntro.router) { @@ -506,50 +531,13 @@ namespace llarp if (p->Endpoint() == endpoint && p->IsReady()) ++num; }); - // if we have more than two then we are probably fine - if (num > 2) - return; - // if we have one working one ... - if (num == 1) - { - num = 0; - ForEachPath([&](const path::Path_ptr& p) { - if (p->Endpoint() == endpoint) - ++num; - }); - // if we have 2 or more established or pending don't do anything - if (num > 2) - return; - BuildOneAlignedTo(endpoint); - } - else if (num == 0) + if (num == 0) { - // we have no paths to this router right now - // hop off it - Introduction picked; - // get the latest intro that isn't on that endpoint - for (const auto& intro : currentIntroSet.intros) - { - if (intro.router == endpoint) - continue; - if (intro.expiresAt > picked.expiresAt) - picked = intro; - } - // we got nothing - if (picked.router.IsZero()) - { - return; - } - m_NextIntro = picked; - // check if we have a path to this router - num = 0; - ForEachPath([&](const path::Path_ptr& p) { - // don't count timed out paths - if (p->Status() != path::ePathTimeout && p->Endpoint() == m_NextIntro.router) - ++num; - }); - // build a path if one isn't already pending build or established - BuildOneAlignedTo(m_NextIntro.router); + // we have no more paths to this endpoint so we want to pivot off of it + MarkCurrentIntroBad(Now()); + ShiftIntroRouter(endpoint); + if (m_NextIntro.router != endpoint) + BuildOneAlignedTo(m_NextIntro.router); } } } diff --git a/llarp/service/outbound_context.hpp b/llarp/service/outbound_context.hpp index 011aa74b8..6b5d00b84 100644 --- a/llarp/service/outbound_context.hpp +++ b/llarp/service/outbound_context.hpp @@ -57,7 +57,7 @@ namespace llarp /// shift the intro off the current router it is using void - ShiftIntroRouter(const RouterID remote); + ShiftIntroRouter(const RouterID remote) override; /// mark the current remote intro as bad void @@ -71,7 +71,7 @@ namespace llarp ReadyToSend() const; void - SetReadyHook(std::function readyHook, llarp_time_t timeout); + AddReadyHook(std::function readyHook, llarp_time_t timeout); /// for exits void @@ -129,19 +129,26 @@ namespace llarp llarp_time_t RTT() const; + bool + OnIntroSetUpdate( + const Address& addr, + std::optional i, + const RouterID& endpoint, + llarp_time_t, + uint64_t relayOrder); + private: /// swap remoteIntro with next intro void SwapIntros(); - void - OnGeneratedIntroFrame(AsyncKeyExchange* k, PathID_t p); - bool - OnIntroSetUpdate( - const Address& addr, std::optional i, const RouterID& endpoint, llarp_time_t); + IntroGenerated() const override; + bool + IntroSent() const override; const dht::Key_t location; + const Address addr; uint64_t m_UpdateIntrosetTX = 0; IntroSet currentIntroSet; Introduction m_NextIntro; @@ -151,8 +158,9 @@ namespace llarp uint16_t m_BuildFails = 0; llarp_time_t m_LastInboundTraffic = 0s; bool m_GotInboundTraffic = false; + bool generatedIntro = false; bool sentIntro = false; - std::function m_ReadyHook; + std::vector> m_ReadyHooks; llarp_time_t m_LastIntrosetUpdateAt = 0s; }; } // namespace service diff --git a/llarp/service/protocol.cpp b/llarp/service/protocol.cpp index f7543a3c8..89c6a09aa 100644 --- a/llarp/service/protocol.cpp +++ b/llarp/service/protocol.cpp @@ -458,15 +458,21 @@ namespace llarp } }; handler->Router()->QueueWork( - [v, msg = std::move(msg), recvPath = std::move(recvPath), callback]() { + [v, msg = std::move(msg), recvPath = std::move(recvPath), callback, handler]() { + auto resetTag = [handler, tag = v->frame.T, from = v->frame.F, path = recvPath]() { + handler->ResetConvoTag(tag, path, from); + }; + if (not v->frame.Verify(v->si)) { LogError("Signature failure from ", v->si.Addr()); + handler->Loop()->call_soon(resetTag); return; } if (not v->frame.DecryptPayloadInto(v->shared, *msg)) { - LogError("failed to decrypt message"); + LogError("failed to decrypt message from ", v->si.Addr()); + handler->Loop()->call_soon(resetTag); return; } callback(msg); diff --git a/llarp/service/protocol.hpp b/llarp/service/protocol.hpp index 623e2afa7..f2485c3af 100644 --- a/llarp/service/protocol.hpp +++ b/llarp/service/protocol.hpp @@ -96,7 +96,7 @@ namespace llarp version = other.version; } - ProtocolFrame() : routing::IMessage() + ProtocolFrame() : routing::IMessage{} { Clear(); } diff --git a/llarp/service/sendcontext.cpp b/llarp/service/sendcontext.cpp index 5425839e2..06330e054 100644 --- a/llarp/service/sendcontext.cpp +++ b/llarp/service/sendcontext.cpp @@ -26,13 +26,15 @@ namespace llarp bool SendContext::Send(std::shared_ptr msg, path::Path_ptr path) { + if (not path->IsReady()) + return false; if (m_SendQueue.empty() or m_SendQueue.full()) { - m_Endpoint->Loop()->call([this] { FlushUpstream(); }); + m_Endpoint->Loop()->call_soon([this] { FlushUpstream(); }); } - m_SendQueue.pushBack(std::make_pair( - std::make_shared(*msg, remoteIntro.pathID), path)); - return true; + return m_SendQueue.tryPushBack(std::make_pair( + std::make_shared(*msg, remoteIntro.pathID), path)) + == thread::QueueReturn::Success; } void @@ -84,13 +86,15 @@ namespace llarp auto path = m_PathSet->GetPathByRouter(remoteIntro.router); if (!path) { - LogWarn(m_Endpoint->Name(), " cannot encrypt and send: no path for intro ", remoteIntro); + ShiftIntroRouter(remoteIntro.router); + LogWarn(m_PathSet->Name(), " cannot encrypt and send: no path for intro ", remoteIntro); return; } if (!m_DataHandler->GetCachedSessionKeyFor(f->T, shared)) { - LogWarn(m_Endpoint->Name(), " has no cached session key on session T=", f->T); + LogWarn( + m_PathSet->Name(), " could not send, has no cached session key on session T=", f->T); return; } @@ -104,7 +108,7 @@ namespace llarp } else { - LogWarn(m_Endpoint->Name(), " no session T=", f->T); + LogWarn(m_PathSet->Name(), " could not get sequence number for session T=", f->T); return; } m->introReply = path->intro; @@ -115,7 +119,7 @@ namespace llarp m_Endpoint->Router()->QueueWork([f, m, shared, path, this] { if (not f->EncryptAndSign(*m, shared, m_Endpoint->GetIdentity())) { - LogError(m_Endpoint->Name(), " failed to sign message"); + LogError(m_PathSet->Name(), " failed to sign message"); return; } Send(f, path); @@ -140,11 +144,15 @@ namespace llarp void SendContext::AsyncEncryptAndSendTo(const llarp_buffer_t& data, ProtocolType protocol) { - if (lastGoodSend != 0s) + if (IntroSent()) { EncryptAndSendTo(data, protocol); return; } + // have we generated the initial intro but not sent it yet? bail here so we don't cause + // bullshittery + if (IntroGenerated() and not IntroSent()) + return; const auto maybe = m_Endpoint->MaybeGetAuthInfoForEndpoint(remoteIdent.Addr()); if (maybe.has_value()) { diff --git a/llarp/service/sendcontext.hpp b/llarp/service/sendcontext.hpp index 517bea988..aa38cc8b0 100644 --- a/llarp/service/sendcontext.hpp +++ b/llarp/service/sendcontext.hpp @@ -44,8 +44,9 @@ namespace llarp uint64_t sequenceNo = 0; llarp_time_t lastGoodSend = 0s; const llarp_time_t createdAt; - llarp_time_t sendTimeout = 40s; - llarp_time_t connectTimeout = 60s; + llarp_time_t sendTimeout = path::build_timeout * 2; + llarp_time_t connectTimeout = path::build_timeout * 4; + llarp_time_t shiftTimeout = (path::build_timeout * 5) / 2; llarp_time_t estimatedRTT = 0s; bool markedBad = false; using Msg_ptr = std::shared_ptr; @@ -62,6 +63,9 @@ namespace llarp return true; } + virtual void + ShiftIntroRouter(const RouterID) = 0; + virtual void UpdateIntroSet() = 0; @@ -72,6 +76,11 @@ namespace llarp AsyncSendAuth(std::function replyHandler); private: + virtual bool + IntroGenerated() const = 0; + virtual bool + IntroSent() const = 0; + void EncryptAndSendTo(const llarp_buffer_t& payload, ProtocolType t);