add link layer delivery feedback

pull/737/head
Jeff Becker 5 years ago
parent b29ec20ad4
commit 822f529be8
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -34,7 +34,8 @@ namespace llarp
GetSessionMaker() const = 0;
virtual bool
SendTo(const RouterID &remote, const llarp_buffer_t &buf) = 0;
SendTo(const RouterID &remote, const llarp_buffer_t &buf,
ILinkSession::CompletionHandler completed) = 0;
virtual bool
HasSessionTo(const RouterID &remote) const = 0;

@ -35,7 +35,8 @@ namespace llarp
}
bool
LinkManager::SendTo(const RouterID &remote, const llarp_buffer_t &buf)
LinkManager::SendTo(const RouterID &remote, const llarp_buffer_t &buf,
ILinkSession::CompletionHandler completed)
{
if(stopping)
return false;
@ -43,10 +44,14 @@ namespace llarp
auto link = GetLinkWithSessionTo(remote);
if(link == nullptr)
{
if(completed)
{
completed(ILinkSession::DeliveryStatus::eDeliveryDropped);
}
return false;
}
return link->SendTo(remote, buf);
return link->SendTo(remote, buf, completed);
}
bool

@ -27,7 +27,8 @@ namespace llarp
GetSessionMaker() const override;
bool
SendTo(const RouterID &remote, const llarp_buffer_t &buf) override;
SendTo(const RouterID &remote, const llarp_buffer_t &buf,
ILinkSession::CompletionHandler completed) override;
bool
HasSessionTo(const RouterID &remote) const override;

@ -345,7 +345,8 @@ namespace llarp
}
bool
ILinkLayer::SendTo(const RouterID& remote, const llarp_buffer_t& buf)
ILinkLayer::SendTo(const RouterID& remote, const llarp_buffer_t& buf,
ILinkSession::CompletionHandler completed)
{
ILinkSession* s = nullptr;
{
@ -366,7 +367,7 @@ namespace llarp
++itr;
}
}
return s && s->SendMessageBuffer(buf);
return s && s->SendMessageBuffer(buf, completed);
}
bool

@ -141,7 +141,8 @@ namespace llarp
KeepAliveSessionTo(const RouterID& remote);
bool
SendTo(const RouterID& remote, const llarp_buffer_t& buf);
SendTo(const RouterID& remote, const llarp_buffer_t& buf,
ILinkSession::CompletionHandler completed);
bool
GetOurAddressInfo(AddressInfo& addr) const;

