* tweak introset handover timeouts

* introset path haodver tweaks
* improve warn/error messages to convey more information
* dont block on queue insertion
* reset convotag on decrypt/verify fail
* add multiple ready hooks on outbound context
* lookup introsets from close routers on dht
* continue to tick dead sessions so they expire their paths
* introset spacing
* reduce lns lookup diversity requirement for speed
* add a function to send reset convotag message
* only have 1 outbound context at a time
pull/1658/head
Jeff Becker 3 years ago
parent 9a1a022d62
commit 8dd1358cc6
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -101,11 +101,16 @@ namespace llarp
{ {
const auto now = llarp::time_now_ms(); const auto now = llarp::time_now_ms();
m_LastIntrosetRegenAttempt = now; m_LastIntrosetRegenAttempt = now;
std::set<Introduction> introset; std::set<Introduction, CompareIntroTimestamp> intros;
if (!GetCurrentIntroductionsWithFilter( if (const auto maybe =
introset, [now](const service::Introduction& intro) -> bool { GetCurrentIntroductionsWithFilter([now](const service::Introduction& intro) -> bool {
return not intro.ExpiresSoon(now, path::min_intro_lifetime); return not intro.ExpiresSoon(
now, path::default_lifetime - path::min_intro_lifetime);
})) }))
{
intros.insert(maybe->begin(), maybe->end());
}
else
{ {
LogWarn( LogWarn(
"could not publish descriptors for endpoint ", "could not publish descriptors for endpoint ",
@ -146,9 +151,10 @@ namespace llarp
} }
introSet().intros.clear(); introSet().intros.clear();
for (auto& intro : introset) for (auto& intro : intros)
{ {
introSet().intros.emplace_back(std::move(intro)); if (introSet().intros.size() < numDesiredPaths)
introSet().intros.emplace_back(std::move(intro));
} }
if (introSet().intros.empty()) if (introSet().intros.empty())
{ {
@ -710,8 +716,10 @@ namespace llarp
return false; return false;
auto next_pub = m_state->m_LastPublishAttempt auto next_pub = m_state->m_LastPublishAttempt
+ (m_state->m_IntroSet.HasExpiredIntros(now) ? INTROSET_PUBLISH_RETRY_INTERVAL + (m_state->m_IntroSet.HasStaleIntros(
: INTROSET_PUBLISH_INTERVAL); now, path::default_lifetime - path::intro_path_spread)
? IntrosetPublishRetryCooldown
: IntrosetPublishInterval);
return now >= next_pub and m_LastIntrosetRegenAttempt + 1s <= now; return now >= next_pub and m_LastIntrosetRegenAttempt + 1s <= now;
} }
@ -739,8 +747,11 @@ namespace llarp
{ {
std::unordered_set<RouterID> exclude; std::unordered_set<RouterID> exclude;
ForEachPath([&exclude](auto path) { exclude.insert(path->Endpoint()); }); ForEachPath([&exclude](auto path) { exclude.insert(path->Endpoint()); });
const auto maybe = m_router->nodedb()->GetRandom( const auto maybe =
[exclude](const auto& rc) -> bool { return exclude.count(rc.pubkey) == 0; }); m_router->nodedb()->GetRandom([exclude, r = m_router](const auto& rc) -> bool {
return exclude.count(rc.pubkey) == 0
and not r->routerProfiling().IsBadForPath(rc.pubkey);
});
if (not maybe.has_value()) if (not maybe.has_value())
return std::nullopt; return std::nullopt;
return GetHopsForBuildWithEndpoint(maybe->pubkey); return GetHopsForBuildWithEndpoint(maybe->pubkey);
@ -758,46 +769,27 @@ namespace llarp
path::Builder::PathBuildStarted(path); path::Builder::PathBuildStarted(path);
} }
constexpr auto MaxOutboundContextPerRemote = 4; constexpr auto MaxOutboundContextPerRemote = 1;
void void
Endpoint::PutNewOutboundContext(const service::IntroSet& introset, llarp_time_t left) Endpoint::PutNewOutboundContext(const service::IntroSet& introset, llarp_time_t left)
{ {
Address addr{introset.addressKeys.Addr()}; const Address addr{introset.addressKeys.Addr()};
auto& remoteSessions = m_state->m_RemoteSessions; auto& remoteSessions = m_state->m_RemoteSessions;
auto& serviceLookups = m_state->m_PendingServiceLookups;
if (remoteSessions.count(addr) >= MaxOutboundContextPerRemote) if (remoteSessions.count(addr) < MaxOutboundContextPerRemote)
{ {
auto itr = remoteSessions.find(addr); remoteSessions.emplace(addr, std::make_shared<OutboundContext>(introset, this));
LogInfo("Created New outbound context for ", addr.ToString());
auto range = serviceLookups.equal_range(addr);
auto i = range.first;
while (i != range.second)
{
itr->second->SetReadyHook(
[callback = i->second, addr](auto session) { callback(addr, session); }, left);
++i;
}
serviceLookups.erase(addr);
return;
} }
auto session = std::make_shared<OutboundContext>(introset, this); auto sessionRange = remoteSessions.equal_range(addr);
remoteSessions.emplace(addr, session); for (auto itr = sessionRange.first; itr != sessionRange.second; ++itr)
LogInfo("Created New outbound context for ", addr.ToString());
// inform pending
auto range = serviceLookups.equal_range(addr);
auto itr = range.first;
if (itr != range.second)
{ {
session->SetReadyHook( itr->second->AddReadyHook(
[callback = itr->second, addr](auto session) { callback(addr, session); }, left); [addr, this](auto session) { InformPathToService(addr, session); }, left);
++itr;
} }
serviceLookups.erase(addr);
} }
void void
@ -924,7 +916,7 @@ namespace llarp
paths.insert(path); paths.insert(path);
}); });
constexpr size_t min_unique_lns_endpoints = 3; constexpr size_t min_unique_lns_endpoints = 2;
// not enough paths // not enough paths
if (paths.size() < min_unique_lns_endpoints) if (paths.size() < min_unique_lns_endpoints)
@ -1066,11 +1058,11 @@ namespace llarp
void void
Endpoint::QueueRecvData(RecvDataEvent ev) Endpoint::QueueRecvData(RecvDataEvent ev)
{ {
if (m_RecvQueue.full() || m_RecvQueue.empty()) if (m_RecvQueue.full() or m_RecvQueue.empty())
{ {
m_router->loop()->call([this] { FlushRecvData(); }); m_router->loop()->call_soon([this] { FlushRecvData(); });
} }
m_RecvQueue.pushBack(std::move(ev)); m_RecvQueue.tryPushBack(std::move(ev));
} }
bool bool
@ -1079,13 +1071,16 @@ namespace llarp
{ {
msg->sender.UpdateAddr(); msg->sender.UpdateAddr();
if (not HasOutboundConvo(msg->sender.Addr())) if (not HasOutboundConvo(msg->sender.Addr()))
{
PutSenderFor(msg->tag, msg->sender, true); PutSenderFor(msg->tag, msg->sender, true);
PutReplyIntroFor(msg->tag, path->intro); }
Introduction intro; Introduction intro{};
intro.pathID = from; intro.pathID = from;
intro.router = PubKey{path->Endpoint()}; intro.router = PubKey{path->Endpoint()};
intro.expiresAt = std::min(path->ExpireTime(), msg->introReply.expiresAt); intro.expiresAt = std::min(path->ExpireTime(), msg->introReply.expiresAt);
intro.latency = path->intro.latency;
PutIntroFor(msg->tag, intro); PutIntroFor(msg->tag, intro);
PutReplyIntroFor(msg->tag, path->intro);
ConvoTagRX(msg->tag); ConvoTagRX(msg->tag);
return ProcessDataMessage(msg); return ProcessDataMessage(msg);
} }
@ -1178,10 +1173,11 @@ namespace llarp
// not applicable because we are not an exit or don't have an endpoint auth policy // not applicable because we are not an exit or don't have an endpoint auth policy
if ((not m_state->m_ExitEnabled) or m_AuthPolicy == nullptr) if ((not m_state->m_ExitEnabled) or m_AuthPolicy == nullptr)
return; return;
ProtocolFrame f; ProtocolFrame f{};
f.R = AuthResultCodeAsInt(result.code); f.R = AuthResultCodeAsInt(result.code);
f.T = tag; f.T = tag;
f.F = path->intro.pathID; f.F = path->intro.pathID;
f.N.Randomize();
if (result.code == AuthResultCode::eAuthAccepted) if (result.code == AuthResultCode::eAuthAccepted)
{ {
ProtocolMessage msg; ProtocolMessage msg;
@ -1189,10 +1185,7 @@ namespace llarp
std::vector<byte_t> reason{}; std::vector<byte_t> reason{};
reason.resize(result.reason.size()); reason.resize(result.reason.size());
std::copy_n(result.reason.c_str(), reason.size(), reason.data()); std::copy_n(result.reason.c_str(), reason.size(), reason.data());
msg.PutBuffer(reason); msg.PutBuffer(reason);
f.N.Randomize();
f.C.Zero();
if (m_AuthPolicy) if (m_AuthPolicy)
msg.proto = ProtocolType::Auth; msg.proto = ProtocolType::Auth;
else else
@ -1234,6 +1227,23 @@ namespace llarp
Sessions().erase(t); Sessions().erase(t);
} }
void
Endpoint::ResetConvoTag(ConvoTag tag, path::Path_ptr p, PathID_t from)
{
// send reset convo tag message
ProtocolFrame f{};
f.R = 1;
f.T = tag;
f.F = p->intro.pathID;
f.Sign(m_Identity);
{
LogWarn("invalidating convotag T=", tag);
RemoveConvoTag(tag);
m_SendQueue.tryPushBack(
SendEvent_t{std::make_shared<routing::PathTransferMessage>(f, from), p});
}
}
bool bool
Endpoint::HandleHiddenServiceFrame(path::Path_ptr p, const ProtocolFrame& frame) Endpoint::HandleHiddenServiceFrame(path::Path_ptr p, const ProtocolFrame& frame)
{ {
@ -1253,23 +1263,7 @@ namespace llarp
} }
if (not frame.AsyncDecryptAndVerify(Router()->loop(), p, m_Identity, this)) if (not frame.AsyncDecryptAndVerify(Router()->loop(), p, m_Identity, this))
{ {
LogError("Failed to decrypt protocol frame"); ResetConvoTag(frame.T, p, frame.F);
if (not frame.C.IsZero())
{
// send reset convo tag message
ProtocolFrame f;
f.R = 1;
f.T = frame.T;
f.F = p->intro.pathID;
f.Sign(m_Identity);
{
LogWarn("invalidating convotag T=", frame.T);
RemoveConvoTag(frame.T);
m_SendQueue.tryPushBack(
SendEvent_t{std::make_shared<routing::PathTransferMessage>(f, frame.F), p});
}
}
} }
return true; return true;
} }
@ -1279,8 +1273,8 @@ namespace llarp
{ {
m_router->routerProfiling().MarkPathTimeout(p.get()); m_router->routerProfiling().MarkPathTimeout(p.get());
ManualRebuild(1); ManualRebuild(1);
RegenAndPublishIntroSet();
path::Builder::HandlePathDied(p); path::Builder::HandlePathDied(p);
RegenAndPublishIntroSet();
} }
bool bool
@ -1294,22 +1288,54 @@ namespace llarp
const Address& addr, const Address& addr,
std::optional<IntroSet> introset, std::optional<IntroSet> introset,
const RouterID& endpoint, const RouterID& endpoint,
llarp_time_t timeLeft) llarp_time_t timeLeft,
uint64_t relayOrder)
{ {
// tell all our existing remote sessions about this introset update
const auto now = Router()->Now(); const auto now = Router()->Now();
auto& fails = m_state->m_ServiceLookupFails;
auto& lookups = m_state->m_PendingServiceLookups; auto& lookups = m_state->m_PendingServiceLookups;
if (introset)
{
auto& sessions = m_state->m_RemoteSessions;
auto range = sessions.equal_range(addr);
auto itr = range.first;
while (itr != range.second)
{
itr->second->OnIntroSetUpdate(addr, introset, endpoint, timeLeft, relayOrder);
// we got a successful lookup
if (itr->second->ReadyToSend() and not introset->IsExpired(now))
{
// inform all lookups
auto lookup_range = lookups.equal_range(addr);
auto i = lookup_range.first;
while (i != lookup_range.second)
{
i->second(addr, itr->second.get());
++i;
}
lookups.erase(addr);
}
++itr;
}
}
auto& fails = m_state->m_ServiceLookupFails;
if (not introset or introset->IsExpired(now)) if (not introset or introset->IsExpired(now))
{ {
LogError(Name(), " failed to lookup ", addr.ToString(), " from ", endpoint); LogError(Name(), " failed to lookup ", addr.ToString(), " from ", endpoint);
fails[endpoint] = fails[endpoint] + 1; fails[endpoint] = fails[endpoint] + 1;
// inform one // inform one if applicable
auto range = lookups.equal_range(addr); // when relay order is non zero we can be pretty sure that it's a fail as when relay order
auto itr = range.first; // is zero that can sometimes yield a fail because it isn't always the closets in keyspace.
if (itr != range.second) if (relayOrder > 0)
{ {
itr->second(addr, nullptr); auto range = lookups.equal_range(addr);
itr = lookups.erase(itr); auto itr = range.first;
if (itr != range.second)
{
itr->second(addr, nullptr);
itr = lookups.erase(itr);
}
} }
return false; return false;
} }
@ -1334,6 +1360,20 @@ namespace llarp
return m_state->m_OutboundSessions.count(addr) > 0; return m_state->m_OutboundSessions.count(addr) > 0;
} }
void
Endpoint::InformPathToService(const Address remote, OutboundContext* ctx)
{
auto& serviceLookups = m_state->m_PendingServiceLookups;
auto range = serviceLookups.equal_range(remote);
auto itr = range.first;
while (itr != range.second)
{
itr->second(remote, ctx);
++itr;
}
serviceLookups.erase(remote);
}
bool bool
Endpoint::EnsurePathToService(const Address remote, PathEnsureHook hook, llarp_time_t timeout) Endpoint::EnsurePathToService(const Address remote, PathEnsureHook hook, llarp_time_t timeout)
{ {
@ -1343,6 +1383,8 @@ namespace llarp
static constexpr size_t RequestsPerLookup = 2; static constexpr size_t RequestsPerLookup = 2;
MarkAddressOutbound(remote); MarkAddressOutbound(remote);
// add response hook to list for address.
m_state->m_PendingServiceLookups.emplace(remote, hook);
auto& sessions = m_state->m_RemoteSessions; auto& sessions = m_state->m_RemoteSessions;
{ {
@ -1352,20 +1394,17 @@ namespace llarp
{ {
if (itr->second->ReadyToSend()) if (itr->second->ReadyToSend())
{ {
hook(remote, itr->second.get()); InformPathToService(remote, itr->second.get());
return true; return true;
} }
++itr; ++itr;
} }
} }
// add response hook to list for address.
m_state->m_PendingServiceLookups.emplace(remote, hook);
/// check replay filter /// check replay filter
if (not m_IntrosetLookupFilter.Insert(remote)) if (not m_IntrosetLookupFilter.Insert(remote))
return true; return true;
const auto paths = GetManyPathsWithUniqueEndpoints(this, NumParallelLookups); const auto paths = GetManyPathsWithUniqueEndpoints(this, NumParallelLookups, remote.ToKey());
using namespace std::placeholders; using namespace std::placeholders;
const dht::Key_t location = remote.ToKey(); const dht::Key_t location = remote.ToKey();
@ -1425,14 +1464,8 @@ namespace llarp
bool bool
Endpoint::EnsurePathToSNode(const RouterID snode, SNodeEnsureHook h) Endpoint::EnsurePathToSNode(const RouterID snode, SNodeEnsureHook h)
{ {
static constexpr size_t MaxConcurrentSNodeSessions = 16;
auto& nodeSessions = m_state->m_SNodeSessions; auto& nodeSessions = m_state->m_SNodeSessions;
if (nodeSessions.size() >= MaxConcurrentSNodeSessions)
{
// a quick client side work arround before we do proper limiting
LogError(Name(), " has too many snode sessions");
return false;
}
using namespace std::placeholders; using namespace std::placeholders;
if (nodeSessions.count(snode) == 0) if (nodeSessions.count(snode) == 0)
{ {
@ -1799,8 +1832,7 @@ namespace llarp
LogError("failed to encrypt and sign"); LogError("failed to encrypt and sign");
return; return;
} }
self->m_SendQueue.pushBack(SendEvent_t{transfer, p}); self->m_SendQueue.tryPushBack(SendEvent_t{transfer, p});
;
}); });
return true; return true;
} }
@ -1883,10 +1915,13 @@ namespace llarp
bool bool
Endpoint::ShouldBuildMore(llarp_time_t now) const Endpoint::ShouldBuildMore(llarp_time_t now) const
{ {
if (not path::Builder::ShouldBuildMore(now)) if (BuildCooldownHit(now))
return false;
const auto requiredPaths = std::max(numDesiredPaths, path::min_intro_paths);
if (NumInStatus(path::ePathBuilding) >= requiredPaths)
return false; return false;
return ((now - lastBuild) > path::intro_path_spread) return NumPathsExistingAt(now + (path::default_lifetime - path::intro_path_spread))
|| NumInStatus(path::ePathEstablished) < path::min_intro_paths; < requiredPaths;
} }
AbstractRouter* AbstractRouter*

