Repository: helix
Updated Branches:
  refs/heads/master 5383e7318 -> b6fd8cb83


[HELIX-675] Refactor controller start/cleanup logic to ensure monitor 
register/reset is handled correctly in any event order.

Because events can be processed in different orders, the controller init event 
might be processed later or earlier than expected.
This causes inconsistency when the event handler thread processes and records 
information in the cluster monitor.
This change ensures the cluster monitor is off when leadership changes to 
another node, so no extra metric data will be generated.

Also upgrade related test cases to verify MBean counts.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b6fd8cb8
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b6fd8cb8
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b6fd8cb8

Branch: refs/heads/master
Commit: b6fd8cb8396e1453afba404b669534883ce796a7
Parents: 5383e73
Author: jiajunwang <ericwang1...@gmail.com>
Authored: Thu Mar 8 16:51:37 2018 -0800
Committer: jiajunwang <ericwang1...@gmail.com>
Committed: Tue Mar 13 15:55:44 2018 -0700

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      | 262 +++++++++----------
 .../stages/CurrentStateComputationStage.java    |  11 +-
 .../stages/ExternalViewComputeStage.java        |  55 ++--
 .../controller/stages/TaskAssignmentStage.java  |   4 +-
 .../manager/zk/DistributedLeaderElection.java   |   5 +-
 .../monitoring/mbeans/ClusterStatusMonitor.java |  37 +--
 .../org/apache/helix/task/TaskRebalancer.java   |  14 +-
 .../TestClusterEventStatusMonitor.java          |   1 +
 .../TestClusterStatusMonitorLifecycle.java      | 144 +++++-----
 .../mbeans/TestClusterStatusMonitor.java        |   1 +
 .../mbeans/TestRebalancerMetrics.java           |   8 +-
 .../mbeans/TestTopStateHandoffMetrics.java      |   4 +-
 12 files changed, 284 insertions(+), 262 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 2546bd2..933aa3e 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -19,18 +19,6 @@ package org.apache.helix.controller;
  * under the License.
  */
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -38,51 +26,25 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.NotificationContext.Type;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.api.listeners.ClusterConfigChangeListener;
-import org.apache.helix.api.listeners.ControllerChangeListener;
-import org.apache.helix.api.listeners.CurrentStateChangeListener;
-import org.apache.helix.api.listeners.IdealStateChangeListener;
-import org.apache.helix.api.listeners.InstanceConfigChangeListener;
-import org.apache.helix.api.listeners.LiveInstanceChangeListener;
-import org.apache.helix.api.listeners.MessageListener;
-import org.apache.helix.api.listeners.PreFetch;
-import org.apache.helix.api.listeners.ResourceConfigChangeListener;
+import org.apache.helix.api.listeners.*;
+import org.apache.helix.common.ClusterEventBlockingQueue;
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.PipelineRegistry;
-import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.common.ClusterEventBlockingQueue;
-import org.apache.helix.controller.stages.ClusterEventType;
-import org.apache.helix.controller.stages.CompatibilityCheckStage;
-import org.apache.helix.controller.stages.CurrentStateComputationStage;
-import org.apache.helix.controller.stages.ExternalViewComputeStage;
-import org.apache.helix.controller.stages.IntermediateStateCalcStage;
-import org.apache.helix.controller.stages.MessageGenerationPhase;
-import org.apache.helix.controller.stages.MessageSelectionStage;
-import org.apache.helix.controller.stages.MessageThrottleStage;
-import org.apache.helix.controller.stages.PersistAssignmentStage;
-import org.apache.helix.controller.stages.ReadClusterDataStage;
-import org.apache.helix.controller.stages.ResourceComputationStage;
-import org.apache.helix.controller.stages.ResourceValidationStage;
-import org.apache.helix.controller.stages.TargetExteralViewCalcStage;
-import org.apache.helix.controller.stages.TaskAssignmentStage;
-import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.MaintenanceSignal;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.PauseSignal;
-import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.controller.stages.*;
+import org.apache.helix.model.*;
 import org.apache.helix.monitoring.mbeans.ClusterEventMonitor;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.helix.task.TaskDriver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
 import static org.apache.helix.HelixConstants.ChangeType;
 
 /**
@@ -113,7 +75,10 @@ public class GenericHelixController implements 
IdealStateChangeListener,
   final AtomicReference<Map<String, LiveInstance>> _lastSeenInstances;
   final AtomicReference<Map<String, LiveInstance>> _lastSeenSessions;
 
-  ClusterStatusMonitor _clusterStatusMonitor;
+  // By default not reporting status until controller status is changed to 
activate
+  // TODO This flag should be inside ClusterStatusMonitor. When false, no 
MBean registering.
+  private boolean _isMonitoring = false;
+  private final ClusterStatusMonitor _clusterStatusMonitor;
 
   /**
    * A queue for controller events and a thread that will consume it
@@ -192,10 +157,10 @@ public class GenericHelixController implements 
IdealStateChangeListener,
 
       NotificationContext changeContext = new NotificationContext(_manager);
       changeContext.setType(NotificationContext.Type.CALLBACK);
-      ClusterEvent event = new 
ClusterEvent(_clusterName,ClusterEventType.PeriodicalRebalance);
+      ClusterEvent event = new ClusterEvent(_clusterName, 
ClusterEventType.PeriodicalRebalance);
       event.addAttribute(AttributeName.helixmanager.name(), 
changeContext.getManager());
       event.addAttribute(AttributeName.changeContext.name(), changeContext);
-      List<ZNRecord> dummy = new ArrayList<ZNRecord>();
+      List<ZNRecord> dummy = new ArrayList<>();
       event.addAttribute(AttributeName.eventData.name(), dummy);
       // Should be able to process
       _eventQueue.put(event);
@@ -311,6 +276,8 @@ public class GenericHelixController implements 
IdealStateChangeListener,
 
     initPipelines(_eventThread, _cache, false);
     initPipelines(_taskEventThread, _taskCache, true);
+
+    _clusterStatusMonitor = new ClusterStatusMonitor(_clusterName);
   }
 
   /**
@@ -331,6 +298,9 @@ public class GenericHelixController implements 
IdealStateChangeListener,
       return;
     }
 
+    // TODO If init controller with paused = true, it may not take effect 
immediately
+    // _paused is default false. If any events come before 
controllerChangeEvent, the controller
+    // will be excuting in un-paused mode. Which might not be the config in ZK.
     if (_paused) {
       logger.info("Cluster " + manager.getClusterName() + " is paused. 
Ignoring the event:" + event
           .getEventType());
@@ -342,38 +312,28 @@ public class GenericHelixController implements 
IdealStateChangeListener,
       context = event.getAttribute(AttributeName.changeContext.name());
     }
 
-    // Initialize _clusterStatusMonitor
     if (context != null) {
       if (context.getType() == Type.FINALIZE) {
         stopRebalancingTimer();
         logger.info("Get FINALIZE notification, skip the pipeline. Event :" + 
event.getEventType());
         return;
       } else {
-        synchronized (this) {
-          if (_clusterStatusMonitor == null) {
-            _clusterStatusMonitor = new 
ClusterStatusMonitor(manager.getClusterName());
-          }
-        }
-        // TODO: should be in the initization of controller.
+        // TODO: should be in the initialization of controller.
         if (_cache != null) {
           checkRebalancingTimer(manager, Collections.EMPTY_LIST, 
_cache.getClusterConfig());
         }
-
-        if (cache.isTaskCache()) {
-          TaskDriver driver = new TaskDriver(manager);
-          _clusterStatusMonitor.refreshWorkflowsStatus(driver);
-          _clusterStatusMonitor.refreshJobsStatus(driver);
+        if (_isMonitoring) {
+          event.addAttribute(AttributeName.clusterStatusMonitor.name(), 
_clusterStatusMonitor);
         }
-        event.addAttribute(AttributeName.clusterStatusMonitor.name(), 
_clusterStatusMonitor);
       }
     }
 
     // add the cache
     event.addAttribute(AttributeName.ClusterDataCache.name(), cache);
 
-    List<Pipeline> pipelines = cache.isTaskCache()
-        ? _taskRegistry.getPipelinesForEvent(event.getEventType())
-        : _registry.getPipelinesForEvent(event.getEventType());
+    List<Pipeline> pipelines = cache.isTaskCache() ?
+        _taskRegistry.getPipelinesForEvent(event.getEventType()) :
+        _registry.getPipelinesForEvent(event.getEventType());
 
     if (pipelines == null || pipelines.size() == 0) {
       logger.info(
@@ -404,40 +364,51 @@ public class GenericHelixController implements 
IdealStateChangeListener,
 
     if (!cache.isTaskCache()) {
       // report event process durations
-      if (_clusterStatusMonitor != null) {
-        NotificationContext notificationContext =
-            event.getAttribute(AttributeName.changeContext.name());
-        long enqueueTime = event.getCreationTime();
-        long zkCallbackTime;
-        StringBuilder sb = new StringBuilder();
-        if (notificationContext != null) {
-          zkCallbackTime = notificationContext.getCreationTime();
+      NotificationContext notificationContext =
+          event.getAttribute(AttributeName.changeContext.name());
+      long enqueueTime = event.getCreationTime();
+      long zkCallbackTime;
+      StringBuilder sb = new StringBuilder();
+      if (notificationContext != null) {
+        zkCallbackTime = notificationContext.getCreationTime();
+        if (_isMonitoring) {
           _clusterStatusMonitor
               
.updateClusterEventDuration(ClusterEventMonitor.PhaseName.Callback.name(),
                   enqueueTime - zkCallbackTime);
-          sb.append(String.format(
-              "Callback time for event: " + event.getEventType() + " took: " + 
(enqueueTime
-                  - zkCallbackTime) + " ms\n"));
-
         }
+        sb.append(String.format(
+            "Callback time for event: " + event.getEventType() + " took: " + 
(enqueueTime
+                - zkCallbackTime) + " ms\n"));
+      }
+      if (_isMonitoring) {
         _clusterStatusMonitor
             
.updateClusterEventDuration(ClusterEventMonitor.PhaseName.InQueue.name(),
                 startTime - enqueueTime);
         _clusterStatusMonitor
             
.updateClusterEventDuration(ClusterEventMonitor.PhaseName.TotalProcessed.name(),
                 endTime - startTime);
-        sb.append(String.format(
-            "InQueue time for event: " + event.getEventType() + " took: " + 
(startTime
-                - enqueueTime) + " ms\n"));
-        sb.append(String.format(
-            "TotalProcessed time for event: " + event.getEventType() + " took: 
" + (endTime
-                - startTime) + " ms"));
-        logger.info(sb.toString());
       }
-    }
+      sb.append(String.format(
+          "InQueue time for event: " + event.getEventType() + " took: " + 
(startTime - enqueueTime)
+              + " ms\n"));
+      sb.append(String.format(
+          "TotalProcessed time for event: " + event.getEventType() + " took: " 
+ (endTime
+              - startTime) + " ms"));
+      logger.info(sb.toString());
+    } else if (_isMonitoring) {
+      // report workflow status
+      TaskDriver driver = new TaskDriver(manager);
+      _clusterStatusMonitor.refreshWorkflowsStatus(driver);
+      _clusterStatusMonitor.refreshJobsStatus(driver);
+    }
+
+    // If event handling happens before controller deactivate, the process may 
write unnecessary
+    // MBeans to monitoring after the monitor is disabled.
+    // So reset ClusterStatusMonitor according to it's status after all event 
handling.
+    // TODO remove this once clusterStatusMonitor blocks any MBean register on 
isMonitoring = false.
+    resetClusterStatusMonitor();
   }
 
-
   @Override
   @PreFetch(enabled = false)
   public void onStateChange(String instanceName, List<CurrentState> statesInfo,
@@ -457,7 +428,8 @@ public class GenericHelixController implements 
IdealStateChangeListener,
     notifyCaches(changeContext, ChangeType.MESSAGE);
     pushToEventQueues(ClusterEventType.MessageChange, changeContext,
         Collections.<String, 
Object>singletonMap(AttributeName.instanceName.name(), instanceName));
-    if (_clusterStatusMonitor != null && messages != null) {
+
+    if (_isMonitoring && messages != null) {
       _clusterStatusMonitor.addMessageQueueSize(instanceName, messages.size());
     }
 
@@ -608,42 +580,36 @@ public class GenericHelixController implements 
IdealStateChangeListener,
   @Override
   public void onControllerChange(NotificationContext changeContext) {
     logger.info("START: GenericClusterController.onControllerChange() for 
cluster " + _clusterName);
+
     _cache.requireFullRefresh();
     _taskCache.requireFullRefresh();
-    if (changeContext != null && changeContext.getType() == Type.FINALIZE) {
-      logger.info("GenericClusterController.onControllerChange() FINALIZE for 
cluster " + _clusterName);
-      return;
-    }
-    HelixDataAccessor accessor = 
changeContext.getManager().getHelixDataAccessor();
 
-    // double check if this controller is the leader
-    Builder keyBuilder = accessor.keyBuilder();
-    LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
-    if (leader == null) {
-      logger
-          .warn("No controller exists for cluster:" + 
changeContext.getManager().getClusterName());
-      return;
-    } else {
-      String leaderName = leader.getInstanceName();
+    boolean controllerIsLeader;
 
-      String instanceName = changeContext.getManager().getInstanceName();
-      if (leaderName == null || !leaderName.equals(instanceName)) {
-        logger.warn("leader name does NOT match, my name: " + instanceName + 
", leader: " + leader);
-        return;
-      }
+    if (changeContext != null && changeContext.getType() == Type.FINALIZE) {
+      logger.info(
+          "GenericClusterController.onControllerChange() FINALIZE for cluster 
" + _clusterName);
+      controllerIsLeader = false;
+    } else {
+      // double check if this controller is the leader
+      controllerIsLeader = changeContext.getManager().isLeader();
+    }
+
+    HelixManager manager = changeContext.getManager();
+    if (controllerIsLeader) {
+      HelixDataAccessor accessor = manager.getHelixDataAccessor();
+      Builder keyBuilder = accessor.keyBuilder();
+      PauseSignal pauseSignal = accessor.getProperty(keyBuilder.pause());
+      MaintenanceSignal maintenanceSignal = 
accessor.getProperty(keyBuilder.maintenance());
+      _paused = updateControllerState(changeContext, pauseSignal, _paused);
+      _inMaintenanceMode =
+          updateControllerState(changeContext, maintenanceSignal, 
_inMaintenanceMode);
+      enableClusterStatusMonitor(true);
+      _clusterStatusMonitor.setEnabled(!_paused);
+    } else {
+      enableClusterStatusMonitor(false);
     }
 
-    PauseSignal pauseSignal = accessor.getProperty(keyBuilder.pause());
-    MaintenanceSignal maintenanceSignal = 
accessor.getProperty(keyBuilder.maintenance());
-    _paused = updateControllerState(changeContext, pauseSignal, _paused);
-    _inMaintenanceMode = updateControllerState(changeContext, 
maintenanceSignal, _inMaintenanceMode);
-
-    synchronized (this) {
-      if (_clusterStatusMonitor == null) {
-        _clusterStatusMonitor = new 
ClusterStatusMonitor(changeContext.getManager().getClusterName());
-      }
-    }
-    _clusterStatusMonitor.setEnabled(!_paused);
     logger.info("END: GenericClusterController.onControllerChange() for 
cluster " + _clusterName);
   }
 
@@ -656,7 +622,7 @@ public class GenericHelixController implements 
IdealStateChangeListener,
       NotificationContext changeContext) {
 
     // construct maps for current live-instances
-    Map<String, LiveInstance> curInstances = new HashMap<String, 
LiveInstance>();
+    Map<String, LiveInstance> curInstances = new HashMap<>();
     Map<String, LiveInstance> curSessions = new HashMap<>();
     for (LiveInstance liveInstance : liveInstances) {
       curInstances.put(liveInstance.getInstanceName(), liveInstance);
@@ -723,21 +689,51 @@ public class GenericHelixController implements 
IdealStateChangeListener,
     }
   }
 
-  public void shutdownClusterStatusMonitor(String clusterName) {
-    if (_clusterStatusMonitor != null) {
-      logger.info("Shut down _clusterStatusMonitor for cluster " + 
clusterName);
-      _clusterStatusMonitor.reset();
-      _clusterStatusMonitor = null;
-    }
-  }
-
   public void shutdown() throws InterruptedException {
     stopRebalancingTimer();
 
     terminateEventThread(_eventThread);
     terminateEventThread(_taskEventThread);
 
-    _asyncTasksThreadPool.shutdown();
+    // shutdown asycTasksThreadpool and wait for terminate.
+    _asyncTasksThreadPool.shutdownNow();
+    try {
+      _asyncTasksThreadPool.awaitTermination(EVENT_THREAD_JOIN_TIMEOUT, 
TimeUnit.MILLISECONDS);
+    } catch (InterruptedException ex) {
+      logger.warn("Timeout when terminating async tasks. Some async tasks are 
still executing.");
+    }
+
+    enableClusterStatusMonitor(false);
+
+    // TODO controller shouldn't be used in anyway after shutdown.
+    // Need to record shutdown and throw Exception if the controller is used 
again.
+  }
+
+  private void enableClusterStatusMonitor(boolean enable) {
+    synchronized (_clusterStatusMonitor) {
+      if (_isMonitoring != enable) {
+        // monitoring state changed
+        if (enable) {
+          logger.info("Enable clusterStatusMonitor for cluster " + 
_clusterName);
+          _clusterStatusMonitor.active();
+        } else {
+          logger.info("Disable clusterStatusMonitor for cluster " + 
_clusterName);
+          // Reset will be done if (_isMonitoring = false) later, no matter if 
the state is changed or not.
+        }
+        _isMonitoring = enable;
+      }
+      // Due to multithreads processing, async thread may write to monitor 
even it is closed.
+      // So when it is disabled, always try to clear the monitor.
+      resetClusterStatusMonitor();
+    }
+  }
+
+  private void resetClusterStatusMonitor() {
+    synchronized (_clusterStatusMonitor) {
+      if (!_isMonitoring) {
+        _clusterStatusMonitor.reset();
+      }
+    }
   }
 
   private void terminateEventThread(Thread thread) throws InterruptedException 
{
@@ -784,12 +780,12 @@ public class GenericHelixController implements 
IdealStateChangeListener,
       _eventBlockingQueue = eventBlockingQueue;
     }
 
-    @Override public void run() {
+    @Override
+    public void run() {
       logger.info("START ClusterEventProcessor thread  for cluster " + 
_clusterName);
       while (!isInterrupted()) {
         try {
-          ClusterEvent event = _eventBlockingQueue.take();
-          handleEvent(event, _cache);
+          handleEvent(_eventBlockingQueue.take(), _cache);
         } catch (InterruptedException e) {
           logger.warn("ClusterEventProcessor interrupted", e);
           interrupt();

http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 3c2c977..f8f5a2f 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -219,8 +219,9 @@ public class CurrentStateComputationStage extends 
AbstractBaseStage {
         long startTime = 
missingTopStateMap.get(resourceName).get(partitionName);
         if (startTime > 0 && System.currentTimeMillis() - startTime > 
durationThreshold) {
           missingTopStateMap.get(resourceName).put(partitionName, 
TRANSITION_FAILED);
-          clusterStatusMonitor
-              .updateMissingTopStateDurationStats(resourceName, 0L, false);
+          if (clusterStatusMonitor != null) {
+            
clusterStatusMonitor.updateMissingTopStateDurationStats(resourceName, 0L, 
false);
+          }
         }
       }
     }
@@ -294,8 +295,10 @@ public class CurrentStateComputationStage extends 
AbstractBaseStage {
     if (handOffStartTime != TRANSITION_FAILED && handOffEndTime - 
handOffStartTime <= threshold) {
       LOG.info(String.format("Missing topstate duration is %d for partition 
%s",
           handOffEndTime - handOffStartTime, partition.getPartitionName()));
-      
clusterStatusMonitor.updateMissingTopStateDurationStats(resource.getResourceName(),
-          handOffEndTime - handOffStartTime, true);
+      if (clusterStatusMonitor != null) {
+        
clusterStatusMonitor.updateMissingTopStateDurationStats(resource.getResourceName(),
+            handOffEndTime - handOffStartTime, true);
+      }
     }
     removeFromStatsMap(missingTopStateMap, resource, partition);
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 8d1a48c..9e91ba5 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -19,39 +19,20 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
+import org.apache.helix.*;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.ZNRecordDelta;
 import org.apache.helix.ZNRecordDelta.MergeOperation;
-import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Message;
+import org.apache.helix.model.*;
 import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceConfig;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.model.StatusUpdate;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.*;
+
 public class ExternalViewComputeStage extends AbstractBaseStage {
   private static Logger LOG = 
LoggerFactory.getLogger(ExternalViewComputeStage.class);
 
@@ -106,22 +87,24 @@ public class ExternalViewComputeStage extends 
AbstractBaseStage {
       // Update cluster status monitor mbean
       IdealState idealState = cache.getIdealState(resourceName);
       if (!cache.isTaskCache()) {
+        ResourceConfig resourceConfig = cache.getResourceConfig(resourceName);
         ClusterStatusMonitor clusterStatusMonitor =
             event.getAttribute(AttributeName.clusterStatusMonitor.name());
-        ResourceConfig resourceConfig = cache.getResourceConfig(resourceName);
-        if (idealState != null && (resourceConfig == null || !resourceConfig
-            .isMonitoringDisabled())) {
-          if (clusterStatusMonitor != null && !idealState.getStateModelDefRef()
-              
.equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-            StateModelDefinition stateModelDef =
-                cache.getStateModelDef(idealState.getStateModelDefRef());
-            clusterStatusMonitor
-                .setResourceStatus(view, 
cache.getIdealState(view.getResourceName()),
-                    stateModelDef);
+        if (clusterStatusMonitor != null) {
+          if (idealState != null && (resourceConfig == null || !resourceConfig
+              .isMonitoringDisabled())) {
+            if (!idealState.getStateModelDefRef()
+                
.equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+              StateModelDefinition stateModelDef =
+                  cache.getStateModelDef(idealState.getStateModelDefRef());
+              clusterStatusMonitor
+                  .setResourceStatus(view, 
cache.getIdealState(view.getResourceName()),
+                      stateModelDef);
+            }
+          } else {
+            // Drop the metrics if the resource is dropped, or the 
MonitorDisabled is changed to true.
+            clusterStatusMonitor.unregisterResource(view.getResourceName());
           }
-        } else {
-          // Drop the metrics if the resource is dropped, or the 
MonitorDisabled is changed to true.
-          clusterStatusMonitor.unregisterResource(view.getResourceName());
         }
       }
       ExternalView curExtView = curExtViews.get(resourceName);

http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index ed6ede2..d8ccd0f 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -77,7 +77,9 @@ public class TaskAssignmentStage extends AbstractBaseStage {
     if (!cache.isTaskCache()) {
       ClusterStatusMonitor clusterStatusMonitor =
           event.getAttribute(AttributeName.clusterStatusMonitor.name());
-      clusterStatusMonitor.increaseMessageReceived(outputMessages);
+      if (clusterStatusMonitor != null) {
+        clusterStatusMonitor.increaseMessageReceived(outputMessages);
+      }
     }
     long cacheStart = System.currentTimeMillis();
     cache.cacheMessages(outputMessages);

http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
 
b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
index edc467b..6022edd 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
@@ -22,7 +22,6 @@ package org.apache.helix.manager.zk;
 import java.lang.management.ManagementFactory;
 import java.util.List;
 
-import org.apache.helix.ControllerChangeListener;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixTimerTask;
@@ -30,6 +29,7 @@ import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyType;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.api.listeners.ControllerChangeListener;
 import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.model.LeaderHistory;
 import org.apache.helix.model.LiveInstance;
@@ -97,11 +97,10 @@ public class DistributedLeaderElection implements 
ControllerChangeListener {
           }
         }
       } else if 
(changeContext.getType().equals(NotificationContext.Type.FINALIZE)) {
-        LOG.info(_manager.getInstanceName() + " reqlinquish leadership for 
cluster: "
+        LOG.info(_manager.getInstanceName() + " relinquish leadership for 
cluster: "
             + _manager.getClusterName());
         controllerHelper.stopControllerTimerTasks();
         controllerHelper.removeListenersFromController(_controller);
-        _controller.shutdownClusterStatusMonitor(_manager.getClusterName());
 
         /**
          * clear write-through cache

http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 2a99341..fe682ac 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -85,11 +85,6 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
   public ClusterStatusMonitor(String clusterName) {
     _clusterName = clusterName;
     _beanServer = ManagementFactory.getPlatformMBeanServer();
-    try {
-      register(this, getObjectName(clusterBeanName()));
-    } catch (Exception e) {
-      LOG.error("Fail to regiter ClusterStatusMonitor", e);
-    }
   }
 
   public ObjectName getObjectName(String name) throws 
MalformedObjectNameException {
@@ -275,20 +270,21 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
   }
 
   private ClusterEventMonitor getOrCreateClusterEventMonitor(String phase) {
-    if (!_clusterEventMbeanMap.containsKey(phase)) {
-      ClusterEventMonitor monitor = new ClusterEventMonitor(this, phase);
-      try {
-        ClusterEventMonitor prevEventMbean = _clusterEventMbeanMap.put(phase, 
monitor);
-        if (prevEventMbean != null) {
-          prevEventMbean.unregister();
+    try {
+      if (!_clusterEventMbeanMap.containsKey(phase)) {
+        synchronized (this) {
+          if (!_clusterEventMbeanMap.containsKey(phase)) {
+            ClusterEventMonitor monitor = new ClusterEventMonitor(this, phase);
+            monitor.register();
+            _clusterEventMbeanMap.put(phase, monitor);
+          }
         }
-        monitor.register();
-      } catch (JMException e) {
-        LOG.error("Failed to register ClusterEventMonitorMbean for cluster " + 
_clusterName
-            + " and phase type: " + phase, e);
-        return null;
       }
+    } catch (JMException e) {
+      LOG.error("Failed to register ClusterEventMonitorMbean for cluster " + 
_clusterName
+          + " and phase type: " + phase, e);
     }
+
     return _clusterEventMbeanMap.get(phase);
   }
 
@@ -477,6 +473,15 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
     _instanceMsgQueueSizes.put(instanceName, msgQueueSize);
   }
 
+  public void active() {
+    LOG.info("Active ClusterStatusMonitor");
+    try {
+      register(this, getObjectName(clusterBeanName()));
+    } catch (Exception e) {
+      LOG.error("Fail to register ClusterStatusMonitor", e);
+    }
+  }
+
   public void reset() {
     LOG.info("Reset ClusterStatusMonitor");
     try {

http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 3d3f86e..19a4049 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -78,13 +78,16 @@ public abstract class TaskRebalancer implements Rebalancer, 
MappingCalculator {
         failedJobs ++;
         if (!cfg.isJobQueue() && failedJobs > cfg.getFailureThreshold()) {
           ctx.setWorkflowState(TaskState.FAILED);
-          _clusterStatusMonitor.updateWorkflowCounters(cfg, TaskState.FAILED);
+          if (_clusterStatusMonitor != null) {
+            _clusterStatusMonitor.updateWorkflowCounters(cfg, 
TaskState.FAILED);
+          }
           for (String jobToFail : cfg.getJobDag().getAllNodes()) {
             if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) {
               ctx.setJobState(jobToFail, TaskState.ABORTED);
               // Skip aborted jobs latency since they are not accurate latency 
for job running time
-              _clusterStatusMonitor
-                  .updateJobCounters(jobConfigMap.get(jobToFail), 
TaskState.ABORTED);
+              if (_clusterStatusMonitor != null) {
+                
_clusterStatusMonitor.updateJobCounters(jobConfigMap.get(jobToFail), 
TaskState.ABORTED);
+              }
             }
           }
           return true;
@@ -98,8 +101,9 @@ public abstract class TaskRebalancer implements Rebalancer, 
MappingCalculator {
 
     if (!incomplete && cfg.isTerminable()) {
       ctx.setWorkflowState(TaskState.COMPLETED);
-      _clusterStatusMonitor.updateWorkflowCounters(cfg, TaskState.COMPLETED,
-          ctx.getFinishTime() - ctx.getStartTime());
+      if (_clusterStatusMonitor != null) {
+        _clusterStatusMonitor.updateWorkflowCounters(cfg, TaskState.COMPLETED, 
ctx.getFinishTime() - ctx.getStartTime());
+      }
       return true;
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
 
b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
index b607add..69b553c 100644
--- 
a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
+++ 
b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
@@ -47,6 +47,7 @@ public class TestClusterEventStatusMonitor {
   private class ClusterStatusMonitorForTest extends ClusterStatusMonitor {
     public ClusterStatusMonitorForTest(String clusterName) {
       super(clusterName);
+      active();
     }
     public ConcurrentHashMap<String, ClusterEventMonitor> 
getClusterEventMBean() {
       return _clusterEventMbeanMap;

http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
 
b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
index 3db8237..ccaba4e 100644
--- 
a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
+++ 
b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
@@ -19,14 +19,6 @@ package org.apache.helix.monitoring;
  * under the License.
  */
 
