lokinet/llarp/router/outbound_message_handler.cpp

420 lines
10 KiB
C++
Raw Normal View History

#include <router/outbound_message_handler.hpp>
#include <messages/link_message.hpp>
#include <router/i_outbound_session_maker.hpp>
#include <link/i_link_manager.hpp>
#include <constants/link_layer.hpp>
2019-09-01 12:38:03 +00:00
#include <util/meta/memfn.hpp>
#include <util/status.hpp>
#include <algorithm>
#include <cstdlib>
namespace llarp
{
const PathID_t OutboundMessageHandler::zeroID;
OutboundMessageHandler::OutboundMessageHandler(size_t maxQueueSize)
: outboundQueue(maxQueueSize), removedPaths(20), removedSomePaths(false)
{
}
bool
OutboundMessageHandler::QueueMessage(
const RouterID& remote, const ILinkMessage* msg, SendStatusHandler callback)
{
2020-01-17 16:33:37 +00:00
const uint16_t priority = msg->Priority();
std::array<byte_t, MAX_LINK_MSG_SIZE> linkmsg_buffer;
llarp_buffer_t buf(linkmsg_buffer);
if (!EncodeBuffer(msg, buf))
{
return false;
}
Message message;
message.first.resize(buf.sz);
message.second = callback;
std::copy_n(buf.base, buf.sz, message.first.data());
if (_linkManager->HasSessionTo(remote))
{
2020-01-17 16:33:37 +00:00
QueueOutboundMessage(remote, std::move(message), msg->pathid, priority);
return true;
}
bool shouldCreateSession = false;
{
De-abseil, part 2: mutex, locks, (most) time - util::Mutex is now a std::shared_timed_mutex, which is capable of exclusive and shared locks. - util::Lock is still present as a std::lock_guard<util::Mutex>. - the locking annotations are preserved, but updated to the latest supported by clang rather than using abseil's older/deprecated ones. - ACQUIRE_LOCK macro is gone since we don't pass mutexes by pointer into locks anymore (WTF abseil). - ReleasableLock is gone. Instead there are now some llarp::util helper methods to obtain unique and/or shared locks: - `auto lock = util::unique_lock(mutex);` gets an RAII-but-also unlockable object (std::unique_lock<T>, with T inferred from `mutex`). - `auto lock = util::shared_lock(mutex);` gets an RAII shared (i.e. "reader") lock of the mutex. - `auto lock = util::unique_locks(mutex1, mutex2, mutex3);` can be used to atomically lock multiple mutexes at once (returning a tuple of the locks). This are templated on the mutex which makes them a bit more flexible than using a concrete type: they can be used for any type of lockable mutex, not only util::Mutex. (Some of the code here uses them for getting locks around a std::mutex). Until C++17, using the RAII types is painfully verbose: ```C++ // pre-C++17 - needing to figure out the mutex type here is annoying: std::unique_lock<util::Mutex> lock(mutex); // pre-C++17 and even more verbose (but at least the type isn't needed): std::unique_lock<decltype(mutex)> lock(mutex); // our compromise: auto lock = util::unique_lock(mutex); // C++17: std::unique_lock lock(mutex); ``` All of these functions will also warn (under gcc or clang) if you discard the return value. You can also do fancy things like `auto l = util::unique_lock(mutex, std::adopt_lock)` (which lets a lock take over an already-locked mutex). - metrics code is gone, which also removes a big pile of code that was only used by metrics: - llarp::util::Scheduler - llarp::thread::TimerQueue - llarp::util::Stopwatch
2020-02-21 17:21:11 +00:00
util::Lock l(_mutex);
// create queue for <remote> if it doesn't exist, and get iterator
auto itr_pair = pendingSessionMessageQueues.emplace(remote, MessageQueue());
MessageQueueEntry entry;
2020-01-17 16:33:37 +00:00
entry.priority = priority;
entry.message = message;
entry.router = remote;
itr_pair.first->second.push(std::move(entry));
shouldCreateSession = itr_pair.second;
}
if (shouldCreateSession)
{
QueueSessionCreation(remote);
}
return true;
}
void
OutboundMessageHandler::Tick()
{
m_Killer.TryAccess([self = this]() {
2019-11-14 18:50:45 +00:00
self->ProcessOutboundQueue();
self->RemoveEmptyPathQueues();
self->SendRoundRobin();
});
}
void
OutboundMessageHandler::QueueRemoveEmptyPath(const PathID_t& pathid)
{
m_Killer.TryAccess([self = this, pathid]() {
if (self->removedPaths.full())
{
self->RemoveEmptyPathQueues();
}
self->removedPaths.pushBack(pathid);
});
}
// TODO: this
util::StatusObject
OutboundMessageHandler::ExtractStatus() const
{
2020-01-17 18:19:53 +00:00
util::StatusObject status{"queueStats",
{{"queued", m_queueStats.queued},
{"dropped", m_queueStats.dropped},
{"sent", m_queueStats.sent},
{"queueWatermark", m_queueStats.queueWatermark},
{"perTickMax", m_queueStats.perTickMax},
{"numTicks", m_queueStats.numTicks}}};
return status;
}
void
OutboundMessageHandler::Init(ILinkManager* linkManager, std::shared_ptr<Logic> logic)
{
_linkManager = linkManager;
_logic = logic;
outboundMessageQueues.emplace(zeroID, MessageQueue());
}
void
OutboundMessageHandler::OnSessionEstablished(const RouterID& router)
{
FinalizeSessionRequest(router, SendStatus::Success);
}
void
OutboundMessageHandler::OnConnectTimeout(const RouterID& router)
{
FinalizeSessionRequest(router, SendStatus::Timeout);
}
void
OutboundMessageHandler::OnRouterNotFound(const RouterID& router)
{
FinalizeSessionRequest(router, SendStatus::RouterNotFound);
}
void
OutboundMessageHandler::OnInvalidRouter(const RouterID& router)
{
FinalizeSessionRequest(router, SendStatus::InvalidRouter);
}
void
OutboundMessageHandler::OnNoLink(const RouterID& router)
{
FinalizeSessionRequest(router, SendStatus::NoLink);
}
void
OutboundMessageHandler::OnSessionResult(const RouterID& router, const SessionResult result)
{
switch (result)
{
case SessionResult::Establish:
OnSessionEstablished(router);
break;
case SessionResult::Timeout:
OnConnectTimeout(router);
break;
case SessionResult::RouterNotFound:
OnRouterNotFound(router);
break;
case SessionResult::InvalidRouter:
OnInvalidRouter(router);
break;
case SessionResult::NoLink:
OnNoLink(router);
break;
default:
LogError("Impossible situation: enum class value out of bounds.");
std::abort();
break;
}
}
void
OutboundMessageHandler::DoCallback(SendStatusHandler callback, SendStatus status)
{
if (callback)
{
auto f = std::bind(callback, status);
LogicCall(_logic, [self = this, f]() { self->m_Killer.TryAccess(f); });
}
}
void
OutboundMessageHandler::QueueSessionCreation(const RouterID& remote)
{
auto fn = util::memFn(&OutboundMessageHandler::OnSessionResult, this);
_linkManager->GetSessionMaker()->CreateSessionTo(remote, fn);
}
bool
OutboundMessageHandler::EncodeBuffer(const ILinkMessage* msg, llarp_buffer_t& buf)
{
if (!msg->BEncode(&buf))
{
LogWarn("failed to encode outbound message, buffer size left: ", buf.size_left());
return false;
}
// set size of message
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
return true;
}
bool
OutboundMessageHandler::Send(const RouterID& remote, const Message& msg)
{
2019-07-26 16:19:31 +00:00
const llarp_buffer_t buf(msg.first);
auto callback = msg.second;
m_queueStats.sent++;
return _linkManager->SendTo(remote, buf, [=](ILinkSession::DeliveryStatus status) {
if (status == ILinkSession::DeliveryStatus::eDeliverySuccess)
DoCallback(callback, SendStatus::Success);
else
{
LogWarn("Send outbound message handler dropped message");
DoCallback(callback, SendStatus::Congestion);
}
});
}
bool
OutboundMessageHandler::SendIfSession(const RouterID& remote, const Message& msg)
{
if (_linkManager->HasSessionTo(remote))
{
return Send(remote, msg);
}
return false;
}
bool
OutboundMessageHandler::QueueOutboundMessage(
const RouterID& remote, Message&& msg, const PathID_t& pathid, uint16_t priority)
{
MessageQueueEntry entry;
entry.message = std::move(msg);
auto callback_copy = entry.message.second;
entry.router = remote;
entry.pathid = pathid;
entry.priority = priority;
if (outboundQueue.tryPushBack(std::move(entry)) != llarp::thread::QueueReturn::Success)
{
m_queueStats.dropped++;
2020-01-21 17:31:48 +00:00
LogWarn(
"QueueOutboundMessage outbound message handler dropped message on "
"pathid=",
pathid);
DoCallback(callback_copy, SendStatus::Congestion);
}
else
{
m_queueStats.queued++;
2020-01-17 18:19:53 +00:00
uint32_t queueSize = outboundQueue.size();
m_queueStats.queueWatermark = std::max(queueSize, m_queueStats.queueWatermark);
}
return true;
}
void
OutboundMessageHandler::ProcessOutboundQueue()
{
while (not outboundQueue.empty())
{
// TODO: can we add util::thread::Queue::front() for move semantics here?
MessageQueueEntry entry = outboundQueue.popFront();
auto itr_pair = outboundMessageQueues.emplace(entry.pathid, MessageQueue());
if (itr_pair.second && !entry.pathid.IsZero())
{
roundRobinOrder.push(entry.pathid);
}
MessageQueue& path_queue = itr_pair.first->second;
2020-01-18 21:59:50 +00:00
if (path_queue.size() < MAX_PATH_QUEUE_SIZE || entry.pathid.IsZero())
2020-01-18 21:59:50 +00:00
{
path_queue.push(std::move(entry));
}
else
{
2020-01-21 17:31:48 +00:00
LogWarn(
"ProcessOutboundQueue outbound message handler dropped message on "
"pathid=",
entry.pathid);
2020-01-18 22:03:24 +00:00
DoCallback(entry.message.second, SendStatus::Congestion);
m_queueStats.dropped++;
}
}
}
void
OutboundMessageHandler::RemoveEmptyPathQueues()
{
2019-11-14 18:50:45 +00:00
removedSomePaths = false;
if (removedPaths.empty())
2019-11-14 18:50:45 +00:00
return;
while (not removedPaths.empty())
{
auto itr = outboundMessageQueues.find(removedPaths.popFront());
if (itr != outboundMessageQueues.end())
{
outboundMessageQueues.erase(itr);
}
}
2019-11-14 18:50:45 +00:00
removedSomePaths = true;
}
void
OutboundMessageHandler::SendRoundRobin()
{
m_queueStats.numTicks++;
// send non-routing messages first priority
auto& non_routing_mq = outboundMessageQueues[zeroID];
while (not non_routing_mq.empty())
{
const MessageQueueEntry& entry = non_routing_mq.top();
Send(entry.router, entry.message);
2020-01-17 16:33:37 +00:00
non_routing_mq.pop();
}
size_t empty_count = 0;
size_t num_queues = roundRobinOrder.size();
if (removedSomePaths)
{
for (size_t i = 0; i < num_queues; i++)
{
PathID_t pathid = std::move(roundRobinOrder.front());
roundRobinOrder.pop();
if (outboundMessageQueues.find(pathid) != outboundMessageQueues.end())
{
roundRobinOrder.push(std::move(pathid));
}
}
}
num_queues = roundRobinOrder.size();
size_t sent_count = 0;
if (num_queues == 0) // if no queues, return
{
return;
}
while (sent_count < MAX_OUTBOUND_MESSAGES_PER_TICK) // TODO: better stop condition
{
PathID_t pathid = std::move(roundRobinOrder.front());
roundRobinOrder.pop();
auto& message_queue = outboundMessageQueues[pathid];
if (message_queue.size() > 0)
{
const MessageQueueEntry& entry = message_queue.top();
Send(entry.router, entry.message);
2020-01-17 16:33:37 +00:00
message_queue.pop();
empty_count = 0;
sent_count++;
}
else
{
empty_count++;
}
roundRobinOrder.push(std::move(pathid));
// if num_queues empty queues in a row, all queues empty.
if (empty_count == num_queues)
{
break;
}
}
m_queueStats.perTickMax = std::max((uint32_t)sent_count, m_queueStats.perTickMax);
}
void
OutboundMessageHandler::FinalizeSessionRequest(const RouterID& router, SendStatus status)
{
MessageQueue movedMessages;
{
De-abseil, part 2: mutex, locks, (most) time - util::Mutex is now a std::shared_timed_mutex, which is capable of exclusive and shared locks. - util::Lock is still present as a std::lock_guard<util::Mutex>. - the locking annotations are preserved, but updated to the latest supported by clang rather than using abseil's older/deprecated ones. - ACQUIRE_LOCK macro is gone since we don't pass mutexes by pointer into locks anymore (WTF abseil). - ReleasableLock is gone. Instead there are now some llarp::util helper methods to obtain unique and/or shared locks: - `auto lock = util::unique_lock(mutex);` gets an RAII-but-also unlockable object (std::unique_lock<T>, with T inferred from `mutex`). - `auto lock = util::shared_lock(mutex);` gets an RAII shared (i.e. "reader") lock of the mutex. - `auto lock = util::unique_locks(mutex1, mutex2, mutex3);` can be used to atomically lock multiple mutexes at once (returning a tuple of the locks). This are templated on the mutex which makes them a bit more flexible than using a concrete type: they can be used for any type of lockable mutex, not only util::Mutex. (Some of the code here uses them for getting locks around a std::mutex). Until C++17, using the RAII types is painfully verbose: ```C++ // pre-C++17 - needing to figure out the mutex type here is annoying: std::unique_lock<util::Mutex> lock(mutex); // pre-C++17 and even more verbose (but at least the type isn't needed): std::unique_lock<decltype(mutex)> lock(mutex); // our compromise: auto lock = util::unique_lock(mutex); // C++17: std::unique_lock lock(mutex); ``` All of these functions will also warn (under gcc or clang) if you discard the return value. You can also do fancy things like `auto l = util::unique_lock(mutex, std::adopt_lock)` (which lets a lock take over an already-locked mutex). - metrics code is gone, which also removes a big pile of code that was only used by metrics: - llarp::util::Scheduler - llarp::thread::TimerQueue - llarp::util::Stopwatch
2020-02-21 17:21:11 +00:00
util::Lock l(_mutex);
auto itr = pendingSessionMessageQueues.find(router);
if (itr == pendingSessionMessageQueues.end())
{
return;
}
movedMessages.swap(itr->second);
pendingSessionMessageQueues.erase(itr);
}
while (!movedMessages.empty())
{
const MessageQueueEntry& entry = movedMessages.top();
if (status == SendStatus::Success)
{
Send(entry.router, entry.message);
}
else
{
DoCallback(entry.message.second, status);
}
2020-01-17 16:33:37 +00:00
movedMessages.pop();
}
}
} // namespace llarp