* make rpc compile right

* make link sessions introspectable
* make utp write buffers fully flush each tick
pull/313/head
Jeff Becker 5 years ago
parent d75e925bda
commit 1d958d95de
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -239,6 +239,9 @@ set(CRYPTOGRAPHY_LIB ${LIB}-cryptography)
set(UTIL_LIB ${LIB}-util) set(UTIL_LIB ${LIB}-util)
set(PLATFORM_LIB ${LIB}-platform) set(PLATFORM_LIB ${LIB}-platform)
set(ANDROID_LIB ${LIB}android) set(ANDROID_LIB ${LIB}android)
set(ABYSS libabyss)
set(ABYSS_LIB abyss)
set(ABYSS_EXE ${ABYSS_LIB}-main)
get_filename_component(TT_ROOT "vendor/libtuntap-master" ABSOLUTE) get_filename_component(TT_ROOT "vendor/libtuntap-master" ABSOLUTE)
add_definitions(-D${CMAKE_SYSTEM_NAME}) add_definitions(-D${CMAKE_SYSTEM_NAME})
@ -307,10 +310,44 @@ function(add_log_tag target)
endforeach(F) endforeach(F)
endfunction() endfunction()
if(USE_LIBABYSS)
add_definitions(-DUSE_ABYSS=1)
set(ABYSS_SRC
${ABYSS}/src/http.cpp
${ABYSS}/src/client.cpp
${ABYSS}/src/server.cpp
${ABYSS}/src/json.cpp)
add_library(${ABYSS_LIB} STATIC ${ABYSS_SRC})
endif(USE_LIBABYSS)
add_subdirectory(crypto) add_subdirectory(crypto)
add_subdirectory(libutp) add_subdirectory(libutp)
add_subdirectory(llarp) add_subdirectory(llarp)
if(USE_LIBABYSS)
target_link_libraries(${ABYSS_LIB} PUBLIC ${PLATFORM_LIB})
if (NOT WIN32)
add_executable(${ABYSS_EXE} ${ABYSS}/main.cpp)
target_link_libraries(${ABYSS_EXE} PUBLIC ${ABYSS_LIB} ${STATIC_LIB} Threads::Threads)
else()
add_executable(${ABYSS_EXE} ${ABYSS}/main.cpp llarp/win32/abyss.rc)
target_link_libraries(${ABYSS_EXE} PUBLIC ${ABYSS_LIB} ${STATIC_LIB} ws2_32)
endif(NOT WIN32)
target_include_directories(${UTIL_LIB} PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/${ABYSS}/include")
target_include_directories(${ABYSS_LIB} PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/${ABYSS}/include")
target_include_directories(${ABYSS_EXE} PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/${ABYSS}/include")
# for freebsd
if(${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD")
target_include_directories(${ABYSS_LIB} /usr/local/include)
endif(${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD")
add_log_tag(${ABYSS_EXE})
add_log_tag(${ABYSS_LIB})
endif(USE_LIBABYSS)
if(SHADOW) if(SHADOW)
add_shadow_plugin(shadow-plugin-${SHARED_LIB} ${EXE_SRC} ${LIB_SRC} ${UTP_SRC} ${LIB_PLATFORM_SRC} ${CPP_BACKPORT_SRC} ${ABYSS_SRC} ${CRYPTOGRAPHY_SRC}) add_shadow_plugin(shadow-plugin-${SHARED_LIB} ${EXE_SRC} ${LIB_SRC} ${UTP_SRC} ${LIB_PLATFORM_SRC} ${CPP_BACKPORT_SRC} ${ABYSS_SRC} ${CRYPTOGRAPHY_SRC})
target_link_libraries(shadow-plugin-${SHARED_LIB} ${LIBS}) target_link_libraries(shadow-plugin-${SHARED_LIB} ${LIBS})
@ -349,40 +386,6 @@ else()
endif(ANDROID) endif(ANDROID)
endif(SHADOW) endif(SHADOW)
if(USE_LIBABYSS)
add_definitions(-DUSE_ABYSS=1)
set(ABYSS libabyss)
set(ABYSS_LIB abyss)
set(ABYSS_EXE ${ABYSS_LIB}-main)
set(ABYSS_SRC
${ABYSS}/src/http.cpp
${ABYSS}/src/client.cpp
${ABYSS}/src/server.cpp
${ABYSS}/src/json.cpp)
add_library(${ABYSS_LIB} STATIC ${ABYSS_SRC})
target_link_libraries(${ABYSS_LIB} PUBLIC ${PLATFORM_LIB})
if (NOT WIN32)
add_executable(${ABYSS_EXE} ${ABYSS}/main.cpp)
target_link_libraries(${ABYSS_EXE} PUBLIC ${ABYSS_LIB} ${STATIC_LIB} Threads::Threads)
else()
add_executable(${ABYSS_EXE} ${ABYSS}/main.cpp llarp/win32/abyss.rc)
target_link_libraries(${ABYSS_EXE} PUBLIC ${ABYSS_LIB} ${STATIC_LIB} ws2_32)
endif(NOT WIN32)
add_log_tag(${ABYSS_EXE})
add_log_tag(${ABYSS_LIB})
target_include_directories(${UTIL_LIB} PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/${ABYSS}/include")
target_include_directories(${ABYSS_LIB} PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/${ABYSS}/include")
target_include_directories(${ABYSS_EXE} PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/${ABYSS}/include")
# for freebsd
if(${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD")
target_include_directories(${ABYSS_LIB} /usr/local/include)
endif(${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD")
endif(USE_LIBABYSS)
enable_testing() enable_testing()
if (NOT SHADOW) if (NOT SHADOW)

@ -78,10 +78,6 @@ if(WIN32)
target_link_libraries(${PLATFORM_LIB} PUBLIC iphlpapi) target_link_libraries(${PLATFORM_LIB} PUBLIC iphlpapi)
endif() endif()
if(USE_LIBABYSS)
target_link_libraries(${UTIL_LIB} PUBLIC ${ABYSS_LIB})
target_link_libraries(${PLATFORM_LIB} ${ABYSS_LIB})
endif()
set(DNSLIB_SRC set(DNSLIB_SRC
dns/dotlokilookup.cpp dns/dotlokilookup.cpp
@ -208,6 +204,13 @@ set(LIB_SRC
add_library(${STATIC_LIB} STATIC ${LIB_SRC}) add_library(${STATIC_LIB} STATIC ${LIB_SRC})
target_link_libraries(${STATIC_LIB} PUBLIC ${PLATFORM_LIB} ${UTIL_LIB} ${CRYPTOGRAPHY_LIB} libutp ${LIBS}) target_link_libraries(${STATIC_LIB} PUBLIC ${PLATFORM_LIB} ${UTIL_LIB} ${CRYPTOGRAPHY_LIB} libutp ${LIBS})
if(USE_LIBABYSS)
add_definitions(-DUSE_ABYSS=1)
target_link_libraries(${UTIL_LIB} PUBLIC ${ABYSS_LIB})
target_link_libraries(${PLATFORM_LIB} PUBLIC ${ABYSS_LIB})
target_link_libraries(${STATIC_LIB} PUBLIC ${ABYSS_LIB})
endif()
if(TESTNET) if(TESTNET)
target_sources(${STATIC_LIB} PUBLIC testnet.c) target_sources(${STATIC_LIB} PUBLIC testnet.c)
endif() endif()

@ -170,6 +170,29 @@ namespace llarp
m_Pending.erase(remote); m_Pending.erase(remote);
} }
util::StatusObject
ILinkLayer::ExtractStatus() const
{
std::vector<util::StatusObject> pending, established;
std::transform(m_Pending.begin(), m_Pending.end(), std::back_inserter(pending), [](const auto & item) -> util::StatusObject {
return item.second->ExtractStatus();
});
std::transform(m_AuthedLinks.begin(), m_AuthedLinks.end(), std::back_inserter(established), [](const auto & item) -> util::StatusObject {
return item.second->ExtractStatus();
});
return {
{"name", Name()},
{"rank", uint64_t(Rank())},
{"addr", m_ourAddr.ToString()},
{"sessions", util::StatusObject{
{"pending", pending},
{"established", established}
}}
};
}
bool bool
ILinkLayer::TryEstablishTo(RouterContact rc) ILinkLayer::TryEstablishTo(RouterContact rc)
{ {
@ -196,6 +219,18 @@ namespace llarp
return true; return true;
} }
void
ILinkLayer::Tick(llarp_time_t now)
{
Lock l(m_AuthedLinksMutex);
auto itr = m_AuthedLinks.begin();
while(itr != m_AuthedLinks.end())
{
itr->second->Tick(now);
++itr;
}
}
void void
ILinkLayer::Stop() ILinkLayer::Stop()
{ {

@ -8,6 +8,7 @@
#include <router_contact.hpp> #include <router_contact.hpp>
#include <util/logic.hpp> #include <util/logic.hpp>
#include <util/threading.hpp> #include <util/threading.hpp>
#include <util/status.hpp>
#include <list> #include <list>
#include <unordered_map> #include <unordered_map>
@ -41,7 +42,7 @@ namespace llarp
/// handles close of all sessions with pubkey /// handles close of all sessions with pubkey
using SessionClosedHandler = std::function< void(llarp::RouterID) >; using SessionClosedHandler = std::function< void(llarp::RouterID) >;
struct ILinkLayer struct ILinkLayer : public util::IStateful
{ {
ILinkLayer(const SecretKey& routerEncSecret, GetRCFunc getrc, ILinkLayer(const SecretKey& routerEncSecret, GetRCFunc getrc,
LinkMessageHandler handler, SignBufferFunc signFunc, LinkMessageHandler handler, SignBufferFunc signFunc,
@ -123,6 +124,9 @@ namespace llarp
virtual const char* virtual const char*
Name() const = 0; Name() const = 0;
util::StatusObject
ExtractStatus() const override;
void void
CloseSessionTo(const RouterID& remote); CloseSessionTo(const RouterID& remote);
@ -176,9 +180,7 @@ namespace llarp
bool bool
MapAddr(const RouterID& pk, ILinkSession* s); MapAddr(const RouterID& pk, ILinkSession* s);
virtual void Tick(llarp_time_t) void Tick(llarp_time_t now);
{
}
LinkMessageHandler HandleMessage; LinkMessageHandler HandleMessage;
TimeoutHandler HandleTimeout; TimeoutHandler HandleTimeout;

@ -13,7 +13,7 @@ namespace llarp
struct LinkIntroMessage; struct LinkIntroMessage;
struct ILinkMessage; struct ILinkMessage;
struct ILinkLayer; struct ILinkLayer;
struct ILinkSession struct ILinkSession : public util::IStateful
{ {
virtual ~ILinkSession(){}; virtual ~ILinkSession(){};

@ -83,8 +83,8 @@ namespace llarp
if(expect) if(expect)
{ {
ssize_t s = utp_writev(sock, vecs.data(), vecs.size()); ssize_t s = utp_writev(sock, vecs.data(), vecs.size());
m_TXRate += s;
while(s > static_cast< ssize_t >(vecq.front().iov_len)) while(s > 0 && s >= static_cast< ssize_t >(vecq.front().iov_len))
{ {
s -= vecq.front().iov_len; s -= vecq.front().iov_len;
vecq.pop_front(); vecq.pop_front();
@ -173,6 +173,8 @@ namespace llarp
Session::TickImpl(llarp_time_t now) Session::TickImpl(llarp_time_t now)
{ {
PruneInboundMessages(now); PruneInboundMessages(now);
m_TXRate = 0;
m_RXRate = 0;
} }
/// low level read /// low level read
@ -181,6 +183,7 @@ namespace llarp
{ {
// mark we are alive // mark we are alive
Alive(); Alive();
m_RXRate += sz;
size_t s = sz; size_t s = sz;
// process leftovers // process leftovers
if(recvBufOffset) if(recvBufOffset)
@ -980,6 +983,19 @@ namespace llarp
} }
} }
util::StatusObject
Session::ExtractStatus() const
{
return {
{"client", !remoteRC.IsPublicRouter()},
{"sendBacklog", uint64_t(SendQueueBacklog())},
{"tx", m_TXRate},
{"rx", m_RXRate},
{"remoteAddr", remoteAddr.ToString()},
{"pubkey", remoteRC.pubkey.ToHex()}
};
}
bool bool
Session::GotSessionRenegotiate(const LinkIntroMessage* msg) Session::GotSessionRenegotiate(const LinkIntroMessage* msg)
{ {

@ -131,10 +131,17 @@ namespace llarp
std::unordered_map< uint32_t, InboundMessage > m_RecvMsgs; std::unordered_map< uint32_t, InboundMessage > m_RecvMsgs;
/// are we stalled or nah? /// are we stalled or nah?
bool stalled = false; bool stalled = false;
uint64_t m_RXRate = 0;
uint64_t m_TXRate = 0;
/// mark session as alive /// mark session as alive
void void
Alive(); Alive();
util::StatusObject
ExtractStatus() const override;
/// base /// base
Session(LinkLayer* p); Session(LinkLayer* p);

@ -2,7 +2,7 @@
#define LLARP_ABSTRACT_ROUTER_HPP #define LLARP_ABSTRACT_ROUTER_HPP
#include <util/types.hpp> #include <util/types.hpp>
#include <util/status.hpp>
#include <vector> #include <vector>
struct llarp_buffer_t; struct llarp_buffer_t;
@ -39,7 +39,7 @@ namespace llarp
struct IMessageHandler; struct IMessageHandler;
} }
struct AbstractRouter struct AbstractRouter : public util::IStateful
{ {
virtual ~AbstractRouter() = 0; virtual ~AbstractRouter() = 0;
@ -138,6 +138,14 @@ namespace llarp
/// returns false otherwise /// returns false otherwise
virtual bool virtual bool
CheckRenegotiateValid(RouterContact newRc, RouterContact oldRC) = 0; CheckRenegotiateValid(RouterContact newRc, RouterContact oldRC) = 0;
/// set router's service node whitelist
virtual void
SetRouterWhitelist(const std::vector<RouterID> & routers) =0 ;
/// visit each connected link session
virtual void
ForEachPeer(std::function<void(const ILinkSession*, bool)> visit) const = 0;
}; };
} // namespace llarp } // namespace llarp

@ -240,6 +240,17 @@ namespace llarp
util::StatusObject obj{{"dht", _dht->impl.ExtractStatus()}, util::StatusObject obj{{"dht", _dht->impl.ExtractStatus()},
{"services", hiddenServiceContext.ExtractStatus()}, {"services", hiddenServiceContext.ExtractStatus()},
{"exit", _exitContext.ExtractStatus()}}; {"exit", _exitContext.ExtractStatus()}};
std::vector<util::StatusObject> ob_links, ib_links;
std::transform(inboundLinks.begin(), inboundLinks.end(), std::back_inserter(ib_links), [](const auto & link) -> util::StatusObject {
return link->ExtractStatus();
});
std::transform(outboundLinks.begin(), outboundLinks.end(), std::back_inserter(ob_links), [](const auto & link) -> util::StatusObject {
return link->ExtractStatus();
});
obj.Put("links", util::StatusObject{
{"outbound", ob_links},
{"inbound", ib_links}
});
return obj; return obj;
} }
@ -977,7 +988,7 @@ namespace llarp
return false; return false;
// store it in nodedb async // store it in nodedb async
nodedb()->InsertAsync(newrc); async_verify_RC(newrc, nullptr);
// update dht if required // update dht if required
if(dht()->impl.nodes->HasNode(dht::Key_t{newrc.pubkey})) if(dht()->impl.nodes->HasNode(dht::Key_t{newrc.pubkey}))
{ {
@ -1247,6 +1258,16 @@ namespace llarp
llarp_nodedb_async_verify(job); llarp_nodedb_async_verify(job);
} }
void
Router::SetRouterWhitelist(const std::vector<RouterID> & routers)
{
lokinetRouters.clear();
for(const auto & router : routers)
lokinetRouters.emplace(router, std::numeric_limits<llarp_time_t>::max());
LogInfo("lokinet service node list now has ", lokinetRouters.size(),
" routers");
}
bool bool
Router::Run(struct llarp_nodedb *nodedb) Router::Run(struct llarp_nodedb *nodedb)
{ {

@ -68,7 +68,7 @@ namespace llarp
} }
}; };
struct Router final : public AbstractRouter, public util::IStateful struct Router final : public AbstractRouter
{ {
bool ready; bool ready;
// transient iwp encryption key // transient iwp encryption key
@ -140,6 +140,9 @@ namespace llarp
return _rc; return _rc;
} }
void
SetRouterWhitelist(const std::vector<RouterID> & routers) override;
exit::Context & exit::Context &
exitContext() override exitContext() override
{ {
@ -427,7 +430,7 @@ namespace llarp
RouterID remote, const std::vector< RouterContact > &results) override; RouterID remote, const std::vector< RouterContact > &results) override;
void void
ForEachPeer(std::function< void(const ILinkSession *, bool) > visit) const; ForEachPeer(std::function< void(const ILinkSession *, bool) > visit) const override;
void void
ForEachPeer(std::function< void(ILinkSession *) > visit); ForEachPeer(std::function< void(ILinkSession *) > visit);

@ -1,6 +1,9 @@
#include <rpc/rpc.hpp> #include <rpc/rpc.hpp>
#include <router/abstractrouter.hpp> #include <router/abstractrouter.hpp>
#include <util/logger.hpp>
#include <router_id.hpp>
#include <exit/context.hpp>
#ifdef USE_ABYSS #ifdef USE_ABYSS
#include <util/encode.hpp> #include <util/encode.hpp>
@ -160,7 +163,7 @@ namespace llarp
bool bool
Start(const std::string& remote) Start(const std::string& remote)
{ {
return RunAsync(router->netloop, remote); return RunAsync(router->netloop(), remote);
} }
abyss::http::IRPCClientHandler* abyss::http::IRPCClientHandler*
@ -177,12 +180,7 @@ namespace llarp
{ {
if(updated) if(updated)
{ {
router->lokinetRouters.clear(); router->SetRouterWhitelist(list);
for(const auto& pk : list)
router->lokinetRouters.insert(std::make_pair(
pk.data(), std::numeric_limits< llarp_time_t >::max()));
LogInfo("updated service node list, we have ",
router->lokinetRouters.size(), " authorized routers");
} }
else else
LogError("service node list not updated"); LogError("service node list not updated");
@ -229,7 +227,7 @@ namespace llarp
ListExitLevels(Response& resp) const ListExitLevels(Response& resp) const
{ {
exit::Context::TrafficStats stats; exit::Context::TrafficStats stats;
router->exitContext.CalculateExitTraffic(stats); router->exitContext().CalculateExitTraffic(stats);
resp.StartArray(); resp.StartArray();
auto itr = stats.begin(); auto itr = stats.begin();
while(itr != stats.end()) while(itr != stats.end())
@ -339,7 +337,7 @@ namespace llarp
saddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); saddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
saddr.sin_family = AF_INET; saddr.sin_family = AF_INET;
saddr.sin_port = htons(port); saddr.sin_port = htons(port);
return _handler.ServeAsync(router->netloop, router->logic(), return _handler.ServeAsync(router->netloop(), router->logic(),
(const sockaddr*)&saddr); (const sockaddr*)&saddr);
} }
}; };

Loading…
Cancel
Save