pull/1/head
Ryan Tharp 6 years ago
commit 7b8ef635e4

5
.gitignore vendored

@ -24,4 +24,7 @@ callgrind.*
*.sig
*.signed
*.key
*.key
shadow.config.xml
*.log

@ -1,17 +1,51 @@
cmake_minimum_required(VERSION 2.8.10)
set(WITH_SHARED OFF)
set(DEBUG_FLAGS "")
macro(add_cflags)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${ARGN}")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${ARGN}")
endmacro(add_cflags)
if(SHADOW)
set(WITH_STATIC OFF)
set(WITH_SHARED OFF)
else()
set(WITH_STATIC ON)
set(WITH_SHARED OFF)
endif()
set(DEBUG_FLAGS "-g")
set(OPTIMIZE_FLAGS "-Os")
if(ASAN)
set(DEBUG_FLAGS "${DEBUG_FLAGS} -g -fsanitize=address -fno-omit-frame-pointer")
set(DEBUG_FLAGS "${DEBUG_FLAGS} -fsanitize=address -fno-omit-frame-pointer")
set(OPTIMIZE_FLAGS "-O0")
endif(ASAN)
if(SHADOW)
if("${SHADOW_ROOT}" STREQUAL "")
set(SHADOW_ROOT "$ENV{HOME}/.shadow")
endif("${SHADOW_ROOT}" STREQUAL "")
if(EXISTS "${SHADOW_ROOT}")
message(STATUS "SHADOW_ROOT = ${SHADOW_ROOT}")
else()
message(FATAL_ERROR "SHADOW_ROOT path does not exist: '${SHADOW_ROOT}'")
endif()
set(CMAKE_MODULE_PATH "${SHADOW_ROOT}/share/cmake/Modules")
include_directories(${CMAKE_MODULE_PATH})
include(ShadowTools)
add_cflags("-fno-inline -fno-strict-aliasing")
add_definitions(-DTESTNET=true)
add_definitions(-DSHADOW_TESTNET)
include_directories(${SHADOW_ROOT}/include)
endif()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=c11 -Wall ${DEBUG_FLAGS} ${OPTIMIZE_FLAGS}")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++17 -Wall ${DEBUG_FLAGS} ${OPTIMIZE_FLAGS}")
if(SHADOW)
add_cflags("-fPIC")
endif()
if(NOT GIT_VERSION)
exec_program("git" ${CMAKE_CURRENT_SOURCE_DIR} ARGS "rev-parse --short HEAD" OUTPUT_VARIABLE GIT_VERSION)
add_definitions(-DGIT_REV="${GIT_VERSION}")
@ -22,7 +56,7 @@ if(RELEASE_MOTTO)
endif()
set(EXE llarpd)
set(EXE_SRC daemon/main.cpp)
set(EXE_SRC daemon/main.c)
if(SODIUM_INCLUDE_DIR)
include_directories(${SODIUM_INCLUDE_DIR})
@ -66,21 +100,39 @@ set(LIB_SRC
llarp/router.cpp
llarp/router_identity.c
llarp/threadpool.cpp
llarp/testnet.c
llarp/time.cpp
llarp/timer.cpp
)
include_directories(include)
add_library(${STATIC_LIB} STATIC ${LIB_SRC})
if(WITH_SHARED)
add_library(${SHARED_LIB} SHARED ${LIB_SRC})
target_link_libraries(${SHARED_LIB} ${LIBS})
endif()
add_executable(${EXE} ${EXE_SRC})
target_link_libraries(${STATIC_LIB} ${LIBS})
target_link_libraries(${EXE} ${STATIC_LIB})
if(SHADOW)
add_shadow_plugin(shadow-plugin-${SHARED_LIB} ${EXE_SRC} ${LIB_SRC})
target_link_libraries(shadow-plugin-${SHARED_LIB} ${LIBS})
install(TARGETS shadow-plugin-${SHARED_LIB} DESTINATION plugins)
else()
add_executable(rcutil daemon/rcutil.cpp)
add_executable(${EXE} ${EXE_SRC})
if(WITH_STATIC)
add_library(${STATIC_LIB} STATIC ${LIB_SRC})
target_link_libraries(${STATIC_LIB} ${LIBS})
if(NOT WITH_SHARED)
target_link_libraries(rcutil ${STATIC_LIB})
target_link_libraries(${EXE} ${STATIC_LIB})
endif()
endif()
if(WITH_SHARED)
add_library(${SHARED_LIB} SHARED ${LIB_SRC})
target_link_libraries(${SHARED_LIB} ${LIBS})
if(NOT WITH_STATIC)
target_link_libraries(rcutil ${SHARED_LIB})
target_link_libraries(${EXE} ${SHARED_LIB})
endif()
endif()
add_executable(rcutil daemon/rcutil.cpp)
target_link_libraries(rcutil ${STATIC_LIB} ${LIBS})
endif()

@ -6,24 +6,24 @@ SIGN = gpg --sign --detach
TARGETS = llarpd libllarp.so libllarp-static.a
SIGS = $(TARGETS:=.sig)
SHADOW_ROOT ?= $(HOME)/.shadow
SHADOW_BIN=$(SHADOW_ROOT)/bin/shadow
SHADOW_CONFIG=shadow.config.xml
SHADOW_PLUGIN=libshadow-plugin-llarp.so
clean:
rm -f build.ninja rules.ninja cmake_install.cmake CMakeCache.txt
rm -rf CMakeFiles
rm -f $(TARGETS)
rm -f $(SHADOW_PLUGIN) $(SHADOW_CONFIG)
rm -f *.sig
debug-configure: clean
cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DASAN=true
cmake -GNinja -DCMAKE_BUILD_TYPE=Debug
release-configure: clean
cmake -GNinja -DCMAKE_BUILD_TYPE=Release -DRELEASE_MOTTO="$(shell cat motto.txt)"
configure: clean
cmake -GNinja -DCMAKE_BUILD_TYPE=Debug
build: configure
ninja
debug: debug-configure
ninja
@ -38,5 +38,16 @@ $(TARGETS): release-compile
release: $(SIGS)
shadow-configure: clean
cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DSHADOW=ON
shadow-build: shadow-configure
ninja clean
ninja
shadow: shadow-build
python3 contrib/shadow/genconf.py $(SHADOW_CONFIG)
bash -c "$(SHADOW_BIN) -w 16 $(SHADOW_CONFIG) &> shadow.log.txt"
format:
clang-format -i $$(find daemon llarp include | grep -E '\.[h,c](pp)?$$')

@ -0,0 +1,117 @@
#!/usr/bin/env python3
import configparser
import sys
import os
from xml.etree import ElementTree as etree
getSetting = lambda s, name, fallback : name in s and s[name] or fallback
shadowRoot = getSetting(os.environ, "SHADOW_ROOT", os.path.join(os.environ['HOME'], '.shadow'))
libpath = 'libshadow-plugin-llarp.so'
def nodeconf(conf, baseDir, name, ifname, port):
conf['netdb'] = {'dir': 'tmp-nodes'}
conf['router'] = {}
conf['router']['contact-file'] = os.path.join(baseDir, '{}.signed'.format(name))
conf['router']['ident-privkey'] = os.path.join(baseDir, '{}-ident.key'.format(name))
conf['router']['transport-privkey'] = os.path.join(baseDir, '{}-transport.key'.format(name))
if ifname != '*':
conf['iwp-links'] = {'*' : port + 1000, ifname: port}
else:
conf['iwp-links'] = {ifname : port}
conf['iwp-connect'] = {}
def addPeer(conf, baseDir, peer):
conf['iwp-connect'][peer] = os.path.join(baseDir, '{}.signed'.format(peer))
def createNode(pluginName, root, peer):
node = etree.SubElement(root, 'host')
node.attrib['id'] = peer['name']
node.attrib['interfacebuffer'] = '{}'.format(1024 * 1024 * 100)
app = etree.SubElement(node, 'process')
app.attrib['plugin'] = pluginName
app.attrib['time'] = '50'
app.attrib['arguments'] = peer['configfile']
def makeBase(settings, name, id):
return {
'id': id,
'name' : name,
'contact-file' : os.path.join(getSetting(settings, 'baseDir', 'tmp'), '{}.signed'.format(name)),
'configfile' : os.path.join(getSetting(settings, 'baseDir', 'tmp'), '{}.ini'.format(name)),
'config': configparser.ConfigParser()
}
def makeClient(settings, name, id, port):
peer = makeBase(settings, name, id)
nodeconf(peer['config'], getSetting(settings, 'baseDir', 'tmp'), name, '*', port)
return peer
def makeSVCNode(settings, name, id, port):
peer = makeBase(settings, name, id)
nodeconf(peer['config'], getSetting(settings, 'baseDir', 'tmp'), name, 'eth0', port)
return peer
def genconf(settings, outf):
root = etree.Element('shadow')
topology = etree.SubElement(root, 'topology')
topology.attrib['path'] = getSetting(settings, 'topology', os.path.join(shadowRoot, 'share', 'topology.graphml.xml'))
pluginName = getSetting(settings, 'name', 'llarpd')
kill = etree.SubElement(root, 'kill')
kill.attrib['time'] = getSetting(settings, 'runFor', '600')
baseDir = getSetting(settings, 'baseDir', 'tmp')
if not os.path.exists(baseDir):
os.mkdir(baseDir)
plugin = etree.SubElement(root, "plugin")
plugin.attrib['id'] = pluginName
plugin.attrib['path'] = libpath
basePort = getSetting(settings, 'svc-base-port', 19000)
clientBasePort = getSetting(settings, 'client-base-port', 18000)
svcNodeCount = getSetting(settings, 'service-nodes', 20)
peers = list()
for nodeid in range(svcNodeCount):
peers.append(makeSVCNode(settings, 'svc-node-{}'.format(nodeid), str(nodeid), basePort + 1))
basePort += 1
# make all service nodes know each other
for peer in peers:
for nodeid in range(svcNodeCount):
if str(nodeid) != peer['id']:
addPeer(peer['config'], baseDir, 'svc-node-{}'.format(nodeid))
# add client nodes
for nodeid in range(getSetting(settings, 'client-nodes', 200)):
peer = makeClient(settings, 'client-node-{}'.format(nodeid), str(nodeid), clientBasePort +1)
peers.append(peer)
for p in range(getSetting(settings, 'client-connect-to', 3)):
addPeer(peer['config'], baseDir, 'svc-node-{}'.format((p + nodeid) % svcNodeCount))
clientBasePort += 1
# generate xml and settings files
for peer in peers:
createNode(pluginName, root, peer)
with open(peer['configfile'], 'w') as f:
peer['config'].write(f)
# render
outf.write(etree.tostring(root).decode('utf-8'))
if __name__ == '__main__':
settings = {
'topology': os.path.join(shadowRoot, 'share', 'topology.graphml.xml')
}
with open(sys.argv[1], 'w') as f:
genconf(settings, f)

