#include "outbound_message_handler.hpp" #include #include "i_outbound_session_maker.hpp" #include "i_rc_lookup_handler.hpp" #include #include #include #include #include #include namespace llarp { const PathID_t OutboundMessageHandler::zeroID; OutboundMessageHandler::OutboundMessageHandler(size_t maxQueueSize) : outboundQueue(maxQueueSize), removedPaths(20), removedSomePaths(false) {} bool OutboundMessageHandler::QueueMessage( const RouterID& remote, const ILinkMessage* msg, SendStatusHandler callback) { if (not _linkManager->SessionIsClient(remote) and not _lookupHandler->RemoteIsAllowed(remote)) { DoCallback(callback, SendStatus::InvalidRouter); return true; } const uint16_t priority = msg->Priority(); std::array linkmsg_buffer; llarp_buffer_t buf(linkmsg_buffer); if (!EncodeBuffer(msg, buf)) { return false; } Message message; message.first.resize(buf.sz); message.second = callback; std::copy_n(buf.base, buf.sz, message.first.data()); if (_linkManager->HasSessionTo(remote)) { QueueOutboundMessage(remote, std::move(message), msg->pathid, priority); return true; } bool shouldCreateSession = false; { util::Lock l(_mutex); // create queue for if it doesn't exist, and get iterator auto itr_pair = pendingSessionMessageQueues.emplace(remote, MessageQueue()); MessageQueueEntry entry; entry.priority = priority; entry.message = message; entry.router = remote; itr_pair.first->second.push(std::move(entry)); shouldCreateSession = itr_pair.second; } if (shouldCreateSession) { QueueSessionCreation(remote); } return true; } void OutboundMessageHandler::Tick() { m_Killer.TryAccess([self = this]() { self->ProcessOutboundQueue(); self->RemoveEmptyPathQueues(); self->SendRoundRobin(); }); } void OutboundMessageHandler::QueueRemoveEmptyPath(const PathID_t& pathid) { m_Killer.TryAccess([self = this, pathid]() { if (self->removedPaths.full()) { self->RemoveEmptyPathQueues(); } self->removedPaths.pushBack(pathid); }); } // TODO: this util::StatusObject OutboundMessageHandler::ExtractStatus() const { util::StatusObject status{ "queueStats", {{"queued", m_queueStats.queued}, {"dropped", m_queueStats.dropped}, {"sent", m_queueStats.sent}, {"queueWatermark", m_queueStats.queueWatermark}, {"perTickMax", m_queueStats.perTickMax}, {"numTicks", m_queueStats.numTicks}}}; return status; } void OutboundMessageHandler::Init( ILinkManager* linkManager, I_RCLookupHandler* lookupHandler, EventLoop_ptr loop) { _linkManager = linkManager; _lookupHandler = lookupHandler; _loop = std::move(loop); outboundMessageQueues.emplace(zeroID, MessageQueue()); } void OutboundMessageHandler::OnSessionEstablished(const RouterID& router) { FinalizeSessionRequest(router, SendStatus::Success); } void OutboundMessageHandler::OnConnectTimeout(const RouterID& router) { FinalizeSessionRequest(router, SendStatus::Timeout); } void OutboundMessageHandler::OnRouterNotFound(const RouterID& router) { FinalizeSessionRequest(router, SendStatus::RouterNotFound); } void OutboundMessageHandler::OnInvalidRouter(const RouterID& router) { FinalizeSessionRequest(router, SendStatus::InvalidRouter); } void OutboundMessageHandler::OnNoLink(const RouterID& router) { FinalizeSessionRequest(router, SendStatus::NoLink); } void OutboundMessageHandler::OnSessionResult(const RouterID& router, const SessionResult result) { switch (result) { case SessionResult::Establish: OnSessionEstablished(router); break; case SessionResult::Timeout: OnConnectTimeout(router); break; case SessionResult::RouterNotFound: OnRouterNotFound(router); break; case SessionResult::InvalidRouter: OnInvalidRouter(router); break; case SessionResult::NoLink: OnNoLink(router); break; default: LogError("Impossible situation: enum class value out of bounds."); std::abort(); break; } } void OutboundMessageHandler::DoCallback(SendStatusHandler callback, SendStatus status) { if (callback) _loop->call([f = std::move(callback), status] { f(status); }); } void OutboundMessageHandler::QueueSessionCreation(const RouterID& remote) { auto fn = util::memFn(&OutboundMessageHandler::OnSessionResult, this); _linkManager->GetSessionMaker()->CreateSessionTo(remote, fn); } bool OutboundMessageHandler::EncodeBuffer(const ILinkMessage* msg, llarp_buffer_t& buf) { if (!msg->BEncode(&buf)) { LogWarn("failed to encode outbound message, buffer size left: ", buf.size_left()); return false; } // set size of message buf.sz = buf.cur - buf.base; buf.cur = buf.base; return true; } bool OutboundMessageHandler::Send(const RouterID& remote, const Message& msg) { const llarp_buffer_t buf(msg.first); auto callback = msg.second; m_queueStats.sent++; return _linkManager->SendTo(remote, buf, [=](ILinkSession::DeliveryStatus status) { if (status == ILinkSession::DeliveryStatus::eDeliverySuccess) DoCallback(callback, SendStatus::Success); else { DoCallback(callback, SendStatus::Congestion); } }); } bool OutboundMessageHandler::SendIfSession(const RouterID& remote, const Message& msg) { if (_linkManager->HasSessionTo(remote)) { return Send(remote, msg); } return false; } bool OutboundMessageHandler::QueueOutboundMessage( const RouterID& remote, Message&& msg, const PathID_t& pathid, uint16_t priority) { MessageQueueEntry entry; entry.message = std::move(msg); auto callback_copy = entry.message.second; entry.router = remote; entry.pathid = pathid; entry.priority = priority; if (outboundQueue.tryPushBack(std::move(entry)) != llarp::thread::QueueReturn::Success) { m_queueStats.dropped++; LogWarn( "QueueOutboundMessage outbound message handler dropped message on " "pathid=", pathid); DoCallback(callback_copy, SendStatus::Congestion); } else { m_queueStats.queued++; uint32_t queueSize = outboundQueue.size(); m_queueStats.queueWatermark = std::max(queueSize, m_queueStats.queueWatermark); } return true; } void OutboundMessageHandler::ProcessOutboundQueue() { while (not outboundQueue.empty()) { // TODO: can we add util::thread::Queue::front() for move semantics here? MessageQueueEntry entry = outboundQueue.popFront(); auto itr_pair = outboundMessageQueues.emplace(entry.pathid, MessageQueue()); if (itr_pair.second && !entry.pathid.IsZero()) { roundRobinOrder.push(entry.pathid); } MessageQueue& path_queue = itr_pair.first->second; if (path_queue.size() < MAX_PATH_QUEUE_SIZE || entry.pathid.IsZero()) { path_queue.push(std::move(entry)); } else { LogWarn( "ProcessOutboundQueue outbound message handler dropped message on " "pathid=", entry.pathid); DoCallback(entry.message.second, SendStatus::Congestion); m_queueStats.dropped++; } } } void OutboundMessageHandler::RemoveEmptyPathQueues() { removedSomePaths = false; if (removedPaths.empty()) return; while (not removedPaths.empty()) { auto itr = outboundMessageQueues.find(removedPaths.popFront()); if (itr != outboundMessageQueues.end()) { outboundMessageQueues.erase(itr); } } removedSomePaths = true; } void OutboundMessageHandler::SendRoundRobin() { m_queueStats.numTicks++; // send non-routing messages first priority auto& non_routing_mq = outboundMessageQueues[zeroID]; while (not non_routing_mq.empty()) { const MessageQueueEntry& entry = non_routing_mq.top(); Send(entry.router, entry.message); non_routing_mq.pop(); } size_t empty_count = 0; size_t num_queues = roundRobinOrder.size(); if (removedSomePaths) { for (size_t i = 0; i < num_queues; i++) { PathID_t pathid = std::move(roundRobinOrder.front()); roundRobinOrder.pop(); if (outboundMessageQueues.find(pathid) != outboundMessageQueues.end()) { roundRobinOrder.push(std::move(pathid)); } } } num_queues = roundRobinOrder.size(); size_t sent_count = 0; if (num_queues == 0) // if no queues, return { return; } while (sent_count < MAX_OUTBOUND_MESSAGES_PER_TICK) // TODO: better stop condition { PathID_t pathid = std::move(roundRobinOrder.front()); roundRobinOrder.pop(); auto& message_queue = outboundMessageQueues[pathid]; if (message_queue.size() > 0) { const MessageQueueEntry& entry = message_queue.top(); Send(entry.router, entry.message); message_queue.pop(); empty_count = 0; sent_count++; } else { empty_count++; } roundRobinOrder.push(std::move(pathid)); // if num_queues empty queues in a row, all queues empty. if (empty_count == num_queues) { break; } } m_queueStats.perTickMax = std::max((uint32_t)sent_count, m_queueStats.perTickMax); } void OutboundMessageHandler::FinalizeSessionRequest(const RouterID& router, SendStatus status) { MessageQueue movedMessages; { util::Lock l(_mutex); auto itr = pendingSessionMessageQueues.find(router); if (itr == pendingSessionMessageQueues.end()) { return; } movedMessages.swap(itr->second); pendingSessionMessageQueues.erase(itr); } while (!movedMessages.empty()) { const MessageQueueEntry& entry = movedMessages.top(); if (status == SendStatus::Success) { Send(entry.router, entry.message); } else { DoCallback(entry.message.second, status); } movedMessages.pop(); } } } // namespace llarp