Merge pull request #639 from majestrate/master

[WIP] libuv
This commit is contained in:
Jeff 2019-06-13 09:27:42 -04:00 committed by GitHub
commit b857be3d02
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
45 changed files with 1187 additions and 287 deletions

View File

@ -11,7 +11,7 @@ build:linux:
- linux
stage: build
before_script:
- apk add --update g++ make cmake linux-headers libcap-dev
- apk add --update g++ make cmake linux-headers libcap-dev libuv-dev
script:
- make STATIC_LINK=ON
artifacts:

View File

@ -16,7 +16,7 @@ env:
- CCACHE_DIR=$HOME/.ccache
- BASE_OUTDIR=$TRAVIS_BUILD_DIR/out
- SDK_URL=https://bitcoincore.org/depends-sources/sdks
- DOCKER_PACKAGES="build-essential cmake git libcap-dev bsdmainutils ninja-build curl git ca-certificates ccache"
- DOCKER_PACKAGES="build-essential cmake git libcap-dev bsdmainutils ninja-build curl git ca-certificates ccache libuv1-dev"
matrix:
#- HOST=x86_64-w64-mingw32
- HOST=x86_64-unknown-linux-gnu STATIC_LINK=OFF

View File

@ -212,6 +212,9 @@ endif(JEMALLOC)
# FS_LIB should resolve to nothing on all other platforms
# it is only required on win32 -rick
set(LIBS ${MALLOC_LIB} ${FS_LIB})
if(NOT WIN32)
set(LIBS ${LIBS} uv)
endif()
if(ANDROID)
list(APPEND LIBS log)
@ -227,6 +230,7 @@ else()
message(FATAL_ERROR "What operating system _are_ you building on/for?")
endif()
set(LIBTUNTAP_SRC_BASE
${TT_ROOT}/tuntap.cpp
${TT_ROOT}/tuntap_log.cpp
@ -263,6 +267,7 @@ target_include_directories(${ABYSS_EXE} PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/${AB
# for freebsd
if(${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD")
target_link_directories(${ABYSS_EXE} PRIVATE /usr/local/lib)
target_include_directories(${ABYSS_LIB} SYSTEM PUBLIC /usr/local/include)
endif(${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD")
add_log_tag(${ABYSS_EXE})
@ -296,7 +301,9 @@ else()
if(${CMAKE_SYSTEM_NAME} MATCHES "Linux")
install(CODE "execute_process(COMMAND setcap cap_net_admin,cap_net_bind_service=+eip ${CMAKE_INSTALL_PREFIX}/bin/lokinet)")
endif(${CMAKE_SYSTEM_NAME} MATCHES "Linux")
if(${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD")
target_link_directories(${EXE} PRIVATE /usr/local/lib)
endif()
target_link_libraries(${EXE} PUBLIC ${EXE_LIBS})
if(ANDROID)

View File

@ -65,6 +65,8 @@ NETNS ?= OFF
CROSS ?= OFF
# build liblokinet-shared.so
SHARED_LIB ?= OFF
# use libuv
LIBUV ?= ON
# enable generating coverage
COVERAGE ?= OFF
COVERAGE_OUTDIR ?= "$(TMPDIR)/lokinet-coverage"
@ -92,11 +94,11 @@ ANALYZE_CONFIG_CMD = $(shell gecho -n "cd '$(BUILD_ROOT)' && " ; gecho -n "$(SCA
COVERAGE_CONFIG_CMD = $(shell gecho -n "cd '$(BUILD_ROOT)' && " ; gecho -n "cmake -G'$(CMAKE_GEN)' -DCMAKE_CROSSCOMPILING=$(CROSS) -DSTATIC_LINK_RUNTIME=$(STATIC_LINK) -DUSE_NETNS=$(NETNS) -DUSE_AVX2=$(AVX2) -DNON_PC_TARGET=$(NON_PC_TARGET) -DWITH_SHARED=$(SHARED_LIB) -DWITH_COVERAGE=yes -DCMAKE_EXPORT_COMPILE_COMMANDS=ON '$(REPO)'")
else
CONFIG_CMD = $(shell /bin/echo -n "cd '$(BUILD_ROOT)' && " ; /bin/echo -n "cmake -G'$(CMAKE_GEN)' -DCMAKE_CROSSCOMPILING=$(CROSS) -DSTATIC_LINK_RUNTIME=$(STATIC_LINK) -DUSE_NETNS=$(NETNS) -DUSE_AVX2=$(AVX2) -DNON_PC_TARGET=$(NON_PC_TARGET) -DWITH_SHARED=$(SHARED_LIB) -DCMAKE_EXPORT_COMPILE_COMMANDS=ON '$(REPO)'")
CONFIG_CMD = $(shell /bin/echo -n "cd '$(BUILD_ROOT)' && " ; /bin/echo -n "cmake -G'$(CMAKE_GEN)' -DCMAKE_CROSSCOMPILING=$(CROSS) -DSTATIC_LINK_RUNTIME=$(STATIC_LINK) -DUSE_NETNS=$(NETNS) -DUSE_AVX2=$(AVX2) -DNON_PC_TARGET=$(NON_PC_TARGET) -DWITH_SHARED=$(SHARED_LIB) -DUSE_LIBUV=$(LIBUV) -DCMAKE_EXPORT_COMPILE_COMMANDS=ON '$(REPO)'")
ANALYZE_CONFIG_CMD = $(shell /bin/echo -n "cd '$(BUILD_ROOT)' && " ; /bin/echo -n "$(SCAN_BUILD) cmake -G'$(CMAKE_GEN)' -DCMAKE_CROSSCOMPILING=$(CROSS) -DSTATIC_LINK_RUNTIME=$(STATIC_LINK) -DUSE_NETNS=$(NETNS) -DUSE_AVX2=$(AVX2) -DNON_PC_TARGET=$(NON_PC_TARGET) -DWITH_SHARED=$(SHARED_LIB) -DCMAKE_EXPORT_COMPILE_COMMANDS=ON '$(REPO)'")
ANALYZE_CONFIG_CMD = $(shell /bin/echo -n "cd '$(BUILD_ROOT)' && " ; /bin/echo -n "$(SCAN_BUILD) cmake -G'$(CMAKE_GEN)' -DCMAKE_CROSSCOMPILING=$(CROSS) -DSTATIC_LINK_RUNTIME=$(STATIC_LINK) -DUSE_NETNS=$(NETNS) -DUSE_AVX2=$(AVX2) -DNON_PC_TARGET=$(NON_PC_TARGET) -DWITH_SHARED=$(SHARED_LIB) -DUSE_LIBUV=$(LIBUV) -DCMAKE_EXPORT_COMPILE_COMMANDS=ON '$(REPO)'")
COVERAGE_CONFIG_CMD = $(shell /bin/echo -n "cd '$(BUILD_ROOT)' && " ; /bin/echo -n "cmake -G'$(CMAKE_GEN)' -DCMAKE_CROSSCOMPILING=$(CROSS) -DSTATIC_LINK_RUNTIME=$(STATIC_LINK) -DUSE_NETNS=$(NETNS) -DUSE_AVX2=$(AVX2) -DNON_PC_TARGET=$(NON_PC_TARGET) -DWITH_SHARED=$(SHARED_LIB) -DWITH_COVERAGE=yes -DCMAKE_EXPORT_COMPILE_COMMANDS=ON '$(REPO)'")
COVERAGE_CONFIG_CMD = $(shell /bin/echo -n "cd '$(BUILD_ROOT)' && " ; /bin/echo -n "cmake -G'$(CMAKE_GEN)' -DCMAKE_CROSSCOMPILING=$(CROSS) -DSTATIC_LINK_RUNTIME=$(STATIC_LINK) -DUSE_NETNS=$(NETNS) -DUSE_AVX2=$(AVX2) -DNON_PC_TARGET=$(NON_PC_TARGET) -DWITH_SHARED=$(SHARED_LIB) -DUSE_LIBUV=$(LIBUV) -DWITH_COVERAGE=yes -DCMAKE_EXPORT_COMPILE_COMMANDS=ON '$(REPO)'")
endif
TARGETS = $(REPO)/lokinet

View File

@ -4,4 +4,6 @@ add_library(${ABYSS_LIB} "${CMAKE_CURRENT_SOURCE_DIR}/src/md5.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/src/server.cpp")
target_include_directories(${ABYSS_LIB} PUBLIC include)
target_link_libraries(${ABYSS_LIB} PUBLIC ${PLATFORM_LIB})
if(NOT WIN32)
target_link_libraries(${ABYSS_LIB} PUBLIC ${PLATFORM_LIB} uv)
endif()

View File

@ -68,8 +68,7 @@ struct DemoClient : public abyss::http::JSONRPC
abyss::http::IRPCClientHandler*
NewConn(abyss::http::ConnImpl* impl)
{
return new DemoCall(impl, m_Logic,
std::bind(&llarp_ev_loop_stop, m_Loop.get()));
return new DemoCall(impl, m_Logic, std::bind(&llarp_ev_loop_stop, m_Loop));
}
void
@ -118,7 +117,7 @@ main(ABSL_ATTRIBUTE_UNUSED int argc, ABSL_ATTRIBUTE_UNUSED char* argv[])
#endif
llarp::SetLogLevel(llarp::eLogDebug);
llarp_threadpool* threadpool = llarp_init_same_process_threadpool();
llarp_ev_loop_ptr loop = llarp_make_ev_loop();
llarp_ev_loop_ptr loop = llarp_make_uv_loop();
auto logic = std::make_shared< llarp::Logic >(threadpool);
sockaddr_in addr;
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

View File

@ -63,6 +63,7 @@ namespace abyss
static void
OnClosed(llarp_tcp_conn* conn)
{
llarp::LogDebug("connection closed");
ConnImpl* self = static_cast< ConnImpl* >(conn->user);
self->state = eCloseMe;
}

View File

@ -212,6 +212,7 @@ namespace abyss
bool
ProcessRead(const char* buf, size_t sz)
{
llarp::LogDebug("http read ", sz, " bytes");
if(m_Bad)
{
return false;
@ -266,6 +267,7 @@ namespace abyss
static void
OnClosed(llarp_tcp_conn* conn)
{
llarp::LogDebug("connection closed");
ConnImpl* self = static_cast< ConnImpl* >(conn->user);
self->_conn = nullptr;
}
@ -280,6 +282,8 @@ namespace abyss
void
Tick()
{
if(m_Bad)
Close();
}
/// mark bad so next tick we are closed

View File

@ -79,12 +79,15 @@ set(LIB_PLATFORM_SRC
)
if (WIN32)
set(LIB_PLATFORM_SRC
${LIB_PLATFORM_SRC}
win32/win32_inet.c
win32/win32_intrnl.c
win32/win32_upoll.c
)
set(LIB_PLATFORM_SRC
${LIB_PLATFORM_SRC}
win32/win32_inet.c
win32/win32_intrnl.c
win32/win32_upoll.c)
else()
set(LIB_PLATFORM_SRC
${LIB_PLATFORM_SRC}
ev/ev_libuv.cpp)
endif(WIN32)
if (SOLARIS)
@ -250,6 +253,11 @@ add_library(${STATIC_LIB} STATIC ${LIB_SRC})
set(LIBS ${LIBS} libutp)
target_link_libraries(${STATIC_LIB} PUBLIC cxxopts ${ABYSS_LIB} ${PLATFORM_LIB} ${UTIL_LIB} ${CRYPTOGRAPHY_LIB} ${LIBS})
if(${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD")
target_include_directories(${PLATFORM_LIB} SYSTEM PUBLIC /usr/local/include)
target_include_directories(${STATIC_LIB} SYSTEM PUBLIC /usr/local/include)
endif()
if(WITH_SHARED)
add_library(${SHARED_LIB} SHARED ${LIB_SRC})
set(LIBS ${LIBS} Threads::Threads)

View File

@ -224,12 +224,6 @@ __ ___ ____ _ _ ___ _ _ ____
return 1;
}
int
Context::IterateDatabase(llarp_nodedb_iter &i)
{
return nodedb->iterate_all(i);
}
bool
Context::PutDatabase(__attribute__((unused)) struct llarp::RouterContact &rc)
{
@ -251,8 +245,11 @@ __ ___ ____ _ _ ___ _ _ ____
{
llarp::LogInfo(LLARP_VERSION, " ", LLARP_RELEASE_MOTTO);
llarp::LogInfo("starting up");
#if defined(WIN32)
mainloop = llarp_make_ev_loop();
#else
mainloop = llarp_make_uv_loop();
#endif
// ensure worker thread pool
if(!worker && !singleThreaded)
worker.reset(llarp_init_threadpool(2, "llarp-worker"));
@ -421,7 +418,7 @@ __ ___ ____ _ _ ___ _ _ ____
{
if(logic)
logic->stop();
llarp_ev_loop_stop(mainloop.get());
llarp_ev_loop_stop(mainloop);
Close();
}
}

View File

@ -11,7 +11,7 @@ namespace llarp
struct NoOpCrypto final : public Crypto
{
private:
std::atomic<uint64_t> m_value;
std::atomic< uint64_t > m_value;
static constexpr byte_t MAX_BYTE = std::numeric_limits< byte_t >::max();

View File

@ -57,6 +57,7 @@ llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev,
}
llarp::LogContext::Instance().logStream->Tick(ev->time_now());
}
ev->stopped();
}
int
@ -86,7 +87,7 @@ llarp_ev_loop_time_now_ms(const llarp_ev_loop_ptr &loop)
}
void
llarp_ev_loop_stop(struct llarp_ev_loop *loop)
llarp_ev_loop_stop(const llarp_ev_loop_ptr &loop)
{
loop->stop();
}
@ -95,30 +96,7 @@ int
llarp_ev_udp_sendto(struct llarp_udp_io *udp, const sockaddr *to,
const llarp_buffer_t &buf)
{
auto ret =
static_cast< llarp::ev_io * >(udp->impl)->sendto(to, buf.base, buf.sz);
#ifndef _WIN32
if(ret == -1 && errno != 0)
{
#else
if(ret == -1 && WSAGetLastError())
{
#endif
#ifndef _WIN32
llarp::LogWarn("sendto failed ", strerror(errno));
errno = 0;
}
#else
char ebuf[1024];
int err = WSAGetLastError();
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, nullptr, err, LANG_NEUTRAL, ebuf,
1024, nullptr);
llarp::LogWarn("sendto failed: ", ebuf);
WSASetLastError(0);
}
#endif
return ret;
return udp->sendto(udp, to, buf.base, buf.sz);
}
#include <string.h>
@ -165,12 +143,7 @@ llarp_ev_add_tun(struct llarp_ev_loop *loop, struct llarp_tun_io *tun)
llarp::LogDebug("IfName: ", tun->ifname);
llarp::LogDebug("IfNMsk: ", tun->netmask);
#ifndef _WIN32
auto dev = loop->create_tun(tun);
tun->impl = dev;
if(dev)
{
return loop->add_ev(dev, false);
}
return loop->tun_listen(tun);
#else
UNREFERENCED_PARAMETER(loop);
auto dev = new win32_tun_io(tun);
@ -195,7 +168,7 @@ llarp_ev_tun_async_write(struct llarp_tun_io *tun, const llarp_buffer_t &buf)
return false;
}
#ifndef _WIN32
return static_cast< llarp::tun * >(tun->impl)->queue_write(buf.base, buf.sz);
return tun->writepkt(tun, buf.base, buf.sz);
#else
return static_cast< win32_tun_io * >(tun->impl)->queue_write(buf.base,
buf.sz);
@ -206,24 +179,21 @@ bool
llarp_tcp_conn_async_write(struct llarp_tcp_conn *conn, const llarp_buffer_t &b)
{
ManagedBuffer buf{b};
llarp::tcp_conn *impl = static_cast< llarp::tcp_conn * >(conn->impl);
if(impl->_shouldClose)
{
llarp::LogError("write on closed connection");
return false;
}
size_t sz = buf.underlying.sz;
buf.underlying.cur = buf.underlying.base;
while(sz > EV_WRITE_BUF_SZ)
{
if(!impl->queue_write(buf.underlying.cur, EV_WRITE_BUF_SZ))
ssize_t amount = conn->write(conn, buf.underlying.cur, EV_WRITE_BUF_SZ);
if(amount <= 0)
{
llarp::LogError("write underrun");
return false;
}
buf.underlying.cur += EV_WRITE_BUF_SZ;
sz -= EV_WRITE_BUF_SZ;
buf.underlying.cur += amount;
sz -= amount;
}
return impl->queue_write(buf.underlying.cur, sz);
return conn->write(conn, buf.underlying.cur, sz) > 0;
}
void
@ -267,30 +237,20 @@ bool
llarp_tcp_serve(struct llarp_ev_loop *loop, struct llarp_tcp_acceptor *tcp,
const struct sockaddr *bindaddr)
{
tcp->loop = loop;
llarp::ev_io *impl = loop->bind_tcp(tcp, bindaddr);
if(impl)
{
return loop->add_ev(impl, false);
}
return false;
tcp->loop = loop;
return loop->tcp_listen(tcp, bindaddr);
}
void
llarp_tcp_acceptor_close(struct llarp_tcp_acceptor *tcp)
{
llarp::ev_io *impl = static_cast< llarp::ev_io * >(tcp->user);
tcp->impl = nullptr;
tcp->loop->close_ev(impl);
if(tcp->closed)
tcp->closed(tcp);
// dont free acceptor because it may be stack allocated
tcp->close(tcp);
}
void
llarp_tcp_conn_close(struct llarp_tcp_conn *conn)
{
static_cast< llarp::tcp_conn * >(conn->impl)->_shouldClose = true;
conn->close(conn);
}
namespace llarp

View File

@ -27,6 +27,10 @@ typedef SSIZE_T ssize_t;
#include <stdint.h>
#include <stdlib.h>
#if !defined(WIN32)
#include <uv.h>
#endif
/**
* ev.h
*
@ -47,9 +51,14 @@ namespace llarp
using llarp_ev_loop_ptr = std::shared_ptr< llarp_ev_loop >;
/// make an event loop using our baked in event loop, ew.
llarp_ev_loop_ptr
llarp_make_ev_loop();
/// make an event loop using libuv
llarp_ev_loop_ptr
llarp_make_uv_loop();
// run mainloop
void
llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev,
@ -62,7 +71,7 @@ llarp_ev_loop_time_now_ms(const llarp_ev_loop_ptr &ev);
/// stop event loop and wait for it to complete all jobs
void
llarp_ev_loop_stop(struct llarp_ev_loop *ev);
llarp_ev_loop_stop(const llarp_ev_loop_ptr &ev);
/// UDP handling configuration
struct llarp_udp_io
@ -78,6 +87,9 @@ struct llarp_udp_io
/// sockaddr * is the source address
void (*recvfrom)(struct llarp_udp_io *, const struct sockaddr *,
ManagedBuffer);
/// set by parent
int (*sendto)(struct llarp_udp_io *, const struct sockaddr *, const byte_t *,
size_t);
};
/// add UDP handler
@ -108,8 +120,14 @@ struct llarp_tcp_conn
struct llarp_ev_loop *loop;
/// handle read event
void (*read)(struct llarp_tcp_conn *, const llarp_buffer_t &);
//// set by parent
ssize_t (*write)(struct llarp_tcp_conn *, const byte_t *, size_t sz);
/// set by parent
bool (*is_open)(struct llarp_tcp_conn *);
/// handle close event (free-ing is handled by event loop)
void (*closed)(struct llarp_tcp_conn *);
/// explict close by user (set by parent)
void (*close)(struct llarp_tcp_conn *);
/// handle event loop tick
void (*tick)(struct llarp_tcp_conn *);
};
@ -132,6 +150,8 @@ struct llarp_tcp_connecter
char remote[512];
/// userdata pointer
void *user;
/// private implementation (dont set me)
void *impl;
/// parent event loop (dont set me)
struct llarp_ev_loop *loop;
/// handle outbound connection made
@ -160,6 +180,8 @@ struct llarp_tcp_acceptor
void (*accepted)(struct llarp_tcp_acceptor *, struct llarp_tcp_conn *);
/// handle after server socket closed (free-ing is handled by event loop)
void (*closed)(struct llarp_tcp_acceptor *);
/// set by impl
void (*close)(struct llarp_tcp_acceptor *);
};
/// bind to an address and start serving async
@ -204,6 +226,8 @@ struct llarp_tun_io
/// called every event loop tick after reads
void (*tick)(struct llarp_tun_io *);
void (*recvpkt)(struct llarp_tun_io *, const llarp_buffer_t &);
/// set by parent
bool (*writepkt)(struct llarp_tun_io *, const byte_t *, size_t);
};
/// create tun interface with network interface name ifname

View File

@ -531,6 +531,12 @@ namespace llarp
// null if inbound otherwise outbound
llarp_tcp_connecter* _conn;
static void
DoClose(llarp_tcp_conn* conn)
{
static_cast< tcp_conn* >(conn->impl)->_shouldClose = true;
}
/// inbound
tcp_conn(llarp_ev_loop* loop, int fd)
: ev_io(fd, new LosslessWriteQueue_t{}), _conn(nullptr)
@ -541,6 +547,7 @@ namespace llarp
tcp.user = nullptr;
tcp.read = nullptr;
tcp.tick = nullptr;
tcp.close = &DoClose;
}
/// outbound
@ -560,6 +567,7 @@ namespace llarp
tcp.user = nullptr;
tcp.read = nullptr;
tcp.tick = nullptr;
tcp.close = &DoClose;
}
virtual ~tcp_conn()
@ -713,7 +721,7 @@ struct llarp_ev_loop
virtual bool
running() const = 0;
void
virtual void
update_time()
{
_now = llarp::time_now_ms();
@ -725,6 +733,9 @@ struct llarp_ev_loop
return _now;
}
virtual void
stopped(){};
/// return false on socket error (non blocking)
virtual bool
tcp_connect(llarp_tcp_connecter* tcp, const sockaddr* addr) = 0;
@ -738,15 +749,24 @@ struct llarp_ev_loop
virtual bool
udp_listen(llarp_udp_io* l, const sockaddr* src) = 0;
virtual llarp::ev_io*
create_udp(llarp_udp_io* l, const sockaddr* src) = 0;
virtual bool
udp_close(llarp_udp_io* l) = 0;
/// deregister event listener
virtual bool
close_ev(llarp::ev_io* ev) = 0;
virtual bool
tun_listen(llarp_tun_io* tun)
{
auto dev = create_tun(tun);
tun->impl = dev;
if(dev)
{
return add_ev(dev, false);
}
return false;
}
virtual llarp::ev_io*
create_tun(llarp_tun_io* tun) = 0;
@ -757,6 +777,13 @@ struct llarp_ev_loop
virtual bool
add_ev(llarp::ev_io* ev, bool write) = 0;
virtual bool
tcp_listen(llarp_tcp_acceptor* tcp, const sockaddr* addr)
{
auto conn = bind_tcp(tcp, addr);
return conn && add_ev(conn, true);
}
virtual ~llarp_ev_loop()
{
}

View File

@ -309,12 +309,23 @@ llarp_epoll_loop::bind_tcp(llarp_tcp_acceptor* tcp, const sockaddr* bindaddr)
return new llarp::tcp_serv(this, fd, tcp);
}
static int
llarp_ev_epoll_sendto(struct llarp_udp_io* udp, const struct sockaddr* to,
const byte_t* pkt, size_t sz)
{
const llarp::Addr toaddr(*to);
return ::sendto(udp->fd, pkt, sz, 0, toaddr, toaddr.SockLen());
}
bool
llarp_epoll_loop::udp_listen(llarp_udp_io* l, const sockaddr* src)
{
auto ev = create_udp(l, src);
if(ev)
l->fd = ev->fd;
{
l->fd = ev->fd;
l->sendto = &llarp_ev_epoll_sendto;
}
return ev && add_ev(ev, false);
}

View File

@ -492,12 +492,23 @@ llarp_kqueue_loop::udp_bind(const sockaddr* addr)
return fd;
}
static int
llarp_ev_kqueue_sendto(struct llarp_udp_io* udp, const struct sockaddr* to,
const byte_t* pkt, size_t sz)
{
const llarp::Addr toaddr(*to);
return ::sendto(udp->fd, pkt, sz, 0, toaddr, toaddr.SockLen());
}
bool
llarp_kqueue_loop::udp_listen(llarp_udp_io* l, const sockaddr* src)
{
auto ev = create_udp(l, src);
if(ev)
l->fd = ev->fd;
{
l->fd = ev->fd;
l->sendto = &llarp_ev_kqueue_sendto;
}
return ev && add_ev(ev, false);
}

View File

@ -113,7 +113,7 @@ struct llarp_kqueue_loop final
int
udp_bind(const sockaddr* addr);
virtual bool
bool
udp_listen(llarp_udp_io* l, const sockaddr* src) override;
bool
@ -126,7 +126,7 @@ struct llarp_kqueue_loop final
bind_tcp(llarp_tcp_acceptor* tcp, const sockaddr* bindaddr) override;
llarp::ev_io*
create_udp(llarp_udp_io* l, const sockaddr* src) override;
create_udp(llarp_udp_io* l, const sockaddr* src);
bool
add_ev(llarp::ev_io* ev, bool w) override;

641
llarp/ev/ev_libuv.cpp Normal file
View File

@ -0,0 +1,641 @@
#include "ev_libuv.hpp"
#include "net/net_addr.hpp"
namespace libuv
{
/// tcp connection glue between llarp and libuv
struct conn_glue
{
uv_tcp_t m_Handle;
uv_connect_t m_Connect;
uv_timer_t m_Ticker;
llarp_tcp_connecter* const m_TCP;
llarp_tcp_acceptor* const m_Accept;
llarp_tcp_conn m_Conn;
llarp::Addr m_Addr;
std::deque< std::vector< char > > m_WriteQueue;
conn_glue(uv_loop_t* loop, llarp_tcp_connecter* tcp, const sockaddr* addr)
: m_TCP(tcp), m_Accept(nullptr), m_Addr(*addr)
{
m_Connect.data = this;
m_Handle.data = this;
uv_tcp_init(loop, &m_Handle);
m_Ticker.data = this;
uv_timer_init(loop, &m_Ticker);
m_Conn.close = &ExplicitClose;
m_Conn.write = &ExplicitWrite;
}
conn_glue(uv_loop_t* loop, llarp_tcp_acceptor* tcp, const sockaddr* addr)
: m_TCP(nullptr), m_Accept(tcp), m_Addr(*addr)
{
m_Handle.data = this;
uv_tcp_init(loop, &m_Handle);
m_Ticker.data = this;
uv_timer_init(loop, &m_Ticker);
m_Accept->close = &ExplicitCloseAccept;
m_Conn.write = nullptr;
m_Conn.closed = nullptr;
}
conn_glue(conn_glue* parent) : m_TCP(nullptr), m_Accept(nullptr)
{
m_Conn.close = &ExplicitClose;
m_Conn.write = &ExplicitWrite;
m_Handle.data = this;
uv_tcp_init(parent->m_Handle.loop, &m_Handle);
m_Ticker.data = this;
uv_timer_init(parent->m_Handle.loop, &m_Ticker);
}
static void
OnOutboundConnect(uv_connect_t* c, int status)
{
conn_glue* self = static_cast< conn_glue* >(c->data);
self->HandleConnectResult(status);
}
bool
ConnectAsync()
{
return uv_tcp_connect(&m_Connect, &m_Handle, m_Addr, &OnOutboundConnect)
!= -1;
}
static void
ExplicitClose(llarp_tcp_conn* conn)
{
static_cast< conn_glue* >(conn->impl)->Close();
}
static void
ExplicitCloseAccept(llarp_tcp_acceptor* tcp)
{
static_cast< conn_glue* >(tcp->impl)->Close();
}
static ssize_t
ExplicitWrite(llarp_tcp_conn* conn, const byte_t* ptr, size_t sz)
{
return static_cast< conn_glue* >(conn->impl)->WriteAsync((char*)ptr, sz);
}
static void
OnRead(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
{
if(nread >= 0)
static_cast< conn_glue* >(stream->data)->Read(buf->base, nread);
else if(nread < 0)
static_cast< conn_glue* >(stream->data)->Close();
delete[] buf->base;
}
static void
Alloc(uv_handle_t*, size_t suggested_size, uv_buf_t* buf)
{
buf->base = new char[suggested_size];
buf->len = suggested_size;
}
void
Read(const char* ptr, ssize_t sz)
{
if(m_Conn.read)
{
llarp::LogDebug("tcp read ", sz, " bytes");
const llarp_buffer_t buf(ptr, sz);
m_Conn.read(&m_Conn, buf);
}
}
void
HandleConnectResult(int status)
{
if(m_TCP && m_TCP->connected)
{
if(status == 0)
{
m_Conn.impl = this;
m_Conn.loop = m_TCP->loop;
m_Conn.close = &ExplicitClose;
m_Conn.write = &ExplicitWrite;
m_TCP->connected(m_TCP, &m_Conn);
Start();
}
else if(m_TCP->error)
{
llarp::LogError("failed to connect tcp ", uv_strerror(status));
m_TCP->error(m_TCP);
}
}
}
void
WriteFail()
{
if(m_Conn.close)
m_Conn.close(&m_Conn);
}
uv_stream_t*
Stream()
{
return (uv_stream_t*)&m_Handle;
}
static void
OnWritten(uv_write_t* req, int status)
{
if(status)
{
llarp::LogError("write failed on tcp: ", uv_strerror(status));
static_cast< conn_glue* >(req->data)->Close();
return;
}
static_cast< conn_glue* >(req->data)->DrainOne();
}
void
DrainOne()
{
m_WriteQueue.pop_front();
}
int
WriteAsync(char* data, size_t sz)
{
m_WriteQueue.emplace_back(sz);
std::copy_n(data, sz, m_WriteQueue.back().begin());
auto buf = uv_buf_init(m_WriteQueue.back().data(), sz);
uv_write_t* req = new uv_write_t();
req->data = this;
return uv_write(req, Stream(), &buf, 1, &OnWritten) == 0 ? sz : 0;
}
static void
OnClosed(uv_handle_t* h)
{
static_cast< conn_glue* >(h->data)->HandleClosed();
}
void
HandleClosed()
{
m_Handle.data = nullptr;
if(m_Accept && m_Accept->closed)
{
m_Accept->impl = nullptr;
m_Accept->closed(m_Accept);
}
m_Conn.impl = nullptr;
if(m_Conn.closed)
{
m_Conn.closed(&m_Conn);
}
delete this;
}
void
Close()
{
llarp::LogDebug("close tcp connection");
uv_timer_stop(&m_Ticker);
uv_close((uv_handle_t*)&m_Handle, &OnClosed);
}
static void
OnAccept(uv_stream_t* stream, int status)
{
if(status == 0)
{
static_cast< conn_glue* >(stream->data)->Accept();
}
else
{
llarp::LogError("tcp accept failed: ", uv_strerror(status));
}
}
static void
OnTick(uv_timer_t* t)
{
static_cast< conn_glue* >(t->data)->Tick();
uv_timer_again(t);
}
void
Tick()
{
if(m_Accept && m_Accept->tick)
m_Accept->tick(m_Accept);
if(m_Conn.tick)
m_Conn.tick(&m_Conn);
}
void
Start()
{
auto result = uv_timer_start((uv_timer_t*)&m_Ticker, &OnTick, 10, 10);
if(result)
llarp::LogError("failed to start timer ", uv_strerror(result));
result = uv_read_start(Stream(), &Alloc, &OnRead);
if(result)
llarp::LogError("failed to start reader ", uv_strerror(result));
}
void
Accept()
{
if(m_Accept && m_Accept->accepted)
{
conn_glue* child = new conn_glue(this);
llarp::LogDebug("accepted new connection");
child->m_Conn.impl = child;
child->m_Conn.loop = m_Accept->loop;
child->m_Conn.close = &ExplicitClose;
child->m_Conn.write = &ExplicitWrite;
auto res = uv_accept(Stream(), child->Stream());
if(res)
{
llarp::LogError("failed to accept tcp connection ", uv_strerror(res));
child->Close();
return;
}
m_Accept->accepted(m_Accept, &child->m_Conn);
child->Start();
}
}
bool
Server()
{
m_Accept->close = &ExplicitCloseAccept;
return uv_tcp_bind(&m_Handle, m_Addr, 0) == 0
&& uv_listen(Stream(), 5, &OnAccept) == 0;
}
};
struct udp_glue
{
uv_udp_t m_Handle;
uv_check_t m_Ticker;
llarp_udp_io* const m_UDP;
llarp::Addr m_Addr;
bool gotpkts;
udp_glue(uv_loop_t* loop, llarp_udp_io* udp, const sockaddr* src)
: m_UDP(udp), m_Addr(*src)
{
m_Handle.data = this;
m_Ticker.data = this;
gotpkts = false;
uv_udp_init(loop, &m_Handle);
uv_check_init(loop, &m_Ticker);
}
static void
Alloc(uv_handle_t*, size_t suggested_size, uv_buf_t* buf)
{
buf->base = new char[suggested_size];
buf->len = suggested_size;
}
static void
OnRecv(uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf,
const struct sockaddr* addr, unsigned)
{
if(addr)
static_cast< udp_glue* >(handle->data)->RecvFrom(nread, buf, addr);
delete[] buf->base;
}
void
RecvFrom(ssize_t sz, const uv_buf_t* buf, const struct sockaddr* fromaddr)
{
if(sz >= 0 && m_UDP && m_UDP->recvfrom)
{
const size_t pktsz = sz;
const llarp_buffer_t pkt{(const byte_t*)buf->base, pktsz};
m_UDP->recvfrom(m_UDP, fromaddr, ManagedBuffer{pkt});
gotpkts = true;
}
}
static void
OnTick(uv_check_t* t)
{
static_cast< udp_glue* >(t->data)->Tick();
}
void
Tick()
{
if(m_UDP && m_UDP->tick)
m_UDP->tick(m_UDP);
}
static int
SendTo(llarp_udp_io* udp, const sockaddr* to, const byte_t* ptr, size_t sz)
{
udp_glue* self = static_cast< udp_glue* >(udp->impl);
uv_buf_t buf = uv_buf_init((char*)ptr, sz);
return uv_udp_try_send(&self->m_Handle, &buf, 1, to);
}
bool
Bind()
{
auto ret = uv_udp_bind(&m_Handle, m_Addr, 0);
if(ret)
{
llarp::LogError("failed to bind to ", m_Addr, " ", uv_strerror(ret));
return false;
}
if(uv_udp_recv_start(&m_Handle, &Alloc, &OnRecv))
{
llarp::LogError("failed to start recving packets via ", m_Addr);
return false;
}
if(uv_check_start(&m_Ticker, &OnTick))
{
llarp::LogError("failed to start ticker");
return false;
}
if(uv_fileno((const uv_handle_t*)&m_Handle, &m_UDP->fd))
return false;
m_UDP->sendto = &SendTo;
return true;
}
static void
OnClosed(uv_handle_t* h)
{
udp_glue* glue = static_cast< udp_glue* >(h->data);
glue->m_UDP->impl = nullptr;
delete glue;
}
void
Close()
{
uv_check_stop(&m_Ticker);
uv_close((uv_handle_t*)&m_Handle, &OnClosed);
}
};
struct tun_glue
{
uv_poll_t m_Handle;
uv_check_t m_Ticker;
llarp_tun_io* const m_Tun;
device* const m_Device;
byte_t m_Buffer[1500];
bool readpkt;
tun_glue(llarp_tun_io* tun) : m_Tun(tun), m_Device(tuntap_init())
{
m_Handle.data = this;
m_Ticker.data = this;
readpkt = false;
}
~tun_glue()
{
tuntap_destroy(m_Device);
}
static void
OnTick(uv_check_t* timer)
{
static_cast< tun_glue* >(timer->data)->Tick();
}
static void
OnPoll(uv_poll_t* h, int, int events)
{
if(events & UV_READABLE)
{
static_cast< tun_glue* >(h->data)->Read();
}
}
void
Read()
{
auto sz = tuntap_read(m_Device, m_Buffer, sizeof(m_Buffer));
if(sz > 0)
{
llarp::LogDebug("tun read ", sz);
llarp_buffer_t pkt(m_Buffer, sz);
if(m_Tun && m_Tun->recvpkt)
m_Tun->recvpkt(m_Tun, pkt);
}
}
void
Tick()
{
if(m_Tun->before_write)
m_Tun->before_write(m_Tun);
if(m_Tun->tick)
m_Tun->tick(m_Tun);
}
static void
OnClosed(uv_handle_t* h)
{
tun_glue* self = static_cast< tun_glue* >(h->data);
delete self;
}
void
Close()
{
uv_close((uv_handle_t*)&m_Handle, &OnClosed);
}
bool
Write(const byte_t* pkt, size_t sz)
{
return tuntap_write(m_Device, (void*)pkt, sz) != -1;
}
static bool
WritePkt(llarp_tun_io* tun, const byte_t* pkt, size_t sz)
{
return static_cast< tun_glue* >(tun->impl)->Write(pkt, sz);
}
bool
Init(uv_loop_t* loop)
{
strncpy(m_Device->if_name, m_Tun->ifname, sizeof(m_Device->if_name));
if(tuntap_start(m_Device, TUNTAP_MODE_TUNNEL, 0) == -1)
{
llarp::LogError("failed to start up ", m_Tun->ifname);
return false;
}
if(tuntap_set_ip(m_Device, m_Tun->ifaddr, m_Tun->ifaddr, m_Tun->netmask)
== -1)
{
llarp::LogError("failed to set address on ", m_Tun->ifname);
return false;
}
if(tuntap_up(m_Device) == -1)
{
llarp::LogError("failed to put up ", m_Tun->ifname);
return false;
}
if(m_Device->tun_fd == -1)
{
llarp::LogError("tun interface ", m_Tun->ifname,
" has invalid fd: ", m_Device->tun_fd);
return false;
}
if(uv_poll_init(loop, &m_Handle, m_Device->tun_fd) == -1)
{
llarp::LogError("failed to start polling on ", m_Tun->ifname);
return false;
}
if(uv_poll_start(&m_Handle, UV_READABLE, &OnPoll))
{
llarp::LogError("failed to start polling on ", m_Tun->ifname);
return false;
}
if(uv_check_init(loop, &m_Ticker) != 0
|| uv_check_start(&m_Ticker, &OnTick) != 0)
{
llarp::LogError("failed to set up tun interface timer for ",
m_Tun->ifname);
return false;
}
m_Tun->writepkt = &WritePkt;
return true;
}
};
bool
Loop::init()
{
m_Impl.reset(uv_loop_new());
if(uv_loop_init(m_Impl.get()) == -1)
return false;
uv_loop_configure(m_Impl.get(), UV_LOOP_BLOCK_SIGNAL, SIGPIPE);
m_TickTimer.data = this;
m_Run.store(true);
return uv_timer_init(m_Impl.get(), &m_TickTimer) != -1;
}
void
Loop::update_time()
{
uv_update_time(m_Impl.get());
}
bool
Loop::running() const
{
return m_Run.load();
}
llarp_time_t
Loop::time_now() const
{
return llarp::time_now_ms();
}
bool
Loop::tcp_connect(llarp_tcp_connecter* tcp, const sockaddr* addr)
{
conn_glue* impl = new conn_glue(m_Impl.get(), tcp, addr);
tcp->impl = impl;
if(impl->ConnectAsync())
return true;
delete impl;
tcp->impl = nullptr;
return false;
}
static void
OnTickTimeout(uv_timer_t* timer)
{
uv_stop(timer->loop);
}
int
Loop::tick(int ms)
{
uv_timer_start(&m_TickTimer, &OnTickTimeout, ms, 0);
uv_run(m_Impl.get(), UV_RUN_ONCE);
return 0;
}
void
Loop::stop()
{
llarp::LogInfo("stopping event loop");
m_Run.store(false);
}
bool
Loop::udp_listen(llarp_udp_io* udp, const sockaddr* src)
{
udp_glue* impl = new udp_glue(m_Impl.get(), udp, src);
udp->impl = impl;
if(impl->Bind())
{
m_CloseFuncs.emplace_back(std::bind(&udp_glue::Close, impl));
return true;
}
return false;
}
bool
Loop::udp_close(llarp_udp_io* udp)
{
if(udp == nullptr)
return false;
udp_glue* glue = static_cast< udp_glue* >(udp->impl);
if(glue == nullptr)
return false;
glue->Close();
return true;
}
bool
Loop::tun_listen(llarp_tun_io* tun)
{
tun_glue* glue = new tun_glue(tun);
tun->impl = glue;
if(glue->Init(m_Impl.get()))
{
m_CloseFuncs.emplace_back(std::bind(&tun_glue ::Close, glue));
return true;
}
delete glue;
return false;
}
bool
Loop::tcp_listen(llarp_tcp_acceptor* tcp, const sockaddr* addr)
{
conn_glue* glue = new conn_glue(m_Impl.get(), tcp, addr);
tcp->impl = glue;
if(glue->Server())
return true;
tcp->impl = nullptr;
delete glue;
return false;
}
} // namespace libuv
llarp_ev_loop_ptr
llarp_make_uv_loop()
{
auto loop = std::make_shared< libuv::Loop >();
if(loop->init())
return loop;
return nullptr;
}

104
llarp/ev/ev_libuv.hpp Normal file
View File

@ -0,0 +1,104 @@
#ifndef LLARP_EV_LIBUV_HPP
#include <ev/ev.hpp>
#include <uv.h>
#include <vector>
#include <functional>
namespace libuv
{
struct Loop final : public llarp_ev_loop
{
bool
init() override;
int
run() override
{
return -1;
}
bool
running() const override;
void
update_time() override;
llarp_time_t
time_now() const override;
/// return false on socket error (non blocking)
bool
tcp_connect(llarp_tcp_connecter* tcp, const sockaddr* addr) override;
int
tick(int ms) override;
void
stop() override;
void
stopped() override
{
for(const auto& func : m_CloseFuncs)
func();
m_CloseFuncs.clear();
llarp::LogInfo("event loop stopped");
}
bool
udp_listen(llarp_udp_io* l, const sockaddr* src) override;
bool
udp_close(llarp_udp_io* l) override;
/// deregister event listener
bool
close_ev(llarp::ev_io*) override
{
return true;
}
bool
tun_listen(llarp_tun_io* tun) override;
llarp::ev_io*
create_tun(llarp_tun_io*) override
{
return nullptr;
}
bool
tcp_listen(llarp_tcp_acceptor* tcp, const sockaddr* addr) override;
llarp::ev_io*
bind_tcp(llarp_tcp_acceptor*, const sockaddr*) override
{
return nullptr;
}
/// register event listener
bool
add_ev(llarp::ev_io*, bool) override
{
return false;
}
private:
struct DestructLoop
{
void
operator()(uv_loop_t* l) const
{
uv_loop_close(l);
}
};
std::unique_ptr< uv_loop_t, DestructLoop > m_Impl;
uv_timer_t m_TickTimer;
std::atomic< bool > m_Run;
std::vector< std::function< void(void) > > m_CloseFuncs;
};
} // namespace libuv
#endif

View File

@ -211,11 +211,12 @@ namespace llarp
}
bool
BaseSession::HandleTrafficDrop(llarp::path::Path_ptr, const PathID_t& path,
uint64_t s)
BaseSession::HandleTrafficDrop(llarp::path::Path_ptr p,
const PathID_t& path, uint64_t s)
{
llarp::LogError("dropped traffic on exit ", m_ExitRouter, " S=", s,
" P=", path);
p->EnterState(path::ePathIgnore, router->Now());
return true;
}

View File

@ -283,6 +283,13 @@ namespace llarp
bool
ExitEndpoint::Start()
{
// map our address
const PubKey us(m_Router->pubkey());
const huint32_t ip = GetIfAddr();
m_KeyToIP[us] = ip;
m_IPToKey[ip] = us;
m_IPActivity[ip] = std::numeric_limits< llarp_time_t >::max();
m_SNodeKeys.insert(us);
if(m_ShouldInitTun)
{
auto loop = GetRouter()->netloop();
@ -560,7 +567,12 @@ namespace llarp
huint32_t
ExitEndpoint::ObtainServiceNodeIP(const RouterID &other)
{
PubKey pubKey(other);
const PubKey pubKey(other);
const PubKey us(m_Router->pubkey());
// just in case
if(pubKey == us)
return m_IfAddr;
huint32_t ip = GetIPForIdent(pubKey);
if(m_SNodeKeys.emplace(pubKey).second)
{

View File

@ -684,9 +684,10 @@ namespace llarp
}
else
{
sendFunc = std::bind(&TunEndpoint::SendToServiceOrQueue, this,
itr->second.as_array(), std::placeholders::_1,
service::eProtocolTraffic);
sendFunc =
std::bind(&TunEndpoint::SendToServiceOrQueue, this,
service::Address(itr->second.as_array()),
std::placeholders::_1, service::eProtocolTraffic);
}
// prepare packet for insertion into network
// this includes clearing IP addresses, recalculating checksums, etc

View File

@ -16,6 +16,11 @@
static const char skiplist_subdirs[] = "0123456789abcdef";
static const std::string RC_FILE_EXT = ".signed";
llarp_nodedb::NetDBEntry::NetDBEntry(const llarp::RouterContact &value)
: rc(value), inserted(llarp::time_now_ms())
{
}
bool
llarp_nodedb::Remove(const llarp::RouterID &pk)
{
@ -45,7 +50,7 @@ llarp_nodedb::Get(const llarp::RouterID &pk, llarp::RouterContact &result)
auto itr = entries.find(pk);
if(itr == entries.end())
return false;
result = itr->second;
result = itr->second.rc;
return true;
}
@ -66,9 +71,9 @@ llarp_nodedb::RemoveIf(
auto itr = entries.begin();
while(itr != entries.end())
{
if(filter(itr->second))
if(filter(itr->second.rc))
{
files.insert(getRCFilePath(itr->second.pubkey));
files.insert(getRCFilePath(itr->second.rc.pubkey));
itr = entries.erase(itr);
}
else
@ -228,28 +233,25 @@ llarp_nodedb::visit(std::function< bool(const llarp::RouterContact &) > visit)
auto itr = entries.begin();
while(itr != entries.end())
{
if(!visit(itr->second))
if(!visit(itr->second.rc))
return;
++itr;
}
}
bool
llarp_nodedb::iterate(llarp_nodedb_iter &i)
void
llarp_nodedb::VisitInsertedAfter(
std::function< void(const llarp::RouterContact &) > visit,
llarp_time_t insertedAfter)
{
i.index = 0;
llarp::util::Lock lock(&access);
auto itr = entries.begin();
while(itr != entries.end())
{
i.rc = &itr->second;
i.visit(&i);
// advance
i.index++;
itr++;
if(itr->second.inserted > insertedAfter)
visit(itr->second.rc);
++itr;
}
return true;
}
/*
@ -382,13 +384,6 @@ llarp_nodedb::load_dir(const char *dir)
return Load(dir);
}
int
llarp_nodedb::iterate_all(struct llarp_nodedb_iter i)
{
iterate(i);
return num_loaded();
}
/// maybe rename to verify_and_set
void
llarp_nodedb_async_verify(struct llarp_async_verify_rc *job)
@ -429,9 +424,9 @@ llarp_nodedb::select_random_exit(llarp::RouterContact &result)
std::advance(itr, idx - 1);
while(itr != entries.end())
{
if(itr->second.IsExit())
if(itr->second.rc.IsExit())
{
result = itr->second;
result = itr->second.rc;
return true;
}
++itr;
@ -440,9 +435,9 @@ llarp_nodedb::select_random_exit(llarp::RouterContact &result)
itr = entries.begin();
while(idx--)
{
if(itr->second.IsExit())
if(itr->second.rc.IsExit())
{
result = itr->second;
result = itr->second.rc;
return true;
}
++itr;
@ -470,11 +465,11 @@ llarp_nodedb::select_random_hop(const llarp::RouterContact &prev,
auto start = itr;
while(itr == entries.end())
{
if(prev.pubkey != itr->second.pubkey)
if(prev.pubkey != itr->second.rc.pubkey)
{
if(itr->second.addrs.size() && !itr->second.IsExpired(now))
if(itr->second.rc.addrs.size() && !itr->second.rc.IsExpired(now))
{
result = itr->second;
result = itr->second.rc;
return true;
}
}
@ -483,11 +478,11 @@ llarp_nodedb::select_random_hop(const llarp::RouterContact &prev,
itr = entries.begin();
while(itr != start)
{
if(prev.pubkey != itr->second.pubkey)
if(prev.pubkey != itr->second.rc.pubkey)
{
if(itr->second.addrs.size() && !itr->second.IsExpired(now))
if(itr->second.rc.addrs.size() && !itr->second.rc.IsExpired(now))
{
result = itr->second;
result = itr->second.rc;
return true;
}
}
@ -518,9 +513,9 @@ llarp_nodedb::select_random_hop_excluding(
{
if(exclude.count(itr->first) == 0)
{
if(itr->second.addrs.size() && !itr->second.IsExpired(now))
if(itr->second.rc.addrs.size() && !itr->second.rc.IsExpired(now))
{
result = itr->second;
result = itr->second.rc;
return true;
}
}
@ -531,9 +526,9 @@ llarp_nodedb::select_random_hop_excluding(
{
if(exclude.count(itr->first) == 0)
{
if(itr->second.addrs.size() && !itr->second.IsExpired(now))
if(itr->second.rc.addrs.size() && !itr->second.rc.IsExpired(now))
{
result = itr->second;
result = itr->second.rc;
return true;
}
}

View File

@ -56,9 +56,19 @@ struct llarp_nodedb
llarp::thread::ThreadPool *disk;
mutable llarp::util::Mutex access; // protects entries
std::unordered_map< llarp::RouterID, llarp::RouterContact,
llarp::RouterID::Hash >
entries GUARDED_BY(access);
struct NetDBEntry
{
const llarp::RouterContact rc;
const llarp_time_t inserted;
NetDBEntry(const llarp::RouterContact &data);
};
using NetDBMap_t =
std::unordered_map< llarp::RouterID, NetDBEntry, llarp::RouterID::Hash >;
NetDBMap_t entries GUARDED_BY(access);
fs::path nodePath;
bool
@ -104,9 +114,6 @@ struct llarp_nodedb
visit(std::function< bool(const llarp::RouterContact &) > visit)
LOCKS_EXCLUDED(access);
bool
iterate(llarp_nodedb_iter &i) LOCKS_EXCLUDED(access);
void
set_dir(const char *dir);
@ -115,8 +122,10 @@ struct llarp_nodedb
ssize_t
store_dir(const char *dir);
int
iterate_all(llarp_nodedb_iter i);
/// visit all entries inserted into nodedb cache after a timestamp
void
VisitInsertedAfter(std::function< void(const llarp::RouterContact &) > visit,
llarp_time_t insertedAfter) LOCKS_EXCLUDED(access);
size_t
num_loaded() const LOCKS_EXCLUDED(access);

View File

@ -535,6 +535,8 @@ namespace llarp
case ePathExpired:
obj.Put("status", "expired");
break;
case ePathIgnore:
obj.Put("status", "ignored");
default:
obj.Put("status", "unknown");
break;

View File

@ -34,6 +34,7 @@ namespace llarp
ePathBuilding,
ePathEstablished,
ePathTimeout,
ePathIgnore,
ePathExpired
};

View File

@ -219,6 +219,11 @@ namespace llarp
virtual bool
HasSessionTo(const RouterID &router) const = 0;
/// return true if we are currently looking up this router either directly
/// or via an anonymous endpoint
virtual bool
HasPendingRouterLookup(const RouterID &router) const = 0;
virtual util::StatusObject
ExtractStatus() const = 0;

View File

@ -511,7 +511,7 @@ namespace llarp
Router::Close()
{
LogInfo("closing router");
llarp_ev_loop_stop(_netloop.get());
llarp_ev_loop_stop(_netloop);
inboundLinks.clear();
outboundLinks.clear();
disk.stop();
@ -1145,30 +1145,72 @@ namespace llarp
}
void
Router::ServiceNodeLookupRouterWhenExpired(RouterID router)
Router::LookupRouterWhenExpired(RouterID router)
{
using namespace std::placeholders;
dht()->impl->LookupRouter(
router,
std::bind(&Router::HandleDHTLookupForExplore, this, router, _1));
LookupRouter(router,
std::bind(&Router::HandleRouterLookupForExpireUpdate, this,
router, std::placeholders::_1));
}
void
Router::HandleRouterLookupForExpireUpdate(
RouterID router, const std::vector< RouterContact > &result)
{
const auto now = Now();
RouterContact current;
if(nodedb()->Get(router, current))
{
if(current.IsExpired(now))
{
nodedb()->Remove(router);
}
}
if(result.size() == 1 && !result[0].IsExpired(now))
{
LogInfo("storing rc for ", router);
nodedb()->Insert(result[0]);
}
else
{
LogInfo("not storing rc for ", router);
}
}
bool
Router::HasPendingRouterLookup(const RouterID &remote) const
{
if(IsServiceNode())
return dht()->impl->HasRouterLookup(remote);
bool has = false;
_hiddenServiceContext.ForEachService(
[&has, remote](const std::string &,
const std::shared_ptr< service::Endpoint > &ep) -> bool {
has |= ep->HasPendingRouterLookup(remote);
return true;
});
return has;
}
void
Router::LookupRouter(RouterID remote, RouterLookupHandler resultHandler)
{
if(!resultHandler)
{
resultHandler = std::bind(&Router::HandleRouterLookupForExpireUpdate,
this, remote, std::placeholders::_1);
}
if(IsServiceNode())
{
if(resultHandler)
dht()->impl->LookupRouter(remote, resultHandler);
else
ServiceNodeLookupRouterWhenExpired(remote);
return;
dht()->impl->LookupRouter(remote, resultHandler);
}
else
{
_hiddenServiceContext.ForEachService(
[=](const std::string &,
const std::shared_ptr< service::Endpoint > &ep) -> bool {
return !ep->LookupRouterAnon(remote, resultHandler);
});
}
_hiddenServiceContext.ForEachService(
[=](const std::string &,
const std::shared_ptr< service::Endpoint > &ep) -> bool {
return !ep->LookupRouterAnon(remote, resultHandler);
});
}
bool
@ -1190,6 +1232,22 @@ namespace llarp
routerProfiling().Tick();
// try looking up stale routers
nodedb()->VisitInsertedAfter(
[&](const RouterContact &rc) {
if(HasPendingRouterLookup(rc.pubkey))
return;
LookupRouter(rc.pubkey, nullptr);
},
RouterContact::UpdateInterval + now);
std::set< RouterID > removeStale;
// remove stale routers
nodedb()->VisitInsertedAfter(
[&](const RouterContact &rc) { removeStale.insert(rc.pubkey); },
((RouterContact::UpdateInterval * 3) / 2) + now);
nodedb()->RemoveIf([removeStale](const RouterContact &rc) -> bool {
return removeStale.count(rc.pubkey) > 0;
});
if(IsServiceNode())
{
if(_rc.ExpiresSoon(now, randint() % 10000)
@ -1208,14 +1266,6 @@ namespace llarp
return !ConnectionToRouterAllowed(rc.pubkey);
});
*/
// only do this as service node
// client endpoints do this on their own
nodedb()->visit([&](const RouterContact &rc) -> bool {
if(rc.ExpiresSoon(now, randint() % 10000))
ServiceNodeLookupRouterWhenExpired(rc.pubkey);
return true;
});
}
else
{
@ -1848,10 +1898,10 @@ namespace llarp
bool
Router::HasSessionTo(const RouterID &remote) const
{
for(const auto & link : outboundLinks)
for(const auto &link : outboundLinks)
if(link->HasSessionTo(remote))
return true;
for(const auto & link : inboundLinks)
for(const auto &link : inboundLinks)
if(link->HasSessionTo(remote))
return true;
return false;

View File

@ -391,6 +391,9 @@ namespace llarp
bool
HasPendingConnectJob(const RouterID &remote);
bool
HasPendingRouterLookup(const RouterID &remote) const override;
void
try_connect(fs::path rcfile);
@ -431,14 +434,18 @@ namespace llarp
void
TryEstablishTo(const RouterID &remote);
/// lookup a router by pubkey when it expires when we are a service node
/// lookup a router by pubkey when it expires
void
ServiceNodeLookupRouterWhenExpired(RouterID remote);
LookupRouterWhenExpired(RouterID remote);
void
HandleDHTLookupForExplore(
RouterID remote, const std::vector< RouterContact > &results) override;
void
HandleRouterLookupForExpireUpdate(
RouterID remote, const std::vector< RouterContact > &results);
void
ForEachPeer(std::function< void(const ILinkSession *, bool) > visit,
bool randomize = false) const override;

View File

@ -31,6 +31,8 @@ namespace llarp
/// 1 day for real network
llarp_time_t RouterContact::Lifetime = 24 * 60 * 60 * 1000;
#endif
/// every 30 minutes an RC is stale and needs updating
llarp_time_t RouterContact::UpdateInterval = 30 * 60 * 1000;
NetID::NetID(const byte_t *val) : AlignedBuffer< 8 >()
{
@ -231,28 +233,15 @@ namespace llarp
bool
RouterContact::IsExpired(llarp_time_t now) const
{
/*
auto expiresAt = last_updated + Lifetime;
const auto expiresAt = last_updated + Lifetime;
return now >= expiresAt;
*/
(void)now;
return false;
}
bool
RouterContact::ExpiresSoon(llarp_time_t now, llarp_time_t dlt) const
{
(void)now;
(void)dlt;
return false;
/*
if(IsExpired(now))
{
return true;
}
auto expiresAt = last_updated + Lifetime;
return expiresAt - now <= dlt;
*/
const auto expiresAt = last_updated + Lifetime;
return expiresAt <= now || expiresAt - now <= dlt;
}
std::string
@ -278,7 +267,7 @@ namespace llarp
}
bool
RouterContact::Verify(llarp_time_t now) const
RouterContact::Verify(llarp_time_t now, bool allowExpired) const
{
if(netID != NetID::DefaultValue())
{
@ -288,8 +277,12 @@ namespace llarp
}
if(IsExpired(now))
{
llarp::LogError("RC is expired");
return false;
if(!allowExpired)
{
llarp::LogError("RC is expired");
return false;
}
llarp::LogWarn("RC is expired");
}
for(const auto &a : addrs)
{

View File

@ -69,6 +69,7 @@ namespace llarp
static bool IgnoreBogons;
static llarp_time_t Lifetime;
static llarp_time_t UpdateInterval;
RouterContact()
{
@ -164,7 +165,7 @@ namespace llarp
SetNick(const std::string &nick);
bool
Verify(llarp_time_t now) const;
Verify(llarp_time_t now, bool allowExpired = true) const;
bool
Sign(const llarp::SecretKey &secret);
@ -173,6 +174,7 @@ namespace llarp
bool
ExpiresSoon(llarp_time_t now, llarp_time_t dlt = 60000) const;
/// returns true if this RC is expired and should be removed
bool
IsExpired(llarp_time_t now) const;

View File

@ -186,6 +186,12 @@ namespace llarp
return true;
}
bool
Endpoint::HasPendingRouterLookup(const RouterID remote) const
{
return m_PendingRouters.find(remote) != m_PendingRouters.end();
}
bool
Endpoint::IntrosetIsStale() const
{
@ -821,10 +827,11 @@ namespace llarp
Endpoint::HandleDataMessage(const PathID_t& src,
std::shared_ptr< ProtocolMessage > msg)
{
msg->sender.UpdateAddr();
auto path = GetPathByID(src);
if(path)
PutReplyIntroFor(msg->tag, path->intro);
msg->sender.UpdateAddr();
PutSenderFor(msg->tag, msg->sender);
PutIntroFor(msg->tag, msg->introReply);
EnsureReplyPath(msg->sender);
return ProcessDataMessage(msg);
@ -1070,11 +1077,9 @@ namespace llarp
}
bool
Endpoint::SendToServiceOrQueue(const RouterID& addr,
Endpoint::SendToServiceOrQueue(const service::Address& remote,
const llarp_buffer_t& data, ProtocolType t)
{
service::Address remote(addr.as_array());
// inbound converstation
auto now = Now();
@ -1104,7 +1109,6 @@ namespace llarp
if(p)
{
f.T = tag;
break;
}
}
}
@ -1120,8 +1124,9 @@ namespace llarp
m.introReply = p->intro;
PutReplyIntroFor(f.T, m.introReply);
m.sender = m_Identity.pub;
m.seqno = GetSeqNoForConvo(f.T);
f.S = 1;
f.F = m.introReply.pathID;
f.S = GetSeqNoForConvo(f.T);
transfer->P = remoteIntro.pathID;
if(!f.EncryptAndSign(m, K, m_Identity))
{

View File

@ -207,8 +207,8 @@ namespace llarp
HandlePathBuilt(path::Path_ptr path) override;
bool
SendToServiceOrQueue(const RouterID& addr, const llarp_buffer_t& payload,
ProtocolType t);
SendToServiceOrQueue(const service::Address& addr,
const llarp_buffer_t& payload, ProtocolType t);
bool
SendToSNodeOrQueue(const RouterID& addr, const llarp_buffer_t& payload);
@ -250,6 +250,10 @@ namespace llarp
void
EnsurePathToSNode(const RouterID& remote, SNodeEnsureHook h);
/// return true if this endpoint is trying to lookup this router right now
bool
HasPendingRouterLookup(const RouterID remote) const;
bool
HasPathToSNode(const RouterID& remote) const;

View File

@ -59,7 +59,7 @@ namespace llarp
ProtocolFrame f;
f.N.Randomize();
f.T = currentConvoTag;
f.S = m_Endpoint->GetSeqNoForConvo(f.T);
f.S = ++sequenceNo;
auto now = m_Endpoint->Now();
if(remoteIntro.ExpiresSoon(now))
@ -87,7 +87,7 @@ namespace llarp
m_DataHandler->PutIntroFor(f.T, remoteIntro);
m_DataHandler->PutReplyIntroFor(f.T, path->intro);
m.proto = t;
m.seqno = sequenceNo++;
m.seqno = m_Endpoint->GetSeqNoForConvo(f.T);
m.introReply = path->intro;
f.F = m.introReply.pathID;
m.sender = m_Endpoint->GetIdentity().pub;

View File

@ -25,6 +25,7 @@ namespace llarp
LogContext::LogContext()
: logStream(std::make_unique< Stream_t >(_LOGSTREAM_INIT))
, started(llarp::time_now_ms())
{
}
@ -35,6 +36,17 @@ namespace llarp
return ctx;
}
log_timestamp::log_timestamp() : log_timestamp("%c %Z")
{
}
log_timestamp::log_timestamp(const char* fmt)
: format(fmt)
, now(llarp::time_now_ms())
, delta(llarp::time_now_ms() - LogContext::Instance().started)
{
}
void
SetLogLevel(LogLevel lvl)
{

View File

@ -86,6 +86,9 @@ namespace llarp
LogLevel minLevel = eLogInfo;
ILogStream_ptr logStream;
std::string nodeName;
const llarp_time_t started;
static LogContext&
Instance();
};

View File

@ -45,14 +45,12 @@ namespace llarp
struct log_timestamp
{
const char* format;
const llarp_time_t now;
const llarp_time_t delta;
log_timestamp() : format("%c %Z")
{
}
log_timestamp();
explicit log_timestamp(const char* fmt) : format(fmt)
{
}
explicit log_timestamp(const char* fmt);
};
inline std::ostream&
@ -60,10 +58,11 @@ namespace llarp
{
#if defined(ANDROID) || defined(RPI)
(void)ts;
return out << time_now_ms();
return out << ts.now << " [+" << ts.delta << " ms]";
#else
absl::TimeZone tz = absl::LocalTimeZone();
return out << absl::FormatTime(ts.format, absl::Now(), tz);
return out << absl::FormatTime(ts.format, absl::FromUnixMillis(ts.now), tz)
<< " [+" << ts.delta << " ms]";
#endif
}

View File

@ -11,26 +11,32 @@ namespace llarp
struct _overloaded;
template < typename T, typename... Ts >
struct _overloaded<T, Ts...> : T, _overloaded<Ts...>
struct _overloaded< T, Ts... > : T, _overloaded< Ts... >
{
_overloaded(T&& t, Ts&&... ts) : T(t), _overloaded<Ts...>(std::forward<Ts>(ts)...) {}
_overloaded(T&& t, Ts&&... ts)
: T(t), _overloaded< Ts... >(std::forward< Ts >(ts)...)
{
}
using T::operator();
using _overloaded< Ts... >::operator();
};
template<typename T>
struct _overloaded<T> : T
template < typename T >
struct _overloaded< T > : T
{
_overloaded(T&& t) : T(t) {}
_overloaded(T&& t) : T(t)
{
}
using T::operator();
};
template < typename... Ts >
constexpr auto overloaded(Ts&&... ts)->_overloaded< Ts... >
constexpr auto
overloaded(Ts&&... ts) -> _overloaded< Ts... >
{
return _overloaded<Ts...>(std::forward<Ts>(ts)...);
return _overloaded< Ts... >(std::forward< Ts >(ts)...);
}
} // namespace util

View File

@ -43,73 +43,7 @@ namespace llarp
if(l == nullptr)
return 0;
LogDebug("utp_sendto ", Addr(*arg->address), " ", arg->len, " bytes");
// For whatever reason, the UTP_UDP_DONTFRAG flag is set
// on the socket itself....which isn't correct and causes
// winsock (at minimum) to reeee
// here, we check its value, then set fragmentation the _right_
// way. Naturally, Linux has its own special procedure.
// Of course, the flag itself is cleared. -rick
#ifndef _WIN32
// No practical method of doing this on NetBSD or Darwin
// without resorting to raw sockets
#if !(__NetBSD__ || __OpenBSD__ || (__APPLE__ && __MACH__))
#ifndef __linux__
if(arg->flags == 2)
{
int val = 1;
setsockopt(l->m_udp.fd, IPPROTO_IP, IP_DONTFRAGMENT, &val, sizeof(val));
}
else
{
int val = 0;
setsockopt(l->m_udp.fd, IPPROTO_IP, IP_DONTFRAGMENT, &val, sizeof(val));
}
#else
if(arg->flags == 2)
{
int val = IP_PMTUDISC_DO;
setsockopt(l->m_udp.fd, IPPROTO_IP, IP_MTU_DISCOVER, &val, sizeof(val));
}
else
{
int val = IP_PMTUDISC_DONT;
setsockopt(l->m_udp.fd, IPPROTO_IP, IP_MTU_DISCOVER, &val, sizeof(val));
}
#endif
#endif
arg->flags = 0;
if(::sendto(l->m_udp.fd, (char*)arg->buf, arg->len, arg->flags,
arg->address, arg->address_len)
== -1
&& errno)
#else
if(arg->flags == 2)
{
char val = 1;
setsockopt(l->m_udp.fd, IPPROTO_IP, IP_DONTFRAGMENT, &val, sizeof(val));
}
else
{
char val = 0;
setsockopt(l->m_udp.fd, IPPROTO_IP, IP_DONTFRAGMENT, &val, sizeof(val));
}
arg->flags = 0;
if(::sendto(l->m_udp.fd, (char*)arg->buf, arg->len, arg->flags,
arg->address, arg->address_len)
== -1)
#endif
{
#ifdef _WIN32
char buf[1024];
int err = WSAGetLastError();
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, nullptr, err, LANG_NEUTRAL,
buf, 1024, nullptr);
LogError("sendto failed: ", buf);
#else
LogError("sendto failed: ", strerror(errno));
#endif
}
return 0;
return l->m_udp.sendto(&l->m_udp, arg->address, arg->buf, arg->len);
}
uint64

View File

@ -67,12 +67,13 @@ Build requirements:
* CMake
* C++ 17 capable C++ compiler
* gcovr (if generating test coverage with gcc)
* libuv >= 1.27.0
### Linux
build:
$ sudo apt install build-essential cmake git libcap-dev wget
$ sudo apt install build-essential cmake git libcap-dev curl libuv1-dev
$ git clone https://github.com/loki-project/loki-network
$ cd loki-network
$ make -j8
@ -153,7 +154,7 @@ TODO: add pkgsrc instructions
build:
# pkg_add wget cmake git (optional: ninja ccache)
# pkg_add curl cmake git (optional: ninja ccache)
$ git clone https://github.com/loki-project/loki-network
$ cd loki-network
$ gmake -j8
@ -166,7 +167,7 @@ install (root):
build:
$ pkg install wget cmake git
$ pkg install cmake git curl libuv-1.27.0
$ git clone https://github.com/loki-project/loki-network
$ cd loki-network
$ gmake -j8

55
release.md Normal file
View File

@ -0,0 +1,55 @@
The windows release is signed by rick, his public key is:
```
-----BEGIN PGP PUBLIC KEY BLOCK-----
mDMEXBhAthYJKwYBBAHaRw8BAQdACj8fXcXB+ktPL/gNRBGZajE9ycsQOiMPXigH
0uP6BCW0G1JpY2sgViA8cmlja0Bzbm93bGlnaHQubmV0PoiQBBMWCAA4FiEEsbLw
yIc/Y1IT8TNLwO3Icj/cNGUFAlwYQLYCGyMFCwkIBwIGFQoJCAsCBBYCAwECHgEC
F4AACgkQwO3Icj/cNGUeCwEAuyFfehigul3So0xOuRIxldiHoqLJfSEp4kjU+8b5
NjsBAIOC4KFpdv8CTPa/aQgRIx/UlOjJ8vMnS94XPSs2vRcDuDgEXBhAthIKKwYB
BAGXVQEFAQEHQKT2GHP2O+q5vgXd6D4IiOu8rI+kcGllVY/0DEqGesJYAwEIB4h4
BBgWCAAgFiEEsbLwyIc/Y1IT8TNLwO3Icj/cNGUFAlwYQLYCGwwACgkQwO3Icj/c
NGX/tgD9GES37acIhovhMzDj0u9oU/1HqNyx4A45EQ90dP8KMN4BALBRzWXgB23t
9r6g3ZWHQJEpF4RnmcbDbR0SxdyoCkQG
=RdBx
-----END PGP PUBLIC KEY BLOCK-----
```
The linux and macos releases are signed by jeff, his public key is:
```
-----BEGIN PGP PUBLIC KEY BLOCK-----
mDMEWZx2ERYJKwYBBAHaRw8BAQdAKxsq4dGzYzKJqU8Vin5d8vJF10/NG4Hziw+f
WTbM8nC0MEplZmYgQmVja2VyIChwcm9iYWJseSBub3QgZXZpbCkgPGplZmZAaTJw
LnJvY2tzPoh5BBMWCAAhBQJZnHYRAhsDBQsJCAcCBhUICQoLAgQWAgMBAh4BAheA
AAoJEPNXs7Qvb5sFP2MBAIcL8KOd/RupEtSMyb2f4OBsaE8oFU+NsvfevW0XrBBQ
AQDhjax9f2D0k30pj4uYBJRb/L0JJFfbzI+uwgTtgRp1DLg4BFmcdhESCisGAQQB
l1UBBQEBB0BJOuegxPmX1Ma/nv4O2lZp0rA89EazPgtUrR3e1846DQMBCAeIYQQY
FggACQUCWZx2EQIbDAAKCRDzV7O0L2+bBUgkAPsEeiiut+gGECP/63m7NyTwruNP
oVZUYE1m8XXbHr28UgEA4nXGIAHDRuIUY4sRcVQz2Um9O6kaCdQHH0eSPE48VQ8=
=gFkp
-----END PGP PUBLIC KEY BLOCK-----
```
To verify the releases first import those keys into gpg key database
run the following, copy paste both keys and press `^D` (control - D) to finish
$ gpg --import
Alternatively you can get jeff's key off a key server:
$ gpg --recv-key 67EF6BA68E7B0B0D6EB4F7D4F357B3B42F6F9B05 # jeff's key
then verify the signatures, make sure that the `.sig` file and the release file
are in the same directory.
$ gpg --verify release-file.exe.sig

View File

@ -69,3 +69,7 @@ else()
target_sources(${TEST_EXE} PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/win32/test.rc")
target_link_libraries(${TEST_EXE} PUBLIC ${FS_LIB} ws2_32 iphlpapi shlwapi)
endif(NOT WIN32)
if(${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD")
target_link_directories(${TEST_EXE} PRIVATE /usr/local/lib)
endif()

View File

@ -42,7 +42,7 @@ struct EventLoopTest : public ::testing::Test
void
Stop()
{
llarp_ev_loop_stop(loop.get());
llarp_ev_loop_stop(loop);
}
void

View File

@ -121,7 +121,7 @@ struct LinkLayerTest : public test::LlarpTest< NoOpCrypto >
oldRCLifetime = RouterContact::Lifetime;
RouterContact::IgnoreBogons = true;
RouterContact::Lifetime = 500;
netLoop = llarp_make_ev_loop();
netLoop = llarp_make_uv_loop();
m_logic.reset(new Logic());
}
@ -141,6 +141,7 @@ struct LinkLayerTest : public test::LlarpTest< NoOpCrypto >
{
if(left)
return;
llarp::LogInfo("timed out test");
static_cast< LinkLayerTest* >(u)->Stop();
}
@ -154,7 +155,7 @@ struct LinkLayerTest : public test::LlarpTest< NoOpCrypto >
void
Stop()
{
llarp_ev_loop_stop(netLoop.get());
llarp_ev_loop_stop(netLoop);
}
bool

View File

@ -48,7 +48,7 @@ struct AbyssTestBase : public ::testing::Test
Start()
{
threadpool = llarp_init_same_process_threadpool();
loop = llarp_make_ev_loop();
loop = llarp_make_uv_loop();
logic = std::make_shared< llarp::Logic >(threadpool);
sockaddr_in addr;
@ -72,7 +72,7 @@ struct AbyssTestBase : public ::testing::Test
Stop()
{
llarp::LogDebug("test case Stop() called");
llarp_ev_loop_stop(loop.get());
llarp_ev_loop_stop(loop);
}
void