@ -8,9 +8,11 @@ ident-privkey=server-ident.key
dir=./tmp-nodes
[iwp-connect]
i2p.rocks=i2p.rocks.signed
#other=other.signed
#named-node1=/path/to/routercontact1
[iwp-links]
lo=1090
*=1091
#lo=1090
#lo=eth

@ -1,6 +1,5 @@
#include <llarp.h>
#include <signal.h>
#include <memory>
struct llarp_main *ctx = 0;
@ -11,13 +10,17 @@ handle_signal(int sig)
llarp_main_signal(ctx, sig);
}
#ifndef TESTNET
#define TESTNET 0
#endif
int
main(int argc, char *argv[])
{
const char *conffname = "daemon.ini";
if(argc > 1)
conffname = argv[1];
ctx = llarp_main_init(conffname);
ctx = llarp_main_init(conffname, !TESTNET);
int code = 1;
if(ctx)
{

@ -17,7 +17,7 @@ struct llarp_main;
/** initialize application context and load config */
struct llarp_main *
llarp_main_init(const char *fname);
llarp_main_init(const char *fname, bool multiProcess);
/** handle signal for main context */
void

@ -11,10 +11,11 @@ namespace llarp
{
struct Context
{
Context(std::ostream &stdout);
Context(std::ostream &stdout, bool signleThread = false);
~Context();
int num_nethreads = 1;
int num_nethreads = 1;
bool singleThreaded = false;
std::vector< std::thread > netio_threads;
llarp_crypto crypto;
llarp_router *router = nullptr;

@ -13,34 +13,22 @@ namespace llarp
template < size_t sz >
struct AlignedBuffer
{
AlignedBuffer()
{
Zero();
}
AlignedBuffer(const AlignedBuffer& other) : AlignedBuffer(other.data())
{
}
AlignedBuffer() = default;
AlignedBuffer(const byte_t* data)
{
for(size_t idx = 0; idx < sz; ++idx)
buf.b[idx] = data[idx];
b[idx] = data[idx];
}
AlignedBuffer&
operator=(const byte_t* data)
{
for(size_t idx = 0; idx < sz; ++idx)
buf.b[idx] = data[idx];
b[idx] = data[idx];
return *this;
}
byte_t& operator[](size_t idx)
{
return buf.b[idx];
}
friend std::ostream&
operator<<(std::ostream& out, const AlignedBuffer& self)
{
@ -48,7 +36,7 @@ namespace llarp
out << std::hex << std::setw(2) << std::setfill('0');
while(idx < sz)
{
out << (int)self.buf.b[idx++];
out << (int)self.b[idx++];
}
return out << std::dec << std::setw(0) << std::setfill(' ');
}
@ -75,54 +63,54 @@ namespace llarp
Zero()
{
for(size_t idx = 0; sz < idx / 8; ++idx)
buf.l[idx] = 0;
l[idx] = 0;
}
void
Randomize()
{
randombytes(buf.b, sz);
randombytes(l, sz);
}
byte_t*
data()
{
return &buf.b[0];
return &b[0];
}
const byte_t*
data() const
{
return &buf.b[0];
return &b[0];
}
uint64_t*
data_l()
{
return &buf.l[0];
return &l[0];
}
const uint64_t*
data_l() const
{
return &buf.l[0];
return &l[0];
}
operator const byte_t*() const
{
return &buf.b[0];
return &b[0];
}
operator byte_t*()
{
return &buf.b[0];
return &b[0];
}
private:
union {
byte_t b[sz];
uint64_t l[sz / 8];
} buf;
};
};
}

@ -186,13 +186,13 @@ struct iwp_async_frame
/// a pointer to pass ourself
void *user;
/// current session key
uint8_t *sessionkey;
byte_t *sessionkey;
/// size of the frame
size_t sz;
/// result handler
iwp_async_frame_hook hook;
/// memory holding the entire frame
uint8_t buf[1500];
byte_t buf[1500];
};
/// decrypt iwp frame asynchronously

@ -52,7 +52,7 @@ struct llarp_router_lookup_job
struct llarp_dht_context* dht;
llarp_pubkey_t target;
bool found;
llarp_rc result;
struct llarp_rc result;
};
// shallow copy