@ -49,12 +49,13 @@ namespace llarp
struct OutboundContext; struct OutboundContext;
/// minimum interval for publishing introsets /// minimum interval for publishing introsets
static constexpr auto INTROSET_PUBLISH_INTERVAL = static constexpr auto IntrosetPublishInterval = path::intro_path_spread / 2;
std::chrono::milliseconds(path::default_lifetime) / 4;
static constexpr auto INTROSET_PUBLISH_RETRY_INTERVAL = 5s; /// how agressively should we retry publishing introset on failure
static constexpr auto IntrosetPublishRetryCooldown = 1s;
static constexpr auto INTROSET_LOOKUP_RETRY_COOLDOWN = 3s; /// how aggressively should we retry looking up introsets
static constexpr auto IntrosetLookupCooldown = 250ms;
struct Endpoint : public path::Builder, struct Endpoint : public path::Builder,
public ILookupHolder, public ILookupHolder,
@ -330,6 +331,9 @@ namespace llarp
using SNodeEnsureHook = std::function<void(const RouterID, exit::BaseSession_ptr, ConvoTag)>; using SNodeEnsureHook = std::function<void(const RouterID, exit::BaseSession_ptr, ConvoTag)>;
void
InformPathToService(const Address remote, OutboundContext* ctx);
/// ensure a path to a service node by public key /// ensure a path to a service node by public key
bool bool
EnsurePathToSNode(const RouterID remote, SNodeEnsureHook h); EnsurePathToSNode(const RouterID remote, SNodeEnsureHook h);
@ -415,6 +419,9 @@ namespace llarp
uint64_t uint64_t
GenTXID(); GenTXID();
void
ResetConvoTag(ConvoTag tag, path::Path_ptr path, PathID_t from);
const std::set<RouterID>& const std::set<RouterID>&
SnodeBlacklist() const; SnodeBlacklist() const;

