lokinet/llarp/service/outbound_context.cpp
dr7ana 41312abab0 introset and message transmission underway
- message handling through classes that inherit from PathSet
- cleanups around link_manager
- etc etc
2023-10-24 08:40:18 -07:00

577 lines
16 KiB
C++

#include "outbound_context.hpp"
#include "async_key_exchange.hpp"
#include "endpoint.hpp"
#include "endpoint_util.hpp"
#include "protocol_type.hpp"
#include <llarp/router/router.hpp>
#include <llarp/nodedb.hpp>
#include <llarp/profiling.hpp>
#include <llarp/util/meta/memfn.hpp>
#include <random>
#include <algorithm>
namespace llarp::service
{
/// Flag this context as bad and then stop the underlying path builder.
/// @return whatever path::Builder::Stop() reports
bool
OutboundContext::Stop()
{
  // mark first so that anything consulting marked_bad during the stop sees it
  marked_bad = true;
  const bool builderStopped = path::Builder::Stop();
  return builderStopped;
}
/// Whether this context has finished: it has no available paths of any role
/// and ShouldRemove() agrees.
/// @param now unused
bool
OutboundContext::IsDone(std::chrono::milliseconds now) const
{
  static_cast<void>(now);
  // still holding at least one path: not done yet
  if (AvailablePaths(path::ePathRoleAny) != 0)
    return false;
  return ShouldRemove();
}
/// Number of paths handed to the path builder for every outbound context.
constexpr auto OutboundContextNumPaths = 4;

/// Create an outbound context for the service described by `introset`.
///
/// @param introset introset of the remote service; expected to carry at least one intro
/// @param parent   owning endpoint; supplies the router, hop count, clock and
///                 path alignment timeout
///
/// One of the introset's intros is picked at random as the first `next_intro`.
/// If the introset carries no intros at all, `next_intro` is left untouched.
OutboundContext::OutboundContext(const IntroSet& introset, Endpoint* parent)
    : path::Builder{parent->router(), OutboundContextNumPaths, parent->numHops}
    , ep{*parent}
    , current_intro{introset}
    , location{current_intro.address_keys.Addr().ToKey()}
    , addr{current_intro.address_keys.Addr()}
    , remote_identity{current_intro.address_keys}
    , created_at{ep.Now()}
{
  assert(not introset.intros.empty());
  updatingIntroSet = false;

  // assert() is compiled out under NDEBUG, so guard explicitly: dereferencing
  // begin() of an empty container would be undefined behaviour.
  if (not introset.intros.empty())
  {
    // pick random first intro
    auto it = introset.intros.begin();
    if (introset.intros.size() > 1)
    {
      CSRNG rng{};
      it += std::uniform_int_distribution<size_t>{0, introset.intros.size() - 1}(rng);
    }
    next_intro = *it;
  }

  current_tag.Randomize();
  last_shift = Now();

  // add send and connect timeouts to the parent endpoints path alignment timeout
  // this will make it so that there is less of a chance for timing races
  send_timeout += parent->PathAlignmentTimeout();
  connect_timeout += parent->PathAlignmentTimeout();
}

OutboundContext::~OutboundContext() = default;
/// Promote `next_intro` to be the active `remote_intro`.
/// Does nothing when the two already compare equal.
void
OutboundContext::swap_intros()
{
  const bool introChanged = remote_intro != next_intro;
  if (not introChanged)
    return;

  remote_intro = next_intro;

  // register sender and intro for our conversation tag on the endpoint
  ep.PutSenderFor(current_tag, current_intro.address_keys, false);
  ep.PutIntroFor(current_tag, remote_intro);

  // line up a follow-on intro that is not on the router we just adopted
  ShiftIntroRouter(next_intro.router);

  // an intro was already generated for this session: nothing more to do
  if (generated_intro)
    return;

  // otherwise kick off the handshake with the remote endpoint
  KeepAlive();
}
/// Return a copy of the stored remote address (`addr`), which the constructor
/// initialises from the introset's address keys.
Address
OutboundContext::Addr() const
{
return addr;
}
/// Handle the outcome of an introset lookup.
///
/// @param foundIntro the introset that was found, or nullopt when the lookup failed
/// @param endpoint   router the result came from (only used for logging)
/// @param relayOrder relay order of the lookup; a failed lookup is only counted
///                   in `lookup_fails` when this is non-zero
/// @return always true
///
/// A found introset replaces `current_intro` only if it has a non-zero
/// timestamp, is not older than the current one and is not expired.
bool
OutboundContext::OnIntroSetUpdate(
    const Address&,
    std::optional<IntroSet> foundIntro,
    const RouterID& endpoint,
    std::chrono::milliseconds,
    uint64_t relayOrder)
{
  if (marked_bad)
    return true;

  // clear the lookup-in-progress flag regardless of the outcome
  updatingIntroSet = false;

  if (not foundIntro)
  {
    if (relayOrder > 0)
    {
      ++lookup_fails;
      LogWarn(Name(), " failed to look up introset, fails=", lookup_fails);
    }
    return true;
  }

  if (foundIntro->time_signed == 0s)
  {
    LogWarn(Name(), " got introset with zero timestamp: ", *foundIntro);
    return true;
  }

  if (current_intro.time_signed > foundIntro->time_signed)
  {
    LogInfo("introset is old, dropping");
    return true;
  }

  const std::chrono::milliseconds now = Now();
  if (foundIntro->IsExpired(now))
  {
    LogError("got expired introset from lookup from ", endpoint);
    return true;
  }

  // accept the update and choose a next intro from it, excluding no router
  current_intro = *foundIntro;
  ShiftIntroRouter(RouterID{});
  return true;
}
/// Whether traffic can currently be sent on this session.
/// Requires: not marked bad, a non-zero remote intro router, the intro marked
/// as sent, and a path to the remote intro's router.
bool
OutboundContext::ReadyToSend() const
{
  if (marked_bad or remote_intro.router.IsZero())
    return false;
  if (not sent_intro)
    return false;
  return GetPathByRouter(remote_intro.router) != nullptr;
}
void
OutboundContext::ShiftIntroRouter(const RouterID r)
{
const auto now = Now();
Introduction selectedIntro{};
for (const auto& intro : current_intro.intros)
{
if (intro.expiry > selectedIntro.expiry and intro.router != r)
{
selectedIntro = intro;
}
}
if (selectedIntro.router.IsZero() || selectedIntro.ExpiresSoon(now))
return;
next_intro = selectedIntro;
last_shift = now;
}
/// A path build timed out: move the next intro away from that path's endpoint
/// router, then let the base builder handle the timeout.
void
OutboundContext::HandlePathBuildTimeout(path::Path_ptr p)
{
  const RouterID timedOutPivot{p->Endpoint()};
  ShiftIntroRouter(timedOutPivot);
  path::Builder::HandlePathBuildTimeout(p);
}
/// A path build failed at `hop`. When the failing hop is the path's endpoint
/// (the pivot), move the next intro away from it; in every case forward the
/// failure to the base builder.
void
OutboundContext::HandlePathBuildFailedAt(path::Path_ptr p, RouterID hop)
{
  const RouterID pivot{p->Endpoint()};
  const bool failedAtPivot = pivot == hop;
  if (failedAtPivot)
    ShiftIntroRouter(pivot);

  path::Builder::HandlePathBuildFailedAt(p, hop);
}
/// A path finished building. The base builder is notified first; afterwards the
/// path is either put into the ignore state (context marked bad) or, when it
/// ends at the next intro's router, used to swap intros.
void
OutboundContext::HandlePathBuilt(path::Path_ptr p)
{
  path::Builder::HandlePathBuilt(p);

  // Handler wiring below is currently disabled:
  // p->SetDataHandler([self = weak_from_this()](auto path, auto frame) {
  //   if (auto ptr = self.lock())
  //     return ptr->HandleHiddenServiceFrame(path, frame);
  //   return false;
  // });
  // p->SetDropHandler([self = weak_from_this()](auto path, auto id, auto seqno) {
  //   if (auto ptr = self.lock())
  //     return ptr->HandleDataDrop(path, id, seqno);
  //   return false;
  // });

  if (marked_bad)
  {
    // we are marked dead, so the fresh path is of no use to us
    LogInfo(Name(), " marked bad, ignoring new path");
    p->EnterState(path::ePathIgnore, Now());
    return;
  }

  // a path to the next intro now exists, so the swap can happen
  if (p->Endpoint() == next_intro.router)
    swap_intros();
}
/// Build the introduction frame carrying `payload` (of protocol type `t`) for the
/// current remote intro and queue its encryption on the router's work queue.
///
/// Returns early, after logging, when an intro was already generated, when no
/// remote intro router is set, or when there is no path to the remote intro router.
void
OutboundContext::AsyncGenIntro(const llarp_buffer_t& payload, ProtocolType t)
{
// only one intro is generated per context; later calls are dropped
if (generated_intro)
{
LogWarn(Name(), " dropping packet as we are not fully handshaked right now");
return;
}
// no remote intro selected yet
if (remote_intro.router.IsZero())
{
LogWarn(Name(), " dropping intro frame we have no intro ready yet");
return;
}
// the frame has to travel over a path that ends at the remote intro's router
auto path = GetPathByRouter(remote_intro.router);
if (path == nullptr)
{
LogError(Name(), " has no path to ", remote_intro.router, " when we should have had one");
return;
}
auto frame = std::make_shared<ProtocolFrameMessage>();
frame->clear();
auto ex = std::make_shared<AsyncKeyExchange>(
ep.Loop(),
remote_identity,
ep.GetIdentity(),
current_intro.sntru_pubkey,
remote_intro,
ep,
current_tag,
t);
// hook: send the frame it is given over `path`; if Send succeeds, schedule
// sent_intro = true after remote_intro.latency on the endpoint's loop.
// NOTE(review): presumably invoked by AsyncKeyExchange once encryption is done — confirm.
// The lambda parameter `frame` shadows the local `frame` declared above.
ex->hook = [self = shared_from_this(), path](auto frame) {
if (not self->Send(std::move(frame), path))
return;
self->ep.Loop()->call_later(
self->remote_intro.latency, [self]() { self->sent_intro = true; });
};
ex->msg.PutBuffer(payload);
// replies are to come back via the intro of the path we send on
ex->msg.introReply = path->intro;
frame->path_id = ex->msg.introReply.path_id;
frame->flag = 0;
// set before the work is queued, so further calls are rejected by the guard at the top
generated_intro = true;
// ensure we have a sender put for this convo tag
ep.PutSenderFor(current_tag, current_intro.address_keys, false);
// encrypt frame async
ep.router()->queue_work(
[ex, frame] { return AsyncKeyExchange::Encrypt(ex, frame); });
LogInfo(Name(), " send intro frame T=", current_tag);
}
/// Human-readable label for log messages: "OBContext:" followed by the string
/// form of the current introset's address.
std::string
OutboundContext::Name() const
{
  std::string label{"OBContext:"};
  label += current_intro.address_keys.Addr().ToString();
  return label;
}
/// Start a refresh of the remote introset, at most once per update interval.
///
/// Skipped while a lookup is already in flight, when the context is marked bad,
/// or when the previous update was less than IntrosetUpdateInterval ago.
/// The lookup itself is not implemented yet (see TODO); for now this only logs,
/// stamps `last_introset_update` and fetches the candidate lookup paths.
void
OutboundContext::UpdateIntroSet()
{
  constexpr auto IntrosetUpdateInterval = 10s;
  const auto now = Now();

  if (updatingIntroSet or marked_bad)
    return;
  if (now < last_introset_update + IntrosetUpdateInterval)
    return;

  log::info(link_cat, "{} updating introset", Name());
  last_introset_update = now;

  // we want to use the parent endpoint's paths because outbound context
  // does not implement path::PathSet::HandleGotIntroMessage
  const auto paths = GetManyPathsWithUniqueEndpoints(&ep, 2, location);

  [[maybe_unused]] uint64_t relayOrder = 0;
  for ([[maybe_unused]] const auto& path : paths)
  {
    // TODO: implement this
    // HiddenServiceAddressLookup* job = new HiddenServiceAddressLookup(
    //     m_Endpoint,
    //     util::memFn(&OutboundContext::OnIntroSetUpdate, shared_from_this()),
    //     location,
    //     PubKey{addr.as_array()},
    //     path->Endpoint(),
    //     relayOrder,
    //     m_Endpoint->GenTXID(),
    //     (IntrosetUpdateInterval / 2) + (2 * path->intro.latency) +
    //     IntrosetLookupGraceInterval);
    // relayOrder++;
    // if (job->SendRequestViaPath(path, m_Endpoint->router()))
    //   updatingIntroSet = true;
  }
}
/// Describe this session as a status object.
///
/// Starts from path::Builder::ExtractStatus() and adds the session's tag,
/// intros, timestamps, bad flag, remote address and readiness.
/// @return the populated status object
util::StatusObject
OutboundContext::ExtractStatus() const
{
  auto obj = path::Builder::ExtractStatus();
  obj["current_tag"] = current_tag.ToHex();
  obj["remote_intro"] = remote_intro.ExtractStatus();
  obj["session_created"] = to_json(created_at);
  obj["last_send"] = to_json(last_send);
  obj["lastRecv"] = to_json(last_inbound_traffic);
  obj["lastIntrosetUpdate"] = to_json(last_introset_update);
  obj["marked_bad"] = marked_bad;
  obj["last_shift"] = to_json(last_shift);
  // key used to be the garbled "remote_identityity" (rename artifact)
  obj["remote_identity"] = addr.ToString();
  obj["currentRemote_introset"] = current_intro.ExtractStatus();
  obj["nextIntro"] = next_intro.ExtractStatus();
  obj["readyToSend"] = ReadyToSend();
  return obj;
}
void
OutboundContext::KeepAlive()
{
ustring buf(64, '\0');
CryptoManager::instance()->randomize(buf.data(), buf.size());
SendPacketToRemote(buf, ProtocolType::Control);
last_keep_alive = Now();
}
/// Periodic maintenance of this outbound session.
///
/// Refreshes intros/introset when the remote goes quiet or intros go stale,
/// sends keep-alives, and detects sessions that are broken or stale.
///
/// @param now current time
/// @return true when the session is considered dead (missing session key, no
///         path while ready, marked bad after a keep-alive, half open, or stale),
///         false otherwise
bool
OutboundContext::Pump(std::chrono::milliseconds now)
{
// NOTE(review): ReadyToSend() returns false whenever remote_intro.router is
// zero, so this condition looks unsatisfiable and swap_intros() is never
// reached from here — confirm the intended condition.
if (ReadyToSend() and remote_intro.router.IsZero())
{
swap_intros();
}
if (ReadyToSend())
{
// if we don't have a cached session key after sending the intro, the session
// is in a broken state, so report it as dead
SharedSecret discardme;
if (not ep.GetCachedSessionKeyFor(current_tag, discardme))
{
LogError(Name(), " no cached key after sending intro, we are in a fugged state, oh no");
return true;
}
}
// we have heard from the remote before, but not within send_timeout
if (got_inbound_traffic and last_inbound_traffic + send_timeout <= now)
{
// timeout on other side
UpdateIntroSet();
MarkCurrentIntroBad(now);
ShiftIntroRouter(remote_intro.router);
}
// check for stale intros
// update the introset if we think we need to
if (current_intro.HasStaleIntros(now, path::INTRO_PATH_SPREAD)
or remote_intro.ExpiresSoon(now, path::INTRO_PATH_SPREAD))
{
UpdateIntroSet();
ShiftIntroduction(false);
}
if (ReadyToSend())
{
// NOTE(review): ReadyToSend() just required a path to remote_intro.router,
// so the `not GetPathByRouter(...)` test below looks like it cannot hold
// here — confirm whether this fallback is reachable.
if (not remote_intro.router.IsZero() and not GetPathByRouter(remote_intro.router))
{
// pick another good intro if we have no path on our current intro
std::vector<Introduction> otherIntros;
ForEachPath([now, router = remote_intro.router, &otherIntros](auto path) {
if (path and path->IsReady() and path->Endpoint() != router
and not path->ExpiresSoon(now, path::INTRO_PATH_SPREAD))
{
otherIntros.emplace_back(path->intro);
}
});
if (not otherIntros.empty())
{
// adopt a random one of the collected intros
std::shuffle(otherIntros.begin(), otherIntros.end(), CSRNG{});
remote_intro = otherIntros[0];
}
}
}
// lookup router in intro if set and unknown
if (not next_intro.router.IsZero())
ep.EnsureRouterIsKnown(next_intro.router);
if (ReadyToSend() and not ready_hooks.empty())
{
// NOTE(review): same observation as above — ReadyToSend() already implies
// a path to remote_intro.router, so `not path` looks unreachable; confirm.
const auto path = GetPathByRouter(remote_intro.router);
if (not path)
{
LogWarn(Name(), " ready but no path to ", remote_intro.router, " ???");
return true;
}
}
// most recent activity in either direction
const auto timeout = std::max(last_send, last_inbound_traffic);
if (last_send > 0s and now >= timeout + (send_timeout / 2))
{
// send a keep alive to keep this session alive
KeepAlive();
// NOTE(review): presumably KeepAlive() can end up setting marked_bad; verify.
if (marked_bad)
{
LogWarn(Name(), " keepalive timeout hit");
return true;
}
}
// check for half open state where we can send but we get nothing back
if (last_inbound_traffic == 0s and now - created_at > connect_timeout)
{
LogWarn(Name(), " half open state, we can send but we got nothing back");
return true;
}
// if we are dead return true so we are removed:
// with any recorded activity, dead means idle for longer than send_timeout;
// with none, dead means older than connect_timeout
const bool removeIt = timeout > 0s ? (now >= timeout && now - timeout > send_timeout)
: (now >= created_at && now - created_at > connect_timeout);
if (removeIt)
{
LogInfo(Name(), " session is stale");
return true;
}
return false;
}
/// Select hops for a new path aligned to the next intro's router.
///
/// When no next intro router is set, one attempt is made to shift to a new
/// introduction (without rebuilding).
/// @return the hops, or nullopt when there is still no next intro router
std::optional<std::vector<RouterContact>>
OutboundContext::GetHopsForBuild()
{
  if (next_intro.router.IsZero())
  {
    ShiftIntroduction(false);
    // shifting did not produce a usable target either
    if (next_intro.router.IsZero())
      return std::nullopt;
  }
  return GetHopsAlignedToForBuild(next_intro.router, ep.SnodeBlacklist());
}
bool
OutboundContext::ShouldBuildMore(std::chrono::milliseconds now) const
{
if (marked_bad or path::Builder::BuildCooldownHit(now))
return false;
if (NumInStatus(path::ePathBuilding) >= std::max(numDesiredPaths / size_t{2}, size_t{1}))
return false;
size_t numValidPaths = 0;
bool havePathToNextIntro = false;
ForEachPath([now, this, &havePathToNextIntro, &numValidPaths](path::Path_ptr path) {
if (not path->IsReady())
return;
if (not path->intro.ExpiresSoon(now, path::DEFAULT_LIFETIME - path::INTRO_PATH_SPREAD))
{
numValidPaths++;
if (path->intro.router == next_intro.router)
havePathToNextIntro = true;
}
});
return numValidPaths < numDesiredPaths or not havePathToNextIntro;
}
bool
OutboundContext::ShiftIntroduction(bool rebuild)
{
bool success = false, shifted = false;
const auto now = Now();
auto shift_timeout = send_timeout * 5 / 2;
if (abs(now - last_shift) < shift_timeout)
return false;
std::vector<Introduction> intros = current_intro.intros;
if (intros.size() > 1)
{
std::shuffle(intros.begin(), intros.end(), CSRNG{});
}
// to find a intro on the same router as before that is newer
for (const auto& intro : intros)
{
if (intro.ExpiresSoon(now))
continue;
if (ep.SnodeBlacklist().count(intro.router))
continue;
if (remote_intro.router == intro.router)
{
if (intro.expiry > next_intro.expiry)
{
success = true;
next_intro = intro;
}
}
}
if (!success)
{
/// pick newer intro not on same router
for (const auto& intro : intros)
{
if (ep.SnodeBlacklist().count(intro.router))
continue;
ep.EnsureRouterIsKnown(intro.router);
if (intro.ExpiresSoon(now))
continue;
if (next_intro != intro)
{
if (intro.expiry > next_intro.expiry)
{
shifted = intro.router != next_intro.router;
next_intro = intro;
success = true;
}
}
}
}
if (next_intro.router.IsZero())
return false;
if (shifted)
last_shift = now;
if (rebuild && !BuildCooldownHit(Now()))
BuildOneAlignedTo(next_intro.router);
return success;
}
/// React to a dead path.
///
/// An introset update is always requested. If the path ended at the current
/// remote intro's router and no ready path to that router is left, the current
/// intro is marked bad, the next intro is moved off that router, and a path to
/// the new router is built when one was found.
void
OutboundContext::HandlePathDied(path::Path_ptr path)
{
  // unconditionally update introset
  UpdateIntroSet();

  const RouterID endpoint{path->Endpoint()};

  // only paths to our current intro need further handling
  if (not (endpoint == remote_intro.router))
    return;

  // count the ready paths we still have to this router
  size_t readyPaths = 0;
  ForEachPath([&endpoint, &readyPaths](const path::Path_ptr& candidate) {
    if (candidate->Endpoint() == endpoint && candidate->IsReady())
      ++readyPaths;
  });
  if (readyPaths != 0)
    return;

  // no path to this endpoint remains, so pivot away from it
  MarkCurrentIntroBad(Now());
  ShiftIntroRouter(endpoint);
  if (next_intro.router != endpoint)
    BuildOneAlignedTo(next_intro.router);
}
/// Whether a keep-alive is due at `now`.
/// Only once inbound traffic has been seen, and then only when at least half of
/// `send_timeout` has passed since the last keep-alive.
bool
OutboundContext::ShouldKeepAlive(std::chrono::milliseconds now) const
{
  if (not got_inbound_traffic or last_inbound_traffic == 0s)
    return false;

  const auto keepAliveInterval = send_timeout / 2;
  const auto sinceLastKeepAlive = now - last_keep_alive;
  return sinceLastKeepAlive >= keepAliveInterval;
}
/// Periodic tick: run the base builder's tick, then send a keep-alive if one is due.
void
OutboundContext::Tick(std::chrono::milliseconds now)
{
  path::Builder::Tick(now);

  if (not ShouldKeepAlive(now))
    return;
  KeepAlive();
}
/// Hand `buf` to AsyncEncryptAndSendTo for delivery to the remote.
// NOTE(review): `t` is neither a parameter nor a local of this function; unless
// it resolves to a member or an outer-scope name this will not compile — confirm
// which protocol type is meant here.
void
OutboundContext::send_packet_to_remote(std::string buf)
{
AsyncEncryptAndSendTo(buf, t);
}
} // namespace llarp::service