diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index d39b09d35..a0c13e83e 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -34,7 +34,11 @@ namespace llarp namespace service { Endpoint::Endpoint(AbstractRouter* r, Context* parent) - : path::Builder(r, 3, path::default_len), context(parent), m_RecvQueue(128) + : path::Builder(r, 3, path::default_len) + , context(parent) + , m_InboundTrafficQueue(512) + , m_SendQueue(512) + , m_RecvQueue(512) { m_state = std::make_unique(); m_state->m_Router = r; @@ -846,8 +850,7 @@ namespace llarp && (m_state->m_ExitEnabled || m_ExitMap.ContainsValue(msg->sender.Addr()))) || msg->proto == eProtocolTrafficV4 || msg->proto == eProtocolTrafficV6) { - util::Lock l(m_state->m_InboundTrafficQueueMutex); - m_state->m_InboundTrafficQueue.emplace(msg); + m_InboundTrafficQueue.tryPushBack(std::move(msg)); return true; } if (msg->proto == eProtocolControl) @@ -886,9 +889,8 @@ namespace llarp if (f.Sign(m_Identity)) { - util::Lock lock(m_state->m_SendQueueMutex); - m_state->m_SendQueue.emplace_back( - std::make_shared(f, replyPath), path); + m_SendQueue.tryPushBack( + SendEvent_t{std::make_shared(f, replyPath), path}); } } @@ -927,9 +929,8 @@ namespace llarp { LogWarn("invalidating convotag T=", frame.T); RemoveConvoTag(frame.T); - util::Lock lock(m_state->m_SendQueueMutex); - m_state->m_SendQueue.emplace_back( - std::make_shared(f, frame.F), p); + m_SendQueue.tryPushBack( + SendEvent_t{std::make_shared(f, frame.F), p}); } } return true; @@ -1141,7 +1142,7 @@ namespace llarp void Endpoint::Pump(llarp_time_t) { const auto& sessions = m_state->m_SNodeSessions; - auto& queue = m_state->m_InboundTrafficQueue; + auto& queue = m_InboundTrafficQueue; auto epPump = [&]() { FlushRecvData(); @@ -1149,13 +1150,11 @@ namespace llarp for (const auto& item : sessions) item.second.first->FlushDownstream(); // send downstream traffic to user for hidden service - util::Lock lock(m_state->m_InboundTrafficQueueMutex); while (not queue.empty()) { - const auto& msg = queue.top(); + auto msg = queue.popFront(); const llarp_buffer_t buf(msg->payload); HandleInboundPacket(msg->tag, buf, msg->proto); - queue.pop(); } }; @@ -1174,16 +1173,15 @@ namespace llarp // TODO: locking on this container for (const auto& item : sessions) item.second.first->FlushUpstream(); + + // send queue flush + while (not m_SendQueue.empty()) { - util::Lock lock(m_state->m_SendQueueMutex); - // send outbound traffic - for (const auto& item : m_state->m_SendQueue) - { - item.second->SendRoutingMessage(*item.first, router); - MarkConvoTagActive(item.first->T.T); - } - m_state->m_SendQueue.clear(); + auto item = m_SendQueue.popFront(); + item.second->SendRoutingMessage(*item.first, router); + MarkConvoTagActive(item.first->T.T); } + UpstreamFlush(router); router->linkManager().PumpLinks(); } @@ -1274,9 +1272,8 @@ namespace llarp LogError("failed to encrypt and sign"); return; } - - util::Lock lock(self->m_state->m_SendQueueMutex); - self->m_state->m_SendQueue.emplace_back(transfer, p); + self->m_SendQueue.pushBack(SendEvent_t{transfer, p}); + ; }); return true; } diff --git a/llarp/service/endpoint.hpp b/llarp/service/endpoint.hpp index d9911a0ab..24104d98f 100644 --- a/llarp/service/endpoint.hpp +++ b/llarp/service/endpoint.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include @@ -459,6 +460,9 @@ namespace llarp std::shared_ptr m_AuthPolicy; std::unordered_map m_RemoteAuthInfos; + RecvPacketQueue_t m_InboundTrafficQueue; + SendMessageQueue_t m_SendQueue; + void FlushRecvData(); diff --git a/llarp/service/endpoint_state.hpp b/llarp/service/endpoint_state.hpp index f865e7212..a189cbdc5 100644 --- a/llarp/service/endpoint_state.hpp +++ b/llarp/service/endpoint_state.hpp @@ -40,10 +40,6 @@ namespace llarp hooks::Backend_ptr m_OnDown; hooks::Backend_ptr m_OnReady; - util::Mutex m_InboundTrafficQueueMutex; // protects m_InboundTrafficQueue - /// ordered queue for inbound hidden service traffic - RecvPacketQueue_t m_InboundTrafficQueue GUARDED_BY(m_InboundTrafficQueueMutex); - std::set m_SnodeBlacklist; AbstractRouter* m_Router; @@ -54,9 +50,6 @@ namespace llarp std::string m_NetNS; bool m_ExitEnabled = false; - util::Mutex m_SendQueueMutex; // protects m_SendQueue - std::deque m_SendQueue GUARDED_BY(m_SendQueueMutex); - PendingTraffic m_PendingTraffic; Sessions m_RemoteSessions; diff --git a/llarp/service/endpoint_types.hpp b/llarp/service/endpoint_types.hpp index 5494b8efe..23320cc37 100644 --- a/llarp/service/endpoint_types.hpp +++ b/llarp/service/endpoint_types.hpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -25,15 +26,15 @@ namespace llarp struct OutboundContext; using Msg_ptr = std::shared_ptr; + using SendEvent_t = std::pair; + using SendMessageQueue_t = thread::Queue; + using PendingBufferQueue = std::deque; using PendingTraffic = std::unordered_map; using ProtocolMessagePtr = std::shared_ptr; - using RecvPacketQueue_t = std::priority_queue< - ProtocolMessagePtr, - std::vector, - ComparePtr>; + using RecvPacketQueue_t = thread::Queue; using PendingRouters = std::unordered_map;