diff --git a/llarp/quic/client.cpp b/llarp/quic/client.cpp index 09fc22855..affbc5991 100644 --- a/llarp/quic/client.cpp +++ b/llarp/quic/client.cpp @@ -3,7 +3,6 @@ #include #include -#include #include #include #include diff --git a/llarp/quic/client.hpp b/llarp/quic/client.hpp index 522460689..d6d728153 100644 --- a/llarp/quic/client.hpp +++ b/llarp/quic/client.hpp @@ -5,12 +5,6 @@ #include -namespace uvw -{ - struct ListenEvent; - class TCPHandle; -} // namespace uvw - namespace llarp::quic { class Client : public Endpoint diff --git a/llarp/quic/tunnel.cpp b/llarp/quic/tunnel.cpp index aaa9cc909..8f057345b 100644 --- a/llarp/quic/tunnel.cpp +++ b/llarp/quic/tunnel.cpp @@ -438,6 +438,25 @@ namespace llarp::quic auto err_handler = tcp_tunnel->once([&failed](auto& evt, auto&) { failed = evt.what(); }); tcp_tunnel->bind(*bind_addr.operator const sockaddr*()); + tcp_tunnel->on([this] (const uvw::ListenEvent&, uvw::TCPHandle& tcp_tunnel) { + auto client = tcp_tunnel.loop().resource(); + tcp_tunnel.accept(*client); + // Freeze the connection (after accepting) because we may need to stall it until a stream + // becomes available; flush_pending_incoming will unfreeze it. + client->stop(); + auto pport = tcp_tunnel.data(); + if (pport) + { + if (auto it = client_tunnels_.find(*pport); it != client_tunnels_.end()) + { + it->second.pending_incoming.emplace(std::move(client)); + flush_pending_incoming(it->second); + return; + } + tcp_tunnel.data(nullptr); + } + client->closeReset(); + }); tcp_tunnel->listen(); tcp_tunnel->erase(err_handler); @@ -466,6 +485,9 @@ namespace llarp::quic auto& ct = client_tunnels_[pport]; ct.open_cb = std::move(on_open); ct.tcp = std::move(tcp_tunnel); + // We use this pport shared_ptr value on the listening tcp socket both to hand to pport into the + // accept handler, and to let the accept handler know that `this` is still safe to use. + ct.tcp->data(std::make_shared(pport)); auto after_path = [this, port, pport = pport, remote_addr](auto maybe_convo) { if (not continue_connecting(pport, (bool)maybe_convo, "path build", remote_addr)) @@ -510,6 +532,7 @@ namespace llarp::quic if (auto it = client_tunnels_.find(id); it != client_tunnels_.end()) { it->second.tcp->close(); + it->second.tcp->data(nullptr); it->second.tcp.reset(); } } @@ -519,6 +542,7 @@ namespace llarp::quic if (tcp) { tcp->close(); + tcp->data(nullptr); tcp.reset(); } for (auto& conn : conns) @@ -545,38 +569,43 @@ namespace llarp::quic tunnel.client = std::make_unique(service_endpoint_, remote, pport); auto conn = tunnel.client->get_connection(); - conn->on_stream_available = [this, id = row.first](Connection& conn) { + conn->on_stream_available = [this, id = row.first](Connection&) { if (auto it = client_tunnels_.find(id); it != client_tunnels_.end()) - flush_pending_incoming(it->second, conn); + flush_pending_incoming(it->second); }; } void - TunnelManager::flush_pending_incoming(ClientTunnel& ct, Connection& conn) + TunnelManager::flush_pending_incoming(ClientTunnel& ct) { + if (!ct.client) return; // Happens if we're still waiting for a path to build + assert(ct.client->get_connection()); + auto& conn = *ct.client->get_connection(); int available = conn.get_streams_available(); while (available > 0 and not ct.pending_incoming.empty()) { - auto client = ct.pending_incoming.front().lock(); + auto tcp_client = ct.pending_incoming.front().lock(); ct.pending_incoming.pop(); - if (not client) + if (not tcp_client) continue; try { - conn.open_stream( - [client](auto&&... args) { - initial_client_data_handler(*client, std::forward(args)...); + auto str = conn.open_stream( + [tcp_client](auto&&... args) { + initial_client_data_handler(*tcp_client, std::forward(args)...); }, - [client](auto&&... args) { - initial_client_close_handler(*client, std::forward(args)...); + [tcp_client](auto&&... args) { + initial_client_close_handler(*tcp_client, std::forward(args)...); }); + install_stream_forwarding(*tcp_client, *str); + tcp_client->read(); // Unfreeze (we stop() before putting into pending) available--; } catch (const std::exception& e) { LogWarn("Opening quic stream failed: ", e.what()); - client->closeReset(); + tcp_client->closeReset(); } LogTrace("Set up new stream"); diff --git a/llarp/quic/tunnel.hpp b/llarp/quic/tunnel.hpp index 93916717d..c6f91aa70 100644 --- a/llarp/quic/tunnel.hpp +++ b/llarp/quic/tunnel.hpp @@ -166,7 +166,7 @@ namespace llarp::quic make_client(const SockAddr& remote, std::pair& row); void - flush_pending_incoming(ClientTunnel& ct, Connection& conn); + flush_pending_incoming(ClientTunnel& ct); // Server instance; this listens on pseudo-port 0 (if it listens). This is automatically // instantiated the first time `listen()` is called; if not instantiated we simply drop any