[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1039819121


##
libminifi/include/utils/Enum.h:
##
@@ -39,6 +39,7 @@ namespace utils {
 constexpr Clazz(Type value = static_cast<Type>(-1)) : value_{value} {} \
 explicit Clazz(const std::string& str) : value_{parse(str.c_str()).value_} {} \
 explicit Clazz(const char* str) : value_{parse(str).value_} {} \
+explicit Clazz(std::nullptr_t) = delete; \

Review Comment:
   Previously, initializing with `0` was allowed through the `const char*` 
constructor. This could be misleading, so it is no longer allowed.
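
   A minimal sketch of the effect, assuming C++17 and using a hypothetical 
`Color` class in place of the real `SMART_ENUM` macro output (the class, its 
enumerators and its `parse` helper are illustrative only): with the 
`std::nullptr_t` overload deleted, construction from `nullptr` is rejected at 
compile time, while construction from a C string still works.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <type_traits>

// Hypothetical stand-in for a SMART_ENUM-generated class (not the real macro output).
struct Color {
  enum Type : int { RED, GREEN, UNKNOWN = -1 };

  constexpr Color(Type value = UNKNOWN) : value_{value} {}
  explicit Color(const std::string& str) : value_{parse(str.c_str())} {}
  explicit Color(const char* str) : value_{parse(str)} {}
  explicit Color(std::nullptr_t) = delete;  // rejects Color(nullptr) at compile time

  Type value() const { return value_; }

 private:
  // Illustrative parser: maps a name to an enumerator, UNKNOWN otherwise.
  static Type parse(const char* str) {
    if (std::strcmp(str, "RED") == 0) return RED;
    if (std::strcmp(str, "GREEN") == 0) return GREEN;
    return UNKNOWN;
  }

  Type value_;
};

// String construction is still available...
static_assert(std::is_constructible_v<Color, const char*>);
// ...but the deleted overload is the best match for nullptr, so this is ill-formed.
static_assert(!std::is_constructible_v<Color, std::nullptr_t>);
```

   The `static_assert`s document the intent in the type system, so a later 
change that re-enables null construction would fail to compile.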



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (MINIFICPP-1932) TailFileCronTest fails sometimes

2022-12-06 Thread Ferenc Gerlits (Jira)


 [ 
https://issues.apache.org/jira/browse/MINIFICPP-1932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ferenc Gerlits reassigned MINIFICPP-1932:
-

Assignee: Ferenc Gerlits

> TailFileCronTest fails sometimes
> 
>
> Key: MINIFICPP-1932
> URL: https://issues.apache.org/jira/browse/MINIFICPP-1932
> Project: Apache NiFi MiNiFi C++
>  Issue Type: Bug
>Reporter: Gábor Gyimesi
>Assignee: Ferenc Gerlits
>Priority: Minor
>  Labels: MiNiFi-CPP-Hygiene
> Attachments: TailFileCronTest_failure.log
>
>
> TailFileTest: 
> /home/ggyimesi/projects/nifi-minifi-cpp-fork/extensions/standard-processors/tests/integration/TailFileTest.cpp:70:
>  virtual void TailFileTestHarness::runAssertions(): Assertion 
> `verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_), "5 
> flowfiles were received from TailFile input", "Looking for delimiter 0xA", 
> "li\\ne5")' failed.
>  
> More info in the attached logs



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1040845912


##
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##
@@ -163,23 +200,259 @@ void AbstractMQTTProcessor::reconnect() {
   }
 
   logger_->log_info("Reconnecting to %s", uri_);
-  int ret = MQTTAsync_connect(client_, &conn_opts);
+  if (MQTTAsync_isConnected(client_)) {
+logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+return;
+  }
+  const int ret = MQTTAsync_connect(client_, &conn_opts);
+  MQTTProperties_free(&connect_props);
   if (ret != MQTTASYNC_SUCCESS) {
-logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, 
ret);
+logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error 
code [%d]", uri_, ret);
+return;
   }
+
+  // wait until connection succeeds or fails
+  connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& 
conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const {
+  conn_opts.cleanstart = getCleanStart();
+
+  {
+MQTTProperty property;
+property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
+property.value.integer4 = 
gsl::narrow(getSessionExpiryInterval().count());
+MQTTProperties_add(&connect_props, &property);
+  }

Review Comment:
   To me it looks like `property` is copied by value by `MQTTProperties_add()`: 
https://github.com/eclipse/paho.mqtt.c/blob/master/src/MQTTProperties.c#L127
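
   Why copy-by-value makes the block-scoped `property` safe can be sketched 
with a simplified analogue. Everything below (`Property`, `PropertyList`, 
`property_list_add`) is hypothetical and is not the Paho API; it only mirrors 
the pattern of passing a pointer to a local that the callee copies.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical, simplified analogue of a C-style property list; NOT the Paho API.
struct Property {
  int identifier;
  std::uint32_t integer4;
};

struct PropertyList {
  std::vector<Property> items;
};

// Takes a pointer but stores a copy of the pointee, so the caller's object
// does not need to outlive the call.
void property_list_add(PropertyList* list, const Property* prop) {
  list->items.push_back(*prop);
}

PropertyList make_list() {
  PropertyList list;
  {
    Property property{1, 3600};  // block-scoped local, as in the diff (values arbitrary)
    property_list_add(&list, &property);
  }  // `property` is destroyed here
  return list;  // the stored copy is still valid
}
```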






[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1040850939


##
extensions/mqtt/processors/AbstractMQTTProcessor.h:
##
@@ -46,150 +44,178 @@ class AbstractMQTTProcessor : public core::Processor {
 freeResources();
   }
 
+  SMART_ENUM(MqttVersions,
+(V_3X_AUTO, "3.x AUTO"),
+(V_3_1_0, "3.1.0"),
+(V_3_1_1, "3.1.1"),
+(V_5_0, "5.0"));
+
+  SMART_ENUM(MqttQoS,
+(LEVEL_0, "0"),
+(LEVEL_1, "1"),
+(LEVEL_2, "2"));
+
   EXTENSIONAPI static const core::Property BrokerURI;
   EXTENSIONAPI static const core::Property ClientID;
+  EXTENSIONAPI static const core::Property QoS;
+  EXTENSIONAPI static const core::Property MqttVersion;
+  EXTENSIONAPI static const core::Property ConnectionTimeout;
+  EXTENSIONAPI static const core::Property KeepAliveInterval;
+  EXTENSIONAPI static const core::Property LastWillTopic;
+  EXTENSIONAPI static const core::Property LastWillMessage;
+  EXTENSIONAPI static const core::Property LastWillQoS;
+  EXTENSIONAPI static const core::Property LastWillRetain;
+  EXTENSIONAPI static const core::Property LastWillContentType;
   EXTENSIONAPI static const core::Property Username;
   EXTENSIONAPI static const core::Property Password;
-  EXTENSIONAPI static const core::Property KeepAliveInterval;
-  EXTENSIONAPI static const core::Property MaxFlowSegSize;
-  EXTENSIONAPI static const core::Property ConnectionTimeout;
-  EXTENSIONAPI static const core::Property Topic;
-  EXTENSIONAPI static const core::Property QoS;
   EXTENSIONAPI static const core::Property SecurityProtocol;
   EXTENSIONAPI static const core::Property SecurityCA;
   EXTENSIONAPI static const core::Property SecurityCert;
   EXTENSIONAPI static const core::Property SecurityPrivateKey;
   EXTENSIONAPI static const core::Property SecurityPrivateKeyPassword;
-  EXTENSIONAPI static const core::Property LastWillTopic;
-  EXTENSIONAPI static const core::Property LastWillMessage;
-  EXTENSIONAPI static const core::Property LastWillQoS;
-  EXTENSIONAPI static const core::Property LastWillRetain;
 
-  EXTENSIONAPI static auto properties() {
+
+  static auto basicProperties() {
+return std::array{
+  BrokerURI,
+  ClientID,
+  MqttVersion
+};
+  }
+
+  static auto advancedProperties() {
 return std::array{
-BrokerURI,
-Topic,
-ClientID,
-QoS,
-ConnectionTimeout,
-KeepAliveInterval,
-MaxFlowSegSize,
-LastWillTopic,
-LastWillMessage,
-LastWillQoS,
-LastWillRetain,
-Username,
-Password,
-SecurityProtocol,
-SecurityCA,
-SecurityCert,
-SecurityPrivateKey,
-SecurityPrivateKeyPassword
+  QoS,
+  ConnectionTimeout,
+  KeepAliveInterval,
+  LastWillTopic,
+  LastWillMessage,
+  LastWillQoS,
+  LastWillRetain,
+  LastWillContentType,
+  Username,
+  Password,
+  SecurityProtocol,
+  SecurityCA,
+  SecurityCert,
+  SecurityPrivateKey,
+  SecurityPrivateKeyPassword
 };
   }
 
   void onSchedule(const std::shared_ptr& context, const 
std::shared_ptr& factory) override;
+  void onTrigger(const std::shared_ptr& context, const 
std::shared_ptr& session) override;
 
   void notifyStop() override {
 freeResources();
   }
 
  protected:
+  struct MQTTMessageDeleter {
+void operator()(MQTTAsync_message* message) {
+  MQTTAsync_freeMessage(&message);

Review Comment:
   It is required; the signature is
   `void MQTTAsync_freeMessage(MQTTAsync_message** msg)`:
   
https://www.eclipse.org/paho/files/mqttdoc/MQTTAsync/html/_m_q_t_t_async_8h.html#a9b45db63052fe29ab1fad22d2a00c91c
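
   A minimal sketch of the deleter pattern under discussion, using hypothetical 
stand-ins (`Message`, `free_message`, `g_freed_count`) instead of the Paho 
types: when a C-style free function takes a pointer to the caller's pointer, a 
`std::unique_ptr` deleter can pass the address of its own by-value parameter.

```cpp
#include <cassert>
#include <cstdlib>
#include <memory>

// Hypothetical C-style message type; a stand-in for a type owned by a C API.
struct Message {
  int payload;
};

int g_freed_count = 0;  // only here so the example can observe the deleter running

// Hypothetical free function with a T** signature. This toy version also nulls
// the caller's pointer, which is an assumption of the sketch.
void free_message(Message** msg) {
  std::free(*msg);
  *msg = nullptr;
  ++g_freed_count;
}

struct MessageDeleter {
  void operator()(Message* message) const {
    // `message` is a local copy of the owned pointer, so taking its address
    // satisfies the T** signature.
    free_message(&message);
  }
};

using MessagePtr = std::unique_ptr<Message, MessageDeleter>;

MessagePtr make_message(int payload) {
  auto* raw = static_cast<Message*>(std::malloc(sizeof(Message)));
  if (raw != nullptr) raw->payload = payload;
  return MessagePtr{raw};
}
```

   Because the deleter receives the pointer by value, nulling it inside the 
free function has no effect on the `unique_ptr` itself, which is already being 
destroyed or reset at that point.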






[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


fgerlits commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1040921622


##
libminifi/include/utils/Enum.h:
##
@@ -39,6 +39,7 @@ namespace utils {
 constexpr Clazz(Type value = static_cast<Type>(-1)) : value_{value} {} \
 explicit Clazz(const std::string& str) : value_{parse(str.c_str()).value_} {} \
 explicit Clazz(const char* str) : value_{parse(str).value_} {} \
+explicit Clazz(std::nullptr_t) = delete; \

Review Comment:
   I see, thanks.






[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


fgerlits commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1040926309


##
libminifi/include/utils/Enum.h:
##
@@ -127,7 +128,7 @@ namespace utils {
 #define SMART_ENUM(Clazz, ...) \
   struct Clazz { \
 using Base = ::org::apache::nifi::minifi::utils::EnumBase; \
-enum Type { \
+enum Type : int { \

Review Comment:
   do we want to allow a `SMART_ENUM` to be initialized from an integer without 
a cast?






[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


fgerlits commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1040930018


##
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##

Review Comment:
   You're right, I misread this.  Sorry.






[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


fgerlits commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1040931415


##
extensions/mqtt/processors/AbstractMQTTProcessor.h:
##

Review Comment:
   Yep, right again.  Sorry.






[jira] [Assigned] (MINIFICPP-1948) Possibility to display in the logs the uid of the component to which the logging relates

2022-12-06 Thread Ferenc Gerlits (Jira)


 [ 
https://issues.apache.org/jira/browse/MINIFICPP-1948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ferenc Gerlits reassigned MINIFICPP-1948:
-

Assignee: Ferenc Gerlits

> Possibility to display in the logs the uid of the component to which the 
> logging relates
> 
>
> Key: MINIFICPP-1948
> URL: https://issues.apache.org/jira/browse/MINIFICPP-1948
> Project: Apache NiFi MiNiFi C++
>  Issue Type: Wish
>Affects Versions: 0.12.0
>Reporter: Kondakov Artem
>Assignee: Ferenc Gerlits
>Priority: Major
> Fix For: 0.14.0
>
>
> Is it possible, when logging to spdlog, to add the output of the identifier 
> of the component by which logging occurs, for example like this
> spdlog.pattern=[%Y-%m-%d %H:%M:%S.%e] [%n] [%l] [%uid] %v
> where [%uid] is the output [45b07291-7c57-46f0-ab48-6d4c88386f7f] of the 
> Processor ID or Controller Services ID or Connection ID or Flow Controller ID





[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1443: MINIFICPP-1972 - Refactor State Manager code

2022-12-06 Thread GitBox


lordgamez commented on code in PR #1443:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1443#discussion_r1041021124


##
libminifi/src/controllers/keyvalue/KeyValueStateManager.cpp:
##
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+
+#include "controllers/keyvalue/KeyValueStateManager.h"
+#include "controllers/keyvalue/KeyValueStateStorage.h"
+
+namespace org::apache::nifi::minifi::controllers {
+
+KeyValueStateManager::KeyValueStateManager(
+const utils::Identifier& id,
+KeyValueStateStorage* storage)
+: StateManager(id)
+, storage_(storage)
+, transaction_in_progress_(false)
+, change_type_(ChangeType::NONE) {
+  std::string serialized;
+  if (storage_->get(id_.to_string(), serialized)) {
+state_ = KeyValueStateStorage::deserialize(serialized);
+  }
+}
+
+bool KeyValueStateManager::set(const core::StateManager::State& kvs) {
+  bool autoCommit = false;

Review Comment:
   Should be `auto_commit`



##
libminifi/src/controllers/keyvalue/KeyValueStateManager.cpp:
##
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+
+#include "controllers/keyvalue/KeyValueStateManager.h"
+#include "controllers/keyvalue/KeyValueStateStorage.h"
+
+namespace org::apache::nifi::minifi::controllers {
+
+KeyValueStateManager::KeyValueStateManager(
+const utils::Identifier& id,
+KeyValueStateStorage* storage)
+: StateManager(id)
+, storage_(storage)
+, transaction_in_progress_(false)
+, change_type_(ChangeType::NONE) {

Review Comment:
   Initializer list should follow the google style guide: 
https://google.github.io/styleguide/cppguide.html#Constructor_Initializer_Lists
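
   For reference, a small sketch of the layout the linked guide describes for 
an initializer list that does not fit on one line, together with the 
`snake_case` local naming mentioned above. `StateHolder` is a hypothetical 
class used only to show the formatting; it is not code from the PR.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Hypothetical class, used only to show the layout.
class StateHolder {
 public:
  StateHolder(std::string id, int* storage);

  const std::string& id() const { return id_; }
  int* storage() const { return storage_; }
  bool transaction_in_progress() const { return transaction_in_progress_; }

 private:
  std::string id_;
  int* storage_;
  bool transaction_in_progress_;
};

// When the list spans several lines, wrap before the colon, indent by four
// spaces, and put each member on its own line aligned under the first, with
// commas trailing rather than leading.
StateHolder::StateHolder(std::string id, int* storage)
    : id_(std::move(id)),
      storage_(storage),
      transaction_in_progress_(false) {}

// Local variables use snake_case (auto_commit rather than autoCommit).
bool needs_commit(const StateHolder& holder) {
  const bool auto_commit = !holder.transaction_in_progress();
  return auto_commit;
}
```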






[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6757: NIFI-10944 FixTestConvertAvroToParquet

2022-12-06 Thread GitBox


exceptionfactory commented on code in PR #6757:
URL: https://github.com/apache/nifi/pull/6757#discussion_r1041048312


##
nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java:
##
@@ -228,8 +230,17 @@ public void testData() throws Exception {
 
assertEquals(firstRecord.getGroup("myarray",0).getGroup("list",1).getInteger("element",
 0), 2);
 
 // Map
-
assertEquals(firstRecord.getGroup("mymap",0).getGroup("key_value",0).getInteger("value",
 0), 1);
-
assertEquals(firstRecord.getGroup("mymap",0).getGroup("key_value",1).getInteger("value",
 0), 2);
+String key1 = 
firstRecord.getGroup("mymap",0).getGroup("key_value",0).getValueToString(0,0);
+String key2 = 
firstRecord.getGroup("mymap",0).getGroup("key_value",1).getValueToString(0,0);
+int v1 = 
firstRecord.getGroup("mymap",0).getGroup("key_value",0).getInteger("value", 0);
+int v2 = 
firstRecord.getGroup("mymap",0).getGroup("key_value",1).getInteger("value", 0);
+Map recordData = new LinkedHashMap();
+recordData.put(key1,v1);
+recordData.put(key2,v2);
+Map inputData = new HashMap();
+inputData.put("a",1);
+inputData.put("b",2);
+assertEquals(inputData, recordData);

Review Comment:
   Can this be simplified to the following?
   ```suggestion
   int v1 = 
firstRecord.getGroup("mymap",0).getGroup("key_value",0).getInteger("value", 0);
   int v2 = 
firstRecord.getGroup("mymap",0).getGroup("key_value",1).getInteger("value", 0);
   assertEquals(1, v1);
   assertEquals(2, v2);
   ```






[GitHub] [nifi] exceptionfactory commented on pull request #6589: NIFI-10710 implement processor for AWS Polly, Textract, Translate, Tr…

2022-12-06 Thread GitBox


exceptionfactory commented on PR #6589:
URL: https://github.com/apache/nifi/pull/6589#issuecomment-1339483465

   @KalmanJantner, I will take a closer look at the latest changes soon. Please 
avoid introducing merge commits; instead, please rebase and force-push changes 
to align with the main branch. Thanks!





[jira] [Created] (NIFI-10953) GCP Vision AI processors

2022-12-06 Thread Kalman Jantner (Jira)
Kalman Jantner created NIFI-10953:
-

 Summary: GCP Vision AI processors
 Key: NIFI-10953
 URL: https://issues.apache.org/jira/browse/NIFI-10953
 Project: Apache NiFi
  Issue Type: Improvement
  Components: Extensions
Reporter: Kalman Jantner


Create processors to interact with GCP Vision AI managed service to:
 * detect text in images
 * detect handwriting in images
 * detect text in files
 * detect faces
 * detect image properties
 * detect labels
 * etc





[jira] [Created] (NIFI-10954) Add usage access from Properties tab in component configuration view

2022-12-06 Thread Pierre Villard (Jira)
Pierre Villard created NIFI-10954:
-

 Summary: Add usage access from Properties tab in component 
configuration view
 Key: NIFI-10954
 URL: https://issues.apache.org/jira/browse/NIFI-10954
 Project: Apache NiFi
  Issue Type: Improvement
  Components: Core UI
Reporter: Pierre Villard


It'd be awesome to have a "Usage" icon next to the two existing icons (Verify 
Properties, and Add Property) on the Properties tab of the component's 
configuration view. This icon would open the Usage documentation of the 
component, and closing the documentation would bring us back to the 
configuration view.

I sometimes find myself going back and forth a lot in the UI to access the 
additional details documentation of a component when configuring it (closing 
the configuration view, right-clicking the component and clicking "View usage", 
or clicking the doc icon for controller services / reporting tasks).





[GitHub] [nifi] TheGreatRandall commented on pull request #6761: NIFI-10952 - Fix TestScriptedPartitionRecord.testWhenMultiplePartitions

2022-12-06 Thread GitBox


TheGreatRandall commented on PR #6761:
URL: https://github.com/apache/nifi/pull/6761#issuecomment-1339504069

   > @TheGreatRandall Please run a local build and test to verify changes prior 
to submitting a pull request. As shown in the automated checks, this change 
breaks the unit test on all platforms.
   
   Hi there,
   I got a successful build on my end before I opened the PR, but the results 
show that one test out of 92 failed. Is there any way I can see which test 
failed? Thank you!
   
   





[GitHub] [nifi] KalmanJantner commented on pull request #6589: NIFI-10710 implement processor for AWS Polly, Textract, Translate, Tr…

2022-12-06 Thread GitBox


KalmanJantner commented on PR #6589:
URL: https://github.com/apache/nifi/pull/6589#issuecomment-1339536388

   > @KalmanJantner, I will take a closer look at the latest changes soon. 
Please avoid introducing merge commits; instead, please rebase and force-push 
changes to align with the main branch. Thanks!
   
   Thanks @exceptionfactory, I have removed the merge commit.





[GitHub] [nifi] TheGreatRandall commented on pull request #6757: NIFI-10944 FixTestConvertAvroToParquet

2022-12-06 Thread GitBox


TheGreatRandall commented on PR #6757:
URL: https://github.com/apache/nifi/pull/6757#issuecomment-1339568171

   Hi there, after more local testing, the code cannot be simplified because 
the order of the values inside a HashMap is non-deterministic and cannot be 
guaranteed. According to the comment, the test is meant to assert the equality 
of the maps, so I think creating the map variables is necessary.





[jira] [Commented] (NIFI-10954) Add usage access from Properties tab in component configuration view

2022-12-06 Thread Matt Gilman (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-10954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17643932#comment-17643932
 ] 

Matt Gilman commented on NIFI-10954:


[~pvillard] Which details from usage are you most interested in here? The 
tooltip next to the Property name should already include all the property 
details from usage. We also include a short history of recent values. If 
there's something from usage that we could incorporate inline, it may be helpful 
and prevent the need to go back and forth.

> Add usage access from Properties tab in component configuration view
> 
>
> Key: NIFI-10954
> URL: https://issues.apache.org/jira/browse/NIFI-10954
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Core UI
>Reporter: Pierre Villard
>Priority: Major
>
> It'd be awesome to have a "Usage" icon next to the two existing icons (Verify 
> Properties, and Add Property) on the Properties tab of the component's 
> configuration view. This icon would open the Usage documentation of the 
> component, and closing the documentation would bring us back to the 
> configuration view.
> I sometimes find myself going back and forth a lot in the UI to access the 
> additional details documentation of a component when configuring it (closing 
> the configuration view, right-clicking the component and clicking "View 
> usage", or clicking the doc icon for controller services / reporting tasks).





[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041148586


##
libminifi/include/utils/Enum.h:
##
@@ -127,7 +128,7 @@ namespace utils {
 #define SMART_ENUM(Clazz, ...) \
   struct Clazz { \
 using Base = ::org::apache::nifi::minifi::utils::EnumBase; \
-enum Type { \
+enum Type : int { \

Review Comment:
   It would still not enable initializing a `SMART_ENUM` from an integer 
without a cast, only its nested `Type` enum.
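
   A minimal sketch of this distinction, assuming C++17 and a hypothetical 
`Level` class in place of the real `SMART_ENUM` macro output: giving the nested 
enum a fixed underlying type adds no implicit `int` conversion, so the wrapper 
still cannot be built from an integer; only the nested enum gains 
direct-list-initialization from an integer.

```cpp
#include <cassert>
#include <type_traits>

// Hypothetical stand-in for a SMART_ENUM-generated class (not the real macro output).
struct Level {
  enum Type : int { LOW, HIGH };

  constexpr Level(Type value) : value_{value} {}
  constexpr Type value() const { return value_; }

 private:
  Type value_;
};

// A fixed underlying type does not add an implicit int -> enum conversion...
static_assert(!std::is_convertible_v<int, Level::Type>);
// ...so the wrapper cannot be constructed from a plain int either.
static_assert(!std::is_constructible_v<Level, int>);

// Since C++17, an enum with a fixed underlying type can be
// direct-list-initialized from an integer. This affects only the nested enum.
constexpr Level::Type from_int() { return Level::Type{1}; }
```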






[jira] [Updated] (NIFI-10916) Controller Service allowable values dropdown list should be sorted

2022-12-06 Thread Nathan Gough (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-10916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nathan Gough updated NIFI-10916:

Fix Version/s: 1.20.0
   Resolution: Fixed
   Status: Resolved  (was: Patch Available)

> Controller Service allowable values dropdown list should be sorted
> --
>
> Key: NIFI-10916
> URL: https://issues.apache.org/jira/browse/NIFI-10916
> Project: Apache NiFi
>  Issue Type: Improvement
>Reporter: Paul Grey
>Assignee: Paul Grey
>Priority: Minor
> Fix For: 1.20.0
>
> Attachments: NIFI-10916.png
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> When a processor (or controller service) configuration property allows for 
> the selection of a controller service (via combobox), the available values 
> should be sorted, to facilitate selection of the intended controller service. 
>  See attachment.





[GitHub] [nifi] mr1716 commented on pull request #6760: NIFI-10921 Update poi.version to 5.2.3

2022-12-06 Thread GitBox


mr1716 commented on PR #6760:
URL: https://github.com/apache/nifi/pull/6760#issuecomment-1339586829

   When I ran this locally, every check passed





[jira] [Updated] (NIFI-10915) Add additionaDetails information to extension manifest

2022-12-06 Thread Bryan Bende (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-10915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bryan Bende updated NIFI-10915:
---
Fix Version/s: (was: nifi-nar-maven-plugin-1.4.0)

> Add additionaDetails information to extension manifest
> --
>
> Key: NIFI-10915
> URL: https://issues.apache.org/jira/browse/NIFI-10915
> Project: Apache NiFi
>  Issue Type: Improvement
>Reporter: Bryan Bende
>Assignee: Bryan Bende
>Priority: Minor
>
> The extension-manifest.xml contains all of the information used to generate 
> the documentation of a component, but one missing part is the 
> additionalDetails.html that can optionally be provided. This ticket is to 
> figure out how to convey the additionaDetails information through the 
> extension-manifest.





[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1443: MINIFICPP-1972 - Refactor State Manager code

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1443:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1443#discussion_r1041157615


##
libminifi/src/controllers/keyvalue/KeyValueStateManager.cpp:
##
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+
+#include "controllers/keyvalue/KeyValueStateManager.h"
+#include "controllers/keyvalue/KeyValueStateStorage.h"
+
+namespace org::apache::nifi::minifi::controllers {
+
+KeyValueStateManager::KeyValueStateManager(
+const utils::Identifier& id,
+KeyValueStateStorage* storage)
+: StateManager(id)
+, storage_(storage)
+, transaction_in_progress_(false)
+, change_type_(ChangeType::NONE) {
+  std::string serialized;
+  if (storage_->get(id_.to_string(), serialized)) {
+state_ = KeyValueStateStorage::deserialize(serialized);
+  }
+}
+
+bool KeyValueStateManager::set(const core::StateManager::State& kvs) {
+  bool autoCommit = false;

Review Comment:
   Thanks for noticing. It's old code (by me), from when I didn't yet know that 
snake_case is the way to go for local variables.



##
libminifi/src/controllers/keyvalue/KeyValueStateManager.cpp:
##
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+
+#include "controllers/keyvalue/KeyValueStateManager.h"
+#include "controllers/keyvalue/KeyValueStateStorage.h"
+
+namespace org::apache::nifi::minifi::controllers {
+
+KeyValueStateManager::KeyValueStateManager(
+const utils::Identifier& id,
+KeyValueStateStorage* storage)
+: StateManager(id)
+, storage_(storage)
+, transaction_in_progress_(false)
+, change_type_(ChangeType::NONE) {

Review Comment:
   Done.






[jira] [Commented] (NIFI-10915) Add additionaDetails information to extension manifest

2022-12-06 Thread Bryan Bende (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-10915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17643936#comment-17643936
 ] 

Bryan Bende commented on NIFI-10915:


After looking at this a bit, there are no changes to the NAR plugin required to 
implement this, so I removed the Fix Version that previously had the NAR plugin 
version. It would be a normal NiFi change.

> Add additionaDetails information to extension manifest
> --
>
> Key: NIFI-10915
> URL: https://issues.apache.org/jira/browse/NIFI-10915
> Project: Apache NiFi
>  Issue Type: Improvement
>Reporter: Bryan Bende
>Assignee: Bryan Bende
>Priority: Minor
>
> The extension-manifest.xml contains all of the information used to generate 
> the documentation of a component, but one missing part is the 
> additionalDetails.html that can optionally be provided. This ticket is to 
> figure out how to convey the additionaDetails information through the 
> extension-manifest.





[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041159887


##
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##
@@ -163,23 +200,259 @@ void AbstractMQTTProcessor::reconnect() {
   }
 
   logger_->log_info("Reconnecting to %s", uri_);
-  int ret = MQTTAsync_connect(client_, &conn_opts);
+  if (MQTTAsync_isConnected(client_)) {
+logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+return;
+  }
+  const int ret = MQTTAsync_connect(client_, &conn_opts);
+  MQTTProperties_free(&connect_props);
   if (ret != MQTTASYNC_SUCCESS) {
-logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, 
ret);
+logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error 
code [%d]", uri_, ret);
+return;
   }
+
+  // wait until connection succeeds or fails
+  connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& 
conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const {
+  conn_opts.cleanstart = getCleanStart();
+
+  {
+MQTTProperty property;
+property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
+property.value.integer4 = 
gsl::narrow(getSessionExpiryInterval().count());
+MQTTProperties_add(&connect_props, &property);
+  }
+
+  if (!last_will_content_type_.empty()) {
+MQTTProperty property;
+property.identifier = MQTTPROPERTY_CODE_CONTENT_TYPE;
+property.value.data.len = last_will_content_type_.length();
+property.value.data.data = 
const_cast(last_will_content_type_.data());
+MQTTProperties_add(&will_props, &property);
+  }
+
+  conn_opts.willProperties = &will_props;
+
+  setMqtt5ConnectOptionsImpl(connect_props);
+}
+
+void AbstractMQTTProcessor::onTrigger(const 
std::shared_ptr& context, const 
std::shared_ptr& session) {
+  // read lock
+  std::shared_lock client_lock{client_mutex_};
+  if (client_ == nullptr) {
+// we are shutting down
+return;
+  }
+
+  // reconnect if needed
+  reconnect();
+
+  if (!MQTTAsync_isConnected(client_)) {
+logger_->log_error("Could not work with MQTT broker because disconnected 
to %s", uri_);
+yield();
+return;
+  }
+
+  onTriggerImpl(context, session);
 }
 
 void AbstractMQTTProcessor::freeResources() {
-  if (client_ && MQTTAsync_isConnected(client_)) {
-MQTTAsync_disconnectOptions disconnect_options = 
MQTTAsync_disconnectOptions_initializer;
-disconnect_options.context = this;
-disconnect_options.onSuccess = disconnectionSuccess;
-disconnect_options.onFailure = disconnectionFailure;
-disconnect_options.timeout = 
gsl::narrow(std::chrono::milliseconds{connection_timeout_}.count());
-MQTTAsync_disconnect(client_, &disconnect_options);
+  // write lock
+  std::lock_guard client_lock{client_mutex_};
+
+  if (!client_) {
+return;
   }
-  if (client_) {
-MQTTAsync_destroy(&client_);
+
+  disconnect();
+
+  MQTTAsync_destroy(&client_);
+}
+
+void AbstractMQTTProcessor::disconnect() {
+  if (!MQTTAsync_isConnected(client_)) {
+return;
+  }
+
+  MQTTAsync_disconnectOptions disconnect_options = 
MQTTAsync_disconnectOptions_initializer;
+  std::packaged_task disconnect_finished_task(
+  [this] (MQTTAsync_successData* success_data, MQTTAsync_successData5* 
success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* 
failure_data_5) {
+onDisconnectFinished(success_data, success_data_5, failure_data, 
failure_data_5);
+  });
+  disconnect_options.context = &disconnect_finished_task;
+
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+disconnect_options.onSuccess5 = connectionSuccess5;
+disconnect_options.onFailure5 = connectionFailure5;
+  } else {
+disconnect_options.onSuccess = connectionSuccess;
+disconnect_options.onFailure = connectionFailure;
+  }
+
+  disconnect_options.timeout = 
gsl::narrow(std::chrono::milliseconds{connection_timeout_}.count());
+
+  const int ret = MQTTAsync_disconnect(client_, &disconnect_options);
+  if (ret != MQTTASYNC_SUCCESS) {
+logger_->log_error("MQTTAsync_disconnect failed to MQTT broker %s with 
error code [%d]", uri_, ret);
+return;
+  }
+
+  // wait until connection succeeds or fails
+  disconnect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setBrokerLimits(MQTTAsync_successData5* response) {
+  auto readProperty = [response] (MQTTPropertyCodes property_code, auto& 
out_var) {
+const int value = MQTTProperties_getNumericValue(&response->properties, 
property_code);
+if (value != PAHO_MQTT_C_FAILURE_CODE) {
+  if constexpr (std::is_same_v&>) {
+out_var = std::chrono::seconds(value);
+  } else {
+out_var = gsl::narrow::value_type>(value);
+  }
+} else {
+  out_var.reset();
+}
+  };
+
+  readProperty(MQTTPROPERTY_CODE_RETAIN_AVAILABLE, retain_available_);
+  readProperty

[jira] [Updated] (NIFI-10915) Add field to runtime manifest to indicate presence of additionaDetails information

2022-12-06 Thread Bryan Bende (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-10915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bryan Bende updated NIFI-10915:
---
Summary: Add field to runtime manifest to indicate presence of 
additionaDetails information  (was: Add additionaDetails information to 
extension manifest)

> Add field to runtime manifest to indicate presence of additionaDetails 
> information
> --
>
> Key: NIFI-10915
> URL: https://issues.apache.org/jira/browse/NIFI-10915
> Project: Apache NiFi
>  Issue Type: Improvement
>Reporter: Bryan Bende
>Assignee: Bryan Bende
>Priority: Minor
>
> The extension-manifest.xml contains all of the information used to generate 
> the documentation of a component, but one missing part is the 
> additionalDetails.html that can optionally be provided. This ticket is to 
> figure out how to convey the additionaDetails information through the 
> extension-manifest.





[jira] [Updated] (NIFI-10915) Add field to runtime manifest to indicate presence of additionaDetails information

2022-12-06 Thread Bryan Bende (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-10915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bryan Bende updated NIFI-10915:
---
Description: The NAR plugin writes the additionalDetails.html files next to 
the extension-manifest.xml in the docs directory. This ticket is to leverage 
that and pass along the fact that additional details exist for a given 
component in the C2 runtime manifest that is generated, so that a consumer of 
the manifest would know there are additional docs.  (was: The 
extension-manifest.xml contains all of the information used to generate the 
documentation of a component, but one missing part is the 
additionalDetails.html that can optionally be provided. This ticket is to 
figure out how to convey the additionaDetails information through the 
extension-manifest.)

> Add field to runtime manifest to indicate presence of additionaDetails 
> information
> --
>
> Key: NIFI-10915
> URL: https://issues.apache.org/jira/browse/NIFI-10915
> Project: Apache NiFi
>  Issue Type: Improvement
>Reporter: Bryan Bende
>Assignee: Bryan Bende
>Priority: Minor
>
> The NAR plugin writes the additionalDetails.html files next to the 
> extension-manifest.xml in the docs directory. This ticket is to leverage that 
> and pass along the fact that additional details exist for a given component 
> in the C2 runtime manifest that is generated, so that a consumer of the 
> manifest would know there are additional docs.





[jira] [Commented] (NIFI-8820) NiFi Registry should implement context path verification similar to NiFi

2022-12-06 Thread Julien G. (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-8820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17643937#comment-17643937
 ] 

Julien G. commented on NIFI-8820:
-

This feature is still needed.

Without it, we can't port forward in a Docker/K8s environment.
And if we want to put the NiFi Registry behind Istio, we can't either.

> NiFi Registry should implement context path verification similar to NiFi
> 
>
> Key: NIFI-8820
> URL: https://issues.apache.org/jira/browse/NIFI-8820
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: NiFi Registry
>Reporter: Koji Kawamura
>Priority: Major
>
> NIFIREG-295 allows NiFi Registry to be accessed behind a reverse proxy. In 
> such deployments NiFi Registry can be accessed with a context path configured 
> at the reverse proxy.
> NiFi API verifies context path at WebUtils.verifyContextPath() using white 
> listed context paths configured at 'nifi.web.proxy.context.path' in 
> nifi.properties.
> NiFi Registry API should do the same context path verification.
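The kind of check described in the ticket can be sketched as follows. This is a minimal illustration only, written in C++ for consistency with the other code in this thread. It is not NiFi's `WebUtils.verifyContextPath()` implementation. The function names, the normalization rules, and the choice to accept an empty requested path are all assumptions made for the example.

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Hypothetical helper (not NiFi code): ensure a single leading slash and
// drop trailing slashes so "/registry/" and "registry" compare equal.
std::string normalizeContextPath(std::string path) {
  while (path.size() > 1 && path.back() == '/') {
    path.pop_back();
  }
  if (path.empty() || path.front() != '/') {
    path.insert(path.begin(), '/');
  }
  return path;
}

// Hypothetical check: a proxy-supplied context path is accepted only if it
// matches one of the configured (allow-listed) context paths.
// Assumption: an empty requested path means "no proxy context path" and passes.
bool isContextPathAllowed(const std::vector<std::string>& allowed,
                          const std::string& requested) {
  if (requested.empty()) {
    return true;
  }
  const std::string normalized = normalizeContextPath(requested);
  return std::any_of(allowed.begin(), allowed.end(),
                     [&normalized](const std::string& entry) {
                       return normalizeContextPath(entry) == normalized;
                     });
}
```

With an allow list of `{"/nifi-registry"}`, a request carrying `/nifi-registry/` would be accepted and one carrying `/other` would be rejected.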





[jira] [Commented] (NIFI-10951) Null exception error with DBCP Controller service and AWS parameter provider

2022-12-06 Thread Paul Grey (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-10951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17643938#comment-17643938
 ] 

Paul Grey commented on NIFI-10951:
--

Going from this line in the stack trace above:

{noformat}
at 
org.apache.nifi.dbcp.DBCPConnectionPool.lambda$configureDataSource$0(DBCPConnectionPool.java:554)
{noformat}

There was a recent refactor that migrated this code to a new class:

https://github.com/apache/nifi/blob/89426d167b34f49bfce28abf38ccb990f8bb7393/nifi-nar-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/AbstractDBCPConnectionPool.java#L446-L447

I suspect the issue is still present (unaffected by the refactor).


> Null exception error with DBCP Controller service and AWS parameter provider
> 
>
> Key: NIFI-10951
> URL: https://issues.apache.org/jira/browse/NIFI-10951
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: NiFi Stateless
>Affects Versions: 1.18.0, 1.19.0
>Reporter: Lucas Read
>Priority: Major
>
> I have an AWS Secrets Manager parameter provider configured and assigned to a 
> process group. I am trying to execute the process group as a stateless flow 
> but getting a NullPointerException returned. It works when I run it as a 
> non-stateless flow but not when it is stateless. 
>  
> {code:java}
> 2022-12-02 23:34:35,762 ERROR [Component Lifecycle for dataflow 
> stateless_test Thread-1] o.a.n.c.s.StandardControllerServiceNode 
> StandardControllerServiceNode[service=DBCPConnectionPool[id=4e17ba60-aba6-3443-83ff-5f6da6de96ac],
>  name=stateless_DBCPConnectionPool, active=true] Failed to invoke @OnEnabled 
> method
> java.lang.NullPointerException: null
> at 
> java.base/java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1011)
> at 
> java.base/java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006)
> at java.base/java.util.Properties.put(Properties.java:1340)
> at 
> org.apache.commons.dbcp2.BasicDataSource.addConnectionProperty(BasicDataSource.java:384)
> at 
> org.apache.nifi.dbcp.DBCPConnectionPool.lambda$configureDataSource$0(DBCPConnectionPool.java:554)
> at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
> at 
> org.apache.nifi.dbcp.DBCPConnectionPool.configureDataSource(DBCPConnectionPool.java:550)
> at 
> org.apache.nifi.dbcp.DBCPConnectionPool.onConfigured(DBCPConnectionPool.java:497)
> at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at 
> org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:145)
> at 
> org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:133)
> at 
> org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:78)
> at 
> org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotation(ReflectionUtils.java:55)
> at 
> org.apache.nifi.controller.service.StandardControllerServiceNode$2.run(StandardControllerServiceNode.java:598)
> at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
> at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
> at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:829)
> 2022-12-02 23:34:35,763 ERROR [Timer-Driven Process Thread-2] 
> o.a.n.p.stateless.ExecuteStateless 
> ExecuteStateless[id=294238ae-8268-1987--fd45d0f8] Could not create 
> dataflow from snapshot
> java.lang.IllegalStateException: Controller Service 
> StandardControllerServiceNode[service=DBCPConnectionPool[id=4e17ba60-aba6-3443-83ff-5f6da6de96ac],
>  name=stateless_DBCPConnectionPool, active=true] has not fully enabled. 
> Current Validation Status is VALID with validation Errors: []
> at 
> org.apache.nifi.stateless.flow.StandardStatelessFlow.enableControllerServices(StandardStatelessFlow.java:391)
> at 
> org.apache.nifi.stateless.flow.StandardStatelessFlow.initialize(Standa

[jira] [Commented] (NIFI-10954) Add usage access from Properties tab in component configuration view

2022-12-06 Thread Pierre Villard (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-10954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17643945#comment-17643945
 ] 

Pierre Villard commented on NIFI-10954:
---

In this case, it's really the content of the additional details for the 
component. Maybe displaying the additional details in a tab next to the 
Comments tab could be an option? Not sure though if displaying arbitrary HTML 
there would be good.

> Add usage access from Properties tab in component configuration view
> 
>
> Key: NIFI-10954
> URL: https://issues.apache.org/jira/browse/NIFI-10954
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Core UI
>Reporter: Pierre Villard
>Priority: Major
>
> It'd be awesome to have a "Usage" icon next to the two existing icons (Verify 
> Properties, and Add Property) on the Properties tab of the component 
> configuration view. This icon would open the Usage documentation of the 
> component, and closing the documentation would bring us back to the 
> configuration view.
> I sometimes find myself doing a lot of back and forth in the UI to access the 
> additional details documentation of a component when configuring it (closing 
> the configuration view, right-clicking on the component and clicking "View usage", 
> or clicking on the doc icon for controller services / reporting tasks).





[GitHub] [nifi] exceptionfactory commented on pull request #6761: NIFI-10952 - Fix TestScriptedPartitionRecord.testWhenMultiplePartitions

2022-12-06 Thread GitBox


exceptionfactory commented on PR #6761:
URL: https://github.com/apache/nifi/pull/6761#issuecomment-1339639014

   Running a full build with `mvn clean install` should run the tests and show 
the problems.
   
   For this particular module, running `mvn clean install -am -pl 
:nifi-scripting-processors` should run the impacted code and show the test 
failures.





[GitHub] [nifi-minifi-cpp] szaszm closed pull request #1468: MINIFICPP-2005 - Use gsl::span instead of std::span

2022-12-06 Thread GitBox


szaszm closed pull request #1468: MINIFICPP-2005 - Use gsl::span instead of 
std::span
URL: https://github.com/apache/nifi-minifi-cpp/pull/1468





[GitHub] [nifi-minifi-cpp] szaszm closed pull request #1467: MINIFICPP-1986 Upgrade github action versions

2022-12-06 Thread GitBox


szaszm closed pull request #1467: MINIFICPP-1986 Upgrade github action versions
URL: https://github.com/apache/nifi-minifi-cpp/pull/1467





[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6416: NIFI-10234 implement PutIoTDB

2022-12-06 Thread GitBox


exceptionfactory commented on code in PR #6416:
URL: https://github.com/apache/nifi/pull/6416#discussion_r1041235006


##
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDBRecord.java:
##
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.time.format.DateTimeFormatter;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processors.model.IoTDBSchema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.model.ValidationResult;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import java.sql.Timestamp;
+import java.sql.Time;
+import java.sql.Date;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+"This is a record aware processor that reads the content of the 
incoming FlowFile as individual records using the "
++ "configured 'Record Reader' and writes them to Apache IoTDB 
using native interface.")
+public class PutIoTDBRecord extends AbstractIoTDB {
+
+static final PropertyDescriptor RECORD_READER_FACTORY =
+new PropertyDescriptor.Builder()
+.name("Record Reader")
+.description(
+"Specifies the type of Record Reader controller 
service to use for parsing the incoming data "
++ "and determining the schema")
+.identifiesControllerService(RecordReaderFactory.class)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.required(true)
+.build();
+
+static final PropertyDescriptor SCHEMA =
+new PropertyDescriptor.Builder()
+.name("Schema Template")
+.description(
+"The Apache IoTDB Schema Template defined using 
JSON.\n" +
+"The Processor will infer the IoTDB Schema 
when this property is not configured.\n" +
+"Besides, you can set encoding type and 
compression type by this method.\n" +
+"If you want to know more detail about 
this, you can browse this link: 
https://iotdb.apache.org/UserGuide/Master/Ecosystem-Integration/NiFi-IoTDB.html";)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.required(false)
+.build();
+
+static final PropertyDescriptor PREFIX =
+new PropertyDescriptor.Builder()
+.name("Prefix")
+.description(
+"The Prefix begin with root. that will be add to 
the tsName in data.\n")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.required(t

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041258769


##
extensions/mqtt/processors/ConsumeMQTT.cpp:
##
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr message) {
-  if (queue_.size_approx() >= maxQueueSize_) {
-logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+  if (queue_.size_approx() >= max_queue_size_) {
+logger_->log_error("MQTT queue full");
 return;
   }
 
-  if (gsl::narrow(message->payloadlen) > max_seg_size_) {
-logger_->log_debug("MQTT message was truncated while enqueuing, original 
length: %d", message->payloadlen);
-message->payloadlen = gsl::narrow(max_seg_size_);
-  }
-
-  logger_->log_debug("enqueuing MQTT message with length %d", 
message->payloadlen);
+  logger_->log_debug("enqueuing MQTT message with length %d", 
message.contents->payloadlen);
   queue_.enqueue(std::move(message));
 }
 
-void ConsumeMQTT::onSchedule(const std::shared_ptr 
&context, const std::shared_ptr &factory) {
+void ConsumeMQTT::readProperties(const std::shared_ptr& 
context) {
+  if (auto value = context->getProperty(Topic)) {
+topic_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
   if (const auto value = context->getProperty(CleanSession)) {
-cleanSession_ = *value;
-logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+clean_session_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+  if (const auto value = context->getProperty(CleanStart)) {
+clean_start_ = *value;
+  }
+  logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+  if (const auto session_expiry_interval = 
context->getProperty(SessionExpiryInterval)) {
+session_expiry_interval_ = 
std::chrono::duration_cast(session_expiry_interval->getMilliseconds());
+  }
+  logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", 
int64_t{session_expiry_interval_.count()});
 
   if (const auto value = 
context->getProperty(QueueBufferMaxMessage)) {
-maxQueueSize_ = *value;
-logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
maxQueueSize_);
+max_queue_size_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
max_queue_size_);
 
-  // this connects to broker, so properties of this processor must be read 
before
-  AbstractMQTTProcessor::onSchedule(context, factory);
-}
+  if (auto value = context->getProperty(AttributeFromContentType)) {
+attribute_from_content_type_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", 
attribute_from_content_type_);
 
-void ConsumeMQTT::onTrigger(const std::shared_ptr& 
/*context*/, const std::shared_ptr &session) {
-  // reconnect if needed
-  reconnect();
+  if (const auto topic_alias_maximum = 
context->getProperty(TopicAliasMaximum)) {
+topic_alias_maximum_ = gsl::narrow(*topic_alias_maximum);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", 
topic_alias_maximum_);
 
-  if (!MQTTAsync_isConnected(client_)) {
-logger_->log_error("Could not consume from MQTT broker because 
disconnected to %s", uri_);
-yield();
-return;
+  if (const auto receive_maximum = 
context->getProperty(ReceiveMaximum)) {
+receive_maximum_ = gsl::narrow(*receive_maximum);
   }
+  logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", 
receive_maximum_);
+}
 
-  std::deque> msg_queue;
-  getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr& 
/*context*/, const std::shared_ptr& session) {
+  std::queue msg_queue = getReceivedMqttMessages();
   while (!msg_queue.empty()) {
 const auto& message = msg_queue.front();
-std::shared_ptr processFlowFile = session->create();
-int write_status{};
-session->write(processFlowFile, [&message, &write_status](const 
std::shared_ptr& stream) -> int64_t {
-  if (message->payloadlen < 0) {
-write_status = -1;
-return -1;
-  }
-  const auto len = 
stream->write(reinterpret_cast(message->payload), 
gsl::narrow(message->payloadlen));
-  if (io::isError(len)) {
-write_status = -1;
-return -1;
-  }
-  return gsl::narrow(len);
-});
-if (write_status < 0) {
-  logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", 
processFlowFile->getUUIDStr());
-  session->remove(processFlowFile);
+std::shared_ptr flow_file = session->create();
+WriteCallback write_callback(message, logger_);
+try {
+  session->write(flow_file, write_callback);

Review Comment:
   Good catch! I tested it on dummy code, and `std::ref` is indeed necessary.
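
To illustrate why `std::ref` matters in a case like this: when a stateful callback object is passed by value into an API that stores it in a `std::function`, the API invokes a copy, so any state recorded during the call (such as a success flag) never reaches the caller's object. The sketch below is standalone and does not use the MiNiFi API. `DummyCallback` and `invoke` are hypothetical stand-ins, and it assumes the callee takes its callback by value, as the discussion implies for `session->write`.

```cpp
#include <cstdint>
#include <functional>

// Hypothetical stateful callback: remembers whether it was invoked.
struct DummyCallback {
  bool success = false;

  int64_t operator()(int64_t bytes) {
    success = true;
    return bytes;
  }
};

// Hypothetical callee that takes the callable by value, so it works on a copy.
int64_t invoke(std::function<int64_t(int64_t)> callback) {
  return callback(42);
}

// Passing the object directly: only the copy inside std::function is modified.
bool successAfterPassingByValue() {
  DummyCallback callback;
  invoke(callback);
  return callback.success;
}

// Passing std::ref: std::function stores a reference_wrapper, so the
// caller's object is the one that gets invoked and modified.
bool successAfterPassingByRef() {
  DummyCallback callback;
  invoke(std::ref(callback));
  return callback.success;
}
```

With the by-value call the flag on the caller's object stays unset, while the `std::ref` call updates it, which is the behavior a later `getSuccessStatus()` check would rely on.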




[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


fgerlits commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041264186


##
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##
@@ -163,23 +200,259 @@ void AbstractMQTTProcessor::reconnect() {
   }
 
   logger_->log_info("Reconnecting to %s", uri_);
-  int ret = MQTTAsync_connect(client_, &conn_opts);
+  if (MQTTAsync_isConnected(client_)) {
+logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+return;
+  }
+  const int ret = MQTTAsync_connect(client_, &conn_opts);
+  MQTTProperties_free(&connect_props);
   if (ret != MQTTASYNC_SUCCESS) {
-logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, ret);
+logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error code [%d]", uri_, ret);
+return;
   }
+
+  // wait until connection succeeds or fails
+  connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const {
+  conn_opts.cleanstart = getCleanStart();
+
+  {
+MQTTProperty property;
+property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
+property.value.integer4 = gsl::narrow(getSessionExpiryInterval().count());
+MQTTProperties_add(&connect_props, &property);
+  }
+
+  if (!last_will_content_type_.empty()) {
+MQTTProperty property;
+property.identifier = MQTTPROPERTY_CODE_CONTENT_TYPE;
+property.value.data.len = last_will_content_type_.length();
+property.value.data.data = const_cast(last_will_content_type_.data());
+MQTTProperties_add(&will_props, &property);
+  }
+
+  conn_opts.willProperties = &will_props;
+
+  setMqtt5ConnectOptionsImpl(connect_props);
+}
+
+void AbstractMQTTProcessor::onTrigger(const std::shared_ptr& context, const std::shared_ptr& session) {
+  // read lock
+  std::shared_lock client_lock{client_mutex_};
+  if (client_ == nullptr) {
+// we are shutting down
+return;
+  }
+
+  // reconnect if needed
+  reconnect();
+
+  if (!MQTTAsync_isConnected(client_)) {
+logger_->log_error("Could not work with MQTT broker because disconnected to %s", uri_);
+yield();
+return;
+  }
+
+  onTriggerImpl(context, session);
 }
 
 void AbstractMQTTProcessor::freeResources() {
-  if (client_ && MQTTAsync_isConnected(client_)) {
-MQTTAsync_disconnectOptions disconnect_options = MQTTAsync_disconnectOptions_initializer;
-disconnect_options.context = this;
-disconnect_options.onSuccess = disconnectionSuccess;
-disconnect_options.onFailure = disconnectionFailure;
-disconnect_options.timeout = gsl::narrow(std::chrono::milliseconds{connection_timeout_}.count());
-MQTTAsync_disconnect(client_, &disconnect_options);
+  // write lock
+  std::lock_guard client_lock{client_mutex_};
+
+  if (!client_) {
+return;
   }
-  if (client_) {
-MQTTAsync_destroy(&client_);
+
+  disconnect();
+
+  MQTTAsync_destroy(&client_);
+}
+
+void AbstractMQTTProcessor::disconnect() {
+  if (!MQTTAsync_isConnected(client_)) {
+return;
+  }
+
+  MQTTAsync_disconnectOptions disconnect_options = MQTTAsync_disconnectOptions_initializer;
+  std::packaged_task disconnect_finished_task(
+  [this] (MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5) {
+onDisconnectFinished(success_data, success_data_5, failure_data, failure_data_5);
+  });
+  disconnect_options.context = &disconnect_finished_task;
+
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+disconnect_options.onSuccess5 = connectionSuccess5;
+disconnect_options.onFailure5 = connectionFailure5;
+  } else {
+disconnect_options.onSuccess = connectionSuccess;
+disconnect_options.onFailure = connectionFailure;
+  }
+
+  disconnect_options.timeout = gsl::narrow(std::chrono::milliseconds{connection_timeout_}.count());
+
+  const int ret = MQTTAsync_disconnect(client_, &disconnect_options);
+  if (ret != MQTTASYNC_SUCCESS) {
+logger_->log_error("MQTTAsync_disconnect failed to MQTT broker %s with error code [%d]", uri_, ret);
+return;
+  }
+
+  // wait until connection succeeds or fails
+  disconnect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setBrokerLimits(MQTTAsync_successData5* response) {
+  auto readProperty = [response] (MQTTPropertyCodes property_code, auto& out_var) {
+const int value = MQTTProperties_getNumericValue(&response->properties, property_code);
+if (value != PAHO_MQTT_C_FAILURE_CODE) {
+  if constexpr (std::is_same_v&>) {
+out_var = std::chrono::seconds(value);
+  } else {
+out_var = gsl::narrow::value_type>(value);
+  }
+} else {
+  out_var.reset();
+}
+  };
+
+  readProperty(MQTTPROPERTY_CODE_RETAIN_AVAILABLE, retain_available_);
+  readProperty(MQTTP

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041268666


##
extensions/mqtt/processors/ConsumeMQTT.cpp:
##
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr message) {
-  if (queue_.size_approx() >= maxQueueSize_) {
-logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+  if (queue_.size_approx() >= max_queue_size_) {
+logger_->log_error("MQTT queue full");
 return;
   }
 
-  if (gsl::narrow(message->payloadlen) > max_seg_size_) {
-logger_->log_debug("MQTT message was truncated while enqueuing, original 
length: %d", message->payloadlen);
-message->payloadlen = gsl::narrow(max_seg_size_);
-  }
-
-  logger_->log_debug("enqueuing MQTT message with length %d", 
message->payloadlen);
+  logger_->log_debug("enqueuing MQTT message with length %d", 
message.contents->payloadlen);
   queue_.enqueue(std::move(message));
 }
 
-void ConsumeMQTT::onSchedule(const std::shared_ptr 
&context, const std::shared_ptr &factory) {
+void ConsumeMQTT::readProperties(const std::shared_ptr& 
context) {
+  if (auto value = context->getProperty(Topic)) {
+topic_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
   if (const auto value = context->getProperty(CleanSession)) {
-cleanSession_ = *value;
-logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+clean_session_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+  if (const auto value = context->getProperty(CleanStart)) {
+clean_start_ = *value;
+  }
+  logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+  if (const auto session_expiry_interval = 
context->getProperty(SessionExpiryInterval)) {
+session_expiry_interval_ = 
std::chrono::duration_cast(session_expiry_interval->getMilliseconds());
+  }
+  logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", 
int64_t{session_expiry_interval_.count()});
 
   if (const auto value = 
context->getProperty(QueueBufferMaxMessage)) {
-maxQueueSize_ = *value;
-logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
maxQueueSize_);
+max_queue_size_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
max_queue_size_);
 
-  // this connects to broker, so properties of this processor must be read 
before
-  AbstractMQTTProcessor::onSchedule(context, factory);
-}
+  if (auto value = context->getProperty(AttributeFromContentType)) {
+attribute_from_content_type_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", 
attribute_from_content_type_);
 
-void ConsumeMQTT::onTrigger(const std::shared_ptr& 
/*context*/, const std::shared_ptr &session) {
-  // reconnect if needed
-  reconnect();
+  if (const auto topic_alias_maximum = 
context->getProperty(TopicAliasMaximum)) {
+topic_alias_maximum_ = gsl::narrow(*topic_alias_maximum);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", 
topic_alias_maximum_);
 
-  if (!MQTTAsync_isConnected(client_)) {
-logger_->log_error("Could not consume from MQTT broker because 
disconnected to %s", uri_);
-yield();
-return;
+  if (const auto receive_maximum = 
context->getProperty(ReceiveMaximum)) {
+receive_maximum_ = gsl::narrow(*receive_maximum);
   }
+  logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", 
receive_maximum_);
+}
 
-  std::deque> msg_queue;
-  getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr& 
/*context*/, const std::shared_ptr& session) {
+  std::queue msg_queue = getReceivedMqttMessages();
   while (!msg_queue.empty()) {
 const auto& message = msg_queue.front();
-std::shared_ptr processFlowFile = session->create();
-int write_status{};
-session->write(processFlowFile, [&message, &write_status](const 
std::shared_ptr& stream) -> int64_t {
-  if (message->payloadlen < 0) {
-write_status = -1;
-return -1;
-  }
-  const auto len = 
stream->write(reinterpret_cast(message->payload), 
gsl::narrow(message->payloadlen));
-  if (io::isError(len)) {
-write_status = -1;
-return -1;
-  }
-  return gsl::narrow(len);
-});
-if (write_status < 0) {
-  logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", 
processFlowFile->getUUIDStr());
-  session->remove(processFlowFile);
+std::shared_ptr flow_file = session->create();
+WriteCallback write_callback(message, logger_);
+try {
+  session->write(flow_file, write_callback);
+} catch (const Exception& ex) {
+  logger_->log_error("Error when processing message queue: %s", ex.what());
+}
+if (!write_callback.getSuccessStatus

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041280626


##
extensions/mqtt/processors/ConsumeMQTT.cpp:
##
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr message) {
-  if (queue_.size_approx() >= maxQueueSize_) {
-logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+  if (queue_.size_approx() >= max_queue_size_) {
+logger_->log_error("MQTT queue full");
 return;
   }
 
-  if (gsl::narrow(message->payloadlen) > max_seg_size_) {
-logger_->log_debug("MQTT message was truncated while enqueuing, original 
length: %d", message->payloadlen);
-message->payloadlen = gsl::narrow(max_seg_size_);
-  }
-
-  logger_->log_debug("enqueuing MQTT message with length %d", 
message->payloadlen);
+  logger_->log_debug("enqueuing MQTT message with length %d", 
message.contents->payloadlen);
   queue_.enqueue(std::move(message));
 }
 
-void ConsumeMQTT::onSchedule(const std::shared_ptr 
&context, const std::shared_ptr &factory) {
+void ConsumeMQTT::readProperties(const std::shared_ptr& 
context) {
+  if (auto value = context->getProperty(Topic)) {
+topic_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
   if (const auto value = context->getProperty(CleanSession)) {
-cleanSession_ = *value;
-logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+clean_session_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+  if (const auto value = context->getProperty(CleanStart)) {
+clean_start_ = *value;
+  }
+  logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+  if (const auto session_expiry_interval = 
context->getProperty(SessionExpiryInterval)) {
+session_expiry_interval_ = 
std::chrono::duration_cast(session_expiry_interval->getMilliseconds());
+  }
+  logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", 
int64_t{session_expiry_interval_.count()});
 
   if (const auto value = 
context->getProperty(QueueBufferMaxMessage)) {
-maxQueueSize_ = *value;
-logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
maxQueueSize_);
+max_queue_size_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
max_queue_size_);
 
-  // this connects to broker, so properties of this processor must be read 
before
-  AbstractMQTTProcessor::onSchedule(context, factory);
-}
+  if (auto value = context->getProperty(AttributeFromContentType)) {
+attribute_from_content_type_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", 
attribute_from_content_type_);
 
-void ConsumeMQTT::onTrigger(const std::shared_ptr& 
/*context*/, const std::shared_ptr &session) {
-  // reconnect if needed
-  reconnect();
+  if (const auto topic_alias_maximum = 
context->getProperty(TopicAliasMaximum)) {
+topic_alias_maximum_ = gsl::narrow(*topic_alias_maximum);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", 
topic_alias_maximum_);
 
-  if (!MQTTAsync_isConnected(client_)) {
-logger_->log_error("Could not consume from MQTT broker because 
disconnected to %s", uri_);
-yield();
-return;
+  if (const auto receive_maximum = 
context->getProperty(ReceiveMaximum)) {
+receive_maximum_ = gsl::narrow(*receive_maximum);
   }
+  logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", 
receive_maximum_);
+}
 
-  std::deque> msg_queue;
-  getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr& 
/*context*/, const std::shared_ptr& session) {
+  std::queue msg_queue = getReceivedMqttMessages();
   while (!msg_queue.empty()) {
 const auto& message = msg_queue.front();
-std::shared_ptr processFlowFile = session->create();
-int write_status{};
-session->write(processFlowFile, [&message, &write_status](const 
std::shared_ptr& stream) -> int64_t {
-  if (message->payloadlen < 0) {
-write_status = -1;
-return -1;
-  }
-  const auto len = 
stream->write(reinterpret_cast(message->payload), 
gsl::narrow(message->payloadlen));
-  if (io::isError(len)) {
-write_status = -1;
-return -1;
-  }
-  return gsl::narrow(len);
-});
-if (write_status < 0) {
-  logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", 
processFlowFile->getUUIDStr());
-  session->remove(processFlowFile);
+std::shared_ptr flow_file = session->create();
+WriteCallback write_callback(message, logger_);
+try {
+  session->write(flow_file, write_callback);
+} catch (const Exception& ex) {
+  logger_->log_error("Error when processing message queue: %s", ex.what());
+}
+if (!write_callback.getSuccessStatus

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041281303


##
extensions/mqtt/processors/ConsumeMQTT.cpp:
##
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr message) {
-  if (queue_.size_approx() >= maxQueueSize_) {
-logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+  if (queue_.size_approx() >= max_queue_size_) {
+logger_->log_error("MQTT queue full");
 return;
   }
 
-  if (gsl::narrow(message->payloadlen) > max_seg_size_) {
-logger_->log_debug("MQTT message was truncated while enqueuing, original 
length: %d", message->payloadlen);
-message->payloadlen = gsl::narrow(max_seg_size_);
-  }
-
-  logger_->log_debug("enqueuing MQTT message with length %d", 
message->payloadlen);
+  logger_->log_debug("enqueuing MQTT message with length %d", 
message.contents->payloadlen);
   queue_.enqueue(std::move(message));
 }
 
-void ConsumeMQTT::onSchedule(const std::shared_ptr 
&context, const std::shared_ptr &factory) {
+void ConsumeMQTT::readProperties(const std::shared_ptr& 
context) {
+  if (auto value = context->getProperty(Topic)) {
+topic_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
   if (const auto value = context->getProperty(CleanSession)) {
-cleanSession_ = *value;
-logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+clean_session_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+  if (const auto value = context->getProperty(CleanStart)) {
+clean_start_ = *value;
+  }
+  logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+  if (const auto session_expiry_interval = 
context->getProperty(SessionExpiryInterval)) {
+session_expiry_interval_ = 
std::chrono::duration_cast(session_expiry_interval->getMilliseconds());
+  }
+  logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", 
int64_t{session_expiry_interval_.count()});
 
   if (const auto value = 
context->getProperty(QueueBufferMaxMessage)) {
-maxQueueSize_ = *value;
-logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
maxQueueSize_);
+max_queue_size_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
max_queue_size_);
 
-  // this connects to broker, so properties of this processor must be read 
before
-  AbstractMQTTProcessor::onSchedule(context, factory);
-}
+  if (auto value = context->getProperty(AttributeFromContentType)) {
+attribute_from_content_type_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", 
attribute_from_content_type_);
 
-void ConsumeMQTT::onTrigger(const std::shared_ptr& 
/*context*/, const std::shared_ptr &session) {
-  // reconnect if needed
-  reconnect();
+  if (const auto topic_alias_maximum = 
context->getProperty(TopicAliasMaximum)) {
+topic_alias_maximum_ = gsl::narrow(*topic_alias_maximum);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", 
topic_alias_maximum_);
 
-  if (!MQTTAsync_isConnected(client_)) {
-logger_->log_error("Could not consume from MQTT broker because 
disconnected to %s", uri_);
-yield();
-return;
+  if (const auto receive_maximum = 
context->getProperty(ReceiveMaximum)) {
+receive_maximum_ = gsl::narrow(*receive_maximum);
   }
+  logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", 
receive_maximum_);
+}
 
-  std::deque> msg_queue;
-  getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr& 
/*context*/, const std::shared_ptr& session) {
+  std::queue msg_queue = getReceivedMqttMessages();
   while (!msg_queue.empty()) {
 const auto& message = msg_queue.front();
-std::shared_ptr processFlowFile = session->create();
-int write_status{};
-session->write(processFlowFile, [&message, &write_status](const 
std::shared_ptr& stream) -> int64_t {
-  if (message->payloadlen < 0) {
-write_status = -1;
-return -1;
-  }
-  const auto len = 
stream->write(reinterpret_cast(message->payload), 
gsl::narrow(message->payloadlen));
-  if (io::isError(len)) {
-write_status = -1;
-return -1;
-  }
-  return gsl::narrow(len);
-});
-if (write_status < 0) {
-  logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", 
processFlowFile->getUUIDStr());
-  session->remove(processFlowFile);
+std::shared_ptr flow_file = session->create();
+WriteCallback write_callback(message, logger_);
+try {
+  session->write(flow_file, write_callback);
+} catch (const Exception& ex) {
+  logger_->log_error("Error when processing message queue: %s", ex.what());
+}
+if (!write_callback.getSuccessStatus

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041281537


##
extensions/mqtt/processors/ConsumeMQTT.cpp:
##
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr message) {
-  if (queue_.size_approx() >= maxQueueSize_) {
-logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+  if (queue_.size_approx() >= max_queue_size_) {
+logger_->log_error("MQTT queue full");
 return;
   }
 
-  if (gsl::narrow(message->payloadlen) > max_seg_size_) {
-logger_->log_debug("MQTT message was truncated while enqueuing, original 
length: %d", message->payloadlen);
-message->payloadlen = gsl::narrow(max_seg_size_);
-  }
-
-  logger_->log_debug("enqueuing MQTT message with length %d", 
message->payloadlen);
+  logger_->log_debug("enqueuing MQTT message with length %d", 
message.contents->payloadlen);
   queue_.enqueue(std::move(message));
 }
 
-void ConsumeMQTT::onSchedule(const std::shared_ptr 
&context, const std::shared_ptr &factory) {
+void ConsumeMQTT::readProperties(const std::shared_ptr& 
context) {
+  if (auto value = context->getProperty(Topic)) {
+topic_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
   if (const auto value = context->getProperty(CleanSession)) {
-cleanSession_ = *value;
-logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+clean_session_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+  if (const auto value = context->getProperty(CleanStart)) {
+clean_start_ = *value;
+  }
+  logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+  if (const auto session_expiry_interval = 
context->getProperty(SessionExpiryInterval)) {
+session_expiry_interval_ = 
std::chrono::duration_cast(session_expiry_interval->getMilliseconds());
+  }
+  logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", 
int64_t{session_expiry_interval_.count()});
 
   if (const auto value = 
context->getProperty(QueueBufferMaxMessage)) {
-maxQueueSize_ = *value;
-logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
maxQueueSize_);
+max_queue_size_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
max_queue_size_);
 
-  // this connects to broker, so properties of this processor must be read 
before
-  AbstractMQTTProcessor::onSchedule(context, factory);
-}
+  if (auto value = context->getProperty(AttributeFromContentType)) {
+attribute_from_content_type_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", 
attribute_from_content_type_);
 
-void ConsumeMQTT::onTrigger(const std::shared_ptr& 
/*context*/, const std::shared_ptr &session) {
-  // reconnect if needed
-  reconnect();
+  if (const auto topic_alias_maximum = 
context->getProperty(TopicAliasMaximum)) {
+topic_alias_maximum_ = gsl::narrow(*topic_alias_maximum);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", 
topic_alias_maximum_);
 
-  if (!MQTTAsync_isConnected(client_)) {
-logger_->log_error("Could not consume from MQTT broker because 
disconnected to %s", uri_);
-yield();
-return;
+  if (const auto receive_maximum = 
context->getProperty(ReceiveMaximum)) {
+receive_maximum_ = gsl::narrow(*receive_maximum);
   }
+  logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", 
receive_maximum_);
+}
 
-  std::deque> msg_queue;
-  getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr& 
/*context*/, const std::shared_ptr& session) {
+  std::queue msg_queue = getReceivedMqttMessages();
   while (!msg_queue.empty()) {
 const auto& message = msg_queue.front();
-std::shared_ptr processFlowFile = session->create();
-int write_status{};
-session->write(processFlowFile, [&message, &write_status](const 
std::shared_ptr& stream) -> int64_t {
-  if (message->payloadlen < 0) {
-write_status = -1;
-return -1;
-  }
-  const auto len = 
stream->write(reinterpret_cast(message->payload), 
gsl::narrow(message->payloadlen));
-  if (io::isError(len)) {
-write_status = -1;
-return -1;
-  }
-  return gsl::narrow(len);
-});
-if (write_status < 0) {
-  logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", 
processFlowFile->getUUIDStr());
-  session->remove(processFlowFile);
+std::shared_ptr flow_file = session->create();
+WriteCallback write_callback(message, logger_);
+try {
+  session->write(flow_file, write_callback);
+} catch (const Exception& ex) {
+  logger_->log_error("Error when processing message queue: %s", ex.what());
+}
+if (!write_callback.getSuccessStatus

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041288113


##
extensions/mqtt/processors/PublishMQTT.cpp:
##
@@ -34,76 +34,229 @@ void PublishMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void PublishMQTT::onSchedule(const std::shared_ptr 
&context, const std::shared_ptr &factory) {
+void PublishMQTT::readProperties(const std::shared_ptr& 
context) {
+  if (!context->getProperty(Topic).has_value()) {
+logger_->log_error("PublishMQTT: could not get Topic");
+  }

Review Comment:
   `Topic` is an expression language property, so you are right: it can become 
valid later. I am removing this error message.






[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041290532


##
extensions/mqtt/processors/PublishMQTT.cpp:
##
@@ -34,76 +34,229 @@ void PublishMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void PublishMQTT::onSchedule(const std::shared_ptr 
&context, const std::shared_ptr &factory) {
+void PublishMQTT::readProperties(const std::shared_ptr& 
context) {
+  if (!context->getProperty(Topic).has_value()) {
+logger_->log_error("PublishMQTT: could not get Topic");
+  }
+
   if (const auto retain_opt = context->getProperty(Retain)) {
 retain_ = *retain_opt;
   }
   logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
 
-  AbstractMQTTProcessor::onSchedule(context, factory);
+  if (const auto message_expiry_interval = 
context->getProperty(MessageExpiryInterval)) {
+message_expiry_interval_ = 
std::chrono::duration_cast(message_expiry_interval->getMilliseconds());
+logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s", 
int64_t{message_expiry_interval_->count()});
+  }
+
+  in_flight_message_counter_.setMqttVersion(mqtt_version_);
+  in_flight_message_counter_.setQoS(qos_);
 }
 
-void PublishMQTT::onTrigger(const std::shared_ptr& 
/*context*/, const std::shared_ptr &session) {
-  // reconnect if needed
-  reconnect();
+void PublishMQTT::onTriggerImpl(const std::shared_ptr& 
context, const std::shared_ptr &session) {
+  std::shared_ptr flow_file = session->get();
 
-  if (!MQTTAsync_isConnected(client_)) {
-logger_->log_error("Could not publish to MQTT broker because disconnected 
to %s", uri_);
-yield();
+  if (!flow_file) {
 return;

Review Comment:
   Good point! Added.






[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041313305


##
extensions/mqtt/processors/PublishMQTT.cpp:
##
@@ -34,76 +34,229 @@ void PublishMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void PublishMQTT::onSchedule(const std::shared_ptr 
&context, const std::shared_ptr &factory) {
+void PublishMQTT::readProperties(const std::shared_ptr& 
context) {
+  if (!context->getProperty(Topic).has_value()) {
+logger_->log_error("PublishMQTT: could not get Topic");
+  }
+
   if (const auto retain_opt = context->getProperty(Retain)) {
 retain_ = *retain_opt;
   }
   logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
 
-  AbstractMQTTProcessor::onSchedule(context, factory);
+  if (const auto message_expiry_interval = 
context->getProperty(MessageExpiryInterval)) {
+message_expiry_interval_ = 
std::chrono::duration_cast(message_expiry_interval->getMilliseconds());
+logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s", 
int64_t{message_expiry_interval_->count()});
+  }
+
+  in_flight_message_counter_.setMqttVersion(mqtt_version_);
+  in_flight_message_counter_.setQoS(qos_);
 }
 
-void PublishMQTT::onTrigger(const std::shared_ptr& 
/*context*/, const std::shared_ptr &session) {
-  // reconnect if needed
-  reconnect();
+void PublishMQTT::onTriggerImpl(const std::shared_ptr& 
context, const std::shared_ptr &session) {
+  std::shared_ptr flow_file = session->get();
 
-  if (!MQTTAsync_isConnected(client_)) {
-logger_->log_error("Could not publish to MQTT broker because disconnected 
to %s", uri_);
-yield();
+  if (!flow_file) {
 return;
   }
 
-  std::shared_ptr flowFile = session->get();
+  // broker's Receive Maximum can change after reconnect
+  in_flight_message_counter_.setMax(broker_receive_maximum_.value_or(MQTT_MAX_RECEIVE_MAXIMUM));
 
-  if (!flowFile) {
-return;
+  const auto topic = getTopic(context, flow_file);
+  try {
+const auto result = session->readBuffer(flow_file);
+if (result.status < 0 || !sendMessage(result.buffer, topic, getContentType(context, flow_file), flow_file)) {
+  logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on broker %s", flow_file->getUUIDStr(), topic, uri_);
+  session->transfer(flow_file, Failure);
+  return;
+}
+logger_->log_debug("Sent flow file [%s] with length %" PRId64 " to MQTT topic '%s' on broker %s", flow_file->getUUIDStr(), result.status, topic, uri_);
+session->transfer(flow_file, Success);
+  } catch (const Exception& ex) {
+logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on broker %s, exception string: '%s'", flow_file->getUUIDStr(), topic, uri_, ex.what());
+session->transfer(flow_file, Failure);
+  }
+}
+
+bool PublishMQTT::sendMessage(const std::vector& buffer, const std::string& topic, const std::string& content_type, const std::shared_ptr& flow_file) {
+  if (buffer.size() > 268'435'455) {

Review Comment:
   It comes from the MQTT specification. The Remaining Length field is a 
Variable Byte Integer:
   https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901024
   The maximum value of a Variable Byte Integer is 268,435,455. The decimal 
representation is the one I found used all over the internet, so I think we 
should stay with the more common form.
   https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901011
   But I'm extracting it to a constant as requested; I don't like magic numbers 
either.
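
For context, here is a minimal sketch of where the 268,435,455 limit comes from. It is not the code from the PR: the names `kMaxVariableByteInteger` and `encodeVariableByteInteger` are made up for illustration. A Variable Byte Integer uses at most four bytes, each carrying 7 data bits plus a continuation bit, so the largest value is 2^28 - 1.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Largest value a 4-byte MQTT Variable Byte Integer can hold:
// four groups of 7 data bits, i.e. 2^28 - 1.
// (Hypothetical constant name, not the one used in the PR.)
inline constexpr uint32_t kMaxVariableByteInteger = (uint32_t{1} << 28) - 1;
static_assert(kMaxVariableByteInteger == 268'435'455);

// Encodes `value` as a Variable Byte Integer, least significant group first.
// Returns std::nullopt if the value does not fit into four bytes.
inline std::optional<std::vector<uint8_t>> encodeVariableByteInteger(uint32_t value) {
  if (value > kMaxVariableByteInteger) {
    return std::nullopt;
  }
  std::vector<uint8_t> encoded;
  do {
    uint8_t byte = static_cast<uint8_t>(value % 128);
    value /= 128;
    if (value > 0) {
      byte |= 0x80;  // continuation bit: more bytes follow
    }
    encoded.push_back(byte);
  } while (value > 0);
  assert(encoded.size() <= 4);
  return encoded;
}
```

With this sketch, `encodeVariableByteInteger(268'435'455)` yields the four bytes `FF FF FF 7F`, and any larger input yields `std::nullopt`. Deriving the constant from the bit width rather than writing the literal is one way to avoid the magic number while keeping the familiar decimal value in a `static_assert`.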



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041315835


##
extensions/mqtt/processors/PublishMQTT.cpp:
##
@@ -34,76 +34,229 @@ void PublishMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void PublishMQTT::onSchedule(const std::shared_ptr 
&context, const std::shared_ptr &factory) {
+void PublishMQTT::readProperties(const std::shared_ptr& 
context) {
+  if (!context->getProperty(Topic).has_value()) {
+logger_->log_error("PublishMQTT: could not get Topic");
+  }
+
   if (const auto retain_opt = context->getProperty(Retain)) {
 retain_ = *retain_opt;
   }
   logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
 
-  AbstractMQTTProcessor::onSchedule(context, factory);
+  if (const auto message_expiry_interval = 
context->getProperty(MessageExpiryInterval)) {
+message_expiry_interval_ = 
std::chrono::duration_cast(message_expiry_interval->getMilliseconds());
+logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s", 
int64_t{message_expiry_interval_->count()});
+  }
+
+  in_flight_message_counter_.setMqttVersion(mqtt_version_);
+  in_flight_message_counter_.setQoS(qos_);
 }
 
-void PublishMQTT::onTrigger(const std::shared_ptr& 
/*context*/, const std::shared_ptr &session) {
-  // reconnect if needed
-  reconnect();
+void PublishMQTT::onTriggerImpl(const std::shared_ptr& 
context, const std::shared_ptr &session) {
+  std::shared_ptr flow_file = session->get();
 
-  if (!MQTTAsync_isConnected(client_)) {
-logger_->log_error("Could not publish to MQTT broker because disconnected 
to %s", uri_);
-yield();
+  if (!flow_file) {
 return;
   }
 
-  std::shared_ptr flowFile = session->get();
+  // broker's Receive Maximum can change after reconnect
+  
in_flight_message_counter_.setMax(broker_receive_maximum_.value_or(MQTT_MAX_RECEIVE_MAXIMUM));
 
-  if (!flowFile) {
-return;
+  const auto topic = getTopic(context, flow_file);
+  try {
+const auto result = session->readBuffer(flow_file);
+if (result.status < 0 || !sendMessage(result.buffer, topic, 
getContentType(context, flow_file), flow_file)) {
+  logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on 
broker %s", flow_file->getUUIDStr(), topic, uri_);
+  session->transfer(flow_file, Failure);
+  return;
+}
+logger_->log_debug("Sent flow file [%s] with length %" PRId64 " to MQTT 
topic '%s' on broker %s", flow_file->getUUIDStr(), result.status, topic, uri_);
+session->transfer(flow_file, Success);
+  } catch (const Exception& ex) {
+logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on 
broker %s, exception string: '%s'", flow_file->getUUIDStr(), topic, uri_, 
ex.what());
+session->transfer(flow_file, Failure);
+  }
+}
+
+bool PublishMQTT::sendMessage(const std::vector& buffer, const 
std::string& topic, const std::string& content_type, const 
std::shared_ptr& flow_file) {
+  if (buffer.size() > 268'435'455) {
+logger_->log_error("Sending message failed because MQTT limit maximum 
packet size [268'435'455] is exceeded by FlowFile of [%zu]", buffer.size());
   }
 
-  PublishMQTT::ReadCallback callback(this, flowFile->getSize(), max_seg_size_, 
topic_, client_, gsl::narrow(qos_), retain_);
-  session->read(flowFile, std::ref(callback));
-  if (callback.status_ < 0) {
-logger_->log_error("Failed to send flow to MQTT topic %s", topic_);
-session->transfer(flowFile, Failure);
+  if (maximum_packet_size_.has_value() && buffer.size() > 
*(maximum_packet_size_)) {
+logger_->log_error("Sending message failed because broker-requested 
maximum packet size [%" PRIu32 "] is exceeded by FlowFile of [%zu]",
+   *maximum_packet_size_, buffer.size());
+  }
+
+  MQTTAsync_message message_to_publish = MQTTAsync_message_initializer;
+  message_to_publish.payload = const_cast(buffer.data());
+  message_to_publish.payloadlen = buffer.size();
+  message_to_publish.qos = qos_.value();
+  message_to_publish.retained = retain_;
+
+  setMqtt5Properties(message_to_publish, content_type, flow_file);
+
+  MQTTAsync_responseOptions response_options = 
MQTTAsync_responseOptions_initializer;
+  if (mqtt_version_ == MqttVersions::V_5_0) {
+response_options.onSuccess5 = sendSuccess5;
+response_options.onFailure5 = sendFailure5;
   } else {
-logger_->log_debug("Sent flow with length %d to MQTT topic %s", 
callback.read_size_, topic_);
-session->transfer(flowFile, Success);
-  }
-}
-
-int64_t PublishMQTT::ReadCallback::operator()(const 
std::shared_ptr& stream) {
-  if (flow_size_ < max_seg_size_)
-max_seg_size_ = flow_size_;
-  gsl_Expects(max_seg_size_ < 
gsl::narrow(std::numeric_limits::max()));
-  std::vector buffer(max_seg_size_);
-  read_size_ = 0;
-  status_ = 0;
-  while (read_size_ < flow_size_) {
-// MQTTClient_message::payloadlen is int, so we can't handle 2GB+
-const auto readRet

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041328112



[jira] [Created] (NIFI-10955) In JASN1Reader allow preprocess of ASN files to reconcile unsupported features

2022-12-06 Thread Tamas Palfy (Jira)
Tamas Palfy created NIFI-10955:
--

 Summary: In JASN1Reader allow preprocess of ASN files to reconcile 
unsupported features
 Key: NIFI-10955
 URL: https://issues.apache.org/jira/browse/NIFI-10955
 Project: Apache NiFi
  Issue Type: Improvement
Reporter: Tamas Palfy


The ASN specification allows the creation of valid ASN files that have features 
not recognized by the asn1bean library we are using in JASN1Reader.

We can add a preprocessing step that creates modified versions of the provided 
ASN files (leaving the originals intact) with the unsupported features removed 
in a way that makes the files less strict but should otherwise keep them 
compatible with incoming data.

Identified unsupported features:
 * Certain constraint types
 ** E.g.
{code:java}
SomeType ::= INTEGER (ALL EXCEPT (0..15))
{code}

 * Extension marker (a.k.a "ellipsis" or
{code:java}
...
{code}
)

 ** Can occur in constraints (e.g.
{code:java}
SomeType ::= INTEGER (0..8,...,100..200)
{code}
although
{code:java}
SomeType ::= INTEGER (0..8,...)
{code}
works)

 ** or in type definitions (e.g.
{code:java}
RootType::= SEQUENCE {
field1  INTEGER,
field2  INTEGER,
...,
field3  INTEGER
}
{code}
but this seems to work as well).

 * Version brackets e.g.
{code:java}
SomeType ::= SEQUENCE {
integerField1   INTEGER,
integerField2   INTEGER,
...,
[[ -- from version 2
integerField3   INTEGER,
integerField4   INTEGER ]]
}
{code}
Seems to require extension marker as well.
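
As a rough illustration of such a preprocessing step, the sketch below strips only the version-bracket tokens and keeps the enclosed fields. It is an assumption about how the idea could work, not the JASN1Reader implementation: the helper name `stripVersionBrackets` is hypothetical, it is written in C++ purely for illustration, and it does not handle `[[` or `]]` appearing inside comments or string literals.

```cpp
#include <cstddef>
#include <string>
#include <string_view>

// Hypothetical helper: returns a copy of `asn_text` with the version-bracket
// tokens "[[" and "]]" removed, leaving the enclosed field definitions in place.
// The input is not modified, mirroring the "leave the originals intact" idea.
inline std::string stripVersionBrackets(std::string_view asn_text) {
  std::string result;
  result.reserve(asn_text.size());
  std::size_t i = 0;
  while (i < asn_text.size()) {
    const std::string_view token = asn_text.substr(i, 2);
    if (token == "[[" || token == "]]") {
      i += 2;  // skip the bracket token itself
    } else {
      result.push_back(asn_text[i]);
      ++i;
    }
  }
  return result;
}
```

Applied to the SEQUENCE above, this would leave `integerField3` and `integerField4` as ordinary fields after the extension marker. Single brackets such as tag notation (`[0]`) are left untouched.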



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (NIFI-10632) AWS S3 CopyObject

2022-12-06 Thread Mike Thomsen (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-10632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mike Thomsen reassigned NIFI-10632:
---

Assignee: (was: Mike Thomsen)

> AWS S3 CopyObject
> -
>
> Key: NIFI-10632
> URL: https://issues.apache.org/jira/browse/NIFI-10632
> Project: Apache NiFi
>  Issue Type: Improvement
>Reporter: Malthe Borch
>Priority: Minor
>
> AWS S3 provides a 
> [CopyObject|https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html]
>  API call which:
> {quote}Creates a copy of an object that is already stored in Amazon S3.{quote}
> In NiFi, the PutS3Object processor could be extended to support this API such 
> that one can specify a source for the copy operation, either a different AWS 
> Credentials Provider Service or simply a different bucket and/or object name.





[jira] [Assigned] (NIFI-10956) Schema Inference returns incorrect datatype for records where some arrays are empty

2022-12-06 Thread Matt Burgess (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-10956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Burgess reassigned NIFI-10956:
---

Assignee: Matt Burgess

> Schema Inference returns incorrect datatype for records where some arrays are 
> empty
> ---
>
> Key: NIFI-10956
> URL: https://issues.apache.org/jira/browse/NIFI-10956
> Project: Apache NiFi
>  Issue Type: Bug
>Reporter: Matt Burgess
>Assignee: Matt Burgess
>Priority: Major
>
> If in a FlowFile there is an array field in the schema and for at least one 
> record the value is an empty array and for at least one other record the 
> value is for example a record, the inference logic returns a choice between 
> array<string> and array<record>, and it is possible for the array<string> to 
> be used for the array elements even if they are records. 
> For text-based writers such as JsonRecordSetWriter, this results in a string 
> representation of the record, something like "MapRecord[{a=1,b=2}]" instead 
> of an actual record object. This is a result of empty arrays defaulting to 
> array<string> even if they are part of a choice where there are non-empty 
> arrays. Instead the inference logic should determine if any of the possible 
> choice datatypes are empty arrays and remove them from the list of possible 
> choices (unless that is the only choice, in which case it should default to 
> array<string> as it does now).





[jira] [Created] (NIFI-10956) Schema Inference returns incorrect datatype for records where some arrays are empty

2022-12-06 Thread Matt Burgess (Jira)
Matt Burgess created NIFI-10956:
---

 Summary: Schema Inference returns incorrect datatype for records 
where some arrays are empty
 Key: NIFI-10956
 URL: https://issues.apache.org/jira/browse/NIFI-10956
 Project: Apache NiFi
  Issue Type: Bug
Reporter: Matt Burgess


If in a FlowFile there is an array field in the schema and for at least one 
record the value is an empty array and for at least one other record the value 
is for example a record, the inference logic returns a choice between 
array<string> and array<record>, and it is possible for the array<string> to be 
used for the array elements even if they are records. 

For text-based writers such as JsonRecordSetWriter, this results in a string 
representation of the record, something like "MapRecord[{a=1,b=2}]" instead of 
an actual record object. This is a result of empty arrays defaulting to 
array<string> even if they are part of a choice where there are non-empty 
arrays. Instead the inference logic should determine if any of the possible 
choice datatypes are empty arrays and remove them from the list of possible 
choices (unless that is the only choice, in which case it should default to 
array<string> as it does now).
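
The proposed rule can be sketched as follows. This is an illustration only, not NiFi's inference code: `InferredArrayType` and `pruneEmptyArrayChoices` are hypothetical names, the type model is heavily simplified, and it is written in C++ rather than Java.

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Hypothetical, simplified stand-in for one inferred array type in a choice.
struct InferredArrayType {
  std::string element_type;  // e.g. "string" or "record"
  bool from_empty_array;     // true if inferred from [] with no elements to inspect
};

// Drops choices that came from empty arrays, unless every choice did,
// in which case the list (and therefore the current default) is kept as-is.
inline std::vector<InferredArrayType> pruneEmptyArrayChoices(std::vector<InferredArrayType> choices) {
  const bool has_non_empty = std::any_of(
      choices.begin(), choices.end(),
      [](const InferredArrayType& choice) { return !choice.from_empty_array; });
  if (!has_non_empty) {
    return choices;  // only empty arrays were seen: keep the default
  }
  choices.erase(
      std::remove_if(choices.begin(), choices.end(),
                     [](const InferredArrayType& choice) { return choice.from_empty_array; }),
      choices.end());
  return choices;
}
```

Given one choice inferred from an empty array and one inferred from an array of records, only the record-based choice remains, so a writer would no longer fall back to a string representation of the record.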





[jira] [Commented] (NIFI-10951) Null exception error with DBCP Controller service and AWS parameter provider

2022-12-06 Thread Paul Grey (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-10951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17644074#comment-17644074
 ] 

Paul Grey commented on NIFI-10951:
--

It looks like we narrowed things down to an environment difference between 
stateful and stateless NiFi in this case.

When running the ExecuteStateless processor, it is necessary to explicitly 
define the flow parameters that ExecuteStateless should propagate from its flow 
to its stateless invocation.

I'd like to scope this ticket to providing a sensible error message when 
properties are unresolved from the environment configuration, for whatever 
reason.  It would've been helpful in this situation for the error message to 
state explicitly something along the lines of: "property value for 
'SENSITIVE.Password' is undefined", rather than "NullPointerException".  This 
would have helped resolve things more quickly.
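
To make the suggestion concrete, here is a small sketch of the kind of check meant above. It is not the DBCPConnectionPool code: `requireProperty` and the map-based property model are assumptions for illustration, written in C++ rather than Java. The point is only that an unresolved value is reported by property name before it reaches a call that would fail with a bare null error.

```cpp
#include <map>
#include <optional>
#include <stdexcept>
#include <string>

// Hypothetical property store: a missing key or an empty optional both mean
// "the value could not be resolved from the environment configuration".
using PropertyMap = std::map<std::string, std::optional<std::string>>;

// Returns the resolved value, or throws an error that names the property.
inline std::string requireProperty(const PropertyMap& properties, const std::string& name) {
  const auto it = properties.find(name);
  if (it == properties.end() || !it->second.has_value()) {
    throw std::invalid_argument("property value for '" + name + "' is undefined");
  }
  return *it->second;
}
```

A caller would run this check on each configured property before handing the value on, so the log names the offending property instead of only showing a stack trace.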


> Null exception error with DBCP Controller service and AWS parameter provider
> 
>
> Key: NIFI-10951
> URL: https://issues.apache.org/jira/browse/NIFI-10951
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: NiFi Stateless
>Affects Versions: 1.18.0, 1.19.0
>Reporter: Lucas Read
>Priority: Major
>
> I have an AWS Secrets Manager parameter provider configured and assigned to a 
> process group. I am trying to execute the process group as a stateless flow 
> but getting a null exception returned. It works when I run it as a regular 
> (non-stateless) flow but not when it is stateless. 
>  
> {code:java}
> 2022-12-02 23:34:35,762 ERROR [Component Lifecycle for dataflow 
> stateless_test Thread-1] o.a.n.c.s.StandardControllerServiceNode 
> StandardControllerServiceNode[service=DBCPConnectionPool[id=4e17ba60-aba6-3443-83ff-5f6da6de96ac],
>  name=stateless_DBCPConnectionPool, active=true] Failed to invoke @OnEnabled 
> method
> java.lang.NullPointerException: null
> at 
> java.base/java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1011)
> at 
> java.base/java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006)
> at java.base/java.util.Properties.put(Properties.java:1340)
> at 
> org.apache.commons.dbcp2.BasicDataSource.addConnectionProperty(BasicDataSource.java:384)
> at 
> org.apache.nifi.dbcp.DBCPConnectionPool.lambda$configureDataSource$0(DBCPConnectionPool.java:554)
> at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
> at 
> org.apache.nifi.dbcp.DBCPConnectionPool.configureDataSource(DBCPConnectionPool.java:550)
> at 
> org.apache.nifi.dbcp.DBCPConnectionPool.onConfigured(DBCPConnectionPool.java:497)
> at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at 
> org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:145)
> at 
> org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:133)
> at 
> org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:78)
> at 
> org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotation(ReflectionUtils.java:55)
> at 
> org.apache.nifi.controller.service.StandardControllerServiceNode$2.run(StandardControllerServiceNode.java:598)
> at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
> at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
> at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:829)
> 2022-12-02 23:34:35,763 ERROR [Timer-Driven Process Thread-2] 
> o.a.n.p.stateless.ExecuteStateless 
> ExecuteStateless[id=294238ae-8268-1987--fd45d0f8] Could not create 
> dataflow from snapshot
> java.lang.IllegalStateException: Controller Service 
> StandardControllerServiceNode[service=DBCPConnectionPool[id=4e17ba60-aba6-3443-83ff-5f6da6de96ac],
>  name=stateless_DBCPConnectionPool, active=true] has not fully enabled. 
> Current Validation Status is VALID with validation Errors: []
>

[GitHub] [nifi-fds] dependabot[bot] opened a new pull request, #72: Bump qs, body-parser and express

2022-12-06 Thread GitBox


dependabot[bot] opened a new pull request, #72:
URL: https://github.com/apache/nifi-fds/pull/72

   Bumps [qs](https://github.com/ljharb/qs), 
[body-parser](https://github.com/expressjs/body-parser) and 
[express](https://github.com/expressjs/express). These dependencies needed to 
be updated together.
   Updates `qs` from 6.9.6 to 6.11.0
   
   Changelog
   Sourced from [qs's changelog](https://github.com/ljharb/qs/blob/main/CHANGELOG.md).
   
   6.11.0
   
   - [New] [Fix] stringify: revert 0e903c0; add commaRoundTrip option ([#442](https://github-redirect.dependabot.com/ljharb/qs/issues/442))
   - [readme] fix version badge
   
   6.10.5
   
   - [Fix] stringify: with arrayFormat: comma, properly include an explicit [] on a single-item array ([#434](https://github-redirect.dependabot.com/ljharb/qs/issues/434))
   
   6.10.4
   
   - [Fix] stringify: with arrayFormat: comma, include an explicit [] on a single-item array ([#441](https://github-redirect.dependabot.com/ljharb/qs/issues/441))
   - [meta] use npmignore to autogenerate an npmignore file
   - [Dev Deps] update eslint, @ljharb/eslint-config, aud, has-symbol, object-inspect, tape
   
   6.10.3
   
   - [Fix] parse: ignore `__proto__` keys ([#428](https://github-redirect.dependabot.com/ljharb/qs/issues/428))
   - [Robustness] stringify: avoid relying on a global undefined ([#427](https://github-redirect.dependabot.com/ljharb/qs/issues/427))
   - [actions] reuse common workflows
   - [Dev Deps] update eslint, @ljharb/eslint-config, object-inspect, tape
   
   6.10.2
   
   - [Fix] stringify: actually fix cyclic references ([#426](https://github-redirect.dependabot.com/ljharb/qs/issues/426))
   - [Fix] stringify: avoid encoding arrayformat comma when encodeValuesOnly = true ([#424](https://github-redirect.dependabot.com/ljharb/qs/issues/424))
   - [readme] remove travis badge; add github actions/codecov badges; update URLs
   - [Docs] add note and links for coercing primitive values ([#408](https://github-redirect.dependabot.com/ljharb/qs/issues/408))
   - [actions] update codecov uploader
   - [actions] update workflows
   - [Tests] clean up stringify tests slightly
   - [Dev Deps] update eslint, @ljharb/eslint-config, aud, object-inspect, safe-publish-latest, tape
   
   6.10.1
   
   - [Fix] stringify: avoid exception on repeated object values ([#402](https://github-redirect.dependabot.com/ljharb/qs/issues/402))
   
   6.10.0
   
   - [New] stringify: throw on cycles, instead of an infinite loop ([#395](https://github-redirect.dependabot.com/ljharb/qs/issues/395), [#394](https://github-redirect.dependabot.com/ljharb/qs/issues/394), [#393](https://github-redirect.dependabot.com/ljharb/qs/issues/393))
   - [New] parse: add allowSparse option for collapsing arrays with missing indices ([#312](https://github-redirect.dependabot.com/ljharb/qs/issues/312))
   - [meta] fix README.md ([#399](https://github-redirect.dependabot.com/ljharb/qs/issues/399))
   - [meta] only run npm run dist in publish, not install
   - [Dev Deps] update eslint, @ljharb/eslint-config, aud, has-symbols, tape
   - [Tests] fix tests on node v0.6
   - [Tests] use ljharb/actions/node/install instead of ljharb/actions/node/run
   - [Tests] Revert "[meta] ignore eclint transitive audit warning"
   
   6.9.7
   
   - [Fix] parse: ignore `__proto__` keys ([#428](https://github-redirect.dependabot.com/ljharb/qs/issues/428))
   - [Fix] stringify: avoid encoding arrayformat comma when encodeValuesOnly = true ([#424](https://github-redirect.dependabot.com/ljharb/qs/issues/424))
   - [Robustness] stringify: avoid relying on a global undefined ([#427](https://github-redirect.dependabot.com/ljharb/qs/issues/427))
   - [readme] remove travis badge; add github actions/codecov badges; update URLs
   - [Docs] add note and links for coercing primitive values ([#408](https://github-redirect.dependabot.com/ljharb/qs/issues/408))
   - [Tests] clean up stringify tests slightly
   - [meta] fix README.md ([#399](https://github-redirect.dependabot.com/ljharb/qs/issues/399))
   - Revert "[meta] ignore eclint transitive audit warning"
   
   ... (truncated)
   
   Commits
   
   - [56763c1](https://github.com/ljharb/qs/commit/56763c12ec4fbf72cbb32371cbd386c33cbb) v6.11.0
   - [ddd3e29](https://github.com/ljharb/qs/commit/ddd3e293b801df7a06cb7f2746462a6ca1dd3fb2) [readme] fix version badge
   - [c313472](https://github.com/ljharb/qs/commit/c31347299f34afca90e8b5ff793eb4d0f77cfa56) [New] [Fix] stringify: revert 0e903c0; add commaRoundTrip option
   - [95bc018](https://github.com/ljharb/qs/commit/95bc0185e157d400da4f43f1fcf1c7f008fd847e) v6.10.5
   - [0e903c0](https://github.com/ljharb/qs/commit/0e903c0a9092618756b0962f1b80655ac0da436a) [Fix] stringify: with arrayFormat: comma, properly include an explicit `[...
   - [ba9703c](https://github.com/ljharb/qs/commit/ba9703c0340dfdeb73cb4387d6ab32c37768aa5b) v6.10.4
   - [4e44019](https://github.com/ljharb/qs/commit/4e440195c7647f21c20bb76340774cb3a0cb6eac) [Fix] stringify: with arrayFormat: comma, include an explicit [] on a

[jira] [Assigned] (NIFI-10836) Support Receiving RFC 3195 Syslog Messages

2022-12-06 Thread Nathan Gough (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-10836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nathan Gough reassigned NIFI-10836:
---

Assignee: Nathan Gough

> Support Receiving RFC 3195 Syslog Messages
> --
>
> Key: NIFI-10836
> URL: https://issues.apache.org/jira/browse/NIFI-10836
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: MiNiFi
>Reporter: CHANDAN KUMAR
>Assignee: Nathan Gough
>Priority: Major
>
> [RFC 3195|https://www.rfc-editor.org/rfc/rfc3195] defines a reliable delivery 
> format for syslog messages. The {{ListenTCP}} and {{ListenSyslog}} Processors 
> do not work with this format because messages span multiple lines and both 
> processors expect messages to be terminated by a single newline. A new 
> processor could be created to support handling RFC 3195 messages.





[GitHub] [nifi] mattyb149 opened a new pull request, #6763: NIFI-10956: Fix inference issues with mixed arrays

2022-12-06 Thread GitBox


mattyb149 opened a new pull request, #6763:
URL: https://github.com/apache/nifi/pull/6763

   # Summary
   
   [NIFI-10956](https://issues.apache.org/jira/browse/NIFI-10956)
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [x] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue 
created
   
   ### Pull Request Tracking
   
   - [x] Pull Request title starts with Apache NiFi Jira issue number, such as 
`NIFI-0`
   - [x] Pull Request commit message starts with Apache NiFi Jira issue number, 
such as `NIFI-0`
   
   ### Pull Request Formatting
   
   - [x] Pull Request based on current revision of the `main` branch
   - [x] Pull Request refers to a feature branch with one commit containing 
changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request 
creation.
   
   ### Build
   
   - [ ] Build completed using `mvn clean install -P contrib-check`
 - [ ] JDK 8
 - [x] JDK 11
 - [ ] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 
2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License 
Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` 
files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   





[jira] [Updated] (NIFI-10956) Schema Inference returns incorrect datatype for records where some arrays are empty

2022-12-06 Thread Matt Burgess (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-10956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Burgess updated NIFI-10956:

Status: Patch Available  (was: In Progress)

> Schema Inference returns incorrect datatype for records where some arrays are 
> empty
> ---
>
> Key: NIFI-10956
> URL: https://issues.apache.org/jira/browse/NIFI-10956
> Project: Apache NiFi
>  Issue Type: Bug
>Reporter: Matt Burgess
>Assignee: Matt Burgess
>Priority: Major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> If in a FlowFile there is an array field in the schema and for at least one 
> record the value is an empty array and for at least one other record the 
> value is for example a record, the inference logic returns a choice between 
> array<string> and array<record>, and it is possible for the array<string> to 
> be used for the array elements even if they are records. 
> For text-based writers such as JsonRecordSetWriter, this results in a string 
> representation of the record, something like "MapRecord[{a=1,b=2}]" instead 
> of an actual record object. This is a result of empty arrays defaulting to 
> array<string> even if they are part of a choice where there are non-empty 
> arrays. Instead the inference logic should determine if any of the possible 
> choice datatypes are empty arrays and remove them from the list of possible 
> choices (unless that is the only choice, in which case it should default to 
> array<string> as it does now).





[jira] [Commented] (NIFI-4491) Make CaptureChangeMySQL a record-aware processor

2022-12-06 Thread Matt Burgess (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-4491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17644142#comment-17644142
 ] 

Matt Burgess commented on NIFI-4491:


Agreed, with the caveat that an outgoing record-based FlowFile should contain a 
single transaction if possible and prudent, regardless of the number of records 
within, in order to preserve behavior between source and target DBs.

> Make CaptureChangeMySQL a record-aware processor
> 
>
> Key: NIFI-4491
> URL: https://issues.apache.org/jira/browse/NIFI-4491
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Matt Burgess
>Priority: Major
>
> The main reason CaptureChangeMySQL doesn't leverage the RecordSetWriter API 
> is that those capabilities were being developed in parallel with that 
> processor. Whether a new record-aware processor is better than an improvement 
> to the original is up for discussion; however, it would be a good idea to 
> support the RecordSetWriter API for any CDC (CaptureChangeXYZ) processor.





[GitHub] [nifi] dependabot[bot] opened a new pull request, #6764: Bump qs, body-parser and express in /nifi-registry/nifi-registry-core/nifi-registry-web-ui/src/main

2022-12-06 Thread GitBox


dependabot[bot] opened a new pull request, #6764:
URL: https://github.com/apache/nifi/pull/6764

   Bumps [qs](https://github.com/ljharb/qs), 
[body-parser](https://github.com/expressjs/body-parser) and 
[express](https://github.com/expressjs/express). These dependencies needed to 
be updated together.
   Updates `qs` from 6.9.6 to 6.11.0
   
   Changelog
   Sourced from [qs's changelog](https://github.com/ljharb/qs/blob/main/CHANGELOG.md).
   
   6.11.0
   
   - [New] [Fix] stringify: revert 0e903c0; add commaRoundTrip option ([#442](https://github-redirect.dependabot.com/ljharb/qs/issues/442))
   - [readme] fix version badge
   
   6.10.5
   
   - [Fix] stringify: with arrayFormat: comma, properly include an explicit [] on a single-item array ([#434](https://github-redirect.dependabot.com/ljharb/qs/issues/434))
   
   6.10.4
   
   - [Fix] stringify: with arrayFormat: comma, include an explicit [] on a single-item array ([#441](https://github-redirect.dependabot.com/ljharb/qs/issues/441))
   - [meta] use npmignore to autogenerate an npmignore file
   - [Dev Deps] update eslint, @ljharb/eslint-config, aud, has-symbol, object-inspect, tape
   
   6.10.3
   
   - [Fix] parse: ignore __proto__ keys ([#428](https://github-redirect.dependabot.com/ljharb/qs/issues/428))
   - [Robustness] stringify: avoid relying on a global undefined ([#427](https://github-redirect.dependabot.com/ljharb/qs/issues/427))
   - [actions] reuse common workflows
   - [Dev Deps] update eslint, @ljharb/eslint-config, object-inspect, tape
   
   6.10.2
   
   - [Fix] stringify: actually fix cyclic references ([#426](https://github-redirect.dependabot.com/ljharb/qs/issues/426))
   - [Fix] stringify: avoid encoding arrayformat comma when encodeValuesOnly = true ([#424](https://github-redirect.dependabot.com/ljharb/qs/issues/424))
   - [readme] remove travis badge; add github actions/codecov badges; update URLs
   - [Docs] add note and links for coercing primitive values ([#408](https://github-redirect.dependabot.com/ljharb/qs/issues/408))
   - [actions] update codecov uploader
   - [actions] update workflows
   - [Tests] clean up stringify tests slightly
   - [Dev Deps] update eslint, @ljharb/eslint-config, aud, object-inspect, safe-publish-latest, tape
   
   6.10.1
   
   - [Fix] stringify: avoid exception on repeated object values ([#402](https://github-redirect.dependabot.com/ljharb/qs/issues/402))
   
   6.10.0
   
   - [New] stringify: throw on cycles, instead of an infinite loop ([#395](https://github-redirect.dependabot.com/ljharb/qs/issues/395), [#394](https://github-redirect.dependabot.com/ljharb/qs/issues/394), [#393](https://github-redirect.dependabot.com/ljharb/qs/issues/393))
   - [New] parse: add allowSparse option for collapsing arrays with missing indices ([#312](https://github-redirect.dependabot.com/ljharb/qs/issues/312))
   - [meta] fix README.md ([#399](https://github-redirect.dependabot.com/ljharb/qs/issues/399))
   - [meta] only run npm run dist in publish, not install
   - [Dev Deps] update eslint, @ljharb/eslint-config, aud, has-symbols, tape
   - [Tests] fix tests on node v0.6
   - [Tests] use ljharb/actions/node/install instead of ljharb/actions/node/run
   - [Tests] Revert "[meta] ignore eclint transitive audit warning"
   
   6.9.7
   
   - [Fix] parse: ignore __proto__ keys ([#428](https://github-redirect.dependabot.com/ljharb/qs/issues/428))
   - [Fix] stringify: avoid encoding arrayformat comma when encodeValuesOnly = true ([#424](https://github-redirect.dependabot.com/ljharb/qs/issues/424))
   - [Robustness] stringify: avoid relying on a global undefined ([#427](https://github-redirect.dependabot.com/ljharb/qs/issues/427))
   - [readme] remove travis badge; add github actions/codecov badges; update URLs
   - [Docs] add note and links for coercing primitive values ([#408](https://github-redirect.dependabot.com/ljharb/qs/issues/408))
   - [Tests] clean up stringify tests slightly
   - [meta] fix README.md ([#399](https://github-redirect.dependabot.com/ljharb/qs/issues/399))
   - Revert "[meta] ignore eclint transitive audit warning"
   
   
   
   ... (truncated)
   
   
   Commits
   
   - [56763c1](https://github.com/ljharb/qs/commit/56763c12ec4fbf72cbb32371cbd386c33cbb) v6.11.0
   - [ddd3e29](https://github.com/ljharb/qs/commit/ddd3e293b801df7a06cb7f2746462a6ca1dd3fb2) [readme] fix version badge
   - [c313472](https://github.com/ljharb/qs/commit/c31347299f34afca90e8b5ff793eb4d0f77cfa56) [New] [Fix] stringify: revert 0e903c0; add commaRoundTrip option
   - [95bc018](https://github.com/ljharb/qs/commit/95bc0185e157d400da4f43f1fcf1c7f008fd847e) v6.10.5
   - [0e903c0](https://github.com/ljharb/qs/commit/0e903c0a9092618756b0962f1b80655ac0da436a) [Fix] stringify: with arrayFormat: comma, properly include an explicit `[...
   - [ba9703c](https://github.com/ljharb/qs/commit/ba9703c0340dfdeb73cb4387d6ab32c37768aa5b) v6.10.4
   - [4e44019](https://github.com/ljharb/qs/commit/4e440195c7647f21c20bb76340774cb3a0cb6eac) [Fix] stringify: with arrayFormat: comma, include an explicit [] on a

[GitHub] [nifi] malthe opened a new pull request, #6765: NIFI-9972 Put blob can now pull from another blob source

2022-12-06 Thread GitBox


malthe opened a new pull request, #6765:
URL: https://github.com/apache/nifi/pull/6765

   
   # Summary
   
   [NIFI-9972](https://issues.apache.org/jira/browse/NIFI-9972)
   
   This adds support for the "Put Blob from URL" operation which provides 
service-to-service copying of blobs – the client is only responsible for the 
orchestration which copies individual blocks, not for transferring the data in 
those blocks.
   
   The functionality is added as an optional new data source of the 
[PutAzureBlobStorage_v12](https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-azure-nar/1.19.0/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage_v12/)
 processor.
   
   The test case is not comprehensive in the sense that only the built-in 
credential is tested (account key). Meanwhile, all credentials are indeed 
supported (defined by the `AzureStorageCredentialsType` enum):
   
   - Account key
   - SAS token
   - Access token
   - Managed identity
   - Service principal
   
   The logic of the authentication code is that it's a SAS token (provided 
directly or derived using an account key) or it's based on OAuth2 (which 
captures the remaining cases).
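
The split described above can be sketched as a simple mapping from credential type to authentication scheme. This is a hedged illustration rather than the code in this PR: `CredentialsType` is a local stand-in that only mirrors the five credential kinds listed above, and `SourceAuth` and `sourceAuthFor` are hypothetical names.

```java
public class CredentialSchemeSketch {
    // Local stand-in mirroring the credential kinds listed in the description
    // (an assumption; not the actual AzureStorageCredentialsType enum).
    enum CredentialsType { ACCOUNT_KEY, SAS_TOKEN, ACCESS_TOKEN, MANAGED_IDENTITY, SERVICE_PRINCIPAL }

    // Hypothetical label for how the copy source would be authenticated.
    enum SourceAuth { SAS, OAUTH2 }

    static SourceAuth sourceAuthFor(CredentialsType type) {
        switch (type) {
            case SAS_TOKEN:   // SAS token provided directly
            case ACCOUNT_KEY: // SAS token derived using the account key
                return SourceAuth.SAS;
            default:          // access token, managed identity, service principal
                return SourceAuth.OAUTH2;
        }
    }
}
```

Keeping the decision in one place means a new credential type falls into the OAuth2 branch by default, which may or may not be the desired behavior.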
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [ ] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue 
created
   
   ### Pull Request Tracking
   
   - [ ] Pull Request title starts with Apache NiFi Jira issue number, such as 
`NIFI-0`
   - [ ] Pull Request commit message starts with Apache NiFi Jira issue number, 
such as `NIFI-0`
   
   ### Pull Request Formatting
   
   - [ ] Pull Request based on current revision of the `main` branch
   - [ ] Pull Request refers to a feature branch with one commit containing 
changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request 
creation.
   
   ### Build
   
   - [ ] Build completed using `mvn clean install -P contrib-check`
     - [ ] JDK 8
     - [ ] JDK 11
     - [ ] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 
2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License 
Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` 
files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   

