[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

2020-06-29 Thread GitBox


arpadboda commented on a change in pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447271999



##
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map<std::string, std::shared_ptr<minifi::Connection>>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+    explicit TestFlowFileRepository(const std::string& name)
+        : core::SerializableComponent(name),
+          FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                             MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+    void flush() override {
+      FlowFileRepository::flush();
+      if (onFlush_) {
+        onFlush_();
+      }
+    }
+    std::function<void()> onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "Connection");
+  ConnectionMap connectionMap{{connection->getUUIDStr(), connection}};
+  // initialize repository
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+
+    std::atomic<int> flush_counter{0};
+
+    std::atomic<bool> stop{false};
+    std::thread shutdown{[&] {
+      while (!stop.load()) {}

Review comment:
   Maybe it's a bit too late, but this looks like a busy wait to me.
   It naturally works, but I would avoid that on our already overloaded CI
infra.

   I don't expect a CV (condition variable) here; sleeping 1ms in the body of
the loop is good enough :)
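
A minimal sketch of the sleeping variant suggested in this comment. The helper name `pollUntil` is hypothetical (it is not part of the PR or of MiNiFi); it only illustrates re-checking a condition with a short sleep between checks instead of spinning on it:

```cpp
#include <chrono>
#include <thread>

// Hypothetical helper: calls `done` repeatedly until it returns true,
// sleeping for `interval` after every failed check so the waiting thread
// does not occupy a core. Returns how many times it slept.
template <typename Predicate>
int pollUntil(Predicate&& done,
              std::chrono::milliseconds interval = std::chrono::milliseconds{1}) {
  int sleeps = 0;
  while (!done()) {
    std::this_thread::sleep_for(interval);
    ++sleeps;
  }
  return sleeps;
}
```

In the test under review, the loop body would then roughly become `pollUntil([&] { return stop.load(); });` followed by `ff_repository->stop();`. The 1ms interval is the value proposed in the comment; the trade-off is up to one interval of extra latency before the shutdown thread notices the flag.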





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

2020-06-29 Thread GitBox


arpadboda commented on a change in pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447269648



##
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map<std::string, std::shared_ptr<minifi::Connection>>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+    explicit TestFlowFileRepository(const std::string& name)
+        : core::SerializableComponent(name),
+          FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                             MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+    void flush() override {
+      FlowFileRepository::flush();
+      if (onFlush_) {
+        onFlush_();
+      }
+    }
+    std::function<void()> onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "Connection");
+  ConnectionMap connectionMap{{connection->getUUIDStr(), connection}};
+  // initialize repository
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+
+    std::atomic<int> flush_counter{0};
+
+    std::atomic<bool> stop{false};
+    std::thread shutdown{[&] {
+      while (!stop.load()) {}
+      ff_repository->stop();
+    }};
+
+    ff_repository->onFlush_ = [&] {
+      if (++flush_counter != 1) {
+        return;
+      }
+
+      for (int keyIdx = 0; keyIdx < 100; ++keyIdx) {
+        auto file = std::make_shared<minifi::FlowFileRecord>(ff_repository, nullptr);
+        file->setUuidConnection(connection->getUUIDStr());
+        // Serialize is sync
+        file->Serialize();
+        if (keyIdx % 2 == 0) {
+          // delete every second flowFile
+          ff_repository->Delete(file->getUUIDStr());
+        }
+      }
+      stop = true;
+      // wait for the shutdown thread to start waiting for the worker thread
+      std::this_thread::sleep_for(std::chrono::milliseconds{100});
+    };
+
+    ff_repository->setConnectionMap(connectionMap);
+    ff_repository->initialize(config);

Review comment:
   More like ```REQUIRE(ff_repository->initialize(config))``` imho :)
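
A rough illustration of why the return value is worth asserting. Everything here is a stand-in: `FakeRepository`, `require`, and `setUpRepository` are hypothetical names, and `require` only mimics the abort-on-failure behaviour of Catch's `REQUIRE` so the sketch compiles without the test framework:

```cpp
#include <stdexcept>
#include <string>

// Hypothetical repository whose initialize() reports failure via its return value.
struct FakeRepository {
  bool initialized = false;
  bool initialize(const std::string& directory) {
    initialized = !directory.empty();
    return initialized;
  }
};

// Stand-in for REQUIRE: stop the test immediately when the condition is false.
inline void require(bool condition, const char* expression) {
  if (!condition) {
    throw std::runtime_error(std::string("REQUIRE failed: ") + expression);
  }
}

// Setup step that refuses to continue with a repository that failed to initialize.
inline bool setUpRepository(FakeRepository& repo, const std::string& directory) {
  require(repo.initialize(directory), "repo.initialize(directory)");
  return true;
}
```

Without the check, a failed initialization would only surface later as a confusing failure in an unrelated assertion; with it, the test stops at the line that actually went wrong.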









[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

2020-06-29 Thread GitBox


arpadboda commented on a change in pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447269150



##
File path: libminifi/src/FlowFileRecord.cpp
##
@@ -366,7 +366,7 @@ bool FlowFileRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) {
     return false;
   }
 
-  if (nullptr == claim_) {

Review comment:
   Yep, the goal would be to *always* have a claim, even if the content is
empty.
   The reason behind this is that we then don't have to deal with failing
streams and claims when handling flowfiles. A good example of this is PutFile
or PublishKafka, where handling both types of "empty" flowfiles is a pain.
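
To make the two kinds of "empty" concrete, here is a small sketch. `Claim`, `FlowFile`, and the helper functions are hypothetical stand-ins, not the MiNiFi classes; the only assumption taken from the comment is that a flowfile may either have no claim at all or have a claim with empty content:

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-ins; these are not the MiNiFi types.
struct Claim {
  std::string content;
};

struct FlowFile {
  std::shared_ptr<Claim> claim;  // may be null in the "nullable" model
};

// Nullable model: every consumer must handle "no claim" and "empty content" separately.
inline std::size_t sizeWithNullableClaim(const FlowFile& file) {
  if (!file.claim) {
    return 0;
  }
  return file.claim->content.size();
}

// Always-present model: the factory guarantees a claim, even for empty content.
inline FlowFile makeFlowFile(std::string content = {}) {
  return FlowFile{std::make_shared<Claim>(Claim{std::move(content)})};
}

// Consumers of factory-made flowfiles need no null branch.
inline std::size_t sizeWithGuaranteedClaim(const FlowFile& file) {
  return file.claim->content.size();
}
```

With the guarantee in place, the null branch disappears from every consumer, which is the simplification the comment is asking for in processors such as PutFile or PublishKafka.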




