From 43dae9b9e6bc4b6c7cf11806d482129d279aa344 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Tue, 10 Dec 2024 15:38:25 +0100 Subject: [PATCH] add new metrics --- libminifi/include/core/ProcessorMetrics.h | 7 +++- .../core/state/nodes/FlowInformation.h | 42 ++++++++----------- .../core/state/nodes/ResponseNodeLoader.h | 2 +- libminifi/src/core/Processor.cpp | 2 +- libminifi/src/core/ProcessorMetrics.cpp | 4 +- .../src/core/state/nodes/FlowInformation.cpp | 32 ++++++++++++++ .../core/state/nodes/ResponseNodeLoader.cpp | 20 +++++---- 7 files changed, 72 insertions(+), 37 deletions(-) diff --git a/libminifi/include/core/ProcessorMetrics.h b/libminifi/include/core/ProcessorMetrics.h index de5c3d157e..4a2fa1815d 100644 --- a/libminifi/include/core/ProcessorMetrics.h +++ b/libminifi/include/core/ProcessorMetrics.h @@ -53,9 +53,14 @@ class ProcessorMetrics : public state::response::ResponseNode { std::chrono::milliseconds getLastSessionCommitRuntime() const; void addLastSessionCommitRuntime(std::chrono::milliseconds runtime); - std::atomic iterations{0}; + std::atomic invocations{0}; + std::atomic incoming_flow_files{0}; std::atomic transferred_flow_files{0}; + std::atomic incoming_bytes{0}; std::atomic transferred_bytes{0}; + std::atomic bytes_read{0}; + std::atomic bytes_written{0}; + std::atomic processing_nanos{0}; protected: template diff --git a/libminifi/include/core/state/nodes/FlowInformation.h b/libminifi/include/core/state/nodes/FlowInformation.h index a605bd8b25..5bfc257e14 100644 --- a/libminifi/include/core/state/nodes/FlowInformation.h +++ b/libminifi/include/core/state/nodes/FlowInformation.h @@ -26,6 +26,7 @@ #include "core/state/nodes/StateMonitor.h" #include "Connection.h" #include "core/state/ConnectionStore.h" +#include "core/Processor.h" namespace org::apache::nifi::minifi::state::response { @@ -92,16 +93,22 @@ class FlowVersion : public DeviceInformation { std::shared_ptr identifier; }; -class FlowMonitor : public StateMonitorNode { +class FlowInformation : public StateMonitorNode { public: - FlowMonitor(std::string_view name, const utils::Identifier &uuid) + FlowInformation(std::string_view name, const utils::Identifier &uuid) : StateMonitorNode(name, uuid) { } - explicit FlowMonitor(std::string_view name) + explicit FlowInformation(std::string_view name) : StateMonitorNode(name) { } + MINIFIAPI static constexpr const char* Description = "Metric node that defines the flow ID and flow URL deployed to this agent"; + + std::string getName() const override { + return "flowInfo"; + } + void setFlowVersion(std::shared_ptr flow_version) { flow_version_ = std::move(flow_version); } @@ -110,32 +117,17 @@ class FlowMonitor : public StateMonitorNode { connection_store_.updateConnection(connection); } - protected: - std::shared_ptr flow_version_; - ConnectionStore connection_store_; -}; - -/** - * Justification and Purpose: Provides flow version Information - */ -class FlowInformation : public FlowMonitor { - public: - FlowInformation(std::string_view name, const utils::Identifier &uuid) - : FlowMonitor(name, uuid) { - } - - explicit FlowInformation(std::string_view name) - : FlowMonitor(name) { - } - - MINIFIAPI static constexpr const char* Description = "Metric node that defines the flow ID and flow URL deployed to this agent"; - - std::string getName() const override { - return "flowInfo"; + void setProcessors(std::vector processors) { + processors_ = std::move(processors); } std::vector serialize() override; std::vector calculateMetrics() override; + + private: + std::shared_ptr flow_version_; + ConnectionStore connection_store_; + std::vector processors_; }; } // namespace org::apache::nifi::minifi::state::response diff --git a/libminifi/include/core/state/nodes/ResponseNodeLoader.h b/libminifi/include/core/state/nodes/ResponseNodeLoader.h index 9eb55f413e..284496c0a3 100644 --- a/libminifi/include/core/state/nodes/ResponseNodeLoader.h +++ b/libminifi/include/core/state/nodes/ResponseNodeLoader.h @@ -62,7 +62,7 @@ class ResponseNodeLoader { void initializeAgentNode(const SharedResponseNode& response_node) const; void initializeAgentStatus(const SharedResponseNode& response_node) const; void initializeConfigurationChecksums(const SharedResponseNode& response_node) const; - void initializeFlowMonitor(const SharedResponseNode& response_node) const; + void initializeFlowInformation(const SharedResponseNode& response_node) const; void initializeAssetInformation(const SharedResponseNode& response_node) const; std::vector getMatchingComponentMetricsNodes(const std::string& regex_str) const; diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp index b5cf616e23..77b8d3fa1d 100644 --- a/libminifi/src/core/Processor.cpp +++ b/libminifi/src/core/Processor.cpp @@ -198,7 +198,7 @@ void Processor::triggerAndCommit(const std::shared_ptr& context, } void Processor::trigger(const std::shared_ptr& context, const std::shared_ptr& process_session) { - ++metrics_->iterations; + ++metrics_->invocations; const auto start = std::chrono::steady_clock::now(); onTrigger(*context, *process_session); metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast(std::chrono::steady_clock::now() - start)); diff --git a/libminifi/src/core/ProcessorMetrics.cpp b/libminifi/src/core/ProcessorMetrics.cpp index 8cd0548883..8b199872f7 100644 --- a/libminifi/src/core/ProcessorMetrics.cpp +++ b/libminifi/src/core/ProcessorMetrics.cpp @@ -44,7 +44,7 @@ std::vector ProcessorMetrics::serialize state::response::SerializedResponseNode root_node { .name = source_processor_.getUUIDStr(), .children = { - {.name = "OnTriggerInvocations", .value = static_cast(iterations.load())}, + {.name = "OnTriggerInvocations", .value = static_cast(invocations.load())}, {.name = "AverageOnTriggerRunTime", .value = static_cast(getAverageOnTriggerRuntime().count())}, {.name = "LastOnTriggerRunTime", .value = static_cast(getLastOnTriggerRuntime().count())}, {.name = "AverageSessionCommitRunTime", .value = static_cast(getAverageSessionCommitRuntime().count())}, @@ -73,7 +73,7 @@ std::vector ProcessorMetrics::serialize std::vector ProcessorMetrics::calculateMetrics() { std::vector metrics = { - {"onTrigger_invocations", static_cast(iterations.load()), getCommonLabels()}, + {"onTrigger_invocations", static_cast(invocations.load()), getCommonLabels()}, {"average_onTrigger_runtime_milliseconds", static_cast(getAverageOnTriggerRuntime().count()), getCommonLabels()}, {"last_onTrigger_runtime_milliseconds", static_cast(getLastOnTriggerRuntime().count()), getCommonLabels()}, {"average_session_commit_runtime_milliseconds", static_cast(getAverageSessionCommitRuntime().count()), getCommonLabels()}, diff --git a/libminifi/src/core/state/nodes/FlowInformation.cpp b/libminifi/src/core/state/nodes/FlowInformation.cpp index 5756072acf..350f7cfd66 100644 --- a/libminifi/src/core/state/nodes/FlowInformation.cpp +++ b/libminifi/src/core/state/nodes/FlowInformation.cpp @@ -76,6 +76,36 @@ std::vector FlowInformation::serialize() { serialized.push_back(componentsNode); } + if (!processors_.empty()) { + SerializedResponseNode processorsStatusesNode{.name = "processorStatuses", .array = true, .collapsible = false}; + for (const auto processor : processors_) { + if (!processor) { + continue; + } + + auto metrics = processor->getMetrics(); + processorsStatusesNode.children.push_back({ + .name = processor->getName(), + .collapsible = false, + .children = { + {.name = "id", .value = std::string{processor->getUUIDStr()}}, + // {.name = "groupId", .value = std::string{processor->getUUIDStr()}}, + {.name = "bytesRead", .value = metrics->bytes_read.load()}, + {.name = "bytesWritten", .value = metrics->bytes_written.load()}, + {.name = "flowFilesIn", .value = metrics->incoming_flow_files.load()}, + {.name = "flowFilesOut", .value = metrics->transferred_flow_files.load()}, + {.name = "bytesIn", .value = metrics->incoming_bytes.load()}, + {.name = "bytesOut", .value = metrics->transferred_bytes.load()}, + {.name = "invocations", .value = metrics->invocations.load()}, + {.name = "processingNanos", .value = metrics->processing_nanos.load()}, + {.name = "activeThreadCount", .value = -1}, + {.name = "terminatedThreadCount", .value = -1} + } + }); + } + serialized.push_back(processorsStatusesNode); + } + return serialized; } @@ -88,6 +118,8 @@ std::vector FlowInformation::calculateMetrics() { {{"component_uuid", component.getComponentUUID().to_string()}, {"component_name", component.getComponentName()}, {"metric_class", "FlowInformation"}}}); }); } + + // TODO add processor metrics return metrics; } diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp index 68b74da48d..fa50f4e73a 100644 --- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp +++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp @@ -209,9 +209,9 @@ void ResponseNodeLoader::initializeAssetInformation(const SharedResponseNode& re } } -void ResponseNodeLoader::initializeFlowMonitor(const SharedResponseNode& response_node) const { - auto flowMonitor = dynamic_cast(response_node.get()); - if (flowMonitor == nullptr) { +void ResponseNodeLoader::initializeFlowInformation(const SharedResponseNode& response_node) const { + auto flow_information = dynamic_cast(response_node.get()); + if (flow_information == nullptr) { return; } @@ -222,11 +222,17 @@ void ResponseNodeLoader::initializeFlowMonitor(const SharedResponseNode& respons } for (auto &con : connections) { - flowMonitor->updateConnection(con.second); + flow_information->updateConnection(con.second); } - flowMonitor->setStateMonitor(update_sink_); + flow_information->setStateMonitor(update_sink_); if (flow_configuration_) { - flowMonitor->setFlowVersion(flow_configuration_->getFlowVersion()); + flow_information->setFlowVersion(flow_configuration_->getFlowVersion()); + } + + if (root_) { + std::vector processors; + root_->getAllProcessors(processors); + flow_information->setProcessors(processors); } } @@ -245,7 +251,7 @@ std::vector ResponseNodeLoader::loadResponseNodes(const std: initializeAgentNode(response_node); initializeAgentStatus(response_node); initializeConfigurationChecksums(response_node); - initializeFlowMonitor(response_node); + initializeFlowInformation(response_node); initializeAssetInformation(response_node); } return response_nodes;