[GitHub] [nifi] pvillard31 edited a comment on pull request #4768: NIFI-8155 - add banner text in page title

2021-01-20 Thread GitBox


pvillard31 edited a comment on pull request #4768:
URL: https://github.com/apache/nifi/pull/4768#issuecomment-764453581


   Thanks for looking into this @exceptionfactory. As far as I can tell, the 
other places you mentioned are not "really" opening new windows and thus are not 
impacting the overall page title:
   
   https://user-images.githubusercontent.com/11541012/105320361-801ab280-5bdf-11eb-8fc0-969aac536b0c.png
   
   The only edge case I can think of is when looking at the content of a flow 
file: this will open a new tab/window and the title won't contain the info. But 
I believe this is a very special edge case and I think it is best to leave it 
outside of this change.
   
   https://user-images.githubusercontent.com/11541012/105320689-e8699400-5bdf-11eb-84a7-dff5269efa25.png
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] pvillard31 commented on pull request #4768: NIFI-8155 - add banner text in page title

2021-01-20 Thread GitBox


pvillard31 commented on pull request #4768:
URL: https://github.com/apache/nifi/pull/4768#issuecomment-764453581


   Thanks for looking into this @exceptionfactory. As far as I can tell, the 
other places you mentioned are not "really" opening new windows and thus are not 
impacting the overall page title:
   
   https://user-images.githubusercontent.com/11541012/105320361-801ab280-5bdf-11eb-8fc0-969aac536b0c.png
   
   The only edge case I can think of is when looking at the content of a flow 
file: this will open a new tab/window and the title won't contain the info. But 
I believe this is a very special edge case and I think it is best to leave it 
outside of this change.
   
   https://user-images.githubusercontent.com/11541012/105320626-d687f100-5bdf-11eb-9785-7dc48cb1da51.png
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (NIFI-7263) Add a No tracking Strategy to ListFile/ListFTP

2021-01-20 Thread Jens M Kofoed (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-7263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17269099#comment-17269099
 ] 

Jens M Kofoed commented on NIFI-7263:
-

I have 2 cases.

Case 1:
 We have 2 networks which are isolated from each other by a data diode, so one 
of the networks has no contact with the internet. From time to time we need to 
transfer new drivers and software from the internet to the inside network. All 
transfer flows are handled by NiFi. In this case we have a drop folder from 
which NiFi moves all files to a similar folder on the inside. Here we use the 
List and Fetch processors instead of GetFile because we want the benefit of the 
cluster. Since we manually copy files to the drop folder, these files keep their 
timestamps, so we cannot use the "Tracking Timestamps" strategy. If using the 
"Tracking Entities" strategy, the "Entity Tracking Time Window" needs to be set 
to years.

Case 2:
We have a file server where different systems write files into different 
subfolders. We use NiFi to synchronise all files, listing from the root folder 
with Recurse Subdirectories set to true. We are not allowed to delete files, so 
all files will be there all the time; therefore we can't use a GetFile processor.
When using the "Tracking Timestamps" strategy we have had a situation where a 
file was not picked up by NiFi. If there are many files when NiFi starts 
scanning all files/folders, and a new file is written to the first folder just 
after NiFi has looked in that folder, this file will not be in the listing. If 
another file is written to the last folder NiFi scans, it will be in the 
listing, and that file will have a newer timestamp. So the next time NiFi scans, 
the first file will not be picked up, because it is older than the last recorded 
timestamp.
Therefore we are using the "Tracking Entities" strategy, which has another 
issue: if you are using a filter regex and change it, "Tracking Entities" starts 
all over again, listing all files.
So we have made our own flow in which we create a hash value from path, 
filename, file size and timestamp, and check whether that hash has been seen 
before.
We have had situations where some kinds of files needed to be transferred again. 
With our own detection flow we can route all duplicate hashes to a subflow and 
create a new route for those special situations.
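
A minimal sketch of that kind of identity hash (plain Java, with illustrative 
names only; not our actual flow configuration):

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    public class FileIdentityHash {
        // Builds a stable identity for a listed file from its path, name, size and
        // timestamp, so a later lookup can decide whether it has been seen before.
        public static String identityHash(String path, String filename, long size, long lastModified)
                throws NoSuchAlgorithmException {
            String key = path + "/" + filename + "|" + size + "|" + lastModified;
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(key.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        }
    }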

In both these cases we really don't need any strategy built into the ListFile 
processor. We just need it to list all files, no matter the timestamp.

 

> Add a No tracking Strategy to ListFile/ListFTP
> --
>
> Key: NIFI-7263
> URL: https://issues.apache.org/jira/browse/NIFI-7263
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Jens M Kofoed
>Assignee: Waleed Al Aibani
>Priority: Major
>  Labels: ListFile, listftp
> Fix For: 1.13.0
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> The ListFile/ListFTP processors have 2 Listing Strategies: Tracking Timestamps 
> and Tracking Entities.
> It would be very, very nice if the List processors could also have a No 
> Tracking (fix it yourself) strategy.
> If running NiFi in a cluster, List/Fetch is the perfect solution instead 
> of using a GetFile. But we have had many cases where files in the pickup 
> folder have old timestamps, so here we have to use Tracking Entities.
> The issue is in cases where you are not allowed to delete files but you have 
> to make a change to the file filter. The tracking entities then start all over 
> and list all files again.
> In other situations we need to resend all data and would like to clear the 
> state of the Tracking Entities, but you can't.
> So I have to make a small flow for detecting duplicates, and in some cases 
> just ignore duplicates and in other cases open up for sending duplicates. But 
> it is a pain in the ... to use the Tracking Entities.
> So a NO STRATEGY option would be very, very nice.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (NIFI-8158) Extensibility of NiFi itself

2021-01-20 Thread Jira
Maciej Gromuł created NIFI-8158:
---

 Summary: Extensibility of NiFi itself
 Key: NIFI-8158
 URL: https://issues.apache.org/jira/browse/NIFI-8158
 Project: Apache NiFi
  Issue Type: Improvement
Reporter: Maciej Gromuł


It would be nice if it were possible to add custom modules to the application 
through NARs loaded together with the app at first boot, or through extensions. 
Currently, if we want to add anything, we need to recompile the whole 
nifi-framework-bundle, since it contains all resources without exposing any way 
of accessing them. This is not about custom processors, but about adding 
functionality to NiFi itself without having to fork the whole of NiFi and then 
manage updates between the fork and upstream.

For example, a way to hook into the Spring context to create additional API 
endpoints, and a way to hook into the menu builder (which currently is simply a 
static list in HTML connected with JS) to add your own menu options with custom 
actions. That way we could, for example, add something like a marketplace 
solution for NiFi, where users can search for processors in some kind of 
repository and install those processors dynamically (similar to marketplaces in 
CMS systems). Currently a few of the places in the app are theoretically behind 
interfaces, but the implementations are still hardcoded.
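
Purely as an illustration of the kind of hook being asked for (none of these 
types exist in NiFi today; all names below are invented), such an extension 
point might look roughly like this:

    // Hypothetical SPI, sketched only to illustrate the request; not an existing NiFi API.
    public interface NiFiUiExtension {

        // Called at startup so a NAR-provided module can contribute REST endpoints.
        void registerEndpoints(EndpointRegistry registry);

        // Called so a module can contribute entries to the global menu.
        void registerMenuItems(MenuRegistry registry);

        // Minimal registries, again purely illustrative.
        interface EndpointRegistry {
            void addResource(String path, Object jaxRsResource);
        }

        interface MenuRegistry {
            void addMenuItem(String label, String actionUrl);
        }
    }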



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (NIFI-8156) PutCassandraRecord does not wrap byte arrays with a ByteBuffer, causing write failures

2021-01-20 Thread ASF subversion and git services (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-8156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17269086#comment-17269086
 ] 

ASF subversion and git services commented on NIFI-8156:
---

Commit 953327cdf587c6b68765c0d32508873d8a0031e7 in nifi's branch 
refs/heads/main from Mike Thomsen
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=953327c ]

NIFI-8156 Fixed byte handling bug in cassandra.

Signed-off-by: Pierre Villard 

This closes #4771.
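
Conceptually, the fix amounts to wrapping the raw byte[] in a ByteBuffer before 
it is bound to the statement. A minimal standalone illustration (hypothetical 
variable names, not the actual processor code):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class BlobBindingExample {
        public static void main(String[] args) {
            // A raw byte[] as it might come out of a record field.
            byte[] fieldValue = "example payload".getBytes(StandardCharsets.UTF_8);

            // The Cassandra Java driver expects a ByteBuffer for blob columns,
            // so the array is wrapped before being bound to the statement.
            ByteBuffer bound = ByteBuffer.wrap(fieldValue);
            System.out.println(bound.remaining() + " bytes ready to bind");
        }
    }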


> PutCassandraRecord does not wrap byte arrays with a ByteBuffer, causing write 
> failures
> --
>
> Key: NIFI-8156
> URL: https://issues.apache.org/jira/browse/NIFI-8156
> Project: Apache NiFi
>  Issue Type: Bug
>Affects Versions: 1.12.1
>Reporter: Mike Thomsen
>Assignee: Mike Thomsen
>Priority: Major
> Fix For: 1.13.0
>
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> As the subject line says, types that are meant for a Cassandra bytes field 
> are not wrapped inside of a ByteBuffer. This causes a write failure when the 
> Cassandra driver attempts to write the array.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (NIFI-8156) PutCassandraRecord does not wrap byte arrays with a ByteBuffer, causing write failures

2021-01-20 Thread Pierre Villard (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-8156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pierre Villard resolved NIFI-8156.
--
Resolution: Fixed

> PutCassandraRecord does not wrap byte arrays with a ByteBuffer, causing write 
> failures
> --
>
> Key: NIFI-8156
> URL: https://issues.apache.org/jira/browse/NIFI-8156
> Project: Apache NiFi
>  Issue Type: Bug
>Affects Versions: 1.12.1
>Reporter: Mike Thomsen
>Assignee: Mike Thomsen
>Priority: Major
> Fix For: 1.13.0
>
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> As the subject line says, types that are meant for a Cassandra bytes field 
> are not wrapped inside of a ByteBuffer. This causes a write failure when the 
> Cassandra driver attempts to write the array.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi] asfgit closed pull request #4771: NIFI-8156 Fixed byte handling bug in cassandra.

2021-01-20 Thread GitBox


asfgit closed pull request #4771:
URL: https://github.com/apache/nifi/pull/4771


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (NIFI-8043) PutDatabaseRecord Postgres Upsert On Conflict keys not quoted

2021-01-20 Thread Pierre Villard (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-8043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pierre Villard updated NIFI-8043:
-
Fix Version/s: 1.13.0
   Resolution: Fixed
   Status: Resolved  (was: Patch Available)

> PutDatabaseRecord Postgres Upsert On Conflict keys not quoted
> -
>
> Key: NIFI-8043
> URL: https://issues.apache.org/jira/browse/NIFI-8043
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Reporter: Daniel Cheung
>Assignee: Matt Burgess
>Priority: Major
> Fix For: 1.13.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> h2. First attempt with camel case (fails when translate field name is true or 
> false)
> Given that "Quote Column Identifiers" is enabled, one would expect the column 
> names inside the conflict clause to be quoted as well. However, they did not 
> seem to have been quoted, because my table's column names contain upper- and 
> lowercase letters and the flowfile is routed to the failure relationship of the 
> PutDatabaseRecord processor with the DB error: {{ERROR: column "camelcase" 
> does not exist}}.
> Setting "Update Keys" or not did not affect the outcome. If I understand 
> correctly, "Update Keys" would also affect the conflict clause, but it is 
> also not quoted, and it does not accept a string with manually quoted column 
> names.
> The SQL in question was found in the DB error in the log, simplified from what I saw:
> {{INSERT INTO "public"."my_table"("camelCase", "txt")}}
>  {{VALUES ("test", "test")}}
>  {{ON CONFLICT (CAMELCASE)}}
>  {{DO UPDATE SET ("camelCase", "txt") = (}}
>  {{    EXCLUDED."camelCase",}}
>  {{    EXCLUDED."txt"}}
>  {{)}}
> h2. Second attempt with snake case (fails when translate field name is true)
> I changed my column names to {{_snake_case, txt}} and tried upserting again, and 
> it still failed with this SQL in nifi-app.log:
> {{INSERT INTO "public"."my_table"("_snake_case", "txt")}}
>  {{VALUES ("test", "test")}}
>  {{ON CONFLICT (SNAKECASE)}}
>  {{DO UPDATE SET ("}}{{_snake_case}}{{", "txt") = (}}
>  {{    EXCLUDED."}}{{_snake_case}}{{",}}
>  {{    EXCLUDED."txt"}}
>  {{)}}
>  
> h2. Current workaround
> I currently need to *disable translate field name* and set my table to *use 
> snake case names as column names* to be able to use upsert



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (NIFI-8043) PutDatabaseRecord Postgres Upsert On Conflict keys not quoted

2021-01-20 Thread ASF subversion and git services (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-8043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17269085#comment-17269085
 ] 

ASF subversion and git services commented on NIFI-8043:
---

Commit fb2a8b5820b816d7afddb5141cd064275267f796 in nifi's branch 
refs/heads/main from Matt Burgess
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=fb2a8b5 ]

NIFI-8043: Quote update key column names in PutDatabaseRecord

Signed-off-by: Pierre Villard 

This closes #4772.


> PutDatabaseRecord Postgres Upsert On Conflict keys not quoted
> -
>
> Key: NIFI-8043
> URL: https://issues.apache.org/jira/browse/NIFI-8043
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Reporter: Daniel Cheung
>Assignee: Matt Burgess
>Priority: Major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> h2. First attempt with camel case (fails when translate field name is true or 
> false)
> Given that "Quote Column Identifiers" is enabled, one would expect the column 
> names inside the conflict clause to be quoted as well. However, they did not 
> seem to have been quoted, because my table's column names contain upper- and 
> lowercase letters and the flowfile is routed to the failure relationship of the 
> PutDatabaseRecord processor with the DB error: {{ERROR: column "camelcase" 
> does not exist}}.
> Setting "Update Keys" or not did not affect the outcome. If I understand 
> correctly, "Update Keys" would also affect the conflict clause, but it is 
> also not quoted, and it does not accept a string with manually quoted column 
> names.
> The SQL in question was found in the DB error in the log, simplified from what I saw:
> {{INSERT INTO "public"."my_table"("camelCase", "txt")}}
>  {{VALUES ("test", "test")}}
>  {{ON CONFLICT (CAMELCASE)}}
>  {{DO UPDATE SET ("camelCase", "txt") = (}}
>  {{    EXCLUDED."camelCase",}}
>  {{    EXCLUDED."txt"}}
>  {{)}}
> h2. Second attempt with snake case (fails when translate field name is true)
> I changed my column names to {{_snake_case, txt}} and tried upserting again, and 
> it still failed with this SQL in nifi-app.log:
> {{INSERT INTO "public"."my_table"("_snake_case", "txt")}}
>  {{VALUES ("test", "test")}}
>  {{ON CONFLICT (SNAKECASE)}}
>  {{DO UPDATE SET ("}}{{_snake_case}}{{", "txt") = (}}
>  {{    EXCLUDED."}}{{_snake_case}}{{",}}
>  {{    EXCLUDED."txt"}}
>  {{)}}
>  
> h2. Current workaround
> I currently need to *disable translate field name* and set my table to *use 
> snake case names as column names* to be able to use upsert



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi] asfgit closed pull request #4772: NIFI-8043: Quote update key column names in PutDatabaseRecord

2021-01-20 Thread GitBox


asfgit closed pull request #4772:
URL: https://github.com/apache/nifi/pull/4772


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] pvillard31 commented on a change in pull request #4594: NIFI-3669 Add SSL Support to CaptureChangeMySQL

2021-01-20 Thread GitBox


pvillard31 commented on a change in pull request #4594:
URL: https://github.com/apache/nifi/pull/4594#discussion_r561646469



##
File path: 
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
##
@@ -368,6 +393,23 @@
 
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
 .build();
 
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .displayName("SSL Context Service")
+            .description("SSL Context Service supporting encrypted socket communication")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+
+    public static final PropertyDescriptor SSL_MODE = new PropertyDescriptor.Builder()

Review comment:
   Do we want to use the new .dependsOn functionality to display this 
property only if the SSL Context Service is set?
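
   If so, a rough sketch of what that could look like (illustrative only: the 
allowable values are placeholders, and this assumes the `dependsOn` overload 
that takes just a property reference, meaning the property is shown whenever the 
referenced property is set):

    public static final PropertyDescriptor SSL_MODE = new PropertyDescriptor.Builder()
            .name("SSL Mode")
            .displayName("SSL Mode")
            .description("SSL mode used when an SSL Context Service is configured")
            .required(false)
            .allowableValues("PREFERRED", "REQUIRED", "VERIFY_IDENTITY")   // placeholder values
            // Only show this property when an SSL Context Service has been selected.
            .dependsOn(SSL_CONTEXT_SERVICE)
            .build();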





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (NIFI-8157) Adding support for other kinds of Parameter Contexts

2021-01-20 Thread Jira
Maciej Gromuł created NIFI-8157:
---

 Summary: Adding support for other kinds of Parameter Contexts
 Key: NIFI-8157
 URL: https://issues.apache.org/jira/browse/NIFI-8157
 Project: Apache NiFi
  Issue Type: New Feature
  Components: Extensions
Reporter: Maciej Gromuł


Currently there's only one implementation of Parameter Context, which is the 
standard parameter context. It would be nice if it were possible to provide your 
own implementation to NiFi. In our case we've got a service which leverages AWS 
KMS to store credentials and some additional data, but the current 
implementation doesn't support connecting to it or provide any way to add a 
custom Parameter Context.

 

It would be nice if we could provide a custom implementation which would then be 
used in NiFi (it would get serialized/deserialized, etc.) and spread to the 
nodes in cluster mode.
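
Purely as an illustration of the shape such a pluggable provider could take 
(hypothetical interface; the names are invented and this is not an existing 
NiFi API):

    // Hypothetical extension point sketch, for illustration only.
    public interface ParameterContextProvider {

        // A stable name shown when choosing the provider.
        String getName();

        // Fetch the current value for a parameter, e.g. from AWS KMS or another external store.
        String getParameterValue(String contextName, String parameterName);

        // Whether the value should be treated as sensitive by the framework.
        boolean isSensitive(String contextName, String parameterName);
    }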



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (NIFI-8043) PutDatabaseRecord Postgres Upsert On Conflict keys not quoted

2021-01-20 Thread Matt Burgess (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-8043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Burgess updated NIFI-8043:
---
Status: Patch Available  (was: In Progress)

> PutDatabaseRecord Postgres Upsert On Conflict keys not quoted
> -
>
> Key: NIFI-8043
> URL: https://issues.apache.org/jira/browse/NIFI-8043
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Reporter: Daniel Cheung
>Assignee: Matt Burgess
>Priority: Major
>
> h2. First attempt with camel case (fails when translate field name is true or 
> false)
> Given that "Quote Column Identifiers" is enabled, one would expect the column 
> names inside the conflict clause to be quoted as well. However, they did not 
> seem to have been quoted, because my table's column names contain upper- and 
> lowercase letters and the flowfile is routed to the failure relationship of the 
> PutDatabaseRecord processor with the DB error: {{ERROR: column "camelcase" 
> does not exist}}.
> Setting "Update Keys" or not did not affect the outcome. If I understand 
> correctly, "Update Keys" would also affect the conflict clause, but it is 
> also not quoted, and it does not accept a string with manually quoted column 
> names.
> The SQL in question was found in the DB error in the log, simplified from what I saw:
> {{INSERT INTO "public"."my_table"("camelCase", "txt")}}
>  {{VALUES ("test", "test")}}
>  {{ON CONFLICT (CAMELCASE)}}
>  {{DO UPDATE SET ("camelCase", "txt") = (}}
>  {{    EXCLUDED."camelCase",}}
>  {{    EXCLUDED."txt"}}
>  {{)}}
> h2. Second attempt with snake case (fails when translate field name is true)
> I changed my column names to {{_snake_case, txt}} and tried upserting again, and 
> it still failed with this SQL in nifi-app.log:
> {{INSERT INTO "public"."my_table"("_snake_case", "txt")}}
>  {{VALUES ("test", "test")}}
>  {{ON CONFLICT (SNAKECASE)}}
>  {{DO UPDATE SET ("}}{{_snake_case}}{{", "txt") = (}}
>  {{    EXCLUDED."}}{{_snake_case}}{{",}}
>  {{    EXCLUDED."txt"}}
>  {{)}}
>  
> h2. Current workaround
> I currently need to *disable translate field name* and set my table to *use 
> snake case names as column names* to be able to use upsert



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (NIFI-8043) PutDatabaseRecord Postgres Upsert On Conflict keys not quoted

2021-01-20 Thread Matt Burgess (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-8043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Burgess updated NIFI-8043:
---
Affects Version/s: (was: 1.12.0)

> PutDatabaseRecord Postgres Upsert On Conflict keys not quoted
> -
>
> Key: NIFI-8043
> URL: https://issues.apache.org/jira/browse/NIFI-8043
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Reporter: Daniel Cheung
>Assignee: Matt Burgess
>Priority: Major
>
> h2. First attempt with camel case (fails when translate field name is true or 
> false)
> Given that "Quote Column Identifiers" is enabled, one would expect the column 
> names inside the conflict clause to be quoted as well. However, they did not 
> seem to have been quoted, because my table's column names contain upper- and 
> lowercase letters and the flowfile is routed to the failure relationship of the 
> PutDatabaseRecord processor with the DB error: {{ERROR: column "camelcase" 
> does not exist}}.
> Setting "Update Keys" or not did not affect the outcome. If I understand 
> correctly, "Update Keys" would also affect the conflict clause, but it is 
> also not quoted, and it does not accept a string with manually quoted column 
> names.
> The SQL in question was found in the DB error in the log, simplified from what I saw:
> {{INSERT INTO "public"."my_table"("camelCase", "txt")}}
>  {{VALUES ("test", "test")}}
>  {{ON CONFLICT (CAMELCASE)}}
>  {{DO UPDATE SET ("camelCase", "txt") = (}}
>  {{    EXCLUDED."camelCase",}}
>  {{    EXCLUDED."txt"}}
>  {{)}}
> h2. Second attempt with snake case (fails when translate field name is true)
> I changed my column names to {{_snake_case, txt}} and tried upserting again, and 
> it still failed with this SQL in nifi-app.log:
> {{INSERT INTO "public"."my_table"("_snake_case", "txt")}}
>  {{VALUES ("test", "test")}}
>  {{ON CONFLICT (SNAKECASE)}}
>  {{DO UPDATE SET ("}}{{_snake_case}}{{", "txt") = (}}
>  {{    EXCLUDED."}}{{_snake_case}}{{",}}
>  {{    EXCLUDED."txt"}}
>  {{)}}
>  
> h2. Current workaround
> I currently need to *disable translate field name* and set my table to *use 
> snake case names as column names* to be able to use upsert



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi] mtien-apache commented on a change in pull request #4767: NIFI-1355 Implemented new methods in KeyStoreUtils to programmatical…

2021-01-20 Thread GitBox


mtien-apache commented on a change in pull request #4767:
URL: https://github.com/apache/nifi/pull/4767#discussion_r561433596



##
File path: 
nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/KeyStoreUtils.java
##
@@ -245,7 +322,7 @@ public static TrustManagerFactory 
loadTrustManagerFactory(TlsConfiguration tlsCo
  */
 public static TrustManagerFactory loadTrustManagerFactory(String truststorePath, String truststorePassword, String truststoreType) throws TlsException {
     // Legacy truststore passwords can be empty
-    final char[] truststorePasswordChars = StringUtils.isNotBlank(truststorePassword) ? truststorePassword.toCharArray() : null;
+    final char[] truststorePasswordChars = StringUtils.isNotBlank(truststorePassword) ? truststorePassword.toCharArray() : "".toCharArray();

Review comment:
   @exceptionfactory I received a NullPointerException for an empty password 
when the truststore type is PKCS12, so I changed it to an empty string. But 
after some investigation, I found that the Bouncy Castle PKCS12 store type does 
not allow empty passwords. 
   
   Since we allow passwordless truststores, I'll add a check for the truststore 
type. If it's PKCS12, I'll throw an IllegalArgumentException; otherwise I'll set 
it back to `null`.
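
   A minimal sketch of that check (illustrative only, assuming the same 
`StringUtils` and `KeystoreType` already used in `KeyStoreUtils`):

    // Sketch of the planned guard: PKCS12 truststores (Bouncy Castle provider) cannot use
    // an empty password, while other truststore types may fall back to a null password.
    private static char[] resolveTruststorePassword(String truststorePassword, KeystoreType truststoreType) {
        if (StringUtils.isNotBlank(truststorePassword)) {
            return truststorePassword.toCharArray();
        }
        if (truststoreType == KeystoreType.PKCS12) {
            throw new IllegalArgumentException("A truststore password is required for PKCS12 truststores");
        }
        return null;
    }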





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] mattyb149 opened a new pull request #4772: NIFI-8043: Quote update key column names in PutDatabaseRecord

2021-01-20 Thread GitBox


mattyb149 opened a new pull request #4772:
URL: https://github.com/apache/nifi/pull/4772


   Thank you for submitting a contribution to Apache NiFi.
   
   Please provide a short description of the PR here:
   
    Description of PR
   
   The "Quote Column Names" property was not being honored for update keys 
gleaned from the table metadata. This PR quotes them (if configured to do so) 
before passing to the DatabaseAdapter for UPSERT and UPDATE.
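
   Conceptually, the change applies the same identifier quoting to update keys 
gleaned from the table metadata. A rough sketch of the idea (hypothetical 
helper, not the actual PR code; assumes `java.util` imports):

    // Quote each update key column name with the database's identifier quote string
    // (e.g. a double quote for PostgreSQL) when column name quoting is enabled.
    private static List<String> quoteUpdateKeys(List<String> updateKeys, String quoteString, boolean quoteColumnNames) {
        if (!quoteColumnNames) {
            return updateKeys;
        }
        List<String> quoted = new ArrayList<>(updateKeys.size());
        for (String key : updateKeys) {
            quoted.add(quoteString + key + quoteString);
        }
        return quoted;
    }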
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [x] Is there a JIRA ticket associated with this PR? Is it referenced 
in the commit message?
   
   - [x] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.
   
   - [x] Has your PR been rebased against the latest commit within the target 
branch (typically `main`)?
   
   - [x] Is your initial contribution a single, squashed commit? _Additional 
commits in response to PR reviewer feedback should be made on this branch and 
pushed to allow change tracking. Do not `squash` or use `--force` when pushing 
to allow for clean monitoring of changes._
   
   ### For code changes:
   - [ ] Have you ensured that the full suite of tests is executed via `mvn 
-Pcontrib-check clean install` at the root `nifi` folder?
   - [x] Have you written or updated unit tests to verify your changes?
   - [x] Have you verified that the full build is successful on JDK 8?
   - [ ] Have you verified that the full build is successful on JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
   - [ ] If applicable, have you updated the `LICENSE` file, including the main 
`LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main 
`NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to 
.name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which 
it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for 
build issues and submit an update to your PR as soon as possible.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (NIFI-8043) PutDatabaseRecord Postgres Upsert On Conflict keys not quoted

2021-01-20 Thread Matt Burgess (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-8043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268952#comment-17268952
 ] 

Matt Burgess commented on NIFI-8043:


The key column names don't get quoted in PutDatabaseRecord before passing them 
to the database adapter to generate an UPSERT statement.

> PutDatabaseRecord Postgres Upsert On Conflict keys not quoted
> -
>
> Key: NIFI-8043
> URL: https://issues.apache.org/jira/browse/NIFI-8043
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Affects Versions: 1.12.0
>Reporter: Daniel Cheung
>Assignee: Matt Burgess
>Priority: Major
>
> h2. First attempt with camel case (fails when translate field name is true or 
> false)
> Given that "Quote Column Identifiers" is enabled, one would expect the column 
> names inside the conflict clause to be quoted as well. However, they did not 
> seem to have been quoted, because my table's column names contain upper- and 
> lowercase letters and the flowfile is routed to the failure relationship of the 
> PutDatabaseRecord processor with the DB error: {{ERROR: column "camelcase" 
> does not exist}}.
> Setting "Update Keys" or not did not affect the outcome. If I understand 
> correctly, "Update Keys" would also affect the conflict clause, but it is 
> also not quoted, and it does not accept a string with manually quoted column 
> names.
> The SQL in question was found in the DB error in the log, simplified from what I saw:
> {{INSERT INTO "public"."my_table"("camelCase", "txt")}}
>  {{VALUES ("test", "test")}}
>  {{ON CONFLICT (CAMELCASE)}}
>  {{DO UPDATE SET ("camelCase", "txt") = (}}
>  {{    EXCLUDED."camelCase",}}
>  {{    EXCLUDED."txt"}}
>  {{)}}
> h2. Second attempt with snake case (fails when translate field name is true)
> I changed my column names to {{_snake_case, txt}} and tried upserting again, and 
> it still failed with this SQL in nifi-app.log:
> {{INSERT INTO "public"."my_table"("_snake_case", "txt")}}
>  {{VALUES ("test", "test")}}
>  {{ON CONFLICT (SNAKECASE)}}
>  {{DO UPDATE SET ("}}{{_snake_case}}{{", "txt") = (}}
>  {{    EXCLUDED."}}{{_snake_case}}{{",}}
>  {{    EXCLUDED."txt"}}
>  {{)}}
>  
> h2. Current workaround
> I currently need to *disable translate field name* and set my table to *use 
> snake case names as column names* to be able to use upsert



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (NIFI-8043) PutDatabaseRecord Postgres Upsert On Conflict keys not quoted

2021-01-20 Thread Matt Burgess (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-8043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Burgess reassigned NIFI-8043:
--

Assignee: Matt Burgess

> PutDatabaseRecord Postgres Upsert On Conflict keys not quoted
> -
>
> Key: NIFI-8043
> URL: https://issues.apache.org/jira/browse/NIFI-8043
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Affects Versions: 1.12.0
>Reporter: Daniel Cheung
>Assignee: Matt Burgess
>Priority: Major
>
> h2. First attempt with camel case (fails when translate field name is true or 
> false)
> Given that "Quote Column Identifiers" is enabled, one would expect the column 
> names inside the conflict clause to be quoted as well. However, they did not 
> seem to have been quoted, because my table's column names contain upper- and 
> lowercase letters and the flowfile is routed to the failure relationship of the 
> PutDatabaseRecord processor with the DB error: {{ERROR: column "camelcase" 
> does not exist}}.
> Setting "Update Keys" or not did not affect the outcome. If I understand 
> correctly, "Update Keys" would also affect the conflict clause, but it is 
> also not quoted, and it does not accept a string with manually quoted column 
> names.
> The SQL in question was found in the DB error in the log, simplified from what I saw:
> {{INSERT INTO "public"."my_table"("camelCase", "txt")}}
>  {{VALUES ("test", "test")}}
>  {{ON CONFLICT (CAMELCASE)}}
>  {{DO UPDATE SET ("camelCase", "txt") = (}}
>  {{    EXCLUDED."camelCase",}}
>  {{    EXCLUDED."txt"}}
>  {{)}}
> h2. Second attempt with snake case (fails when translate field name is true)
> I changed my column names to {{_snake_case, txt}} and tried upserting again, and 
> it still failed with this SQL in nifi-app.log:
> {{INSERT INTO "public"."my_table"("_snake_case", "txt")}}
>  {{VALUES ("test", "test")}}
>  {{ON CONFLICT (SNAKECASE)}}
>  {{DO UPDATE SET ("}}{{_snake_case}}{{", "txt") = (}}
>  {{    EXCLUDED."}}{{_snake_case}}{{",}}
>  {{    EXCLUDED."txt"}}
>  {{)}}
>  
> h2. Current workaround
> I currently need to *disable translate field name* and set my table to *use 
> snake case names as column names* to be able to use upsert



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi] thenatog commented on pull request #4753: NIFI-7356 - Enable TLS for embedded Zookeeper when NiFi has TLS enabled

2021-01-20 Thread GitBox


thenatog commented on pull request #4753:
URL: https://github.com/apache/nifi/pull/4753#issuecomment-764042674


   I've also updated the admin guide similarly to what was requested above. Let 
me know if any further changes are required.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] turcsanyip commented on a change in pull request #4738: NIFI-7890 - Added record support to ConsumeMQTT processor

2021-01-20 Thread GitBox


turcsanyip commented on a change in pull request #4738:
URL: https://github.com/apache/nifi/pull/4738#discussion_r561332768



##
File path: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##
@@ -334,14 +443,210 @@ public void process(final OutputStream out) throws 
IOException {
 if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
 logger.warn(new StringBuilder("FlowFile ")
 
.append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-.append(" for Mqtt message ")
+.append(" for MQTT message ")
 .append(mqttMessage)
 .append(" had already been removed from queue, 
possible duplication of flow files")
 .toString());
 }
 }
 }
 
+    private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session) {
+        final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+                out.write(mqttMessage.getPayload());
+                out.write(demarcator);
+                session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
+            }
+        });
+
+        session.getProvenanceReporter().receive(messageFlowfile, new StringBuilder(broker).append(topicPrefix).append(topicFilter).toString());

Review comment:
   There is no separator character between the broker and the topic prefix 
(eg.: `tcp://myhost:1883mytopic`).
   `'/'` could be added before the topic prefix.
   It could be changed in the existing `transferQueue()` method too. 
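
   Something like the following (sketch only, reusing the fields from the diff 
above) would add the missing separator:

    // Add a '/' between the broker URI and the topic so the transit URI reads
    // e.g. tcp://myhost:1883/mytopic instead of tcp://myhost:1883mytopic.
    final String transitUri = new StringBuilder(broker).append('/').append(topicPrefix).append(topicFilter).toString();
    session.getProvenanceReporter().receive(messageFlowfile, transitUri);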

##
File path: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##
@@ -334,14 +443,210 @@ public void process(final OutputStream out) throws 
IOException {
 if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
 logger.warn(new StringBuilder("FlowFile ")
 
.append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-.append(" for Mqtt message ")
+.append(" for MQTT message ")
 .append(mqttMessage)
 .append(" had already been removed from queue, 
possible duplication of flow files")
 .toString());
 }
 }
 }
 
+    private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session) {
+        final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            while (!mqttQueue.isEmpty()) {

Review comment:
   Emptying the whole queue seems to me a bit non-deterministic, because the 
queue is being written to at the same time by the receiver thread.
   Wouldn't it be useful to define a max size that may be fetched in one go? 
(a magic number or a processor property) 
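
   A bounded drain along these lines (sketch only, with a hypothetical 
`maxBatchSize` limit) would make the behaviour more predictable:

    // Drain at most maxBatchSize messages per onTrigger call instead of racing
    // the receiver thread until the queue happens to be empty.
    int polled = 0;
    while (!mqttQueue.isEmpty() && polled < maxBatchSize) {
        final MQTTQueueMessage mqttMessage = mqttQueue.poll();
        if (mqttMessage == null) {
            break;
        }
        out.write(mqttMessage.getPayload());
        out.write(demarcator);
        session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
        polled++;
    }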

##
File path: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##
@@ -334,14 +443,210 @@ public void process(final OutputStream out) throws 
IOException {
 if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
 logger.warn(new StringBuilder("FlowFile ")
 
.append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-.append(" for Mqtt message ")
+.append(" for MQTT message ")
 .append(mqttMessage)
 .append(" had already been removed from queue, 
possible duplication of flow files")
 .toString());
 }
 }
 }
 
+    private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session) {
+        final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+

[jira] [Updated] (NIFI-8154) AvroParquetHDFSRecordReader fails to read parquet file containing nested structs

2021-01-20 Thread Glenn Jones (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Glenn Jones updated NIFI-8154:
--
Status: Patch Available  (was: Open)

> AvroParquetHDFSRecordReader fails to read parquet file containing nested 
> structs
> 
>
> Key: NIFI-8154
> URL: https://issues.apache.org/jira/browse/NIFI-8154
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Affects Versions: 1.12.1, 1.11.3
>Reporter: Glenn Jones
>Priority: Minor
> Attachments: 
> 0001-NIFI-8154-upversion-parquet-avro-to-1.11.1-and-add-u.patch
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> FetchParquet can't be used to process files containing nested structs.  When 
> trying to create a RecordSchema it runs into 
> https://issues.apache.org/jira/browse/PARQUET-1441, which causes it to fail.  
> We've patched this locally by building the nifi-parquet-processors with 
> parquet-avro 1.11.0, but it would be great if this made it into the next 
> release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (NIFI-8154) AvroParquetHDFSRecordReader fails to read parquet file containing nested structs

2021-01-20 Thread Glenn Jones (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Glenn Jones updated NIFI-8154:
--
Attachment: 0001-NIFI-8154-upversion-parquet-avro-to-1.11.1-and-add-u.patch

> AvroParquetHDFSRecordReader fails to read parquet file containing nested 
> structs
> 
>
> Key: NIFI-8154
> URL: https://issues.apache.org/jira/browse/NIFI-8154
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Affects Versions: 1.11.3, 1.12.1
>Reporter: Glenn Jones
>Priority: Minor
> Attachments: 
> 0001-NIFI-8154-upversion-parquet-avro-to-1.11.1-and-add-u.patch
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> FetchParquet can't be used to process files containing nested structs.  When 
> trying to create a RecordSchema it runs into 
> https://issues.apache.org/jira/browse/PARQUET-1441, which causes it to fail.  
> We've patched this locally by building the nifi-parquet-processors with 
> parquet-avro 1.11.0, but it would be great if this made it into the next 
> release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (NIFI-8150) Change "Download flow" context menu selection for PGs to "Download flow definition"

2021-01-20 Thread Bryan Bende (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-8150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bryan Bende resolved NIFI-8150.
---
Fix Version/s: 1.13.0
   Resolution: Fixed

> Change "Download flow" context menu selection for PGs  to "Download flow 
> definition" 
> -
>
> Key: NIFI-8150
> URL: https://issues.apache.org/jira/browse/NIFI-8150
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Core UI
>Reporter: Andrew M. Lim
>Assignee: Andrew M. Lim
>Priority: Major
> Fix For: 1.13.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> The current text that says "Download flow" is very ambiguous.  Changing to 
> "Download flow definition" will better alert the user that the action 
> downloads:
> A JSON file that is a flow definition of the process group and not to be 
> confused with a template, flow.xml.gz or a versioned flow. 
> Should update the docs accordingly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi] bbende merged pull request #4766: NIFI-8150 Change Download flow to Download flow definition for proces…

2021-01-20 Thread GitBox


bbende merged pull request #4766:
URL: https://github.com/apache/nifi/pull/4766


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (NIFI-8150) Change "Download flow" context menu selection for PGs to "Download flow definition"

2021-01-20 Thread ASF subversion and git services (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-8150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268870#comment-17268870
 ] 

ASF subversion and git services commented on NIFI-8150:
---

Commit 27f57e64635c9478b811886b6a0b207c5972d5ac in nifi's branch 
refs/heads/main from Andrew Lim
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=27f57e6 ]

NIFI-8150 Change Download flow to Download flow definition for process groups 
(#4766)



> Change "Download flow" context menu selection for PGs  to "Download flow 
> definition" 
> -
>
> Key: NIFI-8150
> URL: https://issues.apache.org/jira/browse/NIFI-8150
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Core UI
>Reporter: Andrew M. Lim
>Assignee: Andrew M. Lim
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> The current text that says "Download flow" is very ambiguous.  Changing to 
> "Download flow definition" will better alert the user that the action 
> downloads:
> A JSON file that is a flow definition of the process group and not to be 
> confused with a template, flow.xml.gz or a versioned flow. 
> Should update the docs accordingly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi] bbende commented on pull request #4766: NIFI-8150 Change Download flow to Download flow definition for proces…

2021-01-20 Thread GitBox


bbende commented on pull request #4766:
URL: https://github.com/apache/nifi/pull/4766#issuecomment-763942429


   +1 LGTM



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (NIFI-8154) AvroParquetHDFSRecordReader fails to read parquet file containing nested structs

2021-01-20 Thread Glenn Jones (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268869#comment-17268869
 ] 

Glenn Jones commented on NIFI-8154:
---

The test fails because it expects a field in the Record produced by 
ConvertAvroToParquet to be named "map", but it is actually named "key_value".

In parquet-avro 1.10.0, AvroParquetWriter produces parquet with a schema that 
includes the following definition for the mymap field from the test avro:

required group mymap (MAP) {
  repeated group map (MAP_KEY_VALUE) {
    required binary key (UTF8);
    required int32 value;
  }
}

This doesn't conform to the Map logical type, but it is within the [backward 
compatibility 
rules|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1]

In parquet-avro 1.11.1, AvroParquetWriter produces the following, which I think 
is more correct (the middle level is named "key_value" instead of "map"):
 
required group mymap (MAP) {
  repeated group key_value (MAP_KEY_VALUE) {
    required binary key (STRING);
    required int32 value;
  }
}

The test uses GroupReadSupport to read the Parquet file into something it can 
examine, and as a result the middle-level group name has changed from "map" to 
"key_value". I doubt that other ReadSupport implementations would expose the 
name of the middle-level group in this way, so perhaps this wouldn't have been 
an issue if the tests had used AvroReadSupport. In any case, I think it's fine 
to simply update the tests to expect the field names from the 1.11.1 
AvroParquetWriter.

> AvroParquetHDFSRecordReader fails to read parquet file containing nested 
> structs
> 
>
> Key: NIFI-8154
> URL: https://issues.apache.org/jira/browse/NIFI-8154
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Affects Versions: 1.11.3, 1.12.1
>Reporter: Glenn Jones
>Priority: Minor
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> FetchParquet can't be used to process files containing nested structs.  When 
> trying to create a RecordSchema it runs into 
> https://issues.apache.org/jira/browse/PARQUET-1441, which causes it to fail.  
> We've patched this locally by building the nifi-parquet-processors with 
> parquet-avro 1.11.0, but it would be great if this made it into the next 
> release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi] thenatog commented on pull request #4753: NIFI-7356 - Enable TLS for embedded Zookeeper when NiFi has TLS enabled

2021-01-20 Thread GitBox


thenatog commented on pull request #4753:
URL: https://github.com/apache/nifi/pull/4753#issuecomment-763911925


   Updated again. Thanks to both of you for your reviews!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] mtien-apache commented on a change in pull request #4767: NIFI-1355 Implemented new methods in KeyStoreUtils to programmatical…

2021-01-20 Thread GitBox


mtien-apache commented on a change in pull request #4767:
URL: https://github.com/apache/nifi/pull/4767#discussion_r561247262



##
File path: 
nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/KeyStoreUtils.java
##
@@ -125,6 +146,63 @@ public static KeyStore loadKeyStore(String keystorePath, 
char[] keystorePassword
 }
 }
 
+/**
+ * Creates a temporary default Keystore and Truststore and returns it 
wrapped in a TLS configuration.
+ *
+ * @return a {@link org.apache.nifi.security.util.TlsConfiguration}
+ */
+public static TlsConfiguration createTlsConfigAndNewKeystoreTruststore() 
throws IOException, GeneralSecurityException {
+return createTlsConfigAndNewKeystoreTruststore(new 
StandardTlsConfiguration());
+}
+
+/**
+ * Creates a temporary Keystore and Truststore and returns it wrapped in a 
new TLS configuration with the given values.
+ *
+ * @param tlsConfiguration   a {@link 
org.apache.nifi.security.util.TlsConfiguration}
+ * @return a {@link org.apache.nifi.security.util.TlsConfiguration}
+ */
+    public static TlsConfiguration createTlsConfigAndNewKeystoreTruststore(final TlsConfiguration tlsConfiguration) throws IOException, GeneralSecurityException {
+        final Path keyStorePath;
+        final String keystorePassword = StringUtils.isNotBlank(tlsConfiguration.getKeystorePassword()) ? tlsConfiguration.getKeystorePassword() : generatePassword();
+        final String keyPassword = StringUtils.isNotBlank(tlsConfiguration.getKeyPassword()) ? tlsConfiguration.getKeyPassword() : keystorePassword;
+        final KeystoreType keystoreType = tlsConfiguration.getKeystoreType() != null ? tlsConfiguration.getKeystoreType() : KeystoreType.PKCS12;
+        final Path trustStorePath;
+        final String truststorePassword = StringUtils.isNotBlank(tlsConfiguration.getTruststorePassword()) ? tlsConfiguration.getTruststorePassword() : "";
+        final KeystoreType truststoreType = tlsConfiguration.getTruststoreType() != null ? tlsConfiguration.getTruststoreType() : KeystoreType.PKCS12;
+
+// Create temporary Keystore file
+try {
+keyStorePath = generateTempKeystorePath(keystoreType);
+} catch (IOException e) {
+logger.error(KEYSTORE_ERROR_MSG);
+throw new UncheckedIOException(KEYSTORE_ERROR_MSG, e);
+}
+
+// Create temporary Truststore file
+try {
+trustStorePath = generateTempTruststorePath(truststoreType);
+} catch (IOException e) {
+logger.error(TRUSTSTORE_ERROR_MSG);
+throw new UncheckedIOException(TRUSTSTORE_ERROR_MSG, e);
+}
+
+// Create X509 Certificate
+final X509Certificate clientCert = 
createKeyStoreAndGetX509Certificate(KEY_ALIAS, keystorePassword, keyPassword, 
keyStorePath.toString(), keystoreType);
+
+// Create Truststore
+createTrustStore(clientCert, CERT_ALIAS, truststorePassword, 
trustStorePath.toString(), getKeystoreType(truststoreType.toString()));
+
+return new StandardTlsConfiguration(
+keyStorePath.toString(),
+keystorePassword,
+keyPassword,
+getKeystoreType(keystoreType.toString()),
+trustStorePath.toString(),
+truststorePassword,
+getKeystoreType(truststoreType.toString()),

Review comment:
   The call is not necessary.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] exceptionfactory commented on a change in pull request #4767: NIFI-1355 Implemented new methods in KeyStoreUtils to programmatical…

2021-01-20 Thread GitBox


exceptionfactory commented on a change in pull request #4767:
URL: https://github.com/apache/nifi/pull/4767#discussion_r561195875



##
File path: 
nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/KeyStoreUtils.java
##
@@ -125,6 +146,63 @@ public static KeyStore loadKeyStore(String keystorePath, 
char[] keystorePassword
 }
 }
 
+/**
+ * Creates a temporary default Keystore and Truststore and returns it 
wrapped in a TLS configuration.
+ *
+ * @return a {@link org.apache.nifi.security.util.TlsConfiguration}
+ */
+public static TlsConfiguration createTlsConfigAndNewKeystoreTruststore() 
throws IOException, GeneralSecurityException {
+return createTlsConfigAndNewKeystoreTruststore(new 
StandardTlsConfiguration());
+}
+
+/**
+ * Creates a temporary Keystore and Truststore and returns it wrapped in a 
new TLS configuration with the given values.
+ *
+ * @param tlsConfiguration   a {@link 
org.apache.nifi.security.util.TlsConfiguration}
+ * @return a {@link org.apache.nifi.security.util.TlsConfiguration}
+ */
+    public static TlsConfiguration createTlsConfigAndNewKeystoreTruststore(final TlsConfiguration tlsConfiguration) throws IOException, GeneralSecurityException {
+        final Path keyStorePath;
+        final String keystorePassword = StringUtils.isNotBlank(tlsConfiguration.getKeystorePassword()) ? tlsConfiguration.getKeystorePassword() : generatePassword();
+        final String keyPassword = StringUtils.isNotBlank(tlsConfiguration.getKeyPassword()) ? tlsConfiguration.getKeyPassword() : keystorePassword;
+        final KeystoreType keystoreType = tlsConfiguration.getKeystoreType() != null ? tlsConfiguration.getKeystoreType() : KeystoreType.PKCS12;
+        final Path trustStorePath;
+        final String truststorePassword = StringUtils.isNotBlank(tlsConfiguration.getTruststorePassword()) ? tlsConfiguration.getTruststorePassword() : "";
+        final KeystoreType truststoreType = tlsConfiguration.getTruststoreType() != null ? tlsConfiguration.getTruststoreType() : KeystoreType.PKCS12;
+
+// Create temporary Keystore file
+try {
+keyStorePath = generateTempKeystorePath(keystoreType);
+} catch (IOException e) {
+logger.error(KEYSTORE_ERROR_MSG);
+throw new UncheckedIOException(KEYSTORE_ERROR_MSG, e);
+}
+
+// Create temporary Truststore file
+try {
+trustStorePath = generateTempTruststorePath(truststoreType);
+} catch (IOException e) {
+logger.error(TRUSTSTORE_ERROR_MSG);
+throw new UncheckedIOException(TRUSTSTORE_ERROR_MSG, e);
+}
+
+// Create X509 Certificate
+final X509Certificate clientCert = 
createKeyStoreAndGetX509Certificate(KEY_ALIAS, keystorePassword, keyPassword, 
keyStorePath.toString(), keystoreType);
+
+// Create Truststore
+createTrustStore(clientCert, CERT_ALIAS, truststorePassword, 
trustStorePath.toString(), getKeystoreType(truststoreType.toString()));
+
+return new StandardTlsConfiguration(
+keyStorePath.toString(),
+keystorePassword,
+keyPassword,
+getKeystoreType(keystoreType.toString()),
+trustStorePath.toString(),
+truststorePassword,
+getKeystoreType(truststoreType.toString()),

Review comment:
   Are these calls to `getKeystoreType()` necessary?  It looks like the 
values are already of the `KeystoreType` enum.
   ```suggestion
   truststoreType,
   ```

##
File path: 
nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/KeyStoreUtils.java
##
@@ -125,6 +146,63 @@ public static KeyStore loadKeyStore(String keystorePath, 
char[] keystorePassword
 }
 }
 
+/**
+ * Creates a temporary default Keystore and Truststore and returns it 
wrapped in a TLS configuration.
+ *
+ * @return a {@link org.apache.nifi.security.util.TlsConfiguration}
+ */
+public static TlsConfiguration createTlsConfigAndNewKeystoreTruststore() 
throws IOException, GeneralSecurityException {
+return createTlsConfigAndNewKeystoreTruststore(new 
StandardTlsConfiguration());
+}
+
+/**
+ * Creates a temporary Keystore and Truststore and returns it wrapped in a 
new TLS configuration with the given values.
+ *
+ * @param tlsConfiguration   a {@link 
org.apache.nifi.security.util.TlsConfiguration}
+ * @return a {@link org.apache.nifi.security.util.TlsConfiguration}
+ */
+public static TlsConfiguration 
createTlsConfigAndNewKeystoreTruststore(final TlsConfiguration 
tlsConfiguration) throws IOException, GeneralSecurityException {
+final Path keyStorePath;
+final String keystorePassword = 
StringUtils.isNotBlank(tlsConfiguration.getKeystorePassword()) ? 

[GitHub] [nifi] jfrazee commented on pull request #4753: NIFI-7356 - Enable TLS for embedded Zookeeper when NiFi has TLS enabled

2021-01-20 Thread GitBox


jfrazee commented on pull request #4753:
URL: https://github.com/apache/nifi/pull/4753#issuecomment-763887511


   I did a full round of testing yesterday and everything looked good. Will do 
another round today with the updates.
   
   On the docs side of things I think we need a stronger call out to the 
behavioral changes. I made some suggestions.
   
   We'll also need to add some migration guidance for the wiki.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] jfrazee edited a comment on pull request #4753: NIFI-7356 - Enable TLS for embedded Zookeeper when NiFi has TLS enabled

2021-01-20 Thread GitBox


jfrazee edited a comment on pull request #4753:
URL: https://github.com/apache/nifi/pull/4753#issuecomment-763887511


   I did a full round of testing yesterday and everything looked good. Will do 
another round today with the updates.
   
   On the docs side of things I think we need a stronger call out to the 
behavioral changes. I made some suggestions.
   
   We'll also need to add some migration guidance to the wiki.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] jfrazee commented on a change in pull request #4753: NIFI-7356 - Enable TLS for embedded Zookeeper when NiFi has TLS enabled

2021-01-20 Thread GitBox


jfrazee commented on a change in pull request #4753:
URL: https://github.com/apache/nifi/pull/4753#discussion_r561228356



##
File path: nifi-docs/src/main/asciidoc/administration-guide.adoc
##
@@ -2246,6 +2246,53 @@ _true_. Once Netty is enabled, you should see log 
messages like the following in
 2020-02-24 23:37:54,082 INFO [nioEventLoopGroup-3-1] 
o.apache.zookeeper.ClientCnxnSocketNetty SSL handler added for channel: [id: 
0xa831f9c3]
 2020-02-24 23:37:54,104 INFO [nioEventLoopGroup-3-1] 
o.apache.zookeeper.ClientCnxnSocketNetty channel is connected: [id: 0xa831f9c3, 
L:/172.17.0.4:56510 - R:8e38869cd1d1/172.17.0.3:2281]
 
+=== Embedded ZooKeeper with TLS
+
+A NiFi cluster can also be deployed using a ZooKeeper instance(s) embedded in 
NiFi itself which all nodes can communicate with. Communication between nodes 
and this embedded ZooKeeper can also be secured with TLS. The configuration for 
the client side of the connection will operate in the same way as an external 
ZooKeeper. That is, it will use the `+nifi.security.*+` properties from the 
nifi.properties file by default, unless you specifiy explicit ZooKeeper 
keystore/truststore properties with `+nifi.zookeeper.security.*+` as described 
above.

Review comment:
   "Communication between nodes and this embedded ZooKeeper can also be 
secured with TLS." => "Communication between nodes and this embedded ZooKeeper 
will be secured with TLS if NiFi is secured with TLS. Versions of NiFi prior to 
1.13 did not use secure client access with embedded ZooKeeper(s)."
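For readers following the thread, a hedged sketch of the client-side configuration the quoted paragraph describes; paths and passwords are placeholders, and the authoritative property list is the Administration Guide, not this note:

```
# nifi.properties (illustrative values only)
nifi.zookeeper.client.secure=true
nifi.security.keystore=./conf/keystore.p12
nifi.security.keystoreType=PKCS12
nifi.security.keystorePasswd=keystorePassword
nifi.security.truststore=./conf/truststore.p12
nifi.security.truststoreType=PKCS12
nifi.security.truststorePasswd=truststorePassword
# Optional explicit overrides; when set, these take precedence over nifi.security.*
#nifi.zookeeper.security.keystore=./conf/zk-keystore.p12
#nifi.zookeeper.security.keystorePasswd=zkKeystorePassword
```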





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] turcsanyip commented on a change in pull request #4746: NIFI-8034: Fixed PropertyValue.isExpressionLanguagePresent always ret…

2021-01-20 Thread GitBox


turcsanyip commented on a change in pull request #4746:
URL: https://github.com/apache/nifi/pull/4746#discussion_r561226467



##
File path: 
nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestStandardPreparedQuery.java
##
@@ -309,6 +309,24 @@ public void testVariableImpacted() {
 
assertTrue(Query.prepare("${anyMatchingAttribute('a.*'):equals('hello')}").getVariableImpact().isImpacted("attr"));
 }
 
+@Test
+public void testIsExpressionLanguagePresent() {
+assertFalse(Query.prepare("value").isExpressionLanguagePresent());
+assertFalse(Query.prepare("").isExpressionLanguagePresent());
+
+assertTrue(Query.prepare("${variable}").isExpressionLanguagePresent());
+
assertTrue(Query.prepare("${hostname()}").isExpressionLanguagePresent());
+
assertTrue(Query.prepare("${hostname():equals('localhost')}").isExpressionLanguagePresent());
+
assertTrue(Query.prepare("prefix-${hostname()}").isExpressionLanguagePresent());
+
assertTrue(Query.prepare("${hostname()}-suffix").isExpressionLanguagePresent());
+
assertTrue(Query.prepare("${variable1}${hostname()}${variable2}").isExpressionLanguagePresent());
+
assertTrue(Query.prepare("${${variable}}").isExpressionLanguagePresent());
+
+assertFalse(Query.prepare("${}").isExpressionLanguagePresent());
+
+assertTrue(Query.prepare("#{param}").isExpressionLanguagePresent());

Review comment:
   Thanks @markap14. Changed the logic, also rebased the branch.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] jfrazee commented on a change in pull request #4753: NIFI-7356 - Enable TLS for embedded Zookeeper when NiFi has TLS enabled

2021-01-20 Thread GitBox


jfrazee commented on a change in pull request #4753:
URL: https://github.com/apache/nifi/pull/4753#discussion_r561224503



##
File path: nifi-docs/src/main/asciidoc/administration-guide.adoc
##
@@ -2246,6 +2246,53 @@ _true_. Once Netty is enabled, you should see log 
messages like the following in
 2020-02-24 23:37:54,082 INFO [nioEventLoopGroup-3-1] 
o.apache.zookeeper.ClientCnxnSocketNetty SSL handler added for channel: [id: 
0xa831f9c3]
 2020-02-24 23:37:54,104 INFO [nioEventLoopGroup-3-1] 
o.apache.zookeeper.ClientCnxnSocketNetty channel is connected: [id: 0xa831f9c3, 
L:/172.17.0.4:56510 - R:8e38869cd1d1/172.17.0.3:2281]
 
+=== Embedded ZooKeeper with TLS
+
+A NiFi cluster can also be deployed using a ZooKeeper instance(s) embedded in 
NiFi itself which all nodes can communicate with. Communication between nodes 
and this embedded ZooKeeper can also be secured with TLS. The configuration for 
the client side of the connection will operate in the same way as an external 
ZooKeeper. That is, it will use the `+nifi.security.*+` properties from the 
nifi.properties file by default, unless you specifiy explicit ZooKeeper 
keystore/truststore properties with `+nifi.zookeeper.security.*+` as described 
above.
+
+The server configuration will operate in the same way as an insecure embedded 
server, but with the `+secureClientPort+` set (typically port `+2281+`).
+

Review comment:
   We should add something like this:
   ```suggestion
   NOTE: When using a secure server, the secure embedded ZooKeeper server 
ignores any +clientPort+ or +clientPortAddress+ specified in 
_$NIFI_HOME/conf/zookeeper.properties_. I.e., if the NiFi-embedded ZooKeeper 
exposes a +secureClientPort+ it will not expose an insecure +clientPort+ 
regardless of configuration. This is a behavioral difference between the 
embedded server and an external ZooKeeper server.
   
   ```
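And a hedged sketch of the matching server-side file, illustrating the note above that only the secure port is exposed; values are placeholders, not the PR's exact configuration:

```
# conf/zookeeper.properties (illustrative sketch)
tickTime=2000
initLimit=10
syncLimit=5
dataDir=./state/zookeeper
server.1=nifi-node1.example.com:2888:3888
# Only the secure port is configured; any clientPort/clientPortAddress here
# would be ignored by the secure embedded server, per the note above.
secureClientPort=2281
```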





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] thenatog commented on a change in pull request #4753: NIFI-7356 - Enable TLS for embedded Zookeeper when NiFi has TLS enabled

2021-01-20 Thread GitBox


thenatog commented on a change in pull request #4753:
URL: https://github.com/apache/nifi/pull/4753#discussion_r561219030



##
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
##
@@ -198,6 +219,144 @@ public static ZooKeeperStateServer create(final 
NiFiProperties properties) throw
 zkProperties.load(bis);
 }
 
-return new ZooKeeperStateServer(zkProperties);
+return new ZooKeeperStateServer(reconcileProperties(properties, 
zkProperties));
+}
+
+/**
+ * Reconcile properties between the nifi.properties and 
zookeeper.properties (zoo.cfg) files. Most of the ZooKeeper server properties 
are derived from
+ * the zookeeper.properties file, while the TLS key/truststore properties 
are taken from nifi.properties.
+ * @param niFiProperties NiFiProperties file containing ZooKeeper client 
and TLS configuration
+ * @param zkProperties The zookeeper.properties file containing Zookeeper 
server configuration
+ * @return A reconciled QuorumPeerConfig which will include TLS properties 
set if they are available.
+ * @throws IOException If configuration files fail to parse.
+ * @throws ConfigException If secure configuration is not as expected. 
Check administration documentation.
+ */
+private static QuorumPeerConfig reconcileProperties(NiFiProperties 
niFiProperties, Properties zkProperties) throws IOException, ConfigException {
+QuorumPeerConfig peerConfig = new QuorumPeerConfig();
+peerConfig.parseProperties(zkProperties);
+
+final boolean niFiConfigIsSecure = 
isNiFiConfigSecureForZooKeeper(niFiProperties);
+final boolean zooKeeperConfigIsSecure = 
isZooKeeperConfigSecure(peerConfig);
+
+if (!zooKeeperConfigIsSecure && !niFiConfigIsSecure) {
+logger.info("{} property is set to false or is not present, and 
zookeeper.properties file does not contain secureClientPort property, so 
embedded ZooKeeper will be started without TLS.",
+NiFiProperties.ZOOKEEPER_CLIENT_SECURE);
+return peerConfig;
+}
+
+// If secureClientPort is set but no TLS config is set, fail to start.
+if (zooKeeperConfigIsSecure && !niFiConfigIsSecure) {
+throw new ConfigException(
+String.format("Zookeeper properties file %s was configured 
to be secure but there was no valid TLS config present in nifi.properties or " +
+  "nifi.zookeeper.client.secure was set to 
false. Check the administration guide.",
+   
niFiProperties.getProperty(NiFiProperties.STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES)));
+}
+
+// Remove any insecure ports if they were set in zookeeper.properties
+ensureOnlySecurePortsAreEnabled(peerConfig, zkProperties);
+
+// Set base ZooKeeper TLS server properties
+setTlsProperties(zkProperties, new ZooKeeperServerX509Util(), 
niFiProperties);
+// Set quorum ZooKeeper TLS server properties
+setTlsProperties(zkProperties, new ZooKeeperQuorumX509Util(), 
niFiProperties);
+// Set TLS client port:
+zkProperties.setProperty("secureClientPort", 
getSecurePort(peerConfig));
+
+// Set the required connection factory for TLS
+zkProperties.setProperty(ZOOKEEPER_SERVER_CNXN_FACTORY, 
NettyServerCnxnFactory.class.getName());
+zkProperties.setProperty(ZOOKEEPER_SSL_QUORUM, 
Boolean.TRUE.toString());

Review comment:
   setTlsProperties() sets the 'system'-level properties for the keystores, 
i.e. ssl.keyStore.location and ssl.quorum.keyStore.location. The properties 
above are only set once and do not use the same property naming scheme.
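To make the naming distinction concrete, a hedged sketch; the keys below are the ZooKeeper server configuration names referenced in this thread, not the PR's exact code:

```java
import java.util.Properties;

// Hedged sketch of the property-name distinction described above.
class ZkTlsPropertySketch {
    static Properties sketch() {
        final Properties zkProperties = new Properties();
        // Keystore locations come in a client-facing and a quorum flavor:
        zkProperties.setProperty("ssl.keyStore.location", "/path/to/keystore.p12");
        zkProperties.setProperty("ssl.quorum.keyStore.location", "/path/to/keystore.p12");
        // These switches are set once and use a different naming scheme:
        zkProperties.setProperty("serverCnxnFactory", "org.apache.zookeeper.server.NettyServerCnxnFactory");
        zkProperties.setProperty("sslQuorum", "true");
        zkProperties.setProperty("secureClientPort", "2281");
        return zkProperties;
    }
}
```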





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] jfrazee commented on a change in pull request #4753: NIFI-7356 - Enable TLS for embedded Zookeeper when NiFi has TLS enabled

2021-01-20 Thread GitBox


jfrazee commented on a change in pull request #4753:
URL: https://github.com/apache/nifi/pull/4753#discussion_r561157059



##
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
##
@@ -136,13 +156,14 @@ private void startDistributed() throws IOException {
 quorumPeer.setInitLimit(quorumPeerConfig.getInitLimit());
 quorumPeer.setSyncLimit(quorumPeerConfig.getSyncLimit());
 quorumPeer.setQuorumVerifier(quorumPeerConfig.getQuorumVerifier(), 
false);
-quorumPeer.setCnxnFactory(connectionFactory);
 quorumPeer.setZKDatabase(new 
ZKDatabase(quorumPeer.getTxnFactory()));
 quorumPeer.setLearnerType(quorumPeerConfig.getPeerType());
 quorumPeer.setSyncEnabled(quorumPeerConfig.getSyncEnabled());
 
quorumPeer.setQuorumListenOnAllIPs(quorumPeerConfig.getQuorumListenOnAllIPs());
+quorumPeer.setSslQuorum(quorumPeerConfig.isSslQuorum());
 
 quorumPeer.start();
+

Review comment:
   ```suggestion
   ```

##
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
##
@@ -198,6 +219,144 @@ public static ZooKeeperStateServer create(final 
NiFiProperties properties) throw
 zkProperties.load(bis);
 }
 
-return new ZooKeeperStateServer(zkProperties);
+return new ZooKeeperStateServer(reconcileProperties(properties, 
zkProperties));
+}
+
+/**
+ * Reconcile properties between the nifi.properties and 
zookeeper.properties (zoo.cfg) files. Most of the ZooKeeper server properties 
are derived from
+ * the zookeeper.properties file, while the TLS key/truststore properties 
are taken from nifi.properties.
+ * @param niFiProperties NiFiProperties file containing ZooKeeper client 
and TLS configuration
+ * @param zkProperties The zookeeper.properties file containing Zookeeper 
server configuration

Review comment:
   ```suggestion
* @param zkProperties The zookeeper.properties file containing 
ZooKeeper server configuration
   ```

##
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
##
@@ -198,6 +219,144 @@ public static ZooKeeperStateServer create(final 
NiFiProperties properties) throw
 zkProperties.load(bis);
 }
 
-return new ZooKeeperStateServer(zkProperties);
+return new ZooKeeperStateServer(reconcileProperties(properties, 
zkProperties));
+}
+
+/**
+ * Reconcile properties between the nifi.properties and 
zookeeper.properties (zoo.cfg) files. Most of the ZooKeeper server properties 
are derived from
+ * the zookeeper.properties file, while the TLS key/truststore properties 
are taken from nifi.properties.
+ * @param niFiProperties NiFiProperties file containing ZooKeeper client 
and TLS configuration
+ * @param zkProperties The zookeeper.properties file containing Zookeeper 
server configuration
+ * @return A reconciled QuorumPeerConfig which will include TLS properties 
set if they are available.
+ * @throws IOException If configuration files fail to parse.
+ * @throws ConfigException If secure configuration is not as expected. 
Check administration documentation.
+ */
+private static QuorumPeerConfig reconcileProperties(NiFiProperties 
niFiProperties, Properties zkProperties) throws IOException, ConfigException {
+QuorumPeerConfig peerConfig = new QuorumPeerConfig();
+peerConfig.parseProperties(zkProperties);
+
+final boolean niFiConfigIsSecure = 
isNiFiConfigSecureForZooKeeper(niFiProperties);
+final boolean zooKeeperConfigIsSecure = 
isZooKeeperConfigSecure(peerConfig);
+
+if (!zooKeeperConfigIsSecure && !niFiConfigIsSecure) {
+logger.info("{} property is set to false or is not present, and 
zookeeper.properties file does not contain secureClientPort property, so 
embedded ZooKeeper will be started without TLS.",
+NiFiProperties.ZOOKEEPER_CLIENT_SECURE);
+return peerConfig;
+}
+
+// If secureClientPort is set but no TLS config is set, fail to start.
+if (zooKeeperConfigIsSecure && !niFiConfigIsSecure) {
+throw new ConfigException(
+String.format("Zookeeper properties file %s was configured 
to be secure but there was no valid TLS config present in nifi.properties or " +
+  "nifi.zookeeper.client.secure was set to 
false. Check the administration guide.",
+   
niFiProperties.getProperty(NiFiProperties.STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES)));
+}
+
+// Remove 

[GitHub] [nifi] exceptionfactory commented on pull request #4734: NIFI-8023 Added toLocalDate() and updated toDate() in DataTypeUtils

2021-01-20 Thread GitBox


exceptionfactory commented on pull request #4734:
URL: https://github.com/apache/nifi/pull/4734#issuecomment-763865777


   @adenes After evaluating the unit tests mentioned, I found that each one was 
performing date conversion in order to compare the expected results.  The 
conversion changed the expected value, which was throwing off the tests.  I 
updated the tests and confirmed successful test completion with the CET time 
zone.
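As a hedged illustration of the kind of adjustment described above (not the actual test diff), an expected value can be derived through an explicit zone so the assertion does not shift when the JVM default time zone is CET:

```java
import java.time.LocalDate;
import java.time.ZoneOffset;

class ExpectedDateSketch {
    static java.sql.Date expected() {
        // Assumed sample value; build the expected date in an explicit zone
        // instead of converting through the JVM default time zone.
        final LocalDate expectedLocalDate = LocalDate.of(2017, 4, 4);
        final long epochMillis = expectedLocalDate.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
        return new java.sql.Date(epochMillis);
    }
}
```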



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (NIFI-8146) Allow RecordPath to be used for specifying operation type and data fields when using PutDatabaseRecord

2021-01-20 Thread Matt Burgess (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-8146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Burgess resolved NIFI-8146.

Resolution: Fixed

> Allow RecordPath to be used for specifying operation type and data fields 
> when using PutDatabaseRecord
> --
>
> Key: NIFI-8146
> URL: https://issues.apache.org/jira/browse/NIFI-8146
> Project: Apache NiFi
>  Issue Type: New Feature
>  Components: Extensions
>Reporter: Mark Payne
>Assignee: Mark Payne
>Priority: Major
> Fix For: 1.13.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> PutDatabaseRecord requires that the Statement Type be defined as a property or 
> a FlowFile attribute. This means that if a FlowFile has many records, it must 
> be split apart into individual Records if there is more than 1 type of 
> statement needed per FlowFile.
> It also assumes that the data to be inserted/updated/deleted/etc is the full 
> record. However, it's common to have some wrapper around the actual data, as 
> is the case with a tool like Debezium, which includes an Operation Type, a 
> 'before' snapshot and an 'after' snapshot. To accommodate this, we should 
> allow Record-friendly methods for specifying the path to the data and the 
> operation type. 
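For context on the wrapper shape mentioned in the description, a hypothetical Debezium-style change event; the field names and RecordPaths below are illustrative only, not the processor's required format:

```json
{
  "op": "u",
  "before": { "id": 42, "name": "old name" },
  "after":  { "id": 42, "name": "new name" }
}
```

With an envelope like this, a RecordPath such as `/op` could identify the operation type and `/after` the fields to write, which is the kind of Record-friendly addressing the issue asks for.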



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (NIFI-8146) Allow RecordPath to be used for specifying operation type and data fields when using PutDatabaseRecord

2021-01-20 Thread ASF subversion and git services (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-8146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268778#comment-17268778
 ] 

ASF subversion and git services commented on NIFI-8146:
---

Commit 803ba882aa15142a9986dc0c23bbf4db11fe15a7 in nifi's branch 
refs/heads/main from Mark Payne
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=803ba88 ]

NIFI-8146: Ensure that we close the Connection/Statement/PreparedStatement 
objects in finally blocks or try-with-resources

Signed-off-by: Matthew Burgess 

This closes #4770


> Allow RecordPath to be used for specifying operation type and data fields 
> when using PutDatabaseRecord
> --
>
> Key: NIFI-8146
> URL: https://issues.apache.org/jira/browse/NIFI-8146
> Project: Apache NiFi
>  Issue Type: New Feature
>  Components: Extensions
>Reporter: Mark Payne
>Assignee: Mark Payne
>Priority: Major
> Fix For: 1.13.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> PutDatabaseRecord requires that the Statement Type be defined as a property or 
> a FlowFile attribute. This means that if a FlowFile has many records, it must 
> be split apart into individual Records if there is more than 1 type of 
> statement needed per FlowFile.
> It also assumes that the data to be inserted/updated/deleted/etc is the full 
> record. However, it's common to have some wrapper around the actual data, as 
> is the case with a tool like Debezium, which includes an Operation Type, a 
> 'before' snapshot and an 'after' snapshot. To accommodate this, we should 
> allow Record-friendly methods for specifying the path to the data and the 
> operation type. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi] mattyb149 closed pull request #4770: NIFI-8146: Ensure that we close the Connection/Statement/PreparedStat…

2021-01-20 Thread GitBox


mattyb149 closed pull request #4770:
URL: https://github.com/apache/nifi/pull/4770


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] mattyb149 commented on pull request #4770: NIFI-8146: Ensure that we close the Connection/Statement/PreparedStat…

2021-01-20 Thread GitBox


mattyb149 commented on pull request #4770:
URL: https://github.com/apache/nifi/pull/4770#issuecomment-763856888


   +1 LGTM, tested scenarios with debugger and verified connections and 
statements were closed. Thanks for the fix! Merging to main



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] markap14 commented on a change in pull request #4746: NIFI-8034: Fixed PropertyValue.isExpressionLanguagePresent always ret…

2021-01-20 Thread GitBox


markap14 commented on a change in pull request #4746:
URL: https://github.com/apache/nifi/pull/4746#discussion_r561166527



##
File path: 
nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestStandardPreparedQuery.java
##
@@ -309,6 +309,24 @@ public void testVariableImpacted() {
 
assertTrue(Query.prepare("${anyMatchingAttribute('a.*'):equals('hello')}").getVariableImpact().isImpacted("attr"));
 }
 
+@Test
+public void testIsExpressionLanguagePresent() {
+assertFalse(Query.prepare("value").isExpressionLanguagePresent());
+assertFalse(Query.prepare("").isExpressionLanguagePresent());
+
+assertTrue(Query.prepare("${variable}").isExpressionLanguagePresent());
+
assertTrue(Query.prepare("${hostname()}").isExpressionLanguagePresent());
+
assertTrue(Query.prepare("${hostname():equals('localhost')}").isExpressionLanguagePresent());
+
assertTrue(Query.prepare("prefix-${hostname()}").isExpressionLanguagePresent());
+
assertTrue(Query.prepare("${hostname()}-suffix").isExpressionLanguagePresent());
+
assertTrue(Query.prepare("${variable1}${hostname()}${variable2}").isExpressionLanguagePresent());
+
assertTrue(Query.prepare("${${variable}}").isExpressionLanguagePresent());
+
+assertFalse(Query.prepare("${}").isExpressionLanguagePresent());
+
+assertTrue(Query.prepare("#{param}").isExpressionLanguagePresent());

Review comment:
   Well, that's interesting. The user guide says:
   ```
   Value - The value that will be used when the Parameter is referenced. 
Parameter values do not support Expression Language or embedded parameter 
references.
   ```
   
   Parameters are not supposed to be able to make use of EL. But I just tried 
it with UpdateAttribute, and interestingly it did evaluate the reference. I 
think that is actually a bug, though. So I would tend to say that we should 
return `false` since Parameters are not supposed to support EL.
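A hedged sketch of the behavior being suggested here, not the committed change; the test class name is assumed for illustration:

```java
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.nifi.attribute.expression.language.Query;
import org.junit.Test;

public class ParameterReferenceSketchTest {
    @Test
    public void parameterReferenceAloneIsNotEl() {
        // A parameter reference on its own would not count as Expression Language...
        assertFalse(Query.prepare("#{param}").isExpressionLanguagePresent());
        // ...while a genuine EL expression still would.
        assertTrue(Query.prepare("${variable}").isExpressionLanguagePresent());
    }
}
```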





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (NIFI-8154) AvroParquetHDFSRecordReader fails to read parquet file containing nested structs

2021-01-20 Thread Glenn Jones (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268733#comment-17268733
 ] 

Glenn Jones commented on NIFI-8154:
---

I see what you mean [~pvillard].  I think the failure in 
[TestConvertAvroToParquet.test_Data()|https://github.com/apache/nifi/blob/25ab050ed78cedd11450cdc7e3165a58682fddb2/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java#L186]
 may be related to 
[PARQUET-1879|https://issues.apache.org/jira/browse/PARQUET-1879]. I'll try to 
figure out what's going on.

> AvroParquetHDFSRecordReader fails to read parquet file containing nested 
> structs
> 
>
> Key: NIFI-8154
> URL: https://issues.apache.org/jira/browse/NIFI-8154
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Affects Versions: 1.11.3, 1.12.1
>Reporter: Glenn Jones
>Priority: Minor
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> FetchParquet can't be used to process files containing nested structs.  When 
> trying to create a RecordSchema it runs into 
> https://issues.apache.org/jira/browse/PARQUET-1441, which causes it to fail.  
> We've patched this locally by building the nifi-parquet-processors with 
> parquet-avro 1.11.0, but it would be great if this made it into the next 
> release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi] mtien-apache commented on pull request #4767: NIFI-1355 Implemented new methods in KeyStoreUtils to programmatical…

2021-01-20 Thread GitBox


mtien-apache commented on pull request #4767:
URL: https://github.com/apache/nifi/pull/4767#issuecomment-763823481


   Fixing the build failure now.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] turcsanyip commented on a change in pull request #4746: NIFI-8034: Fixed PropertyValue.isExpressionLanguagePresent always ret…

2021-01-20 Thread GitBox


turcsanyip commented on a change in pull request #4746:
URL: https://github.com/apache/nifi/pull/4746#discussion_r561153888



##
File path: 
nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestStandardPreparedQuery.java
##
@@ -309,6 +309,24 @@ public void testVariableImpacted() {
 
assertTrue(Query.prepare("${anyMatchingAttribute('a.*'):equals('hello')}").getVariableImpact().isImpacted("attr"));
 }
 
+@Test
+public void testIsExpressionLanguagePresent() {
+assertFalse(Query.prepare("value").isExpressionLanguagePresent());
+assertFalse(Query.prepare("").isExpressionLanguagePresent());
+
+assertTrue(Query.prepare("${variable}").isExpressionLanguagePresent());
+
assertTrue(Query.prepare("${hostname()}").isExpressionLanguagePresent());
+
assertTrue(Query.prepare("${hostname():equals('localhost')}").isExpressionLanguagePresent());
+
assertTrue(Query.prepare("prefix-${hostname()}").isExpressionLanguagePresent());
+
assertTrue(Query.prepare("${hostname()}-suffix").isExpressionLanguagePresent());
+
assertTrue(Query.prepare("${variable1}${hostname()}${variable2}").isExpressionLanguagePresent());
+
assertTrue(Query.prepare("${${variable}}").isExpressionLanguagePresent());
+
+assertFalse(Query.prepare("${}").isExpressionLanguagePresent());
+
+assertTrue(Query.prepare("#{param}").isExpressionLanguagePresent());

Review comment:
   The parameter may or may not contain EL. My idea was to handle it as 
"unknown" and I felt it safer to say `true` in this case.
   
   I ran into this "isExpressionLanguagePresent always returns true" issue in 
`customValidate()` and the parameter reference is already resolved when 
`customValidate()` gets called. So only `StringLiteralExpression` (non-EL) and 
`CompiledExpression` (EL) can occur there but no `ParameterExpression`. I was 
uncertain about `ParameterExpression` because I do not know where else it is 
used in the context of EL.
   
   If you confirm that returning `false` is the right way I can easily change 
the logic. I just wanted to explain what my approach was.
   
   Thanks





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] andrewmlim commented on pull request #4766: NIFI-8150 Change Download flow to Download flow definition for proces…

2021-01-20 Thread GitBox


andrewmlim commented on pull request #4766:
URL: https://github.com/apache/nifi/pull/4766#issuecomment-763813787


   Thanks for taking a look @exceptionfactory! Appreciate it!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] exceptionfactory commented on pull request #4734: NIFI-8023 Added toLocalDate() and updated toDate() in DataTypeUtils

2021-01-20 Thread GitBox


exceptionfactory commented on pull request #4734:
URL: https://github.com/apache/nifi/pull/4734#issuecomment-763806901


   > @exceptionfactory , thanks for the follow-up commits. I ran the tests with 
CET system TZ and unfortunately some test cases fail:
   > 
   > ```
   > [ERROR] Failures:
   > [ERROR]   
TestAvroReaderWithEmbeddedSchema.testLogicalTypes:66->testLogicalTypes:125 
expected:<2017-04-0[4]> but was:<2017-04-0[3]>
   > [ERROR]   
TestAvroReaderWithEmbeddedSchema.testNullableLogicalTypes:72->testLogicalTypes:125
 expected:<2017-04-0[4]> but was:<2017-04-0[3]>
   > [ERROR]   TestCSVRecordReader.testDate:119 expected:<30> but was:<29>
   > [ERROR]   TestCSVRecordReader.testDateNoCoersionExpectedFormat:163 
expected:<30> but was:<29>
   > [ERROR]   TestJacksonCSVRecordReader.testDate:105 expected:<30> but 
was:<29>
   > ```
   > 
   > Could you please have a look at them?
   
   Thanks @adenes I will take a look at those tests.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] markap14 commented on a change in pull request #4730: NIFI-8095: Created StatelessNiFi Sink Connector and Source Connector.…

2021-01-20 Thread GitBox


markap14 commented on a change in pull request #4730:
URL: https://github.com/apache/nifi/pull/4730#discussion_r561138544



##
File path: 
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java
##
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.stateless.flow.DataflowTrigger;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.TriggerResult;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+
+public class StatelessNiFiSourceTask extends SourceTask {
+public static final String STATE_MAP_KEY = "task.index";
+private static final Logger logger = 
LoggerFactory.getLogger(StatelessNiFiSourceTask.class);
+
+private StatelessDataflow dataflow;
+private String outputPortName;
+private String topicName;
+private String topicNameAttribute;
+private TriggerResult triggerResult;
+private String keyAttributeName;
+private Pattern headerAttributeNamePattern;
+private long timeoutMillis;
+private String dataflowName;
+private long failureYieldExpiration = 0L;
+
+private final Map clusterStatePartitionMap = 
Collections.singletonMap(STATE_MAP_KEY, "CLUSTER");
+private Map localStatePartitionMap = new HashMap<>();
+private boolean primaryNodeOnly;
+private boolean primaryNodeTask;
+
+private final AtomicLong unacknowledgedRecords = new AtomicLong(0L);
+
+@Override
+public String version() {
+return StatelessKafkaConnectorUtil.getVersion();
+}
+
+@Override
+public void start(final Map properties) {
+logger.info("Starting Source Task with properties {}", 
StatelessKafkaConnectorUtil.getLoggableProperties(properties));
+
+final String timeout = 
properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, 
StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT);
+timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, 
TimeUnit.MILLISECONDS);
+
+topicName = properties.get(StatelessNiFiSourceConnector.TOPIC_NAME);
+topicNameAttribute = 
properties.get(StatelessNiFiSourceConnector.TOPIC_NAME_ATTRIBUTE);
+keyAttributeName = 
properties.get(StatelessNiFiSourceConnector.KEY_ATTRIBUTE);
+
+if (topicName == null && topicNameAttribute == null) {
+throw new ConfigException("Either the topic.name or 
topic.name.attribute configuration must be specified");
+}
+
+final String headerRegex = 
properties.get(StatelessNiFiSourceConnector.HEADER_REGEX);
+headerAttributeNamePattern = headerRegex == null ? null : 
Pattern.compile(headerRegex);
+
+dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);
+primaryNodeOnly = dataflow.isSourcePrimaryNodeOnly();
+
+// Determine the name of the Output Port to retrieve data from
+dataflowName = 
properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);
+outputPortName = 
properties.get(StatelessNiFiSourceConnector.OUTPUT_PORT_NAME);
+if (outputPortName == null) {
+final Set outputPorts = dataflow.getOutputPortNames();
+if (outputPorts.isEmpty()) {
+throw new ConfigException("The dataflow specified for <" + 

[GitHub] [nifi] markap14 commented on a change in pull request #4730: NIFI-8095: Created StatelessNiFi Sink Connector and Source Connector.…

2021-01-20 Thread GitBox


markap14 commented on a change in pull request #4730:
URL: https://github.com/apache/nifi/pull/4730#discussion_r561138442



##
File path: 
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java
##
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.stateless.flow.DataflowTrigger;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.TriggerResult;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+
+public class StatelessNiFiSourceTask extends SourceTask {
+public static final String STATE_MAP_KEY = "task.index";
+private static final Logger logger = 
LoggerFactory.getLogger(StatelessNiFiSourceTask.class);
+
+private StatelessDataflow dataflow;
+private String outputPortName;
+private String topicName;
+private String topicNameAttribute;
+private TriggerResult triggerResult;
+private String keyAttributeName;
+private Pattern headerAttributeNamePattern;
+private long timeoutMillis;
+private String dataflowName;
+private long failureYieldExpiration = 0L;
+
+private final Map clusterStatePartitionMap = 
Collections.singletonMap(STATE_MAP_KEY, "CLUSTER");
+private Map localStatePartitionMap = new HashMap<>();
+private boolean primaryNodeOnly;
+private boolean primaryNodeTask;
+
+private final AtomicLong unacknowledgedRecords = new AtomicLong(0L);
+
+@Override
+public String version() {
+return StatelessKafkaConnectorUtil.getVersion();
+}
+
+@Override
+public void start(final Map properties) {
+logger.info("Starting Source Task with properties {}", 
StatelessKafkaConnectorUtil.getLoggableProperties(properties));
+
+final String timeout = 
properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, 
StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT);
+timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, 
TimeUnit.MILLISECONDS);
+
+topicName = properties.get(StatelessNiFiSourceConnector.TOPIC_NAME);
+topicNameAttribute = 
properties.get(StatelessNiFiSourceConnector.TOPIC_NAME_ATTRIBUTE);
+keyAttributeName = 
properties.get(StatelessNiFiSourceConnector.KEY_ATTRIBUTE);
+
+if (topicName == null && topicNameAttribute == null) {
+throw new ConfigException("Either the topic.name or 
topic.name.attribute configuration must be specified");
+}
+
+final String headerRegex = 
properties.get(StatelessNiFiSourceConnector.HEADER_REGEX);
+headerAttributeNamePattern = headerRegex == null ? null : 
Pattern.compile(headerRegex);
+
+dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);
+primaryNodeOnly = dataflow.isSourcePrimaryNodeOnly();
+
+// Determine the name of the Output Port to retrieve data from
+dataflowName = 
properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);
+outputPortName = 
properties.get(StatelessNiFiSourceConnector.OUTPUT_PORT_NAME);
+if (outputPortName == null) {
+final Set outputPorts = dataflow.getOutputPortNames();
+if (outputPorts.isEmpty()) {
+throw new ConfigException("The dataflow specified for <" + 

[GitHub] [nifi] adenes commented on pull request #4734: NIFI-8023 Added toLocalDate() and updated toDate() in DataTypeUtils

2021-01-20 Thread GitBox


adenes commented on pull request #4734:
URL: https://github.com/apache/nifi/pull/4734#issuecomment-763799131


   @exceptionfactory , thanks for the follow-up commits. I ran the tests with 
CET system TZ and unfortunately some test cases fail:
   ```
   [ERROR] Failures:
   [ERROR]   
TestAvroReaderWithEmbeddedSchema.testLogicalTypes:66->testLogicalTypes:125 
expected:<2017-04-0[4]> but was:<2017-04-0[3]>
   [ERROR]   
TestAvroReaderWithEmbeddedSchema.testNullableLogicalTypes:72->testLogicalTypes:125
 expected:<2017-04-0[4]> but was:<2017-04-0[3]>
   [ERROR]   TestCSVRecordReader.testDate:119 expected:<30> but was:<29>
   [ERROR]   TestCSVRecordReader.testDateNoCoersionExpectedFormat:163 
expected:<30> but was:<29>
   [ERROR]   TestJacksonCSVRecordReader.testDate:105 expected:<30> but was:<29>
   ```
   
   Could you please have a look at them?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] exceptionfactory commented on a change in pull request #4753: NIFI-7356 - Enable TLS for embedded Zookeeper when NiFi has TLS enabled

2021-01-20 Thread GitBox


exceptionfactory commented on a change in pull request #4753:
URL: https://github.com/apache/nifi/pull/4753#discussion_r561113641



##
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
##
@@ -36,11 +39,17 @@
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.InetSocketAddress;
 import java.util.Properties;
 
 public class ZooKeeperStateServer extends ZooKeeperServerMain {
 private static final Logger logger = 
LoggerFactory.getLogger(ZooKeeperStateServer.class);
 
+static final int MIN_PORT = 1024;
+static final int MAX_PORT = 65353;
+static final String ZOOKEEPER_SSL_QUORUM = "sslQuorum";
+static final String ZOOKEEPER_PORT_UNIFICATION = "portUnification";
+static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "serverCnxnFactory";

Review comment:
   Should these static variables also be marked private?

##
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
##
@@ -36,11 +39,17 @@
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.InetSocketAddress;
 import java.util.Properties;
 
 public class ZooKeeperStateServer extends ZooKeeperServerMain {
 private static final Logger logger = 
LoggerFactory.getLogger(ZooKeeperStateServer.class);
 
+static final int MIN_PORT = 1024;
+static final int MAX_PORT = 65353;

Review comment:
   Should this value be `65535`, or is there a reason for the lower number?

##
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
##
@@ -198,6 +219,144 @@ public static ZooKeeperStateServer create(final 
NiFiProperties properties) throw
 zkProperties.load(bis);
 }
 
-return new ZooKeeperStateServer(zkProperties);
+return new ZooKeeperStateServer(reconcileProperties(properties, 
zkProperties));
+}
+
+/**
+ * Reconcile properties between the nifi.properties and 
zookeeper.properties (zoo.cfg) files. Most of the ZooKeeper server properties 
are derived from
+ * the zookeeper.properties file, while the TLS key/truststore properties 
are taken from nifi.properties.
+ * @param niFiProperties NiFiProperties file containing ZooKeeper client 
and TLS configuration
+ * @param zkProperties The zookeeper.properties file containing Zookeeper 
server configuration
+ * @return A reconciled QuorumPeerConfig which will include TLS properties 
set if they are available.
+ * @throws IOException If configuration files fail to parse.
+ * @throws ConfigException If secure configuration is not as expected. 
Check administration documentation.
+ */
+private static QuorumPeerConfig reconcileProperties(NiFiProperties 
niFiProperties, Properties zkProperties) throws IOException, ConfigException {
+QuorumPeerConfig peerConfig = new QuorumPeerConfig();
+peerConfig.parseProperties(zkProperties);
+
+final boolean niFiConfigIsSecure = 
isNiFiConfigSecureForZooKeeper(niFiProperties);
+final boolean zooKeeperConfigIsSecure = 
isZooKeeperConfigSecure(peerConfig);
+
+if (!zooKeeperConfigIsSecure && !niFiConfigIsSecure) {
+logger.info("{} property is set to false or is not present, and 
zookeeper.properties file does not contain secureClientPort property, so 
embedded ZooKeeper will be started without TLS.",

Review comment:
   Should this be a debug message instead of an info message?  Also 
recommend removing the trailing period character from the log message.

##
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/server/TestZooKeeperStateServerConfigurations.java
##
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.state.server;
+
+import 

[GitHub] [nifi] urbandan commented on a change in pull request #4730: NIFI-8095: Created StatelessNiFi Sink Connector and Source Connector.…

2021-01-20 Thread GitBox


urbandan commented on a change in pull request #4730:
URL: https://github.com/apache/nifi/pull/4730#discussion_r561128503



##
File path: 
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java
##
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.stateless.flow.DataflowTrigger;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.TriggerResult;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+
+public class StatelessNiFiSourceTask extends SourceTask {
+public static final String STATE_MAP_KEY = "task.index";
+private static final Logger logger = 
LoggerFactory.getLogger(StatelessNiFiSourceTask.class);
+
+private StatelessDataflow dataflow;
+private String outputPortName;
+private String topicName;
+private String topicNameAttribute;
+private TriggerResult triggerResult;
+private String keyAttributeName;
+private Pattern headerAttributeNamePattern;
+private long timeoutMillis;
+private String dataflowName;
+private long failureYieldExpiration = 0L;
+
+private final Map clusterStatePartitionMap = 
Collections.singletonMap(STATE_MAP_KEY, "CLUSTER");
+private Map localStatePartitionMap = new HashMap<>();
+private boolean primaryNodeOnly;
+private boolean primaryNodeTask;
+
+private final AtomicLong unacknowledgedRecords = new AtomicLong(0L);
+
+@Override
+public String version() {
+return StatelessKafkaConnectorUtil.getVersion();
+}
+
+@Override
+public void start(final Map properties) {
+logger.info("Starting Source Task with properties {}", 
StatelessKafkaConnectorUtil.getLoggableProperties(properties));
+
+final String timeout = 
properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, 
StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT);
+timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, 
TimeUnit.MILLISECONDS);
+
+topicName = properties.get(StatelessNiFiSourceConnector.TOPIC_NAME);
+topicNameAttribute = 
properties.get(StatelessNiFiSourceConnector.TOPIC_NAME_ATTRIBUTE);
+keyAttributeName = 
properties.get(StatelessNiFiSourceConnector.KEY_ATTRIBUTE);
+
+if (topicName == null && topicNameAttribute == null) {
+throw new ConfigException("Either the topic.name or 
topic.name.attribute configuration must be specified");
+}
+
+final String headerRegex = 
properties.get(StatelessNiFiSourceConnector.HEADER_REGEX);
+headerAttributeNamePattern = headerRegex == null ? null : 
Pattern.compile(headerRegex);
+
+dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);
+primaryNodeOnly = dataflow.isSourcePrimaryNodeOnly();
+
+// Determine the name of the Output Port to retrieve data from
+dataflowName = 
properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);
+outputPortName = 
properties.get(StatelessNiFiSourceConnector.OUTPUT_PORT_NAME);
+if (outputPortName == null) {
+final Set outputPorts = dataflow.getOutputPortNames();
+if (outputPorts.isEmpty()) {
+throw new ConfigException("The dataflow specified for <" + 

[GitHub] [nifi] markap14 commented on a change in pull request #4730: NIFI-8095: Created StatelessNiFi Sink Connector and Source Connector.…

2021-01-20 Thread GitBox


markap14 commented on a change in pull request #4730:
URL: https://github.com/apache/nifi/pull/4730#discussion_r561112193



##
File path: 
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java
##
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.stateless.flow.DataflowTrigger;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.TriggerResult;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+
+public class StatelessNiFiSourceTask extends SourceTask {
+public static final String STATE_MAP_KEY = "task.index";
+private static final Logger logger = 
LoggerFactory.getLogger(StatelessNiFiSourceTask.class);
+
+private StatelessDataflow dataflow;
+private String outputPortName;
+private String topicName;
+private String topicNameAttribute;
+private TriggerResult triggerResult;
+private String keyAttributeName;
+private Pattern headerAttributeNamePattern;
+private long timeoutMillis;
+private String dataflowName;
+private long failureYieldExpiration = 0L;
+
+private final Map clusterStatePartitionMap = 
Collections.singletonMap(STATE_MAP_KEY, "CLUSTER");
+private Map localStatePartitionMap = new HashMap<>();
+private boolean primaryNodeOnly;
+private boolean primaryNodeTask;
+
+private final AtomicLong unacknowledgedRecords = new AtomicLong(0L);
+
+@Override
+public String version() {
+return StatelessKafkaConnectorUtil.getVersion();
+}
+
+@Override
+public void start(final Map properties) {
+logger.info("Starting Source Task with properties {}", 
StatelessKafkaConnectorUtil.getLoggableProperties(properties));
+
+final String timeout = 
properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, 
StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT);
+timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, 
TimeUnit.MILLISECONDS);
+
+topicName = properties.get(StatelessNiFiSourceConnector.TOPIC_NAME);
+topicNameAttribute = 
properties.get(StatelessNiFiSourceConnector.TOPIC_NAME_ATTRIBUTE);
+keyAttributeName = 
properties.get(StatelessNiFiSourceConnector.KEY_ATTRIBUTE);
+
+if (topicName == null && topicNameAttribute == null) {
+throw new ConfigException("Either the topic.name or 
topic.name.attribute configuration must be specified");
+}
+
+final String headerRegex = 
properties.get(StatelessNiFiSourceConnector.HEADER_REGEX);
+headerAttributeNamePattern = headerRegex == null ? null : 
Pattern.compile(headerRegex);
+
+dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);
+primaryNodeOnly = dataflow.isSourcePrimaryNodeOnly();
+
+// Determine the name of the Output Port to retrieve data from
+dataflowName = 
properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);
+outputPortName = 
properties.get(StatelessNiFiSourceConnector.OUTPUT_PORT_NAME);
+if (outputPortName == null) {
+final Set outputPorts = dataflow.getOutputPortNames();
+if (outputPorts.isEmpty()) {
+throw new ConfigException("The dataflow specified for <" + 

[GitHub] [nifi] markap14 commented on a change in pull request #4730: NIFI-8095: Created StatelessNiFi Sink Connector and Source Connector.…

2021-01-20 Thread GitBox


markap14 commented on a change in pull request #4730:
URL: https://github.com/apache/nifi/pull/4730#discussion_r561109996



##
File path: 
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.java
##
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.stateless.flow.DataflowTrigger;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.TriggerResult;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+public class StatelessNiFiSinkTask extends SinkTask {
+private static final Logger logger = 
LoggerFactory.getLogger(StatelessNiFiSinkTask.class);
+
+private StatelessDataflow dataflow;
+private String inputPortName;
+private Set<String> failurePortNames;
+private long timeoutMillis;
+private Pattern headerNameRegex;
+private String headerNamePrefix;
+private int batchSize;
+private long batchBytes;
+private QueueSize queueSize;
+private String dataflowName;
+
+private long backoffMillis = 0L;
+private boolean lastTriggerSuccessful = true;
+private ExecutorService backgroundTriggerExecutor;
+
+@Override
+public String version() {
+return StatelessKafkaConnectorUtil.getVersion();
+}
+
+@Override
+public void start(final Map<String, String> properties) {
+logger.info("Starting Sink Task with properties {}", 
StatelessKafkaConnectorUtil.getLoggableProperties(properties));
+
+final String timeout = 
properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, 
StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT);
+timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, 
TimeUnit.MILLISECONDS);
+
+dataflowName = 
properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);
+
+final String regex = 
properties.get(StatelessNiFiSinkConnector.HEADERS_AS_ATTRIBUTES_REGEX);
+headerNameRegex = regex == null ? null : Pattern.compile(regex);
+headerNamePrefix = 
properties.getOrDefault(StatelessNiFiSinkConnector.HEADER_ATTRIBUTE_NAME_PREFIX,
 "");
+
+batchSize = 
Integer.parseInt(properties.getOrDefault(StatelessNiFiSinkConnector.BATCH_SIZE_COUNT,
 "0"));
+batchBytes = 
Long.parseLong(properties.getOrDefault(StatelessNiFiSinkConnector.BATCH_SIZE_BYTES,
 "0"));
+
+dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);
+
+// Determine input port name. If input port is explicitly set, use the 
value given. Otherwise, if only one port exists, use that. Otherwise, throw 
ConfigException.
+final String dataflowName = 
properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);
+inputPortName = 
properties.get(StatelessNiFiSinkConnector.INPUT_PORT_NAME);
+if (inputPortName == null) {
+final Set<String> inputPorts = dataflow.getInputPortNames();
+if (inputPorts.isEmpty()) {
+throw new ConfigException("The dataflow specified for <" + 
dataflowName + "> does not have an Input Port at the root level. Dataflows used 
for a Kafka Connect Sink Task "
++ "must have at least one Input Port at the root level.");

[GitHub] [nifi] urbandan commented on a change in pull request #4730: NIFI-8095: Created StatelessNiFi Sink Connector and Source Connector.…

2021-01-20 Thread GitBox


urbandan commented on a change in pull request #4730:
URL: https://github.com/apache/nifi/pull/4730#discussion_r561093484



##
File path: 
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.java
##
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.stateless.flow.DataflowTrigger;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.TriggerResult;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+public class StatelessNiFiSinkTask extends SinkTask {
+private static final Logger logger = 
LoggerFactory.getLogger(StatelessNiFiSinkTask.class);
+
+private StatelessDataflow dataflow;
+private String inputPortName;
+private Set<String> failurePortNames;
+private long timeoutMillis;
+private Pattern headerNameRegex;
+private String headerNamePrefix;
+private int batchSize;
+private long batchBytes;
+private QueueSize queueSize;
+private String dataflowName;
+
+private long backoffMillis = 0L;
+private boolean lastTriggerSuccessful = true;
+private ExecutorService backgroundTriggerExecutor;
+
+@Override
+public String version() {
+return StatelessKafkaConnectorUtil.getVersion();
+}
+
+@Override
+public void start(final Map<String, String> properties) {
+logger.info("Starting Sink Task with properties {}", 
StatelessKafkaConnectorUtil.getLoggableProperties(properties));
+
+final String timeout = 
properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, 
StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT);
+timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, 
TimeUnit.MILLISECONDS);
+
+dataflowName = 
properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);
+
+final String regex = 
properties.get(StatelessNiFiSinkConnector.HEADERS_AS_ATTRIBUTES_REGEX);
+headerNameRegex = regex == null ? null : Pattern.compile(regex);
+headerNamePrefix = 
properties.getOrDefault(StatelessNiFiSinkConnector.HEADER_ATTRIBUTE_NAME_PREFIX,
 "");
+
+batchSize = 
Integer.parseInt(properties.getOrDefault(StatelessNiFiSinkConnector.BATCH_SIZE_COUNT,
 "0"));
+batchBytes = 
Long.parseLong(properties.getOrDefault(StatelessNiFiSinkConnector.BATCH_SIZE_BYTES,
 "0"));
+
+dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);
+
+// Determine input port name. If input port is explicitly set, use the 
value given. Otherwise, if only one port exists, use that. Otherwise, throw 
ConfigException.
+final String dataflowName = 
properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);
+inputPortName = 
properties.get(StatelessNiFiSinkConnector.INPUT_PORT_NAME);
+if (inputPortName == null) {
+final Set<String> inputPorts = dataflow.getInputPortNames();
+if (inputPorts.isEmpty()) {
+throw new ConfigException("The dataflow specified for <" + 
dataflowName + "> does not have an Input Port at the root level. Dataflows used 
for a Kafka Connect Sink Task "
++ "must have at least one Input Port at the root level.");

[GitHub] [nifi] exceptionfactory commented on pull request #4768: NIFI-8155 - add banner text in page title

2021-01-20 Thread GitBox


exceptionfactory commented on pull request #4768:
URL: https://github.com/apache/nifi/pull/4768#issuecomment-763755846


   This change looks straightforward as it stands, but there are several other 
places where the `document.title` is set when opening new windows, such as 
Provenance contents or Bulletin Board messages.  Should those other references 
also be updated?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] thenatog commented on pull request #4753: NIFI-7356 - Enable TLS for embedded Zookeeper when NiFi has TLS enabled

2021-01-20 Thread GitBox


thenatog commented on pull request #4753:
URL: https://github.com/apache/nifi/pull/4753#issuecomment-763753967


   Addressing your first two points:
   
   > - TLS is required for the embedded ZK when cluster TLS is enabled but NiFi 
won't try to connect securely unless nifi.zookeeper.client.secure is set to 
true in nifi.properties.
   > - Similarly, the embedded ZK won't actually run with TLS enabled unless 
secureClientPort is set in zookeeper.properties. It appears that clientPort is 
successfully removed but secureClientPort doesn't get added.
   
   I have put in configuration logic that will stop NiFi from starting if secureClientPort is configured in zookeeper.properties but nifi.zookeeper.client.secure=false. In that case, the user will need to configure zookeeper.properties with a clientPort value instead.
   
   When starting securely, the intent is to remove any additional clientPort that may allow insecure connections. However, secureClientPort will not be added automatically if only clientPort was set while everything else is secure; the user will need to manually edit the zookeeper.properties file to set secureClientPort.
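   
   To make the fail-fast behaviour concrete, here is a minimal sketch of that check, assuming hypothetical class and method names and that both property files have already been loaded (this is not the actual NiFi startup code):
   
   ```java
   import java.util.Properties;
   
   class ZooKeeperTlsConfigCheckSketch {
       // Refuse to start when zookeeper.properties asks for a secure port but the
       // NiFi client side is not configured to connect securely.
       static void validate(final Properties nifiProperties, final Properties zookeeperProperties) {
           final boolean clientSecure = Boolean.parseBoolean(
                   nifiProperties.getProperty("nifi.zookeeper.client.secure", "false"));
           final boolean securePortConfigured = zookeeperProperties.getProperty("secureClientPort") != null;
           if (securePortConfigured && !clientSecure) {
               throw new IllegalStateException("secureClientPort is set in zookeeper.properties but "
                       + "nifi.zookeeper.client.secure=false; configure clientPort instead or enable the secure client.");
           }
       }
   }
   ```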
   
   Let me know what you think of the latest set of commits. 
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] markap14 commented on a change in pull request #4746: NIFI-8034: Fixed PropertyValue.isExpressionLanguagePresent always ret…

2021-01-20 Thread GitBox


markap14 commented on a change in pull request #4746:
URL: https://github.com/apache/nifi/pull/4746#discussion_r561092480



##
File path: 
nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestStandardPreparedQuery.java
##
@@ -309,6 +309,24 @@ public void testVariableImpacted() {
 
assertTrue(Query.prepare("${anyMatchingAttribute('a.*'):equals('hello')}").getVariableImpact().isImpacted("attr"));
 }
 
+@Test
+public void testIsExpressionLanguagePresent() {
+assertFalse(Query.prepare("value").isExpressionLanguagePresent());
+assertFalse(Query.prepare("").isExpressionLanguagePresent());
+
+assertTrue(Query.prepare("${variable}").isExpressionLanguagePresent());
+
assertTrue(Query.prepare("${hostname()}").isExpressionLanguagePresent());
+
assertTrue(Query.prepare("${hostname():equals('localhost')}").isExpressionLanguagePresent());
+
assertTrue(Query.prepare("prefix-${hostname()}").isExpressionLanguagePresent());
+
assertTrue(Query.prepare("${hostname()}-suffix").isExpressionLanguagePresent());
+
assertTrue(Query.prepare("${variable1}${hostname()}${variable2}").isExpressionLanguagePresent());
+
assertTrue(Query.prepare("${${variable}}").isExpressionLanguagePresent());
+
+assertFalse(Query.prepare("${}").isExpressionLanguagePresent());
+
+assertTrue(Query.prepare("#{param}").isExpressionLanguagePresent());

Review comment:
   This should be false, not true. This is not expression language but 
rather a parameter reference. The value that will be made available to the 
component will be the fully resolved value, after substituting in the parameter.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] joewitt commented on pull request #4771: NIFI-8156 Fixed byte handling bug in cassandra.

2021-01-20 Thread GitBox


joewitt commented on pull request #4771:
URL: https://github.com/apache/nifi/pull/4771#issuecomment-763714975


   Ha!  Me too.  I trust it was broken and you've verified your fix ;)
   
   But yeah thanks for sharing and thanks for tackling.  Will monitor the 
build.  Tagged to 1.13 so I won't miss it.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] MikeThomsen commented on pull request #4771: NIFI-8156 Fixed byte handling bug in cassandra.

2021-01-20 Thread GitBox


MikeThomsen commented on pull request #4771:
URL: https://github.com/apache/nifi/pull/4771#issuecomment-763711741


   @joewitt cool, but I'm a believer in "trust, but verify" so here is a 
screenshot of my terminal showing queries against the table matching the steps 
I showed:
   
   https://user-images.githubusercontent.com/108184/105197394-93a81980-5b0a-11eb-9282-a1a2476c4c3c.png
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] joewitt commented on pull request #4771: NIFI-8156 Fixed byte handling bug in cassandra.

2021-01-20 Thread GitBox


joewitt commented on pull request #4771:
URL: https://github.com/apache/nifi/pull/4771#issuecomment-763708883


   I will simply review code and trust you validated it.  We're good if the 
build is good on this in my view.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] MikeThomsen edited a comment on pull request #4771: NIFI-8156 Fixed byte handling bug in cassandra.

2021-01-20 Thread GitBox


MikeThomsen edited a comment on pull request #4771:
URL: https://github.com/apache/nifi/pull/4771#issuecomment-763707997


   @joewitt here are test instructions:
   
   ```
   docker run -p 7000:7000 -p 9042:9042 --name cassandra -d cassandra:3
   ```
   
   ```
   CREATE KEYSPACE byte_test WITH replication = {'class':'SimpleStrategy', 
'replication_factor' : 3};
   use byte_test;
   create table binary_test (id text, data blob, primary key(id));
   ```
   
   
[Cassandra_Byte_Array_Test.xml.txt](https://github.com/apache/nifi/files/5843448/Cassandra_Byte_Array_Test.xml.txt)
   
   Once it's running, you can test with:
   
   ```
   select id, blobastext(data) from binary_test;
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] MikeThomsen commented on pull request #4771: NIFI-8156 Fixed byte handling bug in cassandra.

2021-01-20 Thread GitBox


MikeThomsen commented on pull request #4771:
URL: https://github.com/apache/nifi/pull/4771#issuecomment-763707997


   @joewitt here are test instructions:
   
   ```
   docker run -p 7000:7000 -p 9042:9042 --name cassandra -d cassandra:3
   ```
   
   ```
   CREATE KEYSPACE byte_test WITH replication = {'class':'SimpleStrategy', 
'replication_factor' : 3};
   use byte_test;
   create table binary_test (id text, data blob, primary key(id));
   ```
   
   
[Cassandra_Byte_Array_Test.xml.txt](https://github.com/apache/nifi/files/5843448/Cassandra_Byte_Array_Test.xml.txt)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] markap14 commented on a change in pull request #4760: NIFI-8142 Add "on conflict do nothing" feature to PutDatabaseRecord

2021-01-20 Thread GitBox


markap14 commented on a change in pull request #4760:
URL: https://github.com/apache/nifi/pull/4760#discussion_r561045998



##
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
##
@@ -301,13 +301,23 @@
 .name("put-db-record-max-batch-size")
 .displayName("Maximum Batch Size")
 .description("Specifies maximum batch size for INSERT and UPDATE 
statements. This parameter has no effect for other statements specified in 
'Statement Type'."
-+ " Zero means the batch size is not limited.")
++ " Zero means the batch size is not limited.")
 .defaultValue("0")
 .required(false)
 .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
 
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
 .build();
 
+static final PropertyDescriptor UPSERT_DO_NOTHING = new 
PropertyDescriptor.Builder()

Review comment:
    This feels odd to me. A property named "Upsert Do Nothing" is, I feel, confusing and misleading: it sounds like any upsert should be ignored and not acted upon, when in this case it is not really upserts that are being ignored but conflicting Inserts. What makes more sense to me is to add another option for the Statement Type, such as "INSERT_IGNORE" or something to that effect. That would also be consistent with how it is done in PutKudu.
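
    For reference, a hedged sketch of the SQL such a statement type could generate; the two dialect forms below are standard PostgreSQL and MySQL syntax, and the table/column names are made up rather than taken from this PR:

    ```java
    public class InsertIgnoreSketch {
        // Illustrative only: two common "insert, ignoring conflicts" dialect forms that an
        // INSERT_IGNORE statement type could translate to.
        static final String POSTGRES_STYLE =
                "INSERT INTO events (id, payload) VALUES (?, ?) ON CONFLICT DO NOTHING";
        static final String MYSQL_STYLE =
                "INSERT IGNORE INTO events (id, payload) VALUES (?, ?)";

        public static void main(final String[] args) {
            System.out.println(POSTGRES_STYLE);
            System.out.println(MYSQL_STYLE);
        }
    }
    ```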





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (NIFI-8156) PutCassandraRecord does not wrap byte arrays with a ByteBuffer, causing write failures

2021-01-20 Thread Joe Witt (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-8156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268641#comment-17268641
 ] 

Joe Witt commented on NIFI-8156:


pr is up.  i'll give it a look and assuming build is good get it merged before 
RC

> PutCassandraRecord does not wrap byte arrays with a ByteBuffer, causing write 
> failures
> --
>
> Key: NIFI-8156
> URL: https://issues.apache.org/jira/browse/NIFI-8156
> Project: Apache NiFi
>  Issue Type: Bug
>Affects Versions: 1.12.1
>Reporter: Mike Thomsen
>Assignee: Mike Thomsen
>Priority: Major
> Fix For: 1.13.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> As the subject line says, types that are meant for a Cassandra bytes field 
> are not wrapped inside of a ByteBuffer. This causes a write failure when the 
> Cassandra driver attempts to write the array.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (NIFI-8156) PutCassandraRecord does not wrap byte arrays with a ByteBuffer, causing write failures

2021-01-20 Thread Joe Witt (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-8156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Witt updated NIFI-8156:
---
Fix Version/s: 1.13.0

> PutCassandraRecord does not wrap byte arrays with a ByteBuffer, causing write 
> failures
> --
>
> Key: NIFI-8156
> URL: https://issues.apache.org/jira/browse/NIFI-8156
> Project: Apache NiFi
>  Issue Type: Bug
>Affects Versions: 1.12.1
>Reporter: Mike Thomsen
>Assignee: Mike Thomsen
>Priority: Major
> Fix For: 1.13.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> As the subject line says, types that are meant for a Cassandra bytes field 
> are not wrapped inside of a ByteBuffer. This causes a write failure when the 
> Cassandra driver attempts to write the array.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi] joewitt commented on pull request #4771: NIFI-8156 Fixed byte handling bug in cassandra.

2021-01-20 Thread GitBox


joewitt commented on pull request #4771:
URL: https://github.com/apache/nifi/pull/4771#issuecomment-763703079


   assuming build is happy i'll merge



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] MikeThomsen opened a new pull request #4771: NIFI-8156 Fixed byte handling bug in cassandra.

2021-01-20 Thread GitBox


MikeThomsen opened a new pull request #4771:
URL: https://github.com/apache/nifi/pull/4771


   Thank you for submitting a contribution to Apache NiFi.
   
   Please provide a short description of the PR here:
   
    Description of PR
   
   _Enables X functionality; fixes bug NIFI-._
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
in the commit message?
   
   - [ ] Does your PR title start with **NIFI-** where  is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.
   
   - [ ] Has your PR been rebased against the latest commit within the target 
branch (typically `main`)?
   
   - [ ] Is your initial contribution a single, squashed commit? _Additional 
commits in response to PR reviewer feedback should be made on this branch and 
pushed to allow change tracking. Do not `squash` or use `--force` when pushing 
to allow for clean monitoring of changes._
   
   ### For code changes:
   - [ ] Have you ensured that the full suite of tests is executed via `mvn 
-Pcontrib-check clean install` at the root `nifi` folder?
   - [ ] Have you written or updated unit tests to verify your changes?
   - [ ] Have you verified that the full build is successful on JDK 8?
   - [ ] Have you verified that the full build is successful on JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
   - [ ] If applicable, have you updated the `LICENSE` file, including the main 
`LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main 
`NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to 
.name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which 
it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for 
build issues and submit an update to your PR as soon as possible.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #955: MINIFICPP-1414 Create in-memory compressed logs

2021-01-20 Thread GitBox


adamdebreceni commented on a change in pull request #955:
URL: https://github.com/apache/nifi-minifi-cpp/pull/955#discussion_r561038296



##
File path: libminifi/test/unit/LoggerTests.cpp
##
@@ -107,3 +110,71 @@ TEST_CASE("Test ShortenNames", "[ttl6]") {
   LogTestController::getInstance(props)->reset();
   LogTestController::getInstance().reset();
 }
+
+using namespace minifi::io;
+
+std::string decompress(const std::shared_ptr& input) {
+  auto output = utils::make_unique();
+  auto decompressor = 
std::make_shared(gsl::make_not_null(output.get()));
+  minifi::internal::pipe(input, decompressor);
+  decompressor->close();
+  return std::string{reinterpret_cast<const char*>(output->getBuffer()), output->size()};
+}
+
+TEST_CASE("Test Compression", "[ttl7]") {
+  auto& log_config = logging::LoggerConfiguration::getConfiguration();
+  auto properties = std::make_shared();
+  std::string className;
+  SECTION("Using root logger") {
+className = "CompressionTestClassUsingRoot";
+// by default the root logger is OFF
+properties->set("logger.root", "INFO");
+  }
+  SECTION("Inherit compression sink") {
+className = "CompressionTestClassInheriting";
+properties->set("appender.null", "null");
+properties->set("logger." + className, "INFO,null");
+  }
+  log_config.initialize(properties);
+  auto logger = log_config.getLogger(className);
+  logger->log_error("Hi there");
+  std::shared_ptr 
compressed_log{logging::LoggerConfiguration::getCompressedLog(true)};
+  REQUIRE(compressed_log);
+  auto logs = decompress(compressed_log);
+  REQUIRE(logs.find("Hi there") != std::string::npos);

Review comment:
   actually it turns out `logs == "Hi there"` so I changed it to that





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #955: MINIFICPP-1414 Create in-memory compressed logs

2021-01-20 Thread GitBox


adamdebreceni commented on a change in pull request #955:
URL: https://github.com/apache/nifi-minifi-cpp/pull/955#discussion_r561037947



##
File path: libminifi/include/core/logging/internal/LogBuffer.h
##
@@ -0,0 +1,62 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+
+#include "io/BufferStream.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+namespace internal {
+
+class LogBuffer {
+ public:
+  LogBuffer() = default;
+  explicit LogBuffer(std::unique_ptr buffer): 
buffer_{std::move(buffer)} {}
+
+  static LogBuffer allocate(size_t max_size) {
+LogBuffer instance{utils::make_unique()};
+instance.buffer_->reserve(max_size * 3 / 2);

Review comment:
   done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #955: MINIFICPP-1414 Create in-memory compressed logs

2021-01-20 Thread GitBox


adamdebreceni commented on a change in pull request #955:
URL: https://github.com/apache/nifi-minifi-cpp/pull/955#discussion_r561037821



##
File path: libminifi/src/core/logging/LoggerConfiguration.cpp
##
@@ -85,6 +108,7 @@ LoggerConfiguration::LoggerConfiguration()
 void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &logger_properties) {
   std::lock_guard<std::mutex> lock(mutex);
   root_namespace_ = initialize_namespaces(logger_properties);
+  initializeCompression(lock, logger_properties);

Review comment:
   setting any of the compression specific limits to 0 now results in it 
being completely disabled (so it is opt-out)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #955: MINIFICPP-1414 Create in-memory compressed logs

2021-01-20 Thread GitBox


adamdebreceni commented on a change in pull request #955:
URL: https://github.com/apache/nifi-minifi-cpp/pull/955#discussion_r561037369



##
File path: libminifi/include/io/BufferStream.h
##
@@ -42,6 +42,14 @@ class BufferStream : public BaseStream {
 write(reinterpret_cast(data.c_str()), data.length());
   }
 
+  /*
+   * prepares the stream to accept an additional byte_count bytes
+   * @param byte_count number of bytes we expect to write
+   */
+  void reserve(size_t byte_count) {

Review comment:
   renamed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #955: MINIFICPP-1414 Create in-memory compressed logs

2021-01-20 Thread GitBox


adamdebreceni commented on a change in pull request #955:
URL: https://github.com/apache/nifi-minifi-cpp/pull/955#discussion_r561011823



##
File path: libminifi/include/core/logging/internal/LogBuffer.h
##
@@ -0,0 +1,62 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+
+#include "io/BufferStream.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+namespace internal {
+
+class LogBuffer {
+ public:
+  LogBuffer() = default;
+  explicit LogBuffer(std::unique_ptr buffer): 
buffer_{std::move(buffer)} {}
+
+  static LogBuffer allocate(size_t max_size) {
+LogBuffer instance{utils::make_unique()};
+instance.buffer_->reserve(max_size * 3 / 2);

Review comment:
   I am adding the comment to `StagingQueue` and moving the multiplication 
there





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (NIFI-8156) PutCassandraRecord does not wrap byte arrays with a ByteBuffer, causing write failures

2021-01-20 Thread Mike Thomsen (Jira)
Mike Thomsen created NIFI-8156:
--

 Summary: PutCassandraRecord does not wrap byte arrays with a 
ByteBuffer, causing write failures
 Key: NIFI-8156
 URL: https://issues.apache.org/jira/browse/NIFI-8156
 Project: Apache NiFi
  Issue Type: Bug
Affects Versions: 1.12.1
Reporter: Mike Thomsen
Assignee: Mike Thomsen


As the subject line says, types that are meant for a Cassandra bytes field are 
not wrapped inside of a ByteBuffer. This causes a write failure when the 
Cassandra driver attempts to write the array.
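
A minimal sketch of the fix direction, using a hypothetical coercion helper rather than the actual PutCassandraRecord code: the DataStax driver expects java.nio.ByteBuffer for blob columns, so raw byte[] values need to be wrapped before they are bound.

```java
import java.nio.ByteBuffer;

public class ByteBufferWrapSketch {
    // Hypothetical helper illustrating the wrapping step; not the processor's real code.
    static Object coerceForCassandra(final Object fieldValue) {
        if (fieldValue instanceof byte[]) {
            return ByteBuffer.wrap((byte[]) fieldValue);  // blob columns require a ByteBuffer
        }
        return fieldValue;
    }

    public static void main(final String[] args) {
        System.out.println(coerceForCassandra(new byte[]{1, 2, 3}) instanceof ByteBuffer);  // prints true
    }
}
```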



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #955: MINIFICPP-1414 Create in-memory compressed logs

2021-01-20 Thread GitBox


adamdebreceni commented on a change in pull request #955:
URL: https://github.com/apache/nifi-minifi-cpp/pull/955#discussion_r561008200



##
File path: libminifi/include/core/logging/internal/LogBuffer.h
##
@@ -0,0 +1,62 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+
+#include "io/BufferStream.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+namespace internal {
+
+class LogBuffer {
+ public:
+  LogBuffer() = default;
+  explicit LogBuffer(std::unique_ptr buffer): 
buffer_{std::move(buffer)} {}
+
+  static LogBuffer allocate(size_t max_size) {
+LogBuffer instance{utils::make_unique()};
+instance.buffer_->reserve(max_size * 3 / 2);

Review comment:
   `max_size` is a soft limit, i.e. reaching `max_size` is an indicator 
that that block should be compressed and committed (or for the compressed 
blocks to be rotated), we cannot guarantee that only `max_size` content is 
written to the buffer (not unless we plan on compressing in and blocking the 
logger thread), since `max_size` is the "trigger limit", presumably each block 
would contain (at the trigger point) a little more than `max_size` content





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #955: MINIFICPP-1414 Create in-memory compressed logs

2021-01-20 Thread GitBox


szaszm commented on a change in pull request #955:
URL: https://github.com/apache/nifi-minifi-cpp/pull/955#discussion_r561009640



##
File path: libminifi/include/core/logging/internal/LogBuffer.h
##
@@ -0,0 +1,62 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+
+#include "io/BufferStream.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+namespace internal {
+
+class LogBuffer {
+ public:
+  LogBuffer() = default;
+  explicit LogBuffer(std::unique_ptr buffer): 
buffer_{std::move(buffer)} {}
+
+  static LogBuffer allocate(size_t max_size) {
+LogBuffer instance{utils::make_unique()};
+instance.buffer_->reserve(max_size * 3 / 2);

Review comment:
   I see. I think this explanations would be nice to have in the code for 
future readers.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #955: MINIFICPP-1414 Create in-memory compressed logs

2021-01-20 Thread GitBox


adamdebreceni commented on a change in pull request #955:
URL: https://github.com/apache/nifi-minifi-cpp/pull/955#discussion_r561008200



##
File path: libminifi/include/core/logging/internal/LogBuffer.h
##
@@ -0,0 +1,62 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+
+#include "io/BufferStream.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+namespace internal {
+
+class LogBuffer {
+ public:
+  LogBuffer() = default;
+  explicit LogBuffer(std::unique_ptr buffer): 
buffer_{std::move(buffer)} {}
+
+  static LogBuffer allocate(size_t max_size) {
+LogBuffer instance{utils::make_unique()};
+instance.buffer_->reserve(max_size * 3 / 2);

Review comment:
   `max_size` is a soft limit, i.e. reaching `max_size` is an indicator 
that that block should be compressed and committed (or for the compressed 
blocks to be discarded), we cannot guarantee that only `max_size` content is 
written to the buffer (not unless we plan on compressing in and blocking the 
logger thread), since `max_size` is the "trigger limit", presumably each block 
would contain (at the trigger point) a little more than `max_size` content





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #955: MINIFICPP-1414 Create in-memory compressed logs

2021-01-20 Thread GitBox


adamdebreceni commented on a change in pull request #955:
URL: https://github.com/apache/nifi-minifi-cpp/pull/955#discussion_r561008200



##
File path: libminifi/include/core/logging/internal/LogBuffer.h
##
@@ -0,0 +1,62 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+
+#include "io/BufferStream.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+namespace internal {
+
+class LogBuffer {
+ public:
+  LogBuffer() = default;
+  explicit LogBuffer(std::unique_ptr buffer): 
buffer_{std::move(buffer)} {}
+
+  static LogBuffer allocate(size_t max_size) {
+LogBuffer instance{utils::make_unique()};
+instance.buffer_->reserve(max_size * 3 / 2);

Review comment:
   `max_size` is a soft limit, i.e. reaching `max_size` is an indicator 
that that block should be compressed and committed (or for the compressed 
blocks to be discarded), we cannot guarantee that only `max_size` content is 
written to the buffer (not unless we plan on compressing and blocking the 
logger thread), since `max_size` is the "trigger limit", presumably each block 
would contain (at the trigger point) a little more than `max_size` content





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #955: MINIFICPP-1414 Create in-memory compressed logs

2021-01-20 Thread GitBox


szaszm commented on a change in pull request #955:
URL: https://github.com/apache/nifi-minifi-cpp/pull/955#discussion_r560995043



##
File path: libminifi/src/core/logging/LoggerConfiguration.cpp
##
@@ -85,6 +108,7 @@ LoggerConfiguration::LoggerConfiguration()
 void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &logger_properties) {
   std::lock_guard<std::mutex> lock(mutex);
   root_namespace_ = initialize_namespaces(logger_properties);
+  initializeCompression(lock, logger_properties);

Review comment:
   If it can be implemented reasonably easily, then yes, it would be nice 
to be able to fully opt out from this feature.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #955: MINIFICPP-1414 Create in-memory compressed logs

2021-01-20 Thread GitBox


adamdebreceni commented on a change in pull request #955:
URL: https://github.com/apache/nifi-minifi-cpp/pull/955#discussion_r560993751



##
File path: libminifi/src/core/logging/LoggerConfiguration.cpp
##
@@ -85,6 +108,7 @@ LoggerConfiguration::LoggerConfiguration()
 void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &logger_properties) {
   std::lock_guard<std::mutex> lock(mutex);
   root_namespace_ = initialize_namespaces(logger_properties);
+  initializeCompression(lock, logger_properties);

Review comment:
   note that the maximum size of the in-memory logs (cached and compressed) 
is configurable; would you like the whole compression to be configurable as well?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #975: MINIFICPP-1400 Create ListS3 processor

2021-01-20 Thread GitBox


adamdebreceni commented on a change in pull request #975:
URL: https://github.com/apache/nifi-minifi-cpp/pull/975#discussion_r560987684



##
File path: extensions/aws/s3/S3Wrapper.cpp
##
@@ -30,46 +37,253 @@ namespace minifi {
 namespace aws {
 namespace s3 {
 
-minifi::utils::optional 
S3Wrapper::sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& 
request) {
-  Aws::S3::S3Client s3_client(credentials_, client_config_);
-  auto outcome = s3_client.PutObject(request);
+void HeadObjectResult::setFilePaths(const std::string& key) {
+  absolute_path = key;
+  std::tie(path, filename) = minifi::utils::file::FileUtils::split_path(key, 
true /*force_posix*/);
+}
+
+S3Wrapper::S3Wrapper() : 
request_sender_(minifi::utils::make_unique()) {
+}
+
+S3Wrapper::S3Wrapper(std::unique_ptr request_sender) : 
request_sender_(std::move(request_sender)) {
+}
+
+void S3Wrapper::setCredentials(const Aws::Auth::AWSCredentials& cred) {
+  request_sender_->setCredentials(cred);
+}
+
+void S3Wrapper::setRegion(const Aws::String& region) {
+  request_sender_->setRegion(region);
+}
+
+void S3Wrapper::setTimeout(uint64_t timeout) {
+  request_sender_->setTimeout(timeout);
+}
+
+void S3Wrapper::setEndpointOverrideUrl(const Aws::String& url) {
+  request_sender_->setEndpointOverrideUrl(url);
+}
+
+void S3Wrapper::setProxy(const ProxyOptions& proxy) {
+  request_sender_->setProxy(proxy);
+}
+
+void S3Wrapper::setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const 
std::string& canned_acl) const {
+  if (canned_acl.empty() || CANNED_ACL_MAP.find(canned_acl) == 
CANNED_ACL_MAP.end())
+return;
+
+  logger_->log_debug("Setting AWS canned ACL [%s]", canned_acl);
+  request.SetACL(CANNED_ACL_MAP.at(canned_acl));
+}
+
+Expiration S3Wrapper::getExpiration(const std::string& expiration) {
+  minifi::utils::Regex expr("expiry-date=\"(.*)\", rule-id=\"(.*)\"");
+  const auto match = expr.match(expiration);
+  const auto& results = expr.getResult();
+  if (!match || results.size() < 3)
+return Expiration{};
+  return Expiration{results[1], results[2]};
+}
+
+std::string 
S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption encryption) 
{
+  if (encryption == Aws::S3::Model::ServerSideEncryption::NOT_SET) {
+return "";
+  }
+
+  auto it = std::find_if(SERVER_SIDE_ENCRYPTION_MAP.begin(), 
SERVER_SIDE_ENCRYPTION_MAP.end(),
+[&](const std::pair pair) {
+  return pair.second == encryption;
+});
+  if (it != SERVER_SIDE_ENCRYPTION_MAP.end()) {
+return it->first;
+  }
+  return "";
+}
+
+minifi::utils::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, std::shared_ptr<Aws::IOStream> data_stream) {
+  Aws::S3::Model::PutObjectRequest request;
+  request.SetBucket(put_object_params.bucket);
+  request.SetKey(put_object_params.object_key);
+  
request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
+  
request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
+  request.SetContentType(put_object_params.content_type);
+  request.SetMetadata(put_object_params.user_metadata_map);
+  request.SetBody(data_stream);
+  request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
+  request.SetGrantRead(put_object_params.read_permission_user_list);
+  request.SetGrantReadACP(put_object_params.read_acl_user_list);
+  request.SetGrantWriteACP(put_object_params.write_acl_user_list);
+  setCannedAcl(request, put_object_params.canned_acl);
+
+  auto aws_result = request_sender_->sendPutObjectRequest(request);
+  if (!aws_result) {
+return minifi::utils::nullopt;
+  }
+
+  PutObjectResult result;
+  // Etags are returned by AWS in quoted form that should be removed
+  result.etag = 
minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
+  result.version = aws_result->GetVersionId();
+
+  // GetExpiration returns a string pair with a date and a ruleid in 
'expiry-date=\"\", rule-id=\"\"' format
+  // s3.expiration only needs the date member of this pair
+  result.expiration = 
getExpiration(aws_result->GetExpiration()).expiration_time;
+  result.ssealgorithm = 
getEncryptionString(aws_result->GetServerSideEncryption());
+  return result;
+}
+
+bool S3Wrapper::deleteObject(const std::string& bucket, const std::string& 
object_key, const std::string& version) {
+  Aws::S3::Model::DeleteObjectRequest request;
+  request.SetBucket(bucket);
+  request.SetKey(object_key);
+  if (!version.empty()) {
+request.SetVersionId(version);
+  }
+  return request_sender_->sendDeleteObjectRequest(request);
+}
+
+int64_t S3Wrapper::writeFetchedBody(Aws::IOStream& source, const int64_t 
data_size, const std::shared_ptr& output) {
+  static const uint64_t BUFFER_SIZE = 4096;
+  std::vector buffer;
+  buffer.reserve(BUFFER_SIZE);
 
-  if (outcome.IsSuccess()) {
-  logger_->log_info("Added S3 object '%s' to bucket '%s'", 
request.GetKey(), request.GetBucket());
-  return 

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

2021-01-20 Thread GitBox


szaszm commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r560987064



##
File path: extensions/librdkafka/rdkafka_utils.h
##
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/OptionalUtils.h"
+#include "rdkafka.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+enum class KafkaEncoding {
+  UTF8,
+  HEX
+};
+
+struct rd_kafka_conf_deleter {
+  void operator()(rd_kafka_conf_t* ptr) const noexcept { 
rd_kafka_conf_destroy(ptr); }
+};
+
+struct rd_kafka_producer_deleter {
+  void operator()(rd_kafka_t* ptr) const noexcept {
+rd_kafka_resp_err_t flush_ret = rd_kafka_flush(ptr, 1 /* ms */);  // 
Matching the wait time of KafkaConnection.cpp
+// If concerned, we could log potential errors here:
+// if (RD_KAFKA_RESP_ERR__TIMED_OUT == flush_ret) {
+//   std::cerr << "Deleting producer failed: time-out while trying to 
flush" << std::endl;
+// }
+rd_kafka_destroy(ptr);
+  }
+};
+
+struct rd_kafka_consumer_deleter {
+  void operator()(rd_kafka_t* ptr) const noexcept {
+rd_kafka_consumer_close(ptr);
+rd_kafka_destroy(ptr);
+  }
+};
+
+struct rd_kafka_topic_partition_list_deleter {
+  void operator()(rd_kafka_topic_partition_list_t* ptr) const noexcept { 
rd_kafka_topic_partition_list_destroy(ptr); }
+};
+
+struct rd_kafka_topic_conf_deleter {
+  void operator()(rd_kafka_topic_conf_t* ptr) const noexcept { 
rd_kafka_topic_conf_destroy(ptr); }
+};
+struct rd_kafka_topic_deleter {
+  void operator()(rd_kafka_topic_t* ptr) const noexcept { 
rd_kafka_topic_destroy(ptr); }
+};
+
+struct rd_kafka_message_deleter {
+  void operator()(rd_kafka_message_t* ptr) const noexcept { 
rd_kafka_message_destroy(ptr); }
+};
+
+struct rd_kafka_headers_deleter {
+  void operator()(rd_kafka_headers_t* ptr) const noexcept { 
rd_kafka_headers_destroy(ptr); }
+};
+
+template <typename T>
+void kafka_headers_for_each(const rd_kafka_headers_t* headers, T 
key_value_handle) {
+  const char *key;  // Null terminated, not to be freed
+  const void *value;
+  std::size_t size;
+  for (std::size_t i = 0; RD_KAFKA_RESP_ERR_NO_ERROR == rd_kafka_header_get_all(headers, i, &key, &value, &size); ++i) {
+    key_value_handle(std::string(key), std::string(static_cast<const char*>(value), size));

Review comment:
   In that case pass a span down. The point is that the copy is not 
necessary because the usage is fully enclosed in the loop body i.e. the 
lifetime of `*value`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #955: MINIFICPP-1414 Create in-memory compressed logs

2021-01-20 Thread GitBox


szaszm commented on a change in pull request #955:
URL: https://github.com/apache/nifi-minifi-cpp/pull/955#discussion_r558340027



##
File path: libminifi/src/core/logging/LoggerConfiguration.cpp
##
@@ -85,6 +108,7 @@ LoggerConfiguration::LoggerConfiguration()
 void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &logger_properties) {
   std::lock_guard<std::mutex> lock(mutex);
   root_namespace_ = initialize_namespaces(logger_properties);
+  initializeCompression(lock, logger_properties);

Review comment:
   I think this should be configurable. It may not be feasible to use 
memory to store logs in low memory environments or when lightweight operation 
is desired.

##
File path: libminifi/include/io/BufferStream.h
##
@@ -42,6 +42,14 @@ class BufferStream : public BaseStream {
 write(reinterpret_cast(data.c_str()), data.length());
   }
 
+  /*
+   * prepares the stream to accept an additional byte_count bytes
+   * @param byte_count number of bytes we expect to write
+   */
+  void reserve(size_t byte_count) {

Review comment:
   I would name this `extend` or similar to avoid confusion with STL 
`reserve` that takes the new capacity, not the difference.

##
File path: libminifi/include/core/logging/internal/LogBuffer.h
##
@@ -0,0 +1,62 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+
+#include "io/BufferStream.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+namespace internal {
+
+class LogBuffer {
+ public:
+  LogBuffer() = default;
+  explicit LogBuffer(std::unique_ptr buffer): 
buffer_{std::move(buffer)} {}
+
+  static LogBuffer allocate(size_t max_size) {
+LogBuffer instance{utils::make_unique()};
+instance.buffer_->reserve(max_size * 3 / 2);

Review comment:
   why is this multiplication?

##
File path: libminifi/test/unit/LoggerTests.cpp
##
@@ -107,3 +110,71 @@ TEST_CASE("Test ShortenNames", "[ttl6]") {
   LogTestController::getInstance(props)->reset();
   LogTestController::getInstance().reset();
 }
+
+using namespace minifi::io;
+
+std::string decompress(const std::shared_ptr& input) {
+  auto output = utils::make_unique();
+  auto decompressor = 
std::make_shared(gsl::make_not_null(output.get()));
+  minifi::internal::pipe(input, decompressor);
+  decompressor->close();
+  return std::string{reinterpret_cast<const char*>(output->getBuffer()), output->size()};
+}
+
+TEST_CASE("Test Compression", "[ttl7]") {
+  auto& log_config = logging::LoggerConfiguration::getConfiguration();
+  auto properties = std::make_shared();
+  std::string className;
+  SECTION("Using root logger") {
+className = "CompressionTestClassUsingRoot";
+// by default the root logger is OFF
+properties->set("logger.root", "INFO");
+  }
+  SECTION("Inherit compression sink") {
+className = "CompressionTestClassInheriting";
+properties->set("appender.null", "null");
+properties->set("logger." + className, "INFO,null");
+  }
+  log_config.initialize(properties);
+  auto logger = log_config.getLogger(className);
+  logger->log_error("Hi there");
+  std::shared_ptr 
compressed_log{logging::LoggerConfiguration::getCompressedLog(true)};
+  REQUIRE(compressed_log);
+  auto logs = decompress(compressed_log);
+  REQUIRE(logs.find("Hi there") != std::string::npos);

Review comment:
   Shouldn't this be equal or `StringUtils::endsWith`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #975: MINIFICPP-1400 Create ListS3 processor

2021-01-20 Thread GitBox


adamdebreceni commented on a change in pull request #975:
URL: https://github.com/apache/nifi-minifi-cpp/pull/975#discussion_r560982313



##
File path: extensions/aws/s3/S3Wrapper.cpp
##
@@ -30,46 +37,253 @@ namespace minifi {
 namespace aws {
 namespace s3 {
 
-minifi::utils::optional 
S3Wrapper::sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& 
request) {
-  Aws::S3::S3Client s3_client(credentials_, client_config_);
-  auto outcome = s3_client.PutObject(request);
+void HeadObjectResult::setFilePaths(const std::string& key) {
+  absolute_path = key;
+  std::tie(path, filename) = minifi::utils::file::FileUtils::split_path(key, 
true /*force_posix*/);
+}
+
+S3Wrapper::S3Wrapper() : 
request_sender_(minifi::utils::make_unique()) {
+}
+
+S3Wrapper::S3Wrapper(std::unique_ptr request_sender) : 
request_sender_(std::move(request_sender)) {
+}
+
+void S3Wrapper::setCredentials(const Aws::Auth::AWSCredentials& cred) {
+  request_sender_->setCredentials(cred);
+}
+
+void S3Wrapper::setRegion(const Aws::String& region) {
+  request_sender_->setRegion(region);
+}
+
+void S3Wrapper::setTimeout(uint64_t timeout) {
+  request_sender_->setTimeout(timeout);
+}
+
+void S3Wrapper::setEndpointOverrideUrl(const Aws::String& url) {
+  request_sender_->setEndpointOverrideUrl(url);
+}
+
+void S3Wrapper::setProxy(const ProxyOptions& proxy) {
+  request_sender_->setProxy(proxy);
+}
+
+void S3Wrapper::setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const 
std::string& canned_acl) const {
+  if (canned_acl.empty() || CANNED_ACL_MAP.find(canned_acl) == 
CANNED_ACL_MAP.end())
+return;
+
+  logger_->log_debug("Setting AWS canned ACL [%s]", canned_acl);
+  request.SetACL(CANNED_ACL_MAP.at(canned_acl));
+}
+
+Expiration S3Wrapper::getExpiration(const std::string& expiration) {
+  minifi::utils::Regex expr("expiry-date=\"(.*)\", rule-id=\"(.*)\"");
+  const auto match = expr.match(expiration);
+  const auto& results = expr.getResult();
+  if (!match || results.size() < 3)
+return Expiration{};
+  return Expiration{results[1], results[2]};
+}
+
+std::string 
S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption encryption) 
{
+  if (encryption == Aws::S3::Model::ServerSideEncryption::NOT_SET) {
+return "";
+  }
+
+  auto it = std::find_if(SERVER_SIDE_ENCRYPTION_MAP.begin(), 
SERVER_SIDE_ENCRYPTION_MAP.end(),
+[&](const std::pair pair) {
+  return pair.second == encryption;
+});
+  if (it != SERVER_SIDE_ENCRYPTION_MAP.end()) {
+return it->first;
+  }
+  return "";
+}
+
+minifi::utils::optional S3Wrapper::putObject(const 
PutObjectRequestParameters& put_object_params, std::shared_ptr 
data_stream) {
+  Aws::S3::Model::PutObjectRequest request;
+  request.SetBucket(put_object_params.bucket);
+  request.SetKey(put_object_params.object_key);
+  
request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
+  
request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
+  request.SetContentType(put_object_params.content_type);
+  request.SetMetadata(put_object_params.user_metadata_map);
+  request.SetBody(data_stream);
+  request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
+  request.SetGrantRead(put_object_params.read_permission_user_list);
+  request.SetGrantReadACP(put_object_params.read_acl_user_list);
+  request.SetGrantWriteACP(put_object_params.write_acl_user_list);
+  setCannedAcl(request, put_object_params.canned_acl);
+
+  auto aws_result = request_sender_->sendPutObjectRequest(request);
+  if (!aws_result) {
+return minifi::utils::nullopt;
+  }
+
+  PutObjectResult result;
+  // Etags are returned by AWS in quoted form that should be removed
+  result.etag = 
minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
+  result.version = aws_result->GetVersionId();
+
+  // GetExpiration returns a string pair with a date and a ruleid in 
'expiry-date=\"\", rule-id=\"\"' format
+  // s3.expiration only needs the date member of this pair
+  result.expiration = 
getExpiration(aws_result->GetExpiration()).expiration_time;
+  result.ssealgorithm = 
getEncryptionString(aws_result->GetServerSideEncryption());
+  return result;
+}
+
+bool S3Wrapper::deleteObject(const std::string& bucket, const std::string& 
object_key, const std::string& version) {
+  Aws::S3::Model::DeleteObjectRequest request;
+  request.SetBucket(bucket);
+  request.SetKey(object_key);
+  if (!version.empty()) {
+request.SetVersionId(version);
+  }
+  return request_sender_->sendDeleteObjectRequest(request);
+}
+
+int64_t S3Wrapper::writeFetchedBody(Aws::IOStream& source, const int64_t 
data_size, const std::shared_ptr& output) {
+  static const uint64_t BUFFER_SIZE = 4096;
+  std::vector buffer;
+  buffer.reserve(BUFFER_SIZE);
 
-  if (outcome.IsSuccess()) {
-  logger_->log_info("Added S3 object '%s' to bucket '%s'", 
request.GetKey(), request.GetBucket());
-  return 

[GitHub] [nifi] markap14 opened a new pull request #4770: NIFI-8146: Ensure that we close the Connection/Statement/PreparedStat…

2021-01-20 Thread GitBox


markap14 opened a new pull request #4770:
URL: https://github.com/apache/nifi/pull/4770


   …ement objects in finally blocks or try-with-resources
   
   Thank you for submitting a contribution to Apache NiFi.
   
   Please provide a short description of the PR here:
   
    Description of PR
   
   _Enables X functionality; fixes bug NIFI-._
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
in the commit message?
   
   - [ ] Does your PR title start with **NIFI-** where  is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.
   
   - [ ] Has your PR been rebased against the latest commit within the target 
branch (typically `main`)?
   
   - [ ] Is your initial contribution a single, squashed commit? _Additional 
commits in response to PR reviewer feedback should be made on this branch and 
pushed to allow change tracking. Do not `squash` or use `--force` when pushing 
to allow for clean monitoring of changes._
   
   ### For code changes:
   - [ ] Have you ensured that the full suite of tests is executed via `mvn 
-Pcontrib-check clean install` at the root `nifi` folder?
   - [ ] Have you written or updated unit tests to verify your changes?
   - [ ] Have you verified that the full build is successful on JDK 8?
   - [ ] Have you verified that the full build is successful on JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
   - [ ] If applicable, have you updated the `LICENSE` file, including the main 
`LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main 
`NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to 
.name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which 
it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for 
build issues and submit an update to your PR as soon as possible.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Reopened] (NIFI-8146) Allow RecordPath to be used for specifying operation type and data fields when using PutDatabaseRecord

2021-01-20 Thread Mark Payne (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-8146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mark Payne reopened NIFI-8146:
--

Re-opening issue because I realized that we don't always close the JDBC 
Statements / Connection objects.
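
For illustration only (not code from this change), a minimal sketch of the try-with-resources pattern that guarantees the JDBC objects are closed even when an exception is thrown; the DataSource, table and column names here are placeholders:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import javax.sql.DataSource;

    class JdbcCleanupSketch {
        // Both the Connection and the PreparedStatement are closed automatically
        // when the try block exits, whether normally or with an exception.
        static void insertName(final DataSource dataSource, final String name) throws SQLException {
            final String sql = "INSERT INTO example_table (name) VALUES (?)";
            try (Connection connection = dataSource.getConnection();
                 PreparedStatement statement = connection.prepareStatement(sql)) {
                statement.setString(1, name);
                statement.executeUpdate();
            }
        }
    }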

> Allow RecordPath to be used for specifying operation type and data fields 
> when using PutDatabaseRecord
> --
>
> Key: NIFI-8146
> URL: https://issues.apache.org/jira/browse/NIFI-8146
> Project: Apache NiFi
>  Issue Type: New Feature
>  Components: Extensions
>Reporter: Mark Payne
>Assignee: Mark Payne
>Priority: Major
> Fix For: 1.13.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> PutDatabaseRecord requires that the Statement Type be defined as a property or 
> a FlowFile attribute. This means that if a FlowFile has many records, it must 
> be split apart into individual Records if there is more than 1 type of 
> statement needed per FlowFile.
> It also assumes that the data to be inserted/updated/deleted/etc is the full 
> record. However, it's common to have some wrapper around the actual data, as 
> is the case with a tool like Debezium, which includes an Operation Type, a 
> 'before' snapshot and an 'after' snapshot. To accommodate this, we should 
> allow Record-friendly methods for specifying the path to the data and the 
> operation type. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #975: MINIFICPP-1400 Create ListS3 processor

2021-01-20 Thread GitBox


adamdebreceni commented on a change in pull request #975:
URL: https://github.com/apache/nifi-minifi-cpp/pull/975#discussion_r560970459



##
File path: extensions/aws/s3/S3Wrapper.cpp
##
@@ -30,46 +37,253 @@ namespace minifi {
 namespace aws {
 namespace s3 {
 
-minifi::utils::optional 
S3Wrapper::sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& 
request) {
-  Aws::S3::S3Client s3_client(credentials_, client_config_);
-  auto outcome = s3_client.PutObject(request);
+void HeadObjectResult::setFilePaths(const std::string& key) {
+  absolute_path = key;
+  std::tie(path, filename) = minifi::utils::file::FileUtils::split_path(key, 
true /*force_posix*/);
+}
+
+S3Wrapper::S3Wrapper() : 
request_sender_(minifi::utils::make_unique()) {
+}
+
+S3Wrapper::S3Wrapper(std::unique_ptr request_sender) : 
request_sender_(std::move(request_sender)) {
+}
+
+void S3Wrapper::setCredentials(const Aws::Auth::AWSCredentials& cred) {
+  request_sender_->setCredentials(cred);
+}
+
+void S3Wrapper::setRegion(const Aws::String& region) {
+  request_sender_->setRegion(region);
+}
+
+void S3Wrapper::setTimeout(uint64_t timeout) {
+  request_sender_->setTimeout(timeout);
+}
+
+void S3Wrapper::setEndpointOverrideUrl(const Aws::String& url) {
+  request_sender_->setEndpointOverrideUrl(url);
+}
+
+void S3Wrapper::setProxy(const ProxyOptions& proxy) {
+  request_sender_->setProxy(proxy);
+}
+
+void S3Wrapper::setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const 
std::string& canned_acl) const {
+  if (canned_acl.empty() || CANNED_ACL_MAP.find(canned_acl) == 
CANNED_ACL_MAP.end())
+return;
+
+  logger_->log_debug("Setting AWS canned ACL [%s]", canned_acl);
+  request.SetACL(CANNED_ACL_MAP.at(canned_acl));
+}
+
+Expiration S3Wrapper::getExpiration(const std::string& expiration) {
+  minifi::utils::Regex expr("expiry-date=\"(.*)\", rule-id=\"(.*)\"");
+  const auto match = expr.match(expiration);
+  const auto& results = expr.getResult();
+  if (!match || results.size() < 3)
+return Expiration{};
+  return Expiration{results[1], results[2]};
+}
+
+std::string 
S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption encryption) 
{
+  if (encryption == Aws::S3::Model::ServerSideEncryption::NOT_SET) {
+return "";
+  }
+
+  auto it = std::find_if(SERVER_SIDE_ENCRYPTION_MAP.begin(), 
SERVER_SIDE_ENCRYPTION_MAP.end(),
+[&](const std::pair pair) {
+  return pair.second == encryption;
+});
+  if (it != SERVER_SIDE_ENCRYPTION_MAP.end()) {
+return it->first;
+  }
+  return "";
+}
+
+minifi::utils::optional S3Wrapper::putObject(const 
PutObjectRequestParameters& put_object_params, std::shared_ptr 
data_stream) {
+  Aws::S3::Model::PutObjectRequest request;
+  request.SetBucket(put_object_params.bucket);
+  request.SetKey(put_object_params.object_key);
+  
request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
+  
request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
+  request.SetContentType(put_object_params.content_type);
+  request.SetMetadata(put_object_params.user_metadata_map);
+  request.SetBody(data_stream);
+  request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
+  request.SetGrantRead(put_object_params.read_permission_user_list);
+  request.SetGrantReadACP(put_object_params.read_acl_user_list);
+  request.SetGrantWriteACP(put_object_params.write_acl_user_list);
+  setCannedAcl(request, put_object_params.canned_acl);
+
+  auto aws_result = request_sender_->sendPutObjectRequest(request);
+  if (!aws_result) {
+return minifi::utils::nullopt;
+  }
+
+  PutObjectResult result;
+  // Etags are returned by AWS in quoted form that should be removed
+  result.etag = 
minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
+  result.version = aws_result->GetVersionId();
+
+  // GetExpiration returns a string pair with a date and a ruleid in 
'expiry-date=\"\", rule-id=\"\"' format
+  // s3.expiration only needs the date member of this pair
+  result.expiration = 
getExpiration(aws_result->GetExpiration()).expiration_time;
+  result.ssealgorithm = 
getEncryptionString(aws_result->GetServerSideEncryption());
+  return result;
+}
+
+bool S3Wrapper::deleteObject(const std::string& bucket, const std::string& 
object_key, const std::string& version) {
+  Aws::S3::Model::DeleteObjectRequest request;
+  request.SetBucket(bucket);
+  request.SetKey(object_key);
+  if (!version.empty()) {
+request.SetVersionId(version);
+  }
+  return request_sender_->sendDeleteObjectRequest(request);
+}
+
+int64_t S3Wrapper::writeFetchedBody(Aws::IOStream& source, const int64_t 
data_size, const std::shared_ptr& output) {
+  static const uint64_t BUFFER_SIZE = 4096;
+  std::vector buffer;
+  buffer.reserve(BUFFER_SIZE);
 
-  if (outcome.IsSuccess()) {
-  logger_->log_info("Added S3 object '%s' to bucket '%s'", 
request.GetKey(), request.GetBucket());
-  return 

[GitHub] [nifi] exceptionfactory commented on pull request #4734: NIFI-8023 Added toLocalDate() and updated toDate() in DataTypeUtils

2021-01-20 Thread GitBox


exceptionfactory commented on pull request #4734:
URL: https://github.com/apache/nifi/pull/4734#issuecomment-763605759


   @markap14 Thanks for the review and additional feedback, I pushed an update 
to address your comments.  There still appear to be some edge cases for some 
record-oriented processors when running NiFi in different time zones, based on 
running some test flows from @turcsanyip.  The PR as it stands addresses the 
test failures for certain time zones as described in NIFI-8023 through the use 
of the new `DataTypeUtils.toLocalDate()` method.
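
   As a standalone illustration of the underlying problem (this snippet is not part of the PR), the calendar date obtained from one and the same epoch instant depends on the time zone used to interpret it, which is why conversions that implicitly use the system default zone can shift the result by a day:

      import java.time.Instant;
      import java.time.LocalDate;
      import java.time.ZoneId;
      import java.time.ZoneOffset;

      class EpochToLocalDateSketch {
          public static void main(String[] args) {
              // 2021-01-01T00:30 UTC expressed as epoch milliseconds
              long epochMillis = Instant.parse("2021-01-01T00:30:00Z").toEpochMilli();

              LocalDate utcDate = Instant.ofEpochMilli(epochMillis)
                      .atZone(ZoneOffset.UTC).toLocalDate();                 // 2021-01-01
              LocalDate newYorkDate = Instant.ofEpochMilli(epochMillis)
                      .atZone(ZoneId.of("America/New_York")).toLocalDate();  // 2020-12-31

              System.out.println(utcDate + " vs " + newYorkDate);
          }
      }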
   
   Additional work is probably necessary to address system time zone issues 
when using some of the JSON Record Readers and Writers, but perhaps it would be 
better to create a new issue to resolve those problems.
   
   Any additional feedback @turcsanyip or @adenes?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (NIFI-8154) AvroParquetHDFSRecordReader fails to read parquet file containing nested structs

2021-01-20 Thread Pierre Villard (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268566#comment-17268566
 ] 

Pierre Villard commented on NIFI-8154:
--

Hi [~jonesgh],

Looks like just upgrading the dependency is causing errors in the unit tests. 
I'll need more time to revisit all of this when time permits. Feel free to 
submit a pull request if you have one that is working with the existing unit 
tests.

> AvroParquetHDFSRecordReader fails to read parquet file containing nested 
> structs
> 
>
> Key: NIFI-8154
> URL: https://issues.apache.org/jira/browse/NIFI-8154
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Affects Versions: 1.11.3, 1.12.1
>Reporter: Glenn Jones
>Assignee: Pierre Villard
>Priority: Minor
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> FetchParquet can't be used to process files containing nested structs.  When 
> trying to create a RecordSchema it runs into 
> https://issues.apache.org/jira/browse/PARQUET-1441, which causes it to fail.  
> We've patched this locally by building the nifi-parquet-processors with 
> parquet-avro 1.11.0, but it would be great if this made it into the next 
> release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (NIFI-8154) AvroParquetHDFSRecordReader fails to read parquet file containing nested structs

2021-01-20 Thread Pierre Villard (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pierre Villard reassigned NIFI-8154:


Assignee: (was: Pierre Villard)

> AvroParquetHDFSRecordReader fails to read parquet file containing nested 
> structs
> 
>
> Key: NIFI-8154
> URL: https://issues.apache.org/jira/browse/NIFI-8154
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Affects Versions: 1.11.3, 1.12.1
>Reporter: Glenn Jones
>Priority: Minor
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> FetchParquet can't be used to process files containing nested structs.  When 
> trying to create a RecordSchema it runs into 
> https://issues.apache.org/jira/browse/PARQUET-1441, which causes it to fail.  
> We've patched this locally by building the nifi-parquet-processors with 
> parquet-avro 1.11.0, but it would be great if this made it into the next 
> release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi] exceptionfactory commented on a change in pull request #4734: NIFI-8023 Added toLocalDate() and updated toDate() in DataTypeUtils

2021-01-20 Thread GitBox


exceptionfactory commented on a change in pull request #4734:
URL: https://github.com/apache/nifi/pull/4734#discussion_r560952390



##
File path: 
nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
##
@@ -1085,6 +1180,31 @@ private static Object toEnum(Object value, EnumDataType 
dataType, String fieldNa
 throw new IllegalTypeConversionException("Cannot convert value [" + 
value + "] of type " + value.getClass() + " to Date for field " + fieldName);
 }
 
+private static Date parseDate(final String string, final DateFormat 
dateFormat) throws ParseException {
+// DateFormat.parse() creates java.util.Date with System Default Time 
Zone
+final java.util.Date parsed = dateFormat.parse(string);
+
+Instant parsedInstant = parsed.toInstant();
+if (isTimeZoneAdjustmentRequired(dateFormat)) {
+// Adjust parsed date using System Default Time Zone offset 
milliseconds when time zone format not found
+parsedInstant = 
parsedInstant.minus(TimeZone.getDefault().getRawOffset(), ChronoUnit.MILLIS);
+}
+
+return new Date(parsedInstant.toEpochMilli());
+}
+
+private static boolean isTimeZoneAdjustmentRequired(final DateFormat 
dateFormat) {
+boolean adjustmentRequired = false;
+
+if (dateFormat instanceof SimpleDateFormat) {
+final SimpleDateFormat simpleDateFormat = (SimpleDateFormat) 
dateFormat;
+final String pattern = simpleDateFormat.toPattern();
+adjustmentRequired = !pattern.contains(TIME_ZONE_PATTERN);

Review comment:
   Thanks, I adjusted the check to use a regular expression pattern for all 
three characters.
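
   A minimal sketch of that kind of check (not the PR code; the assumption here is that the three time zone pattern letters in question are `z`, `Z` and `X`):

      import java.text.SimpleDateFormat;
      import java.util.regex.Pattern;

      class TimeZonePatternSketch {
          // Assumed set of SimpleDateFormat letters that carry time zone information
          private static final Pattern TIME_ZONE_LETTERS = Pattern.compile("[zZX]");

          static boolean containsTimeZonePattern(final SimpleDateFormat format) {
              // When none of these letters appears in the pattern, the parsed value was
              // interpreted in the system default time zone and may need adjusting afterwards
              return TIME_ZONE_LETTERS.matcher(format.toPattern()).find();
          }
      }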





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (NIFI-8154) AvroParquetHDFSRecordReader fails to read parquet file containing nested structs

2021-01-20 Thread Pierre Villard (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pierre Villard updated NIFI-8154:
-
Status: Open  (was: Patch Available)

> AvroParquetHDFSRecordReader fails to read parquet file containing nested 
> structs
> 
>
> Key: NIFI-8154
> URL: https://issues.apache.org/jira/browse/NIFI-8154
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Affects Versions: 1.12.1, 1.11.3
>Reporter: Glenn Jones
>Assignee: Pierre Villard
>Priority: Minor
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> FetchParquet can't be used to process files containing nested structs.  When 
> trying to create a RecordSchema it runs into 
> https://issues.apache.org/jira/browse/PARQUET-1441, which causes it to fail.  
> We've patched this locally by building the nifi-parquet-processors with 
> parquet-avro 1.11.0, but it would be great if this made it into the next 
> release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi] pvillard31 commented on pull request #4769: NIFI-8154 - upgrade parquet-avro from 1.10.0 to 1.11.1

2021-01-20 Thread GitBox


pvillard31 commented on pull request #4769:
URL: https://github.com/apache/nifi/pull/4769#issuecomment-763599719


   Tests are failing, will need to revisit when time permits.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] pvillard31 closed pull request #4769: NIFI-8154 - upgrade parquet-avro from 1.10.0 to 1.11.1

2021-01-20 Thread GitBox


pvillard31 closed pull request #4769:
URL: https://github.com/apache/nifi/pull/4769


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] exceptionfactory commented on a change in pull request #4734: NIFI-8023 Added toLocalDate() and updated toDate() in DataTypeUtils

2021-01-20 Thread GitBox


exceptionfactory commented on a change in pull request #4734:
URL: https://github.com/apache/nifi/pull/4734#discussion_r560951664



##
File path: 
nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
##
@@ -1040,6 +1049,93 @@ private static Object toEnum(Object value, EnumDataType 
dataType, String fieldNa
 throw new IllegalTypeConversionException("Cannot convert value " + 
value + " of type " + dataType.toString() + " for field " + fieldName);
 }
 
+/**
+ * Convert value to Local Date with support for conversion from numbers or 
formatted strings
+ *
+ * @param value Value to be converted
+ * @param formatter Supplier for Date Time Formatter; can be null for values 
other than numeric strings
+ * @param fieldName Field Name for value to be converted
+ * @return Local Date or null when value to be converted is null
+ * @throws IllegalTypeConversionException Thrown when conversion from 
string fails or unsupported value provided
+ */
+public static LocalDate toLocalDate(final Object value, final 
Supplier formatter, final String fieldName) {
+LocalDate localDate;
+
+if (value == null) {
+return null;
+} else if (value instanceof LocalDate) {
+localDate = (LocalDate) value;
+} else if (value instanceof java.util.Date) {
+final java.util.Date date = (java.util.Date) value;
+localDate = parseLocalDateEpochMillis(date.getTime());
+} else if (value instanceof Number) {
+final long epochMillis = ((Number) value).longValue();
+localDate = parseLocalDateEpochMillis(epochMillis);
+} else if (value instanceof String) {
+try {
+localDate = parseLocalDate((String) value, formatter);
+} catch (final RuntimeException e) {
+final String message = String.format("Failed Conversion of 
Field [%s] from String [%s] to LocalDate with Formatter [%s]", fieldName, 
value, formatter, e);

Review comment:
   Thanks for catching that detail, I have updated the call to use 
`formatter.get()` and check for `null` formatter.
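
   Roughly the shape of that null-safe handling (a hypothetical sketch, not the updated PR code):

      import java.time.LocalDate;
      import java.time.format.DateTimeFormatter;
      import java.util.function.Supplier;

      class LocalDateParseSketch {
          static LocalDate parse(final String value, final Supplier<DateTimeFormatter> formatterSupplier) {
              // Guard against both a null Supplier and a Supplier returning null
              final DateTimeFormatter formatter = formatterSupplier == null ? null : formatterSupplier.get();
              // Fall back to ISO-8601 (yyyy-MM-dd) parsing when no formatter is supplied
              return formatter == null ? LocalDate.parse(value) : LocalDate.parse(value, formatter);
          }
      }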





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (NIFI-7263) Add a No tracking Strategy to ListFile/ListFTP

2021-01-20 Thread humpfhumpf (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-7263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268565#comment-17268565
 ] 

humpfhumpf commented on NIFI-7263:
--

In our production case, files are available on the SFTP server in no particular 
chronological order. We want ListSFTP to list all available files, and 
FetchSFTP to move them to a remote directory after fetching them.

> Add a No tracking Strategy to ListFile/ListFTP
> --
>
> Key: NIFI-7263
> URL: https://issues.apache.org/jira/browse/NIFI-7263
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Jens M Kofoed
>Assignee: Waleed Al Aibani
>Priority: Major
>  Labels: ListFile, listftp
> Fix For: 1.13.0
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> The ListFile/ListFTP has 2 Listing Strategies: Tracking Timestamps and 
> Tracking Entities.
> It would be very very nice if the List processor also could have a No Tracking 
> (fix it yourself) strategy.
> If running NIFI in a cluster the List/Fetch is the perfect solution instead 
> of using a GetFile. But we have had many cases where files in the pickup 
> folder have old timestamps, so here we have to use Tracking Entities.
> The issue is in cases where you are not allowed to delete files but you have 
> to make a change to the file filter. The tracking entities start all over, 
> and list all files again.
> In other situations we need to resend all data, and would like to clear the 
> state of the Tracking Entities. But you can't.
> So I have to make a small flow for detecting duplicates. And in some cases 
> just ignore duplicates and in other cases open up for sending duplicates. But 
> it is a pain in the ... to use the Tracking Entities.
> So a NO STRATEGY would be very very nice.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #4755: NIFI-8133 Add ability to suppress null/empty values in ElasticSearchCl…

2021-01-20 Thread GitBox


ChrisSamo632 commented on a change in pull request #4755:
URL: https://github.com/apache/nifi/pull/4755#discussion_r560939470



##
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.groovy
##
@@ -175,6 +177,41 @@ class ElasticSearch5ClientService_IT {
 runner.assertValid()
 }
 
+@Test
+void testNullSuppression() {

Review comment:
   Happy for this thread to be resolved given you mention having run the 
integration tests below?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #975: MINIFICPP-1400 Create ListS3 processor

2021-01-20 Thread GitBox


adamdebreceni commented on a change in pull request #975:
URL: https://github.com/apache/nifi-minifi-cpp/pull/975#discussion_r560938789



##
File path: extensions/aws/processors/ListS3.cpp
##
@@ -0,0 +1,294 @@
+/**
+ * @file ListS3.cpp
+ * ListS3 class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListS3.h"
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace aws {
+namespace processors {
+
+const std::string ListS3::LATEST_LISTED_KEY_PREFIX = "listed_key.";
+const std::string ListS3::LATEST_LISTED_KEY_TIMESTAMP = "listed_key.timestamp";
+
+const core::Property ListS3::Delimiter(
+  core::PropertyBuilder::createProperty("Delimiter")
+->withDescription("The string used to delimit directories within the 
bucket. Please consult the AWS documentation for the correct use of this 
field.")
+->build());
+const core::Property ListS3::Prefix(
+  core::PropertyBuilder::createProperty("Prefix")
+->withDescription("The prefix used to filter the object list. In most 
cases, it should end with a forward slash ('/').")
+->build());
+const core::Property ListS3::UseVersions(
+  core::PropertyBuilder::createProperty("Use Versions")
+->isRequired(true)
+->withDefaultValue(false)
+->withDescription("Specifies whether to use S3 versions, if applicable. If 
false, only the latest version of each object will be returned.")
+->build());
+const core::Property ListS3::MinimumObjectAge(
+  core::PropertyBuilder::createProperty("Minimum Object Age")
+->isRequired(true)
+->withDefaultValue("0 sec")
+->withDescription("The minimum age that an S3 object must be in order to 
be considered; any object younger than this amount of time (according to last 
modification date) will be ignored.")
+->build());
+const core::Property ListS3::WriteObjectTags(
+  core::PropertyBuilder::createProperty("Write Object Tags")
+->isRequired(true)
+->withDefaultValue(false)
+->withDescription("If set to 'true', the tags associated with the S3 
object will be written as FlowFile attributes.")
+->build());
+const core::Property ListS3::WriteUserMetadata(
+  core::PropertyBuilder::createProperty("Write User Metadata")
+->isRequired(true)
+->withDefaultValue(false)
+->withDescription("If set to 'true', the user defined metadata associated 
with the S3 object will be added to FlowFile attributes/records.")
+->build());
+const core::Property ListS3::RequesterPays(
+  core::PropertyBuilder::createProperty("Requester Pays")
+->isRequired(true)
+->withDefaultValue(false)
+->withDescription("If true, indicates that the requester consents to pay 
any charges associated with listing the S3 bucket. This sets the 
'x-amz-request-payer' header to 'requester'. "
+  "Note that this setting is only used if Write User 
Metadata is true.")
+->build());
+
+const core::Relationship ListS3::Success("success", "FlowFiles are routed to 
success relationship");
+
+void ListS3::initialize() {
+  // Add new supported properties
+  updateSupportedProperties({Delimiter, Prefix, UseVersions, MinimumObjectAge, 
WriteObjectTags, WriteUserMetadata, RequesterPays});
+  // Set the supported relationships
+  setSupportedRelationships({Success});
+}
+
+void ListS3::onSchedule(const std::shared_ptr , 
const std::shared_ptr ) {
+  S3Processor::onSchedule(context, sessionFactory);
+
+  state_manager_ = context->getStateManager();
+  if (state_manager_ == nullptr) {
+throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  }
+
+  auto common_properties = getCommonELSupportedProperties(context, nullptr);
+  if (!common_properties) {
+throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Required property is not set 
or invalid");
+  }
+  configureS3Wrapper(common_properties.value());
+  list_request_params_.bucket = common_properties->bucket;
+
+  context->getProperty(Delimiter.getName(), list_request_params_.delimiter);
+  logger_->log_debug("ListS3: Delimiter [%s]", list_request_params_.delimiter);
+
+  context->getProperty(Prefix.getName(), 

[jira] [Updated] (NIFI-8154) AvroParquetHDFSRecordReader fails to read parquet file containing nested structs

2021-01-20 Thread Pierre Villard (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pierre Villard updated NIFI-8154:
-
Status: Patch Available  (was: Open)

> AvroParquetHDFSRecordReader fails to read parquet file containing nested 
> structs
> 
>
> Key: NIFI-8154
> URL: https://issues.apache.org/jira/browse/NIFI-8154
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Affects Versions: 1.12.1, 1.11.3
>Reporter: Glenn Jones
>Assignee: Pierre Villard
>Priority: Minor
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> FetchParquet can't be used to process files containing nested structs.  When 
> trying to create a RecordSchema it runs into 
> https://issues.apache.org/jira/browse/PARQUET-1441, which causes it to fail.  
> We've patched this locally by building the nifi-parquet-processors with 
> parquet-avro 1.11.0, but it would be great if this made it into the next 
> release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


  1   2   >