Merge remote-tracking branch 'origin/staging'

This commit is contained in:
Jeff Becker 2019-03-26 10:06:59 -04:00
commit f2ee8fb52b
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05
28 changed files with 7276 additions and 2 deletions

View File

@ -189,7 +189,7 @@ if(NOT DEBIAN)
endif(NOT DEBIAN)
if(ASAN)
set(DEBUG_FLAGS ${DEBUG_FLAGS} -fsanitize=thread -fno-omit-frame-pointer)
set(DEBUG_FLAGS ${DEBUG_FLAGS} -fsanitize=address -fno-omit-frame-pointer)
set(OPTIMIZE_FLAGS "-O0")
endif(ASAN)

View File

@ -3,6 +3,9 @@
#include <llarp.h>
#include <util/fs.hpp>
#include <util/logger.hpp>
#include <util/metrics_core.hpp>
#include <util/metrics_publishers.hpp>
#include <util/scheduler.hpp>
#include <getopt.h>
#include <signal.h>
@ -248,10 +251,24 @@ main(int argc, char *argv[])
#ifndef _WIN32
signal(SIGHUP, handle_signal);
#endif
llarp::thread::Scheduler scheduler;
llarp::metrics::DefaultManagerGuard metricsGuard;
llarp::metrics::PublisherScheduler publisherScheduler(
scheduler, metricsGuard.instance());
metricsGuard.instance()->addGlobalPublisher(
std::make_shared< llarp::metrics::StreamPublisher >(std::cout));
publisherScheduler.setDefault(absl::Seconds(30));
scheduler.start();
code = llarp_main_setup(ctx);
if(code == 0)
code = llarp_main_run(ctx);
llarp_main_free(ctx);
scheduler.stop();
}
#ifdef _WIN32
::WSACleanup();

View File

