Replace libuv with uvw & related refactoring

- removes all the llarp_ev_* functions, replacing with methods/classes/functions in the llarp
  namespace.
- banish ev/ev.h to the void
- Passes various things by const lvalue ref, especially shared_ptr's that don't need to be copied
  (to avoid an atomic refcount increment/decrement).
- Add a llarp::UDPHandle abstract class for UDP handling
- Removes the UDP tick handler; code that needs tick can just do a separate handler on the event
  loop outside the UDP socket.
- Adds an "OwnedBuffer" which owns its own memory but is implicitly convertible to a llarp_buffer_t.
  This is mostly needed to take over ownership of buffers from uvw without copying them as,
  currently, uvw does its own allocation (pending some open upstream issues/PRs).
- Logic:
  - add `make_caller`/`call_forever`/`call_every` utility functions to abstract Call wrapping and
    dependent timed tasks.
  - Add inLogicThread() so that code can tell its inside the logic thread (typically for
    debugging assertions).
  - get rid of janky integer returns and dealing with cancellations on call_later: the other methods
    added here and the event loop code remove the need for them.
- Event loop:
  - redo everything with uvw instead of libuv
  - rename EventLoopWakeup::Wakeup to EventLoopWakeup::Trigger to better reflect what it does.
  - add EventLoopRepeater for repeated events, and replace the code that reschedules itself every
    time it is called with a repeater.
  - Split up `EventLoop::run()` into a non-virtual base method and abstract `run_loop()` methods;
    the base method does a couple extra setup/teardown things that don't need to be in the derived class.
  - udp_listen is replaced with ev->udp(...) which returns a new UDPHandle object rather that
    needing gross C-style-but-not-actually-C-compatible structs.
  - Remove unused register_poll_fd_(un)readable
  - Use shared_ptr for EventLoopWakeup rather than returning a raw pointer; uvw lets us not have to
    worry about having the event loop class maintain ownership of it.
  - Add factory EventLoop::create() function to create a default (uvw-based) event loop (previously
    this was one of the llarp_ev_blahblah unnamespaced functions).
  - ev_libuv: this is mostly rewritten; all of the glue code/structs, in particular, are gone as
    they are no longer needed with uvw.
- DNS:
  - Rename DnsHandler to DnsInterceptor to better describe what it does (this is the code that
    intercepts all DNS to the tun IP range for Android).
- endpoint:
  - remove unused "isolated network" code
  - remove distinct (but actually always the same) variables for router/endpoint logic objects
- llarp_buffer_t
  - make constructors type-safe against being called with points to non-size-1 values
- tun packet reading:
  - read all available packets off the device/file descriptor; previously we were reading one packet
    at a time then returning to the event loop to poll again.
  - ReadNextPacket() now returns a 0-size packet if the read would block (so that we can implement
    the previous point).
  - ReadNextPacket() now throws on I/O error
- Miscellaneous code cleanups/simplifications
pull/1557/head
Jason Rhinelander 3 years ago
parent c71d3527bd
commit 5b555ee5aa

3
.gitmodules vendored

@ -26,3 +26,6 @@
[submodule "external/oxen-mq"]
path = external/oxen-mq
url = https://github.com/oxen-io/oxen-mq
[submodule "external/uvw"]
path = external/uvw
url = https://github.com/skypjack/uvw.git

@ -316,6 +316,7 @@ if(SUBMODULE_CHECK)
check_submodule(external/pybind11)
check_submodule(external/sqlite_orm)
check_submodule(external/oxen-mq)
check_submodule(external/uvw)
endif()
endif()

1
external/uvw vendored

@ -0,0 +1 @@
Subproject commit 58b299ee60d62386a2339dab3f99d30570b33085

