Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 77a20dbe3 -> 1a1716dc6
MINIFI-262: Configuration Listener This closes #112. Signed-off-by: Marc Parisi <phroc...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/1a1716dc Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/1a1716dc Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/1a1716dc Branch: refs/heads/master Commit: 1a1716dc675886689217860a299f7a8d36850adf Parents: 77a20db Author: Bin Qiu <benqiu2...@gmail.com> Authored: Wed Jun 7 09:26:11 2017 -0700 Committer: Marc Parisi <phroc...@apache.org> Committed: Wed Jun 7 13:55:28 2017 -0400 ---------------------------------------------------------------------- README.md | 18 ++ cmake/BuildTests.cmake | 3 + libminifi/include/ConfigurationListener.h | 122 ++++++++++++++ libminifi/include/FlowControlProtocol.h | 4 - libminifi/include/FlowController.h | 39 ++++- libminifi/include/HttpConfigurationListener.h | 82 +++++++++ libminifi/include/core/FlowConfiguration.h | 9 +- libminifi/include/core/ProcessGroup.h | 9 +- .../core/repository/FlowFileRepository.h | 2 +- libminifi/include/core/yaml/YamlConfiguration.h | 16 ++ libminifi/include/processors/InvokeHTTP.h | 58 +------ libminifi/include/properties/Configure.h | 11 ++ libminifi/include/utils/HTTPUtils.h | 97 +++++++++++ libminifi/src/ConfigurationListener.cpp | 131 +++++++++++++++ libminifi/src/Configure.cpp | 34 +++- libminifi/src/FlowController.cpp | 37 +++- libminifi/src/HttpConfigurationListener.cpp | 167 +++++++++++++++++++ libminifi/src/core/FlowConfiguration.cpp | 6 +- libminifi/src/core/ProcessGroup.cpp | 4 +- libminifi/src/core/yaml/YamlConfiguration.cpp | 14 +- libminifi/src/processors/InvokeHTTP.cpp | 16 +- .../HttpConfigurationListenerTest.cpp | 144 ++++++++++++++++ 22 files changed, 938 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index acaf8dd..0fb25f3 100644 --- a/README.md +++ b/README.md @@ -323,6 +323,24 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc host: localhost port uuid: 471deef6-2a6e-4a7d-912a-81cc17e3a204 batch size: 100 + +### Http Configuration Listener + + Http Configuration Listener will pull flow file configuration from the remote command control server, + validate and apply the new flow configuration + + in minifi.properties + + nifi.configuration.listener.type=http + nifi.configuration.listener.http.url=https://localhost:8080 + nifi.configuration.listener.pull.interval=1 sec + nifi.configuration.listener.client.ca.certificate=./conf/nifi-cert.pem + + if you want to enable client certificate + nifi.configuration.listener.need.ClientAuth=true + nifi.configuration.listener.client.certificate=./conf/client.pem + nifi.configuration.listener.client.private.key=./conf/client.key + nifi.configuration.listener.client.pass.phrase=./conf/password ### Controller Services If you need to reference a controller service in your config.yml file, use the following template. In the example, below, ControllerServiceClass is the name of the class defining the controller Service. ControllerService1 http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/cmake/BuildTests.cmake ---------------------------------------------------------------------- diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake index bad01d3..aedae10 100644 --- a/cmake/BuildTests.cmake +++ b/cmake/BuildTests.cmake @@ -36,6 +36,7 @@ function(createTests testName) target_include_directories(${testName} PRIVATE BEFORE "thirdparty/spdlog-0.13.0/include") target_include_directories(${testName} PRIVATE BEFORE "thirdparty/yaml-cpp-yaml-cpp-0.5.3/include") target_include_directories(${testName} PRIVATE BEFORE "thirdparty/jsoncpp/include") + target_include_directories(${testName} PRIVATE BEFORE "thirdparty/civetweb-1.9.1/include") target_include_directories(${testName} PRIVATE BEFORE ${LEVELDB_INCLUDE_DIRS}) target_include_directories(${testName} PRIVATE BEFORE "include") target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/") @@ -87,6 +88,8 @@ add_test(NAME ControllerServiceIntegrationTests COMMAND ControllerServiceIntegra add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") +add_test(NAME HttpConfigurationListenerTest COMMAND HttpConfigurationListenerTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") + add_test(NAME HttpGetIntegrationTestSecure COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGetSecure.yml" "${TEST_RESOURCES}/") add_test(NAME HttpPostIntegrationTest COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPost.yml" ) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/ConfigurationListener.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ConfigurationListener.h b/libminifi/include/ConfigurationListener.h new file mode 100644 index 0000000..5574226 --- /dev/null +++ b/libminifi/include/ConfigurationListener.h @@ -0,0 +1,122 @@ +/** + * ConfigurationListener class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __CONFIGURATION_LISTENER__ +#define __CONFIGURATION_LISTENER__ + +#include <memory> +#include <atomic> +#include <cstdint> +#include <cstring> +#include <iostream> +#include <string> +#include <thread> + +#include "yaml-cpp/yaml.h" +#include "core/Core.h" +#include "core/Property.h" +#include "properties/Configure.h" +#include "core/logging/Logger.h" +#include "core/logging/LoggerConfiguration.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +// Forwarder declaration +class FlowController; +// ConfigurationListener Class +class ConfigurationListener { +public: + + // Constructor + /*! + * Create a new processor + */ + ConfigurationListener(std::shared_ptr<FlowController> controller, + std::shared_ptr<Configure> configure, std::string type) : + connect_timeout_(20000), read_timeout_(20000), type_(type), configure_( + configure), controller_(controller), need_client_certificate_(false) { + logger_ = logging::LoggerFactory<ConfigurationListener>::getLogger(); + running_ = false; + } + // Destructor + virtual ~ConfigurationListener() { + stop(); + } + + // Start the thread + void start(); + // Stop the thread + void stop(); + // whether the thread is enable + bool isRunning() { + return running_; + } + // pull the new configuration from the remote host + virtual bool pullConfiguration(std::string &configuration) { + return false; + } + +protected: + + // Run function for the thread + void run(); + + // Run function for the thread + void threadExecutor() { + run(); + } + + // Mutex for protection + std::mutex mutex_; + // thread + std::thread thread_; + // whether the thread is running + std::atomic<bool> running_; + + // url + std::string url_; + // connection timeout + int64_t connect_timeout_; + // read timeout. + int64_t read_timeout_; + // pull interval + int64_t pull_interval_; + // type (http/rest) + std::string type_; + // last applied configuration + std::string lastAppliedConfiguration; + + std::shared_ptr<Configure> configure_; + std::shared_ptr<logging::Logger> logger_; + std::shared_ptr<FlowController> controller_; + + bool need_client_certificate_; + std::string certificate_; + std::string private_key_; + std::string passphrase_; + std::string ca_certificate_; +}; + +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/FlowControlProtocol.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowControlProtocol.h b/libminifi/include/FlowControlProtocol.h index 8992049..c0781b8 100644 --- a/libminifi/include/FlowControlProtocol.h +++ b/libminifi/include/FlowControlProtocol.h @@ -218,10 +218,6 @@ class FlowControlProtocol { } // Run function for the thread static void run(FlowControlProtocol *protocol); - // set 8 bytes SerialNumber - void setSerialNumber(uint8_t *number) { - memcpy(_serialNumber, number, 8); - } protected: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/FlowController.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index dc0d610..6ea802c 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -21,6 +21,7 @@ #define __FLOW_CONTROLLER_H__ #include <uuid/uuid.h> +#include <stdio.h> #include <vector> #include <queue> #include <map> @@ -43,6 +44,8 @@ #include "TimerDrivenSchedulingAgent.h" #include "EventDrivenSchedulingAgent.h" #include "FlowControlProtocol.h" +#include "ConfigurationListener.h" +#include "HttpConfigurationListener.h" #include "core/Property.h" @@ -127,9 +130,36 @@ class FlowController : public core::controller::ControllerServiceProvider, publi root_->updatePropertyValue(processorName, propertyName, propertyValue); } - // set 8 bytes SerialNumber - virtual void setSerialNumber(uint8_t *number) { - protocol_->setSerialNumber(number); + // set SerialNumber + void setSerialNumber(std::string number) { + serial_number_ = number; + } + + // get serial number as string + std::string getSerialNumber() { + return serial_number_; + } + + // validate and apply passing yaml configuration payload + // first it will validate the payload with the current root node config for flowController + // like FlowController id/name is the same and new version is greater than the current version + // after that, it will apply the configuration + bool applyConfiguration(std::string &configurePayload); + + // get name + std::string getName() { + if (root_ != nullptr) + return root_->getName(); + else + return ""; + } + + // get version + int getVersion() { + if (root_ != nullptr) + return root_->getVersion(); + else + return 0; } /** @@ -292,6 +322,9 @@ class FlowController : public core::controller::ControllerServiceProvider, publi private: std::shared_ptr<logging::Logger> logger_; + // http configuration listener object. + std::unique_ptr<HttpConfigurationListener> http_configuration_listener_; + std::string serial_number_; }; } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/HttpConfigurationListener.h ---------------------------------------------------------------------- diff --git a/libminifi/include/HttpConfigurationListener.h b/libminifi/include/HttpConfigurationListener.h new file mode 100644 index 0000000..72d4728 --- /dev/null +++ b/libminifi/include/HttpConfigurationListener.h @@ -0,0 +1,82 @@ +/** + * HttpConfigurationListener class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __HTTP_CONFIGURATION_LISTENER__ +#define __HTTP_CONFIGURATION_LISTENER__ + +#include <curl/curl.h> +#include "core/Core.h" +#include "core/Property.h" +#include "ConfigurationListener.h" +#include "utils/HTTPUtils.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + +// HttpConfigurationListener Class +class HttpConfigurationListener: public ConfigurationListener { +public: + + // Constructor + /*! + * Create a new processor + */ + HttpConfigurationListener(std::shared_ptr<FlowController> controller, + std::shared_ptr<Configure> configure) : + minifi::ConfigurationListener(controller, configure, "http") { + std::string value; + + if (configure->get(Configure::nifi_configuration_listener_http_url, value)) { + url_ = value; + logger_->log_info("Http configuration listener URL %s", url_.c_str()); + } else { + url_ = ""; + } + + curl_global_init(CURL_GLOBAL_DEFAULT); + this->start(); + } + + bool pullConfiguration(std::string &configuration); + + /** + * Configures a secure connection + */ + void configureSecureConnection(CURL *http_session); + + static CURLcode configureSSLContext(CURL *curl, void *ctx, void *param); + static int pemPassWordCb(char *buf, int size, int rwflag, void *param); + + // Destructor + virtual ~HttpConfigurationListener() { + this->stop(); + curl_global_cleanup(); + } + +protected: + +}; + +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/core/FlowConfiguration.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h index edcb2b6..6e2b700 100644 --- a/libminifi/include/core/FlowConfiguration.h +++ b/libminifi/include/core/FlowConfiguration.h @@ -75,8 +75,8 @@ class FlowConfiguration : public CoreComponent { // Create Processor (Node/Input/Output Port) based on the name std::shared_ptr<core::Processor> createProcessor(std::string name, uuid_t uuid); // Create Root Processor Group - std::unique_ptr<core::ProcessGroup> createRootProcessGroup(std::string name, uuid_t uuid); - + std::unique_ptr<core::ProcessGroup> createRootProcessGroup(std::string name, + uuid_t uuid, int version); std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(const std::string &class_name, const std::string &name, uuid_t uuid); // Create Remote Processor Group @@ -98,6 +98,11 @@ class FlowConfiguration : public CoreComponent { return getRoot(config_path_); } + virtual std::unique_ptr<core::ProcessGroup> getRootFromPayload( + std::string &yamlConfigPayload) { + return nullptr; + } + /** * Base implementation that returns a null root pointer. * @return Extensions should return a non-null pointer in order to http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/core/ProcessGroup.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h index f54f5b4..4978886 100644 --- a/libminifi/include/core/ProcessGroup.h +++ b/libminifi/include/core/ProcessGroup.h @@ -55,7 +55,8 @@ class ProcessGroup { /*! * Create a new process group */ - ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid = NULL, ProcessGroup *parent = NULL); + ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid = NULL, int version = 0, + ProcessGroup *parent = NULL); // Destructor virtual ~ProcessGroup(); // Set Processor Name @@ -109,6 +110,10 @@ class ProcessGroup { } else return false; } + // getVersion + int getVersion() { + return version_; + } // Start Processing void startProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler); // Stop Processing @@ -165,6 +170,8 @@ class ProcessGroup { uuid_t uuid_; // Processor Group Name std::string name_; + // version + int version_; // Process Group Type ProcessGroupType type_; // Processors (ProcessNode) inside this process group which include Input/Output Port, Remote Process Group input/Output port http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/core/repository/FlowFileRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/FlowFileRepository.h b/libminifi/include/core/repository/FlowFileRepository.h index b7c43b2..2e19286 100644 --- a/libminifi/include/core/repository/FlowFileRepository.h +++ b/libminifi/include/core/repository/FlowFileRepository.h @@ -37,7 +37,7 @@ namespace repository { #define FLOWFILE_REPOSITORY_DIRECTORY "./flowfile_repository" #define MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE (10*1024*1024) // 10M #define MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute -#define FLOWFILE_REPOSITORY_PURGE_PERIOD (2500) // 2500 msec +#define FLOWFILE_REPOSITORY_PURGE_PERIOD (2000) // 2000 msec /** * Flow File repository http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/core/yaml/YamlConfiguration.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h index 3bfaefd..61bf271 100644 --- a/libminifi/include/core/yaml/YamlConfiguration.h +++ b/libminifi/include/core/yaml/YamlConfiguration.h @@ -91,6 +91,22 @@ class YamlConfiguration : public FlowConfiguration { return getRoot(&rootYamlNode); } + /** + * Returns a shared pointer to a ProcessGroup object containing the + * flow configuration. The yamlConfigPayload argument must be + * a payload for the raw YAML configuration. + * + * @param yamlConfigPayload an input payload for the raw YAML configuration + * to be parsed and loaded into the flow + * configuration tree + * @return the root ProcessGroup node of the flow + * configuration tree + */ + std::unique_ptr<core::ProcessGroup> getRootFromPayload(std::string &yamlConfigPayload) { + YAML::Node rootYamlNode = YAML::Load(yamlConfigPayload); + return getRoot(&rootYamlNode); + } + protected: /** http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/processors/InvokeHTTP.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/InvokeHTTP.h b/libminifi/include/processors/InvokeHTTP.h index ab78fd5..c8e0c10 100644 --- a/libminifi/include/processors/InvokeHTTP.h +++ b/libminifi/include/processors/InvokeHTTP.h @@ -32,6 +32,7 @@ #include "controllers/SSLContextService.h" #include "utils/ByteInputCallBack.h" #include "core/logging/LoggerConfiguration.h" +#include "utils/HTTPUtils.h" namespace org { namespace apache { @@ -39,63 +40,6 @@ namespace nifi { namespace minifi { namespace processors { -struct CallBackPosition { - utils::ByteInputCallBack *ptr; - size_t pos; -}; - -/** - * HTTP Response object - */ -struct HTTPRequestResponse { - std::vector<char> data; - - /** - * Receive HTTP Response. - */ - static size_t recieve_write(char * data, size_t size, size_t nmemb, void * p) { - return static_cast<HTTPRequestResponse*>(p)->write_content(data, size, nmemb); - } - - /** - * Callback for post, put, and patch operations - * @param buffer - * @param size size of buffer - * @param nitems items to add - * @param insteam input stream object. - */ - - static size_t send_write(char * data, size_t size, size_t nmemb, void * p) { - if (p != 0) { - CallBackPosition *callback = (CallBackPosition*) p; - if (callback->pos <= callback->ptr->getBufferSize()) { - char *ptr = callback->ptr->getBuffer(); - int len = callback->ptr->getBufferSize() - callback->pos; - if (len <= 0) { - delete callback->ptr; - delete callback; - return 0; - } - if (len > size * nmemb) - len = size * nmemb; - memcpy(data, callback->ptr->getBuffer() + callback->pos, len); - callback->pos += len; - return len; - } - } else { - return CURL_READFUNC_ABORT; - } - - return 0; - } - - size_t write_content(char* ptr, size_t size, size_t nmemb) { - data.insert(data.end(), ptr, ptr + size * nmemb); - return size * nmemb; - } - -}; - // InvokeHTTP Class class InvokeHTTP : public core::Processor { public: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/properties/Configure.h ---------------------------------------------------------------------- diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h index 58f6679..fa19a18 100644 --- a/libminifi/include/properties/Configure.h +++ b/libminifi/include/properties/Configure.h @@ -54,10 +54,21 @@ class Configure : public Properties { static const char *nifi_flowfile_repository_enable; static const char *nifi_remote_input_secure; static const char *nifi_security_need_ClientAuth; + // site2site security config static const char *nifi_security_client_certificate; static const char *nifi_security_client_private_key; static const char *nifi_security_client_pass_phrase; static const char *nifi_security_client_ca_certificate; + static const char *nifi_configuration_listener_pull_interval; + static const char *nifi_configuration_listener_http_url; + static const char *nifi_configuration_listener_rest_url; + static const char *nifi_configuration_listener_type; // http or rest + // configuration listener security config + static const char *nifi_configuration_listener_need_ClientAuth; + static const char *nifi_configuration_listener_client_certificate; + static const char *nifi_configuration_listener_private_key; + static const char *nifi_configuration_listener_client_pass_phrase; + static const char *nifi_configuration_listener_client_ca_certificate; }; } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/include/utils/HTTPUtils.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/HTTPUtils.h b/libminifi/include/utils/HTTPUtils.h new file mode 100644 index 0000000..3f20f5e --- /dev/null +++ b/libminifi/include/utils/HTTPUtils.h @@ -0,0 +1,97 @@ +/** + * HTTPUtils class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __HTTP_UTILS_H__ +#define __HTTP_UTILS_H__ + +#include <curl/curl.h> +#include <vector> +#include "ByteInputCallBack.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +struct CallBackPosition { + ByteInputCallBack *ptr; + size_t pos; +}; + +/** + * HTTP Response object + */ +struct HTTPRequestResponse { + std::vector<char> data; + + /** + * Receive HTTP Response. + */ + static size_t recieve_write(char * data, size_t size, size_t nmemb, + void * p) { + return static_cast<HTTPRequestResponse*>(p)->write_content(data, size, + nmemb); + } + + /** + * Callback for post, put, and patch operations + * @param buffer + * @param size size of buffer + * @param nitems items to add + * @param insteam input stream object. + */ + + static size_t send_write(char * data, size_t size, size_t nmemb, void * p) { + if (p != 0) { + CallBackPosition *callback = (CallBackPosition*) p; + if (callback->pos <= callback->ptr->getBufferSize()) { + char *ptr = callback->ptr->getBuffer(); + int len = callback->ptr->getBufferSize() - callback->pos; + if (len <= 0) { + delete callback->ptr; + delete callback; + return 0; + } + if (len > size * nmemb) + len = size * nmemb; + memcpy(data, callback->ptr->getBuffer() + callback->pos, len); + callback->pos += len; + return len; + } + } else { + return CURL_READFUNC_ABORT; + } + + return 0; + } + + size_t write_content(char* ptr, size_t size, size_t nmemb) { + data.insert(data.end(), ptr, ptr + size * nmemb); + return size * nmemb; + } + +}; + +} /* namespace utils */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/src/ConfigurationListener.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ConfigurationListener.cpp b/libminifi/src/ConfigurationListener.cpp new file mode 100644 index 0000000..d52a088 --- /dev/null +++ b/libminifi/src/ConfigurationListener.cpp @@ -0,0 +1,131 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "ConfigurationListener.h" +#include "FlowController.h" +#include <openssl/ssl.h> +#include <openssl/err.h> +#include <string> +#include <memory> +#include <utility> + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + +void ConfigurationListener::start() { + if (running_) + return; + + pull_interval_ = 60 * 1000; + std::string value; + // grab the value for configuration + if (configure_->get(Configure::nifi_configuration_listener_pull_interval, + value)) { + core::TimeUnit unit; + if (core::Property::StringToTime(value, pull_interval_, unit) + && core::Property::ConvertTimeUnitToMS(pull_interval_, unit, + pull_interval_)) { + logger_->log_info("Configuration Listener pull interval: [%d] ms", + pull_interval_); + } + } + + std::string clientAuthStr; + if (configure_->get(Configure::nifi_configuration_listener_need_ClientAuth, clientAuthStr)) { + org::apache::nifi::minifi::utils::StringUtils::StringToBool(clientAuthStr, this->need_client_certificate_); + } + + if (configure_->get( + Configure::nifi_configuration_listener_client_ca_certificate, + this->ca_certificate_)) { + logger_->log_info("Configuration Listener CA certificates: [%s]", + this->ca_certificate_); + } + + if (this->need_client_certificate_) { + std::string passphrase_file; + + if (!(configure_->get( + Configure::nifi_configuration_listener_client_certificate, this->certificate_) + && configure_->get(Configure::nifi_configuration_listener_private_key, + this->private_key_))) { + logger_->log_error( + "Certificate and Private Key PEM file not configured for configuration listener, error: %s.", + std::strerror(errno)); + } + + if (configure_->get( + Configure::nifi_configuration_listener_client_pass_phrase, + passphrase_file)) { + // load the passphase from file + std::ifstream file(passphrase_file.c_str(), std::ifstream::in); + if (file.good()) { + this->passphrase_.assign((std::istreambuf_iterator<char>(file)), + std::istreambuf_iterator<char>()); + file.close(); + } + } + + logger_->log_info("Configuration Listener certificate: [%s], private key: [%s], passphrase file: [%s]", + this->certificate_, this->private_key_, passphrase_file); + } + + thread_ = std::thread(&ConfigurationListener::threadExecutor, this); + thread_.detach(); + running_ = true; + logger_->log_info("%s ConfigurationListener Thread Start", type_); +} + +void ConfigurationListener::stop() { + if (!running_) + return; + running_ = false; + if (thread_.joinable()) + thread_.join(); + logger_->log_info("%s ConfigurationListener Thread Stop", type_); +} + +void ConfigurationListener::run() { + std::unique_lock<std::mutex> lk(mutex_); + std::condition_variable cv; + int64_t interval = 0; + while (!cv.wait_for(lk, std::chrono::milliseconds(100), [this] {return (running_ == false);})) { + interval += 100; + if (interval >= pull_interval_) { + std::string payload; + bool ret = false; + ret = pullConfiguration(payload); + if (ret) { + if (payload.empty() || payload == lastAppliedConfiguration) { + interval = 0; + continue; + } + ret = this->controller_->applyConfiguration(payload); + if (ret) + this->lastAppliedConfiguration = payload; + } + interval = 0; + } + } +} + +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/src/Configure.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp index d8e049c..e1bc225 100644 --- a/libminifi/src/Configure.cpp +++ b/libminifi/src/Configure.cpp @@ -43,11 +43,35 @@ const char *Configure::nifi_flowfile_repository_max_storage_size = "nifi.flowfil const char *Configure::nifi_flowfile_repository_max_storage_time = "nifi.flowfile.repository.max.storage.time"; const char *Configure::nifi_flowfile_repository_directory_default = "nifi.flowfile.repository.directory.default"; const char *Configure::nifi_remote_input_secure = "nifi.remote.input.secure"; -const char *Configure::nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth"; -const char *Configure::nifi_security_client_certificate = "nifi.security.client.certificate"; -const char *Configure::nifi_security_client_private_key = "nifi.security.client.private.key"; -const char *Configure::nifi_security_client_pass_phrase = "nifi.security.client.pass.phrase"; -const char *Configure::nifi_security_client_ca_certificate = "nifi.security.client.ca.certificate"; +const char *Configure::nifi_security_need_ClientAuth = + "nifi.security.need.ClientAuth"; +const char *Configure::nifi_security_client_certificate = + "nifi.security.client.certificate"; +const char *Configure::nifi_security_client_private_key = + "nifi.security.client.private.key"; +const char *Configure::nifi_security_client_pass_phrase = + "nifi.security.client.pass.phrase"; +const char *Configure::nifi_security_client_ca_certificate = + "nifi.security.client.ca.certificate"; +const char *Configure::nifi_configuration_listener_pull_interval = + "nifi.configuration.listener.pull.interval"; +const char *Configure::nifi_configuration_listener_http_url = + "nifi.configuration.listener.http.url"; +const char *Configure::nifi_configuration_listener_rest_url = + "nifi.configuration.listener.rest.url"; +const char *Configure::nifi_configuration_listener_type = + "nifi.configuration.listener.type"; +const char *Configure::nifi_configuration_listener_need_ClientAuth = + "nifi.configuration.listener.need.ClientAuth"; +const char *Configure::nifi_configuration_listener_client_certificate = + "nifi.configuration.listener.client.certificate"; +const char *Configure::nifi_configuration_listener_private_key = + "nifi.configuration.listener.client.private.key"; +const char *Configure::nifi_configuration_listener_client_pass_phrase = + "nifi.configuration.listener.client.pass.phrase"; +const char *Configure::nifi_configuration_listener_client_ca_certificate = + "nifi.configuration.listener.client.ca.certificate"; + } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/src/FlowController.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 62cf21c..fd75fdd 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -33,6 +33,7 @@ #include <utility> #include <memory> #include <string> +#include "yaml-cpp/yaml.h" #include "core/ProcessContext.h" #include "core/ProcessGroup.h" #include "utils/StringUtils.h" @@ -152,6 +153,31 @@ FlowController::~FlowController() { provenance_repo_ = nullptr; } +bool FlowController::applyConfiguration(std::string &configurePayload) { + std::unique_ptr<core::ProcessGroup> newRoot; + try { + newRoot = std::move(flow_configuration_->getRootFromPayload(configurePayload)); + } + catch (const YAML::Exception& e) { + logger_->log_error("Invalid configuration payload"); + return false; + } + + if (newRoot == nullptr) + return false; + + logger_->log_info("Starting to reload Flow Controller with flow control name %s, version %d", + newRoot->getName().c_str(), newRoot->getVersion()); + + std::lock_guard<std::recursive_mutex> flow_lock(mutex_); + stop(true); + waitUnload(30000); + this->root_ = std::move(newRoot); + loadFlowRepo(); + initialized_ = true; + return start(); +} + void FlowController::stop(bool force) { std::lock_guard<std::recursive_mutex> flow_lock(mutex_); if (running_) { @@ -164,7 +190,7 @@ void FlowController::stop(bool force) { this->flow_file_repo_->stop(); this->provenance_repo_->stop(); // Wait for sometime for thread stop - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + std::this_thread::sleep_for(std::chrono::milliseconds(3000)); if (this->root_) this->root_->stopProcessing(this->timer_scheduler_.get(), this->event_scheduler_.get()); } @@ -214,6 +240,15 @@ void FlowController::load() { stop(true); } if (!initialized_) { + std::string listenerType; + // grab the value for configuration + if (this->http_configuration_listener_ == nullptr && configuration_->get(Configure::nifi_configuration_listener_type, listenerType)) { + if (listenerType == "http") { + this->http_configuration_listener_ = + std::unique_ptr<minifi::HttpConfigurationListener>(new minifi::HttpConfigurationListener(shared_from_this(), configuration_)); + } + } + logger_->log_info("Initializing timers"); if (nullptr == timer_scheduler_) { timer_scheduler_ = std::make_shared<TimerDrivenSchedulingAgent>(std::static_pointer_cast<core::controller::ControllerServiceProvider>(shared_from_this()), provenance_repo_, configuration_); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/src/HttpConfigurationListener.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/HttpConfigurationListener.cpp b/libminifi/src/HttpConfigurationListener.cpp new file mode 100644 index 0000000..70d5793 --- /dev/null +++ b/libminifi/src/HttpConfigurationListener.cpp @@ -0,0 +1,167 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "HttpConfigurationListener.h" +#include "FlowController.h" +#include <curl/curlbuild.h> +#include <curl/easy.h> +#include <iostream> +#include <iterator> +#include <string> +#include <vector> +#include <utility> + +#include "core/logging/Logger.h" +#include "core/ProcessContext.h" +#include "core/Relationship.h" +#include "io/DataStream.h" +#include "io/StreamFactory.h" +#include "utils/StringUtils.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + +int HttpConfigurationListener::pemPassWordCb(char *buf, int size, int rwflag, + void *param) { + minifi::HttpConfigurationListener *listener = + static_cast<minifi::HttpConfigurationListener*>(param); + + if (listener->passphrase_.length() > 0) { + memset(buf, 0x00, size); + memcpy(buf, listener->passphrase_.c_str(), + listener->passphrase_.length() - 1); + return listener->passphrase_.length() - 1; + } + return 0; +} + +CURLcode HttpConfigurationListener::configureSSLContext(CURL *curl, void *ctx, + void *param) { + minifi::HttpConfigurationListener *listener = + static_cast<minifi::HttpConfigurationListener*>(param); + SSL_CTX* sslCtx = static_cast<SSL_CTX*>(ctx); + + SSL_CTX_load_verify_locations(sslCtx, listener->ca_certificate_.c_str(), 0); + SSL_CTX_use_certificate_file(sslCtx, listener->certificate_.c_str(), + SSL_FILETYPE_PEM); + SSL_CTX_set_default_passwd_cb(sslCtx, + HttpConfigurationListener::pemPassWordCb); + SSL_CTX_set_default_passwd_cb_userdata(sslCtx, param); + SSL_CTX_use_PrivateKey_file(sslCtx, listener->private_key_.c_str(), + SSL_FILETYPE_PEM); + // verify private key + if (!SSL_CTX_check_private_key(sslCtx)) { + listener->logger_->log_error( + "Private key does not match the public certificate, error : %s", + std::strerror(errno)); + return CURLE_FAILED_INIT; + } + + listener->logger_->log_debug( + "HttpConfigurationListener load Client Certificates OK"); + return CURLE_OK; +} + +void HttpConfigurationListener::configureSecureConnection(CURL *http_session) { + curl_easy_setopt(http_session, CURLOPT_VERBOSE, 1L); + curl_easy_setopt(http_session, CURLOPT_CAINFO, this->ca_certificate_.c_str()); + curl_easy_setopt(http_session, CURLOPT_SSLCERTTYPE, "PEM"); + curl_easy_setopt(http_session, CURLOPT_SSL_VERIFYPEER, 1L); + if (this->need_client_certificate_) { + CURLcode ret; + ret = curl_easy_setopt(http_session, CURLOPT_SSL_CTX_FUNCTION, + &HttpConfigurationListener::configureSSLContext); + if (ret != CURLE_OK) + logger_->log_error("CURLOPT_SSL_CTX_FUNCTION not supported %d", ret); + curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA, + static_cast<void*>(this)); + curl_easy_setopt(http_session, CURLOPT_SSLKEYTYPE, "PEM"); + } +} + +bool HttpConfigurationListener::pullConfiguration(std::string &configuration) { + if (url_.empty()) + return false; + + bool ret = false; + + std::string fullUrl = url_; + + CURL *http_session = curl_easy_init(); + + curl_easy_setopt(http_session, CURLOPT_URL, fullUrl.c_str()); + + if (connect_timeout_ > 0) { + curl_easy_setopt(http_session, CURLOPT_TIMEOUT, connect_timeout_); + } + + if (read_timeout_ > 0) { + curl_easy_setopt(http_session, CURLOPT_TIMEOUT, read_timeout_); + } + + if (fullUrl.find("https") != std::string::npos) { + configureSecureConnection(http_session); + } + + utils::HTTPRequestResponse content; + curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION, + &utils::HTTPRequestResponse::recieve_write); + + curl_easy_setopt(http_session, CURLOPT_WRITEDATA, + static_cast<void*>(&content)); + + CURLcode res = curl_easy_perform(http_session); + + if (res == CURLE_OK) { + logger_->log_debug("HttpConfigurationListener -- curl successful to %s", + fullUrl.c_str()); + + std::string response_body(content.data.begin(), content.data.end()); + int64_t http_code = 0; + curl_easy_getinfo(http_session, CURLINFO_RESPONSE_CODE, &http_code); + char *content_type; + /* ask for the content-type */ + curl_easy_getinfo(http_session, CURLINFO_CONTENT_TYPE, &content_type); + + bool isSuccess = ((int32_t) (http_code / 100)) == 2 + && res != CURLE_ABORTED_BY_CALLBACK; + bool body_empty = IsNullOrEmpty(content.data); + + if (isSuccess && !body_empty) { + configuration = std::move(response_body); + logger_->log_debug("config %s", configuration.c_str()); + ret = true; + } else { + logger_->log_error("Cannot output body to content"); + } + } else { + logger_->log_error( + "HttpConfigurationListener -- curl_easy_perform() failed %s\n", + curl_easy_strerror(res)); + } + curl_easy_cleanup(http_session); + + return ret; +} + +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/src/core/FlowConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index 6635701..cc6e0e5 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -53,8 +53,10 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProvenanceReportTask() return processor; } -std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(std::string name, uuid_t uuid) { - return std::unique_ptr<core::ProcessGroup>(new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid)); +std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup( + std::string name, uuid_t uuid, int version) { + return std::unique_ptr<core::ProcessGroup>( + new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid, version)); } std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRemoteProcessGroup(std::string name, uuid_t uuid) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/src/core/ProcessGroup.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp index 9e6778c..7ac139b 100644 --- a/libminifi/src/core/ProcessGroup.cpp +++ b/libminifi/src/core/ProcessGroup.cpp @@ -37,10 +37,12 @@ namespace nifi { namespace minifi { namespace core { -ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, ProcessGroup *parent) +ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, int version, + ProcessGroup *parent) : logger_(logging::LoggerFactory<ProcessGroup>::getLogger()), name_(name), type_(type), + version_(version), parent_process_group_(parent) { if (!uuid) // Generate the global UUID for the flow record http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/src/core/yaml/YamlConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp index a11db2b..44aec12 100644 --- a/libminifi/src/core/yaml/YamlConfiguration.cpp +++ b/libminifi/src/core/yaml/YamlConfiguration.cpp @@ -31,14 +31,24 @@ namespace core { core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node rootFlowNode) { uuid_t uuid; + int64_t version = 0; checkRequiredField(&rootFlowNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); std::string flowName = rootFlowNode["name"].as<std::string>(); std::string id = getOrGenerateId(&rootFlowNode); uuid_parse(id.c_str(), uuid); - logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", id, flowName); - std::unique_ptr<core::ProcessGroup> group = FlowConfiguration::createRootProcessGroup(flowName, uuid); + if (rootFlowNode["version"]) { + std::string value = rootFlowNode["version"].as<std::string>(); + if (core::Property::StringToInt(value, version)) { + logger_->log_debug("parseRootProcessorGroup: version => [%d]", version); + } + } + + logger_->log_debug( + "parseRootProcessGroup: id => [%s], name => [%s]", id, flowName); + std::unique_ptr<core::ProcessGroup> group = + FlowConfiguration::createRootProcessGroup(flowName, uuid, version); this->name_ = flowName; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/src/processors/InvokeHTTP.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/InvokeHTTP.cpp b/libminifi/src/processors/InvokeHTTP.cpp index fd39a64..c636201 100644 --- a/libminifi/src/processors/InvokeHTTP.cpp +++ b/libminifi/src/processors/InvokeHTTP.cpp @@ -315,8 +315,9 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession * if (read_timeout_ > 0) { curl_easy_setopt(http_session, CURLOPT_TIMEOUT, read_timeout_); } - HTTPRequestResponse content; - curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION, &HTTPRequestResponse::recieve_write); + utils::HTTPRequestResponse content; + curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION, + &utils::HTTPRequestResponse::recieve_write); curl_easy_setopt(http_session, CURLOPT_WRITEDATA, static_cast<void*>(&content)); @@ -326,14 +327,17 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession * if (claim) { utils::ByteInputCallBack *callback = new utils::ByteInputCallBack(); session->read(flowFile, callback); - CallBackPosition *callbackObj = new CallBackPosition; + utils::CallBackPosition *callbackObj = new utils::CallBackPosition; callbackObj->ptr = callback; callbackObj->pos = 0; logger_->log_info("InvokeHTTP -- Setting callback"); curl_easy_setopt(http_session, CURLOPT_UPLOAD, 1L); - curl_easy_setopt(http_session, CURLOPT_INFILESIZE_LARGE, (curl_off_t)callback->getBufferSize()); - curl_easy_setopt(http_session, CURLOPT_READFUNCTION, &HTTPRequestResponse::send_write); - curl_easy_setopt(http_session, CURLOPT_READDATA, static_cast<void*>(callbackObj)); + curl_easy_setopt(http_session, CURLOPT_INFILESIZE_LARGE, + (curl_off_t)callback->getBufferSize()); + curl_easy_setopt(http_session, CURLOPT_READFUNCTION, + &utils::HTTPRequestResponse::send_write); + curl_easy_setopt(http_session, CURLOPT_READDATA, + static_cast<void*>(callbackObj)); } else { logger_->log_error("InvokeHTTP -- no resource claim"); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a1716dc/libminifi/test/integration/HttpConfigurationListenerTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/HttpConfigurationListenerTest.cpp b/libminifi/test/integration/HttpConfigurationListenerTest.cpp new file mode 100644 index 0000000..a86b884 --- /dev/null +++ b/libminifi/test/integration/HttpConfigurationListenerTest.cpp @@ -0,0 +1,144 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <sys/stat.h> +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "../TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "../include/core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "HttpConfigurationListener.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "../unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "CivetServer.h" +#include <cstring> + +void waitToVerifyProcessor() { + std::this_thread::sleep_for(std::chrono::seconds(10)); +} + +class ConfigHandler: public CivetHandler { + public: + bool handleGet(CivetServer *server, struct mg_connection *conn) { + std::ifstream myfile(test_file_location_.c_str()); + + if (myfile.is_open()) { + std::stringstream buffer; + buffer << myfile.rdbuf(); + std::string str = buffer.str(); + myfile.close(); + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + str.length()); + mg_printf(conn, "%s", str.c_str()); + } else { + mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); + } + + return true; + } + std::string test_file_location_; +}; + +int main(int argc, char **argv) { + LogTestController::getInstance().setInfo<minifi::ConfigurationListener>(); + LogTestController::getInstance().setInfo<minifi::FlowController>(); + LogTestController::getInstance().setInfo<minifi::HttpConfigurationListener>(); + + const char *options[] = { "document_root", ".", "listening_ports", "9090", 0 }; + std::vector < std::string > cpp_options; + for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + cpp_options.push_back(options[i]); + } + + CivetServer server(cpp_options); + ConfigHandler h_ex; + server.addHandler("/config", h_ex); + LogTestController::getInstance().setDebug<minifi::ConfigurationListener>(); + std::string key_dir, test_file_location; + if (argc > 1) { + h_ex.test_file_location_ = test_file_location = argv[1]; + key_dir = argv[2]; + } + std::shared_ptr<minifi::Configure> configuration = std::make_shared< + minifi::Configure>(); + configuration->set(minifi::Configure::nifi_default_directory, key_dir); + configuration->set(minifi::Configure::nifi_configuration_listener_type, + "http"); + configuration->set( + minifi::Configure::nifi_configuration_listener_pull_interval, "1 sec"); + configuration->set(minifi::Configure::nifi_configuration_listener_http_url, + "http://localhost:9090/config"); + mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + + std::shared_ptr<core::Repository> test_repo = + std::make_shared<TestRepository>(); + std::shared_ptr<core::Repository> test_flow_repo = std::make_shared< + TestFlowRepository>(); + + configuration->set(minifi::Configure::nifi_flow_configuration_file, + test_file_location); + + std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared + < minifi::io::StreamFactory > (configuration); + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr + < core::YamlConfiguration + > (new core::YamlConfiguration(test_repo, test_repo, stream_factory, + configuration, test_file_location)); + std::shared_ptr<TestRepository> repo = std::static_pointer_cast + < TestRepository > (test_repo); + + std::shared_ptr<minifi::FlowController> controller = + std::make_shared < minifi::FlowController + > (test_repo, test_flow_repo, configuration, std::move(yaml_ptr), DEFAULT_ROOT_GROUP_NAME, true); + + core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory, + configuration, test_file_location); + + std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot( + test_file_location); + std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr < core::ProcessGroup + > (ptr.get()); + ptr.release(); + + controller->load(); + controller->start(); + waitToVerifyProcessor(); + + controller->waitUnload(60000); + std::string logs = LogTestController::getInstance().log_output.str(); + assert(logs.find("HttpConfigurationListener -- curl successful to http://localhost:9090/config") != std::string::npos); + assert(logs.find("Starting to reload Flow Controller with flow control name MiNiFi Flow, version 0") != std::string::npos); + LogTestController::getInstance().reset(); + rmdir("./content_repository"); + return 0; +}