Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 2e71beb9a -> a17c7e1bc
MINIFICPP-396: Make ability to configure SSL in HTTP and Socket SiteToSite backwards compatible MINIFICPP-396: Fix MQTT travis failures This closes #263. Signed-off-by: Marc Parisi <phroc...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/a17c7e1b Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/a17c7e1b Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/a17c7e1b Branch: refs/heads/master Commit: a17c7e1bc5e2d5e54ce40ca4a47b6c8a1b4d4169 Parents: 2e71beb Author: Marc Parisi <phroc...@apache.org> Authored: Fri Feb 9 13:02:45 2018 -0500 Committer: Marc Parisi <phroc...@apache.org> Committed: Mon Feb 12 06:09:04 2018 -0500 ---------------------------------------------------------------------- README.md | 47 +++++++++++- extensions/http-curl/client/HTTPClient.cpp | 5 ++ extensions/http-curl/client/HTTPClient.h | 2 + .../http-curl/sitetosite/HTTPProtocol.cpp | 3 - extensions/http-curl/sitetosite/HTTPProtocol.h | 18 +++-- extensions/mqtt/AbstractMQTTProcessor.h | 2 +- extensions/mqtt/ConsumeMQTT.h | 4 +- .../include/controllers/SSLContextService.h | 31 ++++++++ libminifi/include/io/tls/TLSSocket.h | 2 + libminifi/include/properties/Configure.h | 2 + libminifi/include/sitetosite/SiteToSiteClient.h | 7 ++ .../include/sitetosite/SiteToSiteFactory.h | 2 + libminifi/include/utils/HTTPClient.h | 3 + libminifi/src/Configure.cpp | 2 + libminifi/src/RemoteProcessorGroupPort.cpp | 80 ++++++++++++++------ libminifi/src/controllers/SSLContextService.cpp | 4 +- libminifi/src/io/tls/TLSSocket.cpp | 30 ++++++++ 17 files changed, 206 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index 1eb2d08..9b0c4cc 100644 --- a/README.md +++ b/README.md @@ -467,8 +467,53 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc if you do not want to enable client certificate base authorization nifi.security.need.ClientAuth=false +Alternatively you may specify an SSL Context Service definition for the RPGs. This will link +to a corresponding SSL Context service defined in the flow. + +To do this specify the SSL Context Service Property in your RPGs and link it to +a defined controller service. If you do not take this approach the options, above, will be used +for TCP and secure HTTPS communications. + + Remote Processing Groups: + - name: NiFi Flow + id: 2438e3c8-015a-1000-79ca-83af40ec1998 + url: http://127.0.0.1:8080/nifi + timeout: 30 secs + yield period: 5 sec + Input Ports: + - id: 2438e3c8-015a-1000-79ca-83af40ec1999 + name: fromnifi + max concurrent tasks: 1 + Properties: + Port: 10443 + SSL Context Service: SSLServiceName + Host Name: 127.0.0.1 + Output Ports: + - id: ac82e521-015c-1000-2b21-41279516e19a + name: tominifi + max concurrent tasks: 2 + Properties: + Port: 10443 + SSL Context Service: SSLServiceName + Host Name: 127.0.0.1 + Controller Services: + - name: SSLServiceName + id: 2438e3c8-015a-1000-79ca-83af40ec1974 + class: SSLContextService + Properties: + Client Certificate: <client cert path> + Private Key: < private key path > + Passphrase: <passphrase path or passphrase> + CA Certificate: <CA cert path> + +If during testing you have a need to disable host or peer verification, you may use the following options: + + # in minifi.properties + nifi.security.client.disable.host.verification=true + nifi.security.client.disable.peer.verification=true + ### HTTP SiteToSite Configuration -To enable HTTPSiteToSite you must set the following flag to true +To enable HTTPSiteToSite you must set the following flag to true. nifi.remote.input.http.enabled=true http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/extensions/http-curl/client/HTTPClient.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp index 31214cc..a8643d5 100644 --- a/extensions/http-curl/client/HTTPClient.cpp +++ b/extensions/http-curl/client/HTTPClient.cpp @@ -136,6 +136,11 @@ void HTTPClient::setDisablePeerVerification() { curl_easy_setopt(http_session_, CURLOPT_SSL_VERIFYPEER, 0L); } +void HTTPClient::setDisableHostVerification(){ + logger_->log_debug("Disabling host verification"); + curl_easy_setopt(http_session_, CURLOPT_SSL_VERIFYHOST, 0L); +} + void HTTPClient::setConnectionTimeout(int64_t timeout) { connect_timeout_ = timeout; curl_easy_setopt(http_session_, CURLOPT_NOSIGNAL, 1); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/extensions/http-curl/client/HTTPClient.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/client/HTTPClient.h b/extensions/http-curl/client/HTTPClient.h index ace479c..8f61ad3 100644 --- a/extensions/http-curl/client/HTTPClient.h +++ b/extensions/http-curl/client/HTTPClient.h @@ -125,6 +125,8 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable { void setDisablePeerVerification() override; + void setDisableHostVerification() override; + std::string getURL() const{ return url_; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/extensions/http-curl/sitetosite/HTTPProtocol.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/sitetosite/HTTPProtocol.cpp b/extensions/http-curl/sitetosite/HTTPProtocol.cpp index e83c71d..0f4a605 100644 --- a/extensions/http-curl/sitetosite/HTTPProtocol.cpp +++ b/extensions/http-curl/sitetosite/HTTPProtocol.cpp @@ -56,11 +56,8 @@ std::shared_ptr<Transaction> HttpSiteToSiteClient::createTransaction(std::string std::stringstream uri; uri << getBaseURI() << "data-transfer/" << dir_str << "/" << getPortId() << "/transactions"; auto client = create_http_client(uri.str(), "POST"); - client->appendHeader(PROTOCOL_VERSION_HEADER, "1"); - client->setConnectionTimeout(5); - client->setContentType("application/json"); client->appendHeader("Accept: application/json"); client->setUseChunkedEncoding(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/extensions/http-curl/sitetosite/HTTPProtocol.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/sitetosite/HTTPProtocol.h b/extensions/http-curl/sitetosite/HTTPProtocol.h index 26b59b3..32074f0 100644 --- a/extensions/http-curl/sitetosite/HTTPProtocol.h +++ b/extensions/http-curl/sitetosite/HTTPProtocol.h @@ -47,7 +47,6 @@ #include "utils/Id.h" #include "../client/HTTPClient.h" - namespace org { namespace apache { namespace nifi { @@ -59,7 +58,8 @@ namespace sitetosite { */ typedef struct Site2SitePeerStatus { std::string host_; - int port_;bool isSecure_; + int port_; + bool isSecure_; } Site2SitePeerStatus; // HttpSiteToSiteClient Class @@ -154,7 +154,7 @@ class HttpSiteToSiteClient : public sitetosite::SiteToSiteClient { std::shared_ptr<minifi::utils::HTTPClient> openConnectionForReceive(const std::shared_ptr<HttpTransaction> &transaction); const std::string getBaseURI() { - std::string uri = "http://"; + std::string uri = ssl_context_service_ != nullptr ? "https://" : "http://"; uri.append(peer_->getHostName()); uri.append(":"); uri.append(std::to_string(peer_->getPort())); @@ -167,8 +167,16 @@ class HttpSiteToSiteClient : public sitetosite::SiteToSiteClient { const std::string parseTransactionId(const std::string &uri); std::unique_ptr<utils::HTTPClient> create_http_client(const std::string &uri, const std::string &method = "POST", bool setPropertyHeaders = false) { - std::unique_ptr<utils::HTTPClient> http_client_ = std::unique_ptr<utils::HTTPClient>(new minifi::utils::HTTPClient(uri, nullptr)); - http_client_->initialize(method, uri, nullptr); + std::unique_ptr<utils::HTTPClient> http_client_ = std::unique_ptr<utils::HTTPClient>(new minifi::utils::HTTPClient(uri, ssl_context_service_)); + http_client_->initialize(method, uri, ssl_context_service_); + if (ssl_context_service_) { + if (ssl_context_service_->getDisableHostVerification()) { + http_client_->setDisableHostVerification(); + } + if (ssl_context_service_->getDisablePeerVerification()) { + http_client_->setDisablePeerVerification(); + } + } if (setPropertyHeaders) { if (_currentVersion >= 5) { // batch count, size, and duratin don't appear to be set through the interfaces. http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/extensions/mqtt/AbstractMQTTProcessor.h ---------------------------------------------------------------------- diff --git a/extensions/mqtt/AbstractMQTTProcessor.h b/extensions/mqtt/AbstractMQTTProcessor.h index 7278f15..aab0ef5 100644 --- a/extensions/mqtt/AbstractMQTTProcessor.h +++ b/extensions/mqtt/AbstractMQTTProcessor.h @@ -150,7 +150,7 @@ class AbstractMQTTProcessor : public core::Processor { private: std::shared_ptr<logging::Logger> logger_; MQTTClient_SSLOptions sslopts_; - std::string sslEnabled_; + bool sslEnabled_; std::string securityCA_; std::string securityCert_; std::string securityPrivateKey_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/extensions/mqtt/ConsumeMQTT.h ---------------------------------------------------------------------- diff --git a/extensions/mqtt/ConsumeMQTT.h b/extensions/mqtt/ConsumeMQTT.h index da3ca6c..0b26d42 100644 --- a/extensions/mqtt/ConsumeMQTT.h +++ b/extensions/mqtt/ConsumeMQTT.h @@ -20,7 +20,7 @@ #ifndef __CONSUME_MQTT_H__ #define __CONSUME_MQTT_H__ -#include <climits> +#include <limits> #include <deque> #include "FlowFileRecord.h" #include "core/Processor.h" @@ -53,7 +53,7 @@ public: : processors::AbstractMQTTProcessor(name, uuid), logger_(logging::LoggerFactory<ConsumeMQTT>::getLogger()) { isSubscriber_ = true; maxQueueSize_ = 100; - maxSegsize_ = ULLONG_MAX; + maxSegSize_ = ULLONG_MAX; } // Destructor virtual ~ConsumeMQTT() { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/libminifi/include/controllers/SSLContextService.h ---------------------------------------------------------------------- diff --git a/libminifi/include/controllers/SSLContextService.h b/libminifi/include/controllers/SSLContextService.h index 9ceb10c..f9a8a10 100644 --- a/libminifi/include/controllers/SSLContextService.h +++ b/libminifi/include/controllers/SSLContextService.h @@ -63,6 +63,8 @@ class SSLContextService : public core::controller::ControllerService { : ControllerService(name, id), initialized_(false), valid_(false), + disable_host_verification_(false), + disable_peer_verification_(false), logger_(logging::LoggerFactory<SSLContextService>::getLogger()) { } @@ -70,6 +72,8 @@ class SSLContextService : public core::controller::ControllerService { : ControllerService(name, uuid), initialized_(false), valid_(false), + disable_host_verification_(false), + disable_peer_verification_(false), logger_(logging::LoggerFactory<SSLContextService>::getLogger()) { } @@ -77,6 +81,8 @@ class SSLContextService : public core::controller::ControllerService { : ControllerService(name, nullptr), initialized_(false), valid_(false), + disable_host_verification_(false), + disable_peer_verification_(false), logger_(logging::LoggerFactory<SSLContextService>::getLogger()) { setConfiguration(configuration); initialize(); @@ -102,6 +108,13 @@ class SSLContextService : public core::controller::ControllerService { if (configuration_->get(Configure::nifi_security_client_ca_certificate, value)) { setProperty(caCert.getName(), value); } + + if (configuration_->get(Configure::nifi_security_client_disable_host_verification, value)) { + org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, disable_host_verification_); + } + if (configuration_->get(Configure::nifi_security_client_disable_peer_verification, value)) { + org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, disable_peer_verification_); + } } virtual void initialize(); @@ -118,6 +131,22 @@ class SSLContextService : public core::controller::ControllerService { const std::string &getCACertificate(); + void setDisableHostVerification() { + disable_host_verification_ = true; + } + + void setDisablePeerVerification() { + disable_peer_verification_ = true; + } + + bool getDisableHostVerification() const { + return disable_host_verification_; + } + + bool getDisablePeerVerification() const { + return disable_peer_verification_; + } + void yield() { } @@ -193,6 +222,8 @@ class SSLContextService : public core::controller::ControllerService { std::string passphrase_; std::string passphrase_file_; std::string ca_certificate_; + bool disable_host_verification_; + bool disable_peer_verification_; private: std::shared_ptr<logging::Logger> logger_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/libminifi/include/io/tls/TLSSocket.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/tls/TLSSocket.h b/libminifi/include/io/tls/TLSSocket.h index b8915a6..14978e4 100644 --- a/libminifi/include/io/tls/TLSSocket.h +++ b/libminifi/include/io/tls/TLSSocket.h @@ -157,6 +157,8 @@ class TLSSocket : public Socket { */ virtual int16_t select_descriptor(const uint16_t msec); + virtual int readData(std::vector<uint8_t> &buf, int buflen); + /** * Reads data and places it into buf * @param buf buffer in which we extract data http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/libminifi/include/properties/Configure.h ---------------------------------------------------------------------- diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h index 6911ae2..374647a 100644 --- a/libminifi/include/properties/Configure.h +++ b/libminifi/include/properties/Configure.h @@ -63,6 +63,8 @@ class Configure : public Properties { static const char *nifi_security_client_private_key; static const char *nifi_security_client_pass_phrase; static const char *nifi_security_client_ca_certificate; + static const char *nifi_security_client_disable_host_verification; + static const char *nifi_security_client_disable_peer_verification; static const char *nifi_configuration_listener_pull_interval; static const char *nifi_configuration_listener_http_url; static const char *nifi_configuration_listener_rest_url; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/libminifi/include/sitetosite/SiteToSiteClient.h ---------------------------------------------------------------------- diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h index 3200bed..259b95e 100644 --- a/libminifi/include/sitetosite/SiteToSiteClient.h +++ b/libminifi/include/sitetosite/SiteToSiteClient.h @@ -59,6 +59,7 @@ class SiteToSiteClient : public core::Connectable { : core::Connectable("SitetoSiteClient", 0), peer_state_(IDLE), _batchSendNanos(5000000000), + ssl_context_service_(nullptr), logger_(logging::LoggerFactory<SiteToSiteClient>::getLogger()) { _supportedVersion[0] = 5; _supportedVersion[1] = 4; @@ -76,6 +77,10 @@ class SiteToSiteClient : public core::Connectable { } + void setSSLContextService(const std::shared_ptr<minifi::controllers::SSLContextService> &context_service) { + ssl_context_service_ = context_service; + } + /** * Creates a transaction using the transaction ID and the direction * @param transactionID transaction identifier @@ -259,6 +264,8 @@ class SiteToSiteClient : public core::Connectable { uint32_t _currentCodecVersion; int _currentCodecVersionIndex; + std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_; + private: std::shared_ptr<logging::Logger> logger_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/libminifi/include/sitetosite/SiteToSiteFactory.h ---------------------------------------------------------------------- diff --git a/libminifi/include/sitetosite/SiteToSiteFactory.h b/libminifi/include/sitetosite/SiteToSiteFactory.h index 35e12b9..96865b0 100644 --- a/libminifi/include/sitetosite/SiteToSiteFactory.h +++ b/libminifi/include/sitetosite/SiteToSiteFactory.h @@ -51,6 +51,7 @@ static std::unique_ptr<SiteToSiteClient> createRawSocket(const SiteToSiteClientC client_configuration.getPeer()->getPortId(uuid); auto ptr = std::unique_ptr<SiteToSiteClient>(new RawSiteToSiteClient(createStreamingPeer(client_configuration))); ptr->setPortId(uuid); + ptr->setSSLContextService(client_configuration.getSecurityContext()); return ptr; } @@ -70,6 +71,7 @@ static std::unique_ptr<SiteToSiteClient> createClient(const SiteToSiteClientConf auto http_protocol = core::ClassLoader::getDefaultClassLoader().instantiateRaw("HttpProtocol", "HttpProtocol"); if (nullptr != http_protocol) { auto ptr = std::unique_ptr<SiteToSiteClient>(static_cast<SiteToSiteClient*>(http_protocol)); + ptr->setSSLContextService(client_configuration.getSecurityContext()); auto peer = std::unique_ptr<SiteToSitePeer>(new SiteToSitePeer(client_configuration.getPeer()->getHost(), client_configuration.getPeer()->getPort())); char idStr[37]; uuid_unparse_lower(uuid, idStr); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/libminifi/include/utils/HTTPClient.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/HTTPClient.h b/libminifi/include/utils/HTTPClient.h index 0c4b179..b09b450 100644 --- a/libminifi/include/utils/HTTPClient.h +++ b/libminifi/include/utils/HTTPClient.h @@ -255,6 +255,9 @@ class BaseHTTPClient { virtual void setDisablePeerVerification() { } + virtual void setDisableHostVerification() { + } + virtual const std::vector<std::string> &getHeaders() { return headers_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/libminifi/src/Configure.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp index a8ea78f..6daee37 100644 --- a/libminifi/src/Configure.cpp +++ b/libminifi/src/Configure.cpp @@ -52,6 +52,8 @@ const char *Configure::nifi_security_client_certificate = "nifi.security.client. const char *Configure::nifi_security_client_private_key = "nifi.security.client.private.key"; const char *Configure::nifi_security_client_pass_phrase = "nifi.security.client.pass.phrase"; const char *Configure::nifi_security_client_ca_certificate = "nifi.security.client.ca.certificate"; +const char *Configure::nifi_security_client_disable_host_verification = "nifi.security.client.disable.host.verification"; +const char *Configure::nifi_security_client_disable_peer_verification = "nifi.security.client.disable.peer.verification"; const char *Configure::nifi_configuration_listener_pull_interval = "nifi.configuration.listener.pull.interval"; const char *Configure::nifi_configuration_listener_http_url = "nifi.configuration.listener.http.url"; const char *Configure::nifi_configuration_listener_rest_url = "nifi.configuration.listener.rest.url"; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/libminifi/src/RemoteProcessorGroupPort.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp index 436a7a7..332de0b 100644 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -73,7 +73,7 @@ std::unique_ptr<sitetosite::SiteToSiteClient> RemoteProcessorGroupPort::getNextP std::lock_guard<std::mutex> lock(peer_mutex_); logger_->log_debug("Creating client from peer %ll", peer_index_.load()); sitetosite::SiteToSiteClientConfiguration config(stream_factory_, peers_[this->peer_index_].getPeer(), client_type_); - + config.setSecurityContext(ssl_service); peer_index_++; if (peer_index_ >= static_cast<int>(peers_.size())) { peer_index_ = 0; @@ -123,6 +123,39 @@ void RemoteProcessorGroupPort::initialize() { client_type_ = sitetosite::HTTP; } } + logger_->log_trace("Finished initialization"); +} + +void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) { + std::string value; + if (context->getProperty(portUUID.getName(), value)) { + uuid_parse(value.c_str(), protocol_uuid_); + } + + std::string context_name; + if (!context->getProperty(SSLContext.getName(), context_name) || IsNullOrEmpty(context_name)) { + context_name = RPG_SSL_CONTEXT_SERVICE_NAME; + } + std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(context_name); + if (nullptr != service) { + ssl_service = std::static_pointer_cast<minifi::controllers::SSLContextService>(service); + } else { + std::string secureStr; + bool is_secure = false; + if (configure_->get(Configure::nifi_remote_input_secure, secureStr) && org::apache::nifi::minifi::utils::StringUtils::StringToBool(secureStr, is_secure)) { + ssl_service = std::make_shared<minifi::controllers::SSLContextService>(RPG_SSL_CONTEXT_SERVICE_NAME, configure_); + ssl_service->onEnable(); + } + } + + bool disable = false; + if (ssl_service && configure_->get(Configure::nifi_security_client_disable_host_verification, value) && org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, disable)) { + ssl_service->setDisableHostVerification(); + } + disable = false; + if (ssl_service && configure_->get(Configure::nifi_security_client_disable_peer_verification, value) && org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, disable)) { + ssl_service->setDisablePeerVerification(); + } std::lock_guard<std::mutex> lock(peer_mutex_); if (!url_.empty()) { @@ -138,6 +171,7 @@ void RemoteProcessorGroupPort::initialize() { for (uint32_t i = 0; i < count; i++) { std::unique_ptr<sitetosite::SiteToSiteClient> nextProtocol = nullptr; sitetosite::SiteToSiteClientConfiguration config(stream_factory_, peers_[this->peer_index_].getPeer(), client_type_); + config.setSecurityContext(ssl_service); peer_index_++; if (peer_index_ >= static_cast<int>(peers_.size())) { peer_index_ = 0; @@ -148,36 +182,20 @@ void RemoteProcessorGroupPort::initialize() { returnProtocol(std::move(nextProtocol)); } } - logger_->log_trace("Finished initialization"); } -void RemoteProcessorGroupPort::notifyStop(){ +void RemoteProcessorGroupPort::notifyStop() { transmitting_ = false; - RPGLatch count(false); // we're just a monitor + RPGLatch count(false); // we're just a monitor // we use the latch - while(count.getCount() > 0); + while (count.getCount() > 0) { + } std::unique_ptr<sitetosite::SiteToSiteClient> nextProtocol = nullptr; - while(available_protocols_.try_dequeue(nextProtocol)){ + while (available_protocols_.try_dequeue(nextProtocol)) { // clear all protocols now } } -void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) { - std::string value; - if (context->getProperty(portUUID.getName(), value)) { - uuid_parse(value.c_str(), protocol_uuid_); - } - - std::string context_name; - if (!context->getProperty(SSLContext.getName(), context_name) || IsNullOrEmpty(context_name)) { - context_name = RPG_SSL_CONTEXT_SERVICE_NAME; - } - std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(context_name); - if (nullptr != service) { - ssl_service = std::static_pointer_cast<minifi::controllers::SSLContextService>(service); - } -} - void RemoteProcessorGroupPort::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { logger_->log_trace("On trigger %s", getUUIDStr()); if (!transmitting_) { @@ -253,6 +271,14 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() { } client = std::unique_ptr<utils::BaseHTTPClient>(dynamic_cast<utils::BaseHTTPClient*>(client_ptr)); client->initialize("GET", loginUrl, ssl_service); + if (ssl_service) { + if (ssl_service->getDisableHostVerification()) { + client->setDisableHostVerification(); + } + if (ssl_service->getDisablePeerVerification()) { + client->setDisablePeerVerification(); + } + } token = utils::get_token(client.get(), this->rest_user_name_, this->rest_password_); logger_->log_debug("Token from NiFi REST Api endpoint %s, %s", loginUrl, token); @@ -267,7 +293,14 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() { } client = std::unique_ptr<utils::BaseHTTPClient>(dynamic_cast<utils::BaseHTTPClient*>(client_ptr)); client->initialize("GET", fullUrl.c_str(), ssl_service); - + if (ssl_service) { + if (ssl_service->getDisableHostVerification()) { + client->setDisableHostVerification(); + } + if (ssl_service->getDisablePeerVerification()) { + client->setDisablePeerVerification(); + } + } if (!token.empty()) { std::string header = "Authorization: " + token; client->appendHeader(header); @@ -312,6 +345,7 @@ void RemoteProcessorGroupPort::refreshPeerList() { std::unique_ptr<sitetosite::SiteToSiteClient> protocol; sitetosite::SiteToSiteClientConfiguration config(stream_factory_, std::make_shared<sitetosite::Peer>(protocol_uuid_, host_, site2site_port_, ssl_service != nullptr), client_type_); + config.setSecurityContext(ssl_service); protocol = sitetosite::createClient(config); protocol->getPeerList(peers_); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/libminifi/src/controllers/SSLContextService.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/controllers/SSLContextService.cpp b/libminifi/src/controllers/SSLContextService.cpp index 1c3fd71..352ba37 100644 --- a/libminifi/src/controllers/SSLContextService.cpp +++ b/libminifi/src/controllers/SSLContextService.cpp @@ -79,7 +79,7 @@ std::unique_ptr<SSLContext> SSLContextService::createSSLContext() { int retp = SSL_CTX_load_verify_locations(ctx, ca_certificate_.c_str(), 0); if (retp == 0) { - logger_->log_error("Can not load CA certificate, Exiting, error : %s", std::strerror(errno)); + logger_->log_error("Can not load CA certificate %s, Exiting, error : %s", ca_certificate_, std::strerror(errno)); } return std::unique_ptr<SSLContext>(new SSLContext(ctx)); } @@ -122,8 +122,6 @@ void SSLContextService::onEnable() { logger_->log_trace("onEnable()"); if (getProperty(property.getName(), certificate) && getProperty(privKey.getName(), private_key_)) { - logger_->log_error("Certificate and Private Key PEM file not configured, error: %s.", std::strerror(errno)); - std::ifstream cert_file(certificate); std::ifstream priv_file(private_key_); if (!cert_file.good()) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/libminifi/src/io/tls/TLSSocket.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/tls/TLSSocket.cpp b/libminifi/src/io/tls/TLSSocket.cpp index d51fa76..a38ce0e 100644 --- a/libminifi/src/io/tls/TLSSocket.cpp +++ b/libminifi/src/io/tls/TLSSocket.cpp @@ -182,6 +182,36 @@ int TLSSocket::writeData(std::vector<uint8_t>& buf, int buflen) { return Socket::writeData(buf, buflen); } +int TLSSocket::readData(std::vector<uint8_t> &buf, int buflen) { + if (buf.capacity() < buflen) { + buf.reserve(buflen); + } + if (IsNullOrEmpty(ssl)) + return -1; + int total_read = 0; + int status = 0; + int loc = 0; + while (buflen) { + int16_t fd = select_descriptor(1000); + if (fd <= 0) { + close(socket_file_descriptor_); + return -1; + } + + int sslStatus; + do { + status = SSL_read(ssl, buf.data() + loc, buflen); + sslStatus = SSL_get_error(ssl, status); + } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ); + + buflen -= status; + loc += status; + total_read += status; + } + + return total_read; +} + int TLSSocket::writeData(uint8_t *value, int size) { if (IsNullOrEmpty(ssl)) return -1;