@ -17,10 +17,17 @@ set(LIB_UTIL_SRC
util/logger.cpp
util/logic.cpp
util/mem.cpp
util/metrics_core.cpp
util/metrics_publishers.cpp
util/metrics_types.cpp
util/metrics.cpp
util/object.cpp
util/printer.cpp
util/queue_manager.cpp
util/queue.cpp
util/printer.cpp
util/scheduler.cpp
util/status.cpp
util/stopwatch.cpp
util/str.cpp
util/string_view.cpp
util/thread_pool.cpp
@ -28,6 +35,7 @@ set(LIB_UTIL_SRC
util/threadpool.cpp
util/time.cpp
util/timer.cpp
util/timerqueue.cpp
util/traits.cpp
util/types.cpp
)

View File

@ -15,6 +15,7 @@
#include <util/buffer.hpp>
#include <util/encode.hpp>
#include <util/logger.hpp>
#include <util/metrics.hpp>
#include <util/str.hpp>
#include <fstream>
@ -1136,12 +1137,14 @@ namespace llarp
bool
Router::Sign(Signature &sig, const llarp_buffer_t &buf) const
{
METRICS_TIME_BLOCK("Router", "Sign");
return crypto()->sign(sig, identity(), buf);
}
void
Router::SendTo(RouterID remote, const ILinkMessage *msg, ILinkLayer *selected)
{
METRICS_TIME_BLOCK("RouterSendTo", remote.ToString().c_str());
llarp_buffer_t buf(linkmsg_buffer);
if(!msg->BEncode(&buf))

1
llarp/util/metrics.cpp Normal file
View File

@ -0,0 +1 @@
#include <util/metrics.hpp>

179
llarp/util/metrics.hpp Normal file
View File

@ -0,0 +1,179 @@
#ifndef LLARP_METRICS_HPP
#define LLARP_METRICS_HPP
#include <util/metrics_types.hpp>
#include <util/metrics_core.hpp>
namespace llarp
{
namespace metrics
{
struct MetricsHelper
{
static void
initContainer(CategoryContainer& container, const char* category)
{
Manager* manager = DefaultManager::instance();
Registry& registry = manager->registry();
registry.registerContainer(registry.get(category), container);
}
static void
setType(const Id& id, Publication::Type type)
{
Manager* manager = DefaultManager::instance();
return manager->registry().publicationType(id, type);
}
};
} // namespace metrics
} // namespace llarp
// Some MSVC flags mess with __LINE__, but __COUNTER__ is better anyway
#ifdef _MSC_VER
#define METRICS_UNIQ_NUMBER __COUNTER__
#else
#define METRICS_UNIQ_NUMBER __LINE__
#endif
// Use a level of indirection to force the preprocessor to expand args first.
#define METRICS_NAME_CAT_IMP(X, Y) X##Y
#define METRICS_NAME_CAT(X, Y) METRICS_NAME_CAT_IMP(X, Y)
#define METRICS_UNIQUE_NAME(X) METRICS_NAME_CAT(X, METRICS_UNIQ_NUMBER)
#define METRICS_TIME_BLOCK_IMP(CAT, METRIC, VAR_NAME) \
llarp::metrics::DoubleCollector* VAR_NAME = nullptr; \
if(llarp::metrics::DefaultManager::instance()) \
{ \
using namespace llarp::metrics; \
CollectorRepo& repo = DefaultManager::instance()->collectorRepo(); \
VAR_NAME = repo.defaultDoubleCollector((CAT), (METRIC)); \
} \
llarp::metrics::TimerGuard METRICS_UNIQUE_NAME(timer_guard)(VAR_NAME);
#define METRICS_TIME_BLOCK(CAT, METRIC) \
METRICS_TIME_BLOCK_IMP(CAT, METRIC, METRICS_UNIQUE_NAME(time_block))
#define METRICS_IF_CATEGORY_ENABLED_IMP(CAT, NAME) \
static llarp::metrics::CategoryContainer NAME = {false, nullptr, nullptr}; \
if(!NAME.category() && llarp::metrics::DefaultManager::instance()) \
{ \
llarp::metrics::MetricsHelper::initContainer(NAME, CAT); \
} \
if(NAME.enabled())
#define METRICS_IF_CATEGORY_ENABLED(CAT) \
BALM_METRICS_IF_CATEGORY_ENABLED_IMP(CAT, METRICS_UNIQUE_NAME(Container))
// For when the category/metric may change during the program run
#define METRICS_DYNAMIC_INT_UPDATE(CAT, METRIC, VALUE) \
do \
{ \
using namespace llarp::metrics; \
if(DefaultManager::instance()) \
{ \
CollectorRepo& repository = DefaultManager::instance()->collectorRepo(); \
IntCollector* collector = \
repository.defaultIntCollector((CAT), (METRIC)); \
if(collector->id().category()->enabled()) \
{ \
collector->tick((VALUE)); \
} \
} \
} while(false)
// For when the category/metric remain static
#define METRICS_INT_UPDATE(CAT, METRIC, VALUE) \
do \
{ \
using namespace llarp::metrics; \
static CategoryContainer container = {false, nullptr, nullptr}; \
static IntCollector* collector = nullptr; \
if(container.category() == nullptr && DefaultManager::instance()) \
{ \
collector = MetricHelper::getIntCollector(CAT, METRIC); \
MetricHelper::initContainer(container, CAT); \
} \
if(container.enabled()) \
{ \
collector->tick(VALUE); \
} \
} while(false)
#define METRICS_TYPED_INT_UPDATE(CAT, METRIC, VALUE, TYPE) \
do \
{ \
using namespace llarp::metrics; \
static CategoryContainer container = {false, nullptr, nullptr}; \
static IntCollector* collector = nullptr; \
if(container.category() == nullptr && DefaultManager::instance()) \
{ \
collector = MetricHelper::getIntCollector(CAT, METRIC); \
MetricHelper::setType(collector->id(), TYPE); \
MetricHelper::initContainer(container, CAT); \
} \
if(container.enabled()) \
{ \
collector->tick(VALUE); \
} \
} while(false)
// For when the category/metric may change during the program run
#define METRICS_DYNAMIC_UPDATE(CAT, METRIC, VALUE) \
do \
{ \
using namespace llarp::metrics; \
if(DefaultManager::instance()) \
{ \
CollectorRepo& repository = DefaultManager::instance()->collectorRepo(); \
DoubleCollector* collector = \
repository.defaultDoubleCollector((CAT), (METRIC)); \
if(collector->id().category()->enabled()) \
{ \
collector->tick((VALUE)); \
} \
} \
} while(false)
// For when the category/metric remain static
#define METRICS_UPDATE(CAT, METRIC, VALUE) \
do \
{ \
using namespace llarp::metrics; \
static CategoryContainer container = {false, nullptr, nullptr}; \
static DoubleCollector* collector = nullptr; \
if(container.category() == nullptr && DefaultManager::instance()) \
{ \
collector = MetricHelper::getDoubleCollector(CAT, METRIC); \
MetricHelper::initContainer(container, CAT); \
} \
if(container.enabled()) \
{ \
collector->tick(VALUE); \
} \
} while(false)
#define METRICS_TYPED_UPDATE(CAT, METRIC, VALUE, TYPE) \
do \
{ \
using namespace llarp::metrics; \
static CategoryContainer container = {false, nullptr, nullptr}; \
static DoubleCollector* collector = nullptr; \
if(container.category() == nullptr && DefaultManager::instance()) \
{ \
collector = MetricHelper::getDoubleCollector(CAT, METRIC); \
MetricHelper::setType(collector->id(), TYPE); \
MetricHelper::initContainer(container, CAT); \
} \
if(container.enabled()) \
{ \
collector->tick(VALUE); \
} \
} while(false)
#define METRICS_DYNAMIC_INCREMENT(CAT, METRIC) \
METRICS_DYNAMIC_INT_UPDATE(CAT, METRIC, 1)
#define METRICS_INCREMENT(CAT, METRIC) METRICS_INT_UPDATE(CAT, METRIC, 1)
#endif

910
llarp/util/metrics_core.cpp Normal file
View File

@ -0,0 +1,910 @@
#include <util/metrics_core.hpp>
#include <iostream>
namespace llarp
{
namespace metrics
{
Record
IntCollector::loadAndClear()
{
size_t count;
uint64_t total;
int min;
int max;
{
absl::WriterMutexLock l(&m_mutex);
count = m_count;
total = m_total;
min = m_min;
max = m_max;
m_count = 0;
m_total = 0;
m_min = DEFAULT_MIN;
m_max = DEFAULT_MAX;
}
return {m_id, count, static_cast< double >(total),
(min == DEFAULT_MIN) ? Record::DEFAULT_MIN
: static_cast< double >(min),
(max == DEFAULT_MAX) ? Record::DEFAULT_MAX
: static_cast< double >(max)};
}
Record
IntCollector::load()
{
size_t count;
int64_t total;
int min;
int max;
{
absl::ReaderMutexLock l(&m_mutex);
count = m_count;
total = m_total;
min = m_min;
max = m_max;
}
return {m_id, count, static_cast< double >(total),
(min == DEFAULT_MIN) ? Record::DEFAULT_MIN : min,
(max == DEFAULT_MAX) ? Record::DEFAULT_MAX : max};
}
std::tuple< Id, bool >
Registry::insert(const char *category, const char *name)
{
// avoid life time issues, putting strings in the stringmem set
const char *cStr = m_stringmem.emplace(category).first->c_str();
const char *nStr = m_stringmem.emplace(name).first->c_str();
NamedCategory namedCategory(cStr, nStr);
const auto it = m_metrics.find(namedCategory);
if(it != m_metrics.end())
{
return std::make_tuple(Id(it->second.get()), false);
}
auto cIt = m_categories.find(cStr);
if(cIt == m_categories.end())
{
auto ptr = std::make_shared< Category >(cStr, m_defaultEnabled);
cIt = m_categories.emplace(cStr, ptr).first;
}
const auto mPtr =
std::make_shared< Description >(cIt->second.get(), nStr);
m_metrics.emplace(namedCategory, mPtr);
return {Id(mPtr.get()), true};
}
Id
Registry::add(const char *category, const char *name)
{
absl::WriterMutexLock l(&m_mutex);
auto result = insert(category, name);
return std::get< 1 >(result) ? std::get< 0 >(result) : Id();
}
Id
Registry::get(const char *category, const char *name)
{
Id result = findId(category, name);
if(result)
{
return result;
}
absl::WriterMutexLock l(&m_mutex);
return std::get< 0 >(insert(category, name));
}
const Category *
Registry::add(const char *category)
{
absl::WriterMutexLock l(&m_mutex);
const char *cStr = m_stringmem.emplace(category).first->c_str();
auto it = m_categories.find(cStr);
if(it == m_categories.end())
{
auto ptr = std::make_shared< Category >(cStr, m_defaultEnabled);
it = m_categories.emplace(cStr, ptr).first;
return it->second.get();
}
return nullptr;
}
const Category *
Registry::get(const char *category)
{
const Category *cPtr = findCategory(category);
if(cPtr)
{
return cPtr;
}
absl::WriterMutexLock l(&m_mutex);
const char *cStr = m_stringmem.emplace(category).first->c_str();
auto it = m_categories.find(cStr);
if(it == m_categories.end())
{
auto ptr = std::make_shared< Category >(cStr, m_defaultEnabled);
it = m_categories.emplace(cStr, ptr).first;
}
return it->second.get();
}
void
Registry::enable(const Category *category, bool value)
{
absl::WriterMutexLock l(&m_mutex);
const_cast< Category * >(category)->enabled(value);
}
void
Registry::enableAll(bool value)
{
absl::WriterMutexLock l(&m_mutex);
if(value == m_defaultEnabled)
{
return;
}
m_defaultEnabled = value;
std::for_each(m_categories.begin(), m_categories.end(),
[&](auto &x) { x.second->enabled(value); });
}
void
Registry::registerContainer(const Category *category,
CategoryContainer &container)
{
absl::WriterMutexLock l(&m_mutex);
if(container.m_category == nullptr)
{
const_cast< Category * >(category)->registerContainer(&container);
}
}
void
Registry::publicationType(const Id &id, Publication::Type type)
{
const_cast< Description * >(id.description())->type(type);
}
void
Registry::setFormat(const Id &id, const Format &format)
{
Description *description = const_cast< Description * >(id.description());
absl::WriterMutexLock l(&m_mutex);
auto fmtPtr = std::make_shared< Format >(format);
for(byte_t i = 0; i < Publication::MaxSize; ++i)
{
auto type = static_cast< Publication::Type >(i);
const FormatSpec *spec = format.specFor(type);
if(spec != nullptr)
{
const char *fmt = m_stringmem.emplace(spec->m_format).first->c_str();
fmtPtr->setSpec(type, FormatSpec(spec->m_scale, fmt));
}
}
description->format(fmtPtr);
}
const Category *
Registry::findCategory(const char *category) const
{
absl::ReaderMutexLock l(&m_mutex);
auto it = m_categories.find(category);
return it == m_categories.end() ? nullptr : it->second.get();
}
Id
Registry::findId(const char *category, const char *name) const
{
absl::ReaderMutexLock l(&m_mutex);
auto it = m_metrics.find(std::make_tuple(category, name));
return it == m_metrics.end() ? Id() : Id(it->second.get());
}
std::vector< const Category * >
Registry::getAll() const
{
absl::ReaderMutexLock l(&m_mutex);
std::vector< const Category * > result;
result.reserve(m_categories.size());
std::transform(m_categories.begin(), m_categories.end(),
std::back_inserter(result),
[](const auto &x) { return x.second.get(); });
return result;
}
MetricCollectors &
CollectorRepo::getCollectors(const Id &id)
{
auto it = m_collectors.find(id);
if(it == m_collectors.end())
{
assert(id.valid());
const Category *cat = id.category();
auto ptr = std::make_shared< MetricCollectors >(id);
auto &vec = m_categories[cat];
vec.reserve(vec.size() + 1);
it = m_collectors.emplace(id, ptr).first;
vec.push_back(ptr.get());
}
return *it->second.get();
}
std::vector< Record >
CollectorRepo::collectAndClear(const Category *category)
{
absl::WriterMutexLock l(&m_mutex);
std::vector< Record > result;
auto it = m_categories.find(category);
if(it != m_categories.end())
{
auto &collectors = it->second;
result.reserve(collectors.size());
std::transform(
collectors.begin(), collectors.end(), std::back_inserter(result),
[](MetricCollectors *c) { return c->combineAndClear(); });
}
return result;
}
std::vector< Record >
CollectorRepo::collect(const Category *category)
{
absl::WriterMutexLock l(&m_mutex);
std::vector< Record > result;
auto it = m_categories.find(category);
if(it != m_categories.end())
{
auto &collectors = it->second;
result.reserve(collectors.size());
std::transform(collectors.begin(), collectors.end(),
std::back_inserter(result),
[](MetricCollectors *c) { return c->combine(); });
}
return result;
}
DoubleCollector *
CollectorRepo::defaultDoubleCollector(const Id &id)
{
{
absl::ReaderMutexLock l(&m_mutex);
auto it = m_collectors.find(id);
if(it != m_collectors.end())
{
return it->second->doubleCollectors().defaultCollector();
}
}
{
absl::WriterMutexLock l(&m_mutex);
return getCollectors(id).doubleCollectors().defaultCollector();
}
}
IntCollector *
CollectorRepo::defaultIntCollector(const Id &id)
{
{
absl::ReaderMutexLock l(&m_mutex);
auto it = m_collectors.find(id);
if(it != m_collectors.end())
{
return it->second->intCollectors().defaultCollector();
}
}
{
absl::WriterMutexLock l(&m_mutex);
return getCollectors(id).intCollectors().defaultCollector();
}
}
std::pair< std::vector< std::shared_ptr< DoubleCollector > >,
std::vector< std::shared_ptr< IntCollector > > >
CollectorRepo::allCollectors(const Id &id)
{
absl::ReaderMutexLock l(&m_mutex);
auto it = m_collectors.find(id);
if(it == m_collectors.end())
{
return {};
}
return {it->second->doubleCollectors().collectors(),
it->second->intCollectors().collectors()};
}
struct PublisherHelper
{
using SampleCache = std::map< std::shared_ptr< Publisher >, Sample >;
static void
updateSampleCache(SampleCache &cache,
const std::shared_ptr< Publisher > &publisher,
const SampleGroup &sampleGroup,
const absl::Time &timeStamp)
{
SampleCache::iterator it = cache.find(publisher);
if(it == cache.end())
{
Sample newSample;
newSample.sampleTime(timeStamp);
it = cache.emplace(publisher, newSample).first;
}
it->second.pushGroup(sampleGroup);
}
static std::pair< std::vector< Record >, absl::Duration >
collect(Manager &manager, const Category *category,
const absl::Duration &now, bool clear)
EXCLUSIVE_LOCKS_REQUIRED(manager.m_mutex)
{
using Callback = Manager::RecordCallback;
using CallbackVector = std::vector< const Callback * >;
using RegistryIterator = CallbackRegistry::iterator;
CallbackVector callbacks;
RegistryIterator begin = manager.m_callbacks.lowerBound(category);
RegistryIterator end = manager.m_callbacks.upperBound(category);
std::vector< Record > result;
std::for_each(begin, end, [&](const auto &x) {
std::vector< Record > tmp = (x.second)(clear);
result.insert(result.end(), tmp.begin(), tmp.end());
});
// Collect records from the repo.
if(clear)
{
std::vector< Record > tmp = manager.m_repo.collectAndClear(category);
result.insert(result.end(), tmp.begin(), tmp.end());
}
else
{
std::vector< Record > tmp = manager.m_repo.collect(category);
result.insert(result.end(), tmp.begin(), tmp.end());
}
// Get the time since last reset, and clear if needed.
Manager::ResetTimes::iterator it = manager.m_resetTimes.find(category);
if(it == manager.m_resetTimes.end())
{
if(clear)
{
manager.m_resetTimes.emplace(category, now);
}
return {result, now - manager.m_createTime};
}
else
{
auto tmp = now - it->second;
if(clear)
{
it->second = now;
}
return {result, tmp};
}
}
template < typename CategoryIterator >
static void
publish(Manager &manager, const CategoryIterator &categoriesBegin,
const CategoryIterator &categoriesEnd, bool clear)
{
if(categoriesBegin == categoriesEnd)
{
return;
}
using RecordBuffer =
std::vector< std::shared_ptr< std::vector< Record > > >;
RecordBuffer recordBuffer;
SampleCache sampleCache;
absl::Time timeStamp = absl::Now();
absl::Duration now = absl::Now() - absl::UnixEpoch();
{
// 1.
absl::WriterMutexLock publishGuard(&manager.m_publishLock);
// 2.
absl::WriterMutexLock propertiesGuard(&manager.m_mutex);
// Build the 'sampleCache' by iterating over the categories and
// collecting records for those categories.
for(CategoryIterator catIt = categoriesBegin; catIt != categoriesEnd;
++catIt)
{
if(!(*catIt)->enabled())
{
continue;
}
// Collect the metrics.
auto result = collect(manager, *catIt, now, clear);
// If their are no collected records then this category can be
// ignored.
if(result.first.empty())
{
continue;
}
if(result.second == absl::Duration())
{
std::cerr << "Invalid elapsed time interval of 0 for "
"published metrics.";
result.second += absl::Nanoseconds(1);
}
// Append the collected records to the buffer of records.
auto records =
std::make_shared< std::vector< Record > >(result.first);
recordBuffer.push_back(records);
SampleGroup sampleGroup(absl::Span< Record >(*records),
result.second);
std::for_each(
manager.m_publishers.globalBegin(),
manager.m_publishers.globalEnd(), [&](const auto &ptr) {
updateSampleCache(sampleCache, ptr, sampleGroup, timeStamp);
});
std::for_each(manager.m_publishers.lowerBound(*catIt),
manager.m_publishers.upperBound(*catIt),
[&](const auto &val) {
updateSampleCache(sampleCache, val.second,
sampleGroup, timeStamp);
});
}
}
for(auto &entry : sampleCache)
{
Publisher *publisher = entry.first.get();
Sample &sample = entry.second;
publisher->publish(sample);
}
}
};
Sample
Manager::collectSample(std::vector< Record > &records,
absl::Span< const Category * > categories,
bool clear)
{
absl::Time timeStamp = absl::Now();
absl::Duration now = timeStamp - absl::UnixEpoch();
Sample sample;
sample.sampleTime(timeStamp);
// Use a tuple to hold 'references' to the collected records
using SampleDescription = std::tuple< size_t, size_t, absl::Duration >;
std::vector< SampleDescription > samples;
samples.reserve(categories.size());
// 1
absl::WriterMutexLock publishGuard(&m_publishLock);
// 2
absl::WriterMutexLock propertiesGuard(&m_mutex);
for(const Category *const category : categories)
{
if(!category->enabled())
{
continue;
}
size_t beginIndex = records.size();
// Collect the metrics.
std::vector< Record > catRecords;
absl::Duration elapsedTime;
std::tie(catRecords, elapsedTime) =
PublisherHelper::collect(*this, category, now, clear);
records.insert(records.end(), catRecords.begin(), catRecords.end());
size_t size = records.size() - beginIndex;
// If there are no collected records then this category can be ignored.
if(size != 0)
{
samples.emplace_back(beginIndex, size, elapsedTime);
}
}
// Now that we have all the records, we can build our sample
for(const SampleDescription &s : samples)
{
sample.pushGroup(&records[std::get< 0 >(s)], std::get< 1 >(s),
std::get< 2 >(s));
}
return sample;
}
void
Manager::publish(absl::Span< const Category * > categories, bool clear)
{
PublisherHelper::publish(*this, categories.begin(), categories.end(),
clear);
}
void
Manager::publish(const std::set< const Category * > &categories, bool clear)
{
PublisherHelper::publish(*this, categories.begin(), categories.end(),
clear);
}
Manager *DefaultManager::m_manager = nullptr;
struct PublisherSchedulerData
{
util::Mutex m_mutex;
thread::Scheduler::Handle m_handle;
std::set< const Category * > m_categories;
bool m_default;
std::set< const Category * > m_nonDefaultCategories;
PublisherSchedulerData()
: m_handle(thread::Scheduler::INVALID_HANDLE), m_default(false)
{
}
};
// Reverts a publisher scheduler back to its default state
class PublisherSchedulerGuard
{
PublisherScheduler *m_scheduler;
public:
PublisherSchedulerGuard(PublisherScheduler *scheduler)
: m_scheduler(scheduler)
{
}
~PublisherSchedulerGuard()
{
if(m_scheduler != nullptr)
{
for(auto &repeat : m_scheduler->m_repeaters)
{
if(repeat.second->m_handle != thread::Scheduler::INVALID_HANDLE)
{
m_scheduler->m_scheduler.cancelRepeat(repeat.second->m_handle);
}
}
m_scheduler->m_defaultInterval = absl::Duration();
m_scheduler->m_repeaters.clear();
m_scheduler->m_categories.clear();
}
}
void
release()
{
m_scheduler = nullptr;
}
};
void
PublisherScheduler::publish(
const std::shared_ptr< PublisherSchedulerData > &data) const
{
util::Lock l(&data->m_mutex);
if(data->m_default)
{
m_manager->publishAllExcluding(data->m_nonDefaultCategories);
}
else if(!data->m_categories.empty())
{
m_manager->publish(data->m_categories);
}
}
void
PublisherScheduler::cancel(Categories::iterator it)
{
assert(it != m_categories.end());
auto repeatIt = m_repeaters.find(it->second);
assert(repeatIt != m_repeaters.end());
const Category *category = it->first;
m_categories.erase(it);
auto data = repeatIt->second;
{
util::Lock l(&data->m_mutex);
assert(data->m_categories.find(category) != data->m_categories.end());
data->m_categories.erase(category);
}
if(!data->m_default)
{
if(data->m_categories.empty())
{
m_scheduler.cancelRepeat(data->m_handle);
m_repeaters.erase(repeatIt);
}
if(m_defaultInterval != absl::Duration())
{
auto defaultIntervalIt = m_repeaters.find(m_defaultInterval);
assert(defaultIntervalIt != m_repeaters.end());
auto &defaultRepeater = defaultIntervalIt->second;
util::Lock l(&defaultRepeater->m_mutex);
defaultRepeater->m_nonDefaultCategories.erase(category);
}
}
}
bool
PublisherScheduler::cancelDefault()
{
if(m_defaultInterval == absl::Duration())
{
return false;
}
absl::Duration interval = m_defaultInterval;
m_defaultInterval = absl::Duration();
auto repeatIt = m_repeaters.find(interval);
assert(repeatIt != m_repeaters.end());
auto data = repeatIt->second;
if(data->m_categories.empty())
{
assert(data->m_handle != thread::Scheduler::INVALID_HANDLE);
m_scheduler.cancelRepeat(data->m_handle);
m_repeaters.erase(repeatIt);
}
else
{
util::Lock l(&data->m_mutex);
data->m_default = false;
data->m_nonDefaultCategories.clear();
}
return true;
}
void
PublisherScheduler::schedule(const Category *category,
absl::Duration interval)
{
assert(absl::Seconds(0) < interval);
util::Lock l(&m_mutex);
auto catIt = m_categories.find(category);
if(catIt != m_categories.end())
{
if(catIt->second == interval)
{
return;
}
cancel(catIt);
}
// Make a guard, so if something throws, the scheduler is reset to a
// somewhat "sane" state (no metrics).
PublisherSchedulerGuard guard(this);
m_categories.emplace(category, interval);
auto repeatIt = m_repeaters.find(interval);
std::shared_ptr< PublisherSchedulerData > data;
// Create a new 'ClockData' object if one does not exist for the
// 'interval', otherwise update the existing 'data'.
if(repeatIt == m_repeaters.end())
{
data = std::make_shared< PublisherSchedulerData >();
data->m_categories.insert(category);
m_repeaters.emplace(interval, data);
util::Lock lock(&data->m_mutex);
data->m_handle = m_scheduler.scheduleRepeat(
interval, std::bind(&PublisherScheduler::publish, this, data));
}
else
{
data = repeatIt->second;
util::Lock lock(&data->m_mutex);
data->m_categories.insert(category);
}
// If this isn't being added to the default schedule, then add to the set
// of non-default categories in the default schedule.
if(!data->m_default && m_defaultInterval != absl::Duration())
{
auto defaultIntervalIt = m_repeaters.find(m_defaultInterval);
assert(defaultIntervalIt != m_repeaters.end());
auto &defaultInterval = defaultIntervalIt->second;
util::Lock lock(&defaultInterval->m_mutex);
defaultInterval->m_nonDefaultCategories.insert(category);
}
guard.release();
}
void
PublisherScheduler::setDefault(absl::Duration interval)
{
assert(absl::Seconds(0) < interval);
util::Lock l(&m_mutex);
// If its already this interval, return early.
if(interval == m_defaultInterval)
{
return;
}
cancelDefault();
m_defaultInterval = interval;
// Make a guard, so if something throws, the scheduler is reset to a
// somewhat "sane" state (no metrics).
PublisherSchedulerGuard guard(this);
std::shared_ptr< PublisherSchedulerData > data;
auto repeatIt = m_repeaters.find(interval);
if(repeatIt == m_repeaters.end())
{
data = std::make_shared< PublisherSchedulerData >();
m_repeaters.emplace(interval, data);
}
else
{
data = repeatIt->second;
}
util::Lock lock(&data->m_mutex);
data->m_default = true;
Categories::iterator cIt = m_categories.begin();
for(; cIt != m_categories.end(); ++cIt)
{
if(cIt->second != interval)
{
data->m_nonDefaultCategories.insert(cIt->first);
}
}
if(data->m_handle == thread::Scheduler::INVALID_HANDLE)
{
data->m_handle = m_scheduler.scheduleRepeat(
interval, std::bind(&PublisherScheduler::publish, this, data));
}
guard.release();
}
bool
PublisherScheduler::cancel(const Category *category)
{
util::Lock l(&m_mutex);
Categories::iterator it = m_categories.find(category);
if(it == m_categories.end())
{
// This category has no specific schedule.
return false;
}
cancel(it);
return true;
}
bool
PublisherScheduler::clearDefault()
{
util::Lock l(&m_mutex);
return cancelDefault();
}
void
PublisherScheduler::cancelAll()
{
util::Lock l(&m_mutex);
for(auto &repeat : m_repeaters)
{
m_scheduler.cancelRepeat(repeat.second->m_handle, true);
}
m_defaultInterval = absl::Duration();
m_repeaters.clear();
m_categories.clear();
}
absl::optional< absl::Duration >
PublisherScheduler::find(const Category *category) const
{
util::Lock l(&m_mutex);
auto it = m_categories.find(category);
if(it == m_categories.end())
{
return {};
}
else
{
return it->second;
}
}
absl::optional< absl::Duration >
PublisherScheduler::getDefault() const
{
util::Lock l(&m_mutex);
if(m_defaultInterval == absl::Duration())
{
return {};
}
else
{
return m_defaultInterval;
}
}
std::vector< std::pair< const Category *, absl::Duration > >
PublisherScheduler::getAll() const
{
util::Lock l(&m_mutex);
std::vector< std::pair< const Category *, absl::Duration > > result;
result.reserve(m_categories.size());
std::copy(m_categories.begin(), m_categories.end(),
std::back_inserter(result));
return result;
}
} // namespace metrics
} // namespace llarp

1335
llarp/util/metrics_core.hpp Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,189 @@
#include <util/metrics_publishers.hpp>
#include <iostream>
namespace llarp
{
namespace metrics
{
namespace
{
void
formatValue(std::ostream &stream, size_t value,
const FormatSpec *formatSpec)
{
if(formatSpec)
{
FormatSpec::format(stream, (double)value, *formatSpec);
}
else
{
stream << value;
}
}
void
formatValue(std::ostream &stream, int value, const FormatSpec *formatSpec)
{
if(formatSpec)
{
FormatSpec::format(stream, (double)value, *formatSpec);
}
else
{
stream << value;
}
}
void
formatValue(std::ostream &stream, double value,
const FormatSpec *formatSpec)
{
if(formatSpec)
{
FormatSpec::format(stream, value, *formatSpec);
}
else
{
stream << value;
}
}
void
formatValue(std::ostream &stream, const Record &record,
double elapsedTime, Publication::Type publicationType,
const FormatSpec *formatSpec)
{
switch(publicationType)
{
case Publication::Type::Unspecified:
{
assert(false && "Invalid publication type");
}
break;
case Publication::Type::Total:
{
formatValue(stream, record.total(), formatSpec);
}
break;
case Publication::Type::Count:
{
formatValue(stream, record.count(), formatSpec);
}
break;
case Publication::Type::Min:
{
formatValue(stream, record.min(), formatSpec);
}
break;
case Publication::Type::Max:
{
formatValue(stream, record.max(), formatSpec);
}
break;
case Publication::Type::Avg:
{
formatValue(stream, record.total() / record.count(), formatSpec);
}
break;
case Publication::Type::Rate:
{
formatValue(stream, record.total() / elapsedTime, formatSpec);
}
break;
case Publication::Type::RateCount:
{
formatValue(stream, record.count() / elapsedTime, formatSpec);
}
break;
}
}
void
publishRecord(std::ostream &stream, const Record &record,
double elapsedTime)
{
auto publicationType = record.id().description()->type();
std::shared_ptr< const Format > format =
record.id().description()->format();
stream << "\t\t" << record.id() << " [ ";
if(publicationType != Publication::Type::Unspecified)
{
stream << Publication::repr(publicationType) << " = ";
const FormatSpec *formatSpec =
format ? format->specFor(publicationType) : nullptr;
formatValue(stream, record, elapsedTime, publicationType, formatSpec);
}
else
{
const FormatSpec *countSpec = nullptr;
const FormatSpec *totalSpec = nullptr;
const FormatSpec *minSpec = nullptr;
const FormatSpec *maxSpec = nullptr;
if(format)
{
countSpec = format->specFor(Publication::Type::Count);
totalSpec = format->specFor(Publication::Type::Total);
minSpec = format->specFor(Publication::Type::Min);
maxSpec = format->specFor(Publication::Type::Max);
}
stream << "count = ";
formatValue(stream, record.count(), countSpec);
stream << ", total = ";
formatValue(stream, record.total(), totalSpec);
if(Record::DEFAULT_MIN == record.min())
{
stream << ", min = undefined";
}
else
{
stream << ", min = ";
formatValue(stream, record.min(), minSpec);
}
if(Record::DEFAULT_MAX == record.max())
{
stream << ", max = undefined";
}
else
{
stream << ", max = ";
formatValue(stream, record.max(), maxSpec);
}
}
stream << " ]\n";
}
} // namespace
void
StreamPublisher::publish(const Sample &values)
{
if(values.recordCount() > 0)
{
m_stream << values.sampleTime() << " " << values.recordCount()
<< " Records\n";
auto gIt = values.begin();
auto prev = values.begin();
for(; gIt != values.end(); ++gIt)
{
const double elapsedTime = absl::ToDoubleSeconds(gIt->samplePeriod());
if(gIt == prev || gIt->samplePeriod() != prev->samplePeriod())
{
m_stream << "\tElapsed Time: " << elapsedTime << "s\n";
}
for(const auto &record : *gIt)
{
publishRecord(m_stream, record, elapsedTime);
}
prev = gIt;
}
}
}
} // namespace metrics
} // namespace llarp

View File

@ -0,0 +1,32 @@
#ifndef LLARP_METRICS_PUBLISHERS_HPP
#define LLARP_METRICS_PUBLISHERS_HPP
#include <util/metrics_core.hpp>
#include <iosfwd>
namespace llarp
{
namespace metrics
{
class StreamPublisher final : public Publisher
{
std::ostream& m_stream;
public:
StreamPublisher(std::ostream& stream) : m_stream(stream)
{
}
~StreamPublisher()
{
}
void
publish(const Sample& values) override;
};
} // namespace metrics
} // namespace llarp
#endif

View File

@ -0,0 +1,169 @@
#include <util/metrics_types.hpp>
#include <util/printer.hpp>
namespace llarp
{
namespace metrics
{
std::ostream &
FormatSpec::format(std::ostream &stream, double data,
const FormatSpec &format)
{
static constexpr size_t INIT_SIZE = 32;
char buf[INIT_SIZE] = {0};
int rc = snprintf(buf, INIT_SIZE, format.m_format, data * format.m_scale);
if(rc < 0)
{
stream << "Bad format " << format.m_format << " applied to " << data;
return stream;
}
if(static_cast< size_t >(rc) < INIT_SIZE)
{
stream << buf;
return stream;
}
std::vector< char > vec(rc + 1);
rc = snprintf(vec.data(), vec.size(), format.m_format,
data * format.m_scale);
if(static_cast< size_t >(rc) > vec.size())
{
stream << "Bad format " << format.m_format << " applied to " << data;
return stream;
}
else
{
stream << vec.data();
return stream;
}
}
string_view
Publication::repr(Type val)
{
switch(val)
{
case Type::Unspecified:
return "Unspecified";
case Type::Total:
return "Total";
case Type::Count:
return "Count";
case Type::Min:
return "Min";
case Type::Max:
return "Max";
case Type::Avg:
return "Avg";
case Type::Rate:
return "Rate";
case Type::RateCount:
return "RateCount";
}
}
std::ostream &
Publication::print(std::ostream &stream, Type val)
{
stream << repr(val);
return stream;
}
Category::~Category()
{
while(m_container)
{
auto next = m_container->m_nextCategory;
m_container->clear();
m_container = next;
}
}
void
Category::enabled(bool val)
{
// sync point
if(m_enabled != val)
{
auto cont = m_container;
while(cont)
{
cont->m_enabled = val;
cont = cont->m_nextCategory;
}
m_enabled = val;
}
}
void
Category::registerContainer(CategoryContainer *container)
{
container->m_enabled = m_enabled;
container->m_category = this;
container->m_nextCategory = m_container;
m_container = container;
}
std::ostream &
Category::print(std::ostream &stream, int level, int spaces) const
{
Printer printer(stream, level, spaces);
printer.printAttribute("name", m_name);
printer.printAttribute("enabled",
m_enabled.load(std::memory_order_relaxed));
return stream;
}
std::ostream &
Description::print(std::ostream &stream) const
{
util::Lock l(&m_mutex);
stream << m_category->name() << '.' << m_name;
return stream;
}
const double Record::DEFAULT_MIN = std::numeric_limits< double >::max() * 2;
const double Record::DEFAULT_MAX =
std::numeric_limits< double >::max() * -2;
std::ostream &
Record::print(std::ostream &stream, int level, int spaces) const
{
Printer printer(stream, level, spaces);
printer.printAttribute("id", m_id);
printer.printAttribute("count", m_count);
printer.printAttribute("total", m_total);
printer.printAttribute("min", m_min);
printer.printAttribute("max", m_max);
return stream;
}
std::ostream &
SampleGroup::print(std::ostream &stream, int level, int spaces) const
{
Printer::PrintFunction< absl::Duration > durationPrinter =
[](std::ostream &stream, const absl::Duration &duration, int,
int) -> std::ostream & {
stream << duration;
return stream;
};
Printer printer(stream, level, spaces);
printer.printAttribute("records", m_records);
printer.printForeignAttribute("samplePeriod", m_samplePeriod,
durationPrinter);
return stream;
}
} // namespace metrics
} // namespace llarp

View File

@ -0,0 +1,536 @@
#ifndef LLARP_METRICS_TYPES_HPP
#define LLARP_METRICS_TYPES_HPP
#include <util/string_view.hpp>
#include <util/threading.hpp>
#include <util/types.hpp>
#include <absl/types/span.h>
#include <absl/types/optional.h>
#include <set>
#include <memory>
#include <cstring>
#include <iosfwd>
namespace llarp
{
namespace metrics
{
struct Publication
{
enum class Type : byte_t
{
Unspecified = 0, // no associated metric type
Total, // sum of seen values in the measurement period
Count, // count of seen events
Min, // Minimum value
Max, // Max value
Avg, // total / count
Rate, // total per second
RateCount // count per second
};
enum
{
MaxSize = static_cast< byte_t >(Type::RateCount) + 1
};
static string_view
repr(Type val);
static std::ostream &
print(std::ostream &stream, Type val);
};
struct FormatSpec
{
float m_scale;
const char *m_format;
static constexpr char DEFAULT_FORMAT[] = "%f";
constexpr FormatSpec() : m_scale(1.0), m_format(DEFAULT_FORMAT)
{
}
constexpr FormatSpec(float scale, const char *format)
: m_scale(scale), m_format(format)
{
}
static std::ostream &
format(std::ostream &stream, double data, const FormatSpec &spec);
};
inline bool
operator==(const FormatSpec &lhs, const FormatSpec &rhs)
{
return lhs.m_scale == rhs.m_scale
&& std::strcmp(lhs.m_format, rhs.m_format) == 0;
}
struct Format
{
using Spec = absl::optional< FormatSpec >;
std::array< Spec, Publication::MaxSize > m_specs;
constexpr Format() : m_specs()
{
}
void
setSpec(Publication::Type pub, const FormatSpec &spec)
{
m_specs[static_cast< size_t >(pub)].emplace(spec);
}
constexpr void
clear()
{
m_specs = decltype(m_specs)();
}
constexpr const FormatSpec *
specFor(Publication::Type val) const
{
const auto &spec = m_specs[static_cast< size_t >(val)];
return spec ? &spec.value() : nullptr;
}
};
inline bool
operator==(const Format &lhs, const Format &rhs)
{
return lhs.m_specs == rhs.m_specs;
}
struct CategoryContainer;
/// Represents a category of grouped metrics
class Category
{
const char *m_name;
std::atomic_bool m_enabled;
CategoryContainer *m_container;
public:
Category(const char *name, bool enabled = true)
: m_name(name), m_enabled(enabled), m_container(nullptr)
{
}
~Category();
void
enabled(bool flag);
void
registerContainer(CategoryContainer *container);
const std::atomic_bool &
enabledRaw() const
{
return m_enabled;
}
const char *
name() const
{
return m_name;
}
bool
enabled() const
{
return m_enabled;
}
std::ostream &
print(std::ostream &stream, int level, int spaces) const;
};
inline std::ostream &
operator<<(std::ostream &stream, const Category &c)
{
return c.print(stream, -1, -1);
}
struct CategoryContainer
{
bool m_enabled;
const Category *m_category;
CategoryContainer *m_nextCategory;
constexpr void
clear()
{
m_enabled = false;
m_category = nullptr;
m_nextCategory = nullptr;
}
};
class Description
{
mutable util::Mutex m_mutex;
const Category *m_category GUARDED_BY(m_mutex);
const char *m_name GUARDED_BY(m_mutex);
Publication::Type m_type GUARDED_BY(m_mutex);
std::shared_ptr< Format > m_format GUARDED_BY(m_mutex);
Description(const Description &) = delete;
Description &
operator=(const Description &) = delete;
public:
Description(const Category *category, const char *name)
: m_category(category)
, m_name(name)
, m_type(Publication::Type::Unspecified)
{
}
void
category(const Category *c)
{
util::Lock l(&m_mutex);
m_category = c;
}
const Category *
category() const
{
util::Lock l(&m_mutex);
return m_category;
}
void
name(const char *n)
{
util::Lock l(&m_mutex);
m_name = n;
}
const char *
name() const
{
util::Lock l(&m_mutex);
return m_name;
}
void
type(Publication::Type t)
{
util::Lock l(&m_mutex);
m_type = t;
}
Publication::Type
type() const
{
util::Lock l(&m_mutex);
return m_type;
}
void
format(const std::shared_ptr< Format > &f)
{
util::Lock l(&m_mutex);
m_format = f;
}
std::shared_ptr< Format >
format() const
{
util::Lock l(&m_mutex);
return m_format;
}
std::ostream &
print(std::ostream &stream) const;
};
inline std::ostream &
operator<<(std::ostream &stream, const Description &d)
{
return d.print(stream);
}
/// A metric id is what we will actually deal with in terms of metrics, in
/// order to make things like static initialisation cleaner.
class Id
{
const Description *m_description;
public:
constexpr Id() : m_description(nullptr)
{
}
constexpr Id(const Description *description) : m_description(description)
{
}
constexpr const Description *&
description()
{
return m_description;
}
constexpr const Description *const &
description() const
{
return m_description;
}
bool
valid() const noexcept
{
return m_description != nullptr;
}
explicit operator bool() const noexcept
{
return valid();
}
const Category *
category() const
{
assert(valid());
return m_description->category();
}
const char *
categoryName() const
{
assert(valid());
return m_description->category()->name();
}
const char *
metricName() const
{
assert(valid());
return m_description->name();
}
std::ostream &
print(std::ostream &stream, int, int) const
{
if(m_description)
{
stream << *m_description;
}
else
{
stream << "INVALID_METRIC";
}
return stream;
}
};
inline bool
operator==(const Id &lhs, const Id &rhs)
{
return lhs.description() == rhs.description();
}
inline bool
operator<(const Id &lhs, const Id &rhs)
{
return lhs.description() < rhs.description();
}
inline std::ostream &
operator<<(std::ostream &stream, const Id &id)
{
return id.print(stream, -1, -1);
}
class Record
{
Id m_id;
size_t m_count;
double m_total;
double m_min;
double m_max;
public:
static const double DEFAULT_MIN;
static const double DEFAULT_MAX;
Record()
: m_id()
, m_count(0)
, m_total(0.0)
, m_min(DEFAULT_MIN)
, m_max(DEFAULT_MAX)
{
}
explicit Record(const Id &id)
: m_id(id)
, m_count(0)
, m_total(0.0)
, m_min(DEFAULT_MIN)
, m_max(DEFAULT_MAX)
{
}
Record(const Id &id, size_t count, double total, double min, double max)
: m_id(id), m_count(count), m_total(total), m_min(min), m_max(max)
{
}
// clang-format off
const Id& id() const { return m_id; }
Id& id() { return m_id; }
size_t count() const { return m_count; }
size_t& count() { return m_count; }
double total() const { return m_total; }
double& total() { return m_total; }
double min() const { return m_min; }
double& min() { return m_min; }
double max() const { return m_max; }
double& max() { return m_max; }
// clang-format on
std::ostream &
print(std::ostream &stream, int level, int spaces) const;
};
inline std::ostream &
operator<<(std::ostream &stream, const Record &rec)
{
return rec.print(stream, -1, -1);
}
inline bool
operator==(const Record &lhs, const Record &rhs)
{
return (lhs.id() == rhs.id() && lhs.count() == rhs.count()
&& lhs.total() == rhs.total() && lhs.min() == rhs.min()
&& lhs.max() == rhs.max());
}
class SampleGroup
{
absl::Span< const Record > m_records;
absl::Duration m_samplePeriod;
public:
using const_iterator = absl::Span< const Record >::const_iterator;
SampleGroup() : m_records(), m_samplePeriod()
{
}
SampleGroup(const Record *records, size_t size,
absl::Duration samplePeriod)
: m_records(records, size), m_samplePeriod(samplePeriod)
{
}
SampleGroup(const absl::Span< const Record > &records,
absl::Duration samplePeriod)
: m_records(records), m_samplePeriod(samplePeriod)
{
}
// clang-format off
void samplePeriod(absl::Duration duration) { m_samplePeriod = duration; }
absl::Duration samplePeriod() const { return m_samplePeriod; }
void records(absl::Span<const Record> recs) { m_records = recs; }
absl::Span<const Record> records() const { return m_records; }
bool empty() const { return m_records.empty(); }
size_t size() const { return m_records.size(); }
const_iterator begin() const { return m_records.begin(); }
const_iterator end() const { return m_records.end(); }
// clang-format on
std::ostream &
print(std::ostream &stream, int level, int spaces) const;
};
inline std::ostream &
operator<<(std::ostream &stream, const SampleGroup &group)
{
return group.print(stream, -1, -1);
}
inline bool
operator==(const SampleGroup &lhs, const SampleGroup &rhs)
{
return lhs.records() == rhs.records()
&& lhs.samplePeriod() == rhs.samplePeriod();
}
class Sample
{
absl::Time m_sampleTime;
std::vector< SampleGroup > m_samples;
size_t m_recordCount;
public:
using const_iterator = std::vector< SampleGroup >::const_iterator;
Sample() : m_sampleTime(), m_recordCount(0)
{
}
// clang-format off
void sampleTime(const absl::Time& time) { m_sampleTime = time; }
absl::Time sampleTime() const { return m_sampleTime; }
void pushGroup(const SampleGroup& group) {
if (!group.empty()) {
m_samples.push_back(group);
m_recordCount += group.size();
}
}
void pushGroup(const Record *records, size_t size, absl::Duration duration) {
if (size != 0) {
m_samples.emplace_back(records, size, duration);
m_recordCount += size;
}
}
void pushGroup(const absl::Span< const Record > &records,absl::Duration duration) {
if (!records.empty()) {
m_samples.emplace_back(records, duration);
m_recordCount += records.size();
}
}
void clear() {
m_samples.clear();
m_recordCount = 0;
}
const SampleGroup& group(size_t index) {
assert(index < m_samples.size());
return m_samples[index];
}
const_iterator begin() const { return m_samples.begin(); }
const_iterator end() const { return m_samples.end(); }
size_t groupCount() const { return m_samples.size(); }
size_t recordCount() const { return m_recordCount; }
// clang-format on
};
} // namespace metrics
} // namespace llarp
#endif

1
llarp/util/object.cpp Normal file
View File

@ -0,0 +1 @@
#include <util/object.hpp>

413
llarp/util/object.hpp Normal file
View File

@ -0,0 +1,413 @@
#ifndef LLARP_OBJECT_HPP
#define LLARP_OBJECT_HPP
#include <util/threading.hpp>
#include <absl/types/optional.h>
#include <vector>
namespace llarp
{
namespace object
{
/// Provide a buffer, capable of holding a single `Value` object.
/// This is useful for node-based data structures.
/// Note:
/// - This union explicitly does *not* manage the lifetime of the object,
/// explicit calls to the constructor/destructor must be made.
template < typename Value >
union Buffer {
private:
char m_buffer[sizeof(Value)];
char m_align[alignof(Value)];
public:
Value*
address()
{
return reinterpret_cast< Value* >(static_cast< void* >(m_buffer));
}
const Value*
address() const
{
return reinterpret_cast< Value* >(static_cast< void* >(m_buffer));
}
char*
buffer()
{
return m_buffer;
}
const char*
buffer() const
{
return m_buffer;
}
Value&
value()
{
return *reinterpret_cast< Value* >(this);
}
const Value&
value() const
{
return *reinterpret_cast< const Value* >(this);
}
};
template < typename Value >
class Catalog;
template < typename Value >
class CatalogIterator;
template < typename Value >
class CatalogCleaner
{
Catalog< Value >* m_catalog;
typename Catalog< Value >::Node* m_node;
bool m_shouldDelete;
CatalogCleaner(const CatalogCleaner&) = delete;
CatalogCleaner&
operator=(const CatalogCleaner&) = delete;
public:
explicit CatalogCleaner(Catalog< Value >* catalog)
: m_catalog(catalog), m_node(nullptr), m_shouldDelete(false)
{
}
~CatalogCleaner();
void
manageNode(typename Catalog< Value >::Node* node, bool shouldDelete)
{
m_node = node;
m_shouldDelete = shouldDelete;
}
void
releaseNode()
{
m_node = nullptr;
}
void
release()
{
releaseNode();
m_catalog = nullptr;
}
};
/// A pooling catalog of objects, referred to by a 32-bit handle
template < typename Value >
class Catalog
{
static constexpr int32_t INDEX_MASK = 0X007FFFFF;
static constexpr int32_t BUSY_INDICATOR = 0x00800000;
static constexpr int32_t GENERATION_INC = 0x01000000;
static constexpr int32_t GENERATION_MASK = 0XFF000000;
struct Node
{
typedef union {
Buffer< Value > m_buffer;
Node* m_next;
} Payload;
Payload m_payload;
int32_t m_handle;
};
std::vector< Node* > m_nodes;
Node* m_next;
std::atomic_size_t m_size;
mutable util::Mutex m_mutex;
friend class CatalogCleaner< Value >;
friend class CatalogIterator< Value >;
static Value*
getValue(Node* node)
{
return node->m_payload.m_buffer.address();
}
void
freeNode(Node* node)
{
node->m_handle += GENERATION_INC;
node->m_handle &= ~BUSY_INDICATOR;
node->m_payload.m_next = m_next;
m_next = node;
}
Node*
findNode(int32_t handle) const
{
int32_t index = handle & INDEX_MASK;
if(0 > index || index >= (int32_t)m_nodes.size()
|| !(handle & BUSY_INDICATOR))
{
return nullptr;
}
Node* node = m_nodes[index];
return (node->m_handle == handle) ? node : nullptr;
}
public:
Catalog() : m_next(nullptr), m_size(0)
{
}
~Catalog()
{
removeAll();
}
int32_t
add(const Value& value)
{
int32_t handle;
absl::WriterMutexLock l(&m_mutex);
CatalogCleaner< Value > guard(this);
Node* node;
if(m_next)
{
node = m_next;
m_next = node->m_payload.m_next;
guard.manageNode(node, false);
}
else
{
assert(m_nodes.size() < BUSY_INDICATOR);
node = static_cast< Node* >(operator new(sizeof(Node)));
guard.manageNode(node, true);
m_nodes.push_back(node);
node->m_handle = static_cast< int32_t >(m_nodes.size()) - 1;
guard.manageNode(node, false);
}
node->m_handle |= BUSY_INDICATOR;
handle = node->m_handle;
// construct into the node.
::new(getValue(node)) Value(value);
guard.release();
++m_size;
return handle;
}
bool
remove(int32_t handle, Value* value = nullptr)
{
absl::WriterMutexLock l(&m_mutex);
Node* node = findNode(handle);
if(!node)
{
return false;
}
Value* val = getValue(node);
if(value)
{
*value = *val;
}
val->~Value();
freeNode(node);
--m_size;
return true;
}
void
removeAll(std::vector< Value >* output = nullptr)
{
absl::WriterMutexLock l(&m_mutex);
for(Node* node : m_nodes)
{
if(node->m_handle & BUSY_INDICATOR)
{
Value* value = getValue(node);
if(output)
{
output->push_back(*value);
}
value->~Value();
}
delete node;
}
m_nodes.clear();
m_next = nullptr;
m_size = 0;
}
bool
replace(const Value& newValue, int32_t handle)
{
absl::WriterMutexLock l(&m_mutex);
Node* node = findNode(handle);
if(!node)
{
return false;
}
Value* value = getValue(node);
value->~Value();
// construct into the node.
::new(value) Value(newValue);
return true;
}
absl::optional< Value >
find(int32_t handle)
{
util::Lock l(&m_mutex);
Node* node = findNode(handle);
if(!node)
{
return {};
}
return *getValue(node);
}
size_t
size() const
{
return m_size;
}
/// introduced for testing only. verify the current state of the catalog.
bool
verify() const;
};
template < typename Value >
class CatalogIterator
{
const Catalog< Value >* m_catalog;
size_t m_index;
public:
CatalogIterator(const Catalog< Value >& catalog)
: m_catalog(&catalog), m_index(-1)
{
m_catalog->m_mutex.ReaderLock();
operator++();
}
~CatalogIterator()
{
m_catalog->m_mutex.ReaderUnlock();
}
void
operator++()
{
m_index++;
while(m_index < m_catalog->m_nodes.size()
&& !(m_catalog->m_nodes[m_index]->m_handle
& Catalog< Value >::BUSY_INDICATOR))
{
++m_index;
}
}
explicit operator bool() const
{
return m_index < m_catalog->m_nodes.size();
}
std::pair< int32_t, Value >
operator()() const
{
auto* node = m_catalog->m_nodes[m_index];
return {node->m_handle, *Catalog< Value >::getValue(node)};
}
};
template < typename Value >
CatalogCleaner< Value >::~CatalogCleaner()
{
if(m_catalog && m_node)
{
if(m_shouldDelete)
{
// We call the destructor elsewhere.
operator delete(m_node);
}
else
{
m_catalog->freeNode(m_node);
}
}
}
template < typename Value >
bool
Catalog< Value >::verify() const
{
absl::WriterMutexLock l(&m_mutex);
if(m_nodes.size() < m_size)
{
return false;
}
size_t busyCount = 0;
for(size_t i = 0; i < m_nodes.size(); i++)
{
if((m_nodes[i]->m_handle & INDEX_MASK) != i)
{
return false;
}
if(m_nodes[i]->m_handle & BUSY_INDICATOR)
{
busyCount++;
}
}
if(m_size != busyCount)
{
return false;
}
size_t freeCount = 0;
for(Node* p = m_next; p != nullptr; p = p->m_payload.m_next)
{
freeCount++;
}
if(freeCount + busyCount != m_nodes.size())
{
return false;
}
return true;
}
} // namespace object
} // namespace llarp
#endif

424
llarp/util/scheduler.cpp Normal file
View File

@ -0,0 +1,424 @@
#include <util/scheduler.hpp>
namespace llarp
{
namespace thread
{
const Scheduler::Handle Scheduler::INVALID_HANDLE = -1;
void
Scheduler::dispatch()
{
using PendingRepeatItem = TimerQueueItem< RepeatDataPtr >;
std::vector< PendingRepeatItem > pendingRepeats;
while(true)
{
{
util::Lock l(&m_mutex);
if(!m_running.load(std::memory_order_relaxed))
{
return;
}
m_iterationCount++;
size_t newRepeatSize = 0, newEventSize = 0;
absl::Time now = m_clock();
static constexpr size_t MAX_PENDING_REPEAT = 64;
static constexpr size_t MAX_PENDING_EVENTS = 64;
absl::Time minRepeat, minEvent;
m_repeatQueue.popLess(now, MAX_PENDING_REPEAT, &pendingRepeats,
&newRepeatSize, &minRepeat);
m_eventQueue.popLess(now, MAX_PENDING_EVENTS, &m_events,
&newEventSize, &minEvent);
// If there are no pending events to process...
if(pendingRepeats.empty() && m_events.empty())
{
// if there are none in the queue *at all* block until woken
if(newRepeatSize == 0 && newEventSize == 0)
{
m_condition.Wait(&m_mutex);
}
else
{
absl::Time minTime;
if(newRepeatSize == 0)
{
minTime = minEvent;
}
else if(newEventSize == 0)
{
minTime = minRepeat;
}
else
{
minTime = std::min(minRepeat, minEvent);
}
m_condition.WaitWithDeadline(&m_mutex, minTime);
}
continue;
}
}
auto repeatIt = pendingRepeats.begin();
m_eventIt = m_events.begin();
while(repeatIt != pendingRepeats.end() && m_eventIt != m_events.end())
{
auto repeatTime = repeatIt->time();
auto eventTime = m_eventIt->time();
if(repeatTime < eventTime)
{
auto data = repeatIt->value();
if(!data->m_isCancelled)
{
m_dispatcher(data->m_callback);
if(!data->m_isCancelled)
{
data->m_handle =
m_repeatQueue.add(repeatTime + data->m_period, data);
}
}
repeatIt++;
}
else
{
m_eventCount--;
m_dispatcher(m_eventIt->value());
m_eventIt++;
}
}
// We've eaten one of the queues.
while(repeatIt != pendingRepeats.end())
{
auto repeatTime = repeatIt->time();
auto data = repeatIt->value();
if(!data->m_isCancelled)
{
m_dispatcher(data->m_callback);
if(!data->m_isCancelled)
{
data->m_handle =
m_repeatQueue.add(repeatTime + data->m_period, data);
}
}
repeatIt++;
}
while(m_eventIt != m_events.end())
{
m_eventCount--;
m_dispatcher(m_eventIt->value());
m_eventIt++;
}
pendingRepeats.clear();
m_events.clear();
}
}
void
Scheduler::yield()
{
if(m_running.load(std::memory_order_relaxed))
{
if(std::this_thread::get_id() != m_thread.get_id())
{
size_t iterations = m_iterationCount.load(std::memory_order_relaxed);
while(iterations == m_iterationCount.load(std::memory_order_relaxed)
&& m_running.load(std::memory_order_relaxed))
{
m_condition.Signal();
std::this_thread::yield();
}
}
}
}
Scheduler::Scheduler(const EventDispatcher& dispatcher, const Clock& clock)
: m_clock(clock)
, m_dispatcher(dispatcher)
, m_running(false)
, m_iterationCount(0)
, m_eventIt()
, m_repeatCount(0)
, m_eventCount(0)
{
}
Scheduler::~Scheduler()
{
stop();
}
bool
Scheduler::start()
{
util::Lock threadLock(&m_threadMutex);
util::Lock lock(&m_mutex);
if(m_running.load(std::memory_order_relaxed))
{
return true;
}
m_thread = std::thread(&Scheduler::dispatch, this);
m_running = true;
return true;
}
void
Scheduler::stop()
{
util::Lock threadLock(&m_threadMutex);
// Can't join holding the lock. <_<
{
util::Lock lock(&m_mutex);
if(!m_running.load(std::memory_order_relaxed))
{
return;
}
m_running = false;
m_condition.Signal();
}
m_thread.join();
}
Scheduler::Handle
Scheduler::schedule(absl::Time time,
const std::function< void() >& callback,
const EventKey& key)
{
Handle handle;
{
util::Lock lock(&m_mutex);
bool isAtHead = false;
handle = m_eventQueue.add(time, callback, key, &isAtHead);
if(handle == -1)
{
return INVALID_HANDLE;
}
m_eventCount++;
// If we have an event at the top of the queue, wake the dispatcher.
if(isAtHead)
{
m_condition.Signal();
}
}
return handle;
}
bool
Scheduler::reschedule(Handle handle, absl::Time time, bool wait)
{
bool result = false;
{
util::Lock lock(&m_mutex);
bool isAtHead = false;
result = m_eventQueue.update(handle, time, &isAtHead);
if(isAtHead)
{
m_condition.Signal();
}
}
if(result && wait)
{
yield();
}
return result;
}
bool
Scheduler::reschedule(Handle handle, const EventKey& key, absl::Time time,
bool wait)
{
bool result = false;
{
util::Lock lock(&m_mutex);
bool isAtHead = false;
result = m_eventQueue.update(handle, key, time, &isAtHead);
if(isAtHead)
{
m_condition.Signal();
}
}
if(result && wait)
{
yield();
}
return result;
}
bool
Scheduler::cancel(Handle handle, const EventKey& key, bool wait)
{
if(m_eventQueue.remove(handle, key))
{
m_eventCount--;
return true;
}
// Optimise for the dispatcher thread cancelling a pending event.
// On the dispatch thread, so we don't have to lock.
if(std::this_thread::get_id() == m_thread.get_id())
{
for(auto it = m_events.begin() + m_eventCount; it != m_events.end();
++it)
{
if(it->handle() == handle && it->key() == key)
{
m_eventCount--;
m_events.erase(it);
return true;
}
}
// We didn't find it.
return false;
}
if(handle != INVALID_HANDLE && wait)
{
yield();
}
return false;
}
void
Scheduler::cancelAll(bool wait)
{
std::vector< EventItem > events;
m_eventQueue.removeAll(&events);
m_eventCount -= events.size();
if(wait)
{
yield();
}
}
Scheduler::Handle
Scheduler::scheduleRepeat(absl::Duration interval,
const std::function< void() >& callback,
absl::Time startTime)
{
// Assert that we're not giving an empty duration
assert(interval != absl::Duration());
if(startTime == absl::Time())
{
startTime = interval + m_clock();
}
auto repeatData = std::make_shared< RepeatData >(callback, interval);
{
util::Lock l(&m_mutex);
bool isAtHead = false;
repeatData->m_handle =
m_repeatQueue.add(startTime, repeatData, &isAtHead);
if(repeatData->m_handle == -1)
{
return INVALID_HANDLE;
}
m_repeatCount++;
if(isAtHead)
{
m_condition.Signal();
}
}
return m_repeats.add(repeatData);
}
bool
Scheduler::cancelRepeat(Handle handle, bool wait)
{
RepeatDataPtr data;
if(!m_repeats.remove(handle, &data))
{
return false;
}
m_repeatCount--;
if(!m_repeatQueue.remove(data->m_handle))
{
data->m_isCancelled = true;
if(wait)
{
yield();
}
}
return true;
}
void
Scheduler::cancelAllRepeats(bool wait)
{
std::vector< RepeatDataPtr > repeats;
m_repeats.removeAll(&repeats);
m_repeatCount -= m_repeats.size();
for(auto& repeat : repeats)
{
repeat->m_isCancelled = true;
}
// if we fail to remove something, we *may* have a pending repeat event in
// the dispatcher
bool somethingFailed = false;
for(auto& repeat : repeats)
{
if(!m_repeatQueue.remove(repeat->m_handle))
{
somethingFailed = true;
}
}
if(wait && somethingFailed)
{
yield();
}
}
} // namespace thread
} // namespace llarp

230
llarp/util/scheduler.hpp Normal file
View File

@ -0,0 +1,230 @@
#ifndef LLARP_SCHEDULER_HPP
#define LLARP_SCHEDULER_HPP
#include <util/object.hpp>
#include <util/timerqueue.hpp>
#include <absl/time/time.h>
#include <atomic>
#include <functional>
#include <thread>
#include <vector>
namespace llarp
{
namespace thread
{
/// This is a general purpose event scheduler, supporting both one-off and
/// repeated events.
///
/// Notes:
/// - Events should not be started before their begin time
/// - Events may start an arbitrary amount of time after they are scheduled,
/// if there is a previous long running event.
class Scheduler
{
public:
using Callback = std::function< void() >;
using Handle = int;
static const Handle INVALID_HANDLE;
// Define our own clock so we can test easier
using Clock = std::function< absl::Time() >;
private:
/// struct for repeated events
struct RepeatData
{
Callback m_callback;
absl::Duration m_period;
std::atomic_bool m_isCancelled;
Handle m_handle;
RepeatData(const Callback& callback, absl::Duration period)
: m_callback(callback)
, m_period(period)
, m_isCancelled(false)
, m_handle(0)
{
}
};
using RepeatDataPtr = std::shared_ptr< RepeatData >;
using RepeatQueue = TimerQueue< RepeatDataPtr >;
// Just for naming purposes.
using Event = Callback;
using EventQueue = TimerQueue< Event >;
using EventItem = TimerQueueItem< Event >;
public:
// Looks more horrible than it is.
using EventDispatcher = std::function< void(const Callback&) >;
using EventKey = EventQueue::Key;
private:
Clock m_clock;
EventQueue m_eventQueue;
RepeatQueue m_repeatQueue;
object::Catalog< RepeatDataPtr > m_repeats;
util::Mutex m_threadMutex ACQUIRED_BEFORE(m_mutex); // protects running
util::Mutex m_mutex ACQUIRED_AFTER(m_threadMutex); // master mutex
absl::CondVar m_condition;
EventDispatcher m_dispatcher;
std::thread m_thread;
std::atomic_bool m_running;
std::atomic_size_t m_iterationCount;
std::vector< EventItem > m_events;
std::vector< EventItem >::iterator m_eventIt;
std::atomic_size_t m_repeatCount;
std::atomic_size_t m_eventCount;
Scheduler(const Scheduler&) = delete;
Scheduler&
operator=(const Scheduler&) = delete;
friend class DispatcherImpl;
friend class Tardis;
/// Dispatch thread function
void
dispatch();
/// Yield to the dispatch thread
void
yield();
public:
/// Return the epoch from which to create `Durations` from.
static absl::Time
epoch()
{
return absl::UnixEpoch();
}
static EventDispatcher
defaultDispatcher()
{
return [](const Callback& callback) { callback(); };
}
static Clock
defaultClock()
{
return &absl::Now;
}
Scheduler() : Scheduler(defaultDispatcher(), defaultClock())
{
}
explicit Scheduler(const EventDispatcher& dispatcher)
: Scheduler(dispatcher, defaultClock())
{
}
explicit Scheduler(const Clock& clock)
: Scheduler(defaultDispatcher(), clock)
{
}
Scheduler(const EventDispatcher& dispatcher, const Clock& clock);
~Scheduler();
/// Start the scheduler
/// Note that currently this "can't fail" and return `false`. If thread
/// spawning fails, an exception will be thrown.
bool
start();
void
stop();
Handle
schedule(absl::Time time, const Callback& callback,
const EventKey& key = EventKey(nullptr));
bool
reschedule(Handle handle, absl::Time time, bool wait = false);
bool
reschedule(Handle handle, const EventKey& key, absl::Time time,
bool wait = false);
bool
cancel(Handle handle, bool wait = false)
{
return cancel(handle, EventKey(nullptr), wait);
}
bool
cancel(Handle handle, const EventKey& key, bool wait = false);
void
cancelAll(bool wait = false);
Handle
scheduleRepeat(absl::Duration interval, const Callback& callback,
absl::Time startTime = absl::Time());
bool
cancelRepeat(Handle handle, bool wait = false);
void
cancelAllRepeats(bool wait = false);
size_t
repeatCount() const
{
return m_repeatCount;
}
size_t
eventCount() const
{
return m_eventCount;
}
};
class Tardis
{
mutable util::Mutex m_mutex;
absl::Time m_time;
Scheduler& m_scheduler;
public:
Tardis(Scheduler& scheduler) : m_time(absl::Now()), m_scheduler(scheduler)
{
m_scheduler.m_clock = std::bind(&Tardis::now, this);
}
void
advanceTime(absl::Duration duration)
{
{
absl::WriterMutexLock l(&m_mutex);
m_time += duration;
}
{
absl::WriterMutexLock l(&m_scheduler.m_mutex);
m_scheduler.m_condition.Signal();
}
}
absl::Time
now() const
{
absl::ReaderMutexLock l(&m_mutex);
return m_time;
}
};
} // namespace thread
} // namespace llarp
#endif

1
llarp/util/stopwatch.cpp Normal file
View File

@ -0,0 +1 @@
#include <util/stopwatch.hpp>

55
llarp/util/stopwatch.hpp Normal file
View File

@ -0,0 +1,55 @@
#ifndef LLARP_STOPWATCH_HPP
#define LLARP_STOPWATCH_HPP
#include <absl/types/optional.h>
#include <absl/time/clock.h>
namespace llarp
{
namespace util
{
class Stopwatch
{
absl::optional< absl::Time > m_start;
absl::optional< absl::Time > m_stop;
public:
Stopwatch()
{
}
void
start()
{
assert(!m_start);
assert(!m_stop);
m_start.emplace(absl::Now());
}
void
stop()
{
assert(m_start);
assert(!m_stop);
m_stop.emplace(absl::Now());
}
bool
done() const
{
return m_start && m_stop;
}
absl::Duration
time() const
{
assert(m_start);
assert(m_stop);
return m_stop.value() - m_start.value();
}
};
} // namespace util
} // namespace llarp
#endif

View File

@ -0,0 +1 @@
#include <util/timerqueue.hpp>

748
llarp/util/timerqueue.hpp Normal file
View File

@ -0,0 +1,748 @@
#ifndef LLARP_UTIL_TIMERQUEUE_HPP
#define LLARP_UTIL_TIMERQUEUE_HPP
#include <util/object.hpp>
#include <util/threading.hpp>
#include <atomic>
#include <absl/time/time.h>
#include <absl/types/optional.h>
#include <map>
namespace llarp
{
namespace thread
{
template < typename Value >
class TimerQueueItem;
template < typename Value >
class TimerQueue
{
static constexpr int INDEX_BITS_MIN = 8;
static constexpr int INDEX_BITS_MAX = 24;
static constexpr int INDEX_BITS_DEFAULT = 17;
public:
using Handle = int;
static constexpr Handle INVALID_HANDLE = -1;
class Key
{
const void* m_key;
public:
explicit Key(const void* key) : m_key(key)
{
}
explicit Key(int value) : m_key(reinterpret_cast< const void* >(value))
{
}
bool
operator==(const Key& other) const
{
return m_key == other.m_key;
}
bool
operator!=(const Key& other) const
{
return m_key != other.m_key;
}
};
private:
struct Node
{
int m_index;
absl::Time m_time;
Key m_key;
Node* m_prev;
Node* m_next;
object::Buffer< Value > m_value;
Node()
: m_index(0)
, m_time()
, m_key(nullptr)
, m_prev(nullptr)
, m_next(nullptr)
, m_value()
{
}
explicit Node(const absl::Time& time)
: m_index(0)
, m_time(time)
, m_key(nullptr)
, m_prev(nullptr)
, m_next(nullptr)
, m_value()
{
}
};
using NodeMap = std::map< absl::Time, Node* >;
using MapIterator = typename NodeMap::iterator;
const int m_indexMask;
const int m_indexIterationMask;
const int m_indexIterationInc;
mutable util::Mutex m_mutex;
std::vector< Node* > m_nodes GUARDED_BY(m_mutex);
std::atomic< Node* > m_nextNode;
NodeMap m_nodeMap GUARDED_BY(m_mutex);
std::atomic_size_t m_size;
void
freeNode(Node* node)
{
node->m_index =
((node->m_index + m_indexIterationInc) & m_indexIterationMask)
| (node->m_index & m_indexMask);
if(!(node->m_index & m_indexIterationMask))
{
node->m_index += m_indexIterationInc;
}
node->m_prev = nullptr;
}
void
putFreeNode(Node* node)
{
// destroy in place
node->m_value.value().~Value();
Node* nextFreeNode = m_nextNode;
node->m_next = nextFreeNode;
while(!m_nextNode.compare_exchange_strong(nextFreeNode, node))
{
nextFreeNode = m_nextNode;
node->m_next = nextFreeNode;
}
}
void
putFreeNodeList(Node* node)
{
if(node)
{
node->m_value.value().~Value();
Node* end = node;
while(end->m_next)
{
end = end->m_next;
end->m_value.value().~Value();
}
Node* nextFreeNode = m_nextNode;
end->m_next = nextFreeNode;
while(!m_nextNode.compare_exchange_strong(nextFreeNode, node))
{
nextFreeNode = m_nextNode;
end->m_next = nextFreeNode;
}
}
}
TimerQueue(const TimerQueue&) = delete;
TimerQueue&
operator=(const TimerQueue&) = delete;
public:
TimerQueue()
: m_indexMask((1 << INDEX_BITS_DEFAULT) - 1)
, m_indexIterationMask(~m_indexMask)
, m_indexIterationInc(m_indexMask + 1)
, m_nextNode(nullptr)
, m_size(0)
{
}
explicit TimerQueue(int indexBits)
: m_indexMask((1 << indexBits) - 1)
, m_indexIterationMask(~m_indexMask)
, m_indexIterationInc(m_indexMask + 1)
, m_nextNode(nullptr)
, m_size(0)
{
assert(INDEX_BITS_MIN <= indexBits && indexBits <= INDEX_BITS_MAX);
}
~TimerQueue()
{
removeAll();
for(Node* node : m_nodes)
{
delete node;
}
}
/// Add a new `value` to the queue, scheduled for `time`. If not null:
/// - set `isAtHead` to true if the new item is at the front of the
/// queue (eg the item with the lowest `time` value).
/// - set `newSize` to be the length of the new queue.
Handle
add(absl::Time time, const Value& value, bool* isAtHead = nullptr,
size_t* newSize = nullptr)
{
return add(time, value, Key(nullptr), isAtHead, newSize);
}
Handle
add(absl::Time time, const Value& value, const Key& key,
bool* isAtHead = nullptr, size_t* newSize = nullptr);
Handle
add(const TimerQueueItem< Value >& value, bool* isAtHead = nullptr,
size_t* newSize = nullptr);
/// Pop the front of the queue into `item` (if not null).
bool
popFront(TimerQueueItem< Value >* item = nullptr,
size_t* newSize = nullptr, absl::Time* newMinTime = nullptr);
/// Append all records which are less than *or* equal to `time`.
void
popLess(absl::Time time,
std::vector< TimerQueueItem< Value > >* items = nullptr,
size_t* newSize = nullptr, absl::Time* newMinTime = nullptr);
void
popLess(absl::Time time, size_t maxItems,
std::vector< TimerQueueItem< Value > >* items = nullptr,
size_t* newSize = nullptr, absl::Time* newMinTime = nullptr);
bool
remove(Handle handle, TimerQueueItem< Value >* item = nullptr,
size_t* newSize = nullptr, absl::Time* newMinTime = nullptr)
{
return remove(handle, Key(nullptr), item, newSize, newMinTime);
}
bool
remove(Handle handle, const Key& key,
TimerQueueItem< Value >* item = nullptr, size_t* newSize = nullptr,
absl::Time* newMinTime = nullptr);
void
removeAll(std::vector< TimerQueueItem< Value > >* items = nullptr);
/// Update the `time` for the item referred to by the handle
bool
update(Handle handle, absl::Time time, bool* isNewTop = nullptr)
{
return update(handle, Key(nullptr), time, isNewTop);
}
bool
update(Handle handle, const Key& key, absl::Time time,
bool* isNewTop = nullptr);
size_t
size() const
{
return m_size;
}
bool
isValid(Handle handle) const
{
return isValid(handle, Key(nullptr));
}
bool
isValid(Handle handle, const Key& key) const
{
absl::ReaderMutexLock lock(&m_mutex);
int index = (handle & m_indexMask) - 1;
if(0 > index || index >= static_cast< int >(m_nodes.size()))
{
return false;
}
Node* node = m_nodes[index];
if(node->m_index != handle || node->m_key != key)
{
return false;
}
return true;
}
absl::optional< absl::Time >
nextTime() const
{
absl::ReaderMutexLock lock(&m_mutex);
if(m_nodeMap.empty())
{
return {};
}
return m_nodeMap.begin()->first;
}
};
template < typename Value >
class TimerQueueItem
{
public:
using Handle = typename TimerQueue< Value >::Handle;
using Key = typename TimerQueue< Value >::Key;
private:
absl::Time m_time;
Value m_value;
Handle m_handle;
Key m_key;
public:
TimerQueueItem() : m_time(), m_value(), m_handle(0), m_key(nullptr)
{
}
TimerQueueItem(absl::Time time, const Value& value, Handle handle)
: m_time(time), m_value(value), m_handle(handle), m_key(nullptr)
{
}
TimerQueueItem(absl::Time time, const Value& value, Handle handle,
const Key& key)
: m_time(time), m_value(value), m_handle(handle), m_key(key)
{
}
// clang-format off
absl::Time& time() { return m_time; }
absl::Time time() const { return m_time; }
Value& value() { return m_value; }
const Value& value() const { return m_value; }
Handle& handle() { return m_handle; }
Handle handle() const { return m_handle; }
Key& key() { return m_key; }
const Key& key() const { return m_key; }
// clang-format on
};
template < typename Value >
typename TimerQueue< Value >::Handle
TimerQueue< Value >::add(absl::Time time, const Value& value,
const Key& key, bool* isAtHead, size_t* newSize)
{
absl::WriterMutexLock lock(&m_mutex);
Node* node;
if(m_nextNode)
{
// Even though we lock, other threads might be freeing nodes
node = m_nextNode;
Node* next = node->m_next;
while(!m_nextNode.compare_exchange_strong(node, next))
{
node = m_nextNode;
next = node->m_next;
}
}
else
{
// The number of nodes cannot grow to a size larger than the range of
// available indices.
if((int)m_nodes.size() >= m_indexMask - 1)
{
return INVALID_HANDLE;
}
node = new Node;
m_nodes.push_back(node);
node->m_index =
static_cast< int >(m_nodes.size()) | m_indexIterationInc;
}
node->m_time = time;
node->m_key = key;
new(node->m_value.buffer()) Value(value);
{
auto it = m_nodeMap.find(time);
if(m_nodeMap.end() == it)
{
node->m_prev = node;
node->m_next = node;
m_nodeMap[time] = node;
}
else
{
node->m_prev = it->second->m_prev;
it->second->m_prev->m_next = node;
node->m_next = it->second;
it->second->m_prev = node;
}
}
++m_size;
if(isAtHead)
{
*isAtHead = m_nodeMap.begin()->second == node && node->m_prev == node;
}
if(newSize)
{
*newSize = m_size;
}
assert(-1 != node->m_index);
return node->m_index;
}
template < typename Value >
typename TimerQueue< Value >::Handle
TimerQueue< Value >::add(const TimerQueueItem< Value >& value,
bool* isAtHead, size_t* newSize)
{
return add(value.time(), value.value(), value.key(), isAtHead, newSize);
}
template < typename Value >
bool
TimerQueue< Value >::popFront(TimerQueueItem< Value >* item,
size_t* newSize, absl::Time* newMinTime)
{
Node* node = nullptr;
{
absl::WriterMutexLock lock(&m_mutex);
auto it = m_nodeMap.begin();
if(m_nodeMap.end() == it)
{
return false;
}
node = it->second;
if(item)
{
item->time() = node->m_time;
item->value() = node->m_value.value();
item->handle() = node->m_index;
item->key() = node->m_key;
}
if(node->m_next != node)
{
node->m_prev->m_next = node->m_next;
node->m_next->m_prev = node->m_prev;
if(it->second == node)
{
it->second = node->m_next;
}
}
else
{
m_nodeMap.erase(it);
}
freeNode(node);
--m_size;
if(m_size && newMinTime && !m_nodeMap.empty())
{
*newMinTime = m_nodeMap.begin()->first;
}
if(newSize)
{
*newSize = m_size;
}
}
putFreeNode(node);
return true;
}
template < typename Value >
void
TimerQueue< Value >::popLess(absl::Time time,
std::vector< TimerQueueItem< Value > >* items,
size_t* newSize, absl::Time* newMinTime)
{
Node* begin = nullptr;
{
absl::WriterMutexLock lock(&m_mutex);
auto it = m_nodeMap.begin();
while(m_nodeMap.end() != it && it->first <= time)
{
Node* const first = it->second;
Node* const last = first->m_prev;
Node* node = first;
do
{
if(items)
{
items->emplace_back(it->first, node->m_value.value(),
node->m_index, node->m_key);
}
freeNode(node);
node = node->m_next;
--m_size;
} while(node != first);
last->m_next = begin;
begin = first;
auto condemned = it;
++it;
m_nodeMap.erase(condemned);
}
if(newSize)
{
*newSize = m_size;
}
if(m_nodeMap.end() != it && newMinTime)
{
*newMinTime = it->first;
}
}
putFreeNodeList(begin);
}
template < typename Value >
void
TimerQueue< Value >::popLess(absl::Time time, size_t maxItems,
std::vector< TimerQueueItem< Value > >* items,
size_t* newSize, absl::Time* newMinTime)
{
Node* begin = nullptr;
{
absl::WriterMutexLock lock(&m_mutex);
auto it = m_nodeMap.begin();
while(m_nodeMap.end() != it && it->first <= time && 0 < maxItems)
{
Node* const first = it->second;
Node* const last = first->m_prev;
Node* node = first;
Node* prevNode = first->m_prev;
do
{
if(items)
{
items->emplace_back(it->first, node->m_value.value(),
node->m_index, node->m_key);
}
freeNode(node);
prevNode = node;
node = node->m_next;
--m_size;
--maxItems;
} while(0 < maxItems && node != first);
prevNode->m_next = begin;
begin = first;
if(node == first)
{
auto condemned = it;
++it;
m_nodeMap.erase(condemned);
}
else
{
node->m_prev = last;
last->m_next = node;
it->second = node;
break;
}
}
if(newSize)
{
*newSize = m_size;
}
if(m_nodeMap.end() != it && newMinTime)
{
*newMinTime = it->first;
}
}
putFreeNodeList(begin);
}
template < typename Value >
bool
TimerQueue< Value >::remove(Handle handle, const Key& key,
TimerQueueItem< Value >* item, size_t* newSize,
absl::Time* newMinTime)
{
Node* node = nullptr;
{
absl::WriterMutexLock lock(&m_mutex);
int index = (handle & m_indexMask) - 1;
if(index < 0 || index >= (int)m_nodes.size())
{
return false;
}
node = m_nodes[index];
if(node->m_index != (int)handle || node->m_key != key
|| nullptr == node->m_prev)
{
return false;
}
if(item)
{
item->time() = node->m_time;
item->value() = node->m_value.value();
item->handle() = node->m_index;
item->key() = node->m_key;
}
if(node->m_next != node)
{
node->m_prev->m_next = node->m_next;
node->m_next->m_prev = node->m_prev;
auto it = m_nodeMap.find(node->m_time);
if(it->second == node)
{
it->second = node->m_next;
}
}
else
{
m_nodeMap.erase(node->m_time);
}
freeNode(node);
--m_size;
if(newSize)
{
*newSize = m_size;
}
if(m_size && newMinTime)
{
assert(!m_nodeMap.empty());
*newMinTime = m_nodeMap.begin()->first;
}
}
putFreeNode(node);
return true;
}
template < typename Value >
void
TimerQueue< Value >::removeAll(
std::vector< TimerQueueItem< Value > >* items)
{
Node* begin = nullptr;
{
absl::WriterMutexLock lock(&m_mutex);
auto it = m_nodeMap.begin();
while(m_nodeMap.end() != it)
{
Node* const first = it->second;
Node* const last = first->m_prev;
Node* node = first;
do
{
if(items)
{
items->emplace_back(it->first, node->m_value.value(),
node->m_index, node->m_key);
}
freeNode(node);
node = node->m_next;
--m_size;
} while(node != first);
last->m_next = begin;
begin = first;
auto condemned = it;
++it;
m_nodeMap.erase(condemned);
}
}
putFreeNodeList(begin);
}
template < typename Value >
bool
TimerQueue< Value >::update(Handle handle, const Key& key, absl::Time time,
bool* isNewTop)
{
absl::WriterMutexLock lock(&m_mutex);
int index = (handle & m_indexMask) - 1;
if(index < 0 || index >= (int)m_nodes.size())
{
return false;
}
Node* node = m_nodes[index];
if(node->m_index != handle || node->m_key != key)
{
return false;
}
if(node->m_prev != node)
{
node->m_prev->m_next = node->m_next;
node->m_next->m_prev = node->m_prev;
auto it = m_nodeMap.find(node->m_time);
if(it->second == node)
{
it->second = node->m_next;
}
}
else
{
m_nodeMap.erase(node->m_time);
}
node->m_time = time;
auto it = m_nodeMap.find(time);
if(m_nodeMap.end() == it)
{
node->m_prev = node;
node->m_next = node;
m_nodeMap[time] = node;
}
else
{
node->m_prev = it->second->m_prev;
it->second->m_prev->m_next = node;
node->m_next = it->second;
it->second->m_prev = node;
}
if(isNewTop)
{
*isNewTop = m_nodeMap.begin()->second == node && node->m_prev == node;
}
return true;
}
} // namespace thread
} // namespace llarp
#endif

View File

@ -35,10 +35,16 @@ list(APPEND TEST_SRC
util/test_llarp_util_bits.cpp
util/test_llarp_util_encode.cpp
util/test_llarp_util_ini.cpp
util/test_llarp_util_metrics_core.cpp
util/test_llarp_util_metrics_publisher.cpp
util/test_llarp_util_metrics_types.cpp
util/test_llarp_util_object.cpp
util/test_llarp_util_printer.cpp
util/test_llarp_util_queue_manager.cpp
util/test_llarp_util_queue.cpp
util/test_llarp_util_thread_pool.cpp
util/test_llarp_utils_scheduler.cpp
util/test_llarp_util_timerqueue.cpp
util/test_llarp_util_traits.cpp
)

View File

@ -4,6 +4,9 @@
#include <util/fs.hpp>
#include <util/types.hpp>
#include <bitset>
#include <vector>
namespace llarp
{
namespace test
@ -37,6 +40,55 @@ namespace llarp
}
};
template < typename T >
struct CombinationIterator
{
std::vector< T > toCombine;
std::vector< T > currentCombo;
int bits;
int maxBits;
void
createCombo()
{
currentCombo.clear();
for(size_t i = 0; i < toCombine.size(); ++i)
{
if(bits & (1 << i))
{
currentCombo.push_back(toCombine[i]);
}
}
}
CombinationIterator(const std::vector< T > &values)
: toCombine(values), bits(0), maxBits((1 << values.size()) - 1)
{
currentCombo.reserve(values.size());
createCombo();
}
bool
next()
{
if(bits >= maxBits)
{
return false;
}
++bits;
createCombo();
return true;
}
bool
includesElement(size_t index)
{
return bits & (1 << index);
}
};
} // namespace test
} // namespace llarp

