libquic bparser merged

- bumped version to latest main branch commit
- wired up callbacks to set RPC request stream on creation
- methods for I/O of control and data messages through link_manager
pull/2204/head
dr7ana 9 months ago
parent a3e6cec7e7
commit d0c3837384

@ -1 +1 @@
Subproject commit 009d61e251cf4cfaa70f224b13c4bb7e905d734b
Subproject commit ba1958311a4ff672af6d7d947c46c13ac177a711

@ -4,7 +4,7 @@ namespace llarp::link
{
Connection::Connection(
std::shared_ptr<oxen::quic::connection_interface>& c,
std::shared_ptr<oxen::quic::Stream>& s,
std::shared_ptr<oxen::quic::BTRequestStream>& s,
RouterContact& rc)
: conn{c}, control_stream{s}, remote_rc{std::move(rc)}
{}

@ -3,14 +3,14 @@
#include <llarp/router_id.hpp>
#include <llarp/router_contact.hpp>
#include <external/oxen-libquic/include/quic.hpp>
#include <quic.hpp>
namespace llarp::link
{
struct Connection
{
std::shared_ptr<oxen::quic::connection_interface> conn;
std::shared_ptr<oxen::quic::Stream> control_stream;
std::shared_ptr<oxen::quic::BTRequestStream> control_stream;
RouterContact remote_rc;
// one side of a connection will be responsible for some things, e.g. heartbeat
@ -19,7 +19,12 @@ namespace llarp::link
Connection(
std::shared_ptr<oxen::quic::connection_interface>& c,
std::shared_ptr<oxen::quic::Stream>& s,
std::shared_ptr<oxen::quic::BTRequestStream>& s,
RouterContact& rc);
};
} // namespace llarp::link
/*
TODO:
- make control_stream a weak pointer?
*/

@ -21,6 +21,15 @@ namespace llarp
return nullptr;
}
std::shared_ptr<link::Connection>
Endpoint::get_conn(const RouterID& rid) const
{
if (auto itr = conns.find(rid); itr != conns.end())
return itr->second;
return nullptr;
}
bool
Endpoint::have_conn(const RouterID& remote, bool client_only) const
{
@ -118,35 +127,64 @@ namespace llarp
return ep.for_each_connection(func);
}
// TODO: pass connection open callback to endpoint constructor!
std::shared_ptr<oxen::quic::Endpoint>
LinkManager::startup_endpoint()
{
/** Parameters:
- local bind address
- conection open callback
- connection close callback
- stream constructor callback
- will return a BTRequestStream on the first call to get_new_stream<BTRequestStream>
*/
return quic->endpoint(
router.public_ip(),
[this](oxen::quic::connection_interface& ci) { return on_conn_open(ci); },
[this](oxen::quic::connection_interface& ci, uint64_t ec) {
return on_conn_closed(ci, ec);
},
[this](oxen::quic::dgram_interface& di, bstring dgram) { recv_data_message(di, dgram); },
[&](oxen::quic::Connection& c,
oxen::quic::Endpoint& e,
std::optional<int64_t> id) -> std::shared_ptr<oxen::quic::Stream> {
if (id && id == 0)
{
return std::make_shared<oxen::quic::BTRequestStream>(
c, e, [this](oxen::quic::message msg) { return recv_control_message(msg); });
}
return std::make_shared<oxen::quic::Stream>(c, e);
});
}
LinkManager::LinkManager(Router& r)
: router{r}
, quic{std::make_unique<oxen::quic::Network>()}
, tls_creds{oxen::quic::GNUTLSCreds::make_from_ed_keys(
{reinterpret_cast<const char*>(router.identity().data()), size_t{32}},
{reinterpret_cast<const char*>(router.identity().toPublic().data()), size_t{32}})}
, ep{quic->endpoint(
router.public_ip(),
[this](oxen::quic::connection_interface& ci) { return on_conn_open(ci); },
[this](oxen::quic::connection_interface& ci, uint64_t ec) {
return on_conn_closed(ci, ec);
}),
*this}
, ep{startup_endpoint(), *this}
{}
// TODO: replace with control/data message sending with libquic
bool
LinkManager::send_to(const RouterID& remote, bstring data, uint16_t priority)
LinkManager::send_control_message(
const RouterID& remote, std::string endpoint, std::string body, bool is_request)
{
if (is_stopping)
return false;
if (not have_connection_to(remote))
if (auto conn = ep.get_conn(remote); conn)
{
auto pending = PendingMessage(data, priority);
(is_request) ? conn->control_stream->request(endpoint, body)
: conn->control_stream->command(endpoint, body);
return true;
}
router.loop()->call([&]() {
auto pending = PendingControlMessage(body, endpoint);
auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue());
itr->second.push(std::move(pending));
itr->second.push_back(std::move(pending));
rc_lookup->get_rc(
remote,
@ -159,14 +197,41 @@ namespace llarp
else
log::warning(quic_cat, "Do something intelligent here for error handling");
});
});
return false;
}
// TODO: some error callback to report message send failure
// or, should we connect and pass a send-msg callback as the connection successful cb?
bool
LinkManager::send_data_message(const RouterID& remote, std::string body)
{
if (is_stopping)
return false;
if (auto conn = ep.get_conn(remote); conn)
{
conn->conn->send_datagram(std::move(body));
return true;
}
// TODO: send the message
// TODO: if we keep bool return type, change this accordingly
router.loop()->call([&]() {
auto pending = PendingDataMessage(body);
auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue());
itr->second.push_back(std::move(pending));
rc_lookup->get_rc(
remote,
[this](
[[maybe_unused]] const RouterID& rid,
const RouterContact* const rc,
const RCRequestResult res) {
if (res == RCRequestResult::Success)
connect_to(*rc);
else
log::warning(quic_cat, "Do something intelligent here for error handling");
});
});
return false;
}
@ -199,28 +264,18 @@ namespace llarp
void
LinkManager::connect_to(RouterContact rc)
{
if (have_connection_to(rc.pubkey))
if (auto conn = ep.get_conn(rc.pubkey); conn)
{
// TODO: connection failed callback
// TODO: should implement some connection failed logic, but not the same logic that
// would be executed for another failure case
return;
}
// TODO: connection established/failed callbacks
oxen::quic::stream_data_callback stream_cb =
[this](oxen::quic::Stream& stream, bstring_view packet) {
recv_control_message(stream, packet);
};
// TODO: once "compatible link" cares about address, actually choose addr to connect to
// based on which one is compatible with the link we chose. For now, just use
// the first one.
auto& remote_addr = rc.addr;
// TODO: confirm remote end is using the expected pubkey (RouterID).
// TODO: ALPN for "client" vs "relay" (could just be set on endpoint creation)
// TODO: does connect() inherit the endpoint's datagram data callback, and do we want it to if
// so?
if (auto rv = ep.establish_connection(remote_addr, rc, stream_cb, tls_creds); rv)
if (auto rv = ep.establish_connection(remote_addr, rc, tls_creds); rv)
{
log::info(quic_cat, "Connection to {} successfully established!", remote_addr);
return;
@ -236,18 +291,29 @@ namespace llarp
const auto& scid = conn_interface.scid();
const auto& rid = ep.connid_map[scid];
// check to see if this connection was established while we were attempting to queue
// messages to the remote
if (auto itr = pending_conn_msg_queue.find(rid); itr != pending_conn_msg_queue.end())
{
auto& que = itr->second;
while (not que.empty())
{
auto& m = que.top();
(m.is_control) ? ep.conns[rid]->control_stream->send(std::move(m.buf))
: conn_interface.send_datagram(std::move(m.buf));
auto& m = que.front();
if (m.is_control)
{
auto& msg = reinterpret_cast<PendingControlMessage&>(m);
msg.is_request ? ep.conns[rid]->control_stream->request(msg.endpoint, msg.body)
: ep.conns[rid]->control_stream->command(msg.endpoint, msg.body);
}
else
{
auto& msg = reinterpret_cast<PendingDataMessage&>(m);
conn_interface.send_datagram(std::move(msg.body));
}
que.pop();
que.pop_front();
}
}
});
@ -410,9 +476,44 @@ namespace llarp
}
void
LinkManager::recv_control_message(oxen::quic::Stream&, bstring_view)
LinkManager::recv_control_message(oxen::quic::message msg)
{
// TODO: this
// if the message is not expired, it will pass this conditional
if (msg)
{
std::string ep{msg.endpoint()}, body{msg.body()};
bool is_request = (msg.type() == "Q"sv) ? true : false;
if (auto itr = rpc_map.find(ep); itr != rpc_map.end())
{
router.loop()->call([&]() {
// execute mapped callback
auto maybe_response = itr->second(body);
if (is_request)
{
if (maybe_response)
{
// respond here
msg.respond(msg.rid(), *maybe_response);
}
// TODO: revisit the logic of these conditionals after defining the callback functions
// to see if returning/taking optionals makes sense
}
});
}
else
{
msg.respond(msg.rid(), "INVALID REQUEST", true);
return;
}
}
else
{
// RPC request was sent out but we received no response
log::info(link_cat, "RPC request (RID: {}) timed out", msg.rid());
}
}
} // namespace llarp