@ -106,6 +106,10 @@ namespace llarp
++itr; ++itr;
} }
} }
for (auto& item : deadSessions)
{
item.second->Tick(now);
}
} }
void void

@ -43,15 +43,31 @@ namespace llarp
template <typename Endpoint_t> template <typename Endpoint_t>
static path::Path::UniqueEndpointSet_t static path::Path::UniqueEndpointSet_t
GetManyPathsWithUniqueEndpoints(Endpoint_t* ep, size_t N, size_t tries = 10) GetManyPathsWithUniqueEndpoints(
Endpoint_t* ep,
size_t N,
std::optional<dht::Key_t> maybeLocation = std::nullopt,
size_t tries = 10)
{ {
std::unordered_set<RouterID> exclude;
path::Path::UniqueEndpointSet_t paths; path::Path::UniqueEndpointSet_t paths;
do do
{ {
--tries; --tries;
const auto path = ep->PickRandomEstablishedPath(); path::Path_ptr path;
if (maybeLocation)
{
path = ep->GetEstablishedPathClosestTo(RouterID{maybeLocation->as_array()}, exclude);
}
else
{
path = ep->PickRandomEstablishedPath();
}
if (path and path->IsReady()) if (path and path->IsReady())
{
paths.emplace(path); paths.emplace(path);
exclude.insert(path->Endpoint());
}
} while (tries > 0 and paths.size() < N); } while (tries > 0 and paths.size() < N);
return paths; return paths;
} }

