Fix client latency bug; De-shared_ptr IHopHandler queues

- Replace m_FlushWakeup with a call to the router's god mode pump
  method.  m_FlushWakeup apparently isn't enough to get things out, and
  we can end up with incoming packets that don't get properly handled
  right away without it.

- The shared_ptr around the ihophandler queues isn't needed and is just
  adding a layer of obfuscation; instead just exchange the list directly
  into the lambda.

- Use std::exchange rather than swap

- A couple other small code cleanups.
pull/1795/head
Jason Rhinelander 3 years ago
parent 70553c7627
commit 0fe7153f6e

@ -9,10 +9,7 @@ namespace llarp
bool
IHopHandler::HandleUpstream(const llarp_buffer_t& X, const TunnelNonce& Y, AbstractRouter* r)
{
if (m_UpstreamQueue == nullptr)
m_UpstreamQueue = std::make_shared<TrafficQueue_t>();
m_UpstreamQueue->emplace_back();
auto& pkt = m_UpstreamQueue->back();
auto& pkt = m_UpstreamQueue.emplace_back();
pkt.first.resize(X.sz);
std::copy_n(X.base, X.sz, pkt.first.begin());
pkt.second = Y;
@ -24,10 +21,7 @@ namespace llarp
bool
IHopHandler::HandleDownstream(const llarp_buffer_t& X, const TunnelNonce& Y, AbstractRouter* r)
{
if (m_DownstreamQueue == nullptr)
m_DownstreamQueue = std::make_shared<TrafficQueue_t>();
m_DownstreamQueue->emplace_back();
auto& pkt = m_DownstreamQueue->back();
auto& pkt = m_DownstreamQueue.emplace_back();
pkt.first.resize(X.sz);
std::copy_n(X.base, X.sz, pkt.first.begin());
pkt.second = Y;

@ -26,7 +26,6 @@ namespace llarp
{
using TrafficEvent_t = std::pair<std::vector<byte_t>, TunnelNonce>;
using TrafficQueue_t = std::list<TrafficEvent_t>;
using TrafficQueue_ptr = std::shared_ptr<TrafficQueue_t>;
virtual ~IHopHandler() = default;
@ -74,16 +73,16 @@ namespace llarp
protected:
uint64_t m_SequenceNum = 0;
TrafficQueue_ptr m_UpstreamQueue;
TrafficQueue_ptr m_DownstreamQueue;
TrafficQueue_t m_UpstreamQueue;
TrafficQueue_t m_DownstreamQueue;
util::DecayingHashSet<TunnelNonce> m_UpstreamReplayFilter;
util::DecayingHashSet<TunnelNonce> m_DownstreamReplayFilter;
virtual void
UpstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) = 0;
UpstreamWork(TrafficQueue_t queue, AbstractRouter* r) = 0;
virtual void
DownstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) = 0;
DownstreamWork(TrafficQueue_t queue, AbstractRouter* r) = 0;
virtual void
HandleAllUpstream(std::vector<RelayUpstreamMessage> msgs, AbstractRouter* r) = 0;

@ -29,7 +29,7 @@ namespace llarp
std::weak_ptr<PathSet> pathset,
PathRole startingRoles,
std::string shortName)
: m_PathSet{pathset}, _role{startingRoles}, m_shortName{std::move(shortName)}
: m_PathSet{std::move(pathset)}, _role{startingRoles}, m_shortName{std::move(shortName)}
{
hops.resize(h.size());
@ -492,11 +492,11 @@ namespace llarp
}
void
Path::UpstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
Path::UpstreamWork(TrafficQueue_t msgs, AbstractRouter* r)
{
std::vector<RelayUpstreamMessage> sendmsgs(msgs->size());
std::vector<RelayUpstreamMessage> sendmsgs(msgs.size());
size_t idx = 0;
for (auto& ev : *msgs)
for (auto& ev : msgs)
{
const llarp_buffer_t buf(ev.first);
TunnelNonce n = ev.second;
@ -519,24 +519,22 @@ namespace llarp
void
Path::FlushUpstream(AbstractRouter* r)
{
if (m_UpstreamQueue && not m_UpstreamQueue->empty())
if (not m_UpstreamQueue.empty())
{
TrafficQueue_ptr data = nullptr;
std::swap(m_UpstreamQueue, data);
r->QueueWork(
[self = shared_from_this(), data, r]() { self->UpstreamWork(std::move(data), r); });
r->QueueWork([self = shared_from_this(),
data = std::exchange(m_UpstreamQueue, {}),
r]() mutable { self->UpstreamWork(std::move(data), r); });
}
}
void
Path::FlushDownstream(AbstractRouter* r)
{
if (m_DownstreamQueue && not m_DownstreamQueue->empty())
if (not m_DownstreamQueue.empty())
{
TrafficQueue_ptr data = nullptr;
std::swap(m_DownstreamQueue, data);
r->QueueWork(
[self = shared_from_this(), data, r]() { self->DownstreamWork(std::move(data), r); });
r->QueueWork([self = shared_from_this(),
data = std::exchange(m_DownstreamQueue, {}),
r]() mutable { self->DownstreamWork(std::move(data), r); });
}
}
@ -570,11 +568,11 @@ namespace llarp
}
void
Path::DownstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
Path::DownstreamWork(TrafficQueue_t msgs, AbstractRouter* r)
{
std::vector<RelayDownstreamMessage> sendMsgs(msgs->size());
std::vector<RelayDownstreamMessage> sendMsgs(msgs.size());
size_t idx = 0;
for (auto& ev : *msgs)
for (auto& ev : msgs)
{
const llarp_buffer_t buf(ev.first);
sendMsgs[idx].Y = ev.second;

@ -388,10 +388,10 @@ namespace llarp
protected:
void
UpstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override;
UpstreamWork(TrafficQueue_t queue, AbstractRouter* r) override;
void
DownstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override;
DownstreamWork(TrafficQueue_t queue, AbstractRouter* r) override;
void
HandleAllUpstream(std::vector<RelayUpstreamMessage> msgs, AbstractRouter* r) override;

@ -104,7 +104,7 @@ namespace llarp
}
void
TransitHop::DownstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
TransitHop::DownstreamWork(TrafficQueue_t msgs, AbstractRouter* r)
{
auto flushIt = [self = shared_from_this(), r]() {
std::vector<RelayDownstreamMessage> msgs;
@ -114,7 +114,7 @@ namespace llarp
}
self->HandleAllDownstream(std::move(msgs), r);
};
for (auto& ev : *msgs)
for (auto& ev : msgs)
{
RelayDownstreamMessage msg;
const llarp_buffer_t buf(ev.first);
@ -140,9 +140,9 @@ namespace llarp
}
void
TransitHop::UpstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
TransitHop::UpstreamWork(TrafficQueue_t msgs, AbstractRouter* r)
{
for (auto& ev : *msgs)
for (auto& ev : msgs)
{
const llarp_buffer_t buf(ev.first);
RelayUpstreamMessage msg;
@ -223,25 +223,23 @@ namespace llarp
void
TransitHop::FlushUpstream(AbstractRouter* r)
{
if (m_UpstreamQueue && not m_UpstreamQueue->empty())
if (not m_UpstreamQueue.empty())
{
r->QueueWork([self = shared_from_this(), data = std::move(m_UpstreamQueue), r]() mutable {
self->UpstreamWork(std::move(data), r);
});
r->QueueWork([self = shared_from_this(),
data = std::exchange(m_UpstreamQueue, {}),
r]() mutable { self->UpstreamWork(std::move(data), r); });
}
m_UpstreamQueue = nullptr;
}
void
TransitHop::FlushDownstream(AbstractRouter* r)
{
if (m_DownstreamQueue && not m_DownstreamQueue->empty())
if (not m_DownstreamQueue.empty())
{
r->QueueWork([self = shared_from_this(), data = std::move(m_DownstreamQueue), r]() mutable {
self->DownstreamWork(std::move(data), r);
});
r->QueueWork([self = shared_from_this(),
data = std::exchange(m_DownstreamQueue, {}),
r]() mutable { self->DownstreamWork(std::move(data), r); });
}
m_DownstreamQueue = nullptr;
}
/// this is where a DHT message is handled at the end of a path, that is,

@ -190,10 +190,10 @@ namespace llarp
protected:
void
UpstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override;
UpstreamWork(TrafficQueue_t queue, AbstractRouter* r) override;
void
DownstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override;
DownstreamWork(TrafficQueue_t queue, AbstractRouter* r) override;
void
HandleAllUpstream(std::vector<RelayUpstreamMessage> msgs, AbstractRouter* r) override;

@ -1079,14 +1079,11 @@ namespace llarp
void
Endpoint::FlushRecvData()
{
do
while (auto maybe = m_RecvQueue.tryPopFront())
{
auto maybe = m_RecvQueue.tryPopFront();
if (not maybe)
return;
auto ev = std::move(*maybe);
auto& ev = *maybe;
ProtocolMessage::ProcessAsync(ev.fromPath, ev.pathid, ev.msg);
} while (true);
}
}
void

@ -21,19 +21,20 @@ namespace llarp
, m_Endpoint(ep)
, createdAt(ep->Now())
, m_SendQueue(SendContextQueueSize)
{
m_FlushWakeup = ep->Loop()->make_waker([this] { FlushUpstream(); });
}
{}
bool
SendContext::Send(std::shared_ptr<ProtocolFrame> msg, path::Path_ptr path)
{
if (not path->IsReady())
return false;
m_FlushWakeup->Trigger();
return m_SendQueue.tryPushBack(std::make_pair(
std::make_shared<routing::PathTransferMessage>(*msg, remoteIntro.pathID), path))
== thread::QueueReturn::Success;
if (path->IsReady()
and m_SendQueue.tryPushBack(std::make_pair(
std::make_shared<routing::PathTransferMessage>(*msg, remoteIntro.pathID), path))
== thread::QueueReturn::Success)
{
m_Endpoint->Router()->TriggerPump();
return true;
}
return false;
}
void
@ -42,19 +43,17 @@ namespace llarp
auto r = m_Endpoint->Router();
std::unordered_set<path::Path_ptr, path::Path::Ptr_Hash> flushpaths;
auto rttRMS = 0ms;
while (auto maybe = m_SendQueue.tryPopFront())
{
while (auto maybe = m_SendQueue.tryPopFront())
auto& [msg, path] = *maybe;
msg->S = path->NextSeqNo();
if (path->SendRoutingMessage(*msg, r))
{
auto& [msg, path] = *maybe;
msg->S = path->NextSeqNo();
if (path->SendRoutingMessage(*msg, r))
{
lastGoodSend = r->Now();
flushpaths.emplace(path);
m_Endpoint->ConvoTagTX(msg->T.T);
const auto rtt = (path->intro.latency + remoteIntro.latency) * 2;
rttRMS += rtt * rtt.count();
}
lastGoodSend = r->Now();
flushpaths.emplace(path);
m_Endpoint->ConvoTagTX(msg->T.T);
const auto rtt = (path->intro.latency + remoteIntro.latency) * 2;
rttRMS += rtt * rtt.count();
}
}
// flush the select path's upstream

@ -56,8 +56,6 @@ namespace llarp
std::function<void(AuthResult)> authResultListener;
std::shared_ptr<EventLoopWakeup> m_FlushWakeup;
virtual bool
ShiftIntroduction(bool rebuild = true)
{

Loading…
Cancel
Save