[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1052: MINIFICPP-1244 Support the Initial Start Position property in TailFile

2021-04-23 Thread GitBox


adamdebreceni commented on a change in pull request #1052:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1052#discussion_r619061515



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -653,29 +688,57 @@ void TailFile::onTrigger(const 
std::shared_ptr &, const st
   if (!session->existsFlowFileInRelationship(Success)) {
 yield();
   }
+
+  first_trigger_ = false;
+}
+
+bool TailFile::isOldFileInitiallyRead(TailState &state) const {
+  // This is our initial processing and no stored state was found
+  return first_trigger_ && state.last_read_time_ == 
std::chrono::system_clock::time_point{};
 }
 
 void TailFile::processFile(const std::shared_ptr 
&session,
const std::string &full_file_name,
TailState &state) {
-  uint64_t fsize = utils::file::FileUtils::file_size(full_file_name);
-  if (fsize < state.position_) {
-processRotatedFiles(session, state);
-  } else if (fsize == state.position_) {
-logger_->log_trace("Skipping file %s as its size hasn't change since last 
read", state.file_name_);
-return;
+  if (isOldFileInitiallyRead(state)) {
+if (initial_start_position_ == InitialStartPositions::BEGINNING_OF_TIME) {
+  processAllRotatedFiles(session, state);
+} else if (initial_start_position_ == InitialStartPositions::CURRENT_TIME) 
{
+  state.position_ = utils::file::FileUtils::file_size(full_file_name);
+  state.last_read_time_ = std::chrono::system_clock::now();
+  state.checksum_ = 
utils::file::FileUtils::computeChecksum(full_file_name, state.position_);
+  storeState();
+  return;
+}
+  } else {
+uint64_t fsize = utils::file::FileUtils::file_size(full_file_name);
+if (fsize < state.position_) {
+  processRotatedFilesAfterLastReadTime(session, state);
+} else if (fsize == state.position_) {
+  logger_->log_trace("Skipping file %s as its size hasn't change since 
last read", state.file_name_);

Review comment:
   minor: typo
   ```suggestion
 logger_->log_trace("Skipping file %s as its size hasn't changed since 
last read", state.file_name_);
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1052: MINIFICPP-1244 Support the Initial Start Position property in TailFile

2021-04-23 Thread GitBox


adamdebreceni commented on a change in pull request #1052:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1052#discussion_r619055105



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -609,6 +640,10 @@ std::vector TailFile::findRotatedFiles(const 
TailState &state) const
 
   utils::file::FileUtils::list_dir(state.path_, collect_matching_files, 
logger_, false);
 
+  return findRotatedFiles(state, matched_files_with_mtime);
+}
+
+std::vector TailFile::findRotatedFiles(const TailState &state, 
std::vector& matched_files_with_mtime) const {

Review comment:
   this might be a misleading name; `sortAndSkipMainFilePrefix` or some 
similar contraption could better describe its function 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1052: MINIFICPP-1244 Support the Initial Start Position property in TailFile

2021-04-23 Thread GitBox


adamdebreceni commented on a change in pull request #1052:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1052#discussion_r619043103



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -653,29 +688,57 @@ void TailFile::onTrigger(const 
std::shared_ptr &, const st
   if (!session->existsFlowFileInRelationship(Success)) {
 yield();
   }
+
+  first_trigger_ = false;
+}
+
+bool TailFile::isOldFileInitiallyRead(TailState &state) const {
+  // This is our initial processing and no stored state was found
+  return first_trigger_ && state.last_read_time_ == 
std::chrono::system_clock::time_point{};
 }
 
 void TailFile::processFile(const std::shared_ptr 
&session,
const std::string &full_file_name,
TailState &state) {
-  uint64_t fsize = utils::file::FileUtils::file_size(full_file_name);
-  if (fsize < state.position_) {
-processRotatedFiles(session, state);
-  } else if (fsize == state.position_) {
-logger_->log_trace("Skipping file %s as its size hasn't change since last 
read", state.file_name_);
-return;
+  if (isOldFileInitiallyRead(state)) {
+if (initial_start_position_ == InitialStartPositions::BEGINNING_OF_TIME) {
+  processAllRotatedFiles(session, state);
+} else if (initial_start_position_ == InitialStartPositions::CURRENT_TIME) 
{
+  state.position_ = utils::file::FileUtils::file_size(full_file_name);
+  state.last_read_time_ = std::chrono::system_clock::now();
+  state.checksum_ = 
utils::file::FileUtils::computeChecksum(full_file_name, state.position_);
+  storeState();
+  return;
+}
+  } else {
+uint64_t fsize = utils::file::FileUtils::file_size(full_file_name);
+if (fsize < state.position_) {
+  processRotatedFilesAfterLastReadTime(session, state);
+} else if (fsize == state.position_) {
+  logger_->log_trace("Skipping file %s as its size hasn't change since 
last read", state.file_name_);
+  return;
+}
   }
 
   processSingleFile(session, full_file_name, state);
 }
 
-void TailFile::processRotatedFiles(const std::shared_ptr 
&session, TailState &state) {
-std::vector rotated_file_states = findRotatedFiles(state);
-for (TailState &file_state : rotated_file_states) {
-  processSingleFile(session, file_state.fileNameWithPath(), file_state);
-}
-state.position_ = 0;
-state.checksum_ = 0;
+void TailFile::processRotatedFilesAfterLastReadTime(const 
std::shared_ptr &session, TailState &state) {
+  std::vector rotated_file_states = 
findRotatedFilesAfterLastReadTime(state);
+  processRotatedFiles(session, state, rotated_file_states);
+}
+
+void TailFile::processAllRotatedFiles(const 
std::shared_ptr &session, TailState &state) {
+  std::vector rotated_file_states = findAllRotatedFiles(state);
+  processRotatedFiles(session, state, rotated_file_states);
+}
+
+void TailFile::processRotatedFiles(const std::shared_ptr 
&session, TailState &state, std::vector &rotated_file_states) {
+  for (TailState &file_state : rotated_file_states) {
+processSingleFile(session, file_state.fileNameWithPath(), file_state);

Review comment:
   I meant that `storeState` persists the `tail_states_` vector, which, as 
far as I understand, does not contain the `state` parameter when called with 
rotated files, so in this case `storeState` seems to be a no-op (or rather 
"misleading"). Could moving `storeState` to the end of `processFile` improve it 
in this regard?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1052: MINIFICPP-1244 Support the Initial Start Position property in TailFile

2021-04-22 Thread GitBox


adamdebreceni commented on a change in pull request #1052:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1052#discussion_r618457506



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -653,29 +688,57 @@ void TailFile::onTrigger(const 
std::shared_ptr &, const st
   if (!session->existsFlowFileInRelationship(Success)) {
 yield();
   }
+
+  first_trigger_ = false;
+}
+
+bool TailFile::isOldFileInitiallyRead(TailState &state) const {
+  // This is our initial processing and no stored state was found
+  return first_trigger_ && state.last_read_time_ == 
std::chrono::system_clock::time_point{};
 }
 
 void TailFile::processFile(const std::shared_ptr 
&session,
const std::string &full_file_name,
TailState &state) {
-  uint64_t fsize = utils::file::FileUtils::file_size(full_file_name);
-  if (fsize < state.position_) {
-processRotatedFiles(session, state);
-  } else if (fsize == state.position_) {
-logger_->log_trace("Skipping file %s as its size hasn't change since last 
read", state.file_name_);
-return;
+  if (isOldFileInitiallyRead(state)) {
+if (initial_start_position_ == InitialStartPositions::BEGINNING_OF_TIME) {
+  processAllRotatedFiles(session, state);
+} else if (initial_start_position_ == InitialStartPositions::CURRENT_TIME) 
{
+  state.position_ = utils::file::FileUtils::file_size(full_file_name);
+  state.last_read_time_ = std::chrono::system_clock::now();
+  state.checksum_ = 
utils::file::FileUtils::computeChecksum(full_file_name, state.position_);
+  storeState();
+  return;
+}
+  } else {
+uint64_t fsize = utils::file::FileUtils::file_size(full_file_name);
+if (fsize < state.position_) {
+  processRotatedFilesAfterLastReadTime(session, state);
+} else if (fsize == state.position_) {
+  logger_->log_trace("Skipping file %s as its size hasn't change since 
last read", state.file_name_);
+  return;
+}
   }
 
   processSingleFile(session, full_file_name, state);
 }
 
-void TailFile::processRotatedFiles(const std::shared_ptr 
&session, TailState &state) {
-std::vector rotated_file_states = findRotatedFiles(state);
-for (TailState &file_state : rotated_file_states) {
-  processSingleFile(session, file_state.fileNameWithPath(), file_state);
-}
-state.position_ = 0;
-state.checksum_ = 0;
+void TailFile::processRotatedFilesAfterLastReadTime(const 
std::shared_ptr &session, TailState &state) {
+  std::vector rotated_file_states = 
findRotatedFilesAfterLastReadTime(state);
+  processRotatedFiles(session, state, rotated_file_states);
+}
+
+void TailFile::processAllRotatedFiles(const 
std::shared_ptr &session, TailState &state) {
+  std::vector rotated_file_states = findAllRotatedFiles(state);
+  processRotatedFiles(session, state, rotated_file_states);
+}
+
+void TailFile::processRotatedFiles(const std::shared_ptr 
&session, TailState &state, std::vector &rotated_file_states) {
+  for (TailState &file_state : rotated_file_states) {
+processSingleFile(session, file_state.fileNameWithPath(), file_state);

Review comment:
   `processSingleFile` calls `storeState`, is that desirable with rotated 
files?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1052: MINIFICPP-1244 Support the Initial Start Position property in TailFile

2021-04-22 Thread GitBox


adamdebreceni commented on a change in pull request #1052:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1052#discussion_r618437327



##
File path: extensions/standard-processors/processors/TailFile.h
##
@@ -91,9 +99,12 @@ class TailFile : public core::Processor {
   static core::Property RecursiveLookup;
   static core::Property LookupFrequency;
   static core::Property RollingFilenamePattern;
+  static core::Property InitialStartPosition;
   // Supported Relationships
   static core::Relationship Success;
 
+  static const std::set INITIAL_START_POSITIONS;

Review comment:
   is this used somewhere?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1052: MINIFICPP-1244 Support the Initial Start Position property in TailFile

2021-04-22 Thread GitBox


adamdebreceni commented on a change in pull request #1052:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1052#discussion_r618393554



##
File path: libminifi/include/utils/ProcessorConfigUtils.h
##
@@ -19,59 +19,23 @@
 
 #include 
 #include 
+#include 
 
-#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
 
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace utils {
 
-std::string getRequiredPropertyOrThrow(const core::ProcessContext* context, 
const std::string& property_name) {
-  std::string value;
-  if (!context->getProperty(property_name, value)) {
-throw std::runtime_error(property_name + " property missing or invalid");
-  }
-  return value;
-}
-
-std::vector listFromCommaSeparatedProperty(const 
core::ProcessContext* context, const std::string& property_name) {
-  std::string property_string;
-  context->getProperty(property_name, property_string);
-  return utils::StringUtils::splitAndTrim(property_string, ",");
-}
-
-std::vector listFromRequiredCommaSeparatedProperty(const 
core::ProcessContext* context, const std::string& property_name) {
-  return utils::StringUtils::splitAndTrim(getRequiredPropertyOrThrow(context, 
property_name), ",");
-}
-
-bool parseBooleanPropertyOrThrow(core::ProcessContext* context, const 
std::string& property_name) {
-  const std::string value_str = getRequiredPropertyOrThrow(context, 
property_name);
-  utils::optional maybe_value = utils::StringUtils::toBool(value_str);
-  if (!maybe_value) {
-throw std::runtime_error(property_name + " property is invalid: value is " 
+ value_str);
-  }
-  return maybe_value.value();
-}
-
-std::chrono::milliseconds parseTimePropertyMSOrThrow(core::ProcessContext* 
context, const std::string& property_name) {
-  core::TimeUnit unit;
-  uint64_t time_value_ms;
-  const std::string value_str = getRequiredPropertyOrThrow(context, 
property_name);
-  if (!core::Property::StringToTime(value_str, time_value_ms, unit) || 
!core::Property::ConvertTimeUnitToMS(time_value_ms, unit, time_value_ms)) {
-throw std::runtime_error(property_name + " property is invalid: value is " 
+ value_str);
-  }
-  return std::chrono::milliseconds(time_value_ms);
-}
-
-utils::optional getOptionalUintProperty(const core::ProcessContext& 
context, const std::string& property_name) {
-  uint64_t value;
-  if (context.getProperty(property_name, value)) {
-return { value };
-  }
-  return utils::nullopt;
-}
+std::string getRequiredPropertyOrThrow(const core::ProcessContext* context, 
const std::string& property_name);

Review comment:
   I was under the impression that `getProperty` already throws on missing 
required properties
   
[here](https://github.com/apache/nifi-minifi-cpp/blob/89d68e21bf28f3b25e4a64117b631668b752889f/libminifi/include/core/ConfigurableComponent.h#L227)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org