lokinet/llarp/link/link_manager.cpp

487 lines
11 KiB
C++
Raw Permalink Normal View History

#include "link_manager.hpp"
#include <llarp/router/i_outbound_session_maker.hpp>
#include <llarp/crypto/crypto.hpp>
#include <algorithm>
#include <set>
namespace llarp
{
LinkLayer_ptr
LinkManager::GetCompatibleLink(const RouterContact& rc) const
{
if (stopping)
return nullptr;
for (auto& link : outboundLinks)
{
// TODO: may want to add some memory of session failures for a given
// router on a given link and not return that link here for a
// duration
if (not link->IsCompatable(rc))
continue;
return link;
}
return nullptr;
}
IOutboundSessionMaker*
LinkManager::GetSessionMaker() const
{
return _sessionMaker;
}
bool
LinkManager::SendTo(
const RouterID& remote,
const llarp_buffer_t& buf,
ILinkSession::CompletionHandler completed,
uint16_t priority)
{
if (stopping)
return false;
auto link = GetLinkWithSessionTo(remote);
if (link == nullptr)
{
if (completed)
2019-07-26 16:19:31 +00:00
{
completed(ILinkSession::DeliveryStatus::eDeliveryDropped);
}
return false;
}
return link->SendTo(remote, buf, completed, priority);
}
bool
LinkManager::HasSessionTo(const RouterID& remote) const
{
return GetLinkWithSessionTo(remote) != nullptr;
}
bool
LinkManager::HasOutboundSessionTo(const RouterID& remote) const
{
for (const auto& link : outboundLinks)
{
if (link->HasSessionTo(remote))
return true;
}
return false;
}
std::optional<bool>
LinkManager::SessionIsClient(RouterID remote) const
{
for (const auto& link : inboundLinks)
{
const auto session = link->FindSessionByPubkey(remote);
if (session)
return not session->IsRelay();
}
if (HasOutboundSessionTo(remote))
return false;
return std::nullopt;
}
void
LinkManager::DeregisterPeer(RouterID remote)
{
m_PersistingSessions.erase(remote);
for (const auto& link : inboundLinks)
{
link->CloseSessionTo(remote);
}
for (const auto& link : outboundLinks)
{
link->CloseSessionTo(remote);
}
LogInfo(remote, " has been de-registered");
}
void
LinkManager::PumpLinks()
{
for (const auto& link : inboundLinks)
{
link->Pump();
}
for (const auto& link : outboundLinks)
{
link->Pump();
}
}
void
LinkManager::AddLink(LinkLayer_ptr link, bool inbound)
{
De-abseil, part 2: mutex, locks, (most) time - util::Mutex is now a std::shared_timed_mutex, which is capable of exclusive and shared locks. - util::Lock is still present as a std::lock_guard<util::Mutex>. - the locking annotations are preserved, but updated to the latest supported by clang rather than using abseil's older/deprecated ones. - ACQUIRE_LOCK macro is gone since we don't pass mutexes by pointer into locks anymore (WTF abseil). - ReleasableLock is gone. Instead there are now some llarp::util helper methods to obtain unique and/or shared locks: - `auto lock = util::unique_lock(mutex);` gets an RAII-but-also unlockable object (std::unique_lock<T>, with T inferred from `mutex`). - `auto lock = util::shared_lock(mutex);` gets an RAII shared (i.e. "reader") lock of the mutex. - `auto lock = util::unique_locks(mutex1, mutex2, mutex3);` can be used to atomically lock multiple mutexes at once (returning a tuple of the locks). This are templated on the mutex which makes them a bit more flexible than using a concrete type: they can be used for any type of lockable mutex, not only util::Mutex. (Some of the code here uses them for getting locks around a std::mutex). Until C++17, using the RAII types is painfully verbose: ```C++ // pre-C++17 - needing to figure out the mutex type here is annoying: std::unique_lock<util::Mutex> lock(mutex); // pre-C++17 and even more verbose (but at least the type isn't needed): std::unique_lock<decltype(mutex)> lock(mutex); // our compromise: auto lock = util::unique_lock(mutex); // C++17: std::unique_lock lock(mutex); ``` All of these functions will also warn (under gcc or clang) if you discard the return value. You can also do fancy things like `auto l = util::unique_lock(mutex, std::adopt_lock)` (which lets a lock take over an already-locked mutex). - metrics code is gone, which also removes a big pile of code that was only used by metrics: - llarp::util::Scheduler - llarp::thread::TimerQueue - llarp::util::Stopwatch
2020-02-21 17:21:11 +00:00
util::Lock l(_mutex);
if (inbound)
{
inboundLinks.emplace(link);
}
else
{
outboundLinks.emplace(link);
}
}
bool
2021-03-02 22:27:35 +00:00
LinkManager::StartLinks()
{
LogInfo("starting ", outboundLinks.size(), " outbound links");
for (const auto& link : outboundLinks)
{
2021-03-02 22:27:35 +00:00
if (!link->Start())
{
LogWarn("outbound link '", link->Name(), "' failed to start");
return false;
}
LogDebug("Outbound Link ", link->Name(), " started");
}
if (inboundLinks.size())
{
LogInfo("starting ", inboundLinks.size(), " inbound links");
for (const auto& link : inboundLinks)
{
2021-03-02 22:27:35 +00:00
if (!link->Start())
{
LogWarn("Link ", link->Name(), " failed to start");
return false;
}
LogDebug("Inbound Link ", link->Name(), " started");
}
}
return true;
}
void
LinkManager::Stop()
{
if (stopping)
{
return;
}
De-abseil, part 2: mutex, locks, (most) time - util::Mutex is now a std::shared_timed_mutex, which is capable of exclusive and shared locks. - util::Lock is still present as a std::lock_guard<util::Mutex>. - the locking annotations are preserved, but updated to the latest supported by clang rather than using abseil's older/deprecated ones. - ACQUIRE_LOCK macro is gone since we don't pass mutexes by pointer into locks anymore (WTF abseil). - ReleasableLock is gone. Instead there are now some llarp::util helper methods to obtain unique and/or shared locks: - `auto lock = util::unique_lock(mutex);` gets an RAII-but-also unlockable object (std::unique_lock<T>, with T inferred from `mutex`). - `auto lock = util::shared_lock(mutex);` gets an RAII shared (i.e. "reader") lock of the mutex. - `auto lock = util::unique_locks(mutex1, mutex2, mutex3);` can be used to atomically lock multiple mutexes at once (returning a tuple of the locks). This are templated on the mutex which makes them a bit more flexible than using a concrete type: they can be used for any type of lockable mutex, not only util::Mutex. (Some of the code here uses them for getting locks around a std::mutex). Until C++17, using the RAII types is painfully verbose: ```C++ // pre-C++17 - needing to figure out the mutex type here is annoying: std::unique_lock<util::Mutex> lock(mutex); // pre-C++17 and even more verbose (but at least the type isn't needed): std::unique_lock<decltype(mutex)> lock(mutex); // our compromise: auto lock = util::unique_lock(mutex); // C++17: std::unique_lock lock(mutex); ``` All of these functions will also warn (under gcc or clang) if you discard the return value. You can also do fancy things like `auto l = util::unique_lock(mutex, std::adopt_lock)` (which lets a lock take over an already-locked mutex). - metrics code is gone, which also removes a big pile of code that was only used by metrics: - llarp::util::Scheduler - llarp::thread::TimerQueue - llarp::util::Stopwatch
2020-02-21 17:21:11 +00:00
util::Lock l(_mutex);
LogInfo("stopping links");
stopping = true;
for (const auto& link : outboundLinks)
link->Stop();
for (const auto& link : inboundLinks)
link->Stop();
}
void
LinkManager::PersistSessionUntil(const RouterID& remote, llarp_time_t until)
{
if (stopping)
return;
De-abseil, part 2: mutex, locks, (most) time - util::Mutex is now a std::shared_timed_mutex, which is capable of exclusive and shared locks. - util::Lock is still present as a std::lock_guard<util::Mutex>. - the locking annotations are preserved, but updated to the latest supported by clang rather than using abseil's older/deprecated ones. - ACQUIRE_LOCK macro is gone since we don't pass mutexes by pointer into locks anymore (WTF abseil). - ReleasableLock is gone. Instead there are now some llarp::util helper methods to obtain unique and/or shared locks: - `auto lock = util::unique_lock(mutex);` gets an RAII-but-also unlockable object (std::unique_lock<T>, with T inferred from `mutex`). - `auto lock = util::shared_lock(mutex);` gets an RAII shared (i.e. "reader") lock of the mutex. - `auto lock = util::unique_locks(mutex1, mutex2, mutex3);` can be used to atomically lock multiple mutexes at once (returning a tuple of the locks). This are templated on the mutex which makes them a bit more flexible than using a concrete type: they can be used for any type of lockable mutex, not only util::Mutex. (Some of the code here uses them for getting locks around a std::mutex). Until C++17, using the RAII types is painfully verbose: ```C++ // pre-C++17 - needing to figure out the mutex type here is annoying: std::unique_lock<util::Mutex> lock(mutex); // pre-C++17 and even more verbose (but at least the type isn't needed): std::unique_lock<decltype(mutex)> lock(mutex); // our compromise: auto lock = util::unique_lock(mutex); // C++17: std::unique_lock lock(mutex); ``` All of these functions will also warn (under gcc or clang) if you discard the return value. You can also do fancy things like `auto l = util::unique_lock(mutex, std::adopt_lock)` (which lets a lock take over an already-locked mutex). - metrics code is gone, which also removes a big pile of code that was only used by metrics: - llarp::util::Scheduler - llarp::thread::TimerQueue - llarp::util::Stopwatch
2020-02-21 17:21:11 +00:00
util::Lock l(_mutex);
2021-06-06 14:51:29 +00:00
m_PersistingSessions[remote] = std::max(until, m_PersistingSessions[remote]);
2021-06-07 12:39:38 +00:00
if (auto maybe = SessionIsClient(remote))
{
if (*maybe)
{
// mark this as a client so we don't try to back connect
m_Clients.Upsert(remote);
}
}
}
void
LinkManager::ForEachPeer(
std::function<void(const ILinkSession*, bool)> visit, bool randomize) const
{
if (stopping)
return;
for (const auto& link : outboundLinks)
{
link->ForEachSession([visit](const ILinkSession* peer) { visit(peer, true); }, randomize);
}
for (const auto& link : inboundLinks)
{
link->ForEachSession([visit](const ILinkSession* peer) { visit(peer, false); }, randomize);
}
}
void
LinkManager::ForEachPeer(std::function<void(ILinkSession*)> visit)
{
if (stopping)
return;
for (const auto& link : outboundLinks)
{
link->ForEachSession([visit](ILinkSession* peer) { visit(peer); });
}
for (const auto& link : inboundLinks)
{
link->ForEachSession([visit](ILinkSession* peer) { visit(peer); });
}
}
void
LinkManager::ForEachInboundLink(std::function<void(LinkLayer_ptr)> visit) const
{
for (const auto& link : inboundLinks)
{
visit(link);
}
}
void
LinkManager::ForEachOutboundLink(std::function<void(LinkLayer_ptr)> visit) const
{
for (const auto& link : outboundLinks)
{
visit(link);
}
}
size_t
LinkManager::NumberOfConnectedRouters() const
{
std::set<RouterID> connectedRouters;
auto fn = [&connectedRouters](const ILinkSession* session, bool) {
if (session->IsEstablished())
{
const RouterContact rc(session->GetRemoteRC());
if (rc.IsPublicRouter())
{
connectedRouters.insert(rc.pubkey);
}
}
};
ForEachPeer(fn);
return connectedRouters.size();
}
size_t
LinkManager::NumberOfConnectedClients() const
{
std::set<RouterID> connectedClients;
auto fn = [&connectedClients](const ILinkSession* session, bool) {
if (session->IsEstablished())
{
const RouterContact rc(session->GetRemoteRC());
if (!rc.IsPublicRouter())
{
connectedClients.insert(rc.pubkey);
}
}
};
ForEachPeer(fn);
return connectedClients.size();
}
2019-12-03 17:03:19 +00:00
size_t
LinkManager::NumberOfPendingConnections() const
{
size_t pending = 0;
for (const auto& link : inboundLinks)
2019-12-03 17:03:19 +00:00
{
pending += link->NumberOfPendingSessions();
}
for (const auto& link : outboundLinks)
2019-12-03 17:03:19 +00:00
{
pending += link->NumberOfPendingSessions();
}
return pending;
}
bool
LinkManager::GetRandomConnectedRouter(RouterContact& router) const
{
std::unordered_map<RouterID, RouterContact> connectedRouters;
ForEachPeer(
[&connectedRouters](const ILinkSession* peer, bool unused) {
(void)unused;
connectedRouters[peer->GetPubKey()] = peer->GetRemoteRC();
},
false);
const auto sz = connectedRouters.size();
if (sz)
{
auto itr = connectedRouters.begin();
if (sz > 1)
{
std::advance(itr, randint() % sz);
}
router = itr->second;
return true;
}
return false;
}
void
LinkManager::CheckPersistingSessions(llarp_time_t now)
{
if (stopping)
return;
std::vector<RouterID> sessionsNeeded;
2021-06-07 12:39:38 +00:00
std::vector<RouterID> sessionsClosed;
{
De-abseil, part 2: mutex, locks, (most) time - util::Mutex is now a std::shared_timed_mutex, which is capable of exclusive and shared locks. - util::Lock is still present as a std::lock_guard<util::Mutex>. - the locking annotations are preserved, but updated to the latest supported by clang rather than using abseil's older/deprecated ones. - ACQUIRE_LOCK macro is gone since we don't pass mutexes by pointer into locks anymore (WTF abseil). - ReleasableLock is gone. Instead there are now some llarp::util helper methods to obtain unique and/or shared locks: - `auto lock = util::unique_lock(mutex);` gets an RAII-but-also unlockable object (std::unique_lock<T>, with T inferred from `mutex`). - `auto lock = util::shared_lock(mutex);` gets an RAII shared (i.e. "reader") lock of the mutex. - `auto lock = util::unique_locks(mutex1, mutex2, mutex3);` can be used to atomically lock multiple mutexes at once (returning a tuple of the locks). This are templated on the mutex which makes them a bit more flexible than using a concrete type: they can be used for any type of lockable mutex, not only util::Mutex. (Some of the code here uses them for getting locks around a std::mutex). Until C++17, using the RAII types is painfully verbose: ```C++ // pre-C++17 - needing to figure out the mutex type here is annoying: std::unique_lock<util::Mutex> lock(mutex); // pre-C++17 and even more verbose (but at least the type isn't needed): std::unique_lock<decltype(mutex)> lock(mutex); // our compromise: auto lock = util::unique_lock(mutex); // C++17: std::unique_lock lock(mutex); ``` All of these functions will also warn (under gcc or clang) if you discard the return value. You can also do fancy things like `auto l = util::unique_lock(mutex, std::adopt_lock)` (which lets a lock take over an already-locked mutex). - metrics code is gone, which also removes a big pile of code that was only used by metrics: - llarp::util::Scheduler - llarp::thread::TimerQueue - llarp::util::Stopwatch
2020-02-21 17:21:11 +00:00
util::Lock l(_mutex);
2021-06-07 12:39:38 +00:00
for (auto [remote, until] : m_PersistingSessions)
{
2021-06-07 12:39:38 +00:00
if (now < until)
{
2021-06-07 12:39:38 +00:00
auto link = GetLinkWithSessionTo(remote);
if (link)
{
2021-06-07 12:39:38 +00:00
link->KeepAliveSessionTo(remote);
}
2021-06-07 12:39:38 +00:00
else if (not m_Clients.Contains(remote))
{
2021-06-07 12:39:38 +00:00
sessionsNeeded.push_back(remote);
}
}
2021-06-07 12:39:38 +00:00
else if (not m_Clients.Contains(remote))
{
2021-06-07 12:39:38 +00:00
sessionsClosed.push_back(remote);
}
}
}
for (const auto& router : sessionsNeeded)
{
2021-06-10 00:15:06 +00:00
LogDebug("ensuring session to ", router, " for previously made commitment");
_sessionMaker->CreateSessionTo(router, nullptr);
}
2021-06-07 12:39:38 +00:00
2021-06-08 13:01:01 +00:00
for (const auto& router : sessionsClosed)
{
m_PersistingSessions.erase(router);
ForEachOutboundLink([router](auto link) { link->CloseSessionTo(router); });
}
}
2020-06-04 16:00:30 +00:00
void
LinkManager::updatePeerDb(std::shared_ptr<PeerDb> peerDb)
{
std::vector<std::pair<RouterID, SessionStats>> statsToUpdate;
int64_t diffTotalTX = 0;
ForEachPeer([&](ILinkSession* session) {
// derive RouterID
RouterID id = RouterID(session->GetRemoteRC().pubkey);
SessionStats sessionStats = session->GetSessionStats();
SessionStats diff;
SessionStats& lastStats = m_lastRouterStats[id];
// TODO: operator overloads / member func for diff
diff.currentRateRX = std::max(sessionStats.currentRateRX, lastStats.currentRateRX);
diff.currentRateTX = std::max(sessionStats.currentRateTX, lastStats.currentRateTX);
diff.totalPacketsRX = sessionStats.totalPacketsRX - lastStats.totalPacketsRX;
diff.totalAckedTX = sessionStats.totalAckedTX - lastStats.totalAckedTX;
diff.totalDroppedTX = sessionStats.totalDroppedTX - lastStats.totalDroppedTX;
diffTotalTX = diff.totalAckedTX + diff.totalDroppedTX + diff.totalInFlightTX;
lastStats = sessionStats;
// TODO: if we have both inbound and outbound session, this will overwrite
statsToUpdate.push_back({id, diff});
});
for (auto& routerStats : statsToUpdate)
{
peerDb->modifyPeerStats(routerStats.first, [&](PeerStats& stats) {
// TODO: store separate stats for up vs down
const auto& diff = routerStats.second;
// note that 'currentRateRX' and 'currentRateTX' are per-second
stats.peakBandwidthBytesPerSec = std::max(
stats.peakBandwidthBytesPerSec,
(double)std::max(diff.currentRateRX, diff.currentRateTX));
stats.numPacketsDropped += diff.totalDroppedTX;
stats.numPacketsSent = diff.totalAckedTX;
stats.numPacketsAttempted = diffTotalTX;
// TODO: others -- we have slight mismatch on what we store
});
}
}
util::StatusObject
LinkManager::ExtractStatus() const
{
std::vector<util::StatusObject> ob_links, ib_links;
std::transform(
inboundLinks.begin(),
inboundLinks.end(),
std::back_inserter(ib_links),
[](const auto& link) -> util::StatusObject { return link->ExtractStatus(); });
std::transform(
outboundLinks.begin(),
outboundLinks.end(),
std::back_inserter(ob_links),
[](const auto& link) -> util::StatusObject { return link->ExtractStatus(); });
util::StatusObject obj{{"outbound", ob_links}, {"inbound", ib_links}};
return obj;
}
void
LinkManager::Init(IOutboundSessionMaker* sessionMaker)
{
stopping = false;
_sessionMaker = sessionMaker;
}
LinkLayer_ptr
LinkManager::GetLinkWithSessionTo(const RouterID& remote) const
{
if (stopping)
return nullptr;
for (const auto& link : outboundLinks)
{
if (link->HasSessionTo(remote))
{
return link;
}
}
for (const auto& link : inboundLinks)
{
if (link->HasSessionTo(remote))
{
return link;
}
}
return nullptr;
}
} // namespace llarp