link layer message priority

pull/1053/head
Jeff Becker 4 years ago
parent a681c28e5f
commit 4185d47d4b
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -45,6 +45,13 @@ namespace llarp
{
return "LinkIntro";
}
// always first
uint16_t
Priority() const override
{
return std::numeric_limits< uint16_t >::max();
}
};
} // namespace llarp

@ -55,6 +55,13 @@ namespace llarp
// the name of this kind of message
virtual const char*
Name() const = 0;
/// get message prority, higher value means more important
virtual uint16_t
Priority() const
{
return 1;
}
};
} // namespace llarp

@ -32,6 +32,11 @@ namespace llarp
{
return "RelayUpstream";
}
uint16_t
Priority() const override
{
return 0;
}
};
struct RelayDownstreamMessage : public ILinkMessage
@ -56,6 +61,12 @@ namespace llarp
{
return "RelayDownstream";
}
uint16_t
Priority() const override
{
return 0;
}
};
} // namespace llarp

@ -79,6 +79,12 @@ namespace llarp
{
return "RelayCommit";
}
virtual uint16_t
Priority() const override
{
return 5;
}
};
} // namespace llarp

@ -103,6 +103,11 @@ namespace llarp
{
return "RelayStatus";
}
virtual uint16_t
Priority() const override
{
return 6;
}
};
} // namespace llarp

@ -24,6 +24,7 @@ namespace llarp
const ILinkMessage *msg,
SendStatusHandler callback)
{
const uint16_t priority = msg->Priority();
std::array< byte_t, MAX_LINK_MSG_SIZE > linkmsg_buffer;
llarp_buffer_t buf(linkmsg_buffer);
@ -40,7 +41,7 @@ namespace llarp
if(_linkManager->HasSessionTo(remote))
{
QueueOutboundMessage(remote, std::move(message), msg->pathid);
QueueOutboundMessage(remote, std::move(message), msg->pathid, priority);
return true;
}
@ -53,8 +54,9 @@ namespace llarp
pendingSessionMessageQueues.emplace(remote, MessageQueue());
MessageQueueEntry entry;
entry.message = message;
entry.router = remote;
entry.priority = priority;
entry.message = message;
entry.router = remote;
itr_pair.first->second.push(std::move(entry));
shouldCreateSession = itr_pair.second;
@ -232,13 +234,15 @@ namespace llarp
bool
OutboundMessageHandler::QueueOutboundMessage(const RouterID &remote,
Message &&msg,
const PathID_t &pathid)
const PathID_t &pathid,
uint16_t priority)
{
MessageQueueEntry entry;
entry.message = std::move(msg);
auto callback_copy = entry.message.second;
entry.router = remote;
entry.pathid = pathid;
entry.priority = priority;
if(outboundQueue.tryPushBack(std::move(entry))
!= llarp::thread::QueueReturn::Success)
{
@ -274,11 +278,13 @@ namespace llarp
}
MessageQueue &path_queue = itr_pair.first->second;
/*
if(path_queue.size() >= MAX_PATH_QUEUE_SIZE)
{
m_queueStats.dropped++;
path_queue.pop(); // head drop
}
*/
path_queue.push(std::move(entry));
}
}
@ -310,10 +316,9 @@ namespace llarp
auto &non_routing_mq = outboundMessageQueues[zeroID];
while(not non_routing_mq.empty())
{
MessageQueueEntry entry = std::move(non_routing_mq.front());
non_routing_mq.pop();
const MessageQueueEntry &entry = non_routing_mq.top();
Send(entry.router, entry.message);
non_routing_mq.pop();
}
size_t empty_count = 0;
@ -349,10 +354,11 @@ namespace llarp
auto &message_queue = outboundMessageQueues[pathid];
if(message_queue.size() > 0)
{
MessageQueueEntry entry = std::move(message_queue.front());
message_queue.pop();
const MessageQueueEntry &entry = message_queue.top();
Send(entry.router, entry.message);
message_queue.pop();
empty_count = 0;
sent_count++;
}
@ -395,8 +401,7 @@ namespace llarp
while(!movedMessages.empty())
{
MessageQueueEntry entry = std::move(movedMessages.front());
movedMessages.pop();
const MessageQueueEntry &entry = movedMessages.top();
if(status == SendStatus::Success)
{
@ -406,6 +411,7 @@ namespace llarp
{
DoCallback(entry.message.second, status);
}
movedMessages.pop();
}
}

@ -49,9 +49,16 @@ namespace llarp
struct MessageQueueEntry
{
uint16_t priority;
Message message;
PathID_t pathid;
RouterID router;
bool
operator<(const MessageQueueEntry &other) const
{
return other.priority < priority;
}
};
struct MessageQueueStats
@ -65,7 +72,7 @@ namespace llarp
uint32_t numTicks = 0;
};
using MessageQueue = std::queue< MessageQueueEntry >;
using MessageQueue = std::priority_queue< MessageQueueEntry >;
void
OnSessionEstablished(const RouterID &router);
@ -102,7 +109,7 @@ namespace llarp
bool
QueueOutboundMessage(const RouterID &remote, Message &&msg,
const PathID_t &pathid);
const PathID_t &pathid, uint16_t priority = 0);
void
ProcessOutboundQueue();

Loading…
Cancel
Save