fetch RID result handler logic

This commit is contained in:
dr7ana 2023-11-28 12:50:07 -08:00
parent d6b8b55727
commit baabfabedc
3 changed files with 41 additions and 52 deletions

View File

@ -167,7 +167,7 @@ namespace llarp
_router.loop()->call([this, msg = std::move(m), func = std::move(func)]() mutable {
auto body = msg.body_str();
auto respond = [m = std::move(msg)](std::string response) mutable {
m.respond(std::move(response));
m.respond(std::move(response), not m);
};
std::invoke(func, this, body, std::move(respond));
});
@ -517,12 +517,6 @@ namespace llarp
LinkManager::fetch_router_ids(
const RouterID& via, std::string payload, std::function<void(oxen::quic::message m)> func)
{
if (ep.conns.empty())
{
log::debug(link_cat, "Not attempting to fetch Router IDs: not connected to any relays.");
return;
}
send_control_message(via, "fetch_router_ids"s, std::move(payload), std::move(func));
}
@ -571,7 +565,7 @@ namespace llarp
[source_rid = std::move(source_rid),
orig_mess = std::move(m)](oxen::quic::message m) mutable {
if (not m.timed_out)
orig_mess.respond(m.body_str());
orig_mess.respond(m.body_str(), not m);
// on timeout, just silently drop (as original requester will just time out anyway)
});
}

View File

@ -208,11 +208,9 @@ namespace llarp
}
void
NodeDB::ingest_rid_fetch_responses(const RemoteRC& source, std::vector<RouterID> ids)
NodeDB::ingest_rid_fetch_responses(const RouterID& source, std::vector<RouterID> ids)
{
const auto& rid = source.router_id();
fetch_rid_responses[rid] = std::move(ids);
fetch_rid_responses[source] = std::move(ids);
}
// TODO: trust model
@ -355,7 +353,6 @@ namespace llarp
while (num_failures < MAX_FETCH_ATTEMPTS)
{
RemoteRC& src_rc = known_rcs[src];
auto success = std::make_shared<std::promise<int>>();
auto f = success->get_future();
fails.clear();
@ -365,12 +362,12 @@ namespace llarp
_router.link_manager().fetch_router_ids(
src,
RouterIDFetch::serialize(target),
[this, src, src_rc, target, p = std::move(success)](oxen::quic::message m) mutable {
[this, src, target, p = std::move(success)](oxen::quic::message m) mutable {
if (not m)
{
log::info(link_cat, "RID fetch from {} via {} timed out", src, target);
ingest_rid_fetch_responses(src_rc);
ingest_rid_fetch_responses(src);
p->set_value(-1);
return;
}
@ -405,7 +402,7 @@ namespace llarp
router_ids.emplace_back(s.data());
}
ingest_rid_fetch_responses(src_rc, std::move(router_ids));
ingest_rid_fetch_responses(src, std::move(router_ids));
return;
}
catch (const std::exception& e)
@ -414,7 +411,7 @@ namespace llarp
p->set_value(0);
}
ingest_rid_fetch_responses(src_rc); // empty response == failure
ingest_rid_fetch_responses(src); // empty response == failure
});
switch (f.get())
@ -581,12 +578,10 @@ namespace llarp
void
NodeDB::fetch_router_ids()
{
auto& num_failures = fetch_failures;
// base case; this function is called recursively
if (num_failures > MAX_FETCH_ATTEMPTS)
if (fetch_failures > MAX_FETCH_ATTEMPTS)
{
fetch_rids_result(fetch_source, true);
fetch_rids_result();
return;
}
@ -604,20 +599,20 @@ namespace llarp
fetch_rid_responses.clear();
RouterID& src = fetch_source;
RemoteRC& src_rc = known_rcs[src];
for (const auto& target : rid_sources)
{
_router.link_manager().fetch_router_ids(
src,
RouterIDFetch::serialize(target),
[this, src, src_rc, target](oxen::quic::message m) mutable {
if (not m)
[this, src, target](oxen::quic::message m) mutable {
if (m.timed_out)
{
log::info(link_cat, "RID fetch from {} via {} timed out", src, target);
ingest_rid_fetch_responses(src_rc);
fetch_rids_result(src, true);
++fetch_failures;
ingest_rid_fetch_responses(src);
fetch_rids_result();
return;
}
@ -644,55 +639,55 @@ namespace llarp
{
log::warning(
link_cat, "RID fetch from {} via {} returned bad RouterID", target, src);
ingest_rid_fetch_responses(src_rc);
fetch_rids_result(src, true);
ingest_rid_fetch_responses(target);
fail_sources.insert(target);
fetch_rids_result();
return;
}
router_ids.emplace_back(s.data());
}
ingest_rid_fetch_responses(src_rc, std::move(router_ids));
fetch_rids_result(src);
ingest_rid_fetch_responses(target, std::move(router_ids));
fetch_rids_result(); // success
return;
}
catch (const std::exception& e)
{
log::info(link_cat, "Error handling fetch RouterIDs response: {}", e.what());
ingest_rid_fetch_responses(src_rc);
fetch_rids_result(src, true);
ingest_rid_fetch_responses(target);
fail_sources.insert(target);
fetch_rids_result();
}
});
}
}
void
NodeDB::fetch_rids_result(const RouterID& target, bool error)
NodeDB::fetch_rids_result()
{
if (error)
if (fetch_failures > MAX_FETCH_ATTEMPTS)
{
fail_sources.insert(target);
++fetch_failures;
log::info(
logcat,
"Failed {} attempts to fetch RID's from {}; reverting to bootstrap...",
MAX_FETCH_ATTEMPTS,
fetch_source);
if (fetch_failures > MAX_FETCH_ATTEMPTS)
{
log::info(
logcat,
"Failed {} attempts to fetch RID's from {}; reverting to bootstrap...",
MAX_FETCH_ATTEMPTS,
fetch_source);
// TODO: revert to bootstrap
// set rc_fetch_source to bootstrap and START OVER!
}
else
// find new non-bootstrap RC fetch source and try again buddy
fetch_source = std::next(known_rcs.begin(), csrng() % known_rcs.size())->first;
// TODO: revert rc_source to bootstrap, start over
fetch_router_ids();
return;
}
log::debug(logcat, "Successfully fetched RID's from {}", fetch_source);
auto n_responses = fetch_rid_responses.size();
if (n_responses < MIN_RID_FETCHES)
{
log::debug(logcat, "Received {}/{} fetch RID requests", n_responses, 12);
return;
}
auto n_fails = fail_sources.size();
if (n_fails <= MAX_RID_ERRORS)

View File

@ -143,7 +143,7 @@ namespace llarp
process_fetched_rcs(RouterID source, std::vector<RemoteRC> rcs, rc_time timestamp);
void
ingest_rid_fetch_responses(const RemoteRC& source, std::vector<RouterID> ids = {});
ingest_rid_fetch_responses(const RouterID& source, std::vector<RouterID> ids = {});
bool
process_fetched_rids();
@ -173,7 +173,7 @@ namespace llarp
post_fetch_rids();
void
fetch_rids_result(const RouterID& target, bool error = false);
fetch_rids_result();
void
select_router_id_sources(std::unordered_set<RouterID> excluded = {});