Unfuck metrics

pull/640/head
Michael 5 years ago
parent a1ef2ca342
commit 0a5ac10880
No known key found for this signature in database
GPG Key ID: 2D51757B47E2434C

@ -318,7 +318,8 @@ namespace llarp
}
void
MetricTankPublisherInterface::publish(const Sample< double > &values)
MetricTankPublisherInterface::publish(const Sample< double > &values,
const Sample< int > &)
{
if(values.recordCount() == 0)
{

@ -40,7 +40,8 @@ namespace llarp
makeSuffix(const Tags& tags);
void
publish(const Sample< double >& values) override;
publish(const Sample< double > &values,
const Sample< int > &) override;
virtual void
publish(const std::vector< PublishData >& data) = 0;

@ -215,20 +215,21 @@ namespace llarp
} // namespace
void
StreamPublisher::publish(const Sample< double > &values)
StreamPublisher::publish(const Sample< double > &doubleValues,
const Sample< int > &)
{
if(values.recordCount() == 0)
if(doubleValues.recordCount() == 0)
{
// nothing to publish
return;
}
m_stream << values.sampleTime() << " " << values.recordCount()
m_stream << doubleValues.sampleTime() << " " << doubleValues.recordCount()
<< " Records\n";
auto gIt = values.begin();
auto prev = values.begin();
for(; gIt != values.end(); ++gIt)
auto gIt = doubleValues.begin();
auto prev = doubleValues.begin();
for(; gIt != doubleValues.end(); ++gIt)
{
const double elapsedTime = absl::ToDoubleSeconds(gIt->samplePeriod());
@ -246,20 +247,21 @@ namespace llarp
}
void
JsonPublisher::publish(const Sample< double > &values)
JsonPublisher::publish(const Sample< double > &doubleValues,
const Sample< int > &)
{
if(values.recordCount() == 0)
if(doubleValues.recordCount() == 0)
{
// nothing to publish
return;
}
nlohmann::json result;
result["sampleTime"] = absl::UnparseFlag(values.sampleTime());
result["recordCount"] = values.recordCount();
auto gIt = values.begin();
auto prev = values.begin();
for(; gIt != values.end(); ++gIt)
result["sampleTime"] = absl::UnparseFlag(doubleValues.sampleTime());
result["recordCount"] = doubleValues.recordCount();
auto gIt = doubleValues.begin();
auto prev = doubleValues.begin();
for(; gIt != doubleValues.end(); ++gIt)
{
const double elapsedTime = absl::ToDoubleSeconds(gIt->samplePeriod());

@ -26,7 +26,8 @@ namespace llarp
}
void
publish(const Sample< double >& values) override;
publish(const Sample< double >& doubleValues,
const Sample< int >& intValues) override;
};
class JsonPublisher final : public Publisher
@ -47,7 +48,8 @@ namespace llarp
}
void
publish(const Sample< double >& values) override;
publish(const Sample< double >& doubleValues,
const Sample< int >& intValues) override;
static void
directoryPublisher(const nlohmann::json& result, const fs::path& path);

@ -41,14 +41,15 @@ namespace llarp
#define METRICS_UNIQUE_NAME(X) METRICS_NAME_CAT(X, METRICS_UNIQ_NUMBER)
#define METRICS_TIME_BLOCK_IMP(CAT, METRIC, VAR_NAME) \
llarp::metrics::DoubleCollector* VAR_NAME = nullptr; \
if(llarp::metrics::DefaultManager::instance()) \
{ \
using namespace llarp::metrics; \
CollectorRepo& repo = DefaultManager::instance()->collectorRepo(); \
VAR_NAME = repo.defaultDoubleCollector((CAT), (METRIC)); \
} \
#define METRICS_TIME_BLOCK_IMP(CAT, METRIC, VAR_NAME) \
llarp::metrics::DoubleCollector* VAR_NAME = nullptr; \
if(llarp::metrics::DefaultManager::instance()) \
{ \
using namespace llarp::metrics; \
CollectorRepo< double >& repo = \
DefaultManager::instance()->doubleCollectorRepo(); \
VAR_NAME = repo.defaultCollector((CAT), (METRIC)); \
} \
llarp::metrics::TimerGuard METRICS_UNIQUE_NAME(timer_guard)(VAR_NAME);
#define METRICS_TIME_BLOCK(CAT, METRIC) \
@ -66,20 +67,20 @@ namespace llarp
BALM_METRICS_IF_CATEGORY_ENABLED_IMP(CAT, METRICS_UNIQUE_NAME(Container))
// For when the category/metric may change during the program run
#define METRICS_DYNAMIC_INT_UPDATE(CAT, METRIC, VALUE) \
do \
{ \
using namespace llarp::metrics; \
if(DefaultManager::instance()) \
{ \
CollectorRepo& repository = DefaultManager::instance()->collectorRepo(); \
IntCollector* collector = \
repository.defaultIntCollector((CAT), (METRIC)); \
if(collector->id().category()->enabled()) \
{ \
collector->tick((VALUE)); \
} \
} \
#define METRICS_DYNAMIC_INT_UPDATE(CAT, METRIC, VALUE) \
do \
{ \
using namespace llarp::metrics; \
if(DefaultManager::instance()) \
{ \
CollectorRepo< int >& repository = \
DefaultManager::instance()->intCollectorRepo(); \
IntCollector* collector = repository.defaultCollector((CAT), (METRIC)); \
if(collector->id().category()->enabled()) \
{ \
collector->tick((VALUE)); \
} \
} \
} while(false)
// For when the category/metric remain static
@ -119,20 +120,21 @@ namespace llarp
} while(false)
// For when the category/metric may change during the program run
#define METRICS_DYNAMIC_UPDATE(CAT, METRIC, VALUE) \
do \
{ \
using namespace llarp::metrics; \
if(DefaultManager::instance()) \
{ \
CollectorRepo& repository = DefaultManager::instance()->collectorRepo(); \
DoubleCollector* collector = \
repository.defaultDoubleCollector((CAT), (METRIC)); \
if(collector->id().category()->enabled()) \
{ \
collector->tick((VALUE)); \
} \
} \
#define METRICS_DYNAMIC_UPDATE(CAT, METRIC, VALUE) \
do \
{ \
using namespace llarp::metrics; \
if(DefaultManager::instance()) \
{ \
CollectorRepo< double >& repository = \
DefaultManager::instance()->doubleCollectorRepo(); \
DoubleCollector* collector = \
repository.defaultCollector((CAT), (METRIC)); \
if(collector->id().category()->enabled()) \
{ \
collector->tick((VALUE)); \
} \
} \
} while(false)
// For when the category/metric remain static

@ -6,6 +6,9 @@ namespace llarp
{
namespace metrics
{
using DoubleRecords = std::vector< Record< double > >;
using IntRecords = std::vector< Record< int > >;
std::tuple< Id, bool >
Registry::insert(const char *category, const char *name)
{
@ -186,178 +189,49 @@ namespace llarp
return result;
}
MetricCollectors &
CollectorRepo::getCollectors(const Id &id)
{
auto it = m_collectors.find(id);
if(it == m_collectors.end())
{
assert(id.valid());
const Category *cat = id.category();
auto ptr = std::make_shared< MetricCollectors >(id);
auto &vec = m_categories[cat];
vec.reserve(vec.size() + 1);
it = m_collectors.emplace(id, ptr).first;
vec.push_back(ptr.get());
}
return *it->second.get();
}
std::vector< Record< double > >
CollectorRepo::collectAndClear(const Category *category)
{
absl::WriterMutexLock l(&m_mutex);
std::vector< Record< double > > result;
auto it = m_categories.find(category);
if(it != m_categories.end())
{
auto &collectors = it->second;
result.reserve(collectors.size());
std::transform(
collectors.begin(), collectors.end(), std::back_inserter(result),
[](MetricCollectors *c) { return c->combineAndClear(); });
}
return result;
}
std::vector< Record< double > >
CollectorRepo::collect(const Category *category)
{
absl::WriterMutexLock l(&m_mutex);
std::vector< Record< double > > result;
auto it = m_categories.find(category);
if(it != m_categories.end())
{
auto &collectors = it->second;
result.reserve(collectors.size());
std::transform(collectors.begin(), collectors.end(),
std::back_inserter(result),
[](MetricCollectors *c) { return c->combine(); });
}
return result;
}
DoubleCollector *
CollectorRepo::defaultDoubleCollector(const Id &id)
{
{
absl::ReaderMutexLock l(&m_mutex);
auto it = m_collectors.find(id);
if(it != m_collectors.end())
{
return it->second->doubleCollectors().defaultCollector();
}
}
{
absl::WriterMutexLock l(&m_mutex);
return getCollectors(id).doubleCollectors().defaultCollector();
}
}
IntCollector *
CollectorRepo::defaultIntCollector(const Id &id)
{
{
absl::ReaderMutexLock l(&m_mutex);
auto it = m_collectors.find(id);
if(it != m_collectors.end())
{
return it->second->intCollectors().defaultCollector();
}
}
{
absl::WriterMutexLock l(&m_mutex);
return getCollectors(id).intCollectors().defaultCollector();
}
}
std::pair< std::vector< std::shared_ptr< DoubleCollector > >,
std::vector< std::shared_ptr< IntCollector > > >
CollectorRepo::allCollectors(const Id &id)
{
absl::ReaderMutexLock l(&m_mutex);
auto it = m_collectors.find(id);
if(it == m_collectors.end())
{
return {};
}
return {it->second->doubleCollectors().collectors(),
it->second->intCollectors().collectors()};
}
struct PublisherHelper
{
using SampleCache =
std::map< std::shared_ptr< Publisher >, Sample< double > >;
std::map< std::shared_ptr< Publisher >,
std::pair< Sample< double >, Sample< int > > >;
static void
updateSampleCache(SampleCache &cache,
const std::shared_ptr< Publisher > &publisher,
const SampleGroup< double > &sampleGroup,
const SampleGroup< double > &doubleGroup,
const SampleGroup< int > &intGroup,
const absl::Time &timeStamp)
{
SampleCache::iterator it = cache.find(publisher);
if(it == cache.end())
{
Sample< double > newSample;
newSample.sampleTime(timeStamp);
it = cache.emplace(publisher, newSample).first;
Sample< double > dSample;
dSample.sampleTime(timeStamp);
Sample< int > iSample;
iSample.sampleTime(timeStamp);
it = cache.emplace(publisher, std::make_pair(dSample, iSample)).first;
}
it->second.pushGroup(sampleGroup);
it->second.first.pushGroup(doubleGroup);
it->second.second.pushGroup(intGroup);
}
static std::pair< std::vector< Record< double > >, absl::Duration >
struct CollectResult
{
Records records;
absl::Duration samplePeriod;
};
static CollectResult
collect(Manager &manager, const Category *category,
const absl::Duration &now, bool clear)
EXCLUSIVE_LOCKS_REQUIRED(manager.m_mutex)
{
using Callback = Manager::RecordCallback;
using CallbackVector = std::vector< const Callback * >;
using RegistryIterator = CallbackRegistry::iterator;
CallbackVector callbacks;
RegistryIterator begin = manager.m_callbacks.lowerBound(category);
RegistryIterator end = manager.m_callbacks.upperBound(category);
std::vector< Record< double > > result;
std::for_each(begin, end, [&](const auto &x) {
std::vector< Record< double > > tmp = (x.second)(clear);
result.insert(result.end(), tmp.begin(), tmp.end());
});
// Collect records from the repo.
if(clear)
{
std::vector< Record< double > > tmp =
manager.m_repo.collectAndClear(category);
result.insert(result.end(), tmp.begin(), tmp.end());
}
else
{
std::vector< Record< double > > tmp =
manager.m_repo.collect(category);
result.insert(result.end(), tmp.begin(), tmp.end());
}
const Records result = clear
? Records(manager.m_doubleRepo.collectAndClear(category),
manager.m_intRepo.collectAndClear(category))
: Records(manager.m_doubleRepo.collect(category),
manager.m_intRepo.collect(category));
// Get the time since last reset, and clear if needed.
Manager::ResetTimes::iterator it = manager.m_resetTimes.find(category);
@ -380,6 +254,10 @@ namespace llarp
}
}
template < typename Type >
using RecordBuffer =
std::vector< std::shared_ptr< std::vector< Record< Type > > > >;
template < typename CategoryIterator >
static void
publish(Manager &manager, const CategoryIterator &categoriesBegin,
@ -389,10 +267,9 @@ namespace llarp
{
return;
}
using RecordBuffer =
std::vector< std::shared_ptr< std::vector< Record< double > > > >;
RecordBuffer recordBuffer;
RecordBuffer< double > doubleRecordBuffer;
RecordBuffer< int > intRecordBuffer;
SampleCache sampleCache;
@ -414,69 +291,81 @@ namespace llarp
continue;
}
// Collect the metrics.
auto result = collect(manager, *catIt, now, clear);
auto result = collect(manager, *catIt, now, clear);
const auto &records = result.records;
// If their are no collected records then this category can be
// If there are no collected records then this category can be
// ignored.
if(result.first.empty())
if(records.doubleRecords.empty() && records.intRecords.empty())
{
continue;
}
if(result.second == absl::Duration())
if(result.samplePeriod == absl::Duration())
{
std::cerr << "Invalid elapsed time interval of 0 for "
"published metrics.";
result.second += absl::Nanoseconds(1);
result.samplePeriod += absl::Nanoseconds(1);
}
// Append the collected records to the buffer of records.
auto records = std::make_shared< std::vector< Record< double > > >(
result.first);
recordBuffer.push_back(records);
SampleGroup< double > sampleGroup(
absl::Span< Record< double > >(*records), result.second);
std::for_each(
manager.m_publishers.globalBegin(),
manager.m_publishers.globalEnd(), [&](const auto &ptr) {
updateSampleCache(sampleCache, ptr, sampleGroup, timeStamp);
});
auto dRecords =
std::make_shared< DoubleRecords >(records.doubleRecords);
doubleRecordBuffer.push_back(dRecords);
SampleGroup< double > doubleGroup(
absl::Span< Record< double > >(*dRecords), result.samplePeriod);
auto iRecords = std::make_shared< IntRecords >(records.intRecords);
intRecordBuffer.push_back(iRecords);
SampleGroup< int > intGroup(absl::Span< Record< int > >(*iRecords),
result.samplePeriod);
std::for_each(manager.m_publishers.globalBegin(),
manager.m_publishers.globalEnd(),
[&](const auto &ptr) {
updateSampleCache(sampleCache, ptr, doubleGroup,
intGroup, timeStamp);
});
std::for_each(manager.m_publishers.lowerBound(*catIt),
manager.m_publishers.upperBound(*catIt),
[&](const auto &val) {
updateSampleCache(sampleCache, val.second,
sampleGroup, timeStamp);
doubleGroup, intGroup, timeStamp);
});
}
}
for(auto &entry : sampleCache)
{
Publisher *publisher = entry.first.get();
Sample< double > &sample = entry.second;
Publisher *publisher = entry.first.get();
Sample< double > &is = entry.second.first;
Sample< int > &ds = entry.second.second;
publisher->publish(sample);
publisher->publish(is, ds);
}
}
};
Sample< double >
Manager::collectSample(std::vector< Record< double > > &records,
Samples
Manager::collectSample(Records &records,
absl::Span< const Category * > categories,
bool clear)
{
absl::Time timeStamp = absl::Now();
absl::Duration now = timeStamp - absl::UnixEpoch();
Sample< double > sample;
sample.sampleTime(timeStamp);
Sample< double > dSample;
Sample< int > iSample;
dSample.sampleTime(timeStamp);
iSample.sampleTime(timeStamp);
// Use a tuple to hold 'references' to the collected records
using SampleDescription = std::tuple< size_t, size_t, absl::Duration >;
std::vector< SampleDescription > samples;
samples.reserve(categories.size());
std::vector< SampleDescription > dSamples;
std::vector< SampleDescription > iSamples;
dSamples.reserve(categories.size());
iSamples.reserve(categories.size());
// 1
absl::WriterMutexLock publishGuard(&m_publishLock);
@ -490,32 +379,49 @@ namespace llarp
continue;
}
size_t beginIndex = records.size();
size_t dBeginIndex = records.doubleRecords.size();
size_t iBeginIndex = records.intRecords.size();
// Collect the metrics.
std::vector< Record< double > > catRecords;
absl::Duration elapsedTime;
std::tie(catRecords, elapsedTime) =
PublisherHelper::collect(*this, category, now, clear);
records.insert(records.end(), catRecords.begin(), catRecords.end());
auto collectRes = PublisherHelper::collect(*this, category, now, clear);
DoubleRecords catDRecords = collectRes.records.doubleRecords;
IntRecords catIRecords = collectRes.records.intRecords;
absl::Duration elapsedTime = collectRes.samplePeriod;
records.doubleRecords.insert(records.doubleRecords.end(),
catDRecords.begin(), catDRecords.end());
records.intRecords.insert(records.intRecords.end(), catIRecords.begin(),
catIRecords.end());
size_t size = records.size() - beginIndex;
size_t dSize = records.doubleRecords.size() - dBeginIndex;
size_t iSize = records.intRecords.size() - iBeginIndex;
// If there are no collected records then this category can be ignored.
if(size != 0)
if(dSize != 0)
{
samples.emplace_back(beginIndex, size, elapsedTime);
dSamples.emplace_back(dBeginIndex, dSize, elapsedTime);
}
if(iSize != 0)
{
iSamples.emplace_back(iBeginIndex, iSize, elapsedTime);
}
}
// Now that we have all the records, we can build our sample
for(const SampleDescription &s : samples)
for(const SampleDescription &s : dSamples)
{
dSample.pushGroup(&records.doubleRecords[std::get< 0 >(s)],
std::get< 1 >(s), std::get< 2 >(s));
}
for(const SampleDescription &s : iSamples)
{
sample.pushGroup(&records[std::get< 0 >(s)], std::get< 1 >(s),
std::get< 2 >(s));
iSample.pushGroup(&records.intRecords[std::get< 0 >(s)],
std::get< 1 >(s), std::get< 2 >(s));
}
return sample;
return {dSample, iSample};
}
void

