Add threading annotations and fix potential deadlocks

pull/371/head
Michael 5 years ago
parent c5a129ddff
commit 61f3273dc4
No known key found for this signature in database
GPG Key ID: 2D51757B47E2434C

@ -72,6 +72,10 @@ add_compile_options(-Wvla)
add_compile_options($<$<COMPILE_LANGUAGE:CXX>:-fpermissive>)
add_compile_options(-Wno-unused-function -Wno-deprecated-declarations -Wno-unknown-pragmas)
if (USING_CLANG)
add_compile_options(-Wthread-safety -Wthread-safety-negative)
endif()
if (WITH_COVERAGE)
if (USING_CLANG)
add_compile_options( -fprofile-instr-generate -fcoverage-mapping )

@ -14,6 +14,7 @@
#include <algorithm>
#include <deque>
#include <list>
#include <future>
#include <unistd.h>
#ifdef _WIN32

@ -9,6 +9,8 @@
#include <util/codel.hpp>
#include <util/threading.hpp>
#include <future>
namespace llarp
{
namespace handlers

@ -36,6 +36,7 @@ namespace llarp
ILinkLayer::ForEachSession(
std::function< void(const ILinkSession*) > visit) const
{
Lock l(&m_AuthedLinksMutex);
auto itr = m_AuthedLinks.begin();
while(itr != m_AuthedLinks.end())
{
@ -48,6 +49,7 @@ namespace llarp
ILinkLayer::VisitSessionByPubkey(const RouterID& pk,
std::function< bool(ILinkSession*) > visit)
{
Lock l(&m_AuthedLinksMutex);
auto itr = m_AuthedLinks.find(pk);
if(itr != m_AuthedLinks.end())
{
@ -59,6 +61,7 @@ namespace llarp
void
ILinkLayer::ForEachSession(std::function< void(ILinkSession*) > visit)
{
Lock l(&m_AuthedLinksMutex);
auto itr = m_AuthedLinks.begin();
while(itr != m_AuthedLinks.end())
{
@ -166,6 +169,7 @@ namespace llarp
/// Remove the pending (not-yet-authenticated) session entry for s's
/// remote endpoint. Per the header comment, this is called by a link
/// session that timed out before completing authentication.
/// Acquires m_PendingMutex, which guards m_Pending.
void
ILinkLayer::RemovePending(ILinkSession* s)
{
Lock l(&m_PendingMutex);
// m_Pending is keyed by remote address, not RouterID
llarp::Addr remote = s->GetRemoteEndpoint();
m_Pending.erase(remote);
}
@ -175,16 +179,22 @@ namespace llarp
{
std::vector< util::StatusObject > pending, established;
std::transform(m_Pending.begin(), m_Pending.end(),
std::back_inserter(pending),
[](const auto& item) -> util::StatusObject {
return item.second->ExtractStatus();
});
std::transform(m_AuthedLinks.begin(), m_AuthedLinks.end(),
std::back_inserter(established),
[](const auto& item) -> util::StatusObject {
return item.second->ExtractStatus();
});
{
Lock l(&m_PendingMutex);
std::transform(m_Pending.cbegin(), m_Pending.cend(),
std::back_inserter(pending),
[](const auto& item) -> util::StatusObject {
return item.second->ExtractStatus();
});
}
{
Lock l(&m_AuthedLinksMutex);
std::transform(m_AuthedLinks.cbegin(), m_AuthedLinks.cend(),
std::back_inserter(established),
[](const auto& item) -> util::StatusObject {
return item.second->ExtractStatus();
});
}
return {{"name", Name()},
{"rank", uint64_t(Rank())},

@ -67,10 +67,12 @@ namespace llarp
HasSessionVia(const Addr& addr);
void
ForEachSession(std::function< void(const ILinkSession*) > visit) const;
ForEachSession(std::function< void(const ILinkSession*) > visit) const
LOCKS_EXCLUDED(m_AuthedLinksMutex);
void
ForEachSession(std::function< void(ILinkSession*) > visit);
ForEachSession(std::function< void(ILinkSession*) > visit)
LOCKS_EXCLUDED(m_AuthedLinksMutex);
static void
udp_tick(llarp_udp_io* udp)
@ -127,7 +129,7 @@ namespace llarp
Name() const = 0;
util::StatusObject
ExtractStatus() const override;
ExtractStatus() const override LOCKS_EXCLUDED(m_AuthedLinksMutex);
void
CloseSessionTo(const RouterID& remote);
@ -143,7 +145,8 @@ namespace llarp
bool
VisitSessionByPubkey(const RouterID& pk,
std::function< bool(ILinkSession*) > visit);
std::function< bool(ILinkSession*) > visit)
LOCKS_EXCLUDED(m_AuthedLinksMutex);
virtual uint16_t
Rank() const = 0;
@ -195,7 +198,7 @@ namespace llarp
/// called by link session to remove a pending session who is timed out
void
RemovePending(ILinkSession* s);
RemovePending(ILinkSession* s) LOCKS_EXCLUDED(m_PendingMutex);
private:
static void
@ -229,14 +232,16 @@ namespace llarp
llarp_udp_io m_udp;
SecretKey m_SecretKey;
Mutex m_AuthedLinksMutex;
Mutex m_AuthedLinksMutex
ACQUIRED_BEFORE(m_PendingMutex); // protects m_AuthedLinks
std::unordered_multimap< RouterID, std::unique_ptr< ILinkSession >,
RouterID::Hash >
m_AuthedLinks;
Mutex m_PendingMutex;
m_AuthedLinks GUARDED_BY(m_AuthedLinksMutex);
Mutex m_PendingMutex
ACQUIRED_AFTER(m_AuthedLinksMutex); // protects m_Pending
std::unordered_map< llarp::Addr, std::unique_ptr< ILinkSession >,
llarp::Addr::Hash >
m_Pending;
m_Pending GUARDED_BY(m_PendingMutex);
};
} // namespace llarp

@ -35,14 +35,9 @@ llarp_nodedb::Clear()
}
bool
llarp_nodedb::Get(const llarp::RouterID &pk, llarp::RouterContact &result,
bool lock)
llarp_nodedb::Get(const llarp::RouterID &pk, llarp::RouterContact &result)
{
absl::optional< llarp::util::Lock > l;
if(lock)
{
l.emplace(&access);
}
llarp::util::Lock l(&access);
auto itr = entries.find(pk);
if(itr == entries.end())
return false;
@ -364,7 +359,7 @@ int
llarp_nodedb::iterate_all(struct llarp_nodedb_iter i)
{
iterate(i);
return entries.size();
return num_loaded();
}
/// maybe rename to verify_and_set
@ -390,6 +385,7 @@ llarp_nodedb_async_load_rc(struct llarp_async_load_rc *job)
/// Return how many RouterContact entries are currently loaded in memory.
/// Takes a shared (reader) lock on `access`, which guards `entries`.
size_t
llarp_nodedb::num_loaded() const
{
absl::ReaderMutexLock l(&access);
return entries.size();
}

@ -5,6 +5,9 @@
#include <router_id.hpp>
#include <util/common.hpp>
#include <util/fs.hpp>
#include <util/threading.hpp>
#include <absl/base/thread_annotations.h>
/**
* nodedb.hpp
@ -42,31 +45,31 @@ struct llarp_nodedb
llarp::Crypto *crypto;
llarp_threadpool *disk;
llarp::util::Mutex access;
mutable llarp::util::Mutex access; // protects entries
std::unordered_map< llarp::RouterID, llarp::RouterContact,
llarp::RouterID::Hash >
entries;
entries GUARDED_BY(access);
fs::path nodePath;
bool
Remove(const llarp::RouterID &pk);
Remove(const llarp::RouterID &pk) LOCKS_EXCLUDED(access);
void
Clear();
Clear() LOCKS_EXCLUDED(access);
bool
Get(const llarp::RouterID &pk, llarp::RouterContact &result,
bool lock = true);
Get(const llarp::RouterID &pk, llarp::RouterContact &result)
LOCKS_EXCLUDED(access);
bool
Has(const llarp::RouterID &pk);
Has(const llarp::RouterID &pk) LOCKS_EXCLUDED(access);
std::string
getRCFilePath(const llarp::RouterID &pubkey) const;
/// insert and write to disk
bool
Insert(const llarp::RouterContact &rc);
Insert(const llarp::RouterContact &rc) LOCKS_EXCLUDED(access);
/// insert and write to disk in background
void
@ -79,13 +82,14 @@ struct llarp_nodedb
loadSubdir(const fs::path &dir);
bool
loadfile(const fs::path &fpath);
loadfile(const fs::path &fpath) LOCKS_EXCLUDED(access);
void
visit(std::function< bool(const llarp::RouterContact &) > visit);
visit(std::function< bool(const llarp::RouterContact &) > visit)
LOCKS_EXCLUDED(access);
bool
iterate(llarp_nodedb_iter &i);
iterate(llarp_nodedb_iter &i) LOCKS_EXCLUDED(access);
void
set_dir(const char *dir);
@ -99,14 +103,15 @@ struct llarp_nodedb
iterate_all(llarp_nodedb_iter i);
size_t
num_loaded() const;
num_loaded() const LOCKS_EXCLUDED(access);
bool
select_random_exit(llarp::RouterContact &rc);
select_random_exit(llarp::RouterContact &rc) LOCKS_EXCLUDED(access);
bool
select_random_hop(const llarp::RouterContact &prev,
llarp::RouterContact &result, size_t N);
llarp::RouterContact &result, size_t N)
LOCKS_EXCLUDED(access);
static bool
ensure_dir(const char *dir);

@ -653,12 +653,20 @@ namespace llarp
using TransitHopsMap_t =
std::multimap< PathID_t, std::shared_ptr< TransitHop > >;
using SyncTransitMap_t = std::pair< util::Mutex, TransitHopsMap_t >;
// Transit-hop map bundled with the mutex that guards it, so the map can
// carry a GUARDED_BY annotation. The `first`/`second` member names are
// kept from the std::pair this struct replaces, so existing call sites
// keep compiling unchanged.
struct SyncTransitMap_t
{
util::Mutex first; // protects second
TransitHopsMap_t second GUARDED_BY(first);
};
// maps path id -> pathset owner of path
using OwnedPathsMap_t = std::map< PathID_t, PathSet* >;
using SyncOwnedPathsMap_t = std::pair< util::Mutex, OwnedPathsMap_t >;
// Owned-paths map (path id -> owning PathSet) bundled with the mutex that
// guards it, so the map can carry a GUARDED_BY annotation. `first`/`second`
// names are retained from the std::pair this struct replaces for source
// compatibility.
struct SyncOwnedPathsMap_t
{
util::Mutex first; // protects second
OwnedPathsMap_t second GUARDED_BY(first);
};
llarp_threadpool*
Worker();

@ -6,8 +6,9 @@
#include <routing/message.hpp>
#include <service/IntroSet.hpp>
#include <service/lookup.hpp>
#include <util/time.hpp>
#include <util/status.hpp>
#include <util/threading.hpp>
#include <util/time.hpp>
#include <functional>
#include <list>

@ -97,12 +97,12 @@ namespace llarp
bool
Profiling::Save(const char* fname)
{
lock_t lock(&m_ProfilesMutex);
absl::ReaderMutexLock lock(&m_ProfilesMutex);
size_t sz = (m_Profiles.size() * (RouterProfile::MaxSize + 32 + 8)) + 8;
std::vector< byte_t > tmp(sz, 0);
llarp_buffer_t buf(tmp);
auto res = BEncode(&buf);
auto res = BEncodeNoLock(&buf);
if(res)
{
buf.sz = buf.cur - buf.base;
@ -118,9 +118,17 @@ namespace llarp
/// Locked entry point for bencoding the profile map: takes a shared
/// (reader) lock on m_ProfilesMutex — which guards m_Profiles — then
/// delegates to BEncodeNoLock, which requires that lock to be held.
bool
Profiling::BEncode(llarp_buffer_t* buf) const
{
absl::ReaderMutexLock lock(&m_ProfilesMutex);
return BEncodeNoLock(buf);
}
bool
Profiling::BEncodeNoLock(llarp_buffer_t* buf) const
{
if(!bencode_start_dict(buf))
return false;
auto itr = m_Profiles.begin();
while(itr != m_Profiles.end())
{
@ -142,6 +150,7 @@ namespace llarp
if(!profile.BDecode(buf))
return false;
RouterID pk = k.base;
absl::WriterMutexLock l(&m_ProfilesMutex);
return m_Profiles.emplace(pk, profile).second;
}

@ -6,6 +6,7 @@
#include <util/bencode.hpp>
#include <util/threading.hpp>
#include <absl/base/thread_annotations.h>
#include <map>
namespace llarp
@ -43,37 +44,40 @@ namespace llarp
}
bool
IsBad(const RouterID& r, uint64_t chances = 2);
IsBad(const RouterID& r, uint64_t chances = 2)
LOCKS_EXCLUDED(m_ProfilesMutex);
void
MarkSuccess(const RouterID& r);
MarkTimeout(const RouterID& r) LOCKS_EXCLUDED(m_ProfilesMutex);
void
MarkTimeout(const RouterID& r);
MarkSuccess(const RouterID& r) LOCKS_EXCLUDED(m_ProfilesMutex);
void
MarkPathFail(path::Path* p) LOCKS_EXCLUDED(m_ProfilesMutex);
void
MarkPathSuccess(path::Path* p) LOCKS_EXCLUDED(m_ProfilesMutex);
bool
BEncode(llarp_buffer_t* buf) const override;
BEncode(llarp_buffer_t* buf) const override LOCKS_EXCLUDED(m_ProfilesMutex);
bool
DecodeKey(const llarp_buffer_t& k, llarp_buffer_t* buf) override;
bool
Load(const char* fname);
Load(const char* fname) LOCKS_EXCLUDED(m_ProfilesMutex);
bool
Save(const char* fname);
void
MarkPathFail(path::Path* p);
void
MarkPathSuccess(path::Path* p);
Save(const char* fname) LOCKS_EXCLUDED(m_ProfilesMutex);
private:
using lock_t = llarp::util::Lock;
using mtx_t = llarp::util::Mutex;
mtx_t m_ProfilesMutex;
std::map< RouterID, RouterProfile > m_Profiles;
bool
BEncodeNoLock(llarp_buffer_t* buf) const
SHARED_LOCKS_REQUIRED(m_ProfilesMutex);
using lock_t = util::Lock;
mutable util::Mutex m_ProfilesMutex; // protects m_Profiles
std::map< RouterID, RouterProfile > m_Profiles GUARDED_BY(m_ProfilesMutex);
};
} // namespace llarp

@ -277,6 +277,7 @@ namespace llarp
bool
Router::GetRandomGoodRouter(RouterID &router)
{
absl::ReaderMutexLock l(&nodedb()->access);
auto sz = nodedb()->entries.size();
if(sz == 0)
return false;

@ -32,12 +32,12 @@ namespace llarp
struct CoDelQueue
{
CoDelQueue(const std::string& name, const PutTime& put, const GetNow& now)
: m_name(name), _putTime(put), _getNow(now)
: m_QueueIdx(0), m_name(name), _putTime(put), _getNow(now)
{
}
size_t
Size()
Size() LOCKS_EXCLUDED(m_QueueMutex)
{
Lock_t lock(m_QueueMutex);
return m_QueueIdx;
@ -46,6 +46,7 @@ namespace llarp
template < typename... Args >
bool
EmplaceIf(std::function< bool(T&) > pred, Args&&... args)
LOCKS_EXCLUDED(m_QueueMutex)
{
Lock_t lock(&m_QueueMutex);
if(m_QueueIdx == MaxSize)
@ -68,7 +69,7 @@ namespace llarp
template < typename... Args >
void
Emplace(Args&&... args)
Emplace(Args&&... args) LOCKS_EXCLUDED(m_QueueMutex)
{
Lock_t lock(&m_QueueMutex);
if(m_QueueIdx == MaxSize)
@ -90,7 +91,7 @@ namespace llarp
template < typename Visit, typename Filter >
void
Process(Visit visitor, Filter f)
Process(Visit visitor, Filter f) LOCKS_EXCLUDED(m_QueueMutex)
{
llarp_time_t lowest = std::numeric_limits< llarp_time_t >::max();
if(_getNow() < nextTickAt)
@ -149,8 +150,8 @@ namespace llarp
llarp_time_t nextTickInterval = initialIntervalMs;
llarp_time_t nextTickAt = 0;
Mutex_t m_QueueMutex;
size_t m_QueueIdx = 0;
T m_Queue[MaxSize];
size_t m_QueueIdx GUARDED_BY(m_QueueMutex);
std::array< T, MaxSize > m_Queue GUARDED_BY(m_QueueMutex);
std::string m_name;
GetTime _getTime;
PutTime _putTime;

@ -1,7 +1,6 @@
#ifndef LLARP_LOGGER_HPP
#define LLARP_LOGGER_HPP
#include <util/threading.hpp>
#include <util/time.hpp>
#ifdef _WIN32
@ -20,6 +19,7 @@
#include <iostream>
#include <sstream>
#include <string>
#include <thread>
namespace llarp
{
@ -40,8 +40,6 @@ namespace llarp
std::ostream& out;
std::function< void(const std::string&) > customLog;
llarp::util::Mutex access;
#ifdef _WIN32
bool isConsoleModern =
true; // qol fix so oldfag clients don't see ugly escapes

@ -1,6 +1,8 @@
#include <util/queue_manager.hpp>
#include <util/threading.hpp>
#include <thread>
namespace llarp
{
namespace thread

@ -4,8 +4,6 @@ namespace llarp
{
namespace thread
{
using LockGuard = std::unique_lock< std::mutex >;
void
ThreadPool::join()
{
@ -64,25 +62,22 @@ namespace llarp
void
ThreadPool::waitThreads()
{
LockGuard lock(m_gateMutex);
m_threadsReadyCond.wait(
lock, [this]() { return m_numThreadsReady == m_threads.size(); });
util::Lock lock(&m_gateMutex);
m_gateMutex.Await(absl::Condition(this, &ThreadPool::allThreadsReady));
}
void
ThreadPool::releaseThreads()
{
LockGuard lock(m_gateMutex);
util::Lock lock(&m_gateMutex);
m_numThreadsReady = 0;
++m_gateCount;
m_gateCond.notify_all();
}
void
ThreadPool::interrupt()
{
LockGuard lock(m_gateMutex);
util::Lock lock(&m_gateMutex);
size_t count = m_idleThreads;
@ -95,16 +90,22 @@ namespace llarp
void
ThreadPool::worker()
{
size_t gateCount = m_gateCount;
// Lock will be valid until the end of the statement
size_t gateCount = (absl::ReaderMutexLock(&m_gateMutex), m_gateCount);
for(;;)
{
{
LockGuard lock(m_gateMutex);
util::Lock lock(&m_gateMutex);
++m_numThreadsReady;
m_threadsReadyCond.notify_one();
m_gateCond.wait(lock, [&]() { return gateCount != m_gateCount; });
using CondArg = std::pair< size_t, ThreadPool* >;
CondArg args(gateCount, this);
m_gateMutex.Await(absl::Condition(
+[](CondArg* x) SHARED_LOCKS_REQUIRED(x->second->m_gateMutex) {
return x->first != x->second->m_gateCount;
},
&args));
gateCount = m_gateCount;
}
@ -230,7 +231,7 @@ namespace llarp
void
ThreadPool::drain()
{
LockGuard lock(m_mutex);
util::Lock lock(&m_mutex);
if(m_status.load(std::memory_order_relaxed) == Status::Run)
{
@ -248,7 +249,7 @@ namespace llarp
void
ThreadPool::shutdown()
{
LockGuard lock(m_mutex);
util::Lock lock(&m_mutex);
if(m_status.load(std::memory_order_relaxed) == Status::Run)
{
@ -265,7 +266,7 @@ namespace llarp
bool
ThreadPool::start()
{
LockGuard lock(m_mutex);
util::Lock lock(&m_mutex);
if(m_status.load(std::memory_order_relaxed) != Status::Stop)
{
@ -301,7 +302,7 @@ namespace llarp
void
ThreadPool::stop()
{
LockGuard lock(m_mutex);
util::Lock lock(&m_mutex);
if(m_status.load(std::memory_order_relaxed) == Status::Run)
{

@ -4,6 +4,7 @@
#include <util/queue.hpp>
#include <util/threading.hpp>
#include <atomic>
#include <functional>
#include <thread>
#include <vector>
@ -36,17 +37,15 @@ namespace llarp
std::atomic_size_t m_idleThreads; // Number of idle threads
std::mutex m_mutex;
util::Mutex m_mutex;
std::atomic< Status > m_status;
size_t m_gateCount;
size_t m_numThreadsReady; // Threads ready to go through the gate.
size_t m_gateCount GUARDED_BY(m_gateMutex);
size_t m_numThreadsReady
GUARDED_BY(m_gateMutex); // Threads ready to go through the gate.
std::mutex m_gateMutex;
std::condition_variable m_threadsReadyCond;
std::condition_variable m_gateCond;
util::Mutex m_gateMutex;
std::vector< std::thread > m_threads;
size_t m_createdThreads;
@ -75,6 +74,12 @@ namespace llarp
bool
spawn();
// Condition predicate for m_gateMutex.Await() in waitThreads(): true once
// every spawned thread has checked in. Annotated as requiring (at least a
// shared hold of) m_gateMutex, which guards m_numThreadsReady.
bool
allThreadsReady() const SHARED_LOCKS_REQUIRED(m_gateMutex)
{
return m_numThreadsReady == m_threads.size();
}
public:
ThreadPool(size_t numThreads, size_t maxJobs);

@ -3,30 +3,26 @@
#include <absl/synchronization/barrier.h>
#include <absl/synchronization/mutex.h>
// We only support posix threads:
// MSYS2 has a full native C++11 toolset, and a suitable
// cross-compilation system can be assembled on Linux and UNIX
// Last time I checked, Red Hat had this problem (no libpthread)
// Not sure about the other distros generally -rick
#include <condition_variable>
#include <thread>
#include <future>
#include <memory>
#include <cassert>
#include <absl/time/time.h>
namespace llarp
{
namespace util
{
/// a mutex that does nothing
struct NullMutex
struct LOCKABLE NullMutex
{
};
/// a lock that does nothing
struct NullLock
struct SCOPED_LOCKABLE NullLock
{
NullLock(__attribute__((unused)) NullMutex* mtx)
NullLock(__attribute__((unused)) const NullMutex* mtx)
EXCLUSIVE_LOCK_FUNCTION(mtx)
{
}
~NullLock() UNLOCK_FUNCTION()
{
}
};
@ -38,9 +34,14 @@ namespace llarp
class Semaphore
{
private:
Mutex m_mutex;
Condition m_cv;
size_t m_count;
Mutex m_mutex; // protects m_count
size_t m_count GUARDED_BY(m_mutex);
bool
ready() const SHARED_LOCKS_REQUIRED(m_mutex)
{
return m_count > 0;
}
public:
Semaphore(size_t count) : m_count(count)
@ -48,37 +49,30 @@ namespace llarp
}
void
notify()
notify() LOCKS_EXCLUDED(m_mutex)
{
Lock lock(&m_mutex);
m_count++;
m_cv.Signal();
}
void
wait()
wait() LOCKS_EXCLUDED(m_mutex)
{
Lock lock(&m_mutex);
while(this->m_count == 0)
{
m_cv.Wait(&m_mutex);
}
m_mutex.Await(absl::Condition(this, &Semaphore::ready));
m_count--;
}
bool
waitFor(absl::Duration timeout)
waitFor(absl::Duration timeout) LOCKS_EXCLUDED(m_mutex)
{
Lock lock(&m_mutex);
while(this->m_count == 0)
if(!m_mutex.AwaitWithTimeout(absl::Condition(this, &Semaphore::ready),
timeout))
{
if(m_cv.WaitWithTimeout(&m_mutex, timeout))
{
return false;
}
return false;
}
m_count--;

@ -70,7 +70,7 @@ llarp_threadpool_queue_job(struct llarp_threadpool *pool,
void
llarp_threadpool_tick(struct llarp_threadpool *pool)
{
while(pool->jobs.size())
while(pool->size())
{
std::function< void(void) > job;
{
@ -79,7 +79,9 @@ llarp_threadpool_tick(struct llarp_threadpool *pool)
pool->jobs.pop();
}
if(job)
{
job();
}
}
}

@ -4,6 +4,7 @@
#include <util/threading.hpp>
#include <util/threadpool.hpp>
#include <absl/base/thread_annotations.h>
#include <memory>
#include <queue>
@ -11,13 +12,12 @@ struct llarp_threadpool
{
std::unique_ptr< llarp::thread::ThreadPool > impl;
llarp::util::Mutex m_access;
uint32_t ids = 0;
std::queue< std::function< void(void) > > jobs;
mutable llarp::util::Mutex m_access; // protects jobs
std::queue< std::function< void(void) > > jobs GUARDED_BY(m_access);
llarp_threadpool(int workers, const char *name)
: impl(std::make_unique< llarp::thread::ThreadPool >(workers,
workers * 128))
: impl(
std::make_unique< llarp::thread::ThreadPool >(workers, workers * 128))
{
(void)name;
}
@ -25,6 +25,13 @@ struct llarp_threadpool
llarp_threadpool()
{
}
/// Thread-safe count of queued jobs; takes a shared (reader) lock on
/// m_access, which guards `jobs`.
size_t
size() const LOCKS_EXCLUDED(m_access)
{
absl::ReaderMutexLock l(&m_access);
return jobs.size();
}
};
struct llarp_threadpool *

@ -9,9 +9,6 @@ namespace llarp
{
namespace thread
{
using mtx_t = util::Mutex;
using lock_t = util::Lock;
using Pool = ThreadPool;
struct IsolatedPool : public Pool

@ -56,8 +56,9 @@ namespace llarp
struct llarp_timer_context
{
llarp::util::Mutex timersMutex;
std::unordered_map< uint32_t, std::unique_ptr< llarp::timer > > timers;
llarp::util::Mutex timersMutex; // protects timers
std::unordered_map< uint32_t, std::unique_ptr< llarp::timer > > timers
GUARDED_BY(timersMutex);
std::priority_queue< std::unique_ptr< llarp::timer > > calling;
llarp::util::Mutex tickerMutex;
std::unique_ptr< llarp::util::Condition > ticker;
@ -90,7 +91,7 @@ struct llarp_timer_context
}
void
cancel(uint32_t id)
cancel(uint32_t id) LOCKS_EXCLUDED(timersMutex)
{
llarp::util::Lock lock(&timersMutex);
const auto& itr = timers.find(id);
@ -100,7 +101,7 @@ struct llarp_timer_context
}
void
remove(uint32_t id)
remove(uint32_t id) LOCKS_EXCLUDED(timersMutex)
{
llarp::util::Lock lock(&timersMutex);
const auto& itr = timers.find(id);
@ -112,6 +113,7 @@ struct llarp_timer_context
uint32_t
call_later(void* user, llarp_timer_handler_func func, uint64_t timeout_ms)
LOCKS_EXCLUDED(timersMutex)
{
llarp::util::Lock lock(&timersMutex);
@ -122,7 +124,7 @@ struct llarp_timer_context
}
void
cancel_all()
cancel_all() LOCKS_EXCLUDED(timersMutex)
{
std::list< uint32_t > ids;
@ -174,6 +176,7 @@ llarp_timer_stop(struct llarp_timer_context* t)
{
// destroy all timers
// don't call callbacks on timers
llarp::util::Lock lock(&t->timersMutex);
t->timers.clear();
t->stop();
if(t->ticker)

@ -2,15 +2,15 @@
#include <util/threading.hpp>
#include <array>
#include <thread>
#include <functional>
#include <thread>
#include <gtest/gtest.h>
using namespace llarp;
using namespace llarp::thread;
using LockGuard = std::unique_lock< std::mutex >;
using LockGuard = absl::MutexLock;
class Element
{
@ -49,7 +49,7 @@ class Args
public:
std::condition_variable startCond;
std::condition_variable runCond;
std::mutex mutex;
absl::Mutex mutex;
ObjQueue queue;
@ -71,18 +71,29 @@ class Args
, endSignal(0)
{
}
// Condition predicate for args.mutex.Await(): true once runSignal has
// been incremented, i.e. the test driver has signaled the worker
// threads to proceed.
bool
signal() const
{
return !!runSignal;
}
};
using CondArgs = std::pair< Args*, size_t >;
// absl::Condition predicate used by the test driver to wait until the
// spawned threads have checked in: true once the count in a->first has
// reached the expected value carried in a->second.
//
// Fixes two issues with the previous version:
//  - the comparison was inverted (`!=`): absl::Mutex::Await() blocks until
//    the predicate is TRUE, and the condition-variable code this replaces
//    waited for `args.count == target`, so `!=` made Await() return
//    immediately instead of waiting for the thread to increment count;
//  - a stray semicolon followed the function body.
bool
waitFunc(CondArgs* a)
{
  return a->first->count == a->second;
}
void
popFrontTester(Args& args)
{
{
LockGuard guard(args.mutex);
LockGuard guard(&args.mutex);
args.count++;
args.startCond.notify_one();
args.runCond.wait(guard, [&args]() { return !!args.runSignal; });
args.mutex.Await(absl::Condition(&args, &Args::signal));
}
for(;;)
@ -99,12 +110,10 @@ void
pushBackTester(Args& args)
{
{
LockGuard guard(args.mutex);
LockGuard guard(&args.mutex);
args.count++;
args.startCond.notify_one();
args.runCond.wait(guard, [&args]() { return !!args.runSignal; });
args.mutex.Await(absl::Condition(&args, &Args::signal));
}
for(size_t i = 0; i < args.iterations; ++i)
@ -274,18 +283,19 @@ TEST(TestQueue, singleProducerManyConsumer)
Args args{iterations};
LockGuard lock(args.mutex);
for(size_t i = 0; i < threads.size(); ++i)
{
threads[i] = std::thread(std::bind(&popFrontTester, std::ref(args)));
LockGuard lock(&args.mutex);
args.startCond.wait(lock, [&args, i]() { return args.count == (i + 1); });
}
for(size_t i = 0; i < threads.size(); ++i)
{
threads[i] = std::thread(std::bind(&popFrontTester, std::ref(args)));
args.runSignal++;
args.runCond.notify_all();
lock.unlock();
CondArgs cArgs(&args, i + 1);
args.mutex.Await(absl::Condition(&waitFunc, &cArgs));
}
args.runSignal++;
}
for(size_t i = 0; i < iterations; ++i)
{
@ -316,27 +326,28 @@ TEST(TestQueue, manyProducerManyConsumer)
Args args{iterations};
LockGuard lock(args.mutex);
for(size_t i = 0; i < numThreads; ++i)
{
threads[i] = std::thread(std::bind(&popFrontTester, std::ref(args)));
LockGuard lock(&args.mutex);
args.startCond.wait(lock, [&]() { return args.count == (i + 1); });
}
for(size_t i = 0; i < numThreads; ++i)
{
threads[i] = std::thread(std::bind(&popFrontTester, std::ref(args)));
for(size_t i = 0; i < numThreads; ++i)
{
threads[i + numThreads] =
std::thread(std::bind(&pushBackTester, std::ref(args)));
CondArgs cArgs(&args, i + 1);
args.mutex.Await(absl::Condition(+waitFunc, &cArgs));
}
args.startCond.wait(lock,
[&]() { return args.count == (numThreads + i + 1); });
}
for(size_t i = 0; i < numThreads; ++i)
{
threads[i + numThreads] =
std::thread(std::bind(&pushBackTester, std::ref(args)));
args.runSignal++;
args.runCond.notify_all();
lock.unlock();
CondArgs cArgs(&args, numThreads + i + 1);
args.mutex.Await(absl::Condition(+waitFunc, &cArgs));
}
args.runSignal++;
}
for(auto& thread : threads)
{

Loading…
Cancel
Save