diff --git a/daemon/main.cpp b/daemon/main.cpp index 9b8fe45af..3975c8997 100644 --- a/daemon/main.cpp +++ b/daemon/main.cpp @@ -6,7 +6,6 @@ #include #include #include -#include #ifdef _WIN32 #include @@ -69,7 +68,7 @@ void handle_signal(int sig) { if (ctx) - LogicCall(ctx->logic, std::bind(&llarp::Context::HandleSignal, ctx.get(), sig)); + ctx->loop->call([sig] { ctx->HandleSignal(sig); }); else std::cerr << "Received signal " << sig << ", but have no context yet. Ignoring!" << std::endl; } diff --git a/include/llarp.hpp b/include/llarp.hpp index 0e1e01ebd..01643724c 100644 --- a/include/llarp.hpp +++ b/include/llarp.hpp @@ -16,7 +16,7 @@ namespace llarp class Platform; } - class Logic; + class EventLoop; struct Config; struct RouterContact; struct Config; @@ -43,9 +43,8 @@ namespace llarp std::shared_ptr crypto = nullptr; std::shared_ptr cryptoManager = nullptr; std::shared_ptr router = nullptr; - std::shared_ptr logic = nullptr; + std::shared_ptr loop = nullptr; std::shared_ptr nodedb = nullptr; - std::shared_ptr mainloop; std::string nodedb_dir; virtual ~Context() = default; @@ -93,7 +92,7 @@ namespace llarp /// Creates a router. Can be overridden to allow a different class of router /// to be created instead. Defaults to llarp::Router. virtual std::shared_ptr - makeRouter(std::shared_ptr __netloop, std::shared_ptr logic); + makeRouter(const std::shared_ptr& loop); /// create the vpn platform for use in creating network interfaces virtual std::shared_ptr diff --git a/llarp/CMakeLists.txt b/llarp/CMakeLists.txt index 975dda7ef..b7e55c5ea 100644 --- a/llarp/CMakeLists.txt +++ b/llarp/CMakeLists.txt @@ -19,7 +19,6 @@ add_library(lokinet-util util/mem.cpp util/printer.cpp util/str.cpp - util/thread/logic.cpp util/thread/queue_manager.cpp util/thread/threading.cpp util/time.cpp diff --git a/llarp/context.cpp b/llarp/context.cpp index f5aff4aa3..5a3695f2e 100644 --- a/llarp/context.cpp +++ b/llarp/context.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -25,7 +26,10 @@ namespace llarp bool Context::CallSafe(std::function f) { - return logic && LogicCall(logic, f); + if (!loop) + return false; + loop->call(std::move(f)); + return true; } void @@ -36,8 +40,6 @@ namespace llarp config = std::move(conf); - logic = std::make_shared(); - nodedb_dir = fs::path{config->router.m_dataDir / nodedb_dirname}.string(); } @@ -62,19 +64,16 @@ namespace llarp llarp::LogInfo(llarp::VERSION_FULL, " ", llarp::RELEASE_MOTTO); llarp::LogInfo("starting up"); - if (mainloop == nullptr) + if (!loop) { auto jobQueueSize = std::max(event_loop_queue_size, config->router.m_JobQueueSize); - mainloop = EventLoop::create(jobQueueSize); + loop = EventLoop::create(jobQueueSize); } - logic->set_event_loop(mainloop.get()); - - mainloop->set_logic(logic); crypto = std::make_shared(); cryptoManager = std::make_shared(crypto.get()); - router = makeRouter(mainloop, logic); + router = makeRouter(loop); nodedb = std::make_shared( nodedb_dir, [r = router.get()](auto call) { r->QueueDiskIO(std::move(call)); }); @@ -84,9 +83,10 @@ namespace llarp } std::shared_ptr - Context::makeRouter(std::shared_ptr netloop, std::shared_ptr logic) + Context::makeRouter(const EventLoop_ptr& loop) { - return std::make_shared(netloop, logic, makeVPNPlatform()); + return std::static_pointer_cast( + std::make_shared(loop, makeVPNPlatform())); } std::shared_ptr @@ -117,7 +117,7 @@ namespace llarp // run net io thread llarp::LogInfo("running mainloop"); - mainloop->run(*logic); + loop->run(); if (closeWaiter) { closeWaiter->set_value(); @@ -188,8 +188,8 @@ namespace llarp llarp::LogDebug("free router"); router.reset(); - llarp::LogDebug("free logic"); - logic.reset(); + llarp::LogDebug("free loop"); + loop.reset(); } #if defined(ANDROID) diff --git a/llarp/dht/context.cpp b/llarp/dht/context.cpp index 68fa865db..290108452 100644 --- a/llarp/dht/context.cpp +++ b/llarp/dht/context.cpp @@ -17,7 +17,6 @@ #include #include #include -#include #include #include #include @@ -161,7 +160,7 @@ namespace llarp GetIntroSetByLocation(const Key_t& location) const override; void - handle_cleaner_timer(uint64_t interval); + handle_cleaner_timer(); /// explore dht for new routers void @@ -202,15 +201,13 @@ namespace llarp void PutRCNodeAsync(const RCNode& val) override { - auto func = std::bind(&Bucket::PutNode, Nodes(), val); - LogicCall(router->logic(), func); + router->loop()->call([nodes = Nodes(), val] { nodes->PutNode(val); }); } void DelRCNodeAsync(const Key_t& val) override { - auto func = std::bind(&Bucket::DelNode, Nodes(), val); - LogicCall(router->logic(), func); + router->loop()->call([nodes = Nodes(), val] { nodes->DelNode(val); }); } const Key_t& @@ -289,8 +286,7 @@ namespace llarp ExploreNetworkVia(const Key_t& peer) override; private: - void - ScheduleCleanupTimer(); + std::shared_ptr _timer_keepalive; void CleanupTX(); @@ -333,7 +329,7 @@ namespace llarp } void - Context::handle_cleaner_timer(__attribute__((unused)) uint64_t interval) + Context::handle_cleaner_timer() { // clean up transactions CleanupTX(); @@ -354,7 +350,6 @@ namespace llarp ++itr; } } - ScheduleCleanupTimer(); } void @@ -458,14 +453,8 @@ namespace llarp _services = std::make_unique>(ourKey, llarp::randint); llarp::LogDebug("initialize dht with key ", ourKey); // start cleanup timer - ScheduleCleanupTimer(); - } - - void - Context::ScheduleCleanupTimer() - { - router->logic()->call_later( - 1s, std::bind(&llarp::dht::Context::handle_cleaner_timer, this, 1000)); + _timer_keepalive = std::make_shared(0); + router->loop()->call_every(1s, _timer_keepalive, [this] { handle_cleaner_timer(); }); } void diff --git a/llarp/dns/server.cpp b/llarp/dns/server.cpp index 3374d9f3f..368f44a9c 100644 --- a/llarp/dns/server.cpp +++ b/llarp/dns/server.cpp @@ -1,19 +1,18 @@ #include #include #include -#include #include #include #include namespace llarp::dns { - PacketHandler::PacketHandler(Logic_ptr logic, IQueryHandler* h) - : m_QueryHandler{h}, m_Logic{std::move(logic)} + PacketHandler::PacketHandler(EventLoop_ptr loop, IQueryHandler* h) + : m_QueryHandler{h}, m_Loop{std::move(loop)} {} - Proxy::Proxy(EventLoop_ptr loop, Logic_ptr logic, IQueryHandler* h) - : PacketHandler{logic, h}, m_Loop(std::move(loop)) + Proxy::Proxy(EventLoop_ptr loop, IQueryHandler* h) + : PacketHandler{loop, h}, m_Loop(std::move(loop)) { m_Server = m_Loop->udp([this](UDPHandle&, SockAddr a, OwnedBuffer buf) { HandlePacket(a, a, buf); }); @@ -65,7 +64,7 @@ namespace llarp::dns }; m_UnboundResolver = - std::make_shared(m_Logic, std::move(replyFunc), std::move(failFunc)); + std::make_shared(m_Loop, std::move(replyFunc), std::move(failFunc)); if (not m_UnboundResolver->Init()) { llarp::LogError("Failed to initialize upstream DNS resolver."); diff --git a/llarp/dns/server.hpp b/llarp/dns/server.hpp index 3d0d9784b..b9080c68b 100644 --- a/llarp/dns/server.hpp +++ b/llarp/dns/server.hpp @@ -4,7 +4,6 @@ #include #include #include -#include #include #include @@ -32,9 +31,7 @@ namespace llarp class PacketHandler : public std::enable_shared_from_this { public: - using Logic_ptr = std::shared_ptr; - - explicit PacketHandler(Logic_ptr logic, IQueryHandler* handler); + explicit PacketHandler(EventLoop_ptr loop, IQueryHandler* handler); virtual ~PacketHandler() = default; @@ -67,15 +64,14 @@ namespace llarp IQueryHandler* const m_QueryHandler; std::set m_Resolvers; std::shared_ptr m_UnboundResolver; - Logic_ptr m_Logic; + EventLoop_ptr m_Loop; }; // Proxying DNS handler that listens on a UDP port for proper DNS requests. class Proxy : public PacketHandler { public: - using Logic_ptr = std::shared_ptr; - explicit Proxy(EventLoop_ptr loop, Logic_ptr logic, IQueryHandler* handler); + explicit Proxy(EventLoop_ptr loop, IQueryHandler* handler); bool Start(SockAddr localaddr, std::vector resolvers) override; diff --git a/llarp/dns/unbound_resolver.cpp b/llarp/dns/unbound_resolver.cpp index 462385c9a..808052c8d 100644 --- a/llarp/dns/unbound_resolver.cpp +++ b/llarp/dns/unbound_resolver.cpp @@ -36,11 +36,11 @@ namespace llarp::dns } UnboundResolver::UnboundResolver( - std::shared_ptr logic, ReplyFunction reply, FailFunction fail) + EventLoop_ptr loop, ReplyFunction reply, FailFunction fail) : unboundContext(nullptr) , started(false) - , replyFunc(logic->make_caller(std::move(reply))) - , failFunc(logic->make_caller(std::move(fail))) + , replyFunc(loop->make_caller(std::move(reply))) + , failFunc(loop->make_caller(std::move(fail))) {} // static callback diff --git a/llarp/dns/unbound_resolver.hpp b/llarp/dns/unbound_resolver.hpp index 498df35be..0d85a7304 100644 --- a/llarp/dns/unbound_resolver.hpp +++ b/llarp/dns/unbound_resolver.hpp @@ -7,7 +7,6 @@ #include #include -#include #include @@ -37,7 +36,7 @@ namespace llarp::dns Reset(); public: - UnboundResolver(std::shared_ptr logic, ReplyFunction replyFunc, FailFunction failFunc); + UnboundResolver(EventLoop_ptr loop, ReplyFunction replyFunc, FailFunction failFunc); static void Callback(void* data, int err, ub_result* result); diff --git a/llarp/ev/ev.cpp b/llarp/ev/ev.cpp index a8e54630b..348338bdb 100644 --- a/llarp/ev/ev.cpp +++ b/llarp/ev/ev.cpp @@ -1,7 +1,6 @@ #include #include #include -#include #include #include @@ -17,13 +16,4 @@ namespace llarp { return std::make_shared(queueLength); } - - void - EventLoop::run(Logic& logic) - { - run_loop(); - logic.clear_event_loop(); - stopped(); - } - } // namespace llarp diff --git a/llarp/ev/ev.hpp b/llarp/ev/ev.hpp index 0f1ad8664..16628cdf9 100644 --- a/llarp/ev/ev.hpp +++ b/llarp/ev/ev.hpp @@ -52,7 +52,6 @@ struct llarp_ev_pkt_pipe; namespace llarp { - class Logic; struct SockAddr; struct UDPHandle; @@ -99,17 +98,13 @@ namespace llarp // this (nearly!) abstract base class // is overriden for each platform - struct EventLoop - // : std::enable_shared_from_this // FIXME: do I actually need shared_from_this()? + class EventLoop { + public: // Runs the event loop. This does not return until sometime after `stop()` is called (and so // typically you want to run this in its own thread). - void - run(Logic& logic); - - // Actually runs the underlying implementation event loop; called by run(). virtual void - run_loop() = 0; + run() = 0; virtual bool running() const = 0; @@ -120,14 +115,89 @@ namespace llarp return llarp::time_now_ms(); } + // Calls a function/lambda/etc. If invoked from within the event loop itself this calls the + // given lambda immediately; otherwise it passes it to `call_soon()` to be queued to run at the + // next event loop iteration. + template void call(Callable&& f) { + if (inEventLoop()) + f(); + else + call_soon(std::forward(f)); + } + + // Queues a function to be called on the next event loop cycle and triggers it to be called as + // soon as possible; can be called from any thread. Note that, unlike `call()`, this queues the + // job even if called from the event loop thread itself and so you *usually* want to use + // `call()` instead. virtual void - stopped() - {} + call_soon(std::function f) = 0; - // Adds a timer to the event loop; should only be called from the logic thread (and so is - // typically scheduled via a call to Logic::call_later()). + // Adds a timer to the event loop to invoke the given callback after a delay. virtual void - call_after_delay(llarp_time_t delay_ms, std::function callback) = 0; + call_later(llarp_time_t delay_ms, std::function callback) = 0; + + // Created a repeated timer that fires ever `repeat` time unit. Lifetime of the event + // is tied to `owner`: callbacks will be invoked so long as `owner` remains alive, but + // the first time it repeats after `owner` has been destroyed the internal timer object will + // be destroyed and no more callbacks will be invoked. + // + // Intended to be used as: + // + // loop->call_every(100ms, weak_from_this(), [this] { some_func(); }); + // + // Alternative, when shared_from_this isn't available for a type, you can use a local member + // shared_ptr (or even create a simple one, for more fine-grained control) to tie the lifetime: + // + // m_keepalive = std::make_shared(42); + // loop->call_every(100ms, m_keepalive, [this] { some_func(); }); + // + template // Templated so that the compiler can inline the call + void + call_every(llarp_time_t repeat, std::weak_ptr owner, Callable f) + { + auto repeater = make_repeater(); + auto& r = *repeater; // reference *before* we pass ownership into the lambda below + r.start( + repeat, + [repeater = std::move(repeater), owner = std::move(owner), f = std::move(f)]() mutable { + if (auto ptr = owner.lock()) + f(); + else + repeater.reset(); // Trigger timer removal on tied object destruction (we should be the only thing holding + // the repeater; ideally it would be a unique_ptr, but + // std::function says nuh-uh). + }); + } + + // Wraps a lambda with a lambda that triggers it to be called via loop->call() + // when invoked. E.g.: + // + // auto x = loop->make_caller([] (int a) { std::cerr << a; }); + // x(42); + // x(99); + // + // will schedule two calls of the inner lambda (with different arguments) in the event loop. + // Arguments are forwarded to the inner lambda (allowing moving arguments into it). + template + auto + make_caller(Callable f) + { + return [this, f = std::move(f)](auto&&... args) { + if (inEventLoop()) + return f(std::forward(args)...); + + // This shared pointer in a pain in the ass but needed because this lambda is going into a + // std::function that only accepts copyable lambdas. I *want* to simply capture: + // args=std::make_tuple(std::forward(args)...) but that fails if any given + // arguments aren't copyable (because of std::function). Dammit. + auto args_tuple_ptr = std::make_shared...>>( + std::forward(args)...); + call_soon([f, args = std::move(args_tuple_ptr)]() mutable { + // Moving away the tuple args here is okay because this lambda will only be invoked once + std::apply(f, std::move(*args)); + }); + }; + } virtual bool add_network_interface( @@ -146,15 +216,8 @@ namespace llarp virtual std::shared_ptr udp(UDPReceiveFunc on_recv) = 0; - /// give this event loop a logic thread for calling - virtual void - set_logic(const std::shared_ptr& logic) = 0; - virtual ~EventLoop() = default; - virtual void - call_soon(std::function f) = 0; - /// set the function that is called once per cycle the flush all the queues virtual void set_pump_function(std::function pumpll) = 0; @@ -167,7 +230,7 @@ namespace llarp make_waker(std::function callback) = 0; // Initializes a new repeated task object. Note that the task is not actually added to the event - // loop until you call start() on the returned object. Typically invoked via Logic::call_every. + // loop until you call start() on the returned object. Typically invoked via call_every. virtual std::shared_ptr make_repeater() = 0; @@ -177,7 +240,7 @@ namespace llarp // Returns true if called from within the event loop thread, false otherwise. virtual bool - inEventLoopThread() const = 0; + inEventLoop() const = 0; }; using EventLoop_ptr = std::shared_ptr; diff --git a/llarp/ev/ev_libuv.cpp b/llarp/ev/ev_libuv.cpp index 5126e3784..1241ff9a6 100644 --- a/llarp/ev/ev_libuv.cpp +++ b/llarp/ev/ev_libuv.cpp @@ -3,7 +3,6 @@ #include #include #include -#include #include #include @@ -130,11 +129,14 @@ namespace llarp::uv } void - Loop::run_loop() + Loop::run() { llarp::LogTrace("Loop::run_loop()"); m_EventLoopThreadID = std::this_thread::get_id(); m_Impl->run(); + m_Impl->close(); + m_Impl.reset(); + llarp::LogInfo("we have stopped"); } void @@ -150,23 +152,37 @@ namespace llarp::uv std::make_shared(*m_Impl, std::move(on_recv))); } - // TODO: replace this one-shot timer mechanism with a repeated timer, because most likely - // everything using this is repeating scheduling itself all the time and would be better served by - // a repeating uv_timer. + static void setup_oneshot_timer(uvw::Loop& loop, llarp_time_t delay, std::function callback) { + auto timer = loop.resource(); + timer->on([f = std::move(callback)](const auto&, auto& timer) { + f(); + timer.stop(); + timer.close(); + }); + timer->start(delay, 0ms); + } + void - Loop::call_after_delay(llarp_time_t delay_ms, std::function callback) + Loop::call_later(llarp_time_t delay_ms, std::function callback) { llarp::LogTrace("Loop::call_after_delay()"); #ifdef TESTNET_SPEED delay_ms *= TESTNET_SPEED; #endif - auto timer = m_Impl->resource(); - timer->on([f = std::move(callback)](const auto&, auto& timer) { - f(); - timer.stop(); - timer.close(); - }); - timer->start(delay_ms, 0ms); + + if (inEventLoop()) + setup_oneshot_timer(*m_Impl, delay_ms, std::move(callback)); + else + { + call_soon([this, f = std::move(callback), target_time = time_now() + delay_ms] { + // Recalculate delay because it may have taken some time to get ourselves into the logic thread + auto updated_delay = target_time - time_now(); + if (updated_delay <= 0ms) + f(); // Timer already expired! + else + setup_oneshot_timer(*m_Impl, updated_delay, std::move(f)); + }); + } } void @@ -174,6 +190,9 @@ namespace llarp::uv { if (m_Run) { + if (not inEventLoop()) + return call_soon([this] { stop(); }); + llarp::LogInfo("stopping event loop"); m_Impl->walk([](auto&& handle) { if constexpr (!std::is_pointer_v>) @@ -181,19 +200,9 @@ namespace llarp::uv }); llarp::LogDebug("Closed all handles, stopping the loop"); m_Impl->stop(); - } - m_Run.store(false); - } - void - Loop::stopped() - { - if (m_Impl) - { - m_Impl->close(); - m_Impl.reset(); + m_Run.store(false); } - llarp::LogInfo("we have stopped"); } bool @@ -248,9 +257,8 @@ namespace llarp::uv m_WakeUp->send(); return; } - const bool inEventLoop = *m_EventLoopThreadID == std::this_thread::get_id(); - if (inEventLoop and m_LogicCalls.full()) + if (inEventLoop() and m_LogicCalls.full()) { FlushLogic(); } @@ -333,7 +341,7 @@ namespace llarp::uv } bool - Loop::inEventLoopThread() const + Loop::inEventLoop() const { return m_EventLoopThreadID and *m_EventLoopThreadID == std::this_thread::get_id(); } diff --git a/llarp/ev/ev_libuv.hpp b/llarp/ev/ev_libuv.hpp index 3560aabd5..dc1a82a35 100644 --- a/llarp/ev/ev_libuv.hpp +++ b/llarp/ev/ev_libuv.hpp @@ -2,7 +2,6 @@ #define LLARP_EV_LIBUV_HPP #include #include "udp_handle.hpp" -#include #include #include @@ -20,20 +19,21 @@ namespace llarp::uv class UVWakeup; class UVRepeater; - struct Loop final : public llarp::EventLoop + class Loop final : public llarp::EventLoop { + public: using Callback = std::function; Loop(size_t queue_size); void - run_loop() override; + run() override; bool running() const override; void - call_after_delay(llarp_time_t delay_ms, std::function callback) override; + call_later(llarp_time_t delay_ms, std::function callback) override; void tick_event_loop(); @@ -41,9 +41,6 @@ namespace llarp::uv void stop() override; - void - stopped() override; - bool add_ticker(std::function ticker) override; @@ -52,12 +49,6 @@ namespace llarp::uv std::shared_ptr netif, std::function handler) override; - void - set_logic(const std::shared_ptr& l) override - { - l->SetQueuer([this](std::function f) { call_soon(std::move(f)); }); - } - void call_soon(std::function f) override; @@ -79,7 +70,7 @@ namespace llarp::uv std::function PumpLL; bool - inEventLoopThread() const override; + inEventLoop() const override; private: std::shared_ptr m_Impl; diff --git a/llarp/ev/vpnio.cpp b/llarp/ev/vpnio.cpp index b7e05d3dc..50a3e4b88 100644 --- a/llarp/ev/vpnio.cpp +++ b/llarp/ev/vpnio.cpp @@ -1,7 +1,6 @@ #include #include #include -#include void llarp_vpn_io_impl::AsyncClose() diff --git a/llarp/handlers/exit.cpp b/llarp/handlers/exit.cpp index a610d2978..eae740e02 100644 --- a/llarp/handlers/exit.cpp +++ b/llarp/handlers/exit.cpp @@ -13,12 +13,12 @@ namespace llarp { namespace handlers { - ExitEndpoint::ExitEndpoint(const std::string& name, AbstractRouter* r) + ExitEndpoint::ExitEndpoint(std::string name, AbstractRouter* r) : m_Router(r) - , m_Resolver(std::make_shared(r->netloop(), r->logic(), this)) - , m_Name(name) + , m_Resolver(std::make_shared(r->loop(), this)) + , m_Name(std::move(name)) , m_LocalResolverAddr("127.0.0.1", 53) - , m_InetToNetwork(name + "_exit_rx", r->netloop(), r->netloop()) + , m_InetToNetwork(name + "_exit_rx", r->loop(), r->loop()) { m_ShouldInitTun = true; @@ -312,15 +312,14 @@ namespace llarp llarp::LogError("Could not create interface"); return false; } - auto loop = GetRouter()->netloop(); - if (not loop->add_network_interface( - m_NetIf, [&](net::IPPacket pkt) { OnInetPacket(std::move(pkt)); })) + if (not GetRouter()->loop()->add_network_interface( + m_NetIf, [this](net::IPPacket pkt) { OnInetPacket(std::move(pkt)); })) { llarp::LogWarn("Could not create tunnel for exit endpoint"); return false; } - loop->add_ticker([&]() { Flush(); }); + GetRouter()->loop()->add_ticker([this] { Flush(); }); llarp::LogInfo("Trying to start resolver ", m_LocalResolverAddr.toString()); return m_Resolver->Start(m_LocalResolverAddr.createSockAddr(), m_UpstreamResolvers); diff --git a/llarp/handlers/exit.hpp b/llarp/handlers/exit.hpp index c8e490ac3..9c3af2b17 100644 --- a/llarp/handlers/exit.hpp +++ b/llarp/handlers/exit.hpp @@ -13,7 +13,7 @@ namespace llarp { struct ExitEndpoint : public dns::IQueryHandler { - ExitEndpoint(const std::string& name, AbstractRouter* r); + ExitEndpoint(std::string name, AbstractRouter* r); ~ExitEndpoint() override; void diff --git a/llarp/handlers/tun.cpp b/llarp/handlers/tun.cpp index cb720c1d2..c7651e1c6 100644 --- a/llarp/handlers/tun.cpp +++ b/llarp/handlers/tun.cpp @@ -18,7 +18,6 @@ #include #include #include -#include #include #include @@ -48,7 +47,7 @@ namespace llarp TunEndpoint* const m_Endpoint; explicit DnsInterceptor(AbstractRouter* router, TunEndpoint* ep) - : dns::PacketHandler{router->logic(), ep}, m_Endpoint{ep} {}; + : dns::PacketHandler{router->loop(), ep}, m_Endpoint{ep} {}; void SendServerMessageBufferTo( @@ -97,7 +96,7 @@ namespace llarp TunEndpoint::TunEndpoint(AbstractRouter* r, service::Context* parent) : service::Endpoint(r, parent) - , m_UserToNetworkPktQueue("endpoint_sendq", r->netloop(), r->netloop()) + , m_UserToNetworkPktQueue("endpoint_sendq", r->loop(), r->loop()) { m_PacketRouter.reset( new vpn::PacketRouter{[&](net::IPPacket pkt) { HandleGotUserPacket(std::move(pkt)); }}); @@ -121,7 +120,7 @@ namespace llarp HandleGotUserPacket(std::move(pkt)); }); #else - m_Resolver = std::make_shared(r->netloop(), r->logic(), this); + m_Resolver = std::make_shared(r->loop(), this); #endif } @@ -838,9 +837,8 @@ namespace llarp m_IfName = m_NetIf->IfName(); LogInfo(Name(), " got network interface ", m_IfName); - auto netloop = Router()->netloop(); - if (not netloop->add_network_interface( - m_NetIf, [&](net::IPPacket pkt) { m_PacketRouter->HandleIPPacket(std::move(pkt)); })) + if (not Router()->loop()->add_network_interface( + m_NetIf, [this](net::IPPacket pkt) { m_PacketRouter->HandleIPPacket(std::move(pkt)); })) { LogError(Name(), " failed to add network interface"); return false; @@ -853,7 +851,7 @@ namespace llarp LogInfo(Name(), " has ipv6 address ", m_OurIPv6); } - netloop->add_ticker([&]() { Flush(); }); + Router()->loop()->add_ticker([this] { Flush(); }); if (m_OnUp) { diff --git a/llarp/link/i_link_manager.hpp b/llarp/link/i_link_manager.hpp index 250503b78..08d0f13d9 100644 --- a/llarp/link/i_link_manager.hpp +++ b/llarp/link/i_link_manager.hpp @@ -2,7 +2,6 @@ #define LLARP_I_LINK_MANAGER_HPP #include -#include #include #include @@ -13,8 +12,6 @@ struct llarp_buffer_t; namespace llarp { - using Logic_ptr = std::shared_ptr; - struct RouterContact; struct ILinkSession; struct IOutboundSessionMaker; @@ -52,7 +49,7 @@ namespace llarp AddLink(LinkLayer_ptr link, bool inbound = false) = 0; virtual bool - StartLinks(Logic_ptr logic) = 0; + StartLinks(const EventLoop_ptr& loop) = 0; virtual void Stop() = 0; diff --git a/llarp/link/link_manager.cpp b/llarp/link/link_manager.cpp index e5526a6de..478315585 100644 --- a/llarp/link/link_manager.cpp +++ b/llarp/link/link_manager.cpp @@ -121,12 +121,12 @@ namespace llarp } bool - LinkManager::StartLinks(Logic_ptr logic) + LinkManager::StartLinks(const EventLoop_ptr& loop) { LogInfo("starting ", outboundLinks.size(), " outbound links"); for (const auto& link : outboundLinks) { - if (!link->Start(logic)) + if (!link->Start(loop)) { LogWarn("outbound link '", link->Name(), "' failed to start"); return false; @@ -139,7 +139,7 @@ namespace llarp LogInfo("starting ", inboundLinks.size(), " inbound links"); for (const auto& link : inboundLinks) { - if (!link->Start(logic)) + if (!link->Start(loop)) { LogWarn("Link ", link->Name(), " failed to start"); return false; diff --git a/llarp/link/link_manager.hpp b/llarp/link/link_manager.hpp index bc01cb6e2..558b48ce2 100644 --- a/llarp/link/link_manager.hpp +++ b/llarp/link/link_manager.hpp @@ -47,7 +47,7 @@ namespace llarp AddLink(LinkLayer_ptr link, bool inbound = false) override; bool - StartLinks(Logic_ptr logic) override; + StartLinks(const EventLoop_ptr& loop) override; void Stop() override; diff --git a/llarp/link/server.cpp b/llarp/link/server.cpp index 0d6a16d03..a806e709c 100644 --- a/llarp/link/server.cpp +++ b/llarp/link/server.cpp @@ -341,11 +341,11 @@ namespace llarp } bool - ILinkLayer::Start(const std::shared_ptr& logic) + ILinkLayer::Start(const EventLoop_ptr& loop) { // Tie the lifetime of this repeater to this arbitrary shared_ptr: - m_repeater_keepalive = std::make_shared(42); - logic->call_every(LINK_LAYER_TICK_INTERVAL, m_repeater_keepalive, [this] { Tick(Now()); }); + m_repeater_keepalive = std::make_shared(0); + loop->call_every(LINK_LAYER_TICK_INTERVAL, m_repeater_keepalive, [this] { Tick(Now()); }); return true; } diff --git a/llarp/link/server.hpp b/llarp/link/server.hpp index 435ec028d..b8175c009 100644 --- a/llarp/link/server.hpp +++ b/llarp/link/server.hpp @@ -7,7 +7,6 @@ #include #include #include -#include #include #include @@ -132,7 +131,7 @@ namespace llarp TryEstablishTo(RouterContact rc); bool - Start(const std::shared_ptr& l); + Start(const EventLoop_ptr& loop); virtual void Stop(); diff --git a/llarp/messages/relay_commit.cpp b/llarp/messages/relay_commit.cpp index fbee25177..277f40d22 100644 --- a/llarp/messages/relay_commit.cpp +++ b/llarp/messages/relay_commit.cpp @@ -12,7 +12,6 @@ #include #include #include -#include #include #include @@ -238,10 +237,8 @@ namespace llarp break; } - auto func = - std::bind(&LR_StatusMessage::CreateAndSend, router, pathid, nextHop, pathKey, status); - - router->QueueWork(func); + router->QueueWork([router, pathid, nextHop, pathKey, status] { + LR_StatusMessage::CreateAndSend(router, pathid, nextHop, pathKey, status); }); } /// this is done from logic thread @@ -438,7 +435,7 @@ namespace llarp // we are the farthest hop llarp::LogDebug("We are the farthest hop for ", info); // send a LRSM down the path - LogicCall(self->context->logic(), [=]() { + self->context->loop()->call([self] { SendPathConfirm(self); self->decrypter = nullptr; }); @@ -447,7 +444,7 @@ namespace llarp { // forward upstream // we are still in the worker thread so post job to logic - LogicCall(self->context->logic(), [=]() { + self->context->loop()->call([self] { SendLRCM(self); self->decrypter = nullptr; }); diff --git a/llarp/messages/relay_status.cpp b/llarp/messages/relay_status.cpp index a6ef0dc44..e2e83be65 100644 --- a/llarp/messages/relay_status.cpp +++ b/llarp/messages/relay_status.cpp @@ -9,7 +9,6 @@ #include #include #include -#include #include #include @@ -224,8 +223,8 @@ namespace llarp LR_StatusMessage::QueueSendMessage( AbstractRouter* router, const RouterID nextHop, std::shared_ptr msg) { - auto func = std::bind(&LR_StatusMessage::SendMessage, router, nextHop, msg); - LogicCall(router->logic(), func); + router->loop()->call([router, nextHop, msg=std::move(msg)] { + SendMessage(router, nextHop, msg); }); } void diff --git a/llarp/nodedb.cpp b/llarp/nodedb.cpp index 6f01ca5f3..9503b6e70 100644 --- a/llarp/nodedb.cpp +++ b/llarp/nodedb.cpp @@ -7,7 +7,6 @@ #include #include #include -#include #include #include diff --git a/llarp/nodedb.hpp b/llarp/nodedb.hpp index 96495c36d..db6fc40ea 100644 --- a/llarp/nodedb.hpp +++ b/llarp/nodedb.hpp @@ -20,8 +20,6 @@ namespace llarp { - class Logic; - class NodeDB { struct Entry diff --git a/llarp/path/path.cpp b/llarp/path/path.cpp index 05e79d38d..2a2cb6745 100644 --- a/llarp/path/path.cpp +++ b/llarp/path/path.cpp @@ -14,7 +14,6 @@ #include #include #include -#include #include #include @@ -174,8 +173,7 @@ namespace llarp if ((currentStatus & LR_StatusRecord::SUCCESS) == LR_StatusRecord::SUCCESS) { llarp::LogDebug("LR_Status message processed, path build successful"); - auto self = shared_from_this(); - LogicCall(r->logic(), [=]() { self->HandlePathConfirmMessage(r); }); + r->loop()->call([r, self = shared_from_this()] { self->HandlePathConfirmMessage(r); }); } else { @@ -231,8 +229,7 @@ namespace llarp { llarp::LogDebug("Path build failed for an unspecified reason"); } - auto self = shared_from_this(); - LogicCall(r->logic(), [=]() { self->EnterState(ePathFailed, r->Now()); }); + r->loop()->call([r, self = shared_from_this()]() { self->EnterState(ePathFailed, r->Now()); }); } // TODO: meaningful return value? @@ -439,7 +436,7 @@ namespace llarp msg.pathid = TXID(); ++idx; } - LogicCall(r->logic(), [self = shared_from_this(), data = std::move(sendmsgs), r]() { + r->loop()->call([self = shared_from_this(), data = std::move(sendmsgs), r] () mutable { self->HandleAllUpstream(std::move(data), r); }); } @@ -509,7 +506,7 @@ namespace llarp sendMsgs[idx].X = buf; ++idx; } - LogicCall(r->logic(), [self = shared_from_this(), msgs = std::move(sendMsgs), r]() { + r->loop()->call([self = shared_from_this(), msgs = std::move(sendMsgs), r] () mutable { self->HandleAllDownstream(std::move(msgs), r); }); } diff --git a/llarp/path/path.hpp b/llarp/path/path.hpp index 39814d727..a1a97cd1d 100644 --- a/llarp/path/path.hpp +++ b/llarp/path/path.hpp @@ -28,7 +28,6 @@ namespace llarp { - class Logic; struct AbstractRouter; struct LR_CommitMessage; diff --git a/llarp/path/path_context.cpp b/llarp/path/path_context.cpp index d7fb966b3..75dbc6bcd 100644 --- a/llarp/path/path_context.cpp +++ b/llarp/path/path_context.cpp @@ -42,10 +42,10 @@ namespace llarp #endif } - std::shared_ptr - PathContext::logic() + const EventLoop_ptr& + PathContext::loop() { - return m_Router->logic(); + return m_Router->loop(); } const SecretKey& diff --git a/llarp/path/path_context.hpp b/llarp/path/path_context.hpp index a8c617804..cb9d08736 100644 --- a/llarp/path/path_context.hpp +++ b/llarp/path/path_context.hpp @@ -18,7 +18,6 @@ namespace llarp { - class Logic; struct AbstractRouter; struct LR_CommitMessage; struct RelayDownstreamMessage; @@ -147,8 +146,8 @@ namespace llarp } }; - std::shared_ptr - logic(); + const EventLoop_ptr& + loop(); AbstractRouter* Router(); diff --git a/llarp/path/pathbuilder.cpp b/llarp/path/pathbuilder.cpp index d9550ac98..63da90c86 100644 --- a/llarp/path/pathbuilder.cpp +++ b/llarp/path/pathbuilder.cpp @@ -7,7 +7,6 @@ #include #include #include -#include #include #include @@ -28,7 +27,7 @@ namespace llarp size_t idx = 0; AbstractRouter* router = nullptr; WorkerFunc_t work; - std::shared_ptr logic; + EventLoop_ptr loop; LR_CommitMessage LRCM; void @@ -97,21 +96,21 @@ namespace llarp { // farthest hop // TODO: encrypt junk frames because our public keys are not eligator - LogicCall(logic, std::bind(result, shared_from_this())); + loop->call([self = shared_from_this()] { self->result(self); }); } else { // next hop - work(std::bind(&AsyncPathKeyExchangeContext::GenerateNextKey, shared_from_this())); + work([self = shared_from_this()] { self->GenerateNextKey(); }); } } /// Generate all keys asynchronously and call handler when done void - AsyncGenerateKeys(Path_t p, std::shared_ptr l, WorkerFunc_t worker, Handler func) + AsyncGenerateKeys(Path_t p, EventLoop_ptr l, WorkerFunc_t worker, Handler func) { path = p; - logic = l; + loop = std::move(l); result = func; work = worker; @@ -119,7 +118,7 @@ namespace llarp { LRCM.frames[i].Randomize(); } - work(std::bind(&AsyncPathKeyExchangeContext::GenerateNextKey, shared_from_this())); + work([self = shared_from_this()] { self->GenerateNextKey(); }); } }; @@ -393,7 +392,7 @@ namespace llarp path->SetBuildResultHook([self](Path_ptr p) { self->HandlePathBuilt(p); }); ctx->AsyncGenerateKeys( path, - m_router->logic(), + m_router->loop(), [r = m_router](auto func) { r->QueueWork(std::move(func)); }, &PathBuilderKeysGenerated); } diff --git a/llarp/path/transit_hop.cpp b/llarp/path/transit_hop.cpp index 5c9e3c34a..9190abfcb 100644 --- a/llarp/path/transit_hop.cpp +++ b/llarp/path/transit_hop.cpp @@ -121,13 +121,10 @@ namespace llarp { auto flushIt = [self = shared_from_this(), r]() { std::vector msgs; - do + while (auto maybe = self->m_DownstreamGather.tryPopFront()) { - auto maybe = self->m_DownstreamGather.tryPopFront(); - if (not maybe) - break; - msgs.emplace_back(*maybe); - } while (true); + msgs.push_back(*maybe); + } self->HandleAllDownstream(std::move(msgs), r); }; for (auto& ev : *msgs) @@ -147,12 +144,12 @@ namespace llarp info.downstream); if (m_DownstreamGather.full()) { - LogicCall(r->logic(), flushIt); + r->loop()->call(flushIt); } if (m_DownstreamGather.enabled()) m_DownstreamGather.pushBack(msg); } - LogicCall(r->logic(), flushIt); + r->loop()->call(flushIt); } void @@ -160,13 +157,10 @@ namespace llarp { auto flushIt = [self = shared_from_this(), r]() { std::vector msgs; - do + while (auto maybe = self->m_UpstreamGather.tryPopFront()) { - auto maybe = self->m_UpstreamGather.tryPopFront(); - if (not maybe) - break; - msgs.emplace_back(*maybe); - } while (true); + msgs.push_back(*maybe); + } self->HandleAllUpstream(std::move(msgs), r); }; for (auto& ev : *msgs) @@ -179,12 +173,12 @@ namespace llarp msg.X = buf; if (m_UpstreamGather.full()) { - LogicCall(r->logic(), flushIt); + r->loop()->call(flushIt); } if (m_UpstreamGather.enabled()) m_UpstreamGather.pushBack(msg); } - LogicCall(r->logic(), flushIt); + r->loop()->call(flushIt); } void @@ -488,8 +482,7 @@ namespace llarp void TransitHop::QueueDestroySelf(AbstractRouter* r) { - auto func = std::bind(&TransitHop::SetSelfDestruct, shared_from_this()); - LogicCall(r->logic(), func); + r->loop()->call([self = shared_from_this()] { self->SetSelfDestruct(); }); } } // namespace path } // namespace llarp diff --git a/llarp/router/abstractrouter.hpp b/llarp/router/abstractrouter.hpp index 24bda166d..793fec6f7 100644 --- a/llarp/router/abstractrouter.hpp +++ b/llarp/router/abstractrouter.hpp @@ -29,7 +29,6 @@ namespace oxenmq namespace llarp { class NodeDB; - class Logic; struct Config; struct RouterID; struct ILinkMessage; @@ -101,9 +100,6 @@ namespace llarp virtual const std::shared_ptr& RpcClient() const = 0; - virtual const std::shared_ptr& - logic() const = 0; - virtual llarp_dht_context* dht() const = 0; @@ -135,7 +131,7 @@ namespace llarp routerProfiling() = 0; virtual const EventLoop_ptr& - netloop() const = 0; + loop() const = 0; /// call function in crypto worker virtual void QueueWork(std::function) = 0; diff --git a/llarp/router/outbound_message_handler.cpp b/llarp/router/outbound_message_handler.cpp index 7b926ae88..cf61d4fd1 100644 --- a/llarp/router/outbound_message_handler.cpp +++ b/llarp/router/outbound_message_handler.cpp @@ -113,11 +113,11 @@ namespace llarp void OutboundMessageHandler::Init( - ILinkManager* linkManager, I_RCLookupHandler* lookupHandler, std::shared_ptr logic) + ILinkManager* linkManager, I_RCLookupHandler* lookupHandler, EventLoop_ptr loop) { _linkManager = linkManager; _lookupHandler = lookupHandler; - _logic = logic; + _loop = std::move(loop); outboundMessageQueues.emplace(zeroID, MessageQueue()); } @@ -184,8 +184,8 @@ namespace llarp { if (callback) { - auto f = std::bind(callback, status); - LogicCall(_logic, [self = this, f]() { self->m_Killer.TryAccess(f); }); + auto f = [f=std::move(callback), status] { f(status); }; + _loop->call([this, f=std::move(f)] { m_Killer.TryAccess(f); }); } } diff --git a/llarp/router/outbound_message_handler.hpp b/llarp/router/outbound_message_handler.hpp index 44f1a8c76..f19e87f33 100644 --- a/llarp/router/outbound_message_handler.hpp +++ b/llarp/router/outbound_message_handler.hpp @@ -3,7 +3,7 @@ #include -#include +#include #include #include #include @@ -19,7 +19,6 @@ namespace llarp { struct ILinkManager; struct I_RCLookupHandler; - class Logic; enum class SessionResult; struct OutboundMessageHandler final : public IOutboundMessageHandler @@ -43,7 +42,7 @@ namespace llarp ExtractStatus() const override; void - Init(ILinkManager* linkManager, I_RCLookupHandler* lookupHandler, std::shared_ptr logic); + Init(ILinkManager* linkManager, I_RCLookupHandler* lookupHandler, EventLoop_ptr loop); private: using Message = std::pair, SendStatusHandler>; @@ -139,7 +138,7 @@ namespace llarp ILinkManager* _linkManager; I_RCLookupHandler* _lookupHandler; - std::shared_ptr _logic; + EventLoop_ptr _loop; util::ContentionKiller m_Killer; diff --git a/llarp/router/outbound_session_maker.cpp b/llarp/router/outbound_session_maker.cpp index 6461f9ccc..763353fe9 100644 --- a/llarp/router/outbound_session_maker.cpp +++ b/llarp/router/outbound_session_maker.cpp @@ -8,7 +8,6 @@ #include #include #include -#include #include #include #include @@ -163,16 +162,16 @@ namespace llarp ILinkManager* linkManager, I_RCLookupHandler* rcLookup, Profiling* profiler, - std::shared_ptr logic, + EventLoop_ptr loop, WorkerFunc_t dowork) { _router = router; _linkManager = linkManager; _rcLookup = rcLookup; - _logic = logic; + _loop = std::move(loop); _nodedb = router->nodedb(); _profiler = profiler; - work = dowork; + work = std::move(dowork); } void @@ -226,8 +225,7 @@ namespace llarp } if (ShouldConnectTo(router)) { - auto fn = std::bind(&OutboundSessionMaker::DoEstablish, this, router); - LogicCall(_logic, fn); + _loop->call([this, router] { DoEstablish(router); }); } } @@ -348,8 +346,7 @@ namespace llarp for (const auto& callback : movedCallbacks) { - auto func = std::bind(callback, router, type); - LogicCall(_logic, func); + _loop->call([callback, router, type] { return callback(router, type); }); } { diff --git a/llarp/router/outbound_session_maker.hpp b/llarp/router/outbound_session_maker.hpp index 8933be01a..bf4a39a9e 100644 --- a/llarp/router/outbound_session_maker.hpp +++ b/llarp/router/outbound_session_maker.hpp @@ -4,7 +4,6 @@ #include #include -#include #include #include @@ -60,7 +59,7 @@ namespace llarp ILinkManager* linkManager, I_RCLookupHandler* rcLookup, Profiling* profiler, - std::shared_ptr logic, + EventLoop_ptr loop, WorkerFunc_t work); void @@ -113,7 +112,7 @@ namespace llarp I_RCLookupHandler* _rcLookup = nullptr; Profiling* _profiler = nullptr; std::shared_ptr _nodedb; - std::shared_ptr _logic; + EventLoop_ptr _loop; WorkerFunc_t work; RouterID us; }; diff --git a/llarp/router/rc_lookup_handler.cpp b/llarp/router/rc_lookup_handler.cpp index 9c4dcd6de..b4750e62b 100644 --- a/llarp/router/rc_lookup_handler.cpp +++ b/llarp/router/rc_lookup_handler.cpp @@ -157,7 +157,7 @@ namespace llarp { LogDebug("Adding or updating RC for ", RouterID(rc.pubkey), " to nodedb and dht."); const RouterContact copy{rc}; - LogicCall(_logic, [copy, n = _nodedb]() { n->PutIfNewer(copy); }); + _loop->call([rc, n=_nodedb] { n->PutIfNewer(rc); }); _dht->impl->PutRCNodeAsync(rc); } @@ -301,7 +301,7 @@ namespace llarp RCLookupHandler::Init( llarp_dht_context* dht, std::shared_ptr nodedb, - std::shared_ptr logic, + EventLoop_ptr loop, WorkerFunc_t dowork, ILinkManager* linkManager, service::Context* hiddenServiceContext, @@ -311,9 +311,9 @@ namespace llarp bool isServiceNode_arg) { _dht = dht; - _nodedb = nodedb; - _logic = logic; - _work = dowork; + _nodedb = std::move(nodedb); + _loop = std::move(loop); + _work = std::move(dowork); _hiddenServiceContext = hiddenServiceContext; _strictConnectPubkeys = strictConnectPubkeys; _bootstrapRCList = bootstrapRCList; diff --git a/llarp/router/rc_lookup_handler.hpp b/llarp/router/rc_lookup_handler.hpp index 084142965..f248e2765 100644 --- a/llarp/router/rc_lookup_handler.hpp +++ b/llarp/router/rc_lookup_handler.hpp @@ -16,7 +16,7 @@ struct llarp_dht_context; namespace llarp { class NodeDB; - class Logic; + class EventLoop; namespace service { @@ -76,7 +76,7 @@ namespace llarp Init( llarp_dht_context* dht, std::shared_ptr nodedb, - std::shared_ptr logic, + std::shared_ptr loop, WorkerFunc_t dowork, ILinkManager* linkManager, service::Context* hiddenServiceContext, @@ -103,7 +103,7 @@ namespace llarp llarp_dht_context* _dht = nullptr; std::shared_ptr _nodedb; - std::shared_ptr _logic; + std::shared_ptr _loop; WorkerFunc_t _work = nullptr; service::Context* _hiddenServiceContext = nullptr; ILinkManager* _linkManager = nullptr; diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 60a0602a8..e18a10c79 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -47,11 +47,10 @@ static constexpr std::chrono::milliseconds ROUTER_TICK_INTERVAL = 1s; namespace llarp { Router::Router( - EventLoop_ptr netloop, std::shared_ptr l, std::shared_ptr vpnPlatform) + EventLoop_ptr loop, std::shared_ptr vpnPlatform) : ready(false) , m_lmq(std::make_shared()) - , _netloop(std::move(netloop)) - , _logic(std::move(l)) + , _loop(std::move(loop)) , _vpnPlatform(std::move(vpnPlatform)) , paths(this) , _exitContext(this) @@ -364,7 +363,7 @@ namespace llarp if (_onDown) _onDown(); LogInfo("closing router"); - _netloop->stop(); + _loop->stop(); _running.store(false); } @@ -563,19 +562,19 @@ namespace llarp LogInfo("Loaded ", bootstrapRCList.size(), " bootstrap routers"); // Init components after relevant config settings loaded - _outboundMessageHandler.Init(&_linkManager, &_rcLookupHandler, _logic); + _outboundMessageHandler.Init(&_linkManager, &_rcLookupHandler, _loop); _outboundSessionMaker.Init( this, &_linkManager, &_rcLookupHandler, &_routerProfiling, - _logic, + _loop, util::memFn(&AbstractRouter::QueueWork, this)); _linkManager.Init(&_outboundSessionMaker); _rcLookupHandler.Init( _dht, _nodedb, - _logic, + _loop, util::memFn(&AbstractRouter::QueueWork, this), &_linkManager, &_hiddenServiceContext, @@ -603,7 +602,7 @@ namespace llarp { auto server = iwp::NewInboundLink( m_keyManager, - netloop(), + loop(), util::memFn(&AbstractRouter::rc, this), util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, this), util::memFn(&AbstractRouter::Sign, this), @@ -618,7 +617,7 @@ namespace llarp const std::string& key = serverConfig.interface; int af = serverConfig.addressFamily; uint16_t port = serverConfig.port; - if (!server->Configure(netloop(), key, af, port)) + if (!server->Configure(loop(), key, af, port)) { throw std::runtime_error(stringify("failed to bind inbound link on ", key, " port ", port)); } @@ -1059,7 +1058,7 @@ namespace llarp } } _outboundSessionMaker.SetOurRouter(pubkey()); - if (!_linkManager.StartLinks(_logic)) + if (!_linkManager.StartLinks(_loop)) { LogWarn("One or more links failed to start."); return false; @@ -1120,11 +1119,11 @@ namespace llarp #ifdef _WIN32 // windows uses proactor event loop so we need to constantly pump - _netloop->add_ticker([this] { PumpLL(); }); + _loop->add_ticker([this] { PumpLL(); }); #else - _netloop->set_pump_function([this] { PumpLL(); }); + _loop->set_pump_function([this] { PumpLL(); }); #endif - _logic->call_every(ROUTER_TICK_INTERVAL, weak_from_this(), [this] { Tick(); }); + _loop->call_every(ROUTER_TICK_INTERVAL, weak_from_this(), [this] { Tick(); }); _running.store(true); _startedAt = Now(); #if defined(WITH_SYSTEMD) @@ -1161,7 +1160,7 @@ namespace llarp { StopLinks(); nodedb()->SaveToDisk(); - _logic->call_later(200ms, std::bind(&Router::AfterStopLinks, this)); + _loop->call_later(200ms, [this] { AfterStopLinks(); }); } void @@ -1208,7 +1207,7 @@ namespace llarp _exitContext.Stop(); paths.PumpUpstream(); _linkManager.PumpLinks(); - _logic->call_later(200ms, std::bind(&Router::AfterStopIssued, this)); + _loop->call_later(200ms, [this] { AfterStopIssued(); }); } bool @@ -1299,7 +1298,7 @@ namespace llarp { auto link = iwp::NewOutboundLink( m_keyManager, - netloop(), + loop(), util::memFn(&AbstractRouter::rc, this), util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, this), util::memFn(&AbstractRouter::Sign, this), @@ -1324,7 +1323,7 @@ namespace llarp for (const auto af : {AF_INET, AF_INET6}) { - if (not link->Configure(netloop(), "*", af, m_OutboundPort)) + if (not link->Configure(loop(), "*", af, m_OutboundPort)) continue; #if defined(ANDROID) diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp index 9b2738a78..df6b77008 100644 --- a/llarp/router/router.hpp +++ b/llarp/router/router.hpp @@ -35,7 +35,6 @@ #include #include #include -#include #include #include @@ -88,12 +87,6 @@ namespace llarp return m_lokidRpcClient; } - const std::shared_ptr& - logic() const override - { - return _logic; - } - llarp_dht_context* dht() const override { @@ -161,9 +154,9 @@ namespace llarp } const EventLoop_ptr& - netloop() const override + loop() const override { - return _netloop; + return _loop; } vpn::Platform* @@ -180,8 +173,7 @@ namespace llarp std::optional _ourAddress; - EventLoop_ptr _netloop; - std::shared_ptr _logic; + EventLoop_ptr _loop; std::shared_ptr _vpnPlatform; path::PathContext paths; exit::Context _exitContext; @@ -325,8 +317,7 @@ namespace llarp GossipRCIfNeeded(const RouterContact rc) override; explicit Router( - EventLoop_ptr netloop, - std::shared_ptr logic, + EventLoop_ptr loop, std::shared_ptr vpnPlatform); ~Router() override; diff --git a/llarp/rpc/endpoint_rpc.cpp b/llarp/rpc/endpoint_rpc.cpp index 4c621eb71..55cc342b3 100644 --- a/llarp/rpc/endpoint_rpc.cpp +++ b/llarp/rpc/endpoint_rpc.cpp @@ -29,7 +29,7 @@ namespace llarp::rpc }, [self = shared_from_this()](oxenmq::ConnectionID, std::string_view fail) { LogWarn("failed to connect to endpoint auth server: ", fail); - self->m_Endpoint->Logic()->call_later(1s, [self] { self->Start(); }); + self->m_Endpoint->Loop()->call_later(1s, [self] { self->Start(); }); }); } @@ -44,11 +44,10 @@ namespace llarp::rpc std::shared_ptr msg, std::function hook) { - assert(m_Endpoint->Logic()->inLogicThread()); service::ConvoTag tag = msg->tag; m_PendingAuths.insert(tag); const auto from = msg->sender.Addr(); - auto reply = m_Endpoint->Logic()->make_caller([this, tag, hook](service::AuthResult result) { + auto reply = m_Endpoint->Loop()->make_caller([this, tag, hook](service::AuthResult result) { m_PendingAuths.erase(tag); hook(result); }); diff --git a/llarp/rpc/lokid_rpc_client.cpp b/llarp/rpc/lokid_rpc_client.cpp index 25dd3324f..d14143c15 100644 --- a/llarp/rpc/lokid_rpc_client.cpp +++ b/llarp/rpc/lokid_rpc_client.cpp @@ -8,7 +8,6 @@ #include #include -#include namespace llarp { @@ -57,7 +56,7 @@ namespace llarp [self = shared_from_this()](oxenmq::ConnectionID) { self->Connected(); }, [self = shared_from_this(), url](oxenmq::ConnectionID, std::string_view f) { llarp::LogWarn("Failed to connect to lokid: ", f); - LogicCall(self->m_Router->logic(), [self, url]() { self->ConnectAsync(url); }); + self->m_Router->loop()->call([self, url]() { self->ConnectAsync(url); }); }); } @@ -170,7 +169,7 @@ namespace llarp return; } // inform router about the new list - LogicCall(m_Router->logic(), [r = m_Router, nodeList = std::move(nodeList)]() mutable { + m_Router->loop()->call([r = m_Router, nodeList = std::move(nodeList)]() mutable { r->SetRouterWhitelist(std::move(nodeList)); }); } @@ -252,7 +251,7 @@ namespace llarp LogError("failed to parse response from lns lookup: ", ex.what()); } } - LogicCall(r->logic(), [resultHandler, maybe]() { resultHandler(maybe); }); + r->loop()->call([resultHandler, maybe=std::move(maybe)]() { resultHandler(std::move(maybe)); }); }, req.dump()); } diff --git a/llarp/rpc/rpc_server.cpp b/llarp/rpc/rpc_server.cpp index d91cdbbcb..702835883 100644 --- a/llarp/rpc/rpc_server.cpp +++ b/llarp/rpc/rpc_server.cpp @@ -1,6 +1,5 @@ #include "rpc_server.hpp" #include -#include #include #include #include @@ -106,7 +105,7 @@ namespace llarp::rpc .add_request_command( "status", [&](oxenmq::Message& msg) { - LogicCall(m_Router->logic(), [defer = msg.send_later(), r = m_Router]() { + m_Router->loop()->call([defer = msg.send_later(), r = m_Router]() { std::string data; if (r->IsRunning()) { @@ -158,7 +157,7 @@ namespace llarp::rpc reply(CreateJSONError("no action taken")); return; } - LogicCall(r->logic(), [r, endpoint, kills, reply]() { + r->loop()->call([r, endpoint, kills, reply]() { auto ep = r->hiddenServiceContext().GetEndpointByName(endpoint); if (ep == nullptr) { @@ -236,8 +235,7 @@ namespace llarp::rpc { endpoint = endpoint_itr->get(); } - LogicCall( - r->logic(), [map, exit, lnsExit, range, token, endpoint, r, reply]() mutable { + r->loop()->call([map, exit, lnsExit, range, token, endpoint, r, reply]() mutable { auto ep = r->hiddenServiceContext().GetEndpointByName(endpoint); if (ep == nullptr) { @@ -274,6 +272,7 @@ namespace llarp::rpc } ctx->AsyncSendAuth( [onGoodResult, reply](service::AuthResult result) { + // TODO: refactor this code. We are 5 lambdas deep here! if (result.code != service::AuthResultCode::eAuthAccepted) { reply(CreateJSONError(result.reason)); diff --git a/llarp/service/async_key_exchange.cpp b/llarp/service/async_key_exchange.cpp index bb9f226f8..cd98e9924 100644 --- a/llarp/service/async_key_exchange.cpp +++ b/llarp/service/async_key_exchange.cpp @@ -3,7 +3,6 @@ #include #include #include -#include #include namespace llarp @@ -11,7 +10,7 @@ namespace llarp namespace service { AsyncKeyExchange::AsyncKeyExchange( - std::shared_ptr l, + EventLoop_ptr l, ServiceInfo r, const Identity& localident, const PQPubKey& introsetPubKey, @@ -19,7 +18,7 @@ namespace llarp IDataHandler* h, const ConvoTag& t, ProtocolType proto) - : logic(std::move(l)) + : loop(std::move(l)) , m_remote(std::move(r)) , m_LocalIdentity(localident) , introPubKey(introsetPubKey) @@ -74,7 +73,7 @@ namespace llarp self->msg.version = LLARP_PROTO_VERSION; // encrypt and sign if (frame->EncryptAndSign(self->msg, K, self->m_LocalIdentity)) - LogicCall(self->logic, std::bind(&AsyncKeyExchange::Result, self, frame)); + self->loop->call([self, frame] { AsyncKeyExchange::Result(self, frame); }); else { LogError("failed to encrypt and sign"); diff --git a/llarp/service/async_key_exchange.hpp b/llarp/service/async_key_exchange.hpp index 1b401f660..7cbaae9aa 100644 --- a/llarp/service/async_key_exchange.hpp +++ b/llarp/service/async_key_exchange.hpp @@ -7,13 +7,11 @@ namespace llarp { - class Logic; - namespace service { struct AsyncKeyExchange : public std::enable_shared_from_this { - std::shared_ptr logic; + EventLoop_ptr loop; SharedSecret sharedKey; ServiceInfo m_remote; const Identity& m_LocalIdentity; @@ -26,7 +24,7 @@ namespace llarp ConvoTag tag; AsyncKeyExchange( - std::shared_ptr l, + EventLoop_ptr l, ServiceInfo r, const Identity& localident, const PQPubKey& introsetPubKey, diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index a1b714cf7..279257589 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -21,7 +21,6 @@ #include #include #include -#include #include #include #include @@ -719,13 +718,13 @@ namespace llarp { if (not msg->foundRCs.empty()) { - for (auto rc : msg->foundRCs) + for (auto& rc : msg->foundRCs) { - Router()->QueueWork([rc = std::move(rc), logic = Router()->logic(), self = this, msg]() { + Router()->QueueWork([this, rc, msg]() mutable { bool valid = rc.Verify(llarp::time_now_ms()); - LogicCall(logic, [self, valid, rc = std::move(rc), msg]() { - self->Router()->nodedb()->PutIfNewer(rc); - self->HandleVerifyGotRouter(msg, rc.pubkey, valid); + Router()->loop()->call([this, valid, rc = std::move(rc), msg] { + Router()->nodedb()->PutIfNewer(rc); + HandleVerifyGotRouter(msg, rc.pubkey, valid); }); }); } @@ -920,8 +919,7 @@ namespace llarp { if (m_RecvQueue.full() || m_RecvQueue.empty()) { - auto self = this; - LogicCall(m_router->logic(), [self]() { self->FlushRecvData(); }); + m_router->loop()->call([this] { FlushRecvData(); }); } m_RecvQueue.pushBack(std::move(ev)); } @@ -990,7 +988,7 @@ namespace llarp } else { - Router()->logic()->Call([h = std::move(hook)] { + Router()->loop()->call([h = std::move(hook)] { h({AuthResultCode::eAuthAccepted, "OK"}); }); } @@ -1072,7 +1070,7 @@ namespace llarp RemoveConvoTag(frame.T); return true; } - if (not frame.AsyncDecryptAndVerify(Router()->logic(), p, m_Identity, this)) + if (not frame.AsyncDecryptAndVerify(Router()->loop(), p, m_Identity, this)) { // send reset convo tag message ProtocolFrame f; @@ -1509,10 +1507,10 @@ namespace llarp return m_state->m_Router; } - const std::shared_ptr& - Endpoint::Logic() + const EventLoop_ptr& + Endpoint::Loop() { - return Router()->logic(); + return Router()->loop(); } void diff --git a/llarp/service/endpoint.hpp b/llarp/service/endpoint.hpp index 5f0bf64b3..3680be09d 100644 --- a/llarp/service/endpoint.hpp +++ b/llarp/service/endpoint.hpp @@ -18,7 +18,6 @@ #include #include #include -#include #include #include @@ -136,10 +135,10 @@ namespace llarp void ResetInternalState() override; - /// logic (via router) + /// loop (via router) /// use when sending any data on a path - const std::shared_ptr& - Logic(); + const EventLoop_ptr& + Loop(); AbstractRouter* Router(); diff --git a/llarp/service/lookup.cpp b/llarp/service/lookup.cpp index 982e45010..d9ba90768 100644 --- a/llarp/service/lookup.cpp +++ b/llarp/service/lookup.cpp @@ -3,7 +3,6 @@ #include #include #include -#include #include namespace llarp @@ -25,8 +24,7 @@ namespace llarp auto msg = BuildRequestMessage(); if (!msg) return false; - endpoint = path->Endpoint(); - LogicCall(r->logic(), [=]() { path->SendRoutingMessage(*msg, r); }); + r->loop()->call([path=std::move(path), msg=std::move(msg), r] { path->SendRoutingMessage(*msg, r); }); return true; } } // namespace service diff --git a/llarp/service/outbound_context.cpp b/llarp/service/outbound_context.cpp index cedbe1f6e..dfb6c7514 100644 --- a/llarp/service/outbound_context.cpp +++ b/llarp/service/outbound_context.cpp @@ -202,7 +202,7 @@ namespace llarp currentConvoTag.Randomize(); auto frame = std::make_shared(); auto ex = std::make_shared( - m_Endpoint->Logic(), + m_Endpoint->Loop(), remoteIdent, m_Endpoint->GetIdentity(), currentIntroSet.K, @@ -211,12 +211,12 @@ namespace llarp currentConvoTag, t); - ex->hook = std::bind(&OutboundContext::Send, shared_from_this(), std::placeholders::_1, path); + ex->hook = [self = shared_from_this(), path](auto frame) { self->Send(std::move(frame), path); }; ex->msg.PutBuffer(payload); ex->msg.introReply = path->intro; frame->F = ex->msg.introReply.pathID; - m_Endpoint->Router()->QueueWork(std::bind(&AsyncKeyExchange::Encrypt, ex, frame)); + m_Endpoint->Router()->QueueWork([ex, frame] { return AsyncKeyExchange::Encrypt(ex, frame); }); } std::string @@ -581,7 +581,7 @@ namespace llarp }; } const auto& ident = m_Endpoint->GetIdentity(); - if (not frame.AsyncDecryptAndVerify(m_Endpoint->Logic(), p, ident, m_Endpoint, hook)) + if (not frame.AsyncDecryptAndVerify(m_Endpoint->Loop(), p, ident, m_Endpoint, hook)) { // send reset convo tag message ProtocolFrame f; diff --git a/llarp/service/protocol.cpp b/llarp/service/protocol.cpp index 14339c21c..55d79627b 100644 --- a/llarp/service/protocol.cpp +++ b/llarp/service/protocol.cpp @@ -4,7 +4,6 @@ #include #include #include -#include #include #include #include @@ -271,7 +270,7 @@ namespace llarp struct AsyncFrameDecrypt { path::Path_ptr path; - std::shared_ptr logic; + EventLoop_ptr loop; std::shared_ptr msg; const Identity& m_LocalIdentity; Endpoint* handler; @@ -279,13 +278,13 @@ namespace llarp const Introduction fromIntro; AsyncFrameDecrypt( - std::shared_ptr l, + EventLoop_ptr l, const Identity& localIdent, Endpoint* h, std::shared_ptr m, const ProtocolFrame& f, const Introduction& recvIntro) - : logic(std::move(l)) + : loop(std::move(l)) , msg(std::move(m)) , m_LocalIdentity(localIdent) , handler(h) @@ -403,7 +402,7 @@ namespace llarp bool ProtocolFrame::AsyncDecryptAndVerify( - std::shared_ptr logic, + EventLoop_ptr loop, path::Path_ptr recvPath, const Identity& localIdent, Endpoint* handler, @@ -416,9 +415,9 @@ namespace llarp LogInfo("Got protocol frame with new convo"); // we need to dh auto dh = std::make_shared( - logic, localIdent, handler, msg, *this, recvPath->intro); + loop, localIdent, handler, msg, *this, recvPath->intro); dh->path = recvPath; - handler->Router()->QueueWork(std::bind(&AsyncFrameDecrypt::Work, dh)); + handler->Router()->QueueWork([dh = std::move(dh)] { return AsyncFrameDecrypt::Work(dh); }); return true; } @@ -436,10 +435,10 @@ namespace llarp return false; } v->frame = *this; - auto callback = [logic, hook](std::shared_ptr msg) { + auto callback = [loop, hook](std::shared_ptr msg) { if (hook) { - LogicCall(logic, [msg, hook]() { hook(msg); }); + loop->call([msg, hook]() { hook(msg); }); } }; handler->Router()->QueueWork( diff --git a/llarp/service/protocol.hpp b/llarp/service/protocol.hpp index 0e9177dd0..a26834892 100644 --- a/llarp/service/protocol.hpp +++ b/llarp/service/protocol.hpp @@ -20,8 +20,6 @@ struct llarp_threadpool; namespace llarp { - class Logic; - namespace path { /// forward declare @@ -127,7 +125,7 @@ namespace llarp bool AsyncDecryptAndVerify( - std::shared_ptr logic, + EventLoop_ptr loop, path::Path_ptr fromPath, const Identity& localIdent, Endpoint* handler, diff --git a/llarp/service/sendcontext.cpp b/llarp/service/sendcontext.cpp index 725752c9a..012b1d691 100644 --- a/llarp/service/sendcontext.cpp +++ b/llarp/service/sendcontext.cpp @@ -3,7 +3,6 @@ #include #include #include -#include #include #include @@ -29,7 +28,7 @@ namespace llarp { if (m_SendQueue.empty() or m_SendQueue.full()) { - LogicCall(m_Endpoint->Logic(), [this] { FlushUpstream(); }); + m_Endpoint->Loop()->call([this] { FlushUpstream(); }); } m_SendQueue.pushBack(std::make_pair( std::make_shared(*msg, remoteIntro.pathID), path)); diff --git a/llarp/tooling/hive_context.cpp b/llarp/tooling/hive_context.cpp index 98c1502d1..8171e321f 100644 --- a/llarp/tooling/hive_context.cpp +++ b/llarp/tooling/hive_context.cpp @@ -8,9 +8,9 @@ namespace tooling {} std::shared_ptr - HiveContext::makeRouter(llarp::EventLoop_ptr netloop, std::shared_ptr logic) + HiveContext::makeRouter(llarp::EventLoop_ptr loop) { - return std::make_shared(netloop, logic, makeVPNPlatform(), m_hive); + return std::make_shared(loop, makeVPNPlatform(), m_hive); } HiveRouter* diff --git a/llarp/tooling/hive_context.hpp b/llarp/tooling/hive_context.hpp index 42c998287..6a394c8b0 100644 --- a/llarp/tooling/hive_context.hpp +++ b/llarp/tooling/hive_context.hpp @@ -12,7 +12,7 @@ namespace tooling HiveContext(RouterHive* hive); std::shared_ptr - makeRouter(llarp::EventLoop_ptr netloop, std::shared_ptr logic) override; + makeRouter(llarp::EventLoop_ptr loop) override; /// Get this context's router as a HiveRouter. /// diff --git a/llarp/tooling/hive_router.cpp b/llarp/tooling/hive_router.cpp index 54ccb8a17..dd2c90e38 100644 --- a/llarp/tooling/hive_router.cpp +++ b/llarp/tooling/hive_router.cpp @@ -5,11 +5,10 @@ namespace tooling { HiveRouter::HiveRouter( - llarp::EventLoop_ptr netloop, - std::shared_ptr logic, + llarp::EventLoop_ptr loop, std::shared_ptr plat, RouterHive* hive) - : Router(netloop, logic, plat), m_hive(hive) + : Router(loop, plat), m_hive(hive) {} bool diff --git a/llarp/tooling/hive_router.hpp b/llarp/tooling/hive_router.hpp index 6aaf571c8..ce8d9c6f7 100644 --- a/llarp/tooling/hive_router.hpp +++ b/llarp/tooling/hive_router.hpp @@ -11,8 +11,7 @@ namespace tooling struct HiveRouter : public llarp::Router { explicit HiveRouter( - llarp::EventLoop_ptr netloop, - std::shared_ptr logic, + llarp::EventLoop_ptr loop, std::shared_ptr vpnPlatform, RouterHive* hive); diff --git a/llarp/tooling/router_hive.cpp b/llarp/tooling/router_hive.cpp index 520d67600..b4e933615 100644 --- a/llarp/tooling/router_hive.cpp +++ b/llarp/tooling/router_hive.cpp @@ -2,7 +2,6 @@ #include "llarp.h" #include "llarp.hpp" -#include "util/thread/logic.hpp" #include "util/str.hpp" #include "router/abstractrouter.hpp" @@ -73,13 +72,13 @@ namespace tooling RouterHive::StopRouters() { llarp::LogInfo("Signalling all routers to stop"); - for (auto [routerId, ctx] : relays) + for (auto& [routerId, ctx] : relays) { - LogicCall(ctx->logic, [ctx]() { ctx->HandleSignal(SIGINT); }); + ctx->mainloop->call([ctx=ctx]() { ctx->HandleSignal(SIGINT); }); } - for (auto [routerId, ctx] : clients) + for (auto& [routerId, ctx] : clients) { - LogicCall(ctx->logic, [ctx]() { ctx->HandleSignal(SIGINT); }); + ctx->mainloop->call([ctx=ctx]() { ctx->HandleSignal(SIGINT); }); } llarp::LogInfo("Waiting on routers to be stopped"); @@ -149,8 +148,8 @@ namespace tooling void RouterHive::VisitRouter(Context_ptr ctx, std::function visit) { - // TODO: this should be called from each router's appropriate Logic thread, e.g.: - // LogicCall(ctx->logic, [visit, ctx]() { visit(ctx); }); + // TODO: this should be called from each router's appropriate Loop, e.g.: + // ctx->mainloop->call([visit, ctx]() { visit(ctx); }); // however, this causes visit calls to be deferred visit(ctx); } @@ -172,18 +171,18 @@ namespace tooling std::vector RouterHive::RelayConnectedRelays() { - std::lock_guard guard{routerMutex}; + std::lock_guard guard{routerMutex}; std::vector results; results.resize(relays.size()); std::mutex results_lock; size_t i = 0; size_t done_count = 0; - for (auto [routerId, ctx] : relays) + for (auto& [routerId, ctx] : relays) { - LogicCall(ctx->logic, [&, i, ctx]() { + ctx->mainloop->call([&, i, ctx=ctx]() { size_t count = ctx->router->NumberOfConnectedRouters(); - std::lock_guard guard{results_lock}; + std::lock_guard guard{results_lock}; results[i] = count; done_count++; }); @@ -194,7 +193,7 @@ namespace tooling { size_t read_done_count = 0; { - std::lock_guard guard{results_lock}; + std::lock_guard guard{results_lock}; read_done_count = done_count; } if (read_done_count == relays.size()) diff --git a/llarp/util/thread/logic.cpp b/llarp/util/thread/logic.cpp deleted file mode 100644 index c20cb00c6..000000000 --- a/llarp/util/thread/logic.cpp +++ /dev/null @@ -1,48 +0,0 @@ -#include -#include -#include - -#include - -namespace llarp -{ - void - Logic::stop() - { - llarp::LogDebug("logic thread stop"); - } - - void - Logic::Call(std::function func) - { - m_Queue(std::move(func)); - } - - void - Logic::SetQueuer(std::function)> q) - { - m_Queue = std::move(q); - } - - void - Logic::call_later(llarp_time_t timeout, std::function func) - { - Call([this, timeout, f = std::move(func)]() mutable { - m_Loop->call_after_delay(timeout, std::move(f)); - }); - } - - void - Logic::set_event_loop(EventLoop* loop) - { - m_Loop = loop; - SetQueuer([loop](std::function work) { loop->call_soon(work); }); - } - - void - Logic::clear_event_loop() - { - m_Loop = nullptr; - } - -} // namespace llarp diff --git a/llarp/util/thread/logic.hpp b/llarp/util/thread/logic.hpp deleted file mode 100644 index a05ab1819..000000000 --- a/llarp/util/thread/logic.hpp +++ /dev/null @@ -1,114 +0,0 @@ -#ifndef LLARP_LOGIC_HPP -#define LLARP_LOGIC_HPP - -#include -#include - -namespace llarp -{ - class Logic - { - public: - /// stop all operation and wait for that to die - void - stop(); - - void - Call(std::function func); - - // Calls the given function once, after the given delay. - void - call_later(llarp_time_t later, std::function func); - - // Calls the given function repeatedly, forever, as long as the event loop lasts; the initial - // call will be after the given delay. - void - call_forever(llarp_time_t repeat, std::function func); - - // Created a repeated timer, like call_forever(repeat, func), but ties the lifetime of the - // callback to `owner`: callbacks will be invoked so long as `owner` remains alive, but - // thereafter the callback will be destroyed. Intended to be used as: - // - // logic->call_every(100ms, shared_from_this(), [this] { some_func(); }); - // - template - void - call_every(llarp_time_t repeat, std::weak_ptr owner, Callable f) - { - auto repeater = m_Loop->make_repeater(); - auto& r = *repeater; - r.start( - repeat, - [repeater = std::move(repeater), owner = std::move(owner), f = std::move(f)]() mutable { - if (auto ptr = owner.lock()) - f(); - else - repeater.reset(); // Remove timer on destruction (we should be the only thing holder - // the repeater) - }); - } - - // Wraps a lambda with a lambda that triggers it to be called via Logic::Call() - // when invoked. E.g.: - // - // auto x = logic->make_caller([] (int a) { std::cerr << a; }); - // x(42); - // x(99); - // - // will schedule two calls of the inner lambda (with different arguments) in the logic thread. - // Arguments are forwarded to the inner lambda (allowing moving arguments into it). - template - auto - make_caller(Callable&& f) - { - return [this, f = std::forward(f)](auto&&... args) { - // This shared pointer in a pain in the ass but needed because this lambda is going into a - // std::function that only accepts copyable lambdas. I *want* to simply capture: - // args=std::make_tuple(std::forward(args)...) - // but that fails if any given args aren't copyable. Dammit. - auto args_tuple_ptr = std::make_shared...>>( - std::forward(args)...); - Call([f, args = std::move(args_tuple_ptr)]() mutable { - // Moving away the tuple args here is okay because this lambda will only be invoked once - std::apply(f, std::move(*args)); - }); - }; - } - - void - SetQueuer(std::function)> q); - - EventLoop* - event_loop() - { - return m_Loop; - } - - void - set_event_loop(EventLoop* loop); - - void - clear_event_loop(); - - bool - inLogicThread() const - { - return m_Loop and m_Loop->inEventLoopThread(); - } - - private: - EventLoop* m_Loop = nullptr; - std::function)> m_Queue; - }; -} // namespace llarp - -/// this used to be a macro -template -static bool -LogicCall(const Logic_ptr& logic, Func_t func) -{ - logic->Call(std::move(func)); - return true; -} - -#endif diff --git a/test/iwp/test_iwp_session.cpp b/test/iwp/test_iwp_session.cpp index 09c011f6b..9d5241bd0 100644 --- a/test/iwp/test_iwp_session.cpp +++ b/test/iwp/test_iwp_session.cpp @@ -20,16 +20,14 @@ namespace iwp = llarp::iwp; namespace util = llarp::util; /// make an iwp link -template +template static llarp::LinkLayer_ptr -make_link(Args_t... args) +make_link(Args&&... args) { if (inbound) - return iwp::NewInboundLink(args...); - else - return iwp::NewOutboundLink(args...); + return iwp::NewInboundLink(std::forward(args)...); + return iwp::NewOutboundLink(std::forward(args)...); } -using Logic_ptr = std::shared_ptr; /// a single iwp link with associated keys and members to make unit tests work struct IWPLinkContext @@ -135,7 +133,7 @@ using Context_ptr = std::shared_ptr; /// call take 2 parameters, test and a timeout /// /// test is a callable that takes 5 arguments: -/// 0) std::function that starts the iwp links and gives a logic to call with +/// 0) std::function that starts the iwp links and gives an event loop to call with /// 1) std::function that ends the unit test if we are done /// 2) std::function that ends the unit test right now as a success /// 3) client iwp link context (shared_ptr) @@ -150,10 +148,7 @@ RunIWPTest(Func_t test, Duration_t timeout = 10s) // shut up logs llarp::LogSilencer shutup; // set up event loop - auto logic = std::make_shared(); auto loop = llarp::EventLoop::create(); - loop->set_logic(logic); - logic->set_event_loop(loop.get()); llarp::LogContext::Instance().Initialize( llarp::eLogDebug, llarp::LogType::File, "stdout", "unit test", [loop](auto work) { @@ -174,33 +169,32 @@ RunIWPTest(Func_t test, Duration_t timeout = 10s) auto recipiant = std::make_shared("127.0.0.1:3002", loop); // function for ending unit test on success - auto endIfDone = [initiator, recipiant, loop, logic]() { + auto endIfDone = [initiator, recipiant, loop]() { if (initiator->gucci and recipiant->gucci) { - LogicCall(logic, [loop] { loop->stop(); }); + loop->stop(); } }; - // function to start test and give logic to unit test - auto start = [initiator, recipiant, logic]() { - REQUIRE(initiator->link->Start(logic)); - REQUIRE(recipiant->link->Start(logic)); - return logic; + // function to start test and give loop to unit test + auto start = [initiator, recipiant, loop]() { + REQUIRE(initiator->link->Start(loop)); + REQUIRE(recipiant->link->Start(loop)); + return loop; }; // function to end test immediately - auto endTest = [logic, loop]() { LogicCall(logic, [loop] { loop->stop(); }); }; + auto endTest = [loop] { loop->stop(); }; - loop->call_after_delay( - std::chrono::duration_cast(timeout), []() { FAIL("test timeout"); }); + loop->call_later(timeout, [] { FAIL("test timeout"); }); test(start, endIfDone, endTest, initiator, recipiant); - loop->run(*logic); + loop->run(); llarp::RouterContact::BlockBogons = oldBlockBogons; } /// ensure clients can connect to relays TEST_CASE("IWP handshake", "[iwp]") { - RunIWPTest([](std::function start, + RunIWPTest([](std::function start, std::function endIfDone, [[maybe_unused]] std::function endTestNow, Context_ptr alice, @@ -218,16 +212,16 @@ TEST_CASE("IWP handshake", "[iwp]") endIfDone(); }); // start unit test - auto logic = start(); + auto loop = start(); // try establishing a session - LogicCall(logic, [link = alice->link, rc = bob->rc]() { REQUIRE(link->TryEstablishTo(rc)); }); + loop->call([link = alice->link, rc = bob->rc]() { REQUIRE(link->TryEstablishTo(rc)); }); }); } /// ensure relays cannot connect to clients TEST_CASE("IWP handshake reverse", "[iwp]") { - RunIWPTest([](std::function start, + RunIWPTest([](std::function start, [[maybe_unused]] std::function endIfDone, std::function endTestNow, Context_ptr alice, @@ -235,9 +229,9 @@ TEST_CASE("IWP handshake reverse", "[iwp]") alice->InitLink([](auto) {}); bob->InitLink([](auto) {}); // start unit test - auto logic = start(); + auto loop = start(); // try establishing a session in the wrong direction - LogicCall(logic, [logic, link = bob->link, rc = alice->rc, endTestNow]() { + loop->call([link = bob->link, rc = alice->rc, endTestNow] { REQUIRE(not link->TryEstablishTo(rc)); endTestNow(); }); @@ -249,7 +243,7 @@ TEST_CASE("IWP send messages", "[iwp]") { int aliceNumSent = 0; int bobNumSent = 0; - RunIWPTest([&aliceNumSent, &bobNumSent](std::function start, + RunIWPTest([&aliceNumSent, &bobNumSent](std::function start, std::function endIfDone, std::function endTestNow, Context_ptr alice, @@ -309,9 +303,9 @@ TEST_CASE("IWP send messages", "[iwp]") } }); // start unit test - auto logic = start(); + auto loop = start(); // try establishing a session from alice to bob - LogicCall(logic, [logic, link = alice->link, rc = bob->rc, endTestNow]() { + loop->call([link = alice->link, rc = bob->rc, endTestNow]() { REQUIRE(link->TryEstablishTo(rc)); }); });