View File

@ -0,0 +1,898 @@
#include <util/metrics_core.hpp>
#include <array>
#include <thread>
#include <test_util.hpp>
#include <gtest/gtest.h>
#include <gmock/gmock.h>
using namespace llarp;
using namespace metrics;
using namespace ::testing;
MATCHER(IsValid, "")
{
return arg.valid();
}
static const Category STAT_CAT("A", true);
static const Description desc_A(&STAT_CAT, "A");
static const Description *DESC_A = &desc_A;
static const Description desc_B(&STAT_CAT, "B");
static const Description *DESC_B = &desc_B;
static const Id METRIC_A(DESC_A);
static const Id METRIC_B(DESC_B);
template < typename T >
class CollectorTest : public ::testing::Test
{
};
TYPED_TEST_SUITE_P(CollectorTest);
TYPED_TEST_P(CollectorTest, Collector)
{
TypeParam collector1(METRIC_A);
TypeParam collector2(METRIC_B);
ASSERT_EQ(METRIC_A, collector1.id().description());
ASSERT_EQ(METRIC_B, collector2.id().description());
Record record1 = collector1.load();
ASSERT_EQ(METRIC_A, record1.id().description());
ASSERT_EQ(0, record1.count());
ASSERT_EQ(0, record1.total());
ASSERT_EQ(Record::DEFAULT_MAX, record1.max());
ASSERT_EQ(Record::DEFAULT_MIN, record1.min());
Record record2 = collector2.load();
ASSERT_EQ(METRIC_B, record2.id().description());
ASSERT_EQ(0, record2.count());
ASSERT_EQ(0, record2.total());
ASSERT_EQ(Record::DEFAULT_MIN, record2.min());
ASSERT_EQ(Record::DEFAULT_MAX, record2.max());
collector1.tick(1);
record1 = collector1.load();
ASSERT_EQ(METRIC_A, record1.id().description());
ASSERT_EQ(1, record1.count());
ASSERT_EQ(1, record1.total());
ASSERT_EQ(1, record1.min());
ASSERT_EQ(1, record1.max());
collector1.tick(2);
record1 = collector1.load();
ASSERT_EQ(METRIC_A, record1.id().description());
ASSERT_EQ(2, record1.count());
ASSERT_EQ(3, record1.total());
ASSERT_EQ(1, record1.min());
ASSERT_EQ(2, record1.max());
collector1.tick(-5);
record1 = collector1.load();
ASSERT_EQ(METRIC_A, record1.id().description());
ASSERT_EQ(3, record1.count());
ASSERT_EQ(-2, record1.total());
ASSERT_EQ(-5, record1.min());
ASSERT_EQ(2, record1.max());
collector1.clear();
record1 = collector1.load();
ASSERT_EQ(METRIC_A, record1.id().description());
ASSERT_EQ(0, record1.count());
ASSERT_EQ(0, record1.total());
ASSERT_EQ(Record::DEFAULT_MIN, record1.min());
ASSERT_EQ(Record::DEFAULT_MAX, record1.max());
collector1.tick(3);
record1 = collector1.loadAndClear();
ASSERT_EQ(METRIC_A, record1.id().description());
ASSERT_EQ(1, record1.count());
ASSERT_EQ(3, record1.total());
ASSERT_EQ(3, record1.min());
ASSERT_EQ(3, record1.max());
record1 = collector1.load();
ASSERT_EQ(METRIC_A, record1.id().description());
ASSERT_EQ(0, record1.count());
ASSERT_EQ(0, record1.total());
ASSERT_EQ(Record::DEFAULT_MIN, record1.min());
ASSERT_EQ(Record::DEFAULT_MAX, record1.max());
}
REGISTER_TYPED_TEST_SUITE_P(CollectorTest, Collector);
using CollectorTestTypes = ::testing::Types< DoubleCollector, IntCollector >;
INSTANTIATE_TYPED_TEST_SUITE_P(MetricsCore, CollectorTest, CollectorTestTypes);
TEST(MetricsCore, Registry)
{
Registry registry;
Id idA = registry.add("MyCategory", "MetricA");
Id invalidId = registry.add("MyCategory", "MetricA");
ASSERT_THAT(invalidId, Not(IsValid()));
Id idA_copy1 = registry.get("MyCategory", "MetricA");
ASSERT_THAT(idA_copy1, IsValid());
ASSERT_EQ(idA_copy1, idA);
Id idA_copy2 = registry.findId("MyCategory", "MetricA");
ASSERT_THAT(idA_copy2, IsValid());
ASSERT_EQ(idA_copy2, idA);
Id idB = registry.get("MyCategory", "MetricB");
ASSERT_THAT(idB, IsValid());
ASSERT_EQ(idB, registry.get("MyCategory", "MetricB"));
ASSERT_EQ(idB, registry.findId("MyCategory", "MetricB"));
ASSERT_THAT(registry.add("MyCategory", "MetricB"), Not(IsValid()));
const Category *myCategory = registry.get("MyCategory");
ASSERT_EQ(myCategory, idA.category());
ASSERT_EQ(myCategory, idB.category());
ASSERT_TRUE(myCategory->enabled());
registry.enable(myCategory, false);
ASSERT_FALSE(myCategory->enabled());
}
TEST(MetricsCore, RegistryAddr)
{
Registry registry;
const Category *CAT_A = registry.add("A");
const Category *CAT_B = registry.get("B");
Id METRIC_AA = registry.add("A", "A");
Id METRIC_AB = registry.add("A", "B");
Id METRIC_AC = registry.add("A", "C");
Id METRIC_BA = registry.get("B", "A");
Id METRIC_BB = registry.get("B", "B");
Id METRIC_BD = registry.get("B", "D");
const Category *CAT_C = registry.add("C");
const Category *CAT_D = registry.add("D");
Id METRIC_EE = registry.add("E", "E");
Id METRIC_FF = registry.get("F", "F");
ASSERT_EQ(CAT_A->name(), METRIC_AA.metricName());
ASSERT_EQ(CAT_B->name(), METRIC_AB.metricName());
ASSERT_EQ(CAT_A->name(), METRIC_BA.metricName());
ASSERT_EQ(CAT_B->name(), METRIC_BB.metricName());
ASSERT_EQ(CAT_C->name(), METRIC_AC.metricName());
ASSERT_EQ(CAT_D->name(), METRIC_BD.metricName());
ASSERT_EQ(METRIC_EE.metricName(), METRIC_EE.categoryName());
ASSERT_EQ(METRIC_FF.metricName(), METRIC_FF.categoryName());
}
TEST(MetricsCore, RegistryOps)
{
struct
{
const char *d_category;
const char *d_name;
} METRICS[] = {
{
"",
"",
},
{"C0", "M0"},
{"C0", "M1"},
{"C1", "M2"},
{"C3", "M3"},
};
const size_t NUM_METRICS = sizeof METRICS / sizeof *METRICS;
{
std::set< std::string > categoryNames;
Registry registry;
for(size_t i = 0; i < NUM_METRICS; ++i)
{
const char *CATEGORY = METRICS[i].d_category;
const char *NAME = METRICS[i].d_name;
categoryNames.insert(CATEGORY);
// Add a new id and verify the returned properties.
Id id = registry.add(CATEGORY, NAME);
ASSERT_TRUE(id.valid()) << id;
ASSERT_NE(nullptr, id.description());
ASSERT_NE(nullptr, id.category());
ASSERT_STREQ(id.metricName(), NAME);
ASSERT_STREQ(id.categoryName(), CATEGORY);
ASSERT_TRUE(id.category()->enabled());
// Attempt to find the id.
Id foundId = registry.findId(CATEGORY, NAME);
ASSERT_TRUE(foundId.valid());
ASSERT_EQ(foundId, id);
// Attempt to add the id a second time
Id invalidId = registry.add(CATEGORY, NAME);
ASSERT_FALSE(invalidId.valid());
// Attempt to find the category.
const Category *foundCat = registry.findCategory(CATEGORY);
ASSERT_EQ(id.category(), foundCat);
ASSERT_EQ(nullptr, registry.add(CATEGORY));
ASSERT_EQ(i + 1, registry.metricCount());
ASSERT_EQ(categoryNames.size(), registry.categoryCount());
}
ASSERT_EQ(NUM_METRICS, registry.metricCount());
ASSERT_EQ(categoryNames.size(), registry.categoryCount());
const Category *NEW_CAT = registry.add("NewCategory");
ASSERT_NE(nullptr, NEW_CAT);
ASSERT_STREQ("NewCategory", NEW_CAT->name());
ASSERT_TRUE(NEW_CAT->enabled());
}
const char *CATEGORIES[] = {"", "A", "B", "CAT_A", "CAT_B", "name"};
const size_t NUM_CATEGORIES = sizeof CATEGORIES / sizeof *CATEGORIES;
{
Registry registry;
for(size_t i = 0; i < NUM_CATEGORIES; ++i)
{
const char *CATEGORY = CATEGORIES[i];
const Category *cat = registry.add(CATEGORY);
ASSERT_NE(nullptr, cat);
ASSERT_STREQ(cat->name(), CATEGORY);
ASSERT_TRUE(cat->enabled());
ASSERT_EQ(nullptr, registry.add(CATEGORY));
ASSERT_EQ(cat, registry.findCategory(CATEGORY));
Id id = registry.add(CATEGORY, "Metric");
ASSERT_TRUE(id.valid());
ASSERT_EQ(cat, id.category());
ASSERT_STREQ(id.categoryName(), CATEGORY);
ASSERT_STREQ(id.metricName(), "Metric");
ASSERT_EQ(i + 1, registry.metricCount());
ASSERT_EQ(i + 1, registry.categoryCount());
}
}
}
MATCHER_P6(RecordEq, category, name, count, total, min, max, "")
{
// clang-format off
return (
arg.id().categoryName() == std::string(category) &&
arg.id().metricName() == std::string(name) &&
arg.count() == count &&
arg.total() == total &&
arg.min() == min &&
arg.max() == max
);
// clang-format on
}
MATCHER_P5(RecordEq, id, count, total, min, max, "")
{
// clang-format off
return (
arg.id() == id &&
arg.count() == count &&
arg.total() == total &&
arg.min() == min &&
arg.max() == max
);
// clang-format on
}
MATCHER_P4(RecordEq, count, total, min, max, "")
{
// clang-format off
return (
arg.count() == count &&
arg.total() == total &&
arg.min() == min &&
arg.max() == max
);
// clang-format on
}
MATCHER_P5(RecordCatEq, category, count, total, min, max, "")
{
// clang-format off
return (
arg.id().categoryName() == std::string(category) &&
arg.count() == count &&
arg.total() == total &&
arg.min() == min &&
arg.max() == max
);
// clang-format on
}
TEST(MetricsCore, RepoBasic)
{
Registry registry;
CollectorRepo repo(&registry);
DoubleCollector *collector1 = repo.defaultDoubleCollector("Test", "C1");
DoubleCollector *collector2 = repo.defaultDoubleCollector("Test", "C2");
IntCollector *intCollector1 = repo.defaultIntCollector("Test", "C3");
IntCollector *intCollector2 = repo.defaultIntCollector("Test", "C4");
ASSERT_NE(collector1, collector2);
ASSERT_EQ(collector1, repo.defaultDoubleCollector("Test", "C1"));
ASSERT_NE(intCollector1, intCollector2);
ASSERT_EQ(intCollector1, repo.defaultIntCollector("Test", "C3"));
collector1->tick(1.0);
collector1->tick(2.0);
collector2->tick(4.0);
intCollector1->tick(5);
intCollector2->tick(6);
std::vector< Record > records = repo.collectAndClear(registry.get("Test"));
ASSERT_THAT(records, SizeIs(4));
// clang-format off
ASSERT_THAT(
records,
ElementsAre(
RecordEq("Test", "C1", 2u, 3, 1, 2),
RecordEq("Test", "C2", 1u, 4, 4, 4),
RecordEq("Test", "C3", 1u, 5, 5, 5),
RecordEq("Test", "C4", 1u, 6, 6, 6)
)
);
// clang-format on
for(const auto &rec : records)
{
std::cout << rec << std::endl;
}
}
TEST(MetricsCore, RepoCollect)
{
Registry registry;
std::array< const char *, 3 > CATEGORIES = {"A", "B", "C"};
std::array< const char *, 3 > METRICS = {"A", "B", "C"};
const int NUM_COLS = 3;
for(int i = 0; i < static_cast< int >(CATEGORIES.size()); ++i)
{
CollectorRepo repo(&registry);
for(int j = 0; j < static_cast< int >(CATEGORIES.size()); ++j)
{
const char *CATEGORY = CATEGORIES[j];
for(int k = 0; k < static_cast< int >(METRICS.size()); ++k)
{
Id metric = registry.get(CATEGORY, METRICS[k]);
for(int l = 0; l < NUM_COLS; ++l)
{
DoubleCollector *dCol = repo.addDoubleCollector(metric).get();
IntCollector *iCol = repo.addIntCollector(metric).get();
if(i == j)
{
dCol->set(k, 2 * k, -k, k);
iCol->set(k, 2 * k, -k, k);
}
else
{
dCol->set(100, 100, 100, 100);
iCol->set(100, 100, 100, 100);
}
}
}
}
// Collect records for the metrics we're testing
{
const char *CATEGORY = CATEGORIES[i];
const Category *category = registry.get(CATEGORY);
std::vector< Record > records = repo.collect(category);
ASSERT_THAT(records, SizeIs(static_cast< int >(METRICS.size())));
// clang-format off
ASSERT_THAT(
records,
UnorderedElementsAre(
RecordEq(CATEGORY, "A", 0u, 0, 0, 0),
RecordEq(CATEGORY, "B", 6u, 12, -1, 1),
RecordEq(CATEGORY, "C", 12u, 24, -2, 2)
)
);
// clang-format on
// Validate initial values.
for(int j = 0; j < static_cast< int >(METRICS.size()); ++j)
{
Id metric = registry.get(CATEGORY, METRICS[j]);
auto collectors = repo.allCollectors(metric);
const auto &doubleCols = collectors.first;
const auto &intCols = collectors.second;
for(int k = 0; k < static_cast< int >(doubleCols.size()); ++k)
{
Record E(metric, j, 2 * j, -j, j);
Record record1 = doubleCols[k]->load();
Record record2 = intCols[k]->load();
ASSERT_EQ(record1, E);
ASSERT_EQ(record2, E);
}
}
}
// Verify the collectors for other categories haven't changed.
for(int j = 0; j < static_cast< int >(CATEGORIES.size()); ++j)
{
if(i == j)
{
continue;
}
const char *CATEGORY = CATEGORIES[j];
for(int k = 0; k < static_cast< int >(METRICS.size()); ++k)
{
Id metric = registry.get(CATEGORY, METRICS[j]);
auto collectors = repo.allCollectors(metric);
const auto &doubleCols = collectors.first;
const auto &intCols = collectors.second;
for(int l = 0; l < static_cast< int >(doubleCols.size()); ++l)
{
Record record1 = doubleCols[k]->load();
ASSERT_THAT(record1, RecordEq(metric, 100u, 100, 100, 100));
Record record2 = intCols[k]->load();
ASSERT_THAT(record2, RecordEq(metric, 100u, 100, 100, 100));
}
}
}
}
}
MATCHER_P2(WithinWindow, expectedTime, window, "")
{
auto begin = expectedTime - window;
auto end = expectedTime + window;
return (begin < arg && arg < end);
}
const Category *
firstCategory(const SampleGroup &group)
{
EXPECT_THAT(group, Not(IsEmpty()));
const Category *value = group.begin()->id().category();
for(const Record &record : group.records())
{
EXPECT_EQ(value, record.id().category());
}
return value;
}
TEST(MetricsCore, ManagerCollectSample1)
{
const char *CATEGORIES[] = {"A", "B", "C", "Test", "12312category"};
const int NUM_CATEGORIES = sizeof(CATEGORIES) / sizeof(*CATEGORIES);
const char *METRICS[] = {"A", "B", "C", "MyMetric", "90123metric"};
const int NUM_METRICS = sizeof(METRICS) / sizeof(*METRICS);
Manager manager;
CollectorRepo &rep = manager.collectorRepo();
for(int i = 0; i < NUM_CATEGORIES; ++i)
{
for(int j = 0; j < NUM_METRICS; ++j)
{
rep.defaultDoubleCollector(CATEGORIES[i], METRICS[j])->tick(1);
}
}
absl::Time start = absl::Now();
std::this_thread::sleep_for(std::chrono::microseconds(100000));
std::vector< Record > records;
Sample sample = manager.collectSample(records, false);
absl::Duration window = absl::Now() - start;
absl::Time now = absl::Now();
ASSERT_EQ(NUM_CATEGORIES * NUM_METRICS, records.size());
ASSERT_EQ(NUM_CATEGORIES * NUM_METRICS, sample.recordCount());
ASSERT_EQ(NUM_CATEGORIES, sample.groupCount());
ASSERT_THAT(sample.sampleTime(), WithinWindow(now, absl::Milliseconds(10)));
for(size_t i = 0; i < sample.groupCount(); ++i)
{
const SampleGroup &group = sample.group(i);
ASSERT_EQ(NUM_METRICS, group.size());
ASSERT_THAT(group.samplePeriod(),
WithinWindow(window, absl::Milliseconds(10)))
<< group;
const char *name = group.records()[0].id().categoryName();
for(const Record &record : group.records())
{
ASSERT_THAT(record, RecordCatEq(name, 1u, 1, 1, 1));
}
}
for(size_t i = 0; i < NUM_CATEGORIES; ++i)
{
for(size_t j = 0; j < NUM_METRICS; ++j)
{
DoubleCollector *col =
rep.defaultDoubleCollector(CATEGORIES[i], METRICS[j]);
Record record = col->load();
ASSERT_THAT(record, RecordEq(1u, 1, 1, 1));
}
}
records.clear();
sample = manager.collectSample(records, true);
ASSERT_EQ(NUM_CATEGORIES * NUM_METRICS, records.size());
ASSERT_EQ(NUM_CATEGORIES * NUM_METRICS, sample.recordCount());
ASSERT_EQ(NUM_CATEGORIES, sample.groupCount());
for(size_t i = 0; i < NUM_CATEGORIES; ++i)
{
for(size_t j = 0; j < NUM_METRICS; ++j)
{
DoubleCollector *col =
rep.defaultDoubleCollector(CATEGORIES[i], METRICS[j]);
Record record = col->load();
ASSERT_EQ(Record(record.id()), record);
}
}
}
TEST(MetricsCore, ManagerCollectSample2)
{
const char *CATEGORIES[] = {"A", "B", "C", "Test", "12312category"};
const int NUM_CATEGORIES = sizeof(CATEGORIES) / sizeof(*CATEGORIES);
const char *METRICS[] = {"A", "B", "C", "MyMetric", "90123metric"};
const int NUM_METRICS = sizeof(METRICS) / sizeof(*METRICS);
Manager manager;
std::vector< const Category * > allCategories;
CollectorRepo &rep = manager.collectorRepo();
Registry &reg = manager.registry();
for(size_t i = 0; i < NUM_CATEGORIES; ++i)
{
const Category *cat = reg.get(CATEGORIES[i]);
ASSERT_NE(nullptr, cat);
allCategories.push_back(cat);
}
test::CombinationIterator< const Category * > combIt{allCategories};
do
{
for(size_t i = 0; i < NUM_CATEGORIES; ++i)
{
for(size_t j = 0; j < NUM_METRICS; ++j)
{
DoubleCollector *col =
rep.defaultDoubleCollector(CATEGORIES[i], METRICS[j]);
col->clear();
col->tick(1);
}
}
// Test without a reset.
std::vector< const Category * > cats = combIt.currentCombo;
std::vector< Record > records;
Sample sample = manager.collectSample(
records, absl::Span< const Category * >{cats}, false);
ASSERT_EQ(NUM_METRICS * cats.size(), sample.recordCount());
ASSERT_EQ(cats.size(), sample.groupCount());
for(size_t i = 0; i < NUM_CATEGORIES; ++i)
{
// Verify the correct categories are in the sample (once)
const Category *CATEGORY = allCategories[i];
bool found = false;
for(size_t j = 0; j < sample.groupCount(); ++j)
{
if(CATEGORY == firstCategory(sample.group(j)))
{
found = true;
}
}
ASSERT_EQ(found, combIt.includesElement(i));
}
for(size_t i = 0; i < NUM_CATEGORIES; ++i)
{
for(size_t j = 0; j < NUM_METRICS; ++j)
{
DoubleCollector *col =
rep.defaultDoubleCollector(CATEGORIES[i], METRICS[j]);
Record record = col->load();
ASSERT_THAT(record, RecordEq(1u, 1, 1, 1));
}
}
std::vector< Record > records2;
// Test with a reset.
sample = manager.collectSample(records2,
absl::Span< const Category * >{cats}, true);
ASSERT_EQ(NUM_METRICS * cats.size(), sample.recordCount());
ASSERT_EQ(cats.size(), sample.groupCount());
ASSERT_EQ(records, records2);
for(size_t i = 0; i < NUM_CATEGORIES; ++i)
{
// Verify the correct categories are in the sample
const Category *CATEGORY = allCategories[i];
bool found = false;
for(size_t j = 0; j < sample.groupCount(); ++j)
{
if(CATEGORY == firstCategory(sample.group(j)))
{
found = true;
}
}
ASSERT_EQ(found, combIt.includesElement(i));
}
for(size_t i = 0; i < NUM_CATEGORIES; ++i)
{
for(size_t j = 0; j < NUM_METRICS; ++j)
{
DoubleCollector *col =
rep.defaultDoubleCollector(CATEGORIES[i], METRICS[j]);
Record record = col->load();
if(combIt.includesElement(i))
{
ASSERT_EQ(Record(record.id()), record);
}
else
{
ASSERT_THAT(record, RecordEq(1u, 1, 1, 1));
}
}
}
} while(combIt.next());
}
struct MockPublisher : public Publisher
{
std::atomic_int invocations;
std::vector< Record > recordBuffer;
std::vector< Record > sortedRecords;
Sample m_sample;
std::set< absl::Duration > times;
void
publish(const Sample &sample) override
{
invocations++;
m_sample.clear();
recordBuffer.clear();
sortedRecords.clear();
times.clear();
m_sample.sampleTime(sample.sampleTime());
if(sample.recordCount() == 0)
{
return;
}
recordBuffer.reserve(sample.recordCount());
for(const auto &s : sample)
{
auto git = s.begin();
ASSERT_NE(git, s.end());
recordBuffer.push_back(*git);
Record *head = &recordBuffer.back();
for(++git; git != s.end(); ++git)
{
recordBuffer.push_back(*git);
}
m_sample.pushGroup(head, s.size(), s.samplePeriod());
times.insert(s.samplePeriod());
}
sortedRecords = recordBuffer;
std::sort(
sortedRecords.begin(), sortedRecords.end(),
[](const auto &lhs, const auto &rhs) { return lhs.id() < rhs.id(); });
}
void
reset()
{
invocations = 0;
m_sample.clear();
recordBuffer.clear();
sortedRecords.clear();
times.clear();
}
int
indexOf(const Id &id)
{
Record searchRecord(id);
auto it = std::lower_bound(
sortedRecords.begin(), sortedRecords.end(), searchRecord,
[](const auto &lhs, const auto &rhs) { return lhs.id() < rhs.id(); });
if(it == sortedRecords.end())
{
return -1;
}
return (it->id() == id) ? it - sortedRecords.begin() : -1;
}
bool
contains(const Id &id)
{
return indexOf(id) != -1;
}
};
TEST(MetricsCore, ManagerAddCatPub)
{
const char *CATEGORIES[] = {"A", "B", "C", "Test", "12312category"};
const int NUM_CATEGORIES = sizeof(CATEGORIES) / sizeof(*CATEGORIES);
const int NUM_PUBLISHERS = 4;
std::multimap< const char *, std::shared_ptr< Publisher > > publishers;
Manager manager;
Registry &registry = manager.registry();
for(int i = 0; i < NUM_CATEGORIES; ++i)
{
for(int j = 0; j < NUM_PUBLISHERS; ++j)
{
auto globalPub = std::make_shared< MockPublisher >();
manager.addPublisher(CATEGORIES[i], globalPub);
publishers.emplace(CATEGORIES[i], globalPub);
}
}
for(int i = 0; i < NUM_CATEGORIES; ++i)
{
const char *CATEGORY = CATEGORIES[i];
const Category *CAT = registry.get(CATEGORY);
std::vector< Publisher * > results = manager.publishersForCategory(CAT);
ASSERT_EQ(NUM_PUBLISHERS, results.size());
auto it = publishers.lower_bound(CATEGORY);
for(const auto &pub : results)
{
ASSERT_EQ(pub, it->second.get());
++it;
}
}
}
TEST(MetricsCore, ManagerEnableAll)
{
const char *CATEGORIES[] = {"A", "B", "C", "Test", "12312category"};
const int NUM_CATEGORIES = sizeof(CATEGORIES) / sizeof(*CATEGORIES);
Manager manager;
Registry &registry = manager.registry();
for(int i = 0; i < NUM_CATEGORIES; ++i)
{
const Category *CAT = registry.get(CATEGORIES[i]);
ASSERT_TRUE(CAT->enabled());
manager.enableCategory(CAT, false);
ASSERT_FALSE(CAT->enabled());
manager.enableCategory(CAT, true);
ASSERT_TRUE(CAT->enabled());
manager.enableCategory(CATEGORIES[i], false);
ASSERT_FALSE(CAT->enabled());
manager.enableCategory(CATEGORIES[i], true);
ASSERT_TRUE(CAT->enabled());
}
manager.enableAll(false);
for(int i = 0; i < NUM_CATEGORIES; ++i)
{
ASSERT_FALSE(registry.get(CATEGORIES[i])->enabled());
}
manager.enableAll(true);
for(int i = 0; i < NUM_CATEGORIES; ++i)
{
ASSERT_TRUE(registry.get(CATEGORIES[i])->enabled());
}
}
TEST(MetricsCore, PublishAll)
{
const char *CATEGORIES[] = {"A", "B", "C", "Test", "12312category"};
const int NUM_CATEGORIES = sizeof(CATEGORIES) / sizeof(*CATEGORIES);
const char *METRICS[] = {"A", "B", "C", "MyMetric", "903metric"};
const int NUM_METRICS = sizeof(METRICS) / sizeof(*METRICS);
Manager manager;
Registry &registry = manager.registry();
CollectorRepo &repository = manager.collectorRepo();
auto globalPub = std::make_shared< MockPublisher >();
manager.addGlobalPublisher(globalPub);
std::vector< const Category * > allCategories;
for(int i = 0; i < NUM_CATEGORIES; ++i)
{
const Category *CAT = registry.get(CATEGORIES[i]);
auto mockPubCat = std::make_shared< MockPublisher >();
manager.addPublisher(CAT, mockPubCat);
allCategories.push_back(CAT);
}
test::CombinationIterator< const Category * > combIt(allCategories);
do
{
for(int i = 0; i < NUM_CATEGORIES; ++i)
{
for(int j = 0; j < NUM_METRICS; ++j)
{
DoubleCollector *col =
repository.defaultDoubleCollector(CATEGORIES[i], METRICS[j]);
col->clear();
col->tick(1);
}
}
std::set< const Category * > excludedSet;
for(int i = 0; i < NUM_CATEGORIES; ++i)
{
if(!combIt.includesElement(i))
{
excludedSet.insert(allCategories[i]);
}
}
ASSERT_EQ(allCategories.size(),
excludedSet.size() + combIt.currentCombo.size());
// Publish the records.
absl::Time tmStamp = absl::Now();
manager.publishAllExcluding(excludedSet);
if(combIt.currentCombo.empty())
{
ASSERT_EQ(0, globalPub->invocations.load());
}
else
{
ASSERT_EQ(1, globalPub->invocations.load());
ASSERT_THAT(globalPub->m_sample.sampleTime(),
WithinWindow(tmStamp, absl::Milliseconds(10)));
ASSERT_EQ(combIt.currentCombo.size(), globalPub->m_sample.groupCount());
}
// Verify the correct "specific" publishers have been invoked.
for(int i = 0; i < NUM_CATEGORIES; ++i)
{
for(int j = 0; j < NUM_METRICS; ++j)
{
Id id = registry.get(CATEGORIES[i], METRICS[j]);
ASSERT_EQ(combIt.includesElement(i), globalPub->contains(id));
}
const int EXP_INV = combIt.includesElement(i) ? 1 : 0;
std::vector< Publisher * > pubs =
manager.publishersForCategory(allCategories[i]);
MockPublisher *specPub = (MockPublisher *)pubs.front();
ASSERT_EQ(EXP_INV, specPub->invocations.load());
specPub->reset();
}
globalPub->reset();
} while(combIt.next());
}

