Get rid of external event loop direct wakeups

If something needs to wake up the event loop it should be using an
async, as we are now with PumpLL(); but we had various code triggering a
wakeup, expecting that PumpLL gets called on every wakeup, which isn't
true anymore.
This commit is contained in:
Jason Rhinelander 2021-11-12 09:25:14 -04:00
parent 74215fc44c
commit 031ea7aa37
8 changed files with 20 additions and 40 deletions

View File

@ -81,12 +81,6 @@ namespace llarp
return llarp::time_now_ms(); return llarp::time_now_ms();
} }
// Triggers an event loop wakeup; use when something has been done that requires the event loop
// to wake up (e.g. adding to queues). This is called implicitly by call() and call_soon().
// Idempotent and thread-safe.
virtual void
wakeup() = 0;
// Calls a function/lambda/etc. If invoked from within the event loop itself this calls the // Calls a function/lambda/etc. If invoked from within the event loop itself this calls the
// given lambda immediately; otherwise it passes it to `call_soon()` to be queued to run at the // given lambda immediately; otherwise it passes it to `call_soon()` to be queued to run at the
// next event loop iteration. // next event loop iteration.
@ -196,10 +190,6 @@ namespace llarp
virtual std::shared_ptr<UDPHandle> virtual std::shared_ptr<UDPHandle>
make_udp(UDPReceiveFunc on_recv) = 0; make_udp(UDPReceiveFunc on_recv) = 0;
/// set the function that is called once per cycle the flush all the queues
virtual void
set_pump_function(std::function<void(void)> pumpll) = 0;
/// Make a thread-safe event loop waker (an "async" in libuv terminology) on this event loop; /// Make a thread-safe event loop waker (an "async" in libuv terminology) on this event loop;
/// you can call `->Trigger()` on the returned shared pointer to fire the callback at the next /// you can call `->Trigger()` on the returned shared pointer to fire the callback at the next
/// available event loop iteration. (Multiple Trigger calls invoked before the call is actually /// available event loop iteration. (Multiple Trigger calls invoked before the call is actually
@ -227,6 +217,13 @@ namespace llarp
{ {
return nullptr; return nullptr;
} }
protected:
// Triggers an event loop wakeup; use when something has been done that requires the event loop
// to wake up (e.g. adding to queues). This is called implicitly by call() and call_soon().
// Idempotent and thread-safe.
virtual void
wakeup() = 0;
}; };
using EventLoop_ptr = std::shared_ptr<EventLoop>; using EventLoop_ptr = std::shared_ptr<EventLoop>;

View File

@ -111,14 +111,12 @@ namespace llarp::uv
{ {
llarp::LogTrace("ticking event loop."); llarp::LogTrace("ticking event loop.");
FlushLogic(); FlushLogic();
if (PumpLL)
PumpLL();
auto& log = llarp::LogContext::Instance(); auto& log = llarp::LogContext::Instance();
if (log.logStream) if (log.logStream)
log.logStream->Tick(time_now()); log.logStream->Tick(time_now());
} }
Loop::Loop(size_t queue_size) : llarp::EventLoop{}, PumpLL{nullptr}, m_LogicCalls{queue_size} Loop::Loop(size_t queue_size) : llarp::EventLoop{}, m_LogicCalls{queue_size}
{ {
if (!(m_Impl = uvw::Loop::create())) if (!(m_Impl = uvw::Loop::create()))
throw std::runtime_error{"Failed to construct libuv loop"}; throw std::runtime_error{"Failed to construct libuv loop"};
@ -162,12 +160,6 @@ namespace llarp::uv
m_WakeUp->send(); m_WakeUp->send();
} }
void
Loop::set_pump_function(std::function<void(void)> pump)
{
PumpLL = std::move(pump);
}
std::shared_ptr<llarp::UDPHandle> std::shared_ptr<llarp::UDPHandle>
Loop::make_udp(UDPReceiveFunc on_recv) Loop::make_udp(UDPReceiveFunc on_recv)
{ {

View File

@ -31,9 +31,6 @@ namespace llarp::uv
bool bool
running() const override; running() const override;
void
wakeup() override;
void void
call_later(llarp_time_t delay_ms, std::function<void(void)> callback) override; call_later(llarp_time_t delay_ms, std::function<void(void)> callback) override;
@ -54,9 +51,6 @@ namespace llarp::uv
void void
call_soon(std::function<void(void)> f) override; call_soon(std::function<void(void)> f) override;
void
set_pump_function(std::function<void(void)> pumpll) override;
std::shared_ptr<llarp::EventLoopWakeup> std::shared_ptr<llarp::EventLoopWakeup>
make_waker(std::function<void()> callback) override; make_waker(std::function<void()> callback) override;
@ -69,8 +63,6 @@ namespace llarp::uv
void void
FlushLogic(); FlushLogic();
std::function<void(void)> PumpLL;
std::shared_ptr<uvw::Loop> std::shared_ptr<uvw::Loop>
MaybeGetUVWLoop() override; MaybeGetUVWLoop() override;
@ -95,6 +87,9 @@ namespace llarp::uv
std::unordered_map<int, std::shared_ptr<uvw::PollHandle>> m_Polls; std::unordered_map<int, std::shared_ptr<uvw::PollHandle>> m_Polls;
std::optional<std::thread::id> m_EventLoopThreadID; std::optional<std::thread::id> m_EventLoopThreadID;
void
wakeup() override;
}; };
} // namespace llarp::uv } // namespace llarp::uv

