Merge pull request #728 from tewinget/refactor-router-code

Refactor router code
This commit is contained in:
Jeff 2019-07-25 14:23:47 -04:00 committed by GitHub
commit f7e05ad13a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 2294 additions and 1180 deletions

View File

@ -1,6 +1,11 @@
# Lowest version - android ndk 3.6.0
cmake_minimum_required(VERSION 3.6.0)
find_program(CCACHE_PROGRAM ccache)
if(CCACHE_PROGRAM)
set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CCACHE_PROGRAM}")
endif()
set(PROJECT_NAME lokinet)
project(${PROJECT_NAME} C CXX)
@ -45,11 +50,6 @@ add_definitions(-D${CMAKE_SYSTEM_NAME})
get_filename_component(CORE_INCLUDE "${CMAKE_CURRENT_SOURCE_DIR}/include" ABSOLUTE)
get_filename_component(ABYSS_INCLUDE "${CMAKE_CURRENT_SOURCE_DIR}/${ABYSS}/include" ABSOLUTE)
find_program(CCACHE_PROGRAM ccache)
if(CCACHE_PROGRAM)
set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CCACHE_PROGRAM}")
endif()
if(MSVC_VERSION)
enable_language(ASM_MASM)
list(APPEND CMAKE_ASM_MASM_SOURCE_FILE_EXTENSIONS s)

View File

