Merge pull request #1151 from majestrate/fix-up-introset-lookup-fails-2020-03-02

fix up introset lookups
Jeff authored 4 years ago; committed by GitHub
commit 00f08ca5f9
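At a glance: introset publish and lookup are now driven by shared constants in llarp::dht (IntroSetRelayRedundancy = 2 relays, IntroSetRequestsPerRelay = 2 requests each, IntroSetStorageRedundancy = 4 DHT locations), and a relay answering a relayed FindIntro serves exactly one of the four routers closest to the introset location, indexed by relayOrder, instead of the old two-or-four split. The snippet below is a self-contained sketch of that mapping; RouterContact and PickTargetForRelayOrder are placeholder names for illustration, not llarp's actual API.

#include <array>
#include <cstddef>
#include <optional>
#include <vector>

// The constants this PR introduces in dht::AbstractContext.
constexpr std::size_t IntroSetRelayRedundancy  = 2;  // relays to publish through
constexpr std::size_t IntroSetRequestsPerRelay = 2;  // distinct relayOrders per relay
constexpr std::size_t IntroSetStorageRedundancy =
    IntroSetRelayRedundancy * IntroSetRequestsPerRelay;  // == 4 dht locations

struct RouterContact  // stand-in for llarp's RouterContact
{
  std::array< unsigned char, 32 > pubkey{};
};

// Relay-side selection for a relayed FindIntro: given the routers closest to
// the introset's dht location, serve exactly the relayOrder-th one, instead of
// the old "first two or next two depending on relayOrder" behaviour.
std::optional< RouterContact >
PickTargetForRelayOrder(const std::vector< RouterContact >& closestRCs,
                        std::size_t relayOrder)
{
  if(relayOrder >= IntroSetStorageRedundancy)
    return std::nullopt;  // invalid order: reply with an empty GotIntroMessage
  if(closestRCs.size() <= relayOrder)
    return std::nullopt;  // we don't know enough routers to fulfill this order
  return closestRCs[relayOrder];
}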

@@ -23,6 +23,15 @@ namespace llarp
   namespace dht
   {
+    /// number of routers to publish to
+    static constexpr size_t IntroSetRelayRedundancy = 2;
+    /// number of dht locations handled per relay
+    static constexpr size_t IntroSetRequestsPerRelay = 2;
+    static constexpr size_t IntroSetStorageRedundancy =
+        (IntroSetRelayRedundancy * IntroSetRequestsPerRelay);
     struct AbstractContext
     {
       using PendingIntrosetLookups =

@@ -98,35 +98,27 @@ namespace llarp
       // we are relaying this message for e.g. a client
       if(relayed)
       {
-        uint32_t numDesired = 0;
-        if(relayOrder == 0)
-          numDesired = 2;
-        else if(relayOrder == 1)
-          numDesired = 4;
-        else
+        if(relayOrder >= IntroSetStorageRedundancy)
         {
-          // TODO: consider forward-compatibility here
-          LogError("Error: relayOrder must be 0 or 1");
-          return false;
+          llarp::LogWarn("Invalid relayOrder received: ", relayOrder);
+          replies.emplace_back(new GotIntroMessage({}, txID));
+          return true;
         }
-        auto closestRCs =
-            dht.GetRouter()->nodedb()->FindClosestTo(location, numDesired);
+        auto closestRCs = dht.GetRouter()->nodedb()->FindClosestTo(
+            location, IntroSetStorageRedundancy);
-        // if relayOrder == 1, we want the 3rd and 4th closest, so remove the
-        // 1st and 2nd closest
-        if(relayOrder == 1)
+        if(closestRCs.size() <= relayOrder)
         {
-          auto itr = closestRCs.begin();
-          std::advance(itr, 2);
-          closestRCs.erase(closestRCs.begin(), itr);
+          llarp::LogWarn("Can't fulfill FindIntro for relayOrder: ",
+                         relayOrder);
+          replies.emplace_back(new GotIntroMessage({}, txID));
+          return true;
         }
-        for(const auto& entry : closestRCs)
-        {
-          Key_t peer = Key_t(entry.pubkey);
-          dht.LookupIntroSetForPath(location, txID, pathID, peer, 0);
-        }
+        const auto& entry = closestRCs[relayOrder];
+        Key_t peer = Key_t(entry.pubkey);
+        dht.LookupIntroSetForPath(location, txID, pathID, peer, 0);
       }
       else
       {

@@ -81,10 +81,9 @@ namespace llarp
     const llarp::dht::Key_t addr(introset.derivedSigningKey);
     // identify closest 4 routers
-    static constexpr size_t StorageRedundancy = 4;
-    auto closestRCs =
-        dht.GetRouter()->nodedb()->FindClosestTo(addr, StorageRedundancy);
-    if(closestRCs.size() != StorageRedundancy)
+    auto closestRCs = dht.GetRouter()->nodedb()->FindClosestTo(
+        addr, IntroSetStorageRedundancy);
+    if(closestRCs.size() != IntroSetStorageRedundancy)
     {
       llarp::LogWarn("Received PublishIntroMessage but only know ",
                      closestRCs.size(), " nodes");
@@ -96,7 +95,7 @@ namespace llarp
     // function to identify the closest 4 routers we know of for this introset
     auto propagateIfNotUs = [&](size_t index) {
-      assert(index < StorageRedundancy);
+      assert(index < IntroSetStorageRedundancy);
       const auto &rc = closestRCs[index];
       const Key_t peer{rc.pubkey};
@@ -125,7 +124,7 @@ namespace llarp
     if(relayed)
     {
-      if(relayOrder >= StorageRedundancy)
+      if(relayOrder >= IntroSetStorageRedundancy)
       {
         llarp::LogWarn(
             "Received PublishIntroMessage with invalid relayOrder: ",
@@ -166,7 +165,7 @@ namespace llarp
           "!!! Received PubIntro with relayed==false but we aren't"
           " candidate, intro derived key: ",
           keyStr, ", txid=", txID, ", message from: ", From);
-      for(size_t i = 0; i < StorageRedundancy; ++i)
+      for(size_t i = 0; i < IntroSetStorageRedundancy; ++i)
       {
         propagateIfNotUs(i);
       }

@@ -264,7 +264,8 @@ namespace llarp
     if(self->context->CheckPathLimitHitByIP(self->fromAddr.value()))
     {
       // we hit a limit so tell it to slow tf down
-      llarp::LogError("client path build hit limit ", self->hop->info);
+      llarp::LogError("client path build hit limit ",
+                      self->fromAddr.value());
       OnForwardLRCMResult(self->context->Router(), self->hop->info.rxID,
                           self->hop->info.downstream, self->hop->pathKey,
                           SendStatus::Congestion);

@@ -1,6 +1,7 @@
+#include <chrono>
 #include <service/endpoint.hpp>
 #include <dht/context.hpp>
 #include <dht/messages/findintro.hpp>
 #include <dht/messages/findrouter.hpp>
 #include <dht/messages/gotintro.hpp>
@@ -184,6 +185,8 @@ namespace llarp
       RegenAndPublishIntroSet();
     }
+    m_state->m_RemoteLookupFilter.Decay(now);
     // expire snode sessions
     EndpointUtil::ExpireSNodeSessions(now, m_state->m_SNodeSessions);
     // expire pending tx
@@ -418,6 +421,7 @@ namespace llarp
   bool
   Endpoint::Start()
   {
+    m_state->m_RemoteLookupFilter.DecayInterval(500ms);
     // how can I tell if a m_Identity isn't loaded?
     if(!m_DataHandler)
     {
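The new m_RemoteLookupFilter (declared further down as util::DecayingHashSet< Address >) gets a 500ms decay interval at Start and is decayed alongside the other periodic expiry work, so repeated lookups for the same address within a short window can be filtered in EnsurePathToService. Below is a generic sketch of such a decaying-set filter, assuming steady_clock timestamps and a fixed interval; it illustrates the pattern only and is not llarp's util::DecayingHashSet, whose exact Insert() semantics are not shown in this diff.

#include <chrono>
#include <unordered_map>

// Generic decaying-set filter (illustration only; not llarp's
// util::DecayingHashSet, whose internals this diff does not show).
template < typename Key >
class DecayingSetSketch
{
 public:
  using Clock = std::chrono::steady_clock;

  void
  DecayInterval(Clock::duration interval)
  {
    m_Interval = interval;
  }

  /// remember this key with the current timestamp; returns true if it was not
  /// already being remembered
  bool
  Insert(const Key& k, Clock::time_point now = Clock::now())
  {
    return m_Entries.try_emplace(k, now).second;
  }

  /// forget every key older than the decay interval
  void
  Decay(Clock::time_point now = Clock::now())
  {
    for(auto itr = m_Entries.begin(); itr != m_Entries.end();)
    {
      if(now - itr->second >= m_Interval)
        itr = m_Entries.erase(itr);
      else
        ++itr;
    }
  }

 private:
  Clock::duration m_Interval = std::chrono::milliseconds(500);
  std::unordered_map< Key, Clock::time_point > m_Entries;
};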
@@ -451,23 +455,13 @@ namespace llarp
   Endpoint::PublishIntroSet(const EncryptedIntroSet& introset,
                             AbstractRouter* r)
   {
-    /// number of routers to publish to
-    static constexpr size_t RelayRedundancy = 2;
-    /// number of dht locations handled per relay
-    static constexpr size_t RequestsPerRelay = 2;
-    /// total number of dht locations that should store this introset
-    static constexpr size_t StorageRedundancy =
-        (RelayRedundancy * RequestsPerRelay);
-    assert(StorageRedundancy == 4);
-    const auto paths = GetManyPathsWithUniqueEndpoints(this, RelayRedundancy);
+    const auto paths = GetManyPathsWithUniqueEndpoints(
+        this, llarp::dht::IntroSetRelayRedundancy);
-    if(paths.size() != RelayRedundancy)
+    if(paths.size() != llarp::dht::IntroSetRelayRedundancy)
     {
       LogWarn("Cannot publish intro set because we only have ", paths.size(),
-              " paths, but need ", RelayRedundancy);
+              " paths, but need ", llarp::dht::IntroSetRelayRedundancy);
       return false;
     }
@@ -475,16 +469,16 @@ namespace llarp
     size_t published = 0;
     for(const auto& path : paths)
     {
-      for(size_t i = 0; i < RequestsPerRelay; ++i)
+      for(size_t i = 0; i < llarp::dht::IntroSetRequestsPerRelay; ++i)
       {
         if(PublishIntroSetVia(introset, r, path, published))
           published++;
       }
     }
-    if(published != StorageRedundancy)
+    if(published != llarp::dht::IntroSetStorageRedundancy)
       LogWarn("Publish introset failed: could only publish ", published,
-              " copies but wanted ", StorageRedundancy);
-    return published == StorageRedundancy;
+              " copies but wanted ", llarp::dht::IntroSetStorageRedundancy);
+    return published == llarp::dht::IntroSetStorageRedundancy;
   }
   struct PublishIntroSetJob : public IServiceLookup
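The publish side now just multiplies the two constants: each of the two relay paths is asked to publish twice, and the running count is passed through as the last argument to PublishIntroSetVia (which appears to serve as the relayOrder), so success means all four copies were sent. A minimal sketch of that counting, with the constants restated locally and publishVia standing in for PublishIntroSetVia:

#include <cstddef>
#include <functional>
#include <vector>

// Sketch: publish through each selected relay path a fixed number of times,
// passing the running count along. "Path" and "publishVia" are placeholders
// for the real llarp types.
constexpr std::size_t kRelayRedundancy   = 2;  // dht::IntroSetRelayRedundancy
constexpr std::size_t kRequestsPerRelay  = 2;  // dht::IntroSetRequestsPerRelay
constexpr std::size_t kStorageRedundancy = kRelayRedundancy * kRequestsPerRelay;

template < typename Path >
bool
PublishToClosest(const std::vector< Path >& paths,
                 const std::function< bool(const Path&, std::size_t) >& publishVia)
{
  if(paths.size() != kRelayRedundancy)
    return false;  // need two distinct relay endpoints to start with
  std::size_t published = 0;
  for(const auto& path : paths)
  {
    for(std::size_t i = 0; i < kRequestsPerRelay; ++i)
    {
      if(publishVia(path, published))
        published++;
    }
  }
  return published == kStorageRedundancy;  // all four copies went out
}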
@@ -961,6 +955,11 @@ namespace llarp
       }
       return false;
     }
+    // check for established outbound context
+    if(m_state->m_RemoteSessions.count(addr) > 0)
+      return true;
     PutNewOutboundContext(introset.value());
     return true;
   }
@@ -981,7 +980,10 @@ namespace llarp
   Endpoint::EnsurePathToService(const Address remote, PathEnsureHook hook,
                                 llarp_time_t /*timeoutMS*/)
   {
-    static constexpr size_t NumParalellLookups = 2;
+    /// how many routers to use for lookups
+    static constexpr size_t NumParallelLookups = 2;
+    /// how many requests per router
+    static constexpr size_t RequestsPerLookup = 2;
     LogInfo(Name(), " Ensure Path to ", remote.ToString());
     MarkAddressOutbound(remote);
@@ -997,30 +999,39 @@ namespace llarp
       }
     }
+    // filter check for address
+    if(m_state->m_RemoteLookupFilter.Insert(remote))
+      return false;
     auto& lookups = m_state->m_PendingServiceLookups;
     const auto paths =
-        GetManyPathsWithUniqueEndpoints(this, NumParalellLookups);
+        GetManyPathsWithUniqueEndpoints(this, NumParallelLookups);
     using namespace std::placeholders;
     size_t lookedUp = 0;
     const dht::Key_t location = remote.ToKey();
+    uint64_t order = 0;
     for(const auto& path : paths)
     {
-      HiddenServiceAddressLookup* job = new HiddenServiceAddressLookup(
-          this, util::memFn(&Endpoint::OnLookup, this), location,
-          PubKey{remote.as_array()}, 0, GenTXID());
-      LogInfo("doing lookup for ", remote, " via ", path->Endpoint(), " at ",
-              location);
-      if(job->SendRequestViaPath(path, Router()))
+      for(size_t count = 0; count < RequestsPerLookup; ++count)
       {
-        lookups.emplace(remote, hook);
-        lookedUp++;
+        HiddenServiceAddressLookup* job = new HiddenServiceAddressLookup(
+            this, util::memFn(&Endpoint::OnLookup, this), location,
+            PubKey{remote.as_array()}, order, GenTXID());
+        LogInfo("doing lookup for ", remote, " via ", path->Endpoint(),
+                " at ", location, " order=", order);
+        order++;
+        if(job->SendRequestViaPath(path, Router()))
+        {
+          lookups.emplace(remote, hook);
+          lookedUp++;
+        }
+        else
+          LogError(Name(), " send via path failed for lookup");
      }
-      else
-        LogError(Name(), " send via path failed for lookup");
    }
-    return lookedUp == NumParalellLookups;
+    return lookedUp == (NumParallelLookups * RequestsPerLookup);
   }
   bool
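On the lookup side, the two chosen paths each send RequestsPerLookup requests, and a single running counter gives every request a distinct order in [0, 4); combined with the FindIntro change above, the four requests between them cover the four routers closest to the hidden service's DHT key. A small sketch of that assignment (AssignRelayOrders is a made-up helper for illustration):

#include <cstddef>
#include <utility>
#include <vector>

// Sketch of how relayOrder values get distributed across lookup requests:
// with 2 paths and 2 requests per path, the running counter hands out
// orders 0..3, one per request.
std::vector< std::pair< std::size_t, std::size_t > >  // (path index, relayOrder)
AssignRelayOrders(std::size_t numPaths, std::size_t requestsPerLookup)
{
  std::vector< std::pair< std::size_t, std::size_t > > assignments;
  std::size_t order = 0;
  for(std::size_t pathIdx = 0; pathIdx < numPaths; ++pathIdx)
  {
    for(std::size_t req = 0; req < requestsPerLookup; ++req)
      assignments.emplace_back(pathIdx, order++);
  }
  return assignments;  // AssignRelayOrders(2, 2) -> {0,0} {0,1} {1,2} {1,3}
}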

@@ -98,6 +98,8 @@ namespace llarp
     std::unordered_map< Tag, CachedTagResult, Tag::Hash > m_PrefetchedTags;
+    util::DecayingHashSet< Address > m_RemoteLookupFilter;
     bool
     SetOption(const std::string& k, const std::string& v, Endpoint& ep);

@@ -88,7 +88,7 @@ namespace llarp
   void
   EndpointUtil::TickRemoteSessions(llarp_time_t now, Sessions& remoteSessions,
-                                   Sessions& deadSessions, ConvoMap & sessions)
+                                   Sessions& deadSessions, ConvoMap& sessions)
   {
     auto itr = remoteSessions.begin();
     while(itr != remoteSessions.end())

@@ -23,7 +23,7 @@ namespace llarp
     static void
     TickRemoteSessions(llarp_time_t now, Sessions& remoteSessions,
-                       Sessions& deadSessions, ConvoMap & sessions);
+                       Sessions& deadSessions, ConvoMap& sessions);
     static void
     ExpireConvoSessions(llarp_time_t now, ConvoMap& sessions);

@@ -16,7 +16,6 @@ namespace llarp
       , relayOrder(order)
       , location(l)
       , handle(std::move(h))
-      , requestsSent(0)
   {
   }
@@ -39,19 +38,7 @@ namespace llarp
       if(maybe.has_value())
        found = maybe.value();
     }
-    handle(remote, found, endpoint);
-    requestsSent--;
-    return requestsSent == 0;
-  }
-  bool
-  HiddenServiceAddressLookup::SendRequestViaPath(path::Path_ptr p,
-                                                 AbstractRouter* r)
-  {
-    if(not IServiceLookup::SendRequestViaPath(p, r))
-      return false;
-    requestsSent += 2;
-    return true;
+    return handle(remote, found, endpoint);
   }
   std::shared_ptr< routing::IMessage >

@@ -18,7 +18,6 @@ namespace llarp
     using HandlerFunc = std::function< bool(
         const Address&, nonstd::optional< IntroSet >, const RouterID&) >;
     HandlerFunc handle;
-    size_t requestsSent;
     HiddenServiceAddressLookup(Endpoint* p, HandlerFunc h,
                                const dht::Key_t& location,
@@ -27,9 +26,6 @@ namespace llarp
     ~HiddenServiceAddressLookup() override = default;
-    bool
-    SendRequestViaPath(path::Path_ptr p, AbstractRouter* r) override;
     bool
     HandleResponse(const std::set< EncryptedIntroSet >& results) override;