@ -9,6 +9,7 @@ namespace llarp
{ {
util::StatusObject obj{ util::StatusObject obj{
{"router", router.ToHex()}, {"router", router.ToHex()},
{"path", pathID.ToHex()},
{"expiresAt", to_json(expiresAt)}, {"expiresAt", to_json(expiresAt)},
{"latency", to_json(latency)}, {"latency", to_json(latency)},
{"version", uint64_t(version)}}; {"version", uint64_t(version)}};
@ -66,8 +67,9 @@ namespace llarp
std::ostream& std::ostream&
Introduction::print(std::ostream& stream, int level, int spaces) const Introduction::print(std::ostream& stream, int level, int spaces) const
{ {
const RouterID r{router};
Printer printer(stream, level, spaces); Printer printer(stream, level, spaces);
printer.printAttribute("k", RouterID(router)); printer.printAttribute("k", r.ToString());
printer.printAttribute("l", latency.count()); printer.printAttribute("l", latency.count());
printer.printAttribute("p", pathID); printer.printAttribute("p", pathID);
printer.printAttribute("v", version); printer.printAttribute("v", version);

@ -77,6 +77,16 @@ namespace llarp
{ {
return i.print(out, -1, -1); return i.print(out, -1, -1);
} }
/// comparitor for introset timestamp
struct CompareIntroTimestamp
{
bool
operator()(const Introduction& left, const Introduction& right) const
{
return left.expiresAt > right.expiresAt;
}
};
} // namespace service } // namespace service
} // namespace llarp } // namespace llarp

@ -351,6 +351,15 @@ namespace llarp::service
return false; return false;
} }
bool
IntroSet::HasStaleIntros(llarp_time_t now, llarp_time_t delta) const
{
for (const auto& intro : intros)
if (intro.ExpiresSoon(now, delta))
return true;
return false;
}
bool bool
IntroSet::IsExpired(llarp_time_t now) const IntroSet::IsExpired(llarp_time_t now) const
{ {

@ -69,6 +69,10 @@ namespace llarp
bool bool
HasExpiredIntros(llarp_time_t now) const; HasExpiredIntros(llarp_time_t now) const;
/// return true if any of our intros expires soon given a delta
bool
HasStaleIntros(llarp_time_t now, llarp_time_t delta) const;
bool bool
IsExpired(llarp_time_t now) const; IsExpired(llarp_time_t now) const;

@ -48,6 +48,7 @@ namespace llarp
MarkCurrentIntroBad(Now()); MarkCurrentIntroBad(Now());
ShiftIntroduction(false); ShiftIntroduction(false);
UpdateIntroSet(); UpdateIntroSet();
SwapIntros();
} }
return true; return true;
} }
@ -58,17 +59,18 @@ namespace llarp
: path::Builder{parent->Router(), OutboundContextNumPaths, parent->numHops} : path::Builder{parent->Router(), OutboundContextNumPaths, parent->numHops}
, SendContext{introset.addressKeys, {}, this, parent} , SendContext{introset.addressKeys, {}, this, parent}
, location{introset.addressKeys.Addr().ToKey()} , location{introset.addressKeys.Addr().ToKey()}
, addr{introset.addressKeys.Addr()}
, currentIntroSet{introset} , currentIntroSet{introset}
{ {
updatingIntroSet = false; updatingIntroSet = false;
for (const auto& intro : introset.intros) for (const auto& intro : introset.intros)
{ {
if (intro.expiresAt > m_NextIntro.expiresAt) if (m_NextIntro.latency == 0s or m_NextIntro.latency > intro.latency)
m_NextIntro = intro; m_NextIntro = intro;
} }
currentConvoTag.Randomize(); currentConvoTag.Randomize();
lastShift = Now();
} }
OutboundContext::~OutboundContext() = default; OutboundContext::~OutboundContext() = default;
@ -151,8 +153,8 @@ namespace llarp
} }
if (selectedIntro.router.IsZero() || selectedIntro.ExpiresSoon(now)) if (selectedIntro.router.IsZero() || selectedIntro.ExpiresSoon(now))
return; return;
LogWarn(Name(), " shfiting intro off of ", r, " to ", RouterID(selectedIntro.router));
m_NextIntro = selectedIntro; m_NextIntro = selectedIntro;
lastShift = now;
} }
void void
@ -183,7 +185,7 @@ namespace llarp
p->SetDataHandler(util::memFn(&OutboundContext::HandleHiddenServiceFrame, this)); p->SetDataHandler(util::memFn(&OutboundContext::HandleHiddenServiceFrame, this));
p->SetDropHandler(util::memFn(&OutboundContext::HandleDataDrop, this)); p->SetDropHandler(util::memFn(&OutboundContext::HandleDataDrop, this));
// we now have a path to the next intro, swap intros // we now have a path to the next intro, swap intros
if (p->Endpoint() == m_NextIntro.router or p->Endpoint() == remoteIntro.router) if (p->Endpoint() == m_NextIntro.router)
SwapIntros(); SwapIntros();
else else
{ {
@ -194,7 +196,7 @@ namespace llarp
void void
OutboundContext::AsyncGenIntro(const llarp_buffer_t& payload, ProtocolType t) OutboundContext::AsyncGenIntro(const llarp_buffer_t& payload, ProtocolType t)
{ {
if (sentIntro) if (generatedIntro)
return; return;
if (remoteIntro.router.IsZero()) if (remoteIntro.router.IsZero())
{ {
@ -202,21 +204,14 @@ namespace llarp
return; return;
} }
auto path = m_PathSet->GetPathByRouter(remoteIntro.router); auto path = GetPathByRouter(remoteIntro.router);
if (path == nullptr) if (path == nullptr)
{ {
// try parent as fallback LogError(Name(), " has no path to ", remoteIntro.router, " when we should have had one");
path = m_Endpoint->GetPathByRouter(remoteIntro.router); return;
if (path == nullptr)
{
if (!BuildCooldownHit(Now()))
BuildOneAlignedTo(remoteIntro.router);
LogWarn(Name(), " dropping intro frame, no path to ", remoteIntro.router);
return;
}
} }
sentIntro = true;
auto frame = std::make_shared<ProtocolFrame>(); auto frame = std::make_shared<ProtocolFrame>();
frame->Clear();
auto ex = std::make_shared<AsyncKeyExchange>( auto ex = std::make_shared<AsyncKeyExchange>(
m_Endpoint->Loop(), m_Endpoint->Loop(),
remoteIdent, remoteIdent,
@ -228,22 +223,28 @@ namespace llarp
t); t);
ex->hook = [self = shared_from_this(), path](auto frame) { ex->hook = [self = shared_from_this(), path](auto frame) {
self->Send(std::move(frame), path); if (not self->Send(std::move(frame), path))
return;
self->m_Endpoint->Loop()->call_later(100ms, [self]() { self->sentIntro = true; });
}; };
ex->msg.PutBuffer(payload); ex->msg.PutBuffer(payload);
ex->msg.introReply = path->intro; ex->msg.introReply = path->intro;
frame->F = ex->msg.introReply.pathID; frame->F = ex->msg.introReply.pathID;
frame->R = 0; frame->R = 0;
generatedIntro = true;
// ensure we have a sender put for this convo tag
m_DataHandler->PutSenderFor(currentConvoTag, currentIntroSet.addressKeys, false);
// encrypt frame async
m_Endpoint->Router()->QueueWork([ex, frame] { return AsyncKeyExchange::Encrypt(ex, frame); }); m_Endpoint->Router()->QueueWork([ex, frame] { return AsyncKeyExchange::Encrypt(ex, frame); });
LogInfo("send intro frame");
LogInfo(Name(), " send intro frame T=", currentConvoTag);
} }
std::string std::string
OutboundContext::Name() const OutboundContext::Name() const
{ {
return "OBContext:" + m_Endpoint->Name() + "-" return "OBContext:" + currentIntroSet.addressKeys.Addr().ToString();
+ currentIntroSet.addressKeys.Addr().ToString();
} }
void void
@ -255,10 +256,9 @@ namespace llarp
return; return;
LogInfo(Name(), " updating introset"); LogInfo(Name(), " updating introset");
m_LastIntrosetUpdateAt = now; m_LastIntrosetUpdateAt = now;
const auto addr = currentIntroSet.addressKeys.Addr();
// we want to use the parent endpoint's paths because outbound context // we want to use the parent endpoint's paths because outbound context
// does not implement path::PathSet::HandleGotIntroMessage // does not implement path::PathSet::HandleGotIntroMessage
const auto paths = GetManyPathsWithUniqueEndpoints(m_Endpoint, 2); const auto paths = GetManyPathsWithUniqueEndpoints(m_Endpoint, 2, location);
uint64_t relayOrder = 0; uint64_t relayOrder = 0;
for (const auto& path : paths) for (const auto& path : paths)
{ {
@ -291,7 +291,7 @@ namespace llarp
obj["seqno"] = sequenceNo; obj["seqno"] = sequenceNo;
obj["markedBad"] = markedBad; obj["markedBad"] = markedBad;
obj["lastShift"] = to_json(lastShift); obj["lastShift"] = to_json(lastShift);
obj["remoteIdentity"] = remoteIdent.Addr().ToString(); obj["remoteIdentity"] = addr.ToString();
obj["currentRemoteIntroset"] = currentIntroSet.ExtractStatus(); obj["currentRemoteIntroset"] = currentIntroSet.ExtractStatus();
obj["nextIntro"] = m_NextIntro.ExtractStatus(); obj["nextIntro"] = m_NextIntro.ExtractStatus();
obj["readyToSend"] = ReadyToSend(); obj["readyToSend"] = ReadyToSend();
@ -318,32 +318,31 @@ namespace llarp
if (m_LookupFails > 16 || m_BuildFails > 10) if (m_LookupFails > 16 || m_BuildFails > 10)
return true; return true;
constexpr auto InboundTrafficTimeout = 5s;
if (ReadyToSend() and remoteIntro.router.IsZero()) if (ReadyToSend() and remoteIntro.router.IsZero())
{ {
SwapIntros(); SwapIntros();
} }
if (m_GotInboundTraffic and m_LastInboundTraffic + InboundTrafficTimeout <= now) if ((remoteIntro.router.IsZero() or m_BadIntros.count(remoteIntro))
and GetPathByRouter(m_NextIntro.router))
SwapIntros();
if (m_GotInboundTraffic and m_LastInboundTraffic + sendTimeout <= now)
{ {
if (std::chrono::abs(now - lastGoodSend) < InboundTrafficTimeout) // timeout on other side
{ UpdateIntroSet();
// timeout on other side MarkCurrentIntroBad(now);
MarkCurrentIntroBad(now); ShiftIntroRouter(remoteIntro.router);
}
} }
// check for stale intros
// check for expiration // update the introset if we think we need to
if (remoteIntro.ExpiresSoon(now)) if (currentIntroSet.HasStaleIntros(now, path::intro_path_spread))
{ {
UpdateIntroSet(); UpdateIntroSet();
// shift intro if it expires "soon"
if (ShiftIntroduction())
SwapIntros(); // swap intros if we shifted
} }
// lookup router in intro if set and unknown // lookup router in intro if set and unknown
m_Endpoint->EnsureRouterIsKnown(remoteIntro.router); if (not m_NextIntro.router.IsZero())
m_Endpoint->EnsureRouterIsKnown(m_NextIntro.router);
// expire bad intros // expire bad intros
auto itr = m_BadIntros.begin(); auto itr = m_BadIntros.begin();
while (itr != m_BadIntros.end()) while (itr != m_BadIntros.end())
@ -354,7 +353,7 @@ namespace llarp
++itr; ++itr;
} }
if (ReadyToSend() and m_ReadyHook) if (ReadyToSend() and not m_ReadyHooks.empty())
{ {
const auto path = GetPathByRouter(remoteIntro.router); const auto path = GetPathByRouter(remoteIntro.router);
if (not path) if (not path)
@ -362,32 +361,40 @@ namespace llarp
LogWarn(Name(), " ready but no path to ", remoteIntro.router, " ???"); LogWarn(Name(), " ready but no path to ", remoteIntro.router, " ???");
return true; return true;
} }
m_ReadyHook(this); for (const auto& hook : m_ReadyHooks)
m_ReadyHook = nullptr; hook(this);
m_ReadyHooks.clear();
} }
if (lastGoodSend > 0s and now >= lastGoodSend + (sendTimeout / 2)) const auto timeout = std::min(lastGoodSend, m_LastInboundTraffic);
if (lastGoodSend > 0s and now >= timeout + (sendTimeout / 2))
{ {
// send a keep alive to keep this session alive // send a keep alive to keep this session alive
KeepAlive(); KeepAlive();
} }
// if we are dead return true so we are removed // if we are dead return true so we are removed
return lastGoodSend > 0s ? (now >= lastGoodSend && now - lastGoodSend > sendTimeout) return timeout > 0s ? (now >= timeout && now - timeout > sendTimeout)
: (now >= createdAt && now - createdAt > connectTimeout); : (now >= createdAt && now - createdAt > connectTimeout);
} }
void void
OutboundContext::SetReadyHook(std::function<void(OutboundContext*)> hook, llarp_time_t timeout) OutboundContext::AddReadyHook(std::function<void(OutboundContext*)> hook, llarp_time_t timeout)
{ {
if (m_ReadyHook) if (ReadyToSend())
{
hook(this);
return; return;
m_ReadyHook = hook; }
m_router->loop()->call_later(timeout, [this]() { if (m_ReadyHooks.empty())
if (m_ReadyHook) {
m_ReadyHook(nullptr); m_router->loop()->call_later(timeout, [this]() {
m_ReadyHook = nullptr; LogWarn(Name(), " did not obtain session in time");
}); for (const auto& hook : m_ReadyHooks)
hook(nullptr);
m_ReadyHooks.clear();
});
}
m_ReadyHooks.push_back(hook);
} }
std::optional<std::vector<RouterContact>> std::optional<std::vector<RouterContact>>
@ -405,16 +412,22 @@ namespace llarp
bool bool
OutboundContext::ShouldBuildMore(llarp_time_t now) const OutboundContext::ShouldBuildMore(llarp_time_t now) const
{ {
if (markedBad || not path::Builder::ShouldBuildMore(now)) if (markedBad or path::Builder::BuildCooldownHit(now))
return false; return false;
if (NumInStatus(path::ePathBuilding) >= numDesiredPaths) if (NumInStatus(path::ePathBuilding) >= numDesiredPaths)
return false; return false;
llarp_time_t t = 0s;
ForEachPath([&t](path::Path_ptr path) { if (m_BadIntros.count(remoteIntro))
if (path->IsReady()) return true;
t = std::max(path->ExpireTime(), t);
size_t numValidPaths = 0;
ForEachPath([now, &numValidPaths](path::Path_ptr path) {
if (not path->IsReady())
return;
if (not path->intro.ExpiresSoon(now, path::default_lifetime - path::intro_path_spread))
numValidPaths++;
}); });
return t >= now + path::default_lifetime / 4; return numValidPaths < numDesiredPaths;
} }
void void
@ -430,12 +443,24 @@ namespace llarp
m_BadIntros[intro] = now; m_BadIntros[intro] = now;
} }
bool
OutboundContext::IntroSent() const
{
return sentIntro;
}
bool
OutboundContext::IntroGenerated() const
{
return sentIntro;
}
bool bool
OutboundContext::ShiftIntroduction(bool rebuild) OutboundContext::ShiftIntroduction(bool rebuild)
{ {
bool success = false; bool success = false;
auto now = Now(); const auto now = Now();
if (now - lastShift < MIN_SHIFT_INTERVAL) if (abs(now - lastShift) < shiftTimeout)
return false; return false;
bool shifted = false; bool shifted = false;
std::vector<Introduction> intros = currentIntroSet.intros; std::vector<Introduction> intros = currentIntroSet.intros;
@ -496,7 +521,7 @@ namespace llarp
{ {
// unconditionally update introset // unconditionally update introset
UpdateIntroSet(); UpdateIntroSet();
const RouterID endpoint(path->Endpoint()); const RouterID endpoint{path->Endpoint()};
// if a path to our current intro died... // if a path to our current intro died...
if (endpoint == remoteIntro.router) if (endpoint == remoteIntro.router)
{ {
@ -506,50 +531,13 @@ namespace llarp
if (p->Endpoint() == endpoint && p->IsReady()) if (p->Endpoint() == endpoint && p->IsReady())
++num; ++num;
}); });
// if we have more than two then we are probably fine if (num == 0)
if (num > 2)
return;
// if we have one working one ...
if (num == 1)
{
num = 0;
ForEachPath([&](const path::Path_ptr& p) {
if (p->Endpoint() == endpoint)
++num;
});
// if we have 2 or more established or pending don't do anything
if (num > 2)
return;
BuildOneAlignedTo(endpoint);
}
else if (num == 0)
{ {
// we have no paths to this router right now // we have no more paths to this endpoint so we want to pivot off of it
// hop off it MarkCurrentIntroBad(Now());
Introduction picked; ShiftIntroRouter(endpoint);
// get the latest intro that isn't on that endpoint if (m_NextIntro.router != endpoint)
for (const auto& intro : currentIntroSet.intros) BuildOneAlignedTo(m_NextIntro.router);
{
if (intro.router == endpoint)
continue;
if (intro.expiresAt > picked.expiresAt)
picked = intro;
}
// we got nothing
if (picked.router.IsZero())
{
return;
}
m_NextIntro = picked;
// check if we have a path to this router
num = 0;
ForEachPath([&](const path::Path_ptr& p) {
// don't count timed out paths
if (p->Status() != path::ePathTimeout && p->Endpoint() == m_NextIntro.router)
++num;
});
// build a path if one isn't already pending build or established
BuildOneAlignedTo(m_NextIntro.router);
} }
} }
} }

