Merge pull request #850 from majestrate/multithreaded-cryptography

Multithreaded cryptography
pull/859/head
Jeff 5 years ago committed by GitHub
commit fb7360cd47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -29,6 +29,7 @@ option(WITH_SHARED "build shared library")
option(WITH_COVERAGE "generate coverage data")
option(USE_SHELLHOOKS "enable shell hooks on compile time (dangerous)" OFF)
option(WARNINGS_AS_ERRORS "treat all warnings as errors. turn off for development, on for release" OFF)
option(TRACY_ROOT "include tracy profiler source")
include(cmake/target_link_libraries_system.cmake)
include(cmake/add_import_library.cmake)
@ -102,6 +103,11 @@ endif(WITH_SHELLHOOKS)
# Always build PIC
set(CMAKE_POSITION_INDEPENDENT_CODE ON)
if(TRACY_ROOT)
include_directories(${TRACY_ROOT})
add_definitions(-DTRACY_ENABLE)
endif()
set(ABSEIL_DIR vendor/abseil-cpp)
include_directories(SYSTEM ${ABSEIL_DIR})
add_subdirectory(vendor/cxxopts)
@ -214,6 +220,9 @@ if(ANDROID)
endif(ANDROID)
set(LIBS ${MALLOC_LIB} ${FS_LIB} ${LIBUV_LIBRARY})
if(TRACY_ROOT)
list(APPEND LIBS -ldl)
endif()
add_subdirectory(crypto)
add_subdirectory(llarp)

@ -79,7 +79,11 @@ CROSS ?= OFF
SHARED_LIB ?= OFF
# enable generating coverage
COVERAGE ?= OFF
COVERAGE_OUTDIR ?= "$(TMPDIR)/lokinet-coverage"
# tracy profiler
TRACY_ROOT ?=
# enable sanitizer
XSAN ?= False
@ -106,7 +110,7 @@ ANALYZE_CONFIG_CMD = $(shell gecho -n "cd '$(BUILD_ROOT)' && " ; gecho -n "$(SCA
COVERAGE_CONFIG_CMD = $(shell gecho -n "cd '$(BUILD_ROOT)' && " ; gecho -n "cmake -G'$(CMAKE_GEN)' -DCMAKE_CROSSCOMPILING=$(CROSS) -DSTATIC_LINK_RUNTIME=$(STATIC_LINK) -DUSE_NETNS=$(NETNS) -DUSE_AVX2=$(AVX2) -DNON_PC_TARGET=$(NON_PC_TARGET) -DWITH_SHARED=$(SHARED_LIB) -DWITH_COVERAGE=yes -DCMAKE_EXPORT_COMPILE_COMMANDS=ON '$(REPO)'")
else
CONFIG_CMD = $(shell /bin/echo -n "cd '$(BUILD_ROOT)' && " ; /bin/echo -n "cmake -G'$(CMAKE_GEN)' -DCMAKE_CROSSCOMPILING=$(CROSS) -DSTATIC_LINK_RUNTIME=$(STATIC_LINK) -DUSE_SHELLHOOKS=$(SHELL_HOOKS) -DUSE_NETNS=$(NETNS) -DUSE_AVX2=$(AVX2) -DNON_PC_TARGET=$(NON_PC_TARGET) -DWITH_SHARED=$(SHARED_LIB) -DCMAKE_EXPORT_COMPILE_COMMANDS=ON '$(REPO)'")
CONFIG_CMD = $(shell /bin/echo -n "cd '$(BUILD_ROOT)' && " ; /bin/echo -n "cmake -G'$(CMAKE_GEN)' -DCMAKE_CROSSCOMPILING=$(CROSS) -DSTATIC_LINK_RUNTIME=$(STATIC_LINK) -DUSE_SHELLHOOKS=$(SHELL_HOOKS) -DUSE_NETNS=$(NETNS) -DUSE_AVX2=$(AVX2) -DNON_PC_TARGET=$(NON_PC_TARGET) -DWITH_SHARED=$(SHARED_LIB) -DTRACY_ROOT=$(TRACY_ROOT) -DCMAKE_EXPORT_COMPILE_COMMANDS=ON '$(REPO)'")
CONFIG_CMD_WINDOWS = $(shell /bin/echo -n "cd '$(BUILD_ROOT)' && " ; /bin/echo -n "cmake -G'$(CMAKE_GEN)' -DCMAKE_CROSSCOMPILING=ON -DSTATIC_LINK_RUNTIME=$(STATIC_LINK) -DUSE_SHELLHOOKS=$(SHELL_HOOKS) -DUSE_NETNS=$(NETNS) -DUSE_AVX2=$(AVX2) -DNON_PC_TARGET=$(NON_PC_TARGET) -DWITH_SHARED=$(SHARED_LIB) -DCMAKE_EXPORT_COMPILE_COMMANDS=ON '$(REPO)'")
ANALYZE_CONFIG_CMD = $(shell /bin/echo -n "cd '$(BUILD_ROOT)' && " ; /bin/echo -n "$(SCAN_BUILD) cmake -G'$(CMAKE_GEN)' -DCMAKE_CROSSCOMPILING=$(CROSS) -DSTATIC_LINK_RUNTIME=$(STATIC_LINK) -DUSE_NETNS=$(NETNS) -DUSE_AVX2=$(AVX2) -DNON_PC_TARGET=$(NON_PC_TARGET) -DWITH_SHARED=$(SHARED_LIB) -DXSAN=$(XSAN) -DCMAKE_EXPORT_COMPILE_COMMANDS=ON '$(REPO)'")

@ -1,6 +1,10 @@
set(EXE lokinet)
set(EXE_SRC main.cpp)
if(TRACY_ROOT)
list(APPEND EXE_SRC ${TRACY_ROOT}/TracyClient.cpp)
endif()
if(SHADOW)
set(LOKINET_SHADOW shadow-plugin-${SHARED_LIB})
set(LOKINET_SHADOW_LIBS ${SHARED_LIB})

@ -29,7 +29,9 @@ void
handle_signal(int sig)
{
if(ctx)
{
llarp_main_signal(ctx, sig);
}
}
#ifdef _WIN32

@ -39,7 +39,7 @@ struct DemoCall : public abyss::http::IRPCClientHandler
bool HandleResponse(abyss::http::RPC_Response) override
{
llarp::LogInfo("response get");
m_Logic->queue_func(m_Callback);
m_Logic->queue_func([=]() { m_Callback(); });
return true;
}

@ -6,5 +6,5 @@
constexpr size_t MAX_LINK_MSG_SIZE = 8192;
constexpr llarp_time_t DefaultLinkSessionLifetime = 60 * 1000;
constexpr size_t MaxSendQueueSize = 128;
constexpr size_t MaxSendQueueSize = 1024;
#endif

@ -43,6 +43,7 @@ namespace llarp
bool
Context::Configure()
{
logic = std::make_shared< Logic >();
// llarp::LogInfo("loading config at ", configfile);
if(!config->Load(configfile.c_str()))
{
@ -197,7 +198,6 @@ __ ___ ____ _ _ ___ _ _ ____
llarp::LogInfo(LLARP_VERSION, " ", LLARP_RELEASE_MOTTO);
llarp::LogInfo("starting up");
mainloop = llarp_make_ev_loop();
logic = std::make_shared< Logic >();
crypto = std::make_unique< sodium::CryptoLibSodium >();
cryptoManager = std::make_unique< CryptoManager >(crypto.get());
@ -413,7 +413,8 @@ extern "C"
void
llarp_main_signal(struct llarp_main *ptr, int sig)
{
ptr->ctx->HandleSignal(sig);
ptr->ctx->logic->queue_func(
std::bind(&llarp::Context::HandleSignal, ptr->ctx.get(), sig));
}
int

@ -36,6 +36,7 @@ void
llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev,
std::shared_ptr< llarp::Logic > logic)
{
ev->set_logic(logic);
while(ev->running())
{
ev->update_time();
@ -44,7 +45,6 @@ llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev,
{
ev->update_time();
logic->tick_async(ev->time_now());
llarp_threadpool_tick(logic->thread);
}
llarp::LogContext::Instance().logStream->Tick(ev->time_now());
}

@ -69,6 +69,10 @@ llarp_ev_loop_time_now_ms(const llarp_ev_loop_ptr &ev);
void
llarp_ev_loop_stop(const llarp_ev_loop_ptr &ev);
/// list of packets we recv'd
/// forward declared
struct llarp_pkt_list;
/// UDP handling configuration
struct llarp_udp_io
{
@ -88,6 +92,11 @@ struct llarp_udp_io
size_t);
};
/// get all packets recvieved last tick
/// return true if we got packets return false if we didn't
bool
llarp_ev_udp_recvmany(struct llarp_udp_io *udp, struct llarp_pkt_list *pkts);
/// add UDP handler
int
llarp_ev_add_udp(struct llarp_ev_loop *ev, struct llarp_udp_io *udp,

@ -1,6 +1,7 @@
#ifndef LLARP_EV_HPP
#define LLARP_EV_HPP
#include <net/net_addr.hpp>
#include <ev/ev.h>
#include <util/buffer.hpp>
#include <util/codel.hpp>
@ -744,6 +745,9 @@ struct llarp_ev_loop
virtual int
tick(int ms) = 0;
virtual bool
add_ticker(std::function< void(void) > ticker) = 0;
virtual void
stop() = 0;
@ -780,6 +784,9 @@ struct llarp_ev_loop
return false;
}
/// give this event loop a logic thread for calling
virtual void set_logic(std::shared_ptr< llarp::Logic >) = 0;
/// register event listener
virtual bool
add_ev(llarp::ev_io* ev, bool write) = 0;
@ -795,7 +802,7 @@ struct llarp_ev_loop
std::list< std::unique_ptr< llarp::ev_io > > handlers;
void
virtual void
tick_listeners()
{
auto itr = handlers.begin();
@ -812,4 +819,63 @@ struct llarp_ev_loop
}
};
struct PacketBuffer
{
PacketBuffer(PacketBuffer&& other)
{
_ptr = other._ptr;
_sz = other._sz;
other._ptr = nullptr;
other._sz = 0;
}
PacketBuffer() : PacketBuffer(nullptr, 0){};
explicit PacketBuffer(size_t sz) : PacketBuffer(new char[sz], sz)
{
}
PacketBuffer(char* buf, size_t sz) : _ptr{buf}, _sz{sz}
{
}
~PacketBuffer()
{
if(_ptr)
delete[] _ptr;
}
byte_t*
data()
{
return (byte_t*)_ptr;
}
size_t
size()
{
return _sz;
}
byte_t& operator[](size_t sz)
{
return data()[sz];
}
void
reserve(size_t sz)
{
if(_ptr)
delete[] _ptr;
_ptr = new char[sz];
}
private:
char* _ptr = nullptr;
size_t _sz = 0;
};
struct PacketEvent
{
llarp::Addr remote = {};
PacketBuffer pkt = {};
};
struct llarp_pkt_list : public std::vector< PacketEvent >
{
};
#endif

@ -1,10 +1,19 @@
#include <ev/ev_libuv.hpp>
#include <net/net_addr.hpp>
#include <util/thread/logic.hpp>
#include <cstring>
namespace libuv
{
/// call a function in logic thread via a handle
template < typename Handle, typename Func >
void
Call(Handle* h, Func&& f)
{
static_cast< Loop* >(h->loop->data)->Call(f);
}
struct glue
{
virtual ~glue() = default;
@ -65,8 +74,9 @@ namespace libuv
static void
OnOutboundConnect(uv_connect_t* c, int status)
{
auto* self = static_cast< conn_glue* >(c->data);
self->HandleConnectResult(status);
conn_glue* self = static_cast< conn_glue* >(c->data);
Call(self->Stream(),
std::bind(&conn_glue::HandleConnectResult, self, status));
c->data = nullptr;
}
@ -98,9 +108,14 @@ namespace libuv
OnRead(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
{
if(nread >= 0)
static_cast< conn_glue* >(stream->data)->Read(buf->base, nread);
{
auto* conn = static_cast< conn_glue* >(stream->data);
conn->Read(buf->base, nread);
}
else if(nread < 0)
{
static_cast< conn_glue* >(stream->data)->Close();
}
delete[] buf->base;
}
@ -160,13 +175,14 @@ namespace libuv
static void
OnWritten(uv_write_t* req, int status)
{
conn_glue* conn = static_cast< conn_glue* >(req->data);
if(status)
{
llarp::LogError("write failed on tcp: ", uv_strerror(status));
static_cast< conn_glue* >(req->data)->Close();
conn->Close();
}
else
static_cast< conn_glue* >(req->data)->DrainOne();
Call(conn->Stream(), std::bind(&conn_glue::DrainOne, conn));
delete req;
}
@ -190,7 +206,8 @@ namespace libuv
static void
OnClosed(uv_handle_t* h)
{
static_cast< conn_glue* >(h->data)->HandleClosed();
conn_glue* conn = static_cast< conn_glue* >(h->data);
Call(h, std::bind(&conn_glue::HandleClosed, conn));
}
static void
@ -246,7 +263,8 @@ namespace libuv
{
if(status == 0)
{
static_cast< conn_glue* >(stream->data)->Accept();
conn_glue* conn = static_cast< conn_glue* >(stream->data);
Call(stream, std::bind(&conn_glue::Accept, conn));
}
else
{
@ -257,7 +275,8 @@ namespace libuv
static void
OnTick(uv_check_t* t)
{
static_cast< conn_glue* >(t->data)->Tick();
conn_glue* conn = static_cast< conn_glue* >(t->data);
Call(t, std::bind(&conn_glue::Tick, conn));
}
void
@ -312,20 +331,54 @@ namespace libuv
}
};
struct ticker_glue : public glue
{
std::function< void(void) > func;
ticker_glue(uv_loop_t* loop, std::function< void(void) > tick) : func(tick)
{
m_Ticker.data = this;
uv_check_init(loop, &m_Ticker);
}
static void
OnTick(uv_check_t* t)
{
ticker_glue* ticker = static_cast< ticker_glue* >(t->data);
Call(&ticker->m_Ticker, [ticker]() { ticker->func(); });
}
bool
Start()
{
return uv_check_start(&m_Ticker, &OnTick) != -1;
}
void
Close() override
{
uv_check_stop(&m_Ticker);
m_Ticker.data = nullptr;
delete this;
}
uv_check_t m_Ticker;
};
struct udp_glue : public glue
{
uv_udp_t m_Handle;
uv_check_t m_Ticker;
llarp_udp_io* const m_UDP;
llarp::Addr m_Addr;
bool gotpkts;
llarp_pkt_list m_LastPackets;
std::array< char, 1500 > m_Buffer;
udp_glue(uv_loop_t* loop, llarp_udp_io* udp, const sockaddr* src)
: m_UDP(udp), m_Addr(*src)
{
m_Handle.data = this;
m_Ticker.data = this;
gotpkts = false;
uv_udp_init(loop, &m_Handle);
uv_check_init(loop, &m_Ticker);
}
@ -333,35 +386,52 @@ namespace libuv
static void
Alloc(uv_handle_t*, size_t suggested_size, uv_buf_t* buf)
{
buf->base = new char[suggested_size];
buf->len = suggested_size;
const size_t sz = std::min(suggested_size, size_t{1500});
buf->base = new char[sz];
buf->len = sz;
}
static void
OnRecv(uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf,
const struct sockaddr* addr, unsigned)
{
udp_glue* glue = static_cast< udp_glue* >(handle->data);
if(addr)
static_cast< udp_glue* >(handle->data)->RecvFrom(nread, buf, addr);
delete[] buf->base;
glue->RecvFrom(nread, buf, addr);
if(glue->m_UDP == nullptr || glue->m_UDP->recvfrom != nullptr)
delete[] buf->base;
}
bool
RecvMany(llarp_pkt_list* pkts)
{
*pkts = std::move(m_LastPackets);
m_LastPackets = llarp_pkt_list();
return pkts->size() > 0;
}
void
RecvFrom(ssize_t sz, const uv_buf_t* buf, const struct sockaddr* fromaddr)
{
if(sz >= 0 && m_UDP && m_UDP->recvfrom)
if(sz > 0 && m_UDP)
{
const size_t pktsz = sz;
const llarp_buffer_t pkt{(const byte_t*)buf->base, pktsz};
m_UDP->recvfrom(m_UDP, fromaddr, ManagedBuffer{pkt});
gotpkts = true;
if(m_UDP->recvfrom)
m_UDP->recvfrom(m_UDP, fromaddr, ManagedBuffer{pkt});
else
{
m_LastPackets.emplace_back(
PacketEvent{llarp::Addr(*fromaddr), PacketBuffer(buf->base, sz)});
}
}
}
static void
OnTick(uv_check_t* t)
{
static_cast< udp_glue* >(t->data)->Tick();
udp_glue* udp = static_cast< udp_glue* >(t->data);
udp->Tick();
}
void
@ -369,7 +439,6 @@ namespace libuv
{
if(m_UDP && m_UDP->tick)
m_UDP->tick(m_UDP);
gotpkts = false;
}
static int
@ -404,6 +473,7 @@ namespace libuv
if(uv_fileno((const uv_handle_t*)&m_Handle, &m_UDP->fd))
return false;
m_UDP->sendto = &SendTo;
m_UDP->impl = this;
return true;
}
@ -413,8 +483,7 @@ namespace libuv
auto* glue = static_cast< udp_glue* >(h->data);
if(glue)
{
h->data = nullptr;
glue->m_UDP->impl = nullptr;
h->data = nullptr;
delete glue;
}
}
@ -422,6 +491,7 @@ namespace libuv
void
Close() override
{
m_UDP->impl = nullptr;
uv_check_stop(&m_Ticker);
uv_close((uv_handle_t*)&m_Handle, &OnClosed);
}
@ -442,7 +512,7 @@ namespace libuv
void
Tick()
{
m_Pipe->tick();
Call(&m_Handle, std::bind(&llarp_ev_pkt_pipe::tick, m_Pipe));
}
static void
@ -481,7 +551,8 @@ namespace libuv
static void
OnTick(uv_check_t* h)
{
static_cast< pipe_glue* >(h->data)->Tick();
pipe_glue* pipe = static_cast< pipe_glue* >(h->data);
Call(h, std::bind(&pipe_glue::Tick, pipe));
}
bool
@ -522,7 +593,8 @@ namespace libuv
static void
OnTick(uv_check_t* timer)
{
static_cast< tun_glue* >(timer->data)->Tick();
tun_glue* tun = static_cast< tun_glue* >(timer->data);
Call(timer, std::bind(&tun_glue::Tick, tun));
}
static void
@ -562,8 +634,7 @@ namespace libuv
auto* self = static_cast< tun_glue* >(h->data);
if(self)
{
self->m_Tun->impl = nullptr;
h->data = nullptr;
h->data = nullptr;
delete self;
}
}
@ -571,6 +642,7 @@ namespace libuv
void
Close() override
{
m_Tun->impl = nullptr;
uv_check_stop(&m_Ticker);
uv_close((uv_handle_t*)&m_Handle, &OnClosed);
}
@ -584,7 +656,8 @@ namespace libuv
static bool
WritePkt(llarp_tun_io* tun, const byte_t* pkt, size_t sz)
{
return static_cast< tun_glue* >(tun->impl)->Write(pkt, sz);
tun_glue* glue = static_cast< tun_glue* >(tun->impl);
return glue && glue->Write(pkt, sz);
}
bool
@ -631,6 +704,7 @@ namespace libuv
return false;
}
m_Tun->writepkt = &WritePkt;
m_Tun->impl = this;
return true;
}
};
@ -641,6 +715,7 @@ namespace libuv
m_Impl.reset(uv_loop_new());
if(uv_loop_init(m_Impl.get()) == -1)
return false;
m_Impl->data = this;
uv_loop_configure(m_Impl.get(), UV_LOOP_BLOCK_SIGNAL, SIGPIPE);
m_TickTimer.data = this;
m_Run.store(true);
@ -737,6 +812,18 @@ namespace libuv
return false;
}
bool
Loop::add_ticker(std::function< void(void) > func)
{
auto* ticker = new ticker_glue(m_Impl.get(), func);
if(ticker->Start())
{
return true;
}
delete ticker;
return false;
}
bool
Loop::udp_close(llarp_udp_io* udp)
{
@ -785,3 +872,9 @@ namespace libuv
}
} // namespace libuv
bool
llarp_ev_udp_recvmany(struct llarp_udp_io* u, struct llarp_pkt_list* pkts)
{
return static_cast< libuv::udp_glue* >(u->impl)->RecvMany(pkts);
}