@ -13,12 +13,20 @@ namespace llarp
struct LinkIntroMessage;
struct ILinkMessage;
struct ILinkLayer;
struct ILinkSession
{
virtual ~ILinkSession()
{
}
/// delivery status of a message
enum class DeliveryStatus
{
eDeliverySuccess = 0,
eDeliveryDropped = 1
};
/// hook for utp for when we have established a connection
virtual void
OnLinkEstablished(ILinkLayer *p) = 0;
@ -30,9 +38,12 @@ namespace llarp
/// called every timer tick
virtual void Tick(llarp_time_t) = 0;
/// message delivery result hook function
using CompletionHandler = std::function< void(DeliveryStatus) >;
/// send a message buffer to the remote endpoint
virtual bool
SendMessageBuffer(const llarp_buffer_t &) = 0;
SendMessageBuffer(const llarp_buffer_t &, CompletionHandler handler) = 0;
/// start the connection
virtual void

@ -168,14 +168,20 @@ namespace llarp
bool
OutboundMessageHandler::Send(const RouterID &remote, const Message &msg)
{
llarp_buffer_t buf(msg.first);
if(_linkManager->SendTo(remote, buf))
const llarp_buffer_t buf(msg.first);
auto callback = msg.second;
if(!_linkManager->SendTo(
remote, buf, [=](ILinkSession::DeliveryStatus status) {
if(status == ILinkSession::DeliveryStatus::eDeliverySuccess)
DoCallback(callback, SendStatus::Success);
else
DoCallback(callback, SendStatus::Congestion);
}))
{
DoCallback(msg.second, SendStatus::Success);
return true;
DoCallback(callback, SendStatus::Congestion);
return false;
}
DoCallback(msg.second, SendStatus::Congestion);
return false;
return true;
}
bool

@ -26,12 +26,15 @@ namespace llarp
return;
ssize_t expect = 0;
std::vector< utp_iovec > send;
for(const auto& vec : vecq)
for(const auto& msg : sendq)
{
if(vec.iov_len)
for(const auto& vec : msg.vecs)
{
expect += vec.iov_len;
send.emplace_back(vec);
if(vec.iov_len)
{
expect += vec.iov_len;
send.emplace_back(vec);
}
}
}
if(expect)
@ -46,18 +49,28 @@ namespace llarp
RouterID(remoteRC.pubkey).ToString());
m_TXRate += s;
size_t sz = s;
while(vecq.size() && sz >= vecq.front().iov_len)
{
sz -= vecq.front().iov_len;
vecq.pop_front();
sendq.pop_front();
}
if(vecq.size())
do
{
auto& front = vecq.front();
front.iov_len -= sz;
front.iov_base = ((byte_t*)front.iov_base) + sz;
}
auto& msg = sendq.front();
while(msg.vecs.size() && sz >= msg.vecs.front().iov_len)
{
sz -= msg.vecs.front().iov_len;
msg.vecs.pop_front();
msg.fragments.pop_front();
}
if(msg.vecs.size() == 0)
{
msg.Delivered();
sendq.pop_front();
}
else
{
auto& front = msg.vecs.front();
front.iov_len -= sz;
front.iov_base = ((byte_t*)front.iov_base) + sz;
return;
}
} while(sendq.size());
}
}
@ -257,14 +270,16 @@ namespace llarp
}
bool
Session::SendMessageBuffer(const llarp_buffer_t& buf)
Session::SendMessageBuffer(
const llarp_buffer_t& buf,
ILinkSession::CompletionHandler completionHandler)
{
if(sendq.size() >= MaxSendQueueSize)
if(SendQueueBacklog() >= MaxSendQueueSize)
{
// pump write queue if we seem to be full
PumpWrite();
}
if(sendq.size() >= MaxSendQueueSize)
if(SendQueueBacklog() >= MaxSendQueueSize)
{
// we didn't pump anything wtf
// this means we're stalled
@ -273,12 +288,14 @@ namespace llarp
size_t sz = buf.sz;
byte_t* ptr = buf.base;
uint32_t msgid = m_NextTXMsgID++;
sendq.emplace_back(msgid, completionHandler);
while(sz)
{
uint32_t s = std::min(FragmentBodyPayloadSize, sz);
if(!EncryptThenHash(ptr, msgid, s, sz - s))
{
LogError("EncryptThenHash failed?!");
Close();
return false;
}
LogDebug("encrypted ", s, " bytes");
@ -302,7 +319,7 @@ namespace llarp
return false;
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
return this->SendMessageBuffer(buf);
return this->SendMessageBuffer(buf, nullptr);
}
return true;
}
@ -340,7 +357,7 @@ namespace llarp
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
// send
if(!SendMessageBuffer(buf))
if(!SendMessageBuffer(buf, nullptr))
{
LogError("failed to send handshake to ", remoteAddr);
Close();
@ -370,10 +387,11 @@ namespace llarp
uint16_t remaining)
{
sendq.emplace_back();
auto& buf = sendq.back();
vecq.emplace_back();
auto& vec = vecq.back();
auto& msg = sendq.back();
msg.vecs.emplace_back();
auto& vec = msg.vecs.back();
msg.fragments.emplace_back();
auto& buf = msg.fragments.back();
vec.iov_base = buf.data();
vec.iov_len = FragmentBufferSize;
buf.Randomize();
@ -472,7 +490,7 @@ namespace llarp
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
// send message
if(!SendMessageBuffer(buf))
if(!SendMessageBuffer(buf, nullptr))
return false;
// regen our tx Key
return DoClientKeyExchange(txKey, lim.N, remoteRC.enckey,
@ -600,6 +618,13 @@ namespace llarp
metrics::integerTick("utp.session.close", "to", 1, "id",
RouterID(remoteRC.pubkey).ToString());
}
// discard sendq
// TODO: retry on another session ?
while(sendq.size())
{
sendq.front().Dropped();
sendq.pop_front();
}
}
}
EnterState(eClose);
@ -675,7 +700,7 @@ namespace llarp
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
// send
if(!SendMessageBuffer(buf))
if(!SendMessageBuffer(buf, nullptr))
{
LogError("failed to repl to handshake from ", remoteAddr);
Close();

@ -39,10 +39,45 @@ namespace llarp
/// session timeout (60s)
const static llarp_time_t sessionTimeout = DefaultLinkSessionLifetime;
/// send queue for utp
std::deque< utp_iovec > vecq;
/// tx fragment queue
std::deque< FragmentBuffer > sendq;
struct OutboundMessage
{
OutboundMessage(uint32_t id, CompletionHandler func)
: msgid{id}, completed{func}
{
}
const uint32_t msgid;
std::deque< utp_iovec > vecs;
std::deque< FragmentBuffer > fragments;
CompletionHandler completed;
void
Dropped()
{
if(completed)
{
completed(DeliveryStatus::eDeliveryDropped);
completed = nullptr;
}
}
void
Delivered()
{
if(completed)
{
completed(DeliveryStatus::eDeliverySuccess);
completed = nullptr;
}
}
bool
operator<(const OutboundMessage& other) const
{
return msgid < other.msgid;
}
};
/// current rx fragment buffer
FragmentBuffer recvBuf;
/// current offset in current rx fragment buffer
@ -54,6 +89,8 @@ namespace llarp
uint32_t m_NextTXMsgID;
/// the next message id for rx
uint32_t m_NextRXMsgID;
/// messages we are currently sending
std::deque< OutboundMessage > sendq;
/// messages we are recving right now
std::unordered_map< uint32_t, InboundMessage > m_RecvMsgs;
/// are we stalled or nah?
@ -137,7 +174,8 @@ namespace llarp
/// queue a fully formed message
bool
SendMessageBuffer(const llarp_buffer_t& buf) override;
SendMessageBuffer(const llarp_buffer_t& buf,
ILinkSession::CompletionHandler) override;
/// prune expired inbound messages
void

@ -216,7 +216,7 @@ TEST_F(LinkLayerTest, TestUTPAliceRenegWithBob)
return false;
otherBuf.sz = otherBuf.cur - otherBuf.base;
otherBuf.cur = otherBuf.base;
return s->SendMessageBuffer(otherBuf);
return s->SendMessageBuffer(otherBuf, nullptr);
};
Bob.link = utp::NewServer(
@ -327,7 +327,7 @@ TEST_F(LinkLayerTest, TestUTPAliceConnectToBob)
return;
otherBuf.sz = otherBuf.cur - otherBuf.base;
otherBuf.cur = otherBuf.base;
self->SendMessageBuffer(otherBuf);
self->SendMessageBuffer(otherBuf, nullptr);
}});
return true;
},

Loading…
Cancel
Save