[GitHub] [nifi-minifi-cpp] nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and
nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, - PublishKafka, OPC processors should config and URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r376420231 ## File path: libminifi/include/core/state/Value.h ## @@ -87,6 +88,15 @@ class Value { type_id = std::type_index(typeid(T)); } + virtual bool getValue(uint32_t ) { +const auto negative = string_value.find_first_of('-') != std::string::npos; Review comment: Will keep the current impl for now This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
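The explicit `'-'` check in the hunk above is needed because `std::stoul` accepts a leading minus sign and returns the negated value converted to unsigned, so `"-1"` parses without an error. A minimal sketch of that guard follows; `parseUInt32` is a hypothetical helper used only for illustration, not a function from the PR.

```cpp
#include <cstdint>
#include <limits>
#include <stdexcept>
#include <string>

// Hypothetical helper (not part of MiNiFi): parse a decimal string into
// uint32_t and reject negative input explicitly.
inline uint32_t parseUInt32(const std::string& text) {
  // std::stoul would accept "-1" and wrap it around to a huge unsigned value,
  // so look for the sign before parsing.
  if (text.find('-') != std::string::npos) {
    throw std::out_of_range("negative value detected");
  }
  // Throws std::invalid_argument if no conversion can be performed.
  const unsigned long parsed = std::stoul(text);
  // unsigned long may be wider than 32 bits, so check the upper bound too.
  if (parsed > (std::numeric_limits<uint32_t>::max)()) {
    throw std::out_of_range("value does not fit in uint32_t");
  }
  return static_cast<uint32_t>(parsed);
}
```

Calling `parseUInt32("-1")` throws `std::out_of_range` instead of returning a wrapped value.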
[GitHub] [nifi-minifi-cpp] nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and
nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, - PublishKafka, OPC processors should config and URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r376419000 ## File path: libminifi/test/integration/IntegrationBase.h ## @@ -42,6 +42,17 @@ class IntegrationBase { configureSecurity(); } + // Return the last position and number of occurrences. + std::pair countPatInStr(const std::string , const std::string ) { +size_t last_pos = 0; +unsigned int occurrences = 0; Review comment: Done, @szaszm thanks for the detailed explanation. ^^
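The helper quoted above is only partially visible in the hunk. A self-contained sketch of a function that returns the last match position and the number of occurrences could look like this; the pair's element types, the parameter names, and the handling of overlapping matches and empty patterns are assumptions, not taken from the PR.

```cpp
#include <cstddef>
#include <string>
#include <utility>

// Sketch only: returns {position of the last match, number of matches}.
// When nothing matches, both elements are 0.
inline std::pair<std::size_t, unsigned int> countPatInStr(const std::string& str,
                                                          const std::string& pattern) {
  if (pattern.empty()) {
    return {0, 0};  // an empty pattern would match everywhere; treated as "no match" here
  }
  std::size_t last_pos = 0;
  unsigned int occurrences = 0;
  std::size_t pos = str.find(pattern);
  while (pos != std::string::npos) {
    last_pos = pos;
    ++occurrences;
    // Resume the search after the current match, so matches do not overlap.
    pos = str.find(pattern, pos + pattern.size());
  }
  return {last_pos, occurrences};
}
```

A test can then assert on the `second` element to check how often a log message appeared.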
[GitHub] [nifi-minifi-cpp] nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and
nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, - PublishKafka, OPC processors should config and URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r376417867 ## File path: libminifi/include/core/state/Value.h ## @@ -115,6 +125,64 @@ class Value { std::type_index type_id; }; +class UInt32Value : public Value { + public: + explicit UInt32Value(uint32_t value) + : Value(std::to_string(value)), +value(value) { +setTypeId(); + } + + explicit UInt32Value(const std::string ) + : Value(strvalue), +value(std::stoul(strvalue)) { +/** + * This is a fundamental change in that we would be changing where this error occurs. + * We should be prudent about breaking backwards compatibility, but since Uint32Value + * is only created with a validator and type, we **should** be okay. + */ +const auto negative = strvalue.find_first_of('-') != std::string::npos; + if (negative){ + throw std::out_of_range("negative value detected"); + } +setTypeId(); + } + + uint32_t getValue() const { +return value; + } + protected: + + virtual bool getValue(uint32_t ) { +ref = value; +return true; + } + + virtual bool getValue(int ) { +if (value <= (std::numeric_limits::max)()) { Review comment: Could you elaborate? This is derived from the previous uint64 implementation, but I think the logic makes sense (though I am not sure it is actually what we want). As I understand it, this checks whether `value` (0..2^32-1) is at most 2^31-1, and if so assigns it to `ref`.
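The range check described in the comment above can be sketched as a free function. `tryNarrowToInt` is a hypothetical name, and the comparison is done in a 64-bit unsigned type (an assumption of this sketch, not necessarily what the PR does) so that neither operand is truncated or sign-converted.

```cpp
#include <cstdint>
#include <limits>

// Hypothetical free-function version of the check discussed above:
// copy a uint32_t into an int only when it fits.
inline bool tryNarrowToInt(uint32_t value, int& ref) {
  // Widen both sides to uint64_t so the comparison has no signed/unsigned surprises.
  const uint64_t int_max = static_cast<uint64_t>((std::numeric_limits<int>::max)());
  if (static_cast<uint64_t>(value) <= int_max) {
    ref = static_cast<int>(value);
    return true;
  }
  return false;  // value exceeds INT_MAX; ref is left untouched
}
```

With a 32-bit `int`, any value above 2^31-1 makes the function return `false` and leaves `ref` unchanged.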
[GitHub] [nifi-minifi-cpp] nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and
nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, - PublishKafka, OPC processors should config and URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r375325208 ## File path: extensions/librdkafka/PublishKafka.cpp ## @@ -187,81 +239,77 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr auto key = conn->getKey(); if (key->brokers_.empty()) { -logger_->log_error("There are no brokers"); -return false; +throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers"); } result = rd_kafka_conf_set(conf_, "bootstrap.servers", key->brokers_.c_str(), errstr.data(), errstr.size()); logger_->log_debug("PublishKafka: bootstrap.servers [%s]", key->brokers_); if (result != RD_KAFKA_CONF_OK) { -logger_->log_error("PublishKafka: configure error result [%s]", errstr.data()); -return false; +auto error_msg = utils::StringUtils::join_pack("PublishKafka: configure error result [%s]", errstr.data()); Review comment: okay, then I will do a follow-up refactoring later
[GitHub] [nifi-minifi-cpp] nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and
nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, - PublishKafka, OPC processors should config and URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r374837546 ## File path: extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp ## @@ -0,0 +1,84 @@ +/** + * @file GenerateFlowFile.h + * GenerateFlowFile class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#undef NDEBUG +#include +#include "../../../libminifi/test/integration/IntegrationBase.h" +#include "core/logging/Logger.h" +#include "../../../libminifi/test/TestBase.h" +#include "../PublishKafka.h" + +class PublishKafkaOnScheduleTests : public IntegrationBase { +public: +virtual void runAssertions() { + std::string logs = LogTestController::getInstance().log_output.str(); + size_t pos = 0; + size_t last_pos = 0; + unsigned int occurances = 0; + do { +pos = logs.find(" value 1 is outside allowed range 1000..10", pos); Review comment: Wrapped the logic to a new function
[GitHub] [nifi-minifi-cpp] nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and
nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, - PublishKafka, OPC processors should config and URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r374837145 ## File path: extensions/librdkafka/PublishKafka.cpp ## @@ -141,6 +141,59 @@ void PublishKafka::initialize() { void PublishKafka::onSchedule(const std::shared_ptr , const std::shared_ptr ) { interrupted_ = false; + + // Try to get a KafkaConnection + std::string client_id, brokers; + if (!context->getProperty(ClientName.getName(), client_id)) { +throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Client Name property missing or invalid"); + } + if (!context->getProperty(SeedBrokers.getName(), brokers)) { +throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Known Brokers property missing or invalid"); + } + + // Get some properties not (only) used directly to set up librdkafka + std::string value; + + // Batch Size + value = ""; + if (context->getProperty(BatchSize.getName(), batch_size_)) { +logger_->log_debug("PublishKafka: Batch Size [%lu]", batch_size_); + } else { +batch_size_ = 10; + } + + // Target Batch Payload Size + value = ""; + if (context->getProperty(TargetBatchPayloadSize.getName(), target_batch_payload_size_)) { +logger_->log_debug("PublishKafka: Target Batch Payload Size [%llu]", target_batch_payload_size_); + } else { +target_batch_payload_size_ = 512 * 1024U; + } + + // Max Flow Segment Size + value = ""; + if (context->getProperty(MaxFlowSegSize.getName(), max_flow_seg_size_)) { +logger_->log_debug("PublishKafka: Max Flow Segment Size [%llu]", max_flow_seg_size_); + } else { +max_flow_seg_size_ = 0U; Review comment: Fixed. We already had a default value; maybe we were being overly careful.
[GitHub] [nifi-minifi-cpp] nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and
nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, - PublishKafka, OPC processors should config and URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r374835864 ## File path: extensions/standard-processors/processors/LogAttribute.cpp ## @@ -88,23 +88,14 @@ void LogAttribute::initialize() { } void LogAttribute::onSchedule(const std::shared_ptr , const std::shared_ptr ) { - core::Property flowsToLog = FlowFilesToLog; - - if (getProperty(FlowFilesToLog.getName(), flowsToLog)) { -// we are going this route since to avoid breaking backwards compatibility the get property function doesn't perform validation ( That's done -// in configuration. In future releases we can add that exception handling there. -if (!flowsToLog.getValue().validate("Validating FlowFilesToLog").valid()) - throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid value for flowfiles to log: " + flowsToLog.getValue().to_string()); -flowfiles_to_log_ = flowsToLog.getValue(); - } Review comment: If I write this line as ``` if (!context->getProperty(FlowFilesToLog.getName(), flowfiles_to_log_)) { throw ... } ``` then GetTCPTest (and only that test) fails with an exception, so I guess that test's configuration has issues. I will take a look later.
[GitHub] [nifi-minifi-cpp] nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and
nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, - PublishKafka, OPC processors should config and URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r374834609 ## File path: extensions/standard-processors/processors/LogAttribute.cpp ## @@ -88,23 +88,14 @@ void LogAttribute::initialize() { } void LogAttribute::onSchedule(const std::shared_ptr , const std::shared_ptr ) { - core::Property flowsToLog = FlowFilesToLog; - - if (getProperty(FlowFilesToLog.getName(), flowsToLog)) { -// we are going this route since to avoid breaking backwards compatibility the get property function doesn't perform validation ( That's done -// in configuration. In future releases we can add that exception handling there. -if (!flowsToLog.getValue().validate("Validating FlowFilesToLog").valid()) - throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid value for flowfiles to log: " + flowsToLog.getValue().to_string()); -flowfiles_to_log_ = flowsToLog.getValue(); - } - std::string value; - if (context->getProperty(HexencodePayload.getName(), value)) { -utils::StringUtils::StringToBool(value, hexencode_); - } - if (context->getProperty(MaxPayloadLineLength.getName(), value)) { -core::Property::StringToInt(value, max_line_length_); - } + context->getProperty(FlowFilesToLog.getName(), flowfiles_to_log_); Review comment: If I write this line as ``` if (!context->getProperty(FlowFilesToLog.getName(), flowfiles_to_log_)) { throw ... } ``` then GetTCPTest (and only that test) fails with an exception, so I guess that test's configuration has issues. I will take a look later.
[GitHub] [nifi-minifi-cpp] nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and
nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, - PublishKafka, OPC processors should config and URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r370248291 ## File path: libminifi/src/core/PropertyValidation.cpp ## @@ -27,6 +27,7 @@ std::shared_ptr StandardValidators::VALID = std::make_shared< StandardValidators::StandardValidators() { INVALID = std::make_shared(false, "INVALID"); INTEGER_VALIDATOR = std::make_shared("INTEGER_VALIDATOR"); + UNSIGNED_INT_VALIDATOR = std::make_shared("UNSIGNED_INT_VALIDATOR"); Review comment: Side note: IIRC, Java 8 provides some unsigned methods on the Integer class so that it can work in the range 0..2^32-1, though I am not proficient enough in Java to understand why they don't provide an unsigned keyword.
[GitHub] [nifi-minifi-cpp] nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and
nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, - PublishKafka, OPC processors should config and URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r369668462 ## File path: extensions/librdkafka/PublishKafka.cpp ## @@ -187,81 +239,77 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr auto key = conn->getKey(); if (key->brokers_.empty()) { -logger_->log_error("There are no brokers"); -return false; +throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers"); } result = rd_kafka_conf_set(conf_, "bootstrap.servers", key->brokers_.c_str(), errstr.data(), errstr.size()); logger_->log_debug("PublishKafka: bootstrap.servers [%s]", key->brokers_); if (result != RD_KAFKA_CONF_OK) { -logger_->log_error("PublishKafka: configure error result [%s]", errstr.data()); -return false; +auto error_msg = utils::StringUtils::join_pack("PublishKafka: configure error result [%s]", errstr.data()); Review comment: I did not expect to have forgotten this one :). @arpadboda, what do you think about adding a formatting library?
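On the formatting question above: assuming `join_pack` simply concatenates its arguments, the `%s` placeholder in the quoted line would stay in the message literally. Short of adding a library, one option is a small printf-style helper built on `std::snprintf`. The sketch below is an illustration only; `formatString` is a hypothetical name, not an existing MiNiFi utility.

```cpp
#include <cstddef>
#include <cstdio>
#include <string>
#include <vector>

// Hypothetical helper: printf-style formatting into a std::string.
// Arguments must match the format string, exactly as with printf.
template <typename... Args>
std::string formatString(const char* format, Args... args) {
  // First call measures the required length (excluding the terminating NUL).
  const int needed = std::snprintf(nullptr, 0, format, args...);
  if (needed < 0) {
    return std::string();  // encoding error reported by snprintf
  }
  std::vector<char> buffer(static_cast<std::size_t>(needed) + 1);
  // Second call writes the formatted text plus the terminating NUL.
  std::snprintf(buffer.data(), buffer.size(), format, args...);
  return std::string(buffer.data(), static_cast<std::size_t>(needed));
}
```

With such a helper the error message could be built as `formatString("PublishKafka: configure error result [%s]", errstr.data())`. Unlike a type-safe formatting library, this keeps all the usual printf pitfalls around mismatched argument types.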
[GitHub] [nifi-minifi-cpp] nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and
nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, - PublishKafka, OPC processors should config and URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r369668522 ## File path: extensions/librdkafka/tests/CMakeLists.txt ## @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +file(GLOB KAFKA_TESTS "*.cpp") + +SET(KAFKA_TEST_COUNT 0) + +FOREACH(testfile ${KAFKA_TESTS}) +get_filename_component(testfilename "${testfile}" NAME_WE) +add_executable("${testfilename}" "${testfile}") +target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/librdkafka/") +target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/standard-processors") +target_wholearchive_library(${testfilename} minifi-rdkafka-extensions) +target_wholearchive_library(${testfilename} minifi-standard-processors) +createTests("${testfilename}") +MATH(EXPR KAFKA_TEST_COUNT "${KAFKA_TEST_COUNT}+1") +if (${testfilename} MATCHES "(.)*OnSchedule(.)*") Review comment: Just a workaround for myself, I have some unit tests which require local Kafka server but never commit them. Should I add them in or modify this file for this test only? 
(will rewrite anyway because that line looks weird)
[GitHub] [nifi-minifi-cpp] nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and
nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, - PublishKafka, OPC processors should config and URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r369656857 ## File path: libminifi/src/core/PropertyValidation.cpp ## @@ -27,6 +27,7 @@ std::shared_ptr StandardValidators::VALID = std::make_shared< StandardValidators::StandardValidators() { INVALID = std::make_shared(false, "INVALID"); INTEGER_VALIDATOR = std::make_shared("INTEGER_VALIDATOR"); + UNSIGNED_INT_VALIDATOR = std::make_shared("UNSIGNED_INT_VALIDATOR"); Review comment: https://github.com/apache/nifi/blob/master/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java#L292 :sweat: