[jira] [Updated] (NIFI-9210) Upgrade jsoup to 1.14.2

2021-09-09 Thread David Handermann (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-9210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Handermann updated NIFI-9210:
---
Status: Patch Available  (was: Open)

> Upgrade jsoup to 1.14.2
> ---
>
> Key: NIFI-9210
> URL: https://issues.apache.org/jira/browse/NIFI-9210
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Affects Versions: 1.14.0, 1.15.0
>Reporter: David Handermann
>Assignee: David Handermann
>Priority: Minor
>  Labels: dependency-upgrade, security
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The following NiFi modules have a dependency on jsoup for HTML parsing:
>  * nifi-html-processors
>  * nifi-media-processors
> Versions of jsoup prior to 1.14.2 have an associated vulnerability 
> [CVE-2021-37714|https://nvd.nist.gov/vuln/detail/CVE-2021-37714]. Upgrading 
> to the latest version of jsoup addresses this potential vulnerability as well 
> as a number of minor bugs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (NIFI-9210) Upgrade jsoup to 1.14.2

2021-09-09 Thread David Handermann (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-9210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Handermann updated NIFI-9210:
---
Labels: dependency-upgrade security  (was: )



[GitHub] [nifi] exceptionfactory opened a new pull request #5379: NIFI-9210 Upgrade jsoup from 1.8.3 to 1.14.2

2021-09-09 Thread GitBox


exceptionfactory opened a new pull request #5379:
URL: https://github.com/apache/nifi/pull/5379


    Description of PR
   
   NIFI-9210 Upgrades `jsoup` from 1.8.3 to 1.14.2 in several modules to 
address potential vulnerabilities.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [X] Is there a JIRA ticket associated with this PR? Is it referenced 
in the commit message?
   
   - [X] Does your PR title start with **NIFI-** where  is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.
   
   - [X] Has your PR been rebased against the latest commit within the target 
branch (typically `main`)?
   
   - [X] Is your initial contribution a single, squashed commit? _Additional 
commits in response to PR reviewer feedback should be made on this branch and 
pushed to allow change tracking. Do not `squash` or use `--force` when pushing 
to allow for clean monitoring of changes._
   
   ### For code changes:
   - [X] Have you ensured that the full suite of tests is executed via `mvn 
-Pcontrib-check clean install` at the root `nifi` folder?
   - [X] Have you written or updated unit tests to verify your changes?
   - [X] Have you verified that the full build is successful on JDK 8?
   - [X] Have you verified that the full build is successful on JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
   - [ ] If applicable, have you updated the `LICENSE` file, including the main 
`LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main 
`NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to 
.name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which 
it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for 
build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (NIFI-9167) Refactor nifi-framework-bundle to use JUnit 5

2021-09-09 Thread Mike Thomsen (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-9167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mike Thomsen reassigned NIFI-9167:
--

Assignee: Mike Thomsen

> Refactor nifi-framework-bundle to use JUnit 5
> -
>
> Key: NIFI-9167
> URL: https://issues.apache.org/jira/browse/NIFI-9167
> Project: Apache NiFi
>  Issue Type: Sub-task
>Reporter: Mike Thomsen
>Assignee: Mike Thomsen
>Priority: Minor
>






[GitHub] [nifi] MikeThomsen opened a new pull request #5378: NIFI-9166 Refactored nifi-standard-services to use JUnit 5.

2021-09-09 Thread GitBox


MikeThomsen opened a new pull request #5378:
URL: https://github.com/apache/nifi/pull/5378


   






[jira] [Created] (NIFI-9210) Upgrade jsoup to 1.14.2

2021-09-09 Thread David Handermann (Jira)
David Handermann created NIFI-9210:
--

 Summary: Upgrade jsoup to 1.14.2
 Key: NIFI-9210
 URL: https://issues.apache.org/jira/browse/NIFI-9210
 Project: Apache NiFi
  Issue Type: Improvement
  Components: Extensions
Affects Versions: 1.14.0, 1.15.0
Reporter: David Handermann
Assignee: David Handermann


The following NiFi modules have a dependency on jsoup for HTML parsing:

 * nifi-html-processors
 * nifi-media-processors

Versions of jsoup prior to 1.14.2 have an associated vulnerability 
[CVE-2021-37714|https://nvd.nist.gov/vuln/detail/CVE-2021-37714]. Upgrading to 
the latest version of jsoup addresses this potential vulnerability as well as a 
number of minor bugs.





[jira] [Created] (NIFI-9209) Class version conflict due to hive-exec-3.1.2.jar

2021-09-09 Thread Denes Arvay (Jira)
Denes Arvay created NIFI-9209:
-

 Summary: Class version conflict due to hive-exec-3.1.2.jar
 Key: NIFI-9209
 URL: https://issues.apache.org/jira/browse/NIFI-9209
 Project: Apache NiFi
  Issue Type: Bug
Reporter: Denes Arvay
Assignee: Denes Arvay


hive-exec-3.1.2.jar bundles several third-party libraries (org.apache.avro, 
org.apache.commons.lang, com.google.common, com.google.protobuf, etc.) that can 
cause unexpected class-loading issues, as happened recently with a PR [1], [2].

hive-exec also has a "core" classifier artifact without these dependencies, 
which should be used instead.

[1] https://github.com/apache/nifi/pull/5358
[2] https://github.com/apache/nifi/pull/5358/checks?check_run_id=3544604627
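A quick way to confirm this kind of conflict is to print the JAR from which a class was actually loaded. The following is a minimal diagnostic sketch and is not part of the proposed fix, which is switching to the core classifier. The class name ClassOriginCheck is hypothetical. The sketch relies only on the standard `Class.getProtectionDomain().getCodeSource()` API:

```java
import java.net.URL;
import java.security.CodeSource;
import java.security.ProtectionDomain;

public class ClassOriginCheck {

    /**
     * Returns the location (usually a JAR file URL) the class was loaded from,
     * or "[bootstrap]" when the JVM reports no code source (e.g. core JDK classes).
     */
    static String originOf(Class<?> type) {
        ProtectionDomain domain = type.getProtectionDomain();
        CodeSource source = domain == null ? null : domain.getCodeSource();
        URL location = source == null ? null : source.getLocation();
        return location == null ? "[bootstrap]" : location.toString();
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Pass a class name that hive-exec also bundles, e.g. com.google.common.collect.ImmutableList
        String className = args.length > 0 ? args[0] : "java.lang.String";
        Class<?> type = Class.forName(className);
        System.out.println(className + " loaded from " + originOf(type));
    }
}
```

Run the check on the same classpath as the failing code. If the reported location is hive-exec-3.1.2.jar rather than the library's own JAR, the bundled copy is shadowing it. JDK classes report no code source, which is why the sketch falls back to "[bootstrap]".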





[GitHub] [nifi] mattyb149 commented on a change in pull request #5365: NIFI-9192: ResultSetRecordSet considers value of useLogicalTypes flag…

2021-09-09 Thread GitBox


mattyb149 commented on a change in pull request #5365:
URL: https://github.com/apache/nifi/pull/5365#discussion_r705635141



##
File path: 
nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
##
@@ -297,7 +271,54 @@ private DataType getDataType(final int sqlType, final ResultSet rs, final int co
         }
     }
 
-    private static DataType getArrayBaseType(final Array array) throws SQLException {
+    private DataType getArrayDataType(ResultSet rs, int columnIndex, boolean useLogicalTypes) throws SQLException {
+        // The JDBC API does not allow us to know what the base type of an array is through the metadata.
+        // As a result, we have to obtain the actual Array for this record. Once we have this, we can determine
+        // the base type. However, if the base type is, itself, an array, we will simply return a base type of
+        // String because otherwise, we need the ResultSet for the array itself, and many JDBC Drivers do not
+        // support calling Array.getResultSet() and will throw an Exception if that is not supported.
+        if (rs.isAfterLast()) {

Review comment:
   There are merge conflicts due to a try-catch added here and some other 
changes via [NIFI-8376](https://issues.apache.org/jira/browse/NIFI-8376)
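The code comment in the quoted hunk describes inferring an array's base type from the actual `java.sql.Array` value rather than from result set metadata, and falling back to String for nested arrays. Below is a minimal sketch of that idea. It assumes the value has already been obtained with `rs.getArray(columnIndex).getArray()`, both of which are standard JDBC methods. It also simplifies NiFi's `DataType` to a plain Java `Class`. The class and method names are hypothetical and do not reflect the PR's implementation:

```java
public class ArrayBaseTypeSketch {

    /**
     * Infers the element type of the Java array returned by java.sql.Array.getArray().
     * Nested arrays, and arrays with nothing to inspect, fall back to String.class.
     */
    static Class<?> inferElementType(Object arrayValue) {
        if (arrayValue == null || !arrayValue.getClass().isArray()) {
            throw new IllegalArgumentException("Expected a Java array, got " + arrayValue);
        }
        Class<?> component = arrayValue.getClass().getComponentType();
        if (component.isArray()) {
            return String.class; // array of arrays: resolving it would need Array.getResultSet()
        }
        if (component != Object.class) {
            return component; // typed array such as Integer[] or int[]
        }
        // Some drivers return a plain Object[]; inspect the first non-null element instead
        for (Object element : (Object[]) arrayValue) {
            if (element != null) {
                return element.getClass().isArray() ? String.class : element.getClass();
            }
        }
        return String.class; // empty or all-null array: nothing to inspect
    }

    public static void main(String[] args) {
        System.out.println(inferElementType(new Integer[] {1, 2}).getSimpleName());    // Integer
        System.out.println(inferElementType(new Object[] {null, 5L}).getSimpleName()); // Long
        System.out.println(inferElementType(new int[][] {{1}, {2}}).getSimpleName());  // String
    }
}
```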








[jira] [Assigned] (NIFI-9166) Refactor nifi-standard-services-bundle to use JUnit 5

2021-09-09 Thread Mike Thomsen (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-9166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mike Thomsen reassigned NIFI-9166:
--

Assignee: Mike Thomsen

> Refactor nifi-standard-services-bundle to use JUnit 5
> -
>
> Key: NIFI-9166
> URL: https://issues.apache.org/jira/browse/NIFI-9166
> Project: Apache NiFi
>  Issue Type: Sub-task
>Reporter: Mike Thomsen
>Assignee: Mike Thomsen
>Priority: Minor
>






[GitHub] [nifi] MikeThomsen opened a new pull request #5377: NIFI-9165 Refactored nifi-standard-bundle to use JUnit 5.

2021-09-09 Thread GitBox


MikeThomsen opened a new pull request #5377:
URL: https://github.com/apache/nifi/pull/5377


   






[GitHub] [nifi] tpalfy opened a new pull request #5376: NIFI-9203 Improve GrokReader to be able to handle complex grok expression properly.

2021-09-09 Thread GitBox


tpalfy opened a new pull request #5376:
URL: https://github.com/apache/nifi/pull/5376


   https://issues.apache.org/jira/browse/NIFI-9203
   






[jira] [Created] (NIFI-9208) Adding capability to collect necessary resources similar to NAR Provider function

2021-09-09 Thread Simon Bence (Jira)
Simon Bence created NIFI-9208:
-

 Summary: Adding capability to collect necessary resources similar 
to NAR Provider function
 Key: NIFI-9208
 URL: https://issues.apache.org/jira/browse/NIFI-9208
 Project: Apache NiFi
  Issue Type: New Feature
  Components: Core Framework
Reporter: Simon Bence
Assignee: Simon Bence


Having a capability for "resources" similar to the NAR autoloader, which can 
gather NARs from external sources, would be very useful. It could be applied to 
database drivers, Hadoop configuration files and similar resources.

Note: as the two features seem very similar, I plan to share as much 
functionality as is reasonable. Probably (and preferably), I can separate the 
"listing and fetching" logic from the NAR-specific parts and let the autoloader 
depend on the more generic solution.
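Below is one possible shape for that split, sketched under assumptions. A generic source only knows how to list and fetch resources. The NAR autoloader and a generic resource collector then differ only in the name filter they apply. All type and method names (ResourceSource, InMemoryResourceSource, collect) are hypothetical and do not correspond to an existing NiFi API. The in-memory source stands in for an external store:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

public class ResourceCollectorSketch {

    /** Hypothetical generic abstraction: lists and fetches resources, with no NAR-specific logic. */
    interface ResourceSource {
        List<String> listResources() throws IOException;

        InputStream fetchResource(String name) throws IOException;
    }

    /** Hypothetical in-memory source standing in for an external store. */
    static class InMemoryResourceSource implements ResourceSource {
        private final Map<String, byte[]> contents = new TreeMap<>(); // keeps names sorted

        void put(String name, String body) {
            contents.put(name, body.getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public List<String> listResources() {
            return new ArrayList<>(contents.keySet());
        }

        @Override
        public InputStream fetchResource(String name) throws IOException {
            byte[] body = contents.get(name);
            if (body == null) {
                throw new IOException("Resource not found: " + name);
            }
            return new ByteArrayInputStream(body);
        }
    }

    /** Selects resources by name; NAR loading and generic collection differ only in the filter. */
    static List<String> collect(ResourceSource source, Predicate<String> filter) throws IOException {
        List<String> selected = new ArrayList<>();
        for (String name : source.listResources()) {
            if (filter.test(name)) {
                selected.add(name);
            }
        }
        return selected;
    }

    public static void main(String[] args) throws IOException {
        InMemoryResourceSource source = new InMemoryResourceSource();
        source.put("nifi-example-nar.nar", "nar bytes");
        source.put("postgresql.jar", "driver bytes");
        source.put("core-site.xml", "configuration bytes");

        System.out.println(collect(source, name -> name.endsWith(".nar")));  // [nifi-example-nar.nar]
        System.out.println(collect(source, name -> !name.endsWith(".nar"))); // [core-site.xml, postgresql.jar]
    }
}
```

Keeping listing and fetching behind one interface lets each external store (a directory, HDFS, an object store) be implemented once and reused by both features.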





[jira] [Created] (NIFI-9207) Add max read size property to Distributed Cache Servers

2021-09-09 Thread Paul Grey (Jira)
Paul Grey created NIFI-9207:
---

 Summary: Add max read size property to Distributed Cache Servers
 Key: NIFI-9207
 URL: https://issues.apache.org/jira/browse/NIFI-9207
 Project: Apache NiFi
  Issue Type: Improvement
Reporter: Paul Grey
Assignee: Paul Grey


The NiFi services DistributedMapCacheServer and DistributedSetCacheServer each 
include a socket-based server, allowing service clients to collaboratively 
manage a cache of data values. IPC is performed via a defined set of APIs, 
which are implemented as a series of raw socket reads and writes.

When processing incoming socket data, the services should validate and limit 
its size to ensure that the server continues to function properly.
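Below is a minimal sketch of such a limit. It assumes values arrive framed as a 4-byte big-endian length followed by that many bytes, read with `DataInputStream.readInt()` and `readFully()`. The class name and the `maxReadSize` parameter are hypothetical stand-ins for whatever property the issue ends up adding. The key point is that the announced length is checked before a buffer of that size is allocated:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class BoundedReadSketch {

    /**
     * Reads one length-prefixed value (4-byte big-endian length, then the bytes) and rejects
     * the length before allocating a buffer if it is negative or above maxReadSize.
     */
    static byte[] readValue(DataInputStream in, int maxReadSize) throws IOException {
        int length = in.readInt();
        if (length < 0 || length > maxReadSize) {
            throw new IOException("Length " + length + " exceeds maximum read size " + maxReadSize);
        }
        byte[] value = new byte[length];
        in.readFully(value); // EOFException if the client sends fewer bytes than announced
        return value;
    }

    /** Encodes a payload the way readValue expects (used here only to build sample input). */
    static byte[] frame(byte[] payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(payload.length);
            out.write(payload);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] framed = frame("hello".getBytes(StandardCharsets.UTF_8));
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(framed));
        System.out.println(new String(readValue(in, 1024), StandardCharsets.UTF_8)); // hello

        DataInputStream oversized = new DataInputStream(new ByteArrayInputStream(frame(new byte[2048])));
        try {
            readValue(oversized, 1024);
        } catch (IOException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```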





[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #1120: MINIFICPP-1374 Implement security protocol support for ConsumeKafka

2021-09-09 Thread GitBox


lordgamez commented on a change in pull request #1120:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1120#discussion_r705419173



##
File path: docker/test/integration/features/kafka.feature
##
@@ -44,12 +44,9 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
   | PublishKafka   | Delivery Guarantee | 1                            |
   | PublishKafka   | Request Timeout    | 10 sec                       |
   | PublishKafka   | Message Timeout    | 12 sec                       |
-  | PublishKafka   | Security CA        | /tmp/resources/certs/ca-cert |

Review comment:
   Makes sense, I added that version as well in 
0cf6d007765d021d20573b0be61c6b793b60bf80








[GitHub] [nifi] exceptionfactory commented on pull request #5318: NIFI-9064:Support Oracle timestamp when `Use Avro Logical Types` is t…

2021-09-09 Thread GitBox


exceptionfactory commented on pull request #5318:
URL: https://github.com/apache/nifi/pull/5318#issuecomment-916114656


   > Hi exceptionfactory, I tried to add a unit test, but as you can see,
   > rs.getObject() and rs.getTimestamp() are standard methods in
   > java.sql.ResultSet. I can't figure out how to write this unit test. Do you
   > have any good suggestions?
   
   The `ResultSetRecordSetTest` class has an existing `testCreateRecord()` 
method that uses a mocked `ResultSet`. You could either modify that method, or 
create a new test method that uses a sample `RecordSchema` with a timestamp 
column.  It might be clearer to create a new test method since the existing 
method is already large.
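Methods such as `getObject()` and `getTimestamp()` can be tested without a database by stubbing `java.sql.ResultSet`. If the existing test's mock comes from a library such as Mockito, the same stubbing is `when(rs.getObject(1)).thenReturn(value)`. The sketch below does it with the JDK's `java.lang.reflect.Proxy` so that it is self-contained. The logic under test is hypothetical: it assumes the change falls back to `getTimestamp()` when `getObject()` returns a driver-specific type. `VendorTimestamp`, `readTimestamp` and `stubResultSet` are illustrative names and do not come from the PR:

```java
import java.lang.reflect.Proxy;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;

public class ResultSetStubSketch {

    /** Hypothetical stand-in for a driver-specific value such as a vendor timestamp class. */
    static final class VendorTimestamp {
        final long millis;

        VendorTimestamp(long millis) {
            this.millis = millis;
        }
    }

    /** Hypothetical logic under test: use getObject() if it is a Timestamp, else getTimestamp(). */
    static Timestamp readTimestamp(ResultSet rs, int columnIndex) throws SQLException {
        Object value = rs.getObject(columnIndex);
        if (value instanceof Timestamp) {
            return (Timestamp) value;
        }
        return rs.getTimestamp(columnIndex);
    }

    /** Builds a ResultSet stub that answers only getObject(...) and getTimestamp(...). */
    static ResultSet stubResultSet(Object objectValue, Timestamp timestampValue) {
        return (ResultSet) Proxy.newProxyInstance(
                ResultSetStubSketch.class.getClassLoader(),
                new Class<?>[] {ResultSet.class},
                (proxy, method, methodArgs) -> {
                    switch (method.getName()) {
                        case "getObject":
                            return objectValue;
                        case "getTimestamp":
                            return timestampValue;
                        default:
                            throw new UnsupportedOperationException(method.getName());
                    }
                });
    }

    public static void main(String[] args) throws SQLException {
        Timestamp expected = Timestamp.valueOf("2021-09-09 12:00:00");
        ResultSet rs = stubResultSet(new VendorTimestamp(expected.getTime()), expected);
        System.out.println(readTimestamp(rs, 1)); // 2021-09-09 12:00:00.0
    }
}
```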






[GitHub] [nifi] KuKuDeCheng commented on pull request #5318: NIFI-9064:Support Oracle timestamp when `Use Avro Logical Types` is t…

2021-09-09 Thread GitBox


KuKuDeCheng commented on pull request #5318:
URL: https://github.com/apache/nifi/pull/5318#issuecomment-916087773


   Hi exceptionfactory, I tried to add a unit test, but as you can see,
   rs.getObject() and rs.getTimestamp() are standard methods in
   java.sql.ResultSet. I can't figure out how to write this unit test. Do you
   have any good suggestions?






[GitHub] [nifi] exceptionfactory commented on pull request #5318: NIFI-9064:Support Oracle timestamp when `Use Avro Logical Types` is t…

2021-09-09 Thread GitBox


exceptionfactory commented on pull request #5318:
URL: https://github.com/apache/nifi/pull/5318#issuecomment-916066468


   Thanks for the update @KuKuDeCheng, the changes in `ResultSetRecordSet` seem 
like a much cleaner solution. Can you add a unit test for the change?






[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1120: MINIFICPP-1374 Implement security protocol support for ConsumeKafka

2021-09-09 Thread GitBox


adamdebreceni commented on a change in pull request #1120:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1120#discussion_r705286235



##
File path: docker/test/integration/features/kafka.feature
##

Review comment:
   although we deprecated these properties in favor of the context service, 
shouldn't we keep testing the feature until we remove them?








[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

2021-09-09 Thread GitBox


fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705279874



##
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr logger, CallbackType callback)
+  : buffer_(buffer_size),
+    logger_(std::move(logger)),
+    callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr& input, const std::shared_ptr& output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  input_size_ = input->size();
+
+  if (int64_t status = readLine(*input); status <= 0) {
+    return status;
+  }
+
+  bool is_first_line = true;
+  do {
+    if (int64_t status = readLine(*input); status < 0) {
+      return status;
+    }
+    std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+    output->write(reinterpret_cast(output_line.data()), output_line.size());
+    is_first_line = false;
+  } while (!isLastLine());
+
+  return gsl::narrow(total_bytes_read_);
+}
+
+int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) {
+  std::string input_line;
+  const size_t current_read = readLine(stream, input_line);
+  if (io::isError(current_read)) {
+    return -1;
+  }
+  if (current_read == 0) {
+    current_line_ = next_line_;
+    next_line_ = std::nullopt;
+    return 0;
+  }
+  current_line_ = next_line_;
+  next_line_ = input_line;
+  total_bytes_read_ += current_read;
+  return gsl::narrow(current_read);
+}
+
+size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) {
+  if (current_pos_ == end_pos_) {
+    const auto current_bytes_read = fillBuffer(stream);
+    if (io::isError(current_bytes_read) || current_bytes_read == 0) {
+      return current_bytes_read;
+    }
+  }
+
+  int64_t pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {
+    ++pos;
+  }
+  if (pos < end_pos_ && buffer_[pos] == '\n') {
+    ++pos;
+    line = std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos};
+    current_pos_ = pos;
+    return line.size();
+  }
+
+  std::string start_of_line{buffer_.begin() + current_pos_, buffer_.begin() + end_pos_};
+
+  const auto current_bytes_read = fillBuffer(stream);
+  if (io::isError(current_bytes_read)) {
+    return io::STREAM_ERROR;
+  } else if (current_bytes_read == 0) {
+    line = start_of_line;
+    return line.size();
+  }
+
+  pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {

Review comment:
   the bug in (3) is fixed by 
https://github.com/apache/nifi-minifi-cpp/pull/1170/commits/4df70bdec67d7f91182e10b01e6df646acf5e85a








[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

2021-09-09 Thread GitBox


fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705200372



##
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##
@@ -0,0 +1,129 @@

Review comment:
   (3) is handled correctly: a last line without a trailing `\n` is treated as a line, processed, and output without a `\n`. The **exception** is the edge case where the last line doesn't end in a `\n` **and** it straddles a buffer boundary: this causes an error, and the flow file is routed to failure. Thanks for pointing this out, I'll fix it.
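   To make the edge case concrete, here is a minimal sketch, not the PR's code. It assumes a plain `std::istream` in place of MiNiFi's `io::InputStream`, and `splitLinesBuffered` and its `max_line_size` parameter are hypothetical. The start of a line is carried across buffer refills, and at end of stream a non-empty leftover is emitted as the last line rather than reported as an over-long line.

```cpp
#include <cassert>   // used by the usage checks
#include <cstddef>
#include <istream>
#include <optional>
#include <sstream>   // std::istringstream, used by the usage checks
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper: splits `in` into '\n'-terminated lines, reading at most
// `buffer_size` bytes at a time. Each line keeps its trailing '\n', except a
// final line that has none. Returns std::nullopt if any line grows beyond
// `max_line_size` bytes.
std::optional<std::vector<std::string>> splitLinesBuffered(std::istream& in, std::size_t buffer_size,
                                                           std::size_t max_line_size) {
  std::vector<std::string> lines;
  std::vector<char> buffer(buffer_size);
  std::string pending;  // start of a line that may straddle a buffer boundary
  while (true) {
    in.read(buffer.data(), static_cast<std::streamsize>(buffer.size()));
    const auto bytes_read = static_cast<std::size_t>(in.gcount());
    if (bytes_read == 0) {
      break;  // end of stream (or read error)
    }
    for (std::size_t i = 0; i < bytes_read; ++i) {
      pending.push_back(buffer[i]);
      if (pending.size() > max_line_size) {
        return std::nullopt;  // line too long
      }
      if (buffer[i] == '\n') {
        lines.push_back(std::move(pending));
        pending.clear();
      }
    }
  }
  // The edge case under discussion: an unterminated last line that was split
  // across buffers is still a line at end of stream, not an error.
  if (!pending.empty()) {
    lines.push_back(std::move(pending));
  }
  return lines;
}
```

   For example, with a 4-byte buffer the input `"ab\ncdefg"` yields `"ab\n"` and `"cdefg"`, even though the unterminated last line spans two reads.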




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

2021-09-09 Thread GitBox


adamdebreceni commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705173760



##
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr logger, CallbackType callback)
+  : buffer_(buffer_size),
+    logger_(std::move(logger)),
+    callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr& input, const std::shared_ptr& output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  input_size_ = input->size();
+
+  if (int64_t status = readLine(*input); status <= 0) {
+    return status;
+  }
+
+  bool is_first_line = true;
+  do {
+    if (int64_t status = readLine(*input); status < 0) {
+      return status;
+    }
+    std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+    output->write(reinterpret_cast(output_line.data()), output_line.size());
+    is_first_line = false;
+  } while (!isLastLine());
+
+  return gsl::narrow(total_bytes_read_);
+}
+
+int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) {
+  std::string input_line;
+  const size_t current_read = readLine(stream, input_line);
+  if (io::isError(current_read)) {
+    return -1;
+  }
+  if (current_read == 0) {
+    current_line_ = next_line_;
+    next_line_ = std::nullopt;
+    return 0;
+  }
+  current_line_ = next_line_;
+  next_line_ = input_line;
+  total_bytes_read_ += current_read;
+  return gsl::narrow(current_read);
+}
+
+size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) {
+  if (current_pos_ == end_pos_) {
+    const auto current_bytes_read = fillBuffer(stream);
+    if (io::isError(current_bytes_read) || current_bytes_read == 0) {
+      return current_bytes_read;
+    }
+  }
+
+  int64_t pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {
+    ++pos;
+  }
+  if (pos < end_pos_ && buffer_[pos] == '\n') {
+    ++pos;
+    line = std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos};
+    current_pos_ = pos;
+    return line.size();
+  }
+
+  std::string start_of_line{buffer_.begin() + current_pos_, buffer_.begin() + end_pos_};
+
+  const auto current_bytes_read = fillBuffer(stream);
+  if (io::isError(current_bytes_read)) {
+    return io::STREAM_ERROR;
+  } else if (current_bytes_read == 0) {
+    line = start_of_line;
+    return line.size();
+  }
+
+  pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {
+    ++pos;
+  }
+  if (pos < end_pos_ && buffer_[pos] == '\n') {
+    ++pos;
+    line = start_of_line + std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos};
+    current_pos_ = pos;
+    return line.size();
+  }
+
+  logger_->log_error("Found a line longer than the max buffer size (%zu bytes)", buffer_.size());

Review comment:
   I guess I was just making an observation aloud :) 








[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

2021-09-09 Thread GitBox


fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705172435



##
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr logger, CallbackType callback)
+  : buffer_(buffer_size),
+    logger_(std::move(logger)),
+    callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr& input, const std::shared_ptr& output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  input_size_ = input->size();
+
+  if (int64_t status = readLine(*input); status <= 0) {
+    return status;
+  }
+
+  bool is_first_line = true;
+  do {
+    if (int64_t status = readLine(*input); status < 0) {
+      return status;
+    }
+    std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+    output->write(reinterpret_cast(output_line.data()), output_line.size());
+    is_first_line = false;
+  } while (!isLastLine());
+
+  return gsl::narrow(total_bytes_read_);
+}
+
+int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) {
+  std::string input_line;
+  const size_t current_read = readLine(stream, input_line);
+  if (io::isError(current_read)) {
+    return -1;
+  }
+  if (current_read == 0) {
+    current_line_ = next_line_;
+    next_line_ = std::nullopt;
+    return 0;
+  }
+  current_line_ = next_line_;
+  next_line_ = input_line;
+  total_bytes_read_ += current_read;
+  return gsl::narrow(current_read);
+}
+
+size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) {
+  if (current_pos_ == end_pos_) {
+    const auto current_bytes_read = fillBuffer(stream);
+    if (io::isError(current_bytes_read) || current_bytes_read == 0) {
+      return current_bytes_read;
+    }
+  }
+
+  int64_t pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {
+    ++pos;
+  }
+  if (pos < end_pos_ && buffer_[pos] == '\n') {
+    ++pos;
+    line = std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos};
+    current_pos_ = pos;
+    return line.size();
+  }
+
+  std::string start_of_line{buffer_.begin() + current_pos_, buffer_.begin() + end_pos_};
+
+  const auto current_bytes_read = fillBuffer(stream);
+  if (io::isError(current_bytes_read)) {
+    return io::STREAM_ERROR;
+  } else if (current_bytes_read == 0) {
+    line = start_of_line;
+    return line.size();
+  }
+
+  pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {

Review comment:
   Just speculating, but NiFi may have done that to deal with the bare `\r` line endings of classic Mac OS. Since these line endings are not used by any computer in use today, I think splitting on `\n`, which covers Windows (`\r\n`), Linux, and Mac OS X and later (`\n`), is fine.
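   Here is a quick illustration of why splitting on `\n` alone is enough for current platforms. `splitOnLineFeed` is a hypothetical helper that, like the quoted `readLine()`, keeps the terminator in each line. Windows `\r\n` text splits correctly, with the `\r` left at the end of each line. Classic Mac OS text that uses bare `\r` comes out as a single line.

```cpp
#include <cassert>  // used by the usage checks
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper: splits `text` after every '\n', keeping the terminator
// in each line. A trailing piece without '\n' becomes the last line.
std::vector<std::string> splitOnLineFeed(const std::string& text) {
  std::vector<std::string> lines;
  std::size_t start = 0;
  while (start < text.size()) {
    const std::size_t newline = text.find('\n', start);
    const std::size_t end = (newline == std::string::npos) ? text.size() : newline + 1;
    lines.push_back(text.substr(start, end - start));
    start = end;
  }
  return lines;
}
```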








[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

2021-09-09 Thread GitBox


adamdebreceni commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705169032



##
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr logger, CallbackType callback)
+  : buffer_(buffer_size),
+    logger_(std::move(logger)),
+    callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr& input, const std::shared_ptr& output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  input_size_ = input->size();
+
+  if (int64_t status = readLine(*input); status <= 0) {
+    return status;
+  }
+
+  bool is_first_line = true;
+  do {
+    if (int64_t status = readLine(*input); status < 0) {
+      return status;
+    }
+    std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+    output->write(reinterpret_cast(output_line.data()), output_line.size());
+    is_first_line = false;
+  } while (!isLastLine());
+
+  return gsl::narrow(total_bytes_read_);
+}
+
+int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) {
+  std::string input_line;
+  const size_t current_read = readLine(stream, input_line);
+  if (io::isError(current_read)) {
+    return -1;
+  }
+  if (current_read == 0) {
+    current_line_ = next_line_;
+    next_line_ = std::nullopt;
+    return 0;
+  }
+  current_line_ = next_line_;
+  next_line_ = input_line;
+  total_bytes_read_ += current_read;
+  return gsl::narrow(current_read);
+}
+
+size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) {
+  if (current_pos_ == end_pos_) {
+    const auto current_bytes_read = fillBuffer(stream);
+    if (io::isError(current_bytes_read) || current_bytes_read == 0) {
+      return current_bytes_read;
+    }
+  }
+
+  int64_t pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {
+    ++pos;
+  }
+  if (pos < end_pos_ && buffer_[pos] == '\n') {
+    ++pos;
+    line = std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos};
+    current_pos_ = pos;
+    return line.size();
+  }
+
+  std::string start_of_line{buffer_.begin() + current_pos_, buffer_.begin() + end_pos_};
+
+  const auto current_bytes_read = fillBuffer(stream);
+  if (io::isError(current_bytes_read)) {
+    return io::STREAM_ERROR;
+  } else if (current_bytes_read == 0) {
+    line = start_of_line;
+    return line.size();
+  }
+
+  pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {

Review comment:
   Unfortunately, it seems NiFi has a different concept of a "line": it uses the `LineDemarcator`, which
   
   1. treats every `\n` as an end-of-line
   2. treats every `\r` followed by a non-`\n` as an end-of-line
   3. treats a non-empty (!) tail after the last end-of-line as a line (or beginning of string)
   
   (it's quite confusing)
   
   This was implemented in RouteText to be in line with NiFi, but of course we could decide to change them to use a different segmentation logic.
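   Here is a sketch of the three rules listed above. It is written only from this description, not from NiFi's `LineDemarcator` source, and `splitLikeLineDemarcator` is a hypothetical name. Each line keeps its terminator. The code treats a `\r` at the very end of the input as an end-of-line. That is an assumption, but rule 3 would emit the same final line either way.

```cpp
#include <cassert>  // used by the usage checks
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical splitter following the rules described in the comment:
//  1. every '\n' ends a line;
//  2. a '\r' not followed by '\n' ends a line;
//  3. a non-empty tail after the last end-of-line is also a line.
std::vector<std::string> splitLikeLineDemarcator(const std::string& text) {
  std::vector<std::string> lines;
  std::string current;
  for (std::size_t i = 0; i < text.size(); ++i) {
    const char c = text[i];
    current.push_back(c);
    const bool lone_cr = (c == '\r') && (i + 1 == text.size() || text[i + 1] != '\n');
    if (c == '\n' || lone_cr) {
      lines.push_back(current);
      current.clear();
    }
  }
  if (!current.empty()) {
    lines.push_back(current);  // rule 3: only a non-empty tail counts
  }
  return lines;
}
```

   For instance, `"a\r\nb\rc\nd"` splits into `"a\r\n"`, `"b\r"`, `"c\n"` and `"d"`. Splitting on `\n` alone gives `"a\r\n"`, `"b\rc\n"` and `"d"`.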








[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

2021-09-09 Thread GitBox


adamdebreceni commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705169032



##
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr logger, CallbackType callback)
+  : buffer_(buffer_size),
+    logger_(std::move(logger)),
+    callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr& input, const std::shared_ptr& output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  input_size_ = input->size();
+
+  if (int64_t status = readLine(*input); status <= 0) {
+    return status;
+  }
+
+  bool is_first_line = true;
+  do {
+    if (int64_t status = readLine(*input); status < 0) {
+      return status;
+    }
+    std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+    output->write(reinterpret_cast(output_line.data()), output_line.size());
+    is_first_line = false;
+  } while (!isLastLine());
+
+  return gsl::narrow(total_bytes_read_);
+}
+
+int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) {
+  std::string input_line;
+  const size_t current_read = readLine(stream, input_line);
+  if (io::isError(current_read)) {
+    return -1;
+  }
+  if (current_read == 0) {
+    current_line_ = next_line_;
+    next_line_ = std::nullopt;
+    return 0;
+  }
+  current_line_ = next_line_;
+  next_line_ = input_line;
+  total_bytes_read_ += current_read;
+  return gsl::narrow(current_read);
+}
+
+size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) {
+  if (current_pos_ == end_pos_) {
+    const auto current_bytes_read = fillBuffer(stream);
+    if (io::isError(current_bytes_read) || current_bytes_read == 0) {
+      return current_bytes_read;
+    }
+  }
+
+  int64_t pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {
+    ++pos;
+  }
+  if (pos < end_pos_ && buffer_[pos] == '\n') {
+    ++pos;
+    line = std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos};
+    current_pos_ = pos;
+    return line.size();
+  }
+
+  std::string start_of_line{buffer_.begin() + current_pos_, buffer_.begin() + end_pos_};
+
+  const auto current_bytes_read = fillBuffer(stream);
+  if (io::isError(current_bytes_read)) {
+    return io::STREAM_ERROR;
+  } else if (current_bytes_read == 0) {
+    line = start_of_line;
+    return line.size();
+  }
+
+  pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {

Review comment:
   Unfortunately, it seems NiFi has a different concept of a "line": it uses the `LineDemarcator`, which
   
   1. treats every `\n` as an end-of-line
   2. treats every `\r` followed by a non-`\n` as an end-of-line
   3. treats a non-empty (!) tail after the last end-of-line as a line (or beginning of line)
   
   (it's quite confusing)
   
   This was implemented in RouteText to be in line with NiFi, but of course we could decide to change them to use a different segmentation logic.








[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

2021-09-09 Thread GitBox


adamdebreceni commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705169032



##
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr logger, CallbackType callback)
+  : buffer_(buffer_size),
+    logger_(std::move(logger)),
+    callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr& input, const std::shared_ptr& output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  input_size_ = input->size();
+
+  if (int64_t status = readLine(*input); status <= 0) {
+    return status;
+  }
+
+  bool is_first_line = true;
+  do {
+    if (int64_t status = readLine(*input); status < 0) {
+      return status;
+    }
+    std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+    output->write(reinterpret_cast(output_line.data()), output_line.size());
+    is_first_line = false;
+  } while (!isLastLine());
+
+  return gsl::narrow(total_bytes_read_);
+}
+
+int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) {
+  std::string input_line;
+  const size_t current_read = readLine(stream, input_line);
+  if (io::isError(current_read)) {
+    return -1;
+  }
+  if (current_read == 0) {
+    current_line_ = next_line_;
+    next_line_ = std::nullopt;
+    return 0;
+  }
+  current_line_ = next_line_;
+  next_line_ = input_line;
+  total_bytes_read_ += current_read;
+  return gsl::narrow(current_read);
+}
+
+size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) {
+  if (current_pos_ == end_pos_) {
+    const auto current_bytes_read = fillBuffer(stream);
+    if (io::isError(current_bytes_read) || current_bytes_read == 0) {
+      return current_bytes_read;
+    }
+  }
+
+  int64_t pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {
+    ++pos;
+  }
+  if (pos < end_pos_ && buffer_[pos] == '\n') {
+    ++pos;
+    line = std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos};
+    current_pos_ = pos;
+    return line.size();
+  }
+
+  std::string start_of_line{buffer_.begin() + current_pos_, buffer_.begin() + end_pos_};
+
+  const auto current_bytes_read = fillBuffer(stream);
+  if (io::isError(current_bytes_read)) {
+    return io::STREAM_ERROR;
+  } else if (current_bytes_read == 0) {
+    line = start_of_line;
+    return line.size();
+  }
+
+  pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {

Review comment:
   Unfortunately, it seems NiFi has a different concept of a "line": it uses the `LineDemarcator`, which
   
   1. treats every `\n` as an end-of-line
   2. treats every `\r` followed by a non-`\n` as an end-of-line
   3. treats a non-empty (!) tail after the last end-of-line as a line
   
   (it's quite confusing)
   
   This was implemented in RouteText to be in line with NiFi, but of course we could decide to change them to use a different segmentation logic.








[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

2021-09-09 Thread GitBox


fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705166242



##
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr logger, CallbackType callback)
+  : buffer_(buffer_size),
+    logger_(std::move(logger)),
+    callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr& input, const std::shared_ptr& output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  input_size_ = input->size();
+
+  if (int64_t status = readLine(*input); status <= 0) {
+    return status;
+  }
+
+  bool is_first_line = true;
+  do {
+    if (int64_t status = readLine(*input); status < 0) {
+      return status;
+    }
+    std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+    output->write(reinterpret_cast(output_line.data()), output_line.size());
+    is_first_line = false;
+  } while (!isLastLine());
+
+  return gsl::narrow(total_bytes_read_);
+}
+
+int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) {
+  std::string input_line;
+  const size_t current_read = readLine(stream, input_line);
+  if (io::isError(current_read)) {
+    return -1;
+  }
+  if (current_read == 0) {
+    current_line_ = next_line_;
+    next_line_ = std::nullopt;
+    return 0;
+  }
+  current_line_ = next_line_;
+  next_line_ = input_line;
+  total_bytes_read_ += current_read;
+  return gsl::narrow(current_read);
+}
+
+size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) {
+  if (current_pos_ == end_pos_) {
+    const auto current_bytes_read = fillBuffer(stream);
+    if (io::isError(current_bytes_read) || current_bytes_read == 0) {
+      return current_bytes_read;
+    }
+  }
+
+  int64_t pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {
+    ++pos;
+  }
+  if (pos < end_pos_ && buffer_[pos] == '\n') {
+    ++pos;
+    line = std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos};
+    current_pos_ = pos;
+    return line.size();
+  }
+
+  std::string start_of_line{buffer_.begin() + current_pos_, buffer_.begin() + end_pos_};
+
+  const auto current_bytes_read = fillBuffer(stream);
+  if (io::isError(current_bytes_read)) {
+    return io::STREAM_ERROR;
+  } else if (current_bytes_read == 0) {
+    line = start_of_line;
+    return line.size();
+  }
+
+  pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {
+    ++pos;
+  }
+  if (pos < end_pos_ && buffer_[pos] == '\n') {
+    ++pos;
+    line = start_of_line + std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos};
+    current_pos_ = pos;
+    return line.size();
+  }
+
+  logger_->log_error("Found a line longer than the max buffer size (%zu bytes)", buffer_.size());

Review comment:
   That's correct: lines of up to 2 times the buffer size may be accepted, but only lines <= the buffer size are guaranteed to be accepted. Do you think that is a problem, or do you think we need to make the error message more verbose?
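   The 2x bound can be checked with a small model of the quoted `readLine()`. The model makes three hypothetical assumptions: every `fillBuffer()` call fills the whole buffer, each call resets `current_pos_` to 0, and the line ends with `\n` before the stream does. Under these assumptions, a line starting at offset `p` of a buffer of size `B` can be at most `(B - p) + B` bytes long. That is up to `2B` when the line starts at the beginning of a buffer, but only `B + 1` when it starts in the last byte. So lines of at most `B` bytes are always accepted.

```cpp
#include <cassert>  // used by the usage checks
#include <cstddef>

// Hypothetical model: a '\n'-terminated line of `line_length` bytes (including
// the '\n') starts at `start_offset` (< `buffer_size`) of a full buffer.
// readLine() scans the rest of that buffer plus at most one refill, so it can
// see (buffer_size - start_offset) + buffer_size bytes. Assumes every refill
// fills the whole buffer.
constexpr std::size_t maxAcceptedLineLength(std::size_t start_offset, std::size_t buffer_size) {
  return (buffer_size - start_offset) + buffer_size;
}

constexpr bool isLineAccepted(std::size_t line_length, std::size_t start_offset, std::size_t buffer_size) {
  return line_length <= maxAcceptedLineLength(start_offset, buffer_size);
}

// Best case: the line starts at the beginning of a buffer, giving 2x the buffer size.
static_assert(maxAcceptedLineLength(0, 10) == 20);
// Worst case: the line starts in the last byte of a buffer, giving the buffer size + 1.
static_assert(maxAcceptedLineLength(9, 10) == 11);
```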








[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

2021-09-09 Thread GitBox


fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705163626



##
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr logger, CallbackType callback)
+  : buffer_(buffer_size),
+    logger_(std::move(logger)),
+    callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr& input, const std::shared_ptr& output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  input_size_ = input->size();
+
+  if (int64_t status = readLine(*input); status <= 0) {
+    return status;
+  }
+
+  bool is_first_line = true;
+  do {
+    if (int64_t status = readLine(*input); status < 0) {
+      return status;
+    }
+    std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+    output->write(reinterpret_cast(output_line.data()), output_line.size());
+    is_first_line = false;
+  } while (!isLastLine());
+
+  return gsl::narrow(total_bytes_read_);
+}
+
+int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) {
+  std::string input_line;
+  const size_t current_read = readLine(stream, input_line);
+  if (io::isError(current_read)) {
+    return -1;
+  }
+  if (current_read == 0) {
+    current_line_ = next_line_;
+    next_line_ = std::nullopt;
+    return 0;
+  }
+  current_line_ = next_line_;
+  next_line_ = input_line;
+  total_bytes_read_ += current_read;
+  return gsl::narrow(current_read);
+}
+
+size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) {
+  if (current_pos_ == end_pos_) {
+    const auto current_bytes_read = fillBuffer(stream);

Review comment:
   thanks, I'll take a look








[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

2021-09-09 Thread GitBox


fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705163375



##
File path: extensions/standard-processors/processors/ReplaceText.h
##
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include 
+#include 
+#include 
+#include 
+
+#include "core/Annotation.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessSessionFactory.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "utils/Enum.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+SMART_ENUM(EvaluationModeType,
+  (LINE_BY_LINE, "Line-by-Line"),
+  (ENTIRE_TEXT, "Entire text")
+)
+
+SMART_ENUM(LineByLineEvaluationModeType,
+  (ALL, "All"),
+  (FIRST_LINE, "First-Line"),
+  (LAST_LINE, "Last-Line"),
+  (EXCEPT_FIRST_LINE, "Except-First-Line"),
+  (EXCEPT_LAST_LINE, "Except-Last-Line")
+)
+
+SMART_ENUM(ReplacementStrategyType,
+  (PREPEND, "Prepend"),
+  (APPEND, "Append"),
+  (REGEX_REPLACE, "Regex Replace"),
+  (LITERAL_REPLACE, "Literal Replace"),
+  (ALWAYS_REPLACE, "Always Replace"),
+  (SUBSTITUTE_VARIABLES, "Substitute Variables")
+)
+
+class ReplaceText : public core::Processor {
+ public:
+  static const core::Property EvaluationMode;
+  static const core::Property LineByLineEvaluationMode;
+  static const core::Property ReplacementStrategy;
+  static const core::Property MaximumBufferSize;
+  static const core::Property SearchValue;
+  static const core::Property ReplacementValue;
+
+  static const core::Relationship Success;
+  static const core::Relationship Failure;
+
+  explicit ReplaceText(const std::string& name, const utils::Identifier& uuid 
= {});
+  core::annotation::Input getInputRequirement() const override;
+  void initialize() override;
+  void onSchedule(const std::shared_ptr& context, const 
std::shared_ptr&) override;
+  void onTrigger(const std::shared_ptr& context, const 
std::shared_ptr& session) override;
+
+ private:
+  friend struct ReplaceTextTestAccessor;
+
+  void readSearchValueProperty(const std::shared_ptr& 
context, const std::shared_ptr& flow_file);
+  void readReplacementValueProperty(const 
std::shared_ptr& context, const 
std::shared_ptr& flow_file);
+
+  void replaceTextInEntireFile(const std::shared_ptr& 
flow_file, const std::shared_ptr& session) const;
+  void replaceTextLineByLine(const std::shared_ptr& flow_file, 
const std::shared_ptr& session) const;
+
+  std::string applyReplacements(const std::string& input, const 
std::shared_ptr& flow_file) const;
+  std::string applyRegexReplace(const std::string& input) const;
+  std::string applyLiteralReplace(const std::string& input) const;
+  std::string applySubstituteVariables(const std::string& input, const 
std::shared_ptr& flow_file) const;
+  std::string getAttributeValue(const std::shared_ptr& 
flow_file, const std::smatch& match) const;
+
+  std::shared_ptr logger_;
+  std::string search_value_;
+  std::regex search_regex_;
+  std::string replacement_value_;
+  uint64_t maximum_buffer_size_ = 0;
+  EvaluationModeType evaluation_mode_ = EvaluationModeType::LINE_BY_LINE;
+  LineByLineEvaluationModeType line_by_line_evaluation_mode_ = 
LineByLineEvaluationModeType::ALL;
+  ReplacementStrategyType replacement_strategy_ = 
ReplacementStrategyType::REGEX_REPLACE;
+};
+
+REGISTER_RESOURCE(ReplaceText, "Updates the content of a FlowFile by replacing 
parts of it using various replacement strategies.");

Review comment:
   OK, I'll move it after #1138 is merged.





[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

2021-09-09 Thread GitBox


fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705162395



##
File path: docker/test/integration/MiNiFi_integration_test_driver.py
##
@@ -104,7 +104,7 @@ def 
generate_input_port_for_remote_process_group(remote_process_group, name):
 return input_port_node
 
 def add_test_data(self, path, test_data, file_name=str(uuid.uuid4())):
-self.docker_directory_bindings.put_file_to_docker_path(self.test_id, 
path, file_name, test_data.encode('utf-8'))
+self.docker_directory_bindings.put_file_to_docker_path(self.test_id, 
path, file_name, test_data.replace('\\n', '\n').encode('utf-8'))

Review comment:
   OK, I'll revert these bits if your PR is merged first.





[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

2021-09-09 Thread GitBox


adamdebreceni commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705159596



##
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t
 buffer_size, std::shared_ptr logger, CallbackType 
callback)
+  : buffer_(buffer_size),
+logger_(std::move(logger)),
+callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const 
std::shared_ptr& input, const std::shared_ptr& 
output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  input_size_ = input->size();
+
+  if (int64_t status = readLine(*input); status <= 0) {
+return status;
+  }
+
+  bool is_first_line = true;
+  do {
+if (int64_t status = readLine(*input); status < 0) {
+  return status;
+}
+std::string output_line = callback_(*current_line_, is_first_line, 
isLastLine());
+output->write(reinterpret_cast(output_line.data()), 
output_line.size());
+is_first_line = false;
+  } while (!isLastLine());
+
+  return gsl::narrow(total_bytes_read_);
+}
+
+int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) 
{
+  std::string input_line;
+  const size_t current_read = readLine(stream, input_line);
+  if (io::isError(current_read)) {
+return -1;
+  }
+  if (current_read == 0) {
+current_line_ = next_line_;
+next_line_ = std::nullopt;
+return 0;
+  }
+  current_line_ = next_line_;
+  next_line_ = input_line;
+  total_bytes_read_ += current_read;
+  return gsl::narrow(current_read);
+}
+
+size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, 
std::string& line) {
+  if (current_pos_ == end_pos_) {
+const auto current_bytes_read = fillBuffer(stream);
+if (io::isError(current_bytes_read) || current_bytes_read == 0) {
+  return current_bytes_read;
+}
+  }
+
+  int64_t pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {
+++pos;
+  }
+  if (pos < end_pos_ && buffer_[pos] == '\n') {
+++pos;
+line = std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos};
+current_pos_ = pos;
+return line.size();
+  }
+
+  std::string start_of_line{buffer_.begin() + current_pos_, buffer_.begin() + 
end_pos_};
+
+  const auto current_bytes_read = fillBuffer(stream);
+  if (io::isError(current_bytes_read)) {
+return io::STREAM_ERROR;
+  } else if (current_bytes_read == 0) {
+line = start_of_line;
+return line.size();
+  }
+
+  pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {
+++pos;
+  }
+  if (pos < end_pos_ && buffer_[pos] == '\n') {
+++pos;
+line = start_of_line + std::string{buffer_.begin() + current_pos_, 
buffer_.begin() + pos};
+current_pos_ = pos;
+return line.size();
+  }
+
+  logger_->log_error("Found a line longer than the max buffer size (%zu 
bytes)", buffer_.size());

Review comment:
   it seems we might process lines longer than `buffer size`, since the leftover 
from the previous buffer fill (`start_of_line`) is prepended to the newly read data
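Below is a minimal sketch of one way to address this. It assumes an in-memory input and a hypothetical `ChunkedLineReader` class, which is not the MiNiFi API. The window is refilled `buffer_size` bytes at a time, as `fillBuffer()` does above, but the length check covers the whole accumulated line. Bytes carried over from the previous fill therefore count toward the limit.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <string_view>

// Hypothetical helper (not MiNiFi's LineByLineInputOutputStreamCallback):
// reads '\n'-terminated lines from an in-memory input, advancing a window of
// at most `buffer_size` bytes at a time, similar to fillBuffer() in the diff.
class ChunkedLineReader {
 public:
  ChunkedLineReader(std::string_view input, std::size_t buffer_size)
      : input_(input), buffer_size_(buffer_size) {}

  // Returns the next line (with its trailing '\n', if any), or std::nullopt
  // at end of input or on error; error() tells the two cases apart.
  std::optional<std::string> next() {
    std::string line;
    while (true) {
      if (pos_ == window_end_ && !refill()) {
        // End of input: a last line without a trailing '\n' is still returned.
        if (line.empty()) return std::nullopt;
        return line;
      }
      const std::size_t newline = input_.find('\n', pos_);
      const std::size_t stop =
          (newline != std::string_view::npos && newline < window_end_) ? newline + 1 : window_end_;
      line.append(input_.data() + pos_, stop - pos_);
      pos_ = stop;
      // Check the whole line: bytes carried over from the previous window
      // plus the bytes taken from the current one.
      if (line.size() > buffer_size_) {
        error_ = true;
        return std::nullopt;
      }
      if (line.back() == '\n') return line;
    }
  }

  bool error() const { return error_; }

 private:
  // Advances the window by up to buffer_size_ bytes; false at end of input.
  bool refill() {
    if (buffer_size_ == 0 || window_end_ >= input_.size()) return false;
    window_end_ = std::min(input_.size(), window_end_ + buffer_size_);
    return true;
  }

  std::string_view input_;
  std::size_t buffer_size_;
  std::size_t pos_ = 0;
  std::size_t window_end_ = 0;
  bool error_ = false;
};
```

With `buffer_size = 4`, the input `"ab\ncdef\ng"` yields `"ab\n"` and then reports an error. The line `"cdef\n"` is 5 bytes and spans two fills, even though neither fill exceeds 4 bytes.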





[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

2021-09-09 Thread GitBox


adamdebreceni commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705156210



##
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t
 buffer_size, std::shared_ptr logger, CallbackType 
callback)
+  : buffer_(buffer_size),
+logger_(std::move(logger)),
+callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const 
std::shared_ptr& input, const std::shared_ptr& 
output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  input_size_ = input->size();
+
+  if (int64_t status = readLine(*input); status <= 0) {
+return status;
+  }
+
+  bool is_first_line = true;
+  do {
+if (int64_t status = readLine(*input); status < 0) {
+  return status;
+}
+std::string output_line = callback_(*current_line_, is_first_line, 
isLastLine());
+output->write(reinterpret_cast(output_line.data()), 
output_line.size());
+is_first_line = false;
+  } while (!isLastLine());
+
+  return gsl::narrow(total_bytes_read_);
+}
+
+int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) 
{
+  std::string input_line;
+  const size_t current_read = readLine(stream, input_line);
+  if (io::isError(current_read)) {
+return -1;
+  }
+  if (current_read == 0) {
+current_line_ = next_line_;
+next_line_ = std::nullopt;
+return 0;
+  }
+  current_line_ = next_line_;
+  next_line_ = input_line;
+  total_bytes_read_ += current_read;
+  return gsl::narrow(current_read);
+}
+
+size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, 
std::string& line) {
+  if (current_pos_ == end_pos_) {
+const auto current_bytes_read = fillBuffer(stream);

Review comment:
   note: the way it is implemented in RouteText is to check if the input 
stream supports access to its underlying buffer (BufferStream and the default 
RocksDbStream do) and only read into a buffer as a fallback
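Here is a sketch of that pattern. It assumes a hypothetical `InputStreamLike` interface with an `underlyingBuffer()` accessor; the real `BufferStream`/`RocksDbStream` accessors may look different. On the fast path, the stream's own memory is scanned without copying. Any other stream is read through a fixed-size buffer.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

// Hypothetical stream interface; MiNiFi's io::InputStream has a different API.
class InputStreamLike {
 public:
  virtual ~InputStreamLike() = default;
  // A view of the entire content if the stream is memory-backed, else nullopt.
  virtual std::optional<std::string_view> underlyingBuffer() const = 0;
  // Copies up to `size` bytes into `out`; returns the count, 0 at end of stream.
  virtual std::size_t read(char* out, std::size_t size) = 0;
};

// In-memory stream that can optionally refuse to expose its buffer.
class MemoryStream : public InputStreamLike {
 public:
  MemoryStream(std::string data, bool expose_buffer)
      : data_(std::move(data)), expose_buffer_(expose_buffer) {}

  std::optional<std::string_view> underlyingBuffer() const override {
    if (!expose_buffer_) return std::nullopt;
    return std::string_view(data_);
  }

  std::size_t read(char* out, std::size_t size) override {
    const std::size_t n = std::min(size, data_.size() - pos_);
    std::copy_n(data_.data() + pos_, n, out);
    pos_ += n;
    return n;
  }

 private:
  std::string data_;
  bool expose_buffer_;
  std::size_t pos_ = 0;
};

struct NewlineCount {
  std::size_t newlines = 0;
  bool zero_copy = false;  // true if the stream's own buffer was scanned directly
};

// Counts '\n' characters: zero-copy when possible, buffered read as a fallback.
NewlineCount countNewlines(InputStreamLike& stream, std::size_t buffer_size) {
  NewlineCount result;
  if (const auto view = stream.underlyingBuffer()) {
    result.newlines = static_cast<std::size_t>(std::count(view->begin(), view->end(), '\n'));
    result.zero_copy = true;
    return result;
  }
  std::vector<char> buffer(buffer_size);
  std::size_t n = 0;
  while (buffer_size > 0 && (n = stream.read(buffer.data(), buffer.size())) > 0) {
    result.newlines += static_cast<std::size_t>(
        std::count(buffer.begin(), buffer.begin() + static_cast<std::ptrdiff_t>(n), '\n'));
  }
  return result;
}
```

The fallback keeps memory bounded for streams that cannot expose their content, at the cost of one copy per chunk.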






[GitHub] [nifi] Lehel44 commented on a change in pull request #5356: NIFI-9183: Add a command-line option to save status history

2021-09-09 Thread GitBox


Lehel44 commented on a change in pull request #5356:
URL: https://github.com/apache/nifi/pull/5356#discussion_r705143354



##
File path: nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
##
@@ -275,6 +282,9 @@ public static void main(String[] args) throws IOException, 
InterruptedException
 case "env":
 runNiFi.env();
 break;
+case "status-history":

Review comment:
   Diagnostics works the same way: if you run the command with a filename you 
have already used, you can generally expect the existing file to be 
overwritten. I'll extend the documentation with this information.





[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

2021-09-09 Thread GitBox


adamdebreceni commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705141850



##
File path: extensions/standard-processors/processors/ReplaceText.h
##
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include 
+#include 
+#include 
+#include 
+
+#include "core/Annotation.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessSessionFactory.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "utils/Enum.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+SMART_ENUM(EvaluationModeType,
+  (LINE_BY_LINE, "Line-by-Line"),
+  (ENTIRE_TEXT, "Entire text")
+)
+
+SMART_ENUM(LineByLineEvaluationModeType,
+  (ALL, "All"),
+  (FIRST_LINE, "First-Line"),
+  (LAST_LINE, "Last-Line"),
+  (EXCEPT_FIRST_LINE, "Except-First-Line"),
+  (EXCEPT_LAST_LINE, "Except-Last-Line")
+)
+
+SMART_ENUM(ReplacementStrategyType,
+  (PREPEND, "Prepend"),
+  (APPEND, "Append"),
+  (REGEX_REPLACE, "Regex Replace"),
+  (LITERAL_REPLACE, "Literal Replace"),
+  (ALWAYS_REPLACE, "Always Replace"),
+  (SUBSTITUTE_VARIABLES, "Substitute Variables")
+)
+
+class ReplaceText : public core::Processor {
+ public:
+  static const core::Property EvaluationMode;
+  static const core::Property LineByLineEvaluationMode;
+  static const core::Property ReplacementStrategy;
+  static const core::Property MaximumBufferSize;
+  static const core::Property SearchValue;
+  static const core::Property ReplacementValue;
+
+  static const core::Relationship Success;
+  static const core::Relationship Failure;
+
+  explicit ReplaceText(const std::string& name, const utils::Identifier& uuid 
= {});
+  core::annotation::Input getInputRequirement() const override;
+  void initialize() override;
+  void onSchedule(const std::shared_ptr& context, const 
std::shared_ptr&) override;
+  void onTrigger(const std::shared_ptr& context, const 
std::shared_ptr& session) override;
+
+ private:
+  friend struct ReplaceTextTestAccessor;
+
+  void readSearchValueProperty(const std::shared_ptr& 
context, const std::shared_ptr& flow_file);
+  void readReplacementValueProperty(const 
std::shared_ptr& context, const 
std::shared_ptr& flow_file);
+
+  void replaceTextInEntireFile(const std::shared_ptr& 
flow_file, const std::shared_ptr& session) const;
+  void replaceTextLineByLine(const std::shared_ptr& flow_file, 
const std::shared_ptr& session) const;
+
+  std::string applyReplacements(const std::string& input, const 
std::shared_ptr& flow_file) const;
+  std::string applyRegexReplace(const std::string& input) const;
+  std::string applyLiteralReplace(const std::string& input) const;
+  std::string applySubstituteVariables(const std::string& input, const 
std::shared_ptr& flow_file) const;
+  std::string getAttributeValue(const std::shared_ptr& 
flow_file, const std::smatch& match) const;
+
+  std::shared_ptr logger_;
+  std::string search_value_;
+  std::regex search_regex_;
+  std::string replacement_value_;
+  uint64_t maximum_buffer_size_ = 0;
+  EvaluationModeType evaluation_mode_ = EvaluationModeType::LINE_BY_LINE;
+  LineByLineEvaluationModeType line_by_line_evaluation_mode_ = 
LineByLineEvaluationModeType::ALL;
+  ReplacementStrategyType replacement_strategy_ = 
ReplacementStrategyType::REGEX_REPLACE;
+};
+
+REGISTER_RESOURCE(ReplaceText, "Updates the content of a FlowFile by replacing 
parts of it using various replacement strategies.");

Review comment:
   note: after the dynamic PR, resource registration should happen in cpp 
files
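The comment does not give a reason. One general C++ consideration is sketched below with a hypothetical `ProcessorRegistry`, which is not MiNiFi's class loader or `REGISTER_RESOURCE`. A registration object defined at namespace scope with internal linkage in a header is instantiated in every translation unit that includes that header. One defined in the processor's `.cpp` file runs exactly once, when that translation unit (or the library containing it) is loaded.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-ins; MiNiFi's actual class loader and macros differ.
struct Processor {
  virtual ~Processor() = default;
  virtual std::string name() const = 0;
};

class ProcessorRegistry {
 public:
  using Factory = std::function<std::unique_ptr<Processor>()>;

  static ProcessorRegistry& instance() {
    static ProcessorRegistry registry;  // constructed on first use
    return registry;
  }

  // Returns false if a factory is already registered under `name`.
  bool add(const std::string& name, Factory factory) {
    return factories_.emplace(name, std::move(factory)).second;
  }

  std::unique_ptr<Processor> create(const std::string& name) const {
    const auto it = factories_.find(name);
    if (it == factories_.end()) return nullptr;
    return it->second();
  }

 private:
  std::map<std::string, Factory> factories_;
};

// ---- would live in ReplaceText.cpp, i.e. exactly one translation unit ----
struct ReplaceTextSketch : Processor {
  std::string name() const override { return "ReplaceText"; }
};

namespace {
// Initializing this object performs the registration once, when this
// translation unit (or the shared library containing it) is initialized.
const bool replace_text_registered = ProcessorRegistry::instance().add(
    "ReplaceText", [] { return std::make_unique<ReplaceTextSketch>(); });
}  // namespace
```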





[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

2021-09-09 Thread GitBox


adamdebreceni commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705139497



##
File path: docker/test/integration/MiNiFi_integration_test_driver.py
##
@@ -104,7 +104,7 @@ def 
generate_input_port_for_remote_process_group(remote_process_group, name):
 return input_port_node
 
 def add_test_data(self, path, test_data, file_name=str(uuid.uuid4())):
-self.docker_directory_bindings.put_file_to_docker_path(self.test_id, 
path, file_name, test_data.encode('utf-8'))
+self.docker_directory_bindings.put_file_to_docker_path(self.test_id, 
path, file_name, test_data.replace('\\n', '\n').encode('utf-8'))

Review comment:
   note: I have also touched this part for the same reason (use escaped 
`\t`, `\n`, ... chars) in 
[1168](https://github.com/apache/nifi-minifi-cpp/pull/1168) (same with the file 
content validation)
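The Python change above only converts the two-character sequence backslash-`n` into a newline. Below is a sketch of a single-pass decoder for several escapes, written in C++ with a hypothetical `unescapeTestData` helper. Because it works left to right, an escaped backslash followed by `n` is not mistaken for a newline escape. Chained `replace` calls can get that case wrong in either order.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical helper: decodes the escapes \n, \t, \r and \\ in test data;
// unknown sequences and a trailing lone backslash are kept verbatim.
std::string unescapeTestData(const std::string& input) {
  std::string out;
  out.reserve(input.size());
  for (std::size_t i = 0; i < input.size(); ++i) {
    if (input[i] == '\\' && i + 1 < input.size()) {
      switch (input[i + 1]) {
        case 'n': out += '\n'; ++i; continue;
        case 't': out += '\t'; ++i; continue;
        case 'r': out += '\r'; ++i; continue;
        case '\\': out += '\\'; ++i; continue;
        default: break;  // not a known escape: copy the backslash as-is
      }
    }
    out += input[i];
  }
  return out;
}
```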





[jira] [Created] (NIFI-9206) Create a processor that is capable of removing fields from records

2021-09-09 Thread Peter Gyori (Jira)
Peter Gyori created NIFI-9206:
-

 Summary: Create a processor that is capable of removing fields 
from records
 Key: NIFI-9206
 URL: https://issues.apache.org/jira/browse/NIFI-9206
 Project: Apache NiFi
  Issue Type: New Feature
  Components: Extensions
Reporter: Peter Gyori
Assignee: Peter Gyori


A processor should be created that is capable of removing fields from records 
(RemoveRecordField might be a name for it).

The processor should have 3 properties:
 * Record Reader (a Reader controller service could be specified)
 * Record Writer (a Writer controller service could be specified)
 * Field To Remove (expects a RecordPath that points to the field to be removed)

The processor should be able to accept additional dynamic properties that 
specify further fields (by RecordPath) to be removed from the record.

+*Example*+

+input:+
{code:java}
{
"id": 1,
"name": "John",
"address": {
"zip": ,
"street": "Main",
"building": 11
}
}
{code}
+Field to remove:+ /address/building

+output:+
{code:java}
{
"id": 1,
"name": "John",
"address": {
"zip": ,
"street": "Main"
}
}
{code}
The record's schema should be modified accordingly (removing the 
/address/building field from the schema). Field removal should be permitted 
regardless of whether the field is nullable.

Generally, the removal of a field should include the field's removal from the 
schema AND the data. The exception is if the removal is data-dependent (e.g. the 
field should be removed only if its value equals "xyz"). In this case no schema 
modification should occur.

The processor should be able to remove one or more elements from arrays (e.g. 
/addresses[ 1 ] should remove the element at the 1st position of the addresses 
array). When removing a field from elements of an array, the array's schema 
should only be modified if the removal is applied to all elements of the array 
(i.e. /addresses[ * ]/building should modify the schema of the array, but 
/addresses[ 1 ]/building should not).

The same rule should be applied when handling Map datatype.
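Below is a simplified sketch of the schema rule above, written in C++ with a hypothetical `removalModifiesSchema` helper. It only inspects bracketed segments and does not implement the full RecordPath grammar. A `*` wildcard is treated as applying to all elements. Any other bracket content (an index such as `1`, a map key, or a filter predicate) is treated as element- or data-specific, so the schema is left unchanged.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Trims spaces and tabs, so "[ * ]" and "[*]" are treated alike.
std::string trimBlanks(const std::string& s) {
  const std::size_t first = s.find_first_not_of(" \t");
  if (first == std::string::npos) return "";
  const std::size_t last = s.find_last_not_of(" \t");
  return s.substr(first, last - first + 1);
}

// Hypothetical helper: for a path that removes a field (e.g. /a/b or
// /addresses[ * ]/building), returns true if the schema should change too,
// i.e. every bracketed segment is the wildcard '*'.
bool removalModifiesSchema(const std::string& path) {
  std::size_t open = path.find('[');
  while (open != std::string::npos) {
    const std::size_t close = path.find(']', open);
    if (close == std::string::npos) return false;  // malformed: leave schema alone
    if (trimBlanks(path.substr(open + 1, close - open - 1)) != "*") return false;
    open = path.find('[', close + 1);
  }
  return true;
}
```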

If the record does not contain the field that is expected to be removed, the 
record should still be transferred to the 'success' relationship, unmodified. 
The expectation is that if /x/y is to be removed from the record, then the 
record leaving the processor does not contain the /x/y field.
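Below is a sketch of the data side of the removal for plain field paths such as /address/building. It uses a hypothetical, heavily simplified record model with no arrays, maps, or schema handling. A path that does not exist is a no-op, which matches the rule that such records still go to 'success' unchanged.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical record model: a field holds either a scalar (as text) or a nested record.
struct Record;
struct Field {
  std::string scalar;              // used when nested is null
  std::shared_ptr<Record> nested;  // non-null for a nested record
};
struct Record {
  std::map<std::string, Field> fields;
};

// Splits "/address/building" into {"address", "building"}.
std::vector<std::string> splitPath(const std::string& path) {
  std::vector<std::string> parts;
  std::stringstream stream(path);
  std::string part;
  while (std::getline(stream, part, '/')) {
    if (!part.empty()) parts.push_back(part);
  }
  return parts;
}

// Removes the field addressed by `path`. Returns false and leaves the record
// untouched when the path does not exist.
bool removeField(Record& record, const std::string& path) {
  const auto parts = splitPath(path);
  if (parts.empty()) return false;
  Record* current = &record;
  for (std::size_t i = 0; i + 1 < parts.size(); ++i) {
    const auto it = current->fields.find(parts[i]);
    if (it == current->fields.end() || !it->second.nested) return false;
    current = it->second.nested.get();
  }
  return current->fields.erase(parts.back()) > 0;
}
```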

If a certain field can be of different types (e.g. the address field can be a 
"string" as well as a "record", and possibly another "record" with a schema 
different from the former record type), then if /address/building is to be 
removed from the record, the processor is expected to remove the building field 
from the schema of all possible types of the address field, regardless of which 
concrete type the address field has in the particular record being processed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #1120: MINIFICPP-1374 Implement security protocol support for ConsumeKafka

2021-09-09 Thread GitBox


lordgamez commented on a change in pull request #1120:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1120#discussion_r705083561



##
File path: extensions/librdkafka/ConsumeKafka.cpp
##
@@ -211,14 +218,22 @@ void ConsumeKafka::onSchedule(core::ProcessContext* 
context, core::ProcessSessio
   context->getProperty(MessageHeaderEncoding.getName(), 
message_header_encoding_);
   context->getProperty(DuplicateHeaderHandling.getName(), 
duplicate_header_handling_);
 
+  std::string ssl_service_name;
+  std::shared_ptr ssl_service;
+  if (context->getProperty(SSLContextService.getName(), ssl_service_name) && 
!ssl_service_name.empty()) {
+std::shared_ptr service = 
context->getControllerService(ssl_service_name);
+if (service) {
+  ssl_service = 
std::static_pointer_cast(service);
+  ssl_data_.ca_loc = ssl_service->getCACertificate();
+  ssl_data_.cert_loc = ssl_service->getCertificateFile();
+  ssl_data_.key_loc = ssl_service->getPrivateKeyFile();
+  ssl_data_.key_pw = ssl_service->getPassphrase();
+}
+  }

Review comment:
   Updated in 0e5f15b290e21b011e99628a6e2198e88cfad106

##
File path: extensions/librdkafka/PublishKafka.cpp
##
@@ -709,37 +717,70 @@ bool PublishKafka::configureNewConnection(const 
std::shared_ptrgetProperty(SecurityCA.getName(), value) && !value.empty()) 
{
-result = rd_kafka_conf_set(conf_.get(), "ssl.ca.location", 
value.c_str(), errstr.data(), errstr.size());
-logger_->log_debug("PublishKafka: ssl.ca.location [%s]", value);
+
+  std::shared_ptr ssl_service;
+  if (context->getProperty(SSLContextService.getName(), value) && 
!value.empty()) {
+std::shared_ptr service = 
context->getControllerService(value);
+if (service) {
+  ssl_service = 
std::static_pointer_cast(service);
+}
+  }

Review comment:
   Updated in 0e5f15b290e21b011e99628a6e2198e88cfad106
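For context, here is a sketch of how the `ssl_data_` fields read from the SSL Context Service could map onto librdkafka configuration properties. The `SslData` struct and `toKafkaSslConfig` helper are hypothetical. The property names `ssl.ca.location`, `ssl.certificate.location`, `ssl.key.location` and `ssl.key.password` are standard librdkafka settings. Each entry would be applied with `rd_kafka_conf_set()`, as the removed `ssl.ca.location` line above did.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical mirror of the ssl_data_ fields filled from the SSL Context Service.
struct SslData {
  std::string ca_loc;
  std::string cert_loc;
  std::string key_loc;
  std::string key_pw;
};

// Maps the SSL data onto librdkafka property names, skipping empty values.
std::map<std::string, std::string> toKafkaSslConfig(const SslData& ssl) {
  std::map<std::string, std::string> config;
  auto set_if_present = [&config](const char* key, const std::string& value) {
    if (!value.empty()) config[key] = value;
  };
  set_if_present("ssl.ca.location", ssl.ca_loc);
  set_if_present("ssl.certificate.location", ssl.cert_loc);
  set_if_present("ssl.key.location", ssl.key_loc);
  set_if_present("ssl.key.password", ssl.key_pw);
  return config;
}
```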





[jira] [Updated] (NIFI-9200) AbstractCSVLookupService cache remains on the heap after disabling the service

2021-09-09 Thread Peter Turcsanyi (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-9200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter Turcsanyi updated NIFI-9200:
--
Fix Version/s: 1.15.0
   Resolution: Fixed
   Status: Resolved  (was: Patch Available)

> AbstractCSVLookupService cache remains on the heap after disabling the service
> --
>
> Key: NIFI-9200
> URL: https://issues.apache.org/jira/browse/NIFI-9200
> Project: Apache NiFi
>  Issue Type: Bug
>Reporter: Lehel Boér
>Assignee: Lehel Boér
>Priority: Major
> Fix For: 1.15.0
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>





[GitHub] [nifi] asfgit closed pull request #5372: NIFI-9200: Free cache on heap after disabling AbstractCSVLookupService

2021-09-09 Thread GitBox


asfgit closed pull request #5372:
URL: https://github.com/apache/nifi/pull/5372


   



[jira] [Commented] (NIFI-9200) AbstractCSVLookupService cache remains on the heap after disabling the service

2021-09-09 Thread ASF subversion and git services (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-9200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17412414#comment-17412414
 ] 

ASF subversion and git services commented on NIFI-9200:
---

Commit d96398feb89a77a27370c47e7e52d7f90721193e in nifi's branch 
refs/heads/main from Lehel Boér
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=d96398f ]

NIFI-9200: Free cache on heap after disabling AbstractCSVLookupService

This closes #5372.

Signed-off-by: Peter Turcsanyi 


> AbstractCSVLookupService cache remains on the heap after disabling the service
> --
>
> Key: NIFI-9200
> URL: https://issues.apache.org/jira/browse/NIFI-9200
> Project: Apache NiFi
>  Issue Type: Bug
>Reporter: Lehel Boér
>Assignee: Lehel Boér
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>





[GitHub] [nifi] pvillard31 commented on pull request #3641: NIFI-4542, add target.dir.created to indicate if the target directory created

2021-09-09 Thread GitBox


pvillard31 commented on pull request #3641:
URL: https://github.com/apache/nifi/pull/3641#issuecomment-915839442


   @avseq1234 - it sounds like a straightforward change, can you rebase against 
main and fix conflicts?






[GitHub] [nifi] KuKuDeCheng commented on a change in pull request #5318: NIFI-9064:Support Oracle timestamp when `Use Avro Logical Types` is t…

2021-09-09 Thread GitBox


KuKuDeCheng commented on a change in pull request #5318:
URL: https://github.com/apache/nifi/pull/5318#discussion_r705064997



##
File path: 
nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
##
@@ -1423,9 +1423,40 @@ public static Timestamp toTimestamp(final Object value, final Supplier

[GitHub] [nifi] avseq1234 opened a new pull request #3641: NIFI-4542, add target.dir.created to indicate if the target directory created

2021-09-09 Thread GitBox


avseq1234 opened a new pull request #3641:
URL: https://github.com/apache/nifi/pull/3641


   … created
   
   Thank you for submitting a contribution to Apache NiFi.
   
   Please provide a short description of the PR here:
   
   ### Description of PR
   
   Add a target.dir.created attribute to the output FlowFile to indicate whether the target directory was created.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [X] Is there a JIRA ticket associated with this PR? Is it referenced 
in the commit message?
   
   - [X] Does your PR title start with **NIFI-** where  is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.
   
   - [X] Has your PR been rebased against the latest commit within the target 
branch (typically `master`)?
   
   - [ ] Is your initial contribution a single, squashed commit? _Additional 
commits in response to PR reviewer feedback should be made on this branch and 
pushed to allow change tracking. Do not `squash` or use `--force` when pushing 
to allow for clean monitoring of changes._
   
   ### For code changes:
   - [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [X] Have you written or updated unit tests to verify your changes?
   - [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
   - [ ] If applicable, have you updated the `LICENSE` file, including the main 
`LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main 
`NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to 
.name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which 
it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.
   






[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1138: MINIFICPP-1471 - Build extensions as dynamic libraries and dynamically load them

2021-09-09 Thread GitBox


adamdebreceni commented on a change in pull request #1138:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1138#discussion_r705061409



##
File path: libminifi/include/core/extension/DynamicLibrary.h
##
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+#include 
+#include 
+
+#include "Module.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace extension {
+
+class DynamicLibrary : public Module {
+  friend class ExtensionManager;
+
+ public:
+  DynamicLibrary(std::string name, std::filesystem::path library_path);
+  ~DynamicLibrary() override;
+
+ private:
+#ifdef WIN32
+  std::map resource_mapping_;
+
+  std::string error_str_;
+  std::string current_error_;
+
+  void store_error();
+  void* dlsym(void* handle, const char* name);
+  const char* dlerror();
+  void* dlopen(const char* file, int mode);
+  int dlclose(void* handle);
+#endif
+
+  bool load();
+  bool unload();
+
+  std::filesystem::path library_path_;
+  gsl::owner handle_ = nullptr;
+
+  static std::shared_ptr logger_;

Review comment:
   should be done now
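For readers following the `load()`/`unload()` pair declared in this header: on POSIX it boils down to `dlopen`/`dlclose`, with `dlerror()` cleared before each call so a stale message is not reported. The sketch below is a hypothetical RAII wrapper (`ScopedLibrary` is an illustrative name, not a MiNiFi class) that reuses the `RTLD_NOW | RTLD_LOCAL` flags from the PR; it assumes a platform that provides `<dlfcn.h>` (older glibc versions also need `-ldl` at link time).

```cpp
#include <dlfcn.h>

#include <filesystem>
#include <string>
#include <utility>

// Hypothetical RAII wrapper around a POSIX shared-library handle.
// Not part of MiNiFi; it only illustrates the load()/unload() contract.
class ScopedLibrary {
 public:
  explicit ScopedLibrary(std::filesystem::path path) : path_(std::move(path)) {}
  ~ScopedLibrary() { unload(); }

  // One owner per handle: copying would lead to a double dlclose().
  ScopedLibrary(const ScopedLibrary&) = delete;
  ScopedLibrary& operator=(const ScopedLibrary&) = delete;

  bool load() {
    dlerror();  // clear any stale error before calling dlopen
    handle_ = dlopen(path_.c_str(), RTLD_NOW | RTLD_LOCAL);
    if (!handle_) {
      const char* err = dlerror();
      last_error_ = err ? err : "unknown dlopen error";
      return false;
    }
    return true;
  }

  bool unload() {
    if (!handle_) {
      return true;  // nothing loaded, nothing to do
    }
    dlerror();
    if (dlclose(handle_) != 0) {
      const char* err = dlerror();
      last_error_ = err ? err : "unknown dlclose error";
      return false;
    }
    handle_ = nullptr;
    return true;
  }

  bool isLoaded() const { return handle_ != nullptr; }
  const std::string& lastError() const { return last_error_; }

 private:
  std::filesystem::path path_;
  void* handle_ = nullptr;
  std::string last_error_;
};
```

Making the wrapper non-copyable keeps a single owner per handle, which is roughly what the `gsl::owner` annotation on `handle_` documents in the header above.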








[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1138: MINIFICPP-1471 - Build extensions as dynamic libraries and dynamically load them

2021-09-09 Thread GitBox


adamdebreceni commented on a change in pull request #1138:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1138#discussion_r705060665



##
File path: libminifi/test/unit/FilePatternTests.cpp
##
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CUSTOM_EXTENSION_INIT
+
+#include 
+
+#include "../TestBase.h"
+#include "utils/file/FilePattern.h"
+#include "range/v3/view/transform.hpp"
+#include "range/v3/view/map.hpp"
+#include "range/v3/view/join.hpp"
+#include "range/v3/view/cache1.hpp"
+#include "range/v3/view/c_str.hpp"
+#include "range/v3/range/conversion.hpp"
+#include "range/v3/range.hpp"
+
+struct FilePatternTestAccessor {
+  using FilePatternSegment = fileutils::FilePattern::FilePatternSegment;
+};
+
+using FilePatternSegment = FilePatternTestAccessor::FilePatternSegment;
+using FilePattern = fileutils::FilePattern;
+
+#define REQUIRE_INCLUDE(val) REQUIRE((val == FilePatternSegment::MatchResult::INCLUDE))
+#define REQUIRE_EXCLUDE(val) REQUIRE((val == FilePatternSegment::MatchResult::EXCLUDE))
+#define REQUIRE_NOT_MATCHING(val) REQUIRE((val == FilePatternSegment::MatchResult::NOT_MATCHING))
+
+#ifdef WIN32
+static std::filesystem::path root{"C:\\"};
+#else
+static std::filesystem::path root{"/"};
+#endif
+
+TEST_CASE("Invalid paths") {
+  REQUIRE_THROWS(FilePatternSegment(""));
+  REQUIRE_THROWS(FilePatternSegment("."));
+  REQUIRE_THROWS(FilePatternSegment(".."));
+  REQUIRE_THROWS(FilePatternSegment("!"));
+  REQUIRE_THROWS(FilePatternSegment("!."));
+  REQUIRE_THROWS(FilePatternSegment("!.."));
+  // parent accessor after wildcard
+  FilePatternSegment("./../file.txt");  // sanity check
+  REQUIRE_THROWS(FilePatternSegment("./*/../file.txt"));
+}
+
+TEST_CASE("FilePattern reports error in correct subpattern") {
+  std::vector> invalid_subpattern{

Review comment:
   changed
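The `Invalid paths` test case implies a few parsing rules: a leading `!` marks an exclusion, the remaining pattern must not be empty or end in `.`/`..`, and `..` may not follow a wildcard component. A minimal sketch of just those rules, assuming they are the whole validation story (the real `FilePatternSegment` likely does more, such as the actual matching); `PatternSegment`, `splitPath` and `parseSegment` are hypothetical names.

```cpp
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical parsed form of one pattern segment.
struct PatternSegment {
  bool excluding = false;               // pattern started with '!'
  std::vector<std::string> components;  // path components, separators removed
};

// Splits on '/' and '\\' (treating both as separators is an assumption).
std::vector<std::string> splitPath(const std::string& path) {
  std::vector<std::string> parts;
  std::string current;
  for (char c : path) {
    if (c == '/' || c == '\\') {
      if (!current.empty()) parts.push_back(current);
      current.clear();
    } else {
      current += c;
    }
  }
  if (!current.empty()) parts.push_back(current);
  return parts;
}

// Hypothetical validator mirroring the cases in the "Invalid paths" test.
PatternSegment parseSegment(const std::string& pattern) {
  PatternSegment segment;
  std::string body = pattern;
  if (!body.empty() && body.front() == '!') {
    segment.excluding = true;
    body.erase(0, 1);
  }
  segment.components = splitPath(body);
  if (segment.components.empty()) {
    throw std::invalid_argument("empty pattern");
  }
  const std::string& last = segment.components.back();
  if (last == "." || last == "..") {
    throw std::invalid_argument("pattern must end in a file name or glob");
  }
  bool seen_wildcard = false;
  for (const auto& part : segment.components) {
    if (part == ".." && seen_wildcard) {
      throw std::invalid_argument("parent accessor after wildcard");
    }
    if (part.find_first_of("*?") != std::string::npos) seen_wildcard = true;
  }
  return segment;
}
```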

##
File path: libminifi/src/core/extension/ExtensionManager.cpp
##
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/extension/ExtensionManager.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/file/FileUtils.h"
+#include "core/extension/Executable.h"
+#include "utils/file/FilePattern.h"
+#include "core/extension/DynamicLibrary.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace extension {
+
+namespace {
+
+struct LibraryDescriptor {
+  std::string name;
+  std::filesystem::path dir;
+  std::string filename;
+
+  bool verify(const std::shared_ptr& /*logger*/) const {
+    // TODO(adebreceni): check signature
+    return true;
+  }
+
+  std::filesystem::path getFullPath() const {
+    return dir / filename;
+  }

Review comment:
   added








[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1138: MINIFICPP-1471 - Build extensions as dynamic libraries and dynamically load them

2021-09-09 Thread GitBox


adamdebreceni commented on a change in pull request #1138:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1138#discussion_r705059132



##
File path: libminifi/include/core/logging/Logger.h
##
@@ -26,6 +26,7 @@
 #include 
 #include 
 #include 
+#include 

Review comment:
   remove

##
File path: libminifi/include/utils/file/FilePattern.h
##
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "utils/OptionalUtils.h"
+#include "core/logging/Logger.h"

Review comment:
   removed

##
File path: libminifi/src/core/extension/DynamicLibrary.cpp
##
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+#ifndef WIN32
+#include 
+#define DLL_EXPORT
+#else
+#include 
+#define WIN32_LEAN_AND_MEAN 1
+#include // Windows specific libraries for collecting software metrics.
+#include 
+#pragma comment(lib, "psapi.lib" )
+#define DLL_EXPORT __declspec(dllexport)
+#define RTLD_LAZY   0
+#define RTLD_NOW    0
+
+#define RTLD_GLOBAL (1 << 1)
+#define RTLD_LOCAL  (1 << 2)
+#endif
+
+#include "core/extension/DynamicLibrary.h"
+#include "core/extension/Extension.h"
+#include "utils/GeneralUtils.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace extension {
+
+std::shared_ptr DynamicLibrary::logger_ = logging::LoggerFactory::getLogger();
+
+DynamicLibrary::DynamicLibrary(std::string name, std::filesystem::path library_path)
+  : Module(std::move(name)),
+    library_path_(std::move(library_path)) {
+}
+
+bool DynamicLibrary::load() {
+  dlerror();
+  handle_ = dlopen(library_path_.string().c_str(), RTLD_NOW | RTLD_LOCAL);
+  if (!handle_) {
+    logger_->log_error("Failed to load extension '%s' at '%s': %s", name_, library_path_.string(), dlerror());
+    return false;
+  } else {
+    logger_->log_trace("Loaded extension '%s' at '%s'", name_, library_path_.string());
+    return true;
+  }
+}
+
+bool DynamicLibrary::unload() {
+  logger_->log_trace("Unloading library '%s' at '%s'", name_, library_path_.string());
+  if (!handle_) {
+    logger_->log_error("Extension does not have a handle_ '%s' at '%s'", name_, library_path_.string());
+    return true;
+  }
+  dlerror();
+  if (dlclose(handle_)) {
+    logger_->log_error("Failed to unload extension '%s' at '%s': %s", name_, library_path_.string(), dlerror());
+    return false;
+  }
+  logger_->log_trace("Unloaded extension '%s' at '%s'", name_, library_path_.string());
+  handle_ = nullptr;
+  return true;
+}
+
+DynamicLibrary::~DynamicLibrary() = default;
+
+#ifdef WIN32
+
+void DynamicLibrary::store_error() {
+  auto error = GetLastError();
+
+  if (error == 0) {
+    error_str_ = "";
+    return;
+  }
+
+  current_error_ = std::system_category().message(error);
+}
+
+void* DynamicLibrary::dlsym(void* handle, const char* name) {
+  FARPROC symbol;
+
+  symbol = GetProcAddress((HMODULE)handle, name);
+
+  if (symbol == nullptr) {
+    store_error();
+
+    for (auto hndl : resource_mapping_) {
+      symbol = GetProcAddress((HMODULE)hndl.first, name);
+      if (symbol != nullptr) {
+        break;
+      }
+    }
+  }
+
+#ifdef _MSC_VER
+#pragma warning(suppress: 4054 )
+#endif
+  return reinterpret_cast(symbol);
+}
+
+const char* 

[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1138: MINIFICPP-1471 - Build extensions as dynamic libraries and dynamically load them

2021-09-09 Thread GitBox


adamdebreceni commented on a change in pull request #1138:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1138#discussion_r705041686



##
File path: libminifi/include/core/extension/DynamicLibrary.h
##
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+#include 
+#include 
+
+#include "Module.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace extension {
+
+class DynamicLibrary : public Module {
+  friend class ExtensionManager;
+
+ public:
+  DynamicLibrary(std::string name, std::filesystem::path library_path);
+  ~DynamicLibrary() override;
+
+ private:
+#ifdef WIN32
+  std::map resource_mapping_;
+
+  std::string error_str_;
+  std::string current_error_;
+
+  void store_error();
+  void* dlsym(void* handle, const char* name);
+  const char* dlerror();
+  void* dlopen(const char* file, int mode);
+  int dlclose(void* handle);
+#endif
+
+  bool load();
+  bool unload();
+
+  std::filesystem::path library_path_;
+  gsl::owner handle_ = nullptr;
+
+  static std::shared_ptr logger_;

Review comment:
   it seems it was never fixed, I made the ExtensionManager::logger_ const 
:) 
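On Windows the header declares its own `store_error()`/`dlerror()` pair to emulate POSIX `dlerror()`, which returns a description of the most recent loader error (or a null pointer if none is pending) and clears it. A portable sketch of that contract, assuming the error code is passed in explicitly rather than read from `GetLastError()`; `ErrorSlot` is a hypothetical name, not the PR's implementation.

```cpp
#include <string>
#include <system_error>
#include <utility>

// Hypothetical, portable model of a dlerror()-style error slot.
class ErrorSlot {
 public:
  // Records a pending error; code 0 means "no error" and is ignored.
  // (The real shim reads the code from GetLastError() instead.)
  void store_error(int code) {
    if (code == 0) return;
    current_error_ = std::system_category().message(code);
    has_error_ = true;
  }

  // Returns the pending message, or nullptr if none, and clears it,
  // matching the POSIX dlerror() contract.
  const char* dlerror() {
    if (!has_error_) return nullptr;
    error_str_ = std::move(current_error_);
    current_error_.clear();
    has_error_ = false;
    return error_str_.c_str();
  }

 private:
  std::string current_error_;  // error recorded but not yet reported
  std::string error_str_;      // keeps the returned pointer valid until the next call
  bool has_error_ = false;
};
```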



