Merge pull request #1795 from majestrate/try-reducing-cpu-usage-2021-11-09

make PumpLL idempotent to reduce cpu use a bit
pull/1796/head
Jason Rhinelander 3 years ago committed by GitHub
commit f270d74441
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -80,7 +80,7 @@ namespace llarp
AsyncDecrypt(const EncryptedFrame& frame, User_ptr u, WorkerFunction_t worker)
{
target = frame;
worker(std::bind(&AsyncFrameDecrypter<User>::Decrypt, this, std::move(u)));
worker([this, u = std::move(u)]() mutable { Decrypt(std::move(u)); });
}
};
} // namespace llarp

@ -30,14 +30,13 @@ namespace llarp
llarp::LogDebug("got ", valuesFound.size(), " routers from exploration");
auto router = parent->GetRouter();
using std::placeholders::_1;
for (const auto& pk : valuesFound)
{
// lookup router
if (router and router->nodedb()->Has(pk))
continue;
parent->LookupRouter(
pk, std::bind(&AbstractRouter::HandleDHTLookupForExplore, router, pk, _1));
pk, [router, pk](const auto& res) { router->HandleDHTLookupForExplore(pk, res); });
}
}
} // namespace dht

@ -81,12 +81,6 @@ namespace llarp
return llarp::time_now_ms();
}
// Triggers an event loop wakeup; use when something has been done that requires the event loop
// to wake up (e.g. adding to queues). This is called implicitly by call() and call_soon().
// Idempotent and thread-safe.
virtual void
wakeup() = 0;
// Calls a function/lambda/etc. If invoked from within the event loop itself this calls the
// given lambda immediately; otherwise it passes it to `call_soon()` to be queued to run at the
// next event loop iteration.
@ -196,10 +190,6 @@ namespace llarp
virtual std::shared_ptr<UDPHandle>
make_udp(UDPReceiveFunc on_recv) = 0;
/// set the function that is called once per cycle the flush all the queues
virtual void
set_pump_function(std::function<void(void)> pumpll) = 0;
/// Make a thread-safe event loop waker (an "async" in libuv terminology) on this event loop;
/// you can call `->Trigger()` on the returned shared pointer to fire the callback at the next
/// available event loop iteration. (Multiple Trigger calls invoked before the call is actually
@ -227,6 +217,13 @@ namespace llarp
{
return nullptr;
}
protected:
// Triggers an event loop wakeup; use when something has been done that requires the event loop
// to wake up (e.g. adding to queues). This is called implicitly by call() and call_soon().
// Idempotent and thread-safe.
virtual void
wakeup() = 0;
};
using EventLoop_ptr = std::shared_ptr<EventLoop>;

@ -111,13 +111,12 @@ namespace llarp::uv
{
llarp::LogTrace("ticking event loop.");
FlushLogic();
PumpLL();
auto& log = llarp::LogContext::Instance();
if (log.logStream)
log.logStream->Tick(time_now());
}
Loop::Loop(size_t queue_size) : llarp::EventLoop{}, PumpLL{[] {}}, m_LogicCalls{queue_size}
Loop::Loop(size_t queue_size) : llarp::EventLoop{}, m_LogicCalls{queue_size}
{
if (!(m_Impl = uvw::Loop::create()))
throw std::runtime_error{"Failed to construct libuv loop"};
@ -161,12 +160,6 @@ namespace llarp::uv
m_WakeUp->send();
}
void
Loop::set_pump_function(std::function<void(void)> pump)
{
PumpLL = std::move(pump);
}
std::shared_ptr<llarp::UDPHandle>
Loop::make_udp(UDPReceiveFunc on_recv)
{

@ -31,9 +31,6 @@ namespace llarp::uv
bool
running() const override;
void
wakeup() override;
void
call_later(llarp_time_t delay_ms, std::function<void(void)> callback) override;
@ -54,9 +51,6 @@ namespace llarp::uv
void
call_soon(std::function<void(void)> f) override;
void
set_pump_function(std::function<void(void)> pumpll) override;
std::shared_ptr<llarp::EventLoopWakeup>
make_waker(std::function<void()> callback) override;
@ -69,8 +63,6 @@ namespace llarp::uv
void
FlushLogic();
std::function<void(void)> PumpLL;
std::shared_ptr<uvw::Loop>
MaybeGetUVWLoop() override;
@ -95,6 +87,9 @@ namespace llarp::uv
std::unordered_map<int, std::shared_ptr<uvw::PollHandle>> m_Polls;
std::optional<std::thread::id> m_EventLoopThreadID;
void
wakeup() override;
};
} // namespace llarp::uv