@ -104,7 +104,8 @@ namespace llarp
virtual ~Publisher() = default;
virtual void
publish(const Sample< double > &sample) = 0;
publish(const Sample< double > &doubleSample,
const Sample< int > &intSample) = 0;
};
template < typename LhsType, typename RhsType >
@ -202,69 +203,6 @@ namespace llarp
using DoubleCollectors = Collectors< double >;
using IntCollectors = Collectors< int >;
class MetricCollectors
{
private:
DoubleCollectors m_doubleCollectors;
IntCollectors m_intCollectors;
MetricCollectors(const MetricCollectors &) = delete;
MetricCollectors &
operator=(const MetricCollectors &) = delete;
public:
MetricCollectors(const Id &id)
: m_doubleCollectors(id), m_intCollectors(id)
{
}
DoubleCollectors &
doubleCollectors()
{
return m_doubleCollectors;
}
IntCollectors &
intCollectors()
{
return m_intCollectors;
}
const DoubleCollectors &
doubleCollectors() const
{
return m_doubleCollectors;
}
const IntCollectors &
intCollectors() const
{
return m_intCollectors;
}
Record< double >
combineAndClear()
{
Record< double > res = m_doubleCollectors.combineAndClear();
metrics::combine(res, m_intCollectors.combineAndClear());
return res;
}
Record< double >
combine()
{
Record< double > res = m_doubleCollectors.combine();
metrics::combine(res, m_intCollectors.combine());
return res;
}
const Id &
id() const
{
return m_intCollectors.id();
}
};
class Registry
{
using NamedCategory = std::tuple< const char *, const char * >;
@ -359,83 +297,163 @@ namespace llarp
getAll() const;
};
using DoubleRecords = std::vector< Record< double > >;
using IntRecords = std::vector< Record< int > >;
struct Records
{
DoubleRecords doubleRecords;
IntRecords intRecords;
Records()
{
}
Records(const DoubleRecords &d, const IntRecords &i)
: doubleRecords(d), intRecords(i)
{
}
};
inline bool
operator==(const Records &lhs, const Records &rhs)
{
return std::tie(lhs.doubleRecords, lhs.intRecords)
== std::tie(rhs.doubleRecords, rhs.intRecords);
}
template < typename Type >
class CollectorRepo
{
using MetricCollectorsPtr = std::shared_ptr< MetricCollectors >;
using IdCollectors = std::map< Id, MetricCollectorsPtr >;
using CollectorsPtr = std::shared_ptr< Collectors< Type > >;
using IdCollectors = std::map< Id, CollectorsPtr >;
using CategoryCollectors =
std::map< const Category *, std::vector< MetricCollectors * > >;
std::map< const Category *, std::vector< Collectors< Type > * > >;
Registry *m_registry;
IdCollectors m_collectors;
CategoryCollectors m_categories;
mutable util::Mutex m_mutex;
CollectorRepo(const CollectorRepo &) = delete;
CollectorRepo &
operator=(const CollectorRepo &) = delete;
MetricCollectors &
getCollectors(const Id &id);
public:
CollectorRepo(Registry *registry) : m_registry(registry)
Collectors< Type > &
getCollectors(const Id &id)
{
}
auto it = m_collectors.find(id);
std::vector< Record< double > >
collectAndClear(const Category *category);
if(it == m_collectors.end())
{
assert(id.valid());
std::vector< Record< double > >
collect(const Category *category);
const Category *cat = id.category();
DoubleCollector *
defaultDoubleCollector(const char *category, const char *name)
auto ptr = std::make_shared< Collectors< Type > >(id);
auto &vec = m_categories[cat];
vec.reserve(vec.size() + 1);
it = m_collectors.emplace(id, ptr).first;
vec.push_back(ptr.get());
}
return *it->second.get();
}
template < Record< Type > (Collectors< Type >::*func)() >
std::vector< Record< Type > >
collectOp(const Category *category)
{
return defaultDoubleCollector(m_registry->get(category, name));
absl::WriterMutexLock l(&m_mutex);
auto it = m_categories.find(category);
if(it == m_categories.end())
{
return {};
}
std::vector< Record< Type > > result;
auto &collectors = it->second;
result.reserve(collectors.size());
std::transform(collectors.begin(), collectors.end(),
std::back_inserter(result),
[](auto &collector) { return (collector->*func)(); });
return result;
}
DoubleCollector *
defaultDoubleCollector(const Id &id);
public:
explicit CollectorRepo(Registry *registry) : m_registry(registry)
{
}
IntCollector *
defaultIntCollector(const char *category, const char *name)
std::vector< Record< Type > >
collectAndClear(const Category *category)
{
return defaultIntCollector(m_registry->get(category, name));
return collectOp< &Collectors< Type >::combineAndClear >(category);
}
IntCollector *
defaultIntCollector(const Id &id);
std::vector< Record< Type > >
collect(const Category *category)
{
return collectOp< &Collectors< Type >::combine >(category);
}
std::shared_ptr< DoubleCollector >
addDoubleCollector(const char *category, const char *name)
Collector< Type > *
defaultCollector(const char *category, const char *name)
{
return addDoubleCollector(m_registry->get(category, name));
return defaultCollector(m_registry->get(category, name));
}
std::shared_ptr< DoubleCollector >
addDoubleCollector(const Id &id)
Collector< Type > *
defaultCollector(const Id &id)
{
absl::WriterMutexLock l(&m_mutex);
return getCollectors(id).doubleCollectors().add();
{
absl::ReaderMutexLock l(&m_mutex);
auto it = m_collectors.find(id);
if(it != m_collectors.end())
{
return it->second->defaultCollector();
}
}
{
absl::WriterMutexLock l(&m_mutex);
return getCollectors(id).defaultCollector();
}
}
std::shared_ptr< IntCollector >
addIntCollector(const char *category, const char *name)
std::shared_ptr< Collector< Type > >
addCollector(const char *category, const char *name)
{
return addIntCollector(m_registry->get(category, name));
return addCollector(m_registry->get(category, name));
}
std::shared_ptr< IntCollector >
addIntCollector(const Id &id)
std::shared_ptr< Collector< Type > >
addCollector(const Id &id)
{
absl::WriterMutexLock l(&m_mutex);
return getCollectors(id).intCollectors().add();
return getCollectors(id).add();
}
std::pair< std::vector< std::shared_ptr< DoubleCollector > >,
std::vector< std::shared_ptr< IntCollector > > >
allCollectors(const Id &id);
std::vector< std::shared_ptr< Collector< Type > > >
allCollectors(const Id &id)
{
absl::ReaderMutexLock l(&m_mutex);
auto it = m_collectors.find(id);
if(it == m_collectors.end())
{
return {};
}
return it->second->collectors();
}
Registry &
registry()
@ -600,94 +618,14 @@ namespace llarp
}
};
class CallbackRegistry
{
using Handle = uint64_t;
using RecordCallback =
std::function< std::vector< Record< double > >(bool) >;
using CallbackMap = std::multimap< const Category *, RecordCallback >;
using HandleMap = std::map< Handle, CallbackMap::iterator >;
Handle m_next;
CallbackMap m_callbackMap;
HandleMap m_handleMap;
CallbackRegistry(const CallbackRegistry &) = delete;
CallbackRegistry &
operator=(const CallbackRegistry &) = delete;
public:
using iterator = CallbackMap::iterator;
CallbackRegistry() : m_next(1)
{
}
Handle
registerCallback(const Category *category, const RecordCallback &callback)
{
Handle handle = m_next++;
auto it = m_callbackMap.emplace(category, callback);
m_handleMap.emplace(handle, it);
return handle;
}
bool
removeCallback(Handle handle)
{
auto it = m_handleMap.find(handle);
if(it == m_handleMap.end())
{
return false;
}
m_callbackMap.erase(it->second);
m_handleMap.erase(it);
return true;
}
iterator
begin()
{
return m_callbackMap.begin();
}
iterator
end()
{
return m_callbackMap.end();
}
iterator
lowerBound(const Category *category)
{
return m_callbackMap.lower_bound(category);
}
iterator
upperBound(const Category *category)
{
return m_callbackMap.upper_bound(category);
}
std::vector< const RecordCallback * >
callbacksFor(const Category *category) const
{
std::vector< const RecordCallback * > result;
auto beg = m_callbackMap.lower_bound(category);
auto end = m_callbackMap.upper_bound(category);
result.reserve(std::distance(beg, end));
std::transform(beg, end, std::back_inserter(result),
[](const auto &x) { return &x.second; });
struct PublisherHelper;
return result;
}
struct Samples
{
Sample< double > doubleSample;
Sample< int > intSample;
};
struct PublisherHelper;
/// The big dog.
/// This class owns everything else, and is responsible for managing the
/// gathering and publishing of metrics
@ -696,9 +634,6 @@ namespace llarp
public:
// Public callback. If the bool flag is true, clear the metrics back to
// their default state.
using RecordCallback =
std::function< std::vector< Record< double > >(bool) >;
using Handle = uint64_t;
private:
@ -708,8 +643,8 @@ namespace llarp
friend struct PublisherHelper;
Registry m_registry;
CollectorRepo m_repo;
CallbackRegistry m_callbacks GUARDED_BY(m_mutex);
CollectorRepo< double > m_doubleRepo;
CollectorRepo< int > m_intRepo;
PublisherRegistry m_publishers GUARDED_BY(m_mutex);
const absl::Duration m_createTime;
@ -723,28 +658,10 @@ namespace llarp
std::numeric_limits< Handle >::max();
Manager()
: m_repo(&m_registry), m_createTime(absl::Now() - absl::UnixEpoch())
{
}
/// Register a callback for
Handle
registerCallback(const char *categoryName, const RecordCallback &callback)
: m_doubleRepo(&m_registry)
, m_intRepo(&m_registry)
, m_createTime(absl::Now() - absl::UnixEpoch())
{
return registerCallback(m_registry.get(categoryName), callback);
}
Handle
registerCallback(const Category *category, const RecordCallback &callback)
{
absl::WriterMutexLock l(&m_mutex);
return m_callbacks.registerCallback(category, callback);
}
bool
removeCallback(Handle handle)
{
absl::WriterMutexLock l(&m_mutex);
return m_callbacks.removeCallback(handle);
}
/// Add a `publisher` which will receive all events
@ -787,24 +704,23 @@ namespace llarp
}
// clang-format off
CollectorRepo& collectorRepo() { return m_repo; }
const CollectorRepo& collectorRepo() const { return m_repo; }
CollectorRepo<double>& doubleCollectorRepo() { return m_doubleRepo; }
CollectorRepo<int>& intCollectorRepo() { return m_intRepo; }
Registry& registry() { return m_registry; }
const Registry& registry() const { return m_registry; }
// clang-format on
/// Publish specific categories of metric matching the category/categories
Sample< double >
collectSample(std::vector< Record< double > > &records,
bool clear = false)
Samples
collectSample(Records &records, bool clear = false)
{
std::vector< const Category * > allCategories = m_registry.getAll();
return collectSample(
records, absl::Span< const Category * >{allCategories}, clear);
}
Sample< double >
collectSample(std::vector< Record< double > > &records,
absl::Span< const Category * > categories,
Samples
collectSample(Records &records, absl::Span< const Category * > categories,
bool clear = false);
/// Publish specific categories of metric matching the category/categories
@ -952,8 +868,7 @@ namespace llarp
};
template < typename Collector, typename Value,
Collector *(CollectorRepo::*catFunc)(const char *, const char *),
Collector *(CollectorRepo::*idFunc)(const Id &) >
CollectorRepo< Value > &(Manager::*repoFunc)() >
class Metric
{
Collector *m_collector; // can be null
@ -964,7 +879,7 @@ namespace llarp
lookup(const char *category, const char *name, Manager *manager = nullptr)
{
manager = DefaultManager::manager(manager);
return manager ? (manager->collectorRepo().*catFunc)(category, name)
return manager ? (manager->*repoFunc)().defaultCollector(category, name)
: 0;
}
@ -972,7 +887,7 @@ namespace llarp
lookup(const Id &id, Manager *manager = nullptr)
{
manager = DefaultManager::manager(manager);
return manager ? (manager->collectorRepo().*idFunc)(id) : 0;
return manager ? (manager->*repoFunc)().defaultCollector(id) : 0;
}
Metric(const char *category, const char *name, Manager *manager)
@ -1061,7 +976,7 @@ namespace llarp
const char *category, const char *metric)
{
Manager *manager = DefaultManager::instance();
*collector = manager->collectorRepo().*catFunc(category, metric);
*collector = (manager->*repoFunc().defaultCollector)(category, metric);
manager->registry().registerContainer((*collector)->id().category(),
container);
}
@ -1072,7 +987,7 @@ namespace llarp
Publication::Type type)
{
Manager *manager = DefaultManager::instance();
*collector = manager->collectorRepo().*catFunc(category, metric);
*collector = (manager->*repoFunc().defaultCollector)(category, metric);
manager->registry().registerContainer((*collector)->id().category(),
container);
manager->registry().publicationType((*collector)->id(), type);
@ -1080,12 +995,9 @@ namespace llarp
};
using DoubleMetric =
Metric< DoubleCollector, double, &CollectorRepo::defaultDoubleCollector,
&CollectorRepo::defaultDoubleCollector >;
Metric< DoubleCollector, double, &Manager::doubleCollectorRepo >;
using IntMetric =
Metric< IntCollector, int, &CollectorRepo::defaultIntCollector,
&CollectorRepo::defaultIntCollector >;
using IntMetric = Metric< IntCollector, int, &Manager::intCollectorRepo >;
class TimerGuard
{

@ -416,7 +416,7 @@ namespace llarp
explicit Record(const Id &id)
: m_id(id)
, m_count(0)
, m_total(0.0)
, m_total()
, m_min(DEFAULT_MIN())
, m_max(DEFAULT_MAX())
{
@ -483,6 +483,13 @@ namespace llarp
&& lhs.max() == rhs.max());
}
template < typename Type >
inline bool
operator!=(const Record< Type > &lhs, const Record< Type > &rhs)
{
return !(lhs == rhs);
}
template < typename Type >
class SampleGroup
{

@ -26,7 +26,7 @@ TEST(MetricsPublisher, StreamPublisher)
sample.sampleTime(absl::Now());
sample.pushGroup(records.data(), records.size(), absl::Seconds(5));
myPublisher.publish(sample);
myPublisher.publish(sample, metrics::Sample< int >());
std::cout << stream.str();
}