View File

@ -16,7 +16,7 @@ namespace llarp
pkt.first.resize(X.sz); pkt.first.resize(X.sz);
std::copy_n(X.base, X.sz, pkt.first.begin()); std::copy_n(X.base, X.sz, pkt.first.begin());
pkt.second = Y; pkt.second = Y;
r->loop()->wakeup(); r->PumpLL();
return true; return true;
} }
@ -31,7 +31,7 @@ namespace llarp
pkt.first.resize(X.sz); pkt.first.resize(X.sz);
std::copy_n(X.base, X.sz, pkt.first.begin()); std::copy_n(X.base, X.sz, pkt.first.begin());
pkt.second = Y; pkt.second = Y;
r->loop()->wakeup(); r->PumpLL();
return true; return true;
} }

View File

@ -488,7 +488,7 @@ namespace llarp
LogDebug("failed to send upstream to ", Upstream()); LogDebug("failed to send upstream to ", Upstream());
} }
} }
r->linkManager().PumpLinks(); r->PumpLL();
} }
void void
@ -600,7 +600,7 @@ namespace llarp
m_RXRate += buf.sz; m_RXRate += buf.sz;
if (HandleRoutingMessage(buf, r)) if (HandleRoutingMessage(buf, r))
{ {
r->loop()->wakeup(); r->PumpLL();
m_LastRecvMessage = r->Now(); m_LastRecvMessage = r->Now();
} }
} }

View File

@ -185,7 +185,6 @@ namespace llarp
other->FlushDownstream(r); other->FlushDownstream(r);
} }
m_FlushOthers.clear(); m_FlushOthers.clear();
r->loop()->wakeup();
} }
else else
{ {
@ -200,8 +199,8 @@ namespace llarp
info.upstream); info.upstream);
r->SendToOrQueue(info.upstream, msg); r->SendToOrQueue(info.upstream, msg);
} }
r->linkManager().PumpLinks();
} }
r->PumpLL();
} }
void void
@ -218,7 +217,7 @@ namespace llarp
info.downstream); info.downstream);
r->SendToOrQueue(info.downstream, msg); r->SendToOrQueue(info.downstream, msg);
} }
r->linkManager().PumpLinks(); r->PumpLL();
} }
void void

View File

@ -1245,10 +1245,7 @@ namespace llarp
#ifdef _WIN32 #ifdef _WIN32
// windows uses proactor event loop so we need to constantly pump // windows uses proactor event loop so we need to constantly pump
_loop->add_ticker([this] { PumpLLNonIdempotent(); }); _loop->add_ticker([this] { PumpLLNonIdempotent(); });
#else
_loop->set_pump_function([this] { PumpLLNonIdempotent(); });
#endif #endif
//_loop->call_every(10ms, weak_from_this(), [this] { PumpLLNonIdempotent(); });
_loop->call_every(ROUTER_TICK_INTERVAL, weak_from_this(), [this] { Tick(); }); _loop->call_every(ROUTER_TICK_INTERVAL, weak_from_this(), [this] { Tick(); });
_running.store(true); _running.store(true);
_startedAt = Now(); _startedAt = Now();

View File

@ -1587,7 +1587,7 @@ namespace llarp
if (*ptr == m_Identity.pub.Addr()) if (*ptr == m_Identity.pub.Addr())
{ {
ConvoTagTX(tag); ConvoTagTX(tag);
Loop()->wakeup(); m_state->m_Router->PumpLL();
if (not HandleInboundPacket(tag, pkt, t, 0)) if (not HandleInboundPacket(tag, pkt, t, 0))
return false; return false;
ConvoTagRX(tag); ConvoTagRX(tag);
@ -1596,7 +1596,7 @@ namespace llarp
} }
if (not SendToOrQueue(*maybe, pkt, t)) if (not SendToOrQueue(*maybe, pkt, t))
return false; return false;
Loop()->wakeup(); m_state->m_Router->PumpLL();
return true; return true;
} }
LogDebug("SendToOrQueue failed: no endpoint for convo tag ", tag); LogDebug("SendToOrQueue failed: no endpoint for convo tag ", tag);
@ -1673,7 +1673,7 @@ namespace llarp
} }
UpstreamFlush(router); UpstreamFlush(router);
router->linkManager().PumpLinks(); router->PumpLL();
} }
std::optional<ConvoTag> std::optional<ConvoTag>