Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 77a20dbe3 -> 1a1716dc6


MINIFI-262: Configuration Listener

This closes #112.

Signed-off-by: Marc Parisi <phroc...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/1a1716dc
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/1a1716dc
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/1a1716dc

Branch: refs/heads/master
Commit: 1a1716dc675886689217860a299f7a8d36850adf
Parents: 77a20db
Author: Bin Qiu <benqiu2...@gmail.com>
Authored: Wed Jun 7 09:26:11 2017 -0700
Committer: Marc Parisi <phroc...@apache.org>
Committed: Wed Jun 7 13:55:28 2017 -0400

----------------------------------------------------------------------
 README.md                                       |  18 ++
 cmake/BuildTests.cmake                          |   3 +
 libminifi/include/ConfigurationListener.h       | 122 ++++++++++++++
 libminifi/include/FlowControlProtocol.h         |   4 -
 libminifi/include/FlowController.h              |  39 ++++-
 libminifi/include/HttpConfigurationListener.h   |  82 +++++++++
 libminifi/include/core/FlowConfiguration.h      |   9 +-
 libminifi/include/core/ProcessGroup.h           |   9 +-
 .../core/repository/FlowFileRepository.h        |   2 +-
 libminifi/include/core/yaml/YamlConfiguration.h |  16 ++
 libminifi/include/processors/InvokeHTTP.h       |  58 +------
 libminifi/include/properties/Configure.h        |  11 ++
 libminifi/include/utils/HTTPUtils.h             |  97 +++++++++++
 libminifi/src/ConfigurationListener.cpp         | 131 +++++++++++++++
 libminifi/src/Configure.cpp                     |  34 +++-
 libminifi/src/FlowController.cpp                |  37 +++-
 libminifi/src/HttpConfigurationListener.cpp     | 167 +++++++++++++++++++
 libminifi/src/core/FlowConfiguration.cpp        |   6 +-
 libminifi/src/core/ProcessGroup.cpp             |   4 +-
 libminifi/src/core/yaml/YamlConfiguration.cpp   |  14 +-
 libminifi/src/processors/InvokeHTTP.cpp         |  16 +-
 .../HttpConfigurationListenerTest.cpp           | 144 ++++++++++++++++
 22 files changed, 938 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index acaf8dd..0fb25f3 100644
--- a/README.md
+++ b/README.md
@@ -323,6 +323,24 @@ Additionally, users can utilize the MiNiFi Toolkit 
Converter (version 0.0.1 - sc
       host: localhost
       port uuid: 471deef6-2a6e-4a7d-912a-81cc17e3a204
       batch size: 100
+
+### Http Configuration Listener
+
+    The HTTP Configuration Listener pulls the flow configuration from the 
remote command and control server, 
+    then validates and applies the new flow configuration.
+
+    In minifi.properties:
+
+    nifi.configuration.listener.type=http
+    nifi.configuration.listener.http.url=https://localhost:8080
+    nifi.configuration.listener.pull.interval=1 sec 
+    nifi.configuration.listener.client.ca.certificate=./conf/nifi-cert.pem
+
+    To enable client certificate authentication, also set:
+    nifi.configuration.listener.need.ClientAuth=true
+    nifi.configuration.listener.client.certificate=./conf/client.pem
+    nifi.configuration.listener.client.private.key=./conf/client.key
+    nifi.configuration.listener.client.pass.phrase=./conf/password
       
 ### Controller Services
  If you need to reference a controller service in your config.yml file, use 
the following template. In the example, below, ControllerServiceClass is the 
name of the class defining the controller Service. ControllerService1 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/cmake/BuildTests.cmake
----------------------------------------------------------------------
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index bad01d3..aedae10 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -36,6 +36,7 @@ function(createTests testName)
     target_include_directories(${testName} PRIVATE BEFORE 
"thirdparty/spdlog-0.13.0/include")
     target_include_directories(${testName} PRIVATE BEFORE 
"thirdparty/yaml-cpp-yaml-cpp-0.5.3/include")
     target_include_directories(${testName} PRIVATE BEFORE 
"thirdparty/jsoncpp/include")
+    target_include_directories(${testName} PRIVATE BEFORE 
"thirdparty/civetweb-1.9.1/include")
     target_include_directories(${testName} PRIVATE BEFORE 
${LEVELDB_INCLUDE_DIRS})
     target_include_directories(${testName} PRIVATE BEFORE "include")
     target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/")
@@ -87,6 +88,8 @@ add_test(NAME ControllerServiceIntegrationTests COMMAND 
ControllerServiceIntegra
 
 add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest 
"${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
 
+add_test(NAME HttpConfigurationListenerTest COMMAND 
HttpConfigurationListenerTest "${TEST_RESOURCES}/TestHTTPGet.yml"  
"${TEST_RESOURCES}/")
+
 add_test(NAME HttpGetIntegrationTestSecure COMMAND HttpGetIntegrationTest 
"${TEST_RESOURCES}/TestHTTPGetSecure.yml"  "${TEST_RESOURCES}/")
 
 add_test(NAME HttpPostIntegrationTest COMMAND HttpPostIntegrationTest 
"${TEST_RESOURCES}/TestHTTPPost.yml" )

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/ConfigurationListener.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ConfigurationListener.h 
b/libminifi/include/ConfigurationListener.h
new file mode 100644
index 0000000..5574226
--- /dev/null
+++ b/libminifi/include/ConfigurationListener.h
@@ -0,0 +1,122 @@
+/**
+ * ConfigurationListener class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __CONFIGURATION_LISTENER__
+#define __CONFIGURATION_LISTENER__
+
+#include <memory>
+#include <atomic>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <string>
+#include <thread>
+
+#include "yaml-cpp/yaml.h"
+#include "core/Core.h"
+#include "core/Property.h"
+#include "properties/Configure.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+// Forward declaration
+class FlowController;
+// ConfigurationListener Class
+class ConfigurationListener {
+public:
+
+  // Constructor
+  /*!
+   * Create a new configuration listener for the given controller and
+   * configuration. `type` labels the listener in its log messages.
+   * Both timeouts are initialised to 20000 (unit not shown here, presumably
+   * ms - TODO confirm) and the listener is created in the stopped state.
+   */
+  ConfigurationListener(std::shared_ptr<FlowController> controller,
+      std::shared_ptr<Configure> configure, std::string type) :
+      connect_timeout_(20000), read_timeout_(20000), type_(type), configure_(
+          configure), controller_(controller), need_client_certificate_(false) 
{
+    logger_ = logging::LoggerFactory<ConfigurationListener>::getLogger();
+    running_ = false;
+  }
+  // Destructor; stops the listener
+  virtual ~ConfigurationListener() {
+    stop();
+  }
+
+  // Load the listener settings from the configuration and start the thread
+  void start();
+  // Stop the thread
+  void stop();
+  // whether the listener has been started and not yet stopped
+  bool isRunning() {
+    return running_;
+  }
+  // pull the new configuration from the remote host into `configuration`;
+  // this base implementation pulls nothing and returns false
+  virtual bool pullConfiguration(std::string &configuration) {
+    return false;
+  }
+
+protected:
+
+  // Polling loop executed by the listener thread
+  void run();
+
+  // Thread entry point; forwards to run()
+  void threadExecutor() {
+    run();
+  }
+
+  // Mutex locked by run() for as long as the polling loop is active
+  std::mutex mutex_;
+  // listener thread, created by start()
+  std::thread thread_;
+  // whether the thread is running
+  std::atomic<bool> running_;
+
+  // url
+  std::string url_;
+  // connection timeout
+  int64_t connect_timeout_;
+  // read timeout.
+  int64_t read_timeout_;
+  // pull interval in milliseconds; assigned by start()
+  int64_t pull_interval_;
+  // type (http/rest)
+  std::string type_;
+  // payload of the last configuration the controller accepted
+  std::string lastAppliedConfiguration;
+
+  std::shared_ptr<Configure> configure_;
+  std::shared_ptr<logging::Logger> logger_;
+  std::shared_ptr<FlowController> controller_;
+
+  // security settings, loaded from the configuration by start();
+  // passphrase_ holds the content of the configured passphrase file
+  bool need_client_certificate_;
+  std::string certificate_;
+  std::string private_key_;
+  std::string passphrase_;
+  std::string ca_certificate_;
+};
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/FlowControlProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowControlProtocol.h 
b/libminifi/include/FlowControlProtocol.h
index 8992049..c0781b8 100644
--- a/libminifi/include/FlowControlProtocol.h
+++ b/libminifi/include/FlowControlProtocol.h
@@ -218,10 +218,6 @@ class FlowControlProtocol {
   }
   // Run function for the thread
   static void run(FlowControlProtocol *protocol);
-  // set 8 bytes SerialNumber
-  void setSerialNumber(uint8_t *number) {
-    memcpy(_serialNumber, number, 8);
-  }
 
  protected:
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/FlowController.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowController.h 
b/libminifi/include/FlowController.h
index dc0d610..6ea802c 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -21,6 +21,7 @@
 #define __FLOW_CONTROLLER_H__
 
 #include <uuid/uuid.h>
+#include <stdio.h>
 #include <vector>
 #include <queue>
 #include <map>
@@ -43,6 +44,8 @@
 #include "TimerDrivenSchedulingAgent.h"
 #include "EventDrivenSchedulingAgent.h"
 #include "FlowControlProtocol.h"
+#include "ConfigurationListener.h"
+#include "HttpConfigurationListener.h"
 
 #include "core/Property.h"
 
@@ -127,9 +130,36 @@ class FlowController : public 
core::controller::ControllerServiceProvider, publi
       root_->updatePropertyValue(processorName, propertyName, propertyValue);
   }
 