@ -47,6 +47,10 @@ if(ANDROID)
target_link_libraries(lokinet-util PUBLIC log)
endif()
add_library(uvw INTERFACE)
target_include_directories(uvw INTERFACE ${PROJECT_SOURCE_DIR}/external/uvw/src)
target_link_libraries(uvw INTERFACE libuv)
add_library(lokinet-platform
# for networking
ev/ev.cpp
@ -63,7 +67,7 @@ add_library(lokinet-platform
vpn/platform.cpp
)
target_link_libraries(lokinet-platform PUBLIC lokinet-cryptography lokinet-util Threads::Threads base_libs libuv)
target_link_libraries(lokinet-platform PUBLIC lokinet-cryptography lokinet-util Threads::Threads base_libs uvw)
if (ANDROID)
@ -81,7 +85,6 @@ endif()
if (WIN32)
target_sources(lokinet-platform PRIVATE
ev/ev_libuv.cpp
win32/win32_inet.c
win32/win32_intrnl.c)

@ -1,5 +1,6 @@
#include <llarp.hpp>
#include <constants/version.hpp>
#include <constants/evloop.hpp>
#include <config/config.hpp>
#include <crypto/crypto_libsodium.hpp>
@ -64,7 +65,7 @@ namespace llarp
if (mainloop == nullptr)
{
auto jobQueueSize = std::max(event_loop_queue_size, config->router.m_JobQueueSize);
mainloop = llarp_make_ev_loop(jobQueueSize);
mainloop = EventLoop::create(jobQueueSize);
}
logic->set_event_loop(mainloop.get());
@ -116,7 +117,7 @@ namespace llarp
// run net io thread
llarp::LogInfo("running mainloop");
llarp_ev_loop_run_single_process(mainloop, logic);
mainloop->run(*logic);
if (closeWaiter)
{
closeWaiter->set_value();

@ -122,6 +122,16 @@ namespace llarp
return true;
}
OwnedBuffer
Message::ToBuffer() const
{
std::array<byte_t, 1500> tmp;
llarp_buffer_t buf{tmp};
if (not Encode(&buf))
throw std::runtime_error("cannot encode dns message");
return OwnedBuffer::copy_used(buf);
}
void Message::AddServFail(RR_TTL_t)
{
if (questions.size())

@ -83,6 +83,10 @@ namespace llarp
bool
Decode(llarp_buffer_t* buf) override;
// Wrapper around Encode that encodes into a new buffer and returns it
[[nodiscard]] OwnedBuffer
ToBuffer() const;
std::ostream&
print(std::ostream& stream, int level, int spaces) const;

@ -4,31 +4,19 @@
#include <util/thread/logic.hpp>
#include <array>
#include <utility>
#include <ev/udp_handle.hpp>
namespace llarp::dns
{
static std::vector<byte_t>
MessageToBuffer(Message msg)
{
std::array<byte_t, 1500> tmp = {{0}};
llarp_buffer_t buf{tmp};
if (not msg.Encode(&buf))
throw std::runtime_error("cannot encode dns message");
std::vector<byte_t> pkt;
pkt.resize(buf.cur - buf.base);
std::copy_n(tmp.data(), pkt.size(), pkt.data());
return pkt;
}
PacketHandler::PacketHandler(Logic_ptr logic, IQueryHandler* h)
: m_QueryHandler{h}, m_Logic{logic}
: m_QueryHandler{h}, m_Logic{std::move(logic)}
{}
Proxy::Proxy(llarp_ev_loop_ptr loop, Logic_ptr logic, IQueryHandler* h)
Proxy::Proxy(EventLoop_ptr loop, Logic_ptr logic, IQueryHandler* h)
: PacketHandler{logic, h}, m_Loop(std::move(loop))
{
m_Server.user = this;
m_Server.tick = nullptr;
m_Server.recvfrom = &HandleUDP;
m_Server =
m_Loop->udp([this](UDPHandle&, SockAddr a, OwnedBuffer buf) { HandlePacket(a, a, buf); });
}
void
@ -43,23 +31,7 @@ namespace llarp::dns
{
if (not PacketHandler::Start(addr, std::move(resolvers)))
return false;
return (llarp_ev_add_udp(m_Loop, &m_Server, addr) == 0);
}
static Proxy::Buffer_t
CopyBuffer(const llarp_buffer_t& buf)
{
std::vector<byte_t> msgbuf(buf.sz);
std::copy_n(buf.base, buf.sz, msgbuf.data());
return msgbuf;
}
void
Proxy::HandleUDP(llarp_udp_io* u, const SockAddr& from, ManagedBuffer buf)
{
Buffer_t msgbuf = CopyBuffer(buf.underlying);
auto self = static_cast<Proxy*>(u->user);
self->HandlePacket(from, from, std::move(msgbuf));
return m_Server->listen(addr);
}
void
@ -81,21 +53,15 @@ namespace llarp::dns
bool
PacketHandler::SetupUnboundResolver(std::vector<IpAddress> resolvers)
{
auto failFunc = [self = weak_from_this()](SockAddr from, SockAddr to, Message msg) {
auto this_ptr = self.lock();
if (this_ptr)
{
this_ptr->SendServerMessageBufferTo(from, to, MessageToBuffer(std::move(msg)));
}
auto failFunc = [self = weak_from_this()](
const SockAddr& from, const SockAddr& to, Message msg) {
if (auto this_ptr = self.lock())
this_ptr->SendServerMessageBufferTo(from, to, msg.ToBuffer());
};
auto replyFunc = [self = weak_from_this()](
SockAddr from, SockAddr to, std::vector<byte_t> buf) {
auto this_ptr = self.lock();
if (this_ptr)
{
this_ptr->SendServerMessageBufferTo(from, to, std::move(buf));
}
auto replyFunc = [self = weak_from_this()](auto&&... args) {
if (auto this_ptr = self.lock())
this_ptr->SendServerMessageBufferTo(std::forward<decltype(args)>(args)...);
};
m_UnboundResolver =
@ -121,26 +87,24 @@ namespace llarp::dns
}
void
Proxy::SendServerMessageBufferTo(SockAddr, SockAddr to, Buffer_t buf)
Proxy::SendServerMessageBufferTo(const SockAddr&, const SockAddr& to, llarp_buffer_t buf)
{
if (llarp_ev_udp_sendto(&m_Server, to, buf) < 0)
if (!m_Server->send(to, buf))
llarp::LogError("dns reply failed");
}
bool
PacketHandler::ShouldHandlePacket(SockAddr to, SockAddr from, Buffer_t buf) const
PacketHandler::ShouldHandlePacket(
const SockAddr& to, [[maybe_unused]] const SockAddr& from, llarp_buffer_t buf) const
{
(void)from;
MessageHeader hdr;
llarp_buffer_t pkt{buf};
if (not hdr.Decode(&pkt))
if (not hdr.Decode(&buf))
{
return false;
}
Message msg{hdr};
if (not msg.Decode(&pkt))
if (not msg.Decode(&buf))
{
return false;
}
@ -156,18 +120,17 @@ namespace llarp::dns
}
void
PacketHandler::HandlePacket(SockAddr resolver, SockAddr from, Buffer_t buf)
PacketHandler::HandlePacket(const SockAddr& resolver, const SockAddr& from, llarp_buffer_t buf)
{
MessageHeader hdr;
llarp_buffer_t pkt{buf};
if (not hdr.Decode(&pkt))
if (not hdr.Decode(&buf))
{
llarp::LogWarn("failed to parse dns header from ", from);
return;
}
Message msg(hdr);
if (not msg.Decode(&pkt))
if (not msg.Decode(&buf))
{
llarp::LogWarn("failed to parse dns message from ", from);
return;
@ -186,7 +149,7 @@ namespace llarp::dns
// yea it is, let's turn off DoH because god is dead.
msg.AddNXReply();
// press F to pay respects
SendServerMessageBufferTo(resolver, from, MessageToBuffer(std::move(msg)));
SendServerMessageBufferTo(resolver, from, msg.ToBuffer());
return;
}
}
@ -194,7 +157,7 @@ namespace llarp::dns
if (m_QueryHandler && m_QueryHandler->ShouldHookDNSMessage(msg))
{
auto reply = [self = shared_from_this(), to = from, resolver](dns::Message msg) {
self->SendServerMessageBufferTo(resolver, to, MessageToBuffer(std::move(msg)));
self->SendServerMessageBufferTo(resolver, to, msg.ToBuffer());
};
if (!m_QueryHandler->HandleHookedDNSMessage(std::move(msg), reply))
{
@ -206,7 +169,7 @@ namespace llarp::dns
// no upstream resolvers
// let's serv fail it
msg.AddServFail();
SendServerMessageBufferTo(resolver, from, MessageToBuffer(std::move(msg)));
SendServerMessageBufferTo(resolver, from, msg.ToBuffer());
}
else
{

@ -2,7 +2,7 @@
#define LLARP_DNS_SERVER_HPP
#include <dns/message.hpp>
#include <ev/ev.h>
#include <ev/ev.hpp>
#include <net/net.hpp>
#include <util/thread/logic.hpp>
#include <dns/unbound_resolver.hpp>
@ -14,8 +14,9 @@ namespace llarp
namespace dns
{
/// handler of dns query hooking
struct IQueryHandler
class IQueryHandler
{
public:
virtual ~IQueryHandler() = default;
/// return true if we should hook this message
@ -27,10 +28,11 @@ namespace llarp
HandleHookedDNSMessage(Message query, std::function<void(Message)> sendReply) = 0;
};
struct PacketHandler : public std::enable_shared_from_this<PacketHandler>
// Base class for DNS lookups
class PacketHandler : public std::enable_shared_from_this<PacketHandler>
{
public:
using Logic_ptr = std::shared_ptr<Logic>;
using Buffer_t = std::vector<uint8_t>;
explicit PacketHandler(Logic_ptr logic, IQueryHandler* handler);
@ -46,18 +48,18 @@ namespace llarp
Restart();
void
HandlePacket(SockAddr resolver, SockAddr from, Buffer_t buf);
HandlePacket(const SockAddr& resolver, const SockAddr& from, llarp_buffer_t buf);
bool
ShouldHandlePacket(SockAddr to, SockAddr from, Buffer_t buf) const;
ShouldHandlePacket(const SockAddr& to, const SockAddr& from, llarp_buffer_t buf) const;
protected:
virtual void
SendServerMessageBufferTo(SockAddr from, SockAddr to, Buffer_t buf) = 0;
SendServerMessageBufferTo(const SockAddr& from, const SockAddr& to, llarp_buffer_t buf) = 0;
private:
void
HandleUpstreamFailure(SockAddr from, SockAddr to, Message msg);
HandleUpstreamFailure(const SockAddr& from, const SockAddr& to, Message msg);
bool
SetupUnboundResolver(std::vector<IpAddress> resolvers);
@ -68,27 +70,24 @@ namespace llarp
Logic_ptr m_Logic;
};
struct Proxy : public PacketHandler
// Proxying DNS handler that listens on a UDP port for proper DNS requests.
class Proxy : public PacketHandler
{
public:
using Logic_ptr = std::shared_ptr<Logic>;
explicit Proxy(llarp_ev_loop_ptr loop, Logic_ptr logic, IQueryHandler* handler);
explicit Proxy(EventLoop_ptr loop, Logic_ptr logic, IQueryHandler* handler);
bool
Start(SockAddr localaddr, std::vector<IpAddress> resolvers) override;
using Buffer_t = std::vector<uint8_t>;
protected:
void
SendServerMessageBufferTo(SockAddr from, SockAddr to, Buffer_t buf) override;
private:
static void
HandleUDP(llarp_udp_io*, const SockAddr&, ManagedBuffer);
SendServerMessageBufferTo(
const SockAddr& from, const SockAddr& to, llarp_buffer_t buf) override;
private:
llarp_udp_io m_Server;
llarp_ev_loop_ptr m_Loop;
std::shared_ptr<UDPHandle> m_Server;
EventLoop_ptr m_Loop;
};
} // namespace dns
} // namespace llarp

@ -39,12 +39,8 @@ namespace llarp::dns
std::shared_ptr<Logic> logic, ReplyFunction reply, FailFunction fail)
: unboundContext(nullptr)
, started(false)
, replyFunc([logic, reply](auto res, auto source, auto buf) {
LogicCall(logic, [source, buf, reply, res]() { reply(res, source, buf); });
})
, failFunc([logic, fail](auto res, auto source, auto message) {
LogicCall(logic, [source, message, res, fail]() { fail(res, source, message); });
})
, replyFunc(logic->make_caller(std::move(reply)))
, failFunc(logic->make_caller(std::move(fail)))
{}
// static callback
@ -65,8 +61,8 @@ namespace llarp::dns
ub_resolve_free(result);
return;
}
std::vector<byte_t> pkt(result->answer_len);
std::copy_n(static_cast<byte_t*>(result->answer_packet), pkt.size(), pkt.data());
OwnedBuffer pkt{(size_t)result->answer_len};
std::memcpy(pkt.buf.get(), result->answer_packet, pkt.sz);
llarp_buffer_t buf(pkt);
MessageHeader hdr;

@ -18,8 +18,9 @@
namespace llarp::dns
{
using ReplyFunction =
std::function<void(SockAddr resolver, SockAddr source, std::vector<byte_t> buf)>;
using FailFunction = std::function<void(SockAddr resolver, SockAddr source, Message msg)>;
std::function<void(const SockAddr& resolver, const SockAddr& source, OwnedBuffer buf)>;
using FailFunction =
std::function<void(const SockAddr& resolver, const SockAddr& source, Message msg)>;
class UnboundResolver : public std::enable_shared_from_this<UnboundResolver>
{

@ -1,4 +1,4 @@
#include <ev/ev.h>
#include <ev/ev.hpp>
#include <util/mem.hpp>
#include <util/str.hpp>
#include <util/thread/logic.hpp>
@ -10,64 +10,20 @@
// We libuv now
#include <ev/ev_libuv.hpp>
llarp_ev_loop_ptr
llarp_make_ev_loop(size_t queueLength)
namespace llarp
{
llarp_ev_loop_ptr r = std::make_shared<libuv::Loop>(queueLength);
r->init();
r->update_time();
return r;
}
void
llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev, std::shared_ptr<llarp::Logic> logic)
{
if (ev == nullptr or logic == nullptr)
return;
ev->run();
logic->clear_event_loop();
ev->stopped();
}
int
llarp_ev_add_udp(const llarp_ev_loop_ptr& ev, struct llarp_udp_io* udp, const llarp::SockAddr& src)
{
if (ev == nullptr or udp == nullptr)
EventLoop_ptr
EventLoop::create(size_t queueLength)
{
llarp::LogError("Attempting llarp_ev_add_udp() with null event loop or udp io struct.");
return -1;
return std::make_shared<llarp::uv::Loop>(queueLength);
}
udp->parent = ev.get();
if (ev->udp_listen(udp, src))
return 0;
llarp::LogError("llarp_ev_add_udp() call to udp_listen failed.");
return -1;
}
int
llarp_ev_close_udp(struct llarp_udp_io* udp)
{
if (udp->parent->udp_close(udp))
return 0;
return -1;
}
llarp_time_t
llarp_ev_loop_time_now_ms(const llarp_ev_loop_ptr& loop)
{
if (loop)
return loop->time_now();
return llarp::time_now_ms();
}
void
llarp_ev_loop_stop(const llarp_ev_loop_ptr& loop)
{
loop->stop();
}
void
EventLoop::run(Logic& logic)
{
run_loop();
logic.clear_event_loop();
stopped();
}
int
llarp_ev_udp_sendto(struct llarp_udp_io* udp, const llarp::SockAddr& to, const llarp_buffer_t& buf)
{
return udp->sendto(udp, to, buf.base, buf.sz);
}
} // namespace llarp

@ -1,88 +0,0 @@
#ifndef LLARP_EV_H
#define LLARP_EV_H
#include <net/ip_address.hpp>
#include <util/buffer.hpp>
#include <util/time.hpp>
#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#include <wspiapi.h>
#else
#include <netinet/in.h>
#include <sys/socket.h>
#endif
#include <net/net_if.hpp>
#include <memory>
#include <cstdint>
#include <cstdlib>
#include <constants/evloop.hpp>
/**
* ev.h
*
* event handler (cross platform high performance event system for IO)
*/
#define EV_TICK_INTERVAL 10
namespace llarp
{
class Logic;
struct EventLoop;
} // namespace llarp
using llarp_ev_loop_ptr = std::shared_ptr<llarp::EventLoop>;
/// make an event loop using our baked in event loop on Windows
/// make an event loop using libuv otherwise.
/// @param queue_size how big the logic job queue is
llarp_ev_loop_ptr
llarp_make_ev_loop(std::size_t queue_size = llarp::event_loop_queue_size);
// run mainloop
void
llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev, std::shared_ptr<llarp::Logic> logic);
/// get the current time on the event loop
llarp_time_t
llarp_ev_loop_time_now_ms(const llarp_ev_loop_ptr& ev);
/// stop event loop and wait for it to complete all jobs
void
llarp_ev_loop_stop(const llarp_ev_loop_ptr& ev);
/// UDP handling configuration
struct llarp_udp_io
{
/// set after added
int fd;
void* user;
void* impl;
llarp::EventLoop* parent;
/// called every event loop tick after reads
void (*tick)(struct llarp_udp_io*);
void (*recvfrom)(struct llarp_udp_io*, const llarp::SockAddr& source, ManagedBuffer);
/// set by parent
int (*sendto)(struct llarp_udp_io*, const llarp::SockAddr&, const byte_t*, size_t);
};
/// add UDP handler
int
llarp_ev_add_udp(const llarp_ev_loop_ptr& ev, struct llarp_udp_io* udp, const llarp::SockAddr& src);
/// send a UDP packet
int
llarp_ev_udp_sendto(struct llarp_udp_io* udp, const llarp::SockAddr& to, const llarp_buffer_t& pkt);
/// close UDP handler
int
llarp_ev_close_udp(struct llarp_udp_io* udp);
#endif

@ -1,12 +1,10 @@
#ifndef LLARP_EV_HPP
#define LLARP_EV_HPP
#include <net/ip_address.hpp>
#include <net/ip_packet.hpp>
#include <ev/ev.h>
#include <util/buffer.hpp>
#include <util/codel.hpp>
#include <util/time.hpp>
#include <util/thread/threading.hpp>
#include <constants/evloop.hpp>
// writev
#ifndef _WIN32
@ -54,49 +52,68 @@ struct llarp_ev_pkt_pipe;
namespace llarp
{
class Logic;
struct SockAddr;
struct UDPHandle;
namespace vpn
{
class NetworkInterface;
}
/// distinct event loop waker uper
class EventLoopWakeup
namespace net
{
protected:
std::function<void()> callback;
struct IPPacket;
}
/// distinct event loop waker upper; used to idempotently schedule a task on the next event loop
///
/// Created via EventLoop::make_waker(...).
class EventLoopWakeup
{
public:
EventLoopWakeup(std::function<void()> cb) : callback{cb}
{}
/// Destructor: remove the task from the event loop task. (Note that destruction here only
/// initiates removal of the task from the underlying event loop: it is *possible* for the
/// callback to fire again if already triggered depending on the underlying implementation).
virtual ~EventLoopWakeup() = default;
/// async wakeup and call callback once
/// trigger this task to run on the next event loop iteration; does nothing if already
/// triggered.
virtual void
Wakeup() = 0;
Trigger() = 0;
};
/// end operation
/// holds a repeated task on the event loop; the task is removed on destruction
class EventLoopRepeater
{
public:
// Destructor: if the task has been started then it is removed from the event loop. Note
// that it is possible for a task to fire *after* destruction of this container;
// destruction only initiates removal of the periodic task.
virtual ~EventLoopRepeater() = default;
// Starts the repeater to call `task` every `every` period.
virtual void
End() = 0;
start(llarp_time_t every, std::function<void()> task) = 0;
};
// this (nearly!) abstract base class
// is overriden for each platform
struct EventLoop
// : std::enable_shared_from_this<EventLoop> // FIXME: do I actually need shared_from_this()?
{
virtual bool
init() = 0;
// Runs the event loop. This does not return until sometime after `stop()` is called (and so
// typically you want to run this in its own thread).
void
run(Logic& logic);
virtual int
run() = 0;
// Actually runs the underlying implementation event loop; called by run().
virtual void
run_loop() = 0;
virtual bool
running() const = 0;
virtual void
update_time()
{}
virtual llarp_time_t
time_now() const
{
@ -104,13 +121,13 @@ namespace llarp
}
virtual void
stopped(){};
virtual uint32_t
call_after_delay(llarp_time_t delay_ms, std::function<void(void)> callback) = 0;
stopped()
{}
// Adds a timer to the event loop; should only be called from the logic thread (and so is
// typically scheduled via a call to Logic::call_later()).
virtual void
cancel_delayed_call(uint32_t call_id) = 0;
call_after_delay(llarp_time_t delay_ms, std::function<void(void)> callback) = 0;
virtual bool
add_network_interface(
@ -123,14 +140,15 @@ namespace llarp
virtual void
stop() = 0;
virtual bool
udp_listen(llarp_udp_io* l, const llarp::SockAddr& src) = 0;
using UDPReceiveFunc = std::function<void(UDPHandle&, SockAddr src, llarp::OwnedBuffer buf)>;
virtual bool
udp_close(llarp_udp_io* l) = 0;
// Constructs a UDP socket that can be used for sending and/or receiving
virtual std::shared_ptr<UDPHandle>
udp(UDPReceiveFunc on_recv) = 0;
/// give this event loop a logic thread for calling
virtual void set_logic(std::shared_ptr<llarp::Logic>) = 0;
virtual void
set_logic(const std::shared_ptr<llarp::Logic>& logic) = 0;
virtual ~EventLoop() = default;
@ -141,15 +159,28 @@ namespace llarp
virtual void
set_pump_function(std::function<void(void)> pumpll) = 0;
virtual void
register_poll_fd_readable(int fd, std::function<void(void)> callback) = 0;
/// Make a thread-safe event loop waker (an "async" in libuv terminology) on this event loop;
/// you can call `->Trigger()` on the returned shared pointer to fire the callback at the next
/// available event loop iteration. (Multiple Trigger calls invoked before the call is actually
/// made are coalesced into one call).
virtual std::shared_ptr<EventLoopWakeup>
make_waker(std::function<void()> callback) = 0;
virtual void
deregister_poll_fd_readable(int fd) = 0;
// Initializes a new repeated task object. Note that the task is not actually added to the event
// loop until you call start() on the returned object. Typically invoked via Logic::call_every.
virtual std::shared_ptr<EventLoopRepeater>
make_repeater() = 0;
// Constructs and initializes a new default (libuv) event loop
static std::shared_ptr<EventLoop>
create(size_t queueLength = event_loop_queue_size);
/// make an event loop waker on this event loop
virtual EventLoopWakeup*
make_event_loop_waker(std::function<void()> callback) = 0;
// Returns true if called from within the event loop thread, false otherwise.
virtual bool
inEventLoopThread() const = 0;
};
using EventLoop_ptr = std::shared_ptr<EventLoop>;
} // namespace llarp
#endif

@ -1,322 +1,82 @@
#include <ev/ev_libuv.hpp>
#include <ev/vpn.hpp>
#include <memory>
#include <thread>
#include <type_traits>
#include <util/thread/logic.hpp>
#include <util/thread/queue.hpp>
#include <cstring>
#include "ev/ev.hpp"
namespace libuv
{
#define LoopCall(h, ...) \
{ \
auto __f = __VA_ARGS__; \
__f(); \
}
#include <uvw.hpp>
struct glue
{
virtual ~glue() = default;
virtual void
Close() = 0;
};
class UVWakeup final : public llarp::EventLoopWakeup, public glue
namespace llarp::uv
{
class UVWakeup final : public EventLoopWakeup
{
uv_async_t m_Impl;
const int m_Idx;
static void
OnWake(uv_async_t* self)
{
static_cast<UVWakeup*>(self->data)->callback();
}
std::shared_ptr<uvw::AsyncHandle> async;
public:
UVWakeup(uv_loop_t* loop, std::function<void()> hook, int idx)
: llarp::EventLoopWakeup{hook}, m_Idx{idx}
UVWakeup(uvw::Loop& loop, std::function<void()> callback)
: async{loop.resource<uvw::AsyncHandle>()}
{
uv_async_init(loop, &m_Impl, OnWake);
m_Impl.data = this;
async->on<uvw::AsyncEvent>([f = std::move(callback)](auto&, auto&) { f(); });
}
~UVWakeup() = default;
void
Close() override
Trigger() override
{
uv_close((uv_handle_t*)&m_Impl, [](uv_handle_t* h) {
auto loop = static_cast<libuv::Loop*>(h->loop->data);
loop->delete_waker(static_cast<UVWakeup*>(h->data)->m_Idx);
});
async->send();
}
void
End() override
~UVWakeup() override
{
Close();
}
void
Wakeup() override
{
uv_async_send(&m_Impl);
async->close();
}
};
struct ticker_glue : public glue
class UVRepeater final : public EventLoopRepeater
{
std::function<void(void)> func;
ticker_glue(uv_loop_t* loop, std::function<void(void)> tick) : func(tick)
{
m_Ticker.data = this;
uv_check_init(loop, &m_Ticker);
}
std::shared_ptr<uvw::TimerHandle> timer;
static void
OnTick(uv_check_t* t)
{
llarp::LogTrace("ticker_glue::OnTick() start");
ticker_glue* ticker = static_cast<ticker_glue*>(t->data);
ticker->func();
// Loop* loop = static_cast<Loop*>(t->loop->data);
// loop->FlushLogic();
llarp::LogTrace("ticker_glue::OnTick() end");
}
public:
UVRepeater(uvw::Loop& loop) : timer{loop.resource<uvw::TimerHandle>()}
{}
bool
Start()
void
start(llarp_time_t every, std::function<void()> task) override
{
return uv_check_start(&m_Ticker, &OnTick) != -1;
timer->start(every, every);
timer->on<uvw::TimerEvent>([task = std::move(task)](auto&, auto&) { task(); });
}
void
Close() override
~UVRepeater() override
{
uv_check_stop(&m_Ticker);
uv_close((uv_handle_t*)&m_Ticker, [](auto h) {
ticker_glue* self = (ticker_glue*)h->data;
h->data = nullptr;
delete self;
});
timer->stop();
}
uv_check_t m_Ticker;
};
struct udp_glue : public glue
struct UDPHandle final : llarp::UDPHandle
{
uv_udp_t m_Handle;
uv_check_t m_Ticker;
llarp_udp_io* const m_UDP;
llarp::SockAddr m_Addr;
std::vector<char> m_Buffer;
udp_glue(uv_loop_t* loop, llarp_udp_io* udp, const llarp::SockAddr& src)
: m_UDP(udp), m_Addr(src)
{
m_Handle.data = this;
m_Ticker.data = this;
uv_udp_init(loop, &m_Handle);
uv_check_init(loop, &m_Ticker);
}
static void
Alloc(uv_handle_t* h, size_t suggested_size, uv_buf_t* buf)
{
udp_glue* self = static_cast<udp_glue*>(h->data);
if (self->m_Buffer.empty())
self->m_Buffer.resize(suggested_size);
buf->base = self->m_Buffer.data();
buf->len = self->m_Buffer.size();
}
/// callback for libuv
static void
OnRecv(uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf, const sockaddr* addr, unsigned)
{
udp_glue* glue = static_cast<udp_glue*>(handle->data);
if (addr)
glue->RecvFrom(nread, buf, llarp::SockAddr(*addr));
}
void
RecvFrom(ssize_t sz, const uv_buf_t* buf, const llarp::SockAddr& fromaddr)
{
if (sz > 0 && m_UDP)
{
const size_t pktsz = sz;
if (m_UDP->recvfrom)
{
const llarp_buffer_t pkt((const byte_t*)buf->base, pktsz);
m_UDP->recvfrom(m_UDP, fromaddr, ManagedBuffer{pkt});
}
}
}
static void
OnTick(uv_check_t* t)
{
llarp::LogTrace("udp_glue::OnTick() start");
udp_glue* udp = static_cast<udp_glue*>(t->data);
udp->Tick();
llarp::LogTrace("udp_glue::OnTick() end");
}
void
Tick()
{
if (m_UDP && m_UDP->tick)
m_UDP->tick(m_UDP);
}
static int
SendTo(llarp_udp_io* udp, const llarp::SockAddr& to, const byte_t* ptr, size_t sz)
{
auto* self = static_cast<udp_glue*>(udp->impl);
if (self == nullptr)
return -1;
const auto buf = uv_buf_init((char*)ptr, sz);
return uv_udp_try_send(
&self->m_Handle, &buf, 1, (const sockaddr*)static_cast<const sockaddr_in*>(to));
}
UDPHandle(uvw::Loop& loop, ReceiveFunc rf);
bool
Bind()
{
auto ret =
uv_udp_bind(&m_Handle, (const sockaddr*)static_cast<const sockaddr_in*>(m_Addr), 0);
if (ret)
{
llarp::LogError("failed to bind to ", m_Addr, " ", uv_strerror(ret));
return false;
}
if (uv_udp_recv_start(&m_Handle, &Alloc, &OnRecv))
{
llarp::LogError("failed to start recving packets via ", m_Addr);
return false;
}
if (uv_check_start(&m_Ticker, &OnTick))
{
llarp::LogError("failed to start ticker");
return false;
}
#if defined(_WIN32) || defined(_WIN64)
#else
if (uv_fileno((const uv_handle_t*)&m_Handle, &m_UDP->fd))
return false;
#endif
m_UDP->sendto = &SendTo;
m_UDP->impl = this;
return true;
}
listen(const SockAddr& addr) override;
static void
OnClosed(uv_handle_t* h)
{
auto* glue = static_cast<udp_glue*>(h->data);
if (glue)
{
h->data = nullptr;
delete glue;
}
}
bool
send(const SockAddr& dest, const llarp_buffer_t& buf) override;
void
Close() override
{
m_UDP->impl = nullptr;
uv_check_stop(&m_Ticker);
uv_close((uv_handle_t*)&m_Handle, &OnClosed);
}
};
struct tun_glue : public glue
{
uv_poll_t m_Handle;
uv_check_t m_Ticker;
std::shared_ptr<llarp::vpn::NetworkInterface> m_NetIf;
std::function<void(llarp::net::IPPacket)> m_Handler;
tun_glue(
std::shared_ptr<llarp::vpn::NetworkInterface> netif,
std::function<void(llarp::net::IPPacket)> handler)
: m_NetIf{std::move(netif)}, m_Handler{std::move(handler)}
{
m_Handle.data = this;
m_Ticker.data = this;
}
static void
OnTick(uv_check_t* h)
{
auto self = static_cast<tun_glue*>(h->data);
while (self->m_NetIf->HasNextPacket())
self->Read();
}
static void
OnPoll(uv_poll_t* h, int, int events)
{
if (events & UV_READABLE)
{
static_cast<tun_glue*>(h->data)->Read();
}
}
close() override;
void
Read()
{
auto pkt = m_NetIf->ReadNextPacket();
LogDebug("got packet ", pkt.sz);
if (m_Handler)
m_Handler(std::move(pkt));
}
~UDPHandle() override;
static void
OnClosed(uv_handle_t* h)
{
auto* self = static_cast<tun_glue*>(h->data);
if (self)
{
h->data = nullptr;
delete self;
}
}
private:
std::shared_ptr<uvw::UDPHandle> handle;
void
Close() override
{
uv_check_stop(&m_Ticker);
#ifndef _WIN32
uv_close((uv_handle_t*)&m_Handle, &OnClosed);
#endif
}
bool
Init(uv_loop_t* loop)
{
if (uv_check_init(loop, &m_Ticker) == -1)
{
return false;
}
if (uv_check_start(&m_Ticker, &OnTick) == -1)
{
return false;
}
#ifndef _WIN32
if (uv_poll_init(loop, &m_Handle, m_NetIf->PollFD()) == -1)
{
llarp::LogError("failed to initialize polling on ", m_NetIf->IfName());
return false;
}
if (uv_poll_start(&m_Handle, UV_READABLE, &OnPoll))
{
llarp::LogError("failed to start polling on ", m_NetIf->IfName());
return false;
}
#endif
return true;
}
reset_handle(uvw::Loop& loop);
};
void
@ -331,63 +91,36 @@ namespace libuv
llarp::LogTrace("Loop::FlushLogic() end");
}
static void
OnAsyncWake(uv_async_t* async_handle)
void
Loop::tick_event_loop()
{
llarp::LogTrace("OnAsyncWake, ticking event loop.");
Loop* loop = static_cast<Loop*>(async_handle->data);
loop->update_time();
loop->process_timer_queue();
loop->process_cancel_queue();
loop->FlushLogic();
loop->PumpLL();
llarp::LogTrace("ticking event loop.");
FlushLogic();
PumpLL();
auto& log = llarp::LogContext::Instance();
if (log.logStream)
log.logStream->Tick(loop->time_now());
log.logStream->Tick(time_now());
}
constexpr size_t TimerQueueSize = 20;
Loop::Loop(size_t queue_size)
: llarp::EventLoop{}
, PumpLL{[]() {}}
, m_LogicCalls{queue_size}
, m_timerQueue{TimerQueueSize}
, m_timerCancelQueue{TimerQueueSize}
{}
bool
Loop::init()
Loop::Loop(size_t queue_size) : llarp::EventLoop{}, PumpLL{[] {}}, m_LogicCalls{queue_size}
{
if (uv_loop_init(&m_Impl) == -1)
return false;
if (!(m_Impl = uvw::Loop::create()))
throw std::runtime_error{"Failed to construct libuv loop"};
#ifdef LOKINET_DEBUG
last_time = 0;
loop_run_count = 0;
#endif
m_Impl.data = this;
#if defined(_WIN32) || defined(_WIN64)
#else
uv_loop_configure(&m_Impl, UV_LOOP_BLOCK_SIGNAL, SIGPIPE);
#ifndef _WIN32
signal(SIGPIPE, SIG_IGN);
#endif
m_TickTimer = new uv_timer_t;
m_TickTimer->data = this;
if (uv_timer_init(&m_Impl, m_TickTimer) == -1)
return false;
m_Run.store(true);
m_nextID.store(0);
m_WakeUp.data = this;
uv_async_init(&m_Impl, &m_WakeUp, &OnAsyncWake);
return true;
}
void
Loop::update_time()
{
llarp::EventLoop::update_time();
uv_update_time(&m_Impl);
if (!(m_WakeUp = m_Impl->resource<uvw::AsyncHandle>()))
throw std::runtime_error{"Failed to create libuv async"};
m_WakeUp->on<uvw::AsyncEvent>([this](const auto&, auto&) { tick_event_loop(); });
}
bool
@ -396,18 +129,12 @@ namespace libuv
return m_Run.load();
}
static void
OnTickTimeout(uv_timer_t* timer)
{
uv_stop(timer->loop);
}
int
Loop::run()
void
Loop::run_loop()
{
llarp::LogTrace("Loop::run()");
llarp::LogTrace("Loop::run_loop()");
m_EventLoopThreadID = std::this_thread::get_id();
return uv_run(&m_Impl, UV_RUN_DEFAULT);
m_Impl->run();
}
void
@ -416,98 +143,30 @@ namespace libuv
PumpLL = std::move(pump);
}
struct TimerData
std::shared_ptr<llarp::UDPHandle>
Loop::udp(UDPReceiveFunc on_recv)
{
Loop* loop;
uint64_t job_id;
};
void
CloseUVTimer(uv_timer_t* timer)
{
// have to delete timer handle this way because libuv.
uv_timer_stop(timer);
uv_close((uv_handle_t*)timer, [](uv_handle_t* handle) { delete (uv_timer_t*)handle; });
}
static void
OnUVTimer(uv_timer_t* timer)
{
TimerData* timer_data = static_cast<TimerData*>(timer->data);
Loop* loop = timer_data->loop;
loop->do_timer_job(timer_data->job_id);
delete timer_data;
CloseUVTimer(timer);
return std::static_pointer_cast<llarp::UDPHandle>(
std::make_shared<llarp::uv::UDPHandle>(*m_Impl, std::move(on_recv)));
}
uint32_t
// TODO: replace this one-shot timer mechanism with a repeated timer, because most likely
// everything using this is repeating scheduling itself all the time and would be better served by
// a repeating uv_timer.
void
Loop::call_after_delay(llarp_time_t delay_ms, std::function<void(void)> callback)
{
llarp::LogTrace("Loop::call_after_delay()");
#ifdef TESTNET_SPEED
delay_ms *= TESTNET_SPEED;
#endif
PendingTimer timer;
timer.delay_ms = delay_ms;
timer.callback = callback;
timer.job_id = m_nextID++;
uint64_t job_id = timer.job_id;
m_timerQueue.pushBack(std::move(timer));
uv_async_send(&m_WakeUp);
return job_id;
}
void
Loop::cancel_delayed_call(uint32_t job_id)
{
m_timerCancelQueue.pushBack(job_id);
uv_async_send(&m_WakeUp);
}
void
Loop::process_timer_queue()
{
while (not m_timerQueue.empty())
{
PendingTimer job = m_timerQueue.popFront();
uint64_t job_id = job.job_id;
m_pendingCalls.emplace(job_id, std::move(job.callback));
TimerData* timer_data = new TimerData;
timer_data->loop = this;
timer_data->job_id = job_id;
uv_timer_t* newTimer = new uv_timer_t;
newTimer->data = (void*)timer_data;
uv_timer_init(&m_Impl, newTimer);
uv_timer_start(newTimer, &OnUVTimer, job.delay_ms.count(), 0);
}
}
void
Loop::process_cancel_queue()
{
while (not m_timerCancelQueue.empty())
{
uint64_t job_id = m_timerCancelQueue.popFront();
m_pendingCalls.erase(job_id);
}
}
void
Loop::do_timer_job(uint64_t job_id)
{
auto itr = m_pendingCalls.find(job_id);
if (itr != m_pendingCalls.end())
{
if (itr->second)
itr->second();
m_pendingCalls.erase(itr->first);
}
auto timer = m_Impl->resource<uvw::TimerHandle>();
timer->on<uvw::TimerEvent>([f = std::move(callback)](const auto&, auto& timer) {
f();
timer.stop();
timer.close();
});
timer->start(delay_ms, 0ms);
}
void
@ -516,61 +175,34 @@ namespace libuv
if (m_Run)
{
llarp::LogInfo("stopping event loop");
CloseAll();
uv_stop(&m_Impl);
m_Impl->walk([](auto&& handle) {
if constexpr (!std::is_pointer_v<std::remove_reference_t<decltype(handle)>>)
handle.close();
});
llarp::LogDebug("Closed all handles, stopping the loop");
m_Impl->stop();
}
m_Run.store(false);
}
void
Loop::CloseAll()
{
llarp::LogInfo("Closing all handles");
uv_walk(
&m_Impl,
[](uv_handle_t* h, void*) {
if (uv_is_closing(h))
return;
if (h->data && uv_is_active(h) && h->type != UV_TIMER && h->type != UV_POLL)
{
auto glue = reinterpret_cast<libuv::glue*>(h->data);
if (glue)
glue->Close();
}
},
nullptr);
}
void
Loop::stopped()
{
llarp::LogInfo("we have stopped");
}
bool
Loop::udp_listen(llarp_udp_io* udp, const llarp::SockAddr& src)
{
auto* impl = new udp_glue(&m_Impl, udp, src);
udp->impl = impl;
if (impl->Bind())
if (m_Impl)
{
return true;
m_Impl->close();
m_Impl.reset();
}
llarp::LogError("Loop::udp_listen failed to bind");
delete impl;
return false;
llarp::LogInfo("we have stopped");
}
bool
Loop::add_ticker(std::function<void(void)> func)
{
auto* ticker = new ticker_glue(&m_Impl, func);
if (ticker->Start())
{
return true;
}
delete ticker;
return false;
auto check = m_Impl->resource<uvw::CheckHandle>();
check->on<uvw::CheckEvent>([f = std::move(func)](auto&, auto&) { f(); });
check->start();
return true;
}
bool
@ -578,23 +210,32 @@ namespace libuv
std::shared_ptr<llarp::vpn::NetworkInterface> netif,
std::function<void(llarp::net::IPPacket)> handler)
{
auto* glue = new tun_glue(netif, handler);
// call to Init gives ownership of glue to event loop
if (glue->Init(&m_Impl))
return true;
delete glue;
return false;
}
bool
Loop::udp_close(llarp_udp_io* udp)
{
if (udp == nullptr)
return false;
auto* glue = static_cast<udp_glue*>(udp->impl);
if (glue == nullptr)
#ifndef _WIN32
using event_t = uvw::PollEvent;
auto handle = m_Impl->resource<uvw::PollHandle>(netif->PollFD());
#else
using event_t = uvw::CheckEvent;
auto handle = m_Impl->resource<uvw::CheckHandle>();
#endif
if (!handle)
return false;
glue->Close();
handle->on<event_t>([netif = std::move(netif), handler = std::move(handler)](
const event_t&, [[maybe_unused]] auto& handle) {
for (auto pkt = netif->ReadNextPacket(); pkt.sz > 0; pkt = netif->ReadNextPacket())
{
LogDebug("got packet ", pkt.sz);
if (handler)
handler(std::move(pkt));
}
});
#ifndef _WIN32
handle->start(uvw::PollHandle::Event::READABLE);
#else
handle->start();
#endif
return true;
}
@ -604,80 +245,97 @@ namespace libuv
if (not m_EventLoopThreadID.has_value())
{
m_LogicCalls.tryPushBack(f);
uv_async_send(&m_WakeUp);
m_WakeUp->send();
return;
}
const auto inEventLoop = *m_EventLoopThreadID == std::this_thread::get_id();
const bool inEventLoop = *m_EventLoopThreadID == std::this_thread::get_id();
if (inEventLoop and m_LogicCalls.full())
{
FlushLogic();
}
m_LogicCalls.pushBack(f);
uv_async_send(&m_WakeUp);
m_WakeUp->send();
}
// Sets `handle` to a new uvw UDP handle, first initiating a close and then disowning the handle
// if already set, allocating the resource, and setting the receive event on it.
void
OnUVPollFDReadable(uv_poll_t* handle, int status, [[maybe_unused]] int events)
UDPHandle::reset_handle(uvw::Loop& loop)
{
if (handle)
handle->close();
handle = loop.resource<uvw::UDPHandle>();
handle->on<uvw::UDPDataEvent>([this](auto& event, auto& /*handle*/) {
on_recv(
*this,
SockAddr{event.sender.ip, static_cast<uint16_t>(event.sender.port)},
OwnedBuffer{std::move(event.data), event.length});
});
}
llarp::uv::UDPHandle::UDPHandle(uvw::Loop& loop, ReceiveFunc rf) : llarp::UDPHandle{std::move(rf)}
{
if (status < 0)
return; // probably fd was closed
reset_handle(loop);
}
auto func = static_cast<libuv::Loop::Callback*>(handle->data);
bool
UDPHandle::listen(const SockAddr& addr)
{
if (handle->active())
reset_handle(handle->loop());
bool good = true;
auto err = handle->on<uvw::ErrorEvent>([&](auto& event, auto&) {
llarp::LogError("failed to bind and start receiving on ", addr, ": ", event.what());
good = false;
});
handle->bind(*static_cast<const sockaddr*>(addr));
if (good)
handle->recv();
handle->erase(err);
return good;
}
(*func)();
bool
UDPHandle::send(const SockAddr& to, const llarp_buffer_t& buf)
{
return handle->trySend(
*static_cast<const sockaddr*>(to),
const_cast<char*>(reinterpret_cast<const char*>(buf.base)),
buf.sz)
>= 0;
}
void
Loop::register_poll_fd_readable(int fd, Callback callback)
UDPHandle::close()
{
if (m_Polls.count(fd))
{
llarp::LogError(
"Attempting to create event loop poll on fd ",
fd,
", but an event loop poll for that fd already exists.");
return;
}
// new a copy as the one passed in here will go out of scope
auto function_ptr = new Callback(callback);
auto& new_poll = m_Polls[fd];
uv_poll_init(&m_Impl, &new_poll, fd);
new_poll.data = (void*)function_ptr;
uv_poll_start(&new_poll, UV_READABLE, &OnUVPollFDReadable);
handle->close();
handle.reset();
}
void
Loop::deregister_poll_fd_readable(int fd)
UDPHandle::~UDPHandle()
{
auto itr = m_Polls.find(fd);
close();
}
if (itr != m_Polls.end())
{
uv_poll_stop(&(itr->second));
auto func = static_cast<Callback*>(itr->second.data);
delete func;
m_Polls.erase(itr);
}
std::shared_ptr<llarp::EventLoopWakeup>
Loop::make_waker(std::function<void()> callback)
{
return std::static_pointer_cast<llarp::EventLoopWakeup>(
std::make_shared<UVWakeup>(*m_Impl, std::move(callback)));
}
llarp::EventLoopWakeup*
Loop::make_event_loop_waker(std::function<void()> callback)
std::shared_ptr<EventLoopRepeater>
Loop::make_repeater()
{
auto wake_idx = m_NumWakers++;
auto wake = new UVWakeup{&m_Impl, callback, wake_idx};
m_Wakers[wake_idx] = wake;
return wake;
return std::static_pointer_cast<EventLoopRepeater>(std::make_shared<UVRepeater>(*m_Impl));
}
void
Loop::delete_waker(int idx)
bool
Loop::inEventLoopThread() const
{
delete m_Wakers[idx];
m_Wakers.erase(idx);
return m_EventLoopThreadID and *m_EventLoopThreadID == std::this_thread::get_id();
}
} // namespace libuv
} // namespace llarp::uv

