[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-12 Thread GitBox


szaszm commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439450357



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -335,170 +563,210 @@ bool TailFile::storeState(const 
std::shared_ptr& context)
   return true;
 }
 
-static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem 
j) {
-  return (i.modifiedTime < j.modifiedTime);
-}
-void TailFile::checkRollOver(const std::shared_ptr& 
context, TailState , const std::string _file_name) {
-  struct stat statbuf;
-  std::vector matchedFiles;
-  std::string fullPath = file.path_ + utils::file::FileUtils::get_separator() 
+ file.current_file_name_;
-
-  if (stat(fullPath.c_str(), ) == 0) {
-logger_->log_trace("Searching for files rolled over");
-std::string pattern = file.current_file_name_;
-std::size_t found = file.current_file_name_.find_last_of(".");
-if (found != std::string::npos)
-  pattern = file.current_file_name_.substr(0, found);
-
-// Callback, called for each file entry in the listed directory
-// Return value is used to break (false) or continue (true) listing
-auto lambda = [&](const std::string& path, const std::string& filename) -> 
bool {
-  struct stat sb;
-  std::string fileFullName = path + 
utils::file::FileUtils::get_separator() + filename;
-  if ((fileFullName.find(pattern) != std::string::npos) && 
stat(fileFullName.c_str(), ) == 0) {
-uint64_t candidateModTime = ((uint64_t) (sb.st_mtime) * 1000);
-if (candidateModTime >= file.currentTailFileModificationTime_) {
-  logging::LOG_TRACE(logger_) << "File " << filename << " (short name 
" << file.current_file_name_ <<
-  ") disk mod time " << candidateModTime << ", struct mod time " << 
file.currentTailFileModificationTime_ << ", size on disk " << sb.st_size << ", 
position " << file.currentTailFilePosition_;
-  if (filename == file.current_file_name_ && candidateModTime == 
file.currentTailFileModificationTime_ &&
-  sb.st_size == file.currentTailFilePosition_) {
-return true;  // Skip the current file as a candidate in case it 
wasn't updated
+std::vector TailFile::findRotatedFiles(const TailState ) 
const {
+  logger_->log_debug("Searching for files rolled over; last read time is %llu",
+  
std::chrono::time_point_cast(state.last_read_time_).time_since_epoch().count());
+
+  std::size_t last_dot_position = state.file_name_.find_last_of('.');
+  std::string base_name = state.file_name_.substr(0, last_dot_position);
+  std::string pattern = 
utils::StringUtils::replaceOne(rolling_filename_pattern_, "${filename}", 
base_name);
+
+  std::vector matched_files_with_mtime;
+  auto collect_matching_files = [&](const std::string , const std::string 
_name) -> bool {
+if (file_name != state.file_name_ && 
utils::Regex::matchesFullInput(pattern, file_name)) {
+  std::string full_file_name = path + 
utils::file::FileUtils::get_separator() + file_name;
+  TailStateWithMtime::TimePoint 
mtime{utils::file::FileUtils::last_write_time_point(full_file_name)};
+  logger_->log_debug("File %s with mtime %llu matches rolling filename 
pattern %s", file_name, mtime.time_since_epoch().count(), pattern);
+  if (mtime >= 
std::chrono::time_point_cast(state.last_read_time_)) {
+logger_->log_debug("File %s has mtime >= last read time, so we are 
going to read it", file_name);
+matched_files_with_mtime.emplace_back(TailState{path, file_name}, 
mtime);
+  }
+}
+return true;
+  };
+
+  utils::file::FileUtils::list_dir(state.path_, collect_matching_files, 
logger_, false);
+
+  std::sort(matched_files_with_mtime.begin(), matched_files_with_mtime.end(), 
[](const TailStateWithMtime , const TailStateWithMtime ) {
+return std::tie(left.mtime_, left.tail_state_.file_name_) <
+   std::tie(right.mtime_, right.tail_state_.file_name_);
+  });
+
+  if (!matched_files_with_mtime.empty() && state.position_ > 0) {
+TailState _rotated_file = matched_files_with_mtime[0].tail_state_;
+std::string full_file_name = first_rotated_file.fileNameWithPath();
+if (utils::file::FileUtils::file_size(full_file_name) >= state.position_) {
+  uint64_t checksum = 
utils::file::FileUtils::computeChecksum(full_file_name, state.position_);
+  if (checksum == state.checksum_) {
+first_rotated_file.position_ = state.position_;
+first_rotated_file.checksum_ = state.checksum_;
   }
-  TailMatchedFileItem item;
-  item.fileName = filename;
-  item.modifiedTime = ((uint64_t) (sb.st_mtime) * 1000);
-  matchedFiles.push_back(item);
 }
   }
-  return true;};
-
-utils::file::FileUtils::list_dir(file.path_, lambda, logger_, false);
 
-if (matchedFiles.size() < 1) {
-  logger_->log_debug("No newer files found in directory!");
-  return;
-}
+  std::vector matched_files;
+  

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-12 Thread GitBox


szaszm commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439449272



##
File path: libminifi/test/unit/StringUtilsTests.cpp
##
@@ -338,3 +338,30 @@ TEST_CASE("TestStringUtils::testJoinPackNegative", "[test 
join_pack negative]")
   == "rvalue c string, c string, rval std::string, std::string, 
char array");
 }
  */
+
+TEST_CASE("StringUtils::replaceOne works correctly", "[replaceOne]") {
+  REQUIRE(utils::StringUtils::replaceOne("", "x", "y") == "");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "a", "_") == "b_nana");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "b", "_") == "_anana");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "x", "y") == "banana");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "an", "") == "bana");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "an", "AN") == "bANana");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "an", "***") == "b***ana");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "banana", "kiwi") == 
"kiwi");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "banana", "grapefruit") == 
"grapefruit");
+}
+
+TEST_CASE("StringUtils::replaceAll works correctly", "[replaceAll]") {
+  auto replaceAll = [](std::string input, const std::string , const 
std::string ) -> std::string {
+return utils::StringUtils::replaceAll(input, from, to);
+  };

Review comment:
   I didn't realize that `StringUtils::replaceAll` modifies the source 
string. My bad.
   I'm fine with it this way.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-12 Thread GitBox


szaszm commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439449272



##
File path: libminifi/test/unit/StringUtilsTests.cpp
##
@@ -338,3 +338,30 @@ TEST_CASE("TestStringUtils::testJoinPackNegative", "[test 
join_pack negative]")
   == "rvalue c string, c string, rval std::string, std::string, 
char array");
 }
  */
