[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

2020-06-30 Thread GitBox


szaszm commented on a change in pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447559056



##
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+explicit TestFlowFileRepository(const std::string& name)
+: core::SerializableComponent(name),
+  FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, 
MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+ MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+void flush() override {
+  FlowFileRepository::flush();
+  if (onFlush_) {
+onFlush_();
+  }
+}
+std::function onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, 
utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  auto connection = std::make_shared(nullptr, nullptr, 
"Connection");
+  ConnectionMap connectionMap{{connection->getUUIDStr(), connection}};
+  // initialize repository
+  {
+std::shared_ptr ff_repository = 
std::make_shared("flowFileRepository");
+
+std::atomic flush_counter{0};
+
+std::atomic stop{false};
+std::thread shutdown{[&] {
+  while (!stop.load()) {}
+  ff_repository->stop();
+}};
+
+ff_repository->onFlush_ = [&] {
+  if (++flush_counter != 1) {
+return;
+  }
+  
+  for (int keyIdx = 0; keyIdx < 100; ++keyIdx) {
+auto file = std::make_shared(ff_repository, 
nullptr);
+file->setUuidConnection(connection->getUUIDStr());
+// Serialize is sync
+file->Serialize();
+if (keyIdx % 2 == 0) {
+  // delete every second flowFile
+  ff_repository->Delete(file->getUUIDStr());
+}
+  }
+  stop = true;
+  // wait for the shutdown thread to start waiting for the worker thread
+  std::this_thread::sleep_for(std::chrono::milliseconds{100});
+};
+
+ff_repository->setConnectionMap(connectionMap);
+ff_repository->initialize(config);
+ff_repository->loadComponent(nullptr);
+ff_repository->start();
+
+shutdown.join();
+  }
+
+  // check if the deleted flowfiles are indeed deleted
+  {
+std::shared_ptr ff_repository = 
std::make_shared("flowFileRepository");
+ff_repository->setConnectionMap(connectionMap);
+ff_repository->initialize(config);
+ff_repository->loadComponent(nullptr);
+ff_repository->start();
+std::this_thread::sleep_for(std::chrono::milliseconds{100});
+REQUIRE(connection->getQueueSize() == 50);
+  }
+}

Review comment:
   https://stackoverflow.com/a/729795/3997716
   
   Maybe it's more intuitive if you think about them as line-endings, not line 
separators.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

2020-06-30 Thread GitBox


szaszm commented on a change in pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447559056



##
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+explicit TestFlowFileRepository(const std::string& name)
+: core::SerializableComponent(name),
+  FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, 
MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+ MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+void flush() override {
+  FlowFileRepository::flush();
+  if (onFlush_) {
+onFlush_();
+  }
+}
+std::function onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, 
utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  auto connection = std::make_shared(nullptr, nullptr, 
"Connection");
+  ConnectionMap connectionMap{{connection->getUUIDStr(), connection}};
+  // initialize repository
+  {
+std::shared_ptr ff_repository = 
std::make_shared("flowFileRepository");
+
+std::atomic flush_counter{0};
+
+std::atomic stop{false};
+std::thread shutdown{[&] {
+  while (!stop.load()) {}
+  ff_repository->stop();
+}};
+
+ff_repository->onFlush_ = [&] {
+  if (++flush_counter != 1) {
+return;
+  }
+  
+  for (int keyIdx = 0; keyIdx < 100; ++keyIdx) {
+auto file = std::make_shared(ff_repository, 
nullptr);
+file->setUuidConnection(connection->getUUIDStr());
+// Serialize is sync
+file->Serialize();
+if (keyIdx % 2 == 0) {
+  // delete every second flowFile
+  ff_repository->Delete(file->getUUIDStr());
+}
+  }
+  stop = true;
+  // wait for the shutdown thread to start waiting for the worker thread
+  std::this_thread::sleep_for(std::chrono::milliseconds{100});
+};
+
+ff_repository->setConnectionMap(connectionMap);
+ff_repository->initialize(config);
+ff_repository->loadComponent(nullptr);
+ff_repository->start();
+
+shutdown.join();
+  }
+
+  // check if the deleted flowfiles are indeed deleted
+  {
+std::shared_ptr ff_repository = 
std::make_shared("flowFileRepository");
+ff_repository->setConnectionMap(connectionMap);
+ff_repository->initialize(config);
+ff_repository->loadComponent(nullptr);
+ff_repository->start();
+std::this_thread::sleep_for(std::chrono::milliseconds{100});
+REQUIRE(connection->getQueueSize() == 50);
+  }
+}

Review comment:
   https://stackoverflow.com/a/729795/3997716





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

2020-06-30 Thread GitBox


szaszm commented on a change in pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447549997



##
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+explicit TestFlowFileRepository(const std::string& name)
+: core::SerializableComponent(name),
+  FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, 
MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+ MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+void flush() override {
+  FlowFileRepository::flush();
+  if (onFlush_) {
+onFlush_();
+  }
+}
+std::function onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XX";

Review comment:
   1. I don't think we have a clear list. I think we just aim for a large 
GNU/Linux coverage.
   2. Not sure if it's the second best place. Normally we clean up temporary 
directories, but this doesn't happen when tests crash, so there may be 
leftovers in some rare situations. In a discussion around the time of fixing 
the referred bug, we considered disabling direct IO, but that would mean we 
don't test the same behavior that we are running, so going for a different 
directory seemed to be the better approach.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

