Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 77f32f10e -> 0756097aa
MINIFICPP-327: Rename PutKafka to PublishKafka This closes #209. Signed-off-by: Marc Parisi <phroc...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/0756097a Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/0756097a Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/0756097a Branch: refs/heads/master Commit: 0756097aacef069ef61990309d9456a8d0ee7790 Parents: 77f32f1 Author: Bin Qiu <benqiu2...@gmail.com> Authored: Tue Nov 28 07:25:15 2017 -0800 Committer: Marc Parisi <phroc...@apache.org> Committed: Tue Nov 28 14:05:48 2017 -0500 ---------------------------------------------------------------------- CMakeLists.txt | 2 +- README.md | 2 +- extensions/librdkafka/PublishKafka.cpp | 280 ++++++++++++++++++++++++++++ extensions/librdkafka/PublishKafka.h | 180 ++++++++++++++++++ extensions/librdkafka/PutKafka.cpp | 274 --------------------------- extensions/librdkafka/PutKafka.h | 179 ------------------ extensions/librdkafka/RdKafkaLoader.h | 8 +- 7 files changed, 466 insertions(+), 459 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0756097a/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index e2c68f5..59253eb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -150,7 +150,7 @@ endif(ENABLE_PCAP) ## Create LibRdKafka Extension option(ENABLE_LIBRDKAFKA "Enables the librdkafka extension." OFF) if (ENABLE_LIBRDKAFKA) - createExtension(RDKAFKA-EXTENSIONS "RDKAFKA EXTENSIONS" "This Enables librdkafka functionality including PutKafka" "extensions/librdkafka" "${TEST_DIR}/kafka-tests" "TRUE" "thirdparty/librdkafka-0.11.1") + createExtension(RDKAFKA-EXTENSIONS "RDKAFKA EXTENSIONS" "This Enables librdkafka functionality including PublishKafka" "extensions/librdkafka" "${TEST_DIR}/kafka-tests" "TRUE" "thirdparty/librdkafka-0.11.1") endif() ## Scripting extensions http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0756097a/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index a21306c..ea5e45f 100644 --- a/README.md +++ b/README.md @@ -63,7 +63,7 @@ Perspectives of the role of MiNiFi should be from the perspective of the agent a * FocusArchive * UnfocusArchive * ManipulateArchive - * PutKafka + * PublishKafka * Provenance events generation is supported and are persisted using RocksDB. Volatile repositories can be used on systems without persistent storage. ## System Requirements http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0756097a/extensions/librdkafka/PublishKafka.cpp ---------------------------------------------------------------------- diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp new file mode 100644 index 0000000..dc99a80 --- /dev/null +++ b/extensions/librdkafka/PublishKafka.cpp @@ -0,0 +1,280 @@ +/** + * @file PublishKafka.cpp + * PublishKafka class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "PublishKafka.h" +#include <stdio.h> +#include <algorithm> +#include <memory> +#include <string> +#include <map> +#include <set> +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property PublishKafka::SeedBrokers("Known Brokers", "A comma-separated list of known Kafka Brokers in the format <host>:<port>", ""); +core::Property PublishKafka::Topic("Topic Name", "The Kafka Topic of interest", ""); +core::Property PublishKafka::DeliveryGuarantee("Delivery Guarantee", "Specifies the requirement for guaranteeing that a message is sent to Kafka", DELIVERY_ONE_NODE); +core::Property PublishKafka::MaxMessageSize("Max Request Size", "Maximum Kafka protocol request message size", ""); +core::Property PublishKafka::RequestTimeOut("Request Timeout", "The ack timeout of the producer request in milliseconds", ""); +core::Property PublishKafka::ClientName("Client Name", "Client Name to use when communicating with Kafka", ""); +core::Property PublishKafka::BatchSize("Batch Size", "Maximum number of messages batched in one MessageSet", ""); +core::Property PublishKafka::QueueBufferMaxTime("Queue Buffering Max Time", "Delay to wait for messages in the producer queue to accumulate before constructing message batches", ""); +core::Property PublishKafka::QueueBufferMaxSize("Queue Max Buffer Size", "Maximum total message size sum allowed on the producer queue", ""); +core::Property PublishKafka::QueueBufferMaxMessage("Queue Max Message", "Maximum number of messages allowed on the producer queue", ""); +core::Property PublishKafka::CompressCodec("Compress Codec", "compression codec to use for compressing message sets", COMPRESSION_CODEC_NONE); +core::Property PublishKafka::MaxFlowSegSize("Max Flow Segment Size", "Maximum flow content payload segment size for the kafka record", ""); +core::Property PublishKafka::SecurityProtocol("Security Protocol", "Protocol used to communicate with brokers", ""); +core::Property PublishKafka::SecurityCA("Security CA", "File or directory path to CA certificate(s) for verifying the broker's key", ""); +core::Property PublishKafka::SecurityCert("Security Cert", "Path to client's public key (PEM) used for authentication", ""); +core::Property PublishKafka::SecurityPrivateKey("Security Private Key", "Path to client's private key (PEM) used for authentication", ""); +core::Property PublishKafka::SecurityPrivateKeyPassWord("Security Pass Phrase", "Private key passphrase", ""); +core::Relationship PublishKafka::Success("success", "Any FlowFile that is successfully sent to Kafka will be routed to this Relationship"); +core::Relationship PublishKafka::Failure("failure", "Any FlowFile that cannot be sent to Kafka will be routed to this Relationship"); + +void PublishKafka::initialize() { + // Set the supported properties + std::set<core::Property> properties; + properties.insert(SeedBrokers); + properties.insert(Topic); + properties.insert(DeliveryGuarantee); + properties.insert(MaxMessageSize); + properties.insert(RequestTimeOut); + properties.insert(ClientName); + properties.insert(BatchSize); + properties.insert(QueueBufferMaxTime); + properties.insert(QueueBufferMaxSize); + properties.insert(QueueBufferMaxMessage); + properties.insert(CompressCodec); + properties.insert(MaxFlowSegSize); + properties.insert(SecurityProtocol); + properties.insert(SecurityCA); + properties.insert(SecurityCert); + properties.insert(SecurityPrivateKey); + properties.insert(SecurityPrivateKeyPassWord); + setSupportedProperties(properties); + // Set the supported relationships + std::set<core::Relationship> relationships; + relationships.insert(Failure); + relationships.insert(Success); + setSupportedRelationships(relationships); +} + +void PublishKafka::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + std::string value; + int64_t valInt; + std::string valueConf; + char errstr[512]; + rd_kafka_conf_res_t result; + + conf_ = rd_kafka_conf_new(); + topic_conf_ = rd_kafka_topic_conf_new(); + + if (context->getProperty(SeedBrokers.getName(), value) && !value.empty()) { + result = rd_kafka_conf_set(conf_, "bootstrap.servers", value.c_str(), errstr, sizeof(errstr)); + logger_->log_info("PublishKafka: bootstrap.servers [%s]", value); + if (result != RD_KAFKA_CONF_OK) + logger_->log_error("PublishKafka: configure error result [%s]", errstr); + } + value = ""; + if (context->getProperty(MaxMessageSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) { + valueConf = std::to_string(valInt); + result = rd_kafka_conf_set(conf_, "message.max.bytes", valueConf.c_str(), errstr, sizeof(errstr)); + logger_->log_info("PublishKafka: message.max.bytes [%s]", valueConf); + if (result != RD_KAFKA_CONF_OK) + logger_->log_error("PublishKafka: configure error result [%s]", errstr); + } + value = ""; + if (context->getProperty(ClientName.getName(), value) && !value.empty()) { + rd_kafka_conf_set(conf_, "client.id", value.c_str(), errstr, sizeof(errstr)); + logger_->log_info("PublishKafka: client.id [%s]", value); + if (result != RD_KAFKA_CONF_OK) + logger_->log_error("PublishKafka: configure error result [%s]", errstr); + } + value = ""; + if (context->getProperty(QueueBufferMaxMessage.getName(), value) && !value.empty()) { + rd_kafka_conf_set(conf_, "queue.buffering.max.messages", value.c_str(), errstr, sizeof(errstr)); + logger_->log_info("PublishKafka: queue.buffering.max.messages [%s]", value); + if (result != RD_KAFKA_CONF_OK) + logger_->log_error("PublishKafka: configure error result [%s]", errstr); + } + value = ""; + if (context->getProperty(QueueBufferMaxSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) { + valInt = valInt/1024; + valueConf = std::to_string(valInt); + rd_kafka_conf_set(conf_, "queue.buffering.max.kbytes", valueConf.c_str(), errstr, sizeof(errstr)); + logger_->log_info("PublishKafka: queue.buffering.max.kbytes [%s]", valueConf); + if (result != RD_KAFKA_CONF_OK) + logger_->log_error("PublishKafka: configure error result [%s]", errstr); + } + value = ""; + max_seg_size_ = ULLONG_MAX; + if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) { + max_seg_size_ = valInt; + logger_->log_info("PublishKafka: max flow segment size [%d]", max_seg_size_); + } + value = ""; + if (context->getProperty(QueueBufferMaxTime.getName(), value) && !value.empty()) { + core::TimeUnit unit; + if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) { + valueConf = std::to_string(valInt); + rd_kafka_conf_set(conf_, "queue.buffering.max.ms", valueConf.c_str(), errstr, sizeof(errstr)); + logger_->log_info("PublishKafka: queue.buffering.max.ms [%s]", valueConf); + if (result != RD_KAFKA_CONF_OK) + logger_->log_error("PublishKafka: configure error result [%s]", errstr); + } + } + value = ""; + if (context->getProperty(BatchSize.getName(), value) && !value.empty()) { + rd_kafka_conf_set(conf_, "batch.num.messages", value.c_str(), errstr, sizeof(errstr)); + logger_->log_info("PublishKafka: batch.num.messages [%s]", value); + if (result != RD_KAFKA_CONF_OK) + logger_->log_error("PublishKafka: configure error result [%s]", errstr); + } + value = ""; + if (context->getProperty(CompressCodec.getName(), value) && !value.empty()) { + rd_kafka_conf_set(conf_, "compression.codec", value.c_str(), errstr, sizeof(errstr)); + logger_->log_info("PublishKafka: compression.codec [%s]", value); + if (result != RD_KAFKA_CONF_OK) + logger_->log_error("PublishKafka: configure error result [%s]", errstr); + } + value = ""; + if (context->getProperty(DeliveryGuarantee.getName(), value) && !value.empty()) { + rd_kafka_topic_conf_set(topic_conf_, "request.required.acks", value.c_str(), errstr, sizeof(errstr)); + logger_->log_info("PublishKafka: request.required.acks [%s]", value); + if (result != RD_KAFKA_CONF_OK) + logger_->log_error("PublishKafka: configure error result [%s]", errstr); + } + value = ""; + if (context->getProperty(RequestTimeOut.getName(), value) && !value.empty()) { + core::TimeUnit unit; + if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) { + valueConf = std::to_string(valInt); + rd_kafka_topic_conf_set(topic_conf_, "request.timeout.ms", valueConf.c_str(), errstr, sizeof(errstr)); + logger_->log_info("PublishKafka: request.timeout.ms [%s]", valueConf); + if (result != RD_KAFKA_CONF_OK) + logger_->log_error("PublishKafka: configure error result [%s]", errstr); + } + } + value = ""; + if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) { + if (value == SECURITY_PROTOCOL_SSL) { + rd_kafka_conf_set(conf_, "security.protocol", value.c_str(), errstr, sizeof(errstr)); + logger_->log_info("PublishKafka: security.protocol [%s]", value); + if (result != RD_KAFKA_CONF_OK) { + logger_->log_error("PublishKafka: configure error result [%s]", errstr); + } else { + value = ""; + if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) { + rd_kafka_conf_set(conf_, "ssl.ca.location", value.c_str(), errstr, sizeof(errstr)); + logger_->log_info("PublishKafka: ssl.ca.location [%s]", value); + if (result != RD_KAFKA_CONF_OK) + logger_->log_error("PublishKafka: configure error result [%s]", errstr); + } + value = ""; + if (context->getProperty(SecurityCert.getName(), value) && !value.empty()) { + rd_kafka_conf_set(conf_, "ssl.certificate.location", value.c_str(), errstr, sizeof(errstr)); + logger_->log_info("PublishKafka: ssl.certificate.location [%s]", value); + if (result != RD_KAFKA_CONF_OK) + logger_->log_error("PublishKafka: configure error result [%s]", errstr); + } + value = ""; + if (context->getProperty(SecurityPrivateKey.getName(), value) && !value.empty()) { + rd_kafka_conf_set(conf_, "ssl.key.location", value.c_str(), errstr, sizeof(errstr)); + logger_->log_info("PublishKafka: ssl.key.location [%s]", value); + if (result != RD_KAFKA_CONF_OK) + logger_->log_error("PublishKafka: configure error result [%s]", errstr); + } + value = ""; + if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) && !value.empty()) { + rd_kafka_conf_set(conf_, "ssl.key.password", value.c_str(), errstr, sizeof(errstr)); + logger_->log_info("PublishKafka: ssl.key.password [%s]", value); + if (result != RD_KAFKA_CONF_OK) + logger_->log_error("PublishKafka: configure error result [%s]", errstr); + } + } + } + } + value = ""; + if (context->getProperty(Topic.getName(), value) && !value.empty()) { + topic_ = value; + logger_->log_info("PublishKafka: topic [%s]", topic_); + } else { + logger_->log_info("PublishKafka: topic not configured"); + return; + } + + rk_= rd_kafka_new(RD_KAFKA_PRODUCER, conf_, + errstr, sizeof(errstr)); + + if (!rk_) { + logger_->log_error("Failed to create kafak producer %s", errstr); + return; + } + + rkt_ = rd_kafka_topic_new(rk_, topic_.c_str(), topic_conf_); + + if (!rkt_) { + logger_->log_error("Failed to create topic %s", errstr); + return; + } + +} + +void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { + std::shared_ptr<core::FlowFile> flowFile = session->get(); + + if (!flowFile) { + return; + } + + if (!rk_ || !rkt_) { + session->transfer(flowFile, Failure); + return; + } + + std::string kafkaKey = flowFile->getUUIDStr();; + std::string value; + + if (flowFile->getAttribute(KAFKA_KEY_ATTRIBUTE, value)) + kafkaKey = value; + + PublishKafka::ReadCallback callback(flowFile->getSize(), max_seg_size_, kafkaKey, rkt_); + session->read(flowFile, &callback); + if (callback.status_ < 0) { + logger_->log_error("Failed to send flow to kafka topic %s", topic_); + session->transfer(flowFile, Failure); + } else { + logger_->log_debug("Sent flow with length %d to kafka topic %s", callback.read_size_, topic_); + session->transfer(flowFile, Success); + } +} + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0756097a/extensions/librdkafka/PublishKafka.h ---------------------------------------------------------------------- diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h new file mode 100644 index 0000000..65ed849 --- /dev/null +++ b/extensions/librdkafka/PublishKafka.h @@ -0,0 +1,180 @@ +/** + * @file PublishKafka.h + * PublishKafka class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __PUT_KAFKA_H__ +#define __PUT_KAFKA_H__ + +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/Property.h" +#include "core/logging/LoggerConfiguration.h" +#include "rdkafka.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +#define COMPRESSION_CODEC_NONE "none" +#define COMPRESSION_CODEC_GZIP "gzip" +#define COMPRESSION_CODEC_SNAPPY "snappy" +#define ROUND_ROBIN_PARTITIONING "Round Robin" +#define RANDOM_PARTITIONING "Random Robin" +#define USER_DEFINED_PARTITIONING "User-Defined" +#define DELIVERY_REPLICATED "all" +#define DELIVERY_ONE_NODE "1" +#define DELIVERY_BEST_EFFORT "0" +#define SECURITY_PROTOCOL_PLAINTEXT "plaintext" +#define SECURITY_PROTOCOL_SSL "ssl" +#define SECURITY_PROTOCOL_SASL_PLAINTEXT "sasl_plaintext" +#define SECURITY_PROTOCOL_SASL_SSL "sasl_ssl" +#define KAFKA_KEY_ATTRIBUTE "kafka.key" + +// PublishKafka Class +class PublishKafka: public core::Processor { +public: + // Constructor + /*! + * Create a new processor + */ + explicit PublishKafka(std::string name, uuid_t uuid = NULL) : + core::Processor(name, uuid), logger_(logging::LoggerFactory<PublishKafka>::getLogger()) { + conf_ = nullptr; + rk_ = nullptr; + topic_conf_ = nullptr; + rkt_ = nullptr; + } + // Destructor + virtual ~PublishKafka() { + if (rk_) + rd_kafka_flush(rk_, 10*1000); /* wait for max 10 seconds */ + if (rkt_) + rd_kafka_topic_destroy(rkt_); + if (rk_) + rd_kafka_destroy(rk_); + } + // Processor Name + static constexpr char const* ProcessorName = "PublishKafka"; + // Supported Properties + static core::Property SeedBrokers; + static core::Property Topic; + static core::Property DeliveryGuarantee; + static core::Property MaxMessageSize; + static core::Property RequestTimeOut; + static core::Property ClientName; + static core::Property BatchSize; + static core::Property QueueBufferMaxTime; + static core::Property QueueBufferMaxSize; + static core::Property QueueBufferMaxMessage; + static core::Property CompressCodec; + static core::Property MaxFlowSegSize; + static core::Property SecurityProtocol; + static core::Property SecurityCA; + static core::Property SecurityCert; + static core::Property SecurityPrivateKey; + static core::Property SecurityPrivateKeyPassWord; + + // Supported Relationships + static core::Relationship Failure; + static core::Relationship Success; + + // Nest Callback Class for read stream + class ReadCallback: public InputStreamCallback { + public: + ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const std::string &key, rd_kafka_topic_t *rkt) : + flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), rkt_(rkt) { + status_ = 0; + read_size_ = 0; + } + ~ReadCallback() { + } + int64_t process(std::shared_ptr<io::BaseStream> stream) { + if (flow_size_ < max_seg_size_) + max_seg_size_ = flow_size_; + std::vector<unsigned char> buffer; + buffer.reserve(max_seg_size_); + read_size_ = 0; + status_ = 0; + while (read_size_ < flow_size_) { + int readRet = stream->read(&buffer[0], max_seg_size_); + if (readRet < 0) { + status_ = -1; + return read_size_; + } + if (readRet > 0) { + if (rd_kafka_produce(rkt_, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, &buffer[0], readRet, key_.c_str(), key_.size(), NULL) == -1) { + status_ = -1; + return read_size_; + } + read_size_ += readRet; + } else { + break; + } + } + return read_size_; + } + uint64_t flow_size_; + uint64_t max_seg_size_; + std::string key_; + rd_kafka_topic_t *rkt_; + int status_; + int read_size_; + }; + +public: + /** + * Function that's executed when the processor is scheduled. + * @param context process context. + * @param sessionFactory process session factory that is used when creating + * ProcessSession objects. + */ + void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); + // OnTrigger method, implemented by NiFi PublishKafka + virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session) { + } + // OnTrigger method, implemented by NiFi PublishKafka + virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session); + // Initialize, over write by NiFi PublishKafka + virtual void initialize(void); + +protected: + +private: + std::shared_ptr<logging::Logger> logger_; + rd_kafka_conf_t *conf_; + rd_kafka_t *rk_; + rd_kafka_topic_conf_t *topic_conf_; + rd_kafka_topic_t *rkt_; + std::string topic_; + uint64_t max_seg_size_; +}; + +REGISTER_RESOURCE (PublishKafka); + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0756097a/extensions/librdkafka/PutKafka.cpp ---------------------------------------------------------------------- diff --git a/extensions/librdkafka/PutKafka.cpp b/extensions/librdkafka/PutKafka.cpp deleted file mode 100644 index ed7391e..0000000 --- a/extensions/librdkafka/PutKafka.cpp +++ /dev/null @@ -1,274 +0,0 @@ -/** - * @file PutKafka.cpp - * PutKafka class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "PutKafka.h" -#include <stdio.h> -#include <algorithm> -#include <memory> -#include <string> -#include <map> -#include <set> -#include "utils/TimeUtil.h" -#include "utils/StringUtils.h" -#include "core/ProcessContext.h" -#include "core/ProcessSession.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace processors { - -core::Property PutKafka::SeedBrokers("Known Brokers", "A comma-separated list of known Kafka Brokers in the format <host>:<port>", ""); -core::Property PutKafka::Topic("Topic Name", "The Kafka Topic of interest", ""); -core::Property PutKafka::DeliveryGuarantee("Delivery Guarantee", "Specifies the requirement for guaranteeing that a message is sent to Kafka", DELIVERY_ONE_NODE); -core::Property PutKafka::MaxMessageSize("Max Request Size", "Maximum Kafka protocol request message size", ""); -core::Property PutKafka::RequestTimeOut("Request Timeout", "The ack timeout of the producer request in milliseconds", ""); -core::Property PutKafka::ClientName("Client Name", "Client Name to use when communicating with Kafka", ""); -core::Property PutKafka::BatchSize("Batch Size", "Maximum number of messages batched in one MessageSet", ""); -core::Property PutKafka::QueueBufferMaxTime("Queue Buffering Max Time", "Delay to wait for messages in the producer queue to accumulate before constructing message batches", ""); -core::Property PutKafka::QueueBufferMaxSize("Queue Max Buffer Size", "Maximum total message size sum allowed on the producer queue", ""); -core::Property PutKafka::QueueBufferMaxMessage("Queue Max Message", "Maximum number of messages allowed on the producer queue", ""); -core::Property PutKafka::CompressCodec("Compress Codec", "compression codec to use for compressing message sets", COMPRESSION_CODEC_NONE); -core::Property PutKafka::MaxFlowSegSize("Max Flow Segment Size", "Maximum flow content payload segment size for the kafka record", ""); -core::Property PutKafka::SecurityProtocol("Security Protocol", "Protocol used to communicate with brokers", ""); -core::Property PutKafka::SecurityCA("Security CA", "File or directory path to CA certificate(s) for verifying the broker's key", ""); -core::Property PutKafka::SecurityCert("Security Cert", "Path to client's public key (PEM) used for authentication", ""); -core::Property PutKafka::SecurityPrivateKey("Security Private Key", "Path to client's private key (PEM) used for authentication", ""); -core::Property PutKafka::SecurityPrivateKeyPassWord("Security Pass Phrase", "Private key passphrase", ""); -core::Relationship PutKafka::Success("success", "Any FlowFile that is successfully sent to Kafka will be routed to this Relationship"); -core::Relationship PutKafka::Failure("failure", "Any FlowFile that cannot be sent to Kafka will be routed to this Relationship"); - -void PutKafka::initialize() { - // Set the supported properties - std::set<core::Property> properties; - properties.insert(SeedBrokers); - properties.insert(Topic); - properties.insert(DeliveryGuarantee); - properties.insert(MaxMessageSize); - properties.insert(RequestTimeOut); - properties.insert(ClientName); - properties.insert(BatchSize); - properties.insert(QueueBufferMaxTime); - properties.insert(QueueBufferMaxSize); - properties.insert(QueueBufferMaxMessage); - properties.insert(CompressCodec); - properties.insert(MaxFlowSegSize); - properties.insert(SecurityProtocol); - properties.insert(SecurityCA); - properties.insert(SecurityCert); - properties.insert(SecurityPrivateKey); - properties.insert(SecurityPrivateKeyPassWord); - setSupportedProperties(properties); - // Set the supported relationships - std::set<core::Relationship> relationships; - relationships.insert(Failure); - relationships.insert(Success); - setSupportedRelationships(relationships); -} - -void PutKafka::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { - std::string value; - int64_t valInt; - std::string valueConf; - char errstr[512]; - rd_kafka_conf_res_t result; - - conf_ = rd_kafka_conf_new(); - topic_conf_ = rd_kafka_topic_conf_new(); - - if (context->getProperty(SeedBrokers.getName(), value) && !value.empty()) { - result = rd_kafka_conf_set(conf_, "bootstrap.servers", value.c_str(), errstr, sizeof(errstr)); - logger_->log_info("PutKafka: bootstrap.servers [%s]", value); - if (result != RD_KAFKA_CONF_OK) - logger_->log_error("PutKafka: configure error result [%s]", errstr); - } - value = ""; - if (context->getProperty(MaxMessageSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) { - valueConf = std::to_string(valInt); - result = rd_kafka_conf_set(conf_, "message.max.bytes", valueConf.c_str(), errstr, sizeof(errstr)); - logger_->log_info("PutKafka: message.max.bytes [%s]", valueConf); - if (result != RD_KAFKA_CONF_OK) - logger_->log_error("PutKafka: configure error result [%s]", errstr); - } - value = ""; - if (context->getProperty(ClientName.getName(), value) && !value.empty()) { - rd_kafka_conf_set(conf_, "client.id", value.c_str(), errstr, sizeof(errstr)); - logger_->log_info("PutKafka: client.id [%s]", value); - if (result != RD_KAFKA_CONF_OK) - logger_->log_error("PutKafka: configure error result [%s]", errstr); - } - value = ""; - if (context->getProperty(QueueBufferMaxMessage.getName(), value) && !value.empty()) { - rd_kafka_conf_set(conf_, "queue.buffering.max.messages", value.c_str(), errstr, sizeof(errstr)); - logger_->log_info("PutKafka: queue.buffering.max.messages [%s]", value); - if (result != RD_KAFKA_CONF_OK) - logger_->log_error("PutKafka: configure error result [%s]", errstr); - } - value = ""; - if (context->getProperty(QueueBufferMaxSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) { - valInt = valInt/1024; - valueConf = std::to_string(valInt); - rd_kafka_conf_set(conf_, "queue.buffering.max.kbytes", valueConf.c_str(), errstr, sizeof(errstr)); - logger_->log_info("PutKafka: queue.buffering.max.kbytes [%s]", valueConf); - if (result != RD_KAFKA_CONF_OK) - logger_->log_error("PutKafka: configure error result [%s]", errstr); - } - value = ""; - max_seg_size_ = ULLONG_MAX; - if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) { - max_seg_size_ = valInt; - logger_->log_info("PutKafka: max flow segment size [%d]", max_seg_size_); - } - value = ""; - if (context->getProperty(QueueBufferMaxTime.getName(), value) && !value.empty()) { - core::TimeUnit unit; - if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) { - valueConf = std::to_string(valInt); - rd_kafka_conf_set(conf_, "queue.buffering.max.ms", valueConf.c_str(), errstr, sizeof(errstr)); - logger_->log_info("PutKafka: queue.buffering.max.ms [%s]", valueConf); - if (result != RD_KAFKA_CONF_OK) - logger_->log_error("PutKafka: configure error result [%s]", errstr); - } - } - value = ""; - if (context->getProperty(BatchSize.getName(), value) && !value.empty()) { - rd_kafka_conf_set(conf_, "batch.num.messages", value.c_str(), errstr, sizeof(errstr)); - logger_->log_info("PutKafka: batch.num.messages [%s]", value); - if (result != RD_KAFKA_CONF_OK) - logger_->log_error("PutKafka: configure error result [%s]", errstr); - } - value = ""; - if (context->getProperty(CompressCodec.getName(), value) && !value.empty()) { - rd_kafka_conf_set(conf_, "compression.codec", value.c_str(), errstr, sizeof(errstr)); - logger_->log_info("PutKafka: compression.codec [%s]", value); - if (result != RD_KAFKA_CONF_OK) - logger_->log_error("PutKafka: configure error result [%s]", errstr); - } - value = ""; - if (context->getProperty(DeliveryGuarantee.getName(), value) && !value.empty()) { - rd_kafka_topic_conf_set(topic_conf_, "request.required.acks", value.c_str(), errstr, sizeof(errstr)); - logger_->log_info("PutKafka: request.required.acks [%s]", value); - if (result != RD_KAFKA_CONF_OK) - logger_->log_error("PutKafka: configure error result [%s]", errstr); - } - value = ""; - if (context->getProperty(RequestTimeOut.getName(), value) && !value.empty()) { - core::TimeUnit unit; - if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) { - valueConf = std::to_string(valInt); - rd_kafka_topic_conf_set(topic_conf_, "request.timeout.ms", valueConf.c_str(), errstr, sizeof(errstr)); - logger_->log_info("PutKafka: request.timeout.ms [%s]", valueConf); - if (result != RD_KAFKA_CONF_OK) - logger_->log_error("PutKafka: configure error result [%s]", errstr); - } - } - value = ""; - if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) { - if (value == SECURITY_PROTOCOL_SSL) { - rd_kafka_conf_set(conf_, "security.protocol", value.c_str(), errstr, sizeof(errstr)); - logger_->log_info("PutKafka: security.protocol [%s]", value); - if (result != RD_KAFKA_CONF_OK) { - logger_->log_error("PutKafka: configure error result [%s]", errstr); - } else { - value = ""; - if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) { - rd_kafka_conf_set(conf_, "ssl.ca.location", value.c_str(), errstr, sizeof(errstr)); - logger_->log_info("PutKafka: ssl.ca.location [%s]", value); - if (result != RD_KAFKA_CONF_OK) - logger_->log_error("PutKafka: configure error result [%s]", errstr); - } - value = ""; - if (context->getProperty(SecurityCert.getName(), value) && !value.empty()) { - rd_kafka_conf_set(conf_, "ssl.certificate.location", value.c_str(), errstr, sizeof(errstr)); - logger_->log_info("PutKafka: ssl.certificate.location [%s]", value); - if (result != RD_KAFKA_CONF_OK) - logger_->log_error("PutKafka: configure error result [%s]", errstr); - } - value = ""; - if (context->getProperty(SecurityPrivateKey.getName(), value) && !value.empty()) { - rd_kafka_conf_set(conf_, "ssl.key.location", value.c_str(), errstr, sizeof(errstr)); - logger_->log_info("PutKafka: ssl.key.location [%s]", value); - if (result != RD_KAFKA_CONF_OK) - logger_->log_error("PutKafka: configure error result [%s]", errstr); - } - value = ""; - if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) && !value.empty()) { - rd_kafka_conf_set(conf_, "ssl.key.password", value.c_str(), errstr, sizeof(errstr)); - logger_->log_info("PutKafka: ssl.key.password [%s]", value); - if (result != RD_KAFKA_CONF_OK) - logger_->log_error("PutKafka: configure error result [%s]", errstr); - } - } - } - } - value = ""; - if (context->getProperty(Topic.getName(), value) && !value.empty()) { - topic_ = value; - logger_->log_info("PutKafka: topic [%s]", topic_); - } else { - logger_->log_info("PutKafka: topic not configured"); - return; - } - - rk_= rd_kafka_new(RD_KAFKA_PRODUCER, conf_, - errstr, sizeof(errstr)); - - if (!rk_) { - logger_->log_error("Failed to create kafak producer %s", errstr); - return; - } - - rkt_ = rd_kafka_topic_new(rk_, topic_.c_str(), topic_conf_); - - if (!rkt_) { - logger_->log_error("Failed to create topic %s", errstr); - return; - } - -} - -void PutKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { - std::shared_ptr<core::FlowFile> flowFile = session->get(); - - if (!flowFile) { - return; - } - - if (!rk_ || !rkt_) { - session->transfer(flowFile, Failure); - return; - } - - PutKafka::ReadCallback callback(flowFile->getSize(), max_seg_size_, flowFile->getUUIDStr(), rkt_); - session->read(flowFile, &callback); - if (callback.status_ < 0) { - logger_->log_error("Failed to send flow to kafka topic %s", topic_); - session->transfer(flowFile, Failure); - } else { - logger_->log_debug("Sent flow with length %d to kafka topic %s", callback.read_size_, topic_); - session->transfer(flowFile, Success); - } -} - -} /* namespace processors */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0756097a/extensions/librdkafka/PutKafka.h ---------------------------------------------------------------------- diff --git a/extensions/librdkafka/PutKafka.h b/extensions/librdkafka/PutKafka.h deleted file mode 100644 index 0ef372e..0000000 --- a/extensions/librdkafka/PutKafka.h +++ /dev/null @@ -1,179 +0,0 @@ -/** - * @file PutKafka.h - * PutKafka class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __PUT_KAFKA_H__ -#define __PUT_KAFKA_H__ - -#include "FlowFileRecord.h" -#include "core/Processor.h" -#include "core/ProcessSession.h" -#include "core/Core.h" -#include "core/Resource.h" -#include "core/Property.h" -#include "core/logging/LoggerConfiguration.h" -#include "rdkafka.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace processors { - -#define COMPRESSION_CODEC_NONE "none" -#define COMPRESSION_CODEC_GZIP "gzip" -#define COMPRESSION_CODEC_SNAPPY "snappy" -#define ROUND_ROBIN_PARTITIONING "Round Robin" -#define RANDOM_PARTITIONING "Random Robin" -#define USER_DEFINED_PARTITIONING "User-Defined" -#define DELIVERY_REPLICATED "all" -#define DELIVERY_ONE_NODE "1" -#define DELIVERY_BEST_EFFORT "0" -#define SECURITY_PROTOCOL_PLAINTEXT "plaintext" -#define SECURITY_PROTOCOL_SSL "ssl" -#define SECURITY_PROTOCOL_SASL_PLAINTEXT "sasl_plaintext" -#define SECURITY_PROTOCOL_SASL_SSL "sasl_ssl" - -// PutKafka Class -class PutKafka: public core::Processor { -public: - // Constructor - /*! - * Create a new processor - */ - explicit PutKafka(std::string name, uuid_t uuid = NULL) : - core::Processor(name, uuid), logger_(logging::LoggerFactory<PutKafka>::getLogger()) { - conf_ = nullptr; - rk_ = nullptr; - topic_conf_ = nullptr; - rkt_ = nullptr; - } - // Destructor - virtual ~PutKafka() { - if (rk_) - rd_kafka_flush(rk_, 10*1000); /* wait for max 10 seconds */ - if (rkt_) - rd_kafka_topic_destroy(rkt_); - if (rk_) - rd_kafka_destroy(rk_); - } - // Processor Name - static constexpr char const* ProcessorName = "PutKafka"; - // Supported Properties - static core::Property SeedBrokers; - static core::Property Topic; - static core::Property DeliveryGuarantee; - static core::Property MaxMessageSize; - static core::Property RequestTimeOut; - static core::Property ClientName; - static core::Property BatchSize; - static core::Property QueueBufferMaxTime; - static core::Property QueueBufferMaxSize; - static core::Property QueueBufferMaxMessage; - static core::Property CompressCodec; - static core::Property MaxFlowSegSize; - static core::Property SecurityProtocol; - static core::Property SecurityCA; - static core::Property SecurityCert; - static core::Property SecurityPrivateKey; - static core::Property SecurityPrivateKeyPassWord; - - // Supported Relationships - static core::Relationship Failure; - static core::Relationship Success; - - // Nest Callback Class for read stream - class ReadCallback: public InputStreamCallback { - public: - ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const std::string &key, rd_kafka_topic_t *rkt) : - flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), rkt_(rkt) { - status_ = 0; - read_size_ = 0; - } - ~ReadCallback() { - } - int64_t process(std::shared_ptr<io::BaseStream> stream) { - if (flow_size_ < max_seg_size_) - max_seg_size_ = flow_size_; - std::vector<unsigned char> buffer; - buffer.reserve(max_seg_size_); - read_size_ = 0; - status_ = 0; - while (read_size_ < flow_size_) { - int readRet = stream->read(&buffer[0], max_seg_size_); - if (readRet < 0) { - status_ = -1; - return read_size_; - } - if (readRet > 0) { - if (rd_kafka_produce(rkt_, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, &buffer[0], readRet, key_.c_str(), key_.size(), NULL) == -1) { - status_ = -1; - return read_size_; - } - read_size_ += readRet; - } else { - break; - } - } - return read_size_; - } - uint64_t flow_size_; - uint64_t max_seg_size_; - std::string key_; - rd_kafka_topic_t *rkt_; - int status_; - int read_size_; - }; - -public: - /** - * Function that's executed when the processor is scheduled. - * @param context process context. - * @param sessionFactory process session factory that is used when creating - * ProcessSession objects. - */ - void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); - // OnTrigger method, implemented by NiFi PutKafka - virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session) { - } - // OnTrigger method, implemented by NiFi PutKafka - virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session); - // Initialize, over write by NiFi PutKafka - virtual void initialize(void); - -protected: - -private: - std::shared_ptr<logging::Logger> logger_; - rd_kafka_conf_t *conf_; - rd_kafka_t *rk_; - rd_kafka_topic_conf_t *topic_conf_; - rd_kafka_topic_t *rkt_; - std::string topic_; - uint64_t max_seg_size_; -}; - -REGISTER_RESOURCE (PutKafka); - -} /* namespace processors */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0756097a/extensions/librdkafka/RdKafkaLoader.h ---------------------------------------------------------------------- diff --git a/extensions/librdkafka/RdKafkaLoader.h b/extensions/librdkafka/RdKafkaLoader.h index 2da0b57..ea650fc 100644 --- a/extensions/librdkafka/RdKafkaLoader.h +++ b/extensions/librdkafka/RdKafkaLoader.h @@ -18,7 +18,7 @@ #ifndef EXTENSION_RDKAFKALOADER_H #define EXTENSION_RDKAFKALOADER_H -#include "PutKafka.h" +#include "PublishKafka.h" #include "core/ClassLoader.h" class __attribute__((visibility("default"))) RdKafkaFactory : public core::ObjectFactory { @@ -44,13 +44,13 @@ class __attribute__((visibility("default"))) RdKafkaFactory : public core::Objec */ virtual std::vector<std::string> getClassNames() { std::vector<std::string> class_names; - class_names.push_back("PutKafka"); + class_names.push_back("PublishKafka"); return class_names; } virtual std::unique_ptr<ObjectFactory> assign(const std::string &class_name) { - if (utils::StringUtils::equalsIgnoreCase(class_name, "PutKafka")) { - return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::processors::PutKafka>()); + if (utils::StringUtils::equalsIgnoreCase(class_name, "PublishKafka")) { + return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::processors::PublishKafka>()); } else { return nullptr; }