@ -173,6 +173,8 @@ set(LIB_SRC
iwp/linklayer.cpp
iwp/outermessage.cpp
iwp/iwp.cpp
link/i_link_manager.cpp
link/link_manager.cpp
link/server.cpp
link/session.cpp
messages/dht_immediate.cpp
@ -195,6 +197,12 @@ set(LIB_SRC
pow.cpp
profiling.cpp
router/abstractrouter.cpp
router/i_outbound_message_handler.cpp
router/outbound_message_handler.cpp
router/i_outbound_session_maker.cpp
router/outbound_session_maker.cpp
router/i_rc_lookup_handler.cpp
router/rc_lookup_handler.cpp
router/router.cpp
router_contact.cpp
router_id.cpp

View File

@ -159,8 +159,6 @@ __ ___ ____ _ _ ___ _ _ ____
int
Context::LoadDatabase()
{
nodedb = std::make_unique< llarp_nodedb >(router->diskworker());
if(!llarp_nodedb::ensure_dir(nodedb_dir.c_str()))
{
llarp::LogError("nodedb_dir is incorrect");
@ -233,16 +231,21 @@ __ ___ ____ _ _ ___ _ _ ____
cryptoManager = std::make_unique< CryptoManager >(crypto.get());
router = std::make_unique< Router >(worker, mainloop, logic);
if(!router->Configure(config.get()))
nodedb = std::make_unique< llarp_nodedb >(router->diskworker());
if(!router->Configure(config.get(), nodedb.get()))
{
llarp::LogError("Failed to configure router");
return 1;
}
// must be done after router is made so we can use its disk io worker
// must also be done after configure so that netid is properly set if it
// is provided by config
if(!this->LoadDatabase())
return 1;
return 0;
}

View File

@ -199,6 +199,20 @@ namespace llarp
return _nodes.get();
}
void
PutRCNodeAsync(const RCNode& val) override
{
auto func = std::bind(&Bucket< RCNode >::PutNode, Nodes(), val);
router->logic()->queue_func(func);
}
void
DelRCNodeAsync(const Key_t& val) override
{
auto func = std::bind(&Bucket< RCNode >::DelNode, Nodes(), val);
router->logic()->queue_func(func);
}
const Key_t&
OurKey() const override
{

View File

@ -173,6 +173,12 @@ namespace llarp
virtual Bucket< RCNode >*
Nodes() const = 0;
virtual void
PutRCNodeAsync(const RCNode& val) = 0;
virtual void
DelRCNodeAsync(const Key_t& val) = 0;
virtual util::StatusObject
ExtractStatus() const = 0;

View File

@ -7,19 +7,6 @@ namespace llarp
{
namespace iwp
{
std::unique_ptr< ILinkLayer >
NewServerFromRouter(AbstractRouter* r)
{
return NewServer(
r->encryption(), std::bind(&AbstractRouter::rc, r),
util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, r),
util::memFn(&AbstractRouter::OnSessionEstablished, r),
util::memFn(&AbstractRouter::CheckRenegotiateValid, r),
util::memFn(&AbstractRouter::Sign, r),
util::memFn(&AbstractRouter::OnConnectTimeout, r),
util::memFn(&AbstractRouter::SessionClosed, r));
}
std::unique_ptr< ILinkLayer >
NewServer(const SecretKey& enckey, GetRCFunc getrc, LinkMessageHandler h,
SessionEstablishedHandler est, SessionRenegotiateHandler reneg,

View File

@ -18,9 +18,6 @@ namespace llarp
llarp::SignBufferFunc sign, llarp::TimeoutHandler timeout,
llarp::SessionClosedHandler closed);
std::unique_ptr< ILinkLayer >
NewServerFromRouter(AbstractRouter* r);
} // namespace iwp
} // namespace llarp

View File

@ -0,0 +1 @@
#include <link/i_link_manager.hpp>

View File

@ -0,0 +1,85 @@
#ifndef LLARP_I_LINK_MANAGER_HPP
#define LLARP_I_LINK_MANAGER_HPP
#include <link/server.hpp>
#include <util/types.hpp>
#include <util/logic.hpp>
#include <functional>
struct llarp_buffer_t;
namespace llarp
{
using Logic_ptr = std::shared_ptr< Logic >;
struct RouterContact;
struct ILinkSession;
struct IOutboundSessionMaker;
struct RouterID;
namespace util
{
struct StatusObject;
} // namespace util
struct ILinkManager
{
virtual ~ILinkManager() = default;
virtual LinkLayer_ptr
GetCompatibleLink(const RouterContact &rc) const = 0;
virtual IOutboundSessionMaker *
GetSessionMaker() const = 0;
virtual bool
SendTo(const RouterID &remote, const llarp_buffer_t &buf) = 0;
virtual bool
HasSessionTo(const RouterID &remote) const = 0;
virtual void
PumpLinks() = 0;
virtual void
AddLink(LinkLayer_ptr link, bool inbound = false) = 0;
virtual bool
StartLinks(Logic_ptr logic) = 0;
virtual void
Stop() = 0;
virtual void
PersistSessionUntil(const RouterID &remote, llarp_time_t until) = 0;
virtual void
ForEachPeer(std::function< void(const ILinkSession *, bool) > visit,
bool randomize = false) const = 0;
virtual void
ForEachPeer(std::function< void(ILinkSession *) > visit) = 0;
virtual void
ForEachInboundLink(std::function< void(LinkLayer_ptr) > visit) const = 0;
virtual size_t
NumberOfConnectedRouters() const = 0;
virtual size_t
NumberOfConnectedClients() const = 0;
virtual bool
GetRandomConnectedRouter(RouterContact &router) const = 0;
virtual void
CheckPersistingSessions(llarp_time_t now) = 0;
virtual util::StatusObject
ExtractStatus() const = 0;
};
} // namespace llarp
#endif // LLARP_I_LINK_MANAGER_HPP

362
llarp/link/link_manager.cpp Normal file
View File

@ -0,0 +1,362 @@
#include <link/link_manager.hpp>
#include <router/i_outbound_session_maker.hpp>
#include <crypto/crypto.hpp>
#include <algorithm>
#include <set>
namespace llarp
{
LinkLayer_ptr
LinkManager::GetCompatibleLink(const RouterContact &rc) const
{
if(stopping)
return nullptr;
for(auto &link : outboundLinks)
{
// TODO: may want to add some memory of session failures for a given
// router on a given link and not return that link here for a
// duration
if(!link->IsCompatable(rc))
continue;
return link;
}
return nullptr;
}
IOutboundSessionMaker *
LinkManager::GetSessionMaker() const
{
return _sessionMaker;
}
bool
LinkManager::SendTo(const RouterID &remote, const llarp_buffer_t &buf)
{
if(stopping)
return false;
auto link = GetLinkWithSessionTo(remote);
if(link == nullptr)
{
return false;
}
return link->SendTo(remote, buf);
}
bool
LinkManager::HasSessionTo(const RouterID &remote) const
{
return GetLinkWithSessionTo(remote) != nullptr;
}
void
LinkManager::PumpLinks()
{
for(const auto &link : inboundLinks)
{
link->Pump();
}
for(const auto &link : outboundLinks)
{
link->Pump();
}
}
void
LinkManager::AddLink(LinkLayer_ptr link, bool inbound)
{
util::Lock l(&_mutex);
if(inbound)
{
inboundLinks.emplace(link);
}
else
{
outboundLinks.emplace(link);
}
}
bool
LinkManager::StartLinks(Logic_ptr logic)
{
LogInfo("starting ", outboundLinks.size(), " outbound links");
for(const auto &link : outboundLinks)
{
if(!link->Start(logic))
{
LogWarn("outbound link '", link->Name(), "' failed to start");
return false;
}
LogDebug("Outbound Link ", link->Name(), " started");
}
if(inboundLinks.size())
{
LogInfo("starting ", inboundLinks.size(), " inbound links");
for(const auto &link : inboundLinks)
{
if(!link->Start(logic))
{
LogWarn("Link ", link->Name(), " failed to start");
return false;
}
LogDebug("Inbound Link ", link->Name(), " started");
}
}
return true;
}
void
LinkManager::Stop()
{
if(stopping)
{
return;
}
util::Lock l(&_mutex);
LogInfo("stopping links");
stopping = true;
for(const auto &link : outboundLinks)
link->Stop();
for(const auto &link : inboundLinks)
link->Stop();
}
void
LinkManager::PersistSessionUntil(const RouterID &remote, llarp_time_t until)
{
if(stopping)
return;
util::Lock l(&_mutex);
m_PersistingSessions[remote] =
std::max(until, m_PersistingSessions[remote]);
LogDebug("persist session to ", remote, " until ",
m_PersistingSessions[remote]);
}
void
LinkManager::ForEachPeer(
std::function< void(const ILinkSession *, bool) > visit,
bool randomize) const
{
if(stopping)
return;
for(const auto &link : outboundLinks)
{
link->ForEachSession(
[visit](const ILinkSession *peer) { visit(peer, true); }, randomize);
}
for(const auto &link : inboundLinks)
{
link->ForEachSession(
[visit](const ILinkSession *peer) { visit(peer, false); }, randomize);
}
}
void
LinkManager::ForEachPeer(std::function< void(ILinkSession *) > visit)
{
if(stopping)
return;
for(const auto &link : outboundLinks)
{
link->ForEachSession([visit](ILinkSession *peer) { visit(peer); });
}
for(const auto &link : inboundLinks)
{
link->ForEachSession([visit](ILinkSession *peer) { visit(peer); });
}
}
void
LinkManager::ForEachInboundLink(
std::function< void(LinkLayer_ptr) > visit) const
{
for(const auto &link : inboundLinks)
{
visit(link);
}
}
size_t
LinkManager::NumberOfConnectedRouters() const
{
std::set< RouterID > connectedRouters;
auto fn = [&connectedRouters](const ILinkSession *session, bool) {
if(session->IsEstablished())
{
const RouterContact rc(session->GetRemoteRC());
if(rc.IsPublicRouter())
{
connectedRouters.insert(rc.pubkey);
}
}
};
ForEachPeer(fn);
return connectedRouters.size();
}
size_t
LinkManager::NumberOfConnectedClients() const
{
std::set< RouterID > connectedClients;
auto fn = [&connectedClients](const ILinkSession *session, bool) {
if(session->IsEstablished())
{
const RouterContact rc(session->GetRemoteRC());
if(!rc.IsPublicRouter())
{
connectedClients.insert(rc.pubkey);
}
}
};
ForEachPeer(fn);
return connectedClients.size();
}
bool
LinkManager::GetRandomConnectedRouter(RouterContact &router) const
{
std::unordered_map< RouterID, RouterContact, RouterID::Hash >
connectedRouters;
ForEachPeer(
[&connectedRouters](const ILinkSession *peer, bool unused) {
(void)unused;
connectedRouters[peer->GetPubKey()] = peer->GetRemoteRC();
},
false);
const auto sz = connectedRouters.size();
if(sz)
{
auto itr = connectedRouters.begin();
if(sz > 1)
{
std::advance(itr, randint() % sz);
}
router = itr->second;
return true;
}
return false;
}
void
LinkManager::CheckPersistingSessions(llarp_time_t now)
{
if(stopping)
return;
std::vector< RouterID > sessionsNeeded;
{
util::Lock l(&_mutex);
auto itr = m_PersistingSessions.begin();
while(itr != m_PersistingSessions.end())
{
auto link = GetLinkWithSessionTo(itr->first);
if(now < itr->second)
{
if(link)
{
LogDebug("keepalive to ", itr->first);
link->KeepAliveSessionTo(itr->first);
}
else
{
sessionsNeeded.push_back(itr->first);
}
++itr;
}
else
{
const RouterID r(itr->first);
LogInfo("commit to ", r, " expired");
itr = m_PersistingSessions.erase(itr);
}
}
}
for(const auto &router : sessionsNeeded)
{
_sessionMaker->CreateSessionTo(router, nullptr);
}
}
util::StatusObject
LinkManager::ExtractStatus() const
{
std::vector< util::StatusObject > ob_links, ib_links;
std::transform(inboundLinks.begin(), inboundLinks.end(),
std::back_inserter(ib_links),
[](const auto &link) -> util::StatusObject {
return link->ExtractStatus();
});
std::transform(outboundLinks.begin(), outboundLinks.end(),
std::back_inserter(ob_links),
[](const auto &link) -> util::StatusObject {
return link->ExtractStatus();
});
util::StatusObject obj{{"outbound", ob_links}, {"inbound", ib_links}};
return obj;
}
void
LinkManager::Init(IOutboundSessionMaker *sessionMaker)
{
stopping = false;
_sessionMaker = sessionMaker;
}
LinkLayer_ptr
LinkManager::GetLinkWithSessionTo(const RouterID &remote) const
{
if(stopping)
return nullptr;
for(const auto &link : inboundLinks)
{
if(link->HasSessionTo(remote))
{
return link;
}
}
for(const auto &link : outboundLinks)
{
if(link->HasSessionTo(remote))
{
return link;
}
}
return nullptr;
}
} // namespace llarp

100
llarp/link/link_manager.hpp Normal file
View File

@ -0,0 +1,100 @@
#ifndef LLARP_LINK_MANAGER_HPP
#define LLARP_LINK_MANAGER_HPP
#include <link/i_link_manager.hpp>
#include <util/compare_ptr.hpp>
#include <util/threading.hpp>
#include <link/server.hpp>
#include <unordered_map>
#include <set>
#include <atomic>
namespace llarp
{
struct IRouterContactManager;
struct LinkManager final : public ILinkManager
{
public:
~LinkManager() = default;
LinkLayer_ptr
GetCompatibleLink(const RouterContact &rc) const override;
IOutboundSessionMaker *
GetSessionMaker() const override;
bool
SendTo(const RouterID &remote, const llarp_buffer_t &buf) override;
bool
HasSessionTo(const RouterID &remote) const override;
void
PumpLinks() override;
void
AddLink(LinkLayer_ptr link, bool inbound = false) override;
bool
StartLinks(Logic_ptr logic) override;
void
Stop() override;
void
PersistSessionUntil(const RouterID &remote, llarp_time_t until) override;
void
ForEachPeer(std::function< void(const ILinkSession *, bool) > visit,
bool randomize = false) const override;
void
ForEachPeer(std::function< void(ILinkSession *) > visit) override;
void
ForEachInboundLink(
std::function< void(LinkLayer_ptr) > visit) const override;
size_t
NumberOfConnectedRouters() const override;
size_t
NumberOfConnectedClients() const override;
bool
GetRandomConnectedRouter(RouterContact &router) const override;
void
CheckPersistingSessions(llarp_time_t now) override;
virtual util::StatusObject
ExtractStatus() const override;
void
Init(IOutboundSessionMaker *sessionMaker);
private:
LinkLayer_ptr
GetLinkWithSessionTo(const RouterID &remote) const;
std::atomic< bool > stopping;
mutable util::Mutex _mutex; // protects m_PersistingSessions
using LinkSet = std::set< LinkLayer_ptr, ComparePtr< LinkLayer_ptr > >;
LinkSet outboundLinks;
LinkSet inboundLinks;
// sessions to persist -> timestamp to end persist at
std::unordered_map< RouterID, llarp_time_t, RouterID::Hash >
m_PersistingSessions GUARDED_BY(_mutex);
IOutboundSessionMaker *_sessionMaker;
};
} // namespace llarp
#endif // LLARP_LINK_MANAGER_HPP

View File

@ -151,6 +151,13 @@ llarp_nodedb::UpdateAsyncIfNewer(llarp::RouterContact rc,
InsertAsync(rc, logic, completionHandler);
return true;
}
else if(itr != entries.end())
{
// insertion time is set on...insertion. But it should be updated here
// even if there is no insertion of a new RC, to show that the existing one
// is not "stale"
itr->second.inserted = llarp::time_now_ms();
}
return false;
}
@ -189,6 +196,8 @@ llarp_nodedb::Insert(const llarp::RouterContact &rc)
if(itr != entries.end())
entries.erase(itr);
entries.emplace(rc.pubkey.as_array(), rc);
LogInfo("Added or updated RC for ", llarp::RouterID(rc.pubkey),
" to nodedb. Current nodedb count is: ", entries.size());
}
return true;
}
@ -313,6 +322,26 @@ llarp_nodedb::VisitInsertedBefore(
}
}
void
llarp_nodedb::RemoveStaleRCs(const std::set< llarp::RouterID > &keep,
llarp_time_t cutoff)
{
std::set< llarp::RouterID > removeStale;
// remove stale routers
VisitInsertedBefore(
[&](const llarp::RouterContact &rc) {
if(keep.find(rc.pubkey) != keep.end())
return;
LogInfo("removing stale router: ", llarp::RouterID(rc.pubkey));
removeStale.insert(rc.pubkey);
},
cutoff);
RemoveIf([&removeStale](const llarp::RouterContact &rc) -> bool {
return removeStale.count(rc.pubkey) > 0;
});
}
/*
bool
llarp_nodedb::Save()

View File

@ -60,7 +60,7 @@ struct llarp_nodedb
struct NetDBEntry
{
const llarp::RouterContact rc;
const llarp_time_t inserted;
llarp_time_t inserted;
NetDBEntry(const llarp::RouterContact &data);
};
@ -140,6 +140,9 @@ struct llarp_nodedb
VisitInsertedBefore(std::function< void(const llarp::RouterContact &) > visit,
llarp_time_t insertedAfter) LOCKS_EXCLUDED(access);
void
RemoveStaleRCs(const std::set< llarp::RouterID > &keep, llarp_time_t cutoff);
size_t
num_loaded() const LOCKS_EXCLUDED(access);

View File

@ -3,10 +3,6 @@
namespace llarp
{
AbstractRouter::~AbstractRouter()
{
}
void
AbstractRouter::EnsureRouter(RouterID router, RouterLookupHandler handler)
{

View File

@ -24,6 +24,10 @@ namespace llarp
struct Profiling;
struct SecretKey;
struct Signature;
struct IOutboundMessageHandler;
struct IOutboundSessionMaker;
struct ILinkManager;
struct I_RCLookupHandler;
namespace exit
{
@ -52,10 +56,7 @@ namespace llarp
struct AbstractRouter
{
virtual ~AbstractRouter() = 0;
virtual bool
OnSessionEstablished(ILinkSession *) = 0;
virtual ~AbstractRouter() = default;
virtual bool
HandleRecvLinkMessageBuffer(ILinkSession *from,
@ -106,11 +107,23 @@ namespace llarp
virtual const service::Context &
hiddenServiceContext() const = 0;
virtual IOutboundMessageHandler &
outboundMessageHandler() = 0;
virtual IOutboundSessionMaker &
outboundSessionMaker() = 0;
virtual ILinkManager &
linkManager() = 0;
virtual I_RCLookupHandler &
rcLookupHandler() = 0;
virtual bool
Sign(Signature &sig, const llarp_buffer_t &buf) const = 0;
virtual bool
Configure(Config *conf) = 0;
Configure(Config *conf, llarp_nodedb *nodedb) = 0;
virtual bool
Run(struct llarp_nodedb *nodedb) = 0;
@ -129,9 +142,6 @@ namespace llarp
virtual const byte_t *
pubkey() const = 0;
virtual void
OnConnectTimeout(ILinkSession *session) = 0;
/// connect to N random routers
virtual void
ConnectToRandomRouters(int N) = 0;
@ -219,11 +229,6 @@ namespace llarp
virtual bool
HasSessionTo(const RouterID &router) const = 0;
/// return true if we are currently looking up this router either directly
/// or via an anonymous endpoint
virtual bool
HasPendingRouterLookup(const RouterID &router) const = 0;
virtual util::StatusObject
ExtractStatus() const = 0;

View File

@ -0,0 +1 @@
#include <router/i_outbound_message_handler.hpp>

View File

@ -0,0 +1,43 @@
#ifndef LLARP_ROUTER_I_OUTBOUND_MESSAGE_HANDLER_HPP
#define LLARP_ROUTER_I_OUTBOUND_MESSAGE_HANDLER_HPP
#include <cstdint>
#include <functional>
namespace llarp
{
enum class SendStatus
{
Success,
Timeout,
NoLink,
InvalidRouter,
RouterNotFound,
Congestion
};
struct ILinkMessage;
struct RouterID;
namespace util
{
struct StatusObject;
}
using SendStatusHandler = std::function< void(SendStatus) >;
struct IOutboundMessageHandler
{
virtual ~IOutboundMessageHandler() = default;
virtual bool
QueueMessage(const RouterID &remote, const ILinkMessage *msg,
SendStatusHandler callback) = 0;
virtual util::StatusObject
ExtractStatus() const = 0;
};
} // namespace llarp
#endif // LLARP_ROUTER_I_OUTBOUND_MESSAGE_HANDLER_HPP

View File

@ -0,0 +1 @@
#include <router/i_outbound_session_maker.hpp>

View File

@ -0,0 +1,59 @@
#ifndef LLARP_ROUTER_I_OUTBOUND_SESSION_MAKER_HPP
#define LLARP_ROUTER_I_OUTBOUND_SESSION_MAKER_HPP
#include <util/types.hpp>
#include <functional>
namespace llarp
{
namespace util
{
struct StatusObject;
} // namespace util
struct ILinkSession;
struct RouterID;
struct RouterContact;
enum class SessionResult
{
Establish,
Timeout,
RouterNotFound,
InvalidRouter,
NoLink
};
using RouterCallback =
std::function< void(const RouterID &, const SessionResult) >;
struct IOutboundSessionMaker
{
virtual ~IOutboundSessionMaker() = default;
virtual bool
OnSessionEstablished(ILinkSession *session) = 0;
virtual void
OnConnectTimeout(ILinkSession *session) = 0;
virtual void
CreateSessionTo(const RouterID &router, RouterCallback on_result) = 0;
virtual void
CreateSessionTo(const RouterContact &rc, RouterCallback on_result) = 0;
virtual bool
HavePendingSessionTo(const RouterID &router) const = 0;
virtual void
ConnectToRandomRouters(int numDesired, llarp_time_t now) = 0;
virtual util::StatusObject
ExtractStatus() const = 0;
};
} // namespace llarp
#endif // LLARP_ROUTER_I_OUTBOUND_SESSION_MAKER_HPP

View File

@ -0,0 +1 @@
#include <router/i_rc_lookup_handler.hpp>

View File

@ -0,0 +1,63 @@
#ifndef LLARP_I_RC_LOOKUP_HANDLER_HPP
#define LLARP_I_RC_LOOKUP_HANDLER_HPP
#include <util/types.hpp>
#include <router_id.hpp>
#include <memory>
#include <set>
#include <vector>
namespace llarp
{
struct RouterContact;
enum class RCRequestResult
{
Success,
InvalidRouter,
RouterNotFound,
BadRC
};
using RCRequestCallback = std::function< void(
const RouterID &, const RouterContact *const, const RCRequestResult) >;
struct I_RCLookupHandler
{
virtual ~I_RCLookupHandler() = default;
virtual void
AddValidRouter(const RouterID &router) = 0;
virtual void
RemoveValidRouter(const RouterID &router) = 0;
virtual void
SetRouterWhitelist(const std::vector< RouterID > &routers) = 0;
virtual void
GetRC(const RouterID &router, RCRequestCallback callback) = 0;
virtual bool
RemoteIsAllowed(const RouterID &remote) const = 0;
virtual bool
CheckRC(const RouterContact &rc) const = 0;
virtual bool
GetRandomWhitelistRouter(RouterID &router) const = 0;
virtual bool
CheckRenegotiateValid(RouterContact newrc, RouterContact oldrc) = 0;
virtual void
PeriodicUpdate(llarp_time_t now) = 0;
virtual void
ExploreNetwork() = 0;
};
} // namespace llarp
#endif // LLARP_I_RC_LOOKUP_HANDLER_HPP

View File

@ -0,0 +1,224 @@
#include <router/outbound_message_handler.hpp>
#include <messages/link_message.hpp>
#include <router/i_outbound_session_maker.hpp>
#include <link/i_link_manager.hpp>
#include <constants/link_layer.hpp>
#include <util/memfn.hpp>
#include <util/status.hpp>
#include <algorithm>
#include <cstdlib>
namespace llarp
{
bool
OutboundMessageHandler::QueueMessage(const RouterID &remote,
const ILinkMessage *msg,
SendStatusHandler callback)
{
std::array< byte_t, MAX_LINK_MSG_SIZE > linkmsg_buffer;
llarp_buffer_t buf(linkmsg_buffer);
if(!EncodeBuffer(msg, buf))
{
return false;
}
Message message;
message.first.reserve(buf.sz);
message.second = callback;
std::copy_n(buf.base, buf.sz, message.first.data());
if(SendIfSession(remote, message))
{
return true;
}
bool shouldCreateSession = false;
{
util::Lock l(&_mutex);
// create queue for <remote> if it doesn't exist, and get iterator
auto itr_pair = outboundMessageQueue.emplace(remote, MessageQueue());
itr_pair.first->second.push_back(std::move(message));
shouldCreateSession = itr_pair.second;
}
if(shouldCreateSession)
{
QueueSessionCreation(remote);
}
return true;
}
// TODO: this
util::StatusObject
OutboundMessageHandler::ExtractStatus() const
{
util::StatusObject status{};
return status;
}
void
OutboundMessageHandler::Init(ILinkManager *linkManager,
std::shared_ptr< Logic > logic)
{
_linkManager = linkManager;
_logic = logic;
}
void
OutboundMessageHandler::OnSessionEstablished(const RouterID &router)
{
FinalizeRequest(router, SendStatus::Success);
}
void
OutboundMessageHandler::OnConnectTimeout(const RouterID &router)
{
FinalizeRequest(router, SendStatus::Timeout);
}
void
OutboundMessageHandler::OnRouterNotFound(const RouterID &router)
{
FinalizeRequest(router, SendStatus::RouterNotFound);
}
void
OutboundMessageHandler::OnInvalidRouter(const RouterID &router)
{
FinalizeRequest(router, SendStatus::InvalidRouter);
}
void
OutboundMessageHandler::OnNoLink(const RouterID &router)
{
FinalizeRequest(router, SendStatus::NoLink);
}
void
OutboundMessageHandler::OnSessionResult(const RouterID &router,
const SessionResult result)
{
switch(result)
{
case SessionResult::Establish:
OnSessionEstablished(router);
break;
case SessionResult::Timeout:
OnConnectTimeout(router);
break;
case SessionResult::RouterNotFound:
OnRouterNotFound(router);
break;
case SessionResult::InvalidRouter:
OnInvalidRouter(router);
break;
case SessionResult::NoLink:
OnNoLink(router);
break;
default:
LogError("Impossible situation: enum class value out of bounds.");
std::abort();
break;
}
}
void
OutboundMessageHandler::DoCallback(SendStatusHandler callback,
SendStatus status)
{
if(callback)
{
auto func = std::bind(callback, status);
_logic->queue_func(func);
}
}
void
OutboundMessageHandler::QueueSessionCreation(const RouterID &remote)
{
auto fn = util::memFn(&OutboundMessageHandler::OnSessionResult, this);
_linkManager->GetSessionMaker()->CreateSessionTo(remote, fn);
}
bool
OutboundMessageHandler::EncodeBuffer(const ILinkMessage *msg,
llarp_buffer_t &buf)
{
if(!msg->BEncode(&buf))
{
LogWarn("failed to encode outbound message, buffer size left: ",
buf.size_left());
return false;
}
// set size of message
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
return true;
}
bool
OutboundMessageHandler::Send(const RouterID &remote, const Message &msg)
{
llarp_buffer_t buf(msg.first);
if(_linkManager->SendTo(remote, buf))
{
DoCallback(msg.second, SendStatus::Success);
return true;
}
DoCallback(msg.second, SendStatus::Congestion);
return false;
}
bool
OutboundMessageHandler::SendIfSession(const RouterID &remote,
const Message &msg)
{
if(_linkManager->HasSessionTo(remote))
{
return Send(remote, msg);
}
return false;
}
void
OutboundMessageHandler::FinalizeRequest(const RouterID &router,
SendStatus status)
{
MessageQueue movedMessages;
{
util::Lock l(&_mutex);
auto itr = outboundMessageQueue.find(router);
if(itr == outboundMessageQueue.end())
{
return;
}
movedMessages.splice(movedMessages.begin(), itr->second);
outboundMessageQueue.erase(itr);
}
for(const auto &msg : movedMessages)
{
if(status == SendStatus::Success)
{
Send(router, msg);
}
else
{
DoCallback(msg.second, status);
}
}
}
} // namespace llarp

View File

@ -0,0 +1,88 @@
#ifndef LLARP_ROUTER_OUTBOUND_MESSAGE_HANDLER_HPP
#define LLARP_ROUTER_OUTBOUND_MESSAGE_HANDLER_HPP
#include <router/i_outbound_message_handler.hpp>
#include <util/threading.hpp>
#include <util/logic.hpp>
#include <router_id.hpp>
#include <list>
#include <unordered_map>
#include <utility>
struct llarp_buffer_t;
namespace llarp
{
struct ILinkManager;
struct Logic;
enum class SessionResult;
struct OutboundMessageHandler final : public IOutboundMessageHandler
{
public:
~OutboundMessageHandler() = default;
bool
QueueMessage(const RouterID &remote, const ILinkMessage *msg,
SendStatusHandler callback) override;
util::StatusObject
ExtractStatus() const override;
void
Init(ILinkManager *linkManager, std::shared_ptr< Logic > logic);
private:
using Message = std::pair< std::vector< byte_t >, SendStatusHandler >;
using MessageQueue = std::list< Message >;
void
OnSessionEstablished(const RouterID &router);
void
OnConnectTimeout(const RouterID &router);
void
OnRouterNotFound(const RouterID &router);
void
OnInvalidRouter(const RouterID &router);
void
OnNoLink(const RouterID &router);
void
OnSessionResult(const RouterID &router, const SessionResult result);
void
DoCallback(SendStatusHandler callback, SendStatus status);
void
QueueSessionCreation(const RouterID &remote);
bool
EncodeBuffer(const ILinkMessage *msg, llarp_buffer_t &buf);
bool
Send(const RouterID &remote, const Message &msg);
bool
SendIfSession(const RouterID &remote, const Message &msg);
void
FinalizeRequest(const RouterID &router, SendStatus status);
mutable util::Mutex _mutex; // protects outboundMessageQueue
std::unordered_map< RouterID, MessageQueue, RouterID::Hash >
outboundMessageQueue GUARDED_BY(_mutex);
ILinkManager *_linkManager;
std::shared_ptr< Logic > _logic;
};
} // namespace llarp
#endif // LLARP_ROUTER_OUTBOUND_MESSAGE_HANDLER_HPP

View File

@ -0,0 +1,317 @@
#include <router/outbound_session_maker.hpp>
#include <link/server.hpp>
#include <router_contact.hpp>
#include <nodedb.hpp>
#include <router/i_rc_lookup_handler.hpp>
#include <link/i_link_manager.hpp>
#include <util/logic.hpp>
#include <util/memfn.hpp>
#include <util/threading.hpp>
#include <util/status.hpp>
#include <crypto/crypto.hpp>
namespace llarp
{
struct PendingSession
{
// TODO: add session establish status metadata, e.g. num retries
const RouterContact rc;
LinkLayer_ptr link;
size_t attemptCount = 0;
PendingSession(const RouterContact &rc, LinkLayer_ptr link)
: rc(rc), link(link)
{
}
};
bool
OutboundSessionMaker::OnSessionEstablished(ILinkSession *session)
{
// TODO: do we want to keep it
const auto router = RouterID(session->GetPubKey());
const std::string remoteType =
session->GetRemoteRC().IsPublicRouter() ? "router" : "client";
LogInfo("session with ", remoteType, " [", router, "] established");
if(not _rcLookup->RemoteIsAllowed(router))
{
FinalizeRequest(router, SessionResult::InvalidRouter);
return false;
}
auto func = std::bind(&OutboundSessionMaker::VerifyRC, this,
session->GetRemoteRC());
_threadpool->addJob(func);
return true;
}
void
OutboundSessionMaker::OnConnectTimeout(ILinkSession *session)
{
// TODO: retry/num attempts
LogWarn("Session establish attempt to ", RouterID(session->GetPubKey()),
" timed out.");
FinalizeRequest(session->GetPubKey(), SessionResult::Timeout);
}
void
OutboundSessionMaker::CreateSessionTo(const RouterID &router,
RouterCallback on_result)
{
if(on_result)
{
util::Lock l(&_mutex);
auto itr_pair = pendingCallbacks.emplace(router, CallbacksQueue{});
itr_pair.first->second.push_back(on_result);
}
if(HavePendingSessionTo(router))
{
return;
}
CreatePendingSession(router);
LogDebug("Creating session establish attempt to ", router, " .");
auto fn = util::memFn(&OutboundSessionMaker::OnRouterContactResult, this);
_rcLookup->GetRC(router, fn);
}
void
OutboundSessionMaker::CreateSessionTo(const RouterContact &rc,
RouterCallback on_result)
{
if(on_result)
{
util::Lock l(&_mutex);
auto itr_pair = pendingCallbacks.emplace(rc.pubkey, CallbacksQueue{});
itr_pair.first->second.push_back(on_result);
}
if(not HavePendingSessionTo(rc.pubkey))
{
LogDebug("Creating session establish attempt to ", rc.pubkey, " .");
CreatePendingSession(rc.pubkey);
}
GotRouterContact(rc.pubkey, rc);
}
bool
OutboundSessionMaker::HavePendingSessionTo(const RouterID &router) const
{
util::Lock l(&_mutex);
return pendingSessions.find(router) != pendingSessions.end();
}
void
OutboundSessionMaker::ConnectToRandomRouters(int numDesired, llarp_time_t now)
{
int remainingDesired = numDesired;
_nodedb->visit([&](const RouterContact &other) -> bool {
// check if we really remainingDesired to
if(other.ExpiresSoon(now, 30000)) // TODO: make delta configurable
{
return remainingDesired > 0;
}
if(!_rcLookup->RemoteIsAllowed(other.pubkey))
{
return remainingDesired > 0;
}
if(randint() % 2 == 0
&& !(_linkManager->HasSessionTo(other.pubkey)
|| HavePendingSessionTo(other.pubkey)))
{
CreateSessionTo(other, nullptr);
--remainingDesired;
}
return remainingDesired > 0;
});
LogDebug("connecting to ", numDesired - remainingDesired, " out of ",
numDesired, " random routers");
}
// TODO: this
util::StatusObject
OutboundSessionMaker::ExtractStatus() const
{
util::StatusObject status{};
return status;
}
void
OutboundSessionMaker::Init(
ILinkManager *linkManager, I_RCLookupHandler *rcLookup,
std::shared_ptr< Logic > logic, llarp_nodedb *nodedb,
std::shared_ptr< llarp::thread::ThreadPool > threadpool)
{
_linkManager = linkManager;
_rcLookup = rcLookup;
_logic = logic;
_nodedb = nodedb;
_threadpool = threadpool;
}
void
OutboundSessionMaker::DoEstablish(const RouterID &router)
{
util::ReleasableLock l(&_mutex);
auto itr = pendingSessions.find(router);
if(itr == pendingSessions.end())
{
return;
}
const auto &job = itr->second;
if(!job->link->TryEstablishTo(job->rc))
{
// TODO: maybe different failure type?
l.Release();
FinalizeRequest(router, SessionResult::NoLink);
}
}
void
OutboundSessionMaker::GotRouterContact(const RouterID &router,
const RouterContact &rc)
{
{
util::Lock l(&_mutex);
// in case other request found RC for this router after this request was
// made
auto itr = pendingSessions.find(router);
if(itr == pendingSessions.end())
{
return;
}
LinkLayer_ptr link = _linkManager->GetCompatibleLink(rc);
if(!link)
{
FinalizeRequest(router, SessionResult::NoLink);
return;
}
auto session = std::make_shared< PendingSession >(rc, link);
itr->second = session;
}
auto fn = std::bind(&OutboundSessionMaker::DoEstablish, this, router);
_logic->queue_func(fn);
}
void
OutboundSessionMaker::InvalidRouter(const RouterID &router)
{
FinalizeRequest(router, SessionResult::InvalidRouter);
}
void
OutboundSessionMaker::RouterNotFound(const RouterID &router)
{
FinalizeRequest(router, SessionResult::RouterNotFound);
}
void
OutboundSessionMaker::OnRouterContactResult(const RouterID &router,
const RouterContact *const rc,
const RCRequestResult result)
{
if(not HavePendingSessionTo(router))
{
return;
}
switch(result)
{
case RCRequestResult::Success:
if(rc)
{
GotRouterContact(router, *rc);
}
else
{
LogError("RCRequestResult::Success but null rc pointer given");
}
break;
case RCRequestResult::InvalidRouter:
InvalidRouter(router);
break;
case RCRequestResult::RouterNotFound:
RouterNotFound(router);
break;
default:
break;
}
}
void
OutboundSessionMaker::VerifyRC(const RouterContact rc)
{
if(not _rcLookup->CheckRC(rc))
{
FinalizeRequest(rc.pubkey, SessionResult::InvalidRouter);
return;
}
FinalizeRequest(rc.pubkey, SessionResult::Establish);
}
void
OutboundSessionMaker::CreatePendingSession(const RouterID &router)
{
util::Lock l(&_mutex);
pendingSessions.emplace(router, nullptr);
}
void
OutboundSessionMaker::FinalizeRequest(const RouterID &router,
const SessionResult type)
{
CallbacksQueue movedCallbacks;
{
util::Lock l(&_mutex);
// TODO: Router profiling stuff
auto itr = pendingCallbacks.find(router);
if(itr != pendingCallbacks.end())
{
movedCallbacks.splice(movedCallbacks.begin(), itr->second);
pendingCallbacks.erase(itr);
}
}
for(const auto &callback : movedCallbacks)
{
auto func = std::bind(callback, router, type);
_logic->queue_func(func);
}
{
util::Lock l(&_mutex);
pendingSessions.erase(router);
}
}
} // namespace llarp

View File

@ -0,0 +1,103 @@
#ifndef LLARP_ROUTER_OUTBOUND_SESSION_MAKER_HPP
#define LLARP_ROUTER_OUTBOUND_SESSION_MAKER_HPP
#include <router/i_outbound_session_maker.hpp>
#include <router/i_rc_lookup_handler.hpp>
#include <util/threading.hpp>
#include <util/thread_pool.hpp>
#include <util/logic.hpp>
#include <unordered_map>
#include <list>
#include <memory>
struct llarp_nodedb;
namespace llarp
{
struct PendingSession;
struct ILinkManager;
struct I_RCLookupHandler;
struct OutboundSessionMaker final : public IOutboundSessionMaker
{
using CallbacksQueue = std::list< RouterCallback >;
public:
~OutboundSessionMaker() = default;
bool
OnSessionEstablished(ILinkSession *session) override;
void
OnConnectTimeout(ILinkSession *session) override;
void
CreateSessionTo(const RouterID &router,
RouterCallback on_result) /* override */;
void
CreateSessionTo(const RouterContact &rc,
RouterCallback on_result) /* override */;
bool
HavePendingSessionTo(const RouterID &router) const override;
void
ConnectToRandomRouters(int numDesired, llarp_time_t now) override;
util::StatusObject
ExtractStatus() const override;
void
Init(ILinkManager *linkManager, I_RCLookupHandler *rcLookup,
std::shared_ptr< Logic > logic, llarp_nodedb *nodedb,
std::shared_ptr< llarp::thread::ThreadPool > threadpool);
private:
void
DoEstablish(const RouterID &router);
void
GotRouterContact(const RouterID &router, const RouterContact &rc);
void
InvalidRouter(const RouterID &router);
void
RouterNotFound(const RouterID &router);
void
OnRouterContactResult(const RouterID &router, const RouterContact *const rc,
const RCRequestResult result);
void
VerifyRC(const RouterContact rc);
void
CreatePendingSession(const RouterID &router);
void
FinalizeRequest(const RouterID &router, const SessionResult type);
mutable util::Mutex _mutex; // protects pendingSessions, pendingCallbacks
std::unordered_map< RouterID, std::shared_ptr< PendingSession >,
RouterID::Hash >
pendingSessions GUARDED_BY(_mutex);
std::unordered_map< RouterID, CallbacksQueue, RouterID::Hash >
pendingCallbacks GUARDED_BY(_mutex);
ILinkManager *_linkManager;
I_RCLookupHandler *_rcLookup;
std::shared_ptr< Logic > _logic;
llarp_nodedb *_nodedb;
std::shared_ptr< llarp::thread::ThreadPool > _threadpool;
};
} // namespace llarp
#endif // LLARP_ROUTER_OUTBOUND_SESSION_MAKER_HPP