@ -5,6 +5,7 @@
#include <uv.h>
#include <vector>
#include <functional>
#include <util/thread/logic.hpp>
namespace libuv
{
@ -78,6 +79,9 @@ namespace libuv
return nullptr;
}
bool
add_ticker(std::function< void(void) > ticker) override;
/// register event listener
bool
add_ev(llarp::ev_io*, bool) override
@ -85,6 +89,20 @@ namespace libuv
return false;
}
void
set_logic(std::shared_ptr< llarp::Logic > l) override
{
m_Logic = l;
}
/// call function in logic thread
template < typename F >
void
Call(F f)
{
m_Logic->queue_func(f);
}
private:
struct DestructLoop
{
@ -98,6 +116,7 @@ namespace libuv
std::unique_ptr< uv_loop_t, DestructLoop > m_Impl;
uv_timer_t m_TickTimer;
std::atomic< bool > m_Run;
std::shared_ptr< llarp::Logic > m_Logic;
};
} // namespace libuv

@ -366,10 +366,25 @@ namespace llarp
if(static_cast< size_t >(ret) > sz)
return -1;
b.sz = ret;
udp->recvfrom(udp, addr, ManagedBuffer{b});
if(udp->recvfrom)
udp->recvfrom(udp, addr, ManagedBuffer{b});
else
{
m_RecvPackets.emplace_back(
PacketEvent{llarp::Addr(*addr), PacketBuffer(ret)});
std::copy_n(buf, ret, m_RecvPackets.back().pkt.data());
}
return 0;
}
bool
udp_listener::RecvMany(llarp_pkt_list* pkts)
{
*pkts = std::move(m_RecvPackets);
m_RecvPackets = llarp_pkt_list();
return pkts->size() > 0;
}
static int
UDPSendTo(llarp_udp_io* udp, const sockaddr* to, const byte_t* ptr, size_t sz)
{
@ -696,4 +711,18 @@ llarp_win32_loop::stop()
llarp::LogDebug("destroy upoll");
}
void
llarp_win32_loop::tick_listeners()
{
llarp_ev_loop::tick_listeners();
for(auto& func : m_Tickers)
m_Logic->queue_func([func]() { func(); });
}
bool
llarp_ev_udp_recvmany(struct llarp_udp_io* u, struct llarp_pkt_list* pkts)
{
return static_cast< llarp::udp_listener* >(u->impl)->RecvMany(pkts);
}
#endif

