More carving + libquic bump

- bumped libquic to begin implementing dependent features on connection open/close signals
- more gutting of interface classes
pull/2204/head
dr7ana 9 months ago
parent aaf688cf81
commit bfa9629779

@ -1 +1 @@
Subproject commit 329a6ba6ff79f7a94762f0e55d8366190cbfb318
Subproject commit 2c6b8774c53426b4c99ca6a84c71149789cd77e4

@ -183,7 +183,7 @@ add_library(lokinet-layer-flow
# with onion paths. onion paths anonymize routing layer pdu.
add_library(lokinet-layer-onion
STATIC
path/ihophandler.cpp
path/abstracthophandler.cpp
path/path_context.cpp
path/path.cpp
path/pathbuilder.cpp

@ -2,7 +2,7 @@
#include <llarp/crypto/types.hpp>
#include <llarp/net/ip_packet.hpp>
#include <llarp/path/ihophandler.hpp>
#include <llarp/path/abstracthophandler.hpp>
#include <llarp/routing/transfer_traffic_message.hpp>
#include <llarp/service/protocol_type.hpp>
#include <llarp/util/time.hpp>

@ -1,12 +0,0 @@
#pragma once
#include "connection.hpp"
#include "link_manager.hpp"
#include <llarp/router/router.hpp>
#include <llarp/router_id.hpp>
#include <external/oxen-libquic/include/quic.hpp>
namespace llarp::link
{} // namespace llarp::link

@ -15,11 +15,8 @@ namespace llarp
std::shared_ptr<link::Connection>
Endpoint::get_conn(const RouterContact& rc) const
{
for (const auto& [rid, conn] : conns)
{
if (conn->remote_rc == rc)
return conn;
}
if (auto itr = conns.find(rc.pubkey); itr != conns.end())
return itr->second;
return nullptr;
}
@ -79,33 +76,131 @@ namespace llarp
}
} // namespace link
// TODO: pass connection open callback to endpoint constructor!
LinkManager::LinkManager(Router& r)
: router{r}
, quic{std::make_unique<oxen::quic::Network>()}
, tls_creds{oxen::quic::GNUTLSCreds::make_from_ed_keys(
{reinterpret_cast<const char*>(router.identity().data()), size_t{32}},
{reinterpret_cast<const char*>(router.identity().toPublic().data()), size_t{32}})}
, ep{quic->endpoint(router.local), *this}
, ep{quic->endpoint(
router.public_ip(),
[this](oxen::quic::connection_interface& ci) { return on_conn_open(ci); }),
*this}
{}
// TODO: replace with control/data message sending with libquic
bool
LinkManager::send_to(const RouterID& remote, const llarp_buffer_t&, uint16_t)
LinkManager::send_to(const RouterID& remote, bstring data, uint16_t priority)
{
if (stopping)
return false;
if (not have_connection_to(remote))
{
auto pending = PendingMessage(data, priority);
auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue());
itr->second.push(std::move(pending));
rc_lookup->get_rc(
remote,
[this](
[[maybe_unused]] const RouterID& rid,
const RouterContact* const rc,
const RCRequestResult res) {
if (res == RCRequestResult::Success)
connect_to(*rc);
else
log::warning(quic_cat, "Do something intelligent here for error handling");
});
// TODO: some error callback to report message send failure
// or, should we connect and pass a send-msg callback as the connection successful cb?
return false;
}
// TODO: send the message
// TODO: if we keep bool return type, change this accordingly
return false;
}
void
LinkManager::connect_to(RouterID router)
{
rc_lookup->get_rc(
router,
[this](
[[maybe_unused]] const RouterID& rid,
const RouterContact* const rc,
const RCRequestResult res) {
if (res == RCRequestResult::Success)
connect_to(*rc);
/* TODO:
else
RC lookup failure callback here
*/
});
}
// This function assumes the RC has already had its signature verified and connection is allowed.
void
LinkManager::connect_to(RouterContact rc)
{
if (have_connection_to(rc.pubkey))
{
// TODO: connection failed callback
return;
}
// TODO: connection established/failed callbacks
oxen::quic::stream_data_callback stream_cb =
[this](oxen::quic::Stream& stream, bstring_view packet) {
recv_control_message(stream, packet);
};
// TODO: once "compatible link" cares about address, actually choose addr to connect to
// based on which one is compatible with the link we chose. For now, just use
// the first one.
auto& remote_addr = rc.addr;
// TODO: confirm remote end is using the expected pubkey (RouterID).
// TODO: ALPN for "client" vs "relay" (could just be set on endpoint creation)
// TODO: does connect() inherit the endpoint's datagram data callback, and do we want it to if
// so?
if (auto rv = ep.establish_connection(remote_addr, rc, stream_cb, tls_creds); rv)
{
log::info(quic_cat, "Connection to {} successfully established!", remote_addr);
return;
}
log::warning(quic_cat, "Connection to {} successfully established!", remote_addr);
}
void
LinkManager::on_conn_open(oxen::quic::connection_interface& ci)
{
router.loop()->call([]() {});
const auto& scid = ci.scid();
const auto& rid = ep.connid_map[scid];
if (auto itr = pending_conn_msg_queue.find(rid); itr != pending_conn_msg_queue.end())
{
auto& que = itr->second;
while (not que.empty())
{
auto& m = que.top();
(m.is_control) ? ep.conns[rid]->control_stream->send(std::move(m.buf))
: ci.send_datagram(std::move(m.buf));
que.pop();
}
}
};
bool
LinkManager::have_connection_to(const RouterID& remote, bool client_only) const
{
@ -124,10 +219,10 @@ namespace llarp
if (auto rv = ep.deregister_peer(remote); rv)
{
persisting_conns.erase(remote);
log::info(logcat, "Peer {} successfully de-registered");
log::info(logcat, "Peer {} successfully de-registered", remote);
}
else
log::warning(logcat, "Peer {} not found for de-registeration!");
log::warning(logcat, "Peer {} not found for de-registration!", remote);
}
void
@ -208,57 +303,6 @@ namespace llarp
node_db = router.node_db();
}
void
LinkManager::connect_to(RouterID router)
{
auto fn = [this](
[[maybe_unused]] const RouterID& rid,
const RouterContact* const rc,
const RCRequestResult res) {
if (res == RCRequestResult::Success)
connect_to(*rc);
/* TODO:
else
RC lookup failure callback here
*/
};
rc_lookup->get_rc(router, fn);
}
// This function assumes the RC has already had its signature verified and connection is allowed.
void
LinkManager::connect_to(RouterContact rc)
{
if (have_connection_to(rc.pubkey))
{
// TODO: connection failed callback
return;
}
// TODO: connection established/failed callbacks
oxen::quic::stream_data_callback stream_cb =
[this](oxen::quic::Stream& stream, bstring_view packet) {
recv_control_message(stream, packet);
};
// TODO: once "compatible link" cares about address, actually choose addr to connect to
// based on which one is compatible with the link we chose. For now, just use
// the first one.
auto& remote_addr = rc.addr;
// TODO: confirm remote end is using the expected pubkey (RouterID).
// TODO: ALPN for "client" vs "relay" (could just be set on endpoint creation)
// TODO: does connect() inherit the endpoint's datagram data callback, and do we want it to if
// so?
if (auto rv = ep.establish_connection(remote_addr, rc, stream_cb, tls_creds); rv)
{
log::info(quic_cat, "Connection to {} successfully established!", remote_addr);
return;
}
log::warning(quic_cat, "Connection to {} successfully established!", remote_addr);
}
void
LinkManager::connect_to_random(int num_conns)
{

@ -1,5 +1,7 @@
#pragma once
#include "connection.hpp"
#include <llarp/router/rc_lookup_handler.hpp>
#include <llarp/router_contact.hpp>
#include <llarp/peerstats/peer_db.hpp>
@ -13,6 +15,7 @@
#include <atomic>
#include <llarp/util/logging.hpp>
#include <llarp/util/priority_queue.hpp>
namespace
{
@ -91,15 +94,65 @@ namespace llarp
template <>
constexpr inline bool IsToStringFormattable<SessionResult> = true;
struct PendingMessage
{
bstring buf;
uint16_t priority;
bool is_control{false};
PendingMessage(bstring b, uint16_t p, bool c = false)
: buf{std::move(b)}, priority{p}, is_control{c}
{}
};
using MessageQueue = util::ascending_priority_queue<PendingMessage>;
struct Router;
struct LinkManager
{
private:
friend struct link::Endpoint;
std::atomic<bool> stopping;
// DISCUSS: is this necessary? can we reduce the amount of locking and nuke this
mutable util::Mutex m; // protects persisting_conns
// sessions to persist -> timestamp to end persist at
std::unordered_map<RouterID, llarp_time_t> persisting_conns GUARDED_BY(_mutex);
// holds any messages we attempt to send while connections are establishing
std::unordered_map<RouterID, MessageQueue> pending_conn_msg_queue;
util::DecayingHashSet<RouterID> clients{path::default_lifetime};
RCLookupHandler* rc_lookup;
std::shared_ptr<NodeDB> node_db;
oxen::quic::Address addr;
Router& router;
// FIXME: Lokinet currently expects to be able to kill all network functionality before
// finishing other shutdown things, including destroying this class, and that is all in
// Network's destructor, so we need to be able to destroy it before this class.
std::unique_ptr<oxen::quic::Network> quic;
std::shared_ptr<oxen::quic::GNUTLSCreds> tls_creds;
link::Endpoint ep;
void
recv_data_message(oxen::quic::dgram_interface& dgi, bstring dgram);
void
recv_control_message(oxen::quic::Stream& stream, bstring_view packet);
void
on_conn_open(oxen::quic::connection_interface& ci);
public:
explicit LinkManager(Router& r);
bool
send_to(const RouterID& remote, const llarp_buffer_t& buf, uint16_t priority);
send_to(const RouterID& remote, bstring data, uint16_t priority);
bool
have_connection_to(const RouterID& remote, bool client_only = false) const;
@ -156,37 +209,6 @@ namespace llarp
size_t min_connected_routers = 4;
/// hard upperbound limit on the number of router to router connections
size_t max_connected_routers = 6;
private:
friend struct link::Endpoint;
std::atomic<bool> stopping;
// DISCUSS: is this necessary? can we reduce the amount of locking and nuke this
mutable util::Mutex m; // protects persisting_conns
// sessions to persist -> timestamp to end persist at
std::unordered_map<RouterID, llarp_time_t> persisting_conns GUARDED_BY(_mutex);
util::DecayingHashSet<RouterID> clients{path::default_lifetime};
RCLookupHandler* rc_lookup;
std::shared_ptr<NodeDB> node_db;
oxen::quic::Address addr;
Router& router;
// FIXME: Lokinet currently expects to be able to kill all network functionality before
// finishing other shutdown things, including destroying this class, and that is all in
// Network's destructor, so we need to be able to destroy it before this class.
std::unique_ptr<oxen::quic::Network> quic;
std::shared_ptr<oxen::quic::GNUTLSCreds> tls_creds;
link::Endpoint ep;
void
recv_data_message(oxen::quic::dgram_interface& dgi, bstring dgram);
void
recv_control_message(oxen::quic::Stream& stream, bstring_view packet);
};
namespace link
@ -205,13 +227,13 @@ namespace llarp
auto conn_interface =
endpoint->connect(remote, link_manager.tls_creds, dgram_cb, std::forward<Opt>(opts)...);
auto control_stream = conn_interface->get_new_stream();
// TOFIX: get a real RouterID after refactoring RouterID
RouterID rid;
auto [itr, b] = conns.emplace(rid);
// emplace immediately for connection open callback to find scid
connid_map.emplace(conn_interface->scid(), rc.pubkey);
auto [itr, b] = conns.emplace(rc.pubkey);
auto control_stream = conn_interface->get_new_stream();
itr->second = std::make_shared<link::Connection>(conn_interface, rc, control_stream);
connid_map.emplace(conn_interface->scid(), rid);
return true;
}
@ -232,6 +254,7 @@ namespace llarp
- Combine llarp/link/server.hpp::ILinkLayer into llarp/link/endpoint.hpp::Endpoint
- must maintain metadata storage, callbacks, etc
- If: one endpoint for ipv4 and ipv6
- Then: can potentially combine:
- llarp/link/endpoint.hpp
@ -242,4 +265,12 @@ namespace llarp
-> Yields mega-combo endpoint managing object?
- Can avoid "kitchen sink" by greatly reducing complexity of implementation
llarp/router/outbound_message_handler.hpp
- pendingsessionmessagequeue
- establish queue of messages to be sent on a connection we are creating
- upon creation, send these messages in the connection established callback
- if connection times out, flush queue
- TOCHECK: is priority used at all??
*/

@ -1,483 +0,0 @@
#include <llarp/ev/ev.hpp>
#include <llarp/ev/udp_handle.hpp>
#include <llarp/crypto/crypto.hpp>
#include <llarp/config/key_manager.hpp>
#include <memory>
#include <llarp/util/fs.hpp>
#include <utility>
#include <unordered_set>
#include <llarp/router/router.hpp>
#include <oxenc/variant.h>
static constexpr auto LINK_LAYER_TICK_INTERVAL = 100ms;
namespace llarp
{
ILinkLayer::ILinkLayer(
std::shared_ptr<KeyManager> keyManager,
GetRCFunc getrc,
LinkMessageHandler handler,
SignBufferFunc signbuf,
BeforeConnectFunc_t before,
SessionEstablishedHandler establishedSession,
SessionRenegotiateHandler reneg,
TimeoutHandler timeout,
SessionClosedHandler closed,
PumpDoneHandler pumpDone,
WorkerFunc_t work)
: HandleMessage(std::move(handler))
, HandleTimeout(std::move(timeout))
, Sign(std::move(signbuf))
, GetOurRC(std::move(getrc))
, BeforeConnect(std::move(before))
, SessionEstablished(std::move(establishedSession))
, SessionClosed(std::move(closed))
, SessionRenegotiate(std::move(reneg))
, PumpDone(std::move(pumpDone))
, QueueWork(std::move(work))
, m_RouterEncSecret(keyManager->encryptionKey)
, m_SecretKey(keyManager->transportKey)
{}
llarp_time_t
ILinkLayer::Now() const
{
return m_Router->loop()->time_now();
}
bool
ILinkLayer::HasSessionTo(const RouterID& id)
{
Lock_t l(m_AuthedLinksMutex);
return m_AuthedLinks.find(id) != m_AuthedLinks.end();
}
std::shared_ptr<AbstractLinkSession>
ILinkLayer::FindSessionByPubkey(RouterID id)
{
Lock_t l(m_AuthedLinksMutex);
auto itr = m_AuthedLinks.find(id);
if (itr == m_AuthedLinks.end())
return nullptr;
return itr->second;
}
void
ILinkLayer::ForEachSession(
std::function<void(const AbstractLinkSession*)> visit, bool randomize) const
{
std::vector<std::shared_ptr<AbstractLinkSession>> sessions;
{
Lock_t l(m_AuthedLinksMutex);
if (m_AuthedLinks.size() == 0)
return;
const size_t sz = randint() % m_AuthedLinks.size();
auto itr = m_AuthedLinks.begin();
auto begin = itr;
if (randomize)
{
std::advance(itr, sz);
begin = itr;
}
while (itr != m_AuthedLinks.end())
{
sessions.emplace_back(itr->second);
++itr;
}
if (randomize)
{
itr = m_AuthedLinks.begin();
while (itr != begin)
{
sessions.emplace_back(itr->second);
++itr;
}
}
}
for (const auto& session : sessions)
visit(session.get());
}
bool
ILinkLayer::VisitSessionByPubkey(
const RouterID& pk, std::function<bool(AbstractLinkSession*)> visit)
{
std::shared_ptr<AbstractLinkSession> session;
{
Lock_t l(m_AuthedLinksMutex);
auto itr = m_AuthedLinks.find(pk);
if (itr == m_AuthedLinks.end())
return false;
session = itr->second;
}
return visit(session.get());
}
void
ILinkLayer::ForEachSession(std::function<void(AbstractLinkSession*)> visit)
{
std::vector<std::shared_ptr<AbstractLinkSession>> sessions;
{
Lock_t l(m_AuthedLinksMutex);
auto itr = m_AuthedLinks.begin();
while (itr != m_AuthedLinks.end())
{
sessions.emplace_back(itr->second);
++itr;
}
}
for (const auto& s : sessions)
visit(s.get());
}
void
ILinkLayer::Bind(Router* router, SockAddr bind_addr)
{
if (router->Net().IsLoopbackAddress(bind_addr.getIP()))
throw std::runtime_error{"cannot udp bind socket on loopback"};
m_ourAddr = bind_addr;
m_Router = router;
m_udp = m_Router->loop()->make_udp(
[this]([[maybe_unused]] UDPHandle& udp, const SockAddr& from, llarp_buffer_t buf) {
AbstractLinkSession::Packet_t pkt;
pkt.resize(buf.sz);
std::copy_n(buf.base, buf.sz, pkt.data());
RecvFrom(from, std::move(pkt));
});
if (m_udp->listen(m_ourAddr))
return;
throw std::runtime_error{
fmt::format("failed to listen {} udp socket on {}", Name(), m_ourAddr)};
}
void
ILinkLayer::Pump()
{
std::unordered_set<RouterID> closedSessions;
std::vector<std::shared_ptr<AbstractLinkSession>> closedPending;
auto _now = Now();
{
Lock_t l(m_AuthedLinksMutex);
auto itr = m_AuthedLinks.begin();
while (itr != m_AuthedLinks.end())
{
if (not itr->second->TimedOut(_now))
{
itr->second->Pump();
++itr;
}
else
{
llarp::LogInfo("session to ", RouterID(itr->second->GetPubKey()), " timed out");
itr->second->Close();
closedSessions.emplace(itr->first);
UnmapAddr(itr->second->GetRemoteEndpoint());
itr = m_AuthedLinks.erase(itr);
}
}
}
{
Lock_t l(m_PendingMutex);
auto itr = m_Pending.begin();
while (itr != m_Pending.end())
{
if (not itr->second->TimedOut(_now))
{
itr->second->Pump();
++itr;
}
else
{
LogInfo("pending session at ", itr->first, " timed out");
UnmapAddr(itr->second->GetRemoteEndpoint());
// defer call so we can acquire mutexes later
closedPending.emplace_back(std::move(itr->second));
itr = m_Pending.erase(itr);
}
}
}
{
Lock_t l(m_AuthedLinksMutex);
for (const auto& r : closedSessions)
{
if (m_AuthedLinks.count(r) == 0)
{
SessionClosed(r);
}
}
}
for (const auto& pending : closedPending)
{
if (pending->IsInbound())
continue;
HandleTimeout(pending.get());
}
}
void
ILinkLayer::UnmapAddr(const SockAddr& addr)
{
m_AuthedAddrs.erase(addr);
}
bool
ILinkLayer::MapAddr(const RouterID& pk, AbstractLinkSession* s)
{
Lock_t l_authed(m_AuthedLinksMutex);
Lock_t l_pending(m_PendingMutex);
const auto addr = s->GetRemoteEndpoint();
auto itr = m_Pending.find(addr);
if (itr != m_Pending.end())
{
if (m_AuthedLinks.count(pk))
{
LogWarn("too many session for ", pk);
s->Close();
return false;
}
m_AuthedAddrs.emplace(addr, pk);
m_AuthedLinks.emplace(pk, itr->second);
itr = m_Pending.erase(itr);
m_Router->TriggerPump();
return true;
}
return false;
}
util::StatusObject
ILinkLayer::ExtractStatus() const
{
std::vector<util::StatusObject> pending, established;
{
Lock_t l(m_PendingMutex);
std::transform(
m_Pending.cbegin(),
m_Pending.cend(),
std::back_inserter(pending),
[](const auto& item) -> util::StatusObject { return item.second->ExtractStatus(); });
}
{
Lock_t l(m_AuthedLinksMutex);
std::transform(
m_AuthedLinks.cbegin(),
m_AuthedLinks.cend(),
std::back_inserter(established),
[](const auto& item) -> util::StatusObject { return item.second->ExtractStatus(); });
}
return {
{"name", Name()},
{"rank", uint64_t(Rank())},
{"addr", m_ourAddr.ToString()},
{"sessions", util::StatusObject{{"pending", pending}, {"established", established}}}};
}
bool
ILinkLayer::TryEstablishTo(RouterContact rc)
{
{
Lock_t l(m_AuthedLinksMutex);
if (m_AuthedLinks.count(rc.pubkey))
{
LogWarn("Too many links to ", RouterID{rc.pubkey}, ", not establishing another one");
return false;
}
}
llarp::AddressInfo to;
if (not PickAddress(rc, to))
{
LogWarn("router ", RouterID{rc.pubkey}, " has no acceptable inbound addresses");
return false;
}
const SockAddr address{to};
{
Lock_t l(m_PendingMutex);
if (m_Pending.count(address))
{
LogWarn(
"Too many pending connections to ",
address,
" while establishing to ",
RouterID{rc.pubkey},
", not establishing another");
return false;
}
}
std::shared_ptr<AbstractLinkSession> s = NewOutboundSession(rc, to);
if (BeforeConnect)
{
BeforeConnect(std::move(rc));
}
if (not PutSession(s))
{
return false;
}
s->Start();
return true;
}
bool
ILinkLayer::Start()
{
// Tie the lifetime of this repeater to this arbitrary shared_ptr:
m_repeater_keepalive = std::make_shared<int>(0);
m_Router->loop()->call_every(
LINK_LAYER_TICK_INTERVAL, m_repeater_keepalive, [this] { Tick(Now()); });
return true;
}
void
ILinkLayer::Tick(const llarp_time_t now)
{
{
Lock_t l(m_AuthedLinksMutex);
for (const auto& [routerid, link] : m_AuthedLinks)
link->Tick(now);
}
{
Lock_t l(m_PendingMutex);
for (const auto& [addr, link] : m_Pending)
link->Tick(now);
}
{
// decay recently closed list
auto itr = m_RecentlyClosed.begin();
while (itr != m_RecentlyClosed.end())
{
if (itr->second >= now)
itr = m_RecentlyClosed.erase(itr);
else
++itr;
}
}
}
void
ILinkLayer::Stop()
{
m_repeater_keepalive.reset(); // make the repeater kill itself
{
Lock_t l(m_AuthedLinksMutex);
for (const auto& [router, link] : m_AuthedLinks)
link->Close();
}
{
Lock_t l(m_PendingMutex);
for (const auto& [addr, link] : m_Pending)
link->Close();
}
}
void
ILinkLayer::CloseSessionTo(const RouterID& remote)
{
static constexpr auto CloseGraceWindow = 500ms;
const auto now = Now();
{
Lock_t l(m_AuthedLinksMutex);
RouterID r = remote;
llarp::LogInfo("Closing all to ", r);
for (auto [itr, end] = m_AuthedLinks.equal_range(r); itr != end;)
{
itr->second->Close();
m_RecentlyClosed.emplace(itr->second->GetRemoteEndpoint(), now + CloseGraceWindow);
itr = m_AuthedLinks.erase(itr);
}
}
SessionClosed(remote);
}
void
ILinkLayer::KeepAliveSessionTo(const RouterID& remote)
{
Lock_t l(m_AuthedLinksMutex);
for (auto [itr, end] = m_AuthedLinks.equal_range(remote); itr != end; ++itr)
{
if (itr->second->ShouldPing())
{
LogDebug("keepalive to ", remote);
itr->second->SendKeepAlive();
}
}
}
void
ILinkLayer::SendTo_LL(const SockAddr& to, const llarp_buffer_t& pkt)
{
if (not m_udp->send(to, pkt))
LogError("could not send udp packet to ", to);
}
bool
ILinkLayer::SendTo(
const RouterID& remote,
const llarp_buffer_t& buf,
AbstractLinkSession::CompletionHandler completed,
uint16_t priority)
{
std::shared_ptr<AbstractLinkSession> s;
{
Lock_t l(m_AuthedLinksMutex);
// pick lowest backlog session
size_t min = std::numeric_limits<size_t>::max();
for (auto [itr, end] = m_AuthedLinks.equal_range(remote); itr != end; ++itr)
{
if (const auto backlog = itr->second->SendQueueBacklog(); backlog < min)
{
s = itr->second;
min = backlog;
}
}
}
AbstractLinkSession::Message_t pkt(buf.sz);
std::copy_n(buf.base, buf.sz, pkt.begin());
return s && s->SendMessageBuffer(std::move(pkt), completed, priority);
}
bool
ILinkLayer::GetOurAddressInfo(llarp::AddressInfo& addr) const
{
addr.fromSockAddr(m_ourAddr);
addr.dialect = Name();
addr.pubkey = TransportPubKey();
addr.rank = Rank();
return true;
}
const byte_t*
ILinkLayer::TransportPubKey() const
{
return llarp::seckey_topublic(TransportSecretKey());
}
const SecretKey&
ILinkLayer::TransportSecretKey() const
{
return m_SecretKey;
}
bool
ILinkLayer::PutSession(const std::shared_ptr<AbstractLinkSession>& s)
{
Lock_t lock(m_PendingMutex);
const auto address = s->GetRemoteEndpoint();
if (m_Pending.count(address))
return false;
m_Pending.emplace(address, s);
return true;
}
std::optional<int>
ILinkLayer::GetUDPFD() const
{
return m_udp->file_descriptor();
}
} // namespace llarp

@ -18,7 +18,7 @@ namespace llarp
{
struct LRSM_AsyncHandler : public std::enable_shared_from_this<LRSM_AsyncHandler>
{
using HopHandler_ptr = std::shared_ptr<llarp::path::IHopHandler>;
using HopHandler_ptr = std::shared_ptr<llarp::path::AbstractHopHandler>;
std::array<EncryptedFrame, 8> frames;
uint64_t status = 0;

@ -17,7 +17,7 @@ namespace llarp
namespace path
{
struct PathContext;
struct IHopHandler;
struct AbstractHopHandler;
struct TransitHop;
} // namespace path

@ -0,0 +1,36 @@
#include "abstracthophandler.hpp"
#include <llarp/router/router.hpp>
namespace llarp::path
{
// handle data in upstream direction
bool
AbstractHopHandler::HandleUpstream(const llarp_buffer_t& X, const TunnelNonce& Y, Router* r)
{
auto& pkt = m_UpstreamQueue.emplace_back();
pkt.first.resize(X.sz);
std::copy_n(X.base, X.sz, pkt.first.begin());
pkt.second = Y;
r->TriggerPump();
return true;
}
// handle data in downstream direction
bool
AbstractHopHandler::HandleDownstream(const llarp_buffer_t& X, const TunnelNonce& Y, Router* r)
{
auto& pkt = m_DownstreamQueue.emplace_back();
pkt.first.resize(X.sz);
std::copy_n(X.base, X.sz, pkt.first.begin());
pkt.second = Y;
r->TriggerPump();
return true;
}
void
AbstractHopHandler::DecayFilters(llarp_time_t now)
{
m_UpstreamReplayFilter.Decay(now);
m_DownstreamReplayFilter.Decay(now);
}
} // namespace llarp::path

@ -22,12 +22,12 @@ namespace llarp
namespace path
{
struct IHopHandler
struct AbstractHopHandler
{
using TrafficEvent_t = std::pair<std::vector<byte_t>, TunnelNonce>;
using TrafficQueue_t = std::list<TrafficEvent_t>;
virtual ~IHopHandler() = default;
virtual ~AbstractHopHandler() = default;
virtual PathID_t
RXID() const = 0;
@ -90,6 +90,6 @@ namespace llarp
HandleAllDownstream(std::vector<RelayDownstreamMessage> msgs, Router* r) = 0;
};
using HopHandler_ptr = std::shared_ptr<IHopHandler>;
using HopHandler_ptr = std::shared_ptr<AbstractHopHandler>;
} // namespace path
} // namespace llarp

@ -1,39 +0,0 @@
#include "ihophandler.hpp"
#include <llarp/router/router.hpp>
namespace llarp
{
namespace path
{
// handle data in upstream direction
bool
IHopHandler::HandleUpstream(const llarp_buffer_t& X, const TunnelNonce& Y, Router* r)
{
auto& pkt = m_UpstreamQueue.emplace_back();
pkt.first.resize(X.sz);
std::copy_n(X.base, X.sz, pkt.first.begin());
pkt.second = Y;
r->TriggerPump();
return true;
}
// handle data in downstream direction
bool
IHopHandler::HandleDownstream(const llarp_buffer_t& X, const TunnelNonce& Y, Router* r)
{
auto& pkt = m_DownstreamQueue.emplace_back();
pkt.first.resize(X.sz);
std::copy_n(X.base, X.sz, pkt.first.begin());
pkt.second = Y;
r->TriggerPump();
return true;
}
void
IHopHandler::DecayFilters(llarp_time_t now)
{
m_UpstreamReplayFilter.Decay(now);
m_DownstreamReplayFilter.Decay(now);
}
} // namespace path
} // namespace llarp

@ -70,7 +70,7 @@ namespace llarp::path
{
if (not m_UpstreamReplayFilter.Insert(Y))
return false;
return IHopHandler::HandleUpstream(X, Y, r);
return AbstractHopHandler::HandleUpstream(X, Y, r);
}
bool
@ -78,7 +78,7 @@ namespace llarp::path
{
if (not m_DownstreamReplayFilter.Insert(Y))
return false;
return IHopHandler::HandleDownstream(X, Y, r);
return AbstractHopHandler::HandleDownstream(X, Y, r);
}
RouterID
@ -332,9 +332,8 @@ namespace llarp::path
util::StatusObject
PathHopConfig::ExtractStatus() const
{
const auto ip = net::In6ToHUInt(rc.addrs[0].ip);
util::StatusObject obj{
{"ip", ip.ToString()},
{"ip", rc.addr.to_string()},
{"lifetime", to_json(lifetime)},
{"router", rc.pubkey.ToHex()},
{"txid", txID.ToHex()},

@ -4,7 +4,7 @@
#include <llarp/crypto/encrypted_frame.hpp>
#include <llarp/crypto/types.hpp>
#include <llarp/messages/relay.hpp>
#include "ihophandler.hpp"
#include "abstracthophandler.hpp"
#include "path_types.hpp"
#include "pathbuilder.hpp"
#include "pathset.hpp"
@ -69,7 +69,7 @@ namespace llarp
}
/// A path we made
struct Path final : public IHopHandler,
struct Path final : public AbstractHopHandler,
public routing::AbstractRoutingMessageHandler,
public std::enable_shared_from_this<Path>
{

@ -2,7 +2,7 @@
#include <llarp/crypto/encrypted_frame.hpp>
#include <llarp/net/ip_address.hpp>
#include "ihophandler.hpp"
#include "abstracthophandler.hpp"
#include "path_types.hpp"
#include "pathset.hpp"
#include "transit_hop.hpp"

@ -1,6 +1,7 @@
#pragma once
#include "path_types.hpp"
#include <llarp/router_contact.hpp>
#include <llarp/service/protocol_type.hpp>
#include <llarp/router_id.hpp>
#include <llarp/routing/message.hpp>

@ -27,7 +27,7 @@ namespace llarp::path
}
TransitHop::TransitHop()
: IHopHandler{}
: AbstractHopHandler{}
, m_UpstreamGather{transit_hop_queue_size}
, m_DownstreamGather{transit_hop_queue_size}
{

@ -1,7 +1,7 @@
#pragma once
#include <llarp/constants/path.hpp>
#include <llarp/path/ihophandler.hpp>
#include <llarp/path/abstracthophandler.hpp>
#include <llarp/path/path_types.hpp>
#include <llarp/routing/handler.hpp>
#include <llarp/router_id.hpp>
@ -52,7 +52,7 @@ namespace llarp
< std::tie(rhs.txID, rhs.rxID, rhs.upstream, rhs.downstream);
}
struct TransitHop : public IHopHandler,
struct TransitHop : public AbstractHopHandler,
public routing::AbstractRoutingMessageHandler,
std::enable_shared_from_this<TransitHop>
{

@ -1,8 +1,7 @@
#include "outbound_session_maker.hpp"
#include "Router.hpp"
#include "router.hpp"
#include <llarp/tooling/peer_stats_event.hpp>
#include <llarp/link/server.hpp>
#include <llarp/router_contact.hpp>
#include <llarp/nodedb.hpp>
#include "rc_lookup_handler.hpp"

@ -4,6 +4,7 @@
#include <llarp/util/time.hpp>
#include <llarp/constants/link_layer.hpp>
#include <llarp/tooling/rc_event.hpp>
#include <llarp/link/link_manager.hpp>
namespace llarp
{
@ -16,11 +17,11 @@ namespace llarp
{}
void
RCGossiper::Init(LinkManager* l, const RouterID& ourID, Router* router)
RCGossiper::Init(LinkManager* l, const RouterID& ourID, Router* r)
{
rid = ourID;
link_manager = l;
router = router;
router = r;
}
bool

@ -2,12 +2,13 @@
#include <llarp/util/decaying_hashset.hpp>
#include "outbound_message_handler.hpp"
#include <llarp/link/link_manager.hpp>
namespace llarp
{
/// The maximum number of peers we will flood a gossiped RC to when propagating an RC
constexpr size_t MaxGossipPeers = 20;
struct LinkManager;
struct RouterContact;
struct RCGossiper
{

@ -46,20 +46,18 @@ namespace llarp
static auto logcat = log::Cat("router");
Router::Router(EventLoop_ptr loop, std::shared_ptr<vpn::Platform> vpnPlatform)
: _lmq{std::make_shared<oxenmq::OxenMQ>()}
: _route_poker{std::make_shared<RoutePoker>()}
, _lmq{std::make_shared<oxenmq::OxenMQ>()}
, _loop{std::move(loop)}
, _vpn{std::move(vpnPlatform)}
, paths{this}
, _exitContext{this}
, _exit_context{this}
, _dht{dht::make_handler()}
, _disk_thread{_lmq->add_tagged_thread("disk")}
, inbound_link_msg_parser{this}
, _hiddenServiceContext{this}
, m_RoutePoker{std::make_shared<RoutePoker>()}
, m_RPCServer{nullptr}
, _randomStartDelay{
platform::is_simulation ? std::chrono::milliseconds{(llarp::randint() % 1250) + 2000}
: 0s}
, _randomStartDelay{platform::is_simulation ? std::chrono::milliseconds{(llarp::randint() % 1250) + 2000} : 0s}
, inbound_link_msg_parser{this}
, _hidden_service_context{this}
{
_key_manager = std::make_shared<KeyManager>();
// for lokid, so we don't close the connection when syncing the whitelist
@ -67,7 +65,7 @@ namespace llarp
is_stopping.store(false);
is_running.store(false);
_lastTick = llarp::time_now_ms();
m_NextExploreAt = std::chrono::steady_clock::now();
_next_explore_at = std::chrono::steady_clock::now();
loop_wakeup = _loop->make_waker([this]() { PumpLL(); });
}
@ -86,11 +84,12 @@ namespace llarp
return;
paths.PumpDownstream();
paths.PumpUpstream();
_hiddenServiceContext.Pump();
_outboundMessageHandler.Pump();
_hidden_service_context.Pump();
// _outboundMessageHandler.Pump();
llarp::LogTrace("Router::PumpLL() end");
}
// TOFIX: this
util::StatusObject
Router::ExtractStatus() const
{
@ -101,10 +100,10 @@ namespace llarp
{"running", true},
{"numNodesKnown", _node_db->NumLoaded()},
{"dht", _dht->ExtractStatus()},
{"services", _hiddenServiceContext.ExtractStatus()},
{"exit", _exitContext.ExtractStatus()},
{"services", _hidden_service_context.ExtractStatus()},
{"exit", _exit_context.ExtractStatus()},
{"links", _link_manager.extract_status()},
{"outboundMessages", _outboundMessageHandler.ExtractStatus()}};
/* {"outboundMessages", _outboundMessageHandler.ExtractStatus()} */};
}
// TODO: investigate changes needed for libquic integration
@ -114,7 +113,7 @@ namespace llarp
if (!is_running)
return util::StatusObject{{"running", false}};
auto services = _hiddenServiceContext.ExtractStatus();
auto services = _hidden_service_context.ExtractStatus();
auto link_types = _link_manager.extract_status();
@ -411,8 +410,8 @@ namespace llarp
{
llarp::sys::service_manager->starting();
config = std::move(c);
auto& conf = *config;
_config = std::move(c);
auto& conf = *_config;
// Do logging config as early as possible to get the configured log level applied
@ -429,7 +428,7 @@ namespace llarp
log::add_sink(log_type, log_type == log::Type::System ? "lokinet" : conf.logging.m_logFile);
// re-add rpc log sink if rpc enabled, else free it
if (config->api.m_enableRPCServer and llarp::logRingBuffer)
if (_config->api.m_enableRPCServer and llarp::logRingBuffer)
log::add_sink(llarp::logRingBuffer, llarp::log::DEFAULT_PATTERN_MONO);
else
llarp::logRingBuffer = nullptr;
@ -439,7 +438,7 @@ namespace llarp
follow_whitelist = conf.lokid.whitelistRouters;
if (follow_whitelist)
{
lokidRPCAddr = oxenmq::address(conf.lokid.lokidRPCAddr);
rpc_addr = oxenmq::address(conf.lokid.lokidRPCAddr);
_rpc_client = std::make_shared<rpc::LokidRpcClient>(_lmq, weak_from_this());
}
@ -461,7 +460,7 @@ namespace llarp
if (follow_whitelist)
{
_rpc_client->ConnectAsync(lokidRPCAddr);
_rpc_client->ConnectAsync(rpc_addr);
}
log::debug(logcat, "Initializing key manager");
@ -520,7 +519,7 @@ namespace llarp
{
// If we're in the white or gray list then we *should* be establishing connections to other
// routers, so if we have almost no peers then something is almost certainly wrong.
if (appear_funder() and insufficient_peers())
if (appears_funded() and insufficient_peers())
return "too few peer connections; lokinet is not adequately connected to the network";
return std::nullopt;
}
@ -544,21 +543,21 @@ namespace llarp
}
bool
Router::appear_decommed() const
Router::appears_decommed() const
{
return IsServiceNode() and follow_whitelist and _rc_lookup_handler.has_received_whitelist()
and _rc_lookup_handler.is_grey_listed(pubkey());
}
bool
Router::appear_funder() const
Router::appears_funded() const
{
return IsServiceNode() and follow_whitelist and _rc_lookup_handler.has_received_whitelist()
and _rc_lookup_handler.is_session_allowed(pubkey());
}
bool
Router::appear_registered() const
Router::appears_registered() const
{
return IsServiceNode() and follow_whitelist and _rc_lookup_handler.has_received_whitelist()
and _rc_lookup_handler.is_registered(pubkey());
@ -585,7 +584,7 @@ namespace llarp
bool
Router::PathToRouterAllowed(const RouterID& router) const
{
if (appear_decommed())
if (appears_decommed())
{
// we are decom'd don't allow any paths outbound at all
return false;
@ -728,22 +727,22 @@ namespace llarp
}
}
bootstrapRCList.clear();
bootstrap_rc_list.clear();
for (const auto& router : configRouters)
{
log::debug(logcat, "Loading bootstrap router list from {}", defaultBootstrapFile);
bootstrapRCList.AddFromFile(router);
bootstrap_rc_list.AddFromFile(router);
}
for (const auto& rc : conf.bootstrap.routers)
{
bootstrapRCList.emplace(rc);
bootstrap_rc_list.emplace(rc);
}
// in case someone has an old bootstrap file and is trying to use a bootstrap
// that no longer exists
auto clearBadRCs = [this]() {
for (auto it = bootstrapRCList.begin(); it != bootstrapRCList.end();)
for (auto it = bootstrap_rc_list.begin(); it != bootstrap_rc_list.end();)
{
if (it->IsObsoleteBootstrap())
log::warning(logcat, "ignoring obsolete boostrap RC: {}", RouterID{it->pubkey});
@ -755,22 +754,23 @@ namespace llarp
continue;
}
// we are in one of the above error cases that we warned about:
it = bootstrapRCList.erase(it);
it = bootstrap_rc_list.erase(it);
}
};
clearBadRCs();
if (bootstrapRCList.empty() and not conf.bootstrap.seednode)
if (bootstrap_rc_list.empty() and not conf.bootstrap.seednode)
{
auto fallbacks = llarp::load_bootstrap_fallbacks();
if (auto itr = fallbacks.find(router_contact.netID.ToString()); itr != fallbacks.end())
{
bootstrapRCList = itr->second;
log::debug(logcat, "loaded {} default fallback bootstrap routers", bootstrapRCList.size());
bootstrap_rc_list = itr->second;
log::debug(
logcat, "loaded {} default fallback bootstrap routers", bootstrap_rc_list.size());
clearBadRCs();
}
if (bootstrapRCList.empty() and not conf.bootstrap.seednode)
if (bootstrap_rc_list.empty() and not conf.bootstrap.seednode)
{
// empty after trying fallback, if set
log::error(
@ -785,7 +785,7 @@ namespace llarp
if (conf.bootstrap.seednode)
LogInfo("we are a seed node");
else
LogInfo("Loaded ", bootstrapRCList.size(), " bootstrap routers");
LogInfo("Loaded ", bootstrap_rc_list.size(), " bootstrap routers");
// Init components after relevant config settings loaded
_outboundMessageHandler.Init(this);
@ -796,9 +796,9 @@ namespace llarp
_loop,
util::memFn(&Router::queue_work, this),
&_link_manager,
&_hiddenServiceContext,
&_hidden_service_context,
strictConnectPubkeys,
bootstrapRCList,
bootstrap_rc_list,
follow_whitelist,
is_service_node);
@ -809,20 +809,20 @@ namespace llarp
InitOutboundLinks();
// profiling
_profilesFile = conf.router.m_dataDir / "profiles.dat";
_profile_file = conf.router.m_dataDir / "profiles.dat";
// Network config
if (conf.network.m_enableProfiling.value_or(false))
{
LogInfo("router profiling enabled");
if (not fs::exists(_profilesFile))
if (not fs::exists(_profile_file))
{
LogInfo("no profiles file at ", _profilesFile, " skipping");
LogInfo("no profiles file at ", _profile_file, " skipping");
}
else
{
LogInfo("loading router profiles from ", _profilesFile);
router_profiling().Load(_profilesFile);
LogInfo("loading router profiles from ", _profile_file);
router_profiling().Load(_profile_file);
}
}
else
@ -850,8 +850,8 @@ namespace llarp
Router::IsBootstrapNode(const RouterID r) const
{
return std::count_if(
bootstrapRCList.begin(),
bootstrapRCList.end(),
bootstrap_rc_list.begin(),
bootstrap_rc_list.end(),
[r](const RouterContact& rc) -> bool { return rc.pubkey == r; })
> 0;
}
@ -868,7 +868,7 @@ namespace llarp
{
const auto now = Now();
LogInfo(node_db()->NumLoaded(), " RCs loaded");
LogInfo(bootstrapRCList.size(), " bootstrap peers");
LogInfo(bootstrap_rc_list.size(), " bootstrap peers");
LogInfo(NumberOfConnectedRouters(), " router connections");
if (IsServiceNode())
{
@ -964,12 +964,12 @@ namespace llarp
const bool gotWhitelist = _rc_lookup_handler.has_received_whitelist();
const bool isSvcNode = IsServiceNode();
const bool decom = appear_decommed();
const bool decom = appears_decommed();
bool shouldGossip = isSvcNode and follow_whitelist and gotWhitelist
and _rc_lookup_handler.is_session_allowed(pubkey());
if (isSvcNode
and (router_contact.ExpiresSoon(now, std::chrono::milliseconds(randint() % 10000)) or (now - router_contact.last_updated) > rcRegenInterval))
and (router_contact.ExpiresSoon(now, std::chrono::milliseconds(randint() % 10000)) or (now - router_contact.last_updated) > rc_regen_interval))
{
LogInfo("regenerating RC");
if (update_rc())
@ -1068,10 +1068,10 @@ namespace llarp
const int interval = isSvcNode ? 5 : 2;
const auto timepoint_now = std::chrono::steady_clock::now();
if (timepoint_now >= m_NextExploreAt and not decom)
if (timepoint_now >= _next_explore_at and not decom)
{
_rc_lookup_handler.explore_network();
m_NextExploreAt = timepoint_now + std::chrono::seconds(interval);
_next_explore_at = timepoint_now + std::chrono::seconds(interval);
}
size_t connectToNum = _link_manager.min_connected_routers;
const auto strictConnect = _rc_lookup_handler.num_strict_connect_routers();
@ -1080,10 +1080,10 @@ namespace llarp
connectToNum = strictConnect;
}
if (isSvcNode and now >= next_decomm_warning)
if (isSvcNode and now >= _next_decomm_warning)
{
constexpr auto DecommissionWarnInterval = 5min;
if (auto registered = appear_registered(), funded = appear_funder();
if (auto registered = appears_registered(), funded = appears_funded();
not(registered and funded and not decom))
{
// complain about being deregistered/decommed/unfunded
@ -1093,7 +1093,7 @@ namespace llarp
not registered ? "deregistered"
: decom ? "decommissioned"
: "not fully staked");
next_decomm_warning = now + DecommissionWarnInterval;
_next_decomm_warning = now + DecommissionWarnInterval;
}
else if (insufficient_peers())
{
@ -1101,26 +1101,26 @@ namespace llarp
logcat,
"We appear to be an active service node, but have only {} known peers.",
node_db()->NumLoaded());
next_decomm_warning = now + DecommissionWarnInterval;
_next_decomm_warning = now + DecommissionWarnInterval;
}
}
// if we need more sessions to routers and we are not a service node kicked from the network or
// we are a client we shall connect out to others
if (connected < connectToNum and (appear_funder() or not isSvcNode))
if (connected < connectToNum and (appears_funded() or not isSvcNode))
{
size_t dlt = connectToNum - connected;
LogDebug("connecting to ", dlt, " random routers to keep alive");
_link_manager.connect_to_random(dlt);
}
_hiddenServiceContext.Tick(now);
_exitContext.Tick(now);
_hidden_service_context.Tick(now);
_exit_context.Tick(now);
// save profiles
if (router_profiling().ShouldSave(now) and config->network.m_saveProfiles)
if (router_profiling().ShouldSave(now) and _config->network.m_saveProfiles)
{
queue_disk_io([&]() { router_profiling().Save(_profilesFile); });
queue_disk_io([&]() { router_profiling().Save(_profile_file); });
}
_node_db->Tick(now);
@ -1276,7 +1276,7 @@ namespace llarp
bool
Router::StartRpcServer()
{
if (config->api.m_enableRPCServer)
if (_config->api.m_enableRPCServer)
m_RPCServer = std::make_unique<rpc::RPCServer>(_lmq, *this);
return true;
@ -1365,7 +1365,7 @@ namespace llarp
_dht->Init(llarp::dht::Key_t(pubkey()), this);
for (const auto& rc : bootstrapRCList)
for (const auto& rc : bootstrap_rc_list)
{
node_db()->Put(rc);
_dht->Nodes()->PutNode(rc);
@ -1375,9 +1375,9 @@ namespace llarp
LogInfo("have ", _node_db->NumLoaded(), " routers");
_loop->call_every(ROUTER_TICK_INTERVAL, weak_from_this(), [this] { Tick(); });
m_RoutePoker->Start(this);
_route_poker->Start(this);
is_running.store(true);
_startedAt = Now();
_started_at = Now();
if (follow_whitelist)
{
// do service node testing if we are in service node whitelist mode
@ -1470,8 +1470,8 @@ namespace llarp
Router::Uptime() const
{
const llarp_time_t _now = Now();
if (_startedAt > 0s && _now > _startedAt)
return _now - _startedAt;
if (_started_at > 0s && _now > _started_at)
return _now - _started_at;
return 0s;
}
@ -1515,7 +1515,7 @@ namespace llarp
LogWarn("stopping router hard");
llarp::sys::service_manager->stopping();
hiddenServiceContext().StopAll();
_exitContext.Stop();
_exit_context.Stop();
StopLinks();
Close();
}
@ -1544,7 +1544,7 @@ namespace llarp
hiddenServiceContext().StopAll();
llarp::sys::service_manager->stopping();
log::debug(logcat, "stopping exit context");
_exitContext.Stop();
_exit_context.Stop();
llarp::sys::service_manager->stopping();
log::debug(logcat, "final upstream pump");
paths.PumpUpstream();
@ -1587,7 +1587,7 @@ namespace llarp
LogInfo("accepting transit traffic");
paths.AllowTransit();
_dht->AllowTransit() = true;
_exitContext.AddExitEndpoint("default", config->network, config->dns);
_exit_context.AddExitEndpoint("default", _config->network, _config->dns);
return true;
}
@ -1632,40 +1632,23 @@ namespace llarp
return ep and ep->HasExit();
}
std::optional<std::variant<nuint32_t, nuint128_t>>
oxen::quic::Address
Router::public_ip() const
{
if (_ourAddress)
return _ourAddress->getIP();
std::optional<std::variant<nuint32_t, nuint128_t>> found;
/*
*TODO: change to use new LinkManager foreach semantics, or make function for this
* on LinkManager itself
*
_linkManager.ForEachInboundLink([&found](const auto& link) {
if (found)
return;
AddressInfo ai;
if (link->GetOurAddressInfo(ai))
found = ai.IP();
});
*
*
*/
return found;
return _local_addr;
}
void
Router::InitInboundLinks()
{
auto addrs = config->links.InboundListenAddrs;
auto addrs = _config->links.InboundListenAddrs;
if (is_service_node and addrs.empty())
{
LogInfo("Inferring Public Address");
auto maybe_port = config->links.PublicPort;
if (config->router.PublicPort and not maybe_port)
maybe_port = config->router.PublicPort;
auto maybe_port = _config->links.PublicPort;
if (_config->router.PublicPort and not maybe_port)
maybe_port = _config->router.PublicPort;
if (not maybe_port)
maybe_port = net::port_t::from_host(constants::DefaultInboundIWPPort);
@ -1686,7 +1669,7 @@ namespace llarp
if (net().IsWildcardAddress(bind_addr.getIP()))
{
if (auto maybe_ip = OurPublicIP())
if (auto maybe_ip = public_ip())
bind_addr.setIP(*maybe_ip);
else
throw std::runtime_error{"no public ip provided for inbound socket"};
@ -1695,7 +1678,7 @@ namespace llarp
AddressInfo ai;
ai.fromSockAddr(bind_addr);
_linkManager.connect_to({ai.IPString(), ai.port}, true);
_link_manager.connect_to({ai.IPString(), ai.port}, true);
ai.pubkey = llarp::seckey_topublic(_identity);
ai.dialect = "quicinet"; // FIXME: constant, also better name?

@ -63,7 +63,15 @@ namespace llarp
struct Router : std::enable_shared_from_this<Router>
{
explicit Router(EventLoop_ptr loop, std::shared_ptr<vpn::Platform> vpnPlatform);
~Router();
private:
std::shared_ptr<RoutePoker> _route_poker;
/// bootstrap RCs
BootstrapList bootstrap_rc_list;
std::chrono::steady_clock::time_point _next_explore_at;
llarp_time_t last_pump = 0s;
// transient iwp encryption key
fs::path transport_keyfile;
@ -89,27 +97,53 @@ namespace llarp
bool is_service_node = false;
std::optional<SockAddr> _ourAddress;
oxen::quic::Address local;
oxen::quic::Address _local_addr;
EventLoop_ptr _loop;
std::shared_ptr<vpn::Platform> _vpn;
path::PathContext paths;
exit::Context _exitContext;
exit::Context _exit_context;
SecretKey _identity;
SecretKey _encryption;
std::shared_ptr<dht::AbstractDHTMessageHandler> _dht;
std::shared_ptr<NodeDB> _node_db;
llarp_time_t _startedAt;
llarp_time_t _started_at;
const oxenmq::TaggedThreadID _disk_thread;
oxen::quic::Network _net;
llarp_time_t _last_stats_report = 0s;
llarp_time_t next_decomm_warning = time_now_ms() + 15s;
llarp_time_t _next_decomm_warning = time_now_ms() + 15s;
std::shared_ptr<llarp::KeyManager> _key_manager;
std::shared_ptr<PeerDb> _peer_db;
std::shared_ptr<Config> _config;
uint32_t _path_build_count = 0;
std::unique_ptr<rpc::RPCServer> m_RPCServer;
const llarp_time_t _randomStartDelay;
std::shared_ptr<rpc::LokidRpcClient> _rpc_client;
oxenmq::address rpc_addr;
Profiling _router_profiling;
fs::path _profile_file;
OutboundMessageHandler _outboundMessageHandler;
LinkManager _link_manager{*this};
RCLookupHandler _rc_lookup_handler;
RCGossiper _rcGossiper;
/// how often do we resign our RC? milliseconds.
// TODO: make configurable
llarp_time_t rc_regen_interval = 1h;
// should we be sending padded messages every interval?
bool send_padding = false;
LinkMessageParser inbound_link_msg_parser;
routing::InboundMessageParser inbound_routing_msg_parser;
service::Context _hidden_service_context;
consensus::reachability_testing router_testing;
bool
@ -208,7 +242,7 @@ namespace llarp
exit::Context&
exitContext()
{
return _exitContext;
return _exit_context;
}
const std::shared_ptr<KeyManager>&
@ -265,7 +299,7 @@ namespace llarp
return router_contact;
}
std::optional<std::variant<nuint32_t, nuint128_t>>
oxen::quic::Address
public_ip() const;
util::StatusObject
@ -306,18 +340,18 @@ namespace llarp
/// return true if we look like we are a decommissioned service node
bool
appear_decommed() const;
appears_decommed() const;
/// return true if we look like we are a registered, fully-staked service node (either active or
/// decommissioned). This condition determines when we are allowed to (and attempt to) connect
/// to other peers when running as a service node.
bool
appear_funded() const;
appears_funded() const;
/// return true if we a registered service node; not that this only requires a partial stake,
/// and does not imply that this service node is *active* or fully funded.
bool
appear_registered() const;
appears_registered() const;
/// return true if we look like we are allowed and able to test other routers
bool
@ -329,28 +363,16 @@ namespace llarp
bool
Sign(Signature& sig, const llarp_buffer_t& buf) const;
/// how often do we resign our RC? milliseconds.
// TODO: make configurable
llarp_time_t rcRegenInterval = 1h;
// should we be sending padded messages every interval?
bool sendPadding = false;
LinkMessageParser inbound_link_msg_parser;
routing::InboundMessageParser inbound_routing_msg_parser;
service::Context _hiddenServiceContext;
service::Context&
hiddenServiceContext()
{
return _hiddenServiceContext;
return _hidden_service_context;
}
const service::Context&
hiddenServiceContext() const
{
return _hiddenServiceContext;
return _hidden_service_context;
}
llarp_time_t _lastTick = 0s;
@ -370,51 +392,24 @@ namespace llarp
return now <= _lastTick || (now - _lastTick) <= llarp_time_t{30000};
}
/// bootstrap RCs
BootstrapList bootstrapRCList;
const std::shared_ptr<RoutePoker>&
routePoker() const
{
return m_RoutePoker;
return _route_poker;
}
std::shared_ptr<RoutePoker> m_RoutePoker;
void
TriggerPump();
void
PumpLL();
std::unique_ptr<rpc::RPCServer> m_RPCServer;
const llarp_time_t _randomStartDelay;
std::shared_ptr<rpc::LokidRpcClient> _rpc_client;
oxenmq::address lokidRPCAddr;
Profiling _router_profiling;
fs::path _profilesFile;
OutboundMessageHandler _outboundMessageHandler;
LinkManager _link_manager{*this};
RCLookupHandler _rc_lookup_handler;
RCGossiper _rcGossiper;
std::string
status_line();
using TimePoint_t = std::chrono::steady_clock::time_point;
TimePoint_t m_NextExploreAt;
void
GossipRCIfNeeded(const RouterContact rc);
explicit Router(EventLoop_ptr loop, std::shared_ptr<vpn::Platform> vpnPlatform);
~Router();
bool
HandleRecvLinkMessageBuffer(AbstractLinkSession* from, const llarp_buffer_t& msg);

@ -123,8 +123,7 @@ namespace llarp
{
std::string result;
auto out = std::back_inserter(result);
for (const auto& addr : addrs)
fmt::format_to(out, "ai_addr={}; ai_pk={}; ", addr.toIpAddress(), addr.pubkey);
fmt::format_to(out, "addr={}; pk={}", addr.to_string(), pubkey);
fmt::format_to(out, "updated={}; onion_pk={}; ", last_updated.count(), enckey.ToHex());
if (routerVersion.has_value())
fmt::format_to(out, "router_version={}; ", *routerVersion);

@ -9,5 +9,4 @@ namespace llarp::util
template <typename T, typename Container = std::vector<T>>
using ascending_priority_queue =
std::priority_queue<T, Container, std::greater<typename Container::value_type>>;
} // namespace llarp::util

Loading…
Cancel
Save