View File

@ -0,0 +1,337 @@
#include <router/rc_lookup_handler.hpp>
#include <link/i_link_manager.hpp>
#include <link/server.hpp>
#include <crypto/crypto.hpp>
#include <service/context.hpp>
#include <router_contact.hpp>
#include <util/memfn.hpp>
#include <util/types.hpp>
#include <util/threading.hpp>
#include <nodedb.hpp>
#include <dht/context.hpp>
#include <iterator>
#include <functional>
namespace llarp
{
void
RCLookupHandler::AddValidRouter(const RouterID &router)
{
util::Lock l(&_mutex);
whitelistRouters.insert(router);
}
void
RCLookupHandler::RemoveValidRouter(const RouterID &router)
{
util::Lock l(&_mutex);
whitelistRouters.erase(router);
}
void
RCLookupHandler::SetRouterWhitelist(const std::vector< RouterID > &routers)
{
util::Lock l(&_mutex);
whitelistRouters.clear();
for(auto &router : routers)
{
whitelistRouters.emplace(router);
}
LogInfo("lokinet service node list now has ", whitelistRouters.size(),
" routers");
}
void
RCLookupHandler::GetRC(const RouterID &router, RCRequestCallback callback)
{
RouterContact remoteRC;
if(_nodedb->Get(router, remoteRC))
{
if(callback)
{
callback(router, &remoteRC, RCRequestResult::Success);
}
FinalizeRequest(router, &remoteRC, RCRequestResult::Success);
return;
}
bool shouldDoLookup = false;
{
util::Lock l(&_mutex);
auto itr_pair = pendingCallbacks.emplace(router, CallbacksQueue{});
if(callback)
{
itr_pair.first->second.push_back(callback);
}
shouldDoLookup = itr_pair.second;
}
if(shouldDoLookup)
{
auto fn = std::bind(&RCLookupHandler::HandleDHTLookupResult, this, router,
std::placeholders::_1);
// if we are a client try using the hidden service endpoints
if(!isServiceNode)
{
bool sent = false;
LogInfo("Lookup ", router, " anonymously");
_hiddenServiceContext->ForEachService(
[&](const std::string &,
const std::shared_ptr< service::Endpoint > &ep) -> bool {
const bool success = ep->LookupRouterAnon(router, fn);
sent = sent || success;
return !success;
});
if(sent)
return;
LogWarn("cannot lookup ", router, " anonymously");
}
if(!_dht->impl->LookupRouter(router, fn))
{
FinalizeRequest(router, nullptr, RCRequestResult::RouterNotFound);
}
}
}
bool
RCLookupHandler::RemoteIsAllowed(const RouterID &remote) const
{
if(_strictConnectPubkeys.size() && _strictConnectPubkeys.count(remote) == 0
&& !RemoteInBootstrap(remote))
{
return false;
}
util::Lock l(&_mutex);
if(useWhitelist && whitelistRouters.find(remote) == whitelistRouters.end())
{
return false;
}
return true;
}
bool
RCLookupHandler::CheckRC(const RouterContact &rc) const
{
if(not RemoteIsAllowed(rc.pubkey))
{
_dht->impl->DelRCNodeAsync(dht::Key_t{rc.pubkey});
return false;
}
if(not rc.Verify(_dht->impl->Now()))
{
return false;
}
// update nodedb if required
if(rc.IsPublicRouter())
{
LogInfo("Adding or updating RC for ", RouterID(rc.pubkey),
" to nodedb and dht.");
_nodedb->UpdateAsyncIfNewer(rc);
_dht->impl->PutRCNodeAsync(rc);
}
return true;
}
bool
RCLookupHandler::GetRandomWhitelistRouter(RouterID &router) const
{
util::Lock l(&_mutex);
const auto sz = whitelistRouters.size();
auto itr = whitelistRouters.begin();
if(sz == 0)
return false;
if(sz > 1)
std::advance(itr, randint() % sz);
router = *itr;
return true;
}
bool
RCLookupHandler::CheckRenegotiateValid(RouterContact newrc,
RouterContact oldrc)
{
// missmatch of identity ?
if(newrc.pubkey != oldrc.pubkey)
return false;
if(!RemoteIsAllowed(newrc.pubkey))
return false;
auto func = std::bind(&RCLookupHandler::CheckRC, this, newrc);
_threadpool->addJob(func);
// update dht if required
if(_dht->impl->Nodes()->HasNode(dht::Key_t{newrc.pubkey}))
{
_dht->impl->Nodes()->PutNode(newrc);
}
// TODO: check for other places that need updating the RC
return true;
}
void
RCLookupHandler::PeriodicUpdate(llarp_time_t now)
{
// try looking up stale routers
std::set< RouterID > routersToLookUp;
_nodedb->VisitInsertedBefore(
[&](const RouterContact &rc) {
if(HavePendingLookup(rc.pubkey))
return;
routersToLookUp.insert(rc.pubkey);
},
now - RouterContact::UpdateInterval);
for(const auto &router : routersToLookUp)
{
GetRC(router, nullptr);
}
_nodedb->RemoveStaleRCs(_bootstrapRouterIDList,
now - RouterContact::StaleInsertionAge);
}
void
RCLookupHandler::ExploreNetwork()
{
if(_bootstrapRCList.size())
{
for(const auto &rc : _bootstrapRCList)
{
LogInfo("Doing explore via bootstrap node: ", RouterID(rc.pubkey));
_dht->impl->ExploreNetworkVia(dht::Key_t{rc.pubkey});
}
}
else
{
LogError("we have no bootstrap nodes specified");
}
// TODO: only explore via random subset
// explore via every connected peer
_linkManager->ForEachPeer([&](ILinkSession *s) {
if(!s->IsEstablished())
return;
const RouterContact rc = s->GetRemoteRC();
if(rc.IsPublicRouter()
&& (_bootstrapRCList.find(rc) == _bootstrapRCList.end()))
{
LogInfo("Doing explore via public node: ", RouterID(rc.pubkey));
_dht->impl->ExploreNetworkVia(dht::Key_t{rc.pubkey});
}
});
}
void
RCLookupHandler::Init(llarp_dht_context *dht, llarp_nodedb *nodedb,
std::shared_ptr< llarp::thread::ThreadPool > threadpool,
ILinkManager *linkManager,
service::Context *hiddenServiceContext,
const std::set< RouterID > &strictConnectPubkeys,
const std::set< RouterContact > &bootstrapRCList,
bool useWhitelist_arg, bool isServiceNode_arg)
{
_dht = dht;
_nodedb = nodedb;
_threadpool = threadpool;
_hiddenServiceContext = hiddenServiceContext;
_strictConnectPubkeys = strictConnectPubkeys;
_bootstrapRCList = bootstrapRCList;
_linkManager = linkManager;
useWhitelist = useWhitelist_arg;
isServiceNode = isServiceNode_arg;
for(const auto &rc : _bootstrapRCList)
{
_bootstrapRouterIDList.insert(rc.pubkey);
}
}
void
RCLookupHandler::HandleDHTLookupResult(
RouterID remote, const std::vector< RouterContact > &results)
{
if(not results.size())
{
FinalizeRequest(remote, nullptr, RCRequestResult::RouterNotFound);
return;
}
if(not RemoteIsAllowed(remote))
{
FinalizeRequest(remote, &results[0], RCRequestResult::InvalidRouter);
return;
}
if(not CheckRC(results[0]))
{
FinalizeRequest(remote, &results[0], RCRequestResult::BadRC);
return;
}
FinalizeRequest(remote, &results[0], RCRequestResult::Success);
}
bool
RCLookupHandler::HavePendingLookup(RouterID remote) const
{
return pendingCallbacks.find(remote) != pendingCallbacks.end();
}
bool
RCLookupHandler::RemoteInBootstrap(const RouterID &remote) const
{
for(const auto &rc : _bootstrapRCList)
{
if(rc.pubkey == remote)
{
return true;
}
}
return false;
}
void
RCLookupHandler::FinalizeRequest(const RouterID &router,
const RouterContact *const rc,
RCRequestResult result)
{
CallbacksQueue movedCallbacks;
{
util::Lock l(&_mutex);
auto itr = pendingCallbacks.find(router);
if(itr != pendingCallbacks.end())
{
movedCallbacks.splice(movedCallbacks.begin(), itr->second);
pendingCallbacks.erase(itr);
}
} // lock
for(const auto &callback : movedCallbacks)
{
callback(router, rc, result);
}
}
} // namespace llarp

