@ -2,6 +2,7 @@
# include <llarp/service/convotag.hpp>
# include <llarp/service/endpoint.hpp>
# include <llarp/service/name.hpp>
# include "llarp/net/net_int.hpp"
# include "stream.hpp"
# include <limits>
# include <llarp/util/logging.hpp>
@ -16,6 +17,8 @@ namespace llarp::quic
{
namespace
{
static auto logcat = log : : Cat ( " quic " ) ;
// Takes data from the tcp connection and pushes it down the quic tunnel
void
on_outgoing_data ( uvw : : DataEvent & event , uvw : : TCPHandle & client )
@ -24,21 +27,24 @@ namespace llarp::quic
assert ( stream ) ;
std : : string_view data { event . data . get ( ) , event . length } ;
auto peer = client . peer ( ) ;
LogTrace ( peer . ip , " : " , peer . port , " → lokinet " , buffer_printer { data } ) ;
// log::trace(logcat, "{}:{} → lokinet {}", peer.ip, peer.port, buffer_printer{data});
log : : debug ( logcat , " {}:{} → lokinet {} " , peer . ip , peer . port , buffer_printer { data } ) ;
// Steal the buffer from the DataEvent's unique_ptr<char[]>:
stream - > append_buffer ( reinterpret_cast < const std : : byte * > ( event . data . release ( ) ) , event . length ) ;
if ( stream - > used ( ) > = tunnel : : PAUSE_SIZE )
{
LogDebug (
" quic tunnel is congested (have " ,
stream - > used ( ) ,
" bytes in flight); pausing local tcp connection reads " ) ;
log : : debug (
logcat ,
" quic tunnel is congested (have {} bytes in flight); pausing local tcp connection "
" reads " ,
stream - > used ( ) ) ;
client . stop ( ) ;
stream - > when_available ( [ ] ( Stream & s ) {
auto client = s . data < uvw : : TCPHandle > ( ) ;
if ( s . used ( ) < tunnel : : PAUSE_SIZE )
{
LogDebug ( " quic tunnel is no longer congested; resuming tcp connection reading " ) ;
log : : debug (
logcat , " quic tunnel is no longer congested; resuming tcp connection reading " ) ;
client - > read ( ) ;
return true ;
}
@ -47,7 +53,7 @@ namespace llarp::quic
}
else
{
LogDebug( " Queued " , event . length , " bytes " ) ;
log: : debug ( logcat , " Queued {} bytes " , event . length ) ;
}
}
@ -62,7 +68,7 @@ namespace llarp::quic
std : : string_view data { reinterpret_cast < const char * > ( bdata . data ( ) ) , bdata . size ( ) } ;
auto peer = tcp - > peer ( ) ;
LogTrace( peer . ip , " : " , peer . port , " ← lokinet " , buffer_printer { data } ) ;
log: : trace ( logcat , " {}:{} ← lokinet {} " , peer . ip , peer . port , buffer_printer { data } ) ;
if ( data . empty ( ) )
return ;
@ -85,7 +91,7 @@ namespace llarp::quic
{
if ( auto tcp = st . data < uvw : : TCPHandle > ( ) )
{
LogTrace( " Closing TCP connection " ) ;
log: : trace ( logcat , " Closing TCP connection " ) ;
tcp - > close ( ) ;
}
} ;
@ -96,42 +102,58 @@ namespace llarp::quic
{
tcp . data ( stream . shared_from_this ( ) ) ;
stream . weak_data ( tcp . weak_from_this ( ) ) ;
auto weak_conn = stream . get_connection ( ) . weak_from_this ( ) ;
tcp . clear ( ) ; // Clear any existing initial event handlers
tcp . on < uvw : : CloseEvent > ( [ ] ( auto & , uvw : : TCPHandle & c ) {
tcp . on < uvw : : CloseEvent > ( [ weak_conn = std : : move ( weak_conn ) ] ( auto & , uvw : : TCPHandle & c ) {
// This fires sometime after we call `close()` to signal that the close is done.
if ( auto stream = c . data < Stream > ( ) )
{
LogInfo ( " Local TCP connection closed, closing associated quic stream " , stream - > id ( ) ) ;
stream - > close ( ) ;
log : : info (
logcat ,
" Local TCP connection closed, closing associated quic stream {} " ,
stream - > id ( ) ) ;
// There is an awkwardness with Stream ownership, so make sure the Connection
// which it holds a reference to still exists, as stream->close will segfault
// otherwise
if ( auto locked_conn = weak_conn . lock ( ) )
{
// If this end of the stream closed due to an abrupt close to the local TCP
// connection rather than an EOF, send an error code along so the other end
// knows this end is dead. Otherwise, the streams on either end should die
// gracefully when both ends of the TCP connection are properly closed.
stream - > close ( stream - > has_eof ( ) ? 0 : tunnel : : ERROR_TCP ) ;
}
stream - > data ( nullptr ) ;
}
c . data ( nullptr ) ;
} ) ;
tcp . on < uvw : : EndEvent > ( [ ] ( auto & , uvw : : TCPHandle & c ) {
// This fires on eof, most likely because the other side of the TCP connection closed it.
LogInfo ( " EOF on connection to " , c . peer ( ) . ip , " : " , c . peer ( ) . port ) ;
log : : info ( logcat , " EOF on connection to {}:{} " , c . peer ( ) . ip , c . peer ( ) . port ) ;
if ( auto stream = c . data < Stream > ( ) )
{
stream - > set_eof ( ) ; // CloseEvent will send graceful shutdown to other end
}
c . close ( ) ;
} ) ;
tcp . on < uvw : : ErrorEvent > ( [ ] ( const uvw : : ErrorEvent & e , uvw : : TCPHandle & tcp ) {
LogError (
" ErrorEvent[ " ,
log : : error (
logcat ,
" ErrorEvent[{}:{}] on connection with {}:{}, shutting down quic stream " ,
e . name ( ) ,
" : " ,
e . what ( ) ,
" ] on connection with " ,
tcp . peer ( ) . ip ,
" : " ,
tcp . peer ( ) . port ,
" , shutting down quic stream " ) ;
tcp . peer ( ) . port ) ;
if ( auto stream = tcp . data < Stream > ( ) )
{
stream - > close ( tunnel : : ERROR_TCP ) ;
stream - > data ( nullptr ) ;
tcp . data ( nullptr ) ;
}
// tcp.closeReset();
tcp . close ( ) ;
} ) ;
tcp . on < uvw : : DataEvent > ( on_outgoing_data ) ;
stream . data_callback = on_incoming_data ;
@ -147,7 +169,7 @@ namespace llarp::quic
void
initial_client_data_handler ( uvw : : TCPHandle & client , Stream & stream , bstring_view bdata )
{
LogTrace( " initial client handler; data: " , buffer_printer { bdata } ) ;
log: : trace ( logcat , " initial client handler; data: {} " , buffer_printer { bdata } ) ;
if ( bdata . empty ( ) )
return ;
client . clear ( ) ; // Clear these initial event handlers: we either set up the proper ones, or
@ -164,14 +186,15 @@ namespace llarp::quic
bdata . remove_prefix ( 1 ) ;
stream . data_callback ( stream , std : : move ( bdata ) ) ;
}
LogTrace( " starting client reading " ) ;
log: : trace ( logcat , " starting client reading " ) ;
}
else
{
LogWarn (
" Remote connection returned invalid initial byte (0x " ,
oxenc : : to_hex ( bdata . begin ( ) , bdata . begin ( ) + 1 ) ,
" ); dropping connection " ) ;
log : : warning (
logcat ,
" Remote connection returned invalid initial byte (0x{}); dropping "
" connection " ,
oxenc : : to_hex ( bdata . begin ( ) , bdata . begin ( ) + 1 ) ) ;
stream . close ( tunnel : : ERROR_BAD_INIT ) ;
client . close ( ) ;
}
@ -187,14 +210,14 @@ namespace llarp::quic
uvw : : TCPHandle & client , Stream & /*stream*/ , std : : optional < uint64_t > error_code )
{
if ( error_code & & * error_code = = tunnel : : ERROR_CONNECT )
LogDebug( " Remote TCP connection failed, closing local connection " ) ;
log: : debug ( logcat , " Remote TCP connection failed, closing local connection " ) ;
else
LogWarn (
" Stream connection closed " ,
error_code ? " with error " + std : : to_string ( * error_code ) : " gracefully " ,
" ; closing local TCP connection. " ) ;
log: : warning (
logcat ,
" Stream connection closed {}; closing local TCP connection. " ,
error_code ? " with error " + std : : to_string ( * error_code ) : " gracefully " ) ;
auto peer = client . peer ( ) ;
LogDebug( " Closing connection to " , peer . ip , " : " , peer . port ) ;
log: : debug ( logcat , " Closing connection to {}:{} " , peer . ip , peer . port ) ;
client . clear ( ) ;
if ( error_code )
client . close ( ) ;
@ -208,7 +231,7 @@ namespace llarp::quic
{
// Cleanup callback to clear out closed tunnel connections
service_endpoint_ . Loop ( ) - > call_every ( 500 ms , timer_keepalive_ , [ this ] {
LogTrace( " Checking quic tunnels for finished connections " ) ;
log: : trace ( logcat , " Checking quic tunnels for finished connections " ) ;
for ( auto ctit = client_tunnels_ . begin ( ) ; ctit ! = client_tunnels_ . end ( ) ; )
{
// Clear any accepted connections that have been closed:
@ -220,7 +243,7 @@ namespace llarp::quic
// stop the TCP connection when the quic side gets congested.
if ( not * it or not ( * it ) - > data ( ) )
{
LogDebug( " Cleanup up closed outgoing tunnel on quic: " , port ) ;
log: : debug ( logcat , " Cleanup up closed outgoing tunnel on quic: {} " , port ) ;
it = ct . conns . erase ( it ) ;
}
else
@ -231,43 +254,40 @@ namespace llarp::quic
// destroy the whole thing.
if ( ct . conns . empty ( ) and ( not ct . tcp or not ct . tcp - > active ( ) ) )
{
LogDebug ( " All sockets closed on quic: " , port , " , destroying tunnel data " ) ;
log : : debug ( logcat , " All sockets closed on quic:{}, destroying tunnel data " , port ) ;
if ( ct . close_cb )
ct . close_cb ( 0 , nullptr ) ;
ctit = client_tunnels_ . erase ( ctit ) ;
}
else
+ + ctit ;
}
LogTrace( " Done quic tunnel cleanup check " ) ;
log: : trace ( logcat , " Done quic tunnel cleanup check " ) ;
} ) ;
}
void
TunnelManager : : make_server ( )
{
// auto loop = get_loop();
server_ = std : : make_unique < Server > ( service_endpoint_ ) ;
server_ - > stream_open_callback = [ this ] ( Stream & stream , uint16_t port ) - > bool {
stream . close_callback = close_tcp_pair ;
// FIXME
auto & conn = stream . get_connection ( ) ;
auto remote = service_endpoint_ . GetEndpointWithConvoTag ( conn . path . remote ) ;
if ( ! remote )
{
LogWarn ( " Received new stream open from invalid/unknown convo tag, dropping stream " ) ;
return false ;
}
auto lokinet_addr = var : : visit ( [ ] ( auto & & remote ) { return remote . ToString ( ) ; } , * remote ) ;
auto lokinet_addr =
var : : visit ( [ ] ( auto & & remote ) { return remote . ToString ( ) ; } , * conn . path . remote . endpoint ) ;
auto tunnel_to = allow_connection ( lokinet_addr , port ) ;
if ( not tunnel_to )
return false ;
LogInfo ( " quic stream from " , lokinet_addr , " to " , port , " tunnelling to " , * tunnel_to ) ;
log : : debug (
logcat , " quic stream from {} to {} tunnelling to {} " , lokinet_addr , port , * tunnel_to ) ;
auto tcp = get_loop ( ) - > resource < uvw : : TCPHandle > ( ) ;
auto error_handler = tcp - > once < uvw : : ErrorEvent > (
[ & stream , to = * tunnel_to ] ( const uvw : : ErrorEvent & , uvw : : TCPHandle & ) {
LogWarn( " Failed to connect to " , to , " , shutting down quic stream " ) ;
log: : warning ( logcat , " Failed to connect to {}, shutting down quic stream " , to ) ;
stream . close ( tunnel : : ERROR_CONNECT ) ;
} ) ;
@ -281,16 +301,16 @@ namespace llarp::quic
auto stream = streamw . lock ( ) ;
if ( ! stream )
{
LogWarn (
" Connected to TCP " ,
log : : warning (
logcat ,
" Connected to TCP {}:{} but quic stream has gone away; close/resetting local TCP "
" connection " ,
peer . ip ,
" : " ,
peer . port ,
" but quic stream has gone away; close/resetting local TCP connection " ) ;
peer . port ) ;
tcp . close ( ) ;
return ;
}
LogDebug( " Connected to " , peer . ip , " : " , peer . port , " for quic " , stream - > id ( ) ) ;
log: : debug ( logcat , " Connected to {}:{} for quic {} " , peer . ip , peer . port , stream - > id ( ) ) ;
// Set up the data stream forwarding (which also clears these initial handlers).
install_stream_forwarding ( tcp , * stream ) ;
assert ( stream - > used ( ) = = 0 ) ;
@ -324,7 +344,7 @@ namespace llarp::quic
TunnelManager : : listen ( SockAddr addr )
{
return listen ( [ addr ] ( std : : string_view , uint16_t p ) - > std : : optional < SockAddr > {
LogInfo( " try accepting " , addr . getPort ( ) ) ;
log: : info ( logcat , " try accepting {} " , addr . getPort ( ) ) ;
if ( p = = addr . getPort ( ) )
return addr ;
return std : : nullopt ;
@ -349,19 +369,20 @@ namespace llarp::quic
}
catch ( const std : : exception & e )
{
LogWarn (
" Incoming quic connection from " ,
log : : warning (
logcat ,
" Incoming quic connection from {} to {} denied via exception ({}) " ,
lokinet_addr ,
" to " ,
port ,
" denied via exception ( " ,
e . what ( ) ,
" ) " ) ;
e . what ( ) ) ;
return std : : nullopt ;
}
}
LogWarn (
" Incoming quic connection from " , lokinet_addr , " to " , port , " declined by all handlers " ) ;
log : : warning (
logcat ,
" Incoming quic connection from {} to {} declined by all handlers " ,
lokinet_addr ,
port ) ;
return std : : nullopt ;
}
@ -413,15 +434,15 @@ namespace llarp::quic
auto it = client_tunnels_ . find ( pseudo_port ) ;
if ( it = = client_tunnels_ . end ( ) )
{
LogDebug( " QUIC tunnel to " , addr , " closed before " , step_name , " finished " ) ;
log: : debug ( logcat , " QUIC tunnel to {} closed before {} finished " , addr , step_name ) ;
return false ;
}
if ( ! step_success )
{
LogWarn( " QUIC tunnel to " , addr , " failed during " , step_name , " ; aborting tunnel " ) ;
log: : warning ( logcat , " QUIC tunnel to {} failed during {}; aborting tunnel " , addr , step_name ) ;
it - > second . tcp - > close ( ) ;
if ( it - > second . open_cb )
it - > second . open_cb ( false );
it - > second . open_cb ( false , nullptr );
client_tunnels_ . erase ( it ) ;
}
return step_success ;
@ -429,7 +450,11 @@ namespace llarp::quic
std : : pair < SockAddr , uint16_t >
TunnelManager : : open (
std : : string_view remote_address , uint16_t port , OpenCallback on_open , SockAddr bind_addr )
std : : string_view remote_address ,
uint16_t port ,
OpenCallback on_open ,
CloseCallback on_close ,
SockAddr bind_addr )
{
std : : string remote_addr = lowercase_ascii_string ( std : : string { remote_address } ) ;
@ -494,24 +519,26 @@ namespace llarp::quic
" Unable to open an outgoing quic connection: too many existing connections " } ;
( next_pseudo_port_ = pport ) + + ;
LogInfo( " Bound TCP tunnel " , saddr , " for quic client : " , pport ) ;
log: : trace ( logcat , " Bound TCP tunnel {} for quic client :{} " , saddr , pport ) ;
// We are emplacing into client_tunnels_ here: beyond this point we must not throw until we
// return (or if we do, make sure we remove this row from client_tunnels_ first).
assert ( client_tunnels_ . count ( pport ) = = 0 ) ;
auto & ct = client_tunnels_ [ pport ] ;
ct . open_cb = std : : move ( on_open ) ;
ct . close_cb = std : : move ( on_close ) ;
ct . tcp = std : : move ( tcp_tunnel ) ;
// We use this pport shared_ptr value on the listening tcp socket both to hand to pport into the
// accept handler, and to let the accept handler know that `this` is still safe to use.
ct . tcp - > data ( std : : make_shared < uint16_t > ( pport ) ) ;
auto after_path = [ this , port , pport = pport , remote_addr ] ( auto maybe_convo ) {
if ( not continue_connecting ( pport , ( bool ) maybe_convo , " path build " , remote_addr ) )
auto after_path = [ this , port , pport = pport , remote_addr ] ( auto maybe_addr ) {
if ( maybe_addr )
{
make_client ( port , * maybe_addr , * client_tunnels_ . find ( pport ) ) ;
return ;
SockAddr dest { maybe_convo - > ToV6 ( ) } ;
dest . setPort ( port ) ;
make_client ( dest , * client_tunnels_ . find ( pport ) ) ;
}
continue_connecting ( pport , false , " path build " , remote_addr ) ;
} ;
if ( ! maybe_remote )
@ -520,10 +547,8 @@ namespace llarp::quic
// then we have to build a path to that address.
service_endpoint_ . LookupNameAsync (
remote_addr ,
[ this ,
after_path = std : : move ( after_path ) ,
pport = pport ,
remote_addr = std : : move ( remote_addr ) ] ( auto maybe_remote ) {
[ this , after_path = std : : move ( after_path ) , pport = pport , remote_addr ] (
auto maybe_remote ) {
if ( not continue_connecting (
pport , ( bool ) maybe_remote , " endpoint ONS lookup " , remote_addr ) )
return ;
@ -536,8 +561,9 @@ namespace llarp::quic
auto & remote = * maybe_remote ;
// See if we have an existing convo tag we can use to start things immediately
if ( auto maybe_convo = service_endpoint_ . GetBestConvoTagFor ( remote ) )
after_path ( maybe_convo ) ;
if ( auto maybe_convo = service_endpoint_ . GetBestConvoTagFor ( remote ) ;
auto maybe_addr = service_endpoint_ . GetEndpointWithConvoTag ( * maybe_convo ) )
after_path ( maybe_addr ) ;
else
{
service_endpoint_ . MarkAddressOutbound ( remote ) ;
@ -582,18 +608,47 @@ namespace llarp::quic
}
void
TunnelManager : : make_client ( const SockAddr & remote , std : : pair < const uint16_t , ClientTunnel > & row )
TunnelManager : : make_client (
const uint16_t port ,
std : : variant < service : : Address , RouterID > remote ,
std : : pair < const uint16_t , ClientTunnel > & row )
{
assert ( remote . getPort ( ) > 0 ) ;
assert ( port > 0 ) ;
auto & [ pport , tunnel ] = row ;
assert ( not tunnel . client ) ;
tunnel . client = std : : make_unique < Client > ( service_endpoint_ , remote, pport ) ;
tunnel . client = std : : make_unique < Client > ( service_endpoint_ , port, std : : move ( remote) , pport ) ;
auto conn = tunnel . client - > get_connection ( ) ;
conn - > on_stream_available = [ this , id = row . first ] ( Connection & ) {
LogDebug( " QUIC connection : " , id , " established; streams now available " ) ;
log: : debug ( logcat , " QUIC connection :{} established; streams now available " , id ) ;
if ( auto it = client_tunnels_ . find ( id ) ; it ! = client_tunnels_ . end ( ) )
{
flush_pending_incoming ( it - > second ) ;
if ( it - > second . open_cb )
{
log : : trace ( logcat , " Calling ClientTunnel.open_cb() " ) ;
it - > second . open_cb ( true , nullptr ) ;
it - > second . open_cb = nullptr ; // only call once
}
}
else
log : : warning (
logcat , " Connection.on_stream_available fired but we have no associated ClientTunnel! " ) ;
} ;
conn - > on_closing = [ this , id = row . first ] ( Connection & ) {
log : : debug ( logcat , " QUIC connection :{} closing, closing tunnel " , id ) ;
if ( auto it = client_tunnels_ . find ( id ) ; it ! = client_tunnels_ . end ( ) )
{
if ( it - > second . close_cb )
{
log : : trace ( logcat , " Calling ClientTunnel.close_cb() " ) ;
it - > second . close_cb ( 0 , nullptr ) ;
}
}
else
log : : debug ( logcat , " Connection.on_closing fired but no associated ClientTunnel found. " ) ;
this - > close ( id ) ;
} ;
}
@ -626,21 +681,22 @@ namespace llarp::quic
}
catch ( const std : : exception & e )
{
LogWarn( " Opening quic stream failed: " , e . what ( ) ) ;
log: : warning ( logcat , " Opening quic stream failed: {} " , e . what ( ) ) ;
tcp_client - > close ( ) ;
}
LogTrace( " Set up new stream " ) ;
log: : trace ( logcat , " Set up new stream " ) ;
conn . io_ready ( ) ;
}
}
void
TunnelManager : : receive_packet ( const service : : ConvoTag & tag , const llarp_buffer_t & buf )
TunnelManager : : receive_packet (
std : : variant < service : : Address , RouterID > remote , const llarp_buffer_t & buf )
{
if ( buf . sz < = 4 )
{
LogWarn( " invalid quic packet: packet size ( " , buf . sz , " ) too small " ) ;
log: : warning ( logcat , " invalid quic packet: packet size ( {}) too small " , buf . sz ) ;
return ;
}
auto type = static_cast < std : : byte > ( buf . base [ 0 ] ) ;
@ -650,51 +706,62 @@ namespace llarp::quic
auto ecn = static_cast < uint8_t > ( buf . base [ 3 ] ) ;
bstring_view data { reinterpret_cast < const std : : byte * > ( & buf . base [ 4 ] ) , buf . sz - 4 } ;
SockAddr remote { tag . ToV6 ( ) } ;
huint16_t remote_port { pseudo_port } ;
quic : : Endpoint * ep = nullptr ;
if ( type = = CLIENT_TO_SERVER )
{
LogTrace ( " packet is client-to-server from client pport " , pseudo_port ) ;
// Client-to-server: the header port is the return port
remote . setPort ( pseudo_port ) ;
log : : debug ( logcat , " packet is client-to-server from client pport {} " , pseudo_port ) ;
if ( ! server_ )
{
LogWarn( " Dropping incoming quic packet to server: no listeners " ) ;
log: : warning ( logcat , " Dropping incoming quic packet to server: no listeners " ) ;
return ;
}
ep = server_ . get ( ) ;
}
else if ( type = = SERVER_TO_CLIENT )
{
LogTrace( " packet is server-to-client to client pport " , pseudo_port ) ;
log: : debug ( logcat , " packet is server-to-client to client pport {} " , pseudo_port ) ;
// Server-to-client: the header port tells us which client tunnel this is going to
if ( auto it = client_tunnels_ . find ( pseudo_port ) ; it ! = client_tunnels_ . end ( ) )
ep = it - > second . client . get ( ) ;
if ( not ep )
{
LogWarn( " Incoming quic packet to invalid/closed client; dropping " ) ;
log: : warning ( logcat , " Incoming quic packet to invalid/closed client; dropping " ) ;
return ;
}
// The server doesn't send back the port because we already know it 1-to-1 from our outgoing
// connection .
// connection
if ( auto conn = static_cast < quic : : Client & > ( * ep ) . get_connection ( ) )
{
remote . setPor t( conn - > path . remote . port ( ) ) ;
LogTrace( " remote port is " , remote . getPort ( ) ) ;
remote _port = llarp : : net : : ToHos t( conn - > path . remote . port ( ) ) ;
log: : debug ( logcat , " remote port is {} " , remote_port ) ;
}
else
{
LogWarn ( " Incoming quic to a quic::Client without an active quic::Connection; dropping " ) ;
log : : warning (
logcat , " Incoming quic to a quic::Client without an active quic::Connection; dropping " ) ;
return ;
}
}
else
{
LogWarn( " Invalid incoming quic packet type " , type , " ; dropping packet " ) ;
log: : warning ( logcat , " Invalid incoming quic packet type {}; dropping packet " , type ) ;
return ;
}
ep - > receive_packet ( remote , ecn , data ) ;
log : : debug (
logcat ,
" remote_port = {}, pseudo_port = {} at line {} " ,
remote_port ,
pseudo_port ,
__LINE__ ) ;
auto remote_addr = Address { SockAddr { " ::1 " sv , remote_port } , std : : move ( remote ) } ;
ep - > receive_packet ( std : : move ( remote_addr ) , ecn , data ) ;
}
} // namespace llarp::quic