Add missing client TCP accept/forwarding handlers

Somehow the TCP client connection accept and stream forwarding got
dropped in the quic refactor.
pull/1576/head
Jason Rhinelander 3 years ago committed by Jeff Becker
parent 738f16366b
commit b8be889291
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -3,7 +3,6 @@
#include <llarp/util/logging/buffer.hpp>
#include <llarp/util/logging/logger.hpp>
#include <uvw/tcp.h>
#include <oxenmq/variant.h>
#include <llarp/service/address.hpp>
#include <llarp/service/endpoint.hpp>

@ -5,12 +5,6 @@
#include <optional>
namespace uvw
{
struct ListenEvent;
class TCPHandle;
} // namespace uvw
namespace llarp::quic
{
class Client : public Endpoint

@ -438,6 +438,25 @@ namespace llarp::quic
auto err_handler =
tcp_tunnel->once<uvw::ErrorEvent>([&failed](auto& evt, auto&) { failed = evt.what(); });
tcp_tunnel->bind(*bind_addr.operator const sockaddr*());
tcp_tunnel->on<uvw::ListenEvent>([this] (const uvw::ListenEvent&, uvw::TCPHandle& tcp_tunnel) {
auto client = tcp_tunnel.loop().resource<uvw::TCPHandle>();
tcp_tunnel.accept(*client);
// Freeze the connection (after accepting) because we may need to stall it until a stream
// becomes available; flush_pending_incoming will unfreeze it.
client->stop();
auto pport = tcp_tunnel.data<uint16_t>();
if (pport)
{
if (auto it = client_tunnels_.find(*pport); it != client_tunnels_.end())
{
it->second.pending_incoming.emplace(std::move(client));
flush_pending_incoming(it->second);
return;
}
tcp_tunnel.data(nullptr);
}
client->closeReset();
});
tcp_tunnel->listen();
tcp_tunnel->erase(err_handler);
@ -466,6 +485,9 @@ namespace llarp::quic
auto& ct = client_tunnels_[pport];
ct.open_cb = std::move(on_open);
ct.tcp = std::move(tcp_tunnel);
// We use this pport shared_ptr value on the listening tcp socket both to hand to pport into the
// accept handler, and to let the accept handler know that `this` is still safe to use.
ct.tcp->data(std::make_shared<uint16_t>(pport));
auto after_path = [this, port, pport = pport, remote_addr](auto maybe_convo) {
if (not continue_connecting(pport, (bool)maybe_convo, "path build", remote_addr))
@ -510,6 +532,7 @@ namespace llarp::quic
if (auto it = client_tunnels_.find(id); it != client_tunnels_.end())
{
it->second.tcp->close();
it->second.tcp->data(nullptr);
it->second.tcp.reset();
}
}
@ -519,6 +542,7 @@ namespace llarp::quic
if (tcp)
{
tcp->close();
tcp->data(nullptr);
tcp.reset();
}
for (auto& conn : conns)
@ -545,38 +569,43 @@ namespace llarp::quic
tunnel.client = std::make_unique<Client>(service_endpoint_, remote, pport);
auto conn = tunnel.client->get_connection();
conn->on_stream_available = [this, id = row.first](Connection& conn) {
conn->on_stream_available = [this, id = row.first](Connection&) {
if (auto it = client_tunnels_.find(id); it != client_tunnels_.end())
flush_pending_incoming(it->second, conn);
flush_pending_incoming(it->second);
};
}
void
TunnelManager::flush_pending_incoming(ClientTunnel& ct, Connection& conn)
TunnelManager::flush_pending_incoming(ClientTunnel& ct)
{
if (!ct.client) return; // Happens if we're still waiting for a path to build
assert(ct.client->get_connection());
auto& conn = *ct.client->get_connection();
int available = conn.get_streams_available();
while (available > 0 and not ct.pending_incoming.empty())
{
auto client = ct.pending_incoming.front().lock();
auto tcp_client = ct.pending_incoming.front().lock();
ct.pending_incoming.pop();
if (not client)
if (not tcp_client)
continue;
try
{
conn.open_stream(
[client](auto&&... args) {
initial_client_data_handler(*client, std::forward<decltype(args)>(args)...);
auto str = conn.open_stream(
[tcp_client](auto&&... args) {
initial_client_data_handler(*tcp_client, std::forward<decltype(args)>(args)...);
},
[client](auto&&... args) {
initial_client_close_handler(*client, std::forward<decltype(args)>(args)...);
[tcp_client](auto&&... args) {
initial_client_close_handler(*tcp_client, std::forward<decltype(args)>(args)...);
});
install_stream_forwarding(*tcp_client, *str);
tcp_client->read(); // Unfreeze (we stop() before putting into pending)
available--;
}
catch (const std::exception& e)
{
LogWarn("Opening quic stream failed: ", e.what());
client->closeReset();
tcp_client->closeReset();
}
LogTrace("Set up new stream");

@ -166,7 +166,7 @@ namespace llarp::quic
make_client(const SockAddr& remote, std::pair<const uint16_t, ClientTunnel>& row);
void
flush_pending_incoming(ClientTunnel& ct, Connection& conn);
flush_pending_incoming(ClientTunnel& ct);
// Server instance; this listens on pseudo-port 0 (if it listens). This is automatically
// instantiated the first time `listen()` is called; if not instantiated we simply drop any

Loading…
Cancel
Save