[GitHub] [nifi] pvillard31 edited a comment on pull request #4768: NIFI-8155 - add banner text in page title
pvillard31 edited a comment on pull request #4768:
URL: https://github.com/apache/nifi/pull/4768#issuecomment-764453581

Thanks for looking into this @exceptionfactory. As far as I can tell, the other places you mentioned do not really open new windows, so they do not affect the overall page title:
https://user-images.githubusercontent.com/11541012/105320361-801ab280-5bdf-11eb-8fc0-969aac536b0c.png

The only edge case I can think of is viewing the content of a flow file: this opens a new tab/window, and its title won't contain the banner text. But I believe this is a very special edge case, and I think it is best left outside of this change.
https://user-images.githubusercontent.com/11541012/105320689-e8699400-5bdf-11eb-84a7-dff5269efa25.png

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [nifi] pvillard31 commented on pull request #4768: NIFI-8155 - add banner text in page title
pvillard31 commented on pull request #4768:
URL: https://github.com/apache/nifi/pull/4768#issuecomment-764453581

Thanks for looking into this @exceptionfactory. As far as I can tell, the other places you mentioned do not really open new windows, so they do not affect the overall page title:
https://user-images.githubusercontent.com/11541012/105320361-801ab280-5bdf-11eb-8fc0-969aac536b0c.png

The only edge case I can think of is viewing the content of a flow file: this opens a new tab/window, and its title won't contain the banner text. But I believe this is a very special edge case, and I think it is best left outside of this change.
https://user-images.githubusercontent.com/11541012/105320626-d687f100-5bdf-11eb-9785-7dc48cb1da51.png
[jira] [Commented] (NIFI-7263) Add a No tracking Strategy to ListFile/ListFTP
[ https://issues.apache.org/jira/browse/NIFI-7263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17269099#comment-17269099 ] Jens M Kofoed commented on NIFI-7263:

I have two cases.

Case 1: We have two networks that are isolated from each other by a data diode, so one of the networks has no connection to the internet. From time to time we need to fetch a new driver or software from the internet and transfer it to the inside network. All transfer flows are handled by NiFi. In this case we have a drop folder from which NiFi moves all files to a similar folder on the inside. We use the List and Fetch processors instead of GetFile because we want the benefit of the cluster. Since we copy files to the drop folder manually, these files keep their original timestamps, which may be older than those of files already listed, so we cannot use the "Tracking Timestamps" strategy. With the "Tracking Entities" strategy, the "Entity Tracking Time Window" would need to be set to years.

Case 2: We have a file server where different systems write files into different subfolders. We use NiFi to synchronize all files, listing from the root folder with "Recurse Subdirectories" set to true. We are not allowed to delete files, so all files stay there permanently; therefore we can't use a GetFile processor. With the "Tracking Timestamps" strategy we have had a situation where a file was not picked up by NiFi. If there are many files when NiFi starts scanning all files/folders, and a new file is written to the first folder just after NiFi has looked in that folder, this file will not be in the listing. If another file is written to the last folder NiFi scans, that file will be in the listing, and it will have a newer timestamp. So on the next scan, the file written to the first folder will not be picked up, because it is older than the last recorded timestamp. Therefore we use the "Tracking Entities" strategy, which has another issue: if you are using a filter regex and change it, "Tracking Entities" starts all over again, listing all files. So we have built our own flow where we create a hash value from the path, filename, file size and timestamp, and check whether that hash has been seen before. We have had situations where some kinds of files needed to be transferred again. With our own detection flow we can route all duplicate hashes to a subflow and create a new route for special situations.

In both cases we really don't need any tracking strategy built into the ListFile processor. We just need it to list all files, regardless of timestamp.

> Add a No tracking Strategy to ListFile/ListFTP
>
> Key: NIFI-7263
> URL: https://issues.apache.org/jira/browse/NIFI-7263
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Jens M Kofoed
> Assignee: Waleed Al Aibani
> Priority: Major
> Labels: ListFile, listftp
> Fix For: 1.13.0
>
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> The ListFile/ListFTP processors have two Listing Strategies: Tracking Timestamps and Tracking Entities.
> It would be very, very nice if the List processors could also have a No Tracking ("fix it yourself") strategy.
> When running NiFi in a cluster, List/Fetch is the perfect solution instead of GetFile. But we have had many cases where files in the pickup folder have old timestamps, so we have to use Tracking Entities.
> The issue arises when you are not allowed to delete files but have to change the file filter: Tracking Entities starts all over and lists all files again.
> In other situations we need to resend all data and would like to clear the state of Tracking Entities. But you can't.
> So I have to build a small flow for detecting duplicates, and in some cases just ignore duplicates and in other cases allow sending duplicates. But it is a pain in the ... to use Tracking Entities.
> So a NO STRATEGY option would be very, very nice.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
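The duplicate-detection flow described in the comment (a hash over path, filename, file size and timestamp, checked against hashes already seen) can be sketched in plain Java. This is a minimal in-memory sketch: `ListingDeduplicator` is a hypothetical name, and in an actual flow the "seen" set would more likely live in a distributed cache (for example behind NiFi's DetectDuplicate processor) so that it survives restarts and is shared across the cluster.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper (not part of NiFi): fingerprints listed files and remembers which were already sent.
final class ListingDeduplicator {
    private final Set<String> seen = new HashSet<>();

    // SHA-256 over path, filename, size and last-modified time; a NUL separator keeps the fields unambiguous.
    static String fingerprint(String path, String filename, long size, long lastModifiedMillis) {
        String key = path + "\0" + filename + "\0" + size + "\0" + lastModifiedMillis;
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(key.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b)); // two lowercase hex digits per byte
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform implementation is required to support SHA-256
            throw new IllegalStateException(e);
        }
    }

    // Returns true the first time a fingerprint is offered and false for every duplicate.
    boolean isNew(String path, String filename, long size, long lastModifiedMillis) {
        return seen.add(fingerprint(path, filename, size, lastModifiedMillis));
    }

    // Forgets one fingerprint so the same file is treated as new again (deliberate resend).
    void forget(String path, String filename, long size, long lastModifiedMillis) {
        seen.remove(fingerprint(path, filename, size, lastModifiedMillis));
    }
}
```

Calling `forget` corresponds to the "resend" case from the issue: dropping a fingerprint lets the same file pass again, which the built-in Tracking Entities state does not allow.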
[jira] [Created] (NIFI-8158) Extensibility of NiFi itself
Maciej Gromuł created NIFI-8158:
Summary: Extensibility of NiFi itself
Key: NIFI-8158
URL: https://issues.apache.org/jira/browse/NIFI-8158
Project: Apache NiFi
Issue Type: Improvement
Reporter: Maciej Gromuł

It would be nice if it were possible to add custom modules to the application through NARs, loaded together with the application at first boot or through extensions. Currently, if we want to add anything, we need to recompile the whole nifi-framework-bundle, since it contains all resources without exposing any way of accessing them. This is not about custom processors, but about adding functionality to NiFi itself without having to fork the whole of NiFi and then manage updates between the fork and upstream. For example: a way to hook into the Spring context to create additional API endpoints, and a way to hook into the menu builder (which is currently just a static list in HTML wired up with JS) to add your own menu options with custom actions. That way we could, for example, add something like a marketplace for NiFi where users can search for processors in some kind of repository and install them dynamically (similar to marketplaces in CMS systems). Currently a few places in the application are theoretically behind interfaces, but the implementations are still hardcoded.
[jira] [Commented] (NIFI-8156) PutCassandraRecord does not wrap byte arrays with a ByteBuffer, causing write failures
[ https://issues.apache.org/jira/browse/NIFI-8156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17269086#comment-17269086 ] ASF subversion and git services commented on NIFI-8156:
Commit 953327cdf587c6b68765c0d32508873d8a0031e7 in nifi's branch refs/heads/main from Mike Thomsen
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=953327c ]
NIFI-8156 Fixed byte handling bug in cassandra.
Signed-off-by: Pierre Villard
This closes #4771.

> PutCassandraRecord does not wrap byte arrays with a ByteBuffer, causing write failures
>
> Key: NIFI-8156
> URL: https://issues.apache.org/jira/browse/NIFI-8156
> Project: Apache NiFi
> Issue Type: Bug
> Affects Versions: 1.12.1
> Reporter: Mike Thomsen
> Assignee: Mike Thomsen
> Priority: Major
> Fix For: 1.13.0
>
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> As the subject line says, values meant for a Cassandra bytes field are not wrapped inside a ByteBuffer. This causes a write failure when the Cassandra driver attempts to write the array.
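The DataStax Java driver maps Cassandra `blob` columns to `java.nio.ByteBuffer`, so a record value destined for such a column has to be wrapped before binding. A minimal sketch of that conversion follows; `CassandraBlobValues` is hypothetical, and treating an `Object[]` of `Byte` as binary is an assumption about how record readers may represent bytes, not a description of the committed fix.

```java
import java.nio.ByteBuffer;

// Hypothetical conversion step applied to a record value before binding it to a Cassandra blob column.
final class CassandraBlobValues {
    private CassandraBlobValues() {}

    static Object toDriverValue(Object value) {
        if (value instanceof byte[]) {
            // Wraps without copying; position 0, limit = array length
            return ByteBuffer.wrap((byte[]) value);
        }
        if (value instanceof Object[]) {
            Object[] elements = (Object[]) value;
            byte[] bytes = new byte[elements.length];
            for (int i = 0; i < elements.length; i++) {
                if (!(elements[i] instanceof Byte)) {
                    // Not a boxed byte array: leave it for other conversions (lists, sets, ...)
                    return value;
                }
                bytes[i] = (Byte) elements[i];
            }
            // Simplification: a real implementation would consult the target column type instead of guessing
            return ByteBuffer.wrap(bytes);
        }
        return value;
    }
}
```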
[jira] [Resolved] (NIFI-8156) PutCassandraRecord does not wrap byte arrays with a ByteBuffer, causing write failures
[ https://issues.apache.org/jira/browse/NIFI-8156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pierre Villard resolved NIFI-8156.
Resolution: Fixed

> PutCassandraRecord does not wrap byte arrays with a ByteBuffer, causing write failures
>
> Key: NIFI-8156
> URL: https://issues.apache.org/jira/browse/NIFI-8156
> Project: Apache NiFi
> Issue Type: Bug
> Affects Versions: 1.12.1
> Reporter: Mike Thomsen
> Assignee: Mike Thomsen
> Priority: Major
> Fix For: 1.13.0
>
> Time Spent: 1h 10m
> Remaining Estimate: 0h
[GitHub] [nifi] asfgit closed pull request #4771: NIFI-8156 Fixed byte handling bug in cassandra.
asfgit closed pull request #4771:
URL: https://github.com/apache/nifi/pull/4771
[jira] [Updated] (NIFI-8043) PutDatabaseRecord Postgres Upsert On Conflict keys not quoted
[ https://issues.apache.org/jira/browse/NIFI-8043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pierre Villard updated NIFI-8043:
Fix Version/s: 1.13.0
Resolution: Fixed
Status: Resolved (was: Patch Available)

> PutDatabaseRecord Postgres Upsert On Conflict keys not quoted
>
> Key: NIFI-8043
> URL: https://issues.apache.org/jira/browse/NIFI-8043
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Reporter: Daniel Cheung
> Assignee: Matt Burgess
> Priority: Major
> Fix For: 1.13.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> h2. First attempt with camel case (fails when translate field name is true or false)
> Given that "Quote Column Identifiers" is enabled, one would expect the column names inside the conflict clause to be quoted as well. However, they don't seem to be quoted: my table's column names contain upper- and lowercase letters, and the flowfile is routed to the failure relationship of the PutDatabaseRecord processor with the DB error: {{ERROR: column "camelcase" does not exist}}.
> Setting "Update Keys" or not did not affect the outcome. If I understand correctly, "Update Keys" would also affect the conflict clause, but it is also not quoted, and it does not accept a string with manually quoted column names.
> The SQL in question, found in the DB error in the log (simplified from what I saw):
> {{INSERT INTO "public"."my_table"("camelCase", "txt")}}
> {{VALUES ("test", "test")}}
> {{ON CONFLICT (CAMELCASE)}}
> {{DO UPDATE SET ("camelCase", "txt") = (}}
> {{ EXCLUDED."camelCase",}}
> {{ EXCLUDED."txt"}}
> {{)}}
> h2. Second attempt with snake case (fails when translate field name is true)
> I changed my column names to {{_snake_case, txt}}, tried upserting again, and it still failed with this SQL in nifi-app.log:
> {{INSERT INTO "public"."my_table"("_snake_case", "txt")}}
> {{VALUES ("test", "test")}}
> {{ON CONFLICT (SNAKECASE)}}
> {{DO UPDATE SET ("_snake_case", "txt") = (}}
> {{ EXCLUDED."_snake_case",}}
> {{ EXCLUDED."txt"}}
> {{)}}
>
> h2. Current workaround
> I currently need to *disable translate field name* and set my table to *use snake case names as column names* to be able to use upsert.
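Both failures are consistent with two name transformations stacking up. The log shows "Translate Field Names" turning `_snake_case` into `SNAKECASE` (underscores removed, upper-cased), and PostgreSQL folds unquoted identifiers to lower case, so an unquoted `CAMELCASE` is looked up as `camelcase`, which matches the reported error. The sketch below models both steps in plain Java; `UpsertIdentifierModel` is a hypothetical, simplified illustration, not NiFi's code.

```java
import java.util.Locale;

// Hypothetical model of the two identifier transformations involved in the failing UPSERT.
final class UpsertIdentifierModel {
    private UpsertIdentifierModel() {}

    // Mirrors the translation visible in the log: "_snake_case" -> "SNAKECASE".
    static String translate(String name) {
        return name.replace("_", "").toUpperCase(Locale.ROOT);
    }

    // How PostgreSQL resolves an identifier as written: quoted names are kept verbatim,
    // unquoted names fold to lower case. (Escaped "" inside quotes is ignored for brevity.)
    static String resolvePostgres(String identifierAsWritten) {
        if (identifierAsWritten.length() >= 2
                && identifierAsWritten.startsWith("\"")
                && identifierAsWritten.endsWith("\"")) {
            return identifierAsWritten.substring(1, identifierAsWritten.length() - 1);
        }
        return identifierAsWritten.toLowerCase(Locale.ROOT);
    }
}
```

Under this model, all-lowercase column names survive folding unchanged, which is why the reported workaround (snake case column names with translation disabled) lets the unquoted conflict key still match.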
[jira] [Commented] (NIFI-8043) PutDatabaseRecord Postgres Upsert On Conflict keys not quoted
[ https://issues.apache.org/jira/browse/NIFI-8043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17269085#comment-17269085 ] ASF subversion and git services commented on NIFI-8043:
Commit fb2a8b5820b816d7afddb5141cd064275267f796 in nifi's branch refs/heads/main from Matt Burgess
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=fb2a8b5 ]
NIFI-8043: Quote update key column names in PutDatabaseRecord
Signed-off-by: Pierre Villard
This closes #4772.

> PutDatabaseRecord Postgres Upsert On Conflict keys not quoted
>
> Key: NIFI-8043
> URL: https://issues.apache.org/jira/browse/NIFI-8043
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Reporter: Daniel Cheung
> Assignee: Matt Burgess
> Priority: Major
>
> Time Spent: 10m
> Remaining Estimate: 0h
[GitHub] [nifi] asfgit closed pull request #4772: NIFI-8043: Quote update key column names in PutDatabaseRecord
asfgit closed pull request #4772:
URL: https://github.com/apache/nifi/pull/4772
[GitHub] [nifi] pvillard31 commented on a change in pull request #4594: NIFI-3669 Add SSL Support to CaptureChangeMySQL
pvillard31 commented on a change in pull request #4594:
URL: https://github.com/apache/nifi/pull/4594#discussion_r561646469

## File path: nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
## @@ -368,6 +393,23 @@
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .displayName("SSL Context Service")
+            .description("SSL Context Service supporting encrypted socket communication")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+
+    public static final PropertyDescriptor SSL_MODE = new PropertyDescriptor.Builder()

Review comment: Do we want to use the new `.dependsOn` functionality to display this property only if the SSL Context Service is set?
[jira] [Created] (NIFI-8157) Adding support for other kinds of Parameter Contexts
Maciej Gromuł created NIFI-8157:
Summary: Adding support for other kinds of Parameter Contexts
Key: NIFI-8157
URL: https://issues.apache.org/jira/browse/NIFI-8157
Project: Apache NiFi
Issue Type: New Feature
Components: Extensions
Reporter: Maciej Gromuł

Currently there is only one implementation of Parameter Context: the standard parameter context. It would be nice to be able to provide NiFi with your own implementation. In our case, we have a service that leverages AWS KMS to store credentials and some additional data, but the current implementation neither supports connecting to it nor provides any way to add a custom Parameter Context. It would be nice if we could provide a custom implementation that NiFi would then use (serializing/deserializing it, etc.) and distribute to the nodes in cluster mode.
[jira] [Updated] (NIFI-8043) PutDatabaseRecord Postgres Upsert On Conflict keys not quoted
[ https://issues.apache.org/jira/browse/NIFI-8043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matt Burgess updated NIFI-8043:
Status: Patch Available (was: In Progress)

> PutDatabaseRecord Postgres Upsert On Conflict keys not quoted
>
> Key: NIFI-8043
> URL: https://issues.apache.org/jira/browse/NIFI-8043
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Reporter: Daniel Cheung
> Assignee: Matt Burgess
> Priority: Major
[jira] [Updated] (NIFI-8043) PutDatabaseRecord Postgres Upsert On Conflict keys not quoted
[ https://issues.apache.org/jira/browse/NIFI-8043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matt Burgess updated NIFI-8043:
Affects Version/s: (was: 1.12.0)

> PutDatabaseRecord Postgres Upsert On Conflict keys not quoted
>
> Key: NIFI-8043
> URL: https://issues.apache.org/jira/browse/NIFI-8043
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Reporter: Daniel Cheung
> Assignee: Matt Burgess
> Priority: Major
[GitHub] [nifi] mtien-apache commented on a change in pull request #4767: NIFI-1355 Implemented new methods in KeyStoreUtils to programmatical…
mtien-apache commented on a change in pull request #4767:
URL: https://github.com/apache/nifi/pull/4767#discussion_r561433596

## File path: nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/KeyStoreUtils.java
## @@ -245,7 +322,7 @@ public static TrustManagerFactory loadTrustManagerFactory(TlsConfiguration tlsCo
      */
     public static TrustManagerFactory loadTrustManagerFactory(String truststorePath, String truststorePassword, String truststoreType) throws TlsException {
         // Legacy truststore passwords can be empty
-        final char[] truststorePasswordChars = StringUtils.isNotBlank(truststorePassword) ? truststorePassword.toCharArray() : null;
+        final char[] truststorePasswordChars = StringUtils.isNotBlank(truststorePassword) ? truststorePassword.toCharArray() : "".toCharArray();

Review comment: @exceptionfactory I received a NullPointerException for an empty password when the truststore type is PKCS12, so I changed it to an empty string. But after some investigation, I found that the Bouncy Castle PKCS12 store type does not allow empty passwords. Since we allow passwordless truststores, I'll add a check for the truststore type: if it's PKCS12, I'll throw an IllegalArgumentException; otherwise I'll set it back to `null`.
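The plan in the reply (keep `null` for a blank password so passwordless truststores still load, but reject a blank PKCS12 password up front) could look roughly like this. `TruststorePasswords.toPasswordChars` is a hypothetical helper, not the method in `KeyStoreUtils`, and its blank check only approximates `StringUtils.isNotBlank`.

```java
// Hypothetical helper mirroring the approach described in the review reply.
final class TruststorePasswords {
    private TruststorePasswords() {}

    static char[] toPasswordChars(String truststorePassword, String truststoreType) {
        // Approximates StringUtils.isBlank: null, empty or whitespace-only
        boolean blank = truststorePassword == null || truststorePassword.trim().isEmpty();
        if (!blank) {
            return truststorePassword.toCharArray();
        }
        if ("PKCS12".equalsIgnoreCase(truststoreType)) {
            // Per the comment, the Bouncy Castle PKCS12 store type does not accept empty passwords
            throw new IllegalArgumentException("A PKCS12 truststore requires a non-empty password");
        }
        // Passwordless legacy truststores: KeyStore.load(stream, null) skips the integrity check
        return null;
    }
}
```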
[GitHub] [nifi] mattyb149 opened a new pull request #4772: NIFI-8043: Quote update key column names in PutDatabaseRecord
mattyb149 opened a new pull request #4772:
URL: https://github.com/apache/nifi/pull/4772

Thank you for submitting a contribution to Apache NiFi.

Please provide a short description of the PR here:

Description of PR

The "Quote Column Identifiers" property was not being honored for update keys gleaned from the table metadata. This PR quotes them (if configured to do so) before passing them to the DatabaseAdapter for UPSERT and UPDATE.

In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [x] Does your PR title start with **NIFI-** where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [x] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
- [x] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
- [x] Have you written or updated unit tests to verify your changes?
- [x] Have you verified that the full build is successful on JDK 8?
- [ ] Have you verified that the full build is successful on JDK 11?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
- [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
- [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
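The fix described in the PR, quoting the update-key names taken from table metadata when quoting is enabled, can be sketched as follows. `UpdateKeyQuoting` and its methods are hypothetical; the actual change passes the quoted names to NiFi's DatabaseAdapter, whose API is not reproduced here. Embedded double quotes are doubled, per standard SQL quoted-identifier rules.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: quote update-key column names before building an UPSERT conflict target.
final class UpdateKeyQuoting {
    private UpdateKeyQuoting() {}

    static List<String> quoteKeys(List<String> keyColumns, boolean quoteIdentifiers) {
        List<String> result = new ArrayList<>(keyColumns.size());
        for (String key : keyColumns) {
            // Inside a quoted identifier, a literal double quote is written as two double quotes
            result.add(quoteIdentifiers ? "\"" + key.replace("\"", "\"\"") + "\"" : key);
        }
        return result;
    }

    static String onConflictClause(List<String> keyColumns, boolean quoteIdentifiers) {
        return "ON CONFLICT (" + String.join(", ", quoteKeys(keyColumns, quoteIdentifiers)) + ")";
    }
}
```

Quoting only helps if the names passed in are the real column names (e.g. `camelCase` from the table metadata); quoting a translated name such as `CAMELCASE` would still not match the column.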
[jira] [Commented] (NIFI-8043) PutDatabaseRecord Postgres Upsert On Conflict keys not quoted
[ https://issues.apache.org/jira/browse/NIFI-8043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268952#comment-17268952 ] Matt Burgess commented on NIFI-8043:

The key column names don't get quoted in PutDatabaseRecord before passing them to the database adapter to generate an UPSERT statement.

> PutDatabaseRecord Postgres Upsert On Conflict keys not quoted
>
> Key: NIFI-8043
> URL: https://issues.apache.org/jira/browse/NIFI-8043
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Affects Versions: 1.12.0
> Reporter: Daniel Cheung
> Assignee: Matt Burgess
> Priority: Major
[jira] [Assigned] (NIFI-8043) PutDatabaseRecord Postgres Upsert On Conflict keys not quoted
[ https://issues.apache.org/jira/browse/NIFI-8043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matt Burgess reassigned NIFI-8043:
Assignee: Matt Burgess

> PutDatabaseRecord Postgres Upsert On Conflict keys not quoted
>
> Key: NIFI-8043
> URL: https://issues.apache.org/jira/browse/NIFI-8043
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Affects Versions: 1.12.0
> Reporter: Daniel Cheung
> Assignee: Matt Burgess
> Priority: Major
[GitHub] [nifi] thenatog commented on pull request #4753: NIFI-7356 - Enable TLS for embedded Zookeeper when NiFi has TLS enabled
thenatog commented on pull request #4753:
URL: https://github.com/apache/nifi/pull/4753#issuecomment-764042674

I've also updated the Admin Guide as requested above. Let me know if any further changes are required.
[GitHub] [nifi] turcsanyip commented on a change in pull request #4738: NIFI-7890 - Added record support to ConsumeMQTT processor
turcsanyip commented on a change in pull request #4738: URL: https://github.com/apache/nifi/pull/4738#discussion_r561332768 ## File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java ## @@ -334,14 +443,210 @@ public void process(final OutputStream out) throws IOException { if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) { logger.warn(new StringBuilder("FlowFile ") .append(messageFlowfile.getAttribute(CoreAttributes.UUID.key())) -.append(" for Mqtt message ") +.append(" for MQTT message ") .append(mqttMessage) .append(" had already been removed from queue, possible duplication of flow files") .toString()); } } } +private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){ +final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8); + +FlowFile messageFlowfile = session.create(); +session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker); + + +messageFlowfile = session.append(messageFlowfile, out -> { +while (!mqttQueue.isEmpty()) { +final MQTTQueueMessage mqttMessage = mqttQueue.poll(); +out.write(mqttMessage.getPayload()); +out.write(demarcator); +session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false); +} +}); + +session.getProvenanceReporter().receive(messageFlowfile, new StringBuilder(broker).append(topicPrefix).append(topicFilter).toString()); Review comment: There is no separator character between the broker and the topic prefix (e.g. `tcp://myhost:1883mytopic`). A `'/'` could be added before the topic prefix. It could be changed in the existing `transferQueue()` method too.
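Here is a small sketch of the suggested fix. It assumes the `broker`, `topicPrefix` and `topicFilter` values from the diff are plain strings and that `topicPrefix` may be null or empty. The helper name is hypothetical.

```java
// Hypothetical helper; the parameter names mirror the broker, topicPrefix and topicFilter
// values used in the diff, but this is not the processor's code.
final class TransitUriSketch {

    // Join the broker URI and the topic with a single '/' so the result reads
    // tcp://myhost:1883/mytopic rather than tcp://myhost:1883mytopic.
    static String transitUri(String broker, String topicPrefix, String topicFilter) {
        StringBuilder uri = new StringBuilder(broker);
        if (!broker.endsWith("/")) {
            uri.append('/');
        }
        if (topicPrefix != null) {
            uri.append(topicPrefix);
        }
        return uri.append(topicFilter).toString();
    }

    public static void main(String[] args) {
        System.out.println(transitUri("tcp://myhost:1883", null, "mytopic")); // tcp://myhost:1883/mytopic
    }
}
```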
## File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java ## @@ -334,14 +443,210 @@ public void process(final OutputStream out) throws IOException { if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) { logger.warn(new StringBuilder("FlowFile ") .append(messageFlowfile.getAttribute(CoreAttributes.UUID.key())) -.append(" for Mqtt message ") +.append(" for MQTT message ") .append(mqttMessage) .append(" had already been removed from queue, possible duplication of flow files") .toString()); } } } +private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){ +final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8); + +FlowFile messageFlowfile = session.create(); +session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker); + + +messageFlowfile = session.append(messageFlowfile, out -> { +while (!mqttQueue.isEmpty()) { Review comment: Emptying the queue seems somewhat non-deterministic to me, because the receiver thread is writing to the queue at the same time. Wouldn't it be useful to define a maximum number of messages that may be fetched in one go?
(a magic number or a processor property) ## File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java ## @@ -334,14 +443,210 @@ public void process(final OutputStream out) throws IOException { if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) { logger.warn(new StringBuilder("FlowFile ") .append(messageFlowfile.getAttribute(CoreAttributes.UUID.key())) -.append(" for Mqtt message ") +.append(" for MQTT message ") .append(mqttMessage) .append(" had already been removed from queue, possible duplication of flow files") .toString()); } } } +private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){ +final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8); + +FlowFile messageFlowfile = session.create(); +session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker); + + +messageFlowfile = session.append(messageFlowfile, out -> { +
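The suggested per-fetch limit could be implemented as follows. This sketch assumes that `mqttQueue` is a `java.util.concurrent.BlockingQueue`, which the `poll()` and `isEmpty()` calls suggest. `BlockingQueue.drainTo(Collection, int)` moves at most a given number of elements in a single call. Here, `byte[]` payloads stand in for `MQTTQueueMessage`, and `maxBatchSize` is a hypothetical constant or processor property.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch only: byte[] payloads stand in for MQTTQueueMessage, and maxBatchSize is a
// hypothetical setting (a constant or a new processor property).
final class BoundedDrainSketch {

    // Move at most maxBatchSize messages out of the queue. Messages that the receiver
    // thread enqueues afterwards stay in the queue for the next trigger.
    static List<byte[]> drainBatch(BlockingQueue<byte[]> queue, int maxBatchSize) {
        List<byte[]> batch = new ArrayList<>();
        queue.drainTo(batch, maxBatchSize);
        return batch;
    }

    // Write each payload followed by the demarcator, as the loop in the diff does.
    static void writeDemarcated(List<byte[]> batch, byte[] demarcator, OutputStream out) throws IOException {
        for (byte[] payload : batch) {
            out.write(payload);
            out.write(demarcator);
        }
    }

    public static void main(String[] args) throws IOException {
        BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
        for (String s : new String[] {"a", "b", "c", "d", "e"}) {
            queue.add(s.getBytes(StandardCharsets.UTF_8));
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeDemarcated(drainBatch(queue, 3), "\n".getBytes(StandardCharsets.UTF_8), out);
        System.out.print(new String(out.toByteArray(), StandardCharsets.UTF_8)); // a, b and c, one per line
        System.out.println(queue.size()); // 2
    }
}
```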
[jira] [Updated] (NIFI-8154) AvroParquetHDFSRecordReader fails to read parquet file containing nested structs
[ https://issues.apache.org/jira/browse/NIFI-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Glenn Jones updated NIFI-8154: -- Status: Patch Available (was: Open) > AvroParquetHDFSRecordReader fails to read parquet file containing nested > structs > > > Key: NIFI-8154 > URL: https://issues.apache.org/jira/browse/NIFI-8154 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.12.1, 1.11.3 >Reporter: Glenn Jones >Priority: Minor > Attachments: > 0001-NIFI-8154-upversion-parquet-avro-to-1.11.1-and-add-u.patch > > Time Spent: 0.5h > Remaining Estimate: 0h > > FetchParquet can't be used to process files containing nested structs. When > trying to create a RecordSchema it runs into > https://issues.apache.org/jira/browse/PARQUET-1441, which causes it to fail. > We've patched this locally by building the nifi-parquet-processors with > parquet-avro 1.11.0, but it would be great if this made it into the next > release.
[jira] [Updated] (NIFI-8154) AvroParquetHDFSRecordReader fails to read parquet file containing nested structs
[ https://issues.apache.org/jira/browse/NIFI-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Glenn Jones updated NIFI-8154: -- Attachment: 0001-NIFI-8154-upversion-parquet-avro-to-1.11.1-and-add-u.patch > AvroParquetHDFSRecordReader fails to read parquet file containing nested > structs > > > Key: NIFI-8154 > URL: https://issues.apache.org/jira/browse/NIFI-8154 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.11.3, 1.12.1 >Reporter: Glenn Jones >Priority: Minor > Attachments: > 0001-NIFI-8154-upversion-parquet-avro-to-1.11.1-and-add-u.patch > > Time Spent: 0.5h > Remaining Estimate: 0h > > FetchParquet can't be used to process files containing nested structs. When > trying to create a RecordSchema it runs into > https://issues.apache.org/jira/browse/PARQUET-1441, which causes it to fail. > We've patched this locally by building the nifi-parquet-processors with > parquet-avro 1.11.0, but it would be great if this made it into the next > release.
[jira] [Resolved] (NIFI-8150) Change "Download flow" context menu selection for PGs to "Download flow definition"
[ https://issues.apache.org/jira/browse/NIFI-8150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bryan Bende resolved NIFI-8150. --- Fix Version/s: 1.13.0 Resolution: Fixed > Change "Download flow" context menu selection for PGs to "Download flow > definition" > - > > Key: NIFI-8150 > URL: https://issues.apache.org/jira/browse/NIFI-8150 > Project: Apache NiFi > Issue Type: Improvement > Components: Core UI >Reporter: Andrew M. Lim >Assignee: Andrew M. Lim >Priority: Major > Fix For: 1.13.0 > > Time Spent: 40m > Remaining Estimate: 0h > > The current text that says "Download flow" is very ambiguous. Changing to > "Download flow definition" will better alert the user that the action > downloads: > A JSON file that is a flow definition of the process group and not to be > confused with a template, flow.xml.gz or a versioned flow. > The docs should be updated accordingly.
[GitHub] [nifi] bbende merged pull request #4766: NIFI-8150 Change Download flow to Download flow definition for proces…
bbende merged pull request #4766: URL: https://github.com/apache/nifi/pull/4766
[jira] [Commented] (NIFI-8150) Change "Download flow" context menu selection for PGs to "Download flow definition"
[ https://issues.apache.org/jira/browse/NIFI-8150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268870#comment-17268870 ] ASF subversion and git services commented on NIFI-8150: --- Commit 27f57e64635c9478b811886b6a0b207c5972d5ac in nifi's branch refs/heads/main from Andrew Lim [ https://gitbox.apache.org/repos/asf?p=nifi.git;h=27f57e6 ] NIFI-8150 Change Download flow to Download flow definition for process groups (#4766) > Change "Download flow" context menu selection for PGs to "Download flow > definition" > - > > Key: NIFI-8150 > URL: https://issues.apache.org/jira/browse/NIFI-8150 > Project: Apache NiFi > Issue Type: Improvement > Components: Core UI >Reporter: Andrew M. Lim >Assignee: Andrew M. Lim >Priority: Major > Time Spent: 0.5h > Remaining Estimate: 0h > > The current text that says "Download flow" is very ambiguous. Changing to > "Download flow definition" will better alert the user that the action > downloads: > A JSON file that is a flow definition of the process group and not to be > confused with a template, flow.xml.gz or a versioned flow. > The docs should be updated accordingly.
[GitHub] [nifi] bbende commented on pull request #4766: NIFI-8150 Change Download flow to Download flow definition for proces…
bbende commented on pull request #4766: URL: https://github.com/apache/nifi/pull/4766#issuecomment-763942429 +1 LGTM
[jira] [Commented] (NIFI-8154) AvroParquetHDFSRecordReader fails to read parquet file containing nested structs
[ https://issues.apache.org/jira/browse/NIFI-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268869#comment-17268869 ] Glenn Jones commented on NIFI-8154: --- The test fails because it expects a field in the Record produced by ConvertAvroToParquet to be named "map", but it is actually named "key_value". In parquet-avro 1.10.0, AvroParquetWriter produces parquet with a schema that includes the following definition for the mymap field from the test avro: required group mymap (MAP) { repeated group map (MAP_KEY_VALUE) { required binary key (UTF8); required int32 value; } } This doesn't conform to the Map logical type, but it is within the [backward compatibility rules|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1]. In parquet-avro 1.11.1, AvroParquetWriter produces the following, which I think is more correct (the middle level is named "key_value" instead of "map"): required group mymap (MAP) { repeated group key_value (MAP_KEY_VALUE) { required binary key (STRING); required int32 value; } } The test uses GroupReadSupport to read the parquet into something it can examine, and as a result the middle level group name has changed from "map" to "key_value". I doubt that other ReadSupport implementations would expose the name of the middle level group in this way, so perhaps this wouldn't have been an issue if the tests had used AvroReadSupport. In any case, I think it's fine to simply update the tests to expect the field names from the 1.11.1 AvroParquetWriter. > AvroParquetHDFSRecordReader fails to read parquet file containing nested > structs > > > Key: NIFI-8154 > URL: https://issues.apache.org/jira/browse/NIFI-8154 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.11.3, 1.12.1 >Reporter: Glenn Jones >Priority: Minor > Time Spent: 0.5h > Remaining Estimate: 0h > > FetchParquet can't be used to process files containing nested structs.
When > trying to create a RecordSchema it runs into > https://issues.apache.org/jira/browse/PARQUET-1441, which causes it to fail. > We've patched this locally by building the nifi-parquet-processors with > parquet-avro 1.11.0, but it would be great if this made it into the next > release.
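If the tests need to tolerate both writer versions instead of hard-coding `key_value`, they could find the middle level by its position rather than its name. The backward-compatibility rules allow readers to do this. The `Node` type below is a hypothetical stand-in, not the parquet-mr schema API.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, minimal stand-in for a Parquet group type; this is not the parquet-mr API.
final class MapSchemaSketch {

    static final class Node {
        final String name;
        final String annotation; // e.g. "MAP", "MAP_KEY_VALUE", or null
        final List<Node> children;

        Node(String name, String annotation, List<Node> children) {
            this.name = name;
            this.annotation = annotation;
            this.children = children;
        }
    }

    // A MAP group has exactly one repeated child. Its name may be "map", "key_value" or
    // something else, so it is selected by position instead of by name.
    static Node keyValueGroup(Node mapGroup) {
        if (!"MAP".equals(mapGroup.annotation) || mapGroup.children.size() != 1) {
            throw new IllegalArgumentException(mapGroup.name + " is not a MAP group with one child");
        }
        return mapGroup.children.get(0);
    }

    static List<String> keyValueFieldNames(Node mapGroup) {
        return keyValueGroup(mapGroup).children.stream().map(n -> n.name).collect(Collectors.toList());
    }

    // Builds a mymap group whose middle level has the given name.
    static Node mymap(String middleName) {
        return new Node("mymap", "MAP", List.of(
                new Node(middleName, "MAP_KEY_VALUE", List.of(
                        new Node("key", null, List.of()),
                        new Node("value", null, List.of())))));
    }

    public static void main(String[] args) {
        // Shapes like the 1.10.0 ("map") and 1.11.1 ("key_value") schemas quoted in the comment.
        System.out.println(keyValueFieldNames(mymap("map")));       // [key, value]
        System.out.println(keyValueFieldNames(mymap("key_value"))); // [key, value]
    }
}
```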
[GitHub] [nifi] thenatog commented on pull request #4753: NIFI-7356 - Enable TLS for embedded Zookeeper when NiFi has TLS enabled
thenatog commented on pull request #4753: URL: https://github.com/apache/nifi/pull/4753#issuecomment-763911925 Updated again. Thanks to both of you for your reviews!
[GitHub] [nifi] mtien-apache commented on a change in pull request #4767: NIFI-1355 Implemented new methods in KeyStoreUtils to programmatical…
mtien-apache commented on a change in pull request #4767: URL: https://github.com/apache/nifi/pull/4767#discussion_r561247262 ## File path: nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/KeyStoreUtils.java ## @@ -125,6 +146,63 @@ public static KeyStore loadKeyStore(String keystorePath, char[] keystorePassword } } +/** + * Creates a temporary default Keystore and Truststore and returns it wrapped in a TLS configuration. + * + * @return a {@link org.apache.nifi.security.util.TlsConfiguration} + */ +public static TlsConfiguration createTlsConfigAndNewKeystoreTruststore() throws IOException, GeneralSecurityException { +return createTlsConfigAndNewKeystoreTruststore(new StandardTlsConfiguration()); +} + +/** + * Creates a temporary Keystore and Truststore and returns it wrapped in a new TLS configuration with the given values. + * + * @param tlsConfiguration a {@link org.apache.nifi.security.util.TlsConfiguration} + * @return a {@link org.apache.nifi.security.util.TlsConfiguration} + */ +public static TlsConfiguration createTlsConfigAndNewKeystoreTruststore(final TlsConfiguration tlsConfiguration) throws IOException, GeneralSecurityException { +final Path keyStorePath; +final String keystorePassword = StringUtils.isNotBlank(tlsConfiguration.getKeystorePassword()) ? tlsConfiguration.getKeystorePassword() : generatePassword(); +final String keyPassword = StringUtils.isNotBlank(tlsConfiguration.getKeyPassword())? tlsConfiguration.getKeyPassword() : keystorePassword; +final KeystoreType keystoreType = tlsConfiguration.getKeystoreType() != null ? tlsConfiguration.getKeystoreType() : KeystoreType.PKCS12; +final Path trustStorePath; +final String truststorePassword = StringUtils.isNotBlank(tlsConfiguration.getTruststorePassword()) ? tlsConfiguration.getTruststorePassword() : ""; +final KeystoreType truststoreType = tlsConfiguration.getTruststoreType() != null ? 
tlsConfiguration.getTruststoreType() : KeystoreType.PKCS12; + +// Create temporary Keystore file +try { +keyStorePath = generateTempKeystorePath(keystoreType); +} catch (IOException e) { +logger.error(KEYSTORE_ERROR_MSG); +throw new UncheckedIOException(KEYSTORE_ERROR_MSG, e); +} + +// Create temporary Truststore file +try { +trustStorePath = generateTempTruststorePath(truststoreType); +} catch (IOException e) { +logger.error(TRUSTSTORE_ERROR_MSG); +throw new UncheckedIOException(TRUSTSTORE_ERROR_MSG, e); +} + +// Create X509 Certificate +final X509Certificate clientCert = createKeyStoreAndGetX509Certificate(KEY_ALIAS, keystorePassword, keyPassword, keyStorePath.toString(), keystoreType); + +// Create Truststore +createTrustStore(clientCert, CERT_ALIAS, truststorePassword, trustStorePath.toString(), getKeystoreType(truststoreType.toString())); + +return new StandardTlsConfiguration( +keyStorePath.toString(), +keystorePassword, +keyPassword, +getKeystoreType(keystoreType.toString()), +trustStorePath.toString(), +truststorePassword, +getKeystoreType(truststoreType.toString()), Review comment: The call is not necessary.
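The following sketch shows why the `getKeystoreType(...toString())` calls are redundant. Converting an enum constant to a `String` and parsing it back yields the same constant, so the `KeystoreType` values can be passed through directly. The enum and the `parse` helper are simplified stand-ins for NiFi's `KeystoreType` and `KeyStoreUtils.getKeystoreType`. The sketch also assumes that `toString()` is not overridden.

```java
import java.util.Locale;
import java.util.Objects;

final class KeystoreTypeSketch {

    // Simplified stand-in for NiFi's KeystoreType enum; only two constants are modeled.
    enum KeystoreType { JKS, PKCS12 }

    // Hypothetical String-to-enum lookup, similar in spirit to getKeystoreType(String).
    static KeystoreType parse(String value) {
        return KeystoreType.valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    // Same defaulting as the ternaries in the diff: fall back to PKCS12 when unset.
    static KeystoreType orDefault(KeystoreType configured) {
        return Objects.requireNonNullElse(configured, KeystoreType.PKCS12);
    }

    public static void main(String[] args) {
        KeystoreType truststoreType = orDefault(null);
        // The round trip returns the same constant, so it adds nothing; the enum value
        // itself can be passed to the constructor.
        System.out.println(parse(truststoreType.toString()) == truststoreType); // true
    }
}
```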
[GitHub] [nifi] exceptionfactory commented on a change in pull request #4767: NIFI-1355 Implemented new methods in KeyStoreUtils to programmatical…
exceptionfactory commented on a change in pull request #4767: URL: https://github.com/apache/nifi/pull/4767#discussion_r561195875 ## File path: nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/KeyStoreUtils.java ## @@ -125,6 +146,63 @@ public static KeyStore loadKeyStore(String keystorePath, char[] keystorePassword } } +/** + * Creates a temporary default Keystore and Truststore and returns it wrapped in a TLS configuration. + * + * @return a {@link org.apache.nifi.security.util.TlsConfiguration} + */ +public static TlsConfiguration createTlsConfigAndNewKeystoreTruststore() throws IOException, GeneralSecurityException { +return createTlsConfigAndNewKeystoreTruststore(new StandardTlsConfiguration()); +} + +/** + * Creates a temporary Keystore and Truststore and returns it wrapped in a new TLS configuration with the given values. + * + * @param tlsConfiguration a {@link org.apache.nifi.security.util.TlsConfiguration} + * @return a {@link org.apache.nifi.security.util.TlsConfiguration} + */ +public static TlsConfiguration createTlsConfigAndNewKeystoreTruststore(final TlsConfiguration tlsConfiguration) throws IOException, GeneralSecurityException { +final Path keyStorePath; +final String keystorePassword = StringUtils.isNotBlank(tlsConfiguration.getKeystorePassword()) ? tlsConfiguration.getKeystorePassword() : generatePassword(); +final String keyPassword = StringUtils.isNotBlank(tlsConfiguration.getKeyPassword())? tlsConfiguration.getKeyPassword() : keystorePassword; +final KeystoreType keystoreType = tlsConfiguration.getKeystoreType() != null ? tlsConfiguration.getKeystoreType() : KeystoreType.PKCS12; +final Path trustStorePath; +final String truststorePassword = StringUtils.isNotBlank(tlsConfiguration.getTruststorePassword()) ? tlsConfiguration.getTruststorePassword() : ""; +final KeystoreType truststoreType = tlsConfiguration.getTruststoreType() != null ? 
tlsConfiguration.getTruststoreType() : KeystoreType.PKCS12; + +// Create temporary Keystore file +try { +keyStorePath = generateTempKeystorePath(keystoreType); +} catch (IOException e) { +logger.error(KEYSTORE_ERROR_MSG); +throw new UncheckedIOException(KEYSTORE_ERROR_MSG, e); +} + +// Create temporary Truststore file +try { +trustStorePath = generateTempTruststorePath(truststoreType); +} catch (IOException e) { +logger.error(TRUSTSTORE_ERROR_MSG); +throw new UncheckedIOException(TRUSTSTORE_ERROR_MSG, e); +} + +// Create X509 Certificate +final X509Certificate clientCert = createKeyStoreAndGetX509Certificate(KEY_ALIAS, keystorePassword, keyPassword, keyStorePath.toString(), keystoreType); + +// Create Truststore +createTrustStore(clientCert, CERT_ALIAS, truststorePassword, trustStorePath.toString(), getKeystoreType(truststoreType.toString())); + +return new StandardTlsConfiguration( +keyStorePath.toString(), +keystorePassword, +keyPassword, +getKeystoreType(keystoreType.toString()), +trustStorePath.toString(), +truststorePassword, +getKeystoreType(truststoreType.toString()), Review comment: Are these calls to `getKeystoreType()` necessary? It looks like the values are already of the `KeystoreType` enum. ```suggestion truststoreType, ``` ## File path: nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/KeyStoreUtils.java ## @@ -125,6 +146,63 @@ public static KeyStore loadKeyStore(String keystorePath, char[] keystorePassword } } +/** + * Creates a temporary default Keystore and Truststore and returns it wrapped in a TLS configuration. + * + * @return a {@link org.apache.nifi.security.util.TlsConfiguration} + */ +public static TlsConfiguration createTlsConfigAndNewKeystoreTruststore() throws IOException, GeneralSecurityException { +return createTlsConfigAndNewKeystoreTruststore(new StandardTlsConfiguration()); +} + +/** + * Creates a temporary Keystore and Truststore and returns it wrapped in a new TLS configuration with the given values. 
+ * + * @param tlsConfiguration a {@link org.apache.nifi.security.util.TlsConfiguration} + * @return a {@link org.apache.nifi.security.util.TlsConfiguration} + */ +public static TlsConfiguration createTlsConfigAndNewKeystoreTruststore(final TlsConfiguration tlsConfiguration) throws IOException, GeneralSecurityException { +final Path keyStorePath; +final String keystorePassword = StringUtils.isNotBlank(tlsConfiguration.getKeystorePassword()) ?
[GitHub] [nifi] jfrazee commented on pull request #4753: NIFI-7356 - Enable TLS for embedded Zookeeper when NiFi has TLS enabled
jfrazee commented on pull request #4753: URL: https://github.com/apache/nifi/pull/4753#issuecomment-763887511 I did a full round of testing yesterday and everything looked good. Will do another round today with the updates. On the docs side of things, I think we need a stronger callout of the behavioral changes. I made some suggestions. We'll also need to add some migration guidance for the wiki.
[GitHub] [nifi] jfrazee edited a comment on pull request #4753: NIFI-7356 - Enable TLS for embedded Zookeeper when NiFi has TLS enabled
jfrazee edited a comment on pull request #4753: URL: https://github.com/apache/nifi/pull/4753#issuecomment-763887511 I did a full round of testing yesterday and everything looked good. Will do another round today with the updates. On the docs side of things, I think we need a stronger callout of the behavioral changes. I made some suggestions. We'll also need to add some migration guidance to the wiki.
[GitHub] [nifi] jfrazee commented on a change in pull request #4753: NIFI-7356 - Enable TLS for embedded Zookeeper when NiFi has TLS enabled
jfrazee commented on a change in pull request #4753: URL: https://github.com/apache/nifi/pull/4753#discussion_r561228356 ## File path: nifi-docs/src/main/asciidoc/administration-guide.adoc ## @@ -2246,6 +2246,53 @@ _true_. Once Netty is enabled, you should see log messages like the following in 2020-02-24 23:37:54,082 INFO [nioEventLoopGroup-3-1] o.apache.zookeeper.ClientCnxnSocketNetty SSL handler added for channel: [id: 0xa831f9c3] 2020-02-24 23:37:54,104 INFO [nioEventLoopGroup-3-1] o.apache.zookeeper.ClientCnxnSocketNetty channel is connected: [id: 0xa831f9c3, L:/172.17.0.4:56510 - R:8e38869cd1d1/172.17.0.3:2281] +=== Embedded ZooKeeper with TLS + +A NiFi cluster can also be deployed using a ZooKeeper instance(s) embedded in NiFi itself which all nodes can communicate with. Communication between nodes and this embedded ZooKeeper can also be secured with TLS. The configuration for the client side of the connection will operate in the same way as an external ZooKeeper. That is, it will use the `+nifi.security.*+` properties from the nifi.properties file by default, unless you specifiy explicit ZooKeeper keystore/truststore properties with `+nifi.zookeeper.security.*+` as described above. Review comment: "Communication between nodes and this embedded ZooKeeper can also be secured with TLS." => "Communication between nodes and this embedded ZooKeeper will be secured with TLS if NiFi is secured with TLS. Versions of NiFi prior to 1.13 did not use secure client access with embedded ZooKeeper(s)."
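The client-side fallback described in the quoted documentation can be modeled as a per-property lookup: use `nifi.zookeeper.security.<suffix>` if it is set, otherwise `nifi.security.<suffix>`. The suffixes in this sketch (`keystore`, `truststore`) and the per-property granularity are assumptions made for illustration. The actual implementation may switch between the two sets of properties as a whole.

```java
import java.util.Properties;

// Sketch of a per-property fallback; the key suffixes are assumptions for illustration.
final class ZooKeeperTlsPropertySketch {

    // Prefer an explicit nifi.zookeeper.security.<suffix> value; otherwise fall back to
    // the matching nifi.security.<suffix> value (or null if neither is set).
    static String resolve(Properties nifiProperties, String suffix) {
        String zooKeeperValue = nifiProperties.getProperty("nifi.zookeeper.security." + suffix);
        if (zooKeeperValue != null && !zooKeeperValue.trim().isEmpty()) {
            return zooKeeperValue;
        }
        return nifiProperties.getProperty("nifi.security." + suffix);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("nifi.security.keystore", "./conf/keystore.p12");
        props.setProperty("nifi.security.truststore", "./conf/truststore.p12");
        props.setProperty("nifi.zookeeper.security.truststore", "./conf/zk-truststore.p12");
        System.out.println(resolve(props, "keystore"));   // ./conf/keystore.p12
        System.out.println(resolve(props, "truststore")); // ./conf/zk-truststore.p12
    }
}
```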
[GitHub] [nifi] turcsanyip commented on a change in pull request #4746: NIFI-8034: Fixed PropertyValue.isExpressionLanguagePresent always ret…
turcsanyip commented on a change in pull request #4746: URL: https://github.com/apache/nifi/pull/4746#discussion_r561226467 ## File path: nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestStandardPreparedQuery.java ## @@ -309,6 +309,24 @@ public void testVariableImpacted() { assertTrue(Query.prepare("${anyMatchingAttribute('a.*'):equals('hello')}").getVariableImpact().isImpacted("attr")); } +@Test +public void testIsExpressionLanguagePresent() { +assertFalse(Query.prepare("value").isExpressionLanguagePresent()); +assertFalse(Query.prepare("").isExpressionLanguagePresent()); + +assertTrue(Query.prepare("${variable}").isExpressionLanguagePresent()); + assertTrue(Query.prepare("${hostname()}").isExpressionLanguagePresent()); + assertTrue(Query.prepare("${hostname():equals('localhost')}").isExpressionLanguagePresent()); + assertTrue(Query.prepare("prefix-${hostname()}").isExpressionLanguagePresent()); + assertTrue(Query.prepare("${hostname()}-suffix").isExpressionLanguagePresent()); + assertTrue(Query.prepare("${variable1}${hostname()}${variable2}").isExpressionLanguagePresent()); + assertTrue(Query.prepare("${${variable}}").isExpressionLanguagePresent()); + +assertFalse(Query.prepare("${}").isExpressionLanguagePresent()); + +assertTrue(Query.prepare("#{param}").isExpressionLanguagePresent()); Review comment: Thanks @markap14. I changed the logic and also rebased the branch.
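The sketch below is a rough, regex-based approximation of the behavior that these test cases pin down, and it is for illustration only. NiFi parses expressions with a real grammar. This sketch would also, for example, report the escaped form `$${...}` as an expression. The class and method names are hypothetical.

```java
import java.util.regex.Pattern;

// Rough approximation for illustration only; NiFi parses expressions with a real grammar.
final class ElPresenceSketch {

    // "${" or "#{", then at least one character that is not "}", then a closing "}".
    private static final Pattern REFERENCE = Pattern.compile("[$#]\\{[^}]+\\}");

    static boolean looksLikeExpressionOrParameter(String value) {
        return value != null && REFERENCE.matcher(value).find();
    }

    public static void main(String[] args) {
        String[] inputs = {"value", "", "${variable}", "prefix-${hostname()}", "${${variable}}", "${}", "#{param}"};
        for (String input : inputs) {
            System.out.println(input + " -> " + looksLikeExpressionOrParameter(input));
        }
    }
}
```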
[GitHub] [nifi] jfrazee commented on a change in pull request #4753: NIFI-7356 - Enable TLS for embedded Zookeeper when NiFi has TLS enabled
jfrazee commented on a change in pull request #4753: URL: https://github.com/apache/nifi/pull/4753#discussion_r561224503 ## File path: nifi-docs/src/main/asciidoc/administration-guide.adoc ## @@ -2246,6 +2246,53 @@ _true_. Once Netty is enabled, you should see log messages like the following in 2020-02-24 23:37:54,082 INFO [nioEventLoopGroup-3-1] o.apache.zookeeper.ClientCnxnSocketNetty SSL handler added for channel: [id: 0xa831f9c3] 2020-02-24 23:37:54,104 INFO [nioEventLoopGroup-3-1] o.apache.zookeeper.ClientCnxnSocketNetty channel is connected: [id: 0xa831f9c3, L:/172.17.0.4:56510 - R:8e38869cd1d1/172.17.0.3:2281] +=== Embedded ZooKeeper with TLS + +A NiFi cluster can also be deployed using a ZooKeeper instance(s) embedded in NiFi itself which all nodes can communicate with. Communication between nodes and this embedded ZooKeeper can also be secured with TLS. The configuration for the client side of the connection will operate in the same way as an external ZooKeeper. That is, it will use the `+nifi.security.*+` properties from the nifi.properties file by default, unless you specifiy explicit ZooKeeper keystore/truststore properties with `+nifi.zookeeper.security.*+` as described above. + +The server configuration will operate in the same way as an insecure embedded server, but with the `+secureClientPort+` set (typically port `+2281+`). + Review comment: We should add something like this: ```suggestion NOTE: When using a secure server, the secure embedded ZooKeeper server ignores any +clientPort+ or +clientPortAddress+ specified in _$NIFI_HOME/conf/zookeeper.properties_. I.e., if the NiFi-embedded ZooKeeper exposes a +secureClientPort+ it will not expose an insecure +clientPort+ regardless of configuration. This is a behavioral difference between the embedded server and an external ZooKeeper server. ```
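The behavior described in the suggested NOTE can be modeled with `java.util.Properties`. Once `secureClientPort` is present, the insecure `clientPort` and `clientPortAddress` entries are dropped. This is a hypothetical re-creation for illustration, not the `ensureOnlySecurePortsAreEnabled` implementation in the PR.

```java
import java.util.Properties;

// Hypothetical re-creation of the behavior described in the NOTE; not NiFi's code.
final class SecurePortOnlySketch {

    // When secureClientPort is configured, drop the insecure client listener settings so
    // the embedded server exposes only the TLS port. The input is left unchanged.
    static Properties secureOnly(Properties zooKeeperProperties) {
        Properties result = new Properties();
        result.putAll(zooKeeperProperties);
        if (result.getProperty("secureClientPort") != null) {
            result.remove("clientPort");
            result.remove("clientPortAddress");
        }
        return result;
    }

    public static void main(String[] args) {
        Properties zk = new Properties();
        zk.setProperty("clientPort", "2181");
        zk.setProperty("clientPortAddress", "0.0.0.0");
        zk.setProperty("secureClientPort", "2281");
        System.out.println(secureOnly(zk).stringPropertyNames()); // [secureClientPort]
    }
}
```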
[GitHub] [nifi] thenatog commented on a change in pull request #4753: NIFI-7356 - Enable TLS for embedded Zookeeper when NiFi has TLS enabled
thenatog commented on a change in pull request #4753: URL: https://github.com/apache/nifi/pull/4753#discussion_r561219030 ## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java ## @@ -198,6 +219,144 @@ public static ZooKeeperStateServer create(final NiFiProperties properties) throw zkProperties.load(bis); } -return new ZooKeeperStateServer(zkProperties); +return new ZooKeeperStateServer(reconcileProperties(properties, zkProperties)); +} + +/** + * Reconcile properties between the nifi.properties and zookeeper.properties (zoo.cfg) files. Most of the ZooKeeper server properties are derived from + * the zookeeper.properties file, while the TLS key/truststore properties are taken from nifi.properties. + * @param niFiProperties NiFiProperties file containing ZooKeeper client and TLS configuration + * @param zkProperties The zookeeper.properties file containing Zookeeper server configuration + * @return A reconciled QuorumPeerConfig which will include TLS properties set if they are available. + * @throws IOException If configuration files fail to parse. + * @throws ConfigException If secure configuration is not as expected. Check administration documentation. 
+ */ +private static QuorumPeerConfig reconcileProperties(NiFiProperties niFiProperties, Properties zkProperties) throws IOException, ConfigException { +QuorumPeerConfig peerConfig = new QuorumPeerConfig(); +peerConfig.parseProperties(zkProperties); + +final boolean niFiConfigIsSecure = isNiFiConfigSecureForZooKeeper(niFiProperties); +final boolean zooKeeperConfigIsSecure = isZooKeeperConfigSecure(peerConfig); + +if (!zooKeeperConfigIsSecure && !niFiConfigIsSecure) { +logger.info("{} property is set to false or is not present, and zookeeper.properties file does not contain secureClientPort property, so embedded ZooKeeper will be started without TLS.", +NiFiProperties.ZOOKEEPER_CLIENT_SECURE); +return peerConfig; +} + +// If secureClientPort is set but no TLS config is set, fail to start. +if (zooKeeperConfigIsSecure && !niFiConfigIsSecure) { +throw new ConfigException( +String.format("Zookeeper properties file %s was configured to be secure but there was no valid TLS config present in nifi.properties or " + + "nifi.zookeeper.client.secure was set to false. 
Check the administration guide.", + niFiProperties.getProperty(NiFiProperties.STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES))); +} + +// Remove any insecure ports if they were set in zookeeper.properties +ensureOnlySecurePortsAreEnabled(peerConfig, zkProperties); + +// Set base ZooKeeper TLS server properties +setTlsProperties(zkProperties, new ZooKeeperServerX509Util(), niFiProperties); +// Set quorum ZooKeeper TLS server properties +setTlsProperties(zkProperties, new ZooKeeperQuorumX509Util(), niFiProperties); +// Set TLS client port: +zkProperties.setProperty("secureClientPort", getSecurePort(peerConfig)); + +// Set the required connection factory for TLS +zkProperties.setProperty(ZOOKEEPER_SERVER_CNXN_FACTORY, NettyServerCnxnFactory.class.getName()); +zkProperties.setProperty(ZOOKEEPER_SSL_QUORUM, Boolean.TRUE.toString()); Review comment: setTlsProperties() sets the 'system'-level keystore properties, i.e. ssl.keyStore.location and ssl.quorum.keyStore.location. The two properties above are set only once and do not follow the same naming scheme.
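The following hypothetical sketch models the flow discussed in this comment. It covers three parts: the branches of `reconcileProperties()`, the store properties that are written once with the `ssl.` prefix and once with the `ssl.quorum.` prefix, and the two settings that are written only once. It assumes that `ZOOKEEPER_SERVER_CNXN_FACTORY` and `ZOOKEEPER_SSL_QUORUM` resolve to the `serverCnxnFactory` and `sslQuorum` keys. Store types and error handling are omitted.

```java
import java.util.Properties;

// Hypothetical sketch of the reconciliation flow in the diff; names and messages are illustrative.
final class ZooKeeperTlsReconcileSketch {

    // Mirrors the branches in reconcileProperties(): insecure when neither side is secure,
    // a startup failure when only zookeeper.properties asks for TLS, otherwise TLS.
    static boolean useTls(boolean nifiConfigIsSecure, boolean zooKeeperConfigIsSecure) {
        if (!nifiConfigIsSecure && !zooKeeperConfigIsSecure) {
            return false;
        }
        if (zooKeeperConfigIsSecure && !nifiConfigIsSecure) {
            throw new IllegalStateException("secureClientPort is set but nifi.properties has no valid TLS configuration");
        }
        return true;
    }

    // Writes one prefixed set of store properties: "ssl." for client connections and
    // "ssl.quorum." for quorum connections.
    static void setTlsProperties(Properties zk, String prefix, String keyStore, String keyStorePassword,
                                 String trustStore, String trustStorePassword) {
        zk.setProperty(prefix + "keyStore.location", keyStore);
        zk.setProperty(prefix + "keyStore.password", keyStorePassword);
        zk.setProperty(prefix + "trustStore.location", trustStore);
        zk.setProperty(prefix + "trustStore.password", trustStorePassword);
    }

    static Properties secureServerProperties(String keyStore, String keyStorePassword,
                                             String trustStore, String trustStorePassword) {
        Properties zk = new Properties();
        setTlsProperties(zk, "ssl.", keyStore, keyStorePassword, trustStore, trustStorePassword);
        setTlsProperties(zk, "ssl.quorum.", keyStore, keyStorePassword, trustStore, trustStorePassword);
        // These two are set once and do not follow the ssl.* / ssl.quorum.* scheme.
        zk.setProperty("serverCnxnFactory", "org.apache.zookeeper.server.NettyServerCnxnFactory");
        zk.setProperty("sslQuorum", "true");
        return zk;
    }

    public static void main(String[] args) {
        Properties zk = secureServerProperties("ks.p12", "ksPass", "ts.p12", "tsPass");
        System.out.println(zk.getProperty("ssl.quorum.keyStore.location")); // ks.p12
        System.out.println(useTls(true, false)); // true
    }
}
```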
[GitHub] [nifi] jfrazee commented on a change in pull request #4753: NIFI-7356 - Enable TLS for embedded Zookeeper when NiFi has TLS enabled
jfrazee commented on a change in pull request #4753: URL: https://github.com/apache/nifi/pull/4753#discussion_r561157059 ## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java ## @@ -136,13 +156,14 @@ private void startDistributed() throws IOException { quorumPeer.setInitLimit(quorumPeerConfig.getInitLimit()); quorumPeer.setSyncLimit(quorumPeerConfig.getSyncLimit()); quorumPeer.setQuorumVerifier(quorumPeerConfig.getQuorumVerifier(), false); -quorumPeer.setCnxnFactory(connectionFactory); quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory())); quorumPeer.setLearnerType(quorumPeerConfig.getPeerType()); quorumPeer.setSyncEnabled(quorumPeerConfig.getSyncEnabled()); quorumPeer.setQuorumListenOnAllIPs(quorumPeerConfig.getQuorumListenOnAllIPs()); +quorumPeer.setSslQuorum(quorumPeerConfig.isSslQuorum()); quorumPeer.start(); + Review comment: ```suggestion ``` ## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java ## @@ -198,6 +219,144 @@ public static ZooKeeperStateServer create(final NiFiProperties properties) throw zkProperties.load(bis); } -return new ZooKeeperStateServer(zkProperties); +return new ZooKeeperStateServer(reconcileProperties(properties, zkProperties)); +} + +/** + * Reconcile properties between the nifi.properties and zookeeper.properties (zoo.cfg) files. Most of the ZooKeeper server properties are derived from + * the zookeeper.properties file, while the TLS key/truststore properties are taken from nifi.properties. 
+ * @param niFiProperties NiFiProperties file containing ZooKeeper client and TLS configuration + * @param zkProperties The zookeeper.properties file containing Zookeeper server configuration Review comment: ```suggestion * @param zkProperties The zookeeper.properties file containing ZooKeeper server configuration ``` ## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java ## @@ -198,6 +219,144 @@ public static ZooKeeperStateServer create(final NiFiProperties properties) throw zkProperties.load(bis); } -return new ZooKeeperStateServer(zkProperties); +return new ZooKeeperStateServer(reconcileProperties(properties, zkProperties)); +} + +/** + * Reconcile properties between the nifi.properties and zookeeper.properties (zoo.cfg) files. Most of the ZooKeeper server properties are derived from + * the zookeeper.properties file, while the TLS key/truststore properties are taken from nifi.properties. + * @param niFiProperties NiFiProperties file containing ZooKeeper client and TLS configuration + * @param zkProperties The zookeeper.properties file containing Zookeeper server configuration + * @return A reconciled QuorumPeerConfig which will include TLS properties set if they are available. + * @throws IOException If configuration files fail to parse. + * @throws ConfigException If secure configuration is not as expected. Check administration documentation. 
+ */ +private static QuorumPeerConfig reconcileProperties(NiFiProperties niFiProperties, Properties zkProperties) throws IOException, ConfigException { +QuorumPeerConfig peerConfig = new QuorumPeerConfig(); +peerConfig.parseProperties(zkProperties); + +final boolean niFiConfigIsSecure = isNiFiConfigSecureForZooKeeper(niFiProperties); +final boolean zooKeeperConfigIsSecure = isZooKeeperConfigSecure(peerConfig); + +if (!zooKeeperConfigIsSecure && !niFiConfigIsSecure) { +logger.info("{} property is set to false or is not present, and zookeeper.properties file does not contain secureClientPort property, so embedded ZooKeeper will be started without TLS.", +NiFiProperties.ZOOKEEPER_CLIENT_SECURE); +return peerConfig; +} + +// If secureClientPort is set but no TLS config is set, fail to start. +if (zooKeeperConfigIsSecure && !niFiConfigIsSecure) { +throw new ConfigException( +String.format("Zookeeper properties file %s was configured to be secure but there was no valid TLS config present in nifi.properties or " + + "nifi.zookeeper.client.secure was set to false. Check the administration guide.", + niFiProperties.getProperty(NiFiProperties.STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES))); +} + +// Remove
[GitHub] [nifi] exceptionfactory commented on pull request #4734: NIFI-8023 Added toLocalDate() and updated toDate() in DataTypeUtils
exceptionfactory commented on pull request #4734: URL: https://github.com/apache/nifi/pull/4734#issuecomment-763865777 @adenes After evaluating the unit tests mentioned, I found that each one performed a date conversion before comparing against the expected results. That conversion changed the expected value, which caused the failures. I updated the tests and confirmed that they pass with the CET time zone.
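A failure such as "expected 2017-04-04 but was 2017-04-03" under CET is the typical symptom of a calendar date being turned into an instant in one time zone and read back in another. The sketch below reproduces that effect with java.time; it is illustrative only (hypothetical class DateShiftSketch) and is not the DataTypeUtils code or the affected tests.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;

final class DateShiftSketch {

    /**
     * Converts a calendar date to an Instant at the start of the day in writerZone,
     * then reads the date back in UTC. For zones east of UTC the local midnight
     * falls on the previous UTC date, so the result moves back by one day.
     */
    static LocalDate roundTripThroughUtc(LocalDate date, ZoneId writerZone) {
        Instant startOfDay = date.atStartOfDay(writerZone).toInstant();
        return startOfDay.atZone(ZoneOffset.UTC).toLocalDate();
    }
}
```

For example, roundTripThroughUtc(LocalDate.of(2017, 4, 4), ZoneId.of("Europe/Berlin")) yields 2017-04-03, because midnight CEST (UTC+2) is 22:00 UTC on the previous day, while passing ZoneOffset.UTC returns the original date. Keeping the value as a LocalDate end to end avoids the round trip entirely.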
[jira] [Resolved] (NIFI-8146) Allow RecordPath to be used for specifying operation type and data fields when using PutDatabaseRecord
[ https://issues.apache.org/jira/browse/NIFI-8146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matt Burgess resolved NIFI-8146. Resolution: Fixed > Allow RecordPath to be used for specifying operation type and data fields > when using PutDatabaseRecord > -- > > Key: NIFI-8146 > URL: https://issues.apache.org/jira/browse/NIFI-8146 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Mark Payne >Assignee: Mark Payne >Priority: Major > Fix For: 1.13.0 > > Time Spent: 1h > Remaining Estimate: 0h > > PutDatabaseRecord requires that the Statement Type be defined as a property or > a FlowFile attribute. This means that if a FlowFile has many records, it must > be split apart into individual Records if there is more than one type of > statement needed per FlowFile. > It also assumes that the data to be inserted/updated/deleted/etc. is the full > record. However, it's common to have some wrapper around the actual data, as > is the case with a tool like Debezium, which includes an Operation Type, a > 'before' snapshot and an 'after' snapshot. To accommodate this, we should > allow Record-friendly methods for specifying the path to the data and the > operation type. -- This message was sent by Atlassian Jira (v8.3.4#803005)
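The issue describes change events that wrap the row data with an operation type plus 'before' and 'after' snapshots. As a rough illustration of why per-record routing removes the need to split FlowFiles, here is a sketch over a plain Map. It is not PutDatabaseRecord's implementation and does not use RecordPath; the class name and the statement-type strings are hypothetical, and it assumes Debezium's op codes "c" (create), "u" (update), "d" (delete) and "r" (snapshot read).

```java
import java.util.Map;

final class ChangeEventRoutingSketch {

    /** Hypothetical result: the SQL statement type plus the record fields to use. */
    static final class Routed {
        final String statementType;
        final Map<String, Object> data;

        Routed(String statementType, Map<String, Object> data) {
            this.statementType = statementType;
            this.data = data;
        }
    }

    /** Reads the operation code from "op" and picks the matching snapshot for this record. */
    @SuppressWarnings("unchecked")
    static Routed route(Map<String, Object> event) {
        String op = (String) event.get("op");
        if (op == null) {
            throw new IllegalArgumentException("Change event has no operation type");
        }
        switch (op) {
            case "c":
            case "r":
                return new Routed("INSERT", (Map<String, Object>) event.get("after"));
            case "u":
                return new Routed("UPDATE", (Map<String, Object>) event.get("after"));
            case "d":
                // A delete has no 'after' state, so the key columns come from 'before'
                return new Routed("DELETE", (Map<String, Object>) event.get("before"));
            default:
                throw new IllegalArgumentException("Unsupported operation: " + op);
        }
    }
}
```

Because the statement type is decided per record, one FlowFile can mix inserts, updates and deletes.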
[jira] [Commented] (NIFI-8146) Allow RecordPath to be used for specifying operation type and data fields when using PutDatabaseRecord
[ https://issues.apache.org/jira/browse/NIFI-8146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268778#comment-17268778 ] ASF subversion and git services commented on NIFI-8146: --- Commit 803ba882aa15142a9986dc0c23bbf4db11fe15a7 in nifi's branch refs/heads/main from Mark Payne [ https://gitbox.apache.org/repos/asf?p=nifi.git;h=803ba88 ] NIFI-8146: Ensure that we close the Connection/Statement/PreparedStatement objects in finally blocks or try-with-resources Signed-off-by: Matthew Burgess This closes #4770
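The commit relies on try-with-resources to guarantee that JDBC objects are closed even when an exception is thrown. A minimal sketch of that guarantee follows, using a hypothetical stand-in AutoCloseable instead of a real Connection and PreparedStatement so it runs without a database; it does not reproduce the PutDatabaseRecord change itself.

```java
import java.util.ArrayList;
import java.util.List;

final class CloseOrderSketch {

    /** Stand-in for Connection/PreparedStatement: records its name when closed. */
    static final class TrackedResource implements AutoCloseable {
        private final String name;
        private final List<String> closeLog;

        TrackedResource(String name, List<String> closeLog) {
            this.name = name;
            this.closeLog = closeLog;
        }

        @Override
        public void close() {
            closeLog.add(name);
        }
    }

    /**
     * Opens a "connection" and a "statement", then fails mid-work.
     * try-with-resources still closes both, in reverse order of opening,
     * before the catch block runs.
     */
    static List<String> runAndFail() {
        List<String> closeLog = new ArrayList<>();
        try (TrackedResource connection = new TrackedResource("connection", closeLog);
             TrackedResource statement = new TrackedResource("statement", closeLog)) {
            throw new IllegalStateException("simulated SQL failure");
        } catch (IllegalStateException expected) {
            // Both resources have already been closed at this point
        }
        return closeLog;
    }
}
```

runAndFail() returns [statement, connection]: the statement is closed before the connection it depends on, which is the same ordering a nested finally block would have to implement by hand.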
[GitHub] [nifi] mattyb149 closed pull request #4770: NIFI-8146: Ensure that we close the Connection/Statement/PreparedStat…
mattyb149 closed pull request #4770: URL: https://github.com/apache/nifi/pull/4770
[GitHub] [nifi] mattyb149 commented on pull request #4770: NIFI-8146: Ensure that we close the Connection/Statement/PreparedStat…
mattyb149 commented on pull request #4770: URL: https://github.com/apache/nifi/pull/4770#issuecomment-763856888 +1 LGTM, tested scenarios with debugger and verified connections and statements were closed. Thanks for the fix! Merging to main
[GitHub] [nifi] markap14 commented on a change in pull request #4746: NIFI-8034: Fixed PropertyValue.isExpressionLanguagePresent always ret…
markap14 commented on a change in pull request #4746: URL: https://github.com/apache/nifi/pull/4746#discussion_r561166527 ## File path: nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestStandardPreparedQuery.java ## @@ -309,6 +309,24 @@ public void testVariableImpacted() { assertTrue(Query.prepare("${anyMatchingAttribute('a.*'):equals('hello')}").getVariableImpact().isImpacted("attr")); } +@Test +public void testIsExpressionLanguagePresent() { +assertFalse(Query.prepare("value").isExpressionLanguagePresent()); +assertFalse(Query.prepare("").isExpressionLanguagePresent()); + +assertTrue(Query.prepare("${variable}").isExpressionLanguagePresent()); + assertTrue(Query.prepare("${hostname()}").isExpressionLanguagePresent()); + assertTrue(Query.prepare("${hostname():equals('localhost')}").isExpressionLanguagePresent()); + assertTrue(Query.prepare("prefix-${hostname()}").isExpressionLanguagePresent()); + assertTrue(Query.prepare("${hostname()}-suffix").isExpressionLanguagePresent()); + assertTrue(Query.prepare("${variable1}${hostname()}${variable2}").isExpressionLanguagePresent()); + assertTrue(Query.prepare("${${variable}}").isExpressionLanguagePresent()); + +assertFalse(Query.prepare("${}").isExpressionLanguagePresent()); + +assertTrue(Query.prepare("#{param}").isExpressionLanguagePresent()); Review comment: Well, that's interesting. The user guide says: ``` Value - The value that will be used when the Parameter is referenced. Parameter values do not support Expression Language or embedded parameter references. ``` Parameters are not supposed to be able to make use of EL. But I just tried it with UpdateAttribute, and interestingly it did evaluate the reference. I think that is actually a bug, though. So I would tend to say that we should return `false` since Parameters are not supposed to support EL.
[jira] [Commented] (NIFI-8154) AvroParquetHDFSRecordReader fails to read parquet file containing nested structs
[ https://issues.apache.org/jira/browse/NIFI-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268733#comment-17268733 ] Glenn Jones commented on NIFI-8154: --- I see what you mean [~pvillard]. I think the failure in [TestConvertAvroToParquet.test_Data()|https://github.com/apache/nifi/blob/25ab050ed78cedd11450cdc7e3165a58682fddb2/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java#L186] may be related to [PARQUET-1879|https://issues.apache.org/jira/browse/PARQUET-1879]. I'll try to figure out what's going on. > AvroParquetHDFSRecordReader fails to read parquet file containing nested > structs > > > Key: NIFI-8154 > URL: https://issues.apache.org/jira/browse/NIFI-8154 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.11.3, 1.12.1 >Reporter: Glenn Jones >Priority: Minor > Time Spent: 0.5h > Remaining Estimate: 0h > > FetchParquet can't be used to process files containing nested structs. When > trying to create a RecordSchema it runs into > https://issues.apache.org/jira/browse/PARQUET-1441, which causes it to fail. > We've patched this locally by building the nifi-parquet-processors with > parquet-avro 1.11.0, but it would be great if this made it into the next > release.
[GitHub] [nifi] mtien-apache commented on pull request #4767: NIFI-1355 Implemented new methods in KeyStoreUtils to programmatical…
mtien-apache commented on pull request #4767: URL: https://github.com/apache/nifi/pull/4767#issuecomment-763823481 Fixing the build failure now.
[GitHub] [nifi] turcsanyip commented on a change in pull request #4746: NIFI-8034: Fixed PropertyValue.isExpressionLanguagePresent always ret…
turcsanyip commented on a change in pull request #4746: URL: https://github.com/apache/nifi/pull/4746#discussion_r561153888 ## File path: nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestStandardPreparedQuery.java ## @@ -309,6 +309,24 @@ public void testVariableImpacted() { assertTrue(Query.prepare("${anyMatchingAttribute('a.*'):equals('hello')}").getVariableImpact().isImpacted("attr")); } +@Test +public void testIsExpressionLanguagePresent() { +assertFalse(Query.prepare("value").isExpressionLanguagePresent()); +assertFalse(Query.prepare("").isExpressionLanguagePresent()); + +assertTrue(Query.prepare("${variable}").isExpressionLanguagePresent()); + assertTrue(Query.prepare("${hostname()}").isExpressionLanguagePresent()); + assertTrue(Query.prepare("${hostname():equals('localhost')}").isExpressionLanguagePresent()); + assertTrue(Query.prepare("prefix-${hostname()}").isExpressionLanguagePresent()); + assertTrue(Query.prepare("${hostname()}-suffix").isExpressionLanguagePresent()); + assertTrue(Query.prepare("${variable1}${hostname()}${variable2}").isExpressionLanguagePresent()); + assertTrue(Query.prepare("${${variable}}").isExpressionLanguagePresent()); + +assertFalse(Query.prepare("${}").isExpressionLanguagePresent()); + +assertTrue(Query.prepare("#{param}").isExpressionLanguagePresent()); Review comment: The parameter may or may not contain EL. My idea was to treat it as "unknown", and I felt it was safer to return `true` in that case. I ran into the "isExpressionLanguagePresent always returns true" issue in `customValidate()`, and by the time `customValidate()` is called the parameter reference has already been resolved. So only `StringLiteralExpression` (non-EL) and `CompiledExpression` (EL) can occur there, never `ParameterExpression`. I was uncertain about `ParameterExpression` because I do not know where else it is used in the context of EL.
If you confirm that returning `false` is the right approach, I can easily change the logic. I just wanted to explain my reasoning. Thanks!
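The quoted test fixes the expected answers for most inputs, and the open question in this thread is only how a parameter reference such as `#{param}` should be classified. As an illustration, here is a simplified scanner that reproduces those expectations and turns the parameter policy into an explicit flag. It is a hypothetical sketch (class ElPresenceSketch), not NiFi's Query parser, and it ignores escape sequences.

```java
final class ElPresenceSketch {

    /**
     * Returns true if the value contains a non-empty "${...}" reference, or a
     * non-empty "#{...}" reference when parameterCountsAsEl is true.
     * "${}" is treated as not containing EL, matching the quoted test.
     */
    static boolean isExpressionLanguagePresent(String value, boolean parameterCountsAsEl) {
        if (value == null) {
            return false;
        }
        for (int i = 0; i + 1 < value.length(); i++) {
            char current = value.charAt(i);
            char next = value.charAt(i + 1);
            boolean opensExpression = current == '$' && next == '{';
            boolean opensParameter = current == '#' && next == '{';
            if (opensExpression || (opensParameter && parameterCountsAsEl)) {
                // Require at least one character between the braces
                int close = value.indexOf('}', i + 2);
                if (close > i + 2) {
                    return true;
                }
            }
        }
        return false;
    }
}
```

Passing false for parameterCountsAsEl corresponds to the "Parameters do not support EL" reading of the user guide; passing true corresponds to the conservative "unknown, so assume EL" approach.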
[GitHub] [nifi] andrewmlim commented on pull request #4766: NIFI-8150 Change Download flow to Download flow definition for proces…
andrewmlim commented on pull request #4766: URL: https://github.com/apache/nifi/pull/4766#issuecomment-763813787 Thanks for taking a look @exceptionfactory! Appreciate it!
[GitHub] [nifi] exceptionfactory commented on pull request #4734: NIFI-8023 Added toLocalDate() and updated toDate() in DataTypeUtils
exceptionfactory commented on pull request #4734: URL: https://github.com/apache/nifi/pull/4734#issuecomment-763806901 > @exceptionfactory , thanks for the follow-up commits. I ran the tests with CET system TZ and unfortunately some test cases fail: > > ``` > [ERROR] Failures: > [ERROR] TestAvroReaderWithEmbeddedSchema.testLogicalTypes:66->testLogicalTypes:125 expected:<2017-04-0[4]> but was:<2017-04-0[3]> > [ERROR] TestAvroReaderWithEmbeddedSchema.testNullableLogicalTypes:72->testLogicalTypes:125 expected:<2017-04-0[4]> but was:<2017-04-0[3]> > [ERROR] TestCSVRecordReader.testDate:119 expected:<30> but was:<29> > [ERROR] TestCSVRecordReader.testDateNoCoersionExpectedFormat:163 expected:<30> but was:<29> > [ERROR] TestJacksonCSVRecordReader.testDate:105 expected:<30> but was:<29> > ``` > > Could you please have a look at them? Thanks @adenes, I will take a look at those tests.
[GitHub] [nifi] markap14 commented on a change in pull request #4730: NIFI-8095: Created StatelessNiFi Sink Connector and Source Connector.…
markap14 commented on a change in pull request #4730: URL: https://github.com/apache/nifi/pull/4730#discussion_r561138544 ## File path: nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java ## @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.kafka.connect; + +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.stateless.flow.DataflowTrigger; +import org.apache.nifi.stateless.flow.StatelessDataflow; +import org.apache.nifi.stateless.flow.TriggerResult; +import org.apache.nifi.util.FormatUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; + +public class StatelessNiFiSourceTask extends SourceTask { +public static final String STATE_MAP_KEY = "task.index"; +private static final Logger logger = LoggerFactory.getLogger(StatelessNiFiSourceTask.class); + +private StatelessDataflow dataflow; +private String outputPortName; +private String topicName; +private String topicNameAttribute; +private TriggerResult triggerResult; +private String keyAttributeName; +private Pattern headerAttributeNamePattern; +private long timeoutMillis; +private String dataflowName; +private long failureYieldExpiration = 0L; + +private final Map clusterStatePartitionMap = Collections.singletonMap(STATE_MAP_KEY, "CLUSTER"); +private Map localStatePartitionMap = new HashMap<>(); +private boolean primaryNodeOnly; +private boolean primaryNodeTask; + +private final AtomicLong unacknowledgedRecords = new AtomicLong(0L); + +@Override +public String version() { 
+return StatelessKafkaConnectorUtil.getVersion(); +} + +@Override +public void start(final Map properties) { +logger.info("Starting Source Task with properties {}", StatelessKafkaConnectorUtil.getLoggableProperties(properties)); + +final String timeout = properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT); +timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, TimeUnit.MILLISECONDS); + +topicName = properties.get(StatelessNiFiSourceConnector.TOPIC_NAME); +topicNameAttribute = properties.get(StatelessNiFiSourceConnector.TOPIC_NAME_ATTRIBUTE); +keyAttributeName = properties.get(StatelessNiFiSourceConnector.KEY_ATTRIBUTE); + +if (topicName == null && topicNameAttribute == null) { +throw new ConfigException("Either the topic.name or topic.name.attribute configuration must be specified"); +} + +final String headerRegex = properties.get(StatelessNiFiSourceConnector.HEADER_REGEX); +headerAttributeNamePattern = headerRegex == null ? null : Pattern.compile(headerRegex); + +dataflow = StatelessKafkaConnectorUtil.createDataflow(properties); +primaryNodeOnly = dataflow.isSourcePrimaryNodeOnly(); + +// Determine the name of the Output Port to retrieve data from +dataflowName = properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME); +outputPortName = properties.get(StatelessNiFiSourceConnector.OUTPUT_PORT_NAME); +if (outputPortName == null) { +final Set outputPorts = dataflow.getOutputPortNames(); +if (outputPorts.isEmpty()) { +throw new ConfigException("The dataflow specified for <" +
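The excerpt compiles headerAttributeNamePattern from the header regex property, but the code that uses it is cut off in this message. A minimal sketch of one plausible use follows, assuming full-name matching with Matcher.matches() and that a missing regex selects no headers; it works on a plain attribute Map rather than NiFi's FlowFile or Kafka's ConnectHeaders, and the class name is hypothetical. The actual task may behave differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

final class HeaderSelectionSketch {

    /**
     * Returns the attributes whose names fully match the pattern, preserving order.
     * A null pattern (no header regex configured) selects nothing.
     */
    static Map<String, String> selectHeaderAttributes(Map<String, String> attributes, Pattern headerNamePattern) {
        Map<String, String> headers = new LinkedHashMap<>();
        if (headerNamePattern == null) {
            return headers;
        }
        for (Map.Entry<String, String> entry : attributes.entrySet()) {
            if (headerNamePattern.matcher(entry.getKey()).matches()) {
                headers.put(entry.getKey(), entry.getValue());
            }
        }
        return headers;
    }
}
```

Compiling the pattern once in start(), as the excerpt does, avoids recompiling the regex for every record.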
[GitHub] [nifi] markap14 commented on a change in pull request #4730: NIFI-8095: Created StatelessNiFi Sink Connector and Source Connector.…
markap14 commented on a change in pull request #4730: URL: https://github.com/apache/nifi/pull/4730#discussion_r561138442 ## File path: nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java ## @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.kafka.connect; + +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.stateless.flow.DataflowTrigger; +import org.apache.nifi.stateless.flow.StatelessDataflow; +import org.apache.nifi.stateless.flow.TriggerResult; +import org.apache.nifi.util.FormatUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; + +public class StatelessNiFiSourceTask extends SourceTask { +public static final String STATE_MAP_KEY = "task.index"; +private static final Logger logger = LoggerFactory.getLogger(StatelessNiFiSourceTask.class); + +private StatelessDataflow dataflow; +private String outputPortName; +private String topicName; +private String topicNameAttribute; +private TriggerResult triggerResult; +private String keyAttributeName; +private Pattern headerAttributeNamePattern; +private long timeoutMillis; +private String dataflowName; +private long failureYieldExpiration = 0L; + +private final Map clusterStatePartitionMap = Collections.singletonMap(STATE_MAP_KEY, "CLUSTER"); +private Map localStatePartitionMap = new HashMap<>(); +private boolean primaryNodeOnly; +private boolean primaryNodeTask; + +private final AtomicLong unacknowledgedRecords = new AtomicLong(0L); + +@Override +public String version() { 
+return StatelessKafkaConnectorUtil.getVersion(); +} + +@Override +public void start(final Map properties) { +logger.info("Starting Source Task with properties {}", StatelessKafkaConnectorUtil.getLoggableProperties(properties)); + +final String timeout = properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT); +timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, TimeUnit.MILLISECONDS); + +topicName = properties.get(StatelessNiFiSourceConnector.TOPIC_NAME); +topicNameAttribute = properties.get(StatelessNiFiSourceConnector.TOPIC_NAME_ATTRIBUTE); +keyAttributeName = properties.get(StatelessNiFiSourceConnector.KEY_ATTRIBUTE); + +if (topicName == null && topicNameAttribute == null) { +throw new ConfigException("Either the topic.name or topic.name.attribute configuration must be specified"); +} + +final String headerRegex = properties.get(StatelessNiFiSourceConnector.HEADER_REGEX); +headerAttributeNamePattern = headerRegex == null ? null : Pattern.compile(headerRegex); + +dataflow = StatelessKafkaConnectorUtil.createDataflow(properties); +primaryNodeOnly = dataflow.isSourcePrimaryNodeOnly(); + +// Determine the name of the Output Port to retrieve data from +dataflowName = properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME); +outputPortName = properties.get(StatelessNiFiSourceConnector.OUTPUT_PORT_NAME); +if (outputPortName == null) { +final Set outputPorts = dataflow.getOutputPortNames(); +if (outputPorts.isEmpty()) { +throw new ConfigException("The dataflow specified for <" +
[GitHub] [nifi] adenes commented on pull request #4734: NIFI-8023 Added toLocalDate() and updated toDate() in DataTypeUtils
adenes commented on pull request #4734: URL: https://github.com/apache/nifi/pull/4734#issuecomment-763799131 @exceptionfactory, thanks for the follow-up commits. I ran the tests with CET system TZ and unfortunately some test cases fail:
```
[ERROR] Failures:
[ERROR] TestAvroReaderWithEmbeddedSchema.testLogicalTypes:66->testLogicalTypes:125 expected:<2017-04-0[4]> but was:<2017-04-0[3]>
[ERROR] TestAvroReaderWithEmbeddedSchema.testNullableLogicalTypes:72->testLogicalTypes:125 expected:<2017-04-0[4]> but was:<2017-04-0[3]>
[ERROR] TestCSVRecordReader.testDate:119 expected:<30> but was:<29>
[ERROR] TestCSVRecordReader.testDateNoCoersionExpectedFormat:163 expected:<30> but was:<29>
[ERROR] TestJacksonCSVRecordReader.testDate:105 expected:<30> but was:<29>
```
Could you please have a look at them?
[GitHub] [nifi] exceptionfactory commented on a change in pull request #4753: NIFI-7356 - Enable TLS for embedded Zookeeper when NiFi has TLS enabled
exceptionfactory commented on a change in pull request #4753: URL: https://github.com/apache/nifi/pull/4753#discussion_r561113641 ## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java ## @@ -36,11 +39,17 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.net.InetSocketAddress; import java.util.Properties; public class ZooKeeperStateServer extends ZooKeeperServerMain { private static final Logger logger = LoggerFactory.getLogger(ZooKeeperStateServer.class); +static final int MIN_PORT = 1024; +static final int MAX_PORT = 65353; +static final String ZOOKEEPER_SSL_QUORUM = "sslQuorum"; +static final String ZOOKEEPER_PORT_UNIFICATION = "portUnification"; +static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "serverCnxnFactory"; Review comment: Should these static variables also be marked private? ## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java ## @@ -36,11 +39,17 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.net.InetSocketAddress; import java.util.Properties; public class ZooKeeperStateServer extends ZooKeeperServerMain { private static final Logger logger = LoggerFactory.getLogger(ZooKeeperStateServer.class); +static final int MIN_PORT = 1024; +static final int MAX_PORT = 65353; Review comment: Should this value be `65535`, or is there a reason for the lower number? 
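The reviewer's question about 65353 comes down to the port-number range: TCP and UDP port numbers are 16-bit unsigned integers, so the largest valid port is 65535 (2^16 - 1). A minimal range check under that assumption follows, keeping the quoted MIN_PORT of 1024 as the lower bound; the class name is hypothetical and this is not the PR's code.

```java
final class PortRangeSketch {
    // Lower bound taken from the quoted MIN_PORT: excludes privileged ports below 1024
    static final int MIN_PORT = 1024;
    // Largest value a 16-bit unsigned port number can hold
    static final int MAX_PORT = 65535;

    static boolean isValidPort(int port) {
        return port >= MIN_PORT && port <= MAX_PORT;
    }
}
```

With MAX_PORT set to 65353, ports 65354 through 65535 would be rejected even though they are valid.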
## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java ## @@ -198,6 +219,144 @@ public static ZooKeeperStateServer create(final NiFiProperties properties) throw zkProperties.load(bis); } -return new ZooKeeperStateServer(zkProperties); +return new ZooKeeperStateServer(reconcileProperties(properties, zkProperties)); +} + +/** + * Reconcile properties between the nifi.properties and zookeeper.properties (zoo.cfg) files. Most of the ZooKeeper server properties are derived from + * the zookeeper.properties file, while the TLS key/truststore properties are taken from nifi.properties. + * @param niFiProperties NiFiProperties file containing ZooKeeper client and TLS configuration + * @param zkProperties The zookeeper.properties file containing Zookeeper server configuration + * @return A reconciled QuorumPeerConfig which will include TLS properties set if they are available. + * @throws IOException If configuration files fail to parse. + * @throws ConfigException If secure configuration is not as expected. Check administration documentation. + */ +private static QuorumPeerConfig reconcileProperties(NiFiProperties niFiProperties, Properties zkProperties) throws IOException, ConfigException { +QuorumPeerConfig peerConfig = new QuorumPeerConfig(); +peerConfig.parseProperties(zkProperties); + +final boolean niFiConfigIsSecure = isNiFiConfigSecureForZooKeeper(niFiProperties); +final boolean zooKeeperConfigIsSecure = isZooKeeperConfigSecure(peerConfig); + +if (!zooKeeperConfigIsSecure && !niFiConfigIsSecure) { +logger.info("{} property is set to false or is not present, and zookeeper.properties file does not contain secureClientPort property, so embedded ZooKeeper will be started without TLS.", Review comment: Should this be a debug message instead of an info message? Also recommend removing the trailing period character from the log message. 
## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/server/TestZooKeeperStateServerConfigurations.java ## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.state.server; + +import
[GitHub] [nifi] urbandan commented on a change in pull request #4730: NIFI-8095: Created StatelessNiFi Sink Connector and Source Connector.…
urbandan commented on a change in pull request #4730: URL: https://github.com/apache/nifi/pull/4730#discussion_r561128503 ## File path: nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java ## @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.kafka.connect; + +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.stateless.flow.DataflowTrigger; +import org.apache.nifi.stateless.flow.StatelessDataflow; +import org.apache.nifi.stateless.flow.TriggerResult; +import org.apache.nifi.util.FormatUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; + +public class StatelessNiFiSourceTask extends SourceTask { +public static final String STATE_MAP_KEY = "task.index"; +private static final Logger logger = LoggerFactory.getLogger(StatelessNiFiSourceTask.class); + +private StatelessDataflow dataflow; +private String outputPortName; +private String topicName; +private String topicNameAttribute; +private TriggerResult triggerResult; +private String keyAttributeName; +private Pattern headerAttributeNamePattern; +private long timeoutMillis; +private String dataflowName; +private long failureYieldExpiration = 0L; + +private final Map clusterStatePartitionMap = Collections.singletonMap(STATE_MAP_KEY, "CLUSTER"); +private Map localStatePartitionMap = new HashMap<>(); +private boolean primaryNodeOnly; +private boolean primaryNodeTask; + +private final AtomicLong unacknowledgedRecords = new AtomicLong(0L); + +@Override +public String version() { 
+return StatelessKafkaConnectorUtil.getVersion(); +} + +@Override +public void start(final Map properties) { +logger.info("Starting Source Task with properties {}", StatelessKafkaConnectorUtil.getLoggableProperties(properties)); + +final String timeout = properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT); +timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, TimeUnit.MILLISECONDS); + +topicName = properties.get(StatelessNiFiSourceConnector.TOPIC_NAME); +topicNameAttribute = properties.get(StatelessNiFiSourceConnector.TOPIC_NAME_ATTRIBUTE); +keyAttributeName = properties.get(StatelessNiFiSourceConnector.KEY_ATTRIBUTE); + +if (topicName == null && topicNameAttribute == null) { +throw new ConfigException("Either the topic.name or topic.name.attribute configuration must be specified"); +} + +final String headerRegex = properties.get(StatelessNiFiSourceConnector.HEADER_REGEX); +headerAttributeNamePattern = headerRegex == null ? null : Pattern.compile(headerRegex); + +dataflow = StatelessKafkaConnectorUtil.createDataflow(properties); +primaryNodeOnly = dataflow.isSourcePrimaryNodeOnly(); + +// Determine the name of the Output Port to retrieve data from +dataflowName = properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME); +outputPortName = properties.get(StatelessNiFiSourceConnector.OUTPUT_PORT_NAME); +if (outputPortName == null) { +final Set outputPorts = dataflow.getOutputPortNames(); +if (outputPorts.isEmpty()) { +throw new ConfigException("The dataflow specified for <" +
[GitHub] [nifi] markap14 commented on a change in pull request #4730: NIFI-8095: Created StatelessNiFi Sink Connector and Source Connector.…
markap14 commented on a change in pull request #4730: URL: https://github.com/apache/nifi/pull/4730#discussion_r561112193 ## File path: nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java ## @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.kafka.connect; + +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.stateless.flow.DataflowTrigger; +import org.apache.nifi.stateless.flow.StatelessDataflow; +import org.apache.nifi.stateless.flow.TriggerResult; +import org.apache.nifi.util.FormatUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; + +public class StatelessNiFiSourceTask extends SourceTask { +public static final String STATE_MAP_KEY = "task.index"; +private static final Logger logger = LoggerFactory.getLogger(StatelessNiFiSourceTask.class); + +private StatelessDataflow dataflow; +private String outputPortName; +private String topicName; +private String topicNameAttribute; +private TriggerResult triggerResult; +private String keyAttributeName; +private Pattern headerAttributeNamePattern; +private long timeoutMillis; +private String dataflowName; +private long failureYieldExpiration = 0L; + +private final Map clusterStatePartitionMap = Collections.singletonMap(STATE_MAP_KEY, "CLUSTER"); +private Map localStatePartitionMap = new HashMap<>(); +private boolean primaryNodeOnly; +private boolean primaryNodeTask; + +private final AtomicLong unacknowledgedRecords = new AtomicLong(0L); + +@Override +public String version() { 
+return StatelessKafkaConnectorUtil.getVersion(); +} + +@Override +public void start(final Map properties) { +logger.info("Starting Source Task with properties {}", StatelessKafkaConnectorUtil.getLoggableProperties(properties)); + +final String timeout = properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT); +timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, TimeUnit.MILLISECONDS); + +topicName = properties.get(StatelessNiFiSourceConnector.TOPIC_NAME); +topicNameAttribute = properties.get(StatelessNiFiSourceConnector.TOPIC_NAME_ATTRIBUTE); +keyAttributeName = properties.get(StatelessNiFiSourceConnector.KEY_ATTRIBUTE); + +if (topicName == null && topicNameAttribute == null) { +throw new ConfigException("Either the topic.name or topic.name.attribute configuration must be specified"); +} + +final String headerRegex = properties.get(StatelessNiFiSourceConnector.HEADER_REGEX); +headerAttributeNamePattern = headerRegex == null ? null : Pattern.compile(headerRegex); + +dataflow = StatelessKafkaConnectorUtil.createDataflow(properties); +primaryNodeOnly = dataflow.isSourcePrimaryNodeOnly(); + +// Determine the name of the Output Port to retrieve data from +dataflowName = properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME); +outputPortName = properties.get(StatelessNiFiSourceConnector.OUTPUT_PORT_NAME); +if (outputPortName == null) { +final Set outputPorts = dataflow.getOutputPortNames(); +if (outputPorts.isEmpty()) { +throw new ConfigException("The dataflow specified for <" +
[GitHub] [nifi] markap14 commented on a change in pull request #4730: NIFI-8095: Created StatelessNiFi Sink Connector and Source Connector.…
markap14 commented on a change in pull request #4730: URL: https://github.com/apache/nifi/pull/4730#discussion_r561109996 ## File path: nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.java ## @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.kafka.connect; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.stateless.flow.DataflowTrigger; +import org.apache.nifi.stateless.flow.StatelessDataflow; +import org.apache.nifi.stateless.flow.TriggerResult; +import org.apache.nifi.util.FormatUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +public class StatelessNiFiSinkTask extends SinkTask { +private static final Logger logger = LoggerFactory.getLogger(StatelessNiFiSinkTask.class); + +private StatelessDataflow dataflow; +private String inputPortName; +private Set failurePortNames; +private long timeoutMillis; +private Pattern headerNameRegex; +private String headerNamePrefix; +private int batchSize; +private long batchBytes; +private QueueSize queueSize; +private String dataflowName; + +private long backoffMillis = 0L; +private boolean lastTriggerSuccessful = true; +private ExecutorService backgroundTriggerExecutor; + +@Override +public String version() { +return StatelessKafkaConnectorUtil.getVersion(); +} + +@Override +public void start(final Map properties) { +logger.info("Starting Sink Task with properties {}", 
StatelessKafkaConnectorUtil.getLoggableProperties(properties)); + +final String timeout = properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT); +timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, TimeUnit.MILLISECONDS); + +dataflowName = properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME); + +final String regex = properties.get(StatelessNiFiSinkConnector.HEADERS_AS_ATTRIBUTES_REGEX); +headerNameRegex = regex == null ? null : Pattern.compile(regex); +headerNamePrefix = properties.getOrDefault(StatelessNiFiSinkConnector.HEADER_ATTRIBUTE_NAME_PREFIX, ""); + +batchSize = Integer.parseInt(properties.getOrDefault(StatelessNiFiSinkConnector.BATCH_SIZE_COUNT, "0")); +batchBytes = Long.parseLong(properties.getOrDefault(StatelessNiFiSinkConnector.BATCH_SIZE_BYTES, "0")); + +dataflow = StatelessKafkaConnectorUtil.createDataflow(properties); + +// Determine input port name. If input port is explicitly set, use the value given. Otherwise, if only one port exists, use that. Otherwise, throw ConfigException. +final String dataflowName = properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME); +inputPortName = properties.get(StatelessNiFiSinkConnector.INPUT_PORT_NAME); +if (inputPortName == null) { +final Set inputPorts = dataflow.getInputPortNames(); +if (inputPorts.isEmpty()) { +throw new ConfigException("The dataflow specified for <" + dataflowName + "> does not have an Input Port at the root level. Dataflows used for a Kafka Connect Sink Task " ++ "must have at least one Input Port at the root level.");
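The input port rule stated in the code comment above can be sketched as a small pure function: use the configured name if one is given, otherwise use the single root-level port, and otherwise fail. This is an illustrative helper with hypothetical names. It throws `IllegalArgumentException` instead of Kafka's `ConfigException` so that it compiles with the JDK alone, and the wording of the multiple-ports message is an assumption.

```java
import java.util.Set;

// Hypothetical helper, not the connector's code: resolves which root-level Input Port to use.
final class PortNameResolution {
    private PortNameResolution() {
    }

    static String resolvePortName(final String configuredName, final Set<String> availablePorts, final String dataflowName) {
        // An explicitly configured port name is used as given
        if (configuredName != null) {
            return configuredName;
        }
        if (availablePorts.isEmpty()) {
            throw new IllegalArgumentException("The dataflow specified for <" + dataflowName
                + "> does not have an Input Port at the root level");
        }
        if (availablePorts.size() > 1) {
            // Ambiguous: more than one candidate, so the user must choose explicitly
            throw new IllegalArgumentException("The dataflow specified for <" + dataflowName
                + "> has multiple root-level Input Ports " + availablePorts + "; configure the port name explicitly");
        }
        // Exactly one port exists, so it is the only sensible choice
        return availablePorts.iterator().next();
    }
}
```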
[GitHub] [nifi] urbandan commented on a change in pull request #4730: NIFI-8095: Created StatelessNiFi Sink Connector and Source Connector.…
urbandan commented on a change in pull request #4730: URL: https://github.com/apache/nifi/pull/4730#discussion_r561093484 ## File path: nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.java ## @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.kafka.connect; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.stateless.flow.DataflowTrigger; +import org.apache.nifi.stateless.flow.StatelessDataflow; +import org.apache.nifi.stateless.flow.TriggerResult; +import org.apache.nifi.util.FormatUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +public class StatelessNiFiSinkTask extends SinkTask { +private static final Logger logger = LoggerFactory.getLogger(StatelessNiFiSinkTask.class); + +private StatelessDataflow dataflow; +private String inputPortName; +private Set failurePortNames; +private long timeoutMillis; +private Pattern headerNameRegex; +private String headerNamePrefix; +private int batchSize; +private long batchBytes; +private QueueSize queueSize; +private String dataflowName; + +private long backoffMillis = 0L; +private boolean lastTriggerSuccessful = true; +private ExecutorService backgroundTriggerExecutor; + +@Override +public String version() { +return StatelessKafkaConnectorUtil.getVersion(); +} + +@Override +public void start(final Map properties) { +logger.info("Starting Sink Task with properties {}", 
StatelessKafkaConnectorUtil.getLoggableProperties(properties)); + +final String timeout = properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT); +timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, TimeUnit.MILLISECONDS); + +dataflowName = properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME); + +final String regex = properties.get(StatelessNiFiSinkConnector.HEADERS_AS_ATTRIBUTES_REGEX); +headerNameRegex = regex == null ? null : Pattern.compile(regex); +headerNamePrefix = properties.getOrDefault(StatelessNiFiSinkConnector.HEADER_ATTRIBUTE_NAME_PREFIX, ""); + +batchSize = Integer.parseInt(properties.getOrDefault(StatelessNiFiSinkConnector.BATCH_SIZE_COUNT, "0")); +batchBytes = Long.parseLong(properties.getOrDefault(StatelessNiFiSinkConnector.BATCH_SIZE_BYTES, "0")); + +dataflow = StatelessKafkaConnectorUtil.createDataflow(properties); + +// Determine input port name. If input port is explicitly set, use the value given. Otherwise, if only one port exists, use that. Otherwise, throw ConfigException. +final String dataflowName = properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME); +inputPortName = properties.get(StatelessNiFiSinkConnector.INPUT_PORT_NAME); +if (inputPortName == null) { +final Set inputPorts = dataflow.getInputPortNames(); +if (inputPorts.isEmpty()) { +throw new ConfigException("The dataflow specified for <" + dataflowName + "> does not have an Input Port at the root level. Dataflows used for a Kafka Connect Sink Task " ++ "must have at least one Input Port at the root level.");
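The `headerNameRegex` and `headerNamePrefix` fields suggest that Kafka record headers whose names match the configured regex are copied into FlowFile attributes, with the prefix prepended. The diff does not show that code. The sketch below is a guess under several assumptions: header values are already decoded to `String`, the whole name must match (`matches()` rather than `find()`), and no regex means no headers are copied. All names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch of header-to-attribute mapping; the real task works with Kafka Header objects.
final class HeaderAttributeMapping {
    private HeaderAttributeMapping() {
    }

    static Map<String, String> headersToAttributes(final Map<String, String> headers, final Pattern headerNameRegex,
                                                   final String headerNamePrefix) {
        final Map<String, String> attributes = new LinkedHashMap<>();
        if (headerNameRegex == null) {
            // Assumption: with no regex configured, no headers become attributes
            return attributes;
        }
        for (final Map.Entry<String, String> header : headers.entrySet()) {
            // Assumption: the entire header name must match the regex
            if (headerNameRegex.matcher(header.getKey()).matches()) {
                attributes.put(headerNamePrefix + header.getKey(), header.getValue());
            }
        }
        return attributes;
    }
}
```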
[GitHub] [nifi] exceptionfactory commented on pull request #4768: NIFI-8155 - add banner text in page title
exceptionfactory commented on pull request #4768: URL: https://github.com/apache/nifi/pull/4768#issuecomment-763755846 This change looks straightforward as it stands, but there are several other places where the `document.title` is set when opening new windows, such as Provenance contents or Bulletin Board messages. Should those other references also be updated? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi] thenatog commented on pull request #4753: NIFI-7356 - Enable TLS for embedded Zookeeper when NiFi has TLS enabled
thenatog commented on pull request #4753: URL: https://github.com/apache/nifi/pull/4753#issuecomment-763753967 Addressing your first two points: > - TLS is required for the embedded ZK when cluster TLS is enabled but NiFi won't try to connect securely unless nifi.zookeeper.client.secure is set to true in nifi.properties. > - Similarly, the embedded ZK won't actually run with TLS enabled unless secureClientPort is set in zookeeper.properties. It appears that clientPort is successfully removed but secureClientPort doesn't get added. I have added configuration logic that stops NiFi from starting if secureClientPort is configured in zookeeper.properties but nifi.zookeeper.client.secure=false. In that case, users will need to configure zookeeper.properties with a clientPort value instead. When starting securely, the intent is to remove any additional clientPort that could allow insecure connections. However, secureClientPort will not be added automatically if clientPort was set, even when everything else is configured to be secure; the user will need to manually edit the zookeeper.properties file to set secureClientPort. Let me know what you think of the latest set of commits.
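The startup rules described in this comment can be summarized as a small decision function. This is a hypothetical sketch, not NiFi's `reconcileProperties` implementation. The property names come from the thread, while the enum, class, and exception choices are illustrative. The comment only says that secureClientPort is never added automatically, so treating "secure client but no secureClientPort" as a startup error here is an assumption.

```java
import java.util.Properties;

// Hypothetical summary of the embedded ZooKeeper TLS rules discussed in NIFI-7356.
final class EmbeddedZooKeeperTlsRules {
    enum Outcome { START_WITHOUT_TLS, START_WITH_TLS }

    private EmbeddedZooKeeperTlsRules() {
    }

    static Outcome reconcile(final boolean niFiClientSecure, final Properties zkProperties) {
        final boolean hasSecureClientPort = zkProperties.getProperty("secureClientPort") != null;

        if (!niFiClientSecure && !hasSecureClientPort) {
            // Neither side requests TLS: start the embedded ZooKeeper without TLS
            return Outcome.START_WITHOUT_TLS;
        }
        if (!niFiClientSecure) {
            // secureClientPort is set but nifi.zookeeper.client.secure=false: refuse to start
            throw new IllegalStateException("secureClientPort is set but nifi.zookeeper.client.secure=false; configure clientPort instead");
        }
        if (!hasSecureClientPort) {
            // Assumption: secureClientPort is never added automatically, so treat this as a configuration error
            throw new IllegalStateException("nifi.zookeeper.client.secure=true requires secureClientPort in zookeeper.properties");
        }
        // Secure start: drop any clientPort that would still accept insecure connections
        zkProperties.remove("clientPort");
        return Outcome.START_WITH_TLS;
    }
}
```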
[GitHub] [nifi] markap14 commented on a change in pull request #4746: NIFI-8034: Fixed PropertyValue.isExpressionLanguagePresent always ret…
markap14 commented on a change in pull request #4746: URL: https://github.com/apache/nifi/pull/4746#discussion_r561092480 ## File path: nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestStandardPreparedQuery.java ## @@ -309,6 +309,24 @@ public void testVariableImpacted() { assertTrue(Query.prepare("${anyMatchingAttribute('a.*'):equals('hello')}").getVariableImpact().isImpacted("attr")); } +@Test +public void testIsExpressionLanguagePresent() { +assertFalse(Query.prepare("value").isExpressionLanguagePresent()); +assertFalse(Query.prepare("").isExpressionLanguagePresent()); + +assertTrue(Query.prepare("${variable}").isExpressionLanguagePresent()); + assertTrue(Query.prepare("${hostname()}").isExpressionLanguagePresent()); + assertTrue(Query.prepare("${hostname():equals('localhost')}").isExpressionLanguagePresent()); + assertTrue(Query.prepare("prefix-${hostname()}").isExpressionLanguagePresent()); + assertTrue(Query.prepare("${hostname()}-suffix").isExpressionLanguagePresent()); + assertTrue(Query.prepare("${variable1}${hostname()}${variable2}").isExpressionLanguagePresent()); + assertTrue(Query.prepare("${${variable}}").isExpressionLanguagePresent()); + +assertFalse(Query.prepare("${}").isExpressionLanguagePresent()); + +assertTrue(Query.prepare("#{param}").isExpressionLanguagePresent()); Review comment: This should be false, not true. This is not expression language but rather a parameter reference. The value that will be made available to the component will be the fully resolved value, after substituting in the parameter.
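To illustrate the distinction the review draws, here is a deliberately simplified, hypothetical detector. It treats `${` followed by at least one non-`}` character as Expression Language, and it ignores `#{...}` Parameter references. NiFi's `Query` uses a real parser, and this regex does not handle escaping (`$${...}`) or unbalanced braces, so it only demonstrates the expected results from the test above, with `#{param}` returning false as the reviewer suggests.

```java
import java.util.regex.Pattern;

// Hypothetical, simplified detector; not NiFi's Query.isExpressionLanguagePresent implementation.
final class ExpressionLanguageDetector {
    // "${" followed by a character other than "}" starts an Expression Language reference.
    // "#{...}" is a Parameter reference and is intentionally not matched.
    private static final Pattern EXPRESSION_START = Pattern.compile("\\$\\{[^}]");

    private ExpressionLanguageDetector() {
    }

    static boolean isExpressionLanguagePresent(final String value) {
        return value != null && EXPRESSION_START.matcher(value).find();
    }
}
```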
[GitHub] [nifi] joewitt commented on pull request #4771: NIFI-8156 Fixed byte handling bug in cassandra.
joewitt commented on pull request #4771: URL: https://github.com/apache/nifi/pull/4771#issuecomment-763714975 Ha! Me too. I trust it was broken and you've verified your fix ;) But yeah, thanks for sharing and thanks for tackling this. Will monitor the build. Tagged to 1.13 so I won't miss it.
[GitHub] [nifi] MikeThomsen commented on pull request #4771: NIFI-8156 Fixed byte handling bug in cassandra.
MikeThomsen commented on pull request #4771: URL: https://github.com/apache/nifi/pull/4771#issuecomment-763711741 @joewitt cool, but I'm a believer in "trust, but verify," so here is a screenshot of my terminal showing queries against the table matching the steps I showed: https://user-images.githubusercontent.com/108184/105197394-93a81980-5b0a-11eb-9282-a1a2476c4c3c.png
[GitHub] [nifi] joewitt commented on pull request #4771: NIFI-8156 Fixed byte handling bug in cassandra.
joewitt commented on pull request #4771: URL: https://github.com/apache/nifi/pull/4771#issuecomment-763708883 I will simply review code and trust you validated it. We're good if the build is good on this in my view.
[GitHub] [nifi] MikeThomsen edited a comment on pull request #4771: NIFI-8156 Fixed byte handling bug in cassandra.
MikeThomsen edited a comment on pull request #4771: URL: https://github.com/apache/nifi/pull/4771#issuecomment-763707997 @joewitt here are test instructions:

```
docker run -p 7000:7000 -p 9042:9042 --name cassandra -d cassandra:3
```

```
CREATE KEYSPACE byte_test WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 3};
use byte_test;
create table binary_test (id text, data blob, primary key(id));
```

[Cassandra_Byte_Array_Test.xml.txt](https://github.com/apache/nifi/files/5843448/Cassandra_Byte_Array_Test.xml.txt)

Once it's running, you can test with:

```
select id, blobastext(data) from binary_test;
```
[GitHub] [nifi] MikeThomsen commented on pull request #4771: NIFI-8156 Fixed byte handling bug in cassandra.
MikeThomsen commented on pull request #4771: URL: https://github.com/apache/nifi/pull/4771#issuecomment-763707997 @joewitt here are test instructions:

```
docker run -p 7000:7000 -p 9042:9042 --name cassandra -d cassandra:3
```

```
CREATE KEYSPACE byte_test WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 3};
use byte_test;
create table binary_test (id text, data blob, primary key(id));
```

[Cassandra_Byte_Array_Test.xml.txt](https://github.com/apache/nifi/files/5843448/Cassandra_Byte_Array_Test.xml.txt)
[GitHub] [nifi] markap14 commented on a change in pull request #4760: NIFI-8142 Add "on conflict do nothing" feature to PutDatabaseRecord
markap14 commented on a change in pull request #4760: URL: https://github.com/apache/nifi/pull/4760#discussion_r561045998 ## File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java ## @@ -301,13 +301,23 @@ .name("put-db-record-max-batch-size") .displayName("Maximum Batch Size") .description("Specifies maximum batch size for INSERT and UPDATE statements. This parameter has no effect for other statements specified in 'Statement Type'." -+ " Zero means the batch size is not limited.") ++ " Zero means the batch size is not limited.") .defaultValue("0") .required(false) .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); +static final PropertyDescriptor UPSERT_DO_NOTHING = new PropertyDescriptor.Builder() Review comment: This feels odd to me. A property named "Upsert Do Nothing" seems confusing and misleading: it sounds like any upsert should be ignored and not acted upon. In this case, it's not really upserts that are being ignored, but conflicting inserts. What makes more sense to me is to add another option for the Statement Type, such as "INSERT_IGNORE". This would also be consistent with how it is done in PutKudu.
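To make the suggestion concrete, here is a hypothetical sketch of how an `INSERT_IGNORE` statement type could translate into SQL. It uses PostgreSQL's `ON CONFLICT DO NOTHING` clause, which matches the PR title. Other databases use different syntax (MySQL, for example, uses `INSERT IGNORE`), so a real implementation would need to go through database-specific adapters. The enum, class, and method names are illustrative and are not `PutDatabaseRecord`'s API.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: generating SQL for an INSERT_IGNORE statement type (PostgreSQL syntax).
final class InsertIgnoreSqlSketch {
    enum StatementType { INSERT, INSERT_IGNORE }

    private InsertIgnoreSqlSketch() {
    }

    static String buildInsert(final StatementType type, final String table, final List<String> columns) {
        final String columnList = String.join(", ", columns);
        // One JDBC placeholder per column
        final String placeholders = String.join(", ", Collections.nCopies(columns.size(), "?"));
        final StringBuilder sql = new StringBuilder("INSERT INTO ")
            .append(table).append(" (").append(columnList).append(") VALUES (").append(placeholders).append(")");
        if (type == StatementType.INSERT_IGNORE) {
            // PostgreSQL: silently skip rows that would violate a unique or primary key constraint
            sql.append(" ON CONFLICT DO NOTHING");
        }
        return sql.toString();
    }
}
```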
[jira] [Commented] (NIFI-8156) PutCassandraRecord does not wrap byte arrays with a ByteBuffer, causing write failures
[ https://issues.apache.org/jira/browse/NIFI-8156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268641#comment-17268641 ] Joe Witt commented on NIFI-8156: PR is up. I'll give it a look and, assuming the build is good, get it merged before the RC. > PutCassandraRecord does not wrap byte arrays with a ByteBuffer, causing write > failures > -- > > Key: NIFI-8156 > URL: https://issues.apache.org/jira/browse/NIFI-8156 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.12.1 >Reporter: Mike Thomsen >Assignee: Mike Thomsen >Priority: Major > Fix For: 1.13.0 > > Time Spent: 20m > Remaining Estimate: 0h > > As the subject line says, types that are meant for a Cassandra bytes field > are not wrapped inside of a ByteBuffer. This causes a write failure when the > Cassandra driver attempts to write the array. -- This message was sent by Atlassian Jira (v8.3.4#803005)
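The fix described in NIFI-8156 amounts to wrapping byte arrays in a `java.nio.ByteBuffer` before handing them to the Cassandra driver, which maps `blob` columns to `ByteBuffer`. Below is a minimal sketch of such a conversion. It assumes that record values arrive either as primitive `byte[]` or as boxed `Byte[]` (with no null elements). The class and method names are hypothetical and do not reflect the actual PutCassandraRecord change.

```java
import java.nio.ByteBuffer;

// Hypothetical conversion helper: wraps byte arrays so the Cassandra driver can bind them to blob columns.
final class CassandraBlobConversion {
    private CassandraBlobConversion() {
    }

    static Object toCassandraValue(final Object value) {
        if (value instanceof byte[]) {
            // Primitive arrays can be wrapped directly without copying
            return ByteBuffer.wrap((byte[]) value);
        }
        if (value instanceof Byte[]) {
            // Boxed arrays must be unboxed into a primitive array first (assumes no null elements)
            final Byte[] boxed = (Byte[]) value;
            final byte[] bytes = new byte[boxed.length];
            for (int i = 0; i < boxed.length; i++) {
                bytes[i] = boxed[i];
            }
            return ByteBuffer.wrap(bytes);
        }
        // All other types are passed through unchanged
        return value;
    }
}
```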
[jira] [Updated] (NIFI-8156) PutCassandraRecord does not wrap byte arrays with a ByteBuffer, causing write failures
[ https://issues.apache.org/jira/browse/NIFI-8156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Witt updated NIFI-8156: --- Fix Version/s: 1.13.0 > PutCassandraRecord does not wrap byte arrays with a ByteBuffer, causing write > failures > -- > > Key: NIFI-8156 > URL: https://issues.apache.org/jira/browse/NIFI-8156 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.12.1 >Reporter: Mike Thomsen >Assignee: Mike Thomsen >Priority: Major > Fix For: 1.13.0 > > Time Spent: 20m > Remaining Estimate: 0h > > As the subject line says, types that are meant for a Cassandra bytes field > are not wrapped inside of a ByteBuffer. This causes a write failure when the > Cassandra driver attempts to write the array.
[GitHub] [nifi] joewitt commented on pull request #4771: NIFI-8156 Fixed byte handling bug in cassandra.
joewitt commented on pull request #4771: URL: https://github.com/apache/nifi/pull/4771#issuecomment-763703079 assuming build is happy i'll merge
[GitHub] [nifi] MikeThomsen opened a new pull request #4771: NIFI-8156 Fixed byte handling bug in cassandra.
MikeThomsen opened a new pull request #4771: URL: https://github.com/apache/nifi/pull/4771 Thank you for submitting a contribution to Apache NiFi. Please provide a short description of the PR here: Description of PR _Enables X functionality; fixes bug NIFI-._ In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with **NIFI-** where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically `main`)? - [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._ ### For code changes: - [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] Have you verified that the full build is successful on JDK 8? - [ ] Have you verified that the full build is successful on JDK 11? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`? - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`? - [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties? 
### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #955: MINIFICPP-1414 Create in-memory compressed logs
adamdebreceni commented on a change in pull request #955: URL: https://github.com/apache/nifi-minifi-cpp/pull/955#discussion_r561038296 ## File path: libminifi/test/unit/LoggerTests.cpp ## @@ -107,3 +110,71 @@ TEST_CASE("Test ShortenNames", "[ttl6]") { LogTestController::getInstance(props)->reset(); LogTestController::getInstance().reset(); } + +using namespace minifi::io; + +std::string decompress(const std::shared_ptr& input) { + auto output = utils::make_unique(); + auto decompressor = std::make_shared(gsl::make_not_null(output.get())); + minifi::internal::pipe(input, decompressor); + decompressor->close(); + return std::string{reinterpret_cast(output->getBuffer()), output->size()}; +} + +TEST_CASE("Test Compression", "[ttl7]") { + auto& log_config = logging::LoggerConfiguration::getConfiguration(); + auto properties = std::make_shared(); + std::string className; + SECTION("Using root logger") { +className = "CompressionTestClassUsingRoot"; +// by default the root logger is OFF +properties->set("logger.root", "INFO"); + } + SECTION("Inherit compression sink") { +className = "CompressionTestClassInheriting"; +properties->set("appender.null", "null"); +properties->set("logger." + className, "INFO,null"); + } + log_config.initialize(properties); + auto logger = log_config.getLogger(className); + logger->log_error("Hi there"); + std::shared_ptr compressed_log{logging::LoggerConfiguration::getCompressedLog(true)}; + REQUIRE(compressed_log); + auto logs = decompress(compressed_log); + REQUIRE(logs.find("Hi there") != std::string::npos); Review comment: actually it turns out `logs == "Hi there"` so I changed it to that
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #955: MINIFICPP-1414 Create in-memory compressed logs
adamdebreceni commented on a change in pull request #955: URL: https://github.com/apache/nifi-minifi-cpp/pull/955#discussion_r561037947 ## File path: libminifi/include/core/logging/internal/LogBuffer.h ## @@ -0,0 +1,62 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "io/BufferStream.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace logging { +namespace internal { + +class LogBuffer { + public: + LogBuffer() = default; + explicit LogBuffer(std::unique_ptr buffer): buffer_{std::move(buffer)} {} + + static LogBuffer allocate(size_t max_size) { +LogBuffer instance{utils::make_unique()}; +instance.buffer_->reserve(max_size * 3 / 2); Review comment: done
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #955: MINIFICPP-1414 Create in-memory compressed logs
adamdebreceni commented on a change in pull request #955: URL: https://github.com/apache/nifi-minifi-cpp/pull/955#discussion_r561037821 ## File path: libminifi/src/core/logging/LoggerConfiguration.cpp ## @@ -85,6 +108,7 @@ LoggerConfiguration::LoggerConfiguration() void LoggerConfiguration::initialize(const std::shared_ptr _properties) { std::lock_guard lock(mutex); root_namespace_ = initialize_namespaces(logger_properties); + initializeCompression(lock, logger_properties); Review comment: setting any of the compression-specific limits to 0 now disables compression completely (so it is opt-out)
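A minimal sketch of the opt-out behavior described above, assuming two size limits are read from the logger properties and that a value of 0 for either one disables compression entirely. The property keys and the 1 MiB default are hypothetical placeholders, not MiNiFi's actual configuration names or defaults.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical property keys; the real MiNiFi keys may differ.
const char* const kCachedLogMaxSize = "compression.cached.log.max.size";
const char* const kCompressedLogMaxSize = "compression.compressed.log.max.size";

struct CompressionConfig {
  std::size_t cached_log_max_size;
  std::size_t compressed_log_max_size;
  // Setting either limit to 0 turns the feature off entirely (opt-out).
  bool enabled() const { return cached_log_max_size != 0 && compressed_log_max_size != 0; }
};

// Reads a size property, falling back to a default when the key is absent.
std::size_t readSize(const std::map<std::string, std::string>& props, const std::string& key,
                     std::size_t default_value) {
  assert(!key.empty());
  auto it = props.find(key);
  if (it == props.end()) {
    return default_value;
  }
  return static_cast<std::size_t>(std::stoull(it->second));
}

CompressionConfig parseCompressionConfig(const std::map<std::string, std::string>& props) {
  // Hypothetical default: 1 MiB for both limits, so compression is on unless disabled.
  const std::size_t default_size = 1024 * 1024;
  return CompressionConfig{readSize(props, kCachedLogMaxSize, default_size),
                           readSize(props, kCompressedLogMaxSize, default_size)};
}
```

Making 0 the "off" value keeps the feature enabled by default while still letting low-memory deployments opt out without a separate boolean property.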
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #955: MINIFICPP-1414 Create in-memory compressed logs
adamdebreceni commented on a change in pull request #955: URL: https://github.com/apache/nifi-minifi-cpp/pull/955#discussion_r561037369 ## File path: libminifi/include/io/BufferStream.h ## @@ -42,6 +42,14 @@ class BufferStream : public BaseStream { write(reinterpret_cast(data.c_str()), data.length()); } + /* + * prepares the stream to accept and additional byte_count bytes + * @param byte_count number of bytes we expect to write + */ + void reserve(size_t byte_count) { Review comment: renamed
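The naming concern can be illustrated with a small hypothetical stream class. `std::vector::reserve` takes the new total capacity, so a method that takes the number of *additional* bytes reads more clearly as `extend`, implemented by reserving `size() + byte_count`. `ByteBufferStream` is an illustration only, not MiNiFi's `BufferStream`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical minimal stream: not MiNiFi's BufferStream.
class ByteBufferStream {
 public:
  void write(const std::uint8_t* data, std::size_t len) {
    assert(data != nullptr || len == 0);
    buffer_.insert(buffer_.end(), data, data + len);
  }

  // Prepares the stream to accept byte_count more bytes on top of what is already stored.
  // Unlike std::vector::reserve, the argument is a delta, not the new total capacity.
  void extend(std::size_t byte_count) {
    buffer_.reserve(buffer_.size() + byte_count);
  }

  std::size_t size() const { return buffer_.size(); }
  std::size_t capacity() const { return buffer_.capacity(); }

 private:
  std::vector<std::uint8_t> buffer_;
};
```

Calling `extend(10)` on a stream that already holds 4 bytes guarantees room for 14 bytes in total, whereas passing 10 to an STL-style `reserve` would only guarantee 10.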
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #955: MINIFICPP-1414 Create in-memory compressed logs
adamdebreceni commented on a change in pull request #955: URL: https://github.com/apache/nifi-minifi-cpp/pull/955#discussion_r561011823 ## File path: libminifi/include/core/logging/internal/LogBuffer.h ## @@ -0,0 +1,62 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "io/BufferStream.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace logging { +namespace internal { + +class LogBuffer { + public: + LogBuffer() = default; + explicit LogBuffer(std::unique_ptr buffer): buffer_{std::move(buffer)} {} + + static LogBuffer allocate(size_t max_size) { +LogBuffer instance{utils::make_unique()}; +instance.buffer_->reserve(max_size * 3 / 2); Review comment: I am adding the comment to `StagingQueue` and moving the multiplication there
[jira] [Created] (NIFI-8156) PutCassandraRecord does not wrap byte arrays with a ByteBuffer, causing write failures
Mike Thomsen created NIFI-8156: -- Summary: PutCassandraRecord does not wrap byte arrays with a ByteBuffer, causing write failures Key: NIFI-8156 URL: https://issues.apache.org/jira/browse/NIFI-8156 Project: Apache NiFi Issue Type: Bug Affects Versions: 1.12.1 Reporter: Mike Thomsen Assignee: Mike Thomsen As the subject line says, types that are meant for a Cassandra bytes field are not wrapped inside of a ByteBuffer. This causes a write failure when the Cassandra driver attempts to write the array.
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #955: MINIFICPP-1414 Create in-memory compressed logs
adamdebreceni commented on a change in pull request #955: URL: https://github.com/apache/nifi-minifi-cpp/pull/955#discussion_r561008200 ## File path: libminifi/include/core/logging/internal/LogBuffer.h ## @@ -0,0 +1,62 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "io/BufferStream.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace logging { +namespace internal { + +class LogBuffer { + public: + LogBuffer() = default; + explicit LogBuffer(std::unique_ptr buffer): buffer_{std::move(buffer)} {} + + static LogBuffer allocate(size_t max_size) { +LogBuffer instance{utils::make_unique()}; +instance.buffer_->reserve(max_size * 3 / 2); Review comment: `max_size` is a soft limit, i.e. 
reaching `max_size` indicates that the block should be compressed and committed (or that the compressed blocks should be rotated). We cannot guarantee that only `max_size` content is written to the buffer (not unless we plan on compressing in the logger thread and blocking it). Since `max_size` is the "trigger limit", each block would presumably contain a little more than `max_size` content at the trigger point.
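A minimal sketch of the soft-limit idea, using a hypothetical `SoftLimitBuffer` rather than MiNiFi's actual `LogBuffer` or `StagingQueue`. Messages are always appended whole so the logging thread never blocks, reaching `max_size` only signals that the block should be committed, and the 1.5x reservation from the diff leaves headroom for the overshoot.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for a log block: max_size is a soft "trigger" limit, not a cap.
class SoftLimitBuffer {
 public:
  explicit SoftLimitBuffer(std::size_t max_size) : max_size_(max_size) {
    assert(max_size > 0);
    // The message that crosses max_size is still written in full, so reserve
    // headroom (1.5x, mirroring the reviewed code) to make reallocation unlikely.
    data_.reserve(max_size * 3 / 2);
  }

  // Always accepts the whole message; never blocks or truncates.
  void append(const std::string& message) {
    data_.insert(data_.end(), message.begin(), message.end());
  }

  // Reaching the soft limit only signals that the block should be compressed and committed.
  bool shouldCommit() const { return data_.size() >= max_size_; }

  std::size_t size() const { return data_.size(); }
  std::size_t capacity() const { return data_.capacity(); }

 private:
  std::size_t max_size_;
  std::vector<char> data_;
};
```

With `max_size` of 10, appending 8 bytes and then 4 more leaves 12 bytes in the block: slightly over the trigger, as the comment above describes, yet still within the 15 bytes reserved up front.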
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #955: MINIFICPP-1414 Create in-memory compressed logs
szaszm commented on a change in pull request #955: URL: https://github.com/apache/nifi-minifi-cpp/pull/955#discussion_r561009640 ## File path: libminifi/include/core/logging/internal/LogBuffer.h ## @@ -0,0 +1,62 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "io/BufferStream.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace logging { +namespace internal { + +class LogBuffer { + public: + LogBuffer() = default; + explicit LogBuffer(std::unique_ptr buffer): buffer_{std::move(buffer)} {} + + static LogBuffer allocate(size_t max_size) { +LogBuffer instance{utils::make_unique()}; +instance.buffer_->reserve(max_size * 3 / 2); Review comment: I see. I think these explanations would be nice to have in the code for future readers.
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #955: MINIFICPP-1414 Create in-memory compressed logs
adamdebreceni commented on a change in pull request #955: URL: https://github.com/apache/nifi-minifi-cpp/pull/955#discussion_r561008200 ## File path: libminifi/include/core/logging/internal/LogBuffer.h ## @@ -0,0 +1,62 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "io/BufferStream.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace logging { +namespace internal { + +class LogBuffer { + public: + LogBuffer() = default; + explicit LogBuffer(std::unique_ptr buffer): buffer_{std::move(buffer)} {} + + static LogBuffer allocate(size_t max_size) { +LogBuffer instance{utils::make_unique()}; +instance.buffer_->reserve(max_size * 3 / 2); Review comment: `max_size` is a soft limit, i.e. 
reaching `max_size` indicates that the block should be compressed and committed (or that the compressed blocks should be discarded). We cannot guarantee that only `max_size` content is written to the buffer (not unless we plan on compressing in the logger thread and blocking it). Since `max_size` is the "trigger limit", each block would presumably contain a little more than `max_size` content at the trigger point.
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #955: MINIFICPP-1414 Create in-memory compressed logs
adamdebreceni commented on a change in pull request #955: URL: https://github.com/apache/nifi-minifi-cpp/pull/955#discussion_r561008200 ## File path: libminifi/include/core/logging/internal/LogBuffer.h ## @@ -0,0 +1,62 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "io/BufferStream.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace logging { +namespace internal { + +class LogBuffer { + public: + LogBuffer() = default; + explicit LogBuffer(std::unique_ptr buffer): buffer_{std::move(buffer)} {} + + static LogBuffer allocate(size_t max_size) { +LogBuffer instance{utils::make_unique()}; +instance.buffer_->reserve(max_size * 3 / 2); Review comment: `max_size` is a soft limit, i.e. 
reaching `max_size` indicates that the block should be compressed and committed (or that the compressed blocks should be discarded). We cannot guarantee that only `max_size` content is written to the buffer (not unless we plan on compressing and blocking the logger thread). Since `max_size` is the "trigger limit", each block would presumably contain a little more than `max_size` content at the trigger point.
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #955: MINIFICPP-1414 Create in-memory compressed logs
szaszm commented on a change in pull request #955: URL: https://github.com/apache/nifi-minifi-cpp/pull/955#discussion_r560995043 ## File path: libminifi/src/core/logging/LoggerConfiguration.cpp ## @@ -85,6 +108,7 @@ LoggerConfiguration::LoggerConfiguration() void LoggerConfiguration::initialize(const std::shared_ptr _properties) { std::lock_guard lock(mutex); root_namespace_ = initialize_namespaces(logger_properties); + initializeCompression(lock, logger_properties); Review comment: If it can be implemented reasonably easily, then yes, it would be nice to be able to fully opt out from this feature.
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #955: MINIFICPP-1414 Create in-memory compressed logs
adamdebreceni commented on a change in pull request #955: URL: https://github.com/apache/nifi-minifi-cpp/pull/955#discussion_r560993751 ## File path: libminifi/src/core/logging/LoggerConfiguration.cpp ## @@ -85,6 +108,7 @@ LoggerConfiguration::LoggerConfiguration() void LoggerConfiguration::initialize(const std::shared_ptr _properties) { std::lock_guard lock(mutex); root_namespace_ = initialize_namespaces(logger_properties); + initializeCompression(lock, logger_properties); Review comment: note that the maximum size of the in-memory logs (cached and compressed) is configurable; would you like the whole compression to be configurable as well?
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #975: MINIFICPP-1400 Create ListS3 processor
adamdebreceni commented on a change in pull request #975: URL: https://github.com/apache/nifi-minifi-cpp/pull/975#discussion_r560987684 ## File path: extensions/aws/s3/S3Wrapper.cpp ## @@ -30,46 +37,253 @@ namespace minifi { namespace aws { namespace s3 { -minifi::utils::optional S3Wrapper::sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& request) { - Aws::S3::S3Client s3_client(credentials_, client_config_); - auto outcome = s3_client.PutObject(request); +void HeadObjectResult::setFilePaths(const std::string& key) { + absolute_path = key; + std::tie(path, filename) = minifi::utils::file::FileUtils::split_path(key, true /*force_posix*/); +} + +S3Wrapper::S3Wrapper() : request_sender_(minifi::utils::make_unique()) { +} + +S3Wrapper::S3Wrapper(std::unique_ptr request_sender) : request_sender_(std::move(request_sender)) { +} + +void S3Wrapper::setCredentials(const Aws::Auth::AWSCredentials& cred) { + request_sender_->setCredentials(cred); +} + +void S3Wrapper::setRegion(const Aws::String& region) { + request_sender_->setRegion(region); +} + +void S3Wrapper::setTimeout(uint64_t timeout) { + request_sender_->setTimeout(timeout); +} + +void S3Wrapper::setEndpointOverrideUrl(const Aws::String& url) { + request_sender_->setEndpointOverrideUrl(url); +} + +void S3Wrapper::setProxy(const ProxyOptions& proxy) { + request_sender_->setProxy(proxy); +} + +void S3Wrapper::setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const std::string& canned_acl) const { + if (canned_acl.empty() || CANNED_ACL_MAP.find(canned_acl) == CANNED_ACL_MAP.end()) +return; + + logger_->log_debug("Setting AWS canned ACL [%s]", canned_acl); + request.SetACL(CANNED_ACL_MAP.at(canned_acl)); +} + +Expiration S3Wrapper::getExpiration(const std::string& expiration) { + minifi::utils::Regex expr("expiry-date=\"(.*)\", rule-id=\"(.*)\""); + const auto match = expr.match(expiration); + const auto& results = expr.getResult(); + if (!match || results.size() < 3) +return Expiration{}; + return 
Expiration{results[1], results[2]}; +} + +std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption encryption) { + if (encryption == Aws::S3::Model::ServerSideEncryption::NOT_SET) { +return ""; + } + + auto it = std::find_if(SERVER_SIDE_ENCRYPTION_MAP.begin(), SERVER_SIDE_ENCRYPTION_MAP.end(), +[&](const std::pair pair) { + return pair.second == encryption; +}); + if (it != SERVER_SIDE_ENCRYPTION_MAP.end()) { +return it->first; + } + return ""; +} + +minifi::utils::optional S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, std::shared_ptr data_stream) { + Aws::S3::Model::PutObjectRequest request; + request.SetBucket(put_object_params.bucket); + request.SetKey(put_object_params.object_key); + request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class)); + request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption)); + request.SetContentType(put_object_params.content_type); + request.SetMetadata(put_object_params.user_metadata_map); + request.SetBody(data_stream); + request.SetGrantFullControl(put_object_params.fullcontrol_user_list); + request.SetGrantRead(put_object_params.read_permission_user_list); + request.SetGrantReadACP(put_object_params.read_acl_user_list); + request.SetGrantWriteACP(put_object_params.write_acl_user_list); + setCannedAcl(request, put_object_params.canned_acl); + + auto aws_result = request_sender_->sendPutObjectRequest(request); + if (!aws_result) { +return minifi::utils::nullopt; + } + + PutObjectResult result; + // Etags are returned by AWS in quoted form that should be removed + result.etag = minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"'); + result.version = aws_result->GetVersionId(); + + // GetExpiration returns a string pair with a date and a ruleid in 'expiry-date=\"\", rule-id=\"\"' format + // s3.expiration only needs the date member of this pair + result.expiration = 
getExpiration(aws_result->GetExpiration()).expiration_time; + result.ssealgorithm = getEncryptionString(aws_result->GetServerSideEncryption()); + return result; +} + +bool S3Wrapper::deleteObject(const std::string& bucket, const std::string& object_key, const std::string& version) { + Aws::S3::Model::DeleteObjectRequest request; + request.SetBucket(bucket); + request.SetKey(object_key); + if (!version.empty()) { +request.SetVersionId(version); + } + return request_sender_->sendDeleteObjectRequest(request); +} + +int64_t S3Wrapper::writeFetchedBody(Aws::IOStream& source, const int64_t data_size, const std::shared_ptr& output) { + static const uint64_t BUFFER_SIZE = 4096; + std::vector buffer; + buffer.reserve(BUFFER_SIZE); - if (outcome.IsSuccess()) { - logger_->log_info("Added S3 object '%s' to bucket '%s'", request.GetKey(), request.GetBucket()); - return
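The `getExpiration` helper in the diff extracts the date and rule id from an `expiry-date="...", rule-id="..."` string with a regex, and the ETag is stripped of its surrounding quotes. Below is a standalone sketch of both steps using `std::regex`, assuming a hypothetical `Expiration` struct whose second member name (`rule_id`) is a guess; `removeFramingQuotes` is likewise an illustration, not MiNiFi's `StringUtils::removeFramingCharacters`. It uses `[^"]*` instead of `(.*)` so each capture stops at its own closing quote.

```cpp
#include <cassert>
#include <regex>
#include <string>

// Hypothetical mirror of the Expiration struct in the diff; rule_id is a guessed name.
struct Expiration {
  std::string expiration_time;
  std::string rule_id;
};

// Parses a value of the form: expiry-date="<date>", rule-id="<id>".
Expiration parseExpiration(const std::string& header) {
  // Custom raw-string delimiter because the pattern itself contains )" sequences.
  static const std::regex pattern(R"re(expiry-date="([^"]*)", rule-id="([^"]*)")re");
  std::smatch match;
  if (!std::regex_search(header, match, pattern)) {
    return Expiration{};
  }
  assert(match.size() == 3);  // whole match plus two capture groups
  return Expiration{match[1].str(), match[2].str()};
}

// Strips one leading and one trailing framing character, e.g. the quotes around an ETag.
std::string removeFramingQuotes(const std::string& value, char framing = '"') {
  if (value.size() >= 2 && value.front() == framing && value.back() == framing) {
    return value.substr(1, value.size() - 2);
  }
  return value;
}
```

An input that does not match yields an empty `Expiration`, mirroring the early `return Expiration{};` in the diff.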
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka
szaszm commented on a change in pull request #940: URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r560987064 ## File path: extensions/librdkafka/rdkafka_utils.h ## @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "core/logging/LoggerConfiguration.h" +#include "utils/OptionalUtils.h" +#include "rdkafka.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +enum class KafkaEncoding { + UTF8, + HEX +}; + +struct rd_kafka_conf_deleter { + void operator()(rd_kafka_conf_t* ptr) const noexcept { rd_kafka_conf_destroy(ptr); } +}; + +struct rd_kafka_producer_deleter { + void operator()(rd_kafka_t* ptr) const noexcept { +rd_kafka_resp_err_t flush_ret = rd_kafka_flush(ptr, 1 /* ms */); // Matching the wait time of KafkaConnection.cpp +// If concerned, we could log potential errors here: +// if (RD_KAFKA_RESP_ERR__TIMED_OUT == flush_ret) { +// std::cerr << "Deleting producer failed: time-out while trying to flush" << std::endl; +// } +rd_kafka_destroy(ptr); + } +}; + +struct rd_kafka_consumer_deleter { + void operator()(rd_kafka_t* ptr) const noexcept { +rd_kafka_consumer_close(ptr); +rd_kafka_destroy(ptr); + } +}; + +struct rd_kafka_topic_partition_list_deleter { + void operator()(rd_kafka_topic_partition_list_t* ptr) const noexcept { rd_kafka_topic_partition_list_destroy(ptr); } +}; + +struct rd_kafka_topic_conf_deleter { + void operator()(rd_kafka_topic_conf_t* ptr) const noexcept { rd_kafka_topic_conf_destroy(ptr); } +}; +struct rd_kafka_topic_deleter { + void operator()(rd_kafka_topic_t* ptr) const noexcept { rd_kafka_topic_destroy(ptr); } +}; + +struct rd_kafka_message_deleter { + void operator()(rd_kafka_message_t* ptr) const noexcept { rd_kafka_message_destroy(ptr); } +}; + +struct rd_kafka_headers_deleter { + void operator()(rd_kafka_headers_t* ptr) const noexcept { rd_kafka_headers_destroy(ptr); } +}; + +template +void kafka_headers_for_each(const rd_kafka_headers_t* headers, T key_value_handle) { + const char *key; // Null terminated, not to be freed + const void *value; + std::size_t size; + for (std::size_t i = 0; 
RD_KAFKA_RESP_ERR_NO_ERROR == rd_kafka_header_get_all(headers, i, &key, &value, &size); ++i) { +key_value_handle(std::string(key), std::string(static_cast<const char*>(value), size)); Review comment: In that case, pass a span down. The point is that the copy is not necessary because the usage is fully enclosed in the loop body, i.e. within the lifetime of `*value`.
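A minimal sketch of the suggestion, assuming C++17 `std::string_view` (the code base may prefer `gsl::span` instead). The handler receives non-owning views that are only valid inside the callback, so no copy is made. `FakeHeaders` is a hypothetical stand-in for `rd_kafka_headers_t` whose `getAll` mimics the out-parameter shape of `rd_kafka_header_get_all`, so the example runs without librdkafka.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

// Hypothetical stand-in for rd_kafka_headers_t: the list owns the key/value memory.
struct FakeHeaders {
  std::vector<std::pair<std::string, std::string>> entries;

  // Mimics the out-parameter shape of rd_kafka_header_get_all; returns false past the end.
  bool getAll(std::size_t idx, const char** key, const void** value, std::size_t* size) const {
    if (idx >= entries.size()) return false;
    *key = entries[idx].first.c_str();
    *value = entries[idx].second.data();
    *size = entries[idx].second.size();
    return true;
  }
};

// Passes non-owning views to the handler instead of copying into std::string.
// The views are only valid for the duration of each callback (the lifetime of *value).
template <typename Handler>
void headersForEach(const FakeHeaders& headers, Handler handler) {
  const char* key = nullptr;
  const void* value = nullptr;
  std::size_t size = 0;
  for (std::size_t i = 0; headers.getAll(i, &key, &value, &size); ++i) {
    assert(key != nullptr);
    handler(std::string_view(key), std::string_view(static_cast<const char*>(value), size));
  }
}
```

A handler that needs to keep a header beyond the callback can still copy it explicitly, so the cost is paid only where ownership is actually required.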
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #955: MINIFICPP-1414 Create in-memory compressed logs
szaszm commented on a change in pull request #955: URL: https://github.com/apache/nifi-minifi-cpp/pull/955#discussion_r558340027 ## File path: libminifi/src/core/logging/LoggerConfiguration.cpp ## @@ -85,6 +108,7 @@ LoggerConfiguration::LoggerConfiguration() void LoggerConfiguration::initialize(const std::shared_ptr _properties) { std::lock_guard lock(mutex); root_namespace_ = initialize_namespaces(logger_properties); + initializeCompression(lock, logger_properties); Review comment: I think this should be configurable. It may not be feasible to use memory to store logs in low memory environments or when lightweight operation is desired. ## File path: libminifi/include/io/BufferStream.h ## @@ -42,6 +42,14 @@ class BufferStream : public BaseStream { write(reinterpret_cast(data.c_str()), data.length()); } + /* + * prepares the stream to accept and additional byte_count bytes + * @param byte_count number of bytes we expect to write + */ + void reserve(size_t byte_count) { Review comment: I would name this `extend` or similar to avoid confusion with STL `reserve` that takes the new capacity, not the difference. ## File path: libminifi/include/core/logging/internal/LogBuffer.h ## @@ -0,0 +1,62 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "io/BufferStream.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace logging { +namespace internal { + +class LogBuffer { + public: + LogBuffer() = default; + explicit LogBuffer(std::unique_ptr buffer): buffer_{std::move(buffer)} {} + + static LogBuffer allocate(size_t max_size) { +LogBuffer instance{utils::make_unique()}; +instance.buffer_->reserve(max_size * 3 / 2); Review comment: why is this multiplication? ## File path: libminifi/test/unit/LoggerTests.cpp ## @@ -107,3 +110,71 @@ TEST_CASE("Test ShortenNames", "[ttl6]") { LogTestController::getInstance(props)->reset(); LogTestController::getInstance().reset(); } + +using namespace minifi::io; + +std::string decompress(const std::shared_ptr& input) { + auto output = utils::make_unique(); + auto decompressor = std::make_shared(gsl::make_not_null(output.get())); + minifi::internal::pipe(input, decompressor); + decompressor->close(); + return std::string{reinterpret_cast(output->getBuffer()), output->size()}; +} + +TEST_CASE("Test Compression", "[ttl7]") { + auto& log_config = logging::LoggerConfiguration::getConfiguration(); + auto properties = std::make_shared(); + std::string className; + SECTION("Using root logger") { +className = "CompressionTestClassUsingRoot"; +// by default the root logger is OFF +properties->set("logger.root", "INFO"); + } + SECTION("Inherit compression sink") { +className = "CompressionTestClassInheriting"; +properties->set("appender.null", "null"); +properties->set("logger." 
+ className, "INFO,null"); + } + log_config.initialize(properties); + auto logger = log_config.getLogger(className); + logger->log_error("Hi there"); + std::shared_ptr compressed_log{logging::LoggerConfiguration::getCompressedLog(true)}; + REQUIRE(compressed_log); + auto logs = decompress(compressed_log); + REQUIRE(logs.find("Hi there") != std::string::npos); Review comment: Shouldn't this be equal or `StringUtils::endsWith`?
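The naming concern above depends on how `std::vector::reserve` behaves: it takes the new *total* capacity, not an increment, and it does nothing when that value is not greater than the current capacity. A minimal sketch of the "extend" semantics suggested in the review, assuming a `std::vector<uint8_t>`-backed buffer; `extend` is a hypothetical free function here, not the actual `BufferStream` API.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper illustrating the "extend" naming suggested in review:
// make room for `byte_count` additional bytes beyond what is already stored.
inline void extend(std::vector<std::uint8_t>& buffer, std::size_t byte_count) {
  // std::vector::reserve expects the total desired capacity,
  // so the increment has to be added to the current size.
  buffer.reserve(buffer.size() + byte_count);
}
```

By contrast, calling `buffer.reserve(50)` on a vector that already holds 100 bytes is a no-op, which is exactly the confusion a method named `reserve` but taking a delta would invite.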
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #975: MINIFICPP-1400 Create ListS3 processor
adamdebreceni commented on a change in pull request #975: URL: https://github.com/apache/nifi-minifi-cpp/pull/975#discussion_r560982313 ## File path: extensions/aws/s3/S3Wrapper.cpp ## @@ -30,46 +37,253 @@ namespace minifi { namespace aws { namespace s3 { -minifi::utils::optional S3Wrapper::sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& request) { - Aws::S3::S3Client s3_client(credentials_, client_config_); - auto outcome = s3_client.PutObject(request); +void HeadObjectResult::setFilePaths(const std::string& key) { + absolute_path = key; + std::tie(path, filename) = minifi::utils::file::FileUtils::split_path(key, true /*force_posix*/); +} + +S3Wrapper::S3Wrapper() : request_sender_(minifi::utils::make_unique()) { +} + +S3Wrapper::S3Wrapper(std::unique_ptr request_sender) : request_sender_(std::move(request_sender)) { +} + +void S3Wrapper::setCredentials(const Aws::Auth::AWSCredentials& cred) { + request_sender_->setCredentials(cred); +} + +void S3Wrapper::setRegion(const Aws::String& region) { + request_sender_->setRegion(region); +} + +void S3Wrapper::setTimeout(uint64_t timeout) { + request_sender_->setTimeout(timeout); +} + +void S3Wrapper::setEndpointOverrideUrl(const Aws::String& url) { + request_sender_->setEndpointOverrideUrl(url); +} + +void S3Wrapper::setProxy(const ProxyOptions& proxy) { + request_sender_->setProxy(proxy); +} + +void S3Wrapper::setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const std::string& canned_acl) const { + if (canned_acl.empty() || CANNED_ACL_MAP.find(canned_acl) == CANNED_ACL_MAP.end()) +return; + + logger_->log_debug("Setting AWS canned ACL [%s]", canned_acl); + request.SetACL(CANNED_ACL_MAP.at(canned_acl)); +} + +Expiration S3Wrapper::getExpiration(const std::string& expiration) { + minifi::utils::Regex expr("expiry-date=\"(.*)\", rule-id=\"(.*)\""); + const auto match = expr.match(expiration); + const auto& results = expr.getResult(); + if (!match || results.size() < 3) +return Expiration{}; + return 
Expiration{results[1], results[2]}; +} + +std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption encryption) { + if (encryption == Aws::S3::Model::ServerSideEncryption::NOT_SET) { +return ""; + } + + auto it = std::find_if(SERVER_SIDE_ENCRYPTION_MAP.begin(), SERVER_SIDE_ENCRYPTION_MAP.end(), +[&](const std::pair pair) { + return pair.second == encryption; +}); + if (it != SERVER_SIDE_ENCRYPTION_MAP.end()) { +return it->first; + } + return ""; +} + +minifi::utils::optional S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, std::shared_ptr data_stream) { + Aws::S3::Model::PutObjectRequest request; + request.SetBucket(put_object_params.bucket); + request.SetKey(put_object_params.object_key); + request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class)); + request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption)); + request.SetContentType(put_object_params.content_type); + request.SetMetadata(put_object_params.user_metadata_map); + request.SetBody(data_stream); + request.SetGrantFullControl(put_object_params.fullcontrol_user_list); + request.SetGrantRead(put_object_params.read_permission_user_list); + request.SetGrantReadACP(put_object_params.read_acl_user_list); + request.SetGrantWriteACP(put_object_params.write_acl_user_list); + setCannedAcl(request, put_object_params.canned_acl); + + auto aws_result = request_sender_->sendPutObjectRequest(request); + if (!aws_result) { +return minifi::utils::nullopt; + } + + PutObjectResult result; + // Etags are returned by AWS in quoted form that should be removed + result.etag = minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"'); + result.version = aws_result->GetVersionId(); + + // GetExpiration returns a string pair with a date and a ruleid in 'expiry-date=\"\", rule-id=\"\"' format + // s3.expiration only needs the date member of this pair + result.expiration = 
getExpiration(aws_result->GetExpiration()).expiration_time; + result.ssealgorithm = getEncryptionString(aws_result->GetServerSideEncryption()); + return result; +} + +bool S3Wrapper::deleteObject(const std::string& bucket, const std::string& object_key, const std::string& version) { + Aws::S3::Model::DeleteObjectRequest request; + request.SetBucket(bucket); + request.SetKey(object_key); + if (!version.empty()) { +request.SetVersionId(version); + } + return request_sender_->sendDeleteObjectRequest(request); +} + +int64_t S3Wrapper::writeFetchedBody(Aws::IOStream& source, const int64_t data_size, const std::shared_ptr& output) { + static const uint64_t BUFFER_SIZE = 4096; + std::vector buffer; + buffer.reserve(BUFFER_SIZE); - if (outcome.IsSuccess()) { - logger_->log_info("Added S3 object '%s' to bucket '%s'", request.GetKey(), request.GetBucket()); - return
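The `putObject` code above post-processes two response values: the expiration string in `expiry-date="...", rule-id="..."` form, and the quoted ETag. A self-contained sketch of both steps using the standard `<regex>` header instead of `minifi::utils::Regex`; `ExpirationSketch`, `parseExpiration` and `unquoteEtag` are hypothetical names, and the `[^"]*` character class is a choice made here so each group stops at its closing quote.

```cpp
#include <regex>
#include <string>

// Hypothetical result type mirroring the Expiration struct used in the diff.
struct ExpirationSketch {
  std::string expiration_time;
  std::string expiration_rule_id;
};

// Parses a value such as:
//   expiry-date="Fri, 23 Dec 2012 00:00:00 GMT", rule-id="picture-deletion-rule"
// Returns an empty result when the value does not match.
inline ExpirationSketch parseExpiration(const std::string& value) {
  // Custom raw-string delimiter "re" because the pattern itself contains )"
  static const std::regex expr(R"re(expiry-date="([^"]*)", rule-id="([^"]*)")re");
  std::smatch match;
  if (!std::regex_search(value, match, expr)) {
    return ExpirationSketch{};
  }
  return ExpirationSketch{match[1].str(), match[2].str()};
}

// Strips one pair of surrounding double quotes (ETags come back quoted).
inline std::string unquoteEtag(const std::string& etag) {
  if (etag.size() >= 2 && etag.front() == '"' && etag.back() == '"') {
    return etag.substr(1, etag.size() - 2);
  }
  return etag;  // not framed by quotes: leave unchanged
}
```

Only the date member would then be copied into `s3.expiration`, as the comment in the diff describes.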
[GitHub] [nifi] markap14 opened a new pull request #4770: NIFI-8146: Ensure that we close the Connection/Statement/PreparedStat…
markap14 opened a new pull request #4770: URL: https://github.com/apache/nifi/pull/4770 …ement objects in finally blocks or try-with-resources Thank you for submitting a contribution to Apache NiFi. Please provide a short description of the PR here: Description of PR _Enables X functionality; fixes bug NIFI-._ In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with **NIFI-** where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically `main`)? - [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._ ### For code changes: - [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] Have you verified that the full build is successful on JDK 8? - [ ] Have you verified that the full build is successful on JDK 11? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`? - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`? - [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties? 
### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
[jira] [Reopened] (NIFI-8146) Allow RecordPath to be used for specifying operation type and data fields when using PutDatabaseRecord
[ https://issues.apache.org/jira/browse/NIFI-8146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mark Payne reopened NIFI-8146: -- Re-opening issue because I realized that we don't always close the JDBC Statements / Connection objects. > Allow RecordPath to be used for specifying operation type and data fields > when using PutDatabaseRecord > -- > > Key: NIFI-8146 > URL: https://issues.apache.org/jira/browse/NIFI-8146 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Mark Payne >Assignee: Mark Payne >Priority: Major > Fix For: 1.13.0 > > Time Spent: 0.5h > Remaining Estimate: 0h > > PutDatabaseRecord requires that the Statement Type be defined as a property or > a FlowFile attribute. This means that if a FlowFile has many records, it must > be split apart into individual Records if there is more than 1 type of > statement needed per FlowFile. > It also assumes that the data to be inserted/updated/deleted/etc. is the full > record. However, it's common to have some wrapper around the actual data, as > is the case with a tool like Debezium, which includes an Operation Type, a > 'before' snapshot and an 'after' snapshot. To accommodate this, we should > allow Record-friendly methods for specifying the path to the data and the > operation type. -- This message was sent by Atlassian Jira (v8.3.4#803005)
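The issue describes resolving, per record, both the statement type and the location of the data inside a Debezium-style wrapper, instead of using one Statement Type for the whole FlowFile. A language-neutral sketch of that per-record decision, written in C++ for illustration only: it is not NiFi's RecordPath API, the struct and function names are hypothetical, and it assumes Debezium's `op` codes `c` (create), `r` (snapshot read), `u` (update) and `d` (delete).

```cpp
#include <optional>
#include <string>

// Hypothetical, simplified Debezium-style envelope; snapshots are opaque strings here.
struct ChangeEvent {
  std::string op;      // "c" = create, "r" = snapshot read, "u" = update, "d" = delete
  std::string before;  // row state before the change
  std::string after;   // row state after the change
};

enum class StatementType { Insert, Update, Delete };

struct ResolvedStatement {
  StatementType type;
  std::string data;  // the snapshot the statement should be built from
};

// Picks the statement type and the relevant snapshot for a single record.
inline std::optional<ResolvedStatement> resolve(const ChangeEvent& event) {
  if (event.op == "c" || event.op == "r") return ResolvedStatement{StatementType::Insert, event.after};
  if (event.op == "u") return ResolvedStatement{StatementType::Update, event.after};
  if (event.op == "d") return ResolvedStatement{StatementType::Delete, event.before};
  return std::nullopt;  // unknown operation: leave the decision to the caller
}
```

Because the choice is made per record, a single FlowFile mixing inserts, updates and deletes would no longer have to be split first.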
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #975: MINIFICPP-1400 Create ListS3 processor
adamdebreceni commented on a change in pull request #975: URL: https://github.com/apache/nifi-minifi-cpp/pull/975#discussion_r560970459 ## File path: extensions/aws/s3/S3Wrapper.cpp ## @@ -30,46 +37,253 @@ namespace minifi { namespace aws { namespace s3 { -minifi::utils::optional S3Wrapper::sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& request) { - Aws::S3::S3Client s3_client(credentials_, client_config_); - auto outcome = s3_client.PutObject(request); +void HeadObjectResult::setFilePaths(const std::string& key) { + absolute_path = key; + std::tie(path, filename) = minifi::utils::file::FileUtils::split_path(key, true /*force_posix*/); +} + +S3Wrapper::S3Wrapper() : request_sender_(minifi::utils::make_unique()) { +} + +S3Wrapper::S3Wrapper(std::unique_ptr request_sender) : request_sender_(std::move(request_sender)) { +} + +void S3Wrapper::setCredentials(const Aws::Auth::AWSCredentials& cred) { + request_sender_->setCredentials(cred); +} + +void S3Wrapper::setRegion(const Aws::String& region) { + request_sender_->setRegion(region); +} + +void S3Wrapper::setTimeout(uint64_t timeout) { + request_sender_->setTimeout(timeout); +} + +void S3Wrapper::setEndpointOverrideUrl(const Aws::String& url) { + request_sender_->setEndpointOverrideUrl(url); +} + +void S3Wrapper::setProxy(const ProxyOptions& proxy) { + request_sender_->setProxy(proxy); +} + +void S3Wrapper::setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const std::string& canned_acl) const { + if (canned_acl.empty() || CANNED_ACL_MAP.find(canned_acl) == CANNED_ACL_MAP.end()) +return; + + logger_->log_debug("Setting AWS canned ACL [%s]", canned_acl); + request.SetACL(CANNED_ACL_MAP.at(canned_acl)); +} + +Expiration S3Wrapper::getExpiration(const std::string& expiration) { + minifi::utils::Regex expr("expiry-date=\"(.*)\", rule-id=\"(.*)\""); + const auto match = expr.match(expiration); + const auto& results = expr.getResult(); + if (!match || results.size() < 3) +return Expiration{}; + return 
Expiration{results[1], results[2]}; +} + +std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption encryption) { + if (encryption == Aws::S3::Model::ServerSideEncryption::NOT_SET) { +return ""; + } + + auto it = std::find_if(SERVER_SIDE_ENCRYPTION_MAP.begin(), SERVER_SIDE_ENCRYPTION_MAP.end(), +[&](const std::pair pair) { + return pair.second == encryption; +}); + if (it != SERVER_SIDE_ENCRYPTION_MAP.end()) { +return it->first; + } + return ""; +} + +minifi::utils::optional S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, std::shared_ptr data_stream) { + Aws::S3::Model::PutObjectRequest request; + request.SetBucket(put_object_params.bucket); + request.SetKey(put_object_params.object_key); + request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class)); + request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption)); + request.SetContentType(put_object_params.content_type); + request.SetMetadata(put_object_params.user_metadata_map); + request.SetBody(data_stream); + request.SetGrantFullControl(put_object_params.fullcontrol_user_list); + request.SetGrantRead(put_object_params.read_permission_user_list); + request.SetGrantReadACP(put_object_params.read_acl_user_list); + request.SetGrantWriteACP(put_object_params.write_acl_user_list); + setCannedAcl(request, put_object_params.canned_acl); + + auto aws_result = request_sender_->sendPutObjectRequest(request); + if (!aws_result) { +return minifi::utils::nullopt; + } + + PutObjectResult result; + // Etags are returned by AWS in quoted form that should be removed + result.etag = minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"'); + result.version = aws_result->GetVersionId(); + + // GetExpiration returns a string pair with a date and a ruleid in 'expiry-date=\"\", rule-id=\"\"' format + // s3.expiration only needs the date member of this pair + result.expiration = 
getExpiration(aws_result->GetExpiration()).expiration_time; + result.ssealgorithm = getEncryptionString(aws_result->GetServerSideEncryption()); + return result; +} + +bool S3Wrapper::deleteObject(const std::string& bucket, const std::string& object_key, const std::string& version) { + Aws::S3::Model::DeleteObjectRequest request; + request.SetBucket(bucket); + request.SetKey(object_key); + if (!version.empty()) { +request.SetVersionId(version); + } + return request_sender_->sendDeleteObjectRequest(request); +} + +int64_t S3Wrapper::writeFetchedBody(Aws::IOStream& source, const int64_t data_size, const std::shared_ptr& output) { + static const uint64_t BUFFER_SIZE = 4096; + std::vector buffer; + buffer.reserve(BUFFER_SIZE); - if (outcome.IsSuccess()) { - logger_->log_info("Added S3 object '%s' to bucket '%s'", request.GetKey(), request.GetBucket()); - return
[GitHub] [nifi] exceptionfactory commented on pull request #4734: NIFI-8023 Added toLocalDate() and updated toDate() in DataTypeUtils
exceptionfactory commented on pull request #4734: URL: https://github.com/apache/nifi/pull/4734#issuecomment-763605759 @markap14 Thanks for the review and additional feedback, I pushed an update to address your comments. There still appear to be some edge cases for some record-oriented processors when running NiFi in different time zones, based on running some test flows from @turcsanyip. The PR as it stands addresses the test failures for certain time zones as described in NIFI-8023 through the use of the new `DataTypeUtils.toLocalDate()` method. Additional work is probably necessary to address system time zone issues when using some of the JSON Record Readers and Writers, but perhaps it would be better to create a new issue to resolve those problems. Any additional feedback @turcsanyip or @adenes?
[jira] [Commented] (NIFI-8154) AvroParquetHDFSRecordReader fails to read parquet file containing nested structs
[ https://issues.apache.org/jira/browse/NIFI-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268566#comment-17268566 ] Pierre Villard commented on NIFI-8154: -- Hi [~jonesgh], Looks like just upgrading the dependency is causing errors in the unit tests. I'll need more time to revisit all of this when time permits. Feel free to submit a pull request if you have one that is working with the existing unit tests. > AvroParquetHDFSRecordReader fails to read parquet file containing nested > structs > > > Key: NIFI-8154 > URL: https://issues.apache.org/jira/browse/NIFI-8154 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.11.3, 1.12.1 >Reporter: Glenn Jones >Assignee: Pierre Villard >Priority: Minor > Time Spent: 0.5h > Remaining Estimate: 0h > > FetchParquet can't be used to process files containing nested structs. When > trying to create a RecordSchema it runs into > https://issues.apache.org/jira/browse/PARQUET-1441, which causes it to fail. > We've patched this locally by building the nifi-parquet-processors with > parquet-avro 1.11.0, but it would be great if this made it into the next > release.
[jira] [Assigned] (NIFI-8154) AvroParquetHDFSRecordReader fails to read parquet file containing nested structs
[ https://issues.apache.org/jira/browse/NIFI-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pierre Villard reassigned NIFI-8154: Assignee: (was: Pierre Villard) > AvroParquetHDFSRecordReader fails to read parquet file containing nested > structs > > > Key: NIFI-8154 > URL: https://issues.apache.org/jira/browse/NIFI-8154 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.11.3, 1.12.1 >Reporter: Glenn Jones >Priority: Minor > Time Spent: 0.5h > Remaining Estimate: 0h > > FetchParquet can't be used to process files containing nested structs. When > trying to create a RecordSchema it runs into > https://issues.apache.org/jira/browse/PARQUET-1441, which causes it to fail. > We've patched this locally by building the nifi-parquet-processors with > parquet-avro 1.11.0, but it would be great if this made it into the next > release.
[GitHub] [nifi] exceptionfactory commented on a change in pull request #4734: NIFI-8023 Added toLocalDate() and updated toDate() in DataTypeUtils
exceptionfactory commented on a change in pull request #4734: URL: https://github.com/apache/nifi/pull/4734#discussion_r560952390 ## File path: nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java ## @@ -1085,6 +1180,31 @@ private static Object toEnum(Object value, EnumDataType dataType, String fieldNa throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Date for field " + fieldName); } +private static Date parseDate(final String string, final DateFormat dateFormat) throws ParseException { +// DateFormat.parse() creates java.util.Date with System Default Time Zone +final java.util.Date parsed = dateFormat.parse(string); + +Instant parsedInstant = parsed.toInstant(); +if (isTimeZoneAdjustmentRequired(dateFormat)) { +// Adjust parsed date using System Default Time Zone offset milliseconds when time zone format not found +parsedInstant = parsedInstant.minus(TimeZone.getDefault().getRawOffset(), ChronoUnit.MILLIS); +} + +return new Date(parsedInstant.toEpochMilli()); +} + +private static boolean isTimeZoneAdjustmentRequired(final DateFormat dateFormat) { +boolean adjustmentRequired = false; + +if (dateFormat instanceof SimpleDateFormat) { +final SimpleDateFormat simpleDateFormat = (SimpleDateFormat) dateFormat; +final String pattern = simpleDateFormat.toPattern(); +adjustmentRequired = !pattern.contains(TIME_ZONE_PATTERN); Review comment: Thanks, I adjusted the check to use a regular expression pattern for all three characters.
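The reply replaces `pattern.contains(TIME_ZONE_PATTERN)` with a regular expression covering "all three characters", but the characters are not named in this message. Assuming they are the `SimpleDateFormat` time zone letters `z`, `Z` and `X`, the check reduces to a character-class search. The sketch below is in C++ for consistency with the other examples here, not the Java code from the PR; the function name mirrors the diff but the implementation is illustrative.

```cpp
#include <regex>
#include <string>

// Returns true when a SimpleDateFormat-style pattern contains none of the
// time zone letters assumed here (z, Z, X), i.e. when the parsed value would
// need adjusting away from the system default time zone.
inline bool isTimeZoneAdjustmentRequired(const std::string& pattern) {
  static const std::regex time_zone_letters("[zZX]");
  return !std::regex_search(pattern, time_zone_letters);
}
```

Like a plain `contains()`, this character-class check also matches a quoted literal such as the `'Z'` in `yyyy-MM-dd'T'HH:mm:ss'Z'`, where the letter is text rather than a zone field.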
[jira] [Updated] (NIFI-8154) AvroParquetHDFSRecordReader fails to read parquet file containing nested structs
[ https://issues.apache.org/jira/browse/NIFI-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pierre Villard updated NIFI-8154: - Status: Open (was: Patch Available) > AvroParquetHDFSRecordReader fails to read parquet file containing nested > structs > > > Key: NIFI-8154 > URL: https://issues.apache.org/jira/browse/NIFI-8154 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.12.1, 1.11.3 >Reporter: Glenn Jones >Assignee: Pierre Villard >Priority: Minor > Time Spent: 0.5h > Remaining Estimate: 0h > > FetchParquet can't be used to process files containing nested structs. When > trying to create a RecordSchema it runs into > https://issues.apache.org/jira/browse/PARQUET-1441, which causes it to fail. > We've patched this locally by building the nifi-parquet-processors with > parquet-avro 1.11.0, but it would be great if this made it into the next > release.
[GitHub] [nifi] pvillard31 commented on pull request #4769: NIFI-8154 - upgrade parquet-avro from 1.10.0 to 1.11.1
pvillard31 commented on pull request #4769: URL: https://github.com/apache/nifi/pull/4769#issuecomment-763599719 Tests are failing, will need to revisit when time permits.
[GitHub] [nifi] pvillard31 closed pull request #4769: NIFI-8154 - upgrade parquet-avro from 1.10.0 to 1.11.1
pvillard31 closed pull request #4769: URL: https://github.com/apache/nifi/pull/4769
[GitHub] [nifi] exceptionfactory commented on a change in pull request #4734: NIFI-8023 Added toLocalDate() and updated toDate() in DataTypeUtils
exceptionfactory commented on a change in pull request #4734: URL: https://github.com/apache/nifi/pull/4734#discussion_r560951664 ## File path: nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java ## @@ -1040,6 +1049,93 @@ private static Object toEnum(Object value, EnumDataType dataType, String fieldNa throw new IllegalTypeConversionException("Cannot convert value " + value + " of type " + dataType.toString() + " for field " + fieldName); } +/** + * Convert value to Local Date with support for conversion from numbers or formatted strings + * + * @param value Value to be converted + * @param formatter Supplier for Date Time Formatter can be null values other than numeric strings + * @param fieldName Field Name for value to be converted + * @return Local Date or null when value to be converted is null + * @throws IllegalTypeConversionException Thrown when conversion from string fails or unsupported value provided + */ +public static LocalDate toLocalDate(final Object value, final Supplier formatter, final String fieldName) { +LocalDate localDate; + +if (value == null) { +return null; +} else if (value instanceof LocalDate) { +localDate = (LocalDate) value; +} else if (value instanceof java.util.Date) { +final java.util.Date date = (java.util.Date) value; +localDate = parseLocalDateEpochMillis(date.getTime()); +} else if (value instanceof Number) { +final long epochMillis = ((Number) value).longValue(); +localDate = parseLocalDateEpochMillis(epochMillis); +} else if (value instanceof String) { +try { +localDate = parseLocalDate((String) value, formatter); +} catch (final RuntimeException e) { +final String message = String.format("Failed Conversion of Field [%s] from String [%s] to LocalDate with Formatter [%s]", fieldName, value, formatter, e); Review comment: Thanks for catching that detail, I have updated the call to use `formatter.get()` and check for `null` formatter. 
[jira] [Commented] (NIFI-7263) Add a No tracking Strategy to ListFile/ListFTP
[ https://issues.apache.org/jira/browse/NIFI-7263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268565#comment-17268565 ] humpfhumpf commented on NIFI-7263: -- In our production case, files are available on the SFTP server in no particular chronological order. We want ListSFTP to list all available files, and FetchSFTP to move them to a remote directory after fetching them. > Add a No tracking Strategy to ListFile/ListFTP > -- > > Key: NIFI-7263 > URL: https://issues.apache.org/jira/browse/NIFI-7263 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Jens M Kofoed >Assignee: Waleed Al Aibani >Priority: Major > Labels: ListFile, listftp > Fix For: 1.13.0 > > Time Spent: 1h 20m > Remaining Estimate: 0h > > ListFile/ListFTP have 2 Listing Strategies: Tracking Timestamps and > Tracking Entities. > It would be very very nice if the List processors could also have a No Tracking > (fix it yourself) strategy. > If running NIFI in a cluster, List/Fetch is the perfect solution instead > of using GetFile. But we have had many cases where files in the pickup > folder have old timestamps, so here we have to use Tracking Entities. > The issue is in cases where you are not allowed to delete files but you have > to make a change to the file filter. The tracking entities start all over, > and list all files again. > In other situations we need to resend all data, and would like to clear the > state of the Tracking Entities. But you can't. > So I have to make a small flow for detecting duplicates. And in some cases > just ignore duplicates and in other cases open up for sending duplicates. But > it is a pain in the ... to use the Tracking Entities. > So a NO STRATEGY would be very very nice
[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #4755: NIFI-8133 Add ability to suppress null/empty values in ElasticSearchCl…
ChrisSamo632 commented on a change in pull request #4755: URL: https://github.com/apache/nifi/pull/4755#discussion_r560939470 ## File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.groovy ## @@ -175,6 +177,41 @@ class ElasticSearch5ClientService_IT { runner.assertValid() } +@Test +void testNullSuppression() { Review comment: Happy for this thread to be resolved given you mention having run the integration tests below?
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #975: MINIFICPP-1400 Create ListS3 processor
adamdebreceni commented on a change in pull request #975:
URL: https://github.com/apache/nifi-minifi-cpp/pull/975#discussion_r560938789

##
File path: extensions/aws/processors/ListS3.cpp
##
@@ -0,0 +1,294 @@
+/**
+ * @file ListS3.cpp
+ * ListS3 class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListS3.h"
+
+#include
+#include
+#include
+#include
+#include
+
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace aws {
+namespace processors {
+
+const std::string ListS3::LATEST_LISTED_KEY_PREFIX = "listed_key.";
+const std::string ListS3::LATEST_LISTED_KEY_TIMESTAMP = "listed_key.timestamp";
+
+const core::Property ListS3::Delimiter(
+  core::PropertyBuilder::createProperty("Delimiter")
+    ->withDescription("The string used to delimit directories within the bucket. Please consult the AWS documentation for the correct use of this field.")
+    ->build());
+const core::Property ListS3::Prefix(
+  core::PropertyBuilder::createProperty("Prefix")
+    ->withDescription("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').")
+    ->build());
+const core::Property ListS3::UseVersions(
+  core::PropertyBuilder::createProperty("Use Versions")
+    ->isRequired(true)
+    ->withDefaultValue(false)
+    ->withDescription("Specifies whether to use S3 versions, if applicable. If false, only the latest version of each object will be returned.")
+    ->build());
+const core::Property ListS3::MinimumObjectAge(
+  core::PropertyBuilder::createProperty("Minimum Object Age")
+    ->isRequired(true)
+    ->withDefaultValue("0 sec")
+    ->withDescription("The minimum age that an S3 object must be in order to be considered; any object younger than this amount of time (according to last modification date) will be ignored.")
+    ->build());
+const core::Property ListS3::WriteObjectTags(
+  core::PropertyBuilder::createProperty("Write Object Tags")
+    ->isRequired(true)
+    ->withDefaultValue(false)
+    ->withDescription("If set to 'true', the tags associated with the S3 object will be written as FlowFile attributes.")
+    ->build());
+const core::Property ListS3::WriteUserMetadata(
+  core::PropertyBuilder::createProperty("Write User Metadata")
+    ->isRequired(true)
+    ->withDefaultValue(false)
+    ->withDescription("If set to 'true', the user defined metadata associated with the S3 object will be added to FlowFile attributes/records.")
+    ->build());
+const core::Property ListS3::RequesterPays(
+  core::PropertyBuilder::createProperty("Requester Pays")
+    ->isRequired(true)
+    ->withDefaultValue(false)
+    ->withDescription("If true, indicates that the requester consents to pay any charges associated with listing the S3 bucket. This sets the 'x-amz-request-payer' header to 'requester'. "
+                      "Note that this setting is only used if Write User Metadata is true.")
+    ->build());
+
+const core::Relationship ListS3::Success("success", "FlowFiles are routed to success relationship");
+
+void ListS3::initialize() {
+  // Add new supported properties
+  updateSupportedProperties({Delimiter, Prefix, UseVersions, MinimumObjectAge, WriteObjectTags, WriteUserMetadata, RequesterPays});
+  // Set the supported relationships
+  setSupportedRelationships({Success});
+}
+
+void ListS3::onSchedule(const std::shared_ptr , const std::shared_ptr ) {
+  S3Processor::onSchedule(context, sessionFactory);
+
+  state_manager_ = context->getStateManager();
+  if (state_manager_ == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  }
+
+  auto common_properties = getCommonELSupportedProperties(context, nullptr);
+  if (!common_properties) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Required property is not set or invalid");
+  }
+  configureS3Wrapper(common_properties.value());
+  list_request_params_.bucket = common_properties->bucket;
+
+  context->getProperty(Delimiter.getName(), list_request_params_.delimiter);
+  logger_->log_debug("ListS3: Delimiter [%s]", list_request_params_.delimiter);
+
+  context->getProperty(Prefix.getName(),
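The diff declares the `Minimum Object Age` property and two state-key constants, `LATEST_LISTED_KEY_PREFIX` and `LATEST_LISTED_KEY_TIMESTAMP`, but the excerpt stops before any listing logic is shown. As a minimal sketch only, assuming the state stores the newest last-modified timestamp already emitted plus every key emitted at that timestamp, the per-run filtering could look like this. `ListedObject`, `ListingState` and `selectNewObjects` are hypothetical names, not part of MiNiFi C++ or the AWS SDK.

```cpp
#include <cassert>
#include <chrono>
#include <set>
#include <string>
#include <vector>

// Hypothetical, simplified listing entry; not the AWS SDK or MiNiFi type.
struct ListedObject {
  std::string key;
  std::chrono::system_clock::time_point last_modified;
};

// Hypothetical listing state: the newest last-modified time already emitted and
// every key emitted with exactly that time (assumed to be what the
// "listed_key.timestamp" and "listed_key." state entries hold).
struct ListingState {
  std::chrono::system_clock::time_point latest_timestamp{};
  std::set<std::string> latest_keys;
};

// Hypothetical helper: returns the objects to emit in this run and advances `state`.
std::vector<ListedObject> selectNewObjects(const std::vector<ListedObject>& objects,
                                           ListingState& state,
                                           std::chrono::system_clock::time_point now,
                                           std::chrono::milliseconds min_age) {
  assert(min_age.count() >= 0 && "Minimum Object Age must not be negative");
  std::vector<ListedObject> selected;
  ListingState next = state;  // updated copy; `state` keeps the previous run's view
  for (const auto& object : objects) {
    // "Minimum Object Age": skip objects modified too recently; a later run retries them.
    if (now - object.last_modified < min_age) {
      continue;
    }
    // Skip anything strictly older than the stored timestamp, and keys already
    // emitted at exactly that timestamp.
    const bool older = object.last_modified < state.latest_timestamp;
    const bool already_listed = object.last_modified == state.latest_timestamp &&
                                state.latest_keys.count(object.key) > 0;
    if (older || already_listed) {
      continue;
    }
    selected.push_back(object);
    // Track the newest timestamp and all keys that share it.
    if (object.last_modified > next.latest_timestamp) {
      next.latest_timestamp = object.last_modified;
      next.latest_keys = {object.key};
    } else if (object.last_modified == next.latest_timestamp) {
      next.latest_keys.insert(object.key);
    }
  }
  state = next;
  return selected;
}
```

Keeping every key that shares the newest timestamp, not just the timestamp itself, lets a later run still emit an object whose last-modified time ties with one already listed, without emitting the earlier one a second time.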
[jira] [Updated] (NIFI-8154) AvroParquetHDFSRecordReader fails to read parquet file containing nested structs
[ https://issues.apache.org/jira/browse/NIFI-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pierre Villard updated NIFI-8154:
---------------------------------
    Status: Patch Available (was: Open)

> AvroParquetHDFSRecordReader fails to read parquet file containing nested structs
>
>                 Key: NIFI-8154
>                 URL: https://issues.apache.org/jira/browse/NIFI-8154
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 1.12.1, 1.11.3
>            Reporter: Glenn Jones
>            Assignee: Pierre Villard
>            Priority: Minor
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> FetchParquet can't be used to process files containing nested structs. When trying to create a RecordSchema it runs into https://issues.apache.org/jira/browse/PARQUET-1441, which causes it to fail.
> We've patched this locally by building the nifi-parquet-processors with parquet-avro 1.11.0, but it would be great if this made it into the next release.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)