From 5b555ee5aa619741f3af400064b4642c19c50c1a Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Mon, 1 Mar 2021 22:06:20 -0400 Subject: [PATCH] Replace libuv with uvw & related refactoring - removes all the llarp_ev_* functions, replacing with methods/classes/functions in the llarp namespace. - banish ev/ev.h to the void - Passes various things by const lvalue ref, especially shared_ptr's that don't need to be copied (to avoid an atomic refcount increment/decrement). - Add a llarp::UDPHandle abstract class for UDP handling - Removes the UDP tick handler; code that needs tick can just do a separate handler on the event loop outside the UDP socket. - Adds an "OwnedBuffer" which owns its own memory but is implicitly convertible to a llarp_buffer_t. This is mostly needed to take over ownership of buffers from uvw without copying them as, currently, uvw does its own allocation (pending some open upstream issues/PRs). - Logic: - add `make_caller`/`call_forever`/`call_every` utility functions to abstract Call wrapping and dependent timed tasks. - Add inLogicThread() so that code can tell its inside the logic thread (typically for debugging assertions). - get rid of janky integer returns and dealing with cancellations on call_later: the other methods added here and the event loop code remove the need for them. - Event loop: - redo everything with uvw instead of libuv - rename EventLoopWakeup::Wakeup to EventLoopWakeup::Trigger to better reflect what it does. - add EventLoopRepeater for repeated events, and replace the code that reschedules itself every time it is called with a repeater. - Split up `EventLoop::run()` into a non-virtual base method and abstract `run_loop()` methods; the base method does a couple extra setup/teardown things that don't need to be in the derived class. - udp_listen is replaced with ev->udp(...) which returns a new UDPHandle object rather that needing gross C-style-but-not-actually-C-compatible structs. - Remove unused register_poll_fd_(un)readable - Use shared_ptr for EventLoopWakeup rather than returning a raw pointer; uvw lets us not have to worry about having the event loop class maintain ownership of it. - Add factory EventLoop::create() function to create a default (uvw-based) event loop (previously this was one of the llarp_ev_blahblah unnamespaced functions). - ev_libuv: this is mostly rewritten; all of the glue code/structs, in particular, are gone as they are no longer needed with uvw. - DNS: - Rename DnsHandler to DnsInterceptor to better describe what it does (this is the code that intercepts all DNS to the tun IP range for Android). - endpoint: - remove unused "isolated network" code - remove distinct (but actually always the same) variables for router/endpoint logic objects - llarp_buffer_t - make constructors type-safe against being called with points to non-size-1 values - tun packet reading: - read all available packets off the device/file descriptor; previously we were reading one packet at a time then returning to the event loop to poll again. - ReadNextPacket() now returns a 0-size packet if the read would block (so that we can implement the previous point). - ReadNextPacket() now throws on I/O error - Miscellaneous code cleanups/simplifications --- .gitmodules | 3 + CMakeLists.txt | 1 + external/uvw | 1 + llarp/CMakeLists.txt | 7 +- llarp/context.cpp | 5 +- llarp/dns/message.cpp | 10 + llarp/dns/message.hpp | 4 + llarp/dns/server.cpp | 87 ++-- llarp/dns/server.hpp | 37 +- llarp/dns/unbound_resolver.cpp | 12 +- llarp/dns/unbound_resolver.hpp | 5 +- llarp/ev/ev.cpp | 70 +-- llarp/ev/ev.h | 88 ---- llarp/ev/ev.hpp | 111 +++-- llarp/ev/ev_libuv.cpp | 690 ++++++++--------------------- llarp/ev/ev_libuv.hpp | 94 ++-- llarp/ev/udp_handle.hpp | 41 ++ llarp/handlers/tun.cpp | 23 +- llarp/iwp/linklayer.cpp | 9 +- llarp/iwp/linklayer.hpp | 4 +- llarp/link/server.cpp | 69 ++- llarp/link/server.hpp | 42 +- llarp/net/ip_packet.hpp | 16 +- llarp/router/abstractrouter.hpp | 16 +- llarp/router/router.cpp | 36 +- llarp/router/router.hpp | 29 +- llarp/rpc/endpoint_rpc.cpp | 24 +- llarp/service/endpoint.cpp | 62 +-- llarp/service/endpoint.hpp | 34 +- llarp/service/endpoint_state.hpp | 19 - llarp/service/outbound_context.cpp | 4 +- llarp/service/sendcontext.cpp | 11 +- llarp/simulation/sim_context.hpp | 4 +- llarp/tooling/hive_context.cpp | 2 +- llarp/tooling/hive_context.hpp | 2 +- llarp/tooling/hive_router.cpp | 2 +- llarp/tooling/hive_router.hpp | 2 +- llarp/util/aligned.hpp | 2 +- llarp/util/buffer.cpp | 20 + llarp/util/buffer.hpp | 105 ++++- llarp/util/thread/logic.cpp | 31 +- llarp/util/thread/logic.hpp | 68 ++- llarp/vpn/apple.hpp | 16 +- llarp/vpn/linux.hpp | 4 + llarp/vpn/win32.hpp | 11 +- test/iwp/test_iwp_session.cpp | 16 +- 46 files changed, 736 insertions(+), 1213 deletions(-) create mode 160000 external/uvw delete mode 100644 llarp/ev/ev.h create mode 100644 llarp/ev/udp_handle.hpp diff --git a/.gitmodules b/.gitmodules index eaaeafcbb..2a91f627f 100644 --- a/.gitmodules +++ b/.gitmodules @@ -26,3 +26,6 @@ [submodule "external/oxen-mq"] path = external/oxen-mq url = https://github.com/oxen-io/oxen-mq +[submodule "external/uvw"] + path = external/uvw + url = https://github.com/skypjack/uvw.git diff --git a/CMakeLists.txt b/CMakeLists.txt index 4cd1f6877..98ca5fb2e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -316,6 +316,7 @@ if(SUBMODULE_CHECK) check_submodule(external/pybind11) check_submodule(external/sqlite_orm) check_submodule(external/oxen-mq) + check_submodule(external/uvw) endif() endif() diff --git a/external/uvw b/external/uvw new file mode 160000 index 000000000..58b299ee6 --- /dev/null +++ b/external/uvw @@ -0,0 +1 @@ +Subproject commit 58b299ee60d62386a2339dab3f99d30570b33085 diff --git a/llarp/CMakeLists.txt b/llarp/CMakeLists.txt index c4d4f0627..975dda7ef 100644 --- a/llarp/CMakeLists.txt +++ b/llarp/CMakeLists.txt @@ -47,6 +47,10 @@ if(ANDROID) target_link_libraries(lokinet-util PUBLIC log) endif() +add_library(uvw INTERFACE) +target_include_directories(uvw INTERFACE ${PROJECT_SOURCE_DIR}/external/uvw/src) +target_link_libraries(uvw INTERFACE libuv) + add_library(lokinet-platform # for networking ev/ev.cpp @@ -63,7 +67,7 @@ add_library(lokinet-platform vpn/platform.cpp ) -target_link_libraries(lokinet-platform PUBLIC lokinet-cryptography lokinet-util Threads::Threads base_libs libuv) +target_link_libraries(lokinet-platform PUBLIC lokinet-cryptography lokinet-util Threads::Threads base_libs uvw) if (ANDROID) @@ -81,7 +85,6 @@ endif() if (WIN32) target_sources(lokinet-platform PRIVATE - ev/ev_libuv.cpp win32/win32_inet.c win32/win32_intrnl.c) diff --git a/llarp/context.cpp b/llarp/context.cpp index b70df293c..f5aff4aa3 100644 --- a/llarp/context.cpp +++ b/llarp/context.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include @@ -64,7 +65,7 @@ namespace llarp if (mainloop == nullptr) { auto jobQueueSize = std::max(event_loop_queue_size, config->router.m_JobQueueSize); - mainloop = llarp_make_ev_loop(jobQueueSize); + mainloop = EventLoop::create(jobQueueSize); } logic->set_event_loop(mainloop.get()); @@ -116,7 +117,7 @@ namespace llarp // run net io thread llarp::LogInfo("running mainloop"); - llarp_ev_loop_run_single_process(mainloop, logic); + mainloop->run(*logic); if (closeWaiter) { closeWaiter->set_value(); diff --git a/llarp/dns/message.cpp b/llarp/dns/message.cpp index 9bdcd22bb..d35a95957 100644 --- a/llarp/dns/message.cpp +++ b/llarp/dns/message.cpp @@ -122,6 +122,16 @@ namespace llarp return true; } + OwnedBuffer + Message::ToBuffer() const + { + std::array tmp; + llarp_buffer_t buf{tmp}; + if (not Encode(&buf)) + throw std::runtime_error("cannot encode dns message"); + return OwnedBuffer::copy_used(buf); + } + void Message::AddServFail(RR_TTL_t) { if (questions.size()) diff --git a/llarp/dns/message.hpp b/llarp/dns/message.hpp index 363fc7b26..82ffe7bec 100644 --- a/llarp/dns/message.hpp +++ b/llarp/dns/message.hpp @@ -83,6 +83,10 @@ namespace llarp bool Decode(llarp_buffer_t* buf) override; + // Wrapper around Encode that encodes into a new buffer and returns it + [[nodiscard]] OwnedBuffer + ToBuffer() const; + std::ostream& print(std::ostream& stream, int level, int spaces) const; diff --git a/llarp/dns/server.cpp b/llarp/dns/server.cpp index 52f25ddd3..3374d9f3f 100644 --- a/llarp/dns/server.cpp +++ b/llarp/dns/server.cpp @@ -4,31 +4,19 @@ #include #include #include +#include namespace llarp::dns { - static std::vector - MessageToBuffer(Message msg) - { - std::array tmp = {{0}}; - llarp_buffer_t buf{tmp}; - if (not msg.Encode(&buf)) - throw std::runtime_error("cannot encode dns message"); - std::vector pkt; - pkt.resize(buf.cur - buf.base); - std::copy_n(tmp.data(), pkt.size(), pkt.data()); - return pkt; - } PacketHandler::PacketHandler(Logic_ptr logic, IQueryHandler* h) - : m_QueryHandler{h}, m_Logic{logic} + : m_QueryHandler{h}, m_Logic{std::move(logic)} {} - Proxy::Proxy(llarp_ev_loop_ptr loop, Logic_ptr logic, IQueryHandler* h) + Proxy::Proxy(EventLoop_ptr loop, Logic_ptr logic, IQueryHandler* h) : PacketHandler{logic, h}, m_Loop(std::move(loop)) { - m_Server.user = this; - m_Server.tick = nullptr; - m_Server.recvfrom = &HandleUDP; + m_Server = + m_Loop->udp([this](UDPHandle&, SockAddr a, OwnedBuffer buf) { HandlePacket(a, a, buf); }); } void @@ -43,23 +31,7 @@ namespace llarp::dns { if (not PacketHandler::Start(addr, std::move(resolvers))) return false; - return (llarp_ev_add_udp(m_Loop, &m_Server, addr) == 0); - } - - static Proxy::Buffer_t - CopyBuffer(const llarp_buffer_t& buf) - { - std::vector msgbuf(buf.sz); - std::copy_n(buf.base, buf.sz, msgbuf.data()); - return msgbuf; - } - - void - Proxy::HandleUDP(llarp_udp_io* u, const SockAddr& from, ManagedBuffer buf) - { - Buffer_t msgbuf = CopyBuffer(buf.underlying); - auto self = static_cast(u->user); - self->HandlePacket(from, from, std::move(msgbuf)); + return m_Server->listen(addr); } void @@ -81,21 +53,15 @@ namespace llarp::dns bool PacketHandler::SetupUnboundResolver(std::vector resolvers) { - auto failFunc = [self = weak_from_this()](SockAddr from, SockAddr to, Message msg) { - auto this_ptr = self.lock(); - if (this_ptr) - { - this_ptr->SendServerMessageBufferTo(from, to, MessageToBuffer(std::move(msg))); - } + auto failFunc = [self = weak_from_this()]( + const SockAddr& from, const SockAddr& to, Message msg) { + if (auto this_ptr = self.lock()) + this_ptr->SendServerMessageBufferTo(from, to, msg.ToBuffer()); }; - auto replyFunc = [self = weak_from_this()]( - SockAddr from, SockAddr to, std::vector buf) { - auto this_ptr = self.lock(); - if (this_ptr) - { - this_ptr->SendServerMessageBufferTo(from, to, std::move(buf)); - } + auto replyFunc = [self = weak_from_this()](auto&&... args) { + if (auto this_ptr = self.lock()) + this_ptr->SendServerMessageBufferTo(std::forward(args)...); }; m_UnboundResolver = @@ -121,26 +87,24 @@ namespace llarp::dns } void - Proxy::SendServerMessageBufferTo(SockAddr, SockAddr to, Buffer_t buf) + Proxy::SendServerMessageBufferTo(const SockAddr&, const SockAddr& to, llarp_buffer_t buf) { - if (llarp_ev_udp_sendto(&m_Server, to, buf) < 0) + if (!m_Server->send(to, buf)) llarp::LogError("dns reply failed"); } bool - PacketHandler::ShouldHandlePacket(SockAddr to, SockAddr from, Buffer_t buf) const + PacketHandler::ShouldHandlePacket( + const SockAddr& to, [[maybe_unused]] const SockAddr& from, llarp_buffer_t buf) const { - (void)from; - MessageHeader hdr; - llarp_buffer_t pkt{buf}; - if (not hdr.Decode(&pkt)) + if (not hdr.Decode(&buf)) { return false; } Message msg{hdr}; - if (not msg.Decode(&pkt)) + if (not msg.Decode(&buf)) { return false; } @@ -156,18 +120,17 @@ namespace llarp::dns } void - PacketHandler::HandlePacket(SockAddr resolver, SockAddr from, Buffer_t buf) + PacketHandler::HandlePacket(const SockAddr& resolver, const SockAddr& from, llarp_buffer_t buf) { MessageHeader hdr; - llarp_buffer_t pkt{buf}; - if (not hdr.Decode(&pkt)) + if (not hdr.Decode(&buf)) { llarp::LogWarn("failed to parse dns header from ", from); return; } Message msg(hdr); - if (not msg.Decode(&pkt)) + if (not msg.Decode(&buf)) { llarp::LogWarn("failed to parse dns message from ", from); return; @@ -186,7 +149,7 @@ namespace llarp::dns // yea it is, let's turn off DoH because god is dead. msg.AddNXReply(); // press F to pay respects - SendServerMessageBufferTo(resolver, from, MessageToBuffer(std::move(msg))); + SendServerMessageBufferTo(resolver, from, msg.ToBuffer()); return; } } @@ -194,7 +157,7 @@ namespace llarp::dns if (m_QueryHandler && m_QueryHandler->ShouldHookDNSMessage(msg)) { auto reply = [self = shared_from_this(), to = from, resolver](dns::Message msg) { - self->SendServerMessageBufferTo(resolver, to, MessageToBuffer(std::move(msg))); + self->SendServerMessageBufferTo(resolver, to, msg.ToBuffer()); }; if (!m_QueryHandler->HandleHookedDNSMessage(std::move(msg), reply)) { @@ -206,7 +169,7 @@ namespace llarp::dns // no upstream resolvers // let's serv fail it msg.AddServFail(); - SendServerMessageBufferTo(resolver, from, MessageToBuffer(std::move(msg))); + SendServerMessageBufferTo(resolver, from, msg.ToBuffer()); } else { diff --git a/llarp/dns/server.hpp b/llarp/dns/server.hpp index 9d0a8cc16..3d0d9784b 100644 --- a/llarp/dns/server.hpp +++ b/llarp/dns/server.hpp @@ -2,7 +2,7 @@ #define LLARP_DNS_SERVER_HPP #include -#include +#include #include #include #include @@ -14,8 +14,9 @@ namespace llarp namespace dns { /// handler of dns query hooking - struct IQueryHandler + class IQueryHandler { + public: virtual ~IQueryHandler() = default; /// return true if we should hook this message @@ -27,10 +28,11 @@ namespace llarp HandleHookedDNSMessage(Message query, std::function sendReply) = 0; }; - struct PacketHandler : public std::enable_shared_from_this + // Base class for DNS lookups + class PacketHandler : public std::enable_shared_from_this { + public: using Logic_ptr = std::shared_ptr; - using Buffer_t = std::vector; explicit PacketHandler(Logic_ptr logic, IQueryHandler* handler); @@ -46,18 +48,18 @@ namespace llarp Restart(); void - HandlePacket(SockAddr resolver, SockAddr from, Buffer_t buf); + HandlePacket(const SockAddr& resolver, const SockAddr& from, llarp_buffer_t buf); bool - ShouldHandlePacket(SockAddr to, SockAddr from, Buffer_t buf) const; + ShouldHandlePacket(const SockAddr& to, const SockAddr& from, llarp_buffer_t buf) const; protected: virtual void - SendServerMessageBufferTo(SockAddr from, SockAddr to, Buffer_t buf) = 0; + SendServerMessageBufferTo(const SockAddr& from, const SockAddr& to, llarp_buffer_t buf) = 0; private: void - HandleUpstreamFailure(SockAddr from, SockAddr to, Message msg); + HandleUpstreamFailure(const SockAddr& from, const SockAddr& to, Message msg); bool SetupUnboundResolver(std::vector resolvers); @@ -68,27 +70,24 @@ namespace llarp Logic_ptr m_Logic; }; - struct Proxy : public PacketHandler + // Proxying DNS handler that listens on a UDP port for proper DNS requests. + class Proxy : public PacketHandler { + public: using Logic_ptr = std::shared_ptr; - explicit Proxy(llarp_ev_loop_ptr loop, Logic_ptr logic, IQueryHandler* handler); + explicit Proxy(EventLoop_ptr loop, Logic_ptr logic, IQueryHandler* handler); bool Start(SockAddr localaddr, std::vector resolvers) override; - using Buffer_t = std::vector; - protected: void - SendServerMessageBufferTo(SockAddr from, SockAddr to, Buffer_t buf) override; - - private: - static void - HandleUDP(llarp_udp_io*, const SockAddr&, ManagedBuffer); + SendServerMessageBufferTo( + const SockAddr& from, const SockAddr& to, llarp_buffer_t buf) override; private: - llarp_udp_io m_Server; - llarp_ev_loop_ptr m_Loop; + std::shared_ptr m_Server; + EventLoop_ptr m_Loop; }; } // namespace dns } // namespace llarp diff --git a/llarp/dns/unbound_resolver.cpp b/llarp/dns/unbound_resolver.cpp index 52fa1a335..462385c9a 100644 --- a/llarp/dns/unbound_resolver.cpp +++ b/llarp/dns/unbound_resolver.cpp @@ -39,12 +39,8 @@ namespace llarp::dns std::shared_ptr logic, ReplyFunction reply, FailFunction fail) : unboundContext(nullptr) , started(false) - , replyFunc([logic, reply](auto res, auto source, auto buf) { - LogicCall(logic, [source, buf, reply, res]() { reply(res, source, buf); }); - }) - , failFunc([logic, fail](auto res, auto source, auto message) { - LogicCall(logic, [source, message, res, fail]() { fail(res, source, message); }); - }) + , replyFunc(logic->make_caller(std::move(reply))) + , failFunc(logic->make_caller(std::move(fail))) {} // static callback @@ -65,8 +61,8 @@ namespace llarp::dns ub_resolve_free(result); return; } - std::vector pkt(result->answer_len); - std::copy_n(static_cast(result->answer_packet), pkt.size(), pkt.data()); + OwnedBuffer pkt{(size_t)result->answer_len}; + std::memcpy(pkt.buf.get(), result->answer_packet, pkt.sz); llarp_buffer_t buf(pkt); MessageHeader hdr; diff --git a/llarp/dns/unbound_resolver.hpp b/llarp/dns/unbound_resolver.hpp index a4fb8c055..498df35be 100644 --- a/llarp/dns/unbound_resolver.hpp +++ b/llarp/dns/unbound_resolver.hpp @@ -18,8 +18,9 @@ namespace llarp::dns { using ReplyFunction = - std::function buf)>; - using FailFunction = std::function; + std::function; + using FailFunction = + std::function; class UnboundResolver : public std::enable_shared_from_this { diff --git a/llarp/ev/ev.cpp b/llarp/ev/ev.cpp index 5be343a95..a8e54630b 100644 --- a/llarp/ev/ev.cpp +++ b/llarp/ev/ev.cpp @@ -1,4 +1,4 @@ -#include +#include #include #include #include @@ -10,64 +10,20 @@ // We libuv now #include -llarp_ev_loop_ptr -llarp_make_ev_loop(size_t queueLength) +namespace llarp { - llarp_ev_loop_ptr r = std::make_shared(queueLength); - r->init(); - r->update_time(); - return r; -} - -void -llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev, std::shared_ptr logic) -{ - if (ev == nullptr or logic == nullptr) - return; - ev->run(); - logic->clear_event_loop(); - ev->stopped(); -} - -int -llarp_ev_add_udp(const llarp_ev_loop_ptr& ev, struct llarp_udp_io* udp, const llarp::SockAddr& src) -{ - if (ev == nullptr or udp == nullptr) + EventLoop_ptr + EventLoop::create(size_t queueLength) { - llarp::LogError("Attempting llarp_ev_add_udp() with null event loop or udp io struct."); - return -1; + return std::make_shared(queueLength); } - udp->parent = ev.get(); - if (ev->udp_listen(udp, src)) - return 0; - llarp::LogError("llarp_ev_add_udp() call to udp_listen failed."); - return -1; -} - -int -llarp_ev_close_udp(struct llarp_udp_io* udp) -{ - if (udp->parent->udp_close(udp)) - return 0; - return -1; -} -llarp_time_t -llarp_ev_loop_time_now_ms(const llarp_ev_loop_ptr& loop) -{ - if (loop) - return loop->time_now(); - return llarp::time_now_ms(); -} - -void -llarp_ev_loop_stop(const llarp_ev_loop_ptr& loop) -{ - loop->stop(); -} + void + EventLoop::run(Logic& logic) + { + run_loop(); + logic.clear_event_loop(); + stopped(); + } -int -llarp_ev_udp_sendto(struct llarp_udp_io* udp, const llarp::SockAddr& to, const llarp_buffer_t& buf) -{ - return udp->sendto(udp, to, buf.base, buf.sz); -} +} // namespace llarp diff --git a/llarp/ev/ev.h b/llarp/ev/ev.h deleted file mode 100644 index e49a7502f..000000000 --- a/llarp/ev/ev.h +++ /dev/null @@ -1,88 +0,0 @@ -#ifndef LLARP_EV_H -#define LLARP_EV_H - -#include -#include -#include - -#ifdef _WIN32 -#include -#include -#include -#else -#include -#include -#endif -#include - -#include - -#include -#include - -#include - -/** - * ev.h - * - * event handler (cross platform high performance event system for IO) - */ - -#define EV_TICK_INTERVAL 10 - -namespace llarp -{ - class Logic; - struct EventLoop; -} // namespace llarp - -using llarp_ev_loop_ptr = std::shared_ptr; - -/// make an event loop using our baked in event loop on Windows -/// make an event loop using libuv otherwise. -/// @param queue_size how big the logic job queue is -llarp_ev_loop_ptr -llarp_make_ev_loop(std::size_t queue_size = llarp::event_loop_queue_size); - -// run mainloop -void -llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev, std::shared_ptr logic); - -/// get the current time on the event loop -llarp_time_t -llarp_ev_loop_time_now_ms(const llarp_ev_loop_ptr& ev); - -/// stop event loop and wait for it to complete all jobs -void -llarp_ev_loop_stop(const llarp_ev_loop_ptr& ev); - -/// UDP handling configuration -struct llarp_udp_io -{ - /// set after added - int fd; - void* user; - void* impl; - llarp::EventLoop* parent; - - /// called every event loop tick after reads - void (*tick)(struct llarp_udp_io*); - - void (*recvfrom)(struct llarp_udp_io*, const llarp::SockAddr& source, ManagedBuffer); - /// set by parent - int (*sendto)(struct llarp_udp_io*, const llarp::SockAddr&, const byte_t*, size_t); -}; - -/// add UDP handler -int -llarp_ev_add_udp(const llarp_ev_loop_ptr& ev, struct llarp_udp_io* udp, const llarp::SockAddr& src); - -/// send a UDP packet -int -llarp_ev_udp_sendto(struct llarp_udp_io* udp, const llarp::SockAddr& to, const llarp_buffer_t& pkt); - -/// close UDP handler -int -llarp_ev_close_udp(struct llarp_udp_io* udp); - -#endif diff --git a/llarp/ev/ev.hpp b/llarp/ev/ev.hpp index 5dd0f5ad9..0f1ad8664 100644 --- a/llarp/ev/ev.hpp +++ b/llarp/ev/ev.hpp @@ -1,12 +1,10 @@ #ifndef LLARP_EV_HPP #define LLARP_EV_HPP -#include -#include -#include #include -#include +#include #include +#include // writev #ifndef _WIN32 @@ -54,49 +52,68 @@ struct llarp_ev_pkt_pipe; namespace llarp { + class Logic; + struct SockAddr; + struct UDPHandle; + namespace vpn { class NetworkInterface; } - /// distinct event loop waker uper - class EventLoopWakeup + namespace net { - protected: - std::function callback; + struct IPPacket; + } + /// distinct event loop waker upper; used to idempotently schedule a task on the next event loop + /// + /// Created via EventLoop::make_waker(...). + class EventLoopWakeup + { public: - EventLoopWakeup(std::function cb) : callback{cb} - {} - + /// Destructor: remove the task from the event loop task. (Note that destruction here only + /// initiates removal of the task from the underlying event loop: it is *possible* for the + /// callback to fire again if already triggered depending on the underlying implementation). virtual ~EventLoopWakeup() = default; - /// async wakeup and call callback once + /// trigger this task to run on the next event loop iteration; does nothing if already + /// triggered. virtual void - Wakeup() = 0; + Trigger() = 0; + }; - /// end operation + /// holds a repeated task on the event loop; the task is removed on destruction + class EventLoopRepeater + { + public: + // Destructor: if the task has been started then it is removed from the event loop. Note + // that it is possible for a task to fire *after* destruction of this container; + // destruction only initiates removal of the periodic task. + virtual ~EventLoopRepeater() = default; + + // Starts the repeater to call `task` every `every` period. virtual void - End() = 0; + start(llarp_time_t every, std::function task) = 0; }; // this (nearly!) abstract base class // is overriden for each platform struct EventLoop + // : std::enable_shared_from_this // FIXME: do I actually need shared_from_this()? { - virtual bool - init() = 0; + // Runs the event loop. This does not return until sometime after `stop()` is called (and so + // typically you want to run this in its own thread). + void + run(Logic& logic); - virtual int - run() = 0; + // Actually runs the underlying implementation event loop; called by run(). + virtual void + run_loop() = 0; virtual bool running() const = 0; - virtual void - update_time() - {} - virtual llarp_time_t time_now() const { @@ -104,13 +121,13 @@ namespace llarp } virtual void - stopped(){}; - - virtual uint32_t - call_after_delay(llarp_time_t delay_ms, std::function callback) = 0; + stopped() + {} + // Adds a timer to the event loop; should only be called from the logic thread (and so is + // typically scheduled via a call to Logic::call_later()). virtual void - cancel_delayed_call(uint32_t call_id) = 0; + call_after_delay(llarp_time_t delay_ms, std::function callback) = 0; virtual bool add_network_interface( @@ -123,14 +140,15 @@ namespace llarp virtual void stop() = 0; - virtual bool - udp_listen(llarp_udp_io* l, const llarp::SockAddr& src) = 0; + using UDPReceiveFunc = std::function; - virtual bool - udp_close(llarp_udp_io* l) = 0; + // Constructs a UDP socket that can be used for sending and/or receiving + virtual std::shared_ptr + udp(UDPReceiveFunc on_recv) = 0; /// give this event loop a logic thread for calling - virtual void set_logic(std::shared_ptr) = 0; + virtual void + set_logic(const std::shared_ptr& logic) = 0; virtual ~EventLoop() = default; @@ -141,15 +159,28 @@ namespace llarp virtual void set_pump_function(std::function pumpll) = 0; - virtual void - register_poll_fd_readable(int fd, std::function callback) = 0; + /// Make a thread-safe event loop waker (an "async" in libuv terminology) on this event loop; + /// you can call `->Trigger()` on the returned shared pointer to fire the callback at the next + /// available event loop iteration. (Multiple Trigger calls invoked before the call is actually + /// made are coalesced into one call). + virtual std::shared_ptr + make_waker(std::function callback) = 0; - virtual void - deregister_poll_fd_readable(int fd) = 0; + // Initializes a new repeated task object. Note that the task is not actually added to the event + // loop until you call start() on the returned object. Typically invoked via Logic::call_every. + virtual std::shared_ptr + make_repeater() = 0; + + // Constructs and initializes a new default (libuv) event loop + static std::shared_ptr + create(size_t queueLength = event_loop_queue_size); - /// make an event loop waker on this event loop - virtual EventLoopWakeup* - make_event_loop_waker(std::function callback) = 0; + // Returns true if called from within the event loop thread, false otherwise. + virtual bool + inEventLoopThread() const = 0; }; + + using EventLoop_ptr = std::shared_ptr; + } // namespace llarp #endif diff --git a/llarp/ev/ev_libuv.cpp b/llarp/ev/ev_libuv.cpp index 352a135d0..5126e3784 100644 --- a/llarp/ev/ev_libuv.cpp +++ b/llarp/ev/ev_libuv.cpp @@ -1,322 +1,82 @@ #include #include +#include +#include +#include #include #include #include +#include "ev/ev.hpp" -namespace libuv -{ -#define LoopCall(h, ...) \ - { \ - auto __f = __VA_ARGS__; \ - __f(); \ - } +#include - struct glue - { - virtual ~glue() = default; - virtual void - Close() = 0; - }; - - class UVWakeup final : public llarp::EventLoopWakeup, public glue +namespace llarp::uv +{ + class UVWakeup final : public EventLoopWakeup { - uv_async_t m_Impl; - const int m_Idx; - static void - OnWake(uv_async_t* self) - { - static_cast(self->data)->callback(); - } + std::shared_ptr async; public: - UVWakeup(uv_loop_t* loop, std::function hook, int idx) - : llarp::EventLoopWakeup{hook}, m_Idx{idx} + UVWakeup(uvw::Loop& loop, std::function callback) + : async{loop.resource()} { - uv_async_init(loop, &m_Impl, OnWake); - m_Impl.data = this; + async->on([f = std::move(callback)](auto&, auto&) { f(); }); } - ~UVWakeup() = default; - void - Close() override + Trigger() override { - uv_close((uv_handle_t*)&m_Impl, [](uv_handle_t* h) { - auto loop = static_cast(h->loop->data); - loop->delete_waker(static_cast(h->data)->m_Idx); - }); + async->send(); } - void - End() override + ~UVWakeup() override { - Close(); - } - - void - Wakeup() override - { - uv_async_send(&m_Impl); + async->close(); } }; - struct ticker_glue : public glue + class UVRepeater final : public EventLoopRepeater { - std::function func; - - ticker_glue(uv_loop_t* loop, std::function tick) : func(tick) - { - m_Ticker.data = this; - uv_check_init(loop, &m_Ticker); - } + std::shared_ptr timer; - static void - OnTick(uv_check_t* t) - { - llarp::LogTrace("ticker_glue::OnTick() start"); - ticker_glue* ticker = static_cast(t->data); - ticker->func(); - // Loop* loop = static_cast(t->loop->data); - // loop->FlushLogic(); - llarp::LogTrace("ticker_glue::OnTick() end"); - } + public: + UVRepeater(uvw::Loop& loop) : timer{loop.resource()} + {} - bool - Start() + void + start(llarp_time_t every, std::function task) override { - return uv_check_start(&m_Ticker, &OnTick) != -1; + timer->start(every, every); + timer->on([task = std::move(task)](auto&, auto&) { task(); }); } - void - Close() override + ~UVRepeater() override { - uv_check_stop(&m_Ticker); - uv_close((uv_handle_t*)&m_Ticker, [](auto h) { - ticker_glue* self = (ticker_glue*)h->data; - h->data = nullptr; - delete self; - }); + timer->stop(); } - - uv_check_t m_Ticker; }; - struct udp_glue : public glue + struct UDPHandle final : llarp::UDPHandle { - uv_udp_t m_Handle; - uv_check_t m_Ticker; - llarp_udp_io* const m_UDP; - llarp::SockAddr m_Addr; - std::vector m_Buffer; - - udp_glue(uv_loop_t* loop, llarp_udp_io* udp, const llarp::SockAddr& src) - : m_UDP(udp), m_Addr(src) - { - m_Handle.data = this; - m_Ticker.data = this; - uv_udp_init(loop, &m_Handle); - uv_check_init(loop, &m_Ticker); - } - - static void - Alloc(uv_handle_t* h, size_t suggested_size, uv_buf_t* buf) - { - udp_glue* self = static_cast(h->data); - if (self->m_Buffer.empty()) - self->m_Buffer.resize(suggested_size); - buf->base = self->m_Buffer.data(); - buf->len = self->m_Buffer.size(); - } - - /// callback for libuv - static void - OnRecv(uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf, const sockaddr* addr, unsigned) - { - udp_glue* glue = static_cast(handle->data); - if (addr) - glue->RecvFrom(nread, buf, llarp::SockAddr(*addr)); - } - - void - RecvFrom(ssize_t sz, const uv_buf_t* buf, const llarp::SockAddr& fromaddr) - { - if (sz > 0 && m_UDP) - { - const size_t pktsz = sz; - if (m_UDP->recvfrom) - { - const llarp_buffer_t pkt((const byte_t*)buf->base, pktsz); - m_UDP->recvfrom(m_UDP, fromaddr, ManagedBuffer{pkt}); - } - } - } - - static void - OnTick(uv_check_t* t) - { - llarp::LogTrace("udp_glue::OnTick() start"); - udp_glue* udp = static_cast(t->data); - udp->Tick(); - llarp::LogTrace("udp_glue::OnTick() end"); - } - - void - Tick() - { - if (m_UDP && m_UDP->tick) - m_UDP->tick(m_UDP); - } - - static int - SendTo(llarp_udp_io* udp, const llarp::SockAddr& to, const byte_t* ptr, size_t sz) - { - auto* self = static_cast(udp->impl); - if (self == nullptr) - return -1; - const auto buf = uv_buf_init((char*)ptr, sz); - return uv_udp_try_send( - &self->m_Handle, &buf, 1, (const sockaddr*)static_cast(to)); - } + UDPHandle(uvw::Loop& loop, ReceiveFunc rf); bool - Bind() - { - auto ret = - uv_udp_bind(&m_Handle, (const sockaddr*)static_cast(m_Addr), 0); - if (ret) - { - llarp::LogError("failed to bind to ", m_Addr, " ", uv_strerror(ret)); - return false; - } - if (uv_udp_recv_start(&m_Handle, &Alloc, &OnRecv)) - { - llarp::LogError("failed to start recving packets via ", m_Addr); - return false; - } - if (uv_check_start(&m_Ticker, &OnTick)) - { - llarp::LogError("failed to start ticker"); - return false; - } -#if defined(_WIN32) || defined(_WIN64) -#else - if (uv_fileno((const uv_handle_t*)&m_Handle, &m_UDP->fd)) - return false; -#endif - m_UDP->sendto = &SendTo; - m_UDP->impl = this; - return true; - } + listen(const SockAddr& addr) override; - static void - OnClosed(uv_handle_t* h) - { - auto* glue = static_cast(h->data); - if (glue) - { - h->data = nullptr; - delete glue; - } - } + bool + send(const SockAddr& dest, const llarp_buffer_t& buf) override; void - Close() override - { - m_UDP->impl = nullptr; - uv_check_stop(&m_Ticker); - uv_close((uv_handle_t*)&m_Handle, &OnClosed); - } - }; - - struct tun_glue : public glue - { - uv_poll_t m_Handle; - uv_check_t m_Ticker; - std::shared_ptr m_NetIf; - std::function m_Handler; - - tun_glue( - std::shared_ptr netif, - std::function handler) - : m_NetIf{std::move(netif)}, m_Handler{std::move(handler)} - { - m_Handle.data = this; - m_Ticker.data = this; - } - - static void - OnTick(uv_check_t* h) - { - auto self = static_cast(h->data); - while (self->m_NetIf->HasNextPacket()) - self->Read(); - } - - static void - OnPoll(uv_poll_t* h, int, int events) - { - if (events & UV_READABLE) - { - static_cast(h->data)->Read(); - } - } + close() override; - void - Read() - { - auto pkt = m_NetIf->ReadNextPacket(); - LogDebug("got packet ", pkt.sz); - if (m_Handler) - m_Handler(std::move(pkt)); - } + ~UDPHandle() override; - static void - OnClosed(uv_handle_t* h) - { - auto* self = static_cast(h->data); - if (self) - { - h->data = nullptr; - delete self; - } - } + private: + std::shared_ptr handle; void - Close() override - { - uv_check_stop(&m_Ticker); -#ifndef _WIN32 - uv_close((uv_handle_t*)&m_Handle, &OnClosed); -#endif - } - - bool - Init(uv_loop_t* loop) - { - if (uv_check_init(loop, &m_Ticker) == -1) - { - return false; - } - if (uv_check_start(&m_Ticker, &OnTick) == -1) - { - return false; - } -#ifndef _WIN32 - if (uv_poll_init(loop, &m_Handle, m_NetIf->PollFD()) == -1) - { - llarp::LogError("failed to initialize polling on ", m_NetIf->IfName()); - return false; - } - if (uv_poll_start(&m_Handle, UV_READABLE, &OnPoll)) - { - llarp::LogError("failed to start polling on ", m_NetIf->IfName()); - return false; - } -#endif - return true; - } + reset_handle(uvw::Loop& loop); }; void @@ -331,63 +91,36 @@ namespace libuv llarp::LogTrace("Loop::FlushLogic() end"); } - static void - OnAsyncWake(uv_async_t* async_handle) + void + Loop::tick_event_loop() { - llarp::LogTrace("OnAsyncWake, ticking event loop."); - Loop* loop = static_cast(async_handle->data); - loop->update_time(); - loop->process_timer_queue(); - loop->process_cancel_queue(); - loop->FlushLogic(); - loop->PumpLL(); + llarp::LogTrace("ticking event loop."); + FlushLogic(); + PumpLL(); auto& log = llarp::LogContext::Instance(); if (log.logStream) - log.logStream->Tick(loop->time_now()); + log.logStream->Tick(time_now()); } - constexpr size_t TimerQueueSize = 20; - - Loop::Loop(size_t queue_size) - : llarp::EventLoop{} - , PumpLL{[]() {}} - , m_LogicCalls{queue_size} - , m_timerQueue{TimerQueueSize} - , m_timerCancelQueue{TimerQueueSize} - {} - - bool - Loop::init() + Loop::Loop(size_t queue_size) : llarp::EventLoop{}, PumpLL{[] {}}, m_LogicCalls{queue_size} { - if (uv_loop_init(&m_Impl) == -1) - return false; + if (!(m_Impl = uvw::Loop::create())) + throw std::runtime_error{"Failed to construct libuv loop"}; #ifdef LOKINET_DEBUG last_time = 0; loop_run_count = 0; #endif - m_Impl.data = this; -#if defined(_WIN32) || defined(_WIN64) -#else - uv_loop_configure(&m_Impl, UV_LOOP_BLOCK_SIGNAL, SIGPIPE); +#ifndef _WIN32 + signal(SIGPIPE, SIG_IGN); #endif - m_TickTimer = new uv_timer_t; - m_TickTimer->data = this; - if (uv_timer_init(&m_Impl, m_TickTimer) == -1) - return false; + m_Run.store(true); m_nextID.store(0); - m_WakeUp.data = this; - uv_async_init(&m_Impl, &m_WakeUp, &OnAsyncWake); - return true; - } - - void - Loop::update_time() - { - llarp::EventLoop::update_time(); - uv_update_time(&m_Impl); + if (!(m_WakeUp = m_Impl->resource())) + throw std::runtime_error{"Failed to create libuv async"}; + m_WakeUp->on([this](const auto&, auto&) { tick_event_loop(); }); } bool @@ -396,18 +129,12 @@ namespace libuv return m_Run.load(); } - static void - OnTickTimeout(uv_timer_t* timer) - { - uv_stop(timer->loop); - } - - int - Loop::run() + void + Loop::run_loop() { - llarp::LogTrace("Loop::run()"); + llarp::LogTrace("Loop::run_loop()"); m_EventLoopThreadID = std::this_thread::get_id(); - return uv_run(&m_Impl, UV_RUN_DEFAULT); + m_Impl->run(); } void @@ -416,98 +143,30 @@ namespace libuv PumpLL = std::move(pump); } - struct TimerData + std::shared_ptr + Loop::udp(UDPReceiveFunc on_recv) { - Loop* loop; - uint64_t job_id; - }; - - void - CloseUVTimer(uv_timer_t* timer) - { - // have to delete timer handle this way because libuv. - uv_timer_stop(timer); - uv_close((uv_handle_t*)timer, [](uv_handle_t* handle) { delete (uv_timer_t*)handle; }); - } - - static void - OnUVTimer(uv_timer_t* timer) - { - TimerData* timer_data = static_cast(timer->data); - Loop* loop = timer_data->loop; - loop->do_timer_job(timer_data->job_id); - - delete timer_data; - CloseUVTimer(timer); + return std::static_pointer_cast( + std::make_shared(*m_Impl, std::move(on_recv))); } - uint32_t + // TODO: replace this one-shot timer mechanism with a repeated timer, because most likely + // everything using this is repeating scheduling itself all the time and would be better served by + // a repeating uv_timer. + void Loop::call_after_delay(llarp_time_t delay_ms, std::function callback) { llarp::LogTrace("Loop::call_after_delay()"); #ifdef TESTNET_SPEED delay_ms *= TESTNET_SPEED; #endif - PendingTimer timer; - timer.delay_ms = delay_ms; - timer.callback = callback; - timer.job_id = m_nextID++; - uint64_t job_id = timer.job_id; - - m_timerQueue.pushBack(std::move(timer)); - uv_async_send(&m_WakeUp); - - return job_id; - } - - void - Loop::cancel_delayed_call(uint32_t job_id) - { - m_timerCancelQueue.pushBack(job_id); - uv_async_send(&m_WakeUp); - } - - void - Loop::process_timer_queue() - { - while (not m_timerQueue.empty()) - { - PendingTimer job = m_timerQueue.popFront(); - uint64_t job_id = job.job_id; - m_pendingCalls.emplace(job_id, std::move(job.callback)); - - TimerData* timer_data = new TimerData; - timer_data->loop = this; - timer_data->job_id = job_id; - - uv_timer_t* newTimer = new uv_timer_t; - newTimer->data = (void*)timer_data; - - uv_timer_init(&m_Impl, newTimer); - uv_timer_start(newTimer, &OnUVTimer, job.delay_ms.count(), 0); - } - } - - void - Loop::process_cancel_queue() - { - while (not m_timerCancelQueue.empty()) - { - uint64_t job_id = m_timerCancelQueue.popFront(); - m_pendingCalls.erase(job_id); - } - } - - void - Loop::do_timer_job(uint64_t job_id) - { - auto itr = m_pendingCalls.find(job_id); - if (itr != m_pendingCalls.end()) - { - if (itr->second) - itr->second(); - m_pendingCalls.erase(itr->first); - } + auto timer = m_Impl->resource(); + timer->on([f = std::move(callback)](const auto&, auto& timer) { + f(); + timer.stop(); + timer.close(); + }); + timer->start(delay_ms, 0ms); } void @@ -516,61 +175,34 @@ namespace libuv if (m_Run) { llarp::LogInfo("stopping event loop"); - CloseAll(); - uv_stop(&m_Impl); + m_Impl->walk([](auto&& handle) { + if constexpr (!std::is_pointer_v>) + handle.close(); + }); + llarp::LogDebug("Closed all handles, stopping the loop"); + m_Impl->stop(); } m_Run.store(false); } - void - Loop::CloseAll() - { - llarp::LogInfo("Closing all handles"); - uv_walk( - &m_Impl, - [](uv_handle_t* h, void*) { - if (uv_is_closing(h)) - return; - if (h->data && uv_is_active(h) && h->type != UV_TIMER && h->type != UV_POLL) - { - auto glue = reinterpret_cast(h->data); - if (glue) - glue->Close(); - } - }, - nullptr); - } - void Loop::stopped() { - llarp::LogInfo("we have stopped"); - } - - bool - Loop::udp_listen(llarp_udp_io* udp, const llarp::SockAddr& src) - { - auto* impl = new udp_glue(&m_Impl, udp, src); - udp->impl = impl; - if (impl->Bind()) + if (m_Impl) { - return true; + m_Impl->close(); + m_Impl.reset(); } - llarp::LogError("Loop::udp_listen failed to bind"); - delete impl; - return false; + llarp::LogInfo("we have stopped"); } bool Loop::add_ticker(std::function func) { - auto* ticker = new ticker_glue(&m_Impl, func); - if (ticker->Start()) - { - return true; - } - delete ticker; - return false; + auto check = m_Impl->resource(); + check->on([f = std::move(func)](auto&, auto&) { f(); }); + check->start(); + return true; } bool @@ -578,23 +210,32 @@ namespace libuv std::shared_ptr netif, std::function handler) { - auto* glue = new tun_glue(netif, handler); - // call to Init gives ownership of glue to event loop - if (glue->Init(&m_Impl)) - return true; - delete glue; - return false; - } - - bool - Loop::udp_close(llarp_udp_io* udp) - { - if (udp == nullptr) - return false; - auto* glue = static_cast(udp->impl); - if (glue == nullptr) +#ifndef _WIN32 + using event_t = uvw::PollEvent; + auto handle = m_Impl->resource(netif->PollFD()); +#else + using event_t = uvw::CheckEvent; + auto handle = m_Impl->resource(); +#endif + if (!handle) return false; - glue->Close(); + + handle->on([netif = std::move(netif), handler = std::move(handler)]( + const event_t&, [[maybe_unused]] auto& handle) { + for (auto pkt = netif->ReadNextPacket(); pkt.sz > 0; pkt = netif->ReadNextPacket()) + { + LogDebug("got packet ", pkt.sz); + if (handler) + handler(std::move(pkt)); + } + }); + +#ifndef _WIN32 + handle->start(uvw::PollHandle::Event::READABLE); +#else + handle->start(); +#endif + return true; } @@ -604,80 +245,97 @@ namespace libuv if (not m_EventLoopThreadID.has_value()) { m_LogicCalls.tryPushBack(f); - uv_async_send(&m_WakeUp); + m_WakeUp->send(); return; } - const auto inEventLoop = *m_EventLoopThreadID == std::this_thread::get_id(); + const bool inEventLoop = *m_EventLoopThreadID == std::this_thread::get_id(); if (inEventLoop and m_LogicCalls.full()) { FlushLogic(); } m_LogicCalls.pushBack(f); - uv_async_send(&m_WakeUp); + m_WakeUp->send(); } + // Sets `handle` to a new uvw UDP handle, first initiating a close and then disowning the handle + // if already set, allocating the resource, and setting the receive event on it. void - OnUVPollFDReadable(uv_poll_t* handle, int status, [[maybe_unused]] int events) + UDPHandle::reset_handle(uvw::Loop& loop) + { + if (handle) + handle->close(); + handle = loop.resource(); + handle->on([this](auto& event, auto& /*handle*/) { + on_recv( + *this, + SockAddr{event.sender.ip, static_cast(event.sender.port)}, + OwnedBuffer{std::move(event.data), event.length}); + }); + } + + llarp::uv::UDPHandle::UDPHandle(uvw::Loop& loop, ReceiveFunc rf) : llarp::UDPHandle{std::move(rf)} { - if (status < 0) - return; // probably fd was closed + reset_handle(loop); + } - auto func = static_cast(handle->data); + bool + UDPHandle::listen(const SockAddr& addr) + { + if (handle->active()) + reset_handle(handle->loop()); + + bool good = true; + auto err = handle->on([&](auto& event, auto&) { + llarp::LogError("failed to bind and start receiving on ", addr, ": ", event.what()); + good = false; + }); + handle->bind(*static_cast(addr)); + if (good) + handle->recv(); + handle->erase(err); + return good; + } - (*func)(); + bool + UDPHandle::send(const SockAddr& to, const llarp_buffer_t& buf) + { + return handle->trySend( + *static_cast(to), + const_cast(reinterpret_cast(buf.base)), + buf.sz) + >= 0; } void - Loop::register_poll_fd_readable(int fd, Callback callback) + UDPHandle::close() { - if (m_Polls.count(fd)) - { - llarp::LogError( - "Attempting to create event loop poll on fd ", - fd, - ", but an event loop poll for that fd already exists."); - return; - } - - // new a copy as the one passed in here will go out of scope - auto function_ptr = new Callback(callback); - - auto& new_poll = m_Polls[fd]; - - uv_poll_init(&m_Impl, &new_poll, fd); - new_poll.data = (void*)function_ptr; - uv_poll_start(&new_poll, UV_READABLE, &OnUVPollFDReadable); + handle->close(); + handle.reset(); } - void - Loop::deregister_poll_fd_readable(int fd) + UDPHandle::~UDPHandle() { - auto itr = m_Polls.find(fd); + close(); + } - if (itr != m_Polls.end()) - { - uv_poll_stop(&(itr->second)); - auto func = static_cast(itr->second.data); - delete func; - m_Polls.erase(itr); - } + std::shared_ptr + Loop::make_waker(std::function callback) + { + return std::static_pointer_cast( + std::make_shared(*m_Impl, std::move(callback))); } - llarp::EventLoopWakeup* - Loop::make_event_loop_waker(std::function callback) + std::shared_ptr + Loop::make_repeater() { - auto wake_idx = m_NumWakers++; - auto wake = new UVWakeup{&m_Impl, callback, wake_idx}; - m_Wakers[wake_idx] = wake; - return wake; + return std::static_pointer_cast(std::make_shared(*m_Impl)); } - void - Loop::delete_waker(int idx) + bool + Loop::inEventLoopThread() const { - delete m_Wakers[idx]; - m_Wakers.erase(idx); + return m_EventLoopThreadID and *m_EventLoopThreadID == std::this_thread::get_id(); } -} // namespace libuv +} // namespace llarp::uv diff --git a/llarp/ev/ev_libuv.hpp b/llarp/ev/ev_libuv.hpp index 4eb162c40..3560aabd5 100644 --- a/llarp/ev/ev_libuv.hpp +++ b/llarp/ev/ev_libuv.hpp @@ -1,58 +1,42 @@ #ifndef LLARP_EV_LIBUV_HPP #define LLARP_EV_LIBUV_HPP #include -#include -#include -#include +#include "udp_handle.hpp" #include #include #include +#include +#include +#include +#include + +#include #include +#include -namespace libuv +namespace llarp::uv { class UVWakeup; + class UVRepeater; struct Loop final : public llarp::EventLoop { - typedef std::function Callback; - - struct PendingTimer - { - uint64_t job_id; - llarp_time_t delay_ms; - Callback callback; - }; + using Callback = std::function; Loop(size_t queue_size); - bool - init() override; - - int - run() override; + void + run_loop() override; bool running() const override; void - update_time() override; - - uint32_t call_after_delay(llarp_time_t delay_ms, std::function callback) override; void - cancel_delayed_call(uint32_t job_id) override; - - void - process_timer_queue(); - - void - process_cancel_queue(); - - void - do_timer_job(uint64_t job_id); + tick_event_loop(); void stop() override; @@ -60,15 +44,6 @@ namespace libuv void stopped() override; - void - CloseAll(); - - bool - udp_listen(llarp_udp_io* l, const llarp::SockAddr& src) override; - - bool - udp_close(llarp_udp_io* l) override; - bool add_ticker(std::function ticker) override; @@ -78,41 +53,37 @@ namespace libuv std::function handler) override; void - set_logic(std::shared_ptr l) override + set_logic(const std::shared_ptr& l) override { - m_Logic = l; - m_Logic->SetQueuer(llarp::util::memFn(&Loop::call_soon, this)); + l->SetQueuer([this](std::function f) { call_soon(std::move(f)); }); } - std::shared_ptr m_Logic; - void call_soon(std::function f) override; - void - register_poll_fd_readable(int fd, Callback callback) override; - - void - deregister_poll_fd_readable(int fd) override; - void set_pump_function(std::function pumpll) override; - llarp::EventLoopWakeup* - make_event_loop_waker(std::function callback) override; + std::shared_ptr + make_waker(std::function callback) override; - void - delete_waker(int idx); + std::shared_ptr + make_repeater() override; + + std::shared_ptr + udp(UDPReceiveFunc on_recv) override; void FlushLogic(); std::function PumpLL; + bool + inEventLoopThread() const override; + private: - uv_loop_t m_Impl; - uv_timer_t* m_TickTimer; - uv_async_t m_WakeUp; + std::shared_ptr m_Impl; + std::shared_ptr m_WakeUp; std::atomic m_Run; using AtomicQueue_t = llarp::thread::Queue>; AtomicQueue_t m_LogicCalls; @@ -125,16 +96,11 @@ namespace libuv std::map m_pendingCalls; - std::unordered_map m_Polls; + std::unordered_map> m_Polls; - llarp::thread::Queue m_timerQueue; - llarp::thread::Queue m_timerCancelQueue; std::optional m_EventLoopThreadID; - - int m_NumWakers; - std::unordered_map m_Wakers; }; -} // namespace libuv +} // namespace llarp::uv #endif diff --git a/llarp/ev/udp_handle.hpp b/llarp/ev/udp_handle.hpp new file mode 100644 index 000000000..40a92dfe6 --- /dev/null +++ b/llarp/ev/udp_handle.hpp @@ -0,0 +1,41 @@ +#include "ev.hpp" +#include "../util/buffer.hpp" + +namespace llarp +{ + // Base type for UDP handling; constructed via EventLoop::udp(). + struct UDPHandle + { + using ReceiveFunc = EventLoop::UDPReceiveFunc; + + // Starts listening for incoming UDP packets on the given address. Returns true on success, + // false if the address could not be bound. If you send without calling this first then the + // socket will bind to a random high port on 0.0.0.0 (the "all addresses" address). + virtual bool + listen(const SockAddr& addr) = 0; + + // Sends a packet to the given recipient, immediately. Returns true if the send succeeded, + // false it could not be performed (either because of error, or because it would have blocked). + // If listen hasn't been called then a random IP/port will be used. + virtual bool + send(const SockAddr& dest, const llarp_buffer_t& buf) = 0; + + // Closes the listening UDP socket (if opened); this is typically called (automatically) during + // destruction. Does nothing if the UDP socket is already closed. + virtual void + close() = 0; + + // Base class destructor + virtual ~UDPHandle() = default; + + protected: + explicit UDPHandle(ReceiveFunc on_recv) : on_recv{std::move(on_recv)} + { + // It makes no sense at all to use this with a null receive function: + assert(this->on_recv); + } + + // Callback to invoke when data is received + ReceiveFunc on_recv; + }; +} // namespace llarp diff --git a/llarp/handlers/tun.cpp b/llarp/handlers/tun.cpp index 36dc82a79..cb720c1d2 100644 --- a/llarp/handlers/tun.cpp +++ b/llarp/handlers/tun.cpp @@ -39,26 +39,31 @@ namespace llarp } constexpr size_t udp_header_size = 8; - struct DnsHandler : public dns::PacketHandler + // Intercepts DNS IP packets going to an IP on the tun interface; this is currently used on + // Android where binding to a DNS port (i.e. via llarp::dns::Proxy) isn't possible because of OS + // restrictions, but a tun interface *is* available. + class DnsInterceptor : public dns::PacketHandler { + public: TunEndpoint* const m_Endpoint; - explicit DnsHandler(AbstractRouter* router, TunEndpoint* ep) + explicit DnsInterceptor(AbstractRouter* router, TunEndpoint* ep) : dns::PacketHandler{router->logic(), ep}, m_Endpoint{ep} {}; void - SendServerMessageBufferTo(SockAddr from, SockAddr to, std::vector buf) override + SendServerMessageBufferTo( + const SockAddr& from, const SockAddr& to, llarp_buffer_t buf) override { net::IPPacket pkt; - if (buf.size() + 28 > sizeof(pkt.buf)) + if (buf.sz + 28 > sizeof(pkt.buf)) return; auto* hdr = pkt.Header(); pkt.buf[1] = 0; hdr->version = 4; hdr->ihl = 5; - hdr->tot_len = htons(buf.size() + 28); + hdr->tot_len = htons(buf.sz + 28); hdr->protocol = 0x11; // udp hdr->ttl = 64; hdr->frag_off = htons(0b0100000000000000); @@ -72,11 +77,11 @@ namespace llarp ptr += 2; htobe16buf(ptr, to.getPort()); ptr += 2; - htobe16buf(ptr, buf.size() + udp_header_size); + htobe16buf(ptr, buf.sz + udp_header_size); ptr += 2; htobe16buf(ptr, uint16_t{0}); // checksum ptr += 2; - std::copy_n(buf.data(), buf.size(), ptr); + std::copy_n(buf.base, buf.sz, ptr); /// queue ip packet write const IpAddress remoteIP{from}; @@ -84,7 +89,7 @@ namespace llarp hdr->check = 0; hdr->check = net::ipchksum(pkt.buf, 20); - pkt.sz = 28 + buf.size(); + pkt.sz = 28 + buf.sz; m_Endpoint->HandleWriteIPPacket( pkt.ConstBuffer(), net::ExpandV4(remoteIP.toIP()), net::ExpandV4(localIP.toIP()), 0); } @@ -97,7 +102,7 @@ namespace llarp m_PacketRouter.reset( new vpn::PacketRouter{[&](net::IPPacket pkt) { HandleGotUserPacket(std::move(pkt)); }}); #if ANDROID - m_Resolver = std::make_shared(r, this); + m_Resolver = std::make_shared(r, this); m_PacketRouter->AddUDPHandler(huint16_t{53}, [&](net::IPPacket pkt) { const size_t ip_header_size = (pkt.Header()->ihl * 4); diff --git a/llarp/iwp/linklayer.cpp b/llarp/iwp/linklayer.cpp index 3ad98cbc7..1b4b23f4f 100644 --- a/llarp/iwp/linklayer.cpp +++ b/llarp/iwp/linklayer.cpp @@ -22,17 +22,12 @@ namespace llarp::iwp bool allowInbound) : ILinkLayer( keyManager, getrc, h, sign, before, est, reneg, timeout, closed, pumpDone, worker) - , m_Wakeup{ev->make_event_loop_waker([self = this]() { self->HandleWakeupPlaintext(); })} + , m_Wakeup{ev->make_waker([this]() { HandleWakeupPlaintext(); })} , m_PlaintextRecv{1024} , permitInbound{allowInbound} {} - LinkLayer::~LinkLayer() - { - m_Wakeup->End(); - } - const char* LinkLayer::Name() const { @@ -111,7 +106,7 @@ namespace llarp::iwp void LinkLayer::WakeupPlaintext() { - m_Wakeup->Wakeup(); + m_Wakeup->Trigger(); } void diff --git a/llarp/iwp/linklayer.hpp b/llarp/iwp/linklayer.hpp index 34b2b98a1..ec7a2ed26 100644 --- a/llarp/iwp/linklayer.hpp +++ b/llarp/iwp/linklayer.hpp @@ -33,8 +33,6 @@ namespace llarp::iwp WorkerFunc_t dowork, bool permitInbound); - ~LinkLayer() override; - std::shared_ptr NewOutboundSession(const RouterContact& rc, const AddressInfo& ai) override; @@ -63,7 +61,7 @@ namespace llarp::iwp void HandleWakeupPlaintext(); - EventLoopWakeup* const m_Wakeup; + const std::shared_ptr m_Wakeup; std::unordered_map, SockAddr::Hash> m_PlaintextRecv; std::unordered_map m_AuthedAddrs; const bool permitInbound; diff --git a/llarp/link/server.cpp b/llarp/link/server.cpp index 1d5408d96..0d6a16d03 100644 --- a/llarp/link/server.cpp +++ b/llarp/link/server.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -125,18 +126,19 @@ namespace llarp } bool - ILinkLayer::Configure(llarp_ev_loop_ptr loop, const std::string& ifname, int af, uint16_t port) + ILinkLayer::Configure(EventLoop_ptr loop, const std::string& ifname, int af, uint16_t port) { - m_Loop = loop; - m_udp.user = this; - m_udp.recvfrom = [](llarp_udp_io* udp, const llarp::SockAddr& from, ManagedBuffer pktbuf) { - ILinkSession::Packet_t pkt; - auto& buf = pktbuf.underlying; - pkt.resize(buf.sz); - std::copy_n(buf.base, buf.sz, pkt.data()); - static_cast(udp->user)->RecvFrom(from, std::move(pkt)); - }; - m_udp.tick = &ILinkLayer::udp_tick; + LogError("omg wtf"); + m_Loop = std::move(loop); + // using UDPReceiveFunc = std::function; + m_udp = m_Loop->udp( + [this]([[maybe_unused]] UDPHandle& udp, const SockAddr& from, llarp_buffer_t buf) { + ILinkSession::Packet_t pkt; + pkt.resize(buf.sz); + std::copy_n(buf.base, buf.sz, pkt.data()); + RecvFrom(from, std::move(pkt)); + }); + if (ifname == "*") { if (!AllInterfaces(af, m_ourAddr)) @@ -162,7 +164,11 @@ namespace llarp } } m_ourAddr.setPort(port); - return llarp_ev_add_udp(m_Loop, &m_udp, m_ourAddr) != -1; + if (not m_udp->listen(m_ourAddr)) + return false; + + m_Loop->add_ticker([this] { Pump(); }); + return true; } void @@ -335,15 +341,16 @@ namespace llarp } bool - ILinkLayer::Start(std::shared_ptr l) + ILinkLayer::Start(const std::shared_ptr& logic) { - m_Logic = l; - ScheduleTick(LINK_LAYER_TICK_INTERVAL); + // Tie the lifetime of this repeater to this arbitrary shared_ptr: + m_repeater_keepalive = std::make_shared(42); + logic->call_every(LINK_LAYER_TICK_INTERVAL, m_repeater_keepalive, [this] { Tick(Now()); }); return true; } void - ILinkLayer::Tick(llarp_time_t now) + ILinkLayer::Tick(const llarp_time_t now) { { Lock_t l(m_AuthedLinksMutex); @@ -373,8 +380,7 @@ namespace llarp void ILinkLayer::Stop() { - if (m_Logic && tick_id) - m_Logic->remove_call(tick_id); + m_repeater_keepalive.reset(); // make the repeater kill itself { Lock_t l(m_AuthedLinksMutex); for (const auto& [router, link] : m_AuthedLinks) @@ -421,6 +427,12 @@ namespace llarp } } + void + ILinkLayer::SendTo_LL(const SockAddr& to, const llarp_buffer_t& pkt) + { + m_udp->send(to, pkt); + } + bool ILinkLayer::SendTo( const RouterID& remote, const llarp_buffer_t& buf, ILinkSession::CompletionHandler completed) @@ -479,25 +491,4 @@ namespace llarp return true; } - void - ILinkLayer::OnTick() - { - auto now = Now(); - Tick(now); - ScheduleTick(LINK_LAYER_TICK_INTERVAL); - } - - void - ILinkLayer::ScheduleTick(llarp_time_t interval) - { - tick_id = m_Logic->call_later(interval, std::bind(&ILinkLayer::OnTick, this)); - } - - void - ILinkLayer::udp_tick(llarp_udp_io* udp) - { - ILinkLayer* link = static_cast(udp->user); - link->Pump(); - } - } // namespace llarp diff --git a/llarp/link/server.hpp b/llarp/link/server.hpp index a43d107ca..435ec028d 100644 --- a/llarp/link/server.hpp +++ b/llarp/link/server.hpp @@ -2,7 +2,7 @@ #define LLARP_LINK_SERVER_HPP #include -#include +#include #include #include #include @@ -93,7 +93,7 @@ namespace llarp llarp_time_t Now() const { - return llarp_ev_loop_time_now_ms(m_Loop); + return m_Loop->time_now(); } bool @@ -106,17 +106,11 @@ namespace llarp void ForEachSession(std::function visit) EXCLUDES(m_AuthedLinksMutex); - static void - udp_tick(llarp_udp_io* udp); - void - SendTo_LL(const SockAddr& to, const llarp_buffer_t& pkt) - { - llarp_ev_udp_sendto(&m_udp, to, pkt); - } + SendTo_LL(const SockAddr& to, const llarp_buffer_t& pkt); virtual bool - Configure(llarp_ev_loop_ptr loop, const std::string& ifname, int af, uint16_t port); + Configure(EventLoop_ptr loop, const std::string& ifname, int af, uint16_t port); virtual std::shared_ptr NewOutboundSession(const RouterContact& rc, const AddressInfo& ai) = 0; @@ -138,7 +132,7 @@ namespace llarp TryEstablishTo(RouterContact rc); bool - Start(std::shared_ptr l); + Start(const std::shared_ptr& l); virtual void Stop(); @@ -211,12 +205,6 @@ namespace llarp std::shared_ptr keyManager; WorkerFunc_t QueueWork; - std::shared_ptr - logic() - { - return m_Logic; - } - bool operator<(const ILinkLayer& other) const { @@ -235,19 +223,7 @@ namespace llarp return m_Pending.size(); } - int - GetUDPSocket() const - { - return m_udp.fd; - } - private: - void - OnTick(); - - void - ScheduleTick(llarp_time_t interval); - uint32_t tick_id; const SecretKey& m_RouterEncSecret; @@ -262,10 +238,9 @@ namespace llarp bool PutSession(const std::shared_ptr& s); - std::shared_ptr m_Logic = nullptr; - llarp_ev_loop_ptr m_Loop; + EventLoop_ptr m_Loop; SockAddr m_ourAddr; - llarp_udp_io m_udp; + std::shared_ptr m_udp; SecretKey m_SecretKey; using AuthedLinks = @@ -278,6 +253,9 @@ namespace llarp Pending m_Pending GUARDED_BY(m_PendingMutex); std::unordered_map m_RecentlyClosed; + + private: + std::shared_ptr m_repeater_keepalive; }; using LinkLayer_ptr = std::shared_ptr; diff --git a/llarp/net/ip_packet.hpp b/llarp/net/ip_packet.hpp index b65df40cd..afb06a77a 100644 --- a/llarp/net/ip_packet.hpp +++ b/llarp/net/ip_packet.hpp @@ -1,7 +1,7 @@ #ifndef LLARP_IP_HPP #define LLARP_IP_HPP -#include +#include #include #include #include @@ -109,8 +109,6 @@ struct ipv6_header #include #include -struct llarp_ev_loop; - namespace llarp { namespace net @@ -143,25 +141,25 @@ namespace llarp struct PutTime { - llarp_ev_loop_ptr loop; - PutTime(llarp_ev_loop_ptr evloop) : loop(std::move(evloop)) + EventLoop_ptr loop; + PutTime(EventLoop_ptr evloop) : loop(std::move(evloop)) {} void operator()(IPPacket& pkt) const { - pkt.timestamp = llarp_ev_loop_time_now_ms(loop); + pkt.timestamp = loop->time_now(); } }; struct GetNow { - llarp_ev_loop_ptr loop; - GetNow(llarp_ev_loop_ptr evloop) : loop(std::move(evloop)) + EventLoop_ptr loop; + GetNow(EventLoop_ptr evloop) : loop(std::move(evloop)) {} llarp_time_t operator()() const { - return llarp_ev_loop_time_now_ms(loop); + return loop->time_now(); } }; diff --git a/llarp/router/abstractrouter.hpp b/llarp/router/abstractrouter.hpp index 26625de25..24bda166d 100644 --- a/llarp/router/abstractrouter.hpp +++ b/llarp/router/abstractrouter.hpp @@ -8,7 +8,7 @@ #include #include #include -#include +#include #include #include #include @@ -81,7 +81,7 @@ namespace llarp using LMQ_ptr = std::shared_ptr; - struct AbstractRouter + struct AbstractRouter : public std::enable_shared_from_this { #ifdef LOKINET_HIVE tooling::RouterHive* hive = nullptr; @@ -92,22 +92,22 @@ namespace llarp virtual bool HandleRecvLinkMessageBuffer(ILinkSession* from, const llarp_buffer_t& msg) = 0; - virtual LMQ_ptr + virtual const LMQ_ptr& lmq() const = 0; virtual vpn::Platform* GetVPNPlatform() const = 0; - virtual std::shared_ptr + virtual const std::shared_ptr& RpcClient() const = 0; - virtual std::shared_ptr + virtual const std::shared_ptr& logic() const = 0; virtual llarp_dht_context* dht() const = 0; - virtual std::shared_ptr + virtual const std::shared_ptr& nodedb() const = 0; virtual const path::PathContext& @@ -122,7 +122,7 @@ namespace llarp virtual exit::Context& exitContext() = 0; - virtual std::shared_ptr + virtual const std::shared_ptr& keyManager() const = 0; virtual const SecretKey& @@ -134,7 +134,7 @@ namespace llarp virtual Profiling& routerProfiling() = 0; - virtual llarp_ev_loop_ptr + virtual const EventLoop_ptr& netloop() const = 0; /// call function in crypto worker diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index e0641f12b..60a0602a8 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -47,12 +47,10 @@ static constexpr std::chrono::milliseconds ROUTER_TICK_INTERVAL = 1s; namespace llarp { Router::Router( - llarp_ev_loop_ptr __netloop, - std::shared_ptr l, - std::shared_ptr vpnPlatform) + EventLoop_ptr netloop, std::shared_ptr l, std::shared_ptr vpnPlatform) : ready(false) , m_lmq(std::make_shared()) - , _netloop(std::move(__netloop)) + , _netloop(std::move(netloop)) , _logic(std::move(l)) , _vpnPlatform(std::move(vpnPlatform)) , paths(this) @@ -289,7 +287,7 @@ namespace llarp bool Router::Configure(std::shared_ptr c, bool isRouter, std::shared_ptr nodedb) { - m_Config = c; + m_Config = std::move(c); auto& conf = *m_Config; whitelistRouters = conf.lokid.whitelistRouters; if (whitelistRouters) @@ -307,7 +305,7 @@ namespace llarp m_lmq->start(); - _nodedb = nodedb; + _nodedb = std::move(nodedb); m_isServiceNode = conf.router.m_isRelay; @@ -366,18 +364,10 @@ namespace llarp if (_onDown) _onDown(); LogInfo("closing router"); - llarp_ev_loop_stop(_netloop); + _netloop->stop(); _running.store(false); } - void - Router::handle_router_ticker() - { - ticker_job_id = 0; - Tick(); - ScheduleTicker(ROUTER_TICK_INTERVAL); - } - bool Router::ParseRoutingMessageBuffer( const llarp_buffer_t& buf, routing::IMessageHandler* h, const PathID_t& rxid) @@ -914,12 +904,6 @@ namespace llarp return CryptoManager::instance()->sign(sig, identity(), buf); } - void - Router::ScheduleTicker(llarp_time_t interval) - { - ticker_job_id = _logic->call_later(interval, std::bind(&Router::handle_router_ticker, this)); - } - void Router::SessionClosed(RouterID remote) { @@ -1136,11 +1120,11 @@ namespace llarp #ifdef _WIN32 // windows uses proactor event loop so we need to constantly pump - _netloop->add_ticker(std::bind(&Router::PumpLL, this)); + _netloop->add_ticker([this] { PumpLL(); }); #else - _netloop->set_pump_function(std::bind(&Router::PumpLL, this)); + _netloop->set_pump_function([this] { PumpLL(); }); #endif - ScheduleTicker(ROUTER_TICK_INTERVAL); + _logic->call_every(ROUTER_TICK_INTERVAL, weak_from_this(), [this] { Tick(); }); _running.store(true); _startedAt = Now(); #if defined(WITH_SYSTEMD) @@ -1338,9 +1322,7 @@ namespace llarp if (!link) throw std::runtime_error("NewOutboundLink() failed to provide a link"); - const auto afs = {AF_INET, AF_INET6}; - - for (const auto af : afs) + for (const auto af : {AF_INET, AF_INET6}) { if (not link->Configure(netloop(), "*", af, m_OutboundPort)) continue; diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp index d5070649a..9b2738a78 100644 --- a/llarp/router/router.hpp +++ b/llarp/router/router.hpp @@ -8,7 +8,7 @@ #include #include #include -#include +#include #include #include #include @@ -76,19 +76,19 @@ namespace llarp LMQ_ptr m_lmq; - LMQ_ptr + const LMQ_ptr& lmq() const override { return m_lmq; } - std::shared_ptr + const std::shared_ptr& RpcClient() const override { return m_lokidRpcClient; } - std::shared_ptr + const std::shared_ptr& logic() const override { return _logic; @@ -103,7 +103,7 @@ namespace llarp util::StatusObject ExtractStatus() const override; - std::shared_ptr + const std::shared_ptr& nodedb() const override { return _nodedb; @@ -136,7 +136,7 @@ namespace llarp return _exitContext; } - std::shared_ptr + const std::shared_ptr& keyManager() const override { return m_keyManager; @@ -160,7 +160,7 @@ namespace llarp return _routerProfiling; } - llarp_ev_loop_ptr + const EventLoop_ptr& netloop() const override { return _netloop; @@ -180,7 +180,7 @@ namespace llarp std::optional _ourAddress; - llarp_ev_loop_ptr _netloop; + EventLoop_ptr _netloop; std::shared_ptr _logic; std::shared_ptr _vpnPlatform; path::PathContext paths; @@ -206,8 +206,6 @@ namespace llarp // should we be sending padded messages every interval? bool sendPadding = false; - uint32_t ticker_job_id = 0; - LinkMessageParser inbound_link_msg_parser; routing::InboundMessageParser inbound_routing_msg_parser; @@ -327,11 +325,11 @@ namespace llarp GossipRCIfNeeded(const RouterContact rc) override; explicit Router( - llarp_ev_loop_ptr __netloop, + EventLoop_ptr netloop, std::shared_ptr logic, std::shared_ptr vpnPlatform); - virtual ~Router() override; + ~Router() override; bool HandleRecvLinkMessageBuffer(ILinkSession* from, const llarp_buffer_t& msg) override; @@ -462,10 +460,6 @@ namespace llarp return llarp::time_now_ms(); } - /// schedule ticker to call i ms from now - void - ScheduleTicker(llarp_time_t i = 1s); - /// parse a routing message in a buffer and handle it with a handler if /// successful parsing return true on parse and handle success otherwise /// return false @@ -502,9 +496,6 @@ namespace llarp uint32_t NextPathBuildNumber() override; - void - handle_router_ticker(); - void AfterStopLinks(); diff --git a/llarp/rpc/endpoint_rpc.cpp b/llarp/rpc/endpoint_rpc.cpp index 3e3f6668f..4c621eb71 100644 --- a/llarp/rpc/endpoint_rpc.cpp +++ b/llarp/rpc/endpoint_rpc.cpp @@ -29,7 +29,7 @@ namespace llarp::rpc }, [self = shared_from_this()](oxenmq::ConnectionID, std::string_view fail) { LogWarn("failed to connect to endpoint auth server: ", fail); - self->m_Endpoint->RouterLogic()->call_later(1s, [self]() { self->Start(); }); + self->m_Endpoint->Logic()->call_later(1s, [self] { self->Start(); }); }); } @@ -44,33 +44,32 @@ namespace llarp::rpc std::shared_ptr msg, std::function hook) { + assert(m_Endpoint->Logic()->inLogicThread()); service::ConvoTag tag = msg->tag; m_PendingAuths.insert(tag); const auto from = msg->sender.Addr(); - auto reply = - [self = this, tag, logic = m_Endpoint->RouterLogic(), hook](service::AuthResult result) { - logic->Call([self, hook, result, tag]() { - self->m_PendingAuths.erase(tag); - hook(result); - }); - }; + auto reply = m_Endpoint->Logic()->make_caller([this, tag, hook](service::AuthResult result) { + m_PendingAuths.erase(tag); + hook(result); + }); if (m_AuthWhitelist.count(from)) { // explicitly whitelisted source - reply({service::AuthResultCode::eAuthAccepted, "explicitly whitelisted"}); + reply(service::AuthResult{service::AuthResultCode::eAuthAccepted, "explicitly whitelisted"}); return; } if (not m_Conn.has_value()) { // we don't have a connection to the backend so it's failed - reply({service::AuthResultCode::eAuthFailed, "remote has no connection to auth backend"}); + reply(service::AuthResult{service::AuthResultCode::eAuthFailed, + "remote has no connection to auth backend"}); return; } if (msg->proto != llarp::service::eProtocolAuth) { // not an auth message, reject - reply({service::AuthResultCode::eAuthRejected, "protocol error"}); + reply(service::AuthResult{service::AuthResultCode::eAuthRejected, "protocol error"}); return; } @@ -81,7 +80,8 @@ namespace llarp::rpc m_LMQ->request( *m_Conn, m_AuthMethod, - [self = shared_from_this(), reply](bool success, std::vector data) { + [self = shared_from_this(), reply = std::move(reply)]( + bool success, std::vector data) { service::AuthResult result{service::AuthResultCode::eAuthFailed, "no reason given"}; if (success and not data.empty()) { diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 7577a813c..a1b714cf7 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -76,21 +76,6 @@ namespace llarp return m_state->Configure(conf); } - llarp_ev_loop_ptr - Endpoint::EndpointNetLoop() - { - if (m_state->m_IsolatedNetLoop) - return m_state->m_IsolatedNetLoop; - - return Router()->netloop(); - } - - bool - Endpoint::NetworkIsIsolated() const - { - return m_state->m_IsolatedLogic.get() != nullptr && m_state->m_IsolatedNetLoop != nullptr; - } - bool Endpoint::HasPendingPathToService(const Address& addr) const { @@ -652,20 +637,6 @@ namespace llarp m_OnReady = nullptr; } - void - Endpoint::IsolatedNetworkMainLoop() - { - m_state->m_IsolatedNetLoop = llarp_make_ev_loop(); - m_state->m_IsolatedLogic = std::make_shared(); - if (SetupNetworking()) - llarp_ev_loop_run_single_process(m_state->m_IsolatedNetLoop, m_state->m_IsolatedLogic); - else - { - m_state->m_IsolatedNetLoop.reset(); - m_state->m_IsolatedLogic.reset(); - } - } - std::optional> Endpoint::GetHopsForBuild() { @@ -1019,7 +990,9 @@ namespace llarp } else { - RouterLogic()->Call([hook]() { hook({AuthResultCode::eAuthAccepted, "OK"}); }); + Router()->logic()->Call([h = std::move(hook)] { + h({AuthResultCode::eAuthAccepted, "OK"}); + }); } } @@ -1099,7 +1072,7 @@ namespace llarp RemoveConvoTag(frame.T); return true; } - if (not frame.AsyncDecryptAndVerify(EndpointLogic(), p, m_Identity, this)) + if (not frame.AsyncDecryptAndVerify(Router()->logic(), p, m_Identity, this)) { // send reset convo tag message ProtocolFrame f; @@ -1342,14 +1315,7 @@ namespace llarp } }; - if (NetworkIsIsolated()) - { - LogicCall(EndpointLogic(), epPump); - } - else - { - epPump(); - } + epPump(); auto router = Router(); // TODO: locking on this container for (const auto& item : m_state->m_RemoteSessions) @@ -1537,24 +1503,18 @@ namespace llarp || NumInStatus(path::ePathEstablished) < path::min_intro_paths; } - std::shared_ptr - Endpoint::RouterLogic() - { - return Router()->logic(); - } - - std::shared_ptr - Endpoint::EndpointLogic() - { - return m_state->m_IsolatedLogic ? m_state->m_IsolatedLogic : Router()->logic(); - } - AbstractRouter* Endpoint::Router() { return m_state->m_Router; } + const std::shared_ptr& + Endpoint::Logic() + { + return Router()->logic(); + } + void Endpoint::BlacklistSNode(const RouterID snode) { diff --git a/llarp/service/endpoint.hpp b/llarp/service/endpoint.hpp index 858012b17..5f0bf64b3 100644 --- a/llarp/service/endpoint.hpp +++ b/llarp/service/endpoint.hpp @@ -2,7 +2,7 @@ #define LLARP_SERVICE_ENDPOINT_HPP #include #include -#include +#include #include #include #include @@ -136,20 +136,10 @@ namespace llarp void ResetInternalState() override; - /// router's logic + /// logic (via router) /// use when sending any data on a path - std::shared_ptr - RouterLogic(); - - /// endpoint's logic - /// use when writing any data to local network interfaces - std::shared_ptr - EndpointLogic(); - - /// borrow endpoint's net loop for sending data to user on local network - /// interface - llarp_ev_loop_ptr - EndpointNetLoop(); + const std::shared_ptr& + Logic(); AbstractRouter* Router(); @@ -423,22 +413,6 @@ namespace llarp void PrefetchServicesByTag(const Tag& tag); - /// spawn a new process that contains a network isolated process - /// return true if we set up isolation and the event loop is up - /// otherwise return false - virtual bool - SpawnIsolatedNetwork() - { - return false; - } - - bool - NetworkIsIsolated() const; - - /// this runs in the isolated network process - void - IsolatedNetworkMainLoop(); - private: void HandleVerifyGotRouter(dht::GotRouterMessage_constptr msg, RouterID id, bool valid); diff --git a/llarp/service/endpoint_state.hpp b/llarp/service/endpoint_state.hpp index 963e81d27..99549d4ae 100644 --- a/llarp/service/endpoint_state.hpp +++ b/llarp/service/endpoint_state.hpp @@ -19,25 +19,8 @@ namespace llarp { - struct EventLoop; -} - -using llarp_ev_loop_ptr = std::shared_ptr; - -namespace llarp -{ - // clang-format off - namespace exit { struct BaseSession; } - namespace path { struct Path; using Path_ptr = std::shared_ptr< Path >; } - namespace routing { struct PathTransferMessage; } - // clang-format on - namespace service { - struct IServiceLookup; - struct OutboundContext; - struct Endpoint; - struct EndpointState { hooks::Backend_ptr m_OnUp; @@ -47,8 +30,6 @@ namespace llarp std::set m_SnodeBlacklist; AbstractRouter* m_Router; - std::shared_ptr m_IsolatedLogic = nullptr; - llarp_ev_loop_ptr m_IsolatedNetLoop = nullptr; std::string m_Keyfile; std::string m_Name; std::string m_NetNS; diff --git a/llarp/service/outbound_context.cpp b/llarp/service/outbound_context.cpp index 69c0effed..cedbe1f6e 100644 --- a/llarp/service/outbound_context.cpp +++ b/llarp/service/outbound_context.cpp @@ -202,7 +202,7 @@ namespace llarp currentConvoTag.Randomize(); auto frame = std::make_shared(); auto ex = std::make_shared( - m_Endpoint->RouterLogic(), + m_Endpoint->Logic(), remoteIdent, m_Endpoint->GetIdentity(), currentIntroSet.K, @@ -581,7 +581,7 @@ namespace llarp }; } const auto& ident = m_Endpoint->GetIdentity(); - if (not frame.AsyncDecryptAndVerify(m_Endpoint->EndpointLogic(), p, ident, m_Endpoint, hook)) + if (not frame.AsyncDecryptAndVerify(m_Endpoint->Logic(), p, ident, m_Endpoint, hook)) { // send reset convo tag message ProtocolFrame f; diff --git a/llarp/service/sendcontext.cpp b/llarp/service/sendcontext.cpp index e27f2b819..725752c9a 100644 --- a/llarp/service/sendcontext.cpp +++ b/llarp/service/sendcontext.cpp @@ -29,7 +29,7 @@ namespace llarp { if (m_SendQueue.empty() or m_SendQueue.full()) { - LogicCall(m_Endpoint->RouterLogic(), [self = this]() { self->FlushUpstream(); }); + LogicCall(m_Endpoint->Logic(), [this] { FlushUpstream(); }); } m_SendQueue.pushBack(std::make_pair( std::make_shared(*msg, remoteIntro.pathID), path)); @@ -97,14 +97,13 @@ namespace llarp m->sender = m_Endpoint->GetIdentity().pub; m->tag = f->T; m->PutBuffer(payload); - auto self = this; - m_Endpoint->Router()->QueueWork([f, m, shared, path, self]() { - if (not f->EncryptAndSign(*m, shared, self->m_Endpoint->GetIdentity())) + m_Endpoint->Router()->QueueWork([f, m, shared, path, this] { + if (not f->EncryptAndSign(*m, shared, m_Endpoint->GetIdentity())) { - LogError(self->m_Endpoint->Name(), " failed to sign message"); + LogError(m_Endpoint->Name(), " failed to sign message"); return; } - self->Send(f, path); + Send(f, path); }); } diff --git a/llarp/simulation/sim_context.hpp b/llarp/simulation/sim_context.hpp index 6d9a3338b..dc27cbfb2 100644 --- a/llarp/simulation/sim_context.hpp +++ b/llarp/simulation/sim_context.hpp @@ -1,6 +1,6 @@ #pragma once #include -#include +#include namespace llarp { @@ -15,7 +15,7 @@ namespace llarp Simulation(); llarp::CryptoManager m_CryptoManager; - llarp_ev_loop_ptr m_NetLoop; + EventLoop_ptr m_NetLoop; std::unordered_map m_Nodes; diff --git a/llarp/tooling/hive_context.cpp b/llarp/tooling/hive_context.cpp index 963407018..98c1502d1 100644 --- a/llarp/tooling/hive_context.cpp +++ b/llarp/tooling/hive_context.cpp @@ -8,7 +8,7 @@ namespace tooling {} std::shared_ptr - HiveContext::makeRouter(llarp_ev_loop_ptr netloop, std::shared_ptr logic) + HiveContext::makeRouter(llarp::EventLoop_ptr netloop, std::shared_ptr logic) { return std::make_shared(netloop, logic, makeVPNPlatform(), m_hive); } diff --git a/llarp/tooling/hive_context.hpp b/llarp/tooling/hive_context.hpp index 875c6f96d..42c998287 100644 --- a/llarp/tooling/hive_context.hpp +++ b/llarp/tooling/hive_context.hpp @@ -12,7 +12,7 @@ namespace tooling HiveContext(RouterHive* hive); std::shared_ptr - makeRouter(llarp_ev_loop_ptr netloop, std::shared_ptr logic) override; + makeRouter(llarp::EventLoop_ptr netloop, std::shared_ptr logic) override; /// Get this context's router as a HiveRouter. /// diff --git a/llarp/tooling/hive_router.cpp b/llarp/tooling/hive_router.cpp index ffa161814..54ccb8a17 100644 --- a/llarp/tooling/hive_router.cpp +++ b/llarp/tooling/hive_router.cpp @@ -5,7 +5,7 @@ namespace tooling { HiveRouter::HiveRouter( - llarp_ev_loop_ptr netloop, + llarp::EventLoop_ptr netloop, std::shared_ptr logic, std::shared_ptr plat, RouterHive* hive) diff --git a/llarp/tooling/hive_router.hpp b/llarp/tooling/hive_router.hpp index 221cf6308..6aaf571c8 100644 --- a/llarp/tooling/hive_router.hpp +++ b/llarp/tooling/hive_router.hpp @@ -11,7 +11,7 @@ namespace tooling struct HiveRouter : public llarp::Router { explicit HiveRouter( - llarp_ev_loop_ptr netloop, + llarp::EventLoop_ptr netloop, std::shared_ptr logic, std::shared_ptr vpnPlatform, RouterHive* hive); diff --git a/llarp/util/aligned.hpp b/llarp/util/aligned.hpp index 0db12ea9e..0558d5e18 100644 --- a/llarp/util/aligned.hpp +++ b/llarp/util/aligned.hpp @@ -124,7 +124,7 @@ namespace llarp operator^(const AlignedBuffer& other) const { AlignedBuffer ret; - std::transform(begin(), end(), other.begin(), ret.begin(), std::bit_xor()); + std::transform(begin(), end(), other.begin(), ret.begin(), std::bit_xor<>()); return ret; } diff --git a/llarp/util/buffer.cpp b/llarp/util/buffer.cpp index 473bfc5ef..3d6aebddb 100644 --- a/llarp/util/buffer.cpp +++ b/llarp/util/buffer.cpp @@ -129,3 +129,23 @@ operator==(const llarp_buffer_t& buff, const char* c_str) } return *str == 0; } + +namespace llarp +{ + OwnedBuffer + OwnedBuffer::copy_from(const llarp_buffer_t& b) + { + auto buf = std::make_unique(b.sz); + std::copy(b.begin(), b.end(), buf.get()); + return {std::move(buf), b.sz}; + } + + OwnedBuffer + OwnedBuffer::copy_used(const llarp_buffer_t& b) + { + auto buf = std::make_unique(b.cur - b.base); + std::copy(b.base, b.cur, buf.get()); + return {std::move(buf), b.sz}; + } + +} // namespace llarp diff --git a/llarp/util/buffer.hpp b/llarp/util/buffer.hpp index b9e63d6c6..57e1eb022 100644 --- a/llarp/util/buffer.hpp +++ b/llarp/util/buffer.hpp @@ -1,6 +1,7 @@ #ifndef LLARP_BUFFER_HPP #define LLARP_BUFFER_HPP +#include #include #include #include @@ -13,11 +14,16 @@ #include #include #include +#include /** * buffer.h * * generic memory buffer + * + * TODO: replace usage of these with std::span (via a backport until we move to C++20). That's a + * fairly big job, though, as llarp_buffer_t is currently used a bit differently (i.e. maintains + * both start and current position, plus has some value reading/writing methods). */ /** @@ -83,34 +89,44 @@ struct llarp_buffer_t llarp_buffer_t(const ManagedBuffer&) = delete; llarp_buffer_t(ManagedBuffer&&) = delete; - template - llarp_buffer_t(T* buf, size_t _sz) : base(reinterpret_cast(buf)), cur(base), sz(_sz) + /// Construct referencing some 1-byte, trivially copyable (e.g. char, unsigned char, byte_t) + /// pointer type and a buffer size. + template < + typename T, + typename = std::enable_if_t>> + llarp_buffer_t(T* buf, size_t _sz) + : base(reinterpret_cast(const_cast*>(buf))) + , cur(base) + , sz(_sz) {} - template - llarp_buffer_t(const T* buf, size_t _sz) - : base(reinterpret_cast(const_cast(buf))), cur(base), sz(_sz) + /// initialize llarp_buffer_t from containers supporting .data() and .size() + template < + typename T, + typename = std::void_t().data() + std::declval().size())>> + llarp_buffer_t(T& t) : llarp_buffer_t{t.data(), t.size()} {} - /** initialize llarp_buffer_t from container */ - template - llarp_buffer_t(T& t) : base(t.data()), cur(t.data()), sz(t.size()) + byte_t* + begin() { - // use data over the first element to "enforce" the container used has - // contiguous memory. (Note this isn't required by the standard, but a - // reasonable test on most standard library implementations). + return base; + } + byte_t* + begin() const + { + return base; + } + byte_t* + end() + { + return base + sz; + } + byte_t* + end() const + { + return base + sz; } - - template - llarp_buffer_t(const T& t) : llarp_buffer_t(t.data(), t.size()) - {} - - // clang-format off - byte_t * begin() { return base; } - byte_t * begin() const { return base; } - byte_t * end() { return base + sz; } - byte_t * end() const { return base + sz; } - // clang-format on size_t size_left() const; @@ -211,4 +227,49 @@ struct ManagedBuffer } }; +namespace llarp +{ + // Wrapper around a std::unique_ptr that owns its own memory and is also implicitly + // convertible to a llarp_buffer_t. + struct OwnedBuffer + { + std::unique_ptr buf; + size_t sz; + + template > + OwnedBuffer(std::unique_ptr buf, size_t sz) + : buf{reinterpret_cast(buf.release())}, sz{sz} + {} + + // Create a new, uninitialized owned buffer of the given size. + explicit OwnedBuffer(size_t sz) : OwnedBuffer{std::make_unique(sz), sz} + {} + + OwnedBuffer(const OwnedBuffer&) = delete; + OwnedBuffer& + operator=(const OwnedBuffer&) = delete; + OwnedBuffer(OwnedBuffer&&) = default; + OwnedBuffer& + operator=(OwnedBuffer&&) = delete; + + // Implicit conversion so that this OwnedBuffer can be passed to anything taking a + // llarp_buffer_t + operator llarp_buffer_t() + { + return {buf.get(), sz}; + } + + // Creates an owned buffer by copying from a llarp_buffer_t. (Can also be used to copy from + // another OwnedBuffer via the implicit conversion operator above). + static OwnedBuffer + copy_from(const llarp_buffer_t& b); + + // Creates an owned buffer by copying the used portion of a llarp_buffer_t (i.e. from base to + // cur), for when a llarp_buffer_t is used in write mode. + static OwnedBuffer + copy_used(const llarp_buffer_t& b); + }; + +} // namespace llarp + #endif diff --git a/llarp/util/thread/logic.cpp b/llarp/util/thread/logic.cpp index c1c0e1bf7..c20cb00c6 100644 --- a/llarp/util/thread/logic.cpp +++ b/llarp/util/thread/logic.cpp @@ -24,35 +24,12 @@ namespace llarp m_Queue = std::move(q); } - uint32_t - Logic::call_later(llarp_time_t timeout, std::function func) - { - auto loop = m_Loop; - if (loop != nullptr) - { - return loop->call_after_delay(timeout, func); - } - return 0; - } - void - Logic::cancel_call(uint32_t id) - { - auto loop = m_Loop; - if (loop != nullptr) - { - loop->cancel_delayed_call(id); - } - } - - void - Logic::remove_call(uint32_t id) + Logic::call_later(llarp_time_t timeout, std::function func) { - auto loop = m_Loop; - if (loop != nullptr) - { - loop->cancel_delayed_call(id); - } + Call([this, timeout, f = std::move(func)]() mutable { + m_Loop->call_after_delay(timeout, std::move(f)); + }); } void diff --git a/llarp/util/thread/logic.hpp b/llarp/util/thread/logic.hpp index 355b1044c..a05ab1819 100644 --- a/llarp/util/thread/logic.hpp +++ b/llarp/util/thread/logic.hpp @@ -16,24 +16,86 @@ namespace llarp void Call(std::function func); - uint32_t + // Calls the given function once, after the given delay. + void call_later(llarp_time_t later, std::function func); + // Calls the given function repeatedly, forever, as long as the event loop lasts; the initial + // call will be after the given delay. void - cancel_call(uint32_t id); + call_forever(llarp_time_t repeat, std::function func); + // Created a repeated timer, like call_forever(repeat, func), but ties the lifetime of the + // callback to `owner`: callbacks will be invoked so long as `owner` remains alive, but + // thereafter the callback will be destroyed. Intended to be used as: + // + // logic->call_every(100ms, shared_from_this(), [this] { some_func(); }); + // + template void - remove_call(uint32_t id); + call_every(llarp_time_t repeat, std::weak_ptr owner, Callable f) + { + auto repeater = m_Loop->make_repeater(); + auto& r = *repeater; + r.start( + repeat, + [repeater = std::move(repeater), owner = std::move(owner), f = std::move(f)]() mutable { + if (auto ptr = owner.lock()) + f(); + else + repeater.reset(); // Remove timer on destruction (we should be the only thing holder + // the repeater) + }); + } + + // Wraps a lambda with a lambda that triggers it to be called via Logic::Call() + // when invoked. E.g.: + // + // auto x = logic->make_caller([] (int a) { std::cerr << a; }); + // x(42); + // x(99); + // + // will schedule two calls of the inner lambda (with different arguments) in the logic thread. + // Arguments are forwarded to the inner lambda (allowing moving arguments into it). + template + auto + make_caller(Callable&& f) + { + return [this, f = std::forward(f)](auto&&... args) { + // This shared pointer in a pain in the ass but needed because this lambda is going into a + // std::function that only accepts copyable lambdas. I *want* to simply capture: + // args=std::make_tuple(std::forward(args)...) + // but that fails if any given args aren't copyable. Dammit. + auto args_tuple_ptr = std::make_shared...>>( + std::forward(args)...); + Call([f, args = std::move(args_tuple_ptr)]() mutable { + // Moving away the tuple args here is okay because this lambda will only be invoked once + std::apply(f, std::move(*args)); + }); + }; + } void SetQueuer(std::function)> q); + EventLoop* + event_loop() + { + return m_Loop; + } + void set_event_loop(EventLoop* loop); void clear_event_loop(); + bool + inLogicThread() const + { + return m_Loop and m_Loop->inEventLoopThread(); + } + private: EventLoop* m_Loop = nullptr; std::function)> m_Queue; diff --git a/llarp/vpn/apple.hpp b/llarp/vpn/apple.hpp index 4c420410e..50b09fb5a 100644 --- a/llarp/vpn/apple.hpp +++ b/llarp/vpn/apple.hpp @@ -131,16 +131,18 @@ namespace llarp::vpn net::IPPacket ReadNextPacket() override { + constexpr int uintsize = sizeof(unsigned int); net::IPPacket pkt{}; unsigned int pktinfo = 0; - const struct iovec vecs[2] = {{.iov_base = &pktinfo, .iov_len = sizeof(unsigned int)}, + const struct iovec vecs[2] = {{.iov_base = &pktinfo, .iov_len = uintsize}, {.iov_base = pkt.buf, .iov_len = sizeof(pkt.buf)}}; - int n = readv(m_FD, vecs, 2); - if (n >= (int)(sizeof(unsigned int))) - { - n -= sizeof(unsigned int); - pkt.sz = n; - } + int sz = readv(m_FD, vecs, 2); + if (sz >= uintsize) + pkt.sz = sz - uintsize; + else if (sz >= 0 || errno == EAGAIN || errno == EWOULDBLOCK) + pkt.sz = 0; + else + throw std::error_code{errno, std::system_category()}; return pkt; } diff --git a/llarp/vpn/linux.hpp b/llarp/vpn/linux.hpp index 9a13dd017..98e264a8d 100644 --- a/llarp/vpn/linux.hpp +++ b/llarp/vpn/linux.hpp @@ -91,6 +91,10 @@ namespace llarp::vpn const auto sz = read(m_fd, pkt.buf, sizeof(pkt.buf)); if (sz >= 0) pkt.sz = std::min(sz, ssize_t{sizeof(pkt.buf)}); + else if (errno == EAGAIN || errno == EWOULDBLOCK) + pkt.sz = 0; + else + throw std::error_code{errno, std::system_category()}; return pkt; } diff --git a/llarp/vpn/win32.hpp b/llarp/vpn/win32.hpp index 079448daf..cb7e8fe34 100644 --- a/llarp/vpn/win32.hpp +++ b/llarp/vpn/win32.hpp @@ -345,12 +345,6 @@ namespace llarp::vpn return -1; } - bool - HasNextPacket() override - { - return not m_ReadQueue.empty(); - } - std::string IfName() const override { @@ -376,8 +370,11 @@ namespace llarp::vpn } net::IPPacket - ReadNextPacket() + ReadNextPacket() override { + if (m_ReadQueue.empty()) + return net::IPPacket{}; + return m_ReadQueue.popFront(); } diff --git a/test/iwp/test_iwp_session.cpp b/test/iwp/test_iwp_session.cpp index 1d4fc297a..09c011f6b 100644 --- a/test/iwp/test_iwp_session.cpp +++ b/test/iwp/test_iwp_session.cpp @@ -11,6 +11,7 @@ #include #include +#include "ev/ev.hpp" #undef LOG_TAG #define LOG_TAG __FILE__ @@ -38,11 +39,11 @@ struct IWPLinkContext llarp::LinkLayer_ptr link; std::shared_ptr keyManager; llarp::LinkMessageParser m_Parser; - llarp_ev_loop_ptr m_Loop; + llarp::EventLoop_ptr m_Loop; /// is the test done on this context ? bool gucci = false; - IWPLinkContext(std::string_view addr, llarp_ev_loop_ptr loop) + IWPLinkContext(std::string_view addr, llarp::EventLoop_ptr loop) : localAddr{std::move(addr)} , keyManager{std::make_shared()} , m_Parser{nullptr} @@ -104,7 +105,7 @@ struct IWPLinkContext }, // timeout handler [&](llarp::ILinkSession*) { - llarp_ev_loop_stop(m_Loop); + m_Loop->stop(); FAIL("session timeout"); }, // session closed handler @@ -150,8 +151,9 @@ RunIWPTest(Func_t test, Duration_t timeout = 10s) llarp::LogSilencer shutup; // set up event loop auto logic = std::make_shared(); - auto loop = llarp_make_ev_loop(); + auto loop = llarp::EventLoop::create(); loop->set_logic(logic); + logic->set_event_loop(loop.get()); llarp::LogContext::Instance().Initialize( llarp::eLogDebug, llarp::LogType::File, "stdout", "unit test", [loop](auto work) { @@ -175,7 +177,7 @@ RunIWPTest(Func_t test, Duration_t timeout = 10s) auto endIfDone = [initiator, recipiant, loop, logic]() { if (initiator->gucci and recipiant->gucci) { - LogicCall(logic, [loop]() { llarp_ev_loop_stop(loop); }); + LogicCall(logic, [loop] { loop->stop(); }); } }; // function to start test and give logic to unit test @@ -186,12 +188,12 @@ RunIWPTest(Func_t test, Duration_t timeout = 10s) }; // function to end test immediately - auto endTest = [logic, loop]() { LogicCall(logic, [loop]() { llarp_ev_loop_stop(loop); }); }; + auto endTest = [logic, loop]() { LogicCall(logic, [loop] { loop->stop(); }); }; loop->call_after_delay( std::chrono::duration_cast(timeout), []() { FAIL("test timeout"); }); test(start, endIfDone, endTest, initiator, recipiant); - llarp_ev_loop_run_single_process(loop, logic); + loop->run(*logic); llarp::RouterContact::BlockBogons = oldBlockBogons; }