This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 7888ca4312ba7cb63163b12b0e07673c29ef4a56 Author: Adam Debreceni <adebrec...@apache.org> AuthorDate: Wed Nov 9 11:45:54 2022 +0100 MINIFICPP-1978 - Flush MergeContent bundles when its size would grow beyond max group size Signed-off-by: Ferenc Gerlits <fgerl...@gmail.com> This closes #1449 --- extensions/libarchive/BinFiles.h | 7 ++- libminifi/test/archive-tests/MergeFileTests.cpp | 66 +++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 2 deletions(-) diff --git a/extensions/libarchive/BinFiles.h b/extensions/libarchive/BinFiles.h index 71cece114..dcc55974f 100644 --- a/extensions/libarchive/BinFiles.h +++ b/extensions/libarchive/BinFiles.h @@ -63,7 +63,7 @@ class Bin { } // check whether the bin meet the min required size and entries so that it can be processed for merge [[nodiscard]] bool isReadyForMerge() const { - return isFull() || (queued_data_size_ >= minSize_ && queue_.size() >= minEntries_); + return closed_ || isFull() || (queued_data_size_ >= minSize_ && queue_.size() >= minEntries_); } // check whether the bin is older than the time specified in msec [[nodiscard]] bool isOlderThan(const std::chrono::milliseconds duration) const { @@ -87,8 +87,10 @@ class Bin { } } - if ((queued_data_size_ + flow->getSize()) > maxSize_ || (queue_.size() + 1) > maxEntries_) + if ((queued_data_size_ + flow->getSize()) > maxSize_ || (queue_.size() + 1) > maxEntries_) { + closed_ = true; return false; + } queue_.push_back(flow); queued_data_size_ += flow->getSize(); @@ -119,6 +121,7 @@ class Bin { size_t minEntries_; // Queued data size uint64_t queued_data_size_; + bool closed_{false}; // Queue for the Flow File std::deque<std::shared_ptr<core::FlowFile>> queue_; std::chrono::system_clock::time_point creation_dated_; diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp index d3bb0c1c0..4152dc8de 100644 --- a/libminifi/test/archive-tests/MergeFileTests.cpp +++ 
b/libminifi/test/archive-tests/MergeFileTests.cpp @@ -822,3 +822,69 @@ TEST_CASE_METHOD(MergeTestController, "Batch Size", "[testMergeFileBatchSize]") REQUIRE(callback.to_string() == expected[1]); } } + +TEST_CASE_METHOD(MergeTestController, "Maximum Group Size is respected", "[testMergeFileMaximumGroupSize]") { + // each flowfile content is 32 bytes + for (auto& ff : flowFileContents_) { + REQUIRE(ff.length() == 32); + } + std::string expected[2]{ + flowFileContents_[0] + flowFileContents_[1], + flowFileContents_[2] + flowFileContents_[3] + }; + + context_->setProperty(minifi::processors::MergeContent::MergeFormat, minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE); + context_->setProperty(minifi::processors::MergeContent::MergeStrategy, minifi::processors::merge_content_options::MERGE_STRATEGY_BIN_PACK); + context_->setProperty(minifi::processors::MergeContent::DelimiterStrategy, minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT); + context_->setProperty(minifi::processors::BinFiles::BatchSize, "1000"); + + // we want a bit more than 2 flowfiles + context_->setProperty(minifi::processors::BinFiles::MinSize, "65"); + context_->setProperty(minifi::processors::BinFiles::MaxSize, "65"); + + context_->setProperty(minifi::processors::BinFiles::MinEntries, "3"); + context_->setProperty(minifi::processors::BinFiles::MaxEntries, "3"); + + core::ProcessSession sessionGenFlowFile(context_); + // enqueue 6 (six) flowFiles + for (const int i : {0, 1, 2, 3, 4, 5}) { + const auto flow = sessionGenFlowFile.create(); + sessionGenFlowFile.writeBuffer(flow, flowFileContents_[i]); + sessionGenFlowFile.flushContent(); + input_->put(flow); + } + + REQUIRE(merge_content_processor_->getName() == "mergecontent"); + auto factory = std::make_shared<core::ProcessSessionFactory>(context_); + merge_content_processor_->onSchedule(context_, factory); + // a single trigger is enough to merge the first four flowFiles into two bundles; the last two stay below the minimums + { + auto session = 
std::make_shared<core::ProcessSession>(context_); + merge_content_processor_->onTrigger(context_, session); + session->commit(); + } + // validate the merge content + std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; + std::shared_ptr<core::FlowFile> flow1 = output_->poll(expiredFlowRecords); + REQUIRE(expiredFlowRecords.empty()); + REQUIRE(flow1); + { + FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize())); + sessionGenFlowFile.read(flow1, std::ref(callback)); + REQUIRE(callback.to_string() == expected[0]); + } + + std::shared_ptr<core::FlowFile> flow2 = output_->poll(expiredFlowRecords); + REQUIRE(expiredFlowRecords.empty()); + REQUIRE(flow2); + { + FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize())); + sessionGenFlowFile.read(flow2, std::ref(callback)); + REQUIRE(callback.to_string() == expected[1]); + } + + // no more flowfiles + std::shared_ptr<core::FlowFile> flow3 = output_->poll(expiredFlowRecords); + REQUIRE(expiredFlowRecords.empty()); + REQUIRE_FALSE(flow3); +}