+
+TEST_CASE("StringUtils::replaceOne works correctly", "[replaceOne]") {
+  REQUIRE(utils::StringUtils::replaceOne("", "x", "y") == "");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "a", "_") == "b_nana");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "b", "_") == "_anana");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "x", "y") == "banana");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "an", "") == "bana");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "an", "AN") == "bANana");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "an", "***") == "b***ana");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "banana", "kiwi") == 
"kiwi");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "banana", "grapefruit") == 
"grapefruit");
+}
+
+TEST_CASE("StringUtils::replaceAll works correctly", "[replaceAll]") {
+  auto replaceAll = [](std::string input, const std::string , const 
std::string ) -> std::string {
+return utils::StringUtils::replaceAll(input, from, to);
+  };

Review comment:
   I didn't realize that `StringUtils::replaceAll` modifies the source 
string. My bad.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-11 Thread GitBox


szaszm commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439020454



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -40,60 +35,283 @@
 #include 
 #include 
 
+#include "io/CRCStream.h"
 #include "utils/file/FileUtils.h"
 #include "utils/file/PathUtils.h"
 #include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "utils/RegexUtils.h"
-#ifdef HAVE_REGEX_CPP
-#include 
-#else
-#include 
-#endif
 #include "TailFile.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 
-#ifndef S_ISDIR
-#define S_ISDIR(mode)  (((mode) & S_IFMT) == S_IFDIR)
-#endif
-
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Wsign-compare"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wsign-compare"
-#endif
-
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of 
the file that should be tailed when using single file mode, or a file regex 
when using multifile mode", "");
-core::Property TailFile::StateFile("State File", "DEPRECATED. Only use it for 
state migration from the legacy state file.",
-   "TailFileState");
-core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character 
that should be used for delimiting the data being tailed"
-   "from the incoming file."
-   "If none is specified, data will be 
ingested as it becomes available.",
-   "");
+core::Property TailFile::FileName(
+core::PropertyBuilder::createProperty("File to Tail")
+->withDescription("Fully-qualified filename of the file that should be 
tailed when using single file mode, or a file regex when using multifile mode")
+->isRequired(true)
+->build());
+
+core::Property TailFile::StateFile(
+core::PropertyBuilder::createProperty("State File")
+->withDescription("DEPRECATED. Only use it for state migration from 
the legacy state file.")
+->isRequired(false)
+->withDefaultValue("TailFileState")
+->build());
+
+core::Property TailFile::Delimiter(
+core::PropertyBuilder::createProperty("Input Delimiter")
+->withDescription("Specifies the character that should be used for 
delimiting the data being tailed"
+ "from the incoming file. If none is specified, data will be ingested 
as it becomes available.")
+->isRequired(false)
+->withDefaultValue("\\n")
+->build());
 
 core::Property TailFile::TailMode(
-core::PropertyBuilder::createProperty("tail-mode", "Tailing 
Mode")->withDescription(
-"Specifies the tail file mode. In 'Single file' mode only a single 
file will be watched. "
+core::PropertyBuilder::createProperty("tail-mode", "Tailing Mode")
+->withDescription("Specifies the tail file mode. In 'Single file' mode 
only a single file will be watched. "
 "In 'Multiple file' mode a regex may be used. Note that in multiple 
file mode we will still continue to watch for rollover on the initial set of 
watched files. "
 "The Regex used to locate multiple files will be run during the 
schedule phrase. Note that if rotated files are matched by the regex, those 
files will be tailed.")->isRequired(true)
-->withAllowableValue("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single 
file")->build());
-
-core::Property 
TailFile::BaseDirectory(core::PropertyBuilder::createProperty("tail-base-directory",
 "Base Directory")->isRequired(false)->build());
+->withAllowableValue("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single file")
+->build());
+
+core::Property TailFile::BaseDirectory(
+core::PropertyBuilder::createProperty("tail-base-directory", "Base 
Directory")
+->withDescription("Base directory used to look for files to tail. This 
property is required when using Multiple file mode.")
+->isRequired(false)
+->build());
+
+core::Property TailFile::RecursiveLookup(
+core::PropertyBuilder::createProperty("Recursive lookup")
+->withDescription("When using Multiple file mode, this property 
determines whether files are tailed in "
+"child directories of the Base Directory or not.")
+->isRequired(false)
+->withDefaultValue(false)
+->build());
+
+core::Property TailFile::LookupFrequency(
+core::PropertyBuilder::createProperty("Lookup frequency")
+->withDescription("When using Multiple file mode, this property 
specifies the minimum duration "
+"the processor will wait between looking for new files to tail in the 
Base Directory.")
+->isRequired(false)
+

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-11 Thread GitBox


szaszm commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r438997857



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -204,52 +418,78 @@ void TailFile::parseStateFileLine(char *buf) {
   }
 
   std::string value = equal;
-  key = trimRight(key);
-  value = trimRight(value);
+  key = utils::StringUtils::trimRight(key);
+  value = utils::StringUtils::trimRight(value);
 
   if (key == "FILENAME") {
 std::string fileLocation, fileName;
 if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, 
fileName)) {
   logger_->log_debug("State migration received path %s, file %s", 
fileLocation, fileName);
-  tail_states_.insert(std::make_pair(fileName, TailState { fileLocation, 
fileName, 0, 0 }));
+  state.emplace(fileName, TailState{fileLocation, fileName});
 } else {
-  tail_states_.insert(std::make_pair(value, TailState { fileLocation, 
value, 0, 0 }));
+  state.emplace(value, TailState{fileLocation, value});
 }
   }
   if (key == "POSITION") {
 // for backwards compatibility
-if (tail_states_.size() != 1) {
+if (tail_states_.size() != std::size_t{1}) {
   throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, 
"Incompatible state file types");
 }
 const auto position = std::stoull(value);
 logger_->log_debug("Received position %d", position);

Review comment:
   Could you fix this logging line to use `llu` as the format specifier, 
since `position` is `unsigned long long` here?

##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -260,72 +500,60 @@ bool TailFile::recoverState(const 
std::shared_ptr& context
   try {
 const std::string& current = state_map.at("file." + std::to_string(i) 
+ ".current");
 uint64_t position = std::stoull(state_map.at("file." + 
std::to_string(i) + ".position"));
+uint64_t checksum = readOptionalUint64(state_map, "file." + 
std::to_string(i) + ".checksum");
 
 std::string fileLocation, fileName;
 if (utils::file::PathUtils::getFileNameAndPath(current, fileLocation, 
fileName)) {
   logger_->log_debug("Received path %s, file %s", fileLocation, 
fileName);
-  new_tail_states.emplace(fileName, TailState { fileLocation, 
fileName, position, 0 });
+  new_tail_states.emplace(current, TailState{fileLocation, fileName, 
position, std::chrono::system_clock::time_point{}, checksum});
 } else {
-  new_tail_states.emplace(current, TailState { fileLocation, current, 
position, 0 });
+  new_tail_states.emplace(current, TailState{fileLocation, current, 
position, std::chrono::system_clock::time_point{}, checksum});
 }
   } catch (...) {
 continue;
   }
 }
-state_load_success = true;
-tail_states_ = std::move(new_tail_states);
 for (const auto& s : tail_states_) {
-  logger_->log_debug("TailState %s: %s, %s, %llu, %llu", s.first, 
s.second.path_, s.second.current_file_name_, s.second.currentTailFilePosition_, 
s.second.currentTailFileModificationTime_);
+  logger_->log_debug("TailState %s: %s, %s, %llu, %llu",

Review comment:
   Could you fix the format string to use `PRIu64` as the format specifier 
for `uint64_t`? It's not guaranteed to be the same as `unsigned long long`; 
it's actually `unsigned long` on my machine. `PRIu64` is declared in 
`<cinttypes>`.

##
File path: extensions/standard-processors/tests/unit/TailFileTests.cpp
##
@@ -652,46 +872,606 @@ TEST_CASE("TailFileWithMultileRolledOverFiles", 
"[tailfiletest2]") {
   char format[] = "/tmp/gt.XX";
   auto dir = testController.createTempDirectory(format);
 
-  // Define test input file
-  std::string in_file(dir);
-  in_file.append("fruits.txt");
+  std::string test_file = dir + utils::file::FileUtils::get_separator() + 
"fruits.log";
 
-  for (int i = 2; 0 <= i; --i) {
-if (i < 2) {
-  std::this_thread::sleep_for(std::chrono::milliseconds(1000));  // make 
sure the new file gets newer modification time
-}
-std::ofstream in_file_stream(in_file + (i > 0 ? std::to_string(i) : ""));
-for (int j = 0; j <= i; j++) {
-  in_file_stream << "Apple" << DELIM;
-}
-in_file_stream.close();
-  }
+  std::ofstream test_file_stream_0(test_file, std::ios::binary);
+  test_file_stream_0 << "Apple" << DELIM << "Orange" << DELIM;
+  test_file_stream_0.flush();
 
   // Build MiNiFi processing graph
   auto tail_file = plan->addProcessor("TailFile", "Tail");
   plan->setProperty(tail_file, processors::TailFile::Delimiter.getName(), 
std::string(1, DELIM));
-  plan->setProperty(tail_file, processors::TailFile::FileName.getName(), 
in_file);
+  plan->setProperty(tail_file, processors::TailFile::FileName.getName(), 
test_file);
   auto log_attr = plan->addProcessor("LogAttribute", "Log", 
core::Relationship("success", "description"), true);
   

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-10 Thread GitBox


szaszm commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r438223344



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -403,102 +435,136 @@ void TailFile::onTrigger(const 
std::shared_ptr , c
 state_file_ = st_file + "." + getUUIDStr();
   }
   if (!this->state_recovered_) {
-state_recovered_ = true;
-// recover the state if we have not done so
 this->recoverState(context);
+state_recovered_ = true;
+  }
+
+  if (tail_mode_ == Mode::MULTIPLE) {
+checkForRemovedFiles();
+checkForNewFiles();
   }
 
-  /**
-   * iterate over file states. may modify them
-   */
+  // iterate over file states. may modify them
   for (auto  : tail_states_) {
-auto fileLocation = state.second.path_;
-
-checkRollOver(context, state.second, state.first);
-std::string fullPath = fileLocation + 
utils::file::FileUtils::get_separator() + state.second.current_file_name_;
-struct stat statbuf;
-
-logger_->log_debug("Tailing file %s from %llu", fullPath, 
state.second.currentTailFilePosition_);
-if (stat(fullPath.c_str(), ) == 0) {
-  if ((uint64_t) statbuf.st_size <= state.second.currentTailFilePosition_) 
{
-logger_->log_trace("Current pos: %llu", 
state.second.currentTailFilePosition_);
-logger_->log_trace("%s", "there are no new input for the current tail 
file");
-context->yield();
-return;
-  }
-  std::size_t found = state.first.find_last_of(".");
-  std::string baseName = state.first.substr(0, found);
-  std::string extension = state.first.substr(found + 1);
-
-  if (!delimiter_.empty()) {
-char delim = delimiter_.c_str()[0];
-if (delim == '\\') {
-  if (delimiter_.size() > 1) {
-switch (delimiter_.c_str()[1]) {
-  case 'r':
-delim = '\r';
-break;
-  case 't':
-delim = '\t';
-break;
-  case 'n':
-delim = '\n';
-break;
-  case '\\':
-delim = '\\';
-break;
-  default:
-// previous behavior
-break;
-}
-  }
-}
-logger_->log_debug("Looking for delimiter 0x%X", delim);
-std::vector> flowFiles;
-session->import(fullPath, flowFiles, 
state.second.currentTailFilePosition_, delim);
-logger_->log_info("%u flowfiles were received from TailFile input", 
flowFiles.size());
-
-for (auto ffr : flowFiles) {
-  logger_->log_info("TailFile %s for %u bytes", state.first, 
ffr->getSize());
-  std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + ffr->getSize()) + "." + 
extension;
-  ffr->updateKeyedAttribute(PATH, fileLocation);
-  ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-  ffr->updateKeyedAttribute(FILENAME, logName);
-  session->transfer(ffr, Success);
-  state.second.currentTailFilePosition_ += ffr->getSize() + 1;
-  storeState(context);
-}
+processFile(context, session, state.first, state.second);
+  }
 
-  } else {
-std::shared_ptr flowFile = 
std::static_pointer_cast(session->create());
-if (flowFile) {
-  flowFile->updateKeyedAttribute(PATH, fileLocation);
-  flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-  session->import(fullPath, flowFile, true, 
state.second.currentTailFilePosition_);
-  session->transfer(flowFile, Success);
-  logger_->log_info("TailFile %s for %llu bytes", state.first, 
flowFile->getSize());
-  std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + flowFile->getSize()) + 
"."
-  + extension;
-  flowFile->updateKeyedAttribute(FILENAME, logName);
-  state.second.currentTailFilePosition_ += flowFile->getSize();
-  storeState(context);
-}
-  }
-  state.second.currentTailFileModificationTime_ = ((uint64_t) 
(statbuf.st_mtime) * 1000);
-} else {
-  logger_->log_warn("Unable to stat file %s", fullPath);
+  if (!session->existsFlowFileInRelationship(Success)) {
+yield();
+  }
+}
+
+void TailFile::processFile(const std::shared_ptr 
,
+   const std::shared_ptr 
,
+   const std::string ,
+   TailState ) {
+  if (utils::file::FileUtils::file_size(state.fileNameWithPath()) < 
state.position_) {
+processRotatedFiles(context, session, state);
+  }
+
+  processSingleFile(context, session, fileName, state);
+}
+
+void TailFile::processRotatedFiles(const std::shared_ptr 
,
+   

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-09 Thread GitBox


szaszm commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r436792573



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -403,102 +435,136 @@ void TailFile::onTrigger(const 
std::shared_ptr , c
 state_file_ = st_file + "." + getUUIDStr();
   }
   if (!this->state_recovered_) {
-state_recovered_ = true;
-// recover the state if we have not done so
 this->recoverState(context);
+state_recovered_ = true;
+  }
+
+  if (tail_mode_ == Mode::MULTIPLE) {
+checkForRemovedFiles();
+checkForNewFiles();
   }
 
-  /**
-   * iterate over file states. may modify them
-   */
+  // iterate over file states. may modify them
   for (auto  : tail_states_) {
-auto fileLocation = state.second.path_;
-
-checkRollOver(context, state.second, state.first);
-std::string fullPath = fileLocation + 
utils::file::FileUtils::get_separator() + state.second.current_file_name_;
-struct stat statbuf;
-
-logger_->log_debug("Tailing file %s from %llu", fullPath, 
state.second.currentTailFilePosition_);
-if (stat(fullPath.c_str(), ) == 0) {
-  if ((uint64_t) statbuf.st_size <= state.second.currentTailFilePosition_) 
{
-logger_->log_trace("Current pos: %llu", 
state.second.currentTailFilePosition_);
-logger_->log_trace("%s", "there are no new input for the current tail 
file");
-context->yield();
-return;
-  }
-  std::size_t found = state.first.find_last_of(".");
-  std::string baseName = state.first.substr(0, found);
-  std::string extension = state.first.substr(found + 1);
-
-  if (!delimiter_.empty()) {
-char delim = delimiter_.c_str()[0];
-if (delim == '\\') {
-  if (delimiter_.size() > 1) {
-switch (delimiter_.c_str()[1]) {
-  case 'r':
-delim = '\r';
-break;
-  case 't':
-delim = '\t';
-break;
-  case 'n':
-delim = '\n';
-break;
-  case '\\':
-delim = '\\';
-break;
-  default:
-// previous behavior
-break;
-}
-  }
-}
-logger_->log_debug("Looking for delimiter 0x%X", delim);
-std::vector> flowFiles;
-session->import(fullPath, flowFiles, 
state.second.currentTailFilePosition_, delim);
-logger_->log_info("%u flowfiles were received from TailFile input", 
flowFiles.size());
-
-for (auto ffr : flowFiles) {
-  logger_->log_info("TailFile %s for %u bytes", state.first, 
ffr->getSize());
-  std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + ffr->getSize()) + "." + 
extension;
-  ffr->updateKeyedAttribute(PATH, fileLocation);
-  ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-  ffr->updateKeyedAttribute(FILENAME, logName);
-  session->transfer(ffr, Success);
-  state.second.currentTailFilePosition_ += ffr->getSize() + 1;
-  storeState(context);
-}
+processFile(context, session, state.first, state.second);
+  }
 
-  } else {
-std::shared_ptr flowFile = 
std::static_pointer_cast(session->create());
-if (flowFile) {
-  flowFile->updateKeyedAttribute(PATH, fileLocation);
-  flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-  session->import(fullPath, flowFile, true, 
state.second.currentTailFilePosition_);
-  session->transfer(flowFile, Success);
-  logger_->log_info("TailFile %s for %llu bytes", state.first, 
flowFile->getSize());
-  std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + flowFile->getSize()) + 
"."
-  + extension;
-  flowFile->updateKeyedAttribute(FILENAME, logName);
-  state.second.currentTailFilePosition_ += flowFile->getSize();
-  storeState(context);
-}
-  }
-  state.second.currentTailFileModificationTime_ = ((uint64_t) 
(statbuf.st_mtime) * 1000);
-} else {
-  logger_->log_warn("Unable to stat file %s", fullPath);
+  if (!session->existsFlowFileInRelationship(Success)) {
+yield();
+  }
+}
+
+void TailFile::processFile(const std::shared_ptr 
,
+   const std::shared_ptr 
,
+   const std::string ,
+   TailState ) {
+  if (utils::file::FileUtils::file_size(state.fileNameWithPath()) < 
state.position_) {
+processRotatedFiles(context, session, state);
+  }
+
+  processSingleFile(context, session, fileName, state);
+}
+
+void TailFile::processRotatedFiles(const std::shared_ptr 
,
+   

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-08 Thread GitBox


szaszm commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r436606464



##
File path: libminifi/src/core/ProcessSession.cpp
##
@@ -469,6 +470,14 @@ void ProcessSession::import(std::string source, const 
std::shared_ptr> crcStream;
+  if (crc) {
+crcStream = utils::make_unique>(stream, 
*crc);
+stream = crcStream.get();
+  }

Review comment:
   I thought a bit about this. There is a function similar to 
`ProcessSession::import` called `ProcessSession::importFrom` which takes an 
input stream instead of a file name. It doesn't support FlowFile delimiters, 
though.
   
   My idea is to extend `ProcessSession::importFrom` to support delimiters and 
use that with a `CRCStream` over a `FileStream`. If I understand correctly, we 
include every byte that we processed in the checksum, so the stream composition 
wouldn't change behavior, but correct me if I'm wrong.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-08 Thread GitBox


szaszm commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r436597137



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -403,102 +435,136 @@ void TailFile::onTrigger(const 
std::shared_ptr , c
 state_file_ = st_file + "." + getUUIDStr();
   }
   if (!this->state_recovered_) {
-state_recovered_ = true;
-// recover the state if we have not done so
 this->recoverState(context);
+state_recovered_ = true;
+  }
+
+  if (tail_mode_ == Mode::MULTIPLE) {
+checkForRemovedFiles();
+checkForNewFiles();
   }
 
-  /**
-   * iterate over file states. may modify them
-   */
+  // iterate over file states. may modify them
   for (auto  : tail_states_) {
-auto fileLocation = state.second.path_;
-
-checkRollOver(context, state.second, state.first);
-std::string fullPath = fileLocation + 
utils::file::FileUtils::get_separator() + state.second.current_file_name_;
-struct stat statbuf;
-
-logger_->log_debug("Tailing file %s from %llu", fullPath, 
state.second.currentTailFilePosition_);
-if (stat(fullPath.c_str(), ) == 0) {
-  if ((uint64_t) statbuf.st_size <= state.second.currentTailFilePosition_) 
{
-logger_->log_trace("Current pos: %llu", 
state.second.currentTailFilePosition_);
-logger_->log_trace("%s", "there are no new input for the current tail 
file");
-context->yield();
-return;
-  }
-  std::size_t found = state.first.find_last_of(".");
-  std::string baseName = state.first.substr(0, found);
-  std::string extension = state.first.substr(found + 1);
-
-  if (!delimiter_.empty()) {
-char delim = delimiter_.c_str()[0];
-if (delim == '\\') {
-  if (delimiter_.size() > 1) {
-switch (delimiter_.c_str()[1]) {
-  case 'r':
-delim = '\r';
-break;
-  case 't':
-delim = '\t';
-break;
-  case 'n':
-delim = '\n';
-break;
-  case '\\':
-delim = '\\';
-break;
-  default:
-// previous behavior
-break;
-}
-  }
-}
-logger_->log_debug("Looking for delimiter 0x%X", delim);
-std::vector> flowFiles;
-session->import(fullPath, flowFiles, 
state.second.currentTailFilePosition_, delim);
-logger_->log_info("%u flowfiles were received from TailFile input", 
flowFiles.size());
-
-for (auto ffr : flowFiles) {
-  logger_->log_info("TailFile %s for %u bytes", state.first, 
ffr->getSize());
-  std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + ffr->getSize()) + "." + 
extension;
-  ffr->updateKeyedAttribute(PATH, fileLocation);
-  ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-  ffr->updateKeyedAttribute(FILENAME, logName);
-  session->transfer(ffr, Success);
-  state.second.currentTailFilePosition_ += ffr->getSize() + 1;
-  storeState(context);
-}
+processFile(context, session, state.first, state.second);
+  }
 
-  } else {
-std::shared_ptr flowFile = 
std::static_pointer_cast(session->create());
-if (flowFile) {
-  flowFile->updateKeyedAttribute(PATH, fileLocation);
-  flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-  session->import(fullPath, flowFile, true, 
state.second.currentTailFilePosition_);
-  session->transfer(flowFile, Success);
-  logger_->log_info("TailFile %s for %llu bytes", state.first, 
flowFile->getSize());
-  std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + flowFile->getSize()) + 
"."
-  + extension;
-  flowFile->updateKeyedAttribute(FILENAME, logName);
-  state.second.currentTailFilePosition_ += flowFile->getSize();
-  storeState(context);
-}
-  }
-  state.second.currentTailFileModificationTime_ = ((uint64_t) 
(statbuf.st_mtime) * 1000);
-} else {
-  logger_->log_warn("Unable to stat file %s", fullPath);
+  if (!session->existsFlowFileInRelationship(Success)) {
+yield();
+  }
+}
+
+void TailFile::processFile(const std::shared_ptr 
,
+   const std::shared_ptr 
,
+   const std::string ,
+   TailState ) {
+  if (utils::file::FileUtils::file_size(state.fileNameWithPath()) < 
state.position_) {
+processRotatedFiles(context, session, state);
+  }
+
+  processSingleFile(context, session, fileName, state);
+}
+
+void TailFile::processRotatedFiles(const std::shared_ptr 
,
+   

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-03 Thread GitBox


szaszm commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r433821309



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -204,52 +230,65 @@ void TailFile::parseStateFileLine(char *buf) {
   }
 
   std::string value = equal;
-  key = trimRight(key);
-  value = trimRight(value);
+  key = utils::StringUtils::trimRight(key);
+  value = utils::StringUtils::trimRight(value);
 
   if (key == "FILENAME") {
 std::string fileLocation, fileName;
 if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, 
fileName)) {
   logger_->log_debug("State migration received path %s, file %s", 
fileLocation, fileName);
-  tail_states_.insert(std::make_pair(fileName, TailState { fileLocation, 
fileName, 0, 0 }));
+  state.insert(std::make_pair(fileName, TailState{fileLocation, fileName, 
0, 0, 0, 0}));
 } else {
-  tail_states_.insert(std::make_pair(value, TailState { fileLocation, 
value, 0, 0 }));
+  state.insert(std::make_pair(value, TailState{fileLocation, value, 0, 0, 
0, 0}));
 }
   }
   if (key == "POSITION") {
 // for backwards compatibility
-if (tail_states_.size() != 1) {
+if (tail_states_.size() != (std::size_t) 1) {
   throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, 
"Incompatible state file types");
 }
 const auto position = std::stoull(value);
 logger_->log_debug("Received position %d", position);
-tail_states_.begin()->second.currentTailFilePosition_ = position;
+state.begin()->second.position_ = position;
   }
   if (key.find(CURRENT_STR) == 0) {
 const auto file = key.substr(strlen(CURRENT_STR));
 std::string fileLocation, fileName;
 if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, 
fileName)) {
-  tail_states_[file].path_ = fileLocation;
-  tail_states_[file].current_file_name_ = fileName;
+  state[file].path_ = fileLocation;
+  state[file].file_name_ = fileName;
 } else {
   throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "State file 
contains an invalid file name");
 }
   }
 
   if (key.find(POSITION_STR) == 0) {
 const auto file = key.substr(strlen(POSITION_STR));
-tail_states_[file].currentTailFilePosition_ = std::stoull(value);
+state[file].position_ = std::stoull(value);
   }
 }
 
+bool TailFile::recoverState(const std::shared_ptr& 
context) {
+  std::map new_tail_states;
+  bool state_load_success = getStateFromStateManager(new_tail_states) ||
+getStateFromLegacyStateFile(new_tail_states);
+  if (!state_load_success) {
+return false;
+  }
 
+  logger_->log_debug("load state succeeded");
 
-bool TailFile::recoverState(const std::shared_ptr& 
context) {
-  bool state_load_success = false;
+  tail_states_ = std::move(new_tail_states);
+
+  // Save the state to the state manager
+  storeState(context);

Review comment:
   Is this `storeState()` call required to fix the bug? It is not always 
cheap: the state storage can be configured to be durable (e.g. with RocksDB, 
flushing the WAL on every commit), which makes each call a blocking I/O operation.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #791: MINIFICPP-1177 Improvements to the TailFile processor

2020-06-03 Thread GitBox


szaszm commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r432671663



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -58,42 +47,108 @@
 #define S_ISDIR(mode)  (((mode) & S_IFMT) == S_IFDIR)
 #endif
 
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Wsign-compare"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wsign-compare"
-#endif
-
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of 
the file that should be tailed when using single file mode, or a file regex 
when using multifile mode", "");
-core::Property TailFile::StateFile("State File", "DEPRECATED. Only use it for 
state migration from the legacy state file.",
-   "TailFileState");
-core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character 
that should be used for delimiting the data being tailed"
-   "from the incoming file."
-   "If none is specified, data will be 
ingested as it becomes available.",
-   "");
+core::Property TailFile::FileName(
+core::PropertyBuilder::createProperty("File to Tail")
+->withDescription("Fully-qualified filename of the file that should be 
tailed when using single file mode, or a file regex when using multifile mode")
+->isRequired(true)
+->build());
+
+core::Property TailFile::StateFile(
+core::PropertyBuilder::createProperty("State File")
+->withDescription("DEPRECATED. Only use it for state migration from 
the legacy state file.")
+->isRequired(false)
+->withDefaultValue("TailFileState")
+->build());
+
+core::Property TailFile::Delimiter(
+core::PropertyBuilder::createProperty("Input Delimiter")
+->withDescription("Specifies the character that should be used for 
delimiting the data being tailed"
+ "from the incoming file. If none is specified, data will be ingested 
as it becomes available.")
+->isRequired(false)
+->withDefaultValue("\\n")
+->build());
 
 core::Property TailFile::TailMode(
-core::PropertyBuilder::createProperty("tail-mode", "Tailing 
Mode")->withDescription(
-"Specifies the tail file mode. In 'Single file' mode only a single 
file will be watched. "
+core::PropertyBuilder::createProperty("tail-mode", "Tailing Mode")
+->withDescription("Specifies the tail file mode. In 'Single file' mode 
only a single file will be watched. "
 "In 'Multiple file' mode a regex may be used. Note that in multiple 
file mode we will still continue to watch for rollover on the initial set of 
watched files. "
 "The Regex used to locate multiple files will be run during the 
schedule phrase. Note that if rotated files are matched by the regex, those 
files will be tailed.")->isRequired(true)
-->withAllowableValue("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single 
file")->build());
-
-core::Property 
TailFile::BaseDirectory(core::PropertyBuilder::createProperty("tail-base-directory",
 "Base Directory")->isRequired(false)->build());
+->withAllowableValue("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single file")
+->build());
+
+core::Property TailFile::BaseDirectory(
+core::PropertyBuilder::createProperty("tail-base-directory", "Base 
Directory")
+->withDescription("Base directory used to look for files to tail. This 
property is required when using Multiple file mode.")
+->isRequired(false)
+->build());
+
+core::Property TailFile::RollingFilenamePattern(
+core::PropertyBuilder::createProperty("Rolling Filename Pattern")
+->withDescription("If the file to tail \"rolls over\" as would be the 
case with log files, this filename pattern will be used to "
+"identify files that have rolled over so MiNiFi can read the remaining 
of the rolled-over file and then continue with the new log file. "
+"This pattern supports the wildcard characters * and ?, it also 
supports the notation ${filename} to specify a pattern based on the name of the 
file "
+"(without extension), and will assume that the files that have rolled 
over live in the same directory as the file being tailed.")
+->isRequired(false)
+->withDefaultValue("${filename}.*")
+->build());
 
 core::Relationship TailFile::Success("success", "All files are routed to 
success");
 
 const char *TailFile::CURRENT_STR = "CURRENT.";
 const char *TailFile::POSITION_STR = "POSITION.";
 
+namespace {
+  template
+  bool containsKey(const Container , const Key ) {
+return