Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 2e71beb9a -> a17c7e1bc


MINIFICPP-396: Make ability to configure SSL in HTTP and Socket SiteToSite 
backwards compatible

MINIFICPP-396: Fix MQTT travis failures

This closes #263.

Signed-off-by: Marc Parisi <phroc...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/a17c7e1b
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/a17c7e1b
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/a17c7e1b

Branch: refs/heads/master
Commit: a17c7e1bc5e2d5e54ce40ca4a47b6c8a1b4d4169
Parents: 2e71beb
Author: Marc Parisi <phroc...@apache.org>
Authored: Fri Feb 9 13:02:45 2018 -0500
Committer: Marc Parisi <phroc...@apache.org>
Committed: Mon Feb 12 06:09:04 2018 -0500

----------------------------------------------------------------------
 README.md                                       | 47 +++++++++++-
 extensions/http-curl/client/HTTPClient.cpp      |  5 ++
 extensions/http-curl/client/HTTPClient.h        |  2 +
 .../http-curl/sitetosite/HTTPProtocol.cpp       |  3 -
 extensions/http-curl/sitetosite/HTTPProtocol.h  | 18 +++--
 extensions/mqtt/AbstractMQTTProcessor.h         |  2 +-
 extensions/mqtt/ConsumeMQTT.h                   |  4 +-
 .../include/controllers/SSLContextService.h     | 31 ++++++++
 libminifi/include/io/tls/TLSSocket.h            |  2 +
 libminifi/include/properties/Configure.h        |  2 +
 libminifi/include/sitetosite/SiteToSiteClient.h |  7 ++
 .../include/sitetosite/SiteToSiteFactory.h      |  2 +
 libminifi/include/utils/HTTPClient.h            |  3 +
 libminifi/src/Configure.cpp                     |  2 +
 libminifi/src/RemoteProcessorGroupPort.cpp      | 80 ++++++++++++++------
 libminifi/src/controllers/SSLContextService.cpp |  4 +-
 libminifi/src/io/tls/TLSSocket.cpp              | 30 ++++++++
 17 files changed, 206 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 1eb2d08..9b0c4cc 100644
