From baf8019fe5f2e0a146acc8c0a821d844fe98db0f Mon Sep 17 00:00:00 2001 From: Thomas Winget Date: Wed, 26 Jun 2019 17:39:29 -0400 Subject: [PATCH] Refactor Router code into more classes This commit refactors functionality from the Router class into separate, dedicated classes. There are a few behavior changes that came as a result of discussion on what the correct behavior should be. In addition, many things Router was previously doing can now be provided callback functions to alert the calling point when the asynchronous action completes, successfully or otherwise. --- CMakeLists.txt | 10 +- llarp/CMakeLists.txt | 8 + llarp/context.cpp | 9 +- llarp/dht/context.cpp | 14 + llarp/dht/context.hpp | 6 + llarp/iwp/iwp.cpp | 13 - llarp/iwp/iwp.hpp | 3 - llarp/link/i_link_manager.cpp | 1 + llarp/link/i_link_manager.hpp | 85 ++ llarp/link/link_manager.cpp | 362 ++++++ llarp/link/link_manager.hpp | 100 ++ llarp/nodedb.cpp | 29 + llarp/nodedb.hpp | 5 +- llarp/router/abstractrouter.cpp | 4 - llarp/router/abstractrouter.hpp | 31 +- llarp/router/i_outbound_message_handler.cpp | 1 + llarp/router/i_outbound_message_handler.hpp | 43 + llarp/router/i_outbound_session_maker.cpp | 1 + llarp/router/i_outbound_session_maker.hpp | 59 + llarp/router/i_rc_lookup_handler.cpp | 1 + llarp/router/i_rc_lookup_handler.hpp | 63 + llarp/router/outbound_message_handler.cpp | 224 ++++ llarp/router/outbound_message_handler.hpp | 88 ++ llarp/router/outbound_session_maker.cpp | 317 +++++ llarp/router/outbound_session_maker.hpp | 103 ++ llarp/router/rc_lookup_handler.cpp | 337 +++++ llarp/router/rc_lookup_handler.hpp | 112 ++ llarp/router/router.cpp | 1244 ++++--------------- llarp/router/router.hpp | 163 +-- llarp/router_contact.cpp | 10 +- llarp/router_contact.hpp | 3 +- llarp/util/threading.hpp | 7 +- llarp/utp/utp.cpp | 13 - llarp/utp/utp.hpp | 3 - test/dht/mock_context.hpp | 2 + 35 files changed, 2294 insertions(+), 1180 deletions(-) create mode 100644 llarp/link/i_link_manager.cpp create mode 100644 llarp/link/i_link_manager.hpp create mode 100644 llarp/link/link_manager.cpp create mode 100644 llarp/link/link_manager.hpp create mode 100644 llarp/router/i_outbound_message_handler.cpp create mode 100644 llarp/router/i_outbound_message_handler.hpp create mode 100644 llarp/router/i_outbound_session_maker.cpp create mode 100644 llarp/router/i_outbound_session_maker.hpp create mode 100644 llarp/router/i_rc_lookup_handler.cpp create mode 100644 llarp/router/i_rc_lookup_handler.hpp create mode 100644 llarp/router/outbound_message_handler.cpp create mode 100644 llarp/router/outbound_message_handler.hpp create mode 100644 llarp/router/outbound_session_maker.cpp create mode 100644 llarp/router/outbound_session_maker.hpp create mode 100644 llarp/router/rc_lookup_handler.cpp create mode 100644 llarp/router/rc_lookup_handler.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index e686e6556..9c411c8f3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,11 @@ # Lowest version - android ndk 3.6.0 cmake_minimum_required(VERSION 3.6.0) +find_program(CCACHE_PROGRAM ccache) +if(CCACHE_PROGRAM) + set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CCACHE_PROGRAM}") +endif() + set(PROJECT_NAME lokinet) project(${PROJECT_NAME} C CXX) @@ -45,11 +50,6 @@ add_definitions(-D${CMAKE_SYSTEM_NAME}) get_filename_component(CORE_INCLUDE "${CMAKE_CURRENT_SOURCE_DIR}/include" ABSOLUTE) get_filename_component(ABYSS_INCLUDE "${CMAKE_CURRENT_SOURCE_DIR}/${ABYSS}/include" ABSOLUTE) -find_program(CCACHE_PROGRAM ccache) -if(CCACHE_PROGRAM) - set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CCACHE_PROGRAM}") -endif() - if(MSVC_VERSION) enable_language(ASM_MASM) list(APPEND CMAKE_ASM_MASM_SOURCE_FILE_EXTENSIONS s) diff --git a/llarp/CMakeLists.txt b/llarp/CMakeLists.txt index 0a19450d6..26507a5a5 100644 --- a/llarp/CMakeLists.txt +++ b/llarp/CMakeLists.txt @@ -173,6 +173,8 @@ set(LIB_SRC iwp/linklayer.cpp iwp/outermessage.cpp iwp/iwp.cpp + link/i_link_manager.cpp + link/link_manager.cpp link/server.cpp link/session.cpp messages/dht_immediate.cpp @@ -195,6 +197,12 @@ set(LIB_SRC pow.cpp profiling.cpp router/abstractrouter.cpp + router/i_outbound_message_handler.cpp + router/outbound_message_handler.cpp + router/i_outbound_session_maker.cpp + router/outbound_session_maker.cpp + router/i_rc_lookup_handler.cpp + router/rc_lookup_handler.cpp router/router.cpp router_contact.cpp router_id.cpp diff --git a/llarp/context.cpp b/llarp/context.cpp index a3deb4c9f..20a8d8c63 100644 --- a/llarp/context.cpp +++ b/llarp/context.cpp @@ -159,8 +159,6 @@ __ ___ ____ _ _ ___ _ _ ____ int Context::LoadDatabase() { - nodedb = std::make_unique< llarp_nodedb >(router->diskworker()); - if(!llarp_nodedb::ensure_dir(nodedb_dir.c_str())) { llarp::LogError("nodedb_dir is incorrect"); @@ -233,16 +231,21 @@ __ ___ ____ _ _ ___ _ _ ____ cryptoManager = std::make_unique< CryptoManager >(crypto.get()); router = std::make_unique< Router >(worker, mainloop, logic); - if(!router->Configure(config.get())) + + nodedb = std::make_unique< llarp_nodedb >(router->diskworker()); + + if(!router->Configure(config.get(), nodedb.get())) { llarp::LogError("Failed to configure router"); return 1; } + // must be done after router is made so we can use its disk io worker // must also be done after configure so that netid is properly set if it // is provided by config if(!this->LoadDatabase()) return 1; + return 0; } diff --git a/llarp/dht/context.cpp b/llarp/dht/context.cpp index 576f684ac..c7c208f82 100644 --- a/llarp/dht/context.cpp +++ b/llarp/dht/context.cpp @@ -199,6 +199,20 @@ namespace llarp return _nodes.get(); } + void + PutRCNodeAsync(const RCNode& val) override + { + auto func = std::bind(&Bucket< RCNode >::PutNode, Nodes(), val); + router->logic()->queue_func(func); + } + + void + DelRCNodeAsync(const Key_t& val) override + { + auto func = std::bind(&Bucket< RCNode >::DelNode, Nodes(), val); + router->logic()->queue_func(func); + } + const Key_t& OurKey() const override { diff --git a/llarp/dht/context.hpp b/llarp/dht/context.hpp index 431fe1005..392e52ad4 100644 --- a/llarp/dht/context.hpp +++ b/llarp/dht/context.hpp @@ -173,6 +173,12 @@ namespace llarp virtual Bucket< RCNode >* Nodes() const = 0; + virtual void + PutRCNodeAsync(const RCNode& val) = 0; + + virtual void + DelRCNodeAsync(const Key_t& val) = 0; + virtual util::StatusObject ExtractStatus() const = 0; diff --git a/llarp/iwp/iwp.cpp b/llarp/iwp/iwp.cpp index 1c02b137c..457a51340 100644 --- a/llarp/iwp/iwp.cpp +++ b/llarp/iwp/iwp.cpp @@ -7,19 +7,6 @@ namespace llarp { namespace iwp { - std::unique_ptr< ILinkLayer > - NewServerFromRouter(AbstractRouter* r) - { - return NewServer( - r->encryption(), std::bind(&AbstractRouter::rc, r), - util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, r), - util::memFn(&AbstractRouter::OnSessionEstablished, r), - util::memFn(&AbstractRouter::CheckRenegotiateValid, r), - util::memFn(&AbstractRouter::Sign, r), - util::memFn(&AbstractRouter::OnConnectTimeout, r), - util::memFn(&AbstractRouter::SessionClosed, r)); - } - std::unique_ptr< ILinkLayer > NewServer(const SecretKey& enckey, GetRCFunc getrc, LinkMessageHandler h, SessionEstablishedHandler est, SessionRenegotiateHandler reneg, diff --git a/llarp/iwp/iwp.hpp b/llarp/iwp/iwp.hpp index b3c5464ca..e7a10413e 100644 --- a/llarp/iwp/iwp.hpp +++ b/llarp/iwp/iwp.hpp @@ -18,9 +18,6 @@ namespace llarp llarp::SignBufferFunc sign, llarp::TimeoutHandler timeout, llarp::SessionClosedHandler closed); - std::unique_ptr< ILinkLayer > - NewServerFromRouter(AbstractRouter* r); - } // namespace iwp } // namespace llarp diff --git a/llarp/link/i_link_manager.cpp b/llarp/link/i_link_manager.cpp new file mode 100644 index 000000000..ffdfcdc38 --- /dev/null +++ b/llarp/link/i_link_manager.cpp @@ -0,0 +1 @@ +#include diff --git a/llarp/link/i_link_manager.hpp b/llarp/link/i_link_manager.hpp new file mode 100644 index 000000000..7015b8e67 --- /dev/null +++ b/llarp/link/i_link_manager.hpp @@ -0,0 +1,85 @@ +#ifndef LLARP_I_LINK_MANAGER_HPP +#define LLARP_I_LINK_MANAGER_HPP + +#include +#include +#include + +#include + +struct llarp_buffer_t; + +namespace llarp +{ + using Logic_ptr = std::shared_ptr< Logic >; + + struct RouterContact; + struct ILinkSession; + struct IOutboundSessionMaker; + struct RouterID; + + namespace util + { + struct StatusObject; + } // namespace util + + struct ILinkManager + { + virtual ~ILinkManager() = default; + + virtual LinkLayer_ptr + GetCompatibleLink(const RouterContact &rc) const = 0; + + virtual IOutboundSessionMaker * + GetSessionMaker() const = 0; + + virtual bool + SendTo(const RouterID &remote, const llarp_buffer_t &buf) = 0; + + virtual bool + HasSessionTo(const RouterID &remote) const = 0; + + virtual void + PumpLinks() = 0; + + virtual void + AddLink(LinkLayer_ptr link, bool inbound = false) = 0; + + virtual bool + StartLinks(Logic_ptr logic) = 0; + + virtual void + Stop() = 0; + + virtual void + PersistSessionUntil(const RouterID &remote, llarp_time_t until) = 0; + + virtual void + ForEachPeer(std::function< void(const ILinkSession *, bool) > visit, + bool randomize = false) const = 0; + + virtual void + ForEachPeer(std::function< void(ILinkSession *) > visit) = 0; + + virtual void + ForEachInboundLink(std::function< void(LinkLayer_ptr) > visit) const = 0; + + virtual size_t + NumberOfConnectedRouters() const = 0; + + virtual size_t + NumberOfConnectedClients() const = 0; + + virtual bool + GetRandomConnectedRouter(RouterContact &router) const = 0; + + virtual void + CheckPersistingSessions(llarp_time_t now) = 0; + + virtual util::StatusObject + ExtractStatus() const = 0; + }; + +} // namespace llarp + +#endif // LLARP_I_LINK_MANAGER_HPP diff --git a/llarp/link/link_manager.cpp b/llarp/link/link_manager.cpp new file mode 100644 index 000000000..d62e6c2cf --- /dev/null +++ b/llarp/link/link_manager.cpp @@ -0,0 +1,362 @@ +#include + +#include +#include + +#include +#include + +namespace llarp +{ + LinkLayer_ptr + LinkManager::GetCompatibleLink(const RouterContact &rc) const + { + if(stopping) + return nullptr; + + for(auto &link : outboundLinks) + { + // TODO: may want to add some memory of session failures for a given + // router on a given link and not return that link here for a + // duration + if(!link->IsCompatable(rc)) + continue; + + return link; + } + + return nullptr; + } + + IOutboundSessionMaker * + LinkManager::GetSessionMaker() const + { + return _sessionMaker; + } + + bool + LinkManager::SendTo(const RouterID &remote, const llarp_buffer_t &buf) + { + if(stopping) + return false; + + auto link = GetLinkWithSessionTo(remote); + if(link == nullptr) + { + return false; + } + + return link->SendTo(remote, buf); + } + + bool + LinkManager::HasSessionTo(const RouterID &remote) const + { + return GetLinkWithSessionTo(remote) != nullptr; + } + + void + LinkManager::PumpLinks() + { + for(const auto &link : inboundLinks) + { + link->Pump(); + } + for(const auto &link : outboundLinks) + { + link->Pump(); + } + } + + void + LinkManager::AddLink(LinkLayer_ptr link, bool inbound) + { + util::Lock l(&_mutex); + + if(inbound) + { + inboundLinks.emplace(link); + } + else + { + outboundLinks.emplace(link); + } + } + + bool + LinkManager::StartLinks(Logic_ptr logic) + { + LogInfo("starting ", outboundLinks.size(), " outbound links"); + for(const auto &link : outboundLinks) + { + if(!link->Start(logic)) + { + LogWarn("outbound link '", link->Name(), "' failed to start"); + return false; + } + LogDebug("Outbound Link ", link->Name(), " started"); + } + + if(inboundLinks.size()) + { + LogInfo("starting ", inboundLinks.size(), " inbound links"); + for(const auto &link : inboundLinks) + { + if(!link->Start(logic)) + { + LogWarn("Link ", link->Name(), " failed to start"); + return false; + } + LogDebug("Inbound Link ", link->Name(), " started"); + } + } + + return true; + } + + void + LinkManager::Stop() + { + if(stopping) + { + return; + } + + util::Lock l(&_mutex); + + LogInfo("stopping links"); + stopping = true; + + for(const auto &link : outboundLinks) + link->Stop(); + for(const auto &link : inboundLinks) + link->Stop(); + } + + void + LinkManager::PersistSessionUntil(const RouterID &remote, llarp_time_t until) + { + if(stopping) + return; + + util::Lock l(&_mutex); + + m_PersistingSessions[remote] = + std::max(until, m_PersistingSessions[remote]); + LogDebug("persist session to ", remote, " until ", + m_PersistingSessions[remote]); + } + + void + LinkManager::ForEachPeer( + std::function< void(const ILinkSession *, bool) > visit, + bool randomize) const + { + if(stopping) + return; + + for(const auto &link : outboundLinks) + { + link->ForEachSession( + [visit](const ILinkSession *peer) { visit(peer, true); }, randomize); + } + for(const auto &link : inboundLinks) + { + link->ForEachSession( + [visit](const ILinkSession *peer) { visit(peer, false); }, randomize); + } + } + + void + LinkManager::ForEachPeer(std::function< void(ILinkSession *) > visit) + { + if(stopping) + return; + + for(const auto &link : outboundLinks) + { + link->ForEachSession([visit](ILinkSession *peer) { visit(peer); }); + } + for(const auto &link : inboundLinks) + { + link->ForEachSession([visit](ILinkSession *peer) { visit(peer); }); + } + } + + void + LinkManager::ForEachInboundLink( + std::function< void(LinkLayer_ptr) > visit) const + { + for(const auto &link : inboundLinks) + { + visit(link); + } + } + + size_t + LinkManager::NumberOfConnectedRouters() const + { + std::set< RouterID > connectedRouters; + + auto fn = [&connectedRouters](const ILinkSession *session, bool) { + if(session->IsEstablished()) + { + const RouterContact rc(session->GetRemoteRC()); + if(rc.IsPublicRouter()) + { + connectedRouters.insert(rc.pubkey); + } + } + }; + + ForEachPeer(fn); + + return connectedRouters.size(); + } + + size_t + LinkManager::NumberOfConnectedClients() const + { + std::set< RouterID > connectedClients; + + auto fn = [&connectedClients](const ILinkSession *session, bool) { + if(session->IsEstablished()) + { + const RouterContact rc(session->GetRemoteRC()); + if(!rc.IsPublicRouter()) + { + connectedClients.insert(rc.pubkey); + } + } + }; + + ForEachPeer(fn); + + return connectedClients.size(); + } + + bool + LinkManager::GetRandomConnectedRouter(RouterContact &router) const + { + std::unordered_map< RouterID, RouterContact, RouterID::Hash > + connectedRouters; + + ForEachPeer( + [&connectedRouters](const ILinkSession *peer, bool unused) { + (void)unused; + connectedRouters[peer->GetPubKey()] = peer->GetRemoteRC(); + }, + false); + + const auto sz = connectedRouters.size(); + if(sz) + { + auto itr = connectedRouters.begin(); + if(sz > 1) + { + std::advance(itr, randint() % sz); + } + + router = itr->second; + + return true; + } + + return false; + } + + void + LinkManager::CheckPersistingSessions(llarp_time_t now) + { + if(stopping) + return; + + std::vector< RouterID > sessionsNeeded; + + { + util::Lock l(&_mutex); + + auto itr = m_PersistingSessions.begin(); + while(itr != m_PersistingSessions.end()) + { + auto link = GetLinkWithSessionTo(itr->first); + if(now < itr->second) + { + if(link) + { + LogDebug("keepalive to ", itr->first); + link->KeepAliveSessionTo(itr->first); + } + else + { + sessionsNeeded.push_back(itr->first); + } + ++itr; + } + else + { + const RouterID r(itr->first); + LogInfo("commit to ", r, " expired"); + itr = m_PersistingSessions.erase(itr); + } + } + } + + for(const auto &router : sessionsNeeded) + { + _sessionMaker->CreateSessionTo(router, nullptr); + } + } + + util::StatusObject + LinkManager::ExtractStatus() const + { + std::vector< util::StatusObject > ob_links, ib_links; + std::transform(inboundLinks.begin(), inboundLinks.end(), + std::back_inserter(ib_links), + [](const auto &link) -> util::StatusObject { + return link->ExtractStatus(); + }); + std::transform(outboundLinks.begin(), outboundLinks.end(), + std::back_inserter(ob_links), + [](const auto &link) -> util::StatusObject { + return link->ExtractStatus(); + }); + + util::StatusObject obj{{"outbound", ob_links}, {"inbound", ib_links}}; + + return obj; + } + + void + LinkManager::Init(IOutboundSessionMaker *sessionMaker) + { + stopping = false; + _sessionMaker = sessionMaker; + } + + LinkLayer_ptr + LinkManager::GetLinkWithSessionTo(const RouterID &remote) const + { + if(stopping) + return nullptr; + + for(const auto &link : inboundLinks) + { + if(link->HasSessionTo(remote)) + { + return link; + } + } + for(const auto &link : outboundLinks) + { + if(link->HasSessionTo(remote)) + { + return link; + } + } + + return nullptr; + } + +} // namespace llarp diff --git a/llarp/link/link_manager.hpp b/llarp/link/link_manager.hpp new file mode 100644 index 000000000..ef6aa5c30 --- /dev/null +++ b/llarp/link/link_manager.hpp @@ -0,0 +1,100 @@ +#ifndef LLARP_LINK_MANAGER_HPP +#define LLARP_LINK_MANAGER_HPP + +#include + +#include +#include +#include + +#include +#include +#include + +namespace llarp +{ + struct IRouterContactManager; + + struct LinkManager final : public ILinkManager + { + public: + ~LinkManager() = default; + + LinkLayer_ptr + GetCompatibleLink(const RouterContact &rc) const override; + + IOutboundSessionMaker * + GetSessionMaker() const override; + + bool + SendTo(const RouterID &remote, const llarp_buffer_t &buf) override; + + bool + HasSessionTo(const RouterID &remote) const override; + + void + PumpLinks() override; + + void + AddLink(LinkLayer_ptr link, bool inbound = false) override; + + bool + StartLinks(Logic_ptr logic) override; + + void + Stop() override; + + void + PersistSessionUntil(const RouterID &remote, llarp_time_t until) override; + + void + ForEachPeer(std::function< void(const ILinkSession *, bool) > visit, + bool randomize = false) const override; + + void + ForEachPeer(std::function< void(ILinkSession *) > visit) override; + + void + ForEachInboundLink( + std::function< void(LinkLayer_ptr) > visit) const override; + + size_t + NumberOfConnectedRouters() const override; + + size_t + NumberOfConnectedClients() const override; + + bool + GetRandomConnectedRouter(RouterContact &router) const override; + + void + CheckPersistingSessions(llarp_time_t now) override; + + virtual util::StatusObject + ExtractStatus() const override; + + void + Init(IOutboundSessionMaker *sessionMaker); + + private: + LinkLayer_ptr + GetLinkWithSessionTo(const RouterID &remote) const; + + std::atomic< bool > stopping; + mutable util::Mutex _mutex; // protects m_PersistingSessions + + using LinkSet = std::set< LinkLayer_ptr, ComparePtr< LinkLayer_ptr > >; + + LinkSet outboundLinks; + LinkSet inboundLinks; + + // sessions to persist -> timestamp to end persist at + std::unordered_map< RouterID, llarp_time_t, RouterID::Hash > + m_PersistingSessions GUARDED_BY(_mutex); + + IOutboundSessionMaker *_sessionMaker; + }; + +} // namespace llarp + +#endif // LLARP_LINK_MANAGER_HPP diff --git a/llarp/nodedb.cpp b/llarp/nodedb.cpp index 748e62b94..9d7134258 100644 --- a/llarp/nodedb.cpp +++ b/llarp/nodedb.cpp @@ -151,6 +151,13 @@ llarp_nodedb::UpdateAsyncIfNewer(llarp::RouterContact rc, InsertAsync(rc, logic, completionHandler); return true; } + else if(itr != entries.end()) + { + // insertion time is set on...insertion. But it should be updated here + // even if there is no insertion of a new RC, to show that the existing one + // is not "stale" + itr->second.inserted = llarp::time_now_ms(); + } return false; } @@ -189,6 +196,8 @@ llarp_nodedb::Insert(const llarp::RouterContact &rc) if(itr != entries.end()) entries.erase(itr); entries.emplace(rc.pubkey.as_array(), rc); + LogInfo("Added or updated RC for ", llarp::RouterID(rc.pubkey), + " to nodedb. Current nodedb count is: ", entries.size()); } return true; } @@ -313,6 +322,26 @@ llarp_nodedb::VisitInsertedBefore( } } +void +llarp_nodedb::RemoveStaleRCs(const std::set< llarp::RouterID > &keep, + llarp_time_t cutoff) +{ + std::set< llarp::RouterID > removeStale; + // remove stale routers + VisitInsertedBefore( + [&](const llarp::RouterContact &rc) { + if(keep.find(rc.pubkey) != keep.end()) + return; + LogInfo("removing stale router: ", llarp::RouterID(rc.pubkey)); + removeStale.insert(rc.pubkey); + }, + cutoff); + + RemoveIf([&removeStale](const llarp::RouterContact &rc) -> bool { + return removeStale.count(rc.pubkey) > 0; + }); +} + /* bool llarp_nodedb::Save() diff --git a/llarp/nodedb.hpp b/llarp/nodedb.hpp index 582298a20..29d29faa7 100644 --- a/llarp/nodedb.hpp +++ b/llarp/nodedb.hpp @@ -60,7 +60,7 @@ struct llarp_nodedb struct NetDBEntry { const llarp::RouterContact rc; - const llarp_time_t inserted; + llarp_time_t inserted; NetDBEntry(const llarp::RouterContact &data); }; @@ -140,6 +140,9 @@ struct llarp_nodedb VisitInsertedBefore(std::function< void(const llarp::RouterContact &) > visit, llarp_time_t insertedAfter) LOCKS_EXCLUDED(access); + void + RemoveStaleRCs(const std::set< llarp::RouterID > &keep, llarp_time_t cutoff); + size_t num_loaded() const LOCKS_EXCLUDED(access); diff --git a/llarp/router/abstractrouter.cpp b/llarp/router/abstractrouter.cpp index 9bb8be2f8..5f70ceeb7 100644 --- a/llarp/router/abstractrouter.cpp +++ b/llarp/router/abstractrouter.cpp @@ -3,10 +3,6 @@ namespace llarp { - AbstractRouter::~AbstractRouter() - { - } - void AbstractRouter::EnsureRouter(RouterID router, RouterLookupHandler handler) { diff --git a/llarp/router/abstractrouter.hpp b/llarp/router/abstractrouter.hpp index 654d4623d..df3bfbbda 100644 --- a/llarp/router/abstractrouter.hpp +++ b/llarp/router/abstractrouter.hpp @@ -24,6 +24,10 @@ namespace llarp struct Profiling; struct SecretKey; struct Signature; + struct IOutboundMessageHandler; + struct IOutboundSessionMaker; + struct ILinkManager; + struct I_RCLookupHandler; namespace exit { @@ -52,10 +56,7 @@ namespace llarp struct AbstractRouter { - virtual ~AbstractRouter() = 0; - - virtual bool - OnSessionEstablished(ILinkSession *) = 0; + virtual ~AbstractRouter() = default; virtual bool HandleRecvLinkMessageBuffer(ILinkSession *from, @@ -106,11 +107,23 @@ namespace llarp virtual const service::Context & hiddenServiceContext() const = 0; + virtual IOutboundMessageHandler & + outboundMessageHandler() = 0; + + virtual IOutboundSessionMaker & + outboundSessionMaker() = 0; + + virtual ILinkManager & + linkManager() = 0; + + virtual I_RCLookupHandler & + rcLookupHandler() = 0; + virtual bool Sign(Signature &sig, const llarp_buffer_t &buf) const = 0; virtual bool - Configure(Config *conf) = 0; + Configure(Config *conf, llarp_nodedb *nodedb) = 0; virtual bool Run(struct llarp_nodedb *nodedb) = 0; @@ -129,9 +142,6 @@ namespace llarp virtual const byte_t * pubkey() const = 0; - virtual void - OnConnectTimeout(ILinkSession *session) = 0; - /// connect to N random routers virtual void ConnectToRandomRouters(int N) = 0; @@ -219,11 +229,6 @@ namespace llarp virtual bool HasSessionTo(const RouterID &router) const = 0; - /// return true if we are currently looking up this router either directly - /// or via an anonymous endpoint - virtual bool - HasPendingRouterLookup(const RouterID &router) const = 0; - virtual util::StatusObject ExtractStatus() const = 0; diff --git a/llarp/router/i_outbound_message_handler.cpp b/llarp/router/i_outbound_message_handler.cpp new file mode 100644 index 000000000..6a4b6a25f --- /dev/null +++ b/llarp/router/i_outbound_message_handler.cpp @@ -0,0 +1 @@ +#include diff --git a/llarp/router/i_outbound_message_handler.hpp b/llarp/router/i_outbound_message_handler.hpp new file mode 100644 index 000000000..4789c1bf9 --- /dev/null +++ b/llarp/router/i_outbound_message_handler.hpp @@ -0,0 +1,43 @@ +#ifndef LLARP_ROUTER_I_OUTBOUND_MESSAGE_HANDLER_HPP +#define LLARP_ROUTER_I_OUTBOUND_MESSAGE_HANDLER_HPP + +#include +#include + +namespace llarp +{ + enum class SendStatus + { + Success, + Timeout, + NoLink, + InvalidRouter, + RouterNotFound, + Congestion + }; + + struct ILinkMessage; + struct RouterID; + + namespace util + { + struct StatusObject; + } + + using SendStatusHandler = std::function< void(SendStatus) >; + + struct IOutboundMessageHandler + { + virtual ~IOutboundMessageHandler() = default; + + virtual bool + QueueMessage(const RouterID &remote, const ILinkMessage *msg, + SendStatusHandler callback) = 0; + + virtual util::StatusObject + ExtractStatus() const = 0; + }; + +} // namespace llarp + +#endif // LLARP_ROUTER_I_OUTBOUND_MESSAGE_HANDLER_HPP diff --git a/llarp/router/i_outbound_session_maker.cpp b/llarp/router/i_outbound_session_maker.cpp new file mode 100644 index 000000000..35243e594 --- /dev/null +++ b/llarp/router/i_outbound_session_maker.cpp @@ -0,0 +1 @@ +#include diff --git a/llarp/router/i_outbound_session_maker.hpp b/llarp/router/i_outbound_session_maker.hpp new file mode 100644 index 000000000..fdebf0839 --- /dev/null +++ b/llarp/router/i_outbound_session_maker.hpp @@ -0,0 +1,59 @@ +#ifndef LLARP_ROUTER_I_OUTBOUND_SESSION_MAKER_HPP +#define LLARP_ROUTER_I_OUTBOUND_SESSION_MAKER_HPP + +#include + +#include + +namespace llarp +{ + namespace util + { + struct StatusObject; + } // namespace util + + struct ILinkSession; + struct RouterID; + struct RouterContact; + + enum class SessionResult + { + Establish, + Timeout, + RouterNotFound, + InvalidRouter, + NoLink + }; + + using RouterCallback = + std::function< void(const RouterID &, const SessionResult) >; + + struct IOutboundSessionMaker + { + virtual ~IOutboundSessionMaker() = default; + + virtual bool + OnSessionEstablished(ILinkSession *session) = 0; + + virtual void + OnConnectTimeout(ILinkSession *session) = 0; + + virtual void + CreateSessionTo(const RouterID &router, RouterCallback on_result) = 0; + + virtual void + CreateSessionTo(const RouterContact &rc, RouterCallback on_result) = 0; + + virtual bool + HavePendingSessionTo(const RouterID &router) const = 0; + + virtual void + ConnectToRandomRouters(int numDesired, llarp_time_t now) = 0; + + virtual util::StatusObject + ExtractStatus() const = 0; + }; + +} // namespace llarp + +#endif // LLARP_ROUTER_I_OUTBOUND_SESSION_MAKER_HPP diff --git a/llarp/router/i_rc_lookup_handler.cpp b/llarp/router/i_rc_lookup_handler.cpp new file mode 100644 index 000000000..5582e4c83 --- /dev/null +++ b/llarp/router/i_rc_lookup_handler.cpp @@ -0,0 +1 @@ +#include diff --git a/llarp/router/i_rc_lookup_handler.hpp b/llarp/router/i_rc_lookup_handler.hpp new file mode 100644 index 000000000..33ef5581f --- /dev/null +++ b/llarp/router/i_rc_lookup_handler.hpp @@ -0,0 +1,63 @@ +#ifndef LLARP_I_RC_LOOKUP_HANDLER_HPP +#define LLARP_I_RC_LOOKUP_HANDLER_HPP + +#include +#include + +#include +#include +#include + +namespace llarp +{ + struct RouterContact; + + enum class RCRequestResult + { + Success, + InvalidRouter, + RouterNotFound, + BadRC + }; + + using RCRequestCallback = std::function< void( + const RouterID &, const RouterContact *const, const RCRequestResult) >; + + struct I_RCLookupHandler + { + virtual ~I_RCLookupHandler() = default; + + virtual void + AddValidRouter(const RouterID &router) = 0; + + virtual void + RemoveValidRouter(const RouterID &router) = 0; + + virtual void + SetRouterWhitelist(const std::vector< RouterID > &routers) = 0; + + virtual void + GetRC(const RouterID &router, RCRequestCallback callback) = 0; + + virtual bool + RemoteIsAllowed(const RouterID &remote) const = 0; + + virtual bool + CheckRC(const RouterContact &rc) const = 0; + + virtual bool + GetRandomWhitelistRouter(RouterID &router) const = 0; + + virtual bool + CheckRenegotiateValid(RouterContact newrc, RouterContact oldrc) = 0; + + virtual void + PeriodicUpdate(llarp_time_t now) = 0; + + virtual void + ExploreNetwork() = 0; + }; + +} // namespace llarp + +#endif // LLARP_I_RC_LOOKUP_HANDLER_HPP diff --git a/llarp/router/outbound_message_handler.cpp b/llarp/router/outbound_message_handler.cpp new file mode 100644 index 000000000..fda79f737 --- /dev/null +++ b/llarp/router/outbound_message_handler.cpp @@ -0,0 +1,224 @@ +#include + +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace llarp +{ + bool + OutboundMessageHandler::QueueMessage(const RouterID &remote, + const ILinkMessage *msg, + SendStatusHandler callback) + { + std::array< byte_t, MAX_LINK_MSG_SIZE > linkmsg_buffer; + llarp_buffer_t buf(linkmsg_buffer); + + if(!EncodeBuffer(msg, buf)) + { + return false; + } + + Message message; + message.first.reserve(buf.sz); + message.second = callback; + + std::copy_n(buf.base, buf.sz, message.first.data()); + + if(SendIfSession(remote, message)) + { + return true; + } + + bool shouldCreateSession = false; + { + util::Lock l(&_mutex); + + // create queue for if it doesn't exist, and get iterator + auto itr_pair = outboundMessageQueue.emplace(remote, MessageQueue()); + + itr_pair.first->second.push_back(std::move(message)); + + shouldCreateSession = itr_pair.second; + } + + if(shouldCreateSession) + { + QueueSessionCreation(remote); + } + + return true; + } + + // TODO: this + util::StatusObject + OutboundMessageHandler::ExtractStatus() const + { + util::StatusObject status{}; + return status; + } + + void + OutboundMessageHandler::Init(ILinkManager *linkManager, + std::shared_ptr< Logic > logic) + { + _linkManager = linkManager; + _logic = logic; + } + + void + OutboundMessageHandler::OnSessionEstablished(const RouterID &router) + { + FinalizeRequest(router, SendStatus::Success); + } + + void + OutboundMessageHandler::OnConnectTimeout(const RouterID &router) + { + FinalizeRequest(router, SendStatus::Timeout); + } + + void + OutboundMessageHandler::OnRouterNotFound(const RouterID &router) + { + FinalizeRequest(router, SendStatus::RouterNotFound); + } + + void + OutboundMessageHandler::OnInvalidRouter(const RouterID &router) + { + FinalizeRequest(router, SendStatus::InvalidRouter); + } + + void + OutboundMessageHandler::OnNoLink(const RouterID &router) + { + FinalizeRequest(router, SendStatus::NoLink); + } + + void + OutboundMessageHandler::OnSessionResult(const RouterID &router, + const SessionResult result) + { + switch(result) + { + case SessionResult::Establish: + OnSessionEstablished(router); + break; + case SessionResult::Timeout: + OnConnectTimeout(router); + break; + case SessionResult::RouterNotFound: + OnRouterNotFound(router); + break; + case SessionResult::InvalidRouter: + OnInvalidRouter(router); + break; + case SessionResult::NoLink: + OnNoLink(router); + break; + default: + LogError("Impossible situation: enum class value out of bounds."); + std::abort(); + break; + } + } + + void + OutboundMessageHandler::DoCallback(SendStatusHandler callback, + SendStatus status) + { + if(callback) + { + auto func = std::bind(callback, status); + _logic->queue_func(func); + } + } + + void + OutboundMessageHandler::QueueSessionCreation(const RouterID &remote) + { + auto fn = util::memFn(&OutboundMessageHandler::OnSessionResult, this); + _linkManager->GetSessionMaker()->CreateSessionTo(remote, fn); + } + + bool + OutboundMessageHandler::EncodeBuffer(const ILinkMessage *msg, + llarp_buffer_t &buf) + { + if(!msg->BEncode(&buf)) + { + LogWarn("failed to encode outbound message, buffer size left: ", + buf.size_left()); + return false; + } + // set size of message + buf.sz = buf.cur - buf.base; + buf.cur = buf.base; + + return true; + } + + bool + OutboundMessageHandler::Send(const RouterID &remote, const Message &msg) + { + llarp_buffer_t buf(msg.first); + if(_linkManager->SendTo(remote, buf)) + { + DoCallback(msg.second, SendStatus::Success); + return true; + } + DoCallback(msg.second, SendStatus::Congestion); + return false; + } + + bool + OutboundMessageHandler::SendIfSession(const RouterID &remote, + const Message &msg) + { + if(_linkManager->HasSessionTo(remote)) + { + return Send(remote, msg); + } + return false; + } + + void + OutboundMessageHandler::FinalizeRequest(const RouterID &router, + SendStatus status) + { + MessageQueue movedMessages; + { + util::Lock l(&_mutex); + auto itr = outboundMessageQueue.find(router); + + if(itr == outboundMessageQueue.end()) + { + return; + } + + movedMessages.splice(movedMessages.begin(), itr->second); + + outboundMessageQueue.erase(itr); + } + + for(const auto &msg : movedMessages) + { + if(status == SendStatus::Success) + { + Send(router, msg); + } + else + { + DoCallback(msg.second, status); + } + } + } + +} // namespace llarp diff --git a/llarp/router/outbound_message_handler.hpp b/llarp/router/outbound_message_handler.hpp new file mode 100644 index 000000000..c29a201e0 --- /dev/null +++ b/llarp/router/outbound_message_handler.hpp @@ -0,0 +1,88 @@ +#ifndef LLARP_ROUTER_OUTBOUND_MESSAGE_HANDLER_HPP +#define LLARP_ROUTER_OUTBOUND_MESSAGE_HANDLER_HPP + +#include + +#include +#include +#include + +#include +#include +#include + +struct llarp_buffer_t; + +namespace llarp +{ + struct ILinkManager; + struct Logic; + enum class SessionResult; + + struct OutboundMessageHandler final : public IOutboundMessageHandler + { + public: + ~OutboundMessageHandler() = default; + + bool + QueueMessage(const RouterID &remote, const ILinkMessage *msg, + SendStatusHandler callback) override; + + util::StatusObject + ExtractStatus() const override; + + void + Init(ILinkManager *linkManager, std::shared_ptr< Logic > logic); + + private: + using Message = std::pair< std::vector< byte_t >, SendStatusHandler >; + using MessageQueue = std::list< Message >; + + void + OnSessionEstablished(const RouterID &router); + + void + OnConnectTimeout(const RouterID &router); + + void + OnRouterNotFound(const RouterID &router); + + void + OnInvalidRouter(const RouterID &router); + + void + OnNoLink(const RouterID &router); + + void + OnSessionResult(const RouterID &router, const SessionResult result); + + void + DoCallback(SendStatusHandler callback, SendStatus status); + + void + QueueSessionCreation(const RouterID &remote); + + bool + EncodeBuffer(const ILinkMessage *msg, llarp_buffer_t &buf); + + bool + Send(const RouterID &remote, const Message &msg); + + bool + SendIfSession(const RouterID &remote, const Message &msg); + + void + FinalizeRequest(const RouterID &router, SendStatus status); + + mutable util::Mutex _mutex; // protects outboundMessageQueue + + std::unordered_map< RouterID, MessageQueue, RouterID::Hash > + outboundMessageQueue GUARDED_BY(_mutex); + + ILinkManager *_linkManager; + std::shared_ptr< Logic > _logic; + }; + +} // namespace llarp + +#endif // LLARP_ROUTER_OUTBOUND_MESSAGE_HANDLER_HPP diff --git a/llarp/router/outbound_session_maker.cpp b/llarp/router/outbound_session_maker.cpp new file mode 100644 index 000000000..0fc3b7171 --- /dev/null +++ b/llarp/router/outbound_session_maker.cpp @@ -0,0 +1,317 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace llarp +{ + struct PendingSession + { + // TODO: add session establish status metadata, e.g. num retries + + const RouterContact rc; + LinkLayer_ptr link; + + size_t attemptCount = 0; + + PendingSession(const RouterContact &rc, LinkLayer_ptr link) + : rc(rc), link(link) + { + } + }; + + bool + OutboundSessionMaker::OnSessionEstablished(ILinkSession *session) + { + // TODO: do we want to keep it + + const auto router = RouterID(session->GetPubKey()); + + const std::string remoteType = + session->GetRemoteRC().IsPublicRouter() ? "router" : "client"; + LogInfo("session with ", remoteType, " [", router, "] established"); + + if(not _rcLookup->RemoteIsAllowed(router)) + { + FinalizeRequest(router, SessionResult::InvalidRouter); + return false; + } + + auto func = std::bind(&OutboundSessionMaker::VerifyRC, this, + session->GetRemoteRC()); + _threadpool->addJob(func); + + return true; + } + + void + OutboundSessionMaker::OnConnectTimeout(ILinkSession *session) + { + // TODO: retry/num attempts + + LogWarn("Session establish attempt to ", RouterID(session->GetPubKey()), + " timed out."); + FinalizeRequest(session->GetPubKey(), SessionResult::Timeout); + } + + void + OutboundSessionMaker::CreateSessionTo(const RouterID &router, + RouterCallback on_result) + { + if(on_result) + { + util::Lock l(&_mutex); + + auto itr_pair = pendingCallbacks.emplace(router, CallbacksQueue{}); + itr_pair.first->second.push_back(on_result); + } + + if(HavePendingSessionTo(router)) + { + return; + } + + CreatePendingSession(router); + + LogDebug("Creating session establish attempt to ", router, " ."); + + auto fn = util::memFn(&OutboundSessionMaker::OnRouterContactResult, this); + + _rcLookup->GetRC(router, fn); + } + + void + OutboundSessionMaker::CreateSessionTo(const RouterContact &rc, + RouterCallback on_result) + { + if(on_result) + { + util::Lock l(&_mutex); + + auto itr_pair = pendingCallbacks.emplace(rc.pubkey, CallbacksQueue{}); + itr_pair.first->second.push_back(on_result); + } + + if(not HavePendingSessionTo(rc.pubkey)) + { + LogDebug("Creating session establish attempt to ", rc.pubkey, " ."); + CreatePendingSession(rc.pubkey); + } + + GotRouterContact(rc.pubkey, rc); + } + + bool + OutboundSessionMaker::HavePendingSessionTo(const RouterID &router) const + { + util::Lock l(&_mutex); + return pendingSessions.find(router) != pendingSessions.end(); + } + + void + OutboundSessionMaker::ConnectToRandomRouters(int numDesired, llarp_time_t now) + { + int remainingDesired = numDesired; + + _nodedb->visit([&](const RouterContact &other) -> bool { + // check if we really remainingDesired to + if(other.ExpiresSoon(now, 30000)) // TODO: make delta configurable + { + return remainingDesired > 0; + } + if(!_rcLookup->RemoteIsAllowed(other.pubkey)) + { + return remainingDesired > 0; + } + if(randint() % 2 == 0 + && !(_linkManager->HasSessionTo(other.pubkey) + || HavePendingSessionTo(other.pubkey))) + { + CreateSessionTo(other, nullptr); + --remainingDesired; + } + return remainingDesired > 0; + }); + LogDebug("connecting to ", numDesired - remainingDesired, " out of ", + numDesired, " random routers"); + } + + // TODO: this + util::StatusObject + OutboundSessionMaker::ExtractStatus() const + { + util::StatusObject status{}; + return status; + } + + void + OutboundSessionMaker::Init( + ILinkManager *linkManager, I_RCLookupHandler *rcLookup, + std::shared_ptr< Logic > logic, llarp_nodedb *nodedb, + std::shared_ptr< llarp::thread::ThreadPool > threadpool) + { + _linkManager = linkManager; + _rcLookup = rcLookup; + _logic = logic; + _nodedb = nodedb; + _threadpool = threadpool; + } + + void + OutboundSessionMaker::DoEstablish(const RouterID &router) + { + util::ReleasableLock l(&_mutex); + + auto itr = pendingSessions.find(router); + + if(itr == pendingSessions.end()) + { + return; + } + + const auto &job = itr->second; + if(!job->link->TryEstablishTo(job->rc)) + { + // TODO: maybe different failure type? + + l.Release(); + FinalizeRequest(router, SessionResult::NoLink); + } + } + + void + OutboundSessionMaker::GotRouterContact(const RouterID &router, + const RouterContact &rc) + { + { + util::Lock l(&_mutex); + + // in case other request found RC for this router after this request was + // made + auto itr = pendingSessions.find(router); + if(itr == pendingSessions.end()) + { + return; + } + + LinkLayer_ptr link = _linkManager->GetCompatibleLink(rc); + + if(!link) + { + FinalizeRequest(router, SessionResult::NoLink); + return; + } + + auto session = std::make_shared< PendingSession >(rc, link); + + itr->second = session; + } + + auto fn = std::bind(&OutboundSessionMaker::DoEstablish, this, router); + _logic->queue_func(fn); + } + + void + OutboundSessionMaker::InvalidRouter(const RouterID &router) + { + FinalizeRequest(router, SessionResult::InvalidRouter); + } + + void + OutboundSessionMaker::RouterNotFound(const RouterID &router) + { + FinalizeRequest(router, SessionResult::RouterNotFound); + } + + void + OutboundSessionMaker::OnRouterContactResult(const RouterID &router, + const RouterContact *const rc, + const RCRequestResult result) + { + if(not HavePendingSessionTo(router)) + { + return; + } + + switch(result) + { + case RCRequestResult::Success: + if(rc) + { + GotRouterContact(router, *rc); + } + else + { + LogError("RCRequestResult::Success but null rc pointer given"); + } + break; + case RCRequestResult::InvalidRouter: + InvalidRouter(router); + break; + case RCRequestResult::RouterNotFound: + RouterNotFound(router); + break; + default: + break; + } + } + + void + OutboundSessionMaker::VerifyRC(const RouterContact rc) + { + if(not _rcLookup->CheckRC(rc)) + { + FinalizeRequest(rc.pubkey, SessionResult::InvalidRouter); + return; + } + + FinalizeRequest(rc.pubkey, SessionResult::Establish); + } + + void + OutboundSessionMaker::CreatePendingSession(const RouterID &router) + { + util::Lock l(&_mutex); + pendingSessions.emplace(router, nullptr); + } + + void + OutboundSessionMaker::FinalizeRequest(const RouterID &router, + const SessionResult type) + { + CallbacksQueue movedCallbacks; + { + util::Lock l(&_mutex); + + // TODO: Router profiling stuff + + auto itr = pendingCallbacks.find(router); + + if(itr != pendingCallbacks.end()) + { + movedCallbacks.splice(movedCallbacks.begin(), itr->second); + pendingCallbacks.erase(itr); + } + } + + for(const auto &callback : movedCallbacks) + { + auto func = std::bind(callback, router, type); + _logic->queue_func(func); + } + + { + util::Lock l(&_mutex); + pendingSessions.erase(router); + } + } + +} // namespace llarp diff --git a/llarp/router/outbound_session_maker.hpp b/llarp/router/outbound_session_maker.hpp new file mode 100644 index 000000000..4329e2e06 --- /dev/null +++ b/llarp/router/outbound_session_maker.hpp @@ -0,0 +1,103 @@ +#ifndef LLARP_ROUTER_OUTBOUND_SESSION_MAKER_HPP +#define LLARP_ROUTER_OUTBOUND_SESSION_MAKER_HPP + +#include + +#include +#include +#include +#include + +#include +#include +#include + +struct llarp_nodedb; + +namespace llarp +{ + struct PendingSession; + + struct ILinkManager; + struct I_RCLookupHandler; + + struct OutboundSessionMaker final : public IOutboundSessionMaker + { + using CallbacksQueue = std::list< RouterCallback >; + + public: + ~OutboundSessionMaker() = default; + + bool + OnSessionEstablished(ILinkSession *session) override; + + void + OnConnectTimeout(ILinkSession *session) override; + + void + CreateSessionTo(const RouterID &router, + RouterCallback on_result) /* override */; + + void + CreateSessionTo(const RouterContact &rc, + RouterCallback on_result) /* override */; + + bool + HavePendingSessionTo(const RouterID &router) const override; + + void + ConnectToRandomRouters(int numDesired, llarp_time_t now) override; + + util::StatusObject + ExtractStatus() const override; + + void + Init(ILinkManager *linkManager, I_RCLookupHandler *rcLookup, + std::shared_ptr< Logic > logic, llarp_nodedb *nodedb, + std::shared_ptr< llarp::thread::ThreadPool > threadpool); + + private: + void + DoEstablish(const RouterID &router); + + void + GotRouterContact(const RouterID &router, const RouterContact &rc); + + void + InvalidRouter(const RouterID &router); + + void + RouterNotFound(const RouterID &router); + + void + OnRouterContactResult(const RouterID &router, const RouterContact *const rc, + const RCRequestResult result); + + void + VerifyRC(const RouterContact rc); + + void + CreatePendingSession(const RouterID &router); + + void + FinalizeRequest(const RouterID &router, const SessionResult type); + + mutable util::Mutex _mutex; // protects pendingSessions, pendingCallbacks + + std::unordered_map< RouterID, std::shared_ptr< PendingSession >, + RouterID::Hash > + pendingSessions GUARDED_BY(_mutex); + + std::unordered_map< RouterID, CallbacksQueue, RouterID::Hash > + pendingCallbacks GUARDED_BY(_mutex); + + ILinkManager *_linkManager; + I_RCLookupHandler *_rcLookup; + std::shared_ptr< Logic > _logic; + llarp_nodedb *_nodedb; + std::shared_ptr< llarp::thread::ThreadPool > _threadpool; + }; + +} // namespace llarp + +#endif // LLARP_ROUTER_OUTBOUND_SESSION_MAKER_HPP diff --git a/llarp/router/rc_lookup_handler.cpp b/llarp/router/rc_lookup_handler.cpp new file mode 100644 index 000000000..c6f343dc2 --- /dev/null +++ b/llarp/router/rc_lookup_handler.cpp @@ -0,0 +1,337 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace llarp +{ + void + RCLookupHandler::AddValidRouter(const RouterID &router) + { + util::Lock l(&_mutex); + whitelistRouters.insert(router); + } + + void + RCLookupHandler::RemoveValidRouter(const RouterID &router) + { + util::Lock l(&_mutex); + whitelistRouters.erase(router); + } + + void + RCLookupHandler::SetRouterWhitelist(const std::vector< RouterID > &routers) + { + util::Lock l(&_mutex); + + whitelistRouters.clear(); + for(auto &router : routers) + { + whitelistRouters.emplace(router); + } + + LogInfo("lokinet service node list now has ", whitelistRouters.size(), + " routers"); + } + + void + RCLookupHandler::GetRC(const RouterID &router, RCRequestCallback callback) + { + RouterContact remoteRC; + + if(_nodedb->Get(router, remoteRC)) + { + if(callback) + { + callback(router, &remoteRC, RCRequestResult::Success); + } + FinalizeRequest(router, &remoteRC, RCRequestResult::Success); + return; + } + + bool shouldDoLookup = false; + + { + util::Lock l(&_mutex); + + auto itr_pair = pendingCallbacks.emplace(router, CallbacksQueue{}); + + if(callback) + { + itr_pair.first->second.push_back(callback); + } + shouldDoLookup = itr_pair.second; + } + + if(shouldDoLookup) + { + auto fn = std::bind(&RCLookupHandler::HandleDHTLookupResult, this, router, + std::placeholders::_1); + + // if we are a client try using the hidden service endpoints + if(!isServiceNode) + { + bool sent = false; + LogInfo("Lookup ", router, " anonymously"); + _hiddenServiceContext->ForEachService( + [&](const std::string &, + const std::shared_ptr< service::Endpoint > &ep) -> bool { + const bool success = ep->LookupRouterAnon(router, fn); + sent = sent || success; + return !success; + }); + if(sent) + return; + LogWarn("cannot lookup ", router, " anonymously"); + } + + if(!_dht->impl->LookupRouter(router, fn)) + { + FinalizeRequest(router, nullptr, RCRequestResult::RouterNotFound); + } + } + } + + bool + RCLookupHandler::RemoteIsAllowed(const RouterID &remote) const + { + if(_strictConnectPubkeys.size() && _strictConnectPubkeys.count(remote) == 0 + && !RemoteInBootstrap(remote)) + { + return false; + } + + util::Lock l(&_mutex); + + if(useWhitelist && whitelistRouters.find(remote) == whitelistRouters.end()) + { + return false; + } + + return true; + } + + bool + RCLookupHandler::CheckRC(const RouterContact &rc) const + { + if(not RemoteIsAllowed(rc.pubkey)) + { + _dht->impl->DelRCNodeAsync(dht::Key_t{rc.pubkey}); + return false; + } + + if(not rc.Verify(_dht->impl->Now())) + { + return false; + } + + // update nodedb if required + if(rc.IsPublicRouter()) + { + LogInfo("Adding or updating RC for ", RouterID(rc.pubkey), + " to nodedb and dht."); + _nodedb->UpdateAsyncIfNewer(rc); + _dht->impl->PutRCNodeAsync(rc); + } + + return true; + } + + bool + RCLookupHandler::GetRandomWhitelistRouter(RouterID &router) const + { + util::Lock l(&_mutex); + + const auto sz = whitelistRouters.size(); + auto itr = whitelistRouters.begin(); + if(sz == 0) + return false; + if(sz > 1) + std::advance(itr, randint() % sz); + router = *itr; + return true; + } + + bool + RCLookupHandler::CheckRenegotiateValid(RouterContact newrc, + RouterContact oldrc) + { + // missmatch of identity ? + if(newrc.pubkey != oldrc.pubkey) + return false; + + if(!RemoteIsAllowed(newrc.pubkey)) + return false; + + auto func = std::bind(&RCLookupHandler::CheckRC, this, newrc); + _threadpool->addJob(func); + + // update dht if required + if(_dht->impl->Nodes()->HasNode(dht::Key_t{newrc.pubkey})) + { + _dht->impl->Nodes()->PutNode(newrc); + } + + // TODO: check for other places that need updating the RC + return true; + } + + void + RCLookupHandler::PeriodicUpdate(llarp_time_t now) + { + // try looking up stale routers + std::set< RouterID > routersToLookUp; + + _nodedb->VisitInsertedBefore( + [&](const RouterContact &rc) { + if(HavePendingLookup(rc.pubkey)) + return; + routersToLookUp.insert(rc.pubkey); + }, + now - RouterContact::UpdateInterval); + + for(const auto &router : routersToLookUp) + { + GetRC(router, nullptr); + } + + _nodedb->RemoveStaleRCs(_bootstrapRouterIDList, + now - RouterContact::StaleInsertionAge); + } + + void + RCLookupHandler::ExploreNetwork() + { + if(_bootstrapRCList.size()) + { + for(const auto &rc : _bootstrapRCList) + { + LogInfo("Doing explore via bootstrap node: ", RouterID(rc.pubkey)); + _dht->impl->ExploreNetworkVia(dht::Key_t{rc.pubkey}); + } + } + else + { + LogError("we have no bootstrap nodes specified"); + } + + // TODO: only explore via random subset + // explore via every connected peer + _linkManager->ForEachPeer([&](ILinkSession *s) { + if(!s->IsEstablished()) + return; + const RouterContact rc = s->GetRemoteRC(); + if(rc.IsPublicRouter() + && (_bootstrapRCList.find(rc) == _bootstrapRCList.end())) + { + LogInfo("Doing explore via public node: ", RouterID(rc.pubkey)); + _dht->impl->ExploreNetworkVia(dht::Key_t{rc.pubkey}); + } + }); + } + + void + RCLookupHandler::Init(llarp_dht_context *dht, llarp_nodedb *nodedb, + std::shared_ptr< llarp::thread::ThreadPool > threadpool, + ILinkManager *linkManager, + service::Context *hiddenServiceContext, + const std::set< RouterID > &strictConnectPubkeys, + const std::set< RouterContact > &bootstrapRCList, + bool useWhitelist_arg, bool isServiceNode_arg) + { + _dht = dht; + _nodedb = nodedb; + _threadpool = threadpool; + _hiddenServiceContext = hiddenServiceContext; + _strictConnectPubkeys = strictConnectPubkeys; + _bootstrapRCList = bootstrapRCList; + _linkManager = linkManager; + useWhitelist = useWhitelist_arg; + isServiceNode = isServiceNode_arg; + + for(const auto &rc : _bootstrapRCList) + { + _bootstrapRouterIDList.insert(rc.pubkey); + } + } + + void + RCLookupHandler::HandleDHTLookupResult( + RouterID remote, const std::vector< RouterContact > &results) + { + if(not results.size()) + { + FinalizeRequest(remote, nullptr, RCRequestResult::RouterNotFound); + return; + } + + if(not RemoteIsAllowed(remote)) + { + FinalizeRequest(remote, &results[0], RCRequestResult::InvalidRouter); + return; + } + + if(not CheckRC(results[0])) + { + FinalizeRequest(remote, &results[0], RCRequestResult::BadRC); + return; + } + + FinalizeRequest(remote, &results[0], RCRequestResult::Success); + } + + bool + RCLookupHandler::HavePendingLookup(RouterID remote) const + { + return pendingCallbacks.find(remote) != pendingCallbacks.end(); + } + + bool + RCLookupHandler::RemoteInBootstrap(const RouterID &remote) const + { + for(const auto &rc : _bootstrapRCList) + { + if(rc.pubkey == remote) + { + return true; + } + } + return false; + } + + void + RCLookupHandler::FinalizeRequest(const RouterID &router, + const RouterContact *const rc, + RCRequestResult result) + { + CallbacksQueue movedCallbacks; + { + util::Lock l(&_mutex); + + auto itr = pendingCallbacks.find(router); + + if(itr != pendingCallbacks.end()) + { + movedCallbacks.splice(movedCallbacks.begin(), itr->second); + pendingCallbacks.erase(itr); + } + } // lock + + for(const auto &callback : movedCallbacks) + { + callback(router, rc, result); + } + } + +} // namespace llarp diff --git a/llarp/router/rc_lookup_handler.hpp b/llarp/router/rc_lookup_handler.hpp new file mode 100644 index 000000000..1638da6c6 --- /dev/null +++ b/llarp/router/rc_lookup_handler.hpp @@ -0,0 +1,112 @@ +#ifndef LLARP_RC_LOOKUP_HANDLER_HPP +#define LLARP_RC_LOOKUP_HANDLER_HPP + +#include + +#include +#include + +#include +#include +#include + +struct llarp_nodedb; +struct llarp_dht_context; + +namespace llarp +{ + namespace service + { + struct Context; + + } // namespace service + + struct ILinkManager; + + struct RCLookupHandler final : public I_RCLookupHandler + { + public: + using CallbacksQueue = std::list< RCRequestCallback >; + + ~RCLookupHandler() = default; + + void + AddValidRouter(const RouterID &router) override; + + void + RemoveValidRouter(const RouterID &router) override; + + void + SetRouterWhitelist(const std::vector< RouterID > &routers) override; + + void + GetRC(const RouterID &router, RCRequestCallback callback) override; + + bool + RemoteIsAllowed(const RouterID &remote) const override; + + bool + CheckRC(const RouterContact &rc) const override; + + bool + GetRandomWhitelistRouter(RouterID &router) const override; + + bool + CheckRenegotiateValid(RouterContact newrc, RouterContact oldrc) override; + + void + PeriodicUpdate(llarp_time_t now) override; + + void + ExploreNetwork() override; + + void + Init(llarp_dht_context *dht, llarp_nodedb *nodedb, + std::shared_ptr< llarp::thread::ThreadPool > threadpool, + ILinkManager *linkManager, service::Context *hiddenServiceContext, + const std::set< RouterID > &strictConnectPubkeys, + const std::set< RouterContact > &bootstrapRCList, + bool useWhitelist_arg, bool isServiceNode_arg); + + private: + void + HandleDHTLookupResult(RouterID remote, + const std::vector< RouterContact > &results); + + bool + HavePendingLookup(RouterID remote) const; + + bool + RemoteInBootstrap(const RouterID &remote) const; + + void + FinalizeRequest(const RouterID &router, const RouterContact *const rc, + RCRequestResult result); + + mutable util::Mutex _mutex; // protects pendingCallbacks, whitelistRouters + + llarp_dht_context *_dht = nullptr; + llarp_nodedb *_nodedb = nullptr; + std::shared_ptr< llarp::thread::ThreadPool > _threadpool = nullptr; + service::Context *_hiddenServiceContext = nullptr; + ILinkManager *_linkManager = nullptr; + + /// explicit whitelist of routers we will connect to directly (not for + /// service nodes) + std::set< RouterID > _strictConnectPubkeys; + + std::set< RouterContact > _bootstrapRCList; + std::set< RouterID > _bootstrapRouterIDList; + + std::unordered_map< RouterID, CallbacksQueue, RouterID::Hash > + pendingCallbacks GUARDED_BY(_mutex); + + bool useWhitelist = false; + bool isServiceNode = false; + + std::set< RouterID > whitelistRouters GUARDED_BY(_mutex); + }; + +} // namespace llarp + +#endif // LLARP_RC_LOOKUP_HANDLER_HPP diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 6e1136933..f9231c3ac 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -30,93 +30,6 @@ #include #endif -namespace llarp -{ - struct async_verify_context - { - Router *router; - TryConnectJob *establish_job; - }; - -} // namespace llarp - -struct TryConnectJob -{ - llarp_time_t lastAttempt = 0; - const llarp::RouterContact rc; - llarp::LinkLayer_ptr link; - llarp::Router *router; - uint16_t triesLeft; - TryConnectJob(const llarp::RouterContact &remote, llarp::LinkLayer_ptr l, - uint16_t tries, llarp::Router *r) - : rc(remote), link(l), router(r), triesLeft(tries) - { - } - - ~TryConnectJob() - { - } - - bool - TimeoutReached() const - { - const auto now = router->Now(); - return now > lastAttempt && now - lastAttempt > 5000; - } - - void - Success() - { - router->routerProfiling().MarkConnectSuccess(rc.pubkey); - router->FlushOutboundFor(rc.pubkey, link.get()); - } - - /// return true to remove - bool - Timeout() - { - if(ShouldRetry()) - { - return Attempt(); - } - // discard pending traffic on timeout - router->DiscardOutboundFor(rc.pubkey); - router->routerProfiling().MarkConnectTimeout(rc.pubkey); - if(router->routerProfiling().IsBad(rc.pubkey)) - { - if(!router->IsBootstrapNode(rc.pubkey)) - router->nodedb()->Remove(rc.pubkey); - } - return true; - } - - /// return true to remove - bool - Attempt() - { - --triesLeft; - if(!link) - return true; - if(!link->TryEstablishTo(rc)) - return true; - lastAttempt = router->Now(); - return false; - } - - bool - ShouldRetry() const - { - return triesLeft > 0; - } -}; - -static void -on_try_connecting(std::shared_ptr< TryConnectJob > j) -{ - if(j->Attempt()) - j->router->pendingEstablishJobs.erase(j->rc.pubkey); -} - bool llarp_loadServiceNodeIdentityKey(const fs::path &fpath, llarp::SecretKey &secret) @@ -165,46 +78,6 @@ llarp_findOrCreateEncryption(const fs::path &path, llarp::SecretKey &encryption) namespace llarp { - bool - Router::TryConnectAsync(RouterContact remote, uint16_t numretries) - { - const RouterID us = pubkey(); - if(remote.pubkey == us) - return false; - if(!ConnectionToRouterAllowed(remote.pubkey)) - return false; - // do we already have a pending job for this remote? - if(HasPendingConnectJob(remote.pubkey)) - { - LogDebug("We have pending connect jobs to ", remote.pubkey); - return false; - } - - for(auto &link : outboundLinks) - { - if(!link->IsCompatable(remote)) - continue; - std::shared_ptr< TryConnectJob > job = - std::make_shared< TryConnectJob >(remote, link, numretries, this); - auto itr = pendingEstablishJobs.emplace(remote.pubkey, job); - if(itr.second) - { - // try establishing async - _logic->queue_func(std::bind(&on_try_connecting, job)); - return true; - } - - itr.first->second->Attempt(); - } - return false; - } - - bool - Router::OnSessionEstablished(ILinkSession *s) - { - return async_verify_RC(s->GetRemoteRC()); - } - Router::Router(std::shared_ptr< llarp::thread::ThreadPool > _tp, llarp_ev_loop_ptr __netloop, std::shared_ptr< Logic > l) : ready(false) @@ -238,19 +111,9 @@ namespace llarp util::StatusObject obj{{"dht", _dht->impl->ExtractStatus()}, {"services", _hiddenServiceContext.ExtractStatus()}, {"exit", _exitContext.ExtractStatus()}}; - std::vector< util::StatusObject > ob_links, ib_links; - std::transform(inboundLinks.begin(), inboundLinks.end(), - std::back_inserter(ib_links), - [](const auto &link) -> util::StatusObject { - return link->ExtractStatus(); - }); - std::transform(outboundLinks.begin(), outboundLinks.end(), - std::back_inserter(ob_links), - [](const auto &link) -> util::StatusObject { - return link->ExtractStatus(); - }); - obj.Put("links", - util::StatusObject{{"outbound", ob_links}, {"inbound", ib_links}}); + + obj.Put("links", _linkManager.ExtractStatus()); + return obj; } @@ -272,15 +135,17 @@ namespace llarp void Router::PersistSessionUntil(const RouterID &remote, llarp_time_t until) { - m_PersistingSessions[remote] = - std::max(until, m_PersistingSessions[remote]); - LogDebug("persist session to ", remote, " until ", - m_PersistingSessions[remote]); + _linkManager.PersistSessionUntil(remote, until); } bool Router::GetRandomGoodRouter(RouterID &router) { + if(whitelistRouters) + { + return _rcLookupHandler.GetRandomWhitelistRouter(router); + } + auto pick_router = [&](auto &collection) -> bool { const auto sz = collection.size(); auto itr = collection.begin(); @@ -291,10 +156,7 @@ namespace llarp router = itr->first; return true; }; - if(whitelistRouters) - { - pick_router(lokinetRouters); - } + absl::ReaderMutexLock l(&nodedb()->access); return pick_router(nodedb()->entries); } @@ -302,115 +164,28 @@ namespace llarp void Router::PumpLL() { - for(const auto &link : inboundLinks) - { - link->Pump(); - } - for(const auto &link : outboundLinks) - { - link->Pump(); - } + _linkManager.PumpLinks(); } bool Router::SendToOrQueue(const RouterID &remote, const ILinkMessage *msg) { - for(const auto &link : inboundLinks) - { - if(link->HasSessionTo(remote)) - { - SendTo(remote, msg, link.get()); - return true; - } - } - for(const auto &link : outboundLinks) - { - if(link->HasSessionTo(remote)) - { - SendTo(remote, msg, link.get()); - return true; - } - } - // no link available - - // this will create an entry in the outbound mq if it's not already there - auto itr = outboundMessageQueue.find(remote); - if(itr == outboundMessageQueue.end()) - { - outboundMessageQueue.emplace(remote, MessageQueue()); - } - // encode - llarp_buffer_t buf(linkmsg_buffer); - if(!msg->BEncode(&buf)) - return false; - // queue buffer - auto &q = outboundMessageQueue[remote]; - - buf.sz = buf.cur - buf.base; - q.emplace(buf.sz); - memcpy(q.back().data(), buf.base, buf.sz); - RouterContact remoteRC; - // we don't have an open session to that router right now - if(nodedb()->Get(remote, remoteRC)) - { - // try connecting directly as the rc is loaded from disk - return TryConnectAsync(remoteRC, 10); - } - - // we don't have the RC locally so do a dht lookup - _dht->impl->LookupRouter(remote, - std::bind(&Router::HandleDHTLookupForSendTo, this, - remote, std::placeholders::_1)); - return true; - } - - void - Router::HandleDHTLookupForSendTo(RouterID remote, - const std::vector< RouterContact > &results) - { - if(results.size()) - { - if(whitelistRouters - && lokinetRouters.find(results[0].pubkey) == lokinetRouters.end()) - { - return; - } - if(results[0].Verify(Now())) - { - TryConnectAsync(results[0], 10); - return; - } - } - DiscardOutboundFor(remote); + using std::placeholders::_1; + auto func = std::bind(&Router::MessageSent, this, remote, _1); + return _outboundMessageHandler.QueueMessage(remote, msg, func); } void Router::ForEachPeer(std::function< void(const ILinkSession *, bool) > visit, bool randomize) const { - for(const auto &link : outboundLinks) - { - link->ForEachSession( - [visit](const ILinkSession *peer) { visit(peer, true); }, randomize); - } - for(const auto &link : inboundLinks) - { - link->ForEachSession( - [visit](const ILinkSession *peer) { visit(peer, false); }, randomize); - } + _linkManager.ForEachPeer(visit, randomize); } void Router::ForEachPeer(std::function< void(ILinkSession *) > visit) { - for(const auto &link : outboundLinks) - { - link->ForEachSession([visit](ILinkSession *peer) { visit(peer); }); - } - for(const auto &link : inboundLinks) - { - link->ForEachSession([visit](ILinkSession *peer) { visit(peer); }); - } + _linkManager.ForEachPeer(visit); } void @@ -425,11 +200,7 @@ namespace llarp if(remote.Verify(Now())) { LogDebug("verified signature"); - if(!TryConnectAsync(remote, 10)) - { - // or error? - LogWarn("session already made"); - } + _outboundSessionMaker.CreateSessionTo(remote, nullptr); } else LogError(rcfile, " contains invalid RC"); @@ -452,36 +223,25 @@ namespace llarp return llarp_findOrCreateEncryption(encryption_keyfile, _encryption); } - void - Router::AddLink(std::shared_ptr< ILinkLayer > link, bool inbound) - { - if(inbound) - inboundLinks.emplace(link); - else - outboundLinks.emplace(link); - } - bool - Router::Configure(Config *conf) + Router::Configure(Config *conf, llarp_nodedb *nodedb) { + if(nodedb == nullptr) + { + LogError( + "Attempting to Router::Configure but passed null nodedb pointer"); + return false; + } + _nodedb = nodedb; + if(!FromConfig(conf)) return false; if(!InitOutboundLinks()) return false; - if(!Ready()) - { - return false; - } return EnsureIdentity(); } - bool - Router::Ready() - { - return outboundLinks.size() > 0; - } - /// called in disk worker thread void Router::HandleSaveRC() const @@ -507,7 +267,7 @@ namespace llarp bool Router::IsServiceNode() const { - return inboundLinks.size() > 0; + return m_isServiceNode; } void @@ -515,69 +275,10 @@ namespace llarp { LogInfo("closing router"); llarp_ev_loop_stop(_netloop); - inboundLinks.clear(); - outboundLinks.clear(); disk->stop(); disk->shutdown(); } - void - Router::on_verify_client_rc(llarp_async_verify_rc *job) - { - async_verify_context *ctx = - static_cast< async_verify_context * >(job->user); - auto router = ctx->router; - const PubKey pk(job->rc.pubkey); - router->FlushOutboundFor(pk, router->GetLinkWithSessionByPubkey(pk)); - delete ctx; - router->pendingVerifyRC.erase(pk); - router->pendingEstablishJobs.erase(pk); - } - - void - Router::on_verify_server_rc(llarp_async_verify_rc *job) - { - async_verify_context *ctx = - static_cast< async_verify_context * >(job->user); - auto router = ctx->router; - const PubKey pk(job->rc.pubkey); - if(!job->valid) - { - delete ctx; - router->DiscardOutboundFor(pk); - router->pendingVerifyRC.erase(pk); - return; - } - // we're valid, which means it's already been committed to the nodedb - - LogDebug("rc verified and saved to nodedb"); - - if(router->validRouters.count(pk)) - { - router->validRouters.erase(pk); - } - - const RouterContact rc = job->rc; - - router->validRouters.emplace(pk, rc); - - // track valid router in dht - router->dht()->impl->Nodes()->PutNode(rc); - - // mark success in profile - router->routerProfiling().MarkConnectSuccess(pk); - - // this was an outbound establish job - if(ctx->establish_job) - { - ctx->establish_job->Success(); - } - else - router->FlushOutboundFor(pk, router->GetLinkWithSessionByPubkey(pk)); - delete ctx; - router->pendingVerifyRC.erase(pk); - } - void Router::handle_router_ticker(void *user, uint64_t orig, uint64_t left) { @@ -600,141 +301,19 @@ namespace llarp bool Router::ConnectionToRouterAllowed(const RouterID &router) const { - if(strictConnectPubkeys.size() && strictConnectPubkeys.count(router) == 0) - return false; - if(IsServiceNode() && whitelistRouters) - return lokinetRouters.find(router) != lokinetRouters.end(); - - return true; - } - - void - Router::HandleDHTLookupForExplore(RouterID, - const std::vector< RouterContact > &results) - { - const auto numConnected = NumberOfConnectedRouters(); - for(const auto &rc : results) - { - if(!rc.Verify(Now())) - continue; - nodedb()->UpdateAsyncIfNewer(rc); - - if(ConnectionToRouterAllowed(rc.pubkey) - && numConnected < minConnectedRouters) - TryConnectAsync(rc, 10); - } - } - - void - Router::TryEstablishTo(const RouterID &remote) - { - const RouterID us = pubkey(); - if(us == remote) - return; - - if(!ConnectionToRouterAllowed(remote)) - { - LogWarn("not connecting to ", remote, " as it's not permitted by config"); - return; - } - - RouterContact rc; - if(nodedb()->Get(remote, rc)) - { - // try connecting async - TryConnectAsync(rc, 5); - } - else if(IsServiceNode()) - { - if(dht()->impl->HasRouterLookup(remote)) - return; - LogInfo("looking up router ", remote); - // dht lookup as we don't know it - dht()->impl->LookupRouter( - remote, - std::bind(&Router::HandleDHTLookupForTryEstablishTo, this, remote, - std::placeholders::_1)); - } - else - { - LogWarn("not connecting to ", remote, " as it's unreliable"); - } - } - - void - Router::OnConnectTimeout(ILinkSession *session) - { - auto itr = pendingEstablishJobs.find(session->GetPubKey()); - if(itr != pendingEstablishJobs.end()) - { - if(itr->second->Timeout()) - pendingEstablishJobs.erase(itr); - } - } - - void - Router::HandleDHTLookupForTryEstablishTo( - RouterID remote, const std::vector< RouterContact > &results) - { - if(results.size() == 0) - { - if(!IsServiceNode()) - routerProfiling().MarkConnectTimeout(remote); - } - for(const auto &result : results) - { - if(whitelistRouters - && lokinetRouters.find(result.pubkey) == lokinetRouters.end()) - continue; - nodedb()->UpdateAsyncIfNewer(result); - TryConnectAsync(result, 10); - } - } - - size_t - Router::NumberOfRoutersMatchingFilter( - std::function< bool(const ILinkSession *) > filter) const - { - std::set< RouterID > connected; - ForEachPeer([&](const auto *link, bool) { - if(filter(link)) - connected.insert(link->GetPubKey()); - }); - return connected.size(); + return _rcLookupHandler.RemoteIsAllowed(router); } size_t Router::NumberOfConnectedRouters() const { - return NumberOfRoutersMatchingFilter([&](const ILinkSession *link) -> bool { - if(!link->IsEstablished()) - return false; - const RouterContact rc(link->GetRemoteRC()); - return rc.IsPublicRouter() && ConnectionToRouterAllowed(rc.pubkey); - }); + return _linkManager.NumberOfConnectedRouters(); } size_t Router::NumberOfConnectedClients() const { - return NumberOfRoutersMatchingFilter([&](const ILinkSession *link) -> bool { - if(!link->IsEstablished()) - return false; - const RouterContact rc(link->GetRemoteRC()); - return !rc.IsPublicRouter(); - }); - } - - size_t - Router::NumberOfConnectionsMatchingFilter( - std::function< bool(const ILinkSession *) > filter) const - { - size_t sz = 0; - ForEachPeer([&](const auto *link, bool) { - if(filter(link)) - ++sz; - }); - return sz; + return _linkManager.NumberOfConnectedClients(); } bool @@ -807,55 +386,13 @@ namespace llarp lokidRPCUser = conf->lokid.lokidRPCUser; lokidRPCPassword = conf->lokid.lokidRPCPassword; - if(!usingSNSeed) + // TODO: add config flag for "is service node" + if(conf->iwp_links.servers().size()) { - ident_keyfile = conf->router.identKeyfile(); + m_isServiceNode = true; } - for(const auto &serverConfig : conf->iwp_links.servers()) - { - auto server = llarp::utp::NewServerFromRouter(this); - if(!server->EnsureKeys(transport_keyfile.string().c_str())) - { - llarp::LogError("failed to ensure keyfile ", transport_keyfile); - return false; - } - - const auto &key = std::get< 0 >(serverConfig); - int af = std::get< 1 >(serverConfig); - uint16_t port = std::get< 2 >(serverConfig); - if(!server->Configure(netloop(), key, af, port)) - { - LogError("failed to bind inbound link on ", key, " port ", port); - return false; - } - AddLink(std::move(server), true); - } - - // set network config - netConfig = conf->network.netConfig(); - - // Network config - if(conf->network.enableProfiling().has_value()) - { - if(conf->network.enableProfiling().value()) - { - routerProfiling().Enable(); - LogInfo("router profiling explicitly enabled"); - } - else - { - routerProfiling().Disable(); - LogInfo("router profiling explicitly disabled"); - } - } - - if(!conf->network.routerProfilesFile().empty()) - { - routerProfilesFile = conf->network.routerProfilesFile(); - routerProfiling().Load(routerProfilesFile.c_str()); - llarp::LogInfo("setting profiles to ", routerProfilesFile); - } + std::set< RouterID > strictConnectPubkeys; if(!conf->network.strictConnect().empty()) { @@ -888,6 +425,111 @@ namespace llarp llarp::LogError("invalid key for strict-connect: ", val); } + std::vector< std::string > configRouters = conf->connect.routers; + configRouters.insert(configRouters.end(), conf->bootstrap.routers.begin(), + conf->bootstrap.routers.end()); + for(const auto &router : configRouters) + { + // llarp::LogDebug("connect section has ", key, "=", val); + RouterContact rc; + if(!rc.Read(router.c_str())) + { + llarp::LogWarn("failed to decode bootstrap RC, file='", router, + "' rc=", rc); + return false; + } + if(rc.Verify(Now())) + { + const auto result = bootstrapRCList.insert(rc); + if(result.second) + llarp::LogInfo("Added bootstrap node ", RouterID(rc.pubkey)); + else + llarp::LogWarn("Duplicate bootstrap node ", RouterID(rc.pubkey)); + } + else + { + if(rc.IsExpired(Now())) + { + llarp::LogWarn("Bootstrap node ", RouterID(rc.pubkey), + " is too old and needs to be refreshed"); + } + else + { + llarp::LogError("malformed rc file='", router, "' rc=", rc); + } + } + } + + // Init components after relevant config settings loaded + _outboundMessageHandler.Init(&_linkManager, _logic); + _outboundSessionMaker.Init(&_linkManager, &_rcLookupHandler, _logic, + _nodedb, threadpool()); + _linkManager.Init(&_outboundSessionMaker); + _rcLookupHandler.Init(_dht, _nodedb, threadpool(), &_linkManager, + &_hiddenServiceContext, strictConnectPubkeys, + bootstrapRCList, whitelistRouters, m_isServiceNode); + + if(!usingSNSeed) + { + ident_keyfile = conf->router.identKeyfile(); + } + + // create inbound links, if we are a service node + for(const auto &serverConfig : conf->iwp_links.servers()) + { + auto server = llarp::utp::NewServer( + encryption(), util::memFn(&AbstractRouter::rc, this), + util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, this), + util::memFn(&IOutboundSessionMaker::OnSessionEstablished, + &_outboundSessionMaker), + util::memFn(&AbstractRouter::CheckRenegotiateValid, this), + util::memFn(&AbstractRouter::Sign, this), + util::memFn(&IOutboundSessionMaker::OnConnectTimeout, + &_outboundSessionMaker), + util::memFn(&AbstractRouter::SessionClosed, this)); + + if(!server->EnsureKeys(transport_keyfile.string().c_str())) + { + llarp::LogError("failed to ensure keyfile ", transport_keyfile); + return false; + } + + const auto &key = std::get< 0 >(serverConfig); + int af = std::get< 1 >(serverConfig); + uint16_t port = std::get< 2 >(serverConfig); + if(!server->Configure(netloop(), key, af, port)) + { + LogError("failed to bind inbound link on ", key, " port ", port); + return false; + } + _linkManager.AddLink(std::move(server), true); + } + + // set network config + netConfig = conf->network.netConfig(); + + // Network config + if(conf->network.enableProfiling().has_value()) + { + if(conf->network.enableProfiling().value()) + { + routerProfiling().Enable(); + LogInfo("router profiling explicitly enabled"); + } + else + { + routerProfiling().Disable(); + LogInfo("router profiling explicitly disabled"); + } + } + + if(!conf->network.routerProfilesFile().empty()) + { + routerProfilesFile = conf->network.routerProfilesFile(); + routerProfiling().Load(routerProfilesFile.c_str()); + llarp::LogInfo("setting profiles to ", routerProfilesFile); + } + // API config enableRPCServer = conf->api.enableRPCServer(); rpcBindAddr = conf->api.rpcBindAddr(); @@ -923,146 +565,13 @@ namespace llarp netConfig.insert(conf->dns.netConfig.begin(), conf->dns.netConfig.end()); - std::vector< std::string > configRouters = conf->connect.routers; - configRouters.insert(configRouters.end(), conf->bootstrap.routers.begin(), - conf->bootstrap.routers.end()); - for(const auto &router : configRouters) - { - // llarp::LogDebug("connect section has ", key, "=", val); - RouterContact rc; - if(!rc.Read(router.c_str())) - { - llarp::LogWarn("failed to decode bootstrap RC, file='", router, - "' rc=", rc); - return false; - } - if(rc.Verify(Now())) - { - const auto result = bootstrapRCList.insert(rc); - if(result.second) - llarp::LogInfo("Added bootstrap node ", RouterID(rc.pubkey)); - else - llarp::LogWarn("Duplicate bootstrap node ", RouterID(rc.pubkey)); - } - else - { - if(rc.IsExpired(Now())) - { - llarp::LogWarn("Bootstrap node ", RouterID(rc.pubkey), - " is too old and needs to be refreshed"); - } - else - { - llarp::LogError("malformed rc file='", router, "' rc=", rc); - } - } - } return true; } bool Router::CheckRenegotiateValid(RouterContact newrc, RouterContact oldrc) { - // missmatch of identity ? - if(newrc.pubkey != oldrc.pubkey) - return false; - - // store it in nodedb async - if(!async_verify_RC(newrc)) - return false; - // update dht if required - if(dht()->impl->Nodes()->HasNode(dht::Key_t{newrc.pubkey})) - { - dht()->impl->Nodes()->PutNode(newrc); - } - // update valid routers - { - auto itr = validRouters.find(newrc.pubkey); - if(itr == validRouters.end()) - validRouters[newrc.pubkey] = newrc; - else - itr->second = newrc; - } - // TODO: check for other places that need updating the RC - return true; - } - - void - Router::LookupRouterWhenExpired(RouterID router) - { - LookupRouter(router, - std::bind(&Router::HandleRouterLookupForExpireUpdate, this, - router, std::placeholders::_1)); - } - - void - Router::HandleRouterLookupForExpireUpdate( - RouterID router, const std::vector< RouterContact > &result) - { - const auto now = Now(); - RouterContact current; - if(nodedb()->Get(router, current)) - { - if(current.IsExpired(now)) - { - nodedb()->Remove(router); - } - } - if(result.size() == 1 && !result[0].IsExpired(now)) - { - LogInfo("storing rc for ", router); - nodedb()->UpdateAsyncIfNewer(result[0]); - } - else - { - LogInfo("not storing rc for ", router); - } - } - - bool - Router::HasPendingRouterLookup(const RouterID &remote) const - { - if(IsServiceNode()) - return dht()->impl->HasRouterLookup(remote); - bool has = false; - _hiddenServiceContext.ForEachService( - [&has, remote](const std::string &, - const std::shared_ptr< service::Endpoint > &ep) -> bool { - has |= ep->HasPendingRouterLookup(remote); - return true; - }); - return has; - } - - void - Router::LookupRouter(RouterID remote, RouterLookupHandler resultHandler) - { - if(!resultHandler) - { - resultHandler = std::bind(&Router::HandleRouterLookupForExpireUpdate, - this, remote, std::placeholders::_1); - } - - // if we are a client try using the hidden service endpoints - if(!IsServiceNode()) - { - bool sent = false; - LogInfo("Lookup ", remote, " anonymously"); - _hiddenServiceContext.ForEachService( - [&](const std::string &, - const std::shared_ptr< service::Endpoint > &ep) -> bool { - const bool success = ep->LookupRouterAnon(remote, resultHandler); - sent = sent || success; - return !success; - }); - if(sent) - return; - LogWarn("cannot lookup ", remote, " anonymously"); - } - // if we are service node or failed to use hidden service endpoints as - // client do a direct lookup - LogInfo("Lookup ", remote, " direct"); - dht()->impl->LookupRouter(remote, resultHandler); + return _rcLookupHandler.CheckRenegotiateValid(newrc, oldrc); } bool @@ -1114,44 +623,10 @@ namespace llarp ReportStats(); } - const auto insertRouters = [&](const std::vector< RouterContact > &res) { - // store found routers - for(const auto &rc : res) - { - // don't accept expired RCs - if(rc.Verify(Now(), false)) - nodedb()->InsertAsync(rc); - } - }; + _rcLookupHandler.PeriodicUpdate(now); const bool isSvcNode = IsServiceNode(); - // try looking up stale routers - nodedb()->VisitInsertedBefore( - [&](const RouterContact &rc) { - if(HasPendingRouterLookup(rc.pubkey)) - return; - LookupRouter(rc.pubkey, insertRouters); - }, - now - RouterContact::UpdateInterval); - - std::set< RouterID > removeStale; - // remove stale routers - const auto timeout = - RouterContact::UpdateWindow * RouterContact::UpdateTries; - nodedb()->VisitInsertedBefore( - [&](const RouterContact &rc) { - if(IsBootstrapNode(rc.pubkey)) - return; - LogInfo("removing stale router: ", RouterID(rc.pubkey)); - removeStale.insert(rc.pubkey); - }, - now - timeout); - - nodedb()->RemoveIf([removeStale](const RouterContact &rc) -> bool { - return removeStale.count(rc.pubkey) > 0; - }); - if(isSvcNode) { if(_rc.ExpiresSoon(now, randint() % 10000) @@ -1162,65 +637,17 @@ namespace llarp LogError("Failed to update our RC"); } - /* - // kill nodes that are not allowed by network policy + // remove RCs for nodes that are no longer allowed by network policy nodedb()->RemoveIf([&](const RouterContact &rc) -> bool { if(IsBootstrapNode(rc.pubkey)) return false; - return !ConnectionToRouterAllowed(rc.pubkey); + return !_rcLookupHandler.RemoteIsAllowed(rc.pubkey); }); - */ } // expire paths paths.ExpirePaths(now); - { - auto itr = pendingEstablishJobs.begin(); - while(itr != pendingEstablishJobs.end()) - { - if(itr->second->TimeoutReached() && itr->second->Timeout()) - { - LogWarn("failed to connect to ", itr->first); - itr = pendingEstablishJobs.erase(itr); - } - else - ++itr; - } - } - { - auto itr = m_PersistingSessions.begin(); - while(itr != m_PersistingSessions.end()) - { - auto link = GetLinkWithSessionByPubkey(itr->first); - if(now < itr->second) - { - if(link && link->HasSessionTo(itr->first)) - { - LogDebug("keepalive to ", itr->first); - link->KeepAliveSessionTo(itr->first); - } - else - { - RouterContact rc; - if(nodedb()->Get(itr->first, rc)) - { - if(rc.IsPublicRouter()) - { - LogDebug("establish to ", itr->first); - TryConnectAsync(rc, 5); - } - } - } - ++itr; - } - else - { - const RouterID r(itr->first); - LogInfo("commit to ", r, " expired"); - itr = m_PersistingSessions.erase(itr); - } - } - } + _linkManager.CheckPersistingSessions(now); const size_t connected = NumberOfConnectedRouters(); const size_t N = nodedb()->num_loaded(); @@ -1228,30 +655,14 @@ namespace llarp { LogInfo("We need at least ", minRequiredRouters, " service nodes to build paths but we have ", N, " in nodedb"); - // TODO: only connect to random subset - if(bootstrapRCList.size()) - { - for(const auto &rc : bootstrapRCList) - { - dht()->impl->ExploreNetworkVia(dht::Key_t{rc.pubkey}); - } - // explore via every connected peer - ForEachPeer([&](ILinkSession *s) { - if(!s->IsEstablished()) - return; - const RouterContact rc = s->GetRemoteRC(); - if(rc.IsPublicRouter()) - dht()->impl->ExploreNetworkVia(dht::Key_t{rc.pubkey}); - }); - } - else - LogError("we have no bootstrap nodes specified"); + + _rcLookupHandler.ExploreNetwork(); } if(connected < minConnectedRouters) { size_t dlt = minConnectedRouters - connected; LogInfo("connecting to ", dlt, " random routers to keep alive"); - ConnectToRandomRouters(dlt); + _outboundSessionMaker.ConnectToRandomRouters(dlt, now); } if(!isSvcNode) @@ -1277,41 +688,6 @@ namespace llarp return CryptoManager::instance()->sign(sig, identity(), buf); } - void - Router::SendTo(RouterID remote, const ILinkMessage *msg, ILinkLayer *selected) - { - metrics::integerTick(msg->Name(), "to", 1, "tx", remote.ToString()); - llarp_buffer_t buf(linkmsg_buffer); - - if(!msg->BEncode(&buf)) - { - LogWarn("failed to encode outbound message, buffer size left: ", - buf.size_left()); - return; - } - // set size of message - buf.sz = buf.cur - buf.base; - buf.cur = buf.base; - LogDebug("send ", buf.sz, " bytes to ", remote); - if(selected) - { - if(selected->SendTo(remote, buf)) - return; - } - for(const auto &link : inboundLinks) - { - if(link->SendTo(remote, buf)) - return; - } - for(const auto &link : outboundLinks) - { - if(link->SendTo(remote, buf)) - return; - } - LogWarn("message to ", remote, " was dropped"); - metrics::integerTick(msg->Name(), "to", 1, "drop", remote.ToString()); - } - void Router::ScheduleTicker(uint64_t ms) { @@ -1323,148 +699,48 @@ namespace llarp { dht::Key_t k(remote); dht()->impl->Nodes()->DelNode(k); - // remove from valid routers if it's a valid router - validRouters.erase(remote); + LogInfo("Session to ", remote, " fully closed"); } - ILinkLayer * - Router::GetLinkWithSessionByPubkey(const RouterID &pubkey) - { - for(const auto &link : outboundLinks) - { - if(link->HasSessionTo(pubkey)) - return link.get(); - } - for(const auto &link : inboundLinks) - { - if(link->HasSessionTo(pubkey)) - return link.get(); - } - return nullptr; - } - - void - Router::FlushOutboundFor(RouterID remote, ILinkLayer *chosen) - { - LogDebug("Flush outbound for ", remote); - - auto itr = outboundMessageQueue.find(remote); - if(itr == outboundMessageQueue.end()) - { - pendingEstablishJobs.erase(remote); - return; - } - // if for some reason we don't provide a link layer pick one that has it - if(!chosen) - { - for(const auto &link : inboundLinks) - { - if(link->HasSessionTo(remote)) - { - chosen = link.get(); - break; - } - } - for(const auto &link : outboundLinks) - { - if(link->HasSessionTo(remote)) - { - chosen = link.get(); - break; - } - } - } - while(itr->second.size()) - { - llarp_buffer_t buf(itr->second.front()); - if(!chosen->SendTo(remote, buf)) - LogWarn("failed to send queued outbound message to ", remote, " via ", - chosen->Name()); - - itr->second.pop(); - } - pendingEstablishJobs.erase(remote); - outboundMessageQueue.erase(itr); - } - - void - Router::DiscardOutboundFor(const RouterID &remote) - { - outboundMessageQueue.erase(remote); - } - bool Router::GetRandomConnectedRouter(RouterContact &result) const { - auto sz = validRouters.size(); - if(sz) - { - auto itr = validRouters.begin(); - if(sz > 1) - std::advance(itr, randint() % sz); - result = itr->second; - return true; - } - return false; + return _linkManager.GetRandomConnectedRouter(result); } - bool - Router::async_verify_RC(const RouterContact rc) + void + Router::HandleDHTLookupForExplore(RouterID remote, + const std::vector< RouterContact > &results) { - if(rc.IsPublicRouter() && whitelistRouters && IsServiceNode()) + for(const auto &rc : results) { - if(lokinetRouters.size() == 0) - { - LogError("we have no service nodes in whitelist"); - return false; - } - if(lokinetRouters.find(rc.pubkey) == lokinetRouters.end()) - { - RouterID sn(rc.pubkey); - LogInfo(sn, " is NOT a valid service node, rejecting"); - return false; - } + _rcLookupHandler.CheckRC(rc); } - if(pendingVerifyRC.count(rc.pubkey)) - return true; - LogInfo("session with ", RouterID(rc.pubkey), " established"); - llarp_async_verify_rc *job = &pendingVerifyRC[rc.pubkey]; - async_verify_context *ctx = new async_verify_context(); - ctx->router = this; - ctx->establish_job = nullptr; + } - auto itr = pendingEstablishJobs.find(rc.pubkey); - if(itr != pendingEstablishJobs.end()) - ctx->establish_job = itr->second.get(); - - job->user = ctx; - job->rc = rc; - job->valid = false; - job->hook = nullptr; - - job->nodedb = _nodedb; - job->logic = _logic; - job->cryptoworker = cryptoworker; - job->diskworker = disk; - if(rc.IsPublicRouter()) - job->hook = &Router::on_verify_server_rc; - else - job->hook = &Router::on_verify_client_rc; - - llarp_nodedb_async_verify(job); - return true; + // TODO: refactor callers and remove this function + void + Router::LookupRouter(RouterID remote, RouterLookupHandler resultHandler) + { + _rcLookupHandler.GetRC( + remote, + [=](const RouterID &id, const RouterContact *const rc, + const RCRequestResult result) { + (void)id; + std::vector< RouterContact > routers; + if(result == RCRequestResult::Success && rc != nullptr) + { + routers.push_back(*rc); + resultHandler(routers); + } + }); } void Router::SetRouterWhitelist(const std::vector< RouterID > &routers) { - lokinetRouters.clear(); - for(const auto &router : routers) - lokinetRouters.emplace(router, - std::numeric_limits< llarp_time_t >::max()); - LogInfo("lokinet service node list now has ", lokinetRouters.size(), - " routers"); + _rcLookupHandler.SetRouterWhitelist(routers); } /// this function ensure there are sane defualts in a net config @@ -1544,18 +820,6 @@ namespace llarp return false; } - for(const auto &rc : bootstrapRCList) - { - if(this->nodedb()->Insert(rc)) - { - LogInfo("added bootstrap node ", RouterID(rc.pubkey)); - } - else - { - LogError("Failed to add bootstrap node ", RouterID(rc.pubkey)); - } - } - routerProfiling().Load(routerProfilesFile.c_str()); Addr publicAddr(this->addrInfo); @@ -1565,14 +829,11 @@ namespace llarp LogDebug("public address:port ", publicAddr); } - LogInfo("You have ", inboundLinks.size(), " inbound links"); - // set public signing key _rc.pubkey = seckey_topublic(identity()); AddressInfo ai; - for(const auto &link : inboundLinks) - { + _linkManager.ForEachInboundLink([&](LinkLayer_ptr link) { if(link->GetOurAddressInfo(ai)) { // override ip and port @@ -1582,21 +843,17 @@ namespace llarp ai.port = publicAddr.port(); } if(IsBogon(ai.ip)) - continue; + return; _rc.addrs.push_back(ai); if(ExitEnabled()) { const llarp::Addr addr(ai); const nuint32_t a{addr.addr4()->s_addr}; _rc.exits.emplace_back(_rc.pubkey, a); - LogInfo( - "Neato teh l33toh, You are a freaking exit relay. w00t!!!!! your " - "exit " - "is advertised as exiting at ", - a); + LogInfo("Exit relay started, advertised as exiting at: ", a); } } - } + }); // set public encryption key _rc.enckey = seckey_topublic(encryption()); @@ -1614,35 +871,15 @@ namespace llarp return false; } - LogInfo("have ", nodedb->num_loaded(), " routers"); - - LogInfo("starting outbound ", outboundLinks.size(), " links"); - for(const auto &link : outboundLinks) + if(!_linkManager.StartLinks(_logic)) { - if(!link->Start(_logic)) - { - LogWarn("outbound link '", link->Name(), "' failed to start"); - return false; - } - } - - int IBLinksStarted = 0; - - // start links - for(const auto &link : inboundLinks) - { - if(link->Start(_logic)) - { - LogDebug("Link ", link->Name(), " started"); - IBLinksStarted++; - } - else - LogWarn("Link ", link->Name(), " failed to start"); + LogWarn("One or more links failed to start."); + return false; } EnsureNetConfigDefaultsSane(netConfig); - if(IBLinksStarted > 0) + if(IsServiceNode()) { // initialize as service node if(!InitServiceNode()) @@ -1685,7 +922,24 @@ namespace llarp LogError("Failed to start hidden service context"); return false; } + llarp_dht_context_start(dht(), pubkey()); + + for(const auto &rc : bootstrapRCList) + { + if(this->nodedb()->Insert(rc)) + { + LogInfo("added bootstrap node ", RouterID(rc.pubkey)); + } + else + { + LogError("Failed to add bootstrap node ", RouterID(rc.pubkey)); + } + _dht->impl->Nodes()->PutNode(rc); + } + + LogInfo("have ", nodedb->num_loaded(), " routers"); + ScheduleTicker(1000); _running.store(true); _startedAt = Now(); @@ -1720,11 +974,7 @@ namespace llarp void Router::StopLinks() { - LogInfo("stopping links"); - for(const auto &link : outboundLinks) - link->Stop(); - for(const auto &link : inboundLinks) - link->Stop(); + _linkManager.Stop(); } void @@ -1741,45 +991,20 @@ namespace llarp _exitContext.Stop(); if(rpcServer) rpcServer->Stop(); - PumpLL(); + _linkManager.PumpLinks(); _logic->call_later({200, this, &RouterAfterStopIssued}); } bool Router::HasSessionTo(const RouterID &remote) const { - for(const auto &link : outboundLinks) - if(link->HasSessionTo(remote)) - return true; - for(const auto &link : inboundLinks) - if(link->HasSessionTo(remote)) - return true; - return false; + return _linkManager.HasSessionTo(remote); } void Router::ConnectToRandomRouters(int want) { - int wanted = want; - Router *self = this; - - self->nodedb()->visit([self, &want](const RouterContact &other) -> bool { - // check if we really want to - if(other.ExpiresSoon(self->Now(), 30000)) - return want > 0; - if(!self->ConnectionToRouterAllowed(other.pubkey)) - return want > 0; - if(randint() % 2 == 0 - && !(self->HasSessionTo(other.pubkey) - || self->HasPendingConnectJob(other.pubkey))) - { - if(self->TryConnectAsync(other, 5)) - --want; - } - return want > 0; - }); - LogInfo("connecting to ", abs(want - wanted), " out of ", wanted, - " random routers"); + _outboundSessionMaker.ConnectToRandomRouters(want, Now()); } bool @@ -1804,18 +1029,51 @@ namespace llarp return true; } + bool + Router::TryConnectAsync(RouterContact rc, uint16_t tries) + { + (void)tries; + + if(rc.pubkey == pubkey()) + { + return false; + } + + if(!_rcLookupHandler.RemoteIsAllowed(rc.pubkey)) + { + return false; + } + + _outboundSessionMaker.CreateSessionTo(rc, nullptr); + + return true; + } + bool Router::InitOutboundLinks() { - if(outboundLinks.size() > 0) - return true; + using LinkFactory = std::function< LinkLayer_ptr( + const SecretKey &, GetRCFunc, LinkMessageHandler, + SessionEstablishedHandler, SessionRenegotiateHandler, SignBufferFunc, + TimeoutHandler, SessionClosedHandler) >; - static std::list< std::function< LinkLayer_ptr(Router *) > > linkFactories = - {utp::NewServerFromRouter, iwp::NewServerFromRouter}; + static std::list< LinkFactory > linkFactories = {utp::NewServer, + iwp::NewServer}; + bool addedAtLeastOne = false; for(const auto &factory : linkFactories) { - auto link = factory(this); + auto link = factory( + encryption(), util::memFn(&AbstractRouter::rc, this), + util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, this), + util::memFn(&IOutboundSessionMaker::OnSessionEstablished, + &_outboundSessionMaker), + util::memFn(&AbstractRouter::CheckRenegotiateValid, this), + util::memFn(&AbstractRouter::Sign, this), + util::memFn(&IOutboundSessionMaker::OnConnectTimeout, + &_outboundSessionMaker), + util::memFn(&AbstractRouter::SessionClosed, this)); + if(!link) continue; if(!link->EnsureKeys(transport_keyfile.string().c_str())) @@ -1830,11 +1088,12 @@ namespace llarp { if(!link->Configure(netloop(), "*", af, m_OutboundPort)) continue; - AddLink(std::move(link), false); + _linkManager.AddLink(std::move(link), false); + addedAtLeastOne = true; break; } } - return outboundLinks.size() > 0; + return addedAtLeastOne; } bool @@ -1844,12 +1103,6 @@ namespace llarp return hiddenServiceContext().AddDefaultEndpoint(netConfig); } - bool - Router::HasPendingConnectJob(const RouterID &remote) - { - return pendingEstablishJobs.find(remote) != pendingEstablishJobs.end(); - } - bool Router::LoadHiddenServiceConfig(string_view fname) { @@ -1867,4 +1120,17 @@ namespace llarp } return true; } + + void + Router::MessageSent(const RouterID &remote, SendStatus status) + { + if(status == SendStatus::Success) + { + LogInfo("Message successfully sent to ", remote); + } + else + { + LogWarn("Message failed sending to ", remote); + } + } } // namespace llarp diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp index 6c1c880fa..dd2287004 100644 --- a/llarp/router/router.hpp +++ b/llarp/router/router.hpp @@ -25,6 +25,10 @@ #include #include #include +#include +#include +#include +#include #include #include @@ -50,8 +54,6 @@ bool llarp_loadServiceNodeIdentityKey(const fs::path &fpath, llarp::SecretKey &secretkey); -struct TryConnectJob; - namespace llarp { struct Router final : public AbstractRouter @@ -191,9 +193,6 @@ namespace llarp bool Sign(Signature &sig, const llarp_buffer_t &buf) const override; - // buffer for serializing link messages - std::array< byte_t, MAX_LINK_MSG_SIZE > linkmsg_buffer; - uint16_t m_OutboundPort = 0; /// always maintain this many connections to other routers @@ -233,10 +232,6 @@ namespace llarp /// default network config for default network interface NetConfig_t netConfig; - /// identity keys whitelist of routers we will connect to directly (not for - /// service nodes) - std::set< RouterID > strictConnectPubkeys; - /// bootstrap RCs std::set< RouterContact > bootstrapRCList; @@ -267,55 +262,47 @@ namespace llarp std::string lokidRPCUser; std::string lokidRPCPassword; - using LinkSet = std::set< LinkLayer_ptr, ComparePtr< LinkLayer_ptr > >; - - LinkSet outboundLinks; - LinkSet inboundLinks; - Profiling _routerProfiling; std::string routerProfilesFile = "profiles.dat"; - using MessageQueue = std::queue< std::vector< byte_t > >; + OutboundMessageHandler _outboundMessageHandler; + OutboundSessionMaker _outboundSessionMaker; + LinkManager _linkManager; + RCLookupHandler _rcLookupHandler; - /// outbound message queue - std::unordered_map< RouterID, MessageQueue, RouterID::Hash > - outboundMessageQueue; + IOutboundMessageHandler & + outboundMessageHandler() override + { + return _outboundMessageHandler; + } - /// loki verified routers - std::unordered_map< RouterID, RouterContact, RouterID::Hash > validRouters; + IOutboundSessionMaker & + outboundSessionMaker() override + { + return _outboundSessionMaker; + } - // pending establishing session with routers - std::unordered_map< RouterID, std::shared_ptr< TryConnectJob >, - RouterID::Hash > - pendingEstablishJobs; + ILinkManager & + linkManager() override + { + return _linkManager; + } - // pending RCs to be verified by pubkey - std::unordered_map< RouterID, llarp_async_verify_rc, RouterID::Hash > - pendingVerifyRC; - - // sessions to persist -> timestamp to end persist at - std::unordered_map< RouterID, llarp_time_t, RouterID::Hash > - m_PersistingSessions; - - // lokinet routers from lokid, maps pubkey to when we think it will expire, - // set to max value right now - std::unordered_map< RouterID, llarp_time_t, PubKey::Hash > lokinetRouters; + I_RCLookupHandler & + rcLookupHandler() override + { + return _rcLookupHandler; + } Router(std::shared_ptr< llarp::thread::ThreadPool > worker, llarp_ev_loop_ptr __netloop, std::shared_ptr< Logic > logic); ~Router(); - bool - OnSessionEstablished(ILinkSession *) override; - bool HandleRecvLinkMessageBuffer(ILinkSession *from, const llarp_buffer_t &msg) override; - void - AddLink(std::shared_ptr< ILinkLayer > link, bool outbound = false); - bool InitOutboundLinks(); @@ -341,10 +328,7 @@ namespace llarp AddHiddenService(const service::Config::section_t &config); bool - Configure(Config *conf) override; - - bool - Ready(); + Configure(Config *conf, llarp_nodedb *nodedb = nullptr) override; bool Run(struct llarp_nodedb *nodedb) override; @@ -381,15 +365,6 @@ namespace llarp return seckey_topublic(_identity); } - void - OnConnectTimeout(ILinkSession *session) override; - - bool - HasPendingConnectJob(const RouterID &remote); - - bool - HasPendingRouterLookup(const RouterID &remote) const override; - void try_connect(fs::path rcfile); @@ -397,6 +372,9 @@ namespace llarp bool Reconfigure(Config *conf) override; + bool + TryConnectAsync(RouterContact rc, uint16_t tries) override; + /// validate new configuration against old one /// return true on 100% valid /// return false if not 100% valid @@ -411,37 +389,6 @@ namespace llarp bool SendToOrQueue(const RouterID &remote, const ILinkMessage *msg) override; - /// sendto or drop - void - SendTo(RouterID remote, const ILinkMessage *msg, ILinkLayer *chosen); - - /// manually flush outbound message queue for just 1 router - void - FlushOutboundFor(RouterID remote, ILinkLayer *chosen = nullptr); - - void - LookupRouter(RouterID remote, RouterLookupHandler handler) override; - - /// manually discard all pending messages to remote router - void - DiscardOutboundFor(const RouterID &remote); - - /// try establishing a session to a remote router - void - TryEstablishTo(const RouterID &remote); - - /// lookup a router by pubkey when it expires - void - LookupRouterWhenExpired(RouterID remote); - - void - HandleDHTLookupForExplore( - RouterID remote, const std::vector< RouterContact > &results) override; - - void - HandleRouterLookupForExpireUpdate( - RouterID remote, const std::vector< RouterContact > &results); - void ForEachPeer(std::function< void(const ILinkSession *, bool) > visit, bool randomize = false) const override; @@ -458,10 +405,6 @@ namespace llarp bool CheckRenegotiateValid(RouterContact newRc, RouterContact oldRC) override; - /// flush outbound message queue - void - FlushOutbound(); - /// called by link when a remote session has no more sessions open void SessionClosed(RouterID remote) override; @@ -481,9 +424,6 @@ namespace llarp void ScheduleTicker(uint64_t i = 1000); - ILinkLayer * - GetLinkWithSessionByPubkey(const RouterID &remote); - /// parse a routing message in a buffer and handle it with a handler if /// successful parsing return true on parse and handle success otherwise /// return false @@ -503,52 +443,28 @@ namespace llarp size_t NumberOfConnectedClients() const override; - /// count unique router id's given filter to match session - size_t - NumberOfRoutersMatchingFilter( - std::function< bool(const ILinkSession *) > filter) const; - - /// count the number of connections that match filter - size_t - NumberOfConnectionsMatchingFilter( - std::function< bool(const ILinkSession *) > filter) const; - - bool - TryConnectAsync(RouterContact rc, uint16_t tries) override; - bool GetRandomConnectedRouter(RouterContact &result) const override; - bool - async_verify_RC(const RouterContact rc); + void + HandleDHTLookupForExplore( + RouterID remote, const std::vector< RouterContact > &results) override; void - HandleDHTLookupForSendTo(RouterID remote, - const std::vector< RouterContact > &results); + LookupRouter(RouterID remote, RouterLookupHandler resultHandler) override; bool HasSessionTo(const RouterID &remote) const override; - void - HandleDHTLookupForTryEstablishTo( - RouterID remote, const std::vector< RouterContact > &results); - - static void - on_verify_client_rc(llarp_async_verify_rc *context); - - static void - on_verify_server_rc(llarp_async_verify_rc *context); - static void handle_router_ticker(void *user, uint64_t orig, uint64_t left); - static void - HandleAsyncLoadRCForSendTo(llarp_async_load_rc *async); - private: std::atomic< bool > _stopping; std::atomic< bool > _running; + bool m_isServiceNode = false; + llarp_time_t m_LastStatsReport = 0; bool @@ -572,6 +488,9 @@ namespace llarp bool FromConfig(Config *conf); + + void + MessageSent(const RouterID &remote, SendStatus status); }; } // namespace llarp diff --git a/llarp/router_contact.cpp b/llarp/router_contact.cpp index d5710d04d..a4d3b54ae 100644 --- a/llarp/router_contact.cpp +++ b/llarp/router_contact.cpp @@ -32,12 +32,10 @@ namespace llarp /// 1 day for real network llarp_time_t RouterContact::Lifetime = 24 * 60 * 60 * 1000; #endif - /// every 30 minutes an RC is stale and needs updating - llarp_time_t RouterContact::UpdateInterval = 30 * 60 * 1000; - // 1 minute window for update - llarp_time_t RouterContact::UpdateWindow = 60 * 1000; - // try 5 times to update - int RouterContact::UpdateTries = 5; + /// update RCs every 5 minutes + llarp_time_t RouterContact::UpdateInterval = 5 * 60 * 1000; + /// an RC inserted long enough ago (30 min) is considered stale and is removed + llarp_time_t RouterContact::StaleInsertionAge = 30 * 60 * 1000; NetID::NetID(const byte_t *val) : AlignedBuffer< 8 >() { diff --git a/llarp/router_contact.hpp b/llarp/router_contact.hpp index 75d20641f..7638e167c 100644 --- a/llarp/router_contact.hpp +++ b/llarp/router_contact.hpp @@ -70,8 +70,7 @@ namespace llarp static llarp_time_t Lifetime; static llarp_time_t UpdateInterval; - static llarp_time_t UpdateWindow; - static int UpdateTries; + static llarp_time_t StaleInsertionAge; RouterContact() { diff --git a/llarp/util/threading.hpp b/llarp/util/threading.hpp index 86734f282..6b885f175 100644 --- a/llarp/util/threading.hpp +++ b/llarp/util/threading.hpp @@ -35,9 +35,10 @@ namespace llarp } }; - using Mutex = absl::Mutex; - using Lock = absl::MutexLock; - using Condition = absl::CondVar; + using Mutex = absl::Mutex; + using Lock = absl::MutexLock; + using ReleasableLock = absl::ReleasableMutexLock; + using Condition = absl::CondVar; class Semaphore { diff --git a/llarp/utp/utp.cpp b/llarp/utp/utp.cpp index 495199bcd..b1c2f4d85 100644 --- a/llarp/utp/utp.cpp +++ b/llarp/utp/utp.cpp @@ -18,19 +18,6 @@ namespace llarp reneg, timeout, closed); } - LinkLayer_ptr - NewServerFromRouter(AbstractRouter* r) - { - return NewServer( - r->encryption(), util::memFn(&AbstractRouter::rc, r), - util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, r), - util::memFn(&AbstractRouter::OnSessionEstablished, r), - util::memFn(&AbstractRouter::CheckRenegotiateValid, r), - util::memFn(&AbstractRouter::Sign, r), - util::memFn(&AbstractRouter::OnConnectTimeout, r), - util::memFn(&AbstractRouter::SessionClosed, r)); - } - } // namespace utp } // namespace llarp diff --git a/llarp/utp/utp.hpp b/llarp/utp/utp.hpp index fb8284345..fe1ef403e 100644 --- a/llarp/utp/utp.hpp +++ b/llarp/utp/utp.hpp @@ -15,9 +15,6 @@ namespace llarp LinkMessageHandler h, SessionEstablishedHandler est, SessionRenegotiateHandler reneg, SignBufferFunc sign, TimeoutHandler timeout, SessionClosedHandler closed); - - LinkLayer_ptr - NewServerFromRouter(AbstractRouter* r); } // namespace utp } // namespace llarp diff --git a/test/dht/mock_context.hpp b/test/dht/mock_context.hpp index eaa8643f1..0c9d8073c 100644 --- a/test/dht/mock_context.hpp +++ b/test/dht/mock_context.hpp @@ -117,6 +117,8 @@ namespace llarp MOCK_METHOD0(AllowTransit, bool&()); MOCK_CONST_METHOD0(Nodes, dht::Bucket< dht::RCNode >*()); + MOCK_METHOD1(PutRCNodeAsync, void(const dht::RCNode& val)); + MOCK_METHOD1(DelRCNodeAsync, void(const dht::Key_t& val)); }; } // namespace test