[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403207#comment-15403207 ]

ASF GitHub Bot commented on NIFI-2078:
--

Github user ijokarumawak closed the pull request at: https://github.com/apache/nifi/pull/563

> State management for processors whose states are managed externally
> ---
>
> Key: NIFI-2078
> URL: https://issues.apache.org/jira/browse/NIFI-2078
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Reporter: Koji Kawamura
> Assignee: Koji Kawamura
> Fix For: 1.1.0
>
>
> Inherently by the nature of a given processor it may involve state managed by
> itself (using nifi state management), or can be managed by some external
> service it interacts with (kafka's offset), and theoretically some might have
> both going on. With the new state management, we're giving users a way to
> reset state managed by nifi for a given processor. But it doesn't apply to
> those processors who have external state.
> we should consider offering a way to reset state that allows a processor to
> call out to whatever external store it impacts

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403206#comment-15403206 ]

ASF GitHub Bot commented on NIFI-2078:
--

Github user ijokarumawak commented on the issue: https://github.com/apache/nifi/pull/563

@JPercivall Thanks for taking the time to review it; I really appreciate it. I agree with breaking this into separate PRs and moving it to 1.1. I think only NIFI-2441: View state JS issue. I will close this PR and submit separate PRs.
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403171#comment-15403171 ]

Joseph Witt commented on NIFI-2078:
---

Thanks [~ijokarumawak]. This is definitely a powerful feature/concept. We need to be sure we can address all the already existing 1.0 items, as this release is already quite massive. Because this change is additive to the API, we should be able to add it in 1.1 or a later release without breaking anything. But this one definitely needs considerable testing, of course.
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402977#comment-15402977 ]

ASF GitHub Bot commented on NIFI-2078:
--

Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/563

@ijokarumawak there is a lot of great work here, but the various tickets included in this one PR are making it difficult to review and test. Could we close this PR and break it out into separate ones for each ticket so that they can be addressed individually? That would make it much more efficient to get the bug fixes into master and to properly test the new external state management.

Also, could we move NIFI-2078 out of 1.0 to 1.1? This is an important new feature with API implications, and it would be nice to finish reviewing it once the other API changes take effect and the various UI-blocking bugs are addressed.
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402946#comment-15402946 ]

ASF GitHub Bot commented on NIFI-2078:
--

Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/563

Continuing the comment on "linger.ms": it appears that using the "linger.ms" property in PublishKafka yields the same results. Going forward, it looks like we should take two steps:

1: Change our default in PutKafka to match the Kafka default. According to the [Kafka documentation](http://kafka.apache.org/documentation.html), the default value for "linger.ms" is 0, meaning no delay.
2: Figure out whether the mishandling of the property is a problem in our processors or in the underlying libraries. This may be tricky; I can start on it, but I will probably need help. @olegz could you help here?
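For reference, the Kafka default mentioned in step 1 can be pinned explicitly in a producer configuration. This is a minimal sketch assuming a plain `java.util.Properties` producer config; `ProducerConfigSketch` and `producerProperties` are hypothetical names (not part of PutKafka or PublishKafka), and only the standard Kafka keys `bootstrap.servers` and `linger.ms` are used.

```java
import java.util.Properties;

// Hypothetical helper (not part of PutKafka or PublishKafka): builds producer
// settings that set "linger.ms" explicitly to 0, the Kafka default cited in step 1.
class ProducerConfigSketch {

    static Properties producerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // 0 ms: do not add an artificial delay while waiting for more records.
        props.put("linger.ms", "0");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProperties("localhost:9092");
        // These Properties could then be handed to a KafkaProducer constructor.
        System.out.println("linger.ms=" + props.getProperty("linger.ms"));
    }
}
```

Setting the value explicitly, rather than relying on a processor-level default, makes it obvious which delay the producer is actually using when comparing PutKafka and PublishKafka behavior.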
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402855#comment-15402855 ]

ASF GitHub Bot commented on NIFI-2078:
--

Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/563

@olegz @ijokarumawak I don't think "Queue Buffering Max Time" is working properly. I was getting very poor performance out of PutKafka and couldn't understand why it took ~5 seconds to send a single message. I then found this property: adjusting it lower fixed my performance problems, and adjusting it higher made them worse. Based on the property descriptor and the corresponding Kafka property ([search for "linger.ms"](http://kafka.apache.org/documentation.html)), I would expect it to batch multiple flowfiles, but instead it just hung and processed only one flowfile per batch. Below are a couple of screenshots of the configuration and the stats from running a 3-node cluster (I saw the same phenomenon running on the primary node only, and the scheduling period was "0 secs"):

![screen shot 2016-08-01 at 5 22 23 pm](https://cloud.githubusercontent.com/assets/11302527/17309375/8d1189e0-580c-11e6-9c58-cf1a8f9bbc99.png)
![screen shot 2016-08-01 at 5 12 26 pm](https://cloud.githubusercontent.com/assets/11302527/17309350/792e9512-580c-11e6-8d70-19233f7132e7.png)
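To make the expected behavior concrete, here is a deliberately simplified model of linger-based batching. It assumes a batch is sent when it reaches a record-count limit or when `lingerMs` has passed since its first record, whichever comes first. `LingerModel` is hypothetical and ignores the real Kafka producer's details (byte-based batch sizes, partitions, in-flight requests). Under this model a lone record waits up to `lingerMs`, but records arriving within the window share one batch, which is the batching the comment expected instead of one flowfile per batch.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (an assumption, not Kafka's real producer internals):
// a batch is sent when it holds batchSize records, or when lingerMs has
// elapsed since its first record arrived, whichever happens first.
class LingerModel {

    /** Returns the send time of each batch, given ascending arrival times in ms. */
    static List<Long> batchSendTimes(long[] arrivals, int batchSize, long lingerMs) {
        List<Long> sendTimes = new ArrayList<>();
        int count = 0;        // records in the currently open batch
        long batchStart = 0;  // arrival time of the open batch's first record
        for (long t : arrivals) {
            // An open batch whose linger deadline passed before this record is flushed first.
            if (count > 0 && t > batchStart + lingerMs) {
                sendTimes.add(batchStart + lingerMs);
                count = 0;
            }
            if (count == 0) {
                batchStart = t;
            }
            count++;
            if (count == batchSize) {
                sendTimes.add(t); // a full batch goes out immediately
                count = 0;
            }
        }
        if (count > 0) {
            sendTimes.add(batchStart + lingerMs); // the last partial batch waits out the linger
        }
        return sendTimes;
    }
}
```

For example, arrivals at 0, 10 and 20 ms with a 5000 ms linger and a batch limit of 100 yield a single batch sent at 5000 ms, while a linger of 0 yields three sends at 0, 10 and 20 ms.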
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402824#comment-15402824 ]

ASF GitHub Bot commented on NIFI-2078:
--

Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/563

Also, would it make sense for the "View State" menu to label clustered external state as "External - Clustered" instead of just "External"?
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15401877#comment-15401877 ]

ASF GitHub Bot commented on NIFI-2078:
--

Github user ijokarumawak commented on the issue: https://github.com/apache/nifi/pull/563

@JPercivall I've changed how property value changes are captured. I added a validation method to ExternalStateManager that validates and captures property values using a validation context. This also removes the Kafka NAR's dependency on the EL module, because the processor code no longer has to use EL implementation classes. I tested with both standalone and cluster, and confirmed setting properties with EL using the variable registry. I hope this can be merged. Please let me know what you think. Thanks!
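The idea of capturing property values through a validation step, rather than only in `onPropertyModified`, can be sketched as follows. Everything here is hypothetical: `EvaluatedProperties` stands in for a validation context that returns EL-evaluated values, and the property names and the `validateExternalStateAccess` method are illustrative, not the PR's actual `ExternalStateManager` signature.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a validation context: returns a property value with
// Expression Language already evaluated, or null when the property is unset.
interface EvaluatedProperties {
    String evaluated(String propertyName);
}

// Sketch of a processor capturing the values it needs for external state access
// during validation. Property names and the method name are illustrative only.
class KafkaExternalStateSketch {
    private volatile String brokers;
    private volatile String topic;

    List<String> validateExternalStateAccess(EvaluatedProperties context) {
        List<String> problems = new ArrayList<>();
        String b = context.evaluated("Kafka Brokers");
        String t = context.evaluated("Topic Name");
        if (b == null || b.isEmpty()) {
            problems.add("Kafka Brokers is required");
        }
        if (t == null || t.isEmpty()) {
            problems.add("Topic Name is required");
        }
        // Capture only a complete configuration, so view/clear state never
        // runs against a half-configured processor.
        if (problems.isEmpty()) {
            brokers = b;
            topic = t;
        }
        return problems;
    }

    boolean isReadyToAccessState() {
        return brokers != null && topic != null;
    }
}
```

Because the context hands back already-evaluated values, the processor itself never needs EL implementation classes, which is the dependency reduction described in the comment.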
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15401051#comment-15401051 ]

ASF GitHub Bot commented on NIFI-2078:
--

Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/563#discussion_r72900441

--- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml ---
@@ -35,6 +35,10 @@
             nifi-utils
+            org.apache.nifi
+            nifi-expression-language
--- End diff --

We may be able to improve the framework so that onPropertyModified is given a PropertyValue that is already set up. I think the variable registry can be useful for environment-specific configs such as the broker address. I raised an issue: [NIFI-2364](https://issues.apache.org/jira/browse/NIFI-2364).
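Why an already-evaluated value matters: `onPropertyModified` receives the raw configured string, so a value such as `${kafka.brokers}` would reach the processor unresolved. The sketch below is a simplified stand-in for EL variable lookup against a registry map; it only handles plain `${name}` references (no EL functions or escaping), and `VariableRegistrySketch` and the `kafka.brokers` variable name are hypothetical.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified stand-in for Expression Language variable references: only plain
// ${name} lookups are handled (no functions, no escaping). Not NiFi's EL engine.
class VariableRegistrySketch {
    private static final Pattern REF = Pattern.compile("\\$\\{([^}]+)\\}");

    static String resolve(String raw, Map<String, String> registry) {
        Matcher m = REF.matcher(raw);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = registry.get(m.group(1));
            // Unknown references are left untouched so the problem stays visible.
            m.appendReplacement(out, Matcher.quoteReplacement(value != null ? value : m.group()));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

With a registry mapping `kafka.brokers` to `broker1:9092`, the raw value `${kafka.brokers}` resolves to `broker1:9092`; without resolution, external-state access would try to connect to the literal string.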
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15401002#comment-15401002 ]

ASF GitHub Bot commented on NIFI-2078:
--

Github user ijokarumawak commented on the issue: https://github.com/apache/nifi/pull/563

@JPercivall I was able to reproduce the behavior with Kafka 0.10.0, but it turns out the cause is a different change. [NIFI-2355](https://issues.apache.org/jira/browse/NIFI-2355) and its PR #715 added a permission check to the table sort comparator in nf-component-state.js. I don't think it should be added to the state table, since a given user won't have record-level permission differences for component state. #715 makes the same change to nf-controller-services.js and other JS files, so I assume adding the permission check to nf-component-state.js was a mistake. After removing the change from nf-component-state.js, I was able to view external state with a Kafka 0.10.0 broker. It's worth noting that I got the same JS error with ListFile when I viewed its state, so the problem is not limited to external state. I filed NIFI-2441 for that issue.
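The point about the state table can be illustrated with a Java analogue of a sort comparator; the actual code is JavaScript in nf-component-state.js and is not reproduced here. Component state rows are plain key/value pairs with no per-row permissions, so the comparator only needs to compare column values. `StateRowSorting` and `StateRow` are hypothetical names, and values are compared as strings.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Java analogue (hypothetical) of a state-table sort comparator: rows are plain
// key/value pairs, so sorting compares column values only, with no permission check.
class StateRowSorting {

    static final class StateRow {
        final String key;
        final String value;

        StateRow(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    static Comparator<StateRow> byColumn(String column) {
        // Values are compared as strings, exactly as displayed in the table.
        return "value".equals(column)
                ? Comparator.comparing((StateRow r) -> r.value)
                : Comparator.comparing((StateRow r) -> r.key);
    }

    static List<String> sortedKeys(List<StateRow> rows, String column) {
        List<StateRow> copy = new ArrayList<>(rows);
        copy.sort(byColumn(column));
        List<String> keys = new ArrayList<>();
        for (StateRow r : copy) {
            keys.add(r.key);
        }
        return keys;
    }
}
```

Since every row carries the same component-level permission, a per-row check adds nothing here, which is consistent with the comment's conclusion that it was added to the state table by mistake.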
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15399764#comment-15399764 ]

ASF GitHub Bot commented on NIFI-2078:
--

Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/563

Hey @ijokarumawak, I tried testing this using ConsumeKafka/PublishKafka with Kafka 0.10.0, but I'm not seeing any state come up when I view state for ConsumeKafka. In the screenshots below you can see the data is flowing, but no state is being stored. I have tried all 3 values of the "offset reset" property for ConsumeKafka, but that doesn't make a difference. Is there something special I should be doing?

![screen shot 2016-07-29 at 12 54 58 pm](https://cloud.githubusercontent.com/assets/11302527/17256496/c2e0005c-558b-11e6-8616-ec40ded55155.png)
![screen shot 2016-07-29 at 12 54 09 pm](https://cloud.githubusercontent.com/assets/11302527/17256501/cb3bd4ba-558b-11e6-896e-378f8a08c51f.png)
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15399724#comment-15399724 ]

ASF GitHub Bot commented on NIFI-2078:
--

Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/563#discussion_r72829933

--- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java ---
@@ -481,4 +493,59 @@ private void releaseFlowFile(FlowFile flowFile, ProcessSession session, Map partitionOffsets = KafkaUtils.retrievePartitionOffsets(zookeeperConnectionString, topic, groupId);
+
+        return new StandardStateMap(partitionOffsets, System.currentTimeMillis());
+    }
+
+    private boolean isReadyToAccessState() {
+        return !StringUtils.isEmpty(zookeeperConnectionString)
+                && !StringUtils.isEmpty(topic)
+                && !StringUtils.isEmpty(groupId);
+    }
+
+    @Override
+    public void clearExternalState() throws IOException {
+        if (!isReadyToAccessState()) {
+            return;
+        }
+        // Block onTrigger starts creating new consumer until clear offset finishes.
+        synchronized (this.consumerStreamsReady) {
+            KafkaUtils.clearPartitionOffsets(zookeeperConnectionString, topic, groupId);
+        }
+    }
+
+    /**
+     * GetKafka overrides this method in order to capture processor's property values required when it retrieves
+     * its state managed externally at Kafka. Since view/clear state operation can be executed before onTrigger() is called,
+     * we need to capture these values as it's modified. This method is also called when NiFi restarts and loads configs,
+     * so users can access external states right after restart of NiFi.
+     * @param descriptor of the modified property
+     * @param oldValue non-null property value (previous)
+     * @param newValue the new property value or if null indicates the property
+     */
+    @Override
+    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+        if (ZOOKEEPER_CONNECTION_STRING.equals(descriptor)) {
+            zookeeperConnectionString = newValue;
+        } else if (TOPIC.equals(descriptor)) {
+            topic = newValue;
+        } else if (GROUP_ID.equals(descriptor)) {
+            groupId = newValue;
--- End diff --

I ran into an error due to how groupId gets set. onPropertyModified is only called when someone modifies the property, but Group ID has a default value, so groupId will be null if the user never modifies it, and they will hit this error:

2016-07-29 13:24:12,283 WARN [Timer-Driven Process Thread-6] o.apache.nifi.processors.kafka.GetKafka GetKafka[id=3798f2c6-0156-1000--138083b7] Processor Administratively Yielded for 1 sec due to processing failure
2016-07-29 13:24:12,283 WARN [Timer-Driven Process Thread-6] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding GetKafka[id=3798f2c6-0156-1000--138083b7] due to uncaught Exception: java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
2016-07-29 13:24:12,284 WARN [Timer-Driven Process Thread-6] o.a.n.c.t.ContinuallyRunProcessorTask
java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
	at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:367) ~[na:na]
	at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
	at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1060) ~[nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
	at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
	at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
	at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_74]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_74]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_74]
	at
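One way to avoid the `NullPointerException` described above is to fall back to the property's default whenever no explicit value was ever captured. This is a minimal sketch, not GetKafka's actual fix: `CapturedProperty` is a hypothetical holder, and in NiFi the default would presumably come from the descriptor (`PropertyDescriptor#getDefaultValue()`).

```java
// Hypothetical holder for a value captured in onPropertyModified; the default
// passed in stands for the descriptor's default value.
class CapturedProperty {
    private final String defaultValue;
    private volatile String explicitValue; // set only when the user modifies the property

    CapturedProperty(String defaultValue) {
        this.defaultValue = defaultValue;
    }

    /** Mirrors onPropertyModified; a null newValue is treated here as "property removed". */
    void onPropertyModified(String newValue) {
        explicitValue = newValue;
    }

    /** Falls back to the default when the property was never modified (or was removed). */
    String effectiveValue() {
        return explicitValue != null ? explicitValue : defaultValue;
    }
}
```

With this shape, a processor whose Group ID was left at its default still sees a non-null group id when view/clear state or `onTrigger` runs.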
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15399552#comment-15399552 ]

ASF GitHub Bot commented on NIFI-2078:
--

Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/563#discussion_r72814952

--- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml ---
@@ -35,6 +35,10 @@
             nifi-utils
+            org.apache.nifi
+            nifi-expression-language
--- End diff --

@ijokarumawak supporting EL for those properties makes sense. EL has subject-less functions, and it can also reference system, environment, and custom variables set through the variable registry. I was just confused: since they already had references to EL functions, why did this need to be added as an explicit dependency?
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15397205#comment-15397205 ]

ASF GitHub Bot commented on NIFI-2078:
--

Github user ijokarumawak commented on the issue: https://github.com/apache/nifi/pull/563

@JPercivall I added another commit to address the feedback, including more documentation. Could you review the changes? Thanks!
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15397203#comment-15397203 ]

ASF GitHub Bot commented on NIFI-2078:
--

Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/563#discussion_r72577880

--- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java ---
@@ -54,7 +72,12 @@
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @CapabilityDescription("Consumes messages from Apache Kafka")
 @Tags({ "Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume" })
-public class ConsumeKafka extends AbstractKafkaProcessor> {
+@Stateful(scopes = {Scope.EXTERNAL}, description = "After consuming messages, ConsumeKafka commits its offset information to Kafka" +
+        " so that the state of a consumer group can be retained across events such as consumer reconnect." +
+        " Offsets can be cleared when there is no consumer subscribing with the same consumer group id." +
+        " It may take more than 30 seconds for a consumer group to become able to be cleared after it is stopped from NiFi." +
+        " Once offsets are cleared, ConsumeKafka will resume consuming messages based on Offset Reset configuration.")
+public class ConsumeKafka extends AbstractKafkaProcessor > implements ExternalStateManager {
--- End diff --

I added documentation to the Developer Guide.
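The annotation's rule that offsets can be cleared only when no consumer is subscribed with the same group id can be modeled with a small in-memory class. This is a toy sketch under stated assumptions: `ConsumerGroupOffsets` is hypothetical, does not talk to Kafka, and ignores the session timeout that causes the "more than 30 seconds" delay mentioned in the description.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model (hypothetical, not Kafka's API): committed offsets for one
// consumer group may only be cleared once no consumer in the group is subscribed.
class ConsumerGroupOffsets {
    private final Map<Integer, Long> offsets = new HashMap<>(); // partition -> offset
    private int activeMembers;

    synchronized void join() { activeMembers++; }

    synchronized void leave() { if (activeMembers > 0) activeMembers--; }

    synchronized void commit(int partition, long offset) { offsets.put(partition, offset); }

    synchronized Map<Integer, Long> snapshot() { return new HashMap<>(offsets); }

    /** Refuses to clear while the group still has subscribed consumers. */
    synchronized void clear() throws IOException {
        if (activeMembers > 0) {
            throw new IOException("Consumer group still has " + activeMembers + " active member(s)");
        }
        offsets.clear();
    }
}
```

After a successful clear, a real consumer would fall back to its offset reset setting, which is the "resume consuming messages based on Offset Reset configuration" behavior the description promises.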
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15397200#comment-15397200 ]

ASF GitHub Bot commented on NIFI-2078:
--

Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/563#discussion_r7257

--- Diff: nifi-api/src/main/java/org/apache/nifi/components/state/ExternalStateManager.java ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.state;
+
+import org.apache.nifi.annotation.behavior.Stateful;
+
+import java.io.IOException;
+
+/**
+ *
+ * The ExternalStateManager is responsible for providing NiFi a mechanism for retrieving
+ * and clearing state stored in an external system a NiFi component interact with.
+ *
+ *
+ *
+ * When calling methods in this class, the state is always retrieved/cleared from external system
+ * regardless NiFi instance is a part of a cluster or standalone.
+ *
+ *
+ *
+ * This mechanism is designed to allow developers to easily store and retrieve small amounts of state.
+ * Since implementation of this interface interacts with remote system, one should consider the cost of
+ * retrieving this data, and the amount of data should be kept to the minimum required.
--- End diff --

Updated, please check the latest commit.
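Based on the two methods visible in the GetKafka diff (a state-retrieval method that returns a `StandardStateMap`, and `clearExternalState() throws IOException`), the interface shape might look roughly like the sketch below. This is a hypothetical reconstruction: `ExternalStateAccess` uses a plain `Map` instead of NiFi's `StateMap`, the retrieval method name `getExternalState` is assumed, and the in-memory store exists only for illustration. Each read goes to the backing store, matching the Javadoc's point that state is always retrieved from the external system.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical reconstruction of the interface shape suggested by the review diffs;
// the PR's actual ExternalStateManager methods and return types may differ.
interface ExternalStateAccess {
    /** Reads state directly from the external system on every call. */
    Map<String, String> getExternalState() throws IOException;

    /** Clears state in the external system. */
    void clearExternalState() throws IOException;
}

/** Stand-in "external" store kept in memory, for illustration only. */
class InMemoryExternalState implements ExternalStateAccess {
    private final Map<String, String> remote = new HashMap<>();

    void put(String key, String value) {
        remote.put(key, value);
    }

    @Override
    public Map<String, String> getExternalState() {
        // Return a read-only copy: callers get a snapshot, not the live store.
        return Collections.unmodifiableMap(new HashMap<>(remote));
    }

    @Override
    public void clearExternalState() {
        remote.clear();
    }
}
```

Keeping the returned state small matters in the real case, since every call is a round trip to a remote system such as ZooKeeper or a Kafka broker.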
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15396881#comment-15396881 ] ASF GitHub Bot commented on NIFI-2078: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/563#discussion_r72562357 --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java --- @@ -91,7 +92,7 @@ public ClusterProtocolHeartbeatMonitor(final ClusterCoordinator clusterCoordinat this.clusterNodesPath = zkClientConfig.resolvePath("cluster/nodes"); String hostname = properties.getProperty(NiFiProperties.CLUSTER_NODE_ADDRESS); -if (hostname == null) { +if (StringUtils.isEmpty(hostname)) { --- End diff -- Thanks for pointing this out. I will remove these changes from this PR. I posted my comments on #688.
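The two checks in this diff differ when the property key is present but has no value: a `java.util.Properties` lookup returns an empty string in that case, which a plain null check lets through. The sketch below re-implements the null-or-empty semantics of Commons Lang `StringUtils.isEmpty` locally so it stays self-contained; `resolveHostname`, `resolveWithNullCheck`, and the fallback parameter are hypothetical.

```java
// Local re-implementation of StringUtils.isEmpty semantics:
// true for null or "", false otherwise (whitespace is NOT empty).
final class HostnameCheck {
    private HostnameCheck() {}

    static boolean isEmpty(String s) {
        return s == null || s.isEmpty();
    }

    // Hypothetical: use the configured address if present, else a fallback.
    static String resolveHostname(String configured, String fallback) {
        return isEmpty(configured) ? fallback : configured;
    }

    // The old behavior for comparison: "" slips through as a hostname.
    static String resolveWithNullCheck(String configured, String fallback) {
        return configured == null ? fallback : configured;
    }
}
```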
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15396843#comment-15396843 ] ASF GitHub Bot commented on NIFI-2078: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/563#discussion_r72559943 --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml --- @@ -35,6 +35,10 @@ nifi-utils +org.apache.nifi +nifi-expression-language --- End diff -- BOOTSTRAP_SERVERS, TOPIC, and CLIENT_ID are properties shared between ConsumeKafka and ProduceKafka. If we separate them so that ConsumeKafka and ProduceKafka each have their own copies of those properties, with ProduceKafka supporting EL but ConsumeKafka not, then the dependency can be removed. However, existing flow configurations may have to be updated if they use EL in those ConsumeKafka properties. @olegz @JPercivall What do you think?
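The proposal amounts to giving each processor its own definitions of the shared properties, differing only in whether EL is allowed. A hypothetical sketch using a simplified `PropertySpec` type (not NiFi's `PropertyDescriptor` API); the property names are illustrative.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified property definition; not NiFi's PropertyDescriptor.
final class PropertySpec {
    final String name;
    final boolean supportsExpressionLanguage;

    PropertySpec(String name, boolean supportsExpressionLanguage) {
        this.name = name;
        this.supportsExpressionLanguage = supportsExpressionLanguage;
    }
}

final class KafkaPropertySketch {
    private KafkaPropertySketch() {}

    // Builds the formerly shared set with a per-processor EL flag
    // instead of a single definition reused by both processors.
    static List<PropertySpec> commonProperties(boolean allowEl) {
        return Arrays.asList(
                new PropertySpec("bootstrap.servers", allowEl),
                new PropertySpec("topic", allowEl),
                new PropertySpec("client.id", allowEl));
    }

    // Producer side keeps EL; consumer side treats the values as static.
    static final List<PropertySpec> PUBLISH = commonProperties(true);
    static final List<PropertySpec> CONSUME = commonProperties(false);
}
```

With separate definitions the consumer would not need an EL evaluator when a property is modified, at the cost of the flow-configuration breakage mentioned above.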
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15396834#comment-15396834 ] ASF GitHub Bot commented on NIFI-2078: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/563#discussion_r72559265 --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml --- @@ -35,6 +35,10 @@ nifi-utils +org.apache.nifi +nifi-expression-language --- End diff -- I added the EL dependency to use the StandardPropertyValue class in ConsumeKafka.onPropertyModified. Since ConsumeKafka allows EL for the properties needed to get state from Kafka, EL has to be evaluated in onPropertyModified, too. To do so, we need a PropertyValue instance, which is not available in onPropertyModified because that method only receives the String representations of oldValue and newValue. No implementation class of PropertyValue is included in nifi-api, so I needed to add the EL dependency. However, it feels a little strange to support EL for things like 'topic' or 'bootstrap_servers', since ConsumeKafka doesn't take input flow files, and these properties cannot be changed after ConsumeKafka connects to Kafka; it keeps using the same kafkaResource instance. So those property values are effectively static rather than dynamically evaluated. It does make sense to support EL for those values in PublishKafka, though.
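To make the onPropertyModified problem concrete: the callback only sees the literal text the user typed, so a value such as `${env}-events` must be evaluated before it can be used to look up offsets. The sketch below is hypothetical. Its toy `${name}` substitution stands in for NiFi's Expression Language and is not how StandardPropertyValue works, and the callback loosely mirrors `onPropertyModified(PropertyDescriptor, String, String)`, simplified to a property name.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration; the resolver is a toy, NOT NiFi's EL.
final class PropertyChangeSketch {
    private static final Pattern REF = Pattern.compile("\\$\\{([^}]+)\\}");

    private final Map<String, String> variables;
    private String resolvedTopic;

    PropertyChangeSketch(Map<String, String> variables) {
        this.variables = variables;
    }

    // Replaces ${name} with its variable value; unknown names are left untouched.
    String resolve(String raw) {
        if (raw == null) {
            return null;
        }
        Matcher m = REF.matcher(raw);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = variables.getOrDefault(m.group(1), m.group(0));
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    // Only raw strings arrive here, so evaluation must happen explicitly.
    void onPropertyModified(String name, String oldValue, String newValue) {
        if ("topic".equals(name)) {
            resolvedTopic = resolve(newValue);
        }
    }

    String resolvedTopic() {
        return resolvedTopic;
    }
}
```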
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15396807#comment-15396807 ] ASF GitHub Bot commented on NIFI-2078: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/563#discussion_r72557366 --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ClearComponentStateEndpointMerger.java --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.entity.ClearComponentStateResultEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +public class ClearComponentStateEndpointMerger extends AbstractSingleEntityEndpoint { +public static final Pattern PROCESSOR_STATE_URI_PATTERN = Pattern.compile("/nifi-api/processors/[a-f0-9\\-]{36}/state/clear-requests"); +public static final Pattern CONTROLLER_SERVICE_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller-services/[a-f0-9\\-]{36}/state/clear-requests"); +public static final Pattern REPORTING_TASK_STATE_URI_PATTERN = Pattern.compile("/nifi-api/reporting-tasks/[a-f0-9\\-]{36}/state/clear-requests"); --- End diff -- Yes, that's how it's done today. There may be some way to keep the URI patterns aligned with the corresponding Resource class methods, though.
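One way to reduce the drift risk raised in this thread is to build the three patterns from shared pieces instead of three independent literals. A hypothetical sketch: the constant names, the `isClearStateRequest` helper, and the POST-only rule are assumptions; the regex fragments are copied from the diff.

```java
import java.util.regex.Pattern;

final class StateUriPatterns {
    private StateUriPatterns() {}

    // Shared pieces, defined once (values copied from the hard-coded patterns).
    static final String API_ROOT = "/nifi-api";
    static final String ID_REGEX = "[a-f0-9\\-]{36}";
    static final String CLEAR_STATE_SUFFIX = "/state/clear-requests";

    // Literal parts are quoted; only the component id is a regex.
    static Pattern clearStatePattern(String collection) {
        return Pattern.compile(Pattern.quote(API_ROOT + "/" + collection + "/")
                + ID_REGEX + Pattern.quote(CLEAR_STATE_SUFFIX));
    }

    static final Pattern PROCESSOR_STATE_URI_PATTERN = clearStatePattern("processors");
    static final Pattern CONTROLLER_SERVICE_STATE_URI_PATTERN = clearStatePattern("controller-services");
    static final Pattern REPORTING_TASK_STATE_URI_PATTERN = clearStatePattern("reporting-tasks");

    // Roughly the check an endpoint merger performs on a replicated request.
    static boolean isClearStateRequest(String method, String path) {
        if (!"POST".equalsIgnoreCase(method)) {
            return false;
        }
        return PROCESSOR_STATE_URI_PATTERN.matcher(path).matches()
                || CONTROLLER_SERVICE_STATE_URI_PATTERN.matcher(path).matches()
                || REPORTING_TASK_STATE_URI_PATTERN.matcher(path).matches();
    }
}
```

This keeps a URI change to one edit, though it still does not tie the patterns to the Resource class paths automatically.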
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15394552#comment-15394552 ] ASF GitHub Bot commented on NIFI-2078: -- Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/563 @olegz, when you get a chance could you review the Get/Consume Kafka changes in this PR?
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15394524#comment-15394524 ] ASF GitHub Bot commented on NIFI-2078: -- Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/563#discussion_r72334765 --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml --- @@ -35,6 +35,10 @@ nifi-utils +org.apache.nifi +nifi-expression-language --- End diff -- This module didn't already have access to EL?
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15394460#comment-15394460 ] ASF GitHub Bot commented on NIFI-2078: -- Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/563#discussion_r72326199 --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ClearComponentStateEndpointMerger.java --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.entity.ClearComponentStateResultEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +public class ClearComponentStateEndpointMerger extends AbstractSingleEntityEndpoint { +public static final Pattern PROCESSOR_STATE_URI_PATTERN = Pattern.compile("/nifi-api/processors/[a-f0-9\\-]{36}/state/clear-requests"); +public static final Pattern CONTROLLER_SERVICE_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller-services/[a-f0-9\\-]{36}/state/clear-requests"); +public static final Pattern REPORTING_TASK_STATE_URI_PATTERN = Pattern.compile("/nifi-api/reporting-tasks/[a-f0-9\\-]{36}/state/clear-requests"); --- End diff -- Ah, I see that's how it's done in all the classes in the same package.
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15394457#comment-15394457 ] ASF GitHub Bot commented on NIFI-2078: -- Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/563#discussion_r72325932 --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java --- @@ -91,7 +92,7 @@ public ClusterProtocolHeartbeatMonitor(final ClusterCoordinator clusterCoordinat this.clusterNodesPath = zkClientConfig.resolvePath("cluster/nodes"); String hostname = properties.getProperty(NiFiProperties.CLUSTER_NODE_ADDRESS); -if (hostname == null) { +if (StringUtils.isEmpty(hostname)) { --- End diff -- This change will conflict with @markap14's change in this PR: https://github.com/apache/nifi/pull/688/files
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15394454#comment-15394454 ] ASF GitHub Bot commented on NIFI-2078: -- Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/563#discussion_r72325489 --- Diff: nifi-api/src/main/java/org/apache/nifi/components/state/ExternalStateManager.java --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.state; + +import org.apache.nifi.annotation.behavior.Stateful; + +import java.io.IOException; + +/** + * + * The ExternalStateManager is responsible for providing NiFi a mechanism for retrieving + * and clearing state stored in an external system a NiFi component interact with. + * + * + * + * When calling methods in this class, the state is always retrieved/cleared from external system + * regardless NiFi instance is a part of a cluster or standalone. + * + * + * + * This mechanism is designed to allow developers to easily store and retrieve small amounts of state. 
+ * Since implementation of this interface interacts with remote system, one should consider the cost of + * retrieving this data, and the amount of data should be kept to the minimum required. --- End diff -- This paragraph doesn't make sense for External state, since the user probably isn't choosing what to store.
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15394344#comment-15394344 ] ASF GitHub Bot commented on NIFI-2078: -- Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/563#discussion_r72315293 --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ClearComponentStateEndpointMerger.java --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.entity.ClearComponentStateResultEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +public class ClearComponentStateEndpointMerger extends AbstractSingleEntityEndpoint { +public static final Pattern PROCESSOR_STATE_URI_PATTERN = Pattern.compile("/nifi-api/processors/[a-f0-9\\-]{36}/state/clear-requests"); +public static final Pattern CONTROLLER_SERVICE_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller-services/[a-f0-9\\-]{36}/state/clear-requests"); +public static final Pattern REPORTING_TASK_STATE_URI_PATTERN = Pattern.compile("/nifi-api/reporting-tasks/[a-f0-9\\-]{36}/state/clear-requests"); --- End diff -- Hard-coding these URI patterns feels wrong. Any change to these URIs in another PR could silently break this matching without anyone noticing.
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15391478#comment-15391478 ] ASF GitHub Bot commented on NIFI-2078: -- Github user ijokarumawak commented on the issue: https://github.com/apache/nifi/pull/563 Hello @JPercivall @olegz @markap14, I've updated the Cluster and External state behavior in a NiFi cluster so that only the Primary node accesses ZooKeeper or the external data store. I tested it with the latest master branch. Would you be able to resume the review process for this? This PR now contains three related JIRA issues:
- [NIFI-2078](https://issues.apache.org/jira/browse/NIFI-2078): State managed externally
- [NIFI-2363](https://issues.apache.org/jira/browse/NIFI-2363): CLUSTER scope should be managed on primary node
- [NIFI-2364](https://issues.apache.org/jira/browse/NIFI-2364): Avoid being disconnected by error related to external system

Summaries of the code changes for each topic are written in the commit messages. For External state, it's also possible to make the external state local (per node). Please see the following screenshots, fig-1 and fig-2. fig-1: External - CLUSTER ![External - CLUSTER](https://cloud.githubusercontent.com/assets/1107620/17093451/5be50578-5284-11e6-8c74-f370a75c43da.png) fig-2: External - LOCAL. GetKafka in this image is just an example; the actual code uses External - CLUSTER scope. There's no implementation using External - LOCAL as of now. ![External - LOCAL](https://cloud.githubusercontent.com/assets/1107620/17093484/7f60e3dc-5284-11e6-87cc-b0acc0c4a861.png) Thanks!
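The behavior described in this comment (only the primary node touches ZooKeeper or the external store for cluster-wide state, while External - LOCAL state is handled on each node) can be summarized as a small decision function. This is a hypothetical sketch; the enum and method names are not from the PR.

```java
// Hypothetical scope model; names are illustrative, not NiFi's Scope enum.
enum StateScopeSketch { LOCAL, CLUSTER, EXTERNAL_LOCAL, EXTERNAL_CLUSTER }

final class StateAccessPolicy {
    private StateAccessPolicy() {}

    static boolean shouldAccess(StateScopeSketch scope, boolean clustered, boolean primary) {
        switch (scope) {
            case LOCAL:
            case EXTERNAL_LOCAL:
                // Per-node state: every node reads and clears its own copy.
                return true;
            case CLUSTER:
            case EXTERNAL_CLUSTER:
                // Shared state: a standalone instance owns it; in a cluster
                // only the primary node talks to ZooKeeper or the external store.
                return !clustered || primary;
            default:
                throw new IllegalArgumentException("unknown scope: " + scope);
        }
    }
}
```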
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15373900#comment-15373900 ] ASF GitHub Bot commented on NIFI-2078: -- Github user mcgilman commented on the issue: https://github.com/apache/nifi/pull/563 @ijokarumawak Awesome, thank you. There was just some confusion about whether we could resolve the PR for NIFI-2078, and I just wanted to verify with you first.
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15370002#comment-15370002 ] ASF GitHub Bot commented on NIFI-2078: -- Github user ijokarumawak commented on the issue: https://github.com/apache/nifi/pull/563 Thanks @markap14. I understand; I'll test whether this works well in the primary node failure and auto-election scenario. It would be best if we could control which nodes the HTTP requests are replicated to; however, getState and clearState are designed as per-component requests, meaning local, cluster, and external state are all accessed at once. So I will simply replicate HTTP requests to all nodes, as it does now, and check the external state scope at the DAO or service layer.
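If requests go to every node and each node's DAO or service layer skips the external call when it is not responsible, the merged response has to pick the right answers. A hypothetical sketch of that merge step; `NodeStateResponse` and the merge methods are illustrative names, not NiFi classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical per-node answer to a replicated getState request.
final class NodeStateResponse {
    final String nodeId;
    final boolean primary;
    // null when the node skipped the external call.
    final Map<String, String> externalState;

    NodeStateResponse(String nodeId, boolean primary, Map<String, String> externalState) {
        this.nodeId = nodeId;
        this.primary = primary;
        this.externalState = externalState;
    }
}

final class ExternalStateMergeSketch {
    private ExternalStateMergeSketch() {}

    // Cluster-scoped: only the primary queried the external store,
    // so its answer is the cluster-wide view.
    static Map<String, String> mergeClusterScoped(List<NodeStateResponse> responses) {
        for (NodeStateResponse r : responses) {
            if (r.primary && r.externalState != null) {
                return r.externalState;
            }
        }
        return Collections.emptyMap();
    }

    // Local-scoped: each node answers for itself, so keep every non-null answer.
    static List<NodeStateResponse> mergeLocalScoped(List<NodeStateResponse> responses) {
        List<NodeStateResponse> out = new ArrayList<>();
        for (NodeStateResponse r : responses) {
            if (r.externalState != null) {
                out.add(r);
            }
        }
        return out;
    }
}
```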
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15366309#comment-15366309 ] ASF GitHub Bot commented on NIFI-2078: -- Github user olegz commented on the issue: https://github.com/apache/nifi/pull/563 @ijokarumawak I'm not done yet, but it's good that you have rebased. I'm just tied up with something else at the moment, but will review later on.
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15366305#comment-15366305 ] ASF GitHub Bot commented on NIFI-2078: -- Github user ijokarumawak commented on the issue: https://github.com/apache/nifi/pull/563 @JPercivall @olegz Thanks for reviewing this! I've rebased the branch and addressed the feedback. However, while testing this again, I noticed that when it runs in a clustered NiFi environment, every node gets and clears the offsets from Kafka. Although this works, I think these methods should be executed only on the primary node, or on a standalone instance, since offset information is global state shared among consumers. With the `@OnPrimaryNodeStateChange` annotation, a processor can learn that it is in a cluster and whether it is the primary node. However, the annotated method is only called when a node is elected primary, or when a node that was primary has that role revoked. Since the other nodes are not notified, a processor cannot tell from these notifications alone whether it is running standalone or as the primary node, so I've left it as it is for now. Do you know if there's a way for a processor to tell whether NiFi is clustered?
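A minimal Java sketch of the limitation described above, assuming only what the comment states: the notification fires when a node is elected primary and when that role is revoked. The `PrimaryNodeState` enum here is a local stand-in modelled on NiFi's notification values, and `PrimaryNodeRoleTracker` and `ObservedRole` are hypothetical names; in a real processor the notification method would carry the `@OnPrimaryNodeStateChange` annotation.

```java
import java.util.concurrent.atomic.AtomicReference;

// Local stand-in modelled on NiFi's primary-node notification values (assumption).
enum PrimaryNodeState { ELECTED_PRIMARY_NODE, PRIMARY_NODE_REVOKED }

// What a processor can infer about its own role from notifications alone (hypothetical).
enum ObservedRole { PRIMARY, NOT_PRIMARY, UNKNOWN }

class PrimaryNodeRoleTracker {
    // Stays null until the first notification arrives.
    private final AtomicReference<PrimaryNodeState> lastNotification = new AtomicReference<>();

    // In a real processor this method would be annotated with @OnPrimaryNodeStateChange;
    // here it is a plain method so the sketch runs without NiFi on the classpath.
    void onPrimaryNodeStateChange(PrimaryNodeState newState) {
        lastNotification.set(newState);
    }

    ObservedRole observedRole() {
        PrimaryNodeState last = lastNotification.get();
        if (last == null) {
            // Never notified: nothing here says whether this is a standalone
            // instance or a cluster node that has not been told it is primary.
            return ObservedRole.UNKNOWN;
        }
        return last == PrimaryNodeState.ELECTED_PRIMARY_NODE
                ? ObservedRole.PRIMARY
                : ObservedRole.NOT_PRIMARY;
    }
}
```

A tracker that has received no notification reports `UNKNOWN`, which is exactly the case the comment cannot resolve; deciding whether to touch the shared Kafka offsets would need some other source of cluster information.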
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15366295#comment-15366295 ] ASF GitHub Bot commented on NIFI-2078: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/563#discussion_r69931690
--- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java ---
@@ -166,4 +170,44 @@ public MessageAndMetadata answer(InvocationOnMock invocation) throws Throwable {
         }
     }
+    @Test
+    public void testGetState() throws Exception {
+        final GetKafka processor = new GetKafka();
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+
+        assertNull("State should be null when required properties are not specified.", processor.getState());
+
+        runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "0.0.0.0:invalid-port");
+        runner.setProperty(GetKafka.TOPIC, "testX");
+
+        assertNull("State should be null when required properties are not specified.", processor.getState());
+
+        runner.setProperty(GetKafka.GROUP_ID, "consumer-group-id");
+
+        try {
+            processor.getState();
+            fail("The processor should try to access Zookeeper and should fail since it can not connect.");
+        } catch (IOException e) {
+        }
+    }
+
+    @Test
+    public void testClearState() throws Exception {
+        final GetKafka processor = new GetKafka();
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+
+        // Clear doesn't do anything until required properties are set.
+        processor.clear();
+
+        runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "0.0.0.0:invalid-port");
+        runner.setProperty(GetKafka.TOPIC, "testX");
+        runner.setProperty(GetKafka.GROUP_ID, "consumer-group-id");
+
+        try {
+            processor.getState();
--- End diff --
Totally a mistake, thanks! Addressed.
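The quoted diff stops at `processor.getState()` inside the `try` block of `testClearState`, and the reply calls that line a mistake; presumably the block was meant to call `processor.clear()`. The standalone Java sketch below shows that intended check without NiFi or JUnit. `OffsetStore`, `ExternalStateClearer` and `ClearStateCheck` are hypothetical names, and the always-failing store simulates the ZooKeeper connection failure the real test relies on when it points at `0.0.0.0:invalid-port`.

```java
import java.io.IOException;

// Hypothetical abstraction over wherever the offsets live (ZooKeeper for GetKafka).
interface OffsetStore {
    void clearPartitionOffsets(String connect, String topic, String groupId) throws IOException;
}

// Hypothetical, stripped-down holder of the three properties the diff checks.
class ExternalStateClearer {
    private final OffsetStore store;
    String connect;
    String topic;
    String groupId;

    ExternalStateClearer(OffsetStore store) {
        this.store = store;
    }

    private static boolean isEmpty(String s) {
        return s == null || s.isEmpty();
    }

    void clear() throws IOException {
        if (isEmpty(connect) || isEmpty(topic) || isEmpty(groupId)) {
            return; // nothing to clear until all required properties are set
        }
        store.clearPartitionOffsets(connect, topic, groupId);
    }
}

class ClearStateCheck {
    // Mirrors the apparent intent of testClearState: the call inside the
    // second try block is clear(), so the expected IOException comes from clearing.
    static boolean clearFailsWhenStoreUnreachable() {
        OffsetStore unreachable = (c, t, g) -> {
            throw new IOException("cannot connect to " + c);
        };
        ExternalStateClearer clearer = new ExternalStateClearer(unreachable);
        try {
            clearer.clear(); // no-op: required properties are not set yet
        } catch (IOException e) {
            return false; // the store should not have been touched
        }
        clearer.connect = "0.0.0.0:invalid-port";
        clearer.topic = "testX";
        clearer.groupId = "consumer-group-id";
        try {
            clearer.clear();
            return false; // expected an IOException from the unreachable store
        } catch (IOException expected) {
            return true;
        }
    }
}
```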
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15366292#comment-15366292 ] ASF GitHub Bot commented on NIFI-2078: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/563#discussion_r69931510
--- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java ---
@@ -481,4 +496,51 @@ private void releaseFlowFile(FlowFile flowFile, ProcessSession session, Map partitionOffsets = KafkaUtils.retrievePartitionOffsets(zookeeperConnectionString, topic, groupId);
+
+        return new StandardStateMap(partitionOffsets, System.currentTimeMillis());
+    }
+
+    private boolean isReadyToAccessState() {
+        if(StringUtils.isEmpty(zookeeperConnectionString)
+                || StringUtils.isEmpty(topic)
+                || StringUtils.isEmpty(groupId)) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public void clear() throws IOException {
+        if (!isReadyToAccessState()) {
+            return;
+        }
+        KafkaUtils.clearPartitionOffsets(zookeeperConnectionString, topic, groupId);
--- End diff --
Thanks, good point. I've added a synchronized block in clearExternalState().
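The reply mentions a synchronized block in `clearExternalState()`, but the thread does not show the final code. A minimal Java sketch of the idea, assuming the goal is that concurrent clear requests never run the remote call at the same time: every caller takes the same lock before touching the external store. `SynchronizedExternalState`, `clearRemoteOffsets` and the counters are hypothetical; the counters exist only to make the mutual exclusion observable.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class SynchronizedExternalState {
    private final Object stateLock = new Object();
    // Instrumentation only: threads currently inside the remote call, the peak, and the total.
    private final AtomicInteger inside = new AtomicInteger();
    private final AtomicInteger maxInside = new AtomicInteger();
    private final AtomicInteger clears = new AtomicInteger();

    // Hypothetical stand-in for the remote call (e.g. deleting offsets in ZooKeeper).
    private void clearRemoteOffsets() throws IOException {
        int now = inside.incrementAndGet();
        maxInside.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(1); // widen the window so any overlap would show up in maxInside
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("interrupted while clearing", e);
        } finally {
            inside.decrementAndGet();
        }
        clears.incrementAndGet();
    }

    // Every caller enters the same monitor, so remote clears are serialized.
    void clearExternalState() throws IOException {
        synchronized (stateLock) {
            clearRemoteOffsets();
        }
    }

    int maxConcurrentClears() { return maxInside.get(); }

    int totalClears() { return clears.get(); }

    // Runs clearExternalState() from n threads at once and waits for all of them.
    static SynchronizedExternalState runClearsConcurrently(int n) {
        SynchronizedExternalState state = new SynchronizedExternalState();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            Thread t = new Thread(() -> {
                try {
                    state.clearExternalState();
                } catch (IOException e) {
                    throw new IllegalStateException(e);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return state;
    }
}
```

Calling `runClearsConcurrently(8)` should report at most one clear in flight at any moment and eight clears in total. The sketch locks on a private object rather than on `this`, so unrelated code cannot contend for the same monitor.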
[jira] [Commented] (NIFI-2078) State management for processors whose states are managed externally
[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15366294#comment-15366294 ] ASF GitHub Bot commented on NIFI-2078: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/563#discussion_r69931587
--- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java ---
@@ -481,4 +496,51 @@ private void releaseFlowFile(FlowFile flowFile, ProcessSession session, Map partitionOffsets = KafkaUtils.retrievePartitionOffsets(zookeeperConnectionString, topic, groupId);
+
+        return new StandardStateMap(partitionOffsets, System.currentTimeMillis());
+    }
+
+    private boolean isReadyToAccessState() {
+        if(StringUtils.isEmpty(zookeeperConnectionString)
+                || StringUtils.isEmpty(topic)
+                || StringUtils.isEmpty(groupId)) {
+            return false;
+        }
+        return true;
--- End diff --
Yes, it would. Addressed, thanks!
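The review comment this reply answers is not quoted, so the exact change is not visible here. One common simplification of the if / `return false` / `return true` shape in `isReadyToAccessState()` is to return the condition directly. The sketch below takes the three values as parameters (rather than reading fields) so it runs on its own, and its local `isEmpty` is an assumed stand-in with null-or-empty semantics for `StringUtils.isEmpty`; `StateAccessGuard` is a hypothetical name.

```java
class StateAccessGuard {
    // Stand-in for StringUtils.isEmpty: true for null or "".
    static boolean isEmpty(String s) {
        return s == null || s.isEmpty();
    }

    // Single-expression form: ready only when all three required values are set.
    static boolean isReadyToAccessState(String zookeeperConnectionString, String topic, String groupId) {
        return !isEmpty(zookeeperConnectionString) && !isEmpty(topic) && !isEmpty(groupId);
    }
}
```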