@ -1,58 +1,42 @@
#ifndef LLARP_EV_LIBUV_HPP
#define LLARP_EV_LIBUV_HPP
#include <ev/ev.hpp>
#include <uv.h>
#include <vector>
#include <functional>
#include "udp_handle.hpp"
#include <util/thread/logic.hpp>
#include <util/thread/queue.hpp>
#include <util/meta/memfn.hpp>
#include <uvw/loop.h>
#include <uvw/async.h>
#include <uvw/poll.h>
#include <uvw/udp.h>
#include <functional>
#include <map>
#include <vector>
namespace libuv
namespace llarp::uv
{
class UVWakeup;
class UVRepeater;
struct Loop final : public llarp::EventLoop
{
typedef std::function<void(void)> Callback;
struct PendingTimer
{
uint64_t job_id;
llarp_time_t delay_ms;
Callback callback;
};
using Callback = std::function<void()>;
Loop(size_t queue_size);
bool
init() override;
int
run() override;
void
run_loop() override;
bool
running() const override;
void
update_time() override;
uint32_t
call_after_delay(llarp_time_t delay_ms, std::function<void(void)> callback) override;
void
cancel_delayed_call(uint32_t job_id) override;
void
process_timer_queue();
void
process_cancel_queue();
void
do_timer_job(uint64_t job_id);
tick_event_loop();
void
stop() override;
@ -60,15 +44,6 @@ namespace libuv
void
stopped() override;
void
CloseAll();
bool
udp_listen(llarp_udp_io* l, const llarp::SockAddr& src) override;
bool
udp_close(llarp_udp_io* l) override;
bool
add_ticker(std::function<void(void)> ticker) override;
@ -78,41 +53,37 @@ namespace libuv
std::function<void(llarp::net::IPPacket)> handler) override;
void
set_logic(std::shared_ptr<llarp::Logic> l) override
set_logic(const std::shared_ptr<llarp::Logic>& l) override
{
m_Logic = l;
m_Logic->SetQueuer(llarp::util::memFn(&Loop::call_soon, this));
l->SetQueuer([this](std::function<void()> f) { call_soon(std::move(f)); });
}
std::shared_ptr<llarp::Logic> m_Logic;
void
call_soon(std::function<void(void)> f) override;
void
register_poll_fd_readable(int fd, Callback callback) override;
void
deregister_poll_fd_readable(int fd) override;
void
set_pump_function(std::function<void(void)> pumpll) override;
llarp::EventLoopWakeup*
make_event_loop_waker(std::function<void()> callback) override;
std::shared_ptr<llarp::EventLoopWakeup>
make_waker(std::function<void()> callback) override;
void
delete_waker(int idx);
std::shared_ptr<EventLoopRepeater>
make_repeater() override;
std::shared_ptr<llarp::UDPHandle>
udp(UDPReceiveFunc on_recv) override;
void
FlushLogic();
std::function<void(void)> PumpLL;
bool
inEventLoopThread() const override;
private:
uv_loop_t m_Impl;
uv_timer_t* m_TickTimer;
uv_async_t m_WakeUp;
std::shared_ptr<uvw::Loop> m_Impl;
std::shared_ptr<uvw::AsyncHandle> m_WakeUp;
std::atomic<bool> m_Run;
using AtomicQueue_t = llarp::thread::Queue<std::function<void(void)>>;
AtomicQueue_t m_LogicCalls;
@ -125,16 +96,11 @@ namespace libuv
std::map<uint32_t, Callback> m_pendingCalls;
std::unordered_map<int, uv_poll_t> m_Polls;
std::unordered_map<int, std::shared_ptr<uvw::PollHandle>> m_Polls;
llarp::thread::Queue<PendingTimer> m_timerQueue;
llarp::thread::Queue<uint32_t> m_timerCancelQueue;
std::optional<std::thread::id> m_EventLoopThreadID;
int m_NumWakers;
std::unordered_map<int, UVWakeup*> m_Wakers;
};
} // namespace libuv
} // namespace llarp::uv
#endif

