Fixed pending message queue weirdness

pull/2232/head
dr7ana 6 months ago
parent a591d4424b
commit dbad0d596a

@ -5,7 +5,7 @@ namespace llarp::link
Connection::Connection( Connection::Connection(
const std::shared_ptr<oxen::quic::connection_interface>& c, const std::shared_ptr<oxen::quic::connection_interface>& c,
std::shared_ptr<oxen::quic::BTRequestStream>& s) std::shared_ptr<oxen::quic::BTRequestStream>& s)
: conn{c}, control_stream{s}/* , remote_rc{std::move(rc)} */ : conn{c}, control_stream{s} /* , remote_rc{std::move(rc)} */
{} {}
} // namespace llarp::link } // namespace llarp::link

@ -74,7 +74,7 @@ namespace llarp
std::advance(itr, randint() % size); std::advance(itr, randint() % size);
RouterID rid{itr->second->conn->remote_key()}; RouterID rid{itr->second->conn->remote_key()};
if (auto maybe = link_manager.node_db->get_rc(rid)) if (auto maybe = link_manager.node_db->get_rc(rid))
{ {
router = *maybe; router = *maybe;
@ -151,7 +151,8 @@ namespace llarp
for (auto& method : direct_requests) for (auto& method : direct_requests)
{ {
s->register_command( s->register_command(
std::string{method.first}, [this, func = std::move(method.second)](oxen::quic::message m) { std::string{method.first},
[this, func = std::move(method.second)](oxen::quic::message m) {
_router.loop()->call([this, msg = std::move(m), func = std::move(func)]() mutable { _router.loop()->call([this, msg = std::move(m), func = std::move(func)]() mutable {
auto body = msg.body_str(); auto body = msg.body_str();
auto respond = [m = std::move(msg)](std::string response) mutable { auto respond = [m = std::move(msg)](std::string response) mutable {
@ -291,7 +292,7 @@ namespace llarp
endpoint = std::move(endpoint), endpoint = std::move(endpoint),
body = std::move(body), body = std::move(body),
f = std::move(func)]() { f = std::move(func)]() {
auto pending = PendingControlMessage(std::move(body), std::move(endpoint), f); auto pending = PendingMessage(std::move(body), std::move(endpoint), std::move(f));
auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue()); auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue());
itr->second.push_back(std::move(pending)); itr->second.push_back(std::move(pending));
@ -315,7 +316,7 @@ namespace llarp
} }
_router.loop()->call([this, body = std::move(body), remote]() { _router.loop()->call([this, body = std::move(body), remote]() {
auto pending = PendingDataMessage(body); auto pending = PendingMessage(std::move(body));
auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue()); auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue());
itr->second.push_back(std::move(pending)); itr->second.push_back(std::move(pending));
@ -434,17 +435,16 @@ namespace llarp
while (not que.empty()) while (not que.empty())
{ {
auto& m = que.front(); auto& msg = que.front();
if (m.is_control) if (msg.is_control)
{ {
auto& msg = reinterpret_cast<PendingControlMessage&>(m); log::critical(logcat, "Dispatching {} request!", *msg.endpoint);
log::critical(logcat, "Dispatching {} request!", msg.endpoint); ep.conns[rid]->control_stream->command(
ep.conns[rid]->control_stream->command(msg.endpoint, msg.body, msg.func); std::move(*msg.endpoint), std::move(msg.body), std::move(msg.func));
} }
else else
{ {
auto& msg = reinterpret_cast<PendingDataMessage&>(m);
conn_interface.send_datagram(std::move(msg.body)); conn_interface.send_datagram(std::move(msg.body));
} }
@ -647,7 +647,7 @@ namespace llarp
} }
log::critical(logcat, "Queuing bootstrap fetch request to {}", source.router_id()); log::critical(logcat, "Queuing bootstrap fetch request to {}", source.router_id());
auto pending = PendingControlMessage(std::move(payload), "bfetch_rcs"s, std::move(f)); auto pending = PendingMessage(std::move(payload), "bfetch_rcs"s, std::move(f));
auto [itr, b] = pending_conn_msg_queue.emplace(source.router_id(), MessageQueue()); auto [itr, b] = pending_conn_msg_queue.emplace(source.router_id(), MessageQueue());
itr->second.push_back(std::move(pending)); itr->second.push_back(std::move(pending));
@ -688,12 +688,16 @@ namespace llarp
// TODO: if we are not the seed, how do we check the requester // TODO: if we are not the seed, how do we check the requester
if (is_seed) if (is_seed)
{ {
// we already insert the // we already insert the
auto& seeds = node_db->seeds(); auto& seeds = node_db->seeds();
if (auto itr = seeds.find(rid); itr != seeds.end()) if (auto itr = seeds.find(rid); itr != seeds.end())
{ {
log::critical(logcat, "Bootstrap seed confirmed RID:{} is white-listed seeds; approving fetch request and saving RC!", rid); log::critical(
logcat,
"Bootstrap seed confirmed RID:{} is white-listed seeds; approving fetch request and "
"saving RC!",
rid);
node_db->put_rc(remote); node_db->put_rc(remote);
} }
} }

@ -109,27 +109,18 @@ namespace llarp
struct PendingMessage struct PendingMessage
{ {
std::string body; std::string body;
RouterID rid; std::optional<std::string> endpoint = std::nullopt;
bool is_control{false}; std::function<void(oxen::quic::message)> func = nullptr;
PendingMessage(std::string b, bool control = false) : body{std::move(b)}, is_control{control} RouterID rid;
{} bool is_control = false;
};
struct PendingDataMessage : PendingMessage PendingMessage(std::string b) : body{std::move(b)}
{
PendingDataMessage(std::string b) : PendingMessage(b)
{} {}
};
struct PendingControlMessage : PendingMessage
{
std::string endpoint;
std::function<void(oxen::quic::message)> func;
PendingControlMessage( PendingMessage(
std::string b, std::string e, std::function<void(oxen::quic::message)> f = nullptr) std::string b, std::string ep, std::function<void(oxen::quic::message)> f = nullptr)
: PendingMessage(b, true), endpoint{std::move(e)}, func{std::move(f)} : body{std::move(b)}, endpoint{std::move(ep)}, func{std::move(f)}, is_control{true}
{} {}
}; };

@ -668,13 +668,14 @@ namespace llarp
_needs_rebootstrap = false; _needs_rebootstrap = false;
++bootstrap_attempts; ++bootstrap_attempts;
log::critical( log::critical(logcat, "Dispatching BootstrapRC fetch request to {}", fetch_source);
logcat, "Dispatching BootstrapRC fetch request to {}", _bootstraps.current().view());
_router.link_manager().fetch_bootstrap_rcs( _router.link_manager().fetch_bootstrap_rcs(
rc, rc,
BootstrapFetchMessage::serialize(_router.router_contact, BOOTSTRAP_SOURCE_COUNT), BootstrapFetchMessage::serialize(_router.router_contact, BOOTSTRAP_SOURCE_COUNT),
[this](oxen::quic::message m) mutable { [this](oxen::quic::message m) mutable {
log::critical(logcat, "Received response to BootstrapRC fetch request...");
if (not m) if (not m)
{ {
// ++bootstrap_attempts; // ++bootstrap_attempts;

@ -218,9 +218,8 @@ namespace llarp
std::unordered_set<RouterID> peer_pubkeys; std::unordered_set<RouterID> peer_pubkeys;
for_each_connection([&peer_pubkeys](link::Connection& conn) { for_each_connection(
peer_pubkeys.emplace(conn.conn->remote_key()); [&peer_pubkeys](link::Connection& conn) { peer_pubkeys.emplace(conn.conn->remote_key()); });
});
loop()->call([this, &peer_pubkeys]() { loop()->call([this, &peer_pubkeys]() {
for (auto& pk : peer_pubkeys) for (auto& pk : peer_pubkeys)

Loading…
Cancel
Save