-import java.io.IOException;
-import java.util.Date;
-
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanServerConnection;
-import javax.management.MBeanServerNotification;
-import javax.management.MalformedObjectNameException;
-
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.TestHelper;
 import org.apache.helix.integration.common.ZkIntegrationTestBase;
@@ -34,7 +26,6 @@ import 
org.apache.helix.integration.manager.ClusterDistributedController;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver;
-import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import 
org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -45,8 +36,16 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import javax.management.*;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+
 public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
-  private static final Logger LOG = 
LoggerFactory.getLogger(TestClusterStatusMonitorLifecycle.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestClusterStatusMonitorLifecycle.class);
 
   MockParticipantManager[] _participants;
   ClusterDistributedController[] _controllers;
@@ -60,11 +59,10 @@ public class TestClusterStatusMonitorLifecycle extends 
ZkIntegrationTestBase {
   @BeforeClass
   public void beforeClass() throws Exception {
     String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    _clusterNamePrefix = className + "_" + methodName;
+    _clusterNamePrefix = className;
 
-    System.out.println("START " + _clusterNamePrefix + " at "
-        + new Date(System.currentTimeMillis()));
+    System.out
+        .println("START " + _clusterNamePrefix + " at " + new 
Date(System.currentTimeMillis()));
 
     // setup 10 clusters
     for (int i = 0; i < clusterNb; i++) {
@@ -83,8 +81,8 @@ public class TestClusterStatusMonitorLifecycle extends 
ZkIntegrationTestBase {
 
     // setup controller cluster
     _controllerClusterName = "CONTROLLER_" + _clusterNamePrefix;
-    TestHelper.setupCluster("CONTROLLER_" + _clusterNamePrefix, ZK_ADDR, 0, // 
controller
-                                                                            // 
port
+    TestHelper.setupCluster(_controllerClusterName, ZK_ADDR,
+        0, // controller port
         "controller", // participant name prefix
         _clusterNamePrefix, // resource name prefix
         1, // resources
@@ -101,10 +99,9 @@ public class TestClusterStatusMonitorLifecycle extends 
ZkIntegrationTestBase {
       _controllers[i].syncStart();
     }
 
-    boolean result =
-        ClusterStateVerifier.verifyByZkCallback(
-            new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, 
_controllerClusterName),
-            30000);
+    boolean result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, 
_controllerClusterName),
+        30000);
     Assert.assertTrue(result, "Controller cluster NOT in ideal state");
 
     // start first cluster
@@ -116,9 +113,8 @@ public class TestClusterStatusMonitorLifecycle extends 
ZkIntegrationTestBase {
       _participants[i].syncStart();
     }
 
-    result =
-        ClusterStateVerifier.verifyByZkCallback(new 
BestPossAndExtViewZkVerifier(ZK_ADDR,
-            _firstClusterName));
+    result = ClusterStateVerifier
+        .verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, 
_firstClusterName));
     Assert.assertTrue(result, "first cluster NOT in ideal state");
 
     // add more controllers to controller cluster
@@ -135,32 +131,23 @@ public class TestClusterStatusMonitorLifecycle extends 
ZkIntegrationTestBase {
     }
 
     // verify controller cluster
-    result =
-        ClusterStateVerifier
-            .verifyByZkCallback(new 
ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
-                _controllerClusterName));
+    result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, 
_controllerClusterName));
     Assert.assertTrue(result, "Controller cluster NOT in ideal state");
 
     // verify first cluster
