Fix/refactor stream closing

Make stream closing with expiring connections work better.  Fixes an
issue where the stream's uv_async could outlive the stream and/or
connection and segfault.
pull/1576/head
Jason Rhinelander 3 years ago committed by Jeff Becker
parent 60c813d306
commit ac34835c12
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -537,6 +537,8 @@ namespace llarp::quic
retransmit_timer->stop(); retransmit_timer->stop();
retransmit_timer->close(); retransmit_timer->close();
} }
for (auto& [id, str] : streams)
str->hard_close();
} }
void void
@ -820,7 +822,7 @@ namespace llarp::quic
" data callback raised exception (", " data callback raised exception (",
e.what(), e.what(),
"); closing stream with app code ", "); closing stream with app code ",
STREAM_EXCEPTION_ERROR_CODE); STREAM_ERROR_EXCEPTION);
} }
catch (...) catch (...)
{ {
@ -828,11 +830,11 @@ namespace llarp::quic
"Stream ", "Stream ",
str->id(), str->id(),
" data callback raised an unknown exception; closing stream with app code ", " data callback raised an unknown exception; closing stream with app code ",
STREAM_EXCEPTION_ERROR_CODE); STREAM_ERROR_EXCEPTION);
} }
if (!good) if (!good)
{ {
str->close(STREAM_EXCEPTION_ERROR_CODE); str->close(STREAM_ERROR_EXCEPTION);
return NGTCP2_ERR_CALLBACK_FAILURE; return NGTCP2_ERR_CALLBACK_FAILURE;
} }
} }

@ -69,6 +69,11 @@ namespace llarp::quic
: Stream{conn, nullptr, nullptr, buffer_size, std::move(id)} : Stream{conn, nullptr, nullptr, buffer_size, std::move(id)}
{} {}
Stream::~Stream()
{
hard_close();
}
void void
Stream::set_buffer_size(size_t size) Stream::set_buffer_size(size_t size)
{ {
@ -257,6 +262,8 @@ namespace llarp::quic
void void
Stream::handle_unblocked() Stream::handle_unblocked()
{ {
if (is_closing)
return;
if (buffer.empty()) if (buffer.empty())
{ {
while (!unblocked_callbacks.empty() && unblocked_callbacks.front()(*this)) while (!unblocked_callbacks.empty() && unblocked_callbacks.front()(*this))
@ -281,7 +288,8 @@ namespace llarp::quic
void void
Stream::available_ready() Stream::available_ready()
{ {
avail_trigger->send(); if (avail_trigger)
avail_trigger->send();
} }
void void
@ -322,6 +330,19 @@ namespace llarp::quic
conn.io_ready(); conn.io_ready();
} }
void
Stream::hard_close()
{
if (avail_trigger)
{
avail_trigger->close();
avail_trigger.reset();
}
if (!is_closing && close_callback)
close_callback(*this, STREAM_ERROR_CONNECTION_EXPIRED);
is_closing = is_shutdown = true;
}
void void
Stream::data(std::shared_ptr<void> data) Stream::data(std::shared_ptr<void> data)
{ {

@ -73,7 +73,11 @@ namespace llarp::quic
}; };
// Application error code we close with if the data handle throws // Application error code we close with if the data handle throws
constexpr uint64_t STREAM_EXCEPTION_ERROR_CODE = (1ULL << 62) - 2; inline constexpr uint64_t STREAM_ERROR_EXCEPTION = (1ULL << 62) - 2;
// Error code we send to a stream close callback if the stream's connection expires; this is *not*
// sent over quic, hence using a value >= 2^62 (quic's maximum serializable integer).
inline constexpr uint64_t STREAM_ERROR_CONNECTION_EXPIRED = (1ULL << 62) + 1;
std::ostream& std::ostream&
operator<<(std::ostream& o, const StreamID& s); operator<<(std::ostream& o, const StreamID& s);
@ -266,6 +270,8 @@ namespace llarp::quic
return conn; return conn;
} }
~Stream();
private: private:
friend class Connection; friend class Connection;
@ -310,6 +316,11 @@ namespace llarp::quic
void void
wrote(size_t bytes); wrote(size_t bytes);
// Called by the owning Connection to do a "hard" close of a stream during Connection
// destruction: unlike a regular close this doesn't try to transmit a close over the wire (which
// won't work since the Connection is dead), it just fires the close callback and cleans up.
void hard_close();
// ngtcp2 stream_id, assigned during stream creation // ngtcp2 stream_id, assigned during stream creation
StreamID stream_id{-1}; StreamID stream_id{-1};

@ -78,6 +78,11 @@ namespace llarp::quic
} }
} }
void close_tcp_pair(quic::Stream& st, std::optional<uint64_t> /*errcode*/) {
if (auto tcp = st.data<uvw::TCPHandle>())
tcp->close();
};
// Creates a new tcp handle that forwards incoming data/errors/closes into appropriate actions // Creates a new tcp handle that forwards incoming data/errors/closes into appropriate actions
// on the given quic stream. // on the given quic stream.
void void
@ -124,6 +129,7 @@ namespace llarp::quic
}); });
tcp.on<uvw::DataEvent>(on_outgoing_data); tcp.on<uvw::DataEvent>(on_outgoing_data);
stream.data_callback = on_incoming_data; stream.data_callback = on_incoming_data;
stream.close_callback = close_tcp_pair;
} }
// This initial data handler is responsible for pulling off the initial stream data that comes // This initial data handler is responsible for pulling off the initial stream data that comes
// back, confirming that the tunnel is opened on the other end. Currently this is a null byte // back, confirming that the tunnel is opened on the other end. Currently this is a null byte
@ -236,12 +242,7 @@ namespace llarp::quic
server_ = std::make_unique<Server>(service_endpoint_); server_ = std::make_unique<Server>(service_endpoint_);
server_->stream_open_callback = [this](Stream& stream, uint16_t port) -> bool { server_->stream_open_callback = [this](Stream& stream, uint16_t port) -> bool {
stream.close_callback = [](quic::Stream& st, stream.close_callback = close_tcp_pair;
[[maybe_unused]] std::optional<uint64_t> errcode) {
auto tcp = st.data<uvw::TCPHandle>();
if (tcp)
tcp->close();
};
auto& conn = stream.get_connection(); auto& conn = stream.get_connection();
auto remote = service_endpoint_.GetEndpointWithConvoTag(conn.path.remote); auto remote = service_endpoint_.GetEndpointWithConvoTag(conn.path.remote);

Loading…
Cancel
Save