Path routing partially implementing

- Reworking how paths to services and snodes
- pushing for Tom to rebase for path handling
pull/2204/head
dr7ana 8 months ago
parent 2cc02d7b60
commit c8dae875b5

@ -1,6 +1,5 @@
#include "serviceaddresslookup.hpp"
#include "context.hpp"
#include <llarp/dht/messages/findintro.hpp>
#include <llarp/dht/messages/gotintro.hpp>
#include <utility>

@ -943,7 +943,6 @@ namespace llarp
void
LinkManager::handle_find_intro(oxen::quic::message m)
{
std::string tag_name;
ustring location;
uint64_t relay_order, is_relayed;
@ -951,7 +950,6 @@ namespace llarp
{
oxenc::bt_dict_consumer btdc{m.body()};
tag_name = btdc.require<std::string>("N");
relay_order = btdc.require<uint64_t>("O");
is_relayed = btdc.require<uint64_t>("R");
location = btdc.require<ustring>("S");
@ -993,7 +991,7 @@ namespace llarp
send_control_message(
peer_key,
"find_intro",
FindIntroMessage::serialize(dht::Key_t{peer_key}, tag_name, is_relayed, relay_order),
FindIntroMessage::serialize(dht::Key_t{peer_key}, is_relayed, relay_order),
[original_msg = std::move(m)](oxen::quic::message relay_response) mutable {
if (relay_response)
log::info(

@ -59,13 +59,12 @@ namespace llarp
inline auto INSUFFICIENT_NODES = "INSUFFICIENT NODES"sv;
inline static std::string
serialize(const dht::Key_t& location, std::string tag, bool is_relayed, uint64_t order)
serialize(const dht::Key_t& location, bool is_relayed, uint64_t order)
{
oxenc::bt_dict_producer btdp;
try
{
btdp.append("N", tag);
btdp.append("O", order);
btdp.append("R", is_relayed ? 1 : 0);
btdp.append("S", location.ToView());

@ -62,11 +62,22 @@ namespace llarp::path
EnterState(ePathBuilding, parent->Now());
}
void
Path::find_intro(
const dht::Key_t& location,
bool is_relayed,
uint64_t order,
std::function<void(oxen::quic::message m)> func)
{
send_path_control_message(
"find_intro", FindIntroMessage::serialize(location, is_relayed, order), std::move(func));
}
void
Path::find_name(std::string name, std::function<void(oxen::quic::message m)> func)
{
send_path_control_message(
"find_router", FindNameMessage::serialize(std::move(name)), std::move(func));
"find_name", FindNameMessage::serialize(std::move(name)), std::move(func));
}
void

@ -283,6 +283,13 @@ namespace llarp
void
find_router(std::string rid, std::function<void(oxen::quic::message m)> func = nullptr);
void
find_intro(
const dht::Key_t& location,
bool is_relayed = false,
uint64_t order = 0,
std::function<void(oxen::quic::message m)> func = nullptr);
void
send_path_control_message(
std::string method,

@ -2,7 +2,6 @@
#include "endpoint_state.hpp"
#include "endpoint_util.hpp"
#include "auth.hpp"
#include "llarp/util/logging.hpp"
#include "outbound_context.hpp"
#include "protocol.hpp"
#include "info.hpp"
@ -19,6 +18,7 @@
#include <llarp/dht/messages/gotrouter.hpp>
#include <llarp/dht/messages/pubintro.hpp>
#include <llarp/messages/dht.hpp>
#include <llarp/link/contacts.hpp>
#include <llarp/nodedb.hpp>
#include <llarp/profiling.hpp>
#include <llarp/router/router.hpp>
@ -30,6 +30,7 @@
#include <llarp/util/buffer.hpp>
#include <llarp/util/meta/memfn.hpp>
#include <llarp/link/link_manager.hpp>
#include <llarp/util/logging.hpp>
#include <llarp/tooling/dht_event.hpp>
#include <llarp/quic/server.hpp>
#include <llarp/quic/tunnel.hpp>
@ -930,16 +931,14 @@ namespace llarp
}
bool
Endpoint::lookup_router(RouterID rid)
Endpoint::lookup_router(RouterID rid, std::function<void(oxen::quic::message)> func)
{
using llarp::dht::FindRouterMessage;
auto& routers = _state->pending_routers;
const auto& routers = _state->pending_routers;
if (routers.find(rid) == routers.end())
{
auto path = GetEstablishedPathClosestTo(rid);
path->find_router("find_router");
path->find_router("find_router", func);
return true;
}
@ -1287,7 +1286,9 @@ namespace llarp
}
bool
Endpoint::EnsurePathToSNode(const RouterID snode, SNodeEnsureHook h)
Endpoint::EnsurePathToSNode(
const RouterID snode,
std::function<void(const RouterID, exit::BaseSession_ptr, ConvoTag)> hook)
{
auto& nodeSessions = _state->snode_sessions;
@ -1326,17 +1327,17 @@ namespace llarp
while (itr != range.second)
{
if (itr->second->IsReady())
h(snode, itr->second, ConvoTag{itr->second->CurrentPath()->as_array()});
hook(snode, itr->second, ConvoTag{itr->second->CurrentPath()->as_array()});
else
{
itr->second->AddReadyHook([h, snode](auto session) {
itr->second->AddReadyHook([hook, snode](auto session) {
if (session)
{
h(snode, session, ConvoTag{session->CurrentPath()->as_array()});
hook(snode, session, ConvoTag{session->CurrentPath()->as_array()});
}
else
{
h(snode, nullptr, ConvoTag{});
hook(snode, nullptr, ConvoTag{});
}
});
if (not itr->second->BuildCooldownHit(Now()))
@ -1349,7 +1350,9 @@ namespace llarp
bool
Endpoint::EnsurePathToService(
const Address remote, PathEnsureHook hook, [[maybe_unused]] llarp_time_t timeout)
const Address remote,
std::function<void(Address, OutboundContext*)> hook,
[[maybe_unused]] llarp_time_t timeout)
{
if (not WantsOutboundSession(remote))
{
@ -1361,8 +1364,6 @@ namespace llarp
/// how many routers to use for lookups
static constexpr size_t NumParallelLookups = 2;
/// how many requests per router
static constexpr size_t RequestsPerLookup = 2;
// add response hook to list for address.
_state->pending_service_lookups.emplace(remote, hook);
@ -1387,9 +1388,7 @@ namespace llarp
const auto paths = GetManyPathsWithUniqueEndpoints(this, NumParallelLookups);
using namespace std::placeholders;
const dht::Key_t location = remote.ToKey();
uint64_t order = 0;
// flag to only add callback to list of callbacks for
// address once.
@ -1397,36 +1396,28 @@ namespace llarp
for (const auto& path : paths)
{
for (size_t count = 0; count < RequestsPerLookup; ++count)
{
HiddenServiceAddressLookup* job = new HiddenServiceAddressLookup(
this,
[this](auto addr, auto result, auto from, auto left, auto order) {
return OnLookup(addr, result, from, left, order);
},
location,
PubKey{remote.as_array()},
path->Endpoint(),
order,
GenTXID(),
timeout + (2 * path->intro.latency) + IntrosetLookupGraceInterval);
LogInfo(
"doing lookup for ",
remote,
" via ",
path->Endpoint(),
" at ",
location,
" order=",
order);
order++;
if (job->SendRequestViaPath(path, router()))
path->find_intro(location, false, 0, [this, hook](oxen::quic::message m) mutable {
if (m)
{
hookAdded = true;
std::string introset;
try
{
oxenc::bt_dict_consumer btdc{m.body()};
introset = btdc.require<std::string>("INTROSET");
}
catch (...)
{
log::warning(log_cat, "Failed to parse find name response!");
throw;
}
service::EncryptedIntroSet enc{introset};
router()->contacts()->services()->PutNode(std::move(enc));
// TODO: finish this
}
else
LogError(Name(), " send via path failed for lookup");
}
});
}
return hookAdded;
}
@ -1450,6 +1441,7 @@ namespace llarp
LogWarn("SendToOrQueue failed: convo tag is zero");
return false;
}
LogDebug(Name(), " send ", pkt.sz, " bytes on T=", tag);
if (auto maybe = GetEndpointWithConvoTag(tag))
{

@ -239,11 +239,13 @@ namespace llarp
// "find router" via closest path
bool
lookup_router(RouterID router);
lookup_router(RouterID router, std::function<void(oxen::quic::message)> func = nullptr);
// "find name"
void
lookup_name(std::string name, std::function<void(oxen::quic::message)> func) override;
lookup_name(
std::string name, std::function<void(oxen::quic::message)> func = nullptr) override;
// "find introset?""
void
LookupServiceAsync(
@ -333,10 +335,6 @@ namespace llarp
std::function<void(std::optional<ConvoTag>)> hook,
llarp_time_t timeout) override;
// passed a sendto context when we have a path established otherwise
// nullptr if the path was not made before the timeout
using PathEnsureHook = std::function<void(Address, OutboundContext*)>;
static constexpr auto DefaultPathEnsureTimeout = 2s;
/// return false if we have already called this function before for this
@ -344,17 +342,17 @@ namespace llarp
bool
EnsurePathToService(
const Address remote,
PathEnsureHook h,
std::function<void(Address, OutboundContext*)> h,
llarp_time_t timeoutMS = DefaultPathEnsureTimeout);
using SNodeEnsureHook = std::function<void(const RouterID, exit::BaseSession_ptr, ConvoTag)>;
void
InformPathToService(const Address remote, OutboundContext* ctx);
/// ensure a path to a service node by public key
bool
EnsurePathToSNode(const RouterID remote, SNodeEnsureHook h);
EnsurePathToSNode(
const RouterID remote,
std::function<void(const RouterID, exit::BaseSession_ptr, ConvoTag)> h);
/// return true if this endpoint is trying to lookup this router right now
bool

@ -31,7 +31,7 @@ namespace llarp::service
bool
OutboundContext::ShouldBundleRC() const
{
return m_Endpoint->ShouldBundleRC();
return service_endpoint->ShouldBundleRC();
}
bool
@ -87,8 +87,8 @@ namespace llarp::service
if (remoteIntro != m_NextIntro)
{
remoteIntro = m_NextIntro;
m_DataHandler->PutSenderFor(currentConvoTag, currentIntroSet.address_keys, false);
m_DataHandler->PutIntroFor(currentConvoTag, remoteIntro);
service_endpoint->PutSenderFor(currentConvoTag, currentIntroSet.address_keys, false);
service_endpoint->PutIntroFor(currentConvoTag, remoteIntro);
ShiftIntroRouter(m_NextIntro.router);
// if we have not made a handshake to the remote endpoint do so
if (not IntroGenerated())
@ -241,19 +241,19 @@ namespace llarp::service
auto frame = std::make_shared<ProtocolFrameMessage>();
frame->clear();
auto ex = std::make_shared<AsyncKeyExchange>(
m_Endpoint->Loop(),
service_endpoint->Loop(),
remoteIdent,
m_Endpoint->GetIdentity(),
service_endpoint->GetIdentity(),
currentIntroSet.sntru_pubkey,
remoteIntro,
m_DataHandler,
service_endpoint,
currentConvoTag,
t);
ex->hook = [self = shared_from_this(), path](auto frame) {
if (not self->Send(std::move(frame), path))
return;
self->m_Endpoint->Loop()->call_later(
self->service_endpoint->Loop()->call_later(
self->remoteIntro.latency, [self]() { self->sentIntro = true; });
};
@ -263,9 +263,10 @@ namespace llarp::service
frame->flag = 0;
generatedIntro = true;
// ensure we have a sender put for this convo tag
m_DataHandler->PutSenderFor(currentConvoTag, currentIntroSet.address_keys, false);
service_endpoint->PutSenderFor(currentConvoTag, currentIntroSet.address_keys, false);
// encrypt frame async
m_Endpoint->router()->queue_work([ex, frame] { return AsyncKeyExchange::Encrypt(ex, frame); });
service_endpoint->router()->queue_work(
[ex, frame] { return AsyncKeyExchange::Encrypt(ex, frame); });
LogInfo(Name(), " send intro frame T=", currentConvoTag);
}
@ -287,7 +288,7 @@ namespace llarp::service
m_LastIntrosetUpdateAt = now;
// we want to use the parent endpoint's paths because outbound context
// does not implement path::PathSet::HandleGotIntroMessage
const auto paths = GetManyPathsWithUniqueEndpoints(m_Endpoint, 2, location);
const auto paths = GetManyPathsWithUniqueEndpoints(service_endpoint, 2, location);
[[maybe_unused]] uint64_t relayOrder = 0;
for ([[maybe_unused]] const auto& path : paths)
{
@ -352,7 +353,7 @@ namespace llarp::service
// if we dont have a cached session key after sending intro we are in a fugged state so
// expunge
SharedSecret discardme;
if (not m_DataHandler->GetCachedSessionKeyFor(currentConvoTag, discardme))
if (not service_endpoint->GetCachedSessionKeyFor(currentConvoTag, discardme))
{
LogError(Name(), " no cached key after sending intro, we are in a fugged state, oh no");
return true;
@ -397,7 +398,7 @@ namespace llarp::service
}
// lookup router in intro if set and unknown
if (not m_NextIntro.router.IsZero())
m_Endpoint->EnsureRouterIsKnown(m_NextIntro.router);
service_endpoint->EnsureRouterIsKnown(m_NextIntro.router);
if (ReadyToSend() and not m_ReadyHooks.empty())
{
@ -470,7 +471,7 @@ namespace llarp::service
}
if (m_NextIntro.router.IsZero())
return std::nullopt;
return GetHopsAlignedToForBuild(m_NextIntro.router, m_Endpoint->SnodeBlacklist());
return GetHopsAlignedToForBuild(m_NextIntro.router, service_endpoint->SnodeBlacklist());
}
bool
@ -538,7 +539,7 @@ namespace llarp::service
{
if (intro.ExpiresSoon(now))
continue;
if (m_Endpoint->SnodeBlacklist().count(intro.router))
if (service_endpoint->SnodeBlacklist().count(intro.router))
continue;
if (remoteIntro.router == intro.router)
{
@ -554,9 +555,9 @@ namespace llarp::service
/// pick newer intro not on same router
for (const auto& intro : intros)
{
if (m_Endpoint->SnodeBlacklist().count(intro.router))
if (service_endpoint->SnodeBlacklist().count(intro.router))
continue;
m_Endpoint->EnsureRouterIsKnown(intro.router);
service_endpoint->EnsureRouterIsKnown(intro.router);
if (intro.ExpiresSoon(now))
continue;
if (m_NextIntro != intro)
@ -627,13 +628,13 @@ namespace llarp::service
bool
OutboundContext::HandleHiddenServiceFrame(path::Path_ptr p, const ProtocolFrameMessage& frame)
{
m_LastInboundTraffic = m_Endpoint->Now();
m_LastInboundTraffic = service_endpoint->Now();
m_GotInboundTraffic = true;
if (frame.flag)
{
// handle discard
ServiceInfo si;
if (!m_Endpoint->GetSenderFor(frame.convo_tag, si))
if (!service_endpoint->GetSenderFor(frame.convo_tag, si))
{
LogWarn("no sender for T=", frame.convo_tag);
return false;
@ -651,7 +652,7 @@ namespace llarp::service
if (const auto maybe = AuthResultCodeFromInt(frame.flag))
result.code = *maybe;
SharedSecret sessionKey{};
if (m_DataHandler->GetCachedSessionKeyFor(frame.convo_tag, sessionKey))
if (service_endpoint->GetCachedSessionKeyFor(frame.convo_tag, sessionKey))
{
ProtocolMessage msg{};
if (frame.DecryptPayloadInto(sessionKey, msg))
@ -664,7 +665,7 @@ namespace llarp::service
}
}
m_Endpoint->RemoveConvoTag(frame.convo_tag);
service_endpoint->RemoveConvoTag(frame.convo_tag);
if (authResultListener)
{
authResultListener(result);
@ -687,8 +688,8 @@ namespace llarp::service
handler(result);
};
}
const auto& ident = m_Endpoint->GetIdentity();
if (not frame.AsyncDecryptAndVerify(m_Endpoint->Loop(), p, ident, m_Endpoint, hook))
const auto& ident = service_endpoint->GetIdentity();
if (not frame.AsyncDecryptAndVerify(service_endpoint->Loop(), p, ident, service_endpoint, hook))
{
// send reset convo tag message
LogError("failed to decrypt and verify frame");
@ -700,8 +701,8 @@ namespace llarp::service
f.Sign(ident);
{
LogWarn("invalidating convotag T=", frame.convo_tag);
m_Endpoint->RemoveConvoTag(frame.convo_tag);
m_Endpoint->_send_queue.tryPushBack(
service_endpoint->RemoveConvoTag(frame.convo_tag);
service_endpoint->_send_queue.tryPushBack(
SendEvent_t{std::make_shared<routing::PathTransferMessage>(f, frame.path_id), p});
}
}

@ -7,177 +7,173 @@
#include <unordered_map>
#include <unordered_set>
namespace llarp
namespace llarp::service
{
namespace service
struct AsyncKeyExchange;
struct Endpoint;
/// context needed to initiate an outbound hidden service session
struct OutboundContext : public path::Builder,
public SendContext,
public std::enable_shared_from_this<OutboundContext>
{
struct AsyncKeyExchange;
struct Endpoint;
OutboundContext(const IntroSet& introSet, Endpoint* parent);
/// context needed to initiate an outbound hidden service session
struct OutboundContext : public path::Builder,
public SendContext,
public std::enable_shared_from_this<OutboundContext>
{
OutboundContext(const IntroSet& introSet, Endpoint* parent);
~OutboundContext() override;
void
Tick(llarp_time_t now) override;
util::StatusObject
ExtractStatus() const;
~OutboundContext() override;
void
BlacklistSNode(const RouterID) override{};
void
Tick(llarp_time_t now) override;
bool
ShouldBundleRC() const override;
util::StatusObject
ExtractStatus() const;
path::PathSet_ptr
GetSelf() override
{
return shared_from_this();
}
std::weak_ptr<path::PathSet>
GetWeak() override
{
return weak_from_this();
}
void
BlacklistSNode(const RouterID) override{};
Address
Addr() const;
bool
ShouldBundleRC() const override;
bool
Stop() override;
path::PathSet_ptr
GetSelf() override
{
return shared_from_this();
}
bool
HandleDataDrop(path::Path_ptr p, const PathID_t& dst, uint64_t s);
std::weak_ptr<path::PathSet>
GetWeak() override
{
return weak_from_this();
}
void
HandlePathDied(path::Path_ptr p) override;
Address
Addr() const;
/// set to true if we are updating the remote introset right now
bool updatingIntroSet;
bool
Stop() override;
/// update the current selected intro to be a new best introduction
/// return true if we have changed intros
bool
ShiftIntroduction(bool rebuild = true) override;
bool
HandleDataDrop(path::Path_ptr p, const PathID_t& dst, uint64_t s);
/// shift the intro off the current router it is using
void
ShiftIntroRouter(const RouterID remote) override;
void
HandlePathDied(path::Path_ptr p) override;
/// mark the current remote intro as bad
void
MarkCurrentIntroBad(llarp_time_t now) override;
/// set to true if we are updating the remote introset right now
bool updatingIntroSet;
void
MarkIntroBad(const Introduction& marked, llarp_time_t now);
/// update the current selected intro to be a new best introduction
/// return true if we have changed intros
bool
ShiftIntroduction(bool rebuild = true) override;
/// return true if we are ready to send
bool
ReadyToSend() const;
/// shift the intro off the current router it is using
void
ShiftIntroRouter(const RouterID remote) override;
void
AddReadyHook(std::function<void(OutboundContext*)> readyHook, llarp_time_t timeout);
/// mark the current remote intro as bad
void
MarkCurrentIntroBad(llarp_time_t now) override;
/// for exits
void
SendPacketToRemote(const llarp_buffer_t&, ProtocolType t) override;
void
MarkIntroBad(const Introduction& marked, llarp_time_t now);
bool
ShouldBuildMore(llarp_time_t now) const override;
/// return true if we are ready to send
bool
ReadyToSend() const;
/// pump internal state
/// return true to mark as dead
bool
Pump(llarp_time_t now);
void
AddReadyHook(std::function<void(OutboundContext*)> readyHook, llarp_time_t timeout);
/// return true if it's safe to remove ourselves
bool
IsDone(llarp_time_t now) const;
/// for exits
void
SendPacketToRemote(const llarp_buffer_t&, ProtocolType t) override;
bool
CheckPathIsDead(path::Path_ptr p, llarp_time_t dlt);
bool
ShouldBuildMore(llarp_time_t now) const override;
void
AsyncGenIntro(const llarp_buffer_t& payload, ProtocolType t) override;
/// pump internal state
/// return true to mark as dead
bool
Pump(llarp_time_t now);
/// issues a lookup to find the current intro set of the remote service
void
UpdateIntroSet() override;
/// return true if it's safe to remove ourselves
bool
IsDone(llarp_time_t now) const;
void
HandlePathBuilt(path::Path_ptr path) override;
bool
CheckPathIsDead(path::Path_ptr p, llarp_time_t dlt);
void
HandlePathBuildTimeout(path::Path_ptr path) override;
void
AsyncGenIntro(const llarp_buffer_t& payload, ProtocolType t) override;
void
HandlePathBuildFailedAt(path::Path_ptr path, RouterID hop) override;
/// issues a lookup to find the current intro set of the remote service
void
UpdateIntroSet() override;
std::optional<std::vector<RouterContact>>
GetHopsForBuild() override;
void
HandlePathBuilt(path::Path_ptr path) override;
bool
HandleHiddenServiceFrame(path::Path_ptr p, const ProtocolFrameMessage& frame);
void
HandlePathBuildTimeout(path::Path_ptr path) override;
std::string
Name() const override;
void
HandlePathBuildFailedAt(path::Path_ptr path, RouterID hop) override;
void
KeepAlive();
std::optional<std::vector<RouterContact>>
GetHopsForBuild() override;
bool
ShouldKeepAlive(llarp_time_t now) const;
bool
HandleHiddenServiceFrame(path::Path_ptr p, const ProtocolFrameMessage& frame);
std::string
Name() const override;
void
KeepAlive();
bool
ShouldKeepAlive(llarp_time_t now) const;
const IntroSet&
GetCurrentIntroSet() const
{
return currentIntroSet;
}
llarp_time_t
RTT() const;
bool
OnIntroSetUpdate(
const Address& addr,
std::optional<IntroSet> i,
const RouterID& endpoint,
llarp_time_t,
uint64_t relayOrder);
private:
/// swap remoteIntro with next intro
void
SwapIntros();
bool
IntroGenerated() const override;
bool
IntroSent() const override;
const dht::Key_t location;
const Address addr;
uint64_t m_UpdateIntrosetTX = 0;
IntroSet currentIntroSet;
Introduction m_NextIntro;
llarp_time_t lastShift = 0s;
uint16_t m_LookupFails = 0;
uint16_t m_BuildFails = 0;
llarp_time_t m_LastInboundTraffic = 0s;
bool m_GotInboundTraffic = false;
bool generatedIntro = false;
bool sentIntro = false;
std::vector<std::function<void(OutboundContext*)>> m_ReadyHooks;
llarp_time_t m_LastIntrosetUpdateAt = 0s;
llarp_time_t m_LastKeepAliveAt = 0s;
};
} // namespace service
} // namespace llarp
const IntroSet&
GetCurrentIntroSet() const
{
return currentIntroSet;
}
llarp_time_t
RTT() const;
bool
OnIntroSetUpdate(
const Address& addr,
std::optional<IntroSet> i,
const RouterID& endpoint,
llarp_time_t,
uint64_t relayOrder);
private:
/// swap remoteIntro with next intro
void
SwapIntros();
bool
IntroGenerated() const override;
bool
IntroSent() const override;
const dht::Key_t location;
const Address addr;
uint64_t m_UpdateIntrosetTX = 0;
IntroSet currentIntroSet;
Introduction m_NextIntro;
llarp_time_t lastShift = 0s;
uint16_t m_LookupFails = 0;
uint16_t m_BuildFails = 0;
llarp_time_t m_LastInboundTraffic = 0s;
bool m_GotInboundTraffic = false;
bool generatedIntro = false;
bool sentIntro = false;
std::vector<std::function<void(OutboundContext*)>> m_ReadyHooks;
llarp_time_t m_LastIntrosetUpdateAt = 0s;
llarp_time_t m_LastKeepAliveAt = 0s;
};
} // namespace llarp::service

@ -16,8 +16,7 @@ namespace llarp::service
: remoteIdent(std::move(ident))
, remoteIntro(intro)
, m_PathSet(send)
, m_DataHandler(ep)
, m_Endpoint(ep)
, service_endpoint(ep)
, createdAt(ep->Now())
, m_SendQueue(SendContextQueueSize)
{}
@ -30,7 +29,7 @@ namespace llarp::service
std::make_shared<routing::PathTransferMessage>(*msg, remoteIntro.path_id), path))
== thread::QueueReturn::Success)
{
m_Endpoint->router()->TriggerPump();
service_endpoint->router()->TriggerPump();
return true;
}
return false;
@ -39,7 +38,7 @@ namespace llarp::service
void
SendContext::FlushUpstream()
{
auto r = m_Endpoint->router();
auto r = service_endpoint->router();
std::unordered_set<path::Path_ptr, path::Path::Ptr_Hash> flushpaths;
auto rttRMS = 0ms;
while (auto maybe = m_SendQueue.tryPopFront())
@ -50,7 +49,7 @@ namespace llarp::service
{
lastGoodSend = r->now();
flushpaths.emplace(path);
m_Endpoint->ConvoTagTX(msg->protocol_frame_msg.convo_tag);
service_endpoint->ConvoTagTX(msg->protocol_frame_msg.convo_tag);
const auto rtt = (path->intro.latency + remoteIntro.latency) * 2;
rttRMS += rtt * rtt.count();
}
@ -85,7 +84,7 @@ namespace llarp::service
return;
}
if (!m_DataHandler->GetCachedSessionKeyFor(f->convo_tag, shared))
if (!service_endpoint->GetCachedSessionKeyFor(f->convo_tag, shared))
{
LogWarn(
m_PathSet->Name(),
@ -95,10 +94,10 @@ namespace llarp::service
}
auto m = std::make_shared<ProtocolMessage>();
m_DataHandler->PutIntroFor(f->convo_tag, remoteIntro);
m_DataHandler->PutReplyIntroFor(f->convo_tag, path->intro);
service_endpoint->PutIntroFor(f->convo_tag, remoteIntro);
service_endpoint->PutReplyIntroFor(f->convo_tag, path->intro);
m->proto = t;
if (auto maybe = m_Endpoint->GetSeqNoForConvo(f->convo_tag))
if (auto maybe = service_endpoint->GetSeqNoForConvo(f->convo_tag))
{
m->seqno = *maybe;
}
@ -109,11 +108,11 @@ namespace llarp::service
}
m->introReply = path->intro;
f->path_id = m->introReply.path_id;
m->sender = m_Endpoint->GetIdentity().pub;
m->sender = service_endpoint->GetIdentity().pub;
m->tag = f->convo_tag;
m->PutBuffer(payload);
m_Endpoint->router()->queue_work([f, m, shared, path, this] {
if (not f->EncryptAndSign(*m, shared, m_Endpoint->GetIdentity()))
service_endpoint->router()->queue_work([f, m, shared, path, this] {
if (not f->EncryptAndSign(*m, shared, service_endpoint->GetIdentity()))
{
LogError(m_PathSet->Name(), " failed to sign message");
return;
@ -125,7 +124,7 @@ namespace llarp::service
void
SendContext::AsyncSendAuth(std::function<void(AuthResult)> resultHandler)
{
if (const auto maybe = m_Endpoint->MaybeGetAuthInfoForEndpoint(remoteIdent.Addr()))
if (const auto maybe = service_endpoint->MaybeGetAuthInfoForEndpoint(remoteIdent.Addr()))
{
// send auth message
const llarp_buffer_t authdata{maybe->token};
@ -154,7 +153,7 @@ namespace llarp::service
"to prevent bullshittery");
return;
}
const auto maybe = m_Endpoint->MaybeGetAuthInfoForEndpoint(remoteIdent.Addr());
const auto maybe = service_endpoint->MaybeGetAuthInfoForEndpoint(remoteIdent.Addr());
if (maybe.has_value())
{
// send auth message

@ -39,8 +39,8 @@ namespace llarp::service
Introduction remoteIntro;
ConvoTag currentConvoTag;
path::PathSet* const m_PathSet;
Endpoint* const m_DataHandler;
Endpoint* const m_Endpoint;
// Endpoint* const m_DataHandler;
Endpoint* const service_endpoint;
uint64_t sequenceNo = 0;
llarp_time_t lastGoodSend = 0s;
const llarp_time_t createdAt;

Loading…
Cancel
Save