From ba0fd223d96b460b1218c7fc45502caa22c3d127 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Mon, 25 Nov 2019 08:18:24 -0500 Subject: [PATCH] reduce number of jobs we put onto the logic thread --- Makefile | 2 +- llarp/handlers/tun.cpp | 38 +++++++++++++++++++++----------------- llarp/handlers/tun.hpp | 3 +++ llarp/service/endpoint.cpp | 29 ++++++++++++++++++++--------- llarp/service/protocol.cpp | 7 ++----- 5 files changed, 47 insertions(+), 32 deletions(-) diff --git a/Makefile b/Makefile index e2ab5cd34..a37be1e0b 100644 --- a/Makefile +++ b/Makefile @@ -16,7 +16,7 @@ BUILD_TYPE ?= Debug PYTHON ?= python PYTHON3 ?= python3 -FORMAT ?= clang-format +FORMAT ?= clang-format-8 SETCAP ?= which setcap && setcap cap_net_admin,cap_net_bind_service=+eip diff --git a/llarp/handlers/tun.cpp b/llarp/handlers/tun.cpp index 40e9e842f..97fad6a0d 100644 --- a/llarp/handlers/tun.cpp +++ b/llarp/handlers/tun.cpp @@ -978,17 +978,25 @@ namespace llarp TunEndpoint::tunifBeforeWrite(llarp_tun_io *tun) { // called in the isolated network thread - auto *self = static_cast< TunEndpoint * >(tun->user); - auto sendpkt = [self, tun](net::IPPacket &pkt) -> bool { - if(!llarp_ev_tun_async_write(tun, pkt.Buffer())) + auto *self = static_cast< TunEndpoint * >(tun->user); + auto _pkts = std::move(self->m_TunPkts); + self->m_TunPkts = std::vector< net::IPPacket >(); + + LogicCall(self->EndpointLogic(), [tun, self, pkts = std::move(_pkts)]() { + for(auto &pkt : pkts) { - llarp::LogWarn(self->Name(), " packet dropped"); - return true; + self->m_UserToNetworkPktQueue.Emplace(pkt); } - return false; - }; - LogicCall(self->EndpointLogic(), - std::bind(&TunEndpoint::FlushToUser, self, sendpkt)); + self->Flush(); + self->FlushToUser([self, tun](net::IPPacket &pkt) -> bool { + if(!llarp_ev_tun_async_write(tun, pkt.Buffer())) + { + llarp::LogWarn(self->Name(), " packet dropped"); + return true; + } + return false; + }); + }); } void @@ -996,14 +1004,10 @@ namespace llarp { // called for every packet read from user in isolated network thread auto *self = static_cast< TunEndpoint * >(tun->user); - std::vector< byte_t > pkt; - pkt.resize(b.sz); - std::copy_n(b.base, b.sz, pkt.data()); - LogicCall(self->RouterLogic(), [self, buffer = std::move(pkt)]() { - const llarp_buffer_t pbuf(buffer); - self->m_UserToNetworkPktQueue.EmplaceIf( - [&pbuf](net::IPPacket &p) -> bool { return p.Load(pbuf); }); - }); + net::IPPacket pkt; + if(not pkt.Load(b)) + return; + self->m_TunPkts.emplace_back(pkt); } TunEndpoint::~TunEndpoint() = default; diff --git a/llarp/handlers/tun.hpp b/llarp/handlers/tun.hpp index 2889e7a5d..1cf4d0c86 100644 --- a/llarp/handlers/tun.hpp +++ b/llarp/handlers/tun.hpp @@ -185,6 +185,9 @@ namespace llarp using PacketQueue_t = llarp::util::CoDelQueue< net::IPPacket, net::IPPacket::GetTime, net::IPPacket::PutTime, net::IPPacket::CompareOrder, net::IPPacket::GetNow >; + + /// queue packet for send on net thread from user + std::vector< net::IPPacket > m_TunPkts; /// queue for sending packets over the network from us PacketQueue_t m_UserToNetworkPktQueue; /// queue for sending packets to user from network diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 94c344f7c..97c799548 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -1047,20 +1047,29 @@ namespace llarp const auto& sessions = m_state->m_SNodeSessions; auto& queue = m_state->m_InboundTrafficQueue; - LogicCall(EndpointLogic(), [&]() { + auto epPump = [&]() { // send downstream packets to user for snode for(const auto& item : sessions) item.second.first->FlushDownstream(); // send downstream traffic to user for hidden service util::Lock lock(&m_state->m_InboundTrafficQueueMutex); - while(queue.size()) + while(not queue.empty()) { const auto& msg = queue.top(); const llarp_buffer_t buf(msg->payload); HandleInboundPacket(msg->tag, buf, msg->proto); queue.pop(); } - }); + }; + + if(NetworkIsIsolated()) + { + LogicCall(EndpointLogic(), epPump); + } + else + { + epPump(); + } auto router = Router(); // TODO: locking on this container @@ -1069,14 +1078,16 @@ namespace llarp // TODO: locking on this container for(const auto& item : sessions) item.second.first->FlushUpstream(); - util::Lock lock(&m_state->m_SendQueueMutex); - // send outbound traffic - for(const auto& item : m_state->m_SendQueue) { - item.second->SendRoutingMessage(*item.first, router); - MarkConvoTagActive(item.first->T.T); + util::Lock lock(&m_state->m_SendQueueMutex); + // send outbound traffic + for(const auto& item : m_state->m_SendQueue) + { + item.second->SendRoutingMessage(*item.first, router); + MarkConvoTagActive(item.first->T.T); + } + m_state->m_SendQueue.clear(); } - m_state->m_SendQueue.clear(); router->PumpLL(); } diff --git a/llarp/service/protocol.cpp b/llarp/service/protocol.cpp index 236998848..d603b8015 100644 --- a/llarp/service/protocol.cpp +++ b/llarp/service/protocol.cpp @@ -407,11 +407,8 @@ namespace llarp LogError("convotag missmatch: ", T, " != ", msg->tag); return false; } - msg->handler = handler; - const PathID_t fromPath = F; - LogicCall(logic, [=]() { - ProtocolMessage::ProcessAsync(recvPath, fromPath, msg); - }); + msg->handler = handler; + ProtocolMessage::ProcessAsync(recvPath, F, msg); return true; }