diff --git a/llarp/dht/context.hpp b/llarp/dht/context.hpp index 231a224fb..fd23025af 100644 --- a/llarp/dht/context.hpp +++ b/llarp/dht/context.hpp @@ -23,6 +23,15 @@ namespace llarp namespace dht { + /// number of routers to publish to + static constexpr size_t IntroSetRelayRedundancy = 2; + + /// number of dht locations handled per relay + static constexpr size_t IntroSetRequestsPerRelay = 2; + + static constexpr size_t IntroSetStorageRedundancy = + (IntroSetRelayRedundancy * IntroSetRequestsPerRelay); + struct AbstractContext { using PendingIntrosetLookups = diff --git a/llarp/dht/messages/findintro.cpp b/llarp/dht/messages/findintro.cpp index 998120e41..fc905616d 100644 --- a/llarp/dht/messages/findintro.cpp +++ b/llarp/dht/messages/findintro.cpp @@ -98,35 +98,27 @@ namespace llarp // we are relaying this message for e.g. a client if(relayed) { - uint32_t numDesired = 0; - if(relayOrder == 0) - numDesired = 2; - else if(relayOrder == 1) - numDesired = 4; - else + if(relayOrder >= IntroSetStorageRedundancy) { - // TODO: consider forward-compatibility here - LogError("Error: relayOrder must be 0 or 1"); - return false; + llarp::LogWarn("Invalid relayOrder received: ", relayOrder); + replies.emplace_back(new GotIntroMessage({}, txID)); + return true; } - auto closestRCs = - dht.GetRouter()->nodedb()->FindClosestTo(location, numDesired); + auto closestRCs = dht.GetRouter()->nodedb()->FindClosestTo( + location, IntroSetStorageRedundancy); - // if relayOrder == 1, we want the 3rd and 4th closest, so remove the - // 1st and 2nd closest - if(relayOrder == 1) + if(closestRCs.size() <= relayOrder) { - auto itr = closestRCs.begin(); - std::advance(itr, 2); - closestRCs.erase(closestRCs.begin(), itr); + llarp::LogWarn("Can't fulfill FindIntro for relayOrder: ", + relayOrder); + replies.emplace_back(new GotIntroMessage({}, txID)); + return true; } - for(const auto& entry : closestRCs) - { - Key_t peer = Key_t(entry.pubkey); - dht.LookupIntroSetForPath(location, txID, pathID, peer, 0); - } + const auto& entry = closestRCs[relayOrder]; + Key_t peer = Key_t(entry.pubkey); + dht.LookupIntroSetForPath(location, txID, pathID, peer, 0); } else { diff --git a/llarp/dht/messages/pubintro.cpp b/llarp/dht/messages/pubintro.cpp index 89b93348f..553e757f5 100644 --- a/llarp/dht/messages/pubintro.cpp +++ b/llarp/dht/messages/pubintro.cpp @@ -81,10 +81,9 @@ namespace llarp const llarp::dht::Key_t addr(introset.derivedSigningKey); // identify closest 4 routers - static constexpr size_t StorageRedundancy = 4; - auto closestRCs = - dht.GetRouter()->nodedb()->FindClosestTo(addr, StorageRedundancy); - if(closestRCs.size() != StorageRedundancy) + auto closestRCs = dht.GetRouter()->nodedb()->FindClosestTo( + addr, IntroSetStorageRedundancy); + if(closestRCs.size() != IntroSetStorageRedundancy) { llarp::LogWarn("Received PublishIntroMessage but only know ", closestRCs.size(), " nodes"); @@ -96,7 +95,7 @@ namespace llarp // function to identify the closest 4 routers we know of for this introset auto propagateIfNotUs = [&](size_t index) { - assert(index < StorageRedundancy); + assert(index < IntroSetStorageRedundancy); const auto &rc = closestRCs[index]; const Key_t peer{rc.pubkey}; @@ -125,7 +124,7 @@ namespace llarp if(relayed) { - if(relayOrder >= StorageRedundancy) + if(relayOrder >= IntroSetStorageRedundancy) { llarp::LogWarn( "Received PublishIntroMessage with invalid relayOrder: ", @@ -166,7 +165,7 @@ namespace llarp "!!! Received PubIntro with relayed==false but we aren't" " candidate, intro derived key: ", keyStr, ", txid=", txID, ", message from: ", From); - for(size_t i = 0; i < StorageRedundancy; ++i) + for(size_t i = 0; i < IntroSetStorageRedundancy; ++i) { propagateIfNotUs(i); } diff --git a/llarp/messages/relay_commit.cpp b/llarp/messages/relay_commit.cpp index eee8a41be..076b1c450 100644 --- a/llarp/messages/relay_commit.cpp +++ b/llarp/messages/relay_commit.cpp @@ -264,7 +264,8 @@ namespace llarp if(self->context->CheckPathLimitHitByIP(self->fromAddr.value())) { // we hit a limit so tell it to slow tf down - llarp::LogError("client path build hit limit ", self->hop->info); + llarp::LogError("client path build hit limit ", + self->fromAddr.value()); OnForwardLRCMResult(self->context->Router(), self->hop->info.rxID, self->hop->info.downstream, self->hop->pathKey, SendStatus::Congestion); diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index c3983da62..29dc8a75d 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -1,6 +1,7 @@ #include #include +#include #include #include #include @@ -184,6 +185,8 @@ namespace llarp RegenAndPublishIntroSet(); } + m_state->m_RemoteLookupFilter.Decay(now); + // expire snode sessions EndpointUtil::ExpireSNodeSessions(now, m_state->m_SNodeSessions); // expire pending tx @@ -418,6 +421,7 @@ namespace llarp bool Endpoint::Start() { + m_state->m_RemoteLookupFilter.DecayInterval(500ms); // how can I tell if a m_Identity isn't loaded? if(!m_DataHandler) { @@ -451,23 +455,13 @@ namespace llarp Endpoint::PublishIntroSet(const EncryptedIntroSet& introset, AbstractRouter* r) { - /// number of routers to publish to - static constexpr size_t RelayRedundancy = 2; - - /// number of dht locations handled per relay - static constexpr size_t RequestsPerRelay = 2; - - /// total number of dht locations that should store this introset - static constexpr size_t StorageRedundancy = - (RelayRedundancy * RequestsPerRelay); - assert(StorageRedundancy == 4); - - const auto paths = GetManyPathsWithUniqueEndpoints(this, RelayRedundancy); + const auto paths = GetManyPathsWithUniqueEndpoints( + this, llarp::dht::IntroSetRelayRedundancy); - if(paths.size() != RelayRedundancy) + if(paths.size() != llarp::dht::IntroSetRelayRedundancy) { LogWarn("Cannot publish intro set because we only have ", paths.size(), - " paths, but need ", RelayRedundancy); + " paths, but need ", llarp::dht::IntroSetRelayRedundancy); return false; } @@ -475,16 +469,16 @@ namespace llarp size_t published = 0; for(const auto& path : paths) { - for(size_t i = 0; i < RequestsPerRelay; ++i) + for(size_t i = 0; i < llarp::dht::IntroSetRequestsPerRelay; ++i) { if(PublishIntroSetVia(introset, r, path, published)) published++; } } - if(published != StorageRedundancy) + if(published != llarp::dht::IntroSetStorageRedundancy) LogWarn("Publish introset failed: could only publish ", published, - " copies but wanted ", StorageRedundancy); - return published == StorageRedundancy; + " copies but wanted ", llarp::dht::IntroSetStorageRedundancy); + return published == llarp::dht::IntroSetStorageRedundancy; } struct PublishIntroSetJob : public IServiceLookup @@ -961,6 +955,11 @@ namespace llarp } return false; } + // check for established outbound context + + if(m_state->m_RemoteSessions.count(addr) > 0) + return true; + PutNewOutboundContext(introset.value()); return true; } @@ -981,7 +980,10 @@ namespace llarp Endpoint::EnsurePathToService(const Address remote, PathEnsureHook hook, llarp_time_t /*timeoutMS*/) { - static constexpr size_t NumParalellLookups = 2; + /// how many routers to use for lookups + static constexpr size_t NumParallelLookups = 2; + /// how many requests per router + static constexpr size_t RequestsPerLookup = 2; LogInfo(Name(), " Ensure Path to ", remote.ToString()); MarkAddressOutbound(remote); @@ -997,30 +999,39 @@ namespace llarp } } + // filter check for address + if(m_state->m_RemoteLookupFilter.Insert(remote)) + return false; + auto& lookups = m_state->m_PendingServiceLookups; const auto paths = - GetManyPathsWithUniqueEndpoints(this, NumParalellLookups); + GetManyPathsWithUniqueEndpoints(this, NumParallelLookups); using namespace std::placeholders; size_t lookedUp = 0; const dht::Key_t location = remote.ToKey(); + uint64_t order = 0; for(const auto& path : paths) { - HiddenServiceAddressLookup* job = new HiddenServiceAddressLookup( - this, util::memFn(&Endpoint::OnLookup, this), location, - PubKey{remote.as_array()}, 0, GenTXID()); - LogInfo("doing lookup for ", remote, " via ", path->Endpoint(), " at ", - location); - if(job->SendRequestViaPath(path, Router())) + for(size_t count = 0; count < RequestsPerLookup; ++count) { - lookups.emplace(remote, hook); - lookedUp++; + HiddenServiceAddressLookup* job = new HiddenServiceAddressLookup( + this, util::memFn(&Endpoint::OnLookup, this), location, + PubKey{remote.as_array()}, order, GenTXID()); + LogInfo("doing lookup for ", remote, " via ", path->Endpoint(), + " at ", location, " order=", order); + order++; + if(job->SendRequestViaPath(path, Router())) + { + lookups.emplace(remote, hook); + lookedUp++; + } + else + LogError(Name(), " send via path failed for lookup"); } - else - LogError(Name(), " send via path failed for lookup"); } - return lookedUp == NumParalellLookups; + return lookedUp == (NumParallelLookups * RequestsPerLookup); } bool diff --git a/llarp/service/endpoint_state.hpp b/llarp/service/endpoint_state.hpp index 9323f804a..e0d7e8127 100644 --- a/llarp/service/endpoint_state.hpp +++ b/llarp/service/endpoint_state.hpp @@ -98,6 +98,8 @@ namespace llarp std::unordered_map< Tag, CachedTagResult, Tag::Hash > m_PrefetchedTags; + util::DecayingHashSet< Address > m_RemoteLookupFilter; + bool SetOption(const std::string& k, const std::string& v, Endpoint& ep); diff --git a/llarp/service/endpoint_util.cpp b/llarp/service/endpoint_util.cpp index 8905c4037..0314d5886 100644 --- a/llarp/service/endpoint_util.cpp +++ b/llarp/service/endpoint_util.cpp @@ -88,7 +88,7 @@ namespace llarp void EndpointUtil::TickRemoteSessions(llarp_time_t now, Sessions& remoteSessions, - Sessions& deadSessions, ConvoMap & sessions) + Sessions& deadSessions, ConvoMap& sessions) { auto itr = remoteSessions.begin(); while(itr != remoteSessions.end()) diff --git a/llarp/service/endpoint_util.hpp b/llarp/service/endpoint_util.hpp index 66d24f2d9..fdc1c7b7d 100644 --- a/llarp/service/endpoint_util.hpp +++ b/llarp/service/endpoint_util.hpp @@ -23,7 +23,7 @@ namespace llarp static void TickRemoteSessions(llarp_time_t now, Sessions& remoteSessions, - Sessions& deadSessions, ConvoMap & sessions); + Sessions& deadSessions, ConvoMap& sessions); static void ExpireConvoSessions(llarp_time_t now, ConvoMap& sessions); diff --git a/llarp/service/hidden_service_address_lookup.cpp b/llarp/service/hidden_service_address_lookup.cpp index 4950d2049..d1e2759d0 100644 --- a/llarp/service/hidden_service_address_lookup.cpp +++ b/llarp/service/hidden_service_address_lookup.cpp @@ -16,7 +16,6 @@ namespace llarp , relayOrder(order) , location(l) , handle(std::move(h)) - , requestsSent(0) { } @@ -39,19 +38,7 @@ namespace llarp if(maybe.has_value()) found = maybe.value(); } - handle(remote, found, endpoint); - requestsSent--; - return requestsSent == 0; - } - - bool - HiddenServiceAddressLookup::SendRequestViaPath(path::Path_ptr p, - AbstractRouter* r) - { - if(not IServiceLookup::SendRequestViaPath(p, r)) - return false; - requestsSent += 2; - return true; + return handle(remote, found, endpoint); } std::shared_ptr< routing::IMessage > diff --git a/llarp/service/hidden_service_address_lookup.hpp b/llarp/service/hidden_service_address_lookup.hpp index 76970b660..55be3df13 100644 --- a/llarp/service/hidden_service_address_lookup.hpp +++ b/llarp/service/hidden_service_address_lookup.hpp @@ -18,7 +18,6 @@ namespace llarp using HandlerFunc = std::function< bool( const Address&, nonstd::optional< IntroSet >, const RouterID&) >; HandlerFunc handle; - size_t requestsSent; HiddenServiceAddressLookup(Endpoint* p, HandlerFunc h, const dht::Key_t& location, @@ -27,9 +26,6 @@ namespace llarp ~HiddenServiceAddressLookup() override = default; - bool - SendRequestViaPath(path::Path_ptr p, AbstractRouter* r) override; - bool HandleResponse(const std::set< EncryptedIntroSet >& results) override;