@ -288,9 +288,8 @@ namespace llarp
auto path = PickEstablishedPath(llarp::path::ePathRoleExit);
if (path)
{
for (auto& item : m_Upstream)
for (auto& [i, queue] : m_Upstream)
{
auto& queue = item.second;
while (queue.size())
{
auto& msg = queue.front();
@ -305,8 +304,8 @@ namespace llarp
if (m_Upstream.size())
llarp::LogWarn("no path for exit session");
// discard upstream
for (auto& item : m_Upstream)
item.second.clear();
for (auto& [i, queue] : m_Upstream)
queue.clear();
m_Upstream.clear();
if (numHops == 1)
{

@ -92,8 +92,7 @@ namespace llarp
}
return std::nullopt;
}
else
return std::nullopt;
return std::nullopt;
}
const EventLoop_ptr&
@ -112,16 +111,13 @@ namespace llarp
return false;
if (auto* rid = std::get_if<RouterID>(&*maybeAddr))
{
auto range = m_ActiveExits.equal_range(PubKey{*rid});
auto itr = range.first;
while (itr != range.second)
for (auto [itr, end] = m_ActiveExits.equal_range(PubKey{*rid}); itr != end; ++itr)
{
if (not itr->second->LooksDead(Now()))
{
if (itr->second->QueueInboundTraffic(ManagedBuffer{payload}, type))
return true;
}
++itr;
}
if (not m_Router->PathToRouterAllowed(*rid))
@ -136,8 +132,7 @@ namespace llarp
}
return true;
}
else
return false;
return false;
}
bool
@ -357,13 +352,9 @@ namespace llarp
ExitEndpoint::VisitEndpointsFor(
const PubKey& pk, std::function<bool(exit::Endpoint* const)> visit) const
{
auto range = m_ActiveExits.equal_range(pk);
auto itr = range.first;
while (itr != range.second)
for (auto [itr, end] = m_ActiveExits.equal_range(pk); itr != end; ++itr)
{
if (visit(itr->second.get()))
++itr;
else
if (not visit(itr->second.get()))
return true;
}
return false;
@ -422,27 +413,18 @@ namespace llarp
" as we have no working endpoints");
}
});
for (auto& [pubkey, endpoint] : m_ActiveExits)
{
auto itr = m_ActiveExits.begin();
while (itr != m_ActiveExits.end())
if (!endpoint->Flush())
{
if (!itr->second->Flush())
{
LogWarn("exit session with ", itr->first, " dropped packets");
}
++itr;
LogWarn("exit session with ", pubkey, " dropped packets");
}
}
for (auto& [id, session] : m_SNodeSessions)
{
auto itr = m_SNodeSessions.begin();
while (itr != m_SNodeSessions.end())
{
itr->second->FlushUpstream();
itr->second->FlushDownstream();
++itr;
}
session->FlushUpstream();
session->FlushDownstream();
}
m_Router->PumpLL();
}
bool
@ -558,15 +540,13 @@ namespace llarp
// find oldest activity ip address
huint128_t found = {0};
llarp_time_t min = std::numeric_limits<llarp_time_t>::max();
auto itr = m_IPActivity.begin();
while (itr != m_IPActivity.end())
for (const auto& [addr, time] : m_IPActivity)
{
if (itr->second < min)
if (time < min)
{
found.h = itr->first.h;
min = itr->second;
found.h = addr.h;
min = time;
}
++itr;
}
// kick old ident off exit
// TODO: DoS
@ -620,9 +600,9 @@ namespace llarp
ExitEndpoint::AllRemoteEndpoints() const
{
std::unordered_set<AddressVariant_t> remote;
for (auto itr = m_Paths.begin(); itr != m_Paths.end(); ++itr)
for (const auto& [path, pubkey] : m_Paths)
{
remote.insert(RouterID{itr->second});
remote.insert(RouterID{pubkey});
}
return remote;
}
@ -640,9 +620,7 @@ namespace llarp
huint128_t ip = m_KeyToIP[pk];
m_KeyToIP.erase(pk);
m_IPToKey.erase(ip);
auto range = m_ActiveExits.equal_range(pk);
auto exit_itr = range.first;
while (exit_itr != range.second)
for (auto [exit_itr, end] = m_ActiveExits.equal_range(pk); exit_itr != end;)
exit_itr = m_ActiveExits.erase(exit_itr);
}
@ -677,19 +655,14 @@ namespace llarp
{
exit::Endpoint* endpoint = nullptr;
PubKey pk;
{
auto itr = m_Paths.find(path);
if (itr == m_Paths.end())
return nullptr;
if (auto itr = m_Paths.find(path); itr != m_Paths.end())
pk = itr->second;
}
else
return nullptr;
if (auto itr = m_ActiveExits.find(pk); itr != m_ActiveExits.end())
{
auto itr = m_ActiveExits.find(pk);
if (itr != m_ActiveExits.end())
{
if (itr->second->PubKey() == pk)
endpoint = itr->second.get();
}
if (itr->second->PubKey() == pk)
endpoint = itr->second.get();
}
return endpoint;
}
@ -698,8 +671,7 @@ namespace llarp
ExitEndpoint::UpdateEndpointPath(const PubKey& remote, const PathID_t& next)
{
// check if already mapped
auto itr = m_Paths.find(next);
if (itr != m_Paths.end())
if (auto itr = m_Paths.find(next); itr != m_Paths.end())
return false;
m_Paths.emplace(next, remote);
return true;
@ -780,7 +752,7 @@ namespace llarp
{
auto session = std::make_shared<exit::SNodeSession>(
other,
std::bind(&ExitEndpoint::QueueSNodePacket, this, std::placeholders::_1, ip),
[this, ip](const auto& buf) { return QueueSNodePacket(buf, ip); },
GetRouter(),
2,
1,
@ -837,18 +809,14 @@ namespace llarp
void
ExitEndpoint::RemoveExit(const exit::Endpoint* ep)
{
auto range = m_ActiveExits.equal_range(ep->PubKey());
auto itr = range.first;
while (itr != range.second)
for (auto [itr, end] = m_ActiveExits.equal_range(ep->PubKey()); itr != end; ++itr)
{
if (itr->second->GetCurrentPath() == ep->GetCurrentPath())
{
itr = m_ActiveExits.erase(itr);
m_ActiveExits.erase(itr);
// now ep is gone af
return;
}
++itr;
}
}

@ -103,12 +103,10 @@ namespace llarp
void
CalculateTrafficStats(Stats& stats)
{
auto itr = m_ActiveExits.begin();
while (itr != m_ActiveExits.end())
for (auto& [pubkey, endpoint] : m_ActiveExits)
{
stats[itr->first].first += itr->second->TxRate();
stats[itr->first].second += itr->second->RxRate();
++itr;
stats[pubkey].first += endpoint->TxRate();
stats[pubkey].second += endpoint->RxRate();
}
}

@ -86,13 +86,7 @@ namespace llarp
TunEndpoint::TunEndpoint(AbstractRouter* r, service::Context* parent)
: service::Endpoint(r, parent)
, m_UserToNetworkPktQueue("endpoint_sendq", r->loop(), r->loop())
{
m_PacketSendWaker = r->loop()->make_waker([this]() { FlushWrite(); });
m_MessageSendWaker = r->loop()->make_waker([this]() {
FlushSend();
Pump(Now());
});
m_PacketRouter = std::make_unique<vpn::PacketRouter>(
[this](net::IPPacket pkt) { HandleGotUserPacket(std::move(pkt)); });
#if defined(ANDROID) || defined(__APPLE__)
@ -362,15 +356,7 @@ namespace llarp
}
void
TunEndpoint::Flush()
{
FlushSend();
Pump(Now());
FlushWrite();
}
void
TunEndpoint::FlushWrite()
TunEndpoint::Pump(llarp_time_t now)
{
// flush network to user
while (not m_NetworkToUserPktQueue.empty())
@ -378,6 +364,8 @@ namespace llarp
m_NetIf->WritePacket(m_NetworkToUserPktQueue.top().pkt);
m_NetworkToUserPktQueue.pop();
}
service::Endpoint::Pump(now);
}
static bool
@ -952,7 +940,6 @@ namespace llarp
LogInfo(Name(), " has ipv6 address ", m_OurIPv6);
}
#endif
Router()->loop()->add_ticker([this] { Flush(); });
// Attempt to register DNS on the interface
systemd_resolved_set_dns(
@ -1037,151 +1024,146 @@ namespace llarp
}
void
TunEndpoint::FlushSend()
TunEndpoint::HandleGotUserPacket(net::IPPacket pkt)
{
m_UserToNetworkPktQueue.Process([&](net::IPPacket& pkt) {
huint128_t dst, src;
if (pkt.IsV4())
huint128_t dst, src;
if (pkt.IsV4())
{
dst = pkt.dst4to6();
src = pkt.src4to6();
}
else
{
dst = pkt.dstv6();
src = pkt.srcv6();
}
// this is for ipv6 slaac on ipv6 exits
/*
constexpr huint128_t ipv6_multicast_all_nodes =
huint128_t{uint128_t{0xff01'0000'0000'0000UL, 1UL}};
constexpr huint128_t ipv6_multicast_all_routers =
huint128_t{uint128_t{0xff01'0000'0000'0000UL, 2UL}};
if (dst == ipv6_multicast_all_nodes and m_state->m_ExitEnabled)
{
// send ipv6 multicast
for (const auto& [ip, addr] : m_IPToAddr)
{
dst = pkt.dst4to6();
src = pkt.src4to6();
(void)ip;
SendToOrQueue(
service::Address{addr.as_array()}, pkt.ConstBuffer(), service::ProtocolType::Exit);
}
else
return;
}
*/
if (m_state->m_ExitEnabled)
{
dst = net::ExpandV4(net::TruncateV6(dst));
}
auto itr = m_IPToAddr.find(dst);
if (itr == m_IPToAddr.end())
{
// find all ranges that match the destination ip
const auto exitEntries = m_ExitMap.FindAllEntries(dst);
if (exitEntries.empty())
{
dst = pkt.dstv6();
src = pkt.srcv6();
// send icmp unreachable as we dont have any exits for this ip
if (const auto icmp = pkt.MakeICMPUnreachable())
{
HandleWriteIPPacket(icmp->ConstBuffer(), dst, src, 0);
}
return;
}
// this is for ipv6 slaac on ipv6 exits
/*
constexpr huint128_t ipv6_multicast_all_nodes =
huint128_t{uint128_t{0xff01'0000'0000'0000UL, 1UL}};
constexpr huint128_t ipv6_multicast_all_routers =
huint128_t{uint128_t{0xff01'0000'0000'0000UL, 2UL}};
if (dst == ipv6_multicast_all_nodes and m_state->m_ExitEnabled)
service::Address addr{};
for (const auto& [range, exitAddr] : exitEntries)
{
// send ipv6 multicast
for (const auto& [ip, addr] : m_IPToAddr)
if (not IsBogon(dst) or range.BogonContains(dst))
{
(void)ip;
SendToOrQueue(
service::Address{addr.as_array()}, pkt.ConstBuffer(), service::ProtocolType::Exit);
addr = exitAddr;
}
return;
// we do not permit bogons when they don't explicitly match a permitted bogon range
}
if (addr.IsZero()) // drop becase no exit was found that matches our rules
return;
pkt.ZeroSourceAddress();
MarkAddressOutbound(addr);
EnsurePathToService(
addr,
[pkt, this](service::Address addr, service::OutboundContext* ctx) {
if (ctx)
{
ctx->SendPacketToRemote(pkt.ConstBuffer(), service::ProtocolType::Exit);
Router()->TriggerPump();
return;
}
LogWarn("cannot ensure path to exit ", addr, " so we drop some packets");
},
PathAlignmentTimeout());
return;
}
std::variant<service::Address, RouterID> to;
service::ProtocolType type;
if (m_SNodes.at(itr->second))
{
to = RouterID{itr->second.as_array()};
type = service::ProtocolType::TrafficV4;
}
else
{
to = service::Address{itr->second.as_array()};
type = m_state->m_ExitEnabled and src != m_OurIP ? service::ProtocolType::Exit
: pkt.ServiceProtocol();
}
*/
if (m_state->m_ExitEnabled)
// prepare packet for insertion into network
// this includes clearing IP addresses, recalculating checksums, etc
// this does not happen for exits because the point is they don't rewrite addresses
if (type != service::ProtocolType::Exit)
{
if (pkt.IsV4())
pkt.UpdateIPv4Address({0}, {0});
else
pkt.UpdateIPv6Address({0}, {0});
}
// try sending it on an existing convotag
// this succeds for inbound convos, probably.
if (auto maybe = GetBestConvoTagFor(to))
{
if (SendToOrQueue(*maybe, pkt.ConstBuffer(), type))
{
dst = net::ExpandV4(net::TruncateV6(dst));
MarkIPActive(dst);
Router()->TriggerPump();
return;
}
auto itr = m_IPToAddr.find(dst);
if (itr == m_IPToAddr.end())
{
// find all ranges that match the destination ip
const auto exitEntries = m_ExitMap.FindAllEntries(dst);
if (exitEntries.empty())
{
// send icmp unreachable as we dont have any exits for this ip
if (const auto icmp = pkt.MakeICMPUnreachable())
}
// try establishing a path to this guy
// will fail if it's an inbound convo
EnsurePathTo(
to,
[pkt, type, dst, to, this](auto maybe) {
if (not maybe)
{
HandleWriteIPPacket(icmp->ConstBuffer(), dst, src, 0);
var::visit(
[this](auto&& addr) {
LogWarn(Name(), " failed to ensure path to ", addr, " no convo tag found");
},
to);
}
return;
}
service::Address addr{};
for (const auto& [range, exitAddr] : exitEntries)
{
if (range.BogonRange() and range.Contains(dst))
if (SendToOrQueue(*maybe, pkt.ConstBuffer(), type))
{
// we permit this because it matches our rules and we allow bogons
addr = exitAddr;
MarkIPActive(dst);
Router()->TriggerPump();
}
else if (not IsBogon(dst))
else
{
// allow because the destination is not a bogon and the mapped range is not a bogon
addr = exitAddr;
var::visit(
[this](auto&& addr) {
LogWarn(Name(), " failed to send to ", addr, ", SendToOrQueue failed");
},
to);
}
// we do not permit bogons when they don't explicitly match a permitted bogon range
}
if (addr.IsZero()) // drop becase no exit was found that matches our rules
return;
pkt.ZeroSourceAddress();
MarkAddressOutbound(addr);
EnsurePathToService(
addr,
[pkt](service::Address addr, service::OutboundContext* ctx) {
if (ctx)
{
ctx->SendPacketToRemote(pkt.ConstBuffer(), service::ProtocolType::Exit);
return;
}
LogWarn("cannot ensure path to exit ", addr, " so we drop some packets");
},
PathAlignmentTimeout());
return;
}
std::variant<service::Address, RouterID> to;
service::ProtocolType type;
if (m_SNodes.at(itr->second))
{
to = RouterID{itr->second.as_array()};
type = service::ProtocolType::TrafficV4;
}
else
{
to = service::Address{itr->second.as_array()};
type = m_state->m_ExitEnabled and src != m_OurIP ? service::ProtocolType::Exit
: pkt.ServiceProtocol();
}
// prepare packet for insertion into network
// this includes clearing IP addresses, recalculating checksums, etc
// this does not happen for exits because the point is they don't rewrite addresses
if (type != service::ProtocolType::Exit)
{
if (pkt.IsV4())
pkt.UpdateIPv4Address({0}, {0});
else
pkt.UpdateIPv6Address({0}, {0});
}
// try sending it on an existing convotag
// this succeds for inbound convos, probably.
if (auto maybe = GetBestConvoTagFor(to))
{
if (SendToOrQueue(*maybe, pkt.ConstBuffer(), type))
{
MarkIPActive(dst);
return;
}
}
// try establishing a path to this guy
// will fail if it's an inbound convo
EnsurePathTo(
to,
[pkt, type, dst, to, this](auto maybe) {
if (not maybe)
{
var::visit(
[&](auto&& addr) {
LogWarn(Name(), " failed to ensure path to ", addr, " no convo tag found");
},
to);
}
if (SendToOrQueue(*maybe, pkt.ConstBuffer(), type))
{
MarkIPActive(dst);
}
else
{
var::visit(
[&](auto&& addr) {
LogWarn(Name(), " failed to send to ", addr, ", SendToOrQueue failed");
},
to);
}
},
PathAlignmentTimeout());
});
},
PathAlignmentTimeout());
}
bool
@ -1288,7 +1270,7 @@ namespace llarp
bool allow = false;
for (const auto& [range, exitAddr] : mapped)
{
if ((range.BogonRange() and range.Contains(src)) or not IsBogon(src))
if (not IsBogon(src) or range.BogonContains(src))
{
// allow if this address matches the endpoint we think it should be
allow = exitAddr == fromAddr;
@ -1335,8 +1317,8 @@ namespace llarp
pkt.UpdateIPv6Address(src, dst);
}
m_NetworkToUserPktQueue.push(std::move(write));
// wake up packet flushing event so we ensure that all packets are written to user
m_PacketSendWaker->Trigger();
// wake up so we ensure that all packets are written to user
Router()->TriggerPump();
return true;
}
@ -1441,13 +1423,6 @@ namespace llarp
m_IPActivity[ip] = std::numeric_limits<llarp_time_t>::max();
}
void
TunEndpoint::HandleGotUserPacket(net::IPPacket pkt)
{
m_UserToNetworkPktQueue.Emplace(std::move(pkt));
m_MessageSendWaker->Trigger();
}
TunEndpoint::~TunEndpoint() = default;
} // namespace handlers

