From 4185d47d4be0c2928a56329c6adb2a459f77f9db Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Fri, 17 Jan 2020 11:33:37 -0500 Subject: [PATCH] link layer message priority --- llarp/messages/link_intro.hpp | 7 ++++++ llarp/messages/link_message.hpp | 7 ++++++ llarp/messages/relay.hpp | 11 +++++++++ llarp/messages/relay_commit.hpp | 6 +++++ llarp/messages/relay_status.hpp | 5 ++++ llarp/router/outbound_message_handler.cpp | 28 ++++++++++++++--------- llarp/router/outbound_message_handler.hpp | 11 +++++++-- 7 files changed, 62 insertions(+), 13 deletions(-) diff --git a/llarp/messages/link_intro.hpp b/llarp/messages/link_intro.hpp index 0e37b6669..054691384 100644 --- a/llarp/messages/link_intro.hpp +++ b/llarp/messages/link_intro.hpp @@ -45,6 +45,13 @@ namespace llarp { return "LinkIntro"; } + + // always first + uint16_t + Priority() const override + { + return std::numeric_limits< uint16_t >::max(); + } }; } // namespace llarp diff --git a/llarp/messages/link_message.hpp b/llarp/messages/link_message.hpp index dd838495f..1c5eddf2f 100644 --- a/llarp/messages/link_message.hpp +++ b/llarp/messages/link_message.hpp @@ -55,6 +55,13 @@ namespace llarp // the name of this kind of message virtual const char* Name() const = 0; + + /// get message prority, higher value means more important + virtual uint16_t + Priority() const + { + return 1; + } }; } // namespace llarp diff --git a/llarp/messages/relay.hpp b/llarp/messages/relay.hpp index d28654995..9ef671289 100644 --- a/llarp/messages/relay.hpp +++ b/llarp/messages/relay.hpp @@ -32,6 +32,11 @@ namespace llarp { return "RelayUpstream"; } + uint16_t + Priority() const override + { + return 0; + } }; struct RelayDownstreamMessage : public ILinkMessage @@ -56,6 +61,12 @@ namespace llarp { return "RelayDownstream"; } + + uint16_t + Priority() const override + { + return 0; + } }; } // namespace llarp diff --git a/llarp/messages/relay_commit.hpp b/llarp/messages/relay_commit.hpp index 7c54349fc..8f432e320 100644 --- a/llarp/messages/relay_commit.hpp +++ b/llarp/messages/relay_commit.hpp @@ -79,6 +79,12 @@ namespace llarp { return "RelayCommit"; } + + virtual uint16_t + Priority() const override + { + return 5; + } }; } // namespace llarp diff --git a/llarp/messages/relay_status.hpp b/llarp/messages/relay_status.hpp index 77ad47592..23f2f7b95 100644 --- a/llarp/messages/relay_status.hpp +++ b/llarp/messages/relay_status.hpp @@ -103,6 +103,11 @@ namespace llarp { return "RelayStatus"; } + virtual uint16_t + Priority() const override + { + return 6; + } }; } // namespace llarp diff --git a/llarp/router/outbound_message_handler.cpp b/llarp/router/outbound_message_handler.cpp index 87d63178f..9e2e1b640 100644 --- a/llarp/router/outbound_message_handler.cpp +++ b/llarp/router/outbound_message_handler.cpp @@ -24,6 +24,7 @@ namespace llarp const ILinkMessage *msg, SendStatusHandler callback) { + const uint16_t priority = msg->Priority(); std::array< byte_t, MAX_LINK_MSG_SIZE > linkmsg_buffer; llarp_buffer_t buf(linkmsg_buffer); @@ -40,7 +41,7 @@ namespace llarp if(_linkManager->HasSessionTo(remote)) { - QueueOutboundMessage(remote, std::move(message), msg->pathid); + QueueOutboundMessage(remote, std::move(message), msg->pathid, priority); return true; } @@ -53,8 +54,9 @@ namespace llarp pendingSessionMessageQueues.emplace(remote, MessageQueue()); MessageQueueEntry entry; - entry.message = message; - entry.router = remote; + entry.priority = priority; + entry.message = message; + entry.router = remote; itr_pair.first->second.push(std::move(entry)); shouldCreateSession = itr_pair.second; @@ -232,13 +234,15 @@ namespace llarp bool OutboundMessageHandler::QueueOutboundMessage(const RouterID &remote, Message &&msg, - const PathID_t &pathid) + const PathID_t &pathid, + uint16_t priority) { MessageQueueEntry entry; entry.message = std::move(msg); auto callback_copy = entry.message.second; entry.router = remote; entry.pathid = pathid; + entry.priority = priority; if(outboundQueue.tryPushBack(std::move(entry)) != llarp::thread::QueueReturn::Success) { @@ -274,11 +278,13 @@ namespace llarp } MessageQueue &path_queue = itr_pair.first->second; + /* if(path_queue.size() >= MAX_PATH_QUEUE_SIZE) { m_queueStats.dropped++; path_queue.pop(); // head drop } + */ path_queue.push(std::move(entry)); } } @@ -310,10 +316,9 @@ namespace llarp auto &non_routing_mq = outboundMessageQueues[zeroID]; while(not non_routing_mq.empty()) { - MessageQueueEntry entry = std::move(non_routing_mq.front()); - non_routing_mq.pop(); - + const MessageQueueEntry &entry = non_routing_mq.top(); Send(entry.router, entry.message); + non_routing_mq.pop(); } size_t empty_count = 0; @@ -349,10 +354,11 @@ namespace llarp auto &message_queue = outboundMessageQueues[pathid]; if(message_queue.size() > 0) { - MessageQueueEntry entry = std::move(message_queue.front()); - message_queue.pop(); + const MessageQueueEntry &entry = message_queue.top(); Send(entry.router, entry.message); + message_queue.pop(); + empty_count = 0; sent_count++; } @@ -395,8 +401,7 @@ namespace llarp while(!movedMessages.empty()) { - MessageQueueEntry entry = std::move(movedMessages.front()); - movedMessages.pop(); + const MessageQueueEntry &entry = movedMessages.top(); if(status == SendStatus::Success) { @@ -406,6 +411,7 @@ namespace llarp { DoCallback(entry.message.second, status); } + movedMessages.pop(); } } diff --git a/llarp/router/outbound_message_handler.hpp b/llarp/router/outbound_message_handler.hpp index 7a201f850..2846f0c85 100644 --- a/llarp/router/outbound_message_handler.hpp +++ b/llarp/router/outbound_message_handler.hpp @@ -49,9 +49,16 @@ namespace llarp struct MessageQueueEntry { + uint16_t priority; Message message; PathID_t pathid; RouterID router; + + bool + operator<(const MessageQueueEntry &other) const + { + return other.priority < priority; + } }; struct MessageQueueStats @@ -65,7 +72,7 @@ namespace llarp uint32_t numTicks = 0; }; - using MessageQueue = std::queue< MessageQueueEntry >; + using MessageQueue = std::priority_queue< MessageQueueEntry >; void OnSessionEstablished(const RouterID &router); @@ -102,7 +109,7 @@ namespace llarp bool QueueOutboundMessage(const RouterID &remote, Message &&msg, - const PathID_t &pathid); + const PathID_t &pathid, uint16_t priority = 0); void ProcessOutboundQueue();