View File

@ -0,0 +1,32 @@
#include <util/metrics_publishers.hpp>
#include <gtest/gtest.h>
#include <gmock/gmock.h>
using namespace llarp;
TEST(MetricsPublisher, StreamPublisher)
{
metrics::Category myCategory("MyCategory");
metrics::Description descA(&myCategory, "MetricA");
metrics::Description descB(&myCategory, "MetricB");
metrics::Id metricA(&descA);
metrics::Id metricB(&descB);
std::stringstream stream;
metrics::StreamPublisher myPublisher(stream);
std::vector< metrics::Record > records;
records.emplace_back(metricA, 5, 25.0, 6.0, 25.0);
records.emplace_back(metricB, 2, 7.0, 3.0, 11.0);
metrics::Sample sample;
sample.sampleTime(absl::Now());
sample.pushGroup(records.data(), records.size(), absl::Seconds(5));
myPublisher.publish(sample);
std::cout << stream.str();
}

View File

@ -0,0 +1,341 @@
#include <util/metrics_types.hpp>
#include <array>
#include <gtest/gtest.h>
#include <gmock/gmock.h>
using namespace llarp;
using namespace ::testing;
struct MetricFormatSpecTestData
{
float m_scale;
const char *m_spec;
double m_value;
const char *m_expected;
};
struct MetricFormatSpecTest : public TestWithParam< MetricFormatSpecTestData >
{
};
TEST_P(MetricFormatSpecTest, print)
{
auto d = GetParam();
metrics::FormatSpec spec(d.m_scale, d.m_spec);
std::ostringstream stream;
metrics::FormatSpec::format(stream, d.m_value, spec);
ASSERT_EQ(d.m_expected, stream.str());
}
MetricFormatSpecTestData metricFormatTestData[] = {
MetricFormatSpecTestData{0.0, "", 1.5, ""},
MetricFormatSpecTestData{1.0, "%.4f", 1.5, "1.5000"},
MetricFormatSpecTestData{1.0, "%.0f", 2.0, "2"},
MetricFormatSpecTestData{1.0, "%.0f", 1.1, "1"},
MetricFormatSpecTestData{1.0, "%.0f", 1.5, "2"},
MetricFormatSpecTestData{1.0, "%.0f", 1.7, "2"},
MetricFormatSpecTestData{1.0, "%.0f", 3.0, "3"},
MetricFormatSpecTestData{2.0, "%.0f", 3.0, "6"},
MetricFormatSpecTestData{2.0, "%.1f", 1.1, "2.2"}};
INSTANTIATE_TEST_CASE_P(MetricsTypes, MetricFormatSpecTest,
ValuesIn(metricFormatTestData));
TEST(MetricsTypes, Format)
{
metrics::Format format;
format.setSpec(metrics::Publication::Type::Max,
metrics::FormatSpec(1.0, "%0.2f"));
format.setSpec(metrics::Publication::Type::Total,
metrics::FormatSpec(2.0, "%0.3f"));
ASSERT_EQ(nullptr, format.specFor(metrics::Publication::Type::Avg));
auto ptr = format.specFor(metrics::Publication::Type::Total);
ASSERT_NE(nullptr, ptr);
ASSERT_STREQ("%0.3f", ptr->m_format);
ASSERT_DOUBLE_EQ(2.0, ptr->m_scale);
ptr = format.specFor(metrics::Publication::Type::Max);
ASSERT_NE(nullptr, ptr);
ASSERT_STREQ("%0.2f", ptr->m_format);
ASSERT_DOUBLE_EQ(1.0, ptr->m_scale);
format.clear();
ASSERT_EQ(nullptr, format.specFor(metrics::Publication::Type::Total));
ASSERT_EQ(nullptr, format.specFor(metrics::Publication::Type::Max));
}
TEST(MetricsTypes, CatContainer)
{
std::array< metrics::CategoryContainer, 10 > containers;
{
metrics::Category c("A");
for(size_t i = 0; i < containers.size(); ++i)
{
c.registerContainer(&containers[i]);
metrics::CategoryContainer *next = (0 == i) ? 0 : &containers[i - 1];
ASSERT_EQ(&c, containers[i].m_category);
ASSERT_TRUE(containers[i].m_enabled);
ASSERT_EQ(next, containers[i].m_nextCategory);
}
for(size_t i = 0; i < containers.size(); ++i)
{
metrics::CategoryContainer *next = (0 == i) ? 0 : &containers[i - 1];
ASSERT_EQ(&c, containers[i].m_category);
ASSERT_TRUE(containers[i].m_enabled);
ASSERT_EQ(next, containers[i].m_nextCategory);
}
const std::atomic_bool *enabled = &c.enabledRaw();
c.enabled(false);
ASSERT_FALSE(*enabled);
ASSERT_EQ(&c.enabledRaw(), enabled);
for(size_t i = 0; i < containers.size(); ++i)
{
metrics::CategoryContainer *next = (0 == i) ? 0 : &containers[i - 1];
ASSERT_EQ(&c, containers[i].m_category);
ASSERT_FALSE(containers[i].m_enabled);
ASSERT_EQ(next, containers[i].m_nextCategory);
}
c.enabled(true);
ASSERT_TRUE(*enabled);
ASSERT_EQ(&c.enabledRaw(), enabled);
for(size_t i = 0; i < containers.size(); ++i)
{
metrics::CategoryContainer *next = (0 == i) ? 0 : &containers[i - 1];
ASSERT_EQ(&c, containers[i].m_category);
ASSERT_TRUE(containers[i].m_enabled);
ASSERT_EQ(next, containers[i].m_nextCategory);
}
}
for(const auto &container : containers)
{
ASSERT_THAT(container.m_category, IsNull());
ASSERT_FALSE(container.m_enabled);
ASSERT_THAT(container.m_nextCategory, IsNull());
}
}
TEST(MetricsTypes, Record)
{
metrics::Record r;
ASSERT_GT(r.min(), r.max());
}
TEST(MetricsTypes, Sample)
{
metrics::Category myCategory("MyCategory");
metrics::Description descA(&myCategory, "MetricA");
metrics::Description descB(&myCategory, "MetricB");
metrics::Description descC(&myCategory, "MetricC");
metrics::Id metricA(&descA);
metrics::Id metricB(&descB);
metrics::Id metricC(&descC);
absl::Time timeStamp = absl::Now();
metrics::Record recordA(metricA, 0, 0, 0, 0);
metrics::Record recordB(metricB, 1, 2, 3, 4);
metrics::Record recordC(metricC, 4, 3, 2, 1);
metrics::Record buffer1[] = {recordA, recordB};
std::vector< metrics::Record > buffer2;
buffer2.push_back(recordC);
metrics::Sample sample;
sample.sampleTime(timeStamp);
sample.pushGroup(buffer1, sizeof(buffer1) / sizeof(*buffer1),
absl::Seconds(1.0));
sample.pushGroup(buffer2.data(), buffer2.size(), absl::Seconds(2.0));
ASSERT_EQ(timeStamp, sample.sampleTime());
ASSERT_EQ(2u, sample.groupCount());
ASSERT_EQ(3u, sample.recordCount());
ASSERT_EQ(absl::Seconds(1), sample.group(0).samplePeriod());
ASSERT_EQ(buffer1, sample.group(0).records().data());
ASSERT_EQ(2, sample.group(0).size());
ASSERT_EQ(absl::Seconds(2), sample.group(1).samplePeriod());
ASSERT_EQ(buffer2.data(), sample.group(1).records().data());
ASSERT_EQ(1, sample.group(1).size());
for(auto sampleIt = sample.begin(); sampleIt != sample.end(); ++sampleIt)
{
;
for(auto groupIt = sampleIt->begin(); groupIt != sampleIt->end(); ++groupIt)
{
std::cout << *groupIt << std::endl;
}
}
}
struct SampleTest
: public ::testing::TestWithParam< std::pair< absl::Time, std::string > >
{
metrics::Category cat_A;
metrics::Description DESC_A;
metrics::Description DESC_B;
metrics::Description DESC_C;
metrics::Description DESC_D;
metrics::Description DESC_E;
metrics::Description DESC_F;
metrics::Description DESC_G;
metrics::Id id_A;
metrics::Id id_B;
metrics::Id id_C;
metrics::Id id_D;
metrics::Id id_E;
metrics::Id id_F;
metrics::Id id_G;
std::vector< metrics::Record > recordBuffer;
SampleTest()
: cat_A("A", true)
, DESC_A(&cat_A, "A")
, DESC_B(&cat_A, "B")
, DESC_C(&cat_A, "C")
, DESC_D(&cat_A, "D")
, DESC_E(&cat_A, "E")
, DESC_F(&cat_A, "F")
, DESC_G(&cat_A, "G")
, id_A(&DESC_A)
, id_B(&DESC_B)
, id_C(&DESC_C)
, id_D(&DESC_D)
, id_E(&DESC_E)
, id_F(&DESC_F)
, id_G(&DESC_G)
{
recordBuffer.emplace_back(metrics::Id(0), 1, 1, 1, 1);
recordBuffer.emplace_back(id_A, 2, 2, 2, 2);
recordBuffer.emplace_back(id_B, 3, 3, 3, 3);
recordBuffer.emplace_back(id_C, 4, 4, 4, 4);
recordBuffer.emplace_back(id_D, 5, 5, 5, 5);
recordBuffer.emplace_back(id_E, 6, 6, 6, 6);
recordBuffer.emplace_back(id_F, 7, 7, 7, 7);
recordBuffer.emplace_back(id_G, 8, 8, 8, 8);
recordBuffer.emplace_back(id_A, 9, 9, 9, 9);
}
};
std::pair< std::vector< metrics::SampleGroup >, size_t >
generate(const std::string &specification,
const std::vector< metrics::Record > &recordBuffer)
{
const char *c = specification.c_str();
std::vector< metrics::SampleGroup > groups;
size_t size = 0;
const metrics::Record *head = recordBuffer.data();
const metrics::Record *current = head;
while(*c)
{
int numRecords = *(c + 1) - '0';
int elapsedTime = *(c + 3) - '0';
if(head + recordBuffer.size() < current + numRecords)
{
current = head;
}
groups.emplace_back(current, numRecords, absl::Seconds(elapsedTime));
size += numRecords;
current += numRecords;
c += 4;
}
return {groups, size};
}
TEST_P(SampleTest, basics)
{
absl::Time timestamp;
std::string spec;
std::tie(timestamp, spec) = GetParam();
std::vector< metrics::SampleGroup > groups;
size_t size;
std::tie(groups, size) = generate(spec, recordBuffer);
// Create the sample.
metrics::Sample sample;
sample.sampleTime(timestamp);
for(size_t j = 0; j < groups.size(); ++j)
{
sample.pushGroup(groups[j]);
}
// Test the sample.
ASSERT_EQ(timestamp, sample.sampleTime());
ASSERT_EQ(groups.size(), sample.groupCount());
ASSERT_EQ(size, sample.recordCount());
for(size_t j = 0; j < sample.groupCount(); ++j)
{
ASSERT_EQ(groups[j], sample.group(j));
}
}
TEST_P(SampleTest, append)
{
absl::Time timestamp;
std::string spec;
std::tie(timestamp, spec) = GetParam();
std::vector< metrics::SampleGroup > groups;
size_t size;
std::tie(groups, size) = generate(spec, recordBuffer);
// Create the sample.
metrics::Sample sample;
sample.sampleTime(timestamp);
std::for_each(groups.begin(), groups.end(), [&](const auto &group) {
sample.pushGroup(group.records(), group.samplePeriod());
});
// Test the sample.
ASSERT_EQ(timestamp, sample.sampleTime());
ASSERT_EQ(groups.size(), sample.groupCount());
ASSERT_EQ(size, sample.recordCount());
for(size_t j = 0; j < sample.groupCount(); ++j)
{
ASSERT_EQ(groups[j], sample.group(j));
}
}
absl::Time
fromYYMMDD(int year, int month, int day)
{
return absl::FromCivil(absl::CivilDay(year, month, day), absl::UTCTimeZone());
}
std::pair< absl::Time, std::string > sampleTestData[] = {
{fromYYMMDD(1900, 1, 1), ""},
{fromYYMMDD(1999, 1, 1), "R1E1"},
{fromYYMMDD(1999, 2, 1), "R2E2"},
{fromYYMMDD(2001, 9, 9), "R1E1R2E2"},
{fromYYMMDD(2001, 9, 9), "R3E3R3E3"},
{fromYYMMDD(2009, 9, 9), "R2E4R1E1"},
{fromYYMMDD(2001, 9, 9), "R1E1R2E2R3E3"},
{fromYYMMDD(2001, 9, 9), "R4E1R3E2R2E3R1E4"},
{fromYYMMDD(2001, 9, 9), "R1E1R2E2R1E1R2E2R1E1R2E1R1E2"}};
INSTANTIATE_TEST_CASE_P(MetricsTypes, SampleTest,
::testing::ValuesIn(sampleTestData));

