Deprecate pending_msg_que in favor of libquic internal stream buffers

pull/2232/head
dr7ana 4 months ago
parent 674edab31f
commit 95fe45eb5d

@ -246,38 +246,38 @@ namespace llarp::exit
bool
BaseSession::FlushUpstream()
{
auto now = router->now();
auto path = PickEstablishedPath(llarp::path::ePathRoleExit);
if (path)
{
// for (auto& [i, queue] : m_Upstream)
// {
// while (queue.size())
// {
// auto& msg = queue.front();
// msg.sequence_number = path->NextSeqNo();
// path->SendRoutingMessage(msg, router);
// queue.pop_front();
// }
// }
}
else
{
// if (m_Upstream.size())
// llarp::LogWarn("no path for exit session");
// // discard upstream
// for (auto& [i, queue] : m_Upstream)
// queue.clear();
// m_Upstream.clear();
if (numHops == 1)
{
if (const auto maybe = router->node_db()->get_rc(exit_router); maybe.has_value())
router->connect_to(*maybe);
}
else if (UrgentBuild(now))
BuildOneAlignedTo(exit_router);
}
// auto now = router->now();
// auto path = PickEstablishedPath(llarp::path::ePathRoleExit);
// if (path)
// {
// for (auto& [i, queue] : m_Upstream)
// {
// while (queue.size())
// {
// auto& msg = queue.front();
// msg.sequence_number = path->NextSeqNo();
// path->SendRoutingMessage(msg, router);
// queue.pop_front();
// }
// }
// }
// else
// {
// if (m_Upstream.size())
// llarp::LogWarn("no path for exit session");
// // discard upstream
// for (auto& [i, queue] : m_Upstream)
// queue.clear();
// m_Upstream.clear();
// if (numHops == 1)
// {
// if (const auto maybe = router->node_db()->get_rc(exit_router); maybe.has_value())
// router->connect_to(*maybe);
// }
// else if (UrgentBuild(now))
// BuildOneAlignedTo(exit_router);
// }
return true;
}

