[jira] [Updated] (NIFI-4846) AvroTypeUtil to support more input types for logical decimal conversion
[ https://issues.apache.org/jira/browse/NIFI-4846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Koji Kawamura updated NIFI-4846: Status: Patch Available (was: In Progress) > AvroTypeUtil to support more input types for logical decimal conversion > --- > > Key: NIFI-4846 > URL: https://issues.apache.org/jira/browse/NIFI-4846 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Affects Versions: 1.3.0 >Reporter: Koji Kawamura >Assignee: Koji Kawamura >Priority: Minor > > Currently, only double and BigDecimal can be mapped to a logical decimal Avro > field. AvroTypeUtil should support String, Integer and Long as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi pull request #2451: NIFI-4846: AvroTypeUtil to support more input types...
GitHub user ijokarumawak opened a pull request:

https://github.com/apache/nifi/pull/2451

NIFI-4846: AvroTypeUtil to support more input types for logical decimal conversion

## NOTE

This PR is based on #2450 commit to add more capability on top of that. Please review #2450 first. Thank you.

---

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ijokarumawak/nifi nifi-4846

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2451.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #2451

commit 342fcfa2b667b73890e2ed24d74d2ae84fb9acc8
Author: Koji Kawamura
Date: 2018-02-06T04:52:48Z

NIFI-4844: Adjust BigDecimal scale to the target Avro schema

- Applied the same scale adjustment not only to BigDecimal inputs, but also to Double values.

commit c4c5f8240fc7d004a780c9c1aa462755ef3d6507
Author: Koji Kawamura
Date: 2018-02-06T07:36:15Z

NIFI-4846: AvroTypeUtil to support more input types for logical decimal conversion

---
[jira] [Commented] (NIFI-4846) AvroTypeUtil to support more input types for logical decimal conversion
[ https://issues.apache.org/jira/browse/NIFI-4846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353512#comment-16353512 ]

ASF GitHub Bot commented on NIFI-4846:
--

GitHub user ijokarumawak opened a pull request:

https://github.com/apache/nifi/pull/2451

NIFI-4846: AvroTypeUtil to support more input types for logical decimal conversion

## NOTE

This PR is based on #2450 commit to add more capability on top of that. Please review #2450 first. Thank you.

---

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ijokarumawak/nifi nifi-4846

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2451.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #2451

commit 342fcfa2b667b73890e2ed24d74d2ae84fb9acc8
Author: Koji Kawamura
Date: 2018-02-06T04:52:48Z

NIFI-4844: Adjust BigDecimal scale to the target Avro schema

- Applied the same scale adjustment not only to BigDecimal inputs, but also to Double values.

commit c4c5f8240fc7d004a780c9c1aa462755ef3d6507
Author: Koji Kawamura
Date: 2018-02-06T07:36:15Z

NIFI-4846: AvroTypeUtil to support more input types for logical decimal conversion

> AvroTypeUtil to support more input types for logical decimal conversion
> ---
>
> Key: NIFI-4846
> URL: https://issues.apache.org/jira/browse/NIFI-4846
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Affects Versions: 1.3.0
> Reporter: Koji Kawamura
> Assignee: Koji Kawamura
> Priority: Minor
>
> Currently, only double and BigDecimal can be mapped to a logical decimal Avro
> field. AvroTypeUtil should support String, Integer and Long as well.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (NIFI-4845) Add JanusGraph put processor
[ https://issues.apache.org/jira/browse/NIFI-4845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353509#comment-16353509 ] Fred Liu commented on NIFI-4845: Hi Admin, Please assign this task to me so that I can begin working on the implementation. > Add JanusGraph put processor > > > Key: NIFI-4845 > URL: https://issues.apache.org/jira/browse/NIFI-4845 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Fred Liu >Priority: Major > > Create a processor that reads records from an incoming FlowFile using the > provided Record Reader and writes those records to JanusGraph. Using a > JanusGraphControllerService would be good. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (NIFI-4846) AvroTypeUtil to support more input types for logical decimal conversion
Koji Kawamura created NIFI-4846: --- Summary: AvroTypeUtil to support more input types for logical decimal conversion Key: NIFI-4846 URL: https://issues.apache.org/jira/browse/NIFI-4846 Project: Apache NiFi Issue Type: Improvement Components: Extensions Affects Versions: 1.3.0 Reporter: Koji Kawamura Assignee: Koji Kawamura Currently, only double and BigDecimal can be mapped to a logical decimal Avro field. AvroTypeUtil should support String, Integer and Long as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
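The conversion requested in NIFI-4846 can be sketched in plain Java. This is a minimal illustration of accepting the listed input types, not the code from the NiFi patch: the class name `DecimalInputSketch`, the method `toDecimal`, and the use of `RoundingMode.HALF_UP` when fitting the target scale are all assumptions made for the example.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class DecimalInputSketch {

    // Hypothetical helper: accepts the input types named in the issue and
    // returns a BigDecimal carrying the scale of the target decimal field.
    public static BigDecimal toDecimal(Object raw, int targetScale) {
        final BigDecimal decimal;
        if (raw instanceof BigDecimal) {
            decimal = (BigDecimal) raw;
        } else if (raw instanceof Double) {
            // valueOf uses the canonical string form of the double.
            decimal = BigDecimal.valueOf(((Double) raw).doubleValue());
        } else if (raw instanceof String) {
            // Throws NumberFormatException if the text is not a valid number.
            decimal = new BigDecimal((String) raw);
        } else if (raw instanceof Integer) {
            decimal = BigDecimal.valueOf(((Integer) raw).longValue());
        } else if (raw instanceof Long) {
            decimal = BigDecimal.valueOf(((Long) raw).longValue());
        } else {
            String type = (raw == null) ? "null" : raw.getClass().getName();
            throw new IllegalArgumentException("Unsupported decimal input type: " + type);
        }
        // Assumption: round half-up when the input has more fraction digits
        // than the target scale allows.
        return decimal.setScale(targetScale, RoundingMode.HALF_UP);
    }

    public static void main(String[] args) {
        System.out.println(toDecimal("12.5", 2).toPlainString());
        System.out.println(toDecimal(7, 2).toPlainString());
        System.out.println(toDecimal(7L, 3).toPlainString());
    }
}
```

Dispatching on the runtime type keeps each conversion explicit, so an unsupported type fails with a clear message instead of a `ClassCastException`.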
[jira] [Created] (NIFI-4845) Add JanusGraph put processor
Fred Liu created NIFI-4845: -- Summary: Add JanusGraph put processor Key: NIFI-4845 URL: https://issues.apache.org/jira/browse/NIFI-4845 Project: Apache NiFi Issue Type: New Feature Components: Extensions Reporter: Fred Liu Create a processor that reads records from an incoming FlowFile using the provided Record Reader and writes those records to JanusGraph. Using a JanusGraphControllerService would be good. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (NIFI-4844) AvroRecordSetWriter should be able to convert a double having less scale than intended target Avro schema instead of throwing an AvroTypeException
[ https://issues.apache.org/jira/browse/NIFI-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Koji Kawamura updated NIFI-4844: Status: Patch Available (was: In Progress) > AvroRecordSetWriter should be able to convert a double having less scale than > intended target Avro schema instead of throwing an AvroTypeException > -- > > Key: NIFI-4844 > URL: https://issues.apache.org/jira/browse/NIFI-4844 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.3.0 >Reporter: Koji Kawamura >Assignee: Koji Kawamura >Priority: Major > > Current AvroTypeUtil conversion logic can throw AvroTypeException when it > maps double values into Avro decimal logical type fields if the double value > has less scale than the one defined at the target Avro decimal field schema. > For example, with following schema: > {code} > { > "type": "record", > "name": "logicalDecimalTest", > "fields": [ > {"name": "id", "type": "int"}, > {"name": "name", "type": "string"}, > { > "name": "price", > "type": { > "type": "bytes", > "logicalType": "decimal", > "precision": 18, > "scale": 8 > }}]} > {code} > And following CSV records: > {code} > id|name|price > 1|one|1.23 > 2|two|2.34 > {code} > Would produce this Exception: > {code} > 2018-02-06 09:57:27,461 ERROR [Timer-Driven Process Thread-7] > o.a.n.processors.standard.ConvertRecord > ConvertRecord[id=6897bc30-0161-1000-a8e7-9ce0ce8eb9ae] Failed to process > StandardFlowFileRecord[uuid=a97366a0-79bb-42ff-9023-c5d62ecfdbc5,claim=StandardContentClaim > [resourceClaim=StandardResourceClaim[id=1517878123416-2, container=default, > section=2], offset=5, length=48],offset=0,name=220105646548465,size=48]; will > route to failure: org.apache.avro.AvroTypeException: Cannot encode decimal > with scale 17 as scale 8 > org.apache.avro.AvroTypeException: Cannot encode decimal with scale 17 as > scale 8 > at > org.apache.avro.Conversions$DecimalConversion.toBytes(Conversions.java:86) > at > 
org.apache.nifi.avro.AvroTypeUtil.convertToAvroObject(AvroTypeUtil.java:546) > at > org.apache.nifi.avro.AvroTypeUtil.createAvroRecord(AvroTypeUtil.java:457) > at > org.apache.nifi.avro.WriteAvroResultWithExternalSchema.writeRecord(WriteAvroResultWithExternalSchema.java:76) > at > org.apache.nifi.serialization.AbstractRecordSetWriter.write(AbstractRecordSetWriter.java:59) > at > org.apache.nifi.processors.standard.AbstractRecordProcessor$1.process(AbstractRecordProcessor.java:122) > at > org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:2827) > at > org.apache.nifi.processors.standard.AbstractRecordProcessor.onTrigger(AbstractRecordProcessor.java:109) > at > org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) > at > org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1122) > at > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147) > at > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) > at > org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > The same issue is reported in the Avro project, > [AVRO-1864|https://issues.apache.org/jira/browse/AVRO-1864]. 
The recommended > approach is to adjust the scale at NiFi side. Actually, for BigDecimal input > values, NiFi already does this, but not with double values. AvroTypeUtil > should do the same scale adjustment for double values, too. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4844) AvroRecordSetWriter should be able to convert a double having less scale than intended target Avro schema instead of throwing an AvroTypeException
[ https://issues.apache.org/jira/browse/NIFI-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353477#comment-16353477 ]

ASF GitHub Bot commented on NIFI-4844:
--

GitHub user ijokarumawak opened a pull request:

https://github.com/apache/nifi/pull/2450

NIFI-4844: Adjust BigDecimal scale to the target Avro schema

- Applied the same scale adjustment not only to BigDecimal inputs, but also to Double values.

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ijokarumawak/nifi nifi-4844

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2450.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #2450

commit 342fcfa2b667b73890e2ed24d74d2ae84fb9acc8
Author: Koji Kawamura
Date: 2018-02-06T04:52:48Z

NIFI-4844: Adjust BigDecimal scale to the target Avro schema

- Applied the same scale adjustment not only to BigDecimal inputs, but also to Double values.

> AvroRecordSetWriter should be able to convert a double having less scale than
> intended target Avro schema instead of throwing an AvroTypeException
> --
>
> Key: NIFI-4844
> URL: https://issues.apache.org/jira/browse/NIFI-4844
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Affects Versions: 1.3.0
> Reporter: Koji Kawamura
> Assignee: Koji Kawamura
> Priority: Major
>
> Current AvroTypeUtil conversion logic can throw AvroTypeException when it
> maps double values into Avro decimal logical type fields if the double value
> has less scale than the one defined at the target Avro decimal field schema.
> For example, with following schema: > {code} > { > "type": "record", > "name": "logicalDecimalTest", > "fields": [ > {"name": "id", "type": "int"}, > {"name": "name", "type": "string"}, > { > "name": "price", > "type": { > "type": "bytes", > "logicalType": "decimal", > "precision": 18, > "scale": 8 > }}]} > {code} > And following CSV records: > {code} > id|name|price > 1|one|1.23 > 2|two|2.34 > {code} > Would produce this Exception: > {code} > 2018-02-06 09:57:27,461 ERROR [Timer-Driven Process Thread-7] > o.a.n.processors.standard.ConvertRecord > ConvertRecord[id=6897bc30-0161-1000-a8e7-9ce0ce8eb9ae] Failed to process > StandardFlowFileRecord[uuid=a97366a0-79bb-42ff-9023-c5d62ecfdbc5,claim=StandardContentClaim > [resourceClaim=StandardResourceClaim[id=1517878123416-2, container=default, > section=2], offset=5, length=48],offset=0,name=220105646548465,size=48]; will > route to failure: org.apache.avro.AvroTypeException: Cannot encode decimal > with scale 17 as scale 8 > org.apache.avro.AvroTypeException: Cannot encode decimal with
[GitHub] nifi pull request #2450: NIFI-4844: Adjust BigDecimal scale to the target Av...
GitHub user ijokarumawak opened a pull request:

https://github.com/apache/nifi/pull/2450

NIFI-4844: Adjust BigDecimal scale to the target Avro schema

- Applied the same scale adjustment not only to BigDecimal inputs, but also to Double values.

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ijokarumawak/nifi nifi-4844

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2450.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #2450

commit 342fcfa2b667b73890e2ed24d74d2ae84fb9acc8
Author: Koji Kawamura
Date: 2018-02-06T04:52:48Z

NIFI-4844: Adjust BigDecimal scale to the target Avro schema

- Applied the same scale adjustment not only to BigDecimal inputs, but also to Double values.

---
[jira] [Commented] (NIFI-4540) Support for AWS SQS FIFO message queue
[ https://issues.apache.org/jira/browse/NIFI-4540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353365#comment-16353365 ] James Wing commented on NIFI-4540: -- [~SunSatION], thanks for submitting this patch. I agree FIFO queue support is a feature gap we would love to fill. I did a quick pass on applying it, compiling, running, and testing. I ran into compiler errors for missing imports Collection and SendMessageBatchResult. The code as submitted appears to work OK only with FIFO queues that have Content-Based Deduplication enabled. Otherwise, an exception is thrown about the absence of a deduplication id in the request: {quote}2018-02-06 05:08:30,884 ERROR [Timer-Driven Process Thread-1] o.apache.nifi.processors.aws.sqs.PutSQS PutSQS[id=697edb3a-0161-1000-71aa-22cc71a0e96d] Failed to send messages to Amazon SQS due to com.amazonaws.services.sqs.model.AmazonSQSException: The queue should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided explicitly (Service: AmazonSQS; Status Code: 400; Error Code: InvalidParameterValue; Request ID: 4be71d3c-3abd-5f2e-b8a8-4fcf00332e2b); routing to failure: {} {quote} * Would you be willing to add Deduplication Id? If not, would you please update the processor documentation annotations to note the requirement of a Content-Based Deduplication queue? * Would you be willing to update a unit test to cover the FIFO case? > Support for AWS SQS FIFO message queue > -- > > Key: NIFI-4540 > URL: https://issues.apache.org/jira/browse/NIFI-4540 > Project: Apache NiFi > Issue Type: Improvement > Components: Core Framework >Affects Versions: 1.4.0 >Reporter: Dorian Bugeja >Priority: Major > Labels: easyfix, features, newbie > Attachments: NIFI_4540___Support_for_AWS_SQS_FIFO_message_queue.patch > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (NIFI-4844) AvroRecordSetWriter should be able to convert a double having less scale than intended target Avro schema instead of throwing an AvroTypeException
Koji Kawamura created NIFI-4844:
---

Summary: AvroRecordSetWriter should be able to convert a double having less scale than intended target Avro schema instead of throwing an AvroTypeException
Key: NIFI-4844
URL: https://issues.apache.org/jira/browse/NIFI-4844
Project: Apache NiFi
Issue Type: Bug
Components: Extensions
Affects Versions: 1.3.0
Reporter: Koji Kawamura
Assignee: Koji Kawamura

The current AvroTypeUtil conversion logic can throw an AvroTypeException when it maps double values into Avro decimal logical type fields if the double value has less scale than the one defined in the target Avro decimal field schema.

For example, with the following schema:
{code}
{
  "type": "record",
  "name": "logicalDecimalTest",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {
      "name": "price",
      "type": {
        "type": "bytes",
        "logicalType": "decimal",
        "precision": 18,
        "scale": 8
      }}]}
{code}

And the following CSV records:
{code}
id|name|price
1|one|1.23
2|two|2.34
{code}

ConvertRecord produces this exception:
{code}
2018-02-06 09:57:27,461 ERROR [Timer-Driven Process Thread-7] o.a.n.processors.standard.ConvertRecord ConvertRecord[id=6897bc30-0161-1000-a8e7-9ce0ce8eb9ae] Failed to process StandardFlowFileRecord[uuid=a97366a0-79bb-42ff-9023-c5d62ecfdbc5,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1517878123416-2, container=default, section=2], offset=5, length=48],offset=0,name=220105646548465,size=48]; will route to failure: org.apache.avro.AvroTypeException: Cannot encode decimal with scale 17 as scale 8
org.apache.avro.AvroTypeException: Cannot encode decimal with scale 17 as scale 8
    at org.apache.avro.Conversions$DecimalConversion.toBytes(Conversions.java:86)
    at org.apache.nifi.avro.AvroTypeUtil.convertToAvroObject(AvroTypeUtil.java:546)
    at org.apache.nifi.avro.AvroTypeUtil.createAvroRecord(AvroTypeUtil.java:457)
    at org.apache.nifi.avro.WriteAvroResultWithExternalSchema.writeRecord(WriteAvroResultWithExternalSchema.java:76)
    at org.apache.nifi.serialization.AbstractRecordSetWriter.write(AbstractRecordSetWriter.java:59)
    at org.apache.nifi.processors.standard.AbstractRecordProcessor$1.process(AbstractRecordProcessor.java:122)
    at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:2827)
    at org.apache.nifi.processors.standard.AbstractRecordProcessor.onTrigger(AbstractRecordProcessor.java:109)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1122)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{code}

The same issue is reported in the Avro project, [AVRO-1864|https://issues.apache.org/jira/browse/AVRO-1864]. The recommended approach is to adjust the scale on the NiFi side. For BigDecimal input values NiFi already does this, but not for double values. AvroTypeUtil should apply the same scale adjustment to double values, too.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
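The scale adjustment proposed in NIFI-4844 can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the code from the NiFi patch: the class name `DecimalScaleSketch` and the method `toScaledDecimal` are hypothetical, and the choice of `BigDecimal.valueOf` plus `RoundingMode.HALF_UP` is an assumption made for the example.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class DecimalScaleSketch {

    // Hypothetical helper: converts a double to a BigDecimal whose scale
    // matches the scale declared by the target Avro decimal field.
    public static BigDecimal toScaledDecimal(double value, int targetScale) {
        // BigDecimal.valueOf goes through Double.toString, so 1.23 becomes
        // "1.23" (scale 2) rather than the full binary expansion of the double.
        BigDecimal decimal = BigDecimal.valueOf(value);
        // Padding to a larger scale is exact; the rounding mode only matters
        // when fraction digits have to be dropped.
        return decimal.setScale(targetScale, RoundingMode.HALF_UP);
    }

    public static void main(String[] args) {
        // Values from the CSV example above, fitted to the schema's scale of 8.
        BigDecimal price = toScaledDecimal(1.23, 8);
        System.out.println(price.toPlainString() + " has scale " + price.scale());
    }
}
```

With the scale already matching the schema, a strict encoder that rejects mismatched scales has nothing left to complain about; whether rounding is acceptable when the input has more digits than the schema allows is a separate design decision.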
[jira] [Updated] (NIFI-4837) Thread leak on HandleHTTPRequest processor
[ https://issues.apache.org/jira/browse/NIFI-4837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joseph Percivall updated NIFI-4837: --- Description: When you have multiple HandleHTTPRequest processors trying to listen on the same port, for every Listen attempt NiFi builds a new thread and never recycles the old thread, which eventually leads to NiFi shutting down when reaching the OS limit of the number of threads (default is 10.000). The following error can be seen in nifi-app.log: Caused by: java.lang.OutOfMemoryError: unable to create new native thread at java.lang.Thread.start0(Native Method) at java.lang.Thread.start(Thread.java:714) This has happened before with version 1.2 and probably even with older versions, but I could also replicate the issue with the latest 1.5 version. Steps to replicate the issue: 1) Build a simple flow with 2 HandleHTTPRequest processors listening on the same port. !image-2018-02-02-11-14-51-964.png! 2) Start the processors. The second HandleHTTPRequest processor starts logging the following, as expected: 2018-02-02 16:18:29,518 ERROR [Timer-Driven Process Thread-3] o.a.n.p.standard.HandleHttpRequest HandleHttpRequest[id=af013c62-b26f-1eeb-ae81-8423c70bdc7f] Failed to process session due to org.apache.nifi.processor.exception.ProcessException: Failed to initialize the server: {} org.apache.nifi.processor.exception.ProcessException: Failed to initialize the server Caused by: java.net.BindException: Address already in use ... ... 12 common frames omitted 3) Go to the Summary section in NiFi and watch the number of threads going up to 9959. !image-2018-02-02-11-16-52-389.png! With the above, I had processors scheduled on the primary node only so as not to affect every node. If you stop the second HandleHTTPRequest processor, the threads stop climbing but are not released. After this, NiFi will soon stop. A restart of NiFi is required to release these threads. 
was: When you have multiple HandleHTTPRequest processors trying to listen on the same port, for every Listen attempt NiFi builds a new thread and never recycles the old thread which eventually leads to NiFi shutting down when reaching the OS limit of the number of threads (default is 10.000). The following error can be seen in nifi-app.log: Caused by: java.lang.OutOfMemoryError: unable to create new native thread at java.lang.Thread.start0(Native Method) at java.lang.Thread.start(Thread.java:714) This has happened before with HDF 3.0.1 and probably even with older versions. but I could also replicate the issue with the latest HDF 3.1. Steps to replicate the issue: 1) build a simple flow with 2 HandleHTTPRequest processors listening on the same port. !image-2018-02-02-11-14-51-964.png! 2) Start the processors. — The second HandleHTTPRequest processor starts logging following as expected: 2018-02-02 16:18:29,518 ERROR [Timer-Driven Process Thread-3] o.a.n.p.standard.HandleHttpRequest HandleHttpRequest[id=af013c62-b26f-1eeb-ae81-8423c70bdc7f] Failed to process session due to org.apache.nifi.processor.exception.ProcessException: Failed to initialize the server: {} org.apache.nifi.processor.exception.ProcessException: Failed to initialize the server Caused by: java.net.BindException: Address already in use ... ... 12 common frames omitted 3) Go to the Summary section in NiFi and watch the number of threads going up to 9959. !image-2018-02-02-11-16-52-389.png! With above, I had processors scheduled on primary node only as to not affect every node. If you stop the second HandleHTTPRequest processor the threads stop climbing, but are not released. After this, NiFi will soon stop. A restart of NIFi is required to release these threads. 
> Thread leak on HandleHTTPRequest processor > -- > > Key: NIFI-4837 > URL: https://issues.apache.org/jira/browse/NIFI-4837 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.2.0, 1.3.0, 1.4.0, 1.5.0 > Environment: CENTOS 7 >Reporter: Matthew Clarke >Priority: Blocker > Attachments: image-2018-02-02-11-14-51-964.png, > image-2018-02-02-11-16-52-389.png > > > When you have multiple HandleHTTPRequest processors trying to listen on the > same port, for every Listen attempt NiFi builds a new thread and never > recycles the old thread which eventually leads to NiFi shutting down when > reaching the OS limit of the number of threads (default is 10.000). > The following error can be seen in nifi-app.log: > Caused by: java.lang.OutOfMemoryError: unable to create new native thread > at java.lang.Thread.start0(Native Method) > at java.lang.Thread.start(Thread.java:714) > This has happened before with version 1.2 and probably even with older
[jira] [Commented] (NIFI-4561) ExecuteSQL Stopped Returning FlowFile for non-ResultSet Queries
[ https://issues.apache.org/jira/browse/NIFI-4561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353322#comment-16353322 ] ASF GitHub Bot commented on NIFI-4561: -- Github user patricker commented on the issue: https://github.com/apache/nifi/pull/2243 Updated > ExecuteSQL Stopped Returning FlowFile for non-ResultSet Queries > --- > > Key: NIFI-4561 > URL: https://issues.apache.org/jira/browse/NIFI-4561 > Project: Apache NiFi > Issue Type: Bug >Reporter: Peter Wicks >Assignee: Peter Wicks >Priority: Major > > While most people use ExecuteSQL for Select statements, some JDBC drivers > allow you to execute any kind of statement, including multi-statement > requests. > This allowed users to submit multiple SQL statements in one JDBC Statement > and get back multiple result sets. This was part of the reason I wrote > [NIFI-3432]. > After having NIFI-3432 merged, I found that some request types no longer > cause a FlowFile to be generated because there is no ResultSet. Also, if > request types are mixed, such as an insert followed by a Select, then no > ResultSet is returned because the first result is not a result set but an > Update Count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
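The mixed-result case described in NIFI-4561 (an update count followed by a result set, or the reverse) can be handled with the standard JDBC loop over `Statement.execute`, `getUpdateCount`, and `getMoreResults`. The sketch below is an illustration only and is not taken from the ExecuteSQL patch: the class `ResultWalkerSketch` and the method `countResults` are hypothetical names, and the method merely counts what it sees instead of writing FlowFiles.

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class ResultWalkerSketch {

    // Hypothetical helper: runs the SQL and returns
    // {number of result sets seen, sum of all update counts}.
    public static int[] countResults(Statement st, String sql) throws SQLException {
        int resultSets = 0;
        int updatedRows = 0;

        boolean hasResults = st.execute(sql);
        int updateCount = st.getUpdateCount();

        // Results are exhausted once the current result is neither a
        // ResultSet nor an update count (getUpdateCount() returns -1).
        while (hasResults || updateCount != -1) {
            if (hasResults) {
                // A real caller would read the rows here.
                try (ResultSet rs = st.getResultSet()) {
                    resultSets++;
                }
            } else {
                updatedRows += updateCount;
            }
            // Advance to the next result and re-read both indicators.
            hasResults = st.getMoreResults();
            updateCount = st.getUpdateCount();
        }
        return new int[] {resultSets, updatedRows};
    }
}
```

Checking both indicators on every iteration is what keeps an insert followed by a select from being dropped: the first result is an update count, and the loop continues until a later result set is reached.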
[GitHub] nifi issue #2243: NIFI-4561 ExecuteSQL returns no FlowFile for some queries
Github user patricker commented on the issue: https://github.com/apache/nifi/pull/2243 Updated ---
[GitHub] nifi pull request #2243: NIFI-4561 ExecuteSQL returns no FlowFile for some q...
Github user patricker commented on a diff in the pull request: https://github.com/apache/nifi/pull/2243#discussion_r166180285 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java --- @@ -202,56 +205,63 @@ public void process(InputStream in) throws IOException { st.setQueryTimeout(queryTimeout); // timeout in seconds logger.debug("Executing query {}", new Object[]{selectQuery}); -boolean results = st.execute(selectQuery); +boolean hasResults = st.execute(selectQuery); +boolean hasUpdateCount = st.getUpdateCount() != -1; --- End diff -- I haven't run into any issues with calling `execute` instead of `executeUpdate`. I did some additional research and found the following concerning `execute`: https://stackoverflow.com/a/16625802/328968 > "*Executes the given SQL statement, which may return multiple results. In some (uncommon) situations, a single SQL statement may return multiple result sets and/or update counts. Normally you can ignore this unless you are (1) executing a stored procedure that you know may return multiple results or (2) you are dynamically executing an unknown SQL string.*" Or the answer after that which provides more details on each type of call, `execute`, `executeUpdate`, and `executeQuery`: https://stackoverflow.com/a/37509744/328968. ---
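For readers following the `execute` discussion, here is a minimal sketch of how a caller can walk every result that a single `Statement.execute` call produces, whether it is a result set or an update count. It uses only the standard `java.sql.Statement` methods `execute`, `getResultSet`, `getUpdateCount` and `getMoreResults`. The class and field names (`MultiResultWalker`, `Summary`) are made up for illustration and are not taken from the pull request.

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical helper, not part of NiFi: tallies what one execute() call returned.
final class MultiResultWalker {

    /** Counts of each kind of result produced by one Statement.execute call. */
    static final class Summary {
        int resultSets;
        int updateCounts;
        long rowsAffected;
    }

    static Summary walk(Statement st, String sql) throws SQLException {
        Summary summary = new Summary();

        // execute() returns true when the first result is a ResultSet,
        // false when it is an update count or there are no results.
        boolean hasResultSet = st.execute(sql);
        int updateCount = hasResultSet ? -1 : st.getUpdateCount();

        // There are no more results once the current result is not a
        // ResultSet and getUpdateCount() returns -1.
        while (hasResultSet || updateCount != -1) {
            if (hasResultSet) {
                try (ResultSet rs = st.getResultSet()) {
                    summary.resultSets++;
                    // A real caller would read rows from rs here.
                }
            } else {
                summary.updateCounts++;
                summary.rowsAffected += updateCount;
            }
            hasResultSet = st.getMoreResults();
            updateCount = hasResultSet ? -1 : st.getUpdateCount();
        }
        return summary;
    }
}
```

With a driver that supports multi-statement requests, an insert followed by a select would be expected to show up as one update count and one result set, which is the mixed case described in the issue.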
[GitHub] nifi pull request #2243: NIFI-4561 ExecuteSQL returns no FlowFile for some q...
Github user patricker commented on a diff in the pull request: https://github.com/apache/nifi/pull/2243#discussion_r166179486 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java --- @@ -268,8 +278,23 @@ public void process(OutputStream out) throws IOException { } }); +fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY); session.transfer(fileToProcess, REL_SUCCESS); } +} else if(resultCount == 0){ +//If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only) --- End diff -- Prior to my previous change (https://github.com/apache/nifi/pull/1471) only a single flowfile could be returned. The way the code was written, if no incoming flowfile existed then `session.create()` was called. The result, whether it be a large resultset or no result at all, was written using a `session.write` call. Thus a flowfile would always be returned. ---
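The regression described in this thread can be modelled as a count of output FlowFiles. This is a toy sketch only, assuming the behaviour as stated in the comment above and in the NIFI-4561 description. It does not use the NiFi API, and the class and method names are invented for illustration.

```java
// Hypothetical model, not NiFi code: how many FlowFiles ExecuteSQL emits
// for a statement that produced `resultSetCount` result sets.
final class FlowFileCountModel {

    /** Before the earlier change: a single FlowFile is always written, even with no results. */
    static int beforeEarlierChange(int resultSetCount) {
        return 1;
    }

    /** After the earlier change, as the issue describes it: one FlowFile per result set, so none for zero. */
    static int afterEarlierChange(int resultSetCount) {
        return resultSetCount;
    }

    /** With a fallback like the `resultCount == 0` branch in the diff: never fewer than one FlowFile. */
    static int withFallback(int resultSetCount) {
        return resultSetCount == 0 ? 1 : resultSetCount;
    }
}
```

The point of the model is the zero case: an insert-only statement goes from one FlowFile to none after the earlier change, and back to one with the fallback.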
[GitHub] nifi issue #2231: NIFI-4521 MS SQL CDC Processor
Github user patricker commented on the issue: https://github.com/apache/nifi/pull/2231 @mattyb149 Code updated. ---
[jira] [Commented] (NIFI-4521) MS SQL CDC Processor
[ https://issues.apache.org/jira/browse/NIFI-4521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353262#comment-16353262 ] ASF GitHub Bot commented on NIFI-4521: -- Github user patricker commented on the issue: https://github.com/apache/nifi/pull/2231 @mattyb149 Code updated. > MS SQL CDC Processor > > > Key: NIFI-4521 > URL: https://issues.apache.org/jira/browse/NIFI-4521 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Peter Wicks >Assignee: Peter Wicks >Priority: Major > > Creation of a new processor that reads Change Data Capture details from > Microsoft SQL Server and outputs the changes as Records. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi pull request #2231: NIFI-4521 MS SQL CDC Processor
Github user patricker commented on a diff in the pull request: https://github.com/apache/nifi/pull/2231#discussion_r166174215 --- Diff: nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java --- @@ -175,10 +175,12 @@ private static DataType getDataType(final int sqlType, final ResultSet rs, final return RecordFieldType.RECORD.getDataType(); } -final String columnName = rs.getMetaData().getColumnName(columnIndex); --- End diff -- @markap14 In https://github.com/apache/nifi/pull/2386 you added a `readerSchema` to the `ResultSetRecordSet` constructor. I was working with this class and do not need this functionality. I've updated the code so that a null `readerSchema` can be passed, as in my case there is no record reader, just a record writer. Let me know if you have any concerns. I ran the unit tests for `QueryRecord` and they all ran without failure. ---
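As a rough illustration of the null-tolerant lookup described above, the sketch below falls back to a type derived from the SQL metadata whenever no reader schema is supplied. It is an assumption-laden stand-in: a plain `Map<String, String>` plays the role of the reader schema, and `NullableSchemaLookup` and `resolveType` are hypothetical names, not the actual `ResultSetRecordSet` code.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in, not the NiFi implementation.
final class NullableSchemaLookup {

    /**
     * Returns the type declared for the column in the reader schema, or the
     * SQL-derived type when the schema is null or has no entry for the column.
     */
    static String resolveType(String columnName,
                              Map<String, String> readerSchema,
                              String sqlDerivedType) {
        return Optional.ofNullable(readerSchema)        // tolerate a null schema
                .map(schema -> schema.get(columnName))  // empty if the column is absent
                .orElse(sqlDerivedType);                // fall back to JDBC metadata
    }
}
```

A caller that has only a record writer, as in the CDC processor case, would pass `null` for the schema and always get the SQL-derived type.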
[GitHub] nifi pull request #2231: NIFI-4521 MS SQL CDC Processor
Github user patricker commented on a diff in the pull request: https://github.com/apache/nifi/pull/2231#discussion_r166171052 --- Diff: nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/test/java/org/apache/nifi/cdc/mssql/CaptureChangeMSSQLTest.java --- @@ -0,0 +1,792 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cdc.mssql; + +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.cdc.event.ColumnDefinition; +import org.apache.nifi.cdc.mssql.event.MSSQLColumnDefinition; +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo; +import org.apache.nifi.cdc.mssql.processors.CaptureChangeMSSQL; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.util.file.FileUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.SQLNonTransientConnectionException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class CaptureChangeMSSQLTest { + +private TestRunner runner; +private MockCaptureChangeMSSQL processor; +private final static String DB_LOCATION = "target/db_qdt"; + + +@BeforeClass +public static void setupBeforeClass() throws IOException, SQLException { +System.setProperty("derby.stream.error.file", "target/derby.log"); + +// remove previous test database, if any +final File dbLocation = new File(DB_LOCATION); +try { +FileUtils.deleteFile(dbLocation, true); +} catch (IOException ioe) { +// Do nothing, may not have existed +} + +// load CDC schema 
to database +final DBCPService dbcp = new DBCPServiceSimpleImpl(); +final Connection con = dbcp.getConnection(); +Statement stmt = con.createStatement(); + +stmt.execute("CREATE TABLE cdc.change_tables(\n" + +"object_id int,\n" + +//These four columns are computed from object_id/source_object_id in MS SQL, but for testing we put them as strings +"schemaName varchar(128),\n" + +"tableName varchar(128),\n" + +"sourceSchemaName varchar(128),\n" + +"sourceTableName varchar(128),\n" + + +"version int,\n" + +"capture_instance varchar(128),\n" + +"start_lsn int,\n" + +"end_lsn int,\n" + +"supports_net_changes BOOLEAN,\n" + +"has_drop_pending BOOLEAN,\n" + +"role_name varchar(128),\n" + +"index_name varchar(128),\n" + +"filegroup_name varchar(128),\n" + +"create_date TIMESTAMP,\n" + +"partition_switch BOOLEAN)"); + +stmt.execute("CREATE TABLE cdc.lsn_time_mapping(\n" + +"start_lsn int,\n" + +"tran_begin_time TIMESTAMP,\n" + +"tran_end_time TIMESTAMP,\n" + +"tran_id
[jira] [Commented] (NIFI-4521) MS SQL CDC Processor
[ https://issues.apache.org/jira/browse/NIFI-4521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353231#comment-16353231 ] ASF GitHub Bot commented on NIFI-4521: -- Github user patricker commented on a diff in the pull request: https://github.com/apache/nifi/pull/2231#discussion_r166170695 --- Diff: nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java --- @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cdc.mssql.processors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.cdc.CDCException; +import org.apache.nifi.cdc.mssql.MSSQLCDCUtils; +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo; +import org.apache.nifi.cdc.mssql.event.TableCapturePlan; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import 
org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.ResultSetRecordSet; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +@TriggerSerially +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"sql", "jdbc", "cdc", "mssql"}) +@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a Microsoft SQL database. CDC Events include INSERT, UPDATE, DELETE operations. Events " ++ "for each table are output as Record Sets, ordered by the time, and sequence, at which the operation occurred.") +@Stateful(scopes = Scope.CLUSTER, description = "Information including the timestamp of the last CDC event per table in the database is
[GitHub] nifi pull request #2231: NIFI-4521 MS SQL CDC Processor
Github user patricker commented on a diff in the pull request: https://github.com/apache/nifi/pull/2231#discussion_r166169705 --- Diff: nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java --- @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cdc.mssql.processors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.cdc.CDCException; +import org.apache.nifi.cdc.mssql.MSSQLCDCUtils; +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo; +import org.apache.nifi.cdc.mssql.event.TableCapturePlan; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import 
org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.ResultSetRecordSet; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +@TriggerSerially +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"sql", "jdbc", "cdc", "mssql"}) +@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a Microsoft SQL database. CDC Events include INSERT, UPDATE, DELETE operations. Events " ++ "for each table are output as Record Sets, ordered by the time, and sequence, at which the operation occurred.") +@Stateful(scopes = Scope.CLUSTER, description = "Information including the timestamp of the last CDC event per table in the database is stored by this processor, so " ++ "that it can continue from the same point in time if restarted.") +@WritesAttributes({ +@WritesAttribute(attribute = "tablename", description="Name of the table this
[jira] [Commented] (NIFI-4521) MS SQL CDC Processor
[ https://issues.apache.org/jira/browse/NIFI-4521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353224#comment-16353224 ] ASF GitHub Bot commented on NIFI-4521: -- Github user patricker commented on a diff in the pull request: https://github.com/apache/nifi/pull/2231#discussion_r166169349 --- Diff: nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/MSSQLCDCUtils.java --- @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cdc.mssql; + +import org.apache.nifi.cdc.CDCException; +import org.apache.nifi.cdc.event.ColumnDefinition; +import org.apache.nifi.cdc.mssql.event.MSSQLColumnDefinition; +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo; + +import java.sql.Connection; +import java.sql.Timestamp; +import java.sql.Types; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; + +public class MSSQLCDCUtils { +private static final String _columnSplit = "\n,"; + +final String LIST_CHANGE_TRACKING_TABLES_SQL = "SELECT object_id,\n" + +" DB_NAME() AS [databaseName], \n" + +" SCHEMA_NAME(OBJECTPROPERTY(object_id, 'SchemaId')) AS [schemaName], \n" + +" OBJECT_NAME(object_id) AS [tableName], \n" + +" SCHEMA_NAME(OBJECTPROPERTY(source_object_id, 'SchemaId')) AS [sourceSchemaName],\n" + +" OBJECT_NAME(source_object_id) AS [sourceTableName] \n" + +"FROM [cdc].[change_tables]"; + +final String LIST_TABLE_COLUMNS = "select cc.object_id\n" + +",cc.column_name\n" + +",cc.column_id\n" + +",cc.column_type\n" + +",cc.column_ordinal\n" + +",CASE WHEN ic.object_id IS NULL THEN 0 ELSE 1 END \"key\"\n" + +"FROM cdc.captured_columns cc\n" + +"LEFT OUTER JOIN cdc.index_columns ic ON \n" + +"(ic.object_id = cc.object_id AND ic.column_name = cc.column_name)\n" + +"where cc.object_id=?\n" + +"ORDER BY cc.column_ordinal"; + +public String getLIST_CHANGE_TRACKING_TABLES_SQL(){ +return LIST_CHANGE_TRACKING_TABLES_SQL; +} + +public String getLIST_TABLE_COLUMNS(){ +return LIST_TABLE_COLUMNS; +} + +public String getCURRENT_TIMESTAMP(){ +return "CURRENT_TIMESTAMP"; +} + +public List getCDCTableList(Connection con) throws SQLException, CDCException { +ArrayList cdcTables = new ArrayList<>(); + +try(final Statement st = con.createStatement()){ +final ResultSet resultSet = st.executeQuery(getLIST_CHANGE_TRACKING_TABLES_SQL()); + +while (resultSet.next()) { +int 
objectId = resultSet.getInt("object_id"); +String databaseName = resultSet.getString("databaseName"); +String schemaName = resultSet.getString("schemaName"); +String tableName = resultSet.getString("tableName"); +String sourceSchemaName = resultSet.getString("sourceSchemaName"); +String sourceTableName = resultSet.getString("sourceTableName"); + +MSSQLTableInfo ti = new MSSQLTableInfo(databaseName, schemaName, tableName, sourceSchemaName, sourceTableName, Integer.toUnsignedLong(objectId), null); +cdcTables.add(ti); +} + +for (MSSQLTableInfo ti:cdcTables) { +List tableColums = getCaptureColumns(con, ti.getTableId()); + +ti.setColumns(tableColums); +} +} + +return cdcTables; +} + +public List getCaptureColumns(Connection
[jira] [Commented] (NIFI-4521) MS SQL CDC Processor
[ https://issues.apache.org/jira/browse/NIFI-4521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353225#comment-16353225 ] ASF GitHub Bot commented on NIFI-4521: -- Github user patricker commented on a diff in the pull request: https://github.com/apache/nifi/pull/2231#discussion_r166169358 --- Diff: nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/event/TableCapturePlan.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cdc.mssql.event; + +import org.apache.nifi.cdc.mssql.MSSQLCDCUtils; +import org.apache.nifi.util.StringUtils; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; + +public class TableCapturePlan { + +public enum PlanTypes{ +CDC, +SNAPSHOT +} + +public MSSQLTableInfo getTable() { +return table; +} + +public int getRowLimit() { +return rowLimit; +} + +public boolean getCaptureBaseline() { +return captureBaseline; +} + +public Timestamp getMaxTime() { +return maxTime; +} + +public PlanTypes getPlanType() { +return planType; +} + +private MSSQLTableInfo table; +private int rowLimit; +private boolean captureBaseline; +private Timestamp maxTime; + +private PlanTypes planType; + +public TableCapturePlan(MSSQLTableInfo table, int rowLimit, boolean captureBaseline, String sTime){ +this.table = table; + +this.rowLimit = rowLimit; +this.captureBaseline = captureBaseline; + +if(!StringUtils.isEmpty(sTime)){ +this.maxTime = Timestamp.valueOf(sTime); +} +} + +public void ComputeCapturePlan(Connection con, MSSQLCDCUtils mssqlcdcUtils) throws SQLException { --- End diff -- Agreed. > MS SQL CDC Processor > > > Key: NIFI-4521 > URL: https://issues.apache.org/jira/browse/NIFI-4521 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Peter Wicks >Assignee: Peter Wicks >Priority: Major > > Creation of a new processor that reads Change Data Capture details from > Microsoft SQL Server and outputs the changes a Records. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi pull request #2231: NIFI-4521 MS SQL CDC Processor
Github user patricker commented on a diff in the pull request: https://github.com/apache/nifi/pull/2231#discussion_r166169349 --- Diff: nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/MSSQLCDCUtils.java --- @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cdc.mssql; + +import org.apache.nifi.cdc.CDCException; +import org.apache.nifi.cdc.event.ColumnDefinition; +import org.apache.nifi.cdc.mssql.event.MSSQLColumnDefinition; +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo; + +import java.sql.Connection; +import java.sql.Timestamp; +import java.sql.Types; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; + +public class MSSQLCDCUtils { +private static final String _columnSplit = "\n,"; + +final String LIST_CHANGE_TRACKING_TABLES_SQL = "SELECT object_id,\n" + +" DB_NAME() AS [databaseName], \n" + +" SCHEMA_NAME(OBJECTPROPERTY(object_id, 'SchemaId')) AS [schemaName], \n" + +" OBJECT_NAME(object_id) AS [tableName], \n" + +" SCHEMA_NAME(OBJECTPROPERTY(source_object_id, 'SchemaId')) AS [sourceSchemaName],\n" + +" OBJECT_NAME(source_object_id) AS [sourceTableName] \n" + +"FROM [cdc].[change_tables]"; + +final String LIST_TABLE_COLUMNS = "select cc.object_id\n" + +",cc.column_name\n" + +",cc.column_id\n" + +",cc.column_type\n" + +",cc.column_ordinal\n" + +",CASE WHEN ic.object_id IS NULL THEN 0 ELSE 1 END \"key\"\n" + +"FROM cdc.captured_columns cc\n" + +"LEFT OUTER JOIN cdc.index_columns ic ON \n" + +"(ic.object_id = cc.object_id AND ic.column_name = cc.column_name)\n" + +"where cc.object_id=?\n" + +"ORDER BY cc.column_ordinal"; + +public String getLIST_CHANGE_TRACKING_TABLES_SQL(){ +return LIST_CHANGE_TRACKING_TABLES_SQL; +} + +public String getLIST_TABLE_COLUMNS(){ +return LIST_TABLE_COLUMNS; +} + +public String getCURRENT_TIMESTAMP(){ +return "CURRENT_TIMESTAMP"; +} + +public List getCDCTableList(Connection con) throws SQLException, CDCException { +ArrayList cdcTables = new ArrayList<>(); + +try(final Statement st = con.createStatement()){ +final ResultSet resultSet = st.executeQuery(getLIST_CHANGE_TRACKING_TABLES_SQL()); + +while (resultSet.next()) { +int 
objectId = resultSet.getInt("object_id"); +String databaseName = resultSet.getString("databaseName"); +String schemaName = resultSet.getString("schemaName"); +String tableName = resultSet.getString("tableName"); +String sourceSchemaName = resultSet.getString("sourceSchemaName"); +String sourceTableName = resultSet.getString("sourceTableName"); + +MSSQLTableInfo ti = new MSSQLTableInfo(databaseName, schemaName, tableName, sourceSchemaName, sourceTableName, Integer.toUnsignedLong(objectId), null); +cdcTables.add(ti); +} + +for (MSSQLTableInfo ti:cdcTables) { +List tableColums = getCaptureColumns(con, ti.getTableId()); + +ti.setColumns(tableColums); +} +} + +return cdcTables; +} + +public List getCaptureColumns(Connection con, long objectId) throws SQLException, CDCException { +ArrayList tableColumns = new ArrayList<>(); +try(final PreparedStatement st = con.prepareStatement(getLIST_TABLE_COLUMNS())){ +
[jira] [Commented] (NIFI-4521) MS SQL CDC Processor
[ https://issues.apache.org/jira/browse/NIFI-4521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353220#comment-16353220 ] ASF GitHub Bot commented on NIFI-4521: -- Github user patricker commented on a diff in the pull request: https://github.com/apache/nifi/pull/2231#discussion_r166168965 --- Diff: nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/MSSQLCDCUtils.java --- @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mssql; + +import org.apache.nifi.cdc.CDCException; +import org.apache.nifi.cdc.event.ColumnDefinition; +import org.apache.nifi.cdc.mssql.event.MSSQLColumnDefinition; +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo; + +import java.sql.Connection; +import java.sql.Timestamp; +import java.sql.Types; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; + +public class MSSQLCDCUtils { --- End diff -- Yes and no. The main code would be fine with just static methods, but the unit test needs to override certain methods. 
(ref `MockCaptureChangeMSSQL` in `CaptureChangeMSSQLTest`) > MS SQL CDC Processor > > > Key: NIFI-4521 > URL: https://issues.apache.org/jira/browse/NIFI-4521 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Peter Wicks >Assignee: Peter Wicks >Priority: Major > > Creation of a new processor that reads Change Data Capture details from > Microsoft SQL Server and outputs the changes a Records. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
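The trade-off in the comment above (static helpers versus methods a test can override) can be sketched roughly as follows. This is only an illustration, not the code from the pull request: CdcSqlProvider, TestCdcSqlProvider, getCurrentTimestampExpression and buildMaxTimeQuery are hypothetical names, and the replacement expression is made up.

```java
// Hypothetical stand-in for a utility class such as MSSQLCDCUtils; all names are illustrative.
class CdcSqlProvider {
    // Instance method rather than static, so a subclass can replace it.
    public String getCurrentTimestampExpression() {
        return "CURRENT_TIMESTAMP";
    }

    // Hypothetical caller that always goes through the overridable method.
    public String buildMaxTimeQuery() {
        return "SELECT " + getCurrentTimestampExpression();
    }
}

// Hypothetical test double, e.g. for an in-memory database that needs a different expression.
class TestCdcSqlProvider extends CdcSqlProvider {
    @Override
    public String getCurrentTimestampExpression() {
        return "NOW()";
    }
}
```

With static methods the subclass could only hide the method, and buildMaxTimeQuery would keep calling the original, which is presumably why the utility class is instantiated and passed around.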
[GitHub] nifi-minifi-cpp pull request #259: MINIFICPP-393: Add security support for M...
Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/259#discussion_r166148325 --- Diff: extensions/mqtt/ConsumeMQTT.cpp --- @@ -35,7 +35,7 @@ namespace nifi { namespace minifi { namespace processors { -core::Property ConsumeMQTT::MaxQueueSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", ""); +core::Property ConsumeMQTT::MaxQueueSize("Max Queue Size", "Maximum receive queue size for the MQTT record", ""); --- End diff -- I see your point that the variable name doesn't match the option name ("Max Flow Segment Size" vs. "Max Queue Size"), but if a user is already setting "Max Flow Segment Size", renaming the option changes the semantics of what they asked for. Does that make sense? We should keep that option available to them until we can deprecate it. Further, "receive queue size" does not say whether the size is counted in flow files or in bytes. That should be clarified. ---
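One possible reading of "keep the old option available until it can be deprecated" is a lookup that prefers the new property name and falls back to the old one. The sketch below is in Java for consistency with the rest of this digest, although the code under review is C++; PropertyAliases and resolveMaxQueueSize are hypothetical, and only the two property name strings come from the diff.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper; not part of MiNiFi C++ or NiFi.
final class PropertyAliases {
    static final String NAME = "Max Queue Size";
    static final String DEPRECATED_NAME = "Max Flow Segment Size";

    // Prefer the new name; fall back to the deprecated one so existing configs keep working.
    static Optional<String> resolveMaxQueueSize(Map<String, String> configured) {
        String value = configured.get(NAME);
        if (value == null) {
            value = configured.get(DEPRECATED_NAME);
        }
        return Optional.ofNullable(value);
    }
}
```

Whether a fallback like this is appropriate depends on whether the old name was ever honoured with the same meaning, which is exactly the point under discussion in this thread.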
[jira] [Commented] (NIFI-4838) Make GetMongo support multiple commits and give some progress indication
[ https://issues.apache.org/jira/browse/NIFI-4838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353087#comment-16353087 ] ASF GitHub Bot commented on NIFI-4838: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2448#discussion_r166145010 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java --- @@ -221,22 +258,33 @@ private void configureMapper(String setting) { } } -private ObjectWriter getObjectWriter(ObjectMapper mapper, String ppSetting) { -return ppSetting.equals(YES_PP.getValue()) ? mapper.writerWithDefaultPrettyPrinter() +private ObjectWriter getObjectWriter(ObjectMapper mapper, boolean ppSetting) { +return ppSetting ? mapper.writerWithDefaultPrettyPrinter() : mapper.writer(); } -private void writeBatch(String payload, ProcessContext context, ProcessSession session) { +private void writeBatch(String payload, ProcessContext context, ProcessSession session, boolean doCommit, Long count, long index, int batchSize) { FlowFile flowFile = session.create(); flowFile = session.write(flowFile, new OutputStreamCallback() { @Override public void process(OutputStream out) throws IOException { out.write(payload.getBytes("UTF-8")); } }); -flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json"); +Map<String, String> attrs = new HashMap<>(); +attrs.put(CoreAttributes.MIME_TYPE.key(), "application/json"); +if (count != null) { +attrs.put(PROGRESS_START, String.valueOf(index - batchSize)); +attrs.put(PROGRESS_END, String.valueOf(index)); +attrs.put(PROGRESS_ESTIMATE, String.valueOf(count)); +} +flowFile = session.putAllAttributes(flowFile, attrs); session.getProvenanceReporter().receive(flowFile, getURI(context)); + session.transfer(flowFile, REL_SUCCESS); +if (doCommit) { +session.commit(); +} --- End diff -- The commit has two goals: 1. Make progressive commits a configurable option. 2. 
Enable people to run a count() on the collection and put progress markers on each flowfile. > the current version also transfers a flowfile for every batch, right? It can do 1 result/flowfile or batches of results in one flowfile. The TL;DR version of why I did this is I have a client who sometimes pulls down 100GB+ at once from Mongo and without these features would have to check the size of the content repository to get an approximation of what percentage has been downloaded. > Make GetMongo support multiple commits and give some progress indication > > > Key: NIFI-4838 > URL: https://issues.apache.org/jira/browse/NIFI-4838 > Project: Apache NiFi > Issue Type: Improvement >Reporter: Mike Thomsen >Assignee: Mike Thomsen >Priority: Major > > It shouldn't wait until the end to do a commit() call because the effect is > that GetMongo looks like it has hung to a user who is pulling a very large > data set. > It should also have an option for running a count query to get the current > approximate count of documents that would match the query and append an > attribute that indicates where a flowfile stands in the total result count. > Ex: > query.progress.point.start = 2500 > query.progress.point.end = 5000 > query.count.estimate = 17,568,231 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
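The progress markers described above can be illustrated in isolation. The following is a minimal sketch of the attribute arithmetic only, assuming the attribute names from the example in the issue description; the actual values of the PROGRESS_* constants in the pull request are not shown in this digest, and ProgressAttributes.build is a hypothetical helper, not NiFi API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper mirroring the attribute logic in the writeBatch diff above.
final class ProgressAttributes {
    // Assumed names, taken from the example in the NIFI-4838 description.
    static final String PROGRESS_START = "query.progress.point.start";
    static final String PROGRESS_END = "query.progress.point.end";
    static final String PROGRESS_ESTIMATE = "query.count.estimate";

    static Map<String, String> build(Long count, long index, int batchSize) {
        Map<String, String> attrs = new HashMap<>();
        attrs.put("mime.type", "application/json");
        // Progress markers are only added when a count estimate was requested.
        if (count != null) {
            attrs.put(PROGRESS_START, String.valueOf(index - batchSize));
            attrs.put(PROGRESS_END, String.valueOf(index));
            attrs.put(PROGRESS_ESTIMATE, String.valueOf(count));
        }
        return attrs;
    }
}
```

Calling build(17568231L, 5000, 2500) yields start 2500, end 5000 and estimate 17568231, matching the example in the issue. Note that start is computed as index minus batchSize, so a final batch smaller than batchSize would report a start that is too low unless the caller adjusts for it.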
[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2180#discussion_r166141568 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java --- @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.mongodb; + +import com.mongodb.BasicDBObject; +import com.mongodb.client.AggregateIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.bson.conversions.Bson; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Tags({"mongo", "aggregation", "aggregate"}) +@CapabilityDescription("A processor that runs an aggregation query whenever a flowfile is received.") +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +public class RunMongoAggregation extends AbstractMongoProcessor { + +private final static Set relationships; +private final static List propertyDescriptors; + +static final Relationship REL_ORIGINAL = new Relationship.Builder() +.description("The input flowfile gets sent to this relationship when the query succeeds.") +.name("original") +.build(); +static final Relationship REL_FAILURE = new Relationship.Builder() +.description("The input flowfile gets sent to this relationship when the query fails.") +.name("failure") +.build(); +static final Relationship REL_RESULTS = new 
Relationship.Builder() +.description("The result set of the aggregation will be sent to this relationship.") +.name("results") +.build(); + +static final List buildAggregationQuery(String query) throws IOException { +List result = new ArrayList<>(); + +ObjectMapper mapper = new ObjectMapper(); +List values = mapper.readValue(query, List.class); +for (Map val : values) { +result.add(new BasicDBObject(val)); +} + +return result; +} + +public static final Validator AGG_VALIDATOR = (subject, value, context) -> { +final ValidationResult.Builder builder = new ValidationResult.Builder(); +builder.subject(subject).input(value); + +if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { +return builder.valid(true).explanation("Contains Expression Language").build(); +} + +String reason = null; +try { +buildAggregationQuery(value); +} catch (final RuntimeException | IOException e) { +reason = e.getLocalizedMessage(); +} + +return builder.explanation(reason).valid(reason == null).build(); +}; + +static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +
[jira] [Updated] (NIFI-4700) PostHTTP: close client
[ https://issues.apache.org/jira/browse/NIFI-4700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Hogue updated NIFI-4700: Status: Open (was: Patch Available) > PostHTTP: close client > -- > > Key: NIFI-4700 > URL: https://issues.apache.org/jira/browse/NIFI-4700 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.4.0 >Reporter: Brandon DeVries >Assignee: Michael Hogue >Priority: Major > Fix For: 1.6.0 > > > In PostHTTP, the CloseableHttpClient never actually appears to be closed... > Additionally, we could leverage CloseableHttpResponse to close responses. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
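For readers unfamiliar with the pattern the issue description alludes to, closing both the client and each response is usually done with try-with-resources. The sketch below uses a hypothetical Resource class as a stand-in for CloseableHttpClient and CloseableHttpResponse so that it runs without Apache HttpClient on the classpath; it shows only the closing order and is not the change proposed in PR #2434.

```java
import java.util.ArrayList;
import java.util.List;

final class CloseOrderDemo {
    // Hypothetical stand-in for a closeable HTTP client or response.
    static final class Resource implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Resource(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            log.add("closed " + name);
        }
    }

    // Returns the sequence of events recorded while the resources are in use.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Resources are closed in reverse declaration order: response first, then client.
        try (Resource client = new Resource("client", log);
             Resource response = new Resource("response", log)) {
            log.add("used " + response.name);
        }
        return log;
    }
}
```

A long-lived processor would more likely keep one client for its lifetime and close it on shutdown, while closing each response right after use; the sketch only demonstrates the language mechanism.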
[jira] [Commented] (NIFI-4700) PostHTTP: close client
[ https://issues.apache.org/jira/browse/NIFI-4700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353042#comment-16353042 ] Michael Hogue commented on NIFI-4700: - No problem. It's closed. > PostHTTP: close client > -- > > Key: NIFI-4700 > URL: https://issues.apache.org/jira/browse/NIFI-4700 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.4.0 >Reporter: Brandon DeVries >Assignee: Michael Hogue >Priority: Major > > In PostHTTP, the CloseableHttpClient never actually appears to be closed... > Additionally, we could leverage CloseableHttpResponse to close responses. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (NIFI-4700) PostHTTP: close client
[ https://issues.apache.org/jira/browse/NIFI-4700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Hogue updated NIFI-4700: Fix Version/s: (was: 1.6.0) > PostHTTP: close client > -- > > Key: NIFI-4700 > URL: https://issues.apache.org/jira/browse/NIFI-4700 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.4.0 >Reporter: Brandon DeVries >Assignee: Michael Hogue >Priority: Major > > In PostHTTP, the CloseableHttpClient never actually appears to be closed... > Additionally, we could leverage CloseableHttpResponse to close responses. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (NIFI-4700) PostHTTP: close client
[ https://issues.apache.org/jira/browse/NIFI-4700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Hogue resolved NIFI-4700. - Resolution: Won't Fix > PostHTTP: close client > -- > > Key: NIFI-4700 > URL: https://issues.apache.org/jira/browse/NIFI-4700 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.4.0 >Reporter: Brandon DeVries >Assignee: Michael Hogue >Priority: Major > > In PostHTTP, the CloseableHttpClient never actually appears to be closed... > Additionally, we could leverage CloseableHttpResponse to close responses. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4700) PostHTTP: close client
[ https://issues.apache.org/jira/browse/NIFI-4700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353038#comment-16353038 ] Michael Moser commented on NIFI-4700: - OK, thanks. [~m-hogue] if you also agree, do you mind closing PR [https://github.com/apache/nifi/pull/2434]? > PostHTTP: close client > -- > > Key: NIFI-4700 > URL: https://issues.apache.org/jira/browse/NIFI-4700 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.4.0 >Reporter: Brandon DeVries >Assignee: Michael Hogue >Priority: Major > Fix For: 1.6.0 > > > In PostHTTP, the CloseableHttpClient never actually appears to be closed... > Additionally, we could leverage CloseableHttpResponse to close responses. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4700) PostHTTP: close client
[ https://issues.apache.org/jira/browse/NIFI-4700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353040#comment-16353040 ] ASF GitHub Bot commented on NIFI-4700: -- Github user m-hogue closed the pull request at: https://github.com/apache/nifi/pull/2434 > PostHTTP: close client > -- > > Key: NIFI-4700 > URL: https://issues.apache.org/jira/browse/NIFI-4700 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.4.0 >Reporter: Brandon DeVries >Assignee: Michael Hogue >Priority: Major > Fix For: 1.6.0 > > > In PostHTTP, the CloseableHttpClient never actually appears to be closed... > Additionally, we could leverage CloseableHttpResponse to close responses. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi pull request #2434: NIFI-4700: Moved all PostHTTP http clients, http re...
Github user m-hogue closed the pull request at: https://github.com/apache/nifi/pull/2434 ---
[jira] [Commented] (NIFI-4700) PostHTTP: close client
[ https://issues.apache.org/jira/browse/NIFI-4700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353033#comment-16353033 ] Brandon DeVries commented on NIFI-4700: --- +1, close it. > PostHTTP: close client > -- > > Key: NIFI-4700 > URL: https://issues.apache.org/jira/browse/NIFI-4700 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.4.0 >Reporter: Brandon DeVries >Assignee: Michael Hogue >Priority: Major > Fix For: 1.6.0 > > > In PostHTTP, the CloseableHttpClient never actually appears to be closed... > Additionally, we could leverage CloseableHttpResponse to close responses. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (MINIFICPP-393) add security support for MQTT
[ https://issues.apache.org/jira/browse/MINIFICPP-393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353013#comment-16353013 ] ASF GitHub Bot commented on MINIFICPP-393: -- Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/259#discussion_r166130342 --- Diff: extensions/mqtt/ConsumeMQTT.cpp --- @@ -35,7 +35,7 @@ namespace nifi { namespace minifi { namespace processors { -core::Property ConsumeMQTT::MaxQueueSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", ""); +core::Property ConsumeMQTT::MaxQueueSize("Max Queue Size", "Maximum receive queue size for the MQTT record", ""); --- End diff -- The previous version of the parameter configuration was wrong, so this change is not backward compatible. > add security support for MQTT > - > > Key: MINIFICPP-393 > URL: https://issues.apache.org/jira/browse/MINIFICPP-393 > Project: NiFi MiNiFi C++ > Issue Type: Improvement >Affects Versions: 1.0.0 >Reporter: bqiu >Assignee: bqiu >Priority: Minor > Fix For: 1.0.0 > > > add security support for MQTT -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi-minifi-cpp pull request #259: MINIFICPP-393: Add security support for M...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/259#discussion_r166130342 --- Diff: extensions/mqtt/ConsumeMQTT.cpp --- @@ -35,7 +35,7 @@ namespace nifi { namespace minifi { namespace processors { -core::Property ConsumeMQTT::MaxQueueSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", ""); +core::Property ConsumeMQTT::MaxQueueSize("Max Queue Size", "Maximum receive queue size for the MQTT record", ""); --- End diff -- The previous version of the parameter configuration was wrong, so this change is not backward compatible. ---
[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2180#discussion_r166111534 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java --- @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.mongodb; + +import com.mongodb.BasicDBObject; +import com.mongodb.client.AggregateIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.bson.conversions.Bson; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Tags({"mongo", "aggregation", "aggregate"}) +@CapabilityDescription("A processor that runs an aggregation query whenever a flowfile is received.") +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +public class RunMongoAggregation extends AbstractMongoProcessor { + +private final static Set relationships; +private final static List propertyDescriptors; + +static final Relationship REL_ORIGINAL = new Relationship.Builder() +.description("The input flowfile gets sent to this relationship when the query succeeds.") +.name("original") +.build(); +static final Relationship REL_FAILURE = new Relationship.Builder() +.description("The input flowfile gets sent to this relationship when the query fails.") +.name("failure") +.build(); +static final Relationship REL_RESULTS = new 
Relationship.Builder() +.description("The result set of the aggregation will be sent to this relationship.") +.name("results") +.build(); + +static final List buildAggregationQuery(String query) throws IOException { +List result = new ArrayList<>(); + +ObjectMapper mapper = new ObjectMapper(); +List values = mapper.readValue(query, List.class); +for (Map val : values) { +result.add(new BasicDBObject(val)); +} + +return result; +} + +public static final Validator AGG_VALIDATOR = (subject, value, context) -> { +final ValidationResult.Builder builder = new ValidationResult.Builder(); +builder.subject(subject).input(value); + +if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { +return builder.valid(true).explanation("Contains Expression Language").build(); +} + +String reason = null; +try { +buildAggregationQuery(value); +} catch (final RuntimeException | IOException e) { +reason = e.getLocalizedMessage(); +} + +return builder.explanation(reason).valid(reason == null).build(); +}; + +static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +
[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2180#discussion_r166111208 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java --- @@ -95,13 +101,50 @@ public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder() .name("Write Concern") +.displayName("Write Concern") .description("The write concern to use") .required(true) .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED, WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY) .defaultValue(WRITE_CONCERN_ACKNOWLEDGED) .build(); +static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder() +.name("results-per-flowfile") +.displayName("Results Per FlowFile") +.description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("1") +.build(); + +static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() +.name("Batch Size") +.displayName("Batch Size") +.description("The number of elements returned from the server in one batch.") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("100") +.build(); + +static final PropertyDescriptor QUERY_ATTRIBUTE = new PropertyDescriptor.Builder() +.name("mongo-agg-query-attribute") --- End diff -- Done. ---
[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2180#discussion_r16666 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java --- @@ -152,21 +146,25 @@ public ValidationResult validate(final String subject, final String value, final .displayName("JSON Type") .name("json-type") .description("By default, MongoDB's Java driver returns \"extended JSON\". Some of the features of this variant of JSON" + -" may cause problems for other JSON parsers that expect only standard JSON types and conventions. This configuration setting " + -" controls whether to use extended JSON or provide a clean view that conforms to standard JSON.") +" may cause problems for other JSON parsers that expect only standard JSON types and conventions. This configuration setting " + +" controls whether to use extended JSON or provide a clean view that conforms to standard JSON.") .expressionLanguageSupported(false) .required(true) .build(); private final static Set relationships; private final static List propertyDescriptors; +static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build(); + static { List _propertyDescriptors = new ArrayList<>(); _propertyDescriptors.addAll(descriptors); _propertyDescriptors.add(JSON_TYPE); _propertyDescriptors.add(USE_PRETTY_PRINTING); +_propertyDescriptors.add(CHARSET); _propertyDescriptors.add(QUERY); +_propertyDescriptors.add(QUERY_ATTRIBUTE); --- End diff -- Added it. ---
[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2180#discussion_r166111239 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java --- @@ -95,13 +101,50 @@ public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder() .name("Write Concern") +.displayName("Write Concern") .description("The write concern to use") .required(true) .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED, WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY) .defaultValue(WRITE_CONCERN_ACKNOWLEDGED) .build(); +static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder() +.name("results-per-flowfile") +.displayName("Results Per FlowFile") +.description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("1") +.build(); + +static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() +.name("Batch Size") +.displayName("Batch Size") +.description("The number of elements returned from the server in one batch.") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("100") +.build(); + +static final PropertyDescriptor QUERY_ATTRIBUTE = new PropertyDescriptor.Builder() +.name("mongo-agg-query-attribute") +.displayName("Query Output Attribute") +.description("If set, the query will be written to a specified attribute on the output flowfiles.") +.expressionLanguageSupported(true) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) +.required(false) +.build(); +static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() +.name("el5-charset") --- End diff -- Done. ---
[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2180#discussion_r166110849 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java --- @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.mongodb; + +import com.mongodb.BasicDBObject; +import com.mongodb.client.AggregateIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.bson.conversions.Bson; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Tags({"mongo", "aggregation", "aggregate"}) +@CapabilityDescription("A processor that runs an aggregation query whenever a flowfile is received.") +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +public class RunMongoAggregation extends AbstractMongoProcessor { + +private final static Set relationships; +private final static List propertyDescriptors; + +static final Relationship REL_ORIGINAL = new Relationship.Builder() +.description("The input flowfile gets sent to this relationship when the query succeeds.") +.name("original") +.build(); +static final Relationship REL_FAILURE = new Relationship.Builder() +.description("The input flowfile gets sent to this relationship when the query fails.") +.name("failure") +.build(); +static final Relationship REL_RESULTS = new 
Relationship.Builder() +.description("The result set of the aggregation will be sent to this relationship.") +.name("results") +.build(); + +static final List buildAggregationQuery(String query) throws IOException { +List result = new ArrayList<>(); + +ObjectMapper mapper = new ObjectMapper(); +List values = mapper.readValue(query, List.class); +for (Map val : values) { +result.add(new BasicDBObject(val)); +} + +return result; +} + +public static final Validator AGG_VALIDATOR = (subject, value, context) -> { +final ValidationResult.Builder builder = new ValidationResult.Builder(); +builder.subject(subject).input(value); + +if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { +return builder.valid(true).explanation("Contains Expression Language").build(); +} + +String reason = null; +try { +buildAggregationQuery(value); +} catch (final RuntimeException | IOException e) { +reason = e.getLocalizedMessage(); +} + +return builder.explanation(reason).valid(reason == null).build(); +}; + +static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +
[jira] [Commented] (NIFI-4700) PostHTTP: close client
[ https://issues.apache.org/jira/browse/NIFI-4700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352895#comment-16352895 ] Michael Moser commented on NIFI-4700: - I also ran the PostHTTP to ListenHTTP test while monitoring with jvisualvm. I didn't see any problems with the HttpClient leaking objects or exerting any unusual pressure on memory usage. So if you concur [~devriesb] then I suggest we close this ticket. > PostHTTP: close client > -- > > Key: NIFI-4700 > URL: https://issues.apache.org/jira/browse/NIFI-4700 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.4.0 >Reporter: Brandon DeVries >Assignee: Michael Hogue >Priority: Major > Fix For: 1.6.0 > > > In PostHTTP, the CloseableHttpClient never actually appears to be closed... > Additionally, we could leverage CloseableHttpResponse to close responses. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi-minifi-cpp pull request #259: MINIFICPP-393: Add security support for M...
Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/259#discussion_r166103910 --- Diff: extensions/mqtt/ConsumeMQTT.cpp --- @@ -35,7 +35,7 @@ namespace nifi { namespace minifi { namespace processors { -core::Property ConsumeMQTT::MaxQueueSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", ""); +core::Property ConsumeMQTT::MaxQueueSize("Max Queue Size", "Maximum receive queue size for the MQTT record", ""); --- End diff -- Do we intend to follow semver? ---
[jira] [Commented] (MINIFICPP-393) add security support for MQTT
[ https://issues.apache.org/jira/browse/MINIFICPP-393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352897#comment-16352897 ] ASF GitHub Bot commented on MINIFICPP-393: -- Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/259#discussion_r166103910 --- Diff: extensions/mqtt/ConsumeMQTT.cpp --- @@ -35,7 +35,7 @@ namespace nifi { namespace minifi { namespace processors { -core::Property ConsumeMQTT::MaxQueueSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", ""); +core::Property ConsumeMQTT::MaxQueueSize("Max Queue Size", "Maximum receive queue size for the MQTT record", ""); --- End diff -- Do we intend to follow semver? > add security support for MQTT > - > > Key: MINIFICPP-393 > URL: https://issues.apache.org/jira/browse/MINIFICPP-393 > Project: NiFi MiNiFi C++ > Issue Type: Improvement >Affects Versions: 1.0.0 >Reporter: bqiu >Assignee: bqiu >Priority: Minor > Fix For: 1.0.0 > > > add security support for MQTT -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4841) NPE when reverting local modifications to a versioned process group
[ https://issues.apache.org/jira/browse/NIFI-4841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352881#comment-16352881 ] Charlie Meyer commented on NIFI-4841: - I uploaded a template that I was able to reproduce this with. # instantiate the template # get it versioned in the registry # update the RPG configuration with a nifi hostname that actually has an input port that can accept site to site # connect the funnel to the RPG # start up the process group, but don't enable transmission on the RPG # step out of the process group and attempt to revert the local changes > NPE when reverting local modifications to a versioned process group > --- > > Key: NIFI-4841 > URL: https://issues.apache.org/jira/browse/NIFI-4841 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.5.0 >Reporter: Charlie Meyer >Priority: Major > Attachments: NIFI-4841.xml > > > I created a process group via importing from the registry. I then made a few > modifications including setting properties and connecting some components. I > then attempted to revert my local changes so I could update the flow to a > newer version. 
When reverting the local changes, NiFi threw a NPE with the > following stack trace: > {noformat} > 2018-02-05 17:18:52,356 INFO [Version Control Update Thread-1] > org.apache.nifi.web.api.VersionsResource Stopping 1 Processors > 2018-02-05 17:18:52,477 ERROR [Version Control Update Thread-1] > org.apache.nifi.web.api.VersionsResource Failed to update flow to new version > java.lang.NullPointerException: null > at > org.apache.nifi.web.dao.impl.StandardProcessGroupDAO.scheduleComponents(StandardProcessGroupDAO.java:179) > at > org.apache.nifi.web.dao.impl.StandardProcessGroupDAO$$FastClassBySpringCGLIB$$10a99b47.invoke() > at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) > at > org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:738) > at > org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) > at > org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92) > at > org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) > at > org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:673) > at > org.apache.nifi.web.dao.impl.StandardProcessGroupDAO$$EnhancerBySpringCGLIB$$bc287b8b.scheduleComponents() > at > org.apache.nifi.web.StandardNiFiServiceFacade$3.update(StandardNiFiServiceFacade.java:981) > at > org.apache.nifi.web.revision.NaiveRevisionManager.updateRevision(NaiveRevisionManager.java:120) > at > org.apache.nifi.web.StandardNiFiServiceFacade.scheduleComponents(StandardNiFiServiceFacade.java:976) > at > org.apache.nifi.web.StandardNiFiServiceFacade$$FastClassBySpringCGLIB$$358780e0.invoke() > at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) > at > org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:738) > at 
> org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) > at > org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:85) > at > org.apache.nifi.web.NiFiServiceFacadeLock.proceedWithWriteLock(NiFiServiceFacadeLock.java:173) > at > org.apache.nifi.web.NiFiServiceFacadeLock.scheduleLock(NiFiServiceFacadeLock.java:102) > at sun.reflect.GeneratedMethodAccessor557.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:629) > at > org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:618) > at > org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70) > at > org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) > at > org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92) > at > org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) > at > org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:673) > at > org.apache.nifi.web.StandardNiFiServiceFacade$$EnhancerBySpringCGLIB$$8a758fa4.scheduleComponents() > at >
[jira] [Updated] (NIFI-4841) NPE when reverting local modifications to a versioned process group
[ https://issues.apache.org/jira/browse/NIFI-4841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Charlie Meyer updated NIFI-4841: Attachment: NIFI-4841.xml > NPE when reverting local modifications to a versioned process group > --- > > Key: NIFI-4841 > URL: https://issues.apache.org/jira/browse/NIFI-4841 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.5.0 >Reporter: Charlie Meyer >Priority: Major > Attachments: NIFI-4841.xml > > > I created a process group via importing from the registry. I then made a few > modifications including setting properties and connecting some components. I > then attempted to revert my local changes so I could update the flow to a > newer version. When reverting the local changes, NiFi threw a NPE with the > following stack trace: > {noformat} > 2018-02-05 17:18:52,356 INFO [Version Control Update Thread-1] > org.apache.nifi.web.api.VersionsResource Stopping 1 Processors > 2018-02-05 17:18:52,477 ERROR [Version Control Update Thread-1] > org.apache.nifi.web.api.VersionsResource Failed to update flow to new version > java.lang.NullPointerException: null > at > org.apache.nifi.web.dao.impl.StandardProcessGroupDAO.scheduleComponents(StandardProcessGroupDAO.java:179) > at > org.apache.nifi.web.dao.impl.StandardProcessGroupDAO$$FastClassBySpringCGLIB$$10a99b47.invoke() > at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) > at > org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:738) > at > org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) > at > org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92) > at > org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) > at > org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:673) 
> at > org.apache.nifi.web.dao.impl.StandardProcessGroupDAO$$EnhancerBySpringCGLIB$$bc287b8b.scheduleComponents() > at > org.apache.nifi.web.StandardNiFiServiceFacade$3.update(StandardNiFiServiceFacade.java:981) > at > org.apache.nifi.web.revision.NaiveRevisionManager.updateRevision(NaiveRevisionManager.java:120) > at > org.apache.nifi.web.StandardNiFiServiceFacade.scheduleComponents(StandardNiFiServiceFacade.java:976) > at > org.apache.nifi.web.StandardNiFiServiceFacade$$FastClassBySpringCGLIB$$358780e0.invoke() > at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) > at > org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:738) > at > org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) > at > org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:85) > at > org.apache.nifi.web.NiFiServiceFacadeLock.proceedWithWriteLock(NiFiServiceFacadeLock.java:173) > at > org.apache.nifi.web.NiFiServiceFacadeLock.scheduleLock(NiFiServiceFacadeLock.java:102) > at sun.reflect.GeneratedMethodAccessor557.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:629) > at > org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:618) > at > org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70) > at > org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) > at > org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92) > at > 
org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) > at > org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:673) > at > org.apache.nifi.web.StandardNiFiServiceFacade$$EnhancerBySpringCGLIB$$8a758fa4.scheduleComponents() > at > org.apache.nifi.web.util.LocalComponentLifecycle.stopComponents(LocalComponentLifecycle.java:125) > at > org.apache.nifi.web.util.LocalComponentLifecycle.scheduleComponents(LocalComponentLifecycle.java:66) > at > org.apache.nifi.web.api.VersionsResource.updateFlowVersion(VersionsResource.java:1365) > at > org.apache.nifi.web.api.VersionsResource.lambda$null$22(VersionsResource.java:1305) > at >
[jira] [Commented] (NIFI-4794) Improve Garbage Collection required by Provenance Repository
[ https://issues.apache.org/jira/browse/NIFI-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352823#comment-16352823 ] ASF GitHub Bot commented on NIFI-4794: -- Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2437#discussion_r166090089 --- Diff: nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordReader.java --- @@ -23,42 +23,29 @@ import java.io.InputStream; import java.util.Collection; import java.util.Optional; -import java.util.concurrent.TimeUnit; + import org.apache.nifi.provenance.schema.LookupTableEventRecord; import org.apache.nifi.provenance.toc.TocReader; import org.apache.nifi.repository.schema.Record; import org.apache.nifi.stream.io.LimitingInputStream; import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.util.timebuffer.LongEntityAccess; -import org.apache.nifi.util.timebuffer.TimedBuffer; -import org.apache.nifi.util.timebuffer.TimestampedLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class EncryptedSchemaRecordReader extends EventIdFirstSchemaRecordReader { private static final Logger logger = LoggerFactory.getLogger(EncryptedSchemaRecordReader.class); -private static final int DEFAULT_DEBUG_FREQUENCY = 1_000_000; --- End diff -- Are changes to this file part of the PR? Doesn't seem like it. Or is it additional cleanup, or should it be restored? > Improve Garbage Collection required by Provenance Repository > > > Key: NIFI-4794 > URL: https://issues.apache.org/jira/browse/NIFI-4794 > Project: Apache NiFi > Issue Type: Improvement > Components: Core Framework >Reporter: Mark Payne >Assignee: Mark Payne >Priority: Major > Fix For: 1.6.0 > > > The EventIdFirstSchemaRecordWriter that is used by the provenance repository > has a writeRecord(ProvenanceEventRecord) method. 
Within this method, it > serializes the given record into a byte array by serializing to a > ByteArrayOutputStream (after wrapping the BAOS in a DataOutputStream). Once > this is done, it calls toByteArray() on that BAOS so that it can write the > byte[] directly to another OutputStream. > This can create a rather large amount of garbage to be collected. We can > improve this significantly: > # Instead of creating a new ByteArrayOutputStream each time, create a pool > of them. This avoids constantly having to garbage collect them. > # If said BAOS grows beyond a certain size, we should not return it to the > pool because we don't want to keep a large buffer occupying the heap. > # Instead of wrapping the BAOS in a new DataOutputStream, the > DataOutputStream should be pooled/recycled as well. Since it must create an > internal byte[] for the writeUTF method, this can save a significant amount > of garbage. > # Avoid calling ByteArrayOutputStream.toByteArray(). We can instead just use > ByteArrayOutputStream.writeTo(OutputStream). This avoids both allocating that > new array/copying the data, and the GC overhead. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
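The four points in the NIFI-4794 description above could look roughly like the following. This is a minimal sketch and not the actual EventIdFirstSchemaRecordWriter change: the class name PooledRecordWriter, the PooledBuffer helper, the record layout (an id plus a payload), the pool capacity of 8 and the 1 MB retention limit are all assumptions made for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical writer illustrating the pooling idea; not NiFi code.
final class PooledRecordWriter {
    // Assumed limits, chosen only for this example.
    private static final int POOL_CAPACITY = 8;
    private static final int MAX_RETAINED_BYTES = 1024 * 1024;

    // A BAOS paired with its DataOutputStream so both are recycled together.
    static final class PooledBuffer extends ByteArrayOutputStream {
        final DataOutputStream data = new DataOutputStream(this);

        // 'buf' is a protected field of ByteArrayOutputStream; its length is
        // the currently allocated capacity, which reset() does not shrink.
        int capacity() {
            return buf.length;
        }
    }

    private final BlockingQueue<PooledBuffer> pool = new LinkedBlockingQueue<>(POOL_CAPACITY);

    void writeRecord(String id, byte[] payload, OutputStream destination) throws IOException {
        // Point 1: borrow a buffer from the pool, allocating only when it is empty.
        PooledBuffer buffer = pool.poll();
        if (buffer == null) {
            buffer = new PooledBuffer();
        }
        try {
            // Point 3: reuse the DataOutputStream that belongs to the buffer.
            buffer.data.writeUTF(id);
            buffer.data.writeInt(payload.length);
            buffer.data.write(payload);
            buffer.data.flush();
            // Point 4: copy straight to the destination, no toByteArray().
            buffer.writeTo(destination);
        } finally {
            // Point 2: only buffers that stayed small go back to the pool.
            if (buffer.capacity() <= MAX_RETAINED_BYTES) {
                buffer.reset();
                pool.offer(buffer); // silently dropped if the pool is full
            }
        }
    }

    // Number of idle buffers currently held by the pool.
    int pooledCount() {
        return pool.size();
    }
}
```

The capacity check uses the allocated array length rather than size(), because reset() only rewinds the write position and leaves the grown array in place. A buffer that is not returned simply becomes garbage, so one unusually large record costs a single allocation instead of pinning that memory for the life of the pool.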
[GitHub] nifi pull request #2437: NIFI-4794: Updated event writers to avoid creating ...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2437#discussion_r166090089 --- Diff: nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordReader.java --- @@ -23,42 +23,29 @@ import java.io.InputStream; import java.util.Collection; import java.util.Optional; -import java.util.concurrent.TimeUnit; + import org.apache.nifi.provenance.schema.LookupTableEventRecord; import org.apache.nifi.provenance.toc.TocReader; import org.apache.nifi.repository.schema.Record; import org.apache.nifi.stream.io.LimitingInputStream; import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.util.timebuffer.LongEntityAccess; -import org.apache.nifi.util.timebuffer.TimedBuffer; -import org.apache.nifi.util.timebuffer.TimestampedLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class EncryptedSchemaRecordReader extends EventIdFirstSchemaRecordReader { private static final Logger logger = LoggerFactory.getLogger(EncryptedSchemaRecordReader.class); -private static final int DEFAULT_DEBUG_FREQUENCY = 1_000_000; --- End diff -- Are changes to this file part of the PR? Doesn't seem like it. Or is it additional cleanup, or should it be restored? ---
[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2180#discussion_r166088202 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java --- @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.mongodb; + +import com.mongodb.BasicDBObject; +import com.mongodb.client.AggregateIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.bson.conversions.Bson; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Tags({"mongo", "aggregation", "aggregate"}) +@CapabilityDescription("A processor that runs an aggregation query whenever a flowfile is received.") +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +public class RunMongoAggregation extends AbstractMongoProcessor { + +private final static Set relationships; +private final static List propertyDescriptors; + +static final Relationship REL_ORIGINAL = new Relationship.Builder() +.description("The input flowfile gets sent to this relationship when the query succeeds.") +.name("original") +.build(); +static final Relationship REL_FAILURE = new Relationship.Builder() +.description("The input flowfile gets sent to this relationship when the query fails.") +.name("failure") +.build(); +static final Relationship REL_RESULTS = new 
Relationship.Builder() +.description("The result set of the aggregation will be sent to this relationship.") +.name("results") +.build(); + +static final List buildAggregationQuery(String query) throws IOException { +List result = new ArrayList<>(); + +ObjectMapper mapper = new ObjectMapper(); +List values = mapper.readValue(query, List.class); +for (Map val : values) { +result.add(new BasicDBObject(val)); +} + +return result; +} + +public static final Validator AGG_VALIDATOR = (subject, value, context) -> { +final ValidationResult.Builder builder = new ValidationResult.Builder(); +builder.subject(subject).input(value); + +if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { +return builder.valid(true).explanation("Contains Expression Language").build(); +} + +String reason = null; +try { +buildAggregationQuery(value); +} catch (final RuntimeException | IOException e) { +reason = e.getLocalizedMessage(); +} + +return builder.explanation(reason).valid(reason == null).build(); +}; + +static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +
[jira] [Resolved] (MINIFICPP-388) DockerBuild.sh missing controller/ and LibExample/ dirs
[ https://issues.apache.org/jira/browse/MINIFICPP-388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Christianson resolved MINIFICPP-388. --- Resolution: Fixed > DockerBuild.sh missing controller/ and LibExample/ dirs > --- > > Key: MINIFICPP-388 > URL: https://issues.apache.org/jira/browse/MINIFICPP-388 > Project: NiFi MiNiFi C++ > Issue Type: Bug >Reporter: Andrew Christianson >Assignee: Andrew Christianson >Priority: Major > > The controller/ and LibExample/ dirs are not copied to docker/minificppsource > during make docker, leading to a swift build failure. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (MINIFICPP-387) Allow TF path to be manually specified via env var
[ https://issues.apache.org/jira/browse/MINIFICPP-387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Christianson resolved MINIFICPP-387. --- Resolution: Fixed > Allow TF path to be manually specified via env var > -- > > Key: MINIFICPP-387 > URL: https://issues.apache.org/jira/browse/MINIFICPP-387 > Project: NiFi MiNiFi C++ > Issue Type: Improvement >Reporter: Andrew Christianson >Assignee: Andrew Christianson >Priority: Minor > > If TensorFlow is installed in a custom or unusual location, we should allow > the user to specify the path via environment variable as part of the CMake > build. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2180#discussion_r166080619 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java --- @@ -221,4 +264,16 @@ protected WriteConcern getWriteConcern(final ProcessContext context) { } return writeConcern; } + +protected void writeBatch(String payload, FlowFile parent, ProcessContext context, ProcessSession session, Map extraAttributes, Relationship rel) throws UnsupportedEncodingException { +String charset = parent != null ? context.getProperty(CHARSET).evaluateAttributeExpressions(parent).getValue() +: context.getProperty(CHARSET).evaluateAttributeExpressions().getValue(); + +FlowFile flowFile = session.create(parent); --- End diff -- The ternary operator for charset is fine, I'm referring to the session.create(parent). If parent is null, then an NPE will be thrown when it tries to get the parent's attributes. ---
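The null-parent concern raised in the comment above could be handled along these lines. This is a minimal sketch using stand-in types: StubFlowFile, StubSession and WriteBatchSketch are hypothetical and only mimic the shape of a session that offers both a no-argument create() and a create(parent) that copies the parent's attributes. It is not the code from the pull request.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for a flow file; only carries attributes (hypothetical, not the NiFi class). */
final class StubFlowFile {
    final Map<String, String> attributes;

    StubFlowFile(Map<String, String> attributes) {
        this.attributes = attributes;
    }
}

/** Stand-in for a session with a no-arg create() and a create(parent) that inherits attributes. */
final class StubSession {
    StubFlowFile create() {
        return new StubFlowFile(new HashMap<>());
    }

    StubFlowFile create(StubFlowFile parent) {
        // Dereferences the parent, so a null parent would throw a NullPointerException here.
        return new StubFlowFile(new HashMap<>(parent.attributes));
    }
}

final class WriteBatchSketch {
    /** Chooses the create() overload based on whether there is an incoming flow file. */
    static StubFlowFile createChild(StubSession session, StubFlowFile parent) {
        return parent != null ? session.create(parent) : session.create();
    }
}
```

The same null check that already guards the charset lookup is applied to the create call, so a processor run without an incoming flow file still produces an output flow file.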
[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2180#discussion_r166079130 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java --- @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.mongodb; + +import com.mongodb.BasicDBObject; +import com.mongodb.client.AggregateIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.bson.conversions.Bson; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Tags({"mongo", "aggregation", "aggregate"}) +@CapabilityDescription("A processor that runs an aggregation query whenever a flowfile is received.") +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +public class RunMongoAggregation extends AbstractMongoProcessor { + +private final static Set relationships; +private final static List propertyDescriptors; + +static final Relationship REL_ORIGINAL = new Relationship.Builder() +.description("The input flowfile gets sent to this relationship when the query succeeds.") +.name("original") +.build(); +static final Relationship REL_FAILURE = new Relationship.Builder() +.description("The input flowfile gets sent to this relationship when the query fails.") +.name("failure") +.build(); +static final Relationship REL_RESULTS = new 
Relationship.Builder() +.description("The result set of the aggregation will be sent to this relationship.") +.name("results") +.build(); + +static final List buildAggregationQuery(String query) throws IOException { +List result = new ArrayList<>(); + +ObjectMapper mapper = new ObjectMapper(); +List values = mapper.readValue(query, List.class); +for (Map val : values) { +result.add(new BasicDBObject(val)); +} + +return result; +} + +public static final Validator AGG_VALIDATOR = (subject, value, context) -> { +final ValidationResult.Builder builder = new ValidationResult.Builder(); +builder.subject(subject).input(value); + +if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { +return builder.valid(true).explanation("Contains Expression Language").build(); +} + +String reason = null; +try { +buildAggregationQuery(value); +} catch (final RuntimeException | IOException e) { +reason = e.getLocalizedMessage(); +} + +return builder.explanation(reason).valid(reason == null).build(); +}; + +static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +
[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2180#discussion_r166078811 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java --- @@ -221,4 +264,16 @@ protected WriteConcern getWriteConcern(final ProcessContext context) { } return writeConcern; } + +protected void writeBatch(String payload, FlowFile parent, ProcessContext context, ProcessSession session, Map extraAttributes, Relationship rel) throws UnsupportedEncodingException { +String charset = parent != null ? context.getProperty(CHARSET).evaluateAttributeExpressions(parent).getValue() +: context.getProperty(CHARSET).evaluateAttributeExpressions().getValue(); + +FlowFile flowFile = session.create(parent); --- End diff -- I just threw a ternary operator in there. ---
[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2180#discussion_r166075574 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java --- @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.mongodb; + +import com.mongodb.BasicDBObject; +import com.mongodb.client.AggregateIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.bson.conversions.Bson; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Tags({"mongo", "aggregation", "aggregate"}) +@CapabilityDescription("A processor that runs an aggregation query whenever a flowfile is received.") +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +public class RunMongoAggregation extends AbstractMongoProcessor { + +private final static Set relationships; +private final static List propertyDescriptors; + +static final Relationship REL_ORIGINAL = new Relationship.Builder() +.description("The input flowfile gets sent to this relationship when the query succeeds.") +.name("original") +.build(); +static final Relationship REL_FAILURE = new Relationship.Builder() +.description("The input flowfile gets sent to this relationship when the query fails.") +.name("failure") +.build(); +static final Relationship REL_RESULTS = new 
Relationship.Builder() +.description("The result set of the aggregation will be sent to this relationship.") +.name("results") +.build(); + +static final List buildAggregationQuery(String query) throws IOException { +List result = new ArrayList<>(); + +ObjectMapper mapper = new ObjectMapper(); +List values = mapper.readValue(query, List.class); +for (Map val : values) { +result.add(new BasicDBObject(val)); +} + +return result; +} + +public static final Validator AGG_VALIDATOR = (subject, value, context) -> { +final ValidationResult.Builder builder = new ValidationResult.Builder(); +builder.subject(subject).input(value); + +if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { +return builder.valid(true).explanation("Contains Expression Language").build(); +} + +String reason = null; +try { +buildAggregationQuery(value); +} catch (final RuntimeException | IOException e) { +reason = e.getLocalizedMessage(); +} + +return builder.explanation(reason).valid(reason == null).build(); +}; + +static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +
[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2180#discussion_r166073173 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java --- @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.mongodb; + +import com.mongodb.BasicDBObject; +import com.mongodb.client.AggregateIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.bson.conversions.Bson; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Tags({"mongo", "aggregation", "aggregate"}) +@CapabilityDescription("A processor that runs an aggregation query whenever a flowfile is received.") +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +public class RunMongoAggregation extends AbstractMongoProcessor { + +private final static Set relationships; +private final static List propertyDescriptors; + +static final Relationship REL_ORIGINAL = new Relationship.Builder() +.description("The input flowfile gets sent to this relationship when the query succeeds.") +.name("original") +.build(); +static final Relationship REL_FAILURE = new Relationship.Builder() +.description("The input flowfile gets sent to this relationship when the query fails.") +.name("failure") +.build(); +static final Relationship REL_RESULTS = new 
Relationship.Builder() +.description("The result set of the aggregation will be sent to this relationship.") +.name("results") +.build(); + +static final List buildAggregationQuery(String query) throws IOException { +List result = new ArrayList<>(); + +ObjectMapper mapper = new ObjectMapper(); +List values = mapper.readValue(query, List.class); +for (Map val : values) { +result.add(new BasicDBObject(val)); +} + +return result; +} + +public static final Validator AGG_VALIDATOR = (subject, value, context) -> { +final ValidationResult.Builder builder = new ValidationResult.Builder(); +builder.subject(subject).input(value); + +if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { +return builder.valid(true).explanation("Contains Expression Language").build(); +} + +String reason = null; +try { +buildAggregationQuery(value); +} catch (final RuntimeException | IOException e) { +reason = e.getLocalizedMessage(); +} + +return builder.explanation(reason).valid(reason == null).build(); +}; + +static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +
[jira] [Commented] (MINIFICPP-393) add security support for MQTT
[ https://issues.apache.org/jira/browse/MINIFICPP-393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352757#comment-16352757 ] ASF GitHub Bot commented on MINIFICPP-393: -- Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/259#discussion_r166072113 --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp --- @@ -119,6 +126,38 @@ void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::Proc qos_ = valInt; logger_->log_debug("AbstractMQTTProcessor: QOS [%ll]", qos_); } + value = ""; + + if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) { +if (value == MQTT_SECURITY_PROTOCOL_SSL) { + sslEnabled_ = true; --- End diff -- The SSL handshake will fail and, based on the error, the user needs to configure the right certs. > add security support for MQTT > - > > Key: MINIFICPP-393 > URL: https://issues.apache.org/jira/browse/MINIFICPP-393 > Project: NiFi MiNiFi C++ > Issue Type: Improvement >Affects Versions: 1.0.0 >Reporter: bqiu >Assignee: bqiu >Priority: Minor > Fix For: 1.0.0 > > > add security support for MQTT -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi-minifi-cpp pull request #259: MINIFICPP-393: Add security support for M...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/259#discussion_r166072113 --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp --- @@ -119,6 +126,38 @@ void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::Proc qos_ = valInt; logger_->log_debug("AbstractMQTTProcessor: QOS [%ll]", qos_); } + value = ""; + + if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) { +if (value == MQTT_SECURITY_PROTOCOL_SSL) { + sslEnabled_ = true; --- End diff -- The SSL handshake will fail and, based on the error, the user needs to configure the right certs. ---
[GitHub] nifi-minifi-cpp pull request #259: MINIFICPP-393: Add security support for M...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/259#discussion_r166071814 --- Diff: extensions/mqtt/ConsumeMQTT.cpp --- @@ -35,7 +35,7 @@ namespace nifi { namespace minifi { namespace processors { -core::Property ConsumeMQTT::MaxQueueSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", ""); +core::Property ConsumeMQTT::MaxQueueSize("Max Queue Size", "Maximum receive queue size for the MQTT record", ""); --- End diff -- We need to correct that because the variable was not right in the first place. Also, if a user was using that option, it will fall back to the default value. ---
[jira] [Commented] (NIFIREG-136) Switch to unique human-friendly names for buckets and flows
[ https://issues.apache.org/jira/browse/NIFIREG-136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352724#comment-16352724 ] Bryan Bende commented on NIFIREG-136: - Good ideas [~kdoran] ! Those sound like promising options. > Switch to unique human-friendly names for buckets and flows > --- > > Key: NIFIREG-136 > URL: https://issues.apache.org/jira/browse/NIFIREG-136 > Project: NiFi Registry > Issue Type: Improvement >Affects Versions: 0.1.0 >Reporter: Andrew Grande >Priority: Major > > I have been playing with the Registry and using [~bende] 's early CLI to > accomplish some automation tasks. Have had really tough times with UUIDs > being used for buckets and flows, it introduced a lot of context switches to > locate/save/copy/paste those when using the API. > I would strongly suggest considering the human-friendly names and convert > deep links to using those instead. This not only provides for an easy > portable full URI, but also addresses compatibility issues between instances > of the registry, as buckets & flows with the same name are guaranteed to have > different UUIDs. A kind of copy/paste between environments. > I never came across a unique name requirement within a tree-like structure to > be an issue when dealing with NiFi. E.g. NiFi and NiFi Registry could > transparently reverse-look up the UUID by extracting names from the URI. The > goal is to have a great user experience. > P.S.: spaces in the name in the URI could be substituted for '+' sign > transparently, using the %20 would defeat the purpose of smooth ease-of-use. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4841) NPE when reverting local modifications to a versioned process group
[ https://issues.apache.org/jira/browse/NIFI-4841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352710#comment-16352710 ] Bryan Bende commented on NIFI-4841: --- [~cemeyer2] thanks for reporting this! Would you be able to provide the list of changes being reverted when this happens (assuming it isn't too large of a list)? Also, after this error happened were the local changes still present, or did it revert them but just showed an error while it happened? > NPE when reverting local modifications to a versioned process group > --- > > Key: NIFI-4841 > URL: https://issues.apache.org/jira/browse/NIFI-4841 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.5.0 >Reporter: Charlie Meyer >Priority: Major > > I created a process group via importing from the registry. I then made a few > modifications including settings properties and connecting some components. I > then attempted to revert my local changes so I could update the flow to a > newer version. When reverting the local changes, NiFi threw a NPE with the > following stack trace: > {noformat} > 2018-02-05 17:18:52,356 INFO [Version Control Update Thread-1] > org.apache.nifi.web.api.VersionsResource Stopping 1 Processors > 2018-02-05 17:18:52,477 ERROR [Version Control Update Thread-1] > org.apache.nifi.web.api.VersionsResource Failed to update flow to new version > java.lang.NullPointerException: null > at > org.apache.nifi.web.dao.impl.StandardProcessGroupDAO.scheduleComponents(StandardProcessGroupDAO.java:179) > at > org.apache.nifi.web.dao.impl.StandardProcessGroupDAO$$FastClassBySpringCGLIB$$10a99b47.invoke() > at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) > at > org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:738) > at > org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) > at > 
org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92) > at > org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) > at > org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:673) > at > org.apache.nifi.web.dao.impl.StandardProcessGroupDAO$$EnhancerBySpringCGLIB$$bc287b8b.scheduleComponents() > at > org.apache.nifi.web.StandardNiFiServiceFacade$3.update(StandardNiFiServiceFacade.java:981) > at > org.apache.nifi.web.revision.NaiveRevisionManager.updateRevision(NaiveRevisionManager.java:120) > at > org.apache.nifi.web.StandardNiFiServiceFacade.scheduleComponents(StandardNiFiServiceFacade.java:976) > at > org.apache.nifi.web.StandardNiFiServiceFacade$$FastClassBySpringCGLIB$$358780e0.invoke() > at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) > at > org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:738) > at > org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) > at > org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:85) > at > org.apache.nifi.web.NiFiServiceFacadeLock.proceedWithWriteLock(NiFiServiceFacadeLock.java:173) > at > org.apache.nifi.web.NiFiServiceFacadeLock.scheduleLock(NiFiServiceFacadeLock.java:102) > at sun.reflect.GeneratedMethodAccessor557.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:629) > at > org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:618) > at > 
org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70) > at > org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) > at > org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92) > at > org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) > at > org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:673) > at > org.apache.nifi.web.StandardNiFiServiceFacade$$EnhancerBySpringCGLIB$$8a758fa4.scheduleComponents() > at > org.apache.nifi.web.util.LocalComponentLifecycle.stopComponents(LocalComponentLifecycle.java:125) > at >
[jira] [Commented] (NIFIREG-136) Switch to unique human-friendly names for buckets and flows
[ https://issues.apache.org/jira/browse/NIFIREG-136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352688#comment-16352688 ] Kevin Doran commented on NIFIREG-136: - I like the idea of having the REST API support both names and identifiers in resource paths, and I agree we should introduce this enhancement in a non-breaking manner. In terms of implementation, I'm fine with introducing something in the path to indicate you are specifying a name. Alternatively, here are some ideas for how to implement this so that the meaning of the URL path parameter is inferred without explicit syntax used by the client: I've never tried it before, but I think we may be able to offer this distinction transparently to the client/user by leveraging JAX-RS support for regex-constrained path parameter matching to "overload" the {{/buckets/}} and {{/buckets//flows/}} endpoints to first determine if the parameter is a UUID or not. I've never tried this in a case where one regex (UUID) is a subset of another regex (e.g., name would presumably just be any arbitrary string, or any string not matching a UUID regex?), but it might be worth a try. If doing this with two regexes in the JAX-RS annotation is not possible, a similar implementation variant would be to introduce two "explicit" endpoints with distinguishing fields such as {{n=}} or {{i=}} to give the parameter a specific meaning, and then change the logic of the existing endpoint to determine the parameter type (UUID or not) and then map to the correct endpoint, either through redirect or URL Path Rewriting in the application filter chain. Clients that wanted to be explicit could use the explicit syntax. Clients that don't care could just pass a name or UUID and I'm fairly certain we could infer the meaning in every case. We would have to introduce a constraint during bucket/flow creation that the name cannot match a UUID regex. 
> Switch to unique human-friendly names for buckets and flows > --- > > Key: NIFIREG-136 > URL: https://issues.apache.org/jira/browse/NIFIREG-136 > Project: NiFi Registry > Issue Type: Improvement >Affects Versions: 0.1.0 >Reporter: Andrew Grande >Priority: Major > > I have been playing with the Registry and using [~bende] 's early CLI to > accomplish some automation tasks. Have had really tough times with UUIDs > being used for buckets and flows, it introduced a lot of context switches to > locate/save/copy/paste those when using the API. > I would strongly suggest considering the human-friendly names and convert > deep links to using those instead. This not only provides for an easy > portable full URI, but also addresses compatibility issues between instances > of the registry, as buckets & flows with the same name are guaranteed to have > different UUIDs. A kind of copy/paste between environments. > I never came across a unique name requirement within a tree-like structure to > be an issue when dealing with NiFi. E.g. NiFi and NiFi Registry could > transparently reverse-look up the UUID by extracting names from the URI. The > goal is to have a great user experience. > P.S.: spaces in the name in the URI could be substituted for '+' sign > transparently, using the %20 would defeat the purpose of smooth ease-of-use. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
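As a rough illustration of the inference Kevin describes above, the following plain-Java sketch classifies a single path segment as either a UUID or a name. It is not JAX-RS code and is not taken from NiFi Registry: the class PathParamResolver, the Kind enum, the Ref holder, and the methods resolve and isAllowedName are all hypothetical names chosen for this example, and the UUID pattern assumes the canonical 8-4-4-4-12 hexadecimal text form. The explicit n= and i= prefixes are checked first; anything else is inferred.

```java
import java.util.regex.Pattern;

// Hypothetical helper, for illustration only; not part of NiFi Registry.
public class PathParamResolver {

    /** Whether a path segment refers to an item by UUID or by name. */
    enum Kind { IDENTIFIER, NAME }

    /** A classified path segment: its kind plus the value with any prefix removed. */
    static final class Ref {
        final Kind kind;
        final String value;

        Ref(Kind kind, String value) {
            this.kind = kind;
            this.value = value;
        }
    }

    // Assumption: identifiers use the canonical 8-4-4-4-12 hexadecimal UUID text form.
    private static final Pattern UUID_PATTERN = Pattern.compile(
            "[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}");

    /** Classifies a segment: explicit i= / n= prefixes win, otherwise infer from the UUID pattern. */
    static Ref resolve(String segment) {
        if (segment.startsWith("i=")) {
            return new Ref(Kind.IDENTIFIER, segment.substring(2));
        }
        if (segment.startsWith("n=")) {
            return new Ref(Kind.NAME, segment.substring(2));
        }
        Kind kind = UUID_PATTERN.matcher(segment).matches() ? Kind.IDENTIFIER : Kind.NAME;
        return new Ref(kind, segment);
    }

    /** Creation-time constraint: a bucket or flow name must not look like a UUID. */
    static boolean isAllowedName(String name) {
        return !UUID_PATTERN.matcher(name).matches();
    }

    public static void main(String[] args) {
        String[] segments = {
            "5f3c2a1e-9b7d-4c6a-8e2f-0a1b2c3d4e5f", "my-bucket", "n=my-bucket"
        };
        for (String segment : segments) {
            Ref ref = resolve(segment);
            System.out.println(segment + " -> " + ref.kind + " " + ref.value);
        }
    }
}
```

The isAllowedName check corresponds to the creation-time constraint mentioned above: if no name can look like a UUID, an unprefixed segment is never ambiguous. A name that itself begins with n= or i= would still be misread by this sketch, so a real implementation would need to rule that out or escape it as well.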
[jira] [Created] (NIFI-4841) NPE when reverting local modifications to a versioned process group
Charlie Meyer created NIFI-4841: --- Summary: NPE when reverting local modifications to a versioned process group Key: NIFI-4841 URL: https://issues.apache.org/jira/browse/NIFI-4841 Project: Apache NiFi Issue Type: Bug Reporter: Charlie Meyer I created a process group via importing from the registry. I then made a few modifications including setting properties and connecting some components. I then attempted to revert my local changes so I could update the flow to a newer version. When reverting the local changes, NiFi threw an NPE with the following stack trace: {noformat} 2018-02-05 17:18:52,356 INFO [Version Control Update Thread-1] org.apache.nifi.web.api.VersionsResource Stopping 1 Processors 2018-02-05 17:18:52,477 ERROR [Version Control Update Thread-1] org.apache.nifi.web.api.VersionsResource Failed to update flow to new version java.lang.NullPointerException: null at org.apache.nifi.web.dao.impl.StandardProcessGroupDAO.scheduleComponents(StandardProcessGroupDAO.java:179) at org.apache.nifi.web.dao.impl.StandardProcessGroupDAO$$FastClassBySpringCGLIB$$10a99b47.invoke() at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:738) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:673) at org.apache.nifi.web.dao.impl.StandardProcessGroupDAO$$EnhancerBySpringCGLIB$$bc287b8b.scheduleComponents() at org.apache.nifi.web.StandardNiFiServiceFacade$3.update(StandardNiFiServiceFacade.java:981) at 
org.apache.nifi.web.revision.NaiveRevisionManager.updateRevision(NaiveRevisionManager.java:120) at org.apache.nifi.web.StandardNiFiServiceFacade.scheduleComponents(StandardNiFiServiceFacade.java:976) at org.apache.nifi.web.StandardNiFiServiceFacade$$FastClassBySpringCGLIB$$358780e0.invoke() at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:738) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) at org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:85) at org.apache.nifi.web.NiFiServiceFacadeLock.proceedWithWriteLock(NiFiServiceFacadeLock.java:173) at org.apache.nifi.web.NiFiServiceFacadeLock.scheduleLock(NiFiServiceFacadeLock.java:102) at sun.reflect.GeneratedMethodAccessor557.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:629) at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:618) at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:673) at org.apache.nifi.web.StandardNiFiServiceFacade$$EnhancerBySpringCGLIB$$8a758fa4.scheduleComponents() at 
org.apache.nifi.web.util.LocalComponentLifecycle.stopComponents(LocalComponentLifecycle.java:125) at org.apache.nifi.web.util.LocalComponentLifecycle.scheduleComponents(LocalComponentLifecycle.java:66) at org.apache.nifi.web.api.VersionsResource.updateFlowVersion(VersionsResource.java:1365) at org.apache.nifi.web.api.VersionsResource.lambda$null$22(VersionsResource.java:1305) at org.apache.nifi.web.api.concurrent.AsyncRequestManager$2.run(AsyncRequestManager.java:109) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748){noformat} -- This message was sent by
[jira] [Updated] (NIFI-4841) NPE when reverting local modifications to a versioned process group
[ https://issues.apache.org/jira/browse/NIFI-4841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Charlie Meyer updated NIFI-4841: Affects Version/s: 1.5.0 > NPE when reverting local modifications to a versioned process group > --- > > Key: NIFI-4841 > URL: https://issues.apache.org/jira/browse/NIFI-4841 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.5.0 >Reporter: Charlie Meyer >Priority: Major
[jira] [Resolved] (MINIFICPP-391) MiNiFi/NiFi versions are incorrect in DockerVerify & int test code
[ https://issues.apache.org/jira/browse/MINIFICPP-391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] marco polo resolved MINIFICPP-391. -- Resolution: Fixed Fix Version/s: 0.5.0 Merged > MiNiFi/NiFi versions are incorrect in DockerVerify & int test code > -- > > Key: MINIFICPP-391 > URL: https://issues.apache.org/jira/browse/MINIFICPP-391 > Project: NiFi MiNiFi C++ > Issue Type: Bug >Reporter: Andrew Christianson >Assignee: Andrew Christianson >Priority: Major > Fix For: 0.5.0 > > > MiNiFi/NiFi versions are incorrect in DockerVerify & int test code causing > test failures. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (MINIFICPP-389) FlexLexer.h missing, causing broken Alpine docker build
[ https://issues.apache.org/jira/browse/MINIFICPP-389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] marco polo resolved MINIFICPP-389. -- Resolution: Fixed Fix Version/s: 0.5.0 > FlexLexer.h missing, causing broken Alpine docker build > --- > > Key: MINIFICPP-389 > URL: https://issues.apache.org/jira/browse/MINIFICPP-389 > Project: NiFi MiNiFi C++ > Issue Type: Bug >Reporter: Andrew Christianson >Assignee: Andrew Christianson >Priority: Major > Fix For: 0.5.0 > > > Needs flex-dev in the docker build deps. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (MINIFICPP-326) Build failure in Fedora 27
[ https://issues.apache.org/jira/browse/MINIFICPP-326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] marco polo updated MINIFICPP-326: - Fix Version/s: 0.4.0 > Build failure in Fedora 27 > -- > > Key: MINIFICPP-326 > URL: https://issues.apache.org/jira/browse/MINIFICPP-326 > Project: NiFi MiNiFi C++ > Issue Type: Bug >Affects Versions: 0.3.0 > Environment: Linux vandal 4.13.13-300.fc27.x86_64 #1 SMP Wed Nov 15 > 15:47:50 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux > gcc (GCC) 7.2.1 20170915 (Red Hat 7.2.1-2) > Copyright (C) 2017 Free Software Foundation, Inc. > This is free software; see the source for copying conditions. There is NO > warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. >Reporter: Joseph Witt >Priority: Major > Fix For: 0.4.0 > > > Following is the last many lines of output from > {quote} > #define __USE_POSIX > > In file included from > /usr/include/c++/7/x86_64-redhat-linux/bits/os_defines.h:39:0, > from > /usr/include/c++/7/x86_64-redhat-linux/bits/c++config.h:2494, > from /usr/include/c++/7/utility:68, > from /usr/include/c++/7/algorithm:60, > from > /development/build-verify/nifi-minifi-cpp-0.3.0-source/libminifi/include/core/Property.h:21, > from > /development/build-verify/nifi-minifi-cpp-0.3.0-source/libminifi/include/processors/AppendHostInfo.h:23, > from > /development/build-verify/nifi-minifi-cpp-0.3.0-source/libminifi/src/processors/AppendHostInfo.cpp:20: > /usr/include/features.h:295:0: note: this is the location of the previous > definition > # define __USE_POSIX 1 > > [ 16%] Building CXX object > libminifi/CMakeFiles/minifi.dir/src/processors/ExecuteProcess.cpp.o > [ 16%] Building CXX object > libminifi/CMakeFiles/minifi.dir/src/processors/ExtractText.cpp.o > [ 16%] Building CXX object > libminifi/CMakeFiles/minifi.dir/src/processors/GenerateFlowFile.cpp.o > [ 16%] Building CXX object > libminifi/CMakeFiles/minifi.dir/src/processors/GetFile.cpp.o > [ 16%] Building CXX object > 
libminifi/CMakeFiles/minifi.dir/src/processors/GetTCP.cpp.o > [ 16%] Building CXX object > libminifi/CMakeFiles/minifi.dir/src/processors/ListenHTTP.cpp.o > [ 16%] Building CXX object > libminifi/CMakeFiles/minifi.dir/src/processors/ListenSyslog.cpp.o > [ 17%] Building CXX object > libminifi/CMakeFiles/minifi.dir/src/processors/LogAttribute.cpp.o > [ 17%] Building CXX object > libminifi/CMakeFiles/minifi.dir/src/processors/PutFile.cpp.o > [ 17%] Building CXX object > libminifi/CMakeFiles/minifi.dir/src/processors/TailFile.cpp.o > [ 17%] Linking CXX static library libminifi.a > [ 17%] Built target minifi > Scanning dependencies of target spd_lib > [ 17%] Building CXX object > CMakeFiles/spd_lib.dir/thirdparty/spdlog-20170710/include/spdlog/dummy.cpp.o > [ 17%] Linking CXX static library libspd_lib.a > [ 17%] Built target spd_lib > Scanning dependencies of target catch_main > [ 18%] Building CXX object > CMakeFiles/catch_main.dir/libminifi/test/CatchMain.cpp.o > [ 18%] Linking CXX static library libcatch_main.a > [ 18%] Built target catch_main > Scanning dependencies of target test_base > [ 18%] Building CXX object > CMakeFiles/test_base.dir/libminifi/test/TestBase.cpp.o > [ 18%] Linking CXX static library libtest_base.a > [ 18%] Built target test_base > Scanning dependencies of target civetweb-cpp > [ 18%] Building CXX object > thirdparty/civetweb-1.9.1/src/CMakeFiles/civetweb-cpp.dir/CivetServer.cpp.o > [ 18%] Linking CXX static library libcivetweb-cpp.a > Error running link command: Segmentation fault > make[2]: *** > [thirdparty/civetweb-1.9.1/src/CMakeFiles/civetweb-cpp.dir/build.make:96: > thirdparty/civetweb-1.9.1/src/libcivetweb-cpp.a] Error 1 > make[1]: *** [CMakeFiles/Makefile2:1759: > thirdparty/civetweb-1.9.1/src/CMakeFiles/civetweb-cpp.dir/all] Error 2 > make: *** [Makefile:163: all] Error 2 > {quote} > Here is the command history i ran as privileged user to prepare the env > {quote} > 116 yum install cmake gcc gcc-c++ libcurl-devel rocksdb-devel > 
rocksdb libuuid libuuid-devel boost-devel openssl-devel bzip2-devel > xz-devel doxygen > 117 yum install python34-devel > 118 dnf install python34-devel > 119 dnf list installed python > 120 dnf list installed python3 > 121 dnf install python3-devel > 122 dnf install lua-devel > 123 dnf install libusb-devel libpng-devel > 124 dnf install docker > 125 dnf install docker python-virtualenv > 126 python --version > 127 python3 --version > 128 dnf install gpsd-devel > 129 dfn install libpcap-devel > 130 dnf install libpcap-devel > {quote} > Here is the commands ran as build user > {quote} > 598 mkdir build > 599 cd build/ > 600 ls > 601
[jira] [Resolved] (MINIFICPP-327) The PutKafka processor should be renamed to PublishKafka
[ https://issues.apache.org/jira/browse/MINIFICPP-327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] marco polo resolved MINIFICPP-327. -- Resolution: Fixed This was resolved. > The PutKafka processor should be renamed to PublishKafka > > > Key: MINIFICPP-327 > URL: https://issues.apache.org/jira/browse/MINIFICPP-327 > Project: NiFi MiNiFi C++ > Issue Type: Bug >Affects Versions: 0.3.0 >Reporter: Joseph Witt >Priority: Major > Fix For: 0.4.0 > > > The PutKafka processor appears to have been modelled after PutKafka from NiFi > in terms of name and processor properties supported. The processor should > probably be modelled after PublishKafka which is what effectively supersedes > PutKafka. > The Kafka client libraries have been historically problematic in terms of > multi broker compatibility so we've ended up naming them a little strangely. > That said, if this is against the 1.0 kafka client then we can probably just > keep the name simple and call it 'PublishKafka' and use the same properties > we have in PublishKafka in NiFi. > For the 0.3.0 release of minificpp we should flag this as something that will > change/not be honored (the PutKafka name) and then fix it to be PublishKafka > with different properties for the next release. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (MINIFICPP-326) Build failure in Fedora 27
[ https://issues.apache.org/jira/browse/MINIFICPP-326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] marco polo resolved MINIFICPP-326. -- Resolution: Fixed This was resolved in 0.4.0 > Build failure in Fedora 27 > -- > > Key: MINIFICPP-326 > URL: https://issues.apache.org/jira/browse/MINIFICPP-326 > Project: NiFi MiNiFi C++ > Issue Type: Bug >Affects Versions: 0.3.0 > Environment: Linux vandal 4.13.13-300.fc27.x86_64 #1 SMP Wed Nov 15 > 15:47:50 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux > gcc (GCC) 7.2.1 20170915 (Red Hat 7.2.1-2) > Copyright (C) 2017 Free Software Foundation, Inc. > This is free software; see the source for copying conditions. There is NO > warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. >Reporter: Joseph Witt >Priority: Major > Fix For: 0.4.0
[jira] [Updated] (MINIFICPP-390) EL generated sources from host are copied to docker build
[ https://issues.apache.org/jira/browse/MINIFICPP-390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] marco polo updated MINIFICPP-390: - Fix Version/s: 0.5.0 > EL generated sources from host are copied to docker build > - > > Key: MINIFICPP-390 > URL: https://issues.apache.org/jira/browse/MINIFICPP-390 > Project: NiFi MiNiFi C++ > Issue Type: Bug >Reporter: Andrew Christianson >Assignee: Andrew Christianson >Priority: Major > Fix For: 0.5.0 > > > EL source files generated for the host environment are copied to the docker > environment, causing build failure. These files should instead be generated > as part of the docker build process. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (MINIFICPP-390) EL generated sources from host are copied to docker build
[ https://issues.apache.org/jira/browse/MINIFICPP-390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] marco polo resolved MINIFICPP-390. -- Resolution: Fixed > EL generated sources from host are copied to docker build > - > > Key: MINIFICPP-390 > URL: https://issues.apache.org/jira/browse/MINIFICPP-390 > Project: NiFi MiNiFi C++ > Issue Type: Bug >Reporter: Andrew Christianson >Assignee: Andrew Christianson >Priority: Major > Fix For: 0.5.0 > > > EL source files generated for the host environment are copied to the docker > environment, causing build failure. These files should instead be generated > as part of the docker build process. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (MINIFICPP-327) The PutKafka processor should be renamed to PublishKafka
[ https://issues.apache.org/jira/browse/MINIFICPP-327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] marco polo updated MINIFICPP-327: - Fix Version/s: 0.4.0 > The PutKafka processor should be renamed to PublishKafka > > > Key: MINIFICPP-327 > URL: https://issues.apache.org/jira/browse/MINIFICPP-327 > Project: NiFi MiNiFi C++ > Issue Type: Bug >Affects Versions: 0.3.0 >Reporter: Joseph Witt >Priority: Major > Fix For: 0.4.0 > > > The PutKafka processor appears to have been modelled after PutKafka from NiFi > in terms of name and processor properties supported. The processor should > probably be modelled after PublishKafka which is what effectively supersedes > PutKafka. > The Kafka client libraries have been historically problematic in terms of > multi broker compatibility so we've ended up naming them a little strangely. > That said, if this is against the 1.0 kafka client then we can probably just > keep the name simple and call it 'PublishKafka' and use the same properties > we have in PublishKafka in NiFi. > For the 0.3.0 release of minificpp we should flag this as something that will > change/not be honored (the PutKafka name) and then fix it to be PublishKafka > with different properties for the next release. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (MINIFICPP-383) No matter the start directory for minifi.sh, repositories are created in the current working directory
[ https://issues.apache.org/jira/browse/MINIFICPP-383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] marco polo updated MINIFICPP-383: - Status: Patch Available (was: Open) > No matter the start directory for minifi.sh, repositories are created in the > current working directory > -- > > Key: MINIFICPP-383 > URL: https://issues.apache.org/jira/browse/MINIFICPP-383 > Project: NiFi MiNiFi C++ > Issue Type: Improvement >Affects Versions: 0.3.0, 0.2.0, 0.1.0 >Reporter: marco polo >Priority: Major > Fix For: 0.5.0 > > > No matter where the user starts MiNiFi from, the repositories are created in the > current working directory. This should be changed to use a > persistent directory based on the root of the installation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (MINIFICPP-385) RPG destruction can lead to EOFException in NiFi when sockets are not closed.
[ https://issues.apache.org/jira/browse/MINIFICPP-385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] marco polo updated MINIFICPP-385: - Status: Patch Available (was: Open) > RPG destruction can lead to EOFException in NiFi when sockets are not closed. > -- > > Key: MINIFICPP-385 > URL: https://issues.apache.org/jira/browse/MINIFICPP-385 > Project: NiFi MiNiFi C++ > Issue Type: Improvement >Reporter: marco polo >Assignee: marco polo >Priority: Major > > Current solution has not caused the issue in hours: > > Set up a countdown latch using RAII to control closure while there are any > open sockets in the onTrigger function in RPG. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFIREG-139) Nifi registry should pick up the controller service definitions while deploying a flow
[ https://issues.apache.org/jira/browse/NIFIREG-139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352620#comment-16352620 ] Joseph Witt commented on NIFIREG-139: - [~israelio] If the controller service is scoped/configured within the item being version controlled it will be included/copied. If it is at a higher level then it is an external reference and will not be copied but rather an attempt to bind to a similar one in the other environment will be made. If you create the service in the other environment just as you did in the original environment it will likely do what you want. If you change to have the CS in the same scope as the versioned item it will too. Please confirm how you have it set up and let us know if that new understanding helps you. Others can confirm if I've stated anything inaccurate here. > Nifi registry should pick up the controller service definitions while > deploying a flow > -- > > Key: NIFIREG-139 > URL: https://issues.apache.org/jira/browse/NIFIREG-139 > Project: NiFi Registry > Issue Type: Bug >Affects Versions: 0.1.0 >Reporter: ohad israeli >Priority: Critical > > I am trying to use the registry to copy a simple flow, for example > httprequest->joltconversion->httpresponse. > This flow is using the internal http context service which is installed and > configured on the source (dev env). While deploying the flow to the target > (prod env) the flow can't find the service on the target (prod env). > To summarize: when a flow that uses a controller service is deployed, the > service's settings should also be deployed on the target machine. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (NIFIREG-139) Nifi registry should pick up the controller service definitions while deploying a flow
ohad israeli created NIFIREG-139: Summary: Nifi registry should pick up the controller service definitions while deploying a flow Key: NIFIREG-139 URL: https://issues.apache.org/jira/browse/NIFIREG-139 Project: NiFi Registry Issue Type: Bug Affects Versions: 0.1.0 Reporter: ohad israeli I am trying to use the registry to copy a simple flow, for example httprequest->joltconversion->httpresponse. This flow is using the internal http context service which is installed and configured on the source (dev env). While deploying the flow to the target (prod env) the flow can't find the service on the target (prod env). To summarize: when a flow that uses a controller service is deployed, the service's settings should also be deployed on the target machine. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (MINIFICPP-393) add security support for MQTT
[ https://issues.apache.org/jira/browse/MINIFICPP-393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352572#comment-16352572 ] ASF GitHub Bot commented on MINIFICPP-393: -- Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/259#discussion_r166013537
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
@@ -119,6 +126,38 @@ void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::Proc
     qos_ = valInt;
     logger_->log_debug("AbstractMQTTProcessor: QOS [%ll]", qos_);
   }
+  value = "";
+
+  if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) {
+    if (value == MQTT_SECURITY_PROTOCOL_SSL) {
+      sslEnabled_ = true;
--- End diff --
what happens if a cert isn't specified or the CA isn't specified but needs to be?
> add security support for MQTT > - > > Key: MINIFICPP-393 > URL: https://issues.apache.org/jira/browse/MINIFICPP-393 > Project: NiFi MiNiFi C++ > Issue Type: Improvement >Affects Versions: 1.0.0 >Reporter: bqiu >Assignee: bqiu >Priority: Minor > Fix For: 1.0.0 > > > add security support for MQTT -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi-minifi-cpp pull request #259: MINIFICPP-393: Add security support for M...
Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/259#discussion_r166013537 --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp --- @@ -119,6 +126,38 @@ void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::Proc qos_ = valInt; logger_->log_debug("AbstractMQTTProcessor: QOS [%ll]", qos_); } + value = ""; + + if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) { +if (value == MQTT_SECURITY_PROTOCOL_SSL) { + sslEnabled_ = true; --- End diff -- what happens if a cert isn't specified or the CA isn't specified but needs to be? ---
[jira] [Commented] (NIFIREG-136) Switch to unique human-friendly names for buckets and flows
[ https://issues.apache.org/jira/browse/NIFIREG-136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352555#comment-16352555 ] Bryan Bende commented on NIFIREG-136: - I like the idea of having a way to reference the bucket and flow by their names, but we also can't just change the meaning of the existing endpoint, as the NiFi integration with registry is already built using the UUID-based API (plus anyone else who may have built something). I'd prefer to create some alternative way to access by name, such that both approaches could co-exist. Currently we have: {code:java} /buckets/{uuid}/flows/{uuid}/versions/{version|latest}{code} Could we do something like: {code:java} /buckets/n={name}/flows/n={name}/versions/{version|latest}{code} This way there is some indication that we are using names instead of UUIDs. Also, just wanted to mention that the item name is only unique per bucket, so two different flows named "Bryan's Flow" can exist in bucket1 and bucket2. I'm only mentioning this because it means we can't access a flow by name without the bucket id. > Switch to unique human-friendly names for buckets and flows > --- > > Key: NIFIREG-136 > URL: https://issues.apache.org/jira/browse/NIFIREG-136 > Project: NiFi Registry > Issue Type: Improvement >Affects Versions: 0.1.0 >Reporter: Andrew Grande >Priority: Major > > I have been playing with the Registry and using [~bende] 's early CLI to > accomplish some automation tasks. Have had really tough times with UUIDs > being used for buckets and flows, it introduced a lot of context switches to > locate/save/copy/paste those when using the API. > I would strongly suggest considering the human-friendly names and convert > deep links to using those instead. This not only provides for an easy > portable full URI, but also addresses compatibility issues between instances > of the registry, as buckets & flows with the same name are guaranteed to have > different UUIDs. 
A kind of copy/paste between environments. > I never came across a unique name requirement within a tree-like structure to > be an issue when dealing with NiFi. E.g. NiFi and NiFi Registry could > transparently reverse-look up the UUID by extracting names from the URI. The > goal is to have a great user experience. > P.S.: spaces in the name in the URI could be substituted for '+' sign > transparently, using the %20 would defeat the purpose of smooth ease-of-use. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
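The `n={name}` proposal in the comment above could be sketched roughly as follows. This is only an illustration, not NiFi Registry code: the class `PathSegmentSketch`, its `Ref` type, and the `parse` method are hypothetical names. It assumes a path segment is either a canonical UUID or an `n=`-prefixed, URL-encoded name, and it decodes `+` as a space, as the issue description suggests.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not part of the NiFi Registry code base. */
final class PathSegmentSketch {

    enum Kind { UUID, NAME }

    /** A parsed path segment: either a UUID or a human-friendly name. */
    static final class Ref {
        final Kind kind;
        final String value;

        Ref(Kind kind, String value) {
            this.kind = kind;
            this.value = value;
        }
    }

    // Assumed marker for name-based segments, taken from the proposal above.
    private static final String NAME_PREFIX = "n=";

    // Canonical 8-4-4-4-12 hexadecimal UUID layout.
    private static final Pattern UUID_PATTERN = Pattern.compile(
            "[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}");

    /** Classifies one path segment so that UUID and name lookups can co-exist. */
    static Ref parse(String segment) {
        if (segment.startsWith(NAME_PREFIX)) {
            return new Ref(Kind.NAME, decode(segment.substring(NAME_PREFIX.length())));
        }
        if (UUID_PATTERN.matcher(segment).matches()) {
            return new Ref(Kind.UUID, segment);
        }
        throw new IllegalArgumentException(
                "Neither a UUID nor an n={name} segment: " + segment);
    }

    /** Form-style decoding: '+' becomes a space, %XX escapes are resolved. */
    private static String decode(String raw) {
        try {
            return URLDecoder.decode(raw, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    public static void main(String[] args) {
        Ref byName = parse("n=Bryan%27s+Flow");
        Ref byId = parse("123e4567-e89b-12d3-a456-426614174000");
        System.out.println(byName.kind + " " + byName.value);
        System.out.println(byId.kind + " " + byId.value);
    }
}
```

Because item names are only unique per bucket, a caller would still have to resolve the bucket segment first and then look up the flow name within that bucket.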
[jira] [Commented] (MINIFICPP-393) add security support for MQTT
[ https://issues.apache.org/jira/browse/MINIFICPP-393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352547#comment-16352547 ] ASF GitHub Bot commented on MINIFICPP-393: -- Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/259#discussion_r166009342 --- Diff: extensions/mqtt/ConsumeMQTT.cpp --- @@ -35,7 +35,7 @@ namespace nifi { namespace minifi { namespace processors { -core::Property ConsumeMQTT::MaxQueueSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", ""); +core::Property ConsumeMQTT::MaxQueueSize("Max Queue Size", "Maximum receive queue size for the MQTT record", ""); --- End diff -- May not want to change a variable name. If someone is using this option it may break their configuration. > add security support for MQTT > - > > Key: MINIFICPP-393 > URL: https://issues.apache.org/jira/browse/MINIFICPP-393 > Project: NiFi MiNiFi C++ > Issue Type: Improvement >Affects Versions: 1.0.0 >Reporter: bqiu >Assignee: bqiu >Priority: Minor > Fix For: 1.0.0 > > > add security support for MQTT -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (MINIFICPP-393) add security support for MQTT
[ https://issues.apache.org/jira/browse/MINIFICPP-393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352542#comment-16352542 ] ASF GitHub Bot commented on MINIFICPP-393: -- GitHub user minifirocks opened a pull request: https://github.com/apache/nifi-minifi-cpp/pull/259 MINIFICPP-393: Add security support for MQTT Thank you for submitting a contribution to Apache NiFi - MiNiFi C++. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with MINIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file? - [ ] If applicable, have you updated the NOTICE file? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/minifirocks/nifi-minifi-cpp mqtt_security Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi-minifi-cpp/pull/259.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #259 commit c5d46bdb9dde46c9c3b8e2bb2f8936c5b59c861d Author: Bin Qiu Date: 2018-02-05T15:32:14Z MINIFICPP-393: Add security support for MQTT > add security support for MQTT > - > > Key: MINIFICPP-393 > URL: https://issues.apache.org/jira/browse/MINIFICPP-393 > Project: NiFi MiNiFi C++ > Issue Type: Improvement >Affects Versions: 1.0.0 >Reporter: bqiu >Assignee: bqiu >Priority: Minor > Fix For: 1.0.0 > > > add security support for MQTT -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi-minifi-cpp pull request #259: MINIFICPP-393: Add security support for M...
GitHub user minifirocks opened a pull request: https://github.com/apache/nifi-minifi-cpp/pull/259 MINIFICPP-393: Add security support for MQTT Thank you for submitting a contribution to Apache NiFi - MiNiFi C++. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with MINIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file? - [ ] If applicable, have you updated the NOTICE file? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/minifirocks/nifi-minifi-cpp mqtt_security Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi-minifi-cpp/pull/259.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #259 commit c5d46bdb9dde46c9c3b8e2bb2f8936c5b59c861d Author: Bin Qiu Date: 2018-02-05T15:32:14Z MINIFICPP-393: Add security support for MQTT ---
[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2180#discussion_r165995911 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java --- @@ -152,21 +146,25 @@ public ValidationResult validate(final String subject, final String value, final .displayName("JSON Type") .name("json-type") .description("By default, MongoDB's Java driver returns \"extended JSON\". Some of the features of this variant of JSON" + -" may cause problems for other JSON parsers that expect only standard JSON types and conventions. This configuration setting " + -" controls whether to use extended JSON or provide a clean view that conforms to standard JSON.") +" may cause problems for other JSON parsers that expect only standard JSON types and conventions. This configuration setting " + +" controls whether to use extended JSON or provide a clean view that conforms to standard JSON.") .expressionLanguageSupported(false) .required(true) .build(); private final static Set relationships; private final static List propertyDescriptors; +static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build(); + static { List _propertyDescriptors = new ArrayList<>(); _propertyDescriptors.addAll(descriptors); _propertyDescriptors.add(JSON_TYPE); _propertyDescriptors.add(USE_PRETTY_PRINTING); +_propertyDescriptors.add(CHARSET); _propertyDescriptors.add(QUERY); +_propertyDescriptors.add(QUERY_ATTRIBUTE); --- End diff -- Since you are adding an (albeit optional) property to GetMongo, there should be a unit test to cover it. ---
[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2180#discussion_r166001750 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java --- @@ -221,4 +264,16 @@ protected WriteConcern getWriteConcern(final ProcessContext context) { } return writeConcern; } + +protected void writeBatch(String payload, FlowFile parent, ProcessContext context, ProcessSession session, Map extraAttributes, Relationship rel) throws UnsupportedEncodingException { +String charset = parent != null ? context.getProperty(CHARSET).evaluateAttributeExpressions(parent).getValue() +: context.getProperty(CHARSET).evaluateAttributeExpressions().getValue(); + +FlowFile flowFile = session.create(parent); --- End diff -- Need to check if parent is null before using it in the create. A unit test where there is no incoming flow file will illustrate this. ---
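The null check requested in the review comment above might look roughly like this. To keep the sketch compilable without NiFi on the classpath, `FlowFile` and `Session` below are hypothetical stand-ins for `org.apache.nifi.flowfile.FlowFile` and the two `ProcessSession.create` overloads (`create()` and `create(FlowFile)`). The stand-in's rejection of a null parent is an assumption about the failure mode, not a statement about NiFi's exact behaviour.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only; the nested types are stand-ins, not NiFi classes. */
final class NullParentSketch {

    /** Hypothetical stand-in for org.apache.nifi.flowfile.FlowFile. */
    static final class FlowFile {
        final FlowFile parent;

        FlowFile(FlowFile parent) {
            this.parent = parent;
        }
    }

    /** Hypothetical stand-in mirroring ProcessSession.create() and create(FlowFile). */
    static final class Session {
        final List<FlowFile> created = new ArrayList<>();

        /** Creates a FlowFile with no parent. */
        FlowFile create() {
            FlowFile flowFile = new FlowFile(null);
            created.add(flowFile);
            return flowFile;
        }

        /** Creates a child FlowFile; a null parent is rejected (assumed behaviour). */
        FlowFile create(FlowFile parent) {
            if (parent == null) {
                throw new NullPointerException("parent must not be null");
            }
            FlowFile flowFile = new FlowFile(parent);
            created.add(flowFile);
            return flowFile;
        }
    }

    /** Picks the overload that matches whether an incoming FlowFile exists. */
    static FlowFile createChildOrRoot(Session session, FlowFile parent) {
        return parent != null ? session.create(parent) : session.create();
    }

    public static void main(String[] args) {
        Session session = new Session();
        FlowFile root = createChildOrRoot(session, null);   // no incoming flow file
        FlowFile child = createChildOrRoot(session, root);  // incoming flow file present
        System.out.println("created: " + session.created.size()
                + ", child has parent: " + (child.parent == root));
    }
}
```

A unit test that runs the processor with no incoming flow file, as the reviewer suggests, would exercise the `parent == null` branch.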
[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2180#discussion_r166003849 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java --- @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.mongodb; + +import com.mongodb.BasicDBObject; +import com.mongodb.client.AggregateIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.bson.conversions.Bson; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Tags({"mongo", "aggregation", "aggregate"}) +@CapabilityDescription("A processor that runs an aggregation query whenever a flowfile is received.") +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +public class RunMongoAggregation extends AbstractMongoProcessor { + +private final static Set relationships; +private final static List propertyDescriptors; + +static final Relationship REL_ORIGINAL = new Relationship.Builder() +.description("The input flowfile gets sent to this relationship when the query succeeds.") +.name("original") +.build(); +static final Relationship REL_FAILURE = new Relationship.Builder() +.description("The input flowfile gets sent to this relationship when the query fails.") +.name("failure") +.build(); +static final Relationship REL_RESULTS = new 
Relationship.Builder() +.description("The result set of the aggregation will be sent to this relationship.") +.name("results") +.build(); + +static final List buildAggregationQuery(String query) throws IOException { +List result = new ArrayList<>(); + +ObjectMapper mapper = new ObjectMapper(); +List values = mapper.readValue(query, List.class); +for (Map val : values) { +result.add(new BasicDBObject(val)); +} + +return result; +} + +public static final Validator AGG_VALIDATOR = (subject, value, context) -> { +final ValidationResult.Builder builder = new ValidationResult.Builder(); +builder.subject(subject).input(value); + +if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { +return builder.valid(true).explanation("Contains Expression Language").build(); +} + +String reason = null; +try { +buildAggregationQuery(value); +} catch (final RuntimeException | IOException e) { +reason = e.getLocalizedMessage(); +} + +return builder.explanation(reason).valid(reason == null).build(); +}; + +static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +
[jira] [Created] (MINIFICPP-393) add security support for MQTT
bqiu created MINIFICPP-393: -- Summary: add security support for MQTT Key: MINIFICPP-393 URL: https://issues.apache.org/jira/browse/MINIFICPP-393 Project: NiFi MiNiFi C++ Issue Type: Improvement Affects Versions: 1.0.0 Reporter: bqiu Assignee: bqiu Fix For: 1.0.0 add security support for MQTT -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2180#discussion_r165994803 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java --- @@ -95,13 +101,50 @@ public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder() .name("Write Concern") +.displayName("Write Concern") .description("The write concern to use") .required(true) .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED, WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY) .defaultValue(WRITE_CONCERN_ACKNOWLEDGED) .build(); +static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder() +.name("results-per-flowfile") +.displayName("Results Per FlowFile") +.description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("1") +.build(); + +static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() +.name("Batch Size") +.displayName("Batch Size") +.description("The number of elements returned from the server in one batch.") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("100") +.build(); + +static final PropertyDescriptor QUERY_ATTRIBUTE = new PropertyDescriptor.Builder() +.name("mongo-agg-query-attribute") +.displayName("Query Output Attribute") +.description("If set, the query will be written to a specified attribute on the output flowfiles.") +.expressionLanguageSupported(true) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) +.required(false) +.build(); +static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() +.name("el5-charset") --- End diff -- Looks like a copy-paste name, 
perhaps add a mongo prefix here? ---