propagate link layer message priority to link layer so it can order retransmissions with that in mind

pull/1907/head
Jeff 2 years ago committed by Jeff
parent a80f7e1cf6
commit 8960ca08f3
No known key found for this signature in database
GPG Key ID: 025C02EE3A092F2D

@ -10,12 +10,14 @@ namespace llarp
uint64_t msgid, uint64_t msgid,
ILinkSession::Message_t msg, ILinkSession::Message_t msg,
llarp_time_t now, llarp_time_t now,
ILinkSession::CompletionHandler handler) ILinkSession::CompletionHandler handler,
uint16_t priority)
: m_Data{std::move(msg)} : m_Data{std::move(msg)}
, m_MsgID{msgid} , m_MsgID{msgid}
, m_Completed{handler} , m_Completed{handler}
, m_LastFlush{now} , m_LastFlush{now}
, m_StartedAt{now} , m_StartedAt{now}
, m_ResendPriority{priority}
{ {
const llarp_buffer_t buf(m_Data); const llarp_buffer_t buf(m_Data);
CryptoManager::instance()->shorthash(m_Digest, buf); CryptoManager::instance()->shorthash(m_Digest, buf);

@ -40,7 +40,8 @@ namespace llarp
uint64_t msgid, uint64_t msgid,
ILinkSession::Message_t data, ILinkSession::Message_t data,
llarp_time_t now, llarp_time_t now,
ILinkSession::CompletionHandler handler); ILinkSession::CompletionHandler handler,
uint16_t priority);
ILinkSession::Message_t m_Data; ILinkSession::Message_t m_Data;
uint64_t m_MsgID = 0; uint64_t m_MsgID = 0;
@ -49,6 +50,15 @@ namespace llarp
llarp_time_t m_LastFlush = 0s; llarp_time_t m_LastFlush = 0s;
ShortHash m_Digest; ShortHash m_Digest;
llarp_time_t m_StartedAt = 0s; llarp_time_t m_StartedAt = 0s;
uint16_t m_ResendPriority;
bool
operator<(const OutboundMessage& msg) const
{
// yes, the first order is reversed as higher means more important
// second part is for queue order
return msg.m_ResendPriority < m_ResendPriority or m_MsgID < msg.m_MsgID;
}
ILinkSession::Packet_t ILinkSession::Packet_t
XMIT() const; XMIT() const;