View File

@ -0,0 +1,161 @@
#include <util/object.hpp>
#include <array>
#include <thread>
#include <gtest/gtest.h>
#include <gmock/gmock.h>
using namespace llarp::object;
TEST(Object, VerifySize)
{
static_assert(sizeof(Buffer< char >) == sizeof(char), "");
static_assert(sizeof(Buffer< int >) == sizeof(int), "");
static_assert(sizeof(Buffer< double >) == sizeof(double), "");
static_assert(sizeof(Buffer< std::string >) == sizeof(std::string), "");
}
TEST(Object, Inplace)
{
// Verify we can create and destroy a type with a non-trivial destructor
Buffer< std::vector< std::string > > strBuf;
new(strBuf.buffer()) std::vector< std::string >(100, "abc");
strBuf.value().~vector();
}
TEST(Catalog, smoke)
{
const double value1 = 1.0;
const double value2 = 2.0;
int handle1 = -1;
int handle2 = -1;
Catalog< double > catalog;
handle1 = catalog.add(value1);
catalog.remove(handle1);
for(size_t j = 0; j < 5; ++j)
{
for(size_t i = 1; i < 256; ++i)
{
ASSERT_FALSE(catalog.find(handle1));
handle2 = catalog.add(value2);
catalog.remove(handle2);
}
handle2 = catalog.add(value2);
ASSERT_EQ(handle1, handle2);
ASSERT_TRUE(catalog.find(handle1));
absl::optional< double > result = catalog.find(handle1);
ASSERT_TRUE(result);
ASSERT_EQ(value2, result);
catalog.remove(handle2);
}
}
TEST(Catalog, Iterator)
{
static constexpr size_t THREAD_COUNT = 10;
static constexpr size_t ITERATION_COUNT = 1000;
std::array< std::thread, THREAD_COUNT + 3 > threads;
using llarp::util::Barrier;
using Iterator = CatalogIterator< int >;
using Cat = Catalog< int >;
Barrier barrier(THREAD_COUNT + 3);
Cat catalog;
// Repeatedly remove + add values from the catalog
for(size_t i = 0; i < THREAD_COUNT; ++i)
{
threads[i] = std::thread(
[](Barrier *barrier, Cat *catalog, int id) {
barrier->Block();
for(size_t i = 0; i < ITERATION_COUNT; ++i)
{
int h = catalog->add(id);
absl::optional< int > res = catalog->find(h);
ASSERT_TRUE(res);
ASSERT_EQ(res.value(), id);
ASSERT_TRUE(catalog->replace(-id - 1, h));
res = catalog->find(h);
ASSERT_TRUE(res);
ASSERT_EQ(-id - 1, res.value());
int removed = -1;
ASSERT_TRUE(catalog->remove(h, &removed));
ASSERT_EQ(removed, -id - 1);
ASSERT_FALSE(catalog->find(h));
}
},
&barrier, &catalog, i);
}
// Verify the length constraint is never violated
threads[THREAD_COUNT] = std::thread(
[](Barrier *barrier, Cat *catalog) {
barrier->Block();
for(size_t i = 0; i < ITERATION_COUNT; ++i)
{
size_t size = catalog->size();
ASSERT_LE(size, THREAD_COUNT);
}
},
&barrier, &catalog);
// Verify that iteration always produces a valid state
threads[THREAD_COUNT + 1] = std::thread(
[](Barrier *barrier, Cat *catalog) {
barrier->Block();
for(size_t i = 0; i < ITERATION_COUNT; ++i)
{
int arr[100];
size_t size = 0;
for(Iterator it(*catalog); it; ++it)
{
arr[size++] = it().second;
}
for(int i = 0; i < 100; i++)
{
// value must be valid
bool present = false;
for(int id = 0; id < static_cast< int >(THREAD_COUNT); id++)
{
if(id == arr[i] || -id - 1 == arr[i])
{
present = true;
break;
}
}
ASSERT_TRUE(present);
// no duplicate should be there
for(size_t j = i + 1; j < size; j++)
{
ASSERT_NE(arr[i], arr[j]);
}
}
}
},
&barrier, &catalog);
// And that we don't have an invalid catalog
threads[THREAD_COUNT + 2] = std::thread(
[](Barrier *barrier, Cat *catalog) {
barrier->Block();
for(size_t i = 0; i < ITERATION_COUNT; ++i)
{
catalog->verify();
}
},
&barrier, &catalog);
for(std::thread &t : threads)
{
t.join();
}
}

