diff --git a/CMakeLists.txt b/CMakeLists.txt
index e686e6556..9c411c8f3 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,6 +1,11 @@
# Lowest version - android ndk 3.6.0
cmake_minimum_required(VERSION 3.6.0)
+find_program(CCACHE_PROGRAM ccache)
+if(CCACHE_PROGRAM)
+ set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CCACHE_PROGRAM}")
+endif()
+
set(PROJECT_NAME lokinet)
project(${PROJECT_NAME} C CXX)
@@ -45,11 +50,6 @@ add_definitions(-D${CMAKE_SYSTEM_NAME})
get_filename_component(CORE_INCLUDE "${CMAKE_CURRENT_SOURCE_DIR}/include" ABSOLUTE)
get_filename_component(ABYSS_INCLUDE "${CMAKE_CURRENT_SOURCE_DIR}/${ABYSS}/include" ABSOLUTE)
-find_program(CCACHE_PROGRAM ccache)
-if(CCACHE_PROGRAM)
- set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CCACHE_PROGRAM}")
-endif()
-
if(MSVC_VERSION)
enable_language(ASM_MASM)
list(APPEND CMAKE_ASM_MASM_SOURCE_FILE_EXTENSIONS s)
diff --git a/llarp/CMakeLists.txt b/llarp/CMakeLists.txt
index 0a19450d6..26507a5a5 100644
--- a/llarp/CMakeLists.txt
+++ b/llarp/CMakeLists.txt
@@ -173,6 +173,8 @@ set(LIB_SRC
iwp/linklayer.cpp
iwp/outermessage.cpp
iwp/iwp.cpp
+ link/i_link_manager.cpp
+ link/link_manager.cpp
link/server.cpp
link/session.cpp
messages/dht_immediate.cpp
@@ -195,6 +197,12 @@ set(LIB_SRC
pow.cpp
profiling.cpp
router/abstractrouter.cpp
+ router/i_outbound_message_handler.cpp
+ router/outbound_message_handler.cpp
+ router/i_outbound_session_maker.cpp
+ router/outbound_session_maker.cpp
+ router/i_rc_lookup_handler.cpp
+ router/rc_lookup_handler.cpp
router/router.cpp
router_contact.cpp
router_id.cpp
diff --git a/llarp/context.cpp b/llarp/context.cpp
index a3deb4c9f..20a8d8c63 100644
--- a/llarp/context.cpp
+++ b/llarp/context.cpp
@@ -159,8 +159,6 @@ __ ___ ____ _ _ ___ _ _ ____
int
Context::LoadDatabase()
{
- nodedb = std::make_unique< llarp_nodedb >(router->diskworker());
-
if(!llarp_nodedb::ensure_dir(nodedb_dir.c_str()))
{
llarp::LogError("nodedb_dir is incorrect");
@@ -233,16 +231,21 @@ __ ___ ____ _ _ ___ _ _ ____
cryptoManager = std::make_unique< CryptoManager >(crypto.get());
router = std::make_unique< Router >(worker, mainloop, logic);
- if(!router->Configure(config.get()))
+
+ nodedb = std::make_unique< llarp_nodedb >(router->diskworker());
+
+ if(!router->Configure(config.get(), nodedb.get()))
{
llarp::LogError("Failed to configure router");
return 1;
}
+
// must be done after router is made so we can use its disk io worker
// must also be done after configure so that netid is properly set if it
// is provided by config
if(!this->LoadDatabase())
return 1;
+
return 0;
}
diff --git a/llarp/dht/context.cpp b/llarp/dht/context.cpp
index 576f684ac..c7c208f82 100644
--- a/llarp/dht/context.cpp
+++ b/llarp/dht/context.cpp
@@ -199,6 +199,20 @@ namespace llarp
return _nodes.get();
}
+ void
+ PutRCNodeAsync(const RCNode& val) override
+ {
+ auto func = std::bind(&Bucket< RCNode >::PutNode, Nodes(), val);
+ router->logic()->queue_func(func);
+ }
+
+ void
+ DelRCNodeAsync(const Key_t& val) override
+ {
+ auto func = std::bind(&Bucket< RCNode >::DelNode, Nodes(), val);
+ router->logic()->queue_func(func);
+ }
+
const Key_t&
OurKey() const override
{
diff --git a/llarp/dht/context.hpp b/llarp/dht/context.hpp
index 431fe1005..392e52ad4 100644
--- a/llarp/dht/context.hpp
+++ b/llarp/dht/context.hpp
@@ -173,6 +173,12 @@ namespace llarp
virtual Bucket< RCNode >*
Nodes() const = 0;
+ virtual void
+ PutRCNodeAsync(const RCNode& val) = 0;
+
+ virtual void
+ DelRCNodeAsync(const Key_t& val) = 0;
+
virtual util::StatusObject
ExtractStatus() const = 0;
diff --git a/llarp/iwp/iwp.cpp b/llarp/iwp/iwp.cpp
index 1c02b137c..457a51340 100644
--- a/llarp/iwp/iwp.cpp
+++ b/llarp/iwp/iwp.cpp
@@ -7,19 +7,6 @@ namespace llarp
{
namespace iwp
{
- std::unique_ptr< ILinkLayer >
- NewServerFromRouter(AbstractRouter* r)
- {
- return NewServer(
- r->encryption(), std::bind(&AbstractRouter::rc, r),
- util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, r),
- util::memFn(&AbstractRouter::OnSessionEstablished, r),
- util::memFn(&AbstractRouter::CheckRenegotiateValid, r),
- util::memFn(&AbstractRouter::Sign, r),
- util::memFn(&AbstractRouter::OnConnectTimeout, r),
- util::memFn(&AbstractRouter::SessionClosed, r));
- }
-
std::unique_ptr< ILinkLayer >
NewServer(const SecretKey& enckey, GetRCFunc getrc, LinkMessageHandler h,
SessionEstablishedHandler est, SessionRenegotiateHandler reneg,
diff --git a/llarp/iwp/iwp.hpp b/llarp/iwp/iwp.hpp
index b3c5464ca..e7a10413e 100644
--- a/llarp/iwp/iwp.hpp
+++ b/llarp/iwp/iwp.hpp
@@ -18,9 +18,6 @@ namespace llarp
llarp::SignBufferFunc sign, llarp::TimeoutHandler timeout,
llarp::SessionClosedHandler closed);
- std::unique_ptr< ILinkLayer >
- NewServerFromRouter(AbstractRouter* r);
-
} // namespace iwp
} // namespace llarp
diff --git a/llarp/link/i_link_manager.cpp b/llarp/link/i_link_manager.cpp
new file mode 100644
index 000000000..ffdfcdc38
--- /dev/null
+++ b/llarp/link/i_link_manager.cpp
@@ -0,0 +1 @@
+#include
diff --git a/llarp/link/i_link_manager.hpp b/llarp/link/i_link_manager.hpp
new file mode 100644
index 000000000..7015b8e67
--- /dev/null
+++ b/llarp/link/i_link_manager.hpp
@@ -0,0 +1,85 @@
+#ifndef LLARP_I_LINK_MANAGER_HPP
+#define LLARP_I_LINK_MANAGER_HPP
+
+#include
+#include
+#include
+
+#include
+
+struct llarp_buffer_t;
+
+namespace llarp
+{
+ using Logic_ptr = std::shared_ptr< Logic >;
+
+ struct RouterContact;
+ struct ILinkSession;
+ struct IOutboundSessionMaker;
+ struct RouterID;
+
+ namespace util
+ {
+ struct StatusObject;
+ } // namespace util
+
+ struct ILinkManager
+ {
+ virtual ~ILinkManager() = default;
+
+ virtual LinkLayer_ptr
+ GetCompatibleLink(const RouterContact &rc) const = 0;
+
+ virtual IOutboundSessionMaker *
+ GetSessionMaker() const = 0;
+
+ virtual bool
+ SendTo(const RouterID &remote, const llarp_buffer_t &buf) = 0;
+
+ virtual bool
+ HasSessionTo(const RouterID &remote) const = 0;
+
+ virtual void
+ PumpLinks() = 0;
+
+ virtual void
+ AddLink(LinkLayer_ptr link, bool inbound = false) = 0;
+
+ virtual bool
+ StartLinks(Logic_ptr logic) = 0;
+
+ virtual void
+ Stop() = 0;
+
+ virtual void
+ PersistSessionUntil(const RouterID &remote, llarp_time_t until) = 0;
+
+ virtual void
+ ForEachPeer(std::function< void(const ILinkSession *, bool) > visit,
+ bool randomize = false) const = 0;
+
+ virtual void
+ ForEachPeer(std::function< void(ILinkSession *) > visit) = 0;
+
+ virtual void
+ ForEachInboundLink(std::function< void(LinkLayer_ptr) > visit) const = 0;
+
+ virtual size_t
+ NumberOfConnectedRouters() const = 0;
+
+ virtual size_t
+ NumberOfConnectedClients() const = 0;
+
+ virtual bool
+ GetRandomConnectedRouter(RouterContact &router) const = 0;
+
+ virtual void
+ CheckPersistingSessions(llarp_time_t now) = 0;
+
+ virtual util::StatusObject
+ ExtractStatus() const = 0;
+ };
+
+} // namespace llarp
+
+#endif // LLARP_I_LINK_MANAGER_HPP
diff --git a/llarp/link/link_manager.cpp b/llarp/link/link_manager.cpp
new file mode 100644
index 000000000..d62e6c2cf
--- /dev/null
+++ b/llarp/link/link_manager.cpp
@@ -0,0 +1,362 @@
+#include
+
+#include
+#include
+
+#include
+#include
+
+namespace llarp
+{
+ LinkLayer_ptr
+ LinkManager::GetCompatibleLink(const RouterContact &rc) const
+ {
+ if(stopping)
+ return nullptr;
+
+ for(auto &link : outboundLinks)
+ {
+ // TODO: may want to add some memory of session failures for a given
+ // router on a given link and not return that link here for a
+ // duration
+ if(!link->IsCompatable(rc))
+ continue;
+
+ return link;
+ }
+
+ return nullptr;
+ }
+
+ IOutboundSessionMaker *
+ LinkManager::GetSessionMaker() const
+ {
+ return _sessionMaker;
+ }
+
+ bool
+ LinkManager::SendTo(const RouterID &remote, const llarp_buffer_t &buf)
+ {
+ if(stopping)
+ return false;
+
+ auto link = GetLinkWithSessionTo(remote);
+ if(link == nullptr)
+ {
+ return false;
+ }
+
+ return link->SendTo(remote, buf);
+ }
+
+ bool
+ LinkManager::HasSessionTo(const RouterID &remote) const
+ {
+ return GetLinkWithSessionTo(remote) != nullptr;
+ }
+
+ void
+ LinkManager::PumpLinks()
+ {
+ for(const auto &link : inboundLinks)
+ {
+ link->Pump();
+ }
+ for(const auto &link : outboundLinks)
+ {
+ link->Pump();
+ }
+ }
+
+ void
+ LinkManager::AddLink(LinkLayer_ptr link, bool inbound)
+ {
+ util::Lock l(&_mutex);
+
+ if(inbound)
+ {
+ inboundLinks.emplace(link);
+ }
+ else
+ {
+ outboundLinks.emplace(link);
+ }
+ }
+
+ bool
+ LinkManager::StartLinks(Logic_ptr logic)
+ {
+ LogInfo("starting ", outboundLinks.size(), " outbound links");
+ for(const auto &link : outboundLinks)
+ {
+ if(!link->Start(logic))
+ {
+ LogWarn("outbound link '", link->Name(), "' failed to start");
+ return false;
+ }
+ LogDebug("Outbound Link ", link->Name(), " started");
+ }
+
+ if(inboundLinks.size())
+ {
+ LogInfo("starting ", inboundLinks.size(), " inbound links");
+ for(const auto &link : inboundLinks)
+ {
+ if(!link->Start(logic))
+ {
+ LogWarn("Link ", link->Name(), " failed to start");
+ return false;
+ }
+ LogDebug("Inbound Link ", link->Name(), " started");
+ }
+ }
+
+ return true;
+ }
+
+ void
+ LinkManager::Stop()
+ {
+ if(stopping)
+ {
+ return;
+ }
+
+ util::Lock l(&_mutex);
+
+ LogInfo("stopping links");
+ stopping = true;
+
+ for(const auto &link : outboundLinks)
+ link->Stop();
+ for(const auto &link : inboundLinks)
+ link->Stop();
+ }
+
+ void
+ LinkManager::PersistSessionUntil(const RouterID &remote, llarp_time_t until)
+ {
+ if(stopping)
+ return;
+
+ util::Lock l(&_mutex);
+
+ m_PersistingSessions[remote] =
+ std::max(until, m_PersistingSessions[remote]);
+ LogDebug("persist session to ", remote, " until ",
+ m_PersistingSessions[remote]);
+ }
+
+ void
+ LinkManager::ForEachPeer(
+ std::function< void(const ILinkSession *, bool) > visit,
+ bool randomize) const
+ {
+ if(stopping)
+ return;
+
+ for(const auto &link : outboundLinks)
+ {
+ link->ForEachSession(
+ [visit](const ILinkSession *peer) { visit(peer, true); }, randomize);
+ }
+ for(const auto &link : inboundLinks)
+ {
+ link->ForEachSession(
+ [visit](const ILinkSession *peer) { visit(peer, false); }, randomize);
+ }
+ }
+
+ void
+ LinkManager::ForEachPeer(std::function< void(ILinkSession *) > visit)
+ {
+ if(stopping)
+ return;
+
+ for(const auto &link : outboundLinks)
+ {
+ link->ForEachSession([visit](ILinkSession *peer) { visit(peer); });
+ }
+ for(const auto &link : inboundLinks)
+ {
+ link->ForEachSession([visit](ILinkSession *peer) { visit(peer); });
+ }
+ }
+
+ void
+ LinkManager::ForEachInboundLink(
+ std::function< void(LinkLayer_ptr) > visit) const
+ {
+ for(const auto &link : inboundLinks)
+ {
+ visit(link);
+ }
+ }
+
+ size_t
+ LinkManager::NumberOfConnectedRouters() const
+ {
+ std::set< RouterID > connectedRouters;
+
+ auto fn = [&connectedRouters](const ILinkSession *session, bool) {
+ if(session->IsEstablished())
+ {
+ const RouterContact rc(session->GetRemoteRC());
+ if(rc.IsPublicRouter())
+ {
+ connectedRouters.insert(rc.pubkey);
+ }
+ }
+ };
+
+ ForEachPeer(fn);
+
+ return connectedRouters.size();
+ }
+
+ size_t
+ LinkManager::NumberOfConnectedClients() const
+ {
+ std::set< RouterID > connectedClients;
+
+ auto fn = [&connectedClients](const ILinkSession *session, bool) {
+ if(session->IsEstablished())
+ {
+ const RouterContact rc(session->GetRemoteRC());
+ if(!rc.IsPublicRouter())
+ {
+ connectedClients.insert(rc.pubkey);
+ }
+ }
+ };
+
+ ForEachPeer(fn);
+
+ return connectedClients.size();
+ }
+
+ bool
+ LinkManager::GetRandomConnectedRouter(RouterContact &router) const
+ {
+ std::unordered_map< RouterID, RouterContact, RouterID::Hash >
+ connectedRouters;
+
+ ForEachPeer(
+ [&connectedRouters](const ILinkSession *peer, bool unused) {
+ (void)unused;
+ connectedRouters[peer->GetPubKey()] = peer->GetRemoteRC();
+ },
+ false);
+
+ const auto sz = connectedRouters.size();
+ if(sz)
+ {
+ auto itr = connectedRouters.begin();
+ if(sz > 1)
+ {
+ std::advance(itr, randint() % sz);
+ }
+
+ router = itr->second;
+
+ return true;
+ }
+
+ return false;
+ }
+
+ void
+ LinkManager::CheckPersistingSessions(llarp_time_t now)
+ {
+ if(stopping)
+ return;
+
+ std::vector< RouterID > sessionsNeeded;
+
+ {
+ util::Lock l(&_mutex);
+
+ auto itr = m_PersistingSessions.begin();
+ while(itr != m_PersistingSessions.end())
+ {
+ auto link = GetLinkWithSessionTo(itr->first);
+ if(now < itr->second)
+ {
+ if(link)
+ {
+ LogDebug("keepalive to ", itr->first);
+ link->KeepAliveSessionTo(itr->first);
+ }
+ else
+ {
+ sessionsNeeded.push_back(itr->first);
+ }
+ ++itr;
+ }
+ else
+ {
+ const RouterID r(itr->first);
+ LogInfo("commit to ", r, " expired");
+ itr = m_PersistingSessions.erase(itr);
+ }
+ }
+ }
+
+ for(const auto &router : sessionsNeeded)
+ {
+ _sessionMaker->CreateSessionTo(router, nullptr);
+ }
+ }
+
+ util::StatusObject
+ LinkManager::ExtractStatus() const
+ {
+ std::vector< util::StatusObject > ob_links, ib_links;
+ std::transform(inboundLinks.begin(), inboundLinks.end(),
+ std::back_inserter(ib_links),
+ [](const auto &link) -> util::StatusObject {
+ return link->ExtractStatus();
+ });
+ std::transform(outboundLinks.begin(), outboundLinks.end(),
+ std::back_inserter(ob_links),
+ [](const auto &link) -> util::StatusObject {
+ return link->ExtractStatus();
+ });
+
+ util::StatusObject obj{{"outbound", ob_links}, {"inbound", ib_links}};
+
+ return obj;
+ }
+
+ void
+ LinkManager::Init(IOutboundSessionMaker *sessionMaker)
+ {
+ stopping = false;
+ _sessionMaker = sessionMaker;
+ }
+
+ LinkLayer_ptr
+ LinkManager::GetLinkWithSessionTo(const RouterID &remote) const
+ {
+ if(stopping)
+ return nullptr;
+
+ for(const auto &link : inboundLinks)
+ {
+ if(link->HasSessionTo(remote))
+ {
+ return link;
+ }
+ }
+ for(const auto &link : outboundLinks)
+ {
+ if(link->HasSessionTo(remote))
+ {
+ return link;
+ }
+ }
+
+ return nullptr;
+ }
+
+} // namespace llarp
diff --git a/llarp/link/link_manager.hpp b/llarp/link/link_manager.hpp
new file mode 100644
index 000000000..ef6aa5c30
--- /dev/null
+++ b/llarp/link/link_manager.hpp
@@ -0,0 +1,100 @@
+#ifndef LLARP_LINK_MANAGER_HPP
+#define LLARP_LINK_MANAGER_HPP
+
+#include
+
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+namespace llarp
+{
+ struct IRouterContactManager;
+
+ struct LinkManager final : public ILinkManager
+ {
+ public:
+ ~LinkManager() = default;
+
+ LinkLayer_ptr
+ GetCompatibleLink(const RouterContact &rc) const override;
+
+ IOutboundSessionMaker *
+ GetSessionMaker() const override;
+
+ bool
+ SendTo(const RouterID &remote, const llarp_buffer_t &buf) override;
+
+ bool
+ HasSessionTo(const RouterID &remote) const override;
+
+ void
+ PumpLinks() override;
+
+ void
+ AddLink(LinkLayer_ptr link, bool inbound = false) override;
+
+ bool
+ StartLinks(Logic_ptr logic) override;
+
+ void
+ Stop() override;
+
+ void
+ PersistSessionUntil(const RouterID &remote, llarp_time_t until) override;
+
+ void
+ ForEachPeer(std::function< void(const ILinkSession *, bool) > visit,
+ bool randomize = false) const override;
+
+ void
+ ForEachPeer(std::function< void(ILinkSession *) > visit) override;
+
+ void
+ ForEachInboundLink(
+ std::function< void(LinkLayer_ptr) > visit) const override;
+
+ size_t
+ NumberOfConnectedRouters() const override;
+
+ size_t
+ NumberOfConnectedClients() const override;
+
+ bool
+ GetRandomConnectedRouter(RouterContact &router) const override;
+
+ void
+ CheckPersistingSessions(llarp_time_t now) override;
+
+ virtual util::StatusObject
+ ExtractStatus() const override;
+
+ void
+ Init(IOutboundSessionMaker *sessionMaker);
+
+ private:
+ LinkLayer_ptr
+ GetLinkWithSessionTo(const RouterID &remote) const;
+
+ std::atomic< bool > stopping;
+ mutable util::Mutex _mutex; // protects m_PersistingSessions
+
+ using LinkSet = std::set< LinkLayer_ptr, ComparePtr< LinkLayer_ptr > >;
+
+ LinkSet outboundLinks;
+ LinkSet inboundLinks;
+
+ // sessions to persist -> timestamp to end persist at
+ std::unordered_map< RouterID, llarp_time_t, RouterID::Hash >
+ m_PersistingSessions GUARDED_BY(_mutex);
+
+ IOutboundSessionMaker *_sessionMaker;
+ };
+
+} // namespace llarp
+
+#endif // LLARP_LINK_MANAGER_HPP
diff --git a/llarp/nodedb.cpp b/llarp/nodedb.cpp
index 748e62b94..9d7134258 100644
--- a/llarp/nodedb.cpp
+++ b/llarp/nodedb.cpp
@@ -151,6 +151,13 @@ llarp_nodedb::UpdateAsyncIfNewer(llarp::RouterContact rc,
InsertAsync(rc, logic, completionHandler);
return true;
}
+ else if(itr != entries.end())
+ {
+ // insertion time is set on...insertion. But it should be updated here
+ // even if there is no insertion of a new RC, to show that the existing one
+ // is not "stale"
+ itr->second.inserted = llarp::time_now_ms();
+ }
return false;
}
@@ -189,6 +196,8 @@ llarp_nodedb::Insert(const llarp::RouterContact &rc)
if(itr != entries.end())
entries.erase(itr);
entries.emplace(rc.pubkey.as_array(), rc);
+ LogInfo("Added or updated RC for ", llarp::RouterID(rc.pubkey),
+ " to nodedb. Current nodedb count is: ", entries.size());
}
return true;
}
@@ -313,6 +322,26 @@ llarp_nodedb::VisitInsertedBefore(
}
}
+void
+llarp_nodedb::RemoveStaleRCs(const std::set< llarp::RouterID > &keep,
+ llarp_time_t cutoff)
+{
+ std::set< llarp::RouterID > removeStale;
+ // remove stale routers
+ VisitInsertedBefore(
+ [&](const llarp::RouterContact &rc) {
+ if(keep.find(rc.pubkey) != keep.end())
+ return;
+ LogInfo("removing stale router: ", llarp::RouterID(rc.pubkey));
+ removeStale.insert(rc.pubkey);
+ },
+ cutoff);
+
+ RemoveIf([&removeStale](const llarp::RouterContact &rc) -> bool {
+ return removeStale.count(rc.pubkey) > 0;
+ });
+}
+
/*
bool
llarp_nodedb::Save()
diff --git a/llarp/nodedb.hpp b/llarp/nodedb.hpp
index 582298a20..29d29faa7 100644
--- a/llarp/nodedb.hpp
+++ b/llarp/nodedb.hpp
@@ -60,7 +60,7 @@ struct llarp_nodedb
struct NetDBEntry
{
const llarp::RouterContact rc;
- const llarp_time_t inserted;
+ llarp_time_t inserted;
NetDBEntry(const llarp::RouterContact &data);
};
@@ -140,6 +140,9 @@ struct llarp_nodedb
VisitInsertedBefore(std::function< void(const llarp::RouterContact &) > visit,
llarp_time_t insertedAfter) LOCKS_EXCLUDED(access);
+ void
+ RemoveStaleRCs(const std::set< llarp::RouterID > &keep, llarp_time_t cutoff);
+
size_t
num_loaded() const LOCKS_EXCLUDED(access);
diff --git a/llarp/router/abstractrouter.cpp b/llarp/router/abstractrouter.cpp
index 9bb8be2f8..5f70ceeb7 100644
--- a/llarp/router/abstractrouter.cpp
+++ b/llarp/router/abstractrouter.cpp
@@ -3,10 +3,6 @@
namespace llarp
{
- AbstractRouter::~AbstractRouter()
- {
- }
-
void
AbstractRouter::EnsureRouter(RouterID router, RouterLookupHandler handler)
{
diff --git a/llarp/router/abstractrouter.hpp b/llarp/router/abstractrouter.hpp
index 654d4623d..df3bfbbda 100644
--- a/llarp/router/abstractrouter.hpp
+++ b/llarp/router/abstractrouter.hpp
@@ -24,6 +24,10 @@ namespace llarp
struct Profiling;
struct SecretKey;
struct Signature;
+ struct IOutboundMessageHandler;
+ struct IOutboundSessionMaker;
+ struct ILinkManager;
+ struct I_RCLookupHandler;
namespace exit
{
@@ -52,10 +56,7 @@ namespace llarp
struct AbstractRouter
{
- virtual ~AbstractRouter() = 0;
-
- virtual bool
- OnSessionEstablished(ILinkSession *) = 0;
+ virtual ~AbstractRouter() = default;
virtual bool
HandleRecvLinkMessageBuffer(ILinkSession *from,
@@ -106,11 +107,23 @@ namespace llarp
virtual const service::Context &
hiddenServiceContext() const = 0;
+ virtual IOutboundMessageHandler &
+ outboundMessageHandler() = 0;
+
+ virtual IOutboundSessionMaker &
+ outboundSessionMaker() = 0;
+
+ virtual ILinkManager &
+ linkManager() = 0;
+
+ virtual I_RCLookupHandler &
+ rcLookupHandler() = 0;
+
virtual bool
Sign(Signature &sig, const llarp_buffer_t &buf) const = 0;
virtual bool
- Configure(Config *conf) = 0;
+ Configure(Config *conf, llarp_nodedb *nodedb) = 0;
virtual bool
Run(struct llarp_nodedb *nodedb) = 0;
@@ -129,9 +142,6 @@ namespace llarp
virtual const byte_t *
pubkey() const = 0;
- virtual void
- OnConnectTimeout(ILinkSession *session) = 0;
-
/// connect to N random routers
virtual void
ConnectToRandomRouters(int N) = 0;
@@ -219,11 +229,6 @@ namespace llarp
virtual bool
HasSessionTo(const RouterID &router) const = 0;
- /// return true if we are currently looking up this router either directly
- /// or via an anonymous endpoint
- virtual bool
- HasPendingRouterLookup(const RouterID &router) const = 0;
-
virtual util::StatusObject
ExtractStatus() const = 0;
diff --git a/llarp/router/i_outbound_message_handler.cpp b/llarp/router/i_outbound_message_handler.cpp
new file mode 100644
index 000000000..6a4b6a25f
--- /dev/null
+++ b/llarp/router/i_outbound_message_handler.cpp
@@ -0,0 +1 @@
+#include
diff --git a/llarp/router/i_outbound_message_handler.hpp b/llarp/router/i_outbound_message_handler.hpp
new file mode 100644
index 000000000..4789c1bf9
--- /dev/null
+++ b/llarp/router/i_outbound_message_handler.hpp
@@ -0,0 +1,43 @@
+#ifndef LLARP_ROUTER_I_OUTBOUND_MESSAGE_HANDLER_HPP
+#define LLARP_ROUTER_I_OUTBOUND_MESSAGE_HANDLER_HPP
+
+#include
+#include
+
+namespace llarp
+{
+ enum class SendStatus
+ {
+ Success,
+ Timeout,
+ NoLink,
+ InvalidRouter,
+ RouterNotFound,
+ Congestion
+ };
+
+ struct ILinkMessage;
+ struct RouterID;
+
+ namespace util
+ {
+ struct StatusObject;
+ }
+
+ using SendStatusHandler = std::function< void(SendStatus) >;
+
+ struct IOutboundMessageHandler
+ {
+ virtual ~IOutboundMessageHandler() = default;
+
+ virtual bool
+ QueueMessage(const RouterID &remote, const ILinkMessage *msg,
+ SendStatusHandler callback) = 0;
+
+ virtual util::StatusObject
+ ExtractStatus() const = 0;
+ };
+
+} // namespace llarp
+
+#endif // LLARP_ROUTER_I_OUTBOUND_MESSAGE_HANDLER_HPP
diff --git a/llarp/router/i_outbound_session_maker.cpp b/llarp/router/i_outbound_session_maker.cpp
new file mode 100644
index 000000000..35243e594
--- /dev/null
+++ b/llarp/router/i_outbound_session_maker.cpp
@@ -0,0 +1 @@
+#include
diff --git a/llarp/router/i_outbound_session_maker.hpp b/llarp/router/i_outbound_session_maker.hpp
new file mode 100644
index 000000000..fdebf0839
--- /dev/null
+++ b/llarp/router/i_outbound_session_maker.hpp
@@ -0,0 +1,59 @@
+#ifndef LLARP_ROUTER_I_OUTBOUND_SESSION_MAKER_HPP
+#define LLARP_ROUTER_I_OUTBOUND_SESSION_MAKER_HPP
+
+#include
+
+#include
+
+namespace llarp
+{
+ namespace util
+ {
+ struct StatusObject;
+ } // namespace util
+
+ struct ILinkSession;
+ struct RouterID;
+ struct RouterContact;
+
+ enum class SessionResult
+ {
+ Establish,
+ Timeout,
+ RouterNotFound,
+ InvalidRouter,
+ NoLink
+ };
+
+ using RouterCallback =
+ std::function< void(const RouterID &, const SessionResult) >;
+
+ struct IOutboundSessionMaker
+ {
+ virtual ~IOutboundSessionMaker() = default;
+
+ virtual bool
+ OnSessionEstablished(ILinkSession *session) = 0;
+
+ virtual void
+ OnConnectTimeout(ILinkSession *session) = 0;
+
+ virtual void
+ CreateSessionTo(const RouterID &router, RouterCallback on_result) = 0;
+
+ virtual void
+ CreateSessionTo(const RouterContact &rc, RouterCallback on_result) = 0;
+
+ virtual bool
+ HavePendingSessionTo(const RouterID &router) const = 0;
+
+ virtual void
+ ConnectToRandomRouters(int numDesired, llarp_time_t now) = 0;
+
+ virtual util::StatusObject
+ ExtractStatus() const = 0;
+ };
+
+} // namespace llarp
+
+#endif // LLARP_ROUTER_I_OUTBOUND_SESSION_MAKER_HPP
diff --git a/llarp/router/i_rc_lookup_handler.cpp b/llarp/router/i_rc_lookup_handler.cpp
new file mode 100644
index 000000000..5582e4c83
--- /dev/null
+++ b/llarp/router/i_rc_lookup_handler.cpp
@@ -0,0 +1 @@
+#include
diff --git a/llarp/router/i_rc_lookup_handler.hpp b/llarp/router/i_rc_lookup_handler.hpp
new file mode 100644
index 000000000..33ef5581f
--- /dev/null
+++ b/llarp/router/i_rc_lookup_handler.hpp
@@ -0,0 +1,63 @@
+#ifndef LLARP_I_RC_LOOKUP_HANDLER_HPP
+#define LLARP_I_RC_LOOKUP_HANDLER_HPP
+
+#include
+#include
+
+#include
+#include
+#include
+
+namespace llarp
+{
+ struct RouterContact;
+
+ enum class RCRequestResult
+ {
+ Success,
+ InvalidRouter,
+ RouterNotFound,
+ BadRC
+ };
+
+ using RCRequestCallback = std::function< void(
+ const RouterID &, const RouterContact *const, const RCRequestResult) >;
+
+ struct I_RCLookupHandler
+ {
+ virtual ~I_RCLookupHandler() = default;
+
+ virtual void
+ AddValidRouter(const RouterID &router) = 0;
+
+ virtual void
+ RemoveValidRouter(const RouterID &router) = 0;
+
+ virtual void
+ SetRouterWhitelist(const std::vector< RouterID > &routers) = 0;
+
+ virtual void
+ GetRC(const RouterID &router, RCRequestCallback callback) = 0;
+
+ virtual bool
+ RemoteIsAllowed(const RouterID &remote) const = 0;
+
+ virtual bool
+ CheckRC(const RouterContact &rc) const = 0;
+
+ virtual bool
+ GetRandomWhitelistRouter(RouterID &router) const = 0;
+
+ virtual bool
+ CheckRenegotiateValid(RouterContact newrc, RouterContact oldrc) = 0;
+
+ virtual void
+ PeriodicUpdate(llarp_time_t now) = 0;
+
+ virtual void
+ ExploreNetwork() = 0;
+ };
+
+} // namespace llarp
+
+#endif // LLARP_I_RC_LOOKUP_HANDLER_HPP
diff --git a/llarp/router/outbound_message_handler.cpp b/llarp/router/outbound_message_handler.cpp
new file mode 100644
index 000000000..fda79f737
--- /dev/null
+++ b/llarp/router/outbound_message_handler.cpp
@@ -0,0 +1,224 @@
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+
+namespace llarp
+{
+ bool
+ OutboundMessageHandler::QueueMessage(const RouterID &remote,
+ const ILinkMessage *msg,
+ SendStatusHandler callback)
+ {
+ std::array< byte_t, MAX_LINK_MSG_SIZE > linkmsg_buffer;
+ llarp_buffer_t buf(linkmsg_buffer);
+
+ if(!EncodeBuffer(msg, buf))
+ {
+ return false;
+ }
+
+ Message message;
+ message.first.reserve(buf.sz);
+ message.second = callback;
+
+ std::copy_n(buf.base, buf.sz, message.first.data());
+
+ if(SendIfSession(remote, message))
+ {
+ return true;
+ }
+
+ bool shouldCreateSession = false;
+ {
+ util::Lock l(&_mutex);
+
+ // create queue for if it doesn't exist, and get iterator
+ auto itr_pair = outboundMessageQueue.emplace(remote, MessageQueue());
+
+ itr_pair.first->second.push_back(std::move(message));
+
+ shouldCreateSession = itr_pair.second;
+ }
+
+ if(shouldCreateSession)
+ {
+ QueueSessionCreation(remote);
+ }
+
+ return true;
+ }
+
+ // TODO: this
+ util::StatusObject
+ OutboundMessageHandler::ExtractStatus() const
+ {
+ util::StatusObject status{};
+ return status;
+ }
+
+ void
+ OutboundMessageHandler::Init(ILinkManager *linkManager,
+ std::shared_ptr< Logic > logic)
+ {
+ _linkManager = linkManager;
+ _logic = logic;
+ }
+
+ void
+ OutboundMessageHandler::OnSessionEstablished(const RouterID &router)
+ {
+ FinalizeRequest(router, SendStatus::Success);
+ }
+
+ void
+ OutboundMessageHandler::OnConnectTimeout(const RouterID &router)
+ {
+ FinalizeRequest(router, SendStatus::Timeout);
+ }
+
+ void
+ OutboundMessageHandler::OnRouterNotFound(const RouterID &router)
+ {
+ FinalizeRequest(router, SendStatus::RouterNotFound);
+ }
+
+ void
+ OutboundMessageHandler::OnInvalidRouter(const RouterID &router)
+ {
+ FinalizeRequest(router, SendStatus::InvalidRouter);
+ }
+
+ void
+ OutboundMessageHandler::OnNoLink(const RouterID &router)
+ {
+ FinalizeRequest(router, SendStatus::NoLink);
+ }
+
+ void
+ OutboundMessageHandler::OnSessionResult(const RouterID &router,
+ const SessionResult result)
+ {
+ switch(result)
+ {
+ case SessionResult::Establish:
+ OnSessionEstablished(router);
+ break;
+ case SessionResult::Timeout:
+ OnConnectTimeout(router);
+ break;
+ case SessionResult::RouterNotFound:
+ OnRouterNotFound(router);
+ break;
+ case SessionResult::InvalidRouter:
+ OnInvalidRouter(router);
+ break;
+ case SessionResult::NoLink:
+ OnNoLink(router);
+ break;
+ default:
+ LogError("Impossible situation: enum class value out of bounds.");
+ std::abort();
+ break;
+ }
+ }
+
+ void
+ OutboundMessageHandler::DoCallback(SendStatusHandler callback,
+ SendStatus status)
+ {
+ if(callback)
+ {
+ auto func = std::bind(callback, status);
+ _logic->queue_func(func);
+ }
+ }
+
+ void
+ OutboundMessageHandler::QueueSessionCreation(const RouterID &remote)
+ {
+ auto fn = util::memFn(&OutboundMessageHandler::OnSessionResult, this);
+ _linkManager->GetSessionMaker()->CreateSessionTo(remote, fn);
+ }
+
+ bool
+ OutboundMessageHandler::EncodeBuffer(const ILinkMessage *msg,
+ llarp_buffer_t &buf)
+ {
+ if(!msg->BEncode(&buf))
+ {
+ LogWarn("failed to encode outbound message, buffer size left: ",
+ buf.size_left());
+ return false;
+ }
+ // set size of message
+ buf.sz = buf.cur - buf.base;
+ buf.cur = buf.base;
+
+ return true;
+ }
+
+ bool
+ OutboundMessageHandler::Send(const RouterID &remote, const Message &msg)
+ {
+ llarp_buffer_t buf(msg.first);
+ if(_linkManager->SendTo(remote, buf))
+ {
+ DoCallback(msg.second, SendStatus::Success);
+ return true;
+ }
+ DoCallback(msg.second, SendStatus::Congestion);
+ return false;
+ }
+
+ bool
+ OutboundMessageHandler::SendIfSession(const RouterID &remote,
+ const Message &msg)
+ {
+ if(_linkManager->HasSessionTo(remote))
+ {
+ return Send(remote, msg);
+ }
+ return false;
+ }
+
+ void
+ OutboundMessageHandler::FinalizeRequest(const RouterID &router,
+ SendStatus status)
+ {
+ MessageQueue movedMessages;
+ {
+ util::Lock l(&_mutex);
+ auto itr = outboundMessageQueue.find(router);
+
+ if(itr == outboundMessageQueue.end())
+ {
+ return;
+ }
+
+ movedMessages.splice(movedMessages.begin(), itr->second);
+
+ outboundMessageQueue.erase(itr);
+ }
+
+ for(const auto &msg : movedMessages)
+ {
+ if(status == SendStatus::Success)
+ {
+ Send(router, msg);
+ }
+ else
+ {
+ DoCallback(msg.second, status);
+ }
+ }
+ }
+
+} // namespace llarp
diff --git a/llarp/router/outbound_message_handler.hpp b/llarp/router/outbound_message_handler.hpp
new file mode 100644
index 000000000..c29a201e0
--- /dev/null
+++ b/llarp/router/outbound_message_handler.hpp
@@ -0,0 +1,88 @@
+#ifndef LLARP_ROUTER_OUTBOUND_MESSAGE_HANDLER_HPP
+#define LLARP_ROUTER_OUTBOUND_MESSAGE_HANDLER_HPP
+
+#include
+
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+struct llarp_buffer_t;
+
+namespace llarp
+{
+ struct ILinkManager;
+ struct Logic;
+ enum class SessionResult;
+
+ struct OutboundMessageHandler final : public IOutboundMessageHandler
+ {
+ public:
+ ~OutboundMessageHandler() = default;
+
+ bool
+ QueueMessage(const RouterID &remote, const ILinkMessage *msg,
+ SendStatusHandler callback) override;
+
+ util::StatusObject
+ ExtractStatus() const override;
+
+ void
+ Init(ILinkManager *linkManager, std::shared_ptr< Logic > logic);
+
+ private:
+ using Message = std::pair< std::vector< byte_t >, SendStatusHandler >;
+ using MessageQueue = std::list< Message >;
+
+ void
+ OnSessionEstablished(const RouterID &router);
+
+ void
+ OnConnectTimeout(const RouterID &router);
+
+ void
+ OnRouterNotFound(const RouterID &router);
+
+ void
+ OnInvalidRouter(const RouterID &router);
+
+ void
+ OnNoLink(const RouterID &router);
+
+ void
+ OnSessionResult(const RouterID &router, const SessionResult result);
+
+ void
+ DoCallback(SendStatusHandler callback, SendStatus status);
+
+ void
+ QueueSessionCreation(const RouterID &remote);
+
+ bool
+ EncodeBuffer(const ILinkMessage *msg, llarp_buffer_t &buf);
+
+ bool
+ Send(const RouterID &remote, const Message &msg);
+
+ bool
+ SendIfSession(const RouterID &remote, const Message &msg);
+
+ void
+ FinalizeRequest(const RouterID &router, SendStatus status);
+
+ mutable util::Mutex _mutex; // protects outboundMessageQueue
+
+ std::unordered_map< RouterID, MessageQueue, RouterID::Hash >
+ outboundMessageQueue GUARDED_BY(_mutex);
+
+ ILinkManager *_linkManager;
+ std::shared_ptr< Logic > _logic;
+ };
+
+} // namespace llarp
+
+#endif // LLARP_ROUTER_OUTBOUND_MESSAGE_HANDLER_HPP
diff --git a/llarp/router/outbound_session_maker.cpp b/llarp/router/outbound_session_maker.cpp
new file mode 100644
index 000000000..0fc3b7171
--- /dev/null
+++ b/llarp/router/outbound_session_maker.cpp
@@ -0,0 +1,317 @@
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace llarp
+{
+ struct PendingSession
+ {
+ // TODO: add session establish status metadata, e.g. num retries
+
+ const RouterContact rc;
+ LinkLayer_ptr link;
+
+ size_t attemptCount = 0;
+
+ PendingSession(const RouterContact &rc, LinkLayer_ptr link)
+ : rc(rc), link(link)
+ {
+ }
+ };
+
+ bool
+ OutboundSessionMaker::OnSessionEstablished(ILinkSession *session)
+ {
+ // TODO: do we want to keep it
+
+ const auto router = RouterID(session->GetPubKey());
+
+ const std::string remoteType =
+ session->GetRemoteRC().IsPublicRouter() ? "router" : "client";
+ LogInfo("session with ", remoteType, " [", router, "] established");
+
+ if(not _rcLookup->RemoteIsAllowed(router))
+ {
+ FinalizeRequest(router, SessionResult::InvalidRouter);
+ return false;
+ }
+
+ auto func = std::bind(&OutboundSessionMaker::VerifyRC, this,
+ session->GetRemoteRC());
+ _threadpool->addJob(func);
+
+ return true;
+ }
+
+ void
+ OutboundSessionMaker::OnConnectTimeout(ILinkSession *session)
+ {
+ // TODO: retry/num attempts
+
+ LogWarn("Session establish attempt to ", RouterID(session->GetPubKey()),
+ " timed out.");
+ FinalizeRequest(session->GetPubKey(), SessionResult::Timeout);
+ }
+
+ void
+ OutboundSessionMaker::CreateSessionTo(const RouterID &router,
+ RouterCallback on_result)
+ {
+ if(on_result)
+ {
+ util::Lock l(&_mutex);
+
+ auto itr_pair = pendingCallbacks.emplace(router, CallbacksQueue{});
+ itr_pair.first->second.push_back(on_result);
+ }
+
+ if(HavePendingSessionTo(router))
+ {
+ return;
+ }
+
+ CreatePendingSession(router);
+
+ LogDebug("Creating session establish attempt to ", router, " .");
+
+ auto fn = util::memFn(&OutboundSessionMaker::OnRouterContactResult, this);
+
+ _rcLookup->GetRC(router, fn);
+ }
+
+ void
+ OutboundSessionMaker::CreateSessionTo(const RouterContact &rc,
+ RouterCallback on_result)
+ {
+ if(on_result)
+ {
+ util::Lock l(&_mutex);
+
+ auto itr_pair = pendingCallbacks.emplace(rc.pubkey, CallbacksQueue{});
+ itr_pair.first->second.push_back(on_result);
+ }
+
+ if(not HavePendingSessionTo(rc.pubkey))
+ {
+ LogDebug("Creating session establish attempt to ", rc.pubkey, " .");
+ CreatePendingSession(rc.pubkey);
+ }
+
+ GotRouterContact(rc.pubkey, rc);
+ }
+
+ bool
+ OutboundSessionMaker::HavePendingSessionTo(const RouterID &router) const
+ {
+ util::Lock l(&_mutex);
+ return pendingSessions.find(router) != pendingSessions.end();
+ }
+
+ void
+ OutboundSessionMaker::ConnectToRandomRouters(int numDesired, llarp_time_t now)
+ {
+ int remainingDesired = numDesired;
+
+ _nodedb->visit([&](const RouterContact &other) -> bool {
+ // check if we really remainingDesired to
+ if(other.ExpiresSoon(now, 30000)) // TODO: make delta configurable
+ {
+ return remainingDesired > 0;
+ }
+ if(!_rcLookup->RemoteIsAllowed(other.pubkey))
+ {
+ return remainingDesired > 0;
+ }
+ if(randint() % 2 == 0
+ && !(_linkManager->HasSessionTo(other.pubkey)
+ || HavePendingSessionTo(other.pubkey)))
+ {
+ CreateSessionTo(other, nullptr);
+ --remainingDesired;
+ }
+ return remainingDesired > 0;
+ });
+ LogDebug("connecting to ", numDesired - remainingDesired, " out of ",
+ numDesired, " random routers");
+ }
+
+ // TODO: this
+ util::StatusObject
+ OutboundSessionMaker::ExtractStatus() const
+ {
+ util::StatusObject status{};
+ return status;
+ }
+
+ void
+ OutboundSessionMaker::Init(
+ ILinkManager *linkManager, I_RCLookupHandler *rcLookup,
+ std::shared_ptr< Logic > logic, llarp_nodedb *nodedb,
+ std::shared_ptr< llarp::thread::ThreadPool > threadpool)
+ {
+ _linkManager = linkManager;
+ _rcLookup = rcLookup;
+ _logic = logic;
+ _nodedb = nodedb;
+ _threadpool = threadpool;
+ }
+
+ void
+ OutboundSessionMaker::DoEstablish(const RouterID &router)
+ {
+ util::ReleasableLock l(&_mutex);
+
+ auto itr = pendingSessions.find(router);
+
+ if(itr == pendingSessions.end())
+ {
+ return;
+ }
+
+ const auto &job = itr->second;
+ if(!job->link->TryEstablishTo(job->rc))
+ {
+ // TODO: maybe different failure type?
+
+ l.Release();
+ FinalizeRequest(router, SessionResult::NoLink);
+ }
+ }
+
+ void
+ OutboundSessionMaker::GotRouterContact(const RouterID &router,
+ const RouterContact &rc)
+ {
+ {
+ util::Lock l(&_mutex);
+
+ // in case other request found RC for this router after this request was
+ // made
+ auto itr = pendingSessions.find(router);
+ if(itr == pendingSessions.end())
+ {
+ return;
+ }
+
+ LinkLayer_ptr link = _linkManager->GetCompatibleLink(rc);
+
+ if(!link)
+ {
+ FinalizeRequest(router, SessionResult::NoLink);
+ return;
+ }
+
+ auto session = std::make_shared< PendingSession >(rc, link);
+
+ itr->second = session;
+ }
+
+ auto fn = std::bind(&OutboundSessionMaker::DoEstablish, this, router);
+ _logic->queue_func(fn);
+ }
+
+ void
+ OutboundSessionMaker::InvalidRouter(const RouterID &router)
+ {
+ FinalizeRequest(router, SessionResult::InvalidRouter);
+ }
+
+ void
+ OutboundSessionMaker::RouterNotFound(const RouterID &router)
+ {
+ FinalizeRequest(router, SessionResult::RouterNotFound);
+ }
+
+ void
+ OutboundSessionMaker::OnRouterContactResult(const RouterID &router,
+ const RouterContact *const rc,
+ const RCRequestResult result)
+ {
+ if(not HavePendingSessionTo(router))
+ {
+ return;
+ }
+
+ switch(result)
+ {
+ case RCRequestResult::Success:
+ if(rc)
+ {
+ GotRouterContact(router, *rc);
+ }
+ else
+ {
+ LogError("RCRequestResult::Success but null rc pointer given");
+ }
+ break;
+ case RCRequestResult::InvalidRouter:
+ InvalidRouter(router);
+ break;
+ case RCRequestResult::RouterNotFound:
+ RouterNotFound(router);
+ break;
+ default:
+ break;
+ }
+ }
+
+ void
+ OutboundSessionMaker::VerifyRC(const RouterContact rc)
+ {
+ if(not _rcLookup->CheckRC(rc))
+ {
+ FinalizeRequest(rc.pubkey, SessionResult::InvalidRouter);
+ return;
+ }
+
+ FinalizeRequest(rc.pubkey, SessionResult::Establish);
+ }
+
+ void
+ OutboundSessionMaker::CreatePendingSession(const RouterID &router)
+ {
+ util::Lock l(&_mutex);
+ pendingSessions.emplace(router, nullptr);
+ }
+
+ void
+ OutboundSessionMaker::FinalizeRequest(const RouterID &router,
+ const SessionResult type)
+ {
+ CallbacksQueue movedCallbacks;
+ {
+ util::Lock l(&_mutex);
+
+ // TODO: Router profiling stuff
+
+ auto itr = pendingCallbacks.find(router);
+
+ if(itr != pendingCallbacks.end())
+ {
+ movedCallbacks.splice(movedCallbacks.begin(), itr->second);
+ pendingCallbacks.erase(itr);
+ }
+ }
+
+ for(const auto &callback : movedCallbacks)
+ {
+ auto func = std::bind(callback, router, type);
+ _logic->queue_func(func);
+ }
+
+ {
+ util::Lock l(&_mutex);
+ pendingSessions.erase(router);
+ }
+ }
+
+} // namespace llarp
diff --git a/llarp/router/outbound_session_maker.hpp b/llarp/router/outbound_session_maker.hpp
new file mode 100644
index 000000000..4329e2e06
--- /dev/null
+++ b/llarp/router/outbound_session_maker.hpp
@@ -0,0 +1,103 @@
+#ifndef LLARP_ROUTER_OUTBOUND_SESSION_MAKER_HPP
+#define LLARP_ROUTER_OUTBOUND_SESSION_MAKER_HPP
+
+#include
+
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+struct llarp_nodedb;
+
+namespace llarp
+{
+ struct PendingSession;
+
+ struct ILinkManager;
+ struct I_RCLookupHandler;
+
+ struct OutboundSessionMaker final : public IOutboundSessionMaker
+ {
+ using CallbacksQueue = std::list< RouterCallback >;
+
+ public:
+ ~OutboundSessionMaker() = default;
+
+ bool
+ OnSessionEstablished(ILinkSession *session) override;
+
+ void
+ OnConnectTimeout(ILinkSession *session) override;
+
+ void
+ CreateSessionTo(const RouterID &router,
+ RouterCallback on_result) /* override */;
+
+ void
+ CreateSessionTo(const RouterContact &rc,
+ RouterCallback on_result) /* override */;
+
+ bool
+ HavePendingSessionTo(const RouterID &router) const override;
+
+ void
+ ConnectToRandomRouters(int numDesired, llarp_time_t now) override;
+
+ util::StatusObject
+ ExtractStatus() const override;
+
+ void
+ Init(ILinkManager *linkManager, I_RCLookupHandler *rcLookup,
+ std::shared_ptr< Logic > logic, llarp_nodedb *nodedb,
+ std::shared_ptr< llarp::thread::ThreadPool > threadpool);
+
+ private:
+ void
+ DoEstablish(const RouterID &router);
+
+ void
+ GotRouterContact(const RouterID &router, const RouterContact &rc);
+
+ void
+ InvalidRouter(const RouterID &router);
+
+ void
+ RouterNotFound(const RouterID &router);
+
+ void
+ OnRouterContactResult(const RouterID &router, const RouterContact *const rc,
+ const RCRequestResult result);
+
+ void
+ VerifyRC(const RouterContact rc);
+
+ void
+ CreatePendingSession(const RouterID &router);
+
+ void
+ FinalizeRequest(const RouterID &router, const SessionResult type);
+
+ mutable util::Mutex _mutex; // protects pendingSessions, pendingCallbacks
+
+ std::unordered_map< RouterID, std::shared_ptr< PendingSession >,
+ RouterID::Hash >
+ pendingSessions GUARDED_BY(_mutex);
+
+ std::unordered_map< RouterID, CallbacksQueue, RouterID::Hash >
+ pendingCallbacks GUARDED_BY(_mutex);
+
+ ILinkManager *_linkManager;
+ I_RCLookupHandler *_rcLookup;
+ std::shared_ptr< Logic > _logic;
+ llarp_nodedb *_nodedb;
+ std::shared_ptr< llarp::thread::ThreadPool > _threadpool;
+ };
+
+} // namespace llarp
+
+#endif // LLARP_ROUTER_OUTBOUND_SESSION_MAKER_HPP
diff --git a/llarp/router/rc_lookup_handler.cpp b/llarp/router/rc_lookup_handler.cpp
new file mode 100644
index 000000000..c6f343dc2
--- /dev/null
+++ b/llarp/router/rc_lookup_handler.cpp
@@ -0,0 +1,337 @@
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+
+namespace llarp
+{
+ void
+ RCLookupHandler::AddValidRouter(const RouterID &router)
+ {
+ util::Lock l(&_mutex);
+ whitelistRouters.insert(router);
+ }
+
+ void
+ RCLookupHandler::RemoveValidRouter(const RouterID &router)
+ {
+ util::Lock l(&_mutex);
+ whitelistRouters.erase(router);
+ }
+
+ void
+ RCLookupHandler::SetRouterWhitelist(const std::vector< RouterID > &routers)
+ {
+ util::Lock l(&_mutex);
+
+ whitelistRouters.clear();
+ for(auto &router : routers)
+ {
+ whitelistRouters.emplace(router);
+ }
+
+ LogInfo("lokinet service node list now has ", whitelistRouters.size(),
+ " routers");
+ }
+
+ void
+ RCLookupHandler::GetRC(const RouterID &router, RCRequestCallback callback)
+ {
+ RouterContact remoteRC;
+
+ if(_nodedb->Get(router, remoteRC))
+ {
+ if(callback)
+ {
+ callback(router, &remoteRC, RCRequestResult::Success);
+ }
+ FinalizeRequest(router, &remoteRC, RCRequestResult::Success);
+ return;
+ }
+
+ bool shouldDoLookup = false;
+
+ {
+ util::Lock l(&_mutex);
+
+ auto itr_pair = pendingCallbacks.emplace(router, CallbacksQueue{});
+
+ if(callback)
+ {
+ itr_pair.first->second.push_back(callback);
+ }
+ shouldDoLookup = itr_pair.second;
+ }
+
+ if(shouldDoLookup)
+ {
+ auto fn = std::bind(&RCLookupHandler::HandleDHTLookupResult, this, router,
+ std::placeholders::_1);
+
+ // if we are a client try using the hidden service endpoints
+ if(!isServiceNode)
+ {
+ bool sent = false;
+ LogInfo("Lookup ", router, " anonymously");
+ _hiddenServiceContext->ForEachService(
+ [&](const std::string &,
+ const std::shared_ptr< service::Endpoint > &ep) -> bool {
+ const bool success = ep->LookupRouterAnon(router, fn);
+ sent = sent || success;
+ return !success;
+ });
+ if(sent)
+ return;
+ LogWarn("cannot lookup ", router, " anonymously");
+ }
+
+ if(!_dht->impl->LookupRouter(router, fn))
+ {
+ FinalizeRequest(router, nullptr, RCRequestResult::RouterNotFound);
+ }
+ }
+ }
+
+ bool
+ RCLookupHandler::RemoteIsAllowed(const RouterID &remote) const
+ {
+ if(_strictConnectPubkeys.size() && _strictConnectPubkeys.count(remote) == 0
+ && !RemoteInBootstrap(remote))
+ {
+ return false;
+ }
+
+ util::Lock l(&_mutex);
+
+ if(useWhitelist && whitelistRouters.find(remote) == whitelistRouters.end())
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ bool
+ RCLookupHandler::CheckRC(const RouterContact &rc) const
+ {
+ if(not RemoteIsAllowed(rc.pubkey))
+ {
+ _dht->impl->DelRCNodeAsync(dht::Key_t{rc.pubkey});
+ return false;
+ }
+
+ if(not rc.Verify(_dht->impl->Now()))
+ {
+ return false;
+ }
+
+ // update nodedb if required
+ if(rc.IsPublicRouter())
+ {
+ LogInfo("Adding or updating RC for ", RouterID(rc.pubkey),
+ " to nodedb and dht.");
+ _nodedb->UpdateAsyncIfNewer(rc);
+ _dht->impl->PutRCNodeAsync(rc);
+ }
+
+ return true;
+ }
+
+ bool
+ RCLookupHandler::GetRandomWhitelistRouter(RouterID &router) const
+ {
+ util::Lock l(&_mutex);
+
+ const auto sz = whitelistRouters.size();
+ auto itr = whitelistRouters.begin();
+ if(sz == 0)
+ return false;
+ if(sz > 1)
+ std::advance(itr, randint() % sz);
+ router = *itr;
+ return true;
+ }
+
+ bool
+ RCLookupHandler::CheckRenegotiateValid(RouterContact newrc,
+ RouterContact oldrc)
+ {
+ // missmatch of identity ?
+ if(newrc.pubkey != oldrc.pubkey)
+ return false;
+
+ if(!RemoteIsAllowed(newrc.pubkey))
+ return false;
+
+ auto func = std::bind(&RCLookupHandler::CheckRC, this, newrc);
+ _threadpool->addJob(func);
+
+ // update dht if required
+ if(_dht->impl->Nodes()->HasNode(dht::Key_t{newrc.pubkey}))
+ {
+ _dht->impl->Nodes()->PutNode(newrc);
+ }
+
+ // TODO: check for other places that need updating the RC
+ return true;
+ }
+
+ void
+ RCLookupHandler::PeriodicUpdate(llarp_time_t now)
+ {
+ // try looking up stale routers
+ std::set< RouterID > routersToLookUp;
+
+ _nodedb->VisitInsertedBefore(
+ [&](const RouterContact &rc) {
+ if(HavePendingLookup(rc.pubkey))
+ return;
+ routersToLookUp.insert(rc.pubkey);
+ },
+ now - RouterContact::UpdateInterval);
+
+ for(const auto &router : routersToLookUp)
+ {
+ GetRC(router, nullptr);
+ }
+
+ _nodedb->RemoveStaleRCs(_bootstrapRouterIDList,
+ now - RouterContact::StaleInsertionAge);
+ }
+
+ void
+ RCLookupHandler::ExploreNetwork()
+ {
+ if(_bootstrapRCList.size())
+ {
+ for(const auto &rc : _bootstrapRCList)
+ {
+ LogInfo("Doing explore via bootstrap node: ", RouterID(rc.pubkey));
+ _dht->impl->ExploreNetworkVia(dht::Key_t{rc.pubkey});
+ }
+ }
+ else
+ {
+ LogError("we have no bootstrap nodes specified");
+ }
+
+ // TODO: only explore via random subset
+ // explore via every connected peer
+ _linkManager->ForEachPeer([&](ILinkSession *s) {
+ if(!s->IsEstablished())
+ return;
+ const RouterContact rc = s->GetRemoteRC();
+ if(rc.IsPublicRouter()
+ && (_bootstrapRCList.find(rc) == _bootstrapRCList.end()))
+ {
+ LogInfo("Doing explore via public node: ", RouterID(rc.pubkey));
+ _dht->impl->ExploreNetworkVia(dht::Key_t{rc.pubkey});
+ }
+ });
+ }
+
+ void
+ RCLookupHandler::Init(llarp_dht_context *dht, llarp_nodedb *nodedb,
+ std::shared_ptr< llarp::thread::ThreadPool > threadpool,
+ ILinkManager *linkManager,
+ service::Context *hiddenServiceContext,
+ const std::set< RouterID > &strictConnectPubkeys,
+ const std::set< RouterContact > &bootstrapRCList,
+ bool useWhitelist_arg, bool isServiceNode_arg)
+ {
+ _dht = dht;
+ _nodedb = nodedb;
+ _threadpool = threadpool;
+ _hiddenServiceContext = hiddenServiceContext;
+ _strictConnectPubkeys = strictConnectPubkeys;
+ _bootstrapRCList = bootstrapRCList;
+ _linkManager = linkManager;
+ useWhitelist = useWhitelist_arg;
+ isServiceNode = isServiceNode_arg;
+
+ for(const auto &rc : _bootstrapRCList)
+ {
+ _bootstrapRouterIDList.insert(rc.pubkey);
+ }
+ }
+
+ void
+ RCLookupHandler::HandleDHTLookupResult(
+ RouterID remote, const std::vector< RouterContact > &results)
+ {
+ if(not results.size())
+ {
+ FinalizeRequest(remote, nullptr, RCRequestResult::RouterNotFound);
+ return;
+ }
+
+ if(not RemoteIsAllowed(remote))
+ {
+ FinalizeRequest(remote, &results[0], RCRequestResult::InvalidRouter);
+ return;
+ }
+
+ if(not CheckRC(results[0]))
+ {
+ FinalizeRequest(remote, &results[0], RCRequestResult::BadRC);
+ return;
+ }
+
+ FinalizeRequest(remote, &results[0], RCRequestResult::Success);
+ }
+
+ bool
+ RCLookupHandler::HavePendingLookup(RouterID remote) const
+ {
+ return pendingCallbacks.find(remote) != pendingCallbacks.end();
+ }
+
+ bool
+ RCLookupHandler::RemoteInBootstrap(const RouterID &remote) const
+ {
+ for(const auto &rc : _bootstrapRCList)
+ {
+ if(rc.pubkey == remote)
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ void
+ RCLookupHandler::FinalizeRequest(const RouterID &router,
+ const RouterContact *const rc,
+ RCRequestResult result)
+ {
+ CallbacksQueue movedCallbacks;
+ {
+ util::Lock l(&_mutex);
+
+ auto itr = pendingCallbacks.find(router);
+
+ if(itr != pendingCallbacks.end())
+ {
+ movedCallbacks.splice(movedCallbacks.begin(), itr->second);
+ pendingCallbacks.erase(itr);
+ }
+ } // lock
+
+ for(const auto &callback : movedCallbacks)
+ {
+ callback(router, rc, result);
+ }
+ }
+
+} // namespace llarp
diff --git a/llarp/router/rc_lookup_handler.hpp b/llarp/router/rc_lookup_handler.hpp
new file mode 100644
index 000000000..1638da6c6
--- /dev/null
+++ b/llarp/router/rc_lookup_handler.hpp
@@ -0,0 +1,112 @@
+#ifndef LLARP_RC_LOOKUP_HANDLER_HPP
+#define LLARP_RC_LOOKUP_HANDLER_HPP
+
+#include
+
+#include
+#include
+
+#include
+#include
+#include
+
+struct llarp_nodedb;
+struct llarp_dht_context;
+
+namespace llarp
+{
+ namespace service
+ {
+ struct Context;
+
+ } // namespace service
+
+ struct ILinkManager;
+
+ struct RCLookupHandler final : public I_RCLookupHandler
+ {
+ public:
+ using CallbacksQueue = std::list< RCRequestCallback >;
+
+ ~RCLookupHandler() = default;
+
+ void
+ AddValidRouter(const RouterID &router) override;
+
+ void
+ RemoveValidRouter(const RouterID &router) override;
+
+ void
+ SetRouterWhitelist(const std::vector< RouterID > &routers) override;
+
+ void
+ GetRC(const RouterID &router, RCRequestCallback callback) override;
+
+ bool
+ RemoteIsAllowed(const RouterID &remote) const override;
+
+ bool
+ CheckRC(const RouterContact &rc) const override;
+
+ bool
+ GetRandomWhitelistRouter(RouterID &router) const override;
+
+ bool
+ CheckRenegotiateValid(RouterContact newrc, RouterContact oldrc) override;
+
+ void
+ PeriodicUpdate(llarp_time_t now) override;
+
+ void
+ ExploreNetwork() override;
+
+ void
+ Init(llarp_dht_context *dht, llarp_nodedb *nodedb,
+ std::shared_ptr< llarp::thread::ThreadPool > threadpool,
+ ILinkManager *linkManager, service::Context *hiddenServiceContext,
+ const std::set< RouterID > &strictConnectPubkeys,
+ const std::set< RouterContact > &bootstrapRCList,
+ bool useWhitelist_arg, bool isServiceNode_arg);
+
+ private:
+ void
+ HandleDHTLookupResult(RouterID remote,
+ const std::vector< RouterContact > &results);
+
+ bool
+ HavePendingLookup(RouterID remote) const;
+
+ bool
+ RemoteInBootstrap(const RouterID &remote) const;
+
+ void
+ FinalizeRequest(const RouterID &router, const RouterContact *const rc,
+ RCRequestResult result);
+
+ mutable util::Mutex _mutex; // protects pendingCallbacks, whitelistRouters
+
+ llarp_dht_context *_dht = nullptr;
+ llarp_nodedb *_nodedb = nullptr;
+ std::shared_ptr< llarp::thread::ThreadPool > _threadpool = nullptr;
+ service::Context *_hiddenServiceContext = nullptr;
+ ILinkManager *_linkManager = nullptr;
+
+ /// explicit whitelist of routers we will connect to directly (not for
+ /// service nodes)
+ std::set< RouterID > _strictConnectPubkeys;
+
+ std::set< RouterContact > _bootstrapRCList;
+ std::set< RouterID > _bootstrapRouterIDList;
+
+ std::unordered_map< RouterID, CallbacksQueue, RouterID::Hash >
+ pendingCallbacks GUARDED_BY(_mutex);
+
+ bool useWhitelist = false;
+ bool isServiceNode = false;
+
+ std::set< RouterID > whitelistRouters GUARDED_BY(_mutex);
+ };
+
+} // namespace llarp
+
+#endif // LLARP_RC_LOOKUP_HANDLER_HPP
diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp
index 6e1136933..f9231c3ac 100644
--- a/llarp/router/router.cpp
+++ b/llarp/router/router.cpp
@@ -30,93 +30,6 @@
#include
#endif
-namespace llarp
-{
- struct async_verify_context
- {
- Router *router;
- TryConnectJob *establish_job;
- };
-
-} // namespace llarp
-
-struct TryConnectJob
-{
- llarp_time_t lastAttempt = 0;
- const llarp::RouterContact rc;
- llarp::LinkLayer_ptr link;
- llarp::Router *router;
- uint16_t triesLeft;
- TryConnectJob(const llarp::RouterContact &remote, llarp::LinkLayer_ptr l,
- uint16_t tries, llarp::Router *r)
- : rc(remote), link(l), router(r), triesLeft(tries)
- {
- }
-
- ~TryConnectJob()
- {
- }
-
- bool
- TimeoutReached() const
- {
- const auto now = router->Now();
- return now > lastAttempt && now - lastAttempt > 5000;
- }
-
- void
- Success()
- {
- router->routerProfiling().MarkConnectSuccess(rc.pubkey);
- router->FlushOutboundFor(rc.pubkey, link.get());
- }
-
- /// return true to remove
- bool
- Timeout()
- {
- if(ShouldRetry())
- {
- return Attempt();
- }
- // discard pending traffic on timeout
- router->DiscardOutboundFor(rc.pubkey);
- router->routerProfiling().MarkConnectTimeout(rc.pubkey);
- if(router->routerProfiling().IsBad(rc.pubkey))
- {
- if(!router->IsBootstrapNode(rc.pubkey))
- router->nodedb()->Remove(rc.pubkey);
- }
- return true;
- }
-
- /// return true to remove
- bool
- Attempt()
- {
- --triesLeft;
- if(!link)
- return true;
- if(!link->TryEstablishTo(rc))
- return true;
- lastAttempt = router->Now();
- return false;
- }
-
- bool
- ShouldRetry() const
- {
- return triesLeft > 0;
- }
-};
-
-static void
-on_try_connecting(std::shared_ptr< TryConnectJob > j)
-{
- if(j->Attempt())
- j->router->pendingEstablishJobs.erase(j->rc.pubkey);
-}
-
bool
llarp_loadServiceNodeIdentityKey(const fs::path &fpath,
llarp::SecretKey &secret)
@@ -165,46 +78,6 @@ llarp_findOrCreateEncryption(const fs::path &path, llarp::SecretKey &encryption)
namespace llarp
{
- bool
- Router::TryConnectAsync(RouterContact remote, uint16_t numretries)
- {
- const RouterID us = pubkey();
- if(remote.pubkey == us)
- return false;
- if(!ConnectionToRouterAllowed(remote.pubkey))
- return false;
- // do we already have a pending job for this remote?
- if(HasPendingConnectJob(remote.pubkey))
- {
- LogDebug("We have pending connect jobs to ", remote.pubkey);
- return false;
- }
-
- for(auto &link : outboundLinks)
- {
- if(!link->IsCompatable(remote))
- continue;
- std::shared_ptr< TryConnectJob > job =
- std::make_shared< TryConnectJob >(remote, link, numretries, this);
- auto itr = pendingEstablishJobs.emplace(remote.pubkey, job);
- if(itr.second)
- {
- // try establishing async
- _logic->queue_func(std::bind(&on_try_connecting, job));
- return true;
- }
-
- itr.first->second->Attempt();
- }
- return false;
- }
-
- bool
- Router::OnSessionEstablished(ILinkSession *s)
- {
- return async_verify_RC(s->GetRemoteRC());
- }
-
Router::Router(std::shared_ptr< llarp::thread::ThreadPool > _tp,
llarp_ev_loop_ptr __netloop, std::shared_ptr< Logic > l)
: ready(false)
@@ -238,19 +111,9 @@ namespace llarp
util::StatusObject obj{{"dht", _dht->impl->ExtractStatus()},
{"services", _hiddenServiceContext.ExtractStatus()},
{"exit", _exitContext.ExtractStatus()}};
- std::vector< util::StatusObject > ob_links, ib_links;
- std::transform(inboundLinks.begin(), inboundLinks.end(),
- std::back_inserter(ib_links),
- [](const auto &link) -> util::StatusObject {
- return link->ExtractStatus();
- });
- std::transform(outboundLinks.begin(), outboundLinks.end(),
- std::back_inserter(ob_links),
- [](const auto &link) -> util::StatusObject {
- return link->ExtractStatus();
- });
- obj.Put("links",
- util::StatusObject{{"outbound", ob_links}, {"inbound", ib_links}});
+
+ obj.Put("links", _linkManager.ExtractStatus());
+
return obj;
}
@@ -272,15 +135,17 @@ namespace llarp
void
Router::PersistSessionUntil(const RouterID &remote, llarp_time_t until)
{
- m_PersistingSessions[remote] =
- std::max(until, m_PersistingSessions[remote]);
- LogDebug("persist session to ", remote, " until ",
- m_PersistingSessions[remote]);
+ _linkManager.PersistSessionUntil(remote, until);
}
bool
Router::GetRandomGoodRouter(RouterID &router)
{
+ if(whitelistRouters)
+ {
+ return _rcLookupHandler.GetRandomWhitelistRouter(router);
+ }
+
auto pick_router = [&](auto &collection) -> bool {
const auto sz = collection.size();
auto itr = collection.begin();
@@ -291,10 +156,7 @@ namespace llarp
router = itr->first;
return true;
};
- if(whitelistRouters)
- {
- pick_router(lokinetRouters);
- }
+
absl::ReaderMutexLock l(&nodedb()->access);
return pick_router(nodedb()->entries);
}
@@ -302,115 +164,28 @@ namespace llarp
void
Router::PumpLL()
{
- for(const auto &link : inboundLinks)
- {
- link->Pump();
- }
- for(const auto &link : outboundLinks)
- {
- link->Pump();
- }
+ _linkManager.PumpLinks();
}
bool
Router::SendToOrQueue(const RouterID &remote, const ILinkMessage *msg)
{
- for(const auto &link : inboundLinks)
- {
- if(link->HasSessionTo(remote))
- {
- SendTo(remote, msg, link.get());
- return true;
- }
- }
- for(const auto &link : outboundLinks)
- {
- if(link->HasSessionTo(remote))
- {
- SendTo(remote, msg, link.get());
- return true;
- }
- }
- // no link available
-
- // this will create an entry in the outbound mq if it's not already there
- auto itr = outboundMessageQueue.find(remote);
- if(itr == outboundMessageQueue.end())
- {
- outboundMessageQueue.emplace(remote, MessageQueue());
- }
- // encode
- llarp_buffer_t buf(linkmsg_buffer);
- if(!msg->BEncode(&buf))
- return false;
- // queue buffer
- auto &q = outboundMessageQueue[remote];
-
- buf.sz = buf.cur - buf.base;
- q.emplace(buf.sz);
- memcpy(q.back().data(), buf.base, buf.sz);
- RouterContact remoteRC;
- // we don't have an open session to that router right now
- if(nodedb()->Get(remote, remoteRC))
- {
- // try connecting directly as the rc is loaded from disk
- return TryConnectAsync(remoteRC, 10);
- }
-
- // we don't have the RC locally so do a dht lookup
- _dht->impl->LookupRouter(remote,
- std::bind(&Router::HandleDHTLookupForSendTo, this,
- remote, std::placeholders::_1));
- return true;
- }
-
- void
- Router::HandleDHTLookupForSendTo(RouterID remote,
- const std::vector< RouterContact > &results)
- {
- if(results.size())
- {
- if(whitelistRouters
- && lokinetRouters.find(results[0].pubkey) == lokinetRouters.end())
- {
- return;
- }
- if(results[0].Verify(Now()))
- {
- TryConnectAsync(results[0], 10);
- return;
- }
- }
- DiscardOutboundFor(remote);
+ using std::placeholders::_1;
+ auto func = std::bind(&Router::MessageSent, this, remote, _1);
+ return _outboundMessageHandler.QueueMessage(remote, msg, func);
}
void
Router::ForEachPeer(std::function< void(const ILinkSession *, bool) > visit,
bool randomize) const
{
- for(const auto &link : outboundLinks)
- {
- link->ForEachSession(
- [visit](const ILinkSession *peer) { visit(peer, true); }, randomize);
- }
- for(const auto &link : inboundLinks)
- {
- link->ForEachSession(
- [visit](const ILinkSession *peer) { visit(peer, false); }, randomize);
- }
+ _linkManager.ForEachPeer(visit, randomize);
}
void
Router::ForEachPeer(std::function< void(ILinkSession *) > visit)
{
- for(const auto &link : outboundLinks)
- {
- link->ForEachSession([visit](ILinkSession *peer) { visit(peer); });
- }
- for(const auto &link : inboundLinks)
- {
- link->ForEachSession([visit](ILinkSession *peer) { visit(peer); });
- }
+ _linkManager.ForEachPeer(visit);
}
void
@@ -425,11 +200,7 @@ namespace llarp
if(remote.Verify(Now()))
{
LogDebug("verified signature");
- if(!TryConnectAsync(remote, 10))
- {
- // or error?
- LogWarn("session already made");
- }
+ _outboundSessionMaker.CreateSessionTo(remote, nullptr);
}
else
LogError(rcfile, " contains invalid RC");
@@ -452,36 +223,25 @@ namespace llarp
return llarp_findOrCreateEncryption(encryption_keyfile, _encryption);
}
- void
- Router::AddLink(std::shared_ptr< ILinkLayer > link, bool inbound)
- {
- if(inbound)
- inboundLinks.emplace(link);
- else
- outboundLinks.emplace(link);
- }
-
bool
- Router::Configure(Config *conf)
+ Router::Configure(Config *conf, llarp_nodedb *nodedb)
{
+ if(nodedb == nullptr)
+ {
+ LogError(
+ "Attempting to Router::Configure but passed null nodedb pointer");
+ return false;
+ }
+ _nodedb = nodedb;
+
if(!FromConfig(conf))
return false;
if(!InitOutboundLinks())
return false;
- if(!Ready())
- {
- return false;
- }
return EnsureIdentity();
}
- bool
- Router::Ready()
- {
- return outboundLinks.size() > 0;
- }
-
/// called in disk worker thread
void
Router::HandleSaveRC() const
@@ -507,7 +267,7 @@ namespace llarp
bool
Router::IsServiceNode() const
{
- return inboundLinks.size() > 0;
+ return m_isServiceNode;
}
void
@@ -515,69 +275,10 @@ namespace llarp
{
LogInfo("closing router");
llarp_ev_loop_stop(_netloop);
- inboundLinks.clear();
- outboundLinks.clear();
disk->stop();
disk->shutdown();
}
- void
- Router::on_verify_client_rc(llarp_async_verify_rc *job)
- {
- async_verify_context *ctx =
- static_cast< async_verify_context * >(job->user);
- auto router = ctx->router;
- const PubKey pk(job->rc.pubkey);
- router->FlushOutboundFor(pk, router->GetLinkWithSessionByPubkey(pk));
- delete ctx;
- router->pendingVerifyRC.erase(pk);
- router->pendingEstablishJobs.erase(pk);
- }
-
- void
- Router::on_verify_server_rc(llarp_async_verify_rc *job)
- {
- async_verify_context *ctx =
- static_cast< async_verify_context * >(job->user);
- auto router = ctx->router;
- const PubKey pk(job->rc.pubkey);
- if(!job->valid)
- {
- delete ctx;
- router->DiscardOutboundFor(pk);
- router->pendingVerifyRC.erase(pk);
- return;
- }
- // we're valid, which means it's already been committed to the nodedb
-
- LogDebug("rc verified and saved to nodedb");
-
- if(router->validRouters.count(pk))
- {
- router->validRouters.erase(pk);
- }
-
- const RouterContact rc = job->rc;
-
- router->validRouters.emplace(pk, rc);
-
- // track valid router in dht
- router->dht()->impl->Nodes()->PutNode(rc);
-
- // mark success in profile
- router->routerProfiling().MarkConnectSuccess(pk);
-
- // this was an outbound establish job
- if(ctx->establish_job)
- {
- ctx->establish_job->Success();
- }
- else
- router->FlushOutboundFor(pk, router->GetLinkWithSessionByPubkey(pk));
- delete ctx;
- router->pendingVerifyRC.erase(pk);
- }
-
void
Router::handle_router_ticker(void *user, uint64_t orig, uint64_t left)
{
@@ -600,141 +301,19 @@ namespace llarp
bool
Router::ConnectionToRouterAllowed(const RouterID &router) const
{
- if(strictConnectPubkeys.size() && strictConnectPubkeys.count(router) == 0)
- return false;
- if(IsServiceNode() && whitelistRouters)
- return lokinetRouters.find(router) != lokinetRouters.end();
-
- return true;
- }
-
- void
- Router::HandleDHTLookupForExplore(RouterID,
- const std::vector< RouterContact > &results)
- {
- const auto numConnected = NumberOfConnectedRouters();
- for(const auto &rc : results)
- {
- if(!rc.Verify(Now()))
- continue;
- nodedb()->UpdateAsyncIfNewer(rc);
-
- if(ConnectionToRouterAllowed(rc.pubkey)
- && numConnected < minConnectedRouters)
- TryConnectAsync(rc, 10);
- }
- }
-
- void
- Router::TryEstablishTo(const RouterID &remote)
- {
- const RouterID us = pubkey();
- if(us == remote)
- return;
-
- if(!ConnectionToRouterAllowed(remote))
- {
- LogWarn("not connecting to ", remote, " as it's not permitted by config");
- return;
- }
-
- RouterContact rc;
- if(nodedb()->Get(remote, rc))
- {
- // try connecting async
- TryConnectAsync(rc, 5);
- }
- else if(IsServiceNode())
- {
- if(dht()->impl->HasRouterLookup(remote))
- return;
- LogInfo("looking up router ", remote);
- // dht lookup as we don't know it
- dht()->impl->LookupRouter(
- remote,
- std::bind(&Router::HandleDHTLookupForTryEstablishTo, this, remote,
- std::placeholders::_1));
- }
- else
- {
- LogWarn("not connecting to ", remote, " as it's unreliable");
- }
- }
-
- void
- Router::OnConnectTimeout(ILinkSession *session)
- {
- auto itr = pendingEstablishJobs.find(session->GetPubKey());
- if(itr != pendingEstablishJobs.end())
- {
- if(itr->second->Timeout())
- pendingEstablishJobs.erase(itr);
- }
- }
-
- void
- Router::HandleDHTLookupForTryEstablishTo(
- RouterID remote, const std::vector< RouterContact > &results)
- {
- if(results.size() == 0)
- {
- if(!IsServiceNode())
- routerProfiling().MarkConnectTimeout(remote);
- }
- for(const auto &result : results)
- {
- if(whitelistRouters
- && lokinetRouters.find(result.pubkey) == lokinetRouters.end())
- continue;
- nodedb()->UpdateAsyncIfNewer(result);
- TryConnectAsync(result, 10);
- }
- }
-
- size_t
- Router::NumberOfRoutersMatchingFilter(
- std::function< bool(const ILinkSession *) > filter) const
- {
- std::set< RouterID > connected;
- ForEachPeer([&](const auto *link, bool) {
- if(filter(link))
- connected.insert(link->GetPubKey());
- });
- return connected.size();
+ return _rcLookupHandler.RemoteIsAllowed(router);
}
size_t
Router::NumberOfConnectedRouters() const
{
- return NumberOfRoutersMatchingFilter([&](const ILinkSession *link) -> bool {
- if(!link->IsEstablished())
- return false;
- const RouterContact rc(link->GetRemoteRC());
- return rc.IsPublicRouter() && ConnectionToRouterAllowed(rc.pubkey);
- });
+ return _linkManager.NumberOfConnectedRouters();
}
size_t
Router::NumberOfConnectedClients() const
{
- return NumberOfRoutersMatchingFilter([&](const ILinkSession *link) -> bool {
- if(!link->IsEstablished())
- return false;
- const RouterContact rc(link->GetRemoteRC());
- return !rc.IsPublicRouter();
- });
- }
-
- size_t
- Router::NumberOfConnectionsMatchingFilter(
- std::function< bool(const ILinkSession *) > filter) const
- {
- size_t sz = 0;
- ForEachPeer([&](const auto *link, bool) {
- if(filter(link))
- ++sz;
- });
- return sz;
+ return _linkManager.NumberOfConnectedClients();
}
bool
@@ -807,55 +386,13 @@ namespace llarp
lokidRPCUser = conf->lokid.lokidRPCUser;
lokidRPCPassword = conf->lokid.lokidRPCPassword;
- if(!usingSNSeed)
+ // TODO: add config flag for "is service node"
+ if(conf->iwp_links.servers().size())
{
- ident_keyfile = conf->router.identKeyfile();
+ m_isServiceNode = true;
}
- for(const auto &serverConfig : conf->iwp_links.servers())
- {
- auto server = llarp::utp::NewServerFromRouter(this);
- if(!server->EnsureKeys(transport_keyfile.string().c_str()))
- {
- llarp::LogError("failed to ensure keyfile ", transport_keyfile);
- return false;
- }
-
- const auto &key = std::get< 0 >(serverConfig);
- int af = std::get< 1 >(serverConfig);
- uint16_t port = std::get< 2 >(serverConfig);
- if(!server->Configure(netloop(), key, af, port))
- {
- LogError("failed to bind inbound link on ", key, " port ", port);
- return false;
- }
- AddLink(std::move(server), true);
- }
-
- // set network config
- netConfig = conf->network.netConfig();
-
- // Network config
- if(conf->network.enableProfiling().has_value())
- {
- if(conf->network.enableProfiling().value())
- {
- routerProfiling().Enable();
- LogInfo("router profiling explicitly enabled");
- }
- else
- {
- routerProfiling().Disable();
- LogInfo("router profiling explicitly disabled");
- }
- }
-
- if(!conf->network.routerProfilesFile().empty())
- {
- routerProfilesFile = conf->network.routerProfilesFile();
- routerProfiling().Load(routerProfilesFile.c_str());
- llarp::LogInfo("setting profiles to ", routerProfilesFile);
- }
+ std::set< RouterID > strictConnectPubkeys;
if(!conf->network.strictConnect().empty())
{
@@ -888,6 +425,111 @@ namespace llarp
llarp::LogError("invalid key for strict-connect: ", val);
}
+ std::vector< std::string > configRouters = conf->connect.routers;
+ configRouters.insert(configRouters.end(), conf->bootstrap.routers.begin(),
+ conf->bootstrap.routers.end());
+ for(const auto &router : configRouters)
+ {
+ // llarp::LogDebug("connect section has ", key, "=", val);
+ RouterContact rc;
+ if(!rc.Read(router.c_str()))
+ {
+ llarp::LogWarn("failed to decode bootstrap RC, file='", router,
+ "' rc=", rc);
+ return false;
+ }
+ if(rc.Verify(Now()))
+ {
+ const auto result = bootstrapRCList.insert(rc);
+ if(result.second)
+ llarp::LogInfo("Added bootstrap node ", RouterID(rc.pubkey));
+ else
+ llarp::LogWarn("Duplicate bootstrap node ", RouterID(rc.pubkey));
+ }
+ else
+ {
+ if(rc.IsExpired(Now()))
+ {
+ llarp::LogWarn("Bootstrap node ", RouterID(rc.pubkey),
+ " is too old and needs to be refreshed");
+ }
+ else
+ {
+ llarp::LogError("malformed rc file='", router, "' rc=", rc);
+ }
+ }
+ }
+
+ // Init components after relevant config settings loaded
+ _outboundMessageHandler.Init(&_linkManager, _logic);
+ _outboundSessionMaker.Init(&_linkManager, &_rcLookupHandler, _logic,
+ _nodedb, threadpool());
+ _linkManager.Init(&_outboundSessionMaker);
+ _rcLookupHandler.Init(_dht, _nodedb, threadpool(), &_linkManager,
+ &_hiddenServiceContext, strictConnectPubkeys,
+ bootstrapRCList, whitelistRouters, m_isServiceNode);
+
+ if(!usingSNSeed)
+ {
+ ident_keyfile = conf->router.identKeyfile();
+ }
+
+ // create inbound links, if we are a service node
+ for(const auto &serverConfig : conf->iwp_links.servers())
+ {
+ auto server = llarp::utp::NewServer(
+ encryption(), util::memFn(&AbstractRouter::rc, this),
+ util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, this),
+ util::memFn(&IOutboundSessionMaker::OnSessionEstablished,
+ &_outboundSessionMaker),
+ util::memFn(&AbstractRouter::CheckRenegotiateValid, this),
+ util::memFn(&AbstractRouter::Sign, this),
+ util::memFn(&IOutboundSessionMaker::OnConnectTimeout,
+ &_outboundSessionMaker),
+ util::memFn(&AbstractRouter::SessionClosed, this));
+
+ if(!server->EnsureKeys(transport_keyfile.string().c_str()))
+ {
+ llarp::LogError("failed to ensure keyfile ", transport_keyfile);
+ return false;
+ }
+
+ const auto &key = std::get< 0 >(serverConfig);
+ int af = std::get< 1 >(serverConfig);
+ uint16_t port = std::get< 2 >(serverConfig);
+ if(!server->Configure(netloop(), key, af, port))
+ {
+ LogError("failed to bind inbound link on ", key, " port ", port);
+ return false;
+ }
+ _linkManager.AddLink(std::move(server), true);
+ }
+
+ // set network config
+ netConfig = conf->network.netConfig();
+
+ // Network config
+ if(conf->network.enableProfiling().has_value())
+ {
+ if(conf->network.enableProfiling().value())
+ {
+ routerProfiling().Enable();
+ LogInfo("router profiling explicitly enabled");
+ }
+ else
+ {
+ routerProfiling().Disable();
+ LogInfo("router profiling explicitly disabled");
+ }
+ }
+
+ if(!conf->network.routerProfilesFile().empty())
+ {
+ routerProfilesFile = conf->network.routerProfilesFile();
+ routerProfiling().Load(routerProfilesFile.c_str());
+ llarp::LogInfo("setting profiles to ", routerProfilesFile);
+ }
+
// API config
enableRPCServer = conf->api.enableRPCServer();
rpcBindAddr = conf->api.rpcBindAddr();
@@ -923,146 +565,13 @@ namespace llarp
netConfig.insert(conf->dns.netConfig.begin(), conf->dns.netConfig.end());
- std::vector< std::string > configRouters = conf->connect.routers;
- configRouters.insert(configRouters.end(), conf->bootstrap.routers.begin(),
- conf->bootstrap.routers.end());
- for(const auto &router : configRouters)
- {
- // llarp::LogDebug("connect section has ", key, "=", val);
- RouterContact rc;
- if(!rc.Read(router.c_str()))
- {
- llarp::LogWarn("failed to decode bootstrap RC, file='", router,
- "' rc=", rc);
- return false;
- }
- if(rc.Verify(Now()))
- {
- const auto result = bootstrapRCList.insert(rc);
- if(result.second)
- llarp::LogInfo("Added bootstrap node ", RouterID(rc.pubkey));
- else
- llarp::LogWarn("Duplicate bootstrap node ", RouterID(rc.pubkey));
- }
- else
- {
- if(rc.IsExpired(Now()))
- {
- llarp::LogWarn("Bootstrap node ", RouterID(rc.pubkey),
- " is too old and needs to be refreshed");
- }
- else
- {
- llarp::LogError("malformed rc file='", router, "' rc=", rc);
- }
- }
- }
return true;
}
bool
Router::CheckRenegotiateValid(RouterContact newrc, RouterContact oldrc)
{
- // missmatch of identity ?
- if(newrc.pubkey != oldrc.pubkey)
- return false;
-
- // store it in nodedb async
- if(!async_verify_RC(newrc))
- return false;
- // update dht if required
- if(dht()->impl->Nodes()->HasNode(dht::Key_t{newrc.pubkey}))
- {
- dht()->impl->Nodes()->PutNode(newrc);
- }
- // update valid routers
- {
- auto itr = validRouters.find(newrc.pubkey);
- if(itr == validRouters.end())
- validRouters[newrc.pubkey] = newrc;
- else
- itr->second = newrc;
- }
- // TODO: check for other places that need updating the RC
- return true;
- }
-
- void
- Router::LookupRouterWhenExpired(RouterID router)
- {
- LookupRouter(router,
- std::bind(&Router::HandleRouterLookupForExpireUpdate, this,
- router, std::placeholders::_1));
- }
-
- void
- Router::HandleRouterLookupForExpireUpdate(
- RouterID router, const std::vector< RouterContact > &result)
- {
- const auto now = Now();
- RouterContact current;
- if(nodedb()->Get(router, current))
- {
- if(current.IsExpired(now))
- {
- nodedb()->Remove(router);
- }
- }
- if(result.size() == 1 && !result[0].IsExpired(now))
- {
- LogInfo("storing rc for ", router);
- nodedb()->UpdateAsyncIfNewer(result[0]);
- }
- else
- {
- LogInfo("not storing rc for ", router);
- }
- }
-
- bool
- Router::HasPendingRouterLookup(const RouterID &remote) const
- {
- if(IsServiceNode())
- return dht()->impl->HasRouterLookup(remote);
- bool has = false;
- _hiddenServiceContext.ForEachService(
- [&has, remote](const std::string &,
- const std::shared_ptr< service::Endpoint > &ep) -> bool {
- has |= ep->HasPendingRouterLookup(remote);
- return true;
- });
- return has;
- }
-
- void
- Router::LookupRouter(RouterID remote, RouterLookupHandler resultHandler)
- {
- if(!resultHandler)
- {
- resultHandler = std::bind(&Router::HandleRouterLookupForExpireUpdate,
- this, remote, std::placeholders::_1);
- }
-
- // if we are a client try using the hidden service endpoints
- if(!IsServiceNode())
- {
- bool sent = false;
- LogInfo("Lookup ", remote, " anonymously");
- _hiddenServiceContext.ForEachService(
- [&](const std::string &,
- const std::shared_ptr< service::Endpoint > &ep) -> bool {
- const bool success = ep->LookupRouterAnon(remote, resultHandler);
- sent = sent || success;
- return !success;
- });
- if(sent)
- return;
- LogWarn("cannot lookup ", remote, " anonymously");
- }
- // if we are service node or failed to use hidden service endpoints as
- // client do a direct lookup
- LogInfo("Lookup ", remote, " direct");
- dht()->impl->LookupRouter(remote, resultHandler);
+ return _rcLookupHandler.CheckRenegotiateValid(newrc, oldrc);
}
bool
@@ -1114,44 +623,10 @@ namespace llarp
ReportStats();
}
- const auto insertRouters = [&](const std::vector< RouterContact > &res) {
- // store found routers
- for(const auto &rc : res)
- {
- // don't accept expired RCs
- if(rc.Verify(Now(), false))
- nodedb()->InsertAsync(rc);
- }
- };
+ _rcLookupHandler.PeriodicUpdate(now);
const bool isSvcNode = IsServiceNode();
- // try looking up stale routers
- nodedb()->VisitInsertedBefore(
- [&](const RouterContact &rc) {
- if(HasPendingRouterLookup(rc.pubkey))
- return;
- LookupRouter(rc.pubkey, insertRouters);
- },
- now - RouterContact::UpdateInterval);
-
- std::set< RouterID > removeStale;
- // remove stale routers
- const auto timeout =
- RouterContact::UpdateWindow * RouterContact::UpdateTries;
- nodedb()->VisitInsertedBefore(
- [&](const RouterContact &rc) {
- if(IsBootstrapNode(rc.pubkey))
- return;
- LogInfo("removing stale router: ", RouterID(rc.pubkey));
- removeStale.insert(rc.pubkey);
- },
- now - timeout);
-
- nodedb()->RemoveIf([removeStale](const RouterContact &rc) -> bool {
- return removeStale.count(rc.pubkey) > 0;
- });
-
if(isSvcNode)
{
if(_rc.ExpiresSoon(now, randint() % 10000)
@@ -1162,65 +637,17 @@ namespace llarp
LogError("Failed to update our RC");
}
- /*
- // kill nodes that are not allowed by network policy
+ // remove RCs for nodes that are no longer allowed by network policy
nodedb()->RemoveIf([&](const RouterContact &rc) -> bool {
if(IsBootstrapNode(rc.pubkey))
return false;
- return !ConnectionToRouterAllowed(rc.pubkey);
+ return !_rcLookupHandler.RemoteIsAllowed(rc.pubkey);
});
- */
}
// expire paths
paths.ExpirePaths(now);
- {
- auto itr = pendingEstablishJobs.begin();
- while(itr != pendingEstablishJobs.end())
- {
- if(itr->second->TimeoutReached() && itr->second->Timeout())
- {
- LogWarn("failed to connect to ", itr->first);
- itr = pendingEstablishJobs.erase(itr);
- }
- else
- ++itr;
- }
- }
- {
- auto itr = m_PersistingSessions.begin();
- while(itr != m_PersistingSessions.end())
- {
- auto link = GetLinkWithSessionByPubkey(itr->first);
- if(now < itr->second)
- {
- if(link && link->HasSessionTo(itr->first))
- {
- LogDebug("keepalive to ", itr->first);
- link->KeepAliveSessionTo(itr->first);
- }
- else
- {
- RouterContact rc;
- if(nodedb()->Get(itr->first, rc))
- {
- if(rc.IsPublicRouter())
- {
- LogDebug("establish to ", itr->first);
- TryConnectAsync(rc, 5);
- }
- }
- }
- ++itr;
- }
- else
- {
- const RouterID r(itr->first);
- LogInfo("commit to ", r, " expired");
- itr = m_PersistingSessions.erase(itr);
- }
- }
- }
+ _linkManager.CheckPersistingSessions(now);
const size_t connected = NumberOfConnectedRouters();
const size_t N = nodedb()->num_loaded();
@@ -1228,30 +655,14 @@ namespace llarp
{
LogInfo("We need at least ", minRequiredRouters,
" service nodes to build paths but we have ", N, " in nodedb");
- // TODO: only connect to random subset
- if(bootstrapRCList.size())
- {
- for(const auto &rc : bootstrapRCList)
- {
- dht()->impl->ExploreNetworkVia(dht::Key_t{rc.pubkey});
- }
- // explore via every connected peer
- ForEachPeer([&](ILinkSession *s) {
- if(!s->IsEstablished())
- return;
- const RouterContact rc = s->GetRemoteRC();
- if(rc.IsPublicRouter())
- dht()->impl->ExploreNetworkVia(dht::Key_t{rc.pubkey});
- });
- }
- else
- LogError("we have no bootstrap nodes specified");
+
+ _rcLookupHandler.ExploreNetwork();
}
if(connected < minConnectedRouters)
{
size_t dlt = minConnectedRouters - connected;
LogInfo("connecting to ", dlt, " random routers to keep alive");
- ConnectToRandomRouters(dlt);
+ _outboundSessionMaker.ConnectToRandomRouters(dlt, now);
}
if(!isSvcNode)
@@ -1277,41 +688,6 @@ namespace llarp
return CryptoManager::instance()->sign(sig, identity(), buf);
}
- void
- Router::SendTo(RouterID remote, const ILinkMessage *msg, ILinkLayer *selected)
- {
- metrics::integerTick(msg->Name(), "to", 1, "tx", remote.ToString());
- llarp_buffer_t buf(linkmsg_buffer);
-
- if(!msg->BEncode(&buf))
- {
- LogWarn("failed to encode outbound message, buffer size left: ",
- buf.size_left());
- return;
- }
- // set size of message
- buf.sz = buf.cur - buf.base;
- buf.cur = buf.base;
- LogDebug("send ", buf.sz, " bytes to ", remote);
- if(selected)
- {
- if(selected->SendTo(remote, buf))
- return;
- }
- for(const auto &link : inboundLinks)
- {
- if(link->SendTo(remote, buf))
- return;
- }
- for(const auto &link : outboundLinks)
- {
- if(link->SendTo(remote, buf))
- return;
- }
- LogWarn("message to ", remote, " was dropped");
- metrics::integerTick(msg->Name(), "to", 1, "drop", remote.ToString());
- }
-
void
Router::ScheduleTicker(uint64_t ms)
{
@@ -1323,148 +699,48 @@ namespace llarp
{
dht::Key_t k(remote);
dht()->impl->Nodes()->DelNode(k);
- // remove from valid routers if it's a valid router
- validRouters.erase(remote);
+
LogInfo("Session to ", remote, " fully closed");
}
- ILinkLayer *
- Router::GetLinkWithSessionByPubkey(const RouterID &pubkey)
- {
- for(const auto &link : outboundLinks)
- {
- if(link->HasSessionTo(pubkey))
- return link.get();
- }
- for(const auto &link : inboundLinks)
- {
- if(link->HasSessionTo(pubkey))
- return link.get();
- }
- return nullptr;
- }
-
- void
- Router::FlushOutboundFor(RouterID remote, ILinkLayer *chosen)
- {
- LogDebug("Flush outbound for ", remote);
-
- auto itr = outboundMessageQueue.find(remote);
- if(itr == outboundMessageQueue.end())
- {
- pendingEstablishJobs.erase(remote);
- return;
- }
- // if for some reason we don't provide a link layer pick one that has it
- if(!chosen)
- {
- for(const auto &link : inboundLinks)
- {
- if(link->HasSessionTo(remote))
- {
- chosen = link.get();
- break;
- }
- }
- for(const auto &link : outboundLinks)
- {
- if(link->HasSessionTo(remote))
- {
- chosen = link.get();
- break;
- }
- }
- }
- while(itr->second.size())
- {
- llarp_buffer_t buf(itr->second.front());
- if(!chosen->SendTo(remote, buf))
- LogWarn("failed to send queued outbound message to ", remote, " via ",
- chosen->Name());
-
- itr->second.pop();
- }
- pendingEstablishJobs.erase(remote);
- outboundMessageQueue.erase(itr);
- }
-
- void
- Router::DiscardOutboundFor(const RouterID &remote)
- {
- outboundMessageQueue.erase(remote);
- }
-
bool
Router::GetRandomConnectedRouter(RouterContact &result) const
{
- auto sz = validRouters.size();
- if(sz)
- {
- auto itr = validRouters.begin();
- if(sz > 1)
- std::advance(itr, randint() % sz);
- result = itr->second;
- return true;
- }
- return false;
+ return _linkManager.GetRandomConnectedRouter(result);
}
- bool
- Router::async_verify_RC(const RouterContact rc)
+ void
+ Router::HandleDHTLookupForExplore(RouterID remote,
+ const std::vector< RouterContact > &results)
{
- if(rc.IsPublicRouter() && whitelistRouters && IsServiceNode())
+ for(const auto &rc : results)
{
- if(lokinetRouters.size() == 0)
- {
- LogError("we have no service nodes in whitelist");
- return false;
- }
- if(lokinetRouters.find(rc.pubkey) == lokinetRouters.end())
- {
- RouterID sn(rc.pubkey);
- LogInfo(sn, " is NOT a valid service node, rejecting");
- return false;
- }
+ _rcLookupHandler.CheckRC(rc);
}
- if(pendingVerifyRC.count(rc.pubkey))
- return true;
- LogInfo("session with ", RouterID(rc.pubkey), " established");
- llarp_async_verify_rc *job = &pendingVerifyRC[rc.pubkey];
- async_verify_context *ctx = new async_verify_context();
- ctx->router = this;
- ctx->establish_job = nullptr;
+ }
- auto itr = pendingEstablishJobs.find(rc.pubkey);
- if(itr != pendingEstablishJobs.end())
- ctx->establish_job = itr->second.get();
-
- job->user = ctx;
- job->rc = rc;
- job->valid = false;
- job->hook = nullptr;
-
- job->nodedb = _nodedb;
- job->logic = _logic;
- job->cryptoworker = cryptoworker;
- job->diskworker = disk;
- if(rc.IsPublicRouter())
- job->hook = &Router::on_verify_server_rc;
- else
- job->hook = &Router::on_verify_client_rc;
-
- llarp_nodedb_async_verify(job);
- return true;
+ // TODO: refactor callers and remove this function
+ void
+ Router::LookupRouter(RouterID remote, RouterLookupHandler resultHandler)
+ {
+ _rcLookupHandler.GetRC(
+ remote,
+ [=](const RouterID &id, const RouterContact *const rc,
+ const RCRequestResult result) {
+ (void)id;
+ std::vector< RouterContact > routers;
+ if(result == RCRequestResult::Success && rc != nullptr)
+ {
+ routers.push_back(*rc);
+ resultHandler(routers);
+ }
+ });
}
void
Router::SetRouterWhitelist(const std::vector< RouterID > &routers)
{
- lokinetRouters.clear();
- for(const auto &router : routers)
- lokinetRouters.emplace(router,
- std::numeric_limits< llarp_time_t >::max());
- LogInfo("lokinet service node list now has ", lokinetRouters.size(),
- " routers");
+ _rcLookupHandler.SetRouterWhitelist(routers);
}
/// this function ensure there are sane defualts in a net config
@@ -1544,18 +820,6 @@ namespace llarp
return false;
}
- for(const auto &rc : bootstrapRCList)
- {
- if(this->nodedb()->Insert(rc))
- {
- LogInfo("added bootstrap node ", RouterID(rc.pubkey));
- }
- else
- {
- LogError("Failed to add bootstrap node ", RouterID(rc.pubkey));
- }
- }
-
routerProfiling().Load(routerProfilesFile.c_str());
Addr publicAddr(this->addrInfo);
@@ -1565,14 +829,11 @@ namespace llarp
LogDebug("public address:port ", publicAddr);
}
- LogInfo("You have ", inboundLinks.size(), " inbound links");
-
// set public signing key
_rc.pubkey = seckey_topublic(identity());
AddressInfo ai;
- for(const auto &link : inboundLinks)
- {
+ _linkManager.ForEachInboundLink([&](LinkLayer_ptr link) {
if(link->GetOurAddressInfo(ai))
{
// override ip and port
@@ -1582,21 +843,17 @@ namespace llarp
ai.port = publicAddr.port();
}
if(IsBogon(ai.ip))
- continue;
+ return;
_rc.addrs.push_back(ai);
if(ExitEnabled())
{
const llarp::Addr addr(ai);
const nuint32_t a{addr.addr4()->s_addr};
_rc.exits.emplace_back(_rc.pubkey, a);
- LogInfo(
- "Neato teh l33toh, You are a freaking exit relay. w00t!!!!! your "
- "exit "
- "is advertised as exiting at ",
- a);
+ LogInfo("Exit relay started, advertised as exiting at: ", a);
}
}
- }
+ });
// set public encryption key
_rc.enckey = seckey_topublic(encryption());
@@ -1614,35 +871,15 @@ namespace llarp
return false;
}
- LogInfo("have ", nodedb->num_loaded(), " routers");
-
- LogInfo("starting outbound ", outboundLinks.size(), " links");
- for(const auto &link : outboundLinks)
+ if(!_linkManager.StartLinks(_logic))
{
- if(!link->Start(_logic))
- {
- LogWarn("outbound link '", link->Name(), "' failed to start");
- return false;
- }
- }
-
- int IBLinksStarted = 0;
-
- // start links
- for(const auto &link : inboundLinks)
- {
- if(link->Start(_logic))
- {
- LogDebug("Link ", link->Name(), " started");
- IBLinksStarted++;
- }
- else
- LogWarn("Link ", link->Name(), " failed to start");
+ LogWarn("One or more links failed to start.");
+ return false;
}
EnsureNetConfigDefaultsSane(netConfig);
- if(IBLinksStarted > 0)
+ if(IsServiceNode())
{
// initialize as service node
if(!InitServiceNode())
@@ -1685,7 +922,24 @@ namespace llarp
LogError("Failed to start hidden service context");
return false;
}
+
llarp_dht_context_start(dht(), pubkey());
+
+ for(const auto &rc : bootstrapRCList)
+ {
+ if(this->nodedb()->Insert(rc))
+ {
+ LogInfo("added bootstrap node ", RouterID(rc.pubkey));
+ }
+ else
+ {
+ LogError("Failed to add bootstrap node ", RouterID(rc.pubkey));
+ }
+ _dht->impl->Nodes()->PutNode(rc);
+ }
+
+ LogInfo("have ", nodedb->num_loaded(), " routers");
+
ScheduleTicker(1000);
_running.store(true);
_startedAt = Now();
@@ -1720,11 +974,7 @@ namespace llarp
void
Router::StopLinks()
{
- LogInfo("stopping links");
- for(const auto &link : outboundLinks)
- link->Stop();
- for(const auto &link : inboundLinks)
- link->Stop();
+ _linkManager.Stop();
}
void
@@ -1741,45 +991,20 @@ namespace llarp
_exitContext.Stop();
if(rpcServer)
rpcServer->Stop();
- PumpLL();
+ _linkManager.PumpLinks();
_logic->call_later({200, this, &RouterAfterStopIssued});
}
bool
Router::HasSessionTo(const RouterID &remote) const
{
- for(const auto &link : outboundLinks)
- if(link->HasSessionTo(remote))
- return true;
- for(const auto &link : inboundLinks)
- if(link->HasSessionTo(remote))
- return true;
- return false;
+ return _linkManager.HasSessionTo(remote);
}
void
Router::ConnectToRandomRouters(int want)
{
- int wanted = want;
- Router *self = this;
-
- self->nodedb()->visit([self, &want](const RouterContact &other) -> bool {
- // check if we really want to
- if(other.ExpiresSoon(self->Now(), 30000))
- return want > 0;
- if(!self->ConnectionToRouterAllowed(other.pubkey))
- return want > 0;
- if(randint() % 2 == 0
- && !(self->HasSessionTo(other.pubkey)
- || self->HasPendingConnectJob(other.pubkey)))
- {
- if(self->TryConnectAsync(other, 5))
- --want;
- }
- return want > 0;
- });
- LogInfo("connecting to ", abs(want - wanted), " out of ", wanted,
- " random routers");
+ _outboundSessionMaker.ConnectToRandomRouters(want, Now());
}
bool
@@ -1804,18 +1029,51 @@ namespace llarp
return true;
}
+ bool
+ Router::TryConnectAsync(RouterContact rc, uint16_t tries)
+ {
+ (void)tries;
+
+ if(rc.pubkey == pubkey())
+ {
+ return false;
+ }
+
+ if(!_rcLookupHandler.RemoteIsAllowed(rc.pubkey))
+ {
+ return false;
+ }
+
+ _outboundSessionMaker.CreateSessionTo(rc, nullptr);
+
+ return true;
+ }
+
bool
Router::InitOutboundLinks()
{
- if(outboundLinks.size() > 0)
- return true;
+ using LinkFactory = std::function< LinkLayer_ptr(
+ const SecretKey &, GetRCFunc, LinkMessageHandler,
+ SessionEstablishedHandler, SessionRenegotiateHandler, SignBufferFunc,
+ TimeoutHandler, SessionClosedHandler) >;
- static std::list< std::function< LinkLayer_ptr(Router *) > > linkFactories =
- {utp::NewServerFromRouter, iwp::NewServerFromRouter};
+ static std::list< LinkFactory > linkFactories = {utp::NewServer,
+ iwp::NewServer};
+ bool addedAtLeastOne = false;
for(const auto &factory : linkFactories)
{
- auto link = factory(this);
+ auto link = factory(
+ encryption(), util::memFn(&AbstractRouter::rc, this),
+ util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, this),
+ util::memFn(&IOutboundSessionMaker::OnSessionEstablished,
+ &_outboundSessionMaker),
+ util::memFn(&AbstractRouter::CheckRenegotiateValid, this),
+ util::memFn(&AbstractRouter::Sign, this),
+ util::memFn(&IOutboundSessionMaker::OnConnectTimeout,
+ &_outboundSessionMaker),
+ util::memFn(&AbstractRouter::SessionClosed, this));
+
if(!link)
continue;
if(!link->EnsureKeys(transport_keyfile.string().c_str()))
@@ -1830,11 +1088,12 @@ namespace llarp
{
if(!link->Configure(netloop(), "*", af, m_OutboundPort))
continue;
- AddLink(std::move(link), false);
+ _linkManager.AddLink(std::move(link), false);
+ addedAtLeastOne = true;
break;
}
}
- return outboundLinks.size() > 0;
+ return addedAtLeastOne;
}
bool
@@ -1844,12 +1103,6 @@ namespace llarp
return hiddenServiceContext().AddDefaultEndpoint(netConfig);
}
- bool
- Router::HasPendingConnectJob(const RouterID &remote)
- {
- return pendingEstablishJobs.find(remote) != pendingEstablishJobs.end();
- }
-
bool
Router::LoadHiddenServiceConfig(string_view fname)
{
@@ -1867,4 +1120,17 @@ namespace llarp
}
return true;
}
+
+ void
+ Router::MessageSent(const RouterID &remote, SendStatus status)
+ {
+ if(status == SendStatus::Success)
+ {
+ LogInfo("Message successfully sent to ", remote);
+ }
+ else
+ {
+ LogWarn("Message failed sending to ", remote);
+ }
+ }
} // namespace llarp
diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp
index 6c1c880fa..dd2287004 100644
--- a/llarp/router/router.hpp
+++ b/llarp/router/router.hpp
@@ -25,6 +25,10 @@
#include
#include
#include
+#include
+#include
+#include
+#include
#include
#include
@@ -50,8 +54,6 @@ bool
llarp_loadServiceNodeIdentityKey(const fs::path &fpath,
llarp::SecretKey &secretkey);
-struct TryConnectJob;
-
namespace llarp
{
struct Router final : public AbstractRouter
@@ -191,9 +193,6 @@ namespace llarp
bool
Sign(Signature &sig, const llarp_buffer_t &buf) const override;
- // buffer for serializing link messages
- std::array< byte_t, MAX_LINK_MSG_SIZE > linkmsg_buffer;
-
uint16_t m_OutboundPort = 0;
/// always maintain this many connections to other routers
@@ -233,10 +232,6 @@ namespace llarp
/// default network config for default network interface
NetConfig_t netConfig;
- /// identity keys whitelist of routers we will connect to directly (not for
- /// service nodes)
- std::set< RouterID > strictConnectPubkeys;
-
/// bootstrap RCs
std::set< RouterContact > bootstrapRCList;
@@ -267,55 +262,47 @@ namespace llarp
std::string lokidRPCUser;
std::string lokidRPCPassword;
- using LinkSet = std::set< LinkLayer_ptr, ComparePtr< LinkLayer_ptr > >;
-
- LinkSet outboundLinks;
- LinkSet inboundLinks;
-
Profiling _routerProfiling;
std::string routerProfilesFile = "profiles.dat";
- using MessageQueue = std::queue< std::vector< byte_t > >;
+ OutboundMessageHandler _outboundMessageHandler;
+ OutboundSessionMaker _outboundSessionMaker;
+ LinkManager _linkManager;
+ RCLookupHandler _rcLookupHandler;
- /// outbound message queue
- std::unordered_map< RouterID, MessageQueue, RouterID::Hash >
- outboundMessageQueue;
+ IOutboundMessageHandler &
+ outboundMessageHandler() override
+ {
+ return _outboundMessageHandler;
+ }
- /// loki verified routers
- std::unordered_map< RouterID, RouterContact, RouterID::Hash > validRouters;
+ IOutboundSessionMaker &
+ outboundSessionMaker() override
+ {
+ return _outboundSessionMaker;
+ }
- // pending establishing session with routers
- std::unordered_map< RouterID, std::shared_ptr< TryConnectJob >,
- RouterID::Hash >
- pendingEstablishJobs;
+ ILinkManager &
+ linkManager() override
+ {
+ return _linkManager;
+ }
- // pending RCs to be verified by pubkey
- std::unordered_map< RouterID, llarp_async_verify_rc, RouterID::Hash >
- pendingVerifyRC;
-
- // sessions to persist -> timestamp to end persist at
- std::unordered_map< RouterID, llarp_time_t, RouterID::Hash >
- m_PersistingSessions;
-
- // lokinet routers from lokid, maps pubkey to when we think it will expire,
- // set to max value right now
- std::unordered_map< RouterID, llarp_time_t, PubKey::Hash > lokinetRouters;
+ I_RCLookupHandler &
+ rcLookupHandler() override
+ {
+ return _rcLookupHandler;
+ }
Router(std::shared_ptr< llarp::thread::ThreadPool > worker,
llarp_ev_loop_ptr __netloop, std::shared_ptr< Logic > logic);
~Router();
- bool
- OnSessionEstablished(ILinkSession *) override;
-
bool
HandleRecvLinkMessageBuffer(ILinkSession *from,
const llarp_buffer_t &msg) override;
- void
- AddLink(std::shared_ptr< ILinkLayer > link, bool outbound = false);
-
bool
InitOutboundLinks();
@@ -341,10 +328,7 @@ namespace llarp
AddHiddenService(const service::Config::section_t &config);
bool
- Configure(Config *conf) override;
-
- bool
- Ready();
+ Configure(Config *conf, llarp_nodedb *nodedb = nullptr) override;
bool
Run(struct llarp_nodedb *nodedb) override;
@@ -381,15 +365,6 @@ namespace llarp
return seckey_topublic(_identity);
}
- void
- OnConnectTimeout(ILinkSession *session) override;
-
- bool
- HasPendingConnectJob(const RouterID &remote);
-
- bool
- HasPendingRouterLookup(const RouterID &remote) const override;
-
void
try_connect(fs::path rcfile);
@@ -397,6 +372,9 @@ namespace llarp
bool
Reconfigure(Config *conf) override;
+ bool
+ TryConnectAsync(RouterContact rc, uint16_t tries) override;
+
/// validate new configuration against old one
/// return true on 100% valid
/// return false if not 100% valid
@@ -411,37 +389,6 @@ namespace llarp
bool
SendToOrQueue(const RouterID &remote, const ILinkMessage *msg) override;
- /// sendto or drop
- void
- SendTo(RouterID remote, const ILinkMessage *msg, ILinkLayer *chosen);
-
- /// manually flush outbound message queue for just 1 router
- void
- FlushOutboundFor(RouterID remote, ILinkLayer *chosen = nullptr);
-
- void
- LookupRouter(RouterID remote, RouterLookupHandler handler) override;
-
- /// manually discard all pending messages to remote router
- void
- DiscardOutboundFor(const RouterID &remote);
-
- /// try establishing a session to a remote router
- void
- TryEstablishTo(const RouterID &remote);
-
- /// lookup a router by pubkey when it expires
- void
- LookupRouterWhenExpired(RouterID remote);
-
- void
- HandleDHTLookupForExplore(
- RouterID remote, const std::vector< RouterContact > &results) override;
-
- void
- HandleRouterLookupForExpireUpdate(
- RouterID remote, const std::vector< RouterContact > &results);
-
void
ForEachPeer(std::function< void(const ILinkSession *, bool) > visit,
bool randomize = false) const override;
@@ -458,10 +405,6 @@ namespace llarp
bool
CheckRenegotiateValid(RouterContact newRc, RouterContact oldRC) override;
- /// flush outbound message queue
- void
- FlushOutbound();
-
/// called by link when a remote session has no more sessions open
void
SessionClosed(RouterID remote) override;
@@ -481,9 +424,6 @@ namespace llarp
void
ScheduleTicker(uint64_t i = 1000);
- ILinkLayer *
- GetLinkWithSessionByPubkey(const RouterID &remote);
-
/// parse a routing message in a buffer and handle it with a handler if
/// successful parsing return true on parse and handle success otherwise
/// return false
@@ -503,52 +443,28 @@ namespace llarp
size_t
NumberOfConnectedClients() const override;
- /// count unique router id's given filter to match session
- size_t
- NumberOfRoutersMatchingFilter(
- std::function< bool(const ILinkSession *) > filter) const;
-
- /// count the number of connections that match filter
- size_t
- NumberOfConnectionsMatchingFilter(
- std::function< bool(const ILinkSession *) > filter) const;
-
- bool
- TryConnectAsync(RouterContact rc, uint16_t tries) override;
-
bool
GetRandomConnectedRouter(RouterContact &result) const override;
- bool
- async_verify_RC(const RouterContact rc);
+ void
+ HandleDHTLookupForExplore(
+ RouterID remote, const std::vector< RouterContact > &results) override;
void
- HandleDHTLookupForSendTo(RouterID remote,
- const std::vector< RouterContact > &results);
+ LookupRouter(RouterID remote, RouterLookupHandler resultHandler) override;
bool
HasSessionTo(const RouterID &remote) const override;
- void
- HandleDHTLookupForTryEstablishTo(
- RouterID remote, const std::vector< RouterContact > &results);
-
- static void
- on_verify_client_rc(llarp_async_verify_rc *context);
-
- static void
- on_verify_server_rc(llarp_async_verify_rc *context);
-
static void
handle_router_ticker(void *user, uint64_t orig, uint64_t left);
- static void
- HandleAsyncLoadRCForSendTo(llarp_async_load_rc *async);
-
private:
std::atomic< bool > _stopping;
std::atomic< bool > _running;
+ bool m_isServiceNode = false;
+
llarp_time_t m_LastStatsReport = 0;
bool
@@ -572,6 +488,9 @@ namespace llarp
bool
FromConfig(Config *conf);
+
+ void
+ MessageSent(const RouterID &remote, SendStatus status);
};
} // namespace llarp
diff --git a/llarp/router_contact.cpp b/llarp/router_contact.cpp
index d5710d04d..a4d3b54ae 100644
--- a/llarp/router_contact.cpp
+++ b/llarp/router_contact.cpp
@@ -32,12 +32,10 @@ namespace llarp
/// 1 day for real network
llarp_time_t RouterContact::Lifetime = 24 * 60 * 60 * 1000;
#endif
- /// every 30 minutes an RC is stale and needs updating
- llarp_time_t RouterContact::UpdateInterval = 30 * 60 * 1000;
- // 1 minute window for update
- llarp_time_t RouterContact::UpdateWindow = 60 * 1000;
- // try 5 times to update
- int RouterContact::UpdateTries = 5;
+ /// update RCs every 5 minutes
+ llarp_time_t RouterContact::UpdateInterval = 5 * 60 * 1000;
+ /// an RC inserted long enough ago (30 min) is considered stale and is removed
+ llarp_time_t RouterContact::StaleInsertionAge = 30 * 60 * 1000;
NetID::NetID(const byte_t *val) : AlignedBuffer< 8 >()
{
diff --git a/llarp/router_contact.hpp b/llarp/router_contact.hpp
index 75d20641f..7638e167c 100644
--- a/llarp/router_contact.hpp
+++ b/llarp/router_contact.hpp
@@ -70,8 +70,7 @@ namespace llarp
static llarp_time_t Lifetime;
static llarp_time_t UpdateInterval;
- static llarp_time_t UpdateWindow;
- static int UpdateTries;
+ static llarp_time_t StaleInsertionAge;
RouterContact()
{
diff --git a/llarp/util/threading.hpp b/llarp/util/threading.hpp
index 86734f282..6b885f175 100644
--- a/llarp/util/threading.hpp
+++ b/llarp/util/threading.hpp
@@ -35,9 +35,10 @@ namespace llarp
}
};
- using Mutex = absl::Mutex;
- using Lock = absl::MutexLock;
- using Condition = absl::CondVar;
+ using Mutex = absl::Mutex;
+ using Lock = absl::MutexLock;
+ using ReleasableLock = absl::ReleasableMutexLock;
+ using Condition = absl::CondVar;
class Semaphore
{
diff --git a/llarp/utp/utp.cpp b/llarp/utp/utp.cpp
index 495199bcd..b1c2f4d85 100644
--- a/llarp/utp/utp.cpp
+++ b/llarp/utp/utp.cpp
@@ -18,19 +18,6 @@ namespace llarp
reneg, timeout, closed);
}
- LinkLayer_ptr
- NewServerFromRouter(AbstractRouter* r)
- {
- return NewServer(
- r->encryption(), util::memFn(&AbstractRouter::rc, r),
- util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, r),
- util::memFn(&AbstractRouter::OnSessionEstablished, r),
- util::memFn(&AbstractRouter::CheckRenegotiateValid, r),
- util::memFn(&AbstractRouter::Sign, r),
- util::memFn(&AbstractRouter::OnConnectTimeout, r),
- util::memFn(&AbstractRouter::SessionClosed, r));
- }
-
} // namespace utp
} // namespace llarp
diff --git a/llarp/utp/utp.hpp b/llarp/utp/utp.hpp
index fb8284345..fe1ef403e 100644
--- a/llarp/utp/utp.hpp
+++ b/llarp/utp/utp.hpp
@@ -15,9 +15,6 @@ namespace llarp
LinkMessageHandler h, SessionEstablishedHandler est,
SessionRenegotiateHandler reneg, SignBufferFunc sign,
TimeoutHandler timeout, SessionClosedHandler closed);
-
- LinkLayer_ptr
- NewServerFromRouter(AbstractRouter* r);
} // namespace utp
} // namespace llarp
diff --git a/test/dht/mock_context.hpp b/test/dht/mock_context.hpp
index eaa8643f1..0c9d8073c 100644
--- a/test/dht/mock_context.hpp
+++ b/test/dht/mock_context.hpp
@@ -117,6 +117,8 @@ namespace llarp
MOCK_METHOD0(AllowTransit, bool&());
MOCK_CONST_METHOD0(Nodes, dht::Bucket< dht::RCNode >*());
+ MOCK_METHOD1(PutRCNodeAsync, void(const dht::RCNode& val));
+ MOCK_METHOD1(DelRCNodeAsync, void(const dht::Key_t& val));
};
} // namespace test