remove locks (#1336)

* remove locks

* use tryPushBack to attempt to prevent deadlocks
This commit is contained in:
Jeff 2020-09-04 15:55:49 -04:00 committed by GitHub
parent 838d28c6ed
commit 681459185f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 30 additions and 35 deletions

View File

@ -34,7 +34,11 @@ namespace llarp
namespace service namespace service
{ {
Endpoint::Endpoint(AbstractRouter* r, Context* parent) Endpoint::Endpoint(AbstractRouter* r, Context* parent)
: path::Builder(r, 3, path::default_len), context(parent), m_RecvQueue(128) : path::Builder(r, 3, path::default_len)
, context(parent)
, m_InboundTrafficQueue(512)
, m_SendQueue(512)
, m_RecvQueue(512)
{ {
m_state = std::make_unique<EndpointState>(); m_state = std::make_unique<EndpointState>();
m_state->m_Router = r; m_state->m_Router = r;
@ -846,8 +850,7 @@ namespace llarp
&& (m_state->m_ExitEnabled || m_ExitMap.ContainsValue(msg->sender.Addr()))) && (m_state->m_ExitEnabled || m_ExitMap.ContainsValue(msg->sender.Addr())))
|| msg->proto == eProtocolTrafficV4 || msg->proto == eProtocolTrafficV6) || msg->proto == eProtocolTrafficV4 || msg->proto == eProtocolTrafficV6)
{ {
util::Lock l(m_state->m_InboundTrafficQueueMutex); m_InboundTrafficQueue.tryPushBack(std::move(msg));
m_state->m_InboundTrafficQueue.emplace(msg);
return true; return true;
} }
if (msg->proto == eProtocolControl) if (msg->proto == eProtocolControl)
@ -886,9 +889,8 @@ namespace llarp
if (f.Sign(m_Identity)) if (f.Sign(m_Identity))
{ {
util::Lock lock(m_state->m_SendQueueMutex); m_SendQueue.tryPushBack(
m_state->m_SendQueue.emplace_back( SendEvent_t{std::make_shared<const routing::PathTransferMessage>(f, replyPath), path});
std::make_shared<const routing::PathTransferMessage>(f, replyPath), path);
} }
} }
@ -927,9 +929,8 @@ namespace llarp
{ {
LogWarn("invalidating convotag T=", frame.T); LogWarn("invalidating convotag T=", frame.T);
RemoveConvoTag(frame.T); RemoveConvoTag(frame.T);
util::Lock lock(m_state->m_SendQueueMutex); m_SendQueue.tryPushBack(
m_state->m_SendQueue.emplace_back( SendEvent_t{std::make_shared<const routing::PathTransferMessage>(f, frame.F), p});
std::make_shared<const routing::PathTransferMessage>(f, frame.F), p);
} }
} }
return true; return true;
@ -1141,7 +1142,7 @@ namespace llarp
void Endpoint::Pump(llarp_time_t) void Endpoint::Pump(llarp_time_t)
{ {
const auto& sessions = m_state->m_SNodeSessions; const auto& sessions = m_state->m_SNodeSessions;
auto& queue = m_state->m_InboundTrafficQueue; auto& queue = m_InboundTrafficQueue;
auto epPump = [&]() { auto epPump = [&]() {
FlushRecvData(); FlushRecvData();
@ -1149,13 +1150,11 @@ namespace llarp
for (const auto& item : sessions) for (const auto& item : sessions)
item.second.first->FlushDownstream(); item.second.first->FlushDownstream();
// send downstream traffic to user for hidden service // send downstream traffic to user for hidden service
util::Lock lock(m_state->m_InboundTrafficQueueMutex);
while (not queue.empty()) while (not queue.empty())
{ {
const auto& msg = queue.top(); auto msg = queue.popFront();
const llarp_buffer_t buf(msg->payload); const llarp_buffer_t buf(msg->payload);
HandleInboundPacket(msg->tag, buf, msg->proto); HandleInboundPacket(msg->tag, buf, msg->proto);
queue.pop();
} }
}; };
@ -1174,16 +1173,15 @@ namespace llarp
// TODO: locking on this container // TODO: locking on this container
for (const auto& item : sessions) for (const auto& item : sessions)
item.second.first->FlushUpstream(); item.second.first->FlushUpstream();
// send queue flush
while (not m_SendQueue.empty())
{ {
util::Lock lock(m_state->m_SendQueueMutex); auto item = m_SendQueue.popFront();
// send outbound traffic
for (const auto& item : m_state->m_SendQueue)
{
item.second->SendRoutingMessage(*item.first, router); item.second->SendRoutingMessage(*item.first, router);
MarkConvoTagActive(item.first->T.T); MarkConvoTagActive(item.first->T.T);
} }
m_state->m_SendQueue.clear();
}
UpstreamFlush(router); UpstreamFlush(router);
router->linkManager().PumpLinks(); router->linkManager().PumpLinks();
} }
@ -1274,9 +1272,8 @@ namespace llarp
LogError("failed to encrypt and sign"); LogError("failed to encrypt and sign");
return; return;
} }
self->m_SendQueue.pushBack(SendEvent_t{transfer, p});
util::Lock lock(self->m_state->m_SendQueueMutex); ;
self->m_state->m_SendQueue.emplace_back(transfer, p);
}); });
return true; return true;
} }

