#include "ev_libuv.hpp"
#include "vpn.hpp"
// NOTE(review): the targets of the angle-bracket includes (and all explicit template
// arguments) were missing from this file.  The standard headers below are the ones this
// translation unit visibly uses; the template arguments were restored from the uvw API and
// from how each value is used.  Confirm both against version control.
#include <csignal>
#include <functional>
#include <memory>
#include <optional>
#include <stdexcept>
#include <thread>
#include <type_traits>
#include "ev.hpp"

#include <uvw.hpp>

namespace llarp::uv
{
/// Returns the shared pointer to the underlying uvw loop (`m_Impl`).
/// `run()` resets `m_Impl` after the loop finishes, so this may be empty afterwards.
std::shared_ptr<uvw::Loop>
Loop::MaybeGetUVWLoop()
{
  return m_Impl;
}

/// EventLoopWakeup implemented on a uvw async handle: `Trigger()` signals the handle and the
/// stored callback is invoked from the handle's async event.
class UVWakeup final : public EventLoopWakeup
{
  std::shared_ptr<uvw::AsyncHandle> async;

 public:
  UVWakeup(uvw::Loop& loop, std::function<void()> callback)
      : async{loop.resource<uvw::AsyncHandle>()}
  {
    async->on<uvw::AsyncEvent>([f = std::move(callback)](auto&, auto&) { f(); });
  }

  void
  Trigger() override
  {
    async->send();
  }

  ~UVWakeup() override
  {
    async->close();
  }
};

/// EventLoopRepeater implemented on a uvw timer handle.
class UVRepeater final : public EventLoopRepeater
{
  std::shared_ptr<uvw::TimerHandle> timer;

 public:
  UVRepeater(uvw::Loop& loop) : timer{loop.resource<uvw::TimerHandle>()}
  {}

  /// Starts the timer with both initial delay and repeat interval set to `every`, and
  /// registers `task` to run on each timer event.
  void
  start(llarp_time_t every, std::function<void()> task) override
  {
    timer->start(every, every);
    timer->on<uvw::TimerEvent>([task = std::move(task)](auto&, auto&) { task(); });
  }

  ~UVRepeater() override
  {
    timer->stop();
  }
};

/// uvw-backed implementation of llarp::UDPHandle.
struct UDPHandle final : llarp::UDPHandle
{
  UDPHandle(uvw::Loop& loop, ReceiveFunc rf);

  bool
  listen(const SockAddr& addr) override;

  bool
  send(const SockAddr& dest, const llarp_buffer_t& buf) override;

  /// Returns the handle's file descriptor when it is non-negative; always nullopt on Windows
  /// or when the handle has already been released by close().
  std::optional<int>
  file_descriptor() override
  {
#ifndef _WIN32
    // close() resets `handle`, so it must be checked before being dereferenced.
    if (handle)
    {
      if (int fd = handle->fd(); fd >= 0)
        return fd;
    }
#endif
    return std::nullopt;
  }

  void
  close() override;

  ~UDPHandle() override;

 private:
  std::shared_ptr<uvw::UDPHandle> handle;

  void
  reset_handle(uvw::Loop& loop);
};

/// Pops and invokes queued callables from `m_LogicCalls` until the queue reports empty.
void
Loop::FlushLogic()
{
  llarp::LogTrace("Loop::FlushLogic() start");
  while (not m_LogicCalls.empty())
  {
    auto f = m_LogicCalls.popFront();
    f();
  }
  llarp::LogTrace("Loop::FlushLogic() end");
}

/// One tick of the loop: flush queued logic calls, run the pump function, then tick the log
/// stream if one is set.
void
Loop::tick_event_loop()
{
  llarp::LogTrace("ticking event loop.");
  FlushLogic();
  PumpLL();
  auto& log = llarp::LogContext::Instance();
  if (log.logStream)
    log.logStream->Tick(time_now());
}

/// Creates the uvw loop and the async handle used to wake it up.
/// @param queue_size size passed to the `m_LogicCalls` queue
/// @throws std::runtime_error if the loop or the async handle cannot be created
Loop::Loop(size_t queue_size) : llarp::EventLoop{}, PumpLL{[] {}}, m_LogicCalls{queue_size}
{
  if (!(m_Impl = uvw::Loop::create()))
    throw std::runtime_error{"Failed to construct libuv loop"};

#ifdef LOKINET_DEBUG
  last_time = 0;
  loop_run_count = 0;
#endif

#ifndef _WIN32
  signal(SIGPIPE, SIG_IGN);
#endif

  m_Run.store(true);
  m_nextID.store(0);
  if (!(m_WakeUp = m_Impl->resource<uvw::AsyncHandle>()))
    throw std::runtime_error{"Failed to create libuv async"};
  m_WakeUp->on<uvw::AsyncEvent>([this](const auto&, auto&) { tick_event_loop(); });
}

bool
Loop::running() const
{
  return m_Run.load();
}

/// Records the calling thread as the event loop thread, runs the uvw loop until it returns,
/// then closes and releases it.
void
Loop::run()
{
  llarp::LogTrace("Loop::run_loop()");
  m_EventLoopThreadID = std::this_thread::get_id();
  m_Impl->run();
  m_Impl->close();
  m_Impl.reset();
  llarp::LogInfo("we have stopped");
}

void
Loop::wakeup()
{
  m_WakeUp->send();
}

void
Loop::set_pump_function(std::function<void(void)> pump)
{
  PumpLL = std::move(pump);
}

std::shared_ptr<llarp::UDPHandle>
Loop::make_udp(UDPReceiveFunc on_recv)
{
  return std::static_pointer_cast<llarp::UDPHandle>(
      std::make_shared<llarp::uv::UDPHandle>(*m_Impl, std::move(on_recv)));
}

/// Creates a timer on `loop` that invokes `callback` once after `delay`, then stops and closes
/// itself.
static void
setup_oneshot_timer(uvw::Loop& loop, llarp_time_t delay, std::function<void()> callback)
{
  auto timer = loop.resource<uvw::TimerHandle>();
  timer->on<uvw::TimerEvent>([f = std::move(callback)](const auto&, auto& timer) {
    f();
    timer.stop();
    timer.close();
  });
  timer->start(delay, 0ms);
}

/// Schedules `callback` to run after `delay_ms`.  When called from the event loop thread the
/// timer is created directly; otherwise creation is deferred to the loop via call_soon().
void
Loop::call_later(llarp_time_t delay_ms, std::function<void(void)> callback)
{
  llarp::LogTrace("Loop::call_after_delay()");
#ifdef TESTNET_SPEED
  delay_ms *= TESTNET_SPEED;
#endif

  if (inEventLoop())
    setup_oneshot_timer(*m_Impl, delay_ms, std::move(callback));
  else
  {
    call_soon([this, f = std::move(callback), target_time = time_now() + delay_ms] {
      // Recalculate delay because it may have taken some time to get ourselves into the logic
      // thread
      auto updated_delay = target_time - time_now();
      if (updated_delay <= 0ms)
        f();  // Timer already expired!
      else
        setup_oneshot_timer(*m_Impl, updated_delay, std::move(f));
    });
  }
}

/// Stops the loop if it is still marked as running.  When called from another thread the
/// request is re-queued onto the loop via call_soon().
void
Loop::stop()
{
  if (m_Run)
  {
    if (not inEventLoop())
      return call_soon([this] { stop(); });

    llarp::LogInfo("stopping event loop");
    // walk() visits every handle; the constexpr check skips any that are passed as pointers.
    m_Impl->walk([](auto&& handle) {
      if constexpr (!std::is_pointer_v<std::remove_reference_t<decltype(handle)>>)
        handle.close();
    });
    llarp::LogDebug("Closed all handles, stopping the loop");
    m_Impl->stop();

    m_Run.store(false);
  }
}

/// Registers `func` to run on every check event of a newly created check handle.
/// Always returns true.
bool
Loop::add_ticker(std::function<void(void)> func)
{
  auto check = m_Impl->resource<uvw::CheckHandle>();
  check->on<uvw::CheckEvent>([f = std::move(func)](auto&, auto&) { f(); });
  check->start();
  return true;
}

/// Hooks a network interface into the loop: on each event every pending packet is read from
/// `netif` and passed to `handler` (if set).  Uses a poll handle on the interface's fd, or a
/// check handle on Windows.
/// @return false if the handle could not be created, true otherwise
// NOTE(review): the parameter template arguments were missing from this file; the types below
// are inferred from usage (PollFD()/ReadNextPacket(), packets with `.sz`) — confirm against
// the declaration in ev_libuv.hpp.
bool
Loop::add_network_interface(
    std::shared_ptr<llarp::vpn::NetworkInterface> netif,
    std::function<void(llarp::net::IPPacket)> handler)
{
#ifndef _WIN32
  using event_t = uvw::PollEvent;
  auto handle = m_Impl->resource<uvw::PollHandle>(netif->PollFD());
#else
  using event_t = uvw::CheckEvent;
  auto handle = m_Impl->resource<uvw::CheckHandle>();
#endif
  if (!handle)
    return false;

  handle->on<event_t>([netif = std::move(netif), handler = std::move(handler)](
                          const event_t&, [[maybe_unused]] auto& handle) {
    for (auto pkt = netif->ReadNextPacket(); pkt.sz > 0; pkt = netif->ReadNextPacket())
    {
      LogDebug("got packet ", pkt.sz);
      if (handler)
        handler(std::move(pkt));
    }
  });

#ifndef _WIN32
  handle->start(uvw::PollHandle::Event::READABLE);
#else
  handle->start();
#endif
  return true;
}

/// Queues `f` to be run by FlushLogic() and wakes the loop.
void
Loop::call_soon(std::function<void(void)> f)
{
  if (not m_EventLoopThreadID.has_value())
  {
    // run() has not recorded a loop thread yet: push without blocking.  The result of
    // tryPushBack is not checked here.
    m_LogicCalls.tryPushBack(std::move(f));
    m_WakeUp->send();
    return;
  }

  // On the loop thread nobody else will drain the queue, so make room first if it is full.
  if (inEventLoop() and m_LogicCalls.full())
  {
    FlushLogic();
  }
  m_LogicCalls.pushBack(std::move(f));
  m_WakeUp->send();
}

// Sets `handle` to a new uvw UDP handle, first initiating a close and then disowning the handle
// if already set, allocating the resource, and setting the receive event on it.
void UDPHandle::reset_handle(uvw::Loop& loop) { if (handle) handle->close(); handle = loop.resource(); handle->on([this](auto& event, auto& /*handle*/) { on_recv( *this, SockAddr{event.sender.ip, static_cast(event.sender.port)}, OwnedBuffer{std::move(event.data), event.length}); }); } llarp::uv::UDPHandle::UDPHandle(uvw::Loop& loop, ReceiveFunc rf) : llarp::UDPHandle{std::move(rf)} { reset_handle(loop); } bool UDPHandle::listen(const SockAddr& addr) { if (handle->active()) reset_handle(handle->loop()); bool good = true; auto err = handle->on([&](auto& event, auto&) { llarp::LogError("failed to bind and start receiving on ", addr, ": ", event.what()); good = false; }); handle->bind(*static_cast(addr)); if (good) handle->recv(); handle->erase(err); return good; } bool UDPHandle::send(const SockAddr& to, const llarp_buffer_t& buf) { return handle->trySend( *static_cast(to), const_cast(reinterpret_cast(buf.base)), buf.sz) >= 0; } void UDPHandle::close() { handle->close(); handle.reset(); } UDPHandle::~UDPHandle() { close(); } std::shared_ptr Loop::make_waker(std::function callback) { return std::static_pointer_cast( std::make_shared(*m_Impl, std::move(callback))); } std::shared_ptr Loop::make_repeater() { return std::static_pointer_cast(std::make_shared(*m_Impl)); } bool Loop::inEventLoop() const { return m_EventLoopThreadID and *m_EventLoopThreadID == std::this_thread::get_id(); } } // namespace llarp::uv