From 6af498092bca001a075d00a57630a547f722c8da Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 21 May 2020 10:18:23 -0400 Subject: [PATCH 01/40] exit traffic via loki addresses --- CMakeLists.txt | 25 +-- cmake/unix.cmake | 9 + contrib/py/admin/lokinetmon | 120 ++++++++++--- daemon/CMakeLists.txt | 3 + daemon/main.cpp | 26 +++ llarp/config/config.cpp | 57 +++++-- llarp/config/config.hpp | 9 +- llarp/exit/session.cpp | 24 ++- llarp/exit/session.hpp | 8 +- llarp/handlers/null.hpp | 3 + llarp/handlers/tun.cpp | 266 ++++++++++++----------------- llarp/handlers/tun.hpp | 48 +----- llarp/net/ip_packet.cpp | 107 ++++++++++-- llarp/net/ip_packet.hpp | 11 ++ llarp/net/net.cpp | 9 + llarp/net/net.hpp | 3 + llarp/path/pathset.hpp | 6 + llarp/path/transit_hop.cpp | 2 +- llarp/service/endpoint.cpp | 14 +- llarp/service/endpoint.hpp | 9 +- llarp/service/endpoint_state.cpp | 3 +- llarp/service/endpoint_state.hpp | 2 + llarp/service/outbound_context.cpp | 21 ++- llarp/service/outbound_context.hpp | 10 ++ llarp/service/protocol_type.hpp | 2 +- llarp/util/aligned.hpp | 6 - 26 files changed, 505 insertions(+), 298 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 6644a1080..5a52d6776 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -33,7 +33,7 @@ option(STATIC_LINK "link statically against dependencies" OFF) option(BUILD_SHARED_LIBS "build lokinet libraries as shared libraries instead of static" ON) option(SHADOW "use shadow testing framework. linux only" OFF) option(XSAN "use sanitiser, if your system has it (requires -DCMAKE_BUILD_TYPE=Debug)" OFF) -option(JEMALLOC "use jemalloc. Not required on BSD" OFF) +option(WITH_JEMALLOC "use jemalloc as allocator" OFF) option(TESTNET "testnet build" OFF) option(WITH_COVERAGE "generate coverage data" OFF) option(USE_SHELLHOOKS "enable shell hooks on compile time (dangerous)" OFF) @@ -49,6 +49,12 @@ endif() include(CheckCXXSourceCompiles) include(CheckLibraryExists) +set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_STANDARD_REQUIRED ON) +set(CMAKE_CXX_EXTENSIONS OFF) +set(CMAKE_C_STANDARD 99) +set(CMAKE_C_STANDARD_REQUIRED ON) +set(CMAKE_C_EXTENSIONS OFF) include(cmake/enable_lto.cmake) include(cmake/target_link_libraries_system.cmake) @@ -80,14 +86,6 @@ list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/cmake") include(MacroEnsureOutOfSourceBuild) macro_ensure_out_of_source_build("${PROJECT_NAME} requires an out-of-source build. Create a build directory and run 'cmake ${CMAKE_SOURCE_DIR} [options]'.") -set(CMAKE_CXX_STANDARD 17) - -set(CMAKE_CXX_STANDARD_REQUIRED ON) -set(CMAKE_CXX_EXTENSIONS OFF) -set(CMAKE_C_STANDARD 99) -set(CMAKE_C_STANDARD_REQUIRED ON) -set(CMAKE_C_EXTENSIONS OFF) - # Always build PIC set(CMAKE_POSITION_INDEPENDENT_CODE ON) @@ -213,15 +211,6 @@ endif() string(REGEX REPLACE "^fatal.*$" nogit GIT_VERSION_REAL "${GIT_VERSION}") -# HeapAlloc(2) on Windows was significantly revamped in 2009 -# but the old algorithm isn't too bad either -# this is _the_ system allocator on BSD UNIX -# openbsd replaced it with a secure/randomised malloc not too -# long ago -if(JEMALLOC) - set(MALLOC_LIB jemalloc) -endif() - find_package(PkgConfig QUIET) if(PKG_CONFIG_FOUND) diff --git a/cmake/unix.cmake b/cmake/unix.cmake index f0553f7c5..857392bd6 100644 --- a/cmake/unix.cmake +++ b/cmake/unix.cmake @@ -5,6 +5,15 @@ endif() include(CheckCXXSourceCompiles) include(CheckLibraryExists) +if(WITH_JEMALLOC) + find_package(Jemalloc REQUIRED) + if(NOT JEMALLOC_FOUND) + message(FATAL_ERROR "did not find jemalloc") + endif() + add_definitions(-DUSE_JEMALLOC) + message(STATUS "using jemalloc") +endif() + add_library(curl INTERFACE) option(DOWNLOAD_CURL "download and statically compile in CURL" OFF) diff --git a/contrib/py/admin/lokinetmon b/contrib/py/admin/lokinetmon index 3a4af2d61..fb201f138 100755 --- a/contrib/py/admin/lokinetmon +++ b/contrib/py/admin/lokinetmon @@ -15,9 +15,11 @@ class Monitor: def __init__(self, url): self.data = dict() self.win = curses.initscr() + curses.start_color() + curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK) self._url = url while len(self._globalspeed) < self._speedSamples: - self._globalspeed.append((0, 0)) + self._globalspeed.append((0, 0, 0, 0)) def __del__(self): curses.endwin() @@ -82,6 +84,18 @@ class Monitor: idx += 1 return "{} {}ps".format("%.2f" % rate, units[idx]) + def get_all_paths(self): + for k in self.data['services']: + status = self.data['services'][k] + for path in (status['paths'] or []): + yield path + for s in (status['remoteSessions'] or []): + for path in s['paths']: + yield path + for s in (status['snodeSessions'] or []): + for path in s['paths']: + yield path + def display_service(self, y, name, status): """display a service at current position""" self.win.move(y, 1) @@ -128,27 +142,34 @@ class Monitor: y += 2 self.win.move(y, 1) self.win.addstr( - "global speed:\t\t[{}\ttx]\t[{}\trx]".format( + "throughput:\t\t[{}\ttx]\t[{}\trx]".format( self.speedOf(self.txrate), self.speedOf(self.rxrate) ) ) + bloat_tx, bloat_rx = self.calculate_bloat(self.data['links']['outbound'], self.get_all_paths()) + y += 1 + self.win.move(y, 1) + self.win.addstr("goodput:\t\t[{}\ttx]\t[{}\trx]".format(self.speedOf(self.txrate-bloat_tx), self.speedOf(self.rxrate-bloat_rx))) + y += 1 + self.win.move(y, 1) + self.win.addstr("overhead:\t\t[{}\ttx]\t[{}\trx]".format(self.speedOf(bloat_tx), self.speedOf(bloat_rx))) + - self._globalspeed.append((self.txrate, self.rxrate)) + self._globalspeed.append((self.txrate, self.rxrate, bloat_tx, bloat_rx)) while len(self._globalspeed) > self._speedSamples: self._globalspeed.pop(0) return self.display_speedgraph(y + 2, self._globalspeed) - def display_speedgraph(self, y, samps, maxsz=20): + def display_speedgraph(self, y, samps, maxsz=40): """ display global speed graph """ - def scale(x, n): while n > 0: x /= 2 n -= 1 return int(x) - txmax, rxmax = 1000, 1000 - for tx, rx in samps: + txmax, rxmax = 1024, 1024 + for tx, rx, _tx, _rx in samps: if tx > txmax: txmax = tx if rx > rxmax: @@ -164,13 +185,13 @@ class Monitor: txscale += 1 txmax /= 2 - def makebar(samp, max): - bar = "#" * samp + def makebar(samp, badsamp, max): + bar = "#" * (samp - badsamp) pad = " " * (max - samp) - return pad, bar + return pad, bar, '#' * badsamp - txlabelpad = int(txmax / 2) - 1 - rxlabelpad = int(rxmax / 2) - 1 + txlabelpad = int(txmax / 2)# - 1 + rxlabelpad = int(rxmax / 2)# - 1 if txlabelpad <= 0: txlabelpad = 1 if rxlabelpad <= 0: @@ -182,32 +203,77 @@ class Monitor: self.win.addstr( "{}tx{}{}rx{}".format(txlabelpad, txlabelpad, rxlabelpad, rxlabelpad) ) - for tx, rx in samps: + for tx, rx, btx, brx in samps: y += 1 self.win.move(y, 1) - txpad, txbar = makebar(scale(tx, txscale), int(txmax)) - rxpad, rxbar = makebar(scale(rx, rxscale), int(rxmax)) - self.win.addstr("{}{}|{}{}".format(txpad, txbar, rxbar, rxpad)) + txpad, txbar, btxbar = makebar(scale(tx,txscale),scale(btx,txscale), int(txmax)) + rxpad, rxbar, brxbar = makebar(scale(rx,rxscale),scale(brx,rxscale), int(rxmax)) + self.win.addstr(txpad) + self.win.addstr(btxbar, curses.color_pair(1)) + self.win.addstr(txbar) + self.win.addstr('|') + self.win.addstr(rxbar) + self.win.addstr(brxbar, curses.color_pair(1)) + self.win.addstr(rxpad) + return y + 2 + def calculate_bloat(self, links, paths): + """ + calculate bandwith overhead + """ + lltx = 0 + llrx = 0 + tx = 0 + rx = 0 + for link in links: + sessions = link["sessions"]["established"] + for s in sessions: + lltx += s['tx'] + llrx += s['rx'] + for path in paths: + tx += path['txRateCurrent'] + rx += path['rxRateCurrent'] + if lltx > tx: + lltx -= tx + if llrx > rx: + llrx -= rx + return lltx, llrx + + def display_link(self, y, link): y += 1 self.win.move(y, 1) sessions = link["sessions"]["established"] for s in sessions: - y += 1 - self.win.move(y, 1) - self.txrate += s["txRateCurrent"] - self.rxrate += s["rxRateCurrent"] - self.win.addstr( - "{}\t[{}\ttx]\t[{}\trx]".format( - s["remoteAddr"], self.speedOf(s["txRateCurrent"]), self.speedOf(s["rxRateCurrent"]) - ) + y = self.display_link_session(y, s) + return y + + def display_link_session(self, y, s): + y += 1 + self.win.move(y, 1) + self.txrate += s["txRateCurrent"] + self.rxrate += s["rxRateCurrent"] + self.win.addstr( + "{}\t[{}\ttx]\t[{}\trx]".format( + s["remoteAddr"], self.speedOf(s["txRateCurrent"]), self.speedOf(s["rxRateCurrent"]) ) - if (s['txMsgQueueSize'] or 0) > 1: - self.win.addstr(" [out window:\t{}]".format(s['txMsgQueueSize'])) + ) + if (s['txMsgQueueSize'] or 0) > 1: + self.win.addstr(" [out window: {}]".format(s['txMsgQueueSize'])) if (s['rxMsgQueueSize'] or 0) > 1: - self.win.addstr(" [in window:\t{}]".format(s['rxMsgQueueSize'])) + self.win.addstr(" [in window: {}]".format(s['rxMsgQueueSize'])) + def display(acks, label, num='acks', dem='packets'): + if acks[dem] > 0: + self.win.addstr(" [{}: {}]".format(label, round(float(acks[num]) / float(acks[dem]), 2))) + if ('recvMACKs' in s) and ('sendMACKs' in s): + display(s['sendMACKs'], 'out MACK density') + display(s['recvMACKs'], 'in MACK density') + d = {'recvAcks': 'in acks', 'sendAcks': 'out acks', 'recvRTX': 'in RTX', 'sendRTX': 'out RTX'} + for k in d: + v = d[k] + if (k in s) and (s[k] > 0): + self.win.addstr(" [{}: {}]".format(v, s[k])) return y def display_dht(self, y, data): diff --git a/daemon/CMakeLists.txt b/daemon/CMakeLists.txt index 88a638215..0f3324360 100644 --- a/daemon/CMakeLists.txt +++ b/daemon/CMakeLists.txt @@ -20,6 +20,9 @@ else() target_link_directories(${exe} PRIVATE /usr/local/lib) endif() target_link_libraries(${exe} PRIVATE liblokinet) + if(WITH_JEMALLOC) + target_link_libraries(${exe} PUBLIC jemalloc) + endif() target_compile_definitions(${exe} PRIVATE -DVERSIONTAG=${GIT_VERSION_REAL}) add_log_tag(${exe}) endforeach() diff --git a/daemon/main.cpp b/daemon/main.cpp index 403fd4bb8..84db722df 100644 --- a/daemon/main.cpp +++ b/daemon/main.cpp @@ -14,6 +14,32 @@ #include #include +#ifdef USE_JEMALLOC +#include +#include + +void* +operator new(std::size_t sz) +{ + void* ptr = malloc(sz); + if (ptr) + return ptr; + else + throw std::bad_alloc{}; +} +void +operator delete(void* ptr) noexcept +{ + free(ptr); +} + +void +operator delete(void* ptr, size_t) noexcept +{ + free(ptr); +} +#endif + #ifdef _WIN32 #define wmin(x, y) (((x) < (y)) ? (x) : (y)) #define MIN wmin diff --git a/llarp/config/config.cpp b/llarp/config/config.cpp index 140d673ec..83721ec44 100644 --- a/llarp/config/config.cpp +++ b/llarp/config/config.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -142,6 +143,8 @@ namespace llarp conf.defineOption( "network", "type", false, "tun", AssignmentAcceptor(m_endpointType)); + conf.defineOption("network", "exit", false, false, AssignmentAcceptor(m_AllowExit)); + conf.defineOption( "network", "profiling", @@ -165,27 +168,59 @@ namespace llarp conf.defineOption( "network", "reachable", false, ReachableDefault, AssignmentAcceptor(m_reachable)); - conf.defineOption("network", "hops", false, HopsDefault, [](int arg) { + conf.defineOption("network", "hops", false, HopsDefault, [this](int arg) { if (arg < 1 or arg > 8) throw std::invalid_argument("[endpoint]:hops must be >= 1 and <= 8"); + m_hops = arg; }); - conf.defineOption("network", "paths", false, PathsDefault, [](int arg) { + conf.defineOption("network", "paths", false, PathsDefault, [this](int arg) { if (arg < 1 or arg > 8) throw std::invalid_argument("[endpoint]:paths must be >= 1 and <= 8"); + m_paths = arg; }); -#ifdef LOKINET_EXITS conf.defineOption("network", "exit-node", false, "", [this](std::string arg) { - // TODO: validate as valid .loki / .snode address - // probably not .snode...? - m_exitNode = arg; + if (arg.empty()) + return; + service::Address exit; + if (not exit.FromString(arg)) + { + throw std::invalid_argument(stringify("[endpoint]:exit-node bad address: ", arg)); + } + m_exitNode = exit; }); -#endif conf.defineOption("network", "mapaddr", false, "", [this](std::string arg) { - // TODO: parse / validate as loki_addr : IP addr pair - m_mapAddr = arg; + if (arg.empty()) + return; + huint128_t ip; + service::Address addr; + const auto pos = arg.find(":"); + if (pos == std::string::npos) + { + throw std::invalid_argument(stringify("[endpoint]:mapaddr invalid entry: ", arg)); + } + std::string addrstr = arg.substr(0, pos); + std::string ipstr = arg.substr(pos + 1); + if (not ip.FromString(ipstr)) + { + huint32_t ipv4; + if (not ipv4.FromString(ipstr)) + { + throw std::invalid_argument(stringify("[endpoint]:mapaddr invalid ip: ", ipstr)); + } + ip = net::ExpandV4(ipv4); + } + if (not addr.FromString(addrstr)) + { + throw std::invalid_argument(stringify("[endpoint]:mapaddr invalid addresss: ", addrstr)); + } + if (m_mapAddrs.find(ip) != m_mapAddrs.end()) + { + throw std::invalid_argument(stringify("[endpoint]:mapaddr ip already mapped: ", ipstr)); + } + m_mapAddrs[ip] = addr; }); conf.defineOption("network", "ifaddr", false, "", [this](std::string arg) { @@ -796,14 +831,12 @@ namespace llarp "Adds a `.snode` address to the blacklist.", }); -#ifdef LOKINET_EXITS def.addOptionComments( "network", "exit-node", { - "Specify a `.snode` or `.loki` address to use as an exit broker.", + "Specify a `.loki` address to use as an exit broker.", }); -#endif def.addOptionComments( "network", diff --git a/llarp/config/config.hpp b/llarp/config/config.hpp index 3da7b59fe..c27c3bde8 100644 --- a/llarp/config/config.hpp +++ b/llarp/config/config.hpp @@ -10,6 +10,8 @@ #include #include #include +#include +#include #include #include @@ -75,11 +77,10 @@ namespace llarp bool m_reachable = false; int m_hops = -1; int m_paths = -1; + bool m_AllowExit = false; std::set m_snodeBlacklist; -#ifdef LOKINET_EXITS - std::string m_exitNode; -#endif - std::string m_mapAddr; + std::optional m_exitNode; + std::unordered_map m_mapAddrs; // TODO: // on-up diff --git a/llarp/exit/session.cpp b/llarp/exit/session.cpp index bc565f753..6d1e7ae1b 100644 --- a/llarp/exit/session.cpp +++ b/llarp/exit/session.cpp @@ -65,7 +65,7 @@ namespace llarp } void - BaseSession::BlacklistSnode(const RouterID snode) + BaseSession::BlacklistSNode(const RouterID snode) { m_SnodeBlacklist.insert(std::move(snode)); } @@ -99,7 +99,7 @@ namespace llarp bool BaseSession::CheckPathDead(path::Path_ptr, llarp_time_t dlt) { - return dlt >= 10s; + return dlt >= path::alive_timeout; } void @@ -359,5 +359,25 @@ namespace llarp { return "Exit::" + m_ExitRouter.ToString(); } + + void + SNodeSession::SendPacketToRemote(const llarp_buffer_t& buf) + { + net::IPPacket pkt; + if (not pkt.Load(buf)) + return; + pkt.ZeroAddresses(); + QueueUpstreamTraffic(std::move(pkt), llarp::routing::ExitPadSize); + } + + void + ExitSession::SendPacketToRemote(const llarp_buffer_t& buf) + { + net::IPPacket pkt; + if (not pkt.Load(buf)) + return; + pkt.ZeroSourceAddress(); + QueueUpstreamTraffic(std::move(pkt), llarp::routing::ExitPadSize); + } } // namespace exit } // namespace llarp diff --git a/llarp/exit/session.hpp b/llarp/exit/session.hpp index 7eaaf57ca..f33f1fee9 100644 --- a/llarp/exit/session.hpp +++ b/llarp/exit/session.hpp @@ -45,7 +45,7 @@ namespace llarp } void - BlacklistSnode(const RouterID snode); + BlacklistSNode(const RouterID snode) override; util::StatusObject ExtractStatus() const; @@ -187,6 +187,9 @@ namespace llarp std::string Name() const override; + virtual void + SendPacketToRemote(const llarp_buffer_t& pkt) override; + protected: void PopulateRequest(llarp::routing::ObtainExitMessage& msg) const override @@ -213,6 +216,9 @@ namespace llarp std::string Name() const override; + virtual void + SendPacketToRemote(const llarp_buffer_t& pkt) override; + protected: void PopulateRequest(llarp::routing::ObtainExitMessage& msg) const override diff --git a/llarp/handlers/null.hpp b/llarp/handlers/null.hpp index 2e64542f9..3eaad30ff 100644 --- a/llarp/handlers/null.hpp +++ b/llarp/handlers/null.hpp @@ -33,6 +33,9 @@ namespace llarp { return false; } + + void + SendPacketToRemote(const llarp_buffer_t&) override{}; }; } // namespace handlers } // namespace llarp diff --git a/llarp/handlers/tun.cpp b/llarp/handlers/tun.cpp index 328ff0a81..3fc7bd688 100644 --- a/llarp/handlers/tun.cpp +++ b/llarp/handlers/tun.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -26,7 +27,7 @@ namespace llarp void TunEndpoint::FlushToUser(std::function send) { - m_ExitMap.ForEachValue([](const auto& exit) { exit->FlushDownstream(); }); + m_ExitMap.ForEachValue([r = Router()](const auto& exit) { exit->DownstreamFlush(r); }); // flush network to user m_NetworkToUserPktQueue.Process(send); } @@ -42,12 +43,7 @@ namespace llarp TunEndpoint::tunifTick(llarp_tun_io* tun) { auto* self = static_cast(tun->user); - const auto now = self->Now(); - if (self->ShouldFlushNow(now)) - { - self->m_LastFlushAt = now; - LogicCall(self->m_router->logic(), [self]() { self->Flush(); }); - } + self->Flush(); } TunEndpoint::TunEndpoint(AbstractRouter* r, service::Context* parent, bool lazyVPN) @@ -168,86 +164,12 @@ namespace llarp } */ -#ifdef LOKINET_EXITS - - if (not conf.m_exitNode.empty()) - { - IPRange exitRange; - llarp::RouterID exitRouter; - std::string routerStr; - const auto pos = conf.m_exitNode.find(","); - if (pos != std::string::npos) - { - auto range_str = conf.m_exitNode.substr(1 + pos); - if (!exitRange.FromString(range_str)) - { - LogError("bad exit range: '", range_str, "'"); - return false; - } - routerStr = conf.m_exitNode.substr(0, pos); - } - else - { - routerStr = conf.m_exitNode; - } - routerStr = TrimWhitespace(routerStr); - if (!(exitRouter.FromString(routerStr) - || HexDecode(routerStr.c_str(), exitRouter.begin(), exitRouter.size()))) - { - llarp::LogError(Name(), " bad exit router key: ", routerStr); - return false; - } - auto exit = std::make_shared( - exitRouter, - util::memFn(&TunEndpoint::QueueInboundPacketForExit, this), - m_router, - numPaths, - numHops, - ShouldBundleRC()); - m_ExitMap.Insert(exitRange, exit); - llarp::LogInfo(Name(), " using exit at ", exitRouter, " for ", exitRange); - } -#endif - m_LocalResolverAddr = dnsConf.m_bind; m_UpstreamResolvers = dnsConf.m_upstreamDNS; - if (not conf.m_mapAddr.empty()) + for (const auto& item : conf.m_mapAddrs) { - auto pos = conf.m_mapAddr.find(":"); - if (pos == std::string::npos) - { - llarp::LogError( - "Cannot map address ", - conf.m_mapAddr, - " invalid format, missing colon (:), expects " - "address.loki:ip.address.goes.here"); - return false; - } - service::Address addr; - auto addr_str = conf.m_mapAddr.substr(0, pos); - if (!addr.FromString(addr_str)) - { - llarp::LogError(Name() + " cannot map invalid address ", addr_str); - return false; - } - auto ip_str = conf.m_mapAddr.substr(pos + 1); - huint32_t ip; - huint128_t ipv6; - if (ip.FromString(ip_str)) - { - ipv6 = net::ExpandV4(ip); - } - else if (ipv6.FromString(ip_str)) - { - } - else - { - llarp::LogError(Name(), "failed to map ", ip_str, " failed to parse IP"); - return false; - } - - if (not MapAddress(addr, ipv6, false)) + if (not MapAddress(item.second, item.first, false)) return false; } @@ -315,19 +237,9 @@ namespace llarp void TunEndpoint::Flush() { - static const auto func = [](auto self) { - self->FlushSend(); - self->m_ExitMap.ForEachValue([](const auto& exit) { exit->FlushUpstream(); }); - self->Pump(self->Now()); - }; - if (NetworkIsIsolated()) - { - LogicCall(RouterLogic(), std::bind(func, shared_from_this())); - } - else - { - func(this); - } + FlushSend(); + m_ExitMap.ForEachValue([r = Router()](const auto& exit) { exit->UpstreamFlush(r); }); + Pump(Now()); } static bool @@ -659,7 +571,7 @@ namespace llarp const auto blacklist = SnodeBlacklist(); m_ExitMap.ForEachValue([blacklist](const auto& exit) { for (const auto& snode : blacklist) - exit->BlacklistSnode(snode); + exit->BlacklistSNode(snode); }); return SetupNetworking(); } @@ -844,10 +756,7 @@ namespace llarp void TunEndpoint::Tick(llarp_time_t now) { - m_ExitMap.ForEachValue([&](const auto& exit) { - this->EnsureRouterIsKnown(exit->Endpoint()); - exit->Tick(now); - }); + m_ExitMap.ForEachValue([&](const auto& exit) { exit->Tick(now); }); Endpoint::Tick(now); } @@ -864,29 +773,41 @@ namespace llarp m_UserToNetworkPktQueue.Process([&](net::IPPacket& pkt) { std::function sendFunc; - huint128_t dst; + huint128_t dst, src; if (pkt.IsV4()) - dst = net::ExpandV4(pkt.dstv4()); + { + dst = pkt.dst4to6(); + src = pkt.src4to6(); + } else + { dst = pkt.dstv6(); - + src = pkt.srcv6(); + } auto itr = m_IPToAddr.find(dst); if (itr == m_IPToAddr.end()) { - const auto exits = m_ExitMap.FindAll(dst); - for (const auto& exit : exits) + if (IsBogon(dst) or not m_state->m_ExitNode.has_value()) { - if (pkt.IsV4() && !llarp::IsIPv4Bogon(pkt.dstv4())) + // send icmp unreachable + const auto icmp = pkt.MakeICMPUnreachable(); + if (icmp.has_value()) { - pkt.UpdateIPv4Address({0}, xhtonl(pkt.dstv4())); - exit->QueueUpstreamTraffic(std::move(pkt), llarp::routing::ExitPadSize); - } - else if (pkt.IsV6()) - { - pkt.UpdateIPv6Address({0}, pkt.dstv6()); - exit->QueueUpstreamTraffic(std::move(pkt), llarp::routing::ExitPadSize); + HandleWriteIPPacket(icmp->ConstBuffer(), dst, src); } } + else + { + pkt.ZeroSourceAddress(); + MarkAddressOutbound(*m_state->m_ExitNode); + EnsurePathToService( + *m_state->m_ExitNode, + [pkt, self = this](service::Address, service::OutboundContext*) { + self->SendToServiceOrQueue( + *self->m_state->m_ExitNode, pkt.ConstBuffer(), service::eProtocolExit); + }, + 1s); + } return; } if (m_SNodes.at(itr->second)) @@ -897,6 +818,15 @@ namespace llarp itr->second.as_array(), std::placeholders::_1); } + else if (m_state->m_ExitEnabled) + { + sendFunc = std::bind( + &TunEndpoint::SendToServiceOrQueue, + this, + service::Address(itr->second.as_array()), + std::placeholders::_1, + service::eProtocolExit); + } else { sendFunc = std::bind( @@ -908,11 +838,13 @@ namespace llarp } // prepare packet for insertion into network // this includes clearing IP addresses, recalculating checksums, etc - if (pkt.IsV4()) - pkt.UpdateIPv4Address({0}, {0}); - else - pkt.UpdateIPv6Address({0}, {0}); - + if (not m_state->m_ExitEnabled) + { + if (pkt.IsV4()) + pkt.UpdateIPv4Address({0}, {0}); + else + pkt.UpdateIPv6Address({0}, {0}); + } if (sendFunc && sendFunc(pkt.Buffer())) { MarkIPActive(dst); @@ -923,38 +855,65 @@ namespace llarp } bool - TunEndpoint::HandleWriteIPPacket( - const llarp_buffer_t& b, std::function getFromIP) + TunEndpoint::HandleInboundPacket( + const service::ConvoTag tag, const llarp_buffer_t& buf, service::ProtocolType t) + { + if (t != service::eProtocolTrafficV4 && t != service::eProtocolTrafficV6 + && t != service::eProtocolExit) + return false; + AlignedBuffer<32> addr; + bool snode = false; + if (!GetEndpointWithConvoTag(tag, addr, snode)) + return false; + huint128_t src, dst; + + net::IPPacket pkt; + if (not pkt.Load(buf)) + return false; + + if (m_state->m_ExitNode == service::Address{addr.as_array()} and t == service::eProtocolExit) + { + // client side from exit + if (pkt.IsV4()) + src = pkt.src4to6(); + else if (pkt.IsV6()) + src = pkt.srcv6(); + dst = m_OurIP; + } + else if (m_state->m_ExitEnabled) + { + // exit side from exit + src = ObtainIPForAddr(addr, snode); + if (pkt.IsV4()) + dst = pkt.dst4to6(); + else if (pkt.IsV6()) + dst = pkt.dstv6(); + } + else + { + // snapp traffic + src = ObtainIPForAddr(addr, snode); + dst = m_OurIP; + } + HandleWriteIPPacket(buf, src, dst); + return true; + } + + bool + TunEndpoint::HandleWriteIPPacket(const llarp_buffer_t& b, huint128_t src, huint128_t dst) { - // llarp::LogInfo("got packet from ", msg->sender.Addr()); - auto themIP = getFromIP(); - // llarp::LogInfo("themIP ", themIP); - auto usIP = m_OurIP; ManagedBuffer buf(b); - return m_NetworkToUserPktQueue.EmplaceIf([buf, themIP, usIP](net::IPPacket& pkt) -> bool { + return m_NetworkToUserPktQueue.EmplaceIf([buf, src, dst](net::IPPacket& pkt) -> bool { // load if (!pkt.Load(buf)) return false; - // filter out: - // - packets smaller than minimal IPv4 header - // - non-IPv4 packets - // - packets with weird src/dst addresses - // (0.0.0.0/8 but not 0.0.0.0) - // - packets with 0 src but non-0 dst and oposite if (pkt.IsV4()) { - auto hdr = pkt.Header(); - if (pkt.sz < sizeof(*hdr) || (hdr->saddr != 0 && *(byte_t*)&(hdr->saddr) == 0) - || (hdr->daddr != 0 && *(byte_t*)&(hdr->daddr) == 0) - || ((hdr->saddr == 0) != (hdr->daddr == 0))) - { - return false; - } - pkt.UpdateIPv4Address(xhtonl(net::TruncateV6(themIP)), xhtonl(net::TruncateV6(usIP))); + pkt.UpdateIPv4Address(xhtonl(net::TruncateV6(src)), xhtonl(net::TruncateV6(dst))); } else if (pkt.IsV6()) { - pkt.UpdateIPv6Address(themIP, usIP); + pkt.UpdateIPv6Address(src, dst); } return true; }); @@ -1060,34 +1019,21 @@ namespace llarp { // called in the isolated network thread auto* self = static_cast(tun->user); - auto _pkts = std::move(self->m_TunPkts); - self->m_TunPkts = std::vector(); - - LogicCall(self->EndpointLogic(), [tun, self, pkts = std::move(_pkts)]() { - for (auto& pkt : pkts) + self->FlushToUser([self, tun](net::IPPacket& pkt) -> bool { + if (not llarp_ev_tun_async_write(tun, pkt.Buffer())) { - self->m_UserToNetworkPktQueue.Emplace(pkt); + llarp::LogWarn(self->Name(), " packet dropped"); } - self->FlushToUser([self, tun](net::IPPacket& pkt) -> bool { - if (!llarp_ev_tun_async_write(tun, pkt.Buffer())) - { - llarp::LogWarn(self->Name(), " packet dropped"); - return true; - } - return false; - }); + return false; }); - } + } // namespace handlers void TunEndpoint::tunifRecvPkt(llarp_tun_io* tun, const llarp_buffer_t& b) { // called for every packet read from user in isolated network thread auto* self = static_cast(tun->user); - net::IPPacket pkt; - if (not pkt.Load(b)) - return; - self->m_TunPkts.emplace_back(pkt); + self->m_UserToNetworkPktQueue.EmplaceIf([&](net::IPPacket& pkt) { return pkt.Load(b); }); } TunEndpoint::~TunEndpoint() = default; diff --git a/llarp/handlers/tun.hpp b/llarp/handlers/tun.hpp index 07c2c903e..148cc37f9 100644 --- a/llarp/handlers/tun.hpp +++ b/llarp/handlers/tun.hpp @@ -33,6 +33,9 @@ namespace llarp bool Configure(const NetworkConfig& conf, const DnsConfig& dnsConf) override; + void + SendPacketToRemote(const llarp_buffer_t&) override{}; + void Tick(llarp_time_t now) override; @@ -81,21 +84,11 @@ namespace llarp /// overrides Endpoint bool HandleInboundPacket( - const service::ConvoTag tag, const llarp_buffer_t& pkt, service::ProtocolType t) override - { - if (t != service::eProtocolTrafficV4 && t != service::eProtocolTrafficV6) - return false; - AlignedBuffer<32> addr; - bool snode = false; - if (!GetEndpointWithConvoTag(tag, addr, snode)) - return false; - return HandleWriteIPPacket( - pkt, [=]() -> huint128_t { return ObtainIPForAddr(addr, snode); }); - } + const service::ConvoTag tag, const llarp_buffer_t& pkt, service::ProtocolType t) override; /// handle inbound traffic bool - HandleWriteIPPacket(const llarp_buffer_t& buf, std::function getFromIP); + HandleWriteIPPacket(const llarp_buffer_t& buf, huint128_t src, huint128_t dst); /// queue outbound packet to the world bool @@ -192,8 +185,6 @@ namespace llarp net::IPPacket::CompareOrder, net::IPPacket::GetNow>; - /// queue packet for send on net thread from user - std::vector m_TunPkts; /// queue for sending packets over the network from us PacketQueue_t m_UserToNetworkPktQueue; /// queue for sending packets to user from network @@ -232,35 +223,6 @@ namespace llarp return nullptr; } - bool - QueueInboundPacketForExit(const llarp_buffer_t& buf) - { - ManagedBuffer copy{buf}; - return m_NetworkToUserPktQueue.EmplaceIf([&](llarp::net::IPPacket& pkt) -> bool { - if (!pkt.Load(copy.underlying)) - return false; - if (SupportsV6()) - { - if (pkt.IsV4()) - { - pkt.UpdateIPv6Address(net::ExpandV4(pkt.srcv4()), m_OurIP); - } - else - { - pkt.UpdateIPv6Address(pkt.srcv6(), m_OurIP); - } - } - else - { - if (pkt.IsV4()) - pkt.UpdateIPv4Address(xhtonl(pkt.srcv4()), xhtonl(net::TruncateV6(m_OurIP))); - else - return false; - } - return true; - }); - } - template void SendDNSReply( diff --git a/llarp/net/ip_packet.cpp b/llarp/net/ip_packet.cpp index a66a22c1c..ce0b8f267 100644 --- a/llarp/net/ip_packet.cpp +++ b/llarp/net/ip_packet.cpp @@ -84,32 +84,32 @@ namespace llarp return huint32_t{ntohl(Header()->daddr)}; } -#if 0 - static uint32_t - ipchksum_pseudoIPv4(nuint32_t src_ip, nuint32_t dst_ip, uint8_t proto, - uint16_t innerlen) + huint128_t + IPPacket::dst4to6() const { -#define IPCS(x) ((uint32_t)(x & 0xFFff) + (uint32_t)(x >> 16)) - uint32_t sum = IPCS(src_ip.n) + IPCS(dst_ip.n) + (uint32_t)proto - + (uint32_t)htons(innerlen); -#undef IPCS - return sum; + return ExpandV4(dstv4()); + } + + huint128_t + IPPacket::src4to6() const + { + return ExpandV4(srcv4()); } static uint16_t - ipchksum(const byte_t *buf, size_t sz, uint32_t sum = 0) + ipchksum(const byte_t* buf, size_t sz, uint32_t sum = 0) { - while(sz > 1) + while (sz > 1) { - sum += *(const uint16_t *)buf; + sum += *(const uint16_t*)buf; sz -= sizeof(uint16_t); buf += sizeof(uint16_t); } - if(sz != 0) + if (sz != 0) { uint16_t x = 0; - *(byte_t *)&x = *(const byte_t *)buf; + *(byte_t*)&x = *(const byte_t*)buf; sum += x; } @@ -120,7 +120,6 @@ namespace llarp return uint16_t((~sum) & 0xFFff); } -#endif #define ADD32CS(x) ((uint32_t)(x & 0xFFff) + (uint32_t)(x >> 16)) #define SUB32CS(x) ((uint32_t)((~x) & 0xFFff) + (uint32_t)((~x) >> 16)) @@ -419,5 +418,83 @@ namespace llarp break; } } + + void + IPPacket::ZeroAddresses() + { + if (IsV4()) + { + UpdateIPv4Address({0}, {0}); + } + else if (IsV6()) + { + UpdateIPv6Address({0}, {0}); + } + } + + void + IPPacket::ZeroSourceAddress() + { + if (IsV4()) + { + UpdateIPv4Address({0}, xhtonl(dstv4())); + } + else if (IsV6()) + { + UpdateIPv6Address({0}, {ntoh128(dstv6().h)}); + } + } + std::optional + IPPacket::MakeICMPUnreachable() const + { + if (IsV4()) + { + constexpr auto icmp_Header_size = 8; + constexpr auto ip_Header_size = 20; + net::IPPacket pkt{}; + auto* pkt_Header = pkt.Header(); + + pkt_Header->version = 4; + pkt_Header->ihl = 0x05; + pkt_Header->tos = 0; + pkt_Header->check = 0; + pkt_Header->tot_len = ntohs(icmp_Header_size + ip_Header_size); + pkt_Header->saddr = Header()->daddr; + pkt_Header->daddr = Header()->saddr; + pkt_Header->protocol = 1; // ICMP + pkt_Header->ttl = 1; + pkt_Header->frag_off = htons(0b0100000000000000); + // size pf ip header + const size_t l3_HeaderSize = Header()->ihl * 4; + // size of l4 packet to reflect back + const size_t l4_PacketSize = 8; + pkt_Header->tot_len += ntohs(l4_PacketSize + l3_HeaderSize); + + uint16_t* checksum; + uint8_t* itr = pkt.buf + (pkt_Header->ihl * 4); + uint8_t* icmp_begin = itr; // type 'destination unreachable' + *itr++ = 3; + // code 'Destination host unknown error' + *itr++ = 7; + // checksum + unused + htobe32buf(itr, 0); + checksum = (uint16_t*)itr; + itr += 4; + // next hop mtu is ignored but let's put something here anyways just in case tm + htobe16buf(itr, 1500); + itr += 2; + // copy ip header and first 8 bytes of datagram for icmp rject + std::copy_n(buf, l4_PacketSize + l3_HeaderSize, itr); + itr += l4_PacketSize + l3_HeaderSize; + // calculate checksum of ip header + pkt_Header->check = ipchksum(pkt.buf, pkt_Header->ihl * 4); + const auto icmp_size = std::distance(icmp_begin, itr); + // calculate icmp checksum + *checksum = ipchksum(icmp_begin, icmp_size); + pkt.sz = ntohs(pkt_Header->tot_len); + return pkt; + } + return std::nullopt; + } } // namespace net } // namespace llarp diff --git a/llarp/net/ip_packet.hpp b/llarp/net/ip_packet.hpp index b11964f19..138657f61 100644 --- a/llarp/net/ip_packet.hpp +++ b/llarp/net/ip_packet.hpp @@ -241,6 +241,17 @@ namespace llarp void UpdateIPv6Address(huint128_t src, huint128_t dst); + /// set addresses to zero and recacluate checksums + void + ZeroAddresses(); + + /// zero out source address + void + ZeroSourceAddress(); + + /// make an icmp unreachable reply packet based of this ip packet + std::optional + MakeICMPUnreachable() const; }; } // namespace net diff --git a/llarp/net/net.cpp b/llarp/net/net.cpp index 50edeb613..5367f1438 100644 --- a/llarp/net/net.cpp +++ b/llarp/net/net.cpp @@ -596,6 +596,15 @@ namespace llarp #endif } + bool + IsBogon(const huint128_t ip) + { + const nuint128_t netIP{ntoh128(ip.h)}; + in6_addr addr{}; + std::copy_n((const uint8_t*)&netIP.n, 16, &addr.s6_addr[0]); + return IsBogon(addr); + } + bool IsBogonRange(const in6_addr& host, const in6_addr&) { diff --git a/llarp/net/net.hpp b/llarp/net/net.hpp index 069c48648..efe137a66 100644 --- a/llarp/net/net.hpp +++ b/llarp/net/net.hpp @@ -94,6 +94,9 @@ namespace llarp bool IsBogon(const in6_addr& addr); + bool + IsBogon(const huint128_t addr); + bool IsBogonRange(const in6_addr& host, const in6_addr& mask); diff --git a/llarp/path/pathset.hpp b/llarp/path/pathset.hpp index 13a07f432..10215e9df 100644 --- a/llarp/path/pathset.hpp +++ b/llarp/path/pathset.hpp @@ -197,6 +197,9 @@ namespace llarp return false; } + virtual void + BlacklistSNode(const RouterID) = 0; + /// override me in subtype virtual bool HandleGotIntroMessage(std::shared_ptr) { @@ -262,6 +265,9 @@ namespace llarp virtual bool BuildOneAlignedTo(const RouterID endpoint) = 0; + virtual void + SendPacketToRemote(const llarp_buffer_t& pkt) = 0; + void ForEachPath(std::function visit) const { diff --git a/llarp/path/transit_hop.cpp b/llarp/path/transit_hop.cpp index 89d5e19a9..0dc691c0b 100644 --- a/llarp/path/transit_hop.cpp +++ b/llarp/path/transit_hop.cpp @@ -210,7 +210,7 @@ namespace llarp FlushDownstream(r); for (const auto& other : m_FlushOthers) { - other->FlushUpstream(r); + other->FlushDownstream(r); } m_FlushOthers.clear(); } diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index d7fd827fb..d9279c753 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -636,8 +636,7 @@ namespace llarp std::set Endpoint::GetExitRouters() const { - return m_ExitMap.TransformValues( - [](const exit::BaseSession_ptr& ptr) -> RouterID { return ptr->Endpoint(); }); + return {}; } void @@ -850,7 +849,9 @@ namespace llarp bool Endpoint::ProcessDataMessage(std::shared_ptr msg) { - if (msg->proto == eProtocolTrafficV4 || msg->proto == eProtocolTrafficV6) + if ((msg->proto == eProtocolExit + && (m_state->m_ExitEnabled || msg->sender.Addr() == m_state->m_ExitNode)) + || msg->proto == eProtocolTrafficV4 || msg->proto == eProtocolTrafficV6) { util::Lock l(m_state->m_InboundTrafficQueueMutex); m_state->m_InboundTrafficQueue.emplace(msg); @@ -971,7 +972,6 @@ namespace llarp static constexpr size_t NumParallelLookups = 2; /// how many requests per router static constexpr size_t RequestsPerLookup = 2; - LogInfo(Name(), " Ensure Path to ", remote.ToString()); MarkAddressOutbound(remote); @@ -1327,6 +1327,12 @@ namespace llarp return m_state->m_Router; } + void + Endpoint::BlacklistSNode(const RouterID snode) + { + m_state->m_SnodeBlacklist.insert(snode); + } + const std::set& Endpoint::SnodeBlacklist() const { diff --git a/llarp/service/endpoint.hpp b/llarp/service/endpoint.hpp index efa45b26a..d09fc711e 100644 --- a/llarp/service/endpoint.hpp +++ b/llarp/service/endpoint.hpp @@ -261,6 +261,9 @@ namespace llarp return false; } + void + BlacklistSNode(const RouterID snode) override; + /// return true if we have a convotag as an exit session /// or as a hidden service session /// set addr and issnode @@ -425,7 +428,7 @@ namespace llarp protected: IDataHandler* m_DataHandler = nullptr; Identity m_Identity; - net::IPRangeMap m_ExitMap; + net::IPRangeMap m_ExitMap; hooks::Backend_ptr m_OnUp; hooks::Backend_ptr m_OnDown; hooks::Backend_ptr m_OnReady; @@ -445,8 +448,10 @@ namespace llarp const ConvoMap& Sessions() const; ConvoMap& Sessions(); // clang-format on - + protected: std::unique_ptr m_state; + + private: thread::Queue m_RecvQueue; }; diff --git a/llarp/service/endpoint_state.cpp b/llarp/service/endpoint_state.cpp index d7c02a382..7b8421e07 100644 --- a/llarp/service/endpoint_state.cpp +++ b/llarp/service/endpoint_state.cpp @@ -15,7 +15,8 @@ namespace llarp { m_Keyfile = conf.m_keyfile; m_SnodeBlacklist = conf.m_snodeBlacklist; - + m_ExitEnabled = conf.m_AllowExit; + m_ExitNode = conf.m_exitNode; // TODO: /* if (k == "on-up") diff --git a/llarp/service/endpoint_state.hpp b/llarp/service/endpoint_state.hpp index 2b5448c98..53ba6fd55 100644 --- a/llarp/service/endpoint_state.hpp +++ b/llarp/service/endpoint_state.hpp @@ -52,6 +52,8 @@ namespace llarp std::string m_Keyfile; std::string m_Name; std::string m_NetNS; + bool m_ExitEnabled = false; + std::optional m_ExitNode; util::Mutex m_SendQueueMutex; // protects m_SendQueue std::deque m_SendQueue GUARDED_BY(m_SendQueueMutex); diff --git a/llarp/service/outbound_context.cpp b/llarp/service/outbound_context.cpp index dd17bc99c..d35cf329c 100644 --- a/llarp/service/outbound_context.cpp +++ b/llarp/service/outbound_context.cpp @@ -54,7 +54,7 @@ namespace llarp } OutboundContext::OutboundContext(const IntroSet& introset, Endpoint* parent) - : path::Builder(parent->Router(), 4, path::default_len) + : path::Builder(parent->Router(), 4, parent->numHops) , SendContext(introset.A, {}, this, parent) , location(introset.A.Addr().ToKey()) , currentIntroSet(introset) @@ -271,6 +271,17 @@ namespace llarp if (m_LookupFails > 16 || m_BuildFails > 10) return true; + constexpr auto InboundTrafficTimeout = 5s; + + if (m_GotInboundTraffic and m_LastInboundTraffic + InboundTrafficTimeout <= now) + { + if (std::chrono::abs(now - lastGoodSend) < InboundTrafficTimeout) + { + // timeout on other side + MarkCurrentIntroBad(now); + } + } + // check for expiration if (remoteIntro.ExpiresSoon(now)) { @@ -531,9 +542,17 @@ namespace llarp bool OutboundContext::HandleHiddenServiceFrame(path::Path_ptr p, const ProtocolFrame& frame) { + m_LastInboundTraffic = m_Endpoint->Now(); + m_GotInboundTraffic = true; return m_Endpoint->HandleHiddenServiceFrame(p, frame); } + void + OutboundContext::SendPacketToRemote(const llarp_buffer_t& buf) + { + AsyncEncryptAndSendTo(buf, eProtocolExit); + } + } // namespace service } // namespace llarp diff --git a/llarp/service/outbound_context.hpp b/llarp/service/outbound_context.hpp index 03a79406c..32ab540f8 100644 --- a/llarp/service/outbound_context.hpp +++ b/llarp/service/outbound_context.hpp @@ -21,11 +21,15 @@ namespace llarp public std::enable_shared_from_this { OutboundContext(const IntroSet& introSet, Endpoint* parent); + ~OutboundContext() override; util::StatusObject ExtractStatus() const; + void + BlacklistSNode(const RouterID) override{}; + bool ShouldBundleRC() const override; @@ -67,6 +71,10 @@ namespace llarp bool ReadyToSend() const; + /// for exits + void + SendPacketToRemote(const llarp_buffer_t&) override; + bool ShouldBuildMore(llarp_time_t now) const override; @@ -128,6 +136,8 @@ namespace llarp llarp_time_t lastShift = 0s; uint16_t m_LookupFails = 0; uint16_t m_BuildFails = 0; + llarp_time_t m_LastInboundTraffic = 0s; + bool m_GotInboundTraffic = false; }; } // namespace service diff --git a/llarp/service/protocol_type.hpp b/llarp/service/protocol_type.hpp index 4a52ec028..8fb3e9432 100644 --- a/llarp/service/protocol_type.hpp +++ b/llarp/service/protocol_type.hpp @@ -9,5 +9,5 @@ namespace llarp::service constexpr ProtocolType eProtocolControl = 0UL; constexpr ProtocolType eProtocolTrafficV4 = 1UL; constexpr ProtocolType eProtocolTrafficV6 = 2UL; - + constexpr ProtocolType eProtocolExit = 3UL; } // namespace llarp::service diff --git a/llarp/util/aligned.hpp b/llarp/util/aligned.hpp index eb462770f..b9f0fe0c1 100644 --- a/llarp/util/aligned.hpp +++ b/llarp/util/aligned.hpp @@ -28,13 +28,7 @@ namespace llarp { /// aligned buffer that is sz bytes long and aligns to the nearest Alignment template -#ifdef _WIN32 - // We CANNOT align on a 128-bit boundary, malloc(3C) on win32 - // only hands out 64-bit aligned pointers struct alignas(uint64_t) AlignedBuffer -#else - struct alignas(std::max_align_t) AlignedBuffer -#endif { static_assert( sz >= 8, From 3eb006f78ce03659ae838cff9ce61559e952265a Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 21 May 2020 10:20:47 -0400 Subject: [PATCH 02/40] iwp/libuv cleanup: remove llarp_pkt_list and all users of it as it's broke --- llarp/ev/ev.cpp | 7 +--- llarp/ev/ev.h | 9 ----- llarp/ev/ev.hpp | 68 ------------------------------------ llarp/ev/ev_libuv.cpp | 47 ++++++++----------------- llarp/iwp/message_buffer.cpp | 1 + llarp/iwp/session.cpp | 10 +----- llarp/link/server.cpp | 26 +++++--------- llarp/link/session.hpp | 2 +- 8 files changed, 27 insertions(+), 143 deletions(-) diff --git a/llarp/ev/ev.cpp b/llarp/ev/ev.cpp index 364f5e19b..50b2ee6f2 100644 --- a/llarp/ev/ev.cpp +++ b/llarp/ev/ev.cpp @@ -26,12 +26,7 @@ llarp_make_ev_loop() void llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev, std::shared_ptr logic) { - while (ev->running()) - { - ev->update_time(); - ev->tick(EV_TICK_INTERVAL); - llarp::LogContext::Instance().logStream->Tick(ev->time_now()); - } + ev->run(); logic->clear_event_loop(); ev->stopped(); } diff --git a/llarp/ev/ev.h b/llarp/ev/ev.h index 8733fad8f..98f5168ff 100644 --- a/llarp/ev/ev.h +++ b/llarp/ev/ev.h @@ -62,10 +62,6 @@ llarp_ev_loop_time_now_ms(const llarp_ev_loop_ptr& ev); void llarp_ev_loop_stop(const llarp_ev_loop_ptr& ev); -/// list of packets we recv'd -/// forward declared -struct llarp_pkt_list; - /// UDP handling configuration struct llarp_udp_io { @@ -83,11 +79,6 @@ struct llarp_udp_io int (*sendto)(struct llarp_udp_io*, const llarp::SockAddr&, const byte_t*, size_t); }; -/// get all packets recvieved last tick -/// return true if we got packets return false if we didn't -bool -llarp_ev_udp_recvmany(struct llarp_udp_io* udp, struct llarp_pkt_list* pkts); - /// add UDP handler int llarp_ev_add_udp(struct llarp_ev_loop* ev, struct llarp_udp_io* udp, const llarp::SockAddr& src); diff --git a/llarp/ev/ev.hpp b/llarp/ev/ev.hpp index fe53129b3..ee9789e51 100644 --- a/llarp/ev/ev.hpp +++ b/llarp/ev/ev.hpp @@ -812,72 +812,4 @@ struct llarp_ev_loop call_soon(std::function f) = 0; }; -struct PacketBuffer -{ - PacketBuffer(PacketBuffer&& other) - { - _ptr = other._ptr; - _sz = other._sz; - other._ptr = nullptr; - other._sz = 0; - } - - PacketBuffer(const PacketBuffer&) = delete; - - PacketBuffer& - operator=(const PacketBuffer&) = delete; - - PacketBuffer() : PacketBuffer(nullptr, 0){}; - explicit PacketBuffer(size_t sz) : _sz{sz} - { - _ptr = new char[sz]; - } - PacketBuffer(char* buf, size_t sz) - { - _ptr = buf; - _sz = sz; - } - ~PacketBuffer() - { - if (_ptr) - delete[] _ptr; - } - byte_t* - data() - { - return (byte_t*)_ptr; - } - size_t - size() - { - return _sz; - } - byte_t& operator[](size_t sz) - { - return data()[sz]; - } - void - reserve(size_t sz) - { - if (_ptr) - delete[] _ptr; - _ptr = new char[sz]; - _sz = sz; - } - - private: - char* _ptr = nullptr; - size_t _sz = 0; -}; - -struct PacketEvent -{ - llarp::SockAddr remote; - PacketBuffer pkt; -}; - -struct llarp_pkt_list : public std::vector -{ -}; - #endif diff --git a/llarp/ev/ev_libuv.cpp b/llarp/ev/ev_libuv.cpp index 21225b8bc..a674e2627 100644 --- a/llarp/ev/ev_libuv.cpp +++ b/llarp/ev/ev_libuv.cpp @@ -397,8 +397,6 @@ namespace libuv uv_check_t m_Ticker; llarp_udp_io* const m_UDP; llarp::SockAddr m_Addr; - llarp_pkt_list m_LastPackets; - std::array m_Buffer; udp_glue(uv_loop_t* loop, llarp_udp_io* udp, const llarp::SockAddr& src) : m_UDP(udp), m_Addr(src) @@ -424,16 +422,7 @@ namespace libuv udp_glue* glue = static_cast(handle->data); if (addr) glue->RecvFrom(nread, buf, llarp::SockAddr(*addr)); - if (nread <= 0 || glue->m_UDP == nullptr || glue->m_UDP->recvfrom != nullptr) - delete[] buf->base; - } - - bool - RecvMany(llarp_pkt_list* pkts) - { - *pkts = std::move(m_LastPackets); - m_LastPackets = llarp_pkt_list(); - return pkts->size() > 0; + delete[] buf->base; } void @@ -447,11 +436,6 @@ namespace libuv const llarp_buffer_t pkt((const byte_t*)buf->base, pktsz); m_UDP->recvfrom(m_UDP, fromaddr, ManagedBuffer{pkt}); } - else - { - PacketBuffer pbuf(buf->base, pktsz); - m_LastPackets.emplace_back(PacketEvent{fromaddr, std::move(pbuf)}); - } } } @@ -760,9 +744,11 @@ namespace libuv OnAsyncWake(uv_async_t* async_handle) { Loop* loop = static_cast(async_handle->data); + loop->update_time(); loop->process_timer_queue(); loop->process_cancel_queue(); loop->FlushLogic(); + llarp::LogContext::Instance().logStream->Tick(loop->time_now()); } Loop::Loop() : llarp_ev_loop(), m_LogicCalls(1024), m_timerQueue(20), m_timerCancelQueue(20) @@ -787,12 +773,13 @@ namespace libuv #endif m_TickTimer = new uv_timer_t; m_TickTimer->data = this; + if (uv_timer_init(&m_Impl, m_TickTimer) == -1) + return false; m_Run.store(true); m_nextID.store(0); - m_WakeUp.data = this; uv_async_init(&m_Impl, &m_WakeUp, &OnAsyncWake); - return uv_timer_init(&m_Impl, m_TickTimer) != -1; + return true; } void @@ -829,6 +816,11 @@ namespace libuv int Loop::run() { + uv_timer_start( + m_TickTimer, + [](uv_timer_t* t) { static_cast(t->loop->data)->FlushLogic(); }, + 1000, + 1000); return uv_run(&m_Impl, UV_RUN_DEFAULT); } @@ -923,11 +915,7 @@ namespace libuv while (not m_timerCancelQueue.empty()) { uint64_t job_id = m_timerCancelQueue.popFront(); - auto itr = m_pendingCalls.find(job_id); - if (itr != m_pendingCalls.end()) - { - m_pendingCalls.erase(itr); - } + m_pendingCalls.erase(job_id); } } @@ -937,8 +925,9 @@ namespace libuv auto itr = m_pendingCalls.find(job_id); if (itr != m_pendingCalls.end()) { - LogicCall(m_Logic, itr->second); - m_pendingCalls.erase(itr); + if (itr->second) + itr->second(); + m_pendingCalls.erase(itr->first); } } @@ -1063,9 +1052,3 @@ namespace libuv } } // namespace libuv - -bool -llarp_ev_udp_recvmany(struct llarp_udp_io* u, struct llarp_pkt_list* pkts) -{ - return static_cast(u->impl)->RecvMany(pkts); -} diff --git a/llarp/iwp/message_buffer.cpp b/llarp/iwp/message_buffer.cpp index 67d137c83..cda8477ab 100644 --- a/llarp/iwp/message_buffer.cpp +++ b/llarp/iwp/message_buffer.cpp @@ -19,6 +19,7 @@ namespace llarp { const llarp_buffer_t buf(m_Data); CryptoManager::instance()->shorthash(m_Digest, buf); + m_Acks.set(0); } ILinkSession::Packet_t diff --git a/llarp/iwp/session.cpp b/llarp/iwp/session.cpp index c0d6265bc..aaa44b5ca 100644 --- a/llarp/iwp/session.cpp +++ b/llarp/iwp/session.cpp @@ -143,10 +143,8 @@ namespace llarp Session::EncryptWorker(CryptoQueue_ptr msgs) { LogDebug("encrypt worker ", msgs->size(), " messages"); - auto itr = msgs->begin(); - while (itr != msgs->end()) + for (auto& pkt : *msgs) { - Packet_t pkt = std::move(*itr); llarp_buffer_t pktbuf(pkt); const TunnelNonce nonce_ptr{pkt.data() + HMACSIZE}; pktbuf.base += PacketOverhead; @@ -157,7 +155,6 @@ namespace llarp pktbuf.sz = pkt.size() - HMACSIZE; CryptoManager::instance()->hmac(pkt.data(), pktbuf, m_SessionKey); Send_LL(pkt.data(), pkt.size()); - ++itr; } } @@ -911,11 +908,6 @@ namespace llarp return false; } } - else - { - // this case should never happen - ::abort(); - } break; case State::Introduction: if (m_Inbound) diff --git a/llarp/link/server.cpp b/llarp/link/server.cpp index 9c997aba7..1733a116b 100644 --- a/llarp/link/server.cpp +++ b/llarp/link/server.cpp @@ -116,7 +116,13 @@ namespace llarp { m_Loop = loop; m_udp.user = this; - m_udp.recvfrom = nullptr; + m_udp.recvfrom = [](llarp_udp_io* udp, const llarp::SockAddr& from, ManagedBuffer pktbuf) { + ILinkSession::Packet_t pkt; + auto& buf = pktbuf.underlying; + pkt.resize(buf.sz); + std::copy_n(buf.base, buf.sz, pkt.data()); + static_cast(udp->user)->RecvFrom(from, std::move(pkt)); + }; m_udp.tick = &ILinkLayer::udp_tick; if (ifname == "*") { @@ -495,23 +501,7 @@ namespace llarp ILinkLayer::udp_tick(llarp_udp_io* udp) { ILinkLayer* link = static_cast(udp->user); - auto pkts = std::make_shared(); - llarp_ev_udp_recvmany(&link->m_udp, pkts.get()); - auto logic = link->logic(); - if (logic == nullptr) - return; - LogicCall(logic, [pkts, link]() { - auto itr = pkts->begin(); - while (itr != pkts->end()) - { - if (link->m_RecentlyClosed.find(itr->remote) == link->m_RecentlyClosed.end()) - { - link->RecvFrom(itr->remote, std::move(itr->pkt)); - } - ++itr; - } - link->Pump(); - }); + link->Pump(); } } // namespace llarp diff --git a/llarp/link/session.hpp b/llarp/link/session.hpp index 941ec61c0..cb8920acf 100644 --- a/llarp/link/session.hpp +++ b/llarp/link/session.hpp @@ -45,7 +45,7 @@ namespace llarp /// message delivery result hook function using CompletionHandler = std::function; - using Packet_t = PacketBuffer; + using Packet_t = std::vector; using Message_t = std::vector; /// send a message buffer to the remote endpoint From 114bdd5ce5001d4b23ebf9c72e9446da2b3b5fc3 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 21 May 2020 10:22:34 -0400 Subject: [PATCH 03/40] add move and copy constructors/assigment operators to ip_address --- llarp/net/ip_address.cpp | 23 +++++++++++++++++++++++ llarp/net/ip_address.hpp | 26 +++++++++++++------------- 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/llarp/net/ip_address.cpp b/llarp/net/ip_address.cpp index 55513e0e9..a9688a8a4 100644 --- a/llarp/net/ip_address.cpp +++ b/llarp/net/ip_address.cpp @@ -9,6 +9,11 @@ namespace llarp setAddress(str); } + IpAddress::IpAddress(const IpAddress& other) + : m_empty(other.m_empty), m_ipAddress(other.m_ipAddress), m_port(other.m_port) + { + } + IpAddress::IpAddress(std::string_view str, std::optional port) { setAddress(str, port); @@ -24,6 +29,16 @@ namespace llarp m_empty = addr.isEmpty(); } + IpAddress& + IpAddress::operator=(IpAddress&& other) + { + m_ipAddress = std::move(other.m_ipAddress); + m_port = std::move(other.m_port); + m_empty = other.m_empty; + other.m_empty = false; + return *this; + } + IpAddress& IpAddress::operator=(const sockaddr& other) { @@ -38,6 +53,14 @@ namespace llarp return *this; } + IpAddress& + IpAddress::operator=(const IpAddress& other) + { + m_empty = other.m_empty; + m_ipAddress = other.m_ipAddress; + m_port = other.m_port; + return *this; + } std::optional IpAddress::getPort() const diff --git a/llarp/net/ip_address.hpp b/llarp/net/ip_address.hpp index 86cec4b0e..4b39e4a38 100644 --- a/llarp/net/ip_address.hpp +++ b/llarp/net/ip_address.hpp @@ -21,6 +21,10 @@ namespace llarp { /// Empty constructor. IpAddress() = default; + /// move construtor + IpAddress(IpAddress&&) = default; + /// copy construct + IpAddress(const IpAddress&); /// Constructor. Takes a string which can be an IPv4 or IPv6 address optionally followed by /// a colon and a port. @@ -51,6 +55,14 @@ namespace llarp IpAddress& operator=(const sockaddr& other); + /// move assignment + IpAddress& + operator=(IpAddress&& other); + + /// copy assignment + IpAddress& + operator=(const IpAddress& other); + /// Return the port. Returns -1 if no port has been provided. /// /// @return the port, if present @@ -119,19 +131,7 @@ namespace llarp std::size_t operator()(const IpAddress& address) const noexcept { - (void)address; - // throw std::runtime_error("FIXME: IpAddress::Hash"); // can't do this in operator(), - // apparently, so hopefully it's me that stumbles upon this (if not, sorry!) - return 0; - - /* - if(a.af() == AF_INET) - { - return a.port() ^ a.addr4()->s_addr; - } - static const uint8_t empty[16] = {0}; - return (a.af() + memcmp(a.addr6(), empty, 16)) ^ a.port(); - */ + return std::hash{}(address.toString()); } }; From 00143e63f42b46b69678309333141bce2d789d83 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 21 May 2020 10:23:54 -0400 Subject: [PATCH 04/40] put replay filters on transit hops to reduce retransmissions. --- llarp/path/ihophandler.cpp | 11 ++++++ llarp/path/ihophandler.hpp | 6 +++ llarp/path/path.cpp | 16 -------- llarp/path/path.hpp | 9 ----- llarp/path/path_context.cpp | 2 + llarp/util/decaying_hashset.hpp | 10 ++++- .../util/test_llarp_util_decaying_hashset.cpp | 39 +++++++++++++++++-- 7 files changed, 62 insertions(+), 31 deletions(-) diff --git a/llarp/path/ihophandler.cpp b/llarp/path/ihophandler.cpp index cfbe359ac..ea915c158 100644 --- a/llarp/path/ihophandler.cpp +++ b/llarp/path/ihophandler.cpp @@ -8,6 +8,8 @@ namespace llarp bool IHopHandler::HandleUpstream(const llarp_buffer_t& X, const TunnelNonce& Y, AbstractRouter*) { + if (not m_UpstreamReplayFilter.Insert(Y)) + return false; if (m_UpstreamQueue == nullptr) m_UpstreamQueue = std::make_shared(); m_UpstreamQueue->emplace_back(); @@ -22,6 +24,8 @@ namespace llarp bool IHopHandler::HandleDownstream(const llarp_buffer_t& X, const TunnelNonce& Y, AbstractRouter*) { + if (not m_DownstreamReplayFilter.Insert(Y)) + return false; if (m_DownstreamQueue == nullptr) m_DownstreamQueue = std::make_shared(); m_DownstreamQueue->emplace_back(); @@ -31,5 +35,12 @@ namespace llarp pkt.second = Y; return true; } + + void + IHopHandler::DecayFilters(llarp_time_t now) + { + m_UpstreamReplayFilter.Decay(now); + m_DownstreamReplayFilter.Decay(now); + } } // namespace path } // namespace llarp diff --git a/llarp/path/ihophandler.hpp b/llarp/path/ihophandler.hpp index 49ee1ba11..ac76d93c9 100644 --- a/llarp/path/ihophandler.hpp +++ b/llarp/path/ihophandler.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -30,6 +31,9 @@ namespace llarp virtual ~IHopHandler() = default; + void + DecayFilters(llarp_time_t now); + virtual bool Expired(llarp_time_t now) const = 0; @@ -70,6 +74,8 @@ namespace llarp uint64_t m_SequenceNum = 0; TrafficQueue_ptr m_UpstreamQueue; TrafficQueue_ptr m_DownstreamQueue; + util::DecayingHashSet m_UpstreamReplayFilter; + util::DecayingHashSet m_DownstreamReplayFilter; virtual void UpstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) = 0; diff --git a/llarp/path/path.cpp b/llarp/path/path.cpp index 1247640d6..74194d29d 100644 --- a/llarp/path/path.cpp +++ b/llarp/path/path.cpp @@ -58,19 +58,6 @@ namespace llarp EnterState(ePathBuilding, parent->Now()); } - bool - Path::HandleUpstream(const llarp_buffer_t& X, const TunnelNonce& Y, AbstractRouter* r) - - { - return m_UpstreamReplayFilter.Insert(Y) and IHopHandler::HandleUpstream(X, Y, r); - } - - bool - Path::HandleDownstream(const llarp_buffer_t& X, const TunnelNonce& Y, AbstractRouter* r) - { - return m_DownstreamReplayFilter.Insert(Y) and IHopHandler::HandleDownstream(X, Y, r); - } - void Path::SetBuildResultHook(BuildResultHookFunc func) { @@ -372,9 +359,6 @@ namespace llarp m_RXRate = 0; m_TXRate = 0; - m_UpstreamReplayFilter.Decay(now); - m_DownstreamReplayFilter.Decay(now); - if (_status == ePathBuilding) { if (buildStarted == 0s) diff --git a/llarp/path/path.hpp b/llarp/path/path.hpp index 42a61ed49..39814d727 100644 --- a/llarp/path/path.hpp +++ b/llarp/path/path.hpp @@ -26,8 +26,6 @@ #include #include -#include - namespace llarp { class Logic; @@ -282,11 +280,6 @@ namespace llarp void Rebuild(); - bool - HandleUpstream(const llarp_buffer_t& X, const TunnelNonce& Y, AbstractRouter*) override; - bool - HandleDownstream(const llarp_buffer_t& X, const TunnelNonce& Y, AbstractRouter*) override; - void Tick(llarp_time_t now, AbstractRouter* r); @@ -420,8 +413,6 @@ namespace llarp uint64_t m_ExitObtainTX = 0; PathStatus _status; PathRole _role; - util::DecayingHashSet m_UpstreamReplayFilter; - util::DecayingHashSet m_DownstreamReplayFilter; uint64_t m_LastRXRate = 0; uint64_t m_RXRate = 0; uint64_t m_LastTXRate = 0; diff --git a/llarp/path/path_context.cpp b/llarp/path/path_context.cpp index 30d074df8..8ea1cb1ef 100644 --- a/llarp/path/path_context.cpp +++ b/llarp/path/path_context.cpp @@ -308,6 +308,7 @@ namespace llarp auto itr = map.begin(); while (itr != map.end()) { + itr->second->DecayFilters(now); if (itr->second->Expired(now)) { m_Router->outboundMessageHandler().QueueRemoveEmptyPath(itr->first); @@ -323,6 +324,7 @@ namespace llarp auto itr = map.begin(); while (itr != map.end()) { + itr->second->DecayFilters(now); if (itr->second->Expired(now)) { itr = map.erase(itr); diff --git a/llarp/util/decaying_hashset.hpp b/llarp/util/decaying_hashset.hpp index 4778e175c..f74b3b22f 100644 --- a/llarp/util/decaying_hashset.hpp +++ b/llarp/util/decaying_hashset.hpp @@ -20,7 +20,7 @@ namespace llarp bool Contains(const Val_t& v) const { - return m_Values.find(v) != m_Values.end(); + return m_Values.count(v) != 0; } /// return true if inserted @@ -30,7 +30,7 @@ namespace llarp { if (now == 0s) now = llarp::time_now_ms(); - return m_Values.emplace(v, now).second; + return m_Values.try_emplace(v, now).second; } /// decay hashset entries @@ -56,6 +56,12 @@ namespace llarp return m_CacheInterval; } + bool + Empty() const + { + return m_Values.empty(); + } + void DecayInterval(Time_t interval) { diff --git a/test/util/test_llarp_util_decaying_hashset.cpp b/test/util/test_llarp_util_decaying_hashset.cpp index be6dff534..d8fc385f9 100644 --- a/test/util/test_llarp_util_decaying_hashset.cpp +++ b/test/util/test_llarp_util_decaying_hashset.cpp @@ -2,11 +2,42 @@ #include #include +TEST_CASE("Thrash DecayingHashSet", "[decaying-hashset]") +{ + static constexpr auto duration = 5s; + + static constexpr auto decayInterval = 50ms; + llarp::util::DecayingHashSet> hashset(decayInterval); + const llarp_time_t started = llarp::time_now_ms(); + const auto end = duration + started; + llarp_time_t nextDecay = started + decayInterval; + do + { + const auto now = llarp::time_now_ms(); + for (size_t i = 0; i < 500; i++) + { + llarp::AlignedBuffer<32> rando; + rando.Randomize(); + hashset.Insert(rando, now); + /// maybe reinsert to simulate filter hits + if (i % 20 == 0) + hashset.Insert(rando, now); + } + if (now >= nextDecay) + { + REQUIRE(not hashset.Empty()); + hashset.Decay(now); + nextDecay += decayInterval; + } + + } while (llarp::time_now_ms() <= end); +} + TEST_CASE("DecayingHashSet test decay static time", "[decaying-hashset]") { static constexpr auto timeout = 5s; - static constexpr auto now = 1s; - llarp::util::DecayingHashSet< llarp::RouterID > hashset(timeout); + static constexpr auto now = 1s; + llarp::util::DecayingHashSet hashset(timeout); const llarp::RouterID zero; REQUIRE(zero.IsZero()); REQUIRE(not hashset.Contains(zero)); @@ -23,8 +54,8 @@ TEST_CASE("DecayingHashSet test decay static time", "[decaying-hashset]") TEST_CASE("DecayingHashSet tset decay dynamic time", "[decaying-hashset]") { static constexpr llarp_time_t timeout = 5s; - const llarp_time_t now = llarp::time_now_ms(); - llarp::util::DecayingHashSet< llarp::RouterID > hashset(timeout); + const llarp_time_t now = llarp::time_now_ms(); + llarp::util::DecayingHashSet hashset(timeout); const llarp::RouterID zero; REQUIRE(zero.IsZero()); REQUIRE(not hashset.Contains(zero)); From 382e4215a86c651789146cb7cbb18fd37bd28628 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 21 May 2020 10:24:59 -0400 Subject: [PATCH 05/40] path testing interval increase to reduce bandwidth use --- llarp/constants/path.hpp | 4 ++-- llarp/path/path.cpp | 27 +++++++++------------------ 2 files changed, 11 insertions(+), 20 deletions(-) diff --git a/llarp/constants/path.hpp b/llarp/constants/path.hpp index d81281abe..92758507d 100644 --- a/llarp/constants/path.hpp +++ b/llarp/constants/path.hpp @@ -31,9 +31,9 @@ namespace llarp constexpr auto build_timeout = 30s; /// measure latency every this interval ms - constexpr auto latency_interval = 5s; + constexpr auto latency_interval = 20s; /// if a path is inactive for this amount of time it's dead - constexpr auto alive_timeout = 30s; + constexpr auto alive_timeout = latency_interval * 1.5; } // namespace path } // namespace llarp diff --git a/llarp/path/path.cpp b/llarp/path/path.cpp index 74194d29d..5bddc9566 100644 --- a/llarp/path/path.cpp +++ b/llarp/path/path.cpp @@ -378,9 +378,12 @@ namespace llarp // check to see if this path is dead if (_status == ePathEstablished) { - const auto dlt = now - m_LastLatencyTestTime; + auto dlt = now - m_LastLatencyTestTime; if (dlt > path::latency_interval && m_LastLatencyTestID == 0) { + // bail doing test if we are active + if (now - m_LastRecvMessage < path::latency_interval) + return; routing::PathLatencyMessage latency; latency.T = randint(); m_LastLatencyTestID = latency.T; @@ -389,24 +392,12 @@ namespace llarp FlushUpstream(r); return; } - if (m_LastRecvMessage > 0s && now > m_LastRecvMessage) - { - const auto delay = now - m_LastRecvMessage; - if (m_CheckForDead && m_CheckForDead(shared_from_this(), delay)) - { - LogWarn(Name(), " waited for ", dlt, " and path is unresponsive"); - r->routerProfiling().MarkPathFail(this); - EnterState(ePathTimeout, now); - } - } - else if (dlt >= path::alive_timeout && m_LastRecvMessage == 0s) + dlt = now - m_LastRecvMessage; + if (dlt >= path::alive_timeout) { - if (m_CheckForDead && m_CheckForDead(shared_from_this(), dlt)) - { - LogWarn(Name(), " waited for ", dlt, " and path looks dead"); - r->routerProfiling().MarkPathFail(this); - EnterState(ePathTimeout, now); - } + LogWarn(Name(), " waited for ", dlt, " and path looks dead"); + r->routerProfiling().MarkPathFail(this); + EnterState(ePathTimeout, now); } } } From a45f92dca76d739d3d1a62d9899fc35d04ef04fe Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 21 May 2020 10:25:46 -0400 Subject: [PATCH 06/40] use random good path for outbound traffic so that it uses an even spread accross all paths --- llarp/path/pathset.cpp | 29 +++++++++++++++++++++++++++++ llarp/path/pathset.hpp | 3 +++ llarp/service/sendcontext.cpp | 2 +- 3 files changed, 33 insertions(+), 1 deletion(-) diff --git a/llarp/path/pathset.cpp b/llarp/path/pathset.cpp index 0614e6cef..a1088c6c4 100644 --- a/llarp/path/pathset.cpp +++ b/llarp/path/pathset.cpp @@ -5,6 +5,8 @@ #include #include +#include + namespace llarp { namespace path @@ -161,6 +163,33 @@ namespace llarp return chosen; } + Path_ptr + PathSet::GetRandomPathByRouter(RouterID id, PathRole roles) const + { + Lock_t l(m_PathsMutex); + std::vector chosen; + auto itr = m_Paths.begin(); + while (itr != m_Paths.end()) + { + if (itr->second->IsReady() && itr->second->SupportsAnyRoles(roles)) + { + if (itr->second->Endpoint() == id) + { + chosen.emplace_back(itr->second); + } + } + ++itr; + } + if (chosen.empty()) + return nullptr; + size_t idx = 0; + if (chosen.size() >= 2) + { + idx = rand() % chosen.size(); + } + return chosen[idx]; + } + Path_ptr PathSet::GetByEndpointWithID(RouterID ep, PathID_t id) const { diff --git a/llarp/path/pathset.hpp b/llarp/path/pathset.hpp index 10215e9df..3c9a205da 100644 --- a/llarp/path/pathset.hpp +++ b/llarp/path/pathset.hpp @@ -230,6 +230,9 @@ namespace llarp Path_ptr GetNewestPathByRouter(RouterID router, PathRole roles = ePathRoleAny) const; + Path_ptr + GetRandomPathByRouter(RouterID router, PathRole roles = ePathRoleAny) const; + Path_ptr GetPathByID(PathID_t id) const; diff --git a/llarp/service/sendcontext.cpp b/llarp/service/sendcontext.cpp index 1415381c0..febc15305 100644 --- a/llarp/service/sendcontext.cpp +++ b/llarp/service/sendcontext.cpp @@ -74,7 +74,7 @@ namespace llarp f->T = currentConvoTag; f->S = ++sequenceNo; - auto path = m_PathSet->GetNewestPathByRouter(remoteIntro.router); + auto path = m_PathSet->GetRandomPathByRouter(remoteIntro.router); if (!path) { LogError(m_Endpoint->Name(), " cannot encrypt and send: no path for intro ", remoteIntro); From 3a776b3ed1e18522ee353762402cf7bb243e3983 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 21 May 2020 10:26:51 -0400 Subject: [PATCH 07/40] bounds checks and such --- llarp/config/config.cpp | 2 +- llarp/dht/txowner.hpp | 2 +- llarp/iwp/session.cpp | 1 + llarp/util/str.cpp | 8 ++++---- llarp/util/str.hpp | 2 +- 5 files changed, 8 insertions(+), 7 deletions(-) diff --git a/llarp/config/config.cpp b/llarp/config/config.cpp index 83721ec44..b60fe3e2e 100644 --- a/llarp/config/config.cpp +++ b/llarp/config/config.cpp @@ -272,7 +272,7 @@ namespace llarp m_upstreamDNS.push_back(parseDNSAddr(std::move(arg))); }); - conf.defineOption("dns", "bind", false, std::nullopt, [=](std::string arg) { + conf.defineOption("dns", "bind", false, "127.3.2.1:53", [=](std::string arg) { m_bind = parseDNSAddr(std::move(arg)); }); } diff --git a/llarp/dht/txowner.hpp b/llarp/dht/txowner.hpp index 23e337211..3f3d8e257 100644 --- a/llarp/dht/txowner.hpp +++ b/llarp/dht/txowner.hpp @@ -53,7 +53,7 @@ namespace llarp operator()(const TXOwner& o) const noexcept { std::size_t sz2; - memcpy(&sz2, &o.node[0], sizeof(std::size_t)); + memcpy(&sz2, o.node.data(), sizeof(std::size_t)); return o.txid ^ (sz2 << 1); } }; diff --git a/llarp/iwp/session.cpp b/llarp/iwp/session.cpp index aaa44b5ca..fa83673ec 100644 --- a/llarp/iwp/session.cpp +++ b/llarp/iwp/session.cpp @@ -753,6 +753,7 @@ namespace llarp { return; } + if (not itr->second.Verify()) { LogError("bad short xmit hash from ", m_RemoteAddr); diff --git a/llarp/util/str.cpp b/llarp/util/str.cpp index 8024437ad..1497611e8 100644 --- a/llarp/util/str.cpp +++ b/llarp/util/str.cpp @@ -80,13 +80,13 @@ namespace llarp } std::vector - split(std::string_view str, char delimiter) + split(const std::string_view str, char delimiter) { std::vector splits; - + const auto str_size = str.size(); size_t last = 0; size_t next = 0; - while (last < str.size() and next < std::string_view::npos) + while (last < str_size and next < std::string_view::npos) { next = str.find_first_of(delimiter, last); if (next > last) @@ -96,7 +96,7 @@ namespace llarp last = next; // advance to next non-delimiter - while (str[last] == delimiter) + while (str[last] == delimiter and last < str_size) last++; } else diff --git a/llarp/util/str.hpp b/llarp/util/str.hpp index 523134ebf..d88cdde51 100644 --- a/llarp/util/str.hpp +++ b/llarp/util/str.hpp @@ -42,7 +42,7 @@ namespace llarp /// @param delimiter is the character to split on /// @return a vector of std::string_views with the split words, excluding the delimeter std::vector - split(std::string_view str, char delimiter); + split(const std::string_view str, char delimiter); } // namespace llarp From c826d0a0b13b926917a9a7a84b5e51c6084289ab Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 21 May 2020 10:27:41 -0400 Subject: [PATCH 08/40] increase transit hop limits --- llarp/path/transit_hop.cpp | 2 +- llarp/router/i_outbound_message_handler.hpp | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/llarp/path/transit_hop.cpp b/llarp/path/transit_hop.cpp index 0dc691c0b..edae826c1 100644 --- a/llarp/path/transit_hop.cpp +++ b/llarp/path/transit_hop.cpp @@ -32,7 +32,7 @@ namespace llarp return stream; } - TransitHop::TransitHop() : m_UpstreamGather(128), m_DownstreamGather(128) + TransitHop::TransitHop() : m_UpstreamGather(512), m_DownstreamGather(512) { m_UpstreamGather.enable(); m_DownstreamGather.enable(); diff --git a/llarp/router/i_outbound_message_handler.hpp b/llarp/router/i_outbound_message_handler.hpp index 9ffa784a7..5f167153a 100644 --- a/llarp/router/i_outbound_message_handler.hpp +++ b/llarp/router/i_outbound_message_handler.hpp @@ -24,9 +24,9 @@ namespace llarp using SendStatusHandler = std::function; - static const size_t MAX_PATH_QUEUE_SIZE = 40; - static const size_t MAX_OUTBOUND_QUEUE_SIZE = 200; - static const size_t MAX_OUTBOUND_MESSAGES_PER_TICK = 20; + static const size_t MAX_PATH_QUEUE_SIZE = 100; + static const size_t MAX_OUTBOUND_QUEUE_SIZE = 1000; + static const size_t MAX_OUTBOUND_MESSAGES_PER_TICK = 500; struct IOutboundMessageHandler { From 328c7a398ea4b00d1459ffa1c36e91475ca77634 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 21 May 2020 10:38:04 -0400 Subject: [PATCH 09/40] make sure event loop owns logic so that logic call does not use seperate thread --- test/link/test_llarp_link.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/test/link/test_llarp_link.cpp b/test/link/test_llarp_link.cpp index b393a807c..c50f76976 100644 --- a/test/link/test_llarp_link.cpp +++ b/test/link/test_llarp_link.cpp @@ -155,6 +155,7 @@ struct LinkLayerTest : public test::LlarpTest RouterContact::Lifetime = 500ms; netLoop = llarp_make_ev_loop(); m_logic.reset(new Logic()); + netLoop->set_logic(m_logic); Alice.Setup(); Bob.Setup(); } From fbb7c0d6f94707506865286f3c69b17618affab4 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 21 May 2020 13:50:55 -0400 Subject: [PATCH 10/40] add forgotten file --- cmake/FindJemalloc.cmake | 57 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 cmake/FindJemalloc.cmake diff --git a/cmake/FindJemalloc.cmake b/cmake/FindJemalloc.cmake new file mode 100644 index 000000000..36468861d --- /dev/null +++ b/cmake/FindJemalloc.cmake @@ -0,0 +1,57 @@ +# +# Find the JEMALLOC client includes and library +# + +# This module defines +# JEMALLOC_INCLUDE_DIR, where to find jemalloc.h +# JEMALLOC_LIBRARIES, the libraries to link against +# JEMALLOC_FOUND, if false, you cannot build anything that requires JEMALLOC + +# also defined, but not for general use are +# JEMALLOC_LIBRARY, where to find the JEMALLOC library. + +set( JEMALLOC_FOUND 0 ) + +if ( UNIX ) + FIND_PATH( JEMALLOC_INCLUDE_DIR + NAMES + jemalloc/jemalloc.h + PATHS + /usr/include + /usr/include/jemalloc + /usr/local/include + /usr/local/include/jemalloc + $ENV{JEMALLOC_ROOT} + $ENV{JEMALLOC_ROOT}/include + DOC + "Specify include-directories that might contain jemalloc.h here." + ) + FIND_LIBRARY( JEMALLOC_LIBRARY + NAMES + jemalloc libjemalloc JEMALLOC + PATHS + /usr/lib + /usr/lib/jemalloc + /usr/local/lib + /usr/local/lib/jemalloc + /usr/local/jemalloc/lib + $ENV{JEMALLOC_ROOT}/lib + $ENV{JEMALLOC_ROOT} + DOC "Specify library-locations that might contain the jemalloc library here." + ) + + if ( JEMALLOC_LIBRARY ) + if ( JEMALLOC_INCLUDE_DIR ) + set( JEMALLOC_FOUND 1 ) + message( STATUS "Found JEMALLOC library: ${JEMALLOC_LIBRARY}") + message( STATUS "Found JEMALLOC headers: ${JEMALLOC_INCLUDE_DIR}") + else ( JEMALLOC_INCLUDE_DIR ) + message(FATAL_ERROR "Could not find jemalloc headers! Please install jemalloc libraries and headers") + endif ( JEMALLOC_INCLUDE_DIR ) + endif ( JEMALLOC_LIBRARY ) + add_library(jemalloc SHARED IMPORTED) + set_target_properties(jemalloc PROPERTIES + INTERFACE_INCLUDE_DIRECTORIES "${JEMALLOC_INCLUDE_DUR}" + IMPORTED_LOCATION "${JEMALLOC_LIBRARY}") + mark_as_advanced( JEMALLOC_FOUND JEMALLOC_LIBRARY JEMALLOC_EXTRA_LIBRARIES JEMALLOC_INCLUDE_DIR ) +endif (UNIX) From e9d1a61053f7562c6abf0b1c6d26efcaa8727f90 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Fri, 22 May 2020 10:22:27 -0400 Subject: [PATCH 11/40] don't recover from sanitizer errors --- CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 5a52d6776..9e3c3d25a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -115,9 +115,9 @@ if (NOT CMAKE_SYSTEM_NAME MATCHES "Linux" AND SHADOW) endif() if(XSAN) - string(APPEND CMAKE_CXX_FLAGS_DEBUG " -fsanitize=${XSAN} -fno-omit-frame-pointer") + string(APPEND CMAKE_CXX_FLAGS_DEBUG " -fsanitize=${XSAN} -fno-omit-frame-pointer -fno-sanitize-recover") foreach(type EXE MODULE SHARED STATIC) - string(APPEND CMAKE_${type}_LINKER_FLAGS_DEBUG " -fsanitize=${XSAN} -fno-omit-frame-pointer") + string(APPEND CMAKE_${type}_LINKER_FLAGS_DEBUG " -fsanitize=${XSAN} -fno-omit-frame-pointer -fno-sanitize-recover") endforeach() message(STATUS "Doing a ${XSAN} sanitizer build") endif() From b8d262573afac2d1d98b451630cab5e35b9011aa Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Fri, 22 May 2020 12:06:34 -0400 Subject: [PATCH 12/40] use std::vector instead of thrashing heap with allocation --- llarp/ev/ev_libuv.cpp | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/llarp/ev/ev_libuv.cpp b/llarp/ev/ev_libuv.cpp index a674e2627..27d7c7744 100644 --- a/llarp/ev/ev_libuv.cpp +++ b/llarp/ev/ev_libuv.cpp @@ -397,6 +397,7 @@ namespace libuv uv_check_t m_Ticker; llarp_udp_io* const m_UDP; llarp::SockAddr m_Addr; + std::vector m_Buffer; udp_glue(uv_loop_t* loop, llarp_udp_io* udp, const llarp::SockAddr& src) : m_UDP(udp), m_Addr(src) @@ -408,11 +409,13 @@ namespace libuv } static void - Alloc(uv_handle_t*, size_t suggested_size, uv_buf_t* buf) + Alloc(uv_handle_t*h, size_t suggested_size, uv_buf_t* buf) { - const size_t sz = std::min(suggested_size, size_t{1500}); - buf->base = new char[sz]; - buf->len = sz; + udp_glue * self = static_cast(h->data); + if(self->m_Buffer.empty()) + self->m_Buffer.resize(suggested_size); + buf->base = self->m_Buffer.data(); + buf->len = self->m_Buffer.size(); } /// callback for libuv @@ -422,7 +425,6 @@ namespace libuv udp_glue* glue = static_cast(handle->data); if (addr) glue->RecvFrom(nread, buf, llarp::SockAddr(*addr)); - delete[] buf->base; } void From b8da447053653facb53c6d8aa4f05e82a400d37c Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Fri, 22 May 2020 13:10:42 -0400 Subject: [PATCH 13/40] use const iterators explicitly --- llarp/util/decaying_hashset.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/llarp/util/decaying_hashset.hpp b/llarp/util/decaying_hashset.hpp index f74b3b22f..4b5e902c8 100644 --- a/llarp/util/decaying_hashset.hpp +++ b/llarp/util/decaying_hashset.hpp @@ -40,8 +40,8 @@ namespace llarp if (now == 0s) now = llarp::time_now_ms(); - auto itr = m_Values.begin(); - while (itr != m_Values.end()) + auto itr = m_Values.cbegin(); + while (itr != m_Values.cend()) { if ((m_CacheInterval + itr->second) <= now) itr = m_Values.erase(itr); From 1230b815171c2c38b68b8e7e3d1f799a4edb6fd5 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Fri, 22 May 2020 13:13:58 -0400 Subject: [PATCH 14/40] make format --- llarp/ev/ev_libuv.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/llarp/ev/ev_libuv.cpp b/llarp/ev/ev_libuv.cpp index 27d7c7744..9e45e84ed 100644 --- a/llarp/ev/ev_libuv.cpp +++ b/llarp/ev/ev_libuv.cpp @@ -409,10 +409,10 @@ namespace libuv } static void - Alloc(uv_handle_t*h, size_t suggested_size, uv_buf_t* buf) + Alloc(uv_handle_t* h, size_t suggested_size, uv_buf_t* buf) { - udp_glue * self = static_cast(h->data); - if(self->m_Buffer.empty()) + udp_glue* self = static_cast(h->data); + if (self->m_Buffer.empty()) self->m_Buffer.resize(suggested_size); buf->base = self->m_Buffer.data(); buf->len = self->m_Buffer.size(); From 39f8f17b5357e85d855e319c57ca14bd5fdae1d9 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Fri, 22 May 2020 14:00:20 -0400 Subject: [PATCH 15/40] dont set callback if non provided --- llarp/router/router.cpp | 5 ----- 1 file changed, 5 deletions(-) diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index a60bbe1a4..0a55904c1 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -174,11 +174,6 @@ namespace llarp bool Router::SendToOrQueue(const RouterID& remote, const ILinkMessage* msg, SendStatusHandler handler) { - if (handler == nullptr) - { - using std::placeholders::_1; - handler = std::bind(&Router::MessageSent, this, remote, _1); - } return _outboundMessageHandler.QueueMessage(remote, msg, handler); } From eb0abbf1fffcc18741680bcacfcf68f75cbc8cc2 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Fri, 22 May 2020 15:47:42 -0400 Subject: [PATCH 16/40] add eraseif to decaying hashset --- llarp/path/path_context.cpp | 8 ++++++-- llarp/util/aligned.hpp | 8 +++----- llarp/util/decaying_hashset.hpp | 27 ++++++++++++++++++--------- 3 files changed, 27 insertions(+), 16 deletions(-) diff --git a/llarp/path/path_context.cpp b/llarp/path/path_context.cpp index 8ea1cb1ef..b3ad17cfe 100644 --- a/llarp/path/path_context.cpp +++ b/llarp/path/path_context.cpp @@ -308,14 +308,16 @@ namespace llarp auto itr = map.begin(); while (itr != map.end()) { - itr->second->DecayFilters(now); if (itr->second->Expired(now)) { m_Router->outboundMessageHandler().QueueRemoveEmptyPath(itr->first); itr = map.erase(itr); } else + { + itr->second->DecayFilters(now); ++itr; + } } } { @@ -324,13 +326,15 @@ namespace llarp auto itr = map.begin(); while (itr != map.end()) { - itr->second->DecayFilters(now); if (itr->second->Expired(now)) { itr = map.erase(itr); } else + { + itr->second->DecayFilters(now); ++itr; + } } } } diff --git a/llarp/util/aligned.hpp b/llarp/util/aligned.hpp index b9f0fe0c1..848dcbdcb 100644 --- a/llarp/util/aligned.hpp +++ b/llarp/util/aligned.hpp @@ -270,12 +270,10 @@ namespace llarp struct Hash { - size_t - operator()(const AlignedBuffer& buf) const + std::size_t + operator()(const AlignedBuffer& buf) const noexcept { - size_t hash; - std::memcpy(&hash, buf.data(), sizeof(hash)); - return hash; + return *reinterpret_cast(buf.data()); } }; diff --git a/llarp/util/decaying_hashset.hpp b/llarp/util/decaying_hashset.hpp index 4b5e902c8..16a566f0c 100644 --- a/llarp/util/decaying_hashset.hpp +++ b/llarp/util/decaying_hashset.hpp @@ -39,15 +39,7 @@ namespace llarp { if (now == 0s) now = llarp::time_now_ms(); - - auto itr = m_Values.cbegin(); - while (itr != m_Values.cend()) - { - if ((m_CacheInterval + itr->second) <= now) - itr = m_Values.erase(itr); - else - ++itr; - } + EraseIf([&](const auto& item) { return (m_CacheInterval + item.second) <= now; }); } Time_t @@ -69,6 +61,23 @@ namespace llarp } private: + template + void + EraseIf(Predicate_t pred) + { + for (auto i = m_Values.begin(), last = m_Values.end(); i != last;) + { + if (pred(*i)) + { + i = m_Values.erase(i); + } + else + { + ++i; + } + } + } + Time_t m_CacheInterval; std::unordered_map m_Values; }; From 4a378ae9349489740dedd5e86156a1ed5a060a6b Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Sat, 23 May 2020 16:04:43 -0400 Subject: [PATCH 17/40] remove logic thread from logic as it is now a thin wrapper arround the event loop --- llarp/util/thread/logic.cpp | 66 ++++++++----------------------------- llarp/util/thread/logic.hpp | 25 +++++++------- 2 files changed, 25 insertions(+), 66 deletions(-) diff --git a/llarp/util/thread/logic.cpp b/llarp/util/thread/logic.cpp index 3d8389363..2021d45ea 100644 --- a/llarp/util/thread/logic.cpp +++ b/llarp/util/thread/logic.cpp @@ -6,92 +6,54 @@ namespace llarp { - Logic::Logic(size_t sz) : m_Thread(llarp_init_threadpool(1, "llarp-logic", sz)) + Logic::Logic(size_t) { - llarp_threadpool_start(m_Thread); - /// set thread id - std::promise result; - // queue setting id and try to get the result back - llarp_threadpool_queue_job(m_Thread, [&]() { - m_ID = std::this_thread::get_id(); - result.set_value(*m_ID); - }); - // get the result back - ID_t spawned = result.get_future().get(); - LogDebug("logic thread spawned on ", spawned); } Logic::~Logic() { - delete m_Thread; } size_t Logic::numPendingJobs() const { - return m_Thread->pendingJobs(); + return 0; } bool Logic::queue_job(struct llarp_thread_job job) { - return job.user && job.work && LogicCall(this, std::bind(job.work, job.user)); + if (job.user && job.work) + { + LogicCall(this, std::bind(job.work, job.user)); + return true; + } + return false; } void Logic::stop() { llarp::LogDebug("logic thread stop"); - // stop all operations on threadpool - llarp_threadpool_stop(m_Thread); } - bool - Logic::_traceLogicCall(std::function func, const char* tag, int line) + void + Logic::Call(std::function func) { - // wrap the function so that we ensure that it's always calling stuff one at - // a time - - auto f = [self = this, func]() { - if (self->m_Queue) - { - func(); - } - else - { - self->m_Killer.TryAccess(func); - } - }; if (can_flush()) { - f(); - return true; - } - if (m_Queue) - { - m_Queue(f); - return true; - } - if (m_Thread->LooksFull(5)) - { - LogErrorExplicit( - tag ? tag : LOG_TAG, - line ? line : __LINE__, - "holy crap, we are trying to queue a job " - "onto the logic thread but it looks full"); - std::abort(); + func(); } - auto ret = llarp_threadpool_queue_job(m_Thread, f); - if (not ret) + else { + m_Queue(std::move(func)); } - return ret; } void Logic::SetQueuer(std::function)> q) { - m_Queue = q; + m_Queue = std::move(q); m_Queue([self = this]() { self->m_ID = std::this_thread::get_id(); }); } diff --git a/llarp/util/thread/logic.hpp b/llarp/util/thread/logic.hpp index d94f460d8..f548d5f8c 100644 --- a/llarp/util/thread/logic.hpp +++ b/llarp/util/thread/logic.hpp @@ -22,8 +22,8 @@ namespace llarp bool queue_job(struct llarp_thread_job job); - bool - _traceLogicCall(std::function func, const char* filename, int lineo); + void + Call(std::function func); uint32_t call_later(llarp_time_t later, std::function func); @@ -51,7 +51,6 @@ namespace llarp private: using ID_t = std::thread::id; - llarp_threadpool* const m_Thread; llarp_ev_loop* m_Loop = nullptr; std::optional m_ID; util::ContentionKiller m_Killer; @@ -59,15 +58,13 @@ namespace llarp }; } // namespace llarp -#ifndef LogicCall -#if defined(LOKINET_DEBUG) -#ifdef LOG_TAG -#define LogicCall(l, ...) l->_traceLogicCall(__VA_ARGS__, LOG_TAG, __LINE__) -#else -#define LogicCall(l, ...) l->_traceLogicCall(__VA_ARGS__, __FILE__, __LINE__) -#endif -#else -#define LogicCall(l, ...) l->_traceLogicCall(__VA_ARGS__, 0, 0) -#endif -#endif +/// this used to be a macro +template +static bool +LogicCall(const Logic_ptr& logic, Func_t func) +{ + logic->Call(std::move(func)); + return true; +} + #endif From 25a4bbd5ca4101c61e2a2d85a6f6c63fe50f9d55 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Sat, 23 May 2020 16:05:26 -0400 Subject: [PATCH 18/40] use std::swap --- llarp/path/path.cpp | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/llarp/path/path.cpp b/llarp/path/path.cpp index 5bddc9566..96470ab7f 100644 --- a/llarp/path/path.cpp +++ b/llarp/path/path.cpp @@ -439,31 +439,33 @@ namespace llarp msg.pathid = TXID(); ++idx; } - LogicCall( - r->logic(), - std::bind(&Path::HandleAllUpstream, shared_from_this(), std::move(sendmsgs), r)); + LogicCall(r->logic(), [self = shared_from_this(), data = std::move(sendmsgs), r]() { + self->HandleAllUpstream(std::move(data), r); + }); } void Path::FlushUpstream(AbstractRouter* r) { - if (m_UpstreamQueue && !m_UpstreamQueue->empty()) + if (m_UpstreamQueue && not m_UpstreamQueue->empty()) { + TrafficQueue_ptr data = nullptr; + std::swap(m_UpstreamQueue, data); r->threadpool()->addJob( - std::bind(&Path::UpstreamWork, shared_from_this(), std::move(m_UpstreamQueue), r)); + [self = shared_from_this(), data, r]() { self->UpstreamWork(std::move(data), r); }); } - m_UpstreamQueue = nullptr; } void Path::FlushDownstream(AbstractRouter* r) { - if (m_DownstreamQueue && !m_DownstreamQueue->empty()) + if (m_DownstreamQueue && not m_DownstreamQueue->empty()) { + TrafficQueue_ptr data = nullptr; + std::swap(m_DownstreamQueue, data); r->threadpool()->addJob( - std::bind(&Path::DownstreamWork, shared_from_this(), std::move(m_DownstreamQueue), r)); + [self = shared_from_this(), data, r]() { self->DownstreamWork(std::move(data), r); }); } - m_DownstreamQueue = nullptr; } bool @@ -507,9 +509,9 @@ namespace llarp sendMsgs[idx].X = buf; ++idx; } - LogicCall( - r->logic(), - std::bind(&Path::HandleAllDownstream, shared_from_this(), std::move(sendMsgs), r)); + LogicCall(r->logic(), [self = shared_from_this(), msgs = std::move(sendMsgs), r]() { + self->HandleAllDownstream(std::move(msgs), r); + }); } void From 7a5c193e4fa4aac1a51684d07dfade5250eb910b Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Sat, 23 May 2020 16:55:02 -0400 Subject: [PATCH 19/40] remove unused member --- llarp/util/thread/logic.hpp | 1 - 1 file changed, 1 deletion(-) diff --git a/llarp/util/thread/logic.hpp b/llarp/util/thread/logic.hpp index f548d5f8c..d65a889e6 100644 --- a/llarp/util/thread/logic.hpp +++ b/llarp/util/thread/logic.hpp @@ -53,7 +53,6 @@ namespace llarp using ID_t = std::thread::id; llarp_ev_loop* m_Loop = nullptr; std::optional m_ID; - util::ContentionKiller m_Killer; std::function)> m_Queue; }; } // namespace llarp From 4ad8ae253d97a35c61213cd4970c43ea1a76ef1f Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Sat, 23 May 2020 17:06:04 -0400 Subject: [PATCH 20/40] align to either 64bit or max_align_t which ever is smaller, for 32 bit. --- llarp/util/aligned.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/llarp/util/aligned.hpp b/llarp/util/aligned.hpp index 848dcbdcb..8ba6d6cad 100644 --- a/llarp/util/aligned.hpp +++ b/llarp/util/aligned.hpp @@ -28,7 +28,7 @@ namespace llarp { /// aligned buffer that is sz bytes long and aligns to the nearest Alignment template - struct alignas(uint64_t) AlignedBuffer + struct alignas(std::min(alignof(uint64_t), alignof(std::max_align_t))) AlignedBuffer { static_assert( sz >= 8, From f0eca908a43327fd3bf16cd96a1eac537fe37fc8 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Sat, 23 May 2020 17:18:00 -0400 Subject: [PATCH 21/40] use static_assert instead --- llarp/util/aligned.hpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/llarp/util/aligned.hpp b/llarp/util/aligned.hpp index 8ba6d6cad..6aa5ca708 100644 --- a/llarp/util/aligned.hpp +++ b/llarp/util/aligned.hpp @@ -28,8 +28,9 @@ namespace llarp { /// aligned buffer that is sz bytes long and aligns to the nearest Alignment template - struct alignas(std::min(alignof(uint64_t), alignof(std::max_align_t))) AlignedBuffer + struct alignas(std::max_align_t) AlignedBuffer { + static_assert(alignof(std::max_align_t) <= 16, "insane alignment"); static_assert( sz >= 8, "AlignedBuffer cannot be used with buffers smaller than 8 " From 403bc744933c64613ce266cf02cfd517d034de24 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Sun, 24 May 2020 07:06:27 -0400 Subject: [PATCH 22/40] unbreak unit tests --- llarp/util/aligned.hpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/llarp/util/aligned.hpp b/llarp/util/aligned.hpp index 6aa5ca708..5e866396b 100644 --- a/llarp/util/aligned.hpp +++ b/llarp/util/aligned.hpp @@ -274,7 +274,9 @@ namespace llarp std::size_t operator()(const AlignedBuffer& buf) const noexcept { - return *reinterpret_cast(buf.data()); + std::size_t h = 0; + std::memcpy(&h, buf.data(), sizeof(std::size_t)); + return h; } }; From 149a01c80f501231cc5354fd61a439b9c295e515 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Sun, 24 May 2020 08:07:37 -0400 Subject: [PATCH 23/40] dont use std::bind, use lambda --- llarp/iwp/session.cpp | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/llarp/iwp/session.cpp b/llarp/iwp/session.cpp index fa83673ec..2253fdbca 100644 --- a/llarp/iwp/session.cpp +++ b/llarp/iwp/session.cpp @@ -612,15 +612,17 @@ namespace llarp recvMsgs->emplace_back(std::move(pkt)); } LogDebug("decrypted ", recvMsgs->size(), " packets from ", m_RemoteAddr); - LogicCall( - m_Parent->logic(), std::bind(&Session::HandlePlaintext, shared_from_this(), recvMsgs)); + LogicCall(m_Parent->logic(), [self = shared_from_this(), msgs = recvMsgs] { + self->HandlePlaintext(std::move(msgs)); + }); } void Session::HandlePlaintext(CryptoQueue_ptr msgs) { - for (auto& result : *msgs) + for (auto itr = msgs->begin(), end = msgs->end(); itr != end;) { + Packet_t result{std::move(*itr)}; LogDebug("Command ", int(result[PacketOverhead + 1])); switch (result[PacketOverhead + 1]) { @@ -648,6 +650,7 @@ namespace llarp default: LogError("invalid command ", int(result[PacketOverhead + 1]), " from ", m_RemoteAddr); } + itr = msgs->erase(itr); } SendMACK(); Pump(); From 44ff3a99280bf1ade22b23ebec65ac8ae95b0e30 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Sun, 24 May 2020 08:14:08 -0400 Subject: [PATCH 24/40] copy assignment so we don't crash --- llarp/iwp/session.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/llarp/iwp/session.cpp b/llarp/iwp/session.cpp index 2253fdbca..1b92fad3e 100644 --- a/llarp/iwp/session.cpp +++ b/llarp/iwp/session.cpp @@ -622,7 +622,7 @@ namespace llarp { for (auto itr = msgs->begin(), end = msgs->end(); itr != end;) { - Packet_t result{std::move(*itr)}; + Packet_t result = *itr; LogDebug("Command ", int(result[PacketOverhead + 1])); switch (result[PacketOverhead + 1]) { From 96cbab33c323f8dccd6d091b9c163963089e427d Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Sun, 24 May 2020 08:15:36 -0400 Subject: [PATCH 25/40] style: put iterator erase in for loop update --- llarp/iwp/session.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/llarp/iwp/session.cpp b/llarp/iwp/session.cpp index 1b92fad3e..08b9f1315 100644 --- a/llarp/iwp/session.cpp +++ b/llarp/iwp/session.cpp @@ -620,7 +620,7 @@ namespace llarp void Session::HandlePlaintext(CryptoQueue_ptr msgs) { - for (auto itr = msgs->begin(), end = msgs->end(); itr != end;) + for (auto itr = msgs->begin(), end = msgs->end(); itr != end; itr = msgs->erase(itr)) { Packet_t result = *itr; LogDebug("Command ", int(result[PacketOverhead + 1])); @@ -650,7 +650,6 @@ namespace llarp default: LogError("invalid command ", int(result[PacketOverhead + 1]), " from ", m_RemoteAddr); } - itr = msgs->erase(itr); } SendMACK(); Pump(); From 9298313066ce0c43682095f8154912df16c7f7e3 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Sun, 24 May 2020 08:21:04 -0400 Subject: [PATCH 26/40] use std::move --- llarp/ev/ev_libuv.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/llarp/ev/ev_libuv.cpp b/llarp/ev/ev_libuv.cpp index 9e45e84ed..26dd21983 100644 --- a/llarp/ev/ev_libuv.cpp +++ b/llarp/ev/ev_libuv.cpp @@ -737,7 +737,7 @@ namespace libuv { while (not m_LogicCalls.empty()) { - auto f = m_LogicCalls.popFront(); + auto f = std::move(m_LogicCalls.popFront()); f(); } } From db00d080f6c21963eaaf2a88108c1aba3e629892 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Sun, 24 May 2020 08:36:46 -0400 Subject: [PATCH 27/40] use std::list instead of std::vector because idfk man --- llarp/iwp/session.cpp | 2 +- llarp/iwp/session.hpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/llarp/iwp/session.cpp b/llarp/iwp/session.cpp index 08b9f1315..a96d573f6 100644 --- a/llarp/iwp/session.cpp +++ b/llarp/iwp/session.cpp @@ -620,7 +620,7 @@ namespace llarp void Session::HandlePlaintext(CryptoQueue_ptr msgs) { - for (auto itr = msgs->begin(), end = msgs->end(); itr != end; itr = msgs->erase(itr)) + for (auto itr = msgs->begin(), end = msgs->end(); itr != end; itr++) { Packet_t result = *itr; LogDebug("Command ", int(result[PacketOverhead + 1])); diff --git a/llarp/iwp/session.hpp b/llarp/iwp/session.hpp index a4417cc11..fee4a2be0 100644 --- a/llarp/iwp/session.hpp +++ b/llarp/iwp/session.hpp @@ -204,7 +204,7 @@ namespace llarp /// set of rx messages to send in next round of multiacks std::unordered_set m_SendMACKs; - using CryptoQueue_t = std::vector; + using CryptoQueue_t = std::list; using CryptoQueue_ptr = std::shared_ptr; CryptoQueue_ptr m_EncryptNext; CryptoQueue_ptr m_DecryptNext; From f25e9bb01fc7a3a0693cb5f6d6857e6ca312c27c Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Sun, 24 May 2020 08:51:17 -0400 Subject: [PATCH 28/40] use std::list here too --- llarp/path/ihophandler.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/llarp/path/ihophandler.hpp b/llarp/path/ihophandler.hpp index ac76d93c9..514b5eac6 100644 --- a/llarp/path/ihophandler.hpp +++ b/llarp/path/ihophandler.hpp @@ -26,7 +26,7 @@ namespace llarp struct IHopHandler { using TrafficEvent_t = std::pair, TunnelNonce>; - using TrafficQueue_t = std::vector; + using TrafficQueue_t = std::list; using TrafficQueue_ptr = std::shared_ptr; virtual ~IHopHandler() = default; From b572e7a7cd65079c878b1abb9e8462054868edad Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Mon, 25 May 2020 09:50:55 -0400 Subject: [PATCH 29/40] add lokinet-exit.py script --- contrib/py/admin/lokinet-exit.py | 79 ++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100755 contrib/py/admin/lokinet-exit.py diff --git a/contrib/py/admin/lokinet-exit.py b/contrib/py/admin/lokinet-exit.py new file mode 100755 index 000000000..06e15f29c --- /dev/null +++ b/contrib/py/admin/lokinet-exit.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 +import argparse +import sys + +from socket import AF_INET + +import requests +from pyroute2 import IPRoute + +class LokinetRPC: + + def __init__(self, url): + self._url = url + + def _jsonrpc(self, method, params={}): + r = requests.post( + self._url, + headers={"Content-Type": "application/json", "Host": "localhost"}, + json={ + "jsonrpc": "2.0", + "id": "0", + "method": "{}".format(method), + "params": params, + }, + ) + return r.json() + + def get_first_hops(self): + data = self._jsonrpc("llarp.admin.dumpstate") + for link in data['result']['links']['outbound']: + for session in link["sessions"]['established']: + yield session['remoteAddr'] + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--rpc", type=str, default='127.0.0.1:1190') + ap.add_argument("--ifname", type=str, default="lokitun0") + ap.add_argument("--up", action='store_const', dest='action', const='up') + ap.add_argument("--down", action='store_const', dest='action', const='down') + args = ap.parse_args() + rpc = LokinetRPC('http://{}/jsonrpc'.format(args.rpc)) + hops = dict() + for hop in rpc.get_first_hops(): + ip = hop.split(':')[0] + hops[ip] = 0 + + with IPRoute() as ip: + ip.bind() + idx = ip.link_lookup(ifname=args.ifname)[0] + gateways = ip.get_default_routes(family=AF_INET) + gateway = None + for g in gateways: + useThisGateway = True + for name, val in g['attrs']: + if name == 'RTA_OIF' and val == idx: + useThisGateway = False + if not useThisGateway: + continue + for name, val in g['attrs']: + if name == 'RTA_GATEWAY': + gateway = val + if gateway: + for address in hops: + try: + if args.action == 'up': + ip.route("add", dst="{}/32".format(address), gateway=gateway) + elif args.action == 'down': + ip.route("del", dst="{}/32".format(address), gateway=gateway) + except: + pass + if args.action == 'up': + ip.route('add', dst='0.0.0.0/0', oif=idx) + elif args.action == 'down': + ip.route('del', dst='0.0.0.0/0', oif=idx) + + +if __name__ == '__main__': + main() From 14b134c42d6bf5aeaa9df5bf2a3e943c1b9a06d1 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 26 May 2020 10:06:38 -0400 Subject: [PATCH 30/40] when we get an explicit config path respect it and the base dir --- daemon/main.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/daemon/main.cpp b/daemon/main.cpp index 84db722df..b1f885d7e 100644 --- a/daemon/main.cpp +++ b/daemon/main.cpp @@ -222,8 +222,7 @@ main(int argc, char* argv[]) if (genconfigOnly) { - llarp::ensureConfig( - llarp::GetDefaultDataDir(), llarp::GetDefaultConfigPath(), overwrite, opts.isRelay); + llarp::ensureConfig(basedir, fname, overwrite, opts.isRelay); } else { From dd87cd396be8474a2e412abad003096929ea6c0d Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 26 May 2020 14:01:33 -0400 Subject: [PATCH 31/40] pedantic change to retrigger ci --- readme.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/readme.md b/readme.md index d388ed463..b8252fdd3 100644 --- a/readme.md +++ b/readme.md @@ -12,6 +12,8 @@ You can view documentation on how to get started [here](https://loki-project.git [![Build Status](https://drone.lokinet.dev/api/badges/loki-project/loki-network/status.svg?ref=refs/heads/master)](https://drone.lokinet.dev/loki-project/loki-network) +You can find Bleeding edge builds [here](https://builds.lokinet.dev) (these builds may eat your first born). + ## Usage See the [documentation](https://loki-project.github.io/loki-docs/Lokinet/LokinetOverview/) on how to get started. From c47a210302f33cdbe38ac9b96ebdeb07defc27ae Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 26 May 2020 14:04:43 -0400 Subject: [PATCH 32/40] remove a std::move that prevents copy elision --- llarp/ev/ev_libuv.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/llarp/ev/ev_libuv.cpp b/llarp/ev/ev_libuv.cpp index 26dd21983..9e45e84ed 100644 --- a/llarp/ev/ev_libuv.cpp +++ b/llarp/ev/ev_libuv.cpp @@ -737,7 +737,7 @@ namespace libuv { while (not m_LogicCalls.empty()) { - auto f = std::move(m_LogicCalls.popFront()); + auto f = m_LogicCalls.popFront(); f(); } } From 242ab3caba3b74f18a54439e760d967f5c933ea3 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Fri, 29 May 2020 12:31:57 -0400 Subject: [PATCH 33/40] rename lokinet-exit too to lokinet-vpn install lokinet-vpn tool for use with debian packaging --- daemon/CMakeLists.txt | 3 + .../lokinet-exit.py => daemon/lokinet-vpn | 29 ++- daemon/lokinetmon.cpp | 232 ++++++++++++++++++ 3 files changed, 259 insertions(+), 5 deletions(-) rename contrib/py/admin/lokinet-exit.py => daemon/lokinet-vpn (75%) create mode 100644 daemon/lokinetmon.cpp diff --git a/daemon/CMakeLists.txt b/daemon/CMakeLists.txt index 0f3324360..cf29f2650 100644 --- a/daemon/CMakeLists.txt +++ b/daemon/CMakeLists.txt @@ -43,4 +43,7 @@ else() target_link_libraries(lokinetctl PRIVATE ${CURL_LIBRARIES}) endif(CURL_FOUND) + + install(PROGRAMS lokinet-vpn DESTINATION bin COMPONENT lokinet) + endif() diff --git a/contrib/py/admin/lokinet-exit.py b/daemon/lokinet-vpn similarity index 75% rename from contrib/py/admin/lokinet-exit.py rename to daemon/lokinet-vpn index 06e15f29c..1a7ac4e17 100755 --- a/contrib/py/admin/lokinet-exit.py +++ b/daemon/lokinet-vpn @@ -44,10 +44,16 @@ def main(): for hop in rpc.get_first_hops(): ip = hop.split(':')[0] hops[ip] = 0 - + if len(hops) == 0: + print("lokinet is not connected yet") + return 1 with IPRoute() as ip: ip.bind() - idx = ip.link_lookup(ifname=args.ifname)[0] + try: + idx = ip.link_lookup(ifname=args.ifname)[0] + except: + print("cannot find {}".format(args.ifname)) + return 1 gateways = ip.get_default_routes(family=AF_INET) gateway = None for g in gateways: @@ -69,11 +75,24 @@ def main(): ip.route("del", dst="{}/32".format(address), gateway=gateway) except: pass + if args.action == 'up': - ip.route('add', dst='0.0.0.0/0', oif=idx) + try: + ip.route('add', dst='0.0.0.0/0', oif=idx) + except: + print('failed to add default route') + return 1 elif args.action == 'down': - ip.route('del', dst='0.0.0.0/0', oif=idx) + try: + ip.route('del', dst='0.0.0.0/0', oif=idx) + except: + print('failed to remove default route') + return 1 + else: + print("could not find gateway") + return 1 + return 0 if __name__ == '__main__': - main() + sys.exit(main()) diff --git a/daemon/lokinetmon.cpp b/daemon/lokinetmon.cpp new file mode 100644 index 000000000..232c60ae7 --- /dev/null +++ b/daemon/lokinetmon.cpp @@ -0,0 +1,232 @@ +#include +#include +#include +#include +#include +#include +#include + +using namespace std::chrono_literals; + +std::vector +get_args(int argc, char** argv) +{ + std::vector args; + for (int arg = 0; arg < argc; arg++) + { + args.emplace_back(argv[arg]); + } + return args; +} + +std::promise exit_promise; + +void +HandleSignal(int) +{ + exit_promise.set_value(true); +} + +struct Stats +{ + std::list m_Data; + std::string version; + template + static auto + speedof(const T& rate) + { + const std::array rates{"b", "Kb", "Mb", "Gb"}; + size_t idx = 0; + uint64_t rate_int = 0; + rate.get_to(rate_int); + double rate_float = rate_int * 8; + while (rate_float > 1024.0 and idx < rates.size()) + { + rate_float /= 1024.0; + idx++; + } + char buf[64] = {0}; + std::snprintf(buf, sizeof(buf), "%.2f%sps", rate_float, rates[idx]); + return std::string(buf); + } + + template + void + ForEachLink(Visit_t visit) const + { + const auto& back = m_Data.back(); + const auto& links = back.at("links"); + { + const auto& l = links.at("outbound"); + for (auto itr = l.begin(); itr != l.end(); ++itr) + { + visit(*itr); + } + } + { + const auto& l = links.at("inbound"); + for (auto itr = l.begin(); itr != l.end(); ++itr) + { + visit(*itr); + } + } + } + + template + void + ShowLinkSession(const T& session, WINDOW* win) const + { + std::stringstream ss; + std::string addr; + session["remoteAddr"].get_to(addr); + ss << addr << "\t"; + ss << "[" << speedof(session["txRateCurrent"]) << "\ttx]\t"; + ss << "[" << speedof(session["rxRateCurrent"]) << "\trx]"; + const auto str = ss.str(); + waddstr(win, str.c_str()); + } + + void + DisplayLinks(WINDOW* win) const + { + int y = 3; + ForEachLink([&](const auto& link) { + const auto& sessions = link.at("sessions"); + const auto& established = sessions.at("established"); + for (auto itr = established.begin(); itr != established.end(); ++itr) + { + wmove(win, y++, 1); + ShowLinkSession(*itr, win); + } + }); + } + + void + DisplayServices(WINDOW* win) const + { + (void)win; + } + + void + Update(WINDOW* win) const + { + wmove(win, 1, 1); + waddstr(win, version.c_str()); + DisplayLinks(win); + DisplayServices(win); + } + + void + AddSample(nlohmann::json data) + { + while (m_Data.size() > 64) + { + m_Data.pop_front(); + } + m_Data.emplace_back(std::move(data)); + RecalcStats(); + } + + void + RecalcStats() + { + } +}; + +void +UpdateUI( + std::shared_ptr lmq, + lokimq::ConnectionID id, + std::shared_ptr stats, + WINDOW* win) +{ + lmq->request(id, "llarp.status", [win, stats](bool success, std::vector data) { + wmove(win, 2, 1); + if (not success) + { + waddstr(win, "request failed"); + wrefresh(win); + return; + } + if (data.empty()) + { + waddstr(win, "no data"); + wrefresh(win); + return; + } + try + { + const auto j = nlohmann::json::parse(data.at(0)); + stats->AddSample(j); + } + catch (std::exception&) + { + } + wclear(win); + stats->Update(win); + wrefresh(win); + }); +} + +void +resizeHandler(int) +{ +} + +int +main(int argc, char* argv[]) +{ + std::string connect_str = "tcp://127.0.0.1:1190/"; + const auto args = get_args(argc, argv); + if (args.size() == 2) + { + connect_str = args[1]; + } + signal(SIGINT, HandleSignal); + signal(SIGTERM, HandleSignal); + signal(SIGWINCH, resizeHandler); + auto stats = std::make_shared(); + auto lmq = std::make_shared(); + lmq->start(); + lmq->connect_remote( + connect_str, + [lmq, stats](lokimq::ConnectionID id) { + auto screen = ::initscr(); + if (screen == nullptr) + { + std::cout << "failed to init screen" << std::endl; + exit_promise.set_value(false); + return; + } + ::cbreak(); + ::noecho(); + std::stringstream ss; + ss << "connected via " << id; + const auto str = ss.str(); + waddstr(stdscr, str.c_str()); + wrefresh(stdscr); + lmq->request(id, "llarp.version", [stats](bool success, std::vector data) { + wmove(stdscr, 1, 1); + + if (success and not data.empty()) + { + stats->version = std::move(data[0]); + } + else + { + waddstr(stdscr, "failed to get version"); + } + wrefresh(stdscr); + }); + lmq->add_timer([lmq, stats, id]() { UpdateUI(lmq, id, stats, stdscr); }, 1s); + }, + [](lokimq::ConnectionID, std::string_view fail) { + std::cout << "failed to start lokinetmon: " << fail << std::endl; + exit_promise.set_value(false); + }); + auto ftr = exit_promise.get_future(); + if (ftr.get()) + return ::endwin(); + else + return 0; +} From 18b2ef3d1e756140972908e589eed5e7ef0adbea Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Fri, 29 May 2020 12:33:16 -0400 Subject: [PATCH 34/40] remove unneeded file --- daemon/lokinetmon.cpp | 232 ------------------------------------------ 1 file changed, 232 deletions(-) delete mode 100644 daemon/lokinetmon.cpp diff --git a/daemon/lokinetmon.cpp b/daemon/lokinetmon.cpp deleted file mode 100644 index 232c60ae7..000000000 --- a/daemon/lokinetmon.cpp +++ /dev/null @@ -1,232 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include - -using namespace std::chrono_literals; - -std::vector -get_args(int argc, char** argv) -{ - std::vector args; - for (int arg = 0; arg < argc; arg++) - { - args.emplace_back(argv[arg]); - } - return args; -} - -std::promise exit_promise; - -void -HandleSignal(int) -{ - exit_promise.set_value(true); -} - -struct Stats -{ - std::list m_Data; - std::string version; - template - static auto - speedof(const T& rate) - { - const std::array rates{"b", "Kb", "Mb", "Gb"}; - size_t idx = 0; - uint64_t rate_int = 0; - rate.get_to(rate_int); - double rate_float = rate_int * 8; - while (rate_float > 1024.0 and idx < rates.size()) - { - rate_float /= 1024.0; - idx++; - } - char buf[64] = {0}; - std::snprintf(buf, sizeof(buf), "%.2f%sps", rate_float, rates[idx]); - return std::string(buf); - } - - template - void - ForEachLink(Visit_t visit) const - { - const auto& back = m_Data.back(); - const auto& links = back.at("links"); - { - const auto& l = links.at("outbound"); - for (auto itr = l.begin(); itr != l.end(); ++itr) - { - visit(*itr); - } - } - { - const auto& l = links.at("inbound"); - for (auto itr = l.begin(); itr != l.end(); ++itr) - { - visit(*itr); - } - } - } - - template - void - ShowLinkSession(const T& session, WINDOW* win) const - { - std::stringstream ss; - std::string addr; - session["remoteAddr"].get_to(addr); - ss << addr << "\t"; - ss << "[" << speedof(session["txRateCurrent"]) << "\ttx]\t"; - ss << "[" << speedof(session["rxRateCurrent"]) << "\trx]"; - const auto str = ss.str(); - waddstr(win, str.c_str()); - } - - void - DisplayLinks(WINDOW* win) const - { - int y = 3; - ForEachLink([&](const auto& link) { - const auto& sessions = link.at("sessions"); - const auto& established = sessions.at("established"); - for (auto itr = established.begin(); itr != established.end(); ++itr) - { - wmove(win, y++, 1); - ShowLinkSession(*itr, win); - } - }); - } - - void - DisplayServices(WINDOW* win) const - { - (void)win; - } - - void - Update(WINDOW* win) const - { - wmove(win, 1, 1); - waddstr(win, version.c_str()); - DisplayLinks(win); - DisplayServices(win); - } - - void - AddSample(nlohmann::json data) - { - while (m_Data.size() > 64) - { - m_Data.pop_front(); - } - m_Data.emplace_back(std::move(data)); - RecalcStats(); - } - - void - RecalcStats() - { - } -}; - -void -UpdateUI( - std::shared_ptr lmq, - lokimq::ConnectionID id, - std::shared_ptr stats, - WINDOW* win) -{ - lmq->request(id, "llarp.status", [win, stats](bool success, std::vector data) { - wmove(win, 2, 1); - if (not success) - { - waddstr(win, "request failed"); - wrefresh(win); - return; - } - if (data.empty()) - { - waddstr(win, "no data"); - wrefresh(win); - return; - } - try - { - const auto j = nlohmann::json::parse(data.at(0)); - stats->AddSample(j); - } - catch (std::exception&) - { - } - wclear(win); - stats->Update(win); - wrefresh(win); - }); -} - -void -resizeHandler(int) -{ -} - -int -main(int argc, char* argv[]) -{ - std::string connect_str = "tcp://127.0.0.1:1190/"; - const auto args = get_args(argc, argv); - if (args.size() == 2) - { - connect_str = args[1]; - } - signal(SIGINT, HandleSignal); - signal(SIGTERM, HandleSignal); - signal(SIGWINCH, resizeHandler); - auto stats = std::make_shared(); - auto lmq = std::make_shared(); - lmq->start(); - lmq->connect_remote( - connect_str, - [lmq, stats](lokimq::ConnectionID id) { - auto screen = ::initscr(); - if (screen == nullptr) - { - std::cout << "failed to init screen" << std::endl; - exit_promise.set_value(false); - return; - } - ::cbreak(); - ::noecho(); - std::stringstream ss; - ss << "connected via " << id; - const auto str = ss.str(); - waddstr(stdscr, str.c_str()); - wrefresh(stdscr); - lmq->request(id, "llarp.version", [stats](bool success, std::vector data) { - wmove(stdscr, 1, 1); - - if (success and not data.empty()) - { - stats->version = std::move(data[0]); - } - else - { - waddstr(stdscr, "failed to get version"); - } - wrefresh(stdscr); - }); - lmq->add_timer([lmq, stats, id]() { UpdateUI(lmq, id, stats, stdscr); }, 1s); - }, - [](lokimq::ConnectionID, std::string_view fail) { - std::cout << "failed to start lokinetmon: " << fail << std::endl; - exit_promise.set_value(false); - }); - auto ftr = exit_promise.get_future(); - if (ftr.get()) - return ::endwin(); - else - return 0; -} From 8a003a11447666e51593dfa4c14408828d3643a5 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Fri, 29 May 2020 12:37:34 -0400 Subject: [PATCH 35/40] add lokinet-vpn systemd service file --- debian/lokinet.lokinet-vpn.service | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 debian/lokinet.lokinet-vpn.service diff --git a/debian/lokinet.lokinet-vpn.service b/debian/lokinet.lokinet-vpn.service new file mode 100644 index 000000000..dfd63ac57 --- /dev/null +++ b/debian/lokinet.lokinet-vpn.service @@ -0,0 +1,13 @@ +[Unit] +Description=LokiNET VPN tunnel: shove all traffic over lokinet +Wants=lokinet.service +After=lokinet.service + +[Service] +Type=oneshot +ExecStart=/usr/bin/lokinet-vpn --up +ExecStop=/usr/bin/lokinet-vpn --down +RemainAfterExit=true + +[Install] +WantedBy=multi-user.target \ No newline at end of file From acecb23eb3b6a5ce96bb865b228c8bebb94a8812 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Mon, 1 Jun 2020 09:17:44 -0400 Subject: [PATCH 36/40] make libuv event loop logic queue size configurable. remove logic constructor that is no-op. add constant for default logic queue size add constant for transit hop queue size --- llarp/constants/evloop.hpp | 7 +++++++ llarp/constants/path.hpp | 4 ++++ llarp/context.cpp | 12 +++++++----- llarp/ev/ev.cpp | 4 ++-- llarp/ev/ev.h | 5 ++++- llarp/ev/ev_libuv.cpp | 3 ++- llarp/ev/ev_libuv.hpp | 2 +- llarp/path/transit_hop.cpp | 28 ++++++++++++++++++---------- llarp/router/router.cpp | 3 +-- llarp/util/thread/logic.cpp | 14 -------------- llarp/util/thread/logic.hpp | 7 ------- readme.md | 2 -- 12 files changed, 46 insertions(+), 45 deletions(-) create mode 100644 llarp/constants/evloop.hpp diff --git a/llarp/constants/evloop.hpp b/llarp/constants/evloop.hpp new file mode 100644 index 000000000..4824f1b9c --- /dev/null +++ b/llarp/constants/evloop.hpp @@ -0,0 +1,7 @@ +#pragma once + +namespace llarp +{ + /// default queue length for logic jobs + constexpr std::size_t event_loop_queue_size = 1024; +} // namespace llarp diff --git a/llarp/constants/path.hpp b/llarp/constants/path.hpp index 92758507d..9d6da0865 100644 --- a/llarp/constants/path.hpp +++ b/llarp/constants/path.hpp @@ -34,6 +34,10 @@ namespace llarp constexpr auto latency_interval = 20s; /// if a path is inactive for this amount of time it's dead constexpr auto alive_timeout = latency_interval * 1.5; + + /// how big transit hop traffic queues are + constexpr std::size_t transit_hop_queue_size = 256; + } // namespace path } // namespace llarp diff --git a/llarp/context.cpp b/llarp/context.cpp index 2e5e07108..f25d41eea 100644 --- a/llarp/context.cpp +++ b/llarp/context.cpp @@ -45,10 +45,7 @@ namespace llarp if (threads <= 0) threads = 1; worker = std::make_shared(threads, 1024, "llarp-worker"); - auto jobQueueSize = config->router.m_JobQueueSize; - if (jobQueueSize < 1024) - jobQueueSize = 1024; - logic = std::make_shared(jobQueueSize); + logic = std::make_shared(); nodedb_dir = config->router.m_dataDir / nodedb_dirname; @@ -80,7 +77,12 @@ namespace llarp llarp::LogInfo(llarp::VERSION_FULL, " ", llarp::RELEASE_MOTTO); llarp::LogInfo("starting up"); if (mainloop == nullptr) - mainloop = llarp_make_ev_loop(); + { + auto jobQueueSize = config->router.m_JobQueueSize; + if (jobQueueSize < 1024) + jobQueueSize = 1024; + mainloop = llarp_make_ev_loop(jobQueueSize); + } logic->set_event_loop(mainloop.get()); mainloop->set_logic(logic); diff --git a/llarp/ev/ev.cpp b/llarp/ev/ev.cpp index 50b2ee6f2..4a08e64b2 100644 --- a/llarp/ev/ev.cpp +++ b/llarp/ev/ev.cpp @@ -15,9 +15,9 @@ #endif llarp_ev_loop_ptr -llarp_make_ev_loop() +llarp_make_ev_loop(size_t queueLength) { - llarp_ev_loop_ptr r = std::make_shared(); + llarp_ev_loop_ptr r = std::make_shared(queueLength); r->init(); r->update_time(); return r; diff --git a/llarp/ev/ev.h b/llarp/ev/ev.h index 98f5168ff..e1c7b9456 100644 --- a/llarp/ev/ev.h +++ b/llarp/ev/ev.h @@ -25,6 +25,8 @@ #include #endif +#include + /** * ev.h * @@ -47,8 +49,9 @@ using llarp_ev_loop_ptr = std::shared_ptr; /// make an event loop using our baked in event loop on Windows /// make an event loop using libuv otherwise. +/// @param queue_size how big the logic job queue is llarp_ev_loop_ptr -llarp_make_ev_loop(); +llarp_make_ev_loop(std::size_t queue_size = llarp::event_loop_queue_size); // run mainloop void diff --git a/llarp/ev/ev_libuv.cpp b/llarp/ev/ev_libuv.cpp index 9e45e84ed..5eacbf354 100644 --- a/llarp/ev/ev_libuv.cpp +++ b/llarp/ev/ev_libuv.cpp @@ -753,7 +753,8 @@ namespace libuv llarp::LogContext::Instance().logStream->Tick(loop->time_now()); } - Loop::Loop() : llarp_ev_loop(), m_LogicCalls(1024), m_timerQueue(20), m_timerCancelQueue(20) + Loop::Loop(size_t queueLength) + : llarp_ev_loop(), m_LogicCalls(queueLength), m_timerQueue(20), m_timerCancelQueue(20) { } diff --git a/llarp/ev/ev_libuv.hpp b/llarp/ev/ev_libuv.hpp index deb9afc38..ff016e48b 100644 --- a/llarp/ev/ev_libuv.hpp +++ b/llarp/ev/ev_libuv.hpp @@ -24,7 +24,7 @@ namespace libuv Callback callback; }; - Loop(); + Loop(size_t queueSize); bool init() override; diff --git a/llarp/path/transit_hop.cpp b/llarp/path/transit_hop.cpp index edae826c1..a9f446955 100644 --- a/llarp/path/transit_hop.cpp +++ b/llarp/path/transit_hop.cpp @@ -32,7 +32,8 @@ namespace llarp return stream; } - TransitHop::TransitHop() : m_UpstreamGather(512), m_DownstreamGather(512) + TransitHop::TransitHop() + : m_UpstreamGather(transit_hop_queue_size), m_DownstreamGather(transit_hop_queue_size) { m_UpstreamGather.enable(); m_DownstreamGather.enable(); @@ -119,7 +120,6 @@ namespace llarp void TransitHop::DownstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r) { - m_DownstreamWorkCounter++; auto flushIt = [self = shared_from_this(), r]() { std::vector msgs; do @@ -161,7 +161,6 @@ namespace llarp void TransitHop::UpstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r) { - m_UpstreamWorkCounter++; auto flushIt = [self = shared_from_this(), r]() { std::vector msgs; do @@ -251,19 +250,28 @@ namespace llarp void TransitHop::FlushUpstream(AbstractRouter* r) { - if (m_UpstreamQueue && !m_UpstreamQueue->empty()) - r->threadpool()->addJob(std::bind( - &TransitHop::UpstreamWork, shared_from_this(), std::move(m_UpstreamQueue), r)); - + if (m_UpstreamQueue && not m_UpstreamQueue->empty()) + { + if (r->threadpool()->addJob(std::bind( + &TransitHop::UpstreamWork, shared_from_this(), std::move(m_UpstreamQueue), r))) + { + m_UpstreamWorkCounter++; + } + } m_UpstreamQueue = nullptr; } void TransitHop::FlushDownstream(AbstractRouter* r) { - if (m_DownstreamQueue && !m_DownstreamQueue->empty()) - r->threadpool()->addJob(std::bind( - &TransitHop::DownstreamWork, shared_from_this(), std::move(m_DownstreamQueue), r)); + if (m_DownstreamQueue && not m_DownstreamQueue->empty()) + { + if (r->threadpool()->addJob(std::bind( + &TransitHop::DownstreamWork, shared_from_this(), std::move(m_DownstreamQueue), r))) + { + m_DownstreamWorkCounter++; + } + } m_DownstreamQueue = nullptr; } diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 0a55904c1..e0e7dd0d2 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -153,12 +153,11 @@ namespace llarp void Router::PumpLL() { - static constexpr size_t PumpJobThreshhold = 50; static constexpr auto PumpInterval = 25ms; const auto now = Now(); if (_stopping.load()) return; - if (_logic->numPendingJobs() >= PumpJobThreshhold && _lastPump + PumpInterval >= now) + if (_lastPump + PumpInterval >= now) { return; } diff --git a/llarp/util/thread/logic.cpp b/llarp/util/thread/logic.cpp index 2021d45ea..16e84a838 100644 --- a/llarp/util/thread/logic.cpp +++ b/llarp/util/thread/logic.cpp @@ -6,20 +6,6 @@ namespace llarp { - Logic::Logic(size_t) - { - } - - Logic::~Logic() - { - } - - size_t - Logic::numPendingJobs() const - { - return 0; - } - bool Logic::queue_job(struct llarp_thread_job job) { diff --git a/llarp/util/thread/logic.hpp b/llarp/util/thread/logic.hpp index d65a889e6..91e850251 100644 --- a/llarp/util/thread/logic.hpp +++ b/llarp/util/thread/logic.hpp @@ -11,10 +11,6 @@ namespace llarp class Logic { public: - Logic(size_t queueLength = size_t{1024 * 8}); - - ~Logic(); - /// stop all operation and wait for that to die void stop(); @@ -34,9 +30,6 @@ namespace llarp void remove_call(uint32_t id); - size_t - numPendingJobs() const; - bool can_flush() const; diff --git a/readme.md b/readme.md index b8252fdd3..d388ed463 100644 --- a/readme.md +++ b/readme.md @@ -12,8 +12,6 @@ You can view documentation on how to get started [here](https://loki-project.git [![Build Status](https://drone.lokinet.dev/api/badges/loki-project/loki-network/status.svg?ref=refs/heads/master)](https://drone.lokinet.dev/loki-project/loki-network) -You can find Bleeding edge builds [here](https://builds.lokinet.dev) (these builds may eat your first born). - ## Usage See the [documentation](https://loki-project.github.io/loki-docs/Lokinet/LokinetOverview/) on how to get started. From 44c790b65c20679d9ebd76921e87aeb01c0ea074 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Mon, 1 Jun 2020 09:23:17 -0400 Subject: [PATCH 37/40] revert back to for loop for simplicity --- llarp/iwp/session.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/llarp/iwp/session.cpp b/llarp/iwp/session.cpp index a96d573f6..2d642c15d 100644 --- a/llarp/iwp/session.cpp +++ b/llarp/iwp/session.cpp @@ -620,9 +620,8 @@ namespace llarp void Session::HandlePlaintext(CryptoQueue_ptr msgs) { - for (auto itr = msgs->begin(), end = msgs->end(); itr != end; itr++) + for (auto& result : *msgs) { - Packet_t result = *itr; LogDebug("Command ", int(result[PacketOverhead + 1])); switch (result[PacketOverhead + 1]) { From c6885ec285abea38c946cf8dd4206a21145ba256 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Mon, 1 Jun 2020 09:35:21 -0400 Subject: [PATCH 38/40] remove Endpoint::GetExitRouters() --- llarp/service/endpoint.cpp | 12 ------------ llarp/service/endpoint.hpp | 4 ---- llarp/service/outbound_context.cpp | 6 ------ 3 files changed, 22 deletions(-) diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index d9279c753..4943638ff 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -613,12 +613,6 @@ namespace llarp std::set exclude = prev; for (const auto& snode : SnodeBlacklist()) exclude.insert(snode); - if (hop == 0) - { - const auto exits = GetExitRouters(); - // exclude exit node as first hop in any paths - exclude.insert(exits.begin(), exits.end()); - } if (hop == numHops - 1) { // diversify endpoints @@ -633,12 +627,6 @@ namespace llarp path::Builder::PathBuildStarted(path); } - std::set - Endpoint::GetExitRouters() const - { - return {}; - } - void Endpoint::PutNewOutboundContext(const service::IntroSet& introset) { diff --git a/llarp/service/endpoint.hpp b/llarp/service/endpoint.hpp index d09fc711e..c5d3f2147 100644 --- a/llarp/service/endpoint.hpp +++ b/llarp/service/endpoint.hpp @@ -155,10 +155,6 @@ namespace llarp std::string Name() const override; - /// get a set of all the routers we use as exit node - std::set - GetExitRouters() const; - bool ShouldPublishDescriptors(llarp_time_t now) const override; diff --git a/llarp/service/outbound_context.cpp b/llarp/service/outbound_context.cpp index d35cf329c..015b4cd54 100644 --- a/llarp/service/outbound_context.cpp +++ b/llarp/service/outbound_context.cpp @@ -343,12 +343,6 @@ namespace llarp exclude.insert(m_NextIntro.router); for (const auto& snode : m_Endpoint->SnodeBlacklist()) exclude.insert(snode); - if (hop == 0) - { - // exclude any exits as our first hop - const auto exits = m_Endpoint->GetExitRouters(); - exclude.insert(exits.begin(), exits.end()); - } if (hop == numHops - 1) { m_Endpoint->EnsureRouterIsKnown(m_NextIntro.router); From 95e55a25444dbd38010083e5f16846dd5904aef4 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Mon, 1 Jun 2020 13:31:12 -0400 Subject: [PATCH 39/40] consistent spelling and use std::max --- llarp/context.cpp | 4 +--- llarp/ev/ev_libuv.cpp | 4 ++-- llarp/ev/ev_libuv.hpp | 2 +- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/llarp/context.cpp b/llarp/context.cpp index f25d41eea..257a3e8e3 100644 --- a/llarp/context.cpp +++ b/llarp/context.cpp @@ -78,9 +78,7 @@ namespace llarp llarp::LogInfo("starting up"); if (mainloop == nullptr) { - auto jobQueueSize = config->router.m_JobQueueSize; - if (jobQueueSize < 1024) - jobQueueSize = 1024; + auto jobQueueSize = std::max(event_loop_queue_size, config->router.m_JobQueueSize); mainloop = llarp_make_ev_loop(jobQueueSize); } logic->set_event_loop(mainloop.get()); diff --git a/llarp/ev/ev_libuv.cpp b/llarp/ev/ev_libuv.cpp index 5eacbf354..7242a71c8 100644 --- a/llarp/ev/ev_libuv.cpp +++ b/llarp/ev/ev_libuv.cpp @@ -753,8 +753,8 @@ namespace libuv llarp::LogContext::Instance().logStream->Tick(loop->time_now()); } - Loop::Loop(size_t queueLength) - : llarp_ev_loop(), m_LogicCalls(queueLength), m_timerQueue(20), m_timerCancelQueue(20) + Loop::Loop(size_t queue_size) + : llarp_ev_loop(), m_LogicCalls(queue_size), m_timerQueue(20), m_timerCancelQueue(20) { } diff --git a/llarp/ev/ev_libuv.hpp b/llarp/ev/ev_libuv.hpp index ff016e48b..3eac53481 100644 --- a/llarp/ev/ev_libuv.hpp +++ b/llarp/ev/ev_libuv.hpp @@ -24,7 +24,7 @@ namespace libuv Callback callback; }; - Loop(size_t queueSize); + Loop(size_t queue_size); bool init() override; From f40ffc0fd6af28640fa83ac1c67e83dcee4f07f7 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Mon, 1 Jun 2020 13:58:45 -0400 Subject: [PATCH 40/40] simplify header for tom. --- llarp/service/endpoint.hpp | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/llarp/service/endpoint.hpp b/llarp/service/endpoint.hpp index c5d3f2147..60a17b6c8 100644 --- a/llarp/service/endpoint.hpp +++ b/llarp/service/endpoint.hpp @@ -429,6 +429,7 @@ namespace llarp hooks::Backend_ptr m_OnDown; hooks::Backend_ptr m_OnReady; bool m_PublishIntroSet = true; + std::unique_ptr m_state; private: void @@ -444,10 +445,6 @@ namespace llarp const ConvoMap& Sessions() const; ConvoMap& Sessions(); // clang-format on - protected: - std::unique_ptr m_state; - - private: thread::Queue m_RecvQueue; };