|
|
|
@ -6,6 +6,7 @@
|
|
|
|
|
#include <llarp/crypto/crypto.hpp>
|
|
|
|
|
#include <llarp/messages/common.hpp>
|
|
|
|
|
#include <llarp/path/transit_hop.hpp>
|
|
|
|
|
#include <llarp/router/router.hpp>
|
|
|
|
|
#include <llarp/router_contact.hpp>
|
|
|
|
|
#include <llarp/util/compare_ptr.hpp>
|
|
|
|
|
#include <llarp/util/decaying_hashset.hpp>
|
|
|
|
@ -245,7 +246,7 @@ namespace llarp
|
|
|
|
|
|
|
|
|
|
void
|
|
|
|
|
register_commands(
|
|
|
|
|
std::shared_ptr<oxen::quic::BTRequestStream>& s,
|
|
|
|
|
const std::shared_ptr<oxen::quic::BTRequestStream>& s,
|
|
|
|
|
const RouterID& rid,
|
|
|
|
|
bool client_only = false);
|
|
|
|
|
|
|
|
|
@ -437,59 +438,71 @@ namespace llarp
|
|
|
|
|
std::function<void(oxen::quic::message m)> func,
|
|
|
|
|
Opt&&... opts)
|
|
|
|
|
{
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
const auto& rid = rc.router_id();
|
|
|
|
|
const auto& is_snode = _is_service_node;
|
|
|
|
|
const auto& is_control = ep.has_value();
|
|
|
|
|
const auto us = is_snode ? "Relay"s : "Client"s;
|
|
|
|
|
|
|
|
|
|
log::critical(logcat, "Establishing connection to RID:{}", rid);
|
|
|
|
|
// add to service conns
|
|
|
|
|
auto [itr, b] = service_conns.emplace(rid, nullptr);
|
|
|
|
|
|
|
|
|
|
auto conn_interface = endpoint->connect(
|
|
|
|
|
remote,
|
|
|
|
|
link_manager.tls_creds,
|
|
|
|
|
is_snode ? ROUTER_KEEP_ALIVE : CLIENT_KEEP_ALIVE,
|
|
|
|
|
std::forward<Opt>(opts)...);
|
|
|
|
|
|
|
|
|
|
// auto
|
|
|
|
|
std::shared_ptr<oxen::quic::BTRequestStream> control_stream =
|
|
|
|
|
conn_interface->template open_stream<oxen::quic::BTRequestStream>(
|
|
|
|
|
[this, rid = rid](oxen::quic::Stream&, uint64_t error_code) {
|
|
|
|
|
log::warning(
|
|
|
|
|
logcat,
|
|
|
|
|
"BTRequestStream closed unexpectedly (ec:{}); closing outbound connection...",
|
|
|
|
|
error_code);
|
|
|
|
|
close_connection(rid);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
if (is_snode)
|
|
|
|
|
link_manager.register_commands(control_stream, rid);
|
|
|
|
|
else
|
|
|
|
|
log::critical(logcat, "Client NOT registering BTStream commands!");
|
|
|
|
|
|
|
|
|
|
log::critical(
|
|
|
|
|
logcat,
|
|
|
|
|
"{} dispatching {} on outbound connection to remote (rid:{})",
|
|
|
|
|
us,
|
|
|
|
|
is_control ? "control message (ep:{})"_format(ep) : "data message",
|
|
|
|
|
rid);
|
|
|
|
|
|
|
|
|
|
(is_control) ? control_stream->command(std::move(*ep), std::move(body), std::move(func))
|
|
|
|
|
: conn_interface->send_datagram(std::move(body));
|
|
|
|
|
|
|
|
|
|
itr->second = std::make_shared<link::Connection>(conn_interface, control_stream, true);
|
|
|
|
|
|
|
|
|
|
log::critical(logcat, "Outbound connection to RID:{} added to service conns...", rid);
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
catch (...)
|
|
|
|
|
{
|
|
|
|
|
log::error(quic_cat, "Error: failed to establish connection to {}", remote);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
return link_manager.router().loop()->call_get([&]() {
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
const auto& rid = rc.router_id();
|
|
|
|
|
const auto& is_snode = _is_service_node;
|
|
|
|
|
const auto& is_control = ep.has_value();
|
|
|
|
|
const auto us = is_snode ? "Relay"s : "Client"s;
|
|
|
|
|
|
|
|
|
|
log::critical(logcat, "Establishing connection to RID:{}", rid);
|
|
|
|
|
// add to service conns
|
|
|
|
|
auto [itr, b] = service_conns.try_emplace(rid, nullptr);
|
|
|
|
|
|
|
|
|
|
if (not b)
|
|
|
|
|
{
|
|
|
|
|
log::critical(logcat, "ERROR: attempting to establish an already-existing connection");
|
|
|
|
|
(is_control) ? itr->second->control_stream->command(
|
|
|
|
|
std::move(*ep), std::move(body), std::move(func))
|
|
|
|
|
: itr->second->conn->send_datagram(std::move(body));
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
auto conn_interface = endpoint->connect(
|
|
|
|
|
remote,
|
|
|
|
|
link_manager.tls_creds,
|
|
|
|
|
is_snode ? ROUTER_KEEP_ALIVE : CLIENT_KEEP_ALIVE,
|
|
|
|
|
std::forward<Opt>(opts)...);
|
|
|
|
|
|
|
|
|
|
// auto
|
|
|
|
|
std::shared_ptr<oxen::quic::BTRequestStream> control_stream =
|
|
|
|
|
conn_interface->template open_stream<oxen::quic::BTRequestStream>(
|
|
|
|
|
[rid = rid](oxen::quic::Stream&, uint64_t error_code) {
|
|
|
|
|
log::warning(
|
|
|
|
|
logcat,
|
|
|
|
|
"BTRequestStream closed unexpectedly (ec:{}); closing outbound "
|
|
|
|
|
"connection...",
|
|
|
|
|
error_code);
|
|
|
|
|
// close_connection(rid);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
if (is_snode)
|
|
|
|
|
link_manager.register_commands(control_stream, rid);
|
|
|
|
|
else
|
|
|
|
|
log::critical(logcat, "Client NOT registering BTStream commands!");
|
|
|
|
|
|
|
|
|
|
log::critical(
|
|
|
|
|
logcat,
|
|
|
|
|
"{} dispatching {} on outbound connection to remote (rid:{})",
|
|
|
|
|
us,
|
|
|
|
|
is_control ? "control message (ep:{})"_format(ep) : "data message",
|
|
|
|
|
rid);
|
|
|
|
|
|
|
|
|
|
(is_control) ? control_stream->command(std::move(*ep), std::move(body), std::move(func))
|
|
|
|
|
: conn_interface->send_datagram(std::move(body));
|
|
|
|
|
|
|
|
|
|
itr->second = std::make_shared<link::Connection>(conn_interface, control_stream, true);
|
|
|
|
|
|
|
|
|
|
log::critical(logcat, "Outbound connection to RID:{} added to service conns...", rid);
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
catch (...)
|
|
|
|
|
{
|
|
|
|
|
log::error(quic_cat, "Error: failed to establish connection to {}", remote);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
template <typename... Opt>
|
|
|
|
@ -497,44 +510,52 @@ namespace llarp
|
|
|
|
|
Endpoint::establish_connection(
|
|
|
|
|
const oxen::quic::RemoteAddress& remote, const RemoteRC& rc, Opt&&... opts)
|
|
|
|
|
{
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
const auto& rid = rc.router_id();
|
|
|
|
|
const auto& is_snode = _is_service_node;
|
|
|
|
|
|
|
|
|
|
log::critical(logcat, "Establishing connection to RID:{}", rid);
|
|
|
|
|
// add to service conns
|
|
|
|
|
auto [itr, b] = service_conns.emplace(rid, nullptr);
|
|
|
|
|
|
|
|
|
|
auto conn_interface = endpoint->connect(
|
|
|
|
|
remote,
|
|
|
|
|
link_manager.tls_creds,
|
|
|
|
|
is_snode ? ROUTER_KEEP_ALIVE : CLIENT_KEEP_ALIVE,
|
|
|
|
|
std::forward<Opt>(opts)...);
|
|
|
|
|
|
|
|
|
|
auto control_stream = conn_interface->template open_stream<oxen::quic::BTRequestStream>(
|
|
|
|
|
[this, rid = rid](oxen::quic::Stream&, uint64_t error_code) {
|
|
|
|
|
log::warning(
|
|
|
|
|
logcat,
|
|
|
|
|
"BTRequestStream closed unexpectedly (ec:{}); closing outbound connection...",
|
|
|
|
|
error_code);
|
|
|
|
|
close_connection(rid);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
if (is_snode)
|
|
|
|
|
link_manager.register_commands(control_stream, rid);
|
|
|
|
|
else
|
|
|
|
|
log::critical(logcat, "Client NOT registering BTStream commands!");
|
|
|
|
|
itr->second = std::make_shared<link::Connection>(conn_interface, control_stream, true);
|
|
|
|
|
|
|
|
|
|
log::critical(logcat, "Outbound connection to RID:{} added to service conns...", rid);
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
catch (...)
|
|
|
|
|
{
|
|
|
|
|
log::error(quic_cat, "Error: failed to establish connection to {}", remote);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
return link_manager.router().loop()->call_get([&]() {
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
const auto& rid = rc.router_id();
|
|
|
|
|
const auto& is_snode = _is_service_node;
|
|
|
|
|
|
|
|
|
|
log::critical(logcat, "Establishing connection to RID:{}", rid);
|
|
|
|
|
// add to service conns
|
|
|
|
|
auto [itr, b] = service_conns.try_emplace(rid, nullptr);
|
|
|
|
|
|
|
|
|
|
if (not b)
|
|
|
|
|
{
|
|
|
|
|
log::critical(logcat, "ERROR: attempting to establish an already-existing connection");
|
|
|
|
|
return b;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
auto conn_interface = endpoint->connect(
|
|
|
|
|
remote,
|
|
|
|
|
link_manager.tls_creds,
|
|
|
|
|
is_snode ? ROUTER_KEEP_ALIVE : CLIENT_KEEP_ALIVE,
|
|
|
|
|
std::forward<Opt>(opts)...);
|
|
|
|
|
|
|
|
|
|
auto control_stream = conn_interface->template open_stream<oxen::quic::BTRequestStream>(
|
|
|
|
|
[rid = rid](oxen::quic::Stream&, uint64_t error_code) {
|
|
|
|
|
log::warning(
|
|
|
|
|
logcat,
|
|
|
|
|
"BTRequestStream closed unexpectedly (ec:{}); closing outbound connection...",
|
|
|
|
|
error_code);
|
|
|
|
|
// close_connection(rid);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
if (is_snode)
|
|
|
|
|
link_manager.register_commands(control_stream, rid);
|
|
|
|
|
else
|
|
|
|
|
log::critical(logcat, "Client NOT registering BTStream commands!");
|
|
|
|
|
itr->second = std::make_shared<link::Connection>(conn_interface, control_stream, true);
|
|
|
|
|
|
|
|
|
|
log::critical(logcat, "Outbound connection to RID:{} added to service conns...", rid);
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
catch (...)
|
|
|
|
|
{
|
|
|
|
|
log::error(quic_cat, "Error: failed to establish connection to {}", remote);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
} // namespace link
|
|
|
|
|
} // namespace llarp
|
|
|
|
|