Clean up oxend service node list handling

This aligns service node updating logic a bit closer to what happens in
storage server, and should make it a bit more resilient, hopefully
tracking down the (off-Github) reported issue where lokinet sometimes
doesn't see itself as active.

- Initiate a service node list update in the 30s timer lokinet ping
  timer (in case we miss a block notify for some reason); although this
  is expensive, the next point mitigates it:

- Retrieve the block hash with the SN state update, and feed it back
  into the next get_service_nodes call (as "poll_block_hash") so that
  oxend just sends back a mostly-empty response when the block hasn't
  changed, allowing both oxend and lokinet to skip nearly all of the
  work of a service node list update when the block hasn't changed since
  the last poll.  (This was already partially implemenated--we were
  already looking for "unchanged"--but without a block hash to get from
  and pass back to oxend we'd never actually get an "unchanged" result).

- Tighten up the service node list handling by moving the "unchanged"
  handling into the get_service_nodes response handler: this way the
  HandleNewServiceNodeList function is only handling the list but not
  the logic as to whether there actually is a new list or not.
pull/2015/head
Jason Rhinelander 2 years ago
parent 54fba30516
commit 0e576ff59e

@ -112,35 +112,57 @@ namespace llarp
void
LokidRpcClient::UpdateServiceNodeList()
{
nlohmann::json request, fields;
fields["pubkey_ed25519"] = true;
fields["service_node_pubkey"] = true;
fields["funded"] = true;
fields["active"] = true;
request["fields"] = fields;
m_UpdatingList = true;
if (m_UpdatingList.exchange(true))
return; // update already in progress
nlohmann::json request{
{"fields",
{
{"pubkey_ed25519", true},
{"service_node_pubkey", true},
{"funded", true},
{"active", true},
{"block_hash", true},
}},
};
if (!m_LastUpdateHash.empty())
request["fields"]["poll_block_hash"] = m_LastUpdateHash;
Request(
"rpc.get_service_nodes",
[self = shared_from_this()](bool success, std::vector<std::string> data) {
self->m_UpdatingList = false;
if (not success)
{
LogWarn("failed to update service node list");
return;
}
if (data.size() < 2)
{
LogWarn("lokid gave empty reply for service node list");
return;
}
try
else if (data.size() < 2)
LogWarn("oxend gave empty reply for service node list");
else
{
self->HandleGotServiceNodeList(std::move(data[1]));
}
catch (std::exception& ex)
{
LogError("failed to process service node list: ", ex.what());
try
{
auto json = nlohmann::json::parse(std::move(data[1]));
if (json.at("status") != "OK")
throw std::runtime_error{"get_service_nodes did not return 'OK' status"};
if (auto it = json.find("unchanged");
it != json.end() and it->is_boolean() and it->get<bool>())
LogDebug("service node list unchanged");
else
{
self->HandleNewServiceNodeList(json.at("service_node_states"));
if (auto it = json.find("block_hash"); it != json.end() and it->is_string())
self->m_LastUpdateHash = it->get<std::string>();
else
self->m_LastUpdateHash.clear();
}
}
catch (const std::exception& ex)
{
LogError("failed to process service node list: ", ex.what());
}
}
// set down here so that the 1) we don't start updating until we're completely finished
// with the previous update; and 2) so that m_UpdatingList also guards m_LastUpdateHash
self->m_UpdatingList = false;
},
request.dump());
}
@ -180,52 +202,51 @@ namespace llarp
}
LogDebug("subscribed to new blocks: ", data[0]);
});
// Trigger an update on a regular timer as well in case we missed a block notify for some
// reason (e.g. oxend restarts and loses the subscription); we poll using the last known
// hash so that the poll is very cheap (basically empty) if the block hasn't advanced.
self->UpdateServiceNodeList();
};
// Fire one ping off right away to get things going.
makePingRequest();
m_lokiMQ->add_timer(makePingRequest, PingInterval);
// initial fetch of service node list
UpdateServiceNodeList();
}
void
LokidRpcClient::HandleGotServiceNodeList(std::string data)
LokidRpcClient::HandleNewServiceNodeList(const nlohmann::json& j)
{
auto j = nlohmann::json::parse(std::move(data));
if (const auto itr = j.find("unchanged"); itr != j.end() and itr->get<bool>())
{
LogDebug("service node list unchanged");
return;
}
std::unordered_map<RouterID, PubKey> keymap;
std::vector<RouterID> activeNodeList, nonActiveNodeList;
if (const auto itr = j.find("service_node_states"); itr != j.end() and itr->is_array())
if (not j.is_array())
throw std::runtime_error{
"Invalid service node list: expected array of service node states"};
for (auto& snode : j)
{
for (auto& snode : *itr)
{
// Skip unstaked snodes:
if (const auto funded_itr = snode.find("funded"); funded_itr == snode.end()
or not funded_itr->is_boolean() or not funded_itr->get<bool>())
continue;
const auto ed_itr = snode.find("pubkey_ed25519");
if (ed_itr == snode.end() or not ed_itr->is_string())
continue;
const auto svc_itr = snode.find("service_node_pubkey");
if (svc_itr == snode.end() or not svc_itr->is_string())
continue;
const auto active_itr = snode.find("active");
if (active_itr == snode.end() or not active_itr->is_boolean())
continue;
const bool active = active_itr->get<bool>();
RouterID rid;
PubKey pk;
if (not rid.FromHex(ed_itr->get<std::string_view>())
or not pk.FromHex(svc_itr->get<std::string_view>()))
continue;
keymap[rid] = pk;
(active ? activeNodeList : nonActiveNodeList).push_back(std::move(rid));
}
// Skip unstaked snodes:
if (const auto funded_itr = snode.find("funded"); funded_itr == snode.end()
or not funded_itr->is_boolean() or not funded_itr->get<bool>())
continue;
const auto ed_itr = snode.find("pubkey_ed25519");
if (ed_itr == snode.end() or not ed_itr->is_string())
continue;
const auto svc_itr = snode.find("service_node_pubkey");
if (svc_itr == snode.end() or not svc_itr->is_string())
continue;
const auto active_itr = snode.find("active");
if (active_itr == snode.end() or not active_itr->is_boolean())
continue;
const bool active = active_itr->get<bool>();
RouterID rid;
PubKey pk;
if (not rid.FromHex(ed_itr->get<std::string_view>())
or not pk.FromHex(svc_itr->get<std::string_view>()))
continue;
keymap[rid] = pk;
(active ? activeNodeList : nonActiveNodeList).push_back(std::move(rid));
}
if (activeNodeList.empty())

@ -55,6 +55,8 @@ namespace llarp
void
Command(std::string_view cmd);
/// triggers a service node list refresh from oxend; thread-safe and will do nothing if an
/// update is already in progress.
void
UpdateServiceNodeList();
@ -72,8 +74,10 @@ namespace llarp
m_lokiMQ->request(*m_Connection, std::move(cmd), std::move(func));
}
// Handles a service node list update; takes the "service_node_states" object of an oxend
// "get_service_nodes" rpc request.
void
HandleGotServiceNodeList(std::string json);
HandleNewServiceNodeList(const nlohmann::json& json);
// Handles request from lokid for peer stats on a specific peer
void
@ -88,6 +92,7 @@ namespace llarp
std::weak_ptr<AbstractRouter> m_Router;
std::atomic<bool> m_UpdatingList;
std::string m_LastUpdateHash;
std::unordered_map<RouterID, PubKey> m_KeyMap;

Loading…
Cancel
Save