- pending conns container stops them from being counted towards active conns in the interim
- un-abstracted pendingmessages vs pendingdatamessages vs pendingcontrolmessages (gross)
- fixed bootstrap fetching and storage!
pull/2232/head
dr7ana 6 months ago
parent dbad0d596a
commit 9f0766f203

@ -1 +1 @@
Subproject commit 921a85852a12510d374e1042d983e98f40f07689 Subproject commit 357548ba0b6f3d99126148c27091afebac77aae9

@ -24,7 +24,7 @@ namespace llarp
std::shared_ptr<link::Connection> std::shared_ptr<link::Connection>
Endpoint::get_conn(const RemoteRC& rc) const Endpoint::get_conn(const RemoteRC& rc) const
{ {
if (auto itr = conns.find(rc.router_id()); itr != conns.end()) if (auto itr = active_conns.find(rc.router_id()); itr != active_conns.end())
return itr->second; return itr->second;
return nullptr; return nullptr;
@ -33,7 +33,7 @@ namespace llarp
std::shared_ptr<link::Connection> std::shared_ptr<link::Connection>
Endpoint::get_conn(const RouterID& rid) const Endpoint::get_conn(const RouterID& rid) const
{ {
if (auto itr = conns.find(rid); itr != conns.end()) if (auto itr = active_conns.find(rid); itr != active_conns.end())
return itr->second; return itr->second;
return nullptr; return nullptr;
@ -42,7 +42,7 @@ namespace llarp
bool bool
Endpoint::have_conn(const RouterID& remote, bool client_only) const Endpoint::have_conn(const RouterID& remote, bool client_only) const
{ {
if (auto itr = conns.find(remote); itr != conns.end()) if (auto itr = active_conns.find(remote); itr != active_conns.end())
{ {
if (not(itr->second->remote_is_relay and client_only)) if (not(itr->second->remote_is_relay and client_only))
return true; return true;
@ -56,7 +56,7 @@ namespace llarp
{ {
size_t count = 0; size_t count = 0;
for (const auto& c : conns) for (const auto& c : active_conns)
{ {
if (not(c.second->remote_is_relay and clients_only)) if (not(c.second->remote_is_relay and clients_only))
count += 1; count += 1;
@ -68,9 +68,9 @@ namespace llarp
bool bool
Endpoint::get_random_connection(RemoteRC& router) const Endpoint::get_random_connection(RemoteRC& router) const
{ {
if (const auto size = conns.size(); size) if (const auto size = active_conns.size(); size)
{ {
auto itr = conns.begin(); auto itr = active_conns.begin();
std::advance(itr, randint() % size); std::advance(itr, randint() % size);
RouterID rid{itr->second->conn->remote_key()}; RouterID rid{itr->second->conn->remote_key()};
@ -91,7 +91,7 @@ namespace llarp
void void
Endpoint::for_each_connection(std::function<void(link::Connection&)> func) Endpoint::for_each_connection(std::function<void(link::Connection&)> func)
{ {
for (const auto& [rid, conn] : conns) for (const auto& [rid, conn] : active_conns)
func(*conn); func(*conn);
} }
@ -99,14 +99,14 @@ namespace llarp
Endpoint::close_connection(RouterID _rid) Endpoint::close_connection(RouterID _rid)
{ {
assert(link_manager._router.loop()->inEventLoop()); assert(link_manager._router.loop()->inEventLoop());
auto itr = conns.find(_rid); auto itr = active_conns.find(_rid);
if (itr != conns.end()) if (itr != active_conns.end())
return; return;
auto& conn = *itr->second->conn; auto& conn = *itr->second->conn;
conn.close_connection(); conn.close_connection();
connid_map.erase(conn.scid()); connid_map.erase(conn.scid());
conns.erase(itr); active_conns.erase(itr);
} }
} // namespace link } // namespace link
@ -125,8 +125,22 @@ namespace llarp
void void
LinkManager::register_commands(std::shared_ptr<oxen::quic::BTRequestStream>& s) LinkManager::register_commands(std::shared_ptr<oxen::quic::BTRequestStream>& s)
{ {
assert(ep.connid_map.count(s->conn_id())); log::critical(logcat, "{} called", __PRETTY_FUNCTION__);
const RouterID& router_id = ep.connid_map[s->conn_id()]; const RouterID& router_id {s->conn.remote_key()};
log::critical(logcat, "Registering commands (RID:{})", router_id);
s->register_command("bfetch_rcs"s, [this](oxen::quic::message m) {
_router.loop()->call(
[this, msg = std::move(m)]() mutable { handle_fetch_bootstrap_rcs(std::move(msg)); });
});
// s->register_command("bfetch_rcs"s, [this](oxen::quic::message m) {
// _router.loop()->call(
// [this, msg = std::move(m)]() mutable { handle_fetch_bootstrap_rcs(std::move(msg)); });
// });
log::critical(logcat, "Registered `bfetch_rcs` (RID:{})", router_id);
s->register_command("path_build"s, [this, rid = router_id](oxen::quic::message m) { s->register_command("path_build"s, [this, rid = router_id](oxen::quic::message m) {
_router.loop()->call( _router.loop()->call(
@ -143,11 +157,6 @@ namespace llarp
[this, msg = std::move(m)]() mutable { handle_gossip_rc(std::move(msg)); }); [this, msg = std::move(m)]() mutable { handle_gossip_rc(std::move(msg)); });
}); });
s->register_command("bfetch_rcs"s, [this](oxen::quic::message m) {
_router.loop()->call(
[this, msg = std::move(m)]() mutable { handle_fetch_bootstrap_rcs(std::move(msg)); });
});
for (auto& method : direct_requests) for (auto& method : direct_requests)
{ {
s->register_command( s->register_command(
@ -162,6 +171,24 @@ namespace llarp
}); });
}); });
} }
log::critical(logcat, "Registered all commands! (RID:{})", router_id);
}
LinkManager::LinkManager(Router& r)
: _router{r}
, quic{std::make_unique<oxen::quic::Network>()}
, tls_creds{oxen::quic::GNUTLSCreds::make_from_ed_keys(
{reinterpret_cast<const char*>(_router.identity().data()), size_t{32}},
{reinterpret_cast<const char*>(_router.identity().toPublic().data()), size_t{32}})}
, ep{startup_endpoint(), *this}
{}
std::unique_ptr<LinkManager>
LinkManager::make(Router& r)
{
std::unique_ptr<LinkManager> p{new LinkManager(r)};
return p;
} }
std::shared_ptr<oxen::quic::Endpoint> std::shared_ptr<oxen::quic::Endpoint>
@ -184,25 +211,26 @@ namespace llarp
}, },
[this](oxen::quic::dgram_interface& di, bstring dgram) { recv_data_message(di, dgram); }); [this](oxen::quic::dgram_interface& di, bstring dgram) { recv_data_message(di, dgram); });
tls_creds->set_key_verify_callback([this](const ustring_view& key, const ustring_view&) { tls_creds->set_key_verify_callback([this](const ustring_view& key, const ustring_view&) {
bool result = false; bool result = true;
RouterID other{key.data()}; RouterID other{key.data()};
if (_router.is_bootstrap_seed()) if (_router.is_bootstrap_seed())
{ {
if (node_db->whitelist().count(other)) // FIXME: remove "|| true", this is just for local testing!
if (node_db->whitelist().count(other) || true)
{ {
log::critical(logcat, "Saving bootstrap seed requester...");
auto [it, b] = node_db->seeds().emplace(other); auto [it, b] = node_db->seeds().emplace(other);
result &= b; result |= b;
} }
log::critical( log::critical(
logcat, logcat,
"Bootstrap seed node was {} to confirm fetch requester is white-listed; saving RID", "Bootstrap seed node was {} to confirm fetch requester is white-listed; {}successfully saved RID",
result ? "able" : "unable"); result ? "able" : "unable", result ? "" : "un");
return result; return result;
} }
if (node_db->has_rc(other)) result = node_db->has_rc(other);
result = true;
log::critical( log::critical(
logcat, "{}uccessfully verified connection to {}!", result ? "S" : "Uns", other); logcat, "{}uccessfully verified connection to {}!", result ? "S" : "Uns", other);
@ -215,8 +243,10 @@ namespace llarp
[&](oxen::quic::Connection& c, [&](oxen::quic::Connection& c,
oxen::quic::Endpoint& e, oxen::quic::Endpoint& e,
std::optional<int64_t> id) -> std::shared_ptr<oxen::quic::Stream> { std::optional<int64_t> id) -> std::shared_ptr<oxen::quic::Stream> {
if (id && id == 0)
if (id && *id == 0)
{ {
log::critical(logcat, "Stream constructor constructing BTStream (ID:{})", id);
auto s = e.make_shared<oxen::quic::BTRequestStream>( auto s = e.make_shared<oxen::quic::BTRequestStream>(
c, e, [](oxen::quic::Stream& s, uint64_t error_code) { c, e, [](oxen::quic::Stream& s, uint64_t error_code) {
log::warning( log::warning(
@ -229,28 +259,133 @@ namespace llarp
return s; return s;
} }
log::critical(logcat, "Stream constructor constructing Stream (ID:{})!", id);
return e.make_shared<oxen::quic::Stream>(c, e); return e.make_shared<oxen::quic::Stream>(c, e);
}); });
} }
return ep; return ep;
} }
LinkManager::LinkManager(Router& r) void
: _router{r} LinkManager::on_inbound_conn(oxen::quic::connection_interface& ci)
, quic{std::make_unique<oxen::quic::Network>()} {
, tls_creds{oxen::quic::GNUTLSCreds::make_from_ed_keys( const auto& scid = ci.scid();
{reinterpret_cast<const char*>(_router.identity().data()), size_t{32}}, RouterID rid{ci.remote_key()};
{reinterpret_cast<const char*>(_router.identity().toPublic().data()), size_t{32}})} ep.connid_map.emplace(scid, rid);
, ep{startup_endpoint(), *this} auto [itr, b] = ep.active_conns.emplace(rid, nullptr);
{}
std::unique_ptr<LinkManager> log::critical(logcat, "Queueing BTStream to be opened...");
LinkManager::make(Router& r)
auto control_stream = ci.queue_stream<oxen::quic::BTRequestStream>([](oxen::quic::Stream& s,
uint64_t error_code) {
log::warning(
logcat, "BTRequestStream closed unexpectedly (ec:{}); closing connection...", error_code);
s.conn.close_connection(error_code);
});
log::critical(logcat, "Queued BTStream to be opened ID:{}", control_stream->stream_id());
assert(control_stream->stream_id() == 0);
// register_commands(control_stream);
itr->second = std::make_shared<link::Connection>(ci.shared_from_this(), control_stream);
log::critical(logcat, "Successfully configured inbound connection fom {}...", rid);
}
// TODO: should we add routes here now that Router::SessionOpen is gone?
void
LinkManager::on_conn_open(oxen::quic::connection_interface& ci)
{ {
std::unique_ptr<LinkManager> p{new LinkManager(r)}; _router.loop()->call([this, &conn_interface = ci]() {
return p; const auto rid = RouterID{conn_interface.remote_key()};
const auto& remote = conn_interface.remote();
const auto& scid = conn_interface.scid();
if (conn_interface.is_inbound())
{
log::critical(logcat, "Inbound connection fom {} (remote:{})", rid, remote);
on_inbound_conn(conn_interface);
}
else
{
if (auto itr = ep.pending_conns.find(rid); itr != ep.pending_conns.end())
{
ep.connid_map.emplace(scid, rid);
auto [it, b] = ep.active_conns.emplace(rid, nullptr);
it->second = std::move(itr->second);
log::critical(logcat, "Connection to RID:{} moved from pending to active conns!", rid);
}
else
throw std::runtime_error{"Could not find newly established connection in pending conns!"};
}
log::critical(
logcat,
"SERVICE NODE (RID:{}) ESTABLISHED CONNECTION TO RID:{}",
_router.local_rid(),
rid);
// check to see if this connection was established while we were attempting to queue
// messages to the remote
if (auto itr = pending_conn_msg_queue.find(rid); itr != pending_conn_msg_queue.end())
{
log::critical(logcat, "Clearing pending queue for RID:{}", rid);
auto& que = itr->second;
while (not que.empty())
{
auto& msg = que.front();
if (msg.is_control)
{
log::critical(logcat, "Dispatching {} request!", *msg.endpoint);
ep.active_conns[rid]->control_stream->command(
std::move(*msg.endpoint), std::move(msg.body), std::move(msg.func));
}
else
{
conn_interface.send_datagram(std::move(msg.body));
}
que.pop_front();
}
return;
}
log::warning(logcat, "No pending queue to clear for RID:{}", rid);
});
};
void
LinkManager::on_conn_closed(oxen::quic::connection_interface& ci, uint64_t ec)
{
_router.loop()->call([this, &conn_interface = ci, error_code = ec]() {
const auto& scid = conn_interface.scid();
log::critical(quic_cat, "Purging quic connection CID:{} (ec: {})", scid, error_code);
if (const auto& c_itr = ep.connid_map.find(scid); c_itr != ep.connid_map.end())
{
const auto& rid = c_itr->second;
// if (auto maybe = rids_pending_verification.find(rid);
// maybe != rids_pending_verification.end())
// rids_pending_verification.erase(maybe);
// in case this didn't clear earlier, do it now
if (auto p_itr = pending_conn_msg_queue.find(rid); p_itr != pending_conn_msg_queue.end())
pending_conn_msg_queue.erase(p_itr);
if (auto m_itr = ep.active_conns.find(rid); m_itr != ep.active_conns.end())
ep.active_conns.erase(m_itr);
ep.connid_map.erase(c_itr);
log::critical(quic_cat, "Quic connection CID:{} purged successfully", scid);
}
});
} }
bool bool
LinkManager::send_control_message( LinkManager::send_control_message(
const RouterID& remote, const RouterID& remote,
@ -293,11 +428,23 @@ namespace llarp
body = std::move(body), body = std::move(body),
f = std::move(func)]() { f = std::move(func)]() {
auto pending = PendingMessage(std::move(body), std::move(endpoint), std::move(f)); auto pending = PendingMessage(std::move(body), std::move(endpoint), std::move(f));
if (auto it1 = ep.pending_conns.find(remote); it1 != ep.pending_conns.end())
{
if (auto it2 = pending_conn_msg_queue.find(remote); it2 != pending_conn_msg_queue.end())
{
it2->second.push_back(std::move(pending));
log::critical(logcat, "Connection (RID:{}) is pending; message appended to send queue!", remote);
}
}
else
{
log::critical(logcat, "Connection (RID:{}) not found in pending conns; creating send queue!", remote);
auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue());
itr->second.push_back(std::move(pending));
connect_to(remote);
}
auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue());
itr->second.push_back(std::move(pending));
connect_to(remote);
}); });
return false; return false;
@ -386,106 +533,6 @@ namespace llarp
log::warning(quic_cat, "Failed to begin establishing connection to {}", remote_addr); log::warning(quic_cat, "Failed to begin establishing connection to {}", remote_addr);
} }
void
LinkManager::on_inbound_conn(oxen::quic::connection_interface& ci)
{
const auto& scid = ci.scid();
RouterID rid{ci.remote_key()};
ep.connid_map.emplace(scid, rid);
auto [itr, b] = ep.conns.emplace(rid, nullptr);
auto control_stream = ci.queue_stream<oxen::quic::BTRequestStream>([](oxen::quic::Stream& s,
uint64_t error_code) {
log::warning(
logcat, "BTRequestStream closed unexpectedly (ec:{}); closing connection...", error_code);
s.conn.close_connection(error_code);
});
log::critical(logcat, "Opened BTStream ID:{}", control_stream->stream_id());
itr->second = std::make_shared<link::Connection>(ci.shared_from_this(), control_stream);
log::critical(logcat, "Successfully configured inbound connection fom {}...", rid);
}
// TODO: should we add routes here now that Router::SessionOpen is gone?
void
LinkManager::on_conn_open(oxen::quic::connection_interface& ci)
{
_router.loop()->call([this, &conn_interface = ci]() {
const auto rid = RouterID{conn_interface.remote_key()};
const auto& remote = conn_interface.remote();
if (conn_interface.is_inbound())
{
log::critical(logcat, "Inbound connection fom {} (remote:{})", rid, remote);
on_inbound_conn(conn_interface);
}
log::critical(
logcat,
"SERVICE NODE (RID:{}) ESTABLISHED CONNECTION TO RID:{}",
_router.local_rid(),
rid);
// check to see if this connection was established while we were attempting to queue
// messages to the remote
if (auto itr = pending_conn_msg_queue.find(rid); itr != pending_conn_msg_queue.end())
{
log::critical(logcat, "Clearing pending queue for RID:{}", rid);
auto& que = itr->second;
while (not que.empty())
{
auto& msg = que.front();
if (msg.is_control)
{
log::critical(logcat, "Dispatching {} request!", *msg.endpoint);
ep.conns[rid]->control_stream->command(
std::move(*msg.endpoint), std::move(msg.body), std::move(msg.func));
}
else
{
conn_interface.send_datagram(std::move(msg.body));
}
que.pop_front();
}
return;
}
log::warning(logcat, "No pending queue to clear for RID:{}", rid);
});
};
void
LinkManager::on_conn_closed(oxen::quic::connection_interface& ci, uint64_t ec)
{
_router.loop()->call([this, &conn_interface = ci, error_code = ec]() {
const auto& scid = conn_interface.scid();
log::critical(quic_cat, "Purging quic connection CID:{} (ec: {})", scid, error_code);
if (const auto& c_itr = ep.connid_map.find(scid); c_itr != ep.connid_map.end())
{
const auto& rid = c_itr->second;
// if (auto maybe = rids_pending_verification.find(rid);
// maybe != rids_pending_verification.end())
// rids_pending_verification.erase(maybe);
// in case this didn't clear earlier, do it now
if (auto p_itr = pending_conn_msg_queue.find(rid); p_itr != pending_conn_msg_queue.end())
pending_conn_msg_queue.erase(p_itr);
if (auto m_itr = ep.conns.find(rid); m_itr != ep.conns.end())
ep.conns.erase(m_itr);
ep.connid_map.erase(c_itr);
log::critical(quic_cat, "Quic connection CID:{} purged successfully", scid);
}
});
}
bool bool
LinkManager::have_connection_to(const RouterID& remote, bool client_only) const LinkManager::have_connection_to(const RouterID& remote, bool client_only) const
{ {
@ -598,7 +645,7 @@ namespace llarp
void void
LinkManager::gossip_rc(const RouterID& rc_rid, std::string serialized_rc) LinkManager::gossip_rc(const RouterID& rc_rid, std::string serialized_rc)
{ {
for (auto& [rid, conn] : ep.conns) for (auto& [rid, conn] : ep.active_conns)
{ {
// don't send back to the owner... // don't send back to the owner...
if (rid == rc_rid) if (rid == rc_rid)
@ -607,7 +654,9 @@ namespace llarp
if (not conn->remote_is_relay) if (not conn->remote_is_relay)
continue; continue;
send_control_message(rid, "gossip_rc", serialized_rc); send_control_message(rid, "gossip_rc", serialized_rc, [](oxen::quic::message) mutable {
log::critical(logcat, "PLACEHOLDER FOR GOSSIP RC RESPONSE HANDLER");
});
} }
} }
@ -702,12 +751,12 @@ namespace llarp
} }
} }
auto& src = is_seed ? node_db->bootstrap_seeds() : node_db->get_known_rcs(); auto& src = node_db->get_known_rcs();
auto count = src.size(); auto count = src.size();
if (count == 0) if (count == 0)
{ {
log::error(logcat, "No {} locally to send!", is_seed ? "bootstrap seeds" : "known RCs"); log::error(logcat, "No known RCs locally to send!");
m.respond(messages::ERROR_RESPONSE, true); m.respond(messages::ERROR_RESPONSE, true);
return; return;
} }
@ -899,9 +948,9 @@ namespace llarp
void void
LinkManager::handle_find_name_response(oxen::quic::message m) LinkManager::handle_find_name_response(oxen::quic::message m)
{ {
if (m.timed_out) if (not m)
{ {
log::info(link_cat, "FindNameMessage timed out!"); log::info(link_cat, "FindNameMessage failed!");
return; return;
} }
@ -1035,7 +1084,7 @@ namespace llarp
"publish_intro", "publish_intro",
PublishIntroMessage::serialize(introset, relay_order, is_relayed), PublishIntroMessage::serialize(introset, relay_order, is_relayed),
[respond = std::move(respond)](oxen::quic::message m) { [respond = std::move(respond)](oxen::quic::message m) {
if (m.timed_out) if (not m)
return; // drop if timed out; requester will have timed out as well return; // drop if timed out; requester will have timed out as well
respond(m.body_str()); respond(m.body_str());
}); });
@ -1073,7 +1122,7 @@ namespace llarp
void void
LinkManager::handle_publish_intro_response(oxen::quic::message m) LinkManager::handle_publish_intro_response(oxen::quic::message m)
{ {
if (m.timed_out) if (not m)
{ {
log::info(link_cat, "PublishIntroMessage timed out!"); log::info(link_cat, "PublishIntroMessage timed out!");
return; return;
@ -1176,7 +1225,7 @@ namespace llarp
link_cat, link_cat,
"Relayed FindIntroMessage returned successful response; transmitting to initial " "Relayed FindIntroMessage returned successful response; transmitting to initial "
"requester"); "requester");
else if (relay_response.timed_out) else if (not relay_response)
log::critical( log::critical(
link_cat, "Relayed FindIntroMessage timed out! Notifying initial requester"); link_cat, "Relayed FindIntroMessage timed out! Notifying initial requester");
else else
@ -1203,7 +1252,7 @@ namespace llarp
void void
LinkManager::handle_find_intro_response(oxen::quic::message m) LinkManager::handle_find_intro_response(oxen::quic::message m)
{ {
if (m.timed_out) if (not m)
{ {
log::info(link_cat, "FindIntroMessage timed out!"); log::info(link_cat, "FindIntroMessage timed out!");
return; return;
@ -1396,7 +1445,7 @@ namespace llarp
"then relaying response"); "then relaying response");
_router.path_context().put_transit_hop(hop); _router.path_context().put_transit_hop(hop);
} }
if (m.timed_out) if (not m)
log::info(link_cat, "Upstream timed out on path build; relaying timeout"); log::info(link_cat, "Upstream timed out on path build; relaying timeout");
else else
log::info(link_cat, "Upstream returned path build failure; relaying response"); log::info(link_cat, "Upstream returned path build failure; relaying response");
@ -1513,7 +1562,7 @@ namespace llarp
void void
LinkManager::handle_obtain_exit_response(oxen::quic::message m) LinkManager::handle_obtain_exit_response(oxen::quic::message m)
{ {
if (m.timed_out) if (not m)
{ {
log::info(link_cat, "ObtainExitMessage timed out!"); log::info(link_cat, "ObtainExitMessage timed out!");
return; return;
@ -1591,7 +1640,7 @@ namespace llarp
void void
LinkManager::handle_update_exit_response(oxen::quic::message m) LinkManager::handle_update_exit_response(oxen::quic::message m)
{ {
if (m.timed_out) if (not m)
{ {
log::info(link_cat, "UpdateExitMessage timed out!"); log::info(link_cat, "UpdateExitMessage timed out!");
return; return;
@ -1676,7 +1725,7 @@ namespace llarp
void void
LinkManager::handle_close_exit_response(oxen::quic::message m) LinkManager::handle_close_exit_response(oxen::quic::message m)
{ {
if (m.timed_out) if (not m)
{ {
log::info(link_cat, "CloseExitMessage timed out!"); log::info(link_cat, "CloseExitMessage timed out!");
return; return;
@ -1828,7 +1877,7 @@ namespace llarp
void void
LinkManager::handle_convo_intro(oxen::quic::message m) LinkManager::handle_convo_intro(oxen::quic::message m)
{ {
if (m.timed_out) if (not m)
{ {
log::info(link_cat, "Path control message timed out!"); log::info(link_cat, "Path control message timed out!");
return; return;

@ -48,9 +48,12 @@ namespace llarp
// for outgoing packets, we route via RouterID; map RouterID->Connection // for outgoing packets, we route via RouterID; map RouterID->Connection
// for incoming packets, we get a ConnectionID; map ConnectionID->RouterID // for incoming packets, we get a ConnectionID; map ConnectionID->RouterID
std::unordered_map<RouterID, std::shared_ptr<link::Connection>> conns; std::unordered_map<RouterID, std::shared_ptr<link::Connection>> active_conns;
std::unordered_map<oxen::quic::ConnectionID, RouterID> connid_map; std::unordered_map<oxen::quic::ConnectionID, RouterID> connid_map;
// for pending connections, cleared in LinkManager::on_conn_open
std::unordered_map<RouterID, std::shared_ptr<link::Connection>> pending_conns;
// TODO: see which of these is actually useful and delete the other // TODO: see which of these is actually useful and delete the other
std::shared_ptr<link::Connection> std::shared_ptr<link::Connection>
get_conn(const RemoteRC&) const; get_conn(const RemoteRC&) const;
@ -395,16 +398,18 @@ namespace llarp
{ {
try try
{ {
log::critical(logcat, "Establishing connection to {}", remote); const auto& rid = rc.router_id();
log::critical(logcat, "Establishing connection to RID:{}", rid);
auto conn_interface = auto conn_interface =
endpoint->connect(remote, link_manager.tls_creds, std::forward<Opt>(opts)...); endpoint->connect(remote, link_manager.tls_creds, std::forward<Opt>(opts)...);
// emplace immediately for connection open callback to find scid // add to pending conns
connid_map.emplace(conn_interface->scid(), rc.router_id()); auto [itr, b] = pending_conns.emplace(rid, nullptr);
auto [itr, b] = conns.emplace(rc.router_id(), nullptr);
log::critical(logcat, "Establishing connection to {}...", rc.router_id()); // emplace immediately for connection open callback to find scid
// connid_map.emplace(conn_interface->scid(), rc.router_id());
// auto [itr, b] = conns.emplace(rc.router_id(), nullptr);
auto control_stream = conn_interface->template get_new_stream<oxen::quic::BTRequestStream>( auto control_stream = conn_interface->template get_new_stream<oxen::quic::BTRequestStream>(
[](oxen::quic::Stream& s, uint64_t error_code) { [](oxen::quic::Stream& s, uint64_t error_code) {
@ -418,6 +423,7 @@ namespace llarp
link_manager.register_commands(control_stream); link_manager.register_commands(control_stream);
itr->second = std::make_shared<link::Connection>(conn_interface, control_stream); itr->second = std::make_shared<link::Connection>(conn_interface, control_stream);
log::critical(logcat, "Connection to RID:{} added to pending connections...", rid);
return true; return true;
} }
catch (...) catch (...)

@ -379,16 +379,16 @@ namespace llarp
src, src,
FetchRCMessage::serialize(_router.last_rc_fetch, needed), FetchRCMessage::serialize(_router.last_rc_fetch, needed),
[this, src, initial](oxen::quic::message m) mutable { [this, src, initial](oxen::quic::message m) mutable {
if (m.timed_out) if (not m)
{ {
log::info(logcat, "RC fetch to {} timed out", src); log::info(logcat, "RC fetch to {} failed!", src);
fetch_rcs_result(initial, true); fetch_rcs_result(initial, true);
return; return;
} }
try try
{ {
oxenc::bt_dict_consumer btdc{m.body()}; oxenc::bt_dict_consumer btdc{m.body()};
// TODO: fix this shit after removing ::timed_out from message type
if (not m) if (not m)
{ {
auto reason = btdc.require<std::string_view>(messages::STATUS_KEY); auto reason = btdc.require<std::string_view>(messages::STATUS_KEY);
@ -689,7 +689,8 @@ namespace llarp
return; return;
} }
std::set<RouterID> rids; // std::set<RouterID> rids;
size_t num = 0;
try try
{ {
@ -701,7 +702,8 @@ namespace llarp
while (not btlc.is_finished()) while (not btlc.is_finished())
{ {
auto rc = RemoteRC{btlc.consume_dict_data()}; auto rc = RemoteRC{btlc.consume_dict_data()};
rids.emplace(rc.router_id()); put_rc(rc);
++num;
} }
} }
} }
@ -724,23 +726,30 @@ namespace llarp
// next call to fallback_to_bootstrap() and hit the base case, rotating sources // next call to fallback_to_bootstrap() and hit the base case, rotating sources
// bootstrap_attempts = MAX_BOOTSTRAP_FETCH_ATTEMPTS; // bootstrap_attempts = MAX_BOOTSTRAP_FETCH_ATTEMPTS;
if (rids.size() == BOOTSTRAP_SOURCE_COUNT) // const auto& num = rids.size();
{
known_rids.merge(rids); log::critical(logcat, "BootstrapRC fetch response from {} returned {}/{} needed RCs", fetch_source, num, BOOTSTRAP_SOURCE_COUNT);
fetch_initial(); // known_rids.merge(rids);
} fetch_initial();
else
{ // FIXME: when moving to testnet, uncomment this
// ++bootstrap_attempts; // if (rids.size() == BOOTSTRAP_SOURCE_COUNT)
log::warning( // {
logcat, // known_rids.merge(rids);
"BootstrapRC fetch response from {} returned insufficient number of RC's (error " // fetch_initial();
"{}/{})", // }
fetch_source, // else
bootstrap_attempts, // {
MAX_BOOTSTRAP_FETCH_ATTEMPTS); // // ++bootstrap_attempts;
fallback_to_bootstrap(); // log::warning(
} // logcat,
// "BootstrapRC fetch response from {} returned insufficient number of RC's (error "
// "{}/{})",
// fetch_source,
// bootstrap_attempts,
// MAX_BOOTSTRAP_FETCH_ATTEMPTS);
// fallback_to_bootstrap();
// }
}); });
} }
@ -955,9 +964,6 @@ namespace llarp
{ {
const auto& rid = rc.router_id(); const auto& rid = rc.router_id();
if (not want_rc(rid))
return false;
known_rcs.erase(rc); known_rcs.erase(rc);
rc_lookup.erase(rid); rc_lookup.erase(rid);
@ -975,6 +981,12 @@ namespace llarp
return known_rcs.size(); return known_rcs.size();
} }
size_t
NodeDB::num_rids() const
{
return known_rids.size();
}
bool bool
NodeDB::put_rc_if_newer(RemoteRC rc, rc_time now) NodeDB::put_rc_if_newer(RemoteRC rc, rc_time now)
{ {

@ -403,6 +403,9 @@ namespace llarp
size_t size_t
num_rcs() const; num_rcs() const;
size_t
num_rids() const;
/// do periodic tasks like flush to disk and expiration /// do periodic tasks like flush to disk and expiration
void void
Tick(llarp_time_t now); Tick(llarp_time_t now);

@ -118,7 +118,7 @@ namespace llarp::path
if ((not self) or (not response_cb)) if ((not self) or (not response_cb))
return; return;
if (m.timed_out) if (not m)
{ {
response_cb(messages::TIMEOUT_RESPONSE); response_cb(messages::TIMEOUT_RESPONSE);
return; return;

@ -513,9 +513,9 @@ namespace llarp
path->EnterState(path::ePathEstablished); path->EnterState(path::ePathEstablished);
return; return;
} }
if (m.timed_out) if (not m)
{ {
log::warning(path_cat, "Path build timed out"); log::warning(path_cat, "Path build request failed!");
} }
else else
{ {

@ -768,8 +768,9 @@ namespace llarp
log::critical( log::critical(
logcat, logcat,
"{} RCs loaded with {} bootstrap peers and {} router connections!", "{} RCs loaded with {} RIDs, {} bootstrap peers, and {} router connections!",
_node_db->num_rcs(), _node_db->num_rcs(),
_node_db->num_rids(),
_node_db->num_bootstraps(), _node_db->num_bootstraps(),
num_router_connections()); num_router_connections());
@ -1138,7 +1139,7 @@ namespace llarp
log::info(logcat, "Loading NodeDB from disk..."); log::info(logcat, "Loading NodeDB from disk...");
_node_db->load_from_disk(); _node_db->load_from_disk();
_node_db->store_bootstraps(); // _node_db->store_bootstraps();
oxen::log::flush(); oxen::log::flush();

@ -97,7 +97,7 @@ namespace llarp::rpc
return; // bail return; // bail
} }
LogDebug("new block at height ", m_BlockHeight); log::trace(logcat, "new block at height {}", m_BlockHeight);
// don't upadate on block notification if an update is pending // don't upadate on block notification if an update is pending
if (not m_UpdatingList) if (not m_UpdatingList)
UpdateServiceNodeList(); UpdateServiceNodeList();
@ -138,7 +138,7 @@ namespace llarp::rpc
throw std::runtime_error{"get_service_nodes did not return 'OK' status"}; throw std::runtime_error{"get_service_nodes did not return 'OK' status"};
if (auto it = json.find("unchanged"); if (auto it = json.find("unchanged");
it != json.end() and it->is_boolean() and it->get<bool>()) it != json.end() and it->is_boolean() and it->get<bool>())
LogDebug("service node list unchanged"); log::trace(logcat, "service node list unchanged");
else else
{ {
self->HandleNewServiceNodeList(json.at("service_node_states")); self->HandleNewServiceNodeList(json.at("service_node_states"));

Loading…
Cancel
Save