[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-02-17 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r577733272



##
File path: extensions/librdkafka/rdkafka_utils.cpp
##
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+
+#include "rdkafka_utils.h"
+
+#include "Exception.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+void setKafkaConfigurationField(rd_kafka_conf_t* configuration, const 
std::string& field_name, const std::string& value) {
+  static std::array errstr{};
+  rd_kafka_conf_res_t result;
+  result = rd_kafka_conf_set(configuration, field_name.c_str(), value.c_str(), 
errstr.data(), errstr.size());
+  if (RD_KAFKA_CONF_OK != result) {
+const std::string error_msg { errstr.data() };
+throw Exception(PROCESS_SCHEDULE_EXCEPTION, "rd_kafka configuration error: 
" + error_msg);
+  }
+}
+
+void print_topics_list(logging::Logger& logger, 
rd_kafka_topic_partition_list_t* kf_topic_partition_list) {
+  for (std::size_t i = 0; i < kf_topic_partition_list->cnt; ++i) {
+logger.log_debug("kf_topic_partition_list: topic: %s, partition: %d, 
offset: %" PRId64 ".",
+kf_topic_partition_list->elems[i].topic, 
kf_topic_partition_list->elems[i].partition, 
kf_topic_partition_list->elems[i].offset);
+  }
+}
+
+std::string get_human_readable_kafka_message_timestamp(const 
rd_kafka_message_t* rkmessage) {
+  rd_kafka_timestamp_type_t tstype;
+  int64_t timestamp;
+  timestamp = rd_kafka_message_timestamp(rkmessage, );
+  const char *tsname = "?";
+  if (tstype == RD_KAFKA_TIMESTAMP_CREATE_TIME) {
+tsname = "create time";
+  } else if (tstype == RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME) {
+tsname = "log append time";
+  }
+  const int64_t seconds_since_timestamp = timestamp == -1 ? 0 : 
static_cast(time(NULL)) - static_cast(timestamp / 1000);
+  return {"[Timestamp](" + std::string(tsname) + " " + 
std::to_string(timestamp) + " (" + std::to_string(seconds_since_timestamp) + " 
s ago)"};
+}
+
+std::string get_human_readable_kafka_message_headers(const rd_kafka_message_t* 
rkmessage, logging::Logger& logger) {
+  rd_kafka_headers_t* hdrs;
+  const rd_kafka_resp_err_t get_header_response = 
rd_kafka_message_headers(rkmessage, );
+  if (RD_KAFKA_RESP_ERR_NO_ERROR == get_header_response) {
+std::vector header_list;
+kafka_headers_for_each(hdrs, [&] (const std::string& key, gsl::span val) { header_list.emplace_back(key + ": " + std::string{ val.data(), 
val.size() }); });
+return StringUtils::join(", ", header_list);
+  }
+  if (RD_KAFKA_RESP_ERR__NOENT == get_header_response) {
+return "[None]";
+  }
+  logger.log_error("Failed to fetch message headers: %d: %s", 
rd_kafka_last_error(), rd_kafka_err2str(rd_kafka_last_error()));
+  return "[Error]";
+}
+
+void print_kafka_message(const rd_kafka_message_t* rkmessage, logging::Logger& 
logger) {

Review comment:
   This is still taking an unchecked pointer

##
File path: extensions/librdkafka/rdkafka_utils.cpp
##
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+
+#include "rdkafka_utils.h"
+
+#include "Exception.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+void setKafkaConfigurationField(rd_kafka_conf_t* configuration, const 
std::string& 

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-02-17 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r577651674



##
File path: extensions/librdkafka/ConsumeKafka.h
##
@@ -124,6 +124,17 @@ class ConsumeKafka : public core::Processor {
   // Initialize, overwrite by NiFi RetryFlowFile
   void initialize() override;
 
+  class WriteCallback : public OutputStreamCallback {

Review comment:
   Does it need to be public? It's only used inside the class IIRC





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-02-17 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r577519674



##
File path: extensions/librdkafka/ConsumeKafka.cpp
##
@@ -0,0 +1,578 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConsumeKafka.h"
+
+#include 
+#include 
+
+#include "core/PropertyValidation.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/gsl.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+// The upper limit for Max Poll Time is 4 seconds. This is because Watchdog 
would potentially start
+// reporting issues with the processor health otherwise
+class ConsumeKafkaMaxPollTimeValidator : public TimePeriodValidator {
+ public:
+  ConsumeKafkaMaxPollTimeValidator(const std::string ) // NOLINT
+  : TimePeriodValidator(name) {
+  }
+  ~ConsumeKafkaMaxPollTimeValidator() override = default;
+
+  ValidationResult validate(const std::string& subject, const std::string& 
input) const override {
+uint64_t value;
+TimeUnit timeUnit;
+uint64_t value_as_ms;
+return 
ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid(
+core::TimePeriodValue::StringToTime(input, value, timeUnit) &&
+org::apache::nifi::minifi::core::Property::ConvertTimeUnitToMS(value, 
timeUnit, value_as_ms) &&
+0 < value_as_ms && value_as_ms <= 4000).build();
+  }
+};
+}  // namespace core
+namespace processors {
+
+constexpr const std::size_t ConsumeKafka::DEFAULT_MAX_POLL_RECORDS;
+constexpr char const* ConsumeKafka::DEFAULT_MAX_POLL_TIME;
+
+constexpr char const* ConsumeKafka::TOPIC_FORMAT_NAMES;
+constexpr char const* ConsumeKafka::TOPIC_FORMAT_PATTERNS;
+
+core::Property 
ConsumeKafka::KafkaBrokers(core::PropertyBuilder::createProperty("Kafka 
Brokers")
+  ->withDescription("A comma-separated list of known Kafka Brokers in the 
format :.")
+  ->withDefaultValue("localhost:9092", 
core::StandardValidators::get().NON_BLANK_VALIDATOR)
+  ->supportsExpressionLanguage(true)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::SecurityProtocol(core::PropertyBuilder::createProperty("Security 
Protocol")
+  ->withDescription("This property is currently not supported. Protocol used 
to communicate with brokers. Corresponds to Kafka's 'security.protocol' 
property.")
+  ->withAllowableValues({SECURITY_PROTOCOL_PLAINTEXT/*, 
SECURITY_PROTOCOL_SSL, SECURITY_PROTOCOL_SASL_PLAINTEXT, 
SECURITY_PROTOCOL_SASL_SSL*/ })
+  ->withDefaultValue(SECURITY_PROTOCOL_PLAINTEXT)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::TopicNames(core::PropertyBuilder::createProperty("Topic Names")
+  ->withDescription("The name of the Kafka Topic(s) to pull from. Multiple 
topic names are supported as a comma separated list.")
+  ->supportsExpressionLanguage(true)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::TopicNameFormat(core::PropertyBuilder::createProperty("Topic Name 
Format")
+  ->withDescription("Specifies whether the Topic(s) provided are a comma 
separated list of names or a single regular expression.")
+  ->withAllowableValues({TOPIC_FORMAT_NAMES, 
TOPIC_FORMAT_PATTERNS})
+  ->withDefaultValue(TOPIC_FORMAT_NAMES)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::HonorTransactions(core::PropertyBuilder::createProperty("Honor 
Transactions")
+  ->withDescription(
+  "Specifies whether or not MiNiFi should honor transactional guarantees 
when communicating with Kafka. If false, the Processor will use an \"isolation 
level\" of "
+  "read_uncomitted. This means that messages will be received as soon as 
they are written to Kafka but will be pulled, even if the producer cancels the 
transactions. "
+  "If this value is true, MiNiFi will not receive any messages for which 
the producer's transaction was canceled, but this can result in some latency 
since the consumer "
+  "must wait for the producer to finish its entire transaction instead of 
pulling as the messages become available.")
+  ->withDefaultValue(true)
+  ->isRequired(true)
+  ->build());
+
+core::Property 

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-02-17 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r577493327



##
File path: extensions/librdkafka/ConsumeKafka.h
##
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+#include 
+#include 
+
+#include "core/Processor.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "rdkafka.h"
+#include "rdkafka_utils.h"
+#include "KafkaConnection.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class ConsumeKafka : public core::Processor {
+ public:
+  static constexpr char const* ProcessorName = "ConsumeKafka";
+
+  // Supported Properties
+  static core::Property KafkaBrokers;
+  static core::Property SecurityProtocol;
+  static core::Property TopicNames;
+  static core::Property TopicNameFormat;
+  static core::Property HonorTransactions;
+  static core::Property GroupID;
+  static core::Property OffsetReset;
+  static core::Property KeyAttributeEncoding;
+  static core::Property MessageDemarcator;
+  static core::Property MessageHeaderEncoding;
+  static core::Property HeadersToAddAsAttributes;
+  static core::Property DuplicateHeaderHandling;
+  static core::Property MaxPollRecords;
+  static core::Property MaxPollTime;
+  static core::Property SessionTimeout;
+
+  // Supported Relationships
+  static const core::Relationship Success;
+
+  // Security Protocol allowable values
+  static constexpr char const* SECURITY_PROTOCOL_PLAINTEXT = "PLAINTEXT";
+  static constexpr char const* SECURITY_PROTOCOL_SSL = "SSL";
+  static constexpr char const* SECURITY_PROTOCOL_SASL_PLAINTEXT = 
"SASL_PLAINTEXT";
+  static constexpr char const* SECURITY_PROTOCOL_SASL_SSL = "SASL_SSL";
+
+  // Topic Name Format allowable values
+  static constexpr char const* TOPIC_FORMAT_NAMES = "Names";
+  static constexpr char const* TOPIC_FORMAT_PATTERNS = "Patterns";
+
+  // Offset Reset allowable values
+  static constexpr char const* OFFSET_RESET_EARLIEST = "earliest";
+  static constexpr char const* OFFSET_RESET_LATEST = "latest";
+  static constexpr char const* OFFSET_RESET_NONE = "none";
+
+  // Key Attribute Encoding allowable values
+  static constexpr char const* KEY_ATTR_ENCODING_UTF_8 = "UTF-8";
+  static constexpr char const* KEY_ATTR_ENCODING_HEX = "Hex";
+
+  // Message Header Encoding allowable values
+  static constexpr char const* MSG_HEADER_ENCODING_UTF_8 = "UTF-8";
+  static constexpr char const* MSG_HEADER_ENCODING_HEX = "Hex";
+
+  // Duplicate Header Handling allowable values
+  static constexpr char const* MSG_HEADER_KEEP_FIRST = "Keep First";
+  static constexpr char const* MSG_HEADER_KEEP_LATEST = "Keep Latest";
+  static constexpr char const* MSG_HEADER_COMMA_SEPARATED_MERGE = 
"Comma-separated Merge";
+
+  // Flowfile attributes written
+  static constexpr char const* KAFKA_COUNT_ATTR = "kafka.count";  // Always 1 
until we start supporting merging from batches
+  static constexpr char const* KAFKA_MESSAGE_KEY_ATTR = "kafka.key";
+  static constexpr char const* KAFKA_OFFSET_ATTR = "kafka.offset";
+  static constexpr char const* KAFKA_PARTITION_ATTR = "kafka.partition";
+  static constexpr char const* KAFKA_TOPIC_ATTR = "kafka.topic";
+
+  static constexpr const std::size_t DEFAULT_MAX_POLL_RECORDS{ 1 };
+  static constexpr char const* DEFAULT_MAX_POLL_TIME = "4 seconds";
+  static constexpr const std::size_t METADATA_COMMUNICATIONS_TIMEOUT_MS{ 6 
};
+
+  explicit ConsumeKafka(std::string name, utils::Identifier uuid = 
utils::Identifier()) :

Review comment:
   I wouldn't bother, but feel free if you like.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-02-16 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r576730170



##
File path: libminifi/test/TestBase.cpp
##
@@ -111,28 +109,19 @@ std::shared_ptr 
TestPlan::addProcessor(const std::shared_ptr node = 
std::make_shared(processor);
-
   processor_nodes_.push_back(node);
-
   // std::shared_ptr context = 
std::make_shared(node, controller_services_provider_, 
prov_repo_, flow_repo_, configuration_, content_repo_);
-
   auto contextBuilder = 
core::ClassLoader::getDefaultClassLoader().instantiate("ProcessContextBuilder");
-
   contextBuilder = 
contextBuilder->withContentRepository(content_repo_)->withFlowFileRepository(flow_repo_)->withProvider(controller_services_provider_.get())->withProvenanceRepository(prov_repo_)->withConfiguration(configuration_);
-
   auto context = contextBuilder->build(node);
-
   processor_contexts_.push_back(context);
-
   processor_queue_.push_back(processor);
-
   return processor;
 }
 
 std::shared_ptr TestPlan::addProcessor(const std::string 
_name, const utils::Identifier& uuid, const std::string ,
-const 
std::initializer_list& relationships, bool linkToPrevious) {
+  const std::initializer_list& relationships, bool 
linkToPrevious) {

Review comment:
   Continuation indentation should be either 4 spaces or aligned with the 
opening parenthesis. I prefer 4 spaces (or, more generally, 2 levels of 
indentation); the style guide prefers alignment.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-02-16 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r576730170



##
File path: libminifi/test/TestBase.cpp
##
@@ -111,28 +109,19 @@ std::shared_ptr 
TestPlan::addProcessor(const std::shared_ptr node = 
std::make_shared(processor);
-
   processor_nodes_.push_back(node);
-
   // std::shared_ptr context = 
std::make_shared(node, controller_services_provider_, 
prov_repo_, flow_repo_, configuration_, content_repo_);
-
   auto contextBuilder = 
core::ClassLoader::getDefaultClassLoader().instantiate("ProcessContextBuilder");
-
   contextBuilder = 
contextBuilder->withContentRepository(content_repo_)->withFlowFileRepository(flow_repo_)->withProvider(controller_services_provider_.get())->withProvenanceRepository(prov_repo_)->withConfiguration(configuration_);
-
   auto context = contextBuilder->build(node);
-
   processor_contexts_.push_back(context);
-
   processor_queue_.push_back(processor);
-
   return processor;
 }
 
 std::shared_ptr TestPlan::addProcessor(const std::string 
_name, const utils::Identifier& uuid, const std::string ,
-const 
std::initializer_list& relationships, bool linkToPrevious) {
+  const std::initializer_list& relationships, bool 
linkToPrevious) {

Review comment:
   Continuation indentation should be either 4 spaces or aligned with the 
opening parenthesis. I prefer 4 spaces; the style guide prefers alignment.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-02-16 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r576711179



##
File path: extensions/librdkafka/ConsumeKafka.h
##
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+#include 
+#include 
+
+#include "core/Processor.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "rdkafka.h"
+#include "rdkafka_utils.h"
+#include "KafkaConnection.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class ConsumeKafka : public core::Processor {
+ public:
+  static constexpr char const* ProcessorName = "ConsumeKafka";
+
+  // Supported Properties
+  static core::Property KafkaBrokers;
+  static core::Property SecurityProtocol;
+  static core::Property TopicNames;
+  static core::Property TopicNameFormat;
+  static core::Property HonorTransactions;
+  static core::Property GroupID;
+  static core::Property OffsetReset;
+  static core::Property KeyAttributeEncoding;
+  static core::Property MessageDemarcator;
+  static core::Property MessageHeaderEncoding;
+  static core::Property HeadersToAddAsAttributes;
+  static core::Property DuplicateHeaderHandling;
+  static core::Property MaxPollRecords;
+  static core::Property MaxPollTime;
+  static core::Property SessionTimeout;
+
+  // Supported Relationships
+  static const core::Relationship Success;
+
+  // Security Protocol allowable values
+  static constexpr char const* SECURITY_PROTOCOL_PLAINTEXT = "PLAINTEXT";
+  static constexpr char const* SECURITY_PROTOCOL_SSL = "SSL";
+  static constexpr char const* SECURITY_PROTOCOL_SASL_PLAINTEXT = 
"SASL_PLAINTEXT";
+  static constexpr char const* SECURITY_PROTOCOL_SASL_SSL = "SASL_SSL";
+
+  // Topic Name Format allowable values
+  static constexpr char const* TOPIC_FORMAT_NAMES = "Names";
+  static constexpr char const* TOPIC_FORMAT_PATTERNS = "Patterns";
+
+  // Offset Reset allowable values
+  static constexpr char const* OFFSET_RESET_EARLIEST = "earliest";
+  static constexpr char const* OFFSET_RESET_LATEST = "latest";
+  static constexpr char const* OFFSET_RESET_NONE = "none";
+
+  // Key Attribute Encoding allowable values
+  static constexpr char const* KEY_ATTR_ENCODING_UTF_8 = "UTF-8";
+  static constexpr char const* KEY_ATTR_ENCODING_HEX = "Hex";
+
+  // Message Header Encoding allowable values
+  static constexpr char const* MSG_HEADER_ENCODING_UTF_8 = "UTF-8";
+  static constexpr char const* MSG_HEADER_ENCODING_HEX = "Hex";
+
+  // Duplicate Header Handling allowable values
+  static constexpr char const* MSG_HEADER_KEEP_FIRST = "Keep First";
+  static constexpr char const* MSG_HEADER_KEEP_LATEST = "Keep Latest";
+  static constexpr char const* MSG_HEADER_COMMA_SEPARATED_MERGE = 
"Comma-separated Merge";
+
+  // Flowfile attributes written
+  static constexpr char const* KAFKA_COUNT_ATTR = "kafka.count";  // Always 1 
until we start supporting merging from batches
+  static constexpr char const* KAFKA_MESSAGE_KEY_ATTR = "kafka.key";
+  static constexpr char const* KAFKA_OFFSET_ATTR = "kafka.offset";
+  static constexpr char const* KAFKA_PARTITION_ATTR = "kafka.partition";
+  static constexpr char const* KAFKA_TOPIC_ATTR = "kafka.topic";
+
+  static constexpr const std::size_t DEFAULT_MAX_POLL_RECORDS{ 1 };
+  static constexpr char const* DEFAULT_MAX_POLL_TIME = "4 seconds";
+  static constexpr const std::size_t METADATA_COMMUNICATIONS_TIMEOUT_MS{ 6 
};
+
+  explicit ConsumeKafka(std::string name, utils::Identifier uuid = 
utils::Identifier()) :

Review comment:
   Both `name` and `uuid` are taken by value, but are only used as const 
references. Consider passing by const ref.

##
File path: libminifi/test/TestBase.cpp
##
@@ -111,28 +109,19 @@ std::shared_ptr 
TestPlan::addProcessor(const std::shared_ptr node = 
std::make_shared(processor);
-
   processor_nodes_.push_back(node);
-
   // std::shared_ptr context = 
std::make_shared(node, controller_services_provider_, 
prov_repo_, flow_repo_, configuration_, content_repo_);
-
   auto contextBuilder = 
core::ClassLoader::getDefaultClassLoader().instantiate("ProcessContextBuilder");
-
  

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-02-01 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r567680555



##
File path: libminifi/test/TestBase.cpp
##
@@ -62,78 +63,51 @@ TestPlan::~TestPlan() {
   for (auto& processor : configured_processors_) {
 processor->setScheduledState(core::ScheduledState::STOPPED);
   }
+  for (auto& connection : relationships_) {
+// This is a patch solving circular references between processors and 
connections
+connection->setSource(nullptr);
+connection->setDestination(nullptr);
+  }
   controller_services_provider_->clearControllerServices();
 }
 
 std::shared_ptr TestPlan::addProcessor(const 
std::shared_ptr , const std::string , const 
std::initializer_list& relationships,
-bool linkToPrevious) {
+bool linkToPrevious) {
   if (finalized) {
 return nullptr;
   }
   std::lock_guard guard(mutex);
-
   utils::Identifier uuid = utils::IdGenerator::getIdGenerator()->generate();
-
   processor->setStreamFactory(stream_factory);
   // initialize the processor
   processor->initialize();
   processor->setFlowIdentifier(flow_version_->getFlowIdentifier());
-
   processor_mapping_[processor->getUUID()] = processor;
-
   if (!linkToPrevious) {
 termination_ = *(relationships.begin());
   } else {
 std::shared_ptr last = processor_queue_.back();
-
 if (last == nullptr) {
   last = processor;
   termination_ = *(relationships.begin());
 }
-
-std::stringstream connection_name;
-connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr();
-logger_->log_info("Creating %s connection for proc %d", 
connection_name.str(), processor_queue_.size() + 1);
-std::shared_ptr connection = 
std::make_shared(flow_repo_, content_repo_, 
connection_name.str());
-
 for (const auto& relationship : relationships) {
-  connection->addRelationship(relationship);
-}
-
-// link the connections so that we can test results at the end for this
-connection->setSource(last);
-connection->setDestination(processor);
-
-connection->setSourceUUID(last->getUUID());
-connection->setDestinationUUID(processor->getUUID());
-last->addConnection(connection);
-if (last != processor) {
-  processor->addConnection(connection);
+  addConnection(last, relationship, processor);

Review comment:
   There is a difference between 1 connection for 3 relationships and 3 
connections for 3 relationships. The latter may or may not be fine, so it's 
better not to change the behavior without a good reason.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-02-01 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r567675222



##
File path: extensions/librdkafka/ConsumeKafka.cpp
##
@@ -0,0 +1,579 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConsumeKafka.h"
+
+#include 
+#include 
+
+#include "core/PropertyValidation.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/gsl.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+// The upper limit for Max Poll Time is 4 seconds. This is because Watchdog 
would potentially start
+// reporting issues with the processor health otherwise
+class ConsumeKafkaMaxPollTimeValidator : public TimePeriodValidator {
+ public:
+  ConsumeKafkaMaxPollTimeValidator(const std::string ) // NOLINT
+  : TimePeriodValidator(name) {
+  }
+  ~ConsumeKafkaMaxPollTimeValidator() override = default;
+
+  ValidationResult validate(const std::string& subject, const std::string& 
input) const override {
+uint64_t value;
+TimeUnit timeUnit;
+uint64_t value_as_ms;
+return 
ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid(
+core::TimePeriodValue::StringToTime(input, value, timeUnit) &&
+org::apache::nifi::minifi::core::Property::ConvertTimeUnitToMS(value, 
timeUnit, value_as_ms) &&
+0 < value_as_ms && value_as_ms <= 4000).build();
+  }
+};
+}  // namespace core
+namespace processors {
+
+constexpr const std::size_t ConsumeKafka::DEFAULT_MAX_POLL_RECORDS;
+constexpr char const* ConsumeKafka::DEFAULT_MAX_POLL_TIME;
+
+constexpr char const* ConsumeKafka::TOPIC_FORMAT_NAMES;
+constexpr char const* ConsumeKafka::TOPIC_FORMAT_PATTERNS;
+
+core::Property 
ConsumeKafka::KafkaBrokers(core::PropertyBuilder::createProperty("Kafka 
Brokers")
+  ->withDescription("A comma-separated list of known Kafka Brokers in the 
format :.")
+  ->withDefaultValue("localhost:9092", 
core::StandardValidators::get().NON_BLANK_VALIDATOR)
+  ->supportsExpressionLanguage(true)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::SecurityProtocol(core::PropertyBuilder::createProperty("Security 
Protocol")
+  ->withDescription("This property is currently not supported. Protocol used 
to communicate with brokers. Corresponds to Kafka's 'security.protocol' 
property.")
+  ->withAllowableValues({SECURITY_PROTOCOL_PLAINTEXT/*, 
SECURITY_PROTOCOL_SSL, SECURITY_PROTOCOL_SASL_PLAINTEXT, 
SECURITY_PROTOCOL_SASL_SSL*/ })
+  ->withDefaultValue(SECURITY_PROTOCOL_PLAINTEXT)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::TopicNames(core::PropertyBuilder::createProperty("Topic Names")
+  ->withDescription("The name of the Kafka Topic(s) to pull from. Multiple 
topic names are supported as a comma separated list.")
+  ->supportsExpressionLanguage(true)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::TopicNameFormat(core::PropertyBuilder::createProperty("Topic Name 
Format")
+  ->withDescription("Specifies whether the Topic(s) provided are a comma 
separated list of names or a single regular expression.")
+  ->withAllowableValues({TOPIC_FORMAT_NAMES, 
TOPIC_FORMAT_PATTERNS})
+  ->withDefaultValue(TOPIC_FORMAT_NAMES)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::HonorTransactions(core::PropertyBuilder::createProperty("Honor 
Transactions")
+  ->withDescription(
+  "Specifies whether or not MiNiFi should honor transactional guarantees 
when communicating with Kafka. If false, the Processor will use an \"isolation 
level\" of "
+  "read_uncomitted. This means that messages will be received as soon as 
they are written to Kafka but will be pulled, even if the producer cancels the 
transactions. "
+  "If this value is true, MiNiFi will not receive any messages for which 
the producer's transaction was canceled, but this can result in some latency 
since the consumer "
+  "must wait for the producer to finish its entire transaction instead of 
pulling as the messages become available.")
+  ->withDefaultValue(true)
+  ->isRequired(true)
+  ->build());
+
+core::Property 

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-25 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r560990362



##
File path: extensions/librdkafka/ConsumeKafka.cpp
##
@@ -0,0 +1,579 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConsumeKafka.h"
+
+#include 
+#include 
+
+#include "core/PropertyValidation.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/gsl.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+// The upper limit for Max Poll Time is 4 seconds. This is because Watchdog 
would potentially start
+// reporting issues with the processor health otherwise
+class ConsumeKafkaMaxPollTimeValidator : public TimePeriodValidator {
+ public:
+  ConsumeKafkaMaxPollTimeValidator(const std::string ) // NOLINT
+  : TimePeriodValidator(name) {
+  }
+  ~ConsumeKafkaMaxPollTimeValidator() override = default;
+
+  ValidationResult validate(const std::string& subject, const std::string& 
input) const override {
+uint64_t value;
+TimeUnit timeUnit;
+uint64_t value_as_ms;
+return 
ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid(
+core::TimePeriodValue::StringToTime(input, value, timeUnit) &&
+org::apache::nifi::minifi::core::Property::ConvertTimeUnitToMS(value, 
timeUnit, value_as_ms) &&
+0 < value_as_ms && value_as_ms <= 4000).build();
+  }
+};
+}  // namespace core
+namespace processors {
+
+constexpr const std::size_t ConsumeKafka::DEFAULT_MAX_POLL_RECORDS;
+constexpr char const* ConsumeKafka::DEFAULT_MAX_POLL_TIME;
+
+constexpr char const* ConsumeKafka::TOPIC_FORMAT_NAMES;
+constexpr char const* ConsumeKafka::TOPIC_FORMAT_PATTERNS;
+
+core::Property 
ConsumeKafka::KafkaBrokers(core::PropertyBuilder::createProperty("Kafka 
Brokers")
+  ->withDescription("A comma-separated list of known Kafka Brokers in the 
format :.")
+  ->withDefaultValue("localhost:9092", 
core::StandardValidators::get().NON_BLANK_VALIDATOR)
+  ->supportsExpressionLanguage(true)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::SecurityProtocol(core::PropertyBuilder::createProperty("Security 
Protocol")
+  ->withDescription("This property is currently not supported. Protocol used 
to communicate with brokers. Corresponds to Kafka's 'security.protocol' 
property.")
+  ->withAllowableValues({SECURITY_PROTOCOL_PLAINTEXT/*, 
SECURITY_PROTOCOL_SSL, SECURITY_PROTOCOL_SASL_PLAINTEXT, 
SECURITY_PROTOCOL_SASL_SSL*/ })
+  ->withDefaultValue(SECURITY_PROTOCOL_PLAINTEXT)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::TopicNames(core::PropertyBuilder::createProperty("Topic Names")
+  ->withDescription("The name of the Kafka Topic(s) to pull from. Multiple 
topic names are supported as a comma separated list.")
+  ->supportsExpressionLanguage(true)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::TopicNameFormat(core::PropertyBuilder::createProperty("Topic Name 
Format")
+  ->withDescription("Specifies whether the Topic(s) provided are a comma 
separated list of names or a single regular expression.")
+  ->withAllowableValues({TOPIC_FORMAT_NAMES, 
TOPIC_FORMAT_PATTERNS})
+  ->withDefaultValue(TOPIC_FORMAT_NAMES)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::HonorTransactions(core::PropertyBuilder::createProperty("Honor 
Transactions")
+  ->withDescription(
+  "Specifies whether or not MiNiFi should honor transactional guarantees 
when communicating with Kafka. If false, the Processor will use an \"isolation 
level\" of "
+  "read_uncomitted. This means that messages will be received as soon as 
they are written to Kafka but will be pulled, even if the producer cancels the 
transactions. "
+  "If this value is true, MiNiFi will not receive any messages for which 
the producer's transaction was canceled, but this can result in some latency 
since the consumer "
+  "must wait for the producer to finish its entire transaction instead of 
pulling as the messages become available.")
+  ->withDefaultValue(true)
+  ->isRequired(true)
+  ->build());
+
+core::Property 

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-25 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r560990362



##
File path: extensions/librdkafka/ConsumeKafka.cpp
##
@@ -0,0 +1,579 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConsumeKafka.h"
+
+#include 
+#include 
+
+#include "core/PropertyValidation.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/gsl.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+// The upper limit for Max Poll Time is 4 seconds. This is because Watchdog 
would potentially start
+// reporting issues with the processor health otherwise
+class ConsumeKafkaMaxPollTimeValidator : public TimePeriodValidator {
+ public:
+  ConsumeKafkaMaxPollTimeValidator(const std::string ) // NOLINT
+  : TimePeriodValidator(name) {
+  }
+  ~ConsumeKafkaMaxPollTimeValidator() override = default;
+
+  ValidationResult validate(const std::string& subject, const std::string& 
input) const override {
+uint64_t value;
+TimeUnit timeUnit;
+uint64_t value_as_ms;
+return 
ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid(
+core::TimePeriodValue::StringToTime(input, value, timeUnit) &&
+org::apache::nifi::minifi::core::Property::ConvertTimeUnitToMS(value, 
timeUnit, value_as_ms) &&
+0 < value_as_ms && value_as_ms <= 4000).build();
+  }
+};
+}  // namespace core
+namespace processors {
+
+constexpr const std::size_t ConsumeKafka::DEFAULT_MAX_POLL_RECORDS;
+constexpr char const* ConsumeKafka::DEFAULT_MAX_POLL_TIME;
+
+constexpr char const* ConsumeKafka::TOPIC_FORMAT_NAMES;
+constexpr char const* ConsumeKafka::TOPIC_FORMAT_PATTERNS;
+
+core::Property 
ConsumeKafka::KafkaBrokers(core::PropertyBuilder::createProperty("Kafka 
Brokers")
+  ->withDescription("A comma-separated list of known Kafka Brokers in the 
format :.")
+  ->withDefaultValue("localhost:9092", 
core::StandardValidators::get().NON_BLANK_VALIDATOR)
+  ->supportsExpressionLanguage(true)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::SecurityProtocol(core::PropertyBuilder::createProperty("Security 
Protocol")
+  ->withDescription("This property is currently not supported. Protocol used 
to communicate with brokers. Corresponds to Kafka's 'security.protocol' 
property.")
+  ->withAllowableValues({SECURITY_PROTOCOL_PLAINTEXT/*, 
SECURITY_PROTOCOL_SSL, SECURITY_PROTOCOL_SASL_PLAINTEXT, 
SECURITY_PROTOCOL_SASL_SSL*/ })
+  ->withDefaultValue(SECURITY_PROTOCOL_PLAINTEXT)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::TopicNames(core::PropertyBuilder::createProperty("Topic Names")
+  ->withDescription("The name of the Kafka Topic(s) to pull from. Multiple 
topic names are supported as a comma separated list.")
+  ->supportsExpressionLanguage(true)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::TopicNameFormat(core::PropertyBuilder::createProperty("Topic Name 
Format")
+  ->withDescription("Specifies whether the Topic(s) provided are a comma 
separated list of names or a single regular expression.")
+  ->withAllowableValues({TOPIC_FORMAT_NAMES, 
TOPIC_FORMAT_PATTERNS})
+  ->withDefaultValue(TOPIC_FORMAT_NAMES)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::HonorTransactions(core::PropertyBuilder::createProperty("Honor 
Transactions")
+  ->withDescription(
+  "Specifies whether or not MiNiFi should honor transactional guarantees 
when communicating with Kafka. If false, the Processor will use an \"isolation 
level\" of "
+  "read_uncomitted. This means that messages will be received as soon as 
they are written to Kafka but will be pulled, even if the producer cancels the 
transactions. "
+  "If this value is true, MiNiFi will not receive any messages for which 
the producer's transaction was canceled, but this can result in some latency 
since the consumer "
+  "must wait for the producer to finish its entire transaction instead of 
pulling as the messages become available.")
+  ->withDefaultValue(true)
+  ->isRequired(true)
+  ->build());
+
+core::Property 

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-20 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r560987064



##
File path: extensions/librdkafka/rdkafka_utils.h
##
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/OptionalUtils.h"
+#include "rdkafka.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+enum class KafkaEncoding {
+  UTF8,
+  HEX
+};
+
+struct rd_kafka_conf_deleter {
+  void operator()(rd_kafka_conf_t* ptr) const noexcept { 
rd_kafka_conf_destroy(ptr); }
+};
+
+struct rd_kafka_producer_deleter {
+  void operator()(rd_kafka_t* ptr) const noexcept {
+rd_kafka_resp_err_t flush_ret = rd_kafka_flush(ptr, 1 /* ms */);  // 
Matching the wait time of KafkaConnection.cpp
+// If concerned, we could log potential errors here:
+// if (RD_KAFKA_RESP_ERR__TIMED_OUT == flush_ret) {
+//   std::cerr << "Deleting producer failed: time-out while trying to 
flush" << std::endl;
+// }
+rd_kafka_destroy(ptr);
+  }
+};
+
+struct rd_kafka_consumer_deleter {
+  void operator()(rd_kafka_t* ptr) const noexcept {
+rd_kafka_consumer_close(ptr);
+rd_kafka_destroy(ptr);
+  }
+};
+
+struct rd_kafka_topic_partition_list_deleter {
+  void operator()(rd_kafka_topic_partition_list_t* ptr) const noexcept { 
rd_kafka_topic_partition_list_destroy(ptr); }
+};
+
+struct rd_kafka_topic_conf_deleter {
+  void operator()(rd_kafka_topic_conf_t* ptr) const noexcept { 
rd_kafka_topic_conf_destroy(ptr); }
+};
+struct rd_kafka_topic_deleter {
+  void operator()(rd_kafka_topic_t* ptr) const noexcept { 
rd_kafka_topic_destroy(ptr); }
+};
+
+struct rd_kafka_message_deleter {
+  void operator()(rd_kafka_message_t* ptr) const noexcept { 
rd_kafka_message_destroy(ptr); }
+};
+
+struct rd_kafka_headers_deleter {
+  void operator()(rd_kafka_headers_t* ptr) const noexcept { 
rd_kafka_headers_destroy(ptr); }
+};
+
+template 
+void kafka_headers_for_each(const rd_kafka_headers_t* headers, T 
key_value_handle) {
+  const char *key;  // Null terminated, not to be freed
+  const void *value;
+  std::size_t size;
+  for (std::size_t i = 0; RD_KAFKA_RESP_ERR_NO_ERROR == 
rd_kafka_header_get_all(headers, i, , , ); ++i) {
+key_value_handle(std::string(key), std::string(static_cast(value), size));

Review comment:
   In that case, pass a span down. The copy is not necessary, because the 
value is only used inside the loop body, i.e. within the lifetime of 
`*value`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-13 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r556352310



##
File path: extensions/librdkafka/rdkafka_utils.h
##
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/OptionalUtils.h"
+#include "rdkafka.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+enum class KafkaEncoding {
+  UTF8,
+  HEX
+};
+
+struct rd_kafka_conf_deleter {
+  void operator()(rd_kafka_conf_t* ptr) const noexcept { 
rd_kafka_conf_destroy(ptr); }
+};
+
+struct rd_kafka_producer_deleter {
+  void operator()(rd_kafka_t* ptr) const noexcept {
+rd_kafka_resp_err_t flush_ret = rd_kafka_flush(ptr, 1 /* ms */);  // 
Matching the wait time of KafkaConnection.cpp
+// If concerned, we could log potential errors here:
+// if (RD_KAFKA_RESP_ERR__TIMED_OUT == flush_ret) {
+//   std::cerr << "Deleting producer failed: time-out while trying to 
flush" << std::endl;
+// }
+rd_kafka_destroy(ptr);
+  }
+};
+
+struct rd_kafka_consumer_deleter {
+  void operator()(rd_kafka_t* ptr) const noexcept {
+rd_kafka_consumer_close(ptr);
+rd_kafka_destroy(ptr);
+  }
+};
+
+struct rd_kafka_topic_partition_list_deleter {
+  void operator()(rd_kafka_topic_partition_list_t* ptr) const noexcept { 
rd_kafka_topic_partition_list_destroy(ptr); }
+};
+
+struct rd_kafka_topic_conf_deleter {
+  void operator()(rd_kafka_topic_conf_t* ptr) const noexcept { 
rd_kafka_topic_conf_destroy(ptr); }
+};
+struct rd_kafka_topic_deleter {
+  void operator()(rd_kafka_topic_t* ptr) const noexcept { 
rd_kafka_topic_destroy(ptr); }
+};
+
+struct rd_kafka_message_deleter {
+  void operator()(rd_kafka_message_t* ptr) const noexcept { 
rd_kafka_message_destroy(ptr); }
+};
+
+struct rd_kafka_headers_deleter {
+  void operator()(rd_kafka_headers_t* ptr) const noexcept { 
rd_kafka_headers_destroy(ptr); }
+};
+
+template 
+void kafka_headers_for_each(const rd_kafka_headers_t* headers, T 
key_value_handle) {
+  const char *key;  // Null terminated, not to be freed
+  const void *value;
+  std::size_t size;
+  for (std::size_t i = 0; RD_KAFKA_RESP_ERR_NO_ERROR == 
rd_kafka_header_get_all(headers, i, , , ); ++i) {
+key_value_handle(std::string(key), std::string(static_cast(value), size));

Review comment:
   We can avoid an allocation on every iteration by passing the raw 
`const char*` down to the consumer.

##
File path: extensions/librdkafka/rdkafka_utils.cpp
##
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+
+#include "rdkafka_utils.h"
+
+#include "Exception.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+void setKafkaConfigurationField(rd_kafka_conf_t* configuration, const 
std::string& field_name, const std::string& value) {
+  static std::array errstr{};
+  rd_kafka_conf_res_t result;
+  result = rd_kafka_conf_set(configuration, field_name.c_str(), value.c_str(), 
errstr.data(), errstr.size());
+  if (RD_KAFKA_CONF_OK != result) {
+const std::string error_msg { errstr.data() };
+throw Exception(PROCESS_SCHEDULE_EXCEPTION, "rd_kafka configuration error: 
" + error_msg);
+  }
+}
+
+void 

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-09 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553850276



##
File path: libminifi/include/utils/ProcessorConfigUtils.h
##
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+std::string getRequiredPropertyOrThrow(const core::ProcessContext* context, 
const std::string& property_name) {
+  std::string value;
+  if (!context->getProperty(property_name, value)) {
+throw Exception(PROCESS_SCHEDULE_EXCEPTION, property_name + " property 
missing or invalid");
+  }
+  return value;
+}
+
+std::vector listFromCommaSeparatedProperty(const 
core::ProcessContext* context, const std::string& property_name) {
+  std::string property_string;
+  context->getProperty(property_name, property_string);
+  return utils::StringUtils::splitAndTrim(property_string, ",");
+}
+
+std::vector listFromRequiredCommaSeparatedProperty(const 
core::ProcessContext* context, const std::string& property_name) {
+  return utils::StringUtils::splitAndTrim(getRequiredPropertyOrThrow(context, 
property_name), ",");
+}
+
+bool parseBooleanPropertyOrThrow(core::ProcessContext* context, const 
std::string& property_name) {
+  bool value;
+  std::string value_str = getRequiredPropertyOrThrow(context, property_name);
+  utils::optional maybe_value = utils::StringUtils::toBool(value_str);
+  if (!maybe_value) {
+throw Exception(PROCESS_SCHEDULE_EXCEPTION, property_name + " property is 
invalid: value is " + value_str);

Review comment:
   I'm fine with `std::runtime_error` or `std::invalid_argument` as well, 
but I don't want miscategorized exceptions.

##
File path: extensions/librdkafka/rdkafka_utils.cpp
##
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+
+#include "rdkafka_utils.h"
+
+#include "Exception.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+void setKafkaConfigurationField(rd_kafka_conf_t* configuration, const 
std::string& field_name, const std::string& value) {
+  static std::array errstr{};
+  rd_kafka_conf_res_t result;
+  result = rd_kafka_conf_set(configuration, field_name.c_str(), value.c_str(), 
errstr.data(), errstr.size());
+  if (RD_KAFKA_CONF_OK != result) {
+const std::string error_msg { errstr.begin(), errstr.end() };
+throw Exception(PROCESS_SCHEDULE_EXCEPTION, "rd_kafka configuration error" 
+ error_msg);
+  }
+}
+
+void print_topics_list(std::shared_ptr logger, 
rd_kafka_topic_partition_list_t* kf_topic_partition_list) {
+  for (std::size_t i = 0; i < kf_topic_partition_list->cnt; ++i) {
+logger->log_debug("kf_topic_partition_list: topic: %s, partition: %d, 
offset:%lld]",
+kf_topic_partition_list->elems[i].topic, 
kf_topic_partition_list->elems[i].partition, 
kf_topic_partition_list->elems[i].offset);
+  }
+}
+
+void print_kafka_message(const rd_kafka_message_t* rkmessage, const 
std::shared_ptr& logger) {

Review comment:
   Taking a shared pointer instead of an observer pointer or reference makes 
it impossible to use the function with anything other than a shared pointer, 
such as a `unique_ptr`, a stack object, a manually allocated object, or a 
member of another object.





[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-08 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553897459



##
File path: extensions/librdkafka/rdkafka_utils.cpp
##
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+
+#include "rdkafka_utils.h"
+
+#include "Exception.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+void setKafkaConfigurationField(rd_kafka_conf_t* configuration, const 
std::string& field_name, const std::string& value) {
+  static std::array errstr{};
+  rd_kafka_conf_res_t result;
+  result = rd_kafka_conf_set(configuration, field_name.c_str(), value.c_str(), 
errstr.data(), errstr.size());
+  if (RD_KAFKA_CONF_OK != result) {
+const std::string error_msg { errstr.begin(), errstr.end() };
+throw Exception(PROCESS_SCHEDULE_EXCEPTION, "rd_kafka configuration error" 
+ error_msg);
+  }
+}
+
+void print_topics_list(std::shared_ptr logger, 
rd_kafka_topic_partition_list_t* kf_topic_partition_list) {
+  for (std::size_t i = 0; i < kf_topic_partition_list->cnt; ++i) {
+logger->log_debug("kf_topic_partition_list: topic: %s, partition: %d, 
offset:%lld]",
+kf_topic_partition_list->elems[i].topic, 
kf_topic_partition_list->elems[i].partition, 
kf_topic_partition_list->elems[i].offset);
+  }
+}
+
+void print_kafka_message(const rd_kafka_message_t* rkmessage, const 
std::shared_ptr& logger) {

Review comment:
   Taking a shared pointer instead of an observer pointer or reference makes 
it impossible to use the function with anything other than a shared pointer, 
such as a `unique_ptr`, a stack object, a manually allocated object, or a 
member of another object.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-08 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553850276



##
File path: libminifi/include/utils/ProcessorConfigUtils.h
##
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+std::string getRequiredPropertyOrThrow(const core::ProcessContext* context, 
const std::string& property_name) {
+  std::string value;
+  if (!context->getProperty(property_name, value)) {
+throw Exception(PROCESS_SCHEDULE_EXCEPTION, property_name + " property 
missing or invalid");
+  }
+  return value;
+}
+
+std::vector listFromCommaSeparatedProperty(const 
core::ProcessContext* context, const std::string& property_name) {
+  std::string property_string;
+  context->getProperty(property_name, property_string);
+  return utils::StringUtils::splitAndTrim(property_string, ",");
+}
+
+std::vector listFromRequiredCommaSeparatedProperty(const 
core::ProcessContext* context, const std::string& property_name) {
+  return utils::StringUtils::splitAndTrim(getRequiredPropertyOrThrow(context, 
property_name), ",");
+}
+
+bool parseBooleanPropertyOrThrow(core::ProcessContext* context, const 
std::string& property_name) {
+  bool value;
+  std::string value_str = getRequiredPropertyOrThrow(context, property_name);
+  utils::optional maybe_value = utils::StringUtils::toBool(value_str);
+  if (!maybe_value) {
+throw Exception(PROCESS_SCHEDULE_EXCEPTION, property_name + " property is 
invalid: value is " + value_str);

Review comment:
   I'm fine with `std::runtime_error` or `std::invalid_argument` as well, 
but I don't want miscategorized exceptions.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-07 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553477912



##
File path: libminifi/src/utils/StringUtils.cpp
##
@@ -46,8 +46,13 @@ std::string StringUtils::trim(const std::string& s) {
   return trimRight(trimLeft(s));
 }
 
-std::vector StringUtils::split(const std::string , const 
std::string ) {
+template
+std::vector split_transformed(const std::string& str, const 
std::string& delimiter, Fun transformation) {
   std::vector result;
+  if (delimiter.empty()) {
+std::transform(str.begin(), str.end(), std::back_inserter(result), [] 
(const char c) { return std::string{c}; });

Review comment:
   This branch should apply `transformation` as well.

##
File path: libminifi/include/utils/ProcessorConfigUtils.h
##
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+std::string getRequiredPropertyOrThrow(const core::ProcessContext* context, 
const std::string& property_name) {
+  std::string value;
+  if (!context->getProperty(property_name, value)) {
+throw Exception(PROCESS_SCHEDULE_EXCEPTION, property_name + " property 
missing or invalid");
+  }
+  return value;
+}
+
+std::vector listFromCommaSeparatedProperty(const 
core::ProcessContext* context, const std::string& property_name) {
+  std::string property_string;
+  context->getProperty(property_name, property_string);
+  return utils::StringUtils::splitAndTrim(property_string, ",");
+}
+
+std::vector listFromRequiredCommaSeparatedProperty(const 
core::ProcessContext* context, const std::string& property_name) {
+  return utils::StringUtils::splitAndTrim(getRequiredPropertyOrThrow(context, 
property_name), ",");
+}
+
+bool parseBooleanPropertyOrThrow(core::ProcessContext* context, const 
std::string& property_name) {
+  bool value;
+  std::string value_str = getRequiredPropertyOrThrow(context, property_name);
+  utils::optional maybe_value = utils::StringUtils::toBool(value_str);
+  if (!maybe_value) {
+throw Exception(PROCESS_SCHEDULE_EXCEPTION, property_name + " property is 
invalid: value is " + value_str);

Review comment:
   Why are exceptions thrown from these generic utilities categorized as 
`PROCESS_SCHEDULE_EXCEPTION`? I think `GENERAL_EXCEPTION` would be the closest fit.

##
File path: libminifi/include/utils/ProcessorConfigUtils.h
##
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+std::string getRequiredPropertyOrThrow(const core::ProcessContext* context, 
const std::string& property_name) {
+  std::string value;
+  if (!context->getProperty(property_name, value)) {
+throw Exception(PROCESS_SCHEDULE_EXCEPTION, property_name + " property 
missing or invalid");
+  }
+  return value;
+}
+
+std::vector listFromCommaSeparatedProperty(const 
core::ProcessContext* context, const std::string& property_name) {
+  std::string property_string;
+  context->getProperty(property_name, property_string);
+  return utils::StringUtils::splitAndTrim(property_string, ",");
+}
+
+std::vector listFromRequiredCommaSeparatedProperty(const 

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-07 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553241026



##
File path: extensions/librdkafka/rdkafka_utils.cpp
##
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+
+#include "rdkafka_utils.h"
+
+#include "Exception.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+void setKafkaConfigurationField(rd_kafka_conf_t* configuration, const 
std::string& field_name, const std::string& value) {
+  static std::array errstr{};
+  rd_kafka_conf_res_t result;
+  result = rd_kafka_conf_set(configuration, field_name.c_str(), value.c_str(), 
errstr.data(), errstr.size());
+  if (RD_KAFKA_CONF_OK != result) {
+const std::string error_msg { errstr.begin(), errstr.end() };
+throw Exception(PROCESS_SCHEDULE_EXCEPTION, "rd_kafka configuration error" 
+ error_msg);
+  }
+}
+
+void print_topics_list(std::shared_ptr logger, 
rd_kafka_topic_partition_list_t* kf_topic_partition_list) {
+  for (std::size_t i = 0; i < kf_topic_partition_list->cnt; ++i) {
+logger->log_debug("kf_topic_partition_list: topic: %s, partition: %d, 
offset:%lld]",
+kf_topic_partition_list->elems[i].topic, 
kf_topic_partition_list->elems[i].partition, 
kf_topic_partition_list->elems[i].offset);
+  }
+}
+
+void print_kafka_message(const rd_kafka_message_t* rkmessage, const 
std::shared_ptr& logger) {
+  if (RD_KAFKA_RESP_ERR_NO_ERROR != rkmessage->err) {
+const std::string error_msg = "ConsumeKafka: received error message from 
broker. Librdkafka error msg: " + std::string(rd_kafka_err2str(rkmessage->err));
+throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, error_msg);

Review comment:
   ~I recommend passing a non-const rvalue to the `Exception` constructor 
to avoid a long string copy.~
   
   Edit: I realized that due to the noexcept copy requirements of exceptions, the 
message is copied in any case, so please ignore this.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-07 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553344804



##
File path: extensions/librdkafka/rdkafka_utils.cpp
##
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+
+#include "rdkafka_utils.h"
+
+#include "Exception.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+void setKafkaConfigurationField(rd_kafka_conf_t* configuration, const 
std::string& field_name, const std::string& value) {
+  static std::array errstr{};
+  rd_kafka_conf_res_t result;
+  result = rd_kafka_conf_set(configuration, field_name.c_str(), value.c_str(), 
errstr.data(), errstr.size());
+  if (RD_KAFKA_CONF_OK != result) {
+const std::string error_msg { errstr.begin(), errstr.end() };
+throw Exception(PROCESS_SCHEDULE_EXCEPTION, "rd_kafka configuration error" 
+ error_msg);
+  }
+}
+
+void print_topics_list(std::shared_ptr logger, 
rd_kafka_topic_partition_list_t* kf_topic_partition_list) {
+  for (std::size_t i = 0; i < kf_topic_partition_list->cnt; ++i) {
+logger->log_debug("kf_topic_partition_list: topic: %s, partition: %d, 
offset:%lld]",
+kf_topic_partition_list->elems[i].topic, 
kf_topic_partition_list->elems[i].partition, 
kf_topic_partition_list->elems[i].offset);
+  }
+}
+
+void print_kafka_message(const rd_kafka_message_t* rkmessage, const 
std::shared_ptr& logger) {
+  if (RD_KAFKA_RESP_ERR_NO_ERROR != rkmessage->err) {
+const std::string error_msg = "ConsumeKafka: received error message from 
broker. Librdkafka error msg: " + std::string(rd_kafka_err2str(rkmessage->err));
+throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, error_msg);
+  }
+  std::string topicName = rd_kafka_topic_name(rkmessage->rkt);
+  std::string message(reinterpret_cast(rkmessage->payload), 
rkmessage->len);
+  const char* key = reinterpret_cast(rkmessage->key);
+  const std::size_t key_len = rkmessage->key_len;
+  rd_kafka_timestamp_type_t tstype;
+  int64_t timestamp;
+  timestamp = rd_kafka_message_timestamp(rkmessage, );
+  const char *tsname = "?";
+  if (tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
+if (tstype == RD_KAFKA_TIMESTAMP_CREATE_TIME) {
+  tsname = "create time";
+} else if (tstype == RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME) {
+  tsname = "log append time";
+}
+  }
+  const int64_t seconds_since_timestamp = timestamp ? 
static_cast(time(NULL)) - static_cast(timestamp / 1000) : 0;

Review comment:
   
   
   Is there a reason for printing a relative timestamp instead of the actual 
absolute one? A relative timestamp becomes outdated within a second of being 
displayed on the screen (or written to the log).
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-07 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553216382



##
File path: extensions/librdkafka/ConsumeKafka.cpp
##
@@ -0,0 +1,553 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConsumeKafka.h"
+
+#include 
+#include 
+
+#include "core/PropertyValidation.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/gsl.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+// The upper limit for Max Poll Time is 4 seconds. This is because Watchdog 
would potentially start
+// reporting issues with the processor health otherwise
+class ConsumeKafkaMaxPollTimeValidator : public TimePeriodValidator {
+ public:
+  ConsumeKafkaMaxPollTimeValidator(const std::string ) // NOLINT
+  : TimePeriodValidator(name) {
+  }
+  ~ConsumeKafkaMaxPollTimeValidator() override = default;
+
+  ValidationResult validate(const std::string& subject, const std::string& 
input) const override {
+uint64_t value;
+TimeUnit timeUnit;
+uint64_t value_as_ms;
+return 
ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid(
+core::TimePeriodValue::StringToTime(input, value, timeUnit) &&
+org::apache::nifi::minifi::core::Property::ConvertTimeUnitToMS(value, 
timeUnit, value_as_ms) &&
+0 < value_as_ms && value_as_ms <= 4000).build();
+  }
+};
+}  // namespace core
+namespace processors {
+
+constexpr const std::size_t ConsumeKafka::DEFAULT_MAX_POLL_RECORDS;
+constexpr char const* ConsumeKafka::DEFAULT_MAX_POLL_TIME;
+
+constexpr char const* ConsumeKafka::TOPIC_FORMAT_NAMES;
+constexpr char const* ConsumeKafka::TOPIC_FORMAT_PATTERNS;
+
+core::Property 
ConsumeKafka::KafkaBrokers(core::PropertyBuilder::createProperty("Kafka 
Brokers")
+  ->withDescription("A comma-separated list of known Kafka Brokers in the 
format :.")
+  ->withDefaultValue("localhost:9092", 
core::StandardValidators::get().NON_BLANK_VALIDATOR)
+  ->supportsExpressionLanguage(true)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::SecurityProtocol(core::PropertyBuilder::createProperty("Security 
Protocol")
+  ->withDescription("This property is currently not supported. Protocol used 
to communicate with brokers. Corresponds to Kafka's 'security.protocol' 
property.")
+  ->withAllowableValues({SECURITY_PROTOCOL_PLAINTEXT/*, 
SECURITY_PROTOCOL_SSL, SECURITY_PROTOCOL_SASL_PLAINTEXT, 
SECURITY_PROTOCOL_SASL_SSL*/ })
+  ->withDefaultValue(SECURITY_PROTOCOL_PLAINTEXT)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::TopicNames(core::PropertyBuilder::createProperty("Topic Names")
+  ->withDescription("The name of the Kafka Topic(s) to pull from. Multiple 
topic names are supported as a comma separated list.")
+  ->supportsExpressionLanguage(true)
+  ->build());
+
+core::Property 
ConsumeKafka::TopicNameFormat(core::PropertyBuilder::createProperty("Topic Name 
Format")
+  ->withDescription("Specifies whether the Topic(s) provided are a comma 
separated list of names or a single regular expression.")
+  ->withAllowableValues({TOPIC_FORMAT_NAMES, 
TOPIC_FORMAT_PATTERNS})
+  ->withDefaultValue(TOPIC_FORMAT_NAMES)
+  ->build());
+
+core::Property 
ConsumeKafka::HonorTransactions(core::PropertyBuilder::createProperty("Honor 
Transactions")
+  ->withDescription(
+  "Specifies whether or not MiNiFi should honor transactional guarantees 
when communicating with Kafka. If false, the Processor will use an \"isolation 
level\" of "
+  "read_uncomitted. This means that messages will be received as soon as 
they are written to Kafka but will be pulled, even if the producer cancels the 
transactions. "
+  "If this value is true, MiNiFi will not receive any messages for which 
the producer's transaction was canceled, but this can result in some latency 
since the consumer "
+  "must wait for the producer to finish its entire transaction instead of 
pulling as the messages become available.")
+  ->withDefaultValue(true)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::GroupID(core::PropertyBuilder::createProperty("Group ID")
+  

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-07 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553169167



##
File path: extensions/librdkafka/ConsumeKafka.cpp
##
@@ -0,0 +1,553 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConsumeKafka.h"
+
+#include 
+#include 
+
+#include "core/PropertyValidation.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/gsl.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+// The upper limit for Max Poll Time is 4 seconds. This is because Watchdog 
would potentially start
+// reporting issues with the processor health otherwise
+class ConsumeKafkaMaxPollTimeValidator : public TimePeriodValidator {
+ public:
+  ConsumeKafkaMaxPollTimeValidator(const std::string ) // NOLINT
+  : TimePeriodValidator(name) {
+  }
+  ~ConsumeKafkaMaxPollTimeValidator() override = default;
+
+  ValidationResult validate(const std::string& subject, const std::string& 
input) const override {
+uint64_t value;
+TimeUnit timeUnit;
+uint64_t value_as_ms;
+return 
ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid(
+core::TimePeriodValue::StringToTime(input, value, timeUnit) &&
+org::apache::nifi::minifi::core::Property::ConvertTimeUnitToMS(value, 
timeUnit, value_as_ms) &&
+0 < value_as_ms && value_as_ms <= 4000).build();
+  }
+};
+}  // namespace core
+namespace processors {
+
+constexpr const std::size_t ConsumeKafka::DEFAULT_MAX_POLL_RECORDS;
+constexpr char const* ConsumeKafka::DEFAULT_MAX_POLL_TIME;
+
+constexpr char const* ConsumeKafka::TOPIC_FORMAT_NAMES;
+constexpr char const* ConsumeKafka::TOPIC_FORMAT_PATTERNS;
+
+core::Property 
ConsumeKafka::KafkaBrokers(core::PropertyBuilder::createProperty("Kafka 
Brokers")
+  ->withDescription("A comma-separated list of known Kafka Brokers in the 
format :.")
+  ->withDefaultValue("localhost:9092", 
core::StandardValidators::get().NON_BLANK_VALIDATOR)
+  ->supportsExpressionLanguage(true)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::SecurityProtocol(core::PropertyBuilder::createProperty("Security 
Protocol")
+  ->withDescription("This property is currently not supported. Protocol used 
to communicate with brokers. Corresponds to Kafka's 'security.protocol' 
property.")
+  ->withAllowableValues({SECURITY_PROTOCOL_PLAINTEXT/*, 
SECURITY_PROTOCOL_SSL, SECURITY_PROTOCOL_SASL_PLAINTEXT, 
SECURITY_PROTOCOL_SASL_SSL*/ })
+  ->withDefaultValue(SECURITY_PROTOCOL_PLAINTEXT)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::TopicNames(core::PropertyBuilder::createProperty("Topic Names")
+  ->withDescription("The name of the Kafka Topic(s) to pull from. Multiple 
topic names are supported as a comma separated list.")
+  ->supportsExpressionLanguage(true)
+  ->build());
+
+core::Property 
ConsumeKafka::TopicNameFormat(core::PropertyBuilder::createProperty("Topic Name 
Format")
+  ->withDescription("Specifies whether the Topic(s) provided are a comma 
separated list of names or a single regular expression.")
+  ->withAllowableValues({TOPIC_FORMAT_NAMES, 
TOPIC_FORMAT_PATTERNS})
+  ->withDefaultValue(TOPIC_FORMAT_NAMES)
+  ->build());
+
+core::Property 
ConsumeKafka::HonorTransactions(core::PropertyBuilder::createProperty("Honor 
Transactions")
+  ->withDescription(
+  "Specifies whether or not MiNiFi should honor transactional guarantees 
when communicating with Kafka. If false, the Processor will use an \"isolation 
level\" of "
+  "read_uncomitted. This means that messages will be received as soon as 
they are written to Kafka but will be pulled, even if the producer cancels the 
transactions. "
+  "If this value is true, MiNiFi will not receive any messages for which 
the producer's transaction was canceled, but this can result in some latency 
since the consumer "
+  "must wait for the producer to finish its entire transaction instead of 
pulling as the messages become available.")
+  ->withDefaultValue(true)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::GroupID(core::PropertyBuilder::createProperty("Group ID")
+  

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-06 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r552525343



##
File path: extensions/librdkafka/ConsumeKafka.h
##
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+#include 
+#include 
+
+#include "core/Processor.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "rdkafka.h"
+#include "rdkafka_utils.h"
+#include "KafkaConnection.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class ConsumeKafka : public core::Processor {
+ public:
+  static constexpr char const* ProcessorName = "ConsumeKafka";
+
+  // Supported Properties
+  static core::Property KafkaBrokers;
+  static core::Property SecurityProtocol;
+  static core::Property TopicNames;
+  static core::Property TopicNameFormat;
+  static core::Property HonorTransactions;
+  static core::Property GroupID;
+  static core::Property OffsetReset;
+  static core::Property KeyAttributeEncoding;
+  static core::Property MessageDemarcator;
+  static core::Property MessageHeaderEncoding;
+  static core::Property HeadersToAddAsAttributes;
+  static core::Property DuplicateHeaderHandling;
+  static core::Property MaxPollRecords;
+  static core::Property MaxPollTime;
+  static core::Property SessionTimeout;
+
+  // Supported Relationships
+  static const core::Relationship Success;
+
+  // Security Protocol allowable values
+  static constexpr char const* SECURITY_PROTOCOL_PLAINTEXT = "PLAINTEXT";
+  static constexpr char const* SECURITY_PROTOCOL_SSL = "SSL";
+  static constexpr char const* SECURITY_PROTOCOL_SASL_PLAINTEXT = 
"SASL_PLAINTEXT";
+  static constexpr char const* SECURITY_PROTOCOL_SASL_SSL = "SASL_SSL";
+
+  // Topic Name Format allowable values
+  static constexpr char const* TOPIC_FORMAT_NAMES = "Names";
+  static constexpr char const* TOPIC_FORMAT_PATTERNS = "Patterns";
+
+  // Offset Reset allowable values
+  static constexpr char const* OFFSET_RESET_EARLIEST = "earliest";
+  static constexpr char const* OFFSET_RESET_LATEST = "latest";
+  static constexpr char const* OFFSET_RESET_NONE = "none";
+
+  // Key Attribute Encoding allowable values
+  static constexpr char const* KEY_ATTR_ENCODING_UTF_8 = "UTF-8";
+  static constexpr char const* KEY_ATTR_ENCODING_HEX = "Hex";
+
+  // Message Header Encoding allowable values
+  static constexpr char const* MSG_HEADER_ENCODING_UTF_8 = "UTF-8";
+  static constexpr char const* MSG_HEADER_ENCODING_HEX = "Hex";
+
+  // Duplicate Header Handling allowable values
+  static constexpr char const* MSG_HEADER_KEEP_FIRST = "Keep First";
+  static constexpr char const* MSG_HEADER_KEEP_LATEST = "Keep Latest";
+  static constexpr char const* MSG_HEADER_COMMA_SEPARATED_MERGE = 
"Comma-separated Merge";
+
+  // Flowfile attributes written
+  static constexpr char const* KAFKA_COUNT_ATTR = "kafka.count";  // Always 1 
until we start supporting merging from batches
+  static constexpr char const* KAFKA_MESSAGE_KEY_ATTR = "kafka.key";
+  static constexpr char const* KAFKA_OFFSET_ATTR = "kafka.offset";
+  static constexpr char const* KAFKA_PARTITION_ATTR = "kafka.partition";
+  static constexpr char const* KAFKA_TOPIC_ATTR = "kafka.topic";
+
+  static constexpr const std::size_t DEFAULT_MAX_POLL_RECORDS{ 1 };
+  static constexpr char const* DEFAULT_MAX_POLL_TIME = "4 seconds";
+  static constexpr const std::size_t METADATA_COMMUNICATIONS_TIMEOUT_MS{ 6 
};
+
+  explicit ConsumeKafka(std::string name, utils::Identifier uuid = 
utils::Identifier()) :
+  Processor(name, uuid),
+  logger_(logging::LoggerFactory::getLogger()) {}
+
+  virtual ~ConsumeKafka() = default;
+
+ public:
+  bool supportsDynamicProperties() override {
+return true;
+  }
+  /**
+   * Function that's executed when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when creating
+   * ProcessSession objects.
+   */
+  void onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* 
/* sessionFactory */) override;
+  /**
+   * Execution trigger for the RetryFlowFile Processor
+   * 

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2020-12-10 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r540164161



##
File path: extensions/librdkafka/ConsumeKafka.h
##
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+#include 
+#include 
+
+#include "core/Processor.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "rdkafka.h"
+#include "rdkafka_utils.h"
+#include "KafkaConnection.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+// The upper limit for Max Poll Time is 4 seconds. This is because Watchdog 
would potentially start
+// reporting issues with the processor health otherwise
+class ConsumeKafkaMaxPollTimeValidator : public TimePeriodValidator {
+ public:
+  ConsumeKafkaMaxPollTimeValidator(const std::string ) // NOLINT
+  : TimePeriodValidator(name) {
+  }
+  ~ConsumeKafkaMaxPollTimeValidator() override = default;
+
+  ValidationResult validate(const std::string& subject, const std::string& 
input) const override {
+uint64_t value;
+TimeUnit timeUnit;
+uint64_t value_as_ms;
+return 
ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid(
+core::TimePeriodValue::StringToTime(input, value, timeUnit) &&
+org::apache::nifi::minifi::core::Property::ConvertTimeUnitToMS(value, 
timeUnit, value_as_ms) &&
+0 < value_as_ms && value_as_ms <= 4000).build();
+  }
+};

Review comment:
   I think moving this validator to ConsumeKafka.cpp would be fine, as it seems 
to be used only by ConsumeKafka.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2020-12-10 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r540113524



##
File path: libminifi/test/unit/StringUtilsTests.cpp
##
@@ -50,6 +50,16 @@ TEST_CASE("TestStringUtils::split4", "[test split 
classname]") {
   REQUIRE(expected == 
StringUtils::split(org::apache::nifi::minifi::core::getClassName(),
 "::"));
 }
 
+TEST_CASE("TestStringUtils::split5", "[test split delimiter not specified]") {

Review comment:
   The default case (no delimiter specified) and an empty delimiter are different. 
Python raises an error on an empty delimiter, while Ruby splits the string into individual characters.
   
   The delimiter can have a default argument of `" "` (whitespace).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2020-12-08 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r538697896



##
File path: libminifi/include/utils/GeneralUtils.h
##
@@ -49,6 +49,14 @@ constexpr T intdiv_ceil(T numerator, T denominator) {
   : numerator / denominator + (numerator % denominator != 0));
 }
 
+// from https://stackoverflow.com/questions/15202474
+struct identity {
+template
+constexpr auto operator()(U&& v) const noexcept -> 
decltype(std::forward(v)) {
+return std::forward(v);
+}
+};
+

Review comment:
   Consider transparently falling back to 
[`std::identity`](https://en.cppreference.com/w/cpp/utility/functional/identity)
 when compiling with C++20 or later.

##
File path: libminifi/src/utils/StringUtils.cpp
##
@@ -59,13 +63,21 @@ std::vector StringUtils::split(const 
std::string , const std::s
   break;
 }
 auto next = std::find_if(curr, end, is_func);
-result.push_back(std::string(curr, next));
+result.push_back(transformation(std::string(curr, next)));

Review comment:
   I would take `transformation` by value. You could also forward it to the 
call, but that doesn't make sense when it's called more than once, because in the 
rvalue case later calls could operate on a moved-from function object.
   
   If the caller needs to keep state, they can wrap it with `std::ref` and pass that by value.

##
File path: libminifi/test/unit/StringUtilsTests.cpp
##
@@ -50,6 +50,16 @@ TEST_CASE("TestStringUtils::split4", "[test split 
classname]") {
   REQUIRE(expected == 
StringUtils::split(org::apache::nifi::minifi::core::getClassName(),
 "::"));
 }
 
+TEST_CASE("TestStringUtils::split5", "[test split delimiter not specified]") {

Review comment:
   What was the failure? Most languages split the string into individual 
characters when given an empty delimiter.
   
![2020-12-08-193418_485x47_scrot](https://user-images.githubusercontent.com/1170582/101526035-6c4b2200-398c-11eb-90df-b4bbc57196aa.png)
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org