@ -112,10 +112,6 @@ namespace llarp
HandleWriteIPPacket(
const llarp_buffer_t& buf, huint128_t src, huint128_t dst, uint64_t seqno);
/// queue outbound packet to the world
bool
QueueOutboundTraffic(llarp::net::IPPacket&& pkt);
/// we got a packet from the user
void
HandleGotUserPacket(llarp::net::IPPacket pkt);
@ -172,10 +168,6 @@ namespace llarp
huint128_t
ObtainIPForAddr(std::variant<service::Address, RouterID> addr) override;
/// flush network traffic
void
Flush();
void
ResetInternalState() override;
@ -187,9 +179,6 @@ namespace llarp
net::IPPacket::CompareOrder,
net::IPPacket::GetNow>;
/// queue for sending packets over the network from us
PacketQueue_t m_UserToNetworkPktQueue;
struct WritePacket
{
uint64_t seqno;
@ -204,6 +193,10 @@ namespace llarp
/// queue for sending packets to user from network
std::priority_queue<WritePacket> m_NetworkToUserPktQueue;
void
Pump(llarp_time_t now) override;
/// return true if we have a remote loki address for this ip address
bool
HasRemoteForIP(huint128_t ipv4) const;
@ -216,10 +209,6 @@ namespace llarp
void
MarkIPActiveForever(huint128_t ip);
/// flush ip packets
virtual void
FlushSend();
/// flush writing ip packets to interface
void
FlushWrite();
@ -292,11 +281,6 @@ namespace llarp
std::set<IPRange> m_OwnedRanges;
/// how long to wait for path alignment
llarp_time_t m_PathAlignmentTimeout;
/// idempotent wakeup for writing packets to user
std::shared_ptr<EventLoopWakeup> m_PacketSendWaker;
/// idempotent wakeup for writing messages to network
std::shared_ptr<EventLoopWakeup> m_MessageSendWaker;
/// a file to load / store the ephemeral address map to
std::optional<fs::path> m_PersistAddrMapFile;

@ -136,7 +136,7 @@ namespace llarp
{
auto job = std::make_shared<ExecShellHookJob>(shared_from_this(), std::move(params));
m_ThreadPool.addJob(std::bind(&ExecShellHookJob::Exec, job));
m_ThreadPool.addJob([job = std::move(job)] { job->Exec(); });
}
Backend_ptr

@ -23,9 +23,7 @@ namespace llarp::iwp
: ILinkLayer(
keyManager, getrc, h, sign, before, est, reneg, timeout, closed, pumpDone, worker)
, m_Wakeup{ev->make_waker([this]() { HandleWakeupPlaintext(); })}
, m_PlaintextRecv{1024}
, m_Inbound{allowInbound}
{}
const char*
@ -58,14 +56,15 @@ namespace llarp::iwp
if (itr == m_AuthedAddrs.end())
{
Lock_t lock{m_PendingMutex};
if (m_Pending.count(from) == 0)
auto it = m_Pending.find(from);
if (it == m_Pending.end())
{
if (not m_Inbound)
return;
isNewSession = true;
m_Pending.insert({from, std::make_shared<Session>(this, from)});
it = m_Pending.emplace(from, std::make_shared<Session>(this, from)).first;
}
session = m_Pending.find(from)->second;
session = it->second;
}
else
{
@ -78,8 +77,9 @@ namespace llarp::iwp
if (not success and isNewSession)
{
LogWarn("Brand new session failed; removing from pending sessions list");
m_Pending.erase(m_Pending.find(from));
m_Pending.erase(from);
}
WakeupPlaintext();
}
}
@ -106,13 +106,6 @@ namespace llarp::iwp
return std::make_shared<Session>(this, rc, ai);
}
void
LinkLayer::AddWakeup(std::weak_ptr<Session> session)
{
if (auto ptr = session.lock())
m_PlaintextRecv[ptr->GetRemoteEndpoint()] = session;
}
void
LinkLayer::WakeupPlaintext()
{
@ -122,13 +115,15 @@ namespace llarp::iwp
void
LinkLayer::HandleWakeupPlaintext()
{
for (const auto& [addr, session] : m_PlaintextRecv)
{
auto ptr = session.lock();
if (ptr)
ptr->HandlePlaintext();
}
m_PlaintextRecv.clear();
// Copy bare pointers out first because HandlePlaintext can end up removing themselves from the
// structures.
m_WakingUp.clear(); // Reused to minimize allocations.
for (const auto& [router_id, session] : m_AuthedLinks)
m_WakingUp.push_back(session.get());
for (const auto& [addr, session] : m_Pending)
m_WakingUp.push_back(session.get());
for (auto* session : m_WakingUp)
session->HandlePlaintext();
PumpDone();
}

@ -53,9 +53,6 @@ namespace llarp::iwp
void
WakeupPlaintext();
void
AddWakeup(std::weak_ptr<Session> peer);
std::string
PrintableName() const;
@ -64,8 +61,8 @@ namespace llarp::iwp
HandleWakeupPlaintext();
const std::shared_ptr<EventLoopWakeup> m_Wakeup;
std::unordered_map<SockAddr, std::weak_ptr<Session>> m_PlaintextRecv;
std::unordered_map<SockAddr, RouterID> m_AuthedAddrs;
std::vector<ILinkSession*> m_WakingUp;
const bool m_Inbound;
};

