|
|
|
@ -89,7 +89,17 @@ namespace llarp
|
|
|
|
|
util::StatusObject
|
|
|
|
|
OutboundMessageHandler::ExtractStatus() const
|
|
|
|
|
{
|
|
|
|
|
util::StatusObject status{};
|
|
|
|
|
util::StatusObject status{
|
|
|
|
|
"queueStats", {
|
|
|
|
|
{ "queued", m_queueStats.queued },
|
|
|
|
|
{ "dropped", m_queueStats.dropped },
|
|
|
|
|
{ "sent", m_queueStats.sent },
|
|
|
|
|
{ "queueWatermark", m_queueStats.queueWatermark },
|
|
|
|
|
{ "perTickMax", m_queueStats.perTickMax },
|
|
|
|
|
{ "numTicks", m_queueStats.numTicks }
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
return status;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -201,6 +211,7 @@ namespace llarp
|
|
|
|
|
{
|
|
|
|
|
const llarp_buffer_t buf(msg.first);
|
|
|
|
|
auto callback = msg.second;
|
|
|
|
|
m_queueStats.sent++;
|
|
|
|
|
return _linkManager->SendTo(
|
|
|
|
|
remote, buf, [=](ILinkSession::DeliveryStatus status) {
|
|
|
|
|
if(status == ILinkSession::DeliveryStatus::eDeliverySuccess)
|
|
|
|
@ -234,8 +245,19 @@ namespace llarp
|
|
|
|
|
if(outboundQueue.tryPushBack(std::move(entry))
|
|
|
|
|
!= llarp::thread::QueueReturn::Success)
|
|
|
|
|
{
|
|
|
|
|
m_queueStats.dropped++;
|
|
|
|
|
DoCallback(callback_copy, SendStatus::Congestion);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
m_queueStats.queued++;
|
|
|
|
|
|
|
|
|
|
// calculate queue high watermark
|
|
|
|
|
size_t queueSize = outboundQueue.size();
|
|
|
|
|
if (queueSize > m_queueStats.queueWatermark)
|
|
|
|
|
m_queueStats.queueWatermark = queueSize;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
@ -259,6 +281,7 @@ namespace llarp
|
|
|
|
|
MessageQueue &path_queue = itr_pair.first->second;
|
|
|
|
|
if(path_queue.size() >= MAX_PATH_QUEUE_SIZE)
|
|
|
|
|
{
|
|
|
|
|
m_queueStats.dropped++;
|
|
|
|
|
path_queue.pop(); // head drop
|
|
|
|
|
}
|
|
|
|
|
path_queue.push(std::move(entry));
|
|
|
|
@ -286,6 +309,8 @@ namespace llarp
|
|
|
|
|
void
|
|
|
|
|
OutboundMessageHandler::SendRoundRobin()
|
|
|
|
|
{
|
|
|
|
|
m_queueStats.numTicks++;
|
|
|
|
|
|
|
|
|
|
// send non-routing messages first priority
|
|
|
|
|
auto &non_routing_mq = outboundMessageQueues[zeroID];
|
|
|
|
|
while(not non_routing_mq.empty())
|
|
|
|
@ -349,6 +374,10 @@ namespace llarp
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (sent_count > m_queueStats.perTickMax)
|
|
|
|
|
m_queueStats.perTickMax = sent_count;
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void
|
|
|
|
|