View File

@ -0,0 +1,112 @@
#ifndef LLARP_RC_LOOKUP_HANDLER_HPP
#define LLARP_RC_LOOKUP_HANDLER_HPP
#include <router/i_rc_lookup_handler.hpp>
#include <util/threading.hpp>
#include <util/thread_pool.hpp>
#include <unordered_map>
#include <set>
#include <list>
struct llarp_nodedb;
struct llarp_dht_context;
namespace llarp
{
namespace service
{
struct Context;
} // namespace service
struct ILinkManager;
struct RCLookupHandler final : public I_RCLookupHandler
{
public:
using CallbacksQueue = std::list< RCRequestCallback >;
~RCLookupHandler() = default;
void
AddValidRouter(const RouterID &router) override;
void
RemoveValidRouter(const RouterID &router) override;
void
SetRouterWhitelist(const std::vector< RouterID > &routers) override;
void
GetRC(const RouterID &router, RCRequestCallback callback) override;
bool
RemoteIsAllowed(const RouterID &remote) const override;
bool
CheckRC(const RouterContact &rc) const override;
bool
GetRandomWhitelistRouter(RouterID &router) const override;
bool
CheckRenegotiateValid(RouterContact newrc, RouterContact oldrc) override;
void
PeriodicUpdate(llarp_time_t now) override;
void
ExploreNetwork() override;
void
Init(llarp_dht_context *dht, llarp_nodedb *nodedb,
std::shared_ptr< llarp::thread::ThreadPool > threadpool,
ILinkManager *linkManager, service::Context *hiddenServiceContext,
const std::set< RouterID > &strictConnectPubkeys,
const std::set< RouterContact > &bootstrapRCList,
bool useWhitelist_arg, bool isServiceNode_arg);
private:
void
HandleDHTLookupResult(RouterID remote,
const std::vector< RouterContact > &results);
bool
HavePendingLookup(RouterID remote) const;
bool
RemoteInBootstrap(const RouterID &remote) const;
void
FinalizeRequest(const RouterID &router, const RouterContact *const rc,
RCRequestResult result);
mutable util::Mutex _mutex; // protects pendingCallbacks, whitelistRouters
llarp_dht_context *_dht = nullptr;
llarp_nodedb *_nodedb = nullptr;
std::shared_ptr< llarp::thread::ThreadPool > _threadpool = nullptr;
service::Context *_hiddenServiceContext = nullptr;
ILinkManager *_linkManager = nullptr;
/// explicit whitelist of routers we will connect to directly (not for
/// service nodes)
std::set< RouterID > _strictConnectPubkeys;
std::set< RouterContact > _bootstrapRCList;
std::set< RouterID > _bootstrapRouterIDList;
std::unordered_map< RouterID, CallbacksQueue, RouterID::Hash >
pendingCallbacks GUARDED_BY(_mutex);
bool useWhitelist = false;
bool isServiceNode = false;
std::set< RouterID > whitelistRouters GUARDED_BY(_mutex);
};
} // namespace llarp
#endif // LLARP_RC_LOOKUP_HANDLER_HPP