@ -3,6 +3,7 @@
#include <llarp/messages/link_intro.hpp>
#include <llarp/messages/discard.hpp>
#include <llarp/util/meta/memfn.hpp>
#include <llarp/router/abstractrouter.hpp>
namespace llarp
{
@ -26,7 +27,7 @@ namespace llarp
return pkt;
}
constexpr size_t PlaintextQueueSize = 32;
constexpr size_t PlaintextQueueSize = 512;
Session::Session(LinkLayer* p, const RouterContact& rc, const AddressInfo& ai)
: m_State{State::Initial}
@ -39,6 +40,7 @@ namespace llarp
, m_PlaintextRecv{PlaintextQueueSize}
{
token.Zero();
m_PlaintextEmpty.test_and_set();
GotLIM = util::memFn(&Session::GotOutboundLIM, this);
CryptoManager::instance()->shorthash(m_SessionKey, llarp_buffer_t(rc.pubkey));
}
@ -52,6 +54,7 @@ namespace llarp
, m_PlaintextRecv{PlaintextQueueSize}
{
token.Randomize();
m_PlaintextEmpty.test_and_set();
GotLIM = util::memFn(&Session::GotInboundLIM, this);
const PubKey pk = m_Parent->GetOurRC().pubkey;
CryptoManager::instance()->shorthash(m_SessionKey, llarp_buffer_t(pk));
@ -136,6 +139,7 @@ namespace llarp
Session::EncryptAndSend(ILinkSession::Packet_t data)
{
m_EncryptNext.emplace_back(std::move(data));
TriggerPump();
if (!IsEstablished())
{
EncryptWorker(std::move(m_EncryptNext));
@ -190,6 +194,7 @@ namespace llarp
const auto bufsz = buf.size();
auto& msg = m_TXMsgs.emplace(msgid, OutboundMessage{msgid, std::move(buf), now, completed})
.first->second;
TriggerPump();
EncryptAndSend(msg.XMIT());
if (bufsz > FragmentSize)
{
@ -225,6 +230,12 @@ namespace llarp
}
}
void
Session::TriggerPump()
{
m_Parent->Router()->TriggerPump();
}
void
Session::Pump()
{
@ -233,18 +244,18 @@ namespace llarp
{
if (ShouldPing())
SendKeepAlive();
for (auto& item : m_RXMsgs)
for (auto& [id, msg] : m_RXMsgs)
{
if (item.second.ShouldSendACKS(now))
if (msg.ShouldSendACKS(now))
{
item.second.SendACKS(util::memFn(&Session::EncryptAndSend, this), now);
msg.SendACKS(util::memFn(&Session::EncryptAndSend, this), now);
}
}
for (auto& item : m_TXMsgs)
for (auto& [id, msg] : m_TXMsgs)
{
if (item.second.ShouldFlush(now))
if (msg.ShouldFlush(now))
{
item.second.FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), now);
msg.FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), now);
}
}
}
@ -258,7 +269,6 @@ namespace llarp
if (not m_DecryptNext.empty())
{
m_Parent->AddWakeup(weak_from_this());
m_Parent->QueueWork([self, data = m_DecryptNext] { self->DecryptWorker(data); });
m_DecryptNext.clear();
}
@ -613,6 +623,7 @@ namespace llarp
Session::HandleSessionData(Packet_t pkt)
{
m_DecryptNext.emplace_back(std::move(pkt));
TriggerPump();
}
void
@ -638,16 +649,18 @@ namespace llarp
++itr;
}
m_PlaintextRecv.tryPushBack(std::move(msgs));
m_PlaintextEmpty.clear();
m_Parent->WakeupPlaintext();
}
void
Session::HandlePlaintext()
{
while (not m_PlaintextRecv.empty())
if (m_PlaintextEmpty.test_and_set())
return;
while (auto maybe_queue = m_PlaintextRecv.tryPopFront())
{
auto queue = m_PlaintextRecv.popFront();
for (auto& result : queue)
for (auto& result : *maybe_queue)
{
LogTrace("Command ", int(result[PacketOverhead + 1]), " from ", m_RemoteAddr);
switch (result[PacketOverhead + 1])
@ -679,7 +692,7 @@ namespace llarp
}
}
SendMACK();
Pump();
m_Parent->WakeupPlaintext();
}
void
@ -774,6 +787,8 @@ namespace llarp
{
itr = m_RXMsgs.emplace(rxid, InboundMessage{rxid, sz, ShortHash{pos}, m_Parent->Now()})
.first;
TriggerPump();
sz = std::min(sz, uint16_t{FragmentSize});
if ((data.size() - XMITOverhead) == sz)
{

@ -48,8 +48,11 @@ namespace llarp
/// inbound session
Session(LinkLayer* parent, const SockAddr& from);
~Session() = default;
// Signal the event loop that a pump is needed (idempotent)
void
TriggerPump();
// Does the actual pump
void
Pump() override;
@ -129,7 +132,7 @@ namespace llarp
return m_Inbound;
}
void
HandlePlaintext();
HandlePlaintext() override;
private:
enum class State
@ -191,13 +194,14 @@ namespace llarp
/// maps rxid to time recieved
std::unordered_map<uint64_t, llarp_time_t> m_ReplayFilter;
/// rx messages to send in next round of multiacks
std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>> m_SendMACKs;
std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<>> m_SendMACKs;
using CryptoQueue_t = std::vector<Packet_t>;
CryptoQueue_t m_EncryptNext;
CryptoQueue_t m_DecryptNext;
std::atomic_flag m_PlaintextEmpty;
llarp::thread::Queue<CryptoQueue_t> m_PlaintextRecv;
void

@ -7,6 +7,7 @@
#include <llarp/util/fs.hpp>
#include <utility>
#include <unordered_set>
#include <llarp/router/abstractrouter.hpp>
static constexpr auto LINK_LAYER_TICK_INTERVAL = 100ms;
@ -38,7 +39,11 @@ namespace llarp
, m_SecretKey(keyManager->transportKey)
{}
ILinkLayer::~ILinkLayer() = default;
llarp_time_t
ILinkLayer::Now() const
{
return m_Router->loop()->time_now();
}
bool
ILinkLayer::HasSessionTo(const RouterID& id)
@ -124,10 +129,10 @@ namespace llarp
}
bool
ILinkLayer::Configure(EventLoop_ptr loop, const std::string& ifname, int af, uint16_t port)
ILinkLayer::Configure(AbstractRouter* router, const std::string& ifname, int af, uint16_t port)
{
m_Loop = std::move(loop);
m_udp = m_Loop->make_udp(
m_Router = router;
m_udp = m_Router->loop()->make_udp(
[this]([[maybe_unused]] UDPHandle& udp, const SockAddr& from, llarp_buffer_t buf) {
ILinkSession::Packet_t pkt;
pkt.resize(buf.sz);
@ -163,7 +168,6 @@ namespace llarp
if (not m_udp->listen(m_ourAddr))
return false;
m_Loop->add_ticker([this] { Pump(); });
return true;
}
@ -247,6 +251,7 @@ namespace llarp
}
m_AuthedLinks.emplace(pk, itr->second);
itr = m_Pending.erase(itr);
m_Router->TriggerPump();
return true;
}
return false;
@ -345,7 +350,8 @@ namespace llarp
{
// Tie the lifetime of this repeater to this arbitrary shared_ptr:
m_repeater_keepalive = std::make_shared<int>(0);
m_Loop->call_every(LINK_LAYER_TICK_INTERVAL, m_repeater_keepalive, [this] { Tick(Now()); });
m_Router->loop()->call_every(
LINK_LAYER_TICK_INTERVAL, m_repeater_keepalive, [this] { Tick(Now()); });
return true;
}
@ -402,8 +408,7 @@ namespace llarp
Lock_t l(m_AuthedLinksMutex);
RouterID r = remote;
llarp::LogInfo("Closing all to ", r);
auto [itr, end] = m_AuthedLinks.equal_range(r);
while (itr != end)
for (auto [itr, end] = m_AuthedLinks.equal_range(r); itr != end;)
{
itr->second->Close();
m_RecentlyClosed.emplace(itr->second->GetRemoteEndpoint(), now + CloseGraceWindow);

@ -85,14 +85,11 @@ namespace llarp
SessionClosedHandler closed,
PumpDoneHandler pumpDone,
WorkerFunc_t doWork);
virtual ~ILinkLayer();
virtual ~ILinkLayer() = default;
/// get current time via event loop
llarp_time_t
Now() const
{
return m_Loop->time_now();
}
Now() const;
bool
HasSessionTo(const RouterID& pk);
@ -108,7 +105,7 @@ namespace llarp
SendTo_LL(const SockAddr& to, const llarp_buffer_t& pkt);
virtual bool
Configure(EventLoop_ptr loop, const std::string& ifname, int af, uint16_t port);
Configure(AbstractRouter* loop, const std::string& ifname, int af, uint16_t port);
virtual std::shared_ptr<ILinkSession>
NewOutboundSession(const RouterContact& rc, const AddressInfo& ai) = 0;
@ -225,6 +222,13 @@ namespace llarp
std::optional<int>
GetUDPFD() const;
// Gets a pointer to the router owning us.
AbstractRouter*
Router() const
{
return m_Router;
}
private:
const SecretKey& m_RouterEncSecret;
@ -239,7 +243,7 @@ namespace llarp
bool
PutSession(const std::shared_ptr<ILinkSession>& s);
EventLoop_ptr m_Loop;
AbstractRouter* m_Router;
SockAddr m_ourAddr;
std::shared_ptr<llarp::UDPHandle> m_udp;
SecretKey m_SecretKey;

@ -42,7 +42,7 @@ namespace llarp
virtual void
OnLinkEstablished(ILinkLayer*){};
/// called every event loop tick
/// called during pumping
virtual void
Pump() = 0;
@ -130,5 +130,8 @@ namespace llarp
virtual util::StatusObject
ExtractStatus() const = 0;
virtual void
HandlePlaintext() = 0;
};
} // namespace llarp