-  // set 8 bytes SerialNumber
-  virtual void setSerialNumber(uint8_t *number) {
-    protocol_->setSerialNumber(number);
+  // store the serial number string reported by getSerialNumber()
+  void setSerialNumber(std::string number) {
+    serial_number_ = number;
+  }
+
+  // get the serial number string last stored by setSerialNumber()
+  std::string getSerialNumber() {
+    return serial_number_;
+  }
+
+  // Validate and apply the given YAML configuration payload.
+  // First the payload is validated against the current root node config of the 
flowController
+  // (e.g. the FlowController id/name is the same and the new version is greater than 
the current version);
+  // after that, the configuration is applied.
+  bool applyConfiguration(std::string &configurePayload);
+
+  // name of the root process group, or an empty string when no root is set
+  std::string getName() {
+    if (root_ == nullptr) {
+      return "";
+    }
+    return root_->getName();
+  }
+
+  // version of the root process group, or 0 when no root is set
+  int getVersion() {
+    if (root_ == nullptr) {
+      return 0;
+    }
+    return root_->getVersion();
   }
 
   /**
@@ -292,6 +322,9 @@ class FlowController : public 
core::controller::ControllerServiceProvider, publi
 
  private:
   std::shared_ptr<logging::Logger> logger_;
+  // http configuration listener object.
+  std::unique_ptr<HttpConfigurationListener> http_configuration_listener_;
+  std::string serial_number_;
 };
 
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/HttpConfigurationListener.h
----------------------------------------------------------------------
diff --git a/libminifi/include/HttpConfigurationListener.h 
b/libminifi/include/HttpConfigurationListener.h
new file mode 100644
index 0000000..72d4728
--- /dev/null
+++ b/libminifi/include/HttpConfigurationListener.h
@@ -0,0 +1,82 @@
+/**
+ * HttpConfigurationListener class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __HTTP_CONFIGURATION_LISTENER__
+#define __HTTP_CONFIGURATION_LISTENER__
+
+#include <curl/curl.h>
+#include "core/Core.h"
+#include "core/Property.h"
+#include "ConfigurationListener.h"
+#include "utils/HTTPUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+// HttpConfigurationListener Class
+class HttpConfigurationListener: public ConfigurationListener {
+public:
+
+  // Constructor
+  /*!
+   * Create a new HTTP configuration listener (listener type "http").
+   * Reads the listener URL from Configure::nifi_configuration_listener_http_url
+   * (empty when the property is absent), initialises libcurl globally and
+   * starts the listener thread right away.
+   * NOTE(review): the thread is started from inside the constructor, so a
+   * class derived from this one would not be fully constructed yet - confirm
+   * no such subclass exists.
+   */
+  HttpConfigurationListener(std::shared_ptr<FlowController> controller,
+      std::shared_ptr<Configure> configure) :
+      minifi::ConfigurationListener(controller, configure, "http") {
+      std::string value;
+
+      if (configure->get(Configure::nifi_configuration_listener_http_url, 
value)) {
+        url_ = value;
+        logger_->log_info("Http configuration listener URL %s", url_.c_str());
+      } else {
+        url_ = "";
+      }
+
+      curl_global_init(CURL_GLOBAL_DEFAULT);
+      this->start();
+  }
+
+  // Pull the configuration from the remote host into `configuration`
+  // (overrides the base class version; defined out of line)
+  bool pullConfiguration(std::string &configuration);
+
+  /**
+    * Configures a secure connection
+    */
+  void configureSecureConnection(CURL *http_session);
+
+  // Static callbacks; presumably registered with libcurl/OpenSSL by
+  // configureSecureConnection - verify against the implementation
+  static CURLcode configureSSLContext(CURL *curl, void *ctx, void *param);
+  static int pemPassWordCb(char *buf, int size, int rwflag, void *param);
+
+  // Destructor; stops the listener before the libcurl global cleanup
+  virtual ~HttpConfigurationListener() {
+    this->stop();
+    curl_global_cleanup();
+  }
+
+protected:
+
+};
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/core/FlowConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/FlowConfiguration.h 
b/libminifi/include/core/FlowConfiguration.h
index edcb2b6..6e2b700 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -75,8 +75,8 @@ class FlowConfiguration : public CoreComponent {
   // Create Processor (Node/Input/Output Port) based on the name
   std::shared_ptr<core::Processor> createProcessor(std::string name, uuid_t 
uuid);
   // Create Root Processor Group
-  std::unique_ptr<core::ProcessGroup> createRootProcessGroup(std::string name, 
uuid_t uuid);
-
+  std::unique_ptr<core::ProcessGroup> createRootProcessGroup(std::string name,
+                                                             uuid_t uuid, int 
version);
   std::shared_ptr<core::controller::ControllerServiceNode> 
createControllerService(const std::string &class_name, const std::string &name, 
uuid_t uuid);
 
   // Create Remote Processor Group
@@ -98,6 +98,11 @@ class FlowConfiguration : public CoreComponent {
     return getRoot(config_path_);
   }
 
+  /**
+   * Base implementation that builds nothing from the payload and returns a
+   * null root pointer; declared virtual so extensions can override it.
+   */
+  virtual std::unique_ptr<core::ProcessGroup> getRootFromPayload(
+      std::string &yamlConfigPayload) {
+    return nullptr;
+  }
+
   /**
    * Base implementation that returns a null root pointer.
    * @return Extensions should return a non-null pointer in order to

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/core/ProcessGroup.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessGroup.h 
b/libminifi/include/core/ProcessGroup.h
index f54f5b4..4978886 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -55,7 +55,8 @@ class ProcessGroup {
   /*!
    * Create a new process group
    */
-  ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid = NULL, 
ProcessGroup *parent = NULL);
+  ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid = NULL, 
int version = 0,
+               ProcessGroup *parent = NULL);
   // Destructor
   virtual ~ProcessGroup();
   // Set Processor Name
