diff --git a/llarp/metrics/metrictank_publisher.cpp b/llarp/metrics/metrictank_publisher.cpp new file mode 100644 index 000000000..8762ba498 --- /dev/null +++ b/llarp/metrics/metrictank_publisher.cpp @@ -0,0 +1,375 @@ +#include + +#include + +#include +#include +#include + +#ifndef _WIN32 +#include +#include +#include +#include +#else +#define WIN32_LEAN_AND_MEAN +#include +#include +#include +#include +#endif + +namespace llarp +{ + namespace metrics + { + namespace + { + absl::optional< std::string > + makeStr(double d) + { + if(std::isnan(d) || std::isinf(d)) + { + return {}; + } + else + { + return std::to_string(d); + } + } + + absl::optional< std::string > + formatValue(const Record &record, double elapsedTime, + Publication::Type publicationType) + { + switch(publicationType) + { + case Publication::Type::Unspecified: + { + assert(false && "Invalid publication type"); + } + break; + case Publication::Type::Total: + { + return makeStr(record.total()); + } + break; + case Publication::Type::Count: + { + return std::to_string(record.count()); + } + break; + case Publication::Type::Min: + { + return makeStr(record.min()); + } + break; + case Publication::Type::Max: + { + return makeStr(record.max()); + } + break; + case Publication::Type::Avg: + { + return makeStr(record.total() / record.count()); + } + break; + case Publication::Type::Rate: + { + return makeStr(record.total() / elapsedTime); + } + break; + case Publication::Type::RateCount: + { + return makeStr(record.count() / elapsedTime); + } + break; + } + } + + std::string + addName(string_view id, string_view name, string_view suffix) + { + return absl::StrCat(id, ".", name, suffix); + } + + std::vector< MetricTankPublisherInterface::PublishData > + recordToData(const Record &record, absl::Time time, double elapsedTime, + string_view suffix) + { + std::vector< MetricTankPublisherInterface::PublishData > result; + + std::string id = record.id().toString(); + + auto publicationType = record.id().description()->type(); + if(publicationType != Publication::Type::Unspecified) + { + auto val = formatValue(record, elapsedTime, publicationType); + + if(val) + { + result.emplace_back( + addName(id, Publication::repr(publicationType), suffix), + val.value(), time); + } + } + else + { + result.emplace_back(addName(id, "count", suffix), + std::to_string(record.count()), time); + result.emplace_back(addName(id, "total", suffix), + std::to_string(record.total()), time); + + if(Record::DEFAULT_MIN != record.min() && !std::isnan(record.min()) + && !std::isinf(record.min())) + { + result.emplace_back(addName(id, "min", suffix), + std::to_string(record.min()), time); + } + if(Record::DEFAULT_MAX == record.max() && !std::isnan(record.max()) + && !std::isinf(record.max())) + { + result.emplace_back(addName(id, "max", suffix), + std::to_string(record.max()), time); + } + } + return result; + } + +#ifndef _WIN32 + void + publishData(const std::vector< std::string > &toSend, + const std::string &host, short port) + { + struct addrinfo hints, *addrs; + bzero(&hints, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + + const std::string portAsStr = std::to_string(port); + + if(getaddrinfo(host.c_str(), portAsStr.c_str(), &hints, &addrs) != 0) + { + LogError("Failed to get address info"); + return; + } + + int sock = + ::socket(addrs->ai_family, addrs->ai_socktype, addrs->ai_protocol); + + if(sock < 0) + { + LogError("Failed to open socket"); + freeaddrinfo(addrs); + return; + } + + if(connect(sock, addrs->ai_addr, addrs->ai_addrlen) < 0) + { + LogError("Failed to connect to metrictank"); + close(sock); + freeaddrinfo(addrs); + return; + } + + freeaddrinfo(addrs); + + for(const std::string &val : toSend) + { + ssize_t sentLen = 0; + + do + { + sentLen = + ::send(sock, val.c_str() + sentLen, val.size() - sentLen, 0); + if(sentLen == -1) + { + LogError("Error ", strerror(errno)); + } + } while((0 <= sentLen) + && (static_cast< size_t >(sentLen) < val.size())); + } + + shutdown(sock, SHUT_RDWR); + close(sock); + } +#else + void + publishData(const std::vector< std::string > &toSend, + const std::string &host, short port) + { + struct addrinfo *result = NULL, hints; + bzero(&hints, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + + const std::string portAsStr = std::to_string(port); + + if(getaddrinfo(host.c_str(), portAsStr.c_str(), &hints, &result) != 0) + { + LogError("Failed to get address info"); + return; + } + + int sock = + ::socket(addrs->ai_family, addrs->ai_socktype, addrs->ai_protocol); + + if(sock == INVALID_SOCKET) + { + LogError("Failed to open socket"); + freeaddrinfo(addrs); + return; + } + + if(connect(sock, addrs->ai_addr, addrs->ai_addrlen) == SOCKET_ERROR) + { + LogError("Failed to connect to metrictank"); + closesocket(sock); + freeaddrinfo(addrs); + return; + } + + freeaddrinfo(addrs); + + for(const std::string &val : toSend) + { + ssize_t sentLen = 0; + + do + { + sentLen = + ::send(sock, val.c_str() + sentLen, val.size() - sentLen, 0); + if(sentLen == SOCKET_ERROR) + { + LogError("Error ", strerror(errno)); + } + } while((0 <= sentLen) + && (static_cast< size_t >(sentLen) < val.size())); + } + + shutdown(sock, SHUT_RDWR); + closesocket(sock); + } +#endif + + MetricTankPublisherInterface::Tags + updateTags(MetricTankPublisherInterface::Tags tags) + { + if(tags.count("system") == 0) + { +#if defined(_WIN32) || defined(_WIN64) || defined(__NT__) + tags["system"] = "windows"; +#elif defined(__APPLE__) + tags["system"] = "macos"; +#elif defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) + tags["system"] = "bsd"; +#elif defined(__sun) + tags["system"] = "solaris"; +#elif defined(__linux__) + tags["system"] = "linux"; +#else + tags["system"] = "unknown"; +#endif + } + + if(tags.count("user") == 0) + { +#ifndef _WIN32 + tags["user"] = getlogin(); +#else + char username[UNLEN + 1]; + DWORD username_len = UNLEN + 1; + GetUserName(username, &username_len); + tags["user"] = username; +#endif + } + + return tags; + } + } // namespace + + std::string + MetricTankPublisherInterface::makeSuffix(const Tags &tags) + { + std::string result; + for(const auto &tag : updateTags(tags)) + { + absl::StrAppend(&result, ";", tag.first, "=", tag.second); + } + return result; + } + + void + MetricTankPublisherInterface::publish(const Sample &values) + { + if(values.recordCount() == 0) + { + // nothing to publish + return; + } + + absl::Time sampleTime = values.sampleTime(); + + std::vector< PublishData > result; + result.reserve(values.recordCount()); + + auto gIt = values.begin(); + auto prev = values.begin(); + for(; gIt != values.end(); ++gIt) + { + const double elapsedTime = absl::ToDoubleSeconds(gIt->samplePeriod()); + + for(const auto &record : *gIt) + { + auto partial = + recordToData(record, sampleTime, elapsedTime, m_suffix); + result.insert(result.end(), partial.begin(), partial.end()); + } + prev = gIt; + } + + publish(result); + } + + void + MetricTankPublisher::publish(const std::vector< PublishData > &data) + { + if(m_queue.tryPushBack(data) == thread::QueueReturn::QueueFull) + { + LogWarn("Dropping metrictank logs!"); + } + } + + void + MetricTankPublisher::work() + { + while(true) + { + auto data = m_queue.popFront(); // block until we get something + + // Finish + if(absl::holds_alternative< StopStruct >(data)) + { + return; + } + + assert(absl::holds_alternative< std::vector< PublishData > >(data)); + + auto vec = absl::get< std::vector< PublishData > >(data); + + std::vector< std::string > toSend; + toSend.reserve(vec.size()); + + std::transform(vec.begin(), vec.end(), std::back_inserter(toSend), + [](const PublishData &d) -> std::string { + return absl::StrCat( + std::get< 0 >(d), " ", std::get< 1 >(d), " ", + absl::ToUnixSeconds(std::get< 2 >(d)), "\n"); + }); + + publishData(toSend, m_host, m_port); + } + } + } // namespace metrics +} // namespace llarp diff --git a/llarp/metrics/metrictank_publisher.hpp b/llarp/metrics/metrictank_publisher.hpp new file mode 100644 index 000000000..23fbb699f --- /dev/null +++ b/llarp/metrics/metrictank_publisher.hpp @@ -0,0 +1,91 @@ +#ifndef LLARP_METRICS_METRICTANK_PUBLISHER_HPP +#define LLARP_METRICS_METRICTANK_PUBLISHER_HPP + +#include + +#include + +#include +#include +#include +#include +#include + +namespace llarp +{ + namespace metrics + { + class MetricTankPublisherInterface : public Publisher + { + public: + // Format for metrictank is + // Metric path = metrics.namespaces.metric;key=value;key1=value2 + using PublishData = std::tuple< std::string, std::string, absl::Time >; + using Tags = std::map< std::string, std::string >; + + private: + const std::string m_suffix; // tags to send to metric tank + + public: + MetricTankPublisherInterface(const Tags& tags) + : m_suffix(makeSuffix(tags)) + { + } + + ~MetricTankPublisherInterface() + { + } + + static std::string + makeSuffix(const Tags& tags); + + void + publish(const Sample& values) override; + + virtual void + publish(const std::vector< PublishData >& data) = 0; + }; + + class MetricTankPublisher final : public MetricTankPublisherInterface + { + private: + const std::string m_host; + const short m_port; + + struct StopStruct + { + }; + + using Queue = thread::Queue< + absl::variant< std::vector< PublishData >, StopStruct > >; + + Queue m_queue; // queue of things to publish + std::thread m_worker; // worker thread + + void + work(); + + public: + MetricTankPublisher(const Tags& tags, const std::string& host, short port) + : MetricTankPublisherInterface(tags) + , m_host(host) + , m_port(port) + , m_queue(100) + , m_worker(&MetricTankPublisher::work, this) + { + } + + ~MetricTankPublisher() + { + // Push back a signal value onto the queue + m_queue.pushBack(StopStruct()); + } + + void + publish(const std::vector< PublishData >& data) override; + }; + + } // namespace metrics +} // namespace llarp + +#endif diff --git a/llarp/metrics/publishers.cpp b/llarp/metrics/publishers.cpp index 95558e94f..33118369d 100644 --- a/llarp/metrics/publishers.cpp +++ b/llarp/metrics/publishers.cpp @@ -10,40 +10,14 @@ namespace llarp { namespace { + template < typename Value > void - formatValue(std::ostream &stream, size_t value, + formatValue(std::ostream &stream, Value value, const FormatSpec *formatSpec) { if(formatSpec) { - FormatSpec::format(stream, (double)value, *formatSpec); - } - else - { - stream << value; - } - } - - void - formatValue(std::ostream &stream, int value, const FormatSpec *formatSpec) - { - if(formatSpec) - { - FormatSpec::format(stream, (double)value, *formatSpec); - } - else - { - stream << value; - } - } - - void - formatValue(std::ostream &stream, double value, - const FormatSpec *formatSpec) - { - if(formatSpec) - { - FormatSpec::format(stream, value, *formatSpec); + FormatSpec::format(stream, static_cast< double >(value), *formatSpec); } else { @@ -238,7 +212,6 @@ namespace llarp return result; } - } // namespace void diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 3be9ce42c..b43b7d03f 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -18,6 +18,7 @@ list(APPEND TEST_SRC ev/test_ev_loop.cpp exit/test_llarp_exit_context.cpp link/test_llarp_link.cpp + metrics/test_llarp_metrics_metricktank.cpp metrics/test_llarp_metrics_publisher.cpp net/test_llarp_net_inaddr.cpp net/test_llarp_net.cpp diff --git a/test/metrics/test_llarp_metrics_metricktank.cpp b/test/metrics/test_llarp_metrics_metricktank.cpp new file mode 100644 index 000000000..5a17a1bb9 --- /dev/null +++ b/test/metrics/test_llarp_metrics_metricktank.cpp @@ -0,0 +1,21 @@ +#include + +#include +#include + +using namespace llarp; +using namespace ::testing; + +using Interface = metrics::MetricTankPublisherInterface; + +TEST(MetricTank, maketags) +{ + Interface::Tags tags; + std::string result = Interface::makeSuffix(tags); + + ASSERT_THAT(result, Not(IsEmpty())); + + tags["user"] = "Thanos"; + result = Interface::makeSuffix(tags); + ASSERT_THAT(result, HasSubstr(";user=Thanos")); +}