Merge pull request #737 from majestrate/add-link-layer-delivery-feedback

Add link layer delivery feedback
pull/738/head
Jeff 5 years ago committed by GitHub
commit af74ee6f70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -6,4 +6,5 @@
constexpr size_t MAX_LINK_MSG_SIZE = 8192;
constexpr llarp_time_t DefaultLinkSessionLifetime = 60 * 1000;
constexpr size_t MaxSendQueueSize = 128;
#endif

@ -171,10 +171,20 @@ namespace llarp
}
else if(m_SNodeKeys.find(pubKey) == m_SNodeKeys.end())
{
// we do not have it mapped
// map it
ip = ObtainServiceNodeIP(r);
msg.AddINReply(ip, isV6);
// we do not have it mapped, async obtain it
ObtainSNodeSession(
r, [&](std::shared_ptr< exit::BaseSession > session) {
if(session && session->IsReady())
{
msg.AddINReply(m_KeyToIP[pubKey], isV6);
}
else
{
msg.AddNXReply();
}
reply(msg);
});
return true;
}
else
{
@ -196,6 +206,14 @@ namespace llarp
return true;
}
void
ExitEndpoint::ObtainSNodeSession(const RouterID &router,
exit::SessionReadyFunc obtainCb)
{
ObtainServiceNodeIP(router);
m_SNodeSessions[router]->AddReadyHook(obtainCb);
}
llarp_time_t
ExitEndpoint::Now() const
{

@ -120,6 +120,11 @@ namespace llarp
huint128_t
ObtainServiceNodeIP(const RouterID& router);
/// async obtain snode session and call callback when it's ready to send
void
ObtainSNodeSession(const RouterID& router,
exit::SessionReadyFunc obtainCb);
bool
QueueSNodePacket(const llarp_buffer_t& buf, huint128_t from);

@ -34,7 +34,8 @@ namespace llarp
GetSessionMaker() const = 0;
virtual bool
SendTo(const RouterID &remote, const llarp_buffer_t &buf) = 0;
SendTo(const RouterID &remote, const llarp_buffer_t &buf,
ILinkSession::CompletionHandler completed) = 0;
virtual bool
HasSessionTo(const RouterID &remote) const = 0;

@ -35,7 +35,8 @@ namespace llarp
}
bool
LinkManager::SendTo(const RouterID &remote, const llarp_buffer_t &buf)
LinkManager::SendTo(const RouterID &remote, const llarp_buffer_t &buf,
ILinkSession::CompletionHandler completed)
{
if(stopping)
return false;
@ -43,10 +44,14 @@ namespace llarp
auto link = GetLinkWithSessionTo(remote);
if(link == nullptr)
{
if(completed)
{
completed(ILinkSession::DeliveryStatus::eDeliveryDropped);
}
return false;
}
return link->SendTo(remote, buf);
return link->SendTo(remote, buf, completed);
}
bool

@ -27,7 +27,8 @@ namespace llarp
GetSessionMaker() const override;
bool
SendTo(const RouterID &remote, const llarp_buffer_t &buf) override;
SendTo(const RouterID &remote, const llarp_buffer_t &buf,
ILinkSession::CompletionHandler completed) override;
bool
HasSessionTo(const RouterID &remote) const override;

@ -345,7 +345,8 @@ namespace llarp
}
bool
ILinkLayer::SendTo(const RouterID& remote, const llarp_buffer_t& buf)
ILinkLayer::SendTo(const RouterID& remote, const llarp_buffer_t& buf,
ILinkSession::CompletionHandler completed)
{
ILinkSession* s = nullptr;
{
@ -366,7 +367,7 @@ namespace llarp
++itr;
}
}
return s && s->SendMessageBuffer(buf);
return s && s->SendMessageBuffer(buf, completed);
}
bool

@ -141,7 +141,8 @@ namespace llarp
KeepAliveSessionTo(const RouterID& remote);
bool
SendTo(const RouterID& remote, const llarp_buffer_t& buf);
SendTo(const RouterID& remote, const llarp_buffer_t& buf,
ILinkSession::CompletionHandler completed);
bool
GetOurAddressInfo(AddressInfo& addr) const;

