[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown
szaszm commented on a change in pull request #826: URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447559056 ## File path: libminifi/test/rocksdb-tests/RepoTests.cpp ## @@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") { LogTestController::getInstance().reset(); } + +TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") { + using ConnectionMap = std::map>; + + class TestFlowFileRepository: public core::repository::FlowFileRepository{ + public: +explicit TestFlowFileRepository(const std::string& name) +: core::SerializableComponent(name), + FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME, + MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {} +void flush() override { + FlowFileRepository::flush(); + if (onFlush_) { +onFlush_(); + } +} +std::function onFlush_; + }; + + TestController testController; + char format[] = "/tmp/testRepo.XX"; + auto dir = testController.createTempDirectory(format); + + auto config = std::make_shared(); + config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository")); + + auto connection = std::make_shared(nullptr, nullptr, "Connection"); + ConnectionMap connectionMap{{connection->getUUIDStr(), connection}}; + // initialize repository + { +std::shared_ptr ff_repository = std::make_shared("flowFileRepository"); + +std::atomic flush_counter{0}; + +std::atomic stop{false}; +std::thread shutdown{[&] { + while (!stop.load()) {} + ff_repository->stop(); +}}; + +ff_repository->onFlush_ = [&] { + if (++flush_counter != 1) { +return; + } + + for (int keyIdx = 0; keyIdx < 100; ++keyIdx) { +auto file = std::make_shared(ff_repository, nullptr); +file->setUuidConnection(connection->getUUIDStr()); +// Serialize is sync +file->Serialize(); +if (keyIdx % 2 == 0) { + // delete every second flowFile + ff_repository->Delete(file->getUUIDStr()); +} + } + stop = true; + // wait for the shutdown 
thread to start waiting for the worker thread + std::this_thread::sleep_for(std::chrono::milliseconds{100}); +}; + +ff_repository->setConnectionMap(connectionMap); +ff_repository->initialize(config); +ff_repository->loadComponent(nullptr); +ff_repository->start(); + +shutdown.join(); + } + + // check if the deleted flowfiles are indeed deleted + { +std::shared_ptr ff_repository = std::make_shared("flowFileRepository"); +ff_repository->setConnectionMap(connectionMap); +ff_repository->initialize(config); +ff_repository->loadComponent(nullptr); +ff_repository->start(); +std::this_thread::sleep_for(std::chrono::milliseconds{100}); +REQUIRE(connection->getQueueSize() == 50); + } +} Review comment: https://stackoverflow.com/a/729795/3997716 Maybe it's more intuitive if you think about them as line-endings, not line separators. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown
szaszm commented on a change in pull request #826: URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447559056 ## File path: libminifi/test/rocksdb-tests/RepoTests.cpp ## @@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") { LogTestController::getInstance().reset(); } + +TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") { + using ConnectionMap = std::map>; + + class TestFlowFileRepository: public core::repository::FlowFileRepository{ + public: +explicit TestFlowFileRepository(const std::string& name) +: core::SerializableComponent(name), + FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME, + MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {} +void flush() override { + FlowFileRepository::flush(); + if (onFlush_) { +onFlush_(); + } +} +std::function onFlush_; + }; + + TestController testController; + char format[] = "/tmp/testRepo.XX"; + auto dir = testController.createTempDirectory(format); + + auto config = std::make_shared(); + config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository")); + + auto connection = std::make_shared(nullptr, nullptr, "Connection"); + ConnectionMap connectionMap{{connection->getUUIDStr(), connection}}; + // initialize repository + { +std::shared_ptr ff_repository = std::make_shared("flowFileRepository"); + +std::atomic flush_counter{0}; + +std::atomic stop{false}; +std::thread shutdown{[&] { + while (!stop.load()) {} + ff_repository->stop(); +}}; + +ff_repository->onFlush_ = [&] { + if (++flush_counter != 1) { +return; + } + + for (int keyIdx = 0; keyIdx < 100; ++keyIdx) { +auto file = std::make_shared(ff_repository, nullptr); +file->setUuidConnection(connection->getUUIDStr()); +// Serialize is sync +file->Serialize(); +if (keyIdx % 2 == 0) { + // delete every second flowFile + ff_repository->Delete(file->getUUIDStr()); +} + } + stop = true; + // wait for the shutdown 
thread to start waiting for the worker thread + std::this_thread::sleep_for(std::chrono::milliseconds{100}); +}; + +ff_repository->setConnectionMap(connectionMap); +ff_repository->initialize(config); +ff_repository->loadComponent(nullptr); +ff_repository->start(); + +shutdown.join(); + } + + // check if the deleted flowfiles are indeed deleted + { +std::shared_ptr ff_repository = std::make_shared("flowFileRepository"); +ff_repository->setConnectionMap(connectionMap); +ff_repository->initialize(config); +ff_repository->loadComponent(nullptr); +ff_repository->start(); +std::this_thread::sleep_for(std::chrono::milliseconds{100}); +REQUIRE(connection->getQueueSize() == 50); + } +} Review comment: https://stackoverflow.com/a/729795/3997716 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown
szaszm commented on a change in pull request #826: URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447549997 ## File path: libminifi/test/rocksdb-tests/RepoTests.cpp ## @@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") { LogTestController::getInstance().reset(); } + +TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") { + using ConnectionMap = std::map>; + + class TestFlowFileRepository: public core::repository::FlowFileRepository{ + public: +explicit TestFlowFileRepository(const std::string& name) +: core::SerializableComponent(name), + FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME, + MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {} +void flush() override { + FlowFileRepository::flush(); + if (onFlush_) { +onFlush_(); + } +} +std::function onFlush_; + }; + + TestController testController; + char format[] = "/tmp/testRepo.XX"; Review comment: 1. I don't think we have a clear list. I think we just aim for broad GNU/Linux coverage. 2. Not sure if it's the second best place. Normally we clean up temporary directories, but this doesn't happen when tests crash, so there may be leftovers in some rare situations. In a discussion around the time of fixing the referenced bug, we considered disabling direct IO, but that would mean we don't test the same behavior that we are running, so going for a different directory seemed to be the better approach. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown
szaszm commented on a change in pull request #826: URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447547205 ## File path: libminifi/test/rocksdb-tests/RepoTests.cpp ## @@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") { LogTestController::getInstance().reset(); } + +TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") { + using ConnectionMap = std::map>; + + class TestFlowFileRepository: public core::repository::FlowFileRepository{ + public: +explicit TestFlowFileRepository(const std::string& name) +: core::SerializableComponent(name), + FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME, + MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {} +void flush() override { + FlowFileRepository::flush(); + if (onFlush_) { +onFlush_(); + } +} +std::function onFlush_; + }; + + TestController testController; + char format[] = "/tmp/testRepo.XX"; + auto dir = testController.createTempDirectory(format); + + auto config = std::make_shared(); + config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository")); + + auto connection = std::make_shared(nullptr, nullptr, "Connection"); + ConnectionMap connectionMap{{connection->getUUIDStr(), connection}}; + // initialize repository + { +std::shared_ptr ff_repository = std::make_shared("flowFileRepository"); + +std::atomic flush_counter{0}; + +std::atomic stop{false}; +std::thread shutdown{[&] { + while (!stop.load()) {} + ff_repository->stop(); +}}; + +ff_repository->onFlush_ = [&] { + if (++flush_counter != 1) { +return; + } + + for (int keyIdx = 0; keyIdx < 100; ++keyIdx) { +auto file = std::make_shared(ff_repository, nullptr); +file->setUuidConnection(connection->getUUIDStr()); +// Serialize is sync +file->Serialize(); +if (keyIdx % 2 == 0) { + // delete every second flowFile + ff_repository->Delete(file->getUUIDStr()); +} + } + stop = true; + // wait for the shutdown 
thread to start waiting for the worker thread + std::this_thread::sleep_for(std::chrono::milliseconds{100}); +}; + +ff_repository->setConnectionMap(connectionMap); +ff_repository->initialize(config); Review comment: I agree that returning a bool to signal error is not the best idea. I would prefer the use of exceptions for all errors that are not usually part of the program flow (i.e. exceptional). I'm not aware of a list of changes we are "holding back", but creating a Jira issue with Fix version = "1.0" could be one way of maintaining such a list, because we can search for those later. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown
szaszm commented on a change in pull request #826: URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447059548 ## File path: libminifi/test/rocksdb-tests/RepoTests.cpp ## @@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") { LogTestController::getInstance().reset(); } + +TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") { + using ConnectionMap = std::map>; + + class TestFlowFileRepository: public core::repository::FlowFileRepository{ + public: +explicit TestFlowFileRepository(const std::string& name) +: core::SerializableComponent(name), + FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME, + MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {} +void flush() override { + FlowFileRepository::flush(); + if (onFlush_) { +onFlush_(); + } +} +std::function onFlush_; + }; + + TestController testController; + char format[] = "/tmp/testRepo.XX"; + auto dir = testController.createTempDirectory(format); + + auto config = std::make_shared(); + config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository")); + + auto connection = std::make_shared(nullptr, nullptr, "Connection"); + ConnectionMap connectionMap{{connection->getUUIDStr(), connection}}; + // initialize repository + { +std::shared_ptr ff_repository = std::make_shared("flowFileRepository"); + +std::atomic flush_counter{0}; + +std::atomic stop{false}; +std::thread shutdown{[&] { + while (!stop.load()) {} + ff_repository->stop(); +}}; + +ff_repository->onFlush_ = [&] { + if (++flush_counter != 1) { +return; + } + + for (int keyIdx = 0; keyIdx < 100; ++keyIdx) { +auto file = std::make_shared(ff_repository, nullptr); +file->setUuidConnection(connection->getUUIDStr()); +// Serialize is sync +file->Serialize(); +if (keyIdx % 2 == 0) { + // delete every second flowFile + ff_repository->Delete(file->getUUIDStr()); +} + } + stop = true; + // wait for the shutdown 
thread to start waiting for the worker thread + std::this_thread::sleep_for(std::chrono::milliseconds{100}); +}; + +ff_repository->setConnectionMap(connectionMap); +ff_repository->initialize(config); Review comment: Errors should make the test fail early. ```suggestion const bool init_success = ff_repository->initialize(config); REQUIRE(init_success); ``` ## File path: libminifi/test/rocksdb-tests/RepoTests.cpp ## @@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") { LogTestController::getInstance().reset(); } + +TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") { + using ConnectionMap = std::map>; + + class TestFlowFileRepository: public core::repository::FlowFileRepository{ + public: +explicit TestFlowFileRepository(const std::string& name) +: core::SerializableComponent(name), + FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME, + MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {} +void flush() override { + FlowFileRepository::flush(); + if (onFlush_) { +onFlush_(); + } +} +std::function onFlush_; + }; + + TestController testController; + char format[] = "/tmp/testRepo.XX"; + auto dir = testController.createTempDirectory(format); + + auto config = std::make_shared(); + config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository")); + + auto connection = std::make_shared(nullptr, nullptr, "Connection"); + ConnectionMap connectionMap{{connection->getUUIDStr(), connection}}; + // initialize repository + { +std::shared_ptr ff_repository = std::make_shared("flowFileRepository"); + +std::atomic flush_counter{0}; + +std::atomic stop{false}; +std::thread shutdown{[&] { + while (!stop.load()) {} + ff_repository->stop(); +}}; + +ff_repository->onFlush_ = [&] { + if (++flush_counter != 1) { +return; + } + + for (int keyIdx = 0; keyIdx < 100; ++keyIdx) { +auto file = std::make_shared(ff_repository, nullptr); 
+file->setUuidConnection(connection->getUUIDStr()); +// Serialize is sync +file->Serialize(); +if (keyIdx % 2 == 0) { + // delete every second flowFile + ff_repository->Delete(file->getUUIDStr()); +} + } + stop = true; + // wait for the shutdown thread to start waiting for the worker thread + std::this_thread::sleep_for(std::chrono::milliseconds{100}); +}; + +