|
|
|
@ -142,14 +142,6 @@ namespace llarp
|
|
|
|
|
void
|
|
|
|
|
TransitHop::UpstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
|
|
|
|
|
{
|
|
|
|
|
auto flushIt = [self = shared_from_this(), r]() {
|
|
|
|
|
std::vector<RelayUpstreamMessage> msgs;
|
|
|
|
|
while (auto maybe = self->m_UpstreamGather.tryPopFront())
|
|
|
|
|
{
|
|
|
|
|
msgs.push_back(*maybe);
|
|
|
|
|
}
|
|
|
|
|
self->HandleAllUpstream(std::move(msgs), r);
|
|
|
|
|
};
|
|
|
|
|
for (auto& ev : *msgs)
|
|
|
|
|
{
|
|
|
|
|
const llarp_buffer_t buf(ev.first);
|
|
|
|
@ -158,14 +150,19 @@ namespace llarp
|
|
|
|
|
msg.pathid = info.txID;
|
|
|
|
|
msg.Y = ev.second ^ nonceXOR;
|
|
|
|
|
msg.X = buf;
|
|
|
|
|
if (m_UpstreamGather.full())
|
|
|
|
|
if (m_UpstreamGather.tryPushBack(msg) != thread::QueueReturn::Success)
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Flush it:
|
|
|
|
|
r->loop()->call([self = shared_from_this(), r] {
|
|
|
|
|
std::vector<RelayUpstreamMessage> msgs;
|
|
|
|
|
while (auto maybe = self->m_UpstreamGather.tryPopFront())
|
|
|
|
|
{
|
|
|
|
|
r->loop()->call(flushIt);
|
|
|
|
|
msgs.push_back(*maybe);
|
|
|
|
|
}
|
|
|
|
|
if (m_UpstreamGather.enabled())
|
|
|
|
|
m_UpstreamGather.pushBack(msg);
|
|
|
|
|
}
|
|
|
|
|
r->loop()->call(flushIt);
|
|
|
|
|
self->HandleAllUpstream(std::move(msgs), r);
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void
|
|
|
|
|