@ -0,0 +1,41 @@
#include "ev.hpp"
#include "../util/buffer.hpp"
namespace llarp
{
// Base type for UDP handling; constructed via EventLoop::udp().
struct UDPHandle
{
using ReceiveFunc = EventLoop::UDPReceiveFunc;
// Starts listening for incoming UDP packets on the given address. Returns true on success,
// false if the address could not be bound. If you send without calling this first then the
// socket will bind to a random high port on 0.0.0.0 (the "all addresses" address).
virtual bool
listen(const SockAddr& addr) = 0;
// Sends a packet to the given recipient, immediately. Returns true if the send succeeded,
// false it could not be performed (either because of error, or because it would have blocked).
// If listen hasn't been called then a random IP/port will be used.
virtual bool
send(const SockAddr& dest, const llarp_buffer_t& buf) = 0;
// Closes the listening UDP socket (if opened); this is typically called (automatically) during
// destruction. Does nothing if the UDP socket is already closed.
virtual void
close() = 0;
// Base class destructor
virtual ~UDPHandle() = default;
protected:
explicit UDPHandle(ReceiveFunc on_recv) : on_recv{std::move(on_recv)}
{
// It makes no sense at all to use this with a null receive function:
assert(this->on_recv);
}
// Callback to invoke when data is received
ReceiveFunc on_recv;
};
} // namespace llarp