2020-06-30 Thread GitBox


szaszm commented on a change in pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447547205



##
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+explicit TestFlowFileRepository(const std::string& name)
+: core::SerializableComponent(name),
+  FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, 
MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+ MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+void flush() override {
+  FlowFileRepository::flush();
+  if (onFlush_) {
+onFlush_();
+  }
+}
+std::function onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, 
utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  auto connection = std::make_shared(nullptr, nullptr, 
"Connection");
+  ConnectionMap connectionMap{{connection->getUUIDStr(), connection}};
+  // initialize repository
+  {
+std::shared_ptr ff_repository = 
std::make_shared("flowFileRepository");
+
+std::atomic flush_counter{0};
+
+std::atomic stop{false};
+std::thread shutdown{[&] {
+  while (!stop.load()) {}
+  ff_repository->stop();
+}};
+
+ff_repository->onFlush_ = [&] {
+  if (++flush_counter != 1) {
+return;
+  }
+  
+  for (int keyIdx = 0; keyIdx < 100; ++keyIdx) {
+auto file = std::make_shared(ff_repository, 
nullptr);
+file->setUuidConnection(connection->getUUIDStr());
+// Serialize is sync
+file->Serialize();
+if (keyIdx % 2 == 0) {
+  // delete every second flowFile
+  ff_repository->Delete(file->getUUIDStr());
+}
+  }
+  stop = true;
+  // wait for the shutdown thread to start waiting for the worker thread
+  std::this_thread::sleep_for(std::chrono::milliseconds{100});
+};
+
+ff_repository->setConnectionMap(connectionMap);
+ff_repository->initialize(config);

Review comment:
   I agree that returning a bool to signal error is not the best idea. I 
would prefer the use of exceptions for all errors that are not usually part of 
the program flow (i.e. exceptional).
   
   I'm not aware of a list of changes we are "holding back", but creating a 
Jira issue with Fix version = "1.0" could be one way of maintaining such a 
list, because we can search for those later.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

2020-06-29 Thread GitBox


szaszm commented on a change in pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447059548



##
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+explicit TestFlowFileRepository(const std::string& name)
+: core::SerializableComponent(name),
+  FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, 
MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+ MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+void flush() override {
+  FlowFileRepository::flush();
+  if (onFlush_) {
+onFlush_();
+  }
+}
+std::function onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, 
utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  auto connection = std::make_shared(nullptr, nullptr, 
"Connection");
+  ConnectionMap connectionMap{{connection->getUUIDStr(), connection}};
+  // initialize repository
+  {
+std::shared_ptr ff_repository = 
std::make_shared("flowFileRepository");
+
+std::atomic flush_counter{0};
+
+std::atomic stop{false};
+std::thread shutdown{[&] {
+  while (!stop.load()) {}
+  ff_repository->stop();
+}};
+
+ff_repository->onFlush_ = [&] {
+  if (++flush_counter != 1) {
+return;
+  }
+  
+  for (int keyIdx = 0; keyIdx < 100; ++keyIdx) {
+auto file = std::make_shared(ff_repository, 
nullptr);
+file->setUuidConnection(connection->getUUIDStr());
+// Serialize is sync
+file->Serialize();
+if (keyIdx % 2 == 0) {
+  // delete every second flowFile
+  ff_repository->Delete(file->getUUIDStr());
+}
+  }
+  stop = true;
+  // wait for the shutdown thread to start waiting for the worker thread
+  std::this_thread::sleep_for(std::chrono::milliseconds{100});
+};
+
+ff_repository->setConnectionMap(connectionMap);
+ff_repository->initialize(config);

Review comment:
   Errors should make the test fail early.
   ```suggestion
   const bool init_success = ff_repository->initialize(config);
   REQUIRE(init_success);
   ```

##
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+explicit TestFlowFileRepository(const std::string& name)
+: core::SerializableComponent(name),
+  FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, 
MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+ MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+void flush() override {
+  FlowFileRepository::flush();
+  if (onFlush_) {
+onFlush_();
+  }
+}
+std::function onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, 
utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  auto connection = std::make_shared(nullptr, nullptr, 
"Connection");
+  ConnectionMap connectionMap{{connection->getUUIDStr(), connection}};
+  // initialize repository
+  {
+std::shared_ptr ff_repository = 
std::make_shared("flowFileRepository");
+
+std::atomic flush_counter{0};
+
+std::atomic stop{false};
+std::thread shutdown{[&] {
+  while (!stop.load()) {}
+  ff_repository->stop();
+}};
+
+ff_repository->onFlush_ = [&] {
+  if (++flush_counter != 1) {
+return;
+  }
+  
+  for (int keyIdx = 0; keyIdx < 100; ++keyIdx) {
+auto file = std::make_shared(ff_repository, 
nullptr);
+file->setUuidConnection(connection->getUUIDStr());
+// Serialize is sync
+file->Serialize();
+if (keyIdx % 2 == 0) {
+  // delete every second flowFile
+  ff_repository->Delete(file->getUUIDStr());
+}
+  }
+  stop = true;
+  // wait for the shutdown thread to start waiting for the worker thread
+  std::this_thread::sleep_for(std::chrono::milliseconds{100});
+};
+
+