From 9e305c5b30371c1f26626e6fc56647e11a1576de Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 28 Nov 2019 10:52:10 -0500 Subject: [PATCH] use lockless queues to gather results of transit traffic work --- llarp/path/transit_hop.cpp | 52 +++++++++++++++++++++++++++----------- llarp/path/transit_hop.hpp | 2 ++ 2 files changed, 39 insertions(+), 15 deletions(-) diff --git a/llarp/path/transit_hop.cpp b/llarp/path/transit_hop.cpp index 34ff3579f..fd1b370d9 100644 --- a/llarp/path/transit_hop.cpp +++ b/llarp/path/transit_hop.cpp @@ -32,7 +32,11 @@ namespace llarp return stream; } - TransitHop::TransitHop() = default; + TransitHop::TransitHop() : m_UpstreamGather(128), m_DownstreamGather(128) + { + m_UpstreamGather.enable(); + m_DownstreamGather.enable(); + } bool TransitHop::Expired(llarp_time_t now) const @@ -118,43 +122,61 @@ namespace llarp void TransitHop::DownstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r) { - std::vector< RelayDownstreamMessage > sendmsgs(msgs->size()); - size_t idx = 0; for(auto& ev : *msgs) { + RelayDownstreamMessage msg; const llarp_buffer_t buf(ev.first); - auto& msg = sendmsgs[idx]; msg.pathid = info.rxID; msg.Y = ev.second ^ nonceXOR; CryptoManager::instance()->xchacha20(buf, pathKey, ev.second); msg.X = buf; llarp::LogDebug("relay ", msg.X.size(), " bytes downstream from ", info.upstream, " to ", info.downstream); - ++idx; + if(m_DownstreamGather.empty() || m_DownstreamGather.full()) + { + LogicCall(r->logic(), [self = shared_from_this(), r]() { + std::vector< RelayDownstreamMessage > msgs; + do + { + auto maybe = self->m_DownstreamGather.tryPopFront(); + if(not maybe.has_value()) + break; + msgs.emplace_back(maybe.value()); + } while(true); + self->HandleAllDownstream(std::move(msgs), r); + }); + } + m_DownstreamGather.pushBack(msg); } - LogicCall(r->logic(), - std::bind(&TransitHop::HandleAllDownstream, shared_from_this(), - std::move(sendmsgs), r)); } void TransitHop::UpstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r) { - std::vector< RelayUpstreamMessage > sendmsgs(msgs->size()); - size_t idx = 0; for(auto& ev : *msgs) { const llarp_buffer_t buf(ev.first); - auto& msg = sendmsgs[idx]; + RelayUpstreamMessage msg; CryptoManager::instance()->xchacha20(buf, pathKey, ev.second); msg.pathid = info.txID; msg.Y = ev.second ^ nonceXOR; msg.X = buf; - ++idx; + if(m_UpstreamGather.empty() || m_UpstreamGather.full()) + { + LogicCall(r->logic(), [self = shared_from_this(), r]() { + std::vector< RelayUpstreamMessage > msgs; + do + { + auto maybe = self->m_UpstreamGather.tryPopFront(); + if(not maybe.has_value()) + break; + msgs.emplace_back(maybe.value()); + } while(true); + self->HandleAllUpstream(std::move(msgs), r); + }); + } + m_UpstreamGather.pushBack(msg); } - LogicCall(r->logic(), - std::bind(&TransitHop::HandleAllUpstream, shared_from_this(), - std::move(sendmsgs), r)); } void diff --git a/llarp/path/transit_hop.hpp b/llarp/path/transit_hop.hpp index 60300b541..3e9648526 100644 --- a/llarp/path/transit_hop.hpp +++ b/llarp/path/transit_hop.hpp @@ -233,6 +233,8 @@ namespace llarp std::set< std::shared_ptr< TransitHop >, ComparePtr< std::shared_ptr< TransitHop > > > m_FlushOthers; + thread::Queue< RelayUpstreamMessage > m_UpstreamGather; + thread::Queue< RelayDownstreamMessage > m_DownstreamGather; }; inline std::ostream&