[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-15 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r440097068



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -204,52 +418,78 @@ void TailFile::parseStateFileLine(char *buf) {
   }
 
   std::string value = equal;
-  key = trimRight(key);
-  value = trimRight(value);
+  key = utils::StringUtils::trimRight(key);
+  value = utils::StringUtils::trimRight(value);
 
   if (key == "FILENAME") {
 std::string fileLocation, fileName;
 if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, 
fileName)) {
   logger_->log_debug("State migration received path %s, file %s", 
fileLocation, fileName);
-  tail_states_.insert(std::make_pair(fileName, TailState { fileLocation, 
fileName, 0, 0 }));
+  state.emplace(fileName, TailState{fileLocation, fileName});
 } else {
-  tail_states_.insert(std::make_pair(value, TailState { fileLocation, 
value, 0, 0 }));
+  state.emplace(value, TailState{fileLocation, value});
 }
   }
   if (key == "POSITION") {
 // for backwards compatibility
-if (tail_states_.size() != 1) {
+if (tail_states_.size() != std::size_t{1}) {
   throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, 
"Incompatible state file types");
 }
 const auto position = std::stoull(value);
 logger_->log_debug("Received position %d", position);
-tail_states_.begin()->second.currentTailFilePosition_ = position;
+state.begin()->second.position_ = position;
   }
   if (key.find(CURRENT_STR) == 0) {
 const auto file = key.substr(strlen(CURRENT_STR));
 std::string fileLocation, fileName;
 if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, 
fileName)) {
-  tail_states_[file].path_ = fileLocation;
-  tail_states_[file].current_file_name_ = fileName;
+  state[file].path_ = fileLocation;
+  state[file].file_name_ = fileName;
 } else {
   throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "State file 
contains an invalid file name");
 }
   }
 
   if (key.find(POSITION_STR) == 0) {
 const auto file = key.substr(strlen(POSITION_STR));
-tail_states_[file].currentTailFilePosition_ = std::stoull(value);
+state[file].position_ = std::stoull(value);
   }
 }
 