@ -17,6 +17,10 @@
extern "C" {
#endif
// forward declare
struct llarp_threadpool;
struct llarp_logic;
struct llarp_ev_loop;
/// allocator
@ -31,6 +35,11 @@ llarp_ev_loop_free(struct llarp_ev_loop **ev);
int
llarp_ev_loop_run(struct llarp_ev_loop *ev);
void
llarp_ev_loop_run_single_process(struct llarp_ev_loop *ev,
struct llarp_threadpool *tp,
struct llarp_logic *logic);
/// stop event loop and wait for it to complete all jobs
void
llarp_ev_loop_stop(struct llarp_ev_loop *ev);

@ -15,7 +15,7 @@ extern "C" {
#endif
/** 2^15 bytes */
const size_t MAX_LINK_MSG_SIZE = 32768;
#define MAX_LINK_MSG_SIZE (32768)
/**
* wire layer transport interface

@ -23,6 +23,7 @@ namespace llarp
RouterID remote = {};
uint64_t version = 0;
ILinkMessage() = default;
ILinkMessage(const RouterID& id);
virtual ~ILinkMessage(){};

@ -19,7 +19,7 @@ namespace llarp
struct Logger
{
LogLevel minlevel = eLogDebug;
LogLevel minlevel = eLogInfo;
std::ostream& out = std::cout;
};
@ -52,7 +52,7 @@ namespace llarp
if(_glog.minlevel > lvl)
return;
std::stringstream ss("");
std::stringstream ss;
switch(lvl)
{
case eLogDebug:
@ -72,19 +72,23 @@ namespace llarp
ss << "[ERR] ";
break;
}
auto t = std::time(nullptr);
auto now = std::localtime(&t);
std::time_t t;
std::time(&t);
std::string tag = fname;
auto pos = tag.rfind('/');
if(pos != std::string::npos)
tag = tag.substr(pos + 1);
while(tag.size() % 8)
tag += " ";
ss << std::put_time(now, "%F %T") << " " << tag << "\t";
ss << std::put_time(std::localtime(&t), "%F %T") << " " << tag;
auto sz = tag.size() % 8;
while(sz--)
ss << " ";
ss << "\t";
LogAppend(ss, std::forward< TArgs >(args)...);
ss << (char)27 << "[0;0m";
_glog.out << ss.str() << std::endl;
#ifdef SHADOW_TESTNET
_glog.out << "\n" << std::flush;
#endif
}
}

@ -12,6 +12,14 @@ struct llarp_logic;
struct llarp_logic*
llarp_init_logic();
/// single threaded mode logic event loop
struct llarp_logic*
llarp_init_single_process_logic(struct llarp_threadpool* tp);
/// single threaded tick
void
llarp_logic_tick(struct llarp_logic* logic);
void
llarp_free_logic(struct llarp_logic** logic);

@ -4,24 +4,22 @@
namespace llarp
{
const std::size_t MAX_DISCARD_SIZE = 10000;
/// a dummy link message that is discarded
struct DiscardMessage : public ILinkMessage
{
std::vector< byte_t > Z;
~DiscardMessage()
{
}
byte_t pad[MAX_DISCARD_SIZE];
size_t sz = 0;
DiscardMessage(const RouterID& id) : ILinkMessage(id)
{
}
DiscardMessage(const RouterID& other, std::size_t padding)
: ILinkMessage(other)
DiscardMessage(std::size_t padding) : ILinkMessage()
{
Z.resize(padding);
std::fill(Z.begin(), Z.end(), 'z');
sz = padding;
memset(pad, 'z', sz);
}
virtual bool
@ -38,8 +36,10 @@ namespace llarp
{
if(!bencode_read_string(buf, &strbuf))
return false;
Z.resize(strbuf.sz);
memcpy(Z.data(), strbuf.base, strbuf.sz);
if(strbuf.sz > MAX_DISCARD_SIZE)
return false;
sz = strbuf.sz;
memcpy(pad, strbuf.base, sz);
return true;
}
return false;
@ -61,7 +61,7 @@ namespace llarp
if(!bencode_write_bytestring(buf, "z", 1))
return false;
if(!bencode_write_bytestring(buf, Z.data(), Z.size()))
if(!bencode_write_bytestring(buf, pad, sz))
return false;
return bencode_end(buf);
@ -71,7 +71,7 @@ namespace llarp
HandleMessage(llarp_router* router) const
{
(void)router;
llarp::Info("got discard message of size ", Z.size(), " bytes");
llarp::Info("got discard message of size ", sz, " bytes");
return true;
}
};

@ -5,7 +5,7 @@ namespace llarp
{
struct LinkIntroMessage : public ILinkMessage
{
LinkIntroMessage(llarp_rc* rc) : ILinkMessage({}), RC(rc)
LinkIntroMessage(llarp_rc* rc) : ILinkMessage(), RC(rc)
{
}

@ -4,6 +4,7 @@
#include <llarp/ev.h>
#include <llarp/link.h>
#include <llarp/logic.h>
#include <llarp/nodedb.h>
#include <llarp/router_contact.h>
#include <llarp/threadpool.h>

@ -7,7 +7,10 @@
extern "C" {
#endif
const size_t MAX_RC_SIZE = 1024;
// forward declare
struct llarp_alloc;
#define MAX_RC_SIZE (1024)
struct llarp_rc
{

@ -8,6 +8,11 @@ struct llarp_threadpool;
struct llarp_threadpool *
llarp_init_threadpool(int workers, const char *name);
/// for single process mode
struct llarp_threadpool *
llarp_init_same_process_threadpool();
void
llarp_free_threadpool(struct llarp_threadpool **tp);
@ -20,8 +25,24 @@ struct llarp_thread_job
void *user;
/** called in threadpool worker thread */
llarp_thread_work_func work;
#ifdef __cplusplus
llarp_thread_job(void *u, llarp_thread_work_func w) : user(u), work(w)
{
}
llarp_thread_job() : user(nullptr), work(nullptr)
{
}
#endif
};
/// for single process mode
void
llarp_threadpool_tick(struct llarp_threadpool *tp);
void
llarp_threadpool_queue_job(struct llarp_threadpool *tp,
struct llarp_thread_job j);

@ -39,6 +39,11 @@ llarp_timer_stop(struct llarp_timer_context *t);
void
llarp_timer_run(struct llarp_timer_context *t, struct llarp_threadpool *pool);
/// single threaded run timer, tick all timers
void
llarp_timer_tick_all(struct llarp_timer_context *t,
struct llarp_threadpool *pool);
void
llarp_free_timer(struct llarp_timer_context **t);

@ -20,7 +20,7 @@ namespace llarp
StackBuffer(T& stack)
{
llarp_buffer_t buff;
buff.base = stack;
buff.base = &stack[0];
buff.cur = buff.base;
buff.sz = sizeof(stack);
return buff;

@ -65,11 +65,14 @@ llarp_config_iter(struct llarp_config *conf, struct llarp_config_iterator *iter)
{
iter->conf = conf;
std::map< std::string, llarp::Config::section_t & > sections = {
{"router", conf->impl.router},
{"network", conf->impl.network},
{"iwp-connect", conf->impl.connect},
{"iwp-links", conf->impl.iwp_links},
{"netdb", conf->impl.netdb}};
for(const auto item : conf->impl.router)
iter->visit(iter, "router", item.first.c_str(), item.second.c_str());
for(const auto section : sections)
for(const auto item : section.second)
iter->visit(iter, section.first.c_str(), item.first.c_str(),

@ -10,7 +10,8 @@
namespace llarp
{
Context::Context(std::ostream &stdout) : out(stdout)
Context::Context(std::ostream &stdout, bool singleThread)
: singleThreaded(singleThread), out(stdout)
{
llarp::Info(LLARP_VERSION, " ", LLARP_RELEASE_MOTTO);
}
@ -50,7 +51,7 @@ namespace llarp
Context *ctx = static_cast< Context * >(itr->user);
if(!strcmp(section, "router"))
{
if(!strcmp(key, "worker-threads"))
if(!strcmp(key, "worker-threads") && !ctx->singleThreaded)
{
int workers = atoi(val);
if(workers > 0 && ctx->worker == nullptr)
@ -63,6 +64,8 @@ namespace llarp
ctx->num_nethreads = atoi(val);
if(ctx->num_nethreads <= 0)
ctx->num_nethreads = 1;
if(ctx->singleThreaded)
ctx->num_nethreads = 0;
}
}
if(!strcmp(section, "netdb"))
@ -81,54 +84,61 @@ namespace llarp
llarp_ev_loop_alloc(&mainloop);
llarp_crypto_libsodium_init(&crypto);
nodedb = llarp_nodedb_new(&crypto);
if(nodedb_dir[0])
// ensure worker thread pool
if(!worker && !singleThreaded)
worker = llarp_init_threadpool(2, "llarp-worker");
else if(singleThreaded)
{
nodedb_dir[sizeof(nodedb_dir) - 1] = 0;
if(llarp_nodedb_ensure_dir(nodedb_dir))
{
// ensure worker thread pool
if(!worker)
worker = llarp_init_threadpool(2, "llarp-worker");
// ensure netio thread
logic = llarp_init_logic();
llarp::Info("running in single threaded mode");
worker = llarp_init_same_process_threadpool();
}
// ensure netio thread
if(singleThreaded)
{
logic = llarp_init_single_process_logic(worker);
}
else
logic = llarp_init_logic();
router = llarp_init_router(worker, mainloop, logic);
router = llarp_init_router(worker, mainloop, logic);
if(llarp_configure_router(router, config))
if(llarp_configure_router(router, config))
{
if(custom_dht_func)
{
llarp::Info("using custom dht function");
llarp_dht_set_msg_handler(router->dht, custom_dht_func);
}
llarp_run_router(router, nodedb);
// run net io thread
if(singleThreaded)
{
llarp::Info("running mainloop");
llarp_ev_loop_run_single_process(mainloop, worker, logic);
}
else
{
auto netio = mainloop;
while(num_nethreads--)
{
if(custom_dht_func)
{
llarp::Info("using custom dht function");
llarp_dht_set_msg_handler(router->dht, custom_dht_func);
}
llarp_run_router(router, nodedb);
// run net io thread
auto netio = mainloop;
while(num_nethreads--)
{
netio_threads.emplace_back([netio]() { llarp_ev_loop_run(netio); });
netio_threads.emplace_back([netio]() { llarp_ev_loop_run(netio); });
#if(__APPLE__ && __MACH__)
#elif(__FreeBSD__)
pthread_set_name_np(netio_threads.back().native_handle(),
"llarp-netio");
pthread_set_name_np(netio_threads.back().native_handle(),
"llarp-netio");
#else
pthread_setname_np(netio_threads.back().native_handle(),
"llarp-netio");
pthread_setname_np(netio_threads.back().native_handle(),
"llarp-netio");
#endif
}
llarp::Info("Ready");
llarp_logic_mainloop(logic);
return 0;
}
else
llarp::Error("failed to start router");
llarp::Info("running mainloop");
llarp_logic_mainloop(logic);
}
else
llarp::Error("Failed to initialize nodedb");
return 0;
}
else
llarp::Error("no nodedb defined");
llarp::Error("Failed to configure router");
return 1;
}
@ -224,13 +234,13 @@ struct llarp_main
};
struct llarp_main *
llarp_main_init(const char *fname)
llarp_main_init(const char *fname, bool multiProcess)
{
if(!fname)
fname = "daemon.ini";
llarp_main *m = new llarp_main;
m->ctx.reset(new llarp::Context(std::cout));
m->ctx.reset(new llarp::Context(std::cout, !multiProcess));
if(!m->ctx->LoadConfig(fname))
{
m->ctx->Close();

@ -323,6 +323,7 @@ namespace iwp
{
iwp_async_frame *frame = static_cast< iwp_async_frame * >(user);
frame->hook(frame);
delete frame;
}
void
@ -330,9 +331,9 @@ namespace iwp
{
iwp_async_frame *frame = static_cast< iwp_async_frame * >(user);
auto crypto = frame->iwp->crypto;
auto hmac = frame->buf;
auto nonce = frame->buf + 32;
auto body = frame->buf + 64;
byte_t *hmac = frame->buf;
byte_t *nonce = frame->buf + 32;
byte_t *body = frame->buf + 64;
llarp_sharedkey_t digest;
@ -351,7 +352,7 @@ namespace iwp
buf.sz = frame->sz - 64;
crypto->xchacha20(buf, frame->sessionkey, nonce);
// inform result
llarp_logic_queue_job(frame->iwp->logic, {user, &inform_frame_done});
llarp_logic_queue_job(frame->iwp->logic, {frame, &inform_frame_done});
}
void
@ -359,9 +360,9 @@ namespace iwp
{
iwp_async_frame *frame = static_cast< iwp_async_frame * >(user);
auto crypto = frame->iwp->crypto;
auto hmac = frame->buf;
auto nonce = frame->buf + 32;
auto body = frame->buf + 64;
byte_t *hmac = frame->buf;
byte_t *nonce = frame->buf + 32;
byte_t *body = frame->buf + 64;
llarp_buffer_t buf;
buf.base = body;
@ -377,8 +378,9 @@ namespace iwp
buf.cur = buf.base;
buf.sz = frame->sz - 32;
crypto->hmac(hmac, buf, frame->sessionkey);
// inform result
llarp_logic_queue_job(frame->iwp->logic, {user, &inform_frame_done});
// call result RIGHT HERE
frame->hook(frame);
delete frame;
}
}

@ -441,6 +441,7 @@ namespace llarp
Context::CleanupTX()
{
auto now = llarp_time_now_ms();
llarp::Debug("DHT tick");
std::set< TXOwner > expired;
for(auto &item : pendingTX)
@ -454,10 +455,11 @@ namespace llarp
if(e.requester != ourKey)
{
// inform not found
auto msg = new llarp::DHTImmeidateMessage(e.requester);
msg->msgs.push_back(
llarp::DHTImmeidateMessage msg(e.requester);
msg.msgs.push_back(
new GotRouterMessage(e.requester, e.txid, nullptr));
router->SendToOrQueue(e.requester, {msg});
llarp::Info("DHT reply to ", e.requester);
router->SendTo(e.requester, &msg);
}
}

@ -0,0 +1,9 @@
#include <llarp/messages/discard.hpp>
namespace llarp
{
DiscardMessage::~DiscardMessage()
{
llarp::Debug("~DiscardMessage");
}
}

@ -1,14 +1,15 @@
#include <llarp/ev.h>
#include <llarp/logic.h>
#include "mem.hpp"
#ifdef __linux__
# include "ev_epoll.hpp"
#include "ev_epoll.hpp"
#endif
#if (__APPLE__ && __MACH__)
# include "ev_kqueue.hpp"
#if(__APPLE__ && __MACH__)
#include "ev_kqueue.hpp"
#endif
#ifdef __FreeBSD__
# include "ev_kqueue.hpp"
#include "ev_kqueue.hpp"
#endif
extern "C" {
@ -19,7 +20,7 @@ llarp_ev_loop_alloc(struct llarp_ev_loop **ev)
#ifdef __linux__
*ev = new llarp_epoll_loop;
#endif
#if (__APPLE__ && __MACH__)
#if(__APPLE__ && __MACH__)
*ev = new llarp_kqueue_loop;
#endif
#ifdef __FreeBSD__
@ -41,6 +42,20 @@ llarp_ev_loop_run(struct llarp_ev_loop *ev)
return ev->run();
}
void
llarp_ev_loop_run_single_process(struct llarp_ev_loop *ev,
struct llarp_threadpool *tp,
struct llarp_logic *logic)
{
while(true)
{
if(ev->tick(10) == -1)
return;
llarp_logic_tick(logic);
llarp_threadpool_tick(tp);
}
}
int
llarp_ev_add_udp(struct llarp_ev_loop *ev, struct llarp_udp_io *udp,
const struct sockaddr *src)

@ -27,6 +27,10 @@ struct llarp_ev_loop
init() = 0;
virtual int
run() = 0;
virtual int
tick(int ms) = 0;
virtual void
stop() = 0;

@ -100,6 +100,40 @@ struct llarp_epoll_loop : public llarp_ev_loop
return false;
}
int
tick(int ms)
{
epoll_event events[1024];
int result;
byte_t readbuf[2048];
result = epoll_wait(epollfd, events, 1024, ms);
if(result > 0)
{
int idx = 0;
while(idx < result)
{
// handle signalfd
if(events[idx].data.fd == pipefds[0])
{
llarp::Debug("exiting epoll loop");
return 0;
}
llarp::ev_io* ev = static_cast< llarp::ev_io* >(events[idx].data.ptr);
if(events[idx].events & EPOLLIN)
{
if(ev->read(readbuf, sizeof(readbuf)) == -1)
{
llarp::Debug("close ev");
close_ev(ev);
}
}
++idx;
}
}
return result;
}
int
run()
{

@ -79,7 +79,7 @@ namespace llarp
struct llarp_kqueue_loop : public llarp_ev_loop
{
int kqueuefd;
struct kevent change; /* event we want to monitor */
struct kevent change; /* event we want to monitor */
llarp_kqueue_loop() : kqueuefd(-1)
{
@ -92,12 +92,40 @@ struct llarp_kqueue_loop : public llarp_ev_loop
bool
init()
{
if (kqueuefd == -1) {
if(kqueuefd == -1)
{
kqueuefd = kqueue();
}
return kqueuefd != -1;
}
int
tick(int ms)
{
(void)ms;
struct kevent events[1024];
int result;
byte_t readbuf[2048];
result = kevent(kqueuefd, NULL, 0, events, 1024, NULL);
// result: 0 is a timeout
if(result > 0)
{
int idx = 0;
while(idx < result)
{
llarp::ev_io* ev = static_cast< llarp::ev_io* >(events[idx].udata);
if(ev->read(readbuf, sizeof(readbuf)) == -1)
{
llarp::Info(__FILE__, "close ev");
close_ev(ev);
delete ev;
}
++idx;
}
}
return result;
}
int
run()
{
@ -108,7 +136,7 @@ struct llarp_kqueue_loop : public llarp_ev_loop
{
result = kevent(kqueuefd, NULL, 0, events, 1024, NULL);
// result: 0 is a timeout
if (result > 0)
if(result > 0)
{
int idx = 0;
while(idx < result)
@ -202,7 +230,8 @@ struct llarp_kqueue_loop : public llarp_ev_loop
llarp::udp_listener* listener = new llarp::udp_listener(fd, l);
EV_SET(&change, fd, EVFILT_READ, EV_ADD, 0, 0, listener);
if (kevent(kqueuefd, &change, 1, NULL, 0, NULL) == -1) {
if(kevent(kqueuefd, &change, 1, NULL, 0, NULL) == -1)
{
delete listener;
return false;
}

@ -42,7 +42,33 @@ namespace iwp
eFRAG = 0x03
};
typedef std::vector< byte_t > sendbuf_t;
struct sendbuf_t
{
sendbuf_t(size_t s) : sz(s)
{
buf = new byte_t[s];
}
~sendbuf_t()
{
delete[] buf;
}
byte_t *buf;
size_t sz;
size_t
size() const
{
return sz;
}
byte_t *
data()
{
return buf;
}
};
enum header_flag
{
@ -107,15 +133,14 @@ namespace iwp
};
byte_t *
init_sendbuf(sendbuf_t &buf, msgtype t, uint16_t sz, uint8_t flags)
init_sendbuf(sendbuf_t *buf, msgtype t, uint16_t sz, uint8_t flags)
{
buf.resize(6 + sz);
frame_header hdr(buf.data());
frame_header hdr(buf->data());
hdr.version() = 0;
hdr.msgtype() = t;
hdr.setsize(sz);
buf[4] = 0;
buf[5] = flags;
buf->data()[4] = 0;
buf->data()[5] = flags;
return hdr.data();
}
@ -124,9 +149,7 @@ namespace iwp
{
byte_t buffer[48];
xmit()
{
}
xmit() = default;
xmit(byte_t *ptr)
{
@ -213,28 +236,25 @@ namespace iwp
}
};
typedef std::vector< uint8_t > fragment_t;
// forward declare
struct session;
struct server;
struct transit_message
{
session *parent = nullptr;
xmit msginfo;
std::bitset< 32 > status;
std::bitset< 32 > status = {};
std::map< uint8_t, fragment_t > frags;
fragment_t lastfrag;
typedef std::vector< byte_t > fragment_t;
transit_message()
{
}
std::unordered_map< byte_t, fragment_t > frags;
fragment_t lastfrag;
~transit_message()
void
clear()
{
frags.clear();
lastfrag.clear();
}
// calculate acked bitmask
@ -251,6 +271,13 @@ namespace iwp
return bitmask;
}
// outbound
transit_message(llarp_buffer_t buf, const byte_t *hash, uint64_t id,
uint16_t mtu = 1024)
{
put_message(buf, hash, id, mtu);
}
// inbound
transit_message(const xmit &x) : msginfo(x)
{
@ -264,11 +291,6 @@ namespace iwp
status.reset();
}
// outbound
transit_message(session *s) : parent(s)
{
}
/// ack packets based off a bitmask
void
ack(uint32_t bitmask)
@ -307,10 +329,9 @@ namespace iwp
void
generate_xmit(T &queue, byte_t flags = 0)
{
queue.emplace();
auto &xmitbuf = queue.back();
auto body_ptr = init_sendbuf(
xmitbuf, eXMIT, sizeof(msginfo.buffer) + lastfrag.size(), flags);
uint16_t sz = lastfrag.size() + sizeof(msginfo.buffer);
queue.push(new sendbuf_t(sz + 6));
auto body_ptr = init_sendbuf(queue.back(), eXMIT, sz, flags);
memcpy(body_ptr, msginfo.buffer, sizeof(msginfo.buffer));
body_ptr += sizeof(msginfo.buffer);
memcpy(body_ptr, lastfrag.data(), lastfrag.size());
@ -326,9 +347,10 @@ namespace iwp
{
if(status.test(frag.first))
continue;
queue.emplace();
auto &fragbuf = queue.back();
auto body_ptr = init_sendbuf(fragbuf, eFRAG, 9 + fragsize, flags);
uint16_t sz = 9 + fragsize;
queue.push(new sendbuf_t(sz + 6));
auto body_ptr = init_sendbuf(queue.back(), eFRAG, sz, flags);
// TODO: assumes big endian
memcpy(body_ptr, &msgid, 8);
body_ptr[8] = frag.first;
memcpy(body_ptr + 9, frag.second.data(), fragsize);
@ -402,10 +424,10 @@ namespace iwp
uint64_t rxids = 0;
uint64_t txids = 0;
llarp_time_t lastEvent = 0;
std::map< uint64_t, transit_message * > rx;
std::map< uint64_t, transit_message * > tx;
std::unordered_map< uint64_t, transit_message * > rx;
std::unordered_map< uint64_t, transit_message * > tx;
typedef std::queue< sendbuf_t > sendqueue_t;
typedef std::queue< sendbuf_t * > sendqueue_t;
llarp_router *router = nullptr;
llarp_link_session *parent = nullptr;
@ -422,11 +444,13 @@ namespace iwp
void
clear()
{
for(auto &item : rx)
auto _rx = rx;
auto _tx = tx;
for(auto &item : _rx)
delete item.second;
rx.clear();
for(auto &item : tx)
for(auto &item : _tx)
delete item.second;
rx.clear();
tx.clear();
}
@ -437,16 +461,15 @@ namespace iwp
push_ackfor(uint64_t id, uint32_t bitmask)
{
llarp::Debug("ACK for msgid=", id, " mask=", bitmask);
sendqueue.emplace();
auto &buf = sendqueue.back();
init_sendbuf(buf, eACKS, 12, txflags);
sendqueue.push(new sendbuf_t(12 + 6));
auto body_ptr = init_sendbuf(sendqueue.back(), eACKS, 12, txflags);
// TODO: this assumes big endian
memcpy(buf.data() + 6, &id, 8);
memcpy(buf.data() + 14, &bitmask, 4);
memcpy(body_ptr, &id, 8);
memcpy(body_ptr + 8, &bitmask, 4);
}
bool
got_xmit(frame_header &hdr, size_t sz)
got_xmit(frame_header hdr, size_t sz)
{
if(hdr.size() > sz)
{
@ -504,7 +527,7 @@ namespace iwp
}
bool
got_frag(frame_header &hdr, size_t sz)
got_frag(frame_header hdr, size_t sz)
{
if(hdr.size() > sz)
{
@ -561,33 +584,37 @@ namespace iwp
}
bool
got_acks(frame_header &hdr, size_t sz);
got_acks(frame_header hdr, size_t sz);
// queue new outbound message
void
queue_tx(uint64_t id, transit_message *msg)
{
auto itr = tx.emplace(id, msg);
if(itr.second)
tx[id] = msg;
msg->generate_xmit(sendqueue, txflags);
}
void
retransmit()
{
for(auto &item : tx)
{
msg->generate_xmit(sendqueue, txflags);
item.second->retransmit_frags(sendqueue, txflags);
}
else // duplicate
delete msg;
}
// get next frame to encrypt and transmit
bool
next_frame(llarp_buffer_t &buf)
next_frame(llarp_buffer_t *buf)
{
auto left = sendqueue.size();
llarp::Debug("next frame, ", left, " frames left in send queue");
if(left)
{
auto &send = sendqueue.front();
buf.base = &send[0];
buf.cur = &send[0];
buf.sz = send.size();
sendbuf_t *send = sendqueue.front();
buf->base = send->data();
buf->cur = send->data();
buf->sz = send->size();
return true;
}
return false;
@ -596,11 +623,13 @@ namespace iwp
void
pop_next_frame()
{
sendbuf_t *buf = sendqueue.front();
sendqueue.pop();
delete buf;
}
bool
process(uint8_t *buf, size_t sz)
process(byte_t *buf, size_t sz)
{
frame_header hdr(buf);
if(hdr.flags() & eSessionInvalidated)
@ -703,12 +732,11 @@ namespace iwp
static bool
sendto(llarp_link_session *s, llarp_buffer_t msg)
{
session *self = static_cast< session * >(s->impl);
transit_message *m = new transit_message(self);
auto id = self->frame.txids++;
session *self = static_cast< session * >(s->impl);
auto id = self->frame.txids++;
llarp_shorthash_t digest;
self->crypto->shorthash(digest, msg);
m->put_message(msg, digest, id);
transit_message *m = new transit_message(msg, digest, id);
self->add_outbound_message(id, m);
return true;
}
@ -731,7 +759,7 @@ namespace iwp
pump()
{
llarp_buffer_t buf;
while(frame.next_frame(buf))
while(frame.next_frame(&buf))
{
encrypt_frame_async_send(buf.base, buf.sz);
frame.pop_next_frame();
@ -769,18 +797,7 @@ namespace iwp
}
static void
handle_verify_session_start(iwp_async_session_start *s)
{
session *self = static_cast< session * >(s->user);
if(!s->buf)
{
// verify fail
// TODO: remove session?
llarp::Warn("session start verify failed");
return;
}
self->send_LIM();
}
handle_verify_session_start(iwp_async_session_start *s);
void
send_LIM()
@ -794,14 +811,12 @@ namespace iwp
if(llarp::EncodeLIM(&buf, our_router))
{
// rewind message buffer
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
auto msg = new transit_message;
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
// hash message buffer
crypto->shorthash(digest, buf);
// put message buffer
auto id = frame.txids++;
msg->put_message(buf, digest, id);
auto id = frame.txids++;
auto msg = new transit_message(buf, digest, id);
// put into outbound send queue
add_outbound_message(id, msg);
EnterState(eLIMSent);
@ -836,7 +851,11 @@ namespace iwp
}
// send keepalive if we are established or a session is made
if(state == eEstablished || state == eLIMSent)
{
send_keepalive(this);
frame.retransmit();
pump();
}
// TODO: determine if we are too idle
return false;
@ -918,7 +937,7 @@ namespace iwp
if(introack->buf == nullptr)
{
// invalid signature
llarp::Error("introack verify failed");
llarp::Error("introack verify failed from ", link->addr);
return;
}
link->EnterState(eIntroAckRecv);
@ -931,7 +950,9 @@ namespace iwp
handle_generated_session_start(iwp_async_session_start *start)
{
session *link = static_cast< session * >(start->user);
llarp_ev_udp_sendto(link->udp, link->addr, start->buf, start->sz);
if(llarp_ev_udp_sendto(link->udp, link->addr, start->buf, start->sz)
== -1)
llarp::Error("sendto failed");
link->EnterState(eSessionStartSent);
}
@ -966,6 +987,7 @@ namespace iwp
{
session *self = static_cast< session * >(frame->user);
llarp::Debug("rx ", frame->sz, " frames=", self->frames);
self->frames--;
if(frame->success)
{
if(self->frame.process(frame->buf + 64, frame->sz - 64))
@ -978,9 +1000,6 @@ namespace iwp
}
else
llarp::Error("decrypt frame fail");
delete frame;
--self->frames;
}
void
@ -988,8 +1007,8 @@ namespace iwp
{
if(sz > 64)
{
auto frame = alloc_frame(buf, sz);
frame->hook = &handle_frame_decrypt;
iwp_async_frame *frame = alloc_frame(buf, sz);
frame->hook = &handle_frame_decrypt;
iwp_call_async_frame_decrypt(iwp, frame);
}
else
@ -1001,9 +1020,10 @@ namespace iwp
{
session *self = static_cast< session * >(frame->user);
llarp::Debug("tx ", frame->sz, " frames=", self->frames);
llarp_ev_udp_sendto(self->udp, self->addr, frame->buf, frame->sz);
delete frame;
--self->frames;
if(llarp_ev_udp_sendto(self->udp, self->addr, frame->buf, frame->sz)
== -1)
llarp::Warn("sendto failed");
self->frames--;
}
iwp_async_frame *
@ -1013,13 +1033,13 @@ namespace iwp
if(sz > 1500)
return nullptr;
iwp_async_frame *frame = new iwp_async_frame;
iwp_async_frame *frame = new iwp_async_frame();
if(buf)
memcpy(frame->buf, buf, sz);
frame->sz = sz;
frame->user = this;
frame->sessionkey = sessionkey;
++frames;
frames++;
return frame;
}
@ -1027,8 +1047,12 @@ namespace iwp
encrypt_frame_async_send(const void *buf, size_t sz)
{
// 64 bytes frame overhead for nonce and hmac
auto frame = alloc_frame(nullptr, sz + 64);
iwp_async_frame *frame = alloc_frame(nullptr, sz + 64);
memcpy(frame->buf + 64, buf, sz);
auto padding = rand() % MAX_PAD;
if(padding)
crypto->randbytes(frame->buf + 64 + sz, padding);
frame->sz += padding;
frame->hook = &handle_frame_encrypt;
iwp_call_async_frame_encrypt(iwp, frame);
}
@ -1039,7 +1063,7 @@ namespace iwp
session *self = static_cast< session * >(intro->user);
if(!intro->buf)
{
llarp::Error("intro verify failed");
llarp::Error("intro verify failed from ", self->addr);
delete self;
return;
}
@ -1251,8 +1275,9 @@ namespace iwp
HasSessionToRouter(llarp_link *l, const byte_t *pubkey)
{
server *serv = static_cast< server * >(l->impl);
llarp::pubkey pk(pubkey);
lock_t lock(serv->m_Connected_Mutex);
return serv->m_Connected.find(pubkey) != serv->m_Connected.end();
return serv->m_Connected.find(pk) != serv->m_Connected.end();
}
void
@ -1262,12 +1287,11 @@ namespace iwp
{
lock_t lock(m_sessions_Mutex);
std::set< llarp::Addr > remove;
auto itr = m_sessions.begin();
while(itr != m_sessions.end())
for(auto &itr : m_sessions)
{
if(static_cast< session * >(itr->second.impl)->Tick(now))
remove.insert(itr->first);
++itr;
session *s = static_cast< session * >(itr.second.impl);
if(s && s->Tick(now))
remove.insert(itr.first);
}
for(const auto &addr : remove)
@ -1288,7 +1312,7 @@ namespace iwp
auto inner_itr = serv->m_sessions.find(itr->second);
if(inner_itr != serv->m_sessions.end())
{
auto link = &inner_itr->second;
llarp_link_session *link = &inner_itr->second;
return link->sendto(link, buf);
}
}
@ -1457,6 +1481,7 @@ namespace iwp
{
// new inbound session
s = link->create_session(*saddr, link->seckey);
llarp::Debug("new inbound session from ", s->addr);
}
s->recv(buf, sz);
}
@ -1551,22 +1576,17 @@ namespace iwp
return;
}
// all zeros means keepalive
byte_t tmp[MAX_PAD + 8] = {0};
byte_t tmp[8] = {0};
// set flags for tx
frame_header hdr(tmp);
hdr.flags() = self->frame.txflags;
// 8 bytes iwp header overhead
int padsz = rand() % (sizeof(tmp) - 8);
auto buf = llarp::StackBuffer< decltype(tmp) >(tmp);
if(padsz)
self->crypto->randbytes(buf.base + 8, padsz);
buf.sz -= padsz;
// send frame after encrypting
auto buf = llarp::StackBuffer< decltype(tmp) >(tmp);
self->encrypt_frame_async_send(buf.base, buf.sz);
}
bool
frame_state::got_acks(frame_header &hdr, size_t sz)
frame_state::got_acks(frame_header hdr, size_t sz)
{
if(hdr.size() > sz)
{
@ -1593,29 +1613,46 @@ namespace iwp
return false;
}
itr->second->ack(bitmask);
transit_message *msg = itr->second;
msg->ack(bitmask);
if(itr->second->completed())
if(msg->completed())
{
llarp::Debug("message transmitted msgid=", msgid);
delete itr->second;
tx.erase(itr);
session *impl = static_cast< session * >(parent->impl);
if(impl->state == session::eLIMSent && msgid == 0)
{
// first message acked we are established?
impl->session_established();
}
tx.erase(msgid);
delete msg;
}
else
{
llarp::Debug("message ", msgid, " retransmit fragments");
itr->second->retransmit_frags(sendqueue);
msg->retransmit_frags(sendqueue, txflags);
}
return true;
}
void
session::handle_verify_session_start(iwp_async_session_start *s)
{
session *self = static_cast< session * >(s->user);
if(!s->buf)
{
// verify fail
// TODO: remove session?
llarp::Warn("session start verify failed from ", self->addr);
self->serv->RemoveSessionByAddr(self->addr);
return;
}
self->send_LIM();
}
server *
link_alloc(struct llarp_router *router, const char *keyfile,
struct llarp_crypto *crypto, struct llarp_logic *logic,
@ -1723,7 +1760,7 @@ namespace iwp
link->timeout_job_id = 0;
link->logic = logic;
// start cleanup timer
link->issue_cleanup_timer(1000);
link->issue_cleanup_timer(2500);
return true;
}
@ -1741,11 +1778,17 @@ namespace iwp
link_iter_sessions(struct llarp_link *l, struct llarp_link_session_iter iter)
{
server *link = static_cast< server * >(l->impl);
iter.link = l;
// TODO: race condition with cleanup timer
for(auto &item : link->m_sessions)
if(!iter.visit(&iter, &item.second))
return;
auto sz = link->m_sessions.size();
if(sz)
{
llarp::Debug("we have ", sz, "sessions");
iter.link = l;
// TODO: race condition with cleanup timer
for(auto &item : link->m_sessions)
if(item.second.impl)
if(!iter.visit(&iter, &item.second))
return;
}
}
bool

@ -22,6 +22,24 @@ llarp_init_logic()
return logic;
};
struct llarp_logic*
llarp_init_single_process_logic(struct llarp_threadpool* tp)
{
llarp_logic* logic = new llarp_logic;
if(logic)
{
logic->thread = tp;
logic->timer = llarp_init_timer();
}
return logic;
}
void
llarp_logic_tick(struct llarp_logic* logic)
{
llarp_timer_tick_all(logic->timer, logic->thread);
}
void
llarp_free_logic(struct llarp_logic** logic)
{
@ -58,13 +76,20 @@ llarp_logic_mainloop(struct llarp_logic* logic)
void
llarp_logic_queue_job(struct llarp_logic* logic, struct llarp_thread_job job)
{
llarp_threadpool_queue_job(logic->thread, job);
llarp_thread_job j;
j.user = job.user;
j.work = job.work;
llarp_threadpool_queue_job(logic->thread, j);
}
uint32_t
llarp_logic_call_later(struct llarp_logic* logic, struct llarp_timeout_job job)
{
return llarp_timer_call_later(logic->timer, job);
llarp_timeout_job j;
j.user = job.user;
j.timeout = job.timeout;
j.handler = job.handler;
return llarp_timer_call_later(logic->timer, j);
}
void

@ -178,12 +178,25 @@ namespace llarp
return ntohs(_addr.sin6_port);
}
bool
operator<(const Addr& other) const
{
return port() < other.port() || *addr6() < *other.addr6()
|| af() < other.af();
}
bool
operator==(const Addr& other) const
{
return af() == other.af() && memcmp(addr6(), other.addr6(), 16) == 0
&& port() == other.port();
}
bool
operator!=(const Addr& other) const
{
return !(*this == other);
}
};
struct addrhash
@ -192,7 +205,7 @@ namespace llarp
operator()(Addr const& a) const noexcept
{
uint8_t empty[16] = {0};
return a.af() + memcmp(a.addr6(), empty, 16) + a.port();
return (a.af() + memcmp(a.addr6(), empty, 16)) ^ a.port();
}
};
}

@ -52,8 +52,13 @@ llarp_router::SendToOrQueue(const llarp::RouterID &remote,
std::vector< llarp::ILinkMessage * > msgs)
{
bool has = false;
for(auto &link : links)
for(auto &item : links)
{
if(!item.second)
continue;
auto link = item.first;
has |= link->has_session_to(link, remote);
}
if(!has)
{
@ -147,9 +152,9 @@ llarp_router::EnsureIdentity()
}
void
llarp_router::AddLink(struct llarp_link *link)
llarp_router::AddLink(struct llarp_link *link, bool isOutbound)
{
links.push_back(link);
links.push_back({link, isOutbound});
ready = true;
}
@ -189,8 +194,9 @@ llarp_router::SaveRC()
void
llarp_router::Close()
{
for(auto &link : links)
for(auto &pair : links)
{
auto link = pair.first;
link->stop_link(link);
link->free_impl(link);
delete link;
@ -204,7 +210,8 @@ llarp_router::connect_job_retry(void *user)
llarp_link_establish_job *job =
static_cast< llarp_link_establish_job * >(user);
llarp::Info("trying to establish session again");
llarp::Addr remote = job->ai;
llarp::Info("trying to establish session again with ", remote);
job->link->try_establish(job->link, job);
}
@ -281,11 +288,17 @@ void
llarp_router::Tick()
{
llarp::Debug("tick router");
llarp_link_session_iter iter;
iter.user = this;
iter.visit = &send_padded_message;
if(sendPadding)
{
for(auto &link : links)
for(auto &item : links)
{
link->iter_sessions(link, {this, nullptr, &send_padded_message});
if(!item.second)
continue;
auto link = item.first;
link->iter_sessions(link, iter);
}
}
}
@ -294,12 +307,46 @@ bool
llarp_router::send_padded_message(llarp_link_session_iter *itr,
llarp_link_session *peer)
{
auto msg = new llarp::DiscardMessage({}, 4096);
llarp_router *self = static_cast< llarp_router * >(itr->user);
self->SendToOrQueue(peer->get_remote_router(peer)->pubkey, {msg});
llarp::RouterID remote;
remote = &peer->get_remote_router(peer)->pubkey[0];
for(size_t idx = 0; idx < 50; ++idx)
{
llarp::DiscardMessage msg(9000);
self->SendTo(remote, &msg);
}
return true;
}
void
llarp_router::SendTo(llarp::RouterID remote, llarp::ILinkMessage *msg)
{
llarp_buffer_t buf =
llarp::StackBuffer< decltype(linkmsg_buffer) >(linkmsg_buffer);
if(!msg->BEncode(&buf))
{
llarp::Warn("failed to encode outbound message, buffer size left: ",
llarp_buffer_size_left(buf));
return;
}
// set size of message
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
bool sent = false;
for(auto &item : links)
{
if(!item.second)
continue;
if(!sent)
{
auto link = item.first;
sent = link->sendto(link, remote, buf);
}
}
}
void
llarp_router::ScheduleTicker(uint64_t ms)
{
@ -322,6 +369,7 @@ llarp_router::SessionClosed(const llarp::RouterID &remote)
void
llarp_router::FlushOutboundFor(const llarp::RouterID &remote)
{
llarp::Debug("Flush outbound for ", remote);
auto itr = outboundMesssageQueue.find(remote);
if(itr == outboundMesssageQueue.end())
return;
@ -343,12 +391,17 @@ llarp_router::FlushOutboundFor(const llarp::RouterID &remote)
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
bool sent = false;
for(auto &link : links)
llarp::RouterID peer = remote;
bool sent = false;
for(auto &item : links)
{
if(!sent)
if(item.second)
{
sent = link->sendto(link, remote, buf);
if(!sent)
{
auto link = item.first;
sent = link->sendto(link, peer, buf);
}
}
}
if(!sent)
@ -366,8 +419,11 @@ llarp_router::on_try_connect_result(llarp_link_establish_job *job)
llarp_router *router = static_cast< llarp_router * >(job->user);
if(job->session)
{
delete job;
/*
auto session = job->session;
router->async_verify_RC(session, false, job);
*/
return;
}
llarp::Info("session not established");
@ -399,8 +455,11 @@ llarp_router::Run()
llarp::Zero(&rc, sizeof(llarp_rc));
// fill our address list
rc.addrs = llarp_ai_list_new();
for(auto link : links)
for(auto &item : links)
{
if(item.second)
continue;
auto link = item.first;
llarp_ai addr;
link->get_our_address(link, &addr);
llarp_ai_list_pushback(rc.addrs, &addr);
@ -421,8 +480,9 @@ llarp_router::Run()
llarp_dht_context_start(dht, ourPubkey);
// start links
for(auto link : links)
for(auto &item : links)
{
auto link = item.first;
int result = link->start_link(link, logic);
if(result == -1)
llarp::Warn("Link ", link->name(), " failed to start");
@ -474,7 +534,11 @@ llarp_init_router(struct llarp_threadpool *tp, struct llarp_ev_loop *netloop,
router->tp = tp;
router->logic = logic;
// TODO: make disk io threadpool count configurable
#ifdef TESTNET
router->disk = tp;
#else
router->disk = llarp_init_threadpool(1, "llarp-diskio");
#endif
llarp_crypto_libsodium_init(&router->crypto);
}
return router;
@ -508,11 +572,22 @@ llarp_router_try_connect(struct llarp_router *router, struct llarp_rc *remote)
llarp_ai addr;
if(llarp_ai_list_index(remote->addrs, 0, &addr))
{
llarp_router_iterate_links(router,
{&addr, &llarp_router::iter_try_connect});
return true;
for(auto &item : router->links)
{
if(!item.second)
continue;
auto link = item.first;
llarp_link_establish_job *job = new llarp_link_establish_job;
llarp_ai_copy(&job->ai, &addr);
job->timeout = 10000;
job->result = &llarp_router::on_try_connect_result;
// give router as user pointer
job->user = router;
link->try_establish(link, job);
return true;
}
}
return false;
}
@ -553,10 +628,12 @@ bool
llarp_findOrCreateIdentity(llarp_crypto *crypto, const char *fpath,
byte_t *secretkey)
{
llarp::Debug("find or create ", fpath);
fs::path path(fpath);
std::error_code ec;
if(!fs::exists(path, ec))
{
llarp::Info("regenerated identity key");
crypto->identity_keygen(secretkey);
std::ofstream f(path, std::ios::binary);
if(f.is_open())
@ -570,6 +647,7 @@ llarp_findOrCreateIdentity(llarp_crypto *crypto, const char *fpath,
f.read((char *)secretkey, sizeof(llarp_seckey_t));
return true;
}
llarp::Info("failed to get identity key");
return false;
}
@ -619,9 +697,10 @@ void
llarp_router_iterate_links(struct llarp_router *router,
struct llarp_router_link_iter i)
{
for(auto link : router->links)
if(!i.visit(&i, router, link))
return;
for(auto item : router->links)
if(item.second)
if(!i.visit(&i, router, item.first))
return;
}
void
@ -677,13 +756,10 @@ namespace llarp
iwp_link_init(link, args);
if(llarp_link_initialized(link))
{
// printf("router -> link initialized\n");
llarp::Info("link ", key, " initialized");
if(link->configure(link, self->netloop, key, af, proto))
{
llarp_ai ai;
link->get_our_address(link, &ai);
llarp::Addr addr = ai;
self->AddLink(link);
self->AddLink(link, llarp::StrEq(key, "*"));
return;
}
if(af == AF_INET6)
@ -697,7 +773,7 @@ namespace llarp
llarp_ai ai;
link->get_our_address(link, &ai);
llarp::Addr addr = ai;
self->AddLink(link);
self->AddLink(link, llarp::StrEq(key, "*"));
return;
}
}

@ -68,7 +68,7 @@ struct llarp_router
llarp::InboundMessageParser inbound_msg_parser;
std::list< llarp_link * > links;
std::list< std::pair< llarp_link *, bool > > links;
typedef std::queue< llarp::ILinkMessage * > MessageQueue;
@ -86,7 +86,7 @@ struct llarp_router
HandleRecvLinkMessage(struct llarp_link_session *from, llarp_buffer_t msg);
void
AddLink(struct llarp_link *link);
AddLink(struct llarp_link *link, bool isOutbound);
void
Close();
@ -119,6 +119,10 @@ struct llarp_router
SendToOrQueue(const llarp::RouterID &remote,
std::vector< llarp::ILinkMessage * > msgs);
/// sendto or drop
void
SendTo(llarp::RouterID remote, llarp::ILinkMessage *msg);
/// manually flush outbound message queue for just 1 router
void
FlushOutboundFor(const llarp::RouterID &remote);

@ -0,0 +1,6 @@
#ifdef SHADOW_TESTNET
#include <llarp.h>
/** insert shadow test network specific code here */
#endif

@ -1,6 +1,10 @@
#include "threadpool.hpp"
#include <pthread.h>
#include <cstring>
#include <llarp/time.h>
#include <queue>
#include "logger.hpp"
#if(__FreeBSD__)
@ -29,18 +33,24 @@ namespace llarp
}
for(;;)
{
llarp_thread_job job;
llarp_thread_job *job;
{
lock_t lock(this->queue_mutex);
this->condition.wait(
lock, [this] { return this->stop || !this->jobs.empty(); });
if(this->stop && this->jobs.empty())
return;
job = std::move(this->jobs.front());
job = this->jobs.front();
this->jobs.pop_front();
}
auto now = llarp_time_now_ms();
// do work
job.work(job.user);
job->work(job->user);
auto after = llarp_time_now_ms();
auto dlt = after - now;
if(dlt > 1)
llarp::Warn("work took ", dlt, " ms");
delete job;
}
});
}
@ -75,7 +85,7 @@ namespace llarp
if(stop)
return;
jobs.emplace_back(job);
jobs.push_back(new llarp_thread_job(job.user, job.work));
}
condition.notify_one();
}
@ -85,9 +95,16 @@ namespace llarp
struct llarp_threadpool
{
llarp::thread::Pool impl;
llarp::thread::Pool *impl;
std::queue< llarp_thread_job > jobs;
llarp_threadpool(int workers, const char *name) : impl(workers, name)
llarp_threadpool(int workers, const char *name)
: impl(new llarp::thread::Pool(workers, name))
{
}
llarp_threadpool() : impl(nullptr)
{
}
};
@ -103,11 +120,18 @@ llarp_init_threadpool(int workers, const char *name)
return nullptr;
}
struct llarp_threadpool *
llarp_init_same_process_threadpool()
{
return new llarp_threadpool();
}
void
llarp_threadpool_join(struct llarp_threadpool *pool)
{
llarp::Debug("threadpool join");
pool->impl.Join();
if(pool->impl)
pool->impl->Join();
}
void
@ -119,7 +143,8 @@ void
llarp_threadpool_stop(struct llarp_threadpool *pool)
{
llarp::Debug("threadpool stop");
pool->impl.Stop();
if(pool->impl)
pool->impl->Stop();
}
void
@ -127,9 +152,10 @@ llarp_threadpool_wait(struct llarp_threadpool *pool)
{
std::mutex mtx;
llarp::Debug("threadpool wait");
if(pool->impl)
{
std::unique_lock< std::mutex > lock(mtx);
pool->impl.done.wait(lock);
pool->impl->done.wait(lock);
}
}
@ -137,7 +163,21 @@ void
llarp_threadpool_queue_job(struct llarp_threadpool *pool,
struct llarp_thread_job job)
{
pool->impl.QueueJob(job);
if(pool->impl)
pool->impl->QueueJob(job);
else
pool->jobs.push(job);
}
void
llarp_threadpool_tick(struct llarp_threadpool *pool)
{
while(pool->jobs.size())
{
auto &job = pool->jobs.front();
job.work(job.user);
pool->jobs.pop();
}
}
void

@ -25,7 +25,7 @@ namespace llarp
void
Stop();
std::vector< std::thread > threads;
std::deque< llarp_thread_job > jobs;
std::deque< llarp_thread_job* > jobs;
mtx_t queue_mutex;
std::condition_variable condition;

@ -3,7 +3,7 @@
namespace llarp
{
typedef std::chrono::steady_clock clock_t;
typedef std::chrono::system_clock clock_t;
template < typename Res, typename IntType >
static IntType

@ -1,8 +1,9 @@
#include <llarp/time.h>
#include <llarp/timer.h>
#include <atomic>
#include <condition_variable>
#include <list>
#include <map>
#include <unordered_map>
#include "logger.hpp"
@ -10,41 +11,28 @@ namespace llarp
{
struct timer
{
static uint64_t
now()
{
return std::chrono::duration_cast< std::chrono::milliseconds >(
std::chrono::steady_clock::now().time_since_epoch())
.count();
}
void* user;
uint64_t called_at;
uint64_t started;
uint64_t timeout;
llarp_timer_handler_func func;
bool done;
bool canceled;
timer(uint64_t ms = 0, void* _user = nullptr,
llarp_timer_handler_func _func = nullptr)
: user(_user)
, called_at(0)
, started(now())
, started(llarp_time_now_ms())
, timeout(ms)
, func(_func)
, done(false)
, canceled(false)
{
}
timer&
operator=(const timer& other)
~timer()
{
user = other.user;
called_at = other.called_at;
started = other.started;
timeout = other.timeout;
func = other.func;
return *this;
}
void
@ -56,25 +44,31 @@ namespace llarp
static_cast< timer* >(user)->exec();
}
operator llarp_thread_job()
void
send_job(llarp_threadpool* pool)
{
return {this, timer::call};
llarp_threadpool_queue_job(pool, {this, timer::call});
}
};
}; // namespace llarp
struct llarp_timer_context
{
llarp_threadpool* threadpool;
std::mutex timersMutex;
std::map< uint32_t, llarp::timer > timers;
std::unordered_map< uint32_t, llarp::timer* > timers;
std::mutex tickerMutex;
std::condition_variable ticker;
std::chrono::milliseconds nextTickLen = std::chrono::milliseconds(10);
std::condition_variable* ticker = nullptr;
std::chrono::milliseconds nextTickLen = std::chrono::milliseconds(100);
uint32_t ids = 0;
bool _run = true;
~llarp_timer_context()
{
if(ticker)
delete ticker;
}
bool
run()
{
@ -90,16 +84,11 @@ struct llarp_timer_context
void
cancel(uint32_t id)
{
llarp::timer t;
{
std::unique_lock< std::mutex > lock(timersMutex);
auto itr = timers.find(id);
if(itr == timers.end())
return;
t = itr->second;
}
t.called_at = llarp::timer::now();
t.exec();
std::unique_lock< std::mutex > lock(timersMutex);
auto itr = timers.find(id);
if(itr == timers.end())
return;
itr->second->canceled = true;
}
void
@ -107,8 +96,10 @@ struct llarp_timer_context
{
std::unique_lock< std::mutex > lock(timersMutex);
auto itr = timers.find(id);
if(itr != timers.end())
timers.erase(itr);
if(itr == timers.end())
return;
itr->second->func = nullptr;
itr->second->canceled = true;
}
uint32_t
@ -116,7 +107,7 @@ struct llarp_timer_context
{
std::unique_lock< std::mutex > lock(timersMutex);
uint32_t id = ++ids;
timers.emplace(id, std::move(llarp::timer(timeout_ms, user, func)));
timers[id] = new llarp::timer(timeout_ms, user, func);
return id;
}
@ -175,11 +166,10 @@ llarp_timer_stop(struct llarp_timer_context* t)
{
// destroy all timers
// don't call callbacks on timers
llarp::Debug("clear timers");
t->timers.clear();
t->stop();
llarp::Debug("stop timers");
t->ticker.notify_all();
if(t->ticker)
t->ticker->notify_all();
}
void
@ -188,46 +178,59 @@ llarp_timer_cancel_job(struct llarp_timer_context* t, uint32_t id)
t->cancel(id);
}
void
llarp_timer_tick_all(struct llarp_timer_context* t,
struct llarp_threadpool* pool)
{
if(!t->run())
return;
auto now = llarp_time_now_ms();
auto itr = t->timers.begin();
while(itr != t->timers.end())
{
if(now - itr->second->started >= itr->second->timeout
|| itr->second->canceled)
{
if(itr->second->func && itr->second->called_at == 0)
{
// timer hit
itr->second->called_at = now;
itr->second->send_job(pool);
++itr;
}
else if(itr->second->done)
{
// remove timer
llarp::timer* timer = itr->second;
itr = t->timers.erase(itr);
delete timer;
}
else
++itr;
}
else // timer not hit yet
++itr;
}
}
void
llarp_timer_run(struct llarp_timer_context* t, struct llarp_threadpool* pool)
{
t->threadpool = pool;
t->ticker = new std::condition_variable;
while(t->run())
{
// wait for timer mutex
if(t->ticker)
{
std::unique_lock< std::mutex > lock(t->tickerMutex);
t->ticker.wait_for(lock, t->nextTickLen);
t->ticker->wait_for(lock, t->nextTickLen);
}
if(t->run())
{
std::unique_lock< std::mutex > lock(t->timersMutex);
// we woke up
auto now = llarp::timer::now();
auto itr = t->timers.begin();
while(itr != t->timers.end())
{
if(now - itr->second.started >= itr->second.timeout)
{
if(itr->second.func)
{
// timer hit
itr->second.called_at = now;
llarp_threadpool_queue_job(pool, itr->second);
++itr;
}
else if(itr->second.done)
{
// timer was already called, remove timer
itr = t->timers.erase(itr);
}
else
++itr;
}
else // timer not hit yet
++itr;
}
llarp_timer_tick_all(t, pool);
}
}
}
@ -249,7 +252,7 @@ namespace llarp
call(user, timeout, 0);
else
call(user, timeout, diff);
done = true;
}
done = true;
}
}

Loading…
Cancel
Save