TCP listener laid out

- Was able to repurpose a lot of the code from the ill-fated ev-dns implementation
- TCP{Handle,Socket} implemented to parallelize UDP{Handle,Socket} in functionality
- TCP implementation takes in a callback that receives an IPPacket; can also be easily reconfigured to hanle a UDPPacket by simply changing the callback type and the constructor called in llarp/ev/tcp.cpp::tcp_read_cb()
This commit is contained in:
dr7ana 2024-04-10 08:46:44 -07:00
parent 9e6a334da3
commit d9cee71cf5
13 changed files with 315 additions and 25 deletions

View File

@ -94,8 +94,9 @@ lokinet_add_library(lokinet-time-place
address/keys.cpp
address/utils.cpp
ev/udp.cpp
ev/loop.cpp
ev/tcp.cpp
ev/udp.cpp
net/ip.cpp
net/net_int.cpp

View File

@ -29,20 +29,37 @@ namespace llarp
{}
IPPacket::IPPacket(std::vector<uint8_t> data) : IPPacket{data.data(), data.size()}
{
_init_internals();
}
{}
IPPacket::IPPacket(const uint8_t* buf, size_t len)
{
if (len < MIN_PACKET_SIZE)
{
_buf.resize(0);
return;
}
else
{
_buf.resize(len);
std::copy_n(buf, len, _buf.data());
}
_buf.resize(len);
std::copy_n(buf, len, _buf.data());
_init_internals();
}
IPPacket IPPacket::from_udp(UDPPacket pkt)
{
auto& data = pkt.data;
return IPPacket{reinterpret_cast<const unsigned char*>(data.data()), data.size()};
}
std::optional<IPPacket> IPPacket::from_buffer(const uint8_t* buf, size_t len)
{
std::optional<IPPacket> ret = std::nullopt;
if (IPPacket b; b.load(buf, len))
ret.emplace(std::move(b));
return ret;
}
void IPPacket::_init_internals()
@ -281,12 +298,6 @@ namespace llarp
return std::nullopt;
}
IPPacket IPPacket::from_udp(UDPPacket pkt)
{
auto& data = pkt.data;
return IPPacket{reinterpret_cast<const unsigned char*>(data.data()), data.size()};
}
UDPPacket IPPacket::make_udp()
{
return UDPPacket{oxen::quic::Path{_src_addr, _dst_addr}, bview()};

View File

@ -38,6 +38,7 @@ namespace llarp
explicit IPPacket(const uint8_t* buf, size_t len);
static IPPacket from_udp(UDPPacket pkt);
static std::optional<IPPacket> from_buffer(const uint8_t* buf, size_t len);
UDPPacket make_udp();

View File

@ -35,7 +35,7 @@ namespace llarp::dns
oxen::quic::Address _local_addr;
public:
explicit UDPReader(Server& dns, const loop_ptr& loop, oxen::quic::Address bind) : _dns{dns}
explicit UDPReader(Server& dns, const std::shared_ptr<EventLoop>& loop, oxen::quic::Address bind) : _dns{dns}
{
_udp = std::make_unique<UDPHandle>(loop, bind, [&](UDPPacket pkt) {
auto& src = pkt.path.local;

View File

@ -1,6 +1,6 @@
#pragma once
#include "udp.hpp"
#include "types.hpp"
#include <llarp/net/interface_info.hpp>
#include <llarp/util/buffer.hpp>
@ -98,12 +98,31 @@ namespace llarp
true);
}
// Returns a pointer deleter that defers invocation of a custom deleter to the event loop
template <typename T, typename Callable>
auto wrapped_deleter(Callable&& f)
{
return _loop->wrapped_deleter<T>(std::forward<Callable>(f));
}
// Similar in concept to std::make_shared<T>, but it creates the shared pointer with a
// custom deleter that dispatches actual object destruction to the network's event loop for
// thread safety.
template <typename T, typename... Args>
std::shared_ptr<T> make_shared(Args&&... args)
{
return _loop->make_shared<T>(std::forward<Args>(args)...);
}
// Similar to the above make_shared, but instead of forwarding arguments for the
// construction of the object, it creates the shared_ptr from the already created object ptr
// and wraps the object's deleter in a wrapped_deleter
template <typename T, typename Callable>
std::shared_ptr<T> shared_ptr(T* obj, Callable&& deleter)
{
return _loop->shared_ptr<T, Callable>(std::forward<T>(obj), std::forward<Callable>(deleter));
}
template <typename Callable>
auto make_caller(Callable f)
{

173
llarp/ev/tcp.cpp Normal file
View File

@ -0,0 +1,173 @@
#include "tcp.hpp"
#include <llarp/address/ip_packet.hpp>
#include <llarp/util/logging/buffer.hpp>
namespace llarp
{
static auto logcat = oxen::log::Cat("ev-tcp");
constexpr auto evconnlistener_deleter = [](::evconnlistener *e) {
log::trace(logcat, "Invoking evconnlistener deleter!");
if (e)
evconnlistener_free(e);
};
/// Checks rv for being -1 and, if so, raises a system_error from errno. Otherwise returns it.
static int check_rv(int rv)
{
#ifdef _WIN32
if (rv == SOCKET_ERROR)
throw std::system_error{WSAGetLastError(), std::system_category()};
#else
if (rv == -1)
throw std::system_error{errno, std::system_category()};
#endif
return rv;
}
static void tcp_read_cb(struct bufferevent *bev, void *user_arg)
{
std::array<uint8_t, 2048> buf{};
// Load data from input buffer to local buffer
auto nwrite = bufferevent_read(bev, buf.data(), buf.size());
log::trace(logcat, "TCP socket received {}B: {}", nwrite, buffer_printer{buf});
auto *handle = reinterpret_cast<TCPHandle *>(user_arg);
assert(handle);
auto bfd = bufferevent_getfd(bev);
if (auto maybe_str = handle->get_socket_stream(bfd))
{
log::trace(logcat, "TCP handle passing received data to corresponding stream!");
maybe_str->send(ustring_view{buf.data(), nwrite});
}
else
{
log::error(logcat, "TCP handle could not find corresponding stream to fd:{}", bfd);
handle->close_socket(bfd);
}
};
static void tcp_event_cb(struct bufferevent *bev, short what, void *user_arg)
{
// This void pointer is the TCPSocket object. This opens the door for closing the TCP setup in case of any
// failures
(void)user_arg;
if (what & BEV_EVENT_CONNECTED)
{
log::info(logcat, "TCP connect operation finished!");
}
if (what & BEV_EVENT_ERROR)
{
log::critical(logcat, "TCP listener encountered error from bufferevent");
}
if (what & (BEV_EVENT_EOF | BEV_EVENT_ERROR))
{
log::debug(logcat, "TCP listener freeing bufferevent...");
bufferevent_free(bev);
}
};
static void tcp_listen_cb(
struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *src, int socklen, void *user_arg)
{
oxen::quic::Address source{src, static_cast<socklen_t>(socklen)};
log::debug(logcat, "TCP RECEIVED -- SRC:{}", source);
auto *b = evconnlistener_get_base(listener);
auto *bevent = bufferevent_socket_new(b, fd, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);
bufferevent_setcb(bevent, tcp_read_cb, nullptr, tcp_event_cb, user_arg);
bufferevent_enable(bevent, EV_READ | EV_WRITE);
auto *handle = reinterpret_cast<TCPHandle *>(user_arg);
assert(handle);
};
static void tcp_err_cb(struct evconnlistener * /* e */, void *user_arg)
{
int ec = EVUTIL_SOCKET_ERROR();
log::critical(logcat, "TCP LISTENER RECEIVED ERROR CODE {}:{}", ec, evutil_socket_error_to_string(ec));
auto *handle = reinterpret_cast<TCPHandle *>(user_arg);
assert(handle);
// DISCUSS: close everything here?
};
TCPSocket::TCPSocket(struct bufferevent *_bev, const oxen::quic::Address &_src) : bev{_bev}, src{_src}
{}
TCPSocket::~TCPSocket()
{
bufferevent_free(bev);
log::debug(logcat, "TCPSocket shut down!");
}
TCPHandle::TCPHandle(const std::shared_ptr<EventLoop> &ev_loop, const oxen::quic::Address &bind, rcv_data_hook cb)
: _ev{ev_loop}, _receive_cb{std::move(cb)}
{
assert(_ev);
if (!_receive_cb)
throw std::logic_error{"TCPSocket construction requires a non-empty receive callback"};
_init_internals(bind);
}
std::shared_ptr<oxen::quic::Stream> TCPHandle::get_socket_stream(evutil_socket_t fd)
{
if (auto itr = routing.find(fd); itr != routing.end())
{
if (auto str = itr->second->stream.lock())
return str;
}
return nullptr;
}
void TCPHandle::close_socket(evutil_socket_t fd)
{
routing.erase(fd);
}
void TCPHandle::_init_internals(const oxen::quic::Address &bind)
{
sockaddr_in _tcp{};
_tcp.sin_family = AF_INET;
_tcp.sin_addr.s_addr = INADDR_ANY;
_tcp.sin_port = htons(bind.port());
_tcp_listener = _ev->shared_ptr<struct evconnlistener>(
evconnlistener_new_bind(
_ev->loop().get(),
tcp_listen_cb,
this,
LEV_OPT_CLOSE_ON_FREE | LEV_OPT_THREADSAFE | LEV_OPT_REUSEABLE,
-1,
reinterpret_cast<sockaddr *>(&_tcp),
sizeof(sockaddr)),
evconnlistener_deleter);
if (not _tcp_listener)
{
throw std::runtime_error{
"TCP listener construction failed: {}"_format(evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()))};
}
_sock = evconnlistener_get_fd(_tcp_listener.get());
check_rv(getsockname(_sock, _bound, _bound.socklen_ptr()));
evconnlistener_set_error_cb(_tcp_listener.get(), tcp_err_cb);
}
TCPHandle::~TCPHandle()
{
_tcp_listener.reset();
log::debug(logcat, "TCPHandle shut down!");
}
} // namespace llarp

80
llarp/ev/tcp.hpp Normal file
View File

@ -0,0 +1,80 @@
#pragma once
#include "loop.hpp"
#include <llarp/util/buffer.hpp>
#include <llarp/util/logging.hpp>
extern "C"
{
#include <arpa/inet.h>
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/listener.h>
}
namespace llarp
{
class TCPHandle;
struct TCPSocket
{
TCPSocket() = delete;
TCPSocket(struct bufferevent* _bev, const oxen::quic::Address& _src);
/// Non-copyable and non-moveable
TCPSocket(const TCPSocket& s) = delete;
TCPSocket& operator=(const TCPSocket& s) = delete;
TCPSocket(TCPSocket&& s) = delete;
TCPSocket& operator=(TCPSocket&& s) = delete;
~TCPSocket();
struct bufferevent* bev;
oxen::quic::Address src;
std::weak_ptr<oxen::quic::Stream> stream;
};
class TCPHandle
{
public:
using socket_t =
#ifndef _WIN32
int
#else
SOCKET
#endif
;
TCPHandle() = delete;
explicit TCPHandle(const std::shared_ptr<EventLoop>& ev, const oxen::quic::Address& bind, rcv_data_hook cb);
~TCPHandle();
private:
std::shared_ptr<EventLoop> _ev;
std::shared_ptr<::evconnlistener> _tcp_listener;
socket_t _sock;
oxen::quic::Address _bound;
rcv_data_hook _receive_cb;
std::unordered_map<evutil_socket_t, std::shared_ptr<TCPSocket>> routing;
void _init_internals(const oxen::quic::Address& bind);
public:
// void map_buffer_socket(evutil_socket_t fd)
std::shared_ptr<oxen::quic::Stream> get_socket_stream(evutil_socket_t fd);
void close_socket(evutil_socket_t fd);
oxen::quic::Address bind()
{
return _bound;
}
};
} // namespace llarp

View File

@ -1,5 +1,7 @@
#pragma once
#include <llarp/util/buffer.hpp>
#include <oxen/quic.hpp>
namespace llarp
@ -10,6 +12,7 @@ namespace llarp
using udp_pkt_hook = std::function<void(UDPPacket&& pkt)>;
using ip_pkt_hook = std::function<void(IPPacket)>;
using rcv_data_hook = std::function<void(ustring)>;
using UDPSocket = oxen::quic::UDPSocket;

View File

@ -2,7 +2,7 @@
namespace llarp
{
static auto logcat = log::Cat("UDP");
static auto logcat = log::Cat("ev-udp");
inline constexpr size_t MAX_BATCH =
#if defined(OXEN_LIBQUIC_UDP_SENDMMSG) || defined(OXEN_LIBQUIC_UDP_GSO)
@ -11,10 +11,10 @@ namespace llarp
1;
#endif
UDPHandle::UDPHandle(loop_ptr ev, const oxen::quic::Address& bind, udp_pkt_hook cb)
UDPHandle::UDPHandle(const std::shared_ptr<EventLoop>& ev, const oxen::quic::Address& bind, udp_pkt_hook cb)
: _loop{ev}
{
socket = std::make_unique<UDPSocket>(ev.get(), bind, std::move(cb));
socket = std::make_unique<UDPSocket>(ev->loop().get(), bind, std::move(cb));
_local = socket->address();
}

View File

@ -1,20 +1,23 @@
#pragma once
#include "types.hpp"
#include "loop.hpp"
#include <llarp/util/buffer.hpp>
#include <llarp/util/logging.hpp>
namespace llarp
{
struct UDPHandle
class EventLoop;
class UDPHandle
{
public:
UDPHandle() = delete;
explicit UDPHandle(loop_ptr ev, const oxen::quic::Address& bind, udp_pkt_hook cb);
explicit UDPHandle(const std::shared_ptr<EventLoop>& ev, const oxen::quic::Address& bind, udp_pkt_hook cb);
~UDPHandle();
private:
std::shared_ptr<EventLoop> _loop;
std::unique_ptr<UDPSocket> socket;
oxen::quic::Address _local;

View File

@ -1,6 +1,5 @@
#include "tunnel.hpp"
#include <llarp/service/endpoint.hpp>
#include <llarp/service/name.hpp>
#include <llarp/service/tag.hpp>
#include <llarp/util/logging.hpp>

View File

@ -10,7 +10,7 @@ namespace llarp::win32
{
namespace
{
auto logcat = log::Cat("win32:exec");
static auto logcat = log::Cat("win32:exec");
/// get the directory for system32 which contains all the executables we use
std::string SystemExeDir()

View File

@ -23,7 +23,7 @@ namespace llarp::win32
{
namespace
{
auto logcat = log::Cat("wintun");
static auto logcat = log::Cat("wintun");
constexpr auto PoolName = "lokinet";
WINTUN_CREATE_ADAPTER_FUNC* create_adapter = nullptr;