@ -5,6 +5,8 @@
#include <llarp/util/meta/memfn.hpp> #include <llarp/util/meta/memfn.hpp>
#include <llarp/router/abstractrouter.hpp> #include <llarp/router/abstractrouter.hpp>
#include <queue>
namespace llarp namespace llarp
{ {
namespace iwp namespace iwp
@ -127,7 +129,7 @@ namespace llarp
{ {
LogError("failed to encode LIM for ", m_RemoteAddr); LogError("failed to encode LIM for ", m_RemoteAddr);
} }
if (!SendMessageBuffer(std::move(data), h)) if (not SendMessageBuffer(std::move(data), h))
{ {
LogError("failed to send LIM to ", m_RemoteAddr); LogError("failed to send LIM to ", m_RemoteAddr);
} }
@ -183,7 +185,7 @@ namespace llarp
bool bool
Session::SendMessageBuffer( Session::SendMessageBuffer(
ILinkSession::Message_t buf, ILinkSession::CompletionHandler completed) ILinkSession::Message_t buf, ILinkSession::CompletionHandler completed, uint16_t priority)
{ {
if (m_TXMsgs.size() >= MaxSendQueueSize) if (m_TXMsgs.size() >= MaxSendQueueSize)
{ {
@ -194,8 +196,9 @@ namespace llarp
const auto now = m_Parent->Now(); const auto now = m_Parent->Now();
const auto msgid = m_TXID++; const auto msgid = m_TXID++;
const auto bufsz = buf.size(); const auto bufsz = buf.size();
auto& msg = m_TXMsgs.emplace(msgid, OutboundMessage{msgid, std::move(buf), now, completed}) auto& msg =
.first->second; m_TXMsgs.emplace(msgid, OutboundMessage{msgid, std::move(buf), now, completed, priority})
.first->second;
TriggerPump(); TriggerPump();
EncryptAndSend(msg.XMIT()); EncryptAndSend(msg.XMIT());
if (bufsz > FragmentSize) if (bufsz > FragmentSize)
@ -253,15 +256,22 @@ namespace llarp
msg.SendACKS(util::memFn(&Session::EncryptAndSend, this), now); msg.SendACKS(util::memFn(&Session::EncryptAndSend, this), now);
} }
} }
std::priority_queue<
OutboundMessage*,
std::vector<OutboundMessage*>,
ComparePtr<OutboundMessage*>>
resend;
for (auto& [id, msg] : m_TXMsgs) for (auto& [id, msg] : m_TXMsgs)
{ {
if (msg.ShouldFlush(now)) if (msg.ShouldFlush(now))
{ resend.push(&msg);
msg.FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), now); }
} if (not resend.empty())
{
for (auto& msg = resend.top(); not resend.empty(); resend.pop())
msg->FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), now);
} }
} }
assert(shared_from_this().use_count() > 1);
if (not m_EncryptNext.empty()) if (not m_EncryptNext.empty())
{ {
m_Parent->QueueWork( m_Parent->QueueWork(

@ -60,7 +60,10 @@ namespace llarp
Tick(llarp_time_t now) override; Tick(llarp_time_t now) override;
bool bool
SendMessageBuffer(ILinkSession::Message_t msg, CompletionHandler resultHandler) override; SendMessageBuffer(
ILinkSession::Message_t msg,
CompletionHandler resultHandler,
uint16_t priority = 0) override;
void void
Send_LL(const byte_t* buf, size_t sz); Send_LL(const byte_t* buf, size_t sz);

@ -30,7 +30,8 @@ namespace llarp
SendTo( SendTo(
const RouterID& remote, const RouterID& remote,
const llarp_buffer_t& buf, const llarp_buffer_t& buf,
ILinkSession::CompletionHandler completed) = 0; ILinkSession::CompletionHandler completed,
uint16_t priority = 0) = 0;
virtual bool virtual bool
HasSessionTo(const RouterID& remote) const = 0; HasSessionTo(const RouterID& remote) const = 0;

@ -19,7 +19,7 @@ namespace llarp
// TODO: may want to add some memory of session failures for a given // TODO: may want to add some memory of session failures for a given
// router on a given link and not return that link here for a // router on a given link and not return that link here for a
// duration // duration
if (!link->IsCompatable(rc)) if (not link->IsCompatable(rc))
continue; continue;
return link; return link;
@ -36,7 +36,10 @@ namespace llarp
bool bool
LinkManager::SendTo( LinkManager::SendTo(
const RouterID& remote, const llarp_buffer_t& buf, ILinkSession::CompletionHandler completed) const RouterID& remote,
const llarp_buffer_t& buf,
ILinkSession::CompletionHandler completed,
uint16_t priority)
{ {
if (stopping) if (stopping)
return false; return false;
@ -51,7 +54,7 @@ namespace llarp
return false; return false;
} }
return link->SendTo(remote, buf, completed); return link->SendTo(remote, buf, completed, priority);
} }
bool bool

@ -28,7 +28,8 @@ namespace llarp
SendTo( SendTo(
const RouterID& remote, const RouterID& remote,
const llarp_buffer_t& buf, const llarp_buffer_t& buf,
ILinkSession::CompletionHandler completed) override; ILinkSession::CompletionHandler completed,
uint16_t priority) override;
bool bool
HasSessionTo(const RouterID& remote) const override; HasSessionTo(const RouterID& remote) const override;

@ -440,7 +440,10 @@ namespace llarp
bool bool
ILinkLayer::SendTo( ILinkLayer::SendTo(
const RouterID& remote, const llarp_buffer_t& buf, ILinkSession::CompletionHandler completed) const RouterID& remote,
const llarp_buffer_t& buf,
ILinkSession::CompletionHandler completed,
uint16_t priority)
{ {
std::shared_ptr<ILinkSession> s; std::shared_ptr<ILinkSession> s;
{ {
@ -459,7 +462,7 @@ namespace llarp
} }
ILinkSession::Message_t pkt(buf.sz); ILinkSession::Message_t pkt(buf.sz);
std::copy_n(buf.base, buf.sz, pkt.begin()); std::copy_n(buf.base, buf.sz, pkt.begin());
return s && s->SendMessageBuffer(std::move(pkt), completed); return s && s->SendMessageBuffer(std::move(pkt), completed, priority);
} }
bool bool

@ -148,7 +148,8 @@ namespace llarp
SendTo( SendTo(
const RouterID& remote, const RouterID& remote,
const llarp_buffer_t& buf, const llarp_buffer_t& buf,
ILinkSession::CompletionHandler completed); ILinkSession::CompletionHandler completed,
uint16_t priority);
virtual bool virtual bool
GetOurAddressInfo(AddressInfo& addr) const; GetOurAddressInfo(AddressInfo& addr) const;

@ -57,7 +57,7 @@ namespace llarp
/// send a message buffer to the remote endpoint /// send a message buffer to the remote endpoint
virtual bool virtual bool
SendMessageBuffer(Message_t, CompletionHandler handler) = 0; SendMessageBuffer(Message_t, CompletionHandler handler, uint16_t priority) = 0;
/// start the connection /// start the connection
virtual void virtual void

@ -30,25 +30,28 @@ namespace llarp
DoCallback(callback, SendStatus::InvalidRouter); DoCallback(callback, SendStatus::InvalidRouter);
return true; return true;
} }
const uint16_t priority = msg.Priority(); MessageQueueEntry ent;
ent.router = remote;
ent.inform = std::move(callback);
ent.pathid = msg.pathid;
ent.priority = msg.Priority();
std::array<byte_t, MAX_LINK_MSG_SIZE> linkmsg_buffer; std::array<byte_t, MAX_LINK_MSG_SIZE> linkmsg_buffer;
llarp_buffer_t buf(linkmsg_buffer); llarp_buffer_t buf{linkmsg_buffer};
if (!EncodeBuffer(msg, buf)) if (!EncodeBuffer(msg, buf))
{ {
return false; return false;
} }
Message message; ent.message.resize(buf.sz);
message.first.resize(buf.sz);
message.second = callback;
std::copy_n(buf.base, buf.sz, message.first.data()); std::copy_n(buf.base, buf.sz, ent.message.data());
// if we have a session to the destination, queue the message and return // if we have a session to the destination, queue the message and return
if (_router->linkManager().HasSessionTo(remote)) if (_router->linkManager().HasSessionTo(remote))
{ {
QueueOutboundMessage(remote, std::move(message), msg.pathid, priority); QueueOutboundMessage(std::move(ent));
return true; return true;
} }
@ -58,16 +61,11 @@ namespace llarp
// in progress. // in progress.
bool shouldCreateSession = false; bool shouldCreateSession = false;
{ {
util::Lock l(_mutex); util::Lock l{_mutex};
// create queue for <remote> if it doesn't exist, and get iterator // create queue for <remote> if it doesn't exist, and get iterator
auto [queue_itr, is_new] = pendingSessionMessageQueues.emplace(remote, MessageQueue()); auto [queue_itr, is_new] = pendingSessionMessageQueues.emplace(remote, MessageQueue());
queue_itr->second.push(std::move(ent));
MessageQueueEntry entry;
entry.priority = priority;
entry.message = message;
entry.router = remote;
queue_itr->second.push(std::move(entry));
shouldCreateSession = is_new; shouldCreateSession = is_new;
} }
@ -86,7 +84,7 @@ namespace llarp
m_Killer.TryAccess([this]() { m_Killer.TryAccess([this]() {
recentlyRemovedPaths.Decay(); recentlyRemovedPaths.Decay();
ProcessOutboundQueue(); ProcessOutboundQueue();
if (/*bool more = */ SendRoundRobin()) if (SendRoundRobin())
_router->TriggerPump(); _router->TriggerPump();
}); });
} }
@ -190,52 +188,48 @@ namespace llarp
} }
bool bool
OutboundMessageHandler::Send(const RouterID& remote, const Message& msg) OutboundMessageHandler::Send(const MessageQueueEntry& ent)
{ {
const llarp_buffer_t buf(msg.first); const llarp_buffer_t buf{ent.message};
auto callback = msg.second;
m_queueStats.sent++; m_queueStats.sent++;
return _router->linkManager().SendTo(remote, buf, [=](ILinkSession::DeliveryStatus status) { SendStatusHandler callback = ent.inform;
if (status == ILinkSession::DeliveryStatus::eDeliverySuccess) return _router->linkManager().SendTo(
DoCallback(callback, SendStatus::Success); ent.router,
else buf,
{ [this, callback](ILinkSession::DeliveryStatus status) {
DoCallback(callback, SendStatus::Congestion); if (status == ILinkSession::DeliveryStatus::eDeliverySuccess)
} DoCallback(callback, SendStatus::Success);
}); else
{
DoCallback(callback, SendStatus::Congestion);
}
},
ent.priority);
} }
bool bool
OutboundMessageHandler::SendIfSession(const RouterID& remote, const Message& msg) OutboundMessageHandler::SendIfSession(const MessageQueueEntry& ent)
{ {
if (_router->linkManager().HasSessionTo(remote)) if (_router->linkManager().HasSessionTo(ent.router))
{ {
return Send(remote, msg); return Send(ent);
} }
return false; return false;
} }
bool bool
OutboundMessageHandler::QueueOutboundMessage( OutboundMessageHandler::QueueOutboundMessage(MessageQueueEntry entry)
const RouterID& remote, Message&& msg, const PathID_t& pathid, uint16_t priority)
{ {
MessageQueueEntry entry;
entry.message = std::move(msg);
// copy callback in case we need to call it, so we can std::move(entry) // copy callback in case we need to call it, so we can std::move(entry)
auto callback_copy = entry.message.second; auto callback = entry.inform;
entry.router = remote;
entry.pathid = pathid;
entry.priority = priority;
if (outboundQueue.tryPushBack(std::move(entry)) != llarp::thread::QueueReturn::Success) if (outboundQueue.tryPushBack(std::move(entry)) != llarp::thread::QueueReturn::Success)
{ {
m_queueStats.dropped++; m_queueStats.dropped++;
DoCallback(callback_copy, SendStatus::Congestion); DoCallback(callback, SendStatus::Congestion);
} }
else else
{ {
m_queueStats.queued++; m_queueStats.queued++;
uint32_t queueSize = outboundQueue.size(); uint32_t queueSize = outboundQueue.size();
m_queueStats.queueWatermark = std::max(queueSize, m_queueStats.queueWatermark); m_queueStats.queueWatermark = std::max(queueSize, m_queueStats.queueWatermark);
} }
@ -272,7 +266,7 @@ namespace llarp
} }
else else
{ {
DoCallback(entry.message.second, SendStatus::Congestion); DoCallback(entry.inform, SendStatus::Congestion);
m_queueStats.dropped++; m_queueStats.dropped++;
} }
} }
@ -288,7 +282,7 @@ namespace llarp
while (not routing_mq.empty()) while (not routing_mq.empty())
{ {
const MessageQueueEntry& entry = routing_mq.top(); const MessageQueueEntry& entry = routing_mq.top();
Send(entry.router, entry.message); Send(entry);
routing_mq.pop(); routing_mq.pop();
} }
@ -331,7 +325,7 @@ namespace llarp
{ {
const MessageQueueEntry& entry = message_queue.top(); const MessageQueueEntry& entry = message_queue.top();
Send(entry.router, entry.message); Send(entry);
message_queue.pop(); message_queue.pop();
consecutive_empty = 0; consecutive_empty = 0;
@ -380,11 +374,11 @@ namespace llarp
if (status == SendStatus::Success) if (status == SendStatus::Success)
{ {
Send(entry.router, entry.message); Send(entry);
} }
else else
{ {
DoCallback(entry.message.second, status); DoCallback(entry.inform, status);
} }
movedMessages.pop(); movedMessages.pop();
} }

