lokinet/llarp/router/outbound_session_maker.cpp
Jason Rhinelander b4440094b0 De-abseil, part 2: mutex, locks, (most) time
- util::Mutex is now a std::shared_timed_mutex, which is capable of
  exclusive and shared locks.

- util::Lock is still present as a std::lock_guard<util::Mutex>.

- the locking annotations are preserved, but updated to the latest
  supported by clang rather than using abseil's older/deprecated ones.

- ACQUIRE_LOCK macro is gone since we don't pass mutexes by pointer into
  locks anymore (WTF abseil).

- ReleasableLock is gone.  Instead there are now some llarp::util helper
  methods to obtain unique and/or shared locks:
    - `auto lock = util::unique_lock(mutex);` gets an RAII-but-also
      unlockable object (std::unique_lock<T>, with T inferred from
      `mutex`).
    - `auto lock = util::shared_lock(mutex);` gets an RAII shared (i.e.
      "reader") lock of the mutex.
    - `auto lock = util::unique_locks(mutex1, mutex2, mutex3);` can be
      used to atomically lock multiple mutexes at once (returning a
      tuple of the locks).
  This are templated on the mutex which makes them a bit more flexible
  than using a concrete type: they can be used for any type of lockable
  mutex, not only util::Mutex.  (Some of the code here uses them for
  getting locks around a std::mutex).  Until C++17, using the RAII types
  is painfully verbose:

  ```C++
  // pre-C++17 - needing to figure out the mutex type here is annoying:
  std::unique_lock<util::Mutex> lock(mutex);
  // pre-C++17 and even more verbose (but at least the type isn't needed):
  std::unique_lock<decltype(mutex)> lock(mutex);
  // our compromise:
  auto lock = util::unique_lock(mutex);
  // C++17:
  std::unique_lock lock(mutex);
  ```

  All of these functions will also warn (under gcc or clang) if you
  discard the return value.  You can also do fancy things like
  `auto l = util::unique_lock(mutex, std::adopt_lock)` (which lets a
  lock take over an already-locked mutex).

- metrics code is gone, which also removes a big pile of code that was
  only used by metrics:
  - llarp::util::Scheduler
  - llarp:🧵:TimerQueue
  - llarp::util::Stopwatch
2020-02-21 23:22:47 -04:00

347 lines
8.3 KiB
C++

