Carving through llarp/link

This commit is contained in:
dr7ana 2023-09-14 07:54:51 -07:00
parent d1b7eee7c8
commit fd527d612f
16 changed files with 502 additions and 399 deletions

View File

@ -208,6 +208,8 @@ add_library(lokinet-layer-wire
# a series of layer 1 symbols which are then transmitted between lokinet instances
add_library(lokinet-layer-link
STATIC
link/connection.cpp
link/endpoint.cpp
link/link_manager.cpp
link/session.cpp
link/server.cpp

View File

@ -12,267 +12,264 @@
#include <llarp/util/priority_queue.hpp>
#include <llarp/util/thread/queue.hpp>
namespace llarp
namespace llarp::iwp
{
namespace iwp
/// packet crypto overhead size
static constexpr size_t PacketOverhead = HMACSIZE + TUNNONCESIZE;
/// creates a packet with plaintext size + wire overhead + random pad
AbstractLinkSession::Packet_t
CreatePacket(Command cmd, size_t plainsize, size_t min_pad = 16, size_t pad_variance = 16);
/// Time how long we try delivery for
static constexpr std::chrono::milliseconds DeliveryTimeout = 500ms;
/// Time how long we wait to recieve a message
static constexpr auto ReceivalTimeout = (DeliveryTimeout * 8) / 5;
/// How long to keep a replay window for
static constexpr auto ReplayWindow = (ReceivalTimeout * 3) / 2;
/// How often to acks RX messages
static constexpr auto ACKResendInterval = DeliveryTimeout / 2;
/// How often to retransmit TX fragments
static constexpr auto TXFlushInterval = (DeliveryTimeout / 5) * 4;
/// How often we send a keepalive
static constexpr std::chrono::milliseconds PingInterval = 5s;
/// How long we wait for a session to die with no tx from them
static constexpr auto SessionAliveTimeout = PingInterval * 5;
struct Session : public AbstractLinkSession, public std::enable_shared_from_this<Session>
{
/// packet crypto overhead size
static constexpr size_t PacketOverhead = HMACSIZE + TUNNONCESIZE;
/// creates a packet with plaintext size + wire overhead + random pad
AbstractLinkSession::Packet_t
CreatePacket(Command cmd, size_t plainsize, size_t min_pad = 16, size_t pad_variance = 16);
/// Time how long we try delivery for
static constexpr std::chrono::milliseconds DeliveryTimeout = 500ms;
/// Time how long we wait to recieve a message
static constexpr auto ReceivalTimeout = (DeliveryTimeout * 8) / 5;
/// How long to keep a replay window for
static constexpr auto ReplayWindow = (ReceivalTimeout * 3) / 2;
/// How often to acks RX messages
static constexpr auto ACKResendInterval = DeliveryTimeout / 2;
/// How often to retransmit TX fragments
static constexpr auto TXFlushInterval = (DeliveryTimeout / 5) * 4;
/// How often we send a keepalive
static constexpr std::chrono::milliseconds PingInterval = 5s;
/// How long we wait for a session to die with no tx from them
static constexpr auto SessionAliveTimeout = PingInterval * 5;
using Time_t = std::chrono::milliseconds;
struct Session : public AbstractLinkSession, public std::enable_shared_from_this<Session>
/// maximum number of messages we can ack in a multiack
static constexpr std::size_t MaxACKSInMACK = 1024 / sizeof(uint64_t);
/// outbound session
Session(LinkLayer* parent, const RouterContact& rc, const AddressInfo& ai);
/// inbound session
Session(LinkLayer* parent, const SockAddr& from);
// Signal the event loop that a pump is needed (idempotent)
void
TriggerPump();
// Does the actual pump
void
Pump() override;
void
Tick(llarp_time_t now) override;
bool
SendMessageBuffer(
AbstractLinkSession::Message_t msg,
CompletionHandler resultHandler,
uint16_t priority = 0) override;
void
Send_LL(const byte_t* buf, size_t sz);
void EncryptAndSend(AbstractLinkSession::Packet_t);
void
Start() override;
void
Close() override;
bool Recv_LL(AbstractLinkSession::Packet_t) override;
bool
SendKeepAlive() override;
bool
IsEstablished() const override;
bool
TimedOut(llarp_time_t now) const override;
PubKey
GetPubKey() const override
{
using Time_t = std::chrono::milliseconds;
return m_RemoteRC.pubkey;
}
/// maximum number of messages we can ack in a multiack
static constexpr std::size_t MaxACKSInMACK = 1024 / sizeof(uint64_t);
const SockAddr&
GetRemoteEndpoint() const override
{
return m_RemoteAddr;
}
/// outbound session
Session(LinkLayer* parent, const RouterContact& rc, const AddressInfo& ai);
/// inbound session
Session(LinkLayer* parent, const SockAddr& from);
RouterContact
GetRemoteRC() const override
{
return m_RemoteRC;
}
// Signal the event loop that a pump is needed (idempotent)
void
TriggerPump();
size_t
SendQueueBacklog() const override
{
return m_TXMsgs.size();
}
// Does the actual pump
void
Pump() override;
ILinkLayer*
GetLinkLayer() const override
{
return m_Parent;
}
void
Tick(llarp_time_t now) override;
bool
RenegotiateSession() override;
bool
SendMessageBuffer(
AbstractLinkSession::Message_t msg,
CompletionHandler resultHandler,
uint16_t priority = 0) override;
bool
ShouldPing() const override;
void
Send_LL(const byte_t* buf, size_t sz);
SessionStats
GetSessionStats() const override;
void EncryptAndSend(AbstractLinkSession::Packet_t);
util::StatusObject
ExtractStatus() const override;
void
Start() override;
bool
IsInbound() const override
{
return m_Inbound;
}
void
HandlePlaintext() override;
void
Close() override;
bool Recv_LL(AbstractLinkSession::Packet_t) override;
bool
SendKeepAlive() override;
bool
IsEstablished() const override;
bool
TimedOut(llarp_time_t now) const override;
PubKey
GetPubKey() const override
{
return m_RemoteRC.pubkey;
}
const SockAddr&
GetRemoteEndpoint() const override
{
return m_RemoteAddr;
}
RouterContact
GetRemoteRC() const override
{
return m_RemoteRC;
}
size_t
SendQueueBacklog() const override
{
return m_TXMsgs.size();
}
ILinkLayer*
GetLinkLayer() const override
{
return m_Parent;
}
bool
RenegotiateSession() override;
bool
ShouldPing() const override;
SessionStats
GetSessionStats() const override;
util::StatusObject
ExtractStatus() const override;
bool
IsInbound() const override
{
return m_Inbound;
}
void
HandlePlaintext() override;
private:
enum class State
{
/// we have no data recv'd
Initial,
/// we are in introduction phase
Introduction,
/// we sent our LIM
LinkIntro,
/// handshake done and LIM has been obtained
Ready,
/// we are closed now
Closed
};
static std::string
StateToString(State state);
State m_State;
SessionStats m_Stats;
/// are we inbound session ?
const bool m_Inbound;
/// parent link layer
LinkLayer* const m_Parent;
const llarp_time_t m_CreatedAt;
const SockAddr m_RemoteAddr;
AddressInfo m_ChosenAI;
/// remote rc
RouterContact m_RemoteRC;
/// session key
SharedSecret m_SessionKey;
/// session token
AlignedBuffer<24> token;
PubKey m_ExpectedIdent;
PubKey m_RemoteOnionKey;
llarp_time_t m_LastTX = 0s;
llarp_time_t m_LastRX = 0s;
// accumulate for periodic rate calculation
uint64_t m_TXRate = 0;
uint64_t m_RXRate = 0;
llarp_time_t m_ResetRatesAt = 0s;
uint64_t m_TXID = 0;
bool
ShouldResetRates(llarp_time_t now) const;
void
ResetRates();
std::map<uint64_t, InboundMessage> m_RXMsgs;
std::map<uint64_t, OutboundMessage> m_TXMsgs;
/// maps rxid to time recieved
std::unordered_map<uint64_t, llarp_time_t> m_ReplayFilter;
/// rx messages to send in next round of multiacks
util::ascending_priority_queue<uint64_t> m_SendMACKs;
using CryptoQueue_t = std::vector<Packet_t>;
CryptoQueue_t m_EncryptNext;
CryptoQueue_t m_DecryptNext;
std::atomic_flag m_PlaintextEmpty;
llarp::thread::Queue<CryptoQueue_t> m_PlaintextRecv;
std::atomic_flag m_SentClosed;
void
EncryptWorker(CryptoQueue_t msgs);
void
DecryptWorker(CryptoQueue_t msgs);
void
HandleGotIntro(Packet_t pkt);
void
HandleGotIntroAck(Packet_t pkt);
void
HandleCreateSessionRequest(Packet_t pkt);
void
HandleAckSession(Packet_t pkt);
void
HandleSessionData(Packet_t pkt);
bool
DecryptMessageInPlace(Packet_t& pkt);
void
SendMACK();
void
HandleRecvMsgCompleted(const InboundMessage& msg);
void
GenerateAndSendIntro();
bool
GotInboundLIM(const LinkIntroMessage* msg);
bool
GotOutboundLIM(const LinkIntroMessage* msg);
bool
GotRenegLIM(const LinkIntroMessage* msg);
void
SendOurLIM(AbstractLinkSession::CompletionHandler h = nullptr);
void
HandleXMIT(Packet_t msg);
void
HandleDATA(Packet_t msg);
void
HandleACKS(Packet_t msg);
void
HandleNACK(Packet_t msg);
void
HandlePING(Packet_t msg);
void
HandleCLOS(Packet_t msg);
void
HandleMACK(Packet_t msg);
private:
enum class State
{
/// we have no data recv'd
Initial,
/// we are in introduction phase
Introduction,
/// we sent our LIM
LinkIntro,
/// handshake done and LIM has been obtained
Ready,
/// we are closed now
Closed
};
} // namespace iwp
} // namespace llarp
static std::string
StateToString(State state);
State m_State;
SessionStats m_Stats;
/// are we inbound session ?
const bool m_Inbound;
/// parent link layer
LinkLayer* const m_Parent;
const llarp_time_t m_CreatedAt;
const SockAddr m_RemoteAddr;
AddressInfo m_ChosenAI;
/// remote rc
RouterContact m_RemoteRC;
/// session key
SharedSecret m_SessionKey;
/// session token
AlignedBuffer<24> token;
PubKey m_ExpectedIdent;
PubKey m_RemoteOnionKey;
llarp_time_t m_LastTX = 0s;
llarp_time_t m_LastRX = 0s;
// accumulate for periodic rate calculation
uint64_t m_TXRate = 0;
uint64_t m_RXRate = 0;
llarp_time_t m_ResetRatesAt = 0s;
uint64_t m_TXID = 0;
bool
ShouldResetRates(llarp_time_t now) const;
void
ResetRates();
std::map<uint64_t, InboundMessage> m_RXMsgs;
std::map<uint64_t, OutboundMessage> m_TXMsgs;
/// maps rxid to time recieved
std::unordered_map<uint64_t, llarp_time_t> m_ReplayFilter;
/// rx messages to send in next round of multiacks
util::ascending_priority_queue<uint64_t> m_SendMACKs;
using CryptoQueue_t = std::vector<Packet_t>;
CryptoQueue_t m_EncryptNext;
CryptoQueue_t m_DecryptNext;
std::atomic_flag m_PlaintextEmpty;
llarp::thread::Queue<CryptoQueue_t> m_PlaintextRecv;
std::atomic_flag m_SentClosed;
void
EncryptWorker(CryptoQueue_t msgs);
void
DecryptWorker(CryptoQueue_t msgs);
void
HandleGotIntro(Packet_t pkt);
void
HandleGotIntroAck(Packet_t pkt);
void
HandleCreateSessionRequest(Packet_t pkt);
void
HandleAckSession(Packet_t pkt);
void
HandleSessionData(Packet_t pkt);
bool
DecryptMessageInPlace(Packet_t& pkt);
void
SendMACK();
void
HandleRecvMsgCompleted(const InboundMessage& msg);
void
GenerateAndSendIntro();
bool
GotInboundLIM(const LinkIntroMessage* msg);
bool
GotOutboundLIM(const LinkIntroMessage* msg);
bool
GotRenegLIM(const LinkIntroMessage* msg);
void
SendOurLIM(AbstractLinkSession::CompletionHandler h = nullptr);
void
HandleXMIT(Packet_t msg);
void
HandleDATA(Packet_t msg);
void
HandleACKS(Packet_t msg);
void
HandleNACK(Packet_t msg);
void
HandlePING(Packet_t msg);
void
HandleCLOS(Packet_t msg);
void
HandleMACK(Packet_t msg);
};
} // namespace llarp::iwp