@ -39,26 +39,31 @@ namespace llarp
}
constexpr size_t udp_header_size = 8;
struct DnsHandler : public dns::PacketHandler
// Intercepts DNS IP packets going to an IP on the tun interface; this is currently used on
// Android where binding to a DNS port (i.e. via llarp::dns::Proxy) isn't possible because of OS
// restrictions, but a tun interface *is* available.
class DnsInterceptor : public dns::PacketHandler
{
public:
TunEndpoint* const m_Endpoint;
explicit DnsHandler(AbstractRouter* router, TunEndpoint* ep)
explicit DnsInterceptor(AbstractRouter* router, TunEndpoint* ep)
: dns::PacketHandler{router->logic(), ep}, m_Endpoint{ep} {};
void
SendServerMessageBufferTo(SockAddr from, SockAddr to, std::vector<byte_t> buf) override
SendServerMessageBufferTo(
const SockAddr& from, const SockAddr& to, llarp_buffer_t buf) override
{
net::IPPacket pkt;
if (buf.size() + 28 > sizeof(pkt.buf))
if (buf.sz + 28 > sizeof(pkt.buf))
return;
auto* hdr = pkt.Header();
pkt.buf[1] = 0;
hdr->version = 4;
hdr->ihl = 5;
hdr->tot_len = htons(buf.size() + 28);
hdr->tot_len = htons(buf.sz + 28);
hdr->protocol = 0x11; // udp
hdr->ttl = 64;
hdr->frag_off = htons(0b0100000000000000);
@ -72,11 +77,11 @@ namespace llarp
ptr += 2;
htobe16buf(ptr, to.getPort());
ptr += 2;
htobe16buf(ptr, buf.size() + udp_header_size);
htobe16buf(ptr, buf.sz + udp_header_size);
ptr += 2;
htobe16buf(ptr, uint16_t{0}); // checksum
ptr += 2;
std::copy_n(buf.data(), buf.size(), ptr);
std::copy_n(buf.base, buf.sz, ptr);
/// queue ip packet write
const IpAddress remoteIP{from};
@ -84,7 +89,7 @@ namespace llarp
hdr->check = 0;
hdr->check = net::ipchksum(pkt.buf, 20);
pkt.sz = 28 + buf.size();
pkt.sz = 28 + buf.sz;
m_Endpoint->HandleWriteIPPacket(
pkt.ConstBuffer(), net::ExpandV4(remoteIP.toIP()), net::ExpandV4(localIP.toIP()), 0);
}
@ -97,7 +102,7 @@ namespace llarp
m_PacketRouter.reset(
new vpn::PacketRouter{[&](net::IPPacket pkt) { HandleGotUserPacket(std::move(pkt)); }});
#if ANDROID
m_Resolver = std::make_shared<DnsHandler>(r, this);
m_Resolver = std::make_shared<DnsInterceptor>(r, this);
m_PacketRouter->AddUDPHandler(huint16_t{53}, [&](net::IPPacket pkt) {
const size_t ip_header_size = (pkt.Header()->ihl * 4);

@ -22,17 +22,12 @@ namespace llarp::iwp
bool allowInbound)
: ILinkLayer(
keyManager, getrc, h, sign, before, est, reneg, timeout, closed, pumpDone, worker)
, m_Wakeup{ev->make_event_loop_waker([self = this]() { self->HandleWakeupPlaintext(); })}
, m_Wakeup{ev->make_waker([this]() { HandleWakeupPlaintext(); })}
, m_PlaintextRecv{1024}
, permitInbound{allowInbound}
{}
LinkLayer::~LinkLayer()
{
m_Wakeup->End();
}
const char*
LinkLayer::Name() const
{
@ -111,7 +106,7 @@ namespace llarp::iwp
void
LinkLayer::WakeupPlaintext()
{
m_Wakeup->Wakeup();
m_Wakeup->Trigger();
}
void

@ -33,8 +33,6 @@ namespace llarp::iwp
WorkerFunc_t dowork,
bool permitInbound);
~LinkLayer() override;
std::shared_ptr<ILinkSession>
NewOutboundSession(const RouterContact& rc, const AddressInfo& ai) override;
@ -63,7 +61,7 @@ namespace llarp::iwp
void
HandleWakeupPlaintext();
EventLoopWakeup* const m_Wakeup;
const std::shared_ptr<EventLoopWakeup> m_Wakeup;
std::unordered_map<SockAddr, std::weak_ptr<Session>, SockAddr::Hash> m_PlaintextRecv;
std::unordered_map<SockAddr, RouterID, SockAddr::Hash> m_AuthedAddrs;
const bool permitInbound;

@ -1,5 +1,6 @@
#include <link/server.hpp>
#include <ev/ev.hpp>
#include <ev/udp_handle.hpp>
#include <crypto/crypto.hpp>
#include <config/key_manager.hpp>
#include <memory>
@ -125,18 +126,19 @@ namespace llarp
}
bool
ILinkLayer::Configure(llarp_ev_loop_ptr loop, const std::string& ifname, int af, uint16_t port)
ILinkLayer::Configure(EventLoop_ptr loop, const std::string& ifname, int af, uint16_t port)
{
m_Loop = loop;
m_udp.user = this;
m_udp.recvfrom = [](llarp_udp_io* udp, const llarp::SockAddr& from, ManagedBuffer pktbuf) {
ILinkSession::Packet_t pkt;
auto& buf = pktbuf.underlying;
pkt.resize(buf.sz);
std::copy_n(buf.base, buf.sz, pkt.data());
static_cast<ILinkLayer*>(udp->user)->RecvFrom(from, std::move(pkt));
};
m_udp.tick = &ILinkLayer::udp_tick;
LogError("omg wtf");
m_Loop = std::move(loop);
// using UDPReceiveFunc = std::function<void(UDPHandle&, SockAddr src, llarp::OwnedBuffer buf)>;
m_udp = m_Loop->udp(
[this]([[maybe_unused]] UDPHandle& udp, const SockAddr& from, llarp_buffer_t buf) {
ILinkSession::Packet_t pkt;
pkt.resize(buf.sz);
std::copy_n(buf.base, buf.sz, pkt.data());
RecvFrom(from, std::move(pkt));
});
if (ifname == "*")
{
if (!AllInterfaces(af, m_ourAddr))
@ -162,7 +164,11 @@ namespace llarp
}
}
m_ourAddr.setPort(port);
return llarp_ev_add_udp(m_Loop, &m_udp, m_ourAddr) != -1;
if (not m_udp->listen(m_ourAddr))
return false;
m_Loop->add_ticker([this] { Pump(); });
return true;
}
void
@ -335,15 +341,16 @@ namespace llarp
}
bool
ILinkLayer::Start(std::shared_ptr<Logic> l)
ILinkLayer::Start(const std::shared_ptr<Logic>& logic)
{
m_Logic = l;
ScheduleTick(LINK_LAYER_TICK_INTERVAL);
// Tie the lifetime of this repeater to this arbitrary shared_ptr:
m_repeater_keepalive = std::make_shared<int>(42);
logic->call_every(LINK_LAYER_TICK_INTERVAL, m_repeater_keepalive, [this] { Tick(Now()); });
return true;
}
void
ILinkLayer::Tick(llarp_time_t now)
ILinkLayer::Tick(const llarp_time_t now)
{
{
Lock_t l(m_AuthedLinksMutex);
@ -373,8 +380,7 @@ namespace llarp
void
ILinkLayer::Stop()
{
if (m_Logic && tick_id)
m_Logic->remove_call(tick_id);
m_repeater_keepalive.reset(); // make the repeater kill itself
{
Lock_t l(m_AuthedLinksMutex);
for (const auto& [router, link] : m_AuthedLinks)
@ -421,6 +427,12 @@ namespace llarp
}
}
void
ILinkLayer::SendTo_LL(const SockAddr& to, const llarp_buffer_t& pkt)
{
m_udp->send(to, pkt);
}
bool
ILinkLayer::SendTo(
const RouterID& remote, const llarp_buffer_t& buf, ILinkSession::CompletionHandler completed)
@ -479,25 +491,4 @@ namespace llarp
return true;
}
void
ILinkLayer::OnTick()
{
auto now = Now();
Tick(now);
ScheduleTick(LINK_LAYER_TICK_INTERVAL);
}
void
ILinkLayer::ScheduleTick(llarp_time_t interval)
{
tick_id = m_Logic->call_later(interval, std::bind(&ILinkLayer::OnTick, this));
}
void
ILinkLayer::udp_tick(llarp_udp_io* udp)
{
ILinkLayer* link = static_cast<ILinkLayer*>(udp->user);
link->Pump();
}
} // namespace llarp

