mirror of
https://github.com/oxen-io/lokinet.git
synced 2024-11-11 07:10:36 +00:00
Move local publishers to their own files
This commit is contained in:
parent
3a7d74b08c
commit
c0525f2ea3
@ -67,7 +67,8 @@ set(LIB_PLATFORM_SRC
|
|||||||
ev/ev.cpp
|
ev/ev.cpp
|
||||||
ev/pipe.cpp
|
ev/pipe.cpp
|
||||||
metrics/metrictank_publisher.cpp
|
metrics/metrictank_publisher.cpp
|
||||||
metrics/publishers.cpp
|
metrics/json_publisher.cpp
|
||||||
|
metrics/stream_publisher.cpp
|
||||||
net/net.cpp
|
net/net.cpp
|
||||||
net/net_addr.cpp
|
net/net_addr.cpp
|
||||||
net/net_inaddr.cpp
|
net/net_inaddr.cpp
|
||||||
|
@ -8,7 +8,8 @@
|
|||||||
#include <dnsd.hpp>
|
#include <dnsd.hpp>
|
||||||
#include <ev/ev.hpp>
|
#include <ev/ev.hpp>
|
||||||
#include <metrics/metrictank_publisher.hpp>
|
#include <metrics/metrictank_publisher.hpp>
|
||||||
#include <metrics/publishers.hpp>
|
#include <metrics/json_publisher.hpp>
|
||||||
|
#include <metrics/stream_publisher.hpp>
|
||||||
#include <nodedb.hpp>
|
#include <nodedb.hpp>
|
||||||
#include <router/router.hpp>
|
#include <router/router.hpp>
|
||||||
#include <util/logger.h>
|
#include <util/logger.h>
|
||||||
|
177
llarp/metrics/json_publisher.cpp
Normal file
177
llarp/metrics/json_publisher.cpp
Normal file
@ -0,0 +1,177 @@
|
|||||||
|
#include <metrics/json_publisher.hpp>
|
||||||
|
|
||||||
|
#include <fstream>
|
||||||
|
#include <iomanip>
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
|
namespace llarp
|
||||||
|
{
|
||||||
|
namespace metrics
|
||||||
|
{
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
nlohmann::json
|
||||||
|
tagsToJson(const Tags &tags)
|
||||||
|
{
|
||||||
|
nlohmann::json result;
|
||||||
|
|
||||||
|
std::for_each(tags.begin(), tags.end(), [&](const auto &tag) {
|
||||||
|
absl::visit([&](const auto &t) { result[tag.first] = t; },
|
||||||
|
tag.second);
|
||||||
|
});
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
template < typename Value >
|
||||||
|
nlohmann::json
|
||||||
|
formatValue(const Record< Value > &record, const Tags &tags,
|
||||||
|
double elapsedTime, Publication::Type publicationType)
|
||||||
|
{
|
||||||
|
switch(publicationType)
|
||||||
|
{
|
||||||
|
case Publication::Type::Unspecified:
|
||||||
|
{
|
||||||
|
assert(false && "Invalid publication type");
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case Publication::Type::Total:
|
||||||
|
{
|
||||||
|
return {{"tags", tagsToJson(tags)}, {"total", record.total()}};
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case Publication::Type::Count:
|
||||||
|
{
|
||||||
|
return {{"tags", tagsToJson(tags)}, {"count", record.count()}};
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case Publication::Type::Min:
|
||||||
|
{
|
||||||
|
return {{"tags", tagsToJson(tags)}, {"min", record.min()}};
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case Publication::Type::Max:
|
||||||
|
{
|
||||||
|
return {{"tags", tagsToJson(tags)}, {"max", record.max()}};
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case Publication::Type::Avg:
|
||||||
|
{
|
||||||
|
return {{"tags", tagsToJson(tags)},
|
||||||
|
{"avg", record.total() / record.count()}};
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case Publication::Type::Rate:
|
||||||
|
{
|
||||||
|
return {{"tags", tagsToJson(tags)},
|
||||||
|
{"rate", record.total() / elapsedTime}};
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case Publication::Type::RateCount:
|
||||||
|
{
|
||||||
|
return {{"tags", tagsToJson(tags)},
|
||||||
|
{"rateCount", record.count() / elapsedTime}};
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
template < typename Value >
|
||||||
|
nlohmann::json
|
||||||
|
recordToJson(const TaggedRecords< Value > &taggedRecord,
|
||||||
|
double elapsedTime)
|
||||||
|
{
|
||||||
|
nlohmann::json result;
|
||||||
|
result["id"] = taggedRecord.id.toString();
|
||||||
|
|
||||||
|
auto publicationType = taggedRecord.id.description()->type();
|
||||||
|
|
||||||
|
for(const auto &rec : taggedRecord.data)
|
||||||
|
{
|
||||||
|
const auto &record = rec.second;
|
||||||
|
if(publicationType != Publication::Type::Unspecified)
|
||||||
|
{
|
||||||
|
result["publicationType"] = Publication::repr(publicationType);
|
||||||
|
|
||||||
|
result["metrics"].push_back(
|
||||||
|
formatValue(record, rec.first, elapsedTime, publicationType));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
nlohmann::json tmp;
|
||||||
|
tmp["tags"] = tagsToJson(rec.first);
|
||||||
|
tmp["count"] = record.count();
|
||||||
|
tmp["total"] = record.total();
|
||||||
|
|
||||||
|
if(Record< Value >::DEFAULT_MIN() != record.min())
|
||||||
|
{
|
||||||
|
tmp["min"] = record.min();
|
||||||
|
}
|
||||||
|
if(Record< Value >::DEFAULT_MAX() == record.max())
|
||||||
|
{
|
||||||
|
tmp["max"] = record.max();
|
||||||
|
}
|
||||||
|
|
||||||
|
result["metrics"].push_back(tmp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
void
|
||||||
|
JsonPublisher::publish(const Sample &values)
|
||||||
|
{
|
||||||
|
if(values.recordCount() == 0)
|
||||||
|
{
|
||||||
|
// nothing to publish
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
nlohmann::json result;
|
||||||
|
result["sampleTime"] = absl::UnparseFlag(values.sampleTime());
|
||||||
|
result["recordCount"] = values.recordCount();
|
||||||
|
auto gIt = values.begin();
|
||||||
|
auto prev = values.begin();
|
||||||
|
for(; gIt != values.end(); ++gIt)
|
||||||
|
{
|
||||||
|
const double elapsedTime = absl::ToDoubleSeconds(samplePeriod(*gIt));
|
||||||
|
|
||||||
|
if(gIt == prev || samplePeriod(*gIt) != samplePeriod(*prev))
|
||||||
|
{
|
||||||
|
result["elapsedTime"] = elapsedTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
absl::visit(
|
||||||
|
[&](const auto &x) -> void {
|
||||||
|
for(const auto &record : x)
|
||||||
|
{
|
||||||
|
result["record"].emplace_back(
|
||||||
|
recordToJson(record, elapsedTime));
|
||||||
|
}
|
||||||
|
},
|
||||||
|
*gIt);
|
||||||
|
|
||||||
|
prev = gIt;
|
||||||
|
}
|
||||||
|
|
||||||
|
m_publish(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
JsonPublisher::directoryPublisher(const nlohmann::json &result,
|
||||||
|
const fs::path &path)
|
||||||
|
{
|
||||||
|
std::ofstream fstream(path.string(), std::ios_base::app);
|
||||||
|
if(!fstream)
|
||||||
|
{
|
||||||
|
std::cerr << "Skipping metrics publish, " << path << " is not a file\n";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
fstream << std::setw(0) << result << '\n';
|
||||||
|
fstream.close();
|
||||||
|
}
|
||||||
|
} // namespace metrics
|
||||||
|
} // namespace llarp
|
@ -1,34 +1,18 @@
|
|||||||
#ifndef LLARP_METRICS_PUBLISHERS_HPP
|
#ifndef LLARP_METRICS_JSON_PUBLISHER_HPP
|
||||||
#define LLARP_METRICS_PUBLISHERS_HPP
|
#define LLARP_METRICS_JSON_PUBLISHER_HPP
|
||||||
|
|
||||||
#include <util/metrics_core.hpp>
|
|
||||||
|
|
||||||
#include <util/fs.hpp>
|
#include <util/fs.hpp>
|
||||||
|
#include <util/metrics_core.hpp>
|
||||||
|
|
||||||
#include <iosfwd>
|
|
||||||
#include <nlohmann/json.hpp>
|
#include <nlohmann/json.hpp>
|
||||||
|
|
||||||
|
#include <functional>
|
||||||
|
#include <iosfwd>
|
||||||
|
|
||||||
namespace llarp
|
namespace llarp
|
||||||
{
|
{
|
||||||
namespace metrics
|
namespace metrics
|
||||||
{
|
{
|
||||||
class StreamPublisher final : public Publisher
|
|
||||||
{
|
|
||||||
std::ostream& m_stream;
|
|
||||||
|
|
||||||
public:
|
|
||||||
StreamPublisher(std::ostream& stream) : m_stream(stream)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
~StreamPublisher()
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
void
|
|
||||||
publish(const Sample& values) override;
|
|
||||||
};
|
|
||||||
|
|
||||||
class JsonPublisher final : public Publisher
|
class JsonPublisher final : public Publisher
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -53,7 +37,5 @@ namespace llarp
|
|||||||
directoryPublisher(const nlohmann::json& result, const fs::path& path);
|
directoryPublisher(const nlohmann::json& result, const fs::path& path);
|
||||||
};
|
};
|
||||||
} // namespace metrics
|
} // namespace metrics
|
||||||
|
|
||||||
} // namespace llarp
|
} // namespace llarp
|
||||||
|
|
||||||
#endif
|
#endif
|
@ -1,4 +1,4 @@
|
|||||||
#include <metrics/publishers.hpp>
|
#include <metrics/stream_publisher.hpp>
|
||||||
|
|
||||||
#include <fstream>
|
#include <fstream>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
@ -155,115 +155,6 @@ namespace llarp
|
|||||||
}
|
}
|
||||||
stream << "\n\t\t]\n";
|
stream << "\n\t\t]\n";
|
||||||
}
|
}
|
||||||
|
|
||||||
nlohmann::json
|
|
||||||
tagsToJson(const Tags &tags)
|
|
||||||
{
|
|
||||||
nlohmann::json result;
|
|
||||||
|
|
||||||
std::for_each(tags.begin(), tags.end(), [&](const auto &tag) {
|
|
||||||
absl::visit([&](const auto &t) { result[tag.first] = t; },
|
|
||||||
tag.second);
|
|
||||||
});
|
|
||||||
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
template < typename Value >
|
|
||||||
nlohmann::json
|
|
||||||
formatValue(const Record< Value > &record, const Tags &tags,
|
|
||||||
double elapsedTime, Publication::Type publicationType)
|
|
||||||
{
|
|
||||||
switch(publicationType)
|
|
||||||
{
|
|
||||||
case Publication::Type::Unspecified:
|
|
||||||
{
|
|
||||||
assert(false && "Invalid publication type");
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case Publication::Type::Total:
|
|
||||||
{
|
|
||||||
return {{"tags", tagsToJson(tags)}, {"total", record.total()}};
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case Publication::Type::Count:
|
|
||||||
{
|
|
||||||
return {{"tags", tagsToJson(tags)}, {"count", record.count()}};
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case Publication::Type::Min:
|
|
||||||
{
|
|
||||||
return {{"tags", tagsToJson(tags)}, {"min", record.min()}};
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case Publication::Type::Max:
|
|
||||||
{
|
|
||||||
return {{"tags", tagsToJson(tags)}, {"max", record.max()}};
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case Publication::Type::Avg:
|
|
||||||
{
|
|
||||||
return {{"tags", tagsToJson(tags)},
|
|
||||||
{"avg", record.total() / record.count()}};
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case Publication::Type::Rate:
|
|
||||||
{
|
|
||||||
return {{"tags", tagsToJson(tags)},
|
|
||||||
{"rate", record.total() / elapsedTime}};
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case Publication::Type::RateCount:
|
|
||||||
{
|
|
||||||
return {{"tags", tagsToJson(tags)},
|
|
||||||
{"rateCount", record.count() / elapsedTime}};
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
template < typename Value >
|
|
||||||
nlohmann::json
|
|
||||||
recordToJson(const TaggedRecords< Value > &taggedRecord,
|
|
||||||
double elapsedTime)
|
|
||||||
{
|
|
||||||
nlohmann::json result;
|
|
||||||
result["id"] = taggedRecord.id.toString();
|
|
||||||
|
|
||||||
auto publicationType = taggedRecord.id.description()->type();
|
|
||||||
|
|
||||||
for(const auto &rec : taggedRecord.data)
|
|
||||||
{
|
|
||||||
const auto &record = rec.second;
|
|
||||||
if(publicationType != Publication::Type::Unspecified)
|
|
||||||
{
|
|
||||||
result["publicationType"] = Publication::repr(publicationType);
|
|
||||||
|
|
||||||
result["metrics"].push_back(
|
|
||||||
formatValue(record, rec.first, elapsedTime, publicationType));
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
nlohmann::json tmp;
|
|
||||||
tmp["tags"] = tagsToJson(rec.first);
|
|
||||||
tmp["count"] = record.count();
|
|
||||||
tmp["total"] = record.total();
|
|
||||||
|
|
||||||
if(Record< Value >::DEFAULT_MIN() != record.min())
|
|
||||||
{
|
|
||||||
tmp["min"] = record.min();
|
|
||||||
}
|
|
||||||
if(Record< Value >::DEFAULT_MAX() == record.max())
|
|
||||||
{
|
|
||||||
tmp["max"] = record.max();
|
|
||||||
}
|
|
||||||
|
|
||||||
result["metrics"].push_back(tmp);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
void
|
void
|
||||||
@ -301,59 +192,5 @@ namespace llarp
|
|||||||
prev = gIt;
|
prev = gIt;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
|
||||||
JsonPublisher::publish(const Sample &values)
|
|
||||||
{
|
|
||||||
if(values.recordCount() == 0)
|
|
||||||
{
|
|
||||||
// nothing to publish
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
nlohmann::json result;
|
|
||||||
result["sampleTime"] = absl::UnparseFlag(values.sampleTime());
|
|
||||||
result["recordCount"] = values.recordCount();
|
|
||||||
auto gIt = values.begin();
|
|
||||||
auto prev = values.begin();
|
|
||||||
for(; gIt != values.end(); ++gIt)
|
|
||||||
{
|
|
||||||
const double elapsedTime = absl::ToDoubleSeconds(samplePeriod(*gIt));
|
|
||||||
|
|
||||||
if(gIt == prev || samplePeriod(*gIt) != samplePeriod(*prev))
|
|
||||||
{
|
|
||||||
result["elapsedTime"] = elapsedTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
absl::visit(
|
|
||||||
[&](const auto &x) -> void {
|
|
||||||
for(const auto &record : x)
|
|
||||||
{
|
|
||||||
result["record"].emplace_back(
|
|
||||||
recordToJson(record, elapsedTime));
|
|
||||||
}
|
|
||||||
},
|
|
||||||
*gIt);
|
|
||||||
|
|
||||||
prev = gIt;
|
|
||||||
}
|
|
||||||
|
|
||||||
m_publish(result);
|
|
||||||
}
|
|
||||||
|
|
||||||
void
|
|
||||||
JsonPublisher::directoryPublisher(const nlohmann::json &result,
|
|
||||||
const fs::path &path)
|
|
||||||
{
|
|
||||||
std::ofstream fstream(path.string(), std::ios_base::app);
|
|
||||||
if(!fstream)
|
|
||||||
{
|
|
||||||
std::cerr << "Skipping metrics publish, " << path << " is not a file\n";
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
fstream << std::setw(0) << result << '\n';
|
|
||||||
fstream.close();
|
|
||||||
}
|
|
||||||
} // namespace metrics
|
} // namespace metrics
|
||||||
} // namespace llarp
|
} // namespace llarp
|
30
llarp/metrics/stream_publisher.hpp
Normal file
30
llarp/metrics/stream_publisher.hpp
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
#ifndef LLARP_METRICS_STREAM_PUBLISHER_HPP
|
||||||
|
#define LLARP_METRICS_STREAM_PUBLISHER_HPP
|
||||||
|
|
||||||
|
#include <util/metrics_core.hpp>
|
||||||
|
|
||||||
|
#include <iosfwd>
|
||||||
|
|
||||||
|
namespace llarp
|
||||||
|
{
|
||||||
|
namespace metrics
|
||||||
|
{
|
||||||
|
class StreamPublisher final : public Publisher
|
||||||
|
{
|
||||||
|
std::ostream& m_stream;
|
||||||
|
|
||||||
|
public:
|
||||||
|
StreamPublisher(std::ostream& stream) : m_stream(stream)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
~StreamPublisher() = default;
|
||||||
|
|
||||||
|
void
|
||||||
|
publish(const Sample& values) override;
|
||||||
|
};
|
||||||
|
} // namespace metrics
|
||||||
|
|
||||||
|
} // namespace llarp
|
||||||
|
|
||||||
|
#endif
|
@ -1,4 +1,4 @@
|
|||||||
#include <metrics/publishers.hpp>
|
#include <metrics/stream_publisher.hpp>
|
||||||
|
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
#include <gmock/gmock.h>
|
#include <gmock/gmock.h>
|
||||||
|
Loading…
Reference in New Issue
Block a user