@ -153,18 +153,18 @@ struct lokinet_srv_lookup_private
auto lock = ctx->acquire();
if (ctx->impl and ctx->impl->IsUp())
{
ctx->impl->CallSafe([host, service, &promise, ctx, self = this]() {
ctx->impl->CallSafe([host, service, &promise, ctx, this]() {
auto ep = ctx->endpoint();
if (ep == nullptr)
{
promise.set_value(ENOTSUP);
return;
}
ep->LookupServiceAsync(host, service, [self, &promise, host](auto results) {
ep->LookupServiceAsync(host, service, [this, &promise, host](auto results) {
for (const auto& result : results)
{
if (auto maybe = SRVFromData(result, host))
self->results.emplace_back(*maybe);
this->results.emplace_back(*maybe);
}
promise.set_value(0);
});

@ -31,12 +31,12 @@ namespace llarp
uint64_t _status,
HopHandler_ptr _hop,
AbstractRouter* _router,
const PathID_t& pathid)
PathID_t pathid)
: frames{std::move(_frames)}
, status{_status}
, hop{std::move(_hop)}
, router{_router}
, pathid{pathid}
, pathid{std::move(pathid)}
{}
~LRSM_AsyncHandler() = default;
@ -51,7 +51,7 @@ namespace llarp
void
queue_handle()
{
auto func = std::bind(&llarp::LRSM_AsyncHandler::handle, shared_from_this());
auto func = [self = shared_from_this()] { self->handle(); };
router->QueueWork(func);
}
};

@ -52,6 +52,14 @@ namespace llarp
return IsBogon(addr) or IsBogon(HighestAddr());
}
/// return true if we intersect with a bogon range *and* we contain the given address
template <typename Addr>
bool
BogonContains(Addr&& addr) const
{
return BogonRange() and Contains(std::forward<Addr>(addr));
}
/// return the number of bits set in the hostmask
constexpr int
HostmaskBits() const

@ -9,14 +9,11 @@ namespace llarp
bool
IHopHandler::HandleUpstream(const llarp_buffer_t& X, const TunnelNonce& Y, AbstractRouter* r)
{
if (m_UpstreamQueue == nullptr)
m_UpstreamQueue = std::make_shared<TrafficQueue_t>();
m_UpstreamQueue->emplace_back();
auto& pkt = m_UpstreamQueue->back();
auto& pkt = m_UpstreamQueue.emplace_back();
pkt.first.resize(X.sz);
std::copy_n(X.base, X.sz, pkt.first.begin());
pkt.second = Y;
r->loop()->wakeup();
r->TriggerPump();
return true;
}
@ -24,14 +21,11 @@ namespace llarp
bool
IHopHandler::HandleDownstream(const llarp_buffer_t& X, const TunnelNonce& Y, AbstractRouter* r)
{
if (m_DownstreamQueue == nullptr)
m_DownstreamQueue = std::make_shared<TrafficQueue_t>();
m_DownstreamQueue->emplace_back();
auto& pkt = m_DownstreamQueue->back();
auto& pkt = m_DownstreamQueue.emplace_back();
pkt.first.resize(X.sz);
std::copy_n(X.base, X.sz, pkt.first.begin());
pkt.second = Y;
r->loop()->wakeup();
r->TriggerPump();
return true;
}

@ -26,7 +26,6 @@ namespace llarp
{
using TrafficEvent_t = std::pair<std::vector<byte_t>, TunnelNonce>;
using TrafficQueue_t = std::list<TrafficEvent_t>;
using TrafficQueue_ptr = std::shared_ptr<TrafficQueue_t>;
virtual ~IHopHandler() = default;
@ -74,16 +73,16 @@ namespace llarp
protected:
uint64_t m_SequenceNum = 0;
TrafficQueue_ptr m_UpstreamQueue;
TrafficQueue_ptr m_DownstreamQueue;
TrafficQueue_t m_UpstreamQueue;
TrafficQueue_t m_DownstreamQueue;
util::DecayingHashSet<TunnelNonce> m_UpstreamReplayFilter;
util::DecayingHashSet<TunnelNonce> m_DownstreamReplayFilter;
virtual void
UpstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) = 0;
UpstreamWork(TrafficQueue_t queue, AbstractRouter* r) = 0;
virtual void
DownstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) = 0;
DownstreamWork(TrafficQueue_t queue, AbstractRouter* r) = 0;
virtual void
HandleAllUpstream(std::vector<RelayUpstreamMessage> msgs, AbstractRouter* r) = 0;

@ -29,7 +29,7 @@ namespace llarp
std::weak_ptr<PathSet> pathset,
PathRole startingRoles,
std::string shortName)
: m_PathSet{pathset}, _role{startingRoles}, m_shortName{std::move(shortName)}
: m_PathSet{std::move(pathset)}, _role{startingRoles}, m_shortName{std::move(shortName)}
{
hops.resize(h.size());
@ -488,15 +488,15 @@ namespace llarp
LogDebug("failed to send upstream to ", Upstream());
}
}
r->linkManager().PumpLinks();
r->TriggerPump();
}
void
Path::UpstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
Path::UpstreamWork(TrafficQueue_t msgs, AbstractRouter* r)
{
std::vector<RelayUpstreamMessage> sendmsgs(msgs->size());
std::vector<RelayUpstreamMessage> sendmsgs(msgs.size());
size_t idx = 0;
for (auto& ev : *msgs)
for (auto& ev : msgs)
{
const llarp_buffer_t buf(ev.first);
TunnelNonce n = ev.second;
@ -519,24 +519,22 @@ namespace llarp
void
Path::FlushUpstream(AbstractRouter* r)
{
if (m_UpstreamQueue && not m_UpstreamQueue->empty())
if (not m_UpstreamQueue.empty())
{
TrafficQueue_ptr data = nullptr;
std::swap(m_UpstreamQueue, data);
r->QueueWork(
[self = shared_from_this(), data, r]() { self->UpstreamWork(std::move(data), r); });
r->QueueWork([self = shared_from_this(),
data = std::exchange(m_UpstreamQueue, {}),
r]() mutable { self->UpstreamWork(std::move(data), r); });
}
}
void
Path::FlushDownstream(AbstractRouter* r)
{
if (m_DownstreamQueue && not m_DownstreamQueue->empty())
if (not m_DownstreamQueue.empty())
{
TrafficQueue_ptr data = nullptr;
std::swap(m_DownstreamQueue, data);
r->QueueWork(
[self = shared_from_this(), data, r]() { self->DownstreamWork(std::move(data), r); });
r->QueueWork([self = shared_from_this(),
data = std::exchange(m_DownstreamQueue, {}),
r]() mutable { self->DownstreamWork(std::move(data), r); });
}
}
@ -570,11 +568,11 @@ namespace llarp
}
void
Path::DownstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
Path::DownstreamWork(TrafficQueue_t msgs, AbstractRouter* r)
{
std::vector<RelayDownstreamMessage> sendMsgs(msgs->size());
std::vector<RelayDownstreamMessage> sendMsgs(msgs.size());
size_t idx = 0;
for (auto& ev : *msgs)
for (auto& ev : msgs)
{
const llarp_buffer_t buf(ev.first);
sendMsgs[idx].Y = ev.second;
@ -600,7 +598,7 @@ namespace llarp
m_RXRate += buf.sz;
if (HandleRoutingMessage(buf, r))
{
r->loop()->wakeup();
r->TriggerPump();
m_LastRecvMessage = r->Now();
}
}

@ -388,10 +388,10 @@ namespace llarp
protected:
void
UpstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override;
UpstreamWork(TrafficQueue_t queue, AbstractRouter* r) override;
void
DownstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override;
DownstreamWork(TrafficQueue_t queue, AbstractRouter* r) override;
void
HandleAllUpstream(std::vector<RelayUpstreamMessage> msgs, AbstractRouter* r) override;

@ -104,7 +104,7 @@ namespace llarp
}
void
TransitHop::DownstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
TransitHop::DownstreamWork(TrafficQueue_t msgs, AbstractRouter* r)
{
auto flushIt = [self = shared_from_this(), r]() {
std::vector<RelayDownstreamMessage> msgs;
@ -114,7 +114,7 @@ namespace llarp
}
self->HandleAllDownstream(std::move(msgs), r);
};
for (auto& ev : *msgs)
for (auto& ev : msgs)
{
RelayDownstreamMessage msg;
const llarp_buffer_t buf(ev.first);
@ -140,17 +140,9 @@ namespace llarp
}
void
TransitHop::UpstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
TransitHop::UpstreamWork(TrafficQueue_t msgs, AbstractRouter* r)
{
auto flushIt = [self = shared_from_this(), r]() {
std::vector<RelayUpstreamMessage> msgs;
while (auto maybe = self->m_UpstreamGather.tryPopFront())
{
msgs.push_back(*maybe);
}
self->HandleAllUpstream(std::move(msgs), r);
};
for (auto& ev : *msgs)
for (auto& ev : msgs)
{
const llarp_buffer_t buf(ev.first);
RelayUpstreamMessage msg;
@ -158,14 +150,19 @@ namespace llarp
msg.pathid = info.txID;
msg.Y = ev.second ^ nonceXOR;
msg.X = buf;
if (m_UpstreamGather.full())
if (m_UpstreamGather.tryPushBack(msg) != thread::QueueReturn::Success)
break;
}
// Flush it:
r->loop()->call([self = shared_from_this(), r] {
std::vector<RelayUpstreamMessage> msgs;
while (auto maybe = self->m_UpstreamGather.tryPopFront())
{
r->loop()->call(flushIt);
msgs.push_back(*maybe);
}
if (m_UpstreamGather.enabled())
m_UpstreamGather.pushBack(msg);
}
r->loop()->call(flushIt);
self->HandleAllUpstream(std::move(msgs), r);
});
}
void
@ -188,7 +185,6 @@ namespace llarp
other->FlushDownstream(r);
}
m_FlushOthers.clear();
r->loop()->wakeup();
}
else
{
@ -203,8 +199,8 @@ namespace llarp
info.upstream);
r->SendToOrQueue(info.upstream, msg);
}
r->linkManager().PumpLinks();
}
r->TriggerPump();
}
void
@ -221,31 +217,29 @@ namespace llarp
info.downstream);
r->SendToOrQueue(info.downstream, msg);
}
r->linkManager().PumpLinks();
r->TriggerPump();
}
void
TransitHop::FlushUpstream(AbstractRouter* r)
{
if (m_UpstreamQueue && not m_UpstreamQueue->empty())
if (not m_UpstreamQueue.empty())
{
r->QueueWork([self = shared_from_this(), data = std::move(m_UpstreamQueue), r]() mutable {
self->UpstreamWork(std::move(data), r);
});
r->QueueWork([self = shared_from_this(),
data = std::exchange(m_UpstreamQueue, {}),
r]() mutable { self->UpstreamWork(std::move(data), r); });
}
m_UpstreamQueue = nullptr;
}
void
TransitHop::FlushDownstream(AbstractRouter* r)
{
if (m_DownstreamQueue && not m_DownstreamQueue->empty())
if (not m_DownstreamQueue.empty())
{
r->QueueWork([self = shared_from_this(), data = std::move(m_DownstreamQueue), r]() mutable {
self->DownstreamWork(std::move(data), r);
});
r->QueueWork([self = shared_from_this(),
data = std::exchange(m_DownstreamQueue, {}),
r]() mutable { self->DownstreamWork(std::move(data), r); });
}
m_DownstreamQueue = nullptr;
}
/// this is where a DHT message is handled at the end of a path, that is,

@ -190,10 +190,10 @@ namespace llarp
protected:
void
UpstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override;
UpstreamWork(TrafficQueue_t queue, AbstractRouter* r) override;
void
DownstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override;
DownstreamWork(TrafficQueue_t queue, AbstractRouter* r) override;
void
HandleAllUpstream(std::vector<RelayUpstreamMessage> msgs, AbstractRouter* r) override;

@ -210,9 +210,9 @@ namespace llarp
virtual void
Die() = 0;
/// pump low level links
/// Trigger a pump of low level links. Idempotent.
virtual void
PumpLL() = 0;
TriggerPump() = 0;
virtual bool
IsBootstrapNode(RouterID r) const = 0;

@ -35,7 +35,7 @@ namespace llarp
QueueMessage(const RouterID& remote, const ILinkMessage& msg, SendStatusHandler callback) = 0;
virtual void
Tick() = 0;
Pump() = 0;
virtual void
RemovePath(const PathID_t& pathid) = 0;

