lokinet/llarp/link/server.cpp

485 lines
12 KiB
C++
Raw Normal View History

2018-12-12 01:32:10 +00:00
#include <link/server.hpp>
2019-10-02 13:06:14 +00:00
#include <ev/ev.hpp>
#include <crypto/crypto.hpp>
#include <util/fs.hpp>
2019-08-29 11:45:58 +00:00
#include <utility>
namespace llarp
{
2019-05-08 12:17:48 +00:00
/// Session-count limit used by MapAddr and TryEstablishTo in this file.
static constexpr size_t MaxSessionsPerKey = 16;
/// Constructs the link layer.
///
/// Each callback argument is taken by value and moved into the member of the
/// matching role; the router encryption secret is copied into
/// m_RouterEncSecret.
ILinkLayer::ILinkLayer(const SecretKey& routerEncSecret, GetRCFunc getrc,
                       LinkMessageHandler handler, SignBufferFunc signbuf,
                       SessionEstablishedHandler establishedSession,
                       SessionRenegotiateHandler reneg, TimeoutHandler timeout,
                       SessionClosedHandler closed, PumpDoneHandler pumpDone)
    : HandleMessage(std::move(handler)),
      HandleTimeout(std::move(timeout)),
      Sign(std::move(signbuf)),
      GetOurRC(std::move(getrc)),
      SessionEstablished(std::move(establishedSession)),
      SessionClosed(std::move(closed)),
      SessionRenegotiate(std::move(reneg)),
      PumpDone(std::move(pumpDone)),
      m_RouterEncSecret(routerEncSecret)
{
  // nothing else to do: all state is set up in the initializer list
}
2019-08-29 11:45:58 +00:00
/// Defaulted destructor, defined out of line in this translation unit.
ILinkLayer::~ILinkLayer() = default;
/// Reports whether m_AuthedLinks holds at least one entry keyed by `id`.
/// The lookup is done while holding m_AuthedLinksMutex.
bool
ILinkLayer::HasSessionTo(const RouterID& id)
{
  ACQUIRE_LOCK(Lock_t lock, m_AuthedLinksMutex);
  const auto found = m_AuthedLinks.find(id);
  return found != m_AuthedLinks.end();
}
void
2019-04-08 12:01:52 +00:00
ILinkLayer::ForEachSession(std::function< void(const ILinkSession*) > visit,
bool randomize) const
{
2019-07-03 12:42:11 +00:00
std::vector< std::shared_ptr< ILinkSession > > sessions;
{
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
2019-07-03 12:42:11 +00:00
if(m_AuthedLinks.size() == 0)
return;
const size_t sz = randint() % m_AuthedLinks.size();
auto itr = m_AuthedLinks.begin();
auto begin = itr;
if(randomize)
{
2019-07-03 12:42:11 +00:00
std::advance(itr, sz);
begin = itr;
}
while(itr != m_AuthedLinks.end())
{
sessions.emplace_back(itr->second);
++itr;
}
2019-07-03 12:42:11 +00:00
if(randomize)
{
itr = m_AuthedLinks.begin();
while(itr != begin)
{
sessions.emplace_back(itr->second);
++itr;
}
}
}
2019-07-03 12:42:11 +00:00
for(const auto& session : sessions)
visit(session.get());
}
bool
ILinkLayer::VisitSessionByPubkey(const RouterID& pk,
std::function< bool(ILinkSession*) > visit)
{
2019-07-03 12:42:11 +00:00
std::shared_ptr< ILinkSession > session;
{
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
2019-07-03 12:42:11 +00:00
auto itr = m_AuthedLinks.find(pk);
if(itr == m_AuthedLinks.end())
return false;
session = itr->second;
}
2019-07-03 12:42:11 +00:00
return visit(session.get());
}
/// Calls `visit` with a mutable pointer to every authenticated session.
///
/// A snapshot of the table's shared_ptrs is taken under m_AuthedLinksMutex;
/// the callbacks run afterwards without the lock held.
void
ILinkLayer::ForEachSession(std::function< void(ILinkSession*) > visit)
{
  std::vector< std::shared_ptr< ILinkSession > > snapshot;
  {
    ACQUIRE_LOCK(Lock_t lock, m_AuthedLinksMutex);
    for(const auto& entry : m_AuthedLinks)
      snapshot.emplace_back(entry.second);
  }
  for(const auto& held : snapshot)
    visit(held.get());
}
bool
2019-04-08 12:01:52 +00:00
ILinkLayer::Configure(llarp_ev_loop_ptr loop, const std::string& ifname,
int af, uint16_t port)
{
2019-05-21 15:24:20 +00:00
m_Loop = loop;
m_udp.user = this;
2019-10-02 13:06:14 +00:00
m_udp.recvfrom = nullptr;
m_udp.tick = &ILinkLayer::udp_tick;
if(ifname == "*")
{
if(!AllInterfaces(af, m_ourAddr))
return false;
}
else if(!GetIFAddr(ifname, m_ourAddr, af))
m_ourAddr = Addr(ifname);
2018-09-04 19:15:06 +00:00
m_ourAddr.port(port);
2019-04-08 12:01:52 +00:00
return llarp_ev_add_udp(m_Loop.get(), &m_udp, m_ourAddr) != -1;
}
void
ILinkLayer::Pump()
{
auto _now = Now();
{
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
2018-09-07 20:36:06 +00:00
auto itr = m_AuthedLinks.begin();
while(itr != m_AuthedLinks.end())
2018-09-07 17:41:49 +00:00
{
2019-08-29 11:45:58 +00:00
if(not itr->second->TimedOut(_now))
2018-09-07 20:36:06 +00:00
{
itr->second->Pump();
++itr;
}
else
2018-11-19 21:55:41 +00:00
{
2018-12-11 13:33:23 +00:00
llarp::LogInfo("session to ", RouterID(itr->second->GetPubKey()),
" timed out");
2019-05-25 14:54:30 +00:00
itr->second->Close();
2018-09-07 20:36:06 +00:00
itr = m_AuthedLinks.erase(itr);
2018-11-19 21:55:41 +00:00
}
2018-09-07 17:41:49 +00:00
}
2018-09-07 20:36:06 +00:00
}
{
ACQUIRE_LOCK(Lock_t l, m_PendingMutex);
2018-09-07 20:36:06 +00:00
auto itr = m_Pending.begin();
while(itr != m_Pending.end())
{
2019-08-29 11:45:58 +00:00
if(not itr->second->TimedOut(_now))
2018-09-07 20:36:06 +00:00
{
2019-01-07 12:47:57 +00:00
itr->second->Pump();
2018-09-07 20:36:06 +00:00
++itr;
}
else
2019-03-11 13:01:43 +00:00
{
LogInfo("pending session at ", itr->first, " timed out");
2019-08-29 11:45:58 +00:00
// defer call so we can acquire mutexes later
auto self = itr->second->BorrowSelf();
LogicCall(m_Logic, [&, self]() {
2019-08-29 11:45:58 +00:00
this->HandleTimeout(self.get());
self->Close();
});
2018-09-07 20:36:06 +00:00
itr = m_Pending.erase(itr);
2019-03-11 13:01:43 +00:00
}
2018-09-06 13:16:24 +00:00
}
}
}
2019-01-07 12:47:57 +00:00
bool
ILinkLayer::MapAddr(const RouterID& pk, ILinkSession* s)
2018-09-07 17:41:49 +00:00
{
ACQUIRE_LOCK(Lock_t l_authed, m_AuthedLinksMutex);
ACQUIRE_LOCK(Lock_t l_pending, m_PendingMutex);
2019-01-10 12:30:21 +00:00
llarp::Addr addr = s->GetRemoteEndpoint();
auto itr = m_Pending.find(addr);
2019-01-04 12:43:53 +00:00
if(itr != m_Pending.end())
2018-09-07 17:41:49 +00:00
{
2019-01-10 12:30:21 +00:00
if(m_AuthedLinks.count(pk) > MaxSessionsPerKey)
{
2019-08-29 11:45:58 +00:00
LogWarn("too many session for ", pk);
2019-04-02 09:03:53 +00:00
s->Close();
2019-01-10 12:30:21 +00:00
return false;
}
2019-04-02 09:03:53 +00:00
m_AuthedLinks.emplace(pk, itr->second);
2019-01-04 12:43:53 +00:00
itr = m_Pending.erase(itr);
2019-01-10 12:30:21 +00:00
return true;
2019-01-07 12:47:57 +00:00
}
2019-01-10 12:30:21 +00:00
return false;
2018-09-07 17:41:49 +00:00
}
/// Selects the first address in `rc.addrs` whose dialect equals Name().
///
/// @param rc     router contact whose address list is scanned in order
/// @param picked receives a copy of the matching address
/// @return true if a matching address was found, false otherwise (in which
///         case `picked` is left untouched)
bool
ILinkLayer::PickAddress(const RouterContact& rc,
                        llarp::AddressInfo& picked) const
{
  const std::string wanted = Name();
  for(const auto& candidate : rc.addrs)
  {
    if(!(candidate.dialect == wanted))
      continue;
    picked = candidate;
    return true;
  }
  return false;
}
/// Builds a status object describing this link layer.
///
/// The result carries "name", "rank", "addr" and a nested "sessions" object
/// with the per-session status of the "pending" and "established" tables.
/// Each table is read under its own mutex.
util::StatusObject
ILinkLayer::ExtractStatus() const
{
  std::vector< util::StatusObject > pending, established;
  {
    ACQUIRE_LOCK(Lock_t lock, m_PendingMutex);
    for(const auto& item : m_Pending)
    {
      util::StatusObject status = item.second->ExtractStatus();
      pending.push_back(status);
    }
  }
  {
    ACQUIRE_LOCK(Lock_t lock, m_AuthedLinksMutex);
    for(const auto& item : m_AuthedLinks)
    {
      util::StatusObject status = item.second->ExtractStatus();
      established.push_back(status);
    }
  }

  return {{"name", Name()},
          {"rank", uint64_t(Rank())},
          {"addr", m_ourAddr.ToString()},
          {"sessions",
           util::StatusObject{{"pending", pending},
                              {"established", established}}}};
}
bool
2018-12-19 17:48:29 +00:00
ILinkLayer::TryEstablishTo(RouterContact rc)
{
2019-05-08 12:17:48 +00:00
{
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
2019-05-08 12:17:48 +00:00
if(m_AuthedLinks.count(rc.pubkey) >= MaxSessionsPerKey)
return false;
}
llarp::AddressInfo to;
if(!PickAddress(rc, to))
return false;
2019-05-14 17:35:01 +00:00
const llarp::Addr addr(to);
2019-05-08 12:17:48 +00:00
{
ACQUIRE_LOCK(Lock_t l, m_PendingMutex);
2019-05-08 12:17:48 +00:00
if(m_Pending.count(addr) >= MaxSessionsPerKey)
return false;
}
2019-04-02 09:03:53 +00:00
std::shared_ptr< ILinkSession > s = NewOutboundSession(rc, to);
2019-01-07 12:47:57 +00:00
if(PutSession(s))
{
s->Start();
return true;
}
return false;
}
bool
2019-09-05 17:39:09 +00:00
ILinkLayer::Start(std::shared_ptr< Logic > l,
std::shared_ptr< thread::ThreadPool > worker)
{
m_Recv = std::make_shared< TrafficQueue_t >();
2019-09-05 17:39:09 +00:00
m_Worker = worker;
m_Logic = l;
2018-09-07 17:41:49 +00:00
ScheduleTick(100);
return true;
}
/// Forwards a tick at time `now` to every authenticated session and then to
/// every pending session, each table under its own mutex.
void
ILinkLayer::Tick(llarp_time_t now)
{
  {
    ACQUIRE_LOCK(Lock_t lock, m_AuthedLinksMutex);
    for(auto& entry : m_AuthedLinks)
      entry.second->Tick(now);
  }
  {
    ACQUIRE_LOCK(Lock_t lock, m_PendingMutex);
    for(auto& entry : m_Pending)
      entry.second->Tick(now);
  }
}
/// Shuts the link layer down: cancels the scheduled tick (when a logic handle
/// and a tick id are present), closes every authenticated and pending
/// session, and releases the traffic queue. Sessions are closed but not
/// removed from their tables here.
void
ILinkLayer::Stop()
{
  const bool haveTick = m_Logic && tick_id;
  if(haveTick)
    m_Logic->remove_call(tick_id);

  {
    ACQUIRE_LOCK(Lock_t lock, m_AuthedLinksMutex);
    for(auto& entry : m_AuthedLinks)
      entry.second->Close();
  }
  {
    ACQUIRE_LOCK(Lock_t lock, m_PendingMutex);
    for(auto& entry : m_Pending)
      entry.second->Close();
  }
  m_Recv.reset();
}
void
ILinkLayer::CloseSessionTo(const RouterID& remote)
{
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
RouterID r = remote;
llarp::LogInfo("Closing all to ", r);
auto range = m_AuthedLinks.equal_range(r);
2018-09-07 20:36:06 +00:00
auto itr = range.first;
while(itr != range.second)
{
2019-04-02 09:03:53 +00:00
itr->second->Close();
2018-09-07 20:36:06 +00:00
itr = m_AuthedLinks.erase(itr);
}
}
void
ILinkLayer::KeepAliveSessionTo(const RouterID& remote)
{
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
2018-09-07 20:36:06 +00:00
auto range = m_AuthedLinks.equal_range(remote);
auto itr = range.first;
while(itr != range.second)
{
2019-03-18 12:25:32 +00:00
if(itr->second->ShouldPing())
itr->second->SendKeepAlive();
2018-09-07 20:36:06 +00:00
++itr;
}
}
bool
2019-07-26 16:19:31 +00:00
ILinkLayer::SendTo(const RouterID& remote, const llarp_buffer_t& buf,
ILinkSession::CompletionHandler completed)
{
2019-10-02 13:06:14 +00:00
std::shared_ptr< ILinkSession > s;
2018-09-07 20:36:06 +00:00
{
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
auto range = m_AuthedLinks.equal_range(remote);
auto itr = range.first;
// pick lowest backlog session
size_t min = std::numeric_limits< size_t >::max();
while(itr != range.second)
{
2019-10-02 13:06:14 +00:00
const auto backlog = itr->second->SendQueueBacklog();
if(backlog < min)
{
2019-10-02 13:06:14 +00:00
s = itr->second;
min = backlog;
}
++itr;
}
2018-09-07 20:36:06 +00:00
}
ILinkSession::Message_t pkt(buf.sz);
std::copy_n(buf.base, buf.sz, pkt.begin());
return s && s->SendMessageBuffer(std::move(pkt), completed);
}
/// Fills `addr` with this link layer's dialect name, transport public key,
/// rank, and the port and IPv6 form of m_ourAddr.
///
/// @return always true
bool
ILinkLayer::GetOurAddressInfo(llarp::AddressInfo& addr) const
{
  // network location
  addr.ip   = *m_ourAddr.addr6();
  addr.port = m_ourAddr.port();
  // link identity
  addr.rank    = Rank();
  addr.pubkey  = TransportPubKey();
  addr.dialect = Name();
  return true;
}
/// Returns the result of llarp::seckey_topublic() applied to the transport
/// secret key.
const byte_t*
ILinkLayer::TransportPubKey() const
{
  const SecretKey& secret = TransportSecretKey();
  return llarp::seckey_topublic(secret);
}
/// Accessor for the transport secret key held in m_SecretKey.
const SecretKey&
ILinkLayer::TransportSecretKey() const
{
  return this->m_SecretKey;
}
/// Regenerates m_SecretKey in place via KeyGen().
///
/// @return the result of KeyGen()
bool
ILinkLayer::GenEphemeralKeys()
{
  const bool generated = KeyGen(m_SecretKey);
  return generated;
}
bool
ILinkLayer::EnsureKeys(const char* f)
{
fs::path fpath(f);
llarp::SecretKey keys;
std::error_code ec;
if(!fs::exists(fpath, ec))
{
if(!KeyGen(m_SecretKey))
return false;
// generated new keys
if(!BEncodeWriteFile< decltype(keys), 128 >(f, m_SecretKey))
return false;
}
// load keys
if(!BDecodeReadFile(f, m_SecretKey))
2018-09-04 19:15:06 +00:00
{
llarp::LogError("Failed to load keyfile ", f);
return false;
2018-09-04 19:15:06 +00:00
}
return true;
}
2019-01-07 12:47:57 +00:00
bool
2019-04-02 09:03:53 +00:00
ILinkLayer::PutSession(const std::shared_ptr< ILinkSession >& s)
2018-09-07 17:41:49 +00:00
{
2019-03-26 13:51:57 +00:00
static constexpr size_t MaxSessionsPerEndpoint = 5;
ACQUIRE_LOCK(Lock_t lock, m_PendingMutex);
2019-01-09 14:21:55 +00:00
llarp::Addr addr = s->GetRemoteEndpoint();
2019-03-26 13:51:57 +00:00
if(m_Pending.count(addr) >= MaxSessionsPerEndpoint)
2019-01-07 12:47:57 +00:00
return false;
2019-04-02 09:03:53 +00:00
m_Pending.emplace(addr, s);
2019-01-07 12:47:57 +00:00
return true;
2018-09-07 17:41:49 +00:00
}
void
2018-10-29 16:48:36 +00:00
ILinkLayer::OnTick(uint64_t interval)
{
2018-12-27 19:10:38 +00:00
auto now = Now();
Tick(now);
ScheduleTick(interval);
}
void
ILinkLayer::ScheduleTick(uint64_t interval)
{
2018-12-10 14:14:55 +00:00
tick_id = m_Logic->call_later({interval, this, &ILinkLayer::on_timer_tick});
}
/// UDP tick callback installed by Configure().
///
/// Recovers the owning ILinkLayer from `udp->user`, reads a batch of packets
/// with llarp_ev_udp_recvmany(), and - if the link has a logic handle - queues
/// a job that feeds each packet to RecvFrom() and then calls Pump(). When
/// there is no logic handle the batch that was read is discarded.
void
ILinkLayer::udp_tick(llarp_udp_io* udp)
{
  auto* const self = static_cast< ILinkLayer* >(udp->user);
  auto batch       = std::make_shared< llarp_pkt_list >();
  llarp_ev_udp_recvmany(&self->m_udp, batch.get());

  auto logic = self->logic();
  if(logic == nullptr)
    return;

  LogicCall(logic, [batch, self]() {
    for(auto& entry : *batch)
      self->RecvFrom(entry.remote, std::move(entry.pkt));
    self->Pump();
  });
}
} // namespace llarp