@ -352,36 +352,6 @@ namespace llarp
if (auto it = ep.service_conns.find(rid); it != ep.service_conns.end())
{
log::critical(logcat, "Fetched configured outbound connection to relay RID:{}", rid);
auto& conn = it->second->conn;
auto& str = it->second->control_stream;
if (auto itr = pending_conn_msg_queue.find(rid); itr != pending_conn_msg_queue.end())
{
log::critical(logcat, "Clearing pending queue for RID:{}", rid);
auto& que = itr->second;
while (not que.empty())
{
auto& msg = que.front();
if (msg.is_control)
{
log::critical(
logcat, "Dispatching {} request (stream ID: {})!", *msg.endpoint, str->stream_id());
str->command(std::move(*msg.endpoint), std::move(msg.body), std::move(msg.func));
}
else
{
log::critical(logcat, "DIspatching data message: {}", msg.body);
conn->send_datagram(std::move(msg.body));
}
que.pop_front();
}
}
log::warning(logcat, "Pending queue empty for RID:{}", rid);
}
else
{
@ -427,10 +397,6 @@ namespace llarp
[this, scid = ci.scid(), rid = RouterID{ci.remote_key()}, error_code = ec]() {
log::critical(quic_cat, "Purging quic connection CID:{} (ec:{})", scid, error_code);
// in case this didn't clear earlier, do it now
if (auto p_itr = pending_conn_msg_queue.find(rid); p_itr != pending_conn_msg_queue.end())
pending_conn_msg_queue.erase(p_itr);
if (auto s_itr = ep.service_conns.find(rid); s_itr != ep.service_conns.end())
{
log::critical(quic_cat, "Quic connection to relay RID:{} purged successfully", rid);
@ -453,7 +419,12 @@ namespace llarp
std::string body,
std::function<void(oxen::quic::message m)> func)
{
assert(func); // makes no sense to send control message and ignore response
// DISCUSS: revisit if this assert makes sense. If so, there's no need to if (func) the
// next logic block
assert(func); // makes no sense to send control message and ignore response (maybe gossip?)
if (is_stopping)
return false;
if (func)
{
@ -463,19 +434,6 @@ namespace llarp
};
}
return send_control_message_impl(remote, std::move(endpoint), std::move(body), std::move(func));
}
bool
LinkManager::send_control_message_impl(
const RouterID& remote,
std::string endpoint,
std::string body,
std::function<void(oxen::quic::message m)> func)
{
if (is_stopping)
return false;
if (auto conn = ep.get_conn(remote); conn)
{
conn->control_stream->command(std::move(endpoint), std::move(body), std::move(func));
@ -487,21 +445,7 @@ namespace llarp
endpoint = std::move(endpoint),
body = std::move(body),
f = std::move(func)]() {
auto pending = PendingMessage(std::move(body), std::move(endpoint), std::move(f));
if (auto it = pending_conn_msg_queue.find(remote); it != pending_conn_msg_queue.end())
{
it->second.push_back(std::move(pending));
log::critical(
logcat, "Connection to RID:{} is pending; message appended to send queue!", remote);
}
else
{
log::critical(logcat, "Connection to RID:{} is pending; creating send queue!", remote);
auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue());
itr->second.push_back(std::move(pending));
connect_to(remote);
}
connect_and_send(remote, std::move(endpoint), std::move(body), std::move(f));
});
return false;
@ -520,12 +464,7 @@ namespace llarp
}
_router.loop()->call([this, body = std::move(body), remote]() {
auto pending = PendingMessage(std::move(body));
auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue());
itr->second.push_back(std::move(pending));
connect_to(remote);
connect_and_send(remote, std::nullopt, std::move(body));
});
return false;
@ -550,12 +489,35 @@ namespace llarp
}
void
LinkManager::connect_to(const RouterID& rid, conn_open_hook hook)
LinkManager::connect_and_send(
const RouterID& router,
std::optional<std::string> endpoint,
std::string body,
std::function<void(oxen::quic::message m)> func)
{
if (auto rc = node_db->get_rc(rid))
connect_to(*rc, std::move(hook));
// by the time we have called this, we have already checked if we have a connection to this RID
// in ::send_control_message_impl, at which point we will dispatch on that stream
if (auto rc = node_db->get_rc(router))
{
const auto& remote_addr = rc->addr();
if (auto rv = ep.establish_and_send(
oxen::quic::RemoteAddress{router.ToView(), remote_addr},
*rc,
std::move(endpoint),
std::move(body),
std::move(func));
rv)
{
log::info(quic_cat, "Begun establishing connection to {}", remote_addr);
return;
}
log::warning(quic_cat, "Failed to begin establishing connection to {}", remote_addr);
}
else
log::warning(quic_cat, "Could not find RouterContact for connection to rid:{}", rid);
log::error(
quic_cat, "Error: Could not find RC for connection to rid:{}, message not sent!", router);
}
void
@ -573,8 +535,6 @@ namespace llarp
const auto& remote_addr = rc.addr();
// TODO: confirm remote end is using the expected pubkey (RouterID).
// TODO: ALPN for "client" vs "relay" (could just be set on endpoint creation)
if (auto rv = ep.establish_connection(
oxen::quic::RemoteAddress{rid.ToView(), remote_addr},
rc,
@ -767,35 +727,28 @@ namespace llarp
log::critical(link_cat, "Received known or old RC, not storing or forwarding.");
}
// TODO: can probably use ::send_control_message instead. Need to discuss the potential difference
// in calling Endpoint::get_service_conn vs Endpoint::get_conn
void
LinkManager::fetch_bootstrap_rcs(
const RemoteRC& source, std::string payload, std::function<void(oxen::quic::message m)> func)
{
_router.loop()->call([this, source, payload, f = std::move(func)]() mutable {
if (f)
{
f = [this, func = std::move(f)](oxen::quic::message m) mutable {
_router.loop()->call(
[f = std::move(func), msg = std::move(m)]() mutable { f(std::move(msg)); });
};
}
const auto& rid = source.router_id();
if (auto conn = ep.get_service_conn(rid); conn)
{
conn->control_stream->command("bfetch_rcs"s, std::move(payload), std::move(f));
log::critical(logcat, "Dispatched bootstrap fetch request!");
return;
}
func = [this, f = std::move(func)](oxen::quic::message m) mutable {
_router.loop()->call(
[func = std::move(f), msg = std::move(m)]() mutable { func(std::move(msg)); });
};
log::critical(logcat, "Queuing bootstrap fetch request to {}", rid);
auto pending = PendingMessage(std::move(payload), "bfetch_rcs"s, std::move(f));
const auto& rid = source.router_id();
auto [itr, b] = pending_conn_msg_queue.emplace(rid, MessageQueue());
itr->second.push_back(std::move(pending));
if (auto conn = ep.get_service_conn(rid); conn)
{
conn->control_stream->command("bfetch_rcs"s, std::move(payload), std::move(func));
log::critical(logcat, "Dispatched bootstrap fetch request!");
return;
}
connect_to(source);
_router.loop()->call([this, source, payload, f = std::move(func), rid = rid]() mutable {
connect_and_send(rid, "bfetch_rcs"s, std::move(payload), std::move(f));
});
}

@ -106,6 +106,16 @@ namespace llarp
establish_connection(
const oxen::quic::RemoteAddress& remote, const RemoteRC& rc, Opt&&... opts);
template <typename... Opt>
bool
establish_and_send(
const oxen::quic::RemoteAddress& remote,
const RemoteRC& rc,
std::optional<std::string> endpoint,
std::string body,
std::function<void(oxen::quic::message m)> func = nullptr,
Opt&&... opts);
void
for_each_connection(std::function<void(link::Connection&)> func);
@ -188,13 +198,6 @@ namespace llarp
private:
explicit LinkManager(Router& r);
bool
send_control_message_impl(
const RouterID& remote,
std::string endpoint,
std::string body,
std::function<void(oxen::quic::message)> = nullptr);
friend struct link::Endpoint;
std::atomic<bool> is_stopping;
@ -202,9 +205,6 @@ namespace llarp
// sessions to persist -> timestamp to end persist at
std::unordered_map<RouterID, llarp_time_t> persisting_conns;
// holds any messages we attempt to send while connections are establishing
std::unordered_map<RouterID, MessageQueue> pending_conn_msg_queue;
util::DecayingHashSet<RouterID> clients{path::DEFAULT_LIFETIME};
std::shared_ptr<NodeDB> node_db;
@ -225,9 +225,6 @@ namespace llarp
void
recv_data_message(oxen::quic::dgram_interface& dgi, bstring dgram);
void
recv_control_message(oxen::quic::message msg);
std::shared_ptr<oxen::quic::BTRequestStream>
make_control(oxen::quic::connection_interface& ci, const RouterID& rid);
@ -309,10 +306,14 @@ namespace llarp
test_reachability(const RouterID& rid, conn_open_hook, conn_closed_hook);
void
connect_to(const RouterID& router, conn_open_hook = nullptr);
connect_to(const RemoteRC& rc, conn_open_hook = nullptr, conn_closed_hook = nullptr);
void
connect_to(const RemoteRC& rc, conn_open_hook = nullptr, conn_closed_hook = nullptr);
connect_and_send(
const RouterID& router,
std::optional<std::string> endpoint,
std::string body,
std::function<void(oxen::quic::message m)> func = nullptr);
void
close_connection(RouterID rid);
@ -426,6 +427,71 @@ namespace llarp
namespace link
{
template <typename... Opt>
bool
Endpoint::establish_and_send(
const oxen::quic::RemoteAddress& remote,
const RemoteRC& rc,
std::optional<std::string> ep,
std::string body,
std::function<void(oxen::quic::message m)> func,
Opt&&... opts)
{
try
{
const auto& rid = rc.router_id();
const auto& is_snode = _is_service_node;
const auto& is_control = ep.has_value();
const auto us = is_snode ? "Relay"s : "Client"s;
log::critical(logcat, "Establishing connection to RID:{}", rid);
// add to service conns
auto [itr, b] = service_conns.emplace(rid, nullptr);
auto conn_interface = endpoint->connect(
remote,
link_manager.tls_creds,
is_snode ? ROUTER_KEEP_ALIVE : CLIENT_KEEP_ALIVE,
std::forward<Opt>(opts)...);
// auto
std::shared_ptr<oxen::quic::BTRequestStream> control_stream =
conn_interface->template open_stream<oxen::quic::BTRequestStream>(
[this, rid = rid](oxen::quic::Stream&, uint64_t error_code) {
log::warning(
logcat,
"BTRequestStream closed unexpectedly (ec:{}); closing outbound connection...",
error_code);
close_connection(rid);
});
if (is_snode)
link_manager.register_commands(control_stream, rid);
else
log::critical(logcat, "Client NOT registering BTStream commands!");
log::critical(
logcat,
"{} dispatching {} on outbound connection to remote (rid:{})",
us,
is_control ? "control message (ep:{})"_format(ep) : "data message",
rid);
(is_control) ? control_stream->command(std::move(*ep), std::move(body), std::move(func))
: conn_interface->send_datagram(std::move(body));
itr->second = std::make_shared<link::Connection>(conn_interface, control_stream, true);
log::critical(logcat, "Outbound connection to RID:{} added to service conns...", rid);
return true;
}
catch (...)
{
log::error(quic_cat, "Error: failed to establish connection to {}", remote);
return false;
}
}
template <typename... Opt>
bool
Endpoint::establish_connection(

@ -247,18 +247,6 @@ namespace llarp
loop_wakeup->Trigger();
}
void
Router::connect_to(const RouterID& rid)
{
_link_manager->connect_to(rid);
}
void
Router::connect_to(const RemoteRC& rc)
{
_link_manager->connect_to(rc);
}
bool
Router::send_data_message(const RouterID& remote, std::string payload)
{

@ -202,12 +202,6 @@ namespace llarp
void
for_each_connection(std::function<void(link::Connection&)> func);
void
connect_to(const RouterID& rid);
void
connect_to(const RemoteRC& rc);
const Contacts&
contacts() const
{

Loading…
Cancel
Save