-    result =
-        ClusterStateVerifier.verifyByZkCallback(new 
BestPossAndExtViewZkVerifier(ZK_ADDR,
-            _firstClusterName));
+    result = ClusterStateVerifier
+        .verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, 
_firstClusterName));
     Assert.assertTrue(result, "first cluster NOT in ideal state");
   }
 
   @AfterClass
   public void afterClass() {
     System.out.println("Cleaning up...");
-    for (int i = 0; i < 5; i++) {
-      _controllers[i].syncStop();
-    }
-
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < _participants.length; i++) {
       _participants[i].syncStop();
     }
-
     System.out.println("END " + _clusterNamePrefix + " at " + new 
Date(System.currentTimeMillis()));
-
   }
 
   class ParticipantMonitorListener extends ClusterMBeanObserver {
@@ -168,8 +155,9 @@ public class TestClusterStatusMonitorLifecycle extends 
ZkIntegrationTestBase {
     int _nMbeansUnregistered = 0;
     int _nMbeansRegistered = 0;
 
-    public ParticipantMonitorListener(String domain) throws 
InstanceNotFoundException, IOException,
-        MalformedObjectNameException, NullPointerException {
+    public ParticipantMonitorListener(String domain)
+        throws InstanceNotFoundException, IOException, 
MalformedObjectNameException,
+        NullPointerException {
       super(domain);
     }
 
@@ -188,14 +176,21 @@ public class TestClusterStatusMonitorLifecycle extends 
ZkIntegrationTestBase {
     }
   }
 
-  @Test (enabled = false)
-  public void testClusterStatusMonitorLifecycle() throws 
InstanceNotFoundException,
-      MalformedObjectNameException, NullPointerException, IOException, 
InterruptedException {
-    ParticipantMonitorListener listener =
-        new 
ParticipantMonitorListener(MonitorDomainNames.ClusterStatus.name());
+  private void cleanupControllers() {
+    for (int i = 0; i < _controllers.length; i++) {
+      _controllers[i].syncStop();
+    }
+  }
 
-    int nMbeansUnregistered = listener._nMbeansUnregistered;
-    int nMbeansRegistered = listener._nMbeansRegistered;
+  @Test
+  public void testClusterStatusMonitorLifecycle()
+      throws InstanceNotFoundException, MalformedObjectNameException, 
NullPointerException,
+      IOException, InterruptedException {
+    // Filter out metrics that belong to other, unrelated clusters
+    QueryExp exp =
+        Query.match(Query.attr("SensorName"), Query.value("*" + 
_clusterNamePrefix + "*"));
+    Set<ObjectInstance> mbeans = new 
HashSet<>(ManagementFactory.getPlatformMBeanServer()
+        .queryMBeans(new ObjectName("ClusterStatus:*"), exp));
 
     _participants[0].disconnect();
 
@@ -203,8 +198,10 @@ public class TestClusterStatusMonitorLifecycle extends 
ZkIntegrationTestBase {
     // No change in instance/resource mbean
     // Unregister 1 per-instance resource mbean and message queue mbean
     Thread.sleep(1000);
-    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 
2);
-    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered);
+    int previousMBeanCount = mbeans.size();
+    mbeans = new HashSet<>(ManagementFactory.getPlatformMBeanServer()
+        .queryMBeans(new ObjectName("ClusterStatus:*"), exp));
+    Assert.assertEquals(mbeans.size(), previousMBeanCount - 2);
 
     HelixDataAccessor accessor = _participants[n - 1].getHelixDataAccessor();
     String firstControllerName =
@@ -217,13 +214,14 @@ public class TestClusterStatusMonitorLifecycle extends 
ZkIntegrationTestBase {
       }
     }
     firstController.disconnect();
-    Thread.sleep(2000);
 
-    // 1 cluster status monitor, 1 resource monitor, 5 instances
-    // Unregister 1+4+1 per-instance resource mbean
-    // Register 4 per-instance resource mbean
-    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 
33);
-    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 29);
+    // 1 controller goes away
+    // 1 message queue mbean, 1 PerInstanceResource mbean, and one message 
queue mbean
+    Thread.sleep(2000);
+    previousMBeanCount = mbeans.size();
+    mbeans = new HashSet<>(ManagementFactory.getPlatformMBeanServer()
+        .queryMBeans(new ObjectName("ClusterStatus:*"), exp));
+    Assert.assertEquals(mbeans.size(), previousMBeanCount - 3);
 
     String instanceName = "localhost0_" + (12918 + 0);
     _participants[0] = new MockParticipantManager(ZK_ADDR, _firstClusterName, 
instanceName);
@@ -231,10 +229,12 @@ public class TestClusterStatusMonitorLifecycle extends 
ZkIntegrationTestBase {
 
     // 1 participant comes back
     // No change in instance/resource mbean
-    // Register 1 per-instance resource mbean
+    // Register 1 per-instance resource mbean and 1 message queue mbean
     Thread.sleep(2000);
-    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 
33);
-    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 31);
+    previousMBeanCount = mbeans.size();
+    mbeans = new HashSet<>(ManagementFactory.getPlatformMBeanServer()
+        .queryMBeans(new ObjectName("ClusterStatus:*"), exp));
+    Assert.assertEquals(mbeans.size(), previousMBeanCount + 2);
 
     // Add a resource
     // Register 1 resource mbean