File diff suppressed because it is too large Load Diff

View File

@ -25,6 +25,10 @@
#include <util/status.hpp>
#include <util/str.hpp>
#include <util/threadpool.h>
#include <router/outbound_message_handler.hpp>
#include <router/outbound_session_maker.hpp>
#include <link/link_manager.hpp>
#include <router/rc_lookup_handler.hpp>
#include <functional>
#include <list>
@ -50,8 +54,6 @@ bool
llarp_loadServiceNodeIdentityKey(const fs::path &fpath,
llarp::SecretKey &secretkey);
struct TryConnectJob;
namespace llarp
{
struct Router final : public AbstractRouter
@ -191,9 +193,6 @@ namespace llarp
bool
Sign(Signature &sig, const llarp_buffer_t &buf) const override;
// buffer for serializing link messages
std::array< byte_t, MAX_LINK_MSG_SIZE > linkmsg_buffer;
uint16_t m_OutboundPort = 0;
/// always maintain this many connections to other routers
@ -233,10 +232,6 @@ namespace llarp
/// default network config for default network interface
NetConfig_t netConfig;
/// identity keys whitelist of routers we will connect to directly (not for
/// service nodes)
std::set< RouterID > strictConnectPubkeys;
/// bootstrap RCs
std::set< RouterContact > bootstrapRCList;
@ -267,55 +262,47 @@ namespace llarp
std::string lokidRPCUser;
std::string lokidRPCPassword;
using LinkSet = std::set< LinkLayer_ptr, ComparePtr< LinkLayer_ptr > >;
LinkSet outboundLinks;
LinkSet inboundLinks;
Profiling _routerProfiling;
std::string routerProfilesFile = "profiles.dat";
using MessageQueue = std::queue< std::vector< byte_t > >;
OutboundMessageHandler _outboundMessageHandler;
OutboundSessionMaker _outboundSessionMaker;
LinkManager _linkManager;
RCLookupHandler _rcLookupHandler;
/// outbound message queue
std::unordered_map< RouterID, MessageQueue, RouterID::Hash >
outboundMessageQueue;
IOutboundMessageHandler &
outboundMessageHandler() override
{
return _outboundMessageHandler;
}
/// loki verified routers
std::unordered_map< RouterID, RouterContact, RouterID::Hash > validRouters;
IOutboundSessionMaker &
outboundSessionMaker() override
{
return _outboundSessionMaker;
}
// pending establishing session with routers
std::unordered_map< RouterID, std::shared_ptr< TryConnectJob >,
RouterID::Hash >
pendingEstablishJobs;
ILinkManager &
linkManager() override
{
return _linkManager;
}
// pending RCs to be verified by pubkey
std::unordered_map< RouterID, llarp_async_verify_rc, RouterID::Hash >
pendingVerifyRC;
// sessions to persist -> timestamp to end persist at
std::unordered_map< RouterID, llarp_time_t, RouterID::Hash >
m_PersistingSessions;
// lokinet routers from lokid, maps pubkey to when we think it will expire,
// set to max value right now
std::unordered_map< RouterID, llarp_time_t, PubKey::Hash > lokinetRouters;
I_RCLookupHandler &
rcLookupHandler() override
{
return _rcLookupHandler;
}
Router(std::shared_ptr< llarp::thread::ThreadPool > worker,
llarp_ev_loop_ptr __netloop, std::shared_ptr< Logic > logic);
~Router();
bool
OnSessionEstablished(ILinkSession *) override;
bool
HandleRecvLinkMessageBuffer(ILinkSession *from,
const llarp_buffer_t &msg) override;
void
AddLink(std::shared_ptr< ILinkLayer > link, bool outbound = false);
bool
InitOutboundLinks();
@ -341,10 +328,7 @@ namespace llarp
AddHiddenService(const service::Config::section_t &config);
bool
Configure(Config *conf) override;
bool
Ready();
Configure(Config *conf, llarp_nodedb *nodedb = nullptr) override;
bool
Run(struct llarp_nodedb *nodedb) override;
@ -381,15 +365,6 @@ namespace llarp
return seckey_topublic(_identity);
}
void
OnConnectTimeout(ILinkSession *session) override;
bool
HasPendingConnectJob(const RouterID &remote);
bool
HasPendingRouterLookup(const RouterID &remote) const override;
void
try_connect(fs::path rcfile);
@ -397,6 +372,9 @@ namespace llarp
bool
Reconfigure(Config *conf) override;
bool
TryConnectAsync(RouterContact rc, uint16_t tries) override;
/// validate new configuration against old one
/// return true on 100% valid
/// return false if not 100% valid
@ -411,37 +389,6 @@ namespace llarp
bool
SendToOrQueue(const RouterID &remote, const ILinkMessage *msg) override;
/// sendto or drop
void
SendTo(RouterID remote, const ILinkMessage *msg, ILinkLayer *chosen);
/// manually flush outbound message queue for just 1 router
void
FlushOutboundFor(RouterID remote, ILinkLayer *chosen = nullptr);
void
LookupRouter(RouterID remote, RouterLookupHandler handler) override;
/// manually discard all pending messages to remote router
void
DiscardOutboundFor(const RouterID &remote);
/// try establishing a session to a remote router
void
TryEstablishTo(const RouterID &remote);
/// lookup a router by pubkey when it expires
void
LookupRouterWhenExpired(RouterID remote);
void
HandleDHTLookupForExplore(
RouterID remote, const std::vector< RouterContact > &results) override;
void
HandleRouterLookupForExpireUpdate(
RouterID remote, const std::vector< RouterContact > &results);
void
ForEachPeer(std::function< void(const ILinkSession *, bool) > visit,
bool randomize = false) const override;
@ -458,10 +405,6 @@ namespace llarp
bool
CheckRenegotiateValid(RouterContact newRc, RouterContact oldRC) override;
/// flush outbound message queue
void
FlushOutbound();
/// called by link when a remote session has no more sessions open
void
SessionClosed(RouterID remote) override;
@ -481,9 +424,6 @@ namespace llarp
void
ScheduleTicker(uint64_t i = 1000);
ILinkLayer *
GetLinkWithSessionByPubkey(const RouterID &remote);
/// parse a routing message in a buffer and handle it with a handler if
/// successful parsing return true on parse and handle success otherwise
/// return false
@ -503,52 +443,28 @@ namespace llarp
size_t
NumberOfConnectedClients() const override;
/// count unique router id's given filter to match session
size_t
NumberOfRoutersMatchingFilter(
std::function< bool(const ILinkSession *) > filter) const;
/// count the number of connections that match filter
size_t
NumberOfConnectionsMatchingFilter(
std::function< bool(const ILinkSession *) > filter) const;
bool
TryConnectAsync(RouterContact rc, uint16_t tries) override;
bool
GetRandomConnectedRouter(RouterContact &result) const override;
bool
async_verify_RC(const RouterContact rc);
void
HandleDHTLookupForExplore(
RouterID remote, const std::vector< RouterContact > &results) override;
void
HandleDHTLookupForSendTo(RouterID remote,
const std::vector< RouterContact > &results);
LookupRouter(RouterID remote, RouterLookupHandler resultHandler) override;
bool
HasSessionTo(const RouterID &remote) const override;
void
HandleDHTLookupForTryEstablishTo(
RouterID remote, const std::vector< RouterContact > &results);
static void
on_verify_client_rc(llarp_async_verify_rc *context);
static void
on_verify_server_rc(llarp_async_verify_rc *context);
static void
handle_router_ticker(void *user, uint64_t orig, uint64_t left);
static void
HandleAsyncLoadRCForSendTo(llarp_async_load_rc *async);
private:
std::atomic< bool > _stopping;
std::atomic< bool > _running;
bool m_isServiceNode = false;
llarp_time_t m_LastStatsReport = 0;
bool
@ -572,6 +488,9 @@ namespace llarp
bool
FromConfig(Config *conf);
void
MessageSent(const RouterID &remote, SendStatus status);
};
} // namespace llarp

