Triggered pumping

This redoes how/when we pump so that we should only be calling the
idempotent PumpLL() when we actually have (or may have) something to
pump.
pull/1795/head
Jason Rhinelander 3 years ago
parent bfc6d35b33
commit 9113a6b36b

@ -288,9 +288,8 @@ namespace llarp
auto path = PickEstablishedPath(llarp::path::ePathRoleExit);
if (path)
{
for (auto& item : m_Upstream)
for (auto& [i, queue] : m_Upstream)
{
auto& queue = item.second;
while (queue.size())
{
auto& msg = queue.front();
@ -305,8 +304,8 @@ namespace llarp
if (m_Upstream.size())
llarp::LogWarn("no path for exit session");
// discard upstream
for (auto& item : m_Upstream)
item.second.clear();
for (auto& [i, queue] : m_Upstream)
queue.clear();
m_Upstream.clear();
if (numHops == 1)
{

@ -3,6 +3,7 @@
#include <llarp/messages/link_intro.hpp>
#include <llarp/messages/discard.hpp>
#include <llarp/util/meta/memfn.hpp>
#include <llarp/router/abstractrouter.hpp>
namespace llarp
{
@ -136,6 +137,7 @@ namespace llarp
Session::EncryptAndSend(ILinkSession::Packet_t data)
{
m_EncryptNext.emplace_back(std::move(data));
TriggerPump();
if (!IsEstablished())
{
EncryptWorker(std::move(m_EncryptNext));
@ -190,6 +192,7 @@ namespace llarp
const auto bufsz = buf.size();
auto& msg = m_TXMsgs.emplace(msgid, OutboundMessage{msgid, std::move(buf), now, completed})
.first->second;
TriggerPump();
EncryptAndSend(msg.XMIT());
if (bufsz > FragmentSize)
{
@ -225,6 +228,12 @@ namespace llarp
}
}
void
Session::TriggerPump()
{
m_Parent->Router()->PumpLL();
}
void
Session::Pump()
{
@ -233,18 +242,18 @@ namespace llarp
{
if (ShouldPing())
SendKeepAlive();
for (auto& item : m_RXMsgs)
for (auto& [id, msg] : m_RXMsgs)
{
if (item.second.ShouldSendACKS(now))
if (msg.ShouldSendACKS(now))
{
item.second.SendACKS(util::memFn(&Session::EncryptAndSend, this), now);
msg.SendACKS(util::memFn(&Session::EncryptAndSend, this), now);
}
}
for (auto& item : m_TXMsgs)
for (auto& [id, msg] : m_TXMsgs)
{
if (item.second.ShouldFlush(now))
if (msg.ShouldFlush(now))
{
item.second.FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), now);
msg.FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), now);
}
}
}
@ -613,6 +622,7 @@ namespace llarp
Session::HandleSessionData(Packet_t pkt)
{
m_DecryptNext.emplace_back(std::move(pkt));
TriggerPump();
}
void
@ -679,7 +689,7 @@ namespace llarp
}
}
SendMACK();
Pump();
TriggerPump();
}
void
@ -774,6 +784,8 @@ namespace llarp
{
itr = m_RXMsgs.emplace(rxid, InboundMessage{rxid, sz, ShortHash{pos}, m_Parent->Now()})
.first;
TriggerPump();
sz = std::min(sz, uint16_t{FragmentSize});
if ((data.size() - XMITOverhead) == sz)
{

@ -48,8 +48,11 @@ namespace llarp
/// inbound session
Session(LinkLayer* parent, const SockAddr& from);
~Session() = default;
// Signal the event loop that a pump is needed (idempotent)
void
TriggerPump();
// Does the actual pump
void
Pump() override;
@ -191,7 +194,7 @@ namespace llarp
/// maps rxid to time recieved
std::unordered_map<uint64_t, llarp_time_t> m_ReplayFilter;
/// rx messages to send in next round of multiacks
std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>> m_SendMACKs;
std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<>> m_SendMACKs;
using CryptoQueue_t = std::vector<Packet_t>;

@ -7,6 +7,7 @@
#include <llarp/util/fs.hpp>
#include <utility>
#include <unordered_set>
#include <llarp/router/abstractrouter.hpp>
static constexpr auto LINK_LAYER_TICK_INTERVAL = 100ms;
@ -38,7 +39,11 @@ namespace llarp
, m_SecretKey(keyManager->transportKey)
{}
ILinkLayer::~ILinkLayer() = default;
llarp_time_t
ILinkLayer::Now() const
{
return m_Router->loop()->time_now();
}
bool
ILinkLayer::HasSessionTo(const RouterID& id)
@ -124,10 +129,10 @@ namespace llarp
}
bool
ILinkLayer::Configure(EventLoop_ptr loop, const std::string& ifname, int af, uint16_t port)
ILinkLayer::Configure(AbstractRouter* router, const std::string& ifname, int af, uint16_t port)
{
m_Loop = std::move(loop);
m_udp = m_Loop->make_udp(
m_Router = router;
m_udp = m_Router->loop()->make_udp(
[this]([[maybe_unused]] UDPHandle& udp, const SockAddr& from, llarp_buffer_t buf) {
ILinkSession::Packet_t pkt;
pkt.resize(buf.sz);
@ -163,7 +168,6 @@ namespace llarp
if (not m_udp->listen(m_ourAddr))
return false;
m_Loop->add_ticker([this] { Pump(); });
return true;
}
@ -247,6 +251,7 @@ namespace llarp
}
m_AuthedLinks.emplace(pk, itr->second);
itr = m_Pending.erase(itr);
m_Router->PumpLL();
return true;
}
return false;
@ -345,7 +350,8 @@ namespace llarp
{
// Tie the lifetime of this repeater to this arbitrary shared_ptr:
m_repeater_keepalive = std::make_shared<int>(0);
m_Loop->call_every(LINK_LAYER_TICK_INTERVAL, m_repeater_keepalive, [this] { Tick(Now()); });
m_Router->loop()->call_every(
LINK_LAYER_TICK_INTERVAL, m_repeater_keepalive, [this] { Tick(Now()); });
return true;
}
@ -487,6 +493,7 @@ namespace llarp
if (m_Pending.count(address))
return false;
m_Pending.emplace(address, s);
m_Router->PumpLL();
return true;
}

@ -85,14 +85,11 @@ namespace llarp
SessionClosedHandler closed,
PumpDoneHandler pumpDone,
WorkerFunc_t doWork);
virtual ~ILinkLayer();
virtual ~ILinkLayer() = default;
/// get current time via event loop
llarp_time_t
Now() const
{
return m_Loop->time_now();
}
Now() const;
bool
HasSessionTo(const RouterID& pk);
@ -108,7 +105,7 @@ namespace llarp
SendTo_LL(const SockAddr& to, const llarp_buffer_t& pkt);
virtual bool
Configure(EventLoop_ptr loop, const std::string& ifname, int af, uint16_t port);
Configure(AbstractRouter* loop, const std::string& ifname, int af, uint16_t port);
virtual std::shared_ptr<ILinkSession>
NewOutboundSession(const RouterContact& rc, const AddressInfo& ai) = 0;
@ -225,6 +222,13 @@ namespace llarp
std::optional<int>
GetUDPFD() const;
// Gets a pointer to the router owning us.
AbstractRouter*
Router() const
{
return m_Router;
}
private:
const SecretKey& m_RouterEncSecret;
@ -239,7 +243,7 @@ namespace llarp
bool
PutSession(const std::shared_ptr<ILinkSession>& s);
EventLoop_ptr m_Loop;
AbstractRouter* m_Router;
SockAddr m_ourAddr;
std::shared_ptr<llarp::UDPHandle> m_udp;
SecretKey m_SecretKey;