@ -311,36 +311,27 @@ MATCHER_P5(RecordCatEq, category, count, total, min, max, "")
TEST(MetricsCore, RepoBasic)
{
Registry registry;
CollectorRepo repo(&registry);
CollectorRepo< double > repo(&registry);
DoubleCollector *collector1 = repo.defaultDoubleCollector("Test", "C1");
DoubleCollector *collector2 = repo.defaultDoubleCollector("Test", "C2");
IntCollector *intCollector1 = repo.defaultIntCollector("Test", "C3");
IntCollector *intCollector2 = repo.defaultIntCollector("Test", "C4");
DoubleCollector *collector1 = repo.defaultCollector("Test", "C1");
DoubleCollector *collector2 = repo.defaultCollector("Test", "C2");
ASSERT_NE(collector1, collector2);
ASSERT_EQ(collector1, repo.defaultDoubleCollector("Test", "C1"));
ASSERT_NE(intCollector1, intCollector2);
ASSERT_EQ(intCollector1, repo.defaultIntCollector("Test", "C3"));
ASSERT_EQ(collector1, repo.defaultCollector("Test", "C1"));
collector1->tick(1.0);
collector1->tick(2.0);
collector2->tick(4.0);
intCollector1->tick(5);
intCollector2->tick(6);
std::vector< Record< double > > records =
repo.collectAndClear(registry.get("Test"));
ASSERT_THAT(records, SizeIs(4));
EXPECT_THAT(records, SizeIs(2));
// clang-format off
ASSERT_THAT(
EXPECT_THAT(
records,
ElementsAre(
RecordEq("Test", "C1", 2u, 3, 1, 2),
RecordEq("Test", "C2", 1u, 4, 4, 4),
RecordEq("Test", "C3", 1u, 5, 5, 5),
RecordEq("Test", "C4", 1u, 6, 6, 6)
RecordEq("Test", "C2", 1u, 4, 4, 4)
)
);
// clang-format on
@ -360,7 +351,7 @@ TEST(MetricsCore, RepoCollect)
for(int i = 0; i < static_cast< int >(CATEGORIES.size()); ++i)
{
CollectorRepo repo(&registry);
CollectorRepo< int > repo(&registry);
for(int j = 0; j < static_cast< int >(CATEGORIES.size()); ++j)
{
@ -370,16 +361,13 @@ TEST(MetricsCore, RepoCollect)
Id metric = registry.get(CATEGORY, METRICS[k]);
for(int l = 0; l < NUM_COLS; ++l)
{
DoubleCollector *dCol = repo.addDoubleCollector(metric).get();
IntCollector *iCol = repo.addIntCollector(metric).get();
IntCollector *iCol = repo.addCollector(metric).get();
if(i == j)
{
dCol->set(k, 2 * k, -k, k);
iCol->set(k, 2 * k, -k, k);
}
else
{
dCol->set(100, 100, 100, 100);
iCol->set(100, 100, 100, 100);
}
}
@ -391,7 +379,7 @@ TEST(MetricsCore, RepoCollect)
const char *CATEGORY = CATEGORIES[i];
const Category *category = registry.get(CATEGORY);
std::vector< Record< double > > records = repo.collect(category);
std::vector< Record< int > > records = repo.collect(category);
ASSERT_THAT(records, SizeIs(static_cast< int >(METRICS.size())));
// clang-format off
@ -399,8 +387,8 @@ TEST(MetricsCore, RepoCollect)
records,
UnorderedElementsAre(
RecordEq(CATEGORY, "A", 0u, 0, 0, 0),
RecordEq(CATEGORY, "B", 6u, 12, -1, 1),
RecordEq(CATEGORY, "C", 12u, 24, -2, 2)
RecordEq(CATEGORY, "B", 3u, 6, -1, 1),
RecordEq(CATEGORY, "C", 6u, 12, -2, 2)
)
);
// clang-format on
@ -410,17 +398,12 @@ TEST(MetricsCore, RepoCollect)
{
Id metric = registry.get(CATEGORY, METRICS[j]);
auto collectors = repo.allCollectors(metric);
const auto &doubleCols = collectors.first;
const auto &intCols = collectors.second;
for(int k = 0; k < static_cast< int >(doubleCols.size()); ++k)
auto collectors = repo.allCollectors(metric);
for(int k = 0; k < static_cast< int >(collectors.size()); ++k)
{
Record< double > ED(metric, j, 2 * j, -j, j);
Record< int > EI(metric, j, 2 * j, -j, j);
Record< double > record1 = doubleCols[k]->load();
Record< int > record2 = intCols[k]->load();
ASSERT_EQ(record1, ED);
ASSERT_EQ(record2, EI);
Record< int > record = collectors[k]->load();
ASSERT_EQ(record, EI);
}
}
}
@ -436,17 +419,13 @@ TEST(MetricsCore, RepoCollect)
for(int k = 0; k < static_cast< int >(METRICS.size()); ++k)
{
Id metric = registry.get(CATEGORY, METRICS[j]);
auto collectors = repo.allCollectors(metric);
const auto &doubleCols = collectors.first;
const auto &intCols = collectors.second;
Id metric = registry.get(CATEGORY, METRICS[j]);
auto collectors = repo.allCollectors(metric);
for(int l = 0; l < static_cast< int >(doubleCols.size()); ++l)
for(int l = 0; l < static_cast< int >(collectors.size()); ++l)
{
Record< double > record1 = doubleCols[k]->load();
ASSERT_THAT(record1, RecordEq(metric, 100u, 100, 100, 100));
Record< int > record2 = intCols[k]->load();
ASSERT_THAT(record2, RecordEq(metric, 100u, 100, 100, 100));
Record< int > record = collectors[k]->load();
ASSERT_THAT(record, RecordEq(metric, 100u, 100, 100, 100));
}
}
}
@ -483,32 +462,33 @@ TEST(MetricsCore, ManagerCollectSample1)
const int NUM_METRICS = sizeof(METRICS) / sizeof(*METRICS);
Manager manager;
CollectorRepo &rep = manager.collectorRepo();
CollectorRepo< double > &rep = manager.doubleCollectorRepo();
for(int i = 0; i < NUM_CATEGORIES; ++i)
{
for(int j = 0; j < NUM_METRICS; ++j)
{
rep.defaultDoubleCollector(CATEGORIES[i], METRICS[j])->tick(1);
rep.defaultCollector(CATEGORIES[i], METRICS[j])->tick(1);
}
}
absl::Time start = absl::Now();
std::this_thread::sleep_for(std::chrono::microseconds(100000));
std::vector< Record< double > > records;
Sample< double > sample = manager.collectSample(records, false);
Records records;
Samples sample = manager.collectSample(records, false);
absl::Duration window = absl::Now() - start;
absl::Time now = absl::Now();
ASSERT_EQ(NUM_CATEGORIES * NUM_METRICS, records.size());
ASSERT_EQ(NUM_CATEGORIES * NUM_METRICS, sample.recordCount());
ASSERT_EQ(NUM_CATEGORIES, sample.groupCount());
ASSERT_THAT(sample.sampleTime(), WithinWindow(now, absl::Milliseconds(10)));
ASSERT_EQ(NUM_CATEGORIES * NUM_METRICS, records.doubleRecords.size());
ASSERT_EQ(NUM_CATEGORIES * NUM_METRICS, sample.doubleSample.recordCount());
ASSERT_EQ(NUM_CATEGORIES, sample.doubleSample.groupCount());
ASSERT_THAT(sample.doubleSample.sampleTime(),
WithinWindow(now, absl::Milliseconds(10)));
for(size_t i = 0; i < sample.groupCount(); ++i)
for(size_t i = 0; i < sample.doubleSample.groupCount(); ++i)
{
const SampleGroup< double > &group = sample.group(i);
const SampleGroup< double > &group = sample.doubleSample.group(i);
ASSERT_EQ(NUM_METRICS, group.size());
ASSERT_THAT(group.samplePeriod(),
WithinWindow(window, absl::Milliseconds(10)))
@ -524,26 +504,25 @@ TEST(MetricsCore, ManagerCollectSample1)
{
for(size_t j = 0; j < NUM_METRICS; ++j)
{
DoubleCollector *col =
rep.defaultDoubleCollector(CATEGORIES[i], METRICS[j]);
DoubleCollector *col = rep.defaultCollector(CATEGORIES[i], METRICS[j]);
Record< double > record = col->load();
ASSERT_THAT(record, RecordEq(1u, 1, 1, 1));
}
}
records.clear();
records.doubleRecords.clear();
records.intRecords.clear();
sample = manager.collectSample(records, true);
ASSERT_EQ(NUM_CATEGORIES * NUM_METRICS, records.size());
ASSERT_EQ(NUM_CATEGORIES * NUM_METRICS, sample.recordCount());
ASSERT_EQ(NUM_CATEGORIES, sample.groupCount());
ASSERT_EQ(NUM_CATEGORIES * NUM_METRICS, records.doubleRecords.size());
ASSERT_EQ(NUM_CATEGORIES * NUM_METRICS, sample.doubleSample.recordCount());
ASSERT_EQ(NUM_CATEGORIES, sample.doubleSample.groupCount());
for(size_t i = 0; i < NUM_CATEGORIES; ++i)
{
for(size_t j = 0; j < NUM_METRICS; ++j)
{
DoubleCollector *col =
rep.defaultDoubleCollector(CATEGORIES[i], METRICS[j]);
DoubleCollector *col = rep.defaultCollector(CATEGORIES[i], METRICS[j]);
Record< double > record = col->load();
ASSERT_EQ(Record< double >(record.id()), record);
}
@ -561,8 +540,8 @@ TEST(MetricsCore, ManagerCollectSample2)
Manager manager;
std::vector< const Category * > allCategories;
CollectorRepo &rep = manager.collectorRepo();
Registry &reg = manager.registry();
CollectorRepo< double > &rep = manager.doubleCollectorRepo();
Registry &reg = manager.registry();
for(size_t i = 0; i < NUM_CATEGORIES; ++i)
{
const Category *cat = reg.get(CATEGORIES[i]);
@ -577,8 +556,7 @@ TEST(MetricsCore, ManagerCollectSample2)
{
for(size_t j = 0; j < NUM_METRICS; ++j)
{
DoubleCollector *col =
rep.defaultDoubleCollector(CATEGORIES[i], METRICS[j]);
DoubleCollector *col = rep.defaultCollector(CATEGORIES[i], METRICS[j]);
col->clear();
col->tick(1);
}
@ -586,20 +564,20 @@ TEST(MetricsCore, ManagerCollectSample2)
// Test without a reset.
std::vector< const Category * > cats = combIt.currentCombo;
std::vector< Record< double > > records;
Sample< double > sample = manager.collectSample(
Records records;
Samples sample = manager.collectSample(
records, absl::Span< const Category * >{cats}, false);
ASSERT_EQ(NUM_METRICS * cats.size(), sample.recordCount());
ASSERT_EQ(cats.size(), sample.groupCount());
ASSERT_EQ(NUM_METRICS * cats.size(), sample.doubleSample.recordCount());
ASSERT_EQ(cats.size(), sample.doubleSample.groupCount());
for(size_t i = 0; i < NUM_CATEGORIES; ++i)
{
// Verify the correct categories are in the sample (once)
const Category *CATEGORY = allCategories[i];
bool found = false;
for(size_t j = 0; j < sample.groupCount(); ++j)
for(size_t j = 0; j < sample.doubleSample.groupCount(); ++j)
{
if(CATEGORY == firstCategory(sample.group(j)))
if(CATEGORY == firstCategory(sample.doubleSample.group(j)))
{
found = true;
}
@ -610,29 +588,28 @@ TEST(MetricsCore, ManagerCollectSample2)
{
for(size_t j = 0; j < NUM_METRICS; ++j)
{
DoubleCollector *col =
rep.defaultDoubleCollector(CATEGORIES[i], METRICS[j]);
DoubleCollector *col = rep.defaultCollector(CATEGORIES[i], METRICS[j]);
Record< double > record = col->load();
ASSERT_THAT(record, RecordEq(1u, 1, 1, 1));
}
}
std::vector< Record< double > > records2;
Records records2;
// Test with a reset.
sample = manager.collectSample(records2,
absl::Span< const Category * >{cats}, true);
ASSERT_EQ(NUM_METRICS * cats.size(), sample.recordCount());
ASSERT_EQ(cats.size(), sample.groupCount());
ASSERT_EQ(NUM_METRICS * cats.size(), sample.doubleSample.recordCount());
ASSERT_EQ(cats.size(), sample.doubleSample.groupCount());
ASSERT_EQ(records, records2);
for(size_t i = 0; i < NUM_CATEGORIES; ++i)
{
// Verify the correct categories are in the sample
const Category *CATEGORY = allCategories[i];
bool found = false;
for(size_t j = 0; j < sample.groupCount(); ++j)
for(size_t j = 0; j < sample.doubleSample.groupCount(); ++j)
{
if(CATEGORY == firstCategory(sample.group(j)))
if(CATEGORY == firstCategory(sample.doubleSample.group(j)))
{
found = true;
}
@ -643,8 +620,7 @@ TEST(MetricsCore, ManagerCollectSample2)
{
for(size_t j = 0; j < NUM_METRICS; ++j)
{
DoubleCollector *col =
rep.defaultDoubleCollector(CATEGORIES[i], METRICS[j]);
DoubleCollector *col = rep.defaultCollector(CATEGORIES[i], METRICS[j]);
Record< double > record = col->load();
if(combIt.includesElement(i))
{
@ -670,7 +646,7 @@ struct MockPublisher : public Publisher
std::set< absl::Duration > times;
void
publish(const Sample< double > &sample) override
publish(const Sample< double > &sample, const Sample< int > &) override
{
invocations++;
@ -822,8 +798,8 @@ TEST(MetricsCore, PublishAll)
const int NUM_METRICS = sizeof(METRICS) / sizeof(*METRICS);
Manager manager;
Registry &registry = manager.registry();
CollectorRepo &repository = manager.collectorRepo();
Registry &registry = manager.registry();
CollectorRepo< double > &repository = manager.doubleCollectorRepo();
auto globalPub = std::make_shared< MockPublisher >();
@ -846,7 +822,7 @@ TEST(MetricsCore, PublishAll)
for(int j = 0; j < NUM_METRICS; ++j)
{
DoubleCollector *col =
repository.defaultDoubleCollector(CATEGORIES[i], METRICS[j]);
repository.defaultCollector(CATEGORIES[i], METRICS[j]);
col->clear();
col->tick(1);
}

Loading…
Cancel
Save