@ -57,7 +57,7 @@ namespace llarp
/// shift the intro off the current router it is using /// shift the intro off the current router it is using
void void
ShiftIntroRouter(const RouterID remote); ShiftIntroRouter(const RouterID remote) override;
/// mark the current remote intro as bad /// mark the current remote intro as bad
void void
@ -71,7 +71,7 @@ namespace llarp
ReadyToSend() const; ReadyToSend() const;
void void
SetReadyHook(std::function<void(OutboundContext*)> readyHook, llarp_time_t timeout); AddReadyHook(std::function<void(OutboundContext*)> readyHook, llarp_time_t timeout);
/// for exits /// for exits
void void
@ -129,19 +129,26 @@ namespace llarp
llarp_time_t llarp_time_t
RTT() const; RTT() const;
bool
OnIntroSetUpdate(
const Address& addr,
std::optional<IntroSet> i,
const RouterID& endpoint,
llarp_time_t,
uint64_t relayOrder);
private: private:
/// swap remoteIntro with next intro /// swap remoteIntro with next intro
void void
SwapIntros(); SwapIntros();
void
OnGeneratedIntroFrame(AsyncKeyExchange* k, PathID_t p);
bool bool
OnIntroSetUpdate( IntroGenerated() const override;
const Address& addr, std::optional<IntroSet> i, const RouterID& endpoint, llarp_time_t); bool
IntroSent() const override;
const dht::Key_t location; const dht::Key_t location;
const Address addr;
uint64_t m_UpdateIntrosetTX = 0; uint64_t m_UpdateIntrosetTX = 0;
IntroSet currentIntroSet; IntroSet currentIntroSet;
Introduction m_NextIntro; Introduction m_NextIntro;
@ -151,8 +158,9 @@ namespace llarp
uint16_t m_BuildFails = 0; uint16_t m_BuildFails = 0;
llarp_time_t m_LastInboundTraffic = 0s; llarp_time_t m_LastInboundTraffic = 0s;
bool m_GotInboundTraffic = false; bool m_GotInboundTraffic = false;
bool generatedIntro = false;
bool sentIntro = false; bool sentIntro = false;
std::function<void(OutboundContext*)> m_ReadyHook; std::vector<std::function<void(OutboundContext*)>> m_ReadyHooks;
llarp_time_t m_LastIntrosetUpdateAt = 0s; llarp_time_t m_LastIntrosetUpdateAt = 0s;
}; };
} // namespace service } // namespace service

