Skip to content

Commit

Permalink
MINIFICPP-2503 Remove C2 metric duplication
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Dec 14, 2024
1 parent 25882c1 commit 4322d94
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 27 deletions.
5 changes: 3 additions & 2 deletions extensions/prometheus/PrometheusMetricsPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ std::vector<gsl::not_null<std::shared_ptr<state::PublishedMetricProvider>>> Prom
metric_classes_str = configuration_->get(minifi::Configuration::nifi_metrics_publisher_metrics);
}
if (metric_classes_str && !metric_classes_str->empty()) {
auto metric_classes = utils::string::split(*metric_classes_str, ",");
for (const std::string& clazz : metric_classes) {
auto metric_classes = utils::string::splitAndTrimRemovingEmpty(*metric_classes_str, ",");
std::unordered_set<std::string> unique_metric_classes{metric_classes.begin(), metric_classes.end()};
for (const std::string& clazz : unique_metric_classes) {
auto response_nodes = response_node_loader_->loadResponseNodes(clazz);
if (response_nodes.empty()) {
logger_->log_warn("Metric class '{}' could not be loaded.", clazz);
Expand Down
2 changes: 2 additions & 0 deletions libminifi/include/core/state/nodes/ResponseNodeLoader.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@ class ResponseNodeLoader {
mutable std::mutex root_mutex_;
mutable std::mutex component_metrics_mutex_;
mutable std::mutex system_metrics_mutex_;
mutable std::mutex initialization_mutex_;
core::ProcessGroup* root_{};
std::unordered_map<std::string, std::vector<SharedResponseNode>> component_metrics_;
std::unordered_map<std::string, SharedResponseNode> system_metrics_;
std::unordered_set<std::string> initialized_metrics_;
std::shared_ptr<Configure> configuration_;
std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repository_metric_sources_;
std::shared_ptr<core::FlowConfiguration> flow_configuration_;
Expand Down
25 changes: 15 additions & 10 deletions libminifi/src/c2/C2MetricsPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ namespace org::apache::nifi::minifi::c2 {

void C2MetricsPublisher::loadNodeClasses(const std::string& class_definitions, const state::response::SharedResponseNode& new_node) {
gsl_Expects(response_node_loader_);
auto classes = utils::string::split(class_definitions, ",");
for (const std::string& clazz : classes) {
auto classes = utils::string::splitAndTrimRemovingEmpty(class_definitions, ",");
std::unordered_set<std::string> unique_classes{classes.begin(), classes.end()};
for (const std::string& clazz : unique_classes) {
auto response_nodes = response_node_loader_->loadResponseNodes(clazz);
if (response_nodes.empty()) {
continue;
Expand All @@ -60,9 +61,10 @@ void C2MetricsPublisher::loadC2ResponseConfiguration(const std::string &prefix)
return;
}

std::vector<std::string> classes = utils::string::split(class_definitions, ",");
auto classes = utils::string::splitAndTrimRemovingEmpty(class_definitions, ",");
std::unordered_set<std::string> unique_classes{classes.begin(), classes.end()};

for (const std::string& metricsClass : classes) {
for (const std::string& metricsClass : unique_classes) {
try {
std::string option = utils::string::join_pack(prefix, ".", metricsClass);
std::string classOption = option + ".classes";
Expand Down Expand Up @@ -97,9 +99,10 @@ state::response::SharedResponseNode C2MetricsPublisher::loadC2ResponseConfigurat
if (!configuration_->get(prefix, class_definitions)) {
return prev_node;
}
std::vector<std::string> classes = utils::string::split(class_definitions, ",");
auto classes = utils::string::splitAndTrimRemovingEmpty(class_definitions, ",");
std::unordered_set<std::string> unique_classes{classes.begin(), classes.end()};

for (const std::string& metricsClass : classes) {
for (const std::string& metricsClass : unique_classes) {
try {
std::string option = utils::string::join_pack(prefix, ".", metricsClass);
std::string classOption = option + ".classes";
Expand All @@ -111,8 +114,9 @@ state::response::SharedResponseNode C2MetricsPublisher::loadC2ResponseConfigurat
}
state::response::SharedResponseNode new_node = gsl::make_not_null(std::make_shared<state::response::ObjectNode>(name));
if (name.find(',') != std::string::npos) {
std::vector<std::string> sub_classes = utils::string::split(name, ",");
for (const std::string& subClassStr : classes) {
auto sub_classes = utils::string::splitAndTrimRemovingEmpty(name, ",");
std::unordered_set<std::string> unique_sub_classes{sub_classes.begin(), sub_classes.end()};
for (const std::string& subClassStr : unique_sub_classes) {
auto node = loadC2ResponseConfiguration(subClassStr, prev_node);
static_cast<state::response::ObjectNode*>(prev_node.get())->add_node(node);
}
Expand Down Expand Up @@ -198,9 +202,10 @@ void C2MetricsPublisher::loadMetricNodes() {
std::string class_csv;
std::lock_guard<std::mutex> guard{metrics_mutex_};
if (configuration_->get(minifi::Configuration::nifi_c2_root_classes, class_csv)) {
std::vector<std::string> classes = utils::string::split(class_csv, ",");
auto classes = utils::string::splitAndTrimRemovingEmpty(class_csv, ",");
std::unordered_set<std::string> unique_classes{classes.begin(), classes.end()};

for (const std::string& clazz : classes) {
for (const std::string& clazz : unique_classes) {
auto response_nodes = response_node_loader_->loadResponseNodes(clazz);
if (response_nodes.empty()) {
continue;
Expand Down
5 changes: 3 additions & 2 deletions libminifi/src/core/state/LogMetricsPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,10 @@ void LogMetricsPublisher::loadMetricNodes() {
metric_classes_str = configuration_->get(minifi::Configuration::nifi_metrics_publisher_metrics);
}
if (metric_classes_str && !metric_classes_str->empty()) {
auto metric_classes = utils::string::split(*metric_classes_str, ",");
auto metric_classes = utils::string::splitAndTrimRemovingEmpty(*metric_classes_str, ",");
std::unordered_set<std::string> unique_metric_classes{metric_classes.begin(), metric_classes.end()};
std::lock_guard<std::mutex> lock(response_nodes_mutex_);
for (const std::string& clazz : metric_classes) {
for (const std::string& clazz : unique_metric_classes) {
auto loaded_response_nodes = response_node_loader_->loadResponseNodes(clazz);
if (loaded_response_nodes.empty()) {
logger_->log_warn("Metric class '{}' could not be loaded.", clazz);
Expand Down
7 changes: 4 additions & 3 deletions libminifi/src/core/state/MetricsPublisherFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ std::vector<gsl::not_null<std::unique_ptr<MetricsPublisher>>> createMetricsPubli
const std::shared_ptr<Configure>& configuration, const std::shared_ptr<state::response::ResponseNodeLoader>& response_node_loader) {
if (auto metrics_publisher_class_str = configuration->get(minifi::Configure::nifi_metrics_publisher_class)) {
std::vector<gsl::not_null<std::unique_ptr<MetricsPublisher>>> publishers;
auto publisher_classes = minifi::utils::string::split(*metrics_publisher_class_str, ",");
publishers.reserve(publisher_classes.size());
for (const auto& publisher_class : publisher_classes) {
auto publisher_classes = minifi::utils::string::splitAndTrimRemovingEmpty(*metrics_publisher_class_str, ",");
std::unordered_set<std::string> unique_publisher_classes{publisher_classes.begin(), publisher_classes.end()};
publishers.reserve(unique_publisher_classes.size());
for (const auto& publisher_class : unique_publisher_classes) {
publishers.push_back(createMetricsPublisher(publisher_class, configuration, response_node_loader));
}
return publishers;
Expand Down
9 changes: 9 additions & 0 deletions libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ ResponseNodeLoader::ResponseNodeLoader(std::shared_ptr<Configure> configuration,
}

void ResponseNodeLoader::clearConfigRoot() {
{
std::lock_guard<std::mutex> guard(initialization_mutex_);
initialized_metrics_.clear();
}
{
std::lock_guard<std::mutex> guard(system_metrics_mutex_);
system_metrics_.clear();
Expand Down Expand Up @@ -238,6 +242,10 @@ std::vector<SharedResponseNode> ResponseNodeLoader::loadResponseNodes(const std:
}

for (const auto& response_node : response_nodes) {
std::lock_guard<std::mutex> guard(initialization_mutex_);
if (initialized_metrics_.contains(response_node->getName())) {
continue;
}
initializeRepositoryMetrics(response_node);
initializeQueueMetrics(response_node);
initializeAgentIdentifier(response_node);
Expand All @@ -247,6 +255,7 @@ std::vector<SharedResponseNode> ResponseNodeLoader::loadResponseNodes(const std:
initializeConfigurationChecksums(response_node);
initializeFlowMonitor(response_node);
initializeAssetInformation(response_node);
initialized_metrics_.insert(response_node->getName());
}
return response_nodes;
}
Expand Down
15 changes: 12 additions & 3 deletions libminifi/test/integration/C2MetricsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,26 +128,34 @@ class MetricsHandler: public HeartbeatHandler {

static bool verifyRuntimeMetrics(const rapidjson::Value& runtime_metrics) {
return runtime_metrics.HasMember("deviceInfo") &&
runtime_metrics["deviceInfo"]["systemInfo"].HasMember("operatingSystem") &&
runtime_metrics["deviceInfo"]["networkInfo"].HasMember("hostname") &&
runtime_metrics.HasMember("flowInfo") &&
runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") &&
runtime_metrics["flowInfo"].HasMember("queues") &&
runtime_metrics["flowInfo"].HasMember("components") &&
runtime_metrics["flowInfo"]["queues"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1997") &&
runtime_metrics["flowInfo"]["components"].HasMember("FlowController") &&
runtime_metrics["flowInfo"]["components"].HasMember("GetTCP") &&
runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute");
runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute") &&
runtime_metrics.HasMember("agentInfo") &&
runtime_metrics["agentInfo"]["status"]["repositories"]["ff"].HasMember("size");
}

static bool verifyUpdatedRuntimeMetrics(const rapidjson::Value& runtime_metrics) {
return runtime_metrics.HasMember("deviceInfo") &&
runtime_metrics["deviceInfo"]["systemInfo"].HasMember("operatingSystem") &&
runtime_metrics["deviceInfo"]["networkInfo"].HasMember("hostname") &&
runtime_metrics.HasMember("flowInfo") &&
runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") &&
runtime_metrics["flowInfo"].HasMember("queues") &&
runtime_metrics["flowInfo"].HasMember("components") &&
runtime_metrics["flowInfo"]["queues"].HasMember("8368e3c8-015a-1003-52ca-83af40ec1332") &&
runtime_metrics["flowInfo"]["components"].HasMember("FlowController") &&
runtime_metrics["flowInfo"]["components"].HasMember("GenerateFlowFile") &&
runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute");
runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute") &&
runtime_metrics.HasMember("agentInfo") &&
runtime_metrics["agentInfo"]["status"]["repositories"]["ff"].HasMember("size");
}

static bool verifyLoadMetrics(const rapidjson::Value& load_metrics) {
Expand Down Expand Up @@ -186,11 +194,12 @@ class MetricsHandler: public HeartbeatHandler {
TEST_CASE("C2MetricsTest", "[c2test]") {
std::atomic_bool metrics_updated_successfully{false};
VerifyC2Metrics harness(metrics_updated_successfully);
harness.getConfiguration()->set("nifi.c2.root.classes", "FlowInformation,AgentInformation");
harness.getConfiguration()->set("nifi.c2.root.class.definitions", "metrics");
harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.name", "metrics");
harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics", "runtimemetrics,loadmetrics,processorMetrics");
harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.name", "RuntimeMetrics");
harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.classes", "DeviceInfoNode,FlowInformation");
harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.classes", "DeviceInfoNode,FlowInformation,AssetInformation,DeviceInfoNode,AgentInformation");
harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.name", "LoadMetrics");
harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.classes", "QueueMetrics,RepositoryMetrics");
harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.name", "ProcessorMetrics");
Expand Down
16 changes: 9 additions & 7 deletions libminifi/test/unit/LogMetricsPublisherTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify multiple metric nodes in logs"
publisher_.initialize(configuration_, response_node_loader_);
publisher_.loadMetricNodes();
using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime;
std::string expected_log = R"([info] {
"LogMetrics": {
"RepositoryMetrics": {
std::string expected_log_1 = R"([info] {
"LogMetrics": {)";
std::string expected_log_2 = R"("RepositoryMetrics": {
"provenancerepository": {
"running": "false",
"full": "false",
Expand All @@ -117,10 +117,12 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify multiple metric nodes in logs"
"rocksDbTableReadersSize": "0",
"rocksDbAllMemoryTablesSize": "2048"
}
},
"deviceInfo": {
})";
std::string expected_log_3 = R"("deviceInfo": {
"identifier":)";
REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log));
REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log_1));
REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log_2));
REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log_3));
}

TEST_CASE_METHOD(LogPublisherTestFixture, "Verify reloading different metrics", "[LogMetricsPublisher]") {
Expand Down Expand Up @@ -218,7 +220,7 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify changing log level property fo
publisher_.initialize(configuration_, response_node_loader_);
publisher_.loadMetricNodes();
using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime;
std::string expected_log = R"([debug] {
std::string expected_log = R"([info] {
"LogMetrics": {
"RepositoryMetrics": {
"provenancerepository": {
Expand Down

0 comments on commit 4322d94

Please sign in to comment.