Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 77f32f10e -> 0756097aa


MINIFICPP-327: Rename PutKafka to PublishKafka

This closes #209.

Signed-off-by: Marc Parisi <phroc...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/0756097a
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/0756097a
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/0756097a

Branch: refs/heads/master
Commit: 0756097aacef069ef61990309d9456a8d0ee7790
Parents: 77f32f1
Author: Bin Qiu <benqiu2...@gmail.com>
Authored: Tue Nov 28 07:25:15 2017 -0800
Committer: Marc Parisi <phroc...@apache.org>
Committed: Tue Nov 28 14:05:48 2017 -0500

----------------------------------------------------------------------
 CMakeLists.txt                         |   2 +-
 README.md                              |   2 +-
 extensions/librdkafka/PublishKafka.cpp | 280 ++++++++++++++++++++++++++++
 extensions/librdkafka/PublishKafka.h   | 180 ++++++++++++++++++
 extensions/librdkafka/PutKafka.cpp     | 274 ---------------------------
 extensions/librdkafka/PutKafka.h       | 179 ------------------
 extensions/librdkafka/RdKafkaLoader.h  |   8 +-
 7 files changed, 466 insertions(+), 459 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0756097a/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index e2c68f5..59253eb 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -150,7 +150,7 @@ endif(ENABLE_PCAP)
 ## Create LibRdKafka Extension
 option(ENABLE_LIBRDKAFKA "Enables the librdkafka extension." OFF)
 if (ENABLE_LIBRDKAFKA)
-       createExtension(RDKAFKA-EXTENSIONS "RDKAFKA EXTENSIONS" "This Enables librdkafka functionality including PutKafka" "extensions/librdkafka" "${TEST_DIR}/kafka-tests" "TRUE" "thirdparty/librdkafka-0.11.1")
+       createExtension(RDKAFKA-EXTENSIONS "RDKAFKA EXTENSIONS" "This Enables librdkafka functionality including PublishKafka" "extensions/librdkafka" "${TEST_DIR}/kafka-tests" "TRUE" "thirdparty/librdkafka-0.11.1")
 endif()
 
 ## Scripting extensions
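
Note for anyone picking up this change: the extension is still disabled by default, so the renamed processor only ships when the option above is turned on. A minimal out-of-source build enabling it might look like the following (directory layout and default generator are assumptions, not part of the commit):

    mkdir build && cd build
    cmake -DENABLE_LIBRDKAFKA=ON ..
    make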

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0756097a/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index a21306c..ea5e45f 100644
--- a/README.md
+++ b/README.md
@@ -63,7 +63,7 @@ Perspectives of the role of MiNiFi should be from the perspective of the agent a
   * FocusArchive
   * UnfocusArchive
   * ManipulateArchive
-  * PutKafka 
+  * PublishKafka 
 * Provenance events generation is supported and are persisted using RocksDB. Volatile repositories can be used on systems without persistent storage.
 
 ## System Requirements

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0756097a/extensions/librdkafka/PublishKafka.cpp
----------------------------------------------------------------------
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
new file mode 100644
index 0000000..dc99a80
--- /dev/null
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -0,0 +1,280 @@
+/**
+ * @file PublishKafka.cpp
+ * PublishKafka class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PublishKafka.h"
+#include <stdio.h>
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <map>
+#include <set>
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property PublishKafka::SeedBrokers("Known Brokers", "A comma-separated list of known Kafka Brokers in the format <host>:<port>", "");
+core::Property PublishKafka::Topic("Topic Name", "The Kafka Topic of interest", "");
+core::Property PublishKafka::DeliveryGuarantee("Delivery Guarantee", "Specifies the requirement for guaranteeing that a message is sent to Kafka", DELIVERY_ONE_NODE);
+core::Property PublishKafka::MaxMessageSize("Max Request Size", "Maximum Kafka protocol request message size", "");
+core::Property PublishKafka::RequestTimeOut("Request Timeout", "The ack timeout of the producer request in milliseconds", "");
+core::Property PublishKafka::ClientName("Client Name", "Client Name to use when communicating with Kafka", "");
+core::Property PublishKafka::BatchSize("Batch Size", "Maximum number of messages batched in one MessageSet", "");
+core::Property PublishKafka::QueueBufferMaxTime("Queue Buffering Max Time", "Delay to wait for messages in the producer queue to accumulate before constructing message batches", "");
+core::Property PublishKafka::QueueBufferMaxSize("Queue Max Buffer Size", "Maximum total message size sum allowed on the producer queue", "");
+core::Property PublishKafka::QueueBufferMaxMessage("Queue Max Message", "Maximum number of messages allowed on the producer queue", "");
+core::Property PublishKafka::CompressCodec("Compress Codec", "compression codec to use for compressing message sets", COMPRESSION_CODEC_NONE);
+core::Property PublishKafka::MaxFlowSegSize("Max Flow Segment Size", "Maximum flow content payload segment size for the kafka record", "");
+core::Property PublishKafka::SecurityProtocol("Security Protocol", "Protocol used to communicate with brokers", "");
+core::Property PublishKafka::SecurityCA("Security CA", "File or directory path to CA certificate(s) for verifying the broker's key", "");
+core::Property PublishKafka::SecurityCert("Security Cert", "Path to client's public key (PEM) used for authentication", "");
+core::Property PublishKafka::SecurityPrivateKey("Security Private Key", "Path to client's private key (PEM) used for authentication", "");
+core::Property PublishKafka::SecurityPrivateKeyPassWord("Security Pass Phrase", "Private key passphrase", "");
+core::Relationship PublishKafka::Success("success", "Any FlowFile that is successfully sent to Kafka will be routed to this Relationship");
+core::Relationship PublishKafka::Failure("failure", "Any FlowFile that cannot be sent to Kafka will be routed to this Relationship");
+
+void PublishKafka::initialize() {
+  // Set the supported properties
+  std::set<core::Property> properties;
+  properties.insert(SeedBrokers);
+  properties.insert(Topic);
+  properties.insert(DeliveryGuarantee);
+  properties.insert(MaxMessageSize);
+  properties.insert(RequestTimeOut);
+  properties.insert(ClientName);
+  properties.insert(BatchSize);
+  properties.insert(QueueBufferMaxTime);
+  properties.insert(QueueBufferMaxSize);
+  properties.insert(QueueBufferMaxMessage);
+  properties.insert(CompressCodec);
+  properties.insert(MaxFlowSegSize);
+  properties.insert(SecurityProtocol);
+  properties.insert(SecurityCA);
+  properties.insert(SecurityCert);
+  properties.insert(SecurityPrivateKey);
+  properties.insert(SecurityPrivateKeyPassWord);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set<core::Relationship> relationships;
+  relationships.insert(Failure);
+  relationships.insert(Success);
+  setSupportedRelationships(relationships);
+}
+
+void PublishKafka::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+  std::string value;
+  int64_t valInt;
+  std::string valueConf;
+  char errstr[512];
+  rd_kafka_conf_res_t result;
+
+  conf_ = rd_kafka_conf_new();
+  topic_conf_ = rd_kafka_topic_conf_new();
+
+  if (context->getProperty(SeedBrokers.getName(), value) && !value.empty()) {
+    result = rd_kafka_conf_set(conf_, "bootstrap.servers", value.c_str(), errstr, sizeof(errstr));
+    logger_->log_info("PublishKafka: bootstrap.servers [%s]", value);
+    if (result != RD_KAFKA_CONF_OK)
+      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+  }
+  value = "";
+  if (context->getProperty(MaxMessageSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
+    valueConf = std::to_string(valInt);
+    result = rd_kafka_conf_set(conf_, "message.max.bytes", valueConf.c_str(), errstr, sizeof(errstr));
+    logger_->log_info("PublishKafka: message.max.bytes [%s]", valueConf);
+    if (result != RD_KAFKA_CONF_OK)
+      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+  }
+  value = "";
+  if (context->getProperty(ClientName.getName(), value) && !value.empty()) {
+    result = rd_kafka_conf_set(conf_, "client.id", value.c_str(), errstr, sizeof(errstr));
+    logger_->log_info("PublishKafka: client.id [%s]", value);
+    if (result != RD_KAFKA_CONF_OK)
+      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+  }
+  value = "";
+  if (context->getProperty(QueueBufferMaxMessage.getName(), value) && !value.empty()) {
+    result = rd_kafka_conf_set(conf_, "queue.buffering.max.messages", value.c_str(), errstr, sizeof(errstr));
+    logger_->log_info("PublishKafka: queue.buffering.max.messages [%s]", value);
+    if (result != RD_KAFKA_CONF_OK)
+      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+  }
+  value = "";
+  if (context->getProperty(QueueBufferMaxSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
+      valInt = valInt/1024;
+      valueConf = std::to_string(valInt);
+      result = rd_kafka_conf_set(conf_, "queue.buffering.max.kbytes", valueConf.c_str(), errstr, sizeof(errstr));
+      logger_->log_info("PublishKafka: queue.buffering.max.kbytes [%s]", valueConf);
+      if (result != RD_KAFKA_CONF_OK)
+        logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+  }
+  value = "";
+  max_seg_size_ = ULLONG_MAX;
+  if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
+    max_seg_size_ = valInt;
+    logger_->log_info("PublishKafka: max flow segment size [%d]", max_seg_size_);
+  }
+  value = "";
+  if (context->getProperty(QueueBufferMaxTime.getName(), value) && !value.empty()) {
+    core::TimeUnit unit;
+    if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
+      valueConf = std::to_string(valInt);
+      result = rd_kafka_conf_set(conf_, "queue.buffering.max.ms", valueConf.c_str(), errstr, sizeof(errstr));
+      logger_->log_info("PublishKafka: queue.buffering.max.ms [%s]", valueConf);
+      if (result != RD_KAFKA_CONF_OK)
+        logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+    }
+  }
+  value = "";
+  if (context->getProperty(BatchSize.getName(), value) && !value.empty()) {
+    result = rd_kafka_conf_set(conf_, "batch.num.messages", value.c_str(), errstr, sizeof(errstr));
+    logger_->log_info("PublishKafka: batch.num.messages [%s]", value);
+    if (result != RD_KAFKA_CONF_OK)
+      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+  }
+  value = "";
+  if (context->getProperty(CompressCodec.getName(), value) && !value.empty()) {
+    result = rd_kafka_conf_set(conf_, "compression.codec", value.c_str(), errstr, sizeof(errstr));
+    logger_->log_info("PublishKafka: compression.codec [%s]", value);
+    if (result != RD_KAFKA_CONF_OK)
+      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+  }
+  value = "";
+  if (context->getProperty(DeliveryGuarantee.getName(), value) && !value.empty()) {
+    result = rd_kafka_topic_conf_set(topic_conf_, "request.required.acks", value.c_str(), errstr, sizeof(errstr));
+    logger_->log_info("PublishKafka: request.required.acks [%s]", value);
+    if (result != RD_KAFKA_CONF_OK)
+      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+  }
+  value = "";
+  if (context->getProperty(RequestTimeOut.getName(), value) && !value.empty()) {
+    core::TimeUnit unit;
+    if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
+      valueConf = std::to_string(valInt);
+      result = rd_kafka_topic_conf_set(topic_conf_, "request.timeout.ms", valueConf.c_str(), errstr, sizeof(errstr));
+      logger_->log_info("PublishKafka: request.timeout.ms [%s]", valueConf);
+      if (result != RD_KAFKA_CONF_OK)
+        logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+    }
+  }
+  value = "";
+  if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) {
+    if (value == SECURITY_PROTOCOL_SSL) {
+      result = rd_kafka_conf_set(conf_, "security.protocol", value.c_str(), errstr, sizeof(errstr));
+      logger_->log_info("PublishKafka: security.protocol [%s]", value);
+      if (result != RD_KAFKA_CONF_OK) {
+        logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+      } else {
+        value = "";
+        if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) {
+          result = rd_kafka_conf_set(conf_, "ssl.ca.location", value.c_str(), errstr, sizeof(errstr));
+          logger_->log_info("PublishKafka: ssl.ca.location [%s]", value);
+          if (result != RD_KAFKA_CONF_OK)
+            logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+        }
+        value = "";
+        if (context->getProperty(SecurityCert.getName(), value) && !value.empty()) {
+          result = rd_kafka_conf_set(conf_, "ssl.certificate.location", value.c_str(), errstr, sizeof(errstr));
+          logger_->log_info("PublishKafka: ssl.certificate.location [%s]", value);
+          if (result != RD_KAFKA_CONF_OK)
+            logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+        }
+        value = "";
+        if (context->getProperty(SecurityPrivateKey.getName(), value) && !value.empty()) {
+          result = rd_kafka_conf_set(conf_, "ssl.key.location", value.c_str(), errstr, sizeof(errstr));
+          logger_->log_info("PublishKafka: ssl.key.location [%s]", value);
+          if (result != RD_KAFKA_CONF_OK)
+            logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+        }
+        value = "";
+        if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) && !value.empty()) {
+          result = rd_kafka_conf_set(conf_, "ssl.key.password", value.c_str(), errstr, sizeof(errstr));
+          logger_->log_info("PublishKafka: ssl.key.password [%s]", value);
+          if (result != RD_KAFKA_CONF_OK)
+            logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+        }
+      }
+    }
+  }
+  value = "";
+  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
+    topic_ = value;
+    logger_->log_info("PublishKafka: topic [%s]", topic_);
+  } else {
+    logger_->log_info("PublishKafka: topic not configured");
+    return;
+  }
+
+  rk_ = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, errstr, sizeof(errstr));
+
+  if (!rk_) {
+    logger_->log_error("Failed to create Kafka producer %s", errstr);
+    return;
+  }
+
+  rkt_ = rd_kafka_topic_new(rk_, topic_.c_str(), topic_conf_);
+
+  if (!rkt_) {
+    logger_->log_error("Failed to create topic %s", errstr);
+    return;
+  }
+}
+
+void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+  std::shared_ptr<core::FlowFile> flowFile = session->get();
+
+  if (!flowFile) {
+    return;
+  }
+
+  if (!rk_ || !rkt_) {
+    session->transfer(flowFile, Failure);
+    return;
+  }
+
+  std::string kafkaKey = flowFile->getUUIDStr();
+  std::string value;
+
+  if (flowFile->getAttribute(KAFKA_KEY_ATTRIBUTE, value))
+    kafkaKey = value;
+
+  PublishKafka::ReadCallback callback(flowFile->getSize(), max_seg_size_, kafkaKey, rkt_);
+  session->read(flowFile, &callback);
+  if (callback.status_ < 0) {
+    logger_->log_error("Failed to send flow to kafka topic %s", topic_);
+    session->transfer(flowFile, Failure);
+  } else {
+    logger_->log_debug("Sent flow with length %d to kafka topic %s", callback.read_size_, topic_);
+    session->transfer(flowFile, Success);
+  }
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
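
For readers unfamiliar with librdkafka, the onSchedule/onTrigger sequence above reduces to a handful of C API calls. The following is a minimal standalone sketch of that sequence, not part of the commit; the broker address, topic name, payload, and key are placeholders, and error handling mirrors the checks the processor makes:

    #include <cstring>
    #include <cstdio>
    #include "rdkafka.h"  // librdkafka C API, as used by PublishKafka

    int main() {
      char errstr[512];

      // Equivalent of onSchedule: build the conf, then create producer and topic.
      rd_kafka_conf_t *conf = rd_kafka_conf_new();
      if (rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                            errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "conf error: %s\n", errstr);
        return 1;
      }
      rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
      if (!rk) {
        fprintf(stderr, "producer error: %s\n", errstr);
        return 1;
      }
      rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, "test-topic", NULL);

      // Equivalent of one ReadCallback iteration: enqueue one keyed, copied message.
      const char *payload = "hello";
      const char *key = "some-flowfile-uuid";
      if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                           (void *) payload, strlen(payload),
                           key, strlen(key), NULL) == -1) {
        fprintf(stderr, "produce failed\n");
      }

      // Equivalent of the destructor: drain the queue, then tear down.
      rd_kafka_flush(rk, 10 * 1000);
      rd_kafka_topic_destroy(rkt);
      rd_kafka_destroy(rk);
      return 0;
    }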

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0756097a/extensions/librdkafka/PublishKafka.h
----------------------------------------------------------------------
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
new file mode 100644
index 0000000..65ed849
--- /dev/null
+++ b/extensions/librdkafka/PublishKafka.h
@@ -0,0 +1,180 @@
+/**
+ * @file PublishKafka.h
+ * PublishKafka class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PUBLISH_KAFKA_H__
+#define __PUBLISH_KAFKA_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "rdkafka.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+#define COMPRESSION_CODEC_NONE "none"
+#define COMPRESSION_CODEC_GZIP "gzip"
+#define COMPRESSION_CODEC_SNAPPY "snappy"
+#define ROUND_ROBIN_PARTITIONING "Round Robin"
+#define RANDOM_PARTITIONING "Random Robin"
+#define USER_DEFINED_PARTITIONING "User-Defined"
+#define DELIVERY_REPLICATED "all"
+#define DELIVERY_ONE_NODE "1"
+#define DELIVERY_BEST_EFFORT "0"
+#define SECURITY_PROTOCOL_PLAINTEXT "plaintext"
+#define SECURITY_PROTOCOL_SSL "ssl"
+#define SECURITY_PROTOCOL_SASL_PLAINTEXT "sasl_plaintext"
+#define SECURITY_PROTOCOL_SASL_SSL "sasl_ssl"
+#define KAFKA_KEY_ATTRIBUTE "kafka.key"
+
+// PublishKafka Class
+class PublishKafka: public core::Processor {
+public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit PublishKafka(std::string name, uuid_t uuid = NULL) :
+      core::Processor(name, uuid), logger_(logging::LoggerFactory<PublishKafka>::getLogger()) {
+    conf_ = nullptr;
+    rk_ = nullptr;
+    topic_conf_ = nullptr;
+    rkt_ = nullptr;
+  }
+  // Destructor
+  virtual ~PublishKafka() {
+    if (rk_)
+      rd_kafka_flush(rk_, 10*1000); /* wait for max 10 seconds */
+    if (rkt_)
+      rd_kafka_topic_destroy(rkt_);
+    if (rk_)
+      rd_kafka_destroy(rk_);
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "PublishKafka";
+  // Supported Properties
+  static core::Property SeedBrokers;
+  static core::Property Topic;
+  static core::Property DeliveryGuarantee;
+  static core::Property MaxMessageSize;
+  static core::Property RequestTimeOut;
+  static core::Property ClientName;
+  static core::Property BatchSize;
+  static core::Property QueueBufferMaxTime;
+  static core::Property QueueBufferMaxSize;
+  static core::Property QueueBufferMaxMessage;
+  static core::Property CompressCodec;
+  static core::Property MaxFlowSegSize;
+  static core::Property SecurityProtocol;
+  static core::Property SecurityCA;
+  static core::Property SecurityCert;
+  static core::Property SecurityPrivateKey;
+  static core::Property SecurityPrivateKeyPassWord;
+
+  // Supported Relationships
+  static core::Relationship Failure;
+  static core::Relationship Success;
+
+  // Nest Callback Class for read stream
+  class ReadCallback: public InputStreamCallback {
+  public:
+    ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const std::string &key, rd_kafka_topic_t *rkt) :
+        flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), rkt_(rkt) {
+      status_ = 0;
+      read_size_ = 0;
+    }
+    ~ReadCallback() {
+    }
+    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+      if (flow_size_ < max_seg_size_)
+        max_seg_size_ = flow_size_;
+      std::vector<unsigned char> buffer;
+      buffer.resize(max_seg_size_);
+      read_size_ = 0;
+      status_ = 0;
+      while (read_size_ < flow_size_) {
+        int readRet = stream->read(&buffer[0], max_seg_size_);
+        if (readRet < 0) {
+          status_ = -1;
+          return read_size_;
+        }
+        if (readRet > 0) {
+          if (rd_kafka_produce(rkt_, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, &buffer[0], readRet, key_.c_str(), key_.size(), NULL) == -1) {
+            status_ = -1;
+            return read_size_;
+          }
+          read_size_ += readRet;
+        } else {
+          break;
+        }
+      }
+      return read_size_;
+    }
+    uint64_t flow_size_;
+    uint64_t max_seg_size_;
+    std::string key_;
+    rd_kafka_topic_t *rkt_;
+    int status_;
+    int read_size_;
+  };
+
+public:
+  /**
+   * Function that's executed when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when creating
+   * ProcessSession objects.
+   */
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+  // OnTrigger method, implemented by NiFi PublishKafka
+  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  }
+  // OnTrigger method, implemented by NiFi PublishKafka
+  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
+  // Initialize, overwritten by NiFi PublishKafka
+  virtual void initialize(void);
+
+protected:
+
+private:
+  std::shared_ptr<logging::Logger> logger_;
+  rd_kafka_conf_t *conf_;
+  rd_kafka_t *rk_;
+  rd_kafka_topic_conf_t *topic_conf_;
+  rd_kafka_topic_t *rkt_;
+  std::string topic_;
+  uint64_t max_seg_size_;
+};
+
+REGISTER_RESOURCE (PublishKafka);
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
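
One behavior worth calling out in ReadCallback above: a flow file larger than Max Flow Segment Size is split into several Kafka messages that all share one key, so a consumer can reassemble them. A toy illustration of the resulting message count (the sizes here are made up for the example, not from the commit):

    // Hypothetical sizes: a 10 MB flow file with a 1 MB Max Flow Segment Size
    // yields ten rd_kafka_produce() calls, all carrying the same key.
    #include <cstdint>
    #include <cassert>
    int main() {
      const uint64_t flow_size = 10ULL * 1024 * 1024;    // flowFile->getSize()
      const uint64_t max_seg_size = 1ULL * 1024 * 1024;  // Max Flow Segment Size
      const uint64_t segments = (flow_size + max_seg_size - 1) / max_seg_size;
      assert(segments == 10);
      return 0;
    }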

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0756097a/extensions/librdkafka/PutKafka.cpp
----------------------------------------------------------------------
diff --git a/extensions/librdkafka/PutKafka.cpp b/extensions/librdkafka/PutKafka.cpp
deleted file mode 100644
index ed7391e..0000000
--- a/extensions/librdkafka/PutKafka.cpp
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * @file PutKafka.cpp
- * PutKafka class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "PutKafka.h"
-#include <stdio.h>
-#include <algorithm>
-#include <memory>
-#include <string>
-#include <map>
-#include <set>
-#include "utils/TimeUtil.h"
-#include "utils/StringUtils.h"
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-core::Property PutKafka::SeedBrokers("Known Brokers", "A comma-separated list of known Kafka Brokers in the format <host>:<port>", "");
-core::Property PutKafka::Topic("Topic Name", "The Kafka Topic of interest", "");
-core::Property PutKafka::DeliveryGuarantee("Delivery Guarantee", "Specifies the requirement for guaranteeing that a message is sent to Kafka", DELIVERY_ONE_NODE);
-core::Property PutKafka::MaxMessageSize("Max Request Size", "Maximum Kafka protocol request message size", "");
-core::Property PutKafka::RequestTimeOut("Request Timeout", "The ack timeout of the producer request in milliseconds", "");
-core::Property PutKafka::ClientName("Client Name", "Client Name to use when communicating with Kafka", "");
-core::Property PutKafka::BatchSize("Batch Size", "Maximum number of messages batched in one MessageSet", "");
-core::Property PutKafka::QueueBufferMaxTime("Queue Buffering Max Time", "Delay to wait for messages in the producer queue to accumulate before constructing message batches", "");
-core::Property PutKafka::QueueBufferMaxSize("Queue Max Buffer Size", "Maximum total message size sum allowed on the producer queue", "");
-core::Property PutKafka::QueueBufferMaxMessage("Queue Max Message", "Maximum number of messages allowed on the producer queue", "");
-core::Property PutKafka::CompressCodec("Compress Codec", "compression codec to use for compressing message sets", COMPRESSION_CODEC_NONE);
-core::Property PutKafka::MaxFlowSegSize("Max Flow Segment Size", "Maximum flow content payload segment size for the kafka record", "");
-core::Property PutKafka::SecurityProtocol("Security Protocol", "Protocol used to communicate with brokers", "");
-core::Property PutKafka::SecurityCA("Security CA", "File or directory path to CA certificate(s) for verifying the broker's key", "");
-core::Property PutKafka::SecurityCert("Security Cert", "Path to client's public key (PEM) used for authentication", "");
-core::Property PutKafka::SecurityPrivateKey("Security Private Key", "Path to client's private key (PEM) used for authentication", "");
-core::Property PutKafka::SecurityPrivateKeyPassWord("Security Pass Phrase", "Private key passphrase", "");
-core::Relationship PutKafka::Success("success", "Any FlowFile that is successfully sent to Kafka will be routed to this Relationship");
-core::Relationship PutKafka::Failure("failure", "Any FlowFile that cannot be sent to Kafka will be routed to this Relationship");
-
-void PutKafka::initialize() {
-  // Set the supported properties
-  std::set<core::Property> properties;
-  properties.insert(SeedBrokers);
-  properties.insert(Topic);
-  properties.insert(DeliveryGuarantee);
-  properties.insert(MaxMessageSize);
-  properties.insert(RequestTimeOut);
-  properties.insert(ClientName);
-  properties.insert(BatchSize);
-  properties.insert(QueueBufferMaxTime);
-  properties.insert(QueueBufferMaxSize);
-  properties.insert(QueueBufferMaxMessage);
-  properties.insert(CompressCodec);
-  properties.insert(MaxFlowSegSize);
-  properties.insert(SecurityProtocol);
-  properties.insert(SecurityCA);
-  properties.insert(SecurityCert);
-  properties.insert(SecurityPrivateKey);
-  properties.insert(SecurityPrivateKeyPassWord);
-  setSupportedProperties(properties);
-  // Set the supported relationships
-  std::set<core::Relationship> relationships;
-  relationships.insert(Failure);
-  relationships.insert(Success);
-  setSupportedRelationships(relationships);
-}
-
-void PutKafka::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
-  std::string value;
-  int64_t valInt;
-  std::string valueConf;
-  char errstr[512];
-  rd_kafka_conf_res_t result;
-
-  conf_ = rd_kafka_conf_new();
-  topic_conf_ = rd_kafka_topic_conf_new();
-
-  if (context->getProperty(SeedBrokers.getName(), value) && !value.empty()) {
-    result = rd_kafka_conf_set(conf_, "bootstrap.servers", value.c_str(), errstr, sizeof(errstr));
-    logger_->log_info("PutKafka: bootstrap.servers [%s]", value);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PutKafka: configure error result [%s]", errstr);
-  }
-  value = "";
-  if (context->getProperty(MaxMessageSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
-    valueConf = std::to_string(valInt);
-    result = rd_kafka_conf_set(conf_, "message.max.bytes", valueConf.c_str(), errstr, sizeof(errstr));
-    logger_->log_info("PutKafka: message.max.bytes [%s]", valueConf);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PutKafka: configure error result [%s]", errstr);
-  }
-  value = "";
-  if (context->getProperty(ClientName.getName(), value) && !value.empty()) {
-    rd_kafka_conf_set(conf_, "client.id", value.c_str(), errstr, sizeof(errstr));
-    logger_->log_info("PutKafka: client.id [%s]", value);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PutKafka: configure error result [%s]", errstr);
-  }
-  value = "";
-  if (context->getProperty(QueueBufferMaxMessage.getName(), value) && !value.empty()) {
-    rd_kafka_conf_set(conf_, "queue.buffering.max.messages", value.c_str(), errstr, sizeof(errstr));
-    logger_->log_info("PutKafka: queue.buffering.max.messages [%s]", value);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PutKafka: configure error result [%s]", errstr);
-  }
-  value = "";
-  if (context->getProperty(QueueBufferMaxSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
-      valInt = valInt/1024;
-      valueConf = std::to_string(valInt);
-      rd_kafka_conf_set(conf_, "queue.buffering.max.kbytes", valueConf.c_str(), errstr, sizeof(errstr));
-      logger_->log_info("PutKafka: queue.buffering.max.kbytes [%s]", valueConf);
-      if (result != RD_KAFKA_CONF_OK)
-        logger_->log_error("PutKafka: configure error result [%s]", errstr);
-  }
-  value = "";
-  max_seg_size_ = ULLONG_MAX;
-  if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
-    max_seg_size_ = valInt;
-    logger_->log_info("PutKafka: max flow segment size [%d]", max_seg_size_);
-  }
-  value = "";
-  if (context->getProperty(QueueBufferMaxTime.getName(), value) && !value.empty()) {
-    core::TimeUnit unit;
-    if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
-      valueConf = std::to_string(valInt);
-      rd_kafka_conf_set(conf_, "queue.buffering.max.ms", valueConf.c_str(), errstr, sizeof(errstr));
-      logger_->log_info("PutKafka: queue.buffering.max.ms [%s]", valueConf);
-      if (result != RD_KAFKA_CONF_OK)
-        logger_->log_error("PutKafka: configure error result [%s]", errstr);
-    }
-  }
-  value = "";
-  if (context->getProperty(BatchSize.getName(), value) && !value.empty()) {
-    rd_kafka_conf_set(conf_, "batch.num.messages", value.c_str(), errstr, sizeof(errstr));
-    logger_->log_info("PutKafka: batch.num.messages [%s]", value);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PutKafka: configure error result [%s]", errstr);
-  }
-  value = "";
-  if (context->getProperty(CompressCodec.getName(), value) && !value.empty()) {
-    rd_kafka_conf_set(conf_, "compression.codec", value.c_str(), errstr, sizeof(errstr));
-    logger_->log_info("PutKafka: compression.codec [%s]", value);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PutKafka: configure error result [%s]", errstr);
-  }
-  value = "";
-  if (context->getProperty(DeliveryGuarantee.getName(), value) && !value.empty()) {
-    rd_kafka_topic_conf_set(topic_conf_, "request.required.acks", value.c_str(), errstr, sizeof(errstr));
-    logger_->log_info("PutKafka: request.required.acks [%s]", value);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PutKafka: configure error result [%s]", errstr);
-  }
-  value = "";
-  if (context->getProperty(RequestTimeOut.getName(), value) && !value.empty()) {
-    core::TimeUnit unit;
-    if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
-      valueConf = std::to_string(valInt);
-      rd_kafka_topic_conf_set(topic_conf_, "request.timeout.ms", valueConf.c_str(), errstr, sizeof(errstr));
-      logger_->log_info("PutKafka: request.timeout.ms [%s]", valueConf);
-      if (result != RD_KAFKA_CONF_OK)
-        logger_->log_error("PutKafka: configure error result [%s]", errstr);
-    }
-  }
-  value = "";
-  if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) {
-    if (value == SECURITY_PROTOCOL_SSL) {
-      rd_kafka_conf_set(conf_, "security.protocol", value.c_str(), errstr, sizeof(errstr));
-      logger_->log_info("PutKafka: security.protocol [%s]", value);
-      if (result != RD_KAFKA_CONF_OK) {
-        logger_->log_error("PutKafka: configure error result [%s]", errstr);
-      } else {
-        value = "";
-        if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) {
-          rd_kafka_conf_set(conf_, "ssl.ca.location", value.c_str(), errstr, sizeof(errstr));
-          logger_->log_info("PutKafka: ssl.ca.location [%s]", value);
-          if (result != RD_KAFKA_CONF_OK)
-            logger_->log_error("PutKafka: configure error result [%s]", errstr);
-        }
-        value = "";
-        if (context->getProperty(SecurityCert.getName(), value) && !value.empty()) {
-          rd_kafka_conf_set(conf_, "ssl.certificate.location", value.c_str(), errstr, sizeof(errstr));
-          logger_->log_info("PutKafka: ssl.certificate.location [%s]", value);
-          if (result != RD_KAFKA_CONF_OK)
-            logger_->log_error("PutKafka: configure error result [%s]", errstr);
-        }
-        value = "";
-        if (context->getProperty(SecurityPrivateKey.getName(), value) && !value.empty()) {
-          rd_kafka_conf_set(conf_, "ssl.key.location", value.c_str(), errstr, sizeof(errstr));
-          logger_->log_info("PutKafka: ssl.key.location [%s]", value);
-          if (result != RD_KAFKA_CONF_OK)
-            logger_->log_error("PutKafka: configure error result [%s]", errstr);
-        }
-        value = "";
-        if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) && !value.empty()) {
-          rd_kafka_conf_set(conf_, "ssl.key.password", value.c_str(), errstr, sizeof(errstr));
-          logger_->log_info("PutKafka: ssl.key.password [%s]", value);
-          if (result != RD_KAFKA_CONF_OK)
-            logger_->log_error("PutKafka: configure error result [%s]", errstr);
-        }
-      }
-    }
-  }
-  value = "";
-  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
-    topic_ = value;
-    logger_->log_info("PutKafka: topic [%s]", topic_);
-  } else {
-    logger_->log_info("PutKafka: topic not configured");
-    return;
-  }
-
-  rk_= rd_kafka_new(RD_KAFKA_PRODUCER, conf_,
-            errstr, sizeof(errstr));
-
-  if (!rk_) {
-    logger_->log_error("Failed to create kafak producer %s", errstr);
-    return;
-  }
-
-  rkt_ = rd_kafka_topic_new(rk_, topic_.c_str(), topic_conf_);
-
-  if (!rkt_) {
-    logger_->log_error("Failed to create topic %s", errstr);
-    return;
-  }
-
-}
-
-void PutKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  std::shared_ptr<core::FlowFile> flowFile = session->get();
-
-  if (!flowFile) {
-    return;
-  }
-
-  if (!rk_ || !rkt_) {
-    session->transfer(flowFile, Failure);
-    return;
-  }
-
-  PutKafka::ReadCallback callback(flowFile->getSize(), max_seg_size_, flowFile->getUUIDStr(), rkt_);
-  session->read(flowFile, &callback);
-  if (callback.status_ < 0) {
-    logger_->log_error("Failed to send flow to kafka topic %s", topic_);
-    session->transfer(flowFile, Failure);
-  } else {
-    logger_->log_debug("Sent flow with length %d to kafka topic %s", callback.read_size_, topic_);
-    session->transfer(flowFile, Success);
-  }
-}
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0756097a/extensions/librdkafka/PutKafka.h
----------------------------------------------------------------------
diff --git a/extensions/librdkafka/PutKafka.h b/extensions/librdkafka/PutKafka.h
deleted file mode 100644
index 0ef372e..0000000
--- a/extensions/librdkafka/PutKafka.h
+++ /dev/null
@@ -1,179 +0,0 @@
-/**
- * @file PutKafka.h
- * PutKafka class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __PUT_KAFKA_H__
-#define __PUT_KAFKA_H__
-
-#include "FlowFileRecord.h"
-#include "core/Processor.h"
-#include "core/ProcessSession.h"
-#include "core/Core.h"
-#include "core/Resource.h"
-#include "core/Property.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "rdkafka.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-#define COMPRESSION_CODEC_NONE "none"
-#define COMPRESSION_CODEC_GZIP "gzip"
-#define COMPRESSION_CODEC_SNAPPY "snappy"
-#define ROUND_ROBIN_PARTITIONING "Round Robin"
-#define RANDOM_PARTITIONING "Random Robin"
-#define USER_DEFINED_PARTITIONING "User-Defined"
-#define DELIVERY_REPLICATED "all"
-#define DELIVERY_ONE_NODE "1"
-#define DELIVERY_BEST_EFFORT "0"
-#define SECURITY_PROTOCOL_PLAINTEXT "plaintext"
-#define SECURITY_PROTOCOL_SSL "ssl"
-#define SECURITY_PROTOCOL_SASL_PLAINTEXT "sasl_plaintext"
-#define SECURITY_PROTOCOL_SASL_SSL "sasl_ssl"
-
-// PutKafka Class
-class PutKafka: public core::Processor {
-public:
-  // Constructor
-  /*!
-   * Create a new processor
-   */
-  explicit PutKafka(std::string name, uuid_t uuid = NULL) :
-      core::Processor(name, uuid), logger_(logging::LoggerFactory<PutKafka>::getLogger()) {
-    conf_ = nullptr;
-    rk_ = nullptr;
-    topic_conf_ = nullptr;
-    rkt_ = nullptr;
-  }
-  // Destructor
-  virtual ~PutKafka() {
-    if (rk_)
-      rd_kafka_flush(rk_, 10*1000); /* wait for max 10 seconds */
-    if (rkt_)
-      rd_kafka_topic_destroy(rkt_);
-    if (rk_)
-      rd_kafka_destroy(rk_);
-  }
-  // Processor Name
-  static constexpr char const* ProcessorName = "PutKafka";
-  // Supported Properties
-  static core::Property SeedBrokers;
-  static core::Property Topic;
-  static core::Property DeliveryGuarantee;
-  static core::Property MaxMessageSize;
-  static core::Property RequestTimeOut;
-  static core::Property ClientName;
-  static core::Property BatchSize;
-  static core::Property QueueBufferMaxTime;
-  static core::Property QueueBufferMaxSize;
-  static core::Property QueueBufferMaxMessage;
-  static core::Property CompressCodec;
-  static core::Property MaxFlowSegSize;
-  static core::Property SecurityProtocol;
-  static core::Property SecurityCA;
-  static core::Property SecurityCert;
-  static core::Property SecurityPrivateKey;
-  static core::Property SecurityPrivateKeyPassWord;
-
-  // Supported Relationships
-  static core::Relationship Failure;
-  static core::Relationship Success;
-
-  // Nest Callback Class for read stream
-  class ReadCallback: public InputStreamCallback {
-  public:
-    ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const std::string &key, rd_kafka_topic_t *rkt) :
-        flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), rkt_(rkt) {
-      status_ = 0;
-      read_size_ = 0;
-    }
-    ~ReadCallback() {
-    }
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
-      if (flow_size_ < max_seg_size_)
-        max_seg_size_ = flow_size_;
-      std::vector<unsigned char> buffer;
-      buffer.reserve(max_seg_size_);
-      read_size_ = 0;
-      status_ = 0;
-      while (read_size_ < flow_size_) {
-        int readRet = stream->read(&buffer[0], max_seg_size_);
-        if (readRet < 0) {
-          status_ = -1;
-          return read_size_;
-        }
-        if (readRet > 0) {
-          if (rd_kafka_produce(rkt_, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, &buffer[0], readRet, key_.c_str(), key_.size(), NULL) == -1) {
-            status_ = -1;
-            return read_size_;
-          }
-          read_size_ += readRet;
-        } else {
-          break;
-        }
-      }
-      return read_size_;
-    }
-    uint64_t flow_size_;
-    uint64_t max_seg_size_;
-    std::string key_;
-    rd_kafka_topic_t *rkt_;
-    int status_;
-    int read_size_;
-  };
-
-public:
-  /**
-   * Function that's executed when the processor is scheduled.
-   * @param context process context.
-   * @param sessionFactory process session factory that is used when creating
-   * ProcessSession objects.
-   */
-  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
-  // OnTrigger method, implemented by NiFi PutKafka
-  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
-  }
-  // OnTrigger method, implemented by NiFi PutKafka
-  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
-  // Initialize, over write by NiFi PutKafka
-  virtual void initialize(void);
-
-protected:
-
-private:
-  std::shared_ptr<logging::Logger> logger_;
-  rd_kafka_conf_t *conf_;
-  rd_kafka_t *rk_;
-  rd_kafka_topic_conf_t *topic_conf_;
-  rd_kafka_topic_t *rkt_;
-  std::string topic_;
-  uint64_t max_seg_size_;
-};
-
-REGISTER_RESOURCE (PutKafka);
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0756097a/extensions/librdkafka/RdKafkaLoader.h
----------------------------------------------------------------------
diff --git a/extensions/librdkafka/RdKafkaLoader.h b/extensions/librdkafka/RdKafkaLoader.h
index 2da0b57..ea650fc 100644
--- a/extensions/librdkafka/RdKafkaLoader.h
+++ b/extensions/librdkafka/RdKafkaLoader.h
@@ -18,7 +18,7 @@
 #ifndef EXTENSION_RDKAFKALOADER_H
 #define EXTENSION_RDKAFKALOADER_H
 
-#include "PutKafka.h"
+#include "PublishKafka.h"
 #include "core/ClassLoader.h"
 
class __attribute__((visibility("default"))) RdKafkaFactory : public core::ObjectFactory {
@@ -44,13 +44,13 @@ class __attribute__((visibility("default"))) RdKafkaFactory : public core::Objec
    */
   virtual std::vector<std::string> getClassNames() {
     std::vector<std::string> class_names;
-    class_names.push_back("PutKafka");
+    class_names.push_back("PublishKafka");
     return class_names;
   }
 
  virtual std::unique_ptr<ObjectFactory> assign(const std::string &class_name) {
-    if (utils::StringUtils::equalsIgnoreCase(class_name, "PutKafka")) {
-      return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::processors::PutKafka>());
+    if (utils::StringUtils::equalsIgnoreCase(class_name, "PublishKafka")) {
+      return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::processors::PublishKafka>());
     } else {
       return nullptr;
     }
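
The factory change above is what makes the rename visible to flow configurations: the class name is resolved case-insensitively at load time, and the old name no longer maps to anything. A hypothetical caller-side sketch (the surrounding class-loader setup is assumed, not shown in this diff):

    // After this commit, only the new name resolves through the factory.
    RdKafkaFactory factory;
    auto publish = factory.assign("PublishKafka");  // returns a non-null ObjectFactory
    auto legacy = factory.assign("PutKafka");       // now returns nullptr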
