diff --git a/CMakeLists.txt b/CMakeLists.txt index badf03bc6..f02783037 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -210,7 +210,7 @@ endif(JEMALLOC) # FS_LIB should resolve to nothing on all other platforms # it is only required on win32 -rick -set(LIBS ${MALLOC_LIB} ${FS_LIB}) +set(LIBS ${MALLOC_LIB} ${FS_LIB} uv) if(ANDROID) list(APPEND LIBS log) @@ -226,6 +226,7 @@ else() message(FATAL_ERROR "What operating system _are_ you building on/for?") endif() + set(LIBTUNTAP_SRC_BASE ${TT_ROOT}/tuntap.cpp ${TT_ROOT}/tuntap_log.cpp diff --git a/Makefile b/Makefile index 4ac8441c5..e4b913482 100644 --- a/Makefile +++ b/Makefile @@ -65,6 +65,8 @@ NETNS ?= OFF CROSS ?= OFF # build liblokinet-shared.so SHARED_LIB ?= OFF +# use libuv +LIBUV ?= ON # enable generating coverage COVERAGE ?= OFF COVERAGE_OUTDIR ?= "$(TMPDIR)/lokinet-coverage" @@ -92,11 +94,11 @@ ANALYZE_CONFIG_CMD = $(shell gecho -n "cd '$(BUILD_ROOT)' && " ; gecho -n "$(SCA COVERAGE_CONFIG_CMD = $(shell gecho -n "cd '$(BUILD_ROOT)' && " ; gecho -n "cmake -G'$(CMAKE_GEN)' -DCMAKE_CROSSCOMPILING=$(CROSS) -DSTATIC_LINK_RUNTIME=$(STATIC_LINK) -DUSE_NETNS=$(NETNS) -DUSE_AVX2=$(AVX2) -DNON_PC_TARGET=$(NON_PC_TARGET) -DWITH_SHARED=$(SHARED_LIB) -DWITH_COVERAGE=yes -DCMAKE_EXPORT_COMPILE_COMMANDS=ON '$(REPO)'") else -CONFIG_CMD = $(shell /bin/echo -n "cd '$(BUILD_ROOT)' && " ; /bin/echo -n "cmake -G'$(CMAKE_GEN)' -DCMAKE_CROSSCOMPILING=$(CROSS) -DSTATIC_LINK_RUNTIME=$(STATIC_LINK) -DUSE_NETNS=$(NETNS) -DUSE_AVX2=$(AVX2) -DNON_PC_TARGET=$(NON_PC_TARGET) -DWITH_SHARED=$(SHARED_LIB) -DCMAKE_EXPORT_COMPILE_COMMANDS=ON '$(REPO)'") +CONFIG_CMD = $(shell /bin/echo -n "cd '$(BUILD_ROOT)' && " ; /bin/echo -n "cmake -G'$(CMAKE_GEN)' -DCMAKE_CROSSCOMPILING=$(CROSS) -DSTATIC_LINK_RUNTIME=$(STATIC_LINK) -DUSE_NETNS=$(NETNS) -DUSE_AVX2=$(AVX2) -DNON_PC_TARGET=$(NON_PC_TARGET) -DWITH_SHARED=$(SHARED_LIB) -DUSE_LIBUV=$(LIBUV) -DCMAKE_EXPORT_COMPILE_COMMANDS=ON '$(REPO)'") -ANALYZE_CONFIG_CMD = $(shell /bin/echo -n "cd '$(BUILD_ROOT)' && " ; /bin/echo -n "$(SCAN_BUILD) cmake -G'$(CMAKE_GEN)' -DCMAKE_CROSSCOMPILING=$(CROSS) -DSTATIC_LINK_RUNTIME=$(STATIC_LINK) -DUSE_NETNS=$(NETNS) -DUSE_AVX2=$(AVX2) -DNON_PC_TARGET=$(NON_PC_TARGET) -DWITH_SHARED=$(SHARED_LIB) -DCMAKE_EXPORT_COMPILE_COMMANDS=ON '$(REPO)'") +ANALYZE_CONFIG_CMD = $(shell /bin/echo -n "cd '$(BUILD_ROOT)' && " ; /bin/echo -n "$(SCAN_BUILD) cmake -G'$(CMAKE_GEN)' -DCMAKE_CROSSCOMPILING=$(CROSS) -DSTATIC_LINK_RUNTIME=$(STATIC_LINK) -DUSE_NETNS=$(NETNS) -DUSE_AVX2=$(AVX2) -DNON_PC_TARGET=$(NON_PC_TARGET) -DWITH_SHARED=$(SHARED_LIB) -DUSE_LIBUV=$(LIBUV) -DCMAKE_EXPORT_COMPILE_COMMANDS=ON '$(REPO)'") -COVERAGE_CONFIG_CMD = $(shell /bin/echo -n "cd '$(BUILD_ROOT)' && " ; /bin/echo -n "cmake -G'$(CMAKE_GEN)' -DCMAKE_CROSSCOMPILING=$(CROSS) -DSTATIC_LINK_RUNTIME=$(STATIC_LINK) -DUSE_NETNS=$(NETNS) -DUSE_AVX2=$(AVX2) -DNON_PC_TARGET=$(NON_PC_TARGET) -DWITH_SHARED=$(SHARED_LIB) -DWITH_COVERAGE=yes -DCMAKE_EXPORT_COMPILE_COMMANDS=ON '$(REPO)'") +COVERAGE_CONFIG_CMD = $(shell /bin/echo -n "cd '$(BUILD_ROOT)' && " ; /bin/echo -n "cmake -G'$(CMAKE_GEN)' -DCMAKE_CROSSCOMPILING=$(CROSS) -DSTATIC_LINK_RUNTIME=$(STATIC_LINK) -DUSE_NETNS=$(NETNS) -DUSE_AVX2=$(AVX2) -DNON_PC_TARGET=$(NON_PC_TARGET) -DWITH_SHARED=$(SHARED_LIB) -DUSE_LIBUV=$(LIBUV) -DWITH_COVERAGE=yes -DCMAKE_EXPORT_COMPILE_COMMANDS=ON '$(REPO)'") endif TARGETS = $(REPO)/lokinet diff --git a/libabyss/main.cpp b/libabyss/main.cpp index 759edc431..cf4b30caf 100644 --- a/libabyss/main.cpp +++ b/libabyss/main.cpp @@ -68,8 +68,7 @@ struct DemoClient : public abyss::http::JSONRPC abyss::http::IRPCClientHandler* NewConn(abyss::http::ConnImpl* impl) { - return new DemoCall(impl, m_Logic, - std::bind(&llarp_ev_loop_stop, m_Loop.get())); + return new DemoCall(impl, m_Logic, std::bind(&llarp_ev_loop_stop, m_Loop)); } void diff --git a/llarp/CMakeLists.txt b/llarp/CMakeLists.txt index b4b62a2d0..e6d1bb711 100644 --- a/llarp/CMakeLists.txt +++ b/llarp/CMakeLists.txt @@ -73,6 +73,7 @@ set(LIB_PLATFORM_SRC # tun ${LIBTUNTAP_SRC} ${EV_SRC} + ev/ev_libuv.cpp ) if (WIN32) diff --git a/llarp/context.cpp b/llarp/context.cpp index b43932190..bd275abcf 100644 --- a/llarp/context.cpp +++ b/llarp/context.cpp @@ -251,8 +251,11 @@ __ ___ ____ _ _ ___ _ _ ____ { llarp::LogInfo(LLARP_VERSION, " ", LLARP_RELEASE_MOTTO); llarp::LogInfo("starting up"); +#if defined(WIN32) mainloop = llarp_make_ev_loop(); - +#else + mainloop = llarp_make_uv_loop(); +#endif // ensure worker thread pool if(!worker && !singleThreaded) worker.reset(llarp_init_threadpool(2, "llarp-worker")); @@ -421,7 +424,7 @@ __ ___ ____ _ _ ___ _ _ ____ { if(logic) logic->stop(); - llarp_ev_loop_stop(mainloop.get()); + llarp_ev_loop_stop(mainloop); Close(); } } diff --git a/llarp/ev/ev.cpp b/llarp/ev/ev.cpp index aa357b739..252f47668 100644 --- a/llarp/ev/ev.cpp +++ b/llarp/ev/ev.cpp @@ -57,6 +57,7 @@ llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev, } llarp::LogContext::Instance().logStream->Tick(ev->time_now()); } + ev->stopped(); } int @@ -78,7 +79,7 @@ llarp_ev_close_udp(struct llarp_udp_io *udp) } llarp_time_t -llarp_ev_loop_time_now_ms(const llarp_ev_loop_ptr &loop) +llarp_ev_loop_time_now_ms(llarp_ev_loop_ptr loop) { if(loop) return loop->time_now(); @@ -86,7 +87,7 @@ llarp_ev_loop_time_now_ms(const llarp_ev_loop_ptr &loop) } void -llarp_ev_loop_stop(struct llarp_ev_loop *loop) +llarp_ev_loop_stop(llarp_ev_loop_ptr loop) { loop->stop(); } @@ -95,30 +96,7 @@ int llarp_ev_udp_sendto(struct llarp_udp_io *udp, const sockaddr *to, const llarp_buffer_t &buf) { - auto ret = - static_cast< llarp::ev_io * >(udp->impl)->sendto(to, buf.base, buf.sz); -#ifndef _WIN32 - if(ret == -1 && errno != 0) - { -#else - if(ret == -1 && WSAGetLastError()) - { -#endif - -#ifndef _WIN32 - llarp::LogWarn("sendto failed ", strerror(errno)); - errno = 0; - } -#else - char ebuf[1024]; - int err = WSAGetLastError(); - FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, nullptr, err, LANG_NEUTRAL, ebuf, - 1024, nullptr); - llarp::LogWarn("sendto failed: ", ebuf); - WSASetLastError(0); - } -#endif - return ret; + return udp->sendto(udp, to, buf.base, buf.sz); } #include @@ -165,12 +143,7 @@ llarp_ev_add_tun(struct llarp_ev_loop *loop, struct llarp_tun_io *tun) llarp::LogDebug("IfName: ", tun->ifname); llarp::LogDebug("IfNMsk: ", tun->netmask); #ifndef _WIN32 - auto dev = loop->create_tun(tun); - tun->impl = dev; - if(dev) - { - return loop->add_ev(dev, false); - } + return loop->tun_listen(tun); #else UNREFERENCED_PARAMETER(loop); auto dev = new win32_tun_io(tun); @@ -267,13 +240,8 @@ bool llarp_tcp_serve(struct llarp_ev_loop *loop, struct llarp_tcp_acceptor *tcp, const struct sockaddr *bindaddr) { - tcp->loop = loop; - llarp::ev_io *impl = loop->bind_tcp(tcp, bindaddr); - if(impl) - { - return loop->add_ev(impl, false); - } - return false; + tcp->loop = loop; + return loop->tcp_listen(tcp, bindaddr); } void @@ -290,7 +258,7 @@ llarp_tcp_acceptor_close(struct llarp_tcp_acceptor *tcp) void llarp_tcp_conn_close(struct llarp_tcp_conn *conn) { - static_cast< llarp::tcp_conn * >(conn->impl)->_shouldClose = true; + conn->close(conn); } namespace llarp diff --git a/llarp/ev/ev.h b/llarp/ev/ev.h index 689482121..40285f465 100644 --- a/llarp/ev/ev.h +++ b/llarp/ev/ev.h @@ -27,6 +27,8 @@ typedef SSIZE_T ssize_t; #include #include +#include + /** * ev.h * @@ -47,9 +49,14 @@ namespace llarp using llarp_ev_loop_ptr = std::shared_ptr< llarp_ev_loop >; +/// make an event loop using our baked in event loop, ew. llarp_ev_loop_ptr llarp_make_ev_loop(); +/// make an event loop using libuv +llarp_ev_loop_ptr +llarp_make_uv_loop(); + // run mainloop void llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev, @@ -58,11 +65,11 @@ llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev, /// get the current time on the event loop llarp_time_t -llarp_ev_loop_time_now_ms(const llarp_ev_loop_ptr &ev); +llarp_ev_loop_time_now_ms(llarp_ev_loop_ptr ev); /// stop event loop and wait for it to complete all jobs void -llarp_ev_loop_stop(struct llarp_ev_loop *ev); +llarp_ev_loop_stop(llarp_ev_loop_ptr ev); /// UDP handling configuration struct llarp_udp_io @@ -78,6 +85,9 @@ struct llarp_udp_io /// sockaddr * is the source address void (*recvfrom)(struct llarp_udp_io *, const struct sockaddr *, ManagedBuffer); + /// set by parent + int (*sendto)(struct llarp_udp_io *, const struct sockaddr *, const byte_t *, + size_t); }; /// add UDP handler @@ -110,6 +120,8 @@ struct llarp_tcp_conn void (*read)(struct llarp_tcp_conn *, const llarp_buffer_t &); /// handle close event (free-ing is handled by event loop) void (*closed)(struct llarp_tcp_conn *); + /// explict close by user (set by parent) + void (*close)(struct llarp_tcp_conn *); /// handle event loop tick void (*tick)(struct llarp_tcp_conn *); }; @@ -132,6 +144,8 @@ struct llarp_tcp_connecter char remote[512]; /// userdata pointer void *user; + /// private implementation (dont set me) + void *impl; /// parent event loop (dont set me) struct llarp_ev_loop *loop; /// handle outbound connection made diff --git a/llarp/ev/ev.hpp b/llarp/ev/ev.hpp index a0d1e6f75..d5e22d8fa 100644 --- a/llarp/ev/ev.hpp +++ b/llarp/ev/ev.hpp @@ -531,6 +531,12 @@ namespace llarp // null if inbound otherwise outbound llarp_tcp_connecter* _conn; + static void + DoClose(llarp_tcp_conn* conn) + { + static_cast< tcp_conn* >(conn->impl)->_shouldClose = true; + } + /// inbound tcp_conn(llarp_ev_loop* loop, int fd) : ev_io(fd, new LosslessWriteQueue_t{}), _conn(nullptr) @@ -541,6 +547,7 @@ namespace llarp tcp.user = nullptr; tcp.read = nullptr; tcp.tick = nullptr; + tcp.close = &DoClose; } /// outbound @@ -560,6 +567,7 @@ namespace llarp tcp.user = nullptr; tcp.read = nullptr; tcp.tick = nullptr; + tcp.close = &DoClose; } virtual ~tcp_conn() @@ -713,7 +721,7 @@ struct llarp_ev_loop virtual bool running() const = 0; - void + virtual void update_time() { _now = llarp::time_now_ms(); @@ -725,6 +733,9 @@ struct llarp_ev_loop return _now; } + virtual void + stopped(){}; + /// return false on socket error (non blocking) virtual bool tcp_connect(llarp_tcp_connecter* tcp, const sockaddr* addr) = 0; @@ -738,15 +749,24 @@ struct llarp_ev_loop virtual bool udp_listen(llarp_udp_io* l, const sockaddr* src) = 0; - virtual llarp::ev_io* - create_udp(llarp_udp_io* l, const sockaddr* src) = 0; - virtual bool udp_close(llarp_udp_io* l) = 0; /// deregister event listener virtual bool close_ev(llarp::ev_io* ev) = 0; + virtual bool + tun_listen(llarp_tun_io* tun) + { + auto dev = create_tun(tun); + tun->impl = dev; + if(dev) + { + return add_ev(dev, false); + } + return false; + } + virtual llarp::ev_io* create_tun(llarp_tun_io* tun) = 0; @@ -757,6 +777,13 @@ struct llarp_ev_loop virtual bool add_ev(llarp::ev_io* ev, bool write) = 0; + virtual bool + tcp_listen(llarp_tcp_acceptor* tcp, const sockaddr* addr) + { + auto conn = bind_tcp(tcp, addr); + return conn && add_ev(conn, true); + } + virtual ~llarp_ev_loop() { } diff --git a/llarp/ev/ev_epoll.cpp b/llarp/ev/ev_epoll.cpp index b22ccaf89..0877f4ee6 100644 --- a/llarp/ev/ev_epoll.cpp +++ b/llarp/ev/ev_epoll.cpp @@ -309,12 +309,23 @@ llarp_epoll_loop::bind_tcp(llarp_tcp_acceptor* tcp, const sockaddr* bindaddr) return new llarp::tcp_serv(this, fd, tcp); } +static int +llarp_ev_epoll_sendto(struct llarp_udp_io* udp, const struct sockaddr* to, + const byte_t* pkt, size_t sz) +{ + const llarp::Addr toaddr(*to); + return ::sendto(udp->fd, pkt, sz, 0, toaddr, toaddr.SockLen()); +} + bool llarp_epoll_loop::udp_listen(llarp_udp_io* l, const sockaddr* src) { auto ev = create_udp(l, src); if(ev) - l->fd = ev->fd; + { + l->fd = ev->fd; + l->sendto = &llarp_ev_epoll_sendto; + } return ev && add_ev(ev, false); } diff --git a/llarp/ev/ev_kqueue.cpp b/llarp/ev/ev_kqueue.cpp index b2288df32..eb626fce5 100644 --- a/llarp/ev/ev_kqueue.cpp +++ b/llarp/ev/ev_kqueue.cpp @@ -492,12 +492,23 @@ llarp_kqueue_loop::udp_bind(const sockaddr* addr) return fd; } +static int +llarp_ev_kqueue_sendto(struct llarp_udp_io* udp, const struct sockaddr* to, + const byte_t* pkt, size_t sz) +{ + const llarp::Addr toaddr(*to); + return ::sendto(udp->fd, pkt, sz, 0, toaddr, toaddr.SockLen()); +} + bool llarp_kqueue_loop::udp_listen(llarp_udp_io* l, const sockaddr* src) { auto ev = create_udp(l, src); if(ev) - l->fd = ev->fd; + { + l->fd = ev->fd; + l->sendto = &llarp_ev_kqueue_sendto; + } return ev && add_ev(ev, false); } diff --git a/llarp/ev/ev_libuv.cpp b/llarp/ev/ev_libuv.cpp new file mode 100644 index 000000000..23fd7b7f6 --- /dev/null +++ b/llarp/ev/ev_libuv.cpp @@ -0,0 +1,478 @@ +#include "ev_libuv.hpp" +#include "net/net_addr.hpp" + +namespace libuv +{ + /// tcp connection glue between llarp and libuv + struct conn_glue + { + uv_tcp_t m_Handle; + uv_connect_t m_Connect; + llarp_tcp_connecter* const m_TCP; + llarp_tcp_acceptor* const m_Accept; + llarp_tcp_conn m_Conn; + llarp::Addr m_Addr; + + conn_glue(uv_loop_t* loop, llarp_tcp_connecter* tcp, const sockaddr* addr) + : m_TCP(tcp), m_Accept(nullptr), m_Addr(*addr) + { + m_Connect.data = this; + m_Handle.data = tcp; + uv_tcp_init(loop, &m_Handle); + } + + conn_glue(uv_loop_t* loop, llarp_tcp_acceptor* tcp, const sockaddr* addr) + : m_TCP(nullptr), m_Accept(tcp), m_Addr(*addr) + { + m_Handle.data = this; + uv_tcp_init(loop, &m_Handle); + } + + conn_glue(conn_glue* parent) : m_TCP(nullptr), m_Accept(nullptr) + { + uv_tcp_init(parent->m_Handle.loop, &m_Handle); + } + + static void + OnOutboundConnect(uv_connect_t* c, int status) + { + conn_glue* self = static_cast< conn_glue* >(c->data); + self->HandleConnectResult(status); + } + + bool + ConnectAsync() + { + return uv_tcp_connect(&m_Connect, &m_Handle, m_Addr, &OnOutboundConnect) + != -1; + } + + static void + ExplicitClose(llarp_tcp_conn* conn) + { + static_cast< conn_glue* >(conn->impl)->Close(); + } + + static void + OnRead(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) + { + static_cast< conn_glue* >(stream->data)->Read(buf->base, nread); + delete[] buf->base; + } + + static void + Alloc(uv_handle_t*, size_t suggested_size, uv_buf_t* buf) + { + buf->base = new char[suggested_size]; + buf->len = suggested_size; + } + + void + Read(const char* ptr, ssize_t sz) + { + const llarp_buffer_t buf(ptr, sz); + m_Conn.read(&m_Conn, buf); + } + + void + HandleConnectResult(int status) + { + if(m_TCP && m_TCP->connected) + { + if(status == 0) + { + m_Conn.impl = this; + m_Conn.loop = m_TCP->loop; + m_Conn.close = &ExplicitClose; + m_TCP->connected(m_TCP, &m_Conn); + uv_read_start(Stream(), &Alloc, &OnRead); + } + else if(m_TCP->error) + m_TCP->error(m_TCP); + } + } + + void + WriteFail() + { + m_Conn.close(&m_Conn); + } + + static void + OnWritten(uv_write_t* req, int status) + { + conn_glue* self = static_cast< conn_glue* >(req->data); + if(status) + self->WriteFail(); + delete req; + } + + uv_stream_t* + Stream() + { + return (uv_stream_t*)&m_Handle; + } + + bool + WriteAsync(const void* data, size_t sz) + { + uv_write_t* request = new uv_write_t(); + request->data = this; + auto buf = uv_buf_init((char*)data, sz); + return uv_write(request, Stream(), &buf, 1, &OnWritten) != -1; + } + + static void + OnClosed(uv_handle_t* h) + { + static_cast< conn_glue* >(h->data)->HandleClosed(); + } + + void + HandleClosed() + { + m_Handle.data = nullptr; + if(m_Accept && m_Accept->closed) + m_Accept->closed(m_Accept); + if(m_Conn.closed) + m_Conn.closed(&m_Conn); + delete this; + } + + void + Close() + { + uv_close((uv_handle_t*)&m_Handle, &OnClosed); + } + + static void + OnAccept(uv_stream_t* stream, int status) + { + if(status == 0) + { + static_cast< conn_glue* >(stream->data)->Accept(); + } + } + + void + Accept() + { + if(m_Accept && m_Accept->accepted) + { + conn_glue* child = new conn_glue(this); + uv_accept(Stream(), child->Stream()); + m_Accept->accepted(m_Accept, &child->m_Conn); + } + } + + bool + Server() + { + return uv_tcp_bind(&m_Handle, m_Addr, 0) == -1 + || uv_listen(Stream(), 5, &OnAccept) == -1; + } + }; + + struct udp_glue + { + uv_udp_t m_Handle; + uv_idle_t m_Ticker; + llarp_udp_io* const m_UDP; + llarp::Addr m_Addr; + + udp_glue(uv_loop_t* loop, llarp_udp_io* udp, const sockaddr* src) + : m_UDP(udp), m_Addr(*src) + { + m_Handle.data = this; + m_Ticker.data = this; + uv_udp_init(loop, &m_Handle); + uv_idle_init(loop, &m_Ticker); + } + + static void + Alloc(uv_handle_t*, size_t suggested_size, uv_buf_t* buf) + { + buf->base = new char[suggested_size]; + buf->len = suggested_size; + } + + static void + OnRecv(uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf, + const struct sockaddr* addr, unsigned) + { + if(addr) + static_cast< udp_glue* >(handle->data)->RecvFrom(nread, buf, addr); + delete[] buf->base; + } + + void + RecvFrom(ssize_t sz, const uv_buf_t* buf, const struct sockaddr* fromaddr) + { + if(sz >= 0) + { + const size_t pktsz = sz; + const llarp_buffer_t pkt{(const byte_t*)buf->base, pktsz}; + m_UDP->recvfrom(m_UDP, fromaddr, ManagedBuffer{pkt}); + } + } + + static void + OnTick(uv_idle_t* t) + { + static_cast< udp_glue* >(t->data)->Tick(); + } + + void + Tick() + { + llarp::LogDebug("udp tick"); + if(m_UDP && m_UDP->tick) + m_UDP->tick(m_UDP); + } + + static int + SendTo(llarp_udp_io* udp, const sockaddr* to, const byte_t* ptr, size_t sz) + { + udp_glue* self = static_cast< udp_glue* >(udp->impl); + uv_buf_t buf = uv_buf_init((char*)ptr, sz); + return uv_udp_try_send(&self->m_Handle, &buf, 1, to); + } + + bool + Bind() + { + auto ret = uv_udp_bind(&m_Handle, m_Addr, 0); + if(ret) + { + llarp::LogError("failed to bind to ", m_Addr, " ", uv_strerror(ret)); + return false; + } + if(uv_udp_recv_start(&m_Handle, &Alloc, &OnRecv)) + { + llarp::LogError("failed to start recving packets via ", m_Addr); + return false; + } + if(uv_idle_start(&m_Ticker, &OnTick)) + { + llarp::LogError("failed to start ticker"); + return false; + } + if(uv_fileno((const uv_handle_t*)&m_Handle, &m_UDP->fd)) + return false; + m_UDP->sendto = &SendTo; + return true; + } + + static void + OnClosed(uv_handle_t* h) + { + udp_glue* glue = static_cast< udp_glue* >(h->data); + glue->m_UDP->impl = nullptr; + delete glue; + } + + void + Close() + { + uv_idle_stop(&m_Ticker); + uv_close((uv_handle_t*)&m_Handle, &OnClosed); + } + }; + + struct tun_glue + { + uv_poll_t m_Handle; + uv_idle_t m_Ticker; + llarp_tun_io* const m_Tun; + device* const m_Device; + tun_glue(llarp_tun_io* tun) : m_Tun(tun), m_Device(tuntap_init()) + { + m_Handle.data = this; + m_Ticker.data = this; + } + + ~tun_glue() + { + tuntap_destroy(m_Device); + } + + static void + OnTick(uv_timer_t* timer) + { + static_cast< tun_glue* >(timer->data)->Tick(); + } + + void + Tick() + { + if(m_Tun->tick) + m_Tun->tick(m_Tun); + if(m_Tun->before_write) + m_Tun->before_write(m_Tun); + } + + static void + OnClosed(uv_handle_t* h) + { + tun_glue* self = static_cast< tun_glue* >(h->data); + delete self; + } + + void + Close() + { + uv_close((uv_handle_t*)&m_Handle, &OnClosed); + } + + bool + Init(uv_loop_t* loop) + { + strncpy(m_Device->if_name, m_Tun->ifname, sizeof(m_Device->if_name)); + if(tuntap_start(m_Device, TUNTAP_MODE_TUNNEL, 0) == -1) + { + llarp::LogError("failed to start up ", m_Tun->ifname); + return false; + } + if(tuntap_up(m_Device) == -1) + { + llarp::LogError("failed to put up ", m_Tun->ifname); + return false; + } + if(m_Device->tun_fd == -1) + { + llarp::LogError("tun interface ", m_Tun->ifname, + " has invalid fd: ", m_Device->tun_fd); + return false; + } + if(uv_poll_init(loop, &m_Handle, m_Device->tun_fd) == -1) + { + llarp::LogError("failed to start polling on ", m_Tun->ifname); + return false; + } + if(uv_idle_init(loop, &m_Ticker) == -1) + { + llarp::LogError("failed to set up tun interface timer for ", + m_Tun->ifname); + return false; + } + return true; + } + }; + + bool + Loop::init() + { + m_Impl.reset(uv_loop_new()); + if(uv_loop_init(m_Impl.get()) == -1) + return false; + m_TickTimer.data = this; + m_Run.store(true); + return uv_timer_init(m_Impl.get(), &m_TickTimer) != -1; + } + + void + Loop::update_time() + { + uv_update_time(m_Impl.get()); + } + + bool + Loop::running() const + { + return m_Run.load(); + } + + llarp_time_t + Loop::time_now() const + { + return llarp::time_now_ms(); + } + + bool + Loop::tcp_connect(llarp_tcp_connecter* tcp, const sockaddr* addr) + { + conn_glue* impl = new conn_glue(m_Impl.get(), tcp, addr); + tcp->impl = impl; + delete impl; + tcp->impl = nullptr; + return false; + } + + static void + OnTickTimeout(uv_timer_t* timer) + { + uv_stop(timer->loop); + } + + int + Loop::tick(int ms) + { + uv_timer_start(&m_TickTimer, &OnTickTimeout, ms, 0); + uv_run(m_Impl.get(), UV_RUN_NOWAIT); + return 0; + } + + void + Loop::stop() + { + llarp::LogInfo("stopping event loop"); + m_Run.store(false); + } + + bool + Loop::udp_listen(llarp_udp_io* udp, const sockaddr* src) + { + udp_glue* impl = new udp_glue(m_Impl.get(), udp, src); + udp->impl = impl; + if(impl->Bind()) + { + m_CloseFuncs.emplace_back(std::bind(&udp_glue::Close, impl)); + return true; + } + return false; + } + + bool + Loop::udp_close(llarp_udp_io* udp) + { + if(udp == nullptr) + return false; + udp_glue* glue = static_cast< udp_glue* >(udp->impl); + if(glue == nullptr) + return false; + glue->Close(); + return true; + } + + bool + Loop::tun_listen(llarp_tun_io* tun) + { + tun_glue* glue = new tun_glue(tun); + tun->impl = glue; + if(glue->Init(m_Impl.get())) + { + m_CloseFuncs.emplace_back(std::bind(&tun_glue ::Close, glue)); + return true; + } + delete glue; + return false; + } + + bool + Loop::tcp_listen(llarp_tcp_acceptor* tcp, const sockaddr* addr) + { + conn_glue* glue = new conn_glue(m_Impl.get(), tcp, addr); + tcp->impl = glue; + return glue->Server(); + } + +} // namespace libuv + +llarp_ev_loop_ptr +llarp_make_uv_loop() +{ + auto loop = std::make_shared< libuv::Loop >(); + if(loop->init()) + return loop; + return nullptr; +} diff --git a/llarp/ev/ev_libuv.hpp b/llarp/ev/ev_libuv.hpp new file mode 100644 index 000000000..c4b538d80 --- /dev/null +++ b/llarp/ev/ev_libuv.hpp @@ -0,0 +1,104 @@ +#ifndef LLARP_EV_LIBUV_HPP +#include +#include +#include +#include + +namespace libuv +{ + struct Loop : public llarp_ev_loop + { + bool + init() override; + + int + run() override + { + return -11; + } + + bool + running() const override; + + void + update_time() override; + + llarp_time_t + time_now() const override; + + /// return false on socket error (non blocking) + bool + tcp_connect(llarp_tcp_connecter* tcp, const sockaddr* addr) override; + + int + tick(int ms) override; + + void + stop() override; + + void + stopped() override + { + for(const auto& func : m_CloseFuncs) + func(); + m_CloseFuncs.clear(); + llarp::LogInfo("event loop stopped"); + } + + bool + udp_listen(llarp_udp_io* l, const sockaddr* src) override; + + bool + udp_close(llarp_udp_io* l) override; + + /// deregister event listener + bool + close_ev(llarp::ev_io*) override + { + return true; + } + + bool + tun_listen(llarp_tun_io* tun) override; + + llarp::ev_io* + create_tun(llarp_tun_io*) override + { + return nullptr; + } + + bool + tcp_listen(llarp_tcp_acceptor* tcp, const sockaddr* addr) override; + + llarp::ev_io* + bind_tcp(llarp_tcp_acceptor*, const sockaddr*) override + { + return nullptr; + } + + /// register event listener + bool + add_ev(llarp::ev_io*, bool) override + { + return false; + } + + private: + struct DestructLoop + { + void + operator()(uv_loop_t* l) const + { + uv_loop_close(l); + } + }; + + std::unique_ptr< uv_loop_t, DestructLoop > m_Impl; + uv_timer_t m_TickTimer; + std::atomic< bool > m_Run; + std::vector< std::function< void(void) > > m_CloseFuncs; + }; + +} // namespace libuv + +#endif \ No newline at end of file diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 7795095da..5dba92a0d 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -511,7 +511,7 @@ namespace llarp Router::Close() { LogInfo("closing router"); - llarp_ev_loop_stop(_netloop.get()); + llarp_ev_loop_stop(_netloop); inboundLinks.clear(); outboundLinks.clear(); disk.stop(); diff --git a/llarp/utp/linklayer.cpp b/llarp/utp/linklayer.cpp index 112569fcf..5f55d8b7d 100644 --- a/llarp/utp/linklayer.cpp +++ b/llarp/utp/linklayer.cpp @@ -43,73 +43,7 @@ namespace llarp if(l == nullptr) return 0; LogDebug("utp_sendto ", Addr(*arg->address), " ", arg->len, " bytes"); - // For whatever reason, the UTP_UDP_DONTFRAG flag is set - // on the socket itself....which isn't correct and causes - // winsock (at minimum) to reeee - // here, we check its value, then set fragmentation the _right_ - // way. Naturally, Linux has its own special procedure. - // Of course, the flag itself is cleared. -rick -#ifndef _WIN32 - // No practical method of doing this on NetBSD or Darwin - // without resorting to raw sockets -#if !(__NetBSD__ || __OpenBSD__ || (__APPLE__ && __MACH__)) -#ifndef __linux__ - if(arg->flags == 2) - { - int val = 1; - setsockopt(l->m_udp.fd, IPPROTO_IP, IP_DONTFRAGMENT, &val, sizeof(val)); - } - else - { - int val = 0; - setsockopt(l->m_udp.fd, IPPROTO_IP, IP_DONTFRAGMENT, &val, sizeof(val)); - } -#else - if(arg->flags == 2) - { - int val = IP_PMTUDISC_DO; - setsockopt(l->m_udp.fd, IPPROTO_IP, IP_MTU_DISCOVER, &val, sizeof(val)); - } - else - { - int val = IP_PMTUDISC_DONT; - setsockopt(l->m_udp.fd, IPPROTO_IP, IP_MTU_DISCOVER, &val, sizeof(val)); - } -#endif -#endif - arg->flags = 0; - if(::sendto(l->m_udp.fd, (char*)arg->buf, arg->len, arg->flags, - arg->address, arg->address_len) - == -1 - && errno) -#else - if(arg->flags == 2) - { - char val = 1; - setsockopt(l->m_udp.fd, IPPROTO_IP, IP_DONTFRAGMENT, &val, sizeof(val)); - } - else - { - char val = 0; - setsockopt(l->m_udp.fd, IPPROTO_IP, IP_DONTFRAGMENT, &val, sizeof(val)); - } - arg->flags = 0; - if(::sendto(l->m_udp.fd, (char*)arg->buf, arg->len, arg->flags, - arg->address, arg->address_len) - == -1) -#endif - { -#ifdef _WIN32 - char buf[1024]; - int err = WSAGetLastError(); - FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, nullptr, err, LANG_NEUTRAL, - buf, 1024, nullptr); - LogError("sendto failed: ", buf); -#else - LogError("sendto failed: ", strerror(errno)); -#endif - } - return 0; + return l->m_udp.sendto(&l->m_udp, arg->address, arg->buf, arg->len); } uint64 diff --git a/test/ev/test_ev_loop.cpp b/test/ev/test_ev_loop.cpp index 1fef5030e..bf67e66d5 100644 --- a/test/ev/test_ev_loop.cpp +++ b/test/ev/test_ev_loop.cpp @@ -42,7 +42,7 @@ struct EventLoopTest : public ::testing::Test void Stop() { - llarp_ev_loop_stop(loop.get()); + llarp_ev_loop_stop(loop); } void diff --git a/test/link/test_llarp_link.cpp b/test/link/test_llarp_link.cpp index 93671555a..58ba5b567 100644 --- a/test/link/test_llarp_link.cpp +++ b/test/link/test_llarp_link.cpp @@ -121,7 +121,7 @@ struct LinkLayerTest : public test::LlarpTest< NoOpCrypto > oldRCLifetime = RouterContact::Lifetime; RouterContact::IgnoreBogons = true; RouterContact::Lifetime = 500; - netLoop = llarp_make_ev_loop(); + netLoop = llarp_make_uv_loop(); m_logic.reset(new Logic()); } @@ -141,6 +141,7 @@ struct LinkLayerTest : public test::LlarpTest< NoOpCrypto > { if(left) return; + llarp::LogInfo("timed out test"); static_cast< LinkLayerTest* >(u)->Stop(); } @@ -154,7 +155,7 @@ struct LinkLayerTest : public test::LlarpTest< NoOpCrypto > void Stop() { - llarp_ev_loop_stop(netLoop.get()); + llarp_ev_loop_stop(netLoop); } bool diff --git a/test/test_libabyss.cpp b/test/test_libabyss.cpp index c989e2fe9..c1f0e5495 100644 --- a/test/test_libabyss.cpp +++ b/test/test_libabyss.cpp @@ -72,7 +72,7 @@ struct AbyssTestBase : public ::testing::Test Stop() { llarp::LogDebug("test case Stop() called"); - llarp_ev_loop_stop(loop.get()); + llarp_ev_loop_stop(loop); } void