@ -13,12 +13,20 @@ namespace llarp
struct LinkIntroMessage;
struct ILinkMessage;
struct ILinkLayer;
struct ILinkSession
{
virtual ~ILinkSession()
{
}
/// delivery status of a message
enum class DeliveryStatus
{
eDeliverySuccess = 0,
eDeliveryDropped = 1
};
/// hook for utp for when we have established a connection
virtual void
OnLinkEstablished(ILinkLayer *p) = 0;
@ -30,9 +38,12 @@ namespace llarp
/// called every timer tick
virtual void Tick(llarp_time_t) = 0;
/// message delivery result hook function
using CompletionHandler = std::function< void(DeliveryStatus) >;
/// send a message buffer to the remote endpoint
virtual bool
SendMessageBuffer(const llarp_buffer_t &) = 0;
SendMessageBuffer(const llarp_buffer_t &, CompletionHandler handler) = 0;
/// start the connection
virtual void

@ -168,14 +168,20 @@ namespace llarp
bool
OutboundMessageHandler::Send(const RouterID &remote, const Message &msg)
{
llarp_buffer_t buf(msg.first);
if(_linkManager->SendTo(remote, buf))
const llarp_buffer_t buf(msg.first);
auto callback = msg.second;
if(!_linkManager->SendTo(
remote, buf, [=](ILinkSession::DeliveryStatus status) {
if(status == ILinkSession::DeliveryStatus::eDeliverySuccess)
DoCallback(callback, SendStatus::Success);
else
DoCallback(callback, SendStatus::Congestion);
}))
{
DoCallback(msg.second, SendStatus::Success);
return true;
DoCallback(callback, SendStatus::Congestion);
return false;
}
DoCallback(msg.second, SendStatus::Congestion);
return false;
return true;
}
bool

@ -16,7 +16,7 @@ struct llarp_buffer_t;
namespace llarp
{
struct ILinkManager;
struct Logic;
class Logic;
enum class SessionResult;
struct OutboundMessageHandler final : public IOutboundMessageHandler

@ -35,12 +35,10 @@ namespace llarp
OnConnectTimeout(ILinkSession *session) override;
void
CreateSessionTo(const RouterID &router,
RouterCallback on_result) /* override */;
CreateSessionTo(const RouterID &router, RouterCallback on_result) override;
void
CreateSessionTo(const RouterContact &rc,
RouterCallback on_result) /* override */;
CreateSessionTo(const RouterContact &rc, RouterCallback on_result) override;
bool
HavePendingSessionTo(const RouterID &router) const override;

@ -139,8 +139,8 @@ namespace llarp
// update nodedb if required
if(rc.IsPublicRouter())
{
LogInfo("Adding or updating RC for ", RouterID(rc.pubkey),
" to nodedb and dht.");
LogDebug("Adding or updating RC for ", RouterID(rc.pubkey),
" to nodedb and dht.");
_nodedb->UpdateAsyncIfNewer(rc);
_dht->impl->PutRCNodeAsync(rc);
}
@ -294,6 +294,7 @@ namespace llarp
bool
RCLookupHandler::HavePendingLookup(RouterID remote) const
{
util::Lock l(&_mutex);
return pendingCallbacks.find(remote) != pendingCallbacks.end();
}

@ -481,7 +481,7 @@ namespace llarp
// create inbound links, if we are a service node
for(const auto &serverConfig : conf->iwp_links.servers())
{
auto server = llarp::utp::NewServer(
auto server = llarp::utp::NewInboundLink(
encryption(), util::memFn(&AbstractRouter::rc, this),
util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, this),
util::memFn(&IOutboundSessionMaker::OnSessionEstablished,
@ -1064,7 +1064,7 @@ namespace llarp
SessionEstablishedHandler, SessionRenegotiateHandler, SignBufferFunc,
TimeoutHandler, SessionClosedHandler) >;
static std::list< LinkFactory > linkFactories = {utp::NewServer,
static std::list< LinkFactory > linkFactories = {utp::NewOutboundLink,
iwp::NewServer};
bool addedAtLeastOne = false;
@ -1089,9 +1089,9 @@ namespace llarp
continue;
}
auto afs = {AF_INET, AF_INET6};
const auto afs = {AF_INET, AF_INET6};
for(auto af : afs)
for(const auto af : afs)
{
if(!link->Configure(netloop(), "*", af, m_OutboundPort))
continue;

@ -58,28 +58,20 @@ namespace llarp
bool
Logic::queue_func(std::function< void(void) > f)
{
size_t left = 10;
while(!this->thread->QueueFunc(f))
if(!this->thread->QueueFunc(f))
{
// our queue is full
if(this->can_flush())
{
// we can flush the queue here so let's do it
this->tick(llarp::time_now_ms());
}
else
{
// wait a bit and retry queuing because we are not in the same thread as
// we are calling the jobs in
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
left--;
if(left == 0) // too many retries
return false;
// try calling it later if the job queue overflows
this->call_later(1, f);
}
return true;
}
void
Logic::call_later(llarp_time_t timeout, std::function< void(void) > func)
{
llarp_timer_call_func_later(this->timer, timeout, func);
}
uint32_t
Logic::call_later(const llarp_timeout_job& job)
{

@ -47,6 +47,9 @@ namespace llarp
uint32_t
call_later(const llarp_timeout_job& job);
void
call_later(llarp_time_t later, std::function< void(void) > func);
void
cancel_call(uint32_t id);

@ -18,6 +18,7 @@ namespace llarp
uint64_t started;
uint64_t timeout;
llarp_timer_handler_func func;
std::function< void(void) > deferredFunc;
bool done;
bool canceled;
@ -117,12 +118,24 @@ struct llarp_timer_context
{
llarp::util::Lock lock(&timersMutex);
uint32_t id = ++currentId;
const uint32_t id = ++currentId;
timers.emplace(
id, std::make_unique< llarp::timer >(m_Now, timeout_ms, user, func));
return id;
}
uint32_t
call_func_later(std::function< void(void) > func, llarp_time_t timeout)
{
llarp::util::Lock lock(&timersMutex);
const uint32_t id = ++currentId;
timers.emplace(
id, std::make_unique< llarp::timer >(m_Now, timeout, nullptr, nullptr));
timers[id]->deferredFunc = func;
return id;
}
void
cancel_all() LOCKS_EXCLUDED(timersMutex)
{
@ -157,6 +170,13 @@ llarp_timer_call_later(struct llarp_timer_context* t,
return t->call_later(job.user, job.handler, job.timeout);
}
uint32_t
llarp_timer_call_func_later(struct llarp_timer_context* t, llarp_time_t timeout,
std::function< void(void) > func)
{
return t->call_func_later(func, timeout);
}
void
llarp_free_timer(struct llarp_timer_context** t)
{
@ -285,6 +305,8 @@ namespace llarp
else
call(user, timeout, diff);
}
if(deferredFunc)
deferredFunc();
done = true;
}
} // namespace llarp

@ -27,6 +27,10 @@ uint32_t
llarp_timer_call_later(struct llarp_timer_context *t,
struct llarp_timeout_job job);
uint32_t
llarp_timer_call_func_later(llarp_timer_context *t, llarp_time_t timeout,
std::function< void(void) > func);
void
llarp_timer_cancel_job(struct llarp_timer_context *t, uint32_t id);

@ -38,9 +38,6 @@ namespace llarp
/// buffer for a single utp fragment
using FragmentBuffer = AlignedBuffer< FragmentBufferSize >;
/// maximum size for send queue for a session before we drop
constexpr size_t MaxSendQueueSize = 64;
/// buffer for a link layer message
using MessageBuffer = AlignedBuffer< MAX_LINK_MSG_SIZE >;

@ -56,10 +56,7 @@ namespace llarp
if(session && link)
{
if(arg->error_code == UTP_ETIMEDOUT)
{
link->HandleTimeout(session);
}
link->HandleTimeout(session);
session->Close();
}
return 0;
@ -76,14 +73,16 @@ namespace llarp
LinkMessageHandler h, SignBufferFunc sign,
SessionEstablishedHandler established,
SessionRenegotiateHandler reneg,
TimeoutHandler timeout, SessionClosedHandler closed)
TimeoutHandler timeout, SessionClosedHandler closed,
bool permitInbound)
: ILinkLayer(routerEncSecret, getrc, h, sign, established, reneg,
timeout, closed)
{
_utp_ctx = utp_init(2);
utp_context_set_userdata(_utp_ctx, this);
utp_set_callback(_utp_ctx, UTP_SENDTO, &LinkLayer::SendTo);
utp_set_callback(_utp_ctx, UTP_ON_ACCEPT, &LinkLayer::OnAccept);
if(permitInbound)
utp_set_callback(_utp_ctx, UTP_ON_ACCEPT, &LinkLayer::OnAccept);
utp_set_callback(_utp_ctx, UTP_ON_CONNECT, &LinkLayer::OnConnect);
utp_set_callback(_utp_ctx, UTP_ON_STATE_CHANGE,
&LinkLayer::OnStateChange);
@ -93,8 +92,12 @@ namespace llarp
utp_context_set_option(_utp_ctx, UTP_LOG_NORMAL, 1);
utp_context_set_option(_utp_ctx, UTP_LOG_MTU, 1);
utp_context_set_option(_utp_ctx, UTP_LOG_DEBUG, 1);
utp_context_set_option(_utp_ctx, UTP_SNDBUF, MAX_LINK_MSG_SIZE * 64);
utp_context_set_option(_utp_ctx, UTP_RCVBUF, MAX_LINK_MSG_SIZE * 64);
utp_context_set_option(
_utp_ctx, UTP_SNDBUF,
(MAX_LINK_MSG_SIZE * MaxSendQueueSize * size_t{3}) / size_t{2});
utp_context_set_option(
_utp_ctx, UTP_RCVBUF,
(MAX_LINK_MSG_SIZE * MaxSendQueueSize * size_t{3}) / size_t{2});
}
LinkLayer::~LinkLayer()

@ -50,7 +50,7 @@ namespace llarp
LinkMessageHandler h, SignBufferFunc sign,
SessionEstablishedHandler established,
SessionRenegotiateHandler reneg, TimeoutHandler timeout,
SessionClosedHandler closed);
SessionClosedHandler closed, bool acceptInbound);
/// destruct
~LinkLayer();

@ -26,38 +26,51 @@ namespace llarp
return;
ssize_t expect = 0;
std::vector< utp_iovec > send;
for(const auto& vec : vecq)
for(const auto& msg : sendq)
{
if(vec.iov_len)
for(const auto& vec : msg.vecs)
{
expect += vec.iov_len;
send.emplace_back(vec);
if(vec.iov_len)
{
expect += vec.iov_len;
send.emplace_back(vec);
}
}
}
if(expect)
{
ssize_t s = utp_writev(sock, send.data(), send.size());
if(s < 0)
if(s <= 0)
return;
if(s > 0)
lastSend = parent->Now();
lastSend = parent->Now();
metrics::integerTick("utp.session.tx", "writes", s, "id",
RouterID(remoteRC.pubkey).ToString());
m_TXRate += s;
size_t sz = s;
while(vecq.size() && sz >= vecq.front().iov_len)
{
sz -= vecq.front().iov_len;
vecq.pop_front();
sendq.pop_front();
}
if(vecq.size())
do
{
auto& front = vecq.front();
front.iov_len -= sz;
front.iov_base = ((byte_t*)front.iov_base) + sz;
}
auto& msg = sendq.front();
while(msg.vecs.size() && sz >= msg.vecs.front().iov_len)
{
sz -= msg.vecs.front().iov_len;
msg.vecs.pop_front();
msg.fragments.pop_front();
}
if(msg.vecs.size() == 0)
{
msg.Delivered();
sendq.pop_front();
}
else
{
auto& front = msg.vecs.front();
front.iov_len -= sz;
front.iov_base = ((byte_t*)front.iov_base) + sz;
return;
}
} while(sendq.size());
}
}
@ -257,14 +270,16 @@ namespace llarp
}
bool
Session::SendMessageBuffer(const llarp_buffer_t& buf)
Session::SendMessageBuffer(
const llarp_buffer_t& buf,
ILinkSession::CompletionHandler completionHandler)
{
if(sendq.size() >= MaxSendQueueSize)
if(SendQueueBacklog() >= MaxSendQueueSize)
{
// pump write queue if we seem to be full
PumpWrite();
}
if(sendq.size() >= MaxSendQueueSize)
if(SendQueueBacklog() >= MaxSendQueueSize)
{
// we didn't pump anything wtf
// this means we're stalled
@ -273,12 +288,14 @@ namespace llarp
size_t sz = buf.sz;
byte_t* ptr = buf.base;
uint32_t msgid = m_NextTXMsgID++;
sendq.emplace_back(msgid, completionHandler);
while(sz)
{
uint32_t s = std::min(FragmentBodyPayloadSize, sz);
if(!EncryptThenHash(ptr, msgid, s, sz - s))
{
LogError("EncryptThenHash failed?!");
Close();
return false;
}
LogDebug("encrypted ", s, " bytes");
@ -302,7 +319,7 @@ namespace llarp
return false;
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
return this->SendMessageBuffer(buf);
return this->SendMessageBuffer(buf, nullptr);
}
return true;
}
@ -340,7 +357,7 @@ namespace llarp
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
// send
if(!SendMessageBuffer(buf))
if(!SendMessageBuffer(buf, nullptr))
{
LogError("failed to send handshake to ", remoteAddr);
Close();
@ -370,10 +387,11 @@ namespace llarp
uint16_t remaining)
{
sendq.emplace_back();
auto& buf = sendq.back();
vecq.emplace_back();
auto& vec = vecq.back();
auto& msg = sendq.back();
msg.vecs.emplace_back();
auto& vec = msg.vecs.back();
msg.fragments.emplace_back();
auto& buf = msg.fragments.back();
vec.iov_base = buf.data();
vec.iov_len = FragmentBufferSize;
buf.Randomize();
@ -472,7 +490,7 @@ namespace llarp
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
// send message
if(!SendMessageBuffer(buf))
if(!SendMessageBuffer(buf, nullptr))
return false;
// regen our tx Key
return DoClientKeyExchange(txKey, lim.N, remoteRC.enckey,
@ -600,6 +618,13 @@ namespace llarp
metrics::integerTick("utp.session.close", "to", 1, "id",
RouterID(remoteRC.pubkey).ToString());
}
// discard sendq
// TODO: retry on another session ?
while(sendq.size())
{
sendq.front().Dropped();
sendq.pop_front();
}
}
}
EnterState(eClose);
@ -675,7 +700,7 @@ namespace llarp
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
// send
if(!SendMessageBuffer(buf))
if(!SendMessageBuffer(buf, nullptr))
{
LogError("failed to repl to handshake from ", remoteAddr);
Close();

@ -39,10 +39,45 @@ namespace llarp
/// session timeout (60s)
const static llarp_time_t sessionTimeout = DefaultLinkSessionLifetime;
/// send queue for utp
std::deque< utp_iovec > vecq;
/// tx fragment queue
std::deque< FragmentBuffer > sendq;
struct OutboundMessage
{
OutboundMessage(uint32_t id, CompletionHandler func)
: msgid{id}, completed{func}
{
}
const uint32_t msgid;
std::deque< utp_iovec > vecs;
std::deque< FragmentBuffer > fragments;
CompletionHandler completed;
void
Dropped()
{
if(completed)
{
completed(DeliveryStatus::eDeliveryDropped);
completed = nullptr;
}
}
void
Delivered()
{
if(completed)
{
completed(DeliveryStatus::eDeliverySuccess);
completed = nullptr;
}
}
bool
operator<(const OutboundMessage& other) const
{
return msgid < other.msgid;
}
};
/// current rx fragment buffer
FragmentBuffer recvBuf;
/// current offset in current rx fragment buffer
@ -54,6 +89,10 @@ namespace llarp
uint32_t m_NextTXMsgID;
/// the next message id for rx
uint32_t m_NextRXMsgID;
using SendQueue_t = std::deque< OutboundMessage >;
/// messages we are currently sending
SendQueue_t sendq;
/// messages we are recving right now
std::unordered_map< uint32_t, InboundMessage > m_RecvMsgs;
/// are we stalled or nah?
@ -137,7 +176,8 @@ namespace llarp
/// queue a fully formed message
bool
SendMessageBuffer(const llarp_buffer_t& buf) override;
SendMessageBuffer(const llarp_buffer_t& buf,
ILinkSession::CompletionHandler) override;
/// prune expired inbound messages
void

@ -9,13 +9,23 @@ namespace llarp
namespace utp
{
LinkLayer_ptr
NewServer(const SecretKey& routerEncSecret, GetRCFunc getrc,
LinkMessageHandler h, SessionEstablishedHandler est,
SessionRenegotiateHandler reneg, SignBufferFunc sign,
TimeoutHandler timeout, SessionClosedHandler closed)
NewOutboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc,
LinkMessageHandler h, SessionEstablishedHandler est,
SessionRenegotiateHandler reneg, SignBufferFunc sign,
TimeoutHandler timeout, SessionClosedHandler closed)
{
return std::make_shared< LinkLayer >(routerEncSecret, getrc, h, sign, est,
reneg, timeout, closed);
reneg, timeout, closed, false);
}
LinkLayer_ptr
NewInboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc,
LinkMessageHandler h, SessionEstablishedHandler est,
SessionRenegotiateHandler reneg, SignBufferFunc sign,
TimeoutHandler timeout, SessionClosedHandler closed)
{
return std::make_shared< LinkLayer >(routerEncSecret, getrc, h, sign, est,
reneg, timeout, closed, true);
}
} // namespace utp

