You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
lokinet/llarp/ev/ev_libuv.cpp

1067 lines
23 KiB
C++

#include <ev/ev_libuv.hpp>
#include <net/net_addr.hpp>
#include <util/thread/logic.hpp>
#include <util/thread/queue.hpp>
#include <cstring>
5 years ago
namespace libuv
{
#define LoopCall(h, ...) \
LogicCall(static_cast< Loop* >((h)->loop->data)->m_Logic, __VA_ARGS__)
struct glue
{
5 years ago
virtual ~glue() = default;
virtual void
Close() = 0;
};
5 years ago
/// tcp connection glue between llarp and libuv
struct conn_glue : public glue
5 years ago
{
using WriteBuffer_t = std::vector< char >;
struct WriteEvent
{
WriteBuffer_t data;
uv_write_t request;
WriteEvent() = default;
explicit WriteEvent(size_t sz, char* ptr)
{
request.data = this;
data.resize(sz);
std::copy_n(ptr, sz, data.begin());
}
uv_buf_t
Buffer()
{
return uv_buf_init(data.data(), data.size());
}
uv_write_t*
Request()
{
return &request;
}
};
5 years ago
uv_tcp_t m_Handle;
uv_connect_t m_Connect;
uv_check_t m_Ticker;
5 years ago
llarp_tcp_connecter* const m_TCP;
llarp_tcp_acceptor* const m_Accept;
llarp_tcp_conn m_Conn;
llarp::Addr m_Addr;
5 years ago
conn_glue(uv_loop_t* loop, llarp_tcp_connecter* tcp, const sockaddr* addr)
: m_TCP(tcp), m_Accept(nullptr), m_Addr(*addr)
5 years ago
{
m_Connect.data = this;
m_Handle.data = this;
m_TCP->impl = this;
5 years ago
uv_tcp_init(loop, &m_Handle);
5 years ago
m_Ticker.data = this;
uv_check_init(loop, &m_Ticker);
m_Conn.close = &ExplicitClose;
m_Conn.write = &ExplicitWrite;
5 years ago
}
conn_glue(uv_loop_t* loop, llarp_tcp_acceptor* tcp, const sockaddr* addr)
: m_TCP(nullptr), m_Accept(tcp), m_Addr(*addr)
5 years ago
{
m_Connect.data = nullptr;
m_Handle.data = this;
5 years ago
uv_tcp_init(loop, &m_Handle);
5 years ago
m_Ticker.data = this;
uv_check_init(loop, &m_Ticker);
m_Accept->close = &ExplicitCloseAccept;
m_Conn.write = nullptr;
m_Conn.closed = nullptr;
5 years ago
}
conn_glue(conn_glue* parent) : m_TCP(nullptr), m_Accept(nullptr)
5 years ago
{
m_Connect.data = nullptr;
m_Conn.close = &ExplicitClose;
m_Conn.write = &ExplicitWrite;
m_Handle.data = this;
5 years ago
uv_tcp_init(parent->m_Handle.loop, &m_Handle);
5 years ago
m_Ticker.data = this;
uv_check_init(parent->m_Handle.loop, &m_Ticker);
5 years ago
}
static void
OnOutboundConnect(uv_connect_t* c, int status)
{
conn_glue* self = static_cast< conn_glue* >(c->data);
self->HandleConnectResult(status);
c->data = nullptr;
5 years ago
}
bool
ConnectAsync()
{
return uv_tcp_connect(&m_Connect, &m_Handle, m_Addr, &OnOutboundConnect)
!= -1;
}
static void
ExplicitClose(llarp_tcp_conn* conn)
{
static_cast< conn_glue* >(conn->impl)->Close();
}
static void
ExplicitCloseAccept(llarp_tcp_acceptor* tcp)
{
static_cast< conn_glue* >(tcp->impl)->Close();
}
5 years ago
5 years ago
static ssize_t
ExplicitWrite(llarp_tcp_conn* conn, const byte_t* ptr, size_t sz)
{
return static_cast< conn_glue* >(conn->impl)->WriteAsync((char*)ptr, sz);
5 years ago
}
5 years ago
static void
OnRead(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
{
if(nread >= 0)
5 years ago
{
auto* conn = static_cast< conn_glue* >(stream->data);
conn->Read(buf->base, nread);
5 years ago
}
else if(nread < 0)
5 years ago
{
static_cast< conn_glue* >(stream->data)->Close();
5 years ago
}
5 years ago
delete[] buf->base;
}
static void
Alloc(uv_handle_t*, size_t suggested_size, uv_buf_t* buf)
{
buf->base = new char[suggested_size];
buf->len = suggested_size;
}
void
Read(const char* ptr, ssize_t sz)
{
if(m_Conn.read)
{
llarp::LogDebug("tcp read ", sz, " bytes");
const llarp_buffer_t buf(ptr, sz);
m_Conn.read(&m_Conn, buf);
}
5 years ago
}
void
HandleConnectResult(int status)
{
if(m_TCP && m_TCP->connected)
{
if(status == 0)
{
m_Conn.impl = this;
m_Conn.loop = m_TCP->loop;
m_Conn.close = &ExplicitClose;
5 years ago
m_Conn.write = &ExplicitWrite;
5 years ago
m_TCP->connected(m_TCP, &m_Conn);
5 years ago
Start();
5 years ago
}
else if(m_TCP->error)
{
llarp::LogError("failed to connect tcp ", uv_strerror(status));
5 years ago
m_TCP->error(m_TCP);
}
5 years ago
}
}
void
WriteFail()
{
if(m_Conn.close)
m_Conn.close(&m_Conn);
}
uv_stream_t*
Stream()
{
return (uv_stream_t*)&m_Handle;
5 years ago
}
static void
OnWritten(uv_write_t* req, int status)
{
WriteEvent* ev = static_cast< WriteEvent* >(req->data);
if(status == 0)
{
llarp::LogDebug("wrote ", ev->data.size());
}
else
{
llarp::LogDebug("write fail");
}
delete ev;
5 years ago
}
int
WriteAsync(char* data, size_t sz)
5 years ago
{
if(uv_is_closing((const uv_handle_t*)&m_Handle))
return -1;
WriteEvent* ev = new WriteEvent(sz, data);
auto buf = ev->Buffer();
if(uv_write(ev->Request(), Stream(), &buf, 1, &OnWritten) == 0)
return sz;
delete ev;
return -1;
5 years ago
}
static void
OnClosed(uv_handle_t* h)
{
conn_glue* conn = static_cast< conn_glue* >(h->data);
conn->HandleClosed();
5 years ago
}
static void
FullClose(uv_handle_t* h)
{
auto* self = static_cast< conn_glue* >(h->data);
h->data = nullptr;
delete self;
llarp::LogDebug("deleted");
}
5 years ago
void
HandleClosed()
{
m_Handle.data = nullptr;
if(m_Accept)
{
if(m_Accept->closed)
m_Accept->closed(m_Accept);
m_Accept->impl = nullptr;
}
5 years ago
if(m_Conn.closed)
{
5 years ago
m_Conn.closed(&m_Conn);
}
m_Conn.impl = nullptr;
llarp::LogDebug("closed");
uv_close((uv_handle_t*)&m_Ticker, &FullClose);
}
static void
OnShutdown(uv_shutdown_t* shut, int code)
{
llarp::LogDebug("shut down ", code);
auto* self = static_cast< conn_glue* >(shut->data);
uv_close((uv_handle_t*)&self->m_Handle, &OnClosed);
delete shut;
5 years ago
}
void
Close() override
5 years ago
{
if(uv_is_closing((uv_handle_t*)Stream()))
return;
llarp::LogDebug("close tcp connection");
uv_check_stop(&m_Ticker);
uv_read_stop(Stream());
auto* shut = new uv_shutdown_t();
shut->data = this;
uv_shutdown(shut, Stream(), &OnShutdown);
5 years ago
}
static void
OnAccept(uv_stream_t* stream, int status)
{
if(status == 0)
{
conn_glue* conn = static_cast< conn_glue* >(stream->data);
conn->Accept();
5 years ago
}
else
{
llarp::LogError("tcp accept failed: ", uv_strerror(status));
}
5 years ago
}
5 years ago
static void
OnTick(uv_check_t* t)
5 years ago
{
conn_glue* conn = static_cast< conn_glue* >(t->data);
conn->Tick();
5 years ago
}
void
Tick()
{
if(m_Accept && m_Accept->tick)
m_Accept->tick(m_Accept);
if(m_Conn.tick)
m_Conn.tick(&m_Conn);
}
void
Start()
{
auto result = uv_check_start(&m_Ticker, &OnTick);
if(result)
llarp::LogError("failed to start timer ", uv_strerror(result));
result = uv_read_start(Stream(), &Alloc, &OnRead);
if(result)
llarp::LogError("failed to start reader ", uv_strerror(result));
5 years ago
}
5 years ago
void
Accept()
{
if(m_Accept && m_Accept->accepted)
{
auto* child = new conn_glue(this);
llarp::LogDebug("accepted new connection");
child->m_Conn.impl = child;
5 years ago
child->m_Conn.loop = m_Accept->loop;
child->m_Conn.close = &ExplicitClose;
child->m_Conn.write = &ExplicitWrite;
auto res = uv_accept(Stream(), child->Stream());
if(res)
{
llarp::LogError("failed to accept tcp connection ", uv_strerror(res));
child->Close();
return;
}
5 years ago
m_Accept->accepted(m_Accept, &child->m_Conn);
5 years ago
child->Start();
5 years ago
}
}
bool
Server()
{
m_Accept->close = &ExplicitCloseAccept;
5 years ago
return uv_tcp_bind(&m_Handle, m_Addr, 0) == 0
&& uv_listen(Stream(), 5, &OnAccept) == 0;
5 years ago
}
};
struct ticker_glue : public glue
{
std::function< void(void) > func;
ticker_glue(uv_loop_t* loop, std::function< void(void) > tick) : func(tick)
{
m_Ticker.data = this;
uv_check_init(loop, &m_Ticker);
}
static void
OnTick(uv_check_t* t)
{
ticker_glue* ticker = static_cast< ticker_glue* >(t->data);
LoopCall(t, ticker->func);
}
bool
Start()
{
return uv_check_start(&m_Ticker, &OnTick) != -1;
}
void
Close() override
{
uv_check_stop(&m_Ticker);
5 years ago
uv_close((uv_handle_t*)&m_Ticker, [](auto h) {
ticker_glue* self = (ticker_glue*)h->data;
h->data = nullptr;
delete self;
});
}
uv_check_t m_Ticker;
};
struct udp_glue : public glue
5 years ago
{
uv_udp_t m_Handle;
uv_check_t m_Ticker;
5 years ago
llarp_udp_io* const m_UDP;
llarp::Addr m_Addr;
llarp_pkt_list m_LastPackets;
std::array< char, 1500 > m_Buffer;
5 years ago
udp_glue(uv_loop_t* loop, llarp_udp_io* udp, const sockaddr* src)
: m_UDP(udp), m_Addr(*src)
{
m_Handle.data = this;
m_Ticker.data = this;
uv_udp_init(loop, &m_Handle);
uv_check_init(loop, &m_Ticker);
5 years ago
}
static void
Alloc(uv_handle_t*, size_t suggested_size, uv_buf_t* buf)
{
const size_t sz = std::min(suggested_size, size_t{1500});
buf->base = new char[sz];
buf->len = sz;
5 years ago
}
static void
OnRecv(uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf,
const struct sockaddr* addr, unsigned)
{
udp_glue* glue = static_cast< udp_glue* >(handle->data);
5 years ago
if(addr)
glue->RecvFrom(nread, buf, addr);
5 years ago
if(nread <= 0 || glue->m_UDP == nullptr
5 years ago
|| glue->m_UDP->recvfrom != nullptr)
delete[] buf->base;
}
bool
RecvMany(llarp_pkt_list* pkts)
{
*pkts = std::move(m_LastPackets);
m_LastPackets = llarp_pkt_list();
return pkts->size() > 0;
5 years ago
}
void
RecvFrom(ssize_t sz, const uv_buf_t* buf, const struct sockaddr* fromaddr)
{
if(sz > 0 && m_UDP)
5 years ago
{
const size_t pktsz = sz;
if(m_UDP->recvfrom)
5 years ago
{
const llarp_buffer_t pkt((const byte_t*)buf->base, pktsz);
m_UDP->recvfrom(m_UDP, fromaddr, ManagedBuffer{pkt});
5 years ago
}
else
{
5 years ago
PacketBuffer pbuf(buf->base, pktsz);
m_LastPackets.emplace_back(PacketEvent{*fromaddr, std::move(pbuf)});
}
5 years ago
}
}
static void
OnTick(uv_check_t* t)
5 years ago
{
udp_glue* udp = static_cast< udp_glue* >(t->data);
udp->Tick();
5 years ago
}
void
Tick()
{
5 years ago
if(m_UDP && m_UDP->tick)
m_UDP->tick(m_UDP);
5 years ago
}
static int
SendTo(llarp_udp_io* udp, const sockaddr* to, const byte_t* ptr, size_t sz)
{
auto* self = static_cast< udp_glue* >(udp->impl);
if(self == nullptr)
return -1;
uv_buf_t buf = uv_buf_init((char*)ptr, sz);
5 years ago
return uv_udp_try_send(&self->m_Handle, &buf, 1, to);
}
bool
Bind()
{
auto ret = uv_udp_bind(&m_Handle, m_Addr, 0);
if(ret)
{
llarp::LogError("failed to bind to ", m_Addr, " ", uv_strerror(ret));
return false;
}
if(uv_udp_recv_start(&m_Handle, &Alloc, &OnRecv))
{
llarp::LogError("failed to start recving packets via ", m_Addr);
return false;
}
if(uv_check_start(&m_Ticker, &OnTick))
5 years ago
{
llarp::LogError("failed to start ticker");
return false;
}
#if defined(_WIN32) || defined(_WIN64)
#else
5 years ago
if(uv_fileno((const uv_handle_t*)&m_Handle, &m_UDP->fd))
return false;
#endif
5 years ago
m_UDP->sendto = &SendTo;
m_UDP->impl = this;
5 years ago
return true;
}
static void
OnClosed(uv_handle_t* h)
{
auto* glue = static_cast< udp_glue* >(h->data);
if(glue)
{
h->data = nullptr;
delete glue;
}
5 years ago
}
void
Close() override
5 years ago
{
m_UDP->impl = nullptr;
uv_check_stop(&m_Ticker);
5 years ago
uv_close((uv_handle_t*)&m_Handle, &OnClosed);
}
};
5 years ago
struct pipe_glue : public glue
{
byte_t m_Buffer[1024 * 8];
llarp_ev_pkt_pipe* const m_Pipe;
pipe_glue(uv_loop_t* loop, llarp_ev_pkt_pipe* pipe) : m_Pipe(pipe)
{
m_Handle.data = this;
m_Ticker.data = this;
uv_poll_init(loop, &m_Handle, m_Pipe->fd);
uv_check_init(loop, &m_Ticker);
}
void
Tick()
{
LoopCall(&m_Handle, std::bind(&llarp_ev_pkt_pipe::tick, m_Pipe));
5 years ago
}
static void
OnRead(uv_poll_t* handle, int status, int)
{
if(status)
{
return;
}
pipe_glue* glue = static_cast< pipe_glue* >(handle->data);
int r = glue->m_Pipe->read(glue->m_Buffer, sizeof(glue->m_Buffer));
if(r <= 0)
return;
const llarp_buffer_t buf{glue->m_Buffer, static_cast< size_t >(r)};
5 years ago
glue->m_Pipe->OnRead(buf);
}
static void
OnClosed(uv_handle_t* h)
{
auto* self = static_cast< pipe_glue* >(h->data);
if(self)
{
h->data = nullptr;
delete self;
}
}
void
Close() override
{
uv_check_stop(&m_Ticker);
uv_close((uv_handle_t*)&m_Handle, &OnClosed);
}
static void
OnTick(uv_check_t* h)
{
pipe_glue* pipe = static_cast< pipe_glue* >(h->data);
LoopCall(h, std::bind(&pipe_glue::Tick, pipe));
5 years ago
}
bool
Start()
{
if(uv_poll_start(&m_Handle, UV_READABLE, &OnRead))
return false;
if(uv_check_start(&m_Ticker, &OnTick))
return false;
return true;
}
uv_poll_t m_Handle;
uv_check_t m_Ticker;
};
#if defined(_WIN32) || defined(_WIN64)
#else
struct tun_glue : public glue
5 years ago
{
uv_poll_t m_Handle;
uv_check_t m_Ticker;
5 years ago
llarp_tun_io* const m_Tun;
device* const m_Device;
byte_t m_Buffer[1500];
bool readpkt;
5 years ago
tun_glue(llarp_tun_io* tun) : m_Tun(tun), m_Device(tuntap_init())
{
m_Handle.data = this;
m_Ticker.data = this;
readpkt = false;
5 years ago
}
~tun_glue() override
5 years ago
{
tuntap_destroy(m_Device);
}
static void
OnTick(uv_check_t* timer)
5 years ago
{
tun_glue* tun = static_cast< tun_glue* >(timer->data);
5 years ago
tun->Tick();
5 years ago
}
static void
5 years ago
OnPoll(uv_poll_t* h, int, int events)
{
5 years ago
if(events & UV_READABLE)
{
static_cast< tun_glue* >(h->data)->Read();
}
}
void
Read()
{
auto sz = tuntap_read(m_Device, m_Buffer, sizeof(m_Buffer));
5 years ago
if(sz > 0)
{
llarp::LogDebug("tun read ", sz);
5 years ago
const llarp_buffer_t pkt(m_Buffer, sz);
5 years ago
if(m_Tun && m_Tun->recvpkt)
m_Tun->recvpkt(m_Tun, pkt);
}
}
5 years ago
void
Tick()
{
5 years ago
if(m_Tun->before_write)
m_Tun->before_write(m_Tun);
if(m_Tun->tick)
m_Tun->tick(m_Tun);
5 years ago
}
static void
OnClosed(uv_handle_t* h)
{
auto* self = static_cast< tun_glue* >(h->data);
if(self)
{
h->data = nullptr;
delete self;
}
5 years ago
}
void
Close() override
5 years ago
{
5 years ago
if(m_Tun->impl == nullptr)
return;
m_Tun->impl = nullptr;
uv_check_stop(&m_Ticker);
5 years ago
uv_close((uv_handle_t*)&m_Ticker, [](uv_handle_t* h) {
tun_glue* glue = static_cast< tun_glue* >(h->data);
uv_close((uv_handle_t*)&glue->m_Handle, &OnClosed);
});
5 years ago
}
5 years ago
bool
Write(const byte_t* pkt, size_t sz)
{
return tuntap_write(m_Device, (void*)pkt, sz) != -1;
}
static bool
WritePkt(llarp_tun_io* tun, const byte_t* pkt, size_t sz)
{
tun_glue* glue = static_cast< tun_glue* >(tun->impl);
return glue && glue->Write(pkt, sz);
5 years ago
}
5 years ago
bool
Init(uv_loop_t* loop)
{
memcpy(m_Device->if_name, m_Tun->ifname, sizeof(m_Device->if_name));
5 years ago
if(tuntap_start(m_Device, TUNTAP_MODE_TUNNEL, 0) == -1)
{
llarp::LogError("failed to start up ", m_Tun->ifname);
return false;
}
5 years ago
if(tuntap_set_ip(m_Device, m_Tun->ifaddr, m_Tun->ifaddr, m_Tun->netmask)
== -1)
{
llarp::LogError("failed to set address on ", m_Tun->ifname);
return false;
}
5 years ago
if(tuntap_up(m_Device) == -1)
{
llarp::LogError("failed to put up ", m_Tun->ifname);
return false;
}
if(m_Device->tun_fd == -1)
{
llarp::LogError("tun interface ", m_Tun->ifname,
" has invalid fd: ", m_Device->tun_fd);
return false;
}
tuntap_set_nonblocking(m_Device, 1);
5 years ago
if(uv_poll_init(loop, &m_Handle, m_Device->tun_fd) == -1)
{
llarp::LogError("failed to start polling on ", m_Tun->ifname);
return false;
}
if(uv_poll_start(&m_Handle, UV_READABLE, &OnPoll))
{
llarp::LogError("failed to start polling on ", m_Tun->ifname);
return false;
}
if(uv_check_init(loop, &m_Ticker) != 0
|| uv_check_start(&m_Ticker, &OnTick) != 0)
5 years ago
{
llarp::LogError("failed to set up tun interface timer for ",
m_Tun->ifname);
return false;
}
5 years ago
m_Tun->writepkt = &WritePkt;
m_Tun->impl = this;
5 years ago
return true;
}
};
#endif
static void
OnAsyncWake(uv_async_t* async_handle)
{
Loop* loop = static_cast< Loop* >(async_handle->data);
loop->process_timer_queue();
loop->process_cancel_queue();
}
4 years ago
Loop::Loop()
: llarp_ev_loop()
, m_LogicCalls(1024)
, m_timerQueue(20)
, m_timerCancelQueue(20)
{
}
5 years ago
bool
Loop::init()
{
if(uv_loop_init(&m_Impl) == -1)
5 years ago
return false;
#ifdef LOKINET_DEBUG
last_time = 0;
loop_run_count = 0;
#endif
m_Impl.data = this;
#if defined(_WIN32) || defined(_WIN64)
#else
uv_loop_configure(&m_Impl, UV_LOOP_BLOCK_SIGNAL, SIGPIPE);
#endif
m_LogicCaller.data = this;
uv_async_init(&m_Impl, &m_LogicCaller, [](uv_async_t* h) {
5 years ago
Loop* l = static_cast< Loop* >(h->data);
while(not l->m_LogicCalls.empty())
{
5 years ago
auto f = l->m_LogicCalls.popFront();
f();
}
});
m_TickTimer = new uv_timer_t;
m_TickTimer->data = this;
5 years ago
m_Run.store(true);
m_nextID.store(0);
m_WakeUp.data = this;
uv_async_init(&m_Impl, &m_WakeUp, &OnAsyncWake);
return uv_timer_init(&m_Impl, m_TickTimer) != -1;
5 years ago
}
void
Loop::update_time()
{
llarp_ev_loop::update_time();
uv_update_time(&m_Impl);
5 years ago
}
bool
Loop::running() const
{
return m_Run.load();
}
bool
Loop::tcp_connect(llarp_tcp_connecter* tcp, const sockaddr* addr)
{
auto* impl = new conn_glue(&m_Impl, tcp, addr);
tcp->impl = impl;
5 years ago
if(impl->ConnectAsync())
return true;
5 years ago
delete impl;
tcp->impl = nullptr;
return false;
}
static void
OnTickTimeout(uv_timer_t* timer)
{
uv_stop(timer->loop);
}
int
Loop::tick(int ms)
{
5 years ago
if(m_Run)
{
uv_timer_start(m_TickTimer, &OnTickTimeout, ms, 0);
5 years ago
uv_run(&m_Impl, UV_RUN_ONCE);
}
5 years ago
return 0;
}
struct TimerData
{
Loop* loop;
uint64_t job_id;
};
void
CloseUVTimer(uv_timer_t* timer)
{
// have to delete timer handle this way because libuv.
uv_timer_stop(timer);
uv_close((uv_handle_t*)timer,
[](uv_handle_t* handle) { delete(uv_timer_t*)handle; });
}
static void
OnUVTimer(uv_timer_t* timer)
{
TimerData* timer_data = static_cast< TimerData* >(timer->data);
Loop* loop = timer_data->loop;
loop->do_timer_job(timer_data->job_id);
delete timer_data;
CloseUVTimer(timer);
}
uint32_t
Loop::call_after_delay(llarp_time_t delay_ms,
std::function< void(void) > callback)
{
PendingTimer timer;
timer.delay_ms = delay_ms;
timer.callback = callback;
timer.job_id = m_nextID++;
uint64_t job_id = timer.job_id;
m_timerQueue.pushBack(std::move(timer));
uv_async_send(&m_WakeUp);
return job_id;
}
void
Loop::cancel_delayed_call(uint32_t job_id)
{
m_timerCancelQueue.pushBack(job_id);
uv_async_send(&m_WakeUp);
}
void
Loop::process_timer_queue()
{
while(not m_timerQueue.empty())
{
PendingTimer job = m_timerQueue.popFront();
uint64_t job_id = job.job_id;
m_pendingCalls.emplace(job_id, std::move(job.callback));
TimerData* timer_data = new TimerData;
timer_data->loop = this;
timer_data->job_id = job_id;
uv_timer_t* newTimer = new uv_timer_t;
newTimer->data = (void*)timer_data;
uv_timer_init(&m_Impl, newTimer);
uv_timer_start(newTimer, &OnUVTimer, job.delay_ms, 0);
}
}
void
Loop::process_cancel_queue()
{
while(not m_timerCancelQueue.empty())
{
uint64_t job_id = m_timerCancelQueue.popFront();
auto itr = m_pendingCalls.find(job_id);
if(itr != m_pendingCalls.end())
{
m_pendingCalls.erase(itr);
}
}
}
void
Loop::do_timer_job(uint64_t job_id)
{
auto itr = m_pendingCalls.find(job_id);
if(itr != m_pendingCalls.end())
{
LogicCall(m_Logic, itr->second);
m_pendingCalls.erase(itr);
}
}
5 years ago
void
Loop::stop()
{
5 years ago
if(m_Run)
{
llarp::LogInfo("stopping event loop");
CloseAll();
// uv_stop(&m_Impl);
}
5 years ago
m_Run.store(false);
}
void
Loop::CloseAll()
{
llarp::LogInfo("Closing all handles");
uv_walk(
&m_Impl,
[](uv_handle_t* h, void*) {
if(uv_is_closing(h))
return;
if(h->data && uv_is_active(h) && h->type != UV_TIMER)
{
static_cast< glue* >(h->data)->Close();
}
},
nullptr);
}
void
Loop::stopped()
{
tick(50);
llarp::LogInfo("we have stopped");
5 years ago
}
bool
Loop::udp_listen(llarp_udp_io* udp, const sockaddr* src)
{
auto* impl = new udp_glue(&m_Impl, udp, src);
udp->impl = impl;
5 years ago
if(impl->Bind())
{
return true;
}
delete impl;
5 years ago
return false;
}
bool
Loop::add_ticker(std::function< void(void) > func)
{
auto* ticker = new ticker_glue(&m_Impl, func);
if(ticker->Start())
{
return true;
}
delete ticker;
return false;
}
5 years ago
bool
Loop::udp_close(llarp_udp_io* udp)
{
if(udp == nullptr)
return false;
auto* glue = static_cast< udp_glue* >(udp->impl);
5 years ago
if(glue == nullptr)
return false;
glue->Close();
return true;
}
bool
Loop::tun_listen(llarp_tun_io* tun)
{
#if defined(_WIN32) || defined(_WIN64)
(void)tun;
return false;
#else
auto* glue = new tun_glue(tun);
tun->impl = glue;
if(glue->Init(&m_Impl))
5 years ago
{
return true;
}
delete glue;
return false;
#endif
5 years ago
}
bool
Loop::tcp_listen(llarp_tcp_acceptor* tcp, const sockaddr* addr)
{
auto* glue = new conn_glue(&m_Impl, tcp, addr);
tcp->impl = glue;
5 years ago
if(glue->Server())
return true;
tcp->impl = nullptr;
delete glue;
return false;
5 years ago
}
5 years ago
bool
Loop::add_pipe(llarp_ev_pkt_pipe* p)
{
auto* glue = new pipe_glue(&m_Impl, p);
5 years ago
if(glue->Start())
return true;
delete glue;
return false;
}
void
Loop::call_soon(std::function< void(void) > f)
{
5 years ago
m_LogicCalls.tryPushBack(f);
uv_async_send(&m_LogicCaller);
}
5 years ago
} // namespace libuv
bool
llarp_ev_udp_recvmany(struct llarp_udp_io* u, struct llarp_pkt_list* pkts)
{
return static_cast< libuv::udp_glue* >(u->impl)->RecvMany(pkts);
}