#ifndef LLARP_ROUTER_OUTBOUND_MESSAGE_HANDLER_HPP #define LLARP_ROUTER_OUTBOUND_MESSAGE_HANDLER_HPP #include #include #include #include #include #include #include #include #include struct llarp_buffer_t; namespace llarp { struct ILinkManager; class Logic; enum class SessionResult; struct OutboundMessageHandler final : public IOutboundMessageHandler { public: ~OutboundMessageHandler() override = default; OutboundMessageHandler(size_t maxQueueSize = MAX_OUTBOUND_QUEUE_SIZE); bool QueueMessage(const RouterID& remote, const ILinkMessage* msg, SendStatusHandler callback) override EXCLUDES(_mutex); void Tick() override; void QueueRemoveEmptyPath(const PathID_t& pathid) override; util::StatusObject ExtractStatus() const override; void Init(ILinkManager* linkManager, std::shared_ptr logic); private: using Message = std::pair, SendStatusHandler>; struct MessageQueueEntry { uint16_t priority; Message message; PathID_t pathid; RouterID router; bool operator<(const MessageQueueEntry& other) const { return other.priority < priority; } }; struct MessageQueueStats { uint64_t queued = 0; uint64_t dropped = 0; uint64_t sent = 0; uint32_t queueWatermark = 0; uint32_t perTickMax = 0; uint32_t numTicks = 0; }; using MessageQueue = std::priority_queue; void OnSessionEstablished(const RouterID& router); void OnConnectTimeout(const RouterID& router); void OnRouterNotFound(const RouterID& router); void OnInvalidRouter(const RouterID& router); void OnNoLink(const RouterID& router); void OnSessionResult(const RouterID& router, const SessionResult result); void DoCallback(SendStatusHandler callback, SendStatus status); void QueueSessionCreation(const RouterID& remote); bool EncodeBuffer(const ILinkMessage* msg, llarp_buffer_t& buf); bool Send(const RouterID& remote, const Message& msg); bool SendIfSession(const RouterID& remote, const Message& msg); bool QueueOutboundMessage( const RouterID& remote, Message&& msg, const PathID_t& pathid, uint16_t priority = 0); void ProcessOutboundQueue(); void RemoveEmptyPathQueues(); void SendRoundRobin(); void FinalizeSessionRequest(const RouterID& router, SendStatus status) EXCLUDES(_mutex); llarp::thread::Queue outboundQueue; llarp::thread::Queue removedPaths; bool removedSomePaths; mutable util::Mutex _mutex; // protects pendingSessionMessageQueues std::unordered_map pendingSessionMessageQueues GUARDED_BY(_mutex); std::unordered_map outboundMessageQueues; std::queue roundRobinOrder; ILinkManager* _linkManager; std::shared_ptr _logic; util::ContentionKiller m_Killer; // paths cannot have pathid "0", so it can be used as the "pathid" // for non-traffic (control) messages, so they can be prioritized. static const PathID_t zeroID; MessageQueueStats m_queueStats; }; } // namespace llarp #endif // LLARP_ROUTER_OUTBOUND_MESSAGE_HANDLER_HPP