@@ -248,15 +248,37 @@ public class TestClusterStatusMonitorLifecycle extends 
ZkIntegrationTestBase {
         Integer.parseInt(idealState.getReplicas()));
 
     Thread.sleep(2000);
-    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 
33);
-    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 37);
+    // New resource: registers 1 PerInstanceResource mbean per participant and 1 resource monitor
+    previousMBeanCount = mbeans.size();
+    mbeans = new HashSet<>(ManagementFactory.getPlatformMBeanServer()
+        .queryMBeans(new ObjectName("ClusterStatus:*"), exp));
+    Assert.assertEquals(mbeans.size(), previousMBeanCount + 
_participants.length + 1);
 
     // Remove a resource
     // No change in instance/resource mbean
     // Unregister 5 per-instance resource mbean
     setupTool.dropResourceFromCluster(_firstClusterName, "TestDB1");
     Thread.sleep(2000);
-    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 
39);
-    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 37);
+    previousMBeanCount = mbeans.size();
+    mbeans = new HashSet<>(ManagementFactory.getPlatformMBeanServer()
+        .queryMBeans(new ObjectName("ClusterStatus:*"), exp));
+    Assert.assertEquals(mbeans.size(), previousMBeanCount - 
(_participants.length + 1));
+
+    // Clean up the controllers; afterwards all MBeans should be removed.
+    cleanupControllers();
+    Thread.sleep(2000);
+
+    // Check whether any MBeans are left over.
+    // Note that MessageQueueStatus is not bound to the controller only, so it will still exist.
+    exp = Query
+        .and(Query.not(Query.match(Query.attr("SensorName"), 
Query.value("MessageQueueStatus.*"))),
+            exp);
+    if (!ManagementFactory.getPlatformMBeanServer()
+        .queryMBeans(new ObjectName("ClusterStatus:*"), exp).isEmpty()) {
+      System.out.println(ManagementFactory.getPlatformMBeanServer()
+          .queryMBeans(new ObjectName("ClusterStatus:*"), exp));
+    }
+    Assert.assertTrue(ManagementFactory.getPlatformMBeanServer()
+        .queryMBeans(new ObjectName("ClusterStatus:*"), exp).isEmpty());
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
index 723d969..755997b 100644
--- 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
+++ 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
@@ -54,6 +54,7 @@ public class TestClusterStatusMonitor {
     System.out.println("START " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
 
     ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
+    monitor.active();
     ObjectName clusterMonitorObjName = 
monitor.getObjectName(monitor.clusterBeanName());
     try {
       _server.getMBeanInfo(clusterMonitorObjName);

http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
index df88397..61acdb5 100644
--- 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
+++ 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
@@ -66,7 +66,9 @@ public class TestRebalancerMetrics extends BaseStageTest {
     event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
     event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), 
resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
-    event.addAttribute(AttributeName.clusterStatusMonitor.name(), new 
ClusterStatusMonitor(_clusterName));
+    ClusterStatusMonitor monitor = new ClusterStatusMonitor(_clusterName);
+    monitor.active();
+    event.addAttribute(AttributeName.clusterStatusMonitor.name(), monitor);
 
     runStage(event, new ReadClusterDataStage());
     ClusterDataCache cache = 
event.getAttribute(AttributeName.ClusterDataCache.name());
@@ -110,7 +112,9 @@ public class TestRebalancerMetrics extends BaseStageTest {
     event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
     event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), 
resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
-    event.addAttribute(AttributeName.clusterStatusMonitor.name(), new 
ClusterStatusMonitor(_clusterName));
+    ClusterStatusMonitor monitor = new ClusterStatusMonitor(_clusterName);
+    monitor.active();
+    event.addAttribute(AttributeName.clusterStatusMonitor.name(), monitor);
 
     runStage(event, new ReadClusterDataStage());
     runStage(event, new BestPossibleStateCalcStage());

http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
index 51b048d..7b97ac3 100644
--- 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
+++ 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
@@ -54,7 +54,9 @@ public class TestTopStateHandoffMetrics extends BaseStageTest 
{
     resource.setStateModelDefRef("MasterSlave");
     resource.addPartition(PARTITION);
     event.addAttribute(AttributeName.RESOURCES.name(), 
Collections.singletonMap(TEST_RESOURCE, resource));
-    event.addAttribute(AttributeName.clusterStatusMonitor.name(), new 
ClusterStatusMonitor("TestCluster"));
+    ClusterStatusMonitor monitor = new ClusterStatusMonitor("TestCluster");
+    monitor.active();
+    event.addAttribute(AttributeName.clusterStatusMonitor.name(), monitor);
   }
 
   @Test(dataProvider = "successCurrentStateInput")

Reply via email to