10
llarp/link/connection.cpp Normal file
View File

@ -0,0 +1,10 @@
#include "connection.hpp"
namespace llarp::link
{
Connection::Connection(
std::shared_ptr<oxen::quic::connection_interface>& c, std::shared_ptr<oxen::quic::Stream>& s)
: conn{c}, control_stream{s}
{}
} // namespace llarp::link

View File

@ -7,9 +7,12 @@
namespace llarp::link
{
struct Connection
{
Connection(
std::shared_ptr<oxen::quic::connection_interface>& c,
std::shared_ptr<oxen::quic::Stream>& s);
std::shared_ptr<oxen::quic::connection_interface> conn;
std::shared_ptr<oxen::quic::Stream> control_stream;

72
llarp/link/endpoint.cpp Normal file
View File

@ -0,0 +1,72 @@
#include "endpoint.hpp"
#include "link_manager.hpp"
namespace llarp::link
{
std::shared_ptr<link::Connection>
Endpoint::get_conn(const RouterContact& rc) const
{
for (const auto& [rid, conn] : conns)
{
if (conn->remote_rc == rc)
return conn;
}
return nullptr;
}
bool
Endpoint::have_conn(const RouterID& remote, bool client_only) const
{
if (auto itr = conns.find(remote); itr != conns.end())
{
if (not(itr->second->remote_is_relay and client_only))
return true;
}
return false;
}
// TOFIX: use the new close methods after bumping libquic
bool
Endpoint::deregister_peer(RouterID remote)
{
if (auto itr = conns.find(remote); itr != conns.end())
{
endpoint->close_connection(*dynamic_cast<oxen::quic::Connection*>(itr->second->conn.get()));
conns.erase(itr);
return true;
}
return false;
}
bool
Endpoint::establish_connection(const oxen::quic::opt::local_addr& remote)
{
try
{
oxen::quic::dgram_data_callback dgram_cb =
[this](oxen::quic::dgram_interface& dgi, bstring dgram) {
link_manager.recv_data_message(dgi, dgram);
};
auto conn_interface = endpoint->connect(remote, link_manager.tls_creds, dgram_cb);
auto control_stream = conn_interface->get_new_stream();
// TOFIX: get a real RouterID after refactoring it
RouterID rid;
auto [itr, b] = conns.emplace(rid);
itr->second = std::make_shared<link::Connection>(conn_interface, control_stream);
connid_map.emplace(conn_interface->scid(), rid);
return true;
}
catch (...)
{
log::error(quic_cat, "Error: failed to establish connection to {}", remote);
return false;
}
}
} // namespace llarp::link

View File

@ -11,13 +11,34 @@ namespace llarp::link
{
struct Endpoint
{
Endpoint(std::shared_ptr<oxen::quic::Endpoint> ep, LinkManager& lm)
: endpoint{std::move(ep)}, link_manager{lm}
{}
std::shared_ptr<oxen::quic::Endpoint> endpoint;
bool inbound{false};
LinkManager& link_manager;
// for outgoing packets, we route via RouterID; map RouterID->Connection
// for incoming packets, we get a ConnectionID; map ConnectionID->RouterID
std::unordered_map<RouterID, link::Connection> connections;
std::unordered_map<RouterID, std::shared_ptr<link::Connection>> conns;
std::unordered_map<oxen::quic::ConnectionID, RouterID> connid_map;
std::shared_ptr<link::Connection>
get_conn(const RouterContact&) const;
bool
have_conn(const RouterID& remote, bool client_only) const;
bool
deregister_peer(RouterID remote);
bool
establish_connection(const oxen::quic::opt::local_addr& remote);
};
} // namespace llarp::link
/*
- Refactor RouterID to use gnutls info and maybe ConnectionID
*/

View File

@ -1,5 +1,6 @@
#include "link_manager.hpp"
#include <llarp/router/router.hpp>
#include <llarp/router/rc_lookup_handler.hpp>
#include <llarp/nodedb.hpp>
#include <llarp/crypto/crypto.hpp>
@ -9,27 +10,31 @@
namespace llarp
{
link::Endpoint*
LinkManager::GetCompatibleLink(const RouterContact&)
LinkManager::LinkManager(Router& r)
: router{r}
, quic{std::make_unique<oxen::quic::Network>()}
, tls_creds{oxen::quic::GNUTLSCreds::make_from_ed_keys(
{reinterpret_cast<const char*>(router.encryption().data()), router.encryption().size()},
{reinterpret_cast<const char*>(router.encryption().toPublic().data()), size_t{32}})}
, ep{quic->endpoint(router.local), *this}
{}
std::shared_ptr<link::Connection>
LinkManager::get_compatible_link(const RouterContact& rc)
{
if (stopping)
return nullptr;
for (auto& ep : endpoints)
{
// TODO: need some notion of "is this link compatible with that address".
// iwp just checks that the link dialect ("iwp") matches the address info dialect,
// but that feels insufficient. For now, just return the first endpoint we have;
// we should probably only have 1 for now anyway until we make ipv6 work.
return &ep;
}
if (auto c = ep.get_conn(rc); c)
return c;
return nullptr;
}
// TODO: replace with control/data message sending with libquic
bool
LinkManager::SendTo(
LinkManager::send_to(
const RouterID& remote,
const llarp_buffer_t&,
AbstractLinkSession::CompletionHandler completed,
@ -38,7 +43,7 @@ namespace llarp
if (stopping)
return false;
if (not HaveConnection(remote))
if (not have_connection_to(remote))
{
if (completed)
{
@ -53,68 +58,40 @@ namespace llarp
}
bool
LinkManager::HaveConnection(const RouterID& remote, bool client_only) const
LinkManager::have_connection_to(const RouterID& remote, bool client_only) const
{
for (const auto& ep : endpoints)
{
if (auto itr = ep.connections.find(remote); itr != ep.connections.end())
{
if (not(itr->second.remote_is_relay and client_only))
return true;
return false;
}
}
return false;
return ep.have_conn(remote, client_only);
}
bool
LinkManager::HaveClientConnection(const RouterID& remote) const
LinkManager::have_client_connection_to(const RouterID& remote) const
{
return HaveConnection(remote, true);
return ep.have_conn(remote, true);
}
void
LinkManager::DeregisterPeer(RouterID remote)
LinkManager::deregister_peer(RouterID remote)
{
m_PersistingSessions.erase(remote);
for (const auto& ep : endpoints)
if (auto rv = ep.deregister_peer(remote); rv)
{
if (auto itr = ep.connections.find(remote); itr != ep.connections.end())
{
/*
itr->second.conn->close(); //TODO: libquic needs some function for this
*/
}
persisting_conns.erase(remote);
log::info(logcat, "Peer {} successfully de-registered");
}
LogInfo(remote, " has been de-registered");
else
log::warning(logcat, "Peer {} not found for de-registeration!");
}
void
LinkManager::AddLink(const oxen::quic::opt::local_addr& bind, bool inbound)
LinkManager::connect_to(const oxen::quic::opt::local_addr& remote)
{
// TODO: libquic callbacks: new_conn_alpn_notify, new_conn_pubkey_ok, new_conn_established/ready
// stream_opened, stream_data, stream_closed, conn_closed
oxen::quic::dgram_data_callback dgram_cb =
[this](oxen::quic::dgram_interface& dgi, bstring dgram) {
HandleIncomingDataMessage(dgi, dgram);
};
auto ep = quic->endpoint(
bind,
std::move(dgram_cb),
oxen::quic::opt::enable_datagrams{oxen::quic::Splitting::ACTIVE});
endpoints.emplace_back();
auto& endp = endpoints.back();
endp.endpoint = std::move(ep);
if (inbound)
{
endp.endpoint->listen(tls_creds);
endp.inbound = true;
}
if (auto rv = ep.establish_connection(remote); rv)
log::info(quic_cat, "Connection to {} successfully established!", remote);
else
log::info(quic_cat, "Connection to {} unsuccessfully established", remote);
}
void
LinkManager::Stop()
LinkManager::stop()
{
if (stopping)
{
@ -130,23 +107,23 @@ namespace llarp
}
void
LinkManager::PersistSessionUntil(const RouterID& remote, llarp_time_t until)
LinkManager::set_conn_persist(const RouterID& remote, llarp_time_t until)
{
if (stopping)
return;
util::Lock l(_mutex);
m_PersistingSessions[remote] = std::max(until, m_PersistingSessions[remote]);
if (HaveClientConnection(remote))
persisting_conns[remote] = std::max(until, persisting_conns[remote]);
if (have_client_connection_to(remote))
{
// mark this as a client so we don't try to back connect
m_Clients.Upsert(remote);
clients.Upsert(remote);
}
}
size_t
LinkManager::NumberOfConnectedRouters(bool clients_only) const
LinkManager::get_num_connected(bool clients_only) const
{
size_t count{0};
for (const auto& ep : endpoints)
@ -162,13 +139,13 @@ namespace llarp
}
size_t
LinkManager::NumberOfConnectedClients() const
LinkManager::get_num_connected_clients() const
{
return NumberOfConnectedRouters(true);
return get_num_connected(true);
}
bool
LinkManager::GetRandomConnectedRouter(RouterContact& router) const
LinkManager::get_random_connected(RouterContact& router) const
{
std::unordered_map<RouterID, RouterContact> connectedRouters;
@ -199,7 +176,7 @@ namespace llarp
// TODO: this? perhaps no longer necessary in the same way?
void
LinkManager::CheckPersistingSessions(llarp_time_t)
LinkManager::check_persisting_conns(llarp_time_t)
{
if (stopping)
return;
@ -207,7 +184,7 @@ namespace llarp
// TODO: do we still need this concept?
void
LinkManager::updatePeerDb(std::shared_ptr<PeerDb>)
LinkManager::update_peer_db(std::shared_ptr<PeerDb>)
{}
// TODO: this
@ -218,7 +195,7 @@ namespace llarp
}
void
LinkManager::Init(RCLookupHandler* rcLookup)
LinkManager::init(RCLookupHandler* rcLookup)
{
stopping = false;
_rcLookup = rcLookup;
@ -226,11 +203,12 @@ namespace llarp
}
void
LinkManager::Connect(RouterID router)
LinkManager::connect_to(RouterID router)
{
auto fn = [this](const RouterID&, const RouterContact* const rc, const RCRequestResult res) {
auto fn = [this](
const RouterID& rid, const RouterContact* const rc, const RCRequestResult res) {
if (res == RCRequestResult::Success)
Connect(*rc);
connect_to(*rc);
/* TODO:
else
RC lookup failure callback here
@ -242,10 +220,10 @@ namespace llarp
// This function assumes the RC has already had its signature verified and connection is allowed.
void
LinkManager::Connect(RouterContact rc)
LinkManager::connect_to(RouterContact rc)
{
// TODO: connection failed callback
if (HaveConnection(rc.pubkey))
if (have_connection_to(rc.pubkey))
return;
// RC shouldn't be valid if this is the case, but may as well sanity check...
@ -254,14 +232,14 @@ namespace llarp
return;
// TODO: connection failed callback
auto* ep = GetCompatibleLink(rc);
auto* ep = get_compatible_link(rc);
if (ep == nullptr)
return;
// TODO: connection established/failed callbacks
oxen::quic::stream_data_callback stream_cb =
[this](oxen::quic::Stream& stream, bstring_view packet) {
HandleIncomingControlMessage(stream, packet);
recv_control_message(stream, packet);
};
// TODO: once "compatible link" cares about address, actually choose addr to connect to
@ -289,7 +267,7 @@ namespace llarp
}
void
LinkManager::ConnectToRandomRouters(int numDesired)
LinkManager::connect_to_random(int numDesired)
{
std::set<RouterID> exclude;
auto remainingDesired = numDesired;
@ -315,13 +293,13 @@ namespace llarp
}
void
LinkManager::HandleIncomingDataMessage(oxen::quic::dgram_interface&, bstring)
LinkManager::recv_data_message(oxen::quic::dgram_interface&, bstring)
{
// TODO: this
}
void
LinkManager::HandleIncomingControlMessage(oxen::quic::Stream&, bstring_view)
LinkManager::recv_control_message(oxen::quic::Stream&, bstring_view)
{
// TODO: this
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <llarp/util/compare_ptr.hpp>
#include "server.hpp"
#include "endpoint.hpp"
@ -10,6 +11,13 @@
#include <set>
#include <atomic>
#include <llarp/util/logging.hpp>
namespace
{
static auto quic_cat = llarp::log::Cat("lokinet.quic");
} // namespace
namespace llarp
{
enum class SessionResult
@ -36,63 +44,64 @@ namespace llarp
template <>
constexpr inline bool IsToStringFormattable<SessionResult> = true;
struct Router;
struct LinkManager
{
public:
explicit LinkManager(AbstractRouter* r) : router{r}
{}
explicit LinkManager(Router& r);
bool
SendTo(
send_to(
const RouterID& remote,
const llarp_buffer_t& buf,
AbstractLinkSession::CompletionHandler completed,
uint16_t priority);
bool
HaveConnection(const RouterID& remote, bool client_only = false) const;
have_connection_to(const RouterID& remote, bool client_only = false) const;
bool
HaveClientConnection(const RouterID& remote) const;
have_client_connection_to(const RouterID& remote) const;
void
DeregisterPeer(RouterID remote);
deregister_peer(RouterID remote);
void
AddLink(const oxen::quic::opt::local_addr& bind, bool inbound = false);
connect_to(const oxen::quic::opt::local_addr& remote);
void
Stop();
connect_to(RouterID router);
void
PersistSessionUntil(const RouterID& remote, llarp_time_t until);
connect_to(RouterContact rc);
void
stop();
void
set_conn_persist(const RouterID& remote, llarp_time_t until);
size_t
NumberOfConnectedRouters(bool clients_only = false) const;
get_num_connected(bool clients_only = false) const;
size_t
NumberOfConnectedClients() const;
get_num_connected_clients() const;
bool
GetRandomConnectedRouter(RouterContact& router) const;
get_random_connected(RouterContact& router) const;
void
CheckPersistingSessions(llarp_time_t now);
check_persisting_conns(llarp_time_t now);
void
updatePeerDb(std::shared_ptr<PeerDb> peerDb);
update_peer_db(std::shared_ptr<PeerDb> peerDb);
util::StatusObject
ExtractStatus() const;
void
Init(RCLookupHandler* rcLookup);
void
Connect(RouterID router);
void
Connect(RouterContact rc);
init(RCLookupHandler* rcLookup);
// Attempts to connect to a number of random routers.
//
@ -100,47 +109,46 @@ namespace llarp
// check if we already have a connection to any of the random set, as making
// that thread safe would be slow...I think.
void
ConnectToRandomRouters(int numDesired);
connect_to_random(int numDesired);
// TODO: tune these (maybe even remove max?) now that we're switching to quic
/// always maintain this many connections to other routers
size_t minConnectedRouters = 4;
size_t min_connected_routers = 4;
/// hard upperbound limit on the number of router to router connections
size_t maxConnectedRouters = 6;
size_t max_connected_routers = 6;
private:
link::Endpoint*
GetCompatibleLink(const RouterContact& rc);
friend struct link::Endpoint;
std::shared_ptr<link::Connection>
get_compatible_link(const RouterContact& rc);
std::atomic<bool> stopping;
mutable util::Mutex _mutex; // protects m_PersistingSessions
// sessions to persist -> timestamp to end persist at
std::unordered_map<RouterID, llarp_time_t> m_PersistingSessions GUARDED_BY(_mutex);
std::unordered_map<RouterID, llarp_time_t> persisting_conns GUARDED_BY(_mutex);
std::unordered_map<RouterID, SessionStats> m_lastRouterStats;
std::unordered_map<RouterID, SessionStats> last_router_stats;
util::DecayingHashSet<RouterID> m_Clients{path::default_lifetime};
util::DecayingHashSet<RouterID> clients{path::default_lifetime};
RCLookupHandler* _rcLookup;
std::shared_ptr<NodeDB> _nodedb;
AbstractRouter* router;
Router& router;
// FIXME: Lokinet currently expects to be able to kill all network functionality before
// finishing other shutdown things, including destroying this class, and that is all in
// Network's destructor, so we need to be able to destroy it before this class.
std::unique_ptr<oxen::quic::Network> quic{std::make_unique<oxen::quic::Network>()};
std::vector<link::Endpoint> endpoints;
// TODO: initialize creds
std::unique_ptr<oxen::quic::Network> quic;
std::shared_ptr<oxen::quic::GNUTLSCreds> tls_creds;
link::Endpoint ep;
void
HandleIncomingDataMessage(oxen::quic::dgram_interface& dgi, bstring dgram);
recv_data_message(oxen::quic::dgram_interface& dgi, bstring dgram);
void
HandleIncomingControlMessage(oxen::quic::Stream& stream, bstring_view packet);
recv_control_message(oxen::quic::Stream& stream, bstring_view packet);
};
} // namespace llarp

View File

@ -1,9 +1,15 @@
#pragma once
#include <llarp/util/buffer.hpp>
#include <oxenc/bt.h>
#include <llarp/util/logging.hpp>
namespace
{
static auto link_cat = llarp::log::Cat("lokinet.link");
} // namespace
namespace llarp
{
/// abstract base class for serialized messages

View File

@ -9,11 +9,6 @@
#include <vector>
namespace
{
static auto link_cat = llarp::log::Cat("lokinet.link");
} // namespace
namespace llarp
{
struct AbstractLinkSession;

View File

@ -505,7 +505,7 @@ namespace llarp
{
// remove all connections to this router as it's probably not registered anymore
LogWarn("removing router ", router, " because of path build timeout");
r->linkManager().DeregisterPeer(router);
r->linkManager().deregister_peer(router);
r->nodedb()->Remove(router);
}
},

View File

@ -24,7 +24,7 @@ namespace llarp
const RouterID& remote, const AbstractLinkMessage& msg, SendStatusHandler callback)
{
// if the destination is invalid, callback with failure and return
if (not _router->linkManager().HaveClientConnection(remote)
if (not _router->linkManager().have_client_connection_to(remote)
and not _router->rcLookupHandler().SessionIsAllowed(remote))
{
DoCallback(callback, SendStatus::InvalidRouter);
@ -47,7 +47,7 @@ namespace llarp
std::copy_n(_buf.data(), _buf.size(), ent.message.data());
// if we have a session to the destination, queue the message and return
if (_router->linkManager().HaveConnection(remote))
if (_router->linkManager().have_connection_to(remote))
{
QueueOutboundMessage(std::move(ent));
return true;
@ -196,7 +196,7 @@ namespace llarp
const llarp_buffer_t buf{ent.message};
m_queueStats.sent++;
SendStatusHandler callback = ent.inform;
return _router->linkManager().SendTo(
return _router->linkManager().send_to(
ent.router,
buf,
[this, callback](AbstractLinkSession::DeliveryStatus status) {
@ -213,7 +213,7 @@ namespace llarp
bool
OutboundMessageHandler::SendIfSession(const MessageQueueEntry& ent)
{
if (_router->linkManager().HaveConnection(ent.router))
if (_router->linkManager().have_connection_to(ent.router))
{
return Send(ent);
}

View File

@ -210,7 +210,7 @@ namespace llarp
numPending += pendingCallbacks.size();
}
return _linkManager->NumberOfConnectedRouters() + numPending < maxConnectedRouters;
return _linkManager->get_num_connected() + numPending < maxConnectedRouters;
}
void

View File

@ -273,7 +273,7 @@ namespace llarp
void
Router::PersistSessionUntil(const RouterID& remote, llarp_time_t until)
{
_linkManager.PersistSessionUntil(remote, until);
_linkManager.set_conn_persist(remote, until);
}
void
@ -599,13 +599,13 @@ namespace llarp
size_t
Router::NumberOfConnectedRouters() const
{
return _linkManager.NumberOfConnectedRouters();
return _linkManager.get_num_connected();
}
size_t
Router::NumberOfConnectedClients() const
{
return _linkManager.NumberOfConnectedClients();
return _linkManager.get_num_connected_clients();
}
bool
@ -671,8 +671,8 @@ namespace llarp
// Router config
_rc.SetNick(conf.router.m_nickname);
_linkManager.maxConnectedRouters = conf.router.m_maxConnectedRouters;
_linkManager.minConnectedRouters = conf.router.m_minConnectedRouters;
_linkManager.max_connected_routers = conf.router.m_maxConnectedRouters;
_linkManager.min_connected_routers = conf.router.m_minConnectedRouters;
encryption_keyfile = m_keyManager->m_encKeyPath;
our_rc_file = m_keyManager->m_rcPath;
@ -792,7 +792,7 @@ namespace llarp
// Init components after relevant config settings loaded
_outboundMessageHandler.Init(this);
_linkManager.Init(&_rcLookupHandler);
_linkManager.init(&_rcLookupHandler);
_rcLookupHandler.Init(
_dht,
_nodedb,
@ -1063,9 +1063,9 @@ namespace llarp
// mark peers as de-registered
for (auto& peer : closePeers)
_linkManager.DeregisterPeer(std::move(peer));
_linkManager.deregister_peer(std::move(peer));
_linkManager.CheckPersistingSessions(now);
_linkManager.check_persisting_conns(now);
size_t connected = NumberOfConnectedRouters();
@ -1076,7 +1076,7 @@ namespace llarp
_rcLookupHandler.ExploreNetwork();
m_NextExploreAt = timepoint_now + std::chrono::seconds(interval);
}
size_t connectToNum = _linkManager.minConnectedRouters;
size_t connectToNum = _linkManager.min_connected_routers;
const auto strictConnect = _rcLookupHandler.NumberOfStrictConnectRouters();
if (strictConnect > 0 && connectToNum > strictConnect)
{
@ -1114,7 +1114,7 @@ namespace llarp
{
size_t dlt = connectToNum - connected;
LogDebug("connecting to ", dlt, " random routers to keep alive");
_linkManager.ConnectToRandomRouters(dlt);
_linkManager.connect_to_random(dlt);
}
_hiddenServiceContext.Tick(now);
@ -1133,7 +1133,7 @@ namespace llarp
// TODO: throttle this?
// TODO: need to capture session stats when session terminates / is removed from link
// manager
_linkManager.updatePeerDb(m_peerDb);
_linkManager.update_peer_db(m_peerDb);
if (m_peerDb->shouldFlush(now))
{
@ -1235,7 +1235,7 @@ namespace llarp
bool
Router::GetRandomConnectedRouter(RouterContact& result) const
{
return _linkManager.GetRandomConnectedRouter(result);
return _linkManager.get_random_connected(result);
}
void
@ -1501,7 +1501,7 @@ namespace llarp
void
Router::StopLinks()
{
_linkManager.Stop();
_linkManager.stop();
}
void
@ -1559,7 +1559,7 @@ namespace llarp
bool
Router::HasSessionTo(const RouterID& remote) const
{
return _linkManager.HaveConnection(remote);
return _linkManager.have_connection_to(remote);
}
std::string
@ -1581,7 +1581,7 @@ namespace llarp
auto connected = NumberOfConnectedRouters();
if (connected >= want)
return;
_linkManager.ConnectToRandomRouters(want);
_linkManager.connect_to_random(want);
}
bool
@ -1729,7 +1729,7 @@ namespace llarp
AddressInfo ai;
ai.fromSockAddr(bind_addr);
_linkManager.AddLink({ai.IPString(), ai.port}, true);
_linkManager.connect_to({ai.IPString(), ai.port}, true);
ai.pubkey = llarp::seckey_topublic(_identity);
ai.dialect = "quicinet"; // FIXME: constant, also better name?
@ -1749,7 +1749,7 @@ namespace llarp
{
AddressInfo ai;
ai.fromSockAddr(bind_addr);
_linkManager.AddLink({ai.IPString(), ai.port}, false);
_linkManager.connect_to({ai.IPString(), ai.port}, false);
}
}

View File

@ -47,11 +47,22 @@
#include <oxenmq/address.h>
#include <external/oxen-libquic/include/quic.hpp>
namespace libquic = oxen::quic;
namespace llarp
{
struct Router : public AbstractRouter
class RouteManager final /* : public AbstractRouter */
{
public:
std::shared_ptr<libquic::connection_interface>
get_or_connect();
private:
std::shared_ptr<libquic::Endpoint> ep;
};
struct Router final : public AbstractRouter
{
llarp_time_t _lastPump = 0s;
bool ready;
@ -224,6 +235,7 @@ namespace llarp
ShouldTestOtherRouters() const;
std::optional<SockAddr> _ourAddress;
oxen::quic::Address local;
EventLoop_ptr _loop;
std::shared_ptr<vpn::Platform> _vpnPlatform;
@ -311,7 +323,7 @@ namespace llarp
Profiling _routerProfiling;
fs::path _profilesFile;
OutboundMessageHandler _outboundMessageHandler;
LinkManager _linkManager{this};
LinkManager _linkManager{*this};
RCLookupHandler _rcLookupHandler;
RCGossiper _rcGossiper;

View File

@ -112,7 +112,6 @@ namespace llarp
std::string
bt_encode() const;
// tofix: drop version 0 case, change parameter to take btlp reference
void
bt_encode_subdict(oxenc::bt_list_producer& btlp) const;