@ -1,9 +1,7 @@
#include "outbound_message_handler.hpp"
#include <llarp/messages/link_message.hpp>
#include "i_outbound_session_maker.hpp"
#include "i_rc_lookup_handler.hpp"
#include <llarp/link/i_link_manager.hpp>
#include "router.hpp"
#include <llarp/constants/link_layer.hpp>
#include <llarp/util/meta/memfn.hpp>
#include <llarp/util/status.hpp>
@ -26,7 +24,8 @@ namespace llarp
const RouterID& remote, const ILinkMessage& msg, SendStatusHandler callback)
{
// if the destination is invalid, callback with failure and return
if (not _linkManager->SessionIsClient(remote) and not _lookupHandler->SessionIsAllowed(remote))
if (not _router->linkManager().SessionIsClient(remote)
and not _router->rcLookupHandler().SessionIsAllowed(remote))
{
DoCallback(callback, SendStatus::InvalidRouter);
return true;
@ -47,7 +46,7 @@ namespace llarp
std::copy_n(buf.base, buf.sz, message.first.data());
// if we have a session to the destination, queue the message and return
if (_linkManager->HasSessionTo(remote))
if (_router->linkManager().HasSessionTo(remote))
{
QueueOutboundMessage(remote, std::move(message), msg.pathid, priority);
return true;
@ -82,12 +81,13 @@ namespace llarp
}
void
OutboundMessageHandler::Tick()
OutboundMessageHandler::Pump()
{
m_Killer.TryAccess([this]() {
recentlyRemovedPaths.Decay();
ProcessOutboundQueue();
SendRoundRobin();
if (/*bool more = */ SendRoundRobin())
_router->TriggerPump();
});
}
@ -127,13 +127,9 @@ namespace llarp
}
void
OutboundMessageHandler::Init(
ILinkManager* linkManager, I_RCLookupHandler* lookupHandler, EventLoop_ptr loop)
OutboundMessageHandler::Init(AbstractRouter* router)
{
_linkManager = linkManager;
_lookupHandler = lookupHandler;
_loop = std::move(loop);
_router = router;
outboundMessageQueues.emplace(zeroID, MessageQueue());
}
@ -168,14 +164,14 @@ namespace llarp
OutboundMessageHandler::DoCallback(SendStatusHandler callback, SendStatus status)
{
if (callback)
_loop->call([f = std::move(callback), status] { f(status); });
_router->loop()->call([f = std::move(callback), status] { f(status); });
}
void
OutboundMessageHandler::QueueSessionCreation(const RouterID& remote)
{
auto fn = util::memFn(&OutboundMessageHandler::OnSessionResult, this);
_linkManager->GetSessionMaker()->CreateSessionTo(remote, fn);
_router->linkManager().GetSessionMaker()->CreateSessionTo(remote, fn);
}
bool
@ -199,7 +195,7 @@ namespace llarp
const llarp_buffer_t buf(msg.first);
auto callback = msg.second;
m_queueStats.sent++;
return _linkManager->SendTo(remote, buf, [=](ILinkSession::DeliveryStatus status) {
return _router->linkManager().SendTo(remote, buf, [=](ILinkSession::DeliveryStatus status) {
if (status == ILinkSession::DeliveryStatus::eDeliverySuccess)
DoCallback(callback, SendStatus::Success);
else
@ -212,7 +208,7 @@ namespace llarp
bool
OutboundMessageHandler::SendIfSession(const RouterID& remote, const Message& msg)
{
if (_linkManager->HasSessionTo(remote))
if (_router->linkManager().HasSessionTo(remote))
{
return Send(remote, msg);
}
@ -258,7 +254,7 @@ namespace llarp
// so check here if the pathid was recently removed.
if (recentlyRemovedPaths.Contains(entry.pathid))
{
return;
continue;
}
auto [queue_itr, is_new] = outboundMessageQueues.emplace(entry.pathid, MessageQueue());
@ -282,7 +278,7 @@ namespace llarp
}
}
void
bool
OutboundMessageHandler::SendRoundRobin()
{
m_queueStats.numTicks++;
@ -296,7 +292,6 @@ namespace llarp
routing_mq.pop();
}
size_t empty_count = 0;
size_t num_queues = roundRobinOrder.size();
// if any paths have been removed since last tick, remove any stale
@ -317,16 +312,16 @@ namespace llarp
removedSomePaths = false;
num_queues = roundRobinOrder.size();
size_t sent_count = 0;
if (num_queues == 0) // if no queues, return
if (num_queues == 0)
{
return;
return false;
}
// send messages for each pathid in roundRobinOrder, stopping when
// either every path's queue is empty or a set maximum amount of
// messages have been sent.
while (sent_count < MAX_OUTBOUND_MESSAGES_PER_TICK)
size_t consecutive_empty = 0;
for (size_t sent_count = 0; sent_count < MAX_OUTBOUND_MESSAGES_PER_TICK;)
{
PathID_t pathid = std::move(roundRobinOrder.front());
roundRobinOrder.pop();
@ -339,24 +334,26 @@ namespace llarp
Send(entry.router, entry.message);
message_queue.pop();
empty_count = 0;
sent_count++;
consecutive_empty = 0;
consecutive_empty++;
}
else
{
empty_count++;
consecutive_empty++;
}
roundRobinOrder.push(std::move(pathid));
// if num_queues empty queues in a row, all queues empty.
if (empty_count == num_queues)
if (consecutive_empty == num_queues)
{
break;
}
}
m_queueStats.perTickMax = std::max((uint32_t)sent_count, m_queueStats.perTickMax);
m_queueStats.perTickMax = std::max((uint32_t)consecutive_empty, m_queueStats.perTickMax);
return consecutive_empty != num_queues;
}
void

@ -17,8 +17,7 @@ struct llarp_buffer_t;
namespace llarp
{
struct ILinkManager;
struct I_RCLookupHandler;
struct AbstractRouter;
enum class SessionResult;
struct OutboundMessageHandler final : public IOutboundMessageHandler
@ -35,9 +34,9 @@ namespace llarp
* router, one is created.
*
* If there is a session to the destination router, the message is placed on the shared
* outbound message queue to be processed on Tick().
* outbound message queue to be processed on Pump().
*
* When this class' Tick() is called, that queue is emptied and the messages there
* When this class' Pump() is called, that queue is emptied and the messages there
* are placed in their paths' respective individual queues.
*
* Returns false if encoding the message into a buffer fails, true otherwise.
@ -48,7 +47,7 @@ namespace llarp
QueueMessage(const RouterID& remote, const ILinkMessage& msg, SendStatusHandler callback)
override EXCLUDES(_mutex);
/* Called once per event loop tick.
/* Called when pumping output queues, typically scheduled via a call to Router::TriggerPump().
*
* Processes messages on the shared message queue into their paths' respective
* individual queues.
@ -60,7 +59,7 @@ namespace llarp
* Sends messages from path queues until all are empty or a set cap has been reached.
*/
void
Tick() override;
Pump() override;
/* Called from outside this class to inform it that a path has died / expired
* and its queue should be discarded.
@ -72,7 +71,7 @@ namespace llarp
ExtractStatus() const override;
void
Init(ILinkManager* linkManager, I_RCLookupHandler* lookupHandler, EventLoop_ptr loop);
Init(AbstractRouter* router);
private:
using Message = std::pair<std::vector<byte_t>, SendStatusHandler>;
@ -146,7 +145,7 @@ namespace llarp
* If the queue is full, the message is dropped and the message's status
* callback is invoked with a congestion status.
*
* When this class' Tick() is called, that queue is emptied and the messages there
* When this class' Pump() is called, that queue is emptied and the messages there
* are placed in their paths' respective individual queues.
*/
bool
@ -160,14 +159,17 @@ namespace llarp
ProcessOutboundQueue();
/*
* Sends all routing messages that have been queued, indicated by pathid 0 when queued.
* Sends routing messages that have been queued, indicated by pathid 0 when queued.
*
* Sends messages from path queues until all are empty or a set cap has been reached.
* This will send one message from each queue in a round-robin fashion such that they
* all have roughly equal access to bandwidth. A notion of priority may be introduced
* at a later time, but for now only routing messages get priority.
*
* Returns true if there is more to send (i.e. we hit the limit before emptying all path
* queues), false if all queues were drained.
*/
void
bool
SendRoundRobin();
/* Invoked when an outbound session establish attempt has concluded.
@ -193,9 +195,7 @@ namespace llarp
std::queue<PathID_t> roundRobinOrder;
ILinkManager* _linkManager;
I_RCLookupHandler* _lookupHandler;
EventLoop_ptr _loop;
AbstractRouter* _router;
util::ContentionKiller m_Killer;

@ -94,8 +94,7 @@ namespace llarp
if (shouldDoLookup)
{
auto fn =
std::bind(&RCLookupHandler::HandleDHTLookupResult, this, router, std::placeholders::_1);
auto fn = [this, router](const auto& res) { HandleDHTLookupResult(router, res); };
// if we are a client try using the hidden service endpoints
if (!isServiceNode)
@ -232,7 +231,7 @@ namespace llarp
if (!SessionIsAllowed(newrc.pubkey))
return false;
auto func = std::bind(&RCLookupHandler::CheckRC, this, newrc);
auto func = [this, newrc] { CheckRC(newrc); };
_work(func);
// update dht if required

@ -69,6 +69,7 @@ namespace llarp
_running.store(false);
_lastTick = llarp::time_now_ms();
m_NextExploreAt = Clock_t::now();
m_Pump = _loop->make_waker([this]() { PumpLL(); });
}
Router::~Router()
@ -76,6 +77,20 @@ namespace llarp
llarp_dht_context_free(_dht);
}
void
Router::PumpLL()
{
llarp::LogTrace("Router::PumpLL() start");
if (_stopping.load())
return;
paths.PumpDownstream();
paths.PumpUpstream();
_hiddenServiceContext.Pump();
_outboundMessageHandler.Pump();
_linkManager.PumpLinks();
llarp::LogTrace("Router::PumpLL() end");
}
util::StatusObject
Router::ExtractStatus() const
{
@ -90,10 +105,7 @@ namespace llarp
{"links", _linkManager.ExtractStatus()},
{"outboundMessages", _outboundMessageHandler.ExtractStatus()}};
}
else
{
return util::StatusObject{{"running", false}};
}
return util::StatusObject{{"running", false}};
}
util::StatusObject
@ -243,16 +255,9 @@ namespace llarp
}
void
Router::PumpLL()
Router::TriggerPump()
{
llarp::LogTrace("Router::PumpLL() start");
if (_stopping.load())
return;
paths.PumpDownstream();
paths.PumpUpstream();
_outboundMessageHandler.Tick();
_linkManager.PumpLinks();
llarp::LogTrace("Router::PumpLL() end");
m_Pump->Trigger();
}
bool
@ -652,7 +657,7 @@ namespace llarp
LogInfo("Loaded ", bootstrapRCList.size(), " bootstrap routers");
// Init components after relevant config settings loaded
_outboundMessageHandler.Init(&_linkManager, &_rcLookupHandler, _loop);
_outboundMessageHandler.Init(this);
_outboundSessionMaker.Init(
this,
&_linkManager,
@ -701,13 +706,13 @@ namespace llarp
util::memFn(&AbstractRouter::CheckRenegotiateValid, this),
util::memFn(&Router::ConnectionTimedOut, this),
util::memFn(&AbstractRouter::SessionClosed, this),
util::memFn(&AbstractRouter::PumpLL, this),
util::memFn(&AbstractRouter::TriggerPump, this),
util::memFn(&AbstractRouter::QueueWork, this));
const std::string& key = serverConfig.interface;
int af = serverConfig.addressFamily;
uint16_t port = serverConfig.port;
if (!server->Configure(loop(), key, af, port))
if (!server->Configure(this, key, af, port))
{
throw std::runtime_error(stringify("failed to bind inbound link on ", key, " port ", port));
}
@ -1239,8 +1244,6 @@ namespace llarp
#ifdef _WIN32
// windows uses proactor event loop so we need to constantly pump
_loop->add_ticker([this] { PumpLL(); });
#else
_loop->set_pump_function([this] { PumpLL(); });
#endif
_loop->call_every(ROUTER_TICK_INTERVAL, weak_from_this(), [this] { Tick(); });
_running.store(true);
@ -1464,10 +1467,7 @@ namespace llarp
void
Router::QueueWork(std::function<void(void)> func)
{
if (m_isServiceNode)
_loop->call_soon(std::move(func));
else
m_lmq->job(std::move(func));
m_lmq->job(std::move(func));
}
void
@ -1507,7 +1507,7 @@ namespace llarp
util::memFn(&AbstractRouter::CheckRenegotiateValid, this),
util::memFn(&Router::ConnectionTimedOut, this),
util::memFn(&AbstractRouter::SessionClosed, this),
util::memFn(&AbstractRouter::PumpLL, this),
util::memFn(&AbstractRouter::TriggerPump, this),
util::memFn(&AbstractRouter::QueueWork, this));
if (!link)
@ -1515,7 +1515,7 @@ namespace llarp
for (const auto af : {AF_INET, AF_INET6})
{
if (not link->Configure(loop(), "*", af, m_OutboundPort))
if (not link->Configure(this, "*", af, m_OutboundPort))
continue;
#if defined(ANDROID)

@ -76,6 +76,8 @@ namespace llarp
path::BuildLimiter m_PathBuildLimiter;
std::shared_ptr<EventLoopWakeup> m_Pump;
path::BuildLimiter&
pathBuildLimiter() override
{
@ -282,7 +284,10 @@ namespace llarp
RoutePoker m_RoutePoker;
void
PumpLL() override;
TriggerPump() override;
void
PumpLL();
const oxenmq::address DefaultRPCBindAddr = oxenmq::address::tcp("127.0.0.1", 1190);
bool enableRPCServer = false;

@ -56,7 +56,7 @@ namespace llarp
Clear();
size_t idx = 0;
if (not bencode_read_list(
[self = this, &idx](llarp_buffer_t* buffer, bool has) {
[this, &idx](llarp_buffer_t* buffer, bool has) {
if (has)
{
uint64_t i;
@ -65,14 +65,14 @@ namespace llarp
uint64_t val = -1;
if (not bencode_read_integer(buffer, &val))
return false;
self->m_ProtoVersion = val;
m_ProtoVersion = val;
}
else if (bencode_read_integer(buffer, &i))
{
// prevent overflow (note that idx includes version too)
if (idx > self->m_Version.max_size())
if (idx > m_Version.max_size())
return false;
self->m_Version[idx - 1] = i;
m_Version[idx - 1] = i;
}
else
return false;

@ -80,6 +80,14 @@ namespace llarp
}
}
void
Context::Pump()
{
auto now = time_now_ms();
for (auto& [name, endpoint] : m_Endpoints)
endpoint->Pump(now);
}
bool
Context::RemoveEndpoint(const std::string& name)
{

@ -36,6 +36,10 @@ namespace llarp
void
ForEachService(std::function<bool(const std::string&, const Endpoint_ptr&)> visit) const;
/// Pumps the hidden service endpoints, called during Router::PumpLL
void
Pump();
/// add endpoint via config
void
AddEndpoint(const Config& conf, bool autostart = false);

@ -1079,24 +1079,18 @@ namespace llarp
void
Endpoint::FlushRecvData()
{
do
while (auto maybe = m_RecvQueue.tryPopFront())
{
auto maybe = m_RecvQueue.tryPopFront();
if (not maybe)
return;
auto ev = std::move(*maybe);
auto& ev = *maybe;
ProtocolMessage::ProcessAsync(ev.fromPath, ev.pathid, ev.msg);
} while (true);
}
}
void
Endpoint::QueueRecvData(RecvDataEvent ev)
{
if (m_RecvQueue.full() or m_RecvQueue.empty())
{
m_router->loop()->call_soon([this] { FlushRecvData(); });
}
m_RecvQueue.tryPushBack(std::move(ev));
Router()->TriggerPump();
}
bool
@ -1167,6 +1161,7 @@ namespace llarp
|| (msg->proto == ProtocolType::QUIC and m_quic))
{
m_InboundTrafficQueue.tryPushBack(std::move(msg));
Router()->TriggerPump();
return true;
}
if (msg->proto == ProtocolType::Control)
@ -1587,7 +1582,7 @@ namespace llarp
if (*ptr == m_Identity.pub.Addr())
{
ConvoTagTX(tag);
Loop()->wakeup();
m_state->m_Router->TriggerPump();
if (not HandleInboundPacket(tag, pkt, t, 0))
return false;
ConvoTagRX(tag);
@ -1596,7 +1591,6 @@ namespace llarp
}
if (not SendToOrQueue(*maybe, pkt, t))
return false;
Loop()->wakeup();
return true;
}
LogDebug("SendToOrQueue failed: no endpoint for convo tag ", tag);
@ -1610,16 +1604,19 @@ namespace llarp
auto pkt = std::make_shared<net::IPPacket>();
if (!pkt->Load(buf))
return false;
EnsurePathToSNode(addr, [=](RouterID, exit::BaseSession_ptr s, ConvoTag) {
if (s)
{
s->SendPacketToRemote(pkt->ConstBuffer(), t);
}
});
EnsurePathToSNode(
addr, [this, t, pkt = std::move(pkt)](RouterID, exit::BaseSession_ptr s, ConvoTag) {
if (s)
{
s->SendPacketToRemote(pkt->ConstBuffer(), t);
Router()->TriggerPump();
}
});
return true;
}
void Endpoint::Pump(llarp_time_t)
void
Endpoint::Pump(llarp_time_t now)
{
FlushRecvData();
// send downstream packets to user for snode
@ -1658,7 +1655,10 @@ namespace llarp
auto router = Router();
// TODO: locking on this container
for (const auto& [addr, outctx] : m_state->m_RemoteSessions)
{
outctx->FlushUpstream();
outctx->Pump(now);
}
// TODO: locking on this container
for (const auto& [router, session] : m_state->m_SNodeSessions)
session->FlushUpstream();
@ -1673,7 +1673,6 @@ namespace llarp
}
UpstreamFlush(router);
router->linkManager().PumpLinks();
}
std::optional<ConvoTag>
@ -1877,14 +1876,14 @@ namespace llarp
f.S = m->seqno;
f.F = p->intro.pathID;
transfer->P = replyIntro.pathID;
auto self = this;
Router()->QueueWork([transfer, p, m, K, self]() {
if (not transfer->T.EncryptAndSign(*m, K, self->m_Identity))
Router()->QueueWork([transfer, p, m, K, this]() {
if (not transfer->T.EncryptAndSign(*m, K, m_Identity))
{
LogError("failed to encrypt and sign for sessionn T=", transfer->T.T);
return;
}
self->m_SendQueue.tryPushBack(SendEvent_t{transfer, p});
m_SendQueue.tryPushBack(SendEvent_t{transfer, p});
Router()->TriggerPump();
});
return true;
}
@ -1926,10 +1925,10 @@ namespace llarp
traffic[remote].emplace_back(data, t);
EnsurePathToService(
remote,
[self = this](Address addr, OutboundContext* ctx) {
[this](Address addr, OutboundContext* ctx) {
if (ctx)
{
for (auto& pending : self->m_state->m_PendingTraffic[addr])
for (auto& pending : m_state->m_PendingTraffic[addr])
{
ctx->AsyncEncryptAndSendTo(pending.Buffer(), pending.protocol);
}
@ -1938,7 +1937,7 @@ namespace llarp
{
LogWarn("no path made to ", addr);
}
self->m_state->m_PendingTraffic.erase(addr);
m_state->m_PendingTraffic.erase(addr);
},
PathAlignmentTimeout());
return true;

@ -21,19 +21,20 @@ namespace llarp
, m_Endpoint(ep)
, createdAt(ep->Now())
, m_SendQueue(SendContextQueueSize)
{
m_FlushWakeup = ep->Loop()->make_waker([this] { FlushUpstream(); });
}
{}
bool
SendContext::Send(std::shared_ptr<ProtocolFrame> msg, path::Path_ptr path)
{
if (not path->IsReady())
return false;
m_FlushWakeup->Trigger();
return m_SendQueue.tryPushBack(std::make_pair(
std::make_shared<routing::PathTransferMessage>(*msg, remoteIntro.pathID), path))
== thread::QueueReturn::Success;
if (path->IsReady()
and m_SendQueue.tryPushBack(std::make_pair(
std::make_shared<routing::PathTransferMessage>(*msg, remoteIntro.pathID), path))
== thread::QueueReturn::Success)
{
m_Endpoint->Router()->TriggerPump();
return true;
}
return false;
}
void
@ -42,23 +43,18 @@ namespace llarp
auto r = m_Endpoint->Router();
std::unordered_set<path::Path_ptr, path::Path::Ptr_Hash> flushpaths;
auto rttRMS = 0ms;
while (auto maybe = m_SendQueue.tryPopFront())
{
do
auto& [msg, path] = *maybe;
msg->S = path->NextSeqNo();
if (path->SendRoutingMessage(*msg, r))
{
auto maybe = m_SendQueue.tryPopFront();
if (not maybe)
break;
auto& item = *maybe;
item.first->S = item.second->NextSeqNo();
if (item.second->SendRoutingMessage(*item.first, r))
{
lastGoodSend = r->Now();
flushpaths.emplace(item.second);
m_Endpoint->ConvoTagTX(item.first->T.T);
const auto rtt = (item.second->intro.latency + remoteIntro.latency) * 2;
rttRMS += rtt * rtt.count();
}
} while (not m_SendQueue.empty());
lastGoodSend = r->Now();
flushpaths.emplace(path);
m_Endpoint->ConvoTagTX(msg->T.T);
const auto rtt = (path->intro.latency + remoteIntro.latency) * 2;
rttRMS += rtt * rtt.count();
}
}
// flush the select path's upstream
for (const auto& path : flushpaths)

@ -56,8 +56,6 @@ namespace llarp
std::function<void(AuthResult)> authResultListener;
std::shared_ptr<EventLoopWakeup> m_FlushWakeup;
virtual bool
ShiftIntroduction(bool rebuild = true)
{

@ -22,7 +22,6 @@ add_executable(testAll
crypto/test_llarp_crypto.cpp
crypto/test_llarp_key_manager.cpp
dns/test_llarp_dns_dns.cpp
iwp/test_iwp_session.cpp
net/test_ip_address.cpp
net/test_llarp_net.cpp
net/test_sock_addr.cpp

@ -1,312 +0,0 @@
#include <catch2/catch.hpp>
#include <crypto/crypto.hpp>
#include <crypto/crypto_libsodium.hpp>
#include <string_view>
#include <router_contact.hpp>
#include <iwp/iwp.hpp>
#include <util/meta/memfn.hpp>
#include <messages/link_message_parser.hpp>
#include <messages/discard.hpp>
#include <util/time.hpp>
#include <net/net_if.hpp>
#include "ev/ev.hpp"
#undef LOG_TAG
#define LOG_TAG __FILE__
namespace iwp = llarp::iwp;
namespace util = llarp::util;
/// make an iwp link
template <bool inbound, typename... Args>
static llarp::LinkLayer_ptr
make_link(Args&&... args)
{
if (inbound)
return iwp::NewInboundLink(std::forward<Args>(args)...);
return iwp::NewOutboundLink(std::forward<Args>(args)...);
}
/// a single iwp link with associated keys and members to make unit tests work
struct IWPLinkContext
{
llarp::RouterContact rc;
llarp::IpAddress localAddr;
llarp::LinkLayer_ptr link;
std::shared_ptr<llarp::KeyManager> keyManager;
llarp::LinkMessageParser m_Parser;
llarp::EventLoop_ptr m_Loop;
/// is the test done on this context ?
bool gucci = false;
IWPLinkContext(std::string_view addr, llarp::EventLoop_ptr loop)
: localAddr{std::move(addr)}
, keyManager{std::make_shared<llarp::KeyManager>()}
, m_Parser{nullptr}
, m_Loop{std::move(loop)}
{
// generate keys
llarp::CryptoManager::instance()->identity_keygen(keyManager->identityKey);
llarp::CryptoManager::instance()->encryption_keygen(keyManager->encryptionKey);
llarp::CryptoManager::instance()->encryption_keygen(keyManager->transportKey);
// set keys in rc
rc.pubkey = keyManager->identityKey.toPublic();
rc.enckey = keyManager->encryptionKey.toPublic();
}
template <typename Func_t>
void
Call(Func_t work)
{
m_Loop->call_soon(std::move(work));
}
bool
HandleMessage(llarp::ILinkSession* from, const llarp_buffer_t& buf)
{
return m_Parser.ProcessFrom(from, buf);
}
/// initialize link
template <bool inbound>
void
InitLink(std::function<void(llarp::ILinkSession*)> established)
{
link = make_link<inbound>(
keyManager,
m_Loop,
// getrc
[&]() -> const llarp::RouterContact& { return rc; },
// link message handler
util::memFn(&IWPLinkContext::HandleMessage, this),
// sign buffer
[&](llarp::Signature& sig, const llarp_buffer_t& buf) {
REQUIRE(llarp::CryptoManager::instance()->sign(sig, keyManager->identityKey, buf));
return true;
},
// before connect
nullptr,
// established handler
[established](llarp::ILinkSession* s, bool linkIsInbound) {
REQUIRE(s != nullptr);
REQUIRE(inbound == linkIsInbound);
established(s);
return true;
},
// renegotiate handler
[](llarp::RouterContact newrc, llarp::RouterContact oldrc) {
REQUIRE(newrc.pubkey == oldrc.pubkey);
return true;
},
// timeout handler
[&](llarp::ILinkSession*) {
m_Loop->stop();
FAIL("session timeout");
},
// session closed handler
[](llarp::RouterID) {},
// pump done handler
[]() {},
// do work function
[l = m_Loop](llarp::Work_t work) { l->call_soon(work); });
REQUIRE(link->Configure(
m_Loop, llarp::net::LoopbackInterfaceName(), AF_INET, *localAddr.getPort()));
if (inbound)
{
// only add address info on the recipient's rc
rc.addrs.emplace_back();
REQUIRE(link->GetOurAddressInfo(rc.addrs.back()));
}
// sign rc
REQUIRE(rc.Sign(keyManager->identityKey));
REQUIRE(keyManager != nullptr);
}
};
using Context_ptr = std::shared_ptr<IWPLinkContext>;
/// run an iwp unit test after setup
/// call take 2 parameters, test and a timeout
///
/// test is a callable that takes 5 arguments:
/// 0) std::function<EventLoop_ptr(void)> that starts the iwp links and gives an event loop to call with
/// 1) std::function<void(void)> that ends the unit test if we are done
/// 2) std::function<void(void)> that ends the unit test right now as a success
/// 3) client iwp link context (shared_ptr)
/// 4) relay iwp link context (shared_ptr)
///
/// timeout is a std::chrono::duration that tells the driver how long to run the unit test for
/// before it should assume failure of unit test
template <typename Func_t, typename Duration_t = std::chrono::milliseconds>
void
RunIWPTest(Func_t test, Duration_t timeout = 10s)
{
// shut up logs
llarp::LogSilencer shutup;
// set up event loop
auto loop = llarp::EventLoop::create();
llarp::LogContext::Instance().Initialize(
llarp::eLogDebug, llarp::LogType::File, "stdout", "unit test", [loop](auto work) {
loop->call_soon(work);
});
// turn off bogon blocking
auto oldBlockBogons = llarp::RouterContact::BlockBogons;
llarp::RouterContact::BlockBogons = false;
// set up cryptography
llarp::sodium::CryptoLibSodium crypto{};
llarp::CryptoManager manager{&crypto};
// set up client
auto initiator = std::make_shared<IWPLinkContext>("127.0.0.1:3001", loop);
// set up server
auto recipient = std::make_shared<IWPLinkContext>("127.0.0.1:3002", loop);
// function for ending unit test on success
auto endIfDone = [initiator, recipient, loop]() {
if (initiator->gucci and recipient->gucci)
{
loop->stop();
}
};
// function to start test and give loop to unit test
auto start = [initiator, recipient, loop]() {
REQUIRE(initiator->link->Start());
REQUIRE(recipient->link->Start());
return loop;
};
// function to end test immediately
auto endTest = [loop] { loop->stop(); };
loop->call_later(timeout, [] { FAIL("test timeout"); });
test(start, endIfDone, endTest, initiator, recipient);
loop->run();
llarp::RouterContact::BlockBogons = oldBlockBogons;
}
/// ensure clients can connect to relays
TEST_CASE("IWP handshake", "[iwp]")
{
RunIWPTest([](std::function<llarp::EventLoop_ptr(void)> start,
std::function<void(void)> endIfDone,
[[maybe_unused]] std::function<void(void)> endTestNow,
Context_ptr alice,
Context_ptr bob) {
// set up initiator
alice->InitLink<false>([=](auto remote) {
REQUIRE(remote->GetRemoteRC() == bob->rc);
alice->gucci = true;
endIfDone();
});
// set up recipient
bob->InitLink<true>([=](auto remote) {
REQUIRE(remote->GetRemoteRC() == alice->rc);
bob->gucci = true;
endIfDone();
});
// start unit test
auto loop = start();
// try establishing a session
loop->call([link = alice->link, rc = bob->rc]() { REQUIRE(link->TryEstablishTo(rc)); });
});
}
/// ensure relays cannot connect to clients
TEST_CASE("IWP handshake reverse", "[iwp]")
{
RunIWPTest([](std::function<llarp::EventLoop_ptr(void)> start,
[[maybe_unused]] std::function<void(void)> endIfDone,
std::function<void(void)> endTestNow,
Context_ptr alice,
Context_ptr bob) {
alice->InitLink<false>([](auto) {});
bob->InitLink<true>([](auto) {});
// start unit test
auto loop = start();
// try establishing a session in the wrong direction
loop->call([link = bob->link, rc = alice->rc, endTestNow] {
REQUIRE(not link->TryEstablishTo(rc));
endTestNow();
});
});
}
/// ensure iwp can send messages between sessions
TEST_CASE("IWP send messages", "[iwp]")
{
int aliceNumSent = 0;
int bobNumSent = 0;
RunIWPTest([&aliceNumSent, &bobNumSent](std::function<llarp::EventLoop_ptr(void)> start,
std::function<void(void)> endIfDone,
std::function<void(void)> endTestNow,
Context_ptr alice,
Context_ptr bob) {
constexpr int numSend = 64;
// when alice makes a session to bob send `aliceNumSend` messages to him
alice->InitLink<false>([endIfDone, alice, &aliceNumSent](auto session) {
for (auto index = 0; index < numSend; index++)
{
alice->Call([session, endIfDone, alice, &aliceNumSent]() {
// generate a discard message that is 512 bytes long
llarp::DiscardMessage msg;
std::vector<byte_t> msgBuff(512);
llarp_buffer_t buf(msgBuff);
// add random padding
llarp::CryptoManager::instance()->randomize(buf);
// encode the discard message
msg.BEncode(&buf);
// send the message
session->SendMessageBuffer(msgBuff, [endIfDone, alice, &aliceNumSent](auto status) {
if (status == llarp::ILinkSession::DeliveryStatus::eDeliverySuccess)
{
// on successful transmit increment the number we sent
aliceNumSent++;
}
// if we sent all the messages sucessfully we end the unit test
alice->gucci = aliceNumSent == numSend;
endIfDone();
});
});
}
});
bob->InitLink<true>([endIfDone, bob, &bobNumSent](auto session) {
for (auto index = 0; index < numSend; index++)
{
bob->Call([session, endIfDone, bob, &bobNumSent]() {
// generate a discard message that is 512 bytes long
llarp::DiscardMessage msg;
std::vector<byte_t> msgBuff(512);
llarp_buffer_t buf(msgBuff);
// add random padding
llarp::CryptoManager::instance()->randomize(buf);
// encode the discard message
msg.BEncode(&buf);
// send the message
session->SendMessageBuffer(msgBuff, [endIfDone, bob, &bobNumSent](auto status) {
if (status == llarp::ILinkSession::DeliveryStatus::eDeliverySuccess)
{
// on successful transmit increment the number we sent
bobNumSent++;
}
// if we sent all the messages sucessfully we end the unit test
bob->gucci = bobNumSent == numSend;
endIfDone();
});
});
}
});
// start unit test
auto loop = start();
// try establishing a session from alice to bob
loop->call([link = alice->link, rc = bob->rc, endTestNow]() {
REQUIRE(link->TryEstablishTo(rc));
});
});
}
Loading…
Cancel
Save