From 7970ad2d07bbafc40e1dd2f0014498ee9dcae34a Mon Sep 17 00:00:00 2001 From: dr7ana Date: Mon, 22 Jan 2024 13:05:06 -0800 Subject: [PATCH] Simul-defer connections - When two relays are repeatedly attempting connections to one another simultaneously, the connection initiated by the RouterID that appears first (in lexicographical order) is deferred to. The connection initiated by the other endpoint is marked to close quietly (w/o executing callbacks), and is rejected in the TLS verification hook - Bypassing callback execution is critical: if the close callbacks were executed, they would clean up the link::Connection object for the connection that is being deferred to, resulting in BOTH connections being destroyed. --- daemon/lokinet.cpp | 4 -- llarp/link/link_manager.cpp | 96 +++++++++++++++++++++---------------- llarp/link/link_manager.hpp | 57 +--------------------- llarp/router/router.cpp | 7 ++- llarp/router/router.hpp | 3 +- 5 files changed, 61 insertions(+), 106 deletions(-) diff --git a/daemon/lokinet.cpp b/daemon/lokinet.cpp index 3f930dc4d..19c673780 100644 --- a/daemon/lokinet.cpp +++ b/daemon/lokinet.cpp @@ -442,10 +442,6 @@ namespace cli.exit(e); }; - // TESTNET: - oxen::log::set_level("quic", oxen::log::Level::critical); - oxen::log::set_level("quicverbose", oxen::log::Level::debug); - if (configFile.has_value()) { // when we have an explicit filepath diff --git a/llarp/link/link_manager.cpp b/llarp/link/link_manager.cpp index 8fd2eb8d3..a19d75f50 100644 --- a/llarp/link/link_manager.cpp +++ b/llarp/link/link_manager.cpp @@ -266,29 +266,47 @@ namespace llarp ep.client_conns.emplace(other, nullptr); return true; } + if (alpn == alpns::SN_ALPNS) { // verify as service node! bool result = node_db->registered_routers().count(other); - log::critical( - logcat, - "{} node was {} to confirm remote (RID:{}) is registered; {} connection!", - us, - result ? "able" : "unable", - other, - result ? 
"allowing" : "rejecting"); - if (result) { - auto [_, b] = ep.service_conns.try_emplace(other, nullptr); + auto [itr, b] = ep.service_conns.try_emplace(other, nullptr); if (not b) + { + // If we fail to try_emplace a connection to the incoming RID, then we are + // simultaneously dealing with an outbound and inbound from the same connection. To + // resolve this, both endpoints will defer to the connection initiated by the RID + // that appears first in lexicographical order + auto defer_to_incoming = other < router().local_rid(); + + if (defer_to_incoming) + itr->second->conn->set_close_quietly(); + log::critical( - logcat, "{} node rejecting inbound -- already have connection to remote!", us); + logcat, + "{} node received inbound with ongoing outbound to remote (RID:{}); {}!", + us, + other, + defer_to_incoming ? "deferring to inbound" : "rejecting in favor of outbound"); + + return defer_to_incoming; + } - return result and b; + log::critical( + logcat, "{} node accepting inbound from registered remote (RID:{})", us, other); } + else + log::critical( + logcat, + "{} node was unable to confirm remote (RID:{}) is registered; rejecting " + "connection!", + us, + other); return result; } @@ -313,12 +331,8 @@ namespace llarp LinkManager::make_control(oxen::quic::connection_interface& ci, const RouterID& rid) { auto control_stream = ci.queue_incoming_stream( - [/* this, */ rid = rid](oxen::quic::Stream&, uint64_t error_code) { - log::warning( - logcat, - "BTRequestStream closed unexpectedly (ec:{}); closing inbound connection...", - error_code); - // ep.close_connection(rid); + [rid = rid](oxen::quic::Stream&, uint64_t error_code) { + log::warning(logcat, "BTRequestStream closed unexpectedly (ec:{})", error_code); }); log::critical(logcat, "Queued BTStream to be opened (ID:{})", control_stream->stream_id()); @@ -338,9 +352,7 @@ namespace llarp { log::critical(logcat, "Configuring inbound connection from relay RID:{}", rid); - if (!it->second) - it->second = - 
std::make_shared(ci.shared_from_this(), make_control(ci, rid)); + it->second = std::make_shared(ci.shared_from_this(), make_control(ci, rid)); } else if (auto it = ep.client_conns.find(rid); it != ep.client_conns.end()) { @@ -663,7 +675,7 @@ namespace llarp const auto& rid = rc.router_id(); auto res = client_only ? not ep.have_client_conn(rid) : not ep.have_conn(rid); - log::debug(logcat, "RID:{} {}", rid, res ? "ACCEPTED" : "REJECTED"); + log::trace(logcat, "RID:{} {}", rid, res ? "ACCEPTED" : "REJECTED"); return res; }; @@ -689,32 +701,34 @@ namespace llarp void LinkManager::gossip_rc(const RouterID& last_sender, const RemoteRC& rc) { - int count = 0; - const auto& gossip_src = rc.router_id(); + _router.loop()->call([this, last_sender, rc]() { + int count = 0; + const auto& gossip_src = rc.router_id(); - for (auto& [rid, conn] : ep.service_conns) - { - // don't send back to the gossip source or the last sender - if (rid == gossip_src or rid == last_sender) - continue; + for (auto& [rid, conn] : ep.service_conns) + { + // don't send back to the gossip source or the last sender + if (rid == gossip_src or rid == last_sender) + continue; - send_control_message( - rid, - "gossip_rc"s, - GossipRCMessage::serialize(last_sender, rc), - [](oxen::quic::message) mutable { - log::critical(logcat, "PLACEHOLDER FOR GOSSIP RC RESPONSE HANDLER"); - }); - ++count; - } + send_control_message( + rid, + "gossip_rc"s, + GossipRCMessage::serialize(last_sender, rc), + [](oxen::quic::message) mutable { + log::trace(logcat, "PLACEHOLDER FOR GOSSIP RC RESPONSE HANDLER"); + }); + ++count; + } - log::critical(logcat, "Dispatched {} GossipRC requests!", count); + log::critical(logcat, "Dispatched {} GossipRC requests!", count); + }); } void LinkManager::handle_gossip_rc(oxen::quic::message m) { - log::critical(logcat, "Handling GossipRC request..."); + log::debug(logcat, "Handling GossipRC request..."); // RemoteRC constructor wraps deserialization in a try/catch RemoteRC rc; @@ -730,7 
+744,7 @@ namespace llarp } catch (const std::exception& e) { - log::info(link_cat, "Exception handling GossipRC request: {}", e.what()); + log::critical(link_cat, "Exception handling GossipRC request: {}", e.what()); return; } @@ -740,7 +754,7 @@ namespace llarp gossip_rc(_router.local_rid(), rc); } else - log::critical(link_cat, "Received known or old RC, not storing or forwarding."); + log::debug(link_cat, "Received known or old RC, not storing or forwarding."); } // TODO: can probably use ::send_control_message instead. Need to discuss the potential difference diff --git a/llarp/link/link_manager.hpp b/llarp/link/link_manager.hpp index 2be9c4ef1..c8272c46b 100644 --- a/llarp/link/link_manager.hpp +++ b/llarp/link/link_manager.hpp @@ -128,50 +128,6 @@ namespace llarp }; } // namespace link - enum class SessionResult - { - Establish, - Timeout, - RouterNotFound, - InvalidRouter, - NoLink, - EstablishFail - }; - - constexpr std::string_view - ToString(SessionResult sr) - { - return sr == llarp::SessionResult::Establish ? "success"sv - : sr == llarp::SessionResult::Timeout ? "timeout"sv - : sr == llarp::SessionResult::NoLink ? "no link"sv - : sr == llarp::SessionResult::InvalidRouter ? "invalid router"sv - : sr == llarp::SessionResult::RouterNotFound ? "not found"sv - : sr == llarp::SessionResult::EstablishFail ? 
"establish failed"sv - : "???"sv; - } - template <> - constexpr inline bool IsToStringFormattable = true; - - struct PendingMessage - { - std::string body; - std::optional endpoint = std::nullopt; - std::function func = nullptr; - - RouterID rid; - bool is_control = false; - - PendingMessage(std::string b) : body{std::move(b)} - {} - - PendingMessage( - std::string b, std::string ep, std::function f = nullptr) - : body{std::move(b)}, endpoint{std::move(ep)}, func{std::move(f)}, is_control{true} - {} - }; - - using MessageQueue = std::deque; - struct Router; struct LinkManager @@ -469,12 +425,7 @@ namespace llarp std::shared_ptr control_stream = conn_interface->template open_stream( [rid = rid](oxen::quic::Stream&, uint64_t error_code) { - log::warning( - logcat, - "BTRequestStream closed unexpectedly (ec:{}); closing outbound " - "connection...", - error_code); - // close_connection(rid); + log::warning(logcat, "BTRequestStream closed unexpectedly (ec:{})", error_code); }); if (is_snode) @@ -534,11 +485,7 @@ namespace llarp auto control_stream = conn_interface->template open_stream( [rid = rid](oxen::quic::Stream&, uint64_t error_code) { - log::warning( - logcat, - "BTRequestStream closed unexpectedly (ec:{}); closing outbound connection...", - error_code); - // close_connection(rid); + log::warning(logcat, "BTRequestStream closed unexpectedly (ec:{})", error_code); }); if (is_snode) diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 329674ed1..063058609 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -381,8 +381,7 @@ namespace llarp llarp::logRingBuffer = nullptr; // TESTNET: - oxen::log::set_level("quic", oxen::log::Level::critical); - oxen::log::set_level("quicverbose", oxen::log::Level::debug); + // oxen::log::set_level("quic", oxen::log::Level::critical); log::debug(logcat, "Configuring router"); @@ -789,9 +788,9 @@ namespace llarp last_rc_gossip = now_timepoint; - // TESTNET: 1 to 5 minutes before testnet gossip 
interval + // TESTNET: 0 to 3 minutes before testnet gossip interval auto delta = - std::chrono::seconds{std::uniform_int_distribution{60, 300}(llarp::csrng)}; + std::chrono::seconds{std::uniform_int_distribution{0, 180}(llarp::csrng)}; next_rc_gossip = now_timepoint + TESTNET_GOSSIP_INTERVAL - delta; } diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp index bc1f67ccf..7eeecf98d 100644 --- a/llarp/router/router.hpp +++ b/llarp/router/router.hpp @@ -11,7 +11,6 @@ #include #include #include -// #include #include #include #include @@ -56,7 +55,7 @@ namespace llarp (INTROSET_RELAY_REDUNDANCY * INTROSET_REQS_PER_RELAY)}; // TESTNET: these constants are shortened for testing purposes - inline constexpr std::chrono::milliseconds TESTNET_GOSSIP_INTERVAL{10min}; + inline constexpr std::chrono::milliseconds TESTNET_GOSSIP_INTERVAL{15min}; inline constexpr std::chrono::milliseconds RC_UPDATE_INTERVAL{5min}; inline constexpr std::chrono::milliseconds INITIAL_ATTEMPT_INTERVAL{30s}; // as we advance towards full mesh, we try to connect to this number per tick