diff --git a/llarp/exit/session.cpp b/llarp/exit/session.cpp index f6c2d5136..485ca768f 100644 --- a/llarp/exit/session.cpp +++ b/llarp/exit/session.cpp @@ -288,9 +288,8 @@ namespace llarp auto path = PickEstablishedPath(llarp::path::ePathRoleExit); if (path) { - for (auto& item : m_Upstream) + for (auto& [i, queue] : m_Upstream) { - auto& queue = item.second; while (queue.size()) { auto& msg = queue.front(); @@ -305,8 +304,8 @@ namespace llarp if (m_Upstream.size()) llarp::LogWarn("no path for exit session"); // discard upstream - for (auto& item : m_Upstream) - item.second.clear(); + for (auto& [i, queue] : m_Upstream) + queue.clear(); m_Upstream.clear(); if (numHops == 1) { diff --git a/llarp/iwp/session.cpp b/llarp/iwp/session.cpp index 0b5b3bec5..ae0af1444 100644 --- a/llarp/iwp/session.cpp +++ b/llarp/iwp/session.cpp @@ -3,6 +3,7 @@ #include #include #include +#include namespace llarp { @@ -136,6 +137,7 @@ namespace llarp Session::EncryptAndSend(ILinkSession::Packet_t data) { m_EncryptNext.emplace_back(std::move(data)); + TriggerPump(); if (!IsEstablished()) { EncryptWorker(std::move(m_EncryptNext)); @@ -190,6 +192,7 @@ namespace llarp const auto bufsz = buf.size(); auto& msg = m_TXMsgs.emplace(msgid, OutboundMessage{msgid, std::move(buf), now, completed}) .first->second; + TriggerPump(); EncryptAndSend(msg.XMIT()); if (bufsz > FragmentSize) { @@ -225,6 +228,12 @@ namespace llarp } } + void + Session::TriggerPump() + { + m_Parent->Router()->PumpLL(); + } + void Session::Pump() { @@ -233,18 +242,18 @@ namespace llarp { if (ShouldPing()) SendKeepAlive(); - for (auto& item : m_RXMsgs) + for (auto& [id, msg] : m_RXMsgs) { - if (item.second.ShouldSendACKS(now)) + if (msg.ShouldSendACKS(now)) { - item.second.SendACKS(util::memFn(&Session::EncryptAndSend, this), now); + msg.SendACKS(util::memFn(&Session::EncryptAndSend, this), now); } } - for (auto& item : m_TXMsgs) + for (auto& [id, msg] : m_TXMsgs) { - if (item.second.ShouldFlush(now)) + if (msg.ShouldFlush(now)) { - item.second.FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), now); + msg.FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), now); } } } @@ -613,6 +622,7 @@ namespace llarp Session::HandleSessionData(Packet_t pkt) { m_DecryptNext.emplace_back(std::move(pkt)); + TriggerPump(); } void @@ -679,7 +689,7 @@ namespace llarp } } SendMACK(); - Pump(); + TriggerPump(); } void @@ -774,6 +784,8 @@ namespace llarp { itr = m_RXMsgs.emplace(rxid, InboundMessage{rxid, sz, ShortHash{pos}, m_Parent->Now()}) .first; + TriggerPump(); + sz = std::min(sz, uint16_t{FragmentSize}); if ((data.size() - XMITOverhead) == sz) { diff --git a/llarp/iwp/session.hpp b/llarp/iwp/session.hpp index 7a96c4389..d37a5464a 100644 --- a/llarp/iwp/session.hpp +++ b/llarp/iwp/session.hpp @@ -48,8 +48,11 @@ namespace llarp /// inbound session Session(LinkLayer* parent, const SockAddr& from); - ~Session() = default; + // Signal the event loop that a pump is needed (idempotent) + void + TriggerPump(); + // Does the actual pump void Pump() override; @@ -191,7 +194,7 @@ namespace llarp /// maps rxid to time recieved std::unordered_map m_ReplayFilter; /// rx messages to send in next round of multiacks - std::priority_queue, std::greater> m_SendMACKs; + std::priority_queue, std::greater<>> m_SendMACKs; using CryptoQueue_t = std::vector; diff --git a/llarp/link/server.cpp b/llarp/link/server.cpp index 23658dfe1..7166ab59f 100644 --- a/llarp/link/server.cpp +++ b/llarp/link/server.cpp @@ -7,6 +7,7 @@ #include #include #include +#include static constexpr auto LINK_LAYER_TICK_INTERVAL = 100ms; @@ -38,7 +39,11 @@ namespace llarp , m_SecretKey(keyManager->transportKey) {} - ILinkLayer::~ILinkLayer() = default; + llarp_time_t + ILinkLayer::Now() const + { + return m_Router->loop()->time_now(); + } bool ILinkLayer::HasSessionTo(const RouterID& id) @@ -124,10 +129,10 @@ namespace llarp } bool - ILinkLayer::Configure(EventLoop_ptr loop, const std::string& ifname, int af, uint16_t port) + ILinkLayer::Configure(AbstractRouter* router, const std::string& ifname, int af, uint16_t port) { - m_Loop = std::move(loop); - m_udp = m_Loop->make_udp( + m_Router = router; + m_udp = m_Router->loop()->make_udp( [this]([[maybe_unused]] UDPHandle& udp, const SockAddr& from, llarp_buffer_t buf) { ILinkSession::Packet_t pkt; pkt.resize(buf.sz); @@ -163,7 +168,6 @@ namespace llarp if (not m_udp->listen(m_ourAddr)) return false; - m_Loop->add_ticker([this] { Pump(); }); return true; } @@ -247,6 +251,7 @@ namespace llarp } m_AuthedLinks.emplace(pk, itr->second); itr = m_Pending.erase(itr); + m_Router->PumpLL(); return true; } return false; @@ -345,7 +350,8 @@ namespace llarp { // Tie the lifetime of this repeater to this arbitrary shared_ptr: m_repeater_keepalive = std::make_shared(0); - m_Loop->call_every(LINK_LAYER_TICK_INTERVAL, m_repeater_keepalive, [this] { Tick(Now()); }); + m_Router->loop()->call_every( + LINK_LAYER_TICK_INTERVAL, m_repeater_keepalive, [this] { Tick(Now()); }); return true; } @@ -487,6 +493,7 @@ namespace llarp if (m_Pending.count(address)) return false; m_Pending.emplace(address, s); + m_Router->PumpLL(); return true; } diff --git a/llarp/link/server.hpp b/llarp/link/server.hpp index e58e4da56..803120454 100644 --- a/llarp/link/server.hpp +++ b/llarp/link/server.hpp @@ -85,14 +85,11 @@ namespace llarp SessionClosedHandler closed, PumpDoneHandler pumpDone, WorkerFunc_t doWork); - virtual ~ILinkLayer(); + virtual ~ILinkLayer() = default; /// get current time via event loop llarp_time_t - Now() const - { - return m_Loop->time_now(); - } + Now() const; bool HasSessionTo(const RouterID& pk); @@ -108,7 +105,7 @@ namespace llarp SendTo_LL(const SockAddr& to, const llarp_buffer_t& pkt); virtual bool - Configure(EventLoop_ptr loop, const std::string& ifname, int af, uint16_t port); + Configure(AbstractRouter* loop, const std::string& ifname, int af, uint16_t port); virtual std::shared_ptr NewOutboundSession(const RouterContact& rc, const AddressInfo& ai) = 0; @@ -225,6 +222,13 @@ namespace llarp std::optional GetUDPFD() const; + // Gets a pointer to the router owning us. + AbstractRouter* + Router() const + { + return m_Router; + } + private: const SecretKey& m_RouterEncSecret; @@ -239,7 +243,7 @@ namespace llarp bool PutSession(const std::shared_ptr& s); - EventLoop_ptr m_Loop; + AbstractRouter* m_Router; SockAddr m_ourAddr; std::shared_ptr m_udp; SecretKey m_SecretKey; diff --git a/llarp/link/session.hpp b/llarp/link/session.hpp index 60430458f..11f326b52 100644 --- a/llarp/link/session.hpp +++ b/llarp/link/session.hpp @@ -42,7 +42,7 @@ namespace llarp virtual void OnLinkEstablished(ILinkLayer*){}; - /// called every event loop tick + /// called during pumping virtual void Pump() = 0; diff --git a/llarp/router/i_outbound_message_handler.hpp b/llarp/router/i_outbound_message_handler.hpp index 0ab96c434..ed9e542a5 100644 --- a/llarp/router/i_outbound_message_handler.hpp +++ b/llarp/router/i_outbound_message_handler.hpp @@ -35,7 +35,7 @@ namespace llarp QueueMessage(const RouterID& remote, const ILinkMessage& msg, SendStatusHandler callback) = 0; virtual void - Tick() = 0; + Pump() = 0; virtual void RemovePath(const PathID_t& pathid) = 0; diff --git a/llarp/router/outbound_message_handler.cpp b/llarp/router/outbound_message_handler.cpp index 2aaa6a30e..7e5a0cab4 100644 --- a/llarp/router/outbound_message_handler.cpp +++ b/llarp/router/outbound_message_handler.cpp @@ -81,7 +81,7 @@ namespace llarp } void - OutboundMessageHandler::Tick() + OutboundMessageHandler::Pump() { m_Killer.TryAccess([this]() { recentlyRemovedPaths.Decay(); diff --git a/llarp/router/outbound_message_handler.hpp b/llarp/router/outbound_message_handler.hpp index e0402d633..e648ad1fa 100644 --- a/llarp/router/outbound_message_handler.hpp +++ b/llarp/router/outbound_message_handler.hpp @@ -34,9 +34,9 @@ namespace llarp * router, one is created. * * If there is a session to the destination router, the message is placed on the shared - * outbound message queue to be processed on Tick(). + * outbound message queue to be processed on Pump(). * - * When this class' Tick() is called, that queue is emptied and the messages there + * When this class' Pump() is called, that queue is emptied and the messages there * are placed in their paths' respective individual queues. * * Returns false if encoding the message into a buffer fails, true otherwise. @@ -47,7 +47,7 @@ namespace llarp QueueMessage(const RouterID& remote, const ILinkMessage& msg, SendStatusHandler callback) override EXCLUDES(_mutex); - /* Called once per event loop tick. + /* Called when pumping output queues, typically scheduled via a call to Router::PumpLL(). * * Processes messages on the shared message queue into their paths' respective * individual queues. @@ -59,7 +59,7 @@ namespace llarp * Sends messages from path queues until all are empty or a set cap has been reached. */ void - Tick() override; + Pump() override; /* Called from outside this class to inform it that a path has died / expired * and its queue should be discarded. @@ -145,7 +145,7 @@ namespace llarp * If the queue is full, the message is dropped and the message's status * callback is invoked with a congestion status. * - * When this class' Tick() is called, that queue is emptied and the messages there + * When this class' Pump() is called, that queue is emptied and the messages there * are placed in their paths' respective individual queues. */ bool diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 2739c85d4..51283428b 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -87,7 +87,7 @@ namespace llarp llarp::LogTrace("Router::PumpLL() start"); if (_stopping.load()) return; - _outboundMessageHandler.Tick(); + _outboundMessageHandler.Pump(); _linkManager.PumpLinks(); llarp::LogTrace("Router::PumpLL() end"); } @@ -106,10 +106,7 @@ namespace llarp {"links", _linkManager.ExtractStatus()}, {"outboundMessages", _outboundMessageHandler.ExtractStatus()}}; } - else - { - return util::StatusObject{{"running", false}}; - } + return util::StatusObject{{"running", false}}; } util::StatusObject @@ -716,7 +713,7 @@ namespace llarp const std::string& key = serverConfig.interface; int af = serverConfig.addressFamily; uint16_t port = serverConfig.port; - if (!server->Configure(loop(), key, af, port)) + if (!server->Configure(this, key, af, port)) { throw std::runtime_error(stringify("failed to bind inbound link on ", key, " port ", port)); } @@ -1521,7 +1518,7 @@ namespace llarp for (const auto af : {AF_INET, AF_INET6}) { - if (not link->Configure(loop(), "*", af, m_OutboundPort)) + if (not link->Configure(this, "*", af, m_OutboundPort)) continue; #if defined(ANDROID)