@ -458,15 +458,21 @@ namespace llarp
} }
}; };
handler->Router()->QueueWork( handler->Router()->QueueWork(
[v, msg = std::move(msg), recvPath = std::move(recvPath), callback]() { [v, msg = std::move(msg), recvPath = std::move(recvPath), callback, handler]() {
auto resetTag = [handler, tag = v->frame.T, from = v->frame.F, path = recvPath]() {
handler->ResetConvoTag(tag, path, from);
};
if (not v->frame.Verify(v->si)) if (not v->frame.Verify(v->si))
{ {
LogError("Signature failure from ", v->si.Addr()); LogError("Signature failure from ", v->si.Addr());
handler->Loop()->call_soon(resetTag);
return; return;
} }
if (not v->frame.DecryptPayloadInto(v->shared, *msg)) if (not v->frame.DecryptPayloadInto(v->shared, *msg))
{ {
LogError("failed to decrypt message"); LogError("failed to decrypt message from ", v->si.Addr());
handler->Loop()->call_soon(resetTag);
return; return;
} }
callback(msg); callback(msg);

@ -96,7 +96,7 @@ namespace llarp
version = other.version; version = other.version;
} }
ProtocolFrame() : routing::IMessage() ProtocolFrame() : routing::IMessage{}
{ {
Clear(); Clear();
} }

@ -26,13 +26,15 @@ namespace llarp
bool bool
SendContext::Send(std::shared_ptr<ProtocolFrame> msg, path::Path_ptr path) SendContext::Send(std::shared_ptr<ProtocolFrame> msg, path::Path_ptr path)
{ {
if (not path->IsReady())
return false;
if (m_SendQueue.empty() or m_SendQueue.full()) if (m_SendQueue.empty() or m_SendQueue.full())
{ {
m_Endpoint->Loop()->call([this] { FlushUpstream(); }); m_Endpoint->Loop()->call_soon([this] { FlushUpstream(); });
} }
m_SendQueue.pushBack(std::make_pair( return m_SendQueue.tryPushBack(std::make_pair(
std::make_shared<routing::PathTransferMessage>(*msg, remoteIntro.pathID), path)); std::make_shared<routing::PathTransferMessage>(*msg, remoteIntro.pathID), path))
return true; == thread::QueueReturn::Success;
} }
void void
@ -84,13 +86,15 @@ namespace llarp
auto path = m_PathSet->GetPathByRouter(remoteIntro.router); auto path = m_PathSet->GetPathByRouter(remoteIntro.router);
if (!path) if (!path)
{ {
LogWarn(m_Endpoint->Name(), " cannot encrypt and send: no path for intro ", remoteIntro); ShiftIntroRouter(remoteIntro.router);
LogWarn(m_PathSet->Name(), " cannot encrypt and send: no path for intro ", remoteIntro);
return; return;
} }
if (!m_DataHandler->GetCachedSessionKeyFor(f->T, shared)) if (!m_DataHandler->GetCachedSessionKeyFor(f->T, shared))
{ {
LogWarn(m_Endpoint->Name(), " has no cached session key on session T=", f->T); LogWarn(
m_PathSet->Name(), " could not send, has no cached session key on session T=", f->T);
return; return;
} }
@ -104,7 +108,7 @@ namespace llarp
} }
else else
{ {
LogWarn(m_Endpoint->Name(), " no session T=", f->T); LogWarn(m_PathSet->Name(), " could not get sequence number for session T=", f->T);
return; return;
} }
m->introReply = path->intro; m->introReply = path->intro;
@ -115,7 +119,7 @@ namespace llarp
m_Endpoint->Router()->QueueWork([f, m, shared, path, this] { m_Endpoint->Router()->QueueWork([f, m, shared, path, this] {
if (not f->EncryptAndSign(*m, shared, m_Endpoint->GetIdentity())) if (not f->EncryptAndSign(*m, shared, m_Endpoint->GetIdentity()))
{ {
LogError(m_Endpoint->Name(), " failed to sign message"); LogError(m_PathSet->Name(), " failed to sign message");
return; return;
} }
Send(f, path); Send(f, path);
@ -140,11 +144,15 @@ namespace llarp
void void
SendContext::AsyncEncryptAndSendTo(const llarp_buffer_t& data, ProtocolType protocol) SendContext::AsyncEncryptAndSendTo(const llarp_buffer_t& data, ProtocolType protocol)
{ {
if (lastGoodSend != 0s) if (IntroSent())
{ {
EncryptAndSendTo(data, protocol); EncryptAndSendTo(data, protocol);
return; return;
} }
// have we generated the initial intro but not sent it yet? bail here so we don't cause
// bullshittery
if (IntroGenerated() and not IntroSent())
return;
const auto maybe = m_Endpoint->MaybeGetAuthInfoForEndpoint(remoteIdent.Addr()); const auto maybe = m_Endpoint->MaybeGetAuthInfoForEndpoint(remoteIdent.Addr());
if (maybe.has_value()) if (maybe.has_value())
{ {

@ -44,8 +44,9 @@ namespace llarp
uint64_t sequenceNo = 0; uint64_t sequenceNo = 0;
llarp_time_t lastGoodSend = 0s; llarp_time_t lastGoodSend = 0s;
const llarp_time_t createdAt; const llarp_time_t createdAt;
llarp_time_t sendTimeout = 40s; llarp_time_t sendTimeout = path::build_timeout * 2;
llarp_time_t connectTimeout = 60s; llarp_time_t connectTimeout = path::build_timeout * 4;
llarp_time_t shiftTimeout = (path::build_timeout * 5) / 2;
llarp_time_t estimatedRTT = 0s; llarp_time_t estimatedRTT = 0s;
bool markedBad = false; bool markedBad = false;
using Msg_ptr = std::shared_ptr<routing::PathTransferMessage>; using Msg_ptr = std::shared_ptr<routing::PathTransferMessage>;
@ -62,6 +63,9 @@ namespace llarp
return true; return true;
} }
virtual void
ShiftIntroRouter(const RouterID) = 0;
virtual void virtual void
UpdateIntroSet() = 0; UpdateIntroSet() = 0;
@ -72,6 +76,11 @@ namespace llarp
AsyncSendAuth(std::function<void(AuthResult)> replyHandler); AsyncSendAuth(std::function<void(AuthResult)> replyHandler);
private: private:
virtual bool
IntroGenerated() const = 0;
virtual bool
IntroSent() const = 0;
void void
EncryptAndSendTo(const llarp_buffer_t& payload, ProtocolType t); EncryptAndSendTo(const llarp_buffer_t& payload, ProtocolType t);

Loading…
Cancel
Save