diff --git a/llarp/ev/ev.hpp b/llarp/ev/ev.hpp index 7e3d38740..f39d3041f 100644 --- a/llarp/ev/ev.hpp +++ b/llarp/ev/ev.hpp @@ -81,12 +81,6 @@ namespace llarp return llarp::time_now_ms(); } - // Triggers an event loop wakeup; use when something has been done that requires the event loop - // to wake up (e.g. adding to queues). This is called implicitly by call() and call_soon(). - // Idempotent and thread-safe. - virtual void - wakeup() = 0; - // Calls a function/lambda/etc. If invoked from within the event loop itself this calls the // given lambda immediately; otherwise it passes it to `call_soon()` to be queued to run at the // next event loop iteration. @@ -196,10 +190,6 @@ namespace llarp virtual std::shared_ptr make_udp(UDPReceiveFunc on_recv) = 0; - /// set the function that is called once per cycle the flush all the queues - virtual void - set_pump_function(std::function pumpll) = 0; - /// Make a thread-safe event loop waker (an "async" in libuv terminology) on this event loop; /// you can call `->Trigger()` on the returned shared pointer to fire the callback at the next /// available event loop iteration. (Multiple Trigger calls invoked before the call is actually @@ -227,6 +217,13 @@ namespace llarp { return nullptr; } + + protected: + // Triggers an event loop wakeup; use when something has been done that requires the event loop + // to wake up (e.g. adding to queues). This is called implicitly by call() and call_soon(). + // Idempotent and thread-safe. + virtual void + wakeup() = 0; }; using EventLoop_ptr = std::shared_ptr; diff --git a/llarp/ev/ev_libuv.cpp b/llarp/ev/ev_libuv.cpp index 04b447987..b491f42b0 100644 --- a/llarp/ev/ev_libuv.cpp +++ b/llarp/ev/ev_libuv.cpp @@ -111,14 +111,12 @@ namespace llarp::uv { llarp::LogTrace("ticking event loop."); FlushLogic(); - if (PumpLL) - PumpLL(); auto& log = llarp::LogContext::Instance(); if (log.logStream) log.logStream->Tick(time_now()); } - Loop::Loop(size_t queue_size) : llarp::EventLoop{}, PumpLL{nullptr}, m_LogicCalls{queue_size} + Loop::Loop(size_t queue_size) : llarp::EventLoop{}, m_LogicCalls{queue_size} { if (!(m_Impl = uvw::Loop::create())) throw std::runtime_error{"Failed to construct libuv loop"}; @@ -162,12 +160,6 @@ namespace llarp::uv m_WakeUp->send(); } - void - Loop::set_pump_function(std::function pump) - { - PumpLL = std::move(pump); - } - std::shared_ptr Loop::make_udp(UDPReceiveFunc on_recv) { diff --git a/llarp/ev/ev_libuv.hpp b/llarp/ev/ev_libuv.hpp index e03b52c93..86dacac79 100644 --- a/llarp/ev/ev_libuv.hpp +++ b/llarp/ev/ev_libuv.hpp @@ -31,9 +31,6 @@ namespace llarp::uv bool running() const override; - void - wakeup() override; - void call_later(llarp_time_t delay_ms, std::function callback) override; @@ -54,9 +51,6 @@ namespace llarp::uv void call_soon(std::function f) override; - void - set_pump_function(std::function pumpll) override; - std::shared_ptr make_waker(std::function callback) override; @@ -69,8 +63,6 @@ namespace llarp::uv void FlushLogic(); - std::function PumpLL; - std::shared_ptr MaybeGetUVWLoop() override; @@ -95,6 +87,9 @@ namespace llarp::uv std::unordered_map> m_Polls; std::optional m_EventLoopThreadID; + + void + wakeup() override; }; } // namespace llarp::uv diff --git a/llarp/path/ihophandler.cpp b/llarp/path/ihophandler.cpp index 1b185e3cb..589d4e4cd 100644 --- a/llarp/path/ihophandler.cpp +++ b/llarp/path/ihophandler.cpp @@ -16,7 +16,7 @@ namespace llarp pkt.first.resize(X.sz); std::copy_n(X.base, X.sz, pkt.first.begin()); pkt.second = Y; - r->loop()->wakeup(); + r->PumpLL(); return true; } @@ -31,7 +31,7 @@ namespace llarp pkt.first.resize(X.sz); std::copy_n(X.base, X.sz, pkt.first.begin()); pkt.second = Y; - r->loop()->wakeup(); + r->PumpLL(); return true; } diff --git a/llarp/path/path.cpp b/llarp/path/path.cpp index 9692233f1..0181a838b 100644 --- a/llarp/path/path.cpp +++ b/llarp/path/path.cpp @@ -488,7 +488,7 @@ namespace llarp LogDebug("failed to send upstream to ", Upstream()); } } - r->linkManager().PumpLinks(); + r->PumpLL(); } void @@ -600,7 +600,7 @@ namespace llarp m_RXRate += buf.sz; if (HandleRoutingMessage(buf, r)) { - r->loop()->wakeup(); + r->PumpLL(); m_LastRecvMessage = r->Now(); } } diff --git a/llarp/path/transit_hop.cpp b/llarp/path/transit_hop.cpp index 2e09a6f74..cb2abcd2e 100644 --- a/llarp/path/transit_hop.cpp +++ b/llarp/path/transit_hop.cpp @@ -185,7 +185,6 @@ namespace llarp other->FlushDownstream(r); } m_FlushOthers.clear(); - r->loop()->wakeup(); } else { @@ -200,8 +199,8 @@ namespace llarp info.upstream); r->SendToOrQueue(info.upstream, msg); } - r->linkManager().PumpLinks(); } + r->PumpLL(); } void @@ -218,7 +217,7 @@ namespace llarp info.downstream); r->SendToOrQueue(info.downstream, msg); } - r->linkManager().PumpLinks(); + r->PumpLL(); } void diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 50e8f1a57..0cd9f00aa 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -1245,10 +1245,7 @@ namespace llarp #ifdef _WIN32 // windows uses proactor event loop so we need to constantly pump _loop->add_ticker([this] { PumpLLNonIdempotent(); }); -#else - _loop->set_pump_function([this] { PumpLLNonIdempotent(); }); #endif - //_loop->call_every(10ms, weak_from_this(), [this] { PumpLLNonIdempotent(); }); _loop->call_every(ROUTER_TICK_INTERVAL, weak_from_this(), [this] { Tick(); }); _running.store(true); _startedAt = Now(); diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 943dfe4e4..139325f49 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -1587,7 +1587,7 @@ namespace llarp if (*ptr == m_Identity.pub.Addr()) { ConvoTagTX(tag); - Loop()->wakeup(); + m_state->m_Router->PumpLL(); if (not HandleInboundPacket(tag, pkt, t, 0)) return false; ConvoTagRX(tag); @@ -1596,7 +1596,7 @@ namespace llarp } if (not SendToOrQueue(*maybe, pkt, t)) return false; - Loop()->wakeup(); + m_state->m_Router->PumpLL(); return true; } LogDebug("SendToOrQueue failed: no endpoint for convo tag ", tag); @@ -1673,7 +1673,7 @@ namespace llarp } UpstreamFlush(router); - router->linkManager().PumpLinks(); + router->PumpLL(); } std::optional