[jira] [Commented] (NIFI-8101) Improvement PutDatabaseRecord for refresh table schema cache

2021-01-08 Thread ZhangCheng (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17261747#comment-17261747
 ] 

ZhangCheng commented on NIFI-8101:
--

Hi [~mattyb149], happy new year!

Firstly, it would be better to create a new Processor for creating (altering, 
dropping) tables. That way the Processor's function stays single and 
independent, and we can design the flow more flexibly. (And if we want to 
develop new Processors for synchronizing table structures, I think some of the 
ideas in Kettle are very useful.)

Secondly, I think the 'PutDatabaseRecord' function should focus on the `data`, 
and strive to ensure that the correct data is written to the target table, with 
no missing data and no dirty data. We should follow the structure of the target 
table, instead of modifying the target table according to the structure of the 
data. Modifying the target table based on the structure of the data is useful 
sometimes, I know, but the data content in the FlowFile should be treated as 
indeterminate (including the structure of the data).

Even if we provide the ability to synchronize table structures in our flow, I 
think that modifying the target table should happen before writing data to it, 
not while writing data with 'PutDatabaseRecord' or after some error occurs. I 
have always thought that modifying the structure of the target table is a very 
serious action, and we should do it explicitly and visibly when we need to.

Additionally, 'Unmatched Field Behavior' and 'Unmatched Column Behavior' are 
very useful (I really like this design). There are always situations where the 
incoming data has more or fewer columns than the target table, and these 
columns are likely the ones the designer wants to ignore. NIFI-8101 is just an 
enhancement and supplement of 'Unmatched Field Behavior' and 'Unmatched Column 
Behavior'. If users already know how to use PutDatabaseRecord and understand 
'Unmatched Field Behavior' and 'Unmatched Column Behavior', then I believe they 
will easily accept 'Refresh Cached Schema' (Refresh Unmatched Fields/Refresh 
Unmatched Columns...).

Those are my thoughts; what do you think?
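
To make the 'Refresh Cached Schema' idea concrete, here is a minimal, 
hypothetical sketch of a refreshable table schema cache; the class and method 
names are made up for illustration and are not the PutDatabaseRecord 
implementation:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class TableSchema { /* column names, types, etc. */ }

class TableSchemaCache {
    private final Map<String, TableSchema> cache = new ConcurrentHashMap<>();

    // When refresh is true (e.g. triggered by an unmatched field/column
    // event), the possibly outdated entry is dropped and reloaded from the
    // database via the supplied loader.
    TableSchema get(final String tableName,
                    final Function<String, TableSchema> loader,
                    final boolean refresh) {
        if (refresh) {
            cache.remove(tableName);
        }
        return cache.computeIfAbsent(tableName, loader);
    }
}
```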

 

> Improvement  PutDatabaseRecord for refresh table schema cache
> -
>
> Key: NIFI-8101
> URL: https://issues.apache.org/jira/browse/NIFI-8101
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: ZhangCheng
>Assignee: ZhangCheng
>Priority: Major
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> * Sometimes, the target table has changed and `PutDatabaseRecord` has cached 
> outdated table schema information. We may need a new property to tell 
> `PutDatabaseRecord` to refresh the table schema cache under certain 
> conditions. 
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi] exceptionfactory commented on pull request #4734: NIFI-8023 Added toLocalDate() and updated toDate() in DataTypeUtils

2021-01-08 Thread GitBox


exceptionfactory commented on pull request #4734:
URL: https://github.com/apache/nifi/pull/4734#issuecomment-757089861


   @turcsanyip The other use of `AvroTypeUtil.LOGICAL_TYPE_DATE` was in 
`normalizeValue()`, which was causing the problem with `ConvertRecord`.  The 
use of `new java.sql.Date()` with milliseconds meant the system default time 
zone could throw off the converted date.  I updated `normalizeValue()` to use 
`LocalDate.ofEpochDay()` and added a unit test method to confirm the corrected 
behavior.
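
   As a minimal, self-contained illustration of the time zone pitfall described 
above (the epoch-day value is an assumption for the example; this is not the 
NiFi test code):

```java
import java.time.LocalDate;
import java.util.TimeZone;

public class EpochDayExample {
    public static void main(String[] args) {
        final long epochDay = 18635L; // 2021-01-08 as days since 1970-01-01

        // Constructing java.sql.Date from milliseconds renders the date in
        // the JVM default time zone, so midnight UTC can print as the
        // previous day in zones west of UTC.
        TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"));
        final java.sql.Date fromMillis = new java.sql.Date(epochDay * 86_400_000L);
        System.out.println(fromMillis); // prints 2021-01-07

        // LocalDate.ofEpochDay() is independent of any time zone.
        System.out.println(LocalDate.ofEpochDay(epochDay)); // prints 2021-01-08
    }
}
```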



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] exceptionfactory commented on pull request #4729: NIFI-8094 Added support for BCFKS Keystore Type

2021-01-08 Thread GitBox


exceptionfactory commented on pull request #4729:
URL: https://github.com/apache/nifi/pull/4729#issuecomment-757076803


   Thanks for the review and testing @thenatog!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] thenatog commented on pull request #4729: NIFI-8094 Added support for BCFKS Keystore Type

2021-01-08 Thread GitBox


thenatog commented on pull request #4729:
URL: https://github.com/apache/nifi/pull/4729#issuecomment-757070464


   Looks good to me. I set up a cluster using a BCFKS keystore, and again using 
JKS to check backwards compatibility. I also created an InvokeHTTP processor 
using BCFKS as the keystore and was able to send data to a ListenHTTP processor.
   
   +1



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] MikeThomsen commented on a change in pull request #4739: NIFI-7906 Fix a Cypher bug that was missed during initial review

2021-01-08 Thread GitBox


MikeThomsen commented on a change in pull request #4739:
URL: https://github.com/apache/nifi/pull/4739#discussion_r554147955



##
File path: 
nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java
##
@@ -240,6 +259,7 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
 delta = (end - start) / 1000;
 if (getLogger().isDebugEnabled()){
 getLogger().debug(String.format("Took %s seconds.", delta));
+getLogger().debug(String.format("Handled %d records", 
records));

Review comment:
   I prefer to put the allocation inside of that check, but yeah, it could 
be one line.
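
   For context, a sketch of the two options being weighed here (names follow 
the surrounding diff; the `Object[]` logging overload is an assumption about 
the ComponentLog API of this era, not verified against this exact version):

```java
// Option 1: guard the String.format() allocations with the debug check.
if (getLogger().isDebugEnabled()) {
    getLogger().debug(String.format("Took %s seconds.", delta));
    getLogger().debug(String.format("Handled %d records", records));
}

// Option 2: one line with parameterized logging; the message is only
// rendered when debug is enabled, so no explicit guard is needed.
getLogger().debug("Took {} seconds; handled {} records", new Object[]{delta, records});
```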





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] MikeThomsen commented on a change in pull request #4739: NIFI-7906 Fix a Cypher bug that was missed during initial review

2021-01-08 Thread GitBox


MikeThomsen commented on a change in pull request #4739:
URL: https://github.com/apache/nifi/pull/4739#discussion_r554147729



##
File path: 
nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java
##
@@ -223,14 +237,19 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
 }
 
 dynamicPropertyMap.putAll(input.getAttributes());
+if (getLogger().isDebugEnabled()) {
+getLogger().info("Parameter map: " + 
dynamicPropertyMap);
+}
+List<Map<String, Object>> graphResponses = new 
ArrayList<>(executeQuery(recordScript, dynamicPropertyMap));
 
 OutputStream graphOutputStream = session.write(graph);
 String graphOutput = 
mapper.writerWithDefaultPrettyPrinter().writeValueAsString(graphResponses);
 
graphOutputStream.write(graphOutput.getBytes(StandardCharsets.UTF_8));
 graphOutputStream.close();
 session.transfer(graph, GRAPH);
+records++;
 } catch (Exception e) {
+getLogger().error("Error processing record", e);

Review comment:
   Not a good one, but I decided to add the record index.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (NIFI-8126) Include Total Queued Duration in metrics reported via ConnectionStatus

2021-01-08 Thread Jon Kessler (Jira)
Jon Kessler created NIFI-8126:
-

 Summary: Include Total Queued Duration in metrics reported via 
ConnectionStatus
 Key: NIFI-8126
 URL: https://issues.apache.org/jira/browse/NIFI-8126
 Project: Apache NiFi
  Issue Type: Improvement
  Components: Core Framework
Affects Versions: 1.12.1
Reporter: Jon Kessler
Assignee: Jon Kessler


On the graph, when listing a queue, you are able to see the queued duration for 
individual flowfiles. I believe that either a total queued duration or an 
average queued duration for the connection as a whole would be a valuable 
metric to include in the ConnectionStatus object that is available to 
ReportingTasks.
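
A hypothetical sketch of how a ReportingTask might consume the proposed metric; 
`getTotalQueuedDuration()` does not exist yet and stands in for whatever 
accessor this ticket adds:

```java
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;

public class QueuedDurationReportingTask extends AbstractReportingTask {
    @Override
    public void onTrigger(final ReportingContext context) {
        report(context.getEventAccess().getControllerStatus());
    }

    private void report(final ProcessGroupStatus group) {
        for (final ConnectionStatus conn : group.getConnectionStatus()) {
            final long totalMillis = conn.getTotalQueuedDuration(); // proposed accessor
            final int queued = conn.getQueuedCount();
            final long avgMillis = queued == 0 ? 0 : totalMillis / queued;
            getLogger().info("Connection {}: total queued {} ms, avg {} ms",
                    new Object[]{conn.getName(), totalMillis, avgMillis});
        }
        // Recurse into child process groups so nested connections are covered.
        group.getProcessGroupStatus().forEach(this::report);
    }
}
```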



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (NIFI-8120) Improve HandleHttpResponse processing of Runtime Exceptions

2021-01-08 Thread Peter Turcsanyi (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-8120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter Turcsanyi updated NIFI-8120:
--
Fix Version/s: 1.13.0
   Resolution: Fixed
   Status: Resolved  (was: Patch Available)

> Improve HandleHttpResponse processing of Runtime Exceptions
> ---
>
> Key: NIFI-8120
> URL: https://issues.apache.org/jira/browse/NIFI-8120
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Affects Versions: 1.2.0, 1.12.1
>Reporter: David Handermann
>Assignee: David Handermann
>Priority: Minor
>  Labels: HTTP, HandleHttpResponse
> Fix For: 1.13.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> When catching a {{ProcessException}} while trying to export Flow File 
> contents to an HTTP output stream, the {{HandleHttpResponse}} Processor will 
> attempt to call {{HttpContextMap.complete()}} in order to finish processing 
> the HTTP transaction.  If the HTTP Context Identifier is no longer present in 
> the {{HttpContextMap}} or if the HTTP client associated with the transaction 
> has already terminated the connection, {{HttpContextMap.complete()}} will 
> throw an {{IllegalStateException}} or some other type of 
> {{RuntimeException}}.  When this occurs, {{HandleHttpResponse}} falls back to 
> administratively yielding and does not route the Flow File to the failure 
> relationship.  The end result is that Flow Files associated with closed HTTP 
> connections remain queued.
> The other call to {{HttpContextMap.complete()}} already handles an 
> {{IllegalStateException}} so both calls should be handled in the same way to 
> avoid leaving files in the queue.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (NIFI-8120) Improve HandleHttpResponse processing of Runtime Exceptions

2021-01-08 Thread ASF subversion and git services (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-8120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17261457#comment-17261457
 ] 

ASF subversion and git services commented on NIFI-8120:
---

Commit a4027e8e77facdf94817e343d88ecbfeeacbf7fc in nifi's branch 
refs/heads/main from exceptionfactory
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=a4027e8 ]

NIFI-8120 Added RuntimeException handling on HttpContextMap.complete()

NIFI-8120 Renamed exception variable and reordered log statements

This closes #4747.

Signed-off-by: Peter Turcsanyi 


> Improve HandleHttpResponse processing of Runtime Exceptions
> ---
>
> Key: NIFI-8120
> URL: https://issues.apache.org/jira/browse/NIFI-8120
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Affects Versions: 1.2.0, 1.12.1
>Reporter: David Handermann
>Assignee: David Handermann
>Priority: Minor
>  Labels: HTTP, HandleHttpResponse
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> When catching a {{ProcessException}} while trying to export Flow File 
> contents to an HTTP output stream, the {{HandleHttpResponse}} Processor will 
> attempt to call {{HttpContextMap.complete()}} in order to finish processing 
> the HTTP transaction.  If the HTTP Context Identifier is no longer present in 
> the {{HttpContextMap}} or if the HTTP client associated with the transaction 
> has already terminated the connection, {{HttpContextMap.complete()}} will 
> throw an {{IllegalStateException}} or some other type of 
> {{RuntimeException}}.  When this occurs, {{HandleHttpResponse}} falls back to 
> administratively yielding and does not route the Flow File to the failure 
> relationship.  The end result is that Flow Files associated with closed HTTP 
> connections remain queued.
> The other call to {{HttpContextMap.complete()}} already handles an 
> {{IllegalStateException}} so both calls should be handled in the same way to 
> avoid leaving files in the queue.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (NIFI-8120) Improve HandleHttpResponse processing of Runtime Exceptions

2021-01-08 Thread ASF subversion and git services (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-8120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17261456#comment-17261456
 ] 

ASF subversion and git services commented on NIFI-8120:
---

Commit a4027e8e77facdf94817e343d88ecbfeeacbf7fc in nifi's branch 
refs/heads/main from exceptionfactory
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=a4027e8 ]

NIFI-8120 Added RuntimeException handling on HttpContextMap.complete()

NIFI-8120 Renamed exception variable and reordered log statements

This closes #4747.

Signed-off-by: Peter Turcsanyi 


> Improve HandleHttpResponse processing of Runtime Exceptions
> ---
>
> Key: NIFI-8120
> URL: https://issues.apache.org/jira/browse/NIFI-8120
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Affects Versions: 1.2.0, 1.12.1
>Reporter: David Handermann
>Assignee: David Handermann
>Priority: Minor
>  Labels: HTTP, HandleHttpResponse
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> When catching a {{ProcessException}} while trying to export Flow File 
> contents to an HTTP output stream, the {{HandleHttpResponse}} Processor will 
> attempt to call {{HttpContextMap.complete()}} in order to finish processing 
> the HTTP transaction.  If the HTTP Context Identifier is no longer present in 
> the {{HttpContextMap}} or if the HTTP client associated with the transaction 
> has already terminated the connection, {{HttpContextMap.complete()}} will 
> throw an {{IllegalStateException}} or some other type of 
> {{RuntimeException}}.  When this occurs, {{HandleHttpResponse}} falls back to 
> administratively yielding and does not route the Flow File to the failure 
> relationship.  The end result is that Flow Files associated with closed HTTP 
> connections remain queued.
> The other call to {{HttpContextMap.complete()}} already handles an 
> {{IllegalStateException}} so both calls should be handled in the same way to 
> avoid leaving files in the queue.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi] asfgit closed pull request #4747: NIFI-8120 Added RuntimeException handling on HttpContextMap.complete()

2021-01-08 Thread GitBox


asfgit closed pull request #4747:
URL: https://github.com/apache/nifi/pull/4747


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (NIFI-8125) Provide a way to prefer JWT when client cert is present

2021-01-08 Thread Bryan Bende (Jira)
Bryan Bende created NIFI-8125:
-

 Summary: Provide a way to prefer JWT when client cert is present
 Key: NIFI-8125
 URL: https://issues.apache.org/jira/browse/NIFI-8125
 Project: Apache NiFi
  Issue Type: Improvement
Reporter: Bryan Bende
Assignee: Bryan Bende


Currently the x509 filter always runs before the JWT filter, but there may be a 
case where a client cert is sent in the request along with an Authorization 
header containing a JWT, and the user would prefer the JWT to be used. We need 
a way to control this, or possibly make JWT the first check.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi-minifi-cpp] arpadboda closed pull request #969: MINIFICPP-1439 - Startup without agent class, handle class update

2021-01-08 Thread GitBox


arpadboda closed pull request #969:
URL: https://github.com/apache/nifi-minifi-cpp/pull/969


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] urbandan commented on a change in pull request #4730: NIFI-8095: Created StatelessNiFi Sink Connector and Source Connector.…

2021-01-08 Thread GitBox


urbandan commented on a change in pull request #4730:
URL: https://github.com/apache/nifi/pull/4730#discussion_r554047254



##
File path: 
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceConnector.java
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.source.SourceConnector;
+import 
org.apache.nifi.kafka.connect.validators.ConnectRegularExpressionValidator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class StatelessNiFiSourceConnector extends SourceConnector {
+static final String OUTPUT_PORT_NAME = "output.port";
+static final String TOPIC_NAME = "topic.name";
+
+static final String TOPIC_NAME_ATTRIBUTE = "topic.name.attribute";
+static final String KEY_ATTRIBUTE = "key.attribute";
+static final String HEADER_REGEX = "header.attribute.regex";
+
+private Map<String, String> properties;
+
+@Override
+public void start(final Map<String, String> properties) {
+this.properties = new HashMap<>(properties);
+}
+
+@Override
+public void reconfigure(final Map<String, String> properties) {
+this.properties = new HashMap<>(this.properties);
+}
+
+@Override
+public Class<? extends Task> taskClass() {
+return StatelessNiFiSourceTask.class;
+}
+
+@Override
+public List<Map<String, String>> taskConfigs(final int maxTasks) {
+final List<Map<String, String>> configs = new ArrayList<>();
+for (int i=0; i < maxTasks; i++) {
+configs.add(new HashMap<>(properties));

Review comment:
   At this phase, the connector has to have some information about the input 
data and partition it between the created tasks. Not sure how hard that is, or 
whether the stateless NiFi flows can coordinate under the hood.
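
   A hypothetical sketch of the kind of partitioning `taskConfigs()` could do, 
mirroring the quoted override above; the `task.index` property name is made up 
for illustration:

```java
@Override
public List<Map<String, String>> taskConfigs(final int maxTasks) {
    final List<Map<String, String>> configs = new ArrayList<>();
    for (int i = 0; i < maxTasks; i++) {
        final Map<String, String> taskConfig = new HashMap<>(properties);
        // Each task gets its own index so it could claim a disjoint share
        // of the input instead of all tasks receiving identical configs.
        taskConfig.put("task.index", String.valueOf(i));
        configs.add(taskConfig);
    }
    return configs;
}
```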





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] thenatog commented on pull request #4729: NIFI-8094 Added support for BCFKS Keystore Type

2021-01-08 Thread GitBox


thenatog commented on pull request #4729:
URL: https://github.com/apache/nifi/pull/4729#issuecomment-756837064


   Reviewing



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (MINIFICPP-1396) Create FetchS3Object processor

2021-01-08 Thread Gabor Gyimesi (Jira)


 [ 
https://issues.apache.org/jira/browse/MINIFICPP-1396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gyimesi resolved MINIFICPP-1396.
--
Resolution: Fixed

> Create FetchS3Object processor
> --
>
> Key: MINIFICPP-1396
> URL: https://issues.apache.org/jira/browse/MINIFICPP-1396
> Project: Apache NiFi MiNiFi C++
>  Issue Type: New Feature
>Reporter: Gabor Gyimesi
>Assignee: Gabor Gyimesi
>Priority: Major
>  Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> Create new processor to fetch existing S3 object with similar functionality 
> defined in Nifi:
> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-aws-nar/1.12.1/org.apache.nifi.processors.aws.s3.FetchS3Object/index.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi] exceptionfactory commented on pull request #4747: NIFI-8120 Added RuntimeException handling on HttpContextMap.complete()

2021-01-08 Thread GitBox


exceptionfactory commented on pull request #4747:
URL: https://github.com/apache/nifi/pull/4747#issuecomment-756821716


   @turcsanyip Thanks for the review and detailed feedback.  I pushed an update 
incorporating your recommendations.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] exceptionfactory commented on a change in pull request #4747: NIFI-8120 Added RuntimeException handling on HttpContextMap.complete()

2021-01-08 Thread GitBox


exceptionfactory commented on a change in pull request #4747:
URL: https://github.com/apache/nifi/pull/4747#discussion_r554015927



##
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java
##
@@ -194,7 +192,11 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
 } catch (final ProcessException e) {
 session.transfer(flowFile, REL_FAILURE);
 getLogger().error("Failed to respond to HTTP request for {} due to 
{}", new Object[]{flowFile, e});
-contextMap.complete(contextIdentifier);
+try {

Review comment:
   That makes sense, reordered this and other `session.transfer()` calls to 
occur after logging.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm closed pull request #895: MINIFICPP-1352 - Comment out unused parameters (for enabling -Wall)

2021-01-08 Thread GitBox


szaszm closed pull request #895:
URL: https://github.com/apache/nifi-minifi-cpp/pull/895


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] exceptionfactory commented on a change in pull request #4747: NIFI-8120 Added RuntimeException handling on HttpContextMap.complete()

2021-01-08 Thread GitBox


exceptionfactory commented on a change in pull request #4747:
URL: https://github.com/apache/nifi/pull/4747#discussion_r554015409



##
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java
##
@@ -204,7 +206,7 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
 
 try {
 contextMap.complete(contextIdentifier);
-} catch (final IllegalStateException ise) {
+} catch (final RuntimeException ise) {

Review comment:
   Renamed to `ce` as requested.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] arpadboda closed pull request #970: MINIFICPP-1396 Create FetchS3Object processor

2021-01-08 Thread GitBox


arpadboda closed pull request #970:
URL: https://github.com/apache/nifi-minifi-cpp/pull/970


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] turcsanyip commented on a change in pull request #4747: NIFI-8120 Added RuntimeException handling on HttpContextMap.complete()

2021-01-08 Thread GitBox


turcsanyip commented on a change in pull request #4747:
URL: https://github.com/apache/nifi/pull/4747#discussion_r554009142



##
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java
##
@@ -204,7 +206,7 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
 
 try {
 contextMap.complete(contextIdentifier);
-} catch (final IllegalStateException ise) {
+} catch (final RuntimeException ise) {

Review comment:
   `ise` seems to be a leftover from the previous version.
   Could you please use `ce` here too, like in line 197.

##
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java
##
@@ -194,7 +192,11 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
 } catch (final ProcessException e) {
 session.transfer(flowFile, REL_FAILURE);
 getLogger().error("Failed to respond to HTTP request for {} due to 
{}", new Object[]{flowFile, e});
-contextMap.complete(contextIdentifier);
+try {

Review comment:
   Not a new issue, but the exception handling here could be improved. The 
typical (by convention) error-handling flow is logging first, then sending the 
FF to FAILURE (in case of an exception in `session.transfer()`), like in the 
last catch block at lines 210-211.
   
   This catch block and the others throughout onTrigger() do it the other way 
around. I would consider moving the
   `session.transfer(flowFile, REL_FAILURE);`
   lines to just before the return statements.
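
   A minimal sketch of the ordering being suggested (an assumed shape, not the 
exact HandleHttpResponse code; `session`, `flowFile`, and `REL_FAILURE` follow 
the diff above):

```java
} catch (final ProcessException e) {
    // Log first, by convention...
    getLogger().error("Failed to respond to HTTP request for {} due to {}",
            new Object[]{flowFile, e});
    // ...then route to failure immediately before returning.
    session.transfer(flowFile, REL_FAILURE);
    return;
}
```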





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on pull request #973: MINIFICPP-1446 - SQL extension doesn't compile on Debian

2021-01-08 Thread GitBox


szaszm commented on pull request #973:
URL: https://github.com/apache/nifi-minifi-cpp/pull/973#issuecomment-756767879


   A bit more info after trying to build this PR:
   ```
   szaszm@szaszm-pc:~/nifi-minifi-cpp-2/cmake-build-relwithdebinfo-gcc-large$ 
find -name libsoci_odbc.a
   
./extensions/sql/soci-external-prefix/src/soci-external-build/lib/libsoci_odbc.a
   ./extensions/sql/thirdparty/soci-install/lib64/libsoci_odbc.a
   szaszm@szaszm-pc:~/nifi-minifi-cpp-2/cmake-build-relwithdebinfo-gcc-large$ 
find -name libiodbc.a
   ./extensions/sql/thirdparty/iodbc-install/lib/libiodbc.a
   ./extensions/sql/thirdparty/iodbc-src/iodbc/.libs/libiodbc.a
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] arpadboda commented on pull request #973: MINIFICPP-1446 - SQL extension doesn't compile on Debian

2021-01-08 Thread GitBox


arpadboda commented on pull request #973:
URL: https://github.com/apache/nifi-minifi-cpp/pull/973#issuecomment-756765763


   > Not debian, but I get this error when trying to build this PR with SQL 
enabled
   > `make[2]: *** No rule to make target 
'extensions/sql/thirdparty/iodbc-install//lib64/libiodbc.a', needed by 
'main/minifi'. Stop.`
   > on main I get a similar error, but for `libsoci_odbc.a` instead.
   
   This PR was meant to address the latter, but cmake caching is so f... up 
that you never know when you have the good setup. Will tackle this a bit more. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on pull request #973: MINIFICPP-1446 - SQL extension doesn't compile on Debian

2021-01-08 Thread GitBox


szaszm commented on pull request #973:
URL: https://github.com/apache/nifi-minifi-cpp/pull/973#issuecomment-756763519


   Not debian, but I get this error when trying to build this PR with SQL 
enabled
   ```make[2]: *** No rule to make target 
'extensions/sql/thirdparty/iodbc-install//lib64/libiodbc.a', needed by 
'main/minifi'.  Stop.```
   on main I get a similar error, but for `libsoci_odbc.a` instead.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] urbandan commented on a change in pull request #4730: NIFI-8095: Created StatelessNiFi Sink Connector and Source Connector.…

2021-01-08 Thread GitBox


urbandan commented on a change in pull request #4730:
URL: https://github.com/apache/nifi/pull/4730#discussion_r553786280



##
File path: 
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.java
##
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.stateless.flow.DataflowTrigger;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.TriggerResult;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+public class StatelessNiFiSinkTask extends SinkTask {
+private static final Logger logger = 
LoggerFactory.getLogger(StatelessNiFiSinkTask.class);
+
+private StatelessDataflow dataflow;
+private String inputPortName;
+private Set<String> failurePortNames;
+private long timeoutMillis;
+private Pattern headerNameRegex;
+private String headerNamePrefix;
+private int batchSize;
+private long batchBytes;
+private QueueSize queueSize;
+private String dataflowName;
+
+private long backoffMillis = 0L;
+
+@Override
+public String version() {
+return StatelessKafkaConnectorUtil.getVersion();
+}
+
+@Override
+public void start(final Map<String, String> properties) {
+logger.info("Starting Sink Task with properties {}", 
StatelessKafkaConnectorUtil.getLoggableProperties(properties));
+
+final String timeout = 
properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, 
StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT);
+timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, 
TimeUnit.MILLISECONDS);
+
+dataflowName = 
properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);
+
+final String regex = 
properties.get(StatelessNiFiSinkConnector.HEADERS_AS_ATTRIBUTES_REGEX);
+headerNameRegex = regex == null ? null : Pattern.compile(regex);
+headerNamePrefix = 
properties.getOrDefault(StatelessNiFiSinkConnector.HEADER_ATTRIBUTE_NAME_PREFIX,
 "");
+
+batchSize = 
Integer.parseInt(properties.getOrDefault(StatelessNiFiSinkConnector.BATCH_SIZE_COUNT,
 "0"));
+batchBytes = 
Long.parseLong(properties.getOrDefault(StatelessNiFiSinkConnector.BATCH_SIZE_BYTES,
 "0"));
+
+dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);
+
+// Determine input port name. If input port is explicitly set, use the 
value given. Otherwise, if only one port exists, use that. Otherwise, throw 
ConfigException.
+final String dataflowName = 
properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);
+inputPortName = 
properties.get(StatelessNiFiSinkConnector.INPUT_PORT_NAME);
+if (inputPortName == null) {
+final Set<String> inputPorts = dataflow.getInputPortNames();
+if (inputPorts.isEmpty()) {
+throw new ConfigException("The dataflow specified for <" + 
dataflowName + "> does not have an Input Port at the root level. Dataflows used 
for a Kafka Connect Sink Task "
++ "must have at least one Input Port at the root level.");
+}
+
+if (inputPorts.size() > 1) {
+throw new ConfigException("The dataflow specified for <" + 
dataflowName + "> has multiple Input Ports at the root 

[GitHub] [nifi] moranr commented on pull request #4563: NIFI-7738 Reverse Provenance Query

2021-01-08 Thread GitBox


moranr commented on pull request #4563:
URL: https://github.com/apache/nifi/pull/4563#issuecomment-756752576


   @NissimShiman – good catch. That style text is used elsewhere in that way, 
so I'd say you're correct to apply here as well. Looks good to me; thanks for 
your work and attention to detail!
   CC @markap14 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] heritamas commented on a change in pull request #4730: NIFI-8095: Created StatelessNiFi Sink Connector and Source Connector.…

2021-01-08 Thread GitBox


heritamas commented on a change in pull request #4730:
URL: https://github.com/apache/nifi/pull/4730#discussion_r553931809



##
File path: 
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java
##
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.stateless.flow.DataflowTrigger;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.TriggerResult;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+
+public class StatelessNiFiSourceTask extends SourceTask {
+private static final Logger logger = 
LoggerFactory.getLogger(StatelessNiFiSourceTask.class);
+
+private StatelessDataflow dataflow;
+private String outputPortName;
+private String topicName;
+private String topicNameAttribute;
+private TriggerResult triggerResult;
+private String keyAttributeName;
+private Pattern headerAttributeNamePattern;
+private long timeoutMillis;
+private String dataflowName;
+
+private final AtomicLong unacknowledgedRecords = new AtomicLong(0L);
+
+@Override
+public String version() {
+return StatelessKafkaConnectorUtil.getVersion();
+}
+
+@Override
+public void start(final Map<String, String> properties) {
+logger.info("Starting Source Task with properties {}", properties);
+
+final String timeout = 
properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, 
StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT);
+timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, 
TimeUnit.MILLISECONDS);
+
+topicName = properties.get(StatelessNiFiSourceConnector.TOPIC_NAME);
+topicNameAttribute = 
properties.get(StatelessNiFiSourceConnector.TOPIC_NAME_ATTRIBUTE);
+keyAttributeName = 
properties.get(StatelessNiFiSourceConnector.KEY_ATTRIBUTE);
+
+if (topicName == null && topicNameAttribute == null) {
+throw new ConfigException("Either the topic.name or 
topic.name.attribute configuration must be specified");
+}
+
+final String headerRegex = 
properties.get(StatelessNiFiSourceConnector.HEADER_REGEX);
+headerAttributeNamePattern = headerRegex == null ? null : 
Pattern.compile(headerRegex);
+
+dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);
+
+// Determine the name of the Output Port to retrieve data from
+dataflowName = properties.get("name");
+outputPortName = 
properties.get(StatelessNiFiSourceConnector.OUTPUT_PORT_NAME);
+if (outputPortName == null) {
+final Set<String> outputPorts = dataflow.getOutputPortNames();
+if (outputPorts.isEmpty()) {
+throw new ConfigException("The dataflow specified for <" + 
dataflowName + "> does not have an Output Port at the root level. Dataflows 
used for a Kafka Connect Source Task "
++ "must have at least one Output Port at the root level.");
+}
+
+if (outputPorts.size() > 1) {
+throw new ConfigException("The dataflow specified for <" + 
dataflowName + "> has multiple Output Ports at the root level (" + 
outputPorts.toString()
++ "). The " + 
StatelessNiFiSourceConnector.OUTPUT_PORT_NAME + " property must be set to 
indicate which of these Ports Kafka records should 

[GitHub] [nifi] exceptionfactory commented on pull request #4749: NIFI-8121 Updated ListenHTTP with inferred Client Authentication Policy

2021-01-08 Thread GitBox


exceptionfactory commented on pull request #4749:
URL: https://github.com/apache/nifi/pull/4749#issuecomment-756743075


   Thanks for the review and testing @turcsanyip!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-08 Thread GitBox


hunyadi-dev commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553914086



##
File path: extensions/librdkafka/ConsumeKafka.cpp
##
@@ -0,0 +1,582 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConsumeKafka.h"
+
+#include 
+#include 
+
+#include "core/PropertyValidation.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/gsl.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+// The upper limit for Max Poll Time is 4 seconds. This is because Watchdog 
would potentially start
+// reporting issues with the processor health otherwise
+class ConsumeKafkaMaxPollTimeValidator : public TimePeriodValidator {
+ public:
+  ConsumeKafkaMaxPollTimeValidator(const std::string& name) // NOLINT
+  : TimePeriodValidator(name) {
+  }
+  ~ConsumeKafkaMaxPollTimeValidator() override = default;
+
+  ValidationResult validate(const std::string& subject, const std::string& 
input) const override {
+uint64_t value;
+TimeUnit timeUnit;
+uint64_t value_as_ms;
+return 
ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid(
+core::TimePeriodValue::StringToTime(input, value, timeUnit) &&
+org::apache::nifi::minifi::core::Property::ConvertTimeUnitToMS(value, 
timeUnit, value_as_ms) &&
+0 < value_as_ms && value_as_ms <= 4000).build();
+  }
+};
+}  // namespace core
+namespace processors {
+
+constexpr const std::size_t ConsumeKafka::DEFAULT_MAX_POLL_RECORDS;
+constexpr char const* ConsumeKafka::DEFAULT_MAX_POLL_TIME;
+
+constexpr char const* ConsumeKafka::TOPIC_FORMAT_NAMES;
+constexpr char const* ConsumeKafka::TOPIC_FORMAT_PATTERNS;
+
+core::Property 
ConsumeKafka::KafkaBrokers(core::PropertyBuilder::createProperty("Kafka 
Brokers")
+  ->withDescription("A comma-separated list of known Kafka Brokers in the 
format :.")
+  ->withDefaultValue("localhost:9092", 
core::StandardValidators::get().NON_BLANK_VALIDATOR)
+  ->supportsExpressionLanguage(true)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::SecurityProtocol(core::PropertyBuilder::createProperty("Security 
Protocol")
+  ->withDescription("This property is currently not supported. Protocol used 
to communicate with brokers. Corresponds to Kafka's 'security.protocol' 
property.")
+  ->withAllowableValues<std::string>({SECURITY_PROTOCOL_PLAINTEXT/*, SECURITY_PROTOCOL_SSL, SECURITY_PROTOCOL_SASL_PLAINTEXT, SECURITY_PROTOCOL_SASL_SSL*/ })
+  ->withDefaultValue(SECURITY_PROTOCOL_PLAINTEXT)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::TopicNames(core::PropertyBuilder::createProperty("Topic Names")
+  ->withDescription("The name of the Kafka Topic(s) to pull from. Multiple 
topic names are supported as a comma separated list.")
+  ->supportsExpressionLanguage(true)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::TopicNameFormat(core::PropertyBuilder::createProperty("Topic Name 
Format")
+  ->withDescription("Specifies whether the Topic(s) provided are a comma 
separated list of names or a single regular expression.")
+  ->withAllowableValues<std::string>({TOPIC_FORMAT_NAMES, TOPIC_FORMAT_PATTERNS})
+  ->withDefaultValue(TOPIC_FORMAT_NAMES)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::HonorTransactions(core::PropertyBuilder::createProperty("Honor 
Transactions")
+  ->withDescription(
+  "Specifies whether or not MiNiFi should honor transactional guarantees 
when communicating with Kafka. If false, the Processor will use an \"isolation 
level\" of "
+  "read_uncomitted. This means that messages will be received as soon as 
they are written to Kafka but will be pulled, even if the producer cancels the 
transactions. "
+  "If this value is true, MiNiFi will not receive any messages for which 
the producer's transaction was canceled, but this can result in some latency 
since the consumer "
+  "must wait for the producer to finish its entire transaction instead of 
pulling as the messages become available.")
+  ->withDefaultValue(true)
+  ->isRequired(true)
+  ->build());
+
+core::Property 

[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-08 Thread GitBox


hunyadi-dev commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553912946



##
File path: libminifi/include/core/FlowFile.h
##
@@ -134,7 +135,9 @@ class FlowFile : public CoreComponent, public 
ReferenceContainer {
* @param value value to set
* @return result of finding key
*/
-  bool getAttribute(std::string key, std::string& value) const;
+  bool getAttribute(const std::string& key, std::string& value) const;
+
+  utils::optional<std::string> getAttribute(const std::string& key) const;

Review comment:
   Updated.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-08 Thread GitBox


hunyadi-dev commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553912615



##
File path: extensions/librdkafka/tests/ConsumeKafkaTests.cpp
##
@@ -0,0 +1,590 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CATCH_CONFIG_MAIN
+
+#include 
+#include 
+#include 
+#include 
+
+#include "TestBase.h"
+
+#include "../ConsumeKafka.h"
+#include "../rdkafka_utils.h"
+#include "../../standard-processors/processors/ExtractText.h"
+#include "utils/file/FileUtils.h"
+#include "utils/OptionalUtils.h"
+#include "utils/RegexUtils.h"
+#include "utils/StringUtils.h"
+#include "utils/TestUtils.h"
+
+#include "utils/IntegrationTestUtils.h"
+
+namespace {
+using org::apache::nifi::minifi::utils::optional;
+
+class KafkaTestProducer {
+ public:
+  enum class PublishEvent {
+PUBLISH,
+TRANSACTION_START,
+TRANSACTION_COMMIT,
+CANCEL
+  };
+  KafkaTestProducer(const std::string& kafka_brokers, const std::string& 
topic, const bool transactional) :
+  logger_(logging::LoggerFactory<KafkaTestProducer>::getLogger()) {
+using utils::setKafkaConfigurationField;
+
+std::unique_ptr<rd_kafka_conf_t, utils::rd_kafka_conf_deleter> conf = { 
rd_kafka_conf_new(), utils::rd_kafka_conf_deleter() };
+
+setKafkaConfigurationField(conf.get(), "bootstrap.servers", kafka_brokers);
+setKafkaConfigurationField(conf.get(), "compression.codec", "snappy");
+setKafkaConfigurationField(conf.get(), "batch.num.messages", "1");
+
+if (transactional) {
+  setKafkaConfigurationField(conf.get(), "transactional.id", 
"ConsumeKafkaTest_transaction_id");
+}
+
+static std::array<char, 512U> errstr{};
+producer_ = { rd_kafka_new(RD_KAFKA_PRODUCER, conf.release(), 
errstr.data(), errstr.size()), utils::rd_kafka_producer_deleter() };
+if (producer_ == nullptr) {
+  auto error_msg = "Failed to create Kafka producer" + std::string{ 
errstr.data() };
+  throw std::runtime_error(error_msg);
+}
+
+// The last argument is a config here, but it is already owned by the 
producer. I assume that this would mean an override on the original config if 
used
+topic_ = { rd_kafka_topic_new(producer_.get(), topic.c_str(), nullptr), 
utils::rd_kafka_topic_deleter() };
+
+if (transactional) {
+  rd_kafka_init_transactions(producer_.get(), 
TRANSACTIONS_TIMEOUT_MS.count());
+}
+  }
+
+  // Uses all the headers for every published message
+  void publish_messages_to_topic(
+  const std::vector<std::string>& messages_on_topic, const std::string& 
message_key, std::vector<PublishEvent> events,
+  const std::vector<std::pair<std::string, std::string>>& message_headers, 
const optional<std::string>& message_header_encoding) {
+auto next_message = messages_on_topic.cbegin();
+for (const PublishEvent event : events) {
+  switch (event) {
+case PublishEvent::PUBLISH:
+  REQUIRE(messages_on_topic.cend() != next_message);
+  publish_message(*next_message, message_key, message_headers, 
message_header_encoding);
+  std::advance(next_message, 1);
+  break;
+case PublishEvent::TRANSACTION_START:
+  logger_->log_debug("Starting new transaction...");
+  rd_kafka_begin_transaction(producer_.get());
+  break;
+case PublishEvent::TRANSACTION_COMMIT:
+  logger_->log_debug("Committing transaction...");
+  rd_kafka_commit_transaction(producer_.get(), 
TRANSACTIONS_TIMEOUT_MS.count());
+  break;
+case PublishEvent::CANCEL:
+  logger_->log_debug("Cancelling transaction...");
+  rd_kafka_abort_transaction(producer_.get(), 
TRANSACTIONS_TIMEOUT_MS.count());
+  }
+}
+  }
+
+ private:
+  void publish_message(
+  const std::string& message, const std::string& message_key, const 
std::vector<std::pair<std::string, std::string>>& message_headers, const 
optional<std::string>& message_header_encoding) {
+logger_->log_debug("Producing: %s", message.c_str());
+std::unique_ptr<rd_kafka_headers_t, utils::rd_kafka_headers_deleter> 
headers(rd_kafka_headers_new(message_headers.size()), 
utils::rd_kafka_headers_deleter());
+if (!headers) {
+  throw std::runtime_error("Generating message headers failed.");
+}
+for (const std::pair<std::string, std::string>& message_header : 
message_headers) {
+  rd_kafka_header_add(headers.get(),
+  

[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-08 Thread GitBox


hunyadi-dev commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553912267



##
File path: extensions/librdkafka/docker_tests/CMakeLists.txt
##
@@ -0,0 +1,36 @@
+#

Review comment:
   I mean these will sit in docker, but right now they are just something to 
run manually on minifi with a working local broker. I will soon start working 
on dockerized tests; that PR will probably get rid of these.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-08 Thread GitBox


hunyadi-dev commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553911182



##
File path: extensions/librdkafka/ConsumeKafka.cpp
##
@@ -0,0 +1,582 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConsumeKafka.h"
+
+#include 
+#include 
+
+#include "core/PropertyValidation.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/gsl.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+// The upper limit for Max Poll Time is 4 seconds. This is because Watchdog 
would potentially start
+// reporting issues with the processor health otherwise
+class ConsumeKafkaMaxPollTimeValidator : public TimePeriodValidator {
+ public:
+  ConsumeKafkaMaxPollTimeValidator(const std::string& name) // NOLINT
+  : TimePeriodValidator(name) {
+  }
+  ~ConsumeKafkaMaxPollTimeValidator() override = default;
+
+  ValidationResult validate(const std::string& subject, const std::string& 
input) const override {
+uint64_t value;
+TimeUnit timeUnit;
+uint64_t value_as_ms;
+return 
ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid(
+core::TimePeriodValue::StringToTime(input, value, timeUnit) &&
+org::apache::nifi::minifi::core::Property::ConvertTimeUnitToMS(value, 
timeUnit, value_as_ms) &&
+0 < value_as_ms && value_as_ms <= 4000).build();
+  }
+};
+}  // namespace core
+namespace processors {
+
+constexpr const std::size_t ConsumeKafka::DEFAULT_MAX_POLL_RECORDS;
+constexpr char const* ConsumeKafka::DEFAULT_MAX_POLL_TIME;
+
+constexpr char const* ConsumeKafka::TOPIC_FORMAT_NAMES;
+constexpr char const* ConsumeKafka::TOPIC_FORMAT_PATTERNS;
+
+core::Property 
ConsumeKafka::KafkaBrokers(core::PropertyBuilder::createProperty("Kafka 
Brokers")
+  ->withDescription("A comma-separated list of known Kafka Brokers in the 
format :.")
+  ->withDefaultValue("localhost:9092", 
core::StandardValidators::get().NON_BLANK_VALIDATOR)
+  ->supportsExpressionLanguage(true)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::SecurityProtocol(core::PropertyBuilder::createProperty("Security 
Protocol")
+  ->withDescription("This property is currently not supported. Protocol used 
to communicate with brokers. Corresponds to Kafka's 'security.protocol' 
property.")
+  ->withAllowableValues<std::string>({SECURITY_PROTOCOL_PLAINTEXT/*, SECURITY_PROTOCOL_SSL, SECURITY_PROTOCOL_SASL_PLAINTEXT, SECURITY_PROTOCOL_SASL_SSL*/ })
+  ->withDefaultValue(SECURITY_PROTOCOL_PLAINTEXT)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::TopicNames(core::PropertyBuilder::createProperty("Topic Names")
+  ->withDescription("The name of the Kafka Topic(s) to pull from. Multiple 
topic names are supported as a comma separated list.")
+  ->supportsExpressionLanguage(true)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::TopicNameFormat(core::PropertyBuilder::createProperty("Topic Name 
Format")
+  ->withDescription("Specifies whether the Topic(s) provided are a comma 
separated list of names or a single regular expression.")
+  ->withAllowableValues<std::string>({TOPIC_FORMAT_NAMES, TOPIC_FORMAT_PATTERNS})
+  ->withDefaultValue(TOPIC_FORMAT_NAMES)
+  ->isRequired(true)
+  ->build());
+
+core::Property 
ConsumeKafka::HonorTransactions(core::PropertyBuilder::createProperty("Honor Transactions")
+  ->withDescription(
+      "Specifies whether or not MiNiFi should honor transactional guarantees when communicating with Kafka. If false, the Processor will use an \"isolation level\" of "
+      "read_uncomitted. This means that messages will be received as soon as they are written to Kafka but will be pulled, even if the producer cancels the transactions. "
+      "If this value is true, MiNiFi will not receive any messages for which the producer's transaction was canceled, but this can result in some latency since the consumer "
+      "must wait for the producer to finish its entire transaction instead of pulling as the messages become available.")
+  ->withDefaultValue<bool>(true)
+  ->isRequired(true)
+  ->build());
+
+core::Property 

[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-08 Thread GitBox


hunyadi-dev commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553906593



##
File path: extensions/librdkafka/rdkafka_utils.cpp
##
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+
+#include "rdkafka_utils.h"
+
+#include "Exception.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+void setKafkaConfigurationField(rd_kafka_conf_t* configuration, const std::string& field_name, const std::string& value) {
+  static std::array<char, 512> errstr{};
+  rd_kafka_conf_res_t result;
+  result = rd_kafka_conf_set(configuration, field_name.c_str(), value.c_str(), errstr.data(), errstr.size());
+  if (RD_KAFKA_CONF_OK != result) {
+    const std::string error_msg { errstr.begin(), errstr.end() };
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "rd_kafka configuration error" + error_msg);
+  }
+}
+
+void print_topics_list(std::shared_ptr<logging::Logger> logger, rd_kafka_topic_partition_list_t* kf_topic_partition_list) {
+  for (std::size_t i = 0; i < kf_topic_partition_list->cnt; ++i) {
+    logger->log_debug("kf_topic_partition_list: topic: %s, partition: %d, offset:%lld]",
+        kf_topic_partition_list->elems[i].topic, kf_topic_partition_list->elems[i].partition, kf_topic_partition_list->elems[i].offset);
+  }
+}
+
+void print_kafka_message(const rd_kafka_message_t* rkmessage, const std::shared_ptr<logging::Logger>& logger) {
+  if (RD_KAFKA_RESP_ERR_NO_ERROR != rkmessage->err) {
+    const std::string error_msg = "ConsumeKafka: received error message from broker. Librdkafka error msg: " + std::string(rd_kafka_err2str(rkmessage->err));
+    throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, error_msg);
+  }
+  std::string topicName = rd_kafka_topic_name(rkmessage->rkt);
+  std::string message(reinterpret_cast<char*>(rkmessage->payload), rkmessage->len);
+  const char* key = reinterpret_cast<const char*>(rkmessage->key);
+  const std::size_t key_len = rkmessage->key_len;
+  rd_kafka_timestamp_type_t tstype;
+  int64_t timestamp;
+  timestamp = rd_kafka_message_timestamp(rkmessage, &tstype);
+  const char *tsname = "?";
+  if (tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
+    if (tstype == RD_KAFKA_TIMESTAMP_CREATE_TIME) {
+      tsname = "create time";
+    } else if (tstype == RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME) {
+      tsname = "log append time";
+    }
+  }
+  const int64_t seconds_since_timestamp = timestamp ? static_cast<int64_t>(time(NULL)) - static_cast<int64_t>(timestamp / 1000) : 0;

Review comment:
   It does print the absolute one as well in this format:
   > [Timestamp](create time 1610107476940 (0 s ago)),
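
For reference, the "(0 s ago)" suffix comes from comparing the millisecond Kafka timestamp against the current wall clock; a minimal standalone sketch of the same computation (mirroring the quoted code, not part of the PR):

    #include <cstdint>
    #include <ctime>
    #include <string>

    // Kafka message timestamps are milliseconds since the epoch; the log line
    // shows the raw value plus its age in whole seconds.
    std::string age_suffix(int64_t timestamp_ms) {
      const int64_t seconds_ago = timestamp_ms
          ? static_cast<int64_t>(time(nullptr)) - timestamp_ms / 1000
          : 0;
      return "(" + std::to_string(seconds_ago) + " s ago)";
    }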





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-08 Thread GitBox


hunyadi-dev commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553906201



##
File path: extensions/librdkafka/ConsumeKafka.cpp
##
@@ -0,0 +1,569 @@

[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-08 Thread GitBox


hunyadi-dev commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553905938



##
File path: extensions/librdkafka/ConsumeKafka.cpp
##
@@ -0,0 +1,569 @@

[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-08 Thread GitBox


hunyadi-dev commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553901711



##
File path: extensions/librdkafka/ConsumeKafka.cpp
##
@@ -0,0 +1,569 @@

[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #895: MINIFICPP-1352 - Comment out unused parameters (for enabling -Wall)

2021-01-08 Thread GitBox


hunyadi-dev commented on a change in pull request #895:
URL: https://github.com/apache/nifi-minifi-cpp/pull/895#discussion_r553901250



##
File path: extensions/civetweb/processors/ListenHTTP.cpp
##
@@ -211,7 +211,7 @@ void ListenHTTP::onSchedule(core::ProcessContext *context, 
core::ProcessSessionF
 
 ListenHTTP::~ListenHTTP() = default;
 
-void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ListenHTTP::onTrigger(core::ProcessContext* /*context*/, core::ProcessSession *session) {

Review comment:
   I don't have a quick-fix for that. People probably need to edit those 
cases one-by-one when they encounter them.
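
For context, the pattern being applied across the codebase is the standard way to keep a parameter in the signature (for interface compatibility) while silencing -Wunused-parameter under -Wall; a small illustrative sketch (placeholder types, not the project's):

    struct Context {};
    struct Session {};

    // The interface requires both parameters, but this override only needs
    // 'session'; commenting out the name documents the intent and avoids the warning.
    void onTrigger(Context* /*context*/, Session* session) {
      static_cast<void>(session);  // stand-in for real work
    }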





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-08 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553897459



##
File path: extensions/librdkafka/rdkafka_utils.cpp
##
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+
+#include "rdkafka_utils.h"
+
+#include "Exception.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+void setKafkaConfigurationField(rd_kafka_conf_t* configuration, const std::string& field_name, const std::string& value) {
+  static std::array<char, 512> errstr{};
+  rd_kafka_conf_res_t result;
+  result = rd_kafka_conf_set(configuration, field_name.c_str(), value.c_str(), errstr.data(), errstr.size());
+  if (RD_KAFKA_CONF_OK != result) {
+    const std::string error_msg { errstr.begin(), errstr.end() };
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "rd_kafka configuration error" + error_msg);
+  }
+}
+
+void print_topics_list(std::shared_ptr<logging::Logger> logger, rd_kafka_topic_partition_list_t* kf_topic_partition_list) {
+  for (std::size_t i = 0; i < kf_topic_partition_list->cnt; ++i) {
+    logger->log_debug("kf_topic_partition_list: topic: %s, partition: %d, offset:%lld]",
+        kf_topic_partition_list->elems[i].topic, kf_topic_partition_list->elems[i].partition, kf_topic_partition_list->elems[i].offset);
+  }
+}
+
+void print_kafka_message(const rd_kafka_message_t* rkmessage, const std::shared_ptr<logging::Logger>& logger) {

Review comment:
   Taking a shared pointer instead of an observer pointer/reference makes it impossible to use the function with anything other than a shared pointer, such as a unique_ptr, a stack object, a manually allocated object, or a member of another object.
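
A minimal sketch of the point (illustrative only, not from the PR): a function taking a plain reference works with every ownership scheme, while one taking a shared_ptr only accepts shared ownership.

    #include <memory>

    struct Logger { void log() {} };

    void log_with_ref(Logger& logger) { logger.log(); }
    void log_with_shared(const std::shared_ptr<Logger>& logger) { logger->log(); }

    int main() {
      Logger stack_logger;
      auto unique_logger = std::make_unique<Logger>();
      auto shared_logger = std::make_shared<Logger>();

      log_with_ref(stack_logger);      // OK: stack object
      log_with_ref(*unique_logger);    // OK: unique_ptr
      log_with_ref(*shared_logger);    // OK: shared_ptr too
      log_with_shared(shared_logger);  // OK
      // log_with_shared(stack_logger);   // does not compile
      // log_with_shared(unique_logger);  // does not compile
    }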





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-08 Thread GitBox


hunyadi-dev commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553895379



##
File path: extensions/librdkafka/rdkafka_utils.cpp
##
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+
+#include "rdkafka_utils.h"
+
+#include "Exception.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+void setKafkaConfigurationField(rd_kafka_conf_t* configuration, const std::string& field_name, const std::string& value) {
+  static std::array<char, 512> errstr{};
+  rd_kafka_conf_res_t result;
+  result = rd_kafka_conf_set(configuration, field_name.c_str(), value.c_str(), errstr.data(), errstr.size());
+  if (RD_KAFKA_CONF_OK != result) {
+    const std::string error_msg { errstr.begin(), errstr.end() };
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "rd_kafka configuration error" + error_msg);
+  }
+}
+
+void print_topics_list(std::shared_ptr<logging::Logger> logger, rd_kafka_topic_partition_list_t* kf_topic_partition_list) {
+  for (std::size_t i = 0; i < kf_topic_partition_list->cnt; ++i) {
+    logger->log_debug("kf_topic_partition_list: topic: %s, partition: %d, offset:%lld]",
+        kf_topic_partition_list->elems[i].topic, kf_topic_partition_list->elems[i].partition, kf_topic_partition_list->elems[i].offset);
+  }
+}
+
+void print_kafka_message(const rd_kafka_message_t* rkmessage, const std::shared_ptr<logging::Logger>& logger) {

Review comment:
   Can a const ref on a shared pointer even manipulate lifetime or ownership? I don't mind changing it, so I updated them as requested.
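
To answer the question with a sketch (illustrative only): a const reference to a shared_ptr cannot reseat the caller's pointer, but the callee can still copy it, which does extend the object's lifetime.

    #include <memory>

    std::shared_ptr<int> keep;

    void observe(const std::shared_ptr<int>& p) {
      keep = p;  // copying through a const ref is allowed: use_count grows
    }

    int main() {
      auto value = std::make_shared<int>(42);
      observe(value);
      value.reset();   // the int stays alive because 'keep' still owns it
      return *keep;    // 42
    }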





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-08 Thread GitBox


hunyadi-dev commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553893682



##
File path: extensions/librdkafka/ConsumeKafka.cpp
##
@@ -0,0 +1,569 @@


[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-08 Thread GitBox


hunyadi-dev commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553891631



##
File path: extensions/librdkafka/rdkafka_utils.cpp
##
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+
+#include "rdkafka_utils.h"
+
+#include "Exception.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+void setKafkaConfigurationField(rd_kafka_conf_t* configuration, const std::string& field_name, const std::string& value) {
+  static std::array<char, 512> errstr{};
+  rd_kafka_conf_res_t result;
+  result = rd_kafka_conf_set(configuration, field_name.c_str(), value.c_str(), errstr.data(), errstr.size());
+  if (RD_KAFKA_CONF_OK != result) {
+    const std::string error_msg { errstr.begin(), errstr.end() };
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "rd_kafka configuration error" + error_msg);
+  }
+}
+
+void print_topics_list(std::shared_ptr<logging::Logger> logger, rd_kafka_topic_partition_list_t* kf_topic_partition_list) {
+  for (std::size_t i = 0; i < kf_topic_partition_list->cnt; ++i) {
+    logger->log_debug("kf_topic_partition_list: topic: %s, partition: %d, offset:%lld]",
+        kf_topic_partition_list->elems[i].topic, kf_topic_partition_list->elems[i].partition, kf_topic_partition_list->elems[i].offset);
+  }
+}
+
+void print_kafka_message(const rd_kafka_message_t* rkmessage, const std::shared_ptr<logging::Logger>& logger) {
+  if (RD_KAFKA_RESP_ERR_NO_ERROR != rkmessage->err) {
+    const std::string error_msg = "ConsumeKafka: received error message from broker. Librdkafka error msg: " + std::string(rd_kafka_err2str(rkmessage->err));
+    throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, error_msg);
+  }
+  std::string topicName = rd_kafka_topic_name(rkmessage->rkt);
+  std::string message(reinterpret_cast<char*>(rkmessage->payload), rkmessage->len);
+  const char* key = reinterpret_cast<const char*>(rkmessage->key);
+  const std::size_t key_len = rkmessage->key_len;
+  rd_kafka_timestamp_type_t tstype;
+  int64_t timestamp;
+  timestamp = rd_kafka_message_timestamp(rkmessage, &tstype);
+  const char *tsname = "?";
+  if (tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
+    if (tstype == RD_KAFKA_TIMESTAMP_CREATE_TIME) {
+      tsname = "create time";
+    } else if (tstype == RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME) {
+      tsname = "log append time";
+    }
+  }
+  const int64_t seconds_since_timestamp = timestamp ? static_cast<int64_t>(time(NULL)) - static_cast<int64_t>(timestamp / 1000) : 0;
+
+  std::string headers_as_string;
+  rd_kafka_headers_t* hdrs;
+  const rd_kafka_resp_err_t get_header_response = rd_kafka_message_headers(rkmessage, &hdrs);
+  if (RD_KAFKA_RESP_ERR_NO_ERROR == get_header_response) {
+    std::vector<std::string> header_list;
+    kafka_headers_for_each(hdrs, [&] (const std::string& key, const std::string& val) { header_list.emplace_back(key + ": " + val); });
+    headers_as_string = StringUtils::join(", ", header_list);
+  } else if (RD_KAFKA_RESP_ERR__NOENT != get_header_response) {
+    logger->log_error("Failed to fetch message headers: %d: %s", rd_kafka_last_error(), rd_kafka_err2str(rd_kafka_last_error()));
+  }
+
+  std::string message_as_string;
+  message_as_string += "[Topic](" + topicName + "), ";
+  message_as_string += "[Key](" + (key != nullptr ? std::string(key, key_len) : std::string("[None]")) + "), ";
+  message_as_string += "[Offset](" +  std::to_string(rkmessage->offset) + "), ";
+  message_as_string += "[Message Length](" + std::to_string(rkmessage->len) + "), ";
+  message_as_string += "[Timestamp](" + std::string(tsname) + " " + std::to_string(timestamp) + " (" + std::to_string(seconds_since_timestamp) + " s ago)), ";
+  message_as_string += "[Headers](";
+  message_as_string += headers_as_string + "\n";
+  message_as_string += "[Payload](" + message + ")";
+
+  logger -> log_debug("Message: %s", message_as_string.c_str());
+}
+
+std::string get_encoded_string(const std::string& input, KafkaEncoding encoding) {
+  switch (encoding) {
+    case KafkaEncoding::UTF8:
+      return input;
+    case KafkaEncoding::HEX:
+      return StringUtils::to_hex(input, /* uppercase = */ true);
+  }
+  throw 

[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-08 Thread GitBox


hunyadi-dev commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553890971



##
File path: extensions/librdkafka/rdkafka_utils.cpp
##
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+
+#include "rdkafka_utils.h"
+
+#include "Exception.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+void setKafkaConfigurationField(rd_kafka_conf_t* configuration, const std::string& field_name, const std::string& value) {
+  static std::array<char, 512> errstr{};
+  rd_kafka_conf_res_t result;
+  result = rd_kafka_conf_set(configuration, field_name.c_str(), value.c_str(), errstr.data(), errstr.size());
+  if (RD_KAFKA_CONF_OK != result) {
+    const std::string error_msg { errstr.begin(), errstr.end() };
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "rd_kafka configuration error" + error_msg);

Review comment:
   Added missing " :".





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-08 Thread GitBox


hunyadi-dev commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553890911



##
File path: extensions/librdkafka/rdkafka_utils.cpp
##
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+
+#include "rdkafka_utils.h"
+
+#include "Exception.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+void setKafkaConfigurationField(rd_kafka_conf_t* configuration, const std::string& field_name, const std::string& value) {
+  static std::array<char, 512> errstr{};
+  rd_kafka_conf_res_t result;
+  result = rd_kafka_conf_set(configuration, field_name.c_str(), value.c_str(), errstr.data(), errstr.size());
+  if (RD_KAFKA_CONF_OK != result) {
+    const std::string error_msg { errstr.begin(), errstr.end() };
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "rd_kafka configuration error" + error_msg);
+  }
+}
+
+void print_topics_list(std::shared_ptr<logging::Logger> logger, rd_kafka_topic_partition_list_t* kf_topic_partition_list) {
+  for (std::size_t i = 0; i < kf_topic_partition_list->cnt; ++i) {
+    logger->log_debug("kf_topic_partition_list: topic: %s, partition: %d, offset:%lld]",
+        kf_topic_partition_list->elems[i].topic, kf_topic_partition_list->elems[i].partition, kf_topic_partition_list->elems[i].offset);
+  }
+}
+
+void print_kafka_message(const rd_kafka_message_t* rkmessage, const std::shared_ptr<logging::Logger>& logger) {
+  if (RD_KAFKA_RESP_ERR_NO_ERROR != rkmessage->err) {
+    const std::string error_msg = "ConsumeKafka: received error message from broker. Librdkafka error msg: " + std::string(rd_kafka_err2str(rkmessage->err));
+    throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, error_msg);
+  }
+  std::string topicName = rd_kafka_topic_name(rkmessage->rkt);
+  std::string message(reinterpret_cast<char*>(rkmessage->payload), rkmessage->len);
+  const char* key = reinterpret_cast<const char*>(rkmessage->key);
+  const std::size_t key_len = rkmessage->key_len;
+  rd_kafka_timestamp_type_t tstype;
+  int64_t timestamp;
+  timestamp = rd_kafka_message_timestamp(rkmessage, &tstype);
+  const char *tsname = "?";
+  if (tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
+    if (tstype == RD_KAFKA_TIMESTAMP_CREATE_TIME) {
+      tsname = "create time";
+    } else if (tstype == RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME) {
+      tsname = "log append time";
+    }
+  }
+  const int64_t seconds_since_timestamp = timestamp ? static_cast<int64_t>(time(NULL)) - static_cast<int64_t>(timestamp / 1000) : 0;
+
+  std::string headers_as_string;
+  rd_kafka_headers_t* hdrs;
+  const rd_kafka_resp_err_t get_header_response = rd_kafka_message_headers(rkmessage, &hdrs);
+  if (RD_KAFKA_RESP_ERR_NO_ERROR == get_header_response) {
+    std::vector<std::string> header_list;
+    kafka_headers_for_each(hdrs, [&] (const std::string& key, const std::string& val) { header_list.emplace_back(key + ": " + val); });
+    headers_as_string = StringUtils::join(", ", header_list);
+  } else if (RD_KAFKA_RESP_ERR__NOENT != get_header_response) {
+    logger->log_error("Failed to fetch message headers: %d: %s", rd_kafka_last_error(), rd_kafka_err2str(rd_kafka_last_error()));
+  }
+
+  std::string message_as_string;
+  message_as_string += "[Topic](" + topicName + "), ";
+  message_as_string += "[Key](" + (key != nullptr ? std::string(key, key_len) : std::string("[None]")) + "), ";
+  message_as_string += "[Offset](" +  std::to_string(rkmessage->offset) + "), ";
+  message_as_string += "[Message Length](" + std::to_string(rkmessage->len) + "), ";
+  message_as_string += "[Timestamp](" + std::string(tsname) + " " + std::to_string(timestamp) + " (" + std::to_string(seconds_since_timestamp) + " s ago)), ";
+  message_as_string += "[Headers](";
+  message_as_string += headers_as_string + "\n";
+  message_as_string += "[Payload](" + message + ")";
+
+  logger -> log_debug("Message: %s", message_as_string.c_str());

Review comment:
   Seems like this was an outdated commit, fixed since then.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL 

[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-08 Thread GitBox


hunyadi-dev commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553890229



##
File path: extensions/librdkafka/rdkafka_utils.h
##
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/OptionalUtils.h"
+#include "rdkafka.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+enum class KafkaEncoding {
+  UTF8,
+  HEX
+};
+
+struct rd_kafka_conf_deleter {
+  void operator()(rd_kafka_conf_t* ptr) const noexcept { 
rd_kafka_conf_destroy(ptr); }
+};
+
+struct rd_kafka_producer_deleter {
+  void operator()(rd_kafka_t* ptr) const noexcept {
+rd_kafka_resp_err_t flush_ret = rd_kafka_flush(ptr, 1 /* ms */);  // 
Matching the wait time of KafkaConnection.cpp
+// If concerned, we could log potential errors here:
+// if (RD_KAFKA_RESP_ERR__TIMED_OUT == flush_ret) {
+//   std::cerr << "Deleting producer failed: time-out while trying to 
flush" << std::endl;
+// }
+rd_kafka_destroy(ptr);
+  }
+};
+
+struct rd_kafka_consumer_deleter {
+  void operator()(rd_kafka_t* ptr) const noexcept {
+rd_kafka_consumer_close(ptr);
+rd_kafka_destroy(ptr);
+  }
+};
+
+struct rd_kafka_topic_partition_list_deleter {
+  void operator()(rd_kafka_topic_partition_list_t* ptr) const noexcept { 
rd_kafka_topic_partition_list_destroy(ptr); }
+};
+
+struct rd_kafka_topic_conf_deleter {
+  void operator()(rd_kafka_topic_conf_t* ptr) const noexcept { 
rd_kafka_topic_conf_destroy(ptr); }
+};
+struct rd_kafka_topic_deleter {
+  void operator()(rd_kafka_topic_t* ptr) const noexcept { 
rd_kafka_topic_destroy(ptr); }
+};
+
+struct rd_kafka_message_deleter {
+  void operator()(rd_kafka_message_t* ptr) const noexcept { 
rd_kafka_message_destroy(ptr); }
+};
+
+struct rd_kafka_headers_deleter {
+  void operator()(rd_kafka_headers_t* ptr) const noexcept { 
rd_kafka_headers_destroy(ptr); }
+};
+
+template <typename T>
+void kafka_headers_for_each(const rd_kafka_headers_t* headers, T&& key_value_handle) {
+  const char *key;  // Null terminated, not to be freed
+  const void *value;
+  std::size_t size;
+  for (std::size_t i = 0; RD_KAFKA_RESP_ERR_NO_ERROR == rd_kafka_header_get_all(headers, i, &key, &value, &size); ++i) {
+    std::forward<T>(key_value_handle)(std::string(key), std::string(static_cast<const char*>(value), size));

Review comment:
   Good call! Updated.
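
For background on why forwarding here deserved attention (a sketch under my assumptions, not the PR's final code): std::forward can collapse to an rvalue cast, so forwarding a callable inside a loop risks moving from it on the first iteration. Calling through the lvalue name each time, and forwarding only when invoking exactly once, is the safe pattern.

    #include <cstddef>
    #include <utility>

    template <typename T>
    void call_n_times(std::size_t n, T&& handle) {
      for (std::size_t i = 0; i < n; ++i) {
        handle(i);  // safe: the named parameter is an lvalue on every iteration
      }
      // By contrast, std::forward<T>(handle)(0) inside the loop could move
      // from 'handle' when T deduces to a non-reference (rvalue argument).
    }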





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] hunyadi-dev closed pull request #911: MINIFICPP-1373 - Add proper FlowFile::getAttribute getter [QoL][mini-PR]

2021-01-08 Thread GitBox


hunyadi-dev closed pull request #911:
URL: https://github.com/apache/nifi-minifi-cpp/pull/911


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-08 Thread GitBox


hunyadi-dev commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553882120



##
File path: libminifi/test/TestBase.cpp
##
@@ -62,78 +63,51 @@ TestPlan::~TestPlan() {
   for (auto& processor : configured_processors_) {
 processor->setScheduledState(core::ScheduledState::STOPPED);
   }
+  for (auto& connection : relationships_) {
+    // This is a patch solving circular references between processors and connections
+    connection->setSource(nullptr);
+    connection->setDestination(nullptr);
+  }
   controller_services_provider_->clearControllerServices();
 }
 
 std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, const std::initializer_list<core::Relationship>& relationships,
-bool linkToPrevious) {
+bool linkToPrevious) {
   if (finalized) {
 return nullptr;
   }
   std::lock_guard<std::recursive_mutex> guard(mutex);
-
   utils::Identifier uuid = utils::IdGenerator::getIdGenerator()->generate();
-
   processor->setStreamFactory(stream_factory);
   // initialize the processor
   processor->initialize();
   processor->setFlowIdentifier(flow_version_->getFlowIdentifier());
-
   processor_mapping_[processor->getUUID()] = processor;
-
   if (!linkToPrevious) {
 termination_ = *(relationships.begin());
   } else {
     std::shared_ptr<core::Processor> last = processor_queue_.back();
-
 if (last == nullptr) {
   last = processor;
   termination_ = *(relationships.begin());
 }
-
-    std::stringstream connection_name;
-    connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr();
-    logger_->log_info("Creating %s connection for proc %d", connection_name.str(), processor_queue_.size() + 1);
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str());
-
 for (const auto& relationship : relationships) {
-  connection->addRelationship(relationship);
-}
-
-// link the connections so that we can test results at the end for this
-connection->setSource(last);
-connection->setDestination(processor);
-
-connection->setSourceUUID(last->getUUID());
-connection->setDestinationUUID(processor->getUUID());
-last->addConnection(connection);
-if (last != processor) {
-  processor->addConnection(connection);
+  addConnection(last, relationship, processor);

Review comment:
   I am not sure I understand the question, but it is a prerequisite if you want to connect up multiple relationships separately.

##
File path: libminifi/test/TestBase.cpp
##
@@ -62,78 +63,51 @@ TestPlan::~TestPlan() {
   for (auto& processor : configured_processors_) {
 processor->setScheduledState(core::ScheduledState::STOPPED);
   }
+  for (auto& connection : relationships_) {
+    // This is a patch solving circular references between processors and connections
+    connection->setSource(nullptr);
+    connection->setDestination(nullptr);
+  }
   controller_services_provider_->clearControllerServices();
 }
 
 std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, const std::initializer_list<core::Relationship>& relationships,
-bool linkToPrevious) {
+bool linkToPrevious) {
   if (finalized) {
 return nullptr;
   }
   std::lock_guard<std::recursive_mutex> guard(mutex);
-
   utils::Identifier uuid = utils::IdGenerator::getIdGenerator()->generate();
-
   processor->setStreamFactory(stream_factory);
   // initialize the processor
   processor->initialize();
   processor->setFlowIdentifier(flow_version_->getFlowIdentifier());
-
   processor_mapping_[processor->getUUID()] = processor;
-
   if (!linkToPrevious) {
 termination_ = *(relationships.begin());
   } else {
     std::shared_ptr<core::Processor> last = processor_queue_.back();
-
 if (last == nullptr) {
   last = processor;
   termination_ = *(relationships.begin());
 }
-
-    std::stringstream connection_name;
-    connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr();
-    logger_->log_info("Creating %s connection for proc %d", connection_name.str(), processor_queue_.size() + 1);
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str());
-
 for (const auto& relationship : relationships) {
-  connection->addRelationship(relationship);
-}
-
-// link the connections so that we can test results at the end for this
-connection->setSource(last);
-connection->setDestination(processor);
-
-connection->setSourceUUID(last->getUUID());
-connection->setDestinationUUID(processor->getUUID());
-last->addConnection(connection);
-if (last != processor) {
-  processor->addConnection(connection);
+  addConnection(last, relationship, processor);

Review comment:
   I am not sure I understand the question, but is it not a prerequisite if you want to connect up multiple relationships separately?
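
   For context on the destructor patch quoted above, here is a minimal sketch of the shared_ptr cycle it breaks (simplified stand-in types, not the real minifi classes): a Connection keeps its source and destination Processors alive while each Processor keeps its Connections alive, so neither refcount can reach zero until one side of the cycle is reset.

       #include <memory>
       #include <vector>

       struct Connection;
       struct Processor {
         std::vector<std::shared_ptr<Connection>> connections;  // Processor -> Connection
       };
       struct Connection {
         std::shared_ptr<Processor> source;       // Connection -> Processor (the cycle)
         std::shared_ptr<Processor> destination;
       };

       void teardown(std::vector<std::shared_ptr<Connection>>& connections) {
         for (auto& connection : connections) {
           connection->source.reset();       // break the cycle, as the TestPlan destructor
           connection->destination.reset();  // does with setSource(nullptr)/setDestination(nullptr)
         }
       }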

[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-08 Thread GitBox


hunyadi-dev commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553877982



##
File path: libminifi/include/utils/ProcessorConfigUtils.h
##
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+std::string getRequiredPropertyOrThrow(const core::ProcessContext* context, const std::string& property_name) {
+  std::string value;
+  if (!context->getProperty(property_name, value)) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, property_name + " property missing or invalid");
+  }
+  return value;
+}
+
+std::vector<std::string> listFromCommaSeparatedProperty(const core::ProcessContext* context, const std::string& property_name) {
+  std::string property_string;
+  context->getProperty(property_name, property_string);
+  return utils::StringUtils::splitAndTrim(property_string, ",");
+}
+
+std::vector<std::string> listFromRequiredCommaSeparatedProperty(const core::ProcessContext* context, const std::string& property_name) {
+  return utils::StringUtils::splitAndTrim(getRequiredPropertyOrThrow(context, property_name), ",");
+}
+
+bool parseBooleanPropertyOrThrow(core::ProcessContext* context, const std::string& property_name) {
+  bool value;
+  std::string value_str = getRequiredPropertyOrThrow(context, property_name);
+  utils::optional<bool> maybe_value = utils::StringUtils::toBool(value_str);
+  if (!maybe_value) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, property_name + " property is invalid: value is " + value_str);

Review comment:
   Updated to `runtime_error`.
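
   A brief usage sketch of the helpers quoted above (the configure() wrapper and property names are illustrative, borrowed from ConsumeKafka, not part of the PR):

       // onSchedule-style usage, with minifi types assumed from the quoted header:
       void configure(core::ProcessContext* context) {
         const std::string brokers = utils::getRequiredPropertyOrThrow(context, "Kafka Brokers");
         const std::vector<std::string> topics = utils::listFromRequiredCommaSeparatedProperty(context, "Topic Names");
         const bool honor_transactions = utils::parseBooleanPropertyOrThrow(context, "Honor Transactions");
         // ... hand the parsed values to the Kafka client configuration ...
       }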





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-08 Thread GitBox


hunyadi-dev commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553877279



##
File path: libminifi/include/core/TypedValues.h
##
@@ -106,6 +106,8 @@ class TimePeriodValue : public TransformableValue, public state::response::UInt64Value
   static bool StringToTime(std::string input, uint64_t &output, TimeUnit &timeunit) {
     return utils::internal::StringToTime(input, output, timeunit);
   }
+
+  TimePeriodValue& operator=(const TimePeriodValue& other) = default;

Review comment:
   I don't remember anymore :) Removed.
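
   The removal is sound for a second reason worth noting: a user-declared copy assignment operator, even a defaulted one, suppresses the implicitly generated move constructor and move assignment, so the class silently falls back to copying. A minimal illustration:

       #include <string>

       struct WithCopyAssign {
         std::string s;
         WithCopyAssign& operator=(const WithCopyAssign&) = default;  // user-declared
         // No implicit move operations are generated for this class;
         // "moving" a WithCopyAssign copies s instead of moving it.
       };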





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-08 Thread GitBox


hunyadi-dev commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553871478



##
File path: extensions/librdkafka/ConsumeKafka.cpp
##
@@ -0,0 +1,553 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConsumeKafka.h"
+
+#include <algorithm>
+#include <limits>
+
+#include "core/PropertyValidation.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/gsl.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+// The upper limit for Max Poll Time is 4 seconds. This is because Watchdog would potentially start
+// reporting issues with the processor health otherwise
+class ConsumeKafkaMaxPollTimeValidator : public TimePeriodValidator {
+ public:
+  ConsumeKafkaMaxPollTimeValidator(const std::string &name) // NOLINT
+      : TimePeriodValidator(name) {
+  }
+  ~ConsumeKafkaMaxPollTimeValidator() override = default;
+
+  ValidationResult validate(const std::string& subject, const std::string& input) const override {
+    uint64_t value;
+    TimeUnit timeUnit;
+    uint64_t value_as_ms;
+    return ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid(
+        core::TimePeriodValue::StringToTime(input, value, timeUnit) &&
+        org::apache::nifi::minifi::core::Property::ConvertTimeUnitToMS(value, timeUnit, value_as_ms) &&
+        0 < value_as_ms && value_as_ms <= 4000).build();
+  }
+};
+}  // namespace core
+namespace processors {
+
+constexpr const std::size_t ConsumeKafka::DEFAULT_MAX_POLL_RECORDS;
+constexpr char const* ConsumeKafka::DEFAULT_MAX_POLL_TIME;
+
+constexpr char const* ConsumeKafka::TOPIC_FORMAT_NAMES;
+constexpr char const* ConsumeKafka::TOPIC_FORMAT_PATTERNS;
+
+core::Property ConsumeKafka::KafkaBrokers(core::PropertyBuilder::createProperty("Kafka Brokers")
+  ->withDescription("A comma-separated list of known Kafka Brokers in the format <host>:<port>.")
+  ->withDefaultValue("localhost:9092", core::StandardValidators::get().NON_BLANK_VALIDATOR)
+  ->supportsExpressionLanguage(true)
+  ->isRequired(true)
+  ->build());
+
+core::Property ConsumeKafka::SecurityProtocol(core::PropertyBuilder::createProperty("Security Protocol")
+  ->withDescription("This property is currently not supported. Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.")
+  ->withAllowableValues<std::string>({SECURITY_PROTOCOL_PLAINTEXT/*, SECURITY_PROTOCOL_SSL, SECURITY_PROTOCOL_SASL_PLAINTEXT, SECURITY_PROTOCOL_SASL_SSL*/ })
+  ->withDefaultValue(SECURITY_PROTOCOL_PLAINTEXT)
+  ->isRequired(true)
+  ->build());
+
+core::Property ConsumeKafka::TopicNames(core::PropertyBuilder::createProperty("Topic Names")
+  ->withDescription("The name of the Kafka Topic(s) to pull from. Multiple topic names are supported as a comma separated list.")
+  ->supportsExpressionLanguage(true)
+  ->build());
+
+core::Property ConsumeKafka::TopicNameFormat(core::PropertyBuilder::createProperty("Topic Name Format")
+  ->withDescription("Specifies whether the Topic(s) provided are a comma separated list of names or a single regular expression.")
+  ->withAllowableValues<std::string>({TOPIC_FORMAT_NAMES, TOPIC_FORMAT_PATTERNS})
+  ->withDefaultValue(TOPIC_FORMAT_NAMES)
+  ->build());
+
+core::Property ConsumeKafka::HonorTransactions(core::PropertyBuilder::createProperty("Honor Transactions")
+  ->withDescription(
+      "Specifies whether or not MiNiFi should honor transactional guarantees when communicating with Kafka. If false, the Processor will use an \"isolation level\" of "
+      "read_uncommitted. This means that messages will be received as soon as they are written to Kafka but will be pulled, even if the producer cancels the transactions. "
+      "If this value is true, MiNiFi will not receive any messages for which the producer's transaction was canceled, but this can result in some latency since the consumer "
+      "must wait for the producer to finish its entire transaction instead of pulling as the messages become available.")
+  ->withDefaultValue<bool>(true)
+  ->isRequired(true)
+  ->build());
+
+core::Property ConsumeKafka::GroupID(core::PropertyBuilder::createProperty("Group ID")
+  
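
For a concrete sense of the ConsumeKafkaMaxPollTimeValidator defined at the top of this hunk, a few hypothetical inputs and how its 0 < ms <= 4000 check treats them:

    // "500 millis" -> 500 ms  -> valid
    // "4 sec"      -> 4000 ms -> valid (exactly at the cap)
    // "5 sec"      -> 5000 ms -> invalid (Watchdog could flag a poll that long)
    // "0 sec"      -> 0 ms    -> invalid (must be positive)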

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-08 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553850276



##
File path: libminifi/include/utils/ProcessorConfigUtils.h
##
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+std::string getRequiredPropertyOrThrow(const core::ProcessContext* context, const std::string& property_name) {
+  std::string value;
+  if (!context->getProperty(property_name, value)) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, property_name + " property missing or invalid");
+  }
+  return value;
+}
+
+std::vector<std::string> listFromCommaSeparatedProperty(const core::ProcessContext* context, const std::string& property_name) {
+  std::string property_string;
+  context->getProperty(property_name, property_string);
+  return utils::StringUtils::splitAndTrim(property_string, ",");
+}
+
+std::vector<std::string> listFromRequiredCommaSeparatedProperty(const core::ProcessContext* context, const std::string& property_name) {
+  return utils::StringUtils::splitAndTrim(getRequiredPropertyOrThrow(context, property_name), ",");
+}
+
+bool parseBooleanPropertyOrThrow(core::ProcessContext* context, const std::string& property_name) {
+  bool value;
+  std::string value_str = getRequiredPropertyOrThrow(context, property_name);
+  utils::optional<bool> maybe_value = utils::StringUtils::toBool(value_str);
+  if (!maybe_value) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, property_name + " property is invalid: value is " + value_str);

Review comment:
   I'm fine with `std::runtime_error` or `std::invalid_argument` as well, 
but I don't want miscategorized exceptions.
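
   A minimal sketch of the categorization concern (the helper name is hypothetical): reserve minifi's Exception(PROCESS_SCHEDULE_EXCEPTION, ...) for failures that should abort scheduling, and throw a standard exception for plain parse errors.

       #include <stdexcept>
       #include <string>

       bool parse_bool_or_throw(const std::string& value_str) {
         if (value_str == "true") return true;
         if (value_str == "false") return false;
         throw std::invalid_argument("not a boolean: " + value_str);  // a parse error, not a scheduling error
       }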





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (NIFI-1193) Add Hive support to Kite storage processor

2021-01-08 Thread fossilfuelsbeer (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-1193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17261121#comment-17261121
 ] 

fossilfuelsbeer commented on NIFI-1193:
---

[https://fossilfuelsbeer.com/|https://fossilfuelsbeer.com/]

> Add Hive support to Kite storage processor
> --
>
> Key: NIFI-1193
> URL: https://issues.apache.org/jira/browse/NIFI-1193
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Affects Versions: 0.3.0
>Reporter: Ryan Blue
>Assignee: Ryan Blue
>Priority: Major
> Fix For: 0.5.0
>
>
> When the Kite processors were initially added in NIFI-238, we removed support 
> for sending data directly to Hive tables because the dependencies were too 
> large. Contacting the Hive MetaStore pulled in all of hive-exec and 
> hive-metastore. I've created an alternative that increases the size by only 
> 6.7MB (about 10% of what it was before).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi] lfrancke closed pull request #1941: NIFI-4099

2021-01-08 Thread GitBox


lfrancke closed pull request #1941:
URL: https://github.com/apache/nifi/pull/1941


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-08 Thread GitBox


hunyadi-dev commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553806575



##
File path: libminifi/include/utils/ProcessorConfigUtils.h
##
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+std::string getRequiredPropertyOrThrow(const core::ProcessContext* context, const std::string& property_name) {
+  std::string value;
+  if (!context->getProperty(property_name, value)) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, property_name + " property missing or invalid");
+  }
+  return value;
+}
+
+std::vector<std::string> listFromCommaSeparatedProperty(const core::ProcessContext* context, const std::string& property_name) {
+  std::string property_string;
+  context->getProperty(property_name, property_string);
+  return utils::StringUtils::splitAndTrim(property_string, ",");
+}
+
+std::vector<std::string> listFromRequiredCommaSeparatedProperty(const core::ProcessContext* context, const std::string& property_name) {
+  return utils::StringUtils::splitAndTrim(getRequiredPropertyOrThrow(context, property_name), ",");
+}
+
+bool parseBooleanPropertyOrThrow(core::ProcessContext* context, const std::string& property_name) {
+  bool value;
+  std::string value_str = getRequiredPropertyOrThrow(context, property_name);
+  utils::optional<bool> maybe_value = utils::StringUtils::toBool(value_str);
+  if (!maybe_value) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, property_name + " property is invalid: value is " + value_str);
+  }
+  return maybe_value.value();
+}
+
+std::chrono::milliseconds parseTimePropertyMSOrThrow(core::ProcessContext* context, const std::string& property_name) {
+  core::TimeUnit unit;
+  uint64_t time_value_ms;
+  if (!core::Property::StringToTime(getRequiredPropertyOrThrow(context, property_name), time_value_ms, unit) || !core::Property::ConvertTimeUnitToMS(time_value_ms, unit, time_value_ms)) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, property_name + " property missing or invalid");
+  }
+  return std::chrono::milliseconds(time_value_ms);
+}
+
+utils::optional<uint64_t> getOptionalUintProperty(core::ProcessContext* context, const std::string& property_name) {

Review comment:
   Good comment! Updated.
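
   A usage sketch for the optional-returning getter above (assuming the utils::optional shim offers value_or(), as std::optional does): absence of the property maps cleanly onto a default instead of an exception.

       const auto max_poll_records =
           utils::getOptionalUintProperty(context, "Max Poll Records")
               .value_or(ConsumeKafka::DEFAULT_MAX_POLL_RECORDS);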





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] lfrancke closed pull request #2985: NIFI-5573 Allow overriding of nifi-env.sh

2021-01-08 Thread GitBox


lfrancke closed pull request #2985:
URL: https://github.com/apache/nifi/pull/2985


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-08 Thread GitBox


hunyadi-dev commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r553796377



##
File path: libminifi/include/utils/ProcessorConfigUtils.h
##
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+std::string getRequiredPropertyOrThrow(const core::ProcessContext* context, const std::string& property_name) {
+  std::string value;
+  if (!context->getProperty(property_name, value)) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, property_name + " property missing or invalid");
+  }
+  return value;
+}
+
+std::vector<std::string> listFromCommaSeparatedProperty(const core::ProcessContext* context, const std::string& property_name) {
+  std::string property_string;
+  context->getProperty(property_name, property_string);
+  return utils::StringUtils::splitAndTrim(property_string, ",");
+}
+
+std::vector<std::string> listFromRequiredCommaSeparatedProperty(const core::ProcessContext* context, const std::string& property_name) {
+  return utils::StringUtils::splitAndTrim(getRequiredPropertyOrThrow(context, property_name), ",");
+}
+
+bool parseBooleanPropertyOrThrow(core::ProcessContext* context, const std::string& property_name) {
+  bool value;
+  std::string value_str = getRequiredPropertyOrThrow(context, property_name);
+  utils::optional<bool> maybe_value = utils::StringUtils::toBool(value_str);
+  if (!maybe_value) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, property_name + " property is invalid: value is " + value_str);
+  }
+  return maybe_value.value();
+}
+
+std::chrono::milliseconds parseTimePropertyMSOrThrow(core::ProcessContext* context, const std::string& property_name) {
+  core::TimeUnit unit;
+  uint64_t time_value_ms;
+  if (!core::Property::StringToTime(getRequiredPropertyOrThrow(context, property_name), time_value_ms, unit) || !core::Property::ConvertTimeUnitToMS(time_value_ms, unit, time_value_ms)) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, property_name + " property missing or invalid");

Review comment:
   Updated the error message.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org