[GitHub] [nifi-minifi-cpp] martinzink commented on a change in pull request #1248: MINIFICPP-1702: DefragmentText multiinput improvement
martinzink commented on a change in pull request #1248: URL: https://github.com/apache/nifi-minifi-cpp/pull/1248#discussion_r799641338 ## File path: extensions/standard-processors/processors/DefragmentText.h ## @@ -64,32 +65,48 @@ class DefragmentText : public core::Processor { protected: class Buffer { public: -bool isCompatible(const core::FlowFile& fragment) const; void append(core::ProcessSession* session, const gsl::not_null>& flow_file_to_append); -bool maxSizeReached() const; -bool maxAgeReached() const; -void setMaxAge(std::chrono::milliseconds max_age); -void setMaxSize(size_t max_size); +bool maxSizeReached(const std::optional max_size) const; +bool maxAgeReached(const std::optional max_age) const; void flushAndReplace(core::ProcessSession* session, const core::Relationship& relationship, const std::shared_ptr& new_buffered_flow_file); bool empty() const { return buffered_flow_file_ == nullptr; } +std::optional getNextFragmentOffset() const; private: void store(core::ProcessSession* session, const std::shared_ptr& new_buffered_flow_file); std::shared_ptr buffered_flow_file_; std::chrono::steady_clock::time_point creation_time_; -std::optional max_age_; -std::optional max_size_; }; + class FragmentSource { + public: +class Id { + public: + explicit Id(const core::FlowFile& flow_file); + struct hash { +size_t operator()(const Id& fragment_id) const; + }; + bool operator==(const Id& rhs) const = default; + protected: + std::optional base_name_attribute_; + std::optional post_name_attribute_; +}; + +Buffer buffer_; Review comment: good idea, https://github.com/apache/nifi-minifi-cpp/pull/1248/commits/5fa9ae6f2c1b5f2be1bbcb4a745ba51251ae300c -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] martinzink commented on a change in pull request #1248: MINIFICPP-1702: DefragmentText multiinput improvement
martinzink commented on a change in pull request #1248: URL: https://github.com/apache/nifi-minifi-cpp/pull/1248#discussion_r797743675 ## File path: extensions/standard-processors/processors/DefragmentText.cpp ## @@ -297,29 +297,21 @@ void DefragmentText::Buffer::store(core::ProcessSession* session, const std::sha } } -bool DefragmentText::Buffer::isCompatible(const core::FlowFile& fragment) const { +std::optional DefragmentText::Buffer::getNextFragmentOffset() const { if (empty()) -return true; - if (buffered_flow_file_->getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE) - != fragment.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)) { -return false; - } - if (buffered_flow_file_->getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE) - != fragment.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)) { -return false; - } - std::string current_offset_str, append_offset_str; - if (buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, current_offset_str) - != fragment.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, append_offset_str)) { -return false; - } - if (!current_offset_str.empty() && !append_offset_str.empty()) { -size_t current_offset = std::stoi(current_offset_str); -size_t append_offset = std::stoi(append_offset_str); -if (current_offset + buffered_flow_file_->getSize() != append_offset) - return false; - } - return true; +return std::nullopt; + if (auto offset_attribute = buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE)) +return std::stoi(*offset_attribute) + buffered_flow_file_->getSize(); + return std::nullopt; +} + +DefragmentText::FragmentSource::Id::Id(const core::FlowFile& flow_file) { + if (auto absolute_path = flow_file.getAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH)) +absolute_path_ = *absolute_path; +} + +size_t DefragmentText::FragmentSource::Id::hash::operator() (const Id& fragment_id) const { + return std::hash>{}(fragment_id.absolute_path_); } REGISTER_RESOURCE(DefragmentText, 
"DefragmentText splits and merges incoming flowfiles so cohesive messages are not split between them"); Review comment: https://github.com/apache/nifi-minifi-cpp/pull/1248/commits/beaab09f917fa18929887fd88252fc72d453f62e -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] martinzink commented on a change in pull request #1248: MINIFICPP-1702: DefragmentText multiinput improvement
martinzink commented on a change in pull request #1248: URL: https://github.com/apache/nifi-minifi-cpp/pull/1248#discussion_r797474209 ## File path: extensions/standard-processors/processors/DefragmentText.cpp ## @@ -297,29 +297,34 @@ void DefragmentText::Buffer::store(core::ProcessSession* session, const std::sha } } -bool DefragmentText::Buffer::isCompatible(const core::FlowFile& fragment) const { +std::optional DefragmentText::Buffer::getNextFragmentOffset() const { if (empty()) -return true; - if (buffered_flow_file_->getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE) - != fragment.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)) { -return false; - } - if (buffered_flow_file_->getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE) - != fragment.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)) { -return false; - } - std::string current_offset_str, append_offset_str; - if (buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, current_offset_str) - != fragment.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, append_offset_str)) { -return false; - } - if (!current_offset_str.empty() && !append_offset_str.empty()) { -size_t current_offset = std::stoi(current_offset_str); -size_t append_offset = std::stoi(append_offset_str); -if (current_offset + buffered_flow_file_->getSize() != append_offset) - return false; - } - return true; +return std::nullopt; + if (auto offset_attribute = buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE)) +return std::stoi(*offset_attribute) + buffered_flow_file_->getSize(); + return std::nullopt; +} + +DefragmentText::FragmentSource::Id::Id(const core::FlowFile& flow_file) { + if (auto base_name_attribute = flow_file.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)) +base_name_attribute_ = *base_name_attribute; + if (auto post_name_attribute = flow_file.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)) +post_name_attribute_ = *post_name_attribute; Review comment: didn't think of that, 
but you are right: `absolute.path` is a good way to differentiate between inputs. Changed this in https://github.com/apache/nifi-minifi-cpp/pull/1248/commits/098620837edc75bba8ca2494346b889d0c881837 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] martinzink commented on a change in pull request #1248: MINIFICPP-1702: DefragmentText multiinput improvement
martinzink commented on a change in pull request #1248: URL: https://github.com/apache/nifi-minifi-cpp/pull/1248#discussion_r797473068 ## File path: docker/test/integration/features/defragtextflowfiles.feature ## @@ -2,7 +2,34 @@ Feature: DefragmentText can defragment fragmented data from TailFile Background: Given the content of "/tmp/output" is monitored - Scenario Outline: DefragmentText merges split messages from TailFile + Scenario Outline: DefragmentText correctly merges split messages from multiple TailFile +Given a TailFile processor with the name "TailOne" and the "File to Tail" property set to "/tmp/input/test_file_one.log" +And the "Initial Start Position" property of the TailOne processor is set to "Beginning of File" +And the "Input Delimiter" property of the TailOne processor is set to "%" +And a TailFile processor with the name "TailTwo" and the "File to Tail" property set to "/tmp/input/test_file_two.log" +And the "Initial Start Position" property of the TailTwo processor is set to "Beginning of File" +And the "Input Delimiter" property of the TailTwo processor is set to "%" +And "TailTwo" processor is a start node Review comment: good idea, I've added that case as well in https://github.com/apache/nifi-minifi-cpp/pull/1248/commits/098620837edc75bba8ca2494346b889d0c881837 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] martinzink commented on a change in pull request #1248: MINIFICPP-1702: DefragmentText multiinput improvement
martinzink commented on a change in pull request #1248: URL: https://github.com/apache/nifi-minifi-cpp/pull/1248#discussion_r791826948 ## File path: extensions/standard-processors/processors/DefragmentText.cpp ## @@ -297,29 +297,34 @@ void DefragmentText::Buffer::store(core::ProcessSession* session, const std::sha } } -bool DefragmentText::Buffer::isCompatible(const core::FlowFile& fragment) const { +std::optional DefragmentText::Buffer::getNextFragmentOffset() const { if (empty()) -return true; - if (buffered_flow_file_->getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE) - != fragment.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)) { -return false; - } - if (buffered_flow_file_->getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE) - != fragment.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)) { -return false; - } - std::string current_offset_str, append_offset_str; - if (buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, current_offset_str) - != fragment.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, append_offset_str)) { -return false; - } - if (!current_offset_str.empty() && !append_offset_str.empty()) { -size_t current_offset = std::stoi(current_offset_str); -size_t append_offset = std::stoi(append_offset_str); -if (current_offset + buffered_flow_file_->getSize() != append_offset) - return false; - } - return true; +return std::nullopt; + if (auto offset_attribute = buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE)) +return std::stoi(*offset_attribute) + buffered_flow_file_->getSize(); + return std::nullopt; +} + +DefragmentText::FragmentSource::Id::Id(const core::FlowFile& flow_file) { + if (auto base_name_attribute = flow_file.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)) +base_name_attribute_ = *base_name_attribute; + if (auto post_name_attribute = flow_file.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)) +post_name_attribute_ = *post_name_attribute; +} + +namespace { +template +void 
hash_combine(size_t& seed, const T& v, Rest... rest) { + std::hash hasher; + seed ^= hasher(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2); + (hash_combine(seed, rest), ...); +} +} Review comment: you are right, and I even found invalid indents in the file (e.g. line 325) which should also set off the linter. So I checked, and it is indeed running: if I remove the comment in the last line `} // namespace org::apache::nifi::minifi::processors`, it does complain... I also ran the linter directly on this file and still got no errors: `python ../thirdparty/google-styleguide/cpplint.py --linelength=200 ../extensions/standard-processors/processors/DefragmentText.cpp` I even tried the up-to-date version from https://github.com/cpplint/cpplint/blob/develop/cpplint.py, but it still doesn't catch this style violation. ## File path: extensions/standard-processors/processors/DefragmentText.cpp ## @@ -297,29 +297,34 @@ void DefragmentText::Buffer::store(core::ProcessSession* session, const std::sha } } -bool DefragmentText::Buffer::isCompatible(const core::FlowFile& fragment) const { +std::optional DefragmentText::Buffer::getNextFragmentOffset() const { if (empty()) -return true; - if (buffered_flow_file_->getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE) - != fragment.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)) { -return false; - } - if (buffered_flow_file_->getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE) - != fragment.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)) { -return false; - } - std::string current_offset_str, append_offset_str; - if (buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, current_offset_str) - != fragment.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, append_offset_str)) { -return false; - } - if (!current_offset_str.empty() && !append_offset_str.empty()) { -size_t current_offset = std::stoi(current_offset_str); -size_t append_offset = std::stoi(append_offset_str); -if (current_offset + 
buffered_flow_file_->getSize() != append_offset) - return false; - } - return true; +return std::nullopt; + if (auto offset_attribute = buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE)) +return std::stoi(*offset_attribute) + buffered_flow_file_->getSize(); + return std::nullopt; +} + +DefragmentText::FragmentSource::Id::Id(const core::FlowFile& flow_file) { + if (auto base_name_attribute = flow_file.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)) +base_name_attribute_ = *base_name_attribute; + if (auto post_name_attribute = flow_file.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)) +post_name_attribute_ = *post_name_attribute; +} + +namespace { +template +void hash_combine(size_t& seed, const T& v, Rest... rest) { +
[GitHub] [nifi-minifi-cpp] martinzink commented on a change in pull request #1248: MINIFICPP-1702: DefragmentText multiinput improvement
martinzink commented on a change in pull request #1248: URL: https://github.com/apache/nifi-minifi-cpp/pull/1248#discussion_r791826948 ## File path: extensions/standard-processors/processors/DefragmentText.cpp ## @@ -297,29 +297,34 @@ void DefragmentText::Buffer::store(core::ProcessSession* session, const std::sha } } -bool DefragmentText::Buffer::isCompatible(const core::FlowFile& fragment) const { +std::optional DefragmentText::Buffer::getNextFragmentOffset() const { if (empty()) -return true; - if (buffered_flow_file_->getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE) - != fragment.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)) { -return false; - } - if (buffered_flow_file_->getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE) - != fragment.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)) { -return false; - } - std::string current_offset_str, append_offset_str; - if (buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, current_offset_str) - != fragment.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, append_offset_str)) { -return false; - } - if (!current_offset_str.empty() && !append_offset_str.empty()) { -size_t current_offset = std::stoi(current_offset_str); -size_t append_offset = std::stoi(append_offset_str); -if (current_offset + buffered_flow_file_->getSize() != append_offset) - return false; - } - return true; +return std::nullopt; + if (auto offset_attribute = buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE)) +return std::stoi(*offset_attribute) + buffered_flow_file_->getSize(); + return std::nullopt; +} + +DefragmentText::FragmentSource::Id::Id(const core::FlowFile& flow_file) { + if (auto base_name_attribute = flow_file.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)) +base_name_attribute_ = *base_name_attribute; + if (auto post_name_attribute = flow_file.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)) +post_name_attribute_ = *post_name_attribute; +} + +namespace { +template +void 
hash_combine(size_t& seed, const T& v, Rest... rest) { + std::hash hasher; + seed ^= hasher(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2); + (hash_combine(seed, rest), ...); +} +} Review comment: you are right, and I even found invalid indents in the file (e.g. line 325) which should also set off the linter (although I'm not sure whether the linter checks that). So I checked, and it is indeed running: if I remove the comment in the last line `} // namespace org::apache::nifi::minifi::processors`, it does complain... I also ran the linter directly on this file and still got no errors: `python ../thirdparty/google-styleguide/cpplint.py --linelength=200 ../extensions/standard-processors/processors/DefragmentText.cpp` I even tried the up-to-date version from https://github.com/cpplint/cpplint/blob/develop/cpplint.py, but it still doesn't catch this style violation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] martinzink commented on a change in pull request #1248: MINIFICPP-1702: DefragmentText multiinput improvement
martinzink commented on a change in pull request #1248: URL: https://github.com/apache/nifi-minifi-cpp/pull/1248#discussion_r791826948 ## File path: extensions/standard-processors/processors/DefragmentText.cpp ## @@ -297,29 +297,34 @@ void DefragmentText::Buffer::store(core::ProcessSession* session, const std::sha } } -bool DefragmentText::Buffer::isCompatible(const core::FlowFile& fragment) const { +std::optional DefragmentText::Buffer::getNextFragmentOffset() const { if (empty()) -return true; - if (buffered_flow_file_->getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE) - != fragment.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)) { -return false; - } - if (buffered_flow_file_->getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE) - != fragment.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)) { -return false; - } - std::string current_offset_str, append_offset_str; - if (buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, current_offset_str) - != fragment.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, append_offset_str)) { -return false; - } - if (!current_offset_str.empty() && !append_offset_str.empty()) { -size_t current_offset = std::stoi(current_offset_str); -size_t append_offset = std::stoi(append_offset_str); -if (current_offset + buffered_flow_file_->getSize() != append_offset) - return false; - } - return true; +return std::nullopt; + if (auto offset_attribute = buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE)) +return std::stoi(*offset_attribute) + buffered_flow_file_->getSize(); + return std::nullopt; +} + +DefragmentText::FragmentSource::Id::Id(const core::FlowFile& flow_file) { + if (auto base_name_attribute = flow_file.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)) +base_name_attribute_ = *base_name_attribute; + if (auto post_name_attribute = flow_file.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)) +post_name_attribute_ = *post_name_attribute; +} + +namespace { +template +void 
hash_combine(size_t& seed, const T& v, Rest... rest) { + std::hash hasher; + seed ^= hasher(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2); + (hash_combine(seed, rest), ...); +} +} Review comment: you are right, and I even found invalid indents in the file (e.g. line 325) which should also set off the linter. So I checked, and it is indeed running: if I remove the comment in the last line `} // namespace org::apache::nifi::minifi::processors`, it does complain... I also ran the linter directly on this file and still got no errors: `python ../thirdparty/google-styleguide/cpplint.py --linelength=200 ../extensions/standard-processors/processors/DefragmentText.cpp` I even tried the up-to-date version from https://github.com/cpplint/cpplint/blob/develop/cpplint.py, but it still doesn't catch this style violation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org