--- a/README.md
+++ b/README.md
@@ -467,8 +467,53 @@ Additionally, users can utilize the MiNiFi Toolkit 
Converter (version 0.0.1 - sc
     if you do not want to enable client certificate base authorization
     nifi.security.need.ClientAuth=false
     
+Alternatively you may specify an SSL Context Service definition for the RPGs. 
This will link
+to a corresponding SSL Context service defined in the flow. 
+    
+To do this specify the SSL Context Service Property in your RPGs and link it 
to 
+a defined controller service. If you do not take this approach the options, 
above, will be used
+for TCP and secure HTTPS communications.
+    
+    Remote Processing Groups:
+    - name: NiFi Flow
+      id: 2438e3c8-015a-1000-79ca-83af40ec1998
+      url: http://127.0.0.1:8080/nifi
+      timeout: 30 secs
+      yield period: 5 sec
+      Input Ports:
+          - id: 2438e3c8-015a-1000-79ca-83af40ec1999
+            name: fromnifi
+            max concurrent tasks: 1
+            Properties:
+                Port: 10443
+                SSL Context Service: SSLServiceName
+                Host Name: 127.0.0.1
+      Output Ports:
+          - id: ac82e521-015c-1000-2b21-41279516e19a
+            name: tominifi
+            max concurrent tasks: 2
+            Properties:
+                Port: 10443
+                       SSL Context Service: SSLServiceName
+                       Host Name: 127.0.0.1
+       Controller Services:
+    - name: SSLServiceName
+      id: 2438e3c8-015a-1000-79ca-83af40ec1974
+      class: SSLContextService
+      Properties:
+          Client Certificate: <client cert path>
+          Private Key: < private key path > 
+          Passphrase: <passphrase path or passphrase>
+          CA Certificate: <CA cert path>
+
+If during testing you have a need to disable host or peer verification, you 
may use the following options:
+
+    # in minifi.properties
+    nifi.security.client.disable.host.verification=true
+       nifi.security.client.disable.peer.verification=true
+    
 ### HTTP SiteToSite Configuration
-To enable HTTPSiteToSite you must set the following flag to true
+To enable HTTPSiteToSite you must set the following flag to true. 
        
     nifi.remote.input.http.enabled=true
     

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/extensions/http-curl/client/HTTPClient.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/client/HTTPClient.cpp 
b/extensions/http-curl/client/HTTPClient.cpp
index 31214cc..a8643d5 100644
--- a/extensions/http-curl/client/HTTPClient.cpp
+++ b/extensions/http-curl/client/HTTPClient.cpp
@@ -136,6 +136,11 @@ void HTTPClient::setDisablePeerVerification() {
   curl_easy_setopt(http_session_, CURLOPT_SSL_VERIFYPEER, 0L);
 }
 
+void HTTPClient::setDisableHostVerification(){
+  logger_->log_debug("Disabling host verification");
+  curl_easy_setopt(http_session_, CURLOPT_SSL_VERIFYHOST, 0L);
+}
+
 void HTTPClient::setConnectionTimeout(int64_t timeout) {
   connect_timeout_ = timeout;
   curl_easy_setopt(http_session_, CURLOPT_NOSIGNAL, 1);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/extensions/http-curl/client/HTTPClient.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/client/HTTPClient.h 
b/extensions/http-curl/client/HTTPClient.h
index ace479c..8f61ad3 100644
--- a/extensions/http-curl/client/HTTPClient.h
+++ b/extensions/http-curl/client/HTTPClient.h
@@ -125,6 +125,8 @@ class HTTPClient : public BaseHTTPClient, public 
core::Connectable {
 
   void setDisablePeerVerification() override;
 
+  void setDisableHostVerification() override;
+
   std::string getURL() const{
     return url_;
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/extensions/http-curl/sitetosite/HTTPProtocol.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/sitetosite/HTTPProtocol.cpp 
b/extensions/http-curl/sitetosite/HTTPProtocol.cpp
index e83c71d..0f4a605 100644
--- a/extensions/http-curl/sitetosite/HTTPProtocol.cpp
+++ b/extensions/http-curl/sitetosite/HTTPProtocol.cpp
@@ -56,11 +56,8 @@ std::shared_ptr<Transaction> 
HttpSiteToSiteClient::createTransaction(std::string
   std::stringstream uri;
   uri << getBaseURI() << "data-transfer/" << dir_str << "/" << getPortId() << 
"/transactions";
   auto client = create_http_client(uri.str(), "POST");
-
   client->appendHeader(PROTOCOL_VERSION_HEADER, "1");
-
   client->setConnectionTimeout(5);
-
   client->setContentType("application/json");
   client->appendHeader("Accept: application/json");
   client->setUseChunkedEncoding();

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/extensions/http-curl/sitetosite/HTTPProtocol.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/sitetosite/HTTPProtocol.h 
b/extensions/http-curl/sitetosite/HTTPProtocol.h
index 26b59b3..32074f0 100644
--- a/extensions/http-curl/sitetosite/HTTPProtocol.h
+++ b/extensions/http-curl/sitetosite/HTTPProtocol.h
@@ -47,7 +47,6 @@
 #include "utils/Id.h"
 #include "../client/HTTPClient.h"
 
-
 namespace org {
 namespace apache {
 namespace nifi {
@@ -59,7 +58,8 @@ namespace sitetosite {
  */
 typedef struct Site2SitePeerStatus {
   std::string host_;
-  int port_;bool isSecure_;
+  int port_;
+  bool isSecure_;
 } Site2SitePeerStatus;
 
 // HttpSiteToSiteClient Class
@@ -154,7 +154,7 @@ class HttpSiteToSiteClient : public 
sitetosite::SiteToSiteClient {
   std::shared_ptr<minifi::utils::HTTPClient> openConnectionForReceive(const 
std::shared_ptr<HttpTransaction> &transaction);
 
   const std::string getBaseURI() {
-    std::string uri = "http://";;
+    std::string uri = ssl_context_service_ != nullptr ? "https://"; : "http://";;
     uri.append(peer_->getHostName());
     uri.append(":");
     uri.append(std::to_string(peer_->getPort()));
@@ -167,8 +167,16 @@ class HttpSiteToSiteClient : public 
sitetosite::SiteToSiteClient {
   const std::string parseTransactionId(const std::string &uri);
 
   std::unique_ptr<utils::HTTPClient> create_http_client(const std::string 
&uri, const std::string &method = "POST", bool setPropertyHeaders = false) {
-    std::unique_ptr<utils::HTTPClient> http_client_ = 
std::unique_ptr<utils::HTTPClient>(new minifi::utils::HTTPClient(uri, nullptr));
-    http_client_->initialize(method, uri, nullptr);
+    std::unique_ptr<utils::HTTPClient> http_client_ = 
std::unique_ptr<utils::HTTPClient>(new minifi::utils::HTTPClient(uri, 
ssl_context_service_));
+    http_client_->initialize(method, uri, ssl_context_service_);
+    if (ssl_context_service_) {
+      if (ssl_context_service_->getDisableHostVerification()) {
+        http_client_->setDisableHostVerification();
+      }
+      if (ssl_context_service_->getDisablePeerVerification()) {
+        http_client_->setDisablePeerVerification();
+      }
+    }
     if (setPropertyHeaders) {
       if (_currentVersion >= 5) {
         // batch count, size, and duratin don't appear to be set through the 
interfaces.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/extensions/mqtt/AbstractMQTTProcessor.h
----------------------------------------------------------------------
diff --git a/extensions/mqtt/AbstractMQTTProcessor.h 
b/extensions/mqtt/AbstractMQTTProcessor.h
index 7278f15..aab0ef5 100644
--- a/extensions/mqtt/AbstractMQTTProcessor.h
+++ b/extensions/mqtt/AbstractMQTTProcessor.h
@@ -150,7 +150,7 @@ class AbstractMQTTProcessor : public core::Processor {
  private:
   std::shared_ptr<logging::Logger> logger_;
   MQTTClient_SSLOptions sslopts_;
-  std::string sslEnabled_;
+  bool sslEnabled_;
   std::string securityCA_;
   std::string securityCert_;
   std::string securityPrivateKey_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/extensions/mqtt/ConsumeMQTT.h
----------------------------------------------------------------------
diff --git a/extensions/mqtt/ConsumeMQTT.h b/extensions/mqtt/ConsumeMQTT.h
index da3ca6c..0b26d42 100644
--- a/extensions/mqtt/ConsumeMQTT.h
+++ b/extensions/mqtt/ConsumeMQTT.h
@@ -20,7 +20,7 @@
 #ifndef __CONSUME_MQTT_H__
 #define __CONSUME_MQTT_H__
 
-#include <climits>
+#include <limits>
 #include <deque>
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
@@ -53,7 +53,7 @@ public:
     : processors::AbstractMQTTProcessor(name, uuid), 
logger_(logging::LoggerFactory<ConsumeMQTT>::getLogger()) {
     isSubscriber_ = true;
     maxQueueSize_ = 100;
-    maxSegsize_ = ULLONG_MAX;
+    maxSegSize_ = ULLONG_MAX;
   }
   // Destructor
   virtual ~ConsumeMQTT() {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/libminifi/include/controllers/SSLContextService.h
----------------------------------------------------------------------
diff --git a/libminifi/include/controllers/SSLContextService.h 
b/libminifi/include/controllers/SSLContextService.h
index 9ceb10c..f9a8a10 100644
--- a/libminifi/include/controllers/SSLContextService.h
+++ b/libminifi/include/controllers/SSLContextService.h
@@ -63,6 +63,8 @@ class SSLContextService : public 
core::controller::ControllerService {
       : ControllerService(name, id),
         initialized_(false),
         valid_(false),
+        disable_host_verification_(false),
+        disable_peer_verification_(false),
         logger_(logging::LoggerFactory<SSLContextService>::getLogger()) {
   }
 
@@ -70,6 +72,8 @@ class SSLContextService : public 
core::controller::ControllerService {
       : ControllerService(name, uuid),
         initialized_(false),
         valid_(false),
+        disable_host_verification_(false),
+        disable_peer_verification_(false),
         logger_(logging::LoggerFactory<SSLContextService>::getLogger()) {
   }
 
@@ -77,6 +81,8 @@ class SSLContextService : public 
core::controller::ControllerService {
       : ControllerService(name, nullptr),
         initialized_(false),
         valid_(false),
+        disable_host_verification_(false),
+        disable_peer_verification_(false),
         logger_(logging::LoggerFactory<SSLContextService>::getLogger()) {
     setConfiguration(configuration);
     initialize();
@@ -102,6 +108,13 @@ class SSLContextService : public 
core::controller::ControllerService {
     if (configuration_->get(Configure::nifi_security_client_ca_certificate, 
value)) {
       setProperty(caCert.getName(), value);
     }
+
+    if 
(configuration_->get(Configure::nifi_security_client_disable_host_verification, 
value)) {
+      org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, 
disable_host_verification_);
+    }
+    if 
(configuration_->get(Configure::nifi_security_client_disable_peer_verification, 
value)) {
+      org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, 
disable_peer_verification_);
+    }
   }
 
   virtual void initialize();
@@ -118,6 +131,22 @@ class SSLContextService : public 
core::controller::ControllerService {
 
   const std::string &getCACertificate();
 
+  void setDisableHostVerification() {
+    disable_host_verification_ = true;
+  }
+
+  void setDisablePeerVerification() {
+    disable_peer_verification_ = true;
+  }
+
+  bool getDisableHostVerification() const {
+    return disable_host_verification_;
+  }
+
+  bool getDisablePeerVerification() const {
+    return disable_peer_verification_;
+  }
+
   void yield() {
 
   }
@@ -193,6 +222,8 @@ class SSLContextService : public 
core::controller::ControllerService {
   std::string passphrase_;
   std::string passphrase_file_;
   std::string ca_certificate_;
+  bool disable_host_verification_;
+  bool disable_peer_verification_;
 
  private:
   std::shared_ptr<logging::Logger> logger_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/libminifi/include/io/tls/TLSSocket.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/tls/TLSSocket.h 
b/libminifi/include/io/tls/TLSSocket.h
index b8915a6..14978e4 100644
--- a/libminifi/include/io/tls/TLSSocket.h
+++ b/libminifi/include/io/tls/TLSSocket.h
@@ -157,6 +157,8 @@ class TLSSocket : public Socket {
    */
   virtual int16_t select_descriptor(const uint16_t msec);
 
+  virtual int readData(std::vector<uint8_t> &buf, int buflen);
+
   /**
    * Reads data and places it into buf
    * @param buf buffer in which we extract data

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/libminifi/include/properties/Configure.h
----------------------------------------------------------------------
diff --git a/libminifi/include/properties/Configure.h 
b/libminifi/include/properties/Configure.h
index 6911ae2..374647a 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -63,6 +63,8 @@ class Configure : public Properties {
   static const char *nifi_security_client_private_key;
   static const char *nifi_security_client_pass_phrase;
   static const char *nifi_security_client_ca_certificate;
+  static const char *nifi_security_client_disable_host_verification;
+  static const char *nifi_security_client_disable_peer_verification;
   static const char *nifi_configuration_listener_pull_interval;
   static const char *nifi_configuration_listener_http_url;
   static const char *nifi_configuration_listener_rest_url;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/libminifi/include/sitetosite/SiteToSiteClient.h
----------------------------------------------------------------------
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h 
b/libminifi/include/sitetosite/SiteToSiteClient.h
index 3200bed..259b95e 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -59,6 +59,7 @@ class SiteToSiteClient : public core::Connectable {
       : core::Connectable("SitetoSiteClient", 0),
         peer_state_(IDLE),
         _batchSendNanos(5000000000),
+        ssl_context_service_(nullptr),
         logger_(logging::LoggerFactory<SiteToSiteClient>::getLogger()) {
     _supportedVersion[0] = 5;
     _supportedVersion[1] = 4;
@@ -76,6 +77,10 @@ class SiteToSiteClient : public core::Connectable {
 
   }
 
+  void setSSLContextService(const 
std::shared_ptr<minifi::controllers::SSLContextService> &context_service) {
+    ssl_context_service_ = context_service;
+  }
+
   /**
    * Creates a transaction using the transaction ID and the direction
    * @param transactionID transaction identifier
@@ -259,6 +264,8 @@ class SiteToSiteClient : public core::Connectable {
   uint32_t _currentCodecVersion;
   int _currentCodecVersionIndex;
 
+  std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
+
  private:
   std::shared_ptr<logging::Logger> logger_;
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/libminifi/include/sitetosite/SiteToSiteFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/sitetosite/SiteToSiteFactory.h 
b/libminifi/include/sitetosite/SiteToSiteFactory.h
index 35e12b9..96865b0 100644
--- a/libminifi/include/sitetosite/SiteToSiteFactory.h
+++ b/libminifi/include/sitetosite/SiteToSiteFactory.h
@@ -51,6 +51,7 @@ static std::unique_ptr<SiteToSiteClient> 
createRawSocket(const SiteToSiteClientC
   client_configuration.getPeer()->getPortId(uuid);
   auto ptr = std::unique_ptr<SiteToSiteClient>(new 
RawSiteToSiteClient(createStreamingPeer(client_configuration)));
   ptr->setPortId(uuid);
+  ptr->setSSLContextService(client_configuration.getSecurityContext());
   return ptr;
 }
 
@@ -70,6 +71,7 @@ static std::unique_ptr<SiteToSiteClient> createClient(const 
SiteToSiteClientConf
       auto http_protocol = 
core::ClassLoader::getDefaultClassLoader().instantiateRaw("HttpProtocol", 
"HttpProtocol");
       if (nullptr != http_protocol) {
         auto ptr = 
std::unique_ptr<SiteToSiteClient>(static_cast<SiteToSiteClient*>(http_protocol));
+        ptr->setSSLContextService(client_configuration.getSecurityContext());
         auto peer = std::unique_ptr<SiteToSitePeer>(new 
SiteToSitePeer(client_configuration.getPeer()->getHost(), 
client_configuration.getPeer()->getPort()));
         char idStr[37];
         uuid_unparse_lower(uuid, idStr);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/libminifi/include/utils/HTTPClient.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/HTTPClient.h 
b/libminifi/include/utils/HTTPClient.h
index 0c4b179..b09b450 100644
--- a/libminifi/include/utils/HTTPClient.h
+++ b/libminifi/include/utils/HTTPClient.h
@@ -255,6 +255,9 @@ class BaseHTTPClient {
   virtual void setDisablePeerVerification() {
   }
 
+  virtual void setDisableHostVerification() {
+  }
+
   virtual const std::vector<std::string> &getHeaders() {
     return headers_;
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/libminifi/src/Configure.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index a8ea78f..6daee37 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -52,6 +52,8 @@ const char *Configure::nifi_security_client_certificate = 
"nifi.security.client.
 const char *Configure::nifi_security_client_private_key = 
"nifi.security.client.private.key";
 const char *Configure::nifi_security_client_pass_phrase = 
"nifi.security.client.pass.phrase";
 const char *Configure::nifi_security_client_ca_certificate = 
"nifi.security.client.ca.certificate";
+const char *Configure::nifi_security_client_disable_host_verification = 
"nifi.security.client.disable.host.verification";
+const char *Configure::nifi_security_client_disable_peer_verification = 
"nifi.security.client.disable.peer.verification";
 const char *Configure::nifi_configuration_listener_pull_interval = 
"nifi.configuration.listener.pull.interval";
 const char *Configure::nifi_configuration_listener_http_url = 
"nifi.configuration.listener.http.url";
 const char *Configure::nifi_configuration_listener_rest_url = 
"nifi.configuration.listener.rest.url";

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp 
b/libminifi/src/RemoteProcessorGroupPort.cpp
index 436a7a7..332de0b 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -73,7 +73,7 @@ std::unique_ptr<sitetosite::SiteToSiteClient> 
RemoteProcessorGroupPort::getNextP
         std::lock_guard<std::mutex> lock(peer_mutex_);
         logger_->log_debug("Creating client from peer %ll", 
peer_index_.load());
         sitetosite::SiteToSiteClientConfiguration config(stream_factory_, 
peers_[this->peer_index_].getPeer(), client_type_);
-
+        config.setSecurityContext(ssl_service);
         peer_index_++;
         if (peer_index_ >= static_cast<int>(peers_.size())) {
           peer_index_ = 0;
@@ -123,6 +123,39 @@ void RemoteProcessorGroupPort::initialize() {
       client_type_ = sitetosite::HTTP;
     }
   }
+  logger_->log_trace("Finished initialization");
+}
+
+void RemoteProcessorGroupPort::onSchedule(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  std::string value;
+  if (context->getProperty(portUUID.getName(), value)) {
+    uuid_parse(value.c_str(), protocol_uuid_);
+  }
+
+  std::string context_name;
+  if (!context->getProperty(SSLContext.getName(), context_name) || 
IsNullOrEmpty(context_name)) {
+    context_name = RPG_SSL_CONTEXT_SERVICE_NAME;
+  }
+  std::shared_ptr<core::controller::ControllerService> service = 
context->getControllerService(context_name);
+  if (nullptr != service) {
+    ssl_service = 
std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
+  } else {
+    std::string secureStr;
+    bool is_secure = false;
+    if (configure_->get(Configure::nifi_remote_input_secure, secureStr) && 
org::apache::nifi::minifi::utils::StringUtils::StringToBool(secureStr, 
is_secure)) {
+      ssl_service = 
std::make_shared<minifi::controllers::SSLContextService>(RPG_SSL_CONTEXT_SERVICE_NAME,
 configure_);
+      ssl_service->onEnable();
+    }
+  }
+
+  bool disable = false;
+  if (ssl_service && 
configure_->get(Configure::nifi_security_client_disable_host_verification, 
value) && org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, 
disable)) {
+    ssl_service->setDisableHostVerification();
+  }
+  disable = false;
+  if (ssl_service && 
configure_->get(Configure::nifi_security_client_disable_peer_verification, 
value) && org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, 
disable)) {
+    ssl_service->setDisablePeerVerification();
+  }
 
   std::lock_guard<std::mutex> lock(peer_mutex_);
   if (!url_.empty()) {
@@ -138,6 +171,7 @@ void RemoteProcessorGroupPort::initialize() {
     for (uint32_t i = 0; i < count; i++) {
       std::unique_ptr<sitetosite::SiteToSiteClient> nextProtocol = nullptr;
       sitetosite::SiteToSiteClientConfiguration config(stream_factory_, 
peers_[this->peer_index_].getPeer(), client_type_);
+      config.setSecurityContext(ssl_service);
       peer_index_++;
       if (peer_index_ >= static_cast<int>(peers_.size())) {
         peer_index_ = 0;
@@ -148,36 +182,20 @@ void RemoteProcessorGroupPort::initialize() {
       returnProtocol(std::move(nextProtocol));
     }
   }
-  logger_->log_trace("Finished initialization");
 }
 
-void RemoteProcessorGroupPort::notifyStop(){
+void RemoteProcessorGroupPort::notifyStop() {
   transmitting_ = false;
-  RPGLatch count(false); // we're just a monitor
+  RPGLatch count(false);  // we're just a monitor
   // we use the latch
-  while(count.getCount() > 0);
+  while (count.getCount() > 0) {
+  }
   std::unique_ptr<sitetosite::SiteToSiteClient> nextProtocol = nullptr;
-  while(available_protocols_.try_dequeue(nextProtocol)){
+  while (available_protocols_.try_dequeue(nextProtocol)) {
     // clear all protocols now
   }
 }
 
-void RemoteProcessorGroupPort::onSchedule(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(portUUID.getName(), value)) {
-    uuid_parse(value.c_str(), protocol_uuid_);
-  }
-
-  std::string context_name;
-  if (!context->getProperty(SSLContext.getName(), context_name) || 
IsNullOrEmpty(context_name)) {
-    context_name = RPG_SSL_CONTEXT_SERVICE_NAME;
-  }
-  std::shared_ptr<core::controller::ControllerService> service = 
context->getControllerService(context_name);
-  if (nullptr != service) {
-    ssl_service = 
std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
-  }
-}
-
 void RemoteProcessorGroupPort::onTrigger(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSession> &session) {
   logger_->log_trace("On trigger %s", getUUIDStr());
   if (!transmitting_) {
@@ -253,6 +271,14 @@ void 
RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() {
     }
     client = 
std::unique_ptr<utils::BaseHTTPClient>(dynamic_cast<utils::BaseHTTPClient*>(client_ptr));
     client->initialize("GET", loginUrl, ssl_service);
+    if (ssl_service) {
+      if (ssl_service->getDisableHostVerification()) {
+        client->setDisableHostVerification();
+      }
+      if (ssl_service->getDisablePeerVerification()) {
+        client->setDisablePeerVerification();
+      }
+    }
 
     token = utils::get_token(client.get(), this->rest_user_name_, 
this->rest_password_);
     logger_->log_debug("Token from NiFi REST Api endpoint %s,  %s", loginUrl, 
token);
@@ -267,7 +293,14 @@ void 
RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() {
   }
   client = 
std::unique_ptr<utils::BaseHTTPClient>(dynamic_cast<utils::BaseHTTPClient*>(client_ptr));
   client->initialize("GET", fullUrl.c_str(), ssl_service);
-
+  if (ssl_service) {
+    if (ssl_service->getDisableHostVerification()) {
+      client->setDisableHostVerification();
+    }
+    if (ssl_service->getDisablePeerVerification()) {
+      client->setDisablePeerVerification();
+    }
+  }
   if (!token.empty()) {
     std::string header = "Authorization: " + token;
     client->appendHeader(header);
@@ -312,6 +345,7 @@ void RemoteProcessorGroupPort::refreshPeerList() {
 
   std::unique_ptr<sitetosite::SiteToSiteClient> protocol;
   sitetosite::SiteToSiteClientConfiguration config(stream_factory_, 
std::make_shared<sitetosite::Peer>(protocol_uuid_, host_, site2site_port_, 
ssl_service != nullptr), client_type_);
+  config.setSecurityContext(ssl_service);
   protocol = sitetosite::createClient(config);
 
   protocol->getPeerList(peers_);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/libminifi/src/controllers/SSLContextService.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/controllers/SSLContextService.cpp 
b/libminifi/src/controllers/SSLContextService.cpp
index 1c3fd71..352ba37 100644
--- a/libminifi/src/controllers/SSLContextService.cpp
+++ b/libminifi/src/controllers/SSLContextService.cpp
@@ -79,7 +79,7 @@ std::unique_ptr<SSLContext> 
SSLContextService::createSSLContext() {
 
   int retp = SSL_CTX_load_verify_locations(ctx, ca_certificate_.c_str(), 0);
   if (retp == 0) {
-    logger_->log_error("Can not load CA certificate, Exiting, error : %s", 
std::strerror(errno));
+    logger_->log_error("Can not load CA certificate %s, Exiting, error : %s", 
ca_certificate_, std::strerror(errno));
   }
   return std::unique_ptr<SSLContext>(new SSLContext(ctx));
 }
@@ -122,8 +122,6 @@ void SSLContextService::onEnable() {
   logger_->log_trace("onEnable()");
 
   if (getProperty(property.getName(), certificate) && 
getProperty(privKey.getName(), private_key_)) {
-    logger_->log_error("Certificate and Private Key PEM file not configured, 
error: %s.", std::strerror(errno));
-
     std::ifstream cert_file(certificate);
     std::ifstream priv_file(private_key_);
     if (!cert_file.good()) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a17c7e1b/libminifi/src/io/tls/TLSSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/tls/TLSSocket.cpp 
b/libminifi/src/io/tls/TLSSocket.cpp
index d51fa76..a38ce0e 100644
--- a/libminifi/src/io/tls/TLSSocket.cpp
+++ b/libminifi/src/io/tls/TLSSocket.cpp
@@ -182,6 +182,36 @@ int TLSSocket::writeData(std::vector<uint8_t>& buf, int 
buflen) {
   return Socket::writeData(buf, buflen);
 }
 
+int TLSSocket::readData(std::vector<uint8_t> &buf, int buflen) {
+  if (buf.capacity() < buflen) {
+    buf.reserve(buflen);
+  }
+  if (IsNullOrEmpty(ssl))
+    return -1;
+  int total_read = 0;
+  int status = 0;
+  int loc = 0;
+  while (buflen) {
+    int16_t fd = select_descriptor(1000);
+    if (fd <= 0) {
+      close(socket_file_descriptor_);
+      return -1;
+    }
+
+    int sslStatus;
+    do {
+      status = SSL_read(ssl, buf.data() + loc, buflen);
+      sslStatus = SSL_get_error(ssl, status);
+    } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
+
+    buflen -= status;
+    loc += status;
+    total_read += status;
+  }
+
+  return total_read;
+}
+
 int TLSSocket::writeData(uint8_t *value, int size) {
   if (IsNullOrEmpty(ssl))
     return -1;

Reply via email to