Repository: nifi
Updated Branches:
  refs/heads/master 289dde098 -> 8e1c79eaa
NIFI-1963 Allows a node reconnecting to the cluster to inherit non-fingerprinted processor settings
Forces a node reconnecting to a cluster to serialize the updated flow to disk
Added most processor settings to the flow fingerprint (excluding name, style, comment, position, and schedule state)
Updated some test data for FingerprintFactoryTest to test for new fields added to the flow fingerprint
Updated StandardProcessorNode to allow processor comments and name to be settable while a processor is running
Updated StandardFlowSynchronizer to inherit non-fingerprinted processor settings (name, style, comment, and position) when flow is already synchronized

This closes #1812

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8e1c79ea
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8e1c79ea
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8e1c79ea

Branch: refs/heads/master
Commit: 8e1c79eaafe886e85e4ceaf8436b961eccfef568
Parents: 289dde0
Author: Jeff Storck <jtsw...@gmail.com>
Authored: Mon May 15 17:21:11 2017 -0400
Committer: Matt Gilman <matt.c.gil...@gmail.com>
Committed: Wed May 17 17:35:33 2017 -0400

----------------------------------------------------------------------
 .../nifi/controller/StandardFlowService.java | 4 ++-
 .../controller/StandardFlowSynchronizer.java | 14 +++++++---
 .../nifi/controller/StandardProcessorNode.java | 6 -----
 .../nifi/fingerprint/FingerprintFactory.java | 19 ++++++++++++++
 .../test/resources/nifi/fingerprint/flow1a.xml | 17 +++++++++---
 .../test/resources/nifi/fingerprint/flow1b.xml | 27 +++++++++++++++-----
 .../test/resources/nifi/fingerprint/flow2.xml | 17 +++++++++---
 7 files changed, 80 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/8e1c79ea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 0ce6742..b2c1628 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -643,7 +643,9 @@ public class StandardFlowService implements FlowService, ProtocolHandler { clusterCoordinator.resetNodeStatuses(connectionResponse.getNodeConnectionStatuses().stream() .collect(Collectors.toMap(status -> status.getNodeIdentifier(), status -> status))); - controller.resumeHeartbeats(); // we are now connected, so resume sending heartbeats. 
+ // reconnected, this node needs to explicitly write the inherited flow to disk, and resume heartbeats + saveFlowChanges(); + controller.resumeHeartbeats(); logger.info("Node reconnected."); } catch (final Exception ex) { http://git-wip-us.apache.org/repos/asf/nifi/blob/8e1c79ea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 09338c9..975f954 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -725,6 +725,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { final ProcessorDTO dto = FlowFromDOMFactory.getProcessor(processorElement, encryptor); final ProcessorNode procNode = processGroup.getProcessor(dto.getId()); + updateNonFingerprintedProcessorSettings(procNode, dto); + if (!procNode.getScheduledState().name().equals(dto.getState())) { try { switch (ScheduledState.valueOf(dto.getState())) { @@ -964,15 +966,12 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { private void updateProcessor(final ProcessorNode procNode, final ProcessorDTO processorDTO, final ProcessGroup processGroup, final FlowController controller) throws ProcessorInstantiationException { final ProcessorConfigDTO config = processorDTO.getConfig(); - procNode.setPosition(toPosition(processorDTO.getPosition())); - 
procNode.setName(processorDTO.getName()); - procNode.setStyle(processorDTO.getStyle()); procNode.setProcessGroup(processGroup); - procNode.setComments(config.getComments()); procNode.setLossTolerant(config.isLossTolerant()); procNode.setPenalizationPeriod(config.getPenaltyDuration()); procNode.setYieldPeriod(config.getYieldDuration()); procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel())); + updateNonFingerprintedProcessorSettings(procNode, processorDTO); if (config.getSchedulingStrategy() != null) { procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy())); @@ -1011,6 +1010,13 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } } + private void updateNonFingerprintedProcessorSettings(final ProcessorNode procNode, final ProcessorDTO processorDTO) { + procNode.setName(processorDTO.getName()); + procNode.setPosition(toPosition(processorDTO.getPosition())); + procNode.setStyle(processorDTO.getStyle()); + procNode.setComments(processorDTO.getConfig().getComments()); + } + private ProcessGroup addProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, final StringEncryptor encryptor, final FlowEncodingVersion encodingVersion) throws ProcessorInstantiationException { // get the parent group ID http://git-wip-us.apache.org/repos/asf/nifi/blob/8e1c79ea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 1a1acc0..6d96a5c 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -256,9 +256,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable */ @Override public synchronized void setComments(final String comments) { - if (isRunning()) { - throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); - } this.comments.set(CharacterFilterUtils.filterInvalidXmlCharacters(comments)); } @@ -405,9 +402,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public synchronized void setName(final String name) { - if (isRunning()) { - throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); - } super.setName(name); } http://git-wip-us.apache.org/repos/asf/nifi/blob/8e1c79ea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java index d9e048e..1ef3e8b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java @@ -341,6 +341,25 @@ public class FingerprintFactory { final BundleDTO bundle = 
FlowFromDOMFactory.getBundle(DomUtils.getChild(processorElem, "bundle")); addBundleFingerprint(builder, bundle); + // max concurrent tasks + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "maxConcurrentTasks")); + // scheduling period + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "schedulingPeriod")); + // penalization period + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "penalizationPeriod")); + // yield period + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "yieldPeriod")); + // bulletin level + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "bulletinLevel")); + // loss tolerant + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "lossTolerant")); + // scheduling strategy + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "schedulingStrategy")); + // execution node + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "executionNode")); + // run duration nanos + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "runDurationNanos")); + // get the temp instance of the Processor so that we know the default property values final BundleCoordinate coordinate = getCoordinate(className, bundle); final ConfigurableComponent configurableComponent = ExtensionManager.getTempComponent(className, coordinate); http://git-wip-us.apache.org/repos/asf/nifi/blob/8e1c79ea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1a.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1a.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1a.xml index beccdcc..2896767 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1a.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1a.xml @@ -25,13 +25,24 @@ <id>d89ada5d-35fb-44ff-83f1-4cc00b48b2df</id> <name>GenerateFlowFile</name> <position x="0.0" y="0.0"/> - <style>processor</style> + <styles/> <comment/> <class>org.apache.nifi.processors.standard.GenerateFlowFile</class> + <bundle> + <group>org.apache.nifi</group> + <artifact>nifi-standard-nar</artifact> + <version>1.3.0-SNAPSHOT</version> + </bundle> <maxConcurrentTasks>1</maxConcurrentTasks> - <schedulingPeriod>0 s</schedulingPeriod> + <schedulingPeriod>0 sec</schedulingPeriod> + <penalizationPeriod>30 sec</penalizationPeriod> + <yieldPeriod>1 sec</yieldPeriod> + <bulletinLevel>WARN</bulletinLevel> <lossTolerant>false</lossTolerant> - <running>false</running> + <scheduledState>RUNNING</scheduledState> + <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> + <executionNode>ALL</executionNode> + <runDurationNanos>0</runDurationNanos> <property> <name>file.size</name> <value>5</value> http://git-wip-us.apache.org/repos/asf/nifi/blob/8e1c79ea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1b.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1b.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1b.xml index 19ed079..e8d95db 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1b.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1b.xml @@ -23,15 +23,28 @@ <comment/> 
<processor> <id>d89ada5d-35fb-44ff-83f1-4cc00b48b2df</id> - <name>GenerateFlowFile</name> - <position x="0.0" y="0.0"/> - <style>processor</style> - <comment/> + <name>GenerateFlowFile1</name> + <position x="0.0" y="1.0"/> + <styles> + <style name="background-color">#00ff00</style> + </styles> + <comment>this is a comment</comment> <class>org.apache.nifi.processors.standard.GenerateFlowFile</class> + <bundle> + <group>org.apache.nifi</group> + <artifact>nifi-standard-nar</artifact> + <version>1.3.0-SNAPSHOT</version> + </bundle> <maxConcurrentTasks>1</maxConcurrentTasks> - <schedulingPeriod>0 s</schedulingPeriod> + <schedulingPeriod>0 sec</schedulingPeriod> + <penalizationPeriod>30 sec</penalizationPeriod> + <yieldPeriod>1 sec</yieldPeriod> + <bulletinLevel>WARN</bulletinLevel> <lossTolerant>false</lossTolerant> - <running>false</running> + <scheduledState>DISABLED</scheduledState> + <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> + <executionNode>ALL</executionNode> + <runDurationNanos>0</runDurationNanos> <property> <name>file.size</name> <value>5</value> @@ -45,7 +58,7 @@ <style>processor</style> <comment/> <class>org.apache.nifi.processors.standard.LogAttribute</class> - <maxConcurrentTasks>10</maxConcurrentTasks> + <maxConcurrentTasks>1</maxConcurrentTasks> <schedulingPeriod>0 s</schedulingPeriod> <lossTolerant>false</lossTolerant> <running>false</running> http://git-wip-us.apache.org/repos/asf/nifi/blob/8e1c79ea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow2.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow2.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow2.xml index 8c0e641..bab1778 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow2.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow2.xml @@ -25,13 +25,24 @@ <id>d89ada5d-35fb-44ff-83f1-4cc00b48b2dd</id> <name>GenerateFlowFile</name> <position x="0.0" y="0.0"/> - <style>processor</style> + <styles/> <comment/> <class>org.apache.nifi.processors.standard.GenerateFlowFile</class> + <bundle> + <group>org.apache.nifi</group> + <artifact>nifi-standard-nar</artifact> + <version>1.4.0-SNAPSHOT</version> + </bundle> <maxConcurrentTasks>1</maxConcurrentTasks> <schedulingPeriod>0 s</schedulingPeriod> - <lossTolerant>false</lossTolerant> - <running>false</running> + <penalizationPeriod>30 s</penalizationPeriod> + <yieldPeriod>1 s</yieldPeriod> + <bulletinLevel>ERROR</bulletinLevel> + <lossTolerant>true</lossTolerant> + <scheduledState>RUNNING</scheduledState> + <schedulingStrategy>CRON_DRIVEN</schedulingStrategy> + <executionNode>PRIMARY</executionNode> + <runDurationNanos>1</runDurationNanos> <property> <name>file.size</name> <value>5</value>