[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605:
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r415336955

## File path: libminifi/src/c2/C2Agent.cpp ##

@@ -435,6 +437,27 @@ void C2Agent::handle_c2_server_response(const C2ContentResponse ) {
     update_sink_->drainRepositories();
     C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
     enqueue_c2_response(std::move(response));
+  } else if (resp.name == "corecomponentstate") {

Review comment:
This is unfortunately still very much non-trivial to do (unlike testing DESCRIBE, which I do now), so I've added a TODO for it as suggested.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r415336687 ## File path: extensions/sql/processors/QueryDatabaseTable.cpp ## @@ -308,19 +240,43 @@ void QueryDatabaseTable::processOnSchedule(const core::ProcessContext ) context.getProperty(s_sqlQuery.getName(), sqlQuery_); context.getProperty(s_maxRowsPerFlowFile.getName(), maxRowsPerFlowFile_); - std::string stateDir; - context.getProperty(s_stateDirectory.getName(), stateDir); - if (stateDir.empty()) { -logger_->log_error("State Directory is empty"); -return; + mapState_.clear(); + + state_manager_ = context.getStateManager(); + if (state_manager_ == nullptr) { +throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager"); } - pState_ = std::make_unique(tableName_, stateDir, getUUIDStr(), logger_); - if (!*pState_) { -return; + std::unordered_map state_map; + if (state_manager_->get(state_map)) { +if (state_map[TABLENAME_KEY] != tableName_) { + state_manager_->clear(); +} else { + for (auto&& elem : state_map) { +if (elem.first.find(MAXVALUE_KEY_PREFIX) == 0) { + mapState_.emplace(elem.first.substr(MAXVALUE_KEY_PREFIX.length()), std::move(elem.second)); +} + } +} + } else { +// Try to migrate legacy state file +std::string stateDir; +context.getProperty(s_stateDirectory.getName(), stateDir); +if (!stateDir.empty()) { + LegacyState legacyState(tableName_, stateDir, getUUIDStr(), logger_); + if (legacyState) { +mapState_ = legacyState.getStateMap(); +if (saveState() && state_manager_->persist()) { + logger_->log_info("State migration successful"); + legacyState.moveStateFileToMigrated(); +} else { + logger_->log_warn("Failed to persists migrated state"); +} + } else { +logger_->log_warn("Could not migrate state from specified State Directory %s", stateDir); + } +} Review comment: It is short and very much unlikely to be reused, so I don't see much benefit in moving to a function. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
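The state-restore branch of the QueryDatabaseTable hunk above (compare the stored table name, then keep only the prefixed max-value keys) can be sketched in isolation. This is a minimal sketch under stated assumptions, not the PR's code: `restoreMaxValues` is a hypothetical helper, and the string values given to `TABLENAME_KEY` and `MAXVALUE_KEY_PREFIX` are invented, since the diff only shows the constant names.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

// Hypothetical values; the quoted diff shows only the constant names.
static const std::string TABLENAME_KEY = "tablename";
static const std::string MAXVALUE_KEY_PREFIX = "maxvalue.";

using StateMap = std::unordered_map<std::string, std::string>;

// Fills `out` with the per-column max values and returns true if the stored
// state belongs to `table_name`. Returns false if the state was written for
// another table (the caller would then clear the stored state).
bool restoreMaxValues(const StateMap& stored, const std::string& table_name, StateMap& out) {
  out.clear();
  auto it = stored.find(TABLENAME_KEY);
  if (it == stored.end() || it->second != table_name) {
    return false;
  }
  for (const auto& elem : stored) {
    // Keep only keys that start with the prefix, and strip the prefix.
    if (elem.first.compare(0, MAXVALUE_KEY_PREFIX.length(), MAXVALUE_KEY_PREFIX) == 0) {
      out.emplace(elem.first.substr(MAXVALUE_KEY_PREFIX.length()), elem.second);
    }
  }
  return true;
}
```

Unlike `state_map[TABLENAME_KEY]` in the diff, `find` does not insert an empty entry when the key is missing, which is why the sketch can take the stored map by const reference.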
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605:
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r415333419

## File path: libminifi/src/c2/C2Agent.cpp ##

@@ -596,6 +619,29 @@ void C2Agent::handle_describe(const C2ContentResponse ) {
       }
       enqueue_c2_response(std::move(response));
     }
+  } else if (resp.name == "corecomponentstate") {

Review comment:
Added `C2DescribeCoreComponentStateTest` for this.
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605:
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r413931006

## File path: libminifi/src/c2/C2Agent.cpp ##

@@ -435,6 +437,27 @@ void C2Agent::handle_c2_server_response(const C2ContentResponse ) {
     update_sink_->drainRepositories();
     C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
     enqueue_c2_response(std::move(response));
+  } else if (resp.name == "corecomponentstate") {

Review comment:
Let's revisit this after rebasing.
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r413921343 ## File path: extensions/sftp/processors/ListSFTP.cpp ## @@ -507,85 +468,74 @@ ListSFTP::ListedEntity::ListedEntity(uint64_t timestamp_, uint64_t size_) , size(size_) { } -bool ListSFTP::persistTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path) { - std::ofstream file(tracking_timestamps_state_filename_); - if (!file.is_open()) { -logger_->log_error("Failed to store state to Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str()); -return false; - } - file << "hostname=" << hostname << "\n"; - file << "username=" << username << "\n"; - file << "remote_path=" << remote_path << "\n"; - file << "listing.timestamp=" << last_listed_latest_entry_timestamp_ << "\n"; - file << "processed.timestamp=" << last_processed_latest_entry_timestamp_ << "\n"; +bool ListSFTP::persistTrackingTimestampsCache(const std::shared_ptr& context, const std::string& hostname, const std::string& username, const std::string& remote_path) { + std::unordered_map state; + state["listing_strategy"] = LISTING_STRATEGY_TRACKING_TIMESTAMPS; + state["hostname"] = hostname; + state["username"] = username; + state["remote_path"] = remote_path; + state["listing.timestamp"] = std::to_string(last_listed_latest_entry_timestamp_); + state["processed.timestamp"] = std::to_string(last_processed_latest_entry_timestamp_); size_t i = 0; for (const auto& identifier : latest_identifiers_processed_) { -file << "id." << i << "=" << identifier << "\n"; +state["id." 
+ std::to_string(i)] = identifier; ++i; } - return true; + return state_manager_->set(state); } -bool ListSFTP::updateFromTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path) { - std::ifstream file(tracking_timestamps_state_filename_); - if (!file.is_open()) { -logger_->log_error("Failed to open Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str()); -return false; - } +bool ListSFTP::updateFromTrackingTimestampsCache(const std::shared_ptr& context, const std::string& hostname, const std::string& username, const std::string& remote_path) { + std::string state_listing_strategy; std::string state_hostname; std::string state_username; std::string state_remote_path; uint64_t state_listing_timestamp; uint64_t state_processed_timestamp; std::set state_ids; - std::string line; - while (std::getline(file, line)) { -size_t separator_pos = line.find('='); -if (separator_pos == std::string::npos) { - logger_->log_warn("None key-value line found in Tracking Timestamps state file \"%s\": \"%s\"", tracking_timestamps_state_filename_.c_str(), line.c_str()); + std::unordered_map state_map; + if (!state_manager_->get(state_map)) { +logger_->log_info("Found no stored state"); +return false; + } + try { +state_listing_strategy = state_map.at("listing_strategy"); +state_hostname = state_map.at("hostname"); +state_username = state_map.at("username"); +state_remote_path = state_map.at("remote_path"); +try { + state_listing_timestamp = stoull(state_map.at("listing.timestamp")); +} catch (...) 
{ + return false; } -std::string key = line.substr(0, separator_pos); -std::string value = line.substr(separator_pos + 1); -if (key == "hostname") { - state_hostname = std::move(value); -} else if (key == "username") { - state_username = std::move(value); -} else if (key == "remote_path") { - state_remote_path = std::move(value); -} else if (key == "listing.timestamp") { - try { -state_listing_timestamp = stoull(value); - } catch (...) { -logger_->log_error("listing.timestamp is not an uint64 in Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str()); -return false; - } -} else if (key == "processed.timestamp") { - try { -state_processed_timestamp = stoull(value); - } catch (...) { -logger_->log_error("processed.timestamp is not an uint64 in Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str()); -return false; +try { + state_processed_timestamp = stoull(state_map.at("processed.timestamp")); +} catch (...) { + return false; +} Review comment: Added individual logging. ## File path: extensions/sftp/processors/ListSFTP.cpp ## @@ -760,140 +708,83 @@ void ListSFTP::listByTrackingTimestamps( } } -bool ListSFTP::persistTrackingEntitiesCache(const std::string& hostname, const std::string& username, const std::string& remote_path) { - std::ofstream file(tracking_entities_state_filename_); - if (!file.is_open()) { -
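The "Added individual logging" reply above concerns the two timestamp parses that originally returned `false` from a bare `catch (...)`. A minimal sketch of what per-key logging could look like follows. It is an illustration only: `readUint64` is a hypothetical helper, and a `std::ostream` stands in for the processor's `logger_`; the PR's actual log messages are not shown in this thread.

```cpp
#include <cassert>
#include <cstdint>
#include <exception>
#include <ostream>
#include <sstream>
#include <string>
#include <unordered_map>

// Reads `key` from `state` as an unsigned 64-bit integer. On failure it
// writes a message naming the offending key to `log` (a stand-in for the
// processor's logger) and returns false.
bool readUint64(const std::unordered_map<std::string, std::string>& state,
                const std::string& key, std::uint64_t& out, std::ostream& log) {
  auto it = state.find(key);
  if (it == state.end()) {
    log << "stored state has no " << key << "\n";
    return false;
  }
  try {
    out = std::stoull(it->second);
    return true;
  } catch (const std::exception&) {
    // std::stoull throws std::invalid_argument or std::out_of_range.
    log << key << " is not an uint64: \"" << it->second << "\"\n";
    return false;
  }
}
```

Calling the helper once for `listing.timestamp` and once for `processed.timestamp` gives each failure its own message, so a corrupted state entry can be identified from the log alone.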
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r413770032 ## File path: extensions/sftp/processors/ListSFTP.cpp ## @@ -507,85 +463,87 @@ ListSFTP::ListedEntity::ListedEntity(uint64_t timestamp_, uint64_t size_) , size(size_) { } -bool ListSFTP::persistTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path) { - std::ofstream file(tracking_timestamps_state_filename_); - if (!file.is_open()) { -logger_->log_error("Failed to store state to Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str()); +bool ListSFTP::persistTrackingTimestampsCache(const std::shared_ptr& context, const std::string& hostname, const std::string& username, const std::string& remote_path) { + auto state_manager = context->getStateManager(); + if (state_manager == nullptr) { return false; } - file << "hostname=" << hostname << "\n"; - file << "username=" << username << "\n"; - file << "remote_path=" << remote_path << "\n"; - file << "listing.timestamp=" << last_listed_latest_entry_timestamp_ << "\n"; - file << "processed.timestamp=" << last_processed_latest_entry_timestamp_ << "\n"; + std::unordered_map state; + state["listing_strategy"] = LISTING_STRATEGY_TRACKING_TIMESTAMPS; + state["hostname"] = hostname; + state["username"] = username; + state["remote_path"] = remote_path; + state["listing.timestamp"] = std::to_string(last_listed_latest_entry_timestamp_); + state["processed.timestamp"] = std::to_string(last_processed_latest_entry_timestamp_); size_t i = 0; for (const auto& identifier : latest_identifiers_processed_) { -file << "id." << i << "=" << identifier << "\n"; +state["id." 
+ std::to_string(i)] = identifier; ++i; } + state_manager->set(state); + if (!state_manager->persist()) { +return false; + } return true; } -bool ListSFTP::updateFromTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path) { - std::ifstream file(tracking_timestamps_state_filename_); - if (!file.is_open()) { -logger_->log_error("Failed to open Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str()); -return false; - } +bool ListSFTP::updateFromTrackingTimestampsCache(const std::shared_ptr& context, const std::string& hostname, const std::string& username, const std::string& remote_path) {

Review comment:
I am not really open to that in the context of this PR. This is preexisting code, and it is used this way in many processors. If you want to do a round of these modifications to make further API changes easier, that makes sense, but it should be a separate issue (and ticket).
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r413766649 ## File path: extensions/standard-processors/tests/unit/TailFileTests.cpp ## @@ -501,7 +481,7 @@ TEST_CASE("TailFileWithDelimiterMultipleDelimiters", "[tailfiletest2]") { std::string line1(4097, '\n'); std::mt19937 gen(std::random_device { }()); std::generate_n(line1.begin(), 4095, [&]() -> char { -return 32 + gen() % (127 - 32); + return 32 + gen() % (127 - 32); Review comment: I am not really. ## File path: extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.cpp ## @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "UnorderedMapKeyValueStoreService.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace controllers { + +UnorderedMapKeyValueStoreService::UnorderedMapKeyValueStoreService(const std::string& name, const std::string& id) +: KeyValueStoreService(name, id) +, logger_(logging::LoggerFactory::getLogger()) { +} + +UnorderedMapKeyValueStoreService::UnorderedMapKeyValueStoreService(const std::string& name, utils::Identifier uuid /*= utils::Identifier()*/) +: KeyValueStoreService(name, uuid) +, logger_(logging::LoggerFactory::getLogger()) { +} + +UnorderedMapKeyValueStoreService::UnorderedMapKeyValueStoreService(const std::string& name, const std::shared_ptr ) +: KeyValueStoreService(name) +, logger_(logging::LoggerFactory::getLogger()) { + setConfiguration(configuration); + initialize(); +} + +UnorderedMapKeyValueStoreService::~UnorderedMapKeyValueStoreService() { +} + +bool UnorderedMapKeyValueStoreService::set(const std::string& key, const std::string& value) { + std::lock_guard lock(mutex_); + map_[key] = value; + return true; +} + +bool UnorderedMapKeyValueStoreService::get(const std::string& key, std::string& value) { + std::lock_guard lock(mutex_); + auto it = map_.find(key); + if (it == map_.end()) { +return false; + } else { +value = it->second; +return true; + } +} + +bool UnorderedMapKeyValueStoreService::get(std::unordered_map& kvs) { + std::lock_guard lock(mutex_); + kvs = map_; + return true; +} + +bool UnorderedMapKeyValueStoreService::remove(const std::string& key) { + std::lock_guard lock(mutex_); + return map_.erase(key) == 1U; +} + +bool UnorderedMapKeyValueStoreService::clear() { + std::lock_guard lock(mutex_); + map_.clear(); + return true; +} + +bool UnorderedMapKeyValueStoreService::update(const std::string& key, const std::function& update_func) { + std::lock_guard lock(mutex_); + bool exists = false; + std::string value; + auto it = map_.find(key); + if (it != map_.end()) { +exists = true; +value 
= it->second; + } + try { +if (!update_func(exists, value)) { + return false; +} + } catch (...) { +return false; + }

Review comment:
We are not silently swallowing it: we are making the operation fail. I have added logging to the exception path, though.
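The behaviour described in the reply above (a throwing update function makes the operation fail, and the failure is logged) can be sketched as follows. This is a simplified stand-in, not the PR's class: `MapStore` is hypothetical, the callback signature `bool(bool, std::string&)` is inferred from the call `update_func(exists, value)`, the write-back after a successful callback is assumed because the quoted hunk is cut before it, and `std::cerr` replaces the logger.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <string>
#include <unordered_map>

// Simplified stand-in for the in-memory key-value store.
class MapStore {
 public:
  // Applies `update_func` to the current value of `key`. The callback gets
  // whether the key exists and a mutable copy of the value; the store is only
  // modified if the callback returns true without throwing.
  bool update(const std::string& key,
              const std::function<bool(bool, std::string&)>& update_func) {
    std::lock_guard<std::mutex> lock(mutex_);
    bool exists = false;
    std::string value;
    auto it = map_.find(key);
    if (it != map_.end()) {
      exists = true;
      value = it->second;
    }
    try {
      if (!update_func(exists, value)) {
        return false;
      }
    } catch (const std::exception& e) {
      std::cerr << "update function threw: " << e.what() << "\n";  // logger stand-in
      return false;
    } catch (...) {
      std::cerr << "update function threw an unknown exception\n";
      return false;
    }
    map_[key] = value;  // assumed write-back; not visible in the quoted hunk
    return true;
  }

  bool get(const std::string& key, std::string& value) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = map_.find(key);
    if (it == map_.end()) {
      return false;
    }
    value = it->second;
    return true;
  }

 private:
  std::mutex mutex_;
  std::unordered_map<std::string, std::string> map_;
};
```

Because the callback works on a copy, a throwing or rejecting callback leaves the stored value untouched, so the caller only has to check the boolean result.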
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605:
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r413746596

## File path: extensions/sql/processors/ExecuteSQL.cpp ##

@@ -81,14 +81,14 @@ void ExecuteSQL::initialize() {
   setSupportedRelationships( { s_success });
 }

-void ExecuteSQL::processOnSchedule(const core::ProcessContext ) {
+void ExecuteSQL::processOnSchedule(core::ProcessContext ) {
   initOutputFormat(context);

   context.getProperty(s_sqlSelectQuery.getName(), sqlSelectQuery_);
   context.getProperty(s_maxRowsPerFlowFile.getName(), max_rows_);
 }

-void ExecuteSQL::processOnTrigger(core::ProcessSession ) {
+void ExecuteSQL::processOnTrigger(core::ProcessContext& /*context*/, core::ProcessSession& session) {

Review comment:
In a previous iteration, QueryDatabaseTable's processOnTrigger needed the context, so it had to be put on the CRTP interface of SQLProcessor. It has since been rewritten and QDT no longer needs the context, so I removed it.
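For readers unfamiliar with the "CRTP interface" mentioned above, here is a minimal sketch of the pattern with the context already removed from the trigger hook. All names (`Context`, `Session`, `SqlProcessorBase`, `ExecuteSqlSketch`) are hypothetical stand-ins, and the real SQLProcessor interface is not shown in this thread, so treat this as an illustration of the technique only.

```cpp
#include <cassert>
#include <string>

// Stand-ins for core::ProcessContext and core::ProcessSession.
struct Context { std::string query; };
struct Session { std::string output; };

// CRTP base: it implements the framework-facing entry points and forwards
// to hooks that every derived processor must provide.
template <typename Derived>
class SqlProcessorBase {
 public:
  void onSchedule(Context& context) {
    static_cast<Derived*>(this)->processOnSchedule(context);
  }
  void onTrigger(Context& /*context*/, Session& session) {
    // The hook takes only the session because no derived class needs the
    // context at trigger time in this sketch.
    static_cast<Derived*>(this)->processOnTrigger(session);
  }
};

class ExecuteSqlSketch : public SqlProcessorBase<ExecuteSqlSketch> {
 public:
  void processOnSchedule(Context& context) { query_ = context.query; }
  void processOnTrigger(Session& session) { session.output = "ran: " + query_; }

 private:
  std::string query_;
};
```

Since the base class calls the hook with a fixed argument list, adding a parameter for one derived processor forces it onto all of them, which is the coupling the reply describes undoing.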
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r413745773 ## File path: extensions/standard-processors/processors/TailFile.cpp ## @@ -232,20 +236,61 @@ void TailFile::parseStateFileLine(char *buf) { const auto file = key.substr(strlen(POSITION_STR)); tail_states_[file].currentTailFilePosition_ = std::stoull(value); } - - return; } -bool TailFile::recoverState() { - std::ifstream file(state_file_.c_str(), std::ifstream::in); - if (!file.good()) { -logger_->log_error("load state file failed %s", state_file_); -return false; + + +bool TailFile::recoverState(const std::shared_ptr& context) { + bool state_load_success = false; + + std::unordered_map state_map; + if (state_manager_->get(state_map)) { +std::map new_tail_states; +size_t i = 0; +while (true) { + std::string name; + try { +name = state_map.at("file." + std::to_string(i) + ".name"); + } catch (...) { +break; + } + try { +const std::string& current = state_map.at("file." + std::to_string(i) + ".current"); +uint64_t position = std::stoull(state_map.at("file." + std::to_string(i) + ".position")); + +std::string fileLocation, fileName; +if (utils::file::PathUtils::getFileNameAndPath(current, fileLocation, fileName)) { + logger_->log_debug("Received path %s, file %s", fileLocation, fileName); + new_tail_states.emplace(fileName, TailState { fileLocation, fileName, position, 0 }); +} else { + new_tail_states.emplace(current, TailState { fileLocation, current, position, 0 }); +} + } catch (...) 
{ +continue; + } + ++i; +} +state_load_success = true; +tail_states_ = std::move(new_tail_states); +for (const auto& s : tail_states_) { + logger_->log_debug("TailState %s: %s, %s, %llu, %llu", s.first, s.second.path_, s.second.current_file_name_, s.second.currentTailFilePosition_, s.second.currentTailFileModificationTime_); +} + } else { +logger_->log_info("Found no stored state"); } - tail_states_.clear(); - char buf[BUFFER_SIZE]; - for (file.getline(buf, BUFFER_SIZE); file.good(); file.getline(buf, BUFFER_SIZE)) { -parseStateFileLine(buf); + + /* We could not get the state form the StateManager, try to migrate the old state file if it exists */ Review comment: Fixed. ## File path: libminifi/src/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.cpp ## @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h" + +#include "rapidjson/rapidjson.h" +#include "rapidjson/document.h" +#include "rapidjson/stringbuffer.h" +#include "rapidjson/writer.h" + +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace controllers { + +AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::AbstractCoreComponentStateManager( +std::shared_ptr provider, +const std::string& id) +: provider_(std::move(provider)) +, id_(id) +, state_valid_(false) { + std::string serialized; + if (provider_->getImpl(id_, serialized) && provider_->deserialize(serialized, state_)) { +state_valid_ = true; + } +} + +bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::set(const std::unordered_map& kvs) { + if (provider_->setImpl(id_, provider_->serialize(kvs))) { +state_valid_ = true; +state_ = kvs; +return true; + } else { +return false; + } +} + +bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::get(std::unordered_map& kvs) { + if (!state_valid_) { +return false; + } + kvs = state_; + return true; +} + +bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::clear() { + if (!state_valid_) { +return false; + } + if (provider_->removeImpl(id_)) { +state_valid_ = false; +state_.clear(); +return true; + } else { +return false; + } +} + +bool
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r413745426 ## File path: libminifi/test/TestBase.cpp ## @@ -45,6 +45,23 @@ TestPlan::TestPlan(std::shared_ptr content_repo, std::s flow_version_(flow_version), logger_(logging::LoggerFactory::getLogger()) { stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(std::make_shared()); + controller_services_ = std::make_shared(); + controller_services_provider_ = std::make_shared(controller_services_, nullptr, configuration_); + /* Inject the default state provider ahead of ProcessContext to make sure we have a unique state directory */ + if (state_dir == nullptr) { +char state_dir_name_template[] = "/tmp/teststate.XX"; +state_dir_ = utils::file::FileUtils::create_temp_directory(state_dir_name_template); + } else { +state_dir_ = state_dir; + } + state_manager_provider_ = core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_services_provider_, configuration_, state_dir_.c_str()); Review comment: Done. ## File path: libminifi/test/integration/IntegrationBase.h ## @@ -121,6 +122,11 @@ void IntegrationBase::run(std::string test_file_location) { core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); + auto controller_service_provider = yaml_ptr->getControllerServiceProvider(); + char state_dir_name_template[] = "/tmp/integrationstate.XX"; + state_dir = utils::file::FileUtils::create_temp_directory(state_dir_name_template); + core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_service_provider, configuration, state_dir.c_str()); Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r413743841 ## File path: libminifi/test/keyvalue-tests/CMakeLists.txt ## @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +file(GLOB KEYVALUE_TESTS "*.cpp") + +SET(KEYVALUE_INT_TEST_COUNT 0) + +FOREACH(testfile ${KEYVALUE_TESTS}) +get_filename_component(testfilename "${testfile}" NAME_WE) +add_executable("${testfilename}" "${testfile}" ) +target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/libminifi/test/") +createTests("${testfilename}") +target_wholearchive_library(${testfilename} minifi-standard-processors) +if (NOT DISABLE_ROCKSDB) +target_wholearchive_library(${testfilename} minifi-rocksdb-repos) +endif() + +MATH(EXPR KEYVALUE_INT_TEST_COUNT "${KEYVALUE_INT_TEST_COUNT}+1") +ENDFOREACH() + +message("-- Finished building ${KEYVALUE_INT_TEST_COUNT} keyvalue controller test file(s)...") + +add_test(NAME UnorderedMapPersistableKeyValueStoreServiceTest COMMAND PersistableKeyValueStoreServiceTest --config-yaml "${TEST_RESOURCES}/UnorderedMapPersistableKeyValueStoreServiceTest.yml") +if (NOT DISABLE_ROCKSDB) +add_test(NAME RocksdDbPersistableKeyValueStoreServiceTest COMMAND PersistableKeyValueStoreServiceTest --config-yaml "${TEST_RESOURCES}/RocksDbPersistableKeyValueStoreServiceTest.yml") +endif() Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605:
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r413742717

## File path: libminifi/src/core/ConfigurableComponent.cpp ##

@@ -195,6 +195,19 @@ bool ConfigurableComponent::setSupportedProperties(std::set properties
   return true;
 }

+bool ConfigurableComponent::updateSupportedProperties(std::set properties) {
+  if (!canEdit()) {
+    return false;
+  }
+
+  std::lock_guard lock(configuration_mutex_);
+
+  for (auto item : properties) {
+    properties_[item.getName()] = item;

Review comment:
This function is a modified version of `ConfigurableComponent::setSupportedProperties`. Yes, the copies can be avoided both here and there, but it is completely inconsequential, as this happens only once per flow initialization.
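For reference, the copy-free variant that the reply acknowledges is possible might look like the sketch below. `Property` and `Component` are minimal hypothetical stand-ins for `core::Property` and `ConfigurableComponent`, the `canEdit()` check is omitted, and ordering properties by name is an assumption needed only so that `std::set` compiles here.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <set>
#include <string>
#include <utility>

// Minimal stand-in for core::Property; only what the sketch needs.
class Property {
 public:
  Property() = default;
  Property(std::string name, std::string value)
      : name_(std::move(name)), value_(std::move(value)) {}
  const std::string& getName() const { return name_; }
  const std::string& getValue() const { return value_; }
  // std::set needs an ordering; ordering by name is an assumption.
  bool operator<(const Property& other) const { return name_ < other.name_; }

 private:
  std::string name_;
  std::string value_;
};

class Component {
 public:
  // The set is taken by const reference and iterated by const reference, so
  // the only copy made per property is the one stored in the map.
  bool updateSupportedProperties(const std::set<Property>& properties) {
    std::lock_guard<std::mutex> lock(configuration_mutex_);
    for (const auto& item : properties) {
      properties_[item.getName()] = item;  // insert or overwrite by name
    }
    return true;
  }

  std::string valueOf(const std::string& name) {
    std::lock_guard<std::mutex> lock(configuration_mutex_);
    auto it = properties_.find(name);
    return it == properties_.end() ? std::string() : it->second.getValue();
  }

 private:
  std::mutex configuration_mutex_;
  std::map<std::string, Property> properties_;
};
```

As the reply notes, the saving is negligible when this runs once per flow initialization; the sketch only shows that the change is mechanical.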
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st… URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r394634612 ## File path: extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp ## @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "RocksDbPersistableKeyValueStoreService.h" + +#include "utils/StringUtils.h" + +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace controllers { + +core::Property RocksDbPersistableKeyValueStoreService::Directory( +core::PropertyBuilder::createProperty("Directory")->withDescription("Path to a directory for the database") +->isRequired(true)->build()); + +RocksDbPersistableKeyValueStoreService::RocksDbPersistableKeyValueStoreService(const std::string& name, const std::string& id) +: controllers::KeyValueStoreService(name, id) +, controllers::AbstractAutoPersistingKeyValueStoreService(name, id) +, db_valid_(false) +, logger_(logging::LoggerFactory::getLogger()) { +} + +RocksDbPersistableKeyValueStoreService::RocksDbPersistableKeyValueStoreService(const std::string& name, utils::Identifier uuid /*= utils::Identifier()*/) +: controllers::KeyValueStoreService(name, uuid) +, controllers::AbstractAutoPersistingKeyValueStoreService(name, uuid) +, db_valid_(false) +, logger_(logging::LoggerFactory::getLogger()) { +} + +RocksDbPersistableKeyValueStoreService::~RocksDbPersistableKeyValueStoreService() { + if (db_valid_) { +delete db_; + } +} + +void RocksDbPersistableKeyValueStoreService::initialize() { + AbstractAutoPersistingKeyValueStoreService::initialize(); + std::set supportedProperties; + supportedProperties.insert(Directory); + updateSupportedProperties(supportedProperties); +} + +void RocksDbPersistableKeyValueStoreService::onEnable() { + if (configuration_ == nullptr) { +logger_->log_debug("Cannot enable RocksDbPersistableKeyValueStoreService"); +return; + } + + AbstractAutoPersistingKeyValueStoreService::onEnable(); + + if (!getProperty(Directory.getName(), directory_)) { +logger_->log_error("Invalid or missing property: Directory"); +return; + } + + if (db_valid_) { +db_valid_ = false; +delete db_; + } + rocksdb::Options options; + options.create_if_missing = true; + if (!always_persist_) { 
+options.manual_wal_flush = true; + } + rocksdb::Status status = rocksdb::DB::Open(options, directory_, &db_); + if (status.ok()) { +db_valid_ = true; +logger_->log_trace("Successfully opened RocksDB database at %s", directory_.c_str()); + } else { +logger_->log_error("Failed to open RocksDB database at %s, error: %s", directory_.c_str(), status.getState()); +return; + } + + if (always_persist_) { +default_write_options.sync = true; + } + + logger_->log_trace("Enabled RocksDbPersistableKeyValueStoreService"); +} + +void RocksDbPersistableKeyValueStoreService::notifyStop() { + AbstractAutoPersistingKeyValueStoreService::notifyStop(); + + if (db_valid_) { +delete db_; +db_valid_= false; + } +} + +bool RocksDbPersistableKeyValueStoreService::set(const std::string& key, const std::string& value) { + if (!db_valid_) { Review comment: Replaced with unique_ptr, so no longer relevant. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st… URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r394634269 ## File path: extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h ## @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_RocksDbPersistableKeyValueStoreService_H_ +#define LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_RocksDbPersistableKeyValueStoreService_H_ + +#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h" +#include "core/Core.h" +#include "properties/Configure.h" +#include "core/logging/Logger.h" +#include "core/logging/LoggerConfiguration.h" + +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/slice.h" + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace controllers { + +class RocksDbPersistableKeyValueStoreService : public AbstractAutoPersistingKeyValueStoreService { + public: + explicit RocksDbPersistableKeyValueStoreService(const std::string& name, const std::string& id); + explicit RocksDbPersistableKeyValueStoreService(const std::string& name, utils::Identifier uuid = utils::Identifier()); + + virtual ~RocksDbPersistableKeyValueStoreService(); + + static core::Property Directory; + + virtual void initialize() override; + virtual void onEnable() override; + virtual void notifyStop() override; + + virtual bool set(const std::string& key, const std::string& value) override; + + virtual bool get(const std::string& key, std::string& value) override; + + virtual bool get(std::unordered_map& kvs) override; + + virtual bool remove(const std::string& key) override; + + virtual bool clear() override; + + virtual bool update(const std::string& key, const std::function& update_func) override; + + virtual bool persist() override; + + protected: + std::string directory_; + + rocksdb::DB* db_; + rocksdb::WriteOptions default_write_options; + bool db_valid_; Review comment: It was a remnant from the UnorderedMap version (which was implemented before this). Replaced along with using a unique_ptr. This is an automated message from the Apache Git Service. 
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st… URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r394634441 ## File path: extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp ## @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "RocksDbPersistableKeyValueStoreService.h" + +#include "utils/StringUtils.h" + +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace controllers { + +core::Property RocksDbPersistableKeyValueStoreService::Directory( +core::PropertyBuilder::createProperty("Directory")->withDescription("Path to a directory for the database") +->isRequired(true)->build()); + +RocksDbPersistableKeyValueStoreService::RocksDbPersistableKeyValueStoreService(const std::string& name, const std::string& id) +: controllers::KeyValueStoreService(name, id) +, controllers::AbstractAutoPersistingKeyValueStoreService(name, id) +, db_valid_(false) +, logger_(logging::LoggerFactory::getLogger()) { +} + +RocksDbPersistableKeyValueStoreService::RocksDbPersistableKeyValueStoreService(const std::string& name, utils::Identifier uuid /*= utils::Identifier()*/) +: controllers::KeyValueStoreService(name, uuid) +, controllers::AbstractAutoPersistingKeyValueStoreService(name, uuid) +, db_valid_(false) +, logger_(logging::LoggerFactory::getLogger()) { +} + +RocksDbPersistableKeyValueStoreService::~RocksDbPersistableKeyValueStoreService() { + if (db_valid_) { +delete db_; + } +} + +void RocksDbPersistableKeyValueStoreService::initialize() { + AbstractAutoPersistingKeyValueStoreService::initialize(); + std::set supportedProperties; + supportedProperties.insert(Directory); + updateSupportedProperties(supportedProperties); +} + +void RocksDbPersistableKeyValueStoreService::onEnable() { + if (configuration_ == nullptr) { +logger_->log_debug("Cannot enable RocksDbPersistableKeyValueStoreService"); +return; + } + + AbstractAutoPersistingKeyValueStoreService::onEnable(); + + if (!getProperty(Directory.getName(), directory_)) { +logger_->log_error("Invalid or missing property: Directory"); +return; + } + + if (db_valid_) { +db_valid_ = false; +delete db_; + } + rocksdb::Options options; + options.create_if_missing = true; Review comment: Good point, this was 
written last summer, we didn't know this back then. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st… URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r394633928 ## File path: extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h ## @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_RocksDbPersistableKeyValueStoreService_H_ +#define LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_RocksDbPersistableKeyValueStoreService_H_ + +#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h" +#include "core/Core.h" +#include "properties/Configure.h" +#include "core/logging/Logger.h" +#include "core/logging/LoggerConfiguration.h" + +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/slice.h" + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace controllers { + +class RocksDbPersistableKeyValueStoreService : public AbstractAutoPersistingKeyValueStoreService { + public: + explicit RocksDbPersistableKeyValueStoreService(const std::string& name, const std::string& id); + explicit RocksDbPersistableKeyValueStoreService(const std::string& name, utils::Identifier uuid = utils::Identifier()); + + virtual ~RocksDbPersistableKeyValueStoreService(); + + static core::Property Directory; + + virtual void initialize() override; + virtual void onEnable() override; + virtual void notifyStop() override; + + virtual bool set(const std::string& key, const std::string& value) override; + + virtual bool get(const std::string& key, std::string& value) override; + + virtual bool get(std::unordered_map& kvs) override; + + virtual bool remove(const std::string& key) override; + + virtual bool clear() override; + + virtual bool update(const std::string& key, const std::function& update_func) override; + + virtual bool persist() override; + + protected: + std::string directory_; + + rocksdb::DB* db_; Review comment: None of our rocksdb stuff uses a unique_ptr, but it can, rewritten. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
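The change discussed in the comments above (owning the database handle through a `unique_ptr` instead of a raw pointer plus a separate `db_valid_` flag) can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code from the pull request: `FakeDb`, `openFakeDb`, and `StoreService` are hypothetical stand-ins, used so the example builds without RocksDB.

```cpp
#include <memory>
#include <string>

// Hypothetical stand-in for rocksdb::DB, so the sketch builds without RocksDB.
struct FakeDb {
  std::string path;
};

// Hypothetical stand-in for an open call that hands back a raw pointer through
// an out-parameter and reports success or failure.
inline bool openFakeDb(const std::string& path, FakeDb** out) {
  if (path.empty()) {
    *out = nullptr;
    return false;
  }
  *out = new FakeDb{path};
  return true;
}

class StoreService {
 public:
  // Re-enabling drops any previously held handle; unique_ptr frees it.
  bool enable(const std::string& directory) {
    db_.reset();
    FakeDb* raw = nullptr;
    if (!openFakeDb(directory, &raw)) {
      return false;
    }
    db_.reset(raw);  // take ownership of the freshly opened handle
    return true;
  }

  void notifyStop() { db_.reset(); }

  // The pointer itself says whether the database is usable, so no extra flag.
  bool isOpen() const { return db_ != nullptr; }

 private:
  std::unique_ptr<FakeDb> db_;
};
```

With this shape the destructor needs no manual `delete`, and the "is the database open" check cannot drift out of sync with the pointer.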
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st… URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r393557670 ## File path: extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.h ## @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __UNORDERED_MAP_KEY_VALUE_STORE_SERVICE_H__ Review comment: I can very well imagine having multiple volatile implementations of the `KeyValueStoreService` in the future, with different properties than this, and I didn't want to designate this **the** `VolatileKeyValueStoreService`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st… URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r393555662 ## File path: extensions/windows-event-log/Bookmark.cpp ## @@ -222,7 +184,7 @@ bool Bookmark::getBookmarkXmlFromFile(std::wstring& bookmarkXml) { if (std::wstring::npos == pos) { logger_->log_error("No '!' in bookmarXml '%ls'", bookmarkXml.c_str()); bookmarkXml.clear(); -return createEmptyBookmarkXmlFile(); + return false; Review comment: Thanks, since these are Windows-only, I edited them in VS when migrating them, and well... this is the result. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st… URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r393554128 ## File path: extensions/sftp/processors/ListSFTP.cpp ## @@ -760,140 +713,83 @@ void ListSFTP::listByTrackingTimestamps( } } -bool ListSFTP::persistTrackingEntitiesCache(const std::string& hostname, const std::string& username, const std::string& remote_path) { - std::ofstream file(tracking_entities_state_filename_); - if (!file.is_open()) { -logger_->log_error("Failed to store Tracking Entities state to state file \"%s\"", tracking_entities_state_filename_.c_str()); -return false; - } - file << "hostname=" << hostname << "\n"; - file << "username=" << username << "\n"; - file << "remote_path=" << remote_path << "\n"; - file << "json_state_file=" << tracking_entities_state_json_filename_ << "\n"; - file.close(); - - std::ofstream json_file(tracking_entities_state_json_filename_); - if (!json_file.is_open()) { -logger_->log_error("Failed to store Tracking Entities state to state json file \"%s\"", tracking_entities_state_json_filename_.c_str()); -return false; - } - - rapidjson::Document entities(rapidjson::kObjectType); - rapidjson::Document::AllocatorType& alloc = entities.GetAllocator(); - for (const auto& already_listed_entity : already_listed_entities_) { -rapidjson::Value entity(rapidjson::kObjectType); -entity.AddMember("timestamp", already_listed_entity.second.timestamp, alloc); -entity.AddMember("size", already_listed_entity.second.size, alloc); -entities.AddMember(rapidjson::Value(already_listed_entity.first.c_str(), alloc), std::move(entity), alloc); +bool ListSFTP::persistTrackingEntitiesCache(const std::shared_ptr& context, const std::string& hostname, const std::string& username, const std::string& remote_path) { + std::unordered_map state; + state["listing_strategy"] = listing_strategy_; + state["hostname"] = hostname; + state["username"] = username; + 
state["remote_path"] = remote_path; + size_t i = 0; + for (const auto& already_listed_entity : already_listed_entities_) { +state["entity." + std::to_string(i) + ".name"] = already_listed_entity.first; +state["entity." + std::to_string(i) + ".timestamp"] = std::to_string(already_listed_entity.second.timestamp); +state["entity." + std::to_string(i) + ".size"] = std::to_string(already_listed_entity.second.size); +++i; } - - rapidjson::OStreamWrapper osw(json_file); - rapidjson::Writer writer(osw); - entities.Accept(writer); - - return true; + return state_manager_->set(state); } -bool ListSFTP::updateFromTrackingEntitiesCache(const std::string& hostname, const std::string& username, const std::string& remote_path) { - std::ifstream file(tracking_entities_state_filename_); - if (!file.is_open()) { -logger_->log_error("Failed to open Tracking Entities state file \"%s\"", tracking_entities_state_filename_.c_str()); -return false; - } +bool ListSFTP::updateFromTrackingEntitiesCache(const std::shared_ptr& context, const std::string& hostname, const std::string& username, const std::string& remote_path) { + std::string state_listing_strategy; std::string state_hostname; std::string state_username; std::string state_remote_path; - std::string state_json_state_file; - - std::string line; - while (std::getline(file, line)) { -size_t separator_pos = line.find('='); -if (separator_pos == std::string::npos) { - logger_->log_warn("None key-value line found in Tracking Entities state file \"%s\": \"%s\"", tracking_entities_state_filename_.c_str(), line.c_str()); - continue; -} -std::string key = line.substr(0, separator_pos); -std::string value = line.substr(separator_pos + 1); -if (key == "hostname") { - state_hostname = std::move(value); -} else if (key == "username") { - state_username = std::move(value); -} else if (key == "remote_path") { - state_remote_path = std::move(value); -} else if (key == "json_state_file") { - state_json_state_file = std::move(value); -} else { -
logger_->log_warn("Unknown key found in Tracking Entities state file \"%s\": \"%s\"", tracking_entities_state_filename_.c_str(), key.c_str()); -} - } - file.close(); - - if (state_hostname != hostname || - state_username != username || - state_remote_path != remote_path) { -logger_->log_error("Tracking Entities state file \"%s\" was created with different settings than the current ones, ignoring. " - "Hostname: \"%s\" vs. \"%s\", " - "Username: \"%s\" vs. \"%s\", " - "Remote Path: \"%s\" vs. \"%s\"", - tracking_entities_state_filename_.c_str(), - state_hostname, hostname, - state_username, username, -
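The new `persistTrackingEntitiesCache` in the diff above flattens each listed entity into `entity.<i>.name`, `entity.<i>.timestamp`, and `entity.<i>.size` keys of a string map. A minimal sketch of that encoding, with a matching decoder, might look like this. The `Entity` struct and the free functions `flatten` and `restore` are hypothetical simplifications and not part of ListSFTP; in particular, the decoder's stop-at-first-gap behaviour is an assumption.

```cpp
#include <cstddef>
#include <cstdint>
#include <exception>
#include <map>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical, simplified entity record; field names follow the diff.
struct Entity {
  uint64_t timestamp;
  uint64_t size;
};

using StateMap = std::unordered_map<std::string, std::string>;

// Encode every entity under indexed "entity.<i>.*" keys.
inline StateMap flatten(const std::map<std::string, Entity>& entities) {
  StateMap state;
  std::size_t i = 0;
  for (const auto& entity : entities) {
    const std::string prefix = "entity." + std::to_string(i) + ".";
    state[prefix + "name"] = entity.first;
    state[prefix + "timestamp"] = std::to_string(entity.second.timestamp);
    state[prefix + "size"] = std::to_string(entity.second.size);
    ++i;
  }
  return state;
}

// Decode by probing indices until the first missing "name" key.
// Returns false if an entry is incomplete or a number fails to parse;
// the output map is only modified on success.
inline bool restore(const StateMap& state, std::map<std::string, Entity>& entities) {
  std::map<std::string, Entity> result;
  for (std::size_t i = 0;; ++i) {
    const std::string prefix = "entity." + std::to_string(i) + ".";
    const auto name = state.find(prefix + "name");
    if (name == state.end()) {
      break;
    }
    const auto timestamp = state.find(prefix + "timestamp");
    const auto size = state.find(prefix + "size");
    if (timestamp == state.end() || size == state.end()) {
      return false;
    }
    try {
      Entity entity;
      entity.timestamp = std::stoull(timestamp->second);
      entity.size = std::stoull(size->second);
      result[name->second] = entity;
    } catch (const std::exception&) {
      return false;
    }
  }
  entities = std::move(result);
  return true;
}
```

Keeping everything in one flat string map is what lets the state go through a generic key-value state manager without a separate JSON file.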
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st… URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r393551822 ## File path: extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.h ## @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef __UNORDERED_MAP_PERSISTABLE_KEY_VALUE_STORE_SERVICE_H__ +#define __UNORDERED_MAP_PERSISTABLE_KEY_VALUE_STORE_SERVICE_H__ + +#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h" +#include "UnorderedMapKeyValueStoreService.h" +#include "core/Core.h" +#include "properties/Configure.h" +#include "core/logging/Logger.h" +#include "core/logging/LoggerConfiguration.h" +#include "core/Resource.h" + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace controllers { + +class UnorderedMapPersistableKeyValueStoreService : public AbstractAutoPersistingKeyValueStoreService, Review comment: The motivation is twofold: to provide an implementation without any third party requirements, and to provide a serialized format which is human-readable and easily editable. As for the first motivation: we don't want to make it a hard requirement to have rocksdb to use stateful processors, because that would hinder low-footprint usages, nor do I want to make any other third party a dependency for this: it would, again, introduce an unwanted requirement for low-footprint usages and we would have to find one that supports all target platforms, and maintain it, which is not a negligible maintenance burden. For the second one: so far users could easily clear a state by deleting the state file or directory. With the rocksdb state storage this is no longer a trivial task. The C2 methods introduced here attempt to solve this problem, but until they are finalized (or if someone does not want to use, or can't use C2), they aren't really useful. A solution for this is using this state storage method: its serialized format is trivial and editable in text format without any third party tools, making it easy to clear (or even modify) the state of processors. 
Finally, for the support argument: this is a trivial implementation and, if you take a look at `PersistableKeyValueStoreServiceTest.cpp`, a well-tested one, so I don't think it would introduce a significant maintenance burden going forward. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
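To make the second motivation above concrete, here is a minimal sketch of a persisted store whose on-disk form can be read and edited by hand. The one-`key=value`-per-line format and the names `saveState` and `loadState` are assumptions made for illustration only; the actual file format of `UnorderedMapPersistableKeyValueStoreService` is not shown in this thread, and a real implementation would also need to escape newlines and `=` characters in keys.

```cpp
#include <fstream>
#include <string>
#include <unordered_map>
#include <utility>

using StateMap = std::unordered_map<std::string, std::string>;

// Write one "key=value" line per entry. Assumes keys contain no '=' and
// neither keys nor values contain newlines.
inline bool saveState(const std::string& path, const StateMap& state) {
  std::ofstream out(path, std::ios::trunc);
  if (!out.is_open()) {
    return false;
  }
  for (const auto& kv : state) {
    out << kv.first << '=' << kv.second << '\n';
  }
  out.flush();
  return out.good();
}

// Read the file back, splitting each line at the first '='.
// Lines without a separator are skipped rather than treated as fatal.
inline bool loadState(const std::string& path, StateMap& state) {
  std::ifstream in(path);
  if (!in.is_open()) {
    return false;
  }
  StateMap result;
  std::string line;
  while (std::getline(in, line)) {
    const auto pos = line.find('=');
    if (pos == std::string::npos) {
      continue;
    }
    result[line.substr(0, pos)] = line.substr(pos + 1);
  }
  state = std::move(result);
  return true;
}
```

With a format like this, clearing a processor's state amounts to deleting the relevant lines, or the whole file, in any text editor.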
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st… URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r390714137 ## File path: libminifi/src/c2/C2Agent.cpp ## @@ -435,6 +437,27 @@ void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) { update_sink_->drainRepositories(); C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); enqueue_c2_response(std::move(response)); + } else if (resp.name == "corecomponentstate") { Review comment: This has only been written to make sure that it fits the architecture; it hasn't even been tested manually. Once https://github.com/apache/nifi-minifi-cpp/pull/743 is done and merged, I can probably write reasonable tests for it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st… URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r390713995 ## File path: libminifi/src/c2/C2Agent.cpp ## @@ -596,6 +619,29 @@ void C2Agent::handle_describe(const C2ContentResponse &resp) { } enqueue_c2_response(std::move(response)); } + } else if (resp.name == "corecomponentstate") { Review comment: This has only been tested manually. Once https://github.com/apache/nifi-minifi-cpp/pull/743 is done and merged, I can probably write reasonable tests for it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st… URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r390712104 ## File path: libminifi/include/core/ProcessContext.h ## @@ -193,6 +216,61 @@ class ProcessContext : public controller::ControllerServiceLookup, public core:: return controller_service_provider_->getControllerServiceName(identifier); } + static constexpr char const* DefaultStateManagerProviderName = "defaultstatemanagerprovider"; + + std::shared_ptr getStateManager() { +if (state_manager_provider_ == nullptr) { + return nullptr; +} +return state_manager_provider_->getCoreComponentStateManager(*processor_node_); + } + + static std::shared_ptr getOrCreateDefaultStateManagerProvider( + std::shared_ptr controller_service_provider, + const char *base_path = "") { +static std::mutex mutex; +std::lock_guard lock(mutex); + +/* See if we have already created a default provider */ +std::shared_ptr node = controller_service_provider->getControllerServiceNode(DefaultStateManagerProviderName); // TODO +if (node != nullptr) { + return std::dynamic_pointer_cast(node->getControllerServiceImplementation()); +} + +/* Try to create a RocksDB-backed provider */ +node = controller_service_provider->createControllerService("RocksDbPersistableKeyValueStoreService", + "org.apache.nifi.minifi.controllers.RocksDbPersistableKeyValueStoreService", + DefaultStateManagerProviderName, +true /*firstTimeAdded*/); +if (node != nullptr) { + node->initialize(); + auto provider = node->getControllerServiceImplementation(); + if (provider != nullptr) { +provider->setProperty("Directory", utils::file::FileUtils::concat_path(base_path, "corecomponentstate")); +node->enable(); +return std::dynamic_pointer_cast(provider); + } +} + +/* Fall back to a locked unordered map-backed provider */ +node = controller_service_provider->createControllerService("UnorderedMapPersistableKeyValueStoreService", + 
"org.apache.nifi.minifi.controllers.UnorderedMapPersistableKeyValueStoreService", + DefaultStateManagerProviderName, +true /*firstTimeAdded*/); +if (node != nullptr) { + node->initialize(); + auto provider = node->getControllerServiceImplementation(); + if (provider != nullptr) { +provider->setProperty("File", utils::file::FileUtils::concat_path(base_path, "corecomponentstate.txt")); +node->enable(); +return std::dynamic_pointer_cast(provider); + } +} Review comment: @szaszm Ended up adding more code to this, and now it really made sense to deduplicate it, so I've done that. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st… URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r390660992 ## File path: libminifi/include/controllers/keyvalue/PersistableKeyValueStoreService.h ## @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_KEYVALUE_PersistableKeyValueStoreService_H_ +#define LIBMINIFI_INCLUDE_KEYVALUE_PersistableKeyValueStoreService_H_ + +#include "KeyValueStoreService.h" +#include "AbstractCoreComponentStateManagerProvider.h" +#include "core/Core.h" +#include "properties/Configure.h" + +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace controllers { + +class PersistableKeyValueStoreService : virtual public KeyValueStoreService, public AbstractCoreComponentStateManagerProvider { Review comment: `PersistableKeyValueStoreService` is just an interface extension of `KeyValueStoreService`, and forcefully wrapping a `PersistableKeyValueStoreService` into some class that provides the `CoreComponentStateManagerProvider` interface after loading the `PersistableKeyValueStoreService` with the classloader would make it impossible to implement and use a `CoreComponentStateManagerProvider` directly. This way everything "just works": if you implement a `PersistableKeyValueStoreService` you automatically have a `CoreComponentStateManagerProvider`, but you can implement a `CoreComponentStateManagerProvider` directly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
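The inheritance arrangement described in the comment above can be modelled in a few lines. The class bodies below are simplified and hypothetical (the real interfaces have more members, and `setState`, `setImpl`, and `MapStore` are names chosen for this sketch); the point is only that anything implementing the persistable store is usable as a state manager provider, while a provider can still be implemented on its own.

```cpp
#include <string>
#include <unordered_map>

// Simplified stand-ins for the interfaces named in the comment.
class KeyValueStoreService {
 public:
  virtual ~KeyValueStoreService() = default;
  virtual bool set(const std::string& key, const std::string& value) = 0;
  virtual bool get(const std::string& key, std::string& value) = 0;
};

class CoreComponentStateManagerProvider {
 public:
  virtual ~CoreComponentStateManagerProvider() = default;
  virtual bool setState(const std::string& component, const std::string& state) = 0;
};

// Implements the provider interface in terms of an abstract storage hook.
class AbstractCoreComponentStateManagerProvider : public CoreComponentStateManagerProvider {
 public:
  bool setState(const std::string& component, const std::string& state) override {
    return setImpl(component, state);
  }

 protected:
  virtual bool setImpl(const std::string& key, const std::string& value) = 0;
};

// A persistable store is both a key-value store and a provider: the storage
// hook is wired to the store's own set().
class PersistableKeyValueStoreService : virtual public KeyValueStoreService,
                                        public AbstractCoreComponentStateManagerProvider {
 public:
  virtual bool persist() = 0;

 protected:
  bool setImpl(const std::string& key, const std::string& value) override {
    return set(key, value);
  }
};

// Concrete store: implementing the store interface is enough to get a provider.
class MapStore : public PersistableKeyValueStoreService {
 public:
  bool set(const std::string& key, const std::string& value) override {
    map_[key] = value;
    return true;
  }
  bool get(const std::string& key, std::string& value) override {
    const auto it = map_.find(key);
    if (it == map_.end()) {
      return false;
    }
    value = it->second;
    return true;
  }
  bool persist() override { return true; }

 private:
  std::unordered_map<std::string, std::string> map_;
};
```

A class that derives only from `CoreComponentStateManagerProvider` would work with the same calling code, which is the flexibility the comment argues for.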
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st… URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r389775151 ## File path: libminifi/test/integration/IntegrationBase.h ## @@ -109,6 +110,16 @@ void IntegrationBase::run(std::string test_file_location) { core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); + auto controller_service_provider = yaml_ptr->getControllerServiceProvider(); + std::string state_dir_name_template = "/tmp/integrationstate.XX"; + std::vector state_dir_buf(state_dir_name_template.c_str(), + state_dir_name_template.c_str() + state_dir_name_template.size() + 1); + if (mkdtemp(state_dir_buf.data()) == nullptr) { +throw std::runtime_error("Failed to create temporary directory for state"); + } + state_dir = state_dir_buf.data(); Review comment: No longer exists in the rebased version. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st… URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r389768446 ## File path: libminifi/src/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.cpp ## @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h" + +#include "rapidjson/rapidjson.h" +#include "rapidjson/document.h" +#include "rapidjson/stringbuffer.h" +#include "rapidjson/writer.h" + +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace controllers { + +AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::AbstractCoreComponentStateManager( +std::shared_ptr provider, +const std::string& id) +: provider_(std::move(provider)) +, id_(id) +, state_valid_(false) { + std::string serialized; + if (provider_->getImpl(id_, serialized)) { +if (provider_->deserialize(serialized, state_)) { Review comment: Ah, yeah, that would make sense, thank you. :D This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st… URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r389762676 ## File path: libminifi/include/core/ProcessContext.h ## @@ -193,6 +216,61 @@ class ProcessContext : public controller::ControllerServiceLookup, public core:: return controller_service_provider_->getControllerServiceName(identifier); } + static constexpr char const* DefaultStateManagerProviderName = "defaultstatemanagerprovider"; + + std::shared_ptr getStateManager() { +if (state_manager_provider_ == nullptr) { + return nullptr; +} +return state_manager_provider_->getCoreComponentStateManager(*processor_node_); + } + + static std::shared_ptr getOrCreateDefaultStateManagerProvider( + std::shared_ptr controller_service_provider, + const char *base_path = "") { +static std::mutex mutex; +std::lock_guard lock(mutex); + +/* See if we have already created a default provider */ +std::shared_ptr node = controller_service_provider->getControllerServiceNode(DefaultStateManagerProviderName); // TODO +if (node != nullptr) { + return std::dynamic_pointer_cast(node->getControllerServiceImplementation()); +} + +/* Try to create a RocksDB-backed provider */ +node = controller_service_provider->createControllerService("RocksDbPersistableKeyValueStoreService", + "org.apache.nifi.minifi.controllers.RocksDbPersistableKeyValueStoreService", + DefaultStateManagerProviderName, +true /*firstTimeAdded*/); +if (node != nullptr) { + node->initialize(); + auto provider = node->getControllerServiceImplementation(); + if (provider != nullptr) { +provider->setProperty("Directory", utils::file::FileUtils::concat_path(base_path, "corecomponentstate")); +node->enable(); +return std::dynamic_pointer_cast(provider); + } +} + +/* Fall back to a locked unordered map-backed provider */ +node = controller_service_provider->createControllerService("UnorderedMapPersistableKeyValueStoreService", + 
"org.apache.nifi.minifi.controllers.UnorderedMapPersistableKeyValueStoreService", + DefaultStateManagerProviderName, +true /*firstTimeAdded*/); +if (node != nullptr) { + node->initialize(); + auto provider = node->getControllerServiceImplementation(); + if (provider != nullptr) { +provider->setProperty("File", utils::file::FileUtils::concat_path(base_path, "corecomponentstate.txt")); +node->enable(); +return std::dynamic_pointer_cast(provider); + } +} Review comment: Because we need to set a File in one and a Directory in the other, obviously with different names, creating a function from this would not save us much - it would actually make it less readable in my opinion. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st… URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r389760968 ## File path: libminifi/src/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.cpp ## @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace controllers { + +core::Property AbstractAutoPersistingKeyValueStoreService::AlwaysPersist( +core::PropertyBuilder::createProperty("Always Persist")->withDescription("Persist every change instead of persisting it periodically.") +->isRequired(false)->withDefaultValue(false)->build()); +core::Property AbstractAutoPersistingKeyValueStoreService::AutoPersistenceInterval( +core::PropertyBuilder::createProperty("Auto Persistence Interval")->withDescription("The interval of the periodic task persisting all values. " + "Only used if Always Persist is false. 
" + "If set to 0 seconds, auto persistence will be disabled.") +->isRequired(false)->withDefaultValue("1 min")->build()); + +AbstractAutoPersistingKeyValueStoreService::AbstractAutoPersistingKeyValueStoreService(const std::string& name, const std::string& id) +: KeyValueStoreService(name, id) +, PersistableKeyValueStoreService(name, id) +, always_persist_(false) +, auto_persistence_interval_(0U) +, running_(false) +, logger_(logging::LoggerFactory::getLogger()) { +} + +AbstractAutoPersistingKeyValueStoreService::AbstractAutoPersistingKeyValueStoreService(const std::string& name, utils::Identifier uuid /*= utils::Identifier()*/) +: KeyValueStoreService(name, uuid) +, PersistableKeyValueStoreService(name, uuid) +, always_persist_(false) +, auto_persistence_interval_(0U) +, running_(false) +, logger_(logging::LoggerFactory::getLogger()) { +} + +AbstractAutoPersistingKeyValueStoreService::~AbstractAutoPersistingKeyValueStoreService() { + notifyStop(); +} + +void AbstractAutoPersistingKeyValueStoreService::initialize() { + ControllerService::initialize(); + std::set supportedProperties; + supportedProperties.insert(AlwaysPersist); + supportedProperties.insert(AutoPersistenceInterval); + updateSupportedProperties(supportedProperties); +} + +void AbstractAutoPersistingKeyValueStoreService::onEnable() { + std::unique_lock lock(persisting_mutex_); + + if (configuration_ == nullptr) { +logger_->log_debug("Cannot enable AbstractAutoPersistingKeyValueStoreService"); +return; + } + + std::string value; + if (!getProperty(AlwaysPersist.getName(), value)) { +logger_->log_error("Always Persist attribute is missing or invalid"); + } else { +utils::StringUtils::StringToBool(value, always_persist_); + } + if (!getProperty(AutoPersistenceInterval.getName(), value)) { +logger_->log_error("Auto Persistence Interval attribute is missing or invalid"); + } else { +core::TimeUnit unit; +if (!core::Property::StringToTime(value, auto_persistence_interval_, unit) || 
!core::Property::ConvertTimeUnitToMS(auto_persistence_interval_, unit, auto_persistence_interval_)) { + logger_->log_error("Auto Persistence Interval attribute is invalid"); +} + } + + if (!always_persist_ && auto_persistence_interval_ != 0U) { +if (!persisting_thread_.joinable()) { + logger_->log_trace("Starting auto persistence thread"); + running_ = true; + persisting_thread_ = std::thread(::persistingThreadFunc, this); +} + } + + logger_->log_trace("Enabled AbstractAutoPersistingKeyValueStoreService"); +} + +void AbstractAutoPersistingKeyValueStoreService::notifyStop() { + if (persisting_thread_.joinable()) { +{ + std::lock_guard lock(persisting_mutex_); + running_ = false; + persisting_cv_.notify_one(); +} +persisting_thread_.join(); + } +} + +void
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st… URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r389760197 ## File path: libminifi/test/TestBase.cpp ## @@ -30,6 +30,25 @@ TestPlan::TestPlan(std::shared_ptr content_repo, std::s flow_version_(flow_version), logger_(logging::LoggerFactory::getLogger()) { stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(std::make_shared()); + controller_services_ = std::make_shared(); + controller_services_provider_ = std::make_shared(controller_services_, nullptr, configuration_); + /* Inject the default state provider ahead of ProcessContext to make sure we have a unique state directory */ + if (state_dir == nullptr) { +std::string state_dir_name_template = "/tmp/teststate.XX"; +std::vector state_dir_buf(state_dir_name_template.c_str(), + state_dir_name_template.c_str() + state_dir_name_template.size() + 1); Review comment: No longer exists in the rebased version. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st… URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r389759366 ## File path: libminifi/test/keyvalue-tests/PersistableKeyValueStoreServiceTest.cpp ## @@ -0,0 +1,257 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#define CATCH_CONFIG_RUNNER +#include "catch.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include "../TestBase.h" +#include "../../controller/Controller.h" +#include "core/controller/ControllerService.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "controllers/keyvalue/PersistableKeyValueStoreService.h" +#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h" + +static std::string config_yaml; // NOLINT + +static inline void configYamlHandler(Catch::ConfigData&, const std::string& path) { + config_yaml = path; +} + +int main(int argc, char* argv[]) { + Catch::Session session; + + Catch::Clara::CommandLine& cli = const_cast&>(session.cli()); + cli["--config-yaml"] + .describe("path to the config.yaml containing the PersistableKeyValueStoreService controller service configuration") + .bind(, "path"); + + int ret = session.applyCommandLine(argc, argv); + if (ret != 0) { +return ret; + } + + if (config_yaml.empty()) { +std::cerr << "Missing --config-yaml . It must contain the path to the config.yaml containing the PersistableKeyValueStoreService controller service configuration." 
<< std::endl; +return -1; + } + + return session.run(); +} + +class PersistableKeyValueStoreServiceTestsFixture { + public: + PersistableKeyValueStoreServiceTestsFixture() + : state_dir(strdup("/tmp/state.XX")) { +LogTestController::getInstance().setTrace(); + LogTestController::getInstance().setTrace(); + LogTestController::getInstance().setTrace(); + +// Create temporary directories +testController.createTempDirectory(state_dir, false /*cleanup*/); +REQUIRE(state_dir != nullptr); +REQUIRE(0 == chdir(state_dir)); + +loadYaml(); + } + + virtual ~PersistableKeyValueStoreServiceTestsFixture() { +free(state_dir); +LogTestController::getInstance().reset(); + } + + void loadYaml() { +controller.reset(); +persistable_key_value_store_service_node.reset(); + +process_group.reset(); +yaml_config.reset(); + +stream_factory.reset(); +content_repo.reset(); +test_flow_repo.reset(); +test_repo.reset(); +configuration.reset(); + +configuration = std::make_shared(); +test_repo = std::make_shared(); +test_flow_repo = std::make_shared(); + +configuration->set(minifi::Configure::nifi_flow_configuration_file, config_yaml); + +content_repo = std::make_shared(); +content_repo->initialize(configuration); +stream_factory = minifi::io::StreamFactory::getInstance(configuration); + +yaml_config = std::unique_ptr(new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, config_yaml)); + +process_group = yaml_config->getRoot(config_yaml); +persistable_key_value_store_service_node = process_group->findControllerService("testcontroller"); +REQUIRE(persistable_key_value_store_service_node != nullptr); +persistable_key_value_store_service_node->enable(); + +controller = std::dynamic_pointer_cast( + persistable_key_value_store_service_node->getControllerServiceImplementation()); +REQUIRE(controller != nullptr); + } + + protected: + char *state_dir; + std::shared_ptr configuration; + std::shared_ptr test_repo; + std::shared_ptr test_flow_repo; + std::shared_ptr 
content_repo; + std::shared_ptr stream_factory; + + std::unique_ptr yaml_config; + std::unique_ptr process_group; + + std::shared_ptr persistable_key_value_store_service_node; + std::shared_ptr controller; + + TestController testController; +}; + + +TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyValueStoreServiceTestsFixture set and get", "[basic]") { + const char* key = "foobar"; + const char* value
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st… URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r389757475 ## File path: libminifi/include/core/ProcessContext.h ## @@ -77,6 +85,21 @@ class ProcessContext : public controller::ControllerServiceLookup, public core:: processor_node_(processor), logger_(logging::LoggerFactory::getLogger()) { repo_ = repo; +std::string id; +if (configuration->get(minifi::Configure::nifi_state_management_provider_local, id)) { + auto node = controller_service_provider_->getControllerServiceNode(id); + if (node == nullptr) { +logger_->log_error("Failed to find the CoreComponentStateManagerProvider %s defined by %s", id, minifi::Configure::nifi_state_management_provider_local); Review comment: This is not a fatal failure in general; it is fatal only for processors that really need to store state. If state storage is misconfigured, processors that do not require it should still run fine. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
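A rough sketch of the behaviour described in this comment: a missing state manager is only logged when the context is built, and the failure surfaces later, in processors that actually ask for state. `Context`, `scheduleStateless` and `scheduleStateful` are hypothetical names used for illustration; the exception message mirrors the one QueryDatabaseTable uses elsewhere in this PR.

```cpp
#include <memory>
#include <stdexcept>

// Hypothetical stand-in for CoreComponentStateManager.
struct StateManager {};

// Hypothetical stand-in for ProcessContext: the state manager may be null
// if the configured provider could not be found.
struct Context {
  std::shared_ptr<StateManager> state_manager;
  std::shared_ptr<StateManager> getStateManager() const { return state_manager; }
};

// A processor that keeps no state never asks for the manager,
// so a misconfigured provider does not affect it.
bool scheduleStateless(const Context&) {
  return true;
}

// A processor that needs state treats the missing manager as fatal for itself.
void scheduleStateful(const Context& context) {
  if (context.getStateManager() == nullptr) {
    throw std::runtime_error("Failed to get StateManager");
  }
}
```

The design choice is that the error is raised per processor at schedule time rather than once for the whole agent at startup.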
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st… URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r389756151 ## File path: libminifi/include/core/CoreComponentState.h ## @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIBMINIFI_INCLUDE_CORE_CoreComponentState_H_ +#define LIBMINIFI_INCLUDE_CORE_CoreComponentState_H_ + +#include "Core.h" + +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +class CoreComponentStateManager { + public: + virtual ~CoreComponentStateManager() { + } + + virtual bool set(const std::unordered_map& kvs) = 0; + + virtual bool get(std::unordered_map& kvs) = 0; Review comment: Agreed, but signalling errors here in bools is more practical than through exceptions. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
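To illustrate why `bool` results are convenient for callers here, the following is a minimal sketch with an in-memory, hypothetical implementation. The `std::string` key and value types are an assumption (the template arguments are not visible in the diff above), and `InMemoryStateManager` and `lastSeenValue` are names invented for this example.

```cpp
#include <string>
#include <unordered_map>

// Assumed key/value types; the diff above does not show the template arguments.
using StateMap = std::unordered_map<std::string, std::string>;

// Hypothetical in-memory implementation with the same bool-returning shape
// as the set/get pair in the diff.
class InMemoryStateManager {
 public:
  bool set(const StateMap& kvs) {
    state_ = kvs;
    has_state_ = true;
    return true;
  }

  // Returns false when nothing has been stored yet; kvs is left untouched.
  bool get(StateMap& kvs) const {
    if (!has_state_) return false;
    kvs = state_;
    return true;
  }

 private:
  StateMap state_;
  bool has_state_ = false;
};

// Caller side: "no state yet" is an ordinary branch, not an exceptional path.
std::string lastSeenValue(const InMemoryStateManager& manager) {
  StateMap state;
  if (!manager.get(state)) {
    return "";  // first run: start from scratch
  }
  auto it = state.find("last");
  return it == state.end() ? "" : it->second;
}
```

With this shape the common "first run, nothing stored" case needs no try/catch at the call site.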
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st… URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r389755764 ## File path: libminifi/src/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.cpp ## @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h" + +#include "rapidjson/rapidjson.h" +#include "rapidjson/document.h" +#include "rapidjson/stringbuffer.h" +#include "rapidjson/writer.h" + +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace controllers { + +AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::AbstractCoreComponentStateManager( +std::shared_ptr provider, +const std::string& id) +: provider_(std::move(provider)) +, id_(id) +, state_valid_(false) { + std::string serialized; + if (provider_->getImpl(id_, serialized)) { +if (provider_->deserialize(serialized, state_)) { Review comment: Could you elaborate? What should be `&&`? `serialized` is not modified, `state_` is the output. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st… URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r389753888 ## File path: libminifi/include/core/ProcessContext.h ## @@ -193,6 +216,61 @@ class ProcessContext : public controller::ControllerServiceLookup, public core:: return controller_service_provider_->getControllerServiceName(identifier); } + static constexpr char const* DefaultStateManagerProviderName = "defaultstatemanagerprovider"; Review comment:

> default- ok
> state- stateless is generally better, but ok

This is a PR about creating a centralized state storage mechanism for processors that require state, instead of all of them implementing individual state files. "Stateless is generally better" is a mind-bogglingly meaningless comment here.

> manager- meaningless. Only suggests doing something, so it should probably be a verb. Having a noun in place of a verb usually means trying to use a class where a function would be better.

StateManager comes from NiFi, and it is a perfectly meaningful name: it is an object that helps manage the state of a single CoreComponent. It cannot be a function, as it contains some caching and validation logic that requires it to be stateful itself.

> provider- again, a class instead of a function. This has a meaning though.

CoreComponentStateManagerProvider needs to exist as an interface, which is used by the ClassLoader, and we access different implementations of it through that interface.

> name- ok
>
> I'm not saying that your code is bad, just that this is a red flag that suggests a design mistake that may or may not be in your changes.

I'm not saying your comment is useless, but it could be more useful if you tried to interpret the name in the context of the PR, instead of in isolation.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
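The point about caching and validation can be sketched as follows. This is a simplified illustration, not the PR's implementation: `Backend` is a hypothetical storage stand-in, the `key=value` line format replaces the JSON serialization used in the PR, and keys containing `=` or newlines are not handled.

```cpp
#include <memory>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>

using StateMap = std::unordered_map<std::string, std::string>;

// Hypothetical storage backend; counts reads so the caching is observable.
struct Backend {
  std::unordered_map<std::string, std::string> blobs;
  int loads = 0;

  bool load(const std::string& id, std::string& out) {
    ++loads;
    auto it = blobs.find(id);
    if (it == blobs.end()) return false;
    out = it->second;
    return true;
  }

  bool store(const std::string& id, const std::string& blob) {
    blobs[id] = blob;
    return true;
  }
};

// Loads and validates the stored state once, then serves reads from the cache.
class CachingStateManager {
 public:
  CachingStateManager(std::shared_ptr<Backend> backend, std::string id)
      : backend_(std::move(backend)), id_(std::move(id)) {
    std::string serialized;
    if (backend_->load(id_, serialized)) {
      state_valid_ = deserialize(serialized, state_);
    }
  }

  // Fails if nothing valid was loaded or set; never touches the backend.
  bool get(StateMap& kvs) const {
    if (!state_valid_) return false;
    kvs = state_;
    return true;
  }

  // Writes through to the backend, then updates the cache.
  bool set(const StateMap& kvs) {
    if (!backend_->store(id_, serialize(kvs))) return false;
    state_ = kvs;
    state_valid_ = true;
    return true;
  }

 private:
  static std::string serialize(const StateMap& kvs) {
    std::string out;
    for (const auto& kv : kvs) out += kv.first + "=" + kv.second + "\n";
    return out;
  }

  // Rejects any line without '='; kvs is only modified on success.
  static bool deserialize(const std::string& in, StateMap& kvs) {
    StateMap parsed;
    std::istringstream stream(in);
    std::string line;
    while (std::getline(stream, line)) {
      const auto pos = line.find('=');
      if (pos == std::string::npos) return false;
      parsed[line.substr(0, pos)] = line.substr(pos + 1);
    }
    kvs = std::move(parsed);
    return true;
  }

  std::shared_ptr<Backend> backend_;
  std::string id_;
  StateMap state_;
  bool state_valid_ = false;
};
```

The cached map and the validity flag are exactly the kind of per-component data that a free function would have nowhere to keep between calls.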
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r389753888

## File path: libminifi/include/core/ProcessContext.h ##

@@ -193,6 +216,61 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
     return controller_service_provider_->getControllerServiceName(identifier);
   }
 
+  static constexpr char const* DefaultStateManagerProviderName = "defaultstatemanagerprovider";

Review comment:

> default- ok
> state- stateless is generally better, but ok

This is a PR about creating a centralized state storage mechanism for processors that require state, instead of all of them implementing individual state files. "Stateless is generally better" is a mind-bogglingly meaningless comment here.

> manager- meaningless. Only suggests doing something, so it should probably be a verb. Having a noun in place of a verb usually means trying to use a class where a function would be better.

StateManager comes from NiFi, and it is a perfectly meaningful name: it is an object that helps manage the state of a single CoreComponent. It cannot be a function as it contains some caching and validation logic that requires it in itself to be stateful.

> provider- again, a class instead of a function. This has a meaning though.

CoreComponentStateManagerProvider needs to exist as interface, which is used by the ClassLoader and we access different implementations of it through that interface.

> name- ok
> I'm not saying that your code is bad, just that this is a red flag that suggests a design mistake that may or may not be in your changes.

I'm not saying your comment is useless, but it could be more useful if you tried to interpret the name in the context of the PR, instead of in itself.
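To make the "cannot be a function" argument concrete, here is a minimal sketch of a per-component state manager that caches and validates, sitting on top of a storage interface with interchangeable implementations. Every name in it (`StateStorage`, `InMemoryStorage`, `ComponentStateManager`) is hypothetical and chosen for illustration; it is not the MiNiFi or NiFi API, and the validation rule (rejecting an empty map) is an assumption.

```cpp
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

// All names below are hypothetical; this is not the MiNiFi API.
using StateMap = std::unordered_map<std::string, std::string>;

// Interface through which different storage backends are accessed.
class StateStorage {
 public:
  virtual ~StateStorage() = default;
  virtual bool load(const std::string& component_id, StateMap& out) = 0;
  virtual bool store(const std::string& component_id, const StateMap& state) = 0;
};

// One possible backend; a persistent one would implement the same interface.
class InMemoryStorage : public StateStorage {
 public:
  bool load(const std::string& component_id, StateMap& out) override {
    auto it = data_.find(component_id);
    if (it == data_.end()) return false;
    out = it->second;
    return true;
  }
  bool store(const std::string& component_id, const StateMap& state) override {
    data_[component_id] = state;
    return true;
  }

 private:
  std::unordered_map<std::string, StateMap> data_;
};

// Manages the state of a single component. The cache and the validation
// step are what make this an object with its own state rather than a function.
class ComponentStateManager {
 public:
  ComponentStateManager(std::string id, std::shared_ptr<StateStorage> storage)
      : id_(std::move(id)), storage_(std::move(storage)) {}

  bool set(const StateMap& state) {
    if (state.empty()) return false;  // assumed validation rule
    if (!storage_->store(id_, state)) return false;
    cache_ = state;
    cached_ = true;
    return true;
  }

  bool get(StateMap& out) {
    if (!cached_) {  // only hit the backend on a cache miss
      StateMap loaded;
      if (!storage_->load(id_, loaded)) return false;
      cache_ = std::move(loaded);
      cached_ = true;
    }
    out = cache_;
    return true;
  }

 private:
  std::string id_;
  std::shared_ptr<StateStorage> storage_;
  StateMap cache_;
  bool cached_ = false;
};
```

Two managers created for the same component id over the same storage see the same stored state, while each keeps its own cache.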
[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r389748596

## File path: extensions/sftp/processors/ListSFTP.cpp ##

@@ -507,85 +463,87 @@ ListSFTP::ListedEntity::ListedEntity(uint64_t timestamp_, uint64_t size_)
     , size(size_) {
 }
 
-bool ListSFTP::persistTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path) {
-  std::ofstream file(tracking_timestamps_state_filename_);
-  if (!file.is_open()) {
-    logger_->log_error("Failed to store state to Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+bool ListSFTP::persistTrackingTimestampsCache(const std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, const std::string& username, const std::string& remote_path) {
+  auto state_manager = context->getStateManager();
+  if (state_manager == nullptr) {
     return false;
   }
-  file << "hostname=" << hostname << "\n";
-  file << "username=" << username << "\n";
-  file << "remote_path=" << remote_path << "\n";
-  file << "listing.timestamp=" << last_listed_latest_entry_timestamp_ << "\n";
-  file << "processed.timestamp=" << last_processed_latest_entry_timestamp_ << "\n";
+  std::unordered_map<std::string, std::string> state;
+  state["listing_strategy"] = LISTING_STRATEGY_TRACKING_TIMESTAMPS;
+  state["hostname"] = hostname;
+  state["username"] = username;
+  state["remote_path"] = remote_path;
+  state["listing.timestamp"] = std::to_string(last_listed_latest_entry_timestamp_);
+  state["processed.timestamp"] = std::to_string(last_processed_latest_entry_timestamp_);
   size_t i = 0;
   for (const auto& identifier : latest_identifiers_processed_) {
-    file << "id." << i << "=" << identifier << "\n";
+    state["id." + std::to_string(i)] = identifier;
     ++i;
   }
+  state_manager->set(state);
+  if (!state_manager->persist()) {
+    return false;
+  }
   return true;
 }
 
-bool ListSFTP::updateFromTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path) {
-  std::ifstream file(tracking_timestamps_state_filename_);
-  if (!file.is_open()) {
-    logger_->log_error("Failed to open Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
-    return false;
-  }
+bool ListSFTP::updateFromTrackingTimestampsCache(const std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, const std::string& username, const std::string& remote_path) {

Review comment:

Agreed, we use it because `onTrigger` already gets it this way and it is easier to just pass that to these functions.
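The diff above stores a list of identifiers in a flat string-to-string map under keys `id.0`, `id.1`, and so on. Below is a small sketch of that encoding together with one way to read it back. The helper names `writeIdentifiers` and `readIdentifiers` are hypothetical, and the read side is an assumption: the quoted diff only shows the write side, so the actual `updateFromTrackingTimestampsCache` may well do it differently.

```cpp
#include <cstddef>
#include <string>
#include <unordered_map>
#include <vector>

using StateMap = std::unordered_map<std::string, std::string>;

// Hypothetical helper mirroring the loop in the diff: one "id.<index>" key
// per identifier, in list order.
void writeIdentifiers(const std::vector<std::string>& identifiers, StateMap& state) {
  std::size_t i = 0;
  for (const auto& identifier : identifiers) {
    state["id." + std::to_string(i)] = identifier;
    ++i;
  }
}

// Hypothetical inverse (an assumption, not taken from the PR): walk the
// indices upward from 0 and stop at the first missing key.
std::vector<std::string> readIdentifiers(const StateMap& state) {
  std::vector<std::string> identifiers;
  for (std::size_t i = 0;; ++i) {
    auto it = state.find("id." + std::to_string(i));
    if (it == state.end()) break;
    identifiers.push_back(it->second);
  }
  return identifiers;
}
```

Probing indices in order, rather than iterating the map, restores the original list order, which an unordered map does not otherwise preserve.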