@@ -109,6 +110,10 @@ class ProcessGroup {
     } else
       return false;
   }
+  // Returns the version number stored in version_
+  int getVersion() {
+    return version_;
+  }
   // Start Processing
   void startProcessing(TimerDrivenSchedulingAgent *timeScheduler, 
EventDrivenSchedulingAgent *eventScheduler);
   // Stop Processing
@@ -165,6 +170,8 @@ class ProcessGroup {
   uuid_t uuid_;
   // Processor Group Name
   std::string name_;
+  // version
+  int version_;
   // Process Group Type
   ProcessGroupType type_;
   // Processors (ProcessNode) inside this process group which include 
Input/Output Port, Remote Process Group input/Output port

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/core/repository/FlowFileRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/FlowFileRepository.h 
b/libminifi/include/core/repository/FlowFileRepository.h
index b7c43b2..2e19286 100644
--- a/libminifi/include/core/repository/FlowFileRepository.h
+++ b/libminifi/include/core/repository/FlowFileRepository.h
@@ -37,7 +37,7 @@ namespace repository {
 #define FLOWFILE_REPOSITORY_DIRECTORY "./flowfile_repository"
 #define MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE (10*1024*1024) // 10M
 #define MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute
-#define FLOWFILE_REPOSITORY_PURGE_PERIOD (2500) // 2500 msec
+#define FLOWFILE_REPOSITORY_PURGE_PERIOD (2000) // 2000 msec
 
 /**
  * Flow File repository

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/core/yaml/YamlConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/yaml/YamlConfiguration.h 
b/libminifi/include/core/yaml/YamlConfiguration.h
index 3bfaefd..61bf271 100644
--- a/libminifi/include/core/yaml/YamlConfiguration.h
+++ b/libminifi/include/core/yaml/YamlConfiguration.h
@@ -91,6 +91,22 @@ class YamlConfiguration : public FlowConfiguration {
     return getRoot(&rootYamlNode);
   }
 
+  /**
+    * Returns a unique pointer to a ProcessGroup object containing the
+    * flow configuration. The yamlConfigPayload argument must hold
+    * the raw YAML configuration text (not a file path).
+    *
+    * @param yamlConfigPayload an input payload for the raw YAML configuration
+    *                           to be parsed and loaded into the flow
+    *                           configuration tree
+    * @return                 the root ProcessGroup node of the flow
+    *                           configuration tree
+    */
+   std::unique_ptr<core::ProcessGroup> getRootFromPayload(std::string 
&yamlConfigPayload) {
+     YAML::Node rootYamlNode = YAML::Load(yamlConfigPayload);
+     return getRoot(&rootYamlNode);
+   }
+
  protected:
 
   /**

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/processors/InvokeHTTP.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/InvokeHTTP.h 
b/libminifi/include/processors/InvokeHTTP.h
index ab78fd5..c8e0c10 100644
--- a/libminifi/include/processors/InvokeHTTP.h
+++ b/libminifi/include/processors/InvokeHTTP.h
@@ -32,6 +32,7 @@
 #include "controllers/SSLContextService.h"
 #include "utils/ByteInputCallBack.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "utils/HTTPUtils.h"
 
 namespace org {
 namespace apache {
@@ -39,63 +40,6 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-struct CallBackPosition {
-  utils::ByteInputCallBack *ptr;
-  size_t pos;
-};
-
-/**
- * HTTP Response object
- */
-struct HTTPRequestResponse {
-  std::vector<char> data;
-
-  /**
-   * Receive HTTP Response.
-   */
-  static size_t recieve_write(char * data, size_t size, size_t nmemb, void * 
p) {
-    return static_cast<HTTPRequestResponse*>(p)->write_content(data, size, 
nmemb);
-  }
-
-  /**
-   * Callback for post, put, and patch operations
-   * @param buffer
-   * @param size size of buffer
-   * @param nitems items to add
-   * @param insteam input stream object.
-   */
-
-  static size_t send_write(char * data, size_t size, size_t nmemb, void * p) {
-    if (p != 0) {
-      CallBackPosition *callback = (CallBackPosition*) p;
-      if (callback->pos <= callback->ptr->getBufferSize()) {
-        char *ptr = callback->ptr->getBuffer();
-        int len = callback->ptr->getBufferSize() - callback->pos;
-        if (len <= 0) {
-          delete callback->ptr;
-          delete callback;
-          return 0;
-        }
-        if (len > size * nmemb)
-          len = size * nmemb;
-        memcpy(data, callback->ptr->getBuffer() + callback->pos, len);
-        callback->pos += len;
-        return len;
-      }
-    } else {
-      return CURL_READFUNC_ABORT;
-    }
-
-    return 0;
-  }
-
-  size_t write_content(char* ptr, size_t size, size_t nmemb) {
-    data.insert(data.end(), ptr, ptr + size * nmemb);
-    return size * nmemb;
-  }
-
-};
-
 // InvokeHTTP Class
 class InvokeHTTP : public core::Processor {
  public:

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/properties/Configure.h
----------------------------------------------------------------------
diff --git a/libminifi/include/properties/Configure.h 
b/libminifi/include/properties/Configure.h
index 58f6679..fa19a18 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -54,10 +54,21 @@ class Configure : public Properties {
   static const char *nifi_flowfile_repository_enable;
   static const char *nifi_remote_input_secure;
   static const char *nifi_security_need_ClientAuth;
+  // site2site security config
   static const char *nifi_security_client_certificate;
   static const char *nifi_security_client_private_key;
   static const char *nifi_security_client_pass_phrase;
   static const char *nifi_security_client_ca_certificate;
+  static const char *nifi_configuration_listener_pull_interval;
+  static const char *nifi_configuration_listener_http_url;
+  static const char *nifi_configuration_listener_rest_url;
+  static const char *nifi_configuration_listener_type; // http or rest
+  // configuration listener security config
+  static const char *nifi_configuration_listener_need_ClientAuth;
+  static const char *nifi_configuration_listener_client_certificate;
+  static const char *nifi_configuration_listener_private_key;
+  static const char *nifi_configuration_listener_client_pass_phrase;
+  static const char *nifi_configuration_listener_client_ca_certificate;
 };
 
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/utils/HTTPUtils.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/HTTPUtils.h 
b/libminifi/include/utils/HTTPUtils.h
new file mode 100644
index 0000000..3f20f5e
--- /dev/null
+++ b/libminifi/include/utils/HTTPUtils.h
@@ -0,0 +1,97 @@
+/**
+ * HTTPUtils class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __HTTP_UTILS_H__
+#define __HTTP_UTILS_H__
+
+#include <curl/curl.h>
+#include <vector>
+#include "ByteInputCallBack.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Cursor over an outgoing request body, passed as the user pointer to
+ * HTTPRequestResponse::send_write.
+ */
+struct CallBackPosition {
+  // source of the bytes to send; send_write deletes it once all bytes are sent
+  ByteInputCallBack *ptr;
+  // offset of the next byte of ptr's buffer to hand out
+  size_t pos;
+};
+
+/**
+ * HTTP Response object
+ */
+struct HTTPRequestResponse {
+  // Response body bytes accumulated by write_content
+  std::vector<char> data;
+
+  /**
+   * Receive HTTP Response.
+   * Static trampoline that forwards the received chunk to write_content.
+   * (The misspelled name is kept because callers refer to it.)
+   * @param data received bytes
+   * @param size size of one item
+   * @param nmemb number of items
+   * @param p pointer to the HTTPRequestResponse that collects the data
+   * @return number of bytes consumed
+   */
+  static size_t recieve_write(char * data, size_t size, size_t nmemb,
+                              void * p) {
+    return static_cast<HTTPRequestResponse*>(p)->write_content(data, size,
+                                                               nmemb);
+  }
+
+  /**
+   * Callback for post, put, and patch operations
+   * @param data destination buffer to fill
+   * @param size size of one item in the destination buffer
+   * @param nmemb number of items that fit into the destination buffer
+   * @param p pointer to a CallBackPosition describing the source buffer
+   * @return number of bytes copied into data, 0 when nothing is left to
+   *         send, or CURL_READFUNC_ABORT when p is null
+   */
+
+  static size_t send_write(char * data, size_t size, size_t nmemb, void * p) {
+    if (p == nullptr) {
+      return CURL_READFUNC_ABORT;
+    }
+
+    CallBackPosition *callback = static_cast<CallBackPosition*>(p);
+    const size_t buffer_size = callback->ptr->getBufferSize();
+    if (callback->pos > buffer_size) {
+      return 0;
+    }
+
+    // Keep the remaining count in size_t: an int would truncate or wrap for
+    // very large buffers and end the transfer early.
+    size_t len = buffer_size - callback->pos;
+    if (len == 0) {
+      // Everything has been sent; the callback owns and releases its state.
+      delete callback->ptr;
+      delete callback;
+      return 0;
+    }
+    if (len > size * nmemb)
+      len = size * nmemb;
+    memcpy(data, callback->ptr->getBuffer() + callback->pos, len);
+    callback->pos += len;
+    return len;
+  }
+
+  /**
+   * Appends size * nmemb bytes starting at ptr to data.
+   * @return number of bytes appended
+   */
+  size_t write_content(char* ptr, size_t size, size_t nmemb) {
+    data.insert(data.end(), ptr, ptr + size * nmemb);
+    return size * nmemb;
+  }
+
+};
+
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/src/ConfigurationListener.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ConfigurationListener.cpp 
b/libminifi/src/ConfigurationListener.cpp
new file mode 100644
index 0000000..d52a088
--- /dev/null
+++ b/libminifi/src/ConfigurationListener.cpp
@@ -0,0 +1,131 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ConfigurationListener.h"
+#include "FlowController.h"
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+#include <string>
+#include <memory>
+#include <utility>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+/**
+ * Loads the listener settings from the configuration and launches the
+ * listener thread. Does nothing when the listener is already running.
+ *
+ * The pull interval defaults to 60 seconds and is replaced only when the
+ * configured value parses and converts to milliseconds successfully.
+ * The thread is kept joinable so that stop() can wait for it to finish.
+ */
+void ConfigurationListener::start() {
+  if (running_)
+    return;
+
+  pull_interval_ = 60 * 1000;
+  std::string value;
+  // grab the value for configuration
+  if (configure_->get(Configure::nifi_configuration_listener_pull_interval,
+      value)) {
+    core::TimeUnit unit;
+    // Parse into locals so a failed conversion cannot leave a raw,
+    // non-millisecond value in pull_interval_.
+    int64_t parsed = 0;
+    int64_t interval_ms = 0;
+    if (core::Property::StringToTime(value, parsed, unit)
+        && core::Property::ConvertTimeUnitToMS(parsed, unit, interval_ms)) {
+      pull_interval_ = interval_ms;
+      logger_->log_info("Configuration Listener pull interval: [%lld] ms",
+           static_cast<long long>(pull_interval_));
+    }
+  }
+
+  std::string clientAuthStr;
+  if (configure_->get(Configure::nifi_configuration_listener_need_ClientAuth,
+      clientAuthStr)) {
+    org::apache::nifi::minifi::utils::StringUtils::StringToBool(clientAuthStr,
+        this->need_client_certificate_);
+  }
+
+  if (configure_->get(
+          Configure::nifi_configuration_listener_client_ca_certificate,
+      this->ca_certificate_)) {
+    logger_->log_info("Configuration Listener CA certificates: [%s]",
+        this->ca_certificate_.c_str());
+  }
+
+  if (this->need_client_certificate_) {
+    std::string passphrase_file;
+
+    if (!(configure_->get(
+        Configure::nifi_configuration_listener_client_certificate,
+        this->certificate_)
+        && configure_->get(Configure::nifi_configuration_listener_private_key,
+            this->private_key_))) {
+      // A missing property is not a system call failure, so errno is not
+      // reported here.
+      logger_->log_error(
+          "Certificate and Private Key PEM file not configured for "
+          "configuration listener");
+    }
+
+    if (configure_->get(
+        Configure::nifi_configuration_listener_client_pass_phrase,
+        passphrase_file)) {
+      // load the passphrase from file
+      // NOTE(review): the whole file is used, including any trailing
+      // newline - confirm that is what the key expects.
+      std::ifstream file(passphrase_file.c_str(), std::ifstream::in);
+      if (file.good()) {
+        this->passphrase_.assign((std::istreambuf_iterator<char>(file)),
+            std::istreambuf_iterator<char>());
+        file.close();
+      }
+    }
+
+    logger_->log_info("Configuration Listener certificate: [%s], private key: "
+            "[%s], passphrase file: [%s]",
+            this->certificate_.c_str(), this->private_key_.c_str(),
+            passphrase_file.c_str());
+  }
+
+  // Set the flag before the thread exists: run() leaves its loop as soon as
+  // it observes running_ == false.
+  running_ = true;
+  thread_ = std::thread(&ConfigurationListener::threadExecutor, this);
+  logger_->log_info("%s ConfigurationListener Thread Start", type_.c_str());
+}
+
+/**
+ * Asks the listener thread to finish and waits for it. Does nothing when
+ * the listener is not running.
+ */
+void ConfigurationListener::stop() {
+  if (!running_)
+    return;
+  running_ = false;
+  if (thread_.joinable()) {
+    if (thread_.get_id() == std::this_thread::get_id()) {
+      // Reached from the listener thread itself: a thread cannot join
+      // itself, so let it run to completion on its own.
+      thread_.detach();
+    } else {
+      thread_.join();
+    }
+  }
+  logger_->log_info("%s ConfigurationListener Thread Stop", type_.c_str());
+}
+
+// Polling loop of the listener thread: wakes every 100 ms, and once
+// pull_interval_ has elapsed pulls the remote configuration and hands a new,
+// non-empty payload to the controller. Ends when running_ becomes false.
+void ConfigurationListener::run() {
+  std::unique_lock<std::mutex> lk(mutex_);
+  std::condition_variable cv;
+  int64_t elapsed = 0;
+  while (!cv.wait_for(lk, std::chrono::milliseconds(100),
+      [this] {return (running_ == false);})) {
+    elapsed += 100;
+    if (elapsed < pull_interval_)
+      continue;
+
+    // The interval has passed; restart the count whatever the pull yields.
+    elapsed = 0;
+
+    std::string payload;
+    if (!pullConfiguration(payload))
+      continue;
+    if (payload.empty() || payload == lastAppliedConfiguration)
+      continue;
+
+    // Remember the payload only when the controller accepted it.
+    if (this->controller_->applyConfiguration(payload))
+      this->lastAppliedConfiguration = payload;
+  }
+}
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/src/Configure.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index d8e049c..e1bc225 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -43,11 +43,35 @@ const char 
*Configure::nifi_flowfile_repository_max_storage_size = "nifi.flowfil
 const char *Configure::nifi_flowfile_repository_max_storage_time = 
"nifi.flowfile.repository.max.storage.time";
 const char *Configure::nifi_flowfile_repository_directory_default = 
"nifi.flowfile.repository.directory.default";
 const char *Configure::nifi_remote_input_secure = "nifi.remote.input.secure";
-const char *Configure::nifi_security_need_ClientAuth = 
"nifi.security.need.ClientAuth";
-const char *Configure::nifi_security_client_certificate = 
"nifi.security.client.certificate";
-const char *Configure::nifi_security_client_private_key = 
"nifi.security.client.private.key";
-const char *Configure::nifi_security_client_pass_phrase = 
"nifi.security.client.pass.phrase";
-const char *Configure::nifi_security_client_ca_certificate = 
"nifi.security.client.ca.certificate";
+const char *Configure::nifi_security_need_ClientAuth =
+    "nifi.security.need.ClientAuth";
+const char *Configure::nifi_security_client_certificate =
+    "nifi.security.client.certificate";
+const char *Configure::nifi_security_client_private_key =
+    "nifi.security.client.private.key";
+const char *Configure::nifi_security_client_pass_phrase =
+    "nifi.security.client.pass.phrase";
+const char *Configure::nifi_security_client_ca_certificate =
+    "nifi.security.client.ca.certificate";
+const char *Configure::nifi_configuration_listener_pull_interval =
+    "nifi.configuration.listener.pull.interval";
+const char *Configure::nifi_configuration_listener_http_url =
+    "nifi.configuration.listener.http.url";
+const char *Configure::nifi_configuration_listener_rest_url =
+    "nifi.configuration.listener.rest.url";
+const char *Configure::nifi_configuration_listener_type =
+    "nifi.configuration.listener.type";
+const char *Configure::nifi_configuration_listener_need_ClientAuth =
+    "nifi.configuration.listener.need.ClientAuth";
+const char *Configure::nifi_configuration_listener_client_certificate =
+    "nifi.configuration.listener.client.certificate";
+const char *Configure::nifi_configuration_listener_private_key =
+    "nifi.configuration.listener.client.private.key";
+const char *Configure::nifi_configuration_listener_client_pass_phrase =
+    "nifi.configuration.listener.client.pass.phrase";
+const char *Configure::nifi_configuration_listener_client_ca_certificate =
+    "nifi.configuration.listener.client.ca.certificate";
+
 
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 62cf21c..fd75fdd 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -33,6 +33,7 @@
 #include <utility>
 #include <memory>
 #include <string>
+#include "yaml-cpp/yaml.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessGroup.h"
 #include "utils/StringUtils.h"
@@ -152,6 +153,31 @@ FlowController::~FlowController() {
   provenance_repo_ = nullptr;
 }
 
+/**
+ * Replaces the running flow with the one described by configurePayload.
+ *
+ * The payload is parsed first; the current flow is only stopped once a new
+ * root process group has been built from it.
+ *
+ * @param configurePayload flow configuration text to parse
+ * @return false if the payload cannot be parsed or yields no root group,
+ *         otherwise the result of start() on the new flow
+ */
+bool FlowController::applyConfiguration(std::string &configurePayload) {
+  std::unique_ptr<core::ProcessGroup> candidate;
+  try {
+    candidate = std::move(flow_configuration_->getRootFromPayload(configurePayload));
+  } catch (const YAML::Exception&) {
+    logger_->log_error("Invalid configuration payload");
+    return false;
+  }
+
+  if (!candidate) {
+    return false;
+  }
+
+  logger_->log_info("Starting to reload Flow Controller with flow control name %s, version %d",
+      candidate->getName().c_str(), candidate->getVersion());
+
+  // Swap the flow under the controller lock: stop, wait for unload,
+  // install the new root, reload the repository state and start again.
+  std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
+  stop(true);
+  waitUnload(30000);
+  this->root_ = std::move(candidate);
+  loadFlowRepo();
+  initialized_ = true;
+  return start();
+}
+
 void FlowController::stop(bool force) {
   std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
   if (running_) {
@@ -164,7 +190,7 @@ void FlowController::stop(bool force) {
     this->flow_file_repo_->stop();
     this->provenance_repo_->stop();
     // Wait for sometime for thread stop
-    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    std::this_thread::sleep_for(std::chrono::milliseconds(3000));
     if (this->root_)
       this->root_->stopProcessing(this->timer_scheduler_.get(), 
this->event_scheduler_.get());
   }
@@ -214,6 +240,15 @@ void FlowController::load() {
     stop(true);
   }
   if (!initialized_) {
+    std::string listenerType;
+    // grab the value for configuration
+    if (this->http_configuration_listener_ == nullptr && 
configuration_->get(Configure::nifi_configuration_listener_type, listenerType)) 
{
+      if (listenerType == "http") {
+        this->http_configuration_listener_ =
+              std::unique_ptr<minifi::HttpConfigurationListener>(new 
minifi::HttpConfigurationListener(shared_from_this(), configuration_));
+      }
+    }
+
     logger_->log_info("Initializing timers");
     if (nullptr == timer_scheduler_) {
       timer_scheduler_ = 
std::make_shared<TimerDrivenSchedulingAgent>(std::static_pointer_cast<core::controller::ControllerServiceProvider>(shared_from_this()),
 provenance_repo_, configuration_);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/src/HttpConfigurationListener.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/HttpConfigurationListener.cpp 
b/libminifi/src/HttpConfigurationListener.cpp
new file mode 100644
index 0000000..70d5793
--- /dev/null
+++ b/libminifi/src/HttpConfigurationListener.cpp
@@ -0,0 +1,167 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "HttpConfigurationListener.h"
+#include "FlowController.h"
+#include <curl/curlbuild.h>
+#include <curl/easy.h>
+#include <iostream>
+#include <iterator>
+#include <string>
+#include <vector>
+#include <utility>
+
+#include "core/logging/Logger.h"
+#include "core/ProcessContext.h"
+#include "core/Relationship.h"
+#include "io/DataStream.h"
+#include "io/StreamFactory.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+/**
+ * OpenSSL passphrase callback: copies the listener's passphrase into buf.
+ *
+ * Trailing CR/LF characters (left over from reading the passphrase file)
+ * are not copied. The copy is limited to size bytes so that a long
+ * passphrase can never write past the buffer OpenSSL handed in.
+ *
+ * @param buf    destination buffer supplied by OpenSSL
+ * @param size   capacity of buf in bytes
+ * @param rwflag unused
+ * @param param  the HttpConfigurationListener registered as user data
+ * @return number of passphrase bytes written, 0 if none is available
+ */
+int HttpConfigurationListener::pemPassWordCb(char *buf, int size, int rwflag,
+    void *param) {
+  minifi::HttpConfigurationListener *listener =
+      static_cast<minifi::HttpConfigurationListener*>(param);
+
+  if (buf == nullptr || size <= 0 || listener == nullptr)
+    return 0;
+
+  const std::string &passphrase = listener->passphrase_;
+  // Index of the last character that is not part of a line terminator.
+  const std::string::size_type last = passphrase.find_last_not_of("\r\n");
+  if (last == std::string::npos)
+    return 0;
+
+  std::string::size_type length = last + 1;
+  const std::string::size_type capacity =
+      static_cast<std::string::size_type>(size);
+  if (length > capacity)
+    length = capacity;
+
+  memset(buf, 0x00, size);
+  memcpy(buf, passphrase.data(), length);
+  return static_cast<int>(length);
+}
+
+/**
+ * libcurl SSL-context callback: loads the CA bundle, client certificate and
+ * private key of the listener into the OpenSSL context.
+ *
+ * @param curl  the easy handle (unused)
+ * @param ctx   the SSL_CTX created by libcurl
+ * @param param the HttpConfigurationListener registered as callback data
+ * @return CURLE_OK when certificate and key are loaded and match,
+ *         CURLE_FAILED_INIT otherwise
+ */
+CURLcode HttpConfigurationListener::configureSSLContext(CURL *curl, void *ctx,
+    void *param) {
+  minifi::HttpConfigurationListener *listener =
+      static_cast<minifi::HttpConfigurationListener*>(param);
+  SSL_CTX* sslCtx = static_cast<SSL_CTX*>(ctx);
+
+  // A CA bundle that cannot be loaded is reported but is not fatal here,
+  // which keeps the previous behaviour for this step.
+  if (SSL_CTX_load_verify_locations(sslCtx, listener->ca_certificate_.c_str(),
+      0) != 1) {
+    listener->logger_->log_error("Cannot load CA certificate %s",
+        listener->ca_certificate_.c_str());
+  }
+
+  if (SSL_CTX_use_certificate_file(sslCtx, listener->certificate_.c_str(),
+      SSL_FILETYPE_PEM) != 1) {
+    listener->logger_->log_error("Cannot load client certificate %s",
+        listener->certificate_.c_str());
+    return CURLE_FAILED_INIT;
+  }
+
+  // The passphrase callback must be registered before the key is read.
+  SSL_CTX_set_default_passwd_cb(sslCtx,
+      HttpConfigurationListener::pemPassWordCb);
+  SSL_CTX_set_default_passwd_cb_userdata(sslCtx, param);
+  if (SSL_CTX_use_PrivateKey_file(sslCtx, listener->private_key_.c_str(),
+      SSL_FILETYPE_PEM) != 1) {
+    listener->logger_->log_error("Cannot load client private key %s",
+        listener->private_key_.c_str());
+    return CURLE_FAILED_INIT;
+  }
+
+  // verify private key
+  if (!SSL_CTX_check_private_key(sslCtx)) {
+    listener->logger_->log_error(
+        "Private key does not match the public certificate, error : %s",
+        std::strerror(errno));
+    return CURLE_FAILED_INIT;
+  }
+
+  listener->logger_->log_debug(
+      "HttpConfigurationListener load Client Certificates OK");
+  return CURLE_OK;
+}
+
+/**
+ * Applies the TLS options of this listener to a curl easy handle: CA bundle,
+ * PEM certificate type and peer verification. When a client certificate is
+ * required, also registers configureSSLContext() as SSL-context callback.
+ *
+ * @param http_session curl easy handle to configure
+ */
+void HttpConfigurationListener::configureSecureConnection(CURL *http_session) {
+  curl_easy_setopt(http_session, CURLOPT_VERBOSE, 1L);
+  curl_easy_setopt(http_session, CURLOPT_CAINFO, this->ca_certificate_.c_str());
+  curl_easy_setopt(http_session, CURLOPT_SSLCERTTYPE, "PEM");
+  curl_easy_setopt(http_session, CURLOPT_SSL_VERIFYPEER, 1L);
+
+  if (!this->need_client_certificate_) {
+    return;
+  }
+
+  const CURLcode ctxResult = curl_easy_setopt(http_session,
+      CURLOPT_SSL_CTX_FUNCTION,
+      &HttpConfigurationListener::configureSSLContext);
+  if (ctxResult != CURLE_OK) {
+    logger_->log_error("CURLOPT_SSL_CTX_FUNCTION not supported %d", ctxResult);
+  }
+  // The callback receives this object back through its data pointer.
+  curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA,
+      static_cast<void*>(this));
+  curl_easy_setopt(http_session, CURLOPT_SSLKEYTYPE, "PEM");
+}
+
+/**
+ * Fetches the configuration from url_ with an HTTP GET.
+ *
+ * @param configuration receives the response body on success; left
+ *                      untouched otherwise
+ * @return true only if the transfer succeeded, the status code is 2xx and
+ *         the body is not empty
+ */
+bool HttpConfigurationListener::pullConfiguration(std::string &configuration) {
+  if (url_.empty())
+    return false;
+
+  CURL *http_session = curl_easy_init();
+  if (http_session == nullptr) {
+    logger_->log_error("HttpConfigurationListener -- curl_easy_init() failed");
+    return false;
+  }
+
+  bool ret = false;
+
+  std::string fullUrl = url_;
+
+  curl_easy_setopt(http_session, CURLOPT_URL, fullUrl.c_str());
+
+  // The connect phase and the whole transfer have separate options; using
+  // CURLOPT_TIMEOUT for both let the read timeout overwrite the connect one.
+  if (connect_timeout_ > 0) {
+    curl_easy_setopt(http_session, CURLOPT_CONNECTTIMEOUT,
+        static_cast<long>(connect_timeout_));  // NOLINT(runtime/int)
+  }
+
+  if (read_timeout_ > 0) {
+    curl_easy_setopt(http_session, CURLOPT_TIMEOUT,
+        static_cast<long>(read_timeout_));  // NOLINT(runtime/int)
+  }
+
+  // Only the scheme decides whether TLS is configured, not an "https"
+  // that happens to appear later in the URL.
+  if (fullUrl.compare(0, 5, "https") == 0) {
+    configureSecureConnection(http_session);
+  }
+
+  utils::HTTPRequestResponse content;
+  curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION,
+      &utils::HTTPRequestResponse::recieve_write);
+
+  curl_easy_setopt(http_session, CURLOPT_WRITEDATA,
+      static_cast<void*>(&content));
+
+  CURLcode res = curl_easy_perform(http_session);
+
+  if (res == CURLE_OK) {
+    logger_->log_debug("HttpConfigurationListener -- curl successful to %s",
+        fullUrl.c_str());
+
+    std::string response_body(content.data.begin(), content.data.end());
+    // CURLINFO_RESPONSE_CODE writes through a pointer to long.
+    long http_code = 0;  // NOLINT(runtime/int)
+    curl_easy_getinfo(http_session, CURLINFO_RESPONSE_CODE, &http_code);
+
+    bool isSuccess = (http_code / 100) == 2;
+    bool body_empty = IsNullOrEmpty(content.data);
+
+    if (isSuccess && !body_empty) {
+      configuration = std::move(response_body);
+      logger_->log_debug("config %s", configuration.c_str());
+      ret = true;
+    } else {
+      logger_->log_error("Cannot output body to content");
+    }
+  } else {
+    logger_->log_error(
+        "HttpConfigurationListener -- curl_easy_perform() failed %s\n",
+        curl_easy_strerror(res));
+  }
+  curl_easy_cleanup(http_session);
+
+  return ret;
+}
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/src/core/FlowConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowConfiguration.cpp 
b/libminifi/src/core/FlowConfiguration.cpp
index 6635701..cc6e0e5 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -53,8 +53,10 @@ std::shared_ptr<core::Processor> 
FlowConfiguration::createProvenanceReportTask()
   return processor;
 }
 
-std::unique_ptr<core::ProcessGroup> 
FlowConfiguration::createRootProcessGroup(std::string name, uuid_t uuid) {
-  return std::unique_ptr<core::ProcessGroup>(new 
core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid));
+// Builds a ROOT_PROCESS_GROUP with the given name, uuid and flow version.
+std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(
+    std::string name, uuid_t uuid, int version) {
+  std::unique_ptr<core::ProcessGroup> rootGroup(
+      new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid, version));
+  return rootGroup;
+}
 
 std::unique_ptr<core::ProcessGroup> 
FlowConfiguration::createRemoteProcessGroup(std::string name, uuid_t uuid) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/src/core/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessGroup.cpp 
b/libminifi/src/core/ProcessGroup.cpp
index 9e6778c..7ac139b 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -37,10 +37,12 @@ namespace nifi {
 namespace minifi {
 namespace core {
 
-ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t 
uuid, ProcessGroup *parent)
+ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t 
uuid, int version,
+                           ProcessGroup *parent)
     : logger_(logging::LoggerFactory<ProcessGroup>::getLogger()),
       name_(name),
       type_(type),
+      version_(version),
       parent_process_group_(parent) {
   if (!uuid)
     // Generate the global UUID for the flow record

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/src/core/yaml/YamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp 
b/libminifi/src/core/yaml/YamlConfiguration.cpp
index a11db2b..44aec12 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -31,14 +31,24 @@ namespace core {
 
 core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node 
rootFlowNode) {
   uuid_t uuid;
+  int64_t version = 0;
 
   checkRequiredField(&rootFlowNode, "name", 
CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
   std::string flowName = rootFlowNode["name"].as<std::string>();
   std::string id = getOrGenerateId(&rootFlowNode);
   uuid_parse(id.c_str(), uuid);
 
-  logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", id, 
flowName);
-  std::unique_ptr<core::ProcessGroup> group = 
FlowConfiguration::createRootProcessGroup(flowName, uuid);
+  if (rootFlowNode["version"]) {
+    std::string value = rootFlowNode["version"].as<std::string>();
+    if (core::Property::StringToInt(value, version)) {
+      logger_->log_debug("parseRootProcessorGroup: version => [%d]", version);
+    }
+  }
+
+  logger_->log_debug(
+      "parseRootProcessGroup: id => [%s], name => [%s]", id, flowName);
+  std::unique_ptr<core::ProcessGroup> group =
+      FlowConfiguration::createRootProcessGroup(flowName, uuid, version);
 
   this->name_ = flowName;
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/src/processors/InvokeHTTP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/InvokeHTTP.cpp 
b/libminifi/src/processors/InvokeHTTP.cpp
index fd39a64..c636201 100644
--- a/libminifi/src/processors/InvokeHTTP.cpp
+++ b/libminifi/src/processors/InvokeHTTP.cpp
@@ -315,8 +315,9 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, 
core::ProcessSession *
   if (read_timeout_ > 0) {
     curl_easy_setopt(http_session, CURLOPT_TIMEOUT, read_timeout_);
   }
-  HTTPRequestResponse content;
-  curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION, 
&HTTPRequestResponse::recieve_write);
+  utils::HTTPRequestResponse content;
+  curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION,
+                   &utils::HTTPRequestResponse::recieve_write);
 
   curl_easy_setopt(http_session, CURLOPT_WRITEDATA, 
static_cast<void*>(&content));
 
@@ -326,14 +327,17 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, 
core::ProcessSession *
     if (claim) {
       utils::ByteInputCallBack *callback = new utils::ByteInputCallBack();
       session->read(flowFile, callback);
-      CallBackPosition *callbackObj = new CallBackPosition;
+      utils::CallBackPosition *callbackObj = new utils::CallBackPosition;
       callbackObj->ptr = callback;
       callbackObj->pos = 0;
       logger_->log_info("InvokeHTTP -- Setting callback");
       curl_easy_setopt(http_session, CURLOPT_UPLOAD, 1L);
-      curl_easy_setopt(http_session, CURLOPT_INFILESIZE_LARGE, 
(curl_off_t)callback->getBufferSize());
-      curl_easy_setopt(http_session, CURLOPT_READFUNCTION, 
&HTTPRequestResponse::send_write);
-      curl_easy_setopt(http_session, CURLOPT_READDATA, 
static_cast<void*>(callbackObj));
+      curl_easy_setopt(http_session, CURLOPT_INFILESIZE_LARGE,
+                       (curl_off_t)callback->getBufferSize());
+      curl_easy_setopt(http_session, CURLOPT_READFUNCTION,
+                       &utils::HTTPRequestResponse::send_write);
+      curl_easy_setopt(http_session, CURLOPT_READDATA,
+                       static_cast<void*>(callbackObj));
     } else {
       logger_->log_error("InvokeHTTP -- no resource claim");
     }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/test/integration/HttpConfigurationListenerTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/HttpConfigurationListenerTest.cpp 
b/libminifi/test/integration/HttpConfigurationListenerTest.cpp
new file mode 100644
index 0000000..a86b884
--- /dev/null
+++ b/libminifi/test/integration/HttpConfigurationListenerTest.cpp
@@ -0,0 +1,144 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "../TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "../include/core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "HttpConfigurationListener.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "../unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "CivetServer.h"
+#include <cstring>
+
+// Blocks the calling thread for ten seconds.
+void waitToVerifyProcessor() {
+  const std::chrono::seconds pause(10);
+  std::this_thread::sleep_for(pause);
+}
+
+/**
+ * Civet handler that answers GET requests with the contents of
+ * test_file_location_ as text/plain, or with a 500 response if that file
+ * cannot be opened.
+ */
+class ConfigHandler: public CivetHandler {
+ public:
+  bool handleGet(CivetServer *server, struct mg_connection *conn) {
+    std::ifstream myfile(test_file_location_.c_str());
+
+    if (myfile.is_open()) {
+      std::stringstream buffer;
+      buffer << myfile.rdbuf();
+      std::string str = buffer.str();
+      myfile.close();
+      // %lu expects unsigned long; size_t is not that type on every platform.
+      mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+          "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+          static_cast<unsigned long>(str.length()));  // NOLINT(runtime/int)
+      mg_printf(conn, "%s", str.c_str());
+    } else {
+      // Terminate the header block so the client sees a complete response.
+      mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"
+          "Content-Length: 0\r\nConnection: close\r\n\r\n");
+    }
+
+    return true;
+  }
+  // Path of the file served by handleGet().
+  std::string test_file_location_;
+};
+
+/**
+ * Integration test for the HTTP configuration listener.
+ *
+ * Serves the flow file given as argv[1] on http://localhost:9090/config,
+ * starts a FlowController whose listener polls that URL every second, and
+ * then checks the captured log output for the pull and the flow reload.
+ *
+ * Usage: <test> <flow yaml file> <key directory>
+ */
+int main(int argc, char **argv) {
+  LogTestController::getInstance().setInfo<minifi::ConfigurationListener>();
+  LogTestController::getInstance().setInfo<minifi::FlowController>();
+  LogTestController::getInstance().setInfo<minifi::HttpConfigurationListener>();
+
+  const char *options[] = { "document_root", ".", "listening_ports", "9090", 0 };
+  // The trailing 0 only terminates the array and is not passed on.
+  const size_t option_count = sizeof(options) / sizeof(options[0]) - 1;
+  std::vector<std::string> cpp_options;
+  for (size_t i = 0; i < option_count; i++) {
+    cpp_options.push_back(options[i]);
+  }
+
+  CivetServer server(cpp_options);
+  ConfigHandler h_ex;
+  server.addHandler("/config", h_ex);
+  LogTestController::getInstance().setDebug<minifi::ConfigurationListener>();
+  std::string key_dir, test_file_location;
+  if (argc > 1) {
+    h_ex.test_file_location_ = test_file_location = argv[1];
+  }
+  // argv[2] exists only when at least two arguments were supplied.
+  if (argc > 2) {
+    key_dir = argv[2];
+  }
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<
+      minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_default_directory, key_dir);
+  configuration->set(minifi::Configure::nifi_configuration_listener_type,
+      "http");
+  configuration->set(
+      minifi::Configure::nifi_configuration_listener_pull_interval, "1 sec");
+  configuration->set(minifi::Configure::nifi_configuration_listener_http_url,
+      "http://localhost:9090/config");
+  mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
+
+  std::shared_ptr<core::Repository> test_repo =
+      std::make_shared<TestRepository>();
+  std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<
+      TestFlowRepository>();
+
+  configuration->set(minifi::Configure::nifi_flow_configuration_file,
+      test_file_location);
+
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory =
+      std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::unique_ptr<core::FlowConfiguration> yaml_ptr =
+      std::unique_ptr<core::YamlConfiguration>(
+          new core::YamlConfiguration(test_repo, test_repo, stream_factory,
+              configuration, test_file_location));
+  std::shared_ptr<TestRepository> repo =
+      std::static_pointer_cast<TestRepository>(test_repo);
+
+  std::shared_ptr<minifi::FlowController> controller =
+      std::make_shared<minifi::FlowController>(test_repo, test_flow_repo,
+          configuration, std::move(yaml_ptr), DEFAULT_ROOT_GROUP_NAME, true);
+
+  core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory,
+      configuration, test_file_location);
+
+  std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(
+      test_file_location);
+  // Hand ownership over directly instead of get() followed by release().
+  std::shared_ptr<core::ProcessGroup> pg = std::move(ptr);
+
+  controller->load();
+  controller->start();
+  waitToVerifyProcessor();
+
+  controller->waitUnload(60000);
+  std::string logs = LogTestController::getInstance().log_output.str();
+  assert(logs.find("HttpConfigurationListener -- curl successful to http://localhost:9090/config") != std::string::npos);
+  assert(logs.find("Starting to reload Flow Controller with flow control name MiNiFi Flow, version 0") != std::string::npos);
+  LogTestController::getInstance().reset();
+  rmdir("./content_repository");
+  return 0;
+}

Reply via email to