@ -2,7 +2,7 @@
#define LLARP_LINK_SERVER_HPP
#include <crypto/types.hpp>
#include <ev/ev.h>
#include <ev/ev.hpp>
#include <link/session.hpp>
#include <net/sock_addr.hpp>
#include <router_contact.hpp>
@ -93,7 +93,7 @@ namespace llarp
llarp_time_t
Now() const
{
return llarp_ev_loop_time_now_ms(m_Loop);
return m_Loop->time_now();
}
bool
@ -106,17 +106,11 @@ namespace llarp
void
ForEachSession(std::function<void(ILinkSession*)> visit) EXCLUDES(m_AuthedLinksMutex);
static void
udp_tick(llarp_udp_io* udp);
void
SendTo_LL(const SockAddr& to, const llarp_buffer_t& pkt)
{
llarp_ev_udp_sendto(&m_udp, to, pkt);
}
SendTo_LL(const SockAddr& to, const llarp_buffer_t& pkt);
virtual bool
Configure(llarp_ev_loop_ptr loop, const std::string& ifname, int af, uint16_t port);
Configure(EventLoop_ptr loop, const std::string& ifname, int af, uint16_t port);
virtual std::shared_ptr<ILinkSession>
NewOutboundSession(const RouterContact& rc, const AddressInfo& ai) = 0;
@ -138,7 +132,7 @@ namespace llarp
TryEstablishTo(RouterContact rc);
bool
Start(std::shared_ptr<llarp::Logic> l);
Start(const std::shared_ptr<llarp::Logic>& l);
virtual void
Stop();
@ -211,12 +205,6 @@ namespace llarp
std::shared_ptr<KeyManager> keyManager;
WorkerFunc_t QueueWork;
std::shared_ptr<Logic>
logic()
{
return m_Logic;
}
bool
operator<(const ILinkLayer& other) const
{
@ -235,19 +223,7 @@ namespace llarp
return m_Pending.size();
}
int
GetUDPSocket() const
{
return m_udp.fd;
}
private:
void
OnTick();
void
ScheduleTick(llarp_time_t interval);
uint32_t tick_id;
const SecretKey& m_RouterEncSecret;
@ -262,10 +238,9 @@ namespace llarp
bool
PutSession(const std::shared_ptr<ILinkSession>& s);
std::shared_ptr<llarp::Logic> m_Logic = nullptr;
llarp_ev_loop_ptr m_Loop;
EventLoop_ptr m_Loop;
SockAddr m_ourAddr;
llarp_udp_io m_udp;
std::shared_ptr<llarp::UDPHandle> m_udp;
SecretKey m_SecretKey;
using AuthedLinks =
@ -278,6 +253,9 @@ namespace llarp
Pending m_Pending GUARDED_BY(m_PendingMutex);
std::unordered_map<SockAddr, llarp_time_t, SockAddr::Hash> m_RecentlyClosed;
private:
std::shared_ptr<int> m_repeater_keepalive;
};
using LinkLayer_ptr = std::shared_ptr<ILinkLayer>;