View File

@ -19,6 +19,7 @@
#include <hook/ihook.hpp> #include <hook/ihook.hpp>
#include <util/compare_ptr.hpp> #include <util/compare_ptr.hpp>
#include <util/thread/logic.hpp> #include <util/thread/logic.hpp>
#include <service/endpoint_types.hpp>
#include <service/auth.hpp> #include <service/auth.hpp>
@ -459,6 +460,9 @@ namespace llarp
std::shared_ptr<IAuthPolicy> m_AuthPolicy; std::shared_ptr<IAuthPolicy> m_AuthPolicy;
std::unordered_map<Address, AuthInfo, Address::Hash> m_RemoteAuthInfos; std::unordered_map<Address, AuthInfo, Address::Hash> m_RemoteAuthInfos;
RecvPacketQueue_t m_InboundTrafficQueue;
SendMessageQueue_t m_SendQueue;
void void
FlushRecvData(); FlushRecvData();

View File

@ -40,10 +40,6 @@ namespace llarp
hooks::Backend_ptr m_OnDown; hooks::Backend_ptr m_OnDown;
hooks::Backend_ptr m_OnReady; hooks::Backend_ptr m_OnReady;
util::Mutex m_InboundTrafficQueueMutex; // protects m_InboundTrafficQueue
/// ordered queue for inbound hidden service traffic
RecvPacketQueue_t m_InboundTrafficQueue GUARDED_BY(m_InboundTrafficQueueMutex);
std::set<RouterID> m_SnodeBlacklist; std::set<RouterID> m_SnodeBlacklist;
AbstractRouter* m_Router; AbstractRouter* m_Router;
@ -54,9 +50,6 @@ namespace llarp
std::string m_NetNS; std::string m_NetNS;
bool m_ExitEnabled = false; bool m_ExitEnabled = false;
util::Mutex m_SendQueueMutex; // protects m_SendQueue
std::deque<SendEvent_t> m_SendQueue GUARDED_BY(m_SendQueueMutex);
PendingTraffic m_PendingTraffic; PendingTraffic m_PendingTraffic;
Sessions m_RemoteSessions; Sessions m_RemoteSessions;

View File

@ -5,6 +5,7 @@
#include <service/router_lookup_job.hpp> #include <service/router_lookup_job.hpp>
#include <service/session.hpp> #include <service/session.hpp>
#include <util/compare_ptr.hpp> #include <util/compare_ptr.hpp>
#include <util/thread/queue.hpp>
#include <deque> #include <deque>
#include <memory> #include <memory>
@ -25,15 +26,15 @@ namespace llarp
struct OutboundContext; struct OutboundContext;
using Msg_ptr = std::shared_ptr<const routing::PathTransferMessage>; using Msg_ptr = std::shared_ptr<const routing::PathTransferMessage>;
using SendEvent_t = std::pair<Msg_ptr, path::Path_ptr>; using SendEvent_t = std::pair<Msg_ptr, path::Path_ptr>;
using SendMessageQueue_t = thread::Queue<SendEvent_t>;
using PendingBufferQueue = std::deque<PendingBuffer>; using PendingBufferQueue = std::deque<PendingBuffer>;
using PendingTraffic = std::unordered_map<Address, PendingBufferQueue, Address::Hash>; using PendingTraffic = std::unordered_map<Address, PendingBufferQueue, Address::Hash>;
using ProtocolMessagePtr = std::shared_ptr<ProtocolMessage>; using ProtocolMessagePtr = std::shared_ptr<ProtocolMessage>;
using RecvPacketQueue_t = std::priority_queue< using RecvPacketQueue_t = thread::Queue<ProtocolMessagePtr>;
ProtocolMessagePtr,
std::vector<ProtocolMessagePtr>,
ComparePtr<ProtocolMessagePtr>>;
using PendingRouters = std::unordered_map<RouterID, RouterLookupJob, RouterID::Hash>; using PendingRouters = std::unordered_map<RouterID, RouterLookupJob, RouterID::Hash>;