@ -42,7 +42,7 @@ namespace llarp
virtual void
OnLinkEstablished(ILinkLayer*){};
/// called every event loop tick
/// called during pumping
virtual void
Pump() = 0;

@ -35,7 +35,7 @@ namespace llarp
QueueMessage(const RouterID& remote, const ILinkMessage& msg, SendStatusHandler callback) = 0;
virtual void
Tick() = 0;
Pump() = 0;
virtual void
RemovePath(const PathID_t& pathid) = 0;

@ -81,7 +81,7 @@ namespace llarp
}
void
OutboundMessageHandler::Tick()
OutboundMessageHandler::Pump()
{
m_Killer.TryAccess([this]() {
recentlyRemovedPaths.Decay();

@ -34,9 +34,9 @@ namespace llarp
* router, one is created.
*
* If there is a session to the destination router, the message is placed on the shared
* outbound message queue to be processed on Tick().
* outbound message queue to be processed on Pump().
*
* When this class' Tick() is called, that queue is emptied and the messages there
* When this class' Pump() is called, that queue is emptied and the messages there
* are placed in their paths' respective individual queues.
*
* Returns false if encoding the message into a buffer fails, true otherwise.
@ -47,7 +47,7 @@ namespace llarp
QueueMessage(const RouterID& remote, const ILinkMessage& msg, SendStatusHandler callback)
override EXCLUDES(_mutex);
/* Called once per event loop tick.
/* Called when pumping output queues, typically scheduled via a call to Router::PumpLL().
*
* Processes messages on the shared message queue into their paths' respective
* individual queues.
@ -59,7 +59,7 @@ namespace llarp
* Sends messages from path queues until all are empty or a set cap has been reached.
*/
void
Tick() override;
Pump() override;
/* Called from outside this class to inform it that a path has died / expired
* and its queue should be discarded.
@ -145,7 +145,7 @@ namespace llarp
* If the queue is full, the message is dropped and the message's status
* callback is invoked with a congestion status.
*
* When this class' Tick() is called, that queue is emptied and the messages there
* When this class' Pump() is called, that queue is emptied and the messages there
* are placed in their paths' respective individual queues.
*/
bool

@ -87,7 +87,7 @@ namespace llarp
llarp::LogTrace("Router::PumpLL() start");
if (_stopping.load())
return;
_outboundMessageHandler.Tick();
_outboundMessageHandler.Pump();
_linkManager.PumpLinks();
llarp::LogTrace("Router::PumpLL() end");
}
@ -106,10 +106,7 @@ namespace llarp
{"links", _linkManager.ExtractStatus()},
{"outboundMessages", _outboundMessageHandler.ExtractStatus()}};
}
else
{
return util::StatusObject{{"running", false}};
}
return util::StatusObject{{"running", false}};
}
util::StatusObject
@ -716,7 +713,7 @@ namespace llarp
const std::string& key = serverConfig.interface;
int af = serverConfig.addressFamily;
uint16_t port = serverConfig.port;
if (!server->Configure(loop(), key, af, port))
if (!server->Configure(this, key, af, port))
{
throw std::runtime_error(stringify("failed to bind inbound link on ", key, " port ", port));
}
@ -1521,7 +1518,7 @@ namespace llarp
for (const auto af : {AF_INET, AF_INET6})
{
if (not link->Configure(loop(), "*", af, m_OutboundPort))
if (not link->Configure(this, "*", af, m_OutboundPort))
continue;
#if defined(ANDROID)

Loading…
Cancel
Save