From 822f529be88e97359326c680a97cc60a1ebc50e6 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Fri, 26 Jul 2019 12:19:31 -0400 Subject: [PATCH] add link layer delivery feedback --- llarp/link/i_link_manager.hpp | 3 +- llarp/link/link_manager.cpp | 9 ++- llarp/link/link_manager.hpp | 3 +- llarp/link/server.cpp | 5 +- llarp/link/server.hpp | 3 +- llarp/link/session.hpp | 13 +++- llarp/router/outbound_message_handler.cpp | 18 ++++-- llarp/utp/session.cpp | 77 +++++++++++++++-------- llarp/utp/session.hpp | 48 ++++++++++++-- test/link/test_llarp_link.cpp | 4 +- 10 files changed, 136 insertions(+), 47 deletions(-) diff --git a/llarp/link/i_link_manager.hpp b/llarp/link/i_link_manager.hpp index 7015b8e67..7ba246980 100644 --- a/llarp/link/i_link_manager.hpp +++ b/llarp/link/i_link_manager.hpp @@ -34,7 +34,8 @@ namespace llarp GetSessionMaker() const = 0; virtual bool - SendTo(const RouterID &remote, const llarp_buffer_t &buf) = 0; + SendTo(const RouterID &remote, const llarp_buffer_t &buf, + ILinkSession::CompletionHandler completed) = 0; virtual bool HasSessionTo(const RouterID &remote) const = 0; diff --git a/llarp/link/link_manager.cpp b/llarp/link/link_manager.cpp index d62e6c2cf..8c6c5a42f 100644 --- a/llarp/link/link_manager.cpp +++ b/llarp/link/link_manager.cpp @@ -35,7 +35,8 @@ namespace llarp } bool - LinkManager::SendTo(const RouterID &remote, const llarp_buffer_t &buf) + LinkManager::SendTo(const RouterID &remote, const llarp_buffer_t &buf, + ILinkSession::CompletionHandler completed) { if(stopping) return false; @@ -43,10 +44,14 @@ namespace llarp auto link = GetLinkWithSessionTo(remote); if(link == nullptr) { + if(completed) + { + completed(ILinkSession::DeliveryStatus::eDeliveryDropped); + } return false; } - return link->SendTo(remote, buf); + return link->SendTo(remote, buf, completed); } bool diff --git a/llarp/link/link_manager.hpp b/llarp/link/link_manager.hpp index ef6aa5c30..4e9296781 100644 --- a/llarp/link/link_manager.hpp +++ b/llarp/link/link_manager.hpp @@ -27,7 +27,8 @@ namespace llarp GetSessionMaker() const override; bool - SendTo(const RouterID &remote, const llarp_buffer_t &buf) override; + SendTo(const RouterID &remote, const llarp_buffer_t &buf, + ILinkSession::CompletionHandler completed) override; bool HasSessionTo(const RouterID &remote) const override; diff --git a/llarp/link/server.cpp b/llarp/link/server.cpp index 739dfef3b..ec6da7c3d 100644 --- a/llarp/link/server.cpp +++ b/llarp/link/server.cpp @@ -345,7 +345,8 @@ namespace llarp } bool - ILinkLayer::SendTo(const RouterID& remote, const llarp_buffer_t& buf) + ILinkLayer::SendTo(const RouterID& remote, const llarp_buffer_t& buf, + ILinkSession::CompletionHandler completed) { ILinkSession* s = nullptr; { @@ -366,7 +367,7 @@ namespace llarp ++itr; } } - return s && s->SendMessageBuffer(buf); + return s && s->SendMessageBuffer(buf, completed); } bool diff --git a/llarp/link/server.hpp b/llarp/link/server.hpp index 170c9756e..74f3cf3cf 100644 --- a/llarp/link/server.hpp +++ b/llarp/link/server.hpp @@ -141,7 +141,8 @@ namespace llarp KeepAliveSessionTo(const RouterID& remote); bool - SendTo(const RouterID& remote, const llarp_buffer_t& buf); + SendTo(const RouterID& remote, const llarp_buffer_t& buf, + ILinkSession::CompletionHandler completed); bool GetOurAddressInfo(AddressInfo& addr) const; diff --git a/llarp/link/session.hpp b/llarp/link/session.hpp index c7cdb98d4..5b904f633 100644 --- a/llarp/link/session.hpp +++ b/llarp/link/session.hpp @@ -13,12 +13,20 @@ namespace llarp struct LinkIntroMessage; struct ILinkMessage; struct ILinkLayer; + struct ILinkSession { virtual ~ILinkSession() { } + /// delivery status of a message + enum class DeliveryStatus + { + eDeliverySuccess = 0, + eDeliveryDropped = 1 + }; + /// hook for utp for when we have established a connection virtual void OnLinkEstablished(ILinkLayer *p) = 0; @@ -30,9 +38,12 @@ namespace llarp /// called every timer tick virtual void Tick(llarp_time_t) = 0; + /// message delivery result hook function + using CompletionHandler = std::function< void(DeliveryStatus) >; + /// send a message buffer to the remote endpoint virtual bool - SendMessageBuffer(const llarp_buffer_t &) = 0; + SendMessageBuffer(const llarp_buffer_t &, CompletionHandler handler) = 0; /// start the connection virtual void diff --git a/llarp/router/outbound_message_handler.cpp b/llarp/router/outbound_message_handler.cpp index c33257d03..d5102a2ed 100644 --- a/llarp/router/outbound_message_handler.cpp +++ b/llarp/router/outbound_message_handler.cpp @@ -168,14 +168,20 @@ namespace llarp bool OutboundMessageHandler::Send(const RouterID &remote, const Message &msg) { - llarp_buffer_t buf(msg.first); - if(_linkManager->SendTo(remote, buf)) + const llarp_buffer_t buf(msg.first); + auto callback = msg.second; + if(!_linkManager->SendTo( + remote, buf, [=](ILinkSession::DeliveryStatus status) { + if(status == ILinkSession::DeliveryStatus::eDeliverySuccess) + DoCallback(callback, SendStatus::Success); + else + DoCallback(callback, SendStatus::Congestion); + })) { - DoCallback(msg.second, SendStatus::Success); - return true; + DoCallback(callback, SendStatus::Congestion); + return false; } - DoCallback(msg.second, SendStatus::Congestion); - return false; + return true; } bool diff --git a/llarp/utp/session.cpp b/llarp/utp/session.cpp index b1cfcfdf8..ea991c97a 100644 --- a/llarp/utp/session.cpp +++ b/llarp/utp/session.cpp @@ -26,12 +26,15 @@ namespace llarp return; ssize_t expect = 0; std::vector< utp_iovec > send; - for(const auto& vec : vecq) + for(const auto& msg : sendq) { - if(vec.iov_len) + for(const auto& vec : msg.vecs) { - expect += vec.iov_len; - send.emplace_back(vec); + if(vec.iov_len) + { + expect += vec.iov_len; + send.emplace_back(vec); + } } } if(expect) @@ -46,18 +49,28 @@ namespace llarp RouterID(remoteRC.pubkey).ToString()); m_TXRate += s; size_t sz = s; - while(vecq.size() && sz >= vecq.front().iov_len) - { - sz -= vecq.front().iov_len; - vecq.pop_front(); - sendq.pop_front(); - } - if(vecq.size()) + do { - auto& front = vecq.front(); - front.iov_len -= sz; - front.iov_base = ((byte_t*)front.iov_base) + sz; - } + auto& msg = sendq.front(); + while(msg.vecs.size() && sz >= msg.vecs.front().iov_len) + { + sz -= msg.vecs.front().iov_len; + msg.vecs.pop_front(); + msg.fragments.pop_front(); + } + if(msg.vecs.size() == 0) + { + msg.Delivered(); + sendq.pop_front(); + } + else + { + auto& front = msg.vecs.front(); + front.iov_len -= sz; + front.iov_base = ((byte_t*)front.iov_base) + sz; + return; + } + } while(sendq.size()); } } @@ -257,14 +270,16 @@ namespace llarp } bool - Session::SendMessageBuffer(const llarp_buffer_t& buf) + Session::SendMessageBuffer( + const llarp_buffer_t& buf, + ILinkSession::CompletionHandler completionHandler) { - if(sendq.size() >= MaxSendQueueSize) + if(SendQueueBacklog() >= MaxSendQueueSize) { // pump write queue if we seem to be full PumpWrite(); } - if(sendq.size() >= MaxSendQueueSize) + if(SendQueueBacklog() >= MaxSendQueueSize) { // we didn't pump anything wtf // this means we're stalled @@ -273,12 +288,14 @@ namespace llarp size_t sz = buf.sz; byte_t* ptr = buf.base; uint32_t msgid = m_NextTXMsgID++; + sendq.emplace_back(msgid, completionHandler); while(sz) { uint32_t s = std::min(FragmentBodyPayloadSize, sz); if(!EncryptThenHash(ptr, msgid, s, sz - s)) { LogError("EncryptThenHash failed?!"); + Close(); return false; } LogDebug("encrypted ", s, " bytes"); @@ -302,7 +319,7 @@ namespace llarp return false; buf.sz = buf.cur - buf.base; buf.cur = buf.base; - return this->SendMessageBuffer(buf); + return this->SendMessageBuffer(buf, nullptr); } return true; } @@ -340,7 +357,7 @@ namespace llarp buf.sz = buf.cur - buf.base; buf.cur = buf.base; // send - if(!SendMessageBuffer(buf)) + if(!SendMessageBuffer(buf, nullptr)) { LogError("failed to send handshake to ", remoteAddr); Close(); @@ -370,10 +387,11 @@ namespace llarp uint16_t remaining) { - sendq.emplace_back(); - auto& buf = sendq.back(); - vecq.emplace_back(); - auto& vec = vecq.back(); + auto& msg = sendq.back(); + msg.vecs.emplace_back(); + auto& vec = msg.vecs.back(); + msg.fragments.emplace_back(); + auto& buf = msg.fragments.back(); vec.iov_base = buf.data(); vec.iov_len = FragmentBufferSize; buf.Randomize(); @@ -472,7 +490,7 @@ namespace llarp buf.sz = buf.cur - buf.base; buf.cur = buf.base; // send message - if(!SendMessageBuffer(buf)) + if(!SendMessageBuffer(buf, nullptr)) return false; // regen our tx Key return DoClientKeyExchange(txKey, lim.N, remoteRC.enckey, @@ -600,6 +618,13 @@ namespace llarp metrics::integerTick("utp.session.close", "to", 1, "id", RouterID(remoteRC.pubkey).ToString()); } + // discard sendq + // TODO: retry on another session ? + while(sendq.size()) + { + sendq.front().Dropped(); + sendq.pop_front(); + } } } EnterState(eClose); @@ -675,7 +700,7 @@ namespace llarp buf.sz = buf.cur - buf.base; buf.cur = buf.base; // send - if(!SendMessageBuffer(buf)) + if(!SendMessageBuffer(buf, nullptr)) { LogError("failed to repl to handshake from ", remoteAddr); Close(); diff --git a/llarp/utp/session.hpp b/llarp/utp/session.hpp index b3e2ea4cb..e57f3de45 100644 --- a/llarp/utp/session.hpp +++ b/llarp/utp/session.hpp @@ -39,10 +39,45 @@ namespace llarp /// session timeout (60s) const static llarp_time_t sessionTimeout = DefaultLinkSessionLifetime; - /// send queue for utp - std::deque< utp_iovec > vecq; - /// tx fragment queue - std::deque< FragmentBuffer > sendq; + struct OutboundMessage + { + OutboundMessage(uint32_t id, CompletionHandler func) + : msgid{id}, completed{func} + { + } + + const uint32_t msgid; + std::deque< utp_iovec > vecs; + std::deque< FragmentBuffer > fragments; + CompletionHandler completed; + + void + Dropped() + { + if(completed) + { + completed(DeliveryStatus::eDeliveryDropped); + completed = nullptr; + } + } + + void + Delivered() + { + if(completed) + { + completed(DeliveryStatus::eDeliverySuccess); + completed = nullptr; + } + } + + bool + operator<(const OutboundMessage& other) const + { + return msgid < other.msgid; + } + }; + /// current rx fragment buffer FragmentBuffer recvBuf; /// current offset in current rx fragment buffer @@ -54,6 +89,8 @@ namespace llarp uint32_t m_NextTXMsgID; /// the next message id for rx uint32_t m_NextRXMsgID; + /// messages we are currently sending + std::deque< OutboundMessage > sendq; /// messages we are recving right now std::unordered_map< uint32_t, InboundMessage > m_RecvMsgs; /// are we stalled or nah? @@ -137,7 +174,8 @@ namespace llarp /// queue a fully formed message bool - SendMessageBuffer(const llarp_buffer_t& buf) override; + SendMessageBuffer(const llarp_buffer_t& buf, + ILinkSession::CompletionHandler) override; /// prune expired inbound messages void diff --git a/test/link/test_llarp_link.cpp b/test/link/test_llarp_link.cpp index 2e1cb854b..3c83b5050 100644 --- a/test/link/test_llarp_link.cpp +++ b/test/link/test_llarp_link.cpp @@ -216,7 +216,7 @@ TEST_F(LinkLayerTest, TestUTPAliceRenegWithBob) return false; otherBuf.sz = otherBuf.cur - otherBuf.base; otherBuf.cur = otherBuf.base; - return s->SendMessageBuffer(otherBuf); + return s->SendMessageBuffer(otherBuf, nullptr); }; Bob.link = utp::NewServer( @@ -327,7 +327,7 @@ TEST_F(LinkLayerTest, TestUTPAliceConnectToBob) return; otherBuf.sz = otherBuf.cur - otherBuf.base; otherBuf.cur = otherBuf.base; - self->SendMessageBuffer(otherBuf); + self->SendMessageBuffer(otherBuf, nullptr); }}); return true; },