@ -21,10 +21,16 @@ namespace llarp
{
{
namespace link
namespace link
{
{
Endpoint : : Endpoint ( std : : shared_ptr < oxen : : quic : : Endpoint > ep , LinkManager & lm )
: endpoint { std : : move ( ep ) }
, link_manager { lm }
, _is_service_node { link_manager . is_service_node ( ) }
{ }
std : : shared_ptr < link : : Connection >
std : : shared_ptr < link : : Connection >
Endpoint : : get_conn ( const RemoteRC & rc ) const
Endpoint : : get_conn ( const RemoteRC & rc ) const
{
{
if ( auto itr = active_conns . find ( rc . router_id ( ) ) ; itr ! = active_conns . end ( ) )
if ( auto itr = servic e_conns. find ( rc . router_id ( ) ) ; itr ! = servic e_conns. end ( ) )
return itr - > second ;
return itr - > second ;
// if (auto itr = pending_conns.find(rc.router_id()); itr != pending_conns.end())
// if (auto itr = pending_conns.find(rc.router_id()); itr != pending_conns.end())
@ -36,7 +42,7 @@ namespace llarp
std : : shared_ptr < link : : Connection >
std : : shared_ptr < link : : Connection >
Endpoint : : get_conn ( const RouterID & rid ) const
Endpoint : : get_conn ( const RouterID & rid ) const
{
{
if ( auto itr = activ e_conns. find ( rid ) ; itr ! = activ e_conns. end ( ) )
if ( auto itr = servic e_conns. find ( rid ) ; itr ! = servic e_conns. end ( ) )
return itr - > second ;
return itr - > second ;
// if (auto itr = pending_conns.find(rid); itr != pending_conns.end())
// if (auto itr = pending_conns.find(rid); itr != pending_conns.end())
@ -48,7 +54,7 @@ namespace llarp
bool
bool
Endpoint : : have_client_conn ( const RouterID & remote ) const
Endpoint : : have_client_conn ( const RouterID & remote ) const
{
{
if ( auto itr = activ e_conns. find ( remote ) ; itr ! = activ e_conns. end ( ) )
if ( auto itr = servic e_conns. find ( remote ) ; itr ! = servic e_conns. end ( ) )
{
{
return not itr - > second - > remote_is_relay ;
return not itr - > second - > remote_is_relay ;
}
}
@ -64,7 +70,7 @@ namespace llarp
bool
bool
Endpoint : : have_conn ( const RouterID & remote ) const
Endpoint : : have_conn ( const RouterID & remote ) const
{
{
return activ e_conns. count ( remote ) or pending_conns . count ( remote ) ;
return servic e_conns. count ( remote ) or pending_conns . count ( remote ) ;
}
}
std : : pair < size_t , size_t >
std : : pair < size_t , size_t >
@ -72,7 +78,7 @@ namespace llarp
{
{
size_t in { 0 } , out { 0 } ;
size_t in { 0 } , out { 0 } ;
for ( const auto & c : activ e_conns)
for ( const auto & c : servic e_conns)
{
{
if ( c . second - > inbound )
if ( c . second - > inbound )
+ + in ;
+ + in ;
@ -88,7 +94,7 @@ namespace llarp
{
{
size_t count = 0 ;
size_t count = 0 ;
for ( const auto & c : activ e_conns)
for ( const auto & c : servic e_conns)
{
{
if ( not ( c . second - > remote_is_relay and clients_only ) )
if ( not ( c . second - > remote_is_relay and clients_only ) )
count + = 1 ;
count + = 1 ;
@ -100,9 +106,9 @@ namespace llarp
bool
bool
Endpoint : : get_random_connection ( RemoteRC & router ) const
Endpoint : : get_random_connection ( RemoteRC & router ) const
{
{
if ( const auto size = activ e_conns. size ( ) ; size )
if ( const auto size = servic e_conns. size ( ) ; size )
{
{
auto itr = activ e_conns. begin ( ) ;
auto itr = servic e_conns. begin ( ) ;
std : : advance ( itr , randint ( ) % size ) ;
std : : advance ( itr , randint ( ) % size ) ;
RouterID rid { itr - > second - > conn - > remote_key ( ) } ;
RouterID rid { itr - > second - > conn - > remote_key ( ) } ;
@ -123,7 +129,7 @@ namespace llarp
void
void
Endpoint : : for_each_connection ( std : : function < void ( link : : Connection & ) > func )
Endpoint : : for_each_connection ( std : : function < void ( link : : Connection & ) > func )
{
{
for ( const auto & [ rid , conn ] : activ e_conns)
for ( const auto & [ rid , conn ] : servic e_conns)
func ( * conn ) ;
func ( * conn ) ;
}
}
@ -134,7 +140,7 @@ namespace llarp
link_manager . _router . loop ( ) - > call ( [ this , rid = _rid ] ( ) {
link_manager . _router . loop ( ) - > call ( [ this , rid = _rid ] ( ) {
// deletion from pending_conns, pending_conn_msg_queue, active_conns, etc is taken care
// deletion from pending_conns, pending_conn_msg_queue, active_conns, etc is taken care
// of by LinkManager::on_conn_closed
// of by LinkManager::on_conn_closed
if ( auto itr = activ e_conns. find ( rid ) ; itr ! = activ e_conns. end ( ) )
if ( auto itr = servic e_conns. find ( rid ) ; itr ! = servic e_conns. end ( ) )
{
{
auto & conn = * itr - > second - > conn ;
auto & conn = * itr - > second - > conn ;
conn . close_connection ( ) ;
conn . close_connection ( ) ;
@ -195,7 +201,7 @@ namespace llarp
_router . loop ( ) - > call ( [ this , msg = std : : move ( m ) , func = std : : move ( func ) ] ( ) mutable {
_router . loop ( ) - > call ( [ this , msg = std : : move ( m ) , func = std : : move ( func ) ] ( ) mutable {
auto body = msg . body_str ( ) ;
auto body = msg . body_str ( ) ;
auto respond = [ m = std : : move ( msg ) ] ( std : : string response ) mutable {
auto respond = [ m = std : : move ( msg ) ] ( std : : string response ) mutable {
m . respond ( std : : move ( response ) , not m ) ;
m . respond ( std : : move ( response ) , m. is_error ( ) ) ;
} ;
} ;
std : : invoke ( func , this , body , std : : move ( respond ) ) ;
std : : invoke ( func , this , body , std : : move ( respond ) ) ;
} ) ;
} ) ;
@ -207,6 +213,7 @@ namespace llarp
LinkManager : : LinkManager ( Router & r )
LinkManager : : LinkManager ( Router & r )
: _router { r }
: _router { r }
, _is_service_node { _router . is_service_node ( ) }
, quic { std : : make_unique < oxen : : quic : : Network > ( ) }
, quic { std : : make_unique < oxen : : quic : : Network > ( ) }
, tls_creds { oxen : : quic : : GNUTLSCreds : : make_from_ed_keys (
, tls_creds { oxen : : quic : : GNUTLSCreds : : make_from_ed_keys (
{ reinterpret_cast < const char * > ( _router . identity ( ) . data ( ) ) , size_t { 32 } } ,
{ reinterpret_cast < const char * > ( _router . identity ( ) . data ( ) ) , size_t { 32 } } ,
@ -239,20 +246,44 @@ namespace llarp
[ this ] ( oxen : : quic : : connection_interface & ci , uint64_t ec ) {
[ this ] ( oxen : : quic : : connection_interface & ci , uint64_t ec ) {
return on_conn_closed ( ci , ec ) ;
return on_conn_closed ( ci , ec ) ;
} ,
} ,
[ this ] ( oxen : : quic : : dgram_interface & di , bstring dgram ) { recv_data_message ( di , dgram ) ; } ) ;
[ this ] ( oxen : : quic : : dgram_interface & di , bstring dgram ) { recv_data_message ( di , dgram ) ; } ,
tls_creds - > set_key_verify_callback ( [ this ] ( const ustring_view & key , const ustring_view & ) {
is_service_node ( ) ? alpns : : SERVICE_INBOUND : alpns : : CLIENT_INBOUND ,
is_service_node ( ) ? alpns : : SERVICE_OUTBOUND : alpns : : CLIENT_OUTBOUND ) ;
tls_creds - > set_key_verify_callback ( [ this ] ( const ustring_view & key , const ustring_view & alpn ) {
auto is_snode = is_service_node ( ) ;
RouterID other { key . data ( ) } ;
RouterID other { key . data ( ) } ;
auto us = router ( ) . is_bootstrap_seed ( ) ? " Bootstrap seed node " s : " Service node " s ;
if ( is_snode )
{
if ( alpn = = alpns : : C_ALPNS )
{
log : : critical ( logcat , " {} node accepting client connection (remote ID:{})! " , us , other ) ;
return true ;
}
if ( alpn = = alpns : : SN_ALPNS )
{
// verify as service node!
bool result = node_db - > registered_routers ( ) . count ( other ) ;
bool result = node_db - > registered_routers ( ) . count ( other ) ;
log : : critical (
log : : critical (
logcat ,
logcat ,
" {} node was {} to confirm remote (RID:{}) is registered; allowing connection! " ,
" {} node was {} to confirm remote (RID:{}) is registered; allowing connection! " ,
router ( ) . is_bootstrap_seed ( ) ? " Bootstrap seed node " : " Service node " ,
us ,
result ? " able " : " unable " ,
result ? " able " : " unable " ,
other ) ;
other ) ;
return result ;
return result ;
}
log : : critical ( logcat , " {} node received unknown ALPN; rejecting connection! " , us ) ;
return false ;
}
throw std : : runtime_error { " Clients should not be validating inbound connections! " } ;
} ) ;
} ) ;
if ( _router . is_service_node ( ) )
if ( _router . is_service_node ( ) )
{
{
@ -266,16 +297,17 @@ namespace llarp
{
{
const auto & scid = ci . scid ( ) ;
const auto & scid = ci . scid ( ) ;
RouterID rid { ci . remote_key ( ) } ;
RouterID rid { ci . remote_key ( ) } ;
ep . connid_map. emplace ( scid , rid ) ;
ep . service_ connid_map. emplace ( scid , rid ) ;
auto [ itr , b ] = ep . activ e_conns. emplace ( rid , nullptr ) ;
auto [ itr , b ] = ep . servic e_conns. emplace ( rid , nullptr ) ;
log : : critical ( logcat , " Queueing BTStream to be opened... " ) ;
log : : critical ( logcat , " Queueing BTStream to be opened... " ) ;
auto control_stream = ci . queue_stream < oxen : : quic : : BTRequestStream > ( [ this , rid = rid ] (
auto control_stream = ci . queue_incoming_stream < oxen : : quic : : BTRequestStream > (
oxen : : quic : : Stream & ,
[ this , rid = rid ] ( oxen : : quic : : Stream & , uint64_t error_code ) {
uint64_t error_code ) {
log : : warning (
log : : warning (
logcat , " BTRequestStream closed unexpectedly (ec:{}); closing connection... " , error_code ) ;
logcat ,
" BTRequestStream closed unexpectedly (ec:{}); closing connection... " ,
error_code ) ;
ep . close_connection ( rid ) ;
ep . close_connection ( rid ) ;
} ) ;
} ) ;
@ -307,8 +339,8 @@ namespace llarp
if ( auto itr = ep . pending_conns . find ( rid ) ; itr ! = ep . pending_conns . end ( ) )
if ( auto itr = ep . pending_conns . find ( rid ) ; itr ! = ep . pending_conns . end ( ) )
{
{
ep . connid_map. emplace ( scid , rid ) ;
ep . service_ connid_map. emplace ( scid , rid ) ;
auto [ it , b ] = ep . activ e_conns. emplace ( rid , nullptr ) ;
auto [ it , b ] = ep . servic e_conns. emplace ( rid , nullptr ) ;
it - > second = std : : move ( itr - > second ) ;
it - > second = std : : move ( itr - > second ) ;
ep . pending_conns . erase ( itr ) ;
ep . pending_conns . erase ( itr ) ;
@ -338,7 +370,7 @@ namespace llarp
if ( msg . is_control )
if ( msg . is_control )
{
{
log : : critical ( logcat , " Dispatching {} request! " , * msg . endpoint ) ;
log : : critical ( logcat , " Dispatching {} request! " , * msg . endpoint ) ;
ep . activ e_conns[ rid ] - > control_stream - > command (
ep . servic e_conns[ rid ] - > control_stream - > command (
std : : move ( * msg . endpoint ) , std : : move ( msg . body ) , std : : move ( msg . func ) ) ;
std : : move ( * msg . endpoint ) , std : : move ( msg . body ) , std : : move ( msg . func ) ) ;
}
}
else
else
@ -373,15 +405,16 @@ namespace llarp
log : : critical ( quic_cat , " Pending quic connection CID:{} purged successfully " , scid ) ;
log : : critical ( quic_cat , " Pending quic connection CID:{} purged successfully " , scid ) ;
}
}
else if ( const auto & c_itr = ep . connid_map . find ( scid ) ; c_itr ! = ep . connid_map . end ( ) )
else if ( const auto & c_itr = ep . service_connid_map . find ( scid ) ;
c_itr ! = ep . service_connid_map . end ( ) )
{
{
const auto & rid = c_itr - > second ;
const auto & rid = c_itr - > second ;
assert ( _rid = = rid ) ; // this should hold true
assert ( _rid = = rid ) ; // this should hold true
if ( auto m_itr = ep . activ e_conns. find ( rid ) ; m_itr ! = ep . activ e_conns. end ( ) )
if ( auto m_itr = ep . servic e_conns. find ( rid ) ; m_itr ! = ep . servic e_conns. end ( ) )
ep . activ e_conns. erase ( m_itr ) ;
ep . servic e_conns. erase ( m_itr ) ;
ep . connid_map. erase ( c_itr ) ;
ep . service_ connid_map. erase ( c_itr ) ;
log : : critical ( quic_cat , " Quic connection CID:{} purged successfully " , scid ) ;
log : : critical ( quic_cat , " Quic connection CID:{} purged successfully " , scid ) ;
}
}
@ -604,7 +637,7 @@ namespace llarp
bool
bool
LinkManager : : is_service_node ( ) const
LinkManager : : is_service_node ( ) const
{
{
return _ router. is_service_node( ) ;
return _ is_service_node;
}
}
// TODO: this? perhaps no longer necessary in the same way?
// TODO: this? perhaps no longer necessary in the same way?
@ -660,12 +693,12 @@ namespace llarp
}
}
void
void
LinkManager : : gossip_rc (
LinkManager : : gossip_rc ( const RouterID & last_sender , const RemoteRC & rc )
const RouterID & gossip_src , const RouterID & last_sender , std : : string serialized_rc )
{
{
int count = 0 ;
int count = 0 ;
const auto & gossip_src = rc . router_id ( ) ;
for ( auto & [ rid , conn ] : ep . activ e_conns)
for ( auto & [ rid , conn ] : ep . servic e_conns)
{
{
// don't send back to the gossip source or the last sender
// don't send back to the gossip source or the last sender
if ( rid = = gossip_src or rid = = last_sender )
if ( rid = = gossip_src or rid = = last_sender )
@ -678,7 +711,7 @@ namespace llarp
send_control_message (
send_control_message (
rid ,
rid ,
" gossip_rc " s ,
" gossip_rc " s ,
GossipRCMessage : : serialize ( gossip_src, last_sender, serialized_ rc) ,
GossipRCMessage : : serialize ( last_sender, rc) ,
[ ] ( oxen : : quic : : message ) mutable {
[ ] ( oxen : : quic : : message ) mutable {
log : : critical ( logcat , " PLACEHOLDER FOR GOSSIP RC RESPONSE HANDLER " ) ;
log : : critical ( logcat , " PLACEHOLDER FOR GOSSIP RC RESPONSE HANDLER " ) ;
} ) ;
} ) ;
@ -704,7 +737,6 @@ namespace llarp
btdc . required ( " rc " ) ;
btdc . required ( " rc " ) ;
rc = RemoteRC { btdc . consume_dict_data ( ) } ;
rc = RemoteRC { btdc . consume_dict_data ( ) } ;
src . from_string ( btdc . require < std : : string > ( " sender " ) ) ;
src . from_string ( btdc . require < std : : string > ( " sender " ) ) ;
sender . from_string ( btdc . require < std : : string > ( " src " ) ) ;
}
}
catch ( const std : : exception & e )
catch ( const std : : exception & e )
{
{
@ -715,7 +747,7 @@ namespace llarp
if ( node_db - > verify_store_gossip_rc ( rc ) )
if ( node_db - > verify_store_gossip_rc ( rc ) )
{
{
log : : critical ( link_cat , " Received updated RC, forwarding to relay peers. " ) ;
log : : critical ( link_cat , " Received updated RC, forwarding to relay peers. " ) ;
gossip_rc ( src, _router. local_rid ( ) , std: : string { rc. view ( ) } ) ;
gossip_rc ( _router. local_rid ( ) , rc) ;
}
}
else
else
log : : critical ( link_cat , " Received known or old RC, not storing or forwarding. " ) ;
log : : critical ( link_cat , " Received known or old RC, not storing or forwarding. " ) ;
@ -934,7 +966,7 @@ namespace llarp
" fetch_router_ids " s ,
" fetch_router_ids " s ,
m . body_str ( ) ,
m . body_str ( ) ,
[ source_rid = std : : move ( source ) , original = std : : move ( m ) ] ( oxen : : quic : : message m ) mutable {
[ source_rid = std : : move ( source ) , original = std : : move ( m ) ] ( oxen : : quic : : message m ) mutable {
original . respond ( m . body_str ( ) , not m ) ;
original . respond ( m . body_str ( ) , m. is_error ( ) ) ;
} ) ;
} ) ;
return ;
return ;
}
}
@ -994,9 +1026,9 @@ namespace llarp
void
void
LinkManager : : handle_find_name_response ( oxen : : quic : : message m )
LinkManager : : handle_find_name_response ( oxen : : quic : : message m )
{
{
if ( not m )
if ( m. timed_out )
{
{
log : : info ( link_cat , " FindNameMessage failed !" ) ;
log : : info ( link_cat , " FindNameMessage request timed out !" ) ;
return ;
return ;
}
}
@ -1130,7 +1162,7 @@ namespace llarp
" publish_intro " ,
" publish_intro " ,
PublishIntroMessage : : serialize ( introset , relay_order , is_relayed ) ,
PublishIntroMessage : : serialize ( introset , relay_order , is_relayed ) ,
[ respond = std : : move ( respond ) ] ( oxen : : quic : : message m ) {
[ respond = std : : move ( respond ) ] ( oxen : : quic : : message m ) {
if ( not m )
if ( m. timed_out )
return ; // drop if timed out; requester will have timed out as well
return ; // drop if timed out; requester will have timed out as well
respond ( m . body_str ( ) ) ;
respond ( m . body_str ( ) ) ;
} ) ;
} ) ;
@ -1165,10 +1197,13 @@ namespace llarp
addr ) ;
addr ) ;
}
}
// DISCUSS: I feel like ::handle_publish_intro_response should be the callback that handles the
// response to a relayed publish_intro (above line 1131-ish)
void
void
LinkManager : : handle_publish_intro_response ( oxen : : quic : : message m )
LinkManager : : handle_publish_intro_response ( oxen : : quic : : message m )
{
{
if ( not m )
if ( m. timed_out )
{
{
log : : info ( link_cat , " PublishIntroMessage timed out! " ) ;
log : : info ( link_cat , " PublishIntroMessage timed out! " ) ;
return ;
return ;
@ -1271,7 +1306,7 @@ namespace llarp
link_cat ,
link_cat ,
" Relayed FindIntroMessage returned successful response; transmitting to initial "
" Relayed FindIntroMessage returned successful response; transmitting to initial "
" requester " ) ;
" requester " ) ;
else if ( not relay_response)
else if ( relay_response. timed_out )
log : : critical (
log : : critical (
link_cat , " Relayed FindIntroMessage timed out! Notifying initial requester " ) ;
link_cat , " Relayed FindIntroMessage timed out! Notifying initial requester " ) ;
else
else
@ -1298,7 +1333,7 @@ namespace llarp
void
void
LinkManager : : handle_find_intro_response ( oxen : : quic : : message m )
LinkManager : : handle_find_intro_response ( oxen : : quic : : message m )
{
{
if ( not m )
if ( m. timed_out )
{
{
log : : info ( link_cat , " FindIntroMessage timed out! " ) ;
log : : info ( link_cat , " FindIntroMessage timed out! " ) ;
return ;
return ;
@ -1491,12 +1526,12 @@ namespace llarp
" then relaying response " ) ;
" then relaying response " ) ;
_router . path_context ( ) . put_transit_hop ( hop ) ;
_router . path_context ( ) . put_transit_hop ( hop ) ;
}
}
if ( not m )
if ( m. timed_out )
log : : info ( link_cat , " Upstream timed out on path build; relaying timeout " ) ;
log : : info ( link_cat , " Upstream timed out on path build; relaying timeout " ) ;
else
else
log : : info ( link_cat , " Upstream returned path build failure; relaying response " ) ;
log : : info ( link_cat , " Upstream returned path build failure; relaying response " ) ;
m . respond ( m . body_str ( ) , not m ) ;
m . respond ( m . body_str ( ) , m. is_error ( ) ) ;
} ) ;
} ) ;
}
}
catch ( const std : : exception & e )
catch ( const std : : exception & e )
@ -1608,12 +1643,12 @@ namespace llarp
void
void
LinkManager : : handle_obtain_exit_response ( oxen : : quic : : message m )
LinkManager : : handle_obtain_exit_response ( oxen : : quic : : message m )
{
{
if ( not m )
if ( m. timed_out )
{
{
log : : info ( link_cat , " ObtainExitMessage timed out! " ) ;
log : : info ( link_cat , " ObtainExitMessage timed out! " ) ;
return ;
return ;
}
}
if ( m . is_error )
if ( m . is_error () )
{
{
// TODO: what to do here
// TODO: what to do here
}
}
@ -1686,12 +1721,12 @@ namespace llarp
void
void
LinkManager : : handle_update_exit_response ( oxen : : quic : : message m )
LinkManager : : handle_update_exit_response ( oxen : : quic : : message m )
{
{
if ( not m )
if ( m. timed_out )
{
{
log : : info ( link_cat , " UpdateExitMessage timed out! " ) ;
log : : info ( link_cat , " UpdateExitMessage timed out! " ) ;
return ;
return ;
}
}
if ( m . is_error )
if ( m . is_error () )
{
{
// TODO: what to do here
// TODO: what to do here
}
}
@ -1771,12 +1806,12 @@ namespace llarp
void
void
LinkManager : : handle_close_exit_response ( oxen : : quic : : message m )
LinkManager : : handle_close_exit_response ( oxen : : quic : : message m )
{
{
if ( not m )
if ( m. timed_out )
{
{
log : : info ( link_cat , " CloseExitMessage timed out! " ) ;
log : : info ( link_cat , " CloseExitMessage timed out! " ) ;
return ;
return ;
}
}
if ( m . is_error )
if ( m . is_error () )
{
{
// TODO: what to do here
// TODO: what to do here
}
}