View File

@ -0,0 +1,338 @@
#include <util/timerqueue.hpp>
#include <thread>
#include <gtest/gtest.h>
#include <gmock/gmock.h>
using CharQueue = llarp::thread::TimerQueue< const char* >;
using CharItem = llarp::thread::TimerQueueItem< const char* >;
TEST(TimerQueue, smoke)
{
CharQueue queue;
const absl::Time TA = absl::Time();
const absl::Time TB = TA + absl::Seconds(1);
const absl::Time TC = TB + absl::Seconds(1);
const absl::Time TD = TC + absl::Seconds(1);
const absl::Time TE = TD + absl::Seconds(1);
const char* VA = "hello";
const char* VB = "world,";
const char* VC = "how";
const char* VD = "are";
const char* VE = "you";
int HA = queue.add(TA, VA);
int HB = queue.add(TB, VB);
int HC = queue.add(TC, VC);
int HD = queue.add(TD, VD);
int HE = queue.add(TE, VE);
CharItem tItem;
absl::Time newMinTime;
size_t newSize;
ASSERT_TRUE(queue.popFront(&tItem));
ASSERT_EQ(VA, tItem.value());
ASSERT_EQ(TA, tItem.time());
ASSERT_EQ(HA, tItem.handle());
ASSERT_TRUE(queue.popFront(&tItem, &newSize, &newMinTime));
ASSERT_EQ(3, newSize);
ASSERT_EQ(TC, newMinTime);
ASSERT_EQ(TB, tItem.time());
ASSERT_EQ(VB, tItem.value());
ASSERT_EQ(HB, tItem.handle());
std::vector< CharItem > a1;
queue.popLess(TD, &a1, &newSize, &newMinTime);
ASSERT_EQ(2, a1.size());
ASSERT_EQ(1, newSize);
ASSERT_EQ(TE, newMinTime);
ASSERT_EQ(TC, a1[0].time());
ASSERT_EQ(VC, a1[0].value());
ASSERT_EQ(HC, a1[0].handle());
ASSERT_EQ(TD, a1[1].time());
ASSERT_EQ(VD, a1[1].value());
ASSERT_EQ(HD, a1[1].handle());
std::vector< CharItem > a2;
queue.popLess(TD, &a2, &newSize, &newMinTime);
ASSERT_EQ(0, a2.size());
ASSERT_EQ(1, newSize);
ASSERT_EQ(TE, newMinTime);
std::vector< CharItem > a3;
queue.popLess(TE, &a3, &newSize, &newMinTime);
ASSERT_EQ(1, a3.size());
ASSERT_EQ(0, newSize);
ASSERT_EQ(TE, a3[0].time());
ASSERT_EQ(VE, a3[0].value());
ASSERT_EQ(HE, a3[0].handle());
}
TEST(TimerQueue, KeySmoke)
{
CharQueue x1;
const absl::Time TA = absl::Time();
const absl::Time TB = TA + absl::Seconds(1);
const absl::Time TC = TB + absl::Seconds(1);
const absl::Time TD = TC + absl::Seconds(1);
const absl::Time TE = TD + absl::Seconds(1);
const char* VA = "hello";
const char* VB = "world,";
const char* VC = "how";
const char* VD = "are";
const char* VE = "you";
typedef CharQueue::Key Key;
const Key KA = Key(&TA);
const Key KB = Key(&TB);
const Key KC = Key(382);
const Key KD = Key(123);
const Key KE = Key(&VE);
int HA = x1.add(TA, VA, KA);
int HB = x1.add(TB, VB, KB);
int HC = x1.add(TC, VC, KC);
int HD = x1.add(TD, VD, KD);
int HE = x1.add(TE, VE, KE);
ASSERT_FALSE(x1.remove(HA, KB));
ASSERT_TRUE(x1.isValid(HA, KA));
ASSERT_FALSE(x1.update(HC, KD, TE));
CharItem tItem;
absl::Time newMinTime;
size_t newSize;
ASSERT_TRUE(x1.popFront(&tItem));
ASSERT_EQ(VA, tItem.value());
ASSERT_EQ(TA, tItem.time());
ASSERT_EQ(HA, tItem.handle());
ASSERT_EQ(KA, tItem.key());
ASSERT_TRUE(x1.popFront(&tItem, &newSize, &newMinTime));
ASSERT_EQ(3, newSize);
ASSERT_EQ(TC, newMinTime);
ASSERT_EQ(TB, tItem.time());
ASSERT_EQ(VB, tItem.value());
ASSERT_EQ(HB, tItem.handle());
ASSERT_EQ(KB, tItem.key());
std::vector< CharItem > a1;
x1.popLess(TD, &a1, &newSize, &newMinTime);
ASSERT_EQ(2, a1.size());
ASSERT_EQ(1, newSize);
ASSERT_EQ(TE, newMinTime);
ASSERT_EQ(TC, a1[0].time());
ASSERT_EQ(VC, a1[0].value());
ASSERT_EQ(HC, a1[0].handle());
ASSERT_EQ(KC, a1[0].key());
ASSERT_EQ(TD, a1[1].time());
ASSERT_EQ(VD, a1[1].value());
ASSERT_EQ(HD, a1[1].handle());
ASSERT_EQ(KD, a1[1].key());
std::vector< CharItem > a2;
x1.popLess(TD, &a2, &newSize, &newMinTime);
ASSERT_EQ(0, a2.size());
ASSERT_EQ(1, newSize);
ASSERT_EQ(TE, newMinTime);
std::vector< CharItem > a3;
x1.popLess(TE, &a3, &newSize, &newMinTime);
ASSERT_EQ(1, a3.size());
ASSERT_EQ(0, newSize);
ASSERT_EQ(TE, a3[0].time());
ASSERT_EQ(VE, a3[0].value());
ASSERT_EQ(HE, a3[0].handle());
ASSERT_EQ(KE, a3[0].key());
}
TEST(TimerQueue, Update)
{
const char VA[] = "A";
const char VB[] = "B";
const char VC[] = "C";
const char VD[] = "D";
const char VE[] = "E";
// clang-format off
static const struct
{
int m_secs;
int m_nsecs;
const char* m_value;
int m_updsecs;
int m_updnsecs;
bool m_isNewTop;
} VALUES[] = {
{2, 1000000, VA, 0, 1000000, false},
{2, 1000000, VB, 3, 1000000, false},
{2, 1000000, VC, 0, 4000, false},
{2, 1000001, VB, 0, 3999, true},
{1, 9999998, VC, 4, 9999998, false},
{1, 9999999, VD, 0, 0, true},
{0, 4000, VE, 10, 4000, false}};
// clang-format on
static const int POP_ORDER[] = {5, 3, 2, 0, 1, 4, 6};
const int NUM_VALUES = sizeof VALUES / sizeof *VALUES;
int handles[NUM_VALUES];
CharQueue queue;
{
CharItem item;
ASSERT_FALSE(queue.popFront(&item));
}
for(int i = 0; i < NUM_VALUES; ++i)
{
const char* VAL = VALUES[i].m_value;
const int SECS = VALUES[i].m_secs;
const int NSECS = VALUES[i].m_nsecs;
absl::Time TIME =
absl::Time() + absl::Seconds(SECS) + absl::Nanoseconds(NSECS);
handles[i] = queue.add(TIME, VAL);
ASSERT_EQ(i + 1, queue.size());
ASSERT_TRUE(queue.isValid(handles[i]));
}
for(int i = 0; i < NUM_VALUES; ++i)
{
const int UPDSECS = VALUES[i].m_updsecs;
const bool EXPNEWTOP = VALUES[i].m_isNewTop;
const int UPDNSECS = VALUES[i].m_updnsecs;
absl::Time UPDTIME =
absl::Time() + absl::Seconds(UPDSECS) + absl::Nanoseconds(UPDNSECS);
bool isNewTop;
CharItem item;
ASSERT_TRUE(queue.isValid(handles[i])) << i;
ASSERT_TRUE(queue.update(handles[i], UPDTIME, &isNewTop)) << i;
EXPECT_EQ(EXPNEWTOP, isNewTop) << i;
ASSERT_TRUE(queue.isValid(handles[i])) << i;
}
for(int i = 0; i < NUM_VALUES; ++i)
{
const int I = POP_ORDER[i];
const char* EXPVAL = VALUES[I].m_value;
const int EXPSECS = VALUES[I].m_updsecs;
const int EXPNSECS = VALUES[I].m_updnsecs;
absl::Time EXPTIME =
absl::Time() + absl::Seconds(EXPSECS) + absl::Nanoseconds(EXPNSECS);
CharItem item;
ASSERT_TRUE(queue.isValid(handles[I]));
ASSERT_TRUE(queue.popFront(&item));
ASSERT_EQ(EXPTIME, item.time());
ASSERT_EQ(EXPVAL, item.value());
ASSERT_FALSE(queue.isValid(handles[I]));
}
}
TEST(TimerQueue, ThreadSafety)
{
using Data = std::string;
using StringQueue = llarp::thread::TimerQueue< std::string >;
using StringItem = llarp::thread::TimerQueueItem< std::string >;
using Info = std::pair< int, std::vector< StringItem >* >;
static constexpr size_t NUM_THREADS = 10;
static constexpr size_t NUM_ITERATIONS = 1000;
static constexpr size_t NUM_REMOVE_ALL = NUM_ITERATIONS / 2;
Info info[NUM_THREADS];
std::thread threads[NUM_THREADS + 1];
std::vector< StringItem > items[NUM_THREADS];
absl::Barrier barrier(NUM_THREADS + 1);
StringQueue queue;
for(size_t i = 0; i < NUM_THREADS; ++i)
{
info[i].first = i;
info[i].second = &items[i];
threads[i] = std::thread(
[](Info* info, absl::Barrier* barrier, StringQueue* queue) {
const int THREAD_ID = info->first;
std::vector< StringItem >* vPtr = info->second;
// We stagger the removeAll steps among the threads.
const int STEP_REMOVE_ALL = THREAD_ID * NUM_REMOVE_ALL / NUM_THREADS;
std::ostringstream oss;
oss << THREAD_ID;
Data V(oss.str());
barrier->Block();
size_t newSize;
absl::Time newMinTime;
StringItem item;
for(size_t i = 0; i < NUM_ITERATIONS; ++i)
{
const absl::Time TIME =
absl::Time() + absl::Seconds((i * (i + 3)) % NUM_ITERATIONS);
int h = queue->add(TIME, V);
queue->update(h, TIME);
if(queue->popFront(&item, &newSize, &newMinTime))
{
vPtr->push_back(item);
}
h = queue->add(newMinTime, V);
queue->popLess(newMinTime, vPtr);
if(queue->remove(h, &item, &newSize, &newMinTime))
{
vPtr->push_back(item);
}
if(i % NUM_REMOVE_ALL == STEP_REMOVE_ALL)
{
queue->removeAll(vPtr);
}
}
},
&info[i], &barrier, &queue);
}
threads[NUM_THREADS] = std::thread(
[](absl::Barrier* barrier, StringQueue* queue) {
barrier->Block();
for(size_t i = 0; i < NUM_ITERATIONS; ++i)
{
size_t size = queue->size();
ASSERT_GE(size, 0);
ASSERT_LE(size, NUM_THREADS);
}
},
&barrier, &queue);
size_t size = 0;
for(size_t i = 0; i < NUM_THREADS; ++i)
{
threads[i].join();
size += static_cast< int >(items[i].size());
}
threads[NUM_THREADS].join();
ASSERT_EQ(0, queue.size());
ASSERT_EQ(1000 * NUM_THREADS * 2, size);
}

