From 1d958d95deb6a5b9d24f81ae001631946b76ebe6 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Fri, 15 Feb 2019 17:19:19 -0500 Subject: [PATCH] * make rpc compile right * make link sessions introspectable * make utp write buffers fully flush each tick --- CMakeLists.txt | 71 +++++++++++++++++---------------- llarp/CMakeLists.txt | 11 +++-- llarp/link/server.cpp | 35 ++++++++++++++++ llarp/link/server.hpp | 10 +++-- llarp/link/session.hpp | 2 +- llarp/link/utp.cpp | 20 +++++++++- llarp/link/utp_internal.hpp | 7 ++++ llarp/router/abstractrouter.hpp | 12 +++++- llarp/router/router.cpp | 23 ++++++++++- llarp/router/router.hpp | 7 +++- llarp/rpc/rpc.cpp | 16 ++++---- 11 files changed, 155 insertions(+), 59 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 85a948242..0be0b1a9d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -239,6 +239,9 @@ set(CRYPTOGRAPHY_LIB ${LIB}-cryptography) set(UTIL_LIB ${LIB}-util) set(PLATFORM_LIB ${LIB}-platform) set(ANDROID_LIB ${LIB}android) +set(ABYSS libabyss) +set(ABYSS_LIB abyss) +set(ABYSS_EXE ${ABYSS_LIB}-main) get_filename_component(TT_ROOT "vendor/libtuntap-master" ABSOLUTE) add_definitions(-D${CMAKE_SYSTEM_NAME}) @@ -307,10 +310,44 @@ function(add_log_tag target) endforeach(F) endfunction() +if(USE_LIBABYSS) + add_definitions(-DUSE_ABYSS=1) + + set(ABYSS_SRC + ${ABYSS}/src/http.cpp + ${ABYSS}/src/client.cpp + ${ABYSS}/src/server.cpp + ${ABYSS}/src/json.cpp) + add_library(${ABYSS_LIB} STATIC ${ABYSS_SRC}) + + +endif(USE_LIBABYSS) + add_subdirectory(crypto) add_subdirectory(libutp) add_subdirectory(llarp) +if(USE_LIBABYSS) + target_link_libraries(${ABYSS_LIB} PUBLIC ${PLATFORM_LIB}) + + if (NOT WIN32) + add_executable(${ABYSS_EXE} ${ABYSS}/main.cpp) + target_link_libraries(${ABYSS_EXE} PUBLIC ${ABYSS_LIB} ${STATIC_LIB} Threads::Threads) + else() + add_executable(${ABYSS_EXE} ${ABYSS}/main.cpp llarp/win32/abyss.rc) + target_link_libraries(${ABYSS_EXE} PUBLIC ${ABYSS_LIB} ${STATIC_LIB} ws2_32) + endif(NOT WIN32) + target_include_directories(${UTIL_LIB} PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/${ABYSS}/include") + target_include_directories(${ABYSS_LIB} PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/${ABYSS}/include") + target_include_directories(${ABYSS_EXE} PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/${ABYSS}/include") + # for freebsd + if(${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD") + target_include_directories(${ABYSS_LIB} /usr/local/include) + endif(${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD") + add_log_tag(${ABYSS_EXE}) + add_log_tag(${ABYSS_LIB}) +endif(USE_LIBABYSS) + if(SHADOW) add_shadow_plugin(shadow-plugin-${SHARED_LIB} ${EXE_SRC} ${LIB_SRC} ${UTP_SRC} ${LIB_PLATFORM_SRC} ${CPP_BACKPORT_SRC} ${ABYSS_SRC} ${CRYPTOGRAPHY_SRC}) target_link_libraries(shadow-plugin-${SHARED_LIB} ${LIBS}) @@ -349,40 +386,6 @@ else() endif(ANDROID) endif(SHADOW) -if(USE_LIBABYSS) - add_definitions(-DUSE_ABYSS=1) - set(ABYSS libabyss) - set(ABYSS_LIB abyss) - set(ABYSS_EXE ${ABYSS_LIB}-main) - set(ABYSS_SRC - ${ABYSS}/src/http.cpp - ${ABYSS}/src/client.cpp - ${ABYSS}/src/server.cpp - ${ABYSS}/src/json.cpp) - add_library(${ABYSS_LIB} STATIC ${ABYSS_SRC}) - target_link_libraries(${ABYSS_LIB} PUBLIC ${PLATFORM_LIB}) - - if (NOT WIN32) - add_executable(${ABYSS_EXE} ${ABYSS}/main.cpp) - target_link_libraries(${ABYSS_EXE} PUBLIC ${ABYSS_LIB} ${STATIC_LIB} Threads::Threads) - else() - add_executable(${ABYSS_EXE} ${ABYSS}/main.cpp llarp/win32/abyss.rc) - target_link_libraries(${ABYSS_EXE} PUBLIC ${ABYSS_LIB} ${STATIC_LIB} ws2_32) - endif(NOT WIN32) - - add_log_tag(${ABYSS_EXE}) - add_log_tag(${ABYSS_LIB}) - - target_include_directories(${UTIL_LIB} PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/${ABYSS}/include") - target_include_directories(${ABYSS_LIB} PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/${ABYSS}/include") - target_include_directories(${ABYSS_EXE} PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/${ABYSS}/include") - - # for freebsd - if(${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD") - target_include_directories(${ABYSS_LIB} /usr/local/include) - endif(${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD") -endif(USE_LIBABYSS) - enable_testing() if (NOT SHADOW) diff --git a/llarp/CMakeLists.txt b/llarp/CMakeLists.txt index 7a27a43c7..043bf1882 100644 --- a/llarp/CMakeLists.txt +++ b/llarp/CMakeLists.txt @@ -78,10 +78,6 @@ if(WIN32) target_link_libraries(${PLATFORM_LIB} PUBLIC iphlpapi) endif() -if(USE_LIBABYSS) - target_link_libraries(${UTIL_LIB} PUBLIC ${ABYSS_LIB}) - target_link_libraries(${PLATFORM_LIB} ${ABYSS_LIB}) -endif() set(DNSLIB_SRC dns/dotlokilookup.cpp @@ -208,6 +204,13 @@ set(LIB_SRC add_library(${STATIC_LIB} STATIC ${LIB_SRC}) target_link_libraries(${STATIC_LIB} PUBLIC ${PLATFORM_LIB} ${UTIL_LIB} ${CRYPTOGRAPHY_LIB} libutp ${LIBS}) +if(USE_LIBABYSS) + add_definitions(-DUSE_ABYSS=1) + target_link_libraries(${UTIL_LIB} PUBLIC ${ABYSS_LIB}) + target_link_libraries(${PLATFORM_LIB} PUBLIC ${ABYSS_LIB}) + target_link_libraries(${STATIC_LIB} PUBLIC ${ABYSS_LIB}) +endif() + if(TESTNET) target_sources(${STATIC_LIB} PUBLIC testnet.c) endif() diff --git a/llarp/link/server.cpp b/llarp/link/server.cpp index 853e38bab..c9ebfbef5 100644 --- a/llarp/link/server.cpp +++ b/llarp/link/server.cpp @@ -170,6 +170,29 @@ namespace llarp m_Pending.erase(remote); } + util::StatusObject + ILinkLayer::ExtractStatus() const + { + std::vector pending, established; + + std::transform(m_Pending.begin(), m_Pending.end(), std::back_inserter(pending), [](const auto & item) -> util::StatusObject { + return item.second->ExtractStatus(); + }); + std::transform(m_AuthedLinks.begin(), m_AuthedLinks.end(), std::back_inserter(established), [](const auto & item) -> util::StatusObject { + return item.second->ExtractStatus(); + }); + + return { + {"name", Name()}, + {"rank", uint64_t(Rank())}, + {"addr", m_ourAddr.ToString()}, + {"sessions", util::StatusObject{ + {"pending", pending}, + {"established", established} + }} + }; + } + bool ILinkLayer::TryEstablishTo(RouterContact rc) { @@ -196,6 +219,18 @@ namespace llarp return true; } + void + ILinkLayer::Tick(llarp_time_t now) + { + Lock l(m_AuthedLinksMutex); + auto itr = m_AuthedLinks.begin(); + while(itr != m_AuthedLinks.end()) + { + itr->second->Tick(now); + ++itr; + } + } + void ILinkLayer::Stop() { diff --git a/llarp/link/server.hpp b/llarp/link/server.hpp index d657ffcf7..7349d571f 100644 --- a/llarp/link/server.hpp +++ b/llarp/link/server.hpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -41,7 +42,7 @@ namespace llarp /// handles close of all sessions with pubkey using SessionClosedHandler = std::function< void(llarp::RouterID) >; - struct ILinkLayer + struct ILinkLayer : public util::IStateful { ILinkLayer(const SecretKey& routerEncSecret, GetRCFunc getrc, LinkMessageHandler handler, SignBufferFunc signFunc, @@ -123,6 +124,9 @@ namespace llarp virtual const char* Name() const = 0; + util::StatusObject + ExtractStatus() const override; + void CloseSessionTo(const RouterID& remote); @@ -176,9 +180,7 @@ namespace llarp bool MapAddr(const RouterID& pk, ILinkSession* s); - virtual void Tick(llarp_time_t) - { - } + void Tick(llarp_time_t now); LinkMessageHandler HandleMessage; TimeoutHandler HandleTimeout; diff --git a/llarp/link/session.hpp b/llarp/link/session.hpp index 23186482f..685abaf09 100644 --- a/llarp/link/session.hpp +++ b/llarp/link/session.hpp @@ -13,7 +13,7 @@ namespace llarp struct LinkIntroMessage; struct ILinkMessage; struct ILinkLayer; - struct ILinkSession + struct ILinkSession : public util::IStateful { virtual ~ILinkSession(){}; diff --git a/llarp/link/utp.cpp b/llarp/link/utp.cpp index 9b8d7331c..a4520cc2c 100644 --- a/llarp/link/utp.cpp +++ b/llarp/link/utp.cpp @@ -83,8 +83,8 @@ namespace llarp if(expect) { ssize_t s = utp_writev(sock, vecs.data(), vecs.size()); - - while(s > static_cast< ssize_t >(vecq.front().iov_len)) + m_TXRate += s; + while(s > 0 && s >= static_cast< ssize_t >(vecq.front().iov_len)) { s -= vecq.front().iov_len; vecq.pop_front(); @@ -173,6 +173,8 @@ namespace llarp Session::TickImpl(llarp_time_t now) { PruneInboundMessages(now); + m_TXRate = 0; + m_RXRate = 0; } /// low level read @@ -181,6 +183,7 @@ namespace llarp { // mark we are alive Alive(); + m_RXRate += sz; size_t s = sz; // process leftovers if(recvBufOffset) @@ -980,6 +983,19 @@ namespace llarp } } + util::StatusObject + Session::ExtractStatus() const + { + return { + {"client", !remoteRC.IsPublicRouter()}, + {"sendBacklog", uint64_t(SendQueueBacklog())}, + {"tx", m_TXRate}, + {"rx", m_RXRate}, + {"remoteAddr", remoteAddr.ToString()}, + {"pubkey", remoteRC.pubkey.ToHex()} + }; + } + bool Session::GotSessionRenegotiate(const LinkIntroMessage* msg) { diff --git a/llarp/link/utp_internal.hpp b/llarp/link/utp_internal.hpp index 9deec7ce1..306e48d0b 100644 --- a/llarp/link/utp_internal.hpp +++ b/llarp/link/utp_internal.hpp @@ -131,10 +131,17 @@ namespace llarp std::unordered_map< uint32_t, InboundMessage > m_RecvMsgs; /// are we stalled or nah? bool stalled = false; + + uint64_t m_RXRate = 0; + uint64_t m_TXRate = 0; + /// mark session as alive void Alive(); + util::StatusObject + ExtractStatus() const override; + /// base Session(LinkLayer* p); diff --git a/llarp/router/abstractrouter.hpp b/llarp/router/abstractrouter.hpp index a5c96dbac..bcb219134 100644 --- a/llarp/router/abstractrouter.hpp +++ b/llarp/router/abstractrouter.hpp @@ -2,7 +2,7 @@ #define LLARP_ABSTRACT_ROUTER_HPP #include - +#include #include struct llarp_buffer_t; @@ -39,7 +39,7 @@ namespace llarp struct IMessageHandler; } - struct AbstractRouter + struct AbstractRouter : public util::IStateful { virtual ~AbstractRouter() = 0; @@ -138,6 +138,14 @@ namespace llarp /// returns false otherwise virtual bool CheckRenegotiateValid(RouterContact newRc, RouterContact oldRC) = 0; + + /// set router's service node whitelist + virtual void + SetRouterWhitelist(const std::vector & routers) =0 ; + + /// visit each connected link session + virtual void + ForEachPeer(std::function visit) const = 0; }; } // namespace llarp diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 324f803db..bbef4ef8a 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -240,6 +240,17 @@ namespace llarp util::StatusObject obj{{"dht", _dht->impl.ExtractStatus()}, {"services", hiddenServiceContext.ExtractStatus()}, {"exit", _exitContext.ExtractStatus()}}; + std::vector ob_links, ib_links; + std::transform(inboundLinks.begin(), inboundLinks.end(), std::back_inserter(ib_links), [](const auto & link) -> util::StatusObject { + return link->ExtractStatus(); + }); + std::transform(outboundLinks.begin(), outboundLinks.end(), std::back_inserter(ob_links), [](const auto & link) -> util::StatusObject { + return link->ExtractStatus(); + }); + obj.Put("links", util::StatusObject{ + {"outbound", ob_links}, + {"inbound", ib_links} + }); return obj; } @@ -977,7 +988,7 @@ namespace llarp return false; // store it in nodedb async - nodedb()->InsertAsync(newrc); + async_verify_RC(newrc, nullptr); // update dht if required if(dht()->impl.nodes->HasNode(dht::Key_t{newrc.pubkey})) { @@ -1247,6 +1258,16 @@ namespace llarp llarp_nodedb_async_verify(job); } + void + Router::SetRouterWhitelist(const std::vector & routers) + { + lokinetRouters.clear(); + for(const auto & router : routers) + lokinetRouters.emplace(router, std::numeric_limits::max()); + LogInfo("lokinet service node list now has ", lokinetRouters.size(), + " routers"); + } + bool Router::Run(struct llarp_nodedb *nodedb) { diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp index f42a9a4e4..f84072ce2 100644 --- a/llarp/router/router.hpp +++ b/llarp/router/router.hpp @@ -68,7 +68,7 @@ namespace llarp } }; - struct Router final : public AbstractRouter, public util::IStateful + struct Router final : public AbstractRouter { bool ready; // transient iwp encryption key @@ -140,6 +140,9 @@ namespace llarp return _rc; } + void + SetRouterWhitelist(const std::vector & routers) override; + exit::Context & exitContext() override { @@ -427,7 +430,7 @@ namespace llarp RouterID remote, const std::vector< RouterContact > &results) override; void - ForEachPeer(std::function< void(const ILinkSession *, bool) > visit) const; + ForEachPeer(std::function< void(const ILinkSession *, bool) > visit) const override; void ForEachPeer(std::function< void(ILinkSession *) > visit); diff --git a/llarp/rpc/rpc.cpp b/llarp/rpc/rpc.cpp index 33b78f1e7..134f6cca3 100644 --- a/llarp/rpc/rpc.cpp +++ b/llarp/rpc/rpc.cpp @@ -1,6 +1,9 @@ #include #include +#include +#include +#include #ifdef USE_ABYSS #include @@ -160,7 +163,7 @@ namespace llarp bool Start(const std::string& remote) { - return RunAsync(router->netloop, remote); + return RunAsync(router->netloop(), remote); } abyss::http::IRPCClientHandler* @@ -177,12 +180,7 @@ namespace llarp { if(updated) { - router->lokinetRouters.clear(); - for(const auto& pk : list) - router->lokinetRouters.insert(std::make_pair( - pk.data(), std::numeric_limits< llarp_time_t >::max())); - LogInfo("updated service node list, we have ", - router->lokinetRouters.size(), " authorized routers"); + router->SetRouterWhitelist(list); } else LogError("service node list not updated"); @@ -229,7 +227,7 @@ namespace llarp ListExitLevels(Response& resp) const { exit::Context::TrafficStats stats; - router->exitContext.CalculateExitTraffic(stats); + router->exitContext().CalculateExitTraffic(stats); resp.StartArray(); auto itr = stats.begin(); while(itr != stats.end()) @@ -339,7 +337,7 @@ namespace llarp saddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); saddr.sin_family = AF_INET; saddr.sin_port = htons(port); - return _handler.ServeAsync(router->netloop, router->logic(), + return _handler.ServeAsync(router->netloop(), router->logic(), (const sockaddr*)&saddr); } };