diff --git a/.gitignore b/.gitignore index c6ddf23f0..24c0a2cec 100644 --- a/.gitignore +++ b/.gitignore @@ -24,4 +24,7 @@ callgrind.* *.sig *.signed -*.key \ No newline at end of file +*.key + +shadow.config.xml +*.log \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index d491e802b..83111cb80 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,17 +1,51 @@ cmake_minimum_required(VERSION 2.8.10) -set(WITH_SHARED OFF) -set(DEBUG_FLAGS "") +macro(add_cflags) + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${ARGN}") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${ARGN}") +endmacro(add_cflags) + +if(SHADOW) + set(WITH_STATIC OFF) + set(WITH_SHARED OFF) +else() + set(WITH_STATIC ON) + set(WITH_SHARED OFF) +endif() + +set(DEBUG_FLAGS "-g") set(OPTIMIZE_FLAGS "-Os") if(ASAN) - set(DEBUG_FLAGS "${DEBUG_FLAGS} -g -fsanitize=address -fno-omit-frame-pointer") + set(DEBUG_FLAGS "${DEBUG_FLAGS} -fsanitize=address -fno-omit-frame-pointer") set(OPTIMIZE_FLAGS "-O0") endif(ASAN) +if(SHADOW) + if("${SHADOW_ROOT}" STREQUAL "") + set(SHADOW_ROOT "$ENV{HOME}/.shadow") + endif("${SHADOW_ROOT}" STREQUAL "") + if(EXISTS "${SHADOW_ROOT}") + message(STATUS "SHADOW_ROOT = ${SHADOW_ROOT}") + else() + message(FATAL_ERROR "SHADOW_ROOT path does not exist: '${SHADOW_ROOT}'") + endif() + set(CMAKE_MODULE_PATH "${SHADOW_ROOT}/share/cmake/Modules") + include_directories(${CMAKE_MODULE_PATH}) + include(ShadowTools) + add_cflags("-fno-inline -fno-strict-aliasing") + + add_definitions(-DTESTNET=true) + add_definitions(-DSHADOW_TESTNET) + include_directories(${SHADOW_ROOT}/include) +endif() set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=c11 -Wall ${DEBUG_FLAGS} ${OPTIMIZE_FLAGS}") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++17 -Wall ${DEBUG_FLAGS} ${OPTIMIZE_FLAGS}") +if(SHADOW) + add_cflags("-fPIC") +endif() + if(NOT GIT_VERSION) exec_program("git" ${CMAKE_CURRENT_SOURCE_DIR} ARGS "rev-parse --short HEAD" OUTPUT_VARIABLE GIT_VERSION) add_definitions(-DGIT_REV="${GIT_VERSION}") @@ -22,7 +56,7 @@ if(RELEASE_MOTTO) endif() set(EXE llarpd) -set(EXE_SRC daemon/main.cpp) +set(EXE_SRC daemon/main.c) if(SODIUM_INCLUDE_DIR) include_directories(${SODIUM_INCLUDE_DIR}) @@ -66,21 +100,39 @@ set(LIB_SRC llarp/router.cpp llarp/router_identity.c llarp/threadpool.cpp + llarp/testnet.c llarp/time.cpp llarp/timer.cpp ) include_directories(include) -add_library(${STATIC_LIB} STATIC ${LIB_SRC}) -if(WITH_SHARED) - add_library(${SHARED_LIB} SHARED ${LIB_SRC}) - target_link_libraries(${SHARED_LIB} ${LIBS}) + +if(SHADOW) + add_shadow_plugin(shadow-plugin-${SHARED_LIB} ${EXE_SRC} ${LIB_SRC}) + target_link_libraries(shadow-plugin-${SHARED_LIB} ${LIBS}) + install(TARGETS shadow-plugin-${SHARED_LIB} DESTINATION plugins) +else() + + add_executable(rcutil daemon/rcutil.cpp) + add_executable(${EXE} ${EXE_SRC}) + + if(WITH_STATIC) + add_library(${STATIC_LIB} STATIC ${LIB_SRC}) + target_link_libraries(${STATIC_LIB} ${LIBS}) + if(NOT WITH_SHARED) + target_link_libraries(rcutil ${STATIC_LIB}) + target_link_libraries(${EXE} ${STATIC_LIB}) + endif() + endif() + + if(WITH_SHARED) + add_library(${SHARED_LIB} SHARED ${LIB_SRC}) + target_link_libraries(${SHARED_LIB} ${LIBS}) + if(NOT WITH_STATIC) + target_link_libraries(rcutil ${SHARED_LIB}) + target_link_libraries(${EXE} ${SHARED_LIB}) + endif() + endif() + endif() - -add_executable(${EXE} ${EXE_SRC}) -target_link_libraries(${STATIC_LIB} ${LIBS}) -target_link_libraries(${EXE} ${STATIC_LIB}) - -add_executable(rcutil daemon/rcutil.cpp) -target_link_libraries(rcutil ${STATIC_LIB} ${LIBS}) diff --git a/Makefile b/Makefile index b1f87e6f9..04992db71 100644 --- a/Makefile +++ b/Makefile @@ -6,24 +6,24 @@ SIGN = gpg --sign --detach TARGETS = llarpd libllarp.so libllarp-static.a SIGS = $(TARGETS:=.sig) +SHADOW_ROOT ?= $(HOME)/.shadow +SHADOW_BIN=$(SHADOW_ROOT)/bin/shadow +SHADOW_CONFIG=shadow.config.xml +SHADOW_PLUGIN=libshadow-plugin-llarp.so + clean: rm -f build.ninja rules.ninja cmake_install.cmake CMakeCache.txt rm -rf CMakeFiles rm -f $(TARGETS) + rm -f $(SHADOW_PLUGIN) $(SHADOW_CONFIG) rm -f *.sig debug-configure: clean - cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DASAN=true + cmake -GNinja -DCMAKE_BUILD_TYPE=Debug release-configure: clean cmake -GNinja -DCMAKE_BUILD_TYPE=Release -DRELEASE_MOTTO="$(shell cat motto.txt)" -configure: clean - cmake -GNinja -DCMAKE_BUILD_TYPE=Debug - -build: configure - ninja - debug: debug-configure ninja @@ -38,5 +38,16 @@ $(TARGETS): release-compile release: $(SIGS) +shadow-configure: clean + cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DSHADOW=ON + +shadow-build: shadow-configure + ninja clean + ninja + +shadow: shadow-build + python3 contrib/shadow/genconf.py $(SHADOW_CONFIG) + bash -c "$(SHADOW_BIN) -w 16 $(SHADOW_CONFIG) &> shadow.log.txt" + format: clang-format -i $$(find daemon llarp include | grep -E '\.[h,c](pp)?$$') diff --git a/contrib/shadow/genconf.py b/contrib/shadow/genconf.py new file mode 100644 index 000000000..dfaec4c66 --- /dev/null +++ b/contrib/shadow/genconf.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python3 + +import configparser +import sys +import os + +from xml.etree import ElementTree as etree + +getSetting = lambda s, name, fallback : name in s and s[name] or fallback + +shadowRoot = getSetting(os.environ, "SHADOW_ROOT", os.path.join(os.environ['HOME'], '.shadow')) + +libpath = 'libshadow-plugin-llarp.so' + +def nodeconf(conf, baseDir, name, ifname, port): + conf['netdb'] = {'dir': 'tmp-nodes'} + conf['router'] = {} + conf['router']['contact-file'] = os.path.join(baseDir, '{}.signed'.format(name)) + conf['router']['ident-privkey'] = os.path.join(baseDir, '{}-ident.key'.format(name)) + conf['router']['transport-privkey'] = os.path.join(baseDir, '{}-transport.key'.format(name)) + if ifname != '*': + conf['iwp-links'] = {'*' : port + 1000, ifname: port} + else: + conf['iwp-links'] = {ifname : port} + conf['iwp-connect'] = {} + +def addPeer(conf, baseDir, peer): + conf['iwp-connect'][peer] = os.path.join(baseDir, '{}.signed'.format(peer)) + +def createNode(pluginName, root, peer): + node = etree.SubElement(root, 'host') + node.attrib['id'] = peer['name'] + node.attrib['interfacebuffer'] = '{}'.format(1024 * 1024 * 100) + app = etree.SubElement(node, 'process') + app.attrib['plugin'] = pluginName + app.attrib['time'] = '50' + app.attrib['arguments'] = peer['configfile'] + + +def makeBase(settings, name, id): + return { + 'id': id, + 'name' : name, + 'contact-file' : os.path.join(getSetting(settings, 'baseDir', 'tmp'), '{}.signed'.format(name)), + 'configfile' : os.path.join(getSetting(settings, 'baseDir', 'tmp'), '{}.ini'.format(name)), + 'config': configparser.ConfigParser() + } + +def makeClient(settings, name, id, port): + peer = makeBase(settings, name, id) + nodeconf(peer['config'], getSetting(settings, 'baseDir', 'tmp'), name, '*', port) + return peer + +def makeSVCNode(settings, name, id, port): + peer = makeBase(settings, name, id) + nodeconf(peer['config'], getSetting(settings, 'baseDir', 'tmp'), name, 'eth0', port) + return peer + + +def genconf(settings, outf): + root = etree.Element('shadow') + topology = etree.SubElement(root, 'topology') + topology.attrib['path'] = getSetting(settings, 'topology', os.path.join(shadowRoot, 'share', 'topology.graphml.xml')) + + + pluginName = getSetting(settings, 'name', 'llarpd') + + kill = etree.SubElement(root, 'kill') + kill.attrib['time'] = getSetting(settings, 'runFor', '600') + + baseDir = getSetting(settings, 'baseDir', 'tmp') + + if not os.path.exists(baseDir): + os.mkdir(baseDir) + + plugin = etree.SubElement(root, "plugin") + plugin.attrib['id'] = pluginName + plugin.attrib['path'] = libpath + basePort = getSetting(settings, 'svc-base-port', 19000) + clientBasePort = getSetting(settings, 'client-base-port', 18000) + svcNodeCount = getSetting(settings, 'service-nodes', 20) + peers = list() + for nodeid in range(svcNodeCount): + peers.append(makeSVCNode(settings, 'svc-node-{}'.format(nodeid), str(nodeid), basePort + 1)) + basePort += 1 + + # make all service nodes know each other + for peer in peers: + for nodeid in range(svcNodeCount): + if str(nodeid) != peer['id']: + addPeer(peer['config'], baseDir, 'svc-node-{}'.format(nodeid)) + + # add client nodes + for nodeid in range(getSetting(settings, 'client-nodes', 200)): + peer = makeClient(settings, 'client-node-{}'.format(nodeid), str(nodeid), clientBasePort +1) + peers.append(peer) + for p in range(getSetting(settings, 'client-connect-to', 3)): + addPeer(peer['config'], baseDir, 'svc-node-{}'.format((p + nodeid) % svcNodeCount)) + clientBasePort += 1 + + + # generate xml and settings files + for peer in peers: + createNode(pluginName, root, peer) + + with open(peer['configfile'], 'w') as f: + peer['config'].write(f) + + # render + outf.write(etree.tostring(root).decode('utf-8')) + +if __name__ == '__main__': + settings = { + 'topology': os.path.join(shadowRoot, 'share', 'topology.graphml.xml') + } + with open(sys.argv[1], 'w') as f: + genconf(settings, f) diff --git a/daemon.ini b/daemon.ini index c967b314a..e6f794483 100644 --- a/daemon.ini +++ b/daemon.ini @@ -8,9 +8,11 @@ ident-privkey=server-ident.key dir=./tmp-nodes [iwp-connect] +i2p.rocks=i2p.rocks.signed #other=other.signed #named-node1=/path/to/routercontact1 [iwp-links] -lo=1090 +*=1091 +#lo=1090 #lo=eth diff --git a/daemon/main.cpp b/daemon/main.c similarity index 81% rename from daemon/main.cpp rename to daemon/main.c index 3c8f930a8..b1c0c9eb6 100644 --- a/daemon/main.cpp +++ b/daemon/main.c @@ -1,6 +1,5 @@ #include #include -#include struct llarp_main *ctx = 0; @@ -11,13 +10,17 @@ handle_signal(int sig) llarp_main_signal(ctx, sig); } +#ifndef TESTNET +#define TESTNET 0 +#endif + int main(int argc, char *argv[]) { const char *conffname = "daemon.ini"; if(argc > 1) conffname = argv[1]; - ctx = llarp_main_init(conffname); + ctx = llarp_main_init(conffname, !TESTNET); int code = 1; if(ctx) { diff --git a/include/llarp.h b/include/llarp.h index 6f109d115..db00ef7c8 100644 --- a/include/llarp.h +++ b/include/llarp.h @@ -17,7 +17,7 @@ struct llarp_main; /** initialize application context and load config */ struct llarp_main * -llarp_main_init(const char *fname); +llarp_main_init(const char *fname, bool multiProcess); /** handle signal for main context */ void diff --git a/include/llarp.hpp b/include/llarp.hpp index badb69238..4848b5bea 100644 --- a/include/llarp.hpp +++ b/include/llarp.hpp @@ -11,10 +11,11 @@ namespace llarp { struct Context { - Context(std::ostream &stdout); + Context(std::ostream &stdout, bool signleThread = false); ~Context(); - int num_nethreads = 1; + int num_nethreads = 1; + bool singleThreaded = false; std::vector< std::thread > netio_threads; llarp_crypto crypto; llarp_router *router = nullptr; diff --git a/include/llarp/aligned.hpp b/include/llarp/aligned.hpp index 423e070c5..e789d2a0e 100644 --- a/include/llarp/aligned.hpp +++ b/include/llarp/aligned.hpp @@ -13,34 +13,22 @@ namespace llarp template < size_t sz > struct AlignedBuffer { - AlignedBuffer() - { - Zero(); - } - - AlignedBuffer(const AlignedBuffer& other) : AlignedBuffer(other.data()) - { - } + AlignedBuffer() = default; AlignedBuffer(const byte_t* data) { for(size_t idx = 0; idx < sz; ++idx) - buf.b[idx] = data[idx]; + b[idx] = data[idx]; } AlignedBuffer& operator=(const byte_t* data) { for(size_t idx = 0; idx < sz; ++idx) - buf.b[idx] = data[idx]; + b[idx] = data[idx]; return *this; } - byte_t& operator[](size_t idx) - { - return buf.b[idx]; - } - friend std::ostream& operator<<(std::ostream& out, const AlignedBuffer& self) { @@ -48,7 +36,7 @@ namespace llarp out << std::hex << std::setw(2) << std::setfill('0'); while(idx < sz) { - out << (int)self.buf.b[idx++]; + out << (int)self.b[idx++]; } return out << std::dec << std::setw(0) << std::setfill(' '); } @@ -75,54 +63,54 @@ namespace llarp Zero() { for(size_t idx = 0; sz < idx / 8; ++idx) - buf.l[idx] = 0; + l[idx] = 0; } void Randomize() { - randombytes(buf.b, sz); + randombytes(l, sz); } byte_t* data() { - return &buf.b[0]; + return &b[0]; } const byte_t* data() const { - return &buf.b[0]; + return &b[0]; } uint64_t* data_l() { - return &buf.l[0]; + return &l[0]; } const uint64_t* data_l() const { - return &buf.l[0]; + return &l[0]; } operator const byte_t*() const { - return &buf.b[0]; + return &b[0]; } operator byte_t*() { - return &buf.b[0]; + return &b[0]; } private: union { byte_t b[sz]; uint64_t l[sz / 8]; - } buf; + }; }; } diff --git a/include/llarp/crypto_async.h b/include/llarp/crypto_async.h index 73a7317ed..a72a2ffdb 100644 --- a/include/llarp/crypto_async.h +++ b/include/llarp/crypto_async.h @@ -186,13 +186,13 @@ struct iwp_async_frame /// a pointer to pass ourself void *user; /// current session key - uint8_t *sessionkey; + byte_t *sessionkey; /// size of the frame size_t sz; /// result handler iwp_async_frame_hook hook; /// memory holding the entire frame - uint8_t buf[1500]; + byte_t buf[1500]; }; /// decrypt iwp frame asynchronously diff --git a/include/llarp/dht.h b/include/llarp/dht.h index eefeb81a9..dcc320fee 100644 --- a/include/llarp/dht.h +++ b/include/llarp/dht.h @@ -52,7 +52,7 @@ struct llarp_router_lookup_job struct llarp_dht_context* dht; llarp_pubkey_t target; bool found; - llarp_rc result; + struct llarp_rc result; }; // shallow copy diff --git a/include/llarp/ev.h b/include/llarp/ev.h index 2f6c7a74d..1254a9211 100644 --- a/include/llarp/ev.h +++ b/include/llarp/ev.h @@ -17,6 +17,10 @@ extern "C" { #endif +// forward declare +struct llarp_threadpool; +struct llarp_logic; + struct llarp_ev_loop; /// allocator @@ -31,6 +35,11 @@ llarp_ev_loop_free(struct llarp_ev_loop **ev); int llarp_ev_loop_run(struct llarp_ev_loop *ev); +void +llarp_ev_loop_run_single_process(struct llarp_ev_loop *ev, + struct llarp_threadpool *tp, + struct llarp_logic *logic); + /// stop event loop and wait for it to complete all jobs void llarp_ev_loop_stop(struct llarp_ev_loop *ev); diff --git a/include/llarp/link.h b/include/llarp/link.h index 2bba0794d..10332b694 100644 --- a/include/llarp/link.h +++ b/include/llarp/link.h @@ -15,7 +15,7 @@ extern "C" { #endif /** 2^15 bytes */ -const size_t MAX_LINK_MSG_SIZE = 32768; +#define MAX_LINK_MSG_SIZE (32768) /** * wire layer transport interface diff --git a/include/llarp/link_message.hpp b/include/llarp/link_message.hpp index 5006f0b9c..7982d388e 100644 --- a/include/llarp/link_message.hpp +++ b/include/llarp/link_message.hpp @@ -23,6 +23,7 @@ namespace llarp RouterID remote = {}; uint64_t version = 0; + ILinkMessage() = default; ILinkMessage(const RouterID& id); virtual ~ILinkMessage(){}; diff --git a/include/llarp/logger.hpp b/include/llarp/logger.hpp index f4a85f8cb..2d9dc3225 100644 --- a/include/llarp/logger.hpp +++ b/include/llarp/logger.hpp @@ -19,7 +19,7 @@ namespace llarp struct Logger { - LogLevel minlevel = eLogDebug; + LogLevel minlevel = eLogInfo; std::ostream& out = std::cout; }; @@ -52,7 +52,7 @@ namespace llarp if(_glog.minlevel > lvl) return; - std::stringstream ss(""); + std::stringstream ss; switch(lvl) { case eLogDebug: @@ -72,19 +72,23 @@ namespace llarp ss << "[ERR] "; break; } - auto t = std::time(nullptr); - auto now = std::localtime(&t); + std::time_t t; + std::time(&t); std::string tag = fname; auto pos = tag.rfind('/'); if(pos != std::string::npos) tag = tag.substr(pos + 1); - - while(tag.size() % 8) - tag += " "; - ss << std::put_time(now, "%F %T") << " " << tag << "\t"; + ss << std::put_time(std::localtime(&t), "%F %T") << " " << tag; + auto sz = tag.size() % 8; + while(sz--) + ss << " "; + ss << "\t"; LogAppend(ss, std::forward< TArgs >(args)...); ss << (char)27 << "[0;0m"; _glog.out << ss.str() << std::endl; +#ifdef SHADOW_TESTNET + _glog.out << "\n" << std::flush; +#endif } } diff --git a/include/llarp/logic.h b/include/llarp/logic.h index a38b5ae16..9465ecf87 100644 --- a/include/llarp/logic.h +++ b/include/llarp/logic.h @@ -12,6 +12,14 @@ struct llarp_logic; struct llarp_logic* llarp_init_logic(); +/// single threaded mode logic event loop +struct llarp_logic* +llarp_init_single_process_logic(struct llarp_threadpool* tp); + +/// single threaded tick +void +llarp_logic_tick(struct llarp_logic* logic); + void llarp_free_logic(struct llarp_logic** logic); diff --git a/include/llarp/messages/discard.hpp b/include/llarp/messages/discard.hpp index 14e1f8982..4f88acfba 100644 --- a/include/llarp/messages/discard.hpp +++ b/include/llarp/messages/discard.hpp @@ -4,24 +4,22 @@ namespace llarp { + const std::size_t MAX_DISCARD_SIZE = 10000; + /// a dummy link message that is discarded struct DiscardMessage : public ILinkMessage { - std::vector< byte_t > Z; - - ~DiscardMessage() - { - } + byte_t pad[MAX_DISCARD_SIZE]; + size_t sz = 0; DiscardMessage(const RouterID& id) : ILinkMessage(id) { } - DiscardMessage(const RouterID& other, std::size_t padding) - : ILinkMessage(other) + DiscardMessage(std::size_t padding) : ILinkMessage() { - Z.resize(padding); - std::fill(Z.begin(), Z.end(), 'z'); + sz = padding; + memset(pad, 'z', sz); } virtual bool @@ -38,8 +36,10 @@ namespace llarp { if(!bencode_read_string(buf, &strbuf)) return false; - Z.resize(strbuf.sz); - memcpy(Z.data(), strbuf.base, strbuf.sz); + if(strbuf.sz > MAX_DISCARD_SIZE) + return false; + sz = strbuf.sz; + memcpy(pad, strbuf.base, sz); return true; } return false; @@ -61,7 +61,7 @@ namespace llarp if(!bencode_write_bytestring(buf, "z", 1)) return false; - if(!bencode_write_bytestring(buf, Z.data(), Z.size())) + if(!bencode_write_bytestring(buf, pad, sz)) return false; return bencode_end(buf); @@ -71,7 +71,7 @@ namespace llarp HandleMessage(llarp_router* router) const { (void)router; - llarp::Info("got discard message of size ", Z.size(), " bytes"); + llarp::Info("got discard message of size ", sz, " bytes"); return true; } }; diff --git a/include/llarp/messages/link_intro.hpp b/include/llarp/messages/link_intro.hpp index 38c9e2286..b373765ad 100644 --- a/include/llarp/messages/link_intro.hpp +++ b/include/llarp/messages/link_intro.hpp @@ -5,7 +5,7 @@ namespace llarp { struct LinkIntroMessage : public ILinkMessage { - LinkIntroMessage(llarp_rc* rc) : ILinkMessage({}), RC(rc) + LinkIntroMessage(llarp_rc* rc) : ILinkMessage(), RC(rc) { } diff --git a/include/llarp/router.h b/include/llarp/router.h index 9a0e6e690..8cd8c940c 100644 --- a/include/llarp/router.h +++ b/include/llarp/router.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include diff --git a/include/llarp/router_contact.h b/include/llarp/router_contact.h index 95f0b06de..575bf82aa 100644 --- a/include/llarp/router_contact.h +++ b/include/llarp/router_contact.h @@ -7,7 +7,10 @@ extern "C" { #endif -const size_t MAX_RC_SIZE = 1024; +// forward declare +struct llarp_alloc; + +#define MAX_RC_SIZE (1024) struct llarp_rc { diff --git a/include/llarp/threadpool.h b/include/llarp/threadpool.h index 53d387b63..bc97fa522 100644 --- a/include/llarp/threadpool.h +++ b/include/llarp/threadpool.h @@ -8,6 +8,11 @@ struct llarp_threadpool; struct llarp_threadpool * llarp_init_threadpool(int workers, const char *name); + +/// for single process mode +struct llarp_threadpool * +llarp_init_same_process_threadpool(); + void llarp_free_threadpool(struct llarp_threadpool **tp); @@ -20,8 +25,24 @@ struct llarp_thread_job void *user; /** called in threadpool worker thread */ llarp_thread_work_func work; + +#ifdef __cplusplus + + llarp_thread_job(void *u, llarp_thread_work_func w) : user(u), work(w) + { + } + + llarp_thread_job() : user(nullptr), work(nullptr) + { + } + +#endif }; +/// for single process mode +void +llarp_threadpool_tick(struct llarp_threadpool *tp); + void llarp_threadpool_queue_job(struct llarp_threadpool *tp, struct llarp_thread_job j); diff --git a/include/llarp/timer.h b/include/llarp/timer.h index 5e4e33a5b..f34b666c2 100644 --- a/include/llarp/timer.h +++ b/include/llarp/timer.h @@ -39,6 +39,11 @@ llarp_timer_stop(struct llarp_timer_context *t); void llarp_timer_run(struct llarp_timer_context *t, struct llarp_threadpool *pool); +/// single threaded run timer, tick all timers +void +llarp_timer_tick_all(struct llarp_timer_context *t, + struct llarp_threadpool *pool); + void llarp_free_timer(struct llarp_timer_context **t); diff --git a/llarp/buffer.hpp b/llarp/buffer.hpp index d27705a92..1e779fe41 100644 --- a/llarp/buffer.hpp +++ b/llarp/buffer.hpp @@ -20,7 +20,7 @@ namespace llarp StackBuffer(T& stack) { llarp_buffer_t buff; - buff.base = stack; + buff.base = &stack[0]; buff.cur = buff.base; buff.sz = sizeof(stack); return buff; diff --git a/llarp/config.cpp b/llarp/config.cpp index c0d96b605..68022b8fb 100644 --- a/llarp/config.cpp +++ b/llarp/config.cpp @@ -65,11 +65,14 @@ llarp_config_iter(struct llarp_config *conf, struct llarp_config_iterator *iter) { iter->conf = conf; std::map< std::string, llarp::Config::section_t & > sections = { - {"router", conf->impl.router}, {"network", conf->impl.network}, {"iwp-connect", conf->impl.connect}, {"iwp-links", conf->impl.iwp_links}, {"netdb", conf->impl.netdb}}; + + for(const auto item : conf->impl.router) + iter->visit(iter, "router", item.first.c_str(), item.second.c_str()); + for(const auto section : sections) for(const auto item : section.second) iter->visit(iter, section.first.c_str(), item.first.c_str(), diff --git a/llarp/context.cpp b/llarp/context.cpp index 96e9e0089..c7c1087a6 100644 --- a/llarp/context.cpp +++ b/llarp/context.cpp @@ -10,7 +10,8 @@ namespace llarp { - Context::Context(std::ostream &stdout) : out(stdout) + Context::Context(std::ostream &stdout, bool singleThread) + : singleThreaded(singleThread), out(stdout) { llarp::Info(LLARP_VERSION, " ", LLARP_RELEASE_MOTTO); } @@ -50,7 +51,7 @@ namespace llarp Context *ctx = static_cast< Context * >(itr->user); if(!strcmp(section, "router")) { - if(!strcmp(key, "worker-threads")) + if(!strcmp(key, "worker-threads") && !ctx->singleThreaded) { int workers = atoi(val); if(workers > 0 && ctx->worker == nullptr) @@ -63,6 +64,8 @@ namespace llarp ctx->num_nethreads = atoi(val); if(ctx->num_nethreads <= 0) ctx->num_nethreads = 1; + if(ctx->singleThreaded) + ctx->num_nethreads = 0; } } if(!strcmp(section, "netdb")) @@ -81,54 +84,61 @@ namespace llarp llarp_ev_loop_alloc(&mainloop); llarp_crypto_libsodium_init(&crypto); nodedb = llarp_nodedb_new(&crypto); - if(nodedb_dir[0]) + // ensure worker thread pool + if(!worker && !singleThreaded) + worker = llarp_init_threadpool(2, "llarp-worker"); + else if(singleThreaded) { - nodedb_dir[sizeof(nodedb_dir) - 1] = 0; - if(llarp_nodedb_ensure_dir(nodedb_dir)) + llarp::Info("running in single threaded mode"); + worker = llarp_init_same_process_threadpool(); + } + // ensure netio thread + if(singleThreaded) + { + logic = llarp_init_single_process_logic(worker); + } + else + logic = llarp_init_logic(); + + router = llarp_init_router(worker, mainloop, logic); + + if(llarp_configure_router(router, config)) + { + if(custom_dht_func) { - // ensure worker thread pool - if(!worker) - worker = llarp_init_threadpool(2, "llarp-worker"); - // ensure netio thread - logic = llarp_init_logic(); - - router = llarp_init_router(worker, mainloop, logic); - - if(llarp_configure_router(router, config)) + llarp::Info("using custom dht function"); + llarp_dht_set_msg_handler(router->dht, custom_dht_func); + } + llarp_run_router(router, nodedb); + // run net io thread + if(singleThreaded) + { + llarp::Info("running mainloop"); + llarp_ev_loop_run_single_process(mainloop, worker, logic); + } + else + { + auto netio = mainloop; + while(num_nethreads--) { - if(custom_dht_func) - { - llarp::Info("using custom dht function"); - llarp_dht_set_msg_handler(router->dht, custom_dht_func); - } - llarp_run_router(router, nodedb); - // run net io thread - auto netio = mainloop; - while(num_nethreads--) - { - netio_threads.emplace_back([netio]() { llarp_ev_loop_run(netio); }); + netio_threads.emplace_back([netio]() { llarp_ev_loop_run(netio); }); #if(__APPLE__ && __MACH__) #elif(__FreeBSD__) - pthread_set_name_np(netio_threads.back().native_handle(), - "llarp-netio"); + pthread_set_name_np(netio_threads.back().native_handle(), + "llarp-netio"); #else - pthread_setname_np(netio_threads.back().native_handle(), - "llarp-netio"); + pthread_setname_np(netio_threads.back().native_handle(), + "llarp-netio"); #endif - } - llarp::Info("Ready"); - llarp_logic_mainloop(logic); - return 0; } - else - llarp::Error("failed to start router"); + llarp::Info("running mainloop"); + llarp_logic_mainloop(logic); } - else - llarp::Error("Failed to initialize nodedb"); + return 0; } else - llarp::Error("no nodedb defined"); + llarp::Error("Failed to configure router"); return 1; } @@ -224,13 +234,13 @@ struct llarp_main }; struct llarp_main * -llarp_main_init(const char *fname) +llarp_main_init(const char *fname, bool multiProcess) { if(!fname) fname = "daemon.ini"; llarp_main *m = new llarp_main; - m->ctx.reset(new llarp::Context(std::cout)); + m->ctx.reset(new llarp::Context(std::cout, !multiProcess)); if(!m->ctx->LoadConfig(fname)) { m->ctx->Close(); diff --git a/llarp/crypto_async.cpp b/llarp/crypto_async.cpp index 5eb3c6400..720963543 100644 --- a/llarp/crypto_async.cpp +++ b/llarp/crypto_async.cpp @@ -323,6 +323,7 @@ namespace iwp { iwp_async_frame *frame = static_cast< iwp_async_frame * >(user); frame->hook(frame); + delete frame; } void @@ -330,9 +331,9 @@ namespace iwp { iwp_async_frame *frame = static_cast< iwp_async_frame * >(user); auto crypto = frame->iwp->crypto; - auto hmac = frame->buf; - auto nonce = frame->buf + 32; - auto body = frame->buf + 64; + byte_t *hmac = frame->buf; + byte_t *nonce = frame->buf + 32; + byte_t *body = frame->buf + 64; llarp_sharedkey_t digest; @@ -351,7 +352,7 @@ namespace iwp buf.sz = frame->sz - 64; crypto->xchacha20(buf, frame->sessionkey, nonce); // inform result - llarp_logic_queue_job(frame->iwp->logic, {user, &inform_frame_done}); + llarp_logic_queue_job(frame->iwp->logic, {frame, &inform_frame_done}); } void @@ -359,9 +360,9 @@ namespace iwp { iwp_async_frame *frame = static_cast< iwp_async_frame * >(user); auto crypto = frame->iwp->crypto; - auto hmac = frame->buf; - auto nonce = frame->buf + 32; - auto body = frame->buf + 64; + byte_t *hmac = frame->buf; + byte_t *nonce = frame->buf + 32; + byte_t *body = frame->buf + 64; llarp_buffer_t buf; buf.base = body; @@ -377,8 +378,9 @@ namespace iwp buf.cur = buf.base; buf.sz = frame->sz - 32; crypto->hmac(hmac, buf, frame->sessionkey); - // inform result - llarp_logic_queue_job(frame->iwp->logic, {user, &inform_frame_done}); + // call result RIGHT HERE + frame->hook(frame); + delete frame; } } diff --git a/llarp/dht.cpp b/llarp/dht.cpp index 2ae0478cc..fb121a161 100644 --- a/llarp/dht.cpp +++ b/llarp/dht.cpp @@ -441,6 +441,7 @@ namespace llarp Context::CleanupTX() { auto now = llarp_time_now_ms(); + llarp::Debug("DHT tick"); std::set< TXOwner > expired; for(auto &item : pendingTX) @@ -454,10 +455,11 @@ namespace llarp if(e.requester != ourKey) { // inform not found - auto msg = new llarp::DHTImmeidateMessage(e.requester); - msg->msgs.push_back( + llarp::DHTImmeidateMessage msg(e.requester); + msg.msgs.push_back( new GotRouterMessage(e.requester, e.txid, nullptr)); - router->SendToOrQueue(e.requester, {msg}); + llarp::Info("DHT reply to ", e.requester); + router->SendTo(e.requester, &msg); } } diff --git a/llarp/discard_message.cpp b/llarp/discard_message.cpp new file mode 100644 index 000000000..6e5239edb --- /dev/null +++ b/llarp/discard_message.cpp @@ -0,0 +1,9 @@ +#include + +namespace llarp +{ + DiscardMessage::~DiscardMessage() + { + llarp::Debug("~DiscardMessage"); + } +} diff --git a/llarp/ev.cpp b/llarp/ev.cpp index cef4efed0..0b99e04af 100644 --- a/llarp/ev.cpp +++ b/llarp/ev.cpp @@ -1,14 +1,15 @@ #include +#include #include "mem.hpp" #ifdef __linux__ -# include "ev_epoll.hpp" +#include "ev_epoll.hpp" #endif -#if (__APPLE__ && __MACH__) -# include "ev_kqueue.hpp" +#if(__APPLE__ && __MACH__) +#include "ev_kqueue.hpp" #endif #ifdef __FreeBSD__ -# include "ev_kqueue.hpp" +#include "ev_kqueue.hpp" #endif extern "C" { @@ -19,7 +20,7 @@ llarp_ev_loop_alloc(struct llarp_ev_loop **ev) #ifdef __linux__ *ev = new llarp_epoll_loop; #endif -#if (__APPLE__ && __MACH__) +#if(__APPLE__ && __MACH__) *ev = new llarp_kqueue_loop; #endif #ifdef __FreeBSD__ @@ -41,6 +42,20 @@ llarp_ev_loop_run(struct llarp_ev_loop *ev) return ev->run(); } +void +llarp_ev_loop_run_single_process(struct llarp_ev_loop *ev, + struct llarp_threadpool *tp, + struct llarp_logic *logic) +{ + while(true) + { + if(ev->tick(10) == -1) + return; + llarp_logic_tick(logic); + llarp_threadpool_tick(tp); + } +} + int llarp_ev_add_udp(struct llarp_ev_loop *ev, struct llarp_udp_io *udp, const struct sockaddr *src) diff --git a/llarp/ev.hpp b/llarp/ev.hpp index b944fd16f..3ad4eac6a 100644 --- a/llarp/ev.hpp +++ b/llarp/ev.hpp @@ -27,6 +27,10 @@ struct llarp_ev_loop init() = 0; virtual int run() = 0; + + virtual int + tick(int ms) = 0; + virtual void stop() = 0; diff --git a/llarp/ev_epoll.hpp b/llarp/ev_epoll.hpp index f15a7303d..9d0898e54 100644 --- a/llarp/ev_epoll.hpp +++ b/llarp/ev_epoll.hpp @@ -100,6 +100,40 @@ struct llarp_epoll_loop : public llarp_ev_loop return false; } + int + tick(int ms) + { + epoll_event events[1024]; + int result; + byte_t readbuf[2048]; + + result = epoll_wait(epollfd, events, 1024, ms); + if(result > 0) + { + int idx = 0; + while(idx < result) + { + // handle signalfd + if(events[idx].data.fd == pipefds[0]) + { + llarp::Debug("exiting epoll loop"); + return 0; + } + llarp::ev_io* ev = static_cast< llarp::ev_io* >(events[idx].data.ptr); + if(events[idx].events & EPOLLIN) + { + if(ev->read(readbuf, sizeof(readbuf)) == -1) + { + llarp::Debug("close ev"); + close_ev(ev); + } + } + ++idx; + } + } + return result; + } + int run() { diff --git a/llarp/ev_kqueue.hpp b/llarp/ev_kqueue.hpp index 3e3a42817..15e9ba399 100644 --- a/llarp/ev_kqueue.hpp +++ b/llarp/ev_kqueue.hpp @@ -79,7 +79,7 @@ namespace llarp struct llarp_kqueue_loop : public llarp_ev_loop { int kqueuefd; - struct kevent change; /* event we want to monitor */ + struct kevent change; /* event we want to monitor */ llarp_kqueue_loop() : kqueuefd(-1) { @@ -92,12 +92,40 @@ struct llarp_kqueue_loop : public llarp_ev_loop bool init() { - if (kqueuefd == -1) { + if(kqueuefd == -1) + { kqueuefd = kqueue(); } return kqueuefd != -1; } + int + tick(int ms) + { + (void)ms; + struct kevent events[1024]; + int result; + byte_t readbuf[2048]; + result = kevent(kqueuefd, NULL, 0, events, 1024, NULL); + // result: 0 is a timeout + if(result > 0) + { + int idx = 0; + while(idx < result) + { + llarp::ev_io* ev = static_cast< llarp::ev_io* >(events[idx].udata); + if(ev->read(readbuf, sizeof(readbuf)) == -1) + { + llarp::Info(__FILE__, "close ev"); + close_ev(ev); + delete ev; + } + ++idx; + } + } + return result; + } + int run() { @@ -108,7 +136,7 @@ struct llarp_kqueue_loop : public llarp_ev_loop { result = kevent(kqueuefd, NULL, 0, events, 1024, NULL); // result: 0 is a timeout - if (result > 0) + if(result > 0) { int idx = 0; while(idx < result) @@ -202,7 +230,8 @@ struct llarp_kqueue_loop : public llarp_ev_loop llarp::udp_listener* listener = new llarp::udp_listener(fd, l); EV_SET(&change, fd, EVFILT_READ, EV_ADD, 0, 0, listener); - if (kevent(kqueuefd, &change, 1, NULL, 0, NULL) == -1) { + if(kevent(kqueuefd, &change, 1, NULL, 0, NULL) == -1) + { delete listener; return false; } diff --git a/llarp/iwp_link.cpp b/llarp/iwp_link.cpp index ef193a262..2bfa36895 100644 --- a/llarp/iwp_link.cpp +++ b/llarp/iwp_link.cpp @@ -42,7 +42,33 @@ namespace iwp eFRAG = 0x03 }; - typedef std::vector< byte_t > sendbuf_t; + struct sendbuf_t + { + sendbuf_t(size_t s) : sz(s) + { + buf = new byte_t[s]; + } + + ~sendbuf_t() + { + delete[] buf; + } + + byte_t *buf; + size_t sz; + + size_t + size() const + { + return sz; + } + + byte_t * + data() + { + return buf; + } + }; enum header_flag { @@ -107,15 +133,14 @@ namespace iwp }; byte_t * - init_sendbuf(sendbuf_t &buf, msgtype t, uint16_t sz, uint8_t flags) + init_sendbuf(sendbuf_t *buf, msgtype t, uint16_t sz, uint8_t flags) { - buf.resize(6 + sz); - frame_header hdr(buf.data()); + frame_header hdr(buf->data()); hdr.version() = 0; hdr.msgtype() = t; hdr.setsize(sz); - buf[4] = 0; - buf[5] = flags; + buf->data()[4] = 0; + buf->data()[5] = flags; return hdr.data(); } @@ -124,9 +149,7 @@ namespace iwp { byte_t buffer[48]; - xmit() - { - } + xmit() = default; xmit(byte_t *ptr) { @@ -213,28 +236,25 @@ namespace iwp } }; - typedef std::vector< uint8_t > fragment_t; - // forward declare struct session; struct server; struct transit_message { - session *parent = nullptr; xmit msginfo; - std::bitset< 32 > status; + std::bitset< 32 > status = {}; - std::map< uint8_t, fragment_t > frags; + typedef std::vector< byte_t > fragment_t; + + std::unordered_map< byte_t, fragment_t > frags; fragment_t lastfrag; - transit_message() - { - } - - ~transit_message() + void + clear() { frags.clear(); + lastfrag.clear(); } // calculate acked bitmask @@ -251,6 +271,13 @@ namespace iwp return bitmask; } + // outbound + transit_message(llarp_buffer_t buf, const byte_t *hash, uint64_t id, + uint16_t mtu = 1024) + { + put_message(buf, hash, id, mtu); + } + // inbound transit_message(const xmit &x) : msginfo(x) { @@ -264,11 +291,6 @@ namespace iwp status.reset(); } - // outbound - transit_message(session *s) : parent(s) - { - } - /// ack packets based off a bitmask void ack(uint32_t bitmask) @@ -307,10 +329,9 @@ namespace iwp void generate_xmit(T &queue, byte_t flags = 0) { - queue.emplace(); - auto &xmitbuf = queue.back(); - auto body_ptr = init_sendbuf( - xmitbuf, eXMIT, sizeof(msginfo.buffer) + lastfrag.size(), flags); + uint16_t sz = lastfrag.size() + sizeof(msginfo.buffer); + queue.push(new sendbuf_t(sz + 6)); + auto body_ptr = init_sendbuf(queue.back(), eXMIT, sz, flags); memcpy(body_ptr, msginfo.buffer, sizeof(msginfo.buffer)); body_ptr += sizeof(msginfo.buffer); memcpy(body_ptr, lastfrag.data(), lastfrag.size()); @@ -326,9 +347,10 @@ namespace iwp { if(status.test(frag.first)) continue; - queue.emplace(); - auto &fragbuf = queue.back(); - auto body_ptr = init_sendbuf(fragbuf, eFRAG, 9 + fragsize, flags); + uint16_t sz = 9 + fragsize; + queue.push(new sendbuf_t(sz + 6)); + auto body_ptr = init_sendbuf(queue.back(), eFRAG, sz, flags); + // TODO: assumes big endian memcpy(body_ptr, &msgid, 8); body_ptr[8] = frag.first; memcpy(body_ptr + 9, frag.second.data(), fragsize); @@ -402,10 +424,10 @@ namespace iwp uint64_t rxids = 0; uint64_t txids = 0; llarp_time_t lastEvent = 0; - std::map< uint64_t, transit_message * > rx; - std::map< uint64_t, transit_message * > tx; + std::unordered_map< uint64_t, transit_message * > rx; + std::unordered_map< uint64_t, transit_message * > tx; - typedef std::queue< sendbuf_t > sendqueue_t; + typedef std::queue< sendbuf_t * > sendqueue_t; llarp_router *router = nullptr; llarp_link_session *parent = nullptr; @@ -422,11 +444,13 @@ namespace iwp void clear() { - for(auto &item : rx) + auto _rx = rx; + auto _tx = tx; + for(auto &item : _rx) + delete item.second; + for(auto &item : _tx) delete item.second; rx.clear(); - for(auto &item : tx) - delete item.second; tx.clear(); } @@ -437,16 +461,15 @@ namespace iwp push_ackfor(uint64_t id, uint32_t bitmask) { llarp::Debug("ACK for msgid=", id, " mask=", bitmask); - sendqueue.emplace(); - auto &buf = sendqueue.back(); - init_sendbuf(buf, eACKS, 12, txflags); + sendqueue.push(new sendbuf_t(12 + 6)); + auto body_ptr = init_sendbuf(sendqueue.back(), eACKS, 12, txflags); // TODO: this assumes big endian - memcpy(buf.data() + 6, &id, 8); - memcpy(buf.data() + 14, &bitmask, 4); + memcpy(body_ptr, &id, 8); + memcpy(body_ptr + 8, &bitmask, 4); } bool - got_xmit(frame_header &hdr, size_t sz) + got_xmit(frame_header hdr, size_t sz) { if(hdr.size() > sz) { @@ -504,7 +527,7 @@ namespace iwp } bool - got_frag(frame_header &hdr, size_t sz) + got_frag(frame_header hdr, size_t sz) { if(hdr.size() > sz) { @@ -561,33 +584,37 @@ namespace iwp } bool - got_acks(frame_header &hdr, size_t sz); + got_acks(frame_header hdr, size_t sz); // queue new outbound message void queue_tx(uint64_t id, transit_message *msg) { - auto itr = tx.emplace(id, msg); - if(itr.second) + tx[id] = msg; + msg->generate_xmit(sendqueue, txflags); + } + + void + retransmit() + { + for(auto &item : tx) { - msg->generate_xmit(sendqueue, txflags); + item.second->retransmit_frags(sendqueue, txflags); } - else // duplicate - delete msg; } // get next frame to encrypt and transmit bool - next_frame(llarp_buffer_t &buf) + next_frame(llarp_buffer_t *buf) { auto left = sendqueue.size(); llarp::Debug("next frame, ", left, " frames left in send queue"); if(left) { - auto &send = sendqueue.front(); - buf.base = &send[0]; - buf.cur = &send[0]; - buf.sz = send.size(); + sendbuf_t *send = sendqueue.front(); + buf->base = send->data(); + buf->cur = send->data(); + buf->sz = send->size(); return true; } return false; @@ -596,11 +623,13 @@ namespace iwp void pop_next_frame() { + sendbuf_t *buf = sendqueue.front(); sendqueue.pop(); + delete buf; } bool - process(uint8_t *buf, size_t sz) + process(byte_t *buf, size_t sz) { frame_header hdr(buf); if(hdr.flags() & eSessionInvalidated) @@ -703,12 +732,11 @@ namespace iwp static bool sendto(llarp_link_session *s, llarp_buffer_t msg) { - session *self = static_cast< session * >(s->impl); - transit_message *m = new transit_message(self); - auto id = self->frame.txids++; + session *self = static_cast< session * >(s->impl); + auto id = self->frame.txids++; llarp_shorthash_t digest; self->crypto->shorthash(digest, msg); - m->put_message(msg, digest, id); + transit_message *m = new transit_message(msg, digest, id); self->add_outbound_message(id, m); return true; } @@ -731,7 +759,7 @@ namespace iwp pump() { llarp_buffer_t buf; - while(frame.next_frame(buf)) + while(frame.next_frame(&buf)) { encrypt_frame_async_send(buf.base, buf.sz); frame.pop_next_frame(); @@ -769,18 +797,7 @@ namespace iwp } static void - handle_verify_session_start(iwp_async_session_start *s) - { - session *self = static_cast< session * >(s->user); - if(!s->buf) - { - // verify fail - // TODO: remove session? - llarp::Warn("session start verify failed"); - return; - } - self->send_LIM(); - } + handle_verify_session_start(iwp_async_session_start *s); void send_LIM() @@ -794,14 +811,12 @@ namespace iwp if(llarp::EncodeLIM(&buf, our_router)) { // rewind message buffer - buf.sz = buf.cur - buf.base; - buf.cur = buf.base; - auto msg = new transit_message; + buf.sz = buf.cur - buf.base; + buf.cur = buf.base; // hash message buffer crypto->shorthash(digest, buf); - // put message buffer - auto id = frame.txids++; - msg->put_message(buf, digest, id); + auto id = frame.txids++; + auto msg = new transit_message(buf, digest, id); // put into outbound send queue add_outbound_message(id, msg); EnterState(eLIMSent); @@ -836,7 +851,11 @@ namespace iwp } // send keepalive if we are established or a session is made if(state == eEstablished || state == eLIMSent) + { send_keepalive(this); + frame.retransmit(); + pump(); + } // TODO: determine if we are too idle return false; @@ -918,7 +937,7 @@ namespace iwp if(introack->buf == nullptr) { // invalid signature - llarp::Error("introack verify failed"); + llarp::Error("introack verify failed from ", link->addr); return; } link->EnterState(eIntroAckRecv); @@ -931,7 +950,9 @@ namespace iwp handle_generated_session_start(iwp_async_session_start *start) { session *link = static_cast< session * >(start->user); - llarp_ev_udp_sendto(link->udp, link->addr, start->buf, start->sz); + if(llarp_ev_udp_sendto(link->udp, link->addr, start->buf, start->sz) + == -1) + llarp::Error("sendto failed"); link->EnterState(eSessionStartSent); } @@ -966,6 +987,7 @@ namespace iwp { session *self = static_cast< session * >(frame->user); llarp::Debug("rx ", frame->sz, " frames=", self->frames); + self->frames--; if(frame->success) { if(self->frame.process(frame->buf + 64, frame->sz - 64)) @@ -978,9 +1000,6 @@ namespace iwp } else llarp::Error("decrypt frame fail"); - - delete frame; - --self->frames; } void @@ -988,8 +1007,8 @@ namespace iwp { if(sz > 64) { - auto frame = alloc_frame(buf, sz); - frame->hook = &handle_frame_decrypt; + iwp_async_frame *frame = alloc_frame(buf, sz); + frame->hook = &handle_frame_decrypt; iwp_call_async_frame_decrypt(iwp, frame); } else @@ -1001,9 +1020,10 @@ namespace iwp { session *self = static_cast< session * >(frame->user); llarp::Debug("tx ", frame->sz, " frames=", self->frames); - llarp_ev_udp_sendto(self->udp, self->addr, frame->buf, frame->sz); - delete frame; - --self->frames; + if(llarp_ev_udp_sendto(self->udp, self->addr, frame->buf, frame->sz) + == -1) + llarp::Warn("sendto failed"); + self->frames--; } iwp_async_frame * @@ -1013,13 +1033,13 @@ namespace iwp if(sz > 1500) return nullptr; - iwp_async_frame *frame = new iwp_async_frame; + iwp_async_frame *frame = new iwp_async_frame(); if(buf) memcpy(frame->buf, buf, sz); frame->sz = sz; frame->user = this; frame->sessionkey = sessionkey; - ++frames; + frames++; return frame; } @@ -1027,8 +1047,12 @@ namespace iwp encrypt_frame_async_send(const void *buf, size_t sz) { // 64 bytes frame overhead for nonce and hmac - auto frame = alloc_frame(nullptr, sz + 64); + iwp_async_frame *frame = alloc_frame(nullptr, sz + 64); memcpy(frame->buf + 64, buf, sz); + auto padding = rand() % MAX_PAD; + if(padding) + crypto->randbytes(frame->buf + 64 + sz, padding); + frame->sz += padding; frame->hook = &handle_frame_encrypt; iwp_call_async_frame_encrypt(iwp, frame); } @@ -1039,7 +1063,7 @@ namespace iwp session *self = static_cast< session * >(intro->user); if(!intro->buf) { - llarp::Error("intro verify failed"); + llarp::Error("intro verify failed from ", self->addr); delete self; return; } @@ -1251,8 +1275,9 @@ namespace iwp HasSessionToRouter(llarp_link *l, const byte_t *pubkey) { server *serv = static_cast< server * >(l->impl); + llarp::pubkey pk(pubkey); lock_t lock(serv->m_Connected_Mutex); - return serv->m_Connected.find(pubkey) != serv->m_Connected.end(); + return serv->m_Connected.find(pk) != serv->m_Connected.end(); } void @@ -1262,12 +1287,11 @@ namespace iwp { lock_t lock(m_sessions_Mutex); std::set< llarp::Addr > remove; - auto itr = m_sessions.begin(); - while(itr != m_sessions.end()) + for(auto &itr : m_sessions) { - if(static_cast< session * >(itr->second.impl)->Tick(now)) - remove.insert(itr->first); - ++itr; + session *s = static_cast< session * >(itr.second.impl); + if(s && s->Tick(now)) + remove.insert(itr.first); } for(const auto &addr : remove) @@ -1288,7 +1312,7 @@ namespace iwp auto inner_itr = serv->m_sessions.find(itr->second); if(inner_itr != serv->m_sessions.end()) { - auto link = &inner_itr->second; + llarp_link_session *link = &inner_itr->second; return link->sendto(link, buf); } } @@ -1457,6 +1481,7 @@ namespace iwp { // new inbound session s = link->create_session(*saddr, link->seckey); + llarp::Debug("new inbound session from ", s->addr); } s->recv(buf, sz); } @@ -1551,22 +1576,17 @@ namespace iwp return; } // all zeros means keepalive - byte_t tmp[MAX_PAD + 8] = {0}; + byte_t tmp[8] = {0}; // set flags for tx frame_header hdr(tmp); hdr.flags() = self->frame.txflags; - // 8 bytes iwp header overhead - int padsz = rand() % (sizeof(tmp) - 8); - auto buf = llarp::StackBuffer< decltype(tmp) >(tmp); - if(padsz) - self->crypto->randbytes(buf.base + 8, padsz); - buf.sz -= padsz; // send frame after encrypting + auto buf = llarp::StackBuffer< decltype(tmp) >(tmp); self->encrypt_frame_async_send(buf.base, buf.sz); } bool - frame_state::got_acks(frame_header &hdr, size_t sz) + frame_state::got_acks(frame_header hdr, size_t sz) { if(hdr.size() > sz) { @@ -1593,29 +1613,46 @@ namespace iwp return false; } - itr->second->ack(bitmask); + transit_message *msg = itr->second; - if(itr->second->completed()) + msg->ack(bitmask); + + if(msg->completed()) { llarp::Debug("message transmitted msgid=", msgid); - delete itr->second; - tx.erase(itr); session *impl = static_cast< session * >(parent->impl); if(impl->state == session::eLIMSent && msgid == 0) { // first message acked we are established? impl->session_established(); } + tx.erase(msgid); + delete msg; } else { llarp::Debug("message ", msgid, " retransmit fragments"); - itr->second->retransmit_frags(sendqueue); + msg->retransmit_frags(sendqueue, txflags); } return true; } + void + session::handle_verify_session_start(iwp_async_session_start *s) + { + session *self = static_cast< session * >(s->user); + if(!s->buf) + { + // verify fail + // TODO: remove session? + llarp::Warn("session start verify failed from ", self->addr); + self->serv->RemoveSessionByAddr(self->addr); + return; + } + self->send_LIM(); + } + server * link_alloc(struct llarp_router *router, const char *keyfile, struct llarp_crypto *crypto, struct llarp_logic *logic, @@ -1723,7 +1760,7 @@ namespace iwp link->timeout_job_id = 0; link->logic = logic; // start cleanup timer - link->issue_cleanup_timer(1000); + link->issue_cleanup_timer(2500); return true; } @@ -1741,11 +1778,17 @@ namespace iwp link_iter_sessions(struct llarp_link *l, struct llarp_link_session_iter iter) { server *link = static_cast< server * >(l->impl); - iter.link = l; - // TODO: race condition with cleanup timer - for(auto &item : link->m_sessions) - if(!iter.visit(&iter, &item.second)) - return; + auto sz = link->m_sessions.size(); + if(sz) + { + llarp::Debug("we have ", sz, "sessions"); + iter.link = l; + // TODO: race condition with cleanup timer + for(auto &item : link->m_sessions) + if(item.second.impl) + if(!iter.visit(&iter, &item.second)) + return; + } } bool diff --git a/llarp/logic.cpp b/llarp/logic.cpp index 48cd3b13b..c90898b6d 100644 --- a/llarp/logic.cpp +++ b/llarp/logic.cpp @@ -22,6 +22,24 @@ llarp_init_logic() return logic; }; +struct llarp_logic* +llarp_init_single_process_logic(struct llarp_threadpool* tp) +{ + llarp_logic* logic = new llarp_logic; + if(logic) + { + logic->thread = tp; + logic->timer = llarp_init_timer(); + } + return logic; +} + +void +llarp_logic_tick(struct llarp_logic* logic) +{ + llarp_timer_tick_all(logic->timer, logic->thread); +} + void llarp_free_logic(struct llarp_logic** logic) { @@ -58,13 +76,20 @@ llarp_logic_mainloop(struct llarp_logic* logic) void llarp_logic_queue_job(struct llarp_logic* logic, struct llarp_thread_job job) { - llarp_threadpool_queue_job(logic->thread, job); + llarp_thread_job j; + j.user = job.user; + j.work = job.work; + llarp_threadpool_queue_job(logic->thread, j); } uint32_t llarp_logic_call_later(struct llarp_logic* logic, struct llarp_timeout_job job) { - return llarp_timer_call_later(logic->timer, job); + llarp_timeout_job j; + j.user = job.user; + j.timeout = job.timeout; + j.handler = job.handler; + return llarp_timer_call_later(logic->timer, j); } void diff --git a/llarp/net.hpp b/llarp/net.hpp index 71d37dd33..ca1870c88 100644 --- a/llarp/net.hpp +++ b/llarp/net.hpp @@ -178,12 +178,25 @@ namespace llarp return ntohs(_addr.sin6_port); } + bool + operator<(const Addr& other) const + { + return port() < other.port() || *addr6() < *other.addr6() + || af() < other.af(); + } + bool operator==(const Addr& other) const { return af() == other.af() && memcmp(addr6(), other.addr6(), 16) == 0 && port() == other.port(); } + + bool + operator!=(const Addr& other) const + { + return !(*this == other); + } }; struct addrhash @@ -192,7 +205,7 @@ namespace llarp operator()(Addr const& a) const noexcept { uint8_t empty[16] = {0}; - return a.af() + memcmp(a.addr6(), empty, 16) + a.port(); + return (a.af() + memcmp(a.addr6(), empty, 16)) ^ a.port(); } }; } diff --git a/llarp/router.cpp b/llarp/router.cpp index b81c019d2..58ef076c6 100644 --- a/llarp/router.cpp +++ b/llarp/router.cpp @@ -52,8 +52,13 @@ llarp_router::SendToOrQueue(const llarp::RouterID &remote, std::vector< llarp::ILinkMessage * > msgs) { bool has = false; - for(auto &link : links) + for(auto &item : links) + { + if(!item.second) + continue; + auto link = item.first; has |= link->has_session_to(link, remote); + } if(!has) { @@ -147,9 +152,9 @@ llarp_router::EnsureIdentity() } void -llarp_router::AddLink(struct llarp_link *link) +llarp_router::AddLink(struct llarp_link *link, bool isOutbound) { - links.push_back(link); + links.push_back({link, isOutbound}); ready = true; } @@ -189,8 +194,9 @@ llarp_router::SaveRC() void llarp_router::Close() { - for(auto &link : links) + for(auto &pair : links) { + auto link = pair.first; link->stop_link(link); link->free_impl(link); delete link; @@ -204,7 +210,8 @@ llarp_router::connect_job_retry(void *user) llarp_link_establish_job *job = static_cast< llarp_link_establish_job * >(user); - llarp::Info("trying to establish session again"); + llarp::Addr remote = job->ai; + llarp::Info("trying to establish session again with ", remote); job->link->try_establish(job->link, job); } @@ -281,11 +288,17 @@ void llarp_router::Tick() { llarp::Debug("tick router"); + llarp_link_session_iter iter; + iter.user = this; + iter.visit = &send_padded_message; if(sendPadding) { - for(auto &link : links) + for(auto &item : links) { - link->iter_sessions(link, {this, nullptr, &send_padded_message}); + if(!item.second) + continue; + auto link = item.first; + link->iter_sessions(link, iter); } } } @@ -294,12 +307,46 @@ bool llarp_router::send_padded_message(llarp_link_session_iter *itr, llarp_link_session *peer) { - auto msg = new llarp::DiscardMessage({}, 4096); llarp_router *self = static_cast< llarp_router * >(itr->user); - self->SendToOrQueue(peer->get_remote_router(peer)->pubkey, {msg}); + llarp::RouterID remote; + remote = &peer->get_remote_router(peer)->pubkey[0]; + for(size_t idx = 0; idx < 50; ++idx) + { + llarp::DiscardMessage msg(9000); + self->SendTo(remote, &msg); + } return true; } +void +llarp_router::SendTo(llarp::RouterID remote, llarp::ILinkMessage *msg) +{ + llarp_buffer_t buf = + llarp::StackBuffer< decltype(linkmsg_buffer) >(linkmsg_buffer); + + if(!msg->BEncode(&buf)) + { + llarp::Warn("failed to encode outbound message, buffer size left: ", + llarp_buffer_size_left(buf)); + return; + } + // set size of message + buf.sz = buf.cur - buf.base; + buf.cur = buf.base; + + bool sent = false; + for(auto &item : links) + { + if(!item.second) + continue; + if(!sent) + { + auto link = item.first; + sent = link->sendto(link, remote, buf); + } + } +} + void llarp_router::ScheduleTicker(uint64_t ms) { @@ -322,6 +369,7 @@ llarp_router::SessionClosed(const llarp::RouterID &remote) void llarp_router::FlushOutboundFor(const llarp::RouterID &remote) { + llarp::Debug("Flush outbound for ", remote); auto itr = outboundMesssageQueue.find(remote); if(itr == outboundMesssageQueue.end()) return; @@ -343,12 +391,17 @@ llarp_router::FlushOutboundFor(const llarp::RouterID &remote) buf.sz = buf.cur - buf.base; buf.cur = buf.base; - bool sent = false; - for(auto &link : links) + llarp::RouterID peer = remote; + bool sent = false; + for(auto &item : links) { - if(!sent) + if(item.second) { - sent = link->sendto(link, remote, buf); + if(!sent) + { + auto link = item.first; + sent = link->sendto(link, peer, buf); + } } } if(!sent) @@ -366,8 +419,11 @@ llarp_router::on_try_connect_result(llarp_link_establish_job *job) llarp_router *router = static_cast< llarp_router * >(job->user); if(job->session) { + delete job; + /* auto session = job->session; router->async_verify_RC(session, false, job); + */ return; } llarp::Info("session not established"); @@ -399,8 +455,11 @@ llarp_router::Run() llarp::Zero(&rc, sizeof(llarp_rc)); // fill our address list rc.addrs = llarp_ai_list_new(); - for(auto link : links) + for(auto &item : links) { + if(item.second) + continue; + auto link = item.first; llarp_ai addr; link->get_our_address(link, &addr); llarp_ai_list_pushback(rc.addrs, &addr); @@ -421,8 +480,9 @@ llarp_router::Run() llarp_dht_context_start(dht, ourPubkey); // start links - for(auto link : links) + for(auto &item : links) { + auto link = item.first; int result = link->start_link(link, logic); if(result == -1) llarp::Warn("Link ", link->name(), " failed to start"); @@ -474,7 +534,11 @@ llarp_init_router(struct llarp_threadpool *tp, struct llarp_ev_loop *netloop, router->tp = tp; router->logic = logic; // TODO: make disk io threadpool count configurable +#ifdef TESTNET + router->disk = tp; +#else router->disk = llarp_init_threadpool(1, "llarp-diskio"); +#endif llarp_crypto_libsodium_init(&router->crypto); } return router; @@ -508,11 +572,22 @@ llarp_router_try_connect(struct llarp_router *router, struct llarp_rc *remote) llarp_ai addr; if(llarp_ai_list_index(remote->addrs, 0, &addr)) { - llarp_router_iterate_links(router, - {&addr, &llarp_router::iter_try_connect}); - return true; - } + for(auto &item : router->links) + { + if(!item.second) + continue; + auto link = item.first; + llarp_link_establish_job *job = new llarp_link_establish_job; + llarp_ai_copy(&job->ai, &addr); + job->timeout = 10000; + job->result = &llarp_router::on_try_connect_result; + // give router as user pointer + job->user = router; + link->try_establish(link, job); + return true; + } + } return false; } @@ -553,10 +628,12 @@ bool llarp_findOrCreateIdentity(llarp_crypto *crypto, const char *fpath, byte_t *secretkey) { + llarp::Debug("find or create ", fpath); fs::path path(fpath); std::error_code ec; if(!fs::exists(path, ec)) { + llarp::Info("regenerated identity key"); crypto->identity_keygen(secretkey); std::ofstream f(path, std::ios::binary); if(f.is_open()) @@ -570,6 +647,7 @@ llarp_findOrCreateIdentity(llarp_crypto *crypto, const char *fpath, f.read((char *)secretkey, sizeof(llarp_seckey_t)); return true; } + llarp::Info("failed to get identity key"); return false; } @@ -619,9 +697,10 @@ void llarp_router_iterate_links(struct llarp_router *router, struct llarp_router_link_iter i) { - for(auto link : router->links) - if(!i.visit(&i, router, link)) - return; + for(auto item : router->links) + if(item.second) + if(!i.visit(&i, router, item.first)) + return; } void @@ -677,13 +756,10 @@ namespace llarp iwp_link_init(link, args); if(llarp_link_initialized(link)) { - // printf("router -> link initialized\n"); + llarp::Info("link ", key, " initialized"); if(link->configure(link, self->netloop, key, af, proto)) { - llarp_ai ai; - link->get_our_address(link, &ai); - llarp::Addr addr = ai; - self->AddLink(link); + self->AddLink(link, llarp::StrEq(key, "*")); return; } if(af == AF_INET6) @@ -697,7 +773,7 @@ namespace llarp llarp_ai ai; link->get_our_address(link, &ai); llarp::Addr addr = ai; - self->AddLink(link); + self->AddLink(link, llarp::StrEq(key, "*")); return; } } diff --git a/llarp/router.hpp b/llarp/router.hpp index 47e5f28dd..2a766286e 100644 --- a/llarp/router.hpp +++ b/llarp/router.hpp @@ -68,7 +68,7 @@ struct llarp_router llarp::InboundMessageParser inbound_msg_parser; - std::list< llarp_link * > links; + std::list< std::pair< llarp_link *, bool > > links; typedef std::queue< llarp::ILinkMessage * > MessageQueue; @@ -86,7 +86,7 @@ struct llarp_router HandleRecvLinkMessage(struct llarp_link_session *from, llarp_buffer_t msg); void - AddLink(struct llarp_link *link); + AddLink(struct llarp_link *link, bool isOutbound); void Close(); @@ -119,6 +119,10 @@ struct llarp_router SendToOrQueue(const llarp::RouterID &remote, std::vector< llarp::ILinkMessage * > msgs); + /// sendto or drop + void + SendTo(llarp::RouterID remote, llarp::ILinkMessage *msg); + /// manually flush outbound message queue for just 1 router void FlushOutboundFor(const llarp::RouterID &remote); diff --git a/llarp/testnet.c b/llarp/testnet.c new file mode 100644 index 000000000..60f501489 --- /dev/null +++ b/llarp/testnet.c @@ -0,0 +1,6 @@ +#ifdef SHADOW_TESTNET +#include + +/** insert shadow test network specific code here */ + +#endif diff --git a/llarp/threadpool.cpp b/llarp/threadpool.cpp index 0ac869364..421139d60 100644 --- a/llarp/threadpool.cpp +++ b/llarp/threadpool.cpp @@ -1,6 +1,10 @@ #include "threadpool.hpp" #include #include + +#include +#include + #include "logger.hpp" #if(__FreeBSD__) @@ -29,18 +33,24 @@ namespace llarp } for(;;) { - llarp_thread_job job; + llarp_thread_job *job; { lock_t lock(this->queue_mutex); this->condition.wait( lock, [this] { return this->stop || !this->jobs.empty(); }); if(this->stop && this->jobs.empty()) return; - job = std::move(this->jobs.front()); + job = this->jobs.front(); this->jobs.pop_front(); } + auto now = llarp_time_now_ms(); // do work - job.work(job.user); + job->work(job->user); + auto after = llarp_time_now_ms(); + auto dlt = after - now; + if(dlt > 1) + llarp::Warn("work took ", dlt, " ms"); + delete job; } }); } @@ -75,7 +85,7 @@ namespace llarp if(stop) return; - jobs.emplace_back(job); + jobs.push_back(new llarp_thread_job(job.user, job.work)); } condition.notify_one(); } @@ -85,9 +95,16 @@ namespace llarp struct llarp_threadpool { - llarp::thread::Pool impl; + llarp::thread::Pool *impl; - llarp_threadpool(int workers, const char *name) : impl(workers, name) + std::queue< llarp_thread_job > jobs; + + llarp_threadpool(int workers, const char *name) + : impl(new llarp::thread::Pool(workers, name)) + { + } + + llarp_threadpool() : impl(nullptr) { } }; @@ -103,11 +120,18 @@ llarp_init_threadpool(int workers, const char *name) return nullptr; } +struct llarp_threadpool * +llarp_init_same_process_threadpool() +{ + return new llarp_threadpool(); +} + void llarp_threadpool_join(struct llarp_threadpool *pool) { llarp::Debug("threadpool join"); - pool->impl.Join(); + if(pool->impl) + pool->impl->Join(); } void @@ -119,7 +143,8 @@ void llarp_threadpool_stop(struct llarp_threadpool *pool) { llarp::Debug("threadpool stop"); - pool->impl.Stop(); + if(pool->impl) + pool->impl->Stop(); } void @@ -127,9 +152,10 @@ llarp_threadpool_wait(struct llarp_threadpool *pool) { std::mutex mtx; llarp::Debug("threadpool wait"); + if(pool->impl) { std::unique_lock< std::mutex > lock(mtx); - pool->impl.done.wait(lock); + pool->impl->done.wait(lock); } } @@ -137,7 +163,21 @@ void llarp_threadpool_queue_job(struct llarp_threadpool *pool, struct llarp_thread_job job) { - pool->impl.QueueJob(job); + if(pool->impl) + pool->impl->QueueJob(job); + else + pool->jobs.push(job); +} + +void +llarp_threadpool_tick(struct llarp_threadpool *pool) +{ + while(pool->jobs.size()) + { + auto &job = pool->jobs.front(); + job.work(job.user); + pool->jobs.pop(); + } } void diff --git a/llarp/threadpool.hpp b/llarp/threadpool.hpp index a34dfe0c8..b40ea891f 100644 --- a/llarp/threadpool.hpp +++ b/llarp/threadpool.hpp @@ -25,7 +25,7 @@ namespace llarp void Stop(); std::vector< std::thread > threads; - std::deque< llarp_thread_job > jobs; + std::deque< llarp_thread_job* > jobs; mtx_t queue_mutex; std::condition_variable condition; diff --git a/llarp/time.cpp b/llarp/time.cpp index 089e9db2b..10224102b 100644 --- a/llarp/time.cpp +++ b/llarp/time.cpp @@ -3,7 +3,7 @@ namespace llarp { - typedef std::chrono::steady_clock clock_t; + typedef std::chrono::system_clock clock_t; template < typename Res, typename IntType > static IntType diff --git a/llarp/timer.cpp b/llarp/timer.cpp index ad3b0182c..4b91c2edf 100644 --- a/llarp/timer.cpp +++ b/llarp/timer.cpp @@ -1,8 +1,9 @@ +#include #include #include #include #include -#include +#include #include "logger.hpp" @@ -10,41 +11,28 @@ namespace llarp { struct timer { - static uint64_t - now() - { - return std::chrono::duration_cast< std::chrono::milliseconds >( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - } - void* user; uint64_t called_at; uint64_t started; uint64_t timeout; llarp_timer_handler_func func; bool done; + bool canceled; timer(uint64_t ms = 0, void* _user = nullptr, llarp_timer_handler_func _func = nullptr) : user(_user) , called_at(0) - , started(now()) + , started(llarp_time_now_ms()) , timeout(ms) , func(_func) , done(false) + , canceled(false) { } - timer& - operator=(const timer& other) + ~timer() { - user = other.user; - called_at = other.called_at; - started = other.started; - timeout = other.timeout; - func = other.func; - return *this; } void @@ -56,25 +44,31 @@ namespace llarp static_cast< timer* >(user)->exec(); } - operator llarp_thread_job() + void + send_job(llarp_threadpool* pool) { - return {this, timer::call}; + llarp_threadpool_queue_job(pool, {this, timer::call}); } }; }; // namespace llarp struct llarp_timer_context { - llarp_threadpool* threadpool; std::mutex timersMutex; - std::map< uint32_t, llarp::timer > timers; + std::unordered_map< uint32_t, llarp::timer* > timers; std::mutex tickerMutex; - std::condition_variable ticker; - std::chrono::milliseconds nextTickLen = std::chrono::milliseconds(10); + std::condition_variable* ticker = nullptr; + std::chrono::milliseconds nextTickLen = std::chrono::milliseconds(100); uint32_t ids = 0; bool _run = true; + ~llarp_timer_context() + { + if(ticker) + delete ticker; + } + bool run() { @@ -90,16 +84,11 @@ struct llarp_timer_context void cancel(uint32_t id) { - llarp::timer t; - { - std::unique_lock< std::mutex > lock(timersMutex); - auto itr = timers.find(id); - if(itr == timers.end()) - return; - t = itr->second; - } - t.called_at = llarp::timer::now(); - t.exec(); + std::unique_lock< std::mutex > lock(timersMutex); + auto itr = timers.find(id); + if(itr == timers.end()) + return; + itr->second->canceled = true; } void @@ -107,8 +96,10 @@ struct llarp_timer_context { std::unique_lock< std::mutex > lock(timersMutex); auto itr = timers.find(id); - if(itr != timers.end()) - timers.erase(itr); + if(itr == timers.end()) + return; + itr->second->func = nullptr; + itr->second->canceled = true; } uint32_t @@ -116,7 +107,7 @@ struct llarp_timer_context { std::unique_lock< std::mutex > lock(timersMutex); uint32_t id = ++ids; - timers.emplace(id, std::move(llarp::timer(timeout_ms, user, func))); + timers[id] = new llarp::timer(timeout_ms, user, func); return id; } @@ -175,11 +166,10 @@ llarp_timer_stop(struct llarp_timer_context* t) { // destroy all timers // don't call callbacks on timers - llarp::Debug("clear timers"); t->timers.clear(); t->stop(); - llarp::Debug("stop timers"); - t->ticker.notify_all(); + if(t->ticker) + t->ticker->notify_all(); } void @@ -188,46 +178,59 @@ llarp_timer_cancel_job(struct llarp_timer_context* t, uint32_t id) t->cancel(id); } +void +llarp_timer_tick_all(struct llarp_timer_context* t, + struct llarp_threadpool* pool) +{ + if(!t->run()) + return; + auto now = llarp_time_now_ms(); + auto itr = t->timers.begin(); + while(itr != t->timers.end()) + { + if(now - itr->second->started >= itr->second->timeout + || itr->second->canceled) + { + if(itr->second->func && itr->second->called_at == 0) + { + // timer hit + itr->second->called_at = now; + itr->second->send_job(pool); + ++itr; + } + else if(itr->second->done) + { + // remove timer + llarp::timer* timer = itr->second; + itr = t->timers.erase(itr); + delete timer; + } + else + ++itr; + } + else // timer not hit yet + ++itr; + } +} + void llarp_timer_run(struct llarp_timer_context* t, struct llarp_threadpool* pool) { - t->threadpool = pool; + t->ticker = new std::condition_variable; while(t->run()) { // wait for timer mutex + if(t->ticker) { std::unique_lock< std::mutex > lock(t->tickerMutex); - t->ticker.wait_for(lock, t->nextTickLen); + t->ticker->wait_for(lock, t->nextTickLen); } if(t->run()) { std::unique_lock< std::mutex > lock(t->timersMutex); // we woke up - auto now = llarp::timer::now(); - auto itr = t->timers.begin(); - while(itr != t->timers.end()) - { - if(now - itr->second.started >= itr->second.timeout) - { - if(itr->second.func) - { - // timer hit - itr->second.called_at = now; - llarp_threadpool_queue_job(pool, itr->second); - ++itr; - } - else if(itr->second.done) - { - // timer was already called, remove timer - itr = t->timers.erase(itr); - } - else - ++itr; - } - else // timer not hit yet - ++itr; - } + llarp_timer_tick_all(t, pool); } } } @@ -249,7 +252,7 @@ namespace llarp call(user, timeout, 0); else call(user, timeout, diff); - done = true; } + done = true; } }