diff --git a/llarp/iwp/message_buffer.cpp b/llarp/iwp/message_buffer.cpp index e5098f656..000f80a6a 100644 --- a/llarp/iwp/message_buffer.cpp +++ b/llarp/iwp/message_buffer.cpp @@ -10,12 +10,14 @@ namespace llarp uint64_t msgid, ILinkSession::Message_t msg, llarp_time_t now, - ILinkSession::CompletionHandler handler) + ILinkSession::CompletionHandler handler, + uint16_t priority) : m_Data{std::move(msg)} , m_MsgID{msgid} , m_Completed{handler} , m_LastFlush{now} , m_StartedAt{now} + , m_ResendPriority{priority} { const llarp_buffer_t buf(m_Data); CryptoManager::instance()->shorthash(m_Digest, buf); diff --git a/llarp/iwp/message_buffer.hpp b/llarp/iwp/message_buffer.hpp index e760a8ac5..3ea352064 100644 --- a/llarp/iwp/message_buffer.hpp +++ b/llarp/iwp/message_buffer.hpp @@ -40,7 +40,8 @@ namespace llarp uint64_t msgid, ILinkSession::Message_t data, llarp_time_t now, - ILinkSession::CompletionHandler handler); + ILinkSession::CompletionHandler handler, + uint16_t priority); ILinkSession::Message_t m_Data; uint64_t m_MsgID = 0; @@ -49,6 +50,15 @@ namespace llarp llarp_time_t m_LastFlush = 0s; ShortHash m_Digest; llarp_time_t m_StartedAt = 0s; + uint16_t m_ResendPriority; + + bool + operator<(const OutboundMessage& msg) const + { + // yes, the first order is reversed as higher means more important + // second part is for queue order + return msg.m_ResendPriority < m_ResendPriority or m_MsgID < msg.m_MsgID; + } ILinkSession::Packet_t XMIT() const; diff --git a/llarp/iwp/session.cpp b/llarp/iwp/session.cpp index fcd4097d3..200fa7130 100644 --- a/llarp/iwp/session.cpp +++ b/llarp/iwp/session.cpp @@ -5,6 +5,8 @@ #include #include +#include + namespace llarp { namespace iwp @@ -127,7 +129,7 @@ namespace llarp { LogError("failed to encode LIM for ", m_RemoteAddr); } - if (!SendMessageBuffer(std::move(data), h)) + if (not SendMessageBuffer(std::move(data), h)) { LogError("failed to send LIM to ", m_RemoteAddr); } @@ -183,7 +185,7 @@ namespace llarp bool Session::SendMessageBuffer( - ILinkSession::Message_t buf, ILinkSession::CompletionHandler completed) + ILinkSession::Message_t buf, ILinkSession::CompletionHandler completed, uint16_t priority) { if (m_TXMsgs.size() >= MaxSendQueueSize) { @@ -194,8 +196,9 @@ namespace llarp const auto now = m_Parent->Now(); const auto msgid = m_TXID++; const auto bufsz = buf.size(); - auto& msg = m_TXMsgs.emplace(msgid, OutboundMessage{msgid, std::move(buf), now, completed}) - .first->second; + auto& msg = + m_TXMsgs.emplace(msgid, OutboundMessage{msgid, std::move(buf), now, completed, priority}) + .first->second; TriggerPump(); EncryptAndSend(msg.XMIT()); if (bufsz > FragmentSize) @@ -253,15 +256,22 @@ namespace llarp msg.SendACKS(util::memFn(&Session::EncryptAndSend, this), now); } } + std::priority_queue< + OutboundMessage*, + std::vector, + ComparePtr> + resend; for (auto& [id, msg] : m_TXMsgs) { if (msg.ShouldFlush(now)) - { - msg.FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), now); - } + resend.push(&msg); + } + if (not resend.empty()) + { + for (auto& msg = resend.top(); not resend.empty(); resend.pop()) + msg->FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), now); } } - assert(shared_from_this().use_count() > 1); if (not m_EncryptNext.empty()) { m_Parent->QueueWork( diff --git a/llarp/iwp/session.hpp b/llarp/iwp/session.hpp index dc429f5de..212b32812 100644 --- a/llarp/iwp/session.hpp +++ b/llarp/iwp/session.hpp @@ -60,7 +60,10 @@ namespace llarp Tick(llarp_time_t now) override; bool - SendMessageBuffer(ILinkSession::Message_t msg, CompletionHandler resultHandler) override; + SendMessageBuffer( + ILinkSession::Message_t msg, + CompletionHandler resultHandler, + uint16_t priority = 0) override; void Send_LL(const byte_t* buf, size_t sz); diff --git a/llarp/link/i_link_manager.hpp b/llarp/link/i_link_manager.hpp index abf8b8081..48ffd3d94 100644 --- a/llarp/link/i_link_manager.hpp +++ b/llarp/link/i_link_manager.hpp @@ -30,7 +30,8 @@ namespace llarp SendTo( const RouterID& remote, const llarp_buffer_t& buf, - ILinkSession::CompletionHandler completed) = 0; + ILinkSession::CompletionHandler completed, + uint16_t priority = 0) = 0; virtual bool HasSessionTo(const RouterID& remote) const = 0; diff --git a/llarp/link/link_manager.cpp b/llarp/link/link_manager.cpp index a5682ebdd..116bfb392 100644 --- a/llarp/link/link_manager.cpp +++ b/llarp/link/link_manager.cpp @@ -19,7 +19,7 @@ namespace llarp // TODO: may want to add some memory of session failures for a given // router on a given link and not return that link here for a // duration - if (!link->IsCompatable(rc)) + if (not link->IsCompatable(rc)) continue; return link; @@ -36,7 +36,10 @@ namespace llarp bool LinkManager::SendTo( - const RouterID& remote, const llarp_buffer_t& buf, ILinkSession::CompletionHandler completed) + const RouterID& remote, + const llarp_buffer_t& buf, + ILinkSession::CompletionHandler completed, + uint16_t priority) { if (stopping) return false; @@ -51,7 +54,7 @@ namespace llarp return false; } - return link->SendTo(remote, buf, completed); + return link->SendTo(remote, buf, completed, priority); } bool diff --git a/llarp/link/link_manager.hpp b/llarp/link/link_manager.hpp index 0f8eeac69..77179b713 100644 --- a/llarp/link/link_manager.hpp +++ b/llarp/link/link_manager.hpp @@ -28,7 +28,8 @@ namespace llarp SendTo( const RouterID& remote, const llarp_buffer_t& buf, - ILinkSession::CompletionHandler completed) override; + ILinkSession::CompletionHandler completed, + uint16_t priority) override; bool HasSessionTo(const RouterID& remote) const override; diff --git a/llarp/link/server.cpp b/llarp/link/server.cpp index 083bd71ea..ddc21da76 100644 --- a/llarp/link/server.cpp +++ b/llarp/link/server.cpp @@ -440,7 +440,10 @@ namespace llarp bool ILinkLayer::SendTo( - const RouterID& remote, const llarp_buffer_t& buf, ILinkSession::CompletionHandler completed) + const RouterID& remote, + const llarp_buffer_t& buf, + ILinkSession::CompletionHandler completed, + uint16_t priority) { std::shared_ptr s; { @@ -459,7 +462,7 @@ namespace llarp } ILinkSession::Message_t pkt(buf.sz); std::copy_n(buf.base, buf.sz, pkt.begin()); - return s && s->SendMessageBuffer(std::move(pkt), completed); + return s && s->SendMessageBuffer(std::move(pkt), completed, priority); } bool diff --git a/llarp/link/server.hpp b/llarp/link/server.hpp index 803120454..a976dbbc2 100644 --- a/llarp/link/server.hpp +++ b/llarp/link/server.hpp @@ -148,7 +148,8 @@ namespace llarp SendTo( const RouterID& remote, const llarp_buffer_t& buf, - ILinkSession::CompletionHandler completed); + ILinkSession::CompletionHandler completed, + uint16_t priority); virtual bool GetOurAddressInfo(AddressInfo& addr) const; diff --git a/llarp/link/session.hpp b/llarp/link/session.hpp index ab8b5d4f5..fc8df2414 100644 --- a/llarp/link/session.hpp +++ b/llarp/link/session.hpp @@ -57,7 +57,7 @@ namespace llarp /// send a message buffer to the remote endpoint virtual bool - SendMessageBuffer(Message_t, CompletionHandler handler) = 0; + SendMessageBuffer(Message_t, CompletionHandler handler, uint16_t priority) = 0; /// start the connection virtual void diff --git a/llarp/router/outbound_message_handler.cpp b/llarp/router/outbound_message_handler.cpp index 2a8639d09..2e56591ea 100644 --- a/llarp/router/outbound_message_handler.cpp +++ b/llarp/router/outbound_message_handler.cpp @@ -30,25 +30,28 @@ namespace llarp DoCallback(callback, SendStatus::InvalidRouter); return true; } - const uint16_t priority = msg.Priority(); + MessageQueueEntry ent; + ent.router = remote; + ent.inform = std::move(callback); + ent.pathid = msg.pathid; + ent.priority = msg.Priority(); + std::array linkmsg_buffer; - llarp_buffer_t buf(linkmsg_buffer); + llarp_buffer_t buf{linkmsg_buffer}; if (!EncodeBuffer(msg, buf)) { return false; } - Message message; - message.first.resize(buf.sz); - message.second = callback; + ent.message.resize(buf.sz); - std::copy_n(buf.base, buf.sz, message.first.data()); + std::copy_n(buf.base, buf.sz, ent.message.data()); // if we have a session to the destination, queue the message and return if (_router->linkManager().HasSessionTo(remote)) { - QueueOutboundMessage(remote, std::move(message), msg.pathid, priority); + QueueOutboundMessage(std::move(ent)); return true; } @@ -58,16 +61,11 @@ namespace llarp // in progress. bool shouldCreateSession = false; { - util::Lock l(_mutex); + util::Lock l{_mutex}; // create queue for if it doesn't exist, and get iterator auto [queue_itr, is_new] = pendingSessionMessageQueues.emplace(remote, MessageQueue()); - - MessageQueueEntry entry; - entry.priority = priority; - entry.message = message; - entry.router = remote; - queue_itr->second.push(std::move(entry)); + queue_itr->second.push(std::move(ent)); shouldCreateSession = is_new; } @@ -86,7 +84,7 @@ namespace llarp m_Killer.TryAccess([this]() { recentlyRemovedPaths.Decay(); ProcessOutboundQueue(); - if (/*bool more = */ SendRoundRobin()) + if (SendRoundRobin()) _router->TriggerPump(); }); } @@ -190,52 +188,48 @@ namespace llarp } bool - OutboundMessageHandler::Send(const RouterID& remote, const Message& msg) + OutboundMessageHandler::Send(const MessageQueueEntry& ent) { - const llarp_buffer_t buf(msg.first); - auto callback = msg.second; + const llarp_buffer_t buf{ent.message}; m_queueStats.sent++; - return _router->linkManager().SendTo(remote, buf, [=](ILinkSession::DeliveryStatus status) { - if (status == ILinkSession::DeliveryStatus::eDeliverySuccess) - DoCallback(callback, SendStatus::Success); - else - { - DoCallback(callback, SendStatus::Congestion); - } - }); + SendStatusHandler callback = ent.inform; + return _router->linkManager().SendTo( + ent.router, + buf, + [this, callback](ILinkSession::DeliveryStatus status) { + if (status == ILinkSession::DeliveryStatus::eDeliverySuccess) + DoCallback(callback, SendStatus::Success); + else + { + DoCallback(callback, SendStatus::Congestion); + } + }, + ent.priority); } bool - OutboundMessageHandler::SendIfSession(const RouterID& remote, const Message& msg) + OutboundMessageHandler::SendIfSession(const MessageQueueEntry& ent) { - if (_router->linkManager().HasSessionTo(remote)) + if (_router->linkManager().HasSessionTo(ent.router)) { - return Send(remote, msg); + return Send(ent); } return false; } bool - OutboundMessageHandler::QueueOutboundMessage( - const RouterID& remote, Message&& msg, const PathID_t& pathid, uint16_t priority) + OutboundMessageHandler::QueueOutboundMessage(MessageQueueEntry entry) { - MessageQueueEntry entry; - entry.message = std::move(msg); - // copy callback in case we need to call it, so we can std::move(entry) - auto callback_copy = entry.message.second; - entry.router = remote; - entry.pathid = pathid; - entry.priority = priority; + auto callback = entry.inform; if (outboundQueue.tryPushBack(std::move(entry)) != llarp::thread::QueueReturn::Success) { m_queueStats.dropped++; - DoCallback(callback_copy, SendStatus::Congestion); + DoCallback(callback, SendStatus::Congestion); } else { m_queueStats.queued++; - uint32_t queueSize = outboundQueue.size(); m_queueStats.queueWatermark = std::max(queueSize, m_queueStats.queueWatermark); } @@ -272,7 +266,7 @@ namespace llarp } else { - DoCallback(entry.message.second, SendStatus::Congestion); + DoCallback(entry.inform, SendStatus::Congestion); m_queueStats.dropped++; } } @@ -288,7 +282,7 @@ namespace llarp while (not routing_mq.empty()) { const MessageQueueEntry& entry = routing_mq.top(); - Send(entry.router, entry.message); + Send(entry); routing_mq.pop(); } @@ -331,7 +325,7 @@ namespace llarp { const MessageQueueEntry& entry = message_queue.top(); - Send(entry.router, entry.message); + Send(entry); message_queue.pop(); consecutive_empty = 0; @@ -380,11 +374,11 @@ namespace llarp if (status == SendStatus::Success) { - Send(entry.router, entry.message); + Send(entry); } else { - DoCallback(entry.message.second, status); + DoCallback(entry.inform, status); } movedMessages.pop(); } diff --git a/llarp/router/outbound_message_handler.hpp b/llarp/router/outbound_message_handler.hpp index 3edb6d917..9e4c7fcdc 100644 --- a/llarp/router/outbound_message_handler.hpp +++ b/llarp/router/outbound_message_handler.hpp @@ -74,15 +74,14 @@ namespace llarp Init(AbstractRouter* router); private: - using Message = std::pair, SendStatusHandler>; - /* A message that has been queued for sending, but not yet * processed into an individual path's message queue. */ struct MessageQueueEntry { uint16_t priority; - Message message; + std::vector message; + SendStatusHandler inform; PathID_t pathid; RouterID router; @@ -131,14 +130,14 @@ namespace llarp * returns the result of the call to LinkManager::SendTo() */ bool - Send(const RouterID& remote, const Message& msg); + Send(const MessageQueueEntry& ent); /* Sends the message along to the link layer if we have a session to the remote * * returns the result of the Send() call, or false if no session. */ bool - SendIfSession(const RouterID& remote, const Message& msg); + SendIfSession(const MessageQueueEntry& ent); /* queues a message to the shared outbound message queue. * @@ -149,8 +148,7 @@ namespace llarp * are placed in their paths' respective individual queues. */ bool - QueueOutboundMessage( - const RouterID& remote, Message&& msg, const PathID_t& pathid, uint16_t priority = 0); + QueueOutboundMessage(MessageQueueEntry entry); /* Processes messages on the shared message queue into their paths' respective * individual queues. diff --git a/llarp/router/rc_gossiper.cpp b/llarp/router/rc_gossiper.cpp index 7eb8aa88a..70493523b 100644 --- a/llarp/router/rc_gossiper.cpp +++ b/llarp/router/rc_gossiper.cpp @@ -116,7 +116,7 @@ namespace llarp m_router->NotifyRouterEvent(m_router->pubkey(), rc); // send message - peerSession->SendMessageBuffer(std::move(msg), nullptr); + peerSession->SendMessageBuffer(std::move(msg), nullptr, gossip.Priority()); }); return true; }