Skip to content

Commit

Permalink
add new metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Dec 10, 2024
1 parent 25882c1 commit 43dae9b
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 37 deletions.
7 changes: 6 additions & 1 deletion libminifi/include/core/ProcessorMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,14 @@ class ProcessorMetrics : public state::response::ResponseNode {
std::chrono::milliseconds getLastSessionCommitRuntime() const;
void addLastSessionCommitRuntime(std::chrono::milliseconds runtime);

std::atomic<size_t> iterations{0};
std::atomic<size_t> invocations{0};
std::atomic<size_t> incoming_flow_files{0};
std::atomic<size_t> transferred_flow_files{0};
std::atomic<uint64_t> incoming_bytes{0};
std::atomic<uint64_t> transferred_bytes{0};
std::atomic<uint64_t> bytes_read{0};
std::atomic<uint64_t> bytes_written{0};
std::atomic<uint64_t> processing_nanos{0};

protected:
template<typename ValueType>
Expand Down
42 changes: 17 additions & 25 deletions libminifi/include/core/state/nodes/FlowInformation.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "core/state/nodes/StateMonitor.h"
#include "Connection.h"
#include "core/state/ConnectionStore.h"
#include "core/Processor.h"

namespace org::apache::nifi::minifi::state::response {

Expand Down Expand Up @@ -92,16 +93,22 @@ class FlowVersion : public DeviceInformation {
std::shared_ptr<FlowIdentifier> identifier;
};

class FlowMonitor : public StateMonitorNode {
class FlowInformation : public StateMonitorNode {
public:
FlowMonitor(std::string_view name, const utils::Identifier &uuid)
FlowInformation(std::string_view name, const utils::Identifier &uuid)
: StateMonitorNode(name, uuid) {
}

explicit FlowMonitor(std::string_view name)
explicit FlowInformation(std::string_view name)
: StateMonitorNode(name) {
}

MINIFIAPI static constexpr const char* Description = "Metric node that defines the flow ID and flow URL deployed to this agent";

std::string getName() const override {
return "flowInfo";
}

void setFlowVersion(std::shared_ptr<state::response::FlowVersion> flow_version) {
flow_version_ = std::move(flow_version);
}
Expand All @@ -110,32 +117,17 @@ class FlowMonitor : public StateMonitorNode {
connection_store_.updateConnection(connection);
}

protected:
std::shared_ptr<state::response::FlowVersion> flow_version_;
ConnectionStore connection_store_;
};

/**
* Justification and Purpose: Provides flow version Information
*/
class FlowInformation : public FlowMonitor {
public:
FlowInformation(std::string_view name, const utils::Identifier &uuid)
: FlowMonitor(name, uuid) {
}

explicit FlowInformation(std::string_view name)
: FlowMonitor(name) {
}

MINIFIAPI static constexpr const char* Description = "Metric node that defines the flow ID and flow URL deployed to this agent";

std::string getName() const override {
return "flowInfo";
void setProcessors(std::vector<core::Processor*> processors) {
processors_ = std::move(processors);
}

std::vector<SerializedResponseNode> serialize() override;
std::vector<PublishedMetric> calculateMetrics() override;

private:
std::shared_ptr<state::response::FlowVersion> flow_version_;
ConnectionStore connection_store_;
std::vector<core::Processor*> processors_;
};

} // namespace org::apache::nifi::minifi::state::response
2 changes: 1 addition & 1 deletion libminifi/include/core/state/nodes/ResponseNodeLoader.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class ResponseNodeLoader {
void initializeAgentNode(const SharedResponseNode& response_node) const;
void initializeAgentStatus(const SharedResponseNode& response_node) const;
void initializeConfigurationChecksums(const SharedResponseNode& response_node) const;
void initializeFlowMonitor(const SharedResponseNode& response_node) const;
void initializeFlowInformation(const SharedResponseNode& response_node) const;
void initializeAssetInformation(const SharedResponseNode& response_node) const;
std::vector<SharedResponseNode> getMatchingComponentMetricsNodes(const std::string& regex_str) const;

Expand Down
2 changes: 1 addition & 1 deletion libminifi/src/core/Processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ void Processor::triggerAndCommit(const std::shared_ptr<ProcessContext>& context,
}

void Processor::trigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& process_session) {
++metrics_->iterations;
++metrics_->invocations;
const auto start = std::chrono::steady_clock::now();
onTrigger(*context, *process_session);
metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start));
Expand Down
4 changes: 2 additions & 2 deletions libminifi/src/core/ProcessorMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ std::vector<state::response::SerializedResponseNode> ProcessorMetrics::serialize
state::response::SerializedResponseNode root_node {
.name = source_processor_.getUUIDStr(),
.children = {
{.name = "OnTriggerInvocations", .value = static_cast<uint32_t>(iterations.load())},
{.name = "OnTriggerInvocations", .value = static_cast<uint32_t>(invocations.load())},
{.name = "AverageOnTriggerRunTime", .value = static_cast<uint64_t>(getAverageOnTriggerRuntime().count())},
{.name = "LastOnTriggerRunTime", .value = static_cast<uint64_t>(getLastOnTriggerRuntime().count())},
{.name = "AverageSessionCommitRunTime", .value = static_cast<uint64_t>(getAverageSessionCommitRuntime().count())},
Expand Down Expand Up @@ -73,7 +73,7 @@ std::vector<state::response::SerializedResponseNode> ProcessorMetrics::serialize

std::vector<state::PublishedMetric> ProcessorMetrics::calculateMetrics() {
std::vector<state::PublishedMetric> metrics = {
{"onTrigger_invocations", static_cast<double>(iterations.load()), getCommonLabels()},
{"onTrigger_invocations", static_cast<double>(invocations.load()), getCommonLabels()},
{"average_onTrigger_runtime_milliseconds", static_cast<double>(getAverageOnTriggerRuntime().count()), getCommonLabels()},
{"last_onTrigger_runtime_milliseconds", static_cast<double>(getLastOnTriggerRuntime().count()), getCommonLabels()},
{"average_session_commit_runtime_milliseconds", static_cast<double>(getAverageSessionCommitRuntime().count()), getCommonLabels()},
Expand Down
32 changes: 32 additions & 0 deletions libminifi/src/core/state/nodes/FlowInformation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,36 @@ std::vector<SerializedResponseNode> FlowInformation::serialize() {
serialized.push_back(componentsNode);
}

if (!processors_.empty()) {
SerializedResponseNode processorsStatusesNode{.name = "processorStatuses", .array = true, .collapsible = false};
for (const auto processor : processors_) {
if (!processor) {
continue;
}

auto metrics = processor->getMetrics();
processorsStatusesNode.children.push_back({
.name = processor->getName(),
.collapsible = false,
.children = {
{.name = "id", .value = std::string{processor->getUUIDStr()}},
// {.name = "groupId", .value = std::string{processor->getUUIDStr()}},
{.name = "bytesRead", .value = metrics->bytes_read.load()},
{.name = "bytesWritten", .value = metrics->bytes_written.load()},
{.name = "flowFilesIn", .value = metrics->incoming_flow_files.load()},
{.name = "flowFilesOut", .value = metrics->transferred_flow_files.load()},
{.name = "bytesIn", .value = metrics->incoming_bytes.load()},
{.name = "bytesOut", .value = metrics->transferred_bytes.load()},
{.name = "invocations", .value = metrics->invocations.load()},
{.name = "processingNanos", .value = metrics->processing_nanos.load()},
{.name = "activeThreadCount", .value = -1},
{.name = "terminatedThreadCount", .value = -1}
}
});
}
serialized.push_back(processorsStatusesNode);
}

return serialized;
}

Expand All @@ -88,6 +118,8 @@ std::vector<PublishedMetric> FlowInformation::calculateMetrics() {
{{"component_uuid", component.getComponentUUID().to_string()}, {"component_name", component.getComponentName()}, {"metric_class", "FlowInformation"}}});
});
}

// TODO add processor metrics
return metrics;
}