@ -8,7 +8,6 @@
#include <llarp/crypto/crypto.hpp>
#include <llarp/util/compare_ptr.hpp>
#include <external/oxen-libquic/include/quic.hpp>
#include <quic.hpp>
#include <unordered_map>
@ -45,8 +44,11 @@ namespace llarp
std::unordered_map<RouterID, std::shared_ptr<link::Connection>> conns;
std::unordered_map<oxen::quic::ConnectionID, RouterID> connid_map;
// TODO: see which of these is actually useful and delete the other
std::shared_ptr<link::Connection>
get_conn(const RouterContact&) const;
std::shared_ptr<link::Connection>
get_conn(const RouterID&) const;
bool
have_conn(const RouterID& remote, bool client_only) const;
@ -103,24 +105,62 @@ namespace llarp
struct PendingMessage
{
bstring buf;
uint16_t priority;
std::string body;
RouterID rid;
bool is_control{false};
PendingMessage(bstring b, uint16_t p, bool c = false)
: buf{std::move(b)}, priority{p}, is_control{c}
PendingMessage(std::string b, bool control = false) : body{std::move(b)}, is_control{control}
{}
};
struct PendingDataMessage : PendingMessage
{
PendingDataMessage(std::string b) : PendingMessage(b)
{}
};
struct PendingControlMessage : PendingMessage
{
std::string endpoint;
bool is_request{false}; // true if request, false if command
PendingControlMessage(std::string b, std::string e, bool request = true)
: PendingMessage(b, true), endpoint{std::move(e)}, is_request{request}
{}
};
using MessageQueue = util::ascending_priority_queue<PendingMessage>;
using MessageQueue = std::deque<PendingMessage>;
struct Router;
struct LinkManager
{
public:
explicit LinkManager(Router& r);
// set is_request to true for RPC requests, false for RPC commands
bool
send_control_message(
const RouterID& remote, std::string endpoint, std::string body, bool is_request = true);
bool
send_data_message(const RouterID& remote, std::string data);
private:
friend struct link::Endpoint;
const std::unordered_map<
std::string,
std::function<std::optional<std::string>(std::optional<std::string>)>>
rpc_map{
/** TODO:
key: RPC endpoint name
value: function that takes command body as parameter
returns: commands will return std::nullopt while requests will return a response
*/
};
std::atomic<bool> is_stopping;
// DISCUSS: is this necessary? can we reduce the amount of locking and nuke this
mutable util::Mutex m; // protects persisting_conns
@ -149,8 +189,9 @@ namespace llarp
void
recv_data_message(oxen::quic::dgram_interface& dgi, bstring dgram);
void
recv_control_message(oxen::quic::Stream& stream, bstring_view packet);
recv_control_message(oxen::quic::message msg);
void
on_conn_open(oxen::quic::connection_interface& ci);
@ -158,9 +199,10 @@ namespace llarp
void
on_conn_closed(oxen::quic::connection_interface& ci, uint64_t ec);
public:
explicit LinkManager(Router& r);
std::shared_ptr<oxen::quic::Endpoint>
startup_endpoint();
public:
const link::Endpoint&
endpoint()
{
@ -173,9 +215,6 @@ namespace llarp
return addr;
}
bool
send_to(const RouterID& remote, bstring data, uint16_t priority);
bool
have_connection_to(const RouterID& remote, bool client_only = false) const;
@ -248,19 +287,15 @@ namespace llarp
{
try
{
oxen::quic::dgram_data_callback dgram_cb =
[this](oxen::quic::dgram_interface& dgi, bstring dgram) {
link_manager.recv_data_message(dgi, dgram);
};
auto conn_interface =
endpoint->connect(remote, link_manager.tls_creds, dgram_cb, std::forward<Opt>(opts)...);
endpoint->connect(remote, link_manager.tls_creds, std::forward<Opt>(opts)...);
// emplace immediately for connection open callback to find scid
connid_map.emplace(conn_interface->scid(), rc.pubkey);
auto [itr, b] = conns.emplace(rc.pubkey);
auto control_stream = conn_interface->get_new_stream();
auto control_stream =
conn_interface->template get_new_stream<oxen::quic::BTRequestStream>();
itr->second = std::make_shared<link::Connection>(conn_interface, rc, control_stream);
return true;
@ -293,7 +328,6 @@ namespace llarp
-> Yields mega-combo endpoint managing object?
- Can avoid "kitchen sink" by greatly reducing complexity of implementation
llarp/router/outbound_message_handler.hpp
- pendingsessionmessagequeue
- establish queue of messages to be sent on a connection we are creating

@ -95,7 +95,7 @@ namespace llarp
{
if (!verify())
return false;
return session->GotLIM(this);
return conn->GotLIM(this);
}
void

@ -54,7 +54,7 @@ namespace llarp
bool
RelayUpstreamMessage::handle_message(Router* r) const
{
auto path = r->path_context().GetByDownstream(session->GetPubKey(), pathid);
auto path = r->path_context().GetByDownstream(conn->remote_rc.pubkey, pathid);
if (path)
{
return path->HandleUpstream(llarp_buffer_t(enc), nonce, r);
@ -110,7 +110,7 @@ namespace llarp
bool
RelayDownstreamMessage::handle_message(Router* r) const
{
auto path = r->path_context().GetByUpstream(session->GetPubKey(), pathid);
auto path = r->path_context().GetByUpstream(conn->remote_rc.pubkey, pathid);
if (path)
{
return path->HandleDownstream(llarp_buffer_t(enc), nonce, r);

@ -201,11 +201,10 @@ namespace llarp
, context(ctx)
, hop(std::make_shared<Hop>())
, fromAddr(
commit->session->GetRemoteRC().IsPublicRouter()
? std::optional<IpAddress>{}
: commit->session->GetRemoteEndpoint())
commit->conn->remote_rc.IsPublicRouter() ? std::optional<oxen::quic::Address>{}
: commit->conn->remote_rc.addr)
{
hop->info.downstream = commit->session->GetPubKey();
hop->info.downstream = commit->conn->remote_rc.pubkey;
}
~LRCMFrameDecrypt() = default;
@ -434,8 +433,8 @@ namespace llarp
// TODO: check if we really want to accept it
self->hop->started = now;
self->context->router()->NotifyRouterEvent<tooling::PathRequestReceivedEvent>(
self->context->router()->pubkey(), self->hop);
// self->context->router()->NotifyRouterEvent<tooling::PathRequestReceivedEvent>(
// self->context->router()->pubkey(), self->hop);
size_t sz = self->frames[0].size();
// shift
@ -487,7 +486,7 @@ namespace llarp
// decrypt frames async
frameDecrypt->decrypter->AsyncDecrypt(
frameDecrypt->frames[0], frameDecrypt, [r = context->router()](auto func) {
r->QueueWork(std::move(func));
r->loop()->call([&]() { func(); });
});
return true;
}

@ -4,6 +4,7 @@
#include <llarp/crypto/types.hpp>
#include <llarp/messages/link_message.hpp>
#include <llarp/path/path_types.hpp>
#include <llarp/router_contact.hpp>
#include <llarp/pow.hpp>
#include <array>

@ -2,7 +2,6 @@
#include <llarp/crypto/crypto.hpp>
#include <llarp/path/path_context.hpp>
#include <llarp/path/ihophandler.hpp>
#include <llarp/router/router.hpp>
#include <llarp/routing/path_confirm_message.hpp>
#include <llarp/util/bencode.hpp>
@ -129,14 +128,14 @@ namespace llarp
bool
LR_StatusMessage::handle_message(Router* router) const
{
llarp::LogDebug("Received LR_Status message from (", session->GetPubKey(), ")");
llarp::LogDebug("Received LR_Status message from (", conn->remote_rc.pubkey, ")");
if (frames.size() != path::max_len)
{
llarp::LogError("LRSM invalid number of records, ", frames.size(), "!=", path::max_len);
return false;
}
auto path = router->path_context().GetByUpstream(session->GetPubKey(), pathid);
auto path = router->path_context().GetByUpstream(conn->remote_rc.pubkey, pathid);
if (not path)
{
llarp::LogWarn("unhandled LR_Status message: no associated path found pathid=", pathid);

@ -1,6 +1,5 @@
#include "sock_addr.hpp"
#include "ip_range.hpp"
#include "address_info.hpp"
#include "ip.hpp"
#include "net_bits.hpp"
#include "net.hpp"

@ -10,7 +10,7 @@ namespace llarp::path
static constexpr auto DefaultPathBuildLimit = 500ms;
PathContext::PathContext(Router* router)
: router(router), m_AllowTransit(false), m_PathLimits(DefaultPathBuildLimit)
: _router(router), m_AllowTransit(false), m_PathLimits(DefaultPathBuildLimit)
{}
void
@ -43,19 +43,19 @@ namespace llarp::path
const EventLoop_ptr&
PathContext::loop()
{
return router->loop();
return _router->loop();
}
const SecretKey&
PathContext::EncryptionSecretKey()
{
return router->encryption();
return _router->encryption();
}
bool
PathContext::HopIsUs(const RouterID& k) const
{
return std::equal(router->pubkey(), router->pubkey() + PUBKEYSIZE, k.begin());
return std::equal(_router->pubkey(), _router->pubkey() + PUBKEYSIZE, k.begin());
}
PathContext::EndpointPathPtrSet
@ -85,7 +85,7 @@ namespace llarp::path
LogDebug("forwarding LRCM to ", nextHop);
return router->SendToOrQueue(nextHop, msg, handler);
return _router->SendToOrQueue(nextHop, msg, handler);
}
template <
@ -273,7 +273,7 @@ namespace llarp::path
const byte_t*
PathContext::OurRouterID() const
{
return router->pubkey();
return _router->pubkey();
}
TransitHop_ptr
@ -296,15 +296,15 @@ namespace llarp::path
void
PathContext::PumpUpstream()
{
m_TransitPaths.ForEach([&](auto& ptr) { ptr->FlushUpstream(router); });
m_OurPaths.ForEach([&](auto& ptr) { ptr->FlushUpstream(router); });
m_TransitPaths.ForEach([&](auto& ptr) { ptr->FlushUpstream(_router); });
m_OurPaths.ForEach([&](auto& ptr) { ptr->FlushUpstream(_router); });
}
void
PathContext::PumpDownstream()
{
m_TransitPaths.ForEach([&](auto& ptr) { ptr->FlushDownstream(router); });
m_OurPaths.ForEach([&](auto& ptr) { ptr->FlushDownstream(router); });
m_TransitPaths.ForEach([&](auto& ptr) { ptr->FlushDownstream(_router); });
m_OurPaths.ForEach([&](auto& ptr) { ptr->FlushDownstream(_router); });
}
uint64_t
@ -350,7 +350,7 @@ namespace llarp::path
{
if (itr->second->Expired(now))
{
router->outboundMessageHandler().RemovePath(itr->first);
_router->outboundMessageHandler().RemovePath(itr->first);
itr = map.erase(itr);
}
else

@ -174,8 +174,14 @@ namespace llarp
uint64_t
CurrentOwnedPaths(path::PathStatus status = path::PathStatus::ePathEstablished);
Router*
router() const
{
return _router;
}
private:
Router* router;
Router* _router;
SyncTransitMap_t m_TransitPaths;
SyncOwnedPathsMap_t m_OurPaths;
bool m_AllowTransit;

@ -84,7 +84,6 @@ namespace llarp
paths.PumpDownstream();
paths.PumpUpstream();
_hidden_service_context.Pump();
// _outboundMessageHandler.Pump();
llarp::LogTrace("Router::PumpLL() end");
}
@ -203,22 +202,6 @@ namespace llarp
return stats;
}
// TODO: libquic change
bool
Router::recv_link_message_buffer(std::shared_ptr<link::Connection> conn, bstring_view buf)
{
if (is_stopping)
return true;
if (!conn)
{
log::warning(quic_cat, "No connection to pass link message buffer to!");
return false;
}
return inbound_link_msg_parser.ProcessFrom(session, buf);
}
void
Router::Freeze()
{
@ -509,9 +492,7 @@ namespace llarp
bool
Router::ParseRoutingMessageBuffer(
const llarp_buffer_t& buf, routing::AbstractRoutingMessageHandler* h, const PathID_t& rxid)
{
return inbound_routing_msg_parser.ParseMessageBuffer(buf, h, rxid, this);
}
{}
bool
Router::appears_decommed() const

@ -210,7 +210,11 @@ namespace llarp
return _dht;
}
// TOFIX: THIS
/** TOFIX: this
- refactor path types (path_context, pathset) to use unified ID type, not PathID_t
- refactor all callers to use new implementation of remove_path
*/
OutboundMessageHandler&
outboundMessageHandler()
{
@ -412,10 +416,6 @@ namespace llarp
void
GossipRCIfNeeded(const RouterContact rc);
// TODO: this is not used anywhere?
bool
recv_link_message_buffer(std::shared_ptr<link::Connection> conn, bstring_view msg);
void
InitInboundLinks();

Loading…
Cancel
Save