@ -74,15 +74,14 @@ namespace llarp
Init(AbstractRouter* router); Init(AbstractRouter* router);
private: private:
using Message = std::pair<std::vector<byte_t>, SendStatusHandler>;
/* A message that has been queued for sending, but not yet /* A message that has been queued for sending, but not yet
* processed into an individual path's message queue. * processed into an individual path's message queue.
*/ */
struct MessageQueueEntry struct MessageQueueEntry
{ {
uint16_t priority; uint16_t priority;
Message message; std::vector<byte_t> message;
SendStatusHandler inform;
PathID_t pathid; PathID_t pathid;
RouterID router; RouterID router;
@ -131,14 +130,14 @@ namespace llarp
* returns the result of the call to LinkManager::SendTo() * returns the result of the call to LinkManager::SendTo()
*/ */
bool bool
Send(const RouterID& remote, const Message& msg); Send(const MessageQueueEntry& ent);
/* Sends the message along to the link layer if we have a session to the remote /* Sends the message along to the link layer if we have a session to the remote
* *
* returns the result of the Send() call, or false if no session. * returns the result of the Send() call, or false if no session.
*/ */
bool bool
SendIfSession(const RouterID& remote, const Message& msg); SendIfSession(const MessageQueueEntry& ent);
/* queues a message to the shared outbound message queue. /* queues a message to the shared outbound message queue.
* *
@ -149,8 +148,7 @@ namespace llarp
* are placed in their paths' respective individual queues. * are placed in their paths' respective individual queues.
*/ */
bool bool
QueueOutboundMessage( QueueOutboundMessage(MessageQueueEntry entry);
const RouterID& remote, Message&& msg, const PathID_t& pathid, uint16_t priority = 0);
/* Processes messages on the shared message queue into their paths' respective /* Processes messages on the shared message queue into their paths' respective
* individual queues. * individual queues.

@ -116,7 +116,7 @@ namespace llarp
m_router->NotifyRouterEvent<tooling::RCGossipSentEvent>(m_router->pubkey(), rc); m_router->NotifyRouterEvent<tooling::RCGossipSentEvent>(m_router->pubkey(), rc);
// send message // send message
peerSession->SendMessageBuffer(std::move(msg), nullptr); peerSession->SendMessageBuffer(std::move(msg), nullptr, gossip.Priority());
}); });
return true; return true;
} }

Loading…
Cancel
Save