@ -15,14 +15,17 @@ namespace llarp
{
const PathID_t OutboundMessageHandler : : zeroID ;
using namespace std : : chrono_literals ;
OutboundMessageHandler : : OutboundMessageHandler ( size_t maxQueueSize )
: outboundQueue ( maxQueueSize ) , removedPaths ( 20 ) , removedSomePaths ( false )
: outboundQueue ( maxQueueSize ) , re centlyRemovedPaths( 5 s ) , removedSomePaths ( false )
{ }
bool
OutboundMessageHandler : : QueueMessage (
const RouterID & remote , const ILinkMessage & msg , SendStatusHandler callback )
{
// if the destination is invalid, callback with failure and return
if ( not _linkManager - > SessionIsClient ( remote ) and not _lookupHandler - > RemoteIsAllowed ( remote ) )
{
DoCallback ( callback , SendStatus : : InvalidRouter ) ;
@ -44,26 +47,31 @@ namespace llarp
std : : copy_n ( buf . base , buf . sz , message . first . data ( ) ) ;
// if we have a session to the destination, queue the message and return
if ( _linkManager - > HasSessionTo ( remote ) )
{
QueueOutboundMessage ( remote , std : : move ( message ) , msg . pathid , priority ) ;
return true ;
}
// if we don't have a session to the destination, queue the message onto
// a special pending session queue for that destination, and then create
// that pending session if there is not already a session establish attempt
// in progress.
bool shouldCreateSession = false ;
{
util : : Lock l ( _mutex ) ;
// create queue for <remote> if it doesn't exist, and get iterator
auto itr_pair = pendingSessionMessageQueues . emplace ( remote , MessageQueue ( ) ) ;
auto [ queue_itr , is_new ] = pendingSessionMessageQueues . emplace ( remote , MessageQueue ( ) ) ;
MessageQueueEntry entry ;
entry . priority = priority ;
entry . message = message ;
entry . router = remote ;
itr_pair. first - > second . push ( std : : move ( entry ) ) ;
queue_ itr- > second . push ( std : : move ( entry ) ) ;
shouldCreateSession = i tr_pair. second ;
shouldCreateSession = i s_new ;
}
if ( shouldCreateSession )
@ -77,26 +85,33 @@ namespace llarp
void
OutboundMessageHandler : : Tick ( )
{
m_Killer . TryAccess ( [ self = this ] ( ) {
self- > ProcessOutboundQueue ( ) ;
self- > RemoveEmptyPathQueues ( ) ;
self- > SendRoundRobin( ) ;
m_Killer . TryAccess ( [ this ] ( ) {
recentlyRemovedPaths. Decay ( ) ;
ProcessOutboundQueue ( ) ;
SendRoundRobin( ) ;
} ) ;
}
void
OutboundMessageHandler : : Queue RemoveEmpty Path( const PathID_t & pathid )
OutboundMessageHandler : : RemovePath( const PathID_t & pathid )
{
m_Killer . TryAccess ( [ self = this , pathid ] ( ) {
if ( self - > removedPaths . full ( ) )
m_Killer . TryAccess ( [ this , pathid ] ( ) {
/* add the path id to a list of recently removed paths to act as a filter
* for messages that are queued but haven ' t been sorted into path queues yet .
*
* otherwise these messages would re - create the path queue we just removed , and
* those path queues would be leaked / never removed .
*/
recentlyRemovedPaths . Insert ( pathid ) ;
auto itr = outboundMessageQueues . find ( pathid ) ;
if ( itr ! = outboundMessageQueues . end ( ) )
{
self - > RemoveEmptyPathQueues ( ) ;
outboundMessageQueues. erase ( itr ) ;
}
self - > removedPaths . pushBack ( pathid ) ;
removedSomePaths = true ;
} ) ;
}
// TODO: this
util : : StatusObject
OutboundMessageHandler : : ExtractStatus ( ) const
{
@ -241,6 +256,8 @@ namespace llarp
{
MessageQueueEntry entry ;
entry . message = std : : move ( msg ) ;
// copy callback in case we need to call it, so we can std::move(entry)
auto callback_copy = entry . message . second ;
entry . router = remote ;
entry . pathid = pathid ;
@ -266,17 +283,23 @@ namespace llarp
{
while ( not outboundQueue . empty ( ) )
{
// TODO: can we add util::thread::Queue::front() for move semantics here?
MessageQueueEntry entry = outboundQueue . popFront ( ) ;
auto itr_pair = outboundMessageQueues . emplace ( entry . pathid , MessageQueue ( ) ) ;
// messages may still be queued for processing when a pathid is removed,
// so check here if the pathid was recently removed.
if ( recentlyRemovedPaths . Contains ( entry . pathid ) )
{
return ;
}
auto [ queue_itr , is_new ] = outboundMessageQueues . emplace ( entry . pathid , MessageQueue ( ) ) ;
if ( itr_pair . second & & ! entry . pathid . IsZero ( ) )
if ( i s_new & & ! entry . pathid . IsZero ( ) )
{
roundRobinOrder . push ( entry . pathid ) ;
}
MessageQueue & path_queue = itr_pair . first - > second ;
MessageQueue & path_queue = queue_ itr- > second ;
if ( path_queue . size ( ) < MAX_PATH_QUEUE_SIZE | | entry . pathid . IsZero ( ) )
{
@ -290,41 +313,25 @@ namespace llarp
}
}
void
OutboundMessageHandler : : RemoveEmptyPathQueues ( )
{
removedSomePaths = false ;
if ( removedPaths . empty ( ) )
return ;
while ( not removedPaths . empty ( ) )
{
auto itr = outboundMessageQueues . find ( removedPaths . popFront ( ) ) ;
if ( itr ! = outboundMessageQueues . end ( ) )
{
outboundMessageQueues . erase ( itr ) ;
}
}
removedSomePaths = true ;
}
void
OutboundMessageHandler : : SendRoundRobin ( )
{
m_queueStats . numTicks + + ;
// send non- routing messages first priority
auto & non_ routing_mq = outboundMessageQueues [ zeroID ] ;
while ( not non_ routing_mq. empty ( ) )
// send routing messages first priority
auto & routing_mq = outboundMessageQueues [ zeroID ] ;
while ( not routing_mq . empty ( ) )
{
const MessageQueueEntry & entry = non_ routing_mq. top ( ) ;
const MessageQueueEntry & entry = routing_mq . top ( ) ;
Send ( entry . router , entry . message ) ;
non_ routing_mq. pop ( ) ;
routing_mq . pop ( ) ;
}
size_t empty_count = 0 ;
size_t num_queues = roundRobinOrder . size ( ) ;
// if any paths have been removed since last tick, remove any stale
// entries from the round-robin ordering
if ( removedSomePaths )
{
for ( size_t i = 0 ; i < num_queues ; i + + )
@ -338,6 +345,7 @@ namespace llarp
}
}
}
removedSomePaths = false ;
num_queues = roundRobinOrder . size ( ) ;
size_t sent_count = 0 ;
@ -346,7 +354,10 @@ namespace llarp
return ;
}
while ( sent_count < MAX_OUTBOUND_MESSAGES_PER_TICK ) // TODO: better stop condition
// send messages for each pathid in roundRobinOrder, stopping when
// either every path's queue is empty or a set maximum amount of
// messages have been sent.
while ( sent_count < MAX_OUTBOUND_MESSAGES_PER_TICK )
{
PathID_t pathid = std : : move ( roundRobinOrder . front ( ) ) ;
roundRobinOrder . pop ( ) ;