diff --git a/llarp/ev/ev_libuv.cpp b/llarp/ev/ev_libuv.cpp index b003e76bb..f08fdd9b5 100644 --- a/llarp/ev/ev_libuv.cpp +++ b/llarp/ev/ev_libuv.cpp @@ -10,7 +10,7 @@ namespace libuv /// call a function in logic thread via a handle template < typename Handle, typename Func > void - Call(Handle* h, Func&& f) + Call(Handle* h, Func f) { static_cast< Loop* >(h->loop->data)->Call(f); } @@ -416,7 +416,7 @@ namespace libuv OnTick(uv_check_t* t) { ticker_glue* ticker = static_cast< ticker_glue* >(t->data); - Call(&ticker->m_Ticker, [ticker]() { ticker->func(); }); + Call(t, ticker->func); } bool diff --git a/llarp/router/outbound_message_handler.cpp b/llarp/router/outbound_message_handler.cpp index 0774f974b..abe7edc2a 100644 --- a/llarp/router/outbound_message_handler.cpp +++ b/llarp/router/outbound_message_handler.cpp @@ -71,9 +71,12 @@ namespace llarp void OutboundMessageHandler::Tick() { - ProcessOutboundQueue(); - RemoveEmptyPathQueues(); - SendRoundRobin(); + auto self = this; + m_Killer.TryAccess([self]() { + self->ProcessOutboundQueue(); + self->RemoveEmptyPathQueues(); + self->SendRoundRobin(); + }); } void @@ -265,7 +268,9 @@ namespace llarp void OutboundMessageHandler::RemoveEmptyPathQueues() { - removedSomePaths = (not removedPaths.empty()); + removedSomePaths = false; + if(removedPaths.empty()) + return; while(not removedPaths.empty()) { @@ -275,6 +280,7 @@ namespace llarp outboundMessageQueues.erase(itr); } } + removedSomePaths = true; } void @@ -282,7 +288,7 @@ namespace llarp { // send non-routing messages first priority auto &non_routing_mq = outboundMessageQueues[zeroID]; - while(!non_routing_mq.empty()) + while(not non_routing_mq.empty()) { MessageQueueEntry entry = std::move(non_routing_mq.front()); non_routing_mq.pop(); diff --git a/llarp/router/outbound_message_handler.hpp b/llarp/router/outbound_message_handler.hpp index af18bc9d6..9db745efd 100644 --- a/llarp/router/outbound_message_handler.hpp +++ b/llarp/router/outbound_message_handler.hpp @@ -123,6 +123,8 @@ namespace llarp ILinkManager *_linkManager; std::shared_ptr< Logic > _logic; + util::ContentionKiller m_Killer; + // paths cannot have pathid "0", so it can be used as the "pathid" // for non-traffic (control) messages, so they can be prioritized. static const PathID_t zeroID; diff --git a/llarp/util/thread/threading.cpp b/llarp/util/thread/threading.cpp index 359417a03..74d5c6652 100644 --- a/llarp/util/thread/threading.cpp +++ b/llarp/util/thread/threading.cpp @@ -53,5 +53,13 @@ namespace llarp (void)name; #endif } + + void + ContentionKiller::TryAccess(std::function< void(void) > visit) const + { + NullLock lock(&__access); + visit(); + } + } // namespace util } // namespace llarp diff --git a/llarp/util/thread/threading.hpp b/llarp/util/thread/threading.hpp index 960acc917..a61b3d627 100644 --- a/llarp/util/thread/threading.hpp +++ b/llarp/util/thread/threading.hpp @@ -156,6 +156,16 @@ namespace llarp return ::getpid(); #endif } + + // type for detecting contention on a resource + struct ContentionKiller + { + void + TryAccess(std::function< void(void) > visit) const; + + private: + mutable NullMutex __access; + }; } // namespace util } // namespace llarp