+bool TailFile::recoverState(const std::shared_ptr& 
context) {
+  std::map new_tail_states;
+  bool state_load_success = getStateFromStateManager(new_tail_states) ||
+getStateFromLegacyStateFile(context, 
new_tail_states);
+  if (!state_load_success) {
+return false;
+  }
 
+  logger_->log_debug("load state succeeded");
 
-bool TailFile::recoverState(const std::shared_ptr& 
context) {
-  bool state_load_success = false;
+  if (tail_mode_ == Mode::SINGLE) {
+if (tail_states_.size() == 1) {
+  auto state_it = tail_states_.begin();
+  const auto it = new_tail_states.find(state_it->first);
+  if (it != new_tail_states.end()) {
+state_it->second = it->second;
+  }
+} else {
+  throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "This should 
never happen: "

Review comment:
   I don't think it can happen, as a configuration change will restart the 
processor.  A newly created processor in Single file mode starts with a 
one-element state, and we only update that one item here; we don't add to it.  
This `throw` is here only to guard against later code changes that would alter 
this behavior.
   
   As to invalidating the state, I'll need to look at how NiFi does that.  I 
think there is value in keeping the state of existing files after a 
configuration change: for example, if the selection is broadened in Multi file 
mode (say from `*.log` to `*.*`), we probably don't want to start again from 
the beginning of files we have already tailed.  But we should do whatever NiFi 
does.  Can that be a separate pull request?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-12 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439433613



##
File path: extensions/standard-processors/tests/unit/TailFileTests.cpp
##
@@ -652,46 +872,606 @@ TEST_CASE("TailFileWithMultileRolledOverFiles", 
"[tailfiletest2]") {
   char format[] = "/tmp/gt.XX";
   auto dir = testController.createTempDirectory(format);
 
-  // Define test input file
-  std::string in_file(dir);
-  in_file.append("fruits.txt");
+  std::string test_file = dir + utils::file::FileUtils::get_separator() + 
"fruits.log";
 
-  for (int i = 2; 0 <= i; --i) {
-if (i < 2) {
-  std::this_thread::sleep_for(std::chrono::milliseconds(1000));  // make 
sure the new file gets newer modification time
-}
-std::ofstream in_file_stream(in_file + (i > 0 ? std::to_string(i) : ""));
-for (int j = 0; j <= i; j++) {
-  in_file_stream << "Apple" << DELIM;
-}
-in_file_stream.close();
-  }
+  std::ofstream test_file_stream_0(test_file, std::ios::binary);
+  test_file_stream_0 << "Apple" << DELIM << "Orange" << DELIM;
+  test_file_stream_0.flush();
 
   // Build MiNiFi processing graph
   auto tail_file = plan->addProcessor("TailFile", "Tail");
   plan->setProperty(tail_file, processors::TailFile::Delimiter.getName(), 
std::string(1, DELIM));
-  plan->setProperty(tail_file, processors::TailFile::FileName.getName(), 
in_file);
+  plan->setProperty(tail_file, processors::TailFile::FileName.getName(), 
test_file);
   auto log_attr = plan->addProcessor("LogAttribute", "Log", 
core::Relationship("success", "description"), true);
   plan->setProperty(log_attr, 
processors::LogAttribute::FlowFilesToLog.getName(), "0");
-  // Log as many FFs as it can to make sure exactly the expected amount is 
produced
 
-  // Each iteration should go through one file and log all flowfiles
-  for (int i = 2; 0 <= i; --i) {
-plan->reset();
-plan->runNextProcessor();  // Tail
-plan->runNextProcessor();  // Log
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename 
value:fruits.0-5.log"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename 
value:fruits.6-12.log"));
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+  test_file_stream_0 << "Pear" << DELIM;
+  test_file_stream_0.close();
+
+  std::string first_rotated_file = dir + 
utils::file::FileUtils::get_separator() + "fruits.0.log";
+  REQUIRE(rename(test_file.c_str(), first_rotated_file.c_str()) == 0);
+
+  std::ofstream test_file_stream_1(test_file, std::ios::binary);
+  test_file_stream_1 << "Pineapple" << DELIM << "Kiwi" << DELIM;
+  test_file_stream_1.close();
+
+  std::string second_rotated_file = dir + 
utils::file::FileUtils::get_separator() + "fruits.1.log";
+  REQUIRE(rename(test_file.c_str(), second_rotated_file.c_str()) == 0);
+
+  std::ofstream test_file_stream_2(test_file, std::ios::binary);
+  test_file_stream_2 << "Apricont" << DELIM;

Review comment:
   An apricont is a type of apricot often depicted in Russian religious 
paintings.  Fixed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-12 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439393349



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -335,170 +563,210 @@ bool TailFile::storeState(const 
std::shared_ptr& context)
   return true;
 }
 
-static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem 
j) {
-  return (i.modifiedTime < j.modifiedTime);
-}
-void TailFile::checkRollOver(const std::shared_ptr& 
context, TailState , const std::string _file_name) {
-  struct stat statbuf;
-  std::vector matchedFiles;
-  std::string fullPath = file.path_ + utils::file::FileUtils::get_separator() 
+ file.current_file_name_;
-
-  if (stat(fullPath.c_str(), ) == 0) {
-logger_->log_trace("Searching for files rolled over");
-std::string pattern = file.current_file_name_;
-std::size_t found = file.current_file_name_.find_last_of(".");
-if (found != std::string::npos)
-  pattern = file.current_file_name_.substr(0, found);
-
-// Callback, called for each file entry in the listed directory
-// Return value is used to break (false) or continue (true) listing
-auto lambda = [&](const std::string& path, const std::string& filename) -> 
bool {
-  struct stat sb;
-  std::string fileFullName = path + 
utils::file::FileUtils::get_separator() + filename;
-  if ((fileFullName.find(pattern) != std::string::npos) && 
stat(fileFullName.c_str(), ) == 0) {
-uint64_t candidateModTime = ((uint64_t) (sb.st_mtime) * 1000);
-if (candidateModTime >= file.currentTailFileModificationTime_) {
-  logging::LOG_TRACE(logger_) << "File " << filename << " (short name 
" << file.current_file_name_ <<
-  ") disk mod time " << candidateModTime << ", struct mod time " << 
file.currentTailFileModificationTime_ << ", size on disk " << sb.st_size << ", 
position " << file.currentTailFilePosition_;
-  if (filename == file.current_file_name_ && candidateModTime == 
file.currentTailFileModificationTime_ &&
-  sb.st_size == file.currentTailFilePosition_) {
-return true;  // Skip the current file as a candidate in case it 
wasn't updated
+std::vector TailFile::findRotatedFiles(const TailState ) 
const {
+  logger_->log_debug("Searching for files rolled over; last read time is %llu",
+  
std::chrono::time_point_cast(state.last_read_time_).time_since_epoch().count());
+
+  std::size_t last_dot_position = state.file_name_.find_last_of('.');
+  std::string base_name = state.file_name_.substr(0, last_dot_position);
+  std::string pattern = 
utils::StringUtils::replaceOne(rolling_filename_pattern_, "${filename}", 
base_name);
+
+  std::vector matched_files_with_mtime;
+  auto collect_matching_files = [&](const std::string , const std::string 
_name) -> bool {
+if (file_name != state.file_name_ && 
utils::Regex::matchesFullInput(pattern, file_name)) {
+  std::string full_file_name = path + 
utils::file::FileUtils::get_separator() + file_name;
+  TailStateWithMtime::TimePoint 
mtime{utils::file::FileUtils::last_write_time_point(full_file_name)};
+  logger_->log_debug("File %s with mtime %llu matches rolling filename 
pattern %s", file_name, mtime.time_since_epoch().count(), pattern);

Review comment:
   done

##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -335,170 +563,210 @@ bool TailFile::storeState(const 
std::shared_ptr& context)
   return true;
 }
 
-static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem 
j) {
-  return (i.modifiedTime < j.modifiedTime);
-}
-void TailFile::checkRollOver(const std::shared_ptr& 
context, TailState , const std::string _file_name) {
-  struct stat statbuf;
-  std::vector matchedFiles;
-  std::string fullPath = file.path_ + utils::file::FileUtils::get_separator() 
+ file.current_file_name_;
-
-  if (stat(fullPath.c_str(), ) == 0) {
-logger_->log_trace("Searching for files rolled over");
-std::string pattern = file.current_file_name_;
-std::size_t found = file.current_file_name_.find_last_of(".");
-if (found != std::string::npos)
-  pattern = file.current_file_name_.substr(0, found);
-
-// Callback, called for each file entry in the listed directory
-// Return value is used to break (false) or continue (true) listing
-auto lambda = [&](const std::string& path, const std::string& filename) -> 
bool {
-  struct stat sb;
-  std::string fileFullName = path + 
utils::file::FileUtils::get_separator() + filename;
-  if ((fileFullName.find(pattern) != std::string::npos) && 
stat(fileFullName.c_str(), ) == 0) {
-uint64_t candidateModTime = ((uint64_t) (sb.st_mtime) * 1000);
-if (candidateModTime >= file.currentTailFileModificationTime_) {
-  logging::LOG_TRACE(logger_) << "File " << filename << " (short name 
" << file.current_file_name_ <<
-  ") disk mod time " << 

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-12 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439393096



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -40,60 +35,283 @@
 #include 
 #include 
 
+#include "io/CRCStream.h"
 #include "utils/file/FileUtils.h"
 #include "utils/file/PathUtils.h"
 #include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "utils/RegexUtils.h"
-#ifdef HAVE_REGEX_CPP
-#include 
-#else
-#include 
-#endif
 #include "TailFile.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 
-#ifndef S_ISDIR
-#define S_ISDIR(mode)  (((mode) & S_IFMT) == S_IFDIR)
-#endif
-
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Wsign-compare"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wsign-compare"
-#endif
-
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of 
the file that should be tailed when using single file mode, or a file regex 
when using multifile mode", "");
-core::Property TailFile::StateFile("State File", "DEPRECATED. Only use it for 
state migration from the legacy state file.",
-   "TailFileState");
-core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character 
that should be used for delimiting the data being tailed"
-   "from the incoming file."
-   "If none is specified, data will be 
ingested as it becomes available.",
-   "");
+core::Property TailFile::FileName(
+core::PropertyBuilder::createProperty("File to Tail")
+->withDescription("Fully-qualified filename of the file that should be 
tailed when using single file mode, or a file regex when using multifile mode")
+->isRequired(true)
+->build());
+
+core::Property TailFile::StateFile(
+core::PropertyBuilder::createProperty("State File")
+->withDescription("DEPRECATED. Only use it for state migration from 
the legacy state file.")
+->isRequired(false)
+->withDefaultValue("TailFileState")
+->build());
+
+core::Property TailFile::Delimiter(
+core::PropertyBuilder::createProperty("Input Delimiter")
+->withDescription("Specifies the character that should be used for 
delimiting the data being tailed"
+ "from the incoming file. If none is specified, data will be ingested 
as it becomes available.")
+->isRequired(false)
+->withDefaultValue("\\n")
+->build());
 
 core::Property TailFile::TailMode(
-core::PropertyBuilder::createProperty("tail-mode", "Tailing 
Mode")->withDescription(
-"Specifies the tail file mode. In 'Single file' mode only a single 
file will be watched. "
+core::PropertyBuilder::createProperty("tail-mode", "Tailing Mode")
+->withDescription("Specifies the tail file mode. In 'Single file' mode 
only a single file will be watched. "
 "In 'Multiple file' mode a regex may be used. Note that in multiple 
file mode we will still continue to watch for rollover on the initial set of 
watched files. "
 "The Regex used to locate multiple files will be run during the 
schedule phrase. Note that if rotated files are matched by the regex, those 
files will be tailed.")->isRequired(true)
-->withAllowableValue("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single 
file")->build());
-
-core::Property 
TailFile::BaseDirectory(core::PropertyBuilder::createProperty("tail-base-directory",
 "Base Directory")->isRequired(false)->build());
+->withAllowableValue("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single file")
+->build());
+
+core::Property TailFile::BaseDirectory(
+core::PropertyBuilder::createProperty("tail-base-directory", "Base 
Directory")
+->withDescription("Base directory used to look for files to tail. This 
property is required when using Multiple file mode.")
+->isRequired(false)
+->build());
+
+core::Property TailFile::RecursiveLookup(
+core::PropertyBuilder::createProperty("Recursive lookup")
+->withDescription("When using Multiple file mode, this property 
determines whether files are tailed in "
+"child directories of the Base Directory or not.")
+->isRequired(false)
+->withDefaultValue(false)
+->build());
+
+core::Property TailFile::LookupFrequency(
+core::PropertyBuilder::createProperty("Lookup frequency")
+->withDescription("When using Multiple file mode, this property 
specifies the minimum duration "
+"the processor will wait between looking for new files to tail in the 
Base Directory.")
+->isRequired(false)
+

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-12 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439392365



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -40,60 +35,283 @@
 #include 
 #include 
 
+#include "io/CRCStream.h"
 #include "utils/file/FileUtils.h"
 #include "utils/file/PathUtils.h"
 #include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "utils/RegexUtils.h"
-#ifdef HAVE_REGEX_CPP
-#include 
-#else
-#include 
-#endif
 #include "TailFile.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 
-#ifndef S_ISDIR
-#define S_ISDIR(mode)  (((mode) & S_IFMT) == S_IFDIR)
-#endif
-
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Wsign-compare"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wsign-compare"
-#endif
-
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of 
the file that should be tailed when using single file mode, or a file regex 
when using multifile mode", "");
-core::Property TailFile::StateFile("State File", "DEPRECATED. Only use it for 
state migration from the legacy state file.",
-   "TailFileState");
-core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character 
that should be used for delimiting the data being tailed"
-   "from the incoming file."
-   "If none is specified, data will be 
ingested as it becomes available.",
-   "");
+core::Property TailFile::FileName(
+core::PropertyBuilder::createProperty("File to Tail")
+->withDescription("Fully-qualified filename of the file that should be 
tailed when using single file mode, or a file regex when using multifile mode")
+->isRequired(true)
+->build());
+
+core::Property TailFile::StateFile(
+core::PropertyBuilder::createProperty("State File")
+->withDescription("DEPRECATED. Only use it for state migration from 
the legacy state file.")
+->isRequired(false)
+->withDefaultValue("TailFileState")
+->build());
+
+core::Property TailFile::Delimiter(
+core::PropertyBuilder::createProperty("Input Delimiter")
+->withDescription("Specifies the character that should be used for 
delimiting the data being tailed"
+ "from the incoming file. If none is specified, data will be ingested 
as it becomes available.")
+->isRequired(false)
+->withDefaultValue("\\n")
+->build());
 
 core::Property TailFile::TailMode(
-core::PropertyBuilder::createProperty("tail-mode", "Tailing 
Mode")->withDescription(
-"Specifies the tail file mode. In 'Single file' mode only a single 
file will be watched. "
+core::PropertyBuilder::createProperty("tail-mode", "Tailing Mode")
+->withDescription("Specifies the tail file mode. In 'Single file' mode 
only a single file will be watched. "
 "In 'Multiple file' mode a regex may be used. Note that in multiple 
file mode we will still continue to watch for rollover on the initial set of 
watched files. "
 "The Regex used to locate multiple files will be run during the 
schedule phrase. Note that if rotated files are matched by the regex, those 
files will be tailed.")->isRequired(true)
-->withAllowableValue("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single 
file")->build());
-
-core::Property 
TailFile::BaseDirectory(core::PropertyBuilder::createProperty("tail-base-directory",
 "Base Directory")->isRequired(false)->build());
+->withAllowableValue("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single file")
+->build());
+
+core::Property TailFile::BaseDirectory(
+core::PropertyBuilder::createProperty("tail-base-directory", "Base 
Directory")
+->withDescription("Base directory used to look for files to tail. This 
property is required when using Multiple file mode.")
+->isRequired(false)
+->build());
+
+core::Property TailFile::RecursiveLookup(
+core::PropertyBuilder::createProperty("Recursive lookup")
+->withDescription("When using Multiple file mode, this property 
determines whether files are tailed in "
+"child directories of the Base Directory or not.")
+->isRequired(false)
+->withDefaultValue(false)
+->build());
+
+core::Property TailFile::LookupFrequency(
+core::PropertyBuilder::createProperty("Lookup frequency")
+->withDescription("When using Multiple file mode, this property 
specifies the minimum duration "
+"the processor will wait between looking for new files to tail in the 
Base Directory.")
+->isRequired(false)
+

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-12 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439392365



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -40,60 +35,283 @@
 #include 
 #include 
 
+#include "io/CRCStream.h"
 #include "utils/file/FileUtils.h"
 #include "utils/file/PathUtils.h"
 #include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "utils/RegexUtils.h"
-#ifdef HAVE_REGEX_CPP
-#include 
-#else
-#include 
-#endif
 #include "TailFile.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 
-#ifndef S_ISDIR
-#define S_ISDIR(mode)  (((mode) & S_IFMT) == S_IFDIR)
-#endif
-
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Wsign-compare"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wsign-compare"
-#endif
-
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of 
the file that should be tailed when using single file mode, or a file regex 
when using multifile mode", "");
-core::Property TailFile::StateFile("State File", "DEPRECATED. Only use it for 
state migration from the legacy state file.",
-   "TailFileState");
-core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character 
that should be used for delimiting the data being tailed"
-   "from the incoming file."
-   "If none is specified, data will be 
ingested as it becomes available.",
-   "");
+core::Property TailFile::FileName(
+core::PropertyBuilder::createProperty("File to Tail")
+->withDescription("Fully-qualified filename of the file that should be 
tailed when using single file mode, or a file regex when using multifile mode")
+->isRequired(true)
+->build());
+
+core::Property TailFile::StateFile(
+core::PropertyBuilder::createProperty("State File")
+->withDescription("DEPRECATED. Only use it for state migration from 
the legacy state file.")
+->isRequired(false)
+->withDefaultValue("TailFileState")
+->build());
+
+core::Property TailFile::Delimiter(
+core::PropertyBuilder::createProperty("Input Delimiter")
+->withDescription("Specifies the character that should be used for 
delimiting the data being tailed"
+ "from the incoming file. If none is specified, data will be ingested 
as it becomes available.")
+->isRequired(false)
+->withDefaultValue("\\n")
+->build());
 
 core::Property TailFile::TailMode(
-core::PropertyBuilder::createProperty("tail-mode", "Tailing 
Mode")->withDescription(
-"Specifies the tail file mode. In 'Single file' mode only a single 
file will be watched. "
+core::PropertyBuilder::createProperty("tail-mode", "Tailing Mode")
+->withDescription("Specifies the tail file mode. In 'Single file' mode 
only a single file will be watched. "
 "In 'Multiple file' mode a regex may be used. Note that in multiple 
file mode we will still continue to watch for rollover on the initial set of 
watched files. "
 "The Regex used to locate multiple files will be run during the 
schedule phrase. Note that if rotated files are matched by the regex, those 
files will be tailed.")->isRequired(true)
-->withAllowableValue("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single 
file")->build());
-
-core::Property 
TailFile::BaseDirectory(core::PropertyBuilder::createProperty("tail-base-directory",
 "Base Directory")->isRequired(false)->build());
+->withAllowableValue("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single file")
+->build());
+
+core::Property TailFile::BaseDirectory(
+core::PropertyBuilder::createProperty("tail-base-directory", "Base 
Directory")
+->withDescription("Base directory used to look for files to tail. This 
property is required when using Multiple file mode.")
+->isRequired(false)
+->build());
+
+core::Property TailFile::RecursiveLookup(
+core::PropertyBuilder::createProperty("Recursive lookup")
+->withDescription("When using Multiple file mode, this property 
determines whether files are tailed in "
+"child directories of the Base Directory or not.")
+->isRequired(false)
+->withDefaultValue(false)
+->build());
+
+core::Property TailFile::LookupFrequency(
+core::PropertyBuilder::createProperty("Lookup frequency")
+->withDescription("When using Multiple file mode, this property 
specifies the minimum duration "
+"the processor will wait between looking for new files to tail in the 
Base Directory.")
+->isRequired(false)
+

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-12 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439390975



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -40,60 +35,283 @@
 #include 
 #include 
 
+#include "io/CRCStream.h"
 #include "utils/file/FileUtils.h"
 #include "utils/file/PathUtils.h"
 #include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "utils/RegexUtils.h"
-#ifdef HAVE_REGEX_CPP
-#include 
-#else
-#include 
-#endif
 #include "TailFile.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 
-#ifndef S_ISDIR
-#define S_ISDIR(mode)  (((mode) & S_IFMT) == S_IFDIR)
-#endif
-
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Wsign-compare"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wsign-compare"
-#endif
-
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of 
the file that should be tailed when using single file mode, or a file regex 
when using multifile mode", "");
-core::Property TailFile::StateFile("State File", "DEPRECATED. Only use it for 
state migration from the legacy state file.",
-   "TailFileState");
-core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character 
that should be used for delimiting the data being tailed"
-   "from the incoming file."
-   "If none is specified, data will be 
ingested as it becomes available.",
-   "");
+core::Property TailFile::FileName(
+core::PropertyBuilder::createProperty("File to Tail")
+->withDescription("Fully-qualified filename of the file that should be 
tailed when using single file mode, or a file regex when using multifile mode")
+->isRequired(true)
+->build());
+
+core::Property TailFile::StateFile(
+core::PropertyBuilder::createProperty("State File")
+->withDescription("DEPRECATED. Only use it for state migration from 
the legacy state file.")
+->isRequired(false)
+->withDefaultValue("TailFileState")
+->build());
+
+core::Property TailFile::Delimiter(
+core::PropertyBuilder::createProperty("Input Delimiter")
+->withDescription("Specifies the character that should be used for 
delimiting the data being tailed"
+ "from the incoming file. If none is specified, data will be ingested 
as it becomes available.")
+->isRequired(false)
+->withDefaultValue("\\n")
+->build());
 
 core::Property TailFile::TailMode(
-core::PropertyBuilder::createProperty("tail-mode", "Tailing 
Mode")->withDescription(
-"Specifies the tail file mode. In 'Single file' mode only a single 
file will be watched. "
+core::PropertyBuilder::createProperty("tail-mode", "Tailing Mode")
+->withDescription("Specifies the tail file mode. In 'Single file' mode 
only a single file will be watched. "
 "In 'Multiple file' mode a regex may be used. Note that in multiple 
file mode we will still continue to watch for rollover on the initial set of 
watched files. "
 "The Regex used to locate multiple files will be run during the 
schedule phrase. Note that if rotated files are matched by the regex, those 
files will be tailed.")->isRequired(true)
-->withAllowableValue("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single 
file")->build());
-
-core::Property 
TailFile::BaseDirectory(core::PropertyBuilder::createProperty("tail-base-directory",
 "Base Directory")->isRequired(false)->build());
+->withAllowableValue("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single file")
+->build());
+
+core::Property TailFile::BaseDirectory(
+core::PropertyBuilder::createProperty("tail-base-directory", "Base 
Directory")
+->withDescription("Base directory used to look for files to tail. This 
property is required when using Multiple file mode.")
+->isRequired(false)
+->build());
+
+core::Property TailFile::RecursiveLookup(
+core::PropertyBuilder::createProperty("Recursive lookup")
+->withDescription("When using Multiple file mode, this property 
determines whether files are tailed in "
+"child directories of the Base Directory or not.")
+->isRequired(false)
+->withDefaultValue(false)
+->build());
+
+core::Property TailFile::LookupFrequency(
+core::PropertyBuilder::createProperty("Lookup frequency")
+->withDescription("When using Multiple file mode, this property 
specifies the minimum duration "
+"the processor will wait between looking for new files to tail in the 
Base Directory.")
+->isRequired(false)
+

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-12 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439391205



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -40,60 +35,283 @@
 #include 
 #include 
 
+#include "io/CRCStream.h"
 #include "utils/file/FileUtils.h"
 #include "utils/file/PathUtils.h"
 #include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "utils/RegexUtils.h"
-#ifdef HAVE_REGEX_CPP
-#include 
-#else
-#include 
-#endif
 #include "TailFile.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 
-#ifndef S_ISDIR
-#define S_ISDIR(mode)  (((mode) & S_IFMT) == S_IFDIR)
-#endif
-
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Wsign-compare"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wsign-compare"
-#endif
-
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of 
the file that should be tailed when using single file mode, or a file regex 
when using multifile mode", "");
-core::Property TailFile::StateFile("State File", "DEPRECATED. Only use it for 
state migration from the legacy state file.",
-   "TailFileState");
-core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character 
that should be used for delimiting the data being tailed"
-   "from the incoming file."
-   "If none is specified, data will be 
ingested as it becomes available.",
-   "");
+core::Property TailFile::FileName(
+core::PropertyBuilder::createProperty("File to Tail")
+->withDescription("Fully-qualified filename of the file that should be 
tailed when using single file mode, or a file regex when using multifile mode")
+->isRequired(true)
+->build());
+
+core::Property TailFile::StateFile(
+core::PropertyBuilder::createProperty("State File")
+->withDescription("DEPRECATED. Only use it for state migration from 
the legacy state file.")
+->isRequired(false)
+->withDefaultValue("TailFileState")
+->build());
+
+core::Property TailFile::Delimiter(
+core::PropertyBuilder::createProperty("Input Delimiter")
+->withDescription("Specifies the character that should be used for 
delimiting the data being tailed"
+ "from the incoming file. If none is specified, data will be ingested 
as it becomes available.")
+->isRequired(false)
+->withDefaultValue("\\n")
+->build());
 
 core::Property TailFile::TailMode(
-core::PropertyBuilder::createProperty("tail-mode", "Tailing 
Mode")->withDescription(
-"Specifies the tail file mode. In 'Single file' mode only a single 
file will be watched. "
+core::PropertyBuilder::createProperty("tail-mode", "Tailing Mode")
+->withDescription("Specifies the tail file mode. In 'Single file' mode 
only a single file will be watched. "
 "In 'Multiple file' mode a regex may be used. Note that in multiple 
file mode we will still continue to watch for rollover on the initial set of 
watched files. "
 "The Regex used to locate multiple files will be run during the 
schedule phrase. Note that if rotated files are matched by the regex, those 
files will be tailed.")->isRequired(true)
-->withAllowableValue("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single 
file")->build());
-
-core::Property 
TailFile::BaseDirectory(core::PropertyBuilder::createProperty("tail-base-directory",
 "Base Directory")->isRequired(false)->build());
+->withAllowableValue("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single file")
+->build());
+
+core::Property TailFile::BaseDirectory(
+core::PropertyBuilder::createProperty("tail-base-directory", "Base 
Directory")
+->withDescription("Base directory used to look for files to tail. This 
property is required when using Multiple file mode.")
+->isRequired(false)
+->build());
+
+core::Property TailFile::RecursiveLookup(
+core::PropertyBuilder::createProperty("Recursive lookup")
+->withDescription("When using Multiple file mode, this property 
determines whether files are tailed in "
+"child directories of the Base Directory or not.")
+->isRequired(false)
+->withDefaultValue(false)
+->build());
+
+core::Property TailFile::LookupFrequency(
+core::PropertyBuilder::createProperty("Lookup frequency")
+->withDescription("When using Multiple file mode, this property 
specifies the minimum duration "
+"the processor will wait between looking for new files to tail in the 
Base Directory.")
+->isRequired(false)
+

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-12 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439390114



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -40,60 +35,283 @@
 #include 
 #include 
 
+#include "io/CRCStream.h"
 #include "utils/file/FileUtils.h"
 #include "utils/file/PathUtils.h"
 #include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "utils/RegexUtils.h"
-#ifdef HAVE_REGEX_CPP
-#include 
-#else
-#include 
-#endif
 #include "TailFile.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 
-#ifndef S_ISDIR
-#define S_ISDIR(mode)  (((mode) & S_IFMT) == S_IFDIR)
-#endif
-
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Wsign-compare"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wsign-compare"
-#endif
-
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of 
the file that should be tailed when using single file mode, or a file regex 
when using multifile mode", "");
-core::Property TailFile::StateFile("State File", "DEPRECATED. Only use it for 
state migration from the legacy state file.",
-   "TailFileState");
-core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character 
that should be used for delimiting the data being tailed"
-   "from the incoming file."
-   "If none is specified, data will be 
ingested as it becomes available.",
-   "");
+core::Property TailFile::FileName(
+core::PropertyBuilder::createProperty("File to Tail")
+->withDescription("Fully-qualified filename of the file that should be 
tailed when using single file mode, or a file regex when using multifile mode")
+->isRequired(true)
+->build());
+
+core::Property TailFile::StateFile(
+core::PropertyBuilder::createProperty("State File")
+->withDescription("DEPRECATED. Only use it for state migration from 
the legacy state file.")
+->isRequired(false)
+->withDefaultValue("TailFileState")
+->build());
+
+core::Property TailFile::Delimiter(
+core::PropertyBuilder::createProperty("Input Delimiter")
+->withDescription("Specifies the character that should be used for 
delimiting the data being tailed"
+ "from the incoming file. If none is specified, data will be ingested 
as it becomes available.")
+->isRequired(false)
+->withDefaultValue("\\n")
+->build());
 
 core::Property TailFile::TailMode(
-core::PropertyBuilder::createProperty("tail-mode", "Tailing 
Mode")->withDescription(
-"Specifies the tail file mode. In 'Single file' mode only a single 
file will be watched. "
+core::PropertyBuilder::createProperty("tail-mode", "Tailing Mode")
+->withDescription("Specifies the tail file mode. In 'Single file' mode 
only a single file will be watched. "
 "In 'Multiple file' mode a regex may be used. Note that in multiple 
file mode we will still continue to watch for rollover on the initial set of 
watched files. "
 "The Regex used to locate multiple files will be run during the 
schedule phrase. Note that if rotated files are matched by the regex, those 
files will be tailed.")->isRequired(true)
-->withAllowableValue("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single 
file")->build());
-
-core::Property 
TailFile::BaseDirectory(core::PropertyBuilder::createProperty("tail-base-directory",
 "Base Directory")->isRequired(false)->build());
+->withAllowableValue("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single file")
+->build());
+
+core::Property TailFile::BaseDirectory(
+core::PropertyBuilder::createProperty("tail-base-directory", "Base 
Directory")
+->withDescription("Base directory used to look for files to tail. This 
property is required when using Multiple file mode.")
+->isRequired(false)
+->build());
+
+core::Property TailFile::RecursiveLookup(
+core::PropertyBuilder::createProperty("Recursive lookup")
+->withDescription("When using Multiple file mode, this property 
determines whether files are tailed in "
+"child directories of the Base Directory or not.")
+->isRequired(false)
+->withDefaultValue(false)
+->build());
+
+core::Property TailFile::LookupFrequency(
+core::PropertyBuilder::createProperty("Lookup frequency")
+->withDescription("When using Multiple file mode, this property 
specifies the minimum duration "
+"the processor will wait between looking for new files to tail in the 
Base Directory.")
+->isRequired(false)
+

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-12 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439388761



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -335,170 +563,210 @@ bool TailFile::storeState(const 
std::shared_ptr& context)
   return true;
 }
 
-static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem 
j) {
-  return (i.modifiedTime < j.modifiedTime);
-}
-void TailFile::checkRollOver(const std::shared_ptr& 
context, TailState , const std::string _file_name) {
-  struct stat statbuf;
-  std::vector matchedFiles;
-  std::string fullPath = file.path_ + utils::file::FileUtils::get_separator() 
+ file.current_file_name_;
-
-  if (stat(fullPath.c_str(), ) == 0) {
-logger_->log_trace("Searching for files rolled over");
-std::string pattern = file.current_file_name_;
-std::size_t found = file.current_file_name_.find_last_of(".");
-if (found != std::string::npos)
-  pattern = file.current_file_name_.substr(0, found);
-
-// Callback, called for each file entry in the listed directory
-// Return value is used to break (false) or continue (true) listing
-auto lambda = [&](const std::string& path, const std::string& filename) -> 
bool {
-  struct stat sb;
-  std::string fileFullName = path + 
utils::file::FileUtils::get_separator() + filename;
-  if ((fileFullName.find(pattern) != std::string::npos) && 
stat(fileFullName.c_str(), ) == 0) {
-uint64_t candidateModTime = ((uint64_t) (sb.st_mtime) * 1000);
-if (candidateModTime >= file.currentTailFileModificationTime_) {
-  logging::LOG_TRACE(logger_) << "File " << filename << " (short name 
" << file.current_file_name_ <<
-  ") disk mod time " << candidateModTime << ", struct mod time " << 
file.currentTailFileModificationTime_ << ", size on disk " << sb.st_size << ", 
position " << file.currentTailFilePosition_;
-  if (filename == file.current_file_name_ && candidateModTime == 
file.currentTailFileModificationTime_ &&
-  sb.st_size == file.currentTailFilePosition_) {
-return true;  // Skip the current file as a candidate in case it 
wasn't updated
+std::vector TailFile::findRotatedFiles(const TailState ) 
const {
+  logger_->log_debug("Searching for files rolled over; last read time is %llu",
+  
std::chrono::time_point_cast(state.last_read_time_).time_since_epoch().count());
+
+  std::size_t last_dot_position = state.file_name_.find_last_of('.');
+  std::string base_name = state.file_name_.substr(0, last_dot_position);
+  std::string pattern = 
utils::StringUtils::replaceOne(rolling_filename_pattern_, "${filename}", 
base_name);
+
+  std::vector matched_files_with_mtime;
+  auto collect_matching_files = [&](const std::string , const std::string 
_name) -> bool {
+if (file_name != state.file_name_ && 
utils::Regex::matchesFullInput(pattern, file_name)) {
+  std::string full_file_name = path + 
utils::file::FileUtils::get_separator() + file_name;
+  TailStateWithMtime::TimePoint 
mtime{utils::file::FileUtils::last_write_time_point(full_file_name)};
+  logger_->log_debug("File %s with mtime %llu matches rolling filename 
pattern %s", file_name, mtime.time_since_epoch().count(), pattern);
+  if (mtime >= 
std::chrono::time_point_cast(state.last_read_time_)) {
+logger_->log_debug("File %s has mtime >= last read time, so we are 
going to read it", file_name);
+matched_files_with_mtime.emplace_back(TailState{path, file_name}, 
mtime);
+  }
+}
+return true;
+  };
+
+  utils::file::FileUtils::list_dir(state.path_, collect_matching_files, 
logger_, false);
+
+  std::sort(matched_files_with_mtime.begin(), matched_files_with_mtime.end(), 
[](const TailStateWithMtime , const TailStateWithMtime ) {
+return std::tie(left.mtime_, left.tail_state_.file_name_) <
+   std::tie(right.mtime_, right.tail_state_.file_name_);
+  });
+
+  if (!matched_files_with_mtime.empty() && state.position_ > 0) {
+TailState _rotated_file = matched_files_with_mtime[0].tail_state_;
+std::string full_file_name = first_rotated_file.fileNameWithPath();
+if (utils::file::FileUtils::file_size(full_file_name) >= state.position_) {
+  uint64_t checksum = 
utils::file::FileUtils::computeChecksum(full_file_name, state.position_);
+  if (checksum == state.checksum_) {
+first_rotated_file.position_ = state.position_;
+first_rotated_file.checksum_ = state.checksum_;
   }
-  TailMatchedFileItem item;
-  item.fileName = filename;
-  item.modifiedTime = ((uint64_t) (sb.st_mtime) * 1000);
-  matchedFiles.push_back(item);
 }
   }
-  return true;};
-
-utils::file::FileUtils::list_dir(file.path_, lambda, logger_, false);
 
-if (matchedFiles.size() < 1) {
-  logger_->log_debug("No newer files found in directory!");
-  return;
-}
+  std::vector matched_files;
+  

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-12 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439385766



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -335,170 +563,210 @@ bool TailFile::storeState(const 
std::shared_ptr& context)
   return true;
 }
 
-static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem 
j) {
-  return (i.modifiedTime < j.modifiedTime);
-}
-void TailFile::checkRollOver(const std::shared_ptr& 
context, TailState , const std::string _file_name) {
-  struct stat statbuf;
-  std::vector matchedFiles;
-  std::string fullPath = file.path_ + utils::file::FileUtils::get_separator() 
+ file.current_file_name_;
-
-  if (stat(fullPath.c_str(), ) == 0) {
-logger_->log_trace("Searching for files rolled over");
-std::string pattern = file.current_file_name_;
-std::size_t found = file.current_file_name_.find_last_of(".");
-if (found != std::string::npos)
-  pattern = file.current_file_name_.substr(0, found);
-
-// Callback, called for each file entry in the listed directory
-// Return value is used to break (false) or continue (true) listing
-auto lambda = [&](const std::string& path, const std::string& filename) -> 
bool {
-  struct stat sb;
-  std::string fileFullName = path + 
utils::file::FileUtils::get_separator() + filename;
-  if ((fileFullName.find(pattern) != std::string::npos) && 
stat(fileFullName.c_str(), ) == 0) {
-uint64_t candidateModTime = ((uint64_t) (sb.st_mtime) * 1000);
-if (candidateModTime >= file.currentTailFileModificationTime_) {
-  logging::LOG_TRACE(logger_) << "File " << filename << " (short name 
" << file.current_file_name_ <<
-  ") disk mod time " << candidateModTime << ", struct mod time " << 
file.currentTailFileModificationTime_ << ", size on disk " << sb.st_size << ", 
position " << file.currentTailFilePosition_;
-  if (filename == file.current_file_name_ && candidateModTime == 
file.currentTailFileModificationTime_ &&
-  sb.st_size == file.currentTailFilePosition_) {
-return true;  // Skip the current file as a candidate in case it 
wasn't updated
+std::vector TailFile::findRotatedFiles(const TailState ) 
const {
+  logger_->log_debug("Searching for files rolled over; last read time is %llu",
+  
std::chrono::time_point_cast(state.last_read_time_).time_since_epoch().count());
+
+  std::size_t last_dot_position = state.file_name_.find_last_of('.');
+  std::string base_name = state.file_name_.substr(0, last_dot_position);
+  std::string pattern = 
utils::StringUtils::replaceOne(rolling_filename_pattern_, "${filename}", 
base_name);
+
+  std::vector matched_files_with_mtime;
+  auto collect_matching_files = [&](const std::string , const std::string 
_name) -> bool {
+if (file_name != state.file_name_ && 
utils::Regex::matchesFullInput(pattern, file_name)) {
+  std::string full_file_name = path + 
utils::file::FileUtils::get_separator() + file_name;
+  TailStateWithMtime::TimePoint 
mtime{utils::file::FileUtils::last_write_time_point(full_file_name)};
+  logger_->log_debug("File %s with mtime %llu matches rolling filename 
pattern %s", file_name, mtime.time_since_epoch().count(), pattern);
+  if (mtime >= 
std::chrono::time_point_cast(state.last_read_time_)) {
+logger_->log_debug("File %s has mtime >= last read time, so we are 
going to read it", file_name);
+matched_files_with_mtime.emplace_back(TailState{path, file_name}, 
mtime);
+  }
+}
+return true;
+  };
+
+  utils::file::FileUtils::list_dir(state.path_, collect_matching_files, 
logger_, false);
+
+  std::sort(matched_files_with_mtime.begin(), matched_files_with_mtime.end(), 
[](const TailStateWithMtime , const TailStateWithMtime ) {
+return std::tie(left.mtime_, left.tail_state_.file_name_) <
+   std::tie(right.mtime_, right.tail_state_.file_name_);
+  });
+
+  if (!matched_files_with_mtime.empty() && state.position_ > 0) {
+TailState _rotated_file = matched_files_with_mtime[0].tail_state_;
+std::string full_file_name = first_rotated_file.fileNameWithPath();
+if (utils::file::FileUtils::file_size(full_file_name) >= state.position_) {
+  uint64_t checksum = 
utils::file::FileUtils::computeChecksum(full_file_name, state.position_);
+  if (checksum == state.checksum_) {
+first_rotated_file.position_ = state.position_;
+first_rotated_file.checksum_ = state.checksum_;
   }
-  TailMatchedFileItem item;
-  item.fileName = filename;
-  item.modifiedTime = ((uint64_t) (sb.st_mtime) * 1000);
-  matchedFiles.push_back(item);
 }
   }
-  return true;};
-
-utils::file::FileUtils::list_dir(file.path_, lambda, logger_, false);
 
-if (matchedFiles.size() < 1) {
-  logger_->log_debug("No newer files found in directory!");
-  return;
-}
+  std::vector matched_files;
+  

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-12 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439386010



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -335,170 +563,210 @@ bool TailFile::storeState(const 
std::shared_ptr& context)
   return true;
 }
 
-static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem 
j) {
-  return (i.modifiedTime < j.modifiedTime);
-}
-void TailFile::checkRollOver(const std::shared_ptr& 
context, TailState , const std::string _file_name) {
-  struct stat statbuf;
-  std::vector matchedFiles;
-  std::string fullPath = file.path_ + utils::file::FileUtils::get_separator() 
+ file.current_file_name_;
-
-  if (stat(fullPath.c_str(), ) == 0) {
-logger_->log_trace("Searching for files rolled over");
-std::string pattern = file.current_file_name_;
-std::size_t found = file.current_file_name_.find_last_of(".");
-if (found != std::string::npos)
-  pattern = file.current_file_name_.substr(0, found);
-
-// Callback, called for each file entry in the listed directory
-// Return value is used to break (false) or continue (true) listing
-auto lambda = [&](const std::string& path, const std::string& filename) -> 
bool {
-  struct stat sb;
-  std::string fileFullName = path + 
utils::file::FileUtils::get_separator() + filename;
-  if ((fileFullName.find(pattern) != std::string::npos) && 
stat(fileFullName.c_str(), ) == 0) {
-uint64_t candidateModTime = ((uint64_t) (sb.st_mtime) * 1000);
-if (candidateModTime >= file.currentTailFileModificationTime_) {
-  logging::LOG_TRACE(logger_) << "File " << filename << " (short name 
" << file.current_file_name_ <<
-  ") disk mod time " << candidateModTime << ", struct mod time " << 
file.currentTailFileModificationTime_ << ", size on disk " << sb.st_size << ", 
position " << file.currentTailFilePosition_;
-  if (filename == file.current_file_name_ && candidateModTime == 
file.currentTailFileModificationTime_ &&
-  sb.st_size == file.currentTailFilePosition_) {
-return true;  // Skip the current file as a candidate in case it 
wasn't updated
+std::vector TailFile::findRotatedFiles(const TailState ) 
const {
+  logger_->log_debug("Searching for files rolled over; last read time is %llu",
+  
std::chrono::time_point_cast(state.last_read_time_).time_since_epoch().count());
+
+  std::size_t last_dot_position = state.file_name_.find_last_of('.');
+  std::string base_name = state.file_name_.substr(0, last_dot_position);
+  std::string pattern = 
utils::StringUtils::replaceOne(rolling_filename_pattern_, "${filename}", 
base_name);
+
+  std::vector matched_files_with_mtime;
+  auto collect_matching_files = [&](const std::string , const std::string 
_name) -> bool {
+if (file_name != state.file_name_ && 
utils::Regex::matchesFullInput(pattern, file_name)) {
+  std::string full_file_name = path + 
utils::file::FileUtils::get_separator() + file_name;
+  TailStateWithMtime::TimePoint 
mtime{utils::file::FileUtils::last_write_time_point(full_file_name)};
+  logger_->log_debug("File %s with mtime %llu matches rolling filename 
pattern %s", file_name, mtime.time_since_epoch().count(), pattern);
+  if (mtime >= 
std::chrono::time_point_cast(state.last_read_time_)) {
+logger_->log_debug("File %s has mtime >= last read time, so we are 
going to read it", file_name);
+matched_files_with_mtime.emplace_back(TailState{path, file_name}, 
mtime);
+  }
+}
+return true;
+  };
+
+  utils::file::FileUtils::list_dir(state.path_, collect_matching_files, 
logger_, false);
+
+  std::sort(matched_files_with_mtime.begin(), matched_files_with_mtime.end(), 
[](const TailStateWithMtime , const TailStateWithMtime ) {
+return std::tie(left.mtime_, left.tail_state_.file_name_) <
+   std::tie(right.mtime_, right.tail_state_.file_name_);
+  });
+
+  if (!matched_files_with_mtime.empty() && state.position_ > 0) {
+TailState _rotated_file = matched_files_with_mtime[0].tail_state_;
+std::string full_file_name = first_rotated_file.fileNameWithPath();
+if (utils::file::FileUtils::file_size(full_file_name) >= state.position_) {
+  uint64_t checksum = 
utils::file::FileUtils::computeChecksum(full_file_name, state.position_);
+  if (checksum == state.checksum_) {
+first_rotated_file.position_ = state.position_;
+first_rotated_file.checksum_ = state.checksum_;
   }
-  TailMatchedFileItem item;
-  item.fileName = filename;
-  item.modifiedTime = ((uint64_t) (sb.st_mtime) * 1000);
-  matchedFiles.push_back(item);
 }
   }
-  return true;};
-
-utils::file::FileUtils::list_dir(file.path_, lambda, logger_, false);
 
-if (matchedFiles.size() < 1) {
-  logger_->log_debug("No newer files found in directory!");
-  return;
-}
+  std::vector matched_files;
+  

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-12 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439385828



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -335,170 +563,210 @@ bool TailFile::storeState(const 
std::shared_ptr& context)
   return true;
 }
 
-static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem 
j) {
-  return (i.modifiedTime < j.modifiedTime);
-}
-void TailFile::checkRollOver(const std::shared_ptr& 
context, TailState , const std::string _file_name) {
-  struct stat statbuf;
-  std::vector matchedFiles;
-  std::string fullPath = file.path_ + utils::file::FileUtils::get_separator() 
+ file.current_file_name_;
-
-  if (stat(fullPath.c_str(), ) == 0) {
-logger_->log_trace("Searching for files rolled over");
-std::string pattern = file.current_file_name_;
-std::size_t found = file.current_file_name_.find_last_of(".");
-if (found != std::string::npos)
-  pattern = file.current_file_name_.substr(0, found);
-
-// Callback, called for each file entry in the listed directory
-// Return value is used to break (false) or continue (true) listing
-auto lambda = [&](const std::string& path, const std::string& filename) -> 
bool {
-  struct stat sb;
-  std::string fileFullName = path + 
utils::file::FileUtils::get_separator() + filename;
-  if ((fileFullName.find(pattern) != std::string::npos) && 
stat(fileFullName.c_str(), ) == 0) {
-uint64_t candidateModTime = ((uint64_t) (sb.st_mtime) * 1000);
-if (candidateModTime >= file.currentTailFileModificationTime_) {
-  logging::LOG_TRACE(logger_) << "File " << filename << " (short name 
" << file.current_file_name_ <<
-  ") disk mod time " << candidateModTime << ", struct mod time " << 
file.currentTailFileModificationTime_ << ", size on disk " << sb.st_size << ", 
position " << file.currentTailFilePosition_;
-  if (filename == file.current_file_name_ && candidateModTime == 
file.currentTailFileModificationTime_ &&
-  sb.st_size == file.currentTailFilePosition_) {
-return true;  // Skip the current file as a candidate in case it 
wasn't updated
+std::vector TailFile::findRotatedFiles(const TailState ) 
const {
+  logger_->log_debug("Searching for files rolled over; last read time is %llu",
+  
std::chrono::time_point_cast(state.last_read_time_).time_since_epoch().count());
+
+  std::size_t last_dot_position = state.file_name_.find_last_of('.');
+  std::string base_name = state.file_name_.substr(0, last_dot_position);
+  std::string pattern = 
utils::StringUtils::replaceOne(rolling_filename_pattern_, "${filename}", 
base_name);
+
+  std::vector matched_files_with_mtime;
+  auto collect_matching_files = [&](const std::string , const std::string 
_name) -> bool {
+if (file_name != state.file_name_ && 
utils::Regex::matchesFullInput(pattern, file_name)) {
+  std::string full_file_name = path + 
utils::file::FileUtils::get_separator() + file_name;
+  TailStateWithMtime::TimePoint 
mtime{utils::file::FileUtils::last_write_time_point(full_file_name)};
+  logger_->log_debug("File %s with mtime %llu matches rolling filename 
pattern %s", file_name, mtime.time_since_epoch().count(), pattern);
+  if (mtime >= 
std::chrono::time_point_cast(state.last_read_time_)) {
+logger_->log_debug("File %s has mtime >= last read time, so we are 
going to read it", file_name);
+matched_files_with_mtime.emplace_back(TailState{path, file_name}, 
mtime);
+  }
+}
+return true;
+  };
+
+  utils::file::FileUtils::list_dir(state.path_, collect_matching_files, 
logger_, false);
+
+  std::sort(matched_files_with_mtime.begin(), matched_files_with_mtime.end(), 
[](const TailStateWithMtime , const TailStateWithMtime ) {
+return std::tie(left.mtime_, left.tail_state_.file_name_) <
+   std::tie(right.mtime_, right.tail_state_.file_name_);
+  });
+
+  if (!matched_files_with_mtime.empty() && state.position_ > 0) {
+TailState _rotated_file = matched_files_with_mtime[0].tail_state_;
+std::string full_file_name = first_rotated_file.fileNameWithPath();
+if (utils::file::FileUtils::file_size(full_file_name) >= state.position_) {
+  uint64_t checksum = 
utils::file::FileUtils::computeChecksum(full_file_name, state.position_);
+  if (checksum == state.checksum_) {
+first_rotated_file.position_ = state.position_;
+first_rotated_file.checksum_ = state.checksum_;
   }
-  TailMatchedFileItem item;
-  item.fileName = filename;
-  item.modifiedTime = ((uint64_t) (sb.st_mtime) * 1000);
-  matchedFiles.push_back(item);
 }
   }
-  return true;};
-
-utils::file::FileUtils::list_dir(file.path_, lambda, logger_, false);
 
-if (matchedFiles.size() < 1) {
-  logger_->log_debug("No newer files found in directory!");
-  return;
-}
+  std::vector matched_files;
+  

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-12 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439385627



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -204,52 +418,78 @@ void TailFile::parseStateFileLine(char *buf) {
   }
 
   std::string value = equal;
-  key = trimRight(key);
-  value = trimRight(value);
+  key = utils::StringUtils::trimRight(key);
+  value = utils::StringUtils::trimRight(value);
 
   if (key == "FILENAME") {
 std::string fileLocation, fileName;
 if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, fileName)) {
   logger_->log_debug("State migration received path %s, file %s", fileLocation, fileName);
-  tail_states_.insert(std::make_pair(fileName, TailState { fileLocation, fileName, 0, 0 }));
+  state.emplace(fileName, TailState{fileLocation, fileName});
 } else {
-  tail_states_.insert(std::make_pair(value, TailState { fileLocation, value, 0, 0 }));
+  state.emplace(value, TailState{fileLocation, value});
 }
   }
   if (key == "POSITION") {
 // for backwards compatibility
-if (tail_states_.size() != 1) {
+if (tail_states_.size() != std::size_t{1}) {
   throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Incompatible state file types");
 }
 const auto position = std::stoull(value);
 logger_->log_debug("Received position %d", position);

Review comment:
   done

##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -260,72 +500,60 @@ bool TailFile::recoverState(const 
std::shared_ptr& context
   try {
 const std::string& current = state_map.at("file." + std::to_string(i) + ".current");
 uint64_t position = std::stoull(state_map.at("file." + std::to_string(i) + ".position"));
+uint64_t checksum = readOptionalUint64(state_map, "file." + std::to_string(i) + ".checksum");
 
 std::string fileLocation, fileName;
 if (utils::file::PathUtils::getFileNameAndPath(current, fileLocation, fileName)) {
   logger_->log_debug("Received path %s, file %s", fileLocation, fileName);
-  new_tail_states.emplace(fileName, TailState { fileLocation, fileName, position, 0 });
+  new_tail_states.emplace(current, TailState{fileLocation, fileName, position, std::chrono::system_clock::time_point{}, checksum});
 } else {
-  new_tail_states.emplace(current, TailState { fileLocation, current, position, 0 });
+  new_tail_states.emplace(current, TailState{fileLocation, current, position, std::chrono::system_clock::time_point{}, checksum});
 }
   } catch (...) {
 continue;
   }
 }
-state_load_success = true;
-tail_states_ = std::move(new_tail_states);
 for (const auto& s : tail_states_) {
-  logger_->log_debug("TailState %s: %s, %s, %llu, %llu", s.first, s.second.path_, s.second.current_file_name_, s.second.currentTailFilePosition_, s.second.currentTailFileModificationTime_);
+  logger_->log_debug("TailState %s: %s, %s, %llu, %llu",

Review comment:
   done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-12 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439385943



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -335,170 +563,210 @@ bool TailFile::storeState(const 
std::shared_ptr& context)
   return true;
 }
 
-static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem 
j) {
-  return (i.modifiedTime < j.modifiedTime);
-}
-void TailFile::checkRollOver(const std::shared_ptr& 
context, TailState , const std::string _file_name) {
-  struct stat statbuf;
-  std::vector matchedFiles;
-  std::string fullPath = file.path_ + utils::file::FileUtils::get_separator() 
+ file.current_file_name_;
-
-  if (stat(fullPath.c_str(), ) == 0) {
-logger_->log_trace("Searching for files rolled over");
-std::string pattern = file.current_file_name_;
-std::size_t found = file.current_file_name_.find_last_of(".");
-if (found != std::string::npos)
-  pattern = file.current_file_name_.substr(0, found);
-
-// Callback, called for each file entry in the listed directory
-// Return value is used to break (false) or continue (true) listing
-auto lambda = [&](const std::string& path, const std::string& filename) -> 
bool {
-  struct stat sb;
-  std::string fileFullName = path + 
utils::file::FileUtils::get_separator() + filename;
-  if ((fileFullName.find(pattern) != std::string::npos) && 
stat(fileFullName.c_str(), ) == 0) {
-uint64_t candidateModTime = ((uint64_t) (sb.st_mtime) * 1000);
-if (candidateModTime >= file.currentTailFileModificationTime_) {
-  logging::LOG_TRACE(logger_) << "File " << filename << " (short name 
" << file.current_file_name_ <<
-  ") disk mod time " << candidateModTime << ", struct mod time " << 
file.currentTailFileModificationTime_ << ", size on disk " << sb.st_size << ", 
position " << file.currentTailFilePosition_;
-  if (filename == file.current_file_name_ && candidateModTime == 
file.currentTailFileModificationTime_ &&
-  sb.st_size == file.currentTailFilePosition_) {
-return true;  // Skip the current file as a candidate in case it 
wasn't updated
+std::vector TailFile::findRotatedFiles(const TailState ) 
const {
+  logger_->log_debug("Searching for files rolled over; last read time is %llu",
+  
std::chrono::time_point_cast(state.last_read_time_).time_since_epoch().count());
+
+  std::size_t last_dot_position = state.file_name_.find_last_of('.');
+  std::string base_name = state.file_name_.substr(0, last_dot_position);
+  std::string pattern = 
utils::StringUtils::replaceOne(rolling_filename_pattern_, "${filename}", 
base_name);
+
+  std::vector matched_files_with_mtime;
+  auto collect_matching_files = [&](const std::string , const std::string 
_name) -> bool {
+if (file_name != state.file_name_ && 
utils::Regex::matchesFullInput(pattern, file_name)) {
+  std::string full_file_name = path + 
utils::file::FileUtils::get_separator() + file_name;
+  TailStateWithMtime::TimePoint 
mtime{utils::file::FileUtils::last_write_time_point(full_file_name)};
+  logger_->log_debug("File %s with mtime %llu matches rolling filename 
pattern %s", file_name, mtime.time_since_epoch().count(), pattern);
+  if (mtime >= 
std::chrono::time_point_cast(state.last_read_time_)) {
+logger_->log_debug("File %s has mtime >= last read time, so we are 
going to read it", file_name);
+matched_files_with_mtime.emplace_back(TailState{path, file_name}, 
mtime);
+  }
+}
+return true;
+  };
+
+  utils::file::FileUtils::list_dir(state.path_, collect_matching_files, 
logger_, false);
+
+  std::sort(matched_files_with_mtime.begin(), matched_files_with_mtime.end(), 
[](const TailStateWithMtime , const TailStateWithMtime ) {
+return std::tie(left.mtime_, left.tail_state_.file_name_) <
+   std::tie(right.mtime_, right.tail_state_.file_name_);
+  });
+
+  if (!matched_files_with_mtime.empty() && state.position_ > 0) {
+TailState _rotated_file = matched_files_with_mtime[0].tail_state_;
+std::string full_file_name = first_rotated_file.fileNameWithPath();
+if (utils::file::FileUtils::file_size(full_file_name) >= state.position_) {
+  uint64_t checksum = 
utils::file::FileUtils::computeChecksum(full_file_name, state.position_);
+  if (checksum == state.checksum_) {
+first_rotated_file.position_ = state.position_;
+first_rotated_file.checksum_ = state.checksum_;
   }
-  TailMatchedFileItem item;
-  item.fileName = filename;
-  item.modifiedTime = ((uint64_t) (sb.st_mtime) * 1000);
-  matchedFiles.push_back(item);
 }
   }
-  return true;};
-
-utils::file::FileUtils::list_dir(file.path_, lambda, logger_, false);
 
-if (matchedFiles.size() < 1) {
-  logger_->log_debug("No newer files found in directory!");
-  return;
-}
+  std::vector matched_files;
+  

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-12 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439385438



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -40,60 +35,283 @@
 #include 
 #include 
 
+#include "io/CRCStream.h"
 #include "utils/file/FileUtils.h"
 #include "utils/file/PathUtils.h"
 #include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "utils/RegexUtils.h"
-#ifdef HAVE_REGEX_CPP
-#include 
-#else
-#include 
-#endif
 #include "TailFile.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 
-#ifndef S_ISDIR
-#define S_ISDIR(mode)  (((mode) & S_IFMT) == S_IFDIR)
-#endif
-
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Wsign-compare"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wsign-compare"
-#endif
-
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of 
the file that should be tailed when using single file mode, or a file regex 
when using multifile mode", "");
-core::Property TailFile::StateFile("State File", "DEPRECATED. Only use it for 
state migration from the legacy state file.",
-   "TailFileState");
-core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character 
that should be used for delimiting the data being tailed"
-   "from the incoming file."
-   "If none is specified, data will be 
ingested as it becomes available.",
-   "");
+core::Property TailFile::FileName(
+core::PropertyBuilder::createProperty("File to Tail")
+->withDescription("Fully-qualified filename of the file that should be 
tailed when using single file mode, or a file regex when using multifile mode")
+->isRequired(true)
+->build());
+
+core::Property TailFile::StateFile(
+core::PropertyBuilder::createProperty("State File")
+->withDescription("DEPRECATED. Only use it for state migration from 
the legacy state file.")
+->isRequired(false)
+->withDefaultValue("TailFileState")
+->build());
+
+core::Property TailFile::Delimiter(
+core::PropertyBuilder::createProperty("Input Delimiter")
+->withDescription("Specifies the character that should be used for 
delimiting the data being tailed"
+ "from the incoming file. If none is specified, data will be ingested 
as it becomes available.")
+->isRequired(false)
+->withDefaultValue("\\n")
+->build());
 
 core::Property TailFile::TailMode(
-core::PropertyBuilder::createProperty("tail-mode", "Tailing 
Mode")->withDescription(
-"Specifies the tail file mode. In 'Single file' mode only a single 
file will be watched. "
+core::PropertyBuilder::createProperty("tail-mode", "Tailing Mode")
+->withDescription("Specifies the tail file mode. In 'Single file' mode 
only a single file will be watched. "
 "In 'Multiple file' mode a regex may be used. Note that in multiple 
file mode we will still continue to watch for rollover on the initial set of 
watched files. "
 "The Regex used to locate multiple files will be run during the 
schedule phrase. Note that if rotated files are matched by the regex, those 
files will be tailed.")->isRequired(true)
-->withAllowableValue("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single 
file")->build());
-
-core::Property 
TailFile::BaseDirectory(core::PropertyBuilder::createProperty("tail-base-directory",
 "Base Directory")->isRequired(false)->build());
+->withAllowableValue("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single file")
+->build());
+
+core::Property TailFile::BaseDirectory(
+core::PropertyBuilder::createProperty("tail-base-directory", "Base 
Directory")
+->withDescription("Base directory used to look for files to tail. This 
property is required when using Multiple file mode.")
+->isRequired(false)
+->build());
+
+core::Property TailFile::RecursiveLookup(
+core::PropertyBuilder::createProperty("Recursive lookup")
+->withDescription("When using Multiple file mode, this property 
determines whether files are tailed in "
+"child directories of the Base Directory or not.")
+->isRequired(false)
+->withDefaultValue(false)
+->build());
+
+core::Property TailFile::LookupFrequency(
+core::PropertyBuilder::createProperty("Lookup frequency")
+->withDescription("When using Multiple file mode, this property 
specifies the minimum duration "
+"the processor will wait between looking for new files to tail in the 
Base Directory.")
+->isRequired(false)
+

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-12 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439384441



##
File path: libminifi/test/unit/StringUtilsTests.cpp
##
@@ -338,3 +338,30 @@ TEST_CASE("TestStringUtils::testJoinPackNegative", "[test 
join_pack negative]")
   == "rvalue c string, c string, rval std::string, std::string, 
char array");
 }
  */
+
+TEST_CASE("StringUtils::replaceOne works correctly", "[replaceOne]") {
+  REQUIRE(utils::StringUtils::replaceOne("", "x", "y") == "");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "a", "_") == "b_nana");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "b", "_") == "_anana");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "x", "y") == "banana");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "an", "") == "bana");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "an", "AN") == "bANana");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "an", "***") == "b***ana");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "banana", "kiwi") == 
"kiwi");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "banana", "grapefruit") == 
"grapefruit");
+}
+
+TEST_CASE("StringUtils::replaceAll works correctly", "[replaceAll]") {
+  auto replaceAll = [](std::string input, const std::string , const 
std::string ) -> std::string {
+return utils::StringUtils::replaceAll(input, from, to);
+  };

Review comment:
   The reason this lambda is there is that `replaceAll` takes its first 
argument as a non-const reference.  I could make this clearer by copying 
explicitly, e.g.
   ```c++
 auto replaceAll = [](const std::string &input, const std::string &from, 
const std::string &to) -> std::string {
   std::string input_copy{input};
   return utils::StringUtils::replaceAll(input_copy, from, to);
 };
   ```
   Would you prefer that?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-12 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439382655



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -40,60 +35,283 @@
 #include 
 #include 
 
+#include "io/CRCStream.h"
 #include "utils/file/FileUtils.h"
 #include "utils/file/PathUtils.h"
 #include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "utils/RegexUtils.h"
-#ifdef HAVE_REGEX_CPP
-#include 
-#else
-#include 
-#endif
 #include "TailFile.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 
-#ifndef S_ISDIR
-#define S_ISDIR(mode)  (((mode) & S_IFMT) == S_IFDIR)
-#endif
-
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Wsign-compare"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wsign-compare"
-#endif
-
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of 
the file that should be tailed when using single file mode, or a file regex 
when using multifile mode", "");
-core::Property TailFile::StateFile("State File", "DEPRECATED. Only use it for 
state migration from the legacy state file.",
-   "TailFileState");
-core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character 
that should be used for delimiting the data being tailed"
-   "from the incoming file."
-   "If none is specified, data will be 
ingested as it becomes available.",
-   "");
+core::Property TailFile::FileName(
+core::PropertyBuilder::createProperty("File to Tail")
+->withDescription("Fully-qualified filename of the file that should be 
tailed when using single file mode, or a file regex when using multifile mode")
+->isRequired(true)
+->build());
+
+core::Property TailFile::StateFile(
+core::PropertyBuilder::createProperty("State File")
+->withDescription("DEPRECATED. Only use it for state migration from 
the legacy state file.")
+->isRequired(false)
+->withDefaultValue("TailFileState")
+->build());
+
+core::Property TailFile::Delimiter(
+core::PropertyBuilder::createProperty("Input Delimiter")
+->withDescription("Specifies the character that should be used for 
delimiting the data being tailed"
+ "from the incoming file. If none is specified, data will be ingested 
as it becomes available.")
+->isRequired(false)
+->withDefaultValue("\\n")
+->build());
 
 core::Property TailFile::TailMode(
-core::PropertyBuilder::createProperty("tail-mode", "Tailing 
Mode")->withDescription(
-"Specifies the tail file mode. In 'Single file' mode only a single 
file will be watched. "
+core::PropertyBuilder::createProperty("tail-mode", "Tailing Mode")
+->withDescription("Specifies the tail file mode. In 'Single file' mode 
only a single file will be watched. "
 "In 'Multiple file' mode a regex may be used. Note that in multiple 
file mode we will still continue to watch for rollover on the initial set of 
watched files. "
 "The Regex used to locate multiple files will be run during the 
schedule phrase. Note that if rotated files are matched by the regex, those 
files will be tailed.")->isRequired(true)
-->withAllowableValue("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single 
file")->build());
-
-core::Property 
TailFile::BaseDirectory(core::PropertyBuilder::createProperty("tail-base-directory",
 "Base Directory")->isRequired(false)->build());
+->withAllowableValue("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single file")
+->build());
+
+core::Property TailFile::BaseDirectory(
+core::PropertyBuilder::createProperty("tail-base-directory", "Base 
Directory")
+->withDescription("Base directory used to look for files to tail. This 
property is required when using Multiple file mode.")
+->isRequired(false)
+->build());
+
+core::Property TailFile::RecursiveLookup(
+core::PropertyBuilder::createProperty("Recursive lookup")
+->withDescription("When using Multiple file mode, this property 
determines whether files are tailed in "
+"child directories of the Base Directory or not.")
+->isRequired(false)
+->withDefaultValue(false)
+->build());
+
+core::Property TailFile::LookupFrequency(
+core::PropertyBuilder::createProperty("Lookup frequency")
+->withDescription("When using Multiple file mode, this property 
specifies the minimum duration "
+"the processor will wait between looking for new files to tail in the 
Base Directory.")
+->isRequired(false)
+

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-12 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439382107



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -40,60 +35,283 @@
 #include 
 #include 
 
+#include "io/CRCStream.h"
 #include "utils/file/FileUtils.h"
 #include "utils/file/PathUtils.h"
 #include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "utils/RegexUtils.h"
-#ifdef HAVE_REGEX_CPP
-#include 
-#else
-#include 
-#endif
 #include "TailFile.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 
-#ifndef S_ISDIR
-#define S_ISDIR(mode)  (((mode) & S_IFMT) == S_IFDIR)
-#endif
-
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Wsign-compare"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wsign-compare"
-#endif
-
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of 
the file that should be tailed when using single file mode, or a file regex 
when using multifile mode", "");
-core::Property TailFile::StateFile("State File", "DEPRECATED. Only use it for 
state migration from the legacy state file.",
-   "TailFileState");
-core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character 
that should be used for delimiting the data being tailed"
-   "from the incoming file."
-   "If none is specified, data will be 
ingested as it becomes available.",
-   "");
+core::Property TailFile::FileName(
+core::PropertyBuilder::createProperty("File to Tail")
+->withDescription("Fully-qualified filename of the file that should be 
tailed when using single file mode, or a file regex when using multifile mode")
+->isRequired(true)
+->build());
+
+core::Property TailFile::StateFile(
+core::PropertyBuilder::createProperty("State File")
+->withDescription("DEPRECATED. Only use it for state migration from 
the legacy state file.")
+->isRequired(false)
+->withDefaultValue("TailFileState")
+->build());
+
+core::Property TailFile::Delimiter(
+core::PropertyBuilder::createProperty("Input Delimiter")
+->withDescription("Specifies the character that should be used for 
delimiting the data being tailed"
+ "from the incoming file. If none is specified, data will be ingested 
as it becomes available.")
+->isRequired(false)
+->withDefaultValue("\\n")
+->build());
 
 core::Property TailFile::TailMode(
-core::PropertyBuilder::createProperty("tail-mode", "Tailing 
Mode")->withDescription(
-"Specifies the tail file mode. In 'Single file' mode only a single 
file will be watched. "
+core::PropertyBuilder::createProperty("tail-mode", "Tailing Mode")
+->withDescription("Specifies the tail file mode. In 'Single file' mode 
only a single file will be watched. "
 "In 'Multiple file' mode a regex may be used. Note that in multiple 
file mode we will still continue to watch for rollover on the initial set of 
watched files. "
 "The Regex used to locate multiple files will be run during the 
schedule phrase. Note that if rotated files are matched by the regex, those 
files will be tailed.")->isRequired(true)
-->withAllowableValue("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single 
file")->build());
-
-core::Property 
TailFile::BaseDirectory(core::PropertyBuilder::createProperty("tail-base-directory",
 "Base Directory")->isRequired(false)->build());
+->withAllowableValue("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single file")
+->build());
+
+core::Property TailFile::BaseDirectory(
+core::PropertyBuilder::createProperty("tail-base-directory", "Base 
Directory")
+->withDescription("Base directory used to look for files to tail. This 
property is required when using Multiple file mode.")
+->isRequired(false)
+->build());
+
+core::Property TailFile::RecursiveLookup(
+core::PropertyBuilder::createProperty("Recursive lookup")
+->withDescription("When using Multiple file mode, this property 
determines whether files are tailed in "
+"child directories of the Base Directory or not.")
+->isRequired(false)
+->withDefaultValue(false)
+->build());
+
+core::Property TailFile::LookupFrequency(
+core::PropertyBuilder::createProperty("Lookup frequency")
+->withDescription("When using Multiple file mode, this property 
specifies the minimum duration "
+"the processor will wait between looking for new files to tail in the 
Base Directory.")
+->isRequired(false)
+

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-12 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439379651



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -17,19 +17,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include 
 #include 
-#include 
-#include 
-
-#include 
 #ifndef WIN32
 #include 

Review comment:
   I have removed it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-10 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r438233905



##
File path: libminifi/src/core/ProcessSession.cpp
##
@@ -469,6 +470,14 @@ void ProcessSession::import(std::string source, const 
std::shared_ptr> crcStream;
+  if (crc) {
+crcStream = utils::make_unique>(stream, 
*crc);
+stream = crcStream.get();
+  }

Review comment:
   I have rewritten the code to use `ProcessSession::write()` instead of 
`import()`, as you suggested.  I have also marked this version of `import()` 
deprecated (it was only used by TailFile in our code).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-10 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r438229933



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -403,102 +403,141 @@ void TailFile::onTrigger(const 
std::shared_ptr , c
 state_file_ = st_file + "." + getUUIDStr();
   }
   if (!this->state_recovered_) {
-state_recovered_ = true;
-// recover the state if we have not done so
 this->recoverState(context);
+state_recovered_ = true;
+  }
+
+  if (tail_mode_ == Mode::MULTIPLE) {
+checkForRemovedFiles();
+checkForNewFiles();
   }
 
-  /**
-   * iterate over file states. may modify them
-   */
+  bool did_something = false;
+
+  // iterate over file states. may modify them
   for (auto  : tail_states_) {
-auto fileLocation = state.second.path_;
-
-checkRollOver(context, state.second, state.first);
-std::string fullPath = fileLocation + 
utils::file::FileUtils::get_separator() + state.second.current_file_name_;
-struct stat statbuf;
-
-logger_->log_debug("Tailing file %s from %llu", fullPath, 
state.second.currentTailFilePosition_);
-if (stat(fullPath.c_str(), ) == 0) {
-  if ((uint64_t) statbuf.st_size <= state.second.currentTailFilePosition_) 
{
-logger_->log_trace("Current pos: %llu", 
state.second.currentTailFilePosition_);
-logger_->log_trace("%s", "there are no new input for the current tail 
file");
-context->yield();
-return;
+did_something |= processFile(context, session, state.first, state.second);
+  }
+
+  if (!did_something) {
+yield();
+  }
+}
+
+  bool TailFile::processFile(const std::shared_ptr 
,
+ const std::shared_ptr 
,
+ const std::string ,
+ TailState ) {
+std::string full_file_name = state.fileNameWithPath();
+
+bool did_something = false;
+
+if (utils::file::FileUtils::file_size(full_file_name) < state.position_) {
+  std::vector rotated_file_states = findRotatedFiles(state);
+  for (TailState _state : rotated_file_states) {
+did_something |= processSingleFile(context, session, 
file_state.file_name_, file_state);
   }
-  std::size_t found = state.first.find_last_of(".");
-  std::string baseName = state.first.substr(0, found);
-  std::string extension = state.first.substr(found + 1);
-
-  if (!delimiter_.empty()) {
-char delim = delimiter_.c_str()[0];
-if (delim == '\\') {
-  if (delimiter_.size() > 1) {
-switch (delimiter_.c_str()[1]) {
-  case 'r':
-delim = '\r';
-break;
-  case 't':
-delim = '\t';
-break;
-  case 'n':
-delim = '\n';
-break;
-  case '\\':
-delim = '\\';
-break;
-  default:
-// previous behavior
-break;
-}
-  }
-}
-logger_->log_debug("Looking for delimiter 0x%X", delim);
-std::vector> flowFiles;
-session->import(fullPath, flowFiles, 
state.second.currentTailFilePosition_, delim);
-logger_->log_info("%u flowfiles were received from TailFile input", 
flowFiles.size());
-
-for (auto ffr : flowFiles) {
-  logger_->log_info("TailFile %s for %u bytes", state.first, 
ffr->getSize());
-  std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + ffr->getSize()) + "." + 
extension;
-  ffr->updateKeyedAttribute(PATH, fileLocation);
-  ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-  ffr->updateKeyedAttribute(FILENAME, logName);
-  session->transfer(ffr, Success);
-  state.second.currentTailFilePosition_ += ffr->getSize() + 1;
-  storeState(context);
-}
+  state.position_ = 0;
+  state.checksum_ = 0;
+}
 
-  } else {
-std::shared_ptr flowFile = 
std::static_pointer_cast(session->create());
-if (flowFile) {
-  flowFile->updateKeyedAttribute(PATH, fileLocation);
-  flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-  session->import(fullPath, flowFile, true, 
state.second.currentTailFilePosition_);
-  session->transfer(flowFile, Success);
-  logger_->log_info("TailFile %s for %llu bytes", state.first, 
flowFile->getSize());
-  std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + flowFile->getSize()) + 
"."
-  + extension;
-  flowFile->updateKeyedAttribute(FILENAME, logName);
-  state.second.currentTailFilePosition_ += flowFile->getSize();
-  storeState(context);
-}
+  

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-09 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r436862565



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -403,102 +435,136 @@ void TailFile::onTrigger(const 
std::shared_ptr , c
 state_file_ = st_file + "." + getUUIDStr();
   }
   if (!this->state_recovered_) {
-state_recovered_ = true;
-// recover the state if we have not done so
 this->recoverState(context);
+state_recovered_ = true;
+  }
+
+  if (tail_mode_ == Mode::MULTIPLE) {
+checkForRemovedFiles();
+checkForNewFiles();
   }
 
-  /**
-   * iterate over file states. may modify them
-   */
+  // iterate over file states. may modify them
   for (auto  : tail_states_) {
-auto fileLocation = state.second.path_;
-
-checkRollOver(context, state.second, state.first);
-std::string fullPath = fileLocation + 
utils::file::FileUtils::get_separator() + state.second.current_file_name_;
-struct stat statbuf;
-
-logger_->log_debug("Tailing file %s from %llu", fullPath, 
state.second.currentTailFilePosition_);
-if (stat(fullPath.c_str(), ) == 0) {
-  if ((uint64_t) statbuf.st_size <= state.second.currentTailFilePosition_) 
{
-logger_->log_trace("Current pos: %llu", 
state.second.currentTailFilePosition_);
-logger_->log_trace("%s", "there are no new input for the current tail 
file");
-context->yield();
-return;
-  }
-  std::size_t found = state.first.find_last_of(".");
-  std::string baseName = state.first.substr(0, found);
-  std::string extension = state.first.substr(found + 1);
-
-  if (!delimiter_.empty()) {
-char delim = delimiter_.c_str()[0];
-if (delim == '\\') {
-  if (delimiter_.size() > 1) {
-switch (delimiter_.c_str()[1]) {
-  case 'r':
-delim = '\r';
-break;
-  case 't':
-delim = '\t';
-break;
-  case 'n':
-delim = '\n';
-break;
-  case '\\':
-delim = '\\';
-break;
-  default:
-// previous behavior
-break;
-}
-  }
-}
-logger_->log_debug("Looking for delimiter 0x%X", delim);
-std::vector> flowFiles;
-session->import(fullPath, flowFiles, 
state.second.currentTailFilePosition_, delim);
-logger_->log_info("%u flowfiles were received from TailFile input", 
flowFiles.size());
-
-for (auto ffr : flowFiles) {
-  logger_->log_info("TailFile %s for %u bytes", state.first, 
ffr->getSize());
-  std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + ffr->getSize()) + "." + 
extension;
-  ffr->updateKeyedAttribute(PATH, fileLocation);
-  ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-  ffr->updateKeyedAttribute(FILENAME, logName);
-  session->transfer(ffr, Success);
-  state.second.currentTailFilePosition_ += ffr->getSize() + 1;
-  storeState(context);
-}
+processFile(context, session, state.first, state.second);
+  }
 
-  } else {
-std::shared_ptr flowFile = 
std::static_pointer_cast(session->create());
-if (flowFile) {
-  flowFile->updateKeyedAttribute(PATH, fileLocation);
-  flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-  session->import(fullPath, flowFile, true, 
state.second.currentTailFilePosition_);
-  session->transfer(flowFile, Success);
-  logger_->log_info("TailFile %s for %llu bytes", state.first, 
flowFile->getSize());
-  std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + flowFile->getSize()) + 
"."
-  + extension;
-  flowFile->updateKeyedAttribute(FILENAME, logName);
-  state.second.currentTailFilePosition_ += flowFile->getSize();
-  storeState(context);
-}
-  }
-  state.second.currentTailFileModificationTime_ = ((uint64_t) 
(statbuf.st_mtime) * 1000);
-} else {
-  logger_->log_warn("Unable to stat file %s", fullPath);
+  if (!session->existsFlowFileInRelationship(Success)) {
+yield();
+  }
+}
+
+void TailFile::processFile(const std::shared_ptr 
,
+   const std::shared_ptr 
,
+   const std::string ,
+   TailState ) {
+  if (utils::file::FileUtils::file_size(state.fileNameWithPath()) < 
state.position_) {
+processRotatedFiles(context, session, state);
+  }
+
+  processSingleFile(context, session, fileName, state);
+}
+
+void TailFile::processRotatedFiles(const std::shared_ptr 
,
+   

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-04 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r435329784



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -204,52 +230,65 @@ void TailFile::parseStateFileLine(char *buf) {
   }
 
   std::string value = equal;
-  key = trimRight(key);
-  value = trimRight(value);
+  key = utils::StringUtils::trimRight(key);
+  value = utils::StringUtils::trimRight(value);
 
   if (key == "FILENAME") {
 std::string fileLocation, fileName;
 if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, 
fileName)) {
   logger_->log_debug("State migration received path %s, file %s", 
fileLocation, fileName);
-  tail_states_.insert(std::make_pair(fileName, TailState { fileLocation, 
fileName, 0, 0 }));
+  state.insert(std::make_pair(fileName, TailState{fileLocation, fileName, 
0, 0, 0, 0}));
 } else {
-  tail_states_.insert(std::make_pair(value, TailState { fileLocation, 
value, 0, 0 }));
+  state.insert(std::make_pair(value, TailState{fileLocation, value, 0, 0, 
0, 0}));
 }
   }
   if (key == "POSITION") {
 // for backwards compatibility
-if (tail_states_.size() != 1) {
+if (tail_states_.size() != (std::size_t) 1) {
   throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, 
"Incompatible state file types");
 }
 const auto position = std::stoull(value);
 logger_->log_debug("Received position %d", position);
-tail_states_.begin()->second.currentTailFilePosition_ = position;
+state.begin()->second.position_ = position;
   }
   if (key.find(CURRENT_STR) == 0) {
 const auto file = key.substr(strlen(CURRENT_STR));
 std::string fileLocation, fileName;
 if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, 
fileName)) {
-  tail_states_[file].path_ = fileLocation;
-  tail_states_[file].current_file_name_ = fileName;
+  state[file].path_ = fileLocation;
+  state[file].file_name_ = fileName;
 } else {
   throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "State file 
contains an invalid file name");
 }
   }
 
   if (key.find(POSITION_STR) == 0) {
 const auto file = key.substr(strlen(POSITION_STR));
-tail_states_[file].currentTailFilePosition_ = std::stoull(value);
+state[file].position_ = std::stoull(value);
   }
 }
 
+bool TailFile::recoverState(const std::shared_ptr& 
context) {
+  std::map new_tail_states;
+  bool state_load_success = getStateFromStateManager(new_tail_states) ||
+getStateFromLegacyStateFile(new_tail_states);
+  if (!state_load_success) {
+return false;
+  }
 
+  logger_->log_debug("load state succeeded");
 
-bool TailFile::recoverState(const std::shared_ptr& 
context) {
-  bool state_load_success = false;
+  tail_states_ = std::move(new_tail_states);
+
+  // Save the state to the state manager
+  storeState(context);

Review comment:
   In some cases (migration from the old state file to the state manager, 
or when the File to Tail property changes in Single file mode) the state could 
have changed, so it is necessary to write it.  I could limit the write to those 
cases, but this seemed safer, and this part of the code only runs when the 
processor is (re-)scheduled, so a small performance gain didn't seem worth the 
extra complexity.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-04 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r435325700



##
File path: extensions/standard-processors/tests/unit/TailFileTests.cpp
##
@@ -47,7 +47,33 @@ static std::string NEWLINE_FILE = ""  // NOLINT
 static const char *TMP_FILE = "minifi-tmpfile.txt";
 static const char *STATE_FILE = "minifi-state-file.txt";
 
-TEST_CASE("TailFileWithDelimiter", "[tailfiletest2]") {
+namespace {
+  std::string createTempFile(const std::string , const std::string 
_name, const std::string ,
+  std::ios_base::openmode open_mode = std::ios::out | std::ios::binary) {
+std::string full_file_name = directory + 
utils::file::FileUtils::get_separator() + file_name;
+std::ofstream tmpfile{full_file_name, open_mode};
+tmpfile << contents;
+return full_file_name;
+  }
+
+  void appendTempFile(const std::string , const std::string 
_name, const std::string ,
+  std::ios_base::openmode open_mode = std::ios::app | std::ios::binary) {
+createTempFile(directory, file_name, contents, open_mode);
+  }
+
+  void removeFile(const std::string , const std::string _name) {
+std::string full_file_name = directory + 
utils::file::FileUtils::get_separator() + file_name;
+std::remove(full_file_name.c_str());
+  }
+
+  void renameTempFile(const std::string , const std::string 
_file_name, const std::string _file_name) {
+std::string old_full_file_name = directory + 
utils::file::FileUtils::get_separator() + old_file_name;
+std::string new_full_file_name = directory + 
utils::file::FileUtils::get_separator() + new_file_name;
+rename(old_full_file_name.c_str(), new_full_file_name.c_str());
+  }
+}  // namespace

Review comment:
   fixed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-04 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r435325310



##
File path: libminifi/test/unit/FileUtilsTests.cpp
##
@@ -178,3 +179,157 @@ TEST_CASE("TestFileUtils::getFullPath", 
"[TestGetFullPath]") {
   REQUIRE(tempDir1 == 
utils::file::PathUtils::getFullPath(".\\test2\\..\\test1"));
 #endif
 }
+
+TEST_CASE("FileUtils::last_write_time works", "[last_write_time]") {
+  uint64_t timeBeforeWrite = getTimeMillis() / 1000;
+
+  TestController testController;
+
+  char format[] = "/tmp/gt.XX";
+  std::string dir = testController.createTempDirectory(format);
+
+  std::string test_file = dir + FileUtils::get_separator() + "test.txt";
+  REQUIRE(FileUtils::last_write_time(test_file) == 0);
+
+  std::ofstream test_file_stream(test_file);
+  test_file_stream << "foo\n";
+  test_file_stream.flush();
+
+  uint64_t timeAfterFirstWrite = getTimeMillis() / 1000;
+
+  uint64_t first_mtime = FileUtils::last_write_time(test_file);
+  REQUIRE(first_mtime >= timeBeforeWrite);
+  REQUIRE(first_mtime <= timeAfterFirstWrite);
+
+  test_file_stream << "bar\n";
+  test_file_stream.flush();
+
+  uint64_t timeAfterSecondWrite = getTimeMillis() / 1000;
+
+  uint64_t second_mtime = FileUtils::last_write_time(test_file);
+  REQUIRE(second_mtime >= first_mtime);
+  REQUIRE(second_mtime >= timeAfterFirstWrite);
+  REQUIRE(second_mtime <= timeAfterSecondWrite);
+
+  test_file_stream.close();
+  uint64_t third_mtime = FileUtils::last_write_time(test_file);
+  REQUIRE(third_mtime == second_mtime);

Review comment:
   There is no write to `test_file` between the two calls to 
`FileUtils::last_write_time` (and the stream was flushed earlier), so the 
mtimes should be identical, I think.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-04 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r435323181



##
File path: libminifi/src/utils/file/FileUtils.cpp
##
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/file/FileUtils.h"
+
+#include 
+
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+namespace file {
+
+  uint64_t FileUtils::computeChecksum(std::string file_name, uint64_t 
up_to_position) {
+constexpr uint64_t BUFFER_SIZE = 4096u;
+std::array buffer;
+
+std::ifstream stream{file_name, std::ios::in | std::ios::binary};
+
+uint64_t checksum = 0;
+uint64_t remaining_bytes_to_be_read = up_to_position;
+
+while (stream && remaining_bytes_to_be_read > 0) {
+  using std::min;  // for min(), to work around 
https://docs.microsoft.com/en-us/windows/win32/multimedia/min

Review comment:
   I've changed it to `(std::min)`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-04 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r435322591



##
File path: libminifi/src/utils/file/FileUtils.cpp
##
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/file/FileUtils.h"
+
+#include 
+
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+namespace file {
+
+  uint64_t FileUtils::computeChecksum(std::string file_name, uint64_t 
up_to_position) {
+constexpr uint64_t BUFFER_SIZE = 4096u;
+std::array buffer;
+
+std::ifstream stream{file_name, std::ios::in | std::ios::binary};
+
+uint64_t checksum = 0;
+uint64_t remaining_bytes_to_be_read = up_to_position;
+
+while (stream && remaining_bytes_to_be_read > 0) {
+  using std::min;  // for min(), to work around 
https://docs.microsoft.com/en-us/windows/win32/multimedia/min
+  stream.read(buffer.data(), min(BUFFER_SIZE, remaining_bytes_to_be_read));
+  uint64_t bytes_read = stream.gcount();
+  checksum = crc32(checksum, reinterpret_cast(buffer.data()), bytes_read);
+  remaining_bytes_to_be_read -= bytes_read;
+}
+
+return checksum;
+  }

Review comment:
   fixed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-04 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r435321877



##
File path: libminifi/src/utils/RegexUtils.cpp
##
@@ -158,6 +158,16 @@ const std::vector& Regex::getResult() const { 
return results_; }
 
 const std::string& Regex::getSuffix() const { return suffix_; }
 
+bool Regex::matchesFullInput(const std::string , const std::string 
) {
+#ifdef NO_MORE_REGFREEE
+  std::regex re{regex};
+  return std::regex_match(input, re);
+#else
+  Regex rgx('^' + regex + '$');
+  return rgx.match(input);
+#endif

Review comment:
   `Regex` is still needed, unfortunately, as we still support gcc 4.8, 
which has a broken `<regex>` implementation.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-04 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r435321877



##
File path: libminifi/src/utils/RegexUtils.cpp
##
@@ -158,6 +158,16 @@ const std::vector& Regex::getResult() const { 
return results_; }
 
 const std::string& Regex::getSuffix() const { return suffix_; }
 
+bool Regex::matchesFullInput(const std::string , const std::string 
) {
+#ifdef NO_MORE_REGFREEE
+  std::regex re{regex};
+  return std::regex_match(input, re);
+#else
+  Regex rgx('^' + regex + '$');
+  return rgx.match(input);
+#endif

Review comment:
   `Regex` is still needed, unfortunately, as we still support gcc 4.8, 
which has a broken `<regex>` implementation.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-04 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r435320644



##
File path: libminifi/src/core/ProcessSession.cpp
##
@@ -469,6 +470,14 @@ void ProcessSession::import(std::string source, const 
std::shared_ptr> crcStream;
+  if (crc) {
+crcStream = utils::make_unique>(stream, 
*crc);
+stream = crcStream.get();
+  }

Review comment:
   I agree this is a bit nasty.  Let me think of a better way, too, and 
then we'll discuss it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-04 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r435320821



##
File path: libminifi/src/core/ProcessSession.cpp
##
@@ -574,15 +589,16 @@ void ProcessSession::import(const std::string& source, 
std::vector::min() || zlen > 
std::numeric_limits::max()) {
 logger_->log_error("narrowing conversion failed");
   }
-  const int len = zlen;
+  const bool foundDelimiter = (delimiterPos != end);
+  const int len = (includeDelimiter && foundDelimiter) ? zlen + 1 : 
zlen;

Review comment:
   done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-04 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r435319933



##
File path: extensions/standard-processors/processors/TailFile.h
##
@@ -33,38 +33,40 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-// TailFile Class
+struct TailState {
+  TailState(std::string path, std::string file_name, uint64_t position, 
uint64_t timestamp,
+uint64_t mtime, uint64_t checksum)
+  : path_(std::move(path)), file_name_(std::move(file_name)), 
position_(position), timestamp_(timestamp),
+mtime_(mtime), checksum_(checksum) {}
 
+  TailState() : TailState("", "", 0, 0, 0, 0) {}
 
-typedef struct {
-  std::string path_;
-  std::string current_file_name_;
-  uint64_t currentTailFilePosition_;
-  uint64_t currentTailFileModificationTime_;
-} TailState;
-
-// Matched File Item for Roll over check
-typedef struct {
-  std::string fileName;
-  uint64_t modifiedTime;
-} TailMatchedFileItem;
+  std::string fileNameWithPath() const {
+return path_ + utils::file::FileUtils::get_separator() + file_name_;
+  }
 
+  std::string path_;
+  std::string file_name_;
+  uint64_t position_;
+  uint64_t timestamp_;
+  uint64_t mtime_;

Review comment:
   I have moved `mtime_` out of `TailState` as it is not really part of the 
state, and I'm using an ad-hoc `{TailState, mtime}` struct in the one place 
where it is needed (`findRotatedFiles`).
   
   I have also renamed `timestamp_` to `last_read_time_`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-04 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r435318031



##
File path: libminifi/include/io/CRCStream.h
##
@@ -45,6 +45,7 @@ class CRCStream : public BaseStream {
* it will exceed our lifetime.
*/
   explicit CRCStream(T *child_stream);
+  explicit CRCStream(T *child_stream, uint64_t initial_crc);

Review comment:
   I've removed it





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-04 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r435316889



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -403,102 +435,136 @@ void TailFile::onTrigger(const 
std::shared_ptr , c
 state_file_ = st_file + "." + getUUIDStr();
   }
   if (!this->state_recovered_) {
-state_recovered_ = true;
-// recover the state if we have not done so
 this->recoverState(context);
+state_recovered_ = true;
+  }
+
+  if (tail_mode_ == Mode::MULTIPLE) {
+checkForRemovedFiles();
+checkForNewFiles();
   }
 
-  /**
-   * iterate over file states. may modify them
-   */
+  // iterate over file states. may modify them
   for (auto  : tail_states_) {
-auto fileLocation = state.second.path_;
-
-checkRollOver(context, state.second, state.first);
-std::string fullPath = fileLocation + 
utils::file::FileUtils::get_separator() + state.second.current_file_name_;
-struct stat statbuf;
-
-logger_->log_debug("Tailing file %s from %llu", fullPath, 
state.second.currentTailFilePosition_);
-if (stat(fullPath.c_str(), ) == 0) {
-  if ((uint64_t) statbuf.st_size <= state.second.currentTailFilePosition_) 
{
-logger_->log_trace("Current pos: %llu", 
state.second.currentTailFilePosition_);
-logger_->log_trace("%s", "there are no new input for the current tail 
file");
-context->yield();
-return;
-  }
-  std::size_t found = state.first.find_last_of(".");
-  std::string baseName = state.first.substr(0, found);
-  std::string extension = state.first.substr(found + 1);
-
-  if (!delimiter_.empty()) {
-char delim = delimiter_.c_str()[0];
-if (delim == '\\') {
-  if (delimiter_.size() > 1) {
-switch (delimiter_.c_str()[1]) {
-  case 'r':
-delim = '\r';
-break;
-  case 't':
-delim = '\t';
-break;
-  case 'n':
-delim = '\n';
-break;
-  case '\\':
-delim = '\\';
-break;
-  default:
-// previous behavior
-break;
-}
-  }
-}
-logger_->log_debug("Looking for delimiter 0x%X", delim);
-std::vector> flowFiles;
-session->import(fullPath, flowFiles, 
state.second.currentTailFilePosition_, delim);
-logger_->log_info("%u flowfiles were received from TailFile input", 
flowFiles.size());
-
-for (auto ffr : flowFiles) {
-  logger_->log_info("TailFile %s for %u bytes", state.first, 
ffr->getSize());
-  std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + ffr->getSize()) + "." + 
extension;
-  ffr->updateKeyedAttribute(PATH, fileLocation);
-  ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-  ffr->updateKeyedAttribute(FILENAME, logName);
-  session->transfer(ffr, Success);
-  state.second.currentTailFilePosition_ += ffr->getSize() + 1;
-  storeState(context);
-}
+processFile(context, session, state.first, state.second);
+  }
 
-  } else {
-std::shared_ptr flowFile = 
std::static_pointer_cast(session->create());
-if (flowFile) {
-  flowFile->updateKeyedAttribute(PATH, fileLocation);
-  flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-  session->import(fullPath, flowFile, true, 
state.second.currentTailFilePosition_);
-  session->transfer(flowFile, Success);
-  logger_->log_info("TailFile %s for %llu bytes", state.first, 
flowFile->getSize());
-  std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + flowFile->getSize()) + 
"."
-  + extension;
-  flowFile->updateKeyedAttribute(FILENAME, logName);
-  state.second.currentTailFilePosition_ += flowFile->getSize();
-  storeState(context);
-}
-  }
-  state.second.currentTailFileModificationTime_ = ((uint64_t) 
(statbuf.st_mtime) * 1000);
-} else {
-  logger_->log_warn("Unable to stat file %s", fullPath);
+  if (!session->existsFlowFileInRelationship(Success)) {
+yield();
+  }
+}
+
+void TailFile::processFile(const std::shared_ptr 
,
+   const std::shared_ptr 
,
+   const std::string ,
+   TailState ) {
+  if (utils::file::FileUtils::file_size(state.fileNameWithPath()) < 
state.position_) {
+processRotatedFiles(context, session, state);
+  }
+
+  processSingleFile(context, session, fileName, state);
+}
+
+void TailFile::processRotatedFiles(const std::shared_ptr 
,
+   

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-04 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r435311535



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -403,102 +435,136 @@ void TailFile::onTrigger(const 
std::shared_ptr , c
 state_file_ = st_file + "." + getUUIDStr();
   }
   if (!this->state_recovered_) {
-state_recovered_ = true;
-// recover the state if we have not done so
 this->recoverState(context);
+state_recovered_ = true;
+  }
+
+  if (tail_mode_ == Mode::MULTIPLE) {
+checkForRemovedFiles();
+checkForNewFiles();
   }
 
-  /**
-   * iterate over file states. may modify them
-   */
+  // iterate over file states. may modify them
   for (auto  : tail_states_) {
-auto fileLocation = state.second.path_;
-
-checkRollOver(context, state.second, state.first);
-std::string fullPath = fileLocation + 
utils::file::FileUtils::get_separator() + state.second.current_file_name_;
-struct stat statbuf;
-
-logger_->log_debug("Tailing file %s from %llu", fullPath, 
state.second.currentTailFilePosition_);
-if (stat(fullPath.c_str(), ) == 0) {
-  if ((uint64_t) statbuf.st_size <= state.second.currentTailFilePosition_) 
{
-logger_->log_trace("Current pos: %llu", 
state.second.currentTailFilePosition_);
-logger_->log_trace("%s", "there are no new input for the current tail 
file");
-context->yield();
-return;
-  }
-  std::size_t found = state.first.find_last_of(".");
-  std::string baseName = state.first.substr(0, found);
-  std::string extension = state.first.substr(found + 1);
-
-  if (!delimiter_.empty()) {
-char delim = delimiter_.c_str()[0];
-if (delim == '\\') {
-  if (delimiter_.size() > 1) {
-switch (delimiter_.c_str()[1]) {
-  case 'r':
-delim = '\r';
-break;
-  case 't':
-delim = '\t';
-break;
-  case 'n':
-delim = '\n';
-break;
-  case '\\':
-delim = '\\';
-break;
-  default:
-// previous behavior
-break;
-}
-  }
-}
-logger_->log_debug("Looking for delimiter 0x%X", delim);
-std::vector> flowFiles;
-session->import(fullPath, flowFiles, 
state.second.currentTailFilePosition_, delim);
-logger_->log_info("%u flowfiles were received from TailFile input", 
flowFiles.size());
-
-for (auto ffr : flowFiles) {
-  logger_->log_info("TailFile %s for %u bytes", state.first, 
ffr->getSize());
-  std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + ffr->getSize()) + "." + 
extension;
-  ffr->updateKeyedAttribute(PATH, fileLocation);
-  ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-  ffr->updateKeyedAttribute(FILENAME, logName);
-  session->transfer(ffr, Success);
-  state.second.currentTailFilePosition_ += ffr->getSize() + 1;
-  storeState(context);
-}
+processFile(context, session, state.first, state.second);
+  }
 
-  } else {
-std::shared_ptr flowFile = 
std::static_pointer_cast(session->create());
-if (flowFile) {
-  flowFile->updateKeyedAttribute(PATH, fileLocation);
-  flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-  session->import(fullPath, flowFile, true, 
state.second.currentTailFilePosition_);
-  session->transfer(flowFile, Success);
-  logger_->log_info("TailFile %s for %llu bytes", state.first, 
flowFile->getSize());
-  std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + flowFile->getSize()) + 
"."
-  + extension;
-  flowFile->updateKeyedAttribute(FILENAME, logName);
-  state.second.currentTailFilePosition_ += flowFile->getSize();
-  storeState(context);
-}
-  }
-  state.second.currentTailFileModificationTime_ = ((uint64_t) 
(statbuf.st_mtime) * 1000);
-} else {
-  logger_->log_warn("Unable to stat file %s", fullPath);
+  if (!session->existsFlowFileInRelationship(Success)) {
+yield();
+  }
+}
+
+void TailFile::processFile(const std::shared_ptr 
,
+   const std::shared_ptr 
,
+   const std::string ,
+   TailState ) {
+  if (utils::file::FileUtils::file_size(state.fileNameWithPath()) < 
state.position_) {
+processRotatedFiles(context, session, state);
+  }
+
+  processSingleFile(context, session, fileName, state);
+}
+
+void TailFile::processRotatedFiles(const std::shared_ptr 
,
+   

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-04 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r435310291



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -403,102 +435,136 @@ void TailFile::onTrigger(const 
std::shared_ptr , c
 state_file_ = st_file + "." + getUUIDStr();
   }
   if (!this->state_recovered_) {
-state_recovered_ = true;
-// recover the state if we have not done so
 this->recoverState(context);
+state_recovered_ = true;
+  }
+
+  if (tail_mode_ == Mode::MULTIPLE) {
+checkForRemovedFiles();
+checkForNewFiles();
   }
 
-  /**
-   * iterate over file states. may modify them
-   */
+  // iterate over file states. may modify them
   for (auto  : tail_states_) {
-auto fileLocation = state.second.path_;
-
-checkRollOver(context, state.second, state.first);
-std::string fullPath = fileLocation + 
utils::file::FileUtils::get_separator() + state.second.current_file_name_;
-struct stat statbuf;
-
-logger_->log_debug("Tailing file %s from %llu", fullPath, 
state.second.currentTailFilePosition_);
-if (stat(fullPath.c_str(), ) == 0) {
-  if ((uint64_t) statbuf.st_size <= state.second.currentTailFilePosition_) 
{
-logger_->log_trace("Current pos: %llu", 
state.second.currentTailFilePosition_);
-logger_->log_trace("%s", "there are no new input for the current tail 
file");
-context->yield();
-return;
-  }
-  std::size_t found = state.first.find_last_of(".");
-  std::string baseName = state.first.substr(0, found);
-  std::string extension = state.first.substr(found + 1);
-
-  if (!delimiter_.empty()) {
-char delim = delimiter_.c_str()[0];
-if (delim == '\\') {
-  if (delimiter_.size() > 1) {
-switch (delimiter_.c_str()[1]) {
-  case 'r':
-delim = '\r';
-break;
-  case 't':
-delim = '\t';
-break;
-  case 'n':
-delim = '\n';
-break;
-  case '\\':
-delim = '\\';
-break;
-  default:
-// previous behavior
-break;
-}
-  }
-}
-logger_->log_debug("Looking for delimiter 0x%X", delim);
-std::vector> flowFiles;
-session->import(fullPath, flowFiles, 
state.second.currentTailFilePosition_, delim);
-logger_->log_info("%u flowfiles were received from TailFile input", 
flowFiles.size());
-
-for (auto ffr : flowFiles) {
-  logger_->log_info("TailFile %s for %u bytes", state.first, 
ffr->getSize());
-  std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + ffr->getSize()) + "." + 
extension;
-  ffr->updateKeyedAttribute(PATH, fileLocation);
-  ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-  ffr->updateKeyedAttribute(FILENAME, logName);
-  session->transfer(ffr, Success);
-  state.second.currentTailFilePosition_ += ffr->getSize() + 1;
-  storeState(context);
-}
+processFile(context, session, state.first, state.second);
+  }
 
-  } else {
-std::shared_ptr flowFile = 
std::static_pointer_cast(session->create());
-if (flowFile) {
-  flowFile->updateKeyedAttribute(PATH, fileLocation);
-  flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-  session->import(fullPath, flowFile, true, 
state.second.currentTailFilePosition_);
-  session->transfer(flowFile, Success);
-  logger_->log_info("TailFile %s for %llu bytes", state.first, 
flowFile->getSize());
-  std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + flowFile->getSize()) + 
"."
-  + extension;
-  flowFile->updateKeyedAttribute(FILENAME, logName);
-  state.second.currentTailFilePosition_ += flowFile->getSize();
-  storeState(context);
-}
-  }
-  state.second.currentTailFileModificationTime_ = ((uint64_t) 
(statbuf.st_mtime) * 1000);
-} else {
-  logger_->log_warn("Unable to stat file %s", fullPath);
+  if (!session->existsFlowFileInRelationship(Success)) {
+yield();
+  }
+}
+
+void TailFile::processFile(const std::shared_ptr 
,
+   const std::shared_ptr 
,
+   const std::string ,
+   TailState ) {
+  if (utils::file::FileUtils::file_size(state.fileNameWithPath()) < 
state.position_) {
+processRotatedFiles(context, session, state);
+  }
+
+  processSingleFile(context, session, fileName, state);
+}
+
+void TailFile::processRotatedFiles(const std::shared_ptr 
,
+   

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-04 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r435309190



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -204,52 +249,78 @@ void TailFile::parseStateFileLine(char *buf) {
   }
 
   std::string value = equal;
-  key = trimRight(key);
-  value = trimRight(value);
+  key = utils::StringUtils::trimRight(key);
+  value = utils::StringUtils::trimRight(value);
 
   if (key == "FILENAME") {
 std::string fileLocation, fileName;
 if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, 
fileName)) {
   logger_->log_debug("State migration received path %s, file %s", 
fileLocation, fileName);
-  tail_states_.insert(std::make_pair(fileName, TailState { fileLocation, 
fileName, 0, 0 }));
+  state.emplace(fileName, TailState{fileLocation, fileName, 0, 0, 0, 0});
 } else {
-  tail_states_.insert(std::make_pair(value, TailState { fileLocation, 
value, 0, 0 }));
+  state.emplace(value, TailState{fileLocation, value, 0, 0, 0, 0});
 }
   }
   if (key == "POSITION") {
 // for backwards compatibility
-if (tail_states_.size() != 1) {
+if (tail_states_.size() != (std::size_t) 1) {
   throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, 
"Incompatible state file types");
 }
 const auto position = std::stoull(value);
 logger_->log_debug("Received position %d", position);
-tail_states_.begin()->second.currentTailFilePosition_ = position;
+state.begin()->second.position_ = position;
   }
   if (key.find(CURRENT_STR) == 0) {
 const auto file = key.substr(strlen(CURRENT_STR));
 std::string fileLocation, fileName;
 if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, 
fileName)) {
-  tail_states_[file].path_ = fileLocation;
-  tail_states_[file].current_file_name_ = fileName;
+  state[file].path_ = fileLocation;
+  state[file].file_name_ = fileName;
 } else {
   throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "State file 
contains an invalid file name");
 }
   }
 
   if (key.find(POSITION_STR) == 0) {
 const auto file = key.substr(strlen(POSITION_STR));
-tail_states_[file].currentTailFilePosition_ = std::stoull(value);
+state[file].position_ = std::stoull(value);
   }
 }
 
+bool TailFile::recoverState(const std::shared_ptr& 
context) {
+  std::map new_tail_states;
+  bool state_load_success = getStateFromStateManager(new_tail_states) ||
+getStateFromLegacyStateFile(new_tail_states);

Review comment:
   thanks





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-04 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r435308461



##
File path: libminifi/src/utils/file/PathUtils.cpp
##
@@ -84,6 +108,19 @@ std::string PathUtils::getFullPath(const std::string& path) 
{
 #endif
 }
 
+  std::string PathUtils::globToRegex(const std::string ) {
+std::string glob_copy = glob;
+replaceAll(glob_copy, ".", "\\.");
+replaceAll(glob_copy, "*", ".*");
+replaceAll(glob_copy, "?", ".");
+return glob_copy;
+  }
+
+  std::string PathUtils::replacePlaceholderWithBaseName(const std::string 
, const std::string& base_name) {
+static const std::string PLACEHOLDER = "${filename}";

Review comment:
   I have moved `replaceOne` to `StringUtils` (and discovered that an 
almost identical `replaceAll` was already there), and inlined this 
`replacePlaceholderWithBaseName` function into TailFile

##
File path: libminifi/src/utils/file/PathUtils.cpp
##
@@ -84,6 +108,19 @@ std::string PathUtils::getFullPath(const std::string& path) 
{
 #endif
 }
 
+  std::string PathUtils::globToRegex(const std::string ) {
+std::string glob_copy = glob;
+replaceAll(glob_copy, ".", "\\.");
+replaceAll(glob_copy, "*", ".*");
+replaceAll(glob_copy, "?", ".");
+return glob_copy;
+  }

Review comment:
   done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-04 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r435308929



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -335,65 +387,45 @@ bool TailFile::storeState(const 
std::shared_ptr& context)
   return true;
 }
 
-static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem 
j) {
-  return (i.modifiedTime < j.modifiedTime);
-}
-void TailFile::checkRollOver(const std::shared_ptr& 
context, TailState , const std::string _file_name) {
-  struct stat statbuf;
-  std::vector matchedFiles;
-  std::string fullPath = file.path_ + utils::file::FileUtils::get_separator() 
+ file.current_file_name_;
-
-  if (stat(fullPath.c_str(), ) == 0) {
-logger_->log_trace("Searching for files rolled over");
-std::string pattern = file.current_file_name_;
-std::size_t found = file.current_file_name_.find_last_of(".");
-if (found != std::string::npos)
-  pattern = file.current_file_name_.substr(0, found);
-
-// Callback, called for each file entry in the listed directory
-// Return value is used to break (false) or continue (true) listing
-auto lambda = [&](const std::string& path, const std::string& filename) -> 
bool {
-  struct stat sb;
-  std::string fileFullName = path + 
utils::file::FileUtils::get_separator() + filename;
-  if ((fileFullName.find(pattern) != std::string::npos) && 
stat(fileFullName.c_str(), ) == 0) {
-uint64_t candidateModTime = ((uint64_t) (sb.st_mtime) * 1000);
-if (candidateModTime >= file.currentTailFileModificationTime_) {
-  logging::LOG_TRACE(logger_) << "File " << filename << " (short name 
" << file.current_file_name_ <<
-  ") disk mod time " << candidateModTime << ", struct mod time " << 
file.currentTailFileModificationTime_ << ", size on disk " << sb.st_size << ", 
position " << file.currentTailFilePosition_;
-  if (filename == file.current_file_name_ && candidateModTime == 
file.currentTailFileModificationTime_ &&
-  sb.st_size == file.currentTailFilePosition_) {
-return true;  // Skip the current file as a candidate in case it 
wasn't updated
-  }
-  TailMatchedFileItem item;
-  item.fileName = filename;
-  item.modifiedTime = ((uint64_t) (sb.st_mtime) * 1000);
-  matchedFiles.push_back(item);
-}
-  }
-  return true;};
+std::vector TailFile::findRotatedFiles(const TailState ) 
const {
+  logger_->log_trace("Searching for files rolled over");
 
-utils::file::FileUtils::list_dir(file.path_, lambda, logger_, false);
+  std::size_t last_dot_position = state.file_name_.find_last_of('.');
+  std::string base_name = state.file_name_.substr(0, last_dot_position);
+  std::string pattern = 
utils::file::PathUtils::replacePlaceholderWithBaseName(rolling_filename_pattern_,
 base_name);
 
-if (matchedFiles.size() < 1) {
-  logger_->log_debug("No newer files found in directory!");
-  return;
+  std::vector matched_files;
+  auto collect_matching_files = [&](const std::string , const std::string 
_name) -> bool {
+if (file_name != state.file_name_ && 
utils::Regex::matchesFullInput(pattern, file_name)) {
+  std::string full_file_name = path + 
utils::file::FileUtils::get_separator() + file_name;
+  uint64_t mtime = utils::file::FileUtils::last_write_time(full_file_name);
+  if (mtime >= state.timestamp_ / 1000) {

Review comment:
   done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-04 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r435306719



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -204,52 +249,78 @@ void TailFile::parseStateFileLine(char *buf) {
   }
 
   std::string value = equal;
-  key = trimRight(key);
-  value = trimRight(value);
+  key = utils::StringUtils::trimRight(key);
+  value = utils::StringUtils::trimRight(value);
 
   if (key == "FILENAME") {
 std::string fileLocation, fileName;
 if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, 
fileName)) {
   logger_->log_debug("State migration received path %s, file %s", 
fileLocation, fileName);
-  tail_states_.insert(std::make_pair(fileName, TailState { fileLocation, 
fileName, 0, 0 }));
+  state.emplace(fileName, TailState{fileLocation, fileName, 0, 0, 0, 0});
 } else {
-  tail_states_.insert(std::make_pair(value, TailState { fileLocation, 
value, 0, 0 }));
+  state.emplace(value, TailState{fileLocation, value, 0, 0, 0, 0});
 }
   }
   if (key == "POSITION") {
 // for backwards compatibility
-if (tail_states_.size() != 1) {
+if (tail_states_.size() != (std::size_t) 1) {

Review comment:
   done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-04 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r435306612



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -58,42 +47,108 @@
 #define S_ISDIR(mode)  (((mode) & S_IFMT) == S_IFDIR)
 #endif
 
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Wsign-compare"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wsign-compare"
-#endif
-
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of 
the file that should be tailed when using single file mode, or a file regex 
when using multifile mode", "");
-core::Property TailFile::StateFile("State File", "DEPRECATED. Only use it for 
state migration from the legacy state file.",
-   "TailFileState");
-core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character 
that should be used for delimiting the data being tailed"
-   "from the incoming file."
-   "If none is specified, data will be 
ingested as it becomes available.",
-   "");
+core::Property TailFile::FileName(
+core::PropertyBuilder::createProperty("File to Tail")
+->withDescription("Fully-qualified filename of the file that should be 
tailed when using single file mode, or a file regex when using multifile mode")
+->isRequired(true)
+->build());
+
+core::Property TailFile::StateFile(
+core::PropertyBuilder::createProperty("State File")
+->withDescription("DEPRECATED. Only use it for state migration from 
the legacy state file.")
+->isRequired(false)
+->withDefaultValue("TailFileState")
+->build());
+
+core::Property TailFile::Delimiter(
+core::PropertyBuilder::createProperty("Input Delimiter")
+->withDescription("Specifies the character that should be used for 
delimiting the data being tailed"
+ "from the incoming file. If none is specified, data will be ingested 
as it becomes available.")
+->isRequired(false)
+->withDefaultValue("\\n")
+->build());
 
 core::Property TailFile::TailMode(
-core::PropertyBuilder::createProperty("tail-mode", "Tailing 
Mode")->withDescription(
-"Specifies the tail file mode. In 'Single file' mode only a single 
file will be watched. "
+core::PropertyBuilder::createProperty("tail-mode", "Tailing Mode")
+->withDescription("Specifies the tail file mode. In 'Single file' mode 
only a single file will be watched. "
 "In 'Multiple file' mode a regex may be used. Note that in multiple 
file mode we will still continue to watch for rollover on the initial set of 
watched files. "
 "The Regex used to locate multiple files will be run during the 
schedule phrase. Note that if rotated files are matched by the regex, those 
files will be tailed.")->isRequired(true)
-->withAllowableValue("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single 
file")->build());
-
-core::Property 
TailFile::BaseDirectory(core::PropertyBuilder::createProperty("tail-base-directory",
 "Base Directory")->isRequired(false)->build());
+->withAllowableValue("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single file")
+->build());
+
+core::Property TailFile::BaseDirectory(
+core::PropertyBuilder::createProperty("tail-base-directory", "Base 
Directory")
+->withDescription("Base directory used to look for files to tail. This 
property is required when using Multiple file mode.")
+->isRequired(false)
+->build());
+
+core::Property TailFile::RollingFilenamePattern(
+core::PropertyBuilder::createProperty("Rolling Filename Pattern")
+->withDescription("If the file to tail \"rolls over\" as would be the 
case with log files, this filename pattern will be used to "
+"identify files that have rolled over so MiNiFi can read the remaining 
of the rolled-over file and then continue with the new log file. "
+"This pattern supports the wildcard characters * and ?, it also 
supports the notation ${filename} to specify a pattern based on the name of the 
file "
+"(without extension), and will assume that the files that have rolled 
over live in the same directory as the file being tailed.")
+->isRequired(false)
+->withDefaultValue("${filename}.*")
+->build());
 
 core::Relationship TailFile::Success("success", "All files are routed to 
success");
 
 const char *TailFile::CURRENT_STR = "CURRENT.";
 const char *TailFile::POSITION_STR = "POSITION.";
 
+namespace {
+  template
+  bool containsKey(const Container , const Key ) {
+return 

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-05-25 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r429847276



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -403,102 +403,141 @@ void TailFile::onTrigger(const 
std::shared_ptr , c
 state_file_ = st_file + "." + getUUIDStr();
   }
   if (!this->state_recovered_) {
-state_recovered_ = true;
-// recover the state if we have not done so
 this->recoverState(context);
+state_recovered_ = true;
+  }
+
+  if (tail_mode_ == Mode::MULTIPLE) {
+checkForRemovedFiles();
+checkForNewFiles();
   }
 
-  /**
-   * iterate over file states. may modify them
-   */
+  bool did_something = false;
+
+  // iterate over file states. may modify them
   for (auto  : tail_states_) {
-auto fileLocation = state.second.path_;
-
-checkRollOver(context, state.second, state.first);
-std::string fullPath = fileLocation + 
utils::file::FileUtils::get_separator() + state.second.current_file_name_;
-struct stat statbuf;
-
-logger_->log_debug("Tailing file %s from %llu", fullPath, 
state.second.currentTailFilePosition_);
-if (stat(fullPath.c_str(), ) == 0) {
-  if ((uint64_t) statbuf.st_size <= state.second.currentTailFilePosition_) 
{
-logger_->log_trace("Current pos: %llu", 
state.second.currentTailFilePosition_);
-logger_->log_trace("%s", "there are no new input for the current tail 
file");
-context->yield();
-return;
+did_something |= processFile(context, session, state.first, state.second);

Review comment:
   `any_of` short-circuits, so I can't use it directly; I could do 
something like
   ```c++
 std::vector<bool> did_something(tail_states_.size());
 std::transform(tail_states_.begin(), tail_states_.end(), std::back_inserter(did_something),
 [this, &context, &session](std::map<std::string, TailState>::value_type &state) {
   return processFile(context, session, state.first, state.second);
 });
   
 if (std::none_of(did_something.begin(), did_something.end(), [](bool x){ return x; })) {
   yield();
 }
   ```
   but I'm not sure that is better.  Would you prefer this?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-05-22 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r429359617



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -403,102 +403,141 @@ void TailFile::onTrigger(const 
std::shared_ptr , c
 state_file_ = st_file + "." + getUUIDStr();
   }
   if (!this->state_recovered_) {
-state_recovered_ = true;
-// recover the state if we have not done so
 this->recoverState(context);
+state_recovered_ = true;
+  }
+
+  if (tail_mode_ == Mode::MULTIPLE) {
+checkForRemovedFiles();
+checkForNewFiles();
   }
 
-  /**
-   * iterate over file states. may modify them
-   */
+  bool did_something = false;
+
+  // iterate over file states. may modify them
   for (auto  : tail_states_) {
-auto fileLocation = state.second.path_;
-
-checkRollOver(context, state.second, state.first);
-std::string fullPath = fileLocation + 
utils::file::FileUtils::get_separator() + state.second.current_file_name_;
-struct stat statbuf;
-
-logger_->log_debug("Tailing file %s from %llu", fullPath, 
state.second.currentTailFilePosition_);
-if (stat(fullPath.c_str(), ) == 0) {
-  if ((uint64_t) statbuf.st_size <= state.second.currentTailFilePosition_) 
{
-logger_->log_trace("Current pos: %llu", 
state.second.currentTailFilePosition_);
-logger_->log_trace("%s", "there are no new input for the current tail 
file");
-context->yield();
-return;
+did_something |= processFile(context, session, state.first, state.second);
+  }
+
+  if (!did_something) {
+yield();
+  }
+}
+
+  bool TailFile::processFile(const std::shared_ptr 
,
+ const std::shared_ptr 
,
+ const std::string ,
+ TailState ) {
+std::string full_file_name = state.fileNameWithPath();
+
+bool did_something = false;
+
+if (utils::file::FileUtils::file_size(full_file_name) < state.position_) {
+  std::vector rotated_file_states = findRotatedFiles(state);
+  for (TailState _state : rotated_file_states) {
+did_something |= processSingleFile(context, session, 
file_state.file_name_, file_state);
   }
-  std::size_t found = state.first.find_last_of(".");
-  std::string baseName = state.first.substr(0, found);
-  std::string extension = state.first.substr(found + 1);
-
-  if (!delimiter_.empty()) {
-char delim = delimiter_.c_str()[0];
-if (delim == '\\') {
-  if (delimiter_.size() > 1) {
-switch (delimiter_.c_str()[1]) {
-  case 'r':
-delim = '\r';
-break;
-  case 't':
-delim = '\t';
-break;
-  case 'n':
-delim = '\n';
-break;
-  case '\\':
-delim = '\\';
-break;
-  default:
-// previous behavior
-break;
-}
-  }
-}
-logger_->log_debug("Looking for delimiter 0x%X", delim);
-std::vector> flowFiles;
-session->import(fullPath, flowFiles, 
state.second.currentTailFilePosition_, delim);
-logger_->log_info("%u flowfiles were received from TailFile input", 
flowFiles.size());
-
-for (auto ffr : flowFiles) {
-  logger_->log_info("TailFile %s for %u bytes", state.first, 
ffr->getSize());
-  std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + ffr->getSize()) + "." + 
extension;
-  ffr->updateKeyedAttribute(PATH, fileLocation);
-  ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-  ffr->updateKeyedAttribute(FILENAME, logName);
-  session->transfer(ffr, Success);
-  state.second.currentTailFilePosition_ += ffr->getSize() + 1;
-  storeState(context);
-}
+  state.position_ = 0;
+  state.checksum_ = 0;
+}
 
-  } else {
-std::shared_ptr flowFile = 
std::static_pointer_cast(session->create());
-if (flowFile) {
-  flowFile->updateKeyedAttribute(PATH, fileLocation);
-  flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-  session->import(fullPath, flowFile, true, 
state.second.currentTailFilePosition_);
-  session->transfer(flowFile, Success);
-  logger_->log_info("TailFile %s for %llu bytes", state.first, 
flowFile->getSize());
-  std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + flowFile->getSize()) + 
"."
-  + extension;
-  flowFile->updateKeyedAttribute(FILENAME, logName);
-  state.second.currentTailFilePosition_ += flowFile->getSize();
-  storeState(context);
-}
+  

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-05-22 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r429328320



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -120,56 +158,42 @@ void TailFile::onSchedule(const 
std::shared_ptr ,
   std::string value;
 
   if (context->getProperty(Delimiter.getName(), value)) {
-delimiter_ = value;
+delimiter_ = parseDelimiter(value);
+  }
+
+  if (!context->getProperty(FileName.getName(), file_to_tail_)) {
+throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "File to Tail 
is a required property");
   }
 
   std::string mode;
   context->getProperty(TailMode.getName(), mode);
 
-  std::string file = "";
-  if (!context->getProperty(FileName.getName(), file)) {
-throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "File to Tail 
is a required property");
-  }
   if (mode == "Multiple file") {
-// file is a regex
-std::string base_dir;
-if (!context->getProperty(BaseDirectory.getName(), base_dir)) {
+tail_mode_ = Mode::MULTIPLE;
+
+if (!context->getProperty(BaseDirectory.getName(), base_dir_)) {
   throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Base 
directory is required for multiple tail mode.");
 }
 
-auto fileRegexSelect = [&](const std::string& path, const std::string& 
filename) -> bool {
-  if (acceptFile(file, filename)) {
-tail_states_.insert(std::make_pair(filename, TailState {path, 
filename, 0, 0}));
-  }
-  return true;
-};
-
-utils::file::FileUtils::list_dir(base_dir, fileRegexSelect, logger_, 
false);
+// in multiple mode, we check for new/removed files in every onTrigger
 
   } else {
+tail_mode_ = Mode::SINGLE;
+
 std::string fileLocation, fileName;
-if (utils::file::PathUtils::getFileNameAndPath(file, fileLocation, 
fileName)) {
-  tail_states_.insert(std::make_pair(fileName, TailState { fileLocation, 
fileName, 0, 0 }));
+if (utils::file::PathUtils::getFileNameAndPath(file_to_tail_, 
fileLocation, fileName)) {
+  tail_states_.emplace(fileName, TailState{fileLocation, fileName, 0, 0, 
0, 0});

Review comment:
   I have added a comment





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-05-22 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r429326546



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -204,52 +230,65 @@ void TailFile::parseStateFileLine(char *buf) {
   }
 
   std::string value = equal;
-  key = trimRight(key);
-  value = trimRight(value);
+  key = utils::StringUtils::trimRight(key);
+  value = utils::StringUtils::trimRight(value);
 
   if (key == "FILENAME") {
 std::string fileLocation, fileName;
 if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, 
fileName)) {
   logger_->log_debug("State migration received path %s, file %s", 
fileLocation, fileName);
-  tail_states_.insert(std::make_pair(fileName, TailState { fileLocation, 
fileName, 0, 0 }));
+  state.insert(std::make_pair(fileName, TailState{fileLocation, fileName, 
0, 0, 0, 0}));
 } else {
-  tail_states_.insert(std::make_pair(value, TailState { fileLocation, 
value, 0, 0 }));
+  state.insert(std::make_pair(value, TailState{fileLocation, value, 0, 0, 
0, 0}));
 }
   }
   if (key == "POSITION") {
 // for backwards compatibility
-if (tail_states_.size() != 1) {
+if (tail_states_.size() != (std::size_t) 1) {
   throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, 
"Incompatible state file types");
 }
 const auto position = std::stoull(value);
 logger_->log_debug("Received position %d", position);
-tail_states_.begin()->second.currentTailFilePosition_ = position;
+state.begin()->second.position_ = position;
   }
   if (key.find(CURRENT_STR) == 0) {
 const auto file = key.substr(strlen(CURRENT_STR));
 std::string fileLocation, fileName;
 if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, 
fileName)) {
-  tail_states_[file].path_ = fileLocation;
-  tail_states_[file].current_file_name_ = fileName;
+  state[file].path_ = fileLocation;
+  state[file].file_name_ = fileName;
 } else {
   throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "State file 
contains an invalid file name");
 }
   }
 
   if (key.find(POSITION_STR) == 0) {
 const auto file = key.substr(strlen(POSITION_STR));
-tail_states_[file].currentTailFilePosition_ = std::stoull(value);
+state[file].position_ = std::stoull(value);
   }
 }
 
+bool TailFile::recoverState(const std::shared_ptr& 
context) {
+  std::map new_tail_states;
+  bool state_load_success = getStateFromStateManager(new_tail_states) ||
+getStateFromLegacyStateFile(new_tail_states);
+  if (!state_load_success) {
+return false;
+  }
 
+  logger_->log_debug("load state succeeded");
 
-bool TailFile::recoverState(const std::shared_ptr& 
context) {
-  bool state_load_success = false;
+  tail_states_ = std::move(new_tail_states);
+
+  // Save the state to the state manager
+  storeState(context);

Review comment:
   I have just made some changes here to fix a bug (in Single file mode, 
the old state overwrote the File to Tail property if the property changed but 
the state wasn't cleared), so it makes more sense now than before. :)
   
   In any case, `storeState()` is not expensive, as it only updates the 
StateManager, which has its own schedule for how often to write changes to the 
DB.  Also, `recoverState()` runs only once during the lifetime of the 
processor.  So I think this is fine.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-05-22 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r429326546



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -204,52 +230,65 @@ void TailFile::parseStateFileLine(char *buf) {
   }
 
   std::string value = equal;
-  key = trimRight(key);
-  value = trimRight(value);
+  key = utils::StringUtils::trimRight(key);
+  value = utils::StringUtils::trimRight(value);
 
   if (key == "FILENAME") {
 std::string fileLocation, fileName;
 if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, 
fileName)) {
   logger_->log_debug("State migration received path %s, file %s", 
fileLocation, fileName);
-  tail_states_.insert(std::make_pair(fileName, TailState { fileLocation, 
fileName, 0, 0 }));
+  state.insert(std::make_pair(fileName, TailState{fileLocation, fileName, 
0, 0, 0, 0}));
 } else {
-  tail_states_.insert(std::make_pair(value, TailState { fileLocation, 
value, 0, 0 }));
+  state.insert(std::make_pair(value, TailState{fileLocation, value, 0, 0, 
0, 0}));
 }
   }
   if (key == "POSITION") {
 // for backwards compatibility
-if (tail_states_.size() != 1) {
+if (tail_states_.size() != (std::size_t) 1) {
   throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, 
"Incompatible state file types");
 }
 const auto position = std::stoull(value);
 logger_->log_debug("Received position %d", position);
-tail_states_.begin()->second.currentTailFilePosition_ = position;
+state.begin()->second.position_ = position;
   }
   if (key.find(CURRENT_STR) == 0) {
 const auto file = key.substr(strlen(CURRENT_STR));
 std::string fileLocation, fileName;
 if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, 
fileName)) {
-  tail_states_[file].path_ = fileLocation;
-  tail_states_[file].current_file_name_ = fileName;
+  state[file].path_ = fileLocation;
+  state[file].file_name_ = fileName;
 } else {
   throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "State file 
contains an invalid file name");
 }
   }
 
   if (key.find(POSITION_STR) == 0) {
 const auto file = key.substr(strlen(POSITION_STR));
-tail_states_[file].currentTailFilePosition_ = std::stoull(value);
+state[file].position_ = std::stoull(value);
   }
 }
 
+bool TailFile::recoverState(const std::shared_ptr& 
context) {
+  std::map new_tail_states;
+  bool state_load_success = getStateFromStateManager(new_tail_states) ||
+getStateFromLegacyStateFile(new_tail_states);
+  if (!state_load_success) {
+return false;
+  }
 
+  logger_->log_debug("load state succeeded");
 
-bool TailFile::recoverState(const std::shared_ptr& 
context) {
-  bool state_load_success = false;
+  tail_states_ = std::move(new_tail_states);
+
+  // Save the state to the state manager
+  storeState(context);

Review comment:
   I have just made some changes here to fix a bug (in Single file mode, 
the old state overwrote the File to Tail property if the property changed but 
the state wasn't cleared), so it makes more sense now than before. :)
   
   In any case, `storeState()` is not expensive, as it only updates the 
StateManager, which has its own schedule for how often to write changes to the 
DB.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-05-22 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r429323720



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -120,56 +158,42 @@ void TailFile::onSchedule(const 
std::shared_ptr ,
   std::string value;
 
   if (context->getProperty(Delimiter.getName(), value)) {
-delimiter_ = value;
+delimiter_ = parseDelimiter(value);
+  }
+
+  if (!context->getProperty(FileName.getName(), file_to_tail_)) {
+throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "File to Tail 
is a required property");
   }
 
   std::string mode;
   context->getProperty(TailMode.getName(), mode);
 
-  std::string file = "";
-  if (!context->getProperty(FileName.getName(), file)) {
-throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "File to Tail 
is a required property");
-  }
   if (mode == "Multiple file") {
-// file is a regex
-std::string base_dir;
-if (!context->getProperty(BaseDirectory.getName(), base_dir)) {
+tail_mode_ = Mode::MULTIPLE;
+
+if (!context->getProperty(BaseDirectory.getName(), base_dir_)) {
   throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Base 
directory is required for multiple tail mode.");

Review comment:
   done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-05-22 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r429323576



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -120,56 +158,42 @@ void TailFile::onSchedule(const 
std::shared_ptr ,
   std::string value;
 
   if (context->getProperty(Delimiter.getName(), value)) {
-delimiter_ = value;
+delimiter_ = parseDelimiter(value);
+  }
+
+  if (!context->getProperty(FileName.getName(), file_to_tail_)) {
+throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "File to Tail 
is a required property");

Review comment:
   done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-05-21 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r428757113



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -120,56 +158,42 @@ void TailFile::onSchedule(const 
std::shared_ptr ,
   std::string value;
 
   if (context->getProperty(Delimiter.getName(), value)) {
-delimiter_ = value;
+delimiter_ = parseDelimiter(value);
+  }
+
+  if (!context->getProperty(FileName.getName(), file_to_tail_)) {
+throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "File to Tail 
is a required property");
   }
 
   std::string mode;
   context->getProperty(TailMode.getName(), mode);
 
-  std::string file = "";
-  if (!context->getProperty(FileName.getName(), file)) {
-throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "File to Tail 
is a required property");
-  }
   if (mode == "Multiple file") {
-// file is a regex
-std::string base_dir;
-if (!context->getProperty(BaseDirectory.getName(), base_dir)) {
+tail_mode_ = Mode::MULTIPLE;
+
+if (!context->getProperty(BaseDirectory.getName(), base_dir_)) {
   throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Base 
directory is required for multiple tail mode.");
 }
 
-auto fileRegexSelect = [&](const std::string& path, const std::string& 
filename) -> bool {
-  if (acceptFile(file, filename)) {
-tail_states_.insert(std::make_pair(filename, TailState {path, 
filename, 0, 0}));
-  }
-  return true;
-};
-
-utils::file::FileUtils::list_dir(base_dir, fileRegexSelect, logger_, 
false);
+// in multiple mode, we check for new/removed files in every onTrigger
 
   } else {
+tail_mode_ = Mode::SINGLE;
+
 std::string fileLocation, fileName;
-if (utils::file::PathUtils::getFileNameAndPath(file, fileLocation, 
fileName)) {
-  tail_states_.insert(std::make_pair(fileName, TailState { fileLocation, 
fileName, 0, 0 }));
+if (utils::file::PathUtils::getFileNameAndPath(file_to_tail_, 
fileLocation, fileName)) {
+  tail_states_.emplace(fileName, TailState{fileLocation, fileName, 0, 0, 
0, 0});

Review comment:
   This will be overwritten in `recoverState()` if there is a persisted 
state, but I agree it looks misleading -- I'll think about how to make it 
clearer.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-05-21 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r428755248



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -120,56 +158,42 @@ void TailFile::onSchedule(const 
std::shared_ptr ,
   std::string value;
 
   if (context->getProperty(Delimiter.getName(), value)) {
-delimiter_ = value;
+delimiter_ = parseDelimiter(value);
+  }
+
+  if (!context->getProperty(FileName.getName(), file_to_tail_)) {
+throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "File to Tail 
is a required property");
   }
 
   std::string mode;
   context->getProperty(TailMode.getName(), mode);
 
-  std::string file = "";
-  if (!context->getProperty(FileName.getName(), file)) {
-throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "File to Tail 
is a required property");
-  }
   if (mode == "Multiple file") {
-// file is a regex
-std::string base_dir;
-if (!context->getProperty(BaseDirectory.getName(), base_dir)) {
+tail_mode_ = Mode::MULTIPLE;
+
+if (!context->getProperty(BaseDirectory.getName(), base_dir_)) {
   throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Base 
directory is required for multiple tail mode.");

Review comment:
   Hm, not sure... Do we want to refuse to start if the directory does not 
exist (yet)? Maybe it will be created later.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-05-21 Thread GitBox


fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r428752891



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -120,56 +158,42 @@ void TailFile::onSchedule(const 
std::shared_ptr ,
   std::string value;
 
   if (context->getProperty(Delimiter.getName(), value)) {
-delimiter_ = value;
+delimiter_ = parseDelimiter(value);
+  }
+
+  if (!context->getProperty(FileName.getName(), file_to_tail_)) {
+throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "File to Tail 
is a required property");

Review comment:
   This is old code that was just moved around, but good point; I'll change it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org