#include <router/outbound_session_maker.hpp>
#include <link/server.hpp>
#include <router_contact.hpp>
#include <nodedb.hpp>
#include <router/i_rc_lookup_handler.hpp>
#include <link/i_link_manager.hpp>
#include <util/meta/memfn.hpp>
#include <util/thread/logic.hpp>
#include <util/thread/threading.hpp>
#include <util/status.hpp>
#include <crypto/crypto.hpp>
#include <utility>
namespace llarp
{
struct PendingSession
{
// TODO: add session establish status metadata, e.g. num retries
const RouterContact rc;
LinkLayer_ptr link;
size_t attemptCount = 0;
PendingSession(RouterContact _rc, LinkLayer_ptr _link)
: rc(std::move(_rc)), link(std::move(_link))
{
}
};
bool
OutboundSessionMaker::OnSessionEstablished(ILinkSession *session)
{
// TODO: do we want to keep it
const auto router = RouterID(session->GetPubKey());
const std::string remoteType =
session->GetRemoteRC().IsPublicRouter() ? "router" : "client";
LogInfo("session with ", remoteType, " [", router, "] established");
if(not _rcLookup->RemoteIsAllowed(router))
{
FinalizeRequest(router, SessionResult::InvalidRouter);
return false;
}
auto func = std::bind(&OutboundSessionMaker::VerifyRC, this,
session->GetRemoteRC());
_threadpool->addJob(func);
return true;
}
void
OutboundSessionMaker::OnConnectTimeout(ILinkSession *session)
{
// TODO: retry/num attempts
LogWarn("Session establish attempt to ", RouterID(session->GetPubKey()),
" timed out.", session->GetRemoteEndpoint());
FinalizeRequest(session->GetPubKey(), SessionResult::Timeout);
}
void
OutboundSessionMaker::CreateSessionTo(const RouterID &router,
RouterCallback on_result)
{
if(on_result)
{
util::Lock l(_mutex);
auto itr_pair = pendingCallbacks.emplace(router, CallbacksQueue{});
itr_pair.first->second.push_back(on_result);
}
if(HavePendingSessionTo(router))
{
return;
}
CreatePendingSession(router);
LogDebug("Creating session establish attempt to ", router, " .");
auto fn = util::memFn(&OutboundSessionMaker::OnRouterContactResult, this);
_rcLookup->GetRC(router, fn);
}
void
OutboundSessionMaker::CreateSessionTo(const RouterContact &rc,
RouterCallback on_result)
{
if(on_result)
{
util::Lock l(_mutex);
auto itr_pair = pendingCallbacks.emplace(rc.pubkey, CallbacksQueue{});
itr_pair.first->second.push_back(on_result);
}
if(not HavePendingSessionTo(rc.pubkey))
{
LogDebug("Creating session establish attempt to ", rc.pubkey, " .");
CreatePendingSession(rc.pubkey);
}
GotRouterContact(rc.pubkey, rc);
}
bool
OutboundSessionMaker::HavePendingSessionTo(const RouterID &router) const
{
util::Lock l(_mutex);
return pendingSessions.find(router) != pendingSessions.end();
}
void
OutboundSessionMaker::ConnectToRandomRouters(int numDesired)
{
int remainingDesired = numDesired;
std::set< RouterID > exclude;
do
{
RouterContact other;
if(not _nodedb->select_random_hop_excluding(other, exclude))
break;
exclude.insert(other.pubkey);
if(not _rcLookup->RemoteIsAllowed(other.pubkey))
{
continue;
}
if(not(_linkManager->HasSessionTo(other.pubkey)
|| HavePendingSessionTo(other.pubkey)))
{
CreateSessionTo(other, nullptr);
--remainingDesired;
}
} while(remainingDesired > 0);
LogDebug("connecting to ", numDesired - remainingDesired, " out of ",
numDesired, " random routers");
}
// TODO: this
util::StatusObject
OutboundSessionMaker::ExtractStatus() const
{
util::StatusObject status{};
return status;
}
void
OutboundSessionMaker::Init(
ILinkManager *linkManager, I_RCLookupHandler *rcLookup,
Profiling *profiler, std::shared_ptr< Logic > logic, llarp_nodedb *nodedb,
std::shared_ptr< llarp::thread::ThreadPool > threadpool)
{
_linkManager = linkManager;
_rcLookup = rcLookup;
_logic = logic;
_nodedb = nodedb;
_threadpool = threadpool;
_profiler = profiler;
}
void
OutboundSessionMaker::DoEstablish(const RouterID &router)
{
auto l = util::unique_lock(_mutex);
auto itr = pendingSessions.find(router);
if(itr == pendingSessions.end())
{
return;
}
const auto &job = itr->second;
if(!job->link->TryEstablishTo(job->rc))
{
// TODO: maybe different failure type?
l.unlock();
FinalizeRequest(router, SessionResult::NoLink);
}
}
void
OutboundSessionMaker::GotRouterContact(const RouterID &router,
const RouterContact &rc)
{
{
auto l = util::unique_lock(_mutex);
// in case other request found RC for this router after this request was
// made
auto itr = pendingSessions.find(router);
if(itr == pendingSessions.end())
{
return;
}
LinkLayer_ptr link = _linkManager->GetCompatibleLink(rc);
if(!link)
{
l.unlock();
FinalizeRequest(router, SessionResult::NoLink);
return;
}
auto session = std::make_shared< PendingSession >(rc, link);
itr->second = session;
}
if(ShouldConnectTo(router))
{
auto fn = std::bind(&OutboundSessionMaker::DoEstablish, this, router);
LogicCall(_logic, fn);
}
}
bool
OutboundSessionMaker::ShouldConnectTo(const RouterID &router) const
{
if(router == us)
return false;
size_t numPending = 0;
{
util::Lock lock(_mutex);
if(pendingSessions.find(router) == pendingSessions.end())
numPending += pendingSessions.size();
}
if(_linkManager->HasSessionTo(router))
return false;
return _linkManager->NumberOfConnectedRouters() + numPending
< maxConnectedRouters;
}
void
OutboundSessionMaker::InvalidRouter(const RouterID &router)
{
FinalizeRequest(router, SessionResult::InvalidRouter);
}
void
OutboundSessionMaker::RouterNotFound(const RouterID &router)
{
FinalizeRequest(router, SessionResult::RouterNotFound);
}
void
OutboundSessionMaker::OnRouterContactResult(const RouterID &router,
const RouterContact *const rc,
const RCRequestResult result)
{
if(not HavePendingSessionTo(router))
{
return;
}
switch(result)
{
case RCRequestResult::Success:
if(rc)
{
GotRouterContact(router, *rc);
}
else
{
LogError("RCRequestResult::Success but null rc pointer given");
}
break;
case RCRequestResult::InvalidRouter:
InvalidRouter(router);
break;
case RCRequestResult::RouterNotFound:
RouterNotFound(router);
break;
default:
break;
}
}
void
OutboundSessionMaker::VerifyRC(const RouterContact rc)
{
if(not _rcLookup->CheckRC(rc))
{
FinalizeRequest(rc.pubkey, SessionResult::InvalidRouter);
return;
}
FinalizeRequest(rc.pubkey, SessionResult::Establish);
}
void
OutboundSessionMaker::CreatePendingSession(const RouterID &router)
{
util::Lock l(_mutex);
pendingSessions.emplace(router, nullptr);
}
void
OutboundSessionMaker::FinalizeRequest(const RouterID &router,
const SessionResult type)
{
CallbacksQueue movedCallbacks;
{
util::Lock l(_mutex);
if(type == SessionResult::Establish)
{
_profiler->MarkConnectSuccess(router);
}
else
{
// TODO: add non timeout related fail case
_profiler->MarkConnectTimeout(router);
}
auto itr = pendingCallbacks.find(router);
if(itr != pendingCallbacks.end())
{
movedCallbacks.splice(movedCallbacks.begin(), itr->second);
pendingCallbacks.erase(itr);
}
}
for(const auto &callback : movedCallbacks)
{
auto func = std::bind(callback, router, type);
LogicCall(_logic, func);
}
{
util::Lock l(_mutex);
pendingSessions.erase(router);
}
}
} // namespace llarp