NIFI-5012: When connecting to cluster, esure that controller services appropriately enabled/disabled
This closes #2579. Signed-off-by: Bryan Bende <bbe...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ae6ad055 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ae6ad055 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ae6ad055 Branch: refs/heads/HDF-3.1-maint Commit: ae6ad0556f78efa649485ce3da52d642976614fb Parents: 79e4402 Author: Mark Payne <marka...@hotmail.com> Authored: Fri Mar 23 15:44:06 2018 -0400 Committer: Matt Gilman <matt.c.gil...@gmail.com> Committed: Tue Apr 3 15:51:11 2018 -0400 ---------------------------------------------------------------------- .../controller/StandardFlowSynchronizer.java | 44 ++++++++++++++++++++ 1 file changed, 44 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/ae6ad055/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 425110c..8a79acf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -73,6 +73,7 @@ import org.apache.nifi.controller.serialization.FlowSynchronizer; import org.apache.nifi.controller.serialization.StandardFlowSerializer; import org.apache.nifi.controller.service.ControllerServiceLoader; import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.fingerprint.FingerprintException; @@ -749,6 +750,19 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } } + private ControllerServiceState getFinalTransitionState(final ControllerServiceState state) { + switch (state) { + case DISABLED: + case DISABLING: + return ControllerServiceState.DISABLED; + case ENABLED: + case ENABLING: + return ControllerServiceState.ENABLED; + default: + throw new AssertionError(); + } + } + private ProcessGroup updateProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, final StringEncryptor encryptor, final FlowEncodingVersion encodingVersion) throws ProcessorInstantiationException { @@ -779,6 +793,36 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { // get the real process group and ID final ProcessGroup processGroup = controller.getGroup(processGroupDto.getId()); + // determine the scheduled state of all of the Controller Service + final List<Element> controllerServiceNodeList = getChildrenByTagName(processGroupElement, "controllerService"); + final Set<ControllerServiceNode> toDisable = new HashSet<>(); + final Set<ControllerServiceNode> toEnable = new HashSet<>(); + + for (final Element serviceElement : controllerServiceNodeList) { + final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(serviceElement, encryptor); + final ControllerServiceNode serviceNode = processGroup.getControllerService(dto.getId()); + + // Check if the controller service is in the correct state. We consider it the correct state if + // we are in a transitional state and heading in the right direction or already in the correct state. + // E.g., it is the correct state if it should be 'DISABLED' and it is either DISABLED or DISABLING. + final ControllerServiceState serviceState = getFinalTransitionState(serviceNode.getState()); + final ControllerServiceState clusterState = getFinalTransitionState(ControllerServiceState.valueOf(dto.getState())); + + if (serviceState != clusterState) { + switch (clusterState) { + case DISABLED: + toDisable.add(serviceNode); + break; + case ENABLED: + toEnable.add(serviceNode); + break; + } + } + } + + controller.disableControllerServicesAsync(toDisable); + controller.enableControllerServices(toEnable); + // processors & ports cannot be updated - they must be the same. Except for the scheduled state. final List<Element> processorNodeList = getChildrenByTagName(processGroupElement, "processor"); for (final Element processorElement : processorNodeList) {