[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-04-26 Thread GitBox


bakaid commented on a change in pull request #605:
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r415336955



##
File path: libminifi/src/c2/C2Agent.cpp
##
@@ -435,6 +437,27 @@ void C2Agent::handle_c2_server_response(const 
C2ContentResponse ) {
 update_sink_->drainRepositories();
 C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
 enqueue_c2_response(std::move(response));
+  } else if (resp.name == "corecomponentstate") {

Review comment:
   This is unfortunately still very much non-trivial to do (unlike testing 
DESCRIBE, which I do now), so I've added a TODO for it as suggested.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-04-26 Thread GitBox


bakaid commented on a change in pull request #605:
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r415336687



##
File path: extensions/sql/processors/QueryDatabaseTable.cpp
##
@@ -308,19 +240,43 @@ void QueryDatabaseTable::processOnSchedule(const 
core::ProcessContext )
   context.getProperty(s_sqlQuery.getName(), sqlQuery_);
   context.getProperty(s_maxRowsPerFlowFile.getName(), maxRowsPerFlowFile_);
 
-  std::string stateDir;
-  context.getProperty(s_stateDirectory.getName(), stateDir);
-  if (stateDir.empty()) {
-logger_->log_error("State Directory is empty");
-return;
+  mapState_.clear();
+
+  state_manager_ = context.getStateManager();
+  if (state_manager_ == nullptr) {
+throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
   }
 
-  pState_ = std::make_unique(tableName_, stateDir, getUUIDStr(), 
logger_);
-  if (!*pState_) {
-return;
+  std::unordered_map state_map;
+  if (state_manager_->get(state_map)) {
+if (state_map[TABLENAME_KEY] != tableName_) {
+  state_manager_->clear();
+} else {
+  for (auto&& elem : state_map) {
+if (elem.first.find(MAXVALUE_KEY_PREFIX) == 0) {
+  mapState_.emplace(elem.first.substr(MAXVALUE_KEY_PREFIX.length()), 
std::move(elem.second));
+}
+  }
+}
+  } else {
+// Try to migrate legacy state file
+std::string stateDir;
+context.getProperty(s_stateDirectory.getName(), stateDir);
+if (!stateDir.empty()) {
+  LegacyState legacyState(tableName_, stateDir, getUUIDStr(), logger_);
+  if (legacyState) {
+mapState_ = legacyState.getStateMap();
+if (saveState() && state_manager_->persist()) {
+  logger_->log_info("State migration successful");
+  legacyState.moveStateFileToMigrated();
+} else {
+  logger_->log_warn("Failed to persists migrated state");
+}
+  } else {
+logger_->log_warn("Could not migrate state from specified State 
Directory %s", stateDir);
+  }
+}

Review comment:
   It is short and very much unlikely to be reused, so I don't see much 
benefit in moving to a function.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-04-26 Thread GitBox


bakaid commented on a change in pull request #605:
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r415333419



##
File path: libminifi/src/c2/C2Agent.cpp
##
@@ -596,6 +619,29 @@ void C2Agent::handle_describe(const C2ContentResponse 
) {
   }
   enqueue_c2_response(std::move(response));
 }
+  } else if (resp.name == "corecomponentstate") {

Review comment:
   Added `C2DescribeCoreComponentStateTest` for this.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-04-23 Thread GitBox


bakaid commented on a change in pull request #605:
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r413931006



##
File path: libminifi/src/c2/C2Agent.cpp
##
@@ -435,6 +437,27 @@ void C2Agent::handle_c2_server_response(const 
C2ContentResponse ) {
 update_sink_->drainRepositories();
 C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
 enqueue_c2_response(std::move(response));
+  } else if (resp.name == "corecomponentstate") {

Review comment:
   Let's revisit this after rebasing.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-04-23 Thread GitBox


bakaid commented on a change in pull request #605:
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r413921343



##
File path: extensions/sftp/processors/ListSFTP.cpp
##
@@ -507,85 +468,74 @@ ListSFTP::ListedEntity::ListedEntity(uint64_t timestamp_, 
uint64_t size_)
 , size(size_) {
 }
 
-bool ListSFTP::persistTrackingTimestampsCache(const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
-  std::ofstream file(tracking_timestamps_state_filename_);
-  if (!file.is_open()) {
-logger_->log_error("Failed to store state to Tracking Timestamps state 
file \"%s\"", tracking_timestamps_state_filename_.c_str());
-return false;
-  }
-  file << "hostname=" << hostname << "\n";
-  file << "username=" << username << "\n";
-  file << "remote_path=" << remote_path << "\n";
-  file << "listing.timestamp=" << last_listed_latest_entry_timestamp_ << "\n";
-  file << "processed.timestamp=" << last_processed_latest_entry_timestamp_ << 
"\n";
+bool ListSFTP::persistTrackingTimestampsCache(const 
std::shared_ptr& context, const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
+  std::unordered_map state;
+  state["listing_strategy"] = LISTING_STRATEGY_TRACKING_TIMESTAMPS;
+  state["hostname"] = hostname;
+  state["username"] = username;
+  state["remote_path"] = remote_path;
+  state["listing.timestamp"] = 
std::to_string(last_listed_latest_entry_timestamp_);
+  state["processed.timestamp"] = 
std::to_string(last_processed_latest_entry_timestamp_);
   size_t i = 0;
   for (const auto& identifier : latest_identifiers_processed_) {
-file << "id." << i << "=" << identifier << "\n";
+state["id." + std::to_string(i)] = identifier;
 ++i;
   }
-  return true;
+  return state_manager_->set(state);
 }
 
-bool ListSFTP::updateFromTrackingTimestampsCache(const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
-  std::ifstream file(tracking_timestamps_state_filename_);
-  if (!file.is_open()) {
-logger_->log_error("Failed to open Tracking Timestamps state file \"%s\"", 
tracking_timestamps_state_filename_.c_str());
-return false;
-  }
+bool ListSFTP::updateFromTrackingTimestampsCache(const 
std::shared_ptr& context, const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
+  std::string state_listing_strategy;
   std::string state_hostname;
   std::string state_username;
   std::string state_remote_path;
   uint64_t state_listing_timestamp;
   uint64_t state_processed_timestamp;
   std::set state_ids;
 
-  std::string line;
-  while (std::getline(file, line)) {
-size_t separator_pos = line.find('=');
-if (separator_pos == std::string::npos) {
-  logger_->log_warn("None key-value line found in Tracking Timestamps 
state file \"%s\": \"%s\"", tracking_timestamps_state_filename_.c_str(), 
line.c_str());
+  std::unordered_map state_map;
+  if (!state_manager_->get(state_map)) {
+logger_->log_info("Found no stored state");
+return false;
+  }
+  try {
+state_listing_strategy = state_map.at("listing_strategy");
+state_hostname = state_map.at("hostname");
+state_username = state_map.at("username");
+state_remote_path = state_map.at("remote_path");
+try {
+  state_listing_timestamp = stoull(state_map.at("listing.timestamp"));
+} catch (...) {
+  return false;
 }
-std::string key = line.substr(0, separator_pos);
-std::string value = line.substr(separator_pos + 1);
-if (key == "hostname") {
-  state_hostname = std::move(value);
-} else if (key == "username") {
-  state_username = std::move(value);
-} else if (key == "remote_path") {
-  state_remote_path = std::move(value);
-} else if (key == "listing.timestamp") {
-  try {
-state_listing_timestamp = stoull(value);
-  } catch (...) {
-logger_->log_error("listing.timestamp is not an uint64 in Tracking 
Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
-return false;
-  }
-} else if (key == "processed.timestamp") {
-  try {
-state_processed_timestamp = stoull(value);
-  } catch (...) {
-logger_->log_error("processed.timestamp is not an uint64 in Tracking 
Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
-return false;
+try {
+  state_processed_timestamp = stoull(state_map.at("processed.timestamp"));
+} catch (...) {
+  return false;
+}

Review comment:
   Added individual logging.

##
File path: extensions/sftp/processors/ListSFTP.cpp
##
@@ -760,140 +708,83 @@ void ListSFTP::listByTrackingTimestamps(
   }
 }
 
-bool ListSFTP::persistTrackingEntitiesCache(const std::string& hostname, const 
std::string& username, const std::string& remote_path) {
-  std::ofstream file(tracking_entities_state_filename_);
-  if (!file.is_open()) {
-

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-04-23 Thread GitBox


bakaid commented on a change in pull request #605:
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r413770032



##
File path: extensions/sftp/processors/ListSFTP.cpp
##
@@ -507,85 +463,87 @@ ListSFTP::ListedEntity::ListedEntity(uint64_t timestamp_, 
uint64_t size_)
 , size(size_) {
 }
 
-bool ListSFTP::persistTrackingTimestampsCache(const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
-  std::ofstream file(tracking_timestamps_state_filename_);
-  if (!file.is_open()) {
-logger_->log_error("Failed to store state to Tracking Timestamps state 
file \"%s\"", tracking_timestamps_state_filename_.c_str());
+bool ListSFTP::persistTrackingTimestampsCache(const 
std::shared_ptr& context, const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
+  auto state_manager = context->getStateManager();
+  if (state_manager == nullptr) {
 return false;
   }
-  file << "hostname=" << hostname << "\n";
-  file << "username=" << username << "\n";
-  file << "remote_path=" << remote_path << "\n";
-  file << "listing.timestamp=" << last_listed_latest_entry_timestamp_ << "\n";
-  file << "processed.timestamp=" << last_processed_latest_entry_timestamp_ << 
"\n";
+  std::unordered_map state;
+  state["listing_strategy"] = LISTING_STRATEGY_TRACKING_TIMESTAMPS;
+  state["hostname"] = hostname;
+  state["username"] = username;
+  state["remote_path"] = remote_path;
+  state["listing.timestamp"] = 
std::to_string(last_listed_latest_entry_timestamp_);
+  state["processed.timestamp"] = 
std::to_string(last_processed_latest_entry_timestamp_);
   size_t i = 0;
   for (const auto& identifier : latest_identifiers_processed_) {
-file << "id." << i << "=" << identifier << "\n";
+state["id." + std::to_string(i)] = identifier;
 ++i;
   }
+  state_manager->set(state);
+  if (!state_manager->persist()) {
+return false;
+  }
   return true;
 }
 
-bool ListSFTP::updateFromTrackingTimestampsCache(const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
-  std::ifstream file(tracking_timestamps_state_filename_);
-  if (!file.is_open()) {
-logger_->log_error("Failed to open Tracking Timestamps state file \"%s\"", 
tracking_timestamps_state_filename_.c_str());
-return false;
-  }
+bool ListSFTP::updateFromTrackingTimestampsCache(const 
std::shared_ptr& context, const std::string& hostname, 
const std::string& username, const std::string& remote_path) {

Review comment:
   I am not really open to in the context of this PR. This is preexisting 
code, and is used this way in many processors. If you want to do a round of 
these modifications to make easier further API changes, that makes sense, but 
should be a separate issue (and ticket).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-04-23 Thread GitBox


bakaid commented on a change in pull request #605:
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r413766649



##
File path: extensions/standard-processors/tests/unit/TailFileTests.cpp
##
@@ -501,7 +481,7 @@ TEST_CASE("TailFileWithDelimiterMultipleDelimiters", 
"[tailfiletest2]") {
   std::string line1(4097, '\n');
   std::mt19937 gen(std::random_device { }());
   std::generate_n(line1.begin(), 4095, [&]() -> char {
-return 32 + gen() % (127 - 32);
+  return 32 + gen() % (127 - 32);

Review comment:
   I am not really.

##
File path: 
extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.cpp
##
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "UnorderedMapKeyValueStoreService.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+UnorderedMapKeyValueStoreService::UnorderedMapKeyValueStoreService(const 
std::string& name, const std::string& id)
+: KeyValueStoreService(name, id)
+, 
logger_(logging::LoggerFactory::getLogger()) {
+}
+
+UnorderedMapKeyValueStoreService::UnorderedMapKeyValueStoreService(const 
std::string& name, utils::Identifier uuid /*= utils::Identifier()*/)
+: KeyValueStoreService(name, uuid)
+, 
logger_(logging::LoggerFactory::getLogger()) {
+}
+
+UnorderedMapKeyValueStoreService::UnorderedMapKeyValueStoreService(const 
std::string& name, const std::shared_ptr )
+: KeyValueStoreService(name)
+, 
logger_(logging::LoggerFactory::getLogger())  
{
+  setConfiguration(configuration);
+  initialize();
+}
+
+UnorderedMapKeyValueStoreService::~UnorderedMapKeyValueStoreService() {
+}
+
+bool UnorderedMapKeyValueStoreService::set(const std::string& key, const 
std::string& value) {
+  std::lock_guard lock(mutex_);
+  map_[key] = value;
+  return true;
+}
+
+bool UnorderedMapKeyValueStoreService::get(const std::string& key, 
std::string& value) {
+  std::lock_guard lock(mutex_);
+  auto it = map_.find(key);
+  if (it == map_.end()) {
+return false;
+  } else {
+value = it->second;
+return true;
+  }
+}
+
+bool UnorderedMapKeyValueStoreService::get(std::unordered_map& kvs) {
+  std::lock_guard lock(mutex_);
+  kvs = map_;
+  return true;
+}
+
+bool UnorderedMapKeyValueStoreService::remove(const std::string& key) {
+  std::lock_guard lock(mutex_);
+  return map_.erase(key) == 1U;
+}
+
+bool UnorderedMapKeyValueStoreService::clear() {
+  std::lock_guard lock(mutex_);
+  map_.clear();
+  return true;
+}
+
+bool UnorderedMapKeyValueStoreService::update(const std::string& key, const 
std::function& update_func) {
+  std::lock_guard lock(mutex_);
+  bool exists = false;
+  std::string value;
+  auto it = map_.find(key);
+  if (it != map_.end()) {
+exists = true;
+value = it->second;
+  }
+  try {
+if (!update_func(exists, value)) {
+  return false;
+}
+  } catch (...) {
+return false;
+  }

Review comment:
   We are not silently swallowing it, we are making the operation fail, but 
added logging to the exception path.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-04-23 Thread GitBox


bakaid commented on a change in pull request #605:
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r413746596



##
File path: extensions/sql/processors/ExecuteSQL.cpp
##
@@ -81,14 +81,14 @@ void ExecuteSQL::initialize() {
   setSupportedRelationships( { s_success });
 }
 
-void ExecuteSQL::processOnSchedule(const core::ProcessContext ) {
+void ExecuteSQL::processOnSchedule(core::ProcessContext ) {
   initOutputFormat(context);
 
   context.getProperty(s_sqlSelectQuery.getName(), sqlSelectQuery_);
   context.getProperty(s_maxRowsPerFlowFile.getName(), max_rows_);
 }
 
-void ExecuteSQL::processOnTrigger(core::ProcessSession ) {
+void ExecuteSQL::processOnTrigger(core::ProcessContext& /*context*/, 
core::ProcessSession& session) {

Review comment:
   In a previous iteration QueryDatabaseTables's processOnTrigger needed 
the context, so it had to be put on the CRTP interface of SQLProcessor. It has 
been rewritten since and it is no longer needed by QDT, so I removed it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-04-23 Thread GitBox


bakaid commented on a change in pull request #605:
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r413745773



##
File path: extensions/standard-processors/processors/TailFile.cpp
##
@@ -232,20 +236,61 @@ void TailFile::parseStateFileLine(char *buf) {
 const auto file = key.substr(strlen(POSITION_STR));
 tail_states_[file].currentTailFilePosition_ = std::stoull(value);
   }
-
-  return;
 }
 
-bool TailFile::recoverState() {
-  std::ifstream file(state_file_.c_str(), std::ifstream::in);
-  if (!file.good()) {
-logger_->log_error("load state file failed %s", state_file_);
-return false;
+
+
+bool TailFile::recoverState(const std::shared_ptr& 
context) {
+  bool state_load_success = false;
+
+  std::unordered_map state_map;
+  if (state_manager_->get(state_map)) {
+std::map new_tail_states;
+size_t i = 0;
+while (true) {
+  std::string name;
+  try {
+name = state_map.at("file." + std::to_string(i) + ".name");
+  } catch (...) {
+break;
+  }
+  try {
+const std::string& current = state_map.at("file." + std::to_string(i) 
+ ".current");
+uint64_t position = std::stoull(state_map.at("file." + 
std::to_string(i) + ".position"));
+
+std::string fileLocation, fileName;
+if (utils::file::PathUtils::getFileNameAndPath(current, fileLocation, 
fileName)) {
+  logger_->log_debug("Received path %s, file %s", fileLocation, 
fileName);
+  new_tail_states.emplace(fileName, TailState { fileLocation, 
fileName, position, 0 });
+} else {
+  new_tail_states.emplace(current, TailState { fileLocation, current, 
position, 0 });
+}
+  } catch (...) {
+continue;
+  }
+  ++i;
+}
+state_load_success = true;
+tail_states_ = std::move(new_tail_states);
+for (const auto& s : tail_states_) {
+  logger_->log_debug("TailState %s: %s, %s, %llu, %llu", s.first, 
s.second.path_, s.second.current_file_name_, s.second.currentTailFilePosition_, 
s.second.currentTailFileModificationTime_);
+}
+  } else {
+logger_->log_info("Found no stored state");
   }
-  tail_states_.clear();
-  char buf[BUFFER_SIZE];
-  for (file.getline(buf, BUFFER_SIZE); file.good(); file.getline(buf, 
BUFFER_SIZE)) {
-parseStateFileLine(buf);
+
+  /* We could not get the state form the StateManager, try to migrate the old 
state file if it exists */

Review comment:
   Fixed.

##
File path: 
libminifi/src/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.cpp
##
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::AbstractCoreComponentStateManager(
+std::shared_ptr provider,
+const std::string& id)
+: provider_(std::move(provider))
+, id_(id)
+, state_valid_(false) {
+  std::string serialized;
+  if (provider_->getImpl(id_, serialized) && 
provider_->deserialize(serialized, state_)) {
+state_valid_ = true;
+  }
+}
+
+bool 
AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::set(const
 std::unordered_map& kvs) {
+  if (provider_->setImpl(id_, provider_->serialize(kvs))) {
+state_valid_ = true;
+state_ = kvs;
+return true;
+  } else {
+return false;
+  }
+}
+
+bool 
AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::get(std::unordered_map& kvs) {
+  if (!state_valid_) {
+return false;
+  }
+  kvs = state_;
+  return true;
+}
+
+bool 
AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::clear()
 {
+  if (!state_valid_) {
+return false;
+  }
+  if (provider_->removeImpl(id_)) {
+state_valid_ = false;
+state_.clear();
+return true;
+  } else {
+return false;
+  }
+}
+
+bool 

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-04-23 Thread GitBox


bakaid commented on a change in pull request #605:
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r413745426



##
File path: libminifi/test/TestBase.cpp
##
@@ -45,6 +45,23 @@ TestPlan::TestPlan(std::shared_ptr 
content_repo, std::s
   flow_version_(flow_version),
   logger_(logging::LoggerFactory::getLogger()) {
   stream_factory = 
org::apache::nifi::minifi::io::StreamFactory::getInstance(std::make_shared());
+  controller_services_ = 
std::make_shared();
+  controller_services_provider_ = 
std::make_shared(controller_services_,
 nullptr, configuration_);
+  /* Inject the default state provider ahead of ProcessContext to make sure we 
have a unique state directory */
+  if (state_dir == nullptr) {
+char state_dir_name_template[] = "/tmp/teststate.XX";
+state_dir_ = 
utils::file::FileUtils::create_temp_directory(state_dir_name_template);
+  } else {
+state_dir_ = state_dir;
+  }
+  state_manager_provider_ = 
core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_services_provider_,
 configuration_, state_dir_.c_str());

Review comment:
   Done.

##
File path: libminifi/test/integration/IntegrationBase.h
##
@@ -121,6 +122,11 @@ void IntegrationBase::run(std::string test_file_location) {
 
   core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, 
stream_factory, configuration, test_file_location);
 
+  auto controller_service_provider = yaml_ptr->getControllerServiceProvider();
+  char state_dir_name_template[] = "/tmp/integrationstate.XX";
+  state_dir = 
utils::file::FileUtils::create_temp_directory(state_dir_name_template);
+  
core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_service_provider,
 configuration, state_dir.c_str());

Review comment:
   Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-04-23 Thread GitBox


bakaid commented on a change in pull request #605:
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r413743841



##
File path: libminifi/test/keyvalue-tests/CMakeLists.txt
##
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+file(GLOB KEYVALUE_TESTS  "*.cpp")
+
+SET(KEYVALUE_INT_TEST_COUNT 0)
+
+FOREACH(testfile ${KEYVALUE_TESTS})
+get_filename_component(testfilename "${testfile}" NAME_WE)
+add_executable("${testfilename}" "${testfile}" )
+target_include_directories(${testfilename} PRIVATE BEFORE 
"${CMAKE_SOURCE_DIR}/libminifi/test/")
+createTests("${testfilename}")
+target_wholearchive_library(${testfilename} minifi-standard-processors)
+if (NOT DISABLE_ROCKSDB)
+target_wholearchive_library(${testfilename} minifi-rocksdb-repos)
+endif()
+
+MATH(EXPR KEYVALUE_INT_TEST_COUNT "${KEYVALUE_INT_TEST_COUNT}+1")
+ENDFOREACH()
+
+message("-- Finished building ${KEYVALUE_INT_TEST_COUNT} keyvalue controller 
test file(s)...")
+
+add_test(NAME UnorderedMapPersistableKeyValueStoreServiceTest COMMAND 
PersistableKeyValueStoreServiceTest --config-yaml 
"${TEST_RESOURCES}/UnorderedMapPersistableKeyValueStoreServiceTest.yml")
+if (NOT DISABLE_ROCKSDB)
+add_test(NAME RocksdDbPersistableKeyValueStoreServiceTest COMMAND 
PersistableKeyValueStoreServiceTest --config-yaml 
"${TEST_RESOURCES}/RocksDbPersistableKeyValueStoreServiceTest.yml")
+endif()

Review comment:
   Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-04-23 Thread GitBox


bakaid commented on a change in pull request #605:
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r413742717



##
File path: libminifi/src/core/ConfigurableComponent.cpp
##
@@ -195,6 +195,19 @@ bool 
ConfigurableComponent::setSupportedProperties(std::set properties
   return true;
 }
 
+bool ConfigurableComponent::updateSupportedProperties(std::set 
properties) {
+  if (!canEdit()) {
+return false;
+  }
+
+  std::lock_guard lock(configuration_mutex_);
+
+  for (auto item : properties) {
+properties_[item.getName()] = item;

Review comment:
   This function is a modified version of 
`ConfigurableComponent::setSupportedProperties`. Yes, the copies can be avoided 
both here and there, but it is completely inconsequential, as this happens only 
once per flow initialization.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-18 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r394634612
 
 

 ##
 File path: 
extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
 ##
 @@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RocksDbPersistableKeyValueStoreService.h"
+
+#include "utils/StringUtils.h"
+
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+core::Property RocksDbPersistableKeyValueStoreService::Directory(
+core::PropertyBuilder::createProperty("Directory")->withDescription("Path 
to a directory for the database")
+->isRequired(true)->build());
+
+RocksDbPersistableKeyValueStoreService::RocksDbPersistableKeyValueStoreService(const
 std::string& name, const std::string& id)
+: controllers::KeyValueStoreService(name, id)
+, controllers::AbstractAutoPersistingKeyValueStoreService(name, id)
+, db_valid_(false)
+, 
logger_(logging::LoggerFactory::getLogger())
 {
+}
+
+RocksDbPersistableKeyValueStoreService::RocksDbPersistableKeyValueStoreService(const
 std::string& name, utils::Identifier uuid /*= utils::Identifier()*/)
+: controllers::KeyValueStoreService(name, uuid)
+, controllers::AbstractAutoPersistingKeyValueStoreService(name, uuid)
+, db_valid_(false)
+, 
logger_(logging::LoggerFactory::getLogger())
 {
+}
+
+RocksDbPersistableKeyValueStoreService::~RocksDbPersistableKeyValueStoreService()
 {
+  if (db_valid_) {
+delete db_;
+  }
+}
+
+void RocksDbPersistableKeyValueStoreService::initialize() {
+  AbstractAutoPersistingKeyValueStoreService::initialize();
+  std::set supportedProperties;
+  supportedProperties.insert(Directory);
+  updateSupportedProperties(supportedProperties);
+}
+
+void RocksDbPersistableKeyValueStoreService::onEnable() {
+  if (configuration_ == nullptr) {
+logger_->log_debug("Cannot enable RocksDbPersistableKeyValueStoreService");
+return;
+  }
+
+  AbstractAutoPersistingKeyValueStoreService::onEnable();
+
+  if (!getProperty(Directory.getName(), directory_)) {
+logger_->log_error("Invalid or missing property: Directory");
+return;
+  }
+
+  if (db_valid_) {
+db_valid_ = false;
+delete db_;
+  }
+  rocksdb::Options options;
+  options.create_if_missing = true;
+  if (!always_persist_) {
+options.manual_wal_flush = true;
+  }
+  rocksdb::Status status = rocksdb::DB::Open(options, directory_, _);
+  if (status.ok()) {
+db_valid_ = true;
+logger_->log_trace("Successfully opened RocksDB database at %s", 
directory_.c_str());
+  } else {
+logger_->log_error("Failed to open RocksDB database at %s, error: %s", 
directory_.c_str(), status.getState());
+return;
+  }
+
+  if (always_persist_) {
+default_write_options.sync = true;
+  }
+
+  logger_->log_trace("Enabled RocksDbPersistableKeyValueStoreService");
+}
+
+void RocksDbPersistableKeyValueStoreService::notifyStop() {
+  AbstractAutoPersistingKeyValueStoreService::notifyStop();
+
+  if (db_valid_) {
+delete db_;
+db_valid_= false;
+  }
+}
+
+bool RocksDbPersistableKeyValueStoreService::set(const std::string& key, const 
std::string& value) {
+  if (!db_valid_) {
 
 Review comment:
   Replaced with unique_ptr, so no longer relevant.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-18 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r394634269
 
 

 ##
 File path: 
extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h
 ##
 @@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef 
LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_RocksDbPersistableKeyValueStoreService_H_
+#define 
LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_RocksDbPersistableKeyValueStoreService_H_
+
+#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
+#include "core/Core.h"
+#include "properties/Configure.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+
+#include "rocksdb/db.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+class RocksDbPersistableKeyValueStoreService : public 
AbstractAutoPersistingKeyValueStoreService {
+ public:
+  explicit RocksDbPersistableKeyValueStoreService(const std::string& name, 
const std::string& id);
+  explicit RocksDbPersistableKeyValueStoreService(const std::string& name, 
utils::Identifier uuid = utils::Identifier());
+
+  virtual ~RocksDbPersistableKeyValueStoreService();
+
+  static core::Property Directory;
+
+  virtual void initialize() override;
+  virtual void onEnable() override;
+  virtual void notifyStop() override;
+
+  virtual bool set(const std::string& key, const std::string& value) override;
+
+  virtual bool get(const std::string& key, std::string& value) override;
+
+  virtual bool get(std::unordered_map& kvs) override;
+
+  virtual bool remove(const std::string& key) override;
+
+  virtual bool clear() override;
+
+  virtual bool update(const std::string& key, const std::function& update_func) override;
+
+  virtual bool persist() override;
+
+ protected:
+  std::string directory_;
+
+  rocksdb::DB* db_;
+  rocksdb::WriteOptions default_write_options;
+  bool db_valid_;
 
 Review comment:
   It was a remnant from the UnorderedMap version (which was implemented before 
this). Replaced along with using a unique_ptr.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-18 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r394634441
 
 

 ##
 File path: 
extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
 ##
 @@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RocksDbPersistableKeyValueStoreService.h"
+
+#include "utils/StringUtils.h"
+
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+core::Property RocksDbPersistableKeyValueStoreService::Directory(
+core::PropertyBuilder::createProperty("Directory")->withDescription("Path 
to a directory for the database")
+->isRequired(true)->build());
+
+RocksDbPersistableKeyValueStoreService::RocksDbPersistableKeyValueStoreService(const
 std::string& name, const std::string& id)
+: controllers::KeyValueStoreService(name, id)
+, controllers::AbstractAutoPersistingKeyValueStoreService(name, id)
+, db_valid_(false)
+, 
logger_(logging::LoggerFactory::getLogger())
 {
+}
+
+RocksDbPersistableKeyValueStoreService::RocksDbPersistableKeyValueStoreService(const
 std::string& name, utils::Identifier uuid /*= utils::Identifier()*/)
+: controllers::KeyValueStoreService(name, uuid)
+, controllers::AbstractAutoPersistingKeyValueStoreService(name, uuid)
+, db_valid_(false)
+, 
logger_(logging::LoggerFactory::getLogger())
 {
+}
+
+RocksDbPersistableKeyValueStoreService::~RocksDbPersistableKeyValueStoreService()
 {
+  if (db_valid_) {
+delete db_;
+  }
+}
+
+void RocksDbPersistableKeyValueStoreService::initialize() {
+  AbstractAutoPersistingKeyValueStoreService::initialize();
+  std::set supportedProperties;
+  supportedProperties.insert(Directory);
+  updateSupportedProperties(supportedProperties);
+}
+
+void RocksDbPersistableKeyValueStoreService::onEnable() {
+  if (configuration_ == nullptr) {
+logger_->log_debug("Cannot enable RocksDbPersistableKeyValueStoreService");
+return;
+  }
+
+  AbstractAutoPersistingKeyValueStoreService::onEnable();
+
+  if (!getProperty(Directory.getName(), directory_)) {
+logger_->log_error("Invalid or missing property: Directory");
+return;
+  }
+
+  if (db_valid_) {
+db_valid_ = false;
+delete db_;
+  }
+  rocksdb::Options options;
+  options.create_if_missing = true;
 
 Review comment:
   Good point, this was written last summer, we didn't know this back then.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-18 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r394633928
 
 

 ##
 File path: 
extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h
 ##
 @@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef 
LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_RocksDbPersistableKeyValueStoreService_H_
+#define 
LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_RocksDbPersistableKeyValueStoreService_H_
+
+#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
+#include "core/Core.h"
+#include "properties/Configure.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+
+#include "rocksdb/db.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+class RocksDbPersistableKeyValueStoreService : public 
AbstractAutoPersistingKeyValueStoreService {
+ public:
+  explicit RocksDbPersistableKeyValueStoreService(const std::string& name, 
const std::string& id);
+  explicit RocksDbPersistableKeyValueStoreService(const std::string& name, 
utils::Identifier uuid = utils::Identifier());
+
+  virtual ~RocksDbPersistableKeyValueStoreService();
+
+  static core::Property Directory;
+
+  virtual void initialize() override;
+  virtual void onEnable() override;
+  virtual void notifyStop() override;
+
+  virtual bool set(const std::string& key, const std::string& value) override;
+
+  virtual bool get(const std::string& key, std::string& value) override;
+
+  virtual bool get(std::unordered_map& kvs) override;
+
+  virtual bool remove(const std::string& key) override;
+
+  virtual bool clear() override;
+
+  virtual bool update(const std::string& key, const std::function& update_func) override;
+
+  virtual bool persist() override;
+
+ protected:
+  std::string directory_;
+
+  rocksdb::DB* db_;
 
 Review comment:
   None of our rocksdb stuff uses a unique_ptr, but it can, rewritten.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-17 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r393557670
 
 

 ##
 File path: 
extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.h
 ##
 @@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __UNORDERED_MAP_KEY_VALUE_STORE_SERVICE_H__
 
 Review comment:
   I can very well imagine having multiple volatile implementations of the 
`KeyValueStoreService` in the future, with different properties than this, and 
I didn't want to designate this **the** `VolatileKeyValueStoreService`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-17 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r393557670
 
 

 ##
 File path: 
extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.h
 ##
 @@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __UNORDERED_MAP_KEY_VALUE_STORE_SERVICE_H__
 
 Review comment:
   I can very well imagine having multiple volatile implementations of the 
KeyValueStoreService in the future, with different properties than this, and I 
didn't want to designate this *the* VolatileKeyValueStoreService.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-17 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r393555662
 
 

 ##
 File path: extensions/windows-event-log/Bookmark.cpp
 ##
 @@ -222,7 +184,7 @@ bool Bookmark::getBookmarkXmlFromFile(std::wstring& 
bookmarkXml) {
   if (std::wstring::npos == pos) {
 logger_->log_error("No '!' in bookmarXml '%ls'", bookmarkXml.c_str());
 bookmarkXml.clear();
-return createEmptyBookmarkXmlFile();
+   return false;
 
 Review comment:
   Thanks, since these are Windows-only, I edited them in VS when migrating 
them, and well... this is the result.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-17 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r393554128
 
 

 ##
 File path: extensions/sftp/processors/ListSFTP.cpp
 ##
 @@ -760,140 +713,83 @@ void ListSFTP::listByTrackingTimestamps(
   }
 }
 
-bool ListSFTP::persistTrackingEntitiesCache(const std::string& hostname, const 
std::string& username, const std::string& remote_path) {
-  std::ofstream file(tracking_entities_state_filename_);
-  if (!file.is_open()) {
-logger_->log_error("Failed to store Tracking Entities state to state file 
\"%s\"", tracking_entities_state_filename_.c_str());
-return false;
-  }
-  file << "hostname=" << hostname << "\n";
-  file << "username=" << username << "\n";
-  file << "remote_path=" << remote_path << "\n";
-  file << "json_state_file=" << tracking_entities_state_json_filename_ << "\n";
-  file.close();
-
-  std::ofstream json_file(tracking_entities_state_json_filename_);
-  if (!json_file.is_open()) {
-logger_->log_error("Failed to store Tracking Entities state to state json 
file \"%s\"", tracking_entities_state_json_filename_.c_str());
-return false;
-  }
-
-  rapidjson::Document entities(rapidjson::kObjectType);
-  rapidjson::Document::AllocatorType& alloc = entities.GetAllocator();
-  for (const auto& already_listed_entity : already_listed_entities_) {
-rapidjson::Value entity(rapidjson::kObjectType);
-entity.AddMember("timestamp", already_listed_entity.second.timestamp, 
alloc);
-entity.AddMember("size", already_listed_entity.second.size, alloc);
-entities.AddMember(rapidjson::Value(already_listed_entity.first.c_str(), 
alloc), std::move(entity), alloc);
+bool ListSFTP::persistTrackingEntitiesCache(const 
std::shared_ptr& context, const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
+  std::unordered_map state;
+  state["listing_strategy"] = listing_strategy_;
+  state["hostname"] = hostname;
+  state["username"] = username;
+  state["remote_path"] = remote_path;
+  size_t i = 0;
+  for (const auto _listed_entity : already_listed_entities_) {
+state["entity." + std::to_string(i) + ".name"] = 
already_listed_entity.first;
+state["entity." + std::to_string(i) + ".timestamp"] = 
std::to_string(already_listed_entity.second.timestamp);
+state["entity." + std::to_string(i) + ".size"] = 
std::to_string(already_listed_entity.second.size);
+++i;
   }
-
-  rapidjson::OStreamWrapper osw(json_file);
-  rapidjson::Writer writer(osw);
-  entities.Accept(writer);
-
-  return true;
+  return state_manager_->set(state);
 }
 
-bool ListSFTP::updateFromTrackingEntitiesCache(const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
-  std::ifstream file(tracking_entities_state_filename_);
-  if (!file.is_open()) {
-logger_->log_error("Failed to open Tracking Entities state file \"%s\"", 
tracking_entities_state_filename_.c_str());
-return false;
-  }
+bool ListSFTP::updateFromTrackingEntitiesCache(const 
std::shared_ptr& context, const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
+  std::string state_listing_strategy;
   std::string state_hostname;
   std::string state_username;
   std::string state_remote_path;
-  std::string state_json_state_file;
-
-  std::string line;
-  while (std::getline(file, line)) {
-size_t separator_pos = line.find('=');
-if (separator_pos == std::string::npos) {
-  logger_->log_warn("None key-value line found in Tracking Entities state 
file \"%s\": \"%s\"", tracking_entities_state_filename_.c_str(), line.c_str());
-  continue;
-}
-std::string key = line.substr(0, separator_pos);
-std::string value = line.substr(separator_pos + 1);
-if (key == "hostname") {
-  state_hostname = std::move(value);
-} else if (key == "username") {
-  state_username = std::move(value);
-} else if (key == "remote_path") {
-  state_remote_path = std::move(value);
-} else if (key == "json_state_file") {
-  state_json_state_file = std::move(value);
-} else {
-  logger_->log_warn("Unknown key found in Tracking Entities state file 
\"%s\": \"%s\"", tracking_entities_state_filename_.c_str(), key.c_str());
-}
-  }
-  file.close();
-
-  if (state_hostname != hostname ||
-  state_username != username ||
-  state_remote_path != remote_path) {
-logger_->log_error("Tracking Entities state file \"%s\" was created with 
different settings than the current ones, ignoring. "
-   "Hostname: \"%s\" vs. \"%s\", "
-   "Username: \"%s\" vs. \"%s\", "
-   "Remote Path: \"%s\" vs. \"%s\"",
-   tracking_entities_state_filename_.c_str(),
-   state_hostname, hostname,
-   state_username, username,
-   

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-17 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r393551822
 
 

 ##
 File path: 
extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.h
 ##
 @@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __UNORDERED_MAP_PERSISTABLE_KEY_VALUE_STORE_SERVICE_H__
+#define __UNORDERED_MAP_PERSISTABLE_KEY_VALUE_STORE_SERVICE_H__
+
+#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
+#include "UnorderedMapKeyValueStoreService.h"
+#include "core/Core.h"
+#include "properties/Configure.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "core/Resource.h"
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+class UnorderedMapPersistableKeyValueStoreService : public 
AbstractAutoPersistingKeyValueStoreService,
 
 Review comment:
   The motivation is twofold: to provide an implementation without any third 
party requirements, and to provide a serialized format which is human-readable 
and easily editable.
   
   As for the first motivation: we don't want to make it a hard requirement to 
have rocksdb to use stateful processors, because that would hinder 
low-footprint usages, nor do I want to make any other third party a dependency 
for this: it would, again, introduce an unwanted requirement for low-footprint 
usages and we would have to find one that supports all target platforms, and 
maintain it, which is not a negligible maintenance burden.
   
   For the second one: so far users could easily clear a state by deleting the 
state file or directory. With the rocksdb state storage this is no longer a 
trivial task. The C2 methods introduced here attempt to solve this problem, but 
until they are finalized (or if someone does not want to use, or can't use C2), 
they aren't really useful.
   A solution for this is using this state storage method: its serialized 
format is trivial and editable in text format without any third party tools, 
making it easy to clear (or even modify) the state of processors.
   
   Finally for the support argument: this is a trivial, and, if you take a look 
at `PersistableKeyValueStoreServiceTest.cpp`, well-tested implementation, that 
I don't think would introduce a significant maintenance burden going forward.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-10 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r390714137
 
 

 ##
 File path: libminifi/src/c2/C2Agent.cpp
 ##
 @@ -435,6 +437,27 @@ void C2Agent::handle_c2_server_response(const 
C2ContentResponse ) {
 update_sink_->drainRepositories();
 C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
 enqueue_c2_response(std::move(response));
+  } else if (resp.name == "corecomponentstate") {
 
 Review comment:
   This has only been written to make sure that it fits the architecture, it 
hasn't even been tested manually. Once 
https://github.com/apache/nifi-minifi-cpp/pull/743 is done and merged, I can 
probably write reasonable tests for it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-10 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r390713995
 
 

 ##
 File path: libminifi/src/c2/C2Agent.cpp
 ##
 @@ -596,6 +619,29 @@ void C2Agent::handle_describe(const C2ContentResponse 
) {
   }
   enqueue_c2_response(std::move(response));
 }
+  } else if (resp.name == "corecomponentstate") {
 
 Review comment:
   This has only been tested manually. Once 
https://github.com/apache/nifi-minifi-cpp/pull/743 is done and merged, I can 
probably write reasonable tests for it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-10 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r390712104
 
 

 ##
 File path: libminifi/include/core/ProcessContext.h
 ##
 @@ -193,6 +216,61 @@ class ProcessContext : public 
controller::ControllerServiceLookup, public core::
 return controller_service_provider_->getControllerServiceName(identifier);
   }
 
+  static constexpr char const* DefaultStateManagerProviderName = 
"defaultstatemanagerprovider";
+
+  std::shared_ptr getStateManager() {
+if (state_manager_provider_ == nullptr) {
+  return nullptr;
+}
+return 
state_manager_provider_->getCoreComponentStateManager(*processor_node_);
+  }
+
+  static std::shared_ptr 
getOrCreateDefaultStateManagerProvider(
+  std::shared_ptr 
controller_service_provider,
+  const char *base_path = "") {
+static std::mutex mutex;
+std::lock_guard lock(mutex);
+
+/* See if we have already created a default provider */
+std::shared_ptr node = 
controller_service_provider->getControllerServiceNode(DefaultStateManagerProviderName);
 // TODO
+if (node != nullptr) {
+  return 
std::dynamic_pointer_cast(node->getControllerServiceImplementation());
+}
+
+/* Try to create a RocksDB-backed provider */
+node = 
controller_service_provider->createControllerService("RocksDbPersistableKeyValueStoreService",
+
"org.apache.nifi.minifi.controllers.RocksDbPersistableKeyValueStoreService",
+
DefaultStateManagerProviderName,
+true 
/*firstTimeAdded*/);
+if (node != nullptr) {
+  node->initialize();
+  auto provider = node->getControllerServiceImplementation();
+  if (provider != nullptr) {
+provider->setProperty("Directory", 
utils::file::FileUtils::concat_path(base_path, "corecomponentstate"));
+node->enable();
+return 
std::dynamic_pointer_cast(provider);
+  }
+}
+
+/* Fall back to a locked unordered map-backed provider */
+node = 
controller_service_provider->createControllerService("UnorderedMapPersistableKeyValueStoreService",
+
"org.apache.nifi.minifi.controllers.UnorderedMapPersistableKeyValueStoreService",
+
DefaultStateManagerProviderName,
+true 
/*firstTimeAdded*/);
+if (node != nullptr) {
+  node->initialize();
+  auto provider = node->getControllerServiceImplementation();
+  if (provider != nullptr) {
+provider->setProperty("File", 
utils::file::FileUtils::concat_path(base_path, "corecomponentstate.txt"));
+node->enable();
+return 
std::dynamic_pointer_cast(provider);
+  }
+}
 
 Review comment:
   @szaszm Ended up adding more code to this, and now it really made sense to 
deduplicate it, so I've done that.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-10 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r390660992
 
 

 ##
 File path: 
libminifi/include/controllers/keyvalue/PersistableKeyValueStoreService.h
 ##
 @@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_KEYVALUE_PersistableKeyValueStoreService_H_
+#define LIBMINIFI_INCLUDE_KEYVALUE_PersistableKeyValueStoreService_H_
+
+#include "KeyValueStoreService.h"
+#include "AbstractCoreComponentStateManagerProvider.h"
+#include "core/Core.h"
+#include "properties/Configure.h"
+
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+class PersistableKeyValueStoreService : virtual public KeyValueStoreService, 
public AbstractCoreComponentStateManagerProvider {
 
 Review comment:
   `PersistableKeyValueStoreService` is just an interface extension of 
`KeyValueStoreService`, and forcefully wrapping a 
`PersistableKeyValueStoreService` into some class that provides the 
`CoreComponentStateManagerProvider` interface after loading the 
`PersistableKeyValueStoreService` with the classloader would make it impossible 
to implement and use a `CoreComponentStateManagerProvider` directly.
   This way everything "just works": if you implement a 
`PersistableKeyValueStoreService` you automatically have a 
`CoreComponentStateManagerProvider`, but you can implement a 
`CoreComponentStateManagerProvider` directly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-10 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r390660992
 
 

 ##
 File path: 
libminifi/include/controllers/keyvalue/PersistableKeyValueStoreService.h
 ##
 @@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_KEYVALUE_PersistableKeyValueStoreService_H_
+#define LIBMINIFI_INCLUDE_KEYVALUE_PersistableKeyValueStoreService_H_
+
+#include "KeyValueStoreService.h"
+#include "AbstractCoreComponentStateManagerProvider.h"
+#include "core/Core.h"
+#include "properties/Configure.h"
+
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+class PersistableKeyValueStoreService : virtual public KeyValueStoreService, 
public AbstractCoreComponentStateManagerProvider {
 
 Review comment:
   PersistableKeyValueStoreService is just an interface extension of 
PersistableKeyValueStoreService, and forcefully wrapping a 
PersistableKeyValueStoreService into some class that provides the 
CoreComponentStateManagerProvider interface after loading the  
PersistableKeyValueStoreService with the classloader would make it impossible 
to implement a CoreComponentStateManagerProvider directly.
   This way everything "just works": if you implement a 
PersistableKeyValueStoreService you automatically have a 
CoreComponentStateManagerProvider, but you can implement a 
CoreComponentStateManagerProvider directly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-10 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r390660992
 
 

 ##
 File path: 
libminifi/include/controllers/keyvalue/PersistableKeyValueStoreService.h
 ##
 @@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_KEYVALUE_PersistableKeyValueStoreService_H_
+#define LIBMINIFI_INCLUDE_KEYVALUE_PersistableKeyValueStoreService_H_
+
+#include "KeyValueStoreService.h"
+#include "AbstractCoreComponentStateManagerProvider.h"
+#include "core/Core.h"
+#include "properties/Configure.h"
+
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+class PersistableKeyValueStoreService : virtual public KeyValueStoreService, 
public AbstractCoreComponentStateManagerProvider {
 
 Review comment:
   `PersistableKeyValueStoreService` is just an interface extension of 
`KeyValueStoreService`, and forcefully wrapping a 
`PersistableKeyValueStoreService` into some class that provides the 
`CoreComponentStateManagerProvider` interface after loading the 
`PersistableKeyValueStoreService` with the classloader would make it impossible 
to implement and use a `CoreComponentStateManagerProvider` directly.
   This way everything "just works": if you implement a 
`PersistableKeyValueStoreService` you automatically have a 
`CoreComponentStateManagerProvider`, but you can implement a 
CoreComponentStateManagerProvider directly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-09 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r389775151
 
 

 ##
 File path: libminifi/test/integration/IntegrationBase.h
 ##
 @@ -109,6 +110,16 @@ void IntegrationBase::run(std::string test_file_location) 
{
 
   core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, 
stream_factory, configuration, test_file_location);
 
+  auto controller_service_provider = yaml_ptr->getControllerServiceProvider();
+  std::string state_dir_name_template = "/tmp/integrationstate.XX";
+  std::vector state_dir_buf(state_dir_name_template.c_str(),
+  state_dir_name_template.c_str() + 
state_dir_name_template.size() + 1);
+  if (mkdtemp(state_dir_buf.data()) == nullptr) {
+throw std::runtime_error("Failed to create temporary directory for state");
+  }
+  state_dir = state_dir_buf.data();
 
 Review comment:
   No longer exists in the rebased version.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-09 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r389768446
 
 

 ##
 File path: 
libminifi/src/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.cpp
 ##
 @@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::AbstractCoreComponentStateManager(
+std::shared_ptr provider,
+const std::string& id)
+: provider_(std::move(provider))
+, id_(id)
+, state_valid_(false) {
+  std::string serialized;
+  if (provider_->getImpl(id_, serialized)) {
+if (provider_->deserialize(serialized, state_)) {
 
 Review comment:
   Ah, yeah, that would make sense, thank you. :D


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-09 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r389762676
 
 

 ##
 File path: libminifi/include/core/ProcessContext.h
 ##
 @@ -193,6 +216,61 @@ class ProcessContext : public 
controller::ControllerServiceLookup, public core::
 return controller_service_provider_->getControllerServiceName(identifier);
   }
 
+  static constexpr char const* DefaultStateManagerProviderName = 
"defaultstatemanagerprovider";
+
+  std::shared_ptr getStateManager() {
+if (state_manager_provider_ == nullptr) {
+  return nullptr;
+}
+return 
state_manager_provider_->getCoreComponentStateManager(*processor_node_);
+  }
+
+  static std::shared_ptr 
getOrCreateDefaultStateManagerProvider(
+  std::shared_ptr 
controller_service_provider,
+  const char *base_path = "") {
+static std::mutex mutex;
+std::lock_guard lock(mutex);
+
+/* See if we have already created a default provider */
+std::shared_ptr node = 
controller_service_provider->getControllerServiceNode(DefaultStateManagerProviderName);
 // TODO
+if (node != nullptr) {
+  return 
std::dynamic_pointer_cast(node->getControllerServiceImplementation());
+}
+
+/* Try to create a RocksDB-backed provider */
+node = 
controller_service_provider->createControllerService("RocksDbPersistableKeyValueStoreService",
+
"org.apache.nifi.minifi.controllers.RocksDbPersistableKeyValueStoreService",
+
DefaultStateManagerProviderName,
+true 
/*firstTimeAdded*/);
+if (node != nullptr) {
+  node->initialize();
+  auto provider = node->getControllerServiceImplementation();
+  if (provider != nullptr) {
+provider->setProperty("Directory", 
utils::file::FileUtils::concat_path(base_path, "corecomponentstate"));
+node->enable();
+return 
std::dynamic_pointer_cast(provider);
+  }
+}
+
+/* Fall back to a locked unordered map-backed provider */
+node = 
controller_service_provider->createControllerService("UnorderedMapPersistableKeyValueStoreService",
+
"org.apache.nifi.minifi.controllers.UnorderedMapPersistableKeyValueStoreService",
+
DefaultStateManagerProviderName,
+true 
/*firstTimeAdded*/);
+if (node != nullptr) {
+  node->initialize();
+  auto provider = node->getControllerServiceImplementation();
+  if (provider != nullptr) {
+provider->setProperty("File", 
utils::file::FileUtils::concat_path(base_path, "corecomponentstate.txt"));
+node->enable();
+return 
std::dynamic_pointer_cast(provider);
+  }
+}
 
 Review comment:
   Because we need to set a File in one and a Directory in the other, obviously 
with different names, creating a function from this would not save us much - it 
would actually make it less readable in my opinion.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-09 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r389760968
 
 

 ##
 File path: 
libminifi/src/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.cpp
 ##
 @@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+core::Property AbstractAutoPersistingKeyValueStoreService::AlwaysPersist(
+core::PropertyBuilder::createProperty("Always 
Persist")->withDescription("Persist every change instead of persisting it 
periodically.")
+->isRequired(false)->withDefaultValue(false)->build());
+core::Property 
AbstractAutoPersistingKeyValueStoreService::AutoPersistenceInterval(
+core::PropertyBuilder::createProperty("Auto Persistence 
Interval")->withDescription("The interval of the periodic task persisting all 
values. "
+   
 "Only used if Always Persist is false. "
+   
 "If set to 0 seconds, auto persistence will be disabled.")
+->isRequired(false)->withDefaultValue("1 
min")->build());
+
+AbstractAutoPersistingKeyValueStoreService::AbstractAutoPersistingKeyValueStoreService(const
 std::string& name, const std::string& id)
+: KeyValueStoreService(name, id)
+, PersistableKeyValueStoreService(name, id)
+, always_persist_(false)
+, auto_persistence_interval_(0U)
+, running_(false)
+, 
logger_(logging::LoggerFactory::getLogger())
 {
+}
+
+AbstractAutoPersistingKeyValueStoreService::AbstractAutoPersistingKeyValueStoreService(const
 std::string& name, utils::Identifier uuid /*= utils::Identifier()*/)
+: KeyValueStoreService(name, uuid)
+, PersistableKeyValueStoreService(name, uuid)
+, always_persist_(false)
+, auto_persistence_interval_(0U)
+, running_(false)
+, 
logger_(logging::LoggerFactory::getLogger())
 {
+}
+
+AbstractAutoPersistingKeyValueStoreService::~AbstractAutoPersistingKeyValueStoreService()
 {
+  notifyStop();
+}
+
+void AbstractAutoPersistingKeyValueStoreService::initialize() {
+  ControllerService::initialize();
+  std::set supportedProperties;
+  supportedProperties.insert(AlwaysPersist);
+  supportedProperties.insert(AutoPersistenceInterval);
+  updateSupportedProperties(supportedProperties);
+}
+
+void AbstractAutoPersistingKeyValueStoreService::onEnable() {
+  std::unique_lock lock(persisting_mutex_);
+
+  if (configuration_ == nullptr) {
+logger_->log_debug("Cannot enable 
AbstractAutoPersistingKeyValueStoreService");
+return;
+  }
+
+  std::string value;
+  if (!getProperty(AlwaysPersist.getName(), value)) {
+logger_->log_error("Always Persist attribute is missing or invalid");
+  } else {
+utils::StringUtils::StringToBool(value, always_persist_);
+  }
+  if (!getProperty(AutoPersistenceInterval.getName(), value)) {
+logger_->log_error("Auto Persistence Interval attribute is missing or 
invalid");
+  } else {
+core::TimeUnit unit;
+if (!core::Property::StringToTime(value, auto_persistence_interval_, unit) 
|| !core::Property::ConvertTimeUnitToMS(auto_persistence_interval_, unit, 
auto_persistence_interval_)) {
+  logger_->log_error("Auto Persistence Interval attribute is invalid");
+}
+  }
+
+  if (!always_persist_ && auto_persistence_interval_ != 0U) {
+if (!persisting_thread_.joinable()) {
+  logger_->log_trace("Starting auto persistence thread");
+  running_ = true;
+  persisting_thread_ = 
std::thread(::persistingThreadFunc, 
this);
+}
+  }
+
+  logger_->log_trace("Enabled AbstractAutoPersistingKeyValueStoreService");
+}
+
+void AbstractAutoPersistingKeyValueStoreService::notifyStop() {
+  if (persisting_thread_.joinable()) {
+{
+  std::lock_guard lock(persisting_mutex_);
+  running_ = false;
+  persisting_cv_.notify_one();
+}
+persisting_thread_.join();
+  }
+}
+
+void 

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-09 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r389760197
 
 

 ##
 File path: libminifi/test/TestBase.cpp
 ##
 @@ -30,6 +30,25 @@ TestPlan::TestPlan(std::shared_ptr 
content_repo, std::s
   flow_version_(flow_version),
   logger_(logging::LoggerFactory::getLogger()) {
   stream_factory = 
org::apache::nifi::minifi::io::StreamFactory::getInstance(std::make_shared());
+  controller_services_ = 
std::make_shared();
+  controller_services_provider_ = 
std::make_shared(controller_services_,
 nullptr, configuration_);
+  /* Inject the default state provider ahead of ProcessContext to make sure we 
have a unique state directory */
+  if (state_dir == nullptr) {
+std::string state_dir_name_template = "/tmp/teststate.XX";
+std::vector state_dir_buf(state_dir_name_template.c_str(),
+   state_dir_name_template.c_str() + 
state_dir_name_template.size() + 1);
 
 Review comment:
   No longer exists in the rebased version.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-09 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r389759366
 
 

 ##
 File path: 
libminifi/test/keyvalue-tests/PersistableKeyValueStoreServiceTest.cpp
 ##
 @@ -0,0 +1,257 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CATCH_CONFIG_RUNNER
+#include "catch.hpp"
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include "../TestBase.h"
+#include "../../controller/Controller.h"
+#include "core/controller/ControllerService.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "controllers/keyvalue/PersistableKeyValueStoreService.h"
+#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
+
+static std::string config_yaml; // NOLINT
+
+static inline void configYamlHandler(Catch::ConfigData&, const std::string& 
path) {
+  config_yaml = path;
+}
+
+int main(int argc, char* argv[]) {
+  Catch::Session session;
+
+  Catch::Clara::CommandLine& cli = 
const_cast&>(session.cli());
+  cli["--config-yaml"]
+  .describe("path to the config.yaml containing the 
PersistableKeyValueStoreService controller service configuration")
+  .bind(, "path");
+
+  int ret = session.applyCommandLine(argc, argv);
+  if (ret != 0) {
+return ret;
+  }
+
+  if (config_yaml.empty()) {
+std::cerr << "Missing --config-yaml . It must contain the path to 
the config.yaml containing the PersistableKeyValueStoreService controller 
service configuration." << std::endl;
+return -1;
+  }
+
+  return session.run();
+}
+
+class PersistableKeyValueStoreServiceTestsFixture {
+ public:
+  PersistableKeyValueStoreServiceTestsFixture()
+  : state_dir(strdup("/tmp/state.XX")) {
+LogTestController::getInstance().setTrace();
+
LogTestController::getInstance().setTrace();
+
LogTestController::getInstance().setTrace();
+
+// Create temporary directories
+testController.createTempDirectory(state_dir, false /*cleanup*/);
+REQUIRE(state_dir != nullptr);
+REQUIRE(0 == chdir(state_dir));
+
+loadYaml();
+  }
+
+  virtual ~PersistableKeyValueStoreServiceTestsFixture() {
+free(state_dir);
+LogTestController::getInstance().reset();
+  }
+
+  void loadYaml() {
+controller.reset();
+persistable_key_value_store_service_node.reset();
+
+process_group.reset();
+yaml_config.reset();
+
+stream_factory.reset();
+content_repo.reset();
+test_flow_repo.reset();
+test_repo.reset();
+configuration.reset();
+
+configuration = std::make_shared();
+test_repo = std::make_shared();
+test_flow_repo = std::make_shared();
+
+configuration->set(minifi::Configure::nifi_flow_configuration_file, 
config_yaml);
+
+content_repo = 
std::make_shared();
+content_repo->initialize(configuration);
+stream_factory = minifi::io::StreamFactory::getInstance(configuration);
+
+yaml_config = std::unique_ptr(new 
core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, 
configuration, config_yaml));
+
+process_group = yaml_config->getRoot(config_yaml);
+persistable_key_value_store_service_node = 
process_group->findControllerService("testcontroller");
+REQUIRE(persistable_key_value_store_service_node != nullptr);
+persistable_key_value_store_service_node->enable();
+
+controller = 
std::dynamic_pointer_cast(
+
persistable_key_value_store_service_node->getControllerServiceImplementation());
+REQUIRE(controller != nullptr);
+  }
+
+ protected:
+  char *state_dir;
+  std::shared_ptr configuration;
+  std::shared_ptr test_repo;
+  std::shared_ptr test_flow_repo;
+  std::shared_ptr content_repo;
+  std::shared_ptr stream_factory;
+
+  std::unique_ptr yaml_config;
+  std::unique_ptr process_group;
+
+  std::shared_ptr 
persistable_key_value_store_service_node;
+  std::shared_ptr 
controller;
+
+  TestController testController;
+};
+
+
+TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, 
"PersistableKeyValueStoreServiceTestsFixture set and get", "[basic]") {
+  const char* key = "foobar";
+  const char* value 

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-09 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r389757475
 
 

 ##
 File path: libminifi/include/core/ProcessContext.h
 ##
 @@ -77,6 +85,21 @@ class ProcessContext : public 
controller::ControllerServiceLookup, public core::
 processor_node_(processor),
 logger_(logging::LoggerFactory::getLogger()) {
 repo_ = repo;
+std::string id;
+if 
(configuration->get(minifi::Configure::nifi_state_management_provider_local, 
id)) {
+  auto node = controller_service_provider_->getControllerServiceNode(id);
+  if (node == nullptr) {
+logger_->log_error("Failed to find the 
CoreComponentStateManagerProvider %s defined by %s", id, 
minifi::Configure::nifi_state_management_provider_local);
 
 Review comment:
   This is not a fatal failure, only in the case of processors that really need 
to store state. If state storage is misconfigured, processors that do not 
require it should still run fine.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-09 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r389756151
 
 

 ##
 File path: libminifi/include/core/CoreComponentState.h
 ##
 @@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBMINIFI_INCLUDE_CORE_CoreComponentState_H_
+#define LIBMINIFI_INCLUDE_CORE_CoreComponentState_H_
+
+#include "Core.h"
+
+#include 
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+class CoreComponentStateManager {
+ public:
+  virtual ~CoreComponentStateManager() {
+  }
+
+  virtual bool set(const std::unordered_map& kvs) = 
0;
+
+  virtual bool get(std::unordered_map& kvs) = 0;
 
 Review comment:
   Agreed, but signalling errors here in bools is more practical than through 
exceptions.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-09 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r389755764
 
 

 ##
 File path: 
libminifi/src/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.cpp
 ##
 @@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::AbstractCoreComponentStateManager(
+std::shared_ptr provider,
+const std::string& id)
+: provider_(std::move(provider))
+, id_(id)
+, state_valid_(false) {
+  std::string serialized;
+  if (provider_->getImpl(id_, serialized)) {
+if (provider_->deserialize(serialized, state_)) {
 
 Review comment:
   Could you elaborate? What should be `&&`? `serialized` is not modified, 
`state_` is the output.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-09 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r389753888
 
 

 ##
 File path: libminifi/include/core/ProcessContext.h
 ##
 @@ -193,6 +216,61 @@ class ProcessContext : public 
controller::ControllerServiceLookup, public core::
 return controller_service_provider_->getControllerServiceName(identifier);
   }
 
+  static constexpr char const* DefaultStateManagerProviderName = 
"defaultstatemanagerprovider";
 
 Review comment:
   > default- ok
   > state- stateless is generally better, but ok
   This is a PR about creating a centralized state storage mechanism for 
processors that require state, instead of all of them implementing individual 
state files. "Stateless is generally better" is a mind-bogglingly meaningless 
comment here.
   > manager- meaningless. Only suggests doing something, so it should probably 
be a verb. Having a noun in place of a verb usually means trying to use a class 
where a function would be better.
   StateManager comes from NiFi, and it is a perfectly meaningful name: it is 
an object that helps manage the state of a single CoreComponent. It cannot be a 
function as it contains some caching and validation logic that requires it in 
itself to be stateful.
   > provider- again, a class instead of a function. This has a meaning though.
   CoreComponentStateManagerProvider needs to exist as interface, which is used 
by the ClassLoader and we access different implementations of it through that 
interface.
   > name- ok
   > 
   > I'm not saying that your code is bad, just that this is a red flag that 
suggests a design mistake that may or may not be in your changes.
   I'm not saying your comment is useless, but it could be more useful if you 
tried to interpret the name in the context of the PR, instead of in itself.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-09 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r389753888
 
 

 ##
 File path: libminifi/include/core/ProcessContext.h
 ##
 @@ -193,6 +216,61 @@ class ProcessContext : public 
controller::ControllerServiceLookup, public core::
 return controller_service_provider_->getControllerServiceName(identifier);
   }
 
+  static constexpr char const* DefaultStateManagerProviderName = 
"defaultstatemanagerprovider";
 
 Review comment:
   > default- ok
   > state- stateless is generally better, but ok
   
   This is a PR about creating a centralized state storage mechanism for 
processors that require state, instead of all of them implementing individual 
state files. "Stateless is generally better" is a mind-bogglingly meaningless 
comment here.
   
   > manager- meaningless. Only suggests doing something, so it should probably 
be a verb. Having a noun in place of a verb usually means trying to use a class 
where a function would be better.
   
   StateManager comes from NiFi, and it is a perfectly meaningful name: it is 
an object that helps manage the state of a single CoreComponent. It cannot be a 
function as it contains some caching and validation logic that requires it in 
itself to be stateful.
   
   > provider- again, a class instead of a function. This has a meaning though.
   
   CoreComponentStateManagerProvider needs to exist as interface, which is used 
by the ClassLoader and we access different implementations of it through that 
interface.
   
   > name- ok
   
   I'm not saying that your code is bad, just that this is a red flag that 
suggests a design mistake that may or may not be in your changes.
   I'm not saying your comment is useless, but it could be more useful if you 
tried to interpret the name in the context of the PR, instead of in itself.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-09 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r389753888
 
 

 ##
 File path: libminifi/include/core/ProcessContext.h
 ##
 @@ -193,6 +216,61 @@ class ProcessContext : public 
controller::ControllerServiceLookup, public core::
 return controller_service_provider_->getControllerServiceName(identifier);
   }
 
+  static constexpr char const* DefaultStateManagerProviderName = 
"defaultstatemanagerprovider";
 
 Review comment:
   > default- ok
   > state- stateless is generally better, but ok
   
   This is a PR about creating a centralized state storage mechanism for 
processors that require state, instead of all of them implementing individual 
state files. "Stateless is generally better" is a mind-bogglingly meaningless 
comment here.
   
   > manager- meaningless. Only suggests doing something, so it should probably 
be a verb. Having a noun in place of a verb usually means trying to use a class 
where a function would be better.
   
   StateManager comes from NiFi, and it is a perfectly meaningful name: it is 
an object that helps manage the state of a single CoreComponent. It cannot be a 
function as it contains some caching and validation logic that requires it in 
itself to be stateful.
   
   > provider- again, a class instead of a function. This has a meaning though.
   
   CoreComponentStateManagerProvider needs to exist as interface, which is used 
by the ClassLoader and we access different implementations of it through that 
interface.
   
   > name- ok
   
   >I'm not saying that your code is bad, just that this is a red flag that 
suggests a design mistake that may or may not be in your changes.
   
   I'm not saying your comment is useless, but it could be more useful if you 
tried to interpret the name in the context of the PR, instead of in itself.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st…

2020-03-09 Thread GitBox
bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement 
RocksDB controller service and component st…
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r389748596
 
 

 ##
 File path: extensions/sftp/processors/ListSFTP.cpp
 ##
 @@ -507,85 +463,87 @@ ListSFTP::ListedEntity::ListedEntity(uint64_t 
timestamp_, uint64_t size_)
 , size(size_) {
 }
 
-bool ListSFTP::persistTrackingTimestampsCache(const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
-  std::ofstream file(tracking_timestamps_state_filename_);
-  if (!file.is_open()) {
-logger_->log_error("Failed to store state to Tracking Timestamps state 
file \"%s\"", tracking_timestamps_state_filename_.c_str());
+bool ListSFTP::persistTrackingTimestampsCache(const 
std::shared_ptr& context, const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
+  auto state_manager = context->getStateManager();
+  if (state_manager == nullptr) {
 return false;
   }
-  file << "hostname=" << hostname << "\n";
-  file << "username=" << username << "\n";
-  file << "remote_path=" << remote_path << "\n";
-  file << "listing.timestamp=" << last_listed_latest_entry_timestamp_ << "\n";
-  file << "processed.timestamp=" << last_processed_latest_entry_timestamp_ << 
"\n";
+  std::unordered_map state;
+  state["listing_strategy"] = LISTING_STRATEGY_TRACKING_TIMESTAMPS;
+  state["hostname"] = hostname;
+  state["username"] = username;
+  state["remote_path"] = remote_path;
+  state["listing.timestamp"] = 
std::to_string(last_listed_latest_entry_timestamp_);
+  state["processed.timestamp"] = 
std::to_string(last_processed_latest_entry_timestamp_);
   size_t i = 0;
   for (const auto& identifier : latest_identifiers_processed_) {
-file << "id." << i << "=" << identifier << "\n";
+state["id." + std::to_string(i)] = identifier;
 ++i;
   }
+  state_manager->set(state);
+  if (!state_manager->persist()) {
+return false;
+  }
   return true;
 }
 
-bool ListSFTP::updateFromTrackingTimestampsCache(const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
-  std::ifstream file(tracking_timestamps_state_filename_);
-  if (!file.is_open()) {
-logger_->log_error("Failed to open Tracking Timestamps state file \"%s\"", 
tracking_timestamps_state_filename_.c_str());
-return false;
-  }
+bool ListSFTP::updateFromTrackingTimestampsCache(const 
std::shared_ptr& context, const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
 
 Review comment:
   Agreed, we use it because `onTrigger` already gets it this way and it is 
easier to just pass that to these functions.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services