@ -11,10 +11,17 @@ namespace llarp
namespace utp
{
LinkLayer_ptr
NewServer(const SecretKey& routerEncSecret, GetRCFunc getrc,
LinkMessageHandler h, SessionEstablishedHandler est,
SessionRenegotiateHandler reneg, SignBufferFunc sign,
TimeoutHandler timeout, SessionClosedHandler closed);
NewInboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc,
LinkMessageHandler h, SessionEstablishedHandler est,
SessionRenegotiateHandler reneg, SignBufferFunc sign,
TimeoutHandler timeout, SessionClosedHandler closed);
LinkLayer_ptr
NewOutboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc,
LinkMessageHandler h, SessionEstablishedHandler est,
SessionRenegotiateHandler reneg, SignBufferFunc sign,
TimeoutHandler timeout, SessionClosedHandler closed);
/// shim
const auto NewServer = NewInboundLink;
} // namespace utp
} // namespace llarp

@ -216,7 +216,7 @@ TEST_F(LinkLayerTest, TestUTPAliceRenegWithBob)
return false;
otherBuf.sz = otherBuf.cur - otherBuf.base;
otherBuf.cur = otherBuf.base;
return s->SendMessageBuffer(otherBuf);
return s->SendMessageBuffer(otherBuf, nullptr);
};
Bob.link = utp::NewServer(
@ -327,7 +327,7 @@ TEST_F(LinkLayerTest, TestUTPAliceConnectToBob)
return;
otherBuf.sz = otherBuf.cur - otherBuf.base;
otherBuf.cur = otherBuf.base;
self->SendMessageBuffer(otherBuf);
self->SendMessageBuffer(otherBuf, nullptr);
}});
return true;
},

Loading…
Cancel
Save