From d9cee71cf50667883430ed5aec3793bb12ec96fc Mon Sep 17 00:00:00 2001 From: dr7ana Date: Wed, 10 Apr 2024 08:46:44 -0700 Subject: [PATCH] TCP listener laid out - Was able to repurpose a lot of the code from the ill-fated ev-dns implementation - TCP{Handle,Socket} implemented to parallelize UDP{Handle,Socket} in functionality - TCP implementation takes in a callback that receives an IPPacket; can also be easily reconfigured to hanle a UDPPacket by simply changing the callback type and the constructor called in llarp/ev/tcp.cpp::tcp_read_cb() --- llarp/CMakeLists.txt | 3 +- llarp/address/ip_packet.cpp | 35 +++++--- llarp/address/ip_packet.hpp | 1 + llarp/dns/server.cpp | 2 +- llarp/ev/loop.hpp | 21 ++++- llarp/ev/tcp.cpp | 173 ++++++++++++++++++++++++++++++++++++ llarp/ev/tcp.hpp | 80 +++++++++++++++++ llarp/ev/types.hpp | 3 + llarp/ev/udp.cpp | 8 +- llarp/ev/udp.hpp | 9 +- llarp/link/tunnel.cpp | 1 - llarp/win32/exec.cpp | 2 +- llarp/win32/wintun.cpp | 2 +- 13 files changed, 315 insertions(+), 25 deletions(-) create mode 100644 llarp/ev/tcp.cpp create mode 100644 llarp/ev/tcp.hpp diff --git a/llarp/CMakeLists.txt b/llarp/CMakeLists.txt index 104d04505..bc4264065 100644 --- a/llarp/CMakeLists.txt +++ b/llarp/CMakeLists.txt @@ -94,8 +94,9 @@ lokinet_add_library(lokinet-time-place address/keys.cpp address/utils.cpp - ev/udp.cpp ev/loop.cpp + ev/tcp.cpp + ev/udp.cpp net/ip.cpp net/net_int.cpp diff --git a/llarp/address/ip_packet.cpp b/llarp/address/ip_packet.cpp index 38df49334..6111b6d97 100644 --- a/llarp/address/ip_packet.cpp +++ b/llarp/address/ip_packet.cpp @@ -29,20 +29,37 @@ namespace llarp {} IPPacket::IPPacket(std::vector data) : IPPacket{data.data(), data.size()} - { - _init_internals(); - } + {} IPPacket::IPPacket(const uint8_t* buf, size_t len) { if (len < MIN_PACKET_SIZE) { _buf.resize(0); - return; + } + else + { + _buf.resize(len); + std::copy_n(buf, len, _buf.data()); } - _buf.resize(len); - std::copy_n(buf, len, _buf.data()); + _init_internals(); + } + + IPPacket IPPacket::from_udp(UDPPacket pkt) + { + auto& data = pkt.data; + return IPPacket{reinterpret_cast(data.data()), data.size()}; + } + + std::optional IPPacket::from_buffer(const uint8_t* buf, size_t len) + { + std::optional ret = std::nullopt; + + if (IPPacket b; b.load(buf, len)) + ret.emplace(std::move(b)); + + return ret; } void IPPacket::_init_internals() @@ -281,12 +298,6 @@ namespace llarp return std::nullopt; } - IPPacket IPPacket::from_udp(UDPPacket pkt) - { - auto& data = pkt.data; - return IPPacket{reinterpret_cast(data.data()), data.size()}; - } - UDPPacket IPPacket::make_udp() { return UDPPacket{oxen::quic::Path{_src_addr, _dst_addr}, bview()}; diff --git a/llarp/address/ip_packet.hpp b/llarp/address/ip_packet.hpp index 9f03d47e7..5b9aaf575 100644 --- a/llarp/address/ip_packet.hpp +++ b/llarp/address/ip_packet.hpp @@ -38,6 +38,7 @@ namespace llarp explicit IPPacket(const uint8_t* buf, size_t len); static IPPacket from_udp(UDPPacket pkt); + static std::optional from_buffer(const uint8_t* buf, size_t len); UDPPacket make_udp(); diff --git a/llarp/dns/server.cpp b/llarp/dns/server.cpp index 635d6d7c9..a5e422269 100644 --- a/llarp/dns/server.cpp +++ b/llarp/dns/server.cpp @@ -35,7 +35,7 @@ namespace llarp::dns oxen::quic::Address _local_addr; public: - explicit UDPReader(Server& dns, const loop_ptr& loop, oxen::quic::Address bind) : _dns{dns} + explicit UDPReader(Server& dns, const std::shared_ptr& loop, oxen::quic::Address bind) : _dns{dns} { _udp = std::make_unique(loop, bind, [&](UDPPacket pkt) { auto& src = pkt.path.local; diff --git a/llarp/ev/loop.hpp b/llarp/ev/loop.hpp index d7dee6b8a..e09e692d8 100644 --- a/llarp/ev/loop.hpp +++ b/llarp/ev/loop.hpp @@ -1,6 +1,6 @@ #pragma once -#include "udp.hpp" +#include "types.hpp" #include #include @@ -98,12 +98,31 @@ namespace llarp true); } + // Returns a pointer deleter that defers invocation of a custom deleter to the event loop + template + auto wrapped_deleter(Callable&& f) + { + return _loop->wrapped_deleter(std::forward(f)); + } + + // Similar in concept to std::make_shared, but it creates the shared pointer with a + // custom deleter that dispatches actual object destruction to the network's event loop for + // thread safety. template std::shared_ptr make_shared(Args&&... args) { return _loop->make_shared(std::forward(args)...); } + // Similar to the above make_shared, but instead of forwarding arguments for the + // construction of the object, it creates the shared_ptr from the already created object ptr + // and wraps the object's deleter in a wrapped_deleter + template + std::shared_ptr shared_ptr(T* obj, Callable&& deleter) + { + return _loop->shared_ptr(std::forward(obj), std::forward(deleter)); + } + template auto make_caller(Callable f) { diff --git a/llarp/ev/tcp.cpp b/llarp/ev/tcp.cpp new file mode 100644 index 000000000..0d0f7f8f1 --- /dev/null +++ b/llarp/ev/tcp.cpp @@ -0,0 +1,173 @@ +#include "tcp.hpp" + +#include +#include + +namespace llarp +{ + static auto logcat = oxen::log::Cat("ev-tcp"); + + constexpr auto evconnlistener_deleter = [](::evconnlistener *e) { + log::trace(logcat, "Invoking evconnlistener deleter!"); + if (e) + evconnlistener_free(e); + }; + + /// Checks rv for being -1 and, if so, raises a system_error from errno. Otherwise returns it. + static int check_rv(int rv) + { +#ifdef _WIN32 + if (rv == SOCKET_ERROR) + throw std::system_error{WSAGetLastError(), std::system_category()}; +#else + if (rv == -1) + throw std::system_error{errno, std::system_category()}; +#endif + return rv; + } + + static void tcp_read_cb(struct bufferevent *bev, void *user_arg) + { + std::array buf{}; + + // Load data from input buffer to local buffer + auto nwrite = bufferevent_read(bev, buf.data(), buf.size()); + + log::trace(logcat, "TCP socket received {}B: {}", nwrite, buffer_printer{buf}); + + auto *handle = reinterpret_cast(user_arg); + assert(handle); + + auto bfd = bufferevent_getfd(bev); + + if (auto maybe_str = handle->get_socket_stream(bfd)) + { + log::trace(logcat, "TCP handle passing received data to corresponding stream!"); + maybe_str->send(ustring_view{buf.data(), nwrite}); + } + else + { + log::error(logcat, "TCP handle could not find corresponding stream to fd:{}", bfd); + handle->close_socket(bfd); + } + }; + + static void tcp_event_cb(struct bufferevent *bev, short what, void *user_arg) + { + // This void pointer is the TCPSocket object. This opens the door for closing the TCP setup in case of any + // failures + (void)user_arg; + + if (what & BEV_EVENT_CONNECTED) + { + log::info(logcat, "TCP connect operation finished!"); + } + if (what & BEV_EVENT_ERROR) + { + log::critical(logcat, "TCP listener encountered error from bufferevent"); + } + if (what & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) + { + log::debug(logcat, "TCP listener freeing bufferevent..."); + bufferevent_free(bev); + } + }; + + static void tcp_listen_cb( + struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *src, int socklen, void *user_arg) + { + oxen::quic::Address source{src, static_cast(socklen)}; + log::debug(logcat, "TCP RECEIVED -- SRC:{}", source); + + auto *b = evconnlistener_get_base(listener); + auto *bevent = bufferevent_socket_new(b, fd, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE); + + bufferevent_setcb(bevent, tcp_read_cb, nullptr, tcp_event_cb, user_arg); + bufferevent_enable(bevent, EV_READ | EV_WRITE); + + auto *handle = reinterpret_cast(user_arg); + assert(handle); + }; + + static void tcp_err_cb(struct evconnlistener * /* e */, void *user_arg) + { + int ec = EVUTIL_SOCKET_ERROR(); + log::critical(logcat, "TCP LISTENER RECEIVED ERROR CODE {}:{}", ec, evutil_socket_error_to_string(ec)); + + auto *handle = reinterpret_cast(user_arg); + assert(handle); + + // DISCUSS: close everything here? + }; + + TCPSocket::TCPSocket(struct bufferevent *_bev, const oxen::quic::Address &_src) : bev{_bev}, src{_src} + {} + + TCPSocket::~TCPSocket() + { + bufferevent_free(bev); + log::debug(logcat, "TCPSocket shut down!"); + } + + TCPHandle::TCPHandle(const std::shared_ptr &ev_loop, const oxen::quic::Address &bind, rcv_data_hook cb) + : _ev{ev_loop}, _receive_cb{std::move(cb)} + { + assert(_ev); + + if (!_receive_cb) + throw std::logic_error{"TCPSocket construction requires a non-empty receive callback"}; + + _init_internals(bind); + } + + std::shared_ptr TCPHandle::get_socket_stream(evutil_socket_t fd) + { + if (auto itr = routing.find(fd); itr != routing.end()) + { + if (auto str = itr->second->stream.lock()) + return str; + } + + return nullptr; + } + + void TCPHandle::close_socket(evutil_socket_t fd) + { + routing.erase(fd); + } + + void TCPHandle::_init_internals(const oxen::quic::Address &bind) + { + sockaddr_in _tcp{}; + _tcp.sin_family = AF_INET; + _tcp.sin_addr.s_addr = INADDR_ANY; + _tcp.sin_port = htons(bind.port()); + + _tcp_listener = _ev->shared_ptr( + evconnlistener_new_bind( + _ev->loop().get(), + tcp_listen_cb, + this, + LEV_OPT_CLOSE_ON_FREE | LEV_OPT_THREADSAFE | LEV_OPT_REUSEABLE, + -1, + reinterpret_cast(&_tcp), + sizeof(sockaddr)), + evconnlistener_deleter); + + if (not _tcp_listener) + { + throw std::runtime_error{ + "TCP listener construction failed: {}"_format(evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()))}; + } + + _sock = evconnlistener_get_fd(_tcp_listener.get()); + check_rv(getsockname(_sock, _bound, _bound.socklen_ptr())); + evconnlistener_set_error_cb(_tcp_listener.get(), tcp_err_cb); + } + + TCPHandle::~TCPHandle() + { + _tcp_listener.reset(); + log::debug(logcat, "TCPHandle shut down!"); + } +} // namespace llarp diff --git a/llarp/ev/tcp.hpp b/llarp/ev/tcp.hpp new file mode 100644 index 000000000..bbd62b418 --- /dev/null +++ b/llarp/ev/tcp.hpp @@ -0,0 +1,80 @@ +#pragma once + +#include "loop.hpp" + +#include +#include + +extern "C" +{ +#include +#include +#include +#include +} + +namespace llarp +{ + class TCPHandle; + + struct TCPSocket + { + TCPSocket() = delete; + + TCPSocket(struct bufferevent* _bev, const oxen::quic::Address& _src); + + /// Non-copyable and non-moveable + TCPSocket(const TCPSocket& s) = delete; + TCPSocket& operator=(const TCPSocket& s) = delete; + TCPSocket(TCPSocket&& s) = delete; + TCPSocket& operator=(TCPSocket&& s) = delete; + + ~TCPSocket(); + + struct bufferevent* bev; + oxen::quic::Address src; + + std::weak_ptr stream; + }; + + class TCPHandle + { + public: + using socket_t = +#ifndef _WIN32 + int +#else + SOCKET +#endif + ; + TCPHandle() = delete; + + explicit TCPHandle(const std::shared_ptr& ev, const oxen::quic::Address& bind, rcv_data_hook cb); + + ~TCPHandle(); + + private: + std::shared_ptr _ev; + std::shared_ptr<::evconnlistener> _tcp_listener; + + socket_t _sock; + oxen::quic::Address _bound; + rcv_data_hook _receive_cb; + + std::unordered_map> routing; + + void _init_internals(const oxen::quic::Address& bind); + + public: + // void map_buffer_socket(evutil_socket_t fd) + + std::shared_ptr get_socket_stream(evutil_socket_t fd); + + void close_socket(evutil_socket_t fd); + + oxen::quic::Address bind() + { + return _bound; + } + }; +} // namespace llarp diff --git a/llarp/ev/types.hpp b/llarp/ev/types.hpp index 608b45889..ee39e86a8 100644 --- a/llarp/ev/types.hpp +++ b/llarp/ev/types.hpp @@ -1,5 +1,7 @@ #pragma once +#include + #include namespace llarp @@ -10,6 +12,7 @@ namespace llarp using udp_pkt_hook = std::function; using ip_pkt_hook = std::function; + using rcv_data_hook = std::function; using UDPSocket = oxen::quic::UDPSocket; diff --git a/llarp/ev/udp.cpp b/llarp/ev/udp.cpp index bd92752cf..74628dd8f 100644 --- a/llarp/ev/udp.cpp +++ b/llarp/ev/udp.cpp @@ -2,7 +2,7 @@ namespace llarp { - static auto logcat = log::Cat("UDP"); + static auto logcat = log::Cat("ev-udp"); inline constexpr size_t MAX_BATCH = #if defined(OXEN_LIBQUIC_UDP_SENDMMSG) || defined(OXEN_LIBQUIC_UDP_GSO) @@ -11,10 +11,10 @@ namespace llarp 1; #endif - UDPHandle::UDPHandle(loop_ptr ev, const oxen::quic::Address& bind, udp_pkt_hook cb) + UDPHandle::UDPHandle(const std::shared_ptr& ev, const oxen::quic::Address& bind, udp_pkt_hook cb) + : _loop{ev} { - socket = std::make_unique(ev.get(), bind, std::move(cb)); - + socket = std::make_unique(ev->loop().get(), bind, std::move(cb)); _local = socket->address(); } diff --git a/llarp/ev/udp.hpp b/llarp/ev/udp.hpp index 35bbb2708..713577d56 100644 --- a/llarp/ev/udp.hpp +++ b/llarp/ev/udp.hpp @@ -1,20 +1,23 @@ #pragma once -#include "types.hpp" +#include "loop.hpp" #include #include namespace llarp { - struct UDPHandle + class EventLoop; + + class UDPHandle { public: UDPHandle() = delete; - explicit UDPHandle(loop_ptr ev, const oxen::quic::Address& bind, udp_pkt_hook cb); + explicit UDPHandle(const std::shared_ptr& ev, const oxen::quic::Address& bind, udp_pkt_hook cb); ~UDPHandle(); private: + std::shared_ptr _loop; std::unique_ptr socket; oxen::quic::Address _local; diff --git a/llarp/link/tunnel.cpp b/llarp/link/tunnel.cpp index 8db2375a4..46ad4554d 100644 --- a/llarp/link/tunnel.cpp +++ b/llarp/link/tunnel.cpp @@ -1,6 +1,5 @@ #include "tunnel.hpp" -#include #include #include #include diff --git a/llarp/win32/exec.cpp b/llarp/win32/exec.cpp index 04dad5d89..756051849 100644 --- a/llarp/win32/exec.cpp +++ b/llarp/win32/exec.cpp @@ -10,7 +10,7 @@ namespace llarp::win32 { namespace { - auto logcat = log::Cat("win32:exec"); + static auto logcat = log::Cat("win32:exec"); /// get the directory for system32 which contains all the executables we use std::string SystemExeDir() diff --git a/llarp/win32/wintun.cpp b/llarp/win32/wintun.cpp index aa6d7ee89..88d136ee9 100644 --- a/llarp/win32/wintun.cpp +++ b/llarp/win32/wintun.cpp @@ -23,7 +23,7 @@ namespace llarp::win32 { namespace { - auto logcat = log::Cat("wintun"); + static auto logcat = log::Cat("wintun"); constexpr auto PoolName = "lokinet"; WINTUN_CREATE_ADAPTER_FUNC* create_adapter = nullptr;