View File

@ -0,0 +1,194 @@
#include <util/scheduler.hpp>
#include <gtest/gtest.h>
#include <gmock/gmock.h>
using namespace llarp;
using thread::Scheduler;
using thread::Tardis;
TEST(SchedulerTest, smoke)
{
Scheduler scheduler;
ASSERT_TRUE(scheduler.start());
scheduler.stop();
}
struct TestCallback
{
std::atomic_size_t m_startCount;
std::atomic_size_t m_execCount;
absl::Duration executeTime;
TestCallback() : m_startCount(0), m_execCount(0), executeTime()
{
}
void
callback()
{
m_startCount++;
if(executeTime != absl::Duration())
{
std::this_thread::sleep_for(absl::ToChronoSeconds(executeTime));
}
m_execCount++;
}
void
waitFor(absl::Duration duration, size_t attemptCount,
size_t executeCount) const
{
for(size_t i = 0; i < attemptCount; ++i)
{
if(executeCount + 1 <= m_execCount)
{
return;
}
std::this_thread::sleep_until(absl::ToChronoTime(absl::Now() + duration));
std::this_thread::yield();
}
}
};
TEST(SchedulerTest, fakeTime)
{
// Just test we can mock out Time itself
Scheduler scheduler;
Tardis time{scheduler};
absl::Time now = time.now();
TestCallback callback1, callback2;
Scheduler::Handle handle = scheduler.schedule(
now + absl::Seconds(30), std::bind(&TestCallback::callback, &callback1));
ASSERT_NE(Scheduler::INVALID_HANDLE, handle);
handle = scheduler.scheduleRepeat(
absl::Seconds(60), std::bind(&TestCallback::callback, &callback2));
ASSERT_NE(Scheduler::INVALID_HANDLE, handle);
scheduler.start();
time.advanceTime(absl::Seconds(35));
ASSERT_EQ(time.now(), now + absl::Seconds(35));
callback1.waitFor(absl::Milliseconds(10), 100, 0);
ASSERT_EQ(1u, callback1.m_execCount);
ASSERT_EQ(0u, callback2.m_execCount);
// jump forward another 30 seconds, the repeat event should kick off
time.advanceTime(absl::Seconds(30));
ASSERT_EQ(time.now(), now + absl::Seconds(65));
callback2.waitFor(absl::Milliseconds(10), 100, 0);
ASSERT_EQ(1u, callback1.m_execCount);
ASSERT_EQ(1u, callback2.m_execCount);
// jump forward another minute, the repeat event should have run again
time.advanceTime(absl::Seconds(60));
callback2.waitFor(absl::Milliseconds(10), 100, 1);
ASSERT_EQ(1u, callback1.m_execCount);
ASSERT_EQ(2u, callback2.m_execCount);
scheduler.stop();
}
TEST(SchedulerTest, func1)
{
Scheduler scheduler;
scheduler.start();
TestCallback callback1, callback2;
absl::Time now = absl::Now();
scheduler.scheduleRepeat(absl::Milliseconds(30),
std::bind(&TestCallback::callback, &callback1));
scheduler.schedule(now + absl::Milliseconds(60),
std::bind(&TestCallback::callback, &callback2));
std::this_thread::yield();
std::this_thread::sleep_for(absl::ToChronoSeconds(absl::Milliseconds(40)));
callback1.waitFor(absl::Milliseconds(10), 100, 0);
scheduler.stop();
absl::Duration elapsed = absl::Now() - now;
size_t count1 = callback1.m_execCount;
size_t count2 = callback2.m_execCount;
if(elapsed < absl::Milliseconds(60))
{
ASSERT_EQ(1u, count1);
ASSERT_EQ(0u, count2);
}
else
{
ASSERT_LE(1u, count1);
}
callback1.waitFor(absl::Milliseconds(10), 100, 0);
size_t count = callback1.m_execCount;
ASSERT_EQ(count1, count);
count = callback2.m_execCount;
ASSERT_EQ(count2, count);
if(count2 == 0)
{
// callback2 not executed
scheduler.start();
std::this_thread::yield();
std::this_thread::sleep_for(absl::ToChronoSeconds(absl::Milliseconds(40)));
callback2.waitFor(absl::Milliseconds(10), 100, count2);
count = callback2.m_execCount;
ASSERT_LE(count2 + 1, count);
}
else
{
ASSERT_LT(absl::Milliseconds(60), elapsed);
}
}
TEST(SchedulerTest, cancelAllRepeats)
{
Scheduler scheduler;
scheduler.start();
TestCallback callback1, callback2;
const Scheduler::Handle handle1 = scheduler.scheduleRepeat(
absl::Milliseconds(30), std::bind(&TestCallback::callback, &callback1));
const Scheduler::Handle handle2 = scheduler.scheduleRepeat(
absl::Milliseconds(30), std::bind(&TestCallback::callback, &callback2));
scheduler.cancelAllRepeats();
ASSERT_FALSE(scheduler.cancelRepeat(handle1));
ASSERT_FALSE(scheduler.cancelRepeat(handle2));
const size_t count1 = callback1.m_execCount;
const size_t count2 = callback2.m_execCount;
std::this_thread::yield();
std::this_thread::sleep_for(absl::ToChronoSeconds(absl::Milliseconds(100)));
size_t count = callback1.m_execCount;
ASSERT_EQ(count1, count);
count = callback2.m_execCount;
ASSERT_EQ(count2, count);
scheduler.stop();
}