use lockless queues to gather results of transit traffic work

pull/935/head
Jeff Becker 5 years ago committed by Jason Rhinelander
parent d591394ad2
commit 9e305c5b30

@ -32,7 +32,11 @@ namespace llarp
return stream;
}
TransitHop::TransitHop() = default;
TransitHop::TransitHop() : m_UpstreamGather(128), m_DownstreamGather(128)
{
m_UpstreamGather.enable();
m_DownstreamGather.enable();
}
bool
TransitHop::Expired(llarp_time_t now) const
@ -118,43 +122,61 @@ namespace llarp
void
TransitHop::DownstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
{
std::vector< RelayDownstreamMessage > sendmsgs(msgs->size());
size_t idx = 0;
for(auto& ev : *msgs)
{
RelayDownstreamMessage msg;
const llarp_buffer_t buf(ev.first);
auto& msg = sendmsgs[idx];
msg.pathid = info.rxID;
msg.Y = ev.second ^ nonceXOR;
CryptoManager::instance()->xchacha20(buf, pathKey, ev.second);
msg.X = buf;
llarp::LogDebug("relay ", msg.X.size(), " bytes downstream from ",
info.upstream, " to ", info.downstream);
++idx;
if(m_DownstreamGather.empty() || m_DownstreamGather.full())
{
LogicCall(r->logic(), [self = shared_from_this(), r]() {
std::vector< RelayDownstreamMessage > msgs;
do
{
auto maybe = self->m_DownstreamGather.tryPopFront();
if(not maybe.has_value())
break;
msgs.emplace_back(maybe.value());
} while(true);
self->HandleAllDownstream(std::move(msgs), r);
});
}
m_DownstreamGather.pushBack(msg);
}
LogicCall(r->logic(),
std::bind(&TransitHop::HandleAllDownstream, shared_from_this(),
std::move(sendmsgs), r));
}
void
TransitHop::UpstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
{
std::vector< RelayUpstreamMessage > sendmsgs(msgs->size());
size_t idx = 0;
for(auto& ev : *msgs)
{
const llarp_buffer_t buf(ev.first);
auto& msg = sendmsgs[idx];
RelayUpstreamMessage msg;
CryptoManager::instance()->xchacha20(buf, pathKey, ev.second);
msg.pathid = info.txID;
msg.Y = ev.second ^ nonceXOR;
msg.X = buf;
++idx;
if(m_UpstreamGather.empty() || m_UpstreamGather.full())
{
LogicCall(r->logic(), [self = shared_from_this(), r]() {
std::vector< RelayUpstreamMessage > msgs;
do
{
auto maybe = self->m_UpstreamGather.tryPopFront();
if(not maybe.has_value())
break;
msgs.emplace_back(maybe.value());
} while(true);
self->HandleAllUpstream(std::move(msgs), r);
});
}
m_UpstreamGather.pushBack(msg);
}
LogicCall(r->logic(),
std::bind(&TransitHop::HandleAllUpstream, shared_from_this(),
std::move(sendmsgs), r));
}
void

@ -233,6 +233,8 @@ namespace llarp
std::set< std::shared_ptr< TransitHop >,
ComparePtr< std::shared_ptr< TransitHop > > >
m_FlushOthers;
thread::Queue< RelayUpstreamMessage > m_UpstreamGather;
thread::Queue< RelayDownstreamMessage > m_DownstreamGather;
};
inline std::ostream&

Loading…
Cancel
Save