Skip to content

Commit

Permalink
implement new metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Dec 11, 2024
1 parent 93203b0 commit ec5d63d
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 15 deletions.
7 changes: 6 additions & 1 deletion libminifi/include/io/StreamCallback.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@ namespace org::apache::nifi::minifi::io {
class InputStream;
class OutputStream;

// Result of a combined read/write stream callback (see InputOutputStreamCallback):
// carries both byte counters so that the caller can account for each direction separately.
struct ReadWriteResult {
  // bytes written to the output stream; a default-constructed result reports 0
  int64_t bytes_written{0};
  // bytes read from the input stream; a default-constructed result reports 0
  int64_t bytes_read{0};
};

// FlowFile IO Callback functions for input and output
// throw exception for error
using InputStreamCallback = std::function<int64_t(const std::shared_ptr<InputStream>& input_stream)>;
using OutputStreamCallback = std::function<int64_t(const std::shared_ptr<OutputStream>& output_stream)>;
using InputOutputStreamCallback = std::function<int64_t(const std::shared_ptr<InputStream>& input_stream, const std::shared_ptr<OutputStream>& output_stream)>;
using InputOutputStreamCallback = std::function<ReadWriteResult(const std::shared_ptr<InputStream>& input_stream, const std::shared_ptr<OutputStream>& output_stream)>;

} // namespace org::apache::nifi::minifi::io
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class LineByLineInputOutputStreamCallback {
public:
using CallbackType = std::function<std::string(const std::string& input_line, bool is_first_line, bool is_last_line)>;
explicit LineByLineInputOutputStreamCallback(CallbackType callback);
int64_t operator()(const std::shared_ptr<io::InputStream>& input, const std::shared_ptr<io::OutputStream>& output);
io::ReadWriteResult operator()(const std::shared_ptr<io::InputStream>& input, const std::shared_ptr<io::OutputStream>& output);

private:
int64_t readInput(io::InputStream& stream);
Expand Down
43 changes: 36 additions & 7 deletions libminifi/src/core/ProcessSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,9 @@ void ProcessSession::write(core::FlowFile &flow, const io::OutputStreamCallback&
std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flow.getUUIDStr();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
provenance_report_->modifyContent(flow, details, duration);
if (metrics_) {
metrics_->bytes_written += stream->size();
}
} catch (const std::exception& exception) {
logger_->log_debug("Caught Exception during process session write, type: {}, what: {}", typeid(exception).name(), exception.what());
throw;
Expand All @@ -280,6 +283,7 @@ void ProcessSession::write(core::FlowFile &flow, const io::OutputStreamCallback&
// Convenience overload for character data: reinterprets the characters as raw bytes
// and delegates to the std::byte overload of writeBuffer.
void ProcessSession::writeBuffer(const std::shared_ptr<core::FlowFile>& flow_file, std::span<const char> buffer) {
  const std::span<const std::byte> raw_bytes = as_bytes(buffer);
  writeBuffer(flow_file, raw_bytes);
}

void ProcessSession::writeBuffer(const std::shared_ptr<core::FlowFile>& flow_file, std::span<const std::byte> buffer) {
write(flow_file, [buffer](const std::shared_ptr<io::OutputStream>& output_stream) {
const auto write_status = output_stream->write(buffer);
Expand Down Expand Up @@ -316,6 +320,9 @@ void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, const i
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
}
flow->setSize(flow_file_size + (stream->size() - stream_size_before_callback));
if (metrics_) {
metrics_->bytes_written += stream->size() - stream_size_before_callback;
}

std::stringstream details;
details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr();
Expand Down Expand Up @@ -373,6 +380,9 @@ int64_t ProcessSession::read(const core::FlowFile& flow_file, const io::InputStr
if (ret < 0) {
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
}
if (metrics_) {
metrics_->bytes_read += ret;
}
return ret;
} catch (const std::exception& exception) {
logger_->log_debug("Caught Exception {}", exception.what());
Expand All @@ -383,7 +393,6 @@ int64_t ProcessSession::read(const core::FlowFile& flow_file, const io::InputStr
}
}


int64_t ProcessSession::readWrite(const std::shared_ptr<core::FlowFile> &flow, const io::InputOutputStreamCallback& callback) {
gsl_Expects(callback);

Expand All @@ -409,19 +418,23 @@ int64_t ProcessSession::readWrite(const std::shared_ptr<core::FlowFile> &flow, c
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for write");
}

int64_t bytes_written = callback(input_stream, output_stream);
if (bytes_written < 0) {
auto read_write_result = callback(input_stream, output_stream);
if (read_write_result.bytes_written < 0) {
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
}

input_stream->close();
output_stream->close();

flow->setSize(gsl::narrow<uint64_t>(bytes_written));
flow->setSize(gsl::narrow<uint64_t>(read_write_result.bytes_written));
flow->setOffset(0);
flow->setResourceClaim(output_claim);
if (metrics_) {
metrics_->bytes_written += read_write_result.bytes_written;
metrics_->bytes_read += read_write_result.bytes_read;
}

return bytes_written;
return read_write_result.bytes_written;
} catch (const std::exception& exception) {
logger_->log_debug("Caught exception during process session readWrite, type: {}, what: {}", typeid(exception).name(), exception.what());
throw;
Expand Down Expand Up @@ -486,6 +499,9 @@ void ProcessSession::importFrom(io::InputStream &stream, const std::shared_ptr<c
flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath(), flow->getUUIDStr());

content_stream->close();
if (metrics_) {
metrics_->bytes_written += content_stream->size();
}
std::stringstream details;
details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
Expand Down Expand Up @@ -547,6 +563,9 @@ void ProcessSession::import(const std::string& source, const std::shared_ptr<Flo
flow->getUUIDStr());

stream->close();
if (metrics_) {
metrics_->bytes_written += stream->size();
}
input.close();
if (!keepSource) {
(void)std::remove(source.c_str());
Expand Down Expand Up @@ -649,6 +668,9 @@ void ProcessSession::import(const std::string& source, std::vector<std::shared_p
logger_->log_debug("Import offset {} length {} into content {}, FlowFile UUID {}",
flowFile->getOffset(), flowFile->getSize(), flowFile->getResourceClaim()->getContentFullPath(), flowFile->getUUIDStr());
stream->close();
if (metrics_) {
metrics_->bytes_written += stream->size();
}
std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
provenance_report_->modifyContent(*flowFile, details, duration);
Expand Down Expand Up @@ -930,8 +952,11 @@ void ProcessSession::commit() {
// persist the provenance report
this->provenance_report_->commit();
logger_->log_debug("ProcessSession committed for {}", process_context_->getProcessorNode()->getName());
if (metrics_)
metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - commit_start_time));
if (metrics_) {
auto time_delta = std::chrono::steady_clock::now() - commit_start_time;
metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(time_delta));
metrics_->processing_nanos += std::chrono::duration_cast<std::chrono::nanoseconds>(time_delta).count();
}
} catch (const std::exception& exception) {
logger_->log_debug("Caught Exception during process session commit, type: {}, what: {}", typeid(exception).name(), exception.what());
throw;
Expand Down Expand Up @@ -1141,6 +1166,10 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() {
if (flow_version != nullptr) {
ret->setAttribute(SpecialFlowAttribute::FLOW_ID, flow_version->getFlowId());
}
if (metrics_) {
metrics_->incoming_bytes += ret->getSize();
++metrics_->incoming_flow_files;
}
return ret;
}
current = dynamic_cast<Connection*>(process_context_->getProcessorNode()->pickIncomingConnection());
Expand Down
21 changes: 15 additions & 6 deletions libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,36 @@ LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(Callbac
: callback_(std::move(callback)) {
}

int64_t LineByLineInputOutputStreamCallback::operator()(const std::shared_ptr<io::InputStream>& input, const std::shared_ptr<io::OutputStream>& output) {
io::ReadWriteResult LineByLineInputOutputStreamCallback::operator()(const std::shared_ptr<io::InputStream>& input, const std::shared_ptr<io::OutputStream>& output) {
gsl_Expects(input);
gsl_Expects(output);

io::ReadWriteResult result;

if (int64_t status = readInput(*input); status <= 0) {
return status;
result.bytes_read = status;
return result;
}

std::size_t total_bytes_written_ = 0;
result.bytes_read = gsl::narrow<int64_t>(input_.size());

std::size_t total_bytes_written = 0;
bool is_first_line = true;
readLine();
do {
readLine();
std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
const auto bytes_written = output->write(reinterpret_cast<const uint8_t *>(output_line.data()), output_line.size());
if (io::isError(bytes_written)) { return -1; }
total_bytes_written_ += bytes_written;
if (io::isError(bytes_written)) {
result.bytes_written = bytes_written;
return result;
}
total_bytes_written += bytes_written;
is_first_line = false;
} while (!isLastLine());

return gsl::narrow<int64_t>(total_bytes_written_);
result.bytes_written = gsl::narrow<int64_t>(total_bytes_written);
return result;
}

int64_t LineByLineInputOutputStreamCallback::readInput(io::InputStream& stream) {
Expand Down

0 comments on commit ec5d63d

Please sign in to comment.