NIFI-5012: When connecting to cluster, ensure that controller services 
are appropriately enabled/disabled

This closes #2579.

Signed-off-by: Bryan Bende <bbe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ae6ad055
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ae6ad055
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ae6ad055

Branch: refs/heads/HDF-3.1-maint
Commit: ae6ad0556f78efa649485ce3da52d642976614fb
Parents: 79e4402
Author: Mark Payne <marka...@hotmail.com>
Authored: Fri Mar 23 15:44:06 2018 -0400
Committer: Matt Gilman <matt.c.gil...@gmail.com>
Committed: Tue Apr 3 15:51:11 2018 -0400

----------------------------------------------------------------------
 .../controller/StandardFlowSynchronizer.java    | 44 ++++++++++++++++++++
 1 file changed, 44 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ae6ad055/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 425110c..8a79acf 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -73,6 +73,7 @@ import 
org.apache.nifi.controller.serialization.FlowSynchronizer;
 import org.apache.nifi.controller.serialization.StandardFlowSerializer;
 import org.apache.nifi.controller.service.ControllerServiceLoader;
 import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.events.BulletinFactory;
 import org.apache.nifi.fingerprint.FingerprintException;
@@ -749,6 +750,19 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
         }
     }
 
+    private ControllerServiceState getFinalTransitionState(final 
ControllerServiceState state) {
+        switch (state) {
+            case DISABLED:
+            case DISABLING:
+                return ControllerServiceState.DISABLED;
+            case ENABLED:
+            case ENABLING:
+                return ControllerServiceState.ENABLED;
+            default:
+                throw new AssertionError();
+        }
+    }
+
     private ProcessGroup updateProcessGroup(final FlowController controller, 
final ProcessGroup parentGroup, final Element processGroupElement,
             final StringEncryptor encryptor, final FlowEncodingVersion 
encodingVersion) throws ProcessorInstantiationException {
 
@@ -779,6 +793,36 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
         // get the real process group and ID
         final ProcessGroup processGroup = 
controller.getGroup(processGroupDto.getId());
 
+        // determine the scheduled state of all of the Controller Services
+        final List<Element> controllerServiceNodeList = 
getChildrenByTagName(processGroupElement, "controllerService");
+        final Set<ControllerServiceNode> toDisable = new HashSet<>();
+        final Set<ControllerServiceNode> toEnable = new HashSet<>();
+
+        for (final Element serviceElement : controllerServiceNodeList) {
+            final ControllerServiceDTO dto = 
FlowFromDOMFactory.getControllerService(serviceElement, encryptor);
+            final ControllerServiceNode serviceNode = 
processGroup.getControllerService(dto.getId());
+
+            // Check if the controller service is in the correct state. We 
consider it the correct state if
+            // we are in a transitional state and heading in the right 
direction or already in the correct state.
+            // E.g., it is the correct state if it should be 'DISABLED' and it 
is either DISABLED or DISABLING.
+            final ControllerServiceState serviceState = 
getFinalTransitionState(serviceNode.getState());
+            final ControllerServiceState clusterState = 
getFinalTransitionState(ControllerServiceState.valueOf(dto.getState()));
+
+            if (serviceState != clusterState) {
+                switch (clusterState) {
+                    case DISABLED:
+                        toDisable.add(serviceNode);
+                        break;
+                    case ENABLED:
+                        toEnable.add(serviceNode);
+                        break;
+                }
+            }
+        }
+
+        controller.disableControllerServicesAsync(toDisable);
+        controller.enableControllerServices(toEnable);
+
         // processors & ports cannot be updated - they must be the same. 
Except for the scheduled state.
         final List<Element> processorNodeList = 
getChildrenByTagName(processGroupElement, "processor");
         for (final Element processorElement : processorNodeList) {

Reply via email to