@ -1,7 +1,7 @@
#ifndef LLARP_IP_HPP
#define LLARP_IP_HPP
#include <ev/ev.h>
#include <ev/ev.hpp>
#include <net/net.hpp>
#include <util/buffer.hpp>
#include <util/time.hpp>
@ -109,8 +109,6 @@ struct ipv6_header
#include <service/protocol_type.hpp>
#include <utility>
struct llarp_ev_loop;
namespace llarp
{
namespace net
@ -143,25 +141,25 @@ namespace llarp
struct PutTime
{
llarp_ev_loop_ptr loop;
PutTime(llarp_ev_loop_ptr evloop) : loop(std::move(evloop))
EventLoop_ptr loop;
PutTime(EventLoop_ptr evloop) : loop(std::move(evloop))
{}
void
operator()(IPPacket& pkt) const
{
pkt.timestamp = llarp_ev_loop_time_now_ms(loop);
pkt.timestamp = loop->time_now();
}
};
struct GetNow
{
llarp_ev_loop_ptr loop;
GetNow(llarp_ev_loop_ptr evloop) : loop(std::move(evloop))
EventLoop_ptr loop;
GetNow(EventLoop_ptr evloop) : loop(std::move(evloop))
{}
llarp_time_t
operator()() const
{
return llarp_ev_loop_time_now_ms(loop);
return loop->time_now();
}
};

@ -8,7 +8,7 @@
#include <util/status.hpp>
#include <router/i_outbound_message_handler.hpp>
#include <vector>
#include <ev/ev.h>
#include <ev/ev.hpp>
#include <functional>
#include <router_contact.hpp>
#include <tooling/router_event.hpp>
@ -81,7 +81,7 @@ namespace llarp
using LMQ_ptr = std::shared_ptr<oxenmq::OxenMQ>;
struct AbstractRouter
struct AbstractRouter : public std::enable_shared_from_this<AbstractRouter>
{
#ifdef LOKINET_HIVE
tooling::RouterHive* hive = nullptr;
@ -92,22 +92,22 @@ namespace llarp
virtual bool
HandleRecvLinkMessageBuffer(ILinkSession* from, const llarp_buffer_t& msg) = 0;
virtual LMQ_ptr
virtual const LMQ_ptr&
lmq() const = 0;
virtual vpn::Platform*
GetVPNPlatform() const = 0;
virtual std::shared_ptr<rpc::LokidRpcClient>
virtual const std::shared_ptr<rpc::LokidRpcClient>&
RpcClient() const = 0;
virtual std::shared_ptr<Logic>
virtual const std::shared_ptr<Logic>&
logic() const = 0;
virtual llarp_dht_context*
dht() const = 0;
virtual std::shared_ptr<NodeDB>
virtual const std::shared_ptr<NodeDB>&
nodedb() const = 0;
virtual const path::PathContext&
@ -122,7 +122,7 @@ namespace llarp
virtual exit::Context&
exitContext() = 0;
virtual std::shared_ptr<KeyManager>
virtual const std::shared_ptr<KeyManager>&
keyManager() const = 0;
virtual const SecretKey&
@ -134,7 +134,7 @@ namespace llarp
virtual Profiling&
routerProfiling() = 0;
virtual llarp_ev_loop_ptr
virtual const EventLoop_ptr&
netloop() const = 0;
/// call function in crypto worker

@ -47,12 +47,10 @@ static constexpr std::chrono::milliseconds ROUTER_TICK_INTERVAL = 1s;
namespace llarp
{
Router::Router(
llarp_ev_loop_ptr __netloop,
std::shared_ptr<Logic> l,
std::shared_ptr<vpn::Platform> vpnPlatform)
EventLoop_ptr netloop, std::shared_ptr<Logic> l, std::shared_ptr<vpn::Platform> vpnPlatform)
: ready(false)
, m_lmq(std::make_shared<oxenmq::OxenMQ>())
, _netloop(std::move(__netloop))
, _netloop(std::move(netloop))
, _logic(std::move(l))
, _vpnPlatform(std::move(vpnPlatform))
, paths(this)
@ -289,7 +287,7 @@ namespace llarp
bool
Router::Configure(std::shared_ptr<Config> c, bool isRouter, std::shared_ptr<NodeDB> nodedb)
{
m_Config = c;
m_Config = std::move(c);
auto& conf = *m_Config;
whitelistRouters = conf.lokid.whitelistRouters;
if (whitelistRouters)
@ -307,7 +305,7 @@ namespace llarp
m_lmq->start();
_nodedb = nodedb;
_nodedb = std::move(nodedb);
m_isServiceNode = conf.router.m_isRelay;
@ -366,18 +364,10 @@ namespace llarp
if (_onDown)
_onDown();
LogInfo("closing router");
llarp_ev_loop_stop(_netloop);
_netloop->stop();
_running.store(false);
}
void
Router::handle_router_ticker()
{
ticker_job_id = 0;
Tick();
ScheduleTicker(ROUTER_TICK_INTERVAL);
}
bool
Router::ParseRoutingMessageBuffer(
const llarp_buffer_t& buf, routing::IMessageHandler* h, const PathID_t& rxid)
@ -914,12 +904,6 @@ namespace llarp
return CryptoManager::instance()->sign(sig, identity(), buf);
}
void
Router::ScheduleTicker(llarp_time_t interval)
{
ticker_job_id = _logic->call_later(interval, std::bind(&Router::handle_router_ticker, this));
}
void
Router::SessionClosed(RouterID remote)
{
@ -1136,11 +1120,11 @@ namespace llarp
#ifdef _WIN32
// windows uses proactor event loop so we need to constantly pump
_netloop->add_ticker(std::bind(&Router::PumpLL, this));
_netloop->add_ticker([this] { PumpLL(); });
#else
_netloop->set_pump_function(std::bind(&Router::PumpLL, this));
_netloop->set_pump_function([this] { PumpLL(); });
#endif
ScheduleTicker(ROUTER_TICK_INTERVAL);
_logic->call_every(ROUTER_TICK_INTERVAL, weak_from_this(), [this] { Tick(); });
_running.store(true);
_startedAt = Now();
#if defined(WITH_SYSTEMD)
@ -1338,9 +1322,7 @@ namespace llarp
if (!link)
throw std::runtime_error("NewOutboundLink() failed to provide a link");
const auto afs = {AF_INET, AF_INET6};
for (const auto af : afs)
for (const auto af : {AF_INET, AF_INET6})
{
if (not link->Configure(netloop(), "*", af, m_OutboundPort))
continue;

@ -8,7 +8,7 @@
#include <config/key_manager.hpp>
#include <constants/link_layer.hpp>
#include <crypto/types.hpp>
#include <ev/ev.h>
#include <ev/ev.hpp>
#include <exit/context.hpp>
#include <handlers/tun.hpp>
#include <link/link_manager.hpp>
@ -76,19 +76,19 @@ namespace llarp
LMQ_ptr m_lmq;
LMQ_ptr
const LMQ_ptr&
lmq() const override
{
return m_lmq;
}
std::shared_ptr<rpc::LokidRpcClient>
const std::shared_ptr<rpc::LokidRpcClient>&
RpcClient() const override
{
return m_lokidRpcClient;
}
std::shared_ptr<Logic>
const std::shared_ptr<Logic>&
logic() const override
{
return _logic;
@ -103,7 +103,7 @@ namespace llarp
util::StatusObject
ExtractStatus() const override;
std::shared_ptr<NodeDB>
const std::shared_ptr<NodeDB>&
nodedb() const override
{
return _nodedb;
@ -136,7 +136,7 @@ namespace llarp
return _exitContext;
}
std::shared_ptr<KeyManager>
const std::shared_ptr<KeyManager>&
keyManager() const override
{
return m_keyManager;
@ -160,7 +160,7 @@ namespace llarp
return _routerProfiling;
}
llarp_ev_loop_ptr
const EventLoop_ptr&
netloop() const override
{
return _netloop;
@ -180,7 +180,7 @@ namespace llarp
std::optional<SockAddr> _ourAddress;
llarp_ev_loop_ptr _netloop;
EventLoop_ptr _netloop;
std::shared_ptr<Logic> _logic;
std::shared_ptr<vpn::Platform> _vpnPlatform;
path::PathContext paths;
@ -206,8 +206,6 @@ namespace llarp
// should we be sending padded messages every interval?
bool sendPadding = false;
uint32_t ticker_job_id = 0;
LinkMessageParser inbound_link_msg_parser;
routing::InboundMessageParser inbound_routing_msg_parser;
@ -327,11 +325,11 @@ namespace llarp
GossipRCIfNeeded(const RouterContact rc) override;
explicit Router(
llarp_ev_loop_ptr __netloop,
EventLoop_ptr netloop,
std::shared_ptr<Logic> logic,
std::shared_ptr<vpn::Platform> vpnPlatform);
virtual ~Router() override;
~Router() override;
bool
HandleRecvLinkMessageBuffer(ILinkSession* from, const llarp_buffer_t& msg) override;
@ -462,10 +460,6 @@ namespace llarp
return llarp::time_now_ms();
}
/// schedule ticker to call i ms from now
void
ScheduleTicker(llarp_time_t i = 1s);
/// parse a routing message in a buffer and handle it with a handler if
/// successful parsing return true on parse and handle success otherwise
/// return false
@ -502,9 +496,6 @@ namespace llarp
uint32_t
NextPathBuildNumber() override;
void
handle_router_ticker();
void
AfterStopLinks();

@ -29,7 +29,7 @@ namespace llarp::rpc
},
[self = shared_from_this()](oxenmq::ConnectionID, std::string_view fail) {
LogWarn("failed to connect to endpoint auth server: ", fail);
self->m_Endpoint->RouterLogic()->call_later(1s, [self]() { self->Start(); });
self->m_Endpoint->Logic()->call_later(1s, [self] { self->Start(); });
});
}
@ -44,33 +44,32 @@ namespace llarp::rpc
std::shared_ptr<llarp::service::ProtocolMessage> msg,
std::function<void(service::AuthResult)> hook)
{
assert(m_Endpoint->Logic()->inLogicThread());
service::ConvoTag tag = msg->tag;
m_PendingAuths.insert(tag);
const auto from = msg->sender.Addr();
auto reply =
[self = this, tag, logic = m_Endpoint->RouterLogic(), hook](service::AuthResult result) {
logic->Call([self, hook, result, tag]() {
self->m_PendingAuths.erase(tag);
hook(result);
});
};
auto reply = m_Endpoint->Logic()->make_caller([this, tag, hook](service::AuthResult result) {
m_PendingAuths.erase(tag);
hook(result);
});
if (m_AuthWhitelist.count(from))
{
// explicitly whitelisted source
reply({service::AuthResultCode::eAuthAccepted, "explicitly whitelisted"});
reply(service::AuthResult{service::AuthResultCode::eAuthAccepted, "explicitly whitelisted"});
return;
}
if (not m_Conn.has_value())
{
// we don't have a connection to the backend so it's failed
reply({service::AuthResultCode::eAuthFailed, "remote has no connection to auth backend"});
reply(service::AuthResult{service::AuthResultCode::eAuthFailed,
"remote has no connection to auth backend"});
return;
}
if (msg->proto != llarp::service::eProtocolAuth)
{
// not an auth message, reject
reply({service::AuthResultCode::eAuthRejected, "protocol error"});
reply(service::AuthResult{service::AuthResultCode::eAuthRejected, "protocol error"});
return;
}
@ -81,7 +80,8 @@ namespace llarp::rpc
m_LMQ->request(
*m_Conn,
m_AuthMethod,
[self = shared_from_this(), reply](bool success, std::vector<std::string> data) {
[self = shared_from_this(), reply = std::move(reply)](
bool success, std::vector<std::string> data) {
service::AuthResult result{service::AuthResultCode::eAuthFailed, "no reason given"};
if (success and not data.empty())
{

@ -76,21 +76,6 @@ namespace llarp
return m_state->Configure(conf);
}
llarp_ev_loop_ptr
Endpoint::EndpointNetLoop()
{
if (m_state->m_IsolatedNetLoop)
return m_state->m_IsolatedNetLoop;
return Router()->netloop();
}
bool
Endpoint::NetworkIsIsolated() const
{
return m_state->m_IsolatedLogic.get() != nullptr && m_state->m_IsolatedNetLoop != nullptr;
}
bool
Endpoint::HasPendingPathToService(const Address& addr) const
{
@ -652,20 +637,6 @@ namespace llarp
m_OnReady = nullptr;
}
void
Endpoint::IsolatedNetworkMainLoop()
{
m_state->m_IsolatedNetLoop = llarp_make_ev_loop();
m_state->m_IsolatedLogic = std::make_shared<llarp::Logic>();
if (SetupNetworking())
llarp_ev_loop_run_single_process(m_state->m_IsolatedNetLoop, m_state->m_IsolatedLogic);
else
{
m_state->m_IsolatedNetLoop.reset();
m_state->m_IsolatedLogic.reset();
}
}
std::optional<std::vector<RouterContact>>
Endpoint::GetHopsForBuild()
{
@ -1019,7 +990,9 @@ namespace llarp
}
else
{
RouterLogic()->Call([hook]() { hook({AuthResultCode::eAuthAccepted, "OK"}); });
Router()->logic()->Call([h = std::move(hook)] {
h({AuthResultCode::eAuthAccepted, "OK"});
});
}
}
@ -1099,7 +1072,7 @@ namespace llarp
RemoveConvoTag(frame.T);
return true;
}
if (not frame.AsyncDecryptAndVerify(EndpointLogic(), p, m_Identity, this))
if (not frame.AsyncDecryptAndVerify(Router()->logic(), p, m_Identity, this))
{
// send reset convo tag message
ProtocolFrame f;
@ -1342,14 +1315,7 @@ namespace llarp
}
};
if (NetworkIsIsolated())
{
LogicCall(EndpointLogic(), epPump);
}
else
{
epPump();
}
epPump();
auto router = Router();
// TODO: locking on this container
for (const auto& item : m_state->m_RemoteSessions)
@ -1537,24 +1503,18 @@ namespace llarp
|| NumInStatus(path::ePathEstablished) < path::min_intro_paths;
}
std::shared_ptr<Logic>
Endpoint::RouterLogic()
{
return Router()->logic();
}
std::shared_ptr<Logic>
Endpoint::EndpointLogic()
{
return m_state->m_IsolatedLogic ? m_state->m_IsolatedLogic : Router()->logic();
}
AbstractRouter*
Endpoint::Router()
{
return m_state->m_Router;
}
const std::shared_ptr<llarp::Logic>&
Endpoint::Logic()
{
return Router()->logic();
}
void
Endpoint::BlacklistSNode(const RouterID snode)
{

@ -2,7 +2,7 @@
#define LLARP_SERVICE_ENDPOINT_HPP
#include <llarp.h>
#include <dht/messages/gotrouter.hpp>
#include <ev/ev.h>
#include <ev/ev.hpp>
#include <exit/session.hpp>
#include <net/ip_range_map.hpp>
#include <net/net.hpp>
@ -136,20 +136,10 @@ namespace llarp
void
ResetInternalState() override;
/// router's logic
/// logic (via router)
/// use when sending any data on a path
std::shared_ptr<Logic>
RouterLogic();
/// endpoint's logic
/// use when writing any data to local network interfaces
std::shared_ptr<Logic>
EndpointLogic();
/// borrow endpoint's net loop for sending data to user on local network
/// interface
llarp_ev_loop_ptr
EndpointNetLoop();
const std::shared_ptr<llarp::Logic>&
Logic();
AbstractRouter*
Router();
@ -423,22 +413,6 @@ namespace llarp
void
PrefetchServicesByTag(const Tag& tag);
/// spawn a new process that contains a network isolated process
/// return true if we set up isolation and the event loop is up
/// otherwise return false
virtual bool
SpawnIsolatedNetwork()
{
return false;
}
bool
NetworkIsIsolated() const;
/// this runs in the isolated network process
void
IsolatedNetworkMainLoop();
private:
void
HandleVerifyGotRouter(dht::GotRouterMessage_constptr msg, RouterID id, bool valid);

@ -19,25 +19,8 @@
namespace llarp
{
struct EventLoop;
}
using llarp_ev_loop_ptr = std::shared_ptr<llarp::EventLoop>;
namespace llarp
{
// clang-format off
namespace exit { struct BaseSession; }
namespace path { struct Path; using Path_ptr = std::shared_ptr< Path >; }
namespace routing { struct PathTransferMessage; }
// clang-format on
namespace service
{
struct IServiceLookup;
struct OutboundContext;
struct Endpoint;
struct EndpointState
{
hooks::Backend_ptr m_OnUp;
@ -47,8 +30,6 @@ namespace llarp
std::set<RouterID> m_SnodeBlacklist;
AbstractRouter* m_Router;
std::shared_ptr<Logic> m_IsolatedLogic = nullptr;
llarp_ev_loop_ptr m_IsolatedNetLoop = nullptr;
std::string m_Keyfile;
std::string m_Name;
std::string m_NetNS;

@ -202,7 +202,7 @@ namespace llarp
currentConvoTag.Randomize();
auto frame = std::make_shared<ProtocolFrame>();
auto ex = std::make_shared<AsyncKeyExchange>(
m_Endpoint->RouterLogic(),
m_Endpoint->Logic(),
remoteIdent,
m_Endpoint->GetIdentity(),
currentIntroSet.K,
@ -581,7 +581,7 @@ namespace llarp
};
}
const auto& ident = m_Endpoint->GetIdentity();
if (not frame.AsyncDecryptAndVerify(m_Endpoint->EndpointLogic(), p, ident, m_Endpoint, hook))
if (not frame.AsyncDecryptAndVerify(m_Endpoint->Logic(), p, ident, m_Endpoint, hook))
{
// send reset convo tag message
ProtocolFrame f;

@ -29,7 +29,7 @@ namespace llarp
{
if (m_SendQueue.empty() or m_SendQueue.full())
{
LogicCall(m_Endpoint->RouterLogic(), [self = this]() { self->FlushUpstream(); });
LogicCall(m_Endpoint->Logic(), [this] { FlushUpstream(); });
}
m_SendQueue.pushBack(std::make_pair(
std::make_shared<const routing::PathTransferMessage>(*msg, remoteIntro.pathID), path));
@ -97,14 +97,13 @@ namespace llarp
m->sender = m_Endpoint->GetIdentity().pub;
m->tag = f->T;
m->PutBuffer(payload);
auto self = this;
m_Endpoint->Router()->QueueWork([f, m, shared, path, self]() {
if (not f->EncryptAndSign(*m, shared, self->m_Endpoint->GetIdentity()))
m_Endpoint->Router()->QueueWork([f, m, shared, path, this] {
if (not f->EncryptAndSign(*m, shared, m_Endpoint->GetIdentity()))
{
LogError(self->m_Endpoint->Name(), " failed to sign message");
LogError(m_Endpoint->Name(), " failed to sign message");
return;
}
self->Send(f, path);
Send(f, path);
});
}

@ -1,6 +1,6 @@
#pragma once
#include <crypto/crypto_libsodium.hpp>
#include <ev/ev.h>
#include <ev/ev.hpp>
namespace llarp
{
@ -15,7 +15,7 @@ namespace llarp
Simulation();
llarp::CryptoManager m_CryptoManager;
llarp_ev_loop_ptr m_NetLoop;
EventLoop_ptr m_NetLoop;
std::unordered_map<std::string, Node_ptr> m_Nodes;

@ -8,7 +8,7 @@ namespace tooling
{}
std::shared_ptr<llarp::AbstractRouter>
HiveContext::makeRouter(llarp_ev_loop_ptr netloop, std::shared_ptr<llarp::Logic> logic)
HiveContext::makeRouter(llarp::EventLoop_ptr netloop, std::shared_ptr<llarp::Logic> logic)
{
return std::make_shared<HiveRouter>(netloop, logic, makeVPNPlatform(), m_hive);
}

@ -12,7 +12,7 @@ namespace tooling
HiveContext(RouterHive* hive);
std::shared_ptr<llarp::AbstractRouter>
makeRouter(llarp_ev_loop_ptr netloop, std::shared_ptr<llarp::Logic> logic) override;
makeRouter(llarp::EventLoop_ptr netloop, std::shared_ptr<llarp::Logic> logic) override;
/// Get this context's router as a HiveRouter.
///

@ -5,7 +5,7 @@
namespace tooling
{
HiveRouter::HiveRouter(
llarp_ev_loop_ptr netloop,
llarp::EventLoop_ptr netloop,
std::shared_ptr<llarp::Logic> logic,
std::shared_ptr<llarp::vpn::Platform> plat,
RouterHive* hive)

@ -11,7 +11,7 @@ namespace tooling
struct HiveRouter : public llarp::Router
{
explicit HiveRouter(
llarp_ev_loop_ptr netloop,
llarp::EventLoop_ptr netloop,
std::shared_ptr<llarp::Logic> logic,
std::shared_ptr<llarp::vpn::Platform> vpnPlatform,
RouterHive* hive);

@ -124,7 +124,7 @@ namespace llarp
operator^(const AlignedBuffer& other) const
{
AlignedBuffer<sz> ret;
std::transform(begin(), end(), other.begin(), ret.begin(), std::bit_xor<byte_t>());
std::transform(begin(), end(), other.begin(), ret.begin(), std::bit_xor<>());
return ret;
}

@ -129,3 +129,23 @@ operator==(const llarp_buffer_t& buff, const char* c_str)
}
return *str == 0;
}
namespace llarp
{
OwnedBuffer
OwnedBuffer::copy_from(const llarp_buffer_t& b)
{
auto buf = std::make_unique<byte_t[]>(b.sz);
std::copy(b.begin(), b.end(), buf.get());
return {std::move(buf), b.sz};
}
OwnedBuffer
OwnedBuffer::copy_used(const llarp_buffer_t& b)
{
auto buf = std::make_unique<byte_t[]>(b.cur - b.base);
std::copy(b.base, b.cur, buf.get());
return {std::move(buf), b.sz};
}
} // namespace llarp

@ -1,6 +1,7 @@
#ifndef LLARP_BUFFER_HPP
#define LLARP_BUFFER_HPP
#include <type_traits>
#include <util/common.hpp>
#include <util/mem.h>
#include <util/types.hpp>
@ -13,11 +14,16 @@
#include <cstring>
#include <utility>
#include <algorithm>
#include <memory>
/**
* buffer.h
*
* generic memory buffer
*
* TODO: replace usage of these with std::span (via a backport until we move to C++20). That's a
* fairly big job, though, as llarp_buffer_t is currently used a bit differently (i.e. maintains
* both start and current position, plus has some value reading/writing methods).
*/
/**
@ -83,34 +89,44 @@ struct llarp_buffer_t
llarp_buffer_t(const ManagedBuffer&) = delete;
llarp_buffer_t(ManagedBuffer&&) = delete;
template <typename T>
llarp_buffer_t(T* buf, size_t _sz) : base(reinterpret_cast<byte_t*>(buf)), cur(base), sz(_sz)
/// Construct referencing some 1-byte, trivially copyable (e.g. char, unsigned char, byte_t)
/// pointer type and a buffer size.
template <
typename T,
typename = std::enable_if_t<sizeof(T) == 1 and std::is_trivially_copyable_v<T>>>
llarp_buffer_t(T* buf, size_t _sz)
: base(reinterpret_cast<byte_t*>(const_cast<std::remove_const_t<T>*>(buf)))
, cur(base)
, sz(_sz)
{}
template <typename T>
llarp_buffer_t(const T* buf, size_t _sz)
: base(reinterpret_cast<byte_t*>(const_cast<T*>(buf))), cur(base), sz(_sz)
/// initialize llarp_buffer_t from containers supporting .data() and .size()
template <
typename T,
typename = std::void_t<decltype(std::declval<T>().data() + std::declval<T>().size())>>
llarp_buffer_t(T& t) : llarp_buffer_t{t.data(), t.size()}
{}
/** initialize llarp_buffer_t from container */
template <typename T>
llarp_buffer_t(T& t) : base(t.data()), cur(t.data()), sz(t.size())
byte_t*
begin()
{
// use data over the first element to "enforce" the container used has
// contiguous memory. (Note this isn't required by the standard, but a
// reasonable test on most standard library implementations).
return base;
}
byte_t*
begin() const
{
return base;
}
byte_t*
end()
{
return base + sz;
}
byte_t*
end() const
{
return base + sz;
}
template <typename T>
llarp_buffer_t(const T& t) : llarp_buffer_t(t.data(), t.size())
{}
// clang-format off
byte_t * begin() { return base; }
byte_t * begin() const { return base; }
byte_t * end() { return base + sz; }
byte_t * end() const { return base + sz; }
// clang-format on
size_t
size_left() const;
@ -211,4 +227,49 @@ struct ManagedBuffer
}
};
namespace llarp
{
// Wrapper around a std::unique_ptr<byte_t[]> that owns its own memory and is also implicitly
// convertible to a llarp_buffer_t.
struct OwnedBuffer
{
std::unique_ptr<byte_t[]> buf;
size_t sz;
template <typename T, typename = std::enable_if_t<sizeof(T) == 1>>
OwnedBuffer(std::unique_ptr<T[]> buf, size_t sz)
: buf{reinterpret_cast<byte_t*>(buf.release())}, sz{sz}
{}
// Create a new, uninitialized owned buffer of the given size.
explicit OwnedBuffer(size_t sz) : OwnedBuffer{std::make_unique<byte_t[]>(sz), sz}
{}
OwnedBuffer(const OwnedBuffer&) = delete;
OwnedBuffer&
operator=(const OwnedBuffer&) = delete;
OwnedBuffer(OwnedBuffer&&) = default;
OwnedBuffer&
operator=(OwnedBuffer&&) = delete;
// Implicit conversion so that this OwnedBuffer can be passed to anything taking a
// llarp_buffer_t
operator llarp_buffer_t()
{
return {buf.get(), sz};
}
// Creates an owned buffer by copying from a llarp_buffer_t. (Can also be used to copy from
// another OwnedBuffer via the implicit conversion operator above).
static OwnedBuffer
copy_from(const llarp_buffer_t& b);
// Creates an owned buffer by copying the used portion of a llarp_buffer_t (i.e. from base to
// cur), for when a llarp_buffer_t is used in write mode.
static OwnedBuffer
copy_used(const llarp_buffer_t& b);
};
} // namespace llarp
#endif

@ -24,35 +24,12 @@ namespace llarp
m_Queue = std::move(q);
}
uint32_t
Logic::call_later(llarp_time_t timeout, std::function<void(void)> func)
{
auto loop = m_Loop;
if (loop != nullptr)
{
return loop->call_after_delay(timeout, func);
}
return 0;
}
void
Logic::cancel_call(uint32_t id)
{
auto loop = m_Loop;
if (loop != nullptr)
{
loop->cancel_delayed_call(id);
}
}
void
Logic::remove_call(uint32_t id)
Logic::call_later(llarp_time_t timeout, std::function<void(void)> func)
{
auto loop = m_Loop;
if (loop != nullptr)
{
loop->cancel_delayed_call(id);
}
Call([this, timeout, f = std::move(func)]() mutable {
m_Loop->call_after_delay(timeout, std::move(f));
});
}
void

@ -16,24 +16,86 @@ namespace llarp
void
Call(std::function<void(void)> func);
uint32_t
// Calls the given function once, after the given delay.
void
call_later(llarp_time_t later, std::function<void(void)> func);
// Calls the given function repeatedly, forever, as long as the event loop lasts; the initial
// call will be after the given delay.
void
cancel_call(uint32_t id);
call_forever(llarp_time_t repeat, std::function<void(void)> func);
// Created a repeated timer, like call_forever(repeat, func), but ties the lifetime of the
// callback to `owner`: callbacks will be invoked so long as `owner` remains alive, but
// thereafter the callback will be destroyed. Intended to be used as:
//
// logic->call_every(100ms, shared_from_this(), [this] { some_func(); });
//
template <typename Callable>
void
remove_call(uint32_t id);
call_every(llarp_time_t repeat, std::weak_ptr<void> owner, Callable f)
{
auto repeater = m_Loop->make_repeater();
auto& r = *repeater;
r.start(
repeat,
[repeater = std::move(repeater), owner = std::move(owner), f = std::move(f)]() mutable {
if (auto ptr = owner.lock())
f();
else
repeater.reset(); // Remove timer on destruction (we should be the only thing holder
// the repeater)
});
}
// Wraps a lambda with a lambda that triggers it to be called via Logic::Call()
// when invoked. E.g.:
//
// auto x = logic->make_caller([] (int a) { std::cerr << a; });
// x(42);
// x(99);
//
// will schedule two calls of the inner lambda (with different arguments) in the logic thread.
// Arguments are forwarded to the inner lambda (allowing moving arguments into it).
template <typename Callable>
auto
make_caller(Callable&& f)
{
return [this, f = std::forward<Callable>(f)](auto&&... args) {
// This shared pointer in a pain in the ass but needed because this lambda is going into a
// std::function that only accepts copyable lambdas. I *want* to simply capture:
// args=std::make_tuple(std::forward<decltype(args)>(args)...)
// but that fails if any given args aren't copyable. Dammit.
auto args_tuple_ptr = std::make_shared<std::tuple<std::decay_t<decltype(args)>...>>(
std::forward<decltype(args)>(args)...);
Call([f, args = std::move(args_tuple_ptr)]() mutable {
// Moving away the tuple args here is okay because this lambda will only be invoked once
std::apply(f, std::move(*args));
});
};
}
void
SetQueuer(std::function<void(std::function<void(void)>)> q);
EventLoop*
event_loop()
{
return m_Loop;
}
void
set_event_loop(EventLoop* loop);
void
clear_event_loop();
bool
inLogicThread() const
{
return m_Loop and m_Loop->inEventLoopThread();
}
private:
EventLoop* m_Loop = nullptr;
std::function<void(std::function<void(void)>)> m_Queue;

@ -131,16 +131,18 @@ namespace llarp::vpn
net::IPPacket
ReadNextPacket() override
{
constexpr int uintsize = sizeof(unsigned int);
net::IPPacket pkt{};
unsigned int pktinfo = 0;
const struct iovec vecs[2] = {{.iov_base = &pktinfo, .iov_len = sizeof(unsigned int)},
const struct iovec vecs[2] = {{.iov_base = &pktinfo, .iov_len = uintsize},
{.iov_base = pkt.buf, .iov_len = sizeof(pkt.buf)}};
int n = readv(m_FD, vecs, 2);
if (n >= (int)(sizeof(unsigned int)))
{
n -= sizeof(unsigned int);
pkt.sz = n;
}
int sz = readv(m_FD, vecs, 2);
if (sz >= uintsize)
pkt.sz = sz - uintsize;
else if (sz >= 0 || errno == EAGAIN || errno == EWOULDBLOCK)
pkt.sz = 0;
else
throw std::error_code{errno, std::system_category()};
return pkt;
}

@ -91,6 +91,10 @@ namespace llarp::vpn
const auto sz = read(m_fd, pkt.buf, sizeof(pkt.buf));
if (sz >= 0)
pkt.sz = std::min(sz, ssize_t{sizeof(pkt.buf)});
else if (errno == EAGAIN || errno == EWOULDBLOCK)
pkt.sz = 0;
else
throw std::error_code{errno, std::system_category()};
return pkt;
}

@ -345,12 +345,6 @@ namespace llarp::vpn
return -1;
}
bool
HasNextPacket() override
{
return not m_ReadQueue.empty();
}
std::string
IfName() const override
{
@ -376,8 +370,11 @@ namespace llarp::vpn
}
net::IPPacket
ReadNextPacket()
ReadNextPacket() override
{
if (m_ReadQueue.empty())
return net::IPPacket{};
return m_ReadQueue.popFront();
}

@ -11,6 +11,7 @@
#include <util/time.hpp>
#include <net/net_if.hpp>
#include "ev/ev.hpp"
#undef LOG_TAG
#define LOG_TAG __FILE__
@ -38,11 +39,11 @@ struct IWPLinkContext
llarp::LinkLayer_ptr link;
std::shared_ptr<llarp::KeyManager> keyManager;
llarp::LinkMessageParser m_Parser;
llarp_ev_loop_ptr m_Loop;
llarp::EventLoop_ptr m_Loop;
/// is the test done on this context ?
bool gucci = false;
IWPLinkContext(std::string_view addr, llarp_ev_loop_ptr loop)
IWPLinkContext(std::string_view addr, llarp::EventLoop_ptr loop)
: localAddr{std::move(addr)}
, keyManager{std::make_shared<llarp::KeyManager>()}
, m_Parser{nullptr}
@ -104,7 +105,7 @@ struct IWPLinkContext
},
// timeout handler
[&](llarp::ILinkSession*) {
llarp_ev_loop_stop(m_Loop);
m_Loop->stop();
FAIL("session timeout");
},
// session closed handler
@ -150,8 +151,9 @@ RunIWPTest(Func_t test, Duration_t timeout = 10s)
llarp::LogSilencer shutup;
// set up event loop
auto logic = std::make_shared<llarp::Logic>();
auto loop = llarp_make_ev_loop();
auto loop = llarp::EventLoop::create();
loop->set_logic(logic);
logic->set_event_loop(loop.get());
llarp::LogContext::Instance().Initialize(
llarp::eLogDebug, llarp::LogType::File, "stdout", "unit test", [loop](auto work) {
@ -175,7 +177,7 @@ RunIWPTest(Func_t test, Duration_t timeout = 10s)
auto endIfDone = [initiator, recipiant, loop, logic]() {
if (initiator->gucci and recipiant->gucci)
{
LogicCall(logic, [loop]() { llarp_ev_loop_stop(loop); });
LogicCall(logic, [loop] { loop->stop(); });
}
};
// function to start test and give logic to unit test
@ -186,12 +188,12 @@ RunIWPTest(Func_t test, Duration_t timeout = 10s)
};
// function to end test immediately
auto endTest = [logic, loop]() { LogicCall(logic, [loop]() { llarp_ev_loop_stop(loop); }); };
auto endTest = [logic, loop]() { LogicCall(logic, [loop] { loop->stop(); }); };
loop->call_after_delay(
std::chrono::duration_cast<llarp_time_t>(timeout), []() { FAIL("test timeout"); });
test(start, endIfDone, endTest, initiator, recipiant);
llarp_ev_loop_run_single_process(loop, logic);
loop->run(*logic);
llarp::RouterContact::BlockBogons = oldBlockBogons;
}

Loading…
Cancel
Save