View File

@ -32,12 +32,10 @@ namespace llarp
/// 1 day for real network
llarp_time_t RouterContact::Lifetime = 24 * 60 * 60 * 1000;
#endif
/// every 30 minutes an RC is stale and needs updating
llarp_time_t RouterContact::UpdateInterval = 30 * 60 * 1000;
// 1 minute window for update
llarp_time_t RouterContact::UpdateWindow = 60 * 1000;
// try 5 times to update
int RouterContact::UpdateTries = 5;
/// update RCs every 5 minutes
llarp_time_t RouterContact::UpdateInterval = 5 * 60 * 1000;
/// an RC inserted long enough ago (30 min) is considered stale and is removed
llarp_time_t RouterContact::StaleInsertionAge = 30 * 60 * 1000;
NetID::NetID(const byte_t *val) : AlignedBuffer< 8 >()
{

View File

@ -70,8 +70,7 @@ namespace llarp
static llarp_time_t Lifetime;
static llarp_time_t UpdateInterval;
static llarp_time_t UpdateWindow;
static int UpdateTries;
static llarp_time_t StaleInsertionAge;
RouterContact()
{

View File

@ -37,6 +37,7 @@ namespace llarp
using Mutex = absl::Mutex;
using Lock = absl::MutexLock;
using ReleasableLock = absl::ReleasableMutexLock;
using Condition = absl::CondVar;
class Semaphore

View File

@ -18,19 +18,6 @@ namespace llarp
reneg, timeout, closed);
}
LinkLayer_ptr
NewServerFromRouter(AbstractRouter* r)
{
return NewServer(
r->encryption(), util::memFn(&AbstractRouter::rc, r),
util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, r),
util::memFn(&AbstractRouter::OnSessionEstablished, r),
util::memFn(&AbstractRouter::CheckRenegotiateValid, r),
util::memFn(&AbstractRouter::Sign, r),
util::memFn(&AbstractRouter::OnConnectTimeout, r),
util::memFn(&AbstractRouter::SessionClosed, r));
}
} // namespace utp
} // namespace llarp

View File

@ -15,9 +15,6 @@ namespace llarp
LinkMessageHandler h, SessionEstablishedHandler est,
SessionRenegotiateHandler reneg, SignBufferFunc sign,
TimeoutHandler timeout, SessionClosedHandler closed);
LinkLayer_ptr
NewServerFromRouter(AbstractRouter* r);
} // namespace utp
} // namespace llarp

View File

@ -117,6 +117,8 @@ namespace llarp
MOCK_METHOD0(AllowTransit, bool&());
MOCK_CONST_METHOD0(Nodes, dht::Bucket< dht::RCNode >*());
MOCK_METHOD1(PutRCNodeAsync, void(const dht::RCNode& val));
MOCK_METHOD1(DelRCNodeAsync, void(const dht::Key_t& val));
};
} // namespace test