Expand Down
20 changes: 13 additions & 7 deletions libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,9 @@ void ResponseNodeLoader::initializeAssetInformation(const SharedResponseNode& re
}
}

void ResponseNodeLoader::initializeFlowMonitor(const SharedResponseNode& response_node) const {
auto flowMonitor = dynamic_cast<state::response::FlowMonitor*>(response_node.get());
if (flowMonitor == nullptr) {
void ResponseNodeLoader::initializeFlowInformation(const SharedResponseNode& response_node) const {
auto flow_information = dynamic_cast<state::response::FlowInformation*>(response_node.get());
if (flow_information == nullptr) {
return;
}

Expand All @@ -222,11 +222,17 @@ void ResponseNodeLoader::initializeFlowMonitor(const SharedResponseNode& respons
}

for (auto &con : connections) {
flowMonitor->updateConnection(con.second);
flow_information->updateConnection(con.second);
}
flowMonitor->setStateMonitor(update_sink_);
flow_information->setStateMonitor(update_sink_);
if (flow_configuration_) {
flowMonitor->setFlowVersion(flow_configuration_->getFlowVersion());
flow_information->setFlowVersion(flow_configuration_->getFlowVersion());
}

if (root_) {
std::vector<core::Processor*> processors;
root_->getAllProcessors(processors);
flow_information->setProcessors(processors);
}
}

Expand All @@ -245,7 +251,7 @@ std::vector<SharedResponseNode> ResponseNodeLoader::loadResponseNodes(const std:
initializeAgentNode(response_node);
initializeAgentStatus(response_node);
initializeConfigurationChecksums(response_node);
initializeFlowMonitor(response_node);
initializeFlowInformation(response_node);
initializeAssetInformation(response_node);
}
return response_nodes;
Expand Down

0 comments on commit 43dae9b

Please sign in to comment.