[jira] [Updated] (NIFI-9210) Upgrade jsoup to 1.14.2
[ https://issues.apache.org/jira/browse/NIFI-9210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Handermann updated NIFI-9210:
---
Status: Patch Available (was: Open)

> Upgrade jsoup to 1.14.2
> ---
>
> Key: NIFI-9210
> URL: https://issues.apache.org/jira/browse/NIFI-9210
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Affects Versions: 1.14.0, 1.15.0
> Reporter: David Handermann
> Assignee: David Handermann
> Priority: Minor
> Labels: dependency-upgrade, security
> Time Spent: 10m
> Remaining Estimate: 0h
>
> The following NiFi modules have a dependency on jsoup for HTML parsing:
> * nifi-html-processors
> * nifi-media-processors
> Versions of jsoup prior to 1.14.2 have an associated vulnerability
> [CVE-2021-37714|https://nvd.nist.gov/vuln/detail/CVE-2021-37714]. Upgrading
> to the latest version of jsoup addresses this potential vulnerability as well
> as a number of minor bugs.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Updated] (NIFI-9210) Upgrade jsoup to 1.14.2
[ https://issues.apache.org/jira/browse/NIFI-9210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Handermann updated NIFI-9210:
---
Labels: dependency-upgrade security (was: )
[GitHub] [nifi] exceptionfactory opened a new pull request #5379: NIFI-9210 Upgrade jsoup from 1.8.3 to 1.14.2
exceptionfactory opened a new pull request #5379:
URL: https://github.com/apache/nifi/pull/5379

Description of PR

NIFI-9210 Upgrades `jsoup` from 1.8.3 to 1.14.2 in several modules to address potential vulnerabilities.

In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [X] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [X] Does your PR title start with **NIFI-** where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [X] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
- [X] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._

### For code changes:
- [X] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
- [X] Have you written or updated unit tests to verify your changes?
- [X] Have you verified that the full build is successful on JDK 8?
- [X] Have you verified that the full build is successful on JDK 11?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
- [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
- [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (NIFI-9167) Refactor nifi-framework-bundle to use JUnit 5
[ https://issues.apache.org/jira/browse/NIFI-9167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mike Thomsen reassigned NIFI-9167:
--
Assignee: Mike Thomsen

> Refactor nifi-framework-bundle to use JUnit 5
> -
>
> Key: NIFI-9167
> URL: https://issues.apache.org/jira/browse/NIFI-9167
> Project: Apache NiFi
> Issue Type: Sub-task
> Reporter: Mike Thomsen
> Assignee: Mike Thomsen
> Priority: Minor
[GitHub] [nifi] MikeThomsen opened a new pull request #5378: NIFI-9166 Refactored nifi-standard-services to use JUnit 5.
MikeThomsen opened a new pull request #5378:
URL: https://github.com/apache/nifi/pull/5378

Thank you for submitting a contribution to Apache NiFi.

Please provide a short description of the PR here:

Description of PR

_Enables X functionality; fixes bug NIFI-._

In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [ ] Does your PR title start with **NIFI-** where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [ ] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
- [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] Have you verified that the full build is successful on JDK 8?
- [ ] Have you verified that the full build is successful on JDK 11?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
- [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
- [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
[jira] [Created] (NIFI-9210) Upgrade jsoup to 1.14.2
David Handermann created NIFI-9210:
--

Summary: Upgrade jsoup to 1.14.2
Key: NIFI-9210
URL: https://issues.apache.org/jira/browse/NIFI-9210
Project: Apache NiFi
Issue Type: Improvement
Components: Extensions
Affects Versions: 1.14.0, 1.15.0
Reporter: David Handermann
Assignee: David Handermann

The following NiFi modules have a dependency on jsoup for HTML parsing:
* nifi-html-processors
* nifi-media-processors

Versions of jsoup prior to 1.14.2 have an associated vulnerability [CVE-2021-37714|https://nvd.nist.gov/vuln/detail/CVE-2021-37714]. Upgrading to the latest version of jsoup addresses this potential vulnerability as well as a number of minor bugs.
[jira] [Created] (NIFI-9209) Class version conflict due to hive-exec-3.1.2.jar
Denes Arvay created NIFI-9209:
-

Summary: Class version conflict due to hive-exec-3.1.2.jar
Key: NIFI-9209
URL: https://issues.apache.org/jira/browse/NIFI-9209
Project: Apache NiFi
Issue Type: Bug
Reporter: Denes Arvay
Assignee: Denes Arvay

hive-exec-3.1.2.jar bundles a number of third-party libraries (org.apache.avro, org.apache.commons.lang, com.google.common, com.google.protobuf, etc.) that can cause unexpected classloading issues, as happened recently with a PR [1], [2]. hive-exec has a core classifier version without these bundled dependencies, which should be used instead.

[1] https://github.com/apache/nifi/pull/5358
[2] https://github.com/apache/nifi/pull/5358/checks?check_run_id=3544604627
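Before and after switching to the core classifier, it can help to confirm which archive actually supplies a contested class (for example a class under `com.google.common`). The `ClassOrigins` helper below is hypothetical, not part of NiFi; it is a small sketch that uses only standard JDK calls (`Class.getProtectionDomain()`, `ProtectionDomain.getCodeSource()`, `CodeSource.getLocation()`).

```java
import java.net.URL;
import java.security.CodeSource;
import java.security.ProtectionDomain;

/** Hypothetical diagnostic helper: reports which JAR (or directory) a class was loaded from. */
final class ClassOrigins {
    private ClassOrigins() {
    }

    static String originOf(final Class<?> type) {
        final ProtectionDomain domain = type.getProtectionDomain();
        final CodeSource source = domain == null ? null : domain.getCodeSource();
        final URL location = source == null ? null : source.getLocation();
        // JDK classes usually have no code source; report that instead of failing.
        return location == null ? "<no code source>" : location.toString();
    }
}
```

For example, if `ClassOrigins.originOf(com.google.common.collect.ImmutableList.class)` run inside the affected bundle reported hive-exec-3.1.2.jar rather than a Guava JAR, that would suggest the bundled copy is the one being loaded.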
[GitHub] [nifi] mattyb149 commented on a change in pull request #5365: NIFI-9192: ResultSetRecordSet considers value of useLogicalTypes flag…
mattyb149 commented on a change in pull request #5365:
URL: https://github.com/apache/nifi/pull/5365#discussion_r705635141

## File path: nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
## @@ -297,7 +271,54 @@ private DataType getDataType(final int sqlType, final ResultSet rs, final int co
         }
     }
 
-    private static DataType getArrayBaseType(final Array array) throws SQLException {
+    private DataType getArrayDataType(ResultSet rs, int columnIndex, boolean useLogicalTypes) throws SQLException {
+        // The JDBC API does not allow us to know what the base type of an array is through the metadata.
+        // As a result, we have to obtain the actual Array for this record. Once we have this, we can determine
+        // the base type. However, if the base type is, itself, an array, we will simply return a base type of
+        // String because otherwise, we need the ResultSet for the array itself, and many JDBC Drivers do not
+        // support calling Array.getResultSet() and will throw an Exception if that is not supported.
+        if (rs.isAfterLast()) {

Review comment:
There are merge conflicts due to a try-catch added here and some other changes via [NIFI-8376](https://issues.apache.org/jira/browse/NIFI-8376)
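The code comment in `getArrayDataType` describes inferring an array's base type from the values themselves, because JDBC metadata does not expose it, and falling back to String for nested arrays. A minimal sketch of that idea, assuming the caller passes the result of `java.sql.Array#getArray()`; `ArrayBaseTypes` and the String default for empty or all-null arrays are assumptions, not NiFi's code.

```java
import java.sql.Array;

/** Hypothetical sketch: infer an array's element type from its values. */
final class ArrayBaseTypes {
    private ArrayBaseTypes() {
    }

    static Class<?> inferElementClass(final Object arrayValue) {
        if (arrayValue == null) {
            return String.class; // assumption: no value means no information, default to String
        }
        final Class<?> type = arrayValue.getClass();
        if (!type.isArray()) {
            throw new IllegalArgumentException("Expected an array but got " + type.getName());
        }
        if (type.getComponentType().isPrimitive()) {
            return type.getComponentType(); // e.g. int[] -> int
        }
        for (final Object element : (Object[]) arrayValue) {
            if (element == null) {
                continue; // nulls say nothing about the type; keep looking
            }
            if (element.getClass().isArray() || element instanceof Array) {
                return String.class; // nested array: fall back to String, as the code comment describes
            }
            return element.getClass();
        }
        return String.class; // empty or all-null array: default to String (assumption)
    }
}
```

A caller would pass `array.getArray()` for a non-null `java.sql.Array` read from the current row; mapping the resulting class to a NiFi `DataType` is left out because that mapping is specific to `ResultSetRecordSet`.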
[jira] [Assigned] (NIFI-9166) Refactor nifi-standard-services-bundle to use JUnit 5
[ https://issues.apache.org/jira/browse/NIFI-9166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mike Thomsen reassigned NIFI-9166:
--
Assignee: Mike Thomsen

> Refactor nifi-standard-services-bundle to use JUnit 5
> -
>
> Key: NIFI-9166
> URL: https://issues.apache.org/jira/browse/NIFI-9166
> Project: Apache NiFi
> Issue Type: Sub-task
> Reporter: Mike Thomsen
> Assignee: Mike Thomsen
> Priority: Minor
[GitHub] [nifi] MikeThomsen opened a new pull request #5377: NIFI-9165 Refactored nifi-standard-bundle to use JUnit 5.
MikeThomsen opened a new pull request #5377:
URL: https://github.com/apache/nifi/pull/5377

Thank you for submitting a contribution to Apache NiFi.

Please provide a short description of the PR here:

Description of PR

_Enables X functionality; fixes bug NIFI-._

In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [ ] Does your PR title start with **NIFI-** where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [ ] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
- [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] Have you verified that the full build is successful on JDK 8?
- [ ] Have you verified that the full build is successful on JDK 11?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
- [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
- [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
[GitHub] [nifi] tpalfy opened a new pull request #5376: NIFI-9203 Improve GrokReader to be able to handle complex grok expression properly.
tpalfy opened a new pull request #5376:
URL: https://github.com/apache/nifi/pull/5376

https://issues.apache.org/jira/browse/NIFI-9203

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [ ] Does your PR title start with **NIFI-** where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [ ] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
- [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] Have you verified that the full build is successful on JDK 8?
- [ ] Have you verified that the full build is successful on JDK 11?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
- [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
- [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
[jira] [Created] (NIFI-9208) Adding capability to collect necessary resources similar to NAR Provider function
Simon Bence created NIFI-9208:
-

Summary: Adding capability to collect necessary resources similar to NAR Provider function
Key: NIFI-9208
URL: https://issues.apache.org/jira/browse/NIFI-9208
Project: Apache NiFi
Issue Type: New Feature
Components: Core Framework
Reporter: Simon Bence
Assignee: Simon Bence

It would be very useful to have a capability for "resources" similar to the NAR autoloader, which can gather NARs from external sources. This could be applied to database drivers, Hadoop configuration files and so on.

Note: as the two features seem very similar, I plan to share as much functionality as is reasonable. Probably (and preferably) I can separate the "listing and fetching" logic from the NAR-specific parts and let the autoloader depend on the more generic solution.
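The proposal separates "listing and fetching" from the NAR-specific parts. A minimal sketch of that split, with hypothetical names (`ExternalResourceProvider`, `ResourceSynchronizer`) that are not NiFi APIs: the provider only knows how to list and fetch named resources, and the synchronizer fetches whatever is not already present locally.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical source-agnostic contract: list what exists externally, then fetch by name. */
interface ExternalResourceProvider {
    Collection<String> listResources() throws IOException;

    InputStream fetchResource(String name) throws IOException;
}

/** Generic "list and fetch" step that a NAR-specific loader could build on. */
final class ResourceSynchronizer {
    private ResourceSynchronizer() {
    }

    static Map<String, byte[]> fetchMissing(final ExternalResourceProvider provider, final Set<String> alreadyPresent)
            throws IOException {
        final Map<String, byte[]> fetched = new TreeMap<>();
        for (final String name : provider.listResources()) {
            if (alreadyPresent.contains(name)) {
                continue; // already available locally; nothing to fetch
            }
            try (InputStream in = provider.fetchResource(name)) {
                fetched.put(name, readAll(in));
            }
        }
        return fetched;
    }

    private static byte[] readAll(final InputStream in) throws IOException {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        final byte[] chunk = new byte[8192];
        int n;
        while ((n = in.read(chunk)) != -1) {
            out.write(chunk, 0, n);
        }
        return out.toByteArray();
    }
}
```

Keeping the provider free of NAR semantics is what would let database drivers or Hadoop configuration files reuse it; NAR unpacking and class loading would stay in the autoloader.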
[jira] [Created] (NIFI-9207) Add max read size property to Distributed Cache Servers
Paul Grey created NIFI-9207:
---

Summary: Add max read size property to Distributed Cache Servers
Key: NIFI-9207
URL: https://issues.apache.org/jira/browse/NIFI-9207
Project: Apache NiFi
Issue Type: Improvement
Reporter: Paul Grey
Assignee: Paul Grey

The NiFi services DistributedMapCacheServer and DistributedSetCacheServer include a socket-based server that allows service clients to collaboratively manage a cache of data values. IPC is performed via a defined set of APIs, which are implemented as a series of raw socket reads and writes. When processing incoming socket data, the services should validate and limit its size to ensure that the server functions properly.
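One way to apply such a limit is to check a declared length before allocating anything for it. The sketch below assumes a framing of a 4-byte big-endian length followed by the payload; the issue does not specify the wire format, and `BoundedValueReader` and its `maxReadSize` parameter are hypothetical stand-ins for the proposed property.

```java
import java.io.DataInputStream;
import java.io.IOException;

/**
 * Hypothetical helper: reads one length-prefixed value and rejects declared lengths
 * outside [0, maxReadSize] before allocating any buffer.
 */
final class BoundedValueReader {
    private BoundedValueReader() {
    }

    static byte[] readValue(final DataInputStream in, final int maxReadSize) throws IOException {
        // Assumed framing: 4-byte big-endian length, then that many payload bytes.
        final int length = in.readInt();
        if (length < 0 || length > maxReadSize) {
            // Fail fast: a forged length must not trigger a huge allocation or a long blocking read.
            throw new IOException("Declared length " + length + " is outside the allowed range 0.." + maxReadSize);
        }
        final byte[] value = new byte[length];
        in.readFully(value); // throws EOFException if the peer sends fewer bytes than declared
        return value;
    }
}
```

Rejecting the length up front, rather than reading until a limit is hit, keeps the check cheap and avoids holding a partially filled oversized buffer.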
[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #1120: MINIFICPP-1374 Implement security protocol support for ConsumeKafka
lordgamez commented on a change in pull request #1120:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1120#discussion_r705419173

## File path: docker/test/integration/features/kafka.feature
## @@ -44,12 +44,9 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
 | PublishKafka | Delivery Guarantee | 1 |
 | PublishKafka | Request Timeout | 10 sec |
 | PublishKafka | Message Timeout | 12 sec |
- | PublishKafka | Security CA | /tmp/resources/certs/ca-cert |

Review comment:
Makes sense, I added that version as well in 0cf6d007765d021d20573b0be61c6b793b60bf80
[GitHub] [nifi] exceptionfactory commented on pull request #5318: NIFI-9064:Support Oracle timestamp when `Use Avro Logical Types` is t…
exceptionfactory commented on pull request #5318:
URL: https://github.com/apache/nifi/pull/5318#issuecomment-916114656

> Hi exceptionfactory, I tried to add a unit test. But as you can see, rs.getObject() and rs.getTimestamp() are standard methods in
> java.sql.ResultSet. I can't figure out how to do this unit test. Do you have any good suggestions?

The `ResultSetRecordSetTest` class has an existing `testCreateRecord()` method that uses a mocked `ResultSet`. You could either modify that method, or create a new test method that uses a sample `RecordSchema` with a timestamp column. It might be clearer to create a new test method since the existing method is already large.
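Because `ResultSet` is an interface, a test does not need a database to control what `rs.getObject()` and `rs.getTimestamp()` return. The existing test reportedly uses a mocked `ResultSet`; if a mocking library is not wanted for a new test, a JDK dynamic proxy can stub just the accessors the code under test calls. `StubResultSets` is a hypothetical helper, not part of NiFi, and only index-based `getObject(int)` and `getTimestamp(int)` are stubbed.

```java
import java.lang.reflect.Proxy;
import java.sql.ResultSet;
import java.util.Map;

/** Hypothetical test helper: a ResultSet stub backed by a column-index-to-value map. */
final class StubResultSets {
    private StubResultSets() {
    }

    static ResultSet withColumns(final Map<Integer, ?> valuesByIndex) {
        return (ResultSet) Proxy.newProxyInstance(
                StubResultSets.class.getClassLoader(),
                new Class<?>[] {ResultSet.class},
                (proxy, method, args) -> {
                    final String name = method.getName();
                    if (method.getDeclaringClass() == Object.class) {
                        // Keep equals/hashCode/toString usable, e.g. in assertion messages.
                        switch (name) {
                            case "equals":
                                return proxy == args[0];
                            case "hashCode":
                                return System.identityHashCode(proxy);
                            default:
                                return "StubResultSet" + valuesByIndex;
                        }
                    }
                    final boolean byIndex = args != null && args.length == 1 && args[0] instanceof Integer;
                    if (byIndex && (name.equals("getObject") || name.equals("getTimestamp"))) {
                        return valuesByIndex.get(args[0]);
                    }
                    // Anything else fails loudly, so the test shows which calls still need stubbing.
                    throw new UnsupportedOperationException("Not stubbed: " + method);
                });
    }
}
```

A test could put a `java.sql.Timestamp` in column 1, pass the stub to the code under test, and assert on the resulting record value; unstubbed calls surface as `UnsupportedOperationException` rather than silent nulls.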
[GitHub] [nifi] KuKuDeCheng commented on pull request #5318: NIFI-9064:Support Oracle timestamp when `Use Avro Logical Types` is t…
KuKuDeCheng commented on pull request #5318:
URL: https://github.com/apache/nifi/pull/5318#issuecomment-916087773

Hi exceptionfactory, I tried to add a unit test. But as you can see, rs.getObject() and rs.getTimestamp() are standard methods in java.sql.ResultSet. I can't figure out how to do this unit test. Do you have any good suggestions?
[GitHub] [nifi] exceptionfactory commented on pull request #5318: NIFI-9064:Support Oracle timestamp when `Use Avro Logical Types` is t…
exceptionfactory commented on pull request #5318:
URL: https://github.com/apache/nifi/pull/5318#issuecomment-916066468

Thanks for the update @KuKuDeCheng, the changes in `ResultSetRecordSet` seem like a much cleaner solution. Can you add a unit test for the change?
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1120: MINIFICPP-1374 Implement security protocol support for ConsumeKafka
adamdebreceni commented on a change in pull request #1120:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1120#discussion_r705286235

## File path: docker/test/integration/features/kafka.feature
## @@ -44,12 +44,9 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
 | PublishKafka | Delivery Guarantee | 1 |
 | PublishKafka | Request Timeout | 10 sec |
 | PublishKafka | Message Timeout | 12 sec |
- | PublishKafka | Security CA | /tmp/resources/certs/ca-cert |

Review comment:
although we deprecated these properties in favor of the context service, shouldn't we keep testing the feature until we remove them?
[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor
fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705279874

## File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
## @@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr logger, CallbackType callback)
+  : buffer_(buffer_size),
+    logger_(std::move(logger)),
+    callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr& input, const std::shared_ptr& output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  input_size_ = input->size();
+
+  if (int64_t status = readLine(*input); status <= 0) {
+    return status;
+  }
+
+  bool is_first_line = true;
+  do {
+    if (int64_t status = readLine(*input); status < 0) {
+      return status;
+    }
+    std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+    output->write(reinterpret_cast(output_line.data()), output_line.size());
+    is_first_line = false;
+  } while (!isLastLine());
+
+  return gsl::narrow(total_bytes_read_);
+}
+
+int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) {
+  std::string input_line;
+  const size_t current_read = readLine(stream, input_line);
+  if (io::isError(current_read)) {
+    return -1;
+  }
+  if (current_read == 0) {
+    current_line_ = next_line_;
+    next_line_ = std::nullopt;
+    return 0;
+  }
+  current_line_ = next_line_;
+  next_line_ = input_line;
+  total_bytes_read_ += current_read;
+  return gsl::narrow(current_read);
+}
+
+size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) {
+  if (current_pos_ == end_pos_) {
+    const auto current_bytes_read = fillBuffer(stream);
+    if (io::isError(current_bytes_read) || current_bytes_read == 0) {
+      return current_bytes_read;
+    }
+  }
+
+  int64_t pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {
+    ++pos;
+  }
+  if (pos < end_pos_ && buffer_[pos] == '\n') {
+    ++pos;
+    line = std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos};
+    current_pos_ = pos;
+    return line.size();
+  }
+
+  std::string start_of_line{buffer_.begin() + current_pos_, buffer_.begin() + end_pos_};
+
+  const auto current_bytes_read = fillBuffer(stream);
+  if (io::isError(current_bytes_read)) {
+    return io::STREAM_ERROR;
+  } else if (current_bytes_read == 0) {
+    line = start_of_line;
+    return line.size();
+  }
+
+  pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {

Review comment:
the bug in (3) is fixed by https://github.com/apache/nifi-minifi-cpp/pull/1170/commits/4df70bdec67d7f91182e10b01e6df646acf5e85a
[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor
fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705200372

## File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
## @@ -0,0 +1,129 @@
...
+  std::string start_of_line{buffer_.begin() + current_pos_, buffer_.begin() + end_pos_};
+
+  const auto current_bytes_read = fillBuffer(stream);
+  if (io::isError(current_bytes_read)) {
+    return io::STREAM_ERROR;
+  } else if (current_bytes_read == 0) {
+    line = start_of_line;
+    return line.size();
+  }
+
+  pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {

Review comment:
(3) is handled correctly (the `\n`-less last line is treated as a line, is processed and output without a `\n`), **except** in the edge case where the last line doesn't end in a `\n` **and** it straddles a buffer boundary, which causes an error (flow file routed to failure). Thanks for pointing this out, I'll fix it.
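The failing case described here is a last line with no trailing `\n` that straddles a buffer refill: the C++ code searches the refilled buffer for `\n`, finds none, and falls through to the "line longer than the max buffer size" error. The Java analogue below is a sketch, not the MiNiFi C++ fix in the linked commit; `BufferedLineSplitter` is hypothetical. It avoids the case by carrying the unfinished tail across refills and emitting it at end of stream.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical Java analogue: splits a stream into lines using a fixed-size read buffer. */
final class BufferedLineSplitter {
    private BufferedLineSplitter() {
    }

    static List<String> splitLines(final InputStream in, final int bufferSize) throws IOException {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        final List<String> lines = new ArrayList<>();
        final byte[] buffer = new byte[bufferSize];
        // Bytes of the current line seen so far; survives across buffer refills.
        final ByteArrayOutputStream pending = new ByteArrayOutputStream();
        int read;
        while ((read = in.read(buffer)) != -1) {
            int start = 0;
            for (int i = 0; i < read; i++) {
                if (buffer[i] == '\n') {
                    pending.write(buffer, start, i + 1 - start); // keep the '\n' with its line
                    lines.add(new String(pending.toByteArray(), StandardCharsets.UTF_8));
                    pending.reset();
                    start = i + 1;
                }
            }
            // Carry the unfinished tail into the next refill instead of treating it as an error.
            pending.write(buffer, start, read - start);
        }
        if (pending.size() > 0) {
            // End of stream: a last line without '\n' is still a line, even if it straddled a refill.
            lines.add(new String(pending.toByteArray(), StandardCharsets.UTF_8));
        }
        return lines;
    }
}
```

With a 4-byte buffer, `"ab\ncdef"` yields `"ab\n"` and `"cdef"`, where `"cdef"` spans two reads and has no newline. This sketch lets a line grow without bound; the C++ callback instead caps a line at the buffer size and logs an error, and a production version would likely keep some such cap.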
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor
adamdebreceni commented on a change in pull request #1170: URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705173760 ## File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp ## @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "utils/LineByLineInputOutputStreamCallback.h" + +#include "utils/gsl.h" + +namespace org::apache::nifi::minifi::utils { + +LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr logger, CallbackType callback) + : buffer_(buffer_size), +logger_(std::move(logger)), +callback_(std::move(callback)) { +} + +int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr& input, const std::shared_ptr& output) { + gsl_Expects(input); + gsl_Expects(output); + + input_size_ = input->size(); + + if (int64_t status = readLine(*input); status <= 0) { +return status; + } + + bool is_first_line = true; + do { +if (int64_t status = readLine(*input); status < 0) { + return status; +} +std::string output_line = callback_(*current_line_, is_first_line, isLastLine()); +output->write(reinterpret_cast(output_line.data()), output_line.size()); +is_first_line = false; + } while (!isLastLine()); + + return gsl::narrow(total_bytes_read_); +} + +int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) { + std::string input_line; + const size_t current_read = readLine(stream, input_line); + if (io::isError(current_read)) { +return -1; + } + if (current_read == 0) { +current_line_ = next_line_; +next_line_ = std::nullopt; +return 0; + } + current_line_ = next_line_; + next_line_ = input_line; + total_bytes_read_ += current_read; + return gsl::narrow(current_read); +} + +size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) { + if (current_pos_ == end_pos_) { +const auto current_bytes_read = fillBuffer(stream); +if (io::isError(current_bytes_read) || current_bytes_read == 0) { + return current_bytes_read; +} + } + + int64_t pos = current_pos_; + while (pos < end_pos_ && buffer_[pos] != '\n') { +++pos; + } + if (pos < end_pos_ && buffer_[pos] == '\n') { +++pos; +line = std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos}; 
+current_pos_ = pos; +return line.size(); + } + + std::string start_of_line{buffer_.begin() + current_pos_, buffer_.begin() + end_pos_}; + + const auto current_bytes_read = fillBuffer(stream); + if (io::isError(current_bytes_read)) { +return io::STREAM_ERROR; + } else if (current_bytes_read == 0) { +line = start_of_line; +return line.size(); + } + + pos = current_pos_; + while (pos < end_pos_ && buffer_[pos] != '\n') { +++pos; + } + if (pos < end_pos_ && buffer_[pos] == '\n') { +++pos; +line = start_of_line + std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos}; +current_pos_ = pos; +return line.size(); + } + + logger_->log_error("Found a line longer than the max buffer size (%zu bytes)", buffer_.size()); Review comment: I guess I was just making an observation aloud :)
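The quoted `process()` keeps one line of lookahead (`current_line_` and `next_line_`). This lets the callback be told whether the line it receives is the last one. A minimal sketch of that pattern over an in-memory list of lines follows. `LineCallback` and `processWithLookahead` are hypothetical names, and stream reading is replaced by a simple index.

```cpp
#include <cstddef>
#include <functional>
#include <optional>
#include <string>
#include <vector>

// Hypothetical callback type mirroring the (line, is_first_line, is_last_line)
// arguments passed to callback_ in the quoted process().
using LineCallback = std::function<std::string(const std::string&, bool, bool)>;

// Applies `callback` to every line. One line of lookahead is kept, so the
// callback learns whether a line is the last one before it is processed.
std::string processWithLookahead(const std::vector<std::string>& lines,
                                 const LineCallback& callback) {
  std::size_t next_index = 0;
  // Stand-in for reading from a stream: next line, or std::nullopt at the end.
  auto read_line = [&]() -> std::optional<std::string> {
    if (next_index < lines.size()) return lines[next_index++];
    return std::nullopt;
  };

  std::string output;
  std::optional<std::string> next_line = read_line();
  if (!next_line) return output;  // empty input: the callback is never called

  bool is_first_line = true;
  do {
    const std::string current_line = *next_line;
    next_line = read_line();  // look ahead one line
    const bool is_last_line = !next_line.has_value();
    output += callback(current_line, is_first_line, is_last_line);
    is_first_line = false;
  } while (next_line);
  return output;
}
```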
[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor
fgerlits commented on a change in pull request #1170: URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705172435 ## File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp ## @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "utils/LineByLineInputOutputStreamCallback.h" + +#include "utils/gsl.h" + +namespace org::apache::nifi::minifi::utils { + +LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr logger, CallbackType callback) + : buffer_(buffer_size), +logger_(std::move(logger)), +callback_(std::move(callback)) { +} + +int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr& input, const std::shared_ptr& output) { + gsl_Expects(input); + gsl_Expects(output); + + input_size_ = input->size(); + + if (int64_t status = readLine(*input); status <= 0) { +return status; + } + + bool is_first_line = true; + do { +if (int64_t status = readLine(*input); status < 0) { + return status; +} +std::string output_line = callback_(*current_line_, is_first_line, isLastLine()); +output->write(reinterpret_cast(output_line.data()), output_line.size()); +is_first_line = false; + } while (!isLastLine()); + + return gsl::narrow(total_bytes_read_); +} + +int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) { + std::string input_line; + const size_t current_read = readLine(stream, input_line); + if (io::isError(current_read)) { +return -1; + } + if (current_read == 0) { +current_line_ = next_line_; +next_line_ = std::nullopt; +return 0; + } + current_line_ = next_line_; + next_line_ = input_line; + total_bytes_read_ += current_read; + return gsl::narrow(current_read); +} + +size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) { + if (current_pos_ == end_pos_) { +const auto current_bytes_read = fillBuffer(stream); +if (io::isError(current_bytes_read) || current_bytes_read == 0) { + return current_bytes_read; +} + } + + int64_t pos = current_pos_; + while (pos < end_pos_ && buffer_[pos] != '\n') { +++pos; + } + if (pos < end_pos_ && buffer_[pos] == '\n') { +++pos; +line = std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos}; 
+current_pos_ = pos; +return line.size(); + } + + std::string start_of_line{buffer_.begin() + current_pos_, buffer_.begin() + end_pos_}; + + const auto current_bytes_read = fillBuffer(stream); + if (io::isError(current_bytes_read)) { +return io::STREAM_ERROR; + } else if (current_bytes_read == 0) { +line = start_of_line; +return line.size(); + } + + pos = current_pos_; + while (pos < end_pos_ && buffer_[pos] != '\n') { Review comment: Just speculating, but NiFi may have done that to handle the bare `\r` line endings of classic Mac OS. Since those line endings are practically no longer in use, I think using `\n`, which covers Windows, Linux and Mac OS X and later, is fine.
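To illustrate the point about line endings, here is a small sketch of splitting on `\n` alone, keeping each terminator as the quoted `readLine` does. `splitOnLineFeed` is a hypothetical helper. Windows `\r\n` lines are still found, with the `\r` left in the line content. Text that uses only bare `\r` terminators comes out as a single line.

```cpp
#include <string>
#include <vector>

// Splits text into lines on '\n' only, keeping each terminator.
// A final line without '\n' is kept as well.
std::vector<std::string> splitOnLineFeed(const std::string& text) {
  std::vector<std::string> lines;
  std::string::size_type start = 0;
  while (start < text.size()) {
    const auto newline = text.find('\n', start);
    if (newline == std::string::npos) {
      lines.push_back(text.substr(start));  // unterminated last line
      break;
    }
    lines.push_back(text.substr(start, newline - start + 1));  // keep the '\n'
    start = newline + 1;
  }
  return lines;
}
```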
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor
adamdebreceni commented on a change in pull request #1170: URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705169032 ## File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp ## @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "utils/LineByLineInputOutputStreamCallback.h" + +#include "utils/gsl.h" + +namespace org::apache::nifi::minifi::utils { + +LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr logger, CallbackType callback) + : buffer_(buffer_size), +logger_(std::move(logger)), +callback_(std::move(callback)) { +} + +int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr& input, const std::shared_ptr& output) { + gsl_Expects(input); + gsl_Expects(output); + + input_size_ = input->size(); + + if (int64_t status = readLine(*input); status <= 0) { +return status; + } + + bool is_first_line = true; + do { +if (int64_t status = readLine(*input); status < 0) { + return status; +} +std::string output_line = callback_(*current_line_, is_first_line, isLastLine()); +output->write(reinterpret_cast(output_line.data()), output_line.size()); +is_first_line = false; + } while (!isLastLine()); + + return gsl::narrow(total_bytes_read_); +} + +int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) { + std::string input_line; + const size_t current_read = readLine(stream, input_line); + if (io::isError(current_read)) { +return -1; + } + if (current_read == 0) { +current_line_ = next_line_; +next_line_ = std::nullopt; +return 0; + } + current_line_ = next_line_; + next_line_ = input_line; + total_bytes_read_ += current_read; + return gsl::narrow(current_read); +} + +size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) { + if (current_pos_ == end_pos_) { +const auto current_bytes_read = fillBuffer(stream); +if (io::isError(current_bytes_read) || current_bytes_read == 0) { + return current_bytes_read; +} + } + + int64_t pos = current_pos_; + while (pos < end_pos_ && buffer_[pos] != '\n') { +++pos; + } + if (pos < end_pos_ && buffer_[pos] == '\n') { +++pos; +line = std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos}; 
+current_pos_ = pos; +return line.size(); + } + + std::string start_of_line{buffer_.begin() + current_pos_, buffer_.begin() + end_pos_}; + + const auto current_bytes_read = fillBuffer(stream); + if (io::isError(current_bytes_read)) { +return io::STREAM_ERROR; + } else if (current_bytes_read == 0) { +line = start_of_line; +return line.size(); + } + + pos = current_pos_; + while (pos < end_pos_ && buffer_[pos] != '\n') { Review comment: Unfortunately, it seems NiFi has a different concept of a "line": it uses the `LineDemarcator`, which
1. considers every `\n` an end-of-line,
2. considers every `\r` followed by a non-`\n` character an end-of-line,
3. considers a non-empty (!) tail after the last end-of-line a line.

(It's quite confusing.) This was implemented in RouteText to be in line with NiFi, but of course we could decide to change them to use a different segmentation logic.
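The three `LineDemarcator` rules listed in the comment above can be expressed as a short segmentation function. This is a sketch of the rules as described there, not NiFi's actual `LineDemarcator` code. `splitLikeLineDemarcator` is a hypothetical name. Treating a `\r` at the very end of the input as an end-of-line is an assumption. It does not change the result, because rule 3 would emit that tail anyway.

```cpp
#include <string>
#include <vector>

// Segments text following the three rules described in the review comment:
// '\n' ends a line, '\r' not followed by '\n' ends a line, and a non-empty
// tail after the last end-of-line is also a line. Terminators are kept.
std::vector<std::string> splitLikeLineDemarcator(const std::string& text) {
  std::vector<std::string> lines;
  std::string current;
  for (std::string::size_type i = 0; i < text.size(); ++i) {
    const char c = text[i];
    current += c;
    const bool is_lf = (c == '\n');  // rule 1 (also closes "\r\n")
    // Rule 2; a '\r' at the very end of the input is treated as a bare '\r'.
    const bool is_bare_cr =
        (c == '\r') && (i + 1 == text.size() || text[i + 1] != '\n');
    if (is_lf || is_bare_cr) {
      lines.push_back(current);
      current.clear();
    }
  }
  if (!current.empty()) lines.push_back(current);  // rule 3: non-empty tail
  return lines;
}
```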
[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor
fgerlits commented on a change in pull request #1170: URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705166242 ## File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp ## @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "utils/LineByLineInputOutputStreamCallback.h" + +#include "utils/gsl.h" + +namespace org::apache::nifi::minifi::utils { + +LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr logger, CallbackType callback) + : buffer_(buffer_size), +logger_(std::move(logger)), +callback_(std::move(callback)) { +} + +int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr& input, const std::shared_ptr& output) { + gsl_Expects(input); + gsl_Expects(output); + + input_size_ = input->size(); + + if (int64_t status = readLine(*input); status <= 0) { +return status; + } + + bool is_first_line = true; + do { +if (int64_t status = readLine(*input); status < 0) { + return status; +} +std::string output_line = callback_(*current_line_, is_first_line, isLastLine()); +output->write(reinterpret_cast(output_line.data()), output_line.size()); +is_first_line = false; + } while (!isLastLine()); + + return gsl::narrow(total_bytes_read_); +} + +int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) { + std::string input_line; + const size_t current_read = readLine(stream, input_line); + if (io::isError(current_read)) { +return -1; + } + if (current_read == 0) { +current_line_ = next_line_; +next_line_ = std::nullopt; +return 0; + } + current_line_ = next_line_; + next_line_ = input_line; + total_bytes_read_ += current_read; + return gsl::narrow(current_read); +} + +size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) { + if (current_pos_ == end_pos_) { +const auto current_bytes_read = fillBuffer(stream); +if (io::isError(current_bytes_read) || current_bytes_read == 0) { + return current_bytes_read; +} + } + + int64_t pos = current_pos_; + while (pos < end_pos_ && buffer_[pos] != '\n') { +++pos; + } + if (pos < end_pos_ && buffer_[pos] == '\n') { +++pos; +line = std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos}; 
+current_pos_ = pos; +return line.size(); + } + + std::string start_of_line{buffer_.begin() + current_pos_, buffer_.begin() + end_pos_}; + + const auto current_bytes_read = fillBuffer(stream); + if (io::isError(current_bytes_read)) { +return io::STREAM_ERROR; + } else if (current_bytes_read == 0) { +line = start_of_line; +return line.size(); + } + + pos = current_pos_; + while (pos < end_pos_ && buffer_[pos] != '\n') { +++pos; + } + if (pos < end_pos_ && buffer_[pos] == '\n') { +++pos; +line = start_of_line + std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos}; +current_pos_ = pos; +return line.size(); + } + + logger_->log_error("Found a line longer than the max buffer size (%zu bytes)", buffer_.size()); Review comment: That's correct: lines up to twice the buffer size may be accepted, but only lines no longer than the buffer size are guaranteed to be accepted. Do you think that is a problem, or should we make the error message more verbose?
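The "up to twice the buffer size" behaviour can be modelled with a little arithmetic. Assume each fill reads a full buffer, except at the end of the stream. The input is then consumed in consecutive blocks of `buffer_size` bytes. The quoted `readLine` accepts a line only if its `\n` falls in the same block as its first byte or in the next one. `acceptedByTwoFillReader` is a hypothetical helper for this model, not part of the PR, and it ignores the unterminated-last-line case.

```cpp
#include <cassert>
#include <cstddef>

// `start` is the line's byte offset in the stream; `length` includes its '\n'.
// Returns true if the '\n' lies in the same buffer fill as the first byte of
// the line, or in the fill immediately after it.
bool acceptedByTwoFillReader(std::size_t start, std::size_t length, std::size_t buffer_size) {
  assert(length > 0 && buffer_size > 0);
  const std::size_t first_fill = start / buffer_size;
  const std::size_t last_fill = (start + length - 1) / buffer_size;
  return last_fill - first_fill <= 1;
}
```

With a buffer of 4 bytes, an 8-byte line starting at offset 0 is accepted. A 6-byte line starting at offset 3 is rejected. Any line of at most 4 bytes is accepted regardless of where it starts.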
[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor
fgerlits commented on a change in pull request #1170: URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705163626 ## File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp ## @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "utils/LineByLineInputOutputStreamCallback.h" + +#include "utils/gsl.h" + +namespace org::apache::nifi::minifi::utils { + +LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr logger, CallbackType callback) + : buffer_(buffer_size), +logger_(std::move(logger)), +callback_(std::move(callback)) { +} + +int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr& input, const std::shared_ptr& output) { + gsl_Expects(input); + gsl_Expects(output); + + input_size_ = input->size(); + + if (int64_t status = readLine(*input); status <= 0) { +return status; + } + + bool is_first_line = true; + do { +if (int64_t status = readLine(*input); status < 0) { + return status; +} +std::string output_line = callback_(*current_line_, is_first_line, isLastLine()); +output->write(reinterpret_cast(output_line.data()), output_line.size()); +is_first_line = false; + } while (!isLastLine()); + + return gsl::narrow(total_bytes_read_); +} + +int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) { + std::string input_line; + const size_t current_read = readLine(stream, input_line); + if (io::isError(current_read)) { +return -1; + } + if (current_read == 0) { +current_line_ = next_line_; +next_line_ = std::nullopt; +return 0; + } + current_line_ = next_line_; + next_line_ = input_line; + total_bytes_read_ += current_read; + return gsl::narrow(current_read); +} + +size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) { + if (current_pos_ == end_pos_) { +const auto current_bytes_read = fillBuffer(stream); Review comment: thanks, I'll take a look
[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor
fgerlits commented on a change in pull request #1170: URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705163375 ## File path: extensions/standard-processors/processors/ReplaceText.h ## @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once + +#include +#include +#include +#include + +#include "core/Annotation.h" +#include "core/Processor.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/ProcessSessionFactory.h" +#include "core/Resource.h" +#include "core/logging/Logger.h" +#include "utils/Enum.h" + +namespace org::apache::nifi::minifi::processors { + +SMART_ENUM(EvaluationModeType, + (LINE_BY_LINE, "Line-by-Line"), + (ENTIRE_TEXT, "Entire text") +) + +SMART_ENUM(LineByLineEvaluationModeType, + (ALL, "All"), + (FIRST_LINE, "First-Line"), + (LAST_LINE, "Last-Line"), + (EXCEPT_FIRST_LINE, "Except-First-Line"), + (EXCEPT_LAST_LINE, "Except-Last-Line") +) + +SMART_ENUM(ReplacementStrategyType, + (PREPEND, "Prepend"), + (APPEND, "Append"), + (REGEX_REPLACE, "Regex Replace"), + (LITERAL_REPLACE, "Literal Replace"), + (ALWAYS_REPLACE, "Always Replace"), + (SUBSTITUTE_VARIABLES, "Substitute Variables") +) + +class ReplaceText : public core::Processor { + public: + static const core::Property EvaluationMode; + static const core::Property LineByLineEvaluationMode; + static const core::Property ReplacementStrategy; + static const core::Property MaximumBufferSize; + static const core::Property SearchValue; + static const core::Property ReplacementValue; + + static const core::Relationship Success; + static const core::Relationship Failure; + + explicit ReplaceText(const std::string& name, const utils::Identifier& uuid = {}); + core::annotation::Input getInputRequirement() const override; + void initialize() override; + void onSchedule(const std::shared_ptr& context, const std::shared_ptr&) override; + void onTrigger(const std::shared_ptr& context, const std::shared_ptr& session) override; + + private: + friend struct ReplaceTextTestAccessor; + + void readSearchValueProperty(const std::shared_ptr& context, const std::shared_ptr& flow_file); + void readReplacementValueProperty(const std::shared_ptr& context, const std::shared_ptr& flow_file); + + void 
replaceTextInEntireFile(const std::shared_ptr& flow_file, const std::shared_ptr& session) const; + void replaceTextLineByLine(const std::shared_ptr& flow_file, const std::shared_ptr& session) const; + + std::string applyReplacements(const std::string& input, const std::shared_ptr& flow_file) const; + std::string applyRegexReplace(const std::string& input) const; + std::string applyLiteralReplace(const std::string& input) const; + std::string applySubstituteVariables(const std::string& input, const std::shared_ptr& flow_file) const; + std::string getAttributeValue(const std::shared_ptr& flow_file, const std::smatch& match) const; + + std::shared_ptr logger_; + std::string search_value_; + std::regex search_regex_; + std::string replacement_value_; + uint64_t maximum_buffer_size_ = 0; + EvaluationModeType evaluation_mode_ = EvaluationModeType::LINE_BY_LINE; + LineByLineEvaluationModeType line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::ALL; + ReplacementStrategyType replacement_strategy_ = ReplacementStrategyType::REGEX_REPLACE; +}; + +REGISTER_RESOURCE(ReplaceText, "Updates the content of a FlowFile by replacing parts of it using various replacement strategies."); Review comment: OK, I'll move it after #1138 is merged.
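The header above declares a `LineByLineEvaluationMode` property. Its values (All, First-Line, Last-Line, Except-First-Line, Except-Last-Line) pair naturally with the `is_first_line` and `is_last_line` flags that `LineByLineInputOutputStreamCallback` passes to its callback. The header does not show how ReplaceText combines them, so the following is a hypothetical sketch. The plain enum and `shouldEvaluateLine` are illustrative names, not the PR's API.

```cpp
// Hypothetical mirror of the LineByLineEvaluationModeType values declared in
// ReplaceText.h (the real SMART_ENUM type is not reproduced here).
enum class LineByLineEvaluationMode { All, FirstLine, LastLine, ExceptFirstLine, ExceptLastLine };

// Decides whether a line should be transformed, given the is_first_line and
// is_last_line flags that the line-by-line callback receives.
bool shouldEvaluateLine(LineByLineEvaluationMode mode, bool is_first_line, bool is_last_line) {
  switch (mode) {
    case LineByLineEvaluationMode::All: return true;
    case LineByLineEvaluationMode::FirstLine: return is_first_line;
    case LineByLineEvaluationMode::LastLine: return is_last_line;
    case LineByLineEvaluationMode::ExceptFirstLine: return !is_first_line;
    case LineByLineEvaluationMode::ExceptLastLine: return !is_last_line;
  }
  return false;  // unreachable for valid enum values
}
```

Note that a single-line input is both the first and the last line. Under Except-First-Line or Except-Last-Line, that line is therefore never evaluated.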
[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor
fgerlits commented on a change in pull request #1170: URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705162395 ## File path: docker/test/integration/MiNiFi_integration_test_driver.py ## @@ -104,7 +104,7 @@ def generate_input_port_for_remote_process_group(remote_process_group, name): return input_port_node def add_test_data(self, path, test_data, file_name=str(uuid.uuid4())): -self.docker_directory_bindings.put_file_to_docker_path(self.test_id, path, file_name, test_data.encode('utf-8')) +self.docker_directory_bindings.put_file_to_docker_path(self.test_id, path, file_name, test_data.replace('\\n', '\n').encode('utf-8')) Review comment: OK, I'll revert these bits if your PR is merged first.
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor
adamdebreceni commented on a change in pull request #1170: URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705159596 ## File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp ## @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "utils/LineByLineInputOutputStreamCallback.h" + +#include "utils/gsl.h" + +namespace org::apache::nifi::minifi::utils { + +LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr logger, CallbackType callback) + : buffer_(buffer_size), +logger_(std::move(logger)), +callback_(std::move(callback)) { +} + +int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr& input, const std::shared_ptr& output) { + gsl_Expects(input); + gsl_Expects(output); + + input_size_ = input->size(); + + if (int64_t status = readLine(*input); status <= 0) { +return status; + } + + bool is_first_line = true; + do { +if (int64_t status = readLine(*input); status < 0) { + return status; +} +std::string output_line = callback_(*current_line_, is_first_line, isLastLine()); +output->write(reinterpret_cast(output_line.data()), output_line.size()); +is_first_line = false; + } while (!isLastLine()); + + return gsl::narrow(total_bytes_read_); +} + +int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) { + std::string input_line; + const size_t current_read = readLine(stream, input_line); + if (io::isError(current_read)) { +return -1; + } + if (current_read == 0) { +current_line_ = next_line_; +next_line_ = std::nullopt; +return 0; + } + current_line_ = next_line_; + next_line_ = input_line; + total_bytes_read_ += current_read; + return gsl::narrow(current_read); +} + +size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) { + if (current_pos_ == end_pos_) { +const auto current_bytes_read = fillBuffer(stream); +if (io::isError(current_bytes_read) || current_bytes_read == 0) { + return current_bytes_read; +} + } + + int64_t pos = current_pos_; + while (pos < end_pos_ && buffer_[pos] != '\n') { +++pos; + } + if (pos < end_pos_ && buffer_[pos] == '\n') { +++pos; +line = std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos}; 
+current_pos_ = pos; +return line.size(); + } + + std::string start_of_line{buffer_.begin() + current_pos_, buffer_.begin() + end_pos_}; + + const auto current_bytes_read = fillBuffer(stream); + if (io::isError(current_bytes_read)) { +return io::STREAM_ERROR; + } else if (current_bytes_read == 0) { +line = start_of_line; +return line.size(); + } + + pos = current_pos_; + while (pos < end_pos_ && buffer_[pos] != '\n') { +++pos; + } + if (pos < end_pos_ && buffer_[pos] == '\n') { +++pos; +line = start_of_line + std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos}; +current_pos_ = pos; +return line.size(); + } + + logger_->log_error("Found a line longer than the max buffer size (%zu bytes)", buffer_.size()); Review comment: it seems we might process lines longer than `buffer size`, as we might append the end of the previous buffer fill
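One way to make the limit explicit is to count the bytes carried over from earlier buffer fills. A line is then rejected as soon as its total length exceeds the limit, however many fills it spans. This is a minimal sketch under two assumptions: the input is a `std::istream`, and the limit is a separate `max_line_length` parameter. `BoundedLineReader` is hypothetical and is not the PR's `LineByLineInputOutputStreamCallback`:

```cpp
#include <cstddef>
#include <istream>
#include <optional>
#include <sstream>  // std::istringstream, used in the usage example
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical reader: reads from `stream` in fills of `buffer_size` bytes and returns one
// line at a time (including its '\n'). The length limit is checked against the whole line,
// including the part carried over from previous buffer fills.
class BoundedLineReader {
 public:
  BoundedLineReader(std::istream& stream, std::size_t buffer_size, std::size_t max_line_length)
      : stream_(stream), buffer_(buffer_size), max_line_length_(max_line_length) {}

  // Returns std::nullopt at end of input; throws std::length_error for an overlong line.
  std::optional<std::string> readLine() {
    std::string line;
    while (true) {
      if (pos_ == end_ && !fillBuffer()) {
        // End of input: return a final line without '\n', if there is one.
        if (line.empty()) return std::nullopt;
        return line;
      }
      while (pos_ < end_) {
        const char c = buffer_[pos_++];
        line += c;
        if (line.size() > max_line_length_) {
          throw std::length_error("line exceeds the maximum line length");
        }
        if (c == '\n') return line;
      }
    }
  }

 private:
  // Refills the buffer; returns false when no more bytes are available.
  bool fillBuffer() {
    stream_.read(buffer_.data(), static_cast<std::streamsize>(buffer_.size()));
    end_ = static_cast<std::size_t>(stream_.gcount());
    pos_ = 0;
    return end_ > 0;
  }

  std::istream& stream_;
  std::vector<char> buffer_;
  std::size_t max_line_length_;
  std::size_t pos_ = 0;
  std::size_t end_ = 0;
};
```

For example, with `std::istringstream in("ab\ncdef\ng")` and `BoundedLineReader reader(in, 2, 5)`, successive `readLine()` calls return `"ab\n"`, `"cdef\n"`, `"g"` and then `std::nullopt`. The 5-byte line is accepted even though it spans three 2-byte fills. Separating the buffer size from the line limit keeps the error message's "max buffer size" wording unambiguous.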
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor
adamdebreceni commented on a change in pull request #1170: URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705156210 ## File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp ## @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "utils/LineByLineInputOutputStreamCallback.h" + +#include "utils/gsl.h" + +namespace org::apache::nifi::minifi::utils { + +LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr logger, CallbackType callback) + : buffer_(buffer_size), +logger_(std::move(logger)), +callback_(std::move(callback)) { +} + +int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr& input, const std::shared_ptr& output) { + gsl_Expects(input); + gsl_Expects(output); + + input_size_ = input->size(); + + if (int64_t status = readLine(*input); status <= 0) { +return status; + } + + bool is_first_line = true; + do { +if (int64_t status = readLine(*input); status < 0) { + return status; +} +std::string output_line = callback_(*current_line_, is_first_line, isLastLine()); +output->write(reinterpret_cast(output_line.data()), output_line.size()); +is_first_line = false; + } while (!isLastLine()); + + return gsl::narrow(total_bytes_read_); +} + +int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) { + std::string input_line; + const size_t current_read = readLine(stream, input_line); + if (io::isError(current_read)) { +return -1; + } + if (current_read == 0) { +current_line_ = next_line_; +next_line_ = std::nullopt; +return 0; + } + current_line_ = next_line_; + next_line_ = input_line; + total_bytes_read_ += current_read; + return gsl::narrow(current_read); +} + +size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) { + if (current_pos_ == end_pos_) { +const auto current_bytes_read = fillBuffer(stream); Review comment: note: the way it is implemented in RouteText is to check if the input stream supports access to its underlying buffer (BufferStream and the default RocksDbStream do) and only read into a buffer as a fallback
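The RouteText approach described in this comment can be sketched as follows. Prefer a zero-copy view when the stream can expose its backing buffer, and copy into local storage only otherwise. The `InputStream` interface, `underlyingBuffer()` and `StringInputStream` below are hypothetical stand-ins, not MiNiFi's actual `io::InputStream` API. For brevity, the fallback reads the whole content instead of one buffer at a time:

```cpp
#include <algorithm>
#include <cstddef>
#include <optional>
#include <string>
#include <string_view>
#include <utility>

// Hypothetical stream interface; MiNiFi's real io::InputStream differs.
class InputStream {
 public:
  virtual ~InputStream() = default;
  // Streams backed by memory may expose their buffer directly; others return nullopt.
  virtual std::optional<std::string_view> underlyingBuffer() const { return std::nullopt; }
  // Copies up to len bytes into out and returns the number of bytes copied (0 at end).
  virtual std::size_t read(char* out, std::size_t len) = 0;
};

// In-memory stream that can optionally hide its buffer, to exercise both code paths.
class StringInputStream : public InputStream {
 public:
  StringInputStream(std::string data, bool expose_buffer)
      : data_(std::move(data)), expose_buffer_(expose_buffer) {}

  std::optional<std::string_view> underlyingBuffer() const override {
    if (!expose_buffer_) return std::nullopt;
    return std::string_view{data_};
  }

  std::size_t read(char* out, std::size_t len) override {
    const std::size_t n = std::min(len, data_.size() - pos_);
    data_.copy(out, n, pos_);
    pos_ += n;
    return n;
  }

 private:
  std::string data_;
  bool expose_buffer_;
  std::size_t pos_ = 0;
};

// Returns a view of the whole content: zero-copy if possible, otherwise copied into `storage`.
std::string_view contentView(InputStream& stream, std::string& storage) {
  if (auto buffer = stream.underlyingBuffer()) {
    return *buffer;
  }
  storage.clear();
  char chunk[4096];
  std::size_t n = 0;
  while ((n = stream.read(chunk, sizeof(chunk))) > 0) {
    storage.append(chunk, n);
  }
  return storage;
}
```

The returned view is valid only while the stream (zero-copy path) or `storage` (fallback path) is alive.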
[GitHub] [nifi] Lehel44 commented on a change in pull request #5356: NIFI-9183: Add a command-line option to save status history
Lehel44 commented on a change in pull request #5356: URL: https://github.com/apache/nifi/pull/5356#discussion_r705143354 ## File path: nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java ## @@ -275,6 +282,9 @@ public static void main(String[] args) throws IOException, InterruptedException case "env": runNiFi.env(); break; +case "status-history": Review comment: Diagnostics works the same way: if you run the command again with the same filename, you can generally expect the existing file to be overwritten. I'll extend the documentation with this information.
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor
adamdebreceni commented on a change in pull request #1170: URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705141850 ## File path: extensions/standard-processors/processors/ReplaceText.h ## @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once + +#include +#include +#include +#include + +#include "core/Annotation.h" +#include "core/Processor.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/ProcessSessionFactory.h" +#include "core/Resource.h" +#include "core/logging/Logger.h" +#include "utils/Enum.h" + +namespace org::apache::nifi::minifi::processors { + +SMART_ENUM(EvaluationModeType, + (LINE_BY_LINE, "Line-by-Line"), + (ENTIRE_TEXT, "Entire text") +) + +SMART_ENUM(LineByLineEvaluationModeType, + (ALL, "All"), + (FIRST_LINE, "First-Line"), + (LAST_LINE, "Last-Line"), + (EXCEPT_FIRST_LINE, "Except-First-Line"), + (EXCEPT_LAST_LINE, "Except-Last-Line") +) + +SMART_ENUM(ReplacementStrategyType, + (PREPEND, "Prepend"), + (APPEND, "Append"), + (REGEX_REPLACE, "Regex Replace"), + (LITERAL_REPLACE, "Literal Replace"), + (ALWAYS_REPLACE, "Always Replace"), + (SUBSTITUTE_VARIABLES, "Substitute Variables") +) + +class ReplaceText : public core::Processor { + public: + static const core::Property EvaluationMode; + static const core::Property LineByLineEvaluationMode; + static const core::Property ReplacementStrategy; + static const core::Property MaximumBufferSize; + static const core::Property SearchValue; + static const core::Property ReplacementValue; + + static const core::Relationship Success; + static const core::Relationship Failure; + + explicit ReplaceText(const std::string& name, const utils::Identifier& uuid = {}); + core::annotation::Input getInputRequirement() const override; + void initialize() override; + void onSchedule(const std::shared_ptr& context, const std::shared_ptr&) override; + void onTrigger(const std::shared_ptr& context, const std::shared_ptr& session) override; + + private: + friend struct ReplaceTextTestAccessor; + + void readSearchValueProperty(const std::shared_ptr& context, const std::shared_ptr& flow_file); + void readReplacementValueProperty(const std::shared_ptr& context, const std::shared_ptr& flow_file); + + void 
replaceTextInEntireFile(const std::shared_ptr& flow_file, const std::shared_ptr& session) const; + void replaceTextLineByLine(const std::shared_ptr& flow_file, const std::shared_ptr& session) const; + + std::string applyReplacements(const std::string& input, const std::shared_ptr& flow_file) const; + std::string applyRegexReplace(const std::string& input) const; + std::string applyLiteralReplace(const std::string& input) const; + std::string applySubstituteVariables(const std::string& input, const std::shared_ptr& flow_file) const; + std::string getAttributeValue(const std::shared_ptr& flow_file, const std::smatch& match) const; + + std::shared_ptr logger_; + std::string search_value_; + std::regex search_regex_; + std::string replacement_value_; + uint64_t maximum_buffer_size_ = 0; + EvaluationModeType evaluation_mode_ = EvaluationModeType::LINE_BY_LINE; + LineByLineEvaluationModeType line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::ALL; + ReplacementStrategyType replacement_strategy_ = ReplacementStrategyType::REGEX_REPLACE; +}; + +REGISTER_RESOURCE(ReplaceText, "Updates the content of a FlowFile by replacing parts of it using various replacement strategies."); Review comment: note: after the dynamic PR, resource registration should happen in cpp files
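The `ReplacementStrategyType` values in this header map onto a small dispatch over the input text. Below is a minimal sketch for whole-text input. It assumes `std::regex` ECMAScript semantics for "Regex Replace" and leaves out "Substitute Variables" and any line-ending handling. `applyStrategy` and `literalReplace` are hypothetical names, not the PR's `applyRegexReplace`/`applyLiteralReplace` implementations:

```cpp
#include <cstddef>
#include <regex>
#include <string>

// Plain enum class standing in for the SMART_ENUM ReplacementStrategyType (assumption).
enum class ReplacementStrategy { PREPEND, APPEND, REGEX_REPLACE, LITERAL_REPLACE, ALWAYS_REPLACE };

// Replaces every non-overlapping occurrence of `search` with `replacement`, without regex parsing.
std::string literalReplace(const std::string& input, const std::string& search, const std::string& replacement) {
  if (search.empty()) return input;
  std::string result;
  std::size_t start = 0;
  std::size_t pos = 0;
  while ((pos = input.find(search, start)) != std::string::npos) {
    result.append(input, start, pos - start);
    result += replacement;
    start = pos + search.size();
  }
  result.append(input, start, std::string::npos);
  return result;
}

// Applies one strategy to the input; std::regex throws std::regex_error on an invalid pattern.
std::string applyStrategy(ReplacementStrategy strategy, const std::string& input,
                          const std::string& search, const std::string& replacement) {
  switch (strategy) {
    case ReplacementStrategy::PREPEND: return replacement + input;
    case ReplacementStrategy::APPEND: return input + replacement;
    case ReplacementStrategy::REGEX_REPLACE: return std::regex_replace(input, std::regex(search), replacement);
    case ReplacementStrategy::LITERAL_REPLACE: return literalReplace(input, search, replacement);
    case ReplacementStrategy::ALWAYS_REPLACE: return replacement;
  }
  return input;
}
```

With search value `a.c` and replacement `x`, Literal Replace turns `abc a.c` into `abc x`. Regex Replace would produce `x x`, because `.` matches any character.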
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor
adamdebreceni commented on a change in pull request #1170: URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705139497 ## File path: docker/test/integration/MiNiFi_integration_test_driver.py ## @@ -104,7 +104,7 @@ def generate_input_port_for_remote_process_group(remote_process_group, name): return input_port_node def add_test_data(self, path, test_data, file_name=str(uuid.uuid4())): -self.docker_directory_bindings.put_file_to_docker_path(self.test_id, path, file_name, test_data.encode('utf-8')) +self.docker_directory_bindings.put_file_to_docker_path(self.test_id, path, file_name, test_data.replace('\\n', '\n').encode('utf-8')) Review comment: note: I have also touched this part for the same reason (use escaped `\t`, `\n`, ... chars) in [1168](https://github.com/apache/nifi-minifi-cpp/pull/1168) (same with the file content validation)
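The Python change in the test driver turns the two-character sequence `\n` in test data into a real newline, and the review comment notes that `\t` is needed as well. Below is a minimal C++ sketch of the same idea. `unescapeTestData` is a hypothetical helper: it handles only `\n` and `\t` and leaves any other backslash sequence untouched:

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper: converts the escape sequences "\n" and "\t" (backslash + letter)
// into newline and tab characters in a single left-to-right pass.
std::string unescapeTestData(const std::string& input) {
  std::string result;
  result.reserve(input.size());
  for (std::size_t i = 0; i < input.size(); ++i) {
    if (input[i] == '\\' && i + 1 < input.size()) {
      const char next = input[i + 1];
      if (next == 'n') { result += '\n'; ++i; continue; }
      if (next == 't') { result += '\t'; ++i; continue; }
    }
    result += input[i];  // ordinary character, unknown escape, or trailing backslash
  }
  return result;
}
```

Supporting further escapes means adding further branches to the same loop.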
[jira] [Created] (NIFI-9206) Create a processor that is capable of removing fields from records
Peter Gyori created NIFI-9206: - Summary: Create a processor that is capable of removing fields from records Key: NIFI-9206 URL: https://issues.apache.org/jira/browse/NIFI-9206 Project: Apache NiFi Issue Type: New Feature Components: Extensions Reporter: Peter Gyori Assignee: Peter Gyori
A processor should be created that is capable of removing fields from records (RemoveRecordField might be a name for it). The processor should have 3 properties:
* Record Reader (a Reader controller service could be specified)
* Record Writer (a Writer controller service could be specified)
* Field To Remove (expects a RecordPath that points to the field to be removed)
The processor should also accept additional dynamic properties that specify further fields (by RecordPath) to be removed from the record.
+*Example*+
+input:+
{code:java}
{ "id": 1, "name": "John", "address": { "zip": , "street": "Main", "building": 11 } }
{code}
+Field to remove:+ /address/building
+output:+
{code:java}
{ "id": 1, "name": "John", "address": { "zip": , "street": "Main" } }
{code}
The record's schema should be modified accordingly (removing the /address/building field from the schema). Field removal should be permitted regardless of whether the field is nullable. Generally, removing a field should remove it from both the schema AND the data. The exception is a data-dependent removal (e.g. the field should be removed only if its value equals "xyz"); in this case no schema modification should occur. The processor should be able to remove one or more elements from arrays (e.g. /addresses[ 1 ] should remove the element at index 1 of the addresses array). When removing a field from elements of an array, the array's schema should only be modified if the removal is applied to all elements of the array (i.e. /addresses[ * ]/building should modify the schema of the array, but /addresses[ 1 ]/building should not). The same rule should be applied when handling the Map datatype.
If the record does not contain the field to be removed, the record should still be transferred to the 'success' relationship without modification. The expectation is that if /x/y is to be removed, the record leaving the processor does not contain a /x/y field. If a field can be of different types (e.g. the address field can be a "string" as well as a "record", and possibly another "record" with a schema different from the former record type), then when /address/building is to be removed, the processor should remove the building field from the schema of every possible type of the address field, regardless of which concrete type the address field has in the particular record being processed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
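The core data-removal step for the simple case in the example can be sketched as a recursive walk. The case covered is a plain child path such as `/address/building`, with no array indices, map keys, predicates or schema changes. This is a minimal illustration over a hypothetical `Field` tree, not NiFi's Record or RecordPath API. A path that does not exist leaves the record untouched, matching the "success without modification" requirement:

```cpp
#include <algorithm>
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

// Simplified record model (assumption): a field is either a scalar or a nested record.
struct Field {
  std::string name;
  std::string value;            // used when the field is a scalar
  std::vector<Field> children;  // fields of a nested record
};

// Splits "/address/building" into {"address", "building"}.
std::vector<std::string> splitPath(const std::string& path) {
  std::vector<std::string> segments;
  std::stringstream stream(path);
  std::string segment;
  while (std::getline(stream, segment, '/')) {
    if (!segment.empty()) segments.push_back(segment);
  }
  return segments;
}

// Removes the field addressed by `segments`; returns false (record unchanged) if it is missing.
bool removeField(std::vector<Field>& fields, const std::vector<std::string>& segments, std::size_t depth = 0) {
  if (depth >= segments.size()) return false;
  auto it = std::find_if(fields.begin(), fields.end(),
                         [&](const Field& field) { return field.name == segments[depth]; });
  if (it == fields.end()) return false;
  if (depth + 1 == segments.size()) {
    fields.erase(it);
    return true;
  }
  return removeField(it->children, segments, depth + 1);
}
```

A real implementation would also have to apply the same removal to the record schema and handle `[ * ]`, index and map selectors as described in the issue.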
[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #1120: MINIFICPP-1374 Implement security protocol support for ConsumeKafka
lordgamez commented on a change in pull request #1120: URL: https://github.com/apache/nifi-minifi-cpp/pull/1120#discussion_r705083561 ## File path: extensions/librdkafka/ConsumeKafka.cpp ## @@ -211,14 +218,22 @@ void ConsumeKafka::onSchedule(core::ProcessContext* context, core::ProcessSessio context->getProperty(MessageHeaderEncoding.getName(), message_header_encoding_); context->getProperty(DuplicateHeaderHandling.getName(), duplicate_header_handling_); + std::string ssl_service_name; + std::shared_ptr ssl_service; + if (context->getProperty(SSLContextService.getName(), ssl_service_name) && !ssl_service_name.empty()) { +std::shared_ptr service = context->getControllerService(ssl_service_name); +if (service) { + ssl_service = std::static_pointer_cast(service); + ssl_data_.ca_loc = ssl_service->getCACertificate(); + ssl_data_.cert_loc = ssl_service->getCertificateFile(); + ssl_data_.key_loc = ssl_service->getPrivateKeyFile(); + ssl_data_.key_pw = ssl_service->getPassphrase(); +} + } Review comment: Updated in 0e5f15b290e21b011e99628a6e2198e88cfad106 ## File path: extensions/librdkafka/PublishKafka.cpp ## @@ -709,37 +717,70 @@ bool PublishKafka::configureNewConnection(const std::shared_ptrgetProperty(SecurityCA.getName(), value) && !value.empty()) { -result = rd_kafka_conf_set(conf_.get(), "ssl.ca.location", value.c_str(), errstr.data(), errstr.size()); -logger_->log_debug("PublishKafka: ssl.ca.location [%s]", value); + + std::shared_ptr ssl_service; + if (context->getProperty(SSLContextService.getName(), value) && !value.empty()) { +std::shared_ptr service = context->getControllerService(value); +if (service) { + ssl_service = std::static_pointer_cast(service); +} + } Review comment: Updated in 0e5f15b290e21b011e99628a6e2198e88cfad106
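The `ssl_data_` fields that ConsumeKafka fills from the SSL Context Service end up as librdkafka configuration entries. Below is a minimal sketch of that mapping, assuming the standard librdkafka property names `ssl.ca.location`, `ssl.certificate.location`, `ssl.key.location` and `ssl.key.password`. The `SslData` struct reuses the field names from the diff, but its layout is an assumption. Each resulting pair would then be passed to `rd_kafka_conf_set`, as PublishKafka does for `ssl.ca.location`:

```cpp
#include <map>
#include <string>

// Mirrors the ssl_data_ fields used in ConsumeKafka.cpp (struct layout is an assumption).
struct SslData {
  std::string ca_loc;
  std::string cert_loc;
  std::string key_loc;
  std::string key_pw;
};

// Builds librdkafka SSL settings, skipping values the SSL Context Service left empty.
std::map<std::string, std::string> toRdKafkaSslProperties(const SslData& ssl) {
  std::map<std::string, std::string> properties;
  const auto set_if_present = [&properties](const char* key, const std::string& value) {
    if (!value.empty()) properties[key] = value;
  };
  set_if_present("ssl.ca.location", ssl.ca_loc);
  set_if_present("ssl.certificate.location", ssl.cert_loc);
  set_if_present("ssl.key.location", ssl.key_loc);
  set_if_present("ssl.key.password", ssl.key_pw);
  return properties;
}
```

Skipping empty values lets librdkafka keep its own defaults for anything the service does not provide.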
[jira] [Updated] (NIFI-9200) AbstractCSVLookupService cache remains on the heap after disabling the service
[ https://issues.apache.org/jira/browse/NIFI-9200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Peter Turcsanyi updated NIFI-9200: -- Fix Version/s: 1.15.0 Resolution: Fixed Status: Resolved (was: Patch Available) > AbstractCSVLookupService cache remains on the heap after disabling the service > -- > > Key: NIFI-9200 > URL: https://issues.apache.org/jira/browse/NIFI-9200 > Project: Apache NiFi > Issue Type: Bug >Reporter: Lehel Boér >Assignee: Lehel Boér >Priority: Major > Fix For: 1.15.0 > > Time Spent: 50m > Remaining Estimate: 0h >
[GitHub] [nifi] asfgit closed pull request #5372: NIFI-9200: Free cache on heap after disabling AbstractCSVLookupService
asfgit closed pull request #5372: URL: https://github.com/apache/nifi/pull/5372
[jira] [Commented] (NIFI-9200) AbstractCSVLookupService cache remains on the heap after disabling the service
[ https://issues.apache.org/jira/browse/NIFI-9200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17412414#comment-17412414 ] ASF subversion and git services commented on NIFI-9200: --- Commit d96398feb89a77a27370c47e7e52d7f90721193e in nifi's branch refs/heads/main from Lehel Boér [ https://gitbox.apache.org/repos/asf?p=nifi.git;h=d96398f ] NIFI-9200: Free cache on heap after disabling AbstractCSVLookupService This closes #5372. Signed-off-by: Peter Turcsanyi > AbstractCSVLookupService cache remains on the heap after disabling the service > -- > > Key: NIFI-9200 > URL: https://issues.apache.org/jira/browse/NIFI-9200 > Project: Apache NiFi > Issue Type: Bug >Reporter: Lehel Boér >Assignee: Lehel Boér >Priority: Major > Time Spent: 40m > Remaining Estimate: 0h >
[GitHub] [nifi] pvillard31 commented on pull request #3641: NIFI-4542, add target.dir.created to indicate if the target directory created
pvillard31 commented on pull request #3641: URL: https://github.com/apache/nifi/pull/3641#issuecomment-915839442 @avseq1234 - it sounds like a straightforward change, can you rebase against main and fix conflicts?
[GitHub] [nifi] KuKuDeCheng commented on a change in pull request #5318: NIFI-9064:Support Oracle timestamp when `Use Avro Logical Types` is t…
KuKuDeCheng commented on a change in pull request #5318: URL: https://github.com/apache/nifi/pull/5318#discussion_r705064997 ## File path: nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java ## @@ -1423,9 +1423,40 @@ public static Timestamp toTimestamp(final Object value, final Supplier
[GitHub] [nifi] avseq1234 opened a new pull request #3641: NIFI-4542, add target.dir.created to indicate if the target directory created
avseq1234 opened a new pull request #3641: URL: https://github.com/apache/nifi/pull/3641 … created Thank you for submitting a contribution to Apache NiFi. Please provide a short description of the PR here: ### Description of PR Add target.dir.created attribute to output flowfile to indicate if the target directory created. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [X] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [X] Does your PR title start with **NIFI-** where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [X] Has your PR been rebased against the latest commit within the target branch (typically `master`)? - [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._ ### For code changes: - [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder? - [X] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`? - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`? - [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? 
### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1138: MINIFICPP-1471 - Build extensions as dynamic libraries and dynamically load them
adamdebreceni commented on a change in pull request #1138: URL: https://github.com/apache/nifi-minifi-cpp/pull/1138#discussion_r705061409 ## File path: libminifi/include/core/extension/DynamicLibrary.h ## @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "Module.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace extension { + +class DynamicLibrary : public Module { + friend class ExtensionManager; + + public: + DynamicLibrary(std::string name, std::filesystem::path library_path); + ~DynamicLibrary() override; + + private: +#ifdef WIN32 + std::map resource_mapping_; + + std::string error_str_; + std::string current_error_; + + void store_error(); + void* dlsym(void* handle, const char* name); + const char* dlerror(); + void* dlopen(const char* file, int mode); + int dlclose(void* handle); +#endif + + bool load(); + bool unload(); + + std::filesystem::path library_path_; + gsl::owner handle_ = nullptr; + + static std::shared_ptr logger_; Review comment: should be done now -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
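The header above keeps `load()` and `unload()` private and exposes them only through `friend class ExtensionManager;`. The minimal sketch below shows that access pattern. It uses hypothetical stand-in types `Library` and `Manager`, which are not the MiNiFi classes, and a boolean flag instead of a real library handle. Only the owning manager can drive the lifecycle; every other caller can only observe it.

```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for DynamicLibrary: lifecycle methods are private.
class Library {
  friend class Manager;  // only Manager may call load()/unload()

 public:
  explicit Library(std::string name) : name_(std::move(name)) {}
  const std::string& name() const { return name_; }
  bool isLoaded() const { return loaded_; }

 private:
  // A real implementation would call dlopen()/dlclose() here.
  bool load() { loaded_ = true; return true; }
  bool unload() { loaded_ = false; return true; }

  std::string name_;
  bool loaded_ = false;
};

// Hypothetical stand-in for ExtensionManager: owns the libraries and drives their lifecycle.
class Manager {
 public:
  void add(std::string name) { libraries_.emplace_back(std::move(name)); }

  bool loadAll() {
    bool ok = true;
    for (auto& lib : libraries_) ok = lib.load() && ok;  // attempt every library even after a failure
    return ok;
  }

  void unloadAll() {
    // Unload in reverse order of loading.
    for (auto it = libraries_.rbegin(); it != libraries_.rend(); ++it) it->unload();
  }

  const std::vector<Library>& libraries() const { return libraries_; }

 private:
  std::vector<Library> libraries_;
};
```

Outside code that holds a `const Library&` can call `isLoaded()`. A call to `load()` from anywhere except `Manager` fails to compile.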
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1138: MINIFICPP-1471 - Build extensions as dynamic libraries and dynamically load them
adamdebreceni commented on a change in pull request #1138: URL: https://github.com/apache/nifi-minifi-cpp/pull/1138#discussion_r705060665 ## File path: libminifi/test/unit/FilePatternTests.cpp ## @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#define CUSTOM_EXTENSION_INIT + +#include + +#include "../TestBase.h" +#include "utils/file/FilePattern.h" +#include "range/v3/view/transform.hpp" +#include "range/v3/view/map.hpp" +#include "range/v3/view/join.hpp" +#include "range/v3/view/cache1.hpp" +#include "range/v3/view/c_str.hpp" +#include "range/v3/range/conversion.hpp" +#include "range/v3/range.hpp" + +struct FilePatternTestAccessor { + using FilePatternSegment = fileutils::FilePattern::FilePatternSegment; +}; + +using FilePatternSegment = FilePatternTestAccessor::FilePatternSegment; +using FilePattern = fileutils::FilePattern; + +#define REQUIRE_INCLUDE(val) REQUIRE((val == FilePatternSegment::MatchResult::INCLUDE)) +#define REQUIRE_EXCLUDE(val) REQUIRE((val == FilePatternSegment::MatchResult::EXCLUDE)) +#define REQUIRE_NOT_MATCHING(val) REQUIRE((val == FilePatternSegment::MatchResult::NOT_MATCHING)) + +#ifdef WIN32 +static std::filesystem::path root{"C:\\"}; +#else +static std::filesystem::path root{"/"}; +#endif + +TEST_CASE("Invalid paths") { + REQUIRE_THROWS(FilePatternSegment("")); + REQUIRE_THROWS(FilePatternSegment(".")); + REQUIRE_THROWS(FilePatternSegment("..")); + REQUIRE_THROWS(FilePatternSegment("!")); + REQUIRE_THROWS(FilePatternSegment("!.")); + REQUIRE_THROWS(FilePatternSegment("!..")); + // parent accessor after wildcard + FilePatternSegment("./../file.txt"); // sanity check + REQUIRE_THROWS(FilePatternSegment("./*/../file.txt")); +} + +TEST_CASE("FilePattern reports error in correct subpattern") { + std::vector> invalid_subpattern{ Review comment: changed ## File path: libminifi/src/core/extension/ExtensionManager.cpp ## @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "core/extension/ExtensionManager.h" +#include "core/logging/LoggerConfiguration.h" +#include "utils/file/FileUtils.h" +#include "core/extension/Executable.h" +#include "utils/file/FilePattern.h" +#include "core/extension/DynamicLibrary.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace extension { + +namespace { + +struct LibraryDescriptor { + std::string name; + std::filesystem::path dir; + std::string filename; + + bool verify(const std::shared_ptr& /*logger*/) const { +// TODO(adebreceni): check signature +return true; + } + + std::filesystem::path getFullPath() const { +return dir / filename; + } Review comment: added
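The "Invalid paths" test case in the FilePatternTests.cpp hunk above expects construction to throw in four situations: an empty pattern, a bare `.` or `..`, those same values prefixed with `!`, and a parent accessor (`..`) that follows a wildcard segment. The pattern `./../file.txt` is accepted. The hypothetical validator below reproduces only those expectations and is not MiNiFi's `FilePatternSegment` logic. It assumes that a leading `!` marks an exclusion pattern and that segments are separated by `/`, so Windows backslash paths are out of scope.

```cpp
#include <sstream>
#include <stdexcept>
#include <string>

// Hypothetical check mirroring the "Invalid paths" test expectations only;
// it is not the FilePatternSegment implementation.
void validatePatternSegment(std::string pattern) {
  // Assumption: a leading '!' marks an exclusion pattern and is not part of the path.
  if (!pattern.empty() && pattern.front() == '!') {
    pattern.erase(0, 1);
  }
  // The pattern must name something beyond the current or parent directory.
  if (pattern.empty() || pattern == "." || pattern == "..") {
    throw std::invalid_argument("invalid pattern: '" + pattern + "'");
  }
  // Walk the '/'-separated components; ".." after a wildcard component is ambiguous.
  std::istringstream components(pattern);
  std::string component;
  bool seen_wildcard = false;
  while (std::getline(components, component, '/')) {
    if (component.find('*') != std::string::npos) {
      seen_wildcard = true;
    } else if (component == ".." && seen_wildcard) {
      throw std::invalid_argument("parent accessor after wildcard: '" + pattern + "'");
    }
  }
}
```

In `./*/../file.txt`, the `..` cannot be resolved statically because the wildcard could match many directories. A `..` that appears before any wildcard, as in `./../file.txt`, is still resolvable.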
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1138: MINIFICPP-1471 - Build extensions as dynamic libraries and dynamically load them
adamdebreceni commented on a change in pull request #1138: URL: https://github.com/apache/nifi-minifi-cpp/pull/1138#discussion_r705059132 ## File path: libminifi/include/core/logging/Logger.h ## @@ -26,6 +26,7 @@ #include #include #include +#include Review comment: remove ## File path: libminifi/include/utils/file/FilePattern.h ## @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "utils/OptionalUtils.h" +#include "core/logging/Logger.h" Review comment: removed ## File path: libminifi/src/core/extension/DynamicLibrary.cpp ## @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#ifndef WIN32 +#include +#define DLL_EXPORT +#else +#include +#define WIN32_LEAN_AND_MEAN 1 +#include // Windows specific libraries for collecting software metrics. +#include +#pragma comment(lib, "psapi.lib" ) +#define DLL_EXPORT __declspec(dllexport) +#define RTLD_LAZY 0 +#define RTLD_NOW 0 + +#define RTLD_GLOBAL (1 << 1) +#define RTLD_LOCAL (1 << 2) +#endif + +#include "core/extension/DynamicLibrary.h" +#include "core/extension/Extension.h" +#include "utils/GeneralUtils.h" +#include "core/logging/LoggerConfiguration.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace extension { + +std::shared_ptr DynamicLibrary::logger_ = logging::LoggerFactory::getLogger(); + +DynamicLibrary::DynamicLibrary(std::string name, std::filesystem::path library_path) + : Module(std::move(name)), +library_path_(std::move(library_path)) { +} + +bool DynamicLibrary::load() { + dlerror(); + handle_ = dlopen(library_path_.string().c_str(), RTLD_NOW | RTLD_LOCAL); + if (!handle_) { +logger_->log_error("Failed to load extension '%s' at '%s': %s", name_, library_path_.string(), dlerror()); +return false; + } else { +logger_->log_trace("Loaded extension '%s' at '%s'", name_, library_path_.string()); +return true; + } +} + +bool DynamicLibrary::unload() { + logger_->log_trace("Unloading library '%s' at '%s'", name_, library_path_.string()); + if (!handle_) { +logger_->log_error("Extension does not have a handle_ '%s' at '%s'", name_, library_path_.string()); +return true; + } + dlerror(); + if
(dlclose(handle_)) { +logger_->log_error("Failed to unload extension '%s' at '%s': %s", name_, library_path_.string(), dlerror()); +return false; + } + logger_->log_trace("Unloaded extension '%s' at '%s'", name_, library_path_.string()); + handle_ = nullptr; + return true; +} + +DynamicLibrary::~DynamicLibrary() = default; + +#ifdef WIN32 + +void DynamicLibrary::store_error() { + auto error = GetLastError(); + + if (error == 0) { +error_str_ = ""; +return; + } + + current_error_ = std::system_category().message(error); +} + +void* DynamicLibrary::dlsym(void* handle, const char* name) { + FARPROC symbol; + + symbol = GetProcAddress((HMODULE)handle, name); + + if (symbol == nullptr) { +store_error(); + +for (auto hndl : resource_mapping_) { + symbol = GetProcAddress((HMODULE)hndl.first, name); + if (symbol != nullptr) { +break; + } +} + } + +#ifdef _MSC_VER +#pragma warning(suppress: 4054 ) +#endif + return reinterpret_cast(symbol); +} + +const char*
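The `load()`/`unload()` pair above clears `dlerror()` before each call and opens the library with `RTLD_NOW | RTLD_LOCAL`. The minimal POSIX-only sketch below follows the same flow. The class name `PosixLibrary` is hypothetical, logging goes to `std::cerr` rather than the MiNiFi logger, and the Windows shim is left out. On glibc older than 2.34 the program has to be linked with `-ldl`.

```cpp
#include <dlfcn.h>

#include <iostream>
#include <string>
#include <utility>

// Hypothetical POSIX-only wrapper mirroring the load()/unload() flow above;
// it logs to std::cerr instead of the MiNiFi logger.
class PosixLibrary {
 public:
  explicit PosixLibrary(std::string path) : path_(std::move(path)) {}
  ~PosixLibrary() { unload(); }
  PosixLibrary(const PosixLibrary&) = delete;             // a handle must have one owner
  PosixLibrary& operator=(const PosixLibrary&) = delete;

  bool load() {
    dlerror();  // clear any stale error so the next dlerror() describes this call
    handle_ = dlopen(path_.c_str(), RTLD_NOW | RTLD_LOCAL);  // resolve all symbols now, keep them private
    if (!handle_) {
      const char* err = dlerror();
      std::cerr << "Failed to load '" << path_ << "': " << (err ? err : "unknown error") << '\n';
      return false;
    }
    return true;
  }

  bool unload() {
    if (!handle_) {
      return true;  // never loaded or already unloaded: nothing to do
    }
    dlerror();
    if (dlclose(handle_) != 0) {
      const char* err = dlerror();
      std::cerr << "Failed to unload '" << path_ << "': " << (err ? err : "unknown error") << '\n';
      return false;
    }
    handle_ = nullptr;
    return true;
  }

  bool isLoaded() const { return handle_ != nullptr; }

 private:
  std::string path_;
  void* handle_ = nullptr;
};
```

The reviewed code logs an error when `unload()` is called without a handle but still returns `true`. The sketch instead treats that case as a silent no-op, so the destructor can call `unload()` unconditionally.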
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1138: MINIFICPP-1471 - Build extensions as dynamic libraries and dynamically load them
adamdebreceni commented on a change in pull request #1138: URL: https://github.com/apache/nifi-minifi-cpp/pull/1138#discussion_r705041686 ## File path: libminifi/include/core/extension/DynamicLibrary.h ## @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include +#include +#include + +#include "Module.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace extension { + +class DynamicLibrary : public Module { + friend class ExtensionManager; + + public: + DynamicLibrary(std::string name, std::filesystem::path library_path); + ~DynamicLibrary() override; + + private: +#ifdef WIN32 + std::map resource_mapping_; + + std::string error_str_; + std::string current_error_; + + void store_error(); + void* dlsym(void* handle, const char* name); + const char* dlerror(); + void* dlopen(const char* file, int mode); + int dlclose(void* handle); +#endif + + bool load(); + bool unload(); + + std::filesystem::path library_path_; + gsl::owner handle_ = nullptr; + + static std::shared_ptr logger_; Review comment: it seems it was never fixed, I made the ExtensionManager::logger_ const :)
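The comment above mentions making `ExtensionManager::logger_` const. For a static `std::shared_ptr` member, `const` applies to the pointer, not to the logger. The pointer cannot be reassigned after static initialization, but non-const logging calls through it still compile. The sketch below illustrates this with hypothetical `Logger` and `ExampleManager` types that stand in for the MiNiFi classes.

```cpp
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical logger type standing in for core::logging::Logger.
class Logger {
 public:
  explicit Logger(std::string name) : name_(std::move(name)) {}
  void log(const std::string& message) { lines_.push_back(name_ + ": " + message); }
  const std::vector<std::string>& lines() const { return lines_; }

 private:
  std::string name_;
  std::vector<std::string> lines_;
};

class ExampleManager {
 public:
  // Calling a non-const Logger member through the const pointer is allowed.
  void doWork() { logger_->log("working"); }
  static const std::shared_ptr<Logger>& logger() { return logger_; }

 private:
  // const: the static pointer itself can never be reassigned after initialization.
  static const std::shared_ptr<Logger> logger_;
};

// Out-of-class definition; the const qualifier must match the declaration.
const std::shared_ptr<Logger> ExampleManager::logger_ = std::make_shared<Logger>("ExampleManager");
```

An assignment such as `logger_ = nullptr;` inside `ExampleManager` would now be rejected at compile time. That compile-time guarantee is the point of the change described in the comment.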