@ -5,6 +5,7 @@
#include <net/net.h>
#include <net/net.hpp>
#include <util/buffer.hpp>
#include <util/thread/logic.hpp>
#include <windows.h>
#include <process.h>
@ -36,6 +37,7 @@ namespace llarp
struct udp_listener : public ev_io
{
llarp_udp_io* udp;
llarp_pkt_list m_RecvPackets;
udp_listener(int fd, llarp_udp_io* u) : ev_io(fd), udp(u){};
@ -43,6 +45,9 @@ namespace llarp
{
}
bool
RecvMany(llarp_pkt_list*);
bool
tick();
@ -99,6 +104,8 @@ struct win32_tun_io
struct llarp_win32_loop : public llarp_ev_loop
{
upoll_t* upollfd;
std::shared_ptr< llarp::Logic > m_Logic;
std::vector< std::function< void(void) > > m_Tickers;
llarp_win32_loop() : upollfd(nullptr)
{
@ -148,6 +155,22 @@ struct llarp_win32_loop : public llarp_ev_loop
void
stop();
bool
add_ticker(std::function< void(void) > func) override
{
m_Tickers.emplace_back(func);
return true;
}
void
set_logic(std::shared_ptr< llarp::Logic > l) override
{
m_Logic = l;
}
void
tick_listeners() override;
};
#endif

@ -5,6 +5,7 @@
#include <path/path_context.hpp>
#include <router/abstractrouter.hpp>
#include <util/str.hpp>
#include <util/bits.hpp>
#include <cassert>
@ -21,7 +22,8 @@ namespace llarp
static void
ExitHandlerFlush(llarp_tun_io *tun)
{
static_cast< ExitEndpoint * >(tun->user)->Flush();
auto *ep = static_cast< ExitEndpoint * >(tun->user);
ep->GetRouter()->logic()->queue_func(std::bind(&ExitEndpoint::Flush, ep));
}
ExitEndpoint::ExitEndpoint(const std::string &name, AbstractRouter *r)
@ -547,6 +549,11 @@ namespace llarp
}
if(k == "ifaddr")
{
if(!m_OurRange.FromString(v))
{
LogError(Name(), " has invalid address range: ", v);
return false;
}
auto pos = v.find("/");
if(pos == std::string::npos)
{
@ -558,26 +565,9 @@ namespace llarp
// string, or just a plain char array?
strncpy(m_Tun.ifaddr, host_str.c_str(), sizeof(m_Tun.ifaddr) - 1);
m_Tun.netmask = std::atoi(nmask_str.c_str());
huint32_t ip;
if(ip.FromString(host_str))
{
m_IfAddr = net::IPPacket::ExpandV4(ip);
m_OurRange.netmask_bits = netmask_ipv6_bits(m_Tun.netmask + 96);
}
else if(m_IfAddr.FromString(host_str))
{
m_UseV6 = true;
m_OurRange.netmask_bits = netmask_ipv6_bits(m_Tun.netmask);
}
else
{
LogError(Name(), " invalid ifaddr: ", v);
return false;
}
m_OurRange.addr = m_IfAddr;
m_NextAddr = m_IfAddr;
m_HigestAddr = m_IfAddr | (~m_OurRange.netmask_bits);
m_IfAddr = m_OurRange.addr;
m_NextAddr = m_IfAddr;
m_HigestAddr = m_OurRange.HighestAddr();
LogInfo(Name(), " set ifaddr range to ", m_Tun.ifaddr, "/",
m_Tun.netmask, " lo=", m_IfAddr, " hi=", m_HigestAddr);
}

@ -18,6 +18,8 @@
#include <util/str.hpp>
#include <absl/strings/ascii.h>
namespace llarp
{
namespace handlers
@ -143,18 +145,39 @@ namespace llarp
// the keyfile
if(k == "exit-node")
{
IPRange exitRange;
llarp::RouterID exitRouter;
if(!(exitRouter.FromString(v)
|| HexDecode(v.c_str(), exitRouter.begin(), exitRouter.size())))
std::string routerStr;
const auto pos = v.find(",");
if(pos != std::string::npos)
{
auto range_str = v.substr(1 + pos);
if(!exitRange.FromString(range_str))
{
LogError("bad exit range: '", range_str, "'");
return false;
}
routerStr = v.substr(0, pos);
}
else
{
llarp::LogError(Name(), " bad exit router key: ", v);
routerStr = v;
}
absl::StripAsciiWhitespace(&routerStr);
if(!(exitRouter.FromString(routerStr)
|| HexDecode(routerStr.c_str(), exitRouter.begin(),
exitRouter.size())))
{
llarp::LogError(Name(), " bad exit router key: ", routerStr);
return false;
}
m_Exit = std::make_shared< llarp::exit::ExitSession >(
auto exit = std::make_shared< llarp::exit::ExitSession >(
exitRouter,
util::memFn(&TunEndpoint::QueueInboundPacketForExit, this),
m_router, numPaths, numHops, ShouldBundleRC());
llarp::LogInfo(Name(), " using exit at ", exitRouter);
m_ExitMap.Insert(exitRange, exit);
llarp::LogInfo(Name(), " using exit at ", exitRouter, " for ",
exitRange);
}
if(k == "local-dns")
{
@ -288,16 +311,10 @@ namespace llarp
{
auto self = shared_from_this();
FlushSend();
if(m_Exit)
{
RouterLogic()->queue_func([=] {
self->m_Exit->FlushUpstream();
self->Router()->PumpLL();
});
}
RouterLogic()->queue_func([=]() {
RouterLogic()->queue_func([=] {
self->m_ExitMap.ForEachValue(
[](const auto &exit) { exit->FlushUpstream(); });
self->Pump(self->Now());
self->Router()->PumpLL();
});
}
@ -487,8 +504,8 @@ namespace llarp
TunEndpoint::ResetInternalState()
{
service::Endpoint::ResetInternalState();
if(m_Exit)
m_Exit->ResetInternalState();
m_ExitMap.ForEachValue(
[](const auto &exit) { exit->ResetInternalState(); });
}
bool
@ -550,11 +567,11 @@ namespace llarp
llarp::LogWarn("Couldn't start endpoint");
return false;
}
if(m_Exit)
{
for(const auto &snode : SnodeBlacklist())
m_Exit->BlacklistSnode(snode);
}
const auto blacklist = SnodeBlacklist();
m_ExitMap.ForEachValue([blacklist](const auto &exit) {
for(const auto &snode : blacklist)
exit->BlacklistSnode(snode);
});
return SetupNetworking();
}
@ -639,7 +656,7 @@ namespace llarp
m_NextIP = m_OurIP;
m_OurRange.addr = m_OurIP;
m_MaxIP = m_OurIP | (~m_OurRange.netmask_bits);
m_MaxIP = m_OurRange.HighestAddr();
llarp::LogInfo(Name(), " set ", tunif.ifname, " to have address ",
m_OurIP);
llarp::LogInfo(Name(), " allocated up to ", m_MaxIP, " on range ",
@ -688,21 +705,19 @@ namespace llarp
void
TunEndpoint::Tick(llarp_time_t now)
{
// call tun code in endpoint logic in case of network isolation
// EndpointLogic()->queue_job({this, handleTickTun});
if(m_Exit)
{
EnsureRouterIsKnown(m_Exit->Endpoint());
m_Exit->Tick(now);
}
Endpoint::Tick(now);
EndpointLogic()->queue_func([&]() {
m_ExitMap.ForEachValue([&](const auto &exit) {
this->EnsureRouterIsKnown(exit->Endpoint());
exit->Tick(now);
});
Endpoint::Tick(now);
});
}
bool
TunEndpoint::Stop()
{
if(m_Exit)
m_Exit->Stop();
m_ExitMap.ForEachValue([](const auto &exit) { exit->Stop(); });
return llarp::service::Endpoint::Stop();
}
@ -721,17 +736,26 @@ namespace llarp
auto itr = m_IPToAddr.find(dst);
if(itr == m_IPToAddr.end())
{
if(m_Exit && pkt.IsV4() && !llarp::IsIPv4Bogon(pkt.dstv4()))
const auto exits = m_ExitMap.FindAll(dst);
if(exits.empty())
{
pkt.UpdateIPv4Address({0}, xhtonl(pkt.dstv4()));
m_Exit->QueueUpstreamTraffic(std::move(pkt),
llarp::routing::ExitPadSize);
llarp::LogWarn(Name(), " has no exit mapped for ", dst);
return;
}
else if(m_Exit && pkt.IsV6())
for(const auto &exit : exits)
{
pkt.UpdateIPv6Address({0}, pkt.dstv6());
m_Exit->QueueUpstreamTraffic(std::move(pkt),
if(pkt.IsV4() && !llarp::IsIPv4Bogon(pkt.dstv4()))
{
pkt.UpdateIPv4Address({0}, xhtonl(pkt.dstv4()));
exit->QueueUpstreamTraffic(std::move(pkt),
llarp::routing::ExitPadSize);
}
else if(pkt.IsV6())
{
pkt.UpdateIPv6Address({0}, pkt.dstv6());
exit->QueueUpstreamTraffic(std::move(pkt),
llarp::routing::ExitPadSize);
}
}
return;
}
@ -760,6 +784,7 @@ namespace llarp
}
llarp::LogWarn(Name(), " did not flush packets");
});
Router()->PumpLL();
}
bool
@ -906,12 +931,13 @@ namespace llarp
// called in the isolated network thread
auto *self = static_cast< TunEndpoint * >(tun->user);
// flush user to network
self->FlushSend();
self->EndpointLogic()->queue_func(
std::bind(&TunEndpoint::FlushSend, self));
// flush exit traffic queues if it's there
if(self->m_Exit)
{
self->m_Exit->FlushDownstream();
}
self->EndpointLogic()->queue_func([self] {
self->m_ExitMap.ForEachValue(
[](const auto &exit) { exit->FlushDownstream(); });
});
// flush network to user
self->m_NetworkToUserPktQueue.Process([tun](net::IPPacket &pkt) {
if(!llarp_ev_tun_async_write(tun, pkt.Buffer()))
@ -924,9 +950,9 @@ namespace llarp
{
// called for every packet read from user in isolated network thread
auto *self = static_cast< TunEndpoint * >(tun->user);
const ManagedBuffer buf(b);
const ManagedBuffer pkt(b);
self->m_UserToNetworkPktQueue.EmplaceIf(
[&buf](net::IPPacket &pkt) -> bool { return pkt.Load(buf); });
[&pkt](net::IPPacket &p) -> bool { return p.Load(pkt); });
}
TunEndpoint::~TunEndpoint() = default;

@ -1,5 +1,6 @@
#include <iwp/linklayer.hpp>
#include <iwp/session.hpp>
#include <unordered_set>
namespace llarp
{
@ -22,19 +23,20 @@ namespace llarp
void
LinkLayer::Pump()
{
std::set< RouterID > sessions;
std::unordered_set< RouterID, RouterID::Hash > sessions;
{
Lock l(&m_AuthedLinksMutex);
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
auto itr = m_AuthedLinks.begin();
while(itr != m_AuthedLinks.end())
{
sessions.insert(itr->first);
const RouterID r{itr->first};
sessions.emplace(r);
++itr;
}
}
ILinkLayer::Pump();
{
Lock l(&m_AuthedLinksMutex);
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
for(const auto& pk : sessions)
{
if(m_AuthedLinks.count(pk) == 0)
@ -66,20 +68,20 @@ namespace llarp
return 2;
}
bool
LinkLayer::Start(std::shared_ptr< Logic > l)
void
LinkLayer::QueueWork(std::function< void(void) > func)
{
return ILinkLayer::Start(l);
m_Worker->addJob(func);
}
void
LinkLayer::RecvFrom(const Addr& from, const void* pkt, size_t sz)
LinkLayer::RecvFrom(const Addr& from, ILinkSession::Packet_t pkt)
{
std::shared_ptr< ILinkSession > session;
auto itr = m_AuthedAddrs.find(from);
if(itr == m_AuthedAddrs.end())
{
Lock lock(&m_PendingMutex);
ACQUIRE_LOCK(Lock_t lock, m_PendingMutex);
if(m_Pending.count(from) == 0)
{
if(not permitInbound)
@ -90,14 +92,13 @@ namespace llarp
}
else
{
Lock lock(&m_AuthedLinksMutex);
ACQUIRE_LOCK(Lock_t lock, m_AuthedLinksMutex);
auto range = m_AuthedLinks.equal_range(itr->second);
session = range.first->second;
}
if(session)
{
const llarp_buffer_t buf{pkt, sz};
session->Recv_LL(buf);
session->Recv_LL(std::move(pkt));
}
}

@ -6,6 +6,7 @@
#include <crypto/encrypted.hpp>
#include <crypto/types.hpp>
#include <link/server.hpp>
#include <util/thread/thread_pool.hpp>
namespace llarp
{
@ -21,9 +22,6 @@ namespace llarp
~LinkLayer() override;
bool
Start(std::shared_ptr< Logic > l) override;
std::shared_ptr< ILinkSession >
NewOutboundSession(const RouterContact &rc,
const AddressInfo &ai) override;
@ -41,7 +39,7 @@ namespace llarp
Rank() const override;
void
RecvFrom(const Addr &from, const void *buf, size_t sz) override;
RecvFrom(const Addr &from, ILinkSession::Packet_t pkt) override;
bool
MapAddr(const RouterID &pk, ILinkSession *s) override;
@ -49,6 +47,9 @@ namespace llarp
void
UnmapAddr(const Addr &addr);
void
QueueWork(std::function< void(void) > work);
private:
std::unordered_map< Addr, RouterID, Addr::Hash > m_AuthedAddrs;
const bool permitInbound;

@ -6,28 +6,27 @@ namespace llarp
{
namespace iwp
{
OutboundMessage::OutboundMessage(uint64_t msgid, const llarp_buffer_t &pkt,
OutboundMessage::OutboundMessage(uint64_t msgid,
ILinkSession::Message_t msg,
llarp_time_t now,
ILinkSession::CompletionHandler handler)
: m_Size{(uint16_t)std::min(pkt.sz, MAX_LINK_MSG_SIZE)}
: m_Data{std::move(msg)}
, m_MsgID{msgid}
, m_Completed{handler}
, m_StartedAt{now}
{
m_Data.Zero();
std::copy_n(pkt.base, m_Size, m_Data.begin());
const llarp_buffer_t buf(m_Data.data(), m_Size);
CryptoManager::instance()->shorthash(digest, buf);
const llarp_buffer_t buf(m_Data);
CryptoManager::instance()->shorthash(m_Digest, buf);
}
std::vector< byte_t >
ILinkSession::Packet_t
OutboundMessage::XMIT() const
{
std::vector< byte_t > xmit{
LLARP_PROTO_VERSION, Command::eXMIT, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
htobe16buf(xmit.data() + 2, m_Size);
htobe64buf(xmit.data() + 4, m_MsgID);
std::copy(digest.begin(), digest.end(), std::back_inserter(xmit));
auto xmit = CreatePacket(Command::eXMIT, 10 + 32);
htobe16buf(xmit.data() + CommandOverhead + PacketOverhead, m_Data.size());
htobe64buf(xmit.data() + 2 + CommandOverhead + PacketOverhead, m_MsgID);
std::copy_n(m_Digest.begin(), m_Digest.size(),
xmit.data() + 10 + CommandOverhead + PacketOverhead);
return xmit;
}
@ -55,35 +54,24 @@ namespace llarp
void
OutboundMessage::FlushUnAcked(
std::function< void(const llarp_buffer_t &) > sendpkt, llarp_time_t now)
std::function< void(ILinkSession::Packet_t) > sendpkt, llarp_time_t now)
{
uint16_t idx = 0;
while(idx < m_Size)
/// overhead for a data packet in plaintext
static constexpr size_t Overhead = 10;
uint16_t idx = 0;
const auto datasz = m_Data.size();
while(idx < datasz)
{
if(not m_Acks[idx / FragmentSize])
{
std::vector< byte_t > frag{LLARP_PROTO_VERSION,
Command::eDATA,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0};
htobe16buf(frag.data() + 2, idx);
htobe64buf(frag.data() + 4, m_MsgID);
const size_t fragsz =
idx + FragmentSize < m_Size ? FragmentSize : m_Size - idx;
const auto sz = frag.size();
frag.resize(sz + fragsz);
idx + FragmentSize < datasz ? FragmentSize : datasz - idx;
auto frag = CreatePacket(Command::eDATA, fragsz + Overhead, 0, 0);
htobe16buf(frag.data() + 2 + PacketOverhead, idx);
htobe64buf(frag.data() + 4 + PacketOverhead, m_MsgID);
std::copy(m_Data.begin() + idx, m_Data.begin() + idx + fragsz,
frag.begin() + sz);
const llarp_buffer_t pkt(frag);
sendpkt(pkt);
frag.data() + PacketOverhead + Overhead + 2);
sendpkt(std::move(frag));
}
idx += FragmentSize;
}
@ -93,9 +81,10 @@ namespace llarp
bool
OutboundMessage::IsTransmitted() const
{
for(uint16_t idx = 0; idx < m_Size; idx += FragmentSize)
const auto sz = m_Data.size();
for(uint16_t idx = 0; idx < sz; idx += FragmentSize)
{
if(!m_Acks.test(idx / FragmentSize))
if(not m_Acks.test(idx / FragmentSize))
return false;
}
return true;
@ -120,8 +109,8 @@ namespace llarp
InboundMessage::InboundMessage(uint64_t msgid, uint16_t sz, ShortHash h,
llarp_time_t now)
: m_Digset{std::move(h)}
, m_Size{sz}
: m_Data(size_t{sz})
, m_Digset{std::move(h)}
, m_MsgID{msgid}
, m_LastActiveAt{now}
{
@ -132,39 +121,40 @@ namespace llarp
llarp_time_t now)
{
if(idx + buf.sz > m_Data.size())
{
LogWarn("invalid fragment offset ", idx);
return;
auto *dst = m_Data.data() + idx;
}
byte_t *dst = m_Data.data() + idx;
std::copy_n(buf.base, buf.sz, dst);
m_Acks.set(idx / FragmentSize);
LogDebug("got fragment ", idx / FragmentSize, " of ", m_Size);
LogDebug("got fragment ", idx / FragmentSize);
m_LastActiveAt = now;
}
std::vector< byte_t >
ILinkSession::Packet_t
InboundMessage::ACKS() const
{
std::vector< byte_t > acks{LLARP_PROTO_VERSION,
Command::eACKS,
0,
0,
0,
0,
0,
0,
0,
0,
uint8_t{(uint8_t)m_Acks.to_ulong()}};
htobe64buf(acks.data() + 2, m_MsgID);
auto acks = CreatePacket(Command::eACKS, 9);
htobe64buf(acks.data() + CommandOverhead + PacketOverhead, m_MsgID);
acks[PacketOverhead + 10] = AcksBitmask();
return acks;
}
byte_t
InboundMessage::AcksBitmask() const
{
return byte_t{(byte_t)m_Acks.to_ulong()};
}
bool
InboundMessage::IsCompleted() const
{
for(uint16_t idx = 0; idx < m_Size; idx += FragmentSize)
const auto sz = m_Data.size();
for(size_t idx = 0; idx < sz; idx += FragmentSize)
{
if(!m_Acks.test(idx / FragmentSize))
if(not m_Acks.test(idx / FragmentSize))
return false;
}
return true;
@ -173,7 +163,7 @@ namespace llarp
bool
InboundMessage::ShouldSendACKS(llarp_time_t now) const
{
return now - m_LastACKSent > 1000 || IsCompleted();
return (now > m_LastACKSent && now - m_LastACKSent > 1000);
}
bool
@ -185,12 +175,9 @@ namespace llarp
void
InboundMessage::SendACKS(
std::function< void(const llarp_buffer_t &) > sendpkt, llarp_time_t now)
std::function< void(ILinkSession::Packet_t) > sendpkt, llarp_time_t now)
{
auto acks = ACKS();
AddRandomPadding(acks);
const llarp_buffer_t pkt(acks);
sendpkt(pkt);
sendpkt(ACKS());
m_LastACKSent = now;
}
@ -198,15 +185,9 @@ namespace llarp
InboundMessage::Verify() const
{
ShortHash gotten;
const llarp_buffer_t buf(m_Data.data(), m_Size);
const llarp_buffer_t buf(m_Data);
CryptoManager::instance()->shorthash(gotten, buf);
LogDebug("gotten=", gotten.ToHex());
if(gotten != m_Digset)
{
DumpBuffer(buf);
return false;
}
return true;
return gotten == m_Digset;
}
} // namespace iwp

@ -23,36 +23,40 @@ namespace llarp
eACKS = 3,
/// negative ack
eNACK = 4,
/// multiack
eMACK = 5,
/// close session
eCLOS = 5
eCLOS = 0xff,
};
/// max size of data fragments
static constexpr size_t FragmentSize = 1024;
/// plaintext header overhead size
static constexpr size_t CommandOverhead = 2;
struct OutboundMessage
{
OutboundMessage() = default;
OutboundMessage(uint64_t msgid, const llarp_buffer_t &pkt,
OutboundMessage(uint64_t msgid, ILinkSession::Message_t data,
llarp_time_t now,
ILinkSession::CompletionHandler handler);
AlignedBuffer< MAX_LINK_MSG_SIZE > m_Data;
uint16_t m_Size = 0;
ILinkSession::Message_t m_Data;
uint64_t m_MsgID = 0;
std::bitset< MAX_LINK_MSG_SIZE / FragmentSize > m_Acks;
ILinkSession::CompletionHandler m_Completed;
llarp_time_t m_LastFlush = 0;
ShortHash digest;
ShortHash m_Digest;
llarp_time_t m_StartedAt = 0;
std::vector< byte_t >
ILinkSession::Packet_t
XMIT() const;
void
Ack(byte_t bitmask);
void
FlushUnAcked(std::function< void(const llarp_buffer_t &) > sendpkt,
FlushUnAcked(std::function< void(ILinkSession::Packet_t) > sendpkt,
llarp_time_t now);
bool
@ -77,16 +81,15 @@ namespace llarp
InboundMessage(uint64_t msgid, uint16_t sz, ShortHash h,
llarp_time_t now);
AlignedBuffer< MAX_LINK_MSG_SIZE > m_Data;
ILinkSession::Message_t m_Data;
ShortHash m_Digset;
uint16_t m_Size = 0;
uint64_t m_MsgID = 0;
llarp_time_t m_LastACKSent = 0;
llarp_time_t m_LastActiveAt = 0;
std::bitset< MAX_LINK_MSG_SIZE / FragmentSize > m_Acks;
void
HandleData(uint16_t idx, const llarp_buffer_t &buf, llarp_time_t now);
HandleData(uint16_t idx, const llarp_buffer_t& buf, llarp_time_t now);
bool
IsCompleted() const;
@ -97,14 +100,17 @@ namespace llarp
bool
Verify() const;
byte_t
AcksBitmask() const;
bool
ShouldSendACKS(llarp_time_t now) const;
void
SendACKS(std::function< void(const llarp_buffer_t &) > sendpkt,
SendACKS(std::function< void(ILinkSession::Packet_t) > sendpkt,
llarp_time_t now);
std::vector< byte_t >
ILinkSession::Packet_t
ACKS() const;
};

@ -8,15 +8,24 @@ namespace llarp
{
namespace iwp
{
static constexpr size_t PacketOverhead = HMACSIZE + TUNNONCESIZE;
void
AddRandomPadding(std::vector< byte_t >& pkt, size_t min, size_t variance)
ILinkSession::Packet_t
CreatePacket(Command cmd, size_t plainsize, size_t minpad, size_t variance)
{
const auto sz = pkt.size();
const size_t randpad = min + randint() % variance;
pkt.resize(sz + randpad);
CryptoManager::instance()->randbytes(pkt.data() + sz, randpad);
const size_t pad =
minpad > 0 ? minpad + (variance > 0 ? randint() % variance : 0) : 0;
ILinkSession::Packet_t pkt(PacketOverhead + plainsize + pad
+ CommandOverhead);
// randomize pad
if(pad)
{
CryptoManager::instance()->randbytes(
pkt.data() + PacketOverhead + CommandOverhead + plainsize, pad);
}
// randomize nounce
CryptoManager::instance()->randbytes(pkt.data() + HMACSIZE, TUNNONCESIZE);
pkt[PacketOverhead] = LLARP_PROTO_VERSION;
pkt[PacketOverhead + 1] = cmd;
return pkt;
}
Session::Session(LinkLayer* p, RouterContact rc, AddressInfo ai)
@ -30,6 +39,8 @@ namespace llarp
{
token.Zero();
GotLIM = util::memFn(&Session::GotOutboundLIM, this);
CryptoManager::instance()->shorthash(m_SessionKey,
llarp_buffer_t(rc.pubkey));
}
Session::Session(LinkLayer* p, Addr from)
@ -40,7 +51,9 @@ namespace llarp
, m_RemoteAddr{from}
{
token.Randomize();
GotLIM = util::memFn(&Session::GotInboundLIM, this);
GotLIM = util::memFn(&Session::GotInboundLIM, this);
const PubKey pk = m_Parent->GetOurRC().pubkey;
CryptoManager::instance()->shorthash(m_SessionKey, llarp_buffer_t(pk));
}
void
@ -54,9 +67,10 @@ namespace llarp
bool
Session::GotInboundLIM(const LinkIntroMessage* msg)
{
if(msg->rc.enckey != m_RemoteOnionKey)
if(msg->rc.pubkey != m_ExpectedIdent)
{
LogError("key missmatch");
LogError("ident key missmatch from ", m_RemoteAddr, " ", msg->rc.pubkey,
" != ", m_ExpectedIdent);
return false;
}
m_State = State::Ready;
@ -100,15 +114,13 @@ namespace llarp
LogError("failed to sign our RC for ", m_RemoteAddr);
return;
}
AlignedBuffer< LinkIntroMessage::MaxSize > data;
ILinkSession::Message_t data(LinkIntroMessage::MaxSize + PacketOverhead);
llarp_buffer_t buf(data);
if(not msg.BEncode(&buf))
{
LogError("failed to encode LIM for ", m_RemoteAddr);
}
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
if(!SendMessageBuffer(buf, h))
if(!SendMessageBuffer(std::move(data), h))
{
LogError("failed to send LIM to ", m_RemoteAddr);
}
@ -116,28 +128,38 @@ namespace llarp
}
void
Session::EncryptAndSend(const llarp_buffer_t& data)
Session::EncryptAndSend(ILinkSession::Packet_t data)
{
std::vector< byte_t > pkt;
pkt.resize(data.sz + PacketOverhead);
CryptoManager::instance()->randbytes(pkt.data(), pkt.size());
llarp_buffer_t pktbuf(pkt);
pktbuf.base += PacketOverhead;
pktbuf.cur = pktbuf.base;
pktbuf.sz -= PacketOverhead;
byte_t* nonce_ptr = pkt.data() + HMACSIZE;
CryptoManager::instance()->xchacha20_alt(pktbuf, data, m_SessionKey,
nonce_ptr);
pktbuf.base = nonce_ptr;
pktbuf.sz = data.sz + 32;
CryptoManager::instance()->hmac(pkt.data(), pktbuf, m_SessionKey);
if(m_EncryptNext == nullptr)
m_EncryptNext = std::make_shared< CryptoQueue_t >();
m_EncryptNext->emplace_back(std::move(data));
if(!IsEstablished())
{
EncryptWorker(std::move(m_EncryptNext));
m_EncryptNext = nullptr;
}
}
pktbuf.base = pkt.data();
pktbuf.cur = pkt.data();
pktbuf.sz = pkt.size();
Send_LL(pktbuf);
void
Session::EncryptWorker(CryptoQueue_ptr msgs)
{
LogDebug("encrypt worker ", msgs->size(), " messages");
for(auto& pkt : *msgs)
{
llarp_buffer_t pktbuf(pkt);
const TunnelNonce nonce_ptr{pkt.data() + HMACSIZE};
pktbuf.base += PacketOverhead;
pktbuf.cur = pktbuf.base;
pktbuf.sz -= PacketOverhead;
CryptoManager::instance()->xchacha20(pktbuf, m_SessionKey, nonce_ptr);
pktbuf.base = pkt.data() + HMACSIZE;
pktbuf.sz = pkt.size() - HMACSIZE;
CryptoManager::instance()->hmac(pkt.data(), pktbuf, m_SessionKey);
pktbuf.base = pkt.data();
pktbuf.cur = pkt.data();
pktbuf.sz = pkt.size();
Send_LL(pktbuf);
}
}
void
@ -145,10 +167,8 @@ namespace llarp
{
if(m_State == State::Closed)
return;
std::vector< byte_t > close_msg = {LLARP_PROTO_VERSION, Command::eCLOS};
AddRandomPadding(close_msg);
const llarp_buffer_t buf(close_msg);
EncryptAndSend(buf);
auto close_msg = CreatePacket(Command::eCLOS, 0, 16, 16);
EncryptAndSend(std::move(close_msg));
if(m_State == State::Ready)
m_Parent->UnmapAddr(m_RemoteAddr);
m_State = State::Closed;
@ -156,23 +176,51 @@ namespace llarp
}
bool
Session::SendMessageBuffer(const llarp_buffer_t& buf,
Session::SendMessageBuffer(ILinkSession::Message_t buf,
ILinkSession::CompletionHandler completed)
{
if(m_TXMsgs.size() >= MaxSendQueueSize)
return false;
const auto now = m_Parent->Now();
const auto msgid = m_TXID++;
auto& msg =
m_TXMsgs.emplace(msgid, OutboundMessage{msgid, buf, now, completed})
m_TXMsgs
.emplace(msgid,
OutboundMessage{msgid, std::move(buf), now, completed})
.first->second;
auto xmit = msg.XMIT();
AddRandomPadding(xmit);
const llarp_buffer_t pkt(xmit);
EncryptAndSend(pkt);
EncryptAndSend(msg.XMIT());
msg.FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), now);
LogDebug("send message ", msgid);
return true;
}
void
Session::SendMACK()
{
// send multi acks
while(m_SendMACKs.size() > 0)
{
const auto sz = m_SendMACKs.size();
const auto max = Session::MaxACKSInMACK;
auto numAcks = std::min(sz, max);
auto mack =
CreatePacket(Command::eMACK, 1 + (numAcks * sizeof(uint64_t)));
mack[PacketOverhead + CommandOverhead] =
byte_t{static_cast< byte_t >(numAcks)};
byte_t* ptr = mack.data() + 3 + PacketOverhead;
LogDebug("send ", numAcks, " macks to ", m_RemoteAddr);
auto itr = m_SendMACKs.begin();
while(numAcks > 0)
{
htobe64buf(ptr, *itr);
itr = m_SendMACKs.erase(itr);
numAcks--;
ptr += sizeof(uint64_t);
}
EncryptAndSend(std::move(mack));
}
}
void
Session::Pump()
{
@ -198,6 +246,22 @@ namespace llarp
}
}
}
auto self = shared_from_this();
if(m_EncryptNext && !m_EncryptNext->empty())
{
m_Parent->QueueWork([self, data = std::move(m_EncryptNext)] {
self->EncryptWorker(data);
});
m_EncryptNext = nullptr;
}
if(m_DecryptNext && !m_DecryptNext->empty())
{
m_Parent->QueueWork([self, data = std::move(m_DecryptNext)] {
self->DecryptWorker(data);
});
m_DecryptNext = nullptr;
}
}
bool
@ -267,6 +331,7 @@ namespace llarp
{
if(itr->second.IsTimedOut(now))
{
m_ReplayFilter.emplace(itr->first, now);
itr = m_RXMsgs.erase(itr);
}
else
@ -288,15 +353,33 @@ namespace llarp
}
}
using Introduction = AlignedBuffer< 64 >;
using Introduction = AlignedBuffer< PubKey::SIZE + PubKey::SIZE
+ TunnelNonce::SIZE + Signature::SIZE >;
void
Session::GenerateAndSendIntro()
{
Introduction intro;
TunnelNonce N;
N.Randomize();
ILinkSession::Packet_t req(Introduction::SIZE + PacketOverhead);
const auto pk = m_Parent->GetOurRC().pubkey;
const auto e_pk = m_Parent->RouterEncryptionSecret().toPublic();
auto itr = req.data() + PacketOverhead;
std::copy_n(pk.begin(), pk.size(), itr);
itr += pk.size();
std::copy_n(e_pk.begin(), e_pk.size(), itr);
itr += e_pk.size();
std::copy(N.begin(), N.end(), itr);
Signature Z;
llarp_buffer_t signbuf(req.data() + PacketOverhead,
Introduction::SIZE - Signature::SIZE);
m_Parent->Sign(Z, signbuf);
std::copy_n(
Z.begin(), Z.size(),
req.data() + PacketOverhead + (Introduction::SIZE - Signature::SIZE));
CryptoManager::instance()->randbytes(req.data() + HMACSIZE, TUNNONCESIZE);
EncryptAndSend(std::move(req));
m_State = State::Introduction;
if(not CryptoManager::instance()->transport_dh_client(
m_SessionKey, m_ChosenAI.pubkey,
m_Parent->RouterEncryptionSecret(), N))
@ -305,37 +388,25 @@ namespace llarp
m_RemoteAddr);
return;
}
const auto pk = m_Parent->RouterEncryptionSecret().toPublic();
std::copy_n(pk.begin(), pk.size(), intro.begin());
std::copy(N.begin(), N.end(), intro.begin() + PubKey::SIZE);
LogDebug("pk=", pk.ToHex(), " N=", N.ToHex(),
" remote-pk=", m_ChosenAI.pubkey.ToHex());
std::vector< byte_t > req;
std::copy_n(intro.begin(), intro.size(), std::back_inserter(req));
AddRandomPadding(req);
const llarp_buffer_t buf(req);
Send_LL(buf);
m_State = State::Introduction;
LogDebug("sent intro to ", m_RemoteAddr);
}
void
Session::HandleCreateSessionRequest(const llarp_buffer_t& buf)
Session::HandleCreateSessionRequest(Packet_t pkt)
{
std::vector< byte_t > result;
if(not DecryptMessage(buf, result))
if(not DecryptMessageInPlace(pkt))
{
LogError("failed to decrypt session request from ", m_RemoteAddr);
return;
}
if(result.size() < token.size())
if(pkt.size() < token.size() + PacketOverhead)
{
LogError("bad session request size, ", result.size(), " < ",
token.size(), " from ", m_RemoteAddr);
LogError("bad session request size, ", pkt.size(), " < ",
token.size() + PacketOverhead, " from ", m_RemoteAddr);
return;
}
if(not std::equal(result.begin(), result.begin() + token.size(),
token.begin()))
const auto begin = pkt.data() + PacketOverhead;
if(not std::equal(begin, begin + token.size(), token.begin()))
{
LogError("token missmatch from ", m_RemoteAddr);
return;
@ -346,19 +417,33 @@ namespace llarp
}
void
Session::HandleGotIntro(const llarp_buffer_t& buf)
Session::HandleGotIntro(Packet_t pkt)
{
if(buf.sz < Introduction::SIZE)
if(pkt.size() < Introduction::SIZE + PacketOverhead)
{
LogWarn("intro too small from ", m_RemoteAddr);
return;
}
byte_t* ptr = pkt.data() + PacketOverhead;
TunnelNonce N;
std::copy_n(buf.base, PubKey::SIZE, m_RemoteOnionKey.begin());
std::copy_n(buf.base + PubKey::SIZE, TunnelNonce::SIZE, N.begin());
std::copy_n(ptr, PubKey::SIZE, m_ExpectedIdent.begin());
ptr += PubKey::SIZE;
std::copy_n(ptr, PubKey::SIZE, m_RemoteOnionKey.begin());
ptr += PubKey::SIZE;
std::copy_n(ptr, TunnelNonce::SIZE, N.begin());
ptr += TunnelNonce::SIZE;
Signature Z;
std::copy_n(ptr, Z.size(), Z.begin());
const llarp_buffer_t verifybuf(pkt.data() + PacketOverhead,
Introduction::SIZE - Signature::SIZE);
if(!CryptoManager::instance()->verify(m_ExpectedIdent, verifybuf, Z))
{
LogError("intro verify failed from ", m_RemoteAddr);
return;
}
const PubKey pk = m_Parent->TransportSecretKey().toPublic();
LogDebug("got intro: remote-pk=", m_RemoteOnionKey.ToHex(),
" N=", N.ToHex(), " local-pk=", pk.ToHex(), " sz=", buf.sz);
" N=", N.ToHex(), " local-pk=", pk.ToHex());
if(not CryptoManager::instance()->transport_dh_server(
m_SessionKey, m_RemoteOnionKey, m_Parent->TransportSecretKey(), N))
{
@ -366,48 +451,53 @@ namespace llarp
m_RemoteAddr);
return;
}
std::vector< byte_t > reply;
std::copy_n(token.begin(), token.size(), std::back_inserter(reply));
AddRandomPadding(reply);
const llarp_buffer_t pkt(reply);
Packet_t reply(token.size() + PacketOverhead);
// random nonce
CryptoManager::instance()->randbytes(reply.data() + HMACSIZE,
TUNNONCESIZE);
// set token
std::copy_n(token.begin(), token.size(), reply.data() + PacketOverhead);
m_LastRX = m_Parent->Now();
EncryptAndSend(pkt);
EncryptAndSend(std::move(reply));
LogDebug("sent intro ack to ", m_RemoteAddr);
m_State = State::Introduction;
}
void
Session::HandleGotIntroAck(const llarp_buffer_t& buf)
Session::HandleGotIntroAck(Packet_t pkt)
{
std::vector< byte_t > reply;
if(not DecryptMessage(buf, reply))
if(pkt.size() < token.size() + PacketOverhead)
{
LogError("intro ack decrypt failed from ", m_RemoteAddr);
LogError("bad intro ack size ", pkt.size(), " < ",
token.size() + PacketOverhead, " from ", m_RemoteAddr);
return;
}
if(reply.size() < token.size())
Packet_t reply(token.size() + PacketOverhead);
if(not DecryptMessageInPlace(pkt))
{
LogError("bad intro ack size ", reply.size(), " < ", token.size(),
" from ", m_RemoteAddr);
LogError("intro ack decrypt failed from ", m_RemoteAddr);
return;
}
m_LastRX = m_Parent->Now();
std::copy_n(reply.begin(), token.size(), token.begin());
const llarp_buffer_t pkt(token);
EncryptAndSend(pkt);
std::copy_n(pkt.data() + PacketOverhead, token.size(), token.begin());
std::copy_n(token.begin(), token.size(), reply.data() + PacketOverhead);
// random nounce
CryptoManager::instance()->randbytes(reply.data() + HMACSIZE,
TUNNONCESIZE);
EncryptAndSend(std::move(reply));
LogDebug("sent session request to ", m_RemoteAddr);
m_State = State::LinkIntro;
}
bool
Session::DecryptMessage(const llarp_buffer_t& buf,
std::vector< byte_t >& result)
Session::DecryptMessageInPlace(Packet_t& pkt)
{
if(buf.sz <= PacketOverhead)
if(pkt.size() <= PacketOverhead)
{
LogError("packet too small ", buf.sz);
LogError("packet too small from ", m_RemoteAddr);
return false;
}
const llarp_buffer_t buf(pkt);
ShortHash H;
llarp_buffer_t curbuf(buf.base, buf.sz);
curbuf.base += ShortHash::SIZE;
@ -424,14 +514,11 @@ namespace llarp
m_RemoteAddr, " state=", int(m_State), " size=", buf.sz);
return false;
}
const byte_t* nonce_ptr = curbuf.base;
const TunnelNonce N{curbuf.base};
curbuf.base += 32;
curbuf.sz -= 32;
result.resize(buf.sz - PacketOverhead);
const llarp_buffer_t outbuf(result);
LogDebug("decrypt: ", result.size(), " bytes from ", m_RemoteAddr);
return CryptoManager::instance()->xchacha20_alt(outbuf, curbuf,
m_SessionKey, nonce_ptr);
LogDebug("decrypt: ", curbuf.sz, " bytes from ", m_RemoteAddr);
return CryptoManager::instance()->xchacha20(curbuf, m_SessionKey, N);
}
void
@ -443,77 +530,143 @@ namespace llarp
}
void
Session::HandleSessionData(const llarp_buffer_t& buf)
Session::HandleSessionData(Packet_t pkt)
{
if(m_DecryptNext == nullptr)
m_DecryptNext = std::make_shared< CryptoQueue_t >();
m_DecryptNext->emplace_back(std::move(pkt));
}
void
Session::DecryptWorker(CryptoQueue_ptr msgs)
{
CryptoQueue_ptr recvMsgs = std::make_shared< CryptoQueue_t >();
for(auto& pkt : *msgs)
{
if(not DecryptMessageInPlace(pkt))
{
LogError("failed to decrypt session data from ", m_RemoteAddr);
continue;
}
if(pkt[PacketOverhead] != LLARP_PROTO_VERSION)
{
LogError("protocol version missmatch ", int(pkt[PacketOverhead]),
" != ", LLARP_PROTO_VERSION);
continue;
}
recvMsgs->emplace_back(std::move(pkt));
}
LogDebug("decrypted ", recvMsgs->size(), " packets from ", m_RemoteAddr);
m_Parent->logic()->queue_func(
std::bind(&Session::HandlePlaintext, shared_from_this(), recvMsgs));
}
void
Session::HandlePlaintext(CryptoQueue_ptr msgs)
{
std::vector< byte_t > result;
if(not DecryptMessage(buf, result))
for(auto& result : *msgs)
{
LogError("failed to decrypt session data from ", m_RemoteAddr);
LogDebug("Command ", int(result[PacketOverhead + 1]));
switch(result[PacketOverhead + 1])
{
case Command::eXMIT:
HandleXMIT(std::move(result));
break;
case Command::eDATA:
HandleDATA(std::move(result));
break;
case Command::eACKS:
HandleACKS(std::move(result));
break;
case Command::ePING:
HandlePING(std::move(result));
break;
case Command::eNACK:
HandleNACK(std::move(result));
break;
case Command::eCLOS:
HandleCLOS(std::move(result));
break;
case Command::eMACK:
HandleMACK(std::move(result));
break;
default:
LogError("invalid command ", int(result[PacketOverhead + 1]),
" from ", m_RemoteAddr);
}
}
SendMACK();
}
void
Session::HandleMACK(Packet_t data)
{
if(data.size() < 3 + PacketOverhead)
{
LogError("impossibly short mack from ", m_RemoteAddr);
return;
}
if(result[0] != LLARP_PROTO_VERSION)
byte_t numAcks = data[CommandOverhead + PacketOverhead];
if(data.size()
< 1 + CommandOverhead + PacketOverhead + (numAcks * sizeof(uint64_t)))
{
LogError("protocol version missmatch ", int(result[0]),
" != ", LLARP_PROTO_VERSION);
LogError("short mack from ", m_RemoteAddr);
return;
}
LogDebug("command ", int(result[1]), " from ", m_RemoteAddr);
switch(result[1])
LogDebug("got ", int(numAcks), " mack from ", m_RemoteAddr);
byte_t* ptr = data.data() + CommandOverhead + PacketOverhead + 1;
while(numAcks > 0)
{
case Command::eXMIT:
HandleXMIT(std::move(result));
return;
case Command::eDATA:
HandleDATA(std::move(result));
return;
case Command::eACKS:
HandleACKS(std::move(result));
return;
case Command::ePING:
HandlePING(std::move(result));
return;
case Command::eNACK:
HandleNACK(std::move(result));
return;
case Command::eCLOS:
HandleCLOS(std::move(result));
return;
uint64_t acked = bufbe64toh(ptr);
LogDebug("mack containing txid=", acked, " from ", m_RemoteAddr);
auto itr = m_TXMsgs.find(acked);
if(itr != m_TXMsgs.end())
{
itr->second.Completed();
m_TXMsgs.erase(itr);
}
else
{
LogDebug("ignored mack for txid=", acked, " from ", m_RemoteAddr);
}
ptr += sizeof(uint64_t);
numAcks--;
}
LogError("invalid command ", int(result[1]));
}
void
Session::HandleNACK(std::vector< byte_t > data)
Session::HandleNACK(Packet_t data)
{
if(data.size() < 10)
if(data.size() < CommandOverhead + sizeof(uint64_t) + PacketOverhead)
{
LogError("short nack from ", m_RemoteAddr);
return;
}
uint64_t txid = bufbe64toh(data.data() + 2);
uint64_t txid =
bufbe64toh(data.data() + CommandOverhead + PacketOverhead);
LogDebug("got nack on ", txid, " from ", m_RemoteAddr);
auto itr = m_TXMsgs.find(txid);
if(itr != m_TXMsgs.end())
{
auto xmit = itr->second.XMIT();
AddRandomPadding(xmit);
const llarp_buffer_t pkt(xmit);
EncryptAndSend(pkt);
EncryptAndSend(itr->second.XMIT());
}
m_LastRX = m_Parent->Now();
}
void
Session::HandleXMIT(std::vector< byte_t > data)
Session::HandleXMIT(Packet_t data)
{
if(data.size() < 44)
if(data.size() < CommandOverhead + PacketOverhead + sizeof(uint16_t)
+ sizeof(uint64_t) + ShortHash::SIZE)
{
LogError("short XMIT from ", m_RemoteAddr, " ", data.size(), " < 44");
LogError("short XMIT from ", m_RemoteAddr);
return;
}
uint16_t sz = bufbe16toh(data.data() + 2);
uint64_t rxid = bufbe64toh(data.data() + 4);
ShortHash h{data.data() + 12};
uint16_t sz = bufbe16toh(data.data() + CommandOverhead + PacketOverhead);
uint64_t rxid = bufbe64toh(data.data() + CommandOverhead
+ sizeof(uint16_t) + PacketOverhead);
ShortHash h{data.data() + CommandOverhead + sizeof(uint16_t)
+ sizeof(uint64_t) + PacketOverhead};
LogDebug("rxid=", rxid, " sz=", sz, " h=", h.ToHex());
m_LastRX = m_Parent->Now();
{
@ -521,6 +674,7 @@ namespace llarp
auto itr = m_ReplayFilter.find(rxid);
if(itr != m_ReplayFilter.end())
{
m_SendMACKs.emplace(rxid);
LogDebug("duplicate rxid=", rxid, " from ", m_RemoteAddr);
return;
}
@ -536,44 +690,51 @@ namespace llarp
}
void
Session::HandleDATA(std::vector< byte_t > data)
Session::HandleDATA(Packet_t data)
{
if(data.size() <= 12)
if(data.size() <= CommandOverhead + sizeof(uint16_t) + sizeof(uint64_t)
+ PacketOverhead)
{
LogError("short DATA from ", m_RemoteAddr, " ", data.size());
return;
}
m_LastRX = m_Parent->Now();
uint16_t sz = bufbe16toh(data.data() + 2);
uint64_t rxid = bufbe64toh(data.data() + 4);
m_LastRX = m_Parent->Now();
uint16_t sz = bufbe16toh(data.data() + CommandOverhead + PacketOverhead);
uint64_t rxid = bufbe64toh(data.data() + CommandOverhead
+ sizeof(uint16_t) + PacketOverhead);
auto itr = m_RXMsgs.find(rxid);
if(itr == m_RXMsgs.end())
{
LogDebug("no rxid=", rxid, " for ", m_RemoteAddr);
std::vector< byte_t > nack = {
LLARP_PROTO_VERSION, Command::eNACK, 0, 0, 0, 0, 0, 0, 0, 0};
htobe64buf(nack.data() + 2, rxid);
AddRandomPadding(nack);
const llarp_buffer_t nackbuf(nack);
EncryptAndSend(nackbuf);
if(m_ReplayFilter.find(rxid) == m_ReplayFilter.end())
{
LogDebug("no rxid=", rxid, " for ", m_RemoteAddr);
auto nack = CreatePacket(Command::eNACK, 8);
htobe64buf(nack.data() + PacketOverhead + CommandOverhead, rxid);
EncryptAndSend(std::move(nack));
}
else
{
LogDebug("replay hit for rxid=", rxid, " for ", m_RemoteAddr);
m_SendMACKs.emplace(rxid);
}
return;
}
{
const llarp_buffer_t buf(data.data() + 12, data.size() - 12);
const llarp_buffer_t buf(data.data() + PacketOverhead + 12,
data.size() - (PacketOverhead + 12));
itr->second.HandleData(sz, buf, m_Parent->Now());
}
if(itr->second.IsCompleted())
{
itr->second.SendACKS(util::memFn(&Session::EncryptAndSend, this),
m_Parent->Now());
if(itr->second.Verify())
{
auto msg = std::move(itr->second);
const llarp_buffer_t buf(msg.m_Data.data(), msg.m_Size);
const llarp_buffer_t buf(msg.m_Data);
m_Parent->HandleMessage(this, buf);
m_ReplayFilter.emplace(itr->first, m_Parent->Now());
m_SendMACKs.emplace(itr->first);
}
else
{
@ -584,23 +745,23 @@ namespace llarp
}
void
Session::HandleACKS(std::vector< byte_t > data)
Session::HandleACKS(Packet_t data)
{
if(data.size() < 11)
if(data.size() < 11 + PacketOverhead)
{
LogError("short ACKS from ", m_RemoteAddr, " ", data.size(), " < 11");
LogError("short ACKS from ", m_RemoteAddr);
return;
}
const auto now = m_Parent->Now();
m_LastRX = now;
uint64_t txid = bufbe64toh(data.data() + 2);
uint64_t txid = bufbe64toh(data.data() + 2 + PacketOverhead);
auto itr = m_TXMsgs.find(txid);
if(itr == m_TXMsgs.end())
{
LogDebug("no txid=", txid, " for ", m_RemoteAddr);
return;
}
itr->second.Ack(data[10]);
itr->second.Ack(data[10 + PacketOverhead]);
if(itr->second.IsTransmitted())
{
@ -615,13 +776,13 @@ namespace llarp
}
}
void Session::HandleCLOS(std::vector< byte_t >)
void Session::HandleCLOS(Packet_t)
{
LogInfo("remote closed by ", m_RemoteAddr);
Close();
}
void Session::HandlePING(std::vector< byte_t >)
void Session::HandlePING(Packet_t)
{
m_LastRX = m_Parent->Now();
}
@ -631,9 +792,7 @@ namespace llarp
{
if(m_State == State::Ready)
{
std::vector< byte_t > ping{LLARP_PROTO_VERSION, Command::ePING};
const llarp_buffer_t buf(ping);
EncryptAndSend(buf);
EncryptAndSend(CreatePacket(Command::ePING, 0));
return true;
}
return false;
@ -646,7 +805,7 @@ namespace llarp
}
void
Session::Recv_LL(const llarp_buffer_t& buf)
Session::Recv_LL(ILinkSession::Packet_t data)
{
switch(m_State)
{
@ -655,7 +814,10 @@ namespace llarp
{
// initial data
// enter introduction phase
HandleGotIntro(buf);
if(DecryptMessageInPlace(data))
HandleGotIntro(std::move(data));
else
LogError("bad intro from ", m_RemoteAddr);
}
else
{
@ -667,18 +829,18 @@ namespace llarp
if(m_Inbound)
{
// we are replying to an intro ack
HandleCreateSessionRequest(buf);
HandleCreateSessionRequest(std::move(data));
}
else
{
// we got an intro ack
// send a session request
HandleGotIntroAck(buf);
HandleGotIntroAck(std::move(data));
}
break;
case State::LinkIntro:
default:
HandleSessionData(buf);
HandleSessionData(std::move(data));
break;
}
}

@ -4,34 +4,39 @@
#include <link/session.hpp>
#include <iwp/linklayer.hpp>
#include <iwp/message_buffer.hpp>
#include <unordered_set>
#include <deque>
namespace llarp
{
namespace iwp
{
void
AddRandomPadding(std::vector< byte_t >& pkt, size_t min = 16,
size_t variance = 16);
/// packet crypto overhead size
static constexpr size_t PacketOverhead = HMACSIZE + TUNNONCESIZE;
/// creates a packet with plaintext size + wire overhead + random pad
ILinkSession::Packet_t
CreatePacket(Command cmd, size_t plainsize, size_t min_pad = 16,
size_t pad_variance = 16);
struct Session : public ILinkSession,
public std::enable_shared_from_this< Session >
{
/// Time how long we try delivery for
static constexpr llarp_time_t DeliveryTimeout = 5000;
/// How long to keep a replay window for
static constexpr llarp_time_t ReplayWindow = (DeliveryTimeout * 3) / 2;
static constexpr llarp_time_t DeliveryTimeout = 1000;
/// Time how long we wait to recieve a message
static constexpr llarp_time_t RecievalTimeout = (DeliveryTimeout * 8) / 5;
/// How long to keep a replay window for
static constexpr llarp_time_t ReplayWindow = (RecievalTimeout * 3) / 2;
/// How often to acks RX messages
static constexpr llarp_time_t ACKResendInterval = 500;
static constexpr llarp_time_t ACKResendInterval = DeliveryTimeout / 4;
/// How often to retransmit TX fragments
static constexpr llarp_time_t TXFlushInterval =
(ACKResendInterval * 3) / 2;
static constexpr llarp_time_t TXFlushInterval = (DeliveryTimeout / 5) * 2;
/// How often we send a keepalive
static constexpr llarp_time_t PingInterval = 2000;
static constexpr llarp_time_t PingInterval = 5000;
/// How long we wait for a session to die with no tx from them
static constexpr llarp_time_t SessionAliveTimeout =
(PingInterval * 13) / 3;
static constexpr llarp_time_t SessionAliveTimeout = PingInterval * 5;
/// maximum number of messages we can ack in a multiack
static constexpr std::size_t MaxACKSInMACK = 1024 / sizeof(uint64_t);
/// outbound session
Session(LinkLayer* parent, RouterContact rc, AddressInfo ai);
@ -53,14 +58,13 @@ namespace llarp
Tick(llarp_time_t now) override;
bool
SendMessageBuffer(const llarp_buffer_t& buf,
SendMessageBuffer(ILinkSession::Message_t msg,
CompletionHandler resultHandler) override;
void
Send_LL(const llarp_buffer_t& pkt);
void
EncryptAndSend(const llarp_buffer_t& data);
void EncryptAndSend(ILinkSession::Packet_t);
void
Start() override;
@ -68,8 +72,7 @@ namespace llarp
void
Close() override;
void
Recv_LL(const llarp_buffer_t& pkt) override;
void Recv_LL(ILinkSession::Packet_t) override;
bool
SendKeepAlive() override;
@ -149,6 +152,7 @@ namespace llarp
/// session token
AlignedBuffer< 24 > token;
PubKey m_ExpectedIdent;
PubKey m_RemoteOnionKey;
llarp_time_t m_LastTX = 0;
@ -161,24 +165,43 @@ namespace llarp
/// maps rxid to time recieved
std::unordered_map< uint64_t, llarp_time_t > m_ReplayFilter;
/// set of rx messages to send in next round of multiacks
std::unordered_set< uint64_t > m_SendMACKs;
using CryptoQueue_t = std::vector< Packet_t >;
using CryptoQueue_ptr = std::shared_ptr< CryptoQueue_t >;
CryptoQueue_ptr m_EncryptNext;
CryptoQueue_ptr m_DecryptNext;
void
EncryptWorker(CryptoQueue_ptr msgs);
void
HandleGotIntro(const llarp_buffer_t& buf);
DecryptWorker(CryptoQueue_ptr msgs);
void
HandleGotIntroAck(const llarp_buffer_t& buf);
HandlePlaintext(CryptoQueue_ptr msgs);
void
HandleCreateSessionRequest(const llarp_buffer_t& buf);
HandleGotIntro(Packet_t pkt);
void
HandleAckSession(const llarp_buffer_t& buf);
HandleGotIntroAck(Packet_t pkt);
void
HandleSessionData(const llarp_buffer_t& buf);
HandleCreateSessionRequest(Packet_t pkt);
void
HandleAckSession(Packet_t pkt);
void
HandleSessionData(Packet_t pkt);
bool
DecryptMessage(const llarp_buffer_t& buf, std::vector< byte_t >& result);
DecryptMessageInPlace(Packet_t& pkt);
void
SendMACK();
void
GenerateAndSendIntro();
@ -196,22 +219,25 @@ namespace llarp
SendOurLIM(ILinkSession::CompletionHandler h = nullptr);
void
HandleXMIT(std::vector< byte_t > msg);
HandleXMIT(Packet_t msg);
void
HandleDATA(Packet_t msg);
void
HandleDATA(std::vector< byte_t > msg);
HandleACKS(Packet_t msg);
void
HandleACKS(std::vector< byte_t > msg);
HandleNACK(Packet_t msg);
void
HandleNACK(std::vector< byte_t > msg);
HandlePING(Packet_t msg);
void
HandlePING(std::vector< byte_t > msg);
HandleCLOS(Packet_t msg);
void
HandleCLOS(std::vector< byte_t > msg);
HandleMACK(Packet_t msg);
};
} // namespace iwp
} // namespace llarp

@ -42,7 +42,8 @@ namespace llarp
AddLink(LinkLayer_ptr link, bool inbound = false) = 0;
virtual bool
StartLinks(Logic_ptr logic) = 0;
StartLinks(Logic_ptr logic,
std::shared_ptr< thread::ThreadPool > worker) = 0;
virtual void
Stop() = 0;

@ -89,12 +89,13 @@ namespace llarp
}
bool
LinkManager::StartLinks(Logic_ptr logic)
LinkManager::StartLinks(Logic_ptr logic,
std::shared_ptr< thread::ThreadPool > worker)
{
LogInfo("starting ", outboundLinks.size(), " outbound links");
for(const auto &link : outboundLinks)
{
if(!link->Start(logic))
if(!link->Start(logic, worker))
{
LogWarn("outbound link '", link->Name(), "' failed to start");
return false;
@ -107,7 +108,7 @@ namespace llarp
LogInfo("starting ", inboundLinks.size(), " inbound links");
for(const auto &link : inboundLinks)
{
if(!link->Start(logic))
if(!link->Start(logic, worker))
{
LogWarn("Link ", link->Name(), " failed to start");
return false;

@ -40,7 +40,8 @@ namespace llarp
AddLink(LinkLayer_ptr link, bool inbound = false) override;
bool
StartLinks(Logic_ptr logic) override;
StartLinks(Logic_ptr logic,
std::shared_ptr< thread::ThreadPool > worker) override;
void
Stop() override;

@ -1,5 +1,5 @@
#include <link/server.hpp>
#include <ev/ev.hpp>
#include <crypto/crypto.hpp>
#include <util/fs.hpp>
#include <utility>
@ -29,7 +29,7 @@ namespace llarp
bool
ILinkLayer::HasSessionTo(const RouterID& id)
{
Lock l(&m_AuthedLinksMutex);
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
return m_AuthedLinks.find(id) != m_AuthedLinks.end();
}
@ -39,7 +39,7 @@ namespace llarp
{
std::vector< std::shared_ptr< ILinkSession > > sessions;
{
Lock l(&m_AuthedLinksMutex);
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
if(m_AuthedLinks.size() == 0)
return;
const size_t sz = randint() % m_AuthedLinks.size();
@ -75,7 +75,7 @@ namespace llarp
{
std::shared_ptr< ILinkSession > session;
{
Lock l(&m_AuthedLinksMutex);
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
auto itr = m_AuthedLinks.find(pk);
if(itr == m_AuthedLinks.end())
return false;
@ -89,7 +89,7 @@ namespace llarp
{
std::vector< std::shared_ptr< ILinkSession > > sessions;
{
Lock l(&m_AuthedLinksMutex);
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
auto itr = m_AuthedLinks.begin();
while(itr != m_AuthedLinks.end())
{
@ -107,7 +107,7 @@ namespace llarp
{
m_Loop = loop;
m_udp.user = this;
m_udp.recvfrom = &ILinkLayer::udp_recv_from;
m_udp.recvfrom = nullptr;
m_udp.tick = &ILinkLayer::udp_tick;
if(ifname == "*")
{
@ -125,7 +125,7 @@ namespace llarp
{
auto _now = Now();
{
Lock lock(&m_AuthedLinksMutex);
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
auto itr = m_AuthedLinks.begin();
while(itr != m_AuthedLinks.end())
{
@ -144,7 +144,7 @@ namespace llarp
}
}
{
Lock lock(&m_PendingMutex);
ACQUIRE_LOCK(Lock_t l, m_PendingMutex);
auto itr = m_Pending.begin();
while(itr != m_Pending.end())
@ -172,8 +172,8 @@ namespace llarp
bool
ILinkLayer::MapAddr(const RouterID& pk, ILinkSession* s)
{
Lock l_authed(&m_AuthedLinksMutex);
Lock l_pending(&m_PendingMutex);
ACQUIRE_LOCK(Lock_t l_authed, m_AuthedLinksMutex);
ACQUIRE_LOCK(Lock_t l_pending, m_PendingMutex);
llarp::Addr addr = s->GetRemoteEndpoint();
auto itr = m_Pending.find(addr);
if(itr != m_Pending.end())
@ -213,7 +213,7 @@ namespace llarp
std::vector< util::StatusObject > pending, established;
{
Lock l(&m_PendingMutex);
ACQUIRE_LOCK(Lock_t l, m_PendingMutex);
std::transform(m_Pending.cbegin(), m_Pending.cend(),
std::back_inserter(pending),
[](const auto& item) -> util::StatusObject {
@ -221,7 +221,7 @@ namespace llarp
});
}
{
Lock l(&m_AuthedLinksMutex);
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
std::transform(m_AuthedLinks.cbegin(), m_AuthedLinks.cend(),
std::back_inserter(established),
[](const auto& item) -> util::StatusObject {
@ -241,7 +241,7 @@ namespace llarp
ILinkLayer::TryEstablishTo(RouterContact rc)
{
{
Lock l(&m_AuthedLinksMutex);
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
if(m_AuthedLinks.count(rc.pubkey) >= MaxSessionsPerKey)
return false;
}
@ -250,7 +250,7 @@ namespace llarp
return false;
const llarp::Addr addr(to);
{
Lock l(&m_PendingMutex);
ACQUIRE_LOCK(Lock_t l, m_PendingMutex);
if(m_Pending.count(addr) >= MaxSessionsPerKey)
return false;
}
@ -264,9 +264,12 @@ namespace llarp
}
bool
ILinkLayer::Start(std::shared_ptr< Logic > l)
ILinkLayer::Start(std::shared_ptr< Logic > l,
std::shared_ptr< thread::ThreadPool > worker)
{
m_Logic = l;
m_Recv = std::make_shared< TrafficQueue_t >();
m_Worker = worker;
m_Logic = l;
ScheduleTick(100);
return true;
}
@ -275,7 +278,7 @@ namespace llarp
ILinkLayer::Tick(llarp_time_t now)
{
{
Lock l(&m_AuthedLinksMutex);
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
auto itr = m_AuthedLinks.begin();
while(itr != m_AuthedLinks.end())
{
@ -285,7 +288,7 @@ namespace llarp
}
{
Lock l(&m_PendingMutex);
ACQUIRE_LOCK(Lock_t l, m_PendingMutex);
auto itr = m_Pending.begin();
while(itr != m_Pending.end())
{
@ -301,7 +304,7 @@ namespace llarp
if(m_Logic && tick_id)
m_Logic->remove_call(tick_id);
{
Lock l(&m_AuthedLinksMutex);
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
auto itr = m_AuthedLinks.begin();
while(itr != m_AuthedLinks.end())
{
@ -310,7 +313,7 @@ namespace llarp
}
}
{
Lock l(&m_PendingMutex);
ACQUIRE_LOCK(Lock_t l, m_PendingMutex);
auto itr = m_Pending.begin();
while(itr != m_Pending.end())
{
@ -318,12 +321,13 @@ namespace llarp
++itr;
}
}
m_Recv.reset();
}
void
ILinkLayer::CloseSessionTo(const RouterID& remote)
{
Lock l(&m_AuthedLinksMutex);
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
RouterID r = remote;
llarp::LogInfo("Closing all to ", r);
auto range = m_AuthedLinks.equal_range(r);
@ -338,7 +342,7 @@ namespace llarp
void
ILinkLayer::KeepAliveSessionTo(const RouterID& remote)
{
Lock l(&m_AuthedLinksMutex);
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
auto range = m_AuthedLinks.equal_range(remote);
auto itr = range.first;
while(itr != range.second)
@ -353,9 +357,9 @@ namespace llarp
ILinkLayer::SendTo(const RouterID& remote, const llarp_buffer_t& buf,
ILinkSession::CompletionHandler completed)
{
ILinkSession* s = nullptr;
std::shared_ptr< ILinkSession > s;
{
Lock l(&m_AuthedLinksMutex);
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
auto range = m_AuthedLinks.equal_range(remote);
auto itr = range.first;
// pick lowest backlog session
@ -363,16 +367,18 @@ namespace llarp
while(itr != range.second)
{
auto backlog = itr->second->SendQueueBacklog();
const auto backlog = itr->second->SendQueueBacklog();
if(backlog < min)
{
s = itr->second.get();
s = itr->second;
min = backlog;
}
++itr;
}
}
return s && s->SendMessageBuffer(buf, completed);
ILinkSession::Message_t pkt(buf.sz);
std::copy_n(buf.base, buf.sz, pkt.begin());
return s && s->SendMessageBuffer(std::move(pkt), completed);
}
bool
@ -431,7 +437,7 @@ namespace llarp
ILinkLayer::PutSession(const std::shared_ptr< ILinkSession >& s)
{
static constexpr size_t MaxSessionsPerEndpoint = 5;
Lock lock(&m_PendingMutex);
ACQUIRE_LOCK(Lock_t lock, m_PendingMutex);
llarp::Addr addr = s->GetRemoteEndpoint();
if(m_Pending.count(addr) >= MaxSessionsPerEndpoint)
return false;
@ -453,4 +459,22 @@ namespace llarp
tick_id = m_Logic->call_later({interval, this, &ILinkLayer::on_timer_tick});
}
void
ILinkLayer::udp_tick(llarp_udp_io* udp)
{
ILinkLayer* link = static_cast< ILinkLayer* >(udp->user);
auto pkts = std::make_shared< llarp_pkt_list >();
llarp_ev_udp_recvmany(&link->m_udp, pkts.get());
link->logic()->queue_func([pkts, link]() {
auto itr = pkts->begin();
while(itr != pkts->end())
{
link->RecvFrom(itr->remote, std::move(itr->pkt));
++itr;
}
link->Pump();
});
}
} // namespace llarp

@ -77,25 +77,7 @@ namespace llarp
LOCKS_EXCLUDED(m_AuthedLinksMutex);
static void
udp_tick(llarp_udp_io* udp)
{
static_cast< ILinkLayer* >(udp->user)->Pump();
}
static void
udp_recv_from(llarp_udp_io* udp, const sockaddr* from, ManagedBuffer buf)
{
if(!udp)
{
llarp::LogWarn("no udp set");
return;
}
const llarp::Addr srcaddr(*from);
// maybe check from too?
// no it's never null
static_cast< ILinkLayer* >(udp->user)->RecvFrom(
srcaddr, buf.underlying.base, buf.underlying.sz);
}
udp_tick(llarp_udp_io* udp);
void
SendTo_LL(const llarp::Addr& to, const llarp_buffer_t& pkt)
@ -114,7 +96,7 @@ namespace llarp
Pump();
virtual void
RecvFrom(const Addr& from, const void* buf, size_t sz) = 0;
RecvFrom(const Addr& from, ILinkSession::Packet_t pkt) = 0;
bool
PickAddress(const RouterContact& rc, AddressInfo& picked) const;
@ -122,8 +104,9 @@ namespace llarp
bool
TryEstablishTo(RouterContact rc);
virtual bool
Start(std::shared_ptr< llarp::Logic > l);
bool
Start(std::shared_ptr< llarp::Logic > l,
std::shared_ptr< thread::ThreadPool > worker);
virtual void
Stop();
@ -237,13 +220,18 @@ namespace llarp
const SecretKey& m_RouterEncSecret;
protected:
using Lock = util::NullLock;
using Mutex = util::NullMutex;
#ifdef TRACY_ENABLE
using Lock_t = std::lock_guard< LockableBase(std::mutex) >;
using Mutex_t = std::mutex;
#else
using Lock_t = util::NullLock;
using Mutex_t = util::NullMutex;
#endif
bool
PutSession(const std::shared_ptr< ILinkSession >& s);
std::shared_ptr< llarp::Logic > m_Logic = nullptr;
std::shared_ptr< llarp::Logic > m_Logic = nullptr;
std::shared_ptr< llarp::thread::ThreadPool > m_Worker = nullptr;
llarp_ev_loop_ptr m_Loop;
Addr m_ourAddr;
llarp_udp_io m_udp;
@ -255,11 +243,17 @@ namespace llarp
using Pending =
std::unordered_multimap< llarp::Addr, std::shared_ptr< ILinkSession >,
llarp::Addr::Hash >;
mutable Mutex m_AuthedLinksMutex ACQUIRED_BEFORE(m_PendingMutex);
mutable DECLARE_LOCK(Mutex_t, m_AuthedLinksMutex,
ACQUIRED_BEFORE(m_PendingMutex));
AuthedLinks m_AuthedLinks GUARDED_BY(m_AuthedLinksMutex);
mutable Mutex m_PendingMutex ACQUIRED_AFTER(m_AuthedLinksMutex);
mutable DECLARE_LOCK(Mutex_t, m_PendingMutex,
ACQUIRED_AFTER(m_AuthedLinksMutex));
Pending m_Pending GUARDED_BY(m_PendingMutex);
using TrafficEvent_t = std::pair< Addr, ILinkSession::Packet_t >;
using TrafficQueue_t = std::vector< TrafficEvent_t >;
std::shared_ptr< TrafficQueue_t > m_Recv;
};
using LinkLayer_ptr = std::shared_ptr< ILinkLayer >;

@ -3,6 +3,7 @@
#include <crypto/types.hpp>
#include <net/net.hpp>
#include <ev/ev.hpp>
#include <router_contact.hpp>
#include <util/types.hpp>
@ -44,9 +45,12 @@ namespace llarp
/// message delivery result hook function
using CompletionHandler = std::function< void(DeliveryStatus) >;
using Packet_t = PacketBuffer;
using Message_t = std::vector< byte_t >;
/// send a message buffer to the remote endpoint
virtual bool
SendMessageBuffer(const llarp_buffer_t &, CompletionHandler handler) = 0;
SendMessageBuffer(Message_t, CompletionHandler handler) = 0;
/// start the connection
virtual void
@ -57,8 +61,7 @@ namespace llarp
/// recv packet on low layer
/// not used by utp
virtual void
Recv_LL(const llarp_buffer_t &)
virtual void Recv_LL(Packet_t)
{
}

@ -238,6 +238,15 @@ namespace llarp
static void
SendLRCM(std::shared_ptr< LRCMFrameDecrypt > self)
{
if(self->context->HasTransitHop(self->hop->info))
{
llarp::LogError("duplicate transit hop", self->hop->info);
OnForwardLRCMResult(self->context->Router(), self->hop->info.rxID,
self->hop->info.downstream, self->hop->pathKey,
SendStatus::Congestion);
self->hop = nullptr;
return;
}
if(!self->context->Router()->ConnectionToRouterAllowed(
self->hop->info.upstream))
{
@ -287,15 +296,21 @@ namespace llarp
static void
SendPathConfirm(std::shared_ptr< LRCMFrameDecrypt > self)
{
// persist session to downstream until path expiration
self->context->Router()->PersistSessionUntil(
self->hop->info.downstream, self->hop->ExpireTime() + 10000);
// put hop
self->context->PutTransitHop(self->hop);
// send path confirmation
// TODO: other status flags?
uint64_t status = LR_StatusRecord::SUCCESS;
if(self->context->HasTransitHop(self->hop->info))
{
status = LR_StatusRecord::FAIL_DUPLICATE_HOP;
}
else
{
// persist session to downstream until path expiration
self->context->Router()->PersistSessionUntil(
self->hop->info.downstream, self->hop->ExpireTime() + 10000);
// put hop
self->context->PutTransitHop(self->hop);
}
if(!LR_StatusMessage::CreateAndSend(
self->context->Router(), self->hop->info.rxID,
@ -332,12 +347,7 @@ namespace llarp
info.txID = self->record.txid;
info.rxID = self->record.rxid;
info.upstream = self->record.nextHop;
if(self->context->HasTransitHop(info))
{
llarp::LogError("duplicate transit hop ", info);
self->decrypter = nullptr;
return;
}
// generate path key as we are in a worker thread
auto crypto = CryptoManager::instance();
if(!crypto->dh_server(self->hop->pathKey, self->record.commkey,
@ -387,7 +397,7 @@ namespace llarp
{
// we are the farthest hop
llarp::LogDebug("We are the farthest hop for ", info);
// send a LRAM down the path
// send a LRSM down the path
self->context->logic()->queue_func([=]() {
SendPathConfirm(self);
self->decrypter = nullptr;

@ -23,7 +23,7 @@ namespace llarp
struct LR_StatusRecord
{
static constexpr uint64_t SUCCESS = 1;
static constexpr uint64_t SUCCESS = 1 << 0;
static constexpr uint64_t FAIL_TIMEOUT = 1 << 1;
static constexpr uint64_t FAIL_CONGESTION = 1 << 2;
static constexpr uint64_t FAIL_DEST_UNKNOWN = 1 << 3;
@ -31,6 +31,7 @@ namespace llarp
static constexpr uint64_t FAIL_MALFORMED_RECORD = 1 << 5;
static constexpr uint64_t FAIL_DEST_INVALID = 1 << 6;
static constexpr uint64_t FAIL_CANNOT_CONNECT = 1 << 7;
static constexpr uint64_t FAIL_DUPLICATE_HOP = 1 << 8;
uint64_t status = 0;
uint64_t version = 0;

@ -0,0 +1,89 @@
#ifndef LLARP_NET_IP_RANGE_MAP_HPP
#define LLARP_NET_IP_RANGE_MAP_HPP
#include <net/ip.hpp>
namespace llarp
{
namespace net
{
/// a container that maps an ip range to a value that allows you to lookup
/// key by range hit
/// TODO: do some kind of magic shit to ensure near constant time for
/// lookups
template < typename Value_t >
struct IPRangeMap
{
using Range_t = IPRange;
using IP_t = Range_t::Addr_t;
using Entry_t = std::pair< Range_t, Value_t >;
using Container_t = std::forward_list< Entry_t >;
/// get a set of all values
std::set< Value_t >
Values() const
{
std::set< Value_t > all;
for(const auto &entry : m_Entries)
all.insert(entry.second);
return all;
}
void
ForEachValue(std::function< void(const Value_t &) > functor) const
{
for(const auto &entry : m_Entries)
functor(entry.second);
}
/// convert all values into type T using a transformer
template < typename T, typename Transformer >
std::set< T >
TransformValues(Transformer transform) const
{
std::set< T > transformed;
for(const auto &entry : m_Entries)
{
T val = transform(entry.second);
transformed.insert(std::move(val));
}
return transformed;
}
/// return a set of all values who's range contains this IP
std::set< Value_t >
FindAll(const IP_t &addr) const
{
std::set< Value_t > found;
for(const auto &entry : m_Entries)
{
if(entry.first.Contains(addr))
found.insert(entry.second);
}
return found;
}
struct CompareEntry
{
bool
operator()(const Entry_t &left, const Entry_t &right) const
{
return left.first < right.first;
}
};
void
Insert(const Range_t &addr, const Value_t &val)
{
m_Entries.emplace_front(addr, val);
m_Entries.sort(CompareEntry{});
}
private:
Container_t m_Entries;
};
} // namespace net
} // namespace llarp
#endif

@ -1026,6 +1026,52 @@ namespace llarp
return Contains(net::IPPacket::ExpandV4(ip));
}
bool
IPRange::FromString(std::string str)
{
const auto colinpos = str.find(":");
const auto slashpos = str.find("/");
std::string bitsstr;
if(slashpos != std::string::npos)
{
bitsstr = str.substr(slashpos + 1);
str = str.substr(0, slashpos);
}
if(colinpos == std::string::npos)
{
huint32_t ip;
if(!ip.FromString(str))
return false;
addr = net::IPPacket::ExpandV4(ip);
if(!bitsstr.empty())
{
auto bits = atoi(bitsstr.c_str());
if(bits < 0 || bits > 32)
return false;
netmask_bits = netmask_ipv6_bits(96 + bits);
}
else
netmask_bits = netmask_ipv6_bits(128);
}
else
{
if(!addr.FromString(str))
return false;
if(!bitsstr.empty())
{
auto bits = atoi(bitsstr.c_str());
if(bits < 0 || bits > 128)
return false;
netmask_bits = netmask_ipv6_bits(bits);
}
else
{
netmask_bits = netmask_ipv6_bits(128);
}
}
return true;
}
std::string
IPRange::ToString() const
{

@ -59,12 +59,13 @@ namespace llarp
{
struct IPRange
{
huint128_t addr;
huint128_t netmask_bits;
using Addr_t = huint128_t;
huint128_t addr = {0};
huint128_t netmask_bits = {0};
/// return true if ip is contained in this ip range
bool
Contains(const huint128_t& ip) const
Contains(const Addr_t& ip) const
{
return (addr & netmask_bits) == (ip & netmask_bits);
}
@ -78,8 +79,28 @@ namespace llarp
return out << a.ToString();
}
/// get the highest address on this range
huint128_t
HighestAddr() const
{
return (addr & netmask_bits)
+ (huint128_t{1} << (128 - bits::count_bits_128(netmask_bits.h)))
- huint128_t{1};
}
bool
operator<(const IPRange& other) const
{
return (this->addr & this->netmask_bits)
< (other.addr & other.netmask_bits)
|| this->netmask_bits < other.netmask_bits;
}
std::string
ToString() const;
bool
FromString(std::string str);
};
huint128_t

@ -40,6 +40,18 @@ namespace llarp
return huint_t{UInt_t{h | x.h}};
}
constexpr huint_t
operator-(huint_t x) const
{
return huint_t{UInt_t{h - x.h}};
}
constexpr huint_t
operator+(huint_t x) const
{
return huint_t{UInt_t{h + x.h}};
}
constexpr huint_t
operator^(huint_t x) const
{

@ -113,31 +113,18 @@ llarp_nodedb::getRCFilePath(const llarp::RouterID &pubkey) const
return filepath.string();
}
static void
handle_async_insert_rc(llarp_nodedb *nodedb, const llarp::RouterContact &rc,
std::shared_ptr< llarp::Logic > logic,
const std::function< void(void) > &completedHook)
{
nodedb->Insert(rc);
if(logic && completedHook)
{
logic->queue_func(completedHook);
}
}
void
llarp_nodedb::InsertAsync(llarp::RouterContact rc,
std::shared_ptr< llarp::Logic > logic,
std::function< void(void) > completionHandler)
{
const auto job =
std::bind(&handle_async_insert_rc, this, rc, logic, completionHandler);
size_t tries = 10;
while((!disk->addJob(job)) && tries-- > 0)
std::this_thread::sleep_for(std::chrono::milliseconds(1));
// on fail do synchronous write
if(tries == 0)
Insert(rc);
disk->addJob([this, rc, logic, completionHandler]() {
this->Insert(rc);
if(logic && completionHandler)
{
logic->queue_func([completionHandler] { completionHandler(); });
}
});
}
bool
@ -197,8 +184,8 @@ llarp_nodedb::Insert(const llarp::RouterContact &rc)
if(itr != entries.end())
entries.erase(itr);
entries.emplace(rc.pubkey.as_array(), rc);
LogInfo("Added or updated RC for ", llarp::RouterID(rc.pubkey),
" to nodedb. Current nodedb count is: ", entries.size());
LogDebug("Added or updated RC for ", llarp::RouterID(rc.pubkey),
" to nodedb. Current nodedb count is: ", entries.size());
}
return true;
}

@ -1 +1,37 @@
#include <path/ihophandler.hpp>
namespace llarp
{
namespace path
{
// handle data in upstream direction
bool
IHopHandler::HandleUpstream(const llarp_buffer_t& X, const TunnelNonce& Y,
AbstractRouter*)
{
if(m_UpstreamQueue == nullptr)
m_UpstreamQueue = std::make_shared< TrafficQueue_t >();
m_UpstreamQueue->emplace_back();
auto& pkt = m_UpstreamQueue->back();
pkt.first.resize(X.sz);
std::copy_n(X.base, X.sz, pkt.first.begin());
pkt.second = Y;
return true;
}
// handle data in downstream direction
bool
IHopHandler::HandleDownstream(const llarp_buffer_t& X, const TunnelNonce& Y,
AbstractRouter*)
{
if(m_DownstreamQueue == nullptr)
m_DownstreamQueue = std::make_shared< TrafficQueue_t >();
m_DownstreamQueue->emplace_back();
auto& pkt = m_DownstreamQueue->back();
pkt.first.resize(X.sz);
std::copy_n(X.base, X.sz, pkt.first.begin());
pkt.second = Y;
return true;
}
} // namespace path
} // namespace llarp

@ -4,6 +4,8 @@
#include <crypto/types.hpp>
#include <util/types.hpp>
#include <crypto/encrypted_frame.hpp>
#include <messages/relay.hpp>
#include <vector>
#include <memory>
@ -22,6 +24,10 @@ namespace llarp
{
struct IHopHandler
{
using TrafficEvent_t = std::pair< std::vector< byte_t >, TunnelNonce >;
using TrafficQueue_t = std::vector< TrafficEvent_t >;
using TrafficQueue_ptr = std::shared_ptr< TrafficQueue_t >;
virtual ~IHopHandler() = default;
virtual bool
@ -35,14 +41,13 @@ namespace llarp
SendRoutingMessage(const routing::IMessage& msg, AbstractRouter* r) = 0;
// handle data in upstream direction
virtual bool
bool
HandleUpstream(const llarp_buffer_t& X, const TunnelNonce& Y,
AbstractRouter* r) = 0;
AbstractRouter*);
// handle data in downstream direction
virtual bool
bool
HandleDownstream(const llarp_buffer_t& X, const TunnelNonce& Y,
AbstractRouter* r) = 0;
AbstractRouter*);
/// return timestamp last remote activity happened at
virtual llarp_time_t
@ -57,9 +62,29 @@ namespace llarp
{
return m_SequenceNum++;
}
virtual void
FlushUpstream(AbstractRouter* r) = 0;
virtual void
FlushDownstream(AbstractRouter* r) = 0;
protected:
uint64_t m_SequenceNum = 0;
TrafficQueue_ptr m_UpstreamQueue;
TrafficQueue_ptr m_DownstreamQueue;
virtual void
UpstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) = 0;
virtual void
DownstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) = 0;
virtual void
HandleAllUpstream(std::vector< RelayUpstreamMessage > msgs,
AbstractRouter* r) = 0;
virtual void
HandleAllDownstream(std::vector< RelayDownstreamMessage > msgs,
AbstractRouter* r) = 0;
};
using HopHandler_ptr = std::shared_ptr< IHopHandler >;

@ -1,6 +1,7 @@
#include <path/path.hpp>
#include <exit/exit_messages.hpp>
#include <link/i_link_manager.hpp>
#include <messages/discard.hpp>
#include <messages/relay_commit.hpp>
#include <messages/relay_status.hpp>
@ -341,6 +342,7 @@ namespace llarp
m_LastLatencyTestID = latency.T;
m_LastLatencyTestTime = now;
SendRoutingMessage(latency, r);
FlushUpstream(r);
return;
}
if(m_LastRecvMessage && now > m_LastRecvMessage)
@ -363,24 +365,67 @@ namespace llarp
}
}
bool
Path::HandleUpstream(const llarp_buffer_t& buf, const TunnelNonce& Y,
AbstractRouter* r)
void
Path::HandleAllUpstream(std::vector< RelayUpstreamMessage > msgs,
AbstractRouter* r)
{
TunnelNonce n = Y;
for(const auto& hop : hops)
for(const auto& msg : msgs)
{
CryptoManager::instance()->xchacha20(buf, hop.shared, n);
n ^= hop.nonceXOR;
if(!r->SendToOrQueue(Upstream(), &msg))
{
LogDebug("failed to send upstream to ", Upstream());
}
}
RelayUpstreamMessage msg;
msg.X = buf;
msg.Y = Y;
msg.pathid = TXID();
if(r->SendToOrQueue(Upstream(), &msg))
return true;
LogError("send to ", Upstream(), " failed");
return false;
r->linkManager().PumpLinks();
}
void
Path::UpstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
{
std::vector< RelayUpstreamMessage > sendmsgs(msgs->size());
size_t idx = 0;
for(auto& ev : *msgs)
{
const llarp_buffer_t buf(ev.first);
TunnelNonce n = ev.second;
for(const auto& hop : hops)
{
CryptoManager::instance()->xchacha20(buf, hop.shared, n);
n ^= hop.nonceXOR;
}
auto& msg = sendmsgs[idx];
msg.X = buf;
msg.Y = ev.second;
msg.pathid = TXID();
++idx;
}
r->logic()->queue_func(std::bind(&Path::HandleAllUpstream,
shared_from_this(), std::move(sendmsgs),
r));
}
void
Path::FlushUpstream(AbstractRouter* r)
{
if(m_UpstreamQueue && !m_UpstreamQueue->empty())
{
r->threadpool()->addJob(std::bind(&Path::UpstreamWork,
shared_from_this(),
std::move(m_UpstreamQueue), r));
}
m_UpstreamQueue = nullptr;
}
void
Path::FlushDownstream(AbstractRouter* r)
{
if(m_DownstreamQueue && !m_DownstreamQueue->empty())
{
r->threadpool()->addJob(std::bind(&Path::DownstreamWork,
shared_from_this(),
std::move(m_DownstreamQueue), r));
}
m_DownstreamQueue = nullptr;
}
bool
@ -406,20 +451,44 @@ namespace llarp
return ss.str();
}
bool
Path::HandleDownstream(const llarp_buffer_t& buf, const TunnelNonce& Y,
AbstractRouter* r)
void
Path::DownstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
{
TunnelNonce n = Y;
for(const auto& hop : hops)
std::vector< RelayDownstreamMessage > sendMsgs(msgs->size());
size_t idx = 0;
for(auto& ev : *msgs)
{
n ^= hop.nonceXOR;
CryptoManager::instance()->xchacha20(buf, hop.shared, n);
const llarp_buffer_t buf(ev.first);
sendMsgs[idx].Y = ev.second;
for(const auto& hop : hops)
{
sendMsgs[idx].Y ^= hop.nonceXOR;
CryptoManager::instance()->xchacha20(buf, hop.shared,
sendMsgs[idx].Y);
}
sendMsgs[idx].X = buf;
++idx;
}
if(!HandleRoutingMessage(buf, r))
return false;
m_LastRecvMessage = r->Now();
return true;
r->logic()->queue_func(std::bind(&Path::HandleAllDownstream,
shared_from_this(), std::move(sendMsgs),
r));
}
void
Path::HandleAllDownstream(std::vector< RelayDownstreamMessage > msgs,
AbstractRouter* r)
{
for(const auto& msg : msgs)
{
const llarp_buffer_t buf(msg.X);
if(!HandleRoutingMessage(buf, r))
{
LogWarn("failed to handle downstream message");
continue;
}
m_LastRecvMessage = r->Now();
}
FlushUpstream(r);
}
bool
@ -521,7 +590,10 @@ namespace llarp
latency.T = randint();
m_LastLatencyTestID = latency.T;
m_LastLatencyTestTime = now;
return SendRoutingMessage(latency, r);
if(!SendRoutingMessage(latency, r))
return false;
FlushUpstream(r);
return true;
}
LogWarn("got unwarranted path confirm message on tx=", RXID(),
" rx=", RXID());

@ -108,6 +108,34 @@ namespace llarp
return _role;
}
struct Hash
{
size_t
operator()(const Path& p) const
{
const auto& tx = p.hops[0].txID;
const auto& rx = p.hops[0].rxID;
const auto& r = p.hops[0].upstream;
const size_t rhash =
std::accumulate(r.begin(), r.end(), 0, std::bit_xor< size_t >());
return std::accumulate(rx.begin(), rx.begin(),
std::accumulate(tx.begin(), tx.end(), rhash,
std::bit_xor< size_t >()),
std::bit_xor< size_t >());
}
};
struct Ptr_Hash
{
size_t
operator()(const std::shared_ptr< Path >& p) const
{
if(p == nullptr)
return 0;
return Hash{}(*p);
}
};
bool
operator<(const Path& other) const
{
@ -286,16 +314,6 @@ namespace llarp
bool
HandleRoutingMessage(const llarp_buffer_t& buf, AbstractRouter* r);
// handle data in upstream direction
bool
HandleUpstream(const llarp_buffer_t& X, const TunnelNonce& Y,
AbstractRouter* r) override;
// handle data in downstream direction
bool
HandleDownstream(const llarp_buffer_t& X, const TunnelNonce& Y,
AbstractRouter* r) override;
bool
IsReady() const;
@ -334,6 +352,27 @@ namespace llarp
bool
SendExitClose(const routing::CloseExitMessage& msg, AbstractRouter* r);
void
FlushUpstream(AbstractRouter* r) override;
void
FlushDownstream(AbstractRouter* r) override;
protected:
void
UpstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override;
void
DownstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override;
void
HandleAllUpstream(std::vector< RelayUpstreamMessage > msgs,
AbstractRouter* r) override;
void
HandleAllDownstream(std::vector< RelayDownstreamMessage > msgs,
AbstractRouter* r) override;
private:
/// call obtained exit hooks
bool

@ -83,12 +83,12 @@ namespace llarp
return true;
}
template < typename Map_t, typename Key_t, typename CheckValue_t,
typename GetFunc_t >
template < typename Lock_t, typename Map_t, typename Key_t,
typename CheckValue_t, typename GetFunc_t >
HopHandler_ptr
MapGet(Map_t& map, const Key_t& k, CheckValue_t check, GetFunc_t get)
{
util::Lock lock(&map.first);
Lock_t lock(&map.first);
auto range = map.second.equal_range(k);
for(auto i = range.first; i != range.second; ++i)
{
@ -98,11 +98,12 @@ namespace llarp
return nullptr;
}
template < typename Map_t, typename Key_t, typename CheckValue_t >
template < typename Lock_t, typename Map_t, typename Key_t,
typename CheckValue_t >
bool
MapHas(Map_t& map, const Key_t& k, CheckValue_t check)
{
util::Lock lock(&map.first);
Lock_t lock(&map.first);
auto range = map.second.equal_range(k);
for(auto i = range.first; i != range.second; ++i)
{
@ -112,28 +113,30 @@ namespace llarp
return false;
}
template < typename Map_t, typename Key_t, typename Value_t >
template < typename Lock_t, typename Map_t, typename Key_t,
typename Value_t >
void
MapPut(Map_t& map, const Key_t& k, const Value_t& v)
{
util::Lock lock(&map.first);
Lock_t lock(&map.first);
map.second.emplace(k, v);
}
template < typename Map_t, typename Visit_t >
template < typename Lock_t, typename Map_t, typename Visit_t >
void
MapIter(Map_t& map, Visit_t v)
{
util::Lock lock(map.first);
Lock_t lock(map.first);
for(const auto& item : map.second)
v(item);
}
template < typename Map_t, typename Key_t, typename Check_t >
template < typename Lock_t, typename Map_t, typename Key_t,
typename Check_t >
void
MapDel(Map_t& map, const Key_t& k, Check_t check)
{
util::Lock lock(map.first);
Lock_t lock(map.first);
auto range = map.second.equal_range(k);
for(auto i = range.first; i != range.second;)
{
@ -148,23 +151,24 @@ namespace llarp
PathContext::AddOwnPath(PathSet_ptr set, Path_ptr path)
{
set->AddPath(path);
MapPut(m_OurPaths, path->TXID(), set);
MapPut(m_OurPaths, path->RXID(), set);
MapPut< SyncOwnedPathsMap_t::Lock_t >(m_OurPaths, path->TXID(), set);
MapPut< SyncOwnedPathsMap_t::Lock_t >(m_OurPaths, path->RXID(), set);
}
bool
PathContext::HasTransitHop(const TransitHopInfo& info)
{
return MapHas(m_TransitPaths, info.txID,
[info](const std::shared_ptr< TransitHop >& hop) -> bool {
return info == hop->info;
});
return MapHas< SyncTransitMap_t::Lock_t >(
m_TransitPaths, info.txID,
[info](const std::shared_ptr< TransitHop >& hop) -> bool {
return info == hop->info;
});
}
HopHandler_ptr
PathContext::GetByUpstream(const RouterID& remote, const PathID_t& id)
{
auto own = MapGet(
auto own = MapGet< SyncOwnedPathsMap_t::Lock_t >(
m_OurPaths, id,
[](const PathSet_ptr) -> bool {
// TODO: is this right?
@ -176,7 +180,7 @@ namespace llarp
if(own)
return own;
return MapGet(
return MapGet< SyncTransitMap_t::Lock_t >(
m_TransitPaths, id,
[remote](const std::shared_ptr< TransitHop >& hop) -> bool {
return hop->info.upstream == remote;
@ -190,7 +194,7 @@ namespace llarp
PathContext::TransitHopPreviousIsRouter(const PathID_t& path,
const RouterID& otherRouter)
{
util::Lock lock(&m_TransitPaths.first);
SyncTransitMap_t::Lock_t lock(&m_TransitPaths.first);
auto itr = m_TransitPaths.second.find(path);
if(itr == m_TransitPaths.second.end())
return false;
@ -200,7 +204,7 @@ namespace llarp
HopHandler_ptr
PathContext::GetByDownstream(const RouterID& remote, const PathID_t& id)
{
return MapGet(
return MapGet< SyncTransitMap_t::Lock_t >(
m_TransitPaths, id,
[remote](const std::shared_ptr< TransitHop >& hop) -> bool {
return hop->info.downstream == remote;
@ -214,7 +218,7 @@ namespace llarp
PathContext::GetLocalPathSet(const PathID_t& id)
{
auto& map = m_OurPaths;
util::Lock lock(&map.first);
SyncOwnedPathsMap_t::Lock_t lock(&map.first);
auto itr = map.second.find(id);
if(itr != map.second.end())
{
@ -241,7 +245,7 @@ namespace llarp
RouterID us(OurRouterID());
auto& map = m_TransitPaths;
{
util::Lock lock(&map.first);
SyncTransitMap_t::Lock_t lock(&map.first);
auto range = map.second.equal_range(id);
for(auto i = range.first; i != range.second; ++i)
{
@ -252,18 +256,33 @@ namespace llarp
return nullptr;
}
void
PathContext::PumpUpstream()
{
m_TransitPaths.ForEach([&](auto& ptr) { ptr->FlushUpstream(m_Router); });
m_OurPaths.ForEach([&](auto& ptr) { ptr->UpstreamFlush(m_Router); });
}
void
PathContext::PumpDownstream()
{
m_TransitPaths.ForEach(
[&](auto& ptr) { ptr->FlushDownstream(m_Router); });
m_OurPaths.ForEach([&](auto& ptr) { ptr->DownstreamFlush(m_Router); });
}
void
PathContext::PutTransitHop(std::shared_ptr< TransitHop > hop)
{
MapPut(m_TransitPaths, hop->info.txID, hop);
MapPut(m_TransitPaths, hop->info.rxID, hop);
MapPut< SyncTransitMap_t::Lock_t >(m_TransitPaths, hop->info.txID, hop);
MapPut< SyncTransitMap_t::Lock_t >(m_TransitPaths, hop->info.rxID, hop);
}
void
PathContext::ExpirePaths(llarp_time_t now)
{
{
util::Lock lock(&m_TransitPaths.first);
SyncTransitMap_t::Lock_t lock(&m_TransitPaths.first);
auto& map = m_TransitPaths.second;
auto itr = map.begin();
while(itr != map.end())
@ -277,7 +296,7 @@ namespace llarp
}
}
{
util::Lock lock(&m_OurPaths.first);
SyncOwnedPathsMap_t::Lock_t lock(&m_OurPaths.first);
auto& map = m_OurPaths.second;
for(auto& item : map)
{
@ -299,7 +318,7 @@ namespace llarp
const RouterID us(OurRouterID());
auto& map = m_TransitPaths;
{
util::Lock lock(&map.first);
SyncTransitMap_t::Lock_t lock(&map.first);
auto range = map.second.equal_range(id);
for(auto i = range.first; i != range.second; ++i)
{
@ -313,7 +332,7 @@ namespace llarp
void
PathContext::RemovePathSet(PathSet_ptr set)
{
util::Lock lock(&m_OurPaths.first);
SyncOwnedPathsMap_t::Lock_t lock(&m_OurPaths.first);
auto& map = m_OurPaths.second;
auto itr = map.begin();
while(itr != map.end())

@ -37,6 +37,12 @@ namespace llarp
void
ExpirePaths(llarp_time_t now);
void
PumpUpstream();
void
PumpDownstream();
void
AllowTransit();
@ -102,14 +108,17 @@ namespace llarp
struct SyncTransitMap_t
{
util::Mutex first; // protects second
using Mutex_t = util::NullMutex;
using Lock_t = util::NullLock;
Mutex_t first; // protects second
TransitHopsMap_t second GUARDED_BY(first);
void
ForEach(std::function< void(const TransitHop_ptr&) > visit)
LOCKS_EXCLUDED(first)
{
util::Lock lock(&first);
Lock_t lock(&first);
for(const auto& item : second)
visit(item.second);
}
@ -120,13 +129,15 @@ namespace llarp
struct SyncOwnedPathsMap_t
{
util::Mutex first; // protects second
using Mutex_t = util::Mutex;
using Lock_t = util::Lock;
Mutex_t first; // protects second
OwnedPathsMap_t second GUARDED_BY(first);
void
ForEach(std::function< void(const PathSet_ptr&) > visit)
{
util::Lock lock(&first);
Lock_t lock(&first);
for(const auto& item : second)
visit(item.second);
}

@ -214,7 +214,12 @@ namespace llarp
if(s && s->IsEstablished() && isOutbound && !got)
{
const RouterContact rc = s->GetRemoteRC();
#ifdef TESTNET
if(got || exclude.count(rc.pubkey))
#else
if(got || exclude.count(rc.pubkey)
|| m_router->IsBootstrapNode(rc.pubkey))
#endif
return;
cur = rc;
got = true;
@ -391,7 +396,7 @@ namespace llarp
for(size_t idx = 0; idx < hops.size(); ++idx)
{
hops[idx].Clear();
size_t tries = 4;
size_t tries = 32;
while(tries > 0 && !SelectHop(nodedb, exclude, hops[idx], idx, roles))
{
--tries;
@ -401,7 +406,7 @@ namespace llarp
LogWarn(Name(), " failed to select hop ", idx);
return false;
}
exclude.insert(hops[idx].pubkey);
exclude.emplace(hops[idx].pubkey);
}
return true;
}

@ -375,5 +375,17 @@ namespace llarp
return nullptr;
}
void
PathSet::UpstreamFlush(AbstractRouter* r)
{
ForEachPath([r](const Path_ptr& p) { p->FlushUpstream(r); });
}
void
PathSet::DownstreamFlush(AbstractRouter* r)
{
ForEachPath([r](const Path_ptr& p) { p->FlushDownstream(r); });
}
} // namespace path
} // namespace llarp

@ -275,6 +275,12 @@ namespace llarp
}
}
void
UpstreamFlush(AbstractRouter* r);
void
DownstreamFlush(AbstractRouter* r);
size_t numPaths;
protected:

@ -3,6 +3,7 @@
#include <dht/context.hpp>
#include <exit/context.hpp>
#include <exit/exit_messages.hpp>
#include <link/i_link_manager.hpp>
#include <messages/discard.hpp>
#include <messages/relay_commit.hpp>
#include <messages/relay_status.hpp>
@ -114,39 +115,109 @@ namespace llarp
return HandleDownstream(buf, N, r);
}
bool
TransitHop::HandleDownstream(const llarp_buffer_t& buf,
const TunnelNonce& Y, AbstractRouter* r)
void
TransitHop::DownstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
{
RelayDownstreamMessage msg;
msg.pathid = info.rxID;
msg.Y = Y ^ nonceXOR;
CryptoManager::instance()->xchacha20(buf, pathKey, Y);
msg.X = buf;
llarp::LogDebug("relay ", msg.X.size(), " bytes downstream from ",
info.upstream, " to ", info.downstream);
return r->SendToOrQueue(info.downstream, &msg);
std::vector< RelayDownstreamMessage > sendmsgs(msgs->size());
size_t idx = 0;
for(auto& ev : *msgs)
{
const llarp_buffer_t buf(ev.first);
auto& msg = sendmsgs[idx];
msg.pathid = info.rxID;
msg.Y = ev.second ^ nonceXOR;
CryptoManager::instance()->xchacha20(buf, pathKey, ev.second);
msg.X = buf;
llarp::LogDebug("relay ", msg.X.size(), " bytes downstream from ",
info.upstream, " to ", info.downstream);
++idx;
}
r->logic()->queue_func(std::bind(&TransitHop::HandleAllDownstream,
shared_from_this(), std::move(sendmsgs),
r));
}
bool
TransitHop::HandleUpstream(const llarp_buffer_t& buf, const TunnelNonce& Y,
AbstractRouter* r)
void
TransitHop::UpstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
{
std::vector< RelayUpstreamMessage > sendmsgs(msgs->size());
size_t idx = 0;
for(auto& ev : *msgs)
{
const llarp_buffer_t buf(ev.first);
auto& msg = sendmsgs[idx];
CryptoManager::instance()->xchacha20(buf, pathKey, ev.second);
msg.pathid = info.txID;
msg.Y = ev.second ^ nonceXOR;
msg.X = buf;
++idx;
}
r->logic()->queue_func(std::bind(&TransitHop::HandleAllUpstream,
shared_from_this(), std::move(sendmsgs),
r));
}
void
TransitHop::HandleAllUpstream(std::vector< RelayUpstreamMessage > msgs,
AbstractRouter* r)
{
CryptoManager::instance()->xchacha20(buf, pathKey, Y);
if(IsEndpoint(r->pubkey()))
{
m_LastActivity = r->Now();
return r->ParseRoutingMessageBuffer(buf, this, info.rxID);
for(const auto& msg : msgs)
{
const llarp_buffer_t buf(msg.X);
if(!r->ParseRoutingMessageBuffer(buf, this, info.rxID))
{
LogWarn("invalid upstream data on endpoint ", info);
}
m_LastActivity = r->Now();
}
FlushDownstream(r);
}
else
{
for(const auto& msg : msgs)
{
llarp::LogDebug("relay ", msg.X.size(), " bytes upstream from ",
info.downstream, " to ", info.upstream);
r->SendToOrQueue(info.upstream, &msg);
}
}
r->linkManager().PumpLinks();
}
void
TransitHop::HandleAllDownstream(std::vector< RelayDownstreamMessage > msgs,
AbstractRouter* r)
{
for(const auto& msg : msgs)
{
llarp::LogDebug("relay ", msg.X.size(), " bytes downstream from ",
info.upstream, " to ", info.downstream);
r->SendToOrQueue(info.downstream, &msg);
}
r->linkManager().PumpLinks();
}
void
TransitHop::FlushUpstream(AbstractRouter* r)
{
if(m_UpstreamQueue && !m_UpstreamQueue->empty())
r->threadpool()->addJob(std::bind(&TransitHop::UpstreamWork,
shared_from_this(),
std::move(m_UpstreamQueue), r));
RelayUpstreamMessage msg;
msg.pathid = info.txID;
msg.Y = Y ^ nonceXOR;
m_UpstreamQueue = nullptr;
}
msg.X = buf;
llarp::LogDebug("relay ", msg.X.size(), " bytes upstream from ",
info.downstream, " to ", info.upstream);
return r->SendToOrQueue(info.upstream, &msg);
void
TransitHop::FlushDownstream(AbstractRouter* r)
{
if(m_DownstreamQueue && !m_DownstreamQueue->empty())
r->threadpool()->addJob(std::bind(&TransitHop::DownstreamWork,
shared_from_this(),
std::move(m_DownstreamQueue), r));
m_DownstreamQueue = nullptr;
}
bool
@ -228,7 +299,6 @@ namespace llarp
{
if(SendRoutingMessage(reply, r))
{
r->PumpLL();
ep->Close();
return true;
}
@ -364,7 +434,7 @@ namespace llarp
void
TransitHop::QueueDestroySelf(AbstractRouter* r)
{
auto func = std::bind(&TransitHop::SetSelfDestruct, this);
auto func = std::bind(&TransitHop::SetSelfDestruct, shared_from_this());
r->logic()->queue_func(func);
}
} // namespace path

@ -79,7 +79,9 @@ namespace llarp
return info.print(out, -1, -1);
}
struct TransitHop : public IHopHandler, public routing::IMessageHandler
struct TransitHop : public IHopHandler,
public routing::IMessageHandler,
std::enable_shared_from_this< TransitHop >
{
TransitHop();
@ -193,15 +195,26 @@ namespace llarp
bool
HandleDHTMessage(const dht::IMessage& msg, AbstractRouter* r) override;
// handle data in upstream direction
bool
HandleUpstream(const llarp_buffer_t& X, const TunnelNonce& Y,
AbstractRouter* r) override;
void
FlushUpstream(AbstractRouter* r) override;
// handle data in downstream direction
bool
HandleDownstream(const llarp_buffer_t& X, const TunnelNonce& Y,
AbstractRouter* r) override;
void
FlushDownstream(AbstractRouter* r) override;
protected:
void
UpstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override;
void
DownstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override;
void
HandleAllUpstream(std::vector< RelayUpstreamMessage > msgs,
AbstractRouter* r) override;
void
HandleAllDownstream(std::vector< RelayDownstreamMessage > msgs,
AbstractRouter* r) override;
private:
void

@ -56,6 +56,9 @@ namespace llarp
virtual void
ExploreNetwork() = 0;
virtual size_t
NumberOfStrictConnectRouters() const = 0;
};
} // namespace llarp

@ -149,6 +149,12 @@ namespace llarp
return true;
}
size_t
RCLookupHandler::NumberOfStrictConnectRouters() const
{
return _strictConnectPubkeys.size();
}
bool
RCLookupHandler::GetRandomWhitelistRouter(RouterID &router) const
{

@ -64,6 +64,9 @@ namespace llarp
void
ExploreNetwork() override;
size_t
NumberOfStrictConnectRouters() const override;
void
Init(llarp_dht_context *dht, llarp_nodedb *nodedb,
std::shared_ptr< llarp::thread::ThreadPool > threadpool,

@ -172,6 +172,10 @@ namespace llarp
void
Router::PumpLL()
{
if(_stopping.load())
return;
paths.PumpUpstream();
paths.PumpDownstream();
_linkManager.PumpLinks();
}
@ -298,7 +302,7 @@ namespace llarp
return;
auto *self = static_cast< Router * >(user);
self->ticker_job_id = 0;
self->Tick();
self->logic()->queue_func(std::bind(&Router::Tick, self));
self->ScheduleTicker(orig);
}
@ -706,9 +710,16 @@ namespace llarp
_rcLookupHandler.ExploreNetwork();
}
if(connected < _outboundSessionMaker.minConnectedRouters)
size_t connectToNum = _outboundSessionMaker.minConnectedRouters;
const auto strictConnect = _rcLookupHandler.NumberOfStrictConnectRouters();
if(strictConnect > 0 && connectToNum > strictConnect)
{
size_t dlt = _outboundSessionMaker.minConnectedRouters - connected;
connectToNum = strictConnect;
}
if(connected < connectToNum)
{
size_t dlt = connectToNum - connected;
LogInfo("connecting to ", dlt, " random routers to keep alive");
_outboundSessionMaker.ConnectToRandomRouters(dlt, now);
}
@ -945,7 +956,7 @@ namespace llarp
return false;
}
_outboundSessionMaker.SetOurRouter(pubkey());
if(!_linkManager.StartLinks(_logic))
if(!_linkManager.StartLinks(_logic, cryptoworker))
{
LogWarn("One or more links failed to start.");
return false;
@ -1020,6 +1031,8 @@ namespace llarp
LogInfo("have ", _nodedb->num_loaded(), " routers");
_netloop->add_ticker(std::bind(&Router::PumpLL, this));
ScheduleTicker(1000);
_running.store(true);
_startedAt = Now();
@ -1071,6 +1084,7 @@ namespace llarp
_exitContext.Stop();
if(rpcServer)
rpcServer->Stop();
paths.PumpUpstream();
_linkManager.PumpLinks();
_logic->call_later({200, this, &RouterAfterStopIssued});
}

@ -17,6 +17,7 @@ namespace llarp
: logic(std::move(l))
, m_remote(std::move(r))
, m_LocalIdentity(localident)
, frame(std::make_shared< ProtocolFrame >())
, introPubKey(introsetPubKey)
, remoteIntro(remote)
, handler(h)
@ -26,34 +27,31 @@ namespace llarp
}
void
AsyncKeyExchange::Result(void* user)
AsyncKeyExchange::Result(std::shared_ptr< AsyncKeyExchange > self)
{
auto* self = static_cast< AsyncKeyExchange* >(user);
// put values
self->handler->PutSenderFor(self->msg.tag, self->m_remote, false);
self->handler->PutCachedSessionKeyFor(self->msg.tag, self->sharedKey);
self->handler->PutIntroFor(self->msg.tag, self->remoteIntro);
self->handler->PutReplyIntroFor(self->msg.tag, self->msg.introReply);
self->hook(self->frame);
delete self;
}
void
AsyncKeyExchange::Encrypt(void* user)
AsyncKeyExchange::Encrypt(std::shared_ptr< AsyncKeyExchange > self)
{
auto* self = static_cast< AsyncKeyExchange* >(user);
// derive ntru session key component
SharedSecret K;
auto crypto = CryptoManager::instance();
crypto->pqe_encrypt(self->frame.C, K, self->introPubKey);
crypto->pqe_encrypt(self->frame->C, K, self->introPubKey);
// randomize Nonce
self->frame.N.Randomize();
self->frame->N.Randomize();
// compure post handshake session key
// PKE (A, B, N)
SharedSecret sharedSecret;
path_dh_func dh_client = util::memFn(&Crypto::dh_client, crypto);
if(!self->m_LocalIdentity.KeyExchange(dh_client, sharedSecret,
self->m_remote, self->frame.N))
self->m_remote, self->frame->N))
{
LogError("failed to derive x25519 shared key component");
}
@ -70,12 +68,11 @@ namespace llarp
// set version
self->msg.version = LLARP_PROTO_VERSION;
// encrypt and sign
if(self->frame.EncryptAndSign(self->msg, K, self->m_LocalIdentity))
self->logic->queue_job({self, &Result});
if(self->frame->EncryptAndSign(self->msg, K, self->m_LocalIdentity))
self->logic->queue_func(std::bind(&AsyncKeyExchange::Result, self));
else
{
LogError("failed to encrypt and sign");
delete self;
}
}
} // namespace service

@ -12,17 +12,18 @@ namespace llarp
namespace service
{
struct AsyncKeyExchange
: public std::enable_shared_from_this< AsyncKeyExchange >
{
std::shared_ptr< Logic > logic;
SharedSecret sharedKey;
ServiceInfo m_remote;
const Identity& m_LocalIdentity;
ProtocolMessage msg;
ProtocolFrame frame;
std::shared_ptr< ProtocolFrame > frame;
Introduction intro;
const PQPubKey introPubKey;
Introduction remoteIntro;
std::function< void(ProtocolFrame&) > hook;
std::function< void(std::shared_ptr< ProtocolFrame >) > hook;
IDataHandler* handler;
ConvoTag tag;
@ -33,11 +34,11 @@ namespace llarp
const ConvoTag& t, ProtocolType proto);
static void
Result(void* user);
Result(std::shared_ptr< AsyncKeyExchange > user);
/// given protocol message make protocol frame
static void
Encrypt(void* user);
Encrypt(std::shared_ptr< AsyncKeyExchange > user);
};
} // namespace service

@ -638,9 +638,24 @@ namespace llarp
std::set< RouterID > exclude = prev;
for(const auto& snode : SnodeBlacklist())
exclude.insert(snode);
if(hop == 0)
{
const auto exits = GetExitRouters();
// exclude exit node as first hop in any paths
exclude.insert(exits.begin(), exits.end());
}
return path::Builder::SelectHop(db, exclude, cur, hop, roles);
}
std::set< RouterID >
Endpoint::GetExitRouters() const
{
return m_ExitMap.TransformValues< RouterID >(
[](const exit::BaseSession_ptr& ptr) -> RouterID {
return ptr->Endpoint();
});
}
bool
Endpoint::ShouldBundleRC() const
{
@ -1063,6 +1078,7 @@ namespace llarp
MarkConvoTagActive(item.first->T.T);
}
m_state->m_SendQueue.clear();
router->PumpLL();
}
bool
@ -1129,30 +1145,30 @@ namespace llarp
if(p)
{
// TODO: check expiration of our end
ProtocolMessage m(f.T);
m.PutBuffer(data);
auto m = std::make_shared< ProtocolMessage >(f.T);
m->PutBuffer(data);
f.N.Randomize();
f.C.Zero();
transfer->Y.Randomize();
m.proto = t;
m.introReply = p->intro;
PutReplyIntroFor(f.T, m.introReply);
m.sender = m_Identity.pub;
m.seqno = GetSeqNoForConvo(f.T);
m->proto = t;
m->introReply = p->intro;
PutReplyIntroFor(f.T, m->introReply);
m->sender = m_Identity.pub;
m->seqno = GetSeqNoForConvo(f.T);
f.S = 1;
f.F = m.introReply.pathID;
f.F = m->introReply.pathID;
transfer->P = remoteIntro.pathID;
if(!f.EncryptAndSign(m, K, m_Identity))
{
LogError("failed to encrypt and sign");
return false;
}
LogDebug(Name(), " send ", data.sz, " via ", remoteIntro.router);
{
util::Lock lock(&m_state->m_SendQueueMutex);
m_state->m_SendQueue.emplace_back(transfer, p);
}
return true;
auto self = this;
return CryptoWorker()->addJob([transfer, p, m, K, self]() {
if(not transfer->T.EncryptAndSign(*m, K, self->m_Identity))
{
LogError("failed to encrypt and sign");
return;
}
util::Lock lock(&self->m_state->m_SendQueueMutex);
self->m_state->m_SendQueue.emplace_back(transfer, p);
});
}
}
}
@ -1190,7 +1206,7 @@ namespace llarp
}
m_state->m_PendingTraffic.erase(r);
},
5000, true);
5000);
}
bool

@ -4,6 +4,7 @@
#include <dht/messages/gotrouter.hpp>
#include <ev/ev.h>
#include <exit/session.hpp>
#include <net/ip_range_map.hpp>
#include <net/net.hpp>
#include <path/path.hpp>
#include <path/pathbuilder.hpp>
@ -145,6 +146,10 @@ namespace llarp
std::string
Name() const override;
/// get a set of all the routers we use as exit node
std::set< RouterID >
GetExitRouters() const;
bool
ShouldPublishDescriptors(llarp_time_t now) const override;
@ -420,7 +425,7 @@ namespace llarp
protected:
IDataHandler* m_DataHandler = nullptr;
Identity m_Identity;
std::shared_ptr< exit::BaseSession > m_Exit;
net::IPRangeMap< exit::BaseSession_ptr > m_ExitMap;
hooks::Backend_ptr m_OnUp;
hooks::Backend_ptr m_OnDown;
hooks::Backend_ptr m_OnReady;

@ -168,7 +168,7 @@ namespace llarp
{
if(itr->second.remote.Addr() == info)
{
if(tags.insert(itr->first).second)
if(tags.emplace(itr->first).second)
{
inserted = true;
}

@ -19,7 +19,7 @@ namespace llarp
{
struct ILookupHolder;
constexpr size_t MaxConcurrentLookups = size_t(4);
constexpr size_t MaxConcurrentLookups = size_t(16);
struct IServiceLookup
{

@ -193,7 +193,7 @@ namespace llarp
return;
}
}
AsyncKeyExchange* ex = new AsyncKeyExchange(
auto ex = std::make_shared< AsyncKeyExchange >(
m_Endpoint->RouterLogic(), remoteIdent, m_Endpoint->GetIdentity(),
currentIntroSet.K, remoteIntro, m_DataHandler, currentConvoTag, t);
@ -202,7 +202,7 @@ namespace llarp
ex->msg.PutBuffer(payload);
ex->msg.introReply = path->intro;
ex->frame.F = ex->msg.introReply.pathID;
ex->frame->F = ex->msg.introReply.pathID;
m_Endpoint->CryptoWorker()->addJob(
std::bind(&AsyncKeyExchange::Encrypt, ex));
}
@ -337,6 +337,12 @@ namespace llarp
exclude.insert(m_NextIntro.router);
for(const auto& snode : m_Endpoint->SnodeBlacklist())
exclude.insert(snode);
if(hop == 0)
{
// exclude any exits as our first hop
const auto exits = m_Endpoint->GetExitRouters();
exclude.insert(exits.begin(), exits.end());
}
if(hop == numHops - 1)
{
m_Endpoint->EnsureRouterIsKnown(m_NextIntro.router);

@ -5,6 +5,7 @@
#include <service/endpoint.hpp>
#include <util/thread/logic.hpp>
#include <utility>
#include <unordered_set>
namespace llarp
{
@ -22,12 +23,12 @@ namespace llarp
}
bool
SendContext::Send(const ProtocolFrame& msg, path::Path_ptr path)
SendContext::Send(std::shared_ptr< ProtocolFrame > msg, path::Path_ptr path)
{
util::Lock lock(&m_SendQueueMutex);
m_SendQueue.emplace_back(
std::make_shared< const routing::PathTransferMessage >(
msg, remoteIntro.pathID),
*msg, remoteIntro.pathID),
path);
return true;
}
@ -36,18 +37,25 @@ namespace llarp
SendContext::FlushUpstream()
{
auto r = m_Endpoint->Router();
util::Lock lock(&m_SendQueueMutex);
for(const auto& item : m_SendQueue)
std::unordered_set< path::Path_ptr, path::Path::Ptr_Hash > flushpaths;
{
if(item.second->SendRoutingMessage(*item.first, r))
util::Lock lock(&m_SendQueueMutex);
for(const auto& item : m_SendQueue)
{
lastGoodSend = r->Now();
m_Endpoint->MarkConvoTagActive(item.first->T.T);
if(item.second->SendRoutingMessage(*item.first, r))
{
lastGoodSend = r->Now();
flushpaths.emplace(item.second);
m_Endpoint->MarkConvoTagActive(item.first->T.T);
}
}
else
LogError(m_Endpoint->Name(), " failed to send frame on path");
m_SendQueue.clear();
}
// flush the select path's upstream
for(const auto& path : flushpaths)
{
path->FlushUpstream(r);
}
m_SendQueue.clear();
}
/// send on an established convo tag
@ -55,10 +63,10 @@ namespace llarp
SendContext::EncryptAndSendTo(const llarp_buffer_t& payload, ProtocolType t)
{
SharedSecret shared;
ProtocolFrame f;
f.N.Randomize();
f.T = currentConvoTag;
f.S = ++sequenceNo;
auto f = std::make_shared< ProtocolFrame >();
f->N.Randomize();
f->T = currentConvoTag;
f->S = ++sequenceNo;
auto path = m_PathSet->GetNewestPathByRouter(remoteIntro.router);
if(!path)
@ -68,29 +76,35 @@ namespace llarp
return;
}
if(!m_DataHandler->GetCachedSessionKeyFor(f.T, shared))
if(!m_DataHandler->GetCachedSessionKeyFor(f->T, shared))
{
LogError(m_Endpoint->Name(),
" has no cached session key on session T=", f.T);
" has no cached session key on session T=", f->T);
return;
}
ProtocolMessage m;
m_DataHandler->PutIntroFor(f.T, remoteIntro);
m_DataHandler->PutReplyIntroFor(f.T, path->intro);
m.proto = t;
m.seqno = m_Endpoint->GetSeqNoForConvo(f.T);
m.introReply = path->intro;
f.F = m.introReply.pathID;
m.sender = m_Endpoint->GetIdentity().pub;
m.tag = f.T;
m.PutBuffer(payload);
if(!f.EncryptAndSign(m, shared, m_Endpoint->GetIdentity()))
{
LogError(m_Endpoint->Name(), " failed to sign message");
return;
}
Send(f, path);
auto m = std::make_shared< ProtocolMessage >();
m_DataHandler->PutIntroFor(f->T, remoteIntro);
m_DataHandler->PutReplyIntroFor(f->T, path->intro);
m->proto = t;
m->seqno = m_Endpoint->GetSeqNoForConvo(f->T);
m->introReply = path->intro;
f->F = m->introReply.pathID;
m->sender = m_Endpoint->GetIdentity().pub;
m->tag = f->T;
m->PutBuffer(payload);
auto self = this;
m_Endpoint->CryptoWorker()->addJob([f, m, shared, path, self]() {
if(!f->EncryptAndSign(*m, shared, self->m_Endpoint->GetIdentity()))
{
LogError(self->m_Endpoint->Name(), " failed to sign message");
return;
}
self->m_Endpoint->RouterLogic()->queue_func([self, f, path]() {
self->Send(f, path);
self->FlushUpstream();
});
});
}
void

@ -29,7 +29,7 @@ namespace llarp
/// queue send a fully encrypted hidden service frame
/// via a path
bool
Send(const ProtocolFrame& f, path::Path_ptr path)
Send(std::shared_ptr< ProtocolFrame > f, path::Path_ptr path)
LOCKS_EXCLUDED(m_SendQueueMutex);
/// flush upstream traffic when in router thread

@ -23,6 +23,13 @@ namespace llarp
return std::bitset< std::numeric_limits< Int_t >::digits >(i).count();
}
constexpr std::size_t
count_bits_128(const absl::uint128& i)
{
return count_bits(absl::Uint128High64(i))
+ count_bits(absl::Uint128Low64(i));
}
template < typename InputIt >
constexpr std::size_t
__count_array_bits(InputIt begin, InputIt end)

@ -8,7 +8,7 @@
#include <ctime>
#include <iomanip>
#include <sstream>
#include <thread>
#include <util/thread/threading.hpp>
namespace llarp
{

@ -1,21 +1,6 @@
#ifndef LLARP_STRING_VIEW_HPP
#define LLARP_STRING_VIEW_HPP
#if __cplusplus >= 201703L
#include <string_view>
#include <string>
namespace llarp
{
using string_view = std::string_view;
using string_view_hash = std::hash< string_view >;
static std::string
string_view_string(const string_view& v)
{
return std::string(v.data(), v.size());
}
} // namespace llarp
#else
#include <absl/hash/hash.h>
#include <absl/strings/string_view.h>
namespace llarp
@ -30,4 +15,3 @@ namespace llarp
}
} // namespace llarp
#endif
#endif

@ -13,11 +13,26 @@ namespace llarp
llarp_threadpool_tick(this->thread);
}
Logic::Logic()
: thread(llarp_init_threadpool(1, "llarp-logic"))
, timer(llarp_init_timer())
{
llarp_threadpool_start(thread);
/// set thread id
thread->impl->addJob([&]() { id.emplace(std::this_thread::get_id()); });
}
Logic::~Logic()
{
llarp_threadpool_stop(this->thread);
llarp_threadpool_join(this->thread);
llarp_free_threadpool(&this->thread);
}
void
Logic::tick_async(llarp_time_t now)
{
llarp_timer_tick_all_async(this->timer, this->thread, now);
llarp_threadpool_tick(this->thread);
}
void
@ -40,9 +55,7 @@ namespace llarp
if(this->thread)
{
llarp_threadpool_stop(this->thread);
llarp_threadpool_join(this->thread);
}
llarp_free_threadpool(&this->thread);
llarp::LogDebug("logic timer stop");
if(this->timer)
@ -56,12 +69,11 @@ namespace llarp
}
bool
Logic::queue_func(std::function< void(void) > f)
Logic::queue_func(std::function< void(void) >&& f)
{
if(!this->thread->QueueFunc(f))
if(!this->thread->impl->tryAddJob(f))
{
// try calling it later if the job queue overflows
this->call_later(1, f);
call_later(0, f);
}
return true;
}
@ -97,7 +109,7 @@ namespace llarp
bool
Logic::can_flush() const
{
return ourID == std::this_thread::get_id();
return id.value() == std::this_thread::get_id();
}
} // namespace llarp

@ -4,6 +4,7 @@
#include <util/mem.h>
#include <util/thread/threadpool.h>
#include <util/thread/timer.hpp>
#include <absl/types/optional.h>
namespace llarp
{
@ -12,14 +13,11 @@ namespace llarp
public:
struct llarp_threadpool* thread;
struct llarp_timer_context* timer;
const std::thread::id ourID;
absl::optional< std::thread::id > id;
Logic()
: thread(llarp_init_same_process_threadpool())
, timer(llarp_init_timer())
, ourID(std::this_thread::get_id())
{
}
Logic();
~Logic();
/// single threaded tick
void
@ -42,7 +40,7 @@ namespace llarp
queue_job(struct llarp_thread_job job);
bool
queue_func(std::function< void(void) > func);
queue_func(std::function< void(void) >&& func);
uint32_t
call_later(const llarp_timeout_job& job);

@ -1,6 +1,7 @@
#include <util/thread/threading.hpp>
#include <util/logging/logger.hpp>
#include <cstring>
#ifdef POSIX
#include <pthread.h>
@ -40,7 +41,7 @@ namespace llarp
if(rc)
{
LogError("Failed to set thread name to ", name, " errno = ", rc,
" errstr = ", strerror(rc));
" errstr = ", ::strerror(rc));
}
#endif
#elif _MSC_VER

@ -17,6 +17,15 @@ using pid_t = int;
#include <unistd.h>
#endif
#ifdef TRACY_ENABLE
#include "Tracy.hpp"
#define DECLARE_LOCK(type, var, ...) TracyLockable(type, var)
#define ACQUIRE_LOCK(lock, mtx) lock(mtx)
#else
#define DECLARE_LOCK(type, var, ...) type var __VA_ARGS__
#define ACQUIRE_LOCK(lock, mtx) lock(&mtx)
#endif
namespace llarp
{
namespace util
@ -64,8 +73,9 @@ namespace llarp
}
};
using Mutex = absl::Mutex;
using Lock = absl::MutexLock;
using Mutex = absl::Mutex;
using Lock = absl::MutexLock;
using ReleasableLock = absl::ReleasableMutexLock;
using Condition = absl::CondVar;

@ -38,15 +38,6 @@ struct llarp_threadpool
return jobs->size();
return 0;
}
bool
QueueFunc(std::function< void(void) > f)
{
if(impl)
return impl->tryAddJob(f);
return jobs->tryPushBack(f) == llarp::thread::QueueReturn::Success;
}
};
struct llarp_threadpool *

@ -30,6 +30,8 @@ struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium >
rc.enckey = encryptionKey.toPublic();
}
std::shared_ptr<thread::ThreadPool> worker;
SecretKey signingKey;
SecretKey encryptionKey;
@ -37,6 +39,12 @@ struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium >
bool gotLIM = false;
void Setup()
{
worker = std::make_shared<thread::ThreadPool>(1, 128, "test-worker");
worker->start();
}
const RouterContact&
GetRC() const
{
@ -85,7 +93,7 @@ struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium >
return false;
if(!rc.Sign(signingKey))
return false;
return link->Start(logic);
return link->Start(logic, worker);
}
void
@ -93,13 +101,18 @@ struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium >
{
if(link)
link->Stop();
if(worker)
{
worker->drain();
worker->stop();
}
}
void
TearDown()
{
Stop();
link.reset();
worker.reset();
}
};
@ -125,6 +138,8 @@ struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium >
RouterContact::Lifetime = 500;
netLoop = llarp_make_ev_loop();
m_logic.reset(new Logic());
Alice.Setup();
Bob.Setup();
}
void
@ -157,7 +172,11 @@ struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium >
void
Stop()
{
llarp_ev_loop_stop(netLoop);
m_logic->queue_func([&]() {
Alice.Stop();
Bob.Stop();
llarp_ev_loop_stop(netLoop);
});
}
bool
@ -212,14 +231,12 @@ TEST_F(LinkLayerTest, TestIWP)
auto sendDiscardMessage = [](ILinkSession* s) -> bool {
// send discard message in reply to complete unit test
std::array< byte_t, 32 > tmp;
std::vector< byte_t> tmp(32);
llarp_buffer_t otherBuf(tmp);
DiscardMessage discard;
if(!discard.BEncode(&otherBuf))
return false;
otherBuf.sz = otherBuf.cur - otherBuf.base;
otherBuf.cur = otherBuf.base;
return s->SendMessageBuffer(otherBuf, nullptr);
return s->SendMessageBuffer(std::move(tmp), nullptr);
};
Bob.link = iwp::NewInboundLink(
@ -255,7 +272,7 @@ TEST_F(LinkLayerTest, TestIWP)
ASSERT_TRUE(Alice.Start(m_logic, netLoop, AlicePort));
ASSERT_TRUE(Bob.Start(m_logic, netLoop, BobPort));
ASSERT_TRUE(Alice.link->TryEstablishTo(Bob.GetRC()));
m_logic->queue_func([&]() { ASSERT_TRUE(Alice.link->TryEstablishTo(Bob.GetRC())); });
RunMainloop();
ASSERT_TRUE(Bob.gotLIM);

@ -11,7 +11,6 @@
struct AbyssTestBase : public ::testing::Test
{
llarp::sodium::CryptoLibSodium crypto;
llarp_threadpool* threadpool = nullptr;
llarp_ev_loop_ptr loop = nullptr;
std::shared_ptr< llarp::Logic > logic;
abyss::httpd::BaseReqHandler* server = nullptr;
@ -49,7 +48,6 @@ struct AbyssTestBase : public ::testing::Test
{
loop = llarp_make_ev_loop();
logic = std::make_shared< llarp::Logic >();
threadpool = logic->thread;
sockaddr_in addr;
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
addr.sin_port = htons((llarp::randint() % 2000) + 2000);
@ -83,7 +81,6 @@ struct AbyssTestBase : public ::testing::Test
~AbyssTestBase()
{
logic.reset();
llarp_free_threadpool(&threadpool);
llarp